feat: initial release

This commit is contained in:
Heiko
2025-12-18 19:16:04 +01:00
commit f038d6a3fc
38 changed files with 6765 additions and 0 deletions

View File

@@ -0,0 +1,50 @@
"""Report generation module for scan results."""
from .csv_export import generate_csv_reports
from .markdown_export import generate_markdown_report
from .query import get_scan_data, get_scan_metadata, list_scans
from .rst_export import generate_rest_report
__all__ = [
"generate_csv_reports",
"generate_markdown_report",
"generate_report",
"generate_rest_report",
"get_scan_data",
"get_scan_metadata",
"list_scans",
]
def generate_report(
db_path: str,
scan_id: int,
report_type: str,
output: str = None,
output_dir: str = ".",
) -> list[str]:
"""Generate report for scan.
Args:
db_path: Path to database file
scan_id: Scan ID
report_type: Report type ('csv', 'markdown', or 'rest')
output: Output file for markdown/rest (auto-generated if None)
output_dir: Output directory for CSV/reST files
Returns:
List of generated file paths
Raises:
ValueError: If report type is unknown
"""
if report_type == "markdown":
file_path = generate_markdown_report(db_path, scan_id, output)
return [file_path]
if report_type == "csv":
return generate_csv_reports(db_path, scan_id, output_dir)
if report_type in ("rest", "rst"):
file_path = generate_rest_report(db_path, scan_id, output, output_dir)
return [file_path]
raise ValueError(f"Unknown report type: {report_type}")

View File

