Created
August 15, 2024 06:58
-
-
Save methylDragon/3a05aa0f2745305b60620a367c1dff94 to your computer and use it in GitHub Desktop.
Extract ROS interface package type descriptions as JSON files
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Generate type descriptions for ROS interface packages as JSON files! | |
| Usage: (Remember to source your ROS distro first!) | |
| ``` | |
| # Generate type description for a single ROS interface | |
| python3 gen.py --interface_type <pkg>/(msg|srv)/<type> | |
| # Generate type description for all ROS interfaces in a package | |
| python3 gen.py --target_package <pkg_name> | |
| ``` | |
| """ | |
| from rosidl_parser import definition | |
| from ament_index_python import get_package_share_directory | |
| from pathlib import Path | |
| import json | |
| from rosbag2_py import LocalMessageDefinitionSource | |
| import argparse | |
| import re | |
| import logging | |
| import os | |
# Matches a fully qualified interface name given on the command line,
# e.g. "geometry_msgs/msg/PoseArray", and captures its three components.
PACKAGE_TYPENAME_REGEX = re.compile(
    r"^(?P<package_name>[a-zA-Z0-9_]+)"
    r"/(?P<category>msg|srv)"
    r"/(?P<type_name>[a-zA-Z0-9_]+)$"
)

# Matches a path ending in "<pkg>/(msg|srv)/<type>.json" (with at least one
# leading directory component) and captures "<pkg>/(msg|srv)/<type>".
JSON_FULL_TYPENAME_REGEX = re.compile(
    r"^.*/"
    r"(?P<full_type_name>[a-zA-Z0-9_]+/(msg|srv)/[a-zA-Z0-9_]+)"
    r"\.json$"
)
def parse_args(argv=None):
    """Parse the command-line arguments of the generator script.

    Exactly one of ``--interface_type`` or ``--target_package`` must be
    given; argparse enforces this through a required mutually exclusive
    group.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse reads ``sys.argv[1:]``.

    Returns:
        The ``argparse.Namespace`` with ``interface_type``,
        ``target_package``, ``output_dir`` and ``loglevel`` attributes.
        ``loglevel`` is normalized to lower case.

    Raises:
        SystemExit: Raised by argparse on invalid arguments (including an
            unknown log level) or when ``--help`` is requested.
    """
    # Every level name the logging module understands, so that any value
    # that worked before validation was added keeps working.
    log_levels = (
        'critical', 'fatal', 'error', 'warning', 'warn',
        'info', 'debug', 'notset',
    )

    parser = argparse.ArgumentParser(
        description="Generate type description for a ROS message")

    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        "--interface_type",
        type=str,
        help="Generate type description for a single ROS interface (e.g. geometry_msgs/msg/PoseArray)")
    group.add_argument(
        "--target_package",
        type=str,
        help="Generate type description for every ROS interface in a package (e.g. geometry_msgs)")

    parser.add_argument('-o',
                        '--output-dir',
                        type=str,
                        default='.',
                        help='Output directory for generated files.')
    # Lower-casing before the choices check keeps the option
    # case-insensitive while rejecting typos with a usage error instead of
    # a traceback from logging.basicConfig later on.
    parser.add_argument('-log',
                        '--loglevel',
                        type=str.lower,
                        choices=log_levels,
                        default='info',
                        metavar='LEVEL',
                        help='Provide logging level. Example --loglevel info, default=info')
    return parser.parse_args(argv)
def get_json_target(interface_package, interface_category, interface_type):
    """Build the path of an interface's JSON file in its package share dir.

    The result is ``<share dir of interface_package>/<interface_category>/
    <interface_type>.json``. The path is only constructed here; whether the
    file exists is not checked.
    """
    share_dir = Path(get_package_share_directory(interface_package))
    json_file_name = interface_type + ".json"
    return share_dir.joinpath(interface_category, json_file_name)
def _collect_json_targets(args):
    """Return ``(json_targets, target_name)`` for the parsed arguments.

    ``json_targets`` is a list of candidate JSON paths: a single path for
    ``--interface_type``, or every ``*.json`` found recursively under the
    package share directory for ``--target_package``. ``target_name`` is
    the raw interface type or package name given on the command line.

    Raises:
        ValueError: If the interface type is malformed or neither option
            was provided.
    """
    if args.interface_type is not None:  # Single interface target.
        match = PACKAGE_TYPENAME_REGEX.match(args.interface_type)
        if match is None:
            raise ValueError(
                "Message type must be in the format '<pkg>/(msg|srv)/<type>'")
        json_target = get_json_target(
            match.group("package_name"),
            match.group("category"),
            match.group("type_name"))
        return [json_target], args.interface_type

    if args.target_package is not None:  # Package target.
        pkg_path = Path(get_package_share_directory(args.target_package))
        return list(pkg_path.glob("**/*.json")), args.target_package

    raise ValueError(
        "Must specify either --interface_type or --target_package")


def _read_type_hash(json_target, full_type_name):
    """Return the hash string recorded for ``full_type_name``.

    Reads the ``type_hashes`` list from the JSON file at ``json_target``
    and returns the ``hash_string`` of the first entry whose ``type_name``
    equals ``full_type_name``, or ``""`` when there is no such entry.
    """
    with json_target.open("r", encoding="utf-8") as f:
        schema = json.load(f)
    for type_hash in schema['type_hashes']:
        if type_hash['type_name'] == full_type_name:
            return type_hash['hash_string']
    return ""


def main():
    """Write one ``*__definition.json`` file per requested ROS interface.

    Each output file holds the encoded message definition, its encoding,
    the topic type, the type hash read from the interface's JSON file and
    the value of the ``ROS_DISTRO`` environment variable (empty when unset).

    Raises:
        ValueError: If the arguments are malformed or no JSON targets were
            found.
    """
    args = parse_args()
    logging.basicConfig(level=args.loglevel.upper())

    # Extract JSON targets so we can grab the type hashes.
    json_targets, target_name = _collect_json_targets(args)

    # Raise explicitly rather than assert: assertions are stripped when
    # Python runs with -O, which would silently skip this validation.
    if not json_targets:
        raise ValueError("No valid targets found")

    # NOTE: with --interface_type the target name contains "/", so the
    # output directory is nested (<pkg>/<category>/<type>__...).
    output_dir = Path(args.output_dir) / f"{target_name}__message_definition_gen"
    output_dir = output_dir.absolute()
    output_dir.mkdir(parents=True, exist_ok=True)

    logging.info("Processing %d potential targets", len(json_targets))
    definition_source = LocalMessageDefinitionSource()
    # The environment does not change while we run; look it up once.
    ros_distro = os.getenv("ROS_DISTRO", "")

    count = 0
    for json_target in json_targets:
        # Match on the POSIX form so the "/"-based regex also works with
        # Windows paths, whose str() uses backslashes.
        match = JSON_FULL_TYPENAME_REGEX.match(json_target.as_posix())
        if match is None:
            logging.warning("Skipping invalid JSON target: %s", json_target)
            continue

        full_type_name = match.group('full_type_name')
        logging.info("Processing %s...", full_type_name)

        message_definition = definition_source.get_full_text(full_type_name)
        output_json_dict = {
            'encoded_message_definition':
                message_definition.encoded_message_definition,
            'encoding': message_definition.encoding,
            'topic_type': message_definition.topic_type,
            'type_hash': _read_type_hash(json_target, full_type_name),
            'ros_distro': ros_distro,
        }

        output_path = output_dir / \
            (full_type_name.replace("/", "_") + "__definition.json")
        with output_path.open("w", encoding="utf-8") as f:
            json.dump(output_json_dict, f, indent=2)
        count += 1

    logging.info("Done! Wrote %d files", count)
    logging.info("Output directory: %s", output_dir)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment