Skip to content

event_builders

event_builders

Nostr event builders for all NIP-compliant event kinds.

Standalone functions for constructing Nostr events from typed NIP data models. Used by the Monitor service for publishing profile (Kind 0), monitor announcement (Kind 10166), and relay discovery (Kind 30166) events.

See Also

bigbrotr.nips.nip66.data: Typed data models consumed by the Kind 30166 tag builder functions. bigbrotr.nips.nip11.data.Nip11InfoData: NIP-11 data model used for language, requirement, and type tags. bigbrotr.nips.nip66.logs.Nip66RttMultiPhaseLogs: RTT probe logs used for requirement tag derivation.

Classes

AccessFlags

Bases: NamedTuple

Relay access restriction flags derived from NIP-11 and RTT probe results.

Functions

build_profile_event

build_profile_event(
    *,
    name: str | None = None,
    about: str | None = None,
    picture: str | None = None,
    nip05: str | None = None,
    website: str | None = None,
    banner: str | None = None,
    lud16: str | None = None,
) -> EventBuilder

Build a Kind 0 profile metadata event per NIP-01.

Source code in src/bigbrotr/nips/event_builders.py
def build_profile_event(  # noqa: PLR0913
    *,
    name: str | None = None,
    about: str | None = None,
    picture: str | None = None,
    nip05: str | None = None,
    website: str | None = None,
    banner: str | None = None,
    lud16: str | None = None,
) -> EventBuilder:
    """Build a Kind 0 profile metadata event per NIP-01."""
    profile_data: dict[str, str] = {}
    if name:
        profile_data["name"] = name
    if about:
        profile_data["about"] = about
    if picture:
        profile_data["picture"] = picture
    if nip05:
        profile_data["nip05"] = nip05
    if website:
        profile_data["website"] = website
    if banner:
        profile_data["banner"] = banner
    if lud16:
        profile_data["lud16"] = lud16
    return EventBuilder.metadata(NostrMetadata.from_json(json.dumps(profile_data)))

build_monitor_announcement

build_monitor_announcement(
    *,
    interval: int,
    timeout_ms: int,
    enabled_networks: list[NetworkType],
    nip11_selection: Nip11Selection,
    nip66_selection: Nip66Selection,
) -> EventBuilder

Build a Kind 10166 monitor announcement event per NIP-66.

Source code in src/bigbrotr/nips/event_builders.py
def build_monitor_announcement(
    *,
    interval: int,
    timeout_ms: int,
    enabled_networks: list[NetworkType],
    nip11_selection: Nip11Selection,
    nip66_selection: Nip66Selection,
) -> EventBuilder:
    """Build a Kind 10166 monitor announcement event per NIP-66."""
    tags = [Tag.parse(["frequency", str(interval)])]
    tags.extend(Tag.parse(["n", network.value]) for network in enabled_networks)

    ms = str(timeout_ms)

    # Timeout tags — only NIP-66 check types that have timeouts
    if nip66_selection.rtt:
        tags.extend(Tag.parse(["timeout", name, ms]) for name in ("open", "read", "write"))
    if nip11_selection.info:
        tags.append(Tag.parse(["timeout", "nip11", ms]))
    if nip66_selection.ssl:
        tags.append(Tag.parse(["timeout", "ssl", ms]))

    # Capability tags — all NIP-66 check types (includes geo/net which have no timeout)
    if nip66_selection.rtt:
        tags.extend(Tag.parse(["c", name]) for name in ("open", "read", "write"))
    if nip11_selection.info:
        tags.append(Tag.parse(["c", "nip11"]))
    if nip66_selection.ssl:
        tags.append(Tag.parse(["c", "ssl"]))
    if nip66_selection.geo:
        tags.append(Tag.parse(["c", "geo"]))
    if nip66_selection.net:
        tags.append(Tag.parse(["c", "net"]))

    return EventBuilder(Kind(EventKind.MONITOR_ANNOUNCEMENT), "").tags(tags)

add_rtt_tags

add_rtt_tags(
    tags: list[Tag], rtt_data: Nip66RttData | None
) -> None

Add round-trip time tags: rtt-open, rtt-read, rtt-write.

Source code in src/bigbrotr/nips/event_builders.py
def add_rtt_tags(tags: list[Tag], rtt_data: Nip66RttData | None) -> None:
    """Add round-trip time tags: ``rtt-open``, ``rtt-read``, ``rtt-write``."""
    if rtt_data is None:
        return
    if rtt_data.rtt_open is not None:
        tags.append(Tag.parse(["rtt-open", str(rtt_data.rtt_open)]))
    if rtt_data.rtt_read is not None:
        tags.append(Tag.parse(["rtt-read", str(rtt_data.rtt_read)]))
    if rtt_data.rtt_write is not None:
        tags.append(Tag.parse(["rtt-write", str(rtt_data.rtt_write)]))

add_ssl_tags

add_ssl_tags(
    tags: list[Tag], ssl_data: Nip66SslData | None
) -> None

Add SSL certificate tags: ssl, ssl-expires, ssl-issuer.

Source code in src/bigbrotr/nips/event_builders.py
def add_ssl_tags(tags: list[Tag], ssl_data: Nip66SslData | None) -> None:
    """Add SSL certificate tags: ``ssl``, ``ssl-expires``, ``ssl-issuer``."""
    if ssl_data is None:
        return
    if ssl_data.ssl_valid is not None:
        tags.append(Tag.parse(["ssl", "valid" if ssl_data.ssl_valid else "!valid"]))
    if ssl_data.ssl_expires is not None:
        tags.append(Tag.parse(["ssl-expires", str(ssl_data.ssl_expires)]))
    if ssl_data.ssl_issuer:
        tags.append(Tag.parse(["ssl-issuer", ssl_data.ssl_issuer]))

add_net_tags

add_net_tags(
    tags: list[Tag], net_data: Nip66NetData | None
) -> None

Add network tags: net-ip, net-ipv6, net-asn, net-asn-org.

Source code in src/bigbrotr/nips/event_builders.py
def add_net_tags(tags: list[Tag], net_data: Nip66NetData | None) -> None:
    """Add network tags: ``net-ip``, ``net-ipv6``, ``net-asn``, ``net-asn-org``."""
    if net_data is None:
        return
    if net_data.net_ip:
        tags.append(Tag.parse(["net-ip", net_data.net_ip]))
    if net_data.net_ipv6:
        tags.append(Tag.parse(["net-ipv6", net_data.net_ipv6]))
    if net_data.net_asn is not None:
        tags.append(Tag.parse(["net-asn", str(net_data.net_asn)]))
    if net_data.net_asn_org:
        tags.append(Tag.parse(["net-asn-org", net_data.net_asn_org]))

add_geo_tags

add_geo_tags(
    tags: list[Tag], geo_data: Nip66GeoData | None
) -> None

Add geolocation tags (g, geo-country, geo-city, etc.).

Source code in src/bigbrotr/nips/event_builders.py
def add_geo_tags(tags: list[Tag], geo_data: Nip66GeoData | None) -> None:
    """Add geolocation tags (``g``, ``geo-country``, ``geo-city``, etc.)."""
    if geo_data is None:
        return
    if geo_data.geo_hash:
        tags.append(Tag.parse(["g", geo_data.geo_hash]))
    if geo_data.geo_country:
        tags.append(Tag.parse(["geo-country", geo_data.geo_country]))
    if geo_data.geo_city:
        tags.append(Tag.parse(["geo-city", geo_data.geo_city]))
    if geo_data.geo_lat is not None:
        tags.append(Tag.parse(["geo-lat", str(geo_data.geo_lat)]))
    if geo_data.geo_lon is not None:
        tags.append(Tag.parse(["geo-lon", str(geo_data.geo_lon)]))
    if geo_data.geo_tz:
        tags.append(Tag.parse(["geo-tz", geo_data.geo_tz]))

add_language_tags

add_language_tags(
    tags: list[Tag], nip11_data: Nip11InfoData
) -> None

Add ISO 639-1 language tags derived from NIP-11 language_tags field.

Source code in src/bigbrotr/nips/event_builders.py
def add_language_tags(tags: list[Tag], nip11_data: Nip11InfoData) -> None:
    """Add ISO 639-1 language tags derived from NIP-11 ``language_tags`` field."""
    language_tags = nip11_data.language_tags
    if not language_tags or "*" in language_tags:
        return
    seen_langs: set[str] = set()
    for lang in language_tags:
        primary = lang.split("-")[0].lower() if lang else ""
        if primary and len(primary) == _ISO_639_1_LENGTH and primary not in seen_langs:
            seen_langs.add(primary)
            tags.append(Tag.parse(["l", primary, "ISO-639-1"]))

add_requirement_and_type_tags

add_requirement_and_type_tags(
    tags: list[Tag],
    nip11_data: Nip11InfoData,
    rtt_logs: Nip66RttMultiPhaseLogs | None,
) -> None

Add R (requirement) and T (type) tags from NIP-11 data and RTT probe logs.

Source code in src/bigbrotr/nips/event_builders.py
def add_requirement_and_type_tags(
    tags: list[Tag],
    nip11_data: Nip11InfoData,
    rtt_logs: Nip66RttMultiPhaseLogs | None,
) -> None:
    """Add ``R`` (requirement) and ``T`` (type) tags from NIP-11 data and RTT probe logs."""
    limitation = nip11_data.limitation
    nip11_auth = limitation.auth_required or False
    nip11_payment = limitation.payment_required or False
    nip11_writes = limitation.restricted_writes or False
    pow_diff = limitation.min_pow_difficulty or 0

    write_success: bool | None = None
    write_reason = ""
    read_success: bool | None = None
    read_reason = ""
    if rtt_logs is not None:
        write_success = rtt_logs.write_success
        write_reason = (rtt_logs.write_reason or "").lower()
        read_success = rtt_logs.read_success
        read_reason = (rtt_logs.read_reason or "").lower()

    # NIP-66 probes are ground truth; NIP-11 is relay self-report (fallback only)
    if write_success is True:
        # Probe wrote successfully without auth or payment — relay is open
        auth = False
        payment = False
        writes = False
    elif write_success is False and write_reason:
        # Probe failed — trust failure reason, augment with NIP-11 where probe is ambiguous
        rtt_auth = "auth" in write_reason
        rtt_payment = "pay" in write_reason or "paid" in write_reason
        auth = bool(rtt_auth or nip11_auth)
        payment = bool(rtt_payment or nip11_payment)
        writes = bool(not rtt_auth and not rtt_payment)
    else:
        # No probe results — NIP-11 is all we have
        auth = bool(nip11_auth)
        payment = bool(nip11_payment)
        writes = bool(nip11_writes)

    read_auth = read_success is False and "auth" in read_reason

    # R tags
    tags.append(Tag.parse(["R", "auth" if auth else "!auth"]))
    tags.append(Tag.parse(["R", "payment" if payment else "!payment"]))
    tags.append(Tag.parse(["R", "writes" if writes else "!writes"]))
    tags.append(Tag.parse(["R", "pow" if pow_diff and pow_diff > 0 else "!pow"]))

    # T tags
    access = AccessFlags(payment=payment, auth=auth, writes=writes, read_auth=read_auth)
    add_type_tags(tags, nip11_data.supported_nips, access)

add_type_tags

add_type_tags(
    tags: list[Tag],
    supported_nips: list[int] | None,
    access: AccessFlags,
) -> None

Add T (type) tags classifying the relay based on NIPs and access restrictions.

