service

Finder service for BigBrotr.

Discovers Nostr relay URLs from two sources:

  1. External APIs: public endpoints, such as nostr.watch, that publish relay lists. Each API source declares, via ApiSourceConfig, how relay URLs are extracted from its JSON response (flat array, nested path, object keys, etc.).
  2. Database events: tag values from all stored events are parsed via parse_relay, and only valid wss:// or ws:// URLs pass validation. The scan is kind-agnostic: any event whose tagvalues column contains relay-like strings contributes discovered URLs.
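As a rough illustration of source 1, the sketch below applies a declared extraction shape to an already-decoded JSON payload. The function `extract_relay_urls` and its `mode` and `path` parameters are hypothetical stand-ins for whatever ApiSourceConfig actually declares; the real field names and supported shapes may differ.

```python
from typing import Any


def extract_relay_urls(payload: Any, mode: str, path: tuple[str, ...] = ()) -> list[str]:
    """Return candidate relay URL strings from a decoded JSON payload.

    Hypothetical modes:
      "flat" -> the node at ``path`` is a JSON array of URL strings
      "keys" -> the node at ``path`` is a JSON object keyed by URL
    """
    node = payload
    # Walk a nested path such as ("data", "relays") down to the relevant node.
    for key in path:
        node = node[key]
    if mode == "flat":
        # Ignore non-string entries (numbers, nulls, nested objects).
        return [item for item in node if isinstance(item, str)]
    if mode == "keys":
        return list(node.keys())
    raise ValueError(f"unknown extraction mode: {mode!r}")


print(extract_relay_urls(["wss://a.example", "wss://b.example"], "flat"))
# ['wss://a.example', 'wss://b.example']
print(extract_relay_urls({"data": {"relays": ["wss://c.example"]}}, "flat", ("data", "relays")))
# ['wss://c.example']
print(extract_relay_urls({"wss://d.example": {"online": True}}, "keys"))
# ['wss://d.example']
```

A "nested path" here is simply a path prefix combined with one of the other shapes, which keeps the per-source configuration small.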

Discovered URLs are inserted as validation candidates for the Validator service via insert_relays_as_candidates.
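The validation step for source 2 can be approximated with the standard library: keep only tag values that parse as ws:// or wss:// URLs with a host, normalize them, and drop duplicates before they reach insert_relays_as_candidates. The `candidate_relays` helper below is a hypothetical simplification, not parse_relay itself, which may apply stricter rules.

```python
from urllib.parse import urlsplit


def candidate_relays(tag_values: list[str]) -> list[str]:
    """Keep ws:// and wss:// URLs with a host, normalized and de-duplicated."""
    seen: set[str] = set()
    result: list[str] = []
    for value in tag_values:
        try:
            parts = urlsplit(value.strip())
        except ValueError:
            continue  # e.g. an unterminated IPv6 literal such as "wss://[::1"
        scheme = parts.scheme.lower()
        # Pubkeys, event ids, and http(s) links all fail this check.
        if scheme not in {"ws", "wss"} or not parts.hostname:
            continue
        # Lower-case scheme and netloc and drop a trailing slash so that
        # trivially different spellings collapse to one candidate.
        url = f"{scheme}://{parts.netloc.lower()}{parts.path.rstrip('/')}"
        if url not in seen:
            seen.add(url)
            result.append(url)
    return result


print(candidate_relays([
    "wss://Relay.Example.com/",
    "e3b0c44298fc1c14",
    "https://example.com",
    "wss://relay.example.com",
    "ws://localhost:7777",
]))
# ['wss://relay.example.com', 'ws://localhost:7777']
```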

Note

Event scanning uses per-relay cursor-based pagination so that historical events inserted by the Synchronizer are eventually processed. Cursors are stored as ServiceState records with state_type='cursor' and a composite (timestamp, id) cursor in state_value for deterministic resumption.
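A composite (timestamp, id) cursor is the usual keyset-pagination pattern: rows are ordered by both columns and each page starts strictly after the last pair seen, so events sharing a timestamp are neither skipped nor repeated. The in-memory sketch below assumes a hypothetical `Event` record with `created_at` and `id` fields; in SQL the same filter would typically be written as `WHERE (created_at, id) > ($1, $2) ORDER BY created_at, id LIMIT $3`.

```python
from __future__ import annotations

from dataclasses import dataclass

Cursor = tuple[int, str]  # (created_at, event id)


@dataclass(frozen=True)
class Event:
    created_at: int
    id: str


def next_page(events: list[Event], cursor: Cursor | None, limit: int) -> tuple[list[Event], Cursor | None]:
    """Return up to ``limit`` events strictly after ``cursor`` plus the new cursor."""
    # Order by the composite key so ties on created_at are broken by id.
    ordered = sorted(events, key=lambda e: (e.created_at, e.id))
    if cursor is not None:
        # Tuple comparison mirrors SQL row comparison: (a, b) > (c, d).
        ordered = [e for e in ordered if (e.created_at, e.id) > cursor]
    page = ordered[:limit]
    # Advance to the last event returned; keep the old cursor if nothing is new.
    new_cursor = (page[-1].created_at, page[-1].id) if page else cursor
    return page, new_cursor


events = [Event(10, "b"), Event(10, "a"), Event(5, "z"), Event(20, "c")]
page, cursor = next_page(events, None, 2)
print([e.id for e in page], cursor)  # ['z', 'a'] (10, 'a')
page, cursor = next_page(events, cursor, 2)
print([e.id for e in page], cursor)  # ['b', 'c'] (20, 'c')
```

With a timestamp-only cursor, the second page would have filtered on `created_at > 10` and silently skipped event `b`.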

See Also

  • FinderConfig: Configuration model for API sources, event scanning, and concurrency.
  • BaseService: Abstract base class providing run(), run_forever(), and from_yaml().
  • Brotr: Database facade used for event queries and candidate insertion.
  • Seeder: Upstream service that bootstraps initial relay URLs.
  • Validator: Downstream service that validates the candidates discovered here.

Examples:

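import asyncio

from bigbrotr.core import Brotr
from bigbrotr.services import Finder


async def main() -> None:
    brotr = Brotr.from_yaml("config/brotr.yaml")
    finder = Finder.from_yaml("config/services/finder.yaml", brotr=brotr)

    # Open the database facade first, then the service that depends on it.
    async with brotr:
        async with finder:
            await finder.run_forever()


asyncio.run(main())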

Classes

Finder

Finder(brotr: Brotr, config: FinderConfig | None = None)

Bases: ConcurrentStreamMixin, BaseService[FinderConfig]

Relay discovery service.

Discovers Nostr relay URLs from external APIs and stored database events, then inserts them as validation candidates for the Validator service via insert_relays_as_candidates.

See Also

  • FinderConfig: Configuration model for this service.
  • Seeder: Upstream service that provides initial seed URLs.
  • Validator: Downstream service that validates discovered candidates.

Source code in src/bigbrotr/services/finder/service.py
def __init__(
    self,
    brotr: Brotr,
    config: FinderConfig | None = None,
) -> None:
    config = config or FinderConfig()
    super().__init__(brotr=brotr, config=config)
    self._config: FinderConfig
Functions
run async
run() -> None

Execute a single discovery cycle across all configured sources.

Source code in src/bigbrotr/services/finder/service.py
async def run(self) -> None:
    """Execute a single discovery cycle across all configured sources."""
    await self.find()
cleanup async
cleanup() -> int

Remove stale state (orphaned relay cursors and checkpoints of API sources no longer listed in api.sources) and return the number of records removed.

Source code in src/bigbrotr/services/finder/service.py
async def cleanup(self) -> int:
    """Remove stale state: orphaned relay cursors and obsolete API checkpoints."""
    removed = await delete_stale_cursors(self._brotr)
    active_urls = [s.url for s in self._config.api.sources]
    removed += await delete_stale_api_checkpoints(self._brotr, active_urls)
    return removed
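The cleanup rules can be modeled with plain dictionaries. This is a hypothetical sketch: it assumes an "orphaned" cursor is one whose relay no longer exists, whereas the real delete_stale_cursors and delete_stale_api_checkpoints run against the database. As in cleanup() above, the active set is built from every configured source, enabled or not, so merely disabling a source keeps its checkpoint.

```python
def cleanup_state(
    cursors: dict[str, tuple[int, str]],
    known_relays: set[str],
    checkpoints: dict[str, float],
    configured_sources: list[str],
) -> int:
    """Delete orphaned cursors and obsolete checkpoints in place; return the count."""
    # Orphaned cursor (assumed meaning): the relay it tracks no longer exists.
    orphaned = [url for url in cursors if url not in known_relays]
    for url in orphaned:
        del cursors[url]
    # Obsolete checkpoint: its API source is no longer configured at all.
    active = set(configured_sources)
    obsolete = [url for url in checkpoints if url not in active]
    for url in obsolete:
        del checkpoints[url]
    return len(orphaned) + len(obsolete)


cursors = {"wss://a.example": (10, "x"), "wss://gone.example": (5, "y")}
checkpoints = {"https://api-one.example": 1.0, "https://api-old.example": 2.0}
removed = cleanup_state(cursors, {"wss://a.example"}, checkpoints, ["https://api-one.example"])
print(removed)  # 2
```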
find async
find() -> int

Discover relay URLs from all configured sources.

Runs API fetching first (fast), then event scanning (slow), respecting the api.enabled and events.enabled configuration flags. Returns the total number of relay URLs inserted as candidates.

Returns:

  • int

    Total number of relay URLs discovered and inserted.

Source code in src/bigbrotr/services/finder/service.py
async def find(self) -> int:
    """Discover relay URLs from all configured sources.

    Runs API fetching first (fast), then event scanning (slow),
    respecting the ``api.enabled`` and ``events.enabled`` configuration
    flags. Returns the total number of relay URLs inserted as candidates.

    Returns:
        Total number of relay URLs discovered and inserted.
    """
    found = 0
    found += await self.find_from_api()
    found += await self.find_from_events()
    return found
find_from_api async
find_from_api() -> int

Discover relay URLs from configured external API endpoints.

Delegates fetching to _find_from_api_worker, which iterates over all enabled sources sequentially. Once every source has been processed, this method saves the updated checkpoints in a single batch and inserts the discovered URLs as candidates.

Returns:

  • int

    Number of relay URLs inserted as candidates.

Source code in src/bigbrotr/services/finder/service.py
async def find_from_api(self) -> int:
    """Discover relay URLs from configured external API endpoints.

    Delegates fetching to ``_find_from_api_worker``, which iterates
    over all enabled sources sequentially. Once every source has been
    processed, this method saves the updated checkpoints in a single
    batch and inserts the discovered URLs as candidates.

    Returns:
        Number of relay URLs inserted as candidates.
    """
    if not self._config.api.enabled:
        return 0

    enabled = [s for s in self._config.api.sources if s.enabled]

    buffer: list[Relay] = []
    pending_checkpoints: list[ApiCheckpoint] = []

    self.set_gauge("total_sources", len(enabled))
    self.set_gauge("sources_fetched", 0)
    self.set_gauge("candidates_found_from_api", 0)

    self._logger.info("api_started", source_count=len(enabled))

    async for relays, checkpoint in self._find_from_api_worker(enabled):
        buffer.extend(relays)
        pending_checkpoints.append(checkpoint)
        self.inc_gauge("sources_fetched")

    if pending_checkpoints:
        await upsert_api_checkpoints(self._brotr, pending_checkpoints)

    found = await insert_relays_as_candidates(self._brotr, buffer)
    self.set_gauge("candidates_found_from_api", found)

    self._logger.info("api_completed", found=found, collected=len(buffer))
    return found
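The producer/consumer split in find_from_api reduces to an async generator that fetches sources one at a time and a consumer that buffers results and commits all checkpoints in one batch at the end. `FAKE_RESPONSES`, `fetch_source`, and `Checkpoint` are hypothetical, and the sketch assumes candidate insertion counts only URLs not already present, which may not match how insert_relays_as_candidates counts.

```python
import asyncio
import time
from collections.abc import AsyncIterator
from dataclasses import dataclass

# Hypothetical canned responses standing in for real HTTP calls.
FAKE_RESPONSES: dict[str, list[str]] = {
    "https://api-one.example/relays": ["wss://a.example", "wss://b.example"],
    "https://api-two.example/relays": ["wss://b.example", "wss://c.example"],
}


@dataclass(frozen=True)
class Checkpoint:
    source_url: str
    fetched_at: float


async def fetch_source(url: str) -> list[str]:
    await asyncio.sleep(0)  # placeholder for network I/O
    return list(FAKE_RESPONSES[url])


async def api_worker(urls: list[str]) -> AsyncIterator[tuple[list[str], Checkpoint]]:
    # Sources are fetched sequentially, one at a time.
    for url in urls:
        relays = await fetch_source(url)
        yield relays, Checkpoint(url, time.time())


async def find_from_api(urls: list[str], candidates: set[str], saved: list[Checkpoint]) -> int:
    buffer: list[str] = []
    pending: list[Checkpoint] = []
    async for relays, checkpoint in api_worker(urls):
        buffer.extend(relays)
        pending.append(checkpoint)
    # Persist every checkpoint in a single batch after all sources are done.
    if pending:
        saved.extend(pending)
    # Assumption: only URLs not already present count as newly inserted.
    new = set(buffer) - candidates
    candidates.update(new)
    return len(new)


candidates = {"wss://a.example"}
saved: list[Checkpoint] = []
print(asyncio.run(find_from_api(list(FAKE_RESPONSES), candidates, saved)))  # 2
```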
find_from_events async
find_from_events() -> int

Discover relay URLs from stored events using cursor pagination.

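Fetches the current cursor positions, then scans all relays concurrently (bounded by events.parallel_relays) via _iter_concurrent(). Each worker streams, one event-relay row at a time, the relay URLs found in that row together with the updated cursor for its relay. This method accumulates the URLs in a shared buffer, flushes it to insert_relays_as_candidates whenever it reaches events.batch_size entries, and upserts the pending cursors in one batch after each flush.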

Returns:

  • int

    Number of relay URLs discovered and inserted as candidates.
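_iter_concurrent() comes from ConcurrentStreamMixin, whose source is not shown here. A common way to get the behavior described (several async-generator workers running at once, at most events.parallel_relays of them, with their outputs merged into one stream) is a semaphore plus a shared queue. The `iter_concurrent` function below is a hypothetical sketch of that pattern, not the mixin's implementation.

```python
import asyncio
from collections.abc import AsyncIterator, Callable
from dataclasses import dataclass
from typing import Any

_DONE = object()  # sentinel: one worker has finished


@dataclass
class _Failure:
    error: Exception  # wraps a worker exception so it is not confused with data


async def iter_concurrent(
    items: list[Any],
    worker: Callable[[Any], AsyncIterator[Any]],
    limit: int,
) -> AsyncIterator[Any]:
    """Run ``worker(item)`` for every item, at most ``limit`` at a time, merging outputs."""
    queue: asyncio.Queue[Any] = asyncio.Queue()  # unbounded: no backpressure
    semaphore = asyncio.Semaphore(limit)

    async def run(item: Any) -> None:
        try:
            async with semaphore:  # at most `limit` workers stream at once
                async for result in worker(item):
                    await queue.put(result)
        except Exception as exc:
            await queue.put(_Failure(exc))
        finally:
            await queue.put(_DONE)

    tasks = [asyncio.create_task(run(item)) for item in items]
    remaining = len(tasks)
    try:
        while remaining:
            result = await queue.get()
            if result is _DONE:
                remaining -= 1
            elif isinstance(result, _Failure):
                raise result.error  # surface the first worker error to the consumer
            else:
                yield result
    finally:
        # Stop any workers still running (e.g. after an error or an early exit).
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def demo() -> None:
    async def worker(n: int) -> AsyncIterator[tuple[int, int]]:
        for i in range(n):
            await asyncio.sleep(0)
            yield n, i

    rows = [row async for row in iter_concurrent([3, 1, 2], worker, limit=2)]
    print(sorted(rows))


asyncio.run(demo())
# [(1, 0), (2, 0), (2, 1), (3, 0), (3, 1), (3, 2)]
```

The queue is unbounded for simplicity; passing maxsize to asyncio.Queue would add backpressure so fast workers cannot run far ahead of the consumer.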

Source code in src/bigbrotr/services/finder/service.py
async def find_from_events(self) -> int:
    """Discover relay URLs from stored events using cursor pagination.

    Fetches the current cursor positions, then scans all relays
    concurrently (bounded by ``events.parallel_relays``) via
    ``_iter_concurrent()``. Each worker streams, one event-relay row at
    a time, the relay URLs found in that row together with the updated
    cursor for its relay. This method accumulates the URLs in a shared
    buffer, flushes it whenever it reaches ``events.batch_size``
    entries, and upserts the pending cursors in batch after each flush.

    Returns:
        Number of relay URLs discovered and inserted as candidates.
    """
    if not self._config.events.enabled:
        return 0

    cursors = await fetch_cursors_to_find(self._brotr)
    if not cursors:
        self._logger.debug("no_relays_to_scan")
        return 0

    self._event_semaphore = asyncio.Semaphore(self._config.events.parallel_relays)
    self._phase_start = time.monotonic()

    total_found = 0
    buffer: list[Relay] = []
    pending_cursors: dict[str, FinderCursor] = {}
    batch_size = self._config.events.batch_size

    self.set_gauge("relays_seen", 0)
    self.set_gauge("rows_seen", 0)
    self.set_gauge("candidates_found_from_events", 0)

    self._logger.info("scan_started", relay_count=len(cursors))

    async for relays, cursor in self._iter_concurrent(cursors, self._find_from_events_worker):
        buffer.extend(relays)
        pending_cursors[cursor.key] = cursor
        self.inc_gauge("rows_seen")
        if len(buffer) >= batch_size:
            found = await insert_relays_as_candidates(self._brotr, buffer)
            total_found += found
            self.inc_gauge("candidates_found_from_events", found)
            buffer = []
            await upsert_finder_cursors(self._brotr, pending_cursors.values())
            pending_cursors = {}

    if buffer:
        found = await insert_relays_as_candidates(self._brotr, buffer)
        total_found += found
        self.inc_gauge("candidates_found_from_events", found)
    if pending_cursors:
        await upsert_finder_cursors(self._brotr, pending_cursors.values())

    self._logger.info("scan_completed", found=total_found)
    return total_found
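The consumer loop in find_from_events inserts each full buffer before persisting the cursors collected since the previous flush, so a crash between the two steps re-scans some rows on restart rather than skipping them. The synchronous sketch below isolates that ordering; `Cursor` and the `insert` and `save_cursors` callbacks are hypothetical stand-ins for FinderCursor, insert_relays_as_candidates, and upsert_finder_cursors.

```python
from collections.abc import Callable, Iterable
from dataclasses import dataclass


@dataclass(frozen=True)
class Cursor:
    key: str                   # relay the cursor belongs to
    position: tuple[int, str]  # (timestamp, event id) of the last row seen


def consume(
    stream: Iterable[tuple[list[str], Cursor]],
    batch_size: int,
    insert: Callable[[list[str]], int],
    save_cursors: Callable[[list[Cursor]], None],
) -> int:
    total = 0
    buffer: list[str] = []
    pending: dict[str, Cursor] = {}
    for relays, cursor in stream:
        buffer.extend(relays)
        pending[cursor.key] = cursor  # only the newest cursor per relay is kept
        if len(buffer) >= batch_size:
            total += insert(buffer)               # 1. make the URLs durable first
            save_cursors(list(pending.values()))  # 2. then advance the cursors
            buffer, pending = [], {}
    # Flush the final partial batch and any cursors not yet saved.
    if buffer:
        total += insert(buffer)
    if pending:
        save_cursors(list(pending.values()))
    return total


rows = [
    (["wss://a.example"], Cursor("r1", (1, "x"))),
    (["wss://b.example", "wss://c.example"], Cursor("r1", (2, "y"))),
    (["wss://d.example"], Cursor("r2", (1, "z"))),
    ([], Cursor("r2", (3, "w"))),
]
inserted: list[list[str]] = []
saved: list[list[Cursor]] = []


def record_insert(batch: list[str]) -> int:
    inserted.append(list(batch))
    return len(set(batch))


print(consume(rows, 3, record_insert, saved.append))  # 4
```

Keeping one cursor per relay in a dict means a relay that yields many rows between flushes still produces a single upsert per flush.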

Functions