
queries

Domain-specific database queries for BigBrotr services.

All SQL queries used by services are centralized here. Each function accepts a Brotr instance and returns typed results. Services import from this module instead of writing inline SQL.

The 15 query functions are grouped into five categories:

  • Relay queries: get_all_relay_urls, get_all_relays, filter_new_relay_urls, insert_relays
  • Check / monitoring queries: count_relays_due_for_check, fetch_relays_due_for_check
  • Event queries: fetch_event_tagvalues
  • Candidate lifecycle: insert_candidates, count_candidates, fetch_candidate_chunk, delete_stale_candidates, delete_exhausted_candidates, promote_candidates
  • Cursor queries: get_all_service_cursors, delete_orphan_cursors
Warning

All queries use the timeout from TimeoutsConfig (timeouts.query for reads, timeouts.batch for writes). The PostgreSQL statement_timeout acts as a server-side safety net.

See Also

Brotr: Database facade that provides fetch(), fetchrow(), fetchval(), execute(), and transaction() methods used by every query function.

ServiceState: Dataclass used for candidate and cursor records in service_state.

Functions

get_all_relay_urls async

get_all_relay_urls(brotr: Brotr) -> list[str]

Fetch all relay URLs from the database, ordered alphabetically.

Called by Finder at the start of event-scanning relay discovery to determine which relays to scan.

See Also

get_all_relays: Similar query that also returns network and discovered_at.

Source code in src/bigbrotr/services/common/queries.py
async def get_all_relay_urls(brotr: Brotr) -> list[str]:
    """Fetch all relay URLs from the database, ordered alphabetically.

    Called by [Finder][bigbrotr.services.finder.Finder] at the start of
    event-scanning relay discovery to determine which relays to scan.

    See Also:
        [get_all_relays][bigbrotr.services.common.queries.get_all_relays]:
            Similar query that also returns ``network`` and
            ``discovered_at``.
    """
    rows = await brotr.fetch("SELECT url FROM relay ORDER BY url")
    return [row["url"] for row in rows]
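Because every function here only calls methods on the Brotr instance it receives, it can be unit-tested without a database. A minimal sketch, assuming a hypothetical `FakeBrotr` stub whose `fetch()` ignores the SQL text and returns canned rows already in `ORDER BY url` order (plain dicts stand in for asyncpg records):

```python
import asyncio
from typing import Any


class FakeBrotr:
    """Hypothetical stand-in for Brotr: fetch() returns canned rows."""

    def __init__(self, rows: list[dict[str, Any]]) -> None:
        self.rows = rows
        self.queries: list[str] = []  # records every SQL string received

    async def fetch(self, query: str, *args: Any) -> list[dict[str, Any]]:
        self.queries.append(query)
        return self.rows


async def get_all_relay_urls(brotr: Any) -> list[str]:
    # Same body as the function above.
    rows = await brotr.fetch("SELECT url FROM relay ORDER BY url")
    return [row["url"] for row in rows]


fake = FakeBrotr([{"url": "wss://a.example"}, {"url": "wss://b.example"}])
urls = asyncio.run(get_all_relay_urls(fake))
print(urls)  # ['wss://a.example', 'wss://b.example']
```

Asserting on `fake.queries` also lets a test pin the exact SQL a function sends.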

get_all_relays async

get_all_relays(brotr: Brotr) -> list[dict[str, Any]]

Fetch all relays with their network and discovery timestamp.

Called by Synchronizer to build the list of relays for event collection.

Returns:

  • list[dict[str, Any]]

    List of dicts with keys: url, network, discovered_at.

See Also

get_all_relay_urls: Lightweight variant returning only URLs.

Source code in src/bigbrotr/services/common/queries.py
async def get_all_relays(brotr: Brotr) -> list[dict[str, Any]]:
    """Fetch all relays with their network and discovery timestamp.

    Called by [Synchronizer][bigbrotr.services.synchronizer.Synchronizer]
    to build the list of relays for event collection.

    Returns:
        List of dicts with keys: ``url``, ``network``, ``discovered_at``.

    See Also:
        [get_all_relay_urls][bigbrotr.services.common.queries.get_all_relay_urls]:
            Lightweight variant returning only URLs.
    """
    rows = await brotr.fetch(
        """
        SELECT url, network, discovered_at
        FROM relay
        ORDER BY discovered_at ASC
        """
    )
    return [dict(row) for row in rows]
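The returned dicts can be consumed directly by callers. As a hypothetical illustration (not Synchronizer's actual logic), this sketch buckets relay URLs by network while keeping the `discovered_at ASC` order. The sample rows and the integer `discovered_at` values are made up:

```python
from collections import defaultdict
from typing import Any


def group_by_network(relays: list[dict[str, Any]]) -> dict[str, list[str]]:
    """Bucket relay URLs by network, preserving the input (discovered_at) order."""
    groups: dict[str, list[str]] = defaultdict(list)
    for relay in relays:  # input is already ordered by discovered_at ASC
        groups[relay["network"]].append(relay["url"])
    return dict(groups)


rows = [
    {"url": "wss://a.example", "network": "clearnet", "discovered_at": 100},
    {"url": "ws://abc.onion", "network": "tor", "discovered_at": 150},
    {"url": "wss://b.example", "network": "clearnet", "discovered_at": 200},
]
by_network = group_by_network(rows)
```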

filter_new_relay_urls async

filter_new_relay_urls(
    brotr: Brotr, urls: list[str]
) -> list[str]

Filter URLs to those not already in relays or validator candidates.

Called by Seeder to avoid inserting duplicate seed URLs that already exist as relays or as pending validation candidates in service_state.

Parameters:

  • brotr (Brotr) –

    Brotr database interface.

  • urls (list[str]) –

    Candidate URLs to check.

Returns:

  • list[str]

    URLs that are genuinely new (not in relays, not in candidates).

See Also

insert_candidates: Uses this function internally to skip known URLs.

Source code in src/bigbrotr/services/common/queries.py
async def filter_new_relay_urls(
    brotr: Brotr,
    urls: list[str],
) -> list[str]:
    """Filter URLs to those not already in relays or validator candidates.

    Called by [Seeder][bigbrotr.services.seeder.Seeder] to avoid inserting
    duplicate seed URLs that already exist as relays or as pending
    validation candidates in ``service_state``.

    Args:
        brotr: [Brotr][bigbrotr.core.brotr.Brotr] database interface.
        urls: Candidate URLs to check.

    Returns:
        URLs that are genuinely new (not in relays, not in candidates).

    See Also:
        [insert_candidates][bigbrotr.services.common.queries.insert_candidates]:
            Uses this function internally to skip known URLs.
    """
    rows = await brotr.fetch(
        """
        SELECT u.url FROM unnest($1::text[]) AS u(url)
        -- Qualify u.url: an unqualified "url" inside a subquery binds to relay.url.
        WHERE NOT EXISTS (SELECT 1 FROM relay r WHERE r.url = u.url)
          AND NOT EXISTS (
              SELECT 1 FROM service_state ss
              WHERE ss.service_name = $2 AND ss.state_type = $3
                AND ss.state_key = u.url
          )
        """,
        urls,
        ServiceName.VALIDATOR,
        ServiceStateType.CANDIDATE,
    )
    return [row["url"] for row in rows]
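In PostgreSQL, and in SQLite, which is used here as a stand-in, an unqualified column inside a correlated subquery binds to the innermost FROM item that has a column of that name. Because `relay` has a `url` column, `r.url = url` compares `relay.url` with itself rather than with the outer value, so the outer alias must be qualified. A minimal sqlite3 demonstration; the table named `incoming` is a hypothetical substitute for `unnest($1::text[])`:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE relay (url TEXT PRIMARY KEY)")
conn.execute("INSERT INTO relay VALUES ('wss://known.example')")
conn.execute("CREATE TABLE incoming (url TEXT)")  # stands in for unnest($1::text[])
conn.executemany(
    "INSERT INTO incoming VALUES (?)",
    [("wss://known.example",), ("wss://new.example",)],
)

# Unqualified `url` in the subquery binds to relay.url: the test is r.url = r.url.
unqualified = conn.execute(
    "SELECT u.url FROM incoming AS u "
    "WHERE NOT EXISTS (SELECT 1 FROM relay r WHERE r.url = url)"
).fetchall()

# Qualifying the outer alias compares each incoming URL with relay.url.
qualified = conn.execute(
    "SELECT u.url FROM incoming AS u "
    "WHERE NOT EXISTS (SELECT 1 FROM relay r WHERE r.url = u.url)"
).fetchall()
```

With the unqualified form, a single existing relay is enough to filter out every input URL.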

insert_relays async

insert_relays(brotr: Brotr, relays: list[Relay]) -> int

Bulk-insert relays directly into the relay table.

Respects the configured batch size from BatchConfig, splitting large inputs into multiple insert_relay calls. Duplicates are silently skipped (ON CONFLICT DO NOTHING).

Called by Seeder when to_validate=False to bypass validation.

Parameters:

  • brotr (Brotr) –

    Brotr database interface.

  • relays (list[Relay]) –

    Relay objects to insert.

Returns:

  • int

    Number of relays actually inserted.

See Also

insert_candidates: Alternative that inserts as validation candidates instead.

Source code in src/bigbrotr/services/common/queries.py
async def insert_relays(brotr: Brotr, relays: list[Relay]) -> int:
    """Bulk-insert relays directly into the ``relay`` table.

    Respects the configured batch size from
    [BatchConfig][bigbrotr.core.brotr.BatchConfig], splitting large inputs
    into multiple ``insert_relay`` calls. Duplicates are silently skipped
    (``ON CONFLICT DO NOTHING``).

    Called by [Seeder][bigbrotr.services.seeder.Seeder] when
    ``to_validate=False`` to bypass validation.

    Args:
        brotr: [Brotr][bigbrotr.core.brotr.Brotr] database interface.
        relays: [Relay][bigbrotr.models.relay.Relay] objects to insert.

    Returns:
        Number of relays actually inserted.

    See Also:
        [insert_candidates][bigbrotr.services.common.queries.insert_candidates]:
            Alternative that inserts as validation candidates instead.
    """
    if not relays:
        return 0

    inserted = 0
    batch_size = brotr.config.batch.max_size
    for i in range(0, len(relays), batch_size):
        inserted += await brotr.insert_relay(relays[i : i + batch_size])
    return inserted
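The batching loop above is plain offset slicing. Factored into a hypothetical generic helper, it looks like this, with the last batch holding the remainder:

```python
from collections.abc import Iterator, Sequence
from typing import TypeVar

T = TypeVar("T")


def batches(items: Sequence[T], batch_size: int) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of at most batch_size items."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for i in range(0, len(items), batch_size):
        yield items[i : i + batch_size]


sizes = [len(b) for b in batches(list(range(2500)), 1000)]
```

A non-positive batch size would make `range()` raise (or yield nothing), so the helper rejects it explicitly.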

count_relays_due_for_check async

count_relays_due_for_check(
    brotr: Brotr,
    service_name: ServiceName,
    threshold: int,
    networks: list[str],
) -> int

Count relays needing health checks.

Called by Monitor at the start of each cycle to populate ChunkProgress.total.

Parameters:

  • brotr (Brotr) –

    Brotr database interface.

  • service_name (ServiceName) –

    Service requesting checks (e.g., ServiceName.MONITOR).

  • threshold (int) –

    Unix timestamp cutoff -- relays last checked before this are considered due.

  • networks (list[str]) –

    Network type strings to include.

Returns:

  • int

    Number of relays due for a check.

See Also

fetch_relays_due_for_check: Companion function that fetches the actual relay rows.

Source code in src/bigbrotr/services/common/queries.py
async def count_relays_due_for_check(
    brotr: Brotr,
    service_name: ServiceName,
    threshold: int,
    networks: list[str],
) -> int:
    """Count relays needing health checks.

    Called by [Monitor][bigbrotr.services.monitor.Monitor] at the start of
    each cycle to populate ``ChunkProgress.total``.

    Args:
        brotr: [Brotr][bigbrotr.core.brotr.Brotr] database interface.
        service_name: Service requesting checks (e.g.,
            ``ServiceName.MONITOR``).
        threshold: Unix timestamp cutoff -- relays last checked before this
            are considered due.
        networks: Network type strings to include.

    Returns:
        Number of relays due for a check.

    See Also:
        [fetch_relays_due_for_check][bigbrotr.services.common.queries.fetch_relays_due_for_check]:
            Companion function that fetches the actual relay rows.
    """
    row = await brotr.fetchrow(
        f"SELECT COUNT(*)::int AS count {_RELAYS_DUE_FOR_CHECK_BASE}",
        service_name,
        ServiceStateType.CHECKPOINT,
        networks,
        threshold,
    )
    return row["count"] if row else 0
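The shared `_RELAYS_DUE_FOR_CHECK_BASE` SQL is not shown on this page, so the sketch below is an assumed model of the "due" rule, not the actual query. It treats a relay as due when its network is allowed and it has no checkpoint or was last checked before `threshold`. Treating a missing checkpoint as due is consistent with the `COALESCE(..., 0)` ordering in `fetch_relays_due_for_check`. The `compute_threshold` helper and its `interval_seconds` parameter are hypothetical:

```python
import time
from typing import Any, Optional


def compute_threshold(interval_seconds: int, now: Optional[int] = None) -> int:
    """Hypothetical helper: relays last checked before the returned time are due."""
    now = int(time.time()) if now is None else now
    return now - interval_seconds


def is_due(
    relay: dict[str, Any],
    checkpoints: dict[str, Optional[int]],
    threshold: int,
    networks: list[str],
) -> bool:
    """Assumed predicate: allowed network, and never checked or checked too long ago."""
    if relay["network"] not in networks:
        return False
    last_check_at = checkpoints.get(relay["url"])  # None: no checkpoint yet
    return last_check_at is None or last_check_at < threshold


relays = [
    {"url": "wss://fresh.example", "network": "clearnet"},
    {"url": "wss://stale.example", "network": "clearnet"},
    {"url": "wss://new.example", "network": "clearnet"},
    {"url": "ws://hidden.onion", "network": "tor"},
]
checkpoints = {"wss://fresh.example": 7_000, "wss://stale.example": 5_000}
threshold = compute_threshold(3_600, now=10_000)  # 6_400
due = [r["url"] for r in relays if is_due(r, checkpoints, threshold, ["clearnet"])]
```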

fetch_relays_due_for_check async

fetch_relays_due_for_check(
    brotr: Brotr,
    service_name: ServiceName,
    threshold: int,
    networks: list[str],
    limit: int,
) -> list[dict[str, Any]]

Fetch relays due for health checks, ordered by least-recently-checked.

Called by Monitor during chunk-based processing to retrieve the next batch of relays needing health checks. Ordering ensures that relays with the oldest (or missing) checkpoint are checked first.

Parameters:

  • brotr (Brotr) –

    Brotr database interface.

  • service_name (ServiceName) –

    Service requesting checks (e.g., ServiceName.MONITOR).

  • threshold (int) –

    Unix timestamp cutoff -- relays last checked before this are considered due.

  • networks (list[str]) –

    Network type strings to include.

  • limit (int) –

    Maximum relays to return.

Returns:

  • list[dict[str, Any]]

    List of dicts with keys: url, network, discovered_at.

See Also

count_relays_due_for_check: Companion count query sharing the same base SQL.

Source code in src/bigbrotr/services/common/queries.py
async def fetch_relays_due_for_check(
    brotr: Brotr,
    service_name: ServiceName,
    threshold: int,
    networks: list[str],
    limit: int,
) -> list[dict[str, Any]]:
    """Fetch relays due for health checks, ordered by least-recently-checked.

    Called by [Monitor][bigbrotr.services.monitor.Monitor] during chunk-based
    processing to retrieve the next batch of relays needing health checks.
    Ordering ensures that relays with the oldest (or missing) checkpoint
    are checked first.

    Args:
        brotr: [Brotr][bigbrotr.core.brotr.Brotr] database interface.
        service_name: Service requesting checks (e.g.,
            ``ServiceName.MONITOR``).
        threshold: Unix timestamp cutoff -- relays last checked before this
            are considered due.
        networks: Network type strings to include.
        limit: Maximum relays to return.

    Returns:
        List of dicts with keys: ``url``, ``network``, ``discovered_at``.

    See Also:
        [count_relays_due_for_check][bigbrotr.services.common.queries.count_relays_due_for_check]:
            Companion count query sharing the same base SQL.
    """
    rows = await brotr.fetch(
        f"""
        SELECT r.url, r.network, r.discovered_at
        {_RELAYS_DUE_FOR_CHECK_BASE}
        ORDER BY
            COALESCE((ss.state_value->>'last_check_at')::BIGINT, 0) ASC,
            r.discovered_at ASC
        LIMIT $5
        """,
        service_name,
        ServiceStateType.CHECKPOINT,
        networks,
        threshold,
        limit,
    )
    return [dict(row) for row in rows]
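The `ORDER BY ... LIMIT` clause can be mirrored in Python to see which relays land in a chunk. A relay with no checkpoint sorts as if checked at time 0, and ties fall back to the earliest `discovered_at`. The sample relays and checkpoint values are hypothetical, and the rows are assumed to be already filtered to the due set:

```python
from typing import Any, Optional


def order_due(
    relays: list[dict[str, Any]],
    checkpoints: dict[str, Optional[int]],
    limit: int,
) -> list[dict[str, Any]]:
    """Least-recently-checked first, then oldest discovery, truncated to limit."""

    def sort_key(relay: dict[str, Any]) -> tuple[int, int]:
        # Mirrors COALESCE(last_check_at, 0): a missing checkpoint sorts first.
        last_check_at = checkpoints.get(relay["url"]) or 0
        return (last_check_at, relay["discovered_at"])

    return sorted(relays, key=sort_key)[:limit]


due_relays = [
    {"url": "wss://x.example", "network": "clearnet", "discovered_at": 300},
    {"url": "wss://y.example", "network": "clearnet", "discovered_at": 100},
    {"url": "wss://z.example", "network": "clearnet", "discovered_at": 200},
]
checkpoints = {"wss://y.example": 50}
batch = [r["url"] for r in order_due(due_relays, checkpoints, limit=2)]
```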

fetch_event_tagvalues async

fetch_event_tagvalues(
    brotr: Brotr,
    relay_url: str,
    last_seen_at: int,
    limit: int,
) -> list[dict[str, Any]]

Fetch event tagvalues from a specific relay, cursor-paginated.

Retrieves up to limit events seen on the relay after the cursor position, oldest seen_at first. The caller extracts relay URLs by parsing each tagvalue via parse_relay_url.

Called by Finder during per-relay event scanning.

Parameters:

  • brotr (Brotr) –

    Brotr database interface.

  • relay_url (str) –

    Source relay to scan events from.

  • last_seen_at (int) –

    Cursor position -- only events seen after this.

  • limit (int) –

    Maximum events per batch.

Returns:

  • list[dict[str, Any]]

    List of dicts with keys: tagvalues (list[str]), seen_at (int).

See Also

get_all_service_cursors: Batch-fetches the per-relay cursor values used as last_seen_at.

Source code in src/bigbrotr/services/common/queries.py
async def fetch_event_tagvalues(
    brotr: Brotr,
    relay_url: str,
    last_seen_at: int,
    limit: int,
) -> list[dict[str, Any]]:
    """Fetch event tagvalues from a specific relay, cursor-paginated.

    Retrieves up to ``limit`` events seen on the relay after the cursor
    position, oldest ``seen_at`` first. The caller extracts relay URLs by
    parsing each tagvalue via ``parse_relay_url``.

    Called by [Finder][bigbrotr.services.finder.Finder] during per-relay
    event scanning.

    Args:
        brotr: [Brotr][bigbrotr.core.brotr.Brotr] database interface.
        relay_url: Source relay to scan events from.
        last_seen_at: Cursor position -- only events seen after this.
        limit: Maximum events per batch.

    Returns:
        List of dicts with keys: ``tagvalues`` (``list[str]``),
        ``seen_at`` (``int``).

    See Also:
        [get_all_service_cursors][bigbrotr.services.common.queries.get_all_service_cursors]:
            Batch-fetches the per-relay cursor values used as
            ``last_seen_at``.
    """
    rows = await brotr.fetch(
        """
        SELECT e.tagvalues, er.seen_at
        FROM event e
        INNER JOIN event_relay er ON e.id = er.event_id
        WHERE er.relay_url = $1
          AND er.seen_at > $2
        ORDER BY er.seen_at ASC
        LIMIT $3
        """,
        relay_url,
        last_seen_at,
        limit,
    )
    return [dict(row) for row in rows]
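A caller typically pages through a relay's events by feeding the last returned `seen_at` back in as `last_seen_at` until a short page signals the end. The sketch below models that loop in memory. `fetch_page` and `scan_all` are hypothetical. The second example shows a limitation of a strict `>` cursor on a column that may not be unique: if a page boundary falls inside a run of equal `seen_at` values, the rest of that run is skipped. Whether such ties occur depends on how `seen_at` is populated, which this page does not say.

```python
from typing import Any


def fetch_page(events: list[dict[str, Any]], last_seen_at: int, limit: int) -> list[dict[str, Any]]:
    """In-memory stand-in for: WHERE seen_at > $2 ORDER BY seen_at ASC LIMIT $3."""
    newer = [e for e in events if e["seen_at"] > last_seen_at]
    newer.sort(key=lambda e: e["seen_at"])
    return newer[:limit]


def scan_all(events: list[dict[str, Any]], start: int, limit: int) -> tuple[list[dict[str, Any]], int]:
    """Page until a short page signals the end; return the rows and the final cursor."""
    if limit <= 0:
        raise ValueError("limit must be positive")  # otherwise the loop never ends
    cursor, collected = start, []
    while True:
        page = fetch_page(events, cursor, limit)
        collected.extend(page)
        if page:
            cursor = page[-1]["seen_at"]  # advance to the last seen_at returned
        if len(page) < limit:
            return collected, cursor


unique = [{"tagvalues": [f"wss://r{i}.example"], "seen_at": i} for i in range(1, 8)]
rows, final_cursor = scan_all(unique, start=0, limit=3)  # pages of 3, 3 and 1 rows

tied = [{"tagvalues": [], "seen_at": s} for s in (1, 2, 2, 3)]
tied_rows, _ = scan_all(tied, start=0, limit=2)  # the second seen_at=2 row is skipped
```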

insert_candidates async

insert_candidates(
    brotr: Brotr, relays: Iterable[Relay]
) -> int

Insert new validation candidates, skipping known relays and duplicates.

Filters out URLs that already exist in the relay table or as pending candidates in service_state, then persists only genuinely new records. Existing candidates retain their current state (e.g. failures counter is never reset).

Called by Seeder and Finder to register newly discovered relay URLs for validation.

Parameters:

  • brotr (Brotr) –

    Brotr database interface.

  • relays (Iterable[Relay]) –

    Relay objects to register as candidates.

Returns:

  • int

    Number of candidate records actually inserted.

See Also

promote_candidates: Moves validated candidates from service_state to the relay table.

fetch_candidate_chunk: Retrieves candidates for validation processing.

Source code in src/bigbrotr/services/common/queries.py
async def insert_candidates(brotr: Brotr, relays: Iterable[Relay]) -> int:
    """Insert new validation candidates, skipping known relays and duplicates.

    Filters out URLs that already exist in the ``relay`` table or as
    pending candidates in ``service_state``, then persists only genuinely
    new records. Existing candidates retain their current state (e.g.
    ``failures`` counter is never reset).

    Called by [Seeder][bigbrotr.services.seeder.Seeder] and
    [Finder][bigbrotr.services.finder.Finder] to register newly
    discovered relay URLs for validation.

    Args:
        brotr: [Brotr][bigbrotr.core.brotr.Brotr] database interface.
        relays: [Relay][bigbrotr.models.relay.Relay] objects to register
            as candidates.

    Returns:
        Number of candidate records actually inserted.

    See Also:
        [promote_candidates][bigbrotr.services.common.queries.promote_candidates]:
            Moves validated candidates from ``service_state`` to the
            ``relay`` table.
        [fetch_candidate_chunk][bigbrotr.services.common.queries.fetch_candidate_chunk]:
            Retrieves candidates for validation processing.
    """
    relay_list = list(relays)
    urls = [r.url for r in relay_list]
    if not urls:
        return 0

    new_urls = set(await filter_new_relay_urls(brotr, urls))
    if not new_urls:
        return 0

    now = int(time.time())
    records: list[ServiceState] = []
    for relay in relay_list:
        if relay.url not in new_urls:
            continue
        # Drop the URL after its first use so repeated input URLs yield one record.
        new_urls.discard(relay.url)
        records.append(
            ServiceState(
                service_name=ServiceName.VALIDATOR,
                state_type=ServiceStateType.CANDIDATE,
                state_key=relay.url,
                state_value={"failures": 0, "network": relay.network.value, "inserted_at": now},
                updated_at=now,
            )
        )
    batch_size = brotr.config.batch.max_size
    for i in range(0, len(records), batch_size):
        await brotr.upsert_service_state(records[i : i + batch_size])
    return len(records)
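Each new candidate is stored under its URL as `state_key`, with a small JSON `state_value`. The sketch builds that payload for a batch. Plain `(url, network)` tuples stand in for Relay objects and the timestamp is fixed for reproducibility. Keying the result by URL also guarantees one payload per URL when the input repeats a URL. `candidate_values` is a hypothetical helper:

```python
from typing import Any


def candidate_values(
    relays: list[tuple[str, str]], new_urls: set[str], now: int
) -> dict[str, dict[str, Any]]:
    """Build one state_value per genuinely new URL, keyed by URL (the state_key)."""
    values: dict[str, dict[str, Any]] = {}
    for url, network in relays:
        if url in new_urls and url not in values:  # first occurrence wins
            values[url] = {"failures": 0, "network": network, "inserted_at": now}
    return values


relays = [
    ("wss://a.example", "clearnet"),
    ("wss://a.example", "clearnet"),  # repeated in the input
    ("ws://b.onion", "tor"),
    ("wss://known.example", "clearnet"),  # already a relay, so not in new_urls
]
values = candidate_values(relays, {"wss://a.example", "ws://b.onion"}, now=1_700_000_000)
```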

count_candidates async

count_candidates(brotr: Brotr, networks: list[str]) -> int

Count pending validation candidates for the given networks.

Called by Validator at the start of each cycle to populate ChunkProgress.total.

Parameters:

  • brotr (Brotr) –

    Brotr database interface.

  • networks (list[str]) –

    Network type strings (e.g. ['clearnet', 'tor']).

Returns:

  • int

    Total count of matching candidates.

See Also

fetch_candidate_chunk: Fetches the actual candidate rows for processing.

Source code in src/bigbrotr/services/common/queries.py
async def count_candidates(
    brotr: Brotr,
    networks: list[str],
) -> int:
    """Count pending validation candidates for the given networks.

    Called by [Validator][bigbrotr.services.validator.Validator] at the
    start of each cycle to populate ``ChunkProgress.total``.

    Args:
        brotr: [Brotr][bigbrotr.core.brotr.Brotr] database interface.
        networks: Network type strings (e.g. ``['clearnet', 'tor']``).

    Returns:
        Total count of matching candidates.

    See Also:
        [fetch_candidate_chunk][bigbrotr.services.common.queries.fetch_candidate_chunk]:
            Fetches the actual candidate rows for processing.
    """
    row = await brotr.fetchrow(
        """
        SELECT COUNT(*)::int AS count
        FROM service_state
        WHERE service_name = $1
          AND state_type = $2
          AND state_value->>'network' = ANY($3)
        """,
        ServiceName.VALIDATOR,
        ServiceStateType.CANDIDATE,
        networks,
    )
    return row["count"] if row else 0

fetch_candidate_chunk async

fetch_candidate_chunk(
    brotr: Brotr,
    networks: list[str],
    before_timestamp: int,
    limit: int,
) -> list[dict[str, Any]]

Fetch candidates prioritized by fewest failures, then oldest.

Only returns candidates updated before before_timestamp to avoid reprocessing within the same cycle.

Called by Validator during chunk-based processing. The ordering ensures candidates most likely to succeed (fewest prior failures) are validated first.

Parameters:

  • brotr (Brotr) –

    Brotr database interface.

  • networks (list[str]) –

    Network type strings to include.

  • before_timestamp (int) –

    Only candidates whose updated_at is strictly earlier than this are returned.

  • limit (int) –

    Maximum candidates to return.

Returns:

  • list[dict[str, Any]]

    List of dicts with keys: state_key, state_value.

See Also

count_candidates: Companion count query.

promote_candidates: Called after successful validation to move candidates to the relay table.

Source code in src/bigbrotr/services/common/queries.py
async def fetch_candidate_chunk(
    brotr: Brotr,
    networks: list[str],
    before_timestamp: int,
    limit: int,
) -> list[dict[str, Any]]:
    """Fetch candidates prioritized by fewest failures, then oldest.

    Only returns candidates updated before ``before_timestamp`` to avoid
    reprocessing within the same cycle.

    Called by [Validator][bigbrotr.services.validator.Validator] during
    chunk-based processing. The ordering ensures candidates most likely
    to succeed (fewest prior failures) are validated first.

    Args:
        brotr: [Brotr][bigbrotr.core.brotr.Brotr] database interface.
        networks: Network type strings to include.
        before_timestamp: Only candidates whose ``updated_at`` is strictly
            earlier than this are returned.
        limit: Maximum candidates to return.

    Returns:
        List of dicts with keys: ``state_key``, ``state_value``.

    See Also:
        [count_candidates][bigbrotr.services.common.queries.count_candidates]:
            Companion count query.
        [promote_candidates][bigbrotr.services.common.queries.promote_candidates]:
            Called after successful validation to move candidates to
            the ``relay`` table.
    """
    rows = await brotr.fetch(
        """
        SELECT state_key, state_value
        FROM service_state
        WHERE service_name = $1
          AND state_type = $2
          AND state_value->>'network' = ANY($3)
          AND updated_at < $4
        ORDER BY COALESCE((state_value->>'failures')::int, 0) ASC,
                 updated_at ASC
        LIMIT $5
        """,
        ServiceName.VALIDATOR,
        ServiceStateType.CANDIDATE,
        networks,
        before_timestamp,
        limit,
    )
    return [dict(row) for row in rows]
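The `before_timestamp` filter is what lets a validation cycle terminate. If it is set to the cycle start, every candidate touched during the cycle gets a newer `updated_at` and drops out of later chunks. The in-memory sketch models this. The `run_cycle` loop, the short keys and the "validation always fails" step are hypothetical simplifications:

```python
from typing import Any


def fetch_chunk(
    candidates: dict[str, dict[str, Any]], networks: list[str], before: int, limit: int
) -> list[str]:
    """Mirror of the WHERE / ORDER BY / LIMIT clauses over an in-memory dict."""
    eligible = [
        key for key, c in candidates.items()
        if c["network"] in networks and c["updated_at"] < before
    ]
    eligible.sort(key=lambda k: (candidates[k].get("failures") or 0, candidates[k]["updated_at"]))
    return eligible[:limit]


def run_cycle(
    candidates: dict[str, dict[str, Any]], networks: list[str], cycle_start: int, limit: int
) -> list[str]:
    processed: list[str] = []
    while chunk := fetch_chunk(candidates, networks, cycle_start, limit):
        for key in chunk:
            processed.append(key)
            candidates[key]["failures"] += 1  # pretend validation failed
            candidates[key]["updated_at"] = cycle_start  # touched now: excluded this cycle
    return processed


candidates = {
    "a": {"network": "clearnet", "failures": 0, "updated_at": 10},
    "b": {"network": "clearnet", "failures": 2, "updated_at": 5},
    "c": {"network": "clearnet", "failures": 0, "updated_at": 20},
    "d": {"network": "clearnet", "failures": 1, "updated_at": 1},
    "e": {"network": "tor", "failures": 0, "updated_at": 30},
}
order = run_cycle(candidates, ["clearnet"], cycle_start=100, limit=2)
```

Each clearnet candidate is processed exactly once, fewest failures first, and the loop stops once every remaining row has `updated_at >= cycle_start`.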

delete_stale_candidates async

delete_stale_candidates(brotr: Brotr) -> int

Remove candidates whose URLs already exist in the relay table.

Called by Validator during cleanup at the start of each cycle. Stale candidates appear when a relay was validated by another cycle, manually added, or re-discovered by Finder.

Returns:

  • int

    Number of deleted rows.

See Also

delete_exhausted_candidates: Companion cleanup that removes permanently failing candidates.

Source code in src/bigbrotr/services/common/queries.py
async def delete_stale_candidates(brotr: Brotr) -> int:
    """Remove candidates whose URLs already exist in the ``relay`` table.

    Called by [Validator][bigbrotr.services.validator.Validator] during
    cleanup at the start of each cycle. Stale candidates appear when a
    relay was validated by another cycle, manually added, or re-discovered
    by [Finder][bigbrotr.services.finder.Finder].

    Returns:
        Number of deleted rows.

    See Also:
        [delete_exhausted_candidates][bigbrotr.services.common.queries.delete_exhausted_candidates]:
            Companion cleanup that removes permanently failing candidates.
    """
    result = await brotr.execute(
        """
        DELETE FROM service_state
        WHERE service_name = $1
          AND state_type = $2
          AND EXISTS (SELECT 1 FROM relay r WHERE r.url = state_key)
        """,
        ServiceName.VALIDATOR,
        ServiceStateType.CANDIDATE,
    )
    return parse_delete_result(result)
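asyncpg's `Connection.execute()` returns the command status tag, such as `"DELETE 3"`. The implementation of `parse_delete_result` is not shown here, and it is an assumption that `brotr.execute()` passes that tag through. Under that assumption, a hypothetical equivalent parser looks like this:

```python
def parse_delete_status(status: str) -> int:
    """Return the row count from a PostgreSQL 'DELETE <n>' command tag."""
    verb, _, count = status.partition(" ")
    if verb != "DELETE" or not count.isdigit():
        raise ValueError(f"unexpected command status: {status!r}")
    return int(count)
```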

delete_exhausted_candidates async

delete_exhausted_candidates(
    brotr: Brotr, max_failures: int
) -> int

Remove candidates whose failure count has reached the failure threshold.

Called by Validator during cleanup when cleanup.enabled is True. Prevents permanently broken relays from consuming validation resources indefinitely.

Parameters:

  • brotr (Brotr) –

    Brotr database interface.

  • max_failures (int) –

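    Failure count at which a candidate is deleted (from cleanup.max_failures in ValidatorConfig). Candidates with failures >= max_failures are removed; a missing failures value counts as 0.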

Returns:

  • int

    Number of deleted rows.

See Also

delete_stale_candidates: Companion cleanup that removes already-promoted candidates.

Source code in src/bigbrotr/services/common/queries.py
async def delete_exhausted_candidates(
    brotr: Brotr,
    max_failures: int,
) -> int:
    """Remove candidates whose failure count has reached the threshold.

    Called by [Validator][bigbrotr.services.validator.Validator] during
    cleanup when ``cleanup.enabled`` is ``True``. Prevents permanently
    broken relays from consuming validation resources indefinitely.

    Args:
        brotr: [Brotr][bigbrotr.core.brotr.Brotr] database interface.
        max_failures: Failure count at which a candidate is deleted (from
            ``cleanup.max_failures`` in
            [ValidatorConfig][bigbrotr.services.validator.ValidatorConfig]);
            rows with ``failures >= max_failures`` are removed.

    Returns:
        Number of deleted rows.

    See Also:
        [delete_stale_candidates][bigbrotr.services.common.queries.delete_stale_candidates]:
            Companion cleanup that removes already-promoted candidates.
    """
    result = await brotr.execute(
        """
        DELETE FROM service_state
        WHERE service_name = $1
          AND state_type = $2
          AND COALESCE((state_value->>'failures')::int, 0) >= $3
        """,
        ServiceName.VALIDATOR,
        ServiceStateType.CANDIDATE,
        max_failures,
    )
    return parse_delete_result(result)
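The deletion predicate uses `>=`, so a candidate is removed as soon as its count reaches `max_failures`, and a missing `failures` key counts as 0. The consequence is that `max_failures=0` would delete every candidate. A Python mirror of the predicate, with a hypothetical `is_exhausted` helper and sample values:

```python
from typing import Any


def is_exhausted(state_value: dict[str, Any], max_failures: int) -> bool:
    """Python mirror of COALESCE((state_value->>'failures')::int, 0) >= max_failures."""
    failures = state_value.get("failures")
    return (int(failures) if failures is not None else 0) >= max_failures


samples = [{"failures": 3}, {"failures": 2}, {}]
flags_3 = [is_exhausted(s, 3) for s in samples]
flags_0 = [is_exhausted(s, 0) for s in samples]
```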

promote_candidates async

promote_candidates(
    brotr: Brotr, relays: list[Relay]
) -> int

Atomically insert relays and remove their candidate records.

Runs both operations in a single Brotr.transaction() to prevent orphaned candidates if the process crashes mid-promotion.

Called by Validator after successful WebSocket validation to move candidates into the relay table.

Parameters:

  • brotr (Brotr) –

    Brotr database interface.

  • relays (list[Relay]) –

    Validated Relay objects to promote from candidates to the relay table.

Returns:

  • int

    Number of relays inserted (duplicates skipped via ON CONFLICT).

Note

Uses the relay_insert stored procedure for bulk insertion. The transaction ensures atomicity: if the insert succeeds but the delete fails, neither operation is committed.

Because brotr.transaction() yields a raw asyncpg connection that bypasses Brotr's timeout facade, each statement passes config.timeouts.batch explicitly as its timeout.

See Also

insert_candidates: The inverse operation that creates candidate records.

Source code in src/bigbrotr/services/common/queries.py
async def promote_candidates(brotr: Brotr, relays: list[Relay]) -> int:
    """Atomically insert relays and remove their candidate records.

    Runs both operations in a single
    [Brotr.transaction()][bigbrotr.core.brotr.Brotr.transaction] to
    prevent orphaned candidates if the process crashes mid-promotion.

    Called by [Validator][bigbrotr.services.validator.Validator] after
    successful WebSocket validation to move candidates into the ``relay``
    table.

    Args:
        brotr: [Brotr][bigbrotr.core.brotr.Brotr] database interface.
        relays: Validated [Relay][bigbrotr.models.relay.Relay] objects to
            promote from candidates to the ``relay`` table.

    Returns:
        Number of relays inserted (duplicates skipped via ``ON CONFLICT``).

    Note:
        Uses the ``relay_insert`` stored procedure for bulk insertion.
        The transaction ensures atomicity: if the insert succeeds but
        the delete fails, neither operation is committed.

        Because ``brotr.transaction()`` yields a raw asyncpg connection
        that bypasses Brotr's timeout facade, each statement passes
        ``config.timeouts.batch`` explicitly as its ``timeout``.

    See Also:
        [insert_candidates][bigbrotr.services.common.queries.insert_candidates]:
            The inverse operation that creates candidate records.
    """
    if not relays:
        return 0

    params = [relay.to_db_params() for relay in relays]
    urls = [p.url for p in params]
    networks = [p.network for p in params]
    discovered_ats = [p.discovered_at for p in params]
    t = brotr.config.timeouts.batch

    async with brotr.transaction() as conn:
        inserted = (
            await conn.fetchval(
                "SELECT relay_insert($1, $2, $3)",
                urls,
                networks,
                discovered_ats,
                timeout=t,
            )
            or 0
        )
        await conn.execute(
            """
            DELETE FROM service_state
            WHERE service_name = $1
              AND state_type = $2
              AND state_key = ANY($3::text[])
            """,
            ServiceName.VALIDATOR,
            ServiceStateType.CANDIDATE,
            urls,
            timeout=t,
        )

    return inserted
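The atomicity guarantee can be seen with any transactional database. The sketch swaps asyncpg for the standard-library `sqlite3` module. The tables are reduced to one column, `INSERT OR IGNORE` plays the role of `ON CONFLICT DO NOTHING`, and the connection's `with` block stands in for `async with brotr.transaction()`. A simulated crash between the insert and the delete leaves both tables untouched:

```python
import sqlite3


def setup() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE relay (url TEXT PRIMARY KEY)")
    conn.execute("CREATE TABLE service_state (state_key TEXT PRIMARY KEY)")
    conn.executemany("INSERT INTO service_state VALUES (?)", [("wss://a",), ("wss://b",)])
    conn.commit()
    return conn


def promote(conn: sqlite3.Connection, urls: list[str], fail_after_insert: bool = False) -> None:
    rows = [(u,) for u in urls]
    with conn:  # commits if the block succeeds, rolls back if it raises
        conn.executemany("INSERT OR IGNORE INTO relay VALUES (?)", rows)
        if fail_after_insert:
            raise RuntimeError("simulated crash between insert and delete")
        conn.executemany("DELETE FROM service_state WHERE state_key = ?", rows)


def counts(conn: sqlite3.Connection) -> tuple[int, int]:
    relays = conn.execute("SELECT COUNT(*) FROM relay").fetchone()[0]
    candidates = conn.execute("SELECT COUNT(*) FROM service_state").fetchone()[0]
    return relays, candidates


conn = setup()
try:
    promote(conn, ["wss://a"], fail_after_insert=True)
except RuntimeError:
    pass
after_failure = counts(conn)  # (0, 2): the insert was rolled back
promote(conn, ["wss://a"])
after_success = counts(conn)  # (1, 1): relay inserted, candidate removed
```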

get_all_service_cursors async

get_all_service_cursors(
    brotr: Brotr,
    service_name: ServiceName,
    cursor_field: str = "last_synced_at",
) -> dict[str, int]

Batch-fetch all cursor positions for a service.

Called by Synchronizer and Finder to pre-fetch all per-relay cursor values in a single query, avoiding the N+1 pattern of fetching one cursor per relay.

Parameters:

  • brotr (Brotr) –

    Brotr database interface.

  • service_name (ServiceName) –

    Service owning the cursors (e.g., ServiceName.SYNCHRONIZER).

  • cursor_field (str, default: 'last_synced_at' ) –

    JSON key in state_value containing the cursor value (e.g., "last_synced_at" or "last_seen_at").

Returns:

  • dict[str, int]

    Dict mapping state_key (relay URL) to cursor value (timestamp).

Note

Rows where the cursor field is NULL or missing are silently excluded from the result.

See Also

ServiceStateType.CURSOR: The state_type filter used in this query.

Source code in src/bigbrotr/services/common/queries.py
async def get_all_service_cursors(
    brotr: Brotr,
    service_name: ServiceName,
    cursor_field: str = "last_synced_at",
) -> dict[str, int]:
    """Batch-fetch all cursor positions for a service.

    Called by [Synchronizer][bigbrotr.services.synchronizer.Synchronizer]
    and [Finder][bigbrotr.services.finder.Finder] to pre-fetch all per-relay
    cursor values in a single query, avoiding the N+1 pattern of
    fetching one cursor per relay.

    Args:
        brotr: [Brotr][bigbrotr.core.brotr.Brotr] database interface.
        service_name: Service owning the cursors (e.g.,
            ``ServiceName.SYNCHRONIZER``).
        cursor_field: JSON key in ``state_value`` containing the cursor value
            (e.g., ``"last_synced_at"`` or ``"last_seen_at"``).

    Returns:
        Dict mapping ``state_key`` (relay URL) to cursor value (timestamp).

    Note:
        Rows where the cursor field is ``NULL`` or missing are silently
        excluded from the result.

    See Also:
        [ServiceStateType.CURSOR][bigbrotr.models.service_state.ServiceStateType]:
            The ``state_type`` filter used in this query.
    """
    rows = await brotr.fetch(
        """
        SELECT state_key, (state_value->>$1)::BIGINT as cursor_value
        FROM service_state
        WHERE service_name = $2 AND state_type = $3
        """,
        cursor_field,
        service_name,
        ServiceStateType.CURSOR,
    )
    return {r["state_key"]: r["cursor_value"] for r in rows if r["cursor_value"] is not None}
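A caller can then resolve every relay's starting position with one dictionary lookup per relay instead of one query per relay. In the sketch, relays without a usable cursor fall back to a default. The default of 0, the helper names and the sample rows are hypothetical:

```python
from typing import Any


def build_cursor_map(rows: list[dict[str, Any]]) -> dict[str, int]:
    """Same comprehension as above: drop rows whose cursor field is NULL or missing."""
    return {r["state_key"]: r["cursor_value"] for r in rows if r["cursor_value"] is not None}


def start_positions(relay_urls: list[str], cursors: dict[str, int], default: int = 0) -> dict[str, int]:
    """One in-memory lookup per relay; relays without a cursor get the default."""
    return {url: cursors.get(url, default) for url in relay_urls}


rows = [
    {"state_key": "wss://a.example", "cursor_value": 1_700_000_000},
    {"state_key": "wss://b.example", "cursor_value": None},  # field missing in state_value
]
cursors = build_cursor_map(rows)
positions = start_positions(["wss://a.example", "wss://b.example", "wss://c.example"], cursors)
```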

delete_orphan_cursors async

delete_orphan_cursors(
    brotr: Brotr, service_name: ServiceName
) -> int

Remove cursor records whose relay no longer exists in the relay table.

Cursors accumulate indefinitely as relays are discovered and later removed. This cleanup prevents unbounded growth of stale cursor rows in service_state.

Called by Finder and Synchronizer at the start of each cycle, before loading cursors.

Parameters:

  • brotr (Brotr) –

    Brotr database interface.

  • service_name (ServiceName) –

    Service owning the cursors (e.g., ServiceName.FINDER).

Returns:

  • int

    Number of orphan cursor rows deleted.

See Also

get_all_service_cursors: The companion fetch query for cursor records.

Source code in src/bigbrotr/services/common/queries.py
async def delete_orphan_cursors(brotr: Brotr, service_name: ServiceName) -> int:
    """Remove cursor records whose relay no longer exists in the ``relay`` table.

    Cursors accumulate indefinitely as relays are discovered and later removed.
    This cleanup prevents unbounded growth of stale cursor rows in
    ``service_state``.

    Called by [Finder][bigbrotr.services.finder.Finder] and
    [Synchronizer][bigbrotr.services.synchronizer.Synchronizer] at the start
    of each cycle, before loading cursors.

    Args:
        brotr: [Brotr][bigbrotr.core.brotr.Brotr] database interface.
        service_name: Service owning the cursors (e.g.,
            ``ServiceName.FINDER``).

    Returns:
        Number of orphan cursor rows deleted.

    See Also:
        [get_all_service_cursors][bigbrotr.services.common.queries.get_all_service_cursors]:
            The companion fetch query for cursor records.
    """
    result = await brotr.execute(
        """
        DELETE FROM service_state
        WHERE service_name = $1
          AND state_type = $2
          AND state_key NOT IN (SELECT url FROM relay)
        """,
        service_name,
        ServiceStateType.CURSOR,
    )
    return parse_delete_result(result)
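This query uses `NOT IN`, whereas `delete_stale_candidates` uses `EXISTS`. Under SQL three-valued logic, `x NOT IN (subquery)` is never true once the subquery yields a NULL, so no row would be deleted. That is harmless only while `relay.url` is non-nullable, which is presumed here because it serves as the relay key but is not confirmed on this page. A sqlite3 demonstration (SQLite follows the same rule as PostgreSQL here), with a deliberately nullable `url` column:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE relay (url TEXT)")  # nullable on purpose for the demo
conn.execute("CREATE TABLE service_state (state_key TEXT)")
conn.executemany("INSERT INTO relay VALUES (?)", [("wss://a",), (None,)])
conn.executemany("INSERT INTO service_state VALUES (?)", [("wss://a",), ("wss://gone",)])

# NOT IN: 'wss://gone' is compared against a list containing NULL -> unknown -> not selected.
not_in = conn.execute(
    "SELECT COUNT(*) FROM service_state WHERE state_key NOT IN (SELECT url FROM relay)"
).fetchone()[0]

# NOT EXISTS: NULL never equals 'wss://gone', so the orphan is found.
not_exists = conn.execute(
    "SELECT COUNT(*) FROM service_state ss "
    "WHERE NOT EXISTS (SELECT 1 FROM relay r WHERE r.url = ss.state_key)"
).fetchone()[0]
```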