"""Tests for detecting duplicate entries in compliance checks."""

import sqlite3
from contextlib import closing
from datetime import UTC, datetime

from sslysze_scan.db.compliance import check_compliance
from sslysze_scan.db.writer import write_scan_results

# Item names that appear more than once in ``scan_compliance_status`` for a
# given scan and check type on port 443. Parameters: (scan_id, check_type).
_DUPLICATE_STATUS_SQL = """
    SELECT item_name, COUNT(*) as count
    FROM scan_compliance_status
    WHERE scan_id = ? AND port = 443 AND check_type = ?
    GROUP BY item_name
    HAVING count > 1
"""

# Number of distinct item names in ``scan_compliance_status`` for a given
# scan and check type on port 443. Parameters: (scan_id, check_type).
_DISTINCT_STATUS_COUNT_SQL = """
    SELECT COUNT(DISTINCT item_name)
    FROM scan_compliance_status
    WHERE scan_id = ? AND port = 443 AND check_type = ?
"""


def _write_scan_and_check(db_path, scan_results):
    """Write ``scan_results`` for example.com:443 and run the compliance check.

    Args:
        db_path: Database location handed to ``write_scan_results``,
            ``check_compliance`` and ``sqlite3.connect``.
        scan_results: Mapping of port to scan data, passed through unchanged.

    Returns:
        The scan id returned by ``write_scan_results``.
    """
    scan_id = write_scan_results(
        db_path=db_path,
        hostname="example.com",
        ports=[443],
        scan_results=scan_results,
        scan_start_time=datetime.now(UTC),
        scan_duration=1.0,
    )
    check_compliance(db_path, scan_id)
    return scan_id


def _fetch_all(db_path, sql, params):
    """Run ``sql`` with ``params`` against ``db_path`` and return all rows.

    The connection is wrapped in ``closing`` so it is released even when the
    query raises; a bare ``sqlite3`` connection used as a context manager
    only manages the transaction and does not close the connection.
    """
    with closing(sqlite3.connect(db_path)) as conn:
        return conn.execute(sql, params).fetchall()


def _fetch_scalar(db_path, sql, params):
    """Return the first column of the first row produced by ``sql``."""
    return _fetch_all(db_path, sql, params)[0][0]


def _duplicate_status_rows(db_path, scan_id, check_type):
    """Return ``(item_name, count)`` rows recorded more than once for ``check_type``."""
    return _fetch_all(db_path, _DUPLICATE_STATUS_SQL, (scan_id, check_type))


def _distinct_status_count(db_path, scan_id, check_type):
    """Return the number of distinct compliance item names for ``check_type``."""
    return _fetch_scalar(db_path, _DISTINCT_STATUS_COUNT_SQL, (scan_id, check_type))


def test_compliance_no_duplicate_cipher_suite_checks(test_db_path):
    """Test that each cipher suite is checked only once per port in compliance."""
    db_path = test_db_path

    # Create scan results with cipher suites tested across multiple TLS versions
    scan_results = {
        443: {
            "cipher_suites": [
                # Same cipher suite in multiple TLS versions
                ("TLS 1.0", "TLS_RSA_WITH_AES_128_CBC_SHA", False),
                ("TLS 1.1", "TLS_RSA_WITH_AES_128_CBC_SHA", False),
                ("TLS 1.2", "TLS_RSA_WITH_AES_128_CBC_SHA", True),
                ("TLS 1.2", "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384", True),
                ("TLS 1.3", "TLS_AES_256_GCM_SHA384", True),
            ],
            "supported_groups": ["secp256r1"],
            "certificates": [
                {
                    "subject": "CN=example.com",
                    "key_type": "RSA",
                    "key_bits": 2048,
                    "signature_algorithm": "sha256WithRSAEncryption",
                }
            ],
        }
    }

    scan_id = _write_scan_and_check(db_path, scan_results)

    # Query compliance status for cipher suites
    duplicates = _duplicate_status_rows(db_path, scan_id, "cipher_suite")

    assert not duplicates, (
        f"Found duplicate cipher suite checks: {duplicates}. "
        "Each cipher suite should only be checked once per port."
    )


def test_compliance_no_duplicate_supported_group_checks(test_db_path):
    """Test that each supported group is checked only once per port in compliance."""
    db_path = test_db_path

    scan_results = {
        443: {
            "cipher_suites": [
                ("TLS 1.2", "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384", True),
            ],
            "supported_groups": [
                "secp256r1",
                "secp384r1",
                "secp521r1",
            ],
            "certificates": [],
        }
    }

    scan_id = _write_scan_and_check(db_path, scan_results)

    duplicates = _duplicate_status_rows(db_path, scan_id, "supported_group")

    assert not duplicates, (
        f"Found duplicate supported group checks: {duplicates}. "
        "Each group should only be checked once per port."
    )


