Created
November 20, 2025 23:58
-
-
Save philerooski/e2cb0bb380fd48ea18805aa7e9602d50 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Analyze errors from the LOAD_LOG table. | |
| This script queries the LOAD_LOG table for failed operations, | |
| categorizes errors by type, and groups data types by error category. | |
| This is a complementary script to https://gist.github.com/philerooski/a740b25f066f1ad205344637160aa969 | |
| """ | |
| import snowflake.connector | |
| from collections import defaultdict | |
| import re | |
def categorize_error(error_message: str) -> str:
    """
    Categorize an error message into a high-level issue type.

    Checks run top to bottom; the first matching pattern wins.

    Args:
        error_message: The error message from LOAD_LOG

    Returns:
        A high-level category for the error: "UNKNOWN" for an empty/None
        message, "OTHER" when no known pattern matches, otherwise one of
        the named categories below.
    """
    if not error_message:
        return "UNKNOWN"
    error_lower = error_message.lower()
    # UTF-8 / Encoding errors
    if "utf8" in error_lower or "utf-8" in error_lower or "encoding" in error_lower:
        return "ENCODING_ERROR"
    # Schema mismatch errors.
    # BUG FIX: the original condition was
    #     "schema" in e or "column" in e and "mismatch" in e
    # which, because `and` binds tighter than `or`, parsed as
    #     "schema" in e or ("column" in e and "mismatch" in e)
    # -- so ANY message containing "schema" (including "infer_schema"
    # errors) landed here, making the INFER_SCHEMA_ERROR branch below
    # unreachable. Parenthesized to require "mismatch" in both cases.
    if ("schema" in error_lower or "column" in error_lower) and (
        "mismatch" in error_lower
    ):
        return "SCHEMA_MISMATCH"
    # File format errors
    if "parquet" in error_lower and (
        "corrupt" in error_lower or "invalid" in error_lower
    ):
        return "CORRUPT_PARQUET"
    # Permission errors
    if (
        "permission" in error_lower
        or "access denied" in error_lower
        or "unauthorized" in error_lower
    ):
        return "PERMISSION_ERROR"
    # File not found
    if "file not found" in error_lower or "does not exist" in error_lower:
        return "FILE_NOT_FOUND"
    # Data type conversion errors
    if "conversion" in error_lower or "cast" in error_lower or "type" in error_lower:
        return "DATA_TYPE_ERROR"
    # Null constraint violations
    if "null" in error_lower and (
        "constraint" in error_lower or "not null" in error_lower
    ):
        return "NULL_CONSTRAINT"
    # Timeout errors
    if "timeout" in error_lower or "timed out" in error_lower:
        return "TIMEOUT"
    # Stage errors
    if "stage" in error_lower and "not found" in error_lower:
        return "STAGE_NOT_FOUND"
    # Infer schema errors (reachable now that the schema-mismatch check
    # above also requires "mismatch" in the message)
    if "infer_schema" in error_lower:
        return "INFER_SCHEMA_ERROR"
    # Generic SQL errors
    if "sql compilation error" in error_lower:
        return "SQL_COMPILATION_ERROR"
    return "OTHER"
def analyze_load_errors(database=None, schema=None):
    """
    Analyze errors from the LOAD_LOG table.

    Queries LOAD_LOG for rows with STATUS = 'FAIL', categorizes each error
    message with categorize_error(), and prints: a per-category summary, a
    detailed per-data-type breakdown, and overall statistics.

    Args:
        database: Optional database name to use
        schema: Optional schema name to use

    Raises:
        ValueError: If database or schema is not a plain SQL identifier.
            USE statements cannot be parameterized, so this guards the
            f-string interpolation below against SQL injection.
    """
    _validate_identifier("database", database)
    _validate_identifier("schema", schema)
    # Connect to Snowflake (connection parameters come from the default
    # connector configuration / environment, as in the original script)
    conn = snowflake.connector.connect()
    # BUG FIX: cursor was previously assigned inside the try block but
    # closed unconditionally in finally -- if conn.cursor() raised, the
    # finally clause hit a NameError. Initialize to None and guard.
    cursor = None
    try:
        cursor = conn.cursor()
        # Set database and schema if provided (identifiers validated above)
        if database:
            cursor.execute(f"USE DATABASE {database}")
        if schema:
            cursor.execute(f"USE SCHEMA {schema}")
        # Query for all failed operations
        print("Querying LOAD_LOG for errors...")
        cursor.execute(
            """
            SELECT
                PREFIX,
                DATA_TYPE,
                STAGE_PATH,
                PHASE,
                ERROR_MESSAGE,
                LOG_TS
            FROM LOAD_LOG
            WHERE STATUS = 'FAIL'
            ORDER BY LOG_TS
            """
        )
        errors = cursor.fetchall()
        if not errors:
            print("\n✓ No errors found in LOAD_LOG!")
            return
        print(f"\nFound {len(errors)} error(s) in LOAD_LOG\n")
        error_categories = _categorize_rows(errors)
        _print_summary(error_categories)
        _print_details(error_categories)
        _print_statistics(errors, error_categories)
    finally:
        if cursor is not None:
            cursor.close()
        conn.close()


def _validate_identifier(label, value):
    """Raise ValueError unless value is None or a plain SQL identifier."""
    if value is not None and not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_$]*", value):
        raise ValueError(f"Invalid {label} identifier: {value!r}")


def _categorize_rows(errors):
    """Group LOAD_LOG rows into {category: [error-detail dict, ...]}."""
    error_categories = defaultdict(list)
    for prefix, data_type, stage_path, phase, error_msg, log_ts in errors:
        category = categorize_error(error_msg)
        error_categories[category].append(
            {
                "prefix": prefix,
                "data_type": data_type,
                "stage_path": stage_path,
                "phase": phase,
                "error_message": error_msg,
                "timestamp": log_ts,
            }
        )
    return error_categories


def _print_summary(error_categories):
    """Print the per-category error count and affected data types."""
    print("=" * 80)
    print("ERROR ANALYSIS SUMMARY")
    print("=" * 80)
    print()
    for category in sorted(error_categories.keys()):
        errors_in_category = error_categories[category]
        # Deduplicate data types before reporting
        data_types = list({e["data_type"] for e in errors_in_category})
        print(f"📋 {category}")
        print(f" Count: {len(errors_in_category)} error(s)")
        print(
            f" Affected Data Types ({len(data_types)}): {', '.join(sorted(data_types))}"
        )
        print()


def _print_details(error_categories):
    """Print each category's errors grouped by data type."""
    print("=" * 80)
    print("DETAILED ERROR BREAKDOWN")
    print("=" * 80)
    print()
    for category in sorted(error_categories.keys()):
        errors_in_category = error_categories[category]
        print(f"\n{'─' * 80}")
        print(f"Category: {category}")
        print(f"{'─' * 80}")
        # Group by data type within this category
        by_data_type = defaultdict(list)
        for error in errors_in_category:
            by_data_type[error["data_type"]].append(error)
        for data_type in sorted(by_data_type.keys()):
            data_type_errors = by_data_type[data_type]
            print(
                f"\n Data Type: {data_type} ({len(data_type_errors)} occurrence(s))"
            )
            # Show details for the first error only; the rest are summarized
            first_error = data_type_errors[0]
            print(f" Phase: {first_error['phase']}")
            print(f" Prefix: {first_error['prefix']}")
            # Truncate long error messages to keep the report readable
            error_msg = first_error["error_message"]
            if error_msg and len(error_msg) > 200:
                error_msg = error_msg[:200] + "..."
            print(f" Error: {error_msg}")
            if len(data_type_errors) > 1:
                print(f" (+ {len(data_type_errors) - 1} more occurrence(s))")
    print("\n" + "=" * 80)


def _print_statistics(errors, error_categories):
    """Print overall totals and the per-phase error breakdown."""
    print("\nSUMMARY STATISTICS")
    print("=" * 80)
    total_errors = len(errors)
    # Row layout: (prefix, data_type, stage_path, phase, error_msg, log_ts)
    unique_data_types = len({e[1] for e in errors})
    unique_categories = len(error_categories)
    print(f"Total Errors: {total_errors}")
    print(f"Unique Data Types with Errors: {unique_data_types}")
    print(f"Error Categories: {unique_categories}")
    print()
    # Phase breakdown
    phase_counts = defaultdict(int)
    for _, _, _, phase, _, _ in errors:
        phase_counts[phase] += 1
    print("Errors by Phase:")
    for phase in sorted(phase_counts.keys()):
        print(f" {phase}: {phase_counts[phase]}")
    print("\n" + "=" * 80)
# Script entry point: run the analysis against the connector's default
# database/schema (no explicit override).
if __name__ == "__main__":
    analyze_load_errors()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment