From 2b27138b2a32ccc46ee71b36bff3fc5a34e15344 Mon Sep 17 00:00:00 2001
From: Heiko
Date: Tue, 20 Jan 2026 19:40:29 +0100
Subject: [PATCH] refactor: split large functions and reduce code duplication
---
docs/detailed-guide.md | 18 ++-
src/sslysze_scan/db/writer.py | 254 +++++++++++++++++++---------------
2 files changed, 156 insertions(+), 116 deletions(-)
diff --git a/docs/detailed-guide.md b/docs/detailed-guide.md
index 74fac1f..65112a9 100644
--- a/docs/detailed-guide.md
+++ b/docs/detailed-guide.md
@@ -321,12 +321,18 @@ tests/ ### Database Writing -| Function | Module | Purpose | -| ---------------------------------------------------------------------------- | ------------------ | --------------------------------------- | -| `save_scan_results(db_path, hostname, ports, results, start_time, duration)` | `db/writer.py` | Store all scan results, returns scan_id | -| `check_compliance(db_path, scan_id)` | `db/compliance.py` | Validate compliance, returns statistics | -| `check_schema_version(db_path)` | `db/schema.py` | Verify schema compatibility | -| `get_schema_version(db_path)` | `db/schema.py` | Get current schema version | +| Function | Module | Purpose | +| -------------------------------------------------------------------------------------- | ------------------ | ----------------------------------------------- | +| `save_scan_results(db_path, hostname, ports, results, start_time, duration)` | `db/writer.py` | Store all scan results, returns scan_id | +| `check_compliance(db_path, scan_id)` | `db/compliance.py` | Validate compliance, returns statistics | +| `check_schema_version(db_path)` | `db/schema.py` | Verify schema compatibility | +| `get_schema_version(db_path)` | `db/schema.py` | Get current schema version | +| `_save_session_features(cursor, scan_id, port, scan_result)` | `db/writer.py` | Save session renegotiation and resumption data | +| `_save_session_renegotiation(cursor, scan_id, port, renegotiation_result)` | `db/writer.py` | Save session renegotiation data | +| `_save_session_resumption(cursor, scan_id, port, resumption_result)` | `db/writer.py` | Save session resumption data | +| `_extract_resumption_data(resumption_result)` | `db/writer.py` | Extract session resumption data from result | +| `_save_cipher_suites(cursor, scan_id, port, scan_result, tls_version)` | `db/writer.py` | Save cipher suites for specific TLS version | +| `_save_cipher_suite_list(cursor, scan_id, port, tls_version, cipher_suites, accepted)` | `db/writer.py` | Helper 
function to save a list of cipher suites | ### Database Querying diff --git a/src/sslysze_scan/db/writer.py b/src/sslysze_scan/db/writer.py index c2fadd8..4a696a0 100644 --- a/src/sslysze_scan/db/writer.py +++ b/src/sslysze_scan/db/writer.py @@ -252,8 +252,32 @@ def _save_cipher_suites( if not cipher_result: return - # Insert accepted cipher suites - for accepted_cipher in cipher_result.accepted_cipher_suites: + # Save accepted and rejected cipher suites + _save_cipher_suite_list( + cursor, scan_id, port, tls_version, cipher_result.accepted_cipher_suites, True + ) + + if hasattr(cipher_result, "rejected_cipher_suites"): + _save_cipher_suite_list( + cursor, + scan_id, + port, + tls_version, + cipher_result.rejected_cipher_suites, + False, + ) + + +def _save_cipher_suite_list( + cursor: sqlite3.Cursor, + scan_id: int, + port: int, + tls_version: str, + cipher_suites: list, + accepted: bool, +) -> None: + """Helper function to save a list of cipher suites.""" + for cipher in cipher_suites: cursor.execute( """ INSERT INTO scan_cipher_suites ( 
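Review bot: The new `_save_cipher_suite_list` takes `accepted` as its sixth positional argument, so both call sites in `_save_cipher_suites` end in a bare `True` / `False` that decides whether a suite is stored as accepted by the server. A swapped or misplaced argument there would silently invert the stored scan result. Making the flag keyword-only keeps the call sites readable: put `*,` before `accepted: bool,` in the signature and call with `accepted=True` and `accepted=False`. The docstring could also state what the flag means, e.g. `"""Insert one scan_cipher_suites row per suite; accepted records whether the server accepted it."""`. The helper's loop body continues in the following hunk, and the start of `_save_cipher_suites` lies outside this one.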
@@ -265,36 +289,14 @@ def _save_cipher_suites( scan_id, port, tls_version, - accepted_cipher.cipher_suite.name, - True, + cipher.cipher_suite.name, + accepted, None, # IANA value mapping would go here - accepted_cipher.cipher_suite.key_size, - accepted_cipher.cipher_suite.is_anonymous, + cipher.cipher_suite.key_size, + cipher.cipher_suite.is_anonymous, ), ) - # Insert rejected cipher suites (if available) - if hasattr(cipher_result, "rejected_cipher_suites"): - for rejected_cipher in cipher_result.rejected_cipher_suites: - cursor.execute( - """ - INSERT INTO scan_cipher_suites ( - scan_id, port, tls_version, cipher_suite_name, accepted, - iana_value, key_size, is_anonymous - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) - """, - ( - scan_id, - port, - tls_version, - rejected_cipher.cipher_suite.name, - False, - None, - rejected_cipher.cipher_suite.key_size, - rejected_cipher.cipher_suite.is_anonymous, - ), - ) - def _save_supported_groups( cursor: sqlite3.Cursor, 
@@ -726,96 +728,128 @@ def _save_session_features( # Session Renegotiation renegotiation_attempt = scan_result.scan_result.session_renegotiation if renegotiation_attempt.status == ScanCommandAttemptStatusEnum.COMPLETED: - renegotiation_result = renegotiation_attempt.result - if renegotiation_result: - client_initiated = ( - hasattr(renegotiation_result, "is_client_renegotiation_supported") - and renegotiation_result.is_client_renegotiation_supported - ) - secure = ( - hasattr(renegotiation_result, "supports_secure_renegotiation") - and renegotiation_result.supports_secure_renegotiation - ) - cursor.execute( - """ - INSERT INTO scan_session_features ( - scan_id, port, feature_type, client_initiated, secure, - session_id_supported, ticket_supported, - attempted_resumptions, successful_resumptions, details - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, - ( - scan_id, - port, - "session_renegotiation", - client_initiated, - secure, - None, - None, - None, - None, - None, - ), - ) + _save_session_renegotiation(cursor, scan_id, port, renegotiation_attempt.result) # Session Resumption resumption_attempt = scan_result.scan_result.session_resumption if resumption_attempt.status == ScanCommandAttemptStatusEnum.COMPLETED: - resumption_result = resumption_attempt.result - if resumption_result: - session_id_supported = False - ticket_supported = False - attempted = 0 - successful = 0 + _save_session_resumption(cursor, scan_id, port, resumption_attempt.result) - if hasattr(resumption_result, "session_id_resumption_result"): - session_id_resumption = resumption_result.session_id_resumption_result - if session_id_resumption: - session_id_supported = ( - hasattr( - session_id_resumption, - "is_session_id_resumption_supported", - ) - and session_id_resumption.is_session_id_resumption_supported - ) - if hasattr(session_id_resumption, "attempted_resumptions_count"): - attempted += session_id_resumption.attempted_resumptions_count - if hasattr(session_id_resumption, 
"successful_resumptions_count"): - successful += session_id_resumption.successful_resumptions_count - if hasattr(resumption_result, "tls_ticket_resumption_result"): - ticket_resumption = resumption_result.tls_ticket_resumption_result - if ticket_resumption: - ticket_supported = ( - hasattr(ticket_resumption, "is_tls_ticket_resumption_supported") - and ticket_resumption.is_tls_ticket_resumption_supported - ) - if hasattr(ticket_resumption, "attempted_resumptions_count"): - attempted += ticket_resumption.attempted_resumptions_count - if hasattr(ticket_resumption, "successful_resumptions_count"): - successful += ticket_resumption.successful_resumptions_count +def _save_session_renegotiation( + cursor: sqlite3.Cursor, + scan_id: int, + port: int, + renegotiation_result: Any, +) -> None: + """Save session renegotiation data.""" + if not renegotiation_result: + return - cursor.execute( - """ - INSERT INTO scan_session_features ( - scan_id, port, feature_type, client_initiated, secure, - session_id_supported, ticket_supported, - attempted_resumptions, successful_resumptions, details - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, - ( - scan_id, - port, - "session_resumption", - None, - None, - session_id_supported, - ticket_supported, - attempted, - successful, - None, - ), + client_initiated = ( + hasattr(renegotiation_result, "is_client_renegotiation_supported") + and renegotiation_result.is_client_renegotiation_supported + ) + secure = ( + hasattr(renegotiation_result, "supports_secure_renegotiation") + and renegotiation_result.supports_secure_renegotiation + ) + + cursor.execute( + """ + INSERT INTO scan_session_features ( + scan_id, port, feature_type, client_initiated, secure, + session_id_supported, ticket_supported, + attempted_resumptions, successful_resumptions, details + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ """, + ( + scan_id, + port, + "session_renegotiation", + client_initiated, + secure, + None, + None, + None, + None, + None, + ), + ) + + +def _save_session_resumption( + cursor: sqlite3.Cursor, + scan_id: int, + port: int, + resumption_result: Any, +) -> None: + """Save session resumption data.""" + if not resumption_result: + return + + session_id_supported, ticket_supported, attempted, successful = ( + _extract_resumption_data(resumption_result) + ) + + cursor.execute( + """ + INSERT INTO scan_session_features ( + scan_id, port, feature_type, client_initiated, secure, + session_id_supported, ticket_supported, + attempted_resumptions, successful_resumptions, details + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + scan_id, + port, + "session_resumption", + None, + None, + session_id_supported, + ticket_supported, + attempted, + successful, + None, + ), + ) + + +def _extract_resumption_data(resumption_result: Any) -> tuple[bool, bool, int, int]: + """Extract session resumption data from result.""" + session_id_supported = False + ticket_supported = False + attempted = 0 + successful = 0 + + if hasattr(resumption_result, "session_id_resumption_result"): + session_id_resumption = resumption_result.session_id_resumption_result + if session_id_resumption: + session_id_supported = ( + hasattr( + session_id_resumption, + "is_session_id_resumption_supported", + ) + and session_id_resumption.is_session_id_resumption_supported ) + if hasattr(session_id_resumption, "attempted_resumptions_count"): + attempted += session_id_resumption.attempted_resumptions_count + if hasattr(session_id_resumption, "successful_resumptions_count"): + successful += session_id_resumption.successful_resumptions_count + + if hasattr(resumption_result, "tls_ticket_resumption_result"): + ticket_resumption = resumption_result.tls_ticket_resumption_result + if ticket_resumption: + ticket_supported = ( + hasattr(ticket_resumption, "is_tls_ticket_resumption_supported") + and 
ticket_resumption.is_tls_ticket_resumption_supported + ) + if hasattr(ticket_resumption, "attempted_resumptions_count"): + attempted += ticket_resumption.attempted_resumptions_count + if hasattr(ticket_resumption, "successful_resumptions_count"): + successful += ticket_resumption.successful_resumptions_count + + return session_id_supported, ticket_supported, attempted, successful def _save_http_headers(
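Review bot: In `_save_session_renegotiation`, `client_initiated` and `secure` are built as `hasattr(...) and ...`, so a result object that lacks the attribute is stored as `False`, the same value as a measured "not supported". The refactor carries this pattern over unchanged, and `_extract_resumption_data` does the same with its `False` / `0` defaults. For `client_initiated`, `False` is the favourable reading, so a renamed or missing field would be recorded as a pass rather than as unknown. Storing NULL when the attribute is absent keeps the two cases apart (the resumption insert already writes `None` into both columns): `client_initiated = getattr(renegotiation_result, "is_client_renegotiation_supported", None)` and `secure = getattr(renegotiation_result, "supports_secure_renegotiation", None)`. Whether the nested `is_session_id_resumption_supported` and `attempted_resumptions_count` lookups match the result objects of the sslyze version in use is not visible here and is worth confirming, since a mismatch would leave the resumption row at `False` / `0` without raising any error.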