ROS .bag with (Compressed)Image and Audio(Stamped) messages to .mp4 conversion
import os
import argparse
import glob
import subprocess
import wave

import cv2
import rosbag
from cv_bridge import CvBridge
from audio_common_msgs.msg import AudioData, AudioDataStamped
from sensor_msgs.msg import Image, CompressedImage


def parse_audio_format(format_str):
    """
    Parse an audio format string (e.g., S16LE) and return:
    - sample width (in bytes)
    - byteorder ('little' or 'big')
    - range of values (min, max)

    Args:
        format_str (str): Audio format string. Must start with 'S' or 'U' to indicate a signed or unsigned format, respectively. The format string must also include the sample width in bits (e.g., 'S16LE' for signed 16-bit little-endian).

    Return:
        dict: A dictionary containing the sample width, byteorder, and range of values (min, max).
    """
    # Determine if the format is signed or unsigned
    if format_str.startswith('S'):
        is_signed = True
    elif format_str.startswith('U'):
        is_signed = False
    else:
        raise ValueError(f"Invalid format string: {format_str}. Must start with 'S' or 'U'.")
    # Extract the sample width (in bits)
    sample_width_bits = int(''.join(filter(str.isdigit, format_str)))
    sample_width_bytes = sample_width_bits // 8  # Convert bits to bytes
    # Determine the byteorder
    if 'LE' in format_str:
        byteorder = 'little'
    elif 'BE' in format_str:
        byteorder = 'big'
    else:
        byteorder = 'little'  # Default to little-endian if not specified
    # Calculate the range of values
    if is_signed:
        min_value = -(2 ** (sample_width_bits - 1))
        max_value = (2 ** (sample_width_bits - 1)) - 1
    else:
        min_value = 0
        max_value = (2 ** sample_width_bits) - 1
    return {
        'sample_width': sample_width_bytes,
        'byteorder': byteorder,
        'range': (min_value, max_value)
    }


def bag2video(bag_file, image_topic, output_dir=None):
    """Reads a ROS1 bag file and extracts the images from a given image topic.

    Adapted from [munzz11 on GitHub Gist](https://gist.github.com/munzz11/1131f18b4134094a70db4e451040e08f).

    Args:
        bag_file (str): Path to the bag file.
        image_topic (str): Name of the image topic to extract from the bag. The messages published on this topic must be of type `sensor_msgs/Image` or `sensor_msgs/CompressedImage`.
        output_dir (str, optional): Destination directory for output files. If set to `None`, the output file is saved in the same directory as the input bag. Defaults to `None`.
    """
    # Define the output filepath
    if output_dir is None:
        output_dir = os.path.dirname(bag_file)
    output_video = os.path.join(output_dir,
                                os.path.basename(bag_file).replace('.bag', '.mp4'))
    # Open and inspect the bag file
    bag = rosbag.Bag(bag_file, 'r')
    topics_dict = bag.get_type_and_topic_info().topics
    # Create the CV bridge to convert ROS messages to OpenCV images
    bridge = CvBridge()
    # Define the codec and video writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video_frame_rate = topics_dict[image_topic].frequency
    video_frame_size = None  # Will be determined from the first frame
    video_writer = None
    for topic, msg, t in bag.read_messages(topics=[image_topic]):
        # Convert the ROS image message to an OpenCV image
        if msg._type == Image._type:
            frame = bridge.imgmsg_to_cv2(msg, "bgr8")
        elif msg._type == CompressedImage._type:
            frame = bridge.compressed_imgmsg_to_cv2(msg, "bgr8")
        else:
            raise ValueError("Invalid image message type. Supported types are `Image` and `CompressedImage`.")
        # Initialize the video writer on the first frame
        if video_frame_size is None:
            video_frame_size = (frame.shape[1], frame.shape[0])
            video_writer = cv2.VideoWriter(output_video,
                                           fourcc,
                                           video_frame_rate,
                                           video_frame_size)
        video_writer.write(frame)
    bag.close()
    if video_writer is not None:
        video_writer.release()
        print(f"Video saved as {output_video}")


def bag2audio(bag_file, audio_topic, audio_info_topic=None, output_dir=None):
    """Reads a ROS1 bag file and extracts the audio from a given audio topic.

    Args:
        bag_file (str): Path to the bag file.
        audio_topic (str): Name of the audio topic to extract from the bag. The messages published on this topic must be of type `audio_common_msgs/AudioData` or `audio_common_msgs/AudioDataStamped`.
        audio_info_topic (str, optional): Name of the topic carrying info on the audio stream. The messages published on this topic must be of type `audio_common_msgs/AudioInfo`. If not specified, the default values of `audio_common/audio_capture/launch/capture_wave.launch` are used. Defaults to `None`.
        output_dir (str, optional): Destination directory for output files. If set to `None`, the output file is saved in the same directory as the input bag. Defaults to `None`.
    """
    # Open and inspect the bag file
    bag = rosbag.Bag(bag_file, 'r')
    # Get info on the audio stream
    is_audio_info_msg_received = False
    if audio_info_topic:
        for topic, msg, t in bag.read_messages(topics=[audio_info_topic]):
            channels = msg.channels
            sample_rate = msg.sample_rate
            sample_format = msg.sample_format
            coding_format = msg.coding_format
            is_audio_info_msg_received = True
            break  # only read the first message
    if not is_audio_info_msg_received:
        # If the bag does not provide AudioInfo messages, fall back to the default values of `audio_common/audio_capture/launch/capture_wave.launch`.
        # See: https://github.com/ros-drivers/audio_common
        channels = 1
        sample_rate = 16000
        sample_format = "S16LE"
        coding_format = "wav"
    sample_format_dict = parse_audio_format(sample_format)
    # byteorder = sample_format_dict['byteorder']
    sample_width = sample_format_dict['sample_width']
    # Check if the requested coding format is supported
    ALLOWED_CODING_FMT = ["wav", "mp3"]
    if coding_format in ALLOWED_CODING_FMT:
        fext = '.' + coding_format
    else:
        raise ValueError("Invalid coding format. Supported formats are {}".format(ALLOWED_CODING_FMT))
    # Define the output filepath
    if output_dir is None:
        output_dir = os.path.dirname(bag_file)
    output_audio = os.path.join(output_dir,
                                os.path.basename(bag_file).replace('.bag', fext))
    # Initialize the audio data buffer
    audio_data = []
    for topic, msg, t in bag.read_messages(topics=[audio_topic]):
        if msg._type == AudioData._type:
            audio_msg_data = msg.data
        elif msg._type == AudioDataStamped._type:
            audio_msg_data = msg.audio.data
        else:
            raise ValueError("Invalid audio message type. Supported types are `AudioData` and `AudioDataStamped`.")
        audio_data.extend(audio_msg_data)
    if audio_data:  # if valid audio data was received
        with wave.open(output_audio, "w") as f:
            f.setnchannels(channels)
            f.setframerate(sample_rate)
            f.setsampwidth(sample_width)
            f.writeframes(bytes(audio_data))
        print(f"Audio saved as {output_audio}")


def combine_video_audio(video_file, audio_file, output_file=None):
    """Combine video and audio files into a single MP4 file.

    Args:
        video_file (str): Path to the video file.
        audio_file (str): Path to the audio file.
        output_file (str, optional): Path of the output video file. If set to `None`, the output file is saved in the same directory as the input video file. Defaults to `None`.
    """
    if output_file:
        output_dir = os.path.dirname(output_file)
        output_filename = os.path.basename(output_file)
    else:
        output_dir = os.path.dirname(video_file)
        output_filename = os.path.basename(video_file).replace('.mp4', '_with_audio.mp4')
    output_file = os.path.join(output_dir, output_filename)
    command = [
        'ffmpeg', '-i', video_file, '-i', audio_file,
        '-c:v', 'copy', '-c:a', 'aac', '-strict', 'experimental', output_file
    ]
    subprocess.run(command,
                   stdout=subprocess.DEVNULL,
                   stderr=subprocess.STDOUT)
    print(f"Video merged with audio at {output_file}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Convert ROS1 bag image topic to MP4 video")
    parser.add_argument("bag_files",
                        nargs="+",  # one or more arguments
                        help="Paths to the ROS1 bag files (supports wildcards like *.bag)")
    parser.add_argument("-i", "--image_topic",
                        nargs="?",  # 0 or 1 arguments
                        const=None,
                        help="Image or CompressedImage topic to extract from the bag file")
    parser.add_argument("-a", "--audio_topic",
                        nargs="?",  # 0 or 1 arguments
                        const=None,
                        help="AudioData or AudioDataStamped topic to extract from the bag file")
    parser.add_argument("--audio_info_topic",
                        nargs="?",  # 0 or 1 arguments
                        const=None,
                        help="AudioInfo topic to get the audio info from the bag file")
    parser.add_argument("-o", "--output_dir",
                        nargs="?",  # 0 or 1 arguments
                        const=None,
                        help="Directory to save the output MP4 videos (and audio files, if audio is processed)")
    parser.add_argument("-m", "--merge",
                        action="store_true",
                        help="Merge the audio and video files into a single MP4 file")
    parser.add_argument("-c", "--clean",
                        action="store_true",
                        help="Delete the intermediate video and audio files after merging")
    args = parser.parse_args()

    for i, bag_file in enumerate(args.bag_files):
        print("Processing file {}/{}".format(i + 1, len(args.bag_files)))
        bag_paths = glob.glob(bag_file)
        for path in bag_paths:
            bag_filename = os.path.basename(path)
            bag_dir = os.path.dirname(path)
            output_dir = args.output_dir if args.output_dir else bag_dir
            if args.image_topic:
                # Process the bag file to create a video from images
                bag2video(path,
                          image_topic=args.image_topic,
                          output_dir=output_dir)
            else:
                args.merge = False  # if no image topic is provided,
                                    # there is nothing to merge
            if args.audio_topic:
                # Process the bag file to create an audio file from audio chunks
                bag2audio(path,
                          audio_topic=args.audio_topic,
                          audio_info_topic=args.audio_info_topic,
                          output_dir=output_dir)
            else:
                args.merge = False  # if no audio topic is provided,
                                    # there is nothing to merge
            if args.merge:
                # Merge the video and audio files
                combine_video_audio(os.path.join(output_dir,
                                                 bag_filename.replace('.bag', '.mp4')),
                                    os.path.join(output_dir,
                                                 bag_filename.replace('.bag', '.wav')))
            else:
                args.clean = False  # if no merge is done, the audio and/or video files
                                    # are the final output of the script
            if args.clean:
                # Remove the intermediate video and audio files
                try:
                    os.remove(os.path.join(output_dir,
                                           bag_filename.replace('.bag', '.mp4')))
                except OSError:
                    pass
                try:
                    os.remove(os.path.join(output_dir,
                                           bag_filename.replace('.bag', '.wav')))
                except OSError:
                    pass
bag2video.py

Utility script to convert ROS bags with images and audio data into video and audio files.
The script reads ROS bag files containing image and audio topics, extracts the images into a video file and the audio into an audio file, and can optionally merge the two into a single multimedia file.
Adapted from munzz11 on GitHub Gist.
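The conversion functions can also be imported from another Python script. A minimal sketch of programmatic use is shown below; the bag path and topic names are hypothetical and must be adapted to your recordings.

# Minimal programmatic sketch; "session.bag" and the topic names are placeholders.
from bag2video import bag2video, bag2audio, combine_video_audio

bag2video("session.bag", image_topic="/camera/image_raw/compressed")
bag2audio("session.bag", audio_topic="/audio/audio")
combine_video_audio("session.mp4", "session.wav")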
Usage
Command-Line Arguments
The script accepts the following command-line arguments:
bag_files: paths to the ROS1 bag files (wildcards such as *.bag are supported).
-i, --image_topic: image topic to extract; the messages must be of type sensor_msgs/Image or sensor_msgs/CompressedImage.
-a, --audio_topic: audio topic to extract; the messages must be of type audio_common_msgs/AudioData or audio_common_msgs/AudioDataStamped.
--audio_info_topic: topic describing the audio stream; the messages must be of type audio_common_msgs/AudioInfo. If no audio_info_topic is passed together with audio_topic, the script falls back to the default values of audio_common/audio_capture/launch/capture_wave.launch.
-o, --output_dir: directory where the output video and audio files are saved.
-m, --merge: merge the audio and video files into a single MP4 file.
-c, --clean: delete the intermediate video and audio files after merging.

Examples
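The invocations below are illustrative; the topic names are hypothetical and depend on how the bag was recorded.

# Convert the image topic of a single bag to an MP4 video
python bag2video.py recording.bag -i /camera/image_raw

# Extract video and audio from every bag in a folder, merge them, and delete the intermediate files
python bag2video.py data/*.bag -i /camera/image_raw -a /audio/audio --audio_info_topic /audio/audio_info -m -c -o output/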