service

Validator service for BigBrotr.

Validates relay candidates discovered by the Finder service by checking whether they speak the Nostr protocol via WebSocket. Valid candidates are promoted to the relays table; invalid ones have their failure counter incremented and are retried in future cycles.

Validation criteria: a candidate is valid if it accepts a WebSocket connection and responds to a Nostr REQ message with EOSE, EVENT, NOTICE, or AUTH, as determined by is_nostr_relay.
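The acceptance rule above can be illustrated with a minimal sketch. This is not the implementation of is_nostr_relay; the helper name looks_like_nostr_response and the constant NOSTR_RESPONSE_TYPES are hypothetical, and the sketch only classifies a single text frame that has already been received over the WebSocket.

```python
import json

# Message types that, per the criteria above, mark a responder as a Nostr relay.
NOSTR_RESPONSE_TYPES = {"EOSE", "EVENT", "NOTICE", "AUTH"}


def looks_like_nostr_response(raw: str) -> bool:
    """Return True if a raw text frame is one of the accepted Nostr messages.

    Hypothetical helper for illustration; the real probe lives in is_nostr_relay.
    """
    try:
        message = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return False
    # Relay-to-client Nostr messages are JSON arrays whose first element is a type label.
    return (
        isinstance(message, list)
        and len(message) > 0
        and isinstance(message[0], str)
        and message[0] in NOSTR_RESPONSE_TYPES
    )
```

A frame such as `["EOSE", "sub1"]` passes this check, while arbitrary JSON or plain text does not.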

Note

Each cycle initializes per-network semaphores from NetworksConfig, cleans up stale and exhausted candidates, then processes the remaining candidates in configurable chunks. Candidates are processed in order of fewest failures first, since those are the most likely to succeed.
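As a rough illustration of the ordering and chunking described in this note, the sketch below sorts candidates in memory and slices them into chunks. The function name chunk_by_fewest_failures and the (url, failures) tuple shape are assumptions made for the example; in the service itself each chunk is fetched by a database query rather than sliced from a list.

```python
from collections.abc import Iterator
from itertools import islice


def chunk_by_fewest_failures(
    candidates: list[tuple[str, int]], chunk_size: int
) -> Iterator[list[tuple[str, int]]]:
    """Yield chunks of (url, failures) pairs, lowest failure count first."""
    # sorted() is stable, so candidates with equal failure counts keep their input order.
    ordered = iter(sorted(candidates, key=lambda candidate: candidate[1]))
    # islice() pulls at most chunk_size items; an empty list ends the loop.
    while chunk := list(islice(ordered, chunk_size)):
        yield chunk
```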

See Also

  • ValidatorConfig: Configuration model for networks, processing, and cleanup.
  • BaseService: Abstract base class providing run(), run_forever(), and from_yaml().
  • Brotr: Database facade used for candidate queries and relay promotion.
  • Finder: Upstream service that discovers and inserts candidates.
  • Monitor: Downstream service that health-checks promoted relays.
  • is_nostr_relay: WebSocket probe function used for validation.
  • promote_candidates: Atomic insert+delete query for promotion.

Examples:

from bigbrotr.core import Brotr
from bigbrotr.services import Validator

brotr = Brotr.from_yaml("config/brotr.yaml")
validator = Validator.from_yaml("config/services/validator.yaml", brotr=brotr)

async with brotr:
    async with validator:
        await validator.run_forever()

Classes

Candidate dataclass

Candidate(relay: Relay, data: dict[str, Any])

Relay candidate pending validation.

Wraps a Relay object with its service_state metadata, providing convenient access to validation state (e.g., failure count).

Attributes:

  • relay (Relay) –

    Relay object with URL and network information.

  • data (dict[str, Any]) –

    Metadata from the service_state table (network, failures, etc.).

See Also

fetch_candidate_chunk: Query that produces the rows from which candidates are built.

Attributes
failures property
failures: int

Return the number of failed validation attempts for this candidate.
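A minimal sketch of how such a property might read the failure count out of the metadata dictionary. CandidateSketch is a hypothetical stand-in, not the real Candidate class: it stores a plain URL string instead of a Relay object, and the "failures" key with a default of 0 is an assumption based on the attribute description above.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class CandidateSketch:
    """Hypothetical stand-in for Candidate, used only for illustration."""

    relay_url: str  # the real class holds a Relay object here
    data: dict[str, Any]

    @property
    def failures(self) -> int:
        # Assumption: a missing "failures" key means the candidate has never failed.
        return int(self.data.get("failures", 0))
```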

Validator

Validator(
    brotr: Brotr, config: ValidatorConfig | None = None
)

Bases: ChunkProgressMixin, NetworkSemaphoresMixin, BaseService[ValidatorConfig]

Validates relay candidates by checking if they speak the Nostr protocol.

Processes candidate URLs discovered by the Finder service. Valid relays are promoted to the relays table via promote_candidates; invalid ones have their failure counter incremented for retry in future cycles.

Each cycle initializes per-network semaphores via NetworkSemaphoresMixin, cleans up stale/exhausted candidates, then processes remaining candidates in configurable chunks. Supports clearnet (direct), Tor (.onion via SOCKS5), I2P (.i2p via SOCKS5), and Lokinet (.loki via SOCKS5).

See Also

  • ValidatorConfig: Configuration model for this service.
  • Finder: Upstream service that creates the candidates validated here.
  • Monitor: Downstream service that health-checks promoted relays.
  • is_nostr_relay: WebSocket probe used by validate_candidate().

Source code in src/bigbrotr/services/validator/service.py
def __init__(self, brotr: Brotr, config: ValidatorConfig | None = None) -> None:
    config = config or ValidatorConfig()
    super().__init__(brotr=brotr, config=config, networks=config.networks)
    self._config: ValidatorConfig
Functions
run async
run() -> None

Execute one complete validation cycle.

Orchestrates cleanup, validation, and cycle-level logging. Delegates the core work to cleanup_stale, cleanup_exhausted, and validate.

Source code in src/bigbrotr/services/validator/service.py
async def run(self) -> None:
    """Execute one complete validation cycle.

    Orchestrates cleanup, validation, and cycle-level logging.
    Delegates the core work to ``cleanup_stale``, ``cleanup_exhausted``,
    and ``validate``.
    """
    self._logger.info(
        "cycle_started",
        chunk_size=self._config.processing.chunk_size,
        max_candidates=self._config.processing.max_candidates,
        networks=self._config.networks.get_enabled_networks(),
    )

    self.chunk_progress.reset()
    await self.cleanup_stale()
    await self.cleanup_exhausted()
    await self.validate()

    self._logger.info(
        "cycle_completed",
        validated=self.chunk_progress.succeeded,
        invalidated=self.chunk_progress.failed,
        chunks=self.chunk_progress.chunks,
        duration_s=self.chunk_progress.elapsed,
    )
validate async
validate() -> int

Count, validate, and persist all pending candidates.

High-level entry point that counts available candidates, processes them in chunks via validate_chunks, persists results, and emits progress metrics. Returns the total number of candidates processed.

This is the method run() delegates to after cleanup. It can also be called standalone when cleanup is not desired.

Returns:

  • int

    Total number of candidates processed (valid + invalid).

Source code in src/bigbrotr/services/validator/service.py
async def validate(self) -> int:
    """Count, validate, and persist all pending candidates.

    High-level entry point that counts available candidates, processes
    them in chunks via ``validate_chunks``, persists results, and emits
    progress metrics. Returns the total number of candidates processed.

    This is the method ``run()`` delegates to after cleanup. It can also
    be called standalone when cleanup is not desired.

    Returns:
        Total number of candidates processed (valid + invalid).
    """
    networks = self._config.networks.get_enabled_networks()

    self.chunk_progress.total = await count_candidates(self._brotr, networks)
    self._logger.info("candidates_available", total=self.chunk_progress.total)
    self._emit_progress_gauges()

    if not networks:
        self._logger.warning("no_networks_enabled")
    else:
        async for valid, invalid in self.validate_chunks():
            self.chunk_progress.record(succeeded=len(valid), failed=len(invalid))
            await self._persist_results(valid, invalid)
            self._emit_progress_gauges()
            self._logger.info(
                "chunk_completed",
                chunk=self.chunk_progress.chunks,
                valid=len(valid),
                invalid=len(invalid),
                remaining=self.chunk_progress.remaining,
            )

    self._emit_progress_gauges()
    return self.chunk_progress.processed
cleanup_stale async
cleanup_stale() -> int

Remove candidates whose URLs already exist in the relays table.

Stale candidates appear when a relay was validated by another cycle, manually added, or re-discovered by the Finder. Removing them prevents wasted validation attempts.

Returns:

  • int

    Number of stale candidates removed.

See Also

delete_stale_candidates: The SQL query executed by this method.
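The effect of stale cleanup can be pictured as a set-membership filter. The sketch below is an in-memory analogy only; split_stale is a hypothetical helper, and the actual work is done in SQL by delete_stale_candidates.

```python
def split_stale(
    candidate_urls: list[str], relay_urls: set[str]
) -> tuple[list[str], list[str]]:
    """Return (stale, pending): candidates already promoted vs. still to validate."""
    stale = [url for url in candidate_urls if url in relay_urls]
    pending = [url for url in candidate_urls if url not in relay_urls]
    return stale, pending
```

The length of the stale list corresponds to the count this method returns.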

Source code in src/bigbrotr/services/validator/service.py
async def cleanup_stale(self) -> int:
    """Remove candidates whose URLs already exist in the relays table.

    Stale candidates appear when a relay was validated by another cycle,
    manually added, or re-discovered by the
    [Finder][bigbrotr.services.finder.Finder]. Removing them prevents
    wasted validation attempts.

    Returns:
        Number of stale candidates removed.

    See Also:
        [delete_stale_candidates][bigbrotr.services.common.queries.delete_stale_candidates]:
            The SQL query executed by this method.
    """
    count = await delete_stale_candidates(self._brotr)
    if count > 0:
        self.inc_counter("total_stale_removed", count)
        self._logger.info("stale_removed", count=count)
    return count
cleanup_exhausted async
cleanup_exhausted() -> int

Remove candidates that have exceeded the maximum failure threshold.

Prevents permanently broken relays from consuming validation resources. Controlled by cleanup.enabled and cleanup.max_failures in CleanupConfig.

Returns:

  • int

    Number of exhausted candidates removed.

See Also

delete_exhausted_candidates: The SQL query executed by this method.
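The threshold rule can be sketched in memory as follows. drop_exhausted is a hypothetical helper, and the comparison used here (a candidate is exhausted once its failures reach max_failures) is an assumption; the real query may use a strict greater-than instead.

```python
def drop_exhausted(
    failures_by_url: dict[str, int], max_failures: int, enabled: bool = True
) -> dict[str, int]:
    """Return the candidates that survive exhausted-candidate cleanup."""
    if not enabled:
        # Mirrors cleanup.enabled = False: nothing is removed.
        return dict(failures_by_url)
    # Assumption: failures >= max_failures counts as exhausted.
    return {url: n for url, n in failures_by_url.items() if n < max_failures}
```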

Source code in src/bigbrotr/services/validator/service.py
async def cleanup_exhausted(self) -> int:
    """Remove candidates that have exceeded the maximum failure threshold.

    Prevents permanently broken relays from consuming validation resources.
    Controlled by ``cleanup.enabled`` and ``cleanup.max_failures`` in
    [CleanupConfig][bigbrotr.services.validator.CleanupConfig].

    Returns:
        Number of exhausted candidates removed.

    See Also:
        [delete_exhausted_candidates][bigbrotr.services.common.queries.delete_exhausted_candidates]:
            The SQL query executed by this method.
    """
    if not self._config.cleanup.enabled:
        return 0

    count = await delete_exhausted_candidates(
        self._brotr,
        self._config.cleanup.max_failures,
    )
    if count > 0:
        self.inc_counter("total_exhausted_removed", count)
        self._logger.info(
            "exhausted_removed",
            count=count,
            threshold=self._config.cleanup.max_failures,
        )
    return count
validate_candidate async
validate_candidate(candidate: Candidate) -> bool

Validate a single relay candidate by connecting and testing the Nostr protocol.

Uses the network-specific semaphore and proxy settings from NetworksConfig. Delegates the actual WebSocket probe to is_nostr_relay.

Parameters:

  • candidate (Candidate) –

    Candidate to validate.

Returns:

  • bool

    True if the relay speaks Nostr protocol, False otherwise.
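The concurrency pattern described above, one semaphore per network with connection errors treated as a failed validation, can be sketched with plain asyncio. The names probe_with_limit and demo are hypothetical, networks are represented as strings, and the probe argument stands in for the call to is_nostr_relay.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def probe_with_limit(
    semaphores: dict[str, asyncio.Semaphore],
    network: str,
    probe: Callable[[], Awaitable[bool]],  # hypothetical stand-in for is_nostr_relay
) -> bool:
    """Run probe() under the network's semaphore; errors count as invalid."""
    semaphore = semaphores.get(network)
    if semaphore is None:
        # No semaphore was configured for this network, so the candidate is rejected.
        return False
    async with semaphore:
        try:
            return await probe()
        except (TimeoutError, OSError):
            return False


async def demo() -> tuple[bool, bool, bool]:
    async def ok() -> bool:
        return True

    async def refused() -> bool:
        raise OSError("connection refused")

    semaphores = {"clearnet": asyncio.Semaphore(2)}
    return (
        await probe_with_limit(semaphores, "clearnet", ok),
        await probe_with_limit(semaphores, "clearnet", refused),
        await probe_with_limit(semaphores, "tor", ok),
    )
```

Running `asyncio.run(demo())` exercises the three paths: a successful probe, a probe that raises OSError, and a network without a semaphore.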

Source code in src/bigbrotr/services/validator/service.py
async def validate_candidate(self, candidate: Candidate) -> bool:
    """Validate a single relay candidate by connecting and testing the Nostr protocol.

    Uses the network-specific semaphore and proxy settings from
    [NetworksConfig][bigbrotr.services.common.configs.NetworksConfig].
    Delegates the actual WebSocket probe to
    [is_nostr_relay][bigbrotr.utils.protocol.is_nostr_relay].

    Args:
        candidate: [Candidate][bigbrotr.services.validator.service.Candidate]
            to validate.

    Returns:
        ``True`` if the relay speaks Nostr protocol, ``False`` otherwise.
    """
    relay = candidate.relay
    semaphore = self.network_semaphores.get(relay.network)

    if semaphore is None:
        self._logger.warning("unknown_network", url=relay.url, network=relay.network.value)
        return False

    async with semaphore:
        network_config = self._config.networks.get(relay.network)
        proxy_url = self._config.networks.get_proxy_url(relay.network)
        try:
            return await is_nostr_relay(relay, proxy_url, network_config.timeout)
        except (TimeoutError, OSError):
            return False
validate_chunks async
validate_chunks() -> AsyncIterator[
    tuple[list[Relay], list[Candidate]]
]

Yield (valid_relays, invalid_candidates) for each processed chunk.

Handles chunk fetching, budget calculation, and concurrent validation. Persistence is left to the caller. Networks, chunk size, and candidate limit are read from ValidatorConfig.

Yields:

  • AsyncIterator[tuple[list[Relay], list[Candidate]]]

    Tuple of (valid Relay list, invalid Candidate list) per chunk.

Source code in src/bigbrotr/services/validator/service.py
async def validate_chunks(self) -> AsyncIterator[tuple[list[Relay], list[Candidate]]]:
    """Yield ``(valid_relays, invalid_candidates)`` for each processed chunk.

    Handles chunk fetching, budget calculation, and concurrent validation.
    Persistence is left to the caller. Networks, chunk size, and candidate
    limit are read from
    [ValidatorConfig][bigbrotr.services.validator.ValidatorConfig].

    Yields:
        Tuple of (valid Relay list, invalid Candidate list) per chunk.
    """
    networks = self._config.networks.get_enabled_networks()
    chunk_size = self._config.processing.chunk_size
    max_candidates = self._config.processing.max_candidates
    processed = 0

    while self.is_running:
        if max_candidates is not None:
            budget = max_candidates - processed
            if budget <= 0:
                break
            limit = min(chunk_size, budget)
        else:
            limit = chunk_size

        candidates = await self._fetch_chunk(networks, limit)
        if not candidates:
            break

        valid, invalid = await self._validate_chunk(candidates)
        processed += len(valid) + len(invalid)
        yield valid, invalid

Functions