@@ -0,0 +1,536 @@
"""CSV report generation with granular file structure for reST integration."""
import csv
import json
import sqlite3
from pathlib import Path
from typing import Any
from .query import get_scan_data
def _get_headers(db_path: str, export_type: str) -> list[str]:
"""Get CSV headers from database.
Args:
db_path: Path to database file
export_type: Type of export (e.g. 'cipher_suites_accepted')
Returns:
List of column headers
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute(
"SELECT headers FROM csv_export_metadata WHERE export_type = ?",
(export_type,),
)
row = cursor.fetchone()
conn.close()
if row:
return json.loads(row[0])
raise ValueError(f"No headers found for export_type: {export_type}")
def _format_bool(
value: bool | None,
true_val: str = "Yes",
false_val: str = "No",
none_val: str = "-",
) -> str:
"""Format boolean value to string representation.
Args:
value: Boolean value to format
true_val: String representation for True
false_val: String representation for False
none_val: String representation for None
Returns:
Formatted string
"""
if value is True:
return true_val
if value is False:
return false_val
return none_val
def _write_csv(filepath: Path, headers: list[str], rows: list[list[Any]]) -> None:
"""Write data to CSV file.
Args:
filepath: Path to CSV file
headers: List of column headers
rows: List of data rows
"""
with filepath.open("w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerow(headers)
writer.writerows(rows)
def _export_summary(
output_dir: Path,
summary: dict[str, Any],
db_path: str,
) -> list[str]:
"""Export summary statistics to CSV.
Args:
output_dir: Output directory path
summary: Summary data dictionary
Returns:
List of generated file paths
"""
summary_file = output_dir / "summary.csv"
rows = [
["Scanned Ports", summary.get("total_ports", 0)],
["Ports with TLS Support", summary.get("successful_ports", 0)],
["Cipher Suites Checked", summary.get("total_cipher_suites", 0)],
[
"Cipher Suites Compliant",
(
f"{summary.get('compliant_cipher_suites', 0)} "
f"({summary.get('cipher_suite_percentage', 0)}%)"
),
],
["Supported Groups Checked", summary.get("total_groups", 0)],
[
"Supported Groups Compliant",
(
f"{summary.get('compliant_groups', 0)} "
f"({summary.get('group_percentage', 0)}%)"
),
],
[
"Critical Vulnerabilities",
summary.get("critical_vulnerabilities", 0),
],
]
headers = _get_headers(db_path, "summary")
_write_csv(summary_file, headers, rows)
return [str(summary_file)]
def _export_cipher_suites(
output_dir: Path,
port: int,
cipher_suites: dict[str, dict[str, list]],
db_path: str,
) -> list[str]:
"""Export cipher suites to CSV files.
Args:
output_dir: Output directory path
port: Port number
cipher_suites: Cipher suites data per TLS version
Returns:
List of generated file paths
"""
generated = []
for tls_version, suites in cipher_suites.items():
if suites.get("accepted"):
filepath = output_dir / f"{port}_cipher_suites_{tls_version}_accepted.csv"
rows = [
[
suite["name"],
suite.get("iana_recommended", "-"),
_format_bool(suite.get("bsi_approved")),
suite.get("bsi_valid_until", "-"),
_format_bool(suite.get("compliant")),
]
for suite in suites["accepted"]
]
headers = _get_headers(db_path, "cipher_suites_accepted")
_write_csv(filepath, headers, rows)
generated.append(str(filepath))
if suites.get("rejected"):
filepath = output_dir / f"{port}_cipher_suites_{tls_version}_rejected.csv"
rows = [
[
suite["name"],
suite.get("iana_recommended", "-"),
_format_bool(suite.get("bsi_approved")),
suite.get("bsi_valid_until", "-"),
]
for suite in suites["rejected"]
]
headers = _get_headers(db_path, "cipher_suites_rejected")
_write_csv(filepath, headers, rows)
generated.append(str(filepath))
return generated
def _export_supported_groups(
output_dir: Path,
port: int,
groups: list[dict[str, Any]],
db_path: str,
) -> list[str]:
"""Export supported groups to CSV.
Args:
output_dir: Output directory path
port: Port number
groups: List of supported groups
Returns:
List of generated file paths
"""
filepath = output_dir / f"{port}_supported_groups.csv"
rows = [
[
group["name"],
group.get("iana_recommended", "-"),
_format_bool(group.get("bsi_approved")),
group.get("bsi_valid_until", "-"),
_format_bool(group.get("compliant")),
]
for group in groups
]
headers = _get_headers(db_path, "supported_groups")
_write_csv(filepath, headers, rows)
return [str(filepath)]
def _export_missing_groups(
output_dir: Path,
port: int,
missing: dict[str, list[dict[str, Any]]],
db_path: str,
) -> list[str]:
"""Export missing recommended groups to CSV.
Args:
output_dir: Output directory path
port: Port number
missing: Dictionary with bsi_approved and iana_recommended groups
Returns:
List of generated file paths
"""
generated = []
if missing.get("bsi_approved"):
filepath = output_dir / f"{port}_missing_groups_bsi.csv"
rows = [
[
group["name"],
", ".join(group.get("tls_versions", [])),
group.get("valid_until", "-"),
]
for group in missing["bsi_approved"]
]
headers = _get_headers(db_path, "missing_groups_bsi")
_write_csv(filepath, headers, rows)
generated.append(str(filepath))
if missing.get("iana_recommended"):
filepath = output_dir / f"{port}_missing_groups_iana.csv"
rows = [
[group["name"], group.get("iana_value", "-")]
for group in missing["iana_recommended"]
]
headers = _get_headers(db_path, "missing_groups_iana")
_write_csv(filepath, headers, rows)
generated.append(str(filepath))
return generated
def _export_certificates(
output_dir: Path,
port: int,
certificates: list[dict[str, Any]],
db_path: str,
) -> list[str]:
"""Export certificates to CSV.
Args:
output_dir: Output directory path
port: Port number
certificates: List of certificate data
Returns:
List of generated file paths
"""
filepath = output_dir / f"{port}_certificates.csv"
rows = [
[
cert["position"],
cert["subject"],
cert["issuer"],
cert["not_before"],
cert["not_after"],
cert["key_type"],
cert["key_bits"],
_format_bool(cert.get("compliant")),
]
for cert in certificates
]
headers = _get_headers(db_path, "certificates")
_write_csv(filepath, headers, rows)
return [str(filepath)]
def _export_vulnerabilities(
output_dir: Path,
port: int,
vulnerabilities: list[dict[str, Any]],
db_path: str,
) -> list[str]:
"""Export vulnerabilities to CSV.
Args:
output_dir: Output directory path
port: Port number
vulnerabilities: List of vulnerability data
Returns:
List of generated file paths
"""
filepath = output_dir / f"{port}_vulnerabilities.csv"
rows = [
[
vuln["type"],
_format_bool(vuln["vulnerable"]),
vuln.get("details", "-"),
]
for vuln in vulnerabilities
]
headers = _get_headers(db_path, "vulnerabilities")
_write_csv(filepath, headers, rows)
return [str(filepath)]
def _export_protocol_features(
output_dir: Path,
port: int,
features: list[dict[str, Any]],
db_path: str,
) -> list[str]:
"""Export protocol features to CSV.
Args:
output_dir: Output directory path
port: Port number
features: List of protocol feature data
Returns:
List of generated file paths
"""
filepath = output_dir / f"{port}_protocol_features.csv"
rows = [
[
feature["name"],
_format_bool(feature["supported"]),
feature.get("details", "-"),
]
for feature in features
]
headers = _get_headers(db_path, "protocol_features")
_write_csv(filepath, headers, rows)
return [str(filepath)]
def _export_session_features(
output_dir: Path,
port: int,
features: list[dict[str, Any]],
db_path: str,
) -> list[str]:
"""Export session features to CSV.
Args:
output_dir: Output directory path
port: Port number
features: List of session feature data
Returns:
List of generated file paths
"""
filepath = output_dir / f"{port}_session_features.csv"
rows = [
[
feature["type"],
_format_bool(feature.get("client_initiated")),
_format_bool(feature.get("secure")),
_format_bool(feature.get("session_id_supported")),
_format_bool(feature.get("ticket_supported")),
feature.get("details", "-"),
]
for feature in features
]
headers = _get_headers(db_path, "session_features")
_write_csv(filepath, headers, rows)
return [str(filepath)]
def _export_http_headers(
output_dir: Path,
port: int,
headers: list[dict[str, Any]],
db_path: str,
) -> list[str]:
"""Export HTTP headers to CSV.
Args:
output_dir: Output directory path
port: Port number
headers: List of HTTP header data
Returns:
List of generated file paths
"""
filepath = output_dir / f"{port}_http_headers.csv"
rows = [
[
header["name"],
_format_bool(header["is_present"]),
header.get("value", "-"),
]
for header in headers
]
csv_headers = _get_headers(db_path, "http_headers")
_write_csv(filepath, csv_headers, rows)
return [str(filepath)]
def _export_compliance_status(
output_dir: Path,
port: int,
compliance: dict[str, Any],
db_path: str,
) -> list[str]:
"""Export compliance status to CSV.
Args:
output_dir: Output directory path
port: Port number
compliance: Compliance data dictionary
Returns:
List of generated file paths
"""
filepath = output_dir / f"{port}_compliance_status.csv"
rows = []
if "cipher_suites_checked" in compliance:
rows.append(
[
"Cipher Suites",
compliance["cipher_suites_checked"],
compliance["cipher_suites_passed"],
f"{compliance['cipher_suite_percentage']}%",
],
)
if "groups_checked" in compliance:
rows.append(
[
"Supported Groups",
compliance["groups_checked"],
compliance["groups_passed"],
f"{compliance['group_percentage']}%",
],
)
if rows:
headers = _get_headers(db_path, "compliance_status")
_write_csv(filepath, headers, rows)
return [str(filepath)]
return []
def _has_tls_support(port_data: dict[str, Any]) -> bool:
"""Check if port has TLS support.
Args:
port_data: Port data dictionary
Returns:
True if port has TLS support
"""
return bool(
port_data.get("cipher_suites")
or port_data.get("supported_groups")
or port_data.get("certificates")
or port_data.get("tls_version"),
)
# Export handlers mapping: (data_key, handler_function)
EXPORT_HANDLERS = (
("cipher_suites", _export_cipher_suites),
("supported_groups", _export_supported_groups),
("missing_recommended_groups", _export_missing_groups),
("certificates", _export_certificates),
("vulnerabilities", _export_vulnerabilities),
("protocol_features", _export_protocol_features),
("session_features", _export_session_features),
("http_headers", _export_http_headers),
("compliance", _export_compliance_status),
)
def generate_csv_reports(
db_path: str,
scan_id: int,
output_dir: str = ".",
) -> list[str]:
"""Generate granular CSV reports for scan.
Args:
db_path: Path to database file
scan_id: Scan ID
output_dir: Output directory for CSV files
Returns:
List of generated file paths
"""
data = get_scan_data(db_path, scan_id)
output_dir_path = Path(output_dir)
output_dir_path.mkdir(parents=True, exist_ok=True)
generated_files = []
generated_files.extend(
_export_summary(output_dir_path, data.get("summary", {}), db_path),
)
for port_data in data["ports_data"].values():
if not _has_tls_support(port_data):
continue
port = port_data["port"]
for data_key, handler_func in EXPORT_HANDLERS:
if port_data.get(data_key):
generated_files.extend(
handler_func(output_dir_path, port, port_data[data_key], db_path),
)
return generated_files

View File

@@ -0,0 +1,37 @@
"""Markdown report generation using shared template utilities."""
from .query import _generate_recommendations, get_scan_data
from .template_utils import (
build_template_context,
generate_report_id,
prepare_output_path,
render_template_to_file,
)
def generate_markdown_report(
db_path: str, scan_id: int, output_file: str | None = None,
) -> str:
"""Generate markdown report for scan.
Args:
db_path: Path to database file
scan_id: Scan ID
output_file: Optional output file path (auto-generated if None)
Returns:
Path to generated report file
"""
data = get_scan_data(db_path, scan_id)
metadata = data["metadata"]
report_id = generate_report_id(metadata)
context = build_template_context(data)
context["recommendations"] = _generate_recommendations(data)
default_filename = f"compliance_report_{report_id}.md"
output_path = prepare_output_path(output_file, ".", default_filename)
return render_template_to_file("report.md.j2", context, output_path)

View File

@@ -0,0 +1,534 @@
"""Report generation module for scan results."""
import sqlite3
from typing import Any
# Compliance thresholds
COMPLIANCE_WARNING_THRESHOLD = 50.0
def list_scans(db_path: str) -> list[dict[str, Any]]:
"""List all available scans in the database.
Args:
db_path: Path to database file
Returns:
List of scan dictionaries with metadata
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute(
"""
SELECT scan_id, timestamp, hostname, ports, scan_duration_seconds
FROM scans
ORDER BY scan_id DESC
""",
)
scans = []
for row in cursor.fetchall():
scans.append(
{
"scan_id": row[0],
"timestamp": row[1],
"hostname": row[2],
"ports": row[3],
"duration": row[4],
},
)
conn.close()
return scans
def get_scan_metadata(db_path: str, scan_id: int) -> dict[str, Any] | None:
"""Get metadata for a specific scan.
Args:
db_path: Path to database file
scan_id: Scan ID
Returns:
Dictionary with scan metadata or None if not found
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute(
"""
SELECT s.scan_id, s.timestamp, s.hostname, s.ports, s.scan_duration_seconds,
h.fqdn, h.ipv4, h.ipv6
FROM scans s
LEFT JOIN scanned_hosts h ON s.scan_id = h.scan_id
WHERE s.scan_id = ?
""",
(scan_id,),
)
row = cursor.fetchone()
conn.close()
if not row:
return None
return {
"scan_id": row[0],
"timestamp": row[1],
"hostname": row[2],
"ports": row[3].split(",") if row[3] else [],
"duration": row[4],
"fqdn": row[5] or row[2],
"ipv4": row[6],
"ipv6": row[7],
}
def get_scan_data(db_path: str, scan_id: int) -> dict[str, Any]:
"""Get all scan data for report generation.
Args:
db_path: Path to database file
scan_id: Scan ID
Returns:
Dictionary with all scan data
"""
metadata = get_scan_metadata(db_path, scan_id)
if not metadata:
raise ValueError(f"Scan ID {scan_id} not found")
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
data = {
"metadata": metadata,
"ports_data": {},
}
# Get data for each port
for port in metadata["ports"]:
port_num = int(port)
port_data = {
"port": port_num,
"status": "completed",
"tls_version": None,
"cipher_suites": {},
"supported_groups": [],
"certificates": [],
"vulnerabilities": [],
"protocol_features": [],
"session_features": [],
"http_headers": [],
"compliance": {},
}
# Cipher suites using view
cursor.execute(
"""
SELECT tls_version, cipher_suite_name, accepted, iana_value, key_size, is_anonymous,
iana_recommended_final, bsi_approved_final, bsi_valid_until_final, compliant
FROM v_cipher_suites_with_compliance
WHERE scan_id = ? AND port = ?
ORDER BY tls_version, accepted DESC, cipher_suite_name
""",
(scan_id, port_num),
)
rejected_counts = {}
for row in cursor.fetchall():
tls_version = row[0]
if tls_version not in port_data["cipher_suites"]:
port_data["cipher_suites"][tls_version] = {
"accepted": [],
"rejected": [],
}
rejected_counts[tls_version] = 0
suite = {
"name": row[1],
"accepted": row[2],
"iana_value": row[3],
"key_size": row[4],
"is_anonymous": row[5],
}
if row[2]: # accepted
suite["iana_recommended"] = row[6]
suite["bsi_approved"] = row[7]
suite["bsi_valid_until"] = row[8]
suite["compliant"] = row[9]
port_data["cipher_suites"][tls_version]["accepted"].append(suite)
else: # rejected
rejected_counts[tls_version] += 1
# Only include rejected if BSI-approved OR IANA-recommended
if row[7] or row[6] == "Y":
suite["iana_recommended"] = row[6]
suite["bsi_approved"] = row[7]
suite["bsi_valid_until"] = row[8]
suite["compliant"] = False
port_data["cipher_suites"][tls_version]["rejected"].append(suite)
# Store rejected counts
for tls_version in port_data["cipher_suites"]:
port_data["cipher_suites"][tls_version]["rejected_total"] = (
rejected_counts.get(tls_version, 0)
)
# Determine highest TLS version
if port_data["cipher_suites"]:
tls_versions = list(port_data["cipher_suites"].keys())
version_order = ["ssl_3.0", "1.0", "1.1", "1.2", "1.3"]
for version in reversed(version_order):
if version in tls_versions:
port_data["tls_version"] = version
break
# Supported groups using view
cursor.execute(
"""
SELECT group_name, iana_value, openssl_nid,
iana_recommended, bsi_approved, bsi_valid_until, compliant
FROM v_supported_groups_with_compliance
WHERE scan_id = ? AND port = ?
ORDER BY group_name
""",
(scan_id, port_num),
)
for row in cursor.fetchall():
port_data["supported_groups"].append(
{
"name": row[0],
"iana_value": row[1],
"openssl_nid": row[2],
"iana_recommended": row[3],
"bsi_approved": row[4],
"bsi_valid_until": row[5],
"compliant": row[6],
},
)
# Certificates using view
cursor.execute(
"""
SELECT position, subject, issuer, serial_number, not_before, not_after,
key_type, key_bits, signature_algorithm, fingerprint_sha256,
compliant, compliance_details
FROM v_certificates_with_compliance
WHERE scan_id = ? AND port = ?
ORDER BY position
""",
(scan_id, port_num),
)
for row in cursor.fetchall():
port_data["certificates"].append(
{
"position": row[0],
"subject": row[1],
"issuer": row[2],
"serial_number": row[3],
"not_before": row[4],
"not_after": row[5],
"key_type": row[6],
"key_bits": row[7],
"signature_algorithm": row[8],
"fingerprint_sha256": row[9],
"compliant": row[10] if row[10] is not None else None,
"compliance_details": row[11] if row[11] else None,
},
)
# Vulnerabilities
cursor.execute(
"""
SELECT vuln_type, vulnerable, details
FROM scan_vulnerabilities
WHERE scan_id = ? AND port = ?
ORDER BY vuln_type
""",
(scan_id, port_num),
)
for row in cursor.fetchall():
port_data["vulnerabilities"].append(
{
"type": row[0],
"vulnerable": row[1],
"details": row[2],
},
)
# Protocol features
cursor.execute(
"""
SELECT feature_type, supported, details
FROM scan_protocol_features
WHERE scan_id = ? AND port = ?
ORDER BY feature_type
""",
(scan_id, port_num),
)
for row in cursor.fetchall():
port_data["protocol_features"].append(
{
"name": row[0],
"supported": row[1],
"details": row[2],
},
)
# Session features
cursor.execute(
"""
SELECT feature_type, client_initiated, secure, session_id_supported,
ticket_supported, attempted_resumptions, successful_resumptions, details
FROM scan_session_features
WHERE scan_id = ? AND port = ?
ORDER BY feature_type
""",
(scan_id, port_num),
)
for row in cursor.fetchall():
port_data["session_features"].append(
{
"type": row[0],
"client_initiated": row[1],
"secure": row[2],
"session_id_supported": row[3],
"ticket_supported": row[4],
"attempted_resumptions": row[5],
"successful_resumptions": row[6],
"details": row[7],
},
)
# HTTP headers
cursor.execute(
"""
SELECT header_name, header_value, is_present
FROM scan_http_headers
WHERE scan_id = ? AND port = ?
ORDER BY header_name
""",
(scan_id, port_num),
)
for row in cursor.fetchall():
port_data["http_headers"].append(
{
"name": row[0],
"value": row[1],
"is_present": row[2],
},
)
# Compliance summary using view
cursor.execute(
"""
SELECT check_type, total, passed, percentage
FROM v_port_compliance_summary
WHERE scan_id = ? AND port = ?
""",
(scan_id, port_num),
)
for row in cursor.fetchall():
check_type = row[0]
total = row[1]
passed = row[2]
percentage = row[3]
if check_type == "cipher_suite":
port_data["compliance"]["cipher_suites_checked"] = total
port_data["compliance"]["cipher_suites_passed"] = passed
port_data["compliance"]["cipher_suite_percentage"] = f"{percentage:.1f}"
elif check_type == "supported_group":
port_data["compliance"]["groups_checked"] = total
port_data["compliance"]["groups_passed"] = passed
port_data["compliance"]["group_percentage"] = f"{percentage:.1f}"
# Get missing recommended groups for this port
port_data["missing_recommended_groups"] = _get_missing_recommended_groups(
cursor,
scan_id,
port_num,
)
data["ports_data"][port_num] = port_data
conn.close()
# Calculate overall summary
data["summary"] = _calculate_summary(data)
return data
def _get_missing_recommended_groups(
cursor: sqlite3.Cursor,
scan_id: int,
port: int,
) -> dict[str, list[dict[str, Any]]]:
"""Get recommended groups that are not offered by the server using views.
Args:
cursor: Database cursor
scan_id: Scan ID
port: Port number
Returns:
Dictionary with 'bsi_approved' and 'iana_recommended' lists
"""
missing = {"bsi_approved": [], "iana_recommended": []}
# Get missing BSI-approved groups using view
cursor.execute(
"""
SELECT group_name, tls_version, valid_until
FROM v_missing_bsi_groups
WHERE scan_id = ?
ORDER BY group_name, tls_version
""",
(scan_id,),
)
bsi_groups = {}
for row in cursor.fetchall():
group_name = row[0]
tls_version = row[1]
valid_until = row[2]
if group_name not in bsi_groups:
bsi_groups[group_name] = {
"name": group_name,
"tls_versions": [],
"valid_until": valid_until,
}
bsi_groups[group_name]["tls_versions"].append(tls_version)
missing["bsi_approved"] = list(bsi_groups.values())
# Get missing IANA-recommended groups using view
cursor.execute(
"""
SELECT group_name, iana_value
FROM v_missing_iana_groups
WHERE scan_id = ?
ORDER BY CAST(iana_value AS INTEGER)
""",
(scan_id,),
)
for row in cursor.fetchall():
missing["iana_recommended"].append(
{
"name": row[0],
"iana_value": row[1],
},
)
return missing
def _calculate_summary(data: dict[str, Any]) -> dict[str, Any]:
"""Calculate overall summary statistics."""
total_cipher_suites = 0
compliant_cipher_suites = 0
total_groups = 0
compliant_groups = 0
critical_vulnerabilities = 0
ports_with_tls = 0
ports_without_tls = 0
for port_data in data["ports_data"].values():
# Check if port has TLS support
has_tls = (
port_data.get("cipher_suites")
or port_data.get("supported_groups")
or port_data.get("certificates")
or port_data.get("tls_version")
)
if has_tls:
ports_with_tls += 1
compliance = port_data.get("compliance", {})
total_cipher_suites += compliance.get("cipher_suites_checked", 0)
compliant_cipher_suites += compliance.get("cipher_suites_passed", 0)
total_groups += compliance.get("groups_checked", 0)
compliant_groups += compliance.get("groups_passed", 0)
for vuln in port_data.get("vulnerabilities", []):
if vuln.get("vulnerable"):
critical_vulnerabilities += 1
else:
ports_without_tls += 1
cipher_suite_percentage = (
(compliant_cipher_suites / total_cipher_suites * 100)
if total_cipher_suites > 0
else 0
)
group_percentage = (compliant_groups / total_groups * 100) if total_groups > 0 else 0
return {
"total_ports": len(data["ports_data"]),
"successful_ports": ports_with_tls,
"ports_without_tls": ports_without_tls,
"total_cipher_suites": total_cipher_suites,
"compliant_cipher_suites": compliant_cipher_suites,
"cipher_suite_percentage": f"{cipher_suite_percentage:.1f}",
"total_groups": total_groups,
"compliant_groups": compliant_groups,
"group_percentage": f"{group_percentage:.1f}",
"critical_vulnerabilities": critical_vulnerabilities,
}
def _generate_recommendations(data: dict[str, Any]) -> list[dict[str, str]]:
"""Generate recommendations based on scan results."""
recommendations = []
# Check for vulnerabilities
for port_data in data["ports_data"].values():
for vuln in port_data.get("vulnerabilities", []):
if vuln.get("vulnerable"):
recommendations.append(
{
"severity": "CRITICAL",
"message": f"Port {port_data['port']}: {vuln['type']} vulnerability found. Immediate update required.",
},
)
# Check for low compliance
summary = data.get("summary", {})
cipher_percentage = float(summary.get("cipher_suite_percentage", 0))
if cipher_percentage < COMPLIANCE_WARNING_THRESHOLD:
recommendations.append(
{
"severity": "WARNING",
"message": f"Only {cipher_percentage:.1f}% of cipher suites are compliant. Disable insecure cipher suites.",
},
)
# Check for deprecated TLS versions
for port_data in data["ports_data"].values():
for tls_version in port_data.get("cipher_suites", {}).keys():
if tls_version in ["ssl_3.0", "1.0", "1.1"]:
if port_data["cipher_suites"][tls_version]["accepted"]:
recommendations.append(
{
"severity": "WARNING",
"message": f"Port {port_data['port']}: Deprecated TLS version {tls_version} is supported. Disable TLS 1.0 and 1.1.",
},
)
return recommendations

View File

@@ -0,0 +1,39 @@
"""reStructuredText report generation with CSV includes using shared utilities."""
from .csv_export import generate_csv_reports
from .query import get_scan_data
from .template_utils import (
build_template_context,
prepare_output_path,
render_template_to_file,
)
def generate_rest_report(
db_path: str, scan_id: int, output_file: str | None = None, output_dir: str = ".",
) -> str:
"""Generate reStructuredText report with CSV includes.
Args:
db_path: Path to database file
scan_id: Scan ID
output_file: Output file path (optional)
output_dir: Output directory for report and CSV files
Returns:
Path to generated report file
"""
data = get_scan_data(db_path, scan_id)
# Generate CSV files first
generate_csv_reports(db_path, scan_id, output_dir)
# Build template context
context = build_template_context(data)
# Prepare output path - always use fixed filename
default_filename = "compliance_report.rst"
output_path = prepare_output_path(output_file, output_dir, default_filename)
return render_template_to_file("report.reST.j2", context, output_path)

View File

@@ -0,0 +1,168 @@
"""Shared utilities for report template rendering."""
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from jinja2 import Environment, FileSystemLoader, select_autoescape
def format_tls_version(version: str) -> str:
"""Format TLS version string for display.
Args:
version: TLS version identifier (e.g., "1.2", "ssl_3.0")
Returns:
Formatted version string (e.g., "TLS 1.2", "SSL 3.0")
"""
version_map = {
"ssl_3.0": "SSL 3.0",
"1.0": "TLS 1.0",
"1.1": "TLS 1.1",
"1.2": "TLS 1.2",
"1.3": "TLS 1.3",
}
return version_map.get(version, version)
def create_jinja_env() -> Environment:
"""Create Jinja2 environment with standard configuration.
Returns:
Configured Jinja2 Environment with custom filters
"""
template_dir = Path(__file__).parent.parent / "templates"
env = Environment(
loader=FileSystemLoader(str(template_dir)),
autoescape=select_autoescape(["html", "xml"]),
trim_blocks=True,
lstrip_blocks=True,
)
env.filters["format_tls_version"] = format_tls_version
return env
def generate_report_id(metadata: dict[str, Any]) -> str:
"""Generate report ID from scan metadata.
Args:
metadata: Scan metadata dictionary containing timestamp
Returns:
Report ID in format YYYYMMDD_<scanid>
"""
try:
dt = datetime.fromisoformat(metadata["timestamp"])
date_str = dt.strftime("%Y%m%d")
except (ValueError, KeyError):
date_str = datetime.now(timezone.utc).strftime("%Y%m%d")
return f"{date_str}_{metadata['scan_id']}"
def build_template_context(data: dict[str, Any]) -> dict[str, Any]:
"""Build template context from scan data.
Args:
data: Scan data dictionary from get_scan_data()
Returns:
Dictionary with template context variables
"""
metadata = data["metadata"]
duration = metadata.get("duration")
if duration is not None:
duration_str = (
f"{duration:.2f}" if isinstance(duration, (int, float)) else str(duration)
)
else:
duration_str = "N/A"
# Format timestamp to minute precision (DD.MM.YYYY HH:MM)
timestamp_str = metadata["timestamp"]
try:
dt = datetime.fromisoformat(timestamp_str)
timestamp_str = dt.strftime("%d.%m.%Y %H:%M")
except (ValueError, KeyError):
pass
# Filter ports with TLS support for port sections
ports_with_tls = []
for port_data in data["ports_data"].values():
has_tls = (
port_data.get("cipher_suites")
or port_data.get("supported_groups")
or port_data.get("certificates")
or port_data.get("tls_version")
)
if has_tls:
ports_with_tls.append(port_data)
return {
"scan_id": metadata["scan_id"],
"hostname": metadata["hostname"],
"fqdn": metadata["fqdn"],
"ipv4": metadata["ipv4"],
"ipv6": metadata["ipv6"],
"timestamp": timestamp_str,
"duration": duration_str,
"ports": ", ".join(metadata["ports"]),
"ports_without_tls": data.get("summary", {}).get("ports_without_tls", 0),
"summary": data.get("summary", {}),
"ports_data": sorted(ports_with_tls, key=lambda x: x["port"]),
}
def prepare_output_path(
output_file: str | None,
output_dir: str,
default_filename: str,
) -> Path:
"""Prepare output file path and ensure parent directory exists.
Args:
output_file: Explicit output file path (optional)
output_dir: Output directory for auto-generated files
default_filename: Default filename if output_file is None
Returns:
Path object for output file
"""
if output_file:
output_path = Path(output_file)
else:
output_path = Path(output_dir) / default_filename
output_path.parent.mkdir(parents=True, exist_ok=True)
return output_path
def render_template_to_file(
template_name: str,
context: dict[str, Any],
output_path: Path,
) -> str:
"""Render Jinja2 template and write to file.
Args:
template_name: Name of template file
context: Template context variables
output_path: Output file path
Returns:
String path of generated file
"""
env = create_jinja_env()
template = env.get_template(template_name)
content = template.render(**context)
output_path.write_text(content, encoding="utf-8")
return str(output_path)