service
¶
Synchronizer service for BigBrotr.
Collects Nostr events from validated relays and stores them in the database.
Uses asyncio.TaskGroup with per-network semaphores for structured, bounded concurrency.
The synchronization workflow proceeds as follows:
- Fetch relays from the database via get_all_relays (optionally filtered by metadata age).
- Load per-relay sync cursors from service_state via get_all_service_cursors.
- Connect to each relay and fetch events since the last sync timestamp.
- Validate event signatures and timestamps before insertion.
- Update per-relay cursors for the next cycle.
Note
Cursor-based pagination ensures each relay is synced incrementally.
The cursor (last_synced_at) is stored as a ServiceState record with state_type='cursor'. Cursor updates are batched (flushed every cursor_flush_interval relays) for crash resilience.
Relay processing order is randomized (shuffled) to avoid thundering-herd effects when multiple synchronizer instances run concurrently.
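The batched cursor flush described in this note can be sketched as follows. This is a minimal illustration, not the service's actual code: `CursorBuffer`, its fields, and the in-memory `flushed_batches` list (a stand-in for a database write) are hypothetical names; only `last_synced_at` and the idea of a flush interval come from the text above.

```python
from dataclasses import dataclass, field


@dataclass
class CursorBuffer:
    """Hypothetical helper: buffers cursor updates and flushes them in batches."""

    flush_interval: int
    pending: dict[str, int] = field(default_factory=dict)
    flushed_batches: list[dict[str, int]] = field(default_factory=list)

    def record(self, relay_url: str, last_synced_at: int) -> None:
        # Buffer the update; flush once enough relays have accumulated.
        self.pending[relay_url] = last_synced_at
        if len(self.pending) >= self.flush_interval:
            self.flush()

    def flush(self) -> None:
        # Stand-in for a database write: the batch is only kept in memory here.
        if self.pending:
            self.flushed_batches.append(dict(self.pending))
            self.pending.clear()


buffer = CursorBuffer(flush_interval=2)
updates = [("wss://a.example", 100), ("wss://b.example", 200), ("wss://c.example", 300)]
for url, ts in updates:
    buffer.record(url, ts)
buffer.flush()  # final flush for the relay left over after the last full batch
```

With a flush interval of 2, a crash after the second relay would lose at most the progress of relays processed since the last flush, which is the trade-off batching makes against one write per relay.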
See Also
SynchronizerConfig: Configuration model for networks, filters, time ranges, concurrency, and relay overrides.
BaseService: Abstract base class providing run(), run_forever(), and from_yaml().
Brotr: Database facade used for event insertion and cursor management.
Monitor: Upstream service that health-checks the relays synced here.
Finder: Downstream consumer that discovers relay URLs from the events collected here.
create_client: Factory for the nostr-sdk client used for WebSocket connections.
Examples:
    import asyncio

    from bigbrotr.core import Brotr
    from bigbrotr.services import Synchronizer

    async def main() -> None:
        brotr = Brotr.from_yaml("config/brotr.yaml")
        sync = Synchronizer.from_yaml("config/services/synchronizer.yaml", brotr=brotr)
        async with brotr:
            async with sync:
                await sync.run_forever()

    asyncio.run(main())
Classes¶
Synchronizer
¶
Synchronizer(
brotr: Brotr, config: SynchronizerConfig | None = None
)
Bases: NetworkSemaphoresMixin, BaseService[SynchronizerConfig]
Event synchronization service.
Collects Nostr events from validated relays and stores them in the
database. Uses asyncio.TaskGroup with per-network semaphores for
structured, bounded concurrency.
Each cycle fetches relays from the database, loads per-relay sync
cursors from service_state, connects to each relay to fetch events
since the last sync, validates signatures and timestamps, batch-inserts
events, and updates per-relay cursors for the next cycle.
Note
The relay list is shuffled before processing to prevent all synchronizer instances from hitting the same relays in the same order, reducing thundering-herd effects. Relay overrides can customize per-relay timeouts for high-traffic relays.
See Also
SynchronizerConfig: Configuration model for this service.
Monitor: Upstream service that health-checks relays before they are synced.
Finder: Downstream consumer that discovers relay URLs from the events collected here.
get_all_service_cursors: Pre-fetches all per-relay cursor values.
Source code in src/bigbrotr/services/synchronizer/service.py
Functions¶
run
async
¶
Execute one complete synchronization cycle across all relays.
Orchestrates counter reset, synchronization, and cycle-level logging.
Delegates the core work to synchronize.
synchronize
async
¶
Fetch relays, merge overrides, and sync events from all of them.
High-level entry point that fetches relays from the database, merges configured relay overrides, shuffles the list to avoid thundering-herd effects, and syncs all relays concurrently.
This is the method run() delegates to. It can also be called
standalone without cycle-level logging or counter reset.
Returns:
- int – Number of relays that were processed.
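One plausible reading of "merge overrides, then shuffle" is sketched below, assuming overrides are keyed by relay URL and carry a per-relay timeout. `DEFAULT_TIMEOUT`, `plan_sync`, and the tuple layout are hypothetical; the real override structure lives in SynchronizerConfig and may differ.

```python
import random

DEFAULT_TIMEOUT = 30.0  # hypothetical default, in seconds


def plan_sync(
    relay_urls: list[str],
    overrides: dict[str, float],
    rng: random.Random,
) -> list[tuple[str, float]]:
    # Pair each relay with its override timeout, falling back to the default.
    plan = [(url, overrides.get(url, DEFAULT_TIMEOUT)) for url in relay_urls]
    # Shuffle so concurrent instances do not walk the relays in the same order.
    rng.shuffle(plan)
    return plan


plan = plan_sync(
    ["wss://a.example", "wss://b.example", "wss://c.example"],
    {"wss://b.example": 120.0},
    random.Random(),
)
```

Passing the random generator in explicitly keeps the ordering reproducible in tests (by seeding it) while leaving it unpredictable in normal runs.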
fetch_relays
async
¶
fetch_relays() -> list[Relay]
Fetch validated relays from the database for synchronization.
Filters relays to only include enabled networks, avoiding unnecessary relay loading for disabled network types.
Controlled by source.from_database in SynchronizerConfig.
Returns:
- list[Relay] – List of relays to sync (filtered by enabled networks).
See Also
get_all_relays: The SQL query executed by this method.
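The network filtering described for fetch_relays might look roughly like the sketch below. `RelayStub`, its `network` attribute, and `filter_enabled` are illustrative stand-ins; the actual Relay model and the way enabled networks are read from the configuration are not shown on this page.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RelayStub:
    """Hypothetical stand-in for the real Relay model."""

    url: str
    network: str


def filter_enabled(relays: list[RelayStub], enabled: set[str]) -> list[RelayStub]:
    # Keep only relays whose network type is enabled in the configuration.
    return [relay for relay in relays if relay.network in enabled]


candidates = [
    RelayStub("wss://a.example", "clearnet"),
    RelayStub("ws://b.onion", "tor"),
    RelayStub("wss://c.example", "clearnet"),
]
to_sync = filter_enabled(candidates, {"clearnet"})
```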
fetch_cursors
async
¶
Batch-fetch all relay sync cursors in a single query.
Controlled by time_range.use_relay_state in SynchronizerConfig.
Returns:
- dict[str, int] – Dict mapping relay URL to last_synced_at timestamp.
See Also
get_all_service_cursors: The SQL query executed by this method.
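A caller could use the returned mapping to pick a start timestamp per relay, along the lines of the sketch below. `since_for` and `default_start` are hypothetical names, and whether the service resumes exactly at the cursor or just after it is not stated here; the sketch uses the stored value as-is.

```python
def since_for(relay_url: str, cursors: dict[str, int], default_start: int) -> int:
    # Resume from the stored cursor; relays never synced before use the default.
    return cursors.get(relay_url, default_start)


cursors = {"wss://a.example": 1_700_000_000}
known = since_for("wss://a.example", cursors, default_start=0)
fresh = since_for("wss://new.example", cursors, default_start=0)
```

Fetching all cursors in one query and looking them up in memory avoids a database round trip per relay during the sync loop.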