#!/usr/bin/env python3
"""
Outlook Express DBX Email Extractor
Improved algorithm to extract email bodies more reliably
"""
import re
import sys
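
# Usage (command line; the input filename below is only an example):
#   python dbx_parser_v2.py Inbox.dbx
# The extracted emails are written next to the input file as plain text,
# e.g. Inbox.txt, one block per message.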


def read_dbx_header(f):
    """Read and validate the DBX file header"""
    # DBX files start with signature: CF AD 12 FE
    signature = f.read(4)
    if signature != b'\xcf\xad\x12\xfe':
        raise ValueError("Not a valid DBX file - incorrect signature")
    # Read additional header info
    header_data = f.read(12)
    return True


def extract_string(data, start):
    """Extract a null-terminated or length-prefixed string from binary data"""
    end = data.find(b'\x00', start)
    if end == -1:
        return ""
    try:
        return data[start:end].decode('latin-1', errors='ignore')
    except Exception:
        return ""


def find_email_messages(data):
    """
    Extract email messages from DBX file by finding complete message blocks.
    DBX stores emails as complete RFC822 messages in the file.
    """
    messages = []

    # Look for patterns that indicate the start of email message data
    # Common patterns: "Return-Path:", "Received:", "From:", etc.
    patterns = [
        (b'Return-Path:', 'Return-Path'),
        (b'Received: from', 'Received'),
        (b'From: ', 'From'),
        (b'Message-ID:', 'Message-ID'),
    ]

    potential_starts = []
    for pattern, name in patterns:
        pos = 0
        while True:
            pos = data.find(pattern, pos)
            if pos == -1:
                break
            # Look backwards to find the real start of the email
            # (often there are some bytes before the headers)
            real_start = pos
            # Check if there's a reasonable amount of data before this
            lookback = max(0, pos - 200)
            chunk_before = data[lookback:pos]
            # Find the last occurrence of double-null or similar boundary
            for i in range(len(chunk_before) - 1, -1, -1):
                if chunk_before[i:i+2] == b'\x00\x00' or chunk_before[i:i+4] == b'\x00' * 4:
                    real_start = lookback + i + 2
                    break
            potential_starts.append((real_start, pos, name))
            pos += 1

    # Sort by position and remove duplicates that are too close
    potential_starts.sort()

    # Filter out starts that are too close together (within 50 bytes)
    filtered_starts = []
    last_pos = -1000
    for start, marker_pos, name in potential_starts:
        if marker_pos - last_pos > 50:
            filtered_starts.append((start, marker_pos, name))
            last_pos = marker_pos

    print(f"Found {len(filtered_starts)} potential email starts")

    # Extract messages
    for i, (start, marker_pos, name) in enumerate(filtered_starts):
        # Determine the end of this message
        if i < len(filtered_starts) - 1:
            # Next message starts here
            end = filtered_starts[i + 1][0]
        else:
            # Last message - read to the end of the file
            end = len(data)

        # Extract the raw message data
        raw_message = data[start:end]

        # Clean up: find where the actual email headers begin
        # Look for the first occurrence of a valid email header line
        header_start = 0
        for pattern, _ in patterns:
            pos = raw_message.find(pattern)
            if pos != -1 and pos < 200:
                header_start = pos
                break
        if header_start > 0:
            raw_message = raw_message[header_start:]

        # Parse this message
        try:
            email_data = parse_raw_email(raw_message)
            if email_data and (email_data.get('from') or email_data.get('subject')):
                messages.append(email_data)
                print(f"Extracted email {len(messages)}: {email_data.get('subject', '(no subject)')[:60]}")
        except Exception:
            continue

    return messages


def parse_raw_email(raw_data):
    """Parse raw email data into structured format"""
    # Convert to string for easier parsing
    try:
        full_text = raw_data.decode('latin-1', errors='ignore')
    except Exception:
        full_text = str(raw_data)

    # Parse headers
    email = {
        'from': '',
        'to': '',
        'date': '',
        'subject': '',
        'body': ''
    }

    # Extract header fields using regex
    # From:
    match = re.search(r'^From:\s*(.+?)(?:\r?\n(?!\s)|$)', full_text, re.MULTILINE | re.IGNORECASE)
    if match:
        email['from'] = match.group(1).strip()

    # To:
    match = re.search(r'^To:\s*(.+?)(?:\r?\n(?!\s)|$)', full_text, re.MULTILINE | re.IGNORECASE)
    if match:
        email['to'] = match.group(1).strip()

    # Date:
    match = re.search(r'^Date:\s*(.+?)(?:\r?\n(?!\s)|$)', full_text, re.MULTILINE | re.IGNORECASE)
    if match:
        email['date'] = match.group(1).strip()

    # Subject:
    match = re.search(r'^Subject:\s*(.+?)(?:\r?\n(?!\s)|$)', full_text, re.MULTILINE | re.IGNORECASE)
    if match:
        email['subject'] = match.group(1).strip()

    # Find the end of the headers - they end at the first blank line after the
    # main header block
    header_end = -1

    # Strategy 1: look for Content-Type / MIME headers followed by a blank line
    patterns_to_try = [
        r'Content-Type:[^\n]+\n(?:Content-Transfer-Encoding:[^\n]+\n)?\s*\n',
        r'MIME-Version:[^\n]+\n(?:Content-Type:[^\n]+\n)?(?:Content-Transfer-Encoding:[^\n]+\n)?\s*\n',
        r'Message-ID:[^\n]+\n\s*\n',  # Sometimes the body comes right after Message-ID
        r'\nSubject:[^\n]+\n\s*\n',   # Or right after Subject
    ]
    for pattern in patterns_to_try:
        match = re.search(pattern, full_text, re.IGNORECASE | re.MULTILINE)
        if match:
            header_end = match.end()
            break

    # If we found a header end, extract the body
    if 0 < header_end < len(full_text):
        body_text = full_text[header_end:]

        # Find where the body actually ends (before "Status:" or the next email marker)
        body_end_markers = [
            '\nStatus: ',
            '\nReturn-Path:',
            '\x00\x00\x00\x00',
        ]
        body_end = len(body_text)
        for marker in body_end_markers:
            pos = body_text.find(marker)
            if 0 < pos < body_end:
                body_end = pos
        body_text = body_text[:body_end]

        # Clean body text
        body_text = clean_body_text(body_text)
        email['body'] = body_text.strip()

    return email
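
# Illustrative sketch of what parse_raw_email produces for a tiny, made-up
# RFC822 fragment (not taken from a real DBX file):
#   parse_raw_email(b"From: a@example.com\r\nTo: b@example.com\r\nSubject: Hi\r\n"
#                   b"Content-Type: text/plain\r\n\r\nHello")
#   -> {'from': 'a@example.com', 'to': 'b@example.com', 'date': '',
#       'subject': 'Hi', 'body': 'Hello'}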


def clean_body_text(text):
    """Clean up body text by removing binary garbage"""
    if not text:
        return ""

    # Remove common binary markers
    text = re.sub(r'ࡱ.*?(?=\n|$)', '', text, flags=re.DOTALL)
    text = re.sub(r'Microsoft Forms.*?Forms\.Frame\.1', '', text, flags=re.DOTALL)
    text = re.sub(r'Embedded Object', '', text)
    text = re.sub(r'VERSION 5\.00.*?Begin', '', text, flags=re.DOTALL)

    # Remove lines with too many control characters
    lines = text.split('\n')
    cleaned_lines = []
    for line in lines:
        if not line.strip():
            cleaned_lines.append('')
            continue
        # Count printable characters
        printable = sum(1 for c in line if 32 <= ord(c) <= 126 or c in '\t\n\r')
        total = len(line)
        if total > 0 and (printable / total) >= 0.6:
            # Remove non-printable chars
            cleaned = ''.join(c if (32 <= ord(c) <= 126 or c in '\t\n\r') else '' for c in line)
            if cleaned.strip():
                cleaned_lines.append(cleaned.rstrip())

    # Join and clean up
    text = '\n'.join(cleaned_lines)

    # Remove excessive blank lines
    text = re.sub(r'\n{3,}', '\n\n', text)

    # Remove common OE artifacts
    text = re.sub(r'X-MimeOLE:.*?(?=\n)', '', text)
    text = re.sub(r'Content-Type:.*?(?=\n)', '', text)
    text = re.sub(r'Content-Transfer-Encoding:.*?(?=\n)', '', text)

    return text.strip()
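
# Illustration of the printable-ratio filter above (made-up counts): a
# 10-character line with 6 printable characters (6/10 = 0.6) is kept, with its
# non-printable characters stripped, while a line with only 5 printable
# characters out of 10 (0.5 < 0.6) is dropped from the body entirely.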


def parse_dbx_file(filename):
    """Parse a DBX file and extract all emails"""
    print(f"Opening DBX file: {filename}")

    with open(filename, 'rb') as f:
        # Validate header
        try:
            read_dbx_header(f)
            print("Valid DBX file header found")
        except ValueError as e:
            print(f"Warning: {e}")
            print("Attempting to parse anyway...")
            f.seek(0)

        # Read the entire file
        data = f.read()
        print(f"Read {len(data)} bytes from file")

    # Extract emails
    emails = find_email_messages(data)
    return emails


def write_emails_to_text(emails, output_filename):
    """Write extracted emails to a text file"""
    print(f"\nWriting {len(emails)} emails to {output_filename}")

    with open(output_filename, 'w', encoding='utf-8') as f:
        for idx, email in enumerate(emails, 1):
            f.write(f"{'='*80}\n")
            f.write(f"EMAIL #{idx}\n")
            f.write(f"{'='*80}\n\n")

            # Write fields
            f.write(f"From: {email.get('from', '')}\n")
            f.write(f"To: {email.get('to', '')}\n")
            f.write(f"Date: {email.get('date', '')}\n")
            f.write(f"Subject: {email.get('subject', '(no subject)')}\n\n")

            # Write body
            body = email.get('body', '')
            if body:
                f.write(body)
            f.write("\n\n")

    print(f"Successfully wrote emails to {output_filename}")


def main():
    if len(sys.argv) < 2:
        print("Usage: python dbx_parser_v2.py <dbx_file>")
        sys.exit(1)

    dbx_file = sys.argv[1]
    output_file = dbx_file.rsplit('.', 1)[0] + '.txt'

    print("DBX Email Extractor v2")
    print(f"{'='*80}\n")

    # Parse the DBX file
    emails = parse_dbx_file(dbx_file)

    if not emails:
        print("\nNo emails found in the DBX file.")
        sys.exit(1)

    # Write to output file
    write_emails_to_text(emails, output_file)

    print(f"\n{'='*80}")
    print(f"Extraction complete! Found {len(emails)} emails.")
    print(f"Output written to: {output_file}")


if __name__ == "__main__":
    main()