Process high-speed footage into low-speed HDR footage: sum a sliding window of short exposures, apply gamma tone mapping, and re-encode at a lower frame rate.

#!/usr/bin/env python3
import sys
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import numpy as np
from collections import deque

if len(sys.argv) != 3:
    print("Usage: python gst_hdr.py input.mp4 output.mkv")
    sys.exit(1)

input_path = sys.argv[1]
output_path = sys.argv[2]

# HDR parameters
window_size = 4
input_fps = 120.0
output_fps = 30.0
frame_skip = int(input_fps / output_fps)
gamma = 3

print(f"HDR Config: Window={window_size}, Skip every {frame_skip} frames, Gamma={gamma}")

# Initialize GStreamer
Gst.init(None)


class HDRProcessor:
    def __init__(self, input_file, output_file):
        self.input_file = input_file
        self.output_file = output_file
        self.frame_count = 0
        self.output_count = 0
        self.frames_since_output = 0
        # HDR frame buffer
        self.frame_buffer = deque(maxlen=window_size)
        # Timestamp tracking
        self.timestamp = 0
        self.duration = int(Gst.SECOND / output_fps)
        # Create main loop
        self.loop = GLib.MainLoop()
        # Create source pipeline (decoder -> appsink)
        self.src_pipeline = Gst.parse_launch(f"""
            filesrc location={input_file} !
            decodebin !
            videoconvert !
            video/x-raw,format=ARGB64 !
            appsink name=sink emit-signals=true sync=false
        """)
        # Create sink pipeline (appsrc -> encoder -> file)
        self.sink_pipeline = Gst.parse_launch(f"""
            appsrc name=source emit-signals=false !
            video/x-raw,format=ARGB64 !
            videoconvert !
            x265enc !
            h265parse !
            matroskamux !
            filesink location={output_file}
        """)
        # Get appsink and appsrc elements
        self.appsink = self.src_pipeline.get_by_name('sink')
        self.appsrc = self.sink_pipeline.get_by_name('source')
        # Configure appsink
        self.appsink.set_property('emit-signals', True)
        self.appsink.connect('new-sample', self.on_new_sample)
        # Configure appsrc (no need-data handler is connected, so signals stay off)
        self.appsrc.set_property('emit-signals', False)
        self.appsrc.set_property('is-live', False)
        self.appsrc.set_property('format', Gst.Format.TIME)
        self.appsrc.set_property('max-bytes', 50 * 1024 * 1024)
        self.appsrc.set_property('block', True)
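
        # max-bytes plus block=True provide natural backpressure: push-buffer
        # blocks once ~50 MB of raw frames are queued, which stalls the
        # appsink callback (and thus the faster decode side) until x265enc
        # catches up.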
        # Set up bus message handlers
        self.src_bus = self.src_pipeline.get_bus()
        self.src_bus.add_signal_watch()
        self.src_bus.connect('message', self.on_src_message)
        self.sink_bus = self.sink_pipeline.get_bus()
        self.sink_bus.add_signal_watch()
        self.sink_bus.connect('message', self.on_sink_message)
        # Video properties (will be set from first frame)
        self.width = None
        self.height = None
        self.caps_set = False

    def timeout_quit(self):
        print("Timeout reached - forcing quit")
        self.loop.quit()
        return False
    def buffer_to_numpy(self, buffer):
        """Convert GStreamer buffer to numpy array (ARGB64 format)"""
        # Get buffer data
        success, map_info = buffer.map(Gst.MapFlags.READ)
        if not success:
            return None
        # ARGB64 format: 16-bit per channel, 4 channels (A,R,G,B)
        array = np.frombuffer(map_info.data, dtype=np.uint16)
        array = array.reshape((self.height, self.width, 4))
        # Extract RGB channels (skip alpha channel 0, take channels 1,2,3)
        rgb_array = array[:, :, 1:4]  # Skip alpha, take R,G,B
        buffer.unmap(map_info)
        return rgb_array.copy()
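
    # Note: the reshape assumes a packed buffer with no row padding; at 8
    # bytes per ARGB64 pixel a 1920x1080 frame is about 16.6 MB. The .copy()
    # is required so the frame survives in the deque after buffer.unmap().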
    def numpy_to_buffer(self, array):
        """Convert numpy array back to GStreamer buffer (ARGB64 format)"""
        # Ensure array is uint16
        if array.dtype != np.uint16:
            array = array.astype(np.uint16)
        # Add alpha channel back (set to max value, i.e. fully opaque)
        alpha_channel = np.full((self.height, self.width, 1), 65535, dtype=np.uint16)
        argb_array = np.concatenate([alpha_channel, array], axis=2)  # ARGB order
        # Wrap the raw bytes in a new GStreamer buffer
        data = argb_array.tobytes()
        buffer = Gst.Buffer.new_wrapped(data)
        return buffer
    def process_hdr_frame(self):
        """Process accumulated frames with HDR tone mapping"""
        if len(self.frame_buffer) < window_size:
            return None
        # Sum all frames in buffer
        summed = np.zeros((self.height, self.width, 3), dtype=np.int64)
        for buf_frame in self.frame_buffer:
            summed += buf_frame.astype(np.int64)
        # Apply tone mapping
        tone_mapped = np.power(summed, 1.0/gamma)
        # Normalize and keep 16-bit precision
        mapped_min = np.min(tone_mapped)
        mapped_max = np.max(tone_mapped)
        if mapped_max > mapped_min:
            output_frame = ((tone_mapped - mapped_min) / (mapped_max - mapped_min) * 65535).astype(np.uint16)
        else:
            output_frame = np.zeros_like(tone_mapped, dtype=np.uint16)
        return output_frame
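
    # The sum of 4 frames can reach 4 * 65535 ≈ 2^18, and x**(1/gamma)
    # compresses that range back down (a crude global tone map). Because
    # min/max are recomputed per output frame, overall brightness can flicker
    # between frames; normalizing against a fixed range would trade that
    # flicker for possible clipping.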
    def on_new_sample(self, appsink):
        # Pull sample from appsink
        sample = appsink.emit('pull-sample')
        if sample:
            # Get the buffer and caps
            buffer = sample.get_buffer()
            caps = sample.get_caps()
            # Set video properties from first frame
            if not self.caps_set:
                structure = caps.get_structure(0)
                self.width = structure.get_int('width')[1]
                self.height = structure.get_int('height')[1]
                # Create output caps with framerate
                output_caps = Gst.Caps.from_string(f"video/x-raw,format=ARGB64,width={self.width},height={self.height},framerate={int(output_fps)}/1")
                self.appsrc.set_property('caps', output_caps)
                print(f"Input caps: {caps.to_string()}")
                print(f"Output caps: {output_caps.to_string()}")
                print(f"Video size: {self.width}x{self.height}")
                self.sink_pipeline.set_state(Gst.State.PLAYING)
                self.caps_set = True
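                # The sink pipeline only reaches PLAYING here, once the first
                # decoded frame has revealed the video dimensions: appsrc
                # needs its caps set before the encoder can negotiate.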
            # Convert buffer to numpy array
            frame_array = self.buffer_to_numpy(buffer)
            if frame_array is not None:
                # Add frame to HDR buffer
                self.frame_buffer.append(frame_array)
                self.frame_count += 1
                self.frames_since_output += 1
                if self.frame_count % 100 == 0:
                    print(f"Processed input frame {self.frame_count}")
                # Generate output frame?
                if self.frames_since_output >= frame_skip:
                    hdr_frame = self.process_hdr_frame()
                    if hdr_frame is not None:
                        # Convert back to GStreamer buffer and push
                        output_buffer = self.numpy_to_buffer(hdr_frame)
                        # Set timestamp and duration
                        output_buffer.pts = self.timestamp
                        output_buffer.duration = self.duration
                        self.timestamp += self.duration
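                        # appsrc runs in TIME format, so every pushed buffer
                        # carries an explicit pts/duration; input timestamps
                        # are discarded and output time is resynthesized at
                        # 1/30 s per frame.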
| print(f"Created buffer size: {output_buffer.get_size()}, HDR frame shape: {hdr_frame.shape}") | |
| ret = self.appsrc.emit('push-buffer', output_buffer) | |
| self.output_count += 1 | |
| self.frames_since_output = 0 | |
| print(f"Output HDR frame {self.output_count} (from input {self.frame_count}), push result: {ret}") | |
| if ret != Gst.FlowReturn.OK: | |
| print(f"Error pushing HDR buffer: {ret}") | |
| return ret | |
| else: | |
| print("HDR frame was None - not enough frames in buffer yet") | |
| return Gst.FlowReturn.OK | |
    def on_src_message(self, bus, message):
        if message.type == Gst.MessageType.EOS:
            print("Source pipeline reached EOS")
            # Send EOS to appsrc
            ret = self.appsrc.emit('end-of-stream')
            print(f"Sent EOS to appsrc: {ret}")
            # Set a timeout to quit if sink doesn't finish
            GLib.timeout_add_seconds(5, self.timeout_quit)
        elif message.type == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            print(f"Source pipeline error: {err}, {debug}")
            self.loop.quit()
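
    # Shutdown is a two-step handoff: when the reader hits EOS, end-of-stream
    # is forwarded into appsrc so that x265enc flushes and matroskamux can
    # finalize the file; the main loop quits only once the sink pipeline
    # reports its own EOS (or the 5 s watchdog fires).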
    def on_sink_message(self, bus, message):
        if message.type == Gst.MessageType.EOS:
            print("Sink pipeline reached EOS")
            self.loop.quit()
        elif message.type == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            print(f"Sink pipeline error: {err}, {debug}")
            self.loop.quit()
    def run(self):
        print(f"Starting HDR processing: {self.input_file} -> {self.output_file}")
        # Start both pipelines (sink will wait for caps from first frame)
        self.sink_pipeline.set_state(Gst.State.READY)
        self.src_pipeline.set_state(Gst.State.PLAYING)
        # Run the main loop
        try:
            self.loop.run()
        except KeyboardInterrupt:
            print("Interrupted by user - sending EOS to finish processing...")
            # Send EOS to source pipeline instead of abrupt stop
            self.src_pipeline.send_event(Gst.Event.new_eos())
            # Continue running loop to process remaining frames
            try:
                self.loop.run()
            except KeyboardInterrupt:
                print("Second interrupt - force stopping")
        # Clean up
        self.src_pipeline.set_state(Gst.State.NULL)
        self.sink_pipeline.set_state(Gst.State.NULL)
        print(f"Done: {self.frame_count} input frames, {self.output_count} HDR output frames")

if __name__ == "__main__":
    processor = HDRProcessor(input_path, output_path)
    processor.run()
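
To sanity-check the exposure-stacking math without touching GStreamer, here is a minimal sketch of the same steps process_hdr_frame performs, run on synthetic frames (the 2x2 size, value range, and RNG seed are illustrative only, not taken from the script):

import numpy as np

window_size, gamma = 4, 3

rng = np.random.default_rng(0)
# Four synthetic 2x2 RGB frames, 16-bit, as buffer_to_numpy would return them
frames = [rng.integers(0, 4096, size=(2, 2, 3)).astype(np.uint16)
          for _ in range(window_size)]

# Same steps as process_hdr_frame: sum, gamma tone map, normalize to 16 bits
summed = sum(f.astype(np.int64) for f in frames)
tone_mapped = np.power(summed, 1.0 / gamma)
lo, hi = tone_mapped.min(), tone_mapped.max()
out = ((tone_mapped - lo) / (hi - lo) * 65535).astype(np.uint16)
print(out.min(), out.max())  # 0 and 65535: output spans the full 16-bit range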