service

Monitor service for relay health monitoring with NIP-66 compliance.

Performs comprehensive health checks on relays and stores results as content-addressed Metadata. Optionally publishes Kind 30166 relay discovery events and Kind 10166 monitor announcements to the Nostr network.

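Health checks include:

  • NIP-11 relay information document fetch.
  • NIP-66 round-trip times (open, read, write).
  • SSL certificate validation.
  • DNS resolution.
  • Geolocation (GeoLite2 City database).
  • Network/ASN lookup (GeoLite2 ASN database).
  • HTTP metadata.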

Note

Event building is delegated to bigbrotr.nips.event_builders and broadcasting to bigbrotr.utils.transport. The Monitor handles orchestration: when to publish, which data to extract from CheckResult, and lifecycle management of publishing intervals via service state checkpoints.

See Also

  • MonitorConfig: Configuration model for networks, processing, geo, publishing, and discovery.
  • BaseService: Abstract base class providing run(), run_forever(), and from_yaml().
  • Brotr: Database facade used for metadata persistence and checkpoint management.
  • Validator: Upstream service that promotes candidates to the relay table.

Examples:

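import asyncio

from bigbrotr.core import Brotr
from bigbrotr.services import Monitor


async def main() -> None:
    brotr = Brotr.from_yaml("config/brotr.yaml")
    monitor = Monitor.from_yaml("config/services/monitor.yaml", brotr=brotr)

    # Open the database facade first, then the service, and loop until stopped.
    async with brotr:
        async with monitor:
            await monitor.run_forever()


asyncio.run(main())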

Classes

CheckResult

Bases: NamedTuple

Result of a single relay health check.

Each field contains the typed NIP metadata container if that check was run and produced data, or None if the check was skipped (disabled in config) or failed completely. Use has_data to test whether any check produced results.

Attributes:

  • generated_at (int) –

    Unix timestamp when the health check was performed.

  • nip11 (Nip11InfoMetadata | None) –

    NIP-11 relay information document (name, description, pubkey, etc.).

  • nip66_rtt (Nip66RttMetadata | None) –

    Round-trip times for open/read/write operations in milliseconds.

  • nip66_ssl (Nip66SslMetadata | None) –

    SSL certificate validation (valid, expiry timestamp, issuer).

  • nip66_geo (Nip66GeoMetadata | None) –

    Geolocation data (country, city, coordinates, timezone, geohash).

  • nip66_net (Nip66NetMetadata | None) –

    Network information (IP address, ASN, organization).

  • nip66_dns (Nip66DnsMetadata | None) –

    DNS resolution data (IPs, CNAME, nameservers, reverse DNS).

  • nip66_http (Nip66HttpMetadata | None) –

    HTTP metadata (status code, headers, redirect chain).

See Also

MetadataFlags: Boolean flags controlling which check types are computed and stored.

Attributes
has_data property
has_data: bool

True if at least one NIP check produced data.
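A minimal sketch of how has_data could be computed, assuming (as the bare CheckResult() call in check_relay suggests) that every field has a default and that every field after generated_at is a check container or None. CheckResultSketch is a hypothetical stand-in that holds plain values instead of the real Nip11InfoMetadata / Nip66*Metadata types, and the default of 0 for generated_at is an assumption:

```python
from typing import Any, NamedTuple


class CheckResultSketch(NamedTuple):
    """Hypothetical stand-in for CheckResult; fields hold plain values here."""

    generated_at: int = 0  # assumed default
    nip11: Any = None
    nip66_rtt: Any = None
    nip66_ssl: Any = None
    nip66_geo: Any = None
    nip66_net: Any = None
    nip66_dns: Any = None
    nip66_http: Any = None

    @property
    def has_data(self) -> bool:
        # Skip the timestamp; any non-None check container counts as data.
        return any(value is not None for value in self[1:])


empty = CheckResultSketch()
partial = CheckResultSketch(generated_at=1_700_000_000, nip66_rtt={"open": 120})
print(empty.has_data, partial.has_data)  # False True
```

Because the timestamp is always set, it is excluded from the check; otherwise every result would report data.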

Monitor

Monitor(brotr: Brotr, config: MonitorConfig | None = None)

Bases: ChunkProgressMixin, NetworkSemaphoresMixin, GeoReaderMixin, BaseService[MonitorConfig]

Relay health monitoring service with NIP-66 compliance.

Performs comprehensive health checks on relays and stores results as content-addressed Metadata. Optionally publishes NIP-66 events:

  • Kind 10166: Monitor announcement (capabilities, frequency, timeouts).
  • Kind 30166: Per-relay discovery event (RTT, SSL, geo, NIP-11 tags).

Each cycle:

  • Updates the GeoLite2 databases.
  • Publishes profile (Kind 0) and announcement (Kind 10166) events if their intervals have elapsed.
  • Fetches relays needing checks and processes them in chunks, with per-network semaphores limiting concurrency.
  • Publishes Kind 30166 discovery events and persists metadata results after each chunk.

Clearnet relays are checked over direct connections; Tor, I2P, and Lokinet relays are reached through a SOCKS5 proxy.

Event building is delegated to bigbrotr.nips.event_builders and broadcasting to bigbrotr.utils.transport.

See Also

  • MonitorConfig: Configuration model for this service.
  • Validator: Upstream service that promotes candidates to the relay table.
  • Synchronizer: Downstream service that collects events from monitored relays.

Source code in src/bigbrotr/services/monitor/service.py
def __init__(self, brotr: Brotr, config: MonitorConfig | None = None) -> None:
    config = config or MonitorConfig()
    super().__init__(brotr=brotr, config=config, networks=config.networks)
    self._config: MonitorConfig
    self._keys: Keys = self._config.keys.keys
Functions
run async
run() -> None

Execute one complete monitoring cycle.

Orchestrates setup, publishing, monitoring, and cycle-level logging. Delegates the core work to update_geo_databases, publish_profile, publish_announcement, and monitor.

Source code in src/bigbrotr/services/monitor/service.py
async def run(self) -> None:
    """Execute one complete monitoring cycle.

    Orchestrates setup, publishing, monitoring, and cycle-level logging.
    Delegates the core work to ``update_geo_databases``,
    ``publish_profile``, ``publish_announcement``, and ``monitor``.
    """
    self._logger.info(
        "cycle_started",
        chunk_size=self._config.processing.chunk_size,
        max_relays=self._config.processing.max_relays,
        networks=self._config.networks.get_enabled_networks(),
    )

    self.chunk_progress.reset()
    await self.update_geo_databases()

    compute = self._config.processing.compute
    await self.geo_readers.open(
        city_path=self._config.geo.city_database_path if compute.nip66_geo else None,
        asn_path=self._config.geo.asn_database_path if compute.nip66_net else None,
    )

    try:
        await self.publish_profile()
        await self.publish_announcement()
        await self.monitor()

        self._logger.info(
            "cycle_completed",
            checked=self.chunk_progress.processed,
            successful=self.chunk_progress.succeeded,
            failed=self.chunk_progress.failed,
            chunks=self.chunk_progress.chunks,
            duration_s=self.chunk_progress.elapsed,
        )
    finally:
        self.geo_readers.close()
update_geo_databases async
update_geo_databases() -> None

Download or re-download GeoLite2 databases if missing or stale.

Download failures are logged and suppressed so that a transient network error does not prevent the monitor cycle from proceeding with a stale (or missing) database.

Source code in src/bigbrotr/services/monitor/service.py
async def update_geo_databases(self) -> None:
    """Download or re-download GeoLite2 databases if missing or stale.

    Download failures are logged and suppressed so that a transient
    network error does not prevent the monitor cycle from proceeding
    with a stale (or missing) database.
    """
    compute = self._config.processing.compute
    geo = self._config.geo

    updates: list[tuple[Path, str, str]] = []
    if compute.nip66_geo:
        updates.append((Path(geo.city_database_path), geo.city_download_url, "city"))
    if compute.nip66_net:
        updates.append((Path(geo.asn_database_path), geo.asn_download_url, "asn"))

    for path, url, name in updates:
        try:
            await self._update_geo_db(path, url, name)
        except (OSError, ValueError) as e:
            self._logger.warning("geo_db_update_failed", db=name, error=str(e))
monitor async
monitor() -> int

Count, check, persist, and publish all pending relays.

High-level entry point that counts relays due for checking, processes them in chunks via check_chunks, publishes Kind 30166 discovery events, persists metadata results, and emits progress metrics. Returns the total number of relays processed.

This is the method run() delegates to after setup. It can also be called standalone when GeoIP database updates and profile/announcement publishing are not wanted; in that case, call geo_readers.open() first for full checks (see check_chunks).

Returns:

  • int

    Total number of relays processed (successful + failed).

Source code in src/bigbrotr/services/monitor/service.py
async def monitor(self) -> int:
    """Count, check, persist, and publish all pending relays.

    High-level entry point that counts relays due for checking, processes
    them in chunks via ``check_chunks``, publishes Kind 30166 discovery
    events, persists metadata results, and emits progress metrics.
    Returns the total number of relays processed.

    This is the method ``run()`` delegates to after setup. It can also
    be called standalone when GeoIP update and profile/announcement
    publishing are not desired.

    Returns:
        Total number of relays processed (successful + failed).
    """
    networks = self._config.networks.get_enabled_networks()

    self.chunk_progress.total = await self._count_relays(networks)
    self._logger.info("relays_available", total=self.chunk_progress.total)
    self._emit_progress_gauges()

    if not networks:
        self._logger.warning("no_networks_enabled")
    else:
        async for successful, failed in self.check_chunks():
            self.chunk_progress.record(succeeded=len(successful), failed=len(failed))
            await self.publish_relay_discoveries(successful)
            await self._persist_results(successful, failed)
            self._emit_progress_gauges()
            self._logger.info(
                "chunk_completed",
                chunk=self.chunk_progress.chunks,
                successful=len(successful),
                failed=len(failed),
                remaining=self.chunk_progress.remaining,
            )

    self._emit_progress_gauges()
    return self.chunk_progress.processed
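
monitor() relies on chunk_progress (provided by ChunkProgressMixin, whose code is not shown here) for total, processed, succeeded, failed, chunks, remaining, and elapsed. A minimal sketch of such a counter, assuming processed = succeeded + failed (as the Returns section states) and remaining = total - processed; ChunkProgressSketch is hypothetical:

```python
import time
from dataclasses import dataclass, field


@dataclass
class ChunkProgressSketch:
    """Hypothetical counters mirroring the chunk_progress attributes used above."""

    total: int = 0
    succeeded: int = 0
    failed: int = 0
    chunks: int = 0
    started_at: float = field(default_factory=time.monotonic)

    def reset(self) -> None:
        self.total = self.succeeded = self.failed = self.chunks = 0
        self.started_at = time.monotonic()

    def record(self, succeeded: int, failed: int) -> None:
        # Called once per chunk, as monitor() does.
        self.succeeded += succeeded
        self.failed += failed
        self.chunks += 1

    @property
    def processed(self) -> int:
        return self.succeeded + self.failed

    @property
    def remaining(self) -> int:
        return max(self.total - self.processed, 0)

    @property
    def elapsed(self) -> float:
        return round(time.monotonic() - self.started_at, 2)


progress = ChunkProgressSketch()
progress.total = 250
progress.record(succeeded=90, failed=10)
progress.record(succeeded=45, failed=5)
print(progress.processed, progress.remaining, progress.chunks)  # 150 100 2
```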
check_chunks async
check_chunks() -> AsyncIterator[
    tuple[list[tuple[Relay, CheckResult]], list[Relay]]
]

Yield (successful, failed) for each processed chunk of relays.

Requires geo_readers.open() for full checks. Handles chunk fetching, budget calculation, and concurrent health checks. Persistence and publishing are left to the caller. Networks, chunk size, and relay limit are read from MonitorConfig.

Yields:

  • AsyncIterator[tuple[list[tuple[Relay, CheckResult]], list[Relay]]]

    Tuple of (successful relay-result pairs, failed relays) per chunk.

Source code in src/bigbrotr/services/monitor/service.py
async def check_chunks(
    self,
) -> AsyncIterator[tuple[list[tuple[Relay, CheckResult]], list[Relay]]]:
    """Yield (successful, failed) for each processed chunk of relays.

    Requires ``geo_readers.open()`` for full checks. Handles chunk
    fetching, budget calculation, and concurrent health checks.
    Persistence and publishing are left to the caller. Networks, chunk
    size, and relay limit are read from
    [MonitorConfig][bigbrotr.services.monitor.MonitorConfig].

    Yields:
        Tuple of (successful relay-result pairs, failed relays) per chunk.
    """
    networks = self._config.networks.get_enabled_networks()
    chunk_size = self._config.processing.chunk_size
    max_relays = self._config.processing.max_relays
    processed = 0

    while self.is_running:
        if max_relays is not None:
            budget = max_relays - processed
            if budget <= 0:
                break
            limit = min(chunk_size, budget)
        else:
            limit = chunk_size

        relays = await self._fetch_chunk(networks, limit)
        if not relays:
            break

        successful, failed = await self._check_chunk(relays)
        processed += len(successful) + len(failed)
        yield successful, failed
check_relay async
check_relay(relay: Relay) -> CheckResult

Perform all configured health checks on a single relay.

Runs Nip11, RTT, SSL, DNS, geo, net, and HTTP checks as configured. Uses the network-specific semaphore (from NetworkSemaphoresMixin) to limit concurrency.
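NetworkSemaphoresMixin's internals are not shown on this page. The sketch below illustrates the pattern check_relay relies on, assuming one asyncio.Semaphore per enabled network; the network names, the limits, and run_with_network_limits are hypothetical. A job whose network has no semaphore is skipped, mirroring check_relay's unknown_network branch:

```python
import asyncio


async def run_with_network_limits(
    jobs: list[tuple[str, int]], limits: dict[str, int]
) -> tuple[dict[str, int], list[int]]:
    """Run fake checks with one semaphore per network; report peak concurrency."""
    semaphores = {net: asyncio.Semaphore(n) for net, n in limits.items()}
    active = {net: 0 for net in limits}
    peak = {net: 0 for net in limits}
    skipped: list[int] = []

    async def check(network: str, job_id: int) -> None:
        semaphore = semaphores.get(network)
        if semaphore is None:
            skipped.append(job_id)  # unknown network: no check performed
            return
        async with semaphore:
            active[network] += 1
            peak[network] = max(peak[network], active[network])
            await asyncio.sleep(0.01)  # stand-in for network I/O
            active[network] -= 1

    await asyncio.gather(*(check(net, job_id) for net, job_id in jobs))
    return peak, skipped


jobs = [("clearnet", i) for i in range(20)] + [("tor", i) for i in range(20, 30)] + [("loki", 99)]
peak, skipped = asyncio.run(run_with_network_limits(jobs, {"clearnet": 5, "tor": 2}))
print(peak, skipped)  # {'clearnet': 5, 'tor': 2} [99]
```

Separate semaphores keep slow proxied networks such as Tor from consuming the concurrency budget of clearnet checks.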

Note

NIP-11 is fetched first because the RTT write-test may need the min_pow_difficulty from NIP-11's limitation object to apply proof-of-work on the test event. All other checks (SSL, DNS, Geo, Net, HTTP) run in parallel after NIP-11 and RTT.

Returns:

  • CheckResult

    CheckResult with metadata for each completed check (None if skipped or failed).

Source code in src/bigbrotr/services/monitor/service.py
async def check_relay(self, relay: Relay) -> CheckResult:
    """Perform all configured health checks on a single relay.

    Runs [Nip11][bigbrotr.nips.nip11.Nip11], RTT, SSL, DNS, geo, net,
    and HTTP checks as configured. Uses the network-specific semaphore
    (from [NetworkSemaphoresMixin][bigbrotr.services.common.mixins.NetworkSemaphoresMixin])
    to limit concurrency.

    Note:
        NIP-11 is fetched first because the RTT write-test may need
        the ``min_pow_difficulty`` from NIP-11's ``limitation`` object
        to apply proof-of-work on the test event. All other checks
        (SSL, DNS, Geo, Net, HTTP) run in parallel after NIP-11 and RTT.

    Returns:
        [CheckResult][bigbrotr.services.monitor.CheckResult] with
        metadata for each completed check (``None`` if skipped/failed).

    """
    empty = CheckResult()

    semaphore = self.network_semaphores.get(relay.network)
    if semaphore is None:
        self._logger.warning("unknown_network", url=relay.url, network=relay.network.value)
        return empty

    async with semaphore:
        network_config = self._config.networks.get(relay.network)
        proxy_url = self._config.networks.get_proxy_url(relay.network)
        timeout = network_config.timeout
        compute = self._config.processing.compute

        nip11_info: Nip11InfoMetadata | None = None
        generated_at = int(time.time())

        try:
            if compute.nip11_info:
                nip11_info = await self._with_retry(
                    lambda: self._fetch_nip11_info(relay, timeout, proxy_url),
                    self._config.processing.retries.nip11_info,
                    "nip11_info",
                    relay.url,
                )

            rtt_meta: Nip66RttMetadata | None = None

            # RTT test: open/read/write round-trip times
            if compute.nip66_rtt:
                event_builder = EventBuilder(Kind(EventKind.NIP66_TEST), "nip66-test").tags(
                    [Tag.identifier(relay.url)]
                )
                # Apply proof-of-work if NIP-11 specifies minimum difficulty
                if nip11_info and nip11_info.logs.success:
                    pow_difficulty = nip11_info.data.limitation.min_pow_difficulty
                    if pow_difficulty and pow_difficulty > 0:
                        event_builder = event_builder.pow(pow_difficulty)
                read_filter = Filter().limit(1)
                rtt_deps = Nip66RttDependencies(
                    keys=self._keys,
                    event_builder=event_builder,
                    read_filter=read_filter,
                )
                rtt_meta = await self._with_retry(
                    lambda: Nip66RttMetadata.execute(
                        relay,
                        rtt_deps,
                        timeout,
                        proxy_url,
                        allow_insecure=self._config.processing.allow_insecure,
                    ),
                    self._config.processing.retries.nip66_rtt,
                    "nip66_rtt",
                    relay.url,
                )

            # Run independent checks (SSL, DNS, Geo, Net, HTTP) in parallel
            parallel_tasks = self._build_parallel_checks(relay, compute, timeout, proxy_url)

            gathered: dict[str, Any] = {}
            if parallel_tasks:
                parallel_results = await asyncio.gather(
                    *parallel_tasks.values(), return_exceptions=True
                )
                # Re-raise CancelledError from parallel checks
                for r in parallel_results:
                    if isinstance(r, asyncio.CancelledError):
                        raise r
                gathered = dict(zip(parallel_tasks.keys(), parallel_results, strict=True))

            result = CheckResult(
                generated_at=generated_at,
                nip11=nip11_info,
                nip66_rtt=rtt_meta,
                nip66_ssl=safe_result(gathered, "ssl"),
                nip66_geo=safe_result(gathered, "geo"),
                nip66_net=safe_result(gathered, "net"),
                nip66_dns=safe_result(gathered, "dns"),
                nip66_http=safe_result(gathered, "http"),
            )

            if result.has_data:
                self._logger.debug("check_succeeded", url=relay.url)
            else:
                self._logger.debug("check_failed", url=relay.url)

            return result

        except (TimeoutError, OSError) as e:
            self._logger.debug("check_error", url=relay.url, error=str(e))
            return empty
publish_announcement async
publish_announcement() -> None

Publish Kind 10166 monitor announcement if the configured interval has elapsed.

Source code in src/bigbrotr/services/monitor/service.py
async def publish_announcement(self) -> None:
    """Publish Kind 10166 monitor announcement if the configured interval has elapsed."""
    ann = self._config.announcement
    await self._publish_if_due(
        enabled=ann.enabled,
        relays=self._get_publish_relays(ann.relays),
        interval=ann.interval,
        state_key="last_announcement",
        builder=self._build_kind_10166(),
        event_name="announcement",
        timeout=self._config.publishing.timeout,
    )
publish_profile async
publish_profile() -> None

Publish Kind 0 profile metadata if the configured interval has elapsed.

Source code in src/bigbrotr/services/monitor/service.py
async def publish_profile(self) -> None:
    """Publish Kind 0 profile metadata if the configured interval has elapsed."""
    profile = self._config.profile
    await self._publish_if_due(
        enabled=profile.enabled,
        relays=self._get_publish_relays(profile.relays),
        interval=profile.interval,
        state_key="last_profile",
        builder=self._build_kind_0(),
        event_name="profile",
        timeout=self._config.publishing.timeout,
    )
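
_publish_if_due is not shown on this page. The sketch below is one plausible reading of its arguments: skip when disabled or when no relays are configured, otherwise compare the checkpoint stored under state_key with interval. The dict standing in for service state, maybe_publish, and the rule of advancing the checkpoint only after a successful broadcast are all assumptions:

```python
from collections.abc import Callable


def maybe_publish(
    state: dict[str, float],
    state_key: str,
    enabled: bool,
    relays: list[str],
    interval: float,
    now: float,
    send: Callable[[list[str]], bool],
) -> bool:
    """Hypothetical interval gate; returns True when an event was sent."""
    if not enabled or not relays:
        return False
    last = state.get(state_key)
    if last is not None and now - last < interval:
        return False  # not due yet
    if not send(relays):
        return False  # leave the checkpoint untouched so the next cycle retries
    state[state_key] = now
    return True


def always_ok(_relays: list[str]) -> bool:
    return True


state: dict[str, float] = {}
relays = ["wss://relay.example.com"]
print(maybe_publish(state, "last_profile", True, relays, 3600, now=1000, send=always_ok))  # True
print(maybe_publish(state, "last_profile", True, relays, 3600, now=2000, send=always_ok))  # False
print(maybe_publish(state, "last_profile", True, relays, 3600, now=4600, send=always_ok))  # True
```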
publish_relay_discoveries async
publish_relay_discoveries(
    successful: list[tuple[Relay, CheckResult]],
) -> None

Publish Kind 30166 relay discovery events for successful health checks.

Source code in src/bigbrotr/services/monitor/service.py
async def publish_relay_discoveries(self, successful: list[tuple[Relay, CheckResult]]) -> None:
    """Publish Kind 30166 relay discovery events for successful health checks."""
    disc = self._config.discovery
    relays = self._get_publish_relays(disc.relays)
    if not disc.enabled or not relays:
        return

    builders: list[EventBuilder] = []
    for relay, result in successful:
        try:
            builders.append(self._build_kind_30166(relay, result))
        except (ValueError, KeyError, TypeError) as e:
            self._logger.debug("build_30166_failed", url=relay.url, error=str(e))

    if builders:
        sent = await broadcast_events(
            builders,
            relays,
            self._keys,
            timeout=self._config.publishing.timeout,
        )
        if sent:
            self._logger.debug("discoveries_published", count=len(builders))
        else:
            self._logger.warning(
                "discoveries_broadcast_failed",
                count=len(builders),
                error="no relays reachable",
            )

Functions