642 lines
24 KiB
Python
Executable File
642 lines
24 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Central SSL/TLS and HTTPS checker for Zabbix external checks."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import datetime as dt
|
|
import hashlib
|
|
import http.client
|
|
import ipaddress
|
|
import json
|
|
import re
|
|
import socket
|
|
import ssl
|
|
import sys
|
|
import time
|
|
from dataclasses import dataclass
|
|
from email.utils import parsedate_to_datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
from urllib.parse import urljoin, urlparse
|
|
|
|
try: # Optional richer certificate parsing.
|
|
from cryptography import x509
|
|
from cryptography.hazmat.primitives.asymmetric import dsa, ec, ed25519, ed448, rsa
|
|
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
|
|
from cryptography.x509.oid import ExtensionOID, NameOID
|
|
|
|
HAS_CRYPTOGRAPHY = True
|
|
except Exception: # pragma: no cover - depends on local environment
|
|
x509 = None # type: ignore[assignment]
|
|
dsa = ec = ed25519 = ed448 = rsa = None # type: ignore[assignment]
|
|
Encoding = PublicFormat = None # type: ignore[assignment]
|
|
ExtensionOID = NameOID = None # type: ignore[assignment]
|
|
HAS_CRYPTOGRAPHY = False
|
|
|
|
|
|
VALID_PROFILES = {"relaxed", "serious", "internal", "external"}
|
|
DEFAULT_HTTP_STATUS = [200, 301, 302, 401, 403]
|
|
HOST_RE = re.compile(r"^[A-Za-z0-9_.:-]+$")
|
|
|
|
|
|
class ConfigError(ValueError):
|
|
"""Raised when the target configuration is invalid."""
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class Target:
|
|
"""Normalized target configuration."""
|
|
|
|
name: str
|
|
host: str
|
|
port: int
|
|
owner: str
|
|
profile: str
|
|
expected_issuer_contains: str | None = None
|
|
expected_hostname: str | None = None
|
|
http_check: bool = False
|
|
expected_http_status: tuple[int, ...] = tuple(DEFAULT_HTTP_STATUS)
|
|
timeout: float = 10.0
|
|
|
|
@classmethod
|
|
def from_dict(cls, raw: dict[str, Any], index: int) -> "Target":
|
|
"""Validate and normalize one target dictionary."""
|
|
required = ("name", "host", "port", "owner", "profile")
|
|
missing = [field for field in required if field not in raw]
|
|
if missing:
|
|
raise ConfigError(f"target #{index}: missing required field(s): {', '.join(missing)}")
|
|
|
|
name = _non_empty_string(raw["name"], f"target #{index}: name")
|
|
host = _validate_host(raw["host"], f"target #{index}: host")
|
|
port = _validate_port(raw["port"], f"target #{index}: port")
|
|
owner = _non_empty_string(raw["owner"], f"target #{index}: owner")
|
|
profile = _non_empty_string(raw["profile"], f"target #{index}: profile")
|
|
if profile not in VALID_PROFILES:
|
|
raise ConfigError(
|
|
f"target #{index}: profile must be one of {', '.join(sorted(VALID_PROFILES))}"
|
|
)
|
|
|
|
expected_hostname = raw.get("expected_hostname", host)
|
|
expected_hostname = _validate_host(expected_hostname, f"target #{index}: expected_hostname")
|
|
|
|
issuer = raw.get("expected_issuer_contains")
|
|
if issuer is not None:
|
|
issuer = _non_empty_string(issuer, f"target #{index}: expected_issuer_contains")
|
|
|
|
http_check = raw.get("http_check", False)
|
|
if not isinstance(http_check, bool):
|
|
raise ConfigError(f"target #{index}: http_check must be boolean")
|
|
|
|
timeout = raw.get("timeout", 10)
|
|
if not isinstance(timeout, (int, float)) or isinstance(timeout, bool) or timeout <= 0:
|
|
raise ConfigError(f"target #{index}: timeout must be a positive number")
|
|
if timeout > 60:
|
|
raise ConfigError(f"target #{index}: timeout must be 60 seconds or less")
|
|
|
|
statuses = raw.get("expected_http_status", DEFAULT_HTTP_STATUS)
|
|
if not isinstance(statuses, list) or not statuses:
|
|
raise ConfigError(f"target #{index}: expected_http_status must be a non-empty list")
|
|
normalized_statuses: list[int] = []
|
|
for status in statuses:
|
|
if not isinstance(status, int) or isinstance(status, bool) or status < 100 or status > 599:
|
|
raise ConfigError(f"target #{index}: expected_http_status contains invalid status")
|
|
normalized_statuses.append(status)
|
|
|
|
return cls(
|
|
name=name,
|
|
host=host,
|
|
port=port,
|
|
owner=owner,
|
|
profile=profile,
|
|
expected_issuer_contains=issuer,
|
|
expected_hostname=expected_hostname,
|
|
http_check=http_check,
|
|
expected_http_status=tuple(normalized_statuses),
|
|
timeout=float(timeout),
|
|
)
|
|
|
|
def to_dict(self) -> dict[str, Any]:
|
|
"""Return a serializable target dictionary."""
|
|
return {
|
|
"name": self.name,
|
|
"host": self.host,
|
|
"port": self.port,
|
|
"owner": self.owner,
|
|
"profile": self.profile,
|
|
"expected_issuer_contains": self.expected_issuer_contains,
|
|
"expected_hostname": self.expected_hostname,
|
|
"http_check": self.http_check,
|
|
"expected_http_status": list(self.expected_http_status),
|
|
"timeout": self.timeout,
|
|
}
|
|
|
|
|
|
def _non_empty_string(value: Any, field: str) -> str:
|
|
if not isinstance(value, str) or not value.strip():
|
|
raise ConfigError(f"{field} must be a non-empty string")
|
|
return value.strip()
|
|
|
|
|
|
def _validate_port(value: Any, field: str) -> int:
|
|
if isinstance(value, bool):
|
|
raise ConfigError(f"{field} must be an integer port")
|
|
try:
|
|
port = int(value)
|
|
except (TypeError, ValueError) as exc:
|
|
raise ConfigError(f"{field} must be an integer port") from exc
|
|
if port < 1 or port > 65535:
|
|
raise ConfigError(f"{field} must be between 1 and 65535")
|
|
return port
|
|
|
|
|
|
def _validate_host(value: Any, field: str) -> str:
|
|
host = _non_empty_string(value, field)
|
|
if "/" in host or "\\" in host or any(char.isspace() for char in host):
|
|
raise ConfigError(f"{field} must be a hostname or IP address, not a URL")
|
|
try:
|
|
ipaddress.ip_address(host.strip("[]"))
|
|
return host.strip("[]")
|
|
except ValueError:
|
|
pass
|
|
if len(host) > 253 or not HOST_RE.match(host):
|
|
raise ConfigError(f"{field} contains invalid characters")
|
|
labels = host.rstrip(".").split(".")
|
|
if any(not label or label.startswith("-") or label.endswith("-") for label in labels):
|
|
raise ConfigError(f"{field} contains invalid DNS label(s)")
|
|
return host.rstrip(".")
|
|
|
|
|
|
def load_targets(path: Path) -> list[dict[str, Any]]:
|
|
"""Load, validate, normalize, and deduplicate target config."""
|
|
with path.open("r", encoding="utf-8") as handle:
|
|
raw = json.load(handle)
|
|
if not isinstance(raw, list):
|
|
raise ConfigError("config root must be a JSON array")
|
|
|
|
targets: list[dict[str, Any]] = []
|
|
seen: set[tuple[str, int]] = set()
|
|
for index, entry in enumerate(raw, start=1):
|
|
if not isinstance(entry, dict):
|
|
raise ConfigError(f"target #{index}: entry must be an object")
|
|
target = Target.from_dict(entry, index)
|
|
key = (target.host.lower(), target.port)
|
|
if key in seen:
|
|
continue
|
|
seen.add(key)
|
|
targets.append(target.to_dict())
|
|
return targets
|
|
|
|
|
|
def find_target(config: Path, host: str, port: int) -> Target:
|
|
"""Find one target by host and port in a validated config file."""
|
|
for raw in load_targets(config):
|
|
if str(raw["host"]).lower() == host.lower() and int(raw["port"]) == port:
|
|
return Target.from_dict(raw, 0)
|
|
raise ConfigError(f"target {host}:{port} was not found in {config}")
|
|
|
|
|
|
def default_result(target: Target) -> dict[str, Any]:
|
|
"""Create a stable JSON result skeleton."""
|
|
return {
|
|
"target": target.host,
|
|
"port": target.port,
|
|
"name": target.name,
|
|
"owner": target.owner,
|
|
"profile": target.profile,
|
|
"reachable": False,
|
|
"error": "",
|
|
"valid_now": None,
|
|
"days_left": None,
|
|
"not_before": None,
|
|
"not_after": None,
|
|
"subject_cn": None,
|
|
"san_names": None,
|
|
"issuer_cn": None,
|
|
"issuer_org": None,
|
|
"serial_number": None,
|
|
"fingerprint_sha256": None,
|
|
"hostname_match": None,
|
|
"chain_valid": None,
|
|
"self_signed": None,
|
|
"not_yet_valid": None,
|
|
"expired": None,
|
|
"expected_issuer_match": None,
|
|
"expected_hostname_match": None,
|
|
"tls_version_negotiated": None,
|
|
"tls12_supported": None,
|
|
"tls13_supported": None,
|
|
"tls10_supported": None,
|
|
"tls11_supported": None,
|
|
"key_type": None,
|
|
"key_bits": None,
|
|
"signature_algorithm": None,
|
|
"http_check_enabled": target.http_check,
|
|
"http_reachable": None,
|
|
"http_status": None,
|
|
"http_status_expected": None,
|
|
"http_final_url": None,
|
|
"http_redirects_to_https": None,
|
|
"http_response_time_ms": None,
|
|
"http_server_header": None,
|
|
"http_hsts": None,
|
|
"http_security_headers_score": None,
|
|
"warnings": [],
|
|
}
|
|
|
|
|
|
def utc_now() -> dt.datetime:
|
|
"""Return timezone-aware UTC now."""
|
|
return dt.datetime.now(dt.timezone.utc)
|
|
|
|
|
|
def iso_z(value: dt.datetime | None) -> str | None:
|
|
"""Format a datetime as RFC3339 UTC with Z suffix."""
|
|
if value is None:
|
|
return None
|
|
if value.tzinfo is None:
|
|
value = value.replace(tzinfo=dt.timezone.utc)
|
|
return value.astimezone(dt.timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
|
|
|
|
|
|
def parse_cert_time(value: str | None) -> dt.datetime | None:
|
|
"""Parse OpenSSL ASN.1 time strings from getpeercert()."""
|
|
if not value:
|
|
return None
|
|
parsed = parsedate_to_datetime(value)
|
|
if parsed.tzinfo is None:
|
|
parsed = parsed.replace(tzinfo=dt.timezone.utc)
|
|
return parsed.astimezone(dt.timezone.utc)
|
|
|
|
|
|
def colon_hex(data: bytes) -> str:
|
|
"""Return colon-separated uppercase hex."""
|
|
return ":".join(f"{byte:02X}" for byte in data)
|
|
|
|
|
|
def get_name_attr(name_parts: Any, key: str) -> str | None:
|
|
"""Extract an attribute from ssl.getpeercert() subject/issuer tuples."""
|
|
for rdn in name_parts or ():
|
|
for attr_name, value in rdn:
|
|
if attr_name == key:
|
|
return str(value)
|
|
return None
|
|
|
|
|
|
def parse_stdlib_cert(cert: dict[str, Any]) -> dict[str, Any]:
|
|
"""Parse fields available from ssl.getpeercert()."""
|
|
san_names = [
|
|
str(value)
|
|
for kind, value in cert.get("subjectAltName", [])
|
|
if str(kind).lower() in {"dns", "ip address"}
|
|
]
|
|
return {
|
|
"not_before_dt": parse_cert_time(cert.get("notBefore")),
|
|
"not_after_dt": parse_cert_time(cert.get("notAfter")),
|
|
"subject_cn": get_name_attr(cert.get("subject"), "commonName"),
|
|
"issuer_cn": get_name_attr(cert.get("issuer"), "commonName"),
|
|
"issuer_org": get_name_attr(cert.get("issuer"), "organizationName"),
|
|
"san_names": san_names,
|
|
"serial_number": cert.get("serialNumber"),
|
|
}
|
|
|
|
|
|
def parse_crypto_cert(der_cert: bytes) -> dict[str, Any]:
|
|
"""Parse certificate fields with optional cryptography support."""
|
|
if not HAS_CRYPTOGRAPHY or x509 is None:
|
|
return {}
|
|
|
|
cert = x509.load_der_x509_certificate(der_cert)
|
|
|
|
def first_name_attr(name: Any, oid: Any) -> str | None:
|
|
attrs = name.get_attributes_for_oid(oid)
|
|
return str(attrs[0].value) if attrs else None
|
|
|
|
san_names: list[str] = []
|
|
try:
|
|
san_ext = cert.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value
|
|
san_names.extend(str(name) for name in san_ext.get_values_for_type(x509.DNSName))
|
|
san_names.extend(str(name) for name in san_ext.get_values_for_type(x509.IPAddress))
|
|
except x509.ExtensionNotFound:
|
|
pass
|
|
|
|
public_key = cert.public_key()
|
|
key_type = type(public_key).__name__
|
|
key_bits: int | None = None
|
|
if rsa is not None and isinstance(public_key, rsa.RSAPublicKey):
|
|
key_type = "RSA"
|
|
key_bits = public_key.key_size
|
|
elif dsa is not None and isinstance(public_key, dsa.DSAPublicKey):
|
|
key_type = "DSA"
|
|
key_bits = public_key.key_size
|
|
elif ec is not None and isinstance(public_key, ec.EllipticCurvePublicKey):
|
|
key_type = "EC"
|
|
key_bits = public_key.key_size
|
|
elif ed25519 is not None and isinstance(public_key, ed25519.Ed25519PublicKey):
|
|
key_type = "Ed25519"
|
|
elif ed448 is not None and isinstance(public_key, ed448.Ed448PublicKey):
|
|
key_type = "Ed448"
|
|
|
|
return {
|
|
"not_before_dt": cert.not_valid_before_utc,
|
|
"not_after_dt": cert.not_valid_after_utc,
|
|
"subject_cn": first_name_attr(cert.subject, NameOID.COMMON_NAME),
|
|
"issuer_cn": first_name_attr(cert.issuer, NameOID.COMMON_NAME),
|
|
"issuer_org": first_name_attr(cert.issuer, NameOID.ORGANIZATION_NAME),
|
|
"san_names": san_names,
|
|
"serial_number": format(cert.serial_number, "X"),
|
|
"key_type": key_type,
|
|
"key_bits": key_bits,
|
|
"signature_algorithm": (
|
|
cert.signature_hash_algorithm.name if cert.signature_hash_algorithm else cert.signature_algorithm_oid._name
|
|
),
|
|
"self_signed": cert.issuer == cert.subject and _self_signature_looks_valid(cert),
|
|
}
|
|
|
|
|
|
def _self_signature_looks_valid(cert: Any) -> bool:
|
|
"""Best-effort self-signed detection without failing the whole check."""
|
|
try:
|
|
public_key = cert.public_key()
|
|
public_bytes = public_key.public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo)
|
|
return cert.issuer == cert.subject and bool(public_bytes)
|
|
except Exception:
|
|
return cert.issuer == cert.subject
|
|
|
|
|
|
def connect_tls(
|
|
host: str,
|
|
port: int,
|
|
expected_hostname: str,
|
|
timeout: float,
|
|
verify: bool = True,
|
|
min_version: ssl.TLSVersion | None = None,
|
|
max_version: ssl.TLSVersion | None = None,
|
|
) -> tuple[ssl.SSLSocket, dict[str, Any], bytes]:
|
|
"""Open a TLS connection and return socket, parsed cert, and DER leaf cert."""
|
|
context = ssl.create_default_context() if verify else ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
|
context.check_hostname = verify
|
|
context.verify_mode = ssl.CERT_REQUIRED if verify else ssl.CERT_NONE
|
|
if min_version is not None:
|
|
context.minimum_version = min_version
|
|
if max_version is not None:
|
|
context.maximum_version = max_version
|
|
|
|
raw_sock = socket.create_connection((host, port), timeout=timeout)
|
|
tls_sock = context.wrap_socket(raw_sock, server_hostname=expected_hostname)
|
|
tls_sock.settimeout(timeout)
|
|
cert = tls_sock.getpeercert()
|
|
der_cert = tls_sock.getpeercert(binary_form=True)
|
|
if der_cert is None:
|
|
raise ssl.SSLError("server did not provide a certificate")
|
|
return tls_sock, cert, der_cert
|
|
|
|
|
|
def check_tls(target: Target, result: dict[str, Any]) -> None:
|
|
"""Populate TLS and certificate result fields."""
|
|
try:
|
|
sock, cert, der_cert = connect_tls(
|
|
target.host, target.port, target.expected_hostname or target.host, target.timeout, verify=True
|
|
)
|
|
negotiated_version = sock.version()
|
|
sock.close()
|
|
result["reachable"] = True
|
|
result["chain_valid"] = True
|
|
result["hostname_match"] = True
|
|
result["tls_version_negotiated"] = negotiated_version
|
|
except ssl.SSLCertVerificationError as exc:
|
|
result["chain_valid"] = False
|
|
result["hostname_match"] = "hostname" not in str(exc).lower()
|
|
result["error"] = f"TLS verification failed: {exc.verify_message or exc}"
|
|
try:
|
|
sock, cert, der_cert = connect_tls(
|
|
target.host, target.port, target.expected_hostname or target.host, target.timeout, verify=False
|
|
)
|
|
result["reachable"] = True
|
|
result["tls_version_negotiated"] = sock.version()
|
|
sock.close()
|
|
except Exception:
|
|
return
|
|
except (TimeoutError, OSError, ssl.SSLError) as exc:
|
|
result["reachable"] = False
|
|
result["error"] = f"TLS connection failed: {type(exc).__name__}: {exc}"
|
|
return
|
|
|
|
std_fields = parse_stdlib_cert(cert)
|
|
crypto_fields = parse_crypto_cert(der_cert)
|
|
fields = {**std_fields, **{key: value for key, value in crypto_fields.items() if value not in (None, [], "")}}
|
|
|
|
not_before = fields.pop("not_before_dt", None)
|
|
not_after = fields.pop("not_after_dt", None)
|
|
now = utc_now()
|
|
|
|
result.update(fields)
|
|
result["fingerprint_sha256"] = colon_hex(hashlib.sha256(der_cert).digest())
|
|
result["not_before"] = iso_z(not_before)
|
|
result["not_after"] = iso_z(not_after)
|
|
result["not_yet_valid"] = bool(not_before and now < not_before)
|
|
result["expired"] = bool(not_after and now > not_after)
|
|
result["valid_now"] = bool(not_before and not_after and not_before <= now <= not_after and result["chain_valid"])
|
|
result["days_left"] = max(0, int((not_after - now).total_seconds() // 86400)) if not_after else None
|
|
|
|
if result["self_signed"] is None:
|
|
result["self_signed"] = bool(
|
|
result["subject_cn"] and result["issuer_cn"] and result["subject_cn"] == result["issuer_cn"]
|
|
)
|
|
result["hostname_match"] = certificate_matches_hostname(cert, target.expected_hostname or target.host)
|
|
result["expected_hostname_match"] = certificate_matches_hostname(cert, target.expected_hostname or target.host)
|
|
issuer_text = " ".join(
|
|
str(part)
|
|
for part in (result.get("issuer_cn"), result.get("issuer_org"))
|
|
if part is not None
|
|
)
|
|
result["expected_issuer_match"] = (
|
|
target.expected_issuer_contains.lower() in issuer_text.lower()
|
|
if target.expected_issuer_contains
|
|
else None
|
|
)
|
|
|
|
if not HAS_CRYPTOGRAPHY:
|
|
result["warnings"].append("cryptography not installed; key/signature parsing is limited")
|
|
|
|
result["tls10_supported"] = probe_tls_version(target, ssl.TLSVersion.TLSv1, "TLS 1.0", result["warnings"])
|
|
result["tls11_supported"] = probe_tls_version(target, ssl.TLSVersion.TLSv1_1, "TLS 1.1", result["warnings"])
|
|
result["tls12_supported"] = probe_tls_version(target, ssl.TLSVersion.TLSv1_2, "TLS 1.2", result["warnings"])
|
|
if hasattr(ssl.TLSVersion, "TLSv1_3"):
|
|
result["tls13_supported"] = probe_tls_version(target, ssl.TLSVersion.TLSv1_3, "TLS 1.3", result["warnings"])
|
|
|
|
|
|
def certificate_matches_hostname(cert: dict[str, Any], hostname: str) -> bool | None:
|
|
"""Return whether a peer certificate matches hostname."""
|
|
try:
|
|
ssl.match_hostname(cert, hostname)
|
|
return True
|
|
except ssl.CertificateError:
|
|
return False
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def probe_tls_version(
|
|
target: Target, version: ssl.TLSVersion, label: str, warnings: list[str]
|
|
) -> bool | None:
|
|
"""Probe support for one exact TLS protocol version."""
|
|
try:
|
|
sock, _, _ = connect_tls(
|
|
target.host,
|
|
target.port,
|
|
target.expected_hostname or target.host,
|
|
min(target.timeout, 5.0),
|
|
verify=False,
|
|
min_version=version,
|
|
max_version=version,
|
|
)
|
|
sock.close()
|
|
return True
|
|
except ssl.SSLError as exc:
|
|
message = str(exc).lower()
|
|
if (
|
|
"unsupported protocol" in message
|
|
or "no protocols available" in message
|
|
or "no ciphers available" in message
|
|
):
|
|
warnings.append(f"{label} could not be tested due to local OpenSSL policy")
|
|
return None
|
|
return False
|
|
except (TimeoutError, OSError):
|
|
return False
|
|
except ValueError as exc:
|
|
warnings.append(f"{label} could not be tested: {exc}")
|
|
return None
|
|
|
|
|
|
def check_http(target: Target, result: dict[str, Any]) -> None:
|
|
"""Perform an HTTPS request with limited redirect following."""
|
|
if not target.http_check:
|
|
return
|
|
|
|
start = time.monotonic()
|
|
url = f"https://{target.host}:{target.port}/" if target.port != 443 else f"https://{target.host}/"
|
|
redirects_to_https = True
|
|
|
|
try:
|
|
for _ in range(6):
|
|
parsed = urlparse(url)
|
|
if parsed.scheme != "https":
|
|
redirects_to_https = False
|
|
if parsed.scheme not in {"https", "http"}:
|
|
raise ValueError(f"unsupported redirect scheme: {parsed.scheme}")
|
|
|
|
connection_cls = http.client.HTTPSConnection if parsed.scheme == "https" else http.client.HTTPConnection
|
|
port = parsed.port or (443 if parsed.scheme == "https" else 80)
|
|
path = parsed.path or "/"
|
|
if parsed.query:
|
|
path += f"?{parsed.query}"
|
|
|
|
connection = connection_cls(parsed.hostname, port, timeout=target.timeout)
|
|
connection.request("GET", path, headers={"User-Agent": "zabbix-ssl-checker/1.0"})
|
|
response = connection.getresponse()
|
|
headers = {key.lower(): value for key, value in response.getheaders()}
|
|
response.read(1024)
|
|
status = response.status
|
|
location = headers.get("location")
|
|
connection.close()
|
|
|
|
if status in {301, 302, 303, 307, 308} and location:
|
|
url = urljoin(url, location)
|
|
continue
|
|
|
|
elapsed_ms = int((time.monotonic() - start) * 1000)
|
|
result["http_reachable"] = True
|
|
result["http_status"] = status
|
|
result["http_status_expected"] = status in target.expected_http_status
|
|
result["http_final_url"] = url
|
|
result["http_redirects_to_https"] = redirects_to_https and urlparse(url).scheme == "https"
|
|
result["http_response_time_ms"] = elapsed_ms
|
|
result["http_server_header"] = headers.get("server")
|
|
result["http_hsts"] = "strict-transport-security" in headers
|
|
result["http_security_headers_score"] = sum(
|
|
1
|
|
for header in (
|
|
"strict-transport-security",
|
|
"x-content-type-options",
|
|
"x-frame-options",
|
|
"content-security-policy",
|
|
"referrer-policy",
|
|
)
|
|
if header in headers
|
|
)
|
|
return
|
|
|
|
raise ValueError("too many redirects")
|
|
except Exception as exc:
|
|
result["http_reachable"] = False
|
|
result["http_status_expected"] = False
|
|
result["http_response_time_ms"] = int((time.monotonic() - start) * 1000)
|
|
result["warnings"].append(f"HTTP check failed: {type(exc).__name__}: {exc}")
|
|
|
|
|
|
def run_check(target: Target) -> dict[str, Any]:
|
|
"""Run TLS and optional HTTP checks for one target."""
|
|
result = default_result(target)
|
|
check_tls(target, result)
|
|
check_http(target, result)
|
|
return result
|
|
|
|
|
|
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
|
|
"""Parse command-line arguments."""
|
|
parser = argparse.ArgumentParser(description="Check one SSL/TLS target for Zabbix.")
|
|
parser.add_argument("--config", help="Path to ssl_targets.json")
|
|
parser.add_argument("--host", required=True, help="Target hostname or IP")
|
|
parser.add_argument("--port", required=True, type=int, help="Target port")
|
|
parser.add_argument("--name", help="Target display name when no config is used")
|
|
parser.add_argument("--owner", default="unknown", help="Target owner when no config is used")
|
|
parser.add_argument("--profile", default="relaxed", choices=sorted(VALID_PROFILES))
|
|
parser.add_argument("--expected-issuer-contains")
|
|
parser.add_argument("--expected-hostname")
|
|
parser.add_argument("--http-check", action="store_true")
|
|
parser.add_argument("--expected-http-status", nargs="*", type=int, default=DEFAULT_HTTP_STATUS)
|
|
parser.add_argument("--timeout", type=float, default=10)
|
|
return parser.parse_args(argv)
|
|
|
|
|
|
def target_from_args(args: argparse.Namespace) -> Target:
|
|
"""Build target config from CLI arguments or a config file."""
|
|
port = _validate_port(args.port, "--port")
|
|
host = _validate_host(args.host, "--host")
|
|
if args.config:
|
|
return find_target(Path(args.config), host, port)
|
|
|
|
raw = {
|
|
"name": args.name or host,
|
|
"host": host,
|
|
"port": port,
|
|
"owner": args.owner,
|
|
"profile": args.profile,
|
|
"expected_issuer_contains": args.expected_issuer_contains,
|
|
"expected_hostname": args.expected_hostname or host,
|
|
"http_check": args.http_check,
|
|
"expected_http_status": args.expected_http_status,
|
|
"timeout": args.timeout,
|
|
}
|
|
return Target.from_dict(raw, 0)
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
|
|
"""Run the checker and always print JSON for valid arguments/config."""
|
|
try:
|
|
args = parse_args(argv)
|
|
target = target_from_args(args)
|
|
except (ConfigError, OSError, json.JSONDecodeError) as exc:
|
|
print(f"Configuration/argument error: {exc}", file=sys.stderr)
|
|
return 2
|
|
|
|
result = run_check(target)
|
|
print(json.dumps(result, separators=(",", ":"), ensure_ascii=False))
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|