Source code in src/bigbrotr/nips/event_builders.py
def add_type_tags(
    tags: list[Tag],
    supported_nips: list[int] | None,
    access: AccessFlags,
) -> None:
    """Add ``T`` (type) tags classifying the relay based on NIPs and access restrictions."""
    nips = set(supported_nips) if supported_nips else set()

    if _NIP_CAP_SEARCH in nips:
        tags.append(Tag.parse(["T", "Search"]))
    if _NIP_CAP_COMMUNITY in nips:
        tags.append(Tag.parse(["T", "Community"]))
    if _NIP_CAP_BLOSSOM in nips:
        tags.append(Tag.parse(["T", "Blob"]))

    if access.payment:
        tags.append(Tag.parse(["T", "Paid"]))

    if access.read_auth:
        if access.auth:
            tags.append(Tag.parse(["T", "PrivateStorage"]))
        else:
            tags.append(Tag.parse(["T", "PrivateInbox"]))
    elif access.auth or access.writes or access.payment:
        tags.append(Tag.parse(["T", "PublicOutbox"]))
    else:
        tags.append(Tag.parse(["T", "PublicInbox"]))

add_nip11_tags

add_nip11_tags(
    tags: list[Tag],
    nip11_data: Nip11InfoData | None,
    rtt_logs: Nip66RttMultiPhaseLogs | None = None,
) -> None

Add NIP-11-derived tags: N, t, l, R, T.

Source code in src/bigbrotr/nips/event_builders.py
def add_nip11_tags(
    tags: list[Tag],
    nip11_data: Nip11InfoData | None,
    rtt_logs: Nip66RttMultiPhaseLogs | None = None,
) -> None:
    """Add NIP-11-derived tags: ``N``, ``t``, ``l``, ``R``, ``T``."""
    if nip11_data is None:
        return

    if nip11_data.supported_nips:
        tags.extend(Tag.parse(["N", str(nip)]) for nip in nip11_data.supported_nips)

    if nip11_data.tags:
        tags.extend(Tag.hashtag(topic) for topic in nip11_data.tags)

    add_language_tags(tags, nip11_data)
    add_requirement_and_type_tags(tags, nip11_data, rtt_logs)

build_relay_discovery

build_relay_discovery(
    relay_url: str,
    network_value: str,
    nip11_canonical_json: str = "",
    *,
    rtt_data: Nip66RttData | None = None,
    ssl_data: Nip66SslData | None = None,
    net_data: Nip66NetData | None = None,
    geo_data: Nip66GeoData | None = None,
    nip11_data: Nip11InfoData | None = None,
    rtt_logs: Nip66RttMultiPhaseLogs | None = None,
) -> EventBuilder

Build a Kind 30166 relay discovery event per NIP-66.

Source code in src/bigbrotr/nips/event_builders.py
def build_relay_discovery(  # noqa: PLR0913
    relay_url: str,
    network_value: str,
    nip11_canonical_json: str = "",
    *,
    rtt_data: Nip66RttData | None = None,
    ssl_data: Nip66SslData | None = None,
    net_data: Nip66NetData | None = None,
    geo_data: Nip66GeoData | None = None,
    nip11_data: Nip11InfoData | None = None,
    rtt_logs: Nip66RttMultiPhaseLogs | None = None,
) -> EventBuilder:
    """Build a Kind 30166 relay discovery event per NIP-66."""
    tags: list[Tag] = [
        Tag.identifier(relay_url),
        Tag.parse(["n", network_value]),
    ]

    add_rtt_tags(tags, rtt_data)
    add_ssl_tags(tags, ssl_data)
    add_net_tags(tags, net_data)
    add_geo_tags(tags, geo_data)
    add_nip11_tags(tags, nip11_data, rtt_logs)

    return EventBuilder(Kind(EventKind.RELAY_DISCOVERY), nip11_canonical_json).tags(tags)