service
service
¶
Synchronizer service for BigBrotr.
Collects Nostr events from validated relays and stores them in the database.
Uses asyncio.TaskGroup with per-network semaphores for structured, bounded concurrency.
The synchronization workflow proceeds as follows:
- Fetch sync cursors for all relays via fetch_cursors_to_sync, ordered by sync progress ascending (most behind first).
- Connect to each relay and stream events since the last sync timestamp.
- Insert pre-validated events (filtering, signature verification, and
deduplication are handled at the fetch layer by
_fetch_validated) using a global buffer flushed atprocessing.batch_size. - Update per-relay cursors in batch after each buffer flush, derived from the last event seen per relay in that batch.
Note
Workers yield (Event, Relay) pairs. The parent accumulates them into
a global buffer and flushes to the database when the buffer reaches
processing.batch_size. At each flush, per-relay cursors are
computed from the last event per relay in the buffer and persisted
alongside the events. This bounds memory regardless of concurrent relay
count and minimises DB round-trips.
Cursor-based pagination ensures each relay is synced incrementally. Cursors are persisted in batch after every buffer flush and after the post-loop flush, so a crash never loses more than one batch of progress.
Relays are processed in sync-progress order (most behind first) so that the most stale relays receive priority.
See Also
SynchronizerConfig:
Configuration model for networks, filters, time ranges,
and concurrency.
BaseService: Abstract base
class providing run(), run_forever(), and from_yaml().
Brotr: Database facade used for event
insertion and cursor management.
Monitor: Upstream service that
health-checks the relays synced here.
Finder: Downstream consumer that
discovers relay URLs from the events collected here.
connect_relay: High-level
relay connection helper with automatic SSL fallback.
Examples:
from bigbrotr.core import Brotr
from bigbrotr.services import Synchronizer
brotr = Brotr.from_yaml("config/brotr.yaml")
sync = Synchronizer.from_yaml("config/services/synchronizer.yaml", brotr=brotr)
async with brotr:
async with sync:
await sync.run_forever()
Classes¶
Synchronizer
¶
Synchronizer(
brotr: Brotr, config: SynchronizerConfig | None = None
)
Bases: ConcurrentStreamMixin, NetworkSemaphoresMixin, BaseService[SynchronizerConfig]
Event synchronization service.
Collects Nostr events from validated relays and stores them in the
database. Uses asyncio.TaskGroup with per-network semaphores for
structured, bounded concurrency.
Each cycle fetches sync cursors for all relays in a single query
(LEFT JOIN), ordered by sync progress ascending so the most stale
relays are processed first. Workers stream (Event, Relay) pairs
and the parent batch-inserts events using a global buffer flushed at
processing.batch_size. Per-relay cursors are derived from
the last event seen per relay in each batch and persisted at flush
time for crash resilience.
See Also
SynchronizerConfig: Configuration model for this service. Monitor: Upstream service that health-checks relays before they are synced. Finder: Downstream consumer that discovers relay URLs from the events collected here.
Source code in src/bigbrotr/services/synchronizer/service.py
Functions¶
run
async
¶
cleanup
async
¶
synchronize
async
¶
Fetch cursors and sync events from all relays concurrently.
Fetches sync cursors for enabled networks in a single query, ordered
by sync progress ascending (most behind first). Workers yield
(Event, Relay) pairs. The parent accumulates them into a global
buffer and flushes to the database at processing.batch_size.
Per-relay cursors are derived from the last event seen and persisted
at each flush.
Returns:
-
int–Total events synced across all relays.