Last active
July 23, 2025 00:29
-
-
Save yoi-hibino/83a14a9628b6fddd6f2ab62421f00a4d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # spray_path_planner.py | |
| import rclpy | |
| from rclpy.node import Node | |
| import pyrealsense2 as rs | |
| import open3d as o3d | |
| import numpy as np | |
| import tf_transformations | |
| from moveit_commander import RobotCommander, MoveGroupCommander | |
| from geometry_msgs.msg import PoseStamped | |
| from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint | |
| from visualization_msgs.msg import Marker, MarkerArray | |
| from std_msgs.msg import Header, ColorRGBA | |
| from sensor_msgs.msg import PointCloud2 | |
| import sensor_msgs_py.point_cloud2 as pc2 | |
| class SprayPathPlanner(Node): | |
| def __init__(self): | |
| super().__init__('spray_path_planner') | |
| self.robot = RobotCommander() | |
| self.group = MoveGroupCommander("arm") | |
| self.traj_pub = self.create_publisher(JointTrajectory, '/joint_trajectory', 10) | |
| self.spray_distance = 0.1 # 法線方向に10cm離してスプレー | |
| self.grid_spacing = 0.03 # 3cm間隔で網塗り | |
| self.grid_size = 0.3 # 全体30cm角グリッド | |
| self.ik_cache = {} | |
| # MarkerをRVizに追加 | |
| self.marker_pub = self.create_publisher(MarkerArray, '/spray_path_marker', 10) | |
| self.pcd_pub = self.create_publisher(PointCloud2, '/spray_reach_cloud', 10) | |
| def capture_point_cloud(self): | |
| self.get_logger().info("🔍 Capturing point cloud from RealSense...") | |
| pipeline = rs.pipeline() | |
| config = rs.config() | |
| config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30) | |
| config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30) | |
| pipeline.start(config) | |
| for _ in range(10): | |
| frames = pipeline.wait_for_frames() | |
| depth = np.asanyarray(frames.get_depth_frame().get_data()) | |
| color = np.asanyarray(frames.get_color_frame().get_data()) | |
| intr = frames.get_profile().as_video_stream_profile().get_intrinsics() | |
| pinhole = o3d.camera.PinholeCameraIntrinsic( | |
| intr.width, intr.height, intr.fx, intr.fy, intr.ppx, intr.ppy | |
| ) | |
| rgbd = o3d.geometry.RGBDImage.create_from_color_and_depth( | |
| o3d.geometry.Image(color), o3d.geometry.Image(depth), | |
| depth_scale=1000.0, convert_rgb_to_intensity=False | |
| ) | |
| pcd = o3d.geometry.PointCloud.create_from_rgbd_image(rgbd, pinhole) | |
| pcd.transform([[1, 0, 0, 0], | |
| [0, -1, 0, 0], | |
| [0, 0, -1, 0], | |
| [0, 0, 0, 1]]) | |
| pipeline.stop() | |
| return pcd | |
| def extract_spray_path(self, pcd): | |
| self.get_logger().info("📐 Estimating normals and generating spray path...") | |
| pcd.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.02, max_nn=30)) | |
| pcd.normalize_normals() | |
| # 格子状にサンプリング | |
| min_bound = pcd.get_min_bound() | |
| max_bound = pcd.get_max_bound() | |
| grid_x = np.arange(min_bound[0], max_bound[0], self.grid_spacing) | |
| grid_y = np.arange(min_bound[1], max_bound[1], self.grid_spacing) | |
| spray_positions, normals = [], [] | |
| pcd_tree = o3d.geometry.KDTreeFlann(pcd) | |
| for x in grid_x: | |
| for y in grid_y: | |
| query = np.array([x, y, (min_bound[2] + max_bound[2]) / 2]) | |
| [_, idx, _] = pcd_tree.search_knn_vector_3d(query, 1) | |
| nearest = np.asarray(pcd.points)[idx[0]] | |
| normal = np.asarray(pcd.normals)[idx[0]] | |
| spray_pos = nearest + normal * self.spray_distance | |
| spray_positions.append(spray_pos) | |
| normals.append(normal) | |
| return spray_positions, normals | |
| def pose_from_normal(self, pos, normal): | |
| z = normal / np.linalg.norm(normal) | |
| x = np.array([1, 0, 0]) | |
| if abs(np.dot(x, z)) > 0.95: | |
| x = np.array([0, 1, 0]) | |
| y = np.cross(z, x) | |
| x = np.cross(y, z) | |
| rot = np.vstack([x, y, z]).T | |
| quat = tf_transformations.quaternion_from_matrix( | |
| np.vstack([np.hstack([rot, [[0],[0],[0]]]), [0,0,0,1]]) | |
| ) | |
| pose = PoseStamped() | |
| pose.header.frame_id = "base_link" | |
| pose.pose.position.x, pose.pose.position.y, pose.pose.position.z = pos | |
| pose.pose.orientation.x, pose.pose.orientation.y, pose.pose.orientation.z, pose.pose.orientation.w = quat | |
| return pose | |
| def filter_by_ik(self, spray_positions, normals): | |
| self.get_logger().info("🔄 Caching + Orientation search with IK filtering...") | |
| valid_pairs = [] | |
| unreachable = [] | |
| self.ik_cache = getattr(self, "ik_cache", {}) | |
| for pos, normal in zip(spray_positions, normals): | |
| if not self.is_reachable_fast(pos): | |
| unreachable.append((None, None, pos, normal, False)) | |
| continue | |
| # 🔍 キャッシュの検索キー(位置だけに基づく) | |
| cache_key_prefix = ( | |
| round(pos[0], 3), | |
| round(pos[1], 3), | |
| round(pos[2], 3), | |
| ) | |
| # 💾 すでに成功したorientationがあれば再利用 | |
| cached_result = next(( | |
| (pose, joints) | |
| for key, (pose, joints) in self.ik_cache.items() | |
| if key[:3] == cache_key_prefix and joints is not None | |
| ), None) | |
| if cached_result is not None: | |
| pose, joints = cached_result | |
| valid_pairs.append((pose, joints, pos, normal, True)) | |
| continue | |
| # それ以外はorientationスキャンを試す | |
| pose, joints = self.find_best_orientation(pos, normal, n_steps=12) | |
| # 成功 or 失敗のいずれもキャッシュに登録 | |
| key = cache_key_prefix + (round(pose.pose.orientation.w, 2),) if pose else cache_key_prefix + (None,) | |
| self.ik_cache[key] = (pose, joints) | |
| if joints is not None: | |
| valid_pairs.append((pose, joints, pos, normal, True)) | |
| else: | |
| unreachable.append((None, None, pos, normal, False)) | |
| self.get_logger().info(f"✅ Reachable: {len(valid_pairs)}, ❌ Unreachable: {len(unreachable)}") | |
| return valid_pairs + unreachable | |
| # filter_by_ikの代替軽量関数 | |
| def filter_fast_reachability(self, spray_positions, normals): | |
| flags = [] | |
| for p in spray_positions: | |
| flags.append(self.is_reachable_fast(p)) | |
| return flags # → ヒートマップやMarker用に使える | |
| def find_best_orientation(self, position, normal, n_steps=12): | |
| z = normal / np.linalg.norm(normal) | |
| x_base = np.array([1, 0, 0]) | |
| if abs(np.dot(x_base, z)) > 0.95: | |
| x_base = np.array([0, 1, 0]) | |
| y = np.cross(z, x_base) | |
| x = np.cross(y, z) | |
| base_rot = np.vstack([x, y, z]).T | |
| for i in range(n_steps): | |
| theta = 2 * np.pi * i / n_steps | |
| rot_z = tf_transformations.rotation_matrix(theta, (0, 0, 1))[:3, :3] | |
| rot_total = base_rot @ rot_z | |
| quat = tf_transformations.quaternion_from_matrix(np.vstack([ | |
| np.hstack([rot_total, [[0],[0],[0]]]), | |
| [0,0,0,1] | |
| ])) | |
| pose = PoseStamped() | |
| pose.header.frame_id = "base_link" | |
| pose.pose.position.x, pose.pose.position.y, pose.pose.position.z = position | |
| pose.pose.orientation.x, pose.pose.orientation.y, pose.pose.orientation.z, pose.pose.orientation.w = quat | |
| # キャッシュ付きIK | |
| key = ( | |
| round(pose.pose.position.x, 3), | |
| round(pose.pose.position.y, 3), | |
| round(pose.pose.position.z, 3), | |
| round(quat[3], 2) | |
| ) | |
| if key in self.ik_cache: | |
| joints = self.ik_cache[key] | |
| else: | |
| self.group.set_pose_target(pose) | |
| plan = self.group.plan() | |
| joints = None | |
| if plan and len(plan.joint_trajectory.points) > 0: | |
| joints = plan.joint_trajectory.points[-1].positions | |
| self.ik_cache[key] = joints | |
| if joints is not None: | |
| return pose, joints # 🎯 IK成功! | |
| return None, None # ❌ 全スキャン失敗 | |
| def is_reachable_fast(self, point, arm_reach_radius=0.4, z_bounds=(0.05, 0.35)): | |
| # 単純な球状リーチモデル(改良可) | |
| r = np.linalg.norm(point[:2]) | |
| z = point[2] | |
| return (r <= arm_reach_radius) and (z_bounds[0] <= z <= z_bounds[1]) | |
| def cached_ik(self, pose: PoseStamped): | |
| key = ( | |
| round(pose.pose.position.x, 3), | |
| round(pose.pose.position.y, 3), | |
| round(pose.pose.position.z, 3), | |
| round(pose.pose.orientation.w, 2), | |
| ) | |
| if key in self.ik_cache: | |
| return self.ik_cache[key] | |
| self.group.set_pose_target(pose) | |
| plan = self.group.plan() | |
| if plan and len(plan.joint_trajectory.points) > 0: | |
| result = plan.joint_trajectory.points[-1].positions | |
| self.ik_cache[key] = result | |
| return result | |
| else: | |
| self.ik_cache[key] = None | |
| return None | |
| def create_trajectory(self, pose_joint_pairs, speed=0.05): | |
| traj = JointTrajectory() | |
| traj.joint_names = self.group.get_active_joints() | |
| tfs = 0.0 | |
| prev = None | |
| for _, joints in pose_joint_pairs: | |
| pt = JointTrajectoryPoint() | |
| pt.positions = joints | |
| if prev is not None: | |
| dist = np.linalg.norm(np.array(joints) - np.array(prev)) | |
| tfs += dist / speed | |
| pt.time_from_start.sec = int(tfs) | |
| pt.time_from_start.nanosec = int((tfs % 1) * 1e9) | |
| traj.points.append(pt) | |
| prev = joints | |
| return traj | |
| def publish(self, trajectory): | |
| self.traj_pub.publish(trajectory) | |
| self.get_logger().info(f"🚀 Trajectory published with {len(trajectory.points)} points") | |
| def publish_colored_pointcloud(self, pcd: o3d.geometry.PointCloud, reachable_mask: list[bool]): | |
| colors = [] | |
| for reachable in reachable_mask: | |
| if reachable: | |
| colors.append([0.0, 1.0, 0.0]) # Green | |
| else: | |
| colors.append([1.0, 0.0, 0.0]) # Red | |
| pcd.colors = o3d.utility.Vector3dVector(colors) | |
| # ROS PointCloud2 変換 | |
| header = Header() | |
| header.frame_id = "base_link" | |
| points = np.asarray(pcd.points) | |
| cloud_data = [(p[0], p[1], p[2], int(c[0]*255)<<16 | int(c[1]*255)<<8 | int(c[2]*255)) for p, c in zip(points, colors)] | |
| msg = pc2.create_cloud(header, fields=[ | |
| pc2.PointField('x', 0, pc2.PointField.FLOAT32, 1), | |
| pc2.PointField('y', 4, pc2.PointField.FLOAT32, 1), | |
| pc2.PointField('z', 8, pc2.PointField.FLOAT32, 1), | |
| pc2.PointField('rgb', 12, pc2.PointField.UINT32, 1) | |
| ], points=cloud_data) | |
| self.pcd_pub.publish(msg) | |
| def create_marker_array(self, spray_positions, normals): | |
| ma = MarkerArray() | |
| for i, (p, n) in enumerate(zip(spray_positions, normals)): | |
| m = Marker() | |
| m.header = Header(frame_id="base_link") | |
| m.ns = "spray_points" | |
| m.id = i | |
| m.type = Marker.ARROW | |
| m.action = Marker.ADD | |
| m.pose.position.x = p[0] | |
| m.pose.position.y = p[1] | |
| m.pose.position.z = p[2] | |
| # 法線ベクトル → 向きに変換 | |
| q = self.normal_to_quat(n) | |
| m.pose.orientation.x = q[0] | |
| m.pose.orientation.y = q[1] | |
| m.pose.orientation.z = q[2] | |
| m.pose.orientation.w = q[3] | |
| m.scale.x = 0.03 # shaft length | |
| m.scale.y = 0.005 | |
| m.scale.z = 0.005 | |
| m.color = ColorRGBA(r=0.0, g=1.0, b=0.0, a=1.0) | |
| ma.markers.append(m) | |
| return ma | |
| # Noneになっているposeを除外して表示 | |
| def create_colored_marker_array(self, pose_joint_flags): | |
| ma = MarkerArray() | |
| for i, (pose, _, pos, normal, success) in enumerate(pose_joint_flags): | |
| if pose is None: | |
| continue # 表示なし(または灰色などでもOK) | |
| m = Marker() | |
| m.header.frame_id = "base_link" | |
| m.ns = "spray_path" | |
| m.id = i | |
| m.type = Marker.ARROW | |
| m.action = Marker.ADD | |
| m.pose = pose.pose | |
| m.scale.x = 0.03 | |
| m.scale.y = 0.005 | |
| m.scale.z = 0.005 | |
| m.color = ColorRGBA(r=0.0 if success else 1.0, | |
| g=1.0 if success else 0.0, | |
| b=0.0, | |
| a=1.0) | |
| ma.markers.append(m) | |
| return ma | |
| def create_reachability_heatmap(self, pose_joint_flags): | |
| ma = MarkerArray() | |
| for i, (_, _, p, n, success) in enumerate(pose_joint_flags): | |
| m = Marker() | |
| m.header.frame_id = "base_link" | |
| m.ns = "spray_reach_map" | |
| m.id = i | |
| m.type = Marker.SPHERE | |
| m.action = Marker.ADD | |
| m.pose.position.x, m.pose.position.y, m.pose.position.z = p | |
| m.pose.orientation.w = 1.0 | |
| m.scale.x = m.scale.y = m.scale.z = 0.015 | |
| m.color = ColorRGBA(r=0.2, g=0.2 + 0.8 * success, b=0.2, a=0.4) | |
| ma.markers.append(m) | |
| return ma | |
| def zigzag_order(spray_positions, normals, spacing=0.03): | |
| # XY平面上でスナップ(整数グリッド座標) | |
| grid = {} | |
| for pos, normal in zip(spray_positions, normals): | |
| gx = int(pos[0] / spacing) | |
| gy = int(pos[1] / spacing) | |
| grid.setdefault(gy, []).append((gx, pos, normal)) | |
| ordered = [] | |
| for gy in sorted(grid.keys()): | |
| row = sorted(grid[gy], key=lambda x: x[0]) | |
| if gy % 2 == 1: | |
| row = row[::-1] # 奇数行は逆順 | |
| ordered.extend([(pos, normal) for _, pos, normal in row]) | |
| return zip(*ordered) # spray_positions, normals に再分割 | |
| def main(): | |
| rclpy.init() | |
| node = SprayPathPlanner() | |
| # スキャンからスプレーパス生成・送信 | |
| pcd = node.capture_point_cloud() | |
| spray_positions, normals = node.extract_spray_path(pcd) | |
| # ここでジグザグ順に並べ替え | |
| spray_positions, normals = zigzag_order(spray_positions, normals, spacing=node.grid_spacing) | |
| pose_joint_flags = node.filter_by_ik(spray_positions, normals) | |
| # 可視化 | |
| colored = node.create_colored_marker_array(pose_joint_flags) | |
| node.marker_pub.publish(colored) | |
| heatmap = node.create_reachability_heatmap(pose_joint_flags) | |
| node.marker_pub.publish(heatmap) | |
| reachable_flags = [success for (_, _, _, _, success) in pose_joint_flags] | |
| node.publish_colored_pointcloud(pcd, reachable_flags) | |
| #marker_array = node.create_marker_array(spray_positions, normals) | |
| #node.marker_pub.publish(marker_array) | |
| # IK & Trajectoryへ | |
| valid_only = [(p, j) for (p, j, _, _, success) in pose_joint_flags if success] | |
| trajectory = node.create_trajectory(valid_only) | |
| node.publish(trajectory) | |
| rclpy.shutdown() | |
| if __name__ == '__main__': | |
| main() | |
| # pip install pyrealsense2 open3d tf_transformations | |
| # Add → PointCloud2, MarkerArray, MarkerArray | |
| # Topic: /spray_reach_cloud, /spray_path_marker, /spray_path_marker | |
| # Frame: base_link |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment