feature: IANA update

This commit is contained in:
Heiko
2025-12-19 20:10:39 +01:00
parent f038d6a3fc
commit 753c582010
27 changed files with 1923 additions and 419 deletions

View File

@@ -1,95 +1,26 @@
"""CSV report generation with granular file structure for reST integration."""
import csv
import json
import sqlite3
from pathlib import Path
from typing import Any
from .query import get_scan_data
def _get_headers(db_path: str, export_type: str) -> list[str]:
"""Get CSV headers from database.
Args:
db_path: Path to database file
export_type: Type of export (e.g. 'cipher_suites_accepted')
Returns:
List of column headers
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute(
"SELECT headers FROM csv_export_metadata WHERE export_type = ?",
(export_type,),
)
row = cursor.fetchone()
conn.close()
if row:
return json.loads(row[0])
raise ValueError(f"No headers found for export_type: {export_type}")
def _format_bool(
value: bool | None,
true_val: str = "Yes",
false_val: str = "No",
none_val: str = "-",
) -> str:
"""Format boolean value to string representation.
Args:
value: Boolean value to format
true_val: String representation for True
false_val: String representation for False
none_val: String representation for None
Returns:
Formatted string
"""
if value is True:
return true_val
if value is False:
return false_val
return none_val
def _write_csv(filepath: Path, headers: list[str], rows: list[list[Any]]) -> None:
"""Write data to CSV file.
Args:
filepath: Path to CSV file
headers: List of column headers
rows: List of data rows
"""
with filepath.open("w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerow(headers)
writer.writerows(rows)
from .csv_utils import CSVExporter, format_bool
from .query import get_scan_data, has_tls_support
def _export_summary(
output_dir: Path,
exporter: CSVExporter,
summary: dict[str, Any],
db_path: str,
) -> list[str]:
"""Export summary statistics to CSV.
Args:
output_dir: Output directory path
exporter: CSVExporter instance
summary: Summary data dictionary
Returns:
List of generated file paths
"""
summary_file = output_dir / "summary.csv"
rows = [
["Scanned Ports", summary.get("total_ports", 0)],
["Ports with TLS Support", summary.get("successful_ports", 0)],
@@ -114,21 +45,19 @@ def _export_summary(
summary.get("critical_vulnerabilities", 0),
],
]
headers = _get_headers(db_path, "summary")
_write_csv(summary_file, headers, rows)
return [str(summary_file)]
filepath = exporter.write_csv("summary.csv", "summary", rows)
return [filepath]
def _export_cipher_suites(
output_dir: Path,
exporter: CSVExporter,
port: int,
cipher_suites: dict[str, dict[str, list]],
db_path: str,
) -> list[str]:
"""Export cipher suites to CSV files.
Args:
output_dir: Output directory path
exporter: CSVExporter instance
port: Port number
cipher_suites: Cipher suites data per TLS version
@@ -140,49 +69,46 @@ def _export_cipher_suites(
for tls_version, suites in cipher_suites.items():
if suites.get("accepted"):
filepath = output_dir / f"{port}_cipher_suites_{tls_version}_accepted.csv"
rows = [
[
suite["name"],
suite.get("iana_recommended", "-"),
_format_bool(suite.get("bsi_approved")),
format_bool(suite.get("bsi_approved")),
suite.get("bsi_valid_until", "-"),
_format_bool(suite.get("compliant")),
format_bool(suite.get("compliant")),
]
for suite in suites["accepted"]
]
headers = _get_headers(db_path, "cipher_suites_accepted")
_write_csv(filepath, headers, rows)
generated.append(str(filepath))
filename = f"{port}_cipher_suites_{tls_version}_accepted.csv"
filepath = exporter.write_csv(filename, "cipher_suites_accepted", rows)
generated.append(filepath)
if suites.get("rejected"):
filepath = output_dir / f"{port}_cipher_suites_{tls_version}_rejected.csv"
rows = [
[
suite["name"],
suite.get("iana_recommended", "-"),
_format_bool(suite.get("bsi_approved")),
format_bool(suite.get("bsi_approved")),
suite.get("bsi_valid_until", "-"),
]
for suite in suites["rejected"]
]
headers = _get_headers(db_path, "cipher_suites_rejected")
_write_csv(filepath, headers, rows)
generated.append(str(filepath))
filename = f"{port}_cipher_suites_{tls_version}_rejected.csv"
filepath = exporter.write_csv(filename, "cipher_suites_rejected", rows)
generated.append(filepath)
return generated
def _export_supported_groups(
output_dir: Path,
exporter: CSVExporter,
port: int,
groups: list[dict[str, Any]],
db_path: str,
) -> list[str]:
"""Export supported groups to CSV.
Args:
output_dir: Output directory path
exporter: CSVExporter instance
port: Port number
groups: List of supported groups
@@ -190,32 +116,30 @@ def _export_supported_groups(
List of generated file paths
"""
filepath = output_dir / f"{port}_supported_groups.csv"
rows = [
[
group["name"],
group.get("iana_recommended", "-"),
_format_bool(group.get("bsi_approved")),
format_bool(group.get("bsi_approved")),
group.get("bsi_valid_until", "-"),
_format_bool(group.get("compliant")),
format_bool(group.get("compliant")),
]
for group in groups
]
headers = _get_headers(db_path, "supported_groups")
_write_csv(filepath, headers, rows)
return [str(filepath)]
filename = f"{port}_supported_groups.csv"
filepath = exporter.write_csv(filename, "supported_groups", rows)
return [filepath]
def _export_missing_groups(
output_dir: Path,
exporter: CSVExporter,
port: int,
missing: dict[str, list[dict[str, Any]]],
db_path: str,
) -> list[str]:
"""Export missing recommended groups to CSV.
Args:
output_dir: Output directory path
exporter: CSVExporter instance
port: Port number
missing: Dictionary with bsi_approved and iana_recommended groups
@@ -226,7 +150,6 @@ def _export_missing_groups(
generated = []
if missing.get("bsi_approved"):
filepath = output_dir / f"{port}_missing_groups_bsi.csv"
rows = [
[
group["name"],
@@ -235,33 +158,31 @@ def _export_missing_groups(
]
for group in missing["bsi_approved"]
]
headers = _get_headers(db_path, "missing_groups_bsi")
_write_csv(filepath, headers, rows)
generated.append(str(filepath))
filename = f"{port}_missing_groups_bsi.csv"
filepath = exporter.write_csv(filename, "missing_groups_bsi", rows)
generated.append(filepath)
if missing.get("iana_recommended"):
filepath = output_dir / f"{port}_missing_groups_iana.csv"
rows = [
[group["name"], group.get("iana_value", "-")]
for group in missing["iana_recommended"]
]
headers = _get_headers(db_path, "missing_groups_iana")
_write_csv(filepath, headers, rows)
generated.append(str(filepath))
filename = f"{port}_missing_groups_iana.csv"
filepath = exporter.write_csv(filename, "missing_groups_iana", rows)
generated.append(filepath)
return generated
def _export_certificates(
output_dir: Path,
exporter: CSVExporter,
port: int,
certificates: list[dict[str, Any]],
db_path: str,
) -> list[str]:
"""Export certificates to CSV.
Args:
output_dir: Output directory path
exporter: CSVExporter instance
port: Port number
certificates: List of certificate data
@@ -269,7 +190,6 @@ def _export_certificates(
List of generated file paths
"""
filepath = output_dir / f"{port}_certificates.csv"
rows = [
[
cert["position"],
@@ -279,25 +199,24 @@ def _export_certificates(
cert["not_after"],
cert["key_type"],
cert["key_bits"],
_format_bool(cert.get("compliant")),
format_bool(cert.get("compliant")),
]
for cert in certificates
]
headers = _get_headers(db_path, "certificates")
_write_csv(filepath, headers, rows)
return [str(filepath)]
filename = f"{port}_certificates.csv"
filepath = exporter.write_csv(filename, "certificates", rows)
return [filepath]
def _export_vulnerabilities(
output_dir: Path,
exporter: CSVExporter,
port: int,
vulnerabilities: list[dict[str, Any]],
db_path: str,
) -> list[str]:
"""Export vulnerabilities to CSV.
Args:
output_dir: Output directory path
exporter: CSVExporter instance
port: Port number
vulnerabilities: List of vulnerability data
@@ -305,30 +224,28 @@ def _export_vulnerabilities(
List of generated file paths
"""
filepath = output_dir / f"{port}_vulnerabilities.csv"
rows = [
[
vuln["type"],
_format_bool(vuln["vulnerable"]),
format_bool(vuln["vulnerable"]),
vuln.get("details", "-"),
]
for vuln in vulnerabilities
]
headers = _get_headers(db_path, "vulnerabilities")
_write_csv(filepath, headers, rows)
return [str(filepath)]
filename = f"{port}_vulnerabilities.csv"
filepath = exporter.write_csv(filename, "vulnerabilities", rows)
return [filepath]
def _export_protocol_features(
output_dir: Path,
exporter: CSVExporter,
port: int,
features: list[dict[str, Any]],
db_path: str,
) -> list[str]:
"""Export protocol features to CSV.
Args:
output_dir: Output directory path
exporter: CSVExporter instance
port: Port number
features: List of protocol feature data
@@ -336,30 +253,28 @@ def _export_protocol_features(
List of generated file paths
"""
filepath = output_dir / f"{port}_protocol_features.csv"
rows = [
[
feature["name"],
_format_bool(feature["supported"]),
format_bool(feature["supported"]),
feature.get("details", "-"),
]
for feature in features
]
headers = _get_headers(db_path, "protocol_features")
_write_csv(filepath, headers, rows)
return [str(filepath)]
filename = f"{port}_protocol_features.csv"
filepath = exporter.write_csv(filename, "protocol_features", rows)
return [filepath]
def _export_session_features(
output_dir: Path,
exporter: CSVExporter,
port: int,
features: list[dict[str, Any]],
db_path: str,
) -> list[str]:
"""Export session features to CSV.
Args:
output_dir: Output directory path
exporter: CSVExporter instance
port: Port number
features: List of session feature data
@@ -367,33 +282,31 @@ def _export_session_features(
List of generated file paths
"""
filepath = output_dir / f"{port}_session_features.csv"
rows = [
[
feature["type"],
_format_bool(feature.get("client_initiated")),
_format_bool(feature.get("secure")),
_format_bool(feature.get("session_id_supported")),
_format_bool(feature.get("ticket_supported")),
format_bool(feature.get("client_initiated")),
format_bool(feature.get("secure")),
format_bool(feature.get("session_id_supported")),
format_bool(feature.get("ticket_supported")),
feature.get("details", "-"),
]
for feature in features
]
headers = _get_headers(db_path, "session_features")
_write_csv(filepath, headers, rows)
return [str(filepath)]
filename = f"{port}_session_features.csv"
filepath = exporter.write_csv(filename, "session_features", rows)
return [filepath]
def _export_http_headers(
output_dir: Path,
exporter: CSVExporter,
port: int,
headers: list[dict[str, Any]],
db_path: str,
) -> list[str]:
"""Export HTTP headers to CSV.
Args:
output_dir: Output directory path
exporter: CSVExporter instance
port: Port number
headers: List of HTTP header data
@@ -401,30 +314,28 @@ def _export_http_headers(
List of generated file paths
"""
filepath = output_dir / f"{port}_http_headers.csv"
rows = [
[
header["name"],
_format_bool(header["is_present"]),
format_bool(header["is_present"]),
header.get("value", "-"),
]
for header in headers
]
csv_headers = _get_headers(db_path, "http_headers")
_write_csv(filepath, csv_headers, rows)
return [str(filepath)]
filename = f"{port}_http_headers.csv"
filepath = exporter.write_csv(filename, "http_headers", rows)
return [filepath]
def _export_compliance_status(
output_dir: Path,
exporter: CSVExporter,
port: int,
compliance: dict[str, Any],
db_path: str,
) -> list[str]:
"""Export compliance status to CSV.
Args:
output_dir: Output directory path
exporter: CSVExporter instance
port: Port number
compliance: Compliance data dictionary
@@ -432,7 +343,6 @@ def _export_compliance_status(
List of generated file paths
"""
filepath = output_dir / f"{port}_compliance_status.csv"
rows = []
if "cipher_suites_checked" in compliance:
@@ -456,32 +366,13 @@ def _export_compliance_status(
)
if rows:
headers = _get_headers(db_path, "compliance_status")
_write_csv(filepath, headers, rows)
return [str(filepath)]
filename = f"{port}_compliance_status.csv"
filepath = exporter.write_csv(filename, "compliance_status", rows)
return [filepath]
return []
def _has_tls_support(port_data: dict[str, Any]) -> bool:
"""Check if port has TLS support.
Args:
port_data: Port data dictionary
Returns:
True if port has TLS support
"""
return bool(
port_data.get("cipher_suites")
or port_data.get("supported_groups")
or port_data.get("certificates")
or port_data.get("tls_version"),
)
# Export handlers mapping: (data_key, handler_function)
EXPORT_HANDLERS = (
("cipher_suites", _export_cipher_suites),
("supported_groups", _export_supported_groups),
@@ -515,22 +406,19 @@ def generate_csv_reports(
output_dir_path = Path(output_dir)
output_dir_path.mkdir(parents=True, exist_ok=True)
exporter = CSVExporter(db_path, output_dir_path)
generated_files = []
generated_files.extend(
_export_summary(output_dir_path, data.get("summary", {}), db_path),
)
generated_files.extend(_export_summary(exporter, data.get("summary", {})))
for port_data in data["ports_data"].values():
if not _has_tls_support(port_data):
if not has_tls_support(port_data):
continue
port = port_data["port"]
for data_key, handler_func in EXPORT_HANDLERS:
if port_data.get(data_key):
generated_files.extend(
handler_func(output_dir_path, port, port_data[data_key], db_path),
)
generated_files.extend(handler_func(exporter, port, port_data[data_key]))
return generated_files

View File

@@ -0,0 +1,102 @@
"""Utilities for CSV export with header caching and path management."""
import csv
import json
import sqlite3
from pathlib import Path
from typing import Any
class CSVExporter:
"""CSV export helper with header caching and path management."""
def __init__(self, db_path: str, output_dir: Path):
"""Initialize CSV exporter.
Args:
db_path: Path to database file
output_dir: Output directory for CSV files
"""
self.db_path = db_path
self.output_dir = output_dir
self._headers_cache: dict[str, list[str]] = {}
def get_headers(self, export_type: str) -> list[str]:
"""Get CSV headers from database with caching.
Args:
export_type: Type of export (e.g. 'cipher_suites_accepted')
Returns:
List of column headers
"""
if export_type not in self._headers_cache:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute(
"SELECT headers FROM csv_export_metadata WHERE export_type = ?",
(export_type,),
)
row = cursor.fetchone()
conn.close()
if row:
self._headers_cache[export_type] = json.loads(row[0])
else:
raise ValueError(f"No headers found for export_type: {export_type}")
return self._headers_cache[export_type]
def write_csv(
self,
filename: str,
export_type: str,
rows: list[list[Any]],
) -> str:
"""Write data to CSV file with headers from metadata.
Args:
filename: CSV filename
export_type: Type of export for header lookup
rows: List of data rows
Returns:
String path to created file
"""
filepath = self.output_dir / filename
headers = self.get_headers(export_type)
with filepath.open("w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerow(headers)
writer.writerows(rows)
return str(filepath)
def format_bool(
value: bool | None,
true_val: str = "Yes",
false_val: str = "No",
none_val: str = "-",
) -> str:
"""Format boolean value to string representation.
Args:
value: Boolean value to format
true_val: String representation for True
false_val: String representation for False
none_val: String representation for None
Returns:
Formatted string
"""
if value is True:
return true_val
if value is False:
return false_val
return none_val

View File

@@ -7,6 +7,24 @@ from typing import Any
COMPLIANCE_WARNING_THRESHOLD = 50.0
def has_tls_support(port_data: dict[str, Any]) -> bool:
"""Check if port has TLS support based on data presence.
Args:
port_data: Port data dictionary
Returns:
True if port has TLS support
"""
return bool(
port_data.get("cipher_suites")
or port_data.get("supported_groups")
or port_data.get("certificates")
or port_data.get("tls_version")
)
def list_scans(db_path: str) -> list[dict[str, Any]]:
"""List all available scans in the database.

View File

@@ -1,11 +1,13 @@
"""Shared utilities for report template rendering."""
from datetime import datetime, timezone
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from jinja2 import Environment, FileSystemLoader, select_autoescape
from .query import has_tls_support
def format_tls_version(version: str) -> str:
"""Format TLS version string for display.
@@ -59,7 +61,7 @@ def generate_report_id(metadata: dict[str, Any]) -> str:
dt = datetime.fromisoformat(metadata["timestamp"])
date_str = dt.strftime("%Y%m%d")
except (ValueError, KeyError):
date_str = datetime.now(timezone.utc).strftime("%Y%m%d")
date_str = datetime.now(UTC).strftime("%Y%m%d")
return f"{date_str}_{metadata['scan_id']}"
@@ -95,13 +97,7 @@ def build_template_context(data: dict[str, Any]) -> dict[str, Any]:
# Filter ports with TLS support for port sections
ports_with_tls = []
for port_data in data["ports_data"].values():
has_tls = (
port_data.get("cipher_suites")
or port_data.get("supported_groups")
or port_data.get("certificates")
or port_data.get("tls_version")
)
if has_tls:
if has_tls_support(port_data):
ports_with_tls.append(port_data)
return {