
catalog

Schema introspection and safe query builder for read-only database access.

Discovers PostgreSQL tables, views, and materialized views at runtime, then provides parameterized query building with whitelist-by-construction column/table validation. Shared by the API and DVM services.

The Catalog performs four discovery queries at startup:

  1. information_schema.tables for base tables and regular views.
  2. pg_catalog.pg_matviews for materialized views (not in information_schema).
  3. pg_attribute for column metadata of all discovered objects (including materialized views, which are absent from information_schema.columns).
  4. pg_constraint + pg_attribute for primary keys, plus pg_index for materialized-view unique indexes.
See Also

  • Api – REST API service that uses the Catalog for endpoint generation.

  • Dvm – NIP-90 DVM service that uses the Catalog for query execution.

Classes

CatalogError

CatalogError(message: str)

Bases: Exception

Client-safe error raised by Catalog operations.

Messages are always controlled literals or validated identifiers — never raw database error details.

The client_message attribute provides the sanitised string intended for HTTP/Nostr responses, avoiding str(exception), which static analysers flag as potential stack-trace exposure.

Source code in src/bigbrotr/services/common/catalog.py
def __init__(self, message: str) -> None:
    super().__init__(message)
    self.client_message: str = message
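
A minimal runnable sketch of the intended handling pattern, with CatalogError reproduced from the source above; the `to_http_body` handler shape is illustrative, not from the source:

```python
# CatalogError reproduced from the source above.
class CatalogError(Exception):
    def __init__(self, message: str) -> None:
        super().__init__(message)
        self.client_message: str = message


def to_http_body(exc: CatalogError) -> dict:
    # Return only the sanitised message, never str(exc) or a traceback.
    return {"error": exc.client_message}


err = CatalogError("Unknown column: foo")
assert to_http_body(err) == {"error": "Unknown column: foo"}
```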

ColumnSchema dataclass

ColumnSchema(name: str, pg_type: str, nullable: bool)

Schema information for a single column.

TableSchema dataclass

TableSchema(
    name: str,
    columns: tuple[ColumnSchema, ...],
    primary_key: tuple[str, ...],
    is_view: bool,
)

Schema information for a table, view, or materialized view.

QueryResult dataclass

QueryResult(
    rows: list[dict[str, Any]],
    total: int,
    limit: int,
    offset: int,
)

Result of a paginated query.

Catalog

Catalog()

Schema introspector and safe parameterized query builder.

Usage:

catalog = Catalog()
await catalog.discover(brotr)
result = await catalog.query(brotr, "relay_stats", limit=100, offset=0)
Source code in src/bigbrotr/services/common/catalog.py
def __init__(self) -> None:
    self._tables: dict[str, TableSchema] = {}
Attributes
tables property
tables: dict[str, TableSchema]

Discovered table schemas, keyed by name.
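
A sketch of the shape this mapping takes after discover(), using the dataclasses defined above. The relay_stats name comes from the usage example on this page; the column is illustrative:

```python
from dataclasses import dataclass


# ColumnSchema and TableSchema reproduced from this page.
@dataclass
class ColumnSchema:
    name: str
    pg_type: str
    nullable: bool


@dataclass
class TableSchema:
    name: str
    columns: tuple
    primary_key: tuple
    is_view: bool


# Illustrative entry; real contents depend on the discovered public schema.
schema = TableSchema(
    name="relay_stats",
    columns=(ColumnSchema("relay", "text", False),),
    primary_key=("relay",),
    is_view=True,
)
tables = {schema.name: schema}  # shape of Catalog.tables after discover()
assert tables["relay_stats"].primary_key == ("relay",)
```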

Functions
discover async
discover(brotr: Brotr) -> None

Introspect the public schema and populate table metadata.

Queries information_schema for tables/views, pg_matviews for materialized views, pg_attribute for column info, and pg_constraint/pg_index for primary keys and unique indexes.

Source code in src/bigbrotr/services/common/catalog.py
async def discover(self, brotr: Brotr) -> None:
    """Introspect the public schema and populate table metadata.

    Queries information_schema for tables/views, pg_matviews for
    materialized views, pg_attribute for column info, and
    pg_constraint/pg_index for primary keys and unique indexes.
    """
    base_table_names, view_names = await self._discover_table_and_view_names(brotr)
    matview_names = await self._discover_matview_names(brotr)
    all_names = base_table_names | view_names | matview_names

    if not all_names:
        logger.warning("no tables or views discovered in public schema")
        return

    columns_by_table = await self._discover_columns(brotr, all_names)
    pk_by_table = await self._discover_primary_keys(brotr, base_table_names)
    unique_by_matview = await self._discover_matview_unique_indexes(brotr, matview_names)

    tables: dict[str, TableSchema] = {}
    for name in sorted(all_names):
        cols = columns_by_table.get(name, ())
        if not cols:
            continue
        pk = pk_by_table.get(name, ()) or unique_by_matview.get(name, ())
        is_view = name in view_names or name in matview_names
        tables[name] = TableSchema(
            name=name,
            columns=tuple(cols),
            primary_key=tuple(pk),
            is_view=is_view,
        )

    self._tables = tables
    logger.info(
        "catalog_discovered tables=%d views=%d",
        sum(1 for t in tables.values() if not t.is_view),
        sum(1 for t in tables.values() if t.is_view),
    )
query async
query(
    brotr: Brotr,
    table: str,
    *,
    limit: int,
    offset: int,
    max_page_size: int = 1000,
    filters: dict[str, str] | None = None,
    sort: str | None = None,
) -> QueryResult

Execute a safe paginated query against a discovered table.

Parameters:

  • brotr (Brotr) –

    Database interface.

  • table (str) –

    Table or view name (must exist in discovered schema).

  • limit (int) –

    Maximum rows to return.

  • offset (int) –

    Number of rows to skip.

  • max_page_size (int, default: 1000 ) –

    Hard ceiling on limit.

  • filters (dict[str, str] | None, default: None ) –

    Column filters as {column: "op:value"} or {column: "value"} (defaults to =).

  • sort (str | None, default: None ) –

    Sort specification as "column" or "column:desc".

Returns:

  • QueryResult –

    Paginated query result.

Raises:

  • CatalogError

    If the table, column, or operator is invalid.

Source code in src/bigbrotr/services/common/catalog.py
async def query(  # noqa: PLR0913
    self,
    brotr: Brotr,
    table: str,
    *,
    limit: int,
    offset: int,
    max_page_size: int = 1000,
    filters: dict[str, str] | None = None,
    sort: str | None = None,
) -> QueryResult:
    """Execute a safe paginated query against a discovered table.

    Args:
        brotr: Database interface.
        table: Table or view name (must exist in discovered schema).
        limit: Maximum rows to return.
        offset: Number of rows to skip.
        max_page_size: Hard ceiling on limit.
        filters: Column filters as ``{column: "op:value"}`` or
            ``{column: "value"}`` (defaults to ``=``).
        sort: Sort specification as ``"column"`` or ``"column:desc"``.

    Returns:
        Paginated query result.

    Raises:
        CatalogError: If the table, column, or operator is invalid.
    """
    schema = self._get_schema(table)
    limit = min(max(limit, 1), max_page_size)
    offset = min(max(offset, 0), _MAX_OFFSET)

    col_names = {c.name for c in schema.columns}
    col_types = {c.name: c.pg_type for c in schema.columns}

    # Build SELECT columns with type transforms
    select_cols = self._build_select_columns(schema.columns)

    # Build WHERE clause
    where_clauses: list[str] = []
    params: list[Any] = []
    param_idx = 1

    if filters:
        for col, raw_value in filters.items():
            if col not in col_names:
                raise CatalogError(f"Unknown column: {col}")
            op, value = self._parse_filter(raw_value)
            cast = self._param_cast(col_types[col])
            where_clauses.append(f"{col} {op} ${param_idx}{cast}")
            if col_types[col] in _BYTEA_TYPES:
                try:
                    params.append(bytes.fromhex(value))
                except ValueError as e:
                    raise CatalogError(f"Invalid hex value for column {col}: {value}") from e
            else:
                params.append(value)
            param_idx += 1

    where_sql = (" WHERE " + " AND ".join(where_clauses)) if where_clauses else ""

    # Build ORDER BY
    order_sql = ""
    if sort:
        order_col, order_dir = self._parse_sort(sort)
        if order_col not in col_names:
            raise CatalogError(f"Unknown sort column: {order_col}")
        order_sql = f" ORDER BY {order_col} {order_dir}"

    # Count query + data query (wrapped to convert DB type errors to ValueError)
    count_query = f"SELECT COUNT(*)::int FROM {table}{where_sql}"  # noqa: S608
    data_query = (
        f"SELECT {select_cols} FROM {table}{where_sql}{order_sql}"  # noqa: S608
        f" LIMIT ${param_idx} OFFSET ${param_idx + 1}"
    )

    try:
        total: int = await brotr.fetchval(count_query, *params)
        params.extend([limit, offset])
        rows = await brotr.fetch(data_query, *params)
    except asyncpg.DataError as e:
        raise CatalogError("Invalid filter value") from e

    return QueryResult(
        rows=[dict(row) for row in rows],
        total=total,
        limit=limit,
        offset=offset,
    )
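
The `_parse_filter` and `_parse_sort` helpers are not reproduced on this page. As a hedged standalone sketch of the documented string formats, where only the `=` default and the `"column[:desc]"` sort form are stated above (the operator token set `_OPS` is hypothetical):

```python
# Hypothetical operator tokens; the real set is not shown on this page.
_OPS = {"eq": "=", "lt": "<", "lte": "<=", "gt": ">", "gte": ">="}


def parse_filter(raw: str) -> tuple[str, str]:
    """Split "op:value" into (SQL operator, value); plain values mean "="."""
    op_token, sep, value = raw.partition(":")
    if sep and op_token in _OPS:
        return _OPS[op_token], value
    return "=", raw  # documented default: bare value means equality


def parse_sort(raw: str) -> tuple[str, str]:
    """Split "column" or "column:desc" into (column, direction)."""
    col, _, direction = raw.partition(":")
    return col, "DESC" if direction.lower() == "desc" else "ASC"


assert parse_filter("42") == ("=", "42")
assert parse_filter("gte:42") == (">=", "42")
assert parse_sort("seen_at:desc") == ("seen_at", "DESC")
```

Note that values containing colons but no known operator token (e.g. a `wss://` URL) fall through to the equality default, which matches the documented `{column: "value"}` form.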
get_by_pk async
get_by_pk(
    brotr: Brotr, table: str, pk_values: dict[str, str]
) -> dict[str, Any] | None

Fetch a single row by primary key.

Parameters:

  • brotr (Brotr) –

    Database interface.

  • table (str) –

    Table name.

  • pk_values (dict[str, str]) –

    Primary key column-value pairs.

Returns:

  • dict[str, Any] | None

    Row as a dict, or None if not found.

Raises:

  • CatalogError

    If the table has no primary key or values are missing.

Source code in src/bigbrotr/services/common/catalog.py
async def get_by_pk(
    self,
    brotr: Brotr,
    table: str,
    pk_values: dict[str, str],
) -> dict[str, Any] | None:
    """Fetch a single row by primary key.

    Args:
        brotr: Database interface.
        table: Table name.
        pk_values: Primary key column-value pairs.

    Returns:
        Row as a dict, or None if not found.

    Raises:
        CatalogError: If the table has no primary key or values are missing.
    """
    schema = self._get_schema(table)
    if not schema.primary_key:
        raise CatalogError(f"Table {table} has no primary key")

    col_types = {c.name: c.pg_type for c in schema.columns}
    select_cols = self._build_select_columns(schema.columns)

    where_parts: list[str] = []
    params: list[Any] = []
    for i, pk_col in enumerate(schema.primary_key, 1):
        if pk_col not in pk_values:
            raise CatalogError(f"Missing primary key column: {pk_col}")
        cast = self._param_cast(col_types[pk_col])
        where_parts.append(f"{pk_col} = ${i}{cast}")
        value = pk_values[pk_col]
        if col_types[pk_col] in _BYTEA_TYPES:
            try:
                params.append(bytes.fromhex(value))
            except ValueError as e:
                raise CatalogError(f"Invalid hex value for column {pk_col}: {value}") from e
        else:
            params.append(value)

    where_sql = " AND ".join(where_parts)
    query = f"SELECT {select_cols} FROM {table} WHERE {where_sql}"  # noqa: S608

    try:
        row = await brotr.fetchrow(query, *params)
    except asyncpg.DataError as e:
        raise CatalogError("Invalid parameter value") from e

    return dict(row) if row else None
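
The source above converts hex strings to bytes for bytea primary-key columns via bytes.fromhex, raising a client-safe CatalogError on malformed input. A minimal standalone illustration of that conversion (simplified to raise ValueError rather than CatalogError):

```python
def pk_param(value: str) -> bytes:
    """Convert a hex-encoded primary-key value to bytes for a bytea column."""
    try:
        return bytes.fromhex(value)
    except ValueError as e:
        # In the real code this is wrapped in a client-safe CatalogError.
        raise ValueError(f"Invalid hex value: {value}") from e


assert pk_param("deadbeef") == b"\xde\xad\xbe\xef"
```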