Architecture¶
Comprehensive architecture reference for BigBrotr.
Overview¶
BigBrotr uses a Diamond DAG architecture: four tiers with a strict top-to-bottom import flow:
```mermaid
graph TD
    S["<b>services</b><br/><small>Business logic</small>"]
    C["<b>core</b><br/><small>Pool, Brotr, BaseService</small>"]
    N["<b>nips</b><br/><small>NIP-11, NIP-66</small>"]
    U["<b>utils</b><br/><small>DNS, Keys, Transport</small>"]
    M["<b>models</b><br/><small>Pure dataclasses</small>"]

    S --> C
    S --> N
    S --> U
    N --> C
    N --> U
    C --> M
    N --> M
    U --> M

    style S fill:#7B1FA2,color:#fff,stroke:#4A148C
    style C fill:#512DA8,color:#fff,stroke:#311B92
    style N fill:#512DA8,color:#fff,stroke:#311B92
    style U fill:#512DA8,color:#fff,stroke:#311B92
    style M fill:#311B92,color:#fff,stroke:#1A237E
```
| Tier | Layer | Purpose | Changes When |
|---|---|---|---|
| Top | Services | Business logic, orchestration | New features, protocol updates |
| Mid | Core | Pool, Brotr, BaseService, Logger, Metrics | Rarely |
| Mid | NIPs | NIP-11 info fetch/parse, NIP-66 health checks (I/O) | Protocol spec updates |
| Mid | Utils | DNS, keys, transport, SOCKS5 proxy | Cross-cutting needs |
| Foundation | Models | Frozen dataclasses, validation, DB mapping | Schema changes |
Deployments (`deployments/{bigbrotr,lilbrotr}/`) sit outside the package and configure behavior through YAML, SQL schemas, and Docker Compose.
Models Layer¶
`src/bigbrotr/models/` -- Pure frozen dataclasses. Zero I/O, no imports from other `bigbrotr` layers; the only dependency is the standard-library `logging` module (`import logging` + `logging.getLogger()`).
Data Models¶
| Model | File | Purpose |
|---|---|---|
| `Relay` | `relay.py` | URL validation (rfc3986), network detection (clearnet/tor/i2p/loki/local), local IP rejection |
| `Event` | `event.py` | Wraps nostr-sdk `Event`, extracts hex fields, tag parsing |
| `EventRelay` | `event_relay.py` | Event-relay junction with `seen_at` timestamp |
| `Metadata` | `metadata.py` | Content-addressed metadata: SHA-256 hash over canonical JSON |
| `RelayMetadata` | `relay_metadata.py` | Relay-metadata junction with `metadata_type` and `generated_at` |
| `ServiceState` | `service_state.py` | Per-service operational state (candidates, cursors, checkpoints) |
Enumerations¶
| Type | File | Values |
|---|---|---|
| `NetworkType` | `constants.py` | `clearnet`, `tor`, `i2p`, `loki`, `local`, `unknown` |
| `MetadataType` | `metadata.py` | `nip11_info`, `nip66_rtt`, `nip66_ssl`, `nip66_geo`, `nip66_net`, `nip66_dns`, `nip66_http` |
| `ServiceStateType` | `service_state.py` | `candidate`, `cursor`, `checkpoint` |
| `ServiceName` | `constants.py` | `seeder`, `finder`, `validator`, `monitor`, `synchronizer` |
| `EventKind` | `constants.py` | `SET_METADATA=0`, `RECOMMEND_RELAY=2`, `CONTACTS=3`, `RELAY_LIST=10002`, `NIP66_TEST=22456`, `MONITOR_ANNOUNCEMENT=10166`, `RELAY_DISCOVERY=30166` |
Model Patterns¶
All models follow the same frozen dataclass pattern:
```python
@dataclass(frozen=True, slots=True)
class Relay:
    url: str
    discovered_at: int
    _db_params: RelayDbParams = field(init=False, repr=False)

    def __post_init__(self) -> None:
        # Validate and compute derived fields
        parsed = rfc3986.uri_reference(self.url)
        network = _detect_network(parsed.host)
        object.__setattr__(self, "network", network)
        object.__setattr__(self, "_db_params", self._compute_db_params())

    def _compute_db_params(self) -> RelayDbParams: ...

    def to_db_params(self) -> RelayDbParams:
        return self._db_params  # cached, no recomputation

    @classmethod
    def from_db_params(cls, url: str, ...) -> Relay: ...
```
Key characteristics:
- `frozen=True` + `slots=True` on all models
- `_compute_db_params()` runs once in `__post_init__`, cached in `_db_params`
- `object.__setattr__` for setting fields in a frozen `__post_init__`
- `from_db_params()` classmethod for reconstruction from database rows
- `to_db_params()` returns a typed NamedTuple matching stored procedure parameter order
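A short usage sketch of this pattern (the relay URL is illustrative):

```python
import time

# __post_init__ validates the URL and caches the derived DB parameters once.
relay = Relay(url="wss://relay.example.com", discovered_at=int(time.time()))

# to_db_params() returns the cached NamedTuple; repeated calls are free.
params = relay.to_db_params()
assert relay.to_db_params() is params

# Frozen dataclasses reject mutation after construction.
try:
    relay.url = "wss://other.example.com"
except AttributeError:  # dataclasses.FrozenInstanceError subclasses AttributeError
    pass
```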
ServiceState¶
ServiceState follows the same DbParams pattern as the other models, with typed `ServiceName` and `ServiceStateType` enum fields.

```python
@dataclass(frozen=True, slots=True)
class ServiceState:
    service_name: ServiceName
    state_type: ServiceStateType
    state_key: str
    state_value: dict[str, Any]
    updated_at: int
```
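For example, a Synchronizer per-relay sync cursor might be modeled as follows (enum member names and the `state_value` shape are assumptions for illustration):

```python
cursor = ServiceState(
    service_name=ServiceName.SYNCHRONIZER,   # enum value "synchronizer"
    state_type=ServiceStateType.CURSOR,      # enum value "cursor"
    state_key="wss://relay.example.com",     # one cursor per relay
    state_value={"since": 1700000000},       # resume point for the next sync
    updated_at=1700000100,
)
```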
Core Layer¶
`src/bigbrotr/core/` -- Reusable infrastructure with zero business logic. Depends only on models.
Pool (pool.py)¶
Async PostgreSQL connection pool via `asyncpg`, with retry/backoff and health-checked acquisition.
Configuration:
```python
class PoolConfig(BaseModel):
    database: DatabaseConfig               # host, port, database, user, password_env (per-service overrides)
    limits: LimitsConfig                   # min_size, max_size, max_queries, max_inactive_connection_lifetime
    timeouts: TimeoutsConfig               # acquisition
    retry: RetryConfig                     # max_attempts, initial_delay, max_delay, exponential_backoff
    server_settings: ServerSettingsConfig  # application_name (auto-set), timezone, statement_timeout
```
API Reference
See bigbrotr.core.pool for the complete Pool API.
Key methods:
| Method | Purpose |
|---|---|
| `connect()` | Create asyncpg pool with retry backoff |
| `close()` | Idempotent pool teardown |
| `acquire()` | Get connection from pool |
| `transaction()` | Async context manager for ACID transactions |
| `fetch()`, `fetchrow()`, `fetchval()` | Query methods with automatic retry on transient errors |
| `execute()` | Mutation method |
| `metrics` (property) | Pool statistics: size, utilization, is_connected |
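A minimal usage sketch of the API above (construction details and the exact object yielded by `transaction()` are not shown in this reference, so treat those as assumptions):

```python
async def example(pool: Pool) -> None:
    await pool.connect()  # creates the asyncpg pool, retrying with backoff
    try:
        # Query methods retry automatically on transient errors.
        total = await pool.fetchval("SELECT count(*) FROM relay")

        # Statements inside share one ACID transaction.
        async with pool.transaction() as conn:
            await conn.execute(
                "DELETE FROM relay WHERE url = $1", "wss://dead.example.com"
            )

        print(total, pool.metrics)  # size, utilization, is_connected
    finally:
        await pool.close()  # idempotent teardown
```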
Connection pooling flow:
```mermaid
sequenceDiagram
    participant S as Service
    participant P as Pool
    participant PGB as PGBouncer
    participant PG as PostgreSQL

    S->>P: acquire()
    P->>P: get connection from pool
    P-->>S: connection
    S->>P: fetch() / execute()
    P->>PGB: SQL query
    PGB->>PG: forward
    alt success
        PG-->>PGB: result
        PGB-->>P: result
        P-->>S: result
    else transient error
        PG-->>P: error
        P->>P: retry with backoff
        P->>PGB: retry query
        PGB->>PG: forward
        PG-->>PGB: result
        PGB-->>P: result
        P-->>S: result
    end
```
In Docker deployments, services connect to PGBouncer (port 6432/6433) which provides infrastructure-level connection pooling in transaction mode. Pool provides application-level retry, health checking, and query methods.
Brotr (brotr.py)¶
High-level database facade. Wraps stored procedures via `_call_procedure()`. Provides the generic query methods that services use.
API Reference
See bigbrotr.core.brotr for the complete Brotr API.
Configuration:
```python
class BrotrConfig(BaseModel):
    batch: BatchConfig        # max_size (default 1000)
    timeouts: TimeoutsConfig  # query (60s), batch (120s), cleanup (90s), refresh (None)
```
Insert operations (all accept lists, auto-batched by `batch.max_size`):

| Method | Stored Function Called | Cascade |
|---|---|---|
| `insert_relay(relays)` | `relay_insert` | -- |
| `insert_event(events)` | `event_insert` | -- |
| `insert_metadata(metadata)` | `metadata_insert` | -- |
| `insert_event_relay(records, cascade=True)` | `event_relay_insert_cascade` | Relay + Event + Junction |
| `insert_relay_metadata(records, cascade=True)` | `relay_metadata_insert_cascade` | Relay + Metadata + Junction |
Service state:
| Method | Stored Function Called |
|---|---|
| `upsert_service_state(records)` | `service_state_upsert` |
| `get_service_state(service, type, key?)` | `service_state_get` |
| `delete_service_state(keys)` | `service_state_delete` |
Cleanup and maintenance:
| Method | Stored Function Called |
|---|---|
| `delete_orphan_event()` | `orphan_event_delete` |
| `delete_orphan_metadata()` | `orphan_metadata_delete` |
| `refresh_materialized_view(name)` | `{name}_refresh` |
Generic query facade (used by services for ad-hoc queries):
- `fetch(query, *args, timeout)` -> `list[Record]`
- `fetchrow(query, *args, timeout)` -> `Record | None`
- `fetchval(query, *args, timeout)` -> `Any`
- `execute(query, *args, timeout)` -> `str`
- `transaction()` -> async context manager yielding a connection
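A sketch of how a service might combine a typed insert with the generic facade:

```python
async def ingest(brotr: Brotr, relays: list[Relay]) -> int:
    # Typed insert: auto-batched by batch.max_size, routed to relay_insert.
    await brotr.insert_relay(relays)
    # Generic facade for an ad-hoc read.
    return await brotr.fetchval("SELECT count(*) FROM relay")
```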
Warning
`Brotr._pool` is private -- services use Brotr methods, never the pool directly.
BaseService (base_service.py)¶
Abstract base class for all six services. Generic over configuration type.
```python
class BaseService(ABC, Generic[ConfigT]):
    SERVICE_NAME: ClassVar[ServiceName]
    CONFIG_CLASS: ClassVar[type[BaseModel]]

    _brotr: Brotr
    _config: ConfigT
```
API Reference
See bigbrotr.core.base_service for the complete BaseService API.
Lifecycle:
| Method | Purpose |
|---|---|
| `run()` | Abstract -- single cycle logic |
| `run_forever()` | Loop: `run()` -> `wait(interval)` -> repeat. Tracks consecutive failures. |
| `request_shutdown()` | Sync-safe flag for signal handlers |
| `wait(timeout) -> bool` | Interruptible sleep. Returns `True` if shutdown was requested. |
| `is_running` (property) | `True` until shutdown requested |
Factory methods:
- `from_yaml(config_path, brotr, **kwargs)` -> configured service instance
- `from_dict(data, brotr, **kwargs)` -> configured service instance
Metrics integration:
- `set_gauge(name, value)` -- custom Prometheus gauge
- `inc_counter(name, value=1)` -- custom Prometheus counter
- Automatic cycle duration tracking via the `CYCLE_DURATION_SECONDS` histogram
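Inside a cycle, this might look like the following sketch (the gauge/counter names and helper calls are illustrative):

```python
async def run(self) -> None:
    relays = await fetch_relays_due_for_check(self._brotr)  # services/common/queries.py
    self.set_gauge("relays_due", len(relays))
    for relay in relays:
        try:
            await self._check_one(relay)       # service-specific work
            self.inc_counter("checks_success")
        except Exception:
            self.inc_counter("checks_failed")
```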
The async context manager (`async with service:`) handles lifecycle setup/teardown.
Graceful shutdown flow:
```mermaid
sequenceDiagram
    participant OS as OS Signal
    participant H as Signal Handler
    participant S as Service
    participant R as run_forever()

    OS->>H: SIGTERM
    H->>S: request_shutdown()
    S->>S: set _shutdown_event
    loop Service Loop
        R->>S: run() (single cycle)
        S-->>R: cycle complete
        R->>S: wait(interval)
        S->>S: check _shutdown_event
        S-->>R: True (shutdown requested)
    end
    R->>S: __aexit__()
    S->>S: cleanup resources
```
Logger (logger.py)¶
Structured key=value logging with optional JSON output mode.
```python
logger = Logger("synchronizer")
logger.info("sync_completed", events=1500, duration=45.2)
# Output: 2026-02-09 12:00:00 INFO synchronizer: sync_completed events=1500 duration=45.2
```
JSON mode output for cloud aggregation:
{"timestamp": "2026-02-09T12:34:56+00:00", "level": "info", "service": "synchronizer", "message": "sync_completed", "events": 1500}
Metrics (metrics.py)¶
Prometheus metrics served on `/metrics` (port 8000).

| Metric | Type | Labels | Description |
|---|---|---|---|
| `service_info` | Info | `service` | Static service metadata |
| `service_gauge` | Gauge | `service`, `name` | Point-in-time state (`consecutive_failures`, `progress`, `last_cycle_timestamp`) |
| `service_counter` | Counter | `service`, `name` | Cumulative totals (`cycles_success`, `cycles_failed`, `errors`) |
| `cycle_duration_seconds` | Histogram | `service` | Cycle latency, 10 buckets (1s to 1h) |
Note
For details on alerting and dashboard setup, see Monitoring.
YAML Loader (yaml.py)¶
YAML configuration loading with environment variable interpolation.
NIPs Layer¶
`src/bigbrotr/nips/` -- NIP-11 and NIP-66 protocol implementations. Has I/O (HTTP, DNS, SSL, WebSocket, GeoIP). Depends on models, utils, core.
NIP-11 (nip11/)¶
Relay Information Document (NIP-11) fetch and parse.
| Module | Purpose |
|---|---|
| `fetch.py` | HTTP GET to relay URL with `Accept: application/nostr+json` |
| `data.py` | Parsed NIP-11 document fields (name, description, pubkey, contact, supported_nips, etc.) |
| `logs.py` | Structured logging for NIP-11 operations |
| `nip11.py` | `Nip11` orchestrator class |
NIP-66 (nip66/)¶
Relay Monitoring and Discovery (NIP-66) health check implementations.
| Module | What It Measures |
|---|---|
| `rtt.py` | WebSocket round-trip times: open, read, write latency (ms) |
| `ssl.py` | Certificate validity, expiry date, issuer, cipher suite |
| `dns.py` | A/AAAA/CNAME/NS/PTR records, query time |
| `geo.py` | Country, city, coordinates, timezone, geohash (via GeoLite2) |
| `net.py` | IP address, ASN number, ASN organization (via GeoLite2 ASN) |
| `http.py` | Server header, X-Powered-By header |
| `nip66.py` | `Nip66` orchestrator class |
Each module produces a `RelayMetadata` object with the corresponding `MetadataType`. The Monitor service calls these checks and persists the results.
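Conceptually, the checks fan out concurrently; a hedged sketch follows (the per-check function names are hypothetical stand-ins for the modules above):

```python
import asyncio

async def check_relay(relay: Relay) -> list[RelayMetadata]:
    # return_exceptions=True keeps one failing probe from voiding the rest.
    results = await asyncio.gather(
        check_rtt(relay), check_ssl(relay), check_dns(relay),
        check_geo(relay), check_net(relay), check_http(relay),
        return_exceptions=True,
    )
    return [r for r in results if isinstance(r, RelayMetadata)]
```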
NIP-66 health check data flow:
```mermaid
flowchart LR
    R["Relay URL"]
    R --> RTT["rtt.py<br/><small>WebSocket RTT</small>"]
    R --> SSL["ssl.py<br/><small>Certificate</small>"]
    R --> DNS["dns.py<br/><small>DNS Records</small>"]
    R --> GEO["geo.py<br/><small>GeoIP</small>"]
    R --> NET["net.py<br/><small>ASN/IP</small>"]
    R --> HTTP["http.py<br/><small>HTTP Headers</small>"]
    RTT --> RM["RelayMetadata"]
    SSL --> RM
    DNS --> RM
    GEO --> RM
    NET --> RM
    HTTP --> RM

    style R fill:#7B1FA2,color:#fff,stroke:#4A148C
    style RM fill:#311B92,color:#fff,stroke:#1A237E
```
Utils Layer¶
`src/bigbrotr/utils/` -- Shared utilities. Depends only on models.
| Module | Key Exports | Purpose |
|---|---|---|
| `transport.py` | `connect_relay()`, `is_nostr_relay()`, `create_client()`, `create_insecure_client()` | WebSocket/HTTP transport with SOCKS5 proxy and SSL fallback |
| `keys.py` | `load_keys_from_env()`, `KeysConfig` | Nostr key management (hex/nsec loading from env) |
| `dns.py` | DNS resolution utilities | `dnspython` wrapper |
Transport SSL Fallback¶
`connect_relay()` implements a two-phase connection strategy:

```mermaid
flowchart TD
    A["connect_relay(url)"] --> B{Network type?}
    B -->|Clearnet| C["Try standard SSL"]
    B -->|Overlay<br/>Tor/I2P/Loki| G["Require proxy_url"]
    C --> D{Success?}
    D -->|Yes| E["Return connection"]
    D -->|No| F{allow_insecure?}
    F -->|Yes| H["Try insecure SSL<br/>(skip verification)"]
    F -->|No| I["Raise SSLError"]
    H --> J{Success?}
    J -->|Yes| E
    J -->|No| I
    G --> K["Connect via SOCKS5<br/>(no SSL fallback)"]
    K --> E

    style A fill:#7B1FA2,color:#fff,stroke:#4A148C
    style E fill:#1B5E20,color:#fff,stroke:#0D3B0F
    style I fill:#B71C1C,color:#fff,stroke:#7F0000
```
`InsecureWebSocketAdapter` and `InsecureWebSocketTransport` handle relays with invalid certificates.
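A simplified sketch of the strategy (the `create_client()` signatures are assumptions; the real logic lives in `transport.py`):

```python
import ssl

async def connect_relay_sketch(url: str, *, network: str,
                               allow_insecure: bool = False,
                               proxy_url: str | None = None):
    # Overlay networks (tor/i2p/loki) must use SOCKS5 and get no SSL fallback.
    if network != "clearnet":
        if proxy_url is None:
            raise ValueError("overlay networks require proxy_url")
        return await create_client(url, proxy_url=proxy_url)

    try:
        return await create_client(url)            # phase 1: standard SSL
    except ssl.SSLError:
        if not allow_insecure:
            raise
        return await create_insecure_client(url)   # phase 2: skip verification
```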
Service Layer¶
`src/bigbrotr/services/` -- Business logic. Depends on core, nips, utils, models.
Note
For a detailed walkthrough of each service and data flow, see Services.
Service Architecture Pattern¶
All six services follow the same pattern:
```python
class MyService(BaseService[MyServiceConfig]):
    SERVICE_NAME = "myservice"
    CONFIG_CLASS = MyServiceConfig

    def __init__(self, brotr: Brotr, config: MyServiceConfig | None = None):
        super().__init__(brotr=brotr, config=config or MyServiceConfig())

    async def run(self) -> None:
        """Single cycle -- called repeatedly by run_forever()."""
        ...
```
Configuration classes inherit from `BaseServiceConfig`, which provides:

- `interval: float` (default 300s, minimum 60s)
- `max_consecutive_failures: int` (default 5, 0 = unlimited)
- `metrics: MetricsConfig`
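A hypothetical subclass adding one service-specific field on top of those defaults:

```python
from pydantic import Field

class MyServiceConfig(BaseServiceConfig):
    # interval, max_consecutive_failures, and metrics are inherited.
    chunk_size: int = Field(default=100, ge=1)  # illustrative field
```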
Shared Infrastructure (services/common/)¶
| Module | Purpose |
|---|---|
| `queries.py` | 15 domain SQL query functions |
| `mixins.py` | `ChunkProgress`, `NetworkSemaphores`, `GeoReaders` + cooperative-inheritance mixins |
| `configs.py` | Per-network Pydantic config models |
Domain Query Functions (15 total in `queries.py`):

| Function | Purpose |
|---|---|
| `get_all_relay_urls(brotr)` | All relay URLs |
| `get_all_relays(brotr)` | All relays with network + discovered_at |
| `filter_new_relay_urls(brotr, urls)` | URLs not yet in relay table |
| `insert_relays(brotr, relays)` | Insert relays directly into relay table |
| `count_relays_due_for_check(brotr, ...)` | Count relays needing health check |
| `fetch_relays_due_for_check(brotr, ...)` | Fetch relays needing health check |
| `fetch_event_tagvalues(brotr, ...)` | Event tag values for relay URL extraction |
| `insert_candidates(brotr, relays)` | Insert new validation candidates (filters duplicates) |
| `count_candidates(brotr, networks)` | Count pending candidates |
| `fetch_candidate_chunk(brotr, ...)` | Fetch candidate batch for validation |
| `delete_stale_candidates(brotr)` | Remove candidates already in relay table |
| `delete_exhausted_candidates(brotr, ...)` | Remove candidates exceeding max_failures |
| `promote_candidates(brotr, relays)` | Move validated candidates to relay table |
| `get_all_service_cursors(brotr, ...)` | Get sync cursors for all relays |
| `delete_orphan_cursors(brotr, ...)` | Delete cursors for relays no longer in the relay table |
Network Configuration (`configs.py`):

| Config | Default Enabled | Proxy | Max Tasks | Timeout |
|---|---|---|---|---|
| `ClearnetConfig` | Yes | None | 50 | 10s |
| `TorConfig` | No | `socks5://tor:9050` | 10 | 30s |
| `I2pConfig` | No | `socks5://i2p:4447` | 5 | 45s |
| `LokiConfig` | No | `socks5://lokinet:1080` | 5 | 30s |
`NetworksConfig` wraps all four and provides `get(network)`, `is_enabled(network)`, `get_proxy_url(network)`, and `get_enabled_networks()`.
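A usage sketch of that wrapper (the per-network attribute names are inferred from the table above):

```python
networks = NetworksConfig()  # clearnet enabled, overlays disabled by default
for net in networks.get_enabled_networks():
    cfg = networks.get(net)
    proxy = networks.get_proxy_url(net)  # None for clearnet
    print(net, cfg.max_tasks, cfg.timeout, proxy)
```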
`ChunkProgress` (`mixins.py`):

Mutable tracker for chunk-based processing (not frozen -- has a `reset()` method):

```python
@dataclass(slots=True)
class ChunkProgress:
    started_at: float  # wall-clock
    total: int
    processed: int
    success: int
    failed: int
    chunks: int
```
`NetworkSemaphores` (`mixins.py`): creates one `asyncio.Semaphore` per enabled network, limiting concurrency to `max_tasks`.
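The idea in miniature (a sketch, not the mixin's actual code; `networks` is the `NetworksConfig` wrapper from above and `probe()` is a hypothetical per-relay task):

```python
import asyncio

# One semaphore per enabled network, sized by that network's max_tasks.
semaphores = {
    net: asyncio.Semaphore(networks.get(net).max_tasks)
    for net in networks.get_enabled_networks()
}

async def check(relay: Relay) -> None:
    async with semaphores[relay.network]:  # at most max_tasks concurrent per network
        await probe(relay)
```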
`GeoReaders` (`mixins.py`): lifecycle manager for GeoIP2 database readers (city + ASN), with `open()` and `close()` methods.
Data Flow¶
Metadata Deduplication¶
Metadata is content-addressed: a SHA-256 hash over canonical JSON. When the Monitor produces identical metadata for a relay, only a new `relay_metadata` row is inserted (linking the relay to the existing `metadata` record). The cascade function handles this:
```mermaid
flowchart TD
    A["Monitor._check_one(relay)"] --> B["CheckResult<br/><small>7 metadata types</small>"]
    B --> C["insert_relay_metadata<br/>(records, cascade=True)"]
    C --> D["relay_metadata_insert_cascade"]
    D --> E["1. INSERT INTO relay<br/>ON CONFLICT DO NOTHING"]
    D --> F["2. INSERT INTO metadata<br/>ON CONFLICT DO NOTHING<br/><small>(dedup by hash)</small>"]
    D --> G["3. INSERT INTO relay_metadata<br/><small>(time-series link)</small>"]

    style A fill:#7B1FA2,color:#fff,stroke:#4A148C
    style D fill:#512DA8,color:#fff,stroke:#311B92
```
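The content address itself is simple to sketch; the exact canonicalization (key order, separators) is an assumption here:

```python
import hashlib
import json

def metadata_hash(data: dict) -> str:
    # Canonical JSON: deterministic key order, no insignificant whitespace.
    canonical = json.dumps(data, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

# Identical content => identical hash => a single metadata row.
assert metadata_hash({"a": 1, "b": 2}) == metadata_hash({"b": 2, "a": 1})
```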
Concurrency Model¶
Async I/O¶
All I/O is async:
- `asyncpg` for database
- `aiohttp` for HTTP
- `aiohttp-socks` for SOCKS5 proxy
- `nostr-sdk` for WebSocket (via Rust FFI/PyO3)
Connection Pooling¶
```mermaid
flowchart LR
    subgraph "Writer Services"
        F["Finder"]
        V["Validator"]
        MO["Monitor"]
        SY["Synchronizer"]
        RE["Refresher"]
    end
    subgraph "Per-Service asyncpg Pool"
        APW["Writer Pool<br/><small>per-service sizing</small>"]
    end
    subgraph PGBouncer
        PBW["Writer pool<br/><small>pool_size=10</small>"]
        PBR["Reader pool<br/><small>pool_size=8</small>"]
    end
    subgraph PostgreSQL
        PG["Database<br/><small>max_connections=200</small>"]
    end

    F --> APW
    V --> APW
    MO --> APW
    SY --> APW
    RE --> APW
    APW --> PBW
    PBW --> PG
    PBR --> PG

    style APW fill:#512DA8,color:#fff,stroke:#311B92
    style PBW fill:#1565C0,color:#fff,stroke:#0D47A1
    style PBR fill:#1565C0,color:#fff,stroke:#0D47A1
    style PG fill:#1B5E20,color:#fff,stroke:#0D3B0F
```
Each service has its own asyncpg pool with per-service sizing (via pool overrides in service config). PGBouncer provides two database pools: a writer pool for services and a reader pool for read-only consumers (postgres-exporter, future API/DVM).
Per-Network Semaphores¶
Services that contact relays (Validator, Monitor, Synchronizer) use `NetworkSemaphoreMixin` to limit concurrent connections per network type:
| Network | Default Max Tasks | Default Timeout |
|---|---|---|
| Clearnet | 50 | 10s |
| Tor | 10 | 30s |
| I2P | 5 | 45s |
| Lokinet | 5 | 30s |
Design Patterns¶
Dependency Injection¶
Services receive Brotr via constructor, enabling mock injection for testing:
```python
service = Monitor(brotr=brotr, config=config)

# Testing
mock_brotr = MagicMock(spec=Brotr)
service = Monitor(brotr=mock_brotr)
```
Composition over Inheritance¶
Brotr HAS-A Pool (not IS-A). Each service HAS-A Brotr. This enables independent testing and lifecycle management.
Template Method¶
`BaseService.run_forever()` calls the abstract `run()` method in a loop with interval pacing, failure tracking, and shutdown checks. Subclasses implement only `run()`.
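Reduced to its essentials, the loop looks roughly like this sketch (error handling and metrics omitted):

```python
async def run_forever(self) -> None:
    failures = 0
    while self.is_running:
        try:
            await self.run()  # subclass-provided single cycle
            failures = 0
        except Exception:
            failures += 1
            limit = self._config.max_consecutive_failures
            if limit and failures >= limit:  # 0 means unlimited
                raise
        if await self.wait(self._config.interval):
            break  # shutdown requested during the sleep
```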
Mixins for Cross-Cutting Concerns¶
Monitor uses multiple mixins to compose behavior:
- `ChunkProgressMixin` -- chunk processing tracking (from `services/common/mixins.py`)
- `NetworkSemaphoresMixin` -- per-network concurrency (from `services/common/mixins.py`)
- `GeoReaderMixin` -- GeoIP database lifecycle (from `services/common/mixins.py`)
Content-Addressed Deduplication¶
`Metadata` uses its SHA-256 hash as primary key. Identical NIP-11/NIP-66 results across time or relays produce the same hash, deduplicating storage. Time-series tracking is via the `relay_metadata` junction table.
Factory Methods¶
All services support three construction paths:
```python
service = Monitor.from_yaml("config.yaml", brotr=brotr)     # from YAML file
service = Monitor.from_dict(config_dict, brotr=brotr)       # from dict
service = Monitor(brotr=brotr, config=MonitorConfig(...))   # direct
```
Context Managers¶
Resources are automatically managed:
```python
async with brotr:            # connect pool on enter, close on exit
    async with service:      # lifecycle setup/teardown
        await service.run_forever()
```
Configuration System¶
Configuration Hierarchy¶
```mermaid
flowchart TD
    A["deployments/bigbrotr/config/"]
    A --> B["brotr.yaml<br/><small>Pool, batch, timeouts</small>"]
    A --> C["services/"]
    C --> D["seeder.yaml"]
    C --> E["finder.yaml"]
    C --> F["validator.yaml"]
    C --> G["monitor.yaml"]
    C --> H["synchronizer.yaml"]
    C --> I["refresher.yaml"]

    style A fill:#7B1FA2,color:#fff,stroke:#4A148C
    style B fill:#512DA8,color:#fff,stroke:#311B92
    style C fill:#512DA8,color:#fff,stroke:#311B92
```
Note
For the complete configuration reference with all fields, defaults, and examples, see Configuration.
Pydantic Validation¶
All configuration uses Pydantic v2 models with:
- Typed fields with defaults and constraints (`Field(ge=1, le=65535)`)
- Nested models (e.g., `MonitorConfig.networks.clearnet.timeout`)
- `model_validator` for cross-field validation (e.g., Monitor keys at config load time)
- Environment variable injection for secrets (`DB_WRITER_PASSWORD`, `DB_READER_PASSWORD`, `PRIVATE_KEY`)
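A condensed illustration of all four techniques (the field names are illustrative, not the real config schema):

```python
import os
from pydantic import BaseModel, Field, model_validator

class TimeoutConfig(BaseModel):
    timeout: float = Field(default=10.0, gt=0)

class ExampleConfig(BaseModel):
    port: int = Field(default=5432, ge=1, le=65535)  # constrained field
    clearnet: TimeoutConfig = TimeoutConfig()        # nested model
    password_env: str = "DB_WRITER_PASSWORD"         # secret comes from the env

    @model_validator(mode="after")
    def check_secret_available(self) -> "ExampleConfig":
        # Cross-field/environment validation at config load time.
        if self.password_env not in os.environ:
            raise ValueError(f"{self.password_env} is not set")
        return self
```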
Environment Variables¶
| Variable | Required | Used By |
|---|---|---|
| `DB_ADMIN_PASSWORD` | Yes | PostgreSQL admin, PGBouncer auth |
| `DB_WRITER_PASSWORD` | Yes | Writer services (via per-service pool overrides) |
| `DB_READER_PASSWORD` | Yes | Read-only services (postgres-exporter, future API/DVM) |
| `PRIVATE_KEY` | For Monitor | Monitor (Nostr event signing, RTT write tests) |
| `GRAFANA_PASSWORD` | No | Grafana (admin password) |
Testing Architecture¶
Test Structure¶
```text
tests/
├── conftest.py              # Root: mock_pool, mock_brotr, mock_connection, sample_*
├── fixtures/
│   └── relays.py            # Shared relay fixtures (registered as pytest plugin)
├── unit/                    # ~2300 tests, mirrors src/ structure
│   ├── core/                # test_pool.py, test_brotr.py, test_base_service.py, ...
│   ├── models/              # test_relay.py, test_event.py, test_metadata.py, ...
│   ├── nips/                # nip11/, nip66/
│   ├── services/            # test_monitor.py, test_synchronizer.py, ...
│   └── utils/               # test_transport.py, test_keys.py, ...
└── integration/             # ~95 tests (testcontainers PostgreSQL)
```
Test Configuration¶
- `asyncio_mode = "auto"` -- no `@pytest.mark.asyncio` needed
- Global timeout: `--timeout=120` per test
- Coverage threshold: `fail_under = 80` (branch coverage enabled)
- Fixtures registered via `pytest_plugins = ["tests.fixtures.relays"]`
- Custom markers: `integration`, `unit`, `slow`
Mock Patterns¶
- Mock targets use the `bigbrotr.` prefix: `@patch("bigbrotr.services.validator.is_nostr_relay")`
- Service tests mock query functions at the service module namespace (not `bigbrotr.core.queries.*`)
- Root conftest provides: `mock_pool`, `mock_brotr`, `mock_connection`, `mock_asyncpg_pool`, `sample_event`, `sample_relay`, `sample_metadata`, `sample_events_batch`, `sample_relays_batch`
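Putting those conventions together, a unit test might look like this sketch (the Validator internals and import path are illustrative):

```python
from unittest.mock import AsyncMock, patch

from bigbrotr.services.validator import Validator  # path assumed from the mock target

# Patch at the service module namespace, using the bigbrotr. prefix.
@patch("bigbrotr.services.validator.is_nostr_relay", new_callable=AsyncMock)
async def test_validator_runs_checks(mock_check, mock_brotr):
    mock_check.return_value = True
    service = Validator(brotr=mock_brotr)  # mock_brotr from the root conftest
    await service.run()                    # asyncio_mode="auto": no marker needed
    mock_check.assert_awaited()
```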
Related Documentation¶
- Services -- Deep dive into the six independent services and data flow
- Configuration -- Complete YAML configuration reference
- Database -- PostgreSQL schema, stored functions, and indexes
- Monitoring -- Prometheus metrics, alerting, and Grafana dashboards