Skip to content

service

service

NIP-90 Data Vending Machine service for Nostr protocol database queries.

Listens for NIP-90 job requests on configured relays, executes read-only queries via the shared Catalog, and publishes results as job-result events (request kind + 1000). Per-table pricing via TableConfig enables the NIP-90 bid/payment-required mechanism.

Each run() cycle polls for new job requests using fetch_events() with a since timestamp filter, processes them, and publishes results or error feedback.

Note

Event IDs are deduplicated in-memory (capped at 10,000) to avoid processing the same job twice within the since overlap window. The since timestamp filter provides a secondary deduplication boundary so that the in-memory set only needs to cover the current cycle.

See Also

DvmConfig: Configuration model for relays, pricing, and NIP-90 settings. Catalog: Schema introspection and query builder shared with the API service. BaseService: Abstract base class providing lifecycle and metrics.

Examples:

from bigbrotr.core import Brotr
from bigbrotr.services import Dvm

brotr = Brotr.from_yaml("config/brotr.yaml")
dvm = Dvm.from_yaml("config/services/dvm.yaml", brotr=brotr)

async with brotr:
    async with dvm:
        await dvm.run_forever()

Classes

Dvm

Dvm(brotr: Brotr, config: DvmConfig | None = None)

Bases: CatalogAccessMixin, BaseService[DvmConfig]

NIP-90 Data Vending Machine for BigBrotr database queries.

Processes NIP-90 job requests (default Kind 5050) by executing read-only database queries and publishing results (Kind 6050). Supports per-table pricing with bid/payment-required negotiation.

Lifecycle
  1. __aenter__: discover schema, create Nostr client, connect to relays, optionally publish NIP-89 announcement.
  2. run(): fetch new job requests, process each, publish results.
  3. __aexit__: disconnect client.
See Also

DvmConfig: Configuration model for this service. Api: Sibling service that exposes the same Catalog data via HTTP REST.

Source code in src/bigbrotr/services/dvm/service.py
def __init__(self, brotr: Brotr, config: DvmConfig | None = None) -> None:
    super().__init__(brotr=brotr, config=config)
    self._config: DvmConfig
    self._client: Client | None = None
    self._last_fetch_ts: int = 0
    self._processed_ids: set[str] = set()
Functions
cleanup async
cleanup() -> int

No-op: Dvm does not use service state.

Source code in src/bigbrotr/services/dvm/service.py
async def cleanup(self) -> int:
    """No-op: Dvm does not use service state."""
    return 0
run async
run() -> None

Fetch and process NIP-90 job requests for one cycle.

Captures the current timestamp before fetching so that events arriving during processing are not lost (the dedup set handles the overlap window). After processing, updates metrics and manages the dedup set size.

Source code in src/bigbrotr/services/dvm/service.py
async def run(self) -> None:
    """Fetch and process NIP-90 job requests for one cycle.

    Captures the current timestamp before fetching so that events
    arriving during processing are not lost (the dedup set handles
    the overlap window).  After processing, updates metrics and
    manages the dedup set size.
    """
    if self._client is None:
        return

    # Capture timestamp before fetching so events arriving during
    # processing are not lost (dedup set handles the overlap)
    fetch_ts = int(time.time())
    events = await self._fetch_job_requests()
    if not events:
        self._last_fetch_ts = fetch_ts
        self._report_metrics(0, 0, 0, 0)
        return

    received = 0
    processed = 0
    failed = 0
    payment_required = 0
    pubkey_hex = self._config.keys.public_key().to_hex()

    for event in events:
        r, p, f, pr = await self._process_event(event, pubkey_hex)
        received += r
        processed += p
        failed += f
        payment_required += pr

    self._manage_dedup_set()
    self._last_fetch_ts = fetch_ts
    self._report_metrics(received, processed, failed, payment_required)

Functions