mixins

Reusable service mixins for BigBrotr.

All service extensions live here as mixin classes. Each mixin uses cooperative multiple inheritance: its __init__ accepts **kwargs and calls super().__init__(**kwargs), so initialization runs automatically along the MRO and service constructors need no explicit _init_*() calls.
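
The cooperative pattern described above can be sketched with stand-in classes. Every name here (BaseStub, TimingMixin, LabelMixin, DemoService) is hypothetical and is not BigBrotr code; the sketch only illustrates how super().__init__(**kwargs) walks the MRO so that each mixin is initialized without an explicit call.

```python
from typing import Any

init_order: list = []  # records which __init__ bodies finished, in order


class BaseStub:
    """Hypothetical stand-in for BaseService."""

    def __init__(self, *, config: str) -> None:
        self.config = config
        init_order.append("BaseStub")


class TimingMixin:
    """Hypothetical mixin: forwards all kwargs, then sets its own attribute."""

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)  # next class in the MRO, not necessarily a parent
        self.timings = []
        init_order.append("TimingMixin")


class LabelMixin:
    """Second hypothetical mixin following the same convention."""

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.labels = {}
        init_order.append("LabelMixin")


class DemoService(TimingMixin, LabelMixin, BaseStub):
    def __init__(self, config: str) -> None:
        # One super() call initializes both mixins and the base class.
        super().__init__(config=config)


service = DemoService("demo")
mro_names = [cls.__name__ for cls in DemoService.__mro__]
```

Because each __init__ calls super() before setting its own attributes, the base class finishes first and the leftmost mixin finishes last.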

See Also

BaseService: The base class that mixin classes are composed with via multiple inheritance.

NetworksConfig: Provides max_tasks values consumed by NetworkSemaphoresMixin.

Classes

ChunkProgress dataclass

ChunkProgress(
    started_at: float = 0.0,
    _monotonic_start: float = 0.0,
    total: int = 0,
    processed: int = 0,
    succeeded: int = 0,
    failed: int = 0,
    chunks: int = 0,
)

Tracks progress of a chunk-based processing cycle.

All counters are reset at the start of each cycle via reset(). Use record() after processing each chunk to update counters.

Attributes:

  • started_at (float) –

    Timestamp when the cycle started (time.time()).

  • total (int) –

    Total items to process in this cycle.

  • processed (int) –

    Items processed so far.

  • succeeded (int) –

    Items that succeeded.

  • failed (int) –

    Items that failed.

  • chunks (int) –

    Number of chunks completed.

Note

started_at uses time.time() for Unix timestamps (used in SQL comparisons), while elapsed uses time.monotonic() for accurate duration measurement unaffected by clock adjustments.
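
A minimal sketch of the two-clock approach in the note above, using only the standard library. The variable names are illustrative, and the rounding follows the documented behaviour of elapsed rather than its actual source, which is not shown on this page.

```python
import time

# Wall-clock Unix timestamp: comparable with timestamps stored elsewhere
# (for example in SQL), but it can jump if the system clock is adjusted.
started_at = time.time()

# Monotonic reference point: meaningless as a date, but never goes backwards,
# so differences between two readings are reliable durations.
monotonic_start = time.monotonic()

time.sleep(0.05)  # stand-in for real work

elapsed = round(time.monotonic() - monotonic_start, 1)
```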

See Also

ChunkProgressMixin: Mixin that exposes a chunk_progress attribute of this type.

Attributes
remaining property
remaining: int

Number of items left to process.

elapsed property
elapsed: float

Seconds elapsed since processing started, rounded to 1 decimal.

Functions
reset
reset() -> None

Reset all counters and set started_at to the current time.

Source code in src/bigbrotr/services/common/mixins.py
def reset(self) -> None:
    """Reset all counters and set ``started_at`` to the current time."""
    self.started_at = time.time()
    self._monotonic_start = time.monotonic()
    self.total = 0
    self.processed = 0
    self.succeeded = 0
    self.failed = 0
    self.chunks = 0
record
record(succeeded: int, failed: int) -> None

Record the results of one processed chunk.

Parameters:

  • succeeded (int) –

    Number of items that succeeded in this chunk.

  • failed (int) –

    Number of items that failed in this chunk.

Source code in src/bigbrotr/services/common/mixins.py
def record(self, succeeded: int, failed: int) -> None:
    """Record the results of one processed chunk.

    Args:
        succeeded: Number of items that succeeded in this chunk.
        failed: Number of items that failed in this chunk.
    """
    self.processed += succeeded + failed
    self.succeeded += succeeded
    self.failed += failed
    self.chunks += 1
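
To see how the counters evolve, here is a small sketch built on a stand-in dataclass. ProgressStub is hypothetical: it copies the counter fields and the record() body shown above, omits the timing fields, and assumes that remaining is computed as total minus processed, since the source of that property is not shown on this page.

```python
from dataclasses import dataclass


@dataclass
class ProgressStub:
    """Stand-in with the same counters as ChunkProgress (timing fields omitted)."""

    total: int = 0
    processed: int = 0
    succeeded: int = 0
    failed: int = 0
    chunks: int = 0

    def record(self, succeeded: int, failed: int) -> None:
        # Same arithmetic as the record() method documented above.
        self.processed += succeeded + failed
        self.succeeded += succeeded
        self.failed += failed
        self.chunks += 1

    @property
    def remaining(self) -> int:
        # Assumption: the real property is not shown; this is the obvious reading.
        return self.total - self.processed


progress = ProgressStub(total=10)  # the caller sets total; record() never changes it

progress.record(succeeded=4, failed=1)
after_first = (progress.processed, progress.remaining, progress.chunks)

progress.record(succeeded=3, failed=2)
after_second = (progress.processed, progress.remaining, progress.chunks)
```

After the first chunk the tuple is (5, 5, 1); after the second it is (10, 0, 2), with 7 successes and 3 failures overall.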

ChunkProgressMixin

ChunkProgressMixin(**kwargs: Any)

Mixin providing chunk-based processing progress tracking.

Services that process items in chunks compose this mixin to get a chunk_progress attribute with counters and timing. The attribute is created in the mixin's __init__, so no extra setup call is needed.

See Also

ChunkProgress: The dataclass this mixin manages.

Validator, Monitor: Services that compose this mixin.

Examples:

class MyService(ChunkProgressMixin, BaseService[MyConfig]):
    async def run(self):
        self.chunk_progress.reset()
        ...
        self.chunk_progress.record(succeeded=len(ok), failed=len(err))
Source code in src/bigbrotr/services/common/mixins.py
def __init__(self, **kwargs: Any) -> None:
    super().__init__(**kwargs)
    self.chunk_progress = ChunkProgress()

NetworkSemaphores

NetworkSemaphores(networks: NetworksConfig)

Per-network concurrency semaphores.

Creates an asyncio.Semaphore for each operational NetworkType (clearnet, Tor, I2P, Lokinet) to cap the number of simultaneous connections.

See Also

NetworksConfig: Provides max_tasks per network type.

Source code in src/bigbrotr/services/common/mixins.py
def __init__(self, networks: NetworksConfig) -> None:
    self._map: dict[NetworkType, asyncio.Semaphore] = {
        nt: asyncio.Semaphore(networks.get(nt).max_tasks) for nt in OPERATIONAL_NETWORKS
    }
Functions
get
get(network: NetworkType) -> Semaphore | None

Look up the concurrency semaphore for a network type.

Returns:

  • Semaphore | None

    The semaphore, or None for non-operational networks (LOCAL, UNKNOWN).

Source code in src/bigbrotr/services/common/mixins.py
def get(self, network: NetworkType) -> asyncio.Semaphore | None:
    """Look up the concurrency semaphore for a network type.

    Returns:
        The semaphore, or ``None`` for non-operational networks
        (LOCAL, UNKNOWN).
    """
    return self._map.get(network)
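
One way a caller might use the lookup is sketched below. Net, SemaphoresStub, and check are hypothetical stand-ins (the real NetworkType and NetworksConfig are not reproduced here); the point is only that get() may return None, so the caller has to decide what to do for non-operational networks before entering the semaphore.

```python
import asyncio
from enum import Enum
from typing import Optional


class Net(Enum):  # hypothetical stand-in for NetworkType
    CLEARNET = "clearnet"
    TOR = "tor"
    LOCAL = "local"


class SemaphoresStub:
    """Stand-in mirroring NetworkSemaphores: one semaphore per configured network."""

    def __init__(self, max_tasks: dict) -> None:
        self._map = {nt: asyncio.Semaphore(n) for nt, n in max_tasks.items()}

    def get(self, network: Net) -> Optional[asyncio.Semaphore]:
        return self._map.get(network)


async def check(sems: SemaphoresStub, network: Net, state: dict) -> bool:
    sem = sems.get(network)
    if sem is None:
        return False  # non-operational network: this sketch simply skips it
    async with sem:  # at most max_tasks coroutines run this body at once
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # stand-in for a network call
        state["active"] -= 1
    return True


async def main() -> tuple:
    sems = SemaphoresStub({Net.CLEARNET: 2, Net.TOR: 1})
    state = {"active": 0, "peak": 0}
    results = await asyncio.gather(*(check(sems, Net.CLEARNET, state) for _ in range(6)))
    skipped = await check(sems, Net.LOCAL, state)
    return state["peak"], results, skipped


peak, results, skipped = asyncio.run(main())
```

Six clearnet checks are launched together, yet the observed peak concurrency stays at the configured limit of 2.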

NetworkSemaphoresMixin

NetworkSemaphoresMixin(**kwargs: Any)

Mixin providing per-network concurrency semaphores.

Exposes a network_semaphores attribute of type NetworkSemaphores, initialized from the networks keyword argument.

Services must pass networks=config.networks in their super().__init__() call. The mixin pops this key with no default, so omitting it raises KeyError.

See Also

Validator, Monitor, Synchronizer: Services that compose this mixin for bounded concurrency.

Source code in src/bigbrotr/services/common/mixins.py
def __init__(self, **kwargs: Any) -> None:
    networks: NetworksConfig = kwargs.pop("networks")
    super().__init__(**kwargs)
    self.network_semaphores = NetworkSemaphores(networks)
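
The keyword-popping convention can be illustrated with stand-ins. BaseStub, SemaphoresMixinStub, and both service classes are hypothetical, and a plain dict stands in for NetworksConfig; only the pop-then-forward order is taken from the source above.

```python
from typing import Any


class BaseStub:
    """Hypothetical stand-in for BaseService; it does not accept 'networks'."""

    def __init__(self, *, config: dict) -> None:
        self.config = config


class SemaphoresMixinStub:
    def __init__(self, **kwargs: Any) -> None:
        networks = kwargs.pop("networks")  # removed before the base class sees it
        super().__init__(**kwargs)
        # The real mixin builds NetworkSemaphores(networks); a dict copy stands in here.
        self.network_semaphores = dict(networks)


class GoodService(SemaphoresMixinStub, BaseStub):
    def __init__(self, config: dict) -> None:
        super().__init__(config=config, networks=config["networks"])


class ForgetfulService(SemaphoresMixinStub, BaseStub):
    def __init__(self, config: dict) -> None:
        super().__init__(config=config)  # 'networks' omitted by mistake


config = {"networks": {"clearnet": 50, "tor": 10}}  # illustrative values only
good = GoodService(config)

try:
    ForgetfulService(config)
    missing_key = None
except KeyError as exc:
    missing_key = exc.args[0]
```

Popping the key before calling super() matters: a base class with a strict signature would otherwise reject the unexpected networks argument.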

GeoReaders

GeoReaders()

GeoIP database reader container for city and ASN lookups.

Manages the lifecycle of geoip2.database.Reader instances. Reader initialization is offloaded to a thread via open() to avoid blocking the event loop.

Attributes:

  • city (Reader | None) –

    GeoLite2-City reader for geolocation lookups, or None.

  • asn (Reader | None) –

    GeoLite2-ASN reader for network info lookups, or None.

See Also

GeoReaderMixin: Mixin that exposes a geo_readers attribute of this type.

Source code in src/bigbrotr/services/common/mixins.py
def __init__(self) -> None:
    self.city: geoip2.database.Reader | None = None
    self.asn: geoip2.database.Reader | None = None
Functions
open async
open(
    *,
    city_path: str | None = None,
    asn_path: str | None = None,
) -> None

Open GeoIP readers from file paths via asyncio.to_thread.

Parameters:

  • city_path (str | None, default: None ) –

    Path to GeoLite2-City database. None to skip.

  • asn_path (str | None, default: None ) –

    Path to GeoLite2-ASN database. None to skip.

Source code in src/bigbrotr/services/common/mixins.py
async def open(
    self,
    *,
    city_path: str | None = None,
    asn_path: str | None = None,
) -> None:
    """Open GeoIP readers from file paths via ``asyncio.to_thread``.

    Args:
        city_path: Path to GeoLite2-City database. ``None`` to skip.
        asn_path: Path to GeoLite2-ASN database. ``None`` to skip.
    """
    import geoip2.database as geoip2_db  # noqa: PLC0415  # runtime import

    if city_path:
        self.city = await asyncio.to_thread(geoip2_db.Reader, city_path)
    if asn_path:
        self.asn = await asyncio.to_thread(geoip2_db.Reader, asn_path)
close
close() -> None

Close any open readers and set them to None. Idempotent, so it is safe to call more than once.

Source code in src/bigbrotr/services/common/mixins.py
def close(self) -> None:
    """Close readers and set to ``None``. Idempotent."""
    if self.city:
        self.city.close()
        self.city = None
    if self.asn:
        self.asn.close()
        self.asn = None
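
The open/close lifecycle can be exercised without a real database by swapping in a fake reader. FakeReader and ReadersStub are hypothetical; ReadersStub copies the open() and close() logic shown above but constructs FakeReader instead of geoip2.database.Reader, and the file name is a placeholder.

```python
import asyncio
from typing import Optional


class FakeReader:
    """Hypothetical stand-in for geoip2.database.Reader (no database file needed)."""

    def __init__(self, path: str) -> None:
        self.path = path
        self.closed = False

    def close(self) -> None:
        self.closed = True


class ReadersStub:
    """Mirrors the open()/close() logic of GeoReaders using FakeReader."""

    def __init__(self) -> None:
        self.city: Optional[FakeReader] = None
        self.asn: Optional[FakeReader] = None

    async def open(self, *, city_path: Optional[str] = None,
                   asn_path: Optional[str] = None) -> None:
        # Construction happens in a worker thread so the event loop stays responsive.
        if city_path:
            self.city = await asyncio.to_thread(FakeReader, city_path)
        if asn_path:
            self.asn = await asyncio.to_thread(FakeReader, asn_path)

    def close(self) -> None:
        if self.city:
            self.city.close()
            self.city = None
        if self.asn:
            self.asn.close()
            self.asn = None


async def main() -> tuple:
    readers = ReadersStub()
    opened = None
    has_asn = False
    try:
        await readers.open(city_path="GeoLite2-City.mmdb")  # ASN path omitted: skipped
        opened = readers.city
        has_asn = readers.asn is not None
    finally:
        readers.close()
        readers.close()  # second call finds nothing open and does nothing
    return readers, opened, has_asn


readers, opened, has_asn = asyncio.run(main())
```

Placing close() in the finally block means the reader is closed even if the work inside try raises.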

GeoReaderMixin

GeoReaderMixin(**kwargs: Any)

Mixin providing GeoIP database reader lifecycle management.

Exposes a geo_readers attribute of type GeoReaders.

Note

Call geo_readers.close() in a finally block or __aexit__ so that open readers are closed even when the service exits with an error.

See Also

Monitor: The service that composes this mixin for NIP-66 geo/net checks.

Source code in src/bigbrotr/services/common/mixins.py
def __init__(self, **kwargs: Any) -> None:
    super().__init__(**kwargs)
    self.geo_readers = GeoReaders()