utils
utils
¶
Synchronizer service utility functions.
Module-level sync logic, event batch management, and context types.
Classes¶
EventBatch
¶
Bounded container for Nostr events within a time interval.
Collects events whose created_at falls within [since, until]
and tracks min/max timestamps. Raises OverflowError if the batch
limit is exceeded.
Attributes:
-
since–Inclusive lower bound timestamp.
-
until–Inclusive upper bound timestamp.
-
limit–Maximum number of events allowed.
-
size–Current event count.
-
events(list[Event]) –Collected
NostrEventobjects. -
min_created_at(int | None) –Earliest
created_atin the batch (orNoneif empty). -
max_created_at(int | None) –Latest
created_atin the batch (orNoneif empty).
See Also
_insert_batch:
Validates and persists batch contents to the database.
Source code in src/bigbrotr/services/synchronizer/utils.py
Functions¶
append
¶
Add an event if its timestamp is within [since, until].
Parameters:
-
event(Event) –NostrEvent to add.
Raises:
-
OverflowError–If the batch has reached its size limit.
Source code in src/bigbrotr/services/synchronizer/utils.py
is_full
¶
SyncCycleCounters
dataclass
¶
SyncCycleCounters(
synced_events: int = 0,
synced_relays: int = 0,
failed_relays: int = 0,
invalid_events: int = 0,
lock: Lock = Lock(),
)
Per-cycle synchronization counters.
Groups relay/event outcome counts and the lock that guards
concurrent updates from TaskGroup workers.
See Also
Synchronizer: Service that owns an instance of this dataclass.
SyncBatchState
dataclass
¶
SyncBatchState(
cursor_updates: list[ServiceState],
cursor_lock: Lock,
cursor_flush_interval: int,
)
Shared mutable cursor state across sync workers within a single cycle.
Groups the lock and cursor update buffer used by
Synchronizer
workers running concurrently under a TaskGroup.
Note
Not frozen because cursor_updates is mutated under
cursor_lock during concurrent processing.
SyncContext
dataclass
¶
SyncContext(
filter_config: FilterConfig,
network_config: NetworksConfig,
request_timeout: float,
brotr: Brotr,
keys: Keys,
)
Immutable context shared across all relay sync operations in a cycle.
See Also
sync_relay_events:
The function that consumes this context.
Functions¶
create_filter
¶
create_filter(
since: int, until: int, config: FilterConfig
) -> Filter
Build a nostr-sdk Filter from the given time range and filter configuration.
Supports standard fields (ids, kinds, authors) and tag
filters specified as {tag_letter: [values]} (e.g.,
{"e": ["event_id"], "t": ["hashtag"]}).
See Also
FilterConfig: The configuration model consumed by this function.
Source code in src/bigbrotr/services/synchronizer/utils.py
insert_batch
async
¶
insert_batch(
batch: EventBatch,
relay: Relay,
brotr: Brotr,
since: int,
until: int,
) -> tuple[int, int]
Validate and insert a batch of events into the database.
Each event is verified for signature validity and timestamp range before insertion. Invalid events are counted but not inserted.
Parameters:
-
batch(EventBatch) –EventBatch containing nostr-sdk Events.
-
relay(Relay) –Source Relay for attribution.
-
brotr(Brotr) –Brotr database interface.
-
since(int) –Lower timestamp bound (events must be >= this).
-
until(int) –Upper timestamp bound (events must be <= this).
Returns:
-
tuple[int, int]–Tuple of (events_inserted, events_invalid).
Note
Events are inserted via the event_relay_insert_cascade stored
procedure, which atomically inserts the event, relay, and
junction record. The batch is split into sub-batches of
brotr.config.batch.max_size for insertion.
Source code in src/bigbrotr/services/synchronizer/utils.py
sync_relay_events
async
¶
sync_relay_events(
relay: Relay,
start_time: int,
end_time: int,
ctx: SyncContext,
) -> tuple[int, int]
Core sync algorithm: connect to a relay, fetch events, and insert into the database.
Uses create_client to establish a WebSocket connection (with optional SOCKS5 proxy for overlay networks), fetches events matching the configured filter, and batch-inserts valid events.
Parameters:
-
relay(Relay) –Relay to sync from.
-
start_time(int) –Inclusive start timestamp (since).
-
end_time(int) –Inclusive end timestamp (until).
-
ctx(SyncContext) –SyncContext with filter, network, timeout, database, and key settings.
Returns:
-
tuple[int, int]–Tuple of (events_synced, invalid_events).