utils

Monitor service utility functions.

Pure helpers for health check result inspection and retry logic.

Classes

CheckResult

Bases: NamedTuple

Result of a single relay health check.

Each field contains the typed NIP metadata container if that check was run and produced data, or None if the check was skipped (disabled in config) or failed completely. Use has_data to test whether any check produced results.

Attributes:

  • generated_at (int) –

    Unix timestamp when the health check was performed.

  • nip11_info (Nip11InfoMetadata | None) –

    NIP-11 relay information document (name, description, pubkey, etc.).

  • nip66_rtt (Nip66RttMetadata | None) –

    Round-trip times for open/read/write operations in milliseconds.

  • nip66_ssl (Nip66SslMetadata | None) –

    SSL certificate validation (valid, expiry timestamp, issuer).

  • nip66_geo (Nip66GeoMetadata | None) –

    Geolocation data (country, city, coordinates, timezone, geohash).

  • nip66_net (Nip66NetMetadata | None) –

    Network information (IP address, ASN, organization).

  • nip66_dns (Nip66DnsMetadata | None) –

    DNS resolution data (IPs, CNAME, nameservers, reverse DNS).

  • nip66_http (Nip66HttpMetadata | None) –

    HTTP metadata (server software and framework headers).

See Also

MetadataFlags: Boolean flags controlling which check types are computed and stored.

Attributes
has_data property
has_data: bool

True if at least one NIP check produced data.

Functions

log_success

log_success(result: Any) -> bool

Extract success status from a metadata result's logs object.

Source code in src/bigbrotr/services/monitor/utils.py
def log_success(result: Any) -> bool:
    """Extract success status from a metadata result's logs object."""
    logs = result.logs
    if isinstance(logs, BaseLogs):
        return bool(logs.success)
    if isinstance(logs, Nip66RttMultiPhaseLogs):
        return bool(logs.open_success)
    return False

log_reason

log_reason(result: Any) -> str | None

Extract failure reason from a metadata result's logs object.

Source code in src/bigbrotr/services/monitor/utils.py
def log_reason(result: Any) -> str | None:
    """Extract failure reason from a metadata result's logs object."""
    logs = result.logs
    if isinstance(logs, BaseLogs):
        return str(logs.reason) if logs.reason else None
    if isinstance(logs, Nip66RttMultiPhaseLogs):
        return str(logs.open_reason) if logs.open_reason else None
    return None

extract_result

extract_result(results: dict[str, Any], key: str) -> Any

Extract a successful result from asyncio.gather output.

Returns None if the key is absent or the result is an exception.

Source code in src/bigbrotr/services/monitor/utils.py
def extract_result(results: dict[str, Any], key: str) -> Any:
    """Extract a successful result from asyncio.gather output.

    Returns None if the key is absent or the result is an exception.
    """
    value = results.get(key)
    if value is None or isinstance(value, BaseException):
        return None
    return value
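A usage sketch showing how such a results dict might be assembled. The check coroutines and key names here are hypothetical; the point is that `asyncio.gather(..., return_exceptions=True)` keeps failures as values, and `extract_result` then filters them out:

```python
import asyncio
from typing import Any


def extract_result(results: dict[str, Any], key: str) -> Any:
    value = results.get(key)
    if value is None or isinstance(value, BaseException):
        return None
    return value


async def ok_check() -> str:
    return "nip11 document"


async def failing_check() -> str:
    raise OSError("connection refused")


async def run_checks() -> dict[str, Any]:
    keys = ["nip11_info", "nip66_rtt"]
    # return_exceptions=True turns raised exceptions into list entries
    values = await asyncio.gather(ok_check(), failing_check(), return_exceptions=True)
    return dict(zip(keys, values))


results = asyncio.run(run_checks())
print(extract_result(results, "nip11_info"))  # the successful value
print(extract_result(results, "nip66_rtt"))   # None: the check raised
print(extract_result(results, "nip66_ssl"))   # None: key absent
```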

collect_metadata

collect_metadata(
    successful: list[tuple[Relay, CheckResult]],
    store: MetadataFlags,
) -> list[RelayMetadata]

Build storable metadata records from successful health check results.

Iterates over successful relay/result pairs and collects metadata for each check type enabled in store. Field names in CheckResult, MetadataFlags, and MetadataType are aligned by convention (e.g. nip11_info, nip66_rtt).

Parameters:

  • successful (list[tuple[Relay, CheckResult]]) –

    Relays with their health check results.

  • store (MetadataFlags) –

    Flags controlling which metadata types to include.

Returns:

  • list[RelayMetadata] –

    List of RelayMetadata records ready for batch insertion.

Source code in src/bigbrotr/services/monitor/utils.py
def collect_metadata(
    successful: list[tuple[Relay, CheckResult]],
    store: MetadataFlags,
) -> list[RelayMetadata]:
    """Build storable metadata records from successful health check results.

    Iterates over successful relay/result pairs and collects metadata for
    each check type enabled in ``store``. Field names in ``CheckResult``,
    ``MetadataFlags``, and ``MetadataType`` are aligned by convention
    (e.g. ``nip11_info``, ``nip66_rtt``).

    Args:
        successful: Relays with their health check results.
        store: Flags controlling which metadata types to include.

    Returns:
        List of [RelayMetadata][bigbrotr.models.relay_metadata.RelayMetadata]
        ready for batch insertion.
    """
    metadata: list[RelayMetadata] = []
    for relay, result in successful:
        for meta_type in MetadataType:
            field = meta_type.value
            nip_meta: BaseNipMetadata | None = getattr(result, field)
            if nip_meta and getattr(store, field):
                metadata.append(
                    RelayMetadata(
                        relay=relay,
                        metadata=Metadata(type=meta_type, data=nip_meta.to_dict()),
                        generated_at=result.generated_at,
                    )
                )
    return metadata
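The by-convention alignment of field names can be sketched in isolation. The enum members and objects below are illustrative stand-ins, not the real `MetadataType`, `CheckResult`, or `MetadataFlags`; the point is that each enum value doubles as an attribute name on both the result and the flags:

```python
from enum import Enum
from types import SimpleNamespace


class MetadataType(Enum):
    # Hypothetical members: the enum value is also the field name
    # on CheckResult and MetadataFlags.
    NIP11_INFO = "nip11_info"
    NIP66_RTT = "nip66_rtt"


result = SimpleNamespace(nip11_info={"name": "relay"}, nip66_rtt=None)
store = SimpleNamespace(nip11_info=True, nip66_rtt=True)

collected = [
    meta_type.value
    for meta_type in MetadataType
    if getattr(result, meta_type.value) and getattr(store, meta_type.value)
]
print(collected)  # only checks that produced data AND are enabled
```

Because the lookup is `getattr(..., meta_type.value)`, adding a new check type only requires keeping the three field names in sync; no dispatch table needs updating.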

retry_fetch async

retry_fetch(
    relay: Relay,
    coro_factory: Callable[[], Coroutine[Any, Any, _T]],
    retry: RetryConfig,
    operation: str,
    wait: Callable[[float], Coroutine[Any, Any, bool]]
    | None = None,
) -> _T | None

Execute a metadata fetch with exponential backoff retry.

Retries on network failures up to retry.max_attempts times. Returns the result (possibly with success=False) or None on exception.

Parameters:

  • relay (Relay) –

    Target relay (used for logging context).

  • coro_factory (Callable[[], Coroutine[Any, Any, _T]]) –

    Factory producing a fresh coroutine per attempt.

  • retry (RetryConfig) –

    Backoff configuration (max attempts, delays, jitter).

  • operation (str) –

    Check name for log messages (e.g. "nip11_info").

  • wait (Callable[[float], Coroutine[Any, Any, bool]] | None, default: None ) –

    Optional shutdown-aware sleep. Receives delay in seconds, returns True if shutdown was requested. When None, falls back to asyncio.sleep.

Note

The coro_factory pattern (a callable returning a coroutine) is required because Python coroutines are single-use: once awaited, they cannot be re-awaited. The factory creates a fresh coroutine for each retry attempt.
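The single-use restriction can be demonstrated directly. `fetch` here is a trivial placeholder standing in for a metadata fetch:

```python
import asyncio


async def fetch() -> str:
    return "metadata"


async def main() -> None:
    coro = fetch()
    await coro                 # first await succeeds
    try:
        await coro             # re-awaiting the same coroutine object
    except RuntimeError:
        print("cannot reuse an already awaited coroutine")

    # A factory (here, the function itself) builds a fresh coroutine
    # per attempt, which is what retry_fetch relies on.
    coro_factory = fetch
    for _attempt in range(2):
        print(await coro_factory())


asyncio.run(main())
```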

Warning

Jitter is computed via random.uniform() (PRNG, # noqa: S311). This is intentional -- jitter only needs to decorrelate concurrent retries, not provide cryptographic randomness.

Source code in src/bigbrotr/services/monitor/utils.py
async def retry_fetch(
    relay: Relay,
    coro_factory: Callable[[], Coroutine[Any, Any, _T]],
    retry: RetryConfig,
    operation: str,
    wait: Callable[[float], Coroutine[Any, Any, bool]] | None = None,
) -> _T | None:
    """Execute a metadata fetch with exponential backoff retry.

    Retries on network failures up to ``retry.max_attempts`` times.
    Returns the result (possibly with ``success=False``) or ``None`` on
    exception.

    Args:
        relay: Target relay (used for logging context).
        coro_factory: Factory producing a fresh coroutine per attempt.
        retry: Backoff configuration (max attempts, delays, jitter).
        operation: Check name for log messages (e.g. ``"nip11_info"``).
        wait: Optional shutdown-aware sleep. Receives delay in seconds,
            returns ``True`` if shutdown was requested. When ``None``,
            falls back to ``asyncio.sleep``.

    Note:
        The ``coro_factory`` pattern (a callable returning a coroutine)
        is required because Python coroutines are single-use: once
        awaited, they cannot be re-awaited. The factory creates a fresh
        coroutine for each retry attempt.

    Warning:
        Jitter is computed via ``random.uniform()`` (PRNG, ``# noqa: S311``).
        This is intentional -- jitter only needs to decorrelate
        concurrent retries, not provide cryptographic randomness.
    """
    max_retries = retry.max_attempts
    result = None

    for attempt in range(max_retries + 1):
        try:
            result = await coro_factory()
            if log_success(result):
                return result
        except (TimeoutError, OSError) as e:
            logger.debug(
                "check_error",
                extra={
                    "operation": operation,
                    "relay": relay.url,
                    "attempt": attempt + 1,
                    "error": str(e),
                },
            )
            result = None

        # Network failure - retry if attempts remaining
        if attempt < max_retries:
            delay = min(retry.initial_delay * (2**attempt), retry.max_delay)
            jitter = random.uniform(0, retry.jitter)  # noqa: S311
            total_delay = delay + jitter
            if wait is not None:
                if await wait(total_delay):
                    return None
            else:
                await asyncio.sleep(total_delay)
            logger.debug(
                "check_retry",
                extra={
                    "operation": operation,
                    "relay": relay.url,
                    "attempt": attempt + 1,
                    "reason": log_reason(result) if result else None,
                    "delay_s": round(total_delay, 2),
                },
            )

    # All retries exhausted
    logger.debug(
        "check_exhausted",
        extra={
            "operation": operation,
            "relay": relay.url,
            "total_attempts": max_retries + 1,
            "reason": log_reason(result) if result else None,
        },
    )
    return result
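The delay computation from the retry loop can be isolated into a small sketch. The parameter values below are illustrative, not the project defaults:

```python
import random


def backoff_delay(attempt: int, initial_delay: float, max_delay: float, jitter: float) -> float:
    # Same shape as the loop body above: exponential growth,
    # capped at max_delay, plus uniform jitter.
    delay = min(initial_delay * (2 ** attempt), max_delay)
    return delay + random.uniform(0, jitter)  # noqa: S311


# With initial_delay=1s, max_delay=8s, and jitter disabled,
# the schedule is 1, 2, 4, 8, 8, ...
schedule = [backoff_delay(a, 1.0, 8.0, 0.0) for a in range(5)]
print(schedule)  # [1.0, 2.0, 4.0, 8.0, 8.0]
```

Note the cap applies before jitter is added, so the worst-case sleep is `max_delay + jitter`, not `max_delay`.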