Skip to content

Instantly share code, notes, and snippets.

@yoi-hibino
Last active July 23, 2025 00:29
Show Gist options
  • Select an option

  • Save yoi-hibino/83a14a9628b6fddd6f2ab62421f00a4d to your computer and use it in GitHub Desktop.

Select an option

Save yoi-hibino/83a14a9628b6fddd6f2ab62421f00a4d to your computer and use it in GitHub Desktop.
# spray_path_planner.py
import rclpy
from rclpy.node import Node
import pyrealsense2 as rs
import open3d as o3d
import numpy as np
import tf_transformations
from moveit_commander import RobotCommander, MoveGroupCommander
from geometry_msgs.msg import PoseStamped
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from visualization_msgs.msg import Marker, MarkerArray
from std_msgs.msg import Header, ColorRGBA
from sensor_msgs.msg import PointCloud2
import sensor_msgs_py.point_cloud2 as pc2
class SprayPathPlanner(Node):
def __init__(self):
super().__init__('spray_path_planner')
self.robot = RobotCommander()
self.group = MoveGroupCommander("arm")
self.traj_pub = self.create_publisher(JointTrajectory, '/joint_trajectory', 10)
self.spray_distance = 0.1 # 法線方向に10cm離してスプレー
self.grid_spacing = 0.03 # 3cm間隔で網塗り
self.grid_size = 0.3 # 全体30cm角グリッド
self.ik_cache = {}
# MarkerをRVizに追加
self.marker_pub = self.create_publisher(MarkerArray, '/spray_path_marker', 10)
self.pcd_pub = self.create_publisher(PointCloud2, '/spray_reach_cloud', 10)
def capture_point_cloud(self):
self.get_logger().info("🔍 Capturing point cloud from RealSense...")
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
pipeline.start(config)
for _ in range(10):
frames = pipeline.wait_for_frames()
depth = np.asanyarray(frames.get_depth_frame().get_data())
color = np.asanyarray(frames.get_color_frame().get_data())
intr = frames.get_profile().as_video_stream_profile().get_intrinsics()
pinhole = o3d.camera.PinholeCameraIntrinsic(
intr.width, intr.height, intr.fx, intr.fy, intr.ppx, intr.ppy
)
rgbd = o3d.geometry.RGBDImage.create_from_color_and_depth(
o3d.geometry.Image(color), o3d.geometry.Image(depth),
depth_scale=1000.0, convert_rgb_to_intensity=False
)
pcd = o3d.geometry.PointCloud.create_from_rgbd_image(rgbd, pinhole)
pcd.transform([[1, 0, 0, 0],
[0, -1, 0, 0],
[0, 0, -1, 0],
[0, 0, 0, 1]])
pipeline.stop()
return pcd
def extract_spray_path(self, pcd):
self.get_logger().info("📐 Estimating normals and generating spray path...")
pcd.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.02, max_nn=30))
pcd.normalize_normals()
# 格子状にサンプリング
min_bound = pcd.get_min_bound()
max_bound = pcd.get_max_bound()
grid_x = np.arange(min_bound[0], max_bound[0], self.grid_spacing)
grid_y = np.arange(min_bound[1], max_bound[1], self.grid_spacing)
spray_positions, normals = [], []
pcd_tree = o3d.geometry.KDTreeFlann(pcd)
for x in grid_x:
for y in grid_y:
query = np.array([x, y, (min_bound[2] + max_bound[2]) / 2])
[_, idx, _] = pcd_tree.search_knn_vector_3d(query, 1)
nearest = np.asarray(pcd.points)[idx[0]]
normal = np.asarray(pcd.normals)[idx[0]]
spray_pos = nearest + normal * self.spray_distance
spray_positions.append(spray_pos)
normals.append(normal)
return spray_positions, normals
def pose_from_normal(self, pos, normal):
z = normal / np.linalg.norm(normal)
x = np.array([1, 0, 0])
if abs(np.dot(x, z)) > 0.95:
x = np.array([0, 1, 0])
y = np.cross(z, x)
x = np.cross(y, z)
rot = np.vstack([x, y, z]).T
quat = tf_transformations.quaternion_from_matrix(
np.vstack([np.hstack([rot, [[0],[0],[0]]]), [0,0,0,1]])
)
pose = PoseStamped()
pose.header.frame_id = "base_link"
pose.pose.position.x, pose.pose.position.y, pose.pose.position.z = pos
pose.pose.orientation.x, pose.pose.orientation.y, pose.pose.orientation.z, pose.pose.orientation.w = quat
return pose
def filter_by_ik(self, spray_positions, normals):
self.get_logger().info("🔄 Caching + Orientation search with IK filtering...")
valid_pairs = []
unreachable = []
self.ik_cache = getattr(self, "ik_cache", {})
for pos, normal in zip(spray_positions, normals):
if not self.is_reachable_fast(pos):
unreachable.append((None, None, pos, normal, False))
continue
# 🔍 キャッシュの検索キー(位置だけに基づく)
cache_key_prefix = (
round(pos[0], 3),
round(pos[1], 3),
round(pos[2], 3),
)
# 💾 すでに成功したorientationがあれば再利用
cached_result = next((
(pose, joints)
for key, (pose, joints) in self.ik_cache.items()
if key[:3] == cache_key_prefix and joints is not None
), None)
if cached_result is not None:
pose, joints = cached_result
valid_pairs.append((pose, joints, pos, normal, True))
continue
# それ以外はorientationスキャンを試す
pose, joints = self.find_best_orientation(pos, normal, n_steps=12)
# 成功 or 失敗のいずれもキャッシュに登録
key = cache_key_prefix + (round(pose.pose.orientation.w, 2),) if pose else cache_key_prefix + (None,)
self.ik_cache[key] = (pose, joints)
if joints is not None:
valid_pairs.append((pose, joints, pos, normal, True))
else:
unreachable.append((None, None, pos, normal, False))
self.get_logger().info(f"✅ Reachable: {len(valid_pairs)}, ❌ Unreachable: {len(unreachable)}")
return valid_pairs + unreachable
# filter_by_ikの代替軽量関数
def filter_fast_reachability(self, spray_positions, normals):
flags = []
for p in spray_positions:
flags.append(self.is_reachable_fast(p))
return flags # → ヒートマップやMarker用に使える
def find_best_orientation(self, position, normal, n_steps=12):
z = normal / np.linalg.norm(normal)
x_base = np.array([1, 0, 0])
if abs(np.dot(x_base, z)) > 0.95:
x_base = np.array([0, 1, 0])
y = np.cross(z, x_base)
x = np.cross(y, z)
base_rot = np.vstack([x, y, z]).T
for i in range(n_steps):
theta = 2 * np.pi * i / n_steps
rot_z = tf_transformations.rotation_matrix(theta, (0, 0, 1))[:3, :3]
rot_total = base_rot @ rot_z
quat = tf_transformations.quaternion_from_matrix(np.vstack([
np.hstack([rot_total, [[0],[0],[0]]]),
[0,0,0,1]
]))
pose = PoseStamped()
pose.header.frame_id = "base_link"
pose.pose.position.x, pose.pose.position.y, pose.pose.position.z = position
pose.pose.orientation.x, pose.pose.orientation.y, pose.pose.orientation.z, pose.pose.orientation.w = quat
# キャッシュ付きIK
key = (
round(pose.pose.position.x, 3),
round(pose.pose.position.y, 3),
round(pose.pose.position.z, 3),
round(quat[3], 2)
)
if key in self.ik_cache:
joints = self.ik_cache[key]
else:
self.group.set_pose_target(pose)
plan = self.group.plan()
joints = None
if plan and len(plan.joint_trajectory.points) > 0:
joints = plan.joint_trajectory.points[-1].positions
self.ik_cache[key] = joints
if joints is not None:
return pose, joints # 🎯 IK成功!
return None, None # ❌ 全スキャン失敗
def is_reachable_fast(self, point, arm_reach_radius=0.4, z_bounds=(0.05, 0.35)):
# 単純な球状リーチモデル(改良可)
r = np.linalg.norm(point[:2])
z = point[2]
return (r <= arm_reach_radius) and (z_bounds[0] <= z <= z_bounds[1])
def cached_ik(self, pose: PoseStamped):
key = (
round(pose.pose.position.x, 3),
round(pose.pose.position.y, 3),
round(pose.pose.position.z, 3),
round(pose.pose.orientation.w, 2),
)
if key in self.ik_cache:
return self.ik_cache[key]
self.group.set_pose_target(pose)
plan = self.group.plan()
if plan and len(plan.joint_trajectory.points) > 0:
result = plan.joint_trajectory.points[-1].positions
self.ik_cache[key] = result
return result
else:
self.ik_cache[key] = None
return None
def create_trajectory(self, pose_joint_pairs, speed=0.05):
traj = JointTrajectory()
traj.joint_names = self.group.get_active_joints()
tfs = 0.0
prev = None
for _, joints in pose_joint_pairs:
pt = JointTrajectoryPoint()
pt.positions = joints
if prev is not None:
dist = np.linalg.norm(np.array(joints) - np.array(prev))
tfs += dist / speed
pt.time_from_start.sec = int(tfs)
pt.time_from_start.nanosec = int((tfs % 1) * 1e9)
traj.points.append(pt)
prev = joints
return traj
def publish(self, trajectory):
self.traj_pub.publish(trajectory)
self.get_logger().info(f"🚀 Trajectory published with {len(trajectory.points)} points")
def publish_colored_pointcloud(self, pcd: o3d.geometry.PointCloud, reachable_mask: list[bool]):
colors = []
for reachable in reachable_mask:
if reachable:
colors.append([0.0, 1.0, 0.0]) # Green
else:
colors.append([1.0, 0.0, 0.0]) # Red
pcd.colors = o3d.utility.Vector3dVector(colors)
# ROS PointCloud2 変換
header = Header()
header.frame_id = "base_link"
points = np.asarray(pcd.points)
cloud_data = [(p[0], p[1], p[2], int(c[0]*255)<<16 | int(c[1]*255)<<8 | int(c[2]*255)) for p, c in zip(points, colors)]
msg = pc2.create_cloud(header, fields=[
pc2.PointField('x', 0, pc2.PointField.FLOAT32, 1),
pc2.PointField('y', 4, pc2.PointField.FLOAT32, 1),
pc2.PointField('z', 8, pc2.PointField.FLOAT32, 1),
pc2.PointField('rgb', 12, pc2.PointField.UINT32, 1)
], points=cloud_data)
self.pcd_pub.publish(msg)
def create_marker_array(self, spray_positions, normals):
ma = MarkerArray()
for i, (p, n) in enumerate(zip(spray_positions, normals)):
m = Marker()
m.header = Header(frame_id="base_link")
m.ns = "spray_points"
m.id = i
m.type = Marker.ARROW
m.action = Marker.ADD
m.pose.position.x = p[0]
m.pose.position.y = p[1]
m.pose.position.z = p[2]
# 法線ベクトル → 向きに変換
q = self.normal_to_quat(n)
m.pose.orientation.x = q[0]
m.pose.orientation.y = q[1]
m.pose.orientation.z = q[2]
m.pose.orientation.w = q[3]
m.scale.x = 0.03 # shaft length
m.scale.y = 0.005
m.scale.z = 0.005
m.color = ColorRGBA(r=0.0, g=1.0, b=0.0, a=1.0)
ma.markers.append(m)
return ma
# Noneになっているposeを除外して表示
def create_colored_marker_array(self, pose_joint_flags):
ma = MarkerArray()
for i, (pose, _, pos, normal, success) in enumerate(pose_joint_flags):
if pose is None:
continue # 表示なし(または灰色などでもOK)
m = Marker()
m.header.frame_id = "base_link"
m.ns = "spray_path"
m.id = i
m.type = Marker.ARROW
m.action = Marker.ADD
m.pose = pose.pose
m.scale.x = 0.03
m.scale.y = 0.005
m.scale.z = 0.005
m.color = ColorRGBA(r=0.0 if success else 1.0,
g=1.0 if success else 0.0,
b=0.0,
a=1.0)
ma.markers.append(m)
return ma
def create_reachability_heatmap(self, pose_joint_flags):
ma = MarkerArray()
for i, (_, _, p, n, success) in enumerate(pose_joint_flags):
m = Marker()
m.header.frame_id = "base_link"
m.ns = "spray_reach_map"
m.id = i
m.type = Marker.SPHERE
m.action = Marker.ADD
m.pose.position.x, m.pose.position.y, m.pose.position.z = p
m.pose.orientation.w = 1.0
m.scale.x = m.scale.y = m.scale.z = 0.015
m.color = ColorRGBA(r=0.2, g=0.2 + 0.8 * success, b=0.2, a=0.4)
ma.markers.append(m)
return ma
def zigzag_order(spray_positions, normals, spacing=0.03):
# XY平面上でスナップ(整数グリッド座標)
grid = {}
for pos, normal in zip(spray_positions, normals):
gx = int(pos[0] / spacing)
gy = int(pos[1] / spacing)
grid.setdefault(gy, []).append((gx, pos, normal))
ordered = []
for gy in sorted(grid.keys()):
row = sorted(grid[gy], key=lambda x: x[0])
if gy % 2 == 1:
row = row[::-1] # 奇数行は逆順
ordered.extend([(pos, normal) for _, pos, normal in row])
return zip(*ordered) # spray_positions, normals に再分割
def main():
rclpy.init()
node = SprayPathPlanner()
# スキャンからスプレーパス生成・送信
pcd = node.capture_point_cloud()
spray_positions, normals = node.extract_spray_path(pcd)
# ここでジグザグ順に並べ替え
spray_positions, normals = zigzag_order(spray_positions, normals, spacing=node.grid_spacing)
pose_joint_flags = node.filter_by_ik(spray_positions, normals)
# 可視化
colored = node.create_colored_marker_array(pose_joint_flags)
node.marker_pub.publish(colored)
heatmap = node.create_reachability_heatmap(pose_joint_flags)
node.marker_pub.publish(heatmap)
reachable_flags = [success for (_, _, _, _, success) in pose_joint_flags]
node.publish_colored_pointcloud(pcd, reachable_flags)
#marker_array = node.create_marker_array(spray_positions, normals)
#node.marker_pub.publish(marker_array)
# IK & Trajectoryへ
valid_only = [(p, j) for (p, j, _, _, success) in pose_joint_flags if success]
trajectory = node.create_trajectory(valid_only)
node.publish(trajectory)
rclpy.shutdown()
if __name__ == '__main__':
main()
# pip install pyrealsense2 open3d tf_transformations
# Add → PointCloud2, MarkerArray, MarkerArray
# Topic: /spray_reach_cloud, /spray_path_marker, /spray_path_marker
# Frame: base_link
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment