Emergency recovery tool for Claude Code session files
#!/usr/bin/env python3
"""claude_code_recovery.py - Emergency recovery tool for Claude Code session files
Copyright (c) 2025 Sean Chittenden
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
================================================================================
ABOUT THIS TOOL:
This recovery tool exists because Claude "helpfully" decided:
```
⏺ Now I need to update the wipe operation to remove the parent directory (not just pgdata), but still check for the safety file in the pgdata subdirectory:
```
and changed:
```
os.RemoveAll(p.PgData) // Remove data directory
```
to:
```
os.RemoveAll(parentDir) // Remove parent data directory (which contains pgdata subdirectory)
```
...which promptly deleted the entire project directory including 20 days of
un-pushed work and the .git directory. Oops. 🤦
Fortunately, Claude Code keeps detailed JSONL session files of all interactions,
including file contents, edits, and changes. This tool reconstructs your lost
files by:
1. Finding the latest snapshot of each file (from Write operations)
2. Applying all subsequent edits chronologically
3. Extracting additional context from chat messages
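At its core, step 2 is a chronological string-replacement loop over the
snapshot; a simplified sketch of what the phase-2 code below does:

    content = latest_snapshot[path].content
    for edit in sorted(edits_after_snapshot, key=lambda e: e.timestamp):
        content = content.replace(edit.old_string, edit.new_string,
                                  -1 if edit.replace_all else 1)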
The recovered files aren't perfect - some recent changes may be missing if
they weren't captured in the session files, and complex MultiEdit operations
may fail partway through. But it's infinitely better than losing everything,
and it helps structure the recovery effort so you can get back on your feet
sooner rather than later.
Remember kids: Always push your commits and keep backups. And maybe don't let
an AI suggest filesystem operations on parent directories. 😅
USAGE:
python3 claude_code_recovery.py --jsonl-dir ~/.claude/projects/YOUR_PROJECT_PATH --output-dir recovered_files
The JSONL files are typically stored in:
~/.claude/projects/[project-path]/
Each session is stored as a UUID.jsonl file containing all interactions from that session.
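Each line is one standalone JSON object. An illustrative, simplified shape
based on the fields this tool reads (actual schemas vary by Claude Code
version):

    {"type": "assistant", "uuid": "...", "parentUuid": "...",
     "sessionId": "...", "timestamp": "2025-09-26T19:47:00Z", "version": "...",
     "message": {"role": "assistant", "content": [{"type": "tool_use",
       "name": "Edit", "input": {"file_path": "...", "old_string": "...",
       "new_string": "..."}}]},
     "toolUseResult": {"originalFile": {"filePath": "...", "content": "..."}}}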
OPTIONS:
--jsonl-dir DIR Directory containing JSONL files (required)
--output-dir DIR Output directory for recovered files (default: recovered_files)
--dry-run Show what would be recovered without writing files
--filter PATTERN Only recover files matching regex pattern
--verbose Show detailed progress
--interactive Interactively resolve edit conflicts
--use-earliest-snapshot Use earliest snapshot instead of latest (applies more edits)
--no-save-rejects Don't save rejected edits to separate files
--chat-dir DIR Output directory for chat messages (default: output-dir/chat)
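EXAMPLES (illustrative; the project path segment typically mirrors your
project's filesystem path with slashes replaced by dashes):
# Dry run first to see what would be recovered:
python3 claude_code_recovery.py --jsonl-dir ~/.claude/projects/-Users-yourname-myproject --dry-run
# Recover only Go files, with verbose progress and interactive conflict resolution:
python3 claude_code_recovery.py --jsonl-dir ~/.claude/projects/-Users-yourname-myproject --filter '\.go$' --verbose --interactive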
WHAT GETS RECOVERED:
- Files from 'Write' operations (full snapshots)
- Files modified via 'Edit' and 'MultiEdit' operations
- Git commit messages from chat
- File content shown in 'Read' operations (as reference)
WHAT MIGHT BE MISSING:
- Very recent changes not yet in session files
- Files only viewed but never edited
- Binary files (images, compiled code, etc.)
- Files created outside of Claude Code
ERROR HANDLING:
- Tool execution errors are tracked in tool_errors.md
- Failed edits are documented in edit_failures.md
- Unknown entry types are logged in unknown_entries.md
- Rejected edits can be saved for manual review
This tool is provided as-is, with apologies for its existence being necessary.
May your future commits be frequent and your backups plentiful. 🙏
"""
import json
import re
import argparse
from pathlib import Path
from datetime import datetime, timezone
from collections import defaultdict, Counter
from typing import Dict, List, Optional, Set, Tuple, Any
from dataclasses import dataclass
import sys
@dataclass
class FileSnapshot:
"""Represents a complete file snapshot from an originalFile entry"""
file_path: str
content: str
timestamp: datetime
session_id: str
uuid: str
source_file: str = "unknown"
source_line: int = 0
@dataclass
class EditOperation:
"""Represents an edit operation to apply to a file"""
file_path: str
old_string: str
new_string: str
timestamp: datetime
session_id: str
uuid: str
tool_name: str = 'Edit'
replace_all: bool = False
source_file: str = "unknown"
source_line: int = 0
assistant_message: str = "" # Store assistant's reasoning/message
@dataclass
class ChatMessage:
"""Represents a chat message"""
role: str
content: str
timestamp: datetime
session_id: str
uuid: str
commit_message: Optional[str] = None
def semver_sort_key(version_str):
"""Convert version string to tuple for proper sorting"""
try:
parts = version_str.split('.')
return tuple(int(p) for p in parts)
except (ValueError, AttributeError):
return (0, 0, 0) # Default for unparseable versions
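# Example (illustrative): string sorting puts "1.0.9" after "1.0.44", but
# sorted(["1.0.44", "1.0.9"], key=semver_sort_key) correctly yields
# ["1.0.9", "1.0.44"] because it compares (1, 0, 9) < (1, 0, 44).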
class JSONLRecoveryTool:
def __init__(self, jsonl_dir: str, output_dir: str = "recovered_files",
chat_dir: Optional[str] = None,
dry_run: bool = False, verbose: bool = False,
interactive: bool = False, use_earliest_snapshot: bool = False,
save_rejects: bool = True):
self.jsonl_dir = Path(jsonl_dir)
self.output_dir = Path(output_dir)
self.chat_dir = Path(chat_dir) if chat_dir else self.output_dir / "chat"
self.dry_run = dry_run
self.verbose = verbose
self.interactive = interactive
self.use_earliest_snapshot = use_earliest_snapshot
self.save_rejects = save_rejects
# Add reject directory
self.reject_dir = self.output_dir / "rejected_edits"
# Storage for recovered data
self.file_snapshots: Dict[str, FileSnapshot] = {}
self.file_edits: Dict[str, List[EditOperation]] = defaultdict(list)
self.file_content_entries: Dict[str, List[dict]] = defaultdict(list) # Track file content from toolUseResult.file
self.chat_messages: List[ChatMessage] = []
self.recovered_files: Dict[str, str] = {}
self.entries_after_recovery: Dict[str, List[dict]] = defaultdict(list) # Track entries after file recovery
# UUID relationship tracking
self.uuid_graph: Dict[str, dict] = {} # Track parent-child relationships
# Track unknown entries
self.unknown_entries: List[dict] = [] # Entries we don't know how to process
self.unknown_entry_types: Counter = Counter() # Count types of unknown entries
# Track tool errors
self.tool_errors: List[dict] = [] # Store all tool errors
self.tool_error_types: Counter = Counter() # Count by error type
self.tool_rejections: int = 0 # Count of user rejections
# Statistics
self.stats = {
'sessions': 0,
'snapshots': 0,
'edits_collected': 0,
'edits_skipped': 0, # Edits that occurred before snapshot
'edits_applicable': 0, # Edits that occurred after snapshot
'edits_successful': 0,
'edits_failed': 0,
'conflicts': 0,
'messages': 0,
'commits': 0,
'file_reads': 0,
'entries': 0,
'entries_processed': 0, # Entries we handled
'entries_unknown': 0, # Entries we don't know how to handle
'entries_benign': 0, # Benign entries (summary, system)
'entries_after_recovery': 0, # Entries that came after file recovery
}
def log(self, message: str, level: str = "INFO"):
"""Log message with timestamp"""
timestamp = datetime.now().strftime("%H:%M:%S")
print(f"[{timestamp}] [{level}] {message}")
def find_jsonl_files(self) -> List[Path]:
"""Find all JSONL files in the directory"""
jsonl_files = list(self.jsonl_dir.glob("*.jsonl"))
self.log(f"Found {len(jsonl_files)} JSONL files in {self.jsonl_dir}")
return sorted(jsonl_files)
def find_approximate_line(self, content: str, search_str: str, context_chars: int = 50) -> int:
"""Find approximate line number where string might be expected"""
lines = content.split('\n')
search_lower = search_str.lower()[:context_chars]
for i, line in enumerate(lines, 1):
if search_lower in line.lower():
return i
# If not found, return middle of file
return len(lines) // 2
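# Example (illustrative): for content "foo\nbar baz\nqux", searching for
# "BAR" returns line 2; an unmatched search falls back to the middle line.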
def handle_failed_edit(self, file_path: str, edit: EditOperation, content: str, reason: str):
"""Handle a failed edit operation"""
self.stats['edits_failed'] += 1
self.stats['conflicts'] += 1
if self.interactive:
print(f"\n{'='*60}")
print(f"EDIT CONFLICT in {file_path}")
print(f"Reason: {reason}")
print(f"Timestamp: {edit.timestamp}")
print(f"Looking for:")
print(edit.old_string[:200] + "..." if len(edit.old_string) > 200 else edit.old_string)
print("\nOptions:")
print("1. Skip this edit")
print("2. Show full old_string")
print("3. Show current file content")
print("4. Apply anyway (force)")
choice = input("Choice [1-4, default=1]: ").strip()
if choice == '2':
print("\nFull old_string:")
print(edit.old_string)
return self.handle_failed_edit(file_path, edit, content, reason)
elif choice == '3':
print("\nCurrent file content:")
print(content[:1000] + "..." if len(content) > 1000 else content)
return self.handle_failed_edit(file_path, edit, content, reason)
elif choice == '4':
return content.replace(edit.old_string, edit.new_string, 1 if not edit.replace_all else -1)
return content
def save_reject_file(self, edit: EditOperation, file_path: str, reason: str):
"""Save rejected edit to a file for manual review"""
if not self.save_rejects or self.dry_run:
return
self.reject_dir.mkdir(parents=True, exist_ok=True)
# Create a unique filename for this reject
timestamp_str = edit.timestamp.strftime("%Y%m%d_%H%M%S")
safe_filename = re.sub(r'[^a-zA-Z0-9_-]', '_', Path(file_path).name)
reject_file = self.reject_dir / f"{timestamp_str}_{safe_filename}.reject.md"
with open(reject_file, 'w') as f:
f.write(f"# Rejected Edit\n\n")
f.write(f"**File**: {file_path}\n")
f.write(f"**Timestamp**: {edit.timestamp}\n")
f.write(f"**Reason**: {reason}\n")
f.write(f"**Tool**: {edit.tool_name}\n")
f.write(f"**Source**: {edit.source_file}:{edit.source_line}\n\n")
if edit.assistant_message:
f.write(f"## Assistant Message\n")
f.write(f"{edit.assistant_message}\n\n")
f.write(f"## Old String (looking for this)\n")
f.write("```\n")
f.write(edit.old_string)
f.write("\n```\n\n")
f.write(f"## New String (wanted to change to)\n")
f.write("```\n")
f.write(edit.new_string)
f.write("\n```\n")
def parse_jsonl_entry(self, entry: dict, source_file: str = "unknown", source_line: int = 0,
previous_assistant_message: str = "") -> str:
"""Parse a single JSONL entry and extract relevant information.
Returns the assistant message if this entry contains one, otherwise returns empty string."""
current_assistant_message = ""
entry_handled = False
try:
if not isinstance(entry, dict):
self.log(f"ERROR: Entry is not a dict at {source_file}:{source_line}, it's a {type(entry)}", "ERROR")
self.stats['entries_unknown'] += 1
return ""
# Handle timestamp parsing with better error handling
timestamp = None
timestamp_str = entry.get('timestamp')
if timestamp_str:
try:
# Handle both formats: with and without timezone
if timestamp_str.endswith('Z'):
timestamp_str = timestamp_str[:-1] + '+00:00'
timestamp = datetime.fromisoformat(timestamp_str)
except (ValueError, AttributeError):
self.log(f"Warning: Invalid timestamp format at {source_file}:{source_line}: {timestamp_str}", "WARN")
# Fall back to an aware UTC timestamp so later comparisons with
# parsed (timezone-aware) timestamps don't raise TypeError
timestamp = datetime.now(timezone.utc)
else:
timestamp = datetime.now(timezone.utc)
session_id = entry.get('sessionId', 'unknown')
uuid = entry.get('uuid', 'unknown')
version = entry.get('version', 'unknown')
# Track Claude Code versions
if version != 'unknown':
if not hasattr(self, 'versions_seen'):
self.versions_seen = Counter()
self.versions_seen[version] += 1
# Track assistant messages for context
if entry.get('type') == 'assistant' and 'message' in entry:
message = entry.get('message', {})
if isinstance(message, dict) and 'content' in message:
content_items = message.get('content', [])
if isinstance(content_items, list):
for item in content_items:
if isinstance(item, dict) and item.get('type') == 'text':
text = item.get('text', '')
if text:
current_assistant_message = text
break
# Track UUID relationships
if uuid and uuid != 'unknown':
self.uuid_graph[uuid] = entry
if 'parentUuid' in entry:
parent_uuid = entry['parentUuid']
# Track parent/child relationship
entry['_parent'] = parent_uuid
# Check for tool results (edits, writes, etc.)
if 'toolUseResult' in entry:
tool_result = entry['toolUseResult']
# Only process if it's a dict (not an error string)
if isinstance(tool_result, dict):
self.process_tool_result(tool_result, timestamp, session_id, uuid, source_file, source_line, previous_assistant_message)
entry_handled = True
# Also check for file content in toolUseResult.file
if 'file' in tool_result and 'content' in tool_result['file']:
file_path = tool_result['file'].get('filePath')
if file_path:
content_entry = {
'file_path': file_path,
'content': tool_result['file']['content'],
'timestamp': timestamp,
'uuid': uuid,
'parent_uuid': entry.get('parentUuid'),
'source': f"{source_file}:{source_line}",
'type': 'file_read'
}
self.file_content_entries[file_path].append(content_entry)
self.stats['file_reads'] += 1
self.log(f"Found file content for {file_path} in toolUseResult.file at {timestamp}")
# Check if this is after the file was already recovered
if file_path in self.recovered_files:
self.entries_after_recovery[file_path].append({
'timestamp': timestamp,
'type': 'file_read_after_recovery',
'source': f"{source_file}:{source_line}"
})
self.stats['entries_after_recovery'] += 1
# Check for tool errors (user role with error content)
tool_error_found = False
if entry.get('type') == 'user' and 'toolUseResult' in entry:
tool_result = entry['toolUseResult']
if isinstance(tool_result, str) and tool_result.startswith('Error:'):
self.log(f"DEBUG: Found toolUseResult error at {source_file}:{source_line}", "DEBUG")
self.process_tool_error(entry, timestamp, version, source_file, source_line)
tool_error_found = True
entry_handled = True
# Check for messages (chat content) - always check for tool errors in message
if 'message' in entry:
message = entry['message']
# Handle both dict and string message formats
if isinstance(message, dict):
# Check for tool errors in message content
if message.get('role') == 'user' and isinstance(message.get('content'), list):
has_error = False
for content_item in message['content']:
if isinstance(content_item, dict) and content_item.get('is_error'):
self.log(f"DEBUG: Found is_error in message content at {source_file}:{source_line}", "DEBUG")
self.process_tool_error_from_content(content_item, entry, timestamp, version, source_file, source_line)
has_error = True
if has_error:
entry_handled = True
elif not tool_error_found:
# Only process as regular message if no errors found anywhere
self.process_message(message, timestamp, session_id, uuid, source_file, source_line)
entry_handled = True
elif not tool_error_found:
# Process other messages only if no tool error was found
self.process_message(message, timestamp, session_id, uuid, source_file, source_line)
entry_handled = True
elif isinstance(message, str) and not tool_error_found:
# Convert string message to dict format
self.process_message({'content': message, 'role': 'unknown'}, timestamp, session_id, uuid, source_file, source_line)
entry_handled = True
# Check for tool use (to identify tool names)
if entry.get('type') == 'assistant' and 'message' in entry:
message = entry['message'] # Get the actual message
if isinstance(message, dict) and 'content' in message:
for content in message.get('content', []):
if isinstance(content, dict) and content.get('type') == 'tool_use':
# Pass version info for debugging
self.process_tool_use(content, timestamp, session_id, uuid, source_file, source_line,
current_assistant_message or previous_assistant_message, version)
entry_handled = True
# Check for benign entry types (summary, system)
entry_type = entry.get('type', 'no_type')
if entry_type in ['summary', 'system']:
# These are benign status messages, track separately
self.stats['entries_benign'] = self.stats.get('entries_benign', 0) + 1
# Create the per-type counter on first use before indexing into it
if not hasattr(self, 'benign_entry_types'):
self.benign_entry_types = {}
self.benign_entry_types[entry_type] = self.benign_entry_types.get(entry_type, 0) + 1
entry_handled = True # Mark as handled since they're expected
# Track if this entry wasn't handled (unknown)
if not entry_handled:
# Double-check it's not a tool error we missed
is_tool_error = False
# Check for toolUseResult errors
if entry.get('type') == 'user' and 'toolUseResult' in entry:
tool_result = entry['toolUseResult']
if isinstance(tool_result, str) and tool_result.startswith('Error:'):
is_tool_error = True
self.log(f"WARNING: Missed tool error (toolUseResult) at {source_file}:{source_line}", "WARN")
self.process_tool_error(entry, timestamp, version, source_file, source_line)
# Check for is_error in message content
if not is_tool_error and 'message' in entry:
message = entry['message']
if isinstance(message, dict) and message.get('role') == 'user':
content = message.get('content', [])
if isinstance(content, list):
for item in content:
if isinstance(item, dict) and item.get('is_error'):
is_tool_error = True
self.log(f"WARNING: Missed tool error (is_error) at {source_file}:{source_line}", "WARN")
self.process_tool_error_from_content(item, entry, timestamp, version, source_file, source_line)
break
# Only add to unknown if it's really not a tool error
if not is_tool_error:
self.stats['entries_unknown'] += 1
# Determine entry type for categorization
if not entry_type or entry_type == 'no_type':
# Try to infer type from keys
if 'toolUseRequest' in entry:
entry_type = 'toolUseRequest'
elif 'error' in entry:
entry_type = 'error'
elif 'message' in entry:
entry_type = 'message_unknown_format'
else:
entry_type = f"unknown_keys:{','.join(list(entry.keys()))}"
self.unknown_entry_types[entry_type] += 1
# Store ALL unknown entries (no limiting)
self.unknown_entries.append({
'type': entry_type,
'source': f"{source_file}:{source_line}",
'timestamp': timestamp_str,
'uuid': uuid,
'sample_keys': list(entry.keys()), # All keys
'entry_snippet': str(entry) # Full entry
})
else:
self.stats['entries_processed'] += 1
except Exception as e:
# Wrap entire exception handler in try-except to catch errors in the handler itself
try:
self.log(f"Error parsing entry at {source_file}:{source_line}: {e}", "WARN")
self.stats['entries_unknown'] += 1
self.unknown_entry_types['parse_error'] += 1
# Track parse errors in detail
if not hasattr(self, 'parse_errors'):
self.parse_errors = []
version = entry.get('version', 'unknown') if isinstance(entry, dict) else 'unknown'
self.parse_errors.append({
'source': f"{source_file}:{source_line}",
'error': str(e),
'version': version,
'entry_keys': list(entry.keys()) if isinstance(entry, dict) else 'not_a_dict',
'entry_snippet': str(entry) if entry else 'None'
})
# Track version-specific errors
if version != 'unknown':
if not hasattr(self, 'version_errors'):
self.version_errors = {}
if version not in self.version_errors:
self.version_errors[version] = []
self.version_errors[version].append({
'source': f"{source_file}:{source_line}",
'error': str(e),
'tool': 'unknown' # Will be updated if we can determine the tool
})
except Exception as handler_error:
# If the exception handler itself fails, just log it simply
self.log(f"CRITICAL: Exception handler failed at {source_file}:{source_line}: {handler_error}", "ERROR")
self.log(f"Original error was: {e}", "ERROR")
self.stats['entries_unknown'] += 1
self.unknown_entry_types['parse_error'] += 1
return current_assistant_message
def process_tool_result(self, tool_result: dict, timestamp: datetime, session_id: str,
uuid: str, source_file: str, source_line: int, assistant_message: str = ""):
"""Process toolUseResult entries"""
if 'originalFile' in tool_result:
# This is a file snapshot
file_info = tool_result['originalFile']
file_path = file_info.get('filePath')
if file_path:
content = file_info.get('content', '')
snapshot = FileSnapshot(
file_path=file_path,
content=content,
timestamp=timestamp,
session_id=session_id,
uuid=uuid,
source_file=source_file,
source_line=source_line
)
# Keep either earliest or latest snapshot based on configuration
if self.use_earliest_snapshot:
# Keep the earliest snapshot for each file
if file_path not in self.file_snapshots or snapshot.timestamp < self.file_snapshots[file_path].timestamp:
self.file_snapshots[file_path] = snapshot
self.stats['snapshots'] += 1
self.log(f"Found originalFile snapshot for {file_path} at {timestamp}")
else:
# Keep only the latest snapshot for each file
if file_path not in self.file_snapshots or snapshot.timestamp > self.file_snapshots[file_path].timestamp:
self.file_snapshots[file_path] = snapshot
self.stats['snapshots'] += 1
self.log(f"Found originalFile snapshot for {file_path} at {timestamp}")
def process_tool_use(self, tool_use: dict, timestamp: datetime, session_id: str,
uuid: str, source_file: str, source_line: int, assistant_message: str = "",
version: str = "unknown"):
"""Process tool_use entries to extract edits and writes"""
tool_name = tool_use.get('name')
tool_input = tool_use.get('input', {})
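# Illustrative input shapes this dispatcher expects (assumed from the
# fields read below; actual schemas vary by Claude Code version):
#   Write:     {"file_path": ..., "content": ...}
#   Edit:      {"file_path": ..., "old_string": ..., "new_string": ..., "replace_all": bool}
#   MultiEdit: {"file_path": ..., "edits": [{"old_string": ..., "new_string": ...}, ...]}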
if tool_name == 'Write':
file_path = tool_input.get('file_path')
content_text = tool_input.get('content', '')
if file_path:
# Create a snapshot for Write operations
snapshot = FileSnapshot(
file_path=file_path,
content=content_text,
timestamp=timestamp,
session_id=session_id,
uuid=uuid,
source_file=source_file,
source_line=source_line
)
# Keep either earliest or latest snapshot based on configuration
if self.use_earliest_snapshot:
# Keep the earliest snapshot for each file
if file_path not in self.file_snapshots or snapshot.timestamp < self.file_snapshots[file_path].timestamp:
self.file_snapshots[file_path] = snapshot
self.stats['snapshots'] += 1
self.log(f"Found Write operation for {file_path} at {timestamp}")
else:
# Keep only the latest snapshot for each file
if file_path not in self.file_snapshots or snapshot.timestamp > self.file_snapshots[file_path].timestamp:
self.file_snapshots[file_path] = snapshot
self.stats['snapshots'] += 1
self.log(f"Found Write operation for {file_path} at {timestamp}")
elif tool_name == 'Edit':
file_path = tool_input.get('file_path')
if file_path:
# Check if we have old_string and new_string in input
if 'old_string' in tool_input and 'new_string' in tool_input:
edit = EditOperation(
file_path=file_path,
old_string=tool_input.get('old_string', ''),
new_string=tool_input.get('new_string', ''),
timestamp=timestamp,
session_id=session_id,
uuid=uuid,
tool_name='Edit',
replace_all=tool_input.get('replace_all', False),
source_file=source_file,
source_line=source_line,
assistant_message=assistant_message
)
self.file_edits[file_path].append(edit)
self.stats['edits_collected'] += 1
self.log(f"Found Edit operation for {file_path} at {timestamp}")
else:
self.log(f"Edit tool_use missing old_string or new_string for {file_path} in v{version}", "WARN")
elif tool_name == 'MultiEdit':
# MultiEdit has an array of edits
file_path = tool_input.get('file_path')
if file_path:
edits_array = tool_input.get('edits', [])
edits_found = 0
# Create a unique ID for this MultiEdit group
multi_edit_group_id = f"{uuid}_{timestamp.isoformat()}"
for edit_idx, edit_op in enumerate(edits_array):
if 'old_string' in edit_op and 'new_string' in edit_op:
edit = EditOperation(
file_path=file_path,
old_string=edit_op.get('old_string', ''),
new_string=edit_op.get('new_string', ''),
timestamp=timestamp,
session_id=session_id,
uuid=uuid,
tool_name='MultiEdit',
replace_all=edit_op.get('replace_all', False),
source_file=source_file,
source_line=source_line,
assistant_message=assistant_message
)
# Add MultiEdit-specific metadata
edit.multi_edit_group_id = multi_edit_group_id
edit.multi_edit_index = edit_idx
edit.multi_edit_total = len(edits_array)
# Store all edit operations from the group for reporting
edit.multi_edit_all = edits_array
self.file_edits[file_path].append(edit)
self.stats['edits_collected'] += 1
edits_found += 1
if edits_found > 0:
self.log(f"Found MultiEdit operation for {file_path} with {edits_found} edits at {timestamp}")
else:
self.log(f"MultiEdit tool_use missing edits content for {file_path} in v{version}", "WARN")
def normalize_error_text(self, error_text: str) -> str:
"""Normalize error text by replacing dynamic parts with placeholders"""
# Normalize "Found N matches" pattern
normalized = re.sub(r'Found \d+ matches', 'Found [N] matches', error_text)
# Normalize file paths that look like /path/to/file
normalized = re.sub(r'/[\w/.-]+', '[PATH]', normalized)
# Normalize line numbers
normalized = re.sub(r'line \d+', 'line [N]', normalized)
normalized = re.sub(r':\d+', ':[N]', normalized)
# Normalize UUIDs
normalized = re.sub(r'[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}', '[UUID]', normalized)
return normalized
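# Example (illustrative):
#   normalize_error_text("Found 3 matches in /home/me/app/main.go:42")
#   -> "Found [N] matches in [PATH]:[N]"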
def categorize_error(self, error_text: str) -> str:
"""Categorize error - keep semantic categories for known patterns"""
# Check for specific patterns first
if "doesn't want to proceed" in error_text or "tool use was rejected" in error_text:
return "user_rejected"
elif "File has not been read yet" in error_text:
return "file_not_read"
elif "String to replace not found" in error_text or "Old string not found" in error_text:
return "string_not_found"
elif "File not found" in error_text:
return "file_not_found"
elif "Permission denied" in error_text:
return "permission_denied"
elif "already exists" in error_text:
return "already_exists"
else:
# For other errors, use normalized version as category
return self.normalize_error_text(error_text)
def process_tool_error(self, entry: dict, timestamp: datetime, version: str, source_file: str, source_line: int):
"""Process a tool use error entry"""
error_text = entry.get('toolUseResult', '')
parent_uuid = entry.get('parentUuid', '')
# Categorize the error
error_type = self.categorize_error(error_text)
# Track error
if error_type not in self.tool_error_types:
self.tool_error_types[error_type] = 0
self.tool_error_types[error_type] += 1
# Track rejections specifically
if error_type == "user_rejected":
self.tool_rejections += 1
# Store error details - FULL TEXT, no limiting
self.tool_errors.append({
'type': error_type,
'text': error_text, # Full text, no limiting
'original_text': error_text, # Keep original too
'parent_uuid': parent_uuid,
'version': version,
'timestamp': timestamp,
'source': f"{source_file}:{source_line}"
})
self.log(f"Tool error ({error_type}) in v{version} - Total tool errors so far: {len(self.tool_errors)}")
def process_tool_error_from_content(self, content_item: dict, entry: dict, timestamp: datetime,
version: str, source_file: str, source_line: int):
"""Process tool error from message content"""
error_text = content_item.get('content', '')
tool_use_id = content_item.get('tool_use_id', '')
parent_uuid = entry.get('parentUuid', '')
# Remove <tool_use_error> tags if present
if '<tool_use_error>' in error_text:
error_text = error_text.replace('<tool_use_error>', '').replace('</tool_use_error>', '').strip()
# Categorize the error
error_type = self.categorize_error(error_text)
# Track error
if error_type not in self.tool_error_types:
self.tool_error_types[error_type] = 0
self.tool_error_types[error_type] += 1
# Track rejections specifically
if error_type == "user_rejected":
self.tool_rejections += 1
# Store error details - FULL TEXT, no limiting
self.tool_errors.append({
'type': error_type,
'text': error_text, # Full text, no limiting
'original_text': error_text, # Keep original too
'tool_use_id': tool_use_id,
'parent_uuid': parent_uuid,
'version': version,
'timestamp': timestamp,
'source': f"{source_file}:{source_line}"
})
self.log(f"Tool error ({error_type}) for tool {tool_use_id} in v{version} - Total tool errors so far: {len(self.tool_errors)}")
def process_message(self, message: dict, timestamp: datetime, session_id: str, uuid: str, source_file: str = "unknown", source_line: int = 0) -> None:
"""Process chat messages"""
# Ensure message is a dict
if not isinstance(message, dict):
self.log(f"Warning: process_message received non-dict: {type(message)}", "WARN")
return
role = message.get('role', 'unknown')
# Extract text content
content_text = ""
commit_message = None
message_content = message.get('content')
if isinstance(message_content, str):
content_text = message_content
elif isinstance(message_content, list):
for content in message_content:
if isinstance(content, dict) and content.get('type') == 'text':
text = content.get('text', '')
content_text += text + "\n"
# Check for git commit messages
if 'git commit' in text.lower() and '-m' in text:
# Extract commit message
commit_match = re.search(r'-m\s*["\'](.*?)["\']', text)
if not commit_match:
# Try heredoc style
commit_match = re.search(r'cat\s*<<.*?\n(.*?)\nEOF', text, re.DOTALL)
if commit_match:
commit_message = commit_match.group(1)
self.stats['commits'] += 1
if content_text:
msg = ChatMessage(
role=role,
content=content_text,
timestamp=timestamp,
session_id=session_id,
uuid=uuid,
commit_message=commit_message
)
self.chat_messages.append(msg)
self.stats['messages'] += 1
def phase1_extract_snapshots(self, filter_pattern: Optional[str] = None):
"""Phase 1: Extract all originalFile snapshots"""
snapshot_mode = "EARLIEST" if self.use_earliest_snapshot else "LATEST"
self.log(f"PHASE 1: Extracting originalFile snapshots (using {snapshot_mode} snapshot for each file)...")
jsonl_files = self.find_jsonl_files()
for jsonl_file in jsonl_files:
self.log(f"Processing {jsonl_file.name} for snapshots...")
try:
with open(jsonl_file, 'r', encoding='utf-8') as f:
previous_assistant_message = ""
full_path = str(jsonl_file.absolute())
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
try:
entry = json.loads(line)
self.stats['entries'] += 1
# Pass the previous assistant message and get the current one
assistant_msg = self.parse_jsonl_entry(entry, source_file=full_path,
source_line=line_num,
previous_assistant_message=previous_assistant_message)
# Update for next iteration
if assistant_msg:
previous_assistant_message = assistant_msg
except json.JSONDecodeError as e:
self.log(f"Warning: Invalid JSON on line {line_num} in {jsonl_file.name}: {e}", "WARN")
continue
except Exception as e:
self.log(f"Warning: Error processing line {line_num} in {jsonl_file.name}: {e}", "WARN")
continue
except Exception as e:
self.log(f"Error reading {jsonl_file}: {e}, continuing with next file", "ERROR")
continue
self.stats['sessions'] += 1
# Filter snapshots if pattern specified
if filter_pattern:
filtered = {}
for file_path, snapshot in self.file_snapshots.items():
if re.search(filter_pattern, file_path):
filtered[file_path] = snapshot
self.file_snapshots = filtered
self.log(f"Found {len(self.file_snapshots)} unique files with snapshots")
def phase2_apply_edits(self):
"""Phase 2: Apply all edits in chronological order"""
self.log("PHASE 2: Applying edits to snapshots...")
# Track edit failures for reporting
self.failed_edits = []
# Log summary of all edits collected
total_file_edits = sum(len(edits) for edits in self.file_edits.values())
self.log(f"Total edits collected across all files: {total_file_edits}")
self.log(f"Files with edits: {len(self.file_edits)}")
# Log edits per file
for fp, edits in self.file_edits.items():
if edits:
self.log(f" {fp}: {len(edits)} edits")
if self.verbose:
for i, e in enumerate(edits): # Show all edits
self.log(f" Edit {i+1}: {e.timestamp} ({e.tool_name})")
for file_path, snapshot in self.file_snapshots.items():
content = snapshot.content
# Get all edits for this file after the snapshot timestamp
file_edits = self.file_edits.get(file_path, [])
# Filter edits that occurred after the snapshot
applicable_edits = [e for e in file_edits if e.timestamp > snapshot.timestamp]
skipped_edits = [e for e in file_edits if e.timestamp <= snapshot.timestamp]
# Update statistics
self.stats['edits_applicable'] += len(applicable_edits)
self.stats['edits_skipped'] += len(skipped_edits)
# Sort by timestamp (chronological order)
applicable_edits.sort(key=lambda e: e.timestamp)
self.log(f"File: {file_path}")
self.log(f" Snapshot timestamp: {snapshot.timestamp}")
self.log(f" Total edits for file: {len(file_edits)}")
self.log(f" Edits BEFORE snapshot (skipped): {len(skipped_edits)}")
self.log(f" Applicable edits (after snapshot): {len(applicable_edits)}")
for idx, edit in enumerate(applicable_edits, 1):
old_content = content
# Try to apply the edit
if edit.replace_all:
# Replace all occurrences
if edit.old_string in content:
occurrences = content.count(edit.old_string)
content = content.replace(edit.old_string, edit.new_string)
self.stats['edits_successful'] += 1
self.log(f" ✓ Edit {idx}/{len(applicable_edits)}: SUCCEEDED (replaced {occurrences} occurrences)")
else:
# Find line number where edit was expected
line_num = self.find_approximate_line(content, edit.old_string)
reason = f"Old string not found (replace_all) near line {line_num}"
# Capture the return value so an interactive force-apply takes effect
content = self.handle_failed_edit(file_path, edit, content, reason)
self.save_reject_file(edit, file_path, reason)
self.log(f" ✗ Edit {idx}/{len(applicable_edits)}: FAILED - Old string not found (replace_all)")
self.log(f" Source: {edit.source_file}:{edit.source_line}")
self.log(f" Tool: {edit.tool_name}")
if edit.assistant_message:
self.log(f" Assistant reasoning: {edit.assistant_message}")
self.log(f" FULL OLD STRING:")
self.log(f" {repr(edit.old_string)}")
self.log(f" FULL NEW STRING:")
self.log(f" {repr(edit.new_string)}")
failed_edit_dict = {
'file': file_path,
'edit_num': idx,
'timestamp': edit.timestamp,
'reason': 'Old string not found (replace_all)',
'line_hint': line_num,
'old_string': edit.old_string, # Full string, not preview
'new_string': edit.new_string, # Include what we wanted to change to
'source_file': edit.source_file,
'source_line': edit.source_line,
'tool_name': edit.tool_name,
'assistant_message': edit.assistant_message
}
# Add MultiEdit information if present
if hasattr(edit, 'multi_edit_group_id'):
failed_edit_dict['multi_edit_group_id'] = edit.multi_edit_group_id
failed_edit_dict['multi_edit_index'] = edit.multi_edit_index
failed_edit_dict['multi_edit_total'] = edit.multi_edit_total
if hasattr(edit, 'multi_edit_all'):
failed_edit_dict['multi_edit_all'] = edit.multi_edit_all
self.failed_edits.append(failed_edit_dict)
else:
# Replace first occurrence only
if edit.old_string in content:
content = content.replace(edit.old_string, edit.new_string, 1)
self.stats['edits_successful'] += 1
self.log(f" ✓ Edit {idx}/{len(applicable_edits)}: SUCCEEDED")
else:
line_num = self.find_approximate_line(content, edit.old_string)
reason = f"Old string not found at expected location (line ~{line_num})"
# Capture the return value so an interactive force-apply takes effect
content = self.handle_failed_edit(file_path, edit, content, reason)
self.save_reject_file(edit, file_path, reason)
self.log(f" ✗ Edit {idx}/{len(applicable_edits)}: FAILED - Old string not found")
self.log(f" Source: {edit.source_file}:{edit.source_line}")
self.log(f" Tool: {edit.tool_name}")
if edit.assistant_message:
self.log(f" Assistant reasoning: {edit.assistant_message}")
self.log(f" FULL OLD STRING:")
self.log(f" {repr(edit.old_string)}")
self.log(f" FULL NEW STRING:")
self.log(f" {repr(edit.new_string)}")
failed_edit_dict = {
'file': file_path,
'edit_num': idx,
'timestamp': edit.timestamp,
'reason': f'Old string not found at expected location (line ~{line_num})',
'line_hint': line_num,
'old_string': edit.old_string, # Full string, not preview
'new_string': edit.new_string, # Include what we wanted to change to
'source_file': edit.source_file,
'source_line': edit.source_line,
'tool_name': edit.tool_name,
'assistant_message': edit.assistant_message
}
# Add MultiEdit information if present
if hasattr(edit, 'multi_edit_group_id'):
failed_edit_dict['multi_edit_group_id'] = edit.multi_edit_group_id
failed_edit_dict['multi_edit_index'] = edit.multi_edit_index
failed_edit_dict['multi_edit_total'] = edit.multi_edit_total
if hasattr(edit, 'multi_edit_all'):
failed_edit_dict['multi_edit_all'] = edit.multi_edit_all
self.failed_edits.append(failed_edit_dict)
self.recovered_files[file_path] = content
self.log(f" Final content length: {len(content)} bytes\n")
def phase3_extract_chat(self):
"""Phase 3: Extract chat messages and commits"""
self.log("PHASE 3: Extracting chat messages...")
# Chat is already extracted during phase 1
self.log(f"Found {len(self.chat_messages)} chat messages")
# Count commits
commits = [m for m in self.chat_messages if m.commit_message]
self.log(f"Found {len(commits)} git commits")
# Write chat to files
if not self.dry_run:
self.chat_dir.mkdir(parents=True, exist_ok=True)
# Group messages by session
by_session = defaultdict(list)
for msg in self.chat_messages:
by_session[msg.session_id].append(msg)
for session_id, messages in by_session.items():
session_file = self.chat_dir / f"session_{session_id[:8]}.md"
with open(session_file, 'w') as f:
f.write(f"# Chat Session {session_id[:8]}\n\n")
for msg in sorted(messages, key=lambda m: m.timestamp):
f.write(f"## {msg.timestamp} - {msg.role}\n")
if msg.commit_message:
f.write(f"**COMMIT**: {msg.commit_message}\n\n")
f.write(msg.content)
f.write("\n\n---\n\n")
def write_recovered_files(self):
"""Write all recovered files to disk"""
if self.dry_run:
self.log("DRY RUN - Not writing files")
return
self.log("Writing recovered files...")
for file_path, content in self.recovered_files.items():
output_path = self.output_dir / file_path.lstrip('/')
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, 'w') as f:
f.write(content)
self.log(f" Wrote {len(content)} bytes to {output_path}")
def write_edit_failure_report(self):
"""Write detailed edit failure report"""
if not hasattr(self, 'failed_edits') or not self.failed_edits:
return
report_path = self.output_dir / "edit_failures.md"
with open(report_path, 'w') as f:
f.write("# Edit Failure Report\n")
f.write(f"Generated at: {datetime.now().isoformat()}\n\n")
f.write(f"## Summary\n")
f.write(f"- Total edits collected from JSONL: {self.stats['edits_collected']}\n")
f.write(f"- Edits skipped (before snapshot): {self.stats['edits_skipped']}\n")
f.write(f"- Edits applicable (after snapshot): {self.stats['edits_applicable']}\n")
f.write(f"- Edits successfully applied: {self.stats['edits_successful']}\n")
f.write(f"- Edit failures: {len(self.failed_edits)}\n\n")
f.write("## Failed Edits by File\n\n")
# Group failures by file
by_file = {}
for failure in self.failed_edits:
file_path = failure['file']
if file_path not in by_file:
by_file[file_path] = []
by_file[file_path].append(failure)
for file_path, failures in sorted(by_file.items()):
f.write(f"### FILE: {file_path}\n")
f.write(f"Total failed edits for this file: {len(failures)}\n\n")
for idx, failure in enumerate(failures, 1):
f.write(f"#### ===== FAILED EDIT #{idx} for {file_path} =====\n")
f.write(f"**Full Source Path**: `{failure['source_file']}:{failure['source_line']}`\n")
f.write(f"**Tool Type**: {failure.get('tool_name', 'Unknown')}\n")
f.write(f"**Timestamp**: {failure['timestamp']}\n")
f.write(f"**Failure Reason**: {failure['reason']}\n")
f.write(f"**Approximate line in target file**: {failure['line_hint']}\n")
# Add MultiEdit info if present
if 'multi_edit_index' in failure:
f.write(f"**MultiEdit**: Failed on edit {failure['multi_edit_index'] + 1} of {failure['multi_edit_total']}\n")
f.write("\n")
if failure.get('assistant_message'):
f.write("**ASSISTANT REASONING:**\n")
f.write("```\n")
f.write(failure['assistant_message'])
f.write("\n```\n\n")
# For MultiEdit, show all edits in the group
if 'multi_edit_all' in failure:
f.write("**ALL EDITS IN THIS MULTIEDIT:**\n\n")
for edit_idx, edit in enumerate(failure['multi_edit_all']):
is_failed = edit_idx == failure['multi_edit_index']
status = "❌ FAILED" if is_failed else "⚪ NOT ATTEMPTED"
f.write(f"##### Edit {edit_idx + 1} of {failure['multi_edit_total']} - {status}\n\n")
f.write("**OLD STRING:**\n```\n")
f.write(edit.get('old_string', 'N/A'))
f.write("\n```\n\n")
f.write("**NEW STRING:**\n```\n")
f.write(edit.get('new_string', 'N/A'))
f.write("\n```\n")
if edit.get('replace_all'):
f.write("**Replace All**: Yes\n")
f.write("\n")
else:
# Single edit failure
f.write("**OLD STRING (looking for this):**\n")
f.write("```\n")
f.write(failure.get('old_string', failure.get('old_string_preview', 'N/A')))
f.write("\n```\n\n")
f.write("**NEW STRING (wanted to change to this):**\n")
f.write("```\n")
f.write(failure.get('new_string', 'N/A'))
f.write("\n```\n\n")
f.write("-" * 80 + "\n\n")
self.log(f"Edit failure report written to {report_path}")
# Also write tool errors report if it exists
if self.tool_errors:
self.write_tool_errors_report()
def write_tool_errors_report(self):
"""Write a detailed report of tool errors"""
if not self.tool_errors:
return
report_path = self.output_dir / "tool_errors.md"
with open(report_path, 'w') as f:
f.write("# Tool Errors Report\n")
f.write(f"Generated at: {datetime.now().isoformat()}\n\n")
f.write("## Summary\n")
f.write(f"- Total tool errors: {len(self.tool_errors):,}\n")
f.write(f"- User rejections: {self.tool_rejections:,}\n")
f.write(f"- Unique error types: {len(self.tool_error_types):,}\n\n")
# Show error type breakdown
f.write("## Error Types Breakdown\n\n")
f.write("| Error Category | Count | Percentage |\n")
f.write("|----------------|-------|------------|\n")
total = len(self.tool_errors)
for error_type, count in sorted(self.tool_error_types.items(), key=lambda x: x[1], reverse=True):
percentage = (count / total) * 100 if total > 0 else 0
display_type = error_type if len(error_type) <= 80 else error_type[:77] + "..."
f.write(f"| {display_type} | {count:,} | {percentage:.1f}% |\n")
f.write("\n")
# Show sample errors for each type
f.write("## Sample Errors by Type\n\n")
for error_type in sorted(self.tool_error_types.keys()):
# Get sample errors of this type
samples = [e for e in self.tool_errors if e['type'] == error_type][:3] # Show up to 3 samples
if samples:
f.write(f"### {error_type} ({self.tool_error_types[error_type]:,} occurrences)\n\n")
for i, sample in enumerate(samples, 1):
f.write(f"**Sample {i}:**\n")
f.write(f"- Version: {sample['version']}\n")
f.write(f"- Timestamp: {sample['timestamp']}\n")
f.write(f"- Source: `{sample['source']}`\n")
if sample.get('tool_use_id'):
f.write(f"- Tool Use ID: {sample['tool_use_id']}\n")
f.write(f"- Full Error Text:\n")
f.write("```\n")
f.write(sample['text'])
f.write("\n```\n\n")
self.log(f"Tool errors report written to {report_path}")
def write_unknown_entries_report(self):
"""Write a report of unknown/unhandled entries"""
# Write tool errors to a separate file
if self.tool_errors:
self.write_tool_errors_report()
report_path = self.output_dir / "unknown_entries.md"
# If there are no unknown entries, write a minimal report or remove the file
if not self.unknown_entry_types and not self.entries_after_recovery and not hasattr(self, 'benign_entry_types'):
# Write a minimal report indicating no unknown entries
with open(report_path, 'w') as f:
f.write("# Unknown/Unhandled Entries Report\n")
f.write(f"Generated at: {datetime.now().isoformat()}\n\n")
f.write("## Summary\n")
f.write("✅ **No unknown entries found!**\n\n")
f.write(f"- Total entries processed: {self.stats['entries']:,}\n")
f.write(f"- Tool errors: {len(self.tool_errors):,} (see tool_errors.md)\n")
f.write(f"- Benign entries: {self.stats.get('entries_benign', 0):,}\n")
return
with open(report_path, 'w') as f:
f.write("# Unknown/Unhandled Entries Report\n")
f.write(f"Generated at: {datetime.now().isoformat()}\n\n")
f.write("## Summary\n")
f.write(f"- Total entries in JSONL files: {self.stats['entries']:,}\n")
f.write(f"- Entries processed successfully: {self.stats['entries_processed']:,}\n")
f.write(f"- Benign entries (summary/system): {self.stats.get('entries_benign', 0):,}\n")
f.write(f"- Unknown/unhandled entries: {self.stats['entries_unknown']:,}\n")
f.write(f"- Tool errors: {len(self.tool_errors):,}\n")
f.write(f"- Tool rejections by user: {self.tool_rejections:,}\n")
f.write(f"- Entries after file recovery: {self.stats['entries_after_recovery']:,}\n\n")
# Show Claude Code versions seen
if hasattr(self, 'versions_seen') and self.versions_seen:
f.write("## Claude Code Versions\n\n")
f.write("| Version | Entry Count |\n")
f.write("|---------|------------|\n")
for version, count in sorted(self.versions_seen.items(), key=lambda x: semver_sort_key(x[0])):
f.write(f"| {version} | {count:,} |\n")
f.write("\n")
# Show version-specific errors if any
if hasattr(self, 'version_errors') and self.version_errors:
f.write("## Errors by Version\n\n")
for version, errors in sorted(self.version_errors.items(), key=lambda x: semver_sort_key(x[0])):
f.write(f"### Version {version} ({len(errors)} errors)\n")
for i, error in enumerate(errors, 1): # Show ALL errors per version
f.write(f"- Error {i}: {error['error']} at {error['source']}")
if error.get('tool') != 'unknown':
f.write(f" (Tool: {error['tool']})")
f.write("\n")
f.write("\n")
# Tool errors are now in a separate file
if self.tool_error_types:
f.write("## Tool Execution Errors\n\n")
f.write(f"Tool errors have been moved to a separate file: `tool_errors.md`\n")
f.write(f"Total tool errors: {len(self.tool_errors):,}\n")
f.write(f"User rejections: {self.tool_rejections:,}\n\n")
if hasattr(self, 'benign_entry_types') and self.benign_entry_types:
f.write("## Benign Entry Types\n\n")
f.write("| Entry Type | Count | Description |\n")
f.write("|------------|-------|-------------|\n")
for entry_type, count in sorted(self.benign_entry_types.items(), key=lambda x: x[1], reverse=True):
desc = "Status messages" if entry_type == "summary" else "System messages"
f.write(f"| {entry_type} | {count:,} | {desc} |\n")
f.write("\n")
if self.unknown_entry_types:
f.write("## Unknown Entry Types\n\n")
f.write("| Entry Type | Count |\n")
f.write("|------------|-------|\n")
for entry_type, count in sorted(self.unknown_entry_types.items(), key=lambda x: x[1], reverse=True):
f.write(f"| {entry_type} | {count:,} |\n")
f.write("\n")
# Show parse errors in detail
if hasattr(self, 'parse_errors') and self.parse_errors:
f.write(f"## Parse Errors ({len(self.parse_errors)} total)\n\n")
f.write("These entries could not be parsed correctly:\n\n")
for i, error in enumerate(self.parse_errors, 1):
f.write(f"### Parse Error #{i}\n")
f.write(f"- **Source**: {error['source']}\n")
f.write(f"- **Version**: {error.get('version', 'unknown')}\n")
f.write(f"- **Error**: {error['error']}\n")
f.write(f"- **Entry Keys**: {error['entry_keys']}\n")
f.write(f"- **Entry Snippet**:\n```\n{error['entry_snippet']}\n```\n\n")
if self.unknown_entries:
f.write(f"## All Unknown Entries ({len(self.unknown_entries)} total)\n\n")
for i, entry in enumerate(self.unknown_entries, 1):
f.write(f"### Unknown Entry #{i}\n")
f.write(f"- **Type**: {entry['type']}\n")
f.write(f"- **Source**: {entry['source']}\n")
f.write(f"- **Timestamp**: {entry['timestamp']}\n")
f.write(f"- **UUID**: {entry['uuid']}\n")
f.write(f"- **Keys**: {entry['sample_keys']}\n")
f.write(f"- **Full Entry**:\n```json\n{entry['entry_snippet']}\n```\n\n")
if self.entries_after_recovery:
f.write("## Entries After File Recovery\n\n")
f.write("These entries occurred after files were already recovered and may contain newer edits:\n\n")
for file_path, entries in sorted(self.entries_after_recovery.items()):
f.write(f"### {file_path}\n")
f.write(f"- {len(entries)} entries after recovery\n")
for entry in entries: # Show ALL entries
f.write(f" - {entry['type']} at {entry['timestamp']} ({entry['source']})\n")
f.write("\n")
self.log(f"Unknown entries report written to {report_path}")
def phase4_analyze_findings(self):
"""Phase 4: Analyze what we found in the JSONL files"""
self.log("PHASE 4: Analyzing findings...")
# Report on UUID relationships
self.log(f"UUID graph entries: {len(self.uuid_graph)}")
# Report on file content found outside originalFile
self.log(f"Files with content in toolUseResult.file: {len(self.file_content_entries)}")
for file_path, entries in self.file_content_entries.items():
if entries:
self.log(f" {file_path}: {len(entries)} content entries found")
# Check for missing edits
total_expected = self.stats['edits_collected']
total_processed = self.stats['edits_applicable'] + self.stats['edits_skipped']
if total_expected != total_processed:
self.log(f"WARNING: Edit count mismatch!")
self.log(f" Collected: {total_expected}")
self.log(f" Processed: {total_processed}")
self.log(f" Missing: {total_expected - total_processed}")
# Check for files with content but no snapshots
files_with_content_only = set(self.file_content_entries.keys()) - set(self.file_snapshots.keys())
if files_with_content_only:
self.log(f"Files with content but no originalFile snapshot: {len(files_with_content_only)}")
for fp in files_with_content_only: # Show all files
self.log(f" - {fp}")
# Report on unknown entries
self.log(f"\nUnknown/Unhandled Entries Analysis:")
self.log(f" Total entries: {self.stats['entries']:,}")
self.log(f" Entries processed: {self.stats['entries_processed']:,}")
self.log(f" Benign entries (summary/system): {self.stats.get('entries_benign', 0):,}")
self.log(f" Unknown entries: {self.stats['entries_unknown']:,}")
if hasattr(self, 'parse_errors') and self.parse_errors:
self.log(f" Parse errors: {len(self.parse_errors):,}")
self.log(f" Tool errors: {len(self.tool_errors):,}")
self.log(f" Tool rejections: {self.tool_rejections:,}")
self.log(f" Entries after recovery: {self.stats['entries_after_recovery']:,}")
# Report on Claude Code versions
if hasattr(self, 'versions_seen') and self.versions_seen:
self.log(f"\n Claude Code versions detected:")
for version, count in sorted(self.versions_seen.items(), key=lambda x: semver_sort_key(x[0])):
errors = len(self.version_errors.get(version, [])) if hasattr(self, 'version_errors') else 0
if errors > 0:
self.log(f" - v{version}: {count:,} entries ({errors} errors)")
else:
self.log(f" - v{version}: {count:,} entries")
if self.tool_error_types:
self.log(f"\n Tool error types breakdown:")
for error_type, count in sorted(self.tool_error_types.items(), key=lambda x: x[1], reverse=True):
self.log(f" - {error_type}: {count:,}")
if self.unknown_entry_types:
self.log(f"\n Unknown entry types breakdown:")
for entry_type, count in sorted(self.unknown_entry_types.items(), key=lambda x: x[1], reverse=True):
self.log(f" - {entry_type}: {count:,}")
# Generate unknown entries report
if not self.dry_run:
self.write_unknown_entries_report()
def generate_report(self) -> str:
"""Generate a recovery report"""
report_lines = [
"# JSONL Recovery Report",
f"Generated at: {datetime.now().isoformat()}",
"",
"## Statistics",
f"- JSONL files processed: {self.stats['sessions']}",
f"- Total entries processed: {self.stats['entries']:,}",
f"- Files with snapshots found: {self.stats['snapshots']}",
f"- File read operations found: {self.stats['file_reads']}",
f"- Files successfully recovered: {len(self.recovered_files)}",
"",
"## Edit Statistics",
f"- Total edits collected: {self.stats['edits_collected']}",
f"- Edits skipped (before snapshot): {self.stats['edits_skipped']}",
f"- Edits applicable (after snapshot): {self.stats['edits_applicable']}",
f"- Edits successfully applied: {self.stats['edits_successful']}",
f"- Edits failed: {self.stats['edits_failed']}",
f"- Conflicts: {self.stats['conflicts']}",
"",
"## Other",
f"- Chat messages extracted: {self.stats['messages']}",
f"- Git commits found: {self.stats['commits']}",
"",
"## Recovered Files"
]
for file_path in sorted(self.recovered_files.keys()):
size = len(self.recovered_files[file_path])
snapshot = self.file_snapshots.get(file_path)
snapshot_time = snapshot.timestamp.isoformat() if snapshot else "unknown"
edits_count = len([e for e in self.file_edits.get(file_path, [])
if not snapshot or e.timestamp > snapshot.timestamp])
report_lines.append(f"- `{file_path}`")
report_lines.append(f" - Size: {size:,} bytes")
report_lines.append(f" - Snapshot from: {snapshot_time}")
report_lines.append(f" - Edits applied: {edits_count}")
if self.stats['conflicts'] > 0:
report_lines.extend([
"",
"## Conflicts",
f"Total conflicts encountered: {self.stats['conflicts']}",
"Run with --interactive flag to resolve conflicts manually"
])
report = "\n".join(report_lines)
if not self.dry_run:
report_path = self.output_dir / "recovery_report.md"
with open(report_path, 'w') as f:
f.write(report)
self.log(f"Recovery report written to {report_path}")
return report
def run(self, filter_pattern: Optional[str] = None):
"""Run the complete recovery process"""
self.log("Starting JSONL recovery process...")
# Create output directories
if not self.dry_run:
self.output_dir.mkdir(parents=True, exist_ok=True)
# Phase 1: Extract snapshots
self.phase1_extract_snapshots(filter_pattern)
# Phase 2: Apply edits
self.phase2_apply_edits()
# Phase 3: Extract chat
self.phase3_extract_chat()
# Phase 4: Analyze what we found
self.phase4_analyze_findings()
# Write recovered files
self.write_recovered_files()
# Generate report
self.generate_report()
# Write edit failure report if there were failures
if hasattr(self, 'failed_edits') and self.failed_edits:
self.write_edit_failure_report()
# Summary
self.log("="*60)
self.log("Recovery complete!")
self.log(f"Files recovered: {len(self.recovered_files)}")
self.log(f"File reads found: {self.stats['file_reads']}")
self.log(f"Edits collected: {self.stats['edits_collected']}")
self.log(f" - Skipped (before snapshot): {self.stats['edits_skipped']}")
self.log(f" - Applicable (after snapshot): {self.stats['edits_applicable']}")
self.log(f" - Successfully applied: {self.stats['edits_successful']}")
self.log(f" - Failed to apply: {self.stats['edits_failed']}")
self.log(f"Conflicts: {self.stats['conflicts']}")
self.log(f"Chat messages: {self.stats['messages']}")
self.log(f"Benign entries (summary/system): {self.stats.get('entries_benign', 0)}")
self.log(f"Tool errors: {len(self.tool_errors)}")
if self.tool_rejections > 0:
self.log(f" - User rejections: {self.tool_rejections}")
self.log(f"Unknown entries: {self.stats['entries_unknown']} (see unknown_entries.md for details)")
if hasattr(self, 'parse_errors') and self.parse_errors:
self.log(f" - Parse errors: {len(self.parse_errors)}")
self.log(f"Entries after recovery: {self.stats['entries_after_recovery']}")
if hasattr(self, 'failed_edits') and self.failed_edits:
self.log(f"Edit failures: {len(self.failed_edits)} (see edit_failures.md for details)")
if self.save_rejects and not self.dry_run:
self.log(f"Reject files saved to: {self.reject_dir}")
def main():
parser = argparse.ArgumentParser(
description="Recover files from Claude Code JSONL session files using three-phase approach"
)
parser.add_argument("--jsonl-dir", required=True,
help="Directory containing JSONL files (required, typically ~/.claude/projects/YOUR_PROJECT)")
parser.add_argument("--output-dir", default="recovered_files",
help="Output directory for recovered files")
parser.add_argument("--chat-dir",
help="Output directory for chat messages (default: output-dir/chat)")
parser.add_argument("--dry-run", action="store_true",
help="Show what would be recovered without writing files")
parser.add_argument("--filter", dest="filter_pattern",
help="Only recover files matching regex pattern")
parser.add_argument("--verbose", action="store_true",
help="Show detailed progress")
parser.add_argument("--interactive", action="store_true",
help="Interactively resolve edit conflicts")
parser.add_argument("--use-earliest-snapshot", action="store_true",
help="Use earliest snapshot instead of latest (applies more edits)")
parser.add_argument("--no-save-rejects", action="store_true",
help="Don't save rejected edits to separate files")
args = parser.parse_args()
if not Path(args.jsonl_dir).exists():
print(f"ERROR: JSONL directory does not exist: {args.jsonl_dir}")
print("\nTypical locations for Claude Code session files:")
print(" ~/.claude/projects/[YOUR_PROJECT_PATH]/")
print("\nExample:")
print(" python3 claude_code_recovery.py --jsonl-dir ~/.claude/projects/-Users-yourname-myproject")
sys.exit(1)
tool = JSONLRecoveryTool(
jsonl_dir=args.jsonl_dir,
output_dir=args.output_dir,
chat_dir=args.chat_dir,
dry_run=args.dry_run,
verbose=args.verbose,
interactive=args.interactive,
use_earliest_snapshot=args.use_earliest_snapshot,
save_rejects=not args.no_save_rejects
)
tool.run(filter_pattern=args.filter_pattern)
if __name__ == "__main__":
main()