Skip to content

mixins

mixins

Reusable service mixins for BigBrotr.

All service extensions live here as mixin classes. Each mixin uses cooperative multiple inheritance (super().__init__(**kwargs)) so that initialization is handled automatically via the MRO — no explicit _init_*() calls are needed in service constructors.

See Also

BaseService: The base class that mixin classes are composed with via multiple inheritance. NetworksConfig: Provides max_tasks values consumed by NetworkSemaphoresMixin.

Classes

NetworkSemaphores

NetworkSemaphores(networks: NetworksConfig)

Per-network concurrency semaphores.

Creates an asyncio.Semaphore for each operational NetworkType (clearnet, Tor, I2P, Lokinet) to cap the number of simultaneous connections.

See Also

NetworksConfig: Provides max_tasks per network type.

Source code in src/bigbrotr/services/common/mixins.py
def __init__(self, networks: NetworksConfig) -> None:
    self._map: dict[NetworkType, asyncio.Semaphore] = {
        nt: asyncio.Semaphore(networks.get(nt).max_tasks) for nt in OPERATIONAL_NETWORKS
    }
Functions
get
get(network: NetworkType) -> Semaphore | None

Look up the concurrency semaphore for a network type.

Returns:

  • Semaphore | None

    The semaphore, or None for non-operational networks

  • Semaphore | None

    (LOCAL, UNKNOWN).

Source code in src/bigbrotr/services/common/mixins.py
def get(self, network: NetworkType) -> asyncio.Semaphore | None:
    """Look up the concurrency semaphore for a network type.

    Returns:
        The semaphore, or ``None`` for non-operational networks
        (LOCAL, UNKNOWN).
    """
    return self._map.get(network)

NetworkSemaphoresMixin

NetworkSemaphoresMixin(**kwargs: Any)

Mixin providing per-network concurrency semaphores.

Exposes a network_semaphores attribute of type NetworkSemaphores, initialized from the networks keyword argument.

Services must pass networks=config.networks in their super().__init__() call.

See Also

Validator, Monitor, Synchronizer: Services that compose this mixin for bounded concurrency.

Source code in src/bigbrotr/services/common/mixins.py
def __init__(self, **kwargs: Any) -> None:
    networks: NetworksConfig = kwargs.pop("networks")
    super().__init__(**kwargs)
    self.network_semaphores = NetworkSemaphores(networks)

ConcurrentStreamMixin

Mixin providing concurrent item processing with streaming results.

Adds _iter_concurrent() which launches asyncio.TaskGroup tasks and streams results through an asyncio.Queue bridge as each worker completes — enabling progressive metric updates instead of waiting for all items to finish.

See Also

Finder, Synchronizer, Monitor, Validator: Services that compose this mixin.

GeoReaders

GeoReaders()

GeoIP database reader container for city and ASN lookups.

Manages the lifecycle of geoip2.database.Reader instances. Reader initialization is offloaded to a thread via open() to avoid blocking the event loop.

Attributes:

  • city (Reader | None) –

    GeoLite2-City reader for geolocation lookups, or None.

  • asn (Reader | None) –

    GeoLite2-ASN reader for network info lookups, or None.

See Also

GeoReaderMixin: Mixin that exposes a geo_readers attribute of this type.

Source code in src/bigbrotr/services/common/mixins.py
def __init__(self) -> None:
    self.city: geoip2.database.Reader | None = None
    self.asn: geoip2.database.Reader | None = None
Functions
open async
open(
    *,
    city_path: str | None = None,
    asn_path: str | None = None,
) -> None

Open GeoIP readers from file paths via asyncio.to_thread.

Parameters:

  • city_path (str | None, default: None ) –

    Path to GeoLite2-City database. None to skip.

  • asn_path (str | None, default: None ) –

    Path to GeoLite2-ASN database. None to skip.

Source code in src/bigbrotr/services/common/mixins.py
async def open(
    self,
    *,
    city_path: str | None = None,
    asn_path: str | None = None,
) -> None:
    """Open GeoIP readers from file paths via ``asyncio.to_thread``.

    Args:
        city_path: Path to GeoLite2-City database. ``None`` to skip.
        asn_path: Path to GeoLite2-ASN database. ``None`` to skip.
    """
    import geoip2.database as geoip2_db  # noqa: PLC0415  # runtime import

    if city_path:
        self.city = await asyncio.to_thread(geoip2_db.Reader, city_path)
    if asn_path:
        self.asn = await asyncio.to_thread(geoip2_db.Reader, asn_path)
close
close() -> None

Close readers and set to None. Idempotent.

Source code in src/bigbrotr/services/common/mixins.py
def close(self) -> None:
    """Close readers and set to ``None``. Idempotent."""
    if self.city:
        self.city.close()
        self.city = None
    if self.asn:
        self.asn.close()
        self.asn = None

GeoReaderMixin

GeoReaderMixin(**kwargs: Any)

Mixin providing GeoIP database reader lifecycle management.

Exposes a geo_readers attribute of type GeoReaders.

Note

Call geo_readers.close() in a finally block or __aexit__.

See Also

Monitor: The service that composes this mixin for NIP-66 geo/net checks.

Source code in src/bigbrotr/services/common/mixins.py
def __init__(self, **kwargs: Any) -> None:
    super().__init__(**kwargs)
    self.geo_readers = GeoReaders()

Clients

Clients(
    keys: Keys,
    networks: NetworksConfig,
    *,
    allow_insecure: bool = False,
)

Lazy pool of Nostr clients for event broadcasting.

Each relay is connected on first access via get() and cached for subsequent calls. Failed connections are remembered so that the same relay is never retried within a cycle.

Parameters:

  • keys (Keys) –

    Signing keys for event publishing.

  • networks (NetworksConfig) –

    Network configuration for proxy URL resolution and per-network connection timeouts.

  • allow_insecure (bool, default: False ) –

    If True, fall back to insecure transport on SSL failure.

See Also

ClientsMixin: Mixin that exposes a clients attribute of this type. connect_relay: Used internally to establish each connection.

Source code in src/bigbrotr/services/common/mixins.py
def __init__(
    self,
    keys: Keys,
    networks: NetworksConfig,
    *,
    allow_insecure: bool = False,
) -> None:
    self._keys = keys
    self._networks = networks
    self._allow_insecure = allow_insecure
    self._clients: dict[str, Client] = {}
    self._failed: set[str] = set()
Functions
get async
get(relay: Relay) -> Client | None

Return a connected client for a relay, connecting lazily.

Three states per relay URL:

  • Unknown — connect now, cache on success, mark failed otherwise.
  • Connected — return the cached client immediately.
  • Failed — return None without retrying.

Parameters:

  • relay (Relay) –

    Relay to get a client for.

Returns:

  • Client | None

    Connected client, or None if the connection failed.

Source code in src/bigbrotr/services/common/mixins.py
async def get(self, relay: Relay) -> Client | None:
    """Return a connected client for a relay, connecting lazily.

    Three states per relay URL:

    - **Unknown** — connect now, cache on success, mark failed otherwise.
    - **Connected** — return the cached client immediately.
    - **Failed** — return ``None`` without retrying.

    Args:
        relay: Relay to get a client for.

    Returns:
        Connected client, or ``None`` if the connection failed.
    """
    if relay.url in self._clients:
        return self._clients[relay.url]
    if relay.url in self._failed:
        return None

    from bigbrotr.utils.protocol import connect_relay  # noqa: PLC0415

    proxy_url = self._networks.get_proxy_url(relay.network)
    timeout = self._networks.get(relay.network).timeout
    try:
        client = await connect_relay(
            relay,
            keys=self._keys,
            proxy_url=proxy_url,
            timeout=timeout,
            allow_insecure=self._allow_insecure,
        )
        self._clients[relay.url] = client
        return client
    except (OSError, TimeoutError) as e:
        logger.warning("connect_client_failed relay=%s error=%s", relay.url, e)
        self._failed.add(relay.url)
        return None
get_many async
get_many(relays: list[Relay]) -> list[Client]

Return connected clients for multiple relays.

Calls get() for each relay and filters out failures.

Parameters:

  • relays (list[Relay]) –

    Relays to get clients for.

Returns:

  • list[Client]

    Connected clients (order preserved, failed relays skipped).

Source code in src/bigbrotr/services/common/mixins.py
async def get_many(self, relays: list[Relay]) -> list[Client]:
    """Return connected clients for multiple relays.

    Calls ``get()`` for each relay and filters out failures.

    Args:
        relays: Relays to get clients for.

    Returns:
        Connected clients (order preserved, failed relays skipped).
    """
    clients: list[Client] = []
    for relay in relays:
        client = await self.get(relay)
        if client is not None:
            clients.append(client)
    return clients
disconnect async
disconnect() -> None

Disconnect all clients and reset state.

Source code in src/bigbrotr/services/common/mixins.py
async def disconnect(self) -> None:
    """Disconnect all clients and reset state."""
    from bigbrotr.utils.protocol import shutdown_client  # noqa: PLC0415

    for client in self._clients.values():
        try:
            await shutdown_client(client)
        except (OSError, RuntimeError, TimeoutError) as e:
            logger.debug("client_shutdown_error error=%s", e)
    self._clients.clear()
    self._failed.clear()

ClientsMixin

ClientsMixin(**kwargs: Any)

Mixin providing managed Nostr client pool for event broadcasting.

Exposes a clients attribute of type Clients. Pops a pre-constructed clients instance from kwargs.

See Also

Monitor: The service that composes this mixin for Kind 0/10166/30166 event publishing.

Source code in src/bigbrotr/services/common/mixins.py
def __init__(self, **kwargs: Any) -> None:
    self.clients = kwargs.pop("clients")
    super().__init__(**kwargs)

CatalogAccessMixin

CatalogAccessMixin(**kwargs: Any)

Mixin providing schema catalog initialization and discovery.

Creates a Catalog instance during __init__ and discovers the database schema during __aenter__. Also provides a _is_table_enabled() helper that checks table access policy from the service config.

Services must compose this mixin with BaseService (which provides _brotr, _logger, and _config).

See Also

Api, Dvm: Services that compose this mixin. Catalog: Schema introspection and query execution.

Source code in src/bigbrotr/services/common/mixins.py
def __init__(self, **kwargs: Any) -> None:
    super().__init__(**kwargs)
    from .catalog import Catalog  # noqa: PLC0415

    self._catalog = Catalog()