Created
September 27, 2025 02:13
-
-
Save clungzta/e368b4d039e885b30e29c79b7f77cf5b to your computer and use it in GitHub Desktop.
Determine Temporal Alignment Statistics of Topics in a ROS2 Bag. How well are the topics within the bag synchronised?
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import json | |
| import rosbag2_py | |
| from collections import defaultdict | |
| import numpy as np | |
| from scipy.stats import skew, kurtosis | |
# Input bag and the JSON report path derived from it.
bag_path = "/test_sync/test_sync_0.db3"
out_path = os.path.splitext(bag_path)[0] + "_stats.json"

# Open the bag for sequential reading (sqlite3 storage backend, default
# message serialization on both sides).
reader = rosbag2_py.SequentialReader()
reader.open(
    rosbag2_py.StorageOptions(uri=bag_path, storage_id="sqlite3"),
    rosbag2_py.ConverterOptions("", ""),
)

# Group every message's bag-recorded receive time by topic, converting
# the nanosecond stamp to seconds.
topic_timestamps = defaultdict(list)
while reader.has_next():
    topic_name, _raw_msg, stamp_ns = reader.read_next()
    topic_timestamps[topic_name].append(stamp_ns / 1e9)
def compute_stats(times):
    """Return descriptive statistics for a sequence of values (seconds).

    Computes summary statistics of the raw values plus the same statistics
    of the successive differences (``dt_*`` keys, i.e. inter-arrival
    intervals when *times* are timestamps).

    Args:
        times: sequence of floats (timestamps or offsets, in seconds).

    Returns:
        dict with keys ``count``, ``median``, ``mean``, ``std``, ``min``,
        ``max``, ``skew``, ``kurtosis`` and the corresponding ``dt_*``
        keys. The ``dt_*`` values are None when there are fewer than two
        samples; for an empty input all values except ``count`` are None
        (the original version raised on an empty input because np.min /
        np.max reject empty arrays).
    """
    arr = np.asarray(times, dtype=float)

    def _describe(values):
        # Summary stats of a non-empty 1-D array, as plain Python floats
        # so the result is JSON-serializable.
        return {
            "median": float(np.median(values)),
            "mean": float(np.mean(values)),
            "std": float(np.std(values)),
            "min": float(np.min(values)),
            "max": float(np.max(values)),
            "skew": float(skew(values)),
            "kurtosis": float(kurtosis(values)),
        }

    # Key order matters only cosmetically (JSON output); it matches the
    # original implementation.
    keys = ("median", "mean", "std", "min", "max", "skew", "kurtosis")

    stats = {"count": int(arr.size)}
    if arr.size:
        stats.update(_describe(arr))
    else:
        stats.update({k: None for k in keys})

    dt = np.diff(arr)
    if dt.size:
        stats.update({f"dt_{k}": v for k, v in _describe(dt).items()})
    else:
        stats.update({f"dt_{k}": None for k in keys})
    return stats
# Descriptive statistics of message receive times, per topic.
per_topic_stats = {
    topic: compute_stats(stamps) for topic, stamps in topic_timestamps.items()
}
# Pairwise nearest-neighbour alignment: for every message on the first
# topic, take the signed offset (seconds) to the closest-in-time message
# on the second topic, then summarise those offsets.
topic_list = list(topic_timestamps.keys())
pairwise_stats = {}
for i, first in enumerate(topic_list):
    for second in topic_list[i + 1:]:
        t1 = np.array(topic_timestamps[first])
        t2 = np.array(topic_timestamps[second])
        # Brute-force nearest neighbour; argmin keeps the original
        # first-occurrence tie-breaking.
        nearest_diffs = [
            float(t2[np.abs(t2 - ts).argmin()] - ts) for ts in t1
        ]
        pairwise_stats[f"{first} -> {second}"] = compute_stats(nearest_diffs)
# Assemble the report and persist it next to the bag file.
report = {
    "per_topic": per_topic_stats,
    "pairwise_nearest": pairwise_stats,
}
with open(out_path, "w") as fh:
    json.dump(report, fh, indent=2)
print(f"Descriptive statistics saved to {out_path}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment