Created
June 11, 2025 07:38
-
-
Save wooparadog/a4f85c276d45cd6c04468a15a3450d4b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import mailbox | |
| import json | |
| from email.header import decode_header, make_header | |
| from email.utils import parsedate_to_datetime | |
def get_decoded_header(header):
    """Decode an RFC 2047 encoded email header into a plain string.

    Args:
        header: The raw header value (a ``str`` or ``email.header.Header``),
            or ``None`` when the header is absent.

    Returns:
        str: The decoded header text; ``""`` when ``header`` is ``None``; or
        the raw header text unchanged when it cannot be decoded.
    """
    # Imported locally so the function is self-contained and does not rely on
    # a change to the module's import block.
    from email.errors import HeaderParseError

    if header is None:
        return ""
    try:
        # make_header stitches together parts that use different encodings.
        return str(make_header(decode_header(header)))
    except (HeaderParseError, LookupError, UnicodeError):
        # Broken base64, an unknown charset name, or bytes that are invalid
        # for the declared charset: keep the raw text rather than letting one
        # malformed header abort processing of the whole mailbox.
        return str(header)
def _decode_payload(part):
    """Return the text of one non-multipart message part.

    The transfer encoding (base64 / quoted-printable) is undone first, then
    the bytes are decoded with the charset declared in Content-Type. If that
    charset is missing, unknown, or does not match the bytes, UTF-8 is tried,
    and finally latin-1, which accepts every byte sequence.
    """
    raw = part.get_payload(decode=True)
    if raw is None:
        # get_payload(decode=True) yields None for container parts.
        return ""

    candidates = []
    declared = part.get_content_charset()
    if declared:
        candidates.append(declared)
    candidates.append("utf-8")

    for charset in candidates:
        try:
            return raw.decode(charset)
        except (LookupError, ValueError):
            # LookupError: unknown codec name; ValueError covers
            # UnicodeDecodeError and malformed codec names.
            continue
    return raw.decode("latin-1")


def get_body(message):
    """Extract the plain-text body from an email message.

    For multipart messages, the first ``text/plain`` part that is not marked
    as an attachment is used. For single-part messages, the payload itself is
    decoded regardless of its content type.

    Args:
        message: An ``email.message.Message`` (or subclass) instance.

    Returns:
        str: The decoded body text, or ``""`` when a multipart message has no
        suitable ``text/plain`` part.
    """
    if not message.is_multipart():
        return _decode_payload(message)

    for part in message.walk():
        # Disposition values are case-insensitive, so normalise before testing.
        disposition = str(part.get("Content-Disposition", "")).lower()
        if part.get_content_type() == "text/plain" and "attachment" not in disposition:
            return _decode_payload(part)
    return ""
def _format_sent_datetime(date_header):
    """Return a Date header as an ISO 8601 string, or "" if absent or unparseable."""
    if not date_header:
        return ""
    try:
        # parsedate_to_datetime handles the various RFC 2822 date formats.
        return parsedate_to_datetime(str(date_header)).isoformat()
    except (TypeError, ValueError, IndexError, OverflowError):
        # Best effort: a malformed date must not abort the extraction.
        return ""


def extract_topics_from_mbox(mbox_file):
    """Group the messages of an mbox file into topics (threads).

    Messages are grouped by their ``X-GM-THRID`` header when present,
    otherwise by their decoded subject line (less reliable when a thread's
    subject changes). Messages that have neither are skipped.

    Args:
        mbox_file (str): The path to the mbox file.

    Returns:
        list: One dictionary per topic, in order of first appearance, with
        keys ``topic_id``, ``topic_title`` (the subject of the first message
        seen for that topic) and ``letters`` (a list of dictionaries with
        keys ``sender``, ``subject``, ``sent_datetime`` and ``body``).

    Raises:
        FileNotFoundError: If ``mbox_file`` does not exist.
    """
    try:
        # create=False: the default (create=True) silently creates an empty
        # mailbox file when the path is missing instead of reporting it.
        mbox = mailbox.mbox(mbox_file, create=False)
    except mailbox.NoSuchMailboxError as err:
        raise FileNotFoundError(f"No such mbox file: {mbox_file!r}") from err

    topics = {}
    try:
        for message in mbox:
            # Decode the subject once; it is used as fallback id, title and field.
            subject = get_decoded_header(message.get("Subject"))

            thread_id = message.get("X-GM-THRID")
            if thread_id:
                topic_id = str(thread_id)
            elif subject.strip():
                topic_id = subject
            else:
                # Neither a thread id nor a subject: the message cannot be grouped.
                continue

            topic = topics.setdefault(
                topic_id,
                {
                    "topic_id": topic_id,
                    "topic_title": subject,
                    "letters": [],
                },
            )
            topic["letters"].append(
                {
                    "sender": get_decoded_header(message.get("From")),
                    "subject": subject,
                    "sent_datetime": _format_sent_datetime(message.get("Date")),
                    "body": get_body(message),
                }
            )
    finally:
        # Release the underlying file handle even if a message fails to parse.
        mbox.close()

    return list(topics.values())
if __name__ == "__main__":
    # Input mailbox; change 'topics.mbox' to the path of your own mbox file.
    source_path = "topics.mbox"
    try:
        topics_found = extract_topics_from_mbox(source_path)
        # Serialise the grouped topics as human-readable, non-ASCII-safe JSON.
        with open("topics.json", "w", encoding="utf-8") as out:
            json.dump(topics_found, out, indent=4, ensure_ascii=False)
        print("Successfully extracted topics to topics.json")
    except FileNotFoundError:
        print(f"Error: The file '{source_path}' was not found.")
    except Exception as exc:
        # Last-resort handler so the script reports the failure rather than
        # ending with a traceback.
        print(f"An error occurred: {exc}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment