Skip to content

Instantly share code, notes, and snippets.

@mutaguchi
Last active June 2, 2025 06:04
Show Gist options
  • Select an option

  • Save mutaguchi/6a02a214a424099fec9eca193a5f8655 to your computer and use it in GitHub Desktop.

Select an option

Save mutaguchi/6a02a214a424099fec9eca193a5f8655 to your computer and use it in GitHub Desktop.
QR Video Converter - Text to QR Code Video (MP4) and Back
import argparse
import json
import os
import random
import re
import shutil
import sys
import tempfile
import threading
import time

import cv2
import ffmpeg
import qrcode

# Try importing MeCab (optional)
try:
    import MeCab
    MECAB_AVAILABLE = True
except ImportError:
    MECAB_AVAILABLE = False
    MeCab = None
"""
QR Video Converter
This script converts text files to QR code videos (MP4) and vice versa.
Inspired by https://github.com/Olow304/memvid for proof-of-concept of text-to-QR video conversion.
Requirements:
- pip install qrcode ffmpeg-python opencv-python
- For Japanese word search (optional): pip install mecab-python3 unidic-lite
Typical usage:
python qr_video_converter.py input.txt # Convert text to input.qr.mp4
python qr_video_converter.py --sample # Generate sample file
python qr_video_converter.py input.qr.mp4 # Output all content to stdout
python qr_video_converter.py input.qr.mp4 -s python # Search for 'python' and output matching frames as JSON
python qr_video_converter.py input.qr.mp4 -o # Output to input.decoded.txt file
Note: This is a proof-of-concept tool. The resulting video files are typically 2-3x larger
than the original text files. This tool is not intended for practical use but rather
to demonstrate the feasibility of text-to-QR video conversion.
---
QR Video Converter (日本語版)
このスクリプトはテキストファイルをQRコード動画(MP4)に変換したり、その逆変換を行います。
テキストからQRコード動画への変換の概念実証として、https://github.com/Olow304/memvid からインスピレーションを得ています。
必要なライブラリ:
- pip install qrcode ffmpeg-python opencv-python
- 日本語検索機能を使用する場合: pip install mecab-python3 unidic-lite
基本的な使用方法:
python qr_video_converter.py input.txt # テキストをinput.qr.mp4に変換
python qr_video_converter.py --sample # サンプルファイルを生成して変換
python qr_video_converter.py input.qr.mp4 # 全内容を標準出力に出力
python qr_video_converter.py input.qr.mp4 -s python # 'python'を検索してマッチしたフレームをJSONで出力
python qr_video_converter.py input.qr.mp4 -o # input.decoded.txtファイルに出力
注意: これは概念実証ツールです。生成される動画ファイルは通常、元のテキストファイルの2-3倍のサイズになります。
このツールは実用的な使用を目的とせず、テキストからQRコード動画への変換の実現可能性を示すものです。
"""
def process_text_to_mp4(input_file_path, output_file_path, crf=30, quick_verify=False, chunk_size=2800, overlap_size=100):
    """Convert a UTF-8 text file into a QR code video plus a JSON search index.

    The text is cut into overlapping UTF-8 chunks, every chunk is rendered as
    a version 40 QR code image, the images are encoded into an HEVC video with
    ffmpeg, a word index is written next to the video and the video is finally
    decoded again to check that the QR codes are still readable.

    Args:
        input_file_path: Path of the UTF-8 text file to convert.
        output_file_path: Path of the video to write (normally ``*.qr.mp4``).
        crf: x265 constant rate factor handed to ffmpeg.
        quick_verify: Forwarded to ``verify_qr_frames``.
        chunk_size: Maximum number of bytes stored in one QR code.
        overlap_size: Number of bytes a chunk shares with its neighbour on
            each side.

    Returns:
        ``output_file_path``.

    Raises:
        ValueError: If ``overlap_size`` is negative or ``chunk_size`` is not
            larger than ``2 * overlap_size`` (the splitter could not advance).
        UnicodeDecodeError: If the input file is not valid UTF-8.
        SystemExit: With status 1 when the input file is empty or when the
            verification of the generated video fails (the generated video
            and index are deleted in the latter case).
    """

    def split_utf8_chunks(data_bytes, chunk_size, overlap_size):
        """Split valid UTF-8 bytes into overlapping chunks of decoded text.

        Chunk starts advance by ``chunk_size - 2 * overlap_size`` bytes; each
        chunk additionally reaches ``overlap_size`` bytes backwards and
        forwards. A start that falls inside a multi-byte character is moved
        forward one byte at a time, an end that does so is moved backwards.

        NOTE: with an overlap of fewer than 3 bytes a multi-byte character
        that straddles a chunk end may not be covered by any chunk.
        """
        chunks = []
        pos = 0
        length = len(data_bytes)
        # Adjust chunk size to accommodate overlap areas (2800 - 200 = 2600)
        actual_chunk_size = chunk_size - (overlap_size * 2)
        while pos < length:
            # Start with overlap from previous chunk
            chunk_start = max(0, pos - overlap_size)
            # 0b10xxxxxx is a UTF-8 continuation byte: a slice starting here
            # can never decode, so shift the window by one byte and retry.
            if (data_bytes[chunk_start] & 0xC0) == 0x80:
                pos += 1
                continue
            # Payload end plus the rear overlap, clamped to the end of data
            overlap_end = min(pos + actual_chunk_size + overlap_size, length)
            # Back off (at most 3 bytes) so no character is cut at the end
            while overlap_end < length and (data_bytes[overlap_end] & 0xC0) == 0x80:
                overlap_end -= 1
            if overlap_end <= chunk_start:
                # Window too small to hold one whole character
                pos += 1
                continue
            chunks.append(data_bytes[chunk_start:overlap_end].decode("utf-8"))
            pos += actual_chunk_size
        return chunks

    if overlap_size < 0 or chunk_size - (overlap_size * 2) <= 0:
        raise ValueError(
            f"chunk_size ({chunk_size}) must be larger than twice the "
            f"overlap_size ({overlap_size}), and overlap_size must not be negative")

    # Display input file information
    original_size = os.path.getsize(input_file_path)
    file_name = os.path.basename(input_file_path)
    # Read once as bytes; decoding validates the encoding before chunking.
    # (The character count therefore includes any carriage returns.)
    with open(input_file_path, "rb") as f:
        data_bytes = f.read()
    char_count = len(data_bytes.decode("utf-8"))
    print("\n=== Input File Information ===")
    print(f"File name: {file_name}")
    print(f"File size: {original_size:,} bytes")
    print(f"Character count: {char_count:,} characters")

    chunks = split_utf8_chunks(data_bytes, chunk_size, overlap_size)
    if not chunks:
        # Nothing to encode: ffmpeg would fail on an empty image sequence
        print(f"Error: Input file '{input_file_path}' is empty.", file=sys.stderr)
        sys.exit(1)

    # Derive the index path from the video suffix only, so that the index can
    # never end up being the video file itself.
    video_suffix = '.qr.mp4'
    if output_file_path.lower().endswith(video_suffix):
        json_file_path = output_file_path[:-len(video_suffix)] + '.qr.json'
    else:
        json_file_path = output_file_path + '.qr.json'

    # Create temporary directory
    temp_dir = tempfile.mkdtemp(prefix="qr_")
    try:
        # Get base name from original file name
        base_name = os.path.splitext(file_name)[0]
        total_frames = len(chunks)
        bar_length = 30

        print(f"Number of chunks: {total_frames}")
        print(f"Chunk size: {chunk_size} bytes")
        print(f"Overlap size: {overlap_size} bytes")
        print("\n=== Generating QR Codes ===")

        # Generate QR images
        for idx, chunk in enumerate(chunks, start=1):
            frame_path = os.path.join(
                temp_dir, f"{base_name}_frame_{idx:03d}.png")
            qr = qrcode.QRCode(version=40, error_correction=qrcode.constants.ERROR_CORRECT_L,
                               box_size=1, border=4)
            qr.add_data(chunk)
            qr.make(fit=False)
            img = qr.make_image(fill_color="black", back_color="white")
            img.save(frame_path)

            # Progress display
            progress = idx / total_frames * 100
            filled_length = int(bar_length * idx // total_frames)
            bar = '█' * filled_length + '-' * (bar_length - filled_length)
            print(f"\r[{bar}] {progress:.1f}% ({idx}/{total_frames})",
                  end='', flush=True)
        print()
        print("QR code generation completed")

        # Encode to HEVC with padding to even dimensions (width,height)
        input_pattern = os.path.join(temp_dir, f'{base_name}_frame_%03d.png')
        progress_file = os.path.join(temp_dir, 'progress.txt')
        print("\n=== Video Encoding ===")
        encoding_finished = threading.Event()

        def monitor_progress():
            """Poll ffmpeg's progress file and redraw the progress bar."""
            last_frame = 0
            while not encoding_finished.is_set():
                current_frame = 0
                try:
                    with open(progress_file, 'r') as f:
                        for line in f:
                            if line.startswith('frame='):
                                try:
                                    current_frame = int(
                                        line.split('=')[1].strip())
                                except ValueError:
                                    # Partially written line; try next poll
                                    pass
                except OSError:
                    # File not created yet (or momentarily unreadable)
                    pass
                if current_frame > last_frame:
                    last_frame = current_frame
                    progress = min(current_frame / total_frames * 100, 100)
                    filled_length = min(
                        bar_length,
                        int(bar_length * current_frame // total_frames))
                    bar = '█' * filled_length + '-' * \
                        (bar_length - filled_length)
                    print(f"\r[{bar}] {progress:.1f}% ({current_frame}/{total_frames})",
                          end='', flush=True)
                # Wait outside the try block so that a failing read cannot
                # turn this loop into a busy loop; returns early on finish.
                encoding_finished.wait(0.5)

        # Start progress monitoring thread
        progress_thread = threading.Thread(target=monitor_progress)
        progress_thread.start()
        try:
            (
                ffmpeg
                .input(input_pattern, framerate=30)
                .filter('pad', 'iw+mod(iw,2)', 'ih+mod(ih,2)', color='white')
                .output(
                    output_file_path,
                    vcodec='libx265',
                    crf=crf,
                    preset='veryslow',
                    pix_fmt='gray',
                    **{
                        'x265-params':
                        'profile=monochrome:'
                        'keyint=120:min-keyint=120:'
                        'scenecut=0:bframes=8:'
                        'sao=0'
                    }
                )
                .global_args('-progress', progress_file)
                .overwrite_output()
                .run(quiet=True)
            )
        finally:
            # End progress monitoring
            encoding_finished.set()
            progress_thread.join()

        # Display final progress at 100% (only reached when encoding worked)
        bar = '█' * bar_length
        print(f"\r[{bar}] 100.0% ({total_frames}/{total_frames})", flush=True)
        print()
        print("Video encoding completed")

        # Generate search index
        print("\n=== Generating Search Index ===")
        generate_search_index(chunks, json_file_path, chunk_size, overlap_size)
        print(f"Search index generated: {json_file_path}")

        # Execute QR code reading verification
        try:
            verify_qr_frames(output_file_path, chunks,
                             quick_verify=quick_verify)
        except Exception as e:
            print(f"\nError: QR code verification failed: {e}")
            print("Deleting generated MP4 file...")
            if os.path.exists(output_file_path):
                os.remove(output_file_path)
            if os.path.exists(json_file_path):
                os.remove(json_file_path)
            sys.exit(1)
    finally:
        # Delete temporary directory and its contents
        shutil.rmtree(temp_dir, ignore_errors=True)

    # Display file size comparison (original_size > 0: chunks is not empty)
    output_size = os.path.getsize(output_file_path)
    size_ratio = output_size / original_size
    print("\n=== File Size Comparison ===")
    print(f"Original file: {original_size:,} bytes")
    print(f"MP4 file: {output_size:,} bytes")
    print(f"Size ratio: {size_ratio:.2f}x")
    return output_file_path
def has_japanese_text(text):
    """Return True when *text* contains at least one character from the
    hiragana, katakana or CJK ideograph ranges matched below."""
    japanese_chars = r'[\u3040-\u309F\u30A0-\u30FF\u4E00-\u9FAF]'
    return re.search(japanese_chars, text) is not None
def extract_words_from_text(text):
    """Split *text* into indexable words.

    When the text contains Japanese characters and MeCab could be imported,
    MeCab's morphological analysis supplies the words; otherwise (or if MeCab
    raises) runs of ASCII letters, digits and Japanese characters are used.
    """
    word_char = r'[a-zA-Z0-9\u3040-\u309F\u30A0-\u30FF\u4E00-\u9FAF]'

    if has_japanese_text(text) and MECAB_AVAILABLE:
        try:
            parsed = MeCab.Tagger().parse(text)
            # One token per line, surface form before the first tab; the
            # last line of the output is dropped.
            surfaces = [line.split('\t')[0] for line in parsed.splitlines()]
            del surfaces[-1:]
            # Keep only tokens that contain at least one word character
            return [token for token in surfaces
                    if token and re.search(word_char, token)]
        except Exception:
            # Any MeCab problem: fall through to the regex extraction
            pass

    return re.findall(word_char + '+', text, re.UNICODE)
def generate_search_index(chunks, json_file_path, chunk_size=2800, overlap_size=100):
    """Write a JSON word index (plus chunking metadata) for *chunks*.

    Args:
        chunks: Chunk texts in frame order; frame ids start at 1.
        json_file_path: Path of the JSON file to write.
        chunk_size: Chunk size used by the splitter, stored as metadata.
        overlap_size: Overlap size used by the splitter, stored as metadata.

    The file contains ``metadata`` and ``word_index``; the latter maps each
    lower-cased word to the ascending list of frame ids that contain it.
    """
    word_index = {}
    for frame_id, chunk in enumerate(chunks, start=1):
        # dict.fromkeys removes repeats within the chunk while keeping the
        # first-seen order, so no per-word list scan is needed.
        frame_words = dict.fromkeys(
            word.lower() for word in extract_words_from_text(chunk))
        for word in frame_words:
            # Frames are visited in ascending order, so every list stays
            # sorted without a final sort pass.
            word_index.setdefault(word, []).append(frame_id)

    index_data = {
        "metadata": {
            "chunk_size": chunk_size,
            "overlap_size": overlap_size,
            "total_chunks": len(chunks),
            "actual_chunk_size": chunk_size - (overlap_size * 2)
        },
        "word_index": word_index
    }
    with open(json_file_path, 'w', encoding='utf-8') as f:
        json.dump(index_data, f, ensure_ascii=False, indent=2)
def search_in_index(json_file_path, search_query):
    """Look up *search_query* in a word index file and return frame ids.

    Terms separated by ``" OR "`` are combined as a union; otherwise the
    whitespace-separated terms must all match (intersection). A term is
    matched exactly first and as a substring of indexed words if there is no
    exact hit. Japanese terms are split with MeCab when it is available.

    Returns:
        Sorted list of matching frame ids.

    Raises:
        FileNotFoundError: If the index file does not exist.
    """
    if not os.path.exists(json_file_path):
        raise FileNotFoundError(f"Index file not found: {json_file_path}")

    with open(json_file_path, 'r', encoding='utf-8') as f:
        loaded = json.load(f)
    # New files nest the index under "word_index"; old files are the index
    word_index = loaded["word_index"] if "word_index" in loaded else loaded

    def find_word_frames(word):
        """Frames for one term: exact hit, else every substring hit."""
        needle = word.lower()
        if needle in word_index:
            return set(word_index[needle])
        hits = set()
        for indexed_word, frames in word_index.items():
            if needle in indexed_word:
                hits.update(frames)
        return hits

    def frames_with_all(terms):
        """Intersection of the frames of *terms*; None for no terms."""
        common = None
        for term in terms:
            hits = find_word_frames(term)
            common = hits if common is None else common & hits
        return common

    if " OR " in search_query:
        # OR search
        union = set()
        for term in (part.strip() for part in search_query.split(" OR ")):
            if not (has_japanese_text(term) and MECAB_AVAILABLE):
                union |= find_word_frames(term)
                continue
            try:
                # The morphemes of one Japanese term must all be present
                hits = frames_with_all(extract_words_from_text(term))
                if hits:
                    union |= hits
            except Exception:
                # Search with the original term if MeCab fails
                union |= find_word_frames(term)
        return sorted(union)

    # AND search (default - space-separated words)
    terms = search_query.split()
    if not terms:
        return []
    if has_japanese_text(search_query) and MECAB_AVAILABLE:
        try:
            morphemes = extract_words_from_text(search_query)
            if morphemes:
                terms = morphemes
        except Exception:
            # Keep the whitespace-separated terms if MeCab fails
            pass
    return sorted(frames_with_all(terms))
def reconstruct_text_from_chunks(chunks, chunk_size=2800, overlap_size=100):
    """Rebuild the original text from overlapping chunks.

    Args:
        chunks: Dicts with a ``"data"`` string, covering all frames in frame
            order.
        chunk_size: Chunk size that was used when the text was split.
        overlap_size: Overlap size that was used when the text was split.

    Returns:
        The reconstructed text (``""`` for no chunks).

    Instead of cutting a fixed number of bytes off both ends of every chunk
    (which drops characters whenever a boundary falls inside a multi-byte
    character, and drops bytes when the final chunk holds fewer than
    ``overlap_size`` new bytes), the byte offset at which each chunk started
    is replayed: starts advance by ``chunk_size - 2 * overlap_size`` bytes,
    reach ``overlap_size`` bytes backwards, and move forward while they point
    at a UTF-8 continuation byte. Only the bytes that lie beyond the text
    rebuilt so far are appended.

    If the replayed position does not match the data (for example because
    the sizes differ from the ones used for splitting), the longest
    suffix/prefix match between the rebuilt text and the chunk is used as a
    best effort instead.
    """
    if not chunks:
        return ""

    step = chunk_size - (overlap_size * 2)
    # Shorter matches are too likely to be coincidental to be trusted
    min_trusted_overlap = 16

    merged = bytearray(chunks[0]["data"].encode("utf-8"))
    pos = 0  # splitter position of the chunk being handled
    for chunk in chunks[1:]:
        piece = chunk["data"].encode("utf-8")
        written = len(merged)

        # Replay the splitter: nominal start, shifted forward byte by byte
        # while it points into the middle of a character.
        pos += step
        start = max(0, pos - overlap_size)
        while start < written and (merged[start] & 0xC0) == 0x80:
            pos += 1
            start = max(0, pos - overlap_size)

        # Bytes of this chunk that are already part of the rebuilt text
        shared = written - start
        if not (shared >= 0 and piece[:shared] == merged[start:start + len(piece)]):
            # Replayed offset is not usable: search for the real overlap
            shared = 0
            for size in range(min(written, len(piece)), min_trusted_overlap - 1, -1):
                if merged[written - size:] == piece[:size]:
                    shared = size
                    break
            # Re-anchor so that following chunks are replayed from here
            pos = written - shared + overlap_size

        merged += piece[shared:]

    return merged.decode("utf-8")
def create_sample_file(output_dir):
    """Write a sample text file of at least 100 KB and return its path.

    The file ``sample_tech_100kb.txt`` is created in *output_dir* from
    randomly chosen sentences, so that the search feature has varied
    vocabulary to work with.
    """
    # Generate diverse sample text for search testing
    sample_texts = [
        "Python programming language is powerful and versatile. ",
        "Machine learning algorithms require large datasets for training. ",
        "Artificial intelligence will transform our society. ",
        "Data science involves statistics, programming, and domain expertise. ",
        "Computer vision enables machines to interpret visual information. ",
        "Natural language processing helps computers understand human language. ",
        "Deep learning neural networks can solve complex problems. ",
        "Software engineering practices ensure code quality and maintainability. ",
        "Database management systems store and retrieve information efficiently. ",
        "Web development frameworks simplify building online applications. ",
        "Cloud computing provides scalable infrastructure for modern applications. ",
        "Cybersecurity protects digital assets from malicious attacks. ",
        "Blockchain technology enables decentralized and secure transactions. ",
        "Internet of Things connects everyday objects to the digital world. ",
        "Virtual reality creates immersive digital experiences. ",
    ]
    target_bytes = 1024 * 100  # 100KB

    # Keep a running byte count instead of re-encoding the whole text on
    # every iteration.
    sentences = []
    total_bytes = 0
    while total_bytes < target_bytes:
        sentence = random.choice(sample_texts)
        sentences.append(sentence)
        total_bytes += len(sentence.encode("utf-8"))

    txt_path = os.path.join(output_dir, "sample_tech_100kb.txt")
    with open(txt_path, "w", encoding="utf-8") as f:
        f.write("".join(sentences))
    return txt_path
def create_temp_file_from_stdin(output_dir):
    """Save everything read from stdin to a new text file in *output_dir*.

    The file name carries the current Unix time in seconds. Exits with
    status 1 when stdin delivers nothing but whitespace.

    Returns:
        Path of the written file.
    """
    piped_text = sys.stdin.read()
    if piped_text.strip() == "":
        print("Error: No input received from stdin.", file=sys.stderr)
        sys.exit(1)

    # Timestamped name for the temporary file
    timestamp = int(time.time())
    txt_path = os.path.join(output_dir, f"stdin_input_{timestamp}.txt")
    with open(txt_path, "w", encoding="utf-8") as out:
        out.write(piped_text)
    return txt_path
def verify_qr_frames(mp4_path, original_chunks, num_samples=5, quick_verify=False):
    """Decode frames of the generated video and compare them with the chunks.

    Args:
        mp4_path: Path of the video to check.
        original_chunks: Chunk texts in frame order (frame 1 first).
        num_samples: Number of random frames checked in quick mode.
        quick_verify: Check only ``num_samples`` random frames instead of all.

    Returns:
        True when every checked frame decodes to its original chunk.

    Raises:
        RuntimeError: If there are no frames to verify, or at the first frame
            that cannot be read or whose text differs. The message names the
            frame and the reason exactly once.
    """
    print("\n=== QR Code Reading Verification ===")
    total_frames = len(original_chunks)
    if total_frames == 0:
        # Would otherwise end in a division by zero in the summary below
        raise RuntimeError("Verification failed: no frames to verify")

    # Temporary directory for verification
    verify_dir = tempfile.mkdtemp(prefix="verify_")
    try:
        if quick_verify:
            # Random sampling verification
            sample_frames = random.sample(
                range(1, total_frames + 1), min(num_samples, total_frames))
            print(f"Quick verification mode ({len(sample_frames)} frames)")
        else:
            # Verify all frames (default)
            sample_frames = list(range(1, total_frames + 1))
            print(f"All frames verification mode ({total_frames} frames)")

        success_count = 0
        qr_detector = cv2.QRCodeDetectorAruco()
        for idx, frame_num in enumerate(sample_frames, 1):
            # Extract specific frame, scaled up 4x
            output_frame = os.path.join(
                verify_dir, f"verify_frame_{frame_num}.png")
            (
                ffmpeg
                .input(mp4_path)
                .filter('select', f'eq(n,{frame_num-1})')
                .filter('scale', 'iw*4', 'ih*4', flags='neighbor')
                .output(output_frame, vframes=1)
                .overwrite_output()
                .run(quiet=True)
            )

            # Read QR code; only genuine decoder failures are wrapped here so
            # that the mismatch errors below are not re-wrapped.
            try:
                image = cv2.imread(output_frame)
                data, _bbox, _ = qr_detector.detectAndDecode(image)
            except Exception as e:
                raise RuntimeError(
                    f"Frame {frame_num}: Reading error - {str(e)}") from e
            if not data:
                raise RuntimeError(
                    f"Frame {frame_num}: Could not read QR code")
            # Check if it matches original chunk text
            if data != original_chunks[frame_num - 1]:
                raise RuntimeError(f"Frame {frame_num}: Text mismatch")
            success_count += 1

            # Show progress during verification
            if not quick_verify:
                progress = idx / len(sample_frames) * 100
                bar_length = 30
                filled_length = int(bar_length * idx // len(sample_frames))
                bar = '█' * filled_length + '-' * (bar_length - filled_length)
                print(f"\r[{bar}] {progress:.1f}% ({idx}/{len(sample_frames)})",
                      end='', flush=True)
        if not quick_verify:
            print()  # New line

        # Every failure raised above, so all sampled frames succeeded here
        success_rate = (success_count / len(sample_frames)) * 100
        print(
            f"Verification result: {success_count}/{len(sample_frames)} frames readable successfully ({success_rate:.1f}%)")
        return True
    finally:
        # Delete verification temporary directory
        shutil.rmtree(verify_dir, ignore_errors=True)
def parse_frame_numbers(frame_spec, total_frames):
    """Parse a frame specification such as ``"1,3,5"`` or ``"1-5,10"``.

    Args:
        frame_spec: Comma-separated frame numbers and ``start-end`` ranges
            (1-based, inclusive).
        total_frames: Highest valid frame number.

    Returns:
        Sorted list of the distinct frame numbers.

    Raises:
        ValueError: If a part is not a number/range, or lies outside
            ``1..total_frames``. The message tells the two cases apart.
    """
    frame_numbers = set()
    for part in frame_spec.split(','):
        part = part.strip()
        if '-' in part:
            # Range specification (e.g., "2-4"). Parsing and validation are
            # kept apart so the validation error is not replaced by the
            # generic parsing error.
            try:
                start, end = map(int, part.split('-'))
            except ValueError:
                raise ValueError(f"Invalid range format: {part}") from None
            if start < 1 or end > total_frames or start > end:
                raise ValueError(f"Invalid range: {part}")
            frame_numbers.update(range(start, end + 1))
        else:
            # Single number
            try:
                num = int(part)
            except ValueError:
                raise ValueError(f"Invalid frame number: {part}") from None
            if num < 1 or num > total_frames:
                raise ValueError(f"Frame number out of range: {num}")
            frame_numbers.add(num)
    return sorted(frame_numbers)
def extract_qr_data_from_mp4(input_mp4_path, target_frames=None, quiet=True):
    """Decode the QR codes of selected frames of a video.

    Args:
        input_mp4_path: Path of the QR code video.
        target_frames: 1-based frame numbers to decode; all frames if None.
        quiet: Suppress the progress output on stderr.

    Returns:
        ``(decoded_chunks, total_frames)`` where ``decoded_chunks`` is a list
        of ``{"frame_num": int, "data": str}`` dicts in the requested order.

    Raises:
        RuntimeError: If the frame count cannot be determined or a frame
            cannot be decoded. Reporting the error is left to the caller so
            that it is not printed more than once.
    """
    if not quiet:
        print("\n=== MP4 QR Code Extraction Started ===", file=sys.stderr)

    # Temporary directory for extraction
    extract_dir = tempfile.mkdtemp(prefix="extract_")
    try:
        # Get total frame count from MP4
        probe = ffmpeg.probe(input_mp4_path)
        video_stream = next(
            (stream for stream in probe['streams'] if stream['codec_type'] == 'video'), None)
        if video_stream is None or 'nb_frames' not in video_stream:
            raise RuntimeError(
                f"Could not determine the frame count of '{input_mp4_path}'.")
        total_frames = int(video_stream['nb_frames'])

        # Determine target frames
        if target_frames is None:
            target_frames = list(range(1, total_frames + 1))
        if not quiet:
            print(f"Total frames: {total_frames}", file=sys.stderr)
            print(f"Target frames: {len(target_frames)}", file=sys.stderr)
            print("\n=== Reading QR Codes ===", file=sys.stderr)

        qr_detector = cv2.QRCodeDetectorAruco()
        decoded_chunks = []
        for idx, frame_num in enumerate(target_frames, 1):
            # Extract frame, scaled up 4x
            output_frame = os.path.join(extract_dir, f"frame_{frame_num}.png")
            (
                ffmpeg
                .input(input_mp4_path)
                .filter('select', f'eq(n,{frame_num-1})')
                .filter('scale', 'iw*4', 'ih*4', flags='neighbor')
                .output(output_frame, vframes=1)
                .overwrite_output()
                .run(quiet=True)
            )

            # Read QR code
            try:
                image = cv2.imread(output_frame)
                data, _bbox, _ = qr_detector.detectAndDecode(image)
            except Exception as e:
                raise RuntimeError(
                    f"An error occurred while processing frame {frame_num}: {e}") from e
            if not data:
                raise RuntimeError(
                    f"Could not read QR code from frame {frame_num}.")
            decoded_chunks.append({"frame_num": frame_num, "data": data})

            # Progress display
            if not quiet:
                progress = idx / len(target_frames) * 100
                bar_length = 30
                filled_length = int(bar_length * idx // len(target_frames))
                bar = '█' * filled_length + '-' * \
                    (bar_length - filled_length)
                print(f"\r[{bar}] {progress:.1f}% ({idx}/{len(target_frames)})",
                      end='', flush=True, file=sys.stderr)

        if not quiet:
            print(file=sys.stderr)  # New line
            print("QR code reading completed", file=sys.stderr)
        return decoded_chunks, total_frames
    finally:
        # Delete temporary directory
        shutil.rmtree(extract_dir, ignore_errors=True)
def process_mp4_to_text(input_mp4_path, output_txt_path):
    """Decode every frame of a QR code video and write the text to a file.

    Args:
        input_mp4_path: Path of the ``.qr.mp4`` video.
        output_txt_path: Path of the text file to write.

    Returns:
        ``output_txt_path``.

    Chunk and overlap sizes are taken from the metadata of the index file
    next to the video when it can be read, otherwise the defaults (2800/100)
    are used. Any failure is reported on stderr and ends the process with
    status 1.
    """
    print("\n=== MP4 to Text Conversion Started ===")
    try:
        # Extract QR data from all frames
        decoded_chunks, total_frames = extract_qr_data_from_mp4(
            input_mp4_path, quiet=False)

        # Locate the index by swapping only the trailing suffix (any case),
        # so the path can never resolve to the video itself.
        video_suffix = '.qr.mp4'
        if input_mp4_path.lower().endswith(video_suffix):
            json_file_path = input_mp4_path[:-len(video_suffix)] + '.qr.json'
        else:
            json_file_path = input_mp4_path + '.qr.json'

        chunk_size = 2800  # Default value
        overlap_size = 100  # Default value
        if os.path.exists(json_file_path):
            try:
                with open(json_file_path, 'r', encoding='utf-8') as f:
                    index_data = json.load(f)
                if "metadata" in index_data:
                    metadata = index_data["metadata"]
                    chunk_size = metadata.get("chunk_size", 2800)
                    overlap_size = metadata.get("overlap_size", 100)
                    print(
                        f"Using metadata: chunk_size={chunk_size}, overlap_size={overlap_size}")
                else:
                    print("No metadata found in index file, using default values")
            except (OSError, ValueError, AttributeError, TypeError) as e:
                # Unreadable or malformed index: best effort with defaults
                print(
                    f"Warning: Could not read metadata from {json_file_path}: {e}")
                print("Using default values: chunk_size=2800, overlap_size=100")

        # Reconstruct text from chunks by removing overlaps
        combined_text = reconstruct_text_from_chunks(
            decoded_chunks, chunk_size, overlap_size)
        # newline='' writes line endings exactly as they were decoded
        with open(output_txt_path, 'w', encoding='utf-8', newline='') as f:
            f.write(combined_text)

        # Display results
        output_size = os.path.getsize(output_txt_path)
        input_size = os.path.getsize(input_mp4_path)
        print("\n=== Conversion Results ===")
        print(f"Chunks read: {len(decoded_chunks)}")
        print(f"Restored character count: {len(combined_text):,} characters")
        print(f"MP4 file: {input_size:,} bytes")
        print(f"TXT file: {output_size:,} bytes")
        return output_txt_path
    except Exception as e:
        print(f"\nError: {e}", file=sys.stderr)
        sys.exit(1)
def process_mp4_to_stdout(input_mp4_path, frame_spec=None, quiet=True):
    """Decode a QR code video and print the result to stdout.

    Args:
        input_mp4_path: Path of the ``.qr.mp4`` video.
        frame_spec: Optional frame specification (e.g. ``"1-5,10"``). When
            given, the selected frames are printed as a JSON list of
            ``{"id", "content"}`` objects; otherwise the reconstructed text
            of all frames is printed.
        quiet: Suppress progress output. Errors are always reported on
            stderr (stdout carries only data), followed by exit status 1.
    """
    try:
        # Get total frame count first for frame spec parsing
        probe = ffmpeg.probe(input_mp4_path)
        video_stream = next(
            (stream for stream in probe['streams'] if stream['codec_type'] == 'video'), None)
        if video_stream is None or 'nb_frames' not in video_stream:
            raise RuntimeError(
                f"Could not determine the frame count of '{input_mp4_path}'.")
        total_frames = int(video_stream['nb_frames'])

        # Determine target frames (a ValueError from parsing is reported by
        # the handler below)
        if frame_spec:
            target_frames = parse_frame_numbers(frame_spec, total_frames)
        else:
            target_frames = None

        # Extract QR data
        decoded_chunks, _ = extract_qr_data_from_mp4(
            input_mp4_path, target_frames, quiet)

        # Output results
        if frame_spec:
            # JSON output for specific frames
            json_output = [{"id": chunk["frame_num"],
                            "content": chunk["data"]} for chunk in decoded_chunks]
            print(json.dumps(json_output, ensure_ascii=False, indent=2))
        else:
            # Plain text output for all frames - remove chunk overlaps.
            # Swap only the trailing suffix (any case) to find the index.
            video_suffix = '.qr.mp4'
            if input_mp4_path.lower().endswith(video_suffix):
                json_file_path = input_mp4_path[:-len(video_suffix)] + '.qr.json'
            else:
                json_file_path = input_mp4_path + '.qr.json'

            chunk_size = 2800  # Default value
            overlap_size = 100  # Default value
            if os.path.exists(json_file_path):
                try:
                    with open(json_file_path, 'r', encoding='utf-8') as f:
                        index_data = json.load(f)
                    if "metadata" in index_data:
                        metadata = index_data["metadata"]
                        chunk_size = metadata.get("chunk_size", 2800)
                        overlap_size = metadata.get("overlap_size", 100)
                except (OSError, ValueError, AttributeError, TypeError):
                    pass  # Unreadable or malformed index: use default values

            combined_text = reconstruct_text_from_chunks(
                decoded_chunks, chunk_size, overlap_size)
            print(combined_text, end='')
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
def process_search_query(input_mp4_path, search_query, quiet=True):
    """Search the word index of a video and print the matching frames.

    Args:
        input_mp4_path: Path of the ``.qr.mp4`` video; the index is expected
            next to it with the suffix ``.qr.json``.
        search_query: Space-separated words (AND) or words joined by
            ``" OR "``.
        quiet: Suppress informational messages. Errors are always reported
            on stderr (stdout carries only the JSON result), followed by
            exit status 1.

    The result printed to stdout is a JSON list of ``{"id", "content"}``
    objects (an empty list when nothing matches).
    """
    # Determine index file path by swapping only the trailing suffix
    video_suffix = '.qr.mp4'
    if input_mp4_path.lower().endswith(video_suffix):
        json_file_path = input_mp4_path[:-len(video_suffix)] + '.qr.json'
    else:
        json_file_path = input_mp4_path + '.qr.json'

    search_type = "OR" if " OR " in search_query else "AND"
    try:
        # Search in index
        frame_ids = search_in_index(json_file_path, search_query)
        if not frame_ids:
            if not quiet:
                print(
                    f"No frames found containing words: {search_query} ({search_type} search, includes partial matches)", file=sys.stderr)
            print(json.dumps([], ensure_ascii=False, indent=2))
            return
        if not quiet:
            print(
                f"Found {len(frame_ids)} frames containing '{search_query}' ({search_type} search, includes partial matches)", file=sys.stderr)

        # Extract QR data from matching frames
        decoded_chunks, _ = extract_qr_data_from_mp4(
            input_mp4_path, frame_ids, quiet)

        # Output results in JSON format
        json_output = [{"id": chunk["frame_num"],
                        "content": chunk["data"]} for chunk in decoded_chunks]
        print(json.dumps(json_output, ensure_ascii=False, indent=2))
    except Exception as e:
        # Includes FileNotFoundError for a missing index file
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
def main():
    """Command line entry point.

    Dispatches on the arguments: ``--sample`` and ``--stdin`` create a text
    file in ``output/`` and convert it; a ``.qr.mp4`` (or its ``.qr.json``
    index) is decoded, searched or written to a text file; any other file is
    treated as text and converted to a QR code video. Without arguments the
    help text is shown. All error messages go to stderr and end the process
    with status 1.
    """
    parser = argparse.ArgumentParser(
        description="Convert text files to QR code videos (MP4) or restore MP4 to text",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Usage examples:
  python qr_video_converter.py input.txt # Convert input.txt to input.qr.mp4
  python qr_video_converter.py input.qr.mp4 # Output all content to stdout as plain text
  python qr_video_converter.py input.qr.mp4 -f 42 # Output frame 42 as JSON
  python qr_video_converter.py input.qr.mp4 -f 1,3,5 # Output specific frames as JSON
  python qr_video_converter.py input.qr.mp4 -f 1-5,10 # Output frame range as JSON
  python qr_video_converter.py input.qr.mp4 -s python # Search for word 'python' and output matching frames as JSON
  python qr_video_converter.py input.qr.mp4 -s "python machine" # AND search for both words
  python qr_video_converter.py input.qr.mp4 -s "python OR java" # OR search for either word
  python qr_video_converter.py input.qr.mp4 -o # Output to input.decoded.txt file
  python qr_video_converter.py input.txt --crf 25 # Convert with CRF value 25 (high quality)
  python qr_video_converter.py input.txt -q # Convert with quick verification (5 random frames)
  python qr_video_converter.py --sample # Generate sample file and convert
  python qr_video_converter.py --stdin # Read text from stdin and convert to MP4
  echo "Hello World" | python qr_video_converter.py --stdin # Pipe text and convert
  python qr_video_converter.py input.txt --chunk-size 2000 --overlap-size 150 # Custom chunk settings
"""
    )
    group = parser.add_mutually_exclusive_group()
    group.add_argument(
        'input_file',
        nargs='?',
        help='Path to file to convert (any text file or .qr.mp4)'
    )
    group.add_argument(
        '--sample',
        action='store_true',
        help='Generate sample text file and convert'
    )
    group.add_argument(
        '--stdin',
        action='store_true',
        help='Read text from standard input and convert to MP4'
    )
    parser.add_argument(
        '--crf',
        type=int,
        default=30,
        choices=range(25, 35),
        metavar='25-34',
        help='CRF value (quality setting). Only effective for text to MP4 conversion. Lower values mean higher quality and larger file size. Recommended: 30 (default), High quality: 25, Small size: 34'
    )
    parser.add_argument(
        '--quick-verify', '-q',
        action='store_true',
        help='Quick verification mode - verify only 5 random frames during text to MP4 conversion (default is all frames)'
    )
    parser.add_argument(
        '--chunk-size',
        type=int,
        default=2800,
        metavar='BYTES',
        help='Maximum chunk size in bytes (default: 2800). Affects QR code data density and video frame count.'
    )
    parser.add_argument(
        '--overlap-size',
        type=int,
        default=100,
        metavar='BYTES',
        help='Overlap size in bytes between chunks (default: 100). Higher values improve error recovery but increase redundancy.'
    )
    parser.add_argument(
        '--output', '-o',
        action='store_true',
        help='Output to text file instead of stdout (only for .qr.mp4 input)'
    )
    parser.add_argument(
        '--frames', '-f',
        type=str,
        help='Specify frame numbers to extract (e.g., "1", "1,3,5", "1-5,10"). Output format will be JSON.'
    )
    parser.add_argument(
        '--search', '-s',
        type=str,
        help='Search for words in the index and output matching frames as JSON. ' +
        'Use space-separated words for AND search (e.g., "python machine") or ' +
        '"OR" keyword for OR search (e.g., "python OR java") (only for .qr.mp4 input)'
    )
    args = parser.parse_args()

    # Validate chunk settings for text to MP4 conversion
    if args.overlap_size < 0:
        print(
            f"Error: Overlap size ({args.overlap_size}) must not be negative.", file=sys.stderr)
        sys.exit(1)
    if args.chunk_size < args.overlap_size * 2 + 100:
        print(
            f"Error: Chunk size ({args.chunk_size}) must be at least {args.overlap_size * 2 + 100} bytes (overlap_size * 2 + 100).", file=sys.stderr)
        sys.exit(1)
    # A version 40 QR code with error correction level L holds at most 2953
    # bytes in byte mode; larger chunks only fit if the text is mostly digits
    # or upper-case alphanumerics, so warn instead of rejecting.
    if args.chunk_size > 2953:
        print(
            f"Warning: Chunk size ({args.chunk_size}) exceeds the 2953-byte capacity of a version 40-L QR code in byte mode; QR code generation may fail.", file=sys.stderr)

    if args.sample or args.stdin:
        # Sample mode / stdin mode: the text file is created in ./output
        output_dir = "output"
        os.makedirs(output_dir, exist_ok=True)
        if args.sample:
            txt_path = create_sample_file(output_dir)
        else:
            txt_path = create_temp_file_from_stdin(output_dir)
        # Use the same base name as the generated text file
        txt_basename = os.path.splitext(os.path.basename(txt_path))[0]
        output_mp4 = os.path.join(output_dir, f'{txt_basename}.qr.mp4')
        # .qr.mp4 files can be overwritten
        try:
            result_file = process_text_to_mp4(
                txt_path, output_mp4, args.crf, args.quick_verify, args.chunk_size, args.overlap_size)
            print(f"Generated video: {result_file}")
        finally:
            # The stdin capture is temporary; the sample file is kept
            if args.stdin and os.path.exists(txt_path):
                os.remove(txt_path)
        return

    if not args.input_file:
        # Show help when no arguments provided
        parser.print_help()
        return

    input_file = args.input_file
    if not os.path.exists(input_file):
        print(f"Error: File '{input_file}' not found.", file=sys.stderr)
        sys.exit(1)

    # Check if input is .qr.json and convert to corresponding .qr.mp4
    index_suffix = '.qr.json'
    if input_file.lower().endswith(index_suffix):
        # Swap only the trailing suffix so upper-case names work as well
        mp4_file = input_file[:-len(index_suffix)] + '.qr.mp4'
        if not os.path.exists(mp4_file):
            print(
                f"Error: Corresponding MP4 file '{mp4_file}' not found for JSON index file '{input_file}'.", file=sys.stderr)
            sys.exit(1)
        input_file = mp4_file

    # Branch processing based on file extension
    input_dir = os.path.dirname(input_file)
    input_basename = os.path.splitext(os.path.basename(input_file))[0]
    input_ext = os.path.splitext(input_file)[1].lower()

    if input_ext == '.mp4':
        # Check if it's a .qr.mp4 file
        if not input_file.lower().endswith('.qr.mp4'):
            print(
                f"Error: Only .qr.mp4 files are supported for MP4 input. Got: {input_file}", file=sys.stderr)
            sys.exit(1)
        # Remove .qr from basename for output filename
        if input_basename.lower().endswith('.qr'):
            base_name_for_output = input_basename[:-3]  # Remove '.qr'
        else:
            base_name_for_output = input_basename

        if args.search:
            # Search mode
            process_search_query(input_file, args.search, quiet=True)
        elif args.output:
            # MP4 to TXT file conversion (.decoded.txt can be overwritten)
            output_file = os.path.join(
                input_dir, f"{base_name_for_output}.decoded.txt")
            result_file = process_mp4_to_text(input_file, output_file)
            print(f"Restored text: {result_file}")
        else:
            # Output to stdout (default for MP4)
            process_mp4_to_stdout(input_file, args.frames, quiet=True)
        return

    # Any non-MP4 file treated as text file
    if args.output:
        print(
            "Error: --output option is only available for .qr.mp4 input files.", file=sys.stderr)
        sys.exit(1)
    if args.search:
        print(
            "Error: --search option is only available for .qr.mp4 input files.", file=sys.stderr)
        sys.exit(1)

    # Text to MP4 conversion (.qr.mp4 files can be overwritten)
    output_file = os.path.join(input_dir, f"{input_basename}.qr.mp4")
    result_file = process_text_to_mp4(
        input_file, output_file, args.crf, args.quick_verify, args.chunk_size, args.overlap_size)
    print(f"Generated video: {result_file}")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment