Last active
June 2, 2025 06:04
-
-
Save mutaguchi/6a02a214a424099fec9eca193a5f8655 to your computer and use it in GitHub Desktop.
QR Video Converter - Text to QR Code Video (MP4) and Back
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import random | |
| import sys | |
| import argparse | |
| import tempfile | |
| import shutil | |
| import threading | |
| import time | |
| import json | |
| import qrcode | |
| import ffmpeg | |
| import cv2 | |
| import re | |
| # Try importing MeCab (optional) | |
| try: | |
| import MeCab | |
| MECAB_AVAILABLE = True | |
| except ImportError: | |
| MECAB_AVAILABLE = False | |
| MeCab = None | |
| """ | |
| QR Video Converter | |
| This script converts text files to QR code videos (MP4) and vice versa. | |
| Inspired by https://github.com/Olow304/memvid for proof-of-concept of text-to-QR video conversion. | |
| Requirements: | |
| - pip install qrcode ffmpeg-python opencv-python | |
| - Optional, for Japanese search support: pip install mecab-python3 unidic-lite | |
| Typical usage: | |
| python qr_video_converter.py input.txt # Convert text to input.qr.mp4 | |
| python qr_video_converter.py --sample # Generate sample file and convert it | |
| python qr_video_converter.py input.qr.mp4 # Output all content to stdout | |
| python qr_video_converter.py input.qr.mp4 -s python # Search for 'python' and output matching frames as JSON | |
| python qr_video_converter.py input.qr.mp4 -o # Output to input.decoded.txt file | |
| Note: This is a proof-of-concept tool. The resulting video files are typically 2-3x larger | |
| than the original text files. This tool is not intended for practical use but rather | |
| to demonstrate the feasibility of text-to-QR video conversion. | |
| --- | |
| QR Video Converter (日本語版) | |
| このスクリプトはテキストファイルをQRコード動画(MP4)に変換したり、その逆変換を行います。 | |
| テキストからQRコード動画への変換の概念実証として、https://github.com/Olow304/memvid からインスピレーションを得ています。 | |
| 必要なライブラリ: | |
| - pip install qrcode ffmpeg-python opencv-python | |
| - 日本語検索機能を使用する場合: pip install mecab-python3 unidic-lite | |
| 基本的な使用方法: | |
| python qr_video_converter.py input.txt # テキストをinput.qr.mp4に変換 | |
| python qr_video_converter.py --sample # サンプルファイルを生成して変換 | |
| python qr_video_converter.py input.qr.mp4 # 全内容を標準出力に出力 | |
| python qr_video_converter.py input.qr.mp4 -s python # 'python'を検索してマッチしたフレームをJSONで出力 | |
| python qr_video_converter.py input.qr.mp4 -o # input.decoded.txtファイルに出力 | |
| 注意: これは概念実証ツールです。生成される動画ファイルは通常、元のテキストファイルの2-3倍のサイズになります。 | |
| このツールは実用的な使用を目的とせず、テキストからQRコード動画への変換の実現可能性を示すものです。 | |
| """ | |
def _format_progress(current, total, bar_length=30):
    """Return a carriage-return-prefixed progress bar line for current/total."""
    filled_length = int(bar_length * current // total)
    bar = '█' * filled_length + '-' * (bar_length - filled_length)
    progress = min(current / total * 100, 100)
    return f"\r[{bar}] {progress:.1f}% ({current}/{total})"


def _split_utf8_chunks(data_bytes, chunk_size, overlap_size):
    """Split UTF-8 bytes into decoded text chunks that overlap their neighbours.

    Each chunk covers ``chunk_size - 2 * overlap_size`` fresh bytes plus up to
    ``overlap_size`` bytes repeated from the previous and the next chunk.
    The caller must ensure ``chunk_size - 2 * overlap_size`` is positive.
    """
    chunks = []
    pos = 0
    length = len(data_bytes)
    # Bytes of new content per chunk (e.g. 2800 - 200 = 2600 with defaults)
    actual_chunk_size = chunk_size - (overlap_size * 2)
    while pos < length:
        # Start with overlap from the previous chunk
        chunk_start = max(0, pos - overlap_size)
        chunk_end = min(pos + actual_chunk_size, length)
        # Add overlap to the end (if not at file end)
        if chunk_end < length:
            overlap_end = min(chunk_end + overlap_size, length)
        else:
            overlap_end = chunk_end
        # Shrink the end until the slice no longer cuts a UTF-8 character
        chunk = None
        while overlap_end > chunk_start:
            try:
                chunk = data_bytes[chunk_start:overlap_end].decode("utf-8")
                break
            except UnicodeDecodeError:
                overlap_end -= 1
        if chunk is None:
            # The slice starts inside a multi-byte character, so no end
            # position decodes; retry one byte further on.
            pos += 1
            continue
        chunks.append(chunk)
        pos += actual_chunk_size
    return chunks


def process_text_to_mp4(input_file_path, output_file_path, crf=30, quick_verify=False, chunk_size=2800, overlap_size=100):
    """Convert a UTF-8 text file to a QR code video and a JSON search index.

    The text is split into overlapping chunks, each chunk is rendered as one
    QR code PNG, the PNGs are encoded with libx265 into ``output_file_path``,
    a word index is written next to the video, and the video is verified by
    decoding its frames again.

    Args:
        input_file_path: Path of the UTF-8 text file to convert.
        output_file_path: Path of the video to write.
        crf: CRF value passed to the encoder.
        quick_verify: Passed through to ``verify_qr_frames``.
        chunk_size: Maximum chunk size in bytes, overlaps included.
        overlap_size: Bytes shared with each neighbouring chunk.

    Returns:
        ``output_file_path``.

    Raises:
        ValueError: If the chunk settings leave no room for new content per
            chunk, or if the input file is empty.

    If verification fails, the video and index are deleted and the process
    exits with status 1.
    """
    # With no positive payload per chunk the splitter could never advance.
    if overlap_size < 0 or chunk_size - (overlap_size * 2) <= 0:
        raise ValueError(
            f"chunk_size ({chunk_size}) must be greater than twice overlap_size "
            f"({overlap_size}), and overlap_size must not be negative")
    # NOTE(review): a version-40 / level-L QR code presumably tops out near
    # 2953 bytes in byte mode, so larger chunk_size values may overflow in
    # qr.make(fit=False) -- confirm against the qrcode library.

    # Display input file information
    original_size = os.path.getsize(input_file_path)
    file_name = os.path.basename(input_file_path)
    with open(input_file_path, "r", encoding="utf-8") as f:
        char_count = len(f.read())
    print("\n=== Input File Information ===")
    print(f"File name: {file_name}")
    print(f"File size: {original_size:,} bytes")
    print(f"Character count: {char_count:,} characters")

    # Create temporary directory for the QR frames and ffmpeg progress file
    temp_dir = tempfile.mkdtemp(prefix="qr_")
    try:
        # Frame files are named after the input file
        base_name = os.path.splitext(file_name)[0]
        with open(input_file_path, "rb") as f:
            data_bytes = f.read()
        chunks = _split_utf8_chunks(data_bytes, chunk_size, overlap_size)
        if not chunks:
            raise ValueError(f"Input file is empty: {input_file_path}")
        total_frames = len(chunks)
        print(f"Number of chunks: {total_frames}")
        print(f"Chunk size: {chunk_size} bytes")
        print(f"Overlap size: {overlap_size} bytes")

        print("\n=== Generating QR Codes ===")
        for idx, chunk in enumerate(chunks, start=1):
            frame_path = os.path.join(
                temp_dir, f"{base_name}_frame_{idx:03d}.png")
            qr = qrcode.QRCode(version=40, error_correction=qrcode.constants.ERROR_CORRECT_L,
                               box_size=1, border=4)
            qr.add_data(chunk)
            qr.make(fit=False)
            img = qr.make_image(fill_color="black", back_color="white")
            img.save(frame_path)
            print(_format_progress(idx, total_frames), end='', flush=True)
        print()
        print("QR code generation completed")

        # Encode to HEVC with padding to even dimensions (width,height)
        input_pattern = os.path.join(temp_dir, f'{base_name}_frame_%03d.png')
        progress_file = os.path.join(temp_dir, 'progress.txt')
        print("\n=== Video Encoding ===")
        encoding_finished = threading.Event()

        def monitor_progress():
            """Poll ffmpeg's progress file and redraw the progress bar."""
            last_frame = 0
            while not encoding_finished.is_set():
                try:
                    with open(progress_file, 'r') as f:
                        lines = f.readlines()
                except (OSError, ValueError):
                    # File not created yet, or caught mid-write; try again later
                    lines = []
                current_frame = 0
                for line in lines:
                    if line.startswith('frame='):
                        try:
                            current_frame = int(line.split('=')[1].strip())
                        except ValueError:
                            pass
                if current_frame > last_frame:
                    last_frame = current_frame
                    print(_format_progress(current_frame, total_frames),
                          end='', flush=True)
                # Waiting on the event (instead of sleeping) ends the loop as
                # soon as encoding finishes and always yields between polls.
                encoding_finished.wait(0.5)

        # Start progress monitoring thread
        progress_thread = threading.Thread(target=monitor_progress)
        progress_thread.start()
        try:
            (
                ffmpeg
                .input(input_pattern, framerate=30)
                .filter('pad', 'iw+mod(iw,2)', 'ih+mod(ih,2)', color='white')
                .output(
                    output_file_path,
                    vcodec='libx265',
                    crf=crf,
                    preset='veryslow',
                    pix_fmt='gray',
                    **{
                        'x265-params':
                        'profile=monochrome:'
                        'keyint=120:min-keyint=120:'
                        'scenecut=0:bframes=8:'
                        'sao=0'
                    }
                )
                .global_args('-progress', progress_file)
                .overwrite_output()
                .run(quiet=True)
            )
        finally:
            # End progress monitoring even when encoding fails
            encoding_finished.set()
            progress_thread.join()

        # Display final progress at 100%
        print(_format_progress(total_frames, total_frames), flush=True)
        print()
        print("Video encoding completed")

        # Generate search index next to the video. Swapping only a trailing
        # '.qr.mp4' guarantees the index path can never equal the video path.
        print("\n=== Generating Search Index ===")
        video_suffix = '.qr.mp4'
        if output_file_path.endswith(video_suffix):
            json_file_path = output_file_path[:-len(video_suffix)] + '.qr.json'
        else:
            json_file_path = os.path.splitext(output_file_path)[0] + '.qr.json'
        generate_search_index(chunks, json_file_path, chunk_size, overlap_size)
        print(f"Search index generated: {json_file_path}")

        # Execute QR code reading verification
        try:
            verify_qr_frames(output_file_path, chunks,
                             quick_verify=quick_verify)
        except Exception as e:
            print(f"\nError: QR code verification failed: {e}")
            print("Deleting generated MP4 file...")
            if os.path.exists(output_file_path):
                os.remove(output_file_path)
            if os.path.exists(json_file_path):
                os.remove(json_file_path)
            sys.exit(1)
    finally:
        # Delete temporary directory and its contents
        shutil.rmtree(temp_dir, ignore_errors=True)

    # Display file size comparison
    output_size = os.path.getsize(output_file_path)
    size_ratio = output_size / original_size
    print("\n=== File Size Comparison ===")
    print(f"Original file: {original_size:,} bytes")
    print(f"MP4 file: {output_size:,} bytes")
    print(f"Size ratio: {size_ratio:.2f}x")
    return output_file_path
def has_japanese_text(text):
    """Report whether text has a character in the hiragana, katakana or CJK ranges below."""
    japanese_char = re.search(r'[\u3040-\u309F\u30A0-\u30FF\u4E00-\u9FAF]', text)
    return japanese_char is not None
def extract_words_from_text(text):
    """Return the words found in text.

    Text containing Japanese is tokenised with MeCab when MeCab is
    available; otherwise, or when MeCab raises, words are runs of ASCII
    alphanumerics and Japanese characters matched by regex.
    """
    if not (has_japanese_text(text) and MECAB_AVAILABLE):
        return re.findall(
            r'[a-zA-Z0-9\u3040-\u309F\u30A0-\u30FF\u4E00-\u9FAF]+', text, re.UNICODE)
    try:
        parsed = MeCab.Tagger().parse(text)
        surfaces = []
        # The last output line is dropped; each remaining line is cut at the
        # first tab and only its leading field is kept.
        for row in parsed.splitlines()[:-1]:
            surface = re.split('[\t]', row)[0]
            # Exclude empty strings and symbol-only tokens
            if surface and re.search(
                    r'[a-zA-Z0-9\u3040-\u309F\u30A0-\u30FF\u4E00-\u9FAF]', surface):
                surfaces.append(surface)
        return surfaces
    except Exception:
        # Fall back to the regex method if MeCab fails
        return re.findall(
            r'[a-zA-Z0-9\u3040-\u309F\u30A0-\u30FF\u4E00-\u9FAF]+', text, re.UNICODE)
def generate_search_index(chunks, json_file_path, chunk_size=2800, overlap_size=100):
    """Write a JSON word index for the given text chunks.

    The file holds a "metadata" object (chunk settings and chunk count) and
    a "word_index" object mapping each lower-cased word to the ascending
    list of 1-based frame IDs whose chunk contains it.

    Args:
        chunks: Chunk texts in frame order.
        json_file_path: Destination path of the index file.
        chunk_size: Chunk size recorded in the metadata.
        overlap_size: Overlap size recorded in the metadata.
    """
    word_index = {}
    for frame_id, chunk in enumerate(chunks, start=1):
        # De-duplicate per chunk, keeping first-seen order, so each word gets
        # this frame once without scanning its frame list.
        unique_words = dict.fromkeys(
            word.lower() for word in extract_words_from_text(chunk))
        for word in unique_words:
            # Frames are visited in ascending order, so lists stay sorted.
            word_index.setdefault(word, []).append(frame_id)

    index_data = {
        "metadata": {
            "chunk_size": chunk_size,
            "overlap_size": overlap_size,
            "total_chunks": len(chunks),
            "actual_chunk_size": chunk_size - (overlap_size * 2)
        },
        "word_index": word_index
    }
    with open(json_file_path, 'w', encoding='utf-8') as f:
        json.dump(index_data, f, ensure_ascii=False, indent=2)
def search_in_index(json_file_path, search_query):
    """Look up a query in an index file and return the matching frame IDs.

    A query containing " OR " returns frames matching any term; otherwise
    the whitespace-separated words are ANDed. Terms containing Japanese are
    split with ``extract_words_from_text`` when MeCab is available and the
    resulting words are ANDed. Each word is matched exactly first, then as
    a substring of indexed words.

    Args:
        json_file_path: Path of the JSON index file.
        search_query: Query string.

    Returns:
        Sorted list of frame IDs; empty when nothing matches or the query
        contains no terms.

    Raises:
        FileNotFoundError: If the index file does not exist.
    """
    if not os.path.exists(json_file_path):
        raise FileNotFoundError(f"Index file not found: {json_file_path}")
    with open(json_file_path, 'r', encoding='utf-8') as f:
        index_data = json.load(f)
    # New format nests the index under "word_index"; the old format is the
    # index itself (backward compatibility).
    word_index = index_data["word_index"] if "word_index" in index_data else index_data

    def find_word_frames(word):
        """Find frames for a word using exact match first, then partial match."""
        word_lower = word.lower()
        if word_lower in word_index:
            return set(word_index[word_lower])
        result_frames = set()
        for indexed_word, frames in word_index.items():
            if word_lower in indexed_word:
                result_frames.update(frames)
        return result_frames

    def intersect_frames(terms):
        """Return frames containing every term, or None when terms is empty."""
        common = None
        for term in terms:
            frames = find_word_frames(term)
            common = frames if common is None else common.intersection(frames)
        return common

    if " OR " in search_query:
        # Drop empty terms: an empty string is a substring of every indexed
        # word and would otherwise match all frames.
        terms = [term for term in (part.strip() for part in search_query.split(" OR "))
                 if term]
        result_frames = set()
        for term in terms:
            if has_japanese_text(term) and MECAB_AVAILABLE:
                try:
                    # Words produced by morphological analysis are ANDed
                    term_frames = intersect_frames(extract_words_from_text(term))
                except Exception:
                    # Search with the original term if analysis fails
                    term_frames = find_word_frames(term)
            else:
                term_frames = find_word_frames(term)
            if term_frames:
                result_frames.update(term_frames)
        return sorted(result_frames)

    # AND search (default - space-separated words)
    words = search_query.split()
    if not words:
        return []
    if has_japanese_text(search_query) and MECAB_AVAILABLE:
        try:
            morphed_words = extract_words_from_text(search_query)
            if morphed_words:
                words = morphed_words
        except Exception:
            # Best effort: keep the whitespace-split words if analysis fails
            pass
    return sorted(intersect_frames(words))
def reconstruct_text_from_chunks(chunks, chunk_size=2800, overlap_size=100):
    """Rebuild the original text from overlapping chunks.

    Every chunk contributes only its own ``chunk_size - 2 * overlap_size``
    bytes of new content. The pieces are cut and joined as bytes and decoded
    once at the end, so a multi-byte character that straddles two chunks is
    restored rather than dropped, and a shortened overlap near the end of
    the text does not eat into real content.

    Args:
        chunks: Dicts with a "data" text entry, in frame order.
        chunk_size: Chunk size used when the chunks were created.
        overlap_size: Overlap size used when the chunks were created.

    Returns:
        The reconstructed text; "" for no chunks. Byte sequences that cannot
        be decoded after joining are replaced with U+FFFD.

    Raises:
        ValueError: If the settings leave no new content per chunk and more
            than one chunk is given.
    """
    if not chunks:
        return ""
    if len(chunks) == 1:
        return chunks[0]["data"]

    # Actual chunk size (excluding overlap portions)
    actual_chunk_size = chunk_size - (overlap_size * 2)
    if overlap_size < 0 or actual_chunk_size <= 0:
        raise ValueError(
            f"chunk_size ({chunk_size}) must be greater than twice overlap_size "
            f"({overlap_size}), and overlap_size must not be negative")

    core_parts = []
    for i, chunk in enumerate(chunks):
        chunk_bytes = chunk["data"].encode('utf-8')
        # The first chunk has no front overlap; later chunks start
        # overlap_size bytes before their own content (or at byte 0 when
        # their content begins closer to the start of the text than that).
        front = min(overlap_size, i * actual_chunk_size)
        # Cutting a fixed-width core ignores however long the rear overlap
        # actually is.
        core_parts.append(chunk_bytes[front:front + actual_chunk_size])
    return b"".join(core_parts).decode('utf-8', errors='replace')
def create_sample_file(output_dir):
    """Create a sample text file of at least 100 KB for search testing.

    The file is filled with randomly chosen technology-themed sentences.

    Args:
        output_dir: Directory in which "sample_tech_100kb.txt" is written.

    Returns:
        Path of the written file.
    """
    # Generate diverse sample text for search testing
    sample_texts = [
        "Python programming language is powerful and versatile. ",
        "Machine learning algorithms require large datasets for training. ",
        "Artificial intelligence will transform our society. ",
        "Data science involves statistics, programming, and domain expertise. ",
        "Computer vision enables machines to interpret visual information. ",
        "Natural language processing helps computers understand human language. ",
        "Deep learning neural networks can solve complex problems. ",
        "Software engineering practices ensure code quality and maintainability. ",
        "Database management systems store and retrieve information efficiently. ",
        "Web development frameworks simplify building online applications. ",
        "Cloud computing provides scalable infrastructure for modern applications. ",
        "Cybersecurity protects digital assets from malicious attacks. ",
        "Blockchain technology enables decentralized and secure transactions. ",
        "Internet of Things connects everyday objects to the digital world. ",
        "Virtual reality creates immersive digital experiences. ",
    ]
    target_bytes = 1024 * 100  # 100KB
    # Keep a running byte count instead of re-encoding the whole text on
    # every iteration, and join once at the end.
    parts = []
    total_bytes = 0
    while total_bytes < target_bytes:
        sentence = random.choice(sample_texts)
        parts.append(sentence)
        total_bytes += len(sentence.encode("utf-8"))
    txt_path = os.path.join(output_dir, "sample_tech_100kb.txt")
    with open(txt_path, "w", encoding="utf-8") as f:
        f.write("".join(parts))
    return txt_path
def create_temp_file_from_stdin(output_dir):
    """Save everything read from stdin to a timestamped text file in output_dir.

    Prints an error to stderr and exits with status 1 when stdin contains
    nothing but whitespace. Returns the path of the written file.
    """
    piped_text = sys.stdin.read()
    if piped_text.strip() == "":
        print("Error: No input received from stdin.", file=sys.stderr)
        sys.exit(1)

    # The file name carries the current Unix time in whole seconds
    destination = os.path.join(
        output_dir, f"stdin_input_{int(time.time())}.txt")
    with open(destination, "w", encoding="utf-8") as handle:
        handle.write(piped_text)
    return destination
def verify_qr_frames(mp4_path, original_chunks, num_samples=5, quick_verify=False):
    """Decode frames of the video and check them against the original chunks.

    Each checked frame is extracted with ffmpeg (scaled up 4x), decoded as a
    QR code, and compared with the chunk it was generated from.

    Args:
        mp4_path: Path of the video to verify.
        original_chunks: Chunk texts in frame order.
        num_samples: Number of random frames checked in quick mode.
        quick_verify: Check a random sample instead of every frame.

    Returns:
        True when every checked frame decodes to its original chunk.

    Raises:
        ValueError: If there are no chunks to verify.
        Exception: On the first frame that cannot be read or does not match.
    """
    total_frames = len(original_chunks)
    if total_frames == 0:
        raise ValueError("No chunks to verify")

    print("\n=== QR Code Reading Verification ===")
    # Temporary directory for verification
    verify_dir = tempfile.mkdtemp(prefix="verify_")
    try:
        if quick_verify:
            # Random sampling verification
            sample_frames = random.sample(
                range(1, total_frames + 1), min(num_samples, total_frames))
            print(f"Quick verification mode ({len(sample_frames)} frames)")
        else:
            # Verify all frames (default)
            sample_frames = list(range(1, total_frames + 1))
            print(f"All frames verification mode ({total_frames} frames)")

        success_count = 0
        qr_detector = cv2.QRCodeDetectorAruco()
        for idx, frame_num in enumerate(sample_frames, 1):
            # Extract specific frame
            output_frame = os.path.join(
                verify_dir, f"verify_frame_{frame_num}.png")
            (
                ffmpeg
                .input(mp4_path)
                .filter('select', f'eq(n,{frame_num-1})')
                # Scale up 4x
                .filter('scale', 'iw*4', 'ih*4', flags='neighbor')
                .output(output_frame, vframes=1)
                .overwrite_output()
                .run(quiet=True)
            )

            # Only decoder failures are wrapped; the checks below raise their
            # own message once instead of being wrapped a second time.
            try:
                image = cv2.imread(output_frame)
                data, bbox, _ = qr_detector.detectAndDecode(image)
            except Exception as e:
                raise Exception(
                    f"Frame {frame_num}: Reading error - {str(e)}") from e
            if not data:
                raise Exception(f"Frame {frame_num}: Could not read QR code")
            # Check if it matches original chunk text
            if data != original_chunks[frame_num - 1]:
                raise Exception(f"Frame {frame_num}: Text mismatch")
            success_count += 1

            # Show progress during verification
            if not quick_verify:
                progress = idx / len(sample_frames) * 100
                bar_length = 30
                filled_length = int(bar_length * idx // len(sample_frames))
                bar = '█' * filled_length + '-' * (bar_length - filled_length)
                print(f"\r[{bar}] {progress:.1f}% ({idx}/{len(sample_frames)})",
                      end='', flush=True)

        if not quick_verify:
            print()  # New line
        success_rate = (success_count / len(sample_frames)) * 100
        print(
            f"Verification result: {success_count}/{len(sample_frames)} frames readable successfully ({success_rate:.1f}%)")
        return True
    finally:
        # Delete verification temporary directory
        shutil.rmtree(verify_dir, ignore_errors=True)
def parse_frame_numbers(frame_spec, total_frames):
    """Parse a frame specification such as "1,3-5,10" into frame numbers.

    Args:
        frame_spec: Comma-separated frame numbers and inclusive "start-end"
            ranges (1-based).
        total_frames: Highest valid frame number.

    Returns:
        Sorted list of unique frame numbers.

    Raises:
        ValueError: If a part is not a number or range, or lies outside
            1..total_frames. The message says which of the two it was.
    """
    frame_numbers = set()
    for part in frame_spec.split(','):
        part = part.strip()
        if '-' in part:
            # Range specification (e.g., "2-4"). Parsing is kept separate
            # from validation so the range error below is not swallowed by
            # the format handler.
            try:
                start, end = map(int, part.split('-'))
            except ValueError:
                raise ValueError(f"Invalid range format: {part}") from None
            if start < 1 or end > total_frames or start > end:
                raise ValueError(f"Invalid range: {part}")
            frame_numbers.update(range(start, end + 1))
        else:
            # Single number
            try:
                num = int(part)
            except ValueError:
                raise ValueError(f"Invalid frame number: {part}") from None
            if num < 1 or num > total_frames:
                raise ValueError(f"Frame number out of range: {num}")
            frame_numbers.add(num)
    return sorted(frame_numbers)
def extract_qr_data_from_mp4(input_mp4_path, target_frames=None, quiet=True):
    """Decode the QR codes of selected frames of a video.

    Args:
        input_mp4_path: Path of the QR code video.
        target_frames: 1-based frame numbers to decode; None decodes all.
        quiet: Suppress progress and error output on stderr.

    Returns:
        Tuple ``(decoded_chunks, total_frames)`` where ``decoded_chunks`` is
        a list of ``{"frame_num": int, "data": str}`` dicts in the order of
        ``target_frames``.

    Raises:
        ValueError: If the file has no video stream or reports no frame count.
        Exception: If a frame cannot be read or decoded.
    """
    if not quiet:
        print("\n=== MP4 QR Code Extraction Started ===", file=sys.stderr)
    # Temporary directory for extraction
    extract_dir = tempfile.mkdtemp(prefix="extract_")
    try:
        # Get total frame count from MP4
        probe = ffmpeg.probe(input_mp4_path)
        video_stream = next(
            (stream for stream in probe['streams'] if stream['codec_type'] == 'video'), None)
        if video_stream is None:
            raise ValueError(f"No video stream found in {input_mp4_path}")
        if 'nb_frames' not in video_stream:
            raise ValueError(
                f"Video stream of {input_mp4_path} does not report a frame count")
        total_frames = int(video_stream['nb_frames'])

        # Determine target frames
        if target_frames is None:
            target_frames = list(range(1, total_frames + 1))
        if not quiet:
            print(f"Total frames: {total_frames}", file=sys.stderr)
            print(f"Target frames: {len(target_frames)}", file=sys.stderr)
            print("\n=== Reading QR Codes ===", file=sys.stderr)

        qr_detector = cv2.QRCodeDetectorAruco()
        decoded_chunks = []
        for idx, frame_num in enumerate(target_frames, 1):
            # Extract frame
            output_frame = os.path.join(extract_dir, f"frame_{frame_num}.png")
            (
                ffmpeg
                .input(input_mp4_path)
                .filter('select', f'eq(n,{frame_num-1})')
                .filter('scale', 'iw*4', 'ih*4', flags='neighbor')
                .output(output_frame, vframes=1)
                .overwrite_output()
                .run(quiet=True)
            )

            # Only decoder failures are wrapped here, so an unreadable frame
            # is reported once rather than re-wrapped and printed twice.
            try:
                image = cv2.imread(output_frame)
                data, bbox, _ = qr_detector.detectAndDecode(image)
            except Exception as e:
                error_msg = f"An error occurred while processing frame {frame_num}: {e}"
                if not quiet:
                    print(f"\nError: {error_msg}", file=sys.stderr)
                raise Exception(error_msg) from e
            if not data:
                error_msg = f"Could not read QR code from frame {frame_num}."
                if not quiet:
                    print(f"\nError: {error_msg}", file=sys.stderr)
                raise Exception(error_msg)
            decoded_chunks.append({"frame_num": frame_num, "data": data})

            # Progress display
            if not quiet:
                progress = idx / len(target_frames) * 100
                bar_length = 30
                filled_length = int(bar_length * idx // len(target_frames))
                bar = '█' * filled_length + '-' * (bar_length - filled_length)
                print(f"\r[{bar}] {progress:.1f}% ({idx}/{len(target_frames)})",
                      end='', flush=True, file=sys.stderr)

        if not quiet:
            print(file=sys.stderr)  # New line
            print("QR code reading completed", file=sys.stderr)
        return decoded_chunks, total_frames
    finally:
        # Delete temporary directory
        shutil.rmtree(extract_dir, ignore_errors=True)
def process_mp4_to_text(input_mp4_path, output_txt_path):
    """Decode every QR frame of a video and write the restored text to a file.

    Chunk settings come from the "metadata" entry of the sibling index file
    when it can be read; otherwise 2800/100 are used. Any error is printed
    and ends the process with status 1. Returns ``output_txt_path``.
    """
    print(f"\n=== MP4 to Text Conversion Started ===")
    try:
        # Extract QR data from all frames
        frames, _frame_total = extract_qr_data_from_mp4(
            input_mp4_path, quiet=False)

        # Defaults, replaced by the index metadata when it is available
        settings = {"chunk_size": 2800, "overlap_size": 100}
        index_path = input_mp4_path.replace('.qr.mp4', '.qr.json')
        if os.path.exists(index_path):
            try:
                with open(index_path, 'r', encoding='utf-8') as handle:
                    index_payload = json.load(handle)
                if "metadata" not in index_payload:
                    print("No metadata found in index file, using default values")
                else:
                    stored = index_payload["metadata"]
                    settings["chunk_size"] = stored.get("chunk_size", 2800)
                    settings["overlap_size"] = stored.get("overlap_size", 100)
                    print(
                        f"Using metadata: chunk_size={settings['chunk_size']}, overlap_size={settings['overlap_size']}")
            except Exception as e:
                print(
                    f"Warning: Could not read metadata from {index_path}: {e}")
                print("Using default values: chunk_size=2800, overlap_size=100")

        # Reconstruct text from chunks by removing overlaps
        restored_text = reconstruct_text_from_chunks(
            frames, settings["chunk_size"], settings["overlap_size"])
        with open(output_txt_path, 'w', encoding='utf-8', newline='') as out:
            out.write(restored_text)

        # Display results
        txt_bytes = os.path.getsize(output_txt_path)
        mp4_bytes = os.path.getsize(input_mp4_path)
        print(f"\n=== Conversion Results ===")
        print(f"Chunks read: {len(frames)}")
        print(f"Restored character count: {len(restored_text):,} characters")
        print(f"MP4 file: {mp4_bytes:,} bytes")
        print(f"TXT file: {txt_bytes:,} bytes")
        return output_txt_path
    except Exception as e:
        print(f"\nError: {e}")
        sys.exit(1)
def process_mp4_to_stdout(input_mp4_path, frame_spec=None, quiet=True):
    """Decode QR frames of a video and print the result to stdout.

    With ``frame_spec`` the selected frames are printed as a JSON list of
    ``{"id", "content"}`` objects; without it all frames are decoded and the
    reconstructed text is printed without a trailing newline. Errors end the
    process with status 1 and are reported on stderr unless ``quiet``.
    """
    try:
        # The frame count is needed up front to validate frame_spec
        probe = ffmpeg.probe(input_mp4_path)
        video_stream = next(
            (stream for stream in probe['streams'] if stream['codec_type'] == 'video'), None)
        total_frames = int(video_stream['nb_frames'])

        # Determine target frames
        target_frames = None
        if frame_spec:
            try:
                target_frames = parse_frame_numbers(frame_spec, total_frames)
            except ValueError as e:
                if not quiet:
                    print(f"Error: {e}", file=sys.stderr)
                sys.exit(1)

        # Extract QR data
        decoded_chunks, _ = extract_qr_data_from_mp4(
            input_mp4_path, target_frames, quiet)

        if frame_spec:
            # JSON output for specific frames
            json_output = []
            for chunk in decoded_chunks:
                json_output.append(
                    {"id": chunk["frame_num"], "content": chunk["data"]})
            print(json.dumps(json_output, ensure_ascii=False, indent=2))
            return

        # Plain text output for all frames - remove chunk overlaps
        chunk_size, overlap_size = 2800, 100  # Default values
        json_file_path = input_mp4_path.replace('.qr.mp4', '.qr.json')
        if os.path.exists(json_file_path):
            try:
                with open(json_file_path, 'r', encoding='utf-8') as f:
                    index_data = json.load(f)
                if "metadata" in index_data:
                    metadata = index_data["metadata"]
                    chunk_size = metadata.get("chunk_size", 2800)
                    overlap_size = metadata.get("overlap_size", 100)
            except Exception:
                pass  # Use default values
        print(reconstruct_text_from_chunks(
            decoded_chunks, chunk_size, overlap_size), end='')
    except Exception as e:
        if not quiet:
            print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
def process_search_query(input_mp4_path, search_query, quiet=True):
    """Search the index for a query and print the matching frames as JSON.

    The index file is looked up next to the video ('.qr.mp4' replaced by
    '.qr.json'). Matching frames are decoded from the video and printed to
    stdout as a JSON list of ``{"id", "content"}`` objects; an empty list is
    printed when nothing matches. Any error (including a missing index file)
    ends the process with status 1 and is reported on stderr unless ``quiet``.
    """
    # Determine index file path
    json_file_path = input_mp4_path.replace('.qr.mp4', '.qr.json')
    search_type = "OR" if " OR " in search_query else "AND"
    try:
        # Search in index
        frame_ids = search_in_index(json_file_path, search_query)
        if not frame_ids:
            if not quiet:
                print(
                    f"No frames found containing words: {search_query} ({search_type} search, includes partial matches)", file=sys.stderr)
            print(json.dumps([], ensure_ascii=False, indent=2))
            return
        if not quiet:
            print(
                f"Found {len(frame_ids)} frames containing '{search_query}' ({search_type} search, includes partial matches)", file=sys.stderr)

        # Extract QR data from matching frames
        decoded_chunks, _ = extract_qr_data_from_mp4(
            input_mp4_path, frame_ids, quiet)

        # Output results in JSON format
        json_output = [{"id": chunk["frame_num"],
                        "content": chunk["data"]} for chunk in decoded_chunks]
        print(json.dumps(json_output, ensure_ascii=False, indent=2))
    except Exception as e:
        # Covers FileNotFoundError from a missing index as well
        if not quiet:
            print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
# Epilog printed below the option list by ``--help``. The parser uses
# RawDescriptionHelpFormatter, so this text is shown without re-wrapping.
_CLI_EPILOG = """
Usage examples:
python qr_video_converter.py input.txt # Convert input.txt to input.qr.mp4
python qr_video_converter.py input.qr.mp4 # Output all content to stdout as plain text
python qr_video_converter.py input.qr.mp4 -f 42 # Output frame 42 as JSON
python qr_video_converter.py input.qr.mp4 -f 1,3,5 # Output specific frames as JSON
python qr_video_converter.py input.qr.mp4 -f 1-5,10 # Output frame range as JSON
python qr_video_converter.py input.qr.mp4 -s python # Search for word 'python' and output matching frames as JSON
python qr_video_converter.py input.qr.mp4 -s "python machine" # AND search for both words
python qr_video_converter.py input.qr.mp4 -s "python OR java" # OR search for either word
python qr_video_converter.py input.qr.mp4 -o # Output to input.decoded.txt file
python qr_video_converter.py input.txt --crf 25 # Convert with CRF value 25 (high quality)
python qr_video_converter.py input.txt -q # Convert with quick verification (5 random frames)
python qr_video_converter.py --sample # Generate sample file and convert
python qr_video_converter.py --stdin # Read text from stdin and convert to MP4
echo "Hello World" | python qr_video_converter.py --stdin # Pipe text and convert
python qr_video_converter.py input.txt --chunk-size 3000 --overlap-size 150 # Custom chunk settings
"""


def _build_cli_parser():
    """Build the argparse parser describing the command-line interface.

    The positional ``input_file``, ``--sample`` and ``--stdin`` are placed in
    one mutually exclusive group, so at most one input source can be given.

    Returns:
        argparse.ArgumentParser: The configured parser.
    """
    parser = argparse.ArgumentParser(
        description="Convert text files to QR code videos (MP4) or restore MP4 to text",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=_CLI_EPILOG,
    )

    # Input source: a file path, the generated sample, or standard input.
    source = parser.add_mutually_exclusive_group()
    source.add_argument(
        'input_file',
        nargs='?',
        help='Path to file to convert (any text file or .qr.mp4)'
    )
    source.add_argument(
        '--sample',
        action='store_true',
        help='Generate sample text file and convert'
    )
    source.add_argument(
        '--stdin',
        action='store_true',
        help='Read text from standard input and convert to MP4'
    )

    # Options used by the text -> MP4 direction.
    parser.add_argument(
        '--crf',
        type=int,
        default=30,
        choices=range(25, 35),
        metavar='25-34',
        help='CRF value (quality setting). Only effective for text to MP4 conversion. Lower values mean higher quality and larger file size. Recommended: 30 (default), High quality: 25, Small size: 34'
    )
    parser.add_argument(
        '--quick-verify', '-q',
        action='store_true',
        help='Quick verification mode - verify only 5 random frames during text to MP4 conversion (default is all frames)'
    )
    parser.add_argument(
        '--chunk-size',
        type=int,
        default=2800,
        metavar='BYTES',
        help='Maximum chunk size in bytes (default: 2800). Affects QR code data density and video frame count.'
    )
    parser.add_argument(
        '--overlap-size',
        type=int,
        default=100,
        metavar='BYTES',
        help='Overlap size in bytes between chunks (default: 100). Higher values improve error recovery but increase redundancy.'
    )

    # Options used by the MP4 -> text direction.
    parser.add_argument(
        '--output', '-o',
        action='store_true',
        help='Output to text file instead of stdout (only for .qr.mp4 input)'
    )
    parser.add_argument(
        '--frames', '-f',
        type=str,
        help='Specify frame numbers to extract (e.g., "1", "1,3,5", "1-5,10"). Output format will be JSON.'
    )
    parser.add_argument(
        '--search', '-s',
        type=str,
        help='Search for words in the index and output matching frames as JSON. '
             'Use space-separated words for AND search (e.g., "python machine") or '
             '"OR" keyword for OR search (e.g., "python OR java") (only for .qr.mp4 input)'
    )
    return parser


def _fail(message):
    """Print ``message`` to stderr and terminate with exit status 1.

    Errors always go to stderr because stdout carries decoded text / JSON
    when an MP4 is being read.
    """
    print(message, file=sys.stderr)
    sys.exit(1)


def _convert_text_to_video(txt_path, output_mp4, args):
    """Run the text -> MP4 conversion and report the generated file.

    Forwards the path pair and the ``--crf``, ``--quick-verify``,
    ``--chunk-size`` and ``--overlap-size`` values to ``process_text_to_mp4``
    (defined elsewhere in this file) and prints the value it returns.
    An existing ``output_mp4`` is not checked for here.
    """
    result_file = process_text_to_mp4(
        txt_path, output_mp4, args.crf, args.quick_verify,
        args.chunk_size, args.overlap_size)
    print(f"Generated video: {result_file}")


def _convert_generated_text(txt_path, output_dir, args):
    """Convert a generated text file to ``<output_dir>/<stem>.qr.mp4``.

    The video takes the base name (without extension) of ``txt_path``.
    """
    stem = os.path.splitext(os.path.basename(txt_path))[0]
    _convert_text_to_video(
        txt_path, os.path.join(output_dir, f'{stem}.qr.mp4'), args)


def _handle_input_file(args):
    """Dispatch on the positional ``input_file`` argument.

    * ``*.qr.json`` is redirected to the ``*.qr.mp4`` file beside it.
    * ``*.qr.mp4`` is searched (``--search``), decoded to
      ``<name>.decoded.txt`` (``--output``) or passed to
      ``process_mp4_to_stdout`` together with ``--frames``.
    * Any other ``*.mp4`` is rejected.
    * Every other file is treated as text and converted to ``<name>.qr.mp4``
      in the same directory.

    Exits with status 1 (message on stderr) for a missing file, a missing
    companion MP4, an unsupported MP4 name, or ``--output`` / ``--search``
    combined with a text input.
    """
    json_suffix = '.qr.json'
    mp4_suffix = '.qr.mp4'

    input_file = args.input_file
    if not os.path.exists(input_file):
        _fail(f"Error: File '{input_file}' not found.")

    if input_file.lower().endswith(json_suffix):
        # Swap only the trailing suffix. str.replace() is case-sensitive and
        # rewrites every occurrence, so "x.QR.JSON" would be left unchanged
        # (and then converted as plain text) and a ".qr.json" inside a
        # directory name would be altered as well.
        mp4_file = input_file[:-len(json_suffix)] + mp4_suffix
        if not os.path.exists(mp4_file):
            _fail(
                f"Error: Corresponding MP4 file '{mp4_file}' not found for JSON index file '{input_file}'.")
        input_file = mp4_file

    input_dir = os.path.dirname(input_file)
    input_basename, input_ext = os.path.splitext(os.path.basename(input_file))

    if input_ext.lower() != '.mp4':
        # Any non-MP4 file is treated as a text file.
        if args.output:
            _fail("Error: --output option is only available for .qr.mp4 input files.")
        if args.search:
            _fail("Error: --search option is only available for .qr.mp4 input files.")
        _convert_text_to_video(
            input_file,
            os.path.join(input_dir, f"{input_basename}.qr.mp4"),
            args)
        return

    if not input_file.lower().endswith(mp4_suffix):
        _fail(
            f"Error: Only .qr.mp4 files are supported for MP4 input. Got: {input_file}")

    if args.search:
        process_search_query(input_file, args.search, quiet=True)
    elif args.output:
        # The name is known to end in ".qr.mp4", so the basename (extension
        # already removed) always ends in ".qr"; drop it for the output name.
        decoded_stem = input_basename[:-len('.qr')]
        output_file = os.path.join(input_dir, f"{decoded_stem}.decoded.txt")
        # An existing .decoded.txt is not checked for here.
        result_file = process_mp4_to_text(input_file, output_file)
        print(f"Restored text: {result_file}")
    else:
        # Default for MP4 input: write the decoded content to stdout.
        process_mp4_to_stdout(input_file, args.frames, quiet=True)


def main(argv=None):
    """Command-line entry point.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, which is the original behaviour.

    Modes, selected by the parsed arguments:
        * ``--sample``: create a sample text file in ``output/`` and convert it.
        * ``--stdin``: store stdin in a temporary text file in ``output/``,
          convert it, then remove the temporary file.
        * ``input_file``: see ``_handle_input_file``.
        * no input at all: print the help text.

    Exits with status 1 when the chunk settings are inconsistent.
    """
    parser = _build_cli_parser()
    args = parser.parse_args(argv)

    # The chunk/overlap relation is validated up front, whichever mode runs.
    min_chunk_size = args.overlap_size * 2 + 100
    if args.chunk_size < min_chunk_size:
        _fail(
            f"Error: Chunk size ({args.chunk_size}) must be at least {min_chunk_size} bytes (overlap_size * 2 + 100).")

    output_dir = "output"  # destination of files generated by --sample / --stdin

    if args.sample:
        os.makedirs(output_dir, exist_ok=True)
        txt_path = create_sample_file(output_dir)
        _convert_generated_text(txt_path, output_dir, args)
    elif args.stdin:
        os.makedirs(output_dir, exist_ok=True)
        txt_path = create_temp_file_from_stdin(output_dir)
        try:
            _convert_generated_text(txt_path, output_dir, args)
        finally:
            # Remove the temporary text file even when the conversion fails.
            if os.path.exists(txt_path):
                os.remove(txt_path)
    elif args.input_file:
        _handle_input_file(args)
    else:
        # Show help when no arguments are provided.
        parser.print_help()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment