Skip to content

ssl

ssl

NIP-66 SSL metadata container with certificate inspection capabilities.

Connects to a relay's TLS endpoint, extracts certificate details (subject, issuer, validity, SANs, fingerprint, cipher), and separately validates the certificate chain as part of NIP-66 monitoring. Clearnet relays only.

Note

The SSL test uses a two-connection methodology:

  1. Extraction -- connects with CERT_NONE to obtain the DER-encoded certificate regardless of chain validity, then parses it with the cryptography library. This allows inspecting self-signed or expired certificates.
  2. Validation -- connects with the default (validating) SSL context to verify the certificate chain against the system trust store.

Both connections are synchronous socket operations delegated to a thread pool via asyncio.to_thread to avoid blocking the event loop.

See Also

bigbrotr.nips.nip66.data.Nip66SslData: Data model for SSL certificate fields. bigbrotr.nips.nip66.logs.Nip66SslLogs: Log model for SSL inspection results. bigbrotr.utils.transport.InsecureWebSocketTransport: Related insecure transport used for WebSocket connections (distinct from the raw socket approach used here for certificate extraction).

Classes

CertificateExtractor

Extracts structured fields from SSL certificates.

Uses cryptography X.509 objects (from DER-encoded certificates) and raw DER bytes for fingerprint computation.

See Also

Nip66SslMetadata: Container that uses this extractor during certificate inspection. bigbrotr.nips.nip66.data.Nip66SslData: Data model populated by the extracted fields.

Functions
extract_fingerprint staticmethod
extract_fingerprint(cert_binary: bytes) -> str

Compute a SHA-256 fingerprint from the DER-encoded certificate.

Returns:

  • str

    Colon-separated hex string prefixed with SHA256:.

Source code in src/bigbrotr/nips/nip66/ssl.py
@staticmethod
def extract_fingerprint(cert_binary: bytes) -> str:
    """Compute a SHA-256 fingerprint from the DER-encoded certificate.

    Returns:
        Colon-separated hex string prefixed with ``SHA256:``.
    """
    fingerprint = hashlib.sha256(cert_binary).hexdigest().upper()
    formatted = ":".join(fingerprint[i : i + 2] for i in range(0, len(fingerprint), 2))
    return f"SHA256:{formatted}"
extract_all_from_x509 classmethod
extract_all_from_x509(cert: Certificate) -> dict[str, Any]

Extract all fields from a cryptography X.509 certificate object.

Source code in src/bigbrotr/nips/nip66/ssl.py
@classmethod
def extract_all_from_x509(cls, cert: x509.Certificate) -> dict[str, Any]:
    """Extract all fields from a ``cryptography`` X.509 certificate object."""
    result: dict[str, Any] = {}

    cns = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
    if cns:
        result["ssl_subject_cn"] = cns[0].value

    orgs = cert.issuer.get_attributes_for_oid(NameOID.ORGANIZATION_NAME)
    if orgs:
        result["ssl_issuer"] = orgs[0].value
    issuer_cns = cert.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)
    if issuer_cns:
        result["ssl_issuer_cn"] = issuer_cns[0].value

    result["ssl_expires"] = int(calendar.timegm(cert.not_valid_after_utc.timetuple()))
    result["ssl_not_before"] = int(calendar.timegm(cert.not_valid_before_utc.timetuple()))

    try:
        san_ext = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
        dns_names = san_ext.value.get_values_for_type(x509.DNSName)
        if dns_names:
            result["ssl_san"] = dns_names
    except x509.ExtensionNotFound:
        pass

    result["ssl_serial"] = format(cert.serial_number, "X")
    result["ssl_version"] = cert.version.value

    return result

Nip66SslMetadata

Bases: BaseNipMetadata

Container for SSL/TLS certificate data and inspection logs.

Provides the execute() class method that performs certificate extraction and chain validation against a relay's TLS endpoint.

Warning

The certificate extraction phase uses CERT_NONE to read certificates from relays with invalid chains. This is intentional for monitoring purposes and does not affect the ssl_valid field, which is determined by a separate validating connection.

See Also

bigbrotr.nips.nip66.nip66.Nip66: Top-level model that orchestrates this alongside other tests. bigbrotr.models.metadata.MetadataType: The NIP66_SSL variant used when storing these results. bigbrotr.nips.nip66.rtt.Nip66RttMetadata: RTT test that also involves SSL connections.

Functions
execute async classmethod
execute(relay: Relay, timeout: float | None = None) -> Self

Inspect the SSL/TLS certificate of a clearnet relay.

Runs the synchronous SSL operations in a thread pool to avoid blocking the event loop.

Parameters:

  • relay (Relay) –

    Clearnet relay to inspect.

  • timeout (float | None, default: None ) –

    Socket timeout in seconds (default: 10.0).

Returns:

  • Self

    An Nip66SslMetadata instance with certificate data and logs.

Source code in src/bigbrotr/nips/nip66/ssl.py
@classmethod
async def execute(
    cls,
    relay: Relay,
    timeout: float | None = None,  # noqa: ASYNC109
) -> Self:
    """Inspect the SSL/TLS certificate of a clearnet relay.

    Runs the synchronous SSL operations in a thread pool to avoid
    blocking the event loop.

    Args:
        relay: Clearnet relay to inspect.
        timeout: Socket timeout in seconds (default: 10.0).

    Returns:
        An ``Nip66SslMetadata`` instance with certificate data and logs.
    """
    timeout = timeout if timeout is not None else DEFAULT_TIMEOUT
    logger.debug("ssl_testing relay=%s timeout_s=%s", relay.url, timeout)

    if relay.network != NetworkType.CLEARNET:
        return cls(
            data=Nip66SslData(),
            logs=Nip66SslLogs(
                success=False, reason=f"requires clearnet, got {relay.network.value}"
            ),
        )

    data: dict[str, Any] = {}
    logs: dict[str, Any] = {"success": False, "reason": None}
    port = relay.port or 443

    try:
        logger.debug("ssl_checking host=%s port=%s", relay.host, port)
        data = await asyncio.to_thread(cls._ssl, relay.host, port, timeout)
        if data:
            logs["success"] = True
            logger.debug("ssl_checked relay=%s valid=%s", relay.url, data.get("ssl_valid"))
        else:
            logs["reason"] = "no certificate data extracted"
            logger.debug("ssl_no_data relay=%s", relay.url)
    except OSError as e:
        logs["reason"] = str(e) or type(e).__name__
        logger.debug("ssl_error relay=%s error=%s", relay.url, str(e))

    return cls(
        data=Nip66SslData.model_validate(Nip66SslData.parse(data)),
        logs=Nip66SslLogs.model_validate(logs),
    )