pool
¶
Async PostgreSQL connection pool built on asyncpg.
Manages a pool of database connections with configurable size limits, automatic retry with exponential backoff on connection failures, health-checked connection acquisition, and transactional context managers. Compatible with PgBouncer for connection multiplexing in containerized deployments.
All query methods (fetch(), fetchrow(), fetchval(), execute()) retry automatically on transient connection errors (InterfaceError, ConnectionDoesNotExistError) but do not retry on query-level errors such as syntax errors or constraint violations.
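The retry rule above can be illustrated with a minimal sketch. It is not the library's implementation: the exception classes are local stand-ins for the asyncpg types so the example runs without a database, and `run_with_retry`, `max_attempts`, and `delay` are hypothetical names.

```python
import asyncio


# Local stand-ins (assumptions) so the sketch runs without asyncpg installed.
class InterfaceError(Exception):
    """Stands in for asyncpg's InterfaceError (treated as transient)."""


class ConnectionDoesNotExistError(Exception):
    """Stands in for asyncpg's ConnectionDoesNotExistError (treated as transient)."""


class PostgresSyntaxError(Exception):
    """Stands in for a query-level error, which is never retried."""


TRANSIENT_ERRORS = (InterfaceError, ConnectionDoesNotExistError)


async def run_with_retry(operation, max_attempts=3, delay=0.0):
    """Await operation(), retrying only when it fails with a transient error."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except TRANSIENT_ERRORS:
            if attempt == max_attempts:
                raise  # Retries exhausted: surface the last transient error.
            await asyncio.sleep(delay)
        # Any other exception (for example a syntax error) propagates at once.
```

A query that fails twice with a transient error and then succeeds returns its result on the third attempt, while a query-level error is raised after a single attempt.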
Examples:
    pool = Pool.from_yaml("config.yaml")
    async with pool:
        rows = await pool.fetch("SELECT * FROM relay LIMIT 10")
        async with pool.transaction() as conn:
            await conn.execute("INSERT INTO relay ...")
See Also
- Brotr: High-level database facade that wraps this pool and exposes domain-specific insert/query methods.
- PoolConfig: Aggregate configuration grouping all pool-related settings.
Classes¶
DatabaseConfig
¶
Bases: BaseModel
PostgreSQL connection parameters.
The password is loaded from the environment variable named by
password_env (default: DB_ADMIN_PASSWORD). It is never read from
configuration files directly.
Warning
The password field is a SecretStr and will never appear in
string representations or serialized output. Ensure the environment
variable named by password_env is set before constructing this
model.
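The idea of reading the password only from the environment and keeping it out of string representations can be sketched with the standard library alone. This is an illustration under assumptions, not the model's actual code: `read_password_from_env` and `MaskedSecret` are hypothetical names, the error type is a guess, and the real field uses Pydantic's SecretStr.

```python
import os


def read_password_from_env(password_env: str = "DB_ADMIN_PASSWORD") -> str:
    """Return the password stored in the environment variable password_env."""
    password = os.environ.get(password_env)
    if not password:
        # Assumption: the real model may raise a different error type here.
        raise ValueError(f"environment variable {password_env} is not set")
    return password


class MaskedSecret:
    """Small stand-in for a secret wrapper that hides its value when printed."""

    def __init__(self, value: str) -> None:
        self._value = value

    def get_secret_value(self) -> str:
        # The plain value is only available through an explicit call.
        return self._value

    def __repr__(self) -> str:
        return "MaskedSecret('**********')"

    __str__ = __repr__
```

The explicit `get_secret_value()` accessor mirrors the way SecretStr exposes its content, so accidental logging of the object does not leak the password.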
See Also
PoolConfig: Parent configuration that embeds this model.
Functions¶
resolve_password
classmethod
¶
Resolve the database password from the environment variable.
Source code in src/bigbrotr/core/pool.py
LimitsConfig
¶
Bases: BaseModel
Connection pool size and resource limits.
Controls the minimum and maximum number of connections maintained by the pool, the query count before a connection is recycled, and the idle timeout before an unused connection is closed.
Note
max_queries triggers connection recycling to prevent memory leaks in
long-lived connections. The default of 50,000 queries balances connection
reuse against the overhead of establishing new connections.
See Also
PoolConfig: Parent configuration that embeds this model.
TimeoutsConfig
¶
Bases: BaseModel
Timeout settings for pool operations (in seconds).
See Also
- TimeoutsConfig: A separate, higher-level model of the same name covering query, batch, cleanup, and refresh timeouts (distinct from this class).
- PoolConfig: Parent configuration that embeds this model.
RetryConfig
¶
Bases: BaseModel
Retry strategy for failed connection attempts.
Supports both exponential and linear backoff between retries.
Note
Exponential backoff (the default) doubles the delay each attempt:
initial_delay * 2^attempt, capped at max_delay. Linear backoff
increases linearly: initial_delay * (attempt + 1). Exponential
backoff is preferred for production to reduce thundering-herd effects
during database recovery.
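The two formulas in the note can be written out as a small function. This is a sketch that follows the formulas as stated; `backoff_delay` and the `exponential` flag are hypothetical names, and because the note only mentions the max_delay cap for the exponential case, the linear branch is left uncapped here.

```python
def backoff_delay(
    attempt: int,
    initial_delay: float,
    max_delay: float,
    exponential: bool = True,
) -> float:
    """Return the delay in seconds before retry number attempt (zero-based)."""
    if exponential:
        # initial_delay * 2^attempt, capped at max_delay.
        return min(initial_delay * 2**attempt, max_delay)
    # Linear: initial_delay * (attempt + 1). Whether the real code also caps
    # this at max_delay is not stated in the note, so no cap is applied.
    return initial_delay * (attempt + 1)


# Delays for the first five attempts with initial_delay=1.0 and max_delay=10.0.
exponential_delays = [backoff_delay(a, 1.0, 10.0) for a in range(5)]
linear_delays = [backoff_delay(a, 1.0, 10.0, exponential=False) for a in range(5)]
```

With these inputs the exponential schedule is 1, 2, 4, 8, 10 seconds (the last value hits the cap) and the linear schedule is 1, 2, 3, 4, 5 seconds.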
See Also
PoolConfig: Parent configuration that embeds this model.
Functions¶
validate_max_delay
classmethod
¶
Ensure max_delay >= initial_delay.
Source code in src/bigbrotr/core/pool.py
ServerSettingsConfig
¶
Bases: BaseModel
PostgreSQL server-side session settings.
These are sent as server_settings when creating the asyncpg pool
and apply to every connection in the pool.
Note
When using PgBouncer in transaction pooling mode,
statement_timeout is stripped by ignore_startup_parameters
and never reaches PostgreSQL. Set to 0 (default) to avoid
sending a parameter that has no effect. Use PgBouncer's
query_timeout as the server-side safety net instead.
Timeout cascade (tightest wins):
- asyncpg client-side via TimeoutsConfig (query=60s, batch=120s, cleanup=90s).
- PgBouncer query_timeout (300s safety net for crashed clients).
- PostgreSQL statement_timeout in postgresql.conf (0 = disabled; handled by layers above).
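"Tightest wins" means the smallest enabled timeout across the layers is the one that actually fires. A minimal sketch of that rule, assuming 0 marks a disabled layer as in the list above; `effective_timeout` is a hypothetical helper, not part of the library.

```python
from typing import Optional


def effective_timeout(*layer_timeouts: float) -> Optional[float]:
    """Return the smallest enabled timeout in seconds, or None if all are disabled."""
    # A value of 0 means the layer is disabled and imposes no limit.
    enabled = [t for t in layer_timeouts if t > 0]
    return min(enabled) if enabled else None


# Layers for a regular query: asyncpg client (60s), PgBouncer (300s),
# PostgreSQL statement_timeout (0 = disabled).
query_limit = effective_timeout(60, 300, 0)

# Layers for a batch operation: asyncpg client (120s) replaces the 60s limit.
batch_limit = effective_timeout(120, 300, 0)
```

Under the values listed above, the client-side limit fires first in both cases, and PgBouncer's 300s limit only matters when the client is no longer there to enforce its own.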
See Also
PoolConfig: Parent configuration that embeds this model.
PoolConfig
¶
Bases: BaseModel
Aggregate configuration for the connection pool.
Groups all pool-related settings: database credentials, connection limits, timeouts, retry strategy, and PostgreSQL server settings.
See Also
- DatabaseConfig: PostgreSQL connection credentials.
- LimitsConfig: Min/max connections and recycling thresholds.
- TimeoutsConfig: Acquisition and health-check timeouts.
- RetryConfig: Backoff strategy for connection failures.
- ServerSettingsConfig: PostgreSQL session-level settings.
- Pool: The pool class that consumes this configuration.
Pool
¶
Pool(config: PoolConfig | None = None)
Async PostgreSQL connection pool manager.
Wraps asyncpg.Pool to provide retry logic, health-checked connection
acquisition, and transactional context managers. Connections are initialized
with JSON/JSONB codecs for transparent dict serialization.
Supports two construction patterns: direct instantiation with a PoolConfig object, or factory methods from_yaml() / from_dict() for configuration-driven setup.
Examples:
    pool = Pool.from_yaml("config.yaml")
    async with pool:
        rows = await pool.fetch("SELECT * FROM event LIMIT 10")
        async with pool.transaction() as conn:
            await conn.execute("INSERT INTO ...")
Note
Services should never use Pool directly for domain operations.
Instead, use Brotr which wraps a private
Pool instance and provides typed insert/query methods. Pool is
exposed for advanced use cases such as custom health checks or direct
SQL access outside the stored-procedure layer.
See Also
- Brotr: High-level database facade that wraps this pool.
- PoolConfig: Full configuration model for this class.
- DatabaseConfig: Connection credential subset of the configuration.
Initialize pool with optional configuration.
The pool is created in a disconnected state. Call connect() or use the async context manager to establish the connection.
Parameters:
- config (PoolConfig | None, default: None) – Pool configuration. If not provided, uses defaults which read DB_ADMIN_PASSWORD from the environment.
See Also
- from_yaml(): Construct from a YAML file.
- from_dict(): Construct from a dictionary.
Source code in src/bigbrotr/core/pool.py
Attributes¶
is_connected
property
¶
Whether the pool has an active connection to the database.
Functions¶
from_yaml
classmethod
¶
from_yaml(config_path: str) -> Pool
Create a Pool from a YAML configuration file.
Delegates to load_yaml() for safe YAML parsing, then to from_dict() for construction.
Parameters:
- config_path (str) – Path to the YAML configuration file.
Returns:
- Pool – A configured Pool instance (not yet connected).
Raises:
- FileNotFoundError – If the configuration file does not exist.
See Also
from_dict(): Construct from a pre-parsed dictionary.
Source code in src/bigbrotr/core/pool.py
from_dict
classmethod
¶
from_dict(config_dict: dict[str, Any]) -> Pool
Create a Pool from a configuration dictionary.
Parameters:
- config_dict (dict[str, Any]) – Dictionary with pool settings matching PoolConfig field names.
Returns:
- Pool – A configured Pool instance (not yet connected).
Source code in src/bigbrotr/core/pool.py
connect
async
¶
Create the asyncpg connection pool with retry on failure.
Uses exponential or linear backoff (per RetryConfig) between attempts. Concurrency-safe within a single event loop: an internal asyncio lock prevents concurrent pool creation.
Raises:
- ConnectionError – If all retry attempts are exhausted.
Note
The retry strategy uses exponential backoff by default
(initial_delay * 2^attempt, capped at max_delay) to
avoid overwhelming a recovering database. Each new connection
is initialized with JSON/JSONB codecs via _init_connection
for transparent dict serialization.
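The combination of a lock and a capped exponential backoff can be sketched as follows. This is an illustration of the pattern only: `RetryingConnector`, its constructor arguments, and the choice to treat OSError as the retryable failure are all assumptions, and the stand-in factory replaces the real asyncpg pool creation.

```python
import asyncio


class RetryingConnector:
    """Stand-in showing lock-guarded pool creation with exponential backoff."""

    def __init__(self, create_pool, max_attempts=3, initial_delay=0.1, max_delay=1.0):
        self._create_pool = create_pool  # Hypothetical async factory.
        self._max_attempts = max_attempts
        self._initial_delay = initial_delay
        self._max_delay = max_delay
        self._pool = None
        self._lock = asyncio.Lock()

    async def connect(self) -> None:
        # The lock ensures only one coroutine creates the pool at a time.
        async with self._lock:
            if self._pool is not None:
                return  # Another coroutine already connected.
            for attempt in range(self._max_attempts):
                try:
                    self._pool = await self._create_pool()
                    return
                except OSError as exc:  # Assumed retryable failure type.
                    if attempt == self._max_attempts - 1:
                        raise ConnectionError("all connection attempts failed") from exc
                    delay = min(self._initial_delay * 2**attempt, self._max_delay)
                    await asyncio.sleep(delay)
```

Two coroutines calling `connect()` at the same time result in a single creation sequence: the second waits on the lock and then returns immediately because the pool already exists.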
Source code in src/bigbrotr/core/pool.py
close
async
¶
Close the pool and release all connections.
Idempotent: safe to call multiple times. Always resets internal state even if the underlying close raises an exception.
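An idempotent close that always resets state is typically written with try/finally. The sketch below shows that pattern under assumptions: `ClosableWrapper` and its `_inner` attribute are hypothetical stand-ins, not the class's actual internals.

```python
class ClosableWrapper:
    """Stand-in showing an idempotent close that always resets state."""

    def __init__(self, inner) -> None:
        self._inner = inner  # Hypothetical underlying pool object.

    @property
    def is_connected(self) -> bool:
        return self._inner is not None

    async def close(self) -> None:
        if self._inner is None:
            return  # Already closed: calling again is a no-op.
        try:
            await self._inner.close()
        finally:
            # Runs even if the underlying close raised, so the wrapper
            # never keeps a reference to a half-closed pool.
            self._inner = None
```

If the underlying close raises, the exception still propagates to the caller, but a second call to `close()` returns without touching the old pool.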
Source code in src/bigbrotr/core/pool.py
acquire
¶
Acquire a connection from the pool.
Returns an async context manager that yields a connection. The connection is automatically returned to the pool when the context exits.
Yields:
- AbstractAsyncContextManager[Connection[Record]] – An asyncpg connection from the pool.
Raises:
- RuntimeError – If the pool has not been connected yet.
Examples:
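The original example for this method is not present on this page. As a substitute, the following self-contained sketch illustrates the documented behaviour (RuntimeError before connecting, connection handed back on exit) with a stand-in class; `TinyPool` and its attributes are hypothetical and only mimic the shape of `async with pool.acquire() as conn:`.

```python
import asyncio
from contextlib import asynccontextmanager


class TinyPool:
    """Stand-in pool that hands out connections from a free list."""

    def __init__(self, connections) -> None:
        self._free = list(connections)
        self._connected = False

    async def connect(self) -> None:
        self._connected = True

    @asynccontextmanager
    async def acquire(self):
        if not self._connected:
            raise RuntimeError("pool is not connected")
        conn = self._free.pop()
        try:
            yield conn
        finally:
            # The connection goes back to the pool even if the body raised.
            self._free.append(conn)


async def demo() -> str:
    pool = TinyPool(["conn-a"])
    await pool.connect()
    async with pool.acquire() as conn:
        return conn
```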
See Also
transaction(): Acquire with an active database transaction.
Source code in src/bigbrotr/core/pool.py
transaction
async
¶
Acquire a connection with an active database transaction.
The transaction commits automatically on normal exit and rolls back if an exception propagates out of the context manager.
Yields:
- AsyncIterator[Connection[Record]] – An asyncpg connection with an active transaction. The transaction commits on normal exit and rolls back on exception.
Raises:
- RuntimeError – If the pool has not been connected yet.
- PostgresError – On database errors (triggers rollback).
Examples:
    async with pool.transaction() as conn:
        await conn.execute("INSERT INTO event ...")
        await conn.execute("INSERT INTO relay ...")
        # Both succeed together or roll back on error.
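The commit-on-exit, rollback-on-exception behaviour can be sketched with a stand-in connection that records the statements it receives. This is an illustration of the pattern, not the method's implementation: `RecordingConnection` and the module-level `transaction` helper are hypothetical, and the explicit BEGIN/COMMIT/ROLLBACK statements replace whatever asyncpg does internally.

```python
import asyncio
from contextlib import asynccontextmanager


class RecordingConnection:
    """Stand-in connection that logs every statement it is asked to run."""

    def __init__(self) -> None:
        self.log = []

    async def execute(self, query: str) -> None:
        self.log.append(query)


@asynccontextmanager
async def transaction(conn):
    """Commit when the body finishes normally, roll back when it raises."""
    await conn.execute("BEGIN")
    try:
        yield conn
    except Exception:
        await conn.execute("ROLLBACK")
        raise  # The caller still sees the original error.
    else:
        await conn.execute("COMMIT")


async def demo():
    conn = RecordingConnection()
    async with transaction(conn) as tx:
        await tx.execute("INSERT 1")
        await tx.execute("INSERT 2")
    return conn.log
```

When the body raises partway through, the log ends with ROLLBACK instead of COMMIT and the exception propagates out of the `async with` block.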
See Also
- acquire(): Acquire without a transaction (auto-commit mode).
- Brotr.transaction(): Higher-level facade that delegates to this method.
Source code in src/bigbrotr/core/pool.py
fetch
async
¶
Execute a query and return all matching rows.
Parameters:
- query (str) – SQL query with $1, $2, ... placeholders.
- *args (Any, default: ()) – Query parameters.
- timeout (float | None, default: None) – Query timeout in seconds (None = no timeout).
Returns:
- list[Record] – List of asyncpg Record objects. Empty list if no rows match.
Source code in src/bigbrotr/core/pool.py
fetchrow
async
¶
Execute a query and return the first row.
Parameters:
- query (str) – SQL query with $1, $2, ... placeholders.
- *args (Any, default: ()) – Query parameters.
- timeout (float | None, default: None) – Query timeout in seconds (None = no timeout).
Returns:
- Record | None – A single asyncpg Record, or None if the query returns no rows.
Source code in src/bigbrotr/core/pool.py
fetchval
async
¶
Execute a query and return a single scalar value.
Parameters:
- query (str) – SQL query with $1, $2, ... placeholders.
- *args (Any, default: ()) – Query parameters.
- column (int, default: 0) – Zero-based column index to extract (default: 0).
- timeout (float | None, default: None) – Query timeout in seconds (None = no timeout).
Returns:
- Any – The value from the specified column of the first row, or None if the query returns no rows.
Source code in src/bigbrotr/core/pool.py
execute
async
¶
Execute a query and return the command status tag.
Parameters:
- query (str) – SQL query with $1, $2, ... placeholders.
- *args (Any, default: ()) – Query parameters.
- timeout (float | None, default: None) – Query timeout in seconds (None = no timeout).
Returns:
- str – PostgreSQL command status string (e.g. "INSERT 0 1", "UPDATE 5").
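Callers often want the affected-row count rather than the raw status string. A small sketch of one way to extract it, assuming the count is the last token of the tag as in the examples above; `affected_rows` is a hypothetical helper and not part of the class.

```python
from typing import Optional


def affected_rows(status: str) -> Optional[int]:
    """Return the trailing row count of a command status tag, if it has one."""
    parts = status.split()
    # Tags such as "INSERT 0 1" or "UPDATE 5" end with the row count;
    # tags such as "CREATE TABLE" carry no count at all.
    if parts and parts[-1].isdigit():
        return int(parts[-1])
    return None


inserted = affected_rows("INSERT 0 1")
updated = affected_rows("UPDATE 5")
no_count = affected_rows("CREATE TABLE")
```

For the two tags quoted in the Returns entry this yields 1 and 5, and a tag without a numeric suffix yields None.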