service

Synchronizer service for BigBrotr.

Collects Nostr events from validated relays and stores them in the database. Uses asyncio.TaskGroup with per-network semaphores for structured, bounded concurrency.

The synchronization workflow proceeds as follows:

  1. Fetch relays from the database via get_all_relays (optionally filtered by metadata age).
  2. Load per-relay sync cursors from service_state via get_all_service_cursors.
  3. Connect to each relay and fetch events since the last sync timestamp.
  4. Validate event signatures and timestamps before insertion.
  5. Update per-relay cursors for the next cycle.
Note

Cursor-based pagination ensures each relay is synced incrementally. The cursor (last_synced_at) is stored as a ServiceState record with state_type='cursor'. Cursor updates are batched (flushed every cursor_flush_interval relays) for crash resilience.

Relay processing order is randomized (shuffled) to avoid thundering-herd effects when multiple synchronizer instances run concurrently.
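The shuffled, batch-flushed cursor pattern described above can be sketched as follows. This is a minimal illustration, not the actual implementation: `sync_one` and `flush` are stand-ins for the real event-fetch and `service_state` persistence logic, and the flush interval is shortened for clarity.

```python
import random

def sync_one(relay: str, since: int) -> int:
    """Stand-in for fetching events since ``since``; returns the new cursor."""
    return since + 1

flushed: list[dict[str, int]] = []

def flush(batch: dict[str, int]) -> None:
    """Stand-in for batch-persisting cursor updates to service_state."""
    flushed.append(dict(batch))

def sync_all(relays: list[str], cursors: dict[str, int], cursor_flush_interval: int = 2) -> None:
    random.shuffle(relays)  # randomize order to avoid thundering-herd effects
    pending: dict[str, int] = {}
    for i, relay in enumerate(relays, start=1):
        since = cursors.get(relay, 0)  # resume from the stored cursor, else from 0
        pending[relay] = sync_one(relay, since)
        if i % cursor_flush_interval == 0:
            flush(pending)  # persist a batch so a crash loses at most one interval
            pending.clear()
    if pending:
        flush(pending)  # final partial batch
```

Flushing every `cursor_flush_interval` relays bounds the re-sync work after a crash to at most one batch per instance.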

See Also

  • SynchronizerConfig: Configuration model for networks, filters, time ranges, concurrency, and relay overrides.
  • BaseService: Abstract base class providing run(), run_forever(), and from_yaml().
  • Brotr: Database facade used for event insertion and cursor management.
  • Monitor: Upstream service that health-checks the relays synced here.
  • Finder: Downstream consumer that discovers relay URLs from the events collected here.
  • create_client: Factory for the nostr-sdk client used for WebSocket connections.

Examples:

from bigbrotr.core import Brotr
from bigbrotr.services import Synchronizer

brotr = Brotr.from_yaml("config/brotr.yaml")
sync = Synchronizer.from_yaml("config/services/synchronizer.yaml", brotr=brotr)

async with brotr:
    async with sync:
        await sync.run_forever()

Classes

Synchronizer

Synchronizer(
    brotr: Brotr, config: SynchronizerConfig | None = None
)

Bases: NetworkSemaphoresMixin, BaseService[SynchronizerConfig]

Event synchronization service.

Collects Nostr events from validated relays and stores them in the database. Uses asyncio.TaskGroup with per-network semaphores for structured, bounded concurrency.

Each cycle fetches relays from the database, loads per-relay sync cursors from service_state, connects to each relay to fetch events since the last sync, validates signatures and timestamps, batch-inserts events, and updates per-relay cursors for the next cycle.

Note

The relay list is shuffled before processing to prevent all synchronizer instances from hitting the same relays in the same order, reducing thundering-herd effects. Relay overrides can customize per-relay timeouts for high-traffic relays.
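The override merge can be sketched as a dict update keyed by relay URL. The field names (`timeout_s`) and the `merge_overrides` helper are illustrative assumptions, not the service's actual API:

```python
def merge_overrides(
    relays: dict[str, dict], overrides: dict[str, dict]
) -> dict[str, dict]:
    """Apply per-relay override settings on top of defaults (illustrative)."""
    merged: dict[str, dict] = {}
    for url, settings in relays.items():
        # Override values win over defaults for the same relay URL
        merged[url] = {**settings, **overrides.get(url, {})}
    return merged

defaults = {"wss://relay.example": {"timeout_s": 30}}
custom = {"wss://relay.example": {"timeout_s": 120}}  # high-traffic relay
merged = merge_overrides(defaults, custom)
```

With this shape, a relay absent from the overrides map simply keeps its defaults.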

See Also

  • SynchronizerConfig: Configuration model for this service.
  • Monitor: Upstream service that health-checks relays before they are synced.
  • Finder: Downstream consumer that discovers relay URLs from the events collected here.
  • get_all_service_cursors: Pre-fetches all per-relay cursor values.

Source code in src/bigbrotr/services/synchronizer/service.py
def __init__(
    self,
    brotr: Brotr,
    config: SynchronizerConfig | None = None,
) -> None:
    config = config or SynchronizerConfig()
    super().__init__(brotr=brotr, config=config, networks=config.networks)
    self._config: SynchronizerConfig
    self._counters = SyncCycleCounters()
    self._keys: Keys = self._config.keys.keys  # For NIP-42 authentication
Functions
run async
run() -> None

Execute one complete synchronization cycle across all relays.

Orchestrates counter reset, synchronization, and cycle-level logging. Delegates the core work to synchronize.

Source code in src/bigbrotr/services/synchronizer/service.py
async def run(self) -> None:
    """Execute one complete synchronization cycle across all relays.

    Orchestrates counter reset, synchronization, and cycle-level logging.
    Delegates the core work to ``synchronize``.
    """
    self._logger.info(
        "cycle_started",
        from_database=self._config.source.from_database,
        overrides=len(self._config.overrides),
    )

    cycle_start = time.monotonic()
    self._counters.reset()

    relay_count = await self.synchronize()

    self.set_gauge("total", relay_count)
    self.set_gauge("synced_relays", self._counters.synced_relays)
    self.set_gauge("failed_relays", self._counters.failed_relays)
    self.set_gauge("synced_events", self._counters.synced_events)
    self.set_gauge("invalid_events", self._counters.invalid_events)

    self._logger.info(
        "cycle_completed",
        synced_relays=self._counters.synced_relays,
        failed_relays=self._counters.failed_relays,
        synced_events=self._counters.synced_events,
        invalid_events=self._counters.invalid_events,
        duration_s=round(time.monotonic() - cycle_start, 2),
    )
synchronize async
synchronize() -> int

Fetch relays, merge overrides, and sync events from all of them.

High-level entry point that fetches relays from the database, merges configured relay overrides, shuffles the list to avoid thundering-herd effects, and syncs all relays concurrently.

This is the method run() delegates to. It can also be called standalone without cycle-level logging or counter reset.

Returns:

  • int

    Number of relays that were processed.

Source code in src/bigbrotr/services/synchronizer/service.py
async def synchronize(self) -> int:
    """Fetch relays, merge overrides, and sync events from all of them.

    High-level entry point that fetches relays from the database,
    merges configured relay overrides, shuffles the list to avoid
    thundering-herd effects, and syncs all relays concurrently.

    This is the method ``run()`` delegates to. It can also be called
    standalone without cycle-level logging or counter reset.

    Returns:
        Number of relays that were processed.
    """
    try:
        removed = await delete_orphan_cursors(self._brotr, self.SERVICE_NAME)
        if removed:
            self._logger.info("orphan_cursors_removed", count=removed)
    except (asyncpg.PostgresError, OSError) as e:
        self._logger.warning(
            "orphan_cursor_cleanup_failed", error=str(e), error_type=type(e).__name__
        )

    relays = await self.fetch_relays()
    relays = self._merge_overrides(relays)

    if not relays:
        self._logger.info("no_relays_to_sync")
        return 0

    self._logger.info("sync_started", relay_count=len(relays))
    random.shuffle(relays)
    await self._sync_all_relays(relays)
    return len(relays)
fetch_relays async
fetch_relays() -> list[Relay]

Fetch validated relays from the database for synchronization.

Filters relays to only include enabled networks, avoiding unnecessary relay loading for disabled network types.

Controlled by source.from_database in SynchronizerConfig.

Returns:

  • list[Relay]

    List of relays to sync (filtered by enabled networks).

See Also

get_all_relays: The SQL query executed by this method.

Source code in src/bigbrotr/services/synchronizer/service.py
async def fetch_relays(self) -> list[Relay]:
    """Fetch validated relays from the database for synchronization.

    Filters relays to only include enabled networks, avoiding unnecessary
    relay loading for disabled network types.

    Controlled by ``source.from_database`` in
    [SynchronizerConfig][bigbrotr.services.synchronizer.SynchronizerConfig].

    Returns:
        List of relays to sync (filtered by enabled networks).

    See Also:
        [get_all_relays][bigbrotr.services.common.queries.get_all_relays]:
            The SQL query executed by this method.
    """
    if not self._config.source.from_database:
        return []

    rows = await get_all_relays(self._brotr)

    enabled = set(self._config.networks.get_enabled_networks())
    relays: list[Relay] = []
    for row in rows:
        url_str = row["url"].strip()
        try:
            relay = Relay(url_str, discovered_at=row["discovered_at"])
            if relay.network.value in enabled:
                relays.append(relay)
        except (ValueError, TypeError) as e:
            self._logger.debug("invalid_relay_url", url=url_str, error=str(e))

    self._logger.debug("relays_fetched", count=len(relays))
    return relays
fetch_cursors async
fetch_cursors() -> dict[str, int]

Batch-fetch all relay sync cursors in a single query.

Controlled by time_range.use_relay_state in SynchronizerConfig.

Returns:

  • dict[str, int]

    Dict mapping relay URL to last_synced_at timestamp.

See Also

get_all_service_cursors: The SQL query executed by this method.

Source code in src/bigbrotr/services/synchronizer/service.py
async def fetch_cursors(self) -> dict[str, int]:
    """Batch-fetch all relay sync cursors in a single query.

    Controlled by ``time_range.use_relay_state`` in
    [SynchronizerConfig][bigbrotr.services.synchronizer.SynchronizerConfig].

    Returns:
        Dict mapping relay URL to ``last_synced_at`` timestamp.

    See Also:
        [get_all_service_cursors][bigbrotr.services.common.queries.get_all_service_cursors]:
            The SQL query executed by this method.
    """
    if not self._config.time_range.use_relay_state:
        return {}

    return await get_all_service_cursors(self._brotr, self.SERVICE_NAME, "last_synced_at")
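A sketch of how the returned cursor map could feed a per-relay `since` bound, assuming a fallback lookback timestamp when a relay has no stored cursor yet. The `since_for` helper below is illustrative, not part of the API:

```python
def since_for(relay_url: str, cursors: dict[str, int], default_since: int) -> int:
    """Pick the sync lower bound: the stored cursor if present, else the default."""
    return cursors.get(relay_url, default_since)

# Example: one relay has a cursor, the other falls back to the default lookback
cursors = {"wss://relay.one": 1_700_000_000}
bound_known = since_for("wss://relay.one", cursors, 1_690_000_000)
bound_new = since_for("wss://relay.two", cursors, 1_690_000_000)
```

When `time_range.use_relay_state` is disabled, `fetch_cursors` returns an empty dict, so every relay takes the default bound.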
