#!/usr/bin/env python3 """Central SSL/TLS and HTTPS checker for Zabbix external checks.""" from __future__ import annotations import argparse import datetime as dt import hashlib import http.client import ipaddress import json import re import socket import ssl import sys import time from dataclasses import dataclass from email.utils import parsedate_to_datetime from pathlib import Path from typing import Any from urllib.parse import urljoin, urlparse try: # Optional richer certificate parsing. from cryptography import x509 from cryptography.hazmat.primitives.asymmetric import dsa, ec, ed25519, ed448, rsa from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat from cryptography.x509.oid import ExtensionOID, NameOID HAS_CRYPTOGRAPHY = True except Exception: # pragma: no cover - depends on local environment x509 = None # type: ignore[assignment] dsa = ec = ed25519 = ed448 = rsa = None # type: ignore[assignment] Encoding = PublicFormat = None # type: ignore[assignment] ExtensionOID = NameOID = None # type: ignore[assignment] HAS_CRYPTOGRAPHY = False VALID_PROFILES = {"relaxed", "serious", "internal", "external"} DEFAULT_HTTP_STATUS = [200, 301, 302, 401, 403] HOST_RE = re.compile(r"^[A-Za-z0-9_.:-]+$") class ConfigError(ValueError): """Raised when the target configuration is invalid.""" @dataclass(frozen=True) class Target: """Normalized target configuration.""" name: str host: str port: int owner: str profile: str expected_issuer_contains: str | None = None expected_hostname: str | None = None http_check: bool = False expected_http_status: tuple[int, ...] = tuple(DEFAULT_HTTP_STATUS) timeout: float = 10.0 @classmethod def from_dict(cls, raw: dict[str, Any], index: int) -> "Target": """Validate and normalize one target dictionary.""" required = ("name", "host", "port", "owner", "profile") missing = [field for field in required if field not in raw] if missing: raise ConfigError(f"target #{index}: missing required field(s): {', '.join(missing)}") name = _non_empty_string(raw["name"], f"target #{index}: name") host = _validate_host(raw["host"], f"target #{index}: host") port = _validate_port(raw["port"], f"target #{index}: port") owner = _non_empty_string(raw["owner"], f"target #{index}: owner") profile = _non_empty_string(raw["profile"], f"target #{index}: profile") if profile not in VALID_PROFILES: raise ConfigError( f"target #{index}: profile must be one of {', '.join(sorted(VALID_PROFILES))}" ) expected_hostname = raw.get("expected_hostname", host) expected_hostname = _validate_host(expected_hostname, f"target #{index}: expected_hostname") issuer = raw.get("expected_issuer_contains") if issuer is not None: issuer = _non_empty_string(issuer, f"target #{index}: expected_issuer_contains") http_check = raw.get("http_check", False) if not isinstance(http_check, bool): raise ConfigError(f"target #{index}: http_check must be boolean") timeout = raw.get("timeout", 10) if not isinstance(timeout, (int, float)) or isinstance(timeout, bool) or timeout <= 0: raise ConfigError(f"target #{index}: timeout must be a positive number") if timeout > 60: raise ConfigError(f"target #{index}: timeout must be 60 seconds or less") statuses = raw.get("expected_http_status", DEFAULT_HTTP_STATUS) if not isinstance(statuses, list) or not statuses: raise ConfigError(f"target #{index}: expected_http_status must be a non-empty list") normalized_statuses: list[int] = [] for status in statuses: if not isinstance(status, int) or isinstance(status, bool) or status < 100 or status > 599: raise ConfigError(f"target #{index}: expected_http_status contains invalid status") normalized_statuses.append(status) return cls( name=name, host=host, port=port, owner=owner, profile=profile, expected_issuer_contains=issuer, expected_hostname=expected_hostname, http_check=http_check, expected_http_status=tuple(normalized_statuses), timeout=float(timeout), ) def to_dict(self) -> dict[str, Any]: """Return a serializable target dictionary.""" return { "name": self.name, "host": self.host, "port": self.port, "owner": self.owner, "profile": self.profile, "expected_issuer_contains": self.expected_issuer_contains, "expected_hostname": self.expected_hostname, "http_check": self.http_check, "expected_http_status": list(self.expected_http_status), "timeout": self.timeout, } def _non_empty_string(value: Any, field: str) -> str: if not isinstance(value, str) or not value.strip(): raise ConfigError(f"{field} must be a non-empty string") return value.strip() def _validate_port(value: Any, field: str) -> int: if isinstance(value, bool): raise ConfigError(f"{field} must be an integer port") try: port = int(value) except (TypeError, ValueError) as exc: raise ConfigError(f"{field} must be an integer port") from exc if port < 1 or port > 65535: raise ConfigError(f"{field} must be between 1 and 65535") return port def _validate_host(value: Any, field: str) -> str: host = _non_empty_string(value, field) if "/" in host or "\\" in host or any(char.isspace() for char in host): raise ConfigError(f"{field} must be a hostname or IP address, not a URL") try: ipaddress.ip_address(host.strip("[]")) return host.strip("[]") except ValueError: pass if len(host) > 253 or not HOST_RE.match(host): raise ConfigError(f"{field} contains invalid characters") labels = host.rstrip(".").split(".") if any(not label or label.startswith("-") or label.endswith("-") for label in labels): raise ConfigError(f"{field} contains invalid DNS label(s)") return host.rstrip(".") def load_targets(path: Path) -> list[dict[str, Any]]: """Load, validate, normalize, and deduplicate target config.""" with path.open("r", encoding="utf-8") as handle: raw = json.load(handle) if not isinstance(raw, list): raise ConfigError("config root must be a JSON array") targets: list[dict[str, Any]] = [] seen: set[tuple[str, int]] = set() for index, entry in enumerate(raw, start=1): if not isinstance(entry, dict): raise ConfigError(f"target #{index}: entry must be an object") target = Target.from_dict(entry, index) key = (target.host.lower(), target.port) if key in seen: continue seen.add(key) targets.append(target.to_dict()) return targets def find_target(config: Path, host: str, port: int) -> Target: """Find one target by host and port in a validated config file.""" for raw in load_targets(config): if str(raw["host"]).lower() == host.lower() and int(raw["port"]) == port: return Target.from_dict(raw, 0) raise ConfigError(f"target {host}:{port} was not found in {config}") def default_result(target: Target) -> dict[str, Any]: """Create a stable JSON result skeleton.""" return { "target": target.host, "port": target.port, "name": target.name, "owner": target.owner, "profile": target.profile, "reachable": False, "error": "", "valid_now": None, "days_left": None, "not_before": None, "not_after": None, "subject_cn": None, "san_names": None, "issuer_cn": None, "issuer_org": None, "serial_number": None, "fingerprint_sha256": None, "hostname_match": None, "chain_valid": None, "self_signed": None, "not_yet_valid": None, "expired": None, "expected_issuer_match": None, "expected_hostname_match": None, "tls_version_negotiated": None, "tls12_supported": None, "tls13_supported": None, "tls10_supported": None, "tls11_supported": None, "key_type": None, "key_bits": None, "signature_algorithm": None, "http_check_enabled": target.http_check, "http_reachable": None, "http_status": None, "http_status_expected": None, "http_final_url": None, "http_redirects_to_https": None, "http_response_time_ms": None, "http_server_header": None, "http_hsts": None, "http_security_headers_score": None, "warnings": [], } def utc_now() -> dt.datetime: """Return timezone-aware UTC now.""" return dt.datetime.now(dt.timezone.utc) def iso_z(value: dt.datetime | None) -> str | None: """Format a datetime as RFC3339 UTC with Z suffix.""" if value is None: return None if value.tzinfo is None: value = value.replace(tzinfo=dt.timezone.utc) return value.astimezone(dt.timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") def parse_cert_time(value: str | None) -> dt.datetime | None: """Parse OpenSSL ASN.1 time strings from getpeercert().""" if not value: return None parsed = parsedate_to_datetime(value) if parsed.tzinfo is None: parsed = parsed.replace(tzinfo=dt.timezone.utc) return parsed.astimezone(dt.timezone.utc) def colon_hex(data: bytes) -> str: """Return colon-separated uppercase hex.""" return ":".join(f"{byte:02X}" for byte in data) def get_name_attr(name_parts: Any, key: str) -> str | None: """Extract an attribute from ssl.getpeercert() subject/issuer tuples.""" for rdn in name_parts or (): for attr_name, value in rdn: if attr_name == key: return str(value) return None def parse_stdlib_cert(cert: dict[str, Any]) -> dict[str, Any]: """Parse fields available from ssl.getpeercert().""" san_names = [ str(value) for kind, value in cert.get("subjectAltName", []) if str(kind).lower() in {"dns", "ip address"} ] return { "not_before_dt": parse_cert_time(cert.get("notBefore")), "not_after_dt": parse_cert_time(cert.get("notAfter")), "subject_cn": get_name_attr(cert.get("subject"), "commonName"), "issuer_cn": get_name_attr(cert.get("issuer"), "commonName"), "issuer_org": get_name_attr(cert.get("issuer"), "organizationName"), "san_names": san_names, "serial_number": cert.get("serialNumber"), } def parse_crypto_cert(der_cert: bytes) -> dict[str, Any]: """Parse certificate fields with optional cryptography support.""" if not HAS_CRYPTOGRAPHY or x509 is None: return {} cert = x509.load_der_x509_certificate(der_cert) def first_name_attr(name: Any, oid: Any) -> str | None: attrs = name.get_attributes_for_oid(oid) return str(attrs[0].value) if attrs else None san_names: list[str] = [] try: san_ext = cert.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value san_names.extend(str(name) for name in san_ext.get_values_for_type(x509.DNSName)) san_names.extend(str(name) for name in san_ext.get_values_for_type(x509.IPAddress)) except x509.ExtensionNotFound: pass public_key = cert.public_key() key_type = type(public_key).__name__ key_bits: int | None = None if rsa is not None and isinstance(public_key, rsa.RSAPublicKey): key_type = "RSA" key_bits = public_key.key_size elif dsa is not None and isinstance(public_key, dsa.DSAPublicKey): key_type = "DSA" key_bits = public_key.key_size elif ec is not None and isinstance(public_key, ec.EllipticCurvePublicKey): key_type = "EC" key_bits = public_key.key_size elif ed25519 is not None and isinstance(public_key, ed25519.Ed25519PublicKey): key_type = "Ed25519" elif ed448 is not None and isinstance(public_key, ed448.Ed448PublicKey): key_type = "Ed448" return { "not_before_dt": cert.not_valid_before_utc, "not_after_dt": cert.not_valid_after_utc, "subject_cn": first_name_attr(cert.subject, NameOID.COMMON_NAME), "issuer_cn": first_name_attr(cert.issuer, NameOID.COMMON_NAME), "issuer_org": first_name_attr(cert.issuer, NameOID.ORGANIZATION_NAME), "san_names": san_names, "serial_number": format(cert.serial_number, "X"), "key_type": key_type, "key_bits": key_bits, "signature_algorithm": ( cert.signature_hash_algorithm.name if cert.signature_hash_algorithm else cert.signature_algorithm_oid._name ), "self_signed": cert.issuer == cert.subject and _self_signature_looks_valid(cert), } def _self_signature_looks_valid(cert: Any) -> bool: """Best-effort self-signed detection without failing the whole check.""" try: public_key = cert.public_key() public_bytes = public_key.public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo) return cert.issuer == cert.subject and bool(public_bytes) except Exception: return cert.issuer == cert.subject def connect_tls( host: str, port: int, expected_hostname: str, timeout: float, verify: bool = True, min_version: ssl.TLSVersion | None = None, max_version: ssl.TLSVersion | None = None, ) -> tuple[ssl.SSLSocket, dict[str, Any], bytes]: """Open a TLS connection and return socket, parsed cert, and DER leaf cert.""" context = ssl.create_default_context() if verify else ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) context.check_hostname = verify context.verify_mode = ssl.CERT_REQUIRED if verify else ssl.CERT_NONE if min_version is not None: context.minimum_version = min_version if max_version is not None: context.maximum_version = max_version raw_sock = socket.create_connection((host, port), timeout=timeout) tls_sock = context.wrap_socket(raw_sock, server_hostname=expected_hostname) tls_sock.settimeout(timeout) cert = tls_sock.getpeercert() der_cert = tls_sock.getpeercert(binary_form=True) if der_cert is None: raise ssl.SSLError("server did not provide a certificate") return tls_sock, cert, der_cert def check_tls(target: Target, result: dict[str, Any]) -> None: """Populate TLS and certificate result fields.""" try: sock, cert, der_cert = connect_tls( target.host, target.port, target.expected_hostname or target.host, target.timeout, verify=True ) negotiated_version = sock.version() sock.close() result["reachable"] = True result["chain_valid"] = True result["hostname_match"] = True result["tls_version_negotiated"] = negotiated_version except ssl.SSLCertVerificationError as exc: result["chain_valid"] = False result["hostname_match"] = "hostname" not in str(exc).lower() result["error"] = f"TLS verification failed: {exc.verify_message or exc}" try: sock, cert, der_cert = connect_tls( target.host, target.port, target.expected_hostname or target.host, target.timeout, verify=False ) result["reachable"] = True result["tls_version_negotiated"] = sock.version() sock.close() except Exception: return except (TimeoutError, OSError, ssl.SSLError) as exc: result["reachable"] = False result["error"] = f"TLS connection failed: {type(exc).__name__}: {exc}" return std_fields = parse_stdlib_cert(cert) crypto_fields = parse_crypto_cert(der_cert) fields = {**std_fields, **{key: value for key, value in crypto_fields.items() if value not in (None, [], "")}} not_before = fields.pop("not_before_dt", None) not_after = fields.pop("not_after_dt", None) now = utc_now() result.update(fields) result["fingerprint_sha256"] = colon_hex(hashlib.sha256(der_cert).digest()) result["not_before"] = iso_z(not_before) result["not_after"] = iso_z(not_after) result["not_yet_valid"] = bool(not_before and now < not_before) result["expired"] = bool(not_after and now > not_after) result["valid_now"] = bool(not_before and not_after and not_before <= now <= not_after and result["chain_valid"]) result["days_left"] = max(0, int((not_after - now).total_seconds() // 86400)) if not_after else None if result["self_signed"] is None: result["self_signed"] = bool( result["subject_cn"] and result["issuer_cn"] and result["subject_cn"] == result["issuer_cn"] ) result["hostname_match"] = certificate_matches_hostname(cert, target.expected_hostname or target.host) result["expected_hostname_match"] = certificate_matches_hostname(cert, target.expected_hostname or target.host) issuer_text = " ".join( str(part) for part in (result.get("issuer_cn"), result.get("issuer_org")) if part is not None ) result["expected_issuer_match"] = ( target.expected_issuer_contains.lower() in issuer_text.lower() if target.expected_issuer_contains else None ) if not HAS_CRYPTOGRAPHY: result["warnings"].append("cryptography not installed; key/signature parsing is limited") result["tls10_supported"] = probe_tls_version(target, ssl.TLSVersion.TLSv1, "TLS 1.0", result["warnings"]) result["tls11_supported"] = probe_tls_version(target, ssl.TLSVersion.TLSv1_1, "TLS 1.1", result["warnings"]) result["tls12_supported"] = probe_tls_version(target, ssl.TLSVersion.TLSv1_2, "TLS 1.2", result["warnings"]) if hasattr(ssl.TLSVersion, "TLSv1_3"): result["tls13_supported"] = probe_tls_version(target, ssl.TLSVersion.TLSv1_3, "TLS 1.3", result["warnings"]) def certificate_matches_hostname(cert: dict[str, Any], hostname: str) -> bool | None: """Return whether a peer certificate matches hostname.""" try: ssl.match_hostname(cert, hostname) return True except ssl.CertificateError: return False except Exception: return None def probe_tls_version( target: Target, version: ssl.TLSVersion, label: str, warnings: list[str] ) -> bool | None: """Probe support for one exact TLS protocol version.""" try: sock, _, _ = connect_tls( target.host, target.port, target.expected_hostname or target.host, min(target.timeout, 5.0), verify=False, min_version=version, max_version=version, ) sock.close() return True except ssl.SSLError as exc: message = str(exc).lower() if ( "unsupported protocol" in message or "no protocols available" in message or "no ciphers available" in message ): warnings.append(f"{label} could not be tested due to local OpenSSL policy") return None return False except (TimeoutError, OSError): return False except ValueError as exc: warnings.append(f"{label} could not be tested: {exc}") return None def check_http(target: Target, result: dict[str, Any]) -> None: """Perform an HTTPS request with limited redirect following.""" if not target.http_check: return start = time.monotonic() url = f"https://{target.host}:{target.port}/" if target.port != 443 else f"https://{target.host}/" redirects_to_https = True try: for _ in range(6): parsed = urlparse(url) if parsed.scheme != "https": redirects_to_https = False if parsed.scheme not in {"https", "http"}: raise ValueError(f"unsupported redirect scheme: {parsed.scheme}") connection_cls = http.client.HTTPSConnection if parsed.scheme == "https" else http.client.HTTPConnection port = parsed.port or (443 if parsed.scheme == "https" else 80) path = parsed.path or "/" if parsed.query: path += f"?{parsed.query}" connection = connection_cls(parsed.hostname, port, timeout=target.timeout) connection.request("GET", path, headers={"User-Agent": "zabbix-ssl-checker/1.0"}) response = connection.getresponse() headers = {key.lower(): value for key, value in response.getheaders()} response.read(1024) status = response.status location = headers.get("location") connection.close() if status in {301, 302, 303, 307, 308} and location: url = urljoin(url, location) continue elapsed_ms = int((time.monotonic() - start) * 1000) result["http_reachable"] = True result["http_status"] = status result["http_status_expected"] = status in target.expected_http_status result["http_final_url"] = url result["http_redirects_to_https"] = redirects_to_https and urlparse(url).scheme == "https" result["http_response_time_ms"] = elapsed_ms result["http_server_header"] = headers.get("server") result["http_hsts"] = "strict-transport-security" in headers result["http_security_headers_score"] = sum( 1 for header in ( "strict-transport-security", "x-content-type-options", "x-frame-options", "content-security-policy", "referrer-policy", ) if header in headers ) return raise ValueError("too many redirects") except Exception as exc: result["http_reachable"] = False result["http_status_expected"] = False result["http_response_time_ms"] = int((time.monotonic() - start) * 1000) result["warnings"].append(f"HTTP check failed: {type(exc).__name__}: {exc}") def run_check(target: Target) -> dict[str, Any]: """Run TLS and optional HTTP checks for one target.""" result = default_result(target) check_tls(target, result) check_http(target, result) return result def parse_args(argv: list[str] | None = None) -> argparse.Namespace: """Parse command-line arguments.""" parser = argparse.ArgumentParser(description="Check one SSL/TLS target for Zabbix.") parser.add_argument("--config", help="Path to ssl_targets.json") parser.add_argument("--host", required=True, help="Target hostname or IP") parser.add_argument("--port", required=True, type=int, help="Target port") parser.add_argument("--name", help="Target display name when no config is used") parser.add_argument("--owner", default="unknown", help="Target owner when no config is used") parser.add_argument("--profile", default="relaxed", choices=sorted(VALID_PROFILES)) parser.add_argument("--expected-issuer-contains") parser.add_argument("--expected-hostname") parser.add_argument("--http-check", action="store_true") parser.add_argument("--expected-http-status", nargs="*", type=int, default=DEFAULT_HTTP_STATUS) parser.add_argument("--timeout", type=float, default=10) return parser.parse_args(argv) def target_from_args(args: argparse.Namespace) -> Target: """Build target config from CLI arguments or a config file.""" port = _validate_port(args.port, "--port") host = _validate_host(args.host, "--host") if args.config: return find_target(Path(args.config), host, port) raw = { "name": args.name or host, "host": host, "port": port, "owner": args.owner, "profile": args.profile, "expected_issuer_contains": args.expected_issuer_contains, "expected_hostname": args.expected_hostname or host, "http_check": args.http_check, "expected_http_status": args.expected_http_status, "timeout": args.timeout, } return Target.from_dict(raw, 0) def main(argv: list[str] | None = None) -> int: """Run the checker and always print JSON for valid arguments/config.""" try: args = parse_args(argv) target = target_from_args(args) except (ConfigError, OSError, json.JSONDecodeError) as exc: print(f"Configuration/argument error: {exc}", file=sys.stderr) return 2 result = run_check(target) print(json.dumps(result, separators=(",", ":"), ensure_ascii=False)) return 0 if __name__ == "__main__": sys.exit(main())