Last active
November 18, 2025 22:51
-
-
Save Xoma163/a9a04ead97a7e11328caeff058a184ce to your computer and use it in GitHub Desktop.
Позволяет переводить переписку из telegram dump json в txt с группировкой сообщений и указанием типов вложений
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import argparse | |
| import json | |
| from itertools import groupby | |
| from pathlib import Path | |
| from typing import Any | |
# Russian display labels for the ``media_type`` values this script knows about.
media_type_converter = dict(
    sticker='Стикер',
    voice_message='Голосовое сообщение',
    video_message='Видеосообщение',
    video_file='Видео',
    animation='Гифка',
    audio_file='Аудио',
)

# File names used when the corresponding command-line option is omitted.
DEFAULT_INPUT_FILE, DEFAULT_OUTPUT_FILE = 'result.json', 'result.txt'

# NOTE(review): presumably the strings to be treated as blank lines; nothing in
# the visible part of the file references this constant.
BLANK_LINE_ITEMS = [' \n', '\n ', '\n', '']

# Layout of one rendered group: "sender [date]" header line, then the body.
MESSAGE_TEMPLATE = "{sender} [{date}]\n{message}"
def open_file(input_file: Path) -> dict[str, Any]:
    """
    Load and parse the JSON document stored at ``input_file``.

    The file is read as UTF-8.

    :param input_file: path of the JSON file to read
    :return: the decoded JSON value
    :raises FileNotFoundError: if ``input_file`` does not exist
    :raises ValueError: if the file content is not valid JSON
    """
    if input_file.exists():
        try:
            with input_file.open(encoding='utf-8') as stream:
                return json.load(stream)
        except json.JSONDecodeError as err:
            # Re-raise with the file name so the user can tell which input is broken.
            raise ValueError(f"Некорректный JSON во входном файле {input_file}: {err}") from err
    raise FileNotFoundError(f"Входной файл не найден: {input_file}")
def clear_messages(messages: list[dict]) -> list[dict]:
    """
    Return the messages whose ``type`` is not ``"service"``, keeping their order.

    Messages without a ``type`` key are kept.
    """
    kept = []
    for entry in messages:
        if entry.get("type") == "service":
            continue
        kept.append(entry)
    return kept
def group_messages(messages: list[dict]) -> list[tuple[Any, list[dict]]]:
    """
    Group consecutive messages that share the same ``from_id``.

    Only adjacent messages are merged: when a sender writes again after
    someone else, a new group is started for them.

    :param messages: messages in the order they should appear
    :return: ``(from_id, messages_of_that_run)`` pairs in input order
    :raises KeyError: if a message has no ``from_id`` key
    """

    def sender_id(message: dict) -> Any:
        return message["from_id"]

    return [(sender, list(run)) for sender, run in groupby(messages, key=sender_id)]
def _extract_text_items(text) -> list[str]:
    """
    Collect the non-empty, stripped text fragments of a message ``text`` field.

    ``text`` may be a plain string or a list whose items are strings, dicts
    with a string ``'text'`` value, or nested lists (handled recursively).
    Any other value yields no fragments.

    :param text: value of the message ``text`` field
    :return: fragments in their original order, each stripped of surrounding whitespace
    """
    if isinstance(text, str):
        stripped = text.strip()
        return [stripped] if stripped else []
    if not isinstance(text, list):
        return []

    fragments: list[str] = []
    for entry in text:
        if isinstance(entry, list):
            fragments += _extract_text_items(entry)
            continue
        if isinstance(entry, dict):
            # Only the 'text' value of a dict item is used; it is kept only when it is a string.
            entry = entry.get('text')
        if isinstance(entry, str) and (stripped := entry.strip()):
            fragments.append(stripped)
    return fragments
def _process_message(message: dict, message_list: list) -> None:
    """
    Render one message into text lines and append them to ``message_list``.

    Appended, in this order and only when present:
      * a header naming the original sender of a forwarded message;
      * the message text fragments, joined with newlines;
      * one attachment marker: the media type label, else a photo marker,
        else a poll with its question and answer options.

    :param message: a single message dict from the export
    :param message_list: output list, mutated in place
    :raises KeyError: if a poll lacks ``'answers'``/``'question'`` or an answer lacks ``'text'``
    """
    # Forwarded messages get a header naming the original sender.
    if forwarded_from := message.get('forwarded_from'):
        message_list.append(f"\nПересланное сообщение от {forwarded_from}:")

    # Message text.
    if parts := _extract_text_items(message.get('text')):
        message_list.append("\n".join(parts))

    # Attachments: at most one marker per message.
    if media_type := message.get('media_type'):
        # Fall back to the raw type name so that a media type missing from the
        # table does not abort the whole conversion with a KeyError.
        label = media_type_converter.get(media_type, media_type)
        message_list.append(f"*{label}*")
    elif message.get('photo'):
        message_list.append('*Картинка*')
    elif poll := message.get('poll'):
        poll_answers = "\n".join(f"Вариант ответа: {answer['text']}" for answer in poll['answers'])
        message_list.append(f"*Опрос*\nВопрос:{poll['question']}\n{poll_answers}\n")
def get_str_messages(grouped_messages: list[tuple[Any, list[dict]]]) -> str:
    """
    Render grouped messages into the final conversation text.

    Each group becomes one ``MESSAGE_TEMPLATE`` entry whose sender and date
    come from the first message of the group and whose body is the rendered
    lines of every message in the group. Entries are separated by a blank line.

    :param grouped_messages: ``(key, messages)`` pairs; the key itself is not used
    :return: the whole conversation as a single string
    """

    def render_group(messages: list[dict]) -> str:
        lines: list = []
        for message in messages:
            _process_message(message, lines)
        head = messages[0]
        return MESSAGE_TEMPLATE.format(
            sender=head['from'],
            date=head['date'],
            message="\n".join(lines),
        )

    return "\n\n".join(render_group(messages) for _, messages in grouped_messages)
def write_to_file(output_file: Path, content: str) -> None:
    """Write ``content`` to ``output_file`` as UTF-8 text, replacing any existing content."""
    with output_file.open('w', encoding='utf-8') as stream:
        stream.write(content)
def parse_cli() -> argparse.Namespace:
    """
    Parse the command-line arguments of the converter.

    Options:
      * ``-i`` / ``--input_file``  - JSON file to read (default ``DEFAULT_INPUT_FILE``)
      * ``-o`` / ``--output_file`` - TXT file to write (default ``DEFAULT_OUTPUT_FILE``)

    :return: namespace with ``input_file`` and ``output_file`` attributes
    """
    parser = argparse.ArgumentParser(
        description="Конвертирует JSON в TXT группируя сообщения"
    )
    # The defaults are plain strings; argparse applies ``type`` to string
    # defaults, so both attributes end up as ``Path`` objects either way.
    parser.add_argument(
        "-i",
        "--input_file",
        type=Path,
        default=DEFAULT_INPUT_FILE,
        help="Входной файл json (default: %(default)s)",
    )
    # "-o" is listed once: repeating the flag only duplicated it in --help.
    parser.add_argument(
        "-o",
        "--output_file",
        type=Path,
        default=DEFAULT_OUTPUT_FILE,
        help="Выходной файл txt (default: %(default)s)",
    )
    return parser.parse_args()
def main() -> None:
    """
    Run the conversion: read the JSON export, drop service messages, group
    the rest by sender, render them to text and write the output file.

    Progress is reported on stdout.
    """
    options = parse_cli()
    print("Начало обработки")

    export = open_file(options.input_file)
    all_messages = export['messages']
    print(f"Всего сообщений - {len(all_messages)}")

    groups = group_messages(clear_messages(all_messages))
    print(f"Сгруппированных сообщений - {len(groups)}")

    write_to_file(options.output_file, get_str_messages(groups))
    print(f"Файл {options.output_file} успешно записан")
# Run the converter only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment