streaming

Data-driven event streaming algorithm with binary-split fallback.

Provides stream_events — the core windowing algorithm that streams Nostr events in ascending (created_at, id) order, ensuring completeness even when a relay truncates responses.

_fetch_validated is the single source of truth for event validation. stream_events orchestrates windowing, sorting, and domain conversion.

This module lives in bigbrotr.utils (not services) because it has no service-layer dependencies — only nostr_sdk and bigbrotr.models.

Functions

stream_events async

stream_events(
    client: Client,
    filters: list[Filter],
    start_time: int,
    end_time: int,
    limit: int,
    request_timeout: float,
    idle_timeout: float,
    max_event_size: int | None = None,
) -> AsyncIterator[Event]

Stream all events matching filters in [start_time, end_time], yielded as domain Event objects in ascending (created_at, id) order.

Uses a data-driven windowing algorithm for completeness: when a fetch returns limit events (possible truncation), a verification re-fetch at min_created_at determines whether all events have been captured. Falls back to binary-split windowing on inconsistent relay responses.
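The decision step can be sketched as follows. This is an illustrative stand-in for the module's verification logic (the helper name `needs_split` is hypothetical, and bare `created_at` timestamps stand in for full events): a window is trusted only when the response was under the limit, or when the verification re-fetch at the oldest seen timestamp surfaces nothing new.

```python
# Hypothetical sketch of the completeness check, not the module's actual
# _try_verify_completeness. Timestamps stand in for events.
def needs_split(first_pass: set[int], refetch: set[int], limit: int) -> bool:
    """Decide whether a window must be binary-split.

    first_pass: created_at values from the initial fetch.
    refetch:    values from the verification re-fetch at min(first_pass).
    """
    if len(first_pass) < limit:
        return False  # under the limit: the relay returned everything
    # At the limit, truncation is possible: split unless the re-fetch
    # agrees with what the first pass already captured.
    return not refetch <= first_pass
```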

Validation (filter matching, signature verification, deduplication) is handled by _fetch_validated. This function orchestrates windowing, then sorts and converts raw events to domain Event models at each yield boundary via _to_domain_events.

The idle_timeout is progress-based: the timer resets every time an event is yielded. A relay that produces events slowly but steadily will never be killed by it. A relay that connects but stops producing events is abandoned after idle_timeout seconds.
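The pattern can be reduced to a minimal sketch (illustrative generator and event source, not the real relay plumbing): the deadline is measured from the last yield rather than from stream start, so a slow-but-steady producer is never cut off.

```python
import time

def stream_with_idle_timeout(source, idle_timeout: float):
    """Yield items from source, abandoning it after idle_timeout
    seconds without progress. Each yield resets the timer."""
    last_yield = time.monotonic()
    for item in source:
        if time.monotonic() - last_yield > idle_timeout:
            return                      # source went quiet: abandon it
        yield item
        last_yield = time.monotonic()   # progress resets the timer
```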

Parameters:

  • client (Client) –

    Connected nostr-sdk Client with the target relay added.

  • filters (list[Filter]) –

    Pre-validated base Filter objects without since/until/limit (use SynchronizerConfig.filters).

  • start_time (int) –

    Inclusive lower timestamp bound (since).

  • end_time (int) –

    Inclusive upper timestamp bound (until).

  • limit (int) –

    Max events per relay request (REQ limit).

  • request_timeout (float) –

    Seconds to wait for each individual relay fetch.

  • idle_timeout (float) –

    Seconds without yielding an event before the stream is abandoned. The timer starts when the function enters its main loop and resets on every yield.

  • max_event_size (int | None) –

    Maximum accepted event size in bytes; larger events are dropped during conversion to domain models. None disables the check.

Yields:

  • AsyncIterator[Event]

    Domain Event objects in ascending (created_at, id) order.

Source code in src/bigbrotr/utils/streaming.py
async def stream_events(  # noqa: PLR0913
    client: Client,
    filters: list[Filter],
    start_time: int,
    end_time: int,
    limit: int,
    request_timeout: float,
    idle_timeout: float,
    max_event_size: int | None = None,
) -> AsyncIterator[Event]:
    """Stream all events matching ``filters`` in ``[start_time, end_time]``,
    yielded as domain ``Event`` objects in ascending ``(created_at, id)`` order.

    Uses a data-driven windowing algorithm for completeness: when a fetch
    returns ``limit`` events (possible truncation), a verification re-fetch
    at ``min_created_at`` determines whether all events have been captured.
    Falls back to binary-split windowing on inconsistent relay responses.

    Validation (filter matching, signature verification, deduplication) is
    handled by ``_fetch_validated``. This function orchestrates windowing, then
    sorts and converts raw events to domain ``Event`` models at each yield
    boundary via ``_to_domain_events``.

    The ``idle_timeout`` is progress-based: the timer resets every time an
    event is yielded.  A relay that produces events slowly but steadily will
    never be killed by it.  A relay that connects but stops producing events
    is abandoned after ``idle_timeout`` seconds.

    Args:
        client: Connected nostr-sdk ``Client`` with the target relay added.
        filters: Pre-validated base ``Filter`` objects **without**
            ``since``/``until``/``limit`` (use
            ``SynchronizerConfig.filters``).
        start_time: Inclusive lower timestamp bound (since).
        end_time: Inclusive upper timestamp bound (until).
        limit: Max events per relay request (REQ limit).
        request_timeout: Seconds to wait for each individual relay fetch.
        idle_timeout: Seconds without yielding an event before the stream
            is abandoned.  The timer starts when the function enters its
            main loop and resets on every yield.
        max_event_size: Maximum accepted event size in bytes; larger
            events are dropped during conversion to domain models.
            ``None`` disables the check.

    Yields:
        Domain ``Event`` objects in ascending ``(created_at, id)`` order.
    """
    if start_time > end_time:
        return

    ctx = _FetchContext(
        client=client,
        filters=filters,
        limit=limit,
        fetch_timeout=timedelta(seconds=request_timeout),
        max_event_size=max_event_size,
    )
    until_stack = [end_time]
    current_since = start_time
    last_yield = time.monotonic()

    while until_stack:
        if time.monotonic() - last_yield > idle_timeout:
            logger.debug("idle_timeout_exceeded elapsed=%.1f", time.monotonic() - last_yield)
            return

        current_until = until_stack[0]

        events = await _fetch_validated(ctx, current_since, current_until, limit)

        if not events:
            until_stack.pop(0)
            current_since = current_until + 1
            continue

        # Single-second window: cannot split further, yield everything.
        if current_since == current_until:
            for evt in _to_domain_events(events, ctx.max_event_size):
                yield evt
                last_yield = time.monotonic()
            until_stack.pop(0)
            current_since = current_until + 1
            continue

        # Multi-second window: always verify completeness. A relay may
        # enforce its own limit lower than ours, returning fewer events
        # even when more exist in the window.
        verified = await _try_verify_completeness(ctx, events, current_since)

        if verified is not None:
            for evt in _to_domain_events(verified, ctx.max_event_size):
                yield evt
                last_yield = time.monotonic()
            until_stack.pop(0)
            current_since = current_until + 1
        else:
            mid = current_since + (current_until - current_since) // 2
            until_stack.insert(0, mid)
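To make the window-stack mechanics concrete, here is an illustrative simulation (not the real module: `collect` and its fake relay are hypothetical, timestamps stand in for events, and the completeness check is simplified to "response under the limit"). A relay that caps every response forces the loop to binary-split windows until each sub-window fits, and the output still arrives complete and in order.

```python
def collect(events: list[int], start: int, end: int, limit: int) -> list[int]:
    """Simulate the window-stack loop against a truncating relay.

    events: sorted created_at timestamps standing in for stored events.
    """
    def fetch(since: int, until: int) -> list[int]:
        window = [t for t in events if since <= t <= until]
        return window[:limit]           # the relay truncates at its limit

    out: list[int] = []
    until_stack, since = [end], start
    while until_stack:
        until = until_stack[0]
        batch = fetch(since, until)
        if not batch or len(batch) < limit or since == until:
            out.extend(batch)           # window complete (or unsplittable)
            until_stack.pop(0)
            since = until + 1
        else:                           # possible truncation: split window
            until_stack.insert(0, since + (until - since) // 2)
    return out
```

With `limit=5` and twenty events spread over twenty seconds, every split bottoms out in a window small enough for the relay to return in full, so nothing is lost despite the truncation.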