
service

Finder service for BigBrotr.

Discovers Nostr relay URLs from two sources:

  1. External APIs -- Public endpoints like nostr.watch that list relays. Each API source declares how relay URLs are extracted from its JSON response (flat array, nested path, object keys, etc.) via ApiSourceConfig.
  2. Database events -- Tag values from all stored events are parsed via parse_relay_url; only valid wss:// / ws:// URLs pass validation. This is kind-agnostic: any event whose tagvalues column contains relay-like strings will contribute discovered URLs.

Discovered URLs are inserted as validation candidates for the Validator service via insert_candidates.

Note

Event scanning uses per-relay cursor-based pagination so that historical events inserted by the Synchronizer are eventually processed. Cursors are stored as ServiceState records with state_type='cursor' and state_value.last_seen_at.
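A minimal sketch of the cursor records described above. The field layout (state_type='cursor', state_value.last_seen_at) follows this note, but the advance_cursor() helper itself is hypothetical:

```python
# Hypothetical helper: advance a relay's cursor past a batch of events
# ordered by their seen_at timestamps. Field names follow the note above;
# the actual pagination logic lives in the Finder service.
def advance_cursor(cursor: dict, batch: list[dict]) -> dict:
    if not batch:
        return cursor
    last_seen = max(event["seen_at"] for event in batch)
    return {
        "state_type": "cursor",
        "state_value": {"last_seen_at": last_seen},
    }


cursor = {"state_type": "cursor", "state_value": {"last_seen_at": 0}}
batch = [{"seen_at": 10}, {"seen_at": 25}]
cursor = advance_cursor(cursor, batch)
```

Because each relay keeps its own cursor, a backfill by the Synchronizer on one relay never blocks scanning progress on the others.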

See Also

  • FinderConfig: Configuration model for API sources, event scanning, and concurrency.
  • BaseService: Abstract base class providing run(), run_forever(), and from_yaml().
  • Brotr: Database facade used for event queries and candidate insertion.
  • Seeder: Upstream service that bootstraps initial relay URLs.
  • Validator: Downstream service that validates the candidates discovered here.

Examples:

from bigbrotr.core import Brotr
from bigbrotr.services import Finder

brotr = Brotr.from_yaml("config/brotr.yaml")
finder = Finder.from_yaml("config/services/finder.yaml", brotr=brotr)

async with brotr:
    async with finder:
        await finder.run_forever()

Classes

Finder

Finder(brotr: Brotr, config: FinderConfig | None = None)

Bases: BaseService[FinderConfig]

Relay discovery service.

Discovers Nostr relay URLs from external APIs and stored database events, then inserts them as validation candidates for the Validator service via insert_candidates.

See Also

  • FinderConfig: Configuration model for this service.
  • Seeder: Upstream service that provides initial seed URLs.
  • Validator: Downstream service that validates discovered candidates.

Source code in src/bigbrotr/services/finder/service.py
def __init__(
    self,
    brotr: Brotr,
    config: FinderConfig | None = None,
) -> None:
    super().__init__(brotr=brotr, config=config)
    self._config: FinderConfig
Functions
run async
run() -> None

Execute a single discovery cycle across all configured sources.

Orchestrates discovery and cycle-level logging. Delegates the core work to find.

Source code in src/bigbrotr/services/finder/service.py
async def run(self) -> None:
    """Execute a single discovery cycle across all configured sources.

    Orchestrates discovery and cycle-level logging. Delegates the core
    work to ``find``.
    """
    self._logger.info(
        "cycle_started",
        events_enabled=self._config.events.enabled,
        api_enabled=self._config.api.enabled,
    )
    found = await self.find()
    self._logger.info("cycle_completed", found=found)
find async
find() -> int

Discover relay URLs from all configured sources.

Runs event scanning and API fetching (in that order), respecting the events.enabled and api.enabled configuration flags. Returns the total number of relay URLs inserted as candidates.

This is the method run() delegates to. It can also be called standalone without cycle-level logging.

Returns:

  • int

    Total number of relay URLs discovered and inserted.

Source code in src/bigbrotr/services/finder/service.py
async def find(self) -> int:
    """Discover relay URLs from all configured sources.

    Runs event scanning and API fetching (in that order), respecting
    the ``events.enabled`` and ``api.enabled`` configuration flags.
    Returns the total number of relay URLs inserted as candidates.

    This is the method ``run()`` delegates to. It can also be called
    standalone without cycle-level logging.

    Returns:
        Total number of relay URLs discovered and inserted.
    """
    found = 0
    found += await self.find_from_events()
    found += await self.find_from_api()
    return found
find_from_events async
find_from_events() -> int

Discover relay URLs from stored events using per-relay cursor pagination.

Scans all relays in the database concurrently (bounded by concurrency.max_parallel_events) for relay URLs embedded in tags and content fields. Each relay maintains its own cursor (based on seen_at timestamp) so that historical events inserted by the Synchronizer are still processed.

Uses asyncio.TaskGroup with a semaphore to bound concurrent database queries, following the same pattern as Synchronizer._sync_all_relays.

Controlled by events.enabled in FinderConfig.

Returns:

  • int

    Number of relay URLs discovered and inserted as candidates.

Source code in src/bigbrotr/services/finder/service.py
async def find_from_events(self) -> int:
    """Discover relay URLs from stored events using per-relay cursor pagination.

    Scans all relays in the database concurrently (bounded by
    ``concurrency.max_parallel_events``) for relay URLs embedded in tags
    and content fields. Each relay maintains its own cursor (based on
    ``seen_at`` timestamp) so that historical events inserted by the
    Synchronizer are still processed.

    Uses ``asyncio.TaskGroup`` with a semaphore to bound concurrent
    database queries, following the same pattern as
    [Synchronizer._sync_all_relays][bigbrotr.services.synchronizer.Synchronizer].

    Controlled by ``events.enabled`` in
    [FinderConfig][bigbrotr.services.finder.FinderConfig].

    Returns:
        Number of relay URLs discovered and inserted as candidates.
    """
    if not self._config.events.enabled:
        return 0

    total_events_scanned = 0
    total_relays_found = 0
    relays_processed = 0
    relays_failed = 0

    try:
        removed = await delete_orphan_cursors(self._brotr, self.SERVICE_NAME)
        if removed:
            self._logger.info("orphan_cursors_removed", count=removed)
    except (asyncpg.PostgresError, OSError) as e:
        self._logger.warning(
            "orphan_cursor_cleanup_failed", error=str(e), error_type=type(e).__name__
        )

    try:
        relay_urls = await get_all_relay_urls(self._brotr)
    except (asyncpg.PostgresError, OSError) as e:
        self._logger.warning("fetch_relays_failed", error=str(e), error_type=type(e).__name__)
        return 0

    if not relay_urls:
        self._logger.debug("no_relays_to_scan")
        return 0

    self._logger.debug("events_scan_started", relay_count=len(relay_urls))

    try:
        cursors = await self._fetch_all_cursors()
    except (asyncpg.PostgresError, OSError) as e:
        self._logger.warning("fetch_cursors_failed", error=str(e), error_type=type(e).__name__)
        return 0

    semaphore = asyncio.Semaphore(self._config.concurrency.max_parallel_events)

    async def _bounded_scan(url: str) -> tuple[int, int]:
        async with semaphore:
            if not self.is_running:
                return 0, 0
            return await self._scan_relay_events(url, cursors)

    tasks: list[asyncio.Task[tuple[int, int]]] = []
    try:
        async with asyncio.TaskGroup() as tg:
            tasks.extend(tg.create_task(_bounded_scan(relay_url)) for relay_url in relay_urls)
    except ExceptionGroup as eg:
        for exc in eg.exceptions:
            self._logger.error(
                "event_scan_worker_failed",
                error=str(exc),
                error_type=type(exc).__name__,
            )

    for task in tasks:
        if task.cancelled():
            continue
        if task.exception() is None:
            events, relays = task.result()
            total_events_scanned += events
            total_relays_found += relays
            relays_processed += 1
        else:
            relays_failed += 1

    self.set_gauge("events_scanned", total_events_scanned)
    self.set_gauge("relays_found", total_relays_found)
    self.set_gauge("relays_processed", relays_processed)
    self.set_gauge("relays_failed", relays_failed)
    self.inc_counter("total_events_scanned", total_events_scanned)
    self.inc_counter("total_relays_found", total_relays_found)

    self._logger.info(
        "events_completed",
        scanned=total_events_scanned,
        relays_found=total_relays_found,
        relays_processed=relays_processed,
        relays_failed=relays_failed,
    )
    return total_relays_found
discover_from_apis async
discover_from_apis() -> AsyncIterator[
    tuple[str, dict[str, Relay]]
]

Yield (source_url, discovered_relays) from each enabled API source.

Each yield produces the validated relay dict from one API endpoint. Connection pooling is managed internally via a shared aiohttp session. The generator handles rate limiting between sources.

Yields:

  • AsyncIterator[tuple[str, dict[str, Relay]]]

    Tuple of (source URL, dict mapping relay URL to Relay object).

Source code in src/bigbrotr/services/finder/service.py
async def discover_from_apis(self) -> AsyncIterator[tuple[str, dict[str, Relay]]]:
    """Yield ``(source_url, discovered_relays)`` from each enabled API source.

    Each yield produces the validated relay dict from one API endpoint.
    Connection pooling is managed internally via a shared aiohttp session.
    The generator handles rate limiting between sources.

    Yields:
        Tuple of (source URL, dict mapping relay URL to Relay object).
    """
    ssl_context: ssl.SSLContext | bool = True
    if not self._config.api.verify_ssl:
        ssl_context = False

    connector = aiohttp.TCPConnector(ssl=ssl_context)
    async with aiohttp.ClientSession(connector=connector) as session:
        enabled = [s for s in self._config.api.sources if s.enabled]
        for i, source in enumerate(enabled):
            if not self.is_running:
                break
            try:
                source_relays = await self._fetch_single_api(session, source)
                validated: dict[str, Relay] = {}
                for relay_url in source_relays:
                    r = parse_relay_url(str(relay_url))
                    if r:
                        validated[r.url] = r
                yield source.url, validated

                # Rate-limit (skip after last)
                if (
                    self._config.api.delay_between_requests > 0
                    and i < len(enabled) - 1
                    and await self.wait(self._config.api.delay_between_requests)
                ):
                    break
            except (TimeoutError, OSError, aiohttp.ClientError, ValueError) as e:
                self._logger.warning(
                    "api_fetch_failed",
                    error=str(e),
                    error_type=type(e).__name__,
                    url=source.url,
                )
find_from_api async
find_from_api() -> int

Discover relay URLs from configured external API endpoints.

Fetches each enabled API source via discover_from_apis, deduplicates the results, and inserts discovered URLs as validation candidates.

Controlled by api.enabled in FinderConfig.
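The deduplication step works because each source yields a dict keyed by normalized relay URL, so merging with dict.update() keeps one entry per URL across all sources. A minimal sketch (plain strings stand in for Relay objects):

```python
# Two sources report overlapping relay sets; merging by URL key
# deduplicates them, with later sources overwriting earlier entries.
source_a = {"wss://relay.one": "relay-one", "wss://relay.two": "relay-two"}
source_b = {"wss://relay.two": "relay-two", "wss://relay.three": "relay-three"}

all_relays: dict[str, str] = {}
for relays in (source_a, source_b):
    all_relays.update(relays)
# all_relays now holds three unique URLs despite four reported entries
```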

Returns:

  • int

    Number of relay URLs inserted as candidates.

Source code in src/bigbrotr/services/finder/service.py
async def find_from_api(self) -> int:
    """Discover relay URLs from configured external API endpoints.

    Fetches each enabled API source via ``discover_from_apis``,
    deduplicates the results, and inserts discovered URLs as validation
    candidates.

    Controlled by ``api.enabled`` in
    [FinderConfig][bigbrotr.services.finder.FinderConfig].

    Returns:
        Number of relay URLs inserted as candidates.
    """
    if not self._config.api.enabled:
        return 0

    found = 0
    all_relays: dict[str, Relay] = {}
    async for source_url, relays in self.discover_from_apis():
        all_relays.update(relays)
        self._logger.debug("api_fetched", url=source_url, count=len(relays))

    if all_relays:
        try:
            found = await insert_candidates(self._brotr, all_relays.values())
        except (asyncpg.PostgresError, OSError) as e:
            self._logger.error(
                "insert_candidates_failed",
                error=str(e),
                error_type=type(e).__name__,
                count=len(all_relays),
            )

    self.set_gauge("api_relays", len(all_relays))
    self.inc_counter("total_api_relays_found", len(all_relays))

    self._logger.info("apis_completed", found=found, fetched=len(all_relays))
    return found

Functions