Skip to content

Instantly share code, notes, and snippets.

@alt-glitch
Created December 26, 2024 04:20
Show Gist options
  • Select an option

  • Save alt-glitch/0a010995487c37169bf1f8a7d7116ad6 to your computer and use it in GitHub Desktop.

Select an option

Save alt-glitch/0a010995487c37169bf1f8a7d7116ad6 to your computer and use it in GitHub Desktop.
import argparse
import subprocess
from collections import deque
from pathlib import Path

import cv2
import numpy as np
def _upload_to_gpu(frame):
    """Copy a host frame into a newly allocated ``cv2.cuda_GpuMat``."""
    gpu_frame = cv2.cuda_GpuMat()
    gpu_frame.upload(frame)
    return gpu_frame


def _motion_frame_cpu(
    current_frame, frames, keep_original_color, invert_changes, threshold
):
    """Build the output frame for ``current_frame`` on the CPU.

    ``frames`` holds the previous host frames to compare against and must
    not be empty.
    """
    # Per-pixel maximum of the absolute differences against every past frame.
    motion_mask = None
    for past_frame in frames:
        diff = cv2.absdiff(current_frame, past_frame)
        if motion_mask is None:
            motion_mask = diff
        else:
            np.maximum(motion_mask, diff, out=motion_mask)

    if not (keep_original_color or invert_changes):
        return motion_mask

    # The binary mask is only needed by the two colour modes.
    gray = cv2.cvtColor(motion_mask, cv2.COLOR_BGR2GRAY)
    _, thresh = cv2.threshold(gray, threshold, 255, cv2.THRESH_BINARY)
    thresh_3ch = cv2.cvtColor(thresh, cv2.COLOR_GRAY2BGR)

    if keep_original_color:
        return cv2.bitwise_and(current_frame, thresh_3ch)
    # The mask only contains 0 and 255, so XOR flips every bit (the same as
    # bitwise_not) where motion was detected and leaves other pixels as-is.
    # NOTE(review): relies on 8-bit frames, like the 255-based mask itself.
    return cv2.bitwise_xor(current_frame, thresh_3ch)


def _motion_frame_gpu(
    gpu_current,
    frames,
    keep_original_color,
    invert_changes,
    threshold,
    stream,
    gpu_gray,
    gpu_thresh,
):
    """Build the output frame for ``gpu_current`` with CUDA and download it.

    ``frames`` holds the previous frames as GpuMats and must not be empty.
    ``gpu_gray`` and ``gpu_thresh`` are scratch GpuMats reused across calls.
    Returns the result as a host array.
    """
    motion_mask = None
    for past_frame in frames:
        diff = cv2.cuda.absdiff(gpu_current, past_frame, stream=stream)
        if motion_mask is None:
            motion_mask = diff
        else:
            cv2.cuda.max(motion_mask, diff, motion_mask, stream=stream)

    if keep_original_color or invert_changes:
        cv2.cuda.cvtColor(motion_mask, cv2.COLOR_BGR2GRAY, gpu_gray, stream=stream)
        cv2.cuda.threshold(
            gpu_gray, threshold, 255, cv2.THRESH_BINARY, gpu_thresh, stream=stream
        )
        thresh_3ch = cv2.cuda.cvtColor(gpu_thresh, cv2.COLOR_GRAY2BGR, stream=stream)
        if keep_original_color:
            output = cv2.cuda.bitwise_and(gpu_current, thresh_3ch, stream=stream)
        else:
            # Same XOR-with-0/255-mask trick as the CPU path; keeps the whole
            # computation on the GPU instead of blending in float on the host.
            output = cv2.cuda.bitwise_xor(gpu_current, thresh_3ch, stream=stream)
    else:
        output = motion_mask

    # Everything above was queued on ``stream``; wait before reading it back.
    stream.waitForCompletion()
    return output.download()


def _compress_with_ffmpeg(temp_output, output_path):
    """Re-encode ``temp_output`` into ``output_path`` with ffmpeg (libx264).

    The temporary file is deleted only after ffmpeg succeeds. Any failure is
    reported on stdout and re-raised.
    """
    print("Compressing output video...")
    ffmpeg_cmd = [
        'ffmpeg', '-y',
        '-i', temp_output,
        '-c:v', 'libx264',
        '-preset', 'medium',
        '-crf', '23',
        '-c:a', 'aac',
        '-b:a', '128k',
        output_path,
    ]
    try:
        subprocess.run(ffmpeg_cmd, check=True, capture_output=True)
        Path(temp_output).unlink()  # Remove temporary file
        print("Compression complete!")
    except subprocess.CalledProcessError as e:
        # stderr is raw bytes; never let a decoding problem hide the real error.
        print(f"FFmpeg error: {e.stderr.decode(errors='replace')}")
        raise
    except Exception as e:
        print(f"Error during compression: {str(e)}")
        raise


def motion_extract(
    video_path,
    output_path,
    frame_window=2,
    keep_original_color=False,
    invert_changes=False,
    threshold=30,
):
    """Write a video that highlights what changed between nearby frames.

    Each frame is compared (absolute difference) against the previous
    ``frame_window`` frames and the per-pixel maximum of those differences is
    used as the motion image. Frames are first written to a temporary
    ``.temp.mp4`` file next to ``output_path`` and then re-encoded with ffmpeg.
    CUDA is used when OpenCV reports at least one CUDA device; otherwise the
    work is done on the CPU.

    Args:
        video_path: Input video, passed to ``cv2.VideoCapture``.
        output_path: Destination of the final, ffmpeg-compressed video.
        frame_window: Number of previous frames each frame is compared
            against. Must be at least 1.
        keep_original_color: Output the original pixels where motion was
            detected and black elsewhere. Takes precedence over
            ``invert_changes`` when both are set.
        invert_changes: Output the original frame with colours inverted
            where motion was detected.
        threshold: Grayscale difference above which a pixel counts as motion.
            Only used by the two colour modes; the default mode writes the
            raw difference image.

    Raises:
        ValueError: ``frame_window`` is below 1, the input cannot be opened
            or yields no frames, or the video writer cannot be created.
        subprocess.CalledProcessError: ffmpeg exited with an error.
    """
    # Validate first so a bad window never leaves files or handles behind.
    if frame_window < 1:
        raise ValueError(f"frame_window must be at least 1, got {frame_window}")

    # Check CUDA availability (non-positive counts mean no usable device).
    gpu_count = cv2.cuda.getCudaEnabledDeviceCount()
    use_cuda = gpu_count > 0
    if use_cuda:
        print(f"Using CUDA device. GPU count: {gpu_count}")
        cv2.cuda.setDevice(0)
    else:
        print("CUDA is not available. Running on CPU.")

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise ValueError(f"Could not open input video: {video_path}")

    out = None
    try:
        # Sliding window of the most recent frames; once full, appending
        # drops the oldest entry automatically.
        frames = deque(maxlen=frame_window)
        for _ in range(frame_window):
            ret, frame = cap.read()
            if ret:
                frames.append(_upload_to_gpu(frame) if use_cuda else frame)
        if not frames:
            raise ValueError(f"Could not read any frames from: {video_path}")

        # Setup output
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Create temporary output file
        temp_output = str(Path(output_path).with_suffix('.temp.mp4'))
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(temp_output, fourcc, fps, (width, height))
        if not out.isOpened():
            raise ValueError("Could not initialize video writer")

        frame_count = 0
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Create GPU resources if using CUDA
        if use_cuda:
            stream = cv2.cuda.Stream()
            gpu_gray = cv2.cuda_GpuMat()
            gpu_thresh = cv2.cuda_GpuMat()

        while True:
            ret, current_frame = cap.read()
            if not ret:
                break

            frame_count += 1
            if frame_count % 30 == 0:
                if total_frames > 0:
                    progress = (frame_count / total_frames) * 100
                    print(
                        f"Processing: {progress:.1f}% ({frame_count}/{total_frames} frames)",
                        end="\r",
                    )
                else:
                    # Some inputs report no frame count; avoid dividing by zero.
                    print(f"Processing: {frame_count} frames", end="\r")

            if use_cuda:
                gpu_current = _upload_to_gpu(current_frame)
                output = _motion_frame_gpu(
                    gpu_current,
                    frames,
                    keep_original_color,
                    invert_changes,
                    threshold,
                    stream,
                    gpu_gray,
                    gpu_thresh,
                )
                frames.append(gpu_current)
            else:
                output = _motion_frame_cpu(
                    current_frame,
                    frames,
                    keep_original_color,
                    invert_changes,
                    threshold,
                )
                frames.append(current_frame)
            out.write(output)

        print("\nFinished processing all frames!")
    finally:
        # Release the capture/writer even when processing fails part-way.
        # The CUDA stream is simply dropped with its last reference; the
        # documented Stream interface has no explicit free method.
        cap.release()
        if out is not None:
            out.release()

    # Compress using ffmpeg
    _compress_with_ffmpeg(temp_output, output_path)
def _positive_int(value):
    """argparse ``type`` callable: parse ``value`` as an integer >= 1."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
    if number < 1:
        raise argparse.ArgumentTypeError(f"must be at least 1, got {number}")
    return number


def parse_args(argv=None):
    """Parse the command-line options of the motion-extraction script.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default)
            argparse reads ``sys.argv[1:]``, so existing ``parse_args()``
            calls behave as before.

    Returns:
        An ``argparse.Namespace`` with ``input``, ``output``, ``window``,
        ``keep_color`` and ``invert``.

    A ``--window`` value below 1 is rejected here with the usual argparse
    usage error (exit status 2) instead of failing later during processing.
    """
    parser = argparse.ArgumentParser(
        description="Extract motion from video file."
    )
    parser.add_argument(
        "-i",
        "--input",
        type=str,
        required=True,
        help="Path to input video file",
    )
    parser.add_argument(
        "-o",
        "--output",
        type=str,
        required=True,
        help="Path to output video file (must end in .mp4 or .avi)",
    )
    parser.add_argument(
        "-w",
        "--window",
        type=_positive_int,
        default=2,
        help="Number of frames to compare (default: 2)",
    )
    parser.add_argument(
        "--keep-color",
        action="store_true",
        help="Keep original colors in motion areas",
    )
    parser.add_argument(
        "--invert", action="store_true", help="Invert colors in motion areas"
    )
    return parser.parse_args(argv)
def validate_paths(input_path, output_path):
    """Check the input and output locations before any processing starts.

    The checks run in this order: the input must be an existing file, the
    output name must end in ``.mp4`` or ``.avi`` (case-insensitive), and
    finally the output's parent directory is created if it is missing.

    Raises:
        FileNotFoundError: ``input_path`` is not an existing file.
        ValueError: ``output_path`` has an unsupported extension.
    """
    source = Path(input_path)
    if not source.is_file():
        raise FileNotFoundError(f"Input file not found: {input_path}")

    allowed_suffixes = (".mp4", ".avi")
    lowered_name = output_path.lower()
    if not any(lowered_name.endswith(suffix) for suffix in allowed_suffixes):
        raise ValueError("Output file must have .mp4 or .avi extension")

    # Create the destination folder, including missing parents; an existing
    # folder is not an error.
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
# Usage examples (replace <script> with this file's name):
#   python <script>.py -i input.mp4 -o motion.mp4
#   python <script>.py -i input.mp4 -o motion.mp4 -w 5 --keep-color
#   python <script>.py -i input.mp4 -o motion.mp4 --invert
def main():
    """Run the command-line workflow and return the process exit status.

    Parses the arguments, validates the paths and runs the motion
    extraction. Any exception raised by those two steps is reported as
    ``Error: <message>`` on stdout.

    Returns:
        0 on success, 1 if validation or processing raised an exception.
    """
    args = parse_args()
    try:
        validate_paths(args.input, args.output)
        print(f"Processing video: {args.input}")
        print(f"Output will be saved to: {args.output}")
        motion_extract(
            args.input,
            args.output,
            frame_window=args.window,
            keep_original_color=args.keep_color,
            invert_changes=args.invert,
        )
        print("Processing complete!")
    except Exception as e:
        # Top-level boundary: report the failure and signal it via the status.
        print(f"Error: {str(e)}")
        return 1
    return 0


if __name__ == "__main__":
    # SystemExit instead of the interactive-only ``exit()`` helper, which the
    # ``site`` module may not have installed (e.g. ``python -S``).
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment