Skip to content

queries

queries

Finder-specific database queries.

Classes

Functions

fetch_cursors_to_find async

fetch_cursors_to_find(brotr: Brotr) -> list[FinderCursor]

Fetch all relays with their event-scanning cursor position.

Performs a single LEFT JOIN between the relay table and service_state (filtered on finder / cursor), returning a FinderCursor for every relay. Relays without a stored cursor get default values (timestamp=0, scan from beginning).

Results are ordered by (timestamp, id) ascending so that relays with the least scanning progress are processed first.

Parameters:

Returns:

Source code in src/bigbrotr/services/finder/queries.py
async def fetch_cursors_to_find(brotr: Brotr) -> list[FinderCursor]:
    """Load the event-scanning cursor of every relay in the database.

    A single ``LEFT JOIN`` pairs each row of the ``relay`` table with its
    ``service_state`` record (restricted to the ``finder`` service and the
    ``cursor`` state type) and yields a
    [FinderCursor][bigbrotr.services.common.types.FinderCursor] per relay.
    A relay with no stored cursor is returned with the default position
    (``timestamp=0``, i.e. scan from the beginning).

    Rows come back sorted by ``(timestamp, id)`` ascending, so the relays
    with the least scanning progress are handled first.

    Args:
        brotr: The [Brotr][bigbrotr.core.brotr.Brotr] database facade.

    Returns:
        List of
        [FinderCursor][bigbrotr.services.common.types.FinderCursor],
        one per relay in the database, ordered by ``(timestamp, id)``.
    """
    # Relays without a cursor sort as timestamp 0 with an all-zero 64-char id.
    query = """
        WITH cursors AS (
            SELECT state_key,
                   state_value,
                   (state_value->>'timestamp')::bigint AS ts,
                   state_value->>'id' AS cursor_id
            FROM service_state
            WHERE service_name = $1
              AND state_type = $2
        )
        SELECT r.url, c.state_value
        FROM relay r
        LEFT JOIN cursors c ON c.state_key = r.url
        ORDER BY COALESCE(c.ts, 0) ASC,
                 COALESCE(c.cursor_id, repeat('0', 64)) ASC
        """
    records = await brotr.fetch(query, ServiceName.FINDER, ServiceStateType.CURSOR)

    # A falsy state_value (no joined row, or an empty payload) means the
    # relay has no usable cursor yet: build the cursor from the URL alone.
    return [
        FinderCursor(key=record["url"], timestamp=int(state["timestamp"]), id=str(state["id"]))
        if (state := record["state_value"])
        else FinderCursor(key=record["url"])
        for record in records
    ]

scan_event_relay async

scan_event_relay(
    brotr: Brotr, cursor: FinderCursor, limit: int
) -> list[dict[str, Any]]

Scan event-relay rows for a specific relay, cursor-paginated.

Uses a composite cursor (timestamp, id) for deterministic pagination that handles ties in seen_at. When the cursor has timestamp=0 (default), scanning starts from the beginning.

Parameters:

Returns:

  • list[dict[str, Any]]

    List of dicts with all event columns plus seen_at from the event_relay junction.

Source code in src/bigbrotr/services/finder/queries.py
async def scan_event_relay(
    brotr: Brotr,
    cursor: FinderCursor,
    limit: int,
) -> list[dict[str, Any]]:
    """Fetch the next page of event-relay rows for one relay.

    Pagination is driven by the composite cursor ``(timestamp, id)``,
    which keeps the ordering deterministic even when several rows share
    the same ``seen_at``. A cursor with ``timestamp=0`` (the default)
    starts the scan from the beginning.

    Args:
        brotr: The [Brotr][bigbrotr.core.brotr.Brotr] database facade.
        cursor: [FinderCursor][bigbrotr.services.common.types.FinderCursor]
            with relay URL and pagination position.
        limit: Maximum rows per batch.

    Returns:
        List of dicts with all event columns plus ``seen_at`` from the
        ``event_relay`` junction.
    """
    # The row-wise comparison returns only rows strictly after the cursor;
    # the cursor id is passed as hex text and decoded for the comparison.
    query = """
        SELECT e.id AS event_id, e.pubkey, e.created_at, e.kind,
               e.tags, e.tagvalues, e.content, e.sig, er.seen_at
        FROM event e
        INNER JOIN event_relay er ON e.id = er.event_id
        WHERE er.relay_url = $1
          AND (er.seen_at, e.id) > ($2::bigint, decode($3, 'hex'))
        ORDER BY er.seen_at ASC, e.id ASC
        LIMIT $4
        """
    params = (cursor.key, cursor.timestamp, cursor.id, limit)
    page = await brotr.fetch(query, *params)
    return list(map(dict, page))

fetch_api_checkpoints async

fetch_api_checkpoints(
    brotr: Brotr, urls: list[str]
) -> list[ApiCheckpoint]

Fetch per-source API checkpoints, returning one per URL.

Returns an ApiCheckpoint for every URL in urls. URLs with a stored CHECKPOINT record get their persisted timestamp; URLs without a record get a default checkpoint with timestamp=0 (immediately eligible for refresh).

Records that cannot be parsed (missing or non-integer timestamp) are treated as missing and receive the default.

Parameters:

  • brotr (Brotr) –

    The Brotr database facade.

  • urls (list[str]) –

    Source URLs to fetch checkpoints for.

Returns:

Source code in src/bigbrotr/services/finder/queries.py
async def fetch_api_checkpoints(brotr: Brotr, urls: list[str]) -> list[ApiCheckpoint]:
    """Return one API checkpoint per requested source URL.

    Every URL in *urls* yields an
    [ApiCheckpoint][bigbrotr.services.common.types.ApiCheckpoint]. When a
    CHECKPOINT record is stored for the URL its persisted ``timestamp`` is
    used; otherwise a default checkpoint with ``timestamp=0`` (immediately
    eligible for refresh) is returned.

    Stored records that cannot be parsed (missing or non-integer
    ``timestamp``) are ignored, so those URLs receive the default as well.

    Args:
        brotr: The [Brotr][bigbrotr.core.brotr.Brotr] database facade.
        urls: Source URLs to fetch checkpoints for.

    Returns:
        List of [ApiCheckpoint][bigbrotr.services.common.types.ApiCheckpoint],
        one per input URL, in the same order.
    """
    # Nothing requested: skip the database round trip.
    if not urls:
        return []

    query = """
        SELECT state_key, state_value
        FROM service_state
        WHERE service_name = $1
          AND state_type = $2
          AND state_key = ANY($3::text[])
        """
    records = await brotr.fetch(query, ServiceName.FINDER, ServiceStateType.CHECKPOINT, urls)

    by_url: dict[str, ApiCheckpoint] = {}
    for record in records:
        try:
            source = record["state_key"]
            by_url[source] = ApiCheckpoint(
                key=source, timestamp=int(record["state_value"]["timestamp"])
            )
        except (KeyError, ValueError, TypeError):
            # Unparseable record: leave the URL out so it gets the default.
            continue

    # Walk the input list so the output keeps the caller's order.
    checkpoints = []
    for url in urls:
        checkpoints.append(by_url.get(url, ApiCheckpoint(key=url)))
    return checkpoints

upsert_api_checkpoints async

upsert_api_checkpoints(
    brotr: Brotr, checkpoints: list[ApiCheckpoint]
) -> None

Persist per-source API timestamps as CHECKPOINT records.

Parameters:

Source code in src/bigbrotr/services/finder/queries.py
async def upsert_api_checkpoints(brotr: Brotr, checkpoints: list[ApiCheckpoint]) -> None:
    """Store per-source API timestamps as FINDER CHECKPOINT records.

    Each checkpoint becomes one service-state record keyed by the
    checkpoint's ``key``, with ``{"timestamp": ...}`` as its value; all
    records are handed to ``upsert_service_states`` in a single call.

    Args:
        brotr: The [Brotr][bigbrotr.core.brotr.Brotr] database facade.
        checkpoints: [ApiCheckpoint][bigbrotr.services.common.types.ApiCheckpoint]
            instances to persist.
    """
    states = []
    for checkpoint in checkpoints:
        states.append(
            ServiceState(
                service_name=ServiceName.FINDER,
                state_type=ServiceStateType.CHECKPOINT,
                state_key=checkpoint.key,
                state_value={"timestamp": checkpoint.timestamp},
            )
        )
    await upsert_service_states(brotr, states)

upsert_finder_cursors async

upsert_finder_cursors(
    brotr: Brotr, cursors: Iterable[FinderCursor]
) -> None

Persist multiple scan cursor positions in a single batch upsert.

Cursors at default position (timestamp=0) are silently skipped.

Parameters:

Source code in src/bigbrotr/services/finder/queries.py
async def upsert_finder_cursors(brotr: Brotr, cursors: Iterable[FinderCursor]) -> None:
    """Store several scan cursor positions with one batch upsert.

    Only cursors whose ``timestamp`` is greater than zero are written;
    cursors still at the default position (timestamp=0) are silently
    skipped.

    Args:
        brotr: The [Brotr][bigbrotr.core.brotr.Brotr] database facade.
        cursors: Iterable of [FinderCursor][bigbrotr.services.common.types.FinderCursor]
            instances to persist.
    """
    pending = []
    for position in cursors:
        if position.timestamp > 0:
            payload = {
                "timestamp": position.timestamp,
                "id": position.id,
            }
            pending.append(
                ServiceState(
                    service_name=ServiceName.FINDER,
                    state_type=ServiceStateType.CURSOR,
                    state_key=position.key,
                    state_value=payload,
                )
            )
    await upsert_service_states(brotr, pending)

delete_stale_cursors async

delete_stale_cursors(brotr: Brotr) -> int

Delete CURSOR records whose relay no longer exists in the relay table.

Parameters:

Returns:

  • int

    Number of stale cursor records deleted.

Source code in src/bigbrotr/services/finder/queries.py
async def delete_stale_cursors(brotr: Brotr) -> int:
    """Remove FINDER CURSOR records whose relay is gone from the relay table.

    A cursor is stale when no ``relay`` row has a ``url`` equal to the
    record's ``state_key``.

    Args:
        brotr: The [Brotr][bigbrotr.core.brotr.Brotr] database facade.

    Returns:
        Number of stale cursor records deleted.
    """
    # The DELETE sits in a CTE so the deleted rows can be counted in SQL.
    query = """
        WITH deleted AS (
            DELETE FROM service_state
            WHERE service_name = $1
              AND state_type = $2
              AND NOT EXISTS (SELECT 1 FROM relay r WHERE r.url = state_key)
            RETURNING 1
        )
        SELECT count(*)::int FROM deleted
        """
    removed: int = await brotr.fetchval(query, ServiceName.FINDER, ServiceStateType.CURSOR)
    return removed

delete_stale_api_checkpoints async

delete_stale_api_checkpoints(
    brotr: Brotr, active_urls: list[str]
) -> int

Delete CHECKPOINT records for API sources no longer in the config.

Parameters:

  • brotr (Brotr) –

    The Brotr database facade.

  • active_urls (list[str]) –

    Currently configured source URLs to keep.

Returns:

  • int

    Number of stale checkpoint records deleted.

Source code in src/bigbrotr/services/finder/queries.py
async def delete_stale_api_checkpoints(brotr: Brotr, active_urls: list[str]) -> int:
    """Remove FINDER CHECKPOINT records for API sources dropped from the config.

    Every CHECKPOINT record whose ``state_key`` is not one of *active_urls*
    is deleted.

    Args:
        brotr: The [Brotr][bigbrotr.core.brotr.Brotr] database facade.
        active_urls: Currently configured source URLs to keep.

    Returns:
        Number of stale checkpoint records deleted.
    """
    # NOTE(review): with an empty ``active_urls`` the predicate appears to
    # match every FINDER checkpoint row -- confirm callers intend that.
    query = """
        WITH deleted AS (
            DELETE FROM service_state
            WHERE service_name = $1
              AND state_type = $2
              AND NOT (state_key = ANY($3::text[]))
            RETURNING 1
        )
        SELECT count(*)::int FROM deleted
        """
    removed: int = await brotr.fetchval(
        query,
        ServiceName.FINDER,
        ServiceStateType.CHECKPOINT,
        active_urls,
    )
    return removed