
brotr

High-level database interface built on stored procedures.

Provides typed wrappers around PostgreSQL stored procedures for all data operations: relay management, event ingestion, metadata storage, service state persistence, and materialized view maintenance.

Bulk inserts use array parameters to perform the entire batch in a single database round-trip. All insert methods accept only validated dataclass instances (Relay, Event, EventRelay, Metadata, RelayMetadata) to enforce type safety at the API boundary.

Uses composition with Pool for connection management and implements an async context manager for automatic pool lifecycle handling.
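The composition and context-manager pattern can be sketched in isolation. `FakePool` and `FakeBrotr` below are illustrative stand-ins, not the library's classes; they mirror only the connect-on-enter, close-on-exit lifecycle described above.

```python
import asyncio


class FakePool:
    """Stand-in for the real Pool; models only connect/close."""

    def __init__(self):
        self.connected = False

    async def connect(self):
        self.connected = True

    async def close(self):
        self.connected = False


class FakeBrotr:
    """Illustrative mirror of the composition + async context manager pattern."""

    def __init__(self, pool=None):
        self._pool = pool or FakePool()  # composition: pool is owned, not inherited

    async def __aenter__(self):
        await self._pool.connect()
        return self

    async def __aexit__(self, *exc):
        await self._pool.close()


async def demo() -> bool:
    async with FakeBrotr() as brotr:
        inside = brotr._pool.connected
    # the pool is closed automatically when the block exits
    return inside and not brotr._pool.connected


assert asyncio.run(demo()) is True
```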

See Also

Pool: Low-level connection pool that this module wraps. bigbrotr.services.common.queries: Domain SQL query functions that use Brotr for execution. bigbrotr.models: Dataclass models consumed by the insert methods.

Classes

BatchConfig

Bases: BaseModel

Controls the maximum number of records per bulk insert operation.

Note

The batch size limit prevents excessively large array parameters from consuming too much memory in PostgreSQL. All insert methods on Brotr validate against this limit before executing.

See Also

BrotrConfig: Parent configuration that embeds this model.
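The pre-insert guard described in the note can be sketched as follows; the function and parameter names here are illustrative, not the library's private API.

```python
def validate_batch_size(records: list, max_batch_size: int, operation: str) -> None:
    """Reject oversized batches before array parameters are built."""
    if len(records) > max_batch_size:
        raise ValueError(
            f"{operation}: batch of {len(records)} records exceeds limit {max_batch_size}"
        )


# A batch within the limit passes silently.
validate_batch_size(["r"] * 10, max_batch_size=100, operation="insert_relay")

# An oversized batch is rejected before any SQL is sent.
rejected = False
try:
    validate_batch_size(["r"] * 101, max_batch_size=100, operation="insert_relay")
except ValueError:
    rejected = True
assert rejected
```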

TimeoutsConfig

Bases: BaseModel

Timeout settings for Brotr operations (in seconds).

Each timeout can be set to None for no limit (infinite wait) or to a float >= 0.1 seconds. Different categories allow tuning timeouts for fast queries vs. slow bulk inserts vs. long-running maintenance tasks.

Note

These timeouts are enforced client-side by asyncpg and are separate from the server-side statement_timeout configured in ServerSettingsConfig. The refresh timeout defaults to None (infinite) because REFRESH MATERIALIZED VIEW CONCURRENTLY can take minutes on large tables.

See Also

BrotrConfig: Parent configuration that embeds this model. Pool: The lower-level pool acquisition and health-check timeouts are configured on the pool.

Functions
validate_timeout classmethod
validate_timeout(v: float | None) -> float | None

Validate timeout: None (infinite) or >= 0.1 seconds.

Source code in src/bigbrotr/core/brotr.py
@field_validator("query", "batch", "cleanup", "refresh", mode="after")
@classmethod
def validate_timeout(cls, v: float | None) -> float | None:
    """Validate timeout: None (infinite) or >= 0.1 seconds."""
    if v is not None and v < _MIN_TIMEOUT_SECONDS:
        raise ValueError(
            f"Timeout must be None (infinite) or >= {_MIN_TIMEOUT_SECONDS} seconds"
        )
    return v

BrotrConfig

Bases: BaseModel

Aggregate configuration for the Brotr database interface.

See Also

BatchConfig: Bulk insert size limits. TimeoutsConfig: Per-category timeout settings. Brotr: The database interface class that consumes this configuration.

Brotr

Brotr(
    pool: Pool | None = None,
    config: BrotrConfig | None = None,
)

High-level database interface wrapping PostgreSQL stored procedures.

Brotr is the shared DB contract across all BigBrotr implementations (bigbrotr, lilbrotr, ...). It is domain-aware by design: typed insert methods accept validated dataclass instances (Relay, Event, EventRelay, Metadata, RelayMetadata) and call domain-specific stored procedures. However, all domain SQL queries live in services/common/queries.py, not here.

Bulk inserts use array parameters for single-roundtrip efficiency. Uses composition with a private Pool instance for connection management. Exposes generic query methods (fetch(), fetchrow(), fetchval(), execute(), transaction()) as a facade over the pool for custom queries. Implements async context manager for automatic pool lifecycle management.

Examples:

brotr = Brotr.from_yaml("config.yaml")

async with brotr:
    relay = Relay("wss://relay.example.com")
    await brotr.insert_relay(records=[relay])

    event_relay = EventRelay(event=Event(nostr_event), relay=relay)
    await brotr.insert_event_relay(records=[event_relay])

Note

The _pool attribute is intentionally private. Services must use Brotr methods for all database access, never the pool directly. This ensures consistent timeout application, batch-size validation, and structured logging across the codebase.

See Also

Pool: The underlying connection pool. BrotrConfig: Configuration model for batch sizes and timeouts. BaseService: Abstract service base class that receives a Brotr instance. bigbrotr.models.relay.Relay: Relay dataclass consumed by insert_relay(). bigbrotr.models.event.Event: Event dataclass consumed by insert_event(). bigbrotr.models.metadata.Metadata: Metadata dataclass consumed by insert_metadata().

Initialize the database interface.

The instance is created in a disconnected state. Call connect() or use the async context manager to establish the underlying pool connection.

Parameters:

  • pool (Pool | None, default: None ) –

    Connection pool for database access. Creates a default Pool if not provided.

  • config (BrotrConfig | None, default: None ) –

    Brotr-specific configuration (batch sizes, timeouts). Uses default BrotrConfig if not provided.

See Also

from_yaml(): Construct from a YAML configuration file. from_dict(): Construct from a pre-parsed dictionary.

Source code in src/bigbrotr/core/brotr.py
def __init__(
    self,
    pool: Pool | None = None,
    config: BrotrConfig | None = None,
) -> None:
    """Initialize the database interface.

    The instance is created in a disconnected state. Call
    [connect()][bigbrotr.core.brotr.Brotr.connect] or use the async
    context manager to establish the underlying pool connection.

    Args:
        pool: Connection pool for database access. Creates a default
            [Pool][bigbrotr.core.pool.Pool] if not provided.
        config: Brotr-specific configuration (batch sizes, timeouts).
            Uses default [BrotrConfig][bigbrotr.core.brotr.BrotrConfig]
            if not provided.

    See Also:
        [from_yaml()][bigbrotr.core.brotr.Brotr.from_yaml]: Construct
            from a YAML configuration file.
        [from_dict()][bigbrotr.core.brotr.Brotr.from_dict]: Construct
            from a pre-parsed dictionary.
    """
    self._pool = pool or Pool()
    self._config = config or BrotrConfig()
    self._logger = Logger("brotr")
Attributes
config property
config: BrotrConfig

The Brotr configuration (read-only).

Functions
from_yaml classmethod
from_yaml(config_path: str) -> Brotr

Create a Brotr instance from a YAML configuration file.

The YAML file should contain a pool key for Pool connection settings and optional batch/timeouts keys for Brotr-specific settings. Delegates to load_yaml() for safe YAML parsing.

Parameters:

  • config_path (str) –

    Path to the YAML configuration file.

Returns:

  • Brotr

    A configured Brotr instance (not yet connected).

See Also

from_dict(): Construct from a pre-parsed dictionary.

Source code in src/bigbrotr/core/brotr.py
@classmethod
def from_yaml(cls, config_path: str) -> Brotr:
    """Create a Brotr instance from a YAML configuration file.

    The YAML file should contain a ``pool`` key for
    [Pool][bigbrotr.core.pool.Pool] connection settings and optional
    ``batch``/``timeouts`` keys for Brotr-specific settings. Delegates
    to [load_yaml()][bigbrotr.core.yaml.load_yaml] for safe YAML
    parsing.

    Args:
        config_path: Path to the YAML configuration file.

    Returns:
        A configured Brotr instance (not yet connected).

    See Also:
        [from_dict()][bigbrotr.core.brotr.Brotr.from_dict]: Construct
            from a pre-parsed dictionary.
    """
    return cls.from_dict(load_yaml(config_path))
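A configuration file for from_yaml() might look like the following sketch. The timeouts fields come from TimeoutsConfig; the pool and batch sections are shown only schematically, because their fields are not documented on this page.

```yaml
pool:
  # ... Pool connection settings (see Pool.from_dict) ...
batch:
  # ... bulk insert size limit (see BatchConfig) ...
timeouts:
  query: 5.0
  batch: 60.0
  cleanup: 120.0
  refresh: null  # infinite: REFRESH MATERIALIZED VIEW CONCURRENTLY can take minutes
```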
from_dict classmethod
from_dict(config_dict: dict[str, Any]) -> Brotr

Create a Brotr instance from a configuration dictionary.

Extracts the pool key to build the Pool and passes remaining keys as BrotrConfig fields (batch sizes, timeouts).

Source code in src/bigbrotr/core/brotr.py
@classmethod
def from_dict(cls, config_dict: dict[str, Any]) -> Brotr:
    """Create a Brotr instance from a configuration dictionary.

    Extracts the ``pool`` key to build the
    [Pool][bigbrotr.core.pool.Pool] and passes remaining keys as
    [BrotrConfig][bigbrotr.core.brotr.BrotrConfig] fields (batch sizes,
    timeouts).
    """
    pool = None
    if "pool" in config_dict:
        pool = Pool.from_dict(config_dict["pool"])

    brotr_config_dict = {k: v for k, v in config_dict.items() if k != "pool"}
    config = BrotrConfig(**brotr_config_dict) if brotr_config_dict else None

    return cls(pool=pool, config=config)
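The key routing performed by from_dict() can be exercised on its own. `split_config` is an illustrative helper (not part of the API), and the keys inside the `pool` sub-dict are placeholders.

```python
from typing import Any


def split_config(config_dict: dict[str, Any]) -> tuple[Any, dict[str, Any]]:
    """Mirror of from_dict's routing: 'pool' goes to Pool, the rest to BrotrConfig."""
    pool_cfg = config_dict.get("pool")
    brotr_cfg = {k: v for k, v in config_dict.items() if k != "pool"}
    return pool_cfg, brotr_cfg


pool_cfg, brotr_cfg = split_config(
    {"pool": {"dsn": "postgres://..."}, "timeouts": {"query": 5.0}}
)
assert pool_cfg == {"dsn": "postgres://..."}       # routed to Pool.from_dict
assert brotr_cfg == {"timeouts": {"query": 5.0}}   # routed to BrotrConfig(**...)
```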
fetch async
fetch(query: str, *args: Any) -> list[Record]

Execute a query and return all rows.

Delegates to Pool.fetch() with timeout from config.timeouts.query.

Parameters:

  • query (str) –

    SQL query with $1, $2, ... placeholders.

  • *args (Any, default: () ) –

    Query parameters.

Source code in src/bigbrotr/core/brotr.py
async def fetch(
    self,
    query: str,
    *args: Any,
) -> list[asyncpg.Record]:
    """Execute a query and return all rows.

    Delegates to [Pool.fetch()][bigbrotr.core.pool.Pool.fetch] with
    timeout from
    [config.timeouts.query][bigbrotr.core.brotr.TimeoutsConfig].

    Args:
        query: SQL query with ``$1``, ``$2``, ... placeholders.
        *args: Query parameters.
    """
    return await self._pool.fetch(query, *args, timeout=self._config.timeouts.query)
fetchrow async
fetchrow(query: str, *args: Any) -> Record | None

Execute a query and return the first row.

Delegates to Pool.fetchrow() with timeout from config.timeouts.query.

Parameters:

  • query (str) –

    SQL query with $1, $2, ... placeholders.

  • *args (Any, default: () ) –

    Query parameters.

Source code in src/bigbrotr/core/brotr.py
async def fetchrow(
    self,
    query: str,
    *args: Any,
) -> asyncpg.Record | None:
    """Execute a query and return the first row.

    Delegates to [Pool.fetchrow()][bigbrotr.core.pool.Pool.fetchrow] with
    timeout from
    [config.timeouts.query][bigbrotr.core.brotr.TimeoutsConfig].

    Args:
        query: SQL query with ``$1``, ``$2``, ... placeholders.
        *args: Query parameters.
    """
    return await self._pool.fetchrow(query, *args, timeout=self._config.timeouts.query)
fetchval async
fetchval(query: str, *args: Any) -> Any

Execute a query and return the first column of the first row.

Delegates to Pool.fetchval() with timeout from config.timeouts.query.

Parameters:

  • query (str) –

    SQL query with $1, $2, ... placeholders.

  • *args (Any, default: () ) –

    Query parameters.

Source code in src/bigbrotr/core/brotr.py
async def fetchval(self, query: str, *args: Any) -> Any:
    """Execute a query and return the first column of the first row.

    Delegates to [Pool.fetchval()][bigbrotr.core.pool.Pool.fetchval] with
    timeout from
    [config.timeouts.query][bigbrotr.core.brotr.TimeoutsConfig].

    Args:
        query: SQL query with ``$1``, ``$2``, ... placeholders.
        *args: Query parameters.
    """
    return await self._pool.fetchval(query, *args, timeout=self._config.timeouts.query)
execute async
execute(query: str, *args: Any) -> str

Execute a query and return the command status string.

Delegates to Pool.execute() with timeout from config.timeouts.query.

Parameters:

  • query (str) –

    SQL query with $1, $2, ... placeholders.

  • *args (Any, default: () ) –

    Query parameters.

Source code in src/bigbrotr/core/brotr.py
async def execute(self, query: str, *args: Any) -> str:
    """Execute a query and return the command status string.

    Delegates to [Pool.execute()][bigbrotr.core.pool.Pool.execute] with
    timeout from
    [config.timeouts.query][bigbrotr.core.brotr.TimeoutsConfig].

    Args:
        query: SQL query with ``$1``, ``$2``, ... placeholders.
        *args: Query parameters.
    """
    return await self._pool.execute(query, *args, timeout=self._config.timeouts.query)
transaction
transaction() -> AbstractAsyncContextManager[
    Connection[Record]
]

Return a transaction context manager from the pool.

The transaction commits automatically on normal exit and rolls back if an exception propagates. Delegates to Pool.transaction().

Yields:

  • AbstractAsyncContextManager[Connection[Record]]

    An asyncpg connection with an active transaction. The
    transaction commits on normal exit and rolls back on exception.

Examples:

async with brotr.transaction() as conn:
    await conn.execute("INSERT INTO ...")
    await conn.execute("DELETE FROM ...")

See Also

Pool.transaction(): Underlying pool method.

Source code in src/bigbrotr/core/brotr.py
def transaction(self) -> AbstractAsyncContextManager[asyncpg.Connection[asyncpg.Record]]:
    """Return a transaction context manager from the pool.

    The transaction commits automatically on normal exit and rolls back
    if an exception propagates. Delegates to
    [Pool.transaction()][bigbrotr.core.pool.Pool.transaction].

    Yields:
        An asyncpg connection with an active transaction. The
        transaction commits on normal exit and rolls back on exception.

    Examples:
        ```python
        async with brotr.transaction() as conn:
            await conn.execute("INSERT INTO ...")
            await conn.execute("DELETE FROM ...")
        ```

    See Also:
        [Pool.transaction()][bigbrotr.core.pool.Pool.transaction]:
            Underlying pool method.
    """
    return self._pool.transaction()
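The commit-on-exit, rollback-on-exception semantics can be demonstrated with a toy in-memory store; `MemoryDB` is purely illustrative and has nothing to do with asyncpg internals.

```python
import asyncio
from contextlib import asynccontextmanager


class MemoryDB:
    """Toy store illustrating commit-on-exit / rollback-on-exception semantics."""

    def __init__(self):
        self.rows: list[str] = []

    @asynccontextmanager
    async def transaction(self):
        staged = list(self.rows)  # snapshot; the caller mutates the staging copy
        try:
            yield staged
        except Exception:
            raise                 # staged changes are discarded: rollback
        else:
            self.rows = staged    # normal exit: commit


async def demo() -> list[str]:
    db = MemoryDB()
    async with db.transaction() as t:
        t.append("row1")                # committed on normal exit
    try:
        async with db.transaction() as t:
            t.append("row2")
            raise RuntimeError("boom")  # triggers rollback
    except RuntimeError:
        pass
    return db.rows


assert asyncio.run(demo()) == ["row1"]
```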
insert_relay async
insert_relay(records: list[Relay]) -> int

Bulk-insert relay records into the relay table.

Calls the relay_insert stored procedure with transposed column arrays for single-roundtrip efficiency.

Parameters:

  • records (list[Relay]) –

    Validated Relay dataclass instances.

Returns:

  • int

    Number of new relays inserted (duplicates are skipped).

Raises:

  • PostgresError

    On database errors.

  • ValueError

    If the batch exceeds the configured maximum size from BatchConfig.

See Also

insert_event_relay(): Cascade insert that also creates relay records.

Source code in src/bigbrotr/core/brotr.py
async def insert_relay(self, records: list[Relay]) -> int:
    """Bulk-insert relay records into the ``relay`` table.

    Calls the ``relay_insert`` stored procedure with transposed column
    arrays for single-roundtrip efficiency.

    Args:
        records: Validated [Relay][bigbrotr.models.relay.Relay] dataclass
            instances.

    Returns:
        Number of new relays inserted (duplicates are skipped).

    Raises:
        asyncpg.PostgresError: On database errors.
        ValueError: If the batch exceeds the configured maximum size
            from [BatchConfig][bigbrotr.core.brotr.BatchConfig].

    See Also:
        [insert_event_relay()][bigbrotr.core.brotr.Brotr.insert_event_relay]:
            Cascade insert that also creates relay records.
    """
    if not records:
        return 0

    self._validate_batch_size(records, "insert_relay")

    params = [relay.to_db_params() for relay in records]
    columns = self._transpose_to_columns(params)

    inserted: int = await self._call_procedure(
        "relay_insert",
        *columns,
        fetch_result=True,
        timeout=self._config.timeouts.batch,
    )

    self._logger.debug("relay_inserted", count=inserted, attempted=len(params))
    return inserted
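The column transposition behind the single-roundtrip inserts can be sketched as a standalone stand-in for the private `_transpose_to_columns` helper; the record values below are placeholders.

```python
from typing import Any


def transpose_to_columns(rows: list[tuple[Any, ...]]) -> tuple[list[Any], ...]:
    """Turn per-record tuples into per-column arrays for one procedure call."""
    return tuple(list(col) for col in zip(*rows))


rows = [("wss://relay-a.example", 1700000000), ("wss://relay-b.example", 1700000100)]
columns = transpose_to_columns(rows)
# Each positional argument to the stored procedure is now one column array.
assert columns == (
    ["wss://relay-a.example", "wss://relay-b.example"],
    [1700000000, 1700000100],
)
```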
insert_event async
insert_event(records: list[Event]) -> int

Bulk-insert event records into the event table only.

Does not create relay associations. Use insert_event_relay() with cascade=True to also insert relays and junction records.

Parameters:

  • records (list[Event]) –

    Validated Event dataclass instances.

Returns:

  • int

    Number of new events inserted (duplicates are skipped).

Raises:

  • PostgresError

    On database errors.

  • ValueError

    If the batch exceeds the configured maximum size from BatchConfig.

See Also

insert_event_relay(): Cascade insert that creates events, relays, and junction records in a single stored procedure call.

Source code in src/bigbrotr/core/brotr.py
async def insert_event(self, records: list[Event]) -> int:
    """Bulk-insert event records into the ``event`` table only.

    Does not create relay associations. Use
    [insert_event_relay()][bigbrotr.core.brotr.Brotr.insert_event_relay]
    with ``cascade=True`` to also insert relays and junction records.

    Args:
        records: Validated [Event][bigbrotr.models.event.Event] dataclass
            instances.

    Returns:
        Number of new events inserted (duplicates are skipped).

    Raises:
        asyncpg.PostgresError: On database errors.
        ValueError: If the batch exceeds the configured maximum size
            from [BatchConfig][bigbrotr.core.brotr.BatchConfig].

    See Also:
        [insert_event_relay()][bigbrotr.core.brotr.Brotr.insert_event_relay]:
            Cascade insert that creates events, relays, and junction records
            in a single stored procedure call.
    """
    if not records:
        return 0

    self._validate_batch_size(records, "insert_event")

    params = [event.to_db_params() for event in records]
    columns = self._transpose_to_columns(params)

    inserted: int = await self._call_procedure(
        "event_insert",
        *columns,
        fetch_result=True,
        timeout=self._config.timeouts.batch,
    )

    self._logger.debug("event_inserted", count=inserted, attempted=len(params))
    return inserted
insert_event_relay async
insert_event_relay(
    records: list[EventRelay], *, cascade: bool = True
) -> int

Bulk-insert event-relay junction records.

Parameters:

  • records (list[EventRelay]) –

    Validated EventRelay dataclass instances.

  • cascade (bool, default: True ) –

    If True (default), also inserts the parent Relay and Event records atomically (relays -> events -> junctions) via the event_relay_insert_cascade stored procedure. If False, only inserts junction rows via event_relay_insert and expects foreign keys to already exist.

Returns:

  • int

    Number of new junction records inserted.

Raises:

  • PostgresError

    On database errors.

  • ValueError

    If the batch exceeds the configured maximum size from BatchConfig.

See Also

insert_event(): Insert events without relay associations. insert_relay(): Insert relays without event associations.

Source code in src/bigbrotr/core/brotr.py
async def insert_event_relay(self, records: list[EventRelay], *, cascade: bool = True) -> int:
    """Bulk-insert event-relay junction records.

    Args:
        records: Validated
            [EventRelay][bigbrotr.models.event_relay.EventRelay] dataclass
            instances.
        cascade: If ``True`` (default), also inserts the parent
            [Relay][bigbrotr.models.relay.Relay] and
            [Event][bigbrotr.models.event.Event] records atomically
            (relays -> events -> junctions) via the
            ``event_relay_insert_cascade`` stored procedure. If
            ``False``, only inserts junction rows via
            ``event_relay_insert`` and expects foreign keys to already
            exist.

    Returns:
        Number of new junction records inserted.

    Raises:
        asyncpg.PostgresError: On database errors.
        ValueError: If the batch exceeds the configured maximum size
            from [BatchConfig][bigbrotr.core.brotr.BatchConfig].

    See Also:
        [insert_event()][bigbrotr.core.brotr.Brotr.insert_event]:
            Insert events without relay associations.
        [insert_relay()][bigbrotr.core.brotr.Brotr.insert_relay]:
            Insert relays without event associations.
    """
    if not records:
        return 0

    self._validate_batch_size(records, "insert_event_relay")

    params = [event_relay.to_db_params() for event_relay in records]
    columns: tuple[list[Any], ...]

    if cascade:
        # Cascade: relay -> event -> event_relay in one procedure call
        columns = self._transpose_to_columns(params)
        procedure = "event_relay_insert_cascade"
    else:
        # Junction-only: caller guarantees foreign keys exist
        event_ids = [p.event_id for p in params]
        relay_urls = [p.relay_url for p in params]
        seen_ats = [p.seen_at for p in params]
        procedure = "event_relay_insert"
        columns = (event_ids, relay_urls, seen_ats)

    inserted: int = await self._call_procedure(
        procedure,
        *columns,
        fetch_result=True,
        timeout=self._config.timeouts.batch,
    )

    self._logger.debug(
        "event_relay_inserted", count=inserted, attempted=len(params), cascade=cascade
    )
    return inserted
insert_metadata async
insert_metadata(records: list[Metadata]) -> int

Bulk-insert metadata records into the metadata table.

Metadata is content-addressed: each record's SHA-256 hash combined with its metadata type forms the composite primary key, providing automatic deduplication within each type. The hash is computed in Python for deterministic behavior across environments.

Use insert_relay_metadata() with cascade=True to also create the relay association in a single stored procedure call.

Parameters:

Returns:

  • int

    Number of new metadata records inserted (duplicates are skipped).

Raises:

  • PostgresError

    On database errors.

  • ValueError

    If the batch exceeds the configured maximum size from BatchConfig.

Note

The metadata table has columns id, type, and data with composite PK (id, type). The SHA-256 hash is computed over the canonical JSON representation in the Metadata model's __post_init__ method.

See Also

insert_relay_metadata(): Cascade insert that also creates relay-metadata junction records.

Source code in src/bigbrotr/core/brotr.py
async def insert_metadata(self, records: list[Metadata]) -> int:
    """Bulk-insert metadata records into the ``metadata`` table.

    Metadata is content-addressed: each record's SHA-256 hash combined
    with its metadata type forms the composite primary key, providing
    automatic deduplication within each type. The hash is computed in
    Python for deterministic behavior across environments.

    Use
    [insert_relay_metadata()][bigbrotr.core.brotr.Brotr.insert_relay_metadata]
    with ``cascade=True`` to also create the relay association in a
    single stored procedure call.

    Args:
        records: Validated [Metadata][bigbrotr.models.metadata.Metadata]
            dataclass instances.

    Returns:
        Number of new metadata records inserted (duplicates are skipped).

    Raises:
        asyncpg.PostgresError: On database errors.
        ValueError: If the batch exceeds the configured maximum size
            from [BatchConfig][bigbrotr.core.brotr.BatchConfig].

    Note:
        The ``metadata`` table has columns ``id``, ``type``, and
        ``data`` with composite PK ``(id, type)``.
        The SHA-256 hash is computed over the canonical JSON representation
        in the [Metadata][bigbrotr.models.metadata.Metadata] model's
        ``__post_init__`` method.

    See Also:
        [insert_relay_metadata()][bigbrotr.core.brotr.Brotr.insert_relay_metadata]:
            Cascade insert that also creates relay-metadata junction records.
    """
    if not records:
        return 0

    self._validate_batch_size(records, "insert_metadata")

    params = [metadata.to_db_params() for metadata in records]
    ids = [p.id for p in params]
    types = [p.type for p in params]
    values = [p.data for p in params]

    inserted: int = await self._call_procedure(
        "metadata_insert",
        ids,
        types,
        values,
        fetch_result=True,
        timeout=self._config.timeouts.batch,
    )

    self._logger.debug("metadata_inserted", count=inserted, attempted=len(params))
    return inserted
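Content addressing of this kind can be sketched as follows; the exact canonical JSON encoding used by the Metadata model's `__post_init__` is an assumption here.

```python
import hashlib
import json
from typing import Any


def metadata_id(data: dict[str, Any]) -> str:
    """SHA-256 over a canonical JSON encoding: equal content yields equal id."""
    canonical = json.dumps(data, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Key order does not change the hash, so duplicate content dedupes on (id, type).
assert metadata_id({"name": "relay", "version": "1.0"}) == metadata_id(
    {"version": "1.0", "name": "relay"}
)
assert metadata_id({"name": "relay"}) != metadata_id({"name": "other"})
```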
insert_relay_metadata async
insert_relay_metadata(
    records: list[RelayMetadata], *, cascade: bool = True
) -> int

Bulk-insert relay-metadata junction records.

Links relays to content-addressed metadata records. SHA-256 hashes are computed in Python for deterministic deduplication.

Parameters:

  • records (list[RelayMetadata]) –

    Validated RelayMetadata dataclass instances.

  • cascade (bool, default: True ) –

    If True (default), also inserts the parent Relay and Metadata records (relays -> metadata -> relay_metadata) via the relay_metadata_insert_cascade stored procedure. If False, only inserts junction rows via relay_metadata_insert and expects foreign keys to already exist.

Returns:

  • int

    Number of new relay-metadata records inserted.

Raises:

  • PostgresError

    On database errors.

  • ValueError

    If the batch exceeds the configured maximum size from BatchConfig.

See Also

insert_metadata(): Insert metadata without relay associations. insert_relay(): Insert relays without metadata associations.

Source code in src/bigbrotr/core/brotr.py
async def insert_relay_metadata(
    self, records: list[RelayMetadata], *, cascade: bool = True
) -> int:
    """Bulk-insert relay-metadata junction records.

    Links relays to content-addressed metadata records. SHA-256 hashes
    are computed in Python for deterministic deduplication.

    Args:
        records: Validated
            [RelayMetadata][bigbrotr.models.relay_metadata.RelayMetadata]
            dataclass instances.
        cascade: If ``True`` (default), also inserts the parent
            [Relay][bigbrotr.models.relay.Relay] and
            [Metadata][bigbrotr.models.metadata.Metadata] records
            (relays -> metadata -> relay_metadata) via the
            ``relay_metadata_insert_cascade`` stored procedure. If
            ``False``, only inserts junction rows via
            ``relay_metadata_insert`` and expects foreign keys to
            already exist.

    Returns:
        Number of new relay-metadata records inserted.

    Raises:
        asyncpg.PostgresError: On database errors.
        ValueError: If the batch exceeds the configured maximum size
            from [BatchConfig][bigbrotr.core.brotr.BatchConfig].

    See Also:
        [insert_metadata()][bigbrotr.core.brotr.Brotr.insert_metadata]:
            Insert metadata without relay associations.
        [insert_relay()][bigbrotr.core.brotr.Brotr.insert_relay]:
            Insert relays without metadata associations.
    """
    if not records:
        return 0

    self._validate_batch_size(records, "insert_relay_metadata")

    params = [record.to_db_params() for record in records]

    if cascade:
        # Cascade: relays -> metadata -> relay_metadata in one procedure call
        columns = self._transpose_to_columns(params)
        procedure = "relay_metadata_insert_cascade"
    else:
        # Junction-only: caller guarantees foreign keys exist
        relay_urls = [p.relay_url for p in params]
        metadata_ids = [p.metadata_id for p in params]
        metadata_types = [p.metadata_type for p in params]
        generated_ats = [p.generated_at for p in params]
        procedure = "relay_metadata_insert"
        columns = (relay_urls, metadata_ids, metadata_types, generated_ats)

    inserted: int = await self._call_procedure(
        procedure,
        *columns,
        fetch_result=True,
        timeout=self._config.timeouts.batch,
    )

    self._logger.debug(
        "relay_metadata_inserted",
        count=inserted,
        attempted=len(params),
        cascade=cascade,
    )
    return inserted
delete_orphan_event async
delete_orphan_event() -> int

Delete events that have no associated relay in the junction table.

Orphaned events occur when relays are deleted or events were inserted without relay associations. Removing them reclaims storage and maintains referential consistency. Calls the orphan_event_delete stored procedure.

Returns:

  • int

    Number of orphaned events deleted.

Raises:

  • PostgresError

    On database errors.

See Also

delete_orphan_metadata(): Companion cleanup for orphaned metadata records.

Source code in src/bigbrotr/core/brotr.py
async def delete_orphan_event(self) -> int:
    """Delete events that have no associated relay in the junction table.

    Orphaned events occur when relays are deleted or events were
    inserted without relay associations. Removing them reclaims
    storage and maintains referential consistency. Calls the
    ``orphan_event_delete`` stored procedure.

    Returns:
        Number of orphaned events deleted.

    Raises:
        asyncpg.PostgresError: On database errors.

    See Also:
        [delete_orphan_metadata()][bigbrotr.core.brotr.Brotr.delete_orphan_metadata]:
            Companion cleanup for orphaned metadata records.
    """
    result: int = await self._call_procedure(
        "orphan_event_delete",
        fetch_result=True,
        timeout=self._config.timeouts.cleanup,
    )
    return result
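The orphan condition amounts to a set difference between the event table and the junction table, which the stored procedure evaluates in SQL; an in-memory sketch with placeholder ids:

```python
def find_orphan_events(event_ids: set[str], junction_event_ids: set[str]) -> set[str]:
    """Events with no surviving relay association are orphans."""
    return event_ids - junction_event_ids


orphans = find_orphan_events({"e1", "e2", "e3"}, {"e2"})
assert orphans == {"e1", "e3"}
```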
delete_orphan_metadata async
delete_orphan_metadata() -> int

Delete metadata records that have no associated relay in the junction table.

Orphaned metadata occurs when all relay associations for a content-addressed blob are removed (e.g., superseded NIP-11 or NIP-66 data). Removing them reclaims storage. Calls the orphan_metadata_delete stored procedure.

Returns:

  • int

    Number of orphaned metadata records deleted.

Raises:

  • PostgresError

    On database errors.

See Also

delete_orphan_event(): Companion cleanup for orphaned event records.

Source code in src/bigbrotr/core/brotr.py
async def delete_orphan_metadata(self) -> int:
    """Delete metadata records that have no associated relay in the junction table.

    Orphaned metadata occurs when all relay associations for a content-
    addressed blob are removed (e.g., superseded NIP-11 or NIP-66 data).
    Removing them reclaims storage. Calls the ``orphan_metadata_delete``
    stored procedure.

    Returns:
        Number of orphaned metadata records deleted.

    Raises:
        asyncpg.PostgresError: On database errors.

    See Also:
        [delete_orphan_event()][bigbrotr.core.brotr.Brotr.delete_orphan_event]:
            Companion cleanup for orphaned event records.
    """
    result: int = await self._call_procedure(
        "orphan_metadata_delete",
        fetch_result=True,
        timeout=self._config.timeouts.cleanup,
    )
    return result
upsert_service_state async
upsert_service_state(records: list[ServiceState]) -> int

Atomically upsert service state records using bulk array parameters.

Services use this to persist operational state (cursors, checkpoints, candidates) across restarts. Each record is identified by the composite key (service_name, state_type, state_key). Calls the service_state_upsert stored procedure.

Parameters:

  • records (list[ServiceState]) –

    List of ServiceState dataclass instances.

Returns:

  • int

    Number of records upserted.

See Also

get_service_state(): Retrieve persisted state records. delete_service_state(): Remove persisted state records.

Source code in src/bigbrotr/core/brotr.py
async def upsert_service_state(self, records: list[ServiceState]) -> int:
    """Atomically upsert service state records using bulk array parameters.

    Services use this to persist operational state (cursors, checkpoints,
    candidates) across restarts. Each record is identified by the
    composite key ``(service_name, state_type, state_key)``. Calls the
    ``service_state_upsert`` stored procedure.

    Args:
        records: List of
            [ServiceState][bigbrotr.models.service_state.ServiceState]
            dataclass instances.

    Returns:
        Number of records upserted.

    See Also:
        [get_service_state()][bigbrotr.core.brotr.Brotr.get_service_state]:
            Retrieve persisted state records.
        [delete_service_state()][bigbrotr.core.brotr.Brotr.delete_service_state]:
            Remove persisted state records.
    """
    if not records:
        return 0

    self._validate_batch_size(records, "upsert_service_state")

    params = [r.to_db_params() for r in records]
    columns = self._transpose_to_columns(params)

    # Procedure returns VOID; no DB-confirmed count available
    await self._call_procedure(
        "service_state_upsert",
        *columns,
        fetch_result=False,
        timeout=self._config.timeouts.batch,
    )

    self._logger.debug("service_state_upserted", count=len(records))
    return len(records)
get_service_state async
get_service_state(
    service_name: ServiceName,
    state_type: ServiceStateType,
    key: str | None = None,
) -> list[ServiceState]

Retrieve persisted service state records.

Calls the service_state_get stored procedure.

Parameters:

  • service_name (ServiceName) –

    Owning service name (e.g. ServiceName.FINDER).

  • state_type (ServiceStateType) –

    Category of state. See ServiceStateType for the canonical enum values.

  • key (str | None, default: None ) –

    Specific record key, or None to retrieve all records matching the service/type combination.

Returns:

  • list[ServiceState]

    List of ServiceState instances reconstructed from the database rows.

See Also

upsert_service_state(): Persist state records. delete_service_state(): Remove state records.

Source code in src/bigbrotr/core/brotr.py
async def get_service_state(
    self,
    service_name: ServiceName,
    state_type: ServiceStateType,
    key: str | None = None,
) -> list[ServiceState]:
    """Retrieve persisted service state records.

    Calls the ``service_state_get`` stored procedure.

    Args:
        service_name: Owning service name (e.g.
            ``ServiceName.FINDER``).
        state_type: Category of state. See
            [ServiceStateType][bigbrotr.models.service_state.ServiceStateType]
            for the canonical enum values.
        key: Specific record key, or ``None`` to retrieve all records
            matching the service/type combination.

    Returns:
        List of [ServiceState][bigbrotr.models.service_state.ServiceState]
        instances reconstructed from the database rows.

    See Also:
        [upsert_service_state()][bigbrotr.core.brotr.Brotr.upsert_service_state]:
            Persist state records.
        [delete_service_state()][bigbrotr.core.brotr.Brotr.delete_service_state]:
            Remove state records.
    """
    rows = await self._pool.fetch(
        "SELECT * FROM service_state_get($1, $2, $3)",
        service_name,
        state_type,
        key,
        timeout=self._config.timeouts.query,
    )

    return [
        ServiceState(
            service_name=service_name,
            state_type=state_type,
            state_key=row["state_key"],
            state_value=row["state_value"],
            updated_at=row["updated_at"],
        )
        for row in rows
    ]
delete_service_state async
delete_service_state(
    service_names: list[ServiceName],
    state_types: list[ServiceStateType],
    state_keys: list[str],
) -> int

Atomically delete service state records by composite key.

Calls the service_state_delete stored procedure with three parallel arrays identifying the records to remove.

Parameters:

  • service_names (list[ServiceName]) –

    Service name for each record.

  • state_types (list[ServiceStateType]) –

    State type for each record.

  • state_keys (list[str]) –

    State key for each record.

Returns:

  • int

    Number of records actually deleted.

See Also

upsert_service_state(): Persist state records. get_service_state(): Retrieve state records.

Source code in src/bigbrotr/core/brotr.py
async def delete_service_state(
    self,
    service_names: list[ServiceName],
    state_types: list[ServiceStateType],
    state_keys: list[str],
) -> int:
    """Atomically delete service state records by composite key.

    Calls the ``service_state_delete`` stored procedure with three
    parallel arrays identifying the records to remove.

    Args:
        service_names: Service name for each record.
        state_types: State type for each record.
        state_keys: State key for each record.

    Returns:
        Number of records actually deleted.

    See Also:
        [upsert_service_state()][bigbrotr.core.brotr.Brotr.upsert_service_state]:
            Persist state records.
        [get_service_state()][bigbrotr.core.brotr.Brotr.get_service_state]:
            Retrieve state records.
    """
    if not service_names:
        return 0

    if not (len(service_names) == len(state_types) == len(state_keys)):
        raise ValueError(
            f"Parallel arrays must have equal length: "
            f"service_names={len(service_names)}, "
            f"state_types={len(state_types)}, "
            f"state_keys={len(state_keys)}"
        )

    self._validate_batch_size(service_names, "delete_service_state")

    deleted: int = await self._call_procedure(
        "service_state_delete",
        service_names,
        state_types,
        state_keys,
        fetch_result=True,
        timeout=self._config.timeouts.batch,
    )

    self._logger.debug(
        "service_state_deleted",
        count=deleted,
        attempted=len(service_names),
    )
    return deleted
refresh_materialized_view async
refresh_materialized_view(view_name: str) -> None

Refresh a materialized view concurrently (non-blocking).

Calls a stored procedure named {view_name}_refresh which performs REFRESH MATERIALIZED VIEW CONCURRENTLY. The view name is validated by _call_procedure() against a strict SQL identifier regex to prevent injection.

Parameters:

  • view_name (str) –

    Name of the materialized view to refresh (e.g. "relay_metadata_latest", "event_stats").

Raises:

  • ValueError

    If the view name is not a valid SQL identifier.

Note

The timeout for refresh operations defaults to None (infinite) via TimeoutsConfig.refresh because REFRESH MATERIALIZED VIEW CONCURRENTLY can take several minutes on large tables with complex indexes.

Source code in src/bigbrotr/core/brotr.py
async def refresh_materialized_view(self, view_name: str) -> None:
    """Refresh a materialized view concurrently (non-blocking).

    Calls a stored procedure named ``{view_name}_refresh`` which
    performs ``REFRESH MATERIALIZED VIEW CONCURRENTLY``. The view
    name is validated by
    ``_call_procedure()``
    against a strict SQL identifier regex to prevent injection.

    Args:
        view_name: Name of the materialized view to refresh
            (e.g. ``"relay_metadata_latest"``, ``"event_stats"``).

    Raises:
        ValueError: If the view name is not a valid SQL identifier.

    Note:
        The timeout for refresh operations defaults to ``None``
        (infinite) via
        [TimeoutsConfig.refresh][bigbrotr.core.brotr.TimeoutsConfig]
        because ``REFRESH MATERIALIZED VIEW CONCURRENTLY`` can take
        several minutes on large tables with complex indexes.
    """
    await self._call_procedure(
        f"{view_name}_refresh",
        timeout=self._config.timeouts.refresh,
    )
    self._logger.debug("matview_refreshed", view=view_name)
connect async
connect() -> None

Connect the underlying pool. Idempotent.

Source code in src/bigbrotr/core/brotr.py
async def connect(self) -> None:
    """Connect the underlying pool. Idempotent."""
    await self._pool.connect()
    self._logger.debug("session_started")
close async
close() -> None

Close the underlying pool. Idempotent.

Source code in src/bigbrotr/core/brotr.py
async def close(self) -> None:
    """Close the underlying pool. Idempotent."""
    self._logger.debug("session_ending")
    await self._pool.close()
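Per the module overview, `Brotr` exposes these two methods through an async context manager so the pool is opened on entry and closed on exit even when the body raises. A minimal stand-in showing that lifecycle pattern (the real class wraps an asyncpg pool; this toy class just tracks a flag):

```python
import asyncio


class PoolSession:
    """Toy stand-in for Brotr's pool lifecycle (illustrative only)."""

    def __init__(self) -> None:
        self.connected = False

    async def connect(self) -> None:
        self.connected = True    # real code: await self._pool.connect()

    async def close(self) -> None:
        self.connected = False   # real code: await self._pool.close()

    async def __aenter__(self) -> "PoolSession":
        await self.connect()
        return self

    async def __aexit__(self, *exc) -> None:
        await self.close()


async def main() -> bool:
    async with PoolSession() as db:
        assert db.connected      # pool is open inside the block
    return db.connected          # closed automatically on exit


result = asyncio.run(main())
```

Since both `connect()` and `close()` are idempotent, nesting or re-entering the context manager is harmless, which simplifies service shutdown paths.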

Functions