Files
compliance-scan/src/sslysze_scan/reporter/query.py
2025-12-19 20:10:39 +01:00

553 lines
17 KiB
Python

"""Report generation module for scan results."""
import sqlite3
from typing import Any
# Compliance thresholds
COMPLIANCE_WARNING_THRESHOLD = 50.0
def has_tls_support(port_data: dict[str, Any]) -> bool:
"""Check if port has TLS support based on data presence.
Args:
port_data: Port data dictionary
Returns:
True if port has TLS support
"""
return bool(
port_data.get("cipher_suites")
or port_data.get("supported_groups")
or port_data.get("certificates")
or port_data.get("tls_version")
)
def list_scans(db_path: str) -> list[dict[str, Any]]:
"""List all available scans in the database.
Args:
db_path: Path to database file
Returns:
List of scan dictionaries with metadata
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute(
"""
SELECT scan_id, timestamp, hostname, ports, scan_duration_seconds
FROM scans
ORDER BY scan_id DESC
""",
)
scans = []
for row in cursor.fetchall():
scans.append(
{
"scan_id": row[0],
"timestamp": row[1],
"hostname": row[2],
"ports": row[3],
"duration": row[4],
},
)
conn.close()
return scans
def get_scan_metadata(db_path: str, scan_id: int) -> dict[str, Any] | None:
"""Get metadata for a specific scan.
Args:
db_path: Path to database file
scan_id: Scan ID
Returns:
Dictionary with scan metadata or None if not found
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute(
"""
SELECT s.scan_id, s.timestamp, s.hostname, s.ports, s.scan_duration_seconds,
h.fqdn, h.ipv4, h.ipv6
FROM scans s
LEFT JOIN scanned_hosts h ON s.scan_id = h.scan_id
WHERE s.scan_id = ?
""",
(scan_id,),
)
row = cursor.fetchone()
conn.close()
if not row:
return None
return {
"scan_id": row[0],
"timestamp": row[1],
"hostname": row[2],
"ports": row[3].split(",") if row[3] else [],
"duration": row[4],
"fqdn": row[5] or row[2],
"ipv4": row[6],
"ipv6": row[7],
}
def get_scan_data(db_path: str, scan_id: int) -> dict[str, Any]:
"""Get all scan data for report generation.
Args:
db_path: Path to database file
scan_id: Scan ID
Returns:
Dictionary with all scan data
"""
metadata = get_scan_metadata(db_path, scan_id)
if not metadata:
raise ValueError(f"Scan ID {scan_id} not found")
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
data = {
"metadata": metadata,
"ports_data": {},
}
# Get data for each port
for port in metadata["ports"]:
port_num = int(port)
port_data = {
"port": port_num,
"status": "completed",
"tls_version": None,
"cipher_suites": {},
"supported_groups": [],
"certificates": [],
"vulnerabilities": [],
"protocol_features": [],
"session_features": [],
"http_headers": [],
"compliance": {},
}
# Cipher suites using view
cursor.execute(
"""
SELECT tls_version, cipher_suite_name, accepted, iana_value, key_size, is_anonymous,
iana_recommended_final, bsi_approved_final, bsi_valid_until_final, compliant
FROM v_cipher_suites_with_compliance
WHERE scan_id = ? AND port = ?
ORDER BY tls_version, accepted DESC, cipher_suite_name
""",
(scan_id, port_num),
)
rejected_counts = {}
for row in cursor.fetchall():
tls_version = row[0]
if tls_version not in port_data["cipher_suites"]:
port_data["cipher_suites"][tls_version] = {
"accepted": [],
"rejected": [],
}
rejected_counts[tls_version] = 0
suite = {
"name": row[1],
"accepted": row[2],
"iana_value": row[3],
"key_size": row[4],
"is_anonymous": row[5],
}
if row[2]: # accepted
suite["iana_recommended"] = row[6]
suite["bsi_approved"] = row[7]
suite["bsi_valid_until"] = row[8]
suite["compliant"] = row[9]
port_data["cipher_suites"][tls_version]["accepted"].append(suite)
else: # rejected
rejected_counts[tls_version] += 1
# Only include rejected if BSI-approved OR IANA-recommended
if row[7] or row[6] == "Y":
suite["iana_recommended"] = row[6]
suite["bsi_approved"] = row[7]
suite["bsi_valid_until"] = row[8]
suite["compliant"] = False
port_data["cipher_suites"][tls_version]["rejected"].append(suite)
# Store rejected counts
for tls_version in port_data["cipher_suites"]:
port_data["cipher_suites"][tls_version]["rejected_total"] = (
rejected_counts.get(tls_version, 0)
)
# Determine highest TLS version
if port_data["cipher_suites"]:
tls_versions = list(port_data["cipher_suites"].keys())
version_order = ["ssl_3.0", "1.0", "1.1", "1.2", "1.3"]
for version in reversed(version_order):
if version in tls_versions:
port_data["tls_version"] = version
break
# Supported groups using view
cursor.execute(
"""
SELECT group_name, iana_value, openssl_nid,
iana_recommended, bsi_approved, bsi_valid_until, compliant
FROM v_supported_groups_with_compliance
WHERE scan_id = ? AND port = ?
ORDER BY group_name
""",
(scan_id, port_num),
)
for row in cursor.fetchall():
port_data["supported_groups"].append(
{
"name": row[0],
"iana_value": row[1],
"openssl_nid": row[2],
"iana_recommended": row[3],
"bsi_approved": row[4],
"bsi_valid_until": row[5],
"compliant": row[6],
},
)
# Certificates using view
cursor.execute(
"""
SELECT position, subject, issuer, serial_number, not_before, not_after,
key_type, key_bits, signature_algorithm, fingerprint_sha256,
compliant, compliance_details
FROM v_certificates_with_compliance
WHERE scan_id = ? AND port = ?
ORDER BY position
""",
(scan_id, port_num),
)
for row in cursor.fetchall():
port_data["certificates"].append(
{
"position": row[0],
"subject": row[1],
"issuer": row[2],
"serial_number": row[3],
"not_before": row[4],
"not_after": row[5],
"key_type": row[6],
"key_bits": row[7],
"signature_algorithm": row[8],
"fingerprint_sha256": row[9],
"compliant": row[10] if row[10] is not None else None,
"compliance_details": row[11] if row[11] else None,
},
)
# Vulnerabilities
cursor.execute(
"""
SELECT vuln_type, vulnerable, details
FROM scan_vulnerabilities
WHERE scan_id = ? AND port = ?
ORDER BY vuln_type
""",
(scan_id, port_num),
)
for row in cursor.fetchall():
port_data["vulnerabilities"].append(
{
"type": row[0],
"vulnerable": row[1],
"details": row[2],
},
)
# Protocol features
cursor.execute(
"""
SELECT feature_type, supported, details
FROM scan_protocol_features
WHERE scan_id = ? AND port = ?
ORDER BY feature_type
""",
(scan_id, port_num),
)
for row in cursor.fetchall():
port_data["protocol_features"].append(
{
"name": row[0],
"supported": row[1],
"details": row[2],
},
)
# Session features
cursor.execute(
"""
SELECT feature_type, client_initiated, secure, session_id_supported,
ticket_supported, attempted_resumptions, successful_resumptions, details
FROM scan_session_features
WHERE scan_id = ? AND port = ?
ORDER BY feature_type
""",
(scan_id, port_num),
)
for row in cursor.fetchall():
port_data["session_features"].append(
{
"type": row[0],
"client_initiated": row[1],
"secure": row[2],
"session_id_supported": row[3],
"ticket_supported": row[4],
"attempted_resumptions": row[5],
"successful_resumptions": row[6],
"details": row[7],
},
)
# HTTP headers
cursor.execute(
"""
SELECT header_name, header_value, is_present
FROM scan_http_headers
WHERE scan_id = ? AND port = ?
ORDER BY header_name
""",
(scan_id, port_num),
)
for row in cursor.fetchall():
port_data["http_headers"].append(
{
"name": row[0],
"value": row[1],
"is_present": row[2],
},
)
# Compliance summary using view
cursor.execute(
"""
SELECT check_type, total, passed, percentage
FROM v_port_compliance_summary
WHERE scan_id = ? AND port = ?
""",
(scan_id, port_num),
)
for row in cursor.fetchall():
check_type = row[0]
total = row[1]
passed = row[2]
percentage = row[3]
if check_type == "cipher_suite":
port_data["compliance"]["cipher_suites_checked"] = total
port_data["compliance"]["cipher_suites_passed"] = passed
port_data["compliance"]["cipher_suite_percentage"] = f"{percentage:.1f}"
elif check_type == "supported_group":
port_data["compliance"]["groups_checked"] = total
port_data["compliance"]["groups_passed"] = passed
port_data["compliance"]["group_percentage"] = f"{percentage:.1f}"
# Get missing recommended groups for this port
port_data["missing_recommended_groups"] = _get_missing_recommended_groups(
cursor,
scan_id,
port_num,
)
data["ports_data"][port_num] = port_data
conn.close()
# Calculate overall summary
data["summary"] = _calculate_summary(data)
return data
def _get_missing_recommended_groups(
cursor: sqlite3.Cursor,
scan_id: int,
port: int,
) -> dict[str, list[dict[str, Any]]]:
"""Get recommended groups that are not offered by the server using views.
Args:
cursor: Database cursor
scan_id: Scan ID
port: Port number
Returns:
Dictionary with 'bsi_approved' and 'iana_recommended' lists
"""
missing = {"bsi_approved": [], "iana_recommended": []}
# Get missing BSI-approved groups using view
cursor.execute(
"""
SELECT group_name, tls_version, valid_until
FROM v_missing_bsi_groups
WHERE scan_id = ?
ORDER BY group_name, tls_version
""",
(scan_id,),
)
bsi_groups = {}
for row in cursor.fetchall():
group_name = row[0]
tls_version = row[1]
valid_until = row[2]
if group_name not in bsi_groups:
bsi_groups[group_name] = {
"name": group_name,
"tls_versions": [],
"valid_until": valid_until,
}
bsi_groups[group_name]["tls_versions"].append(tls_version)
missing["bsi_approved"] = list(bsi_groups.values())
# Get missing IANA-recommended groups using view
cursor.execute(
"""
SELECT group_name, iana_value
FROM v_missing_iana_groups
WHERE scan_id = ?
ORDER BY CAST(iana_value AS INTEGER)
""",
(scan_id,),
)
for row in cursor.fetchall():
missing["iana_recommended"].append(
{
"name": row[0],
"iana_value": row[1],
},
)
return missing
def _calculate_summary(data: dict[str, Any]) -> dict[str, Any]:
"""Calculate overall summary statistics."""
total_cipher_suites = 0
compliant_cipher_suites = 0
total_groups = 0
compliant_groups = 0
critical_vulnerabilities = 0
ports_with_tls = 0
ports_without_tls = 0
for port_data in data["ports_data"].values():
# Check if port has TLS support
has_tls = (
port_data.get("cipher_suites")
or port_data.get("supported_groups")
or port_data.get("certificates")
or port_data.get("tls_version")
)
if has_tls:
ports_with_tls += 1
compliance = port_data.get("compliance", {})
total_cipher_suites += compliance.get("cipher_suites_checked", 0)
compliant_cipher_suites += compliance.get("cipher_suites_passed", 0)
total_groups += compliance.get("groups_checked", 0)
compliant_groups += compliance.get("groups_passed", 0)
for vuln in port_data.get("vulnerabilities", []):
if vuln.get("vulnerable"):
critical_vulnerabilities += 1
else:
ports_without_tls += 1
cipher_suite_percentage = (
(compliant_cipher_suites / total_cipher_suites * 100)
if total_cipher_suites > 0
else 0
)
group_percentage = (compliant_groups / total_groups * 100) if total_groups > 0 else 0
return {
"total_ports": len(data["ports_data"]),
"successful_ports": ports_with_tls,
"ports_without_tls": ports_without_tls,
"total_cipher_suites": total_cipher_suites,
"compliant_cipher_suites": compliant_cipher_suites,
"cipher_suite_percentage": f"{cipher_suite_percentage:.1f}",
"total_groups": total_groups,
"compliant_groups": compliant_groups,
"group_percentage": f"{group_percentage:.1f}",
"critical_vulnerabilities": critical_vulnerabilities,
}
def _generate_recommendations(data: dict[str, Any]) -> list[dict[str, str]]:
"""Generate recommendations based on scan results."""
recommendations = []
# Check for vulnerabilities
for port_data in data["ports_data"].values():
for vuln in port_data.get("vulnerabilities", []):
if vuln.get("vulnerable"):
recommendations.append(
{
"severity": "CRITICAL",
"message": f"Port {port_data['port']}: {vuln['type']} vulnerability found. Immediate update required.",
},
)
# Check for low compliance
summary = data.get("summary", {})
cipher_percentage = float(summary.get("cipher_suite_percentage", 0))
if cipher_percentage < COMPLIANCE_WARNING_THRESHOLD:
recommendations.append(
{
"severity": "WARNING",
"message": f"Only {cipher_percentage:.1f}% of cipher suites are compliant. Disable insecure cipher suites.",
},
)
# Check for deprecated TLS versions
for port_data in data["ports_data"].values():
for tls_version in port_data.get("cipher_suites", {}).keys():
if tls_version in ["ssl_3.0", "1.0", "1.1"]:
if port_data["cipher_suites"][tls_version]["accepted"]:
recommendations.append(
{
"severity": "WARNING",
"message": f"Port {port_data['port']}: Deprecated TLS version {tls_version} is supported. Disable TLS 1.0 and 1.1.",
},
)
return recommendations