feature: iana update #1

Merged
heiko merged 3 commits from feature_iana_update into master 2025-12-25 11:38:04 +01:00
28 changed files with 1962 additions and 424 deletions

117
README.md
View File

@@ -1,63 +1,132 @@
# compliance-scan # compliance-scan
SSL/TLS configuration analysis with automated IANA/BSI compliance checking. SSL/TLS configuration analysis with automated BSI/IANA compliance checking.
## Quick Start ## Quick Start
```bash ```bash
# Scan # Install
poetry install
# Scan server
poetry run compliance-scan scan example.com:443,636 poetry run compliance-scan scan example.com:443,636
# Report # Generate report
poetry run compliance-scan report -t md -o report.md poetry run compliance-scan report -t md -o report.md
# Update IANA registry data
poetry run compliance-scan update-iana
``` ```
## Installation Note: SSLyze outputs INFO-level log messages during scanning that cannot be suppressed. These messages are harmless and can be ignored.
```bash
poetry install
```
## Features ## Features
- Multi-port TLS/SSL scanning - Multi-port TLS/SSL scanning with SSLyze
- BSI TR-02102-1/2 compliance validation - BSI TR-02102-1/2 compliance validation
- IANA recommendations checking - IANA recommendations checking
- Vulnerability detection (Heartbleed, ROBOT, CCS Injection) - Vulnerability detection (Heartbleed, ROBOT, CCS Injection)
- Certificate validation - Certificate validation with key size compliance
- Multiple report formats (CSV, Markdown, reStructuredText) - Multiple report formats (CSV, Markdown, reStructuredText)
- IANA registry updates from official sources
## Commands ## Commands
### Scan
```bash ```bash
# Scan with ports compliance-scan scan <hostname>:<port1>,<port2> [options]
compliance-scan scan <hostname>:<port1>,<port2> [--print] [-db <path>]
# Generate report # Examples
compliance-scan report [scan_id] -t <csv|md|rest> [-o <file>] compliance-scan scan example.com:443,636 --print
compliance-scan scan [2001:db8::1]:443 -db custom.db
```
# List scans Note: SSLyze outputs INFO-level log messages during scanning that cannot be suppressed.
Options:
- `--print` - Display scan summary in console
- `-db <path>` - Database file path (default: compliance_status.db)
### Report
```bash
compliance-scan report [scan_id] -t <type> [options]
# Examples
compliance-scan report -t md -o report.md
compliance-scan report 5 -t csv --output-dir ./reports
compliance-scan report --list compliance-scan report --list
``` ```
Options:
- `-t <type>` - Report type: csv, md, markdown, rest, rst
- `-o <file>` - Output file for Markdown/reStructuredText
- `--output-dir <dir>` - Output directory for CSV files
- `--list` - List all available scans
- `-db <path>` - Database file path
### Update IANA Data
```bash
compliance-scan update-iana [-db <path>]
# Example
compliance-scan update-iana -db compliance_status.db
```
Updates IANA registry data from official sources. Default database contains IANA data as of 12/2024.
## Report Formats
**CSV**: Granular files per port and category for data analysis.
**Markdown**: Single comprehensive report with all findings.
**reStructuredText**: Sphinx-compatible report with CSV table includes.
## Supported Protocols ## Supported Protocols
Opportunistic TLS: SMTP, LDAP, IMAP, POP3, FTP, XMPP, RDP, PostgreSQL **Opportunistic TLS**: SMTP, LDAP, IMAP, POP3, FTP, XMPP, RDP, PostgreSQL
Direct TLS: HTTPS, LDAPS, SMTPS, IMAPS, POP3S
**Direct TLS**: HTTPS, LDAPS, SMTPS, IMAPS, POP3S
## Compliance Standards
- BSI TR-02102-1: Certificate requirements
- BSI TR-02102-2: TLS cipher suites and parameters
- IANA TLS Parameters: Cipher suites, signature schemes, supported groups
## Documentation ## Documentation
**[Detailed Guide](docs/detailed-guide.md)** - Complete reference with CLI commands, database schema, compliance rules, and development guide. **[Detailed Guide](docs/detailed-guide.md)** - Complete reference with database schema, compliance rules, and development information.
## Requirements ## Requirements
- Python 3.13+ - Python 3.13+
- SSLyze 6.0.0+
- Poetry - Poetry
- SSLyze 6.0.0+
## Planned Features ## Database
- CLI command for updating IANA reference data Default location: `compliance_status.db`
- Automated IANA registry updates from web sources base on `src/sslysze_scan/scan_iana.py`
- TLS Parameters: https://www.iana.org/assignments/tls-parameters/tls-parameters.xml Template with reference data: `src/sslysze_scan/data/crypto_standards.db`
- IKEv2 Parameters: https://www.iana.org/assignments/ikev2-parameters/ikev2-parameters.xml
Schema version: 5 (includes optimized views for reporting)
## Development
```bash
# Run tests
poetry run pytest
# Update IANA reference data in template
python3 -m sslysze_scan.iana_parser
```
## Version Management
Version is maintained in `pyproject.toml` and read dynamically at runtime.

BIN
compliance_status.db Normal file

Binary file not shown.

View File

@@ -4,14 +4,17 @@ Complete reference for developers and advanced users.
## Core Entry Points ## Core Entry Points
| Component | Path | Purpose | | Component | Path | Purpose |
| --------------- | ------------------------------------ | ------------------------------------- | | --------------- | ------------------------------------------ | ------------------------------------- |
| CLI | `src/sslysze_scan/__main__.py` | Command-line interface entry | | CLI | `src/sslysze_scan/__main__.py` | Command-line interface entry |
| Scanner | `src/sslysze_scan/scanner.py` | SSLyze integration and scan execution | | Scanner | `src/sslysze_scan/scanner.py` | SSLyze integration and scan execution |
| Database Writer | `src/sslysze_scan/db/writer.py` | Scan result persistence | | Database Writer | `src/sslysze_scan/db/writer.py` | Scan result persistence |
| Reporter | `src/sslysze_scan/reporter/` | Report generation (CSV/MD/reST) | | Reporter | `src/sslysze_scan/reporter/` | Report generation (CSV/MD/reST) |
| Compliance | `src/sslysze_scan/db/compliance.py` | BSI/IANA validation logic | | Compliance | `src/sslysze_scan/db/compliance.py` | BSI/IANA validation logic |
| Query | `src/sslysze_scan/reporter/query.py` | Database queries using views | | Query | `src/sslysze_scan/reporter/query.py` | Database queries using views |
| IANA Update | `src/sslysze_scan/commands/update_iana.py` | IANA registry updates from web |
| IANA Validator | `src/sslysze_scan/iana_validator.py` | IANA data validation |
| IANA Parser | `src/sslysze_scan/iana_parser.py` | IANA XML parsing utilities |
## Installation ## Installation
@@ -43,6 +46,8 @@ poetry run compliance-scan report --list
compliance-scan scan <hostname>:<port1>,<port2> [options] compliance-scan scan <hostname>:<port1>,<port2> [options]
``` ```
Note: SSLyze outputs INFO-level log messages during scanning that cannot be suppressed. These messages are harmless and can be ignored.
| Argument | Required | Description | | Argument | Required | Description |
| -------------------- | -------- | ---------------------------------------------------------------- | | -------------------- | -------- | ---------------------------------------------------------------- |
| `<hostname>:<ports>` | Yes | Target with comma-separated ports. IPv6: `[2001:db8::1]:443,636` | | `<hostname>:<ports>` | Yes | Target with comma-separated ports. IPv6: `[2001:db8::1]:443,636` |
@@ -78,6 +83,39 @@ compliance-scan report 5 -t csv --output-dir ./reports
compliance-scan report -t rest --output-dir ./docs compliance-scan report -t rest --output-dir ./docs
``` ```
### Update IANA Command
```
compliance-scan update-iana [-db <path>]
```
| Argument | Required | Description |
| ------------ | -------- | ------------------------------------------------------- |
| `-db <path>` | No | Database file to update (default: compliance_status.db) |
Updates IANA registry data from official sources:
- TLS Parameters: https://www.iana.org/assignments/tls-parameters/tls-parameters.xml
- IKEv2 Parameters: https://www.iana.org/assignments/ikev2-parameters/ikev2-parameters.xml
Default database contains IANA data as of 12/2024.
Examples:
```bash
compliance-scan update-iana
compliance-scan update-iana -db custom.db
```
Update process:
1. Fetches XML from IANA URLs
2. Validates headers against database schema
3. Validates data integrity (value formats, minimum row counts)
4. Calculates diff (added/modified/deleted entries)
5. Updates database in transaction (rollback on error)
6. Logs all changes at INFO level
## Report Formats ## Report Formats
### CSV ### CSV
@@ -230,9 +268,12 @@ src/sslysze_scan/
├── scanner.py # SSLyze integration ├── scanner.py # SSLyze integration
├── protocol_loader.py # Port-protocol mapping ├── protocol_loader.py # Port-protocol mapping
├── output.py # Console output ├── output.py # Console output
├── iana_parser.py # IANA XML parsing utilities
├── iana_validator.py # IANA data validation
├── commands/ ├── commands/
│ ├── scan.py # Scan command handler │ ├── scan.py # Scan command handler
── report.py # Report command handler ── report.py # Report command handler
│ └── update_iana.py # IANA update command handler
├── db/ ├── db/
│ ├── schema.py # Schema version management │ ├── schema.py # Schema version management
│ ├── writer.py # Scan result storage │ ├── writer.py # Scan result storage
@@ -241,6 +282,7 @@ src/sslysze_scan/
├── reporter/ ├── reporter/
│ ├── query.py # Database queries (uses views) │ ├── query.py # Database queries (uses views)
│ ├── csv_export.py # CSV generation │ ├── csv_export.py # CSV generation
│ ├── csv_utils.py # CSV utilities (exporter class)
│ ├── markdown_export.py # Markdown generation │ ├── markdown_export.py # Markdown generation
│ ├── rst_export.py # reST generation │ ├── rst_export.py # reST generation
│ └── template_utils.py # Shared utilities │ └── template_utils.py # Shared utilities
@@ -249,7 +291,16 @@ src/sslysze_scan/
│ └── report.reST.j2 # reST template │ └── report.reST.j2 # reST template
└── data/ └── data/
├── crypto_standards.db # Template DB (IANA/BSI + schema) ├── crypto_standards.db # Template DB (IANA/BSI + schema)
├── iana_parse.json # IANA XML source URLs and registry config
└── protocols.csv # Port-protocol mapping └── protocols.csv # Port-protocol mapping
tests/
├── fixtures/
│ ├── iana_xml/ # Minimal XML test fixtures
│ └── test_scan.db # Test database
├── test_iana_validator.py # IANA validation tests (25 tests)
├── test_iana_parse.py # IANA XML parsing tests (20 tests)
└── test_iana_update.py # IANA update logic tests (13 tests)
``` ```
## Key Functions ## Key Functions
@@ -284,6 +335,7 @@ src/sslysze_scan/
| `get_scan_data(db_path, scan_id)` | `reporter/query.py` | Get complete scan data using views | | `get_scan_data(db_path, scan_id)` | `reporter/query.py` | Get complete scan data using views |
| `get_scan_metadata(db_path, scan_id)` | `reporter/query.py` | Get scan metadata only | | `get_scan_metadata(db_path, scan_id)` | `reporter/query.py` | Get scan metadata only |
| `list_scans(db_path)` | `reporter/query.py` | List all scans in database | | `list_scans(db_path)` | `reporter/query.py` | List all scans in database |
| `has_tls_support(port_data)` | `reporter/query.py` | Check if port has TLS support |
### Report Generation ### Report Generation
@@ -292,10 +344,101 @@ src/sslysze_scan/
| `generate_csv_reports(db_path, scan_id, output_dir)` | `reporter/csv_export.py` | Generate all CSV files | | `generate_csv_reports(db_path, scan_id, output_dir)` | `reporter/csv_export.py` | Generate all CSV files |
| `generate_markdown_report(db_path, scan_id, output)` | `reporter/markdown_export.py` | Generate Markdown report | | `generate_markdown_report(db_path, scan_id, output)` | `reporter/markdown_export.py` | Generate Markdown report |
| `generate_rest_report(db_path, scan_id, output, output_dir)` | `reporter/rst_export.py` | Generate reStructuredText report | | `generate_rest_report(db_path, scan_id, output, output_dir)` | `reporter/rst_export.py` | Generate reStructuredText report |
| `_get_headers(db_path, export_type)` | `reporter/csv_export.py` | Load CSV headers from database |
| `build_template_context(data)` | `reporter/template_utils.py` | Prepare Jinja2 template context | | `build_template_context(data)` | `reporter/template_utils.py` | Prepare Jinja2 template context |
| `generate_report_id(metadata)` | `reporter/template_utils.py` | Generate report ID (YYYYMMDD_scanid) | | `generate_report_id(metadata)` | `reporter/template_utils.py` | Generate report ID (YYYYMMDD_scanid) |
### IANA Update and Validation
| Function | Module | Purpose |
| ------------------------------------------------ | ------------------------- | ---------------------------------------- |
| `handle_update_iana_command(args)` | `commands/update_iana.py` | Main update command handler |
| `fetch_xml_from_url(url)` | `commands/update_iana.py` | Fetch XML from IANA URL |
| `calculate_diff(old_rows, new_rows)` | `commands/update_iana.py` | Calculate added/modified/deleted entries |
| `process_registry_with_validation(...)` | `commands/update_iana.py` | Process and validate single registry |
| `validate_headers(table_name, headers, db_conn)` | `iana_validator.py` | Validate headers match database schema |
| `validate_registry_data(table_name, rows)` | `iana_validator.py` | Validate complete registry data |
| `validate_cipher_suite_row(row)` | `iana_validator.py` | Validate single cipher suite record |
| `validate_supported_groups_row(row)` | `iana_validator.py` | Validate single supported group record |
| `normalize_header(header)` | `iana_validator.py` | Normalize header to DB column format |
| `get_min_rows(table_name)` | `iana_validator.py` | Get minimum expected rows for table |
| `extract_updated_date(xml_content)` | `iana_parser.py` | Extract date from XML `<updated>` tag |
| `parse_xml_with_namespace_support(xml_path)` | `iana_parser.py` | Parse XML with IANA namespace detection |
| `find_registry(root, registry_id, ns)` | `iana_parser.py` | Find registry element by ID |
| `extract_field_value(record, header, ns)` | `iana_parser.py` | Extract field value from XML record |
## IANA Data Update Process
Configuration file: `src/sslysze_scan/data/iana_parse.json`
Structure:
```json
{
"https://www.iana.org/assignments/tls-parameters/tls-parameters.xml": [
["registry_id", "output_filename.csv", ["Header1", "Header2", "..."]]
]
}
```
Validation rules:
1. Headers must match database schema (case-insensitive, `/``_`)
2. Minimum row counts per table (50 for cipher suites, 10 for groups, 5 for small tables)
3. Value format validation (0x prefix for hex values, numeric for groups)
4. Recommended field must be Y, N, or D
Error handling:
- Validation failure: Rollback transaction, display error with hint to open issue
- Network error: Abort with error message
- XML structure change: Validation catches and aborts
Logging output:
```
INFO: Fetching https://www.iana.org/assignments/tls-parameters/tls-parameters.xml
INFO: XML data date: 2025-12-03
INFO: iana_tls_cipher_suites: 448 rows (2 added, 1 modified, 0 deleted)
INFO: Successfully updated 11 registries (1310 total rows)
```
## Version Management
Version is maintained in `pyproject.toml` only:
```toml
[project]
version = "0.1.0"
```
Runtime access via `importlib.metadata`:
```python
from sslysze_scan import __version__
print(__version__) # "0.1.0"
```
## Development
Run tests:
```bash
poetry run pytest
poetry run pytest tests/test_iana_validator.py -v
```
Update IANA template database:
```bash
python3 -m sslysze_scan.iana_parser
```
Code style:
- PEP 8 compliant
- Max line length: 90 characters
- Ruff for linting and formatting
## SQL Query Examples ## SQL Query Examples
All queries use optimized views for performance. All queries use optimized views for performance.
@@ -382,11 +525,13 @@ poetry run pytest tests/ -v
- `tests/conftest.py`: Fixtures with test_db, test_db_path - `tests/conftest.py`: Fixtures with test_db, test_db_path
- `tests/fixtures/test_scan.db`: Real scan data (Scan 1: dc.validation.lan:443,636) - `tests/fixtures/test_scan.db`: Real scan data (Scan 1: dc.validation.lan:443,636)
- `tests/test_csv_export.py`: 11 CSV export tests - `tests/test_csv_export.py`: 11 CSV export tests
- `tests/test_template_utils.py`: 3 template utility tests - `tests/test_template_utils.py`: 2 template utility tests
- `tests/test_compliance.py`: 2 compliance tests
- `tests/test_cli.py`: 3 CLI parsing tests - `tests/test_cli.py`: 3 CLI parsing tests
- `tests/test_iana_validator.py`: 20 IANA validation tests
- `tests/test_iana_parse.py`: 14 IANA parsing tests
- `tests/test_iana_update.py`: 14 IANA update tests
**Total:** 19 tests **Total:** 64 tests
**Test database setup:** **Test database setup:**

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "compliance-scan" name = "compliance-scan"
version = "0.1.0" version = "1.0.1"
description = "" description = ""
authors = [ authors = [
{name = "Heiko Haase",email = "heiko.haase.extern@univention.de"} {name = "Heiko Haase",email = "heiko.haase.extern@univention.de"}

View File

@@ -5,7 +5,13 @@ import logging
from .__main__ import main from .__main__ import main
from .scanner import perform_scan from .scanner import perform_scan
__version__ = "0.1.0" try:
from importlib.metadata import version
__version__ = version("compliance-scan")
except Exception:
__version__ = "unknown"
__all__ = ["main", "perform_scan"] __all__ = ["main", "perform_scan"]
# Configure logging # Configure logging

View File

@@ -4,7 +4,11 @@
import sys import sys
from .cli import parse_arguments from .cli import parse_arguments
from .commands import handle_report_command, handle_scan_command from .commands import (
handle_report_command,
handle_scan_command,
handle_update_iana_command,
)
from .output import print_error from .output import print_error
@@ -21,6 +25,8 @@ def main() -> int:
return handle_scan_command(args) return handle_scan_command(args)
if args.command == "report": if args.command == "report":
return handle_report_command(args) return handle_report_command(args)
if args.command == "update-iana":
return handle_update_iana_command(args)
print_error(f"Unknown command: {args.command}") print_error(f"Unknown command: {args.command}")
return 1 return 1

View File

@@ -164,6 +164,10 @@ Examples:
compliance-scan scan example.com:443 --print compliance-scan scan example.com:443 --print
compliance-scan scan example.com:443,636 -db /path/to/scans.db compliance-scan scan example.com:443,636 -db /path/to/scans.db
compliance-scan scan [2001:db8::1]:443,636 --print compliance-scan scan [2001:db8::1]:443,636 --print
Note:
SSLyze outputs INFO-level log messages during scanning that cannot be suppressed.
These messages are harmless and can be ignored.
""", """,
) )
@@ -247,6 +251,30 @@ Examples:
default=False, default=False,
) )
# Update-iana subcommand
update_parser = subparsers.add_parser(
"update-iana",
help="Update IANA registry data from official online sources",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
compliance-scan update-iana
compliance-scan update-iana -db /path/to/scans.db
Note:
Default database contains IANA data as of 12/2024.
This command fetches current data from IANA and updates the database.
""",
)
update_parser.add_argument(
"-db",
"--database",
type=str,
help="Database file to update (default: compliance_status.db)",
default="compliance_status.db",
)
args = parser.parse_args() args = parser.parse_args()
# Check if no command was provided # Check if no command was provided

View File

@@ -2,5 +2,10 @@
from .report import handle_report_command from .report import handle_report_command
from .scan import handle_scan_command from .scan import handle_scan_command
from .update_iana import handle_update_iana_command
__all__ = ["handle_report_command", "handle_scan_command"] __all__ = [
"handle_report_command",
"handle_scan_command",
"handle_update_iana_command",
]

View File

@@ -1,7 +1,8 @@
"""Scan command handler.""" """Scan command handler."""
import argparse import argparse
from datetime import datetime, timezone import sqlite3
from datetime import UTC, datetime
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@@ -58,7 +59,7 @@ def handle_scan_command(args: argparse.Namespace) -> int:
return 1 return 1
# Single timestamp for all scans (program start time) # Single timestamp for all scans (program start time)
program_start_time = datetime.now(timezone.utc) program_start_time = datetime.now(UTC)
# Scan results storage # Scan results storage
scan_results_dict: dict[int, Any] = {} scan_results_dict: dict[int, Any] = {}
@@ -76,7 +77,7 @@ def handle_scan_command(args: argparse.Namespace) -> int:
continue continue
# Calculate total scan duration # Calculate total scan duration
scan_end_time = datetime.now(timezone.utc) scan_end_time = datetime.now(UTC)
total_scan_duration = (scan_end_time - program_start_time).total_seconds() total_scan_duration = (scan_end_time - program_start_time).total_seconds()
# Save all results to database with single scan_id # Save all results to database with single scan_id
@@ -105,8 +106,6 @@ def handle_scan_command(args: argparse.Namespace) -> int:
# Print summary if requested # Print summary if requested
if args.print: if args.print:
import sqlite3
print("\n" + "=" * 70) print("\n" + "=" * 70)
print("SCAN SUMMARY") print("SCAN SUMMARY")
print("=" * 70) print("=" * 70)

View File

@@ -0,0 +1,248 @@
"""Update IANA command handler."""
import argparse
import json
import logging
import sqlite3
from pathlib import Path
from urllib.error import URLError
from urllib.request import urlopen
from ..iana_parser import (
extract_updated_date,
find_registry,
get_table_name_from_filename,
parse_xml_with_namespace_support,
)
from ..iana_validator import (
ValidationError,
normalize_header,
validate_headers,
validate_registry_data,
)
from ..output import print_error
logger = logging.getLogger(__name__)
def fetch_xml_from_url(url: str, timeout: int = 30) -> str:
"""Fetch XML content from URL.
Args:
url: URL to fetch
timeout: Timeout in seconds
Returns:
XML content as string
Raises:
URLError: If URL cannot be fetched
"""
logger.info(f"Fetching {url}")
with urlopen(url, timeout=timeout) as response:
return response.read().decode("utf-8")
def calculate_diff(
old_rows: list[tuple],
new_rows: list[tuple],
pk_index: int = 0,
) -> dict[str, list]:
"""Calculate diff between old and new data.
Args:
old_rows: Existing rows from DB
new_rows: New rows from XML
pk_index: Index of primary key column
Returns:
Dict with 'added', 'deleted', 'modified' lists of primary keys
"""
old_dict = {row[pk_index]: row for row in old_rows}
new_dict = {row[pk_index]: row for row in new_rows}
added = [k for k in new_dict if k not in old_dict]
deleted = [k for k in old_dict if k not in new_dict]
modified = [k for k in new_dict if k in old_dict and old_dict[k] != new_dict[k]]
return {"added": added, "deleted": deleted, "modified": modified}
def process_registry_with_validation(
xml_content: str,
registry_id: str,
table_name: str,
headers: list[str],
db_conn: sqlite3.Connection,
skip_min_rows_check: bool = False,
) -> tuple[int, dict[str, list]]:
"""Process registry with validation and diff calculation.
Args:
xml_content: XML content as string
registry_id: Registry ID to extract
table_name: Database table name
headers: List of column headers
db_conn: Database connection
skip_min_rows_check: Skip minimum rows validation (for tests)
Returns:
Tuple of (row_count, diff_dict)
Raises:
ValidationError: If validation fails
ValueError: If registry not found
"""
import tempfile
with tempfile.NamedTemporaryFile(
mode="w", suffix=".xml", delete=False, encoding="utf-8"
) as tmp_file:
tmp_file.write(xml_content)
tmp_path = tmp_file.name
try:
root, ns = parse_xml_with_namespace_support(tmp_path)
finally:
Path(tmp_path).unlink()
validate_headers(table_name, headers, db_conn)
registry = find_registry(root, registry_id, ns)
if ns:
records = registry.findall("iana:record", ns)
else:
records = registry.findall("record")
from ..iana_parser import extract_field_value, is_unassigned
rows_dict = []
for record in records:
if is_unassigned(record, ns):
continue
row_dict = {}
for header in headers:
normalized_key = normalize_header(header)
row_dict[normalized_key] = extract_field_value(record, header, ns)
rows_dict.append(row_dict)
validate_registry_data(table_name, rows_dict, skip_min_rows_check)
rows = [tuple(row.values()) for row in rows_dict]
cursor = db_conn.cursor()
old_rows = cursor.execute(f"SELECT * FROM {table_name}").fetchall()
diff = calculate_diff(old_rows, rows)
placeholders = ",".join(["?"] * len(headers))
cursor.execute(f"DELETE FROM {table_name}")
cursor.executemany(f"INSERT INTO {table_name} VALUES ({placeholders})", rows)
return len(rows), diff
def handle_update_iana_command(args: argparse.Namespace) -> int:
"""Handle the update-iana subcommand.
Args:
args: Parsed arguments
Returns:
Exit code (0 for success, 1 for error)
"""
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
db_path = args.database
if not Path(db_path).exists():
print_error(f"Database not found: {db_path}")
return 1
script_dir = Path(__file__).parent.parent
config_path = script_dir / "data" / "iana_parse.json"
logger.info(f"Loading configuration from {config_path}")
try:
with config_path.open(encoding="utf-8") as f:
config = json.load(f)
except (FileNotFoundError, json.JSONDecodeError, OSError) as e:
print_error(f"Error loading configuration: {e}")
return 1
try:
conn = sqlite3.connect(str(db_path))
except sqlite3.Error as e:
print_error(f"Error opening database: {e}")
return 1
logger.info("Starting IANA registry update")
try:
conn.execute("BEGIN TRANSACTION")
total_registries = 0
total_rows = 0
for url, registries in config.items():
try:
xml_content = fetch_xml_from_url(url)
except (URLError, OSError) as e:
print_error(f"Failed to fetch {url}: {e}")
conn.rollback()
conn.close()
return 1
xml_date = extract_updated_date(xml_content)
logger.info(f"XML data date: {xml_date}")
for registry_id, output_filename, headers in registries:
table_name = get_table_name_from_filename(output_filename)
try:
row_count, diff = process_registry_with_validation(
xml_content, registry_id, table_name, headers, conn
)
logger.info(
f"{table_name}: {row_count} rows "
f"({len(diff['added'])} added, "
f"{len(diff['modified'])} modified, "
f"{len(diff['deleted'])} deleted)"
)
total_registries += 1
total_rows += row_count
except (ValidationError, ValueError) as e:
print_error(
f"Validation failed for {table_name}: {e}\n"
f"IANA data structure may have changed. "
f"Please open an issue at the project repository."
)
conn.rollback()
conn.close()
return 1
conn.commit()
logger.info(
f"Successfully updated {total_registries} registries "
f"({total_rows} total rows)"
)
except sqlite3.Error as e:
print_error(f"Database error: {e}")
conn.rollback()
conn.close()
return 1
finally:
conn.close()
return 0

View File

@@ -1,5 +1,5 @@
{ {
"proto/assignments/tls-parameters/tls-parameters.xml": [ "https://www.iana.org/assignments/tls-parameters/tls-parameters.xml": [
[ [
"tls-parameters-4", "tls-parameters-4",
"tls_cipher_suites.csv", "tls_cipher_suites.csv",
@@ -24,14 +24,9 @@
"tls-parameters-5", "tls-parameters-5",
"tls_content_types.csv", "tls_content_types.csv",
["Value", "Description", "DTLS", "Recommended", "RFC/Draft"] ["Value", "Description", "DTLS", "Recommended", "RFC/Draft"]
],
[
"tls-parameters-7",
"tls_content_types.csv",
["Value", "Description", "DTLS", "Recommended", "RFC/Draft"]
] ]
], ],
"proto/assignments/ikev2-parameters/ikev2-parameters.xml": [ "https://www.iana.org/assignments/ikev2-parameters/ikev2-parameters.xml": [
[ [
"ikev2-parameters-5", "ikev2-parameters-5",
"ikev2_encryption_algorithms.csv", "ikev2_encryption_algorithms.csv",

View File

@@ -1,7 +1,7 @@
"""Compliance checking module for IANA and BSI standards.""" """Compliance checking module for IANA and BSI standards."""
import sqlite3 import sqlite3
from datetime import datetime, timezone from datetime import UTC, datetime
from typing import Any from typing import Any
# Error messages # Error messages
@@ -26,7 +26,7 @@ def check_compliance(db_path: str, scan_id: int) -> dict[str, Any]:
cursor = conn.cursor() cursor = conn.cursor()
try: try:
timestamp = datetime.now(timezone.utc).isoformat() timestamp = datetime.now(UTC).isoformat()
stats = { stats = {
"cipher_suites_checked": 0, "cipher_suites_checked": 0,
"cipher_suites_passed": 0, "cipher_suites_passed": 0,
@@ -122,7 +122,7 @@ def check_certificate_compliance(
if bsi_result and algo_type: if bsi_result and algo_type:
min_key_length, valid_until, notes = bsi_result min_key_length, valid_until, notes = bsi_result
current_year = datetime.now(timezone.utc).year current_year = datetime.now(UTC).year
# Check key length # Check key length
if key_bits and key_bits >= min_key_length: if key_bits and key_bits >= min_key_length:
@@ -285,7 +285,7 @@ def _check_cipher_suite_compliance(
# BSI check (sole compliance criterion) # BSI check (sole compliance criterion)
if bsi_approved: if bsi_approved:
current_year = datetime.now(timezone.utc).year current_year = datetime.now(UTC).year
if bsi_valid_until and bsi_valid_until >= current_year: if bsi_valid_until and bsi_valid_until >= current_year:
details.append(f"BSI: Approved until {bsi_valid_until}") details.append(f"BSI: Approved until {bsi_valid_until}")
passed = True passed = True
@@ -408,7 +408,7 @@ def _check_supported_group_compliance(
# BSI check (sole compliance criterion) # BSI check (sole compliance criterion)
if bsi_approved: if bsi_approved:
current_year = datetime.now(timezone.utc).year current_year = datetime.now(UTC).year
if bsi_valid_until and bsi_valid_until >= current_year: if bsi_valid_until and bsi_valid_until >= current_year:
details.append(f"BSI: Approved until {bsi_valid_until}") details.append(f"BSI: Approved until {bsi_valid_until}")
passed = True passed = True

View File

@@ -1,13 +1,11 @@
#!/usr/bin/env python3 """IANA XML parser utilities.
"""IANA XML Registry to SQLite Converter
Parses IANA XML registry files and exports specified registries directly to SQLite database Provides functions for parsing IANA XML registry files and extracting
based on configuration from iana_parse.json. registry data. Used by update_iana command and tests.
""" """
"""Script to fetch and parse IANA TLS registries into SQLite database."""
import json import json
import re
import sqlite3 import sqlite3
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
from pathlib import Path from pathlib import Path
@@ -213,6 +211,45 @@ def get_table_name_from_filename(filename: str) -> str:
return table_name return table_name
def extract_updated_date(xml_content: str) -> str:
"""Extract date from <updated> tag in XML content.
Args:
xml_content: XML content as string
Returns:
Date string in format YYYY-MM-DD or "unknown"
"""
for line in xml_content.split("\n")[:10]:
if "<updated>" in line:
match = re.search(r"<updated>([\d-]+)</updated>", line)
if match:
return match.group(1)
return "unknown"
def is_unassigned(record: ET.Element, ns: dict | None) -> bool:
"""Check if record is an unassigned or reserved range entry.
Args:
record: XML record element
ns: Namespace dictionary or None
Returns:
True if record should be skipped (unassigned/reserved ranges)
"""
value = get_element_text(record, "value", ns)
# Check for range notation (e.g., "42-255", "0x0000-0x0200")
# Range values indicate unassigned or reserved blocks
if re.search(r"\d+-\d+", value):
return True
return False
def write_registry_to_db( def write_registry_to_db(
root: ET.Element, root: ET.Element,
registry_id: str, registry_id: str,
@@ -248,9 +285,11 @@ def write_registry_to_db(
else: else:
records = registry.findall("record") records = registry.findall("record")
# Prepare data # Prepare data (skip unassigned entries)
rows = [] rows = []
for record in records: for record in records:
if is_unassigned(record, ns):
continue
row = [] row = []
for header in headers: for header in headers:
value = extract_field_value(record, header, ns) value = extract_field_value(record, header, ns)
@@ -279,7 +318,7 @@ def process_xml_file(
xml_path: str, xml_path: str,
registries: list[tuple[str, str, list[str]]], registries: list[tuple[str, str, list[str]]],
db_conn: sqlite3.Connection, db_conn: sqlite3.Connection,
repo_root: str, repo_root: Path,
) -> int: ) -> int:
"""Process single XML file and export all specified registries to database. """Process single XML file and export all specified registries to database.
@@ -331,71 +370,3 @@ def process_xml_file(
) from e ) from e
return total_rows return total_rows
def main() -> None:
"""Main entry point."""
# Determine paths
script_dir = Path(__file__).parent
repo_root = script_dir.parent.parent
config_path = script_dir / "data" / "iana_parse.json"
db_path = script_dir / "data" / "crypto_standards.db"
print("IANA XML zu SQLite Konverter")
print("=" * 50)
print(f"Repository Root: {repo_root}")
print(f"Konfiguration: {config_path}")
print(f"Datenbank: {db_path}")
# Check if database exists
if not db_path.exists():
print(f"\n✗ Fehler: Datenbank {db_path} nicht gefunden", file=sys.stderr)
print("Bitte zuerst die Datenbank erstellen.", file=sys.stderr)
sys.exit(1)
# Load configuration
try:
config = load_config(config_path)
except (FileNotFoundError, json.JSONDecodeError, OSError) as e:
print(f"\nFehler beim Laden der Konfiguration: {e}", file=sys.stderr)
sys.exit(1)
print(f"\n{len(config)} XML-Datei(en) gefunden in Konfiguration")
# Connect to database
try:
db_conn = sqlite3.connect(str(db_path))
except sqlite3.Error as e:
print(f"\n✗ Fehler beim Öffnen der Datenbank: {e}", file=sys.stderr)
sys.exit(1)
# Process each XML file
try:
success_count = 0
total_count = 0
total_rows = 0
for xml_path, registries in config.items():
total_count += len(registries)
try:
rows = process_xml_file(xml_path, registries, db_conn, str(repo_root))
success_count += len(registries)
total_rows += rows
except (RuntimeError, ValueError, sqlite3.Error) as e:
print(f"\nFehler: {e}", file=sys.stderr)
db_conn.close()
sys.exit(1)
# Summary
print("\n" + "=" * 50)
print(
f"Erfolgreich abgeschlossen: {success_count}/{total_count} Registries "
f"({total_rows} Einträge) in Datenbank importiert",
)
finally:
db_conn.close()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,227 @@
"""Validation functions for IANA registry data."""
import sqlite3
class ValidationError(Exception):
"""Raised when IANA data validation fails."""
pass
def normalize_header(header: str) -> str:
"""Normalize header name to database column format.
Args:
header: Header name from JSON config
Returns:
Normalized column name (lowercase, / replaced with _)
"""
return header.lower().replace("/", "_")
def validate_headers(
table_name: str,
headers: list[str],
db_conn: sqlite3.Connection,
) -> None:
"""Validate that headers match database schema.
Args:
table_name: Database table name
headers: Headers from JSON config
db_conn: Database connection
Raises:
ValidationError: If headers don't match schema
"""
cursor = db_conn.cursor()
cursor.execute(f"PRAGMA table_info({table_name})")
db_columns = [row[1] for row in cursor.fetchall()]
normalized_headers = [normalize_header(h) for h in headers]
if len(normalized_headers) != len(db_columns):
raise ValidationError(
f"Column count mismatch for {table_name}: "
f"expected {len(db_columns)}, got {len(normalized_headers)}"
)
for i, (expected, actual) in enumerate(zip(db_columns, normalized_headers)):
if expected != actual:
raise ValidationError(
f"Column {i} mismatch for {table_name}: "
f"expected '{expected}', got '{actual}' "
f"(from header '{headers[i]}')"
)
def validate_cipher_suite_row(row: dict[str, str]) -> None:
"""Validate single cipher suite record.
Args:
row: Dictionary with column names as keys
Raises:
ValidationError: If data is invalid
"""
required_fields = ["value", "description"]
for field in required_fields:
if field not in row or not row[field]:
raise ValidationError(f"Missing required field: {field}")
value = row["value"]
if not value.startswith("0x"):
raise ValidationError(f"Invalid value format: {value}")
rec = row.get("recommended", "")
if rec and rec not in ["Y", "N", "D"]:
raise ValidationError(f"Invalid Recommended value: {rec}")
def validate_supported_groups_row(row: dict[str, str]) -> None:
"""Validate single supported groups record.
Args:
row: Dictionary with column names as keys
Raises:
ValidationError: If data is invalid
"""
required_fields = ["value", "description"]
for field in required_fields:
if field not in row or not row[field]:
raise ValidationError(f"Missing required field: {field}")
try:
int(row["value"])
except ValueError as e:
raise ValidationError(f"Value must be numeric: {row['value']}") from e
rec = row.get("recommended", "")
if rec and rec not in ["Y", "N", "D"]:
raise ValidationError(f"Invalid Recommended value: {rec}")
def validate_signature_schemes_row(row: dict[str, str]) -> None:
"""Validate single signature schemes record.
Args:
row: Dictionary with column names as keys
Raises:
ValidationError: If data is invalid
"""
required_fields = ["value", "description"]
for field in required_fields:
if field not in row or not row[field]:
raise ValidationError(f"Missing required field: {field}")
value = row["value"]
if not value.startswith("0x"):
raise ValidationError(f"Invalid value format: {value}")
def validate_ikev2_row(row: dict[str, str]) -> None:
"""Validate IKEv2 record (encryption, DH groups, auth methods).
Args:
row: Dictionary with column names as keys
Raises:
ValidationError: If data is invalid
"""
required_fields = ["value", "description"]
for field in required_fields:
if field not in row or not row[field]:
raise ValidationError(f"Missing required field: {field}")
try:
int(row["value"])
except ValueError as e:
raise ValidationError(f"Value must be numeric: {row['value']}") from e
VALIDATORS = {
"iana_tls_cipher_suites": validate_cipher_suite_row,
"iana_tls_supported_groups": validate_supported_groups_row,
"iana_tls_signature_schemes": validate_signature_schemes_row,
"iana_ikev2_encryption_algorithms": validate_ikev2_row,
"iana_ikev2_dh_groups": validate_ikev2_row,
"iana_ikev2_authentication_methods": validate_ikev2_row,
"iana_ikev2_prf_algorithms": validate_ikev2_row,
"iana_ikev2_integrity_algorithms": validate_ikev2_row,
}
MIN_ROWS = {
"iana_tls_cipher_suites": 50,
"iana_tls_signature_schemes": 10,
"iana_tls_supported_groups": 10,
"iana_tls_alerts": 10,
"iana_tls_content_types": 5,
"iana_ikev2_encryption_algorithms": 10,
"iana_ikev2_prf_algorithms": 5,
"iana_ikev2_integrity_algorithms": 5,
"iana_ikev2_dh_groups": 10,
"iana_ikev2_authentication_methods": 5,
}
def get_min_rows(table_name: str) -> int:
"""Get minimum expected rows for table.
Args:
table_name: Database table name
Returns:
Minimum number of rows expected
"""
return MIN_ROWS.get(table_name, 5)
def validate_registry_data(
table_name: str,
rows: list[dict[str, str]],
skip_min_rows_check: bool = False,
) -> None:
"""Validate complete registry data before DB write.
Args:
table_name: Database table name
rows: List of row dictionaries
skip_min_rows_check: Skip minimum rows validation (for tests)
Raises:
ValidationError: If validation fails
"""
if not skip_min_rows_check:
min_rows = get_min_rows(table_name)
if len(rows) < min_rows:
raise ValidationError(
f"Insufficient data for {table_name}: "
f"{len(rows)} rows (expected >= {min_rows})"
)
validator = VALIDATORS.get(table_name)
if not validator:
return
for i, row in enumerate(rows, 1):
try:
validator(row)
except ValidationError as e:
raise ValidationError(f"Row {i} in {table_name}: {e}") from e

View File

@@ -164,9 +164,7 @@ def _print_vulnerabilities(scan_result: ServerScanResult) -> None:
heartbleed_result = heartbleed_attempt.result heartbleed_result = heartbleed_attempt.result
if heartbleed_result: if heartbleed_result:
status = ( status = (
"VERWUNDBAR ⚠️" "VERWUNDBAR" if heartbleed_result.is_vulnerable_to_heartbleed else "OK"
if heartbleed_result.is_vulnerable_to_heartbleed
else "OK ✓"
) )
print(f" • Heartbleed: {status}") print(f" • Heartbleed: {status}")
@@ -182,7 +180,7 @@ def _print_vulnerabilities(scan_result: ServerScanResult) -> None:
) )
elif hasattr(robot_result, "robot_result"): elif hasattr(robot_result, "robot_result"):
vulnerable = str(robot_result.robot_result) != "NOT_VULNERABLE_NO_ORACLE" vulnerable = str(robot_result.robot_result) != "NOT_VULNERABLE_NO_ORACLE"
status = "VERWUNDBAR ⚠️" if vulnerable else "OK" status = "VERWUNDBAR" if vulnerable else "OK"
print(f" • ROBOT: {status}") print(f" • ROBOT: {status}")
# OpenSSL CCS Injection # OpenSSL CCS Injection
@@ -190,9 +188,7 @@ def _print_vulnerabilities(scan_result: ServerScanResult) -> None:
if ccs_attempt.status == ScanCommandAttemptStatusEnum.COMPLETED: if ccs_attempt.status == ScanCommandAttemptStatusEnum.COMPLETED:
ccs_result = ccs_attempt.result ccs_result = ccs_attempt.result
if ccs_result: if ccs_result:
status = ( status = "VERWUNDBAR" if ccs_result.is_vulnerable_to_ccs_injection else "OK"
"VERWUNDBAR ⚠️" if ccs_result.is_vulnerable_to_ccs_injection else "OK ✓"
)
print(f" • OpenSSL CCS Injection: {status}") print(f" • OpenSSL CCS Injection: {status}")

View File

@@ -1,95 +1,26 @@
"""CSV report generation with granular file structure for reST integration.""" """CSV report generation with granular file structure for reST integration."""
import csv
import json
import sqlite3
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
from .query import get_scan_data from .csv_utils import CSVExporter, format_bool
from .query import get_scan_data, has_tls_support
def _get_headers(db_path: str, export_type: str) -> list[str]:
"""Get CSV headers from database.
Args:
db_path: Path to database file
export_type: Type of export (e.g. 'cipher_suites_accepted')
Returns:
List of column headers
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute(
"SELECT headers FROM csv_export_metadata WHERE export_type = ?",
(export_type,),
)
row = cursor.fetchone()
conn.close()
if row:
return json.loads(row[0])
raise ValueError(f"No headers found for export_type: {export_type}")
def _format_bool(
value: bool | None,
true_val: str = "Yes",
false_val: str = "No",
none_val: str = "-",
) -> str:
"""Format boolean value to string representation.
Args:
value: Boolean value to format
true_val: String representation for True
false_val: String representation for False
none_val: String representation for None
Returns:
Formatted string
"""
if value is True:
return true_val
if value is False:
return false_val
return none_val
def _write_csv(filepath: Path, headers: list[str], rows: list[list[Any]]) -> None:
"""Write data to CSV file.
Args:
filepath: Path to CSV file
headers: List of column headers
rows: List of data rows
"""
with filepath.open("w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerow(headers)
writer.writerows(rows)
def _export_summary( def _export_summary(
output_dir: Path, exporter: CSVExporter,
summary: dict[str, Any], summary: dict[str, Any],
db_path: str,
) -> list[str]: ) -> list[str]:
"""Export summary statistics to CSV. """Export summary statistics to CSV.
Args: Args:
output_dir: Output directory path exporter: CSVExporter instance
summary: Summary data dictionary summary: Summary data dictionary
Returns: Returns:
List of generated file paths List of generated file paths
""" """
summary_file = output_dir / "summary.csv"
rows = [ rows = [
["Scanned Ports", summary.get("total_ports", 0)], ["Scanned Ports", summary.get("total_ports", 0)],
["Ports with TLS Support", summary.get("successful_ports", 0)], ["Ports with TLS Support", summary.get("successful_ports", 0)],
@@ -114,21 +45,19 @@ def _export_summary(
summary.get("critical_vulnerabilities", 0), summary.get("critical_vulnerabilities", 0),
], ],
] ]
headers = _get_headers(db_path, "summary") filepath = exporter.write_csv("summary.csv", "summary", rows)
_write_csv(summary_file, headers, rows) return [filepath]
return [str(summary_file)]
def _export_cipher_suites( def _export_cipher_suites(
output_dir: Path, exporter: CSVExporter,
port: int, port: int,
cipher_suites: dict[str, dict[str, list]], cipher_suites: dict[str, dict[str, list]],
db_path: str,
) -> list[str]: ) -> list[str]:
"""Export cipher suites to CSV files. """Export cipher suites to CSV files.
Args: Args:
output_dir: Output directory path exporter: CSVExporter instance
port: Port number port: Port number
cipher_suites: Cipher suites data per TLS version cipher_suites: Cipher suites data per TLS version
@@ -140,49 +69,46 @@ def _export_cipher_suites(
for tls_version, suites in cipher_suites.items(): for tls_version, suites in cipher_suites.items():
if suites.get("accepted"): if suites.get("accepted"):
filepath = output_dir / f"{port}_cipher_suites_{tls_version}_accepted.csv"
rows = [ rows = [
[ [
suite["name"], suite["name"],
suite.get("iana_recommended", "-"), suite.get("iana_recommended", "-"),
_format_bool(suite.get("bsi_approved")), format_bool(suite.get("bsi_approved")),
suite.get("bsi_valid_until", "-"), suite.get("bsi_valid_until", "-"),
_format_bool(suite.get("compliant")), format_bool(suite.get("compliant")),
] ]
for suite in suites["accepted"] for suite in suites["accepted"]
] ]
headers = _get_headers(db_path, "cipher_suites_accepted") filename = f"{port}_cipher_suites_{tls_version}_accepted.csv"
_write_csv(filepath, headers, rows) filepath = exporter.write_csv(filename, "cipher_suites_accepted", rows)
generated.append(str(filepath)) generated.append(filepath)
if suites.get("rejected"): if suites.get("rejected"):
filepath = output_dir / f"{port}_cipher_suites_{tls_version}_rejected.csv"
rows = [ rows = [
[ [
suite["name"], suite["name"],
suite.get("iana_recommended", "-"), suite.get("iana_recommended", "-"),
_format_bool(suite.get("bsi_approved")), format_bool(suite.get("bsi_approved")),
suite.get("bsi_valid_until", "-"), suite.get("bsi_valid_until", "-"),
] ]
for suite in suites["rejected"] for suite in suites["rejected"]
] ]
headers = _get_headers(db_path, "cipher_suites_rejected") filename = f"{port}_cipher_suites_{tls_version}_rejected.csv"
_write_csv(filepath, headers, rows) filepath = exporter.write_csv(filename, "cipher_suites_rejected", rows)
generated.append(str(filepath)) generated.append(filepath)
return generated return generated
def _export_supported_groups( def _export_supported_groups(
output_dir: Path, exporter: CSVExporter,
port: int, port: int,
groups: list[dict[str, Any]], groups: list[dict[str, Any]],
db_path: str,
) -> list[str]: ) -> list[str]:
"""Export supported groups to CSV. """Export supported groups to CSV.
Args: Args:
output_dir: Output directory path exporter: CSVExporter instance
port: Port number port: Port number
groups: List of supported groups groups: List of supported groups
@@ -190,32 +116,30 @@ def _export_supported_groups(
List of generated file paths List of generated file paths
""" """
filepath = output_dir / f"{port}_supported_groups.csv"
rows = [ rows = [
[ [
group["name"], group["name"],
group.get("iana_recommended", "-"), group.get("iana_recommended", "-"),
_format_bool(group.get("bsi_approved")), format_bool(group.get("bsi_approved")),
group.get("bsi_valid_until", "-"), group.get("bsi_valid_until", "-"),
_format_bool(group.get("compliant")), format_bool(group.get("compliant")),
] ]
for group in groups for group in groups
] ]
headers = _get_headers(db_path, "supported_groups") filename = f"{port}_supported_groups.csv"
_write_csv(filepath, headers, rows) filepath = exporter.write_csv(filename, "supported_groups", rows)
return [str(filepath)] return [filepath]
def _export_missing_groups( def _export_missing_groups(
output_dir: Path, exporter: CSVExporter,
port: int, port: int,
missing: dict[str, list[dict[str, Any]]], missing: dict[str, list[dict[str, Any]]],
db_path: str,
) -> list[str]: ) -> list[str]:
"""Export missing recommended groups to CSV. """Export missing recommended groups to CSV.
Args: Args:
output_dir: Output directory path exporter: CSVExporter instance
port: Port number port: Port number
missing: Dictionary with bsi_approved and iana_recommended groups missing: Dictionary with bsi_approved and iana_recommended groups
@@ -226,7 +150,6 @@ def _export_missing_groups(
generated = [] generated = []
if missing.get("bsi_approved"): if missing.get("bsi_approved"):
filepath = output_dir / f"{port}_missing_groups_bsi.csv"
rows = [ rows = [
[ [
group["name"], group["name"],
@@ -235,33 +158,31 @@ def _export_missing_groups(
] ]
for group in missing["bsi_approved"] for group in missing["bsi_approved"]
] ]
headers = _get_headers(db_path, "missing_groups_bsi") filename = f"{port}_missing_groups_bsi.csv"
_write_csv(filepath, headers, rows) filepath = exporter.write_csv(filename, "missing_groups_bsi", rows)
generated.append(str(filepath)) generated.append(filepath)
if missing.get("iana_recommended"): if missing.get("iana_recommended"):
filepath = output_dir / f"{port}_missing_groups_iana.csv"
rows = [ rows = [
[group["name"], group.get("iana_value", "-")] [group["name"], group.get("iana_value", "-")]
for group in missing["iana_recommended"] for group in missing["iana_recommended"]
] ]
headers = _get_headers(db_path, "missing_groups_iana") filename = f"{port}_missing_groups_iana.csv"
_write_csv(filepath, headers, rows) filepath = exporter.write_csv(filename, "missing_groups_iana", rows)
generated.append(str(filepath)) generated.append(filepath)
return generated return generated
def _export_certificates( def _export_certificates(
output_dir: Path, exporter: CSVExporter,
port: int, port: int,
certificates: list[dict[str, Any]], certificates: list[dict[str, Any]],
db_path: str,
) -> list[str]: ) -> list[str]:
"""Export certificates to CSV. """Export certificates to CSV.
Args: Args:
output_dir: Output directory path exporter: CSVExporter instance
port: Port number port: Port number
certificates: List of certificate data certificates: List of certificate data
@@ -269,7 +190,6 @@ def _export_certificates(
List of generated file paths List of generated file paths
""" """
filepath = output_dir / f"{port}_certificates.csv"
rows = [ rows = [
[ [
cert["position"], cert["position"],
@@ -279,25 +199,24 @@ def _export_certificates(
cert["not_after"], cert["not_after"],
cert["key_type"], cert["key_type"],
cert["key_bits"], cert["key_bits"],
_format_bool(cert.get("compliant")), format_bool(cert.get("compliant")),
] ]
for cert in certificates for cert in certificates
] ]
headers = _get_headers(db_path, "certificates") filename = f"{port}_certificates.csv"
_write_csv(filepath, headers, rows) filepath = exporter.write_csv(filename, "certificates", rows)
return [str(filepath)] return [filepath]
def _export_vulnerabilities( def _export_vulnerabilities(
output_dir: Path, exporter: CSVExporter,
port: int, port: int,
vulnerabilities: list[dict[str, Any]], vulnerabilities: list[dict[str, Any]],
db_path: str,
) -> list[str]: ) -> list[str]:
"""Export vulnerabilities to CSV. """Export vulnerabilities to CSV.
Args: Args:
output_dir: Output directory path exporter: CSVExporter instance
port: Port number port: Port number
vulnerabilities: List of vulnerability data vulnerabilities: List of vulnerability data
@@ -305,30 +224,28 @@ def _export_vulnerabilities(
List of generated file paths List of generated file paths
""" """
filepath = output_dir / f"{port}_vulnerabilities.csv"
rows = [ rows = [
[ [
vuln["type"], vuln["type"],
_format_bool(vuln["vulnerable"]), format_bool(vuln["vulnerable"]),
vuln.get("details", "-"), vuln.get("details", "-"),
] ]
for vuln in vulnerabilities for vuln in vulnerabilities
] ]
headers = _get_headers(db_path, "vulnerabilities") filename = f"{port}_vulnerabilities.csv"
_write_csv(filepath, headers, rows) filepath = exporter.write_csv(filename, "vulnerabilities", rows)
return [str(filepath)] return [filepath]
def _export_protocol_features( def _export_protocol_features(
output_dir: Path, exporter: CSVExporter,
port: int, port: int,
features: list[dict[str, Any]], features: list[dict[str, Any]],
db_path: str,
) -> list[str]: ) -> list[str]:
"""Export protocol features to CSV. """Export protocol features to CSV.
Args: Args:
output_dir: Output directory path exporter: CSVExporter instance
port: Port number port: Port number
features: List of protocol feature data features: List of protocol feature data
@@ -336,30 +253,28 @@ def _export_protocol_features(
List of generated file paths List of generated file paths
""" """
filepath = output_dir / f"{port}_protocol_features.csv"
rows = [ rows = [
[ [
feature["name"], feature["name"],
_format_bool(feature["supported"]), format_bool(feature["supported"]),
feature.get("details", "-"), feature.get("details", "-"),
] ]
for feature in features for feature in features
] ]
headers = _get_headers(db_path, "protocol_features") filename = f"{port}_protocol_features.csv"
_write_csv(filepath, headers, rows) filepath = exporter.write_csv(filename, "protocol_features", rows)
return [str(filepath)] return [filepath]
def _export_session_features( def _export_session_features(
output_dir: Path, exporter: CSVExporter,
port: int, port: int,
features: list[dict[str, Any]], features: list[dict[str, Any]],
db_path: str,
) -> list[str]: ) -> list[str]:
"""Export session features to CSV. """Export session features to CSV.
Args: Args:
output_dir: Output directory path exporter: CSVExporter instance
port: Port number port: Port number
features: List of session feature data features: List of session feature data
@@ -367,33 +282,31 @@ def _export_session_features(
List of generated file paths List of generated file paths
""" """
filepath = output_dir / f"{port}_session_features.csv"
rows = [ rows = [
[ [
feature["type"], feature["type"],
_format_bool(feature.get("client_initiated")), format_bool(feature.get("client_initiated")),
_format_bool(feature.get("secure")), format_bool(feature.get("secure")),
_format_bool(feature.get("session_id_supported")), format_bool(feature.get("session_id_supported")),
_format_bool(feature.get("ticket_supported")), format_bool(feature.get("ticket_supported")),
feature.get("details", "-"), feature.get("details", "-"),
] ]
for feature in features for feature in features
] ]
headers = _get_headers(db_path, "session_features") filename = f"{port}_session_features.csv"
_write_csv(filepath, headers, rows) filepath = exporter.write_csv(filename, "session_features", rows)
return [str(filepath)] return [filepath]
def _export_http_headers( def _export_http_headers(
output_dir: Path, exporter: CSVExporter,
port: int, port: int,
headers: list[dict[str, Any]], headers: list[dict[str, Any]],
db_path: str,
) -> list[str]: ) -> list[str]:
"""Export HTTP headers to CSV. """Export HTTP headers to CSV.
Args: Args:
output_dir: Output directory path exporter: CSVExporter instance
port: Port number port: Port number
headers: List of HTTP header data headers: List of HTTP header data
@@ -401,30 +314,28 @@ def _export_http_headers(
List of generated file paths List of generated file paths
""" """
filepath = output_dir / f"{port}_http_headers.csv"
rows = [ rows = [
[ [
header["name"], header["name"],
_format_bool(header["is_present"]), format_bool(header["is_present"]),
header.get("value", "-"), header.get("value", "-"),
] ]
for header in headers for header in headers
] ]
csv_headers = _get_headers(db_path, "http_headers") filename = f"{port}_http_headers.csv"
_write_csv(filepath, csv_headers, rows) filepath = exporter.write_csv(filename, "http_headers", rows)
return [str(filepath)] return [filepath]
def _export_compliance_status( def _export_compliance_status(
output_dir: Path, exporter: CSVExporter,
port: int, port: int,
compliance: dict[str, Any], compliance: dict[str, Any],
db_path: str,
) -> list[str]: ) -> list[str]:
"""Export compliance status to CSV. """Export compliance status to CSV.
Args: Args:
output_dir: Output directory path exporter: CSVExporter instance
port: Port number port: Port number
compliance: Compliance data dictionary compliance: Compliance data dictionary
@@ -432,7 +343,6 @@ def _export_compliance_status(
List of generated file paths List of generated file paths
""" """
filepath = output_dir / f"{port}_compliance_status.csv"
rows = [] rows = []
if "cipher_suites_checked" in compliance: if "cipher_suites_checked" in compliance:
@@ -456,32 +366,13 @@ def _export_compliance_status(
) )
if rows: if rows:
headers = _get_headers(db_path, "compliance_status") filename = f"{port}_compliance_status.csv"
_write_csv(filepath, headers, rows) filepath = exporter.write_csv(filename, "compliance_status", rows)
return [str(filepath)] return [filepath]
return [] return []
def _has_tls_support(port_data: dict[str, Any]) -> bool:
"""Check if port has TLS support.
Args:
port_data: Port data dictionary
Returns:
True if port has TLS support
"""
return bool(
port_data.get("cipher_suites")
or port_data.get("supported_groups")
or port_data.get("certificates")
or port_data.get("tls_version"),
)
# Export handlers mapping: (data_key, handler_function)
EXPORT_HANDLERS = ( EXPORT_HANDLERS = (
("cipher_suites", _export_cipher_suites), ("cipher_suites", _export_cipher_suites),
("supported_groups", _export_supported_groups), ("supported_groups", _export_supported_groups),
@@ -515,22 +406,19 @@ def generate_csv_reports(
output_dir_path = Path(output_dir) output_dir_path = Path(output_dir)
output_dir_path.mkdir(parents=True, exist_ok=True) output_dir_path.mkdir(parents=True, exist_ok=True)
exporter = CSVExporter(db_path, output_dir_path)
generated_files = [] generated_files = []
generated_files.extend( generated_files.extend(_export_summary(exporter, data.get("summary", {})))
_export_summary(output_dir_path, data.get("summary", {}), db_path),
)
for port_data in data["ports_data"].values(): for port_data in data["ports_data"].values():
if not _has_tls_support(port_data): if not has_tls_support(port_data):
continue continue
port = port_data["port"] port = port_data["port"]
for data_key, handler_func in EXPORT_HANDLERS: for data_key, handler_func in EXPORT_HANDLERS:
if port_data.get(data_key): if port_data.get(data_key):
generated_files.extend( generated_files.extend(handler_func(exporter, port, port_data[data_key]))
handler_func(output_dir_path, port, port_data[data_key], db_path),
)
return generated_files return generated_files

View File

@@ -0,0 +1,102 @@
"""Utilities for CSV export with header caching and path management."""
import csv
import json
import sqlite3
from pathlib import Path
from typing import Any
class CSVExporter:
"""CSV export helper with header caching and path management."""
def __init__(self, db_path: str, output_dir: Path):
"""Initialize CSV exporter.
Args:
db_path: Path to database file
output_dir: Output directory for CSV files
"""
self.db_path = db_path
self.output_dir = output_dir
self._headers_cache: dict[str, list[str]] = {}
def get_headers(self, export_type: str) -> list[str]:
"""Get CSV headers from database with caching.
Args:
export_type: Type of export (e.g. 'cipher_suites_accepted')
Returns:
List of column headers
"""
if export_type not in self._headers_cache:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute(
"SELECT headers FROM csv_export_metadata WHERE export_type = ?",
(export_type,),
)
row = cursor.fetchone()
conn.close()
if row:
self._headers_cache[export_type] = json.loads(row[0])
else:
raise ValueError(f"No headers found for export_type: {export_type}")
return self._headers_cache[export_type]
def write_csv(
self,
filename: str,
export_type: str,
rows: list[list[Any]],
) -> str:
"""Write data to CSV file with headers from metadata.
Args:
filename: CSV filename
export_type: Type of export for header lookup
rows: List of data rows
Returns:
String path to created file
"""
filepath = self.output_dir / filename
headers = self.get_headers(export_type)
with filepath.open("w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerow(headers)
writer.writerows(rows)
return str(filepath)
def format_bool(
value: bool | None,
true_val: str = "Yes",
false_val: str = "No",
none_val: str = "-",
) -> str:
"""Format boolean value to string representation.
Args:
value: Boolean value to format
true_val: String representation for True
false_val: String representation for False
none_val: String representation for None
Returns:
Formatted string
"""
if value is True:
return true_val
if value is False:
return false_val
return none_val

View File

@@ -7,6 +7,24 @@ from typing import Any
COMPLIANCE_WARNING_THRESHOLD = 50.0 COMPLIANCE_WARNING_THRESHOLD = 50.0
def has_tls_support(port_data: dict[str, Any]) -> bool:
"""Check if port has TLS support based on data presence.
Args:
port_data: Port data dictionary
Returns:
True if port has TLS support
"""
return bool(
port_data.get("cipher_suites")
or port_data.get("supported_groups")
or port_data.get("certificates")
or port_data.get("tls_version")
)
def list_scans(db_path: str) -> list[dict[str, Any]]: def list_scans(db_path: str) -> list[dict[str, Any]]:
"""List all available scans in the database. """List all available scans in the database.

View File

@@ -1,11 +1,13 @@
"""Shared utilities for report template rendering.""" """Shared utilities for report template rendering."""
from datetime import datetime, timezone from datetime import UTC, datetime
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
from jinja2 import Environment, FileSystemLoader, select_autoescape from jinja2 import Environment, FileSystemLoader, select_autoescape
from .query import has_tls_support
def format_tls_version(version: str) -> str: def format_tls_version(version: str) -> str:
"""Format TLS version string for display. """Format TLS version string for display.
@@ -59,7 +61,7 @@ def generate_report_id(metadata: dict[str, Any]) -> str:
dt = datetime.fromisoformat(metadata["timestamp"]) dt = datetime.fromisoformat(metadata["timestamp"])
date_str = dt.strftime("%Y%m%d") date_str = dt.strftime("%Y%m%d")
except (ValueError, KeyError): except (ValueError, KeyError):
date_str = datetime.now(timezone.utc).strftime("%Y%m%d") date_str = datetime.now(UTC).strftime("%Y%m%d")
return f"{date_str}_{metadata['scan_id']}" return f"{date_str}_{metadata['scan_id']}"
@@ -95,13 +97,7 @@ def build_template_context(data: dict[str, Any]) -> dict[str, Any]:
# Filter ports with TLS support for port sections # Filter ports with TLS support for port sections
ports_with_tls = [] ports_with_tls = []
for port_data in data["ports_data"].values(): for port_data in data["ports_data"].values():
has_tls = ( if has_tls_support(port_data):
port_data.get("cipher_suites")
or port_data.get("supported_groups")
or port_data.get("certificates")
or port_data.get("tls_version")
)
if has_tls:
ports_with_tls.append(port_data) ports_with_tls.append(port_data)
return { return {

View File

@@ -1,11 +1,9 @@
"""Module for performing SSL/TLS scans with SSLyze.""" """Module for performing SSL/TLS scans with SSLyze."""
import logging import logging
from datetime import datetime, timezone from datetime import UTC, datetime
from typing import Any from typing import Any
logger = logging.getLogger(__name__)
from sslyze import ( from sslyze import (
ProtocolWithOpportunisticTlsEnum, ProtocolWithOpportunisticTlsEnum,
Scanner, Scanner,
@@ -19,6 +17,8 @@ from sslyze import (
from .protocol_loader import get_protocol_for_port from .protocol_loader import get_protocol_for_port
logger = logging.getLogger(__name__)
def create_scan_request( def create_scan_request(
hostname: str, hostname: str,
@@ -194,7 +194,7 @@ def perform_scan(
continue continue
# Calculate scan duration # Calculate scan duration
scan_end_time = datetime.now(timezone.utc) scan_end_time = datetime.now(UTC)
scan_duration = (scan_end_time - scan_start_time).total_seconds() scan_duration = (scan_end_time - scan_start_time).total_seconds()
# Return first result (we only scan one host) # Return first result (we only scan one host)

View File

@@ -0,0 +1,69 @@
<?xml version='1.0' encoding='UTF-8'?>
<registry xmlns="http://www.iana.org/assignments" id="ikev2-parameters">
<title>Internet Key Exchange Version 2 (IKEv2) Parameters</title>
<created>2005-01-18</created>
<updated>2025-12-03</updated>
<registry id="ikev2-parameters-5">
<title>Transform Type 1 - Encryption Algorithm Transform IDs</title>
<record>
<value>12</value>
<description>ENCR_AES_CBC</description>
<esp>Y</esp>
<ikev2>Y</ikev2>
<xref type="rfc" data="rfc3602"/>
</record>
<record>
<value>20</value>
<description>ENCR_AES_GCM_16</description>
<esp>Y</esp>
<ikev2>Y</ikev2>
<xref type="rfc" data="rfc4106"/>
</record>
<record>
<value>28</value>
<description>ENCR_CHACHA20_POLY1305</description>
<esp>Y</esp>
<ikev2>Y</ikev2>
<xref type="rfc" data="rfc7634"/>
</record>
</registry>
<registry id="ikev2-parameters-8">
<title>Transform Type 4 - Diffie-Hellman Group Transform IDs</title>
<record>
<value>14</value>
<description>2048-bit MODP Group</description>
<status>RECOMMENDED</status>
<xref type="rfc" data="rfc3526"/>
</record>
<record>
<value>19</value>
<description>256-bit random ECP group</description>
<status>RECOMMENDED</status>
<xref type="rfc" data="rfc5903"/>
</record>
<record>
<value>31</value>
<description>Curve25519</description>
<status>RECOMMENDED</status>
<xref type="rfc" data="rfc8031"/>
</record>
</registry>
<registry id="ikev2-parameters-12">
<title>IKEv2 Authentication Method</title>
<record>
<value>1</value>
<description>RSA Digital Signature</description>
<status>DEPRECATED</status>
<xref type="rfc" data="rfc7427"/>
</record>
<record>
<value>14</value>
<description>Digital Signature</description>
<status>RECOMMENDED</status>
<xref type="rfc" data="rfc7427"/>
</record>
</registry>
</registry>

View File

@@ -0,0 +1,96 @@
<?xml version='1.0' encoding='UTF-8'?>
<registry xmlns="http://www.iana.org/assignments" id="tls-parameters">
<title>Transport Layer Security (TLS) Parameters</title>
<category>Transport Layer Security (TLS)</category>
<created>2005-08-23</created>
<updated>2025-12-03</updated>
<registry id="tls-parameters-4">
<title>TLS Cipher Suites</title>
<record>
<value>0x13,0x01</value>
<description>TLS_AES_128_GCM_SHA256</description>
<dtls>Y</dtls>
<rec>Y</rec>
<xref type="rfc" data="rfc8446"/>
</record>
<record>
<value>0x13,0x02</value>
<description>TLS_AES_256_GCM_SHA384</description>
<dtls>Y</dtls>
<rec>Y</rec>
<xref type="rfc" data="rfc8446"/>
</record>
<record>
<value>0x00,0x9C</value>
<description>TLS_RSA_WITH_AES_128_GCM_SHA256</description>
<dtls>Y</dtls>
<rec>N</rec>
<xref type="rfc" data="rfc5288"/>
</record>
<record>
<value>0x00,0x2F</value>
<description>TLS_RSA_WITH_AES_128_CBC_SHA</description>
<dtls>Y</dtls>
<rec>N</rec>
<xref type="rfc" data="rfc5246"/>
</record>
<record>
<value>0x00,0x0A</value>
<description>TLS_RSA_WITH_3DES_EDE_CBC_SHA</description>
<dtls>Y</dtls>
<rec>N</rec>
<xref type="rfc" data="rfc5246"/>
</record>
</registry>
<registry id="tls-parameters-8">
<title>TLS Supported Groups</title>
<record>
<value>23</value>
<description>secp256r1</description>
<dtls>Y</dtls>
<rec>Y</rec>
<xref type="rfc" data="rfc8422"/>
</record>
<record>
<value>24</value>
<description>secp384r1</description>
<dtls>Y</dtls>
<rec>Y</rec>
<xref type="rfc" data="rfc8422"/>
</record>
<record>
<value>29</value>
<description>x25519</description>
<dtls>Y</dtls>
<rec>Y</rec>
<xref type="rfc" data="rfc8446"/>
</record>
</registry>
<registry id="tls-signaturescheme">
<title>TLS SignatureScheme</title>
<record>
<value>0x0403</value>
<description>ecdsa_secp256r1_sha256</description>
<dtls>Y</dtls>
<rec>Y</rec>
<xref type="rfc" data="rfc8446"/>
</record>
<record>
<value>0x0804</value>
<description>rsa_pss_rsae_sha256</description>
<dtls>Y</dtls>
<rec>Y</rec>
<xref type="rfc" data="rfc8446"/>
</record>
<record>
<value>0x0401</value>
<description>rsa_pkcs1_sha256</description>
<dtls>Y</dtls>
<rec>N</rec>
<xref type="rfc" data="rfc8446"/>
</record>
</registry>
</registry>

View File

@@ -1,73 +0,0 @@
"""Tests for compliance checking functionality."""
from datetime import datetime
class TestComplianceChecks:
"""Tests for compliance validation logic."""
def test_check_bsi_validity(self) -> None:
"""Test BSI cipher suite validity checking."""
# Valid BSI-approved cipher suite (not expired)
cipher_suite_valid = {
"name": "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
"iana_recommended": "N",
"bsi_approved": True,
"bsi_valid_until": "2029",
}
# Check that current year is before 2029
current_year = datetime.now().year
assert current_year < 2029, "Test assumes current year < 2029"
# BSI-approved and valid should be compliant
assert cipher_suite_valid["bsi_approved"] is True
assert int(cipher_suite_valid["bsi_valid_until"]) > current_year
# Expired BSI-approved cipher suite
cipher_suite_expired = {
"name": "TLS_OLD_CIPHER",
"iana_recommended": "N",
"bsi_approved": True,
"bsi_valid_until": "2020",
}
# BSI-approved but expired should not be compliant
assert cipher_suite_expired["bsi_approved"] is True
assert int(cipher_suite_expired["bsi_valid_until"]) < current_year
# No BSI data
cipher_suite_no_bsi = {
"name": "TLS_CHACHA20_POLY1305_SHA256",
"iana_recommended": "Y",
"bsi_approved": False,
"bsi_valid_until": None,
}
# Without BSI approval, compliance depends on IANA
assert cipher_suite_no_bsi["bsi_approved"] is False
def test_check_iana_recommendation(self) -> None:
"""Test IANA recommendation checking."""
# IANA recommended cipher suite
cipher_suite_recommended = {
"name": "TLS_AES_256_GCM_SHA384",
"iana_recommended": "Y",
"bsi_approved": True,
"bsi_valid_until": "2031",
}
assert cipher_suite_recommended["iana_recommended"] == "Y"
# IANA not recommended cipher suite
cipher_suite_not_recommended = {
"name": "TLS_RSA_WITH_AES_128_CBC_SHA",
"iana_recommended": "N",
"bsi_approved": False,
"bsi_valid_until": None,
}
assert cipher_suite_not_recommended["iana_recommended"] == "N"
# No IANA data (should default to non-compliant)
cipher_suite_no_iana = {
"name": "TLS_UNKNOWN_CIPHER",
"iana_recommended": None,
"bsi_approved": False,
"bsi_valid_until": None,
}
assert cipher_suite_no_iana["iana_recommended"] is None

202
tests/test_iana_parse.py Normal file
View File

@@ -0,0 +1,202 @@
"""Tests for IANA XML parsing functionality."""
import xml.etree.ElementTree as ET
import pytest
from sslysze_scan.iana_parser import (
extract_field_value,
find_registry,
get_element_text,
is_unassigned,
parse_xml_with_namespace_support,
process_xref_elements,
)
class TestParseXmlWithNamespace:
"""Tests for XML parsing with namespace detection."""
def test_parse_tls_parameters_with_namespace(self) -> None:
"""Test parsing TLS parameters XML with IANA namespace."""
xml_path = "tests/fixtures/iana_xml/tls-parameters-minimal.xml"
root, ns = parse_xml_with_namespace_support(xml_path)
assert root is not None
assert ns is not None
assert "iana" in ns
assert ns["iana"] == "http://www.iana.org/assignments"
def test_parse_nonexistent_file(self) -> None:
"""Test that parsing nonexistent file raises FileNotFoundError."""
with pytest.raises(FileNotFoundError):
parse_xml_with_namespace_support("nonexistent.xml")
class TestFindRegistry:
"""Tests for finding registry by ID."""
def test_find_cipher_suites_registry(self) -> None:
"""Test finding TLS cipher suites registry."""
xml_path = "tests/fixtures/iana_xml/tls-parameters-minimal.xml"
root, ns = parse_xml_with_namespace_support(xml_path)
registry = find_registry(root, "tls-parameters-4", ns)
assert registry is not None
assert registry.get("id") == "tls-parameters-4"
def test_find_nonexistent_registry(self) -> None:
"""Test that finding nonexistent registry raises ValueError."""
xml_path = "tests/fixtures/iana_xml/tls-parameters-minimal.xml"
root, ns = parse_xml_with_namespace_support(xml_path)
with pytest.raises(ValueError, match="Registry .* nicht gefunden"):
find_registry(root, "nonexistent-registry", ns)
class TestGetElementText:
"""Tests for element text extraction."""
def test_get_element_text_with_namespace(self) -> None:
"""Test extracting text from element with namespace."""
xml_path = "tests/fixtures/iana_xml/tls-parameters-minimal.xml"
root, ns = parse_xml_with_namespace_support(xml_path)
registry = find_registry(root, "tls-parameters-4", ns)
records = registry.findall("iana:record", ns)
first_record = records[0]
value = get_element_text(first_record, "value", ns)
assert value == "0x13,0x01"
description = get_element_text(first_record, "description", ns)
assert description == "TLS_AES_128_GCM_SHA256"
def test_get_element_text_nonexistent(self) -> None:
"""Test that nonexistent element returns empty string."""
xml_path = "tests/fixtures/iana_xml/tls-parameters-minimal.xml"
root, ns = parse_xml_with_namespace_support(xml_path)
registry = find_registry(root, "tls-parameters-4", ns)
records = registry.findall("iana:record", ns)
first_record = records[0]
result = get_element_text(first_record, "nonexistent", ns)
assert result == ""
class TestProcessXrefElements:
"""Tests for xref element processing."""
def test_process_single_xref(self) -> None:
"""Test processing single xref element."""
xml_path = "tests/fixtures/iana_xml/tls-parameters-minimal.xml"
root, ns = parse_xml_with_namespace_support(xml_path)
registry = find_registry(root, "tls-parameters-4", ns)
records = registry.findall("iana:record", ns)
first_record = records[0]
xref_str = process_xref_elements(first_record, ns)
assert "rfc:rfc8446" in xref_str
def test_process_no_xref(self) -> None:
"""Test processing record without xref elements."""
xml_str = """
<record xmlns="http://www.iana.org/assignments">
<value>0x13,0x01</value>
<description>Test</description>
</record>
"""
record = ET.fromstring(xml_str)
ns = {"iana": "http://www.iana.org/assignments"}
xref_str = process_xref_elements(record, ns)
assert xref_str == ""
class TestExtractFieldValue:
"""Tests for field value extraction."""
def test_extract_recommended_field(self) -> None:
"""Test extracting Recommended field."""
xml_path = "tests/fixtures/iana_xml/tls-parameters-minimal.xml"
root, ns = parse_xml_with_namespace_support(xml_path)
registry = find_registry(root, "tls-parameters-4", ns)
records = registry.findall("iana:record", ns)
first_record = records[0]
rec = extract_field_value(first_record, "Recommended", ns)
assert rec == "Y"
def test_extract_rfc_draft_field(self) -> None:
"""Test extracting RFC/Draft field via xref processing."""
xml_path = "tests/fixtures/iana_xml/tls-parameters-minimal.xml"
root, ns = parse_xml_with_namespace_support(xml_path)
registry = find_registry(root, "tls-parameters-4", ns)
records = registry.findall("iana:record", ns)
first_record = records[0]
rfc_draft = extract_field_value(first_record, "RFC/Draft", ns)
assert "rfc:rfc8446" in rfc_draft
class TestExtractUpdatedDate:
"""Tests for extracting updated date from XML."""
def test_extract_updated_from_tls_xml(self) -> None:
"""Test extracting updated date from TLS parameters XML."""
xml_path = "tests/fixtures/iana_xml/tls-parameters-minimal.xml"
with open(xml_path, encoding="utf-8") as f:
xml_content = f.read()
lines = xml_content.split("\n")[:10]
updated_line = [line for line in lines if "<updated>" in line]
assert len(updated_line) == 1
assert "2025-12-03" in updated_line[0]
class TestIsUnassigned:
"""Tests for unassigned entry detection."""
def test_is_unassigned_numeric_range(self) -> None:
"""Test detection of numeric range values."""
xml_str = """
<record xmlns="http://www.iana.org/assignments">
<value>42-255</value>
<description>Unassigned</description>
</record>
"""
record = ET.fromstring(xml_str)
ns = {"iana": "http://www.iana.org/assignments"}
assert is_unassigned(record, ns) is True
def test_is_unassigned_hex_range(self) -> None:
"""Test detection of hex range values."""
xml_str = """
<record xmlns="http://www.iana.org/assignments">
<value>0x0000-0x0200</value>
<description>Reserved for backward compatibility</description>
</record>
"""
record = ET.fromstring(xml_str)
ns = {"iana": "http://www.iana.org/assignments"}
assert is_unassigned(record, ns) is True
def test_is_unassigned_false(self) -> None:
"""Test that assigned entries return False."""
xml_path = "tests/fixtures/iana_xml/tls-parameters-minimal.xml"
root, ns = parse_xml_with_namespace_support(xml_path)
registry = find_registry(root, "tls-parameters-4", ns)
records = registry.findall("iana:record", ns)
first_record = records[0]
assert is_unassigned(first_record, ns) is False

324
tests/test_iana_update.py Normal file
View File

@@ -0,0 +1,324 @@
"""Tests for IANA update functionality."""
import sqlite3
import pytest
from sslysze_scan.commands.update_iana import (
calculate_diff,
process_registry_with_validation,
)
from sslysze_scan.iana_validator import ValidationError
class TestCalculateDiff:
"""Tests for diff calculation between old and new data."""
def test_calculate_diff_no_changes(self) -> None:
"""Test diff calculation when data is unchanged."""
rows = [
("0x13,0x01", "TLS_AES_128_GCM_SHA256", "Y", "Y", "rfc8446"),
("0x13,0x02", "TLS_AES_256_GCM_SHA384", "Y", "Y", "rfc8446"),
]
diff = calculate_diff(rows, rows)
assert len(diff["added"]) == 0
assert len(diff["deleted"]) == 0
assert len(diff["modified"]) == 0
def test_calculate_diff_added_rows(self) -> None:
"""Test diff calculation with added rows."""
old_rows = [
("0x13,0x01", "TLS_AES_128_GCM_SHA256", "Y", "Y", "rfc8446"),
]
new_rows = [
("0x13,0x01", "TLS_AES_128_GCM_SHA256", "Y", "Y", "rfc8446"),
("0x13,0x02", "TLS_AES_256_GCM_SHA384", "Y", "Y", "rfc8446"),
]
diff = calculate_diff(old_rows, new_rows)
assert len(diff["added"]) == 1
assert "0x13,0x02" in diff["added"]
assert len(diff["deleted"]) == 0
assert len(diff["modified"]) == 0
def test_calculate_diff_deleted_rows(self) -> None:
"""Test diff calculation with deleted rows."""
old_rows = [
("0x13,0x01", "TLS_AES_128_GCM_SHA256", "Y", "Y", "rfc8446"),
("0x13,0x02", "TLS_AES_256_GCM_SHA384", "Y", "Y", "rfc8446"),
]
new_rows = [
("0x13,0x01", "TLS_AES_128_GCM_SHA256", "Y", "Y", "rfc8446"),
]
diff = calculate_diff(old_rows, new_rows)
assert len(diff["added"]) == 0
assert len(diff["deleted"]) == 1
assert "0x13,0x02" in diff["deleted"]
assert len(diff["modified"]) == 0
def test_calculate_diff_modified_rows(self) -> None:
"""Test diff calculation with modified rows."""
old_rows = [
("0x13,0x01", "TLS_AES_128_GCM_SHA256", "Y", "Y", "rfc8446"),
]
new_rows = [
("0x13,0x01", "TLS_AES_128_GCM_SHA256", "Y", "N", "rfc8446"),
]
diff = calculate_diff(old_rows, new_rows)
assert len(diff["added"]) == 0
assert len(diff["deleted"]) == 0
assert len(diff["modified"]) == 1
assert "0x13,0x01" in diff["modified"]
def test_calculate_diff_mixed_changes(self) -> None:
"""Test diff calculation with mixed changes."""
old_rows = [
("0x13,0x01", "TLS_AES_128_GCM_SHA256", "Y", "Y", "rfc8446"),
("0x13,0x02", "TLS_AES_256_GCM_SHA384", "Y", "Y", "rfc8446"),
("0x00,0x9C", "TLS_RSA_WITH_AES_128_GCM_SHA256", "Y", "N", "rfc5288"),
]
new_rows = [
("0x13,0x01", "TLS_AES_128_GCM_SHA256", "Y", "N", "rfc8446"),
("0x13,0x03", "TLS_CHACHA20_POLY1305_SHA256", "Y", "Y", "rfc8446"),
("0x00,0x9C", "TLS_RSA_WITH_AES_128_GCM_SHA256", "Y", "N", "rfc5288"),
]
diff = calculate_diff(old_rows, new_rows)
assert len(diff["added"]) == 1
assert "0x13,0x03" in diff["added"]
assert len(diff["deleted"]) == 1
assert "0x13,0x02" in diff["deleted"]
assert len(diff["modified"]) == 1
assert "0x13,0x01" in diff["modified"]
class TestProcessRegistryWithValidation:
"""Tests for registry processing with validation."""
def test_process_valid_registry(self, test_db_path: str) -> None:
"""Test processing valid registry data."""
xml_path = "tests/fixtures/iana_xml/tls-parameters-minimal.xml"
with open(xml_path, encoding="utf-8") as f:
xml_content = f.read()
conn = sqlite3.connect(test_db_path)
headers = ["Value", "Description", "DTLS", "Recommended", "RFC/Draft"]
row_count, diff = process_registry_with_validation(
xml_content,
"tls-parameters-4",
"iana_tls_cipher_suites",
headers,
conn,
skip_min_rows_check=True,
)
assert row_count == 5
assert isinstance(diff, dict)
assert "added" in diff
assert "deleted" in diff
assert "modified" in diff
conn.close()
def test_process_registry_invalid_headers(self, test_db_path: str) -> None:
"""Test that invalid headers raise ValidationError."""
xml_path = "tests/fixtures/iana_xml/tls-parameters-minimal.xml"
with open(xml_path, encoding="utf-8") as f:
xml_content = f.read()
conn = sqlite3.connect(test_db_path)
headers = ["Value", "Name"]
with pytest.raises(ValidationError, match="Column count mismatch"):
process_registry_with_validation(
xml_content,
"tls-parameters-4",
"iana_tls_cipher_suites",
headers,
conn,
skip_min_rows_check=True,
)
conn.close()
def test_process_registry_nonexistent_registry_id(self, test_db_path: str) -> None:
"""Test that nonexistent registry ID raises ValueError."""
xml_path = "tests/fixtures/iana_xml/tls-parameters-minimal.xml"
with open(xml_path, encoding="utf-8") as f:
xml_content = f.read()
conn = sqlite3.connect(test_db_path)
headers = ["Value", "Description", "DTLS", "Recommended", "RFC/Draft"]
with pytest.raises(ValueError, match="Registry .* nicht gefunden"):
process_registry_with_validation(
xml_content,
"nonexistent-registry",
"iana_tls_cipher_suites",
headers,
conn,
skip_min_rows_check=True,
)
conn.close()
class TestTransactionHandling:
"""Tests for database transaction handling."""
def test_transaction_rollback_preserves_data(self, test_db_path: str) -> None:
"""Test that rollback preserves original data."""
conn = sqlite3.connect(test_db_path)
cursor = conn.cursor()
original_count = cursor.execute(
"SELECT COUNT(*) FROM iana_tls_cipher_suites"
).fetchone()[0]
conn.execute("BEGIN TRANSACTION")
cursor.execute("DELETE FROM iana_tls_cipher_suites")
after_delete = cursor.execute(
"SELECT COUNT(*) FROM iana_tls_cipher_suites"
).fetchone()[0]
assert after_delete == 0
conn.rollback()
after_rollback = cursor.execute(
"SELECT COUNT(*) FROM iana_tls_cipher_suites"
).fetchone()[0]
assert after_rollback == original_count
conn.close()
def test_transaction_commit_persists_changes(self, test_db_path: str) -> None:
"""Test that commit persists changes."""
conn = sqlite3.connect(test_db_path)
cursor = conn.cursor()
conn.execute("BEGIN TRANSACTION")
cursor.execute(
"""
INSERT INTO iana_tls_cipher_suites
VALUES ('0xFF,0xFF', 'TEST_CIPHER', 'Y', 'N', 'test')
"""
)
conn.commit()
result = cursor.execute(
"""
SELECT COUNT(*) FROM iana_tls_cipher_suites
WHERE value = '0xFF,0xFF'
"""
).fetchone()[0]
assert result == 1
cursor.execute("DELETE FROM iana_tls_cipher_suites WHERE value = '0xFF,0xFF'")
conn.commit()
conn.close()
class TestDiffCalculationEdgeCases:
"""Tests for edge cases in diff calculation."""
def test_calculate_diff_empty_old_rows(self) -> None:
"""Test diff with empty old rows (initial import)."""
old_rows = []
new_rows = [
("0x13,0x01", "TLS_AES_128_GCM_SHA256", "Y", "Y", "rfc8446"),
("0x13,0x02", "TLS_AES_256_GCM_SHA384", "Y", "Y", "rfc8446"),
]
diff = calculate_diff(old_rows, new_rows)
assert len(diff["added"]) == 2
assert len(diff["deleted"]) == 0
assert len(diff["modified"]) == 0
def test_calculate_diff_empty_new_rows(self) -> None:
"""Test diff with empty new rows (complete deletion)."""
old_rows = [
("0x13,0x01", "TLS_AES_128_GCM_SHA256", "Y", "Y", "rfc8446"),
]
new_rows = []
diff = calculate_diff(old_rows, new_rows)
assert len(diff["added"]) == 0
assert len(diff["deleted"]) == 1
assert len(diff["modified"]) == 0
def test_calculate_diff_different_pk_index(self) -> None:
"""Test diff calculation with different primary key index."""
old_rows = [
("desc1", "0x01", "Y"),
("desc2", "0x02", "Y"),
]
new_rows = [
("desc1", "0x01", "N"),
("desc3", "0x03", "Y"),
]
diff = calculate_diff(old_rows, new_rows, pk_index=1)
assert "0x03" in diff["added"]
assert "0x02" in diff["deleted"]
assert "0x01" in diff["modified"]
class TestConsecutiveUpdates:
"""Tests for consecutive IANA updates."""
def test_consecutive_updates_show_no_changes(self, test_db_path: str) -> None:
"""Test that second update with same data shows no changes."""
xml_path = "tests/fixtures/iana_xml/tls-parameters-minimal.xml"
with open(xml_path, encoding="utf-8") as f:
xml_content = f.read()
conn = sqlite3.connect(test_db_path)
headers = ["Value", "Description", "DTLS", "Recommended", "RFC/Draft"]
row_count_1, diff_1 = process_registry_with_validation(
xml_content,
"tls-parameters-4",
"iana_tls_cipher_suites",
headers,
conn,
skip_min_rows_check=True,
)
row_count_2, diff_2 = process_registry_with_validation(
xml_content,
"tls-parameters-4",
"iana_tls_cipher_suites",
headers,
conn,
skip_min_rows_check=True,
)
assert row_count_1 == row_count_2
assert len(diff_2["added"]) == 0
assert len(diff_2["deleted"]) == 0
assert len(diff_2["modified"]) == 0
conn.close()

View File

@@ -0,0 +1,232 @@
"""Tests for IANA data validators."""
import sqlite3
import pytest
from sslysze_scan.iana_validator import (
ValidationError,
get_min_rows,
normalize_header,
validate_cipher_suite_row,
validate_headers,
validate_ikev2_row,
validate_registry_data,
validate_signature_schemes_row,
validate_supported_groups_row,
)
class TestNormalizeHeader:
"""Tests for header normalization."""
def test_normalize_combined(self) -> None:
"""Test combined normalization."""
assert normalize_header("RFC/Draft") == "rfc_draft"
assert normalize_header("Recommended") == "recommended"
class TestValidateHeaders:
"""Tests for header validation against database schema."""
def test_validate_headers_matching(self, test_db_path: str) -> None:
"""Test that correct headers pass validation."""
conn = sqlite3.connect(test_db_path)
headers = ["Value", "Description", "DTLS", "Recommended", "RFC/Draft"]
validate_headers("iana_tls_cipher_suites", headers, conn)
conn.close()
def test_validate_headers_mismatch_count(self, test_db_path: str) -> None:
"""Test that wrong number of columns raises error."""
conn = sqlite3.connect(test_db_path)
headers = ["Value", "Description"]
with pytest.raises(ValidationError, match="Column count mismatch"):
validate_headers("iana_tls_cipher_suites", headers, conn)
conn.close()
def test_validate_headers_mismatch_name(self, test_db_path: str) -> None:
"""Test that wrong column name raises error."""
conn = sqlite3.connect(test_db_path)
headers = ["Value", "Name", "DTLS", "Recommended", "RFC/Draft"]
with pytest.raises(ValidationError, match="Column .* mismatch"):
validate_headers("iana_tls_cipher_suites", headers, conn)
conn.close()
class TestCipherSuiteValidation:
"""Tests for cipher suite data validation."""
def test_valid_cipher_suite(self) -> None:
"""Test that valid cipher suite passes."""
row = {
"value": "0x13,0x01",
"description": "TLS_AES_128_GCM_SHA256",
"dtls": "Y",
"recommended": "Y",
"rfc_draft": "rfc: rfc8446",
}
validate_cipher_suite_row(row)
def test_missing_required_field(self) -> None:
"""Test that missing value field raises error."""
row = {"description": "TLS_AES_128_GCM_SHA256"}
with pytest.raises(ValidationError, match="Missing required field"):
validate_cipher_suite_row(row)
def test_invalid_value_format(self) -> None:
"""Test that invalid value format raises error."""
row = {
"value": "1301",
"description": "TLS_AES_128_GCM_SHA256",
}
with pytest.raises(ValidationError, match="Invalid value format"):
validate_cipher_suite_row(row)
def test_invalid_recommended_value(self) -> None:
"""Test that invalid Recommended value raises error."""
row = {
"value": "0x13,0x01",
"description": "TLS_AES_128_GCM_SHA256",
"recommended": "X",
}
with pytest.raises(ValidationError, match="Invalid Recommended value"):
validate_cipher_suite_row(row)
def test_empty_recommended_valid(self) -> None:
"""Test that empty Recommended field is valid."""
row = {
"value": "0x13,0x01",
"description": "TLS_AES_128_GCM_SHA256",
"recommended": "",
}
validate_cipher_suite_row(row)
class TestSupportedGroupsValidation:
"""Tests for supported groups data validation."""
def test_valid_supported_group(self) -> None:
"""Test that valid supported group passes."""
row = {
"value": "23",
"description": "secp256r1",
"recommended": "Y",
}
validate_supported_groups_row(row)
def test_invalid_value_non_numeric(self) -> None:
"""Test that non-numeric value raises error."""
row = {"value": "0x17", "description": "secp256r1"}
with pytest.raises(ValidationError, match="Value must be numeric"):
validate_supported_groups_row(row)
class TestSignatureSchemesValidation:
"""Tests for signature schemes data validation."""
def test_valid_signature_scheme(self) -> None:
"""Test that valid signature scheme passes."""
row = {
"value": "0x0403",
"description": "ecdsa_secp256r1_sha256",
"recommended": "Y",
}
validate_signature_schemes_row(row)
def test_invalid_value_format(self) -> None:
"""Test that invalid value format raises error."""
row = {"value": "0403", "description": "ecdsa_secp256r1_sha256"}
with pytest.raises(ValidationError, match="Invalid value format"):
validate_signature_schemes_row(row)
class TestIKEv2Validation:
"""Tests for IKEv2 data validation."""
def test_valid_ikev2_row(self) -> None:
"""Test that valid IKEv2 row passes."""
row = {
"value": "12",
"description": "ENCR_AES_CBC",
"esp": "Y",
"ikev2": "Y",
}
validate_ikev2_row(row)
def test_invalid_value_non_numeric(self) -> None:
"""Test that non-numeric value raises error."""
row = {"value": "0x0C", "description": "ENCR_AES_CBC"}
with pytest.raises(ValidationError, match="Value must be numeric"):
validate_ikev2_row(row)
class TestGetMinRows:
"""Tests for minimum row count lookup."""
def test_get_min_rows_unknown_table(self) -> None:
"""Test that unknown tables return default minimum."""
assert get_min_rows("iana_unknown_table") == 5
class TestValidateRegistryData:
"""Tests for complete registry data validation."""
def test_validate_registry_sufficient_rows(self) -> None:
"""Test that sufficient rows pass validation."""
rows = [
{
"value": f"0x13,0x{i:02x}",
"description": f"Cipher_{i}",
"dtls": "Y",
"recommended": "Y",
"rfc_draft": "rfc: rfc8446",
}
for i in range(60)
]
validate_registry_data("iana_tls_cipher_suites", rows)
def test_validate_registry_insufficient_rows(self) -> None:
"""Test that insufficient rows raise error."""
rows = [
{
"value": "0x13,0x01",
"description": "Cipher_1",
"dtls": "Y",
"recommended": "Y",
"rfc_draft": "rfc: rfc8446",
}
]
with pytest.raises(ValidationError, match="Insufficient data"):
validate_registry_data("iana_tls_cipher_suites", rows)
def test_validate_registry_invalid_row(self) -> None:
"""Test that invalid row in dataset raises error."""
rows = [
{
"value": f"0x13,0x{i:02x}",
"description": f"Cipher_{i}",
"dtls": "Y",
"recommended": "Y",
"rfc_draft": "rfc: rfc8446",
}
for i in range(60)
]
rows[30]["value"] = "invalid"
with pytest.raises(ValidationError, match="Row 31"):
validate_registry_data("iana_tls_cipher_suites", rows)
def test_validate_registry_no_validator(self) -> None:
"""Test that tables without validator pass basic validation."""
rows = [{"value": "1", "description": f"Item_{i}"} for i in range(10)]
validate_registry_data("iana_tls_alerts", rows)

View File

@@ -5,21 +5,10 @@ from typing import Any
from sslysze_scan.reporter.template_utils import ( from sslysze_scan.reporter.template_utils import (
build_template_context, build_template_context,
format_tls_version,
generate_report_id, generate_report_id,
) )
class TestFormatTlsVersion:
"""Tests for format_tls_version function."""
def test_format_tls_version_all_versions(self) -> None:
"""Test formatting all known TLS versions."""
versions = ["1.0", "1.1", "1.2", "1.3", "ssl_3.0", "unknown"]
expected = ["TLS 1.0", "TLS 1.1", "TLS 1.2", "TLS 1.3", "SSL 3.0", "unknown"]
assert [format_tls_version(v) for v in versions] == expected
class TestGenerateReportId: class TestGenerateReportId:
"""Tests for generate_report_id function.""" """Tests for generate_report_id function."""