Skip to content

Instantly share code, notes, and snippets.

@methylDragon
Created August 15, 2024 06:58
Show Gist options
  • Select an option

  • Save methylDragon/3a05aa0f2745305b60620a367c1dff94 to your computer and use it in GitHub Desktop.

Select an option

Save methylDragon/3a05aa0f2745305b60620a367c1dff94 to your computer and use it in GitHub Desktop.
Extract ROS interface package type descriptions as JSON files
"""
Generate type descriptions for ROS interface packages as JSON files!
Usage: (Remember to source your ROS distro first!)
```
# Generate type description for a single ROS interface
python3 gen.py --interface_type <pkg>/(msg|srv)/<type>
# Generate type description for all ROS interfaces in a package
python3 gen.py --target_package <pkg_name>
```
"""
from rosidl_parser import definition
from ament_index_python import get_package_share_directory
from pathlib import Path
import json
from rosbag2_py import LocalMessageDefinitionSource
import argparse
import re
import logging
import os
# Matches a fully qualified interface name given on the command line, e.g.
# "geometry_msgs/msg/PoseArray". Only the "msg" and "srv" categories are
# accepted; the three parts are exposed as named groups.
PACKAGE_TYPENAME_REGEX = re.compile(
    r"^(?P<package_name>[a-zA-Z0-9_]+)/(?P<category>msg|srv)/(?P<type_name>[a-zA-Z0-9_]+)$"
)

# Matches a '/'-separated path ending in "<pkg>/(msg|srv)/<type>.json" and
# captures the "<pkg>/(msg|srv)/<type>" portion as `full_type_name`.
JSON_FULL_TYPENAME_REGEX = re.compile(
    r"^.*/(?P<full_type_name>[a-zA-Z0-9_]+/(msg|srv)/[a-zA-Z0-9_]+)\.json$"
)
def parse_args():
    """Parse the command-line arguments of the generator script.

    Exactly one of ``--interface_type`` or ``--target_package`` must be
    given (argparse enforces this through a required mutually exclusive
    group).

    Returns:
        argparse.Namespace with the attributes ``interface_type``,
        ``target_package``, ``output_dir`` and ``loglevel``. ``loglevel``
        is normalized to lower case.

    Raises:
        SystemExit: raised by argparse on invalid or missing arguments,
            including an unknown ``--loglevel`` name.
    """
    parser = argparse.ArgumentParser(
        description="Generate type description for a ROS message")

    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        "--interface_type",
        type=str,
        help="Generate type description for a single ROS interface (e.g. geometry_msgs/msg/PoseArray)")
    group.add_argument(
        "--target_package",
        type=str,
        help="Generate type description for every ROS interface in a package (e.g. geometry_msgs)")

    parser.add_argument('-o',
                        '--output-dir',
                        type=str,
                        default='.',
                        help='Output directory for generated files.')
    # The value is later upper-cased and handed to logging.basicConfig, which
    # raises ValueError on an unknown level name. Validating here turns a
    # typo into a normal usage error. `type=str.lower` runs before the
    # `choices` check, so any capitalization (e.g. "INFO") is still accepted.
    parser.add_argument('-log',
                        '--loglevel',
                        type=str.lower,
                        choices=('critical', 'fatal', 'error', 'warning',
                                 'warn', 'info', 'debug', 'notset'),
                        default='info',
                        help='Provide logging level. Example --loglevel info, default=info')
    return parser.parse_args()
def get_json_target(interface_package, interface_category, interface_type):
    """Build the path ``<share dir of package>/<category>/<type>.json``.

    The share directory is looked up with ``get_package_share_directory``.
    The returned path is only constructed here; this function does not
    check that the file exists.
    """
    share_dir = Path(get_package_share_directory(interface_package))
    json_filename = interface_type + ".json"
    return share_dir.joinpath(interface_category, json_filename)
