Skip to content

Instantly share code, notes, and snippets.

@philerooski
Created November 20, 2025 23:58
Show Gist options
  • Select an option

  • Save philerooski/e2cb0bb380fd48ea18805aa7e9602d50 to your computer and use it in GitHub Desktop.

Select an option

Save philerooski/e2cb0bb380fd48ea18805aa7e9602d50 to your computer and use it in GitHub Desktop.
"""
Analyze errors from the LOAD_LOG table.
This script queries the LOAD_LOG table for failed operations,
categorizes errors by type, and groups data types by error category.
This is a complementary script to https://gist.github.com/philerooski/a740b25f066f1ad205344637160aa969
"""
import snowflake.connector
from collections import defaultdict
import re
def categorize_error(error_message: str) -> str:
    """
    Categorize an error message into a high-level issue type.

    Matching is case-insensitive substring search and the rules are
    ordered: the FIRST matching rule wins, so more specific patterns
    must appear before broader ones (e.g. "infer_schema" before the
    generic "schema" rule, since the former contains the latter).

    Args:
        error_message: The error message from LOAD_LOG (may be None or empty)

    Returns:
        A high-level category for the error
    """
    if not error_message:
        return "UNKNOWN"
    error_lower = error_message.lower()
    # UTF-8 / Encoding errors
    if "utf8" in error_lower or "utf-8" in error_lower or "encoding" in error_lower:
        return "ENCODING_ERROR"
    # Infer schema errors. Checked BEFORE the generic schema rule: "schema"
    # is a substring of "infer_schema", so the generic rule would otherwise
    # always win and make this category unreachable.
    if "infer_schema" in error_lower:
        return "INFER_SCHEMA_ERROR"
    # Schema mismatch errors. Parentheses make the precedence explicit:
    # "schema" alone matches, or "column" together with "mismatch"
    # (Python's `and` binds tighter than `or`).
    if "schema" in error_lower or (
        "column" in error_lower and "mismatch" in error_lower
    ):
        return "SCHEMA_MISMATCH"
    # File format errors
    if "parquet" in error_lower and (
        "corrupt" in error_lower or "invalid" in error_lower
    ):
        return "CORRUPT_PARQUET"
    # Permission errors
    if (
        "permission" in error_lower
        or "access denied" in error_lower
        or "unauthorized" in error_lower
    ):
        return "PERMISSION_ERROR"
    # File not found
    if "file not found" in error_lower or "does not exist" in error_lower:
        return "FILE_NOT_FOUND"
    # Data type conversion errors. NOTE(review): the bare "type" substring
    # is broad and may absorb messages meant for later rules.
    if "conversion" in error_lower or "cast" in error_lower or "type" in error_lower:
        return "DATA_TYPE_ERROR"
    # Null constraint violations
    if "null" in error_lower and (
        "constraint" in error_lower or "not null" in error_lower
    ):
        return "NULL_CONSTRAINT"
    # Timeout errors
    if "timeout" in error_lower or "timed out" in error_lower:
        return "TIMEOUT"
    # Stage errors
    if "stage" in error_lower and "not found" in error_lower:
        return "STAGE_NOT_FOUND"
    # Generic SQL errors
    if "sql compilation error" in error_lower:
        return "SQL_COMPILATION_ERROR"
    return "OTHER"
def _validated_identifier(name):
    """Return *name* if it is a valid unquoted Snowflake identifier, else raise.

    Guards against SQL injection: USE statements cannot take bound
    parameters, so the name is interpolated into SQL text directly.
    """
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_$]*", name):
        raise ValueError(f"Invalid Snowflake identifier: {name!r}")
    return name


def _print_category_summary(error_categories):
    """Print one summary line per error category with its affected data types."""
    print("=" * 80)
    print("ERROR ANALYSIS SUMMARY")
    print("=" * 80)
    print()
    for category in sorted(error_categories):
        errors_in_category = error_categories[category]
        data_types = {e["data_type"] for e in errors_in_category}
        print(f"📋 {category}")
        print(f" Count: {len(errors_in_category)} error(s)")
        print(
            f" Affected Data Types ({len(data_types)}): {', '.join(sorted(data_types))}"
        )
        print()


def _print_detailed_breakdown(error_categories):
    """Print per-category, per-data-type detail (first error message of each)."""
    print("=" * 80)
    print("DETAILED ERROR BREAKDOWN")
    print("=" * 80)
    print()
    for category in sorted(error_categories):
        print(f"\n{'─' * 80}")
        print(f"Category: {category}")
        print(f"{'─' * 80}")
        # Group this category's errors by data type.
        by_data_type = defaultdict(list)
        for error in error_categories[category]:
            by_data_type[error["data_type"]].append(error)
        for data_type in sorted(by_data_type):
            data_type_errors = by_data_type[data_type]
            print(
                f"\n Data Type: {data_type} ({len(data_type_errors)} occurrence(s))"
            )
            # Show only the first error's details; the rest are counted.
            first_error = data_type_errors[0]
            print(f" Phase: {first_error['phase']}")
            print(f" Prefix: {first_error['prefix']}")
            # Truncate long messages so the report stays readable.
            error_msg = first_error["error_message"]
            if error_msg and len(error_msg) > 200:
                error_msg = error_msg[:200] + "..."
            print(f" Error: {error_msg}")
            if len(data_type_errors) > 1:
                print(f" (+ {len(data_type_errors) - 1} more occurrence(s))")
    print("\n" + "=" * 80)


def _print_summary_statistics(errors, error_categories):
    """Print overall error counts and an errors-by-phase breakdown."""
    print("\nSUMMARY STATISTICS")
    print("=" * 80)
    print(f"Total Errors: {len(errors)}")
    # Row layout: (PREFIX, DATA_TYPE, STAGE_PATH, PHASE, ERROR_MESSAGE, LOG_TS)
    print(f"Unique Data Types with Errors: {len({row[1] for row in errors})}")
    print(f"Error Categories: {len(error_categories)}")
    print()
    phase_counts = defaultdict(int)
    for row in errors:
        phase_counts[row[3]] += 1
    print("Errors by Phase:")
    for phase in sorted(phase_counts):
        print(f" {phase}: {phase_counts[phase]}")
    print("\n" + "=" * 80)


def analyze_load_errors(database=None, schema=None):
    """
    Analyze errors from the LOAD_LOG table.

    Connects to Snowflake (credentials/account come from the connector's
    default configuration), queries LOAD_LOG for rows with STATUS = 'FAIL',
    groups them by error category (see categorize_error), and prints a
    summary, a detailed breakdown, and overall statistics.

    Args:
        database: Optional database name to use (unquoted identifier)
        schema: Optional schema name to use (unquoted identifier)

    Raises:
        ValueError: If database/schema is not a valid unquoted identifier.
    """
    conn = snowflake.connector.connect()
    cursor = None  # defined before try so `finally` never hits an unbound name
    try:
        cursor = conn.cursor()
        # Set database and schema if provided; identifiers are validated
        # because USE statements cannot use bound parameters.
        if database:
            cursor.execute(f"USE DATABASE {_validated_identifier(database)}")
        if schema:
            cursor.execute(f"USE SCHEMA {_validated_identifier(schema)}")
        # Query for all failed operations
        print("Querying LOAD_LOG for errors...")
        cursor.execute(
            """
            SELECT
                PREFIX,
                DATA_TYPE,
                STAGE_PATH,
                PHASE,
                ERROR_MESSAGE,
                LOG_TS
            FROM LOAD_LOG
            WHERE STATUS = 'FAIL'
            ORDER BY LOG_TS
            """
        )
        errors = cursor.fetchall()
        if not errors:
            print("\n✓ No errors found in LOAD_LOG!")
            return
        print(f"\nFound {len(errors)} error(s) in LOAD_LOG\n")
        # Bucket each failed row under its high-level error category.
        error_categories = defaultdict(list)
        for prefix, data_type, stage_path, phase, error_msg, log_ts in errors:
            error_categories[categorize_error(error_msg)].append(
                {
                    "prefix": prefix,
                    "data_type": data_type,
                    "stage_path": stage_path,
                    "phase": phase,
                    "error_message": error_msg,
                    "timestamp": log_ts,
                }
            )
        _print_category_summary(error_categories)
        _print_detailed_breakdown(error_categories)
        _print_summary_statistics(errors, error_categories)
    finally:
        if cursor is not None:
            cursor.close()
        conn.close()
if __name__ == "__main__":
analyze_load_errors()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment