Skip to content

Instantly share code, notes, and snippets.

@clungzta
Created September 27, 2025 02:13
Show Gist options
  • Select an option

  • Save clungzta/e368b4d039e885b30e29c79b7f77cf5b to your computer and use it in GitHub Desktop.

Select an option

Save clungzta/e368b4d039e885b30e29c79b7f77cf5b to your computer and use it in GitHub Desktop.
Determine Temporal Alignment Statistics of Topics in a ROS2 Bag. How well are the topics within the bag synchronised?
import os
import json
import rosbag2_py
from collections import defaultdict
import numpy as np
from scipy.stats import skew, kurtosis
# Input bag and the JSON report written next to it (same stem + "_stats.json")
bag_path = "/test_sync/test_sync_0.db3"
out_path = f"{os.path.splitext(bag_path)[0]}_stats.json"

# Open the bag for sequential reading from the sqlite3 storage backend
storage_options = rosbag2_py.StorageOptions(uri=bag_path, storage_id="sqlite3")
converter_options = rosbag2_py.ConverterOptions("", "")
reader = rosbag2_py.SequentialReader()
reader.open(storage_options, converter_options)

# Group message timestamps by topic; the serialized payload is not needed here
topic_timestamps = defaultdict(list)
while reader.has_next():
    topic, _payload, stamp_ns = reader.read_next()
    # Dividing by 1e9 turns the reader's timestamp into seconds
    # (presumably the raw value is in nanoseconds; confirm against rosbag2).
    topic_timestamps[topic].append(stamp_ns / 1e9)
# Function to compute descriptive statistics
def _finite_or_none(value):
    """Return *value* as a built-in float, or None when it is NaN or infinite."""
    number = float(value)
    return number if np.isfinite(number) else None


def _describe(values, prefix):
    """Return the seven summary statistics of *values*, with keys prefixed by *prefix*."""
    reducers = (
        ("median", np.median),
        ("mean", np.mean),
        ("std", np.std),
        ("min", np.min),
        ("max", np.max),
        ("skew", skew),
        ("kurtosis", kurtosis),
    )
    if values.size == 0:
        # np.min/np.max raise on zero-size input and the rest yield NaN,
        # so report every statistic as missing instead.
        return {f"{prefix}{name}": None for name, _ in reducers}
    return {
        f"{prefix}{name}": _finite_or_none(func(values))
        for name, func in reducers
    }


def compute_stats(times):
    """Summarise a sequence of numbers and the gaps between consecutive entries.

    Args:
        times: One-dimensional sequence of numeric samples. May be empty.

    Returns:
        A dict with "count" (number of samples), then "median", "mean",
        "std", "min", "max", "skew" and "kurtosis" of the samples, then the
        same seven statistics prefixed with "dt_" computed over
        ``np.diff(times)``.

        Any statistic that cannot be computed is None rather than NaN, so
        the dict serialises to strictly valid JSON. This covers: no
        samples, fewer than two samples for the "dt_" entries, and
        non-finite results (for example when skew or kurtosis evaluates
        to NaN).
    """
    arr = np.asarray(times)
    dt = np.diff(arr)

    stats = {"count": len(arr)}
    stats.update(_describe(arr, ""))
    stats.update(_describe(dt, "dt_"))
    return stats
# Descriptive statistics of each topic's own timestamp series
per_topic_stats = {
    topic: compute_stats(timestamps)
    for topic, timestamps in topic_timestamps.items()
}

# For every unordered pair of topics (A listed before B), take each A
# timestamp, find the closest B timestamp, and summarise the signed
# offsets (B minus A).
topic_list = list(topic_timestamps)
pairwise_stats = {}
for first_pos, first_topic in enumerate(topic_list):
    first_times = np.array(topic_timestamps[first_topic])
    for second_topic in topic_list[first_pos + 1:]:
        second_times = np.array(topic_timestamps[second_topic])
        nearest_diffs = [
            second_times[np.argmin(np.abs(second_times - stamp))] - stamp
            for stamp in first_times
        ]
        pairwise_stats[f"{first_topic} -> {second_topic}"] = compute_stats(nearest_diffs)

# Bundle both result sets and write them out as indented JSON
stats = {"per_topic": per_topic_stats, "pairwise_nearest": pairwise_stats}
with open(out_path, "w") as out_file:
    json.dump(stats, out_file, indent=2)
print(f"Descriptive statistics saved to {out_path}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment