Skip to content

event_builders

event_builders

Nostr event builders for all NIP-compliant event kinds.

Standalone functions for constructing Nostr events from typed NIP data models. Used by the Monitor service for publishing profile (Kind 0), monitor announcement (Kind 10166), and relay discovery (Kind 30166) events.

See Also

- bigbrotr.nips.nip66.data: Typed data models consumed by the Kind 30166 tag builder functions.
- bigbrotr.nips.nip11.data.Nip11InfoData: NIP-11 data model used for language, requirement, and type tags.
- bigbrotr.nips.nip66.logs.Nip66RttMultiPhaseLogs: RTT probe logs used for requirement tag derivation.

Classes

AccessFlags

Bases: NamedTuple

Relay access restriction flags derived from NIP-11 and RTT probe results.

Functions

build_profile_event

build_profile_event(
    *,
    name: str | None = None,
    about: str | None = None,
    picture: str | None = None,
    nip05: str | None = None,
    website: str | None = None,
    banner: str | None = None,
    lud16: str | None = None,
) -> EventBuilder

Build a Kind 0 profile metadata event per NIP-01.

Source code in src/bigbrotr/nips/event_builders.py
def build_profile_event(  # noqa: PLR0913
    *,
    name: str | None = None,
    about: str | None = None,
    picture: str | None = None,
    nip05: str | None = None,
    website: str | None = None,
    banner: str | None = None,
    lud16: str | None = None,
) -> EventBuilder:
    """Build a Kind 0 profile metadata event per NIP-01.

    All arguments are optional and keyword-only. Arguments that are ``None``
    or empty are left out of the metadata JSON entirely.

    Returns:
        The ``EventBuilder`` produced by ``EventBuilder.metadata`` from the
        JSON-encoded profile fields.
    """
    candidates = {
        "name": name,
        "about": about,
        "picture": picture,
        "nip05": nip05,
        "website": website,
        "banner": banner,
        "lud16": lud16,
    }
    # Only truthy values make it into the serialized profile.
    supplied = {field: value for field, value in candidates.items() if value}
    metadata = NostrMetadata.from_json(json.dumps(supplied))
    return EventBuilder.metadata(metadata)

build_relay_list_event

build_relay_list_event(relays: list[Relay]) -> EventBuilder

Build a Kind 10002 relay list metadata event per NIP-65.

Source code in src/bigbrotr/nips/event_builders.py
def build_relay_list_event(relays: list[Relay]) -> EventBuilder:
    """Build a Kind 10002 relay list metadata event per NIP-65.

    Args:
        relays: Relays to list; each one yields an ``r`` tag carrying its URL
            and the ``write`` marker.

    Returns:
        An ``EventBuilder`` with empty content and one ``r`` tag per relay.
    """
    builder = EventBuilder(Kind(EventKind.RELAY_LIST), "")
    return builder.tags([Tag.parse(["r", relay.url, "write"]) for relay in relays])

build_monitor_announcement

build_monitor_announcement(
    *,
    interval: int,
    timeout_ms: int,
    enabled_networks: list[NetworkType],
    nip11_selection: Nip11Selection,
    nip66_selection: Nip66Selection,
    geohash: str | None = None,
) -> EventBuilder

Build a Kind 10166 monitor announcement event per NIP-66.

Source code in src/bigbrotr/nips/event_builders.py
def build_monitor_announcement(  # noqa: PLR0913
    *,
    interval: int,
    timeout_ms: int,
    enabled_networks: list[NetworkType],
    nip11_selection: Nip11Selection,
    nip66_selection: Nip66Selection,
    geohash: str | None = None,
) -> EventBuilder:
    """Build a Kind 10166 monitor announcement event per NIP-66.

    Args:
        interval: Value published in the ``frequency`` tag.
        timeout_ms: Value published in every ``timeout`` tag.
        enabled_networks: Networks advertised through ``n`` tags.
        nip11_selection: Its ``info`` flag enables the ``nip11`` check.
        nip66_selection: Flags enabling the RTT, ``ssl``, ``geo``, ``net``,
            ``dns`` and ``http`` checks.
        geohash: Optional value for a ``g`` tag; omitted when empty.

    Returns:
        An ``EventBuilder`` with empty content carrying the tags above plus a
        ``timeout``/``c`` tag pair for every enabled check.
    """
    timeout = str(timeout_ms)

    tags = [Tag.parse(["frequency", str(interval)])]
    if geohash:
        tags.append(Tag.parse(["g", geohash]))
    for network in enabled_networks:
        tags.append(Tag.parse(["n", network.value]))

    # Each optional check paired with the flag that switches it on.
    selectable = (
        ("nip11", nip11_selection.info),
        ("ssl", nip66_selection.ssl),
        ("geo", nip66_selection.geo),
        ("net", nip66_selection.net),
        ("dns", nip66_selection.dns),
        ("http", nip66_selection.http),
    )

    # RTT is advertised as its three sub-phases, ahead of the other checks.
    advertised: list[str] = []
    if nip66_selection.rtt:
        advertised += ["open", "read", "write"]
    advertised += [check for check, enabled in selectable if enabled]

    for check in advertised:
        tags += [Tag.parse(["timeout", check, timeout]), Tag.parse(["c", check])]

    return EventBuilder(Kind(EventKind.MONITOR_ANNOUNCEMENT), "").tags(tags)

add_rtt_tags

add_rtt_tags(
    tags: list[Tag], rtt_data: Nip66RttData | None
) -> None

Add round-trip time tags: rtt-open, rtt-read, rtt-write.

Source code in src/bigbrotr/nips/event_builders.py
def add_rtt_tags(tags: list[Tag], rtt_data: Nip66RttData | None) -> None:
    """Append ``rtt-open``, ``rtt-read`` and ``rtt-write`` tags to ``tags``.

    Args:
        tags: List extended in place.
        rtt_data: RTT measurements; ``None`` adds nothing, and any individual
            measurement that is ``None`` is skipped.
    """
    if rtt_data is None:
        return
    measurements = (
        ("rtt-open", rtt_data.rtt_open),
        ("rtt-read", rtt_data.rtt_read),
        ("rtt-write", rtt_data.rtt_write),
    )
    for tag_name, value in measurements:
        # A measurement of 0 is still reported; only a missing one is dropped.
        if value is not None:
            tags.append(Tag.parse([tag_name, str(value)]))

add_ssl_tags

add_ssl_tags(
    tags: list[Tag], ssl_data: Nip66SslData | None
) -> None

Add SSL certificate tags: ssl, ssl-expires, ssl-issuer.

Source code in src/bigbrotr/nips/event_builders.py
def add_ssl_tags(tags: list[Tag], ssl_data: Nip66SslData | None) -> None:
    """Append ``ssl``, ``ssl-expires`` and ``ssl-issuer`` tags to ``tags``.

    Args:
        tags: List extended in place.
        ssl_data: SSL check results; ``None`` adds nothing.
    """
    if ssl_data is None:
        return

    validity = ssl_data.ssl_valid
    if validity is not None:
        # A known-invalid certificate is reported explicitly as "!valid".
        tags.append(Tag.parse(["ssl", "valid" if validity else "!valid"]))

    expiry = ssl_data.ssl_expires
    if expiry is not None:
        tags.append(Tag.parse(["ssl-expires", str(expiry)]))

    issuer = ssl_data.ssl_issuer
    if issuer:
        tags.append(Tag.parse(["ssl-issuer", issuer]))

add_net_tags

add_net_tags(
    tags: list[Tag], net_data: Nip66NetData | None
) -> None

Add network tags: net-ip, net-ipv6, net-asn, net-asn-org.

Also emits NIP-32 l labels for ASN and ASN org, making them relay-filterable via #l subscription filters.

Source code in src/bigbrotr/nips/event_builders.py
def add_net_tags(tags: list[Tag], net_data: Nip66NetData | None) -> None:
    """Append ``net-ip``, ``net-ipv6``, ``net-asn`` and ``net-asn-org`` tags.

    The ASN and ASN organisation are additionally emitted as NIP-32 ``l``
    labels (namespaces ``IANA-asn`` and ``IANA-asnOrg``) so they can be
    matched with ``#l`` subscription filters.

    Args:
        tags: List extended in place.
        net_data: Network check results; ``None`` adds nothing.
    """
    if net_data is None:
        return

    ip_address = net_data.net_ip
    if ip_address:
        tags.append(Tag.parse(["net-ip", ip_address]))

    ipv6_address = net_data.net_ipv6
    if ipv6_address:
        tags.append(Tag.parse(["net-ipv6", ipv6_address]))

    asn = net_data.net_asn
    if asn is not None:
        asn_text = str(asn)
        tags += [
            Tag.parse(["net-asn", asn_text]),
            Tag.parse(["l", asn_text, "IANA-asn"]),
        ]

    asn_org = net_data.net_asn_org
    if asn_org:
        tags += [
            Tag.parse(["net-asn-org", asn_org]),
            Tag.parse(["l", asn_org, "IANA-asnOrg"]),
        ]

add_geo_tags

add_geo_tags(
    tags: list[Tag], geo_data: Nip66GeoData | None
) -> None

Add geolocation tags (g, geo-country, geo-city, etc.).

Indexed (filterable) values are emitted as both informational multi-letter tags and NIP-32 l labels so clients can filter on them:

  • country → ["l", "<ISO-3166-1>", "ISO-3166-1"]
  • city → ["l", "<city>", "nip66.label.city"] (no recognized standard)
  • timezone → ["l", "<tz>", "IANA-tz"]
Source code in src/bigbrotr/nips/event_builders.py
def add_geo_tags(tags: list[Tag], geo_data: Nip66GeoData | None) -> None:
    """Append geolocation tags (``g``, ``geo-country``, ``geo-city``, etc.).

    Country, city and timezone are written twice: once as an informational
    multi-letter tag and once as a NIP-32 ``l`` label that clients can filter
    on:

    - country  → ``["l", "<ISO-3166-1>", "ISO-3166-1"]``
    - city     → ``["l", "<city>", "nip66.label.city"]``  (no recognized standard)
    - timezone → ``["l", "<tz>", "IANA-tz"]``

    Args:
        tags: List extended in place.
        geo_data: Geolocation check results; ``None`` adds nothing.
    """
    if geo_data is None:
        return

    if geo_data.geo_hash:
        tags.append(Tag.parse(["g", geo_data.geo_hash]))

    # Values that get both an informational tag and a filterable label.
    for place, tag_name, namespace in (
        (geo_data.geo_country, "geo-country", "ISO-3166-1"),
        (geo_data.geo_city, "geo-city", "nip66.label.city"),
    ):
        if place:
            tags += [Tag.parse([tag_name, place]), Tag.parse(["l", place, namespace])]

    # Coordinates are checked against None so that 0 is still emitted.
    for tag_name, coordinate in (
        ("geo-lat", geo_data.geo_lat),
        ("geo-lon", geo_data.geo_lon),
    ):
        if coordinate is not None:
            tags.append(Tag.parse([tag_name, str(coordinate)]))

    timezone = geo_data.geo_tz
    if timezone:
        tags += [
            Tag.parse(["geo-tz", timezone]),
            Tag.parse(["l", timezone, "IANA-tz"]),
        ]

add_dns_tags

add_dns_tags(
    tags: list[Tag], dns_data: Nip66DnsData | None
) -> None

Add DNS resolution tags: dns-ip, dns-ip6, dns-cname, dns-ttl.

Source code in src/bigbrotr/nips/event_builders.py
def add_dns_tags(tags: list[Tag], dns_data: Nip66DnsData | None) -> None:
    """Append ``dns-ip``, ``dns-ip6``, ``dns-cname`` and ``dns-ttl`` tags.

    Only the first entry of ``dns_ips`` and of ``dns_ips_v6`` is published.

    Args:
        tags: List extended in place.
        dns_data: DNS check results; ``None`` adds nothing.
    """
    if dns_data is None:
        return

    addresses = dns_data.dns_ips
    if addresses:
        tags.append(Tag.parse(["dns-ip", addresses[0]]))

    addresses_v6 = dns_data.dns_ips_v6
    if addresses_v6:
        tags.append(Tag.parse(["dns-ip6", addresses_v6[0]]))

    cname = dns_data.dns_cname
    if cname:
        tags.append(Tag.parse(["dns-cname", cname]))

    ttl = dns_data.dns_ttl
    if ttl is not None:
        tags.append(Tag.parse(["dns-ttl", str(ttl)]))

add_http_tags

add_http_tags(
    tags: list[Tag], http_data: Nip66HttpData | None
) -> None

Add HTTP server header tags: http-server, http-powered-by.

Source code in src/bigbrotr/nips/event_builders.py
def add_http_tags(tags: list[Tag], http_data: Nip66HttpData | None) -> None:
    """Append ``http-server`` and ``http-powered-by`` tags to ``tags``.

    Args:
        tags: List extended in place.
        http_data: HTTP check results; ``None`` adds nothing, and empty
            header values are skipped.
    """
    if http_data is None:
        return
    headers = (
        ("http-server", http_data.http_server),
        ("http-powered-by", http_data.http_powered_by),
    )
    tags.extend(Tag.parse([tag_name, value]) for tag_name, value in headers if value)

add_attributes_tags

add_attributes_tags(
    tags: list[Tag], nip11_data: Nip11InfoData
) -> None

Add relay attribute tags (W) from NIP-11 attributes field.

Source code in src/bigbrotr/nips/event_builders.py
def add_attributes_tags(tags: list[Tag], nip11_data: Nip11InfoData) -> None:
    """Append one ``W`` tag per entry of the NIP-11 ``attributes`` field.

    Args:
        tags: List extended in place.
        nip11_data: NIP-11 data whose ``attributes`` are published; entries
            that are empty or not strings are ignored.
    """
    for attribute in nip11_data.attributes or ():
        if isinstance(attribute, str) and attribute:
            tags.append(Tag.parse(["W", attribute]))

add_language_tags

add_language_tags(
    tags: list[Tag], nip11_data: Nip11InfoData
) -> None

Add ISO 639-1 language tags derived from NIP-11 language_tags field.

Source code in src/bigbrotr/nips/event_builders.py
def add_language_tags(tags: list[Tag], nip11_data: Nip11InfoData) -> None:
    """Append ISO 639-1 ``l`` labels derived from NIP-11 ``language_tags``.

    Nothing is added when the field is empty or contains the ``*`` wildcard.
    Otherwise the part of each entry before the first ``-`` is lower-cased
    and, if its length equals ``_ISO_639_1_LENGTH``, emitted once as
    ``["l", "<code>", "ISO-639-1"]``.

    Args:
        tags: List extended in place.
        nip11_data: NIP-11 data providing ``language_tags``.
    """
    declared = nip11_data.language_tags
    if not declared or "*" in declared:
        return

    emitted: set[str] = set()
    for entry in declared:
        if not entry:
            continue
        # Primary subtag only: "en-US" and "EN" both collapse to "en".
        code = entry.partition("-")[0].lower()
        if not code or len(code) != _ISO_639_1_LENGTH or code in emitted:
            continue
        emitted.add(code)
        tags.append(Tag.parse(["l", code, "ISO-639-1"]))

add_requirement_and_type_tags

add_requirement_and_type_tags(
    tags: list[Tag],
    nip11_data: Nip11InfoData,
    rtt_logs: Nip66RttMultiPhaseLogs | None,
) -> None

Add R (requirement) and T (type) tags from NIP-11 data and RTT probe logs.

Source code in src/bigbrotr/nips/event_builders.py
def add_requirement_and_type_tags(
    tags: list[Tag],
    nip11_data: Nip11InfoData,
    rtt_logs: Nip66RttMultiPhaseLogs | None,
) -> None:
    """Append ``R`` (requirement) and ``T`` (type) tags to ``tags``.

    The auth / payment / writes requirements are resolved in three tiers:

    1. The write probe succeeded: none of the three is required.
    2. The write probe failed with a reason: the reason text decides, and the
       NIP-11 ``limitation`` flags can only add auth or payment on top.
    3. Anything else (no logs, no write result, failure without a reason):
       the NIP-11 ``limitation`` flags are used as-is.

    The ``pow`` requirement always comes from NIP-11 ``min_pow_difficulty``.

    Args:
        tags: List extended in place.
        nip11_data: NIP-11 data providing ``limitation`` and
            ``supported_nips``.
        rtt_logs: RTT probe logs, or ``None`` when no probe was run.
    """
    limitation = nip11_data.limitation
    declared_auth = limitation.auth_required or False
    declared_payment = limitation.payment_required or False
    declared_writes = limitation.restricted_writes or False
    min_pow = limitation.min_pow_difficulty or 0

    if rtt_logs is None:
        write_ok: bool | None = None
        write_why = ""
        read_ok: bool | None = None
        read_why = ""
    else:
        write_ok = rtt_logs.write_success
        write_why = (rtt_logs.write_reason or "").lower()
        read_ok = rtt_logs.read_success
        read_why = (rtt_logs.read_reason or "").lower()

    # Probe results outrank the relay's own NIP-11 self-report.
    if write_ok is True:
        # The probe wrote without auth or payment, so the relay is open.
        auth = payment = writes = False
    elif write_ok is False and write_why:
        # Classify the failure by its reason text; NIP-11 may add to it.
        probe_auth = "auth" in write_why
        probe_payment = "pay" in write_why or "paid" in write_why
        auth = bool(probe_auth or declared_auth)
        payment = bool(probe_payment or declared_payment)
        # A failure that is neither auth nor payment counts as restricted writes.
        writes = not (probe_auth or probe_payment)
    else:
        # No usable probe outcome: fall back to NIP-11 alone.
        auth = bool(declared_auth)
        payment = bool(declared_payment)
        writes = bool(declared_writes)

    read_auth = read_ok is False and "auth" in read_why

    # R tags: the bare name when required, "!"-prefixed when not.
    requirements = (
        ("auth", auth),
        ("payment", payment),
        ("writes", writes),
        ("pow", min_pow > 0),
    )
    for requirement, required in requirements:
        tags.append(Tag.parse(["R", requirement if required else f"!{requirement}"]))

    # T tags
    access = AccessFlags(payment=payment, auth=auth, writes=writes, read_auth=read_auth)
    add_type_tags(tags, nip11_data.supported_nips, access)

add_type_tags

add_type_tags(
    tags: list[Tag],
    supported_nips: list[int] | None,
    access: AccessFlags,
) -> None

Add T (type) tags classifying the relay based on NIPs and access restrictions.

Source code in src/bigbrotr/nips/event_builders.py
def add_type_tags(
    tags: list[Tag],
    supported_nips: list[int] | None,
    access: AccessFlags,
) -> None:
    """Append ``T`` (type) tags classifying the relay.

    Capability types (``Search``, ``Community``, ``Blob``) come from the
    supported NIPs, ``Paid`` from the payment flag, and exactly one access
    type (``PrivateStorage``, ``PrivateInbox``, ``PublicOutbox`` or
    ``PublicInbox``) is always appended last.

    Args:
        tags: List extended in place.
        supported_nips: NIP numbers the relay advertises, or ``None``.
        access: Access restriction flags for the relay.
    """
    advertised = set(supported_nips or ())

    capability_types = (
        (_NIP_CAP_SEARCH, "Search"),
        (_NIP_CAP_COMMUNITY, "Community"),
        (_NIP_CAP_BLOSSOM, "Blob"),
    )
    for nip, relay_type in capability_types:
        if nip in advertised:
            tags.append(Tag.parse(["T", relay_type]))

    if access.payment:
        tags.append(Tag.parse(["T", "Paid"]))

    # Restricted reads take precedence over any write-side restriction.
    if access.read_auth:
        access_type = "PrivateStorage" if access.auth else "PrivateInbox"
    elif access.auth or access.writes or access.payment:
        access_type = "PublicOutbox"
    else:
        access_type = "PublicInbox"
    tags.append(Tag.parse(["T", access_type]))

add_nip11_tags

add_nip11_tags(
    tags: list[Tag],
    nip11_data: Nip11InfoData | None,
    rtt_logs: Nip66RttMultiPhaseLogs | None = None,
) -> None

Add NIP-11-derived tags: N, t, l, R, T, W.

Source code in src/bigbrotr/nips/event_builders.py
def add_nip11_tags(
    tags: list[Tag],
    nip11_data: Nip11InfoData | None,
    rtt_logs: Nip66RttMultiPhaseLogs | None = None,
) -> None:
    """Append all NIP-11-derived tags: ``N``, ``t``, ``l``, ``R``, ``T``, ``W``.

    Args:
        tags: List extended in place.
        nip11_data: NIP-11 data; ``None`` adds nothing.
        rtt_logs: RTT probe logs forwarded to the requirement/type tag
            derivation.
    """
    if nip11_data is None:
        return

    # One "N" tag per supported NIP, then one hashtag per relay topic.
    for nip in nip11_data.supported_nips or ():
        tags.append(Tag.parse(["N", str(nip)]))
    for topic in nip11_data.tags or ():
        tags.append(Tag.hashtag(topic))

    add_language_tags(tags, nip11_data)
    add_requirement_and_type_tags(tags, nip11_data, rtt_logs)
    add_attributes_tags(tags, nip11_data)

build_relay_discovery

build_relay_discovery(
    relay: Relay,
    nip11: Nip11 | None = None,
    nip66: Nip66 | None = None,
) -> EventBuilder

Build a Kind 30166 relay discovery event per NIP-66.

Parameters:

  • relay (Relay) –

    The relay being monitored.

  • nip11 (Nip11 | None, default: None ) –

    NIP-11 relay information (None if not fetched).

  • nip66 (Nip66 | None, default: None ) –

    NIP-66 health check results (None if not run).

Returns:

  • EventBuilder

    A signed-ready EventBuilder for Kind 30166.

Source code in src/bigbrotr/nips/event_builders.py
def build_relay_discovery(
    relay: Relay,
    nip11: Nip11 | None = None,
    nip66: Nip66 | None = None,
) -> EventBuilder:
    """Build a Kind 30166 relay discovery event per NIP-66.

    Args:
        relay: The relay being monitored; its URL becomes the identifier tag
            and its network the ``n`` tag.
        nip11: NIP-11 relay information (``None`` if not fetched).
        nip66: NIP-66 health check results (``None`` if not run).

    Returns:
        An ``EventBuilder`` for Kind 30166 whose content is the canonical
        NIP-11 JSON, or an empty string when no NIP-11 info is available.
    """
    content = ""
    nip11_data = None
    if nip11 and nip11.info:
        content = Metadata(
            type=MetadataType.NIP11_INFO,
            data=nip11.info.to_dict(),
        ).canonical_json
        nip11_data = nip11.info.data

    # RTT feeds both its own tags (data) and the requirement tags (logs).
    rtt = nip66.rtt if nip66 and nip66.rtt else None
    rtt_data = rtt.data if rtt is not None else None
    rtt_logs = rtt.logs if rtt is not None else None

    tags: list[Tag] = [
        Tag.identifier(relay.url),
        Tag.parse(["n", relay.network.value]),
    ]

    add_rtt_tags(tags, rtt_data)
    # Remaining checks: hand each adder its result data, or None if absent.
    for adder, check in (
        (add_ssl_tags, "ssl"),
        (add_net_tags, "net"),
        (add_geo_tags, "geo"),
        (add_dns_tags, "dns"),
        (add_http_tags, "http"),
    ):
        result = getattr(nip66, check) if nip66 else None
        adder(tags, result.data if result else None)
    add_nip11_tags(tags, nip11_data, rtt_logs)

    return EventBuilder(Kind(EventKind.RELAY_DISCOVERY), content).tags(tags)