def test_compliance_no_duplicate_certificate_checks(test_db_path):
    """Test that each certificate is checked only once per port in compliance."""
    db_path = test_db_path

    scan_results = {
        443: {
            "cipher_suites": [
                ("TLS 1.2", "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384", True),
            ],
            "supported_groups": ["secp256r1"],
            "certificates": [
                {
                    "subject": "CN=example.com",
                    "key_type": "RSA",
                    "key_bits": 2048,
                    "signature_algorithm": "sha256WithRSAEncryption",
                },
                {
                    "subject": "CN=Root CA",
                    "key_type": "RSA",
                    "key_bits": 4096,
                    "signature_algorithm": "sha256WithRSAEncryption",
                },
            ],
        }
    }

    scan_id = _write_scan_and_check(db_path, scan_results)

    duplicates = _duplicate_status_rows(db_path, scan_id, "certificate")

    assert not duplicates, (
        f"Found duplicate certificate checks: {duplicates}. "
        "Each certificate should only be checked once per port."
    )


def test_compliance_count_matches_unique_scan_data(test_db_path):
    """Test that compliance check count matches unique items in scan data."""
    db_path = test_db_path

    scan_results = {
        443: {
            "cipher_suites": [
                ("TLS 1.0", "TLS_RSA_WITH_AES_128_CBC_SHA", False),
                ("TLS 1.1", "TLS_RSA_WITH_AES_128_CBC_SHA", False),
                ("TLS 1.2", "TLS_RSA_WITH_AES_128_CBC_SHA", True),
                ("TLS 1.2", "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384", True),
                ("TLS 1.3", "TLS_AES_256_GCM_SHA384", True),
            ],
            "supported_groups": ["secp256r1", "secp384r1"],
            "certificates": [
                {
                    "subject": "CN=example.com",
                    "key_type": "RSA",
                    "key_bits": 2048,
                    "signature_algorithm": "sha256WithRSAEncryption",
                }
            ],
        }
    }

    scan_id = _write_scan_and_check(db_path, scan_results)

    # Count unique cipher suites in scan data
    unique_cipher_suites = _fetch_scalar(
        db_path,
        """
        SELECT COUNT(DISTINCT cipher_suite_name)
        FROM scan_cipher_suites
        WHERE scan_id = ? AND port = 443
        """,
        (scan_id,),
    )

    # Count cipher suite compliance checks
    compliance_cipher_suites = _distinct_status_count(db_path, scan_id, "cipher_suite")

    # Count unique groups in scan data
    unique_groups = _fetch_scalar(
        db_path,
        """
        SELECT COUNT(DISTINCT group_name)
        FROM scan_supported_groups
        WHERE scan_id = ? AND port = 443
        """,
        (scan_id,),
    )

    # Count group compliance checks
    compliance_groups = _distinct_status_count(db_path, scan_id, "supported_group")

    assert unique_cipher_suites == compliance_cipher_suites, (
        f"Mismatch: {unique_cipher_suites} unique cipher suites in scan data, "
        f"but {compliance_cipher_suites} compliance checks"
    )
    assert unique_groups == compliance_groups, (
        f"Mismatch: {unique_groups} unique groups in scan data, "
        f"but {compliance_groups} compliance checks"
    )


def test_csv_export_no_duplicates(test_db_path):
    """Test that CSV exports contain no duplicate rows for same cipher suite."""
    db_path = test_db_path

    scan_results = {
        443: {
            "cipher_suites": [
                ("TLS 1.0", "TLS_RSA_WITH_AES_128_CBC_SHA", False),
                ("TLS 1.1", "TLS_RSA_WITH_AES_128_CBC_SHA", False),
                ("TLS 1.2", "TLS_RSA_WITH_AES_128_CBC_SHA", True),
            ],
            "supported_groups": ["secp256r1", "secp384r1"],
            "certificates": [],
        }
    }

    scan_id = _write_scan_and_check(db_path, scan_results)

    # Query compliance views used for CSV export
    cipher_duplicates = _fetch_all(
        db_path,
        """
        SELECT cipher_suite_name, COUNT(*) as count
        FROM v_compliance_tls_cipher_suites
        WHERE scan_id = ? AND port = 443
        GROUP BY cipher_suite_name
        HAVING count > 1
        """,
        (scan_id,),
    )

    group_duplicates = _fetch_all(
        db_path,
        """
        SELECT group_name, COUNT(*) as count
        FROM v_compliance_tls_supported_groups
        WHERE scan_id = ? AND port = 443
        GROUP BY group_name
        HAVING count > 1
        """,
        (scan_id,),
    )

    assert not cipher_duplicates, (
        f"Found duplicate cipher suites in CSV view: {cipher_duplicates}"
    )
    assert not group_duplicates, (
        f"Found duplicate groups in CSV view: {group_duplicates}"
    )