def _collect_json_targets(args):
    """Resolve the parsed CLI arguments into the JSON files to process.

    Args:
        args: namespace returned by ``parse_args()``; reads
            ``interface_type`` and ``target_package``.

    Returns:
        A ``(json_targets, target_name)`` tuple: the list of candidate JSON
        paths and the name used to label the output directory.

    Raises:
        ValueError: if ``--interface_type`` is malformed or neither target
            option is set.
        FileNotFoundError: if the JSON file for a single interface target
            does not exist.
    """
    if args.interface_type is not None:  # Single interface target.
        full_interface_type = args.interface_type
        match = PACKAGE_TYPENAME_REGEX.match(full_interface_type)
        if match is None:
            raise ValueError(
                "Message type must be in the format '<pkg>/(msg|srv)/<type>'")
        json_target = get_json_target(
            match.group("package_name"),
            match.group("category"),
            match.group("type_name"))
        # Fail early with a clear message instead of hitting an open() error
        # halfway through processing.
        if not json_target.is_file():
            raise FileNotFoundError(
                f"No JSON file found for {full_interface_type}: {json_target}")
        return [json_target], full_interface_type

    if args.target_package is not None:  # Package target.
        pkg_path = Path(get_package_share_directory(args.target_package))
        # Sorted so that processing order (and the log output) is stable.
        return sorted(pkg_path.glob("**/*.json")), args.target_package

    raise ValueError(
        "Must specify either --interface_type or --target_package")


def _find_type_hash(schema, full_type_name):
    """Return the ``hash_string`` listed for ``full_type_name`` in ``schema``.

    ``schema`` is the dict loaded from a target JSON file; its
    ``type_hashes`` list is searched for an entry whose ``type_name``
    equals ``full_type_name``. Returns ``""`` when no such entry exists.
    """
    for entry in schema.get('type_hashes', []):
        if entry.get('type_name') == full_type_name:
            return entry.get('hash_string', "")
    return ""


def main():
    """Write one ``*__definition.json`` file per targeted interface.

    Each output file holds the message definition obtained from
    ``LocalMessageDefinitionSource`` together with the type hash read from
    the interface's JSON file and the value of ``ROS_DISTRO``.

    Raises:
        ValueError: if no JSON targets were found (plus anything raised by
            ``_collect_json_targets``).
    """
    args = parse_args()
    logging.basicConfig(level=args.loglevel.upper())

    # Extract JSON targets so we can grab the type hashes.
    json_targets, target_name = _collect_json_targets(args)
    # Explicit raise instead of `assert`: asserts are stripped under -O.
    if not json_targets:
        raise ValueError("No valid targets found")

    # NOTE: for a single interface, target_name contains '/' separators, so
    # this resolves to nested directories (created by parents=True).
    output_dir = (
        Path(args.output_dir) / f"{target_name}__message_definition_gen"
    ).absolute()
    output_dir.mkdir(parents=True, exist_ok=True)

    logging.info(f"Processing {len(json_targets)} potential targets")
    definition_source = LocalMessageDefinitionSource()
    ros_distro = os.getenv("ROS_DISTRO", "")  # Loop-invariant.

    count = 0
    for json_target in json_targets:
        # as_posix() keeps '/' separators on every platform; the regex
        # only understands '/'-separated paths.
        match = JSON_FULL_TYPENAME_REGEX.match(json_target.as_posix())
        if match is None:
            logging.warning(
                "Skipping invalid JSON target: " + str(json_target))
            continue

        full_type_name = match.group('full_type_name')
        logging.info(f"Processing {full_type_name}...")

        # Named `message_definition` to avoid shadowing the file-level
        # `from rosidl_parser import definition`.
        message_definition = definition_source.get_full_text(full_type_name)

        with json_target.open("r", encoding="utf-8") as f:
            schema = json.load(f)
        type_hash = _find_type_hash(schema, full_type_name)
        if not type_hash:
            logging.warning(
                "No type hash found for %s in %s", full_type_name, json_target)

        output_json_dict = {
            'encoded_message_definition':
                message_definition.encoded_message_definition,
            'encoding': message_definition.encoding,
            'topic_type': message_definition.topic_type,
            'type_hash': type_hash,
            'ros_distro': ros_distro,
        }

        output_path = output_dir / \
            (full_type_name.replace("/", "_") + "__definition.json")
        with output_path.open("w", encoding="utf-8") as f:
            json.dump(output_json_dict, f, indent=2)
        count += 1

    logging.info(f"Done! Wrote {count} files")
    logging.info("Output directory: " + str(output_dir))


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment