brotr
brotr
¶
High-level database interface built on stored procedures.
Provides typed wrappers around PostgreSQL stored procedures for all data operations: relay management, event ingestion, metadata storage, service state persistence, and materialized view maintenance.
Bulk inserts use array parameters to perform the entire batch in a single database round-trip. All insert methods accept only validated dataclass instances (Relay, Event, EventRelay, Metadata, RelayMetadata) to enforce type safety at the API boundary.
Uses composition with Pool for connection management and implements an async context manager for automatic pool lifecycle handling.
See Also
Pool: Low-level connection pool that this module wraps. bigbrotr.services.common.queries: Domain SQL query functions that use Brotr for execution. bigbrotr.models: Dataclass models consumed by the insert methods.
Classes¶
BatchConfig
¶
Bases: BaseModel
Controls the maximum number of records per bulk insert operation.
Note
The batch size limit prevents excessively large array parameters from consuming too much memory in PostgreSQL. All insert methods on Brotr validate against this limit before executing.
See Also
BrotrConfig: Parent configuration that embeds this model.
TimeoutsConfig
¶
Bases: BaseModel
Timeout settings for Brotr operations (in seconds).
Each timeout can be set to None for no limit (infinite wait) or to a
float >= 0.1 seconds. Different categories allow tuning timeouts for
fast queries vs. slow bulk inserts vs. long-running maintenance tasks.
Note
These timeouts are enforced client-side by asyncpg and are separate
from the server-side statement_timeout configured in
ServerSettingsConfig. The
refresh timeout defaults to None (infinite) because
REFRESH MATERIALIZED VIEW CONCURRENTLY can take minutes on
large tables.
See Also
BrotrConfig: Parent configuration that embeds this model. TimeoutsConfig: Lower-level pool acquisition and health-check timeouts.
Functions¶
validate_timeout
classmethod
¶
Validate timeout: None (infinite) or >= 0.1 seconds.
Source code in src/bigbrotr/core/brotr.py
BrotrConfig
¶
Bases: BaseModel
Aggregate configuration for the Brotr database interface.
See Also
BatchConfig: Bulk insert size limits. TimeoutsConfig: Per-category timeout settings. Brotr: The database interface class that consumes this configuration.
Brotr
¶
Brotr(
pool: Pool | None = None,
config: BrotrConfig | None = None,
)
High-level database interface wrapping PostgreSQL stored procedures.
Brotr is the shared DB contract across all BigBrotr implementations
(bigbrotr, lilbrotr, ...). It is domain-aware by design: typed insert
methods accept validated dataclass instances
(Relay,
Event,
EventRelay,
Metadata,
RelayMetadata) and call
domain-specific stored procedures. However, all domain SQL queries
live in services/common/queries.py, not here.
Bulk inserts use array parameters for single-roundtrip efficiency. Uses composition with a private Pool instance for connection management. Exposes generic query methods (fetch(), fetchrow(), fetchval(), execute(), transaction()) as a facade over the pool for custom queries. Implements async context manager for automatic pool lifecycle management.
Examples:
brotr = Brotr.from_yaml("config.yaml")
async with brotr:
relay = Relay("wss://relay.example.com")
await brotr.insert_relay(records=[relay])
event_relay = EventRelay(event=Event(nostr_event), relay=relay)
await brotr.insert_event_relay(records=[event_relay])
Note
The _pool attribute is intentionally private. Services must use
Brotr methods for all database access,
never the pool directly. This ensures consistent timeout application,
batch-size validation, and structured logging across the codebase.
See Also
Pool: The underlying connection pool.
BrotrConfig: Configuration model
for batch sizes and timeouts.
BaseService: Abstract
service base class that receives a Brotr instance.
bigbrotr.models.relay.Relay: Relay
dataclass consumed by
insert_relay().
bigbrotr.models.event.Event: Event
dataclass consumed by
insert_event().
bigbrotr.models.metadata.Metadata:
Metadata dataclass consumed by
insert_metadata().
Initialize the database interface.
The instance is created in a disconnected state. Call connect() or use the async context manager to establish the underlying pool connection.
Parameters:
-
pool(Pool | None, default:None) –Connection pool for database access. Creates a default Pool if not provided.
-
config(BrotrConfig | None, default:None) –Brotr-specific configuration (batch sizes, timeouts). Uses default BrotrConfig if not provided.
See Also
from_yaml(): Construct from a YAML configuration file. from_dict(): Construct from a pre-parsed dictionary.
Source code in src/bigbrotr/core/brotr.py
Attributes¶
Functions¶
from_yaml
classmethod
¶
from_yaml(config_path: str) -> Brotr
Create a Brotr instance from a YAML configuration file.
The YAML file should contain a pool key for
Pool connection settings and optional
batch/timeouts keys for Brotr-specific settings. Delegates
to load_yaml() for safe YAML
parsing.
Parameters:
-
config_path(str) –Path to the YAML configuration file.
Returns:
-
Brotr–A configured Brotr instance (not yet connected).
See Also
from_dict(): Construct from a pre-parsed dictionary.
Source code in src/bigbrotr/core/brotr.py
from_dict
classmethod
¶
from_dict(config_dict: dict[str, Any]) -> Brotr
Create a Brotr instance from a configuration dictionary.
Extracts the pool key to build the
Pool and passes remaining keys as
BrotrConfig fields (batch sizes,
timeouts).
Source code in src/bigbrotr/core/brotr.py
fetch
async
¶
Execute a query and return all rows.
Delegates to Pool.fetch() with timeout from config.timeouts.query.
Parameters:
-
query(str) –SQL query with
$1,$2, ... placeholders. -
*args(Any, default:()) –Query parameters.
Source code in src/bigbrotr/core/brotr.py
fetchrow
async
¶
Execute a query and return the first row.
Delegates to Pool.fetchrow() with timeout from config.timeouts.query.
Parameters:
-
query(str) –SQL query with
$1,$2, ... placeholders. -
*args(Any, default:()) –Query parameters.
Source code in src/bigbrotr/core/brotr.py
fetchval
async
¶
Execute a query and return the first column of the first row.
Delegates to Pool.fetchval() with timeout from config.timeouts.query.
Parameters:
-
query(str) –SQL query with
$1,$2, ... placeholders. -
*args(Any, default:()) –Query parameters.
Source code in src/bigbrotr/core/brotr.py
execute
async
¶
Execute a query and return the command status string.
Delegates to Pool.execute() with timeout from config.timeouts.query.
Parameters:
-
query(str) –SQL query with
$1,$2, ... placeholders. -
*args(Any, default:()) –Query parameters.
Source code in src/bigbrotr/core/brotr.py
transaction
¶
Return a transaction context manager from the pool.
The transaction commits automatically on normal exit and rolls back if an exception propagates. Delegates to Pool.transaction().
Yields:
-
AbstractAsyncContextManager[Connection[Record]]–An asyncpg connection with an active transaction. The
-
AbstractAsyncContextManager[Connection[Record]]–transaction commits on normal exit and rolls back on exception.
Examples:
async with brotr.transaction() as conn:
await conn.execute("INSERT INTO ...")
await conn.execute("DELETE FROM ...")
See Also
Pool.transaction(): Underlying pool method.
Source code in src/bigbrotr/core/brotr.py
insert_relay
async
¶
insert_relay(records: list[Relay]) -> int
Bulk-insert relay records into the relay table.
Calls the relay_insert stored procedure with transposed column
arrays for single-roundtrip efficiency.
Parameters:
Returns:
-
int–Number of new relays inserted (duplicates are skipped).
Raises:
-
PostgresError–On database errors.
-
ValueError–If the batch exceeds the configured maximum size from BatchConfig.
See Also
insert_event_relay(): Cascade insert that also creates relay records.
Source code in src/bigbrotr/core/brotr.py
insert_event
async
¶
insert_event(records: list[Event]) -> int
Bulk-insert event records into the event table only.
Does not create relay associations. Use
insert_event_relay()
with cascade=True to also insert relays and junction records.
Parameters:
Returns:
-
int–Number of new events inserted (duplicates are skipped).
Raises:
-
PostgresError–On database errors.
-
ValueError–If the batch exceeds the configured maximum size from BatchConfig.
See Also
insert_event_relay(): Cascade insert that creates events, relays, and junction records in a single stored procedure call.
Source code in src/bigbrotr/core/brotr.py
insert_event_relay
async
¶
insert_event_relay(
records: list[EventRelay], *, cascade: bool = True
) -> int
Bulk-insert event-relay junction records.
Parameters:
-
records(list[EventRelay]) –Validated EventRelay dataclass instances.
-
cascade(bool, default:True) –
Returns:
-
int–Number of new junction records inserted.
Raises:
-
PostgresError–On database errors.
-
ValueError–If the batch exceeds the configured maximum size from BatchConfig.
See Also
insert_event(): Insert events without relay associations. insert_relay(): Insert relays without event associations.
Source code in src/bigbrotr/core/brotr.py
insert_metadata
async
¶
insert_metadata(records: list[Metadata]) -> int
Bulk-insert metadata records into the metadata table.
Metadata is content-addressed: each record's SHA-256 hash combined with its metadata type forms the composite primary key, providing automatic deduplication within each type. The hash is computed in Python for deterministic behavior across environments.
Use
insert_relay_metadata()
with cascade=True to also create the relay association in a
single stored procedure call.
Parameters:
Returns:
-
int–Number of new metadata records inserted (duplicates are skipped).
Raises:
-
PostgresError–On database errors.
-
ValueError–If the batch exceeds the configured maximum size from BatchConfig.
Note
The metadata table has columns id, type, and
data with composite PK (id, type).
The SHA-256 hash is computed over the canonical JSON representation
in the Metadata model's
__post_init__ method.
See Also
insert_relay_metadata(): Cascade insert that also creates relay-metadata junction records.
Source code in src/bigbrotr/core/brotr.py
insert_relay_metadata
async
¶
insert_relay_metadata(
records: list[RelayMetadata], *, cascade: bool = True
) -> int
Bulk-insert relay-metadata junction records.
Links relays to content-addressed metadata records. SHA-256 hashes are computed in Python for deterministic deduplication.
Parameters:
-
records(list[RelayMetadata]) –Validated RelayMetadata dataclass instances.
-
cascade(bool, default:True) –
Returns:
-
int–Number of new relay-metadata records inserted.
Raises:
-
PostgresError–On database errors.
-
ValueError–If the batch exceeds the configured maximum size from BatchConfig.
See Also
insert_metadata(): Insert metadata without relay associations. insert_relay(): Insert relays without metadata associations.
Source code in src/bigbrotr/core/brotr.py
delete_orphan_event
async
¶
Delete events that have no associated relay in the junction table.
Orphaned events occur when relays are deleted or events were
inserted without relay associations. Removing them reclaims
storage and maintains referential consistency. Calls the
orphan_event_delete stored procedure.
Returns:
-
int–Number of orphaned events deleted.
Raises:
-
PostgresError–On database errors.
See Also
delete_orphan_metadata(): Companion cleanup for orphaned metadata records.
Source code in src/bigbrotr/core/brotr.py
delete_orphan_metadata
async
¶
Delete metadata records that have no associated relay in the junction table.
Orphaned metadata occurs when all relay associations for a content-
addressed blob are removed (e.g., superseded NIP-11 or NIP-66 data).
Removing them reclaims storage. Calls the orphan_metadata_delete
stored procedure.
Returns:
-
int–Number of orphaned metadata records deleted.
Raises:
-
PostgresError–On database errors.
See Also
delete_orphan_event(): Companion cleanup for orphaned event records.
Source code in src/bigbrotr/core/brotr.py
upsert_service_state
async
¶
upsert_service_state(records: list[ServiceState]) -> int
Atomically upsert service state records using bulk array parameters.
Services use this to persist operational state (cursors, checkpoints,
candidates) across restarts. Each record is identified by the
composite key (service_name, state_type, state_key). Calls the
service_state_upsert stored procedure.
Parameters:
-
records(list[ServiceState]) –List of ServiceState dataclass instances.
Returns:
-
int–Number of records upserted.
See Also
get_service_state(): Retrieve persisted state records. delete_service_state(): Remove persisted state records.
Source code in src/bigbrotr/core/brotr.py
get_service_state
async
¶
get_service_state(
service_name: ServiceName,
state_type: ServiceStateType,
key: str | None = None,
) -> list[ServiceState]
Retrieve persisted service state records.
Calls the service_state_get stored procedure.
Parameters:
-
service_name(ServiceName) –Owning service name (e.g.
ServiceName.FINDER). -
state_type(ServiceStateType) –Category of state. See ServiceStateType for the canonical enum values.
-
key(str | None, default:None) –Specific record key, or
Noneto retrieve all records matching the service/type combination.
Returns:
-
list[ServiceState]–List of ServiceState
-
list[ServiceState]–instances reconstructed from the database rows.
See Also
upsert_service_state(): Persist state records. delete_service_state(): Remove state records.
Source code in src/bigbrotr/core/brotr.py
delete_service_state
async
¶
delete_service_state(
service_names: list[ServiceName],
state_types: list[ServiceStateType],
state_keys: list[str],
) -> int
Atomically delete service state records by composite key.
Calls the service_state_delete stored procedure with three
parallel arrays identifying the records to remove.
Parameters:
-
service_names(list[ServiceName]) –Service name for each record.
-
state_types(list[ServiceStateType]) –State type for each record.
-
state_keys(list[str]) –State key for each record.
Returns:
-
int–Number of records actually deleted.
See Also
upsert_service_state(): Persist state records. get_service_state(): Retrieve state records.
Source code in src/bigbrotr/core/brotr.py
refresh_materialized_view
async
¶
Refresh a materialized view concurrently (non-blocking).
Calls a stored procedure named {view_name}_refresh which
performs REFRESH MATERIALIZED VIEW CONCURRENTLY. The view
name is validated by
_call_procedure()
against a strict SQL identifier regex to prevent injection.
Parameters:
-
view_name(str) –Name of the materialized view to refresh (e.g.
"relay_metadata_latest","event_stats").
Raises:
-
ValueError–If the view name is not a valid SQL identifier.
Note
The timeout for refresh operations defaults to None
(infinite) via
TimeoutsConfig.refresh
because REFRESH MATERIALIZED VIEW CONCURRENTLY can take
several minutes on large tables with complex indexes.