Skip to content

service

service

Validator service for BigBrotr.

Validates relay candidates discovered by the Finder service by checking whether they speak the Nostr protocol via WebSocket. Valid candidates are promoted to the relays table; invalid ones have their failure counter incremented and are retried in future cycles.

Validation criteria: a candidate is valid if it accepts a WebSocket connection and responds to a Nostr REQ message with EOSE, EVENT, NOTICE, or AUTH, as determined by is_nostr_relay.

Note

Each cycle initializes per-network semaphores from NetworksConfig, cleans up stale/exhausted candidates, then processes remaining candidates in configurable chunks. CandidateCheckpoint priority is ordered by fewest failures first (most likely to succeed).

See Also

ValidatorConfig: Configuration model for networks, processing, and cleanup. BaseService: Abstract base class providing run(), run_forever(), and from_yaml(). Brotr: Database facade used for candidate queries and relay promotion. Finder: Upstream service that discovers and inserts candidates. Monitor: Downstream service that health-checks promoted relays. is_nostr_relay: WebSocket probe function used for validation. promote_candidates: Insert+delete for promotion (with cleanup safety net).

Examples:

from bigbrotr.core import Brotr
from bigbrotr.services import Validator

brotr = Brotr.from_yaml("config/brotr.yaml")
validator = Validator.from_yaml("config/services/validator.yaml", brotr=brotr)

async with brotr:
    async with validator:
        await validator.run_forever()

Classes

Validator

Validator(
    brotr: Brotr, config: ValidatorConfig | None = None
)

Bases: ConcurrentStreamMixin, NetworkSemaphoresMixin, BaseService[ValidatorConfig]

Validates relay candidates by checking if they speak the Nostr protocol.

Processes candidate URLs discovered by the Finder service. Valid relays are promoted to the relays table via promote_candidates; invalid ones have their failure counter incremented for retry in future cycles.

Each cycle initializes per-network semaphores via NetworkSemaphoresMixin, cleans up stale/exhausted candidates, then processes remaining candidates in configurable chunks. Supports clearnet (direct), Tor (.onion via SOCKS5), I2P (.i2p via SOCKS5), and Lokinet (.loki via SOCKS5).

See Also

ValidatorConfig: Configuration model for this service. Finder: Upstream service that creates the candidates validated here. Monitor: Downstream service that health-checks promoted relays. is_nostr_relay: WebSocket probe used by _validate_candidate().

Source code in src/bigbrotr/services/validator/service.py
def __init__(self, brotr: Brotr, config: ValidatorConfig | None = None) -> None:
    config = config or ValidatorConfig()
    super().__init__(brotr=brotr, config=config, networks=config.networks)
    self._config: ValidatorConfig
Functions
run async
run() -> None

Execute one complete validation cycle.

Source code in src/bigbrotr/services/validator/service.py
async def run(self) -> None:
    """Execute one complete validation cycle."""
    await self.validate()
cleanup async
cleanup() -> int

Remove promoted candidates and exhausted candidates.

Source code in src/bigbrotr/services/validator/service.py
async def cleanup(self) -> int:
    """Remove promoted candidates and exhausted candidates."""
    removed = await delete_promoted_candidates(self._brotr)
    if self._config.cleanup.enabled:
        removed += await delete_exhausted_candidates(
            self._brotr, self._config.cleanup.max_failures
        )
    return removed
validate async
validate() -> int

Validate all pending candidates and persist results.

Fetches candidates in pages (chunk_size), validates each page concurrently via _iter_concurrent(), and flushes results at each pagination boundary.

Returns:

  • int

    Total number of candidates processed (valid + invalid).

Source code in src/bigbrotr/services/validator/service.py
async def validate(self) -> int:
    """Validate all pending candidates and persist results.

    Fetches candidates in pages (``chunk_size``), validates each page
    concurrently via
    ``_iter_concurrent()``,
    and flushes results at each pagination boundary.

    Returns:
        Total number of candidates processed (valid + invalid).
    """
    networks = self._config.networks.get_enabled_networks()
    if not networks:
        self._logger.warning("no_networks_enabled")
        return 0

    attempted_before = int(time.time() - self._config.processing.interval)

    total = await count_candidates(self._brotr, networks, attempted_before)
    validated = 0
    not_validated = 0

    self.set_gauge("total", total)
    self.set_gauge("validated", 0)
    self.set_gauge("not_validated", 0)

    self._logger.info("candidates_available", total=total)

    chunk_size = self._config.processing.chunk_size
    max_candidates = self._config.processing.max_candidates

    while self.is_running:
        if max_candidates is not None:
            budget = max_candidates - validated - not_validated
            if budget <= 0:
                break
            limit = min(chunk_size, budget)
        else:
            limit = chunk_size

        candidates = await fetch_candidates(self._brotr, networks, attempted_before, limit)
        if not candidates:
            break

        chunk_valid: list[CandidateCheckpoint] = []
        chunk_invalid: list[CandidateCheckpoint] = []

        async for candidate, is_valid in self._iter_concurrent(
            candidates, self._validate_worker
        ):
            if is_valid:
                chunk_valid.append(candidate)
                validated += 1
            else:
                chunk_invalid.append(candidate)
                not_validated += 1
            self.inc_gauge("validated" if is_valid else "not_validated")

        await promote_candidates(self._brotr, chunk_valid)
        await fail_candidates(self._brotr, chunk_invalid)

        self._logger.info(
            "chunk_completed",
            validated=len(chunk_valid),
            not_validated=len(chunk_invalid),
            remaining=total - validated - not_validated,
        )

    return validated + not_validated

Functions