Created
December 26, 2024 04:20
-
-
Save alt-glitch/0a010995487c37169bf1f8a7d7116ad6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import argparse | |
| import subprocess | |
| from pathlib import Path | |
| import cv2 | |
| import numpy as np | |
| def motion_extract( | |
| video_path, | |
| output_path, | |
| frame_window=2, | |
| keep_original_color=False, | |
| invert_changes=False, | |
| ): | |
| # Check CUDA availability | |
| if cv2.cuda.getCudaEnabledDeviceCount() == 0: | |
| print("CUDA is not available. Running on CPU.") | |
| use_cuda = False | |
| else: | |
| print(f"Using CUDA device. GPU count: {cv2.cuda.getCudaEnabledDeviceCount()}") | |
| use_cuda = True | |
| cv2.cuda.setDevice(0) | |
| cap = cv2.VideoCapture(video_path) | |
| # Store recent frames in a list | |
| frames = [] | |
| for _ in range(frame_window): | |
| ret, frame = cap.read() | |
| if ret: | |
| if use_cuda: | |
| gpu_frame = cv2.cuda_GpuMat() | |
| gpu_frame.upload(frame) | |
| frames.append(gpu_frame) | |
| else: | |
| frames.append(frame) | |
| # Setup output | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| # Create temporary output file | |
| temp_output = str(Path(output_path).with_suffix('.temp.mp4')) | |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
| out = cv2.VideoWriter(temp_output, fourcc, fps, (width, height)) | |
| if not out.isOpened(): | |
| raise ValueError("Could not initialize video writer") | |
| frame_count = 0 | |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| # Create GPU resources if using CUDA | |
| if use_cuda: | |
| stream = cv2.cuda.Stream() | |
| gpu_gray = cv2.cuda_GpuMat() | |
| gpu_thresh = cv2.cuda_GpuMat() | |
| gpu_output = cv2.cuda_GpuMat() | |
| while True: | |
| ret, current_frame = cap.read() | |
| if not ret: | |
| break | |
| frame_count += 1 | |
| if frame_count % 30 == 0: | |
| progress = (frame_count / total_frames) * 100 | |
| print( | |
| f"Processing: {progress:.1f}% ({frame_count}/{total_frames} frames)", | |
| end="\r", | |
| ) | |
| if use_cuda: | |
| # Upload current frame to GPU | |
| gpu_current = cv2.cuda_GpuMat() | |
| gpu_current.upload(current_frame) | |
| # Calculate differences across frame window | |
| diffs = [] | |
| for past_frame in frames: | |
| diff = cv2.cuda.absdiff(gpu_current, past_frame, stream=stream) | |
| diffs.append(diff) | |
| # Combine all differences | |
| motion_mask = diffs[0] | |
| for diff in diffs[1:]: | |
| cv2.cuda.max(motion_mask, diff, motion_mask, stream=stream) | |
| if keep_original_color or invert_changes: | |
| # Process on GPU | |
| cv2.cuda.cvtColor(motion_mask, cv2.COLOR_BGR2GRAY, gpu_gray, stream=stream) | |
| cv2.cuda.threshold(gpu_gray, 30, 255, cv2.THRESH_BINARY, gpu_thresh, stream=stream) | |
| thresh_3ch = cv2.cuda.cvtColor(gpu_thresh, cv2.COLOR_GRAY2BGR, stream=stream) | |
| if keep_original_color: | |
| output = cv2.cuda.bitwise_and(gpu_current, thresh_3ch, stream=stream) | |
| else: # invert_changes | |
| inverted = cv2.cuda.bitwise_not(gpu_current, stream=stream) | |
| mask = thresh_3ch.download() / 255.0 | |
| output_cpu = (mask * inverted.download() + (1 - mask) * current_frame).astype(np.uint8) | |
| out.write(output_cpu) | |
| frames.pop(0) | |
| frames.append(gpu_current) | |
| continue | |
| else: | |
| output = motion_mask | |
| # Download result from GPU | |
| output_cpu = output.download() | |
| out.write(output_cpu) | |
| frames.pop(0) | |
| frames.append(gpu_current) | |
| else: | |
| # CPU fallback code | |
| diffs = [] | |
| for past_frame in frames: | |
| diff = cv2.absdiff(current_frame, past_frame) | |
| diffs.append(diff) | |
| motion_mask = np.max(diffs, axis=0) | |
| gray = cv2.cvtColor(motion_mask, cv2.COLOR_BGR2GRAY) | |
| _, thresh = cv2.threshold(gray, 30, 255, cv2.THRESH_BINARY) | |
| thresh_3ch = cv2.cvtColor(thresh, cv2.COLOR_GRAY2BGR) | |
| if keep_original_color: | |
| output = cv2.bitwise_and(current_frame, thresh_3ch) | |
| elif invert_changes: | |
| inverted = cv2.bitwise_not(current_frame) | |
| mask = thresh_3ch / 255.0 | |
| output = (mask * inverted + (1 - mask) * current_frame).astype(np.uint8) | |
| else: | |
| output = motion_mask | |
| out.write(output) | |
| frames.pop(0) | |
| frames.append(current_frame) | |
| print("\nFinished processing all frames!") | |
| cap.release() | |
| out.release() | |
| if use_cuda: | |
| stream.free() | |
| # Compress using ffmpeg | |
| print("Compressing output video...") | |
| ffmpeg_cmd = [ | |
| 'ffmpeg', '-y', | |
| '-i', temp_output, | |
| '-c:v', 'libx264', | |
| '-preset', 'medium', | |
| '-crf', '23', | |
| '-c:a', 'aac', | |
| '-b:a', '128k', | |
| output_path | |
| ] | |
| try: | |
| subprocess.run(ffmpeg_cmd, check=True, capture_output=True) | |
| Path(temp_output).unlink() # Remove temporary file | |
| print("Compression complete!") | |
| except subprocess.CalledProcessError as e: | |
| print(f"FFmpeg error: {e.stderr.decode()}") | |
| raise | |
| except Exception as e: | |
| print(f"Error during compression: {str(e)}") | |
| raise | |
| def parse_args(): | |
| parser = argparse.ArgumentParser( | |
| description="Extract motion from video file." | |
| ) | |
| parser.add_argument( | |
| "-i", | |
| "--input", | |
| type=str, | |
| required=True, | |
| help="Path to input video file", | |
| ) | |
| parser.add_argument( | |
| "-o", | |
| "--output", | |
| type=str, | |
| required=True, | |
| help="Path to output video file (must end in .mp4 or .avi)", | |
| ) | |
| parser.add_argument( | |
| "-w", | |
| "--window", | |
| type=int, | |
| default=2, | |
| help="Number of frames to compare (default: 2)", | |
| ) | |
| parser.add_argument( | |
| "--keep-color", | |
| action="store_true", | |
| help="Keep original colors in motion areas", | |
| ) | |
| parser.add_argument( | |
| "--invert", action="store_true", help="Invert colors in motion areas" | |
| ) | |
| return parser.parse_args() | |
| def validate_paths(input_path, output_path): | |
| # Check if input file exists | |
| if not Path(input_path).is_file(): | |
| raise FileNotFoundError(f"Input file not found: {input_path}") | |
| # Check output file extension | |
| if not output_path.lower().endswith((".mp4", ".avi")): | |
| raise ValueError("Output file must have .mp4 or .avi extension") | |
| # Create output directory if it doesn't exist | |
| output_dir = Path(output_path).parent | |
| output_dir.mkdir(parents=True, exist_ok=True) | |
| # Usage examples: | |
| if __name__ == "__main__": | |
| args = parse_args() | |
| try: | |
| validate_paths(args.input, args.output) | |
| print(f"Processing video: {args.input}") | |
| print(f"Output will be saved to: {args.output}") | |
| motion_extract( | |
| args.input, | |
| args.output, | |
| frame_window=args.window, | |
| keep_original_color=args.keep_color, | |
| invert_changes=args.invert, | |
| ) | |
| print("Processing complete!") | |
| except Exception as e: | |
| print(f"Error: {str(e)}") | |
| exit(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment