Files
compliance-scan/src/sslysze_scan/iana_validator.py
2025-12-19 20:10:39 +01:00

228 lines
6.1 KiB
Python

"""Validation functions for IANA registry data."""
import sqlite3
class ValidationError(Exception):
    """Signal that IANA registry data did not pass validation."""
def normalize_header(header: str) -> str:
    """Convert a config header name into its database column spelling.

    Args:
        header: Header name from JSON config

    Returns:
        The header lowercased, with every "/" turned into "_"
    """
    lowered = header.lower()
    # Splitting on "/" and re-joining with "_" substitutes every slash.
    return "_".join(lowered.split("/"))
def validate_headers(
    table_name: str,
    headers: list[str],
    db_conn: sqlite3.Connection,
) -> None:
    """Validate that headers match database schema.

    The headers are normalized with ``normalize_header`` and compared, in
    order, against the column names SQLite reports for ``table_name``.

    Args:
        table_name: Database table name
        headers: Headers from JSON config
        db_conn: Database connection

    Raises:
        ValidationError: If the table does not exist, or if the headers
            differ from the table's columns in count, name or order
    """
    cursor = db_conn.cursor()
    # Bind the table name as a parameter instead of interpolating it into
    # the statement: the table-valued form of the pragma (SQLite >= 3.16)
    # accepts placeholders, so unusual or hostile names cannot alter the SQL.
    cursor.execute(
        "SELECT name FROM pragma_table_info(?) ORDER BY cid",
        (table_name,),
    )
    db_columns = [row[0] for row in cursor.fetchall()]

    # SQLite reports no columns for an unknown table; say so explicitly
    # rather than emitting a confusing "expected 0" count mismatch.
    if not db_columns:
        raise ValidationError(f"Table {table_name} does not exist in database")

    normalized_headers = [normalize_header(h) for h in headers]
    if len(normalized_headers) != len(db_columns):
        raise ValidationError(
            f"Column count mismatch for {table_name}: "
            f"expected {len(db_columns)}, got {len(normalized_headers)}"
        )

    # Lengths are equal here, so zip() cannot silently drop a column.
    for i, (expected, actual) in enumerate(zip(db_columns, normalized_headers)):
        if expected != actual:
            raise ValidationError(
                f"Column {i} mismatch for {table_name}: "
                f"expected '{expected}', got '{actual}' "
                f"(from header '{headers[i]}')"
            )
def validate_cipher_suite_row(row: dict[str, str]) -> None:
    """Check one cipher suite record.

    Args:
        row: Dictionary with column names as keys

    Raises:
        ValidationError: If "value" or "description" is missing or empty,
            if "value" does not begin with "0x", or if a non-empty
            "recommended" entry is not one of "Y", "N" or "D"
    """
    for name in ("value", "description"):
        # A missing key and an empty value are rejected alike.
        if not row.get(name):
            raise ValidationError(f"Missing required field: {name}")

    code_point = row["value"]
    has_hex_prefix = code_point.startswith("0x")
    if not has_hex_prefix:
        raise ValidationError(f"Invalid value format: {code_point}")

    flag = row.get("recommended", "")
    if not flag:
        # The recommended marker is optional.
        return
    if flag not in ("Y", "N", "D"):
        raise ValidationError(f"Invalid Recommended value: {flag}")
def validate_supported_groups_row(row: dict[str, str]) -> None:
    """Check one supported groups record.

    Args:
        row: Dictionary with column names as keys

    Raises:
        ValidationError: If "value" or "description" is missing or empty,
            if "value" cannot be parsed by ``int()``, or if a non-empty
            "recommended" entry is not one of "Y", "N" or "D"
    """
    for name in ("value", "description"):
        # A missing key and an empty value are rejected alike.
        if not row.get(name):
            raise ValidationError(f"Missing required field: {name}")

    raw_value = row["value"]
    try:
        int(raw_value)
    except ValueError as exc:
        raise ValidationError(f"Value must be numeric: {raw_value}") from exc

    flag = row.get("recommended", "")
    if not flag:
        # The recommended marker is optional.
        return
    if flag not in ("Y", "N", "D"):
        raise ValidationError(f"Invalid Recommended value: {flag}")
def validate_signature_schemes_row(row: dict[str, str]) -> None:
    """Check one signature schemes record.

    Args:
        row: Dictionary with column names as keys

    Raises:
        ValidationError: If "value" or "description" is missing or empty,
            or if "value" does not begin with "0x"
    """
    for name in ("value", "description"):
        # A missing key and an empty value are rejected alike.
        if not row.get(name):
            raise ValidationError(f"Missing required field: {name}")

    code_point = row["value"]
    has_hex_prefix = code_point.startswith("0x")
    if not has_hex_prefix:
        raise ValidationError(f"Invalid value format: {code_point}")
def validate_ikev2_row(row: dict[str, str]) -> None:
    """Check one IKEv2 record (encryption, DH groups, auth methods).

    Args:
        row: Dictionary with column names as keys

    Raises:
        ValidationError: If "value" or "description" is missing or empty,
            or if "value" cannot be parsed by ``int()``
    """
    for name in ("value", "description"):
        # A missing key and an empty value are rejected alike.
        if not row.get(name):
            raise ValidationError(f"Missing required field: {name}")

    raw_value = row["value"]
    try:
        int(raw_value)
    except ValueError as exc:
        raise ValidationError(f"Value must be numeric: {raw_value}") from exc
# Per-table row validators, keyed by database table name. Tables that do not
# appear here get no row-level validation in validate_registry_data.
VALIDATORS = {
    "iana_tls_cipher_suites": validate_cipher_suite_row,
    "iana_tls_supported_groups": validate_supported_groups_row,
    "iana_tls_signature_schemes": validate_signature_schemes_row,
    # Every IKEv2 table shares the same row validator.
    **dict.fromkeys(
        (
            "iana_ikev2_encryption_algorithms",
            "iana_ikev2_dh_groups",
            "iana_ikev2_authentication_methods",
            "iana_ikev2_prf_algorithms",
            "iana_ikev2_integrity_algorithms",
        ),
        validate_ikev2_row,
    ),
}
# Minimum number of rows expected per table, keyed by database table name.
MIN_ROWS = dict(
    # TLS tables
    iana_tls_cipher_suites=50,
    iana_tls_signature_schemes=10,
    iana_tls_supported_groups=10,
    iana_tls_alerts=10,
    iana_tls_content_types=5,
    # IKEv2 tables
    iana_ikev2_encryption_algorithms=10,
    iana_ikev2_prf_algorithms=5,
    iana_ikev2_integrity_algorithms=5,
    iana_ikev2_dh_groups=10,
    iana_ikev2_authentication_methods=5,
)
def get_min_rows(table_name: str) -> int:
    """Look up the minimum row count expected for a table.

    Args:
        table_name: Database table name

    Returns:
        The table's entry in MIN_ROWS, or 5 when the table has no entry
    """
    if table_name in MIN_ROWS:
        return MIN_ROWS[table_name]
    # Fallback threshold for tables without an explicit entry.
    return 5
def validate_registry_data(
    table_name: str,
    rows: list[dict[str, str]],
    skip_min_rows_check: bool = False,
) -> None:
    """Run all registry checks on a table's rows before they are written.

    First enforces the table's minimum row count (unless skipped), then runs
    the table's row validator from VALIDATORS, if one is registered, on each
    row in order. Validation stops at the first failing row.

    Args:
        table_name: Database table name
        rows: List of row dictionaries
        skip_min_rows_check: Skip minimum rows validation (for tests)

    Raises:
        ValidationError: If there are too few rows, or if a row is rejected
            by the table's validator (the message carries the 1-based row
            number)
    """
    if not skip_min_rows_check:
        required = get_min_rows(table_name)
        actual = len(rows)
        if actual < required:
            raise ValidationError(
                f"Insufficient data for {table_name}: "
                f"{actual} rows (expected >= {required})"
            )

    check_row = VALIDATORS.get(table_name)
    if check_row:
        # Rows are numbered from 1 in error messages.
        for position, record in enumerate(rows, start=1):
            try:
                check_row(record)
            except ValidationError as exc:
                raise ValidationError(
                    f"Row {position} in {table_name}: {exc}"
                ) from exc