pool

Async PostgreSQL connection pool built on asyncpg.

Manages a pool of database connections with configurable size limits, automatic retry with exponential backoff on connection failures, health-checked connection acquisition, and transactional context managers. Compatible with PgBouncer for connection multiplexing in containerized deployments.

All query methods (fetch(), fetchrow(), fetchval(), execute()) retry automatically on transient connection errors (InterfaceError, ConnectionDoesNotExistError) but do not retry on query-level errors such as syntax errors or constraint violations.
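The retry rule above can be illustrated with a minimal, self-contained sketch. It is not the library's internal retry helper, which is not shown on this page; `call_with_retry`, `TransientConnectionError`, and `QueryError` are hypothetical names that stand in for the real asyncpg exception classes.

```python
import asyncio


class TransientConnectionError(Exception):
    """Hypothetical stand-in for InterfaceError / ConnectionDoesNotExistError."""


class QueryError(Exception):
    """Hypothetical stand-in for a syntax error or constraint violation."""


async def call_with_retry(operation, max_attempts=3, delay=0.0):
    """Await operation(), retrying only on transient connection errors."""
    for attempt in range(max_attempts):
        try:
            return await operation()
        except TransientConnectionError:
            if attempt + 1 >= max_attempts:
                raise  # retries exhausted: surface the last error
            await asyncio.sleep(delay)
        # QueryError is not caught, so it propagates on the first attempt.


async def demo():
    calls = {"flaky": 0, "bad": 0}

    async def flaky():
        # Fails twice with a transient error, then succeeds.
        calls["flaky"] += 1
        if calls["flaky"] < 3:
            raise TransientConnectionError("connection dropped")
        return "ok"

    async def bad():
        # Always fails with a query-level error.
        calls["bad"] += 1
        raise QueryError("syntax error")

    result = await call_with_retry(flaky)
    try:
        await call_with_retry(bad)
    except QueryError:
        pass
    return result, calls


result, calls = asyncio.run(demo())
```

In this sketch the flaky operation is attempted three times, while the query-level failure is attempted once and re-raised unchanged.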

Examples:

pool = Pool.from_yaml("config.yaml")

async with pool:
    rows = await pool.fetch("SELECT * FROM relay LIMIT 10")

    async with pool.transaction() as conn:
        await conn.execute("INSERT INTO relay ...")

See Also

  • Brotr: High-level database facade that wraps this pool and exposes domain-specific insert/query methods.
  • PoolConfig: Aggregate configuration grouping all pool-related settings.

Classes

DatabaseConfig

Bases: BaseModel

PostgreSQL connection parameters.

The password is loaded from the environment variable named by password_env (default: DB_ADMIN_PASSWORD). It is never read from configuration files directly.

Warning

The password field is a SecretStr and will never appear in string representations or serialized output. Ensure the environment variable named by password_env is set before constructing this model.
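Because construction fails when the variable is unset, a deployment script might check for it up front. The helper below is a hypothetical preflight check, not part of the module; it only mirrors the lookup rule described above (use `password_env` if given, otherwise `DB_ADMIN_PASSWORD`).

```python
from __future__ import annotations

import os
from typing import Any, Mapping


def missing_password_env(
    database_cfg: Mapping[str, Any],
    environ: Mapping[str, str] = os.environ,
) -> str | None:
    """Return the name of the unset password variable, or None if all is well."""
    if "password" in database_cfg:
        # A password supplied directly skips the environment lookup.
        return None
    env_var = database_cfg.get("password_env", "DB_ADMIN_PASSWORD")
    # An empty value counts as unset, matching the validator's `if not value`.
    return None if environ.get(env_var) else env_var
```

A caller could log or abort on a non-None result before building the configuration model.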

See Also

PoolConfig: Parent configuration that embeds this model.

Functions
resolve_password classmethod
resolve_password(data: dict[str, Any]) -> dict[str, Any]

Resolve the database password from the environment variable.

Source code in src/bigbrotr/core/pool.py
@model_validator(mode="before")
@classmethod
def resolve_password(cls, data: dict[str, Any]) -> dict[str, Any]:
    """Resolve the database password from the environment variable."""
    if isinstance(data, dict) and "password" not in data:
        env_var = data.get("password_env", "DB_ADMIN_PASSWORD")  # pragma: allowlist secret
        value = os.getenv(env_var)
        if not value:
            raise ValueError(f"{env_var} environment variable not set")
        data["password"] = SecretStr(value)
    return data

LimitsConfig

Bases: BaseModel

Connection pool size and resource limits.

Controls the minimum and maximum number of connections maintained by the pool, the query count before a connection is recycled, and the idle timeout before an unused connection is closed.

Note

max_queries triggers connection recycling to prevent memory leaks in long-lived connections. The default of 50,000 queries balances connection reuse against the overhead of establishing new connections.

See Also

PoolConfig: Parent configuration that embeds this model.

Functions
validate_max_size classmethod
validate_max_size(v: int, info: ValidationInfo) -> int

Ensure max_size >= min_size.

Source code in src/bigbrotr/core/pool.py
@field_validator("max_size")
@classmethod
def validate_max_size(cls, v: int, info: ValidationInfo) -> int:
    """Ensure max_size >= min_size."""
    min_size = info.data.get("min_size", 2)
    if v < min_size:
        raise ValueError(f"max_size ({v}) must be >= min_size ({min_size})")
    return v

TimeoutsConfig

Bases: BaseModel

Timeout settings for pool operations (in seconds).

See Also

  • TimeoutsConfig (a separate, higher-level model that shares this class's name): Timeouts for query, batch, cleanup, and refresh operations.
  • PoolConfig: Parent configuration that embeds this model.

RetryConfig

Bases: BaseModel

Retry strategy for failed connection attempts.

Supports both exponential and linear backoff between retries.

Note

Exponential backoff (the default) doubles the delay each attempt: initial_delay * 2^attempt, capped at max_delay. Linear backoff increases linearly: initial_delay * (attempt + 1). Exponential backoff is preferred for production to reduce thundering-herd effects during database recovery.
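The two formulas can be written out as a short function. This is a sketch of the arithmetic only: the `exponential` flag is a hypothetical parameter name, and capping the linear variant at `max_delay` is an assumption (the note states the cap only for the exponential case).

```python
def retry_delay(
    attempt: int,
    initial_delay: float,
    max_delay: float,
    exponential: bool = True,
) -> float:
    """Delay in seconds before the retry that follows a zero-based attempt."""
    if exponential:
        delay = initial_delay * 2 ** attempt
    else:
        delay = initial_delay * (attempt + 1)
    # Assumption: both strategies are capped at max_delay.
    return min(delay, max_delay)


# With initial_delay=1.0 and max_delay=10.0:
exponential_delays = [retry_delay(a, 1.0, 10.0) for a in range(5)]
linear_delays = [retry_delay(a, 1.0, 10.0, exponential=False) for a in range(5)]
```

Here the exponential sequence is 1, 2, 4, 8, 10 seconds (the fifth value is capped), and the linear sequence is 1, 2, 3, 4, 5 seconds.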

See Also

PoolConfig: Parent configuration that embeds this model.

Functions
validate_max_delay classmethod
validate_max_delay(v: float, info: ValidationInfo) -> float

Ensure max_delay >= initial_delay.

Source code in src/bigbrotr/core/pool.py
@field_validator("max_delay")
@classmethod
def validate_max_delay(cls, v: float, info: ValidationInfo) -> float:
    """Ensure max_delay >= initial_delay."""
    initial_delay = info.data.get("initial_delay", 1.0)
    if v < initial_delay:
        raise ValueError(f"max_delay ({v}) must be >= initial_delay ({initial_delay})")
    return v

ServerSettingsConfig

Bases: BaseModel

PostgreSQL server-side session settings.

These are sent as server_settings when creating the asyncpg pool and apply to every connection in the pool.

Note

When using PgBouncer in transaction pooling mode, statement_timeout is stripped by ignore_startup_parameters and never reaches PostgreSQL. Set statement_timeout to 0 (the default) to avoid sending a parameter that has no effect, and use PgBouncer's query_timeout as the server-side safety net instead.

Timeout cascade (tightest wins):

  1. asyncpg client-side via TimeoutsConfig (query=60s, batch=120s, cleanup=90s).
  2. PgBouncer query_timeout (300s safety net for crashed clients).
  3. PostgreSQL statement_timeout in postgresql.conf (0 = disabled; handled by layers above).
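"Tightest wins" means the effective limit is the smallest enabled timeout among the layers. A minimal sketch, assuming that 0 or None marks a layer as disabled; `effective_timeout` is a hypothetical helper, not part of the module.

```python
from __future__ import annotations


def effective_timeout(*layers_seconds: float | None) -> float | None:
    """Return the smallest enabled timeout, or None if every layer is disabled."""
    enabled = [t for t in layers_seconds if t]  # drops 0 and None
    return min(enabled) if enabled else None


# Values from the cascade above: client query timeout, PgBouncer, PostgreSQL.
query_limit = effective_timeout(60, 300, 0)
# If the client passes no timeout, PgBouncer's safety net applies.
fallback_limit = effective_timeout(None, 300, 0)
```

With the listed values, a regular query is bounded by the 60-second client-side timeout, and the 300-second PgBouncer limit only matters when the client-side layer is absent.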

See Also

PoolConfig: Parent configuration that embeds this model.

PoolConfig

Bases: BaseModel

Aggregate configuration for the connection pool.

Groups all pool-related settings: database credentials, connection limits, timeouts, retry strategy, and PostgreSQL server settings.

See Also

  • DatabaseConfig: PostgreSQL connection credentials.
  • LimitsConfig: Min/max connections and recycling thresholds.
  • TimeoutsConfig: Acquisition and health-check timeouts.
  • RetryConfig: Backoff strategy for connection failures.
  • ServerSettingsConfig: PostgreSQL session-level settings.
  • Pool: The pool class that consumes this configuration.

Pool

Pool(config: PoolConfig | None = None)

Async PostgreSQL connection pool manager.

Wraps asyncpg.Pool to provide retry logic, health-checked connection acquisition, and transactional context managers. Connections are initialized with JSON/JSONB codecs for transparent dict serialization.

Supports two construction patterns: direct instantiation with a PoolConfig object, or factory methods from_yaml() / from_dict() for configuration-driven setup.

Examples:

pool = Pool.from_yaml("config.yaml")

async with pool:
    rows = await pool.fetch("SELECT * FROM event LIMIT 10")

    async with pool.transaction() as conn:
        await conn.execute("INSERT INTO ...")

Note

Services should never use Pool directly for domain operations. Instead, use Brotr which wraps a private Pool instance and provides typed insert/query methods. Pool is exposed for advanced use cases such as custom health checks or direct SQL access outside the stored-procedure layer.

See Also

  • Brotr: High-level database facade that wraps this pool.
  • PoolConfig: Full configuration model for this class.
  • DatabaseConfig: Connection credential subset of the configuration.

Initialize pool with optional configuration.

The pool is created in a disconnected state. Call connect() or use the async context manager to establish the connection.

Parameters:

  • config (PoolConfig | None, default: None ) –

    Pool configuration. If not provided, uses defaults which read DB_ADMIN_PASSWORD from the environment.

See Also

  • from_yaml(): Construct from a YAML file.
  • from_dict(): Construct from a dictionary.
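When the async context manager is not convenient, the connection can be managed explicitly with connect() and close(). The function below is a usage sketch: `count_relays` is a hypothetical helper, `pool` is any object exposing the Pool methods documented on this page, and the `relay` table is borrowed from the module-level example.

```python
async def count_relays(pool) -> int:
    """Connect, run one scalar query, and always release the pool."""
    await pool.connect()
    try:
        return await pool.fetchval("SELECT count(*) FROM relay")
    finally:
        # close() is documented as idempotent, so this is safe even if
        # the query above raised.
        await pool.close()
```

The try/finally mirrors what a context manager would do: the pool is closed whether the query succeeds or fails.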

Source code in src/bigbrotr/core/pool.py
def __init__(self, config: PoolConfig | None = None) -> None:
    """Initialize pool with optional configuration.

    The pool is created in a disconnected state. Call
    [connect()][bigbrotr.core.pool.Pool.connect] or use the async
    context manager to establish the connection.

    Args:
        config: Pool configuration. If not provided, uses defaults
            which read ``DB_ADMIN_PASSWORD`` from the environment.

    See Also:
        [from_yaml()][bigbrotr.core.pool.Pool.from_yaml]: Construct from
            a YAML file.
        [from_dict()][bigbrotr.core.pool.Pool.from_dict]: Construct from
            a dictionary.
    """
    self._config = config or PoolConfig()
    self._pool: asyncpg.Pool[asyncpg.Record] | None = None
    self._is_connected: bool = False
    self._connection_lock = asyncio.Lock()
    self._logger = Logger("pool")

Attributes
is_connected property
is_connected: bool

Whether the pool has an active connection to the database.

config property
config: PoolConfig

The pool configuration (read-only).

Functions
from_yaml classmethod
from_yaml(config_path: str) -> Pool

Create a Pool from a YAML configuration file.

Delegates to load_yaml() for safe YAML parsing, then to from_dict() for construction.

Parameters:

  • config_path (str) –

    Path to the YAML configuration file.

Returns:

  • Pool

    A configured Pool instance (not yet connected).

Raises:

  • FileNotFoundError

    If the configuration file does not exist.

See Also

from_dict(): Construct from a pre-parsed dictionary.

Source code in src/bigbrotr/core/pool.py
@classmethod
def from_yaml(cls, config_path: str) -> Pool:
    """Create a Pool from a YAML configuration file.

    Delegates to [load_yaml()][bigbrotr.core.yaml.load_yaml] for safe
    YAML parsing, then to
    [from_dict()][bigbrotr.core.pool.Pool.from_dict] for construction.

    Args:
        config_path: Path to the YAML configuration file.

    Returns:
        A configured Pool instance (not yet connected).

    Raises:
        FileNotFoundError: If the configuration file does not exist.

    See Also:
        [from_dict()][bigbrotr.core.pool.Pool.from_dict]: Construct from
            a pre-parsed dictionary.
    """
    return cls.from_dict(load_yaml(config_path))

from_dict classmethod
from_dict(config_dict: dict[str, Any]) -> Pool

Create a Pool from a configuration dictionary.

Parameters:

  • config_dict (dict[str, Any]) –

    Dictionary with pool settings matching PoolConfig field names.

Returns:

  • Pool

    A configured Pool instance (not yet connected).
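As an illustration, a dictionary for from_dict() might look like the sketch below. The section and field names are taken from the attributes that connect() reads in the source listing on this page; the values are illustrative placeholders rather than documented defaults, and `make_pool` is a hypothetical helper.

```python
config_dict = {
    "database": {
        "host": "localhost",
        "port": 5432,
        "database": "bigbrotr",  # illustrative database name
        "user": "admin",  # illustrative user
        # No "password" key: it is resolved from the environment variable
        # named by password_env.
        "password_env": "DB_ADMIN_PASSWORD",
    },
    "limits": {
        "min_size": 2,
        "max_size": 20,  # must be >= min_size
        "max_queries": 50_000,
        "max_inactive_connection_lifetime": 300.0,
    },
    "timeouts": {"acquisition": 10.0},
    "retry": {"max_attempts": 3, "initial_delay": 1.0, "max_delay": 10.0},
    "server_settings": {
        "application_name": "bigbrotr",
        "timezone": "UTC",
        "statement_timeout": 0,  # 0 = not sent to the server
    },
}


def make_pool():
    """Build a disconnected Pool; requires the bigbrotr package and the env var."""
    from bigbrotr.core.pool import Pool  # imported lazily so the dict is usable alone

    return Pool.from_dict(config_dict)
```

The same structure, written as YAML, is what from_yaml() would be expected to load before delegating to from_dict().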

Source code in src/bigbrotr/core/pool.py
@classmethod
def from_dict(cls, config_dict: dict[str, Any]) -> Pool:
    """Create a Pool from a configuration dictionary.

    Args:
        config_dict: Dictionary with pool settings matching
            [PoolConfig][bigbrotr.core.pool.PoolConfig] field names.

    Returns:
        A configured Pool instance (not yet connected).
    """
    config = PoolConfig(**config_dict)
    return cls(config=config)

connect async
connect() -> None

Create the asyncpg connection pool with retry on failure.

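Uses exponential or linear backoff (per RetryConfig) between attempts. Safe to call concurrently from coroutines on the same event loop: an internal asyncio.Lock prevents concurrent pool creation. Note that asyncio.Lock does not provide safety across OS threads.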

Raises:

  • ConnectionError

    If all retry attempts are exhausted.

Note

The retry strategy uses exponential backoff by default (initial_delay * 2^attempt, capped at max_delay) to avoid overwhelming a recovering database. Each new connection is initialized with JSON/JSONB codecs via _init_connection for transparent dict serialization.
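The lock-then-check pattern used by connect() can be demonstrated in isolation. The sketch below is a simplified stand-in, not the Pool class: `LazyResource` and `create_calls` are hypothetical names, and `asyncio.sleep(0)` takes the place of the real pool creation.

```python
import asyncio


class LazyResource:
    """Hypothetical stand-in that creates its resource at most once."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._connected = False
        self.create_calls = 0

    async def connect(self) -> None:
        async with self._lock:
            if self._connected:
                return  # another coroutine already connected
            self.create_calls += 1
            await asyncio.sleep(0)  # placeholder for the expensive creation step
            self._connected = True


async def main() -> int:
    resource = LazyResource()
    # Five coroutines race to connect; the lock serializes them.
    await asyncio.gather(*(resource.connect() for _ in range(5)))
    return resource.create_calls


create_calls = asyncio.run(main())
```

Checking the flag inside the lock is what makes repeated or concurrent connect() calls harmless: only the first caller performs the creation step.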

Source code in src/bigbrotr/core/pool.py
async def connect(self) -> None:
    """Create the asyncpg connection pool with retry on failure.

    Uses exponential or linear backoff (per
    [RetryConfig][bigbrotr.core.pool.RetryConfig]) between
    attempts. Coroutine-safe: guarded by an internal asyncio lock to prevent
    concurrent pool creation (asyncio locks are not thread-safe).

    Raises:
        ConnectionError: If all retry attempts are exhausted.

    Note:
        The retry strategy uses exponential backoff by default
        (``initial_delay * 2^attempt``, capped at ``max_delay``) to
        avoid overwhelming a recovering database. Each new connection
        is initialized with JSON/JSONB codecs via ``_init_connection``
        for transparent dict serialization.
    """
    async with self._connection_lock:
        if self._is_connected:
            return

        db = self._config.database

        self._logger.info(
            "connection_starting",
            host=db.host,
            port=db.port,
            database=db.database,
        )

        for attempt in range(self._config.retry.max_attempts):
            try:
                server_settings: dict[str, str] = {
                    "application_name": self._config.server_settings.application_name,
                    "timezone": self._config.server_settings.timezone,
                }
                if self._config.server_settings.statement_timeout > 0:
                    server_settings["statement_timeout"] = str(
                        self._config.server_settings.statement_timeout
                    )

                self._pool = await asyncpg.create_pool(
                    host=db.host,
                    port=db.port,
                    database=db.database,
                    user=db.user,
                    password=db.password.get_secret_value(),
                    min_size=self._config.limits.min_size,
                    max_size=self._config.limits.max_size,
                    max_queries=self._config.limits.max_queries,
                    max_inactive_connection_lifetime=self._config.limits.max_inactive_connection_lifetime,
                    timeout=self._config.timeouts.acquisition,
                    statement_cache_size=0,
                    init=_init_connection,
                    server_settings=server_settings,
                )
                self._is_connected = True
                self._logger.info("connection_established")
                return

            except (asyncpg.PostgresError, OSError, ConnectionError) as e:
                if attempt + 1 >= self._config.retry.max_attempts:
                    self._logger.error(
                        "connection_failed",
                        attempts=attempt + 1,
                        error=str(e),
                    )
                    raise ConnectionError(
                        f"Failed to connect after {attempt + 1} attempts: {e}"
                    ) from e

                delay = self._retry_delay(attempt)
                self._logger.warning(
                    "connection_retry",
                    attempt=attempt + 1,
                    delay=delay,
                    error=str(e),
                )
                await asyncio.sleep(delay)

close async
close() -> None

Close the pool and release all connections.

Idempotent: safe to call multiple times. Always resets internal state even if the underlying close raises an exception.

Source code in src/bigbrotr/core/pool.py
async def close(self) -> None:
    """Close the pool and release all connections.

    Idempotent: safe to call multiple times. Always resets internal
    state even if the underlying close raises an exception.
    """
    async with self._connection_lock:
        if self._pool is not None:
            try:
                await self._pool.close()
                self._logger.info("connection_closed")
            finally:
                self._pool = None
                self._is_connected = False

acquire
acquire() -> AbstractAsyncContextManager[
    Connection[Record]
]

Acquire a connection from the pool.

Returns an async context manager that yields a connection. The connection is automatically returned to the pool when the context exits.

Yields:

  • AbstractAsyncContextManager[Connection[Record]]

    An asyncpg connection from the pool.

Raises:

  • RuntimeError

    If the pool has not been connected yet.

Examples:

async with pool.acquire() as conn:
    result = await conn.fetch("SELECT * FROM event")

See Also

transaction(): Acquire with an active database transaction.

Source code in src/bigbrotr/core/pool.py
def acquire(self) -> AbstractAsyncContextManager[asyncpg.Connection[asyncpg.Record]]:
    """Acquire a connection from the pool.

    Returns an async context manager that yields a connection. The
    connection is automatically returned to the pool when the context
    exits.

    Yields:
        An asyncpg connection from the pool.

    Raises:
        RuntimeError: If the pool has not been connected yet.

    Examples:
        ```python
        async with pool.acquire() as conn:
            result = await conn.fetch("SELECT * FROM event")
        ```

    See Also:
        [transaction()][bigbrotr.core.pool.Pool.transaction]: Acquire
            with an active database transaction.
    """
    if not self._is_connected or self._pool is None:
        raise RuntimeError("Pool not connected. Call connect() first.")
    # asyncpg's PoolAcquireContext is duck-type compatible with AbstractAsyncContextManager
    return cast(
        "AbstractAsyncContextManager[asyncpg.Connection[asyncpg.Record]]",
        self._pool.acquire(),
    )

transaction async
transaction() -> AsyncIterator[Connection[Record]]

Acquire a connection with an active database transaction.

The transaction commits automatically on normal exit and rolls back if an exception propagates out of the context manager.

Yields:

  • AsyncIterator[Connection[Record]]

    An asyncpg connection with an active transaction. The transaction commits on normal exit and rolls back on exception.

Raises:

  • RuntimeError

    If the pool has not been connected yet.

  • PostgresError

    On database errors (triggers rollback).

Examples:

async with pool.transaction() as conn:
    await conn.execute("INSERT INTO event ...")
    await conn.execute("INSERT INTO relay ...")
    # Both succeed together or roll back on error.

See Also

  • acquire(): Acquire without a transaction (auto-commit mode).
  • Brotr.transaction(): Higher-level facade that delegates to this method.
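The commit-or-rollback behaviour can be simulated without a database. In the sketch below, `FakeConnection` is a hypothetical stand-in whose transaction() records BEGIN, COMMIT, and ROLLBACK in a log; the wrapper follows the same nested context manager shape as the method documented here.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeConnection:
    """Hypothetical connection that logs statements instead of running them."""

    def __init__(self) -> None:
        self.log = []

    @asynccontextmanager
    async def transaction(self):
        self.log.append("BEGIN")
        try:
            yield
        except Exception:
            self.log.append("ROLLBACK")
            raise
        else:
            self.log.append("COMMIT")

    async def execute(self, query: str) -> None:
        if query.startswith("bad"):
            raise ValueError("simulated database error")
        self.log.append(query)


@asynccontextmanager
async def transaction(conn):
    # Same shape as the real method: enter the transaction, yield the connection.
    async with conn.transaction():
        yield conn


async def run(queries):
    conn = FakeConnection()
    try:
        async with transaction(conn) as c:
            for query in queries:
                await c.execute(query)
    except ValueError:
        pass  # the error has already triggered the rollback
    return conn.log


committed = asyncio.run(run(["INSERT 1", "INSERT 2"]))
rolled_back = asyncio.run(run(["INSERT 1", "bad INSERT"]))
```

The first run ends with COMMIT; the second ends with ROLLBACK because the exception propagates out of the context manager. The log lists issued statements only, so the first insert still appears in the rolled-back run.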

Source code in src/bigbrotr/core/pool.py
@asynccontextmanager
async def transaction(self) -> AsyncIterator[asyncpg.Connection[asyncpg.Record]]:
    """Acquire a connection with an active database transaction.

    The transaction commits automatically on normal exit and rolls back
    if an exception propagates out of the context manager.

    Yields:
        An asyncpg connection with an active transaction. The
        transaction commits on normal exit and rolls back on exception.

    Raises:
        RuntimeError: If the pool has not been connected yet.
        asyncpg.PostgresError: On database errors (triggers rollback).

    Examples:
        ```python
        async with pool.transaction() as conn:
            await conn.execute("INSERT INTO event ...")
            await conn.execute("INSERT INTO relay ...")
            # Both succeed together or roll back on error.
        ```

    See Also:
        [acquire()][bigbrotr.core.pool.Pool.acquire]: Acquire without
            a transaction (auto-commit mode).
        [Brotr.transaction()][bigbrotr.core.brotr.Brotr.transaction]:
            Higher-level facade that delegates to this method.
    """
    async with self.acquire() as conn, conn.transaction():
        yield conn

fetch async
fetch(
    query: str, *args: Any, timeout: float | None = None
) -> list[Record]

Execute a query and return all matching rows.

Parameters:

  • query (str) –

    SQL query with $1, $2, ... placeholders.

  • *args (Any, default: () ) –

    Query parameters.

  • timeout (float | None, default: None ) –

    Query timeout in seconds (None = no timeout).

Returns:

  • list[Record]

    List of asyncpg Record objects. Empty list if no rows match.
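A short usage sketch with positional placeholders follows. The helper name `recent_relays` and the `discovered_at` column are hypothetical, chosen only to show how `$1` and `$2` map to the extra arguments; `pool` is any object exposing the fetch() signature above.

```python
async def recent_relays(pool, since: int, limit: int = 10):
    """Fetch relays discovered at or after `since`, passing values as parameters."""
    return await pool.fetch(
        # $1 and $2 are bound to the positional arguments that follow the query.
        "SELECT * FROM relay WHERE discovered_at >= $1 LIMIT $2",
        since,
        limit,
        timeout=5.0,
    )
```

Passing values as parameters rather than formatting them into the SQL string keeps the query safe from injection and lets the driver handle type conversion.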

Source code in src/bigbrotr/core/pool.py
async def fetch(
    self,
    query: str,
    *args: Any,
    timeout: float | None = None,  # noqa: ASYNC109
) -> list[asyncpg.Record]:
    """Execute a query and return all matching rows.

    Args:
        query: SQL query with $1, $2, ... placeholders.
        *args: Query parameters.
        timeout: Query timeout in seconds (None = no timeout).

    Returns:
        List of asyncpg Record objects. Empty list if no rows match.
    """
    result = await self._execute_with_retry("fetch", query, args, timeout)
    # Dynamic dispatch returns Any; narrow to the actual fetch() return type
    return cast("list[asyncpg.Record]", result)

fetchrow async
fetchrow(
    query: str, *args: Any, timeout: float | None = None
) -> Record | None

Execute a query and return the first row.

Parameters:

  • query (str) –

    SQL query with $1, $2, ... placeholders.

  • *args (Any, default: () ) –

    Query parameters.

  • timeout (float | None, default: None ) –

    Query timeout in seconds (None = no timeout).

Returns:

  • Record | None

    A single asyncpg Record, or None if the query returns no rows.

Source code in src/bigbrotr/core/pool.py
async def fetchrow(
    self,
    query: str,
    *args: Any,
    timeout: float | None = None,  # noqa: ASYNC109
) -> asyncpg.Record | None:
    """Execute a query and return the first row.

    Args:
        query: SQL query with $1, $2, ... placeholders.
        *args: Query parameters.
        timeout: Query timeout in seconds (None = no timeout).

    Returns:
        A single asyncpg Record, or None if the query returns no rows.
    """
    result = await self._execute_with_retry("fetchrow", query, args, timeout)
    # Dynamic dispatch returns Any; narrow to the actual fetchrow() return type
    return cast("asyncpg.Record | None", result)

fetchval async
fetchval(
    query: str,
    *args: Any,
    column: int = 0,
    timeout: float | None = None,
) -> Any

Execute a query and return a single scalar value.

Parameters:

  • query (str) –

    SQL query with $1, $2, ... placeholders.

  • *args (Any, default: () ) –

    Query parameters.

  • column (int, default: 0 ) –

    Zero-based column index to extract (default: 0).

  • timeout (float | None, default: None ) –

    Query timeout in seconds (None = no timeout).

Returns:

  • Any

    The value from the specified column of the first row, or None if the query returns no rows.

Source code in src/bigbrotr/core/pool.py
async def fetchval(
    self,
    query: str,
    *args: Any,
    column: int = 0,
    timeout: float | None = None,  # noqa: ASYNC109
) -> Any:
    """Execute a query and return a single scalar value.

    Args:
        query: SQL query with $1, $2, ... placeholders.
        *args: Query parameters.
        column: Zero-based column index to extract (default: 0).
        timeout: Query timeout in seconds (None = no timeout).

    Returns:
        The value from the specified column of the first row,
        or None if the query returns no rows.
    """
    return await self._execute_with_retry("fetchval", query, args, timeout, column=column)

execute async
execute(
    query: str, *args: Any, timeout: float | None = None
) -> str

Execute a query and return the command status tag.

Parameters:

  • query (str) –

    SQL query with $1, $2, ... placeholders.

  • *args (Any, default: () ) –

    Query parameters.

  • timeout (float | None, default: None ) –

    Query timeout in seconds (None = no timeout).

Returns:

  • str

    PostgreSQL command status string (e.g. "INSERT 0 1", "UPDATE 5").
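The status string can be parsed when the affected row count is needed. The helper below is a hypothetical convenience, not part of the module; it relies on the row count being the last token of tags such as "INSERT 0 1" and "UPDATE 5".

```python
from __future__ import annotations


def rows_affected(status: str) -> int | None:
    """Extract the trailing row count from a command status tag, if present."""
    last_token = status.rsplit(" ", 1)[-1]
    # Tags such as "CREATE TABLE" carry no count, so return None for them.
    return int(last_token) if last_token.isdigit() else None


inserted = rows_affected("INSERT 0 1")
updated = rows_affected("UPDATE 5")
no_count = rows_affected("CREATE TABLE")
```

For example, the result of `await pool.execute(...)` could be passed to this helper to log how many rows an UPDATE touched.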

Source code in src/bigbrotr/core/pool.py
async def execute(self, query: str, *args: Any, timeout: float | None = None) -> str:  # noqa: ASYNC109
    """Execute a query and return the command status tag.

    Args:
        query: SQL query with $1, $2, ... placeholders.
        *args: Query parameters.
        timeout: Query timeout in seconds (None = no timeout).

    Returns:
        PostgreSQL command status string (e.g. "INSERT 0 1", "UPDATE 5").
    """
    result = await self._execute_with_retry("execute", query, args, timeout)
    # Dynamic dispatch returns Any; narrow to the actual execute() return type
    return cast("str", result)

Functions