Emergency recovery tool for Claude Code session files
| #!/usr/bin/env python3 | |
| """claude_code_recovery.py - Emergency recovery tool for Claude Code session files | |
| Copyright (c) 2025 Sean Chittenden | |
| All rights reserved. | |
| Redistribution and use in source and binary forms, with or without | |
| modification, are permitted provided that the following conditions are met: | |
| 1. Redistributions of source code must retain the above copyright notice, this | |
| list of conditions and the following disclaimer. | |
| 2. Redistributions in binary form must reproduce the above copyright notice, | |
| this list of conditions and the following disclaimer in the documentation | |
| and/or other materials provided with the distribution. | |
| THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | |
| AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | |
| IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE | |
| DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE | |
| FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL | |
| DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR | |
| SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER | |
| CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, | |
| OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE | |
| OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | |
| ================================================================================ | |
| ABOUT THIS TOOL: | |
| This recovery tool exists because Claude "helpfully" decided: | |
| ``` | |
| ⏺ Now I need to update the wipe operation to remove the parent directory (not just pgdata), but still check for the safety file in the pgdata subdirectory: | |
| ``` | |
| and changed: | |
| os.RemoveAll(p.PgData) // Remove data directory | |
| to: | |
| os.RemoveAll(parentDir) // Remove parent data directory (which contains pgdata subdirectory) | |
| ...which promptly deleted the entire project directory including 20 days of | |
| un-pushed work and the .git directory. Oops. 🤦 | |
| Fortunately, Claude Code keeps detailed JSONL session files of all interactions, | |
| including file contents, edits, and changes. This tool reconstructs your lost | |
| files by: | |
| 1. Finding the latest snapshot of each file (from Write operations) | |
| 2. Applying all subsequent edits chronologically | |
| 3. Extracting additional context from chat messages | |
| The recovered files aren't perfect - some recent changes may be missed if they | |
| weren't captured in the session files, and complex MultiEdit operations may | |
| fail partially. But it's far better than losing everything, and it helps | |
| structure the recovery effort so you can get back on your feet sooner rather | |
| than later. | |
| Remember kids: Always push your commits and keep backups. And maybe don't let | |
| an AI suggest filesystem operations on parent directories. 😅 | |
| USAGE: | |
| python3 claude_code_recovery.py --jsonl-dir ~/.claude/projects/YOUR_PROJECT_PATH --output-dir recovered_files | |
| The JSONL files are typically stored in: | |
| ~/.claude/projects/[project-path]/ | |
| Each session is stored as a UUID.jsonl file containing all interactions from that session. | |
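| For illustration, a single (heavily simplified) entry from one of these files | |
| might look roughly like the sketch below. The exact schema varies between | |
| Claude Code versions, so treat the field names as an approximation of what this | |
| tool scans for (type, timestamp, sessionId, uuid, message, toolUseResult): | |
| ``` | |
| {"type": "assistant", "timestamp": "2025-09-06T12:00:00Z", "sessionId": "...", "uuid": "...", | |
| "message": {"role": "assistant", "content": [{"type": "tool_use", "name": "Write", | |
| "input": {"file_path": "/path/to/file.go", "content": "package main\n..."}}]}} | |
| ``` | |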
| OPTIONS: | |
| --jsonl-dir DIR Directory containing JSONL files (required) | |
| --output-dir DIR Output directory for recovered files (default: recovered_files) | |
| --dry-run Show what would be recovered without writing files | |
| --filter PATTERN Only recover files matching regex pattern | |
| --verbose Show detailed progress | |
| --interactive Interactively resolve edit conflicts | |
| --use-earliest-snapshot Use earliest snapshot instead of latest (applies more edits) | |
| --no-save-rejects Don't save rejected edits to separate files | |
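| For a cautious first pass, you might preview only Go files without writing | |
| anything (paths below are placeholders): | |
| ``` | |
| python3 claude_code_recovery.py --jsonl-dir ~/.claude/projects/YOUR_PROJECT_PATH \ | |
| --output-dir recovered_files --dry-run --filter '\.go$' --verbose | |
| ``` | |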
| WHAT GETS RECOVERED: | |
| - Files from 'Write' operations (full snapshots) | |
| - Files modified via 'Edit' and 'MultiEdit' operations | |
| - Git commit messages from chat | |
| - File content shown in 'Read' operations (as reference) | |
| WHAT MIGHT BE MISSING: | |
| - Very recent changes not yet in session files | |
| - Files only viewed but never edited | |
| - Binary files (images, compiled code, etc.) | |
| - Files created outside of Claude Code | |
| ERROR HANDLING: | |
| - Tool execution errors are tracked in tool_errors.md | |
| - Failed edits are documented in edit_failures.md | |
| - Unknown entry types are logged in unknown_entries.md | |
| - Rejected edits can be saved for manual review | |
| This tool is provided as-is, with apologies for its existence being necessary. | |
| May your future commits be frequent and your backups plentiful. 🙏 | |
| """ | |
| import json | |
| import re | |
| import argparse | |
| from pathlib import Path | |
| from datetime import datetime | |
| from collections import defaultdict, Counter | |
| from typing import Dict, List, Optional, Set, Tuple, Any | |
| from dataclasses import dataclass | |
| import sys | |
| @dataclass | |
| class FileSnapshot: | |
| """Represents a complete file snapshot from an originalFile entry""" | |
| file_path: str | |
| content: str | |
| timestamp: datetime | |
| session_id: str | |
| uuid: str | |
| source_file: str = "unknown" | |
| source_line: int = 0 | |
| @dataclass | |
| class EditOperation: | |
| """Represents an edit operation to apply to a file""" | |
| file_path: str | |
| old_string: str | |
| new_string: str | |
| timestamp: datetime | |
| session_id: str | |
| uuid: str | |
| tool_name: str = 'Edit' | |
| replace_all: bool = False | |
| source_file: str = "unknown" | |
| source_line: int = 0 | |
| assistant_message: str = "" # Store assistant's reasoning/message | |
| @dataclass | |
| class ChatMessage: | |
| """Represents a chat message""" | |
| role: str | |
| content: str | |
| timestamp: datetime | |
| session_id: str | |
| uuid: str | |
| commit_message: Optional[str] = None | |
| def semver_sort_key(version_str): | |
| """Convert version string to tuple for proper sorting""" | |
| try: | |
| parts = version_str.split('.') | |
| return tuple(int(p) for p in parts) | |
| except (ValueError, AttributeError): | |
| return (0, 0, 0) # Default for unparseable versions | |
| class JSONLRecoveryTool: | |
| def __init__(self, jsonl_dir: str, output_dir: str = "recovered_files", | |
| chat_dir: Optional[str] = None, | |
| dry_run: bool = False, verbose: bool = False, | |
| interactive: bool = False, use_earliest_snapshot: bool = False, | |
| save_rejects: bool = True): | |
| self.jsonl_dir = Path(jsonl_dir) | |
| self.output_dir = Path(output_dir) | |
| self.chat_dir = Path(chat_dir) if chat_dir else self.output_dir / "chat" | |
| self.dry_run = dry_run | |
| self.verbose = verbose | |
| self.interactive = interactive | |
| self.use_earliest_snapshot = use_earliest_snapshot | |
| self.save_rejects = save_rejects | |
| # Add reject directory | |
| self.reject_dir = self.output_dir / "rejected_edits" | |
| # Storage for recovered data | |
| self.file_snapshots: Dict[str, FileSnapshot] = {} | |
| self.file_edits: Dict[str, List[EditOperation]] = defaultdict(list) | |
| self.file_content_entries: Dict[str, List[dict]] = defaultdict(list) # Track file content from toolUseResult.file | |
| self.chat_messages: List[ChatMessage] = [] | |
| self.recovered_files: Dict[str, str] = {} | |
| self.entries_after_recovery: Dict[str, List[dict]] = defaultdict(list) # Track entries after file recovery | |
| # UUID relationship tracking | |
| self.uuid_graph: Dict[str, dict] = {} # Track parent-child relationships | |
| # Track unknown entries | |
| self.unknown_entries: List[dict] = [] # Entries we don't know how to process | |
| self.unknown_entry_types: Counter = Counter() # Count types of unknown entries | |
| # Track tool errors | |
| self.tool_errors: List[dict] = [] # Store all tool errors | |
| self.tool_error_types: Counter = Counter() # Count by error type | |
| self.tool_rejections: int = 0 # Count of user rejections | |
| # Statistics | |
| self.stats = { | |
| 'sessions': 0, | |
| 'snapshots': 0, | |
| 'edits_collected': 0, | |
| 'edits_skipped': 0, # Edits that occurred before snapshot | |
| 'edits_applicable': 0, # Edits that occurred after snapshot | |
| 'edits_successful': 0, | |
| 'edits_failed': 0, | |
| 'conflicts': 0, | |
| 'messages': 0, | |
| 'commits': 0, | |
| 'file_reads': 0, | |
| 'entries': 0, | |
| 'entries_processed': 0, # Entries we handled | |
| 'entries_unknown': 0, # Entries we don't know how to handle | |
| 'entries_benign': 0, # Benign entries (summary, system) | |
| 'entries_after_recovery': 0, # Entries that came after file recovery | |
| } | |
| def log(self, message: str, level: str = "INFO"): | |
| """Log message with timestamp""" | |
| timestamp = datetime.now().strftime("%H:%M:%S") | |
| print(f"[{timestamp}] [{level}] {message}") | |
| def find_jsonl_files(self) -> List[Path]: | |
| """Find all JSONL files in the directory""" | |
| jsonl_files = list(self.jsonl_dir.glob("*.jsonl")) | |
| self.log(f"Found {len(jsonl_files)} JSONL files in {self.jsonl_dir}") | |
| return sorted(jsonl_files) | |
| def find_approximate_line(self, content: str, search_str: str, context_chars: int = 50) -> int: | |
| """Find approximate line number where string might be expected""" | |
| lines = content.split('\n') | |
| search_lower = search_str.lower()[:context_chars] | |
| for i, line in enumerate(lines, 1): | |
| if search_lower in line.lower(): | |
| return i | |
| # If not found, return middle of file | |
| return len(lines) // 2 | |
| def handle_failed_edit(self, file_path: str, edit: EditOperation, content: str, reason: str): | |
| """Handle a failed edit operation""" | |
| self.stats['edits_failed'] += 1 | |
| self.stats['conflicts'] += 1 | |
| if self.interactive: | |
| print(f"\n{'='*60}") | |
| print(f"EDIT CONFLICT in {file_path}") | |
| print(f"Reason: {reason}") | |
| print(f"Timestamp: {edit.timestamp}") | |
| print(f"Looking for:") | |
| print(edit.old_string[:200] + "..." if len(edit.old_string) > 200 else edit.old_string) | |
| print("\nOptions:") | |
| print("1. Skip this edit") | |
| print("2. Show full old_string") | |
| print("3. Show current file content") | |
| print("4. Apply anyway (force)") | |
| choice = input("Choice [1-4, default=1]: ").strip() | |
| if choice == '2': | |
| print("\nFull old_string:") | |
| print(edit.old_string) | |
| return self.handle_failed_edit(file_path, edit, content, reason) | |
| elif choice == '3': | |
| print("\nCurrent file content:") | |
| print(content[:1000] + "..." if len(content) > 1000 else content) | |
| return self.handle_failed_edit(file_path, edit, content, reason) | |
| elif choice == '4': | |
| return content.replace(edit.old_string, edit.new_string, 1 if not edit.replace_all else -1) | |
| return content | |
| def save_reject_file(self, edit: EditOperation, file_path: str, reason: str): | |
| """Save rejected edit to a file for manual review""" | |
| if not self.save_rejects or self.dry_run: | |
| return | |
| self.reject_dir.mkdir(parents=True, exist_ok=True) | |
| # Create a unique filename for this reject | |
| timestamp_str = edit.timestamp.strftime("%Y%m%d_%H%M%S") | |
| safe_filename = re.sub(r'[^a-zA-Z0-9_-]', '_', Path(file_path).name) | |
| reject_file = self.reject_dir / f"{timestamp_str}_{safe_filename}.reject.md" | |
| with open(reject_file, 'w') as f: | |
| f.write(f"# Rejected Edit\n\n") | |
| f.write(f"**File**: {file_path}\n") | |
| f.write(f"**Timestamp**: {edit.timestamp}\n") | |
| f.write(f"**Reason**: {reason}\n") | |
| f.write(f"**Tool**: {edit.tool_name}\n") | |
| f.write(f"**Source**: {edit.source_file}:{edit.source_line}\n\n") | |
| if edit.assistant_message: | |
| f.write(f"## Assistant Message\n") | |
| f.write(f"{edit.assistant_message}\n\n") | |
| f.write(f"## Old String (looking for this)\n") | |
| f.write("```\n") | |
| f.write(edit.old_string) | |
| f.write("\n```\n\n") | |
| f.write(f"## New String (wanted to change to)\n") | |
| f.write("```\n") | |
| f.write(edit.new_string) | |
| f.write("\n```\n") | |
| def parse_jsonl_entry(self, entry: dict, source_file: str = "unknown", source_line: int = 0, | |
| previous_assistant_message: str = "") -> str: | |
| """Parse a single JSONL entry and extract relevant information. | |
| Returns the assistant message if this entry contains one, otherwise returns empty string.""" | |
| current_assistant_message = "" | |
| entry_handled = False | |
| try: | |
| if not isinstance(entry, dict): | |
| self.log(f"ERROR: Entry is not a dict at {source_file}:{source_line}, it's a {type(entry)}", "ERROR") | |
| self.stats['entries_unknown'] += 1 | |
| return "" | |
| # Handle timestamp parsing with better error handling | |
| timestamp = None | |
| timestamp_str = entry.get('timestamp') | |
| if timestamp_str: | |
| try: | |
| # Handle both formats: with and without timezone | |
| if timestamp_str.endswith('Z'): | |
| timestamp_str = timestamp_str[:-1] + '+00:00' | |
| timestamp = datetime.fromisoformat(timestamp_str) | |
| except (ValueError, AttributeError) as e: | |
| self.log(f"Warning: Invalid timestamp format at {source_file}:{source_line}: {timestamp_str}", "WARN") | |
| timestamp = datetime.now() | |
| else: | |
| timestamp = datetime.now() | |
| session_id = entry.get('sessionId', 'unknown') | |
| uuid = entry.get('uuid', 'unknown') | |
| version = entry.get('version', 'unknown') | |
| # Track Claude Code versions | |
| if version != 'unknown': | |
| if not hasattr(self, 'versions_seen'): | |
| self.versions_seen = Counter() | |
| self.versions_seen[version] += 1 | |
| # Track assistant messages for context | |
| if entry.get('type') == 'assistant' and 'message' in entry: | |
| message = entry.get('message', {}) | |
| if isinstance(message, dict) and 'content' in message: | |
| content_items = message.get('content', []) | |
| if isinstance(content_items, list): | |
| for item in content_items: | |
| if isinstance(item, dict) and item.get('type') == 'text': | |
| text = item.get('text', '') | |
| if text: | |
| current_assistant_message = text | |
| break | |
| # Track UUID relationships | |
| if uuid and uuid != 'unknown': | |
| self.uuid_graph[uuid] = entry | |
| if 'parentUuid' in entry: | |
| parent_uuid = entry['parentUuid'] | |
| # Track parent/child relationship | |
| entry['_parent'] = parent_uuid | |
| # Check for tool results (edits, writes, etc.) | |
| if 'toolUseResult' in entry: | |
| tool_result = entry['toolUseResult'] | |
| # Only process if it's a dict (not an error string) | |
| if isinstance(tool_result, dict): | |
| self.process_tool_result(tool_result, timestamp, session_id, uuid, source_file, source_line, previous_assistant_message) | |
| entry_handled = True | |
| # Also check for file content in toolUseResult.file | |
| if 'file' in tool_result and 'content' in tool_result['file']: | |
| file_path = tool_result['file'].get('filePath') | |
| if file_path: | |
| content_entry = { | |
| 'file_path': file_path, | |
| 'content': tool_result['file']['content'], | |
| 'timestamp': timestamp, | |
| 'uuid': uuid, | |
| 'parent_uuid': entry.get('parentUuid'), | |
| 'source': f"{source_file}:{source_line}", | |
| 'type': 'file_read' | |
| } | |
| self.file_content_entries[file_path].append(content_entry) | |
| self.stats['file_reads'] += 1 | |
| self.log(f"Found file content for {file_path} in toolUseResult.file at {timestamp}") | |
| # Check if this is after the file was already recovered | |
| if file_path in self.recovered_files: | |
| self.entries_after_recovery[file_path].append({ | |
| 'timestamp': timestamp, | |
| 'type': 'file_read_after_recovery', | |
| 'source': f"{source_file}:{source_line}" | |
| }) | |
| self.stats['entries_after_recovery'] += 1 | |
| # Check for tool errors (user role with error content) | |
| tool_error_found = False | |
| if entry.get('type') == 'user' and 'toolUseResult' in entry: | |
| tool_result = entry['toolUseResult'] | |
| if isinstance(tool_result, str) and tool_result.startswith('Error:'): | |
| self.log(f"DEBUG: Found toolUseResult error at {source_file}:{source_line}", "DEBUG") | |
| self.process_tool_error(entry, timestamp, version, source_file, source_line) | |
| tool_error_found = True | |
| entry_handled = True | |
| # Check for messages (chat content) - always check for tool errors in message | |
| if 'message' in entry: | |
| message = entry['message'] | |
| # Handle both dict and string message formats | |
| if isinstance(message, dict): | |
| # Check for tool errors in message content | |
| if message.get('role') == 'user' and isinstance(message.get('content'), list): | |
| has_error = False | |
| for content_item in message['content']: | |
| if isinstance(content_item, dict) and content_item.get('is_error'): | |
| self.log(f"DEBUG: Found is_error in message content at {source_file}:{source_line}", "DEBUG") | |
| self.process_tool_error_from_content(content_item, entry, timestamp, version, source_file, source_line) | |
| has_error = True | |
| if has_error: | |
| entry_handled = True | |
| elif not tool_error_found: | |
| # Only process as regular message if no errors found anywhere | |
| self.process_message(message, timestamp, session_id, uuid, source_file, source_line) | |
| entry_handled = True | |
| elif not tool_error_found: | |
| # Process other messages only if no tool error was found | |
| self.process_message(message, timestamp, session_id, uuid, source_file, source_line) | |
| entry_handled = True | |
| elif isinstance(message, str) and not tool_error_found: | |
| # Convert string message to dict format | |
| self.process_message({'content': message, 'role': 'unknown'}, timestamp, session_id, uuid, source_file, source_line) | |
| entry_handled = True | |
| # Check for tool use (to identify tool names) | |
| if entry.get('type') == 'assistant' and 'message' in entry: | |
| message = entry['message'] # Get the actual message | |
| if isinstance(message, dict) and 'content' in message: | |
| for content in message.get('content', []): | |
| if isinstance(content, dict) and content.get('type') == 'tool_use': | |
| # Pass version info for debugging | |
| self.process_tool_use(content, timestamp, session_id, uuid, source_file, source_line, | |
| current_assistant_message or previous_assistant_message, version) | |
| entry_handled = True | |
| # Check for benign entry types (summary, system) | |
| entry_type = entry.get('type', 'no_type') | |
| if entry_type in ['summary', 'system']: | |
| # These are benign status messages, track separately | |
| self.stats['entries_benign'] = self.stats.get('entries_benign', 0) + 1 | |
| # Lazily create the benign counter before touching it (avoids AttributeError on first use) | |
| if not hasattr(self, 'benign_entry_types'): | |
| self.benign_entry_types = {} | |
| if entry_type not in self.benign_entry_types: | |
| self.benign_entry_types[entry_type] = 0 | |
| self.benign_entry_types[entry_type] += 1 | |
| entry_handled = True # Mark as handled since they're expected | |
| # Track if this entry wasn't handled (unknown) | |
| if not entry_handled: | |
| # Double-check it's not a tool error we missed | |
| is_tool_error = False | |
| # Check for toolUseResult errors | |
| if entry.get('type') == 'user' and 'toolUseResult' in entry: | |
| tool_result = entry['toolUseResult'] | |
| if isinstance(tool_result, str) and tool_result.startswith('Error:'): | |
| is_tool_error = True | |
| self.log(f"WARNING: Missed tool error (toolUseResult) at {source_file}:{source_line}", "WARN") | |
| self.process_tool_error(entry, timestamp, version, source_file, source_line) | |
| # Check for is_error in message content | |
| if not is_tool_error and 'message' in entry: | |
| message = entry['message'] | |
| if isinstance(message, dict) and message.get('role') == 'user': | |
| content = message.get('content', []) | |
| if isinstance(content, list): | |
| for item in content: | |
| if isinstance(item, dict) and item.get('is_error'): | |
| is_tool_error = True | |
| self.log(f"WARNING: Missed tool error (is_error) at {source_file}:{source_line}", "WARN") | |
| self.process_tool_error_from_content(item, entry, timestamp, version, source_file, source_line) | |
| break | |
| # Only add to unknown if it's really not a tool error | |
| if not is_tool_error: | |
| self.stats['entries_unknown'] += 1 | |
| # Determine entry type for categorization | |
| if not entry_type or entry_type == 'no_type': | |
| # Try to infer type from keys | |
| if 'toolUseRequest' in entry: | |
| entry_type = 'toolUseRequest' | |
| elif 'error' in entry: | |
| entry_type = 'error' | |
| elif 'message' in entry: | |
| entry_type = 'message_unknown_format' | |
| else: | |
| entry_type = f"unknown_keys:{','.join(list(entry.keys()))}" | |
| self.unknown_entry_types[entry_type] += 1 | |
| # Store ALL unknown entries (no limiting) | |
| self.unknown_entries.append({ | |
| 'type': entry_type, | |
| 'source': f"{source_file}:{source_line}", | |
| 'timestamp': timestamp_str, | |
| 'uuid': uuid, | |
| 'sample_keys': list(entry.keys()), # All keys | |
| 'entry_snippet': str(entry) # Full entry | |
| }) | |
| else: | |
| self.stats['entries_processed'] += 1 | |
| except Exception as e: | |
| # Wrap entire exception handler in try-except to catch errors in the handler itself | |
| try: | |
| self.log(f"Error parsing entry at {source_file}:{source_line}: {e}", "WARN") | |
| self.stats['entries_unknown'] += 1 | |
| self.unknown_entry_types['parse_error'] += 1 | |
| # Track parse errors in detail | |
| if not hasattr(self, 'parse_errors'): | |
| self.parse_errors = [] | |
| version = entry.get('version', 'unknown') if isinstance(entry, dict) else 'unknown' | |
| self.parse_errors.append({ | |
| 'source': f"{source_file}:{source_line}", | |
| 'error': str(e), | |
| 'version': version, | |
| 'entry_keys': list(entry.keys()) if isinstance(entry, dict) else 'not_a_dict', | |
| 'entry_snippet': str(entry) if entry else 'None' | |
| }) | |
| # Track version-specific errors | |
| if version != 'unknown': | |
| if not hasattr(self, 'version_errors'): | |
| self.version_errors = {} | |
| if version not in self.version_errors: | |
| self.version_errors[version] = [] | |
| self.version_errors[version].append({ | |
| 'source': f"{source_file}:{source_line}", | |
| 'error': str(e), | |
| 'tool': 'unknown' # Will be updated if we can determine the tool | |
| }) | |
| except Exception as handler_error: | |
| # If the exception handler itself fails, just log it simply | |
| self.log(f"CRITICAL: Exception handler failed at {source_file}:{source_line}: {handler_error}", "ERROR") | |
| self.log(f"Original error was: {e}", "ERROR") | |
| self.stats['entries_unknown'] += 1 | |
| self.unknown_entry_types['parse_error'] += 1 | |
| return current_assistant_message | |
| def process_tool_result(self, tool_result: dict, timestamp: datetime, session_id: str, | |
| uuid: str, source_file: str, source_line: int, assistant_message: str = ""): | |
| """Process toolUseResult entries""" | |
| if 'originalFile' in tool_result: | |
| # This is a file snapshot | |
| file_info = tool_result['originalFile'] | |
| file_path = file_info.get('filePath') | |
| if file_path: | |
| content = file_info.get('content', '') | |
| snapshot = FileSnapshot( | |
| file_path=file_path, | |
| content=content, | |
| timestamp=timestamp, | |
| session_id=session_id, | |
| uuid=uuid, | |
| source_file=source_file, | |
| source_line=source_line | |
| ) | |
| # Keep either earliest or latest snapshot based on configuration | |
| if self.use_earliest_snapshot: | |
| # Keep the earliest snapshot for each file | |
| if file_path not in self.file_snapshots or snapshot.timestamp < self.file_snapshots[file_path].timestamp: | |
| self.file_snapshots[file_path] = snapshot | |
| self.stats['snapshots'] += 1 | |
| self.log(f"Found originalFile snapshot for {file_path} at {timestamp}") | |
| else: | |
| # Keep only the latest snapshot for each file | |
| if file_path not in self.file_snapshots or snapshot.timestamp > self.file_snapshots[file_path].timestamp: | |
| self.file_snapshots[file_path] = snapshot | |
| self.stats['snapshots'] += 1 | |
| self.log(f"Found originalFile snapshot for {file_path} at {timestamp}") | |
| def process_tool_use(self, tool_use: dict, timestamp: datetime, session_id: str, | |
| uuid: str, source_file: str, source_line: int, assistant_message: str = "", | |
| version: str = "unknown"): | |
| """Process tool_use entries to extract edits and writes""" | |
| tool_name = tool_use.get('name') | |
| tool_input = tool_use.get('input', {}) | |
| if tool_name == 'Write': | |
| file_path = tool_input.get('file_path') | |
| content_text = tool_input.get('content', '') | |
| if file_path: | |
| # Create a snapshot for Write operations | |
| snapshot = FileSnapshot( | |
| file_path=file_path, | |
| content=content_text, | |
| timestamp=timestamp, | |
| session_id=session_id, | |
| uuid=uuid, | |
| source_file=source_file, | |
| source_line=source_line | |
| ) | |
| # Keep either earliest or latest snapshot based on configuration | |
| if self.use_earliest_snapshot: | |
| # Keep the earliest snapshot for each file | |
| if file_path not in self.file_snapshots or snapshot.timestamp < self.file_snapshots[file_path].timestamp: | |
| self.file_snapshots[file_path] = snapshot | |
| self.stats['snapshots'] += 1 | |
| self.log(f"Found Write operation for {file_path} at {timestamp}") | |
| else: | |
| # Keep only the latest snapshot for each file | |
| if file_path not in self.file_snapshots or snapshot.timestamp > self.file_snapshots[file_path].timestamp: | |
| self.file_snapshots[file_path] = snapshot | |
| self.stats['snapshots'] += 1 | |
| self.log(f"Found Write operation for {file_path} at {timestamp}") | |
| elif tool_name == 'Edit': | |
| file_path = tool_input.get('file_path') | |
| if file_path: | |
| # Check if we have old_string and new_string in input | |
| if 'old_string' in tool_input and 'new_string' in tool_input: | |
| edit = EditOperation( | |
| file_path=file_path, | |
| old_string=tool_input.get('old_string', ''), | |
| new_string=tool_input.get('new_string', ''), | |
| timestamp=timestamp, | |
| session_id=session_id, | |
| uuid=uuid, | |
| tool_name='Edit', | |
| replace_all=tool_input.get('replace_all', False), | |
| source_file=source_file, | |
| source_line=source_line, | |
| assistant_message=assistant_message | |
| ) | |
| self.file_edits[file_path].append(edit) | |
| self.stats['edits_collected'] += 1 | |
| self.log(f"Found Edit operation for {file_path} at {timestamp}") | |
| else: | |
| self.log(f"Edit tool_use missing old_string or new_string for {file_path} in v{version}", "WARN") | |
| elif tool_name == 'MultiEdit': | |
| # MultiEdit has an array of edits | |
| file_path = tool_input.get('file_path') | |
| if file_path: | |
| edits_array = tool_input.get('edits', []) | |
| edits_found = 0 | |
| # Create a unique ID for this MultiEdit group | |
| multi_edit_group_id = f"{uuid}_{timestamp.isoformat()}" | |
| for edit_idx, edit_op in enumerate(edits_array): | |
| if 'old_string' in edit_op and 'new_string' in edit_op: | |
| edit = EditOperation( | |
| file_path=file_path, | |
| old_string=edit_op.get('old_string', ''), | |
| new_string=edit_op.get('new_string', ''), | |
| timestamp=timestamp, | |
| session_id=session_id, | |
| uuid=uuid, | |
| tool_name='MultiEdit', | |
| replace_all=edit_op.get('replace_all', False), | |
| source_file=source_file, | |
| source_line=source_line, | |
| assistant_message=assistant_message | |
| ) | |
| # Add MultiEdit-specific metadata | |
| edit.multi_edit_group_id = multi_edit_group_id | |
| edit.multi_edit_index = edit_idx | |
| edit.multi_edit_total = len(edits_array) | |
| # Store all edit operations from the group for reporting | |
| edit.multi_edit_all = edits_array | |
| self.file_edits[file_path].append(edit) | |
| self.stats['edits_collected'] += 1 | |
| edits_found += 1 | |
| if edits_found > 0: | |
| self.log(f"Found MultiEdit operation for {file_path} with {edits_found} edits at {timestamp}") | |
| else: | |
| self.log(f"MultiEdit tool_use missing edits content for {file_path} in v{version}", "WARN") | |
| def normalize_error_text(self, error_text: str) -> str: | |
| """Normalize error text by replacing dynamic parts with placeholders""" | |
| import re | |
| # Normalize "Found N matches" pattern | |
| normalized = re.sub(r'Found \d+ matches', 'Found [N] matches', error_text) | |
| # Normalize file paths that look like /path/to/file | |
| normalized = re.sub(r'/[\w/.-]+', '[PATH]', normalized) | |
| # Normalize line numbers | |
| normalized = re.sub(r'line \d+', 'line [N]', normalized) | |
| normalized = re.sub(r':\d+', ':[N]', normalized) | |
| # Normalize UUIDs | |
| normalized = re.sub(r'[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}', '[UUID]', normalized) | |
| return normalized | |
| def categorize_error(self, error_text: str) -> str: | |
| """Categorize error - keep semantic categories for known patterns""" | |
| # Check for specific patterns first | |
| if "doesn't want to proceed" in error_text or "tool use was rejected" in error_text: | |
| return "user_rejected" | |
| elif "File has not been read yet" in error_text: | |
| return "file_not_read" | |
| elif "String to replace not found" in error_text or "Old string not found" in error_text: | |
| return "string_not_found" | |
| elif "File not found" in error_text: | |
| return "file_not_found" | |
| elif "Permission denied" in error_text: | |
| return "permission_denied" | |
| elif "already exists" in error_text: | |
| return "already_exists" | |
| else: | |
| # For other errors, use normalized version as category | |
| return self.normalize_error_text(error_text) | |
| def process_tool_error(self, entry: dict, timestamp: datetime, version: str, source_file: str, source_line: int): | |
| """Process a tool use error entry""" | |
| error_text = entry.get('toolUseResult', '') | |
| parent_uuid = entry.get('parentUuid', '') | |
| # Categorize the error | |
| error_type = self.categorize_error(error_text) | |
| # Track error | |
| if error_type not in self.tool_error_types: | |
| self.tool_error_types[error_type] = 0 | |
| self.tool_error_types[error_type] += 1 | |
| # Track rejections specifically | |
| if error_type == "user_rejected": | |
| self.tool_rejections += 1 | |
| # Store error details - FULL TEXT, no limiting | |
| self.tool_errors.append({ | |
| 'type': error_type, | |
| 'text': error_text, # Full text, no limiting | |
| 'original_text': error_text, # Keep original too | |
| 'parent_uuid': parent_uuid, | |
| 'version': version, | |
| 'timestamp': timestamp, | |
| 'source': f"{source_file}:{source_line}" | |
| }) | |
| self.log(f"Tool error ({error_type}) in v{version} - Total tool errors so far: {len(self.tool_errors)}") | |
| def process_tool_error_from_content(self, content_item: dict, entry: dict, timestamp: datetime, | |
| version: str, source_file: str, source_line: int): | |
| """Process tool error from message content""" | |
| error_text = content_item.get('content', '') | |
| tool_use_id = content_item.get('tool_use_id', '') | |
| parent_uuid = entry.get('parentUuid', '') | |
| # Remove <tool_use_error> tags if present | |
| if '<tool_use_error>' in error_text: | |
| error_text = error_text.replace('<tool_use_error>', '').replace('</tool_use_error>', '').strip() | |
| # Categorize the error | |
| error_type = self.categorize_error(error_text) | |
| # Track error | |
| if error_type not in self.tool_error_types: | |
| self.tool_error_types[error_type] = 0 | |
| self.tool_error_types[error_type] += 1 | |
| # Track rejections specifically | |
| if error_type == "user_rejected": | |
| self.tool_rejections += 1 | |
| # Store error details - FULL TEXT, no limiting | |
| self.tool_errors.append({ | |
| 'type': error_type, | |
| 'text': error_text, # Full text, no limiting | |
| 'original_text': error_text, # Keep original too | |
| 'tool_use_id': tool_use_id, | |
| 'parent_uuid': parent_uuid, | |
| 'version': version, | |
| 'timestamp': timestamp, | |
| 'source': f"{source_file}:{source_line}" | |
| }) | |
| self.log(f"Tool error ({error_type}) for tool {tool_use_id} in v{version} - Total tool errors so far: {len(self.tool_errors)}") | |
| def process_message(self, message: dict, timestamp: datetime, session_id: str, uuid: str, source_file: str = "unknown", source_line: int = 0) -> None: | |
| """Process chat messages""" | |
| # Ensure message is a dict | |
| if not isinstance(message, dict): | |
| self.log(f"Warning: process_message received non-dict: {type(message)}", "WARN") | |
| return | |
| role = message.get('role', 'unknown') | |
| # Extract text content | |
| content_text = "" | |
| commit_message = None | |
| message_content = message.get('content') | |
| if isinstance(message_content, str): | |
| content_text = message_content | |
| elif isinstance(message_content, list): | |
| for content in message_content: | |
| if isinstance(content, dict) and content.get('type') == 'text': | |
| text = content.get('text', '') | |
| content_text += text + "\n" | |
| # Check for git commit messages | |
| if 'git commit' in text.lower() and '-m' in text: | |
| # Extract commit message | |
| import re | |
| commit_match = re.search(r'-m\s*["\'](.*?)["\']', text) | |
| if not commit_match: | |
| # Try heredoc style | |
| commit_match = re.search(r'cat\s*<<.*?\n(.*?)\nEOF', text, re.DOTALL) | |
| if commit_match: | |
| commit_message = commit_match.group(1) | |
| self.stats['commits'] += 1 | |
| if content_text: | |
| msg = ChatMessage( | |
| role=role, | |
| content=content_text, | |
| timestamp=timestamp, | |
| session_id=session_id, | |
| uuid=uuid, | |
| commit_message=commit_message | |
| ) | |
| self.chat_messages.append(msg) | |
| self.stats['messages'] += 1 | |
| def phase1_extract_snapshots(self, filter_pattern: Optional[str] = None): | |
| """Phase 1: Extract all originalFile snapshots""" | |
| snapshot_mode = "EARLIEST" if self.use_earliest_snapshot else "LATEST" | |
| self.log(f"PHASE 1: Extracting originalFile snapshots (using {snapshot_mode} snapshot for each file)...") | |
| jsonl_files = self.find_jsonl_files() | |
| for jsonl_file in jsonl_files: | |
| self.log(f"Processing {jsonl_file.name} for snapshots...") | |
| try: | |
| with open(jsonl_file, 'r', encoding='utf-8') as f: | |
| previous_assistant_message = "" | |
| full_path = str(jsonl_file.absolute()) | |
| for line_num, line in enumerate(f, 1): | |
| line = line.strip() | |
| if not line: | |
| continue | |
| try: | |
| entry = json.loads(line) | |
| self.stats['entries'] += 1 | |
| # Pass the previous assistant message and get the current one | |
| assistant_msg = self.parse_jsonl_entry(entry, source_file=full_path, | |
| source_line=line_num, | |
| previous_assistant_message=previous_assistant_message) | |
| # Update for next iteration | |
| if assistant_msg: | |
| previous_assistant_message = assistant_msg | |
| except json.JSONDecodeError as e: | |
| self.log(f"Warning: Invalid JSON on line {line_num} in {jsonl_file.name}: {e}", "WARN") | |
| continue | |
| except Exception as e: | |
| self.log(f"Warning: Error processing line {line_num} in {jsonl_file.name}: {e}", "WARN") | |
| continue | |
| except Exception as e: | |
| self.log(f"Error reading {jsonl_file}: {e}, continuing with next file", "ERROR") | |
| continue | |
| self.stats['sessions'] += 1 | |
| # Filter snapshots if pattern specified | |
| if filter_pattern: | |
| filtered = {} | |
| for file_path, snapshot in self.file_snapshots.items(): | |
| if re.search(filter_pattern, file_path): | |
| filtered[file_path] = snapshot | |
| self.file_snapshots = filtered | |
| self.log(f"Found {len(self.file_snapshots)} unique files with snapshots") | |
| def phase2_apply_edits(self): | |
| """Phase 2: Apply all edits in chronological order""" | |
| self.log("PHASE 2: Applying edits to snapshots...") | |
| # Track edit failures for reporting | |
| self.failed_edits = [] | |
| # Log summary of all edits collected | |
| total_file_edits = sum(len(edits) for edits in self.file_edits.values()) | |
| self.log(f"Total edits collected across all files: {total_file_edits}") | |
| self.log(f"Files with edits: {len(self.file_edits)}") | |
| # Log edits per file | |
| for fp, edits in self.file_edits.items(): | |
| if edits: | |
| self.log(f" {fp}: {len(edits)} edits") | |
| if self.verbose: | |
| for i, e in enumerate(edits): # Show all edits | |
| self.log(f" Edit {i+1}: {e.timestamp} ({e.tool_name})") | |
| for file_path, snapshot in self.file_snapshots.items(): | |
| content = snapshot.content | |
| # Get all edits for this file after the snapshot timestamp | |
| file_edits = self.file_edits.get(file_path, []) | |
| # Filter edits that occurred after the snapshot | |
| applicable_edits = [e for e in file_edits if e.timestamp > snapshot.timestamp] | |
| skipped_edits = [e for e in file_edits if e.timestamp <= snapshot.timestamp] | |
| # Update statistics | |
| self.stats['edits_applicable'] += len(applicable_edits) | |
| self.stats['edits_skipped'] += len(skipped_edits) | |
| # Sort by timestamp (chronological order) | |
| applicable_edits.sort(key=lambda e: e.timestamp) | |
| self.log(f"File: {file_path}") | |
| self.log(f" Snapshot timestamp: {snapshot.timestamp}") | |
| self.log(f" Total edits for file: {len(file_edits)}") | |
| self.log(f" Edits BEFORE snapshot (skipped): {len(skipped_edits)}") | |
| self.log(f" Applicable edits (after snapshot): {len(applicable_edits)}") | |
| for idx, edit in enumerate(applicable_edits, 1): | |
| old_content = content | |
| # Try to apply the edit | |
| if edit.replace_all: | |
| # Replace all occurrences | |
| if edit.old_string in content: | |
| occurrences = content.count(edit.old_string) | |
| content = content.replace(edit.old_string, edit.new_string) | |
| self.stats['edits_successful'] += 1 | |
| self.log(f" ✓ Edit {idx}/{len(applicable_edits)}: SUCCEEDED (replaced {occurrences} occurrences)") | |
| else: | |
| # Find line number where edit was expected | |
| line_num = self.find_approximate_line(content, edit.old_string) | |
| reason = f"Old string not found (replace_all) near line {line_num}" | |
| self.handle_failed_edit(file_path, edit, content, reason) | |
| self.save_reject_file(edit, file_path, reason) | |
| self.log(f" ✗ Edit {idx}/{len(applicable_edits)}: FAILED - Old string not found (replace_all)") | |
| self.log(f" Source: {edit.source_file}:{edit.source_line}") | |
| self.log(f" Tool: {edit.tool_name}") | |
| if edit.assistant_message: | |
| self.log(f" Assistant reasoning: {edit.assistant_message}") | |
| self.log(f" FULL OLD STRING:") | |
| self.log(f" {repr(edit.old_string)}") | |
| self.log(f" FULL NEW STRING:") | |
| self.log(f" {repr(edit.new_string)}") | |
| failed_edit_dict = { | |
| 'file': file_path, | |
| 'edit_num': idx, | |
| 'timestamp': edit.timestamp, | |
| 'reason': 'Old string not found (replace_all)', | |
| 'line_hint': line_num, | |
| 'old_string': edit.old_string, # Full string, not preview | |
| 'new_string': edit.new_string, # Include what we wanted to change to | |
| 'source_file': edit.source_file, | |
| 'source_line': edit.source_line, | |
| 'tool_name': edit.tool_name, | |
| 'assistant_message': edit.assistant_message | |
| } | |
| # Add MultiEdit information if present | |
| if hasattr(edit, 'multi_edit_group_id'): | |
| failed_edit_dict['multi_edit_group_id'] = edit.multi_edit_group_id | |
| failed_edit_dict['multi_edit_index'] = edit.multi_edit_index | |
| failed_edit_dict['multi_edit_total'] = edit.multi_edit_total | |
| if hasattr(edit, 'multi_edit_all'): | |
| failed_edit_dict['multi_edit_all'] = edit.multi_edit_all | |
| self.failed_edits.append(failed_edit_dict) | |
| else: | |
| # Replace first occurrence only | |
| if edit.old_string in content: | |
| content = content.replace(edit.old_string, edit.new_string, 1) | |
| self.stats['edits_successful'] += 1 | |
| self.log(f" ✓ Edit {idx}/{len(applicable_edits)}: SUCCEEDED") | |
| else: | |
| line_num = self.find_approximate_line(content, edit.old_string) | |
| reason = f"Old string not found at expected location (line ~{line_num})" | |
| self.handle_failed_edit(file_path, edit, content, reason) | |
| self.save_reject_file(edit, file_path, reason) | |
| self.log(f" ✗ Edit {idx}/{len(applicable_edits)}: FAILED - Old string not found") | |
| self.log(f" Source: {edit.source_file}:{edit.source_line}") | |
| self.log(f" Tool: {edit.tool_name}") | |
| if edit.assistant_message: | |
| self.log(f" Assistant reasoning: {edit.assistant_message}") | |
| self.log(f" FULL OLD STRING:") | |
| self.log(f" {repr(edit.old_string)}") | |
| self.log(f" FULL NEW STRING:") | |
| self.log(f" {repr(edit.new_string)}") | |
| failed_edit_dict = { | |
| 'file': file_path, | |
| 'edit_num': idx, | |
| 'timestamp': edit.timestamp, | |
| 'reason': f'Old string not found at expected location (line ~{line_num})', | |
| 'line_hint': line_num, | |
| 'old_string': edit.old_string, # Full string, not preview | |
| 'new_string': edit.new_string, # Include what we wanted to change to | |
| 'source_file': edit.source_file, | |
| 'source_line': edit.source_line, | |
| 'tool_name': edit.tool_name, | |
| 'assistant_message': edit.assistant_message | |
| } | |
| # Add MultiEdit information if present | |
| if hasattr(edit, 'multi_edit_group_id'): | |
| failed_edit_dict['multi_edit_group_id'] = edit.multi_edit_group_id | |
| failed_edit_dict['multi_edit_index'] = edit.multi_edit_index | |
| failed_edit_dict['multi_edit_total'] = edit.multi_edit_total | |
| if hasattr(edit, 'multi_edit_all'): | |
| failed_edit_dict['multi_edit_all'] = edit.multi_edit_all | |
| self.failed_edits.append(failed_edit_dict) | |
| self.recovered_files[file_path] = content | |
| self.log(f" Final content length: {len(content)} bytes\n") | |
| def phase3_extract_chat(self): | |
| """Phase 3: Extract chat messages and commits""" | |
| self.log("PHASE 3: Extracting chat messages...") | |
| # Chat is already extracted during phase 1 | |
| self.log(f"Found {len(self.chat_messages)} chat messages") | |
| # Count commits | |
| commits = [m for m in self.chat_messages if m.commit_message] | |
| self.log(f"Found {len(commits)} git commits") | |
| # Write chat to files | |
| if not self.dry_run: | |
| self.chat_dir.mkdir(parents=True, exist_ok=True) | |
| # Group messages by session | |
| by_session = defaultdict(list) | |
| for msg in self.chat_messages: | |
| by_session[msg.session_id].append(msg) | |
| for session_id, messages in by_session.items(): | |
| session_file = self.chat_dir / f"session_{session_id[:8]}.md" | |
| with open(session_file, 'w') as f: | |
| f.write(f"# Chat Session {session_id[:8]}\n\n") | |
| for msg in sorted(messages, key=lambda m: m.timestamp): | |
| f.write(f"## {msg.timestamp} - {msg.role}\n") | |
| if msg.commit_message: | |
| f.write(f"**COMMIT**: {msg.commit_message}\n\n") | |
| f.write(msg.content) | |
| f.write("\n\n---\n\n") | |
| def write_recovered_files(self): | |
| """Write all recovered files to disk""" | |
| if self.dry_run: | |
| self.log("DRY RUN - Not writing files") | |
| return | |
| self.log("Writing recovered files...") | |
| for file_path, content in self.recovered_files.items(): | |
| output_path = self.output_dir / file_path.lstrip('/') | |
| output_path.parent.mkdir(parents=True, exist_ok=True) | |
| with open(output_path, 'w') as f: | |
| f.write(content) | |
| self.log(f" Wrote {len(content)} bytes to {output_path}") | |
| def write_edit_failure_report(self): | |
| """Write detailed edit failure report""" | |
| if not hasattr(self, 'failed_edits') or not self.failed_edits: | |
| return | |
| report_path = self.output_dir / "edit_failures.md" | |
| with open(report_path, 'w') as f: | |
| f.write("# Edit Failure Report\n") | |
| f.write(f"Generated at: {datetime.now().isoformat()}\n\n") | |
| f.write(f"## Summary\n") | |
| f.write(f"- Total edits collected from JSONL: {self.stats['edits_collected']}\n") | |
| f.write(f"- Edits skipped (before snapshot): {self.stats['edits_skipped']}\n") | |
| f.write(f"- Edits applicable (after snapshot): {self.stats['edits_applicable']}\n") | |
| f.write(f"- Edits successfully applied: {self.stats['edits_successful']}\n") | |
| f.write(f"- Edit failures: {len(self.failed_edits)}\n\n") | |
| f.write("## Failed Edits by File\n\n") | |
| # Group failures by file | |
| by_file = {} | |
| for failure in self.failed_edits: | |
| file_path = failure['file'] | |
| if file_path not in by_file: | |
| by_file[file_path] = [] | |
| by_file[file_path].append(failure) | |
| for file_path, failures in sorted(by_file.items()): | |
| f.write(f"### FILE: {file_path}\n") | |
| f.write(f"Total failed edits for this file: {len(failures)}\n\n") | |
| for idx, failure in enumerate(failures, 1): | |
| f.write(f"#### ===== FAILED EDIT #{idx} for {file_path} =====\n") | |
| f.write(f"**Full Source Path**: `{failure['source_file']}:{failure['source_line']}`\n") | |
| f.write(f"**Tool Type**: {failure.get('tool_name', 'Unknown')}\n") | |
| f.write(f"**Timestamp**: {failure['timestamp']}\n") | |
| f.write(f"**Failure Reason**: {failure['reason']}\n") | |
| f.write(f"**Approximate line in target file**: {failure['line_hint']}\n") | |
| # Add MultiEdit info if present | |
| if 'multi_edit_index' in failure: | |
| f.write(f"**MultiEdit**: Failed on edit {failure['multi_edit_index'] + 1} of {failure['multi_edit_total']}\n") | |
| f.write("\n") | |
| if failure.get('assistant_message'): | |
| f.write("**ASSISTANT REASONING:**\n") | |
| f.write("```\n") | |
| f.write(failure['assistant_message']) | |
| f.write("\n```\n\n") | |
| # For MultiEdit, show all edits in the group | |
| if 'multi_edit_all' in failure: | |
| f.write("**ALL EDITS IN THIS MULTIEDIT:**\n\n") | |
| for edit_idx, edit in enumerate(failure['multi_edit_all']): | |
| is_failed = edit_idx == failure['multi_edit_index'] | |
| status = "❌ FAILED" if is_failed else "⚪ NOT ATTEMPTED" | |
| f.write(f"##### Edit {edit_idx + 1} of {failure['multi_edit_total']} - {status}\n\n") | |
| f.write("**OLD STRING:**\n```\n") | |
| f.write(edit.get('old_string', 'N/A')) | |
| f.write("\n```\n\n") | |
| f.write("**NEW STRING:**\n```\n") | |
| f.write(edit.get('new_string', 'N/A')) | |
| f.write("\n```\n") | |
| if edit.get('replace_all'): | |
| f.write("**Replace All**: Yes\n") | |
| f.write("\n") | |
| else: | |
| # Single edit failure | |
| f.write("**OLD STRING (looking for this):**\n") | |
| f.write("```\n") | |
| f.write(failure.get('old_string', failure.get('old_string_preview', 'N/A'))) | |
| f.write("\n```\n\n") | |
| f.write("**NEW STRING (wanted to change to this):**\n") | |
| f.write("```\n") | |
| f.write(failure.get('new_string', 'N/A')) | |
| f.write("\n```\n\n") | |
| f.write("-" * 80 + "\n\n") | |
| self.log(f"Edit failure report written to {report_path}") | |
| # Also write tool errors report if it exists | |
| if self.tool_errors: | |
| self.write_tool_errors_report() | |
| def write_tool_errors_report(self): | |
| """Write a detailed report of tool errors""" | |
| if not self.tool_errors: | |
| return | |
| report_path = self.output_dir / "tool_errors.md" | |
| with open(report_path, 'w') as f: | |
| f.write("# Tool Errors Report\n") | |
| f.write(f"Generated at: {datetime.now().isoformat()}\n\n") | |
| f.write("## Summary\n") | |
| f.write(f"- Total tool errors: {len(self.tool_errors):,}\n") | |
| f.write(f"- User rejections: {self.tool_rejections:,}\n") | |
| f.write(f"- Unique error types: {len(self.tool_error_types):,}\n\n") | |
| # Show error type breakdown | |
| f.write("## Error Types Breakdown\n\n") | |
| f.write("| Error Category | Count | Percentage |\n") | |
| f.write("|----------------|-------|------------|\n") | |
| total = len(self.tool_errors) | |
| for error_type, count in sorted(self.tool_error_types.items(), key=lambda x: x[1], reverse=True): | |
| percentage = (count / total) * 100 if total > 0 else 0 | |
| display_type = error_type if len(error_type) <= 80 else error_type[:77] + "..." | |
| f.write(f"| {display_type} | {count:,} | {percentage:.1f}% |\n") | |
| f.write("\n") | |
| # Show sample errors for each type | |
| f.write("## Sample Errors by Type\n\n") | |
| for error_type in sorted(self.tool_error_types.keys()): | |
| # Get sample errors of this type | |
| samples = [e for e in self.tool_errors if e['type'] == error_type][:3] # Show up to 3 samples | |
| if samples: | |
| f.write(f"### {error_type} ({self.tool_error_types[error_type]:,} occurrences)\n\n") | |
| for i, sample in enumerate(samples, 1): | |
| f.write(f"**Sample {i}:**\n") | |
| f.write(f"- Version: {sample['version']}\n") | |
| f.write(f"- Timestamp: {sample['timestamp']}\n") | |
| f.write(f"- Source: `{sample['source']}`\n") | |
| if sample.get('tool_use_id'): | |
| f.write(f"- Tool Use ID: {sample['tool_use_id']}\n") | |
| f.write(f"- Full Error Text:\n") | |
| f.write("```\n") | |
| f.write(sample['text']) | |
| f.write("\n```\n\n") | |
| self.log(f"Tool errors report written to {report_path}") | |
| def write_unknown_entries_report(self): | |
| """Write a report of unknown/unhandled entries""" | |
| # Write tool errors to a separate file | |
| if self.tool_errors: | |
| self.write_tool_errors_report() | |
| report_path = self.output_dir / "unknown_entries.md" | |
| # If there are no unknown entries, write a minimal report or remove the file | |
| if not self.unknown_entry_types and not self.entries_after_recovery and not hasattr(self, 'benign_entry_types'): | |
| # Write a minimal report indicating no unknown entries | |
| with open(report_path, 'w') as f: | |
| f.write("# Unknown/Unhandled Entries Report\n") | |
| f.write(f"Generated at: {datetime.now().isoformat()}\n\n") | |
| f.write("## Summary\n") | |
| f.write("✅ **No unknown entries found!**\n\n") | |
| f.write(f"- Total entries processed: {self.stats['entries']:,}\n") | |
| f.write(f"- Tool errors: {len(self.tool_errors):,} (see tool_errors.md)\n") | |
| f.write(f"- Benign entries: {self.stats.get('entries_benign', 0):,}\n") | |
| return | |
| with open(report_path, 'w') as f: | |
| f.write("# Unknown/Unhandled Entries Report\n") | |
| f.write(f"Generated at: {datetime.now().isoformat()}\n\n") | |
| f.write("## Summary\n") | |
| f.write(f"- Total entries in JSONL files: {self.stats['entries']:,}\n") | |
| f.write(f"- Entries processed successfully: {self.stats['entries_processed']:,}\n") | |
| f.write(f"- Benign entries (summary/system): {self.stats.get('entries_benign', 0):,}\n") | |
| f.write(f"- Unknown/unhandled entries: {self.stats['entries_unknown']:,}\n") | |
| f.write(f"- Tool errors: {len(self.tool_errors):,}\n") | |
| f.write(f"- Tool rejections by user: {self.tool_rejections:,}\n") | |
| f.write(f"- Entries after file recovery: {self.stats['entries_after_recovery']:,}\n\n") | |
| # Show Claude Code versions seen | |
| if hasattr(self, 'versions_seen') and self.versions_seen: | |
| f.write("## Claude Code Versions\n\n") | |
| f.write("| Version | Entry Count |\n") | |
| f.write("|---------|------------|\n") | |
| for version, count in sorted(self.versions_seen.items(), key=lambda x: semver_sort_key(x[0])): | |
| f.write(f"| {version} | {count:,} |\n") | |
| f.write("\n") | |
| # Show version-specific errors if any | |
| if hasattr(self, 'version_errors') and self.version_errors: | |
| f.write("## Errors by Version\n\n") | |
| for version, errors in sorted(self.version_errors.items(), key=lambda x: semver_sort_key(x[0])): | |
| f.write(f"### Version {version} ({len(errors)} errors)\n") | |
| for i, error in enumerate(errors, 1): # Show ALL errors per version | |
| f.write(f"- Error {i}: {error['error']} at {error['source']}") | |
| if error.get('tool') != 'unknown': | |
| f.write(f" (Tool: {error['tool']})") | |
| f.write("\n") | |
| f.write("\n") | |
| # Tool errors are now in a separate file | |
| if self.tool_error_types: | |
| f.write("## Tool Execution Errors\n\n") | |
| f.write(f"Tool errors have been moved to a separate file: `tool_errors.md`\n") | |
| f.write(f"Total tool errors: {len(self.tool_errors):,}\n") | |
| f.write(f"User rejections: {self.tool_rejections:,}\n\n") | |
| if hasattr(self, 'benign_entry_types') and self.benign_entry_types: | |
| f.write("## Benign Entry Types\n\n") | |
| f.write("| Entry Type | Count | Description |\n") | |
| f.write("|------------|-------|-------------|\n") | |
| for entry_type, count in sorted(self.benign_entry_types.items(), key=lambda x: x[1], reverse=True): | |
| desc = "Status messages" if entry_type == "summary" else "System messages" | |
| f.write(f"| {entry_type} | {count:,} | {desc} |\n") | |
| f.write("\n") | |
| if self.unknown_entry_types: | |
| f.write("## Unknown Entry Types\n\n") | |
| f.write("| Entry Type | Count |\n") | |
| f.write("|------------|-------|\n") | |
| for entry_type, count in sorted(self.unknown_entry_types.items(), key=lambda x: x[1], reverse=True): | |
| f.write(f"| {entry_type} | {count:,} |\n") | |
| f.write("\n") | |
| # Show parse errors in detail | |
| if hasattr(self, 'parse_errors') and self.parse_errors: | |
| f.write(f"## Parse Errors ({len(self.parse_errors)} total)\n\n") | |
| f.write("These entries could not be parsed correctly:\n\n") | |
| for i, error in enumerate(self.parse_errors, 1): | |
| f.write(f"### Parse Error #{i}\n") | |
| f.write(f"- **Source**: {error['source']}\n") | |
| f.write(f"- **Version**: {error.get('version', 'unknown')}\n") | |
| f.write(f"- **Error**: {error['error']}\n") | |
| f.write(f"- **Entry Keys**: {error['entry_keys']}\n") | |
| f.write(f"- **Entry Snippet**:\n```\n{error['entry_snippet']}\n```\n\n") | |
| if self.unknown_entries: | |
| f.write(f"## All Unknown Entries ({len(self.unknown_entries)} total)\n\n") | |
| for i, entry in enumerate(self.unknown_entries, 1): | |
| f.write(f"### Unknown Entry #{i}\n") | |
| f.write(f"- **Type**: {entry['type']}\n") | |
| f.write(f"- **Source**: {entry['source']}\n") | |
| f.write(f"- **Timestamp**: {entry['timestamp']}\n") | |
| f.write(f"- **UUID**: {entry['uuid']}\n") | |
| f.write(f"- **Keys**: {entry['sample_keys']}\n") | |
| f.write(f"- **Full Entry**:\n```json\n{entry['entry_snippet']}\n```\n\n") | |
| if self.entries_after_recovery: | |
| f.write("## Entries After File Recovery\n\n") | |
| f.write("These entries occurred after files were already recovered and may contain newer edits:\n\n") | |
| for file_path, entries in sorted(self.entries_after_recovery.items()): | |
| f.write(f"### {file_path}\n") | |
| f.write(f"- {len(entries)} entries after recovery\n") | |
| for entry in entries: # Show ALL entries | |
| f.write(f" - {entry['type']} at {entry['timestamp']} ({entry['source']})\n") | |
| f.write("\n") | |
| self.log(f"Unknown entries report written to {report_path}") | |
| def phase4_analyze_findings(self): | |
| """Phase 4: Analyze what we found in the JSONL files""" | |
| self.log("PHASE 4: Analyzing findings...") | |
| # Report on UUID relationships | |
| self.log(f"UUID graph entries: {len(self.uuid_graph)}") | |
| # Report on file content found outside originalFile | |
| self.log(f"Files with content in toolUseResult.file: {len(self.file_content_entries)}") | |
| for file_path, entries in self.file_content_entries.items(): | |
| if entries: | |
| self.log(f" {file_path}: {len(entries)} content entries found") | |
| # Check for missing edits | |
| total_expected = self.stats['edits_collected'] | |
| total_processed = self.stats['edits_applicable'] + self.stats['edits_skipped'] | |
| if total_expected != total_processed: | |
| self.log(f"WARNING: Edit count mismatch!") | |
| self.log(f" Collected: {total_expected}") | |
| self.log(f" Processed: {total_processed}") | |
| self.log(f" Missing: {total_expected - total_processed}") | |
| # Check for files with content but no snapshots | |
| files_with_content_only = set(self.file_content_entries.keys()) - set(self.file_snapshots.keys()) | |
| if files_with_content_only: | |
| self.log(f"Files with content but no originalFile snapshot: {len(files_with_content_only)}") | |
| for fp in files_with_content_only: # Show all files | |
| self.log(f" - {fp}") | |
| # Report on unknown entries | |
| self.log(f"\nUnknown/Unhandled Entries Analysis:") | |
| self.log(f" Total entries: {self.stats['entries']:,}") | |
| self.log(f" Entries processed: {self.stats['entries_processed']:,}") | |
| self.log(f" Benign entries (summary/system): {self.stats.get('entries_benign', 0):,}") | |
| self.log(f" Unknown entries: {self.stats['entries_unknown']:,}") | |
| if hasattr(self, 'parse_errors') and self.parse_errors: | |
| self.log(f" Parse errors: {len(self.parse_errors):,}") | |
| self.log(f" Tool errors: {len(self.tool_errors):,}") | |
| self.log(f" Tool rejections: {self.tool_rejections:,}") | |
| self.log(f" Entries after recovery: {self.stats['entries_after_recovery']:,}") | |
| # Report on Claude Code versions | |
| if hasattr(self, 'versions_seen') and self.versions_seen: | |
| self.log(f"\n Claude Code versions detected:") | |
| for version, count in sorted(self.versions_seen.items(), key=lambda x: semver_sort_key(x[0])): | |
| errors = len(self.version_errors.get(version, [])) if hasattr(self, 'version_errors') else 0 | |
| if errors > 0: | |
| self.log(f" - v{version}: {count:,} entries ({errors} errors)") | |
| else: | |
| self.log(f" - v{version}: {count:,} entries") | |
| if self.tool_error_types: | |
| self.log(f"\n Tool error types breakdown:") | |
| for error_type, count in sorted(self.tool_error_types.items(), key=lambda x: x[1], reverse=True): | |
| self.log(f" - {error_type}: {count:,}") | |
| if self.unknown_entry_types: | |
| self.log(f"\n Unknown entry types breakdown:") | |
| for entry_type, count in sorted(self.unknown_entry_types.items(), key=lambda x: x[1], reverse=True): | |
| self.log(f" - {entry_type}: {count:,}") | |
| # Generate unknown entries report | |
| if not self.dry_run: | |
| self.write_unknown_entries_report() | |
| def generate_report(self) -> str: | |
| """Generate a recovery report""" | |
| report_lines = [ | |
| "# JSONL Recovery Report", | |
| f"Generated at: {datetime.now().isoformat()}", | |
| "", | |
| "## Statistics", | |
| f"- JSONL files processed: {self.stats['sessions']}", | |
| f"- Total entries processed: {self.stats['entries']:,}", | |
| f"- Files with snapshots found: {self.stats['snapshots']}", | |
| f"- File read operations found: {self.stats['file_reads']}", | |
| f"- Files successfully recovered: {len(self.recovered_files)}", | |
| "", | |
| "## Edit Statistics", | |
| f"- Total edits collected: {self.stats['edits_collected']}", | |
| f"- Edits skipped (before snapshot): {self.stats['edits_skipped']}", | |
| f"- Edits applicable (after snapshot): {self.stats['edits_applicable']}", | |
| f"- Edits successfully applied: {self.stats['edits_successful']}", | |
| f"- Edits failed: {self.stats['edits_failed']}", | |
| f"- Conflicts: {self.stats['conflicts']}", | |
| "", | |
| "## Other", | |
| f"- Chat messages extracted: {self.stats['messages']}", | |
| f"- Git commits found: {self.stats['commits']}", | |
| "", | |
| "## Recovered Files" | |
| ] | |
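| # Append per-file details: size, snapshot timestamp, and number of edits applied | |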
| for file_path in sorted(self.recovered_files.keys()): | |
| size = len(self.recovered_files[file_path]) | |
| snapshot = self.file_snapshots.get(file_path) | |
| snapshot_time = snapshot.timestamp.isoformat() if snapshot else "unknown" | |
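| # Count edits applied after the snapshot (all edits if no snapshot exists) | |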
| edits_count = len([e for e in self.file_edits.get(file_path, []) | |
| if not snapshot or e.timestamp > snapshot.timestamp]) | |
| report_lines.append(f"- `{file_path}`") | |
| report_lines.append(f" - Size: {size:,} bytes") | |
| report_lines.append(f" - Snapshot from: {snapshot_time}") | |
| report_lines.append(f" - Edits applied: {edits_count}") | |
| if self.stats['conflicts'] > 0: | |
| report_lines.extend([ | |
| "", | |
| "## Conflicts", | |
| f"Total conflicts encountered: {self.stats['conflicts']}", | |
| "Run with --interactive flag to resolve conflicts manually" | |
| ]) | |
| report = "\n".join(report_lines) | |
| if not self.dry_run: | |
| report_path = self.output_dir / "recovery_report.md" | |
| with open(report_path, 'w') as f: | |
| f.write(report) | |
| self.log(f"Recovery report written to {report_path}") | |
| return report | |
| def run(self, filter_pattern: Optional[str] = None): | |
| """Run the complete recovery process""" | |
| self.log("Starting JSONL recovery process...") | |
| # Create output directories | |
| if not self.dry_run: | |
| self.output_dir.mkdir(parents=True, exist_ok=True) | |
| # Phase 1: Extract snapshots | |
| self.phase1_extract_snapshots(filter_pattern) | |
| # Phase 2: Apply edits | |
| self.phase2_apply_edits() | |
| # Phase 3: Extract chat | |
| self.phase3_extract_chat() | |
| # Phase 4: Analyze what we found | |
| self.phase4_analyze_findings() | |
| # Write recovered files | |
| self.write_recovered_files() | |
| # Generate report | |
| self.generate_report() | |
| # Write edit failure report if there were failures | |
| if hasattr(self, 'failed_edits') and self.failed_edits: | |
| self.write_edit_failure_report() | |
| # Summary | |
| self.log("="*60) | |
| self.log("Recovery complete!") | |
| self.log(f"Files recovered: {len(self.recovered_files)}") | |
| self.log(f"File reads found: {self.stats['file_reads']}") | |
| self.log(f"Edits collected: {self.stats['edits_collected']}") | |
| self.log(f" - Skipped (before snapshot): {self.stats['edits_skipped']}") | |
| self.log(f" - Applicable (after snapshot): {self.stats['edits_applicable']}") | |
| self.log(f" - Successfully applied: {self.stats['edits_successful']}") | |
| self.log(f" - Failed to apply: {self.stats['edits_failed']}") | |
| self.log(f"Conflicts: {self.stats['conflicts']}") | |
| self.log(f"Chat messages: {self.stats['messages']}") | |
| self.log(f"Benign entries (summary/system): {self.stats.get('entries_benign', 0)}") | |
| self.log(f"Tool errors: {len(self.tool_errors)}") | |
| if self.tool_rejections > 0: | |
| self.log(f" - User rejections: {self.tool_rejections}") | |
| self.log(f"Unknown entries: {self.stats['entries_unknown']} (see unknown_entries.md for details)") | |
| if hasattr(self, 'parse_errors') and self.parse_errors: | |
| self.log(f" - Parse errors: {len(self.parse_errors)}") | |
| self.log(f"Entries after recovery: {self.stats['entries_after_recovery']}") | |
| if hasattr(self, 'failed_edits') and self.failed_edits: | |
| self.log(f"Edit failures: {len(self.failed_edits)} (see edit_failures.md for details)") | |
| if self.save_rejects and not self.dry_run: | |
| self.log(f"Reject files saved to: {self.reject_dir}") | |
| def main(): | |
| parser = argparse.ArgumentParser( | |
| description="Recover files from Claude Code JSONL session files using three-phase approach" | |
| ) | |
| parser.add_argument("--jsonl-dir", required=True, | |
| help="Directory containing JSONL files (required, typically ~/.claude/projects/YOUR_PROJECT)") | |
| parser.add_argument("--output-dir", default="recovered_files", | |
| help="Output directory for recovered files") | |
| parser.add_argument("--chat-dir", | |
| help="Output directory for chat messages (default: output-dir/chat)") | |
| parser.add_argument("--dry-run", action="store_true", | |
| help="Show what would be recovered without writing files") | |
| parser.add_argument("--filter", dest="filter_pattern", | |
| help="Only recover files matching regex pattern") | |
| parser.add_argument("--verbose", action="store_true", | |
| help="Show detailed progress") | |
| parser.add_argument("--interactive", action="store_true", | |
| help="Interactively resolve edit conflicts") | |
| parser.add_argument("--use-earliest-snapshot", action="store_true", | |
| help="Use earliest snapshot instead of latest (applies more edits)") | |
| parser.add_argument("--no-save-rejects", action="store_true", | |
| help="Don't save rejected edits to separate files") | |
| args = parser.parse_args() | |
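| # Fail early with guidance when the JSONL directory does not exist | |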
| if not Path(args.jsonl_dir).exists(): | |
| print(f"ERROR: JSONL directory does not exist: {args.jsonl_dir}") | |
| print("\nTypical locations for Claude Code session files:") | |
| print(" ~/.claude/projects/[YOUR_PROJECT_PATH]/") | |
| print("\nExample:") | |
| print(" python3 claude_code_recovery.py --jsonl-dir ~/.claude/projects/-Users-yourname-myproject") | |
| sys.exit(1) | |
| tool = JSONLRecoveryTool( | |
| jsonl_dir=args.jsonl_dir, | |
| output_dir=args.output_dir, | |
| chat_dir=args.chat_dir, | |
| dry_run=args.dry_run, | |
| verbose=args.verbose, | |
| interactive=args.interactive, | |
| use_earliest_snapshot=args.use_earliest_snapshot, | |
| save_rejects=not args.no_save_rejects | |
| ) | |
| tool.run(filter_pattern=args.filter_pattern) | |
| if __name__ == "__main__": | |
| main() |