Skip to content

Instantly share code, notes, and snippets.

@Luca-Pozzi
Last active September 18, 2025 10:46
Show Gist options
  • Select an option

  • Save Luca-Pozzi/70bbc276ab0c7fab5dded7213d4456fa to your computer and use it in GitHub Desktop.

Select an option

Save Luca-Pozzi/70bbc276ab0c7fab5dded7213d4456fa to your computer and use it in GitHub Desktop.
ROS node to save images from CompressedImage topic to file
#!/usr/bin/env python3
import rospy
import os
import cv2
from cv_bridge import CvBridge
from sensor_msgs.msg import Image, CompressedImage
class ImageAcquisitionNode:
def __init__(self, topic, out_dir, n_images, interval):
self.topic = topic
self.out_dir = out_dir
self.n_images = n_images
self.interval = interval
self.bridge = CvBridge()
self.image_count = 0
self.last_capture_time = rospy.Time.now()
self.image_received = False
self.current_image = None
if not os.path.exists(self.out_dir):
os.makedirs(self.out_dir)
rospy.Subscriber(self.topic, CompressedImage, self.image_callback)
rospy.loginfo(f"Subscribed to {self.topic}, saving {self.n_images} images every {self.interval}s to {self.out_dir}")
def image_callback(self, msg):
self.current_image = msg
self.image_received = True
def run(self):
rate = rospy.Rate(10)
while not rospy.is_shutdown() and self.image_count < self.n_images:
if self.image_received:
now = rospy.Time.now()
if (now - self.last_capture_time).to_sec() >= self.interval or self.image_count == 0:
try:
rospy.loginfo(f"Acquiring image {self.image_count + 1}/{self.n_images}")
cv_image = self.bridge.compressed_imgmsg_to_cv2(self.current_image, desired_encoding='bgr8')
filename = os.path.join(self.out_dir, f"image_{self.image_count+1:03d}.jpg")
cv2.imwrite(filename, cv_image)
rospy.loginfo(f"Saved {filename}")
self.image_count += 1
self.last_capture_time = now
except Exception as e:
rospy.logerr(f"Failed to save image: {e}")
self.image_received = False
rate.sleep()
rospy.loginfo("Image acquisition complete.")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="ROS Image Acquisition Node")
parser.add_argument('--topic',
type=str, default='image',
help='Camera topic')
parser.add_argument('--out_dir',
type=str, required=True,
help='Output directory')
parser.add_argument('--n_images',
type=int, default=10,
help='Number of images to acquire')
parser.add_argument('--interval',
type=float, default=2.0,
help='Seconds between images')
args, unknown = parser.parse_known_args()
rospy.init_node('img_acquisition', anonymous=True)
node = ImageAcquisitionNode(args.topic, args.out_dir, args.n_images, args.interval)
node.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment