Add Zabbix SSL checker

This commit is contained in:
2026-05-21 19:20:49 +02:00
commit 76f1f36d97
9 changed files with 1829 additions and 0 deletions
+641
View File
@@ -0,0 +1,641 @@
#!/usr/bin/env python3
"""Central SSL/TLS and HTTPS checker for Zabbix external checks."""
from __future__ import annotations
import argparse
import datetime as dt
import hashlib
import http.client
import ipaddress
import json
import re
import socket
import ssl
import sys
import time
from dataclasses import dataclass
from email.utils import parsedate_to_datetime
from pathlib import Path
from typing import Any
from urllib.parse import urljoin, urlparse
try: # Optional richer certificate parsing.
from cryptography import x509
from cryptography.hazmat.primitives.asymmetric import dsa, ec, ed25519, ed448, rsa
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from cryptography.x509.oid import ExtensionOID, NameOID
HAS_CRYPTOGRAPHY = True
except Exception: # pragma: no cover - depends on local environment
x509 = None # type: ignore[assignment]
dsa = ec = ed25519 = ed448 = rsa = None # type: ignore[assignment]
Encoding = PublicFormat = None # type: ignore[assignment]
ExtensionOID = NameOID = None # type: ignore[assignment]
HAS_CRYPTOGRAPHY = False
VALID_PROFILES = {"relaxed", "serious", "internal", "external"}
DEFAULT_HTTP_STATUS = [200, 301, 302, 401, 403]
HOST_RE = re.compile(r"^[A-Za-z0-9_.:-]+$")
class ConfigError(ValueError):
"""Raised when the target configuration is invalid."""
@dataclass(frozen=True)
class Target:
"""Normalized target configuration."""
name: str
host: str
port: int
owner: str
profile: str
expected_issuer_contains: str | None = None
expected_hostname: str | None = None
http_check: bool = False
expected_http_status: tuple[int, ...] = tuple(DEFAULT_HTTP_STATUS)
timeout: float = 10.0
@classmethod
def from_dict(cls, raw: dict[str, Any], index: int) -> "Target":
"""Validate and normalize one target dictionary."""
required = ("name", "host", "port", "owner", "profile")
missing = [field for field in required if field not in raw]
if missing:
raise ConfigError(f"target #{index}: missing required field(s): {', '.join(missing)}")
name = _non_empty_string(raw["name"], f"target #{index}: name")
host = _validate_host(raw["host"], f"target #{index}: host")
port = _validate_port(raw["port"], f"target #{index}: port")
owner = _non_empty_string(raw["owner"], f"target #{index}: owner")
profile = _non_empty_string(raw["profile"], f"target #{index}: profile")
if profile not in VALID_PROFILES:
raise ConfigError(
f"target #{index}: profile must be one of {', '.join(sorted(VALID_PROFILES))}"
)
expected_hostname = raw.get("expected_hostname", host)
expected_hostname = _validate_host(expected_hostname, f"target #{index}: expected_hostname")
issuer = raw.get("expected_issuer_contains")
if issuer is not None:
issuer = _non_empty_string(issuer, f"target #{index}: expected_issuer_contains")
http_check = raw.get("http_check", False)
if not isinstance(http_check, bool):
raise ConfigError(f"target #{index}: http_check must be boolean")
timeout = raw.get("timeout", 10)
if not isinstance(timeout, (int, float)) or isinstance(timeout, bool) or timeout <= 0:
raise ConfigError(f"target #{index}: timeout must be a positive number")
if timeout > 60:
raise ConfigError(f"target #{index}: timeout must be 60 seconds or less")
statuses = raw.get("expected_http_status", DEFAULT_HTTP_STATUS)
if not isinstance(statuses, list) or not statuses:
raise ConfigError(f"target #{index}: expected_http_status must be a non-empty list")
normalized_statuses: list[int] = []
for status in statuses:
if not isinstance(status, int) or isinstance(status, bool) or status < 100 or status > 599:
raise ConfigError(f"target #{index}: expected_http_status contains invalid status")
normalized_statuses.append(status)
return cls(
name=name,
host=host,
port=port,
owner=owner,
profile=profile,
expected_issuer_contains=issuer,
expected_hostname=expected_hostname,
http_check=http_check,
expected_http_status=tuple(normalized_statuses),
timeout=float(timeout),
)
def to_dict(self) -> dict[str, Any]:
"""Return a serializable target dictionary."""
return {
"name": self.name,
"host": self.host,
"port": self.port,
"owner": self.owner,
"profile": self.profile,
"expected_issuer_contains": self.expected_issuer_contains,
"expected_hostname": self.expected_hostname,
"http_check": self.http_check,
"expected_http_status": list(self.expected_http_status),
"timeout": self.timeout,
}
def _non_empty_string(value: Any, field: str) -> str:
if not isinstance(value, str) or not value.strip():
raise ConfigError(f"{field} must be a non-empty string")
return value.strip()
def _validate_port(value: Any, field: str) -> int:
if isinstance(value, bool):
raise ConfigError(f"{field} must be an integer port")
try:
port = int(value)
except (TypeError, ValueError) as exc:
raise ConfigError(f"{field} must be an integer port") from exc
if port < 1 or port > 65535:
raise ConfigError(f"{field} must be between 1 and 65535")
return port
def _validate_host(value: Any, field: str) -> str:
host = _non_empty_string(value, field)
if "/" in host or "\\" in host or any(char.isspace() for char in host):
raise ConfigError(f"{field} must be a hostname or IP address, not a URL")
try:
ipaddress.ip_address(host.strip("[]"))
return host.strip("[]")
except ValueError:
pass
if len(host) > 253 or not HOST_RE.match(host):
raise ConfigError(f"{field} contains invalid characters")
labels = host.rstrip(".").split(".")
if any(not label or label.startswith("-") or label.endswith("-") for label in labels):
raise ConfigError(f"{field} contains invalid DNS label(s)")
return host.rstrip(".")
def load_targets(path: Path) -> list[dict[str, Any]]:
"""Load, validate, normalize, and deduplicate target config."""
with path.open("r", encoding="utf-8") as handle:
raw = json.load(handle)
if not isinstance(raw, list):
raise ConfigError("config root must be a JSON array")
targets: list[dict[str, Any]] = []
seen: set[tuple[str, int]] = set()
for index, entry in enumerate(raw, start=1):
if not isinstance(entry, dict):
raise ConfigError(f"target #{index}: entry must be an object")
target = Target.from_dict(entry, index)
key = (target.host.lower(), target.port)
if key in seen:
continue
seen.add(key)
targets.append(target.to_dict())
return targets
def find_target(config: Path, host: str, port: int) -> Target:
"""Find one target by host and port in a validated config file."""
for raw in load_targets(config):
if str(raw["host"]).lower() == host.lower() and int(raw["port"]) == port:
return Target.from_dict(raw, 0)
raise ConfigError(f"target {host}:{port} was not found in {config}")
def default_result(target: Target) -> dict[str, Any]:
"""Create a stable JSON result skeleton."""
return {
"target": target.host,
"port": target.port,
"name": target.name,
"owner": target.owner,
"profile": target.profile,
"reachable": False,
"error": "",
"valid_now": None,
"days_left": None,
"not_before": None,
"not_after": None,
"subject_cn": None,
"san_names": None,
"issuer_cn": None,
"issuer_org": None,
"serial_number": None,
"fingerprint_sha256": None,
"hostname_match": None,
"chain_valid": None,
"self_signed": None,
"not_yet_valid": None,
"expired": None,
"expected_issuer_match": None,
"expected_hostname_match": None,
"tls_version_negotiated": None,
"tls12_supported": None,
"tls13_supported": None,
"tls10_supported": None,
"tls11_supported": None,
"key_type": None,
"key_bits": None,
"signature_algorithm": None,
"http_check_enabled": target.http_check,
"http_reachable": None,
"http_status": None,
"http_status_expected": None,
"http_final_url": None,
"http_redirects_to_https": None,
"http_response_time_ms": None,
"http_server_header": None,
"http_hsts": None,
"http_security_headers_score": None,
"warnings": [],
}
def utc_now() -> dt.datetime:
"""Return timezone-aware UTC now."""
return dt.datetime.now(dt.timezone.utc)
def iso_z(value: dt.datetime | None) -> str | None:
"""Format a datetime as RFC3339 UTC with Z suffix."""
if value is None:
return None
if value.tzinfo is None:
value = value.replace(tzinfo=dt.timezone.utc)
return value.astimezone(dt.timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
def parse_cert_time(value: str | None) -> dt.datetime | None:
"""Parse OpenSSL ASN.1 time strings from getpeercert()."""
if not value:
return None
parsed = parsedate_to_datetime(value)
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=dt.timezone.utc)
return parsed.astimezone(dt.timezone.utc)
def colon_hex(data: bytes) -> str:
"""Return colon-separated uppercase hex."""
return ":".join(f"{byte:02X}" for byte in data)
def get_name_attr(name_parts: Any, key: str) -> str | None:
"""Extract an attribute from ssl.getpeercert() subject/issuer tuples."""
for rdn in name_parts or ():
for attr_name, value in rdn:
if attr_name == key:
return str(value)
return None
def parse_stdlib_cert(cert: dict[str, Any]) -> dict[str, Any]:
"""Parse fields available from ssl.getpeercert()."""
san_names = [
str(value)
for kind, value in cert.get("subjectAltName", [])
if str(kind).lower() in {"dns", "ip address"}
]
return {
"not_before_dt": parse_cert_time(cert.get("notBefore")),
"not_after_dt": parse_cert_time(cert.get("notAfter")),
"subject_cn": get_name_attr(cert.get("subject"), "commonName"),
"issuer_cn": get_name_attr(cert.get("issuer"), "commonName"),
"issuer_org": get_name_attr(cert.get("issuer"), "organizationName"),
"san_names": san_names,
"serial_number": cert.get("serialNumber"),
}
def parse_crypto_cert(der_cert: bytes) -> dict[str, Any]:
"""Parse certificate fields with optional cryptography support."""
if not HAS_CRYPTOGRAPHY or x509 is None:
return {}
cert = x509.load_der_x509_certificate(der_cert)
def first_name_attr(name: Any, oid: Any) -> str | None:
attrs = name.get_attributes_for_oid(oid)
return str(attrs[0].value) if attrs else None
san_names: list[str] = []
try:
san_ext = cert.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value
san_names.extend(str(name) for name in san_ext.get_values_for_type(x509.DNSName))
san_names.extend(str(name) for name in san_ext.get_values_for_type(x509.IPAddress))
except x509.ExtensionNotFound:
pass
public_key = cert.public_key()
key_type = type(public_key).__name__
key_bits: int | None = None
if rsa is not None and isinstance(public_key, rsa.RSAPublicKey):
key_type = "RSA"
key_bits = public_key.key_size
elif dsa is not None and isinstance(public_key, dsa.DSAPublicKey):
key_type = "DSA"
key_bits = public_key.key_size
elif ec is not None and isinstance(public_key, ec.EllipticCurvePublicKey):
key_type = "EC"
key_bits = public_key.key_size
elif ed25519 is not None and isinstance(public_key, ed25519.Ed25519PublicKey):
key_type = "Ed25519"
elif ed448 is not None and isinstance(public_key, ed448.Ed448PublicKey):
key_type = "Ed448"
return {
"not_before_dt": cert.not_valid_before_utc,
"not_after_dt": cert.not_valid_after_utc,
"subject_cn": first_name_attr(cert.subject, NameOID.COMMON_NAME),
"issuer_cn": first_name_attr(cert.issuer, NameOID.COMMON_NAME),
"issuer_org": first_name_attr(cert.issuer, NameOID.ORGANIZATION_NAME),
"san_names": san_names,
"serial_number": format(cert.serial_number, "X"),
"key_type": key_type,
"key_bits": key_bits,
"signature_algorithm": (
cert.signature_hash_algorithm.name if cert.signature_hash_algorithm else cert.signature_algorithm_oid._name
),
"self_signed": cert.issuer == cert.subject and _self_signature_looks_valid(cert),
}
def _self_signature_looks_valid(cert: Any) -> bool:
"""Best-effort self-signed detection without failing the whole check."""
try:
public_key = cert.public_key()
public_bytes = public_key.public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo)
return cert.issuer == cert.subject and bool(public_bytes)
except Exception:
return cert.issuer == cert.subject
def connect_tls(
host: str,
port: int,
expected_hostname: str,
timeout: float,
verify: bool = True,
min_version: ssl.TLSVersion | None = None,
max_version: ssl.TLSVersion | None = None,
) -> tuple[ssl.SSLSocket, dict[str, Any], bytes]:
"""Open a TLS connection and return socket, parsed cert, and DER leaf cert."""
context = ssl.create_default_context() if verify else ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = verify
context.verify_mode = ssl.CERT_REQUIRED if verify else ssl.CERT_NONE
if min_version is not None:
context.minimum_version = min_version
if max_version is not None:
context.maximum_version = max_version
raw_sock = socket.create_connection((host, port), timeout=timeout)
tls_sock = context.wrap_socket(raw_sock, server_hostname=expected_hostname)
tls_sock.settimeout(timeout)
cert = tls_sock.getpeercert()
der_cert = tls_sock.getpeercert(binary_form=True)
if der_cert is None:
raise ssl.SSLError("server did not provide a certificate")
return tls_sock, cert, der_cert
def check_tls(target: Target, result: dict[str, Any]) -> None:
"""Populate TLS and certificate result fields."""
try:
sock, cert, der_cert = connect_tls(
target.host, target.port, target.expected_hostname or target.host, target.timeout, verify=True
)
negotiated_version = sock.version()
sock.close()
result["reachable"] = True
result["chain_valid"] = True
result["hostname_match"] = True
result["tls_version_negotiated"] = negotiated_version
except ssl.SSLCertVerificationError as exc:
result["chain_valid"] = False
result["hostname_match"] = "hostname" not in str(exc).lower()
result["error"] = f"TLS verification failed: {exc.verify_message or exc}"
try:
sock, cert, der_cert = connect_tls(
target.host, target.port, target.expected_hostname or target.host, target.timeout, verify=False
)
result["reachable"] = True
result["tls_version_negotiated"] = sock.version()
sock.close()
except Exception:
return
except (TimeoutError, OSError, ssl.SSLError) as exc:
result["reachable"] = False
result["error"] = f"TLS connection failed: {type(exc).__name__}: {exc}"
return
std_fields = parse_stdlib_cert(cert)
crypto_fields = parse_crypto_cert(der_cert)
fields = {**std_fields, **{key: value for key, value in crypto_fields.items() if value not in (None, [], "")}}
not_before = fields.pop("not_before_dt", None)
not_after = fields.pop("not_after_dt", None)
now = utc_now()
result.update(fields)
result["fingerprint_sha256"] = colon_hex(hashlib.sha256(der_cert).digest())
result["not_before"] = iso_z(not_before)
result["not_after"] = iso_z(not_after)
result["not_yet_valid"] = bool(not_before and now < not_before)
result["expired"] = bool(not_after and now > not_after)
result["valid_now"] = bool(not_before and not_after and not_before <= now <= not_after and result["chain_valid"])
result["days_left"] = max(0, int((not_after - now).total_seconds() // 86400)) if not_after else None
if result["self_signed"] is None:
result["self_signed"] = bool(
result["subject_cn"] and result["issuer_cn"] and result["subject_cn"] == result["issuer_cn"]
)
result["hostname_match"] = certificate_matches_hostname(cert, target.expected_hostname or target.host)
result["expected_hostname_match"] = certificate_matches_hostname(cert, target.expected_hostname or target.host)
issuer_text = " ".join(
str(part)
for part in (result.get("issuer_cn"), result.get("issuer_org"))
if part is not None
)
result["expected_issuer_match"] = (
target.expected_issuer_contains.lower() in issuer_text.lower()
if target.expected_issuer_contains
else None
)
if not HAS_CRYPTOGRAPHY:
result["warnings"].append("cryptography not installed; key/signature parsing is limited")
result["tls10_supported"] = probe_tls_version(target, ssl.TLSVersion.TLSv1, "TLS 1.0", result["warnings"])
result["tls11_supported"] = probe_tls_version(target, ssl.TLSVersion.TLSv1_1, "TLS 1.1", result["warnings"])
result["tls12_supported"] = probe_tls_version(target, ssl.TLSVersion.TLSv1_2, "TLS 1.2", result["warnings"])
if hasattr(ssl.TLSVersion, "TLSv1_3"):
result["tls13_supported"] = probe_tls_version(target, ssl.TLSVersion.TLSv1_3, "TLS 1.3", result["warnings"])
def certificate_matches_hostname(cert: dict[str, Any], hostname: str) -> bool | None:
"""Return whether a peer certificate matches hostname."""
try:
ssl.match_hostname(cert, hostname)
return True
except ssl.CertificateError:
return False
except Exception:
return None
def probe_tls_version(
target: Target, version: ssl.TLSVersion, label: str, warnings: list[str]
) -> bool | None:
"""Probe support for one exact TLS protocol version."""
try:
sock, _, _ = connect_tls(
target.host,
target.port,
target.expected_hostname or target.host,
min(target.timeout, 5.0),
verify=False,
min_version=version,
max_version=version,
)
sock.close()
return True
except ssl.SSLError as exc:
message = str(exc).lower()
if (
"unsupported protocol" in message
or "no protocols available" in message
or "no ciphers available" in message
):
warnings.append(f"{label} could not be tested due to local OpenSSL policy")
return None
return False
except (TimeoutError, OSError):
return False
except ValueError as exc:
warnings.append(f"{label} could not be tested: {exc}")
return None
def check_http(target: Target, result: dict[str, Any]) -> None:
"""Perform an HTTPS request with limited redirect following."""
if not target.http_check:
return
start = time.monotonic()
url = f"https://{target.host}:{target.port}/" if target.port != 443 else f"https://{target.host}/"
redirects_to_https = True
try:
for _ in range(6):
parsed = urlparse(url)
if parsed.scheme != "https":
redirects_to_https = False
if parsed.scheme not in {"https", "http"}:
raise ValueError(f"unsupported redirect scheme: {parsed.scheme}")
connection_cls = http.client.HTTPSConnection if parsed.scheme == "https" else http.client.HTTPConnection
port = parsed.port or (443 if parsed.scheme == "https" else 80)
path = parsed.path or "/"
if parsed.query:
path += f"?{parsed.query}"
connection = connection_cls(parsed.hostname, port, timeout=target.timeout)
connection.request("GET", path, headers={"User-Agent": "zabbix-ssl-checker/1.0"})
response = connection.getresponse()
headers = {key.lower(): value for key, value in response.getheaders()}
response.read(1024)
status = response.status
location = headers.get("location")
connection.close()
if status in {301, 302, 303, 307, 308} and location:
url = urljoin(url, location)
continue
elapsed_ms = int((time.monotonic() - start) * 1000)
result["http_reachable"] = True
result["http_status"] = status
result["http_status_expected"] = status in target.expected_http_status
result["http_final_url"] = url
result["http_redirects_to_https"] = redirects_to_https and urlparse(url).scheme == "https"
result["http_response_time_ms"] = elapsed_ms
result["http_server_header"] = headers.get("server")
result["http_hsts"] = "strict-transport-security" in headers
result["http_security_headers_score"] = sum(
1
for header in (
"strict-transport-security",
"x-content-type-options",
"x-frame-options",
"content-security-policy",
"referrer-policy",
)
if header in headers
)
return
raise ValueError("too many redirects")
except Exception as exc:
result["http_reachable"] = False
result["http_status_expected"] = False
result["http_response_time_ms"] = int((time.monotonic() - start) * 1000)
result["warnings"].append(f"HTTP check failed: {type(exc).__name__}: {exc}")
def run_check(target: Target) -> dict[str, Any]:
"""Run TLS and optional HTTP checks for one target."""
result = default_result(target)
check_tls(target, result)
check_http(target, result)
return result
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(description="Check one SSL/TLS target for Zabbix.")
parser.add_argument("--config", help="Path to ssl_targets.json")
parser.add_argument("--host", required=True, help="Target hostname or IP")
parser.add_argument("--port", required=True, type=int, help="Target port")
parser.add_argument("--name", help="Target display name when no config is used")
parser.add_argument("--owner", default="unknown", help="Target owner when no config is used")
parser.add_argument("--profile", default="relaxed", choices=sorted(VALID_PROFILES))
parser.add_argument("--expected-issuer-contains")
parser.add_argument("--expected-hostname")
parser.add_argument("--http-check", action="store_true")
parser.add_argument("--expected-http-status", nargs="*", type=int, default=DEFAULT_HTTP_STATUS)
parser.add_argument("--timeout", type=float, default=10)
return parser.parse_args(argv)
def target_from_args(args: argparse.Namespace) -> Target:
"""Build target config from CLI arguments or a config file."""
port = _validate_port(args.port, "--port")
host = _validate_host(args.host, "--host")
if args.config:
return find_target(Path(args.config), host, port)
raw = {
"name": args.name or host,
"host": host,
"port": port,
"owner": args.owner,
"profile": args.profile,
"expected_issuer_contains": args.expected_issuer_contains,
"expected_hostname": args.expected_hostname or host,
"http_check": args.http_check,
"expected_http_status": args.expected_http_status,
"timeout": args.timeout,
}
return Target.from_dict(raw, 0)
def main(argv: list[str] | None = None) -> int:
"""Run the checker and always print JSON for valid arguments/config."""
try:
args = parse_args(argv)
target = target_from_args(args)
except (ConfigError, OSError, json.JSONDecodeError) as exc:
print(f"Configuration/argument error: {exc}", file=sys.stderr)
return 2
result = run_check(target)
print(json.dumps(result, separators=(",", ":"), ensure_ascii=False))
return 0
if __name__ == "__main__":
sys.exit(main())