Created
August 21, 2025 12:45
-
-
Save sjchoi86/1c912ad8bfb5d027b461fcde47bf9e57 to your computer and use it in GitHub Desktop.
Uploaded from jupyterlab
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import pyautogui | |
| import sys | |
| import time | |
| import cv2 | |
| import random | |
| import re | |
| import numpy as np | |
| import cvxpy as cp | |
| import shapely as sp | |
| import matplotlib as mpl | |
| import matplotlib.pyplot as plt | |
| import tkinter as tk | |
| import xml.etree.ElementTree as ET | |
| import subprocess | |
| import pyqtgraph as pg | |
| from scipy.spatial.distance import cdist | |
| from PIL import Image, ImageDraw, ImageFont | |
| from IPython.display import Markdown,display | |
| from xml.dom import minidom | |
| from PyQt5.QtGui import QFont | |
| from PyQt5.QtGui import QColor | |
| from PyQt5.QtWidgets import ( | |
| QApplication, QWidget, QSlider, QLabel, QGridLayout, QScrollArea, | |
| QVBoxLayout, QFrame, QRadioButton, QButtonGroup, QCheckBox, | |
| ) | |
| from PyQt5.QtCore import Qt | |
| from functools import partial | |
| from scipy.ndimage import binary_dilation | |
| from shapely import Polygon,LineString,Point # handle polygons | |
| from io import BytesIO | |
| from termcolor import colored | |
| from collections import deque | |
| from datetime import datetime | |
| """ | |
| sys.path.append('../../package/kinematics_helper/') # for 'transforms' | |
| """ | |
| from transforms import ( | |
| t2p, | |
| r2quat, | |
| rpy2r, | |
| ) | |
| def trim_scale(x,th): | |
| """ | |
| Trim the scale of the input array such that its maximum absolute value does not exceed a given threshold. | |
| Parameters: | |
| x (np.array): Input array. | |
| th (float): Threshold value. | |
| Returns: | |
| np.array: Scaled array with its maximum absolute value limited to th. | |
| """ | |
| x = np.copy(x) | |
| x_abs_max = np.abs(x).max() | |
| if x_abs_max > th: | |
| x = x*th/x_abs_max | |
| return x | |
| def compute_view_params( | |
| camera_pos, | |
| target_pos, | |
| up_vector = np.array([0,0,1]), | |
| ): | |
| """ | |
| Compute view parameters (azimuth, distance, elevation, and lookat point) for a camera in 3D space. | |
| Parameters: | |
| camera_pos (np.ndarray): 3D position of the camera. | |
| target_pos (np.ndarray): 3D position of the target. | |
| up_vector (np.ndarray): 3D up vector (default is [0, 0, 1]). | |
| Returns: | |
| tuple: (azimuth [deg], distance, elevation [deg], lookat point) | |
| """ | |
| # Compute camera-to-target vector and distance | |
| cam_to_target = target_pos - camera_pos | |
| distance = np.linalg.norm(cam_to_target) | |
| # Compute azimuth and elevation | |
| azimuth = np.arctan2(cam_to_target[1], cam_to_target[0]) | |
| azimuth = np.rad2deg(azimuth) # [deg] | |
| elevation = np.arcsin(cam_to_target[2] / distance) | |
| elevation = np.rad2deg(elevation) # [deg] | |
| # Compute lookat point | |
| lookat = target_pos | |
| # Compute camera orientation matrix | |
| zaxis = cam_to_target / distance | |
| xaxis = np.cross(up_vector, zaxis) | |
| yaxis = np.cross(zaxis, xaxis) | |
| cam_orient = np.array([xaxis, yaxis, zaxis]) | |
| # Return computed values | |
| return azimuth, distance, elevation, lookat | |
| def get_idxs(list_query, list_domain): | |
| """ | |
| Get the corresponding indices in list_query for the items present in list_domain. | |
| Parameters: | |
| list_query (list): The list in which to search for items. | |
| list_domain (list): The list of items whose indices are desired. | |
| Returns: | |
| list: A list of indices from list_query for items found in list_domain. | |
| Raises: | |
| TypeError: If either input is not a list. | |
| """ | |
| if not isinstance(list_query, list) or not isinstance(list_domain, list): | |
| raise TypeError("[get_idxs] Both inputs should be lists.") | |
| return [list_query.index(item) for item in list_domain if item in list_query] | |
| def get_idxs_contain(list_query,list_substring): | |
| """ | |
| Get indices of elements in list_query that contain any of the substrings specified in list_substring. | |
| Parameters: | |
| list_query (list): List of strings to search. | |
| list_substring (list): List of substrings to look for. | |
| Returns: | |
| list: Indices of elements in list_query that contain any of the substrings. | |
| """ | |
| idxs = [i for i, s in enumerate(list_query) if any(sub in s for sub in list_substring)] | |
| return idxs | |
| def get_colors(n_color=10, cmap_name='gist_rainbow', alpha=1.0): | |
| """ | |
| Generate a list of diverse colors using a specified colormap. | |
| Returns: | |
| list: A list of RGBA tuples. | |
| """ | |
| cmap = plt.get_cmap(cmap_name) | |
| rgba = cmap(np.linspace(0, 1, n_color)) | |
| rgba[:, 3] = alpha | |
| return [tuple(r) for r in rgba] | |
| def sample_xyzs(n_sample=1,x_range=(-1,-1),y_range=(-1,+1),z_range=(0,0),min_dist=0.1,xy_margin=0.0): | |
| """ | |
| Sample 3D points within specified ranges ensuring a minimum distance between points. | |
| Parameters: | |
| n_sample (int): Number of points to sample. | |
| x_range (list): [min, max] range for x-coordinate. | |
| y_range (list): [min, max] range for y-coordinate. | |
| z_range (list): [min, max] range for z-coordinate. | |
| min_dist (float): Minimum allowed distance between points. | |
| xy_margin (float): Margin to apply for x and y dimensions. | |
| Returns: | |
| np.array: Array of sampled 3D points with shape (n_sample, 3). | |
| """ | |
| xyzs = np.zeros((n_sample,3)) | |
| for p_idx in range(n_sample): | |
| while True: | |
| x_rand = np.random.uniform(low=x_range[0]+xy_margin,high=x_range[1]-xy_margin) | |
| y_rand = np.random.uniform(low=y_range[0]+xy_margin,high=y_range[1]-xy_margin) | |
| z_rand = np.random.uniform(low=z_range[0],high=z_range[1]) | |
| xyz = np.array([x_rand,y_rand,z_rand]) | |
| if p_idx == 0: break | |
| devc = cdist(xyz.reshape((-1,3)),xyzs[:p_idx,:].reshape((-1,3)),'euclidean') | |
| if devc.min() > min_dist: break # minimum distance between objects | |
| xyzs[p_idx,:] = xyz | |
| return xyzs | |
| def sample_xys(n_sample=1,x_range=[0,1],y_range=[0,1],min_dist=0.1,xy_margin=0.0): | |
| """ | |
| Sample 2D points within specified ranges ensuring a minimum distance between points. | |
| Parameters: | |
| n_sample (int): Number of points to sample. | |
| x_range (list): [min, max] range for x-coordinate. | |
| y_range (list): [min, max] range for y-coordinate. | |
| min_dist (float): Minimum allowed distance between points. | |
| xy_margin (float): Margin to apply for x and y dimensions. | |
| Returns: | |
| np.array: Array of sampled 2D points with shape (n_sample, 2). | |
| """ | |
| xys = np.zeros((n_sample,2)) | |
| for p_idx in range(n_sample): | |
| while True: | |
| x_rand = np.random.uniform(low=x_range[0]+xy_margin,high=x_range[1]-xy_margin) | |
| y_rand = np.random.uniform(low=y_range[0]+xy_margin,high=y_range[1]-xy_margin) | |
| xy = np.array([x_rand,y_rand]) | |
| if p_idx == 0: break | |
| devc = cdist(xy.reshape((-1,2)),xys[:p_idx,:].reshape((-1,2)),'euclidean') | |
| if devc.min() > min_dist: break # minimum distance between objects | |
| xys[p_idx,:] = xy | |
| return xys | |
| def save_png(img,png_path,verbose=False): | |
| """ | |
| Save an image to a PNG file. | |
| Parameters: | |
| img (np.array): Image data. | |
| png_path (str): File path to save the PNG. | |
| verbose (bool): If True, print status messages. | |
| Returns: | |
| None | |
| """ | |
| directory = os.path.dirname(png_path) | |
| if not os.path.exists(directory): | |
| os.makedirs(directory) | |
| if verbose: | |
| print ("[%s] generated."%(directory)) | |
| # Save to png | |
| plt.imsave(png_path,img) | |
| if verbose: | |
| print ("[%s] saved."%(png_path)) | |
| class MultiSliderClass(object): | |
| """ | |
| GUI class to create and manage multiple sliders using Tkinter. | |
| """ | |
| def __init__( | |
| self, | |
| n_slider = 10, | |
| title = 'Multiple Sliders', | |
| window_width = 500, | |
| window_height = None, | |
| x_offset = 0, | |
| y_offset = 100, | |
| slider_width = 400, | |
| label_width = None, # dummy parameter | |
| label_texts = None, | |
| slider_mins = None, | |
| slider_maxs = None, | |
| slider_vals = None, | |
| resolution = None, | |
| resolutions = None, | |
| fontsize = 10, | |
| verbose = True | |
| ): | |
| """ | |
| Initialize the MultiSliderClass with the specified slider parameters. | |
| Parameters: | |
| n_slider (int): Number of sliders. | |
| title (str): Window title. | |
| window_width (int): Width of the window. | |
| window_height (int): Height of the window. If None, it is computed based on n_slider. | |
| x_offset (int): X offset for window placement. | |
| y_offset (int): Y offset for window placement. | |
| slider_width (int): Width of each slider. | |
| label_width (int): Width of the label (dummy parameter). | |
| label_texts (list): List of texts for slider labels. | |
| slider_mins (list): List of minimum values for sliders. | |
| slider_maxs (list): List of maximum values for sliders. | |
| slider_vals (list): Initial slider values. | |
| resolution (float): Resolution for all sliders. | |
| resolutions (list): List of resolutions for each slider. | |
| fontsize (int): Font size for labels. | |
| verbose (bool): If True, print status messages. | |
| """ | |
| self.n_slider = n_slider | |
| self.title = title | |
| self.window_width = window_width | |
| if window_height is None: | |
| self.window_height = self.n_slider*40 | |
| else: | |
| self.window_height = window_height | |
| self.x_offset = x_offset | |
| self.y_offset = y_offset | |
| self.slider_width = slider_width | |
| self.resolution = resolution | |
| self.resolutions = resolutions | |
| self.fontsize = fontsize | |
| self.verbose = verbose | |
| # Slider values | |
| self.slider_values = np.zeros(self.n_slider) | |
| # Initial/default slider settings | |
| self.label_texts = label_texts | |
| self.slider_mins = slider_mins | |
| self.slider_maxs = slider_maxs | |
| self.slider_vals = slider_vals | |
| # Create main window | |
| self.gui = tk.Tk() | |
| self.gui.title("%s"%(self.title)) | |
| self.gui.geometry( | |
| "%dx%d+%d+%d"% | |
| (self.window_width,self.window_height,self.x_offset,self.y_offset)) | |
| # Create vertical scrollbar | |
| self.scrollbar = tk.Scrollbar(self.gui,orient=tk.VERTICAL) | |
| # Create a Canvas widget with the scrollbar attached | |
| self.canvas = tk.Canvas(self.gui,yscrollcommand=self.scrollbar.set) | |
| self.canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) | |
| # Configure the scrollbar to control the canvas | |
| self.scrollbar.config(command=self.canvas.yview) | |
| self.scrollbar.pack(side=tk.RIGHT, fill=tk.Y) | |
| # Create a frame inside the canvas to hold the sliders | |
| self.sliders_frame = tk.Frame(self.canvas) | |
| self.canvas.create_window((0,0),window=self.sliders_frame,anchor=tk.NW) | |
| # Create sliders | |
| self.sliders = self.create_sliders() | |
| # Update the canvas scroll region when the sliders_frame changes size | |
| self.sliders_frame.bind("<Configure>",self.cb_scroll) | |
| # You may want to do this in the main script | |
| for _ in range(100): self.update() # to avoid GIL-related error | |
| def cb_scroll(self,event): | |
| """ | |
| Callback function to update the scroll region when the slider frame size changes. | |
| Parameters: | |
| event: Tkinter event object. | |
| Returns: | |
| None | |
| """ | |
| self.canvas.configure(scrollregion=self.canvas.bbox("all")) | |
| def cb_slider(self,slider_idx,slider_value): | |
| """ | |
| Callback function for slider value changes. | |
| Parameters: | |
| slider_idx (int): Index of the slider. | |
| slider_value (float): New value of the slider. | |
| Returns: | |
| None | |
| """ | |
| self.slider_values[slider_idx] = slider_value # append | |
| if self.verbose: | |
| print ("slider_idx:[%d] slider_value:[%.1f]"%(slider_idx,slider_value)) | |
| def create_sliders(self): | |
| """ | |
| Create slider widgets for the GUI. | |
| Returns: | |
| list: List of slider widget objects. | |
| """ | |
| sliders = [] | |
| for s_idx in range(self.n_slider): | |
| # Create label | |
| if self.label_texts is None: | |
| label_text = "Slider %02d "%(s_idx) | |
| else: | |
| label_text = "[%d/%d] %s"%(s_idx,self.n_slider,self.label_texts[s_idx]) | |
| slider_label = tk.Label(self.sliders_frame, text=label_text,font=("Helvetica",self.fontsize)) | |
| slider_label.grid(row=s_idx,column=0,padx=0,pady=0) | |
| # Create slider | |
| if self.slider_mins is None: slider_min = 0 | |
| else: slider_min = self.slider_mins[s_idx] | |
| if self.slider_maxs is None: slider_max = 100 | |
| else: slider_max = self.slider_maxs[s_idx] | |
| if self.slider_vals is None: slider_val = 50 | |
| else: slider_val = self.slider_vals[s_idx] | |
| # Resolution | |
| if self.resolution is None: # if none, divide the range with 100 | |
| resolution = (slider_max-slider_min)/100 | |
| else: | |
| resolution = self.resolution | |
| if self.resolutions is not None: | |
| resolution = self.resolutions[s_idx] | |
| slider = tk.Scale( | |
| self.sliders_frame, | |
| from_ = slider_min, | |
| to = slider_max, | |
| orient = tk.HORIZONTAL, | |
| command = lambda value,idx=s_idx:self.cb_slider(idx,float(value)), | |
| resolution = resolution, | |
| length = self.slider_width, | |
| font = ("Helvetica",self.fontsize), | |
| ) | |
| slider.grid(row=s_idx,column=1,padx=0,pady=0,sticky=tk.W) | |
| slider.set(slider_val) | |
| sliders.append(slider) | |
| return sliders | |
| def update(self): | |
| """ | |
| Update the GUI window if it exists. | |
| Returns: | |
| None | |
| """ | |
| if self.is_window_exists(): | |
| self.gui.update() | |
| def run(self): | |
| """ | |
| Start the Tkinter main loop for the slider GUI. | |
| Returns: | |
| None | |
| """ | |
| self.gui.mainloop() | |
| def is_window_exists(self): | |
| """ | |
| Check if the GUI window still exists. | |
| Returns: | |
| bool: True if the window exists, False otherwise. | |
| """ | |
| try: | |
| return self.gui.winfo_exists() | |
| except tk.TclError: | |
| return False | |
| def get_slider_values(self): | |
| """ | |
| Get the current values of all sliders. | |
| Returns: | |
| np.array: Array of slider values. | |
| """ | |
| return self.slider_values | |
| def get_values(self): | |
| """ | |
| Alias for get_slider_values. | |
| Returns: | |
| np.array: Array of slider values. | |
| """ | |
| return self.slider_values | |
| def set_slider_values(self,slider_values): | |
| """ | |
| Set the values of all sliders. | |
| Parameters: | |
| slider_values (list or np.array): New slider values. | |
| Returns: | |
| None | |
| """ | |
| self.slider_values = slider_values | |
| for slider,slider_value in zip(self.sliders,self.slider_values): | |
| slider.set(slider_value) | |
| def set_slider_value(self,slider_idx,slider_value): | |
| """ | |
| Set the value of a specific slider. | |
| Parameters: | |
| slider_idx (int): Index of the slider. | |
| slider_value (float): New value for the slider. | |
| Returns: | |
| None | |
| """ | |
| self.slider_values[slider_idx] = slider_value | |
| slider = self.sliders[slider_idx] | |
| slider.set(slider_value) | |
| def close(self): | |
| """ | |
| Close the slider GUI. | |
| Returns: | |
| None | |
| """ | |
| if self.is_window_exists(): | |
| # some loop | |
| for _ in range(100): self.update() # to avoid GIL-related error | |
| # Close | |
| self.gui.destroy() | |
| self.gui.quit() | |
| self.gui.update() | |
| class MultiSliderWidgetClass(QWidget): | |
| """ | |
| GUI class to create and manage multiple sliders using PyQt5. | |
| """ | |
| def __init__( | |
| self, | |
| n_slider = None, # to make it comaptible with 'MultiSliderClass' | |
| n_sliders = 5, | |
| title = 'PyQt5 MultiSlider', | |
| window_width = 500, | |
| window_height = None, | |
| x_offset = 100, | |
| y_offset = 100, | |
| label_width = 200, | |
| slider_width = 400, | |
| label_texts = None, | |
| slider_mins = None, | |
| slider_maxs = None, | |
| slider_vals = None, | |
| resolutions = None, | |
| max_change = 0.1, | |
| max_changes = None, | |
| fontsize = 10, | |
| verbose = True, | |
| ): | |
| """ | |
| Initialize the MultiSliderWidgetClass with the specified slider parameters. | |
| Parameters: | |
| n_slider (int): (Optional) Overrides n_sliders if provided. | |
| n_sliders (int): Number of sliders. | |
| title (str): Window title. | |
| window_width (int): Width of the window. | |
| window_height (int): Height of the window. | |
| x_offset (int): X offset for window placement. | |
| y_offset (int): Y offset for window placement. | |
| label_width (int): Width of the label. | |
| slider_width (int): Width of each slider. | |
| label_texts (list): List of texts for slider labels. | |
| slider_mins (list): List of minimum slider values. | |
| slider_maxs (list): List of maximum slider values. | |
| slider_vals (list): List of initial slider values. | |
| resolutions (list): List of resolutions for each slider. | |
| max_change (float): Maximum allowed change per update. | |
| max_changes (list): List of maximum changes for each slider. | |
| fontsize (int): Font size for labels. | |
| verbose (bool): If True, print status messages. | |
| """ | |
| super().__init__() | |
| if n_slider is not None: n_sliders = n_slider # override 'n_sliders' if 'n_slider' it not None | |
| self.n_sliders = n_sliders | |
| self.title = title | |
| self.window_width = window_width | |
| self.window_height = window_height | |
| self.x_offset = x_offset | |
| self.y_offset = y_offset | |
| self.label_width = label_width | |
| self.slider_width = slider_width | |
| if label_texts is None: | |
| self.label_texts = [f'Slider {i+1}' for i in range(n_sliders)] | |
| else: | |
| self.label_texts = label_texts | |
| self.slider_mins = slider_mins | |
| self.slider_maxs = slider_maxs | |
| self.slider_vals = slider_vals | |
| if resolutions is None: | |
| self.resolutions = [0.01]*self.n_sliders | |
| else: | |
| self.resolutions = resolutions | |
| self.max_change = max_change | |
| if max_changes is None: | |
| self.max_changes = [self.max_change]*self.n_sliders | |
| else: | |
| self.max_changes = max_changes | |
| self.fontsize = fontsize | |
| self.verbose = verbose | |
| self.sliders = [] | |
| self.labels_widgets = [] | |
| self.slider_values = self.slider_vals.copy() | |
| # Initialize UI | |
| self.init_ui() | |
| def init_ui(self): | |
| """ | |
| Initialize the user interface for the slider widget. | |
| Returns: | |
| None | |
| """ | |
| # Main layout | |
| main_layout = QVBoxLayout(self) | |
| # Make region scrollable | |
| scroll_area = QScrollArea(self) | |
| scroll_area.setWidgetResizable(True) | |
| # Widget | |
| scroll_widget = QWidget() | |
| scroll_layout = QGridLayout(scroll_widget) | |
| for i in range(self.n_sliders): | |
| # Slider | |
| slider = QSlider(Qt.Horizontal, self) | |
| # Resolution | |
| scale = 1 / self.resolutions[i] | |
| # Integer range | |
| int_min = int(self.slider_mins[i] * scale) | |
| int_max = int(self.slider_maxs[i] * scale) | |
| int_val = int(self.slider_vals[i] * scale) | |
| slider.setMinimum(int_min) | |
| slider.setMaximum(int_max) | |
| slider.setValue(int_val) | |
| slider.setSingleStep(1) | |
| # slider.valueChanged.connect(lambda value, idx=i, s=scale: self.value_changed(idx, value, s)) | |
| slider.sliderMoved.connect(lambda value, idx=i, s=scale: self.value_changed(idx, value, s)) | |
| # Slider label | |
| label = QLabel(f'{self.label_texts[i]}: {self.slider_vals[i]:.4f}', self) | |
| label.setFixedWidth(self.label_width) | |
| label.setStyleSheet(f"font-size: {self.fontsize}px;") | |
| self.sliders.append(slider) | |
| self.labels_widgets.append(label) | |
| scroll_layout.addWidget(label, i, 0) | |
| scroll_layout.addWidget(slider, i, 1) | |
| scroll_widget.setLayout(scroll_layout) | |
| scroll_area.setWidget(scroll_widget) | |
| # Scrollable area | |
| main_layout.addWidget(scroll_area) | |
| self.setLayout(main_layout) | |
| self.setWindowTitle(self.title) | |
| self.setGeometry(self.x_offset, self.y_offset, self.window_width, self.window_height) | |
| self.show() | |
| # Pause | |
| time.sleep(1.0) | |
| def value_changed(self, index, int_value, scale): | |
| """ | |
| Callback function when a slider's value changes. | |
| Parameters: | |
| index (int): Index of the slider. | |
| int_value (int): New integer value from the slider. | |
| scale (float): Scaling factor to convert integer value to float. | |
| Returns: | |
| None | |
| """ | |
| # Change integer to float value | |
| float_value = int_value / (scale+1e-6) | |
| # Change only if difference is less than max_change | |
| max_change = self.max_changes[index] | |
| if abs(float_value - self.slider_values[index]) <= max_change: # small changes | |
| self.slider_values[index] = float_value | |
| self.labels_widgets[index].setText(f'{self.label_texts[index]}: {float_value:.2f}') | |
| if self.verbose: | |
| print(f'Slider {index} Value: {float_value}') | |
| else: | |
| self.set_slider_value(slider_idx=index,slider_value=self.slider_values[index]) | |
| if self.verbose: | |
| print(f'Slider {index} Ignoring jitter (diff: {abs(float_value - self.slider_values[index]):.4f})') | |
| def get_slider_values(self): | |
| """ | |
| Get the current slider values. | |
| Returns: | |
| np.array: Array of current slider values. | |
| """ | |
| return self.slider_values | |
| def set_slider_values(self, slider_values): | |
| """ | |
| Set the values of all sliders. | |
| Parameters: | |
| slider_values (list or np.array): New slider values. | |
| Returns: | |
| None | |
| """ | |
| for i, val in enumerate(slider_values): | |
| scale = 1 / self.resolutions[i] | |
| int_val = int(val * scale) | |
| self.sliders[i].setValue(int_val) | |
| def set_slider_value(self, slider_idx, slider_value): | |
| """ | |
| Set the value of a specific slider. | |
| Parameters: | |
| slider_idx (int): Index of the slider. | |
| slider_value (float): New value for the slider. | |
| Returns: | |
| None | |
| """ | |
| scale = 1 / self.resolutions[slider_idx] | |
| int_val = int(slider_value * scale) | |
| self.sliders[slider_idx].setValue(int_val) | |
| def close(self): | |
| """ | |
| Close the slider widget. | |
| Returns: | |
| None | |
| """ | |
| super().close() | |
| class MultiSliderQtClass(QWidget): | |
| """ | |
| Multi sliders using Qt | |
| """ | |
| def __init__( | |
| self, | |
| title = None, | |
| name = 'Sliders', | |
| n_slider = 5, | |
| x_offset = 0, | |
| y_offset = 0, | |
| window_width = 500, | |
| window_height = 300, | |
| label_width = 100, | |
| slider_width = 400, | |
| label_texts = None, | |
| label_colors = None, | |
| slider_mins = None, | |
| slider_maxs = None, | |
| slider_steps = 1001, | |
| slider_vals = None, | |
| resolutions = None, | |
| max_changes = None, # not used, to make it compatible with 'MultiSliderClass' | |
| fontsize = 10, | |
| verbose = False, | |
| ): | |
| """ | |
| Initialize the MultiSliderQtClass with specified slider parameters. | |
| Parameters: | |
| title (str): Window title. | |
| name (str): Name of the widget. | |
| n_slider (int): Number of sliders. | |
| x_offset (int): X offset for window placement. | |
| y_offset (int): Y offset for window placement. | |
| window_width (int): Width of the window. | |
| window_height (int): Height of the window. | |
| label_width (int): Width of the label. | |
| slider_width (int): Width of each slider. | |
| label_texts (list): List of texts for slider labels. | |
| label_colors (list): List of colors for slider labels. | |
| slider_mins (list): List of minimum values for sliders. | |
| slider_maxs (list): List of maximum values for sliders. | |
| slider_steps (int): Number of steps for each slider. | |
| slider_vals (list): Initial slider values. | |
| resolutions (list): List of resolutions for each slider. | |
| fontsize (int): Font size for labels. | |
| verbose (bool): If True, print status messages. | |
| Returns: | |
| None | |
| """ | |
| super().__init__() | |
| if title is not None: name = title | |
| self.name = name | |
| self.n_slider = n_slider | |
| self.x_offset = x_offset | |
| self.y_offset = y_offset | |
| self.window_width = window_width | |
| self.window_height = window_height | |
| self.label_width = label_width | |
| self.slider_width = slider_width | |
| if label_texts is None: | |
| label_texts = list(np.arange(0,self.n_slider,1)) | |
| self.label_texts = label_texts | |
| self.color_map = { | |
| 'r': 'red', | |
| 'g': 'green', | |
| 'b': 'blue', | |
| 'c': 'cyan', | |
| 'm': 'magenta', | |
| 'y': 'yellow', | |
| 'k': 'black', | |
| 'w': 'white', | |
| } | |
| if label_colors is None: | |
| label_colors = ['w'] * self.n_slider # default to white | |
| self.label_colors = label_colors | |
| if slider_mins is None: | |
| slider_mins = (0.0*np.ones(self.n_slider)) # no need to be 'list' | |
| self.slider_mins = np.array(slider_mins) | |
| if slider_maxs is None: | |
| slider_maxs = (100.0*np.ones(self.n_slider)) | |
| self.slider_maxs = np.array(slider_maxs) | |
| if slider_vals is None: | |
| slider_vals = (0.0*np.ones(self.n_slider)) | |
| self.slider_vals = np.array(slider_vals) | |
| self.slider_steps = slider_steps | |
| if resolutions is None: | |
| resolutions = (self.slider_maxs-self.slider_mins)/self.slider_steps # default resolution | |
| resolutions = np.maximum(resolutions, 1e-6) # set the minimum to 1e-6 | |
| self.resolutions = np.array(resolutions) | |
| self.fontsize = fontsize | |
| self.verbose = verbose | |
| # Buffers | |
| self.sliders = [] | |
| self.labels = [] | |
| # Initialize user interface | |
| self.init_ui() | |
| def resolve_color(self, key: str) -> str: | |
| """ | |
| Resolve a color key to a valid color string. | |
| Parameters: | |
| key (str): Color key to resolve. | |
| Returns: | |
| str: Resolved color string. | |
| """ | |
| k = key.strip().lower() | |
| # 1) If the key is in the color map, return the mapped color | |
| if k in self.color_map: | |
| return self.color_map[k] | |
| # 2) If the key is a valid Qt color name, return it | |
| k2 = re.sub(r'[\s\-]', '', k) | |
| if QColor(k2).isValid(): | |
| return k2 | |
| # 3) If the key is a pure 6 or 8 digit hex, use it with a '#' | |
| if re.fullmatch(r'[0-9A-Fa-f]{6}', k): | |
| return f'#{k}' | |
| if re.fullmatch(r'[0-9A-Fa-f]{8}', k): | |
| return f'#{k}' | |
| # 4) If none of the above, return the key as is ('red', 'green', etc.) | |
| return key | |
| def init_ui(self): | |
| """ | |
| Initialize the user interface for the slider widget. | |
| Parameters: | |
| None | |
| Returns: | |
| None | |
| """ | |
| # Layout | |
| main_layout = QVBoxLayout(self) | |
| # Scrollable area | |
| scroll_area = QScrollArea(self) | |
| scroll_area.setWidgetResizable(True) | |
| scroll_widget = QWidget() | |
| scroll_layout = QGridLayout(scroll_widget) | |
| # Initialize sliders | |
| for s_idx in range(self.n_slider): # for each slider | |
| # Slider | |
| slider = QSlider(Qt.Horizontal,self) | |
| time.sleep(1e-6) | |
| # QtSlider values can only have integers | |
| resolution = self.resolutions[s_idx] | |
| slider_min = self.slider_mins[s_idx] | |
| slider_max = self.slider_maxs[s_idx] | |
| slider_val = self.slider_vals[s_idx] | |
| # Only integers are allowed | |
| slider.setMinimum(int(0)) # set minimum | |
| slider.setMaximum( # set maximum | |
| self.float_to_int( | |
| val_float = slider_max, | |
| min_val_float = slider_min, | |
| resolution = resolution, | |
| ) | |
| ) | |
| slider.setValue( # set value | |
| self.float_to_int( | |
| val_float = slider_val, | |
| min_val_float = slider_min, | |
| resolution = resolution, | |
| ) | |
| ) | |
| slider.setSingleStep(int(1)) | |
| # Slider value change callback | |
| slider.valueChanged.connect(lambda value,idx=s_idx:self.cb_value_changed(idx,value)) | |
| # Append slider | |
| self.sliders.append(slider) | |
| # Label | |
| label = QLabel(f'{self.label_texts[s_idx]}:{self.slider_vals[s_idx]:.2f}',self) | |
| label_color_key = self.label_colors[s_idx] | |
| # label_color = self.color_map.get(label_color_key, label_color_key) | |
| label_color = self.resolve_color(label_color_key) # resolve color key to a valid color string | |
| time.sleep(1e-6) | |
| label.setFixedWidth(self.label_width) | |
| # label.setStyleSheet(f"font-size: {self.fontsize}px;") | |
| label.setStyleSheet(f"color: {label_color}; font-size: {self.fontsize}px;") | |
| # Append label | |
| self.labels.append(label) | |
| # Add layout | |
| scroll_layout.addWidget(label,s_idx,0) | |
| scroll_layout.addWidget(slider,s_idx,1) | |
| # Add layout | |
| scroll_widget.setLayout(scroll_layout) | |
| scroll_area.setWidget(scroll_widget) | |
| # Set layout | |
| main_layout.addWidget(scroll_area) | |
| self.setLayout(main_layout) | |
| self.setWindowTitle(self.name) # set title | |
| self.setGeometry(self.x_offset,self.y_offset,self.window_width,self.window_height) # set size | |
| # Show | |
| self.show() | |
| time.sleep(1e-6) | |
| # (Optional) Resize | |
| # self.resize(self.width(),self.height()) | |
| def reset(self,n_reset=1): | |
| """ | |
| Reset the slider display. | |
| Parameters: | |
| n_reset (int): Number of resets to perform. | |
| Returns: | |
| None | |
| """ | |
| for _ in range(n_reset): | |
| self.resize(self.width(),self.height()) | |
| self.repaint() | |
| def cb_value_changed(self,idx,value): | |
| """ | |
| Callback function for slider value change events. | |
| Parameters: | |
| idx (int): Index of the slider. | |
| value (int): New slider value. | |
| Returns: | |
| None | |
| """ | |
| label = self.labels[idx] | |
| label_text = self.label_texts[idx] | |
| value_float = self.int_to_float( | |
| val_int = value, | |
| min_val_float = self.slider_mins[idx], | |
| resolution = self.resolutions[idx], | |
| ) | |
| label.setText(f'{label_text}:{value_float:.2f}') | |
| if self.verbose: | |
| print ("slider index:[%d] value:[%d] value_float:[%.2f]"% | |
| (idx,value,value_float)) | |
| def float_to_int(self,val_float,min_val_float,resolution): | |
| """ | |
| Convert a float value to an integer based on resolution. | |
| Parameters: | |
| val_float (float): Float value. | |
| min_val_float (float): Minimum float value. | |
| resolution (float): Resolution factor. | |
| Returns: | |
| int: Converted integer value. | |
| """ | |
| return int((val_float-min_val_float)/resolution) | |
| def int_to_float(self,val_int,min_val_float,resolution): | |
| """ | |
| Convert an integer value back to a float based on resolution. | |
| Parameters: | |
| val_int (int): Integer value. | |
| min_val_float (float): Minimum float value. | |
| resolution (float): Resolution factor. | |
| Returns: | |
| float: Converted float value. | |
| """ | |
| return val_int*resolution+min_val_float | |
| def get_values(self): | |
| """ | |
| Retrieve the current slider values as a NumPy array. | |
| Parameters: | |
| None | |
| Returns: | |
| np.array: Current slider values. | |
| """ | |
| values = np.zeros(self.n_slider) | |
| for s_idx in range(self.n_slider): | |
| slider = self.sliders[s_idx] | |
| values[s_idx] = self.int_to_float( | |
| val_int = slider.value(), | |
| min_val_float = self.slider_mins[s_idx], | |
| resolution = self.resolutions[s_idx], | |
| ) | |
| return values | |
| def set_value(self,s_idx,value): | |
| """ | |
| Set the value of a specific slider. | |
| Parameters: | |
| s_idx (int): Slider index. | |
| value (float): New slider value. | |
| Returns: | |
| None | |
| """ | |
| slider = self.sliders[s_idx] | |
| label = self.labels[s_idx] | |
| val_float = value | |
| val_int = self.float_to_int( | |
| val_float = val_float, | |
| min_val_float = self.slider_mins[s_idx], | |
| resolution = self.resolutions[s_idx], | |
| ) | |
| slider.setValue(val_int) # set slider value | |
| label_text = self.label_texts[s_idx] | |
| label.setText(f'{label_text}:{val_float:.2f}') | |
| def set_values(self,values): | |
| """ | |
| Set values for all sliders. | |
| Parameters: | |
| values (list or array): New slider values. | |
| Returns: | |
| None | |
| """ | |
| for s_idx in range(self.n_slider): | |
| self.set_value(s_idx=s_idx,value=values[s_idx]) | |
| def get_slider_values(self): | |
| """ | |
| Get slider values (compatible with MultiSliderClass). | |
| Parameters: | |
| None | |
| Returns: | |
| np.array: Slider values. | |
| """ | |
| return self.get_values() | |
| def set_slider_values(self,values): | |
| """ | |
| Set slider values (compatible with MultiSliderClass). | |
| Parameters: | |
| values (list or array): New slider values. | |
| Returns: | |
| None | |
| """ | |
| return self.set_values(values) | |
| def close(self): | |
| """ | |
| Close the slider widget. | |
| Parameters: | |
| None | |
| Returns: | |
| None | |
| """ | |
| super().close() | |
| def update(self): | |
| """ | |
| Dummy update function for compatibility with MultiSliderClass. | |
| Parameters: | |
| None | |
| Returns: | |
| None | |
| """ | |
| return | |
| class MultiRadioQtClass(QWidget): | |
| """ | |
| Multi radio buttons using Qt. | |
| """ | |
| def __init__( | |
| self, | |
| label_texts, | |
| option_texts, | |
| window_width = 500, | |
| window_height = 300, | |
| x_offset = 0, | |
| y_offset = 0, | |
| title = "Multi Radio Buttons", | |
| fontsize = 10, | |
| initial_texts = None, | |
| ): | |
| """ | |
| Initialize the MultiRadioQtClass with specified parameters. | |
| Parameters: | |
| label_texts (list of str): List of label texts for each row. | |
| option_texts (list of list of str): List of option texts for each row, aligned with label_texts. | |
| window_width (int): Width of the window. | |
| window_height (int): Height of the window. | |
| x_offset (int): Horizontal position of the window on screen. | |
| y_offset (int): Vertical position of the window on screen. | |
| title (str): Window title. | |
| fontsize (int): Font size for labels and buttons. | |
| initial_texts (list of str or None, optional): Initial label to select per row. | |
| """ | |
| super().__init__() | |
| self.label_texts = label_texts | |
| self.option_texts = option_texts | |
| self.n_row = len(option_texts) | |
| self.window_width = window_width | |
| self.window_height = window_height | |
| self.x_offset = x_offset | |
| self.y_offset = y_offset | |
| self.title = title | |
| self.fontsize = fontsize | |
| self.initial_texts = initial_texts if initial_texts else [None] * self.n_row | |
| self.button_groups = [] # QButtonGroup for each row | |
| self.init_ui() | |
| def init_ui(self): | |
| """ | |
| Initialize the user interface for the multi-radio button widget. | |
| """ | |
| main_layout = QVBoxLayout(self) | |
| scroll_area = QScrollArea(self) | |
| scroll_area.setWidgetResizable(True) | |
| scroll_widget = QWidget() | |
| scroll_layout = QGridLayout(scroll_widget) | |
| for row in range(self.n_row): | |
| label = QLabel(self.label_texts[row]) | |
| label.setStyleSheet(f"font-size: {self.fontsize}px;") | |
| scroll_layout.addWidget(label, row, 0) | |
| button_group = QButtonGroup(self) | |
| self.button_groups.append(button_group) | |
| for col, text in enumerate(self.option_texts[row]): | |
| radio = QRadioButton(text) | |
| radio.setStyleSheet(f"font-size: {self.fontsize}px;") | |
| button_group.addButton(radio, col) | |
| scroll_layout.addWidget(radio, row, col + 1) | |
| if self.initial_texts[row] == text: | |
| radio.setChecked(True) | |
| scroll_widget.setLayout(scroll_layout) | |
| scroll_area.setWidget(scroll_widget) | |
| main_layout.addWidget(scroll_area) | |
| self.setLayout(main_layout) | |
| self.setWindowTitle(self.title) | |
| self.setGeometry(self.x_offset, self.y_offset, self.window_width, self.window_height) | |
| self.show() | |
| def get_button_indices(self): | |
| """ | |
| Get the selected indices of radio buttons for each row. | |
| Returns: | |
| list of int: Selected index for each row, or -1 if none is selected. | |
| """ | |
| selected = [] | |
| for group in self.button_groups: | |
| checked = group.checkedId() | |
| selected.append(checked if checked >= 0 else -1) | |
| return selected | |
| def get_button_option(self, label_text): | |
| """ | |
| Get the selected option text for a specific row identified by label text. | |
| Parameters: | |
| label_text (str): The label identifying the row. | |
| Returns: | |
| str or None: The selected option text, or None if nothing is selected or label not found. | |
| """ | |
| for row, row_label in enumerate(self.label_texts): | |
| if row_label == label_text: | |
| checked_idx = self.button_groups[row].checkedId() | |
| if checked_idx >= 0: | |
| return self.option_texts[row][checked_idx] | |
| return None | |
| return None | |
| def set_button_option(self, label_text, option_text): | |
| """ | |
| Set a specific radio button as selected in the row with the given label. | |
| Parameters: | |
| label_text (str): The label identifying the row. | |
| option_text (str): The radio button text to be selected. | |
| Returns: | |
| bool: True if the option was successfully set, False otherwise. | |
| """ | |
| for row, row_label in enumerate(self.label_texts): | |
| if row_label == label_text: | |
| if option_text in self.option_texts[row]: | |
| col = self.option_texts[row].index(option_text) | |
| button = self.button_groups[row].button(col) | |
| if button: | |
| button.setChecked(True) | |
| return True | |
| else: | |
| print(f"[Warning] option_text '{option_text}' not found in row '{label_text}'.") | |
| return False | |
| print(f"[Warning] label_text '{label_text}' not found.") | |
| return False | |
| def finite_difference_matrix(n, dt, order): | |
| """ | |
| Compute a finite difference matrix for numerical differentiation. | |
| Parameters: | |
| n (int): Number of points. | |
| dt (float): Time interval between points. | |
| order (int): Derivative order (1: velocity, 2: acceleration, 3: jerk). | |
| Returns: | |
| np.array: Finite difference matrix scaled by dt^order. | |
| """ | |
| # Order | |
| if order == 1: # velocity | |
| coeffs = np.array([-1, 1]) | |
| elif order == 2: # acceleration | |
| coeffs = np.array([1, -2, 1]) | |
| elif order == 3: # jerk | |
| coeffs = np.array([-1, 3, -3, 1]) | |
| else: | |
| raise ValueError("Order must be 1, 2, or 3.") | |
| # Fill-in matrix | |
| mat = np.zeros((n, n)) | |
| for i in range(n - order): | |
| for j, c in enumerate(coeffs): | |
| mat[i, i + j] = c | |
| # (optional) Handling boundary conditions with backward differences | |
| if order == 1: # velocity | |
| mat[-1, -2:] = np.array([-1, 1]) # backward difference | |
| elif order == 2: # acceleration | |
| mat[-1, -3:] = np.array([1, -2, 1]) # backward difference | |
| mat[-2, -3:] = np.array([1, -2, 1]) # backward difference | |
| elif order == 3: # jerk | |
| mat[-1, -4:] = np.array([-1, 3, -3, 1]) # backward difference | |
| mat[-2, -4:] = np.array([-1, 3, -3, 1]) # backward difference | |
| mat[-3, -4:] = np.array([-1, 3, -3, 1]) # backward difference | |
| # Return | |
| return mat / (dt ** order) | |
| def get_A_vel_acc_jerk(n=100,dt=1e-2): | |
| """ | |
| Generate finite difference matrices for velocity, acceleration, and jerk. | |
| Parameters: | |
| n (int): Number of points. | |
| dt (float): Time interval. | |
| Returns: | |
| tuple: (A_vel, A_acc, A_jerk) matrices. | |
| """ | |
| A_vel = finite_difference_matrix(n,dt,order=1) | |
| A_acc = finite_difference_matrix(n,dt,order=2) | |
| A_jerk = finite_difference_matrix(n,dt,order=3) | |
| return A_vel,A_acc,A_jerk | |
| def diff_matrix_same_length(n: int, dt: float, order: int) -> np.ndarray: | |
| """ | |
| Build an n x n finite-difference operator for derivative of given order, | |
| using forward/backward stencils at the boundaries and central stencils inside. | |
| Parameters | |
| ---------- | |
| n : int | |
| Number of samples. | |
| dt : float | |
| Sampling interval (> 0). | |
| order : int | |
| Derivative order: 1 (velocity), 2 (acceleration), 3 (jerk). | |
| Returns | |
| ------- | |
| D : (n, n) np.ndarray | |
| Same-length finite-difference matrix so that y = D @ x has length n. | |
| Boundary rows use one-sided stencils; interior rows use central stencils. | |
| """ | |
| if n <= 0: | |
| raise ValueError("n must be positive.") | |
| if dt <= 0: | |
| raise ValueError("dt must be > 0.") | |
| if order not in (1, 2, 3): | |
| raise ValueError("order must be 1, 2, or 3.") | |
| D = np.zeros((n, n), dtype=float) | |
| if order == 1: | |
| # Row 0: forward difference [-1, 1]/dt | |
| if n >= 2: | |
| D[0, 0] = -1.0/dt | |
| D[0, 1] = 1.0/dt | |
| else: | |
| # Degenerate (n==1): use zero | |
| return D | |
| # Rows 1..n-2: central difference [-1/2, 0, 1/2]/dt | |
| for i in range(1, n-1): | |
| D[i, i-1] = -0.5/dt | |
| D[i, i+1] = 0.5/dt | |
| # Row n-1: backward difference [-1, 1]/dt applied at (n-2, n-1) | |
| if n >= 2: | |
| D[n-1, n-2] = -1.0/dt | |
| D[n-1, n-1] = 1.0/dt | |
| elif order == 2: | |
| # Row 0: 2nd-order-accurate forward second derivative [2, -5, 4, -1]/dt^2 | |
| if n >= 4: | |
| s = 1.0/(dt**2) | |
| D[0, 0] = 2*s | |
| D[0, 1] = -5*s | |
| D[0, 2] = 4*s | |
| D[0, 3] = -1*s | |
| elif n >= 3: | |
| # Fallback to simple forward [1, -2, 1]/dt^2 | |
| s = 1.0/(dt**2) | |
| D[0, 0] = 1*s | |
| D[0, 1] = -2*s | |
| D[0, 2] = 1*s | |
| # Rows 1..n-2: central [1, -2, 1]/dt^2 | |
| s = 1.0/(dt**2) | |
| for i in range(1, n-1): | |
| D[i, i-1] = 1*s | |
| D[i, i] = -2*s | |
| D[i, i+1] = 1*s | |
| # Row n-1: 2nd-order-accurate backward second derivative [-1, 4, -5, 2]/dt^2 | |
| if n >= 4: | |
| D[n-1, n-4] = -1*s | |
| D[n-1, n-3] = 4*s | |
| D[n-1, n-2] = -5*s | |
| D[n-1, n-1] = 2*s | |
| elif n >= 3: | |
| # Fallback to simple backward [1, -2, 1]/dt^2 anchored at end | |
| D[n-1, n-3] = 1*s | |
| D[n-1, n-2] = -2*s | |
| D[n-1, n-1] = 1*s | |
| else: # order == 3 | |
| # Central 3rd-derivative stencil: [1, -2, 0, 2, -1]/(2 h^3) | |
| s_c = 1.0/(2*dt**3) | |
| # Forward 3rd-derivative: [-1, 3, -3, 1]/h^3 at the start | |
| s_f = 1.0/(dt**3) | |
| # Backward 3rd-derivative: [ -1, 3, -3, 1 ] reversed with sign → [1, -3, 3, -1]/h^3 | |
| s_b = 1.0/(dt**3) | |
| if n >= 4: | |
| # Row 0: forward | |
| D[0, 0] = -1*s_f | |
| D[0, 1] = 3*s_f | |
| D[0, 2] = -3*s_f | |
| D[0, 3] = 1*s_f | |
| elif n >= 3: | |
| # Fallback: use smaller-width forward if needed (less accurate) | |
| D[0, 0] = -1/(dt**3) | |
| D[0, 1] = 3/(dt**3) | |
| D[0, 2] = -2/(dt**3) # crude fallback | |
| # Rows 2..n-3: central (need i-2..i+2 in-bounds) | |
| for i in range(2, n-2): | |
| D[i, i-2] = 1*s_c | |
| D[i, i-1] = -2*s_c | |
| D[i, i+1] = 2*s_c | |
| D[i, i+2] = -1*s_c | |
| # Row 1: still near the start → use forward again | |
| if n >= 4: | |
| D[1, 1] = -1*s_f | |
| D[1, 2] = 3*s_f | |
| D[1, 3] = -3*s_f | |
| D[1, 4 if n>4 else 3] += 1*s_f # guard if n==4 | |
| if n == 4: | |
| # collapse the last coefficient into idx=3 if n==4 | |
| pass | |
| # Row n-2: near the end → use backward | |
| if n >= 4: | |
| i = n-2 | |
| D[i, i-3] = 1*s_b | |
| D[i, i-2] = -3*s_b | |
| D[i, i-1] = 3*s_b | |
| D[i, i] = -1*s_b | |
| # Row n-1: last row → backward | |
| if n >= 4: | |
| i = n-1 | |
| D[i, i-3] = 1*s_b | |
| D[i, i-2] = -3*s_b | |
| D[i, i-1] = 3*s_b | |
| D[i, i] = -1*s_b | |
| return D | |
| def get_A_vel_acc_jerk_same_length(n=100,dt=1e-2): | |
| """ | |
| Generate finite difference matrices for velocity, acceleration, and jerk. | |
| Parameters: | |
| n (int): Number of points. | |
| dt (float): Time interval. | |
| Returns: | |
| tuple: (A_vel, A_acc, A_jerk) matrices. | |
| """ | |
| A_vel = diff_matrix_same_length(n,dt,order=1) | |
| A_acc = diff_matrix_same_length(n,dt,order=2) | |
| A_jerk = diff_matrix_same_length(n,dt,order=3) | |
| return A_vel,A_acc,A_jerk | |
| def smooth_optm_1d( | |
| traj, | |
| dt = 0.1, | |
| x_init = None, | |
| x_final = None, | |
| vel_init = None, | |
| vel_final = None, | |
| x_lower = None, | |
| x_upper = None, | |
| vel_limit = None, | |
| acc_limit = None, | |
| jerk_limit = None, | |
| idxs_remain = None, | |
| vals_remain = None, | |
| p_norm = 2, | |
| verbose = True, | |
| ): | |
| """ | |
| Perform 1-D smoothing of a trajectory using optimization. | |
| Parameters: | |
| traj (array): Original trajectory. | |
| dt (float): Time interval. | |
| x_init (float, optional): Initial position. | |
| x_final (float, optional): Final position. | |
| vel_init (float, optional): Initial velocity. | |
| vel_final (float, optional): Final velocity. | |
| x_lower (float, optional): Lower position bound. | |
| x_upper (float, optional): Upper position bound. | |
| vel_limit (float, optional): Maximum velocity. | |
| acc_limit (float, optional): Maximum acceleration. | |
| jerk_limit (float, optional): Maximum jerk. | |
| idxs_remain (array, optional): Fixed indices. | |
| vals_remain (array, optional): Values at fixed indices. | |
| p_norm (int): Norm degree for objective. | |
| verbose (bool): Verbosity flag. | |
| Returns: | |
| np.array: Smoothed trajectory. | |
| """ | |
| n = len(traj) | |
| A_pos = np.eye(n,n) | |
| A_vel,A_acc,A_jerk = get_A_vel_acc_jerk(n=n,dt=dt) | |
| # Objective | |
| x = cp.Variable(n) | |
| objective = cp.Minimize(cp.norm(x-traj,p_norm)) | |
| # Equality constraints | |
| A_list,b_list = [],[] | |
| if x_init is not None: | |
| A_list.append(A_pos[0,:]) | |
| b_list.append(x_init) | |
| if x_final is not None: | |
| A_list.append(A_pos[-1,:]) | |
| b_list.append(x_final) | |
| if vel_init is not None: | |
| A_list.append(A_vel[0,:]) | |
| b_list.append(vel_init) | |
| if vel_final is not None: | |
| A_list.append(A_vel[-1,:]) | |
| b_list.append(vel_final) | |
| if idxs_remain is not None: | |
| A_list.append(A_pos[idxs_remain,:]) | |
| if vals_remain is not None: | |
| b_list.append(vals_remain) | |
| else: | |
| b_list.append(traj[idxs_remain]) | |
| # Inequality constraints | |
| C_list,d_list = [],[] | |
| if x_lower is not None: | |
| C_list.append(-A_pos) | |
| d_list.append(-x_lower*np.ones(n)) | |
| if x_upper is not None: | |
| C_list.append(A_pos) | |
| d_list.append(x_upper*np.ones(n)) | |
| if vel_limit is not None: | |
| C_list.append(A_vel) | |
| C_list.append(-A_vel) | |
| d_list.append(vel_limit*np.ones(n)) | |
| d_list.append(vel_limit*np.ones(n)) | |
| if acc_limit is not None: | |
| C_list.append(A_acc) | |
| C_list.append(-A_acc) | |
| d_list.append(acc_limit*np.ones(n)) | |
| d_list.append(acc_limit*np.ones(n)) | |
| if jerk_limit is not None: | |
| C_list.append(A_jerk) | |
| C_list.append(-A_jerk) | |
| d_list.append(jerk_limit*np.ones(n)) | |
| d_list.append(jerk_limit*np.ones(n)) | |
| constraints = [] | |
| if A_list: | |
| A = np.vstack(A_list) | |
| b = np.hstack(b_list).squeeze() | |
| constraints.append(A @ x == b) | |
| if C_list: | |
| C = np.vstack(C_list) | |
| d = np.hstack(d_list).squeeze() | |
| constraints.append(C @ x <= d) | |
| # Solve | |
| prob = cp.Problem(objective, constraints) | |
| prob.solve(solver=cp.CLARABEL) | |
| # Return | |
| traj_smt = x.value | |
| # Null check | |
| if traj_smt is None and verbose: | |
| print ("[smooth_optm_1d] Optimization failed.") | |
| return traj_smt | |
| def smooth_gaussian_1d(traj,sigma=5.0,mode='nearest',radius=5): | |
| """ | |
| Smooth a 1-D trajectory using a Gaussian filter. | |
| Parameters: | |
| traj (array): Input trajectory. | |
| sigma (float): Gaussian standard deviation. | |
| mode (str): Filter mode. | |
| radius (int): Radius for filtering. | |
| Returns: | |
| np.array: Smoothed trajectory. | |
| """ | |
| from scipy.ndimage import gaussian_filter1d | |
| traj_smt = gaussian_filter1d( | |
| input = traj, | |
| sigma = sigma, | |
| mode = 'nearest', | |
| radius = int(radius), | |
| ) | |
| return traj_smt | |
| def plot_traj_vel_acc_jerk( | |
| t, | |
| traj, | |
| traj_smt = None, | |
| figsize = (6,6), | |
| title = 'Trajectory', | |
| ): | |
| """ | |
| Plot trajectory, velocity, acceleration, and jerk. | |
| Parameters: | |
| t (array): Time vector. | |
| traj (array): Original trajectory. | |
| traj_smt (array, optional): Smoothed trajectory. | |
| figsize (tuple): Figure size. | |
| title (str): Plot title. | |
| Returns: | |
| None | |
| """ | |
| n = len(t) | |
| dt = t[1]-t[0] | |
| # Compute velocity, acceleration, and jerk | |
| A_vel,A_acc,A_jerk = get_A_vel_acc_jerk(n=n,dt=dt) | |
| vel = A_vel @ traj | |
| acc = A_acc @ traj | |
| jerk = A_jerk @ traj | |
| if traj_smt is not None: | |
| vel_smt = A_vel @ traj_smt | |
| acc_smt = A_acc @ traj_smt | |
| jerk_smt = A_jerk @ traj_smt | |
| # Plot | |
| plt.figure(figsize=figsize) | |
| plt.subplot(4, 1, 1) | |
| plt.plot(t,traj,'.-',ms=1,color='k',lw=1/5,label='Trajectory') | |
| if traj_smt is not None: | |
| plt.plot(t,traj_smt,'.-',ms=1,color='r',lw=1/5,label='Smoothed Trajectory') | |
| plt.legend(fontsize=8,loc='upper right') | |
| plt.subplot(4, 1, 2) | |
| plt.plot(t,vel,'.-',ms=1,color='k',lw=1/5,label='Velocity') | |
| if traj_smt is not None: | |
| plt.plot(t,vel_smt,'.-',ms=1,color='r',lw=1/5,label='Smoothed Velocity') | |
| plt.legend(fontsize=8,loc='upper right') | |
| plt.subplot(4, 1, 3) | |
| plt.plot(t,acc,'.-',ms=1,color='k',lw=1/5,label='Acceleration') | |
| if traj_smt is not None: | |
| plt.plot(t,acc_smt,'.-',ms=1,color='r',lw=1/5,label='Smoothed Acceleration') | |
| plt.legend(fontsize=8,loc='upper right') | |
| plt.subplot(4, 1, 4) | |
| plt.plot(t,jerk,'.-',ms=1,color='k',lw=1/5,label='Jerk') | |
| if traj_smt is not None: | |
| plt.plot(t,jerk_smt,'.-',ms=1,color='r',lw=1/5,label='Smoothed Jerk') | |
| plt.legend(fontsize=8,loc='upper right') | |
| plt.suptitle(title,fontsize=10) | |
| plt.subplots_adjust(hspace=0.2,top=0.95) | |
| plt.show() | |
| def kernel_se(X1,X2,hyp={'g':1,'l':1,'w':0}): | |
| """ | |
| Compute the squared exponential (SE) kernel. | |
| Parameters: | |
| X1 (array): First input set. | |
| X2 (array): Second input set. | |
| hyp (dict): Kernel hyperparameters. | |
| Returns: | |
| np.array: Kernel matrix. | |
| """ | |
| if len(X1.shape) == 1: | |
| X1_use = X1.reshape(-1,1) | |
| else: | |
| X1_use = X1 | |
| if len(X2.shape) == 1: | |
| X2_use = X2.reshape(-1,1) | |
| else: | |
| X2_use = X2 | |
| K = hyp['g']*hyp['g']*np.exp(-cdist(X1_use,X2_use,'sqeuclidean')/(2*hyp['l']*hyp['l'])) | |
| if (hyp['w']>0) and (K.shape[0]==K.shape[1]): | |
| K = K + hyp['w']*np.eye(K.shape[0]) # add diag | |
| return K | |
| def kernel_levse(X1,X2,L1,L2,hyp={'g':1,'l':1}): | |
| """ | |
| Compute the leveraged squared exponential (SE) kernel. | |
| Parameters: | |
| X1 (array): First input set. | |
| X2 (array): Second input set. | |
| L1 (array): Leverage values for X1. | |
| L2 (array): Leverage values for X2. | |
| hyp (dict): Kernel hyperparameters. | |
| Returns: | |
| np.array: Leveraged kernel matrix. | |
| """ | |
| K = hyp['g']*np.exp(-cdist(X1,X2,'sqeuclidean')/(2*hyp['l']*hyp['l'])) | |
| L = np.cos(np.pi/2.0*cdist(L1,L2,'cityblock')) | |
| return np.multiply(K,L) | |
| def safe_chol(A,upper=False,max_iter=100,eps=1e-20,verbose=False): | |
| """ | |
| Perform safe Cholesky decomposition with jitter. | |
| Parameters: | |
| A (np.array): Matrix to decompose. | |
| max_iter (int): Maximum iterations. | |
| eps (float): Initial epsilon value. | |
| verbose (bool): Verbosity flag. | |
| Returns: | |
| np.array or None: Cholesky factor or None if failed. | |
| """ | |
| A_use = A.copy() | |
| for iter in range(max_iter): | |
| try: | |
| L = np.linalg.cholesky(A_use,upper=upper) | |
| if verbose: | |
| print ("[safe_chol] Cholesky succeeded. iter:[%d] eps:[%.2e]"%(iter,eps)) | |
| return L | |
| except np.linalg.LinAlgError: | |
| A_use = A_use + eps*np.eye(A.shape[0]) | |
| eps *= 10 | |
| print ("[safe_chol] Cholesky failed. iter:[%d] eps:[%.2e]"%(iter,eps)) | |
| return None | |
| def soft_squash(x,x_min=-1,x_max=+1,margin=0.1): | |
| """ | |
| Apply soft squashing to constrain array values. | |
| Parameters: | |
| x (np.array): Input array. | |
| x_min (float): Minimum value. | |
| x_max (float): Maximum value. | |
| margin (float): Margin for squashing. | |
| Returns: | |
| np.array: Squashed array. | |
| """ | |
| def th(z,m=0.0): | |
| # thresholding function | |
| return (m)*(np.exp(2/m*z)-1)/(np.exp(2/m*z)+1) | |
| x_in = np.copy(x) | |
| idxs_upper = np.where(x_in>(x_max-margin)) | |
| x_in[idxs_upper] = th(x_in[idxs_upper]-(x_max-margin),m=margin) + (x_max-margin) | |
| idxs_lower = np.where(x_in<(x_min+margin)) | |
| x_in[idxs_lower] = th(x_in[idxs_lower]-(x_min+margin),m=margin) + (x_min+margin) | |
| return x_in | |
| def soft_squash_multidim( | |
| x = np.random.randn(100,5), | |
| x_min = -np.ones(5), | |
| x_max = np.ones(5), | |
| margin = 0.1, | |
| ): | |
| """ | |
| Apply soft squashing dimension-wise for multi-dimensional arrays. | |
| Parameters: | |
| x (np.array): Input array. | |
| x_min (array): Minimum values per dimension. | |
| x_max (array): Maximum values per dimension. | |
| margin (float): Margin for squashing. | |
| Returns: | |
| np.array: Squashed multi-dimensional array. | |
| """ | |
| x_squash = np.copy(x) | |
| dim = x.shape[1] | |
| for d_idx in range(dim): | |
| x_squash[:,d_idx] = soft_squash( | |
| x=x[:,d_idx],x_min=x_min[d_idx],x_max=x_max[d_idx],margin=margin) | |
| return x_squash | |
| def get_idxs_closest_ndarray(ndarray_query,ndarray_domain): | |
| """ | |
| Get indices of closest elements in ndarray_domain for each query element. | |
| Parameters: | |
| ndarray_query (array): Query array. | |
| ndarray_domain (array): Domain array. | |
| Returns: | |
| list: Indices of closest matches. | |
| """ | |
| return [np.argmin(np.abs(ndarray_query-x)) for x in ndarray_domain] | |
| def get_interp_const_vel_traj_nd( | |
| anchors, # [L x D] | |
| vel = 1.0, | |
| HZ = 100, | |
| ord = np.inf, | |
| ): | |
| """ | |
| Generate a linearly interpolated (with constant velocity) trajectory. | |
| Parameters: | |
| anchors (array): Anchor points [L x D]. | |
| vel (float): Constant velocity. | |
| HZ (int): Sampling frequency. | |
| ord (float): Norm order for distance. | |
| Returns: | |
| tuple: (times_interp, anchors_interp, times_anchor, idxs_anchor) | |
| """ | |
| L = anchors.shape[0] | |
| D = anchors.shape[1] | |
| dists = np.zeros(L) | |
| for tick in range(L): | |
| if tick > 0: | |
| p_prev,p_curr = anchors[tick-1,:],anchors[tick,:] | |
| dists[tick] = np.linalg.norm(p_prev-p_curr,ord=ord) | |
| times_anchor = np.cumsum(dists/vel) # [L] | |
| L_interp = int(times_anchor[-1]*HZ) | |
| times_interp = np.linspace(0,times_anchor[-1],L_interp) # [L_interp] | |
| anchors_interp = np.zeros((L_interp,D)) # [L_interp x D] | |
| for d_idx in range(D): # for each dim | |
| anchors_interp[:,d_idx] = np.interp(times_interp,times_anchor,anchors[:,d_idx]) | |
| idxs_anchor = get_idxs_closest_ndarray(times_interp,times_anchor) | |
| return times_interp,anchors_interp,times_anchor,idxs_anchor | |
| def get_smoothed_equidistant_xy_traj( | |
| xy_traj_raw, # [L x 2] | |
| inter_dist = 0.1, # inter distance (m) | |
| acc_limit = 10, | |
| ): | |
| """ | |
| Obtain a smoothed, equidistant XY trajectory. | |
| Parameters: | |
| xy_traj_raw (array): Raw XY trajectory [L x 2]. | |
| inter_dist (float): Interpolation distance. | |
| acc_limit (float): Acceleration limit. | |
| Returns: | |
| np.array: Smoothed XY trajectory. | |
| """ | |
| D = xy_traj_raw.shape[1] # dim=2 | |
| HZ = int(1.0/inter_dist) | |
| # Interpolate the trajectory | |
| _,xy_traj_interp,_,_ = get_interp_const_vel_traj_nd( | |
| anchors = xy_traj_raw, | |
| vel = 1.0, | |
| HZ = HZ, | |
| ) # interpolate the trajectory with constant velocity | |
| # Smooth the trajectory | |
| xy_traj_smt = np.zeros_like(xy_traj_interp) # [L x 2] | |
| for d_idx in range(D): # smooth interpolated trajectory | |
| traj = xy_traj_interp[:,d_idx] | |
| xy_traj_smt[:,d_idx] = smooth_optm_1d( | |
| traj, | |
| dt = 1/HZ, | |
| x_init = traj[0], | |
| x_final = traj[-1], | |
| acc_limit = acc_limit, # smoothing acceleration limit | |
| ) | |
| # Return | |
| return xy_traj_smt | |
| def interpolate_and_smooth_nd( | |
| anchors, # List or [N x D] | |
| HZ = 50, | |
| vel_init = 0.0, | |
| vel_final = 0.0, | |
| x_lowers = None, # [D] | |
| x_uppers = None, # [D] | |
| vel_limit = None, # [1] | |
| acc_limit = None, # [1] | |
| jerk_limit = None, # [1] | |
| vel_interp_max = np.deg2rad(180), | |
| vel_interp_min = np.deg2rad(10), | |
| n_interp = 10, | |
| verbose = False, | |
| ): | |
| """ | |
| Interpolate and smooth multi-dimensional anchor points. | |
| Parameters: | |
| anchors (array or list): Anchor points [N x D]. | |
| HZ (int): Sampling frequency. | |
| vel_init (float): Initial velocity. | |
| vel_final (float): Final velocity. | |
| x_lowers (array, optional): Lower bounds. | |
| x_uppers (array, optional): Upper bounds. | |
| vel_limit (float, optional): Velocity limit. | |
| acc_limit (float, optional): Acceleration limit. | |
| jerk_limit (float, optional): Jerk limit. | |
| vel_interp_max (float): Maximum interpolation velocity. | |
| vel_interp_min (float): Minimum interpolation velocity. | |
| n_interp (int): Number of interpolation steps. | |
| verbose (bool): Verbosity flag. | |
| Returns: | |
| tuple: (times, traj_interp, traj_smt, times_anchor) | |
| """ | |
| if isinstance(anchors, list): | |
| # If 'anchors' is given as a list, make it an ndarray | |
| anchors = np.vstack(anchors) | |
| D = anchors.shape[1] | |
| vels = np.linspace(start=vel_interp_max,stop=vel_interp_min,num=n_interp) | |
| for v_idx,vel_interp in enumerate(vels): | |
| # First, interploate | |
| times,traj_interp,times_anchor,idxs_anchor = get_interp_const_vel_traj_nd( | |
| anchors = anchors, | |
| vel = vel_interp, | |
| HZ = HZ, | |
| ) | |
| dt = times[1] - times[0] | |
| # Second, smooth | |
| traj_smt = np.zeros_like(traj_interp) | |
| is_success = True | |
| for d_idx in range(D): | |
| traj_d = traj_interp[:,d_idx] | |
| if x_lowers is not None: x_lower_d = x_lowers[d_idx] | |
| else: x_lower_d = None | |
| if x_uppers is not None: x_upper_d = x_uppers[d_idx] | |
| else: x_upper_d = None | |
| traj_smt_d = smooth_optm_1d( | |
| traj = traj_d, | |
| dt = dt, | |
| idxs_remain = idxs_anchor, | |
| vals_remain = anchors[:,d_idx], | |
| vel_init = vel_init, | |
| vel_final = vel_final, | |
| x_lower = x_lower_d, | |
| x_upper = x_upper_d, | |
| vel_limit = vel_limit, | |
| acc_limit = acc_limit, | |
| jerk_limit = jerk_limit, | |
| p_norm = 2, | |
| verbose = False, | |
| ) | |
| if traj_smt_d is None: | |
| is_success = False | |
| break | |
| # Append | |
| traj_smt[:,d_idx] = traj_smt_d | |
| # Check success | |
| if is_success: | |
| if verbose: | |
| print ("Optimization succeeded. vel_interp:[%.3f]"%(vel_interp)) | |
| return times,traj_interp,traj_smt,times_anchor | |
| else: | |
| if verbose: | |
| print (" v_idx:[%d/%d] vel_interp:[%.2f] failed."%(v_idx,n_interp,vel_interp)) | |
| # Optimization failed | |
| if verbose: | |
| print ("Optimization failed.") | |
| return times,traj_interp,traj_smt,times_anchor | |
| def check_vel_acc_jerk_nd( | |
| times, # [L] | |
| traj, # [L x D] | |
| verbose = True, | |
| factor = 1.0, | |
| ): | |
| """ | |
| Check velocity, acceleration, and jerk of an n-dimensional trajectory. | |
| Parameters: | |
| times (array): Time vector. | |
| traj (array): Trajectory [L x D]. | |
| verbose (bool): Verbosity flag. | |
| factor (float): Scaling factor for display. | |
| Returns: | |
| tuple: (vel_inits, vel_finals, max_vels, max_accs, max_jerks) | |
| """ | |
| L,D = traj.shape[0],traj.shape[1] | |
| A_vel,A_acc,A_jerk = get_A_vel_acc_jerk(n=len(times),dt=times[1]-times[0]) | |
| vel_inits,vel_finals,max_vels,max_accs,max_jerks = [],[],[],[],[] | |
| for d_idx in range(D): | |
| traj_d = traj[:,d_idx] | |
| vel = A_vel @ traj_d | |
| acc = A_acc @ traj_d | |
| jerk = A_jerk @ traj_d | |
| vel_inits.append(vel[0]) | |
| vel_finals.append(vel[-1]) | |
| max_vels.append(np.abs(vel).max()) | |
| max_accs.append(np.abs(acc).max()) | |
| max_jerks.append(np.abs(jerk).max()) | |
| if verbose: | |
| print ("Checking velocity, acceleration, and jerk of a L:[%d]xD:[%d] trajectory (factor:[%.2f])."% | |
| (L,D,factor)) | |
| for d_idx in range(D): | |
| print (" dim:[%d/%d]: v_init:[%.2e] v_final:[%.2e] v_max:[%.2f] a_max:[%.2f] j_max:[%.2f]"% | |
| (d_idx,D, | |
| factor*vel_inits[d_idx],factor*vel_finals[d_idx], | |
| factor*max_vels[d_idx],factor*max_accs[d_idx],factor*max_jerks[d_idx]) | |
| ) | |
| # Return | |
| return vel_inits,vel_finals,max_vels,max_accs,max_jerks | |
| def animate_chains_slider( | |
| env, | |
| secs, | |
| chains, | |
| transparent = True, | |
| black_sky = True, | |
| r_link = 0.005, | |
| rgba_link = (0.05,0.05,0.05,0.9), | |
| plot_joint = True, | |
| plot_joint_axis = True, | |
| plot_joint_sphere = False, | |
| plot_joint_name = False, | |
| axis_len_joint = 0.05, | |
| axis_width_joint = 0.005, | |
| plot_rev_axis = True, | |
| ): | |
| """ | |
| Animate a sequence of chains with a slider interface. | |
| Parameters: | |
| env: Simulation environment. | |
| secs (array): Time stamps. | |
| chains (list): List of chain objects. | |
| transparent (bool): Use transparent viewer background. | |
| black_sky (bool): Use black sky background. | |
| r_link (float): Link radius. | |
| rgba_link (tuple): RGBA color for links. | |
| plot_joint (bool): Plot joints. | |
| plot_joint_axis (bool): Plot joint axes. | |
| plot_joint_sphere (bool): Plot joint spheres. | |
| plot_joint_name (bool): Display joint names. | |
| axis_len_joint (float): Length of joint axes. | |
| axis_width_joint (float): Width of joint axes. | |
| plot_rev_axis (bool): Plot reverse axes. | |
| Returns: | |
| None | |
| """ | |
| # Reset | |
| env.reset(step=True) | |
| # Initialize slider | |
| L = len(secs) | |
| sliders = MultiSliderClass( | |
| n_slider = 2, | |
| title = 'Slider Tick', | |
| window_width = 900, | |
| window_height = 100, | |
| x_offset = 100, | |
| y_offset = 100, | |
| slider_width = 600, | |
| label_texts = ['tick','mode (0:play,1:slider,2:reverse)'], | |
| slider_mins = [0,0], | |
| slider_maxs = [L-1,2], | |
| slider_vals = [0,1.0], | |
| resolutions = [0.1,1.0], | |
| verbose = False, | |
| ) | |
| # Loop | |
| env.init_viewer( | |
| transparent = transparent, | |
| black_sky = black_sky, | |
| ) | |
| tick,mode = 0,'slider' # 'play' / 'slider' | |
| while env.is_viewer_alive(): | |
| # Update | |
| env.increase_tick() | |
| sliders.update() # update slider | |
| chain = chains[tick] | |
| sec = secs[tick] | |
| # Mode change | |
| if sliders.get_slider_values()[1] == 0.0: mode = 'play' | |
| elif sliders.get_slider_values()[1] == 1.0: mode = 'slider' | |
| elif sliders.get_slider_values()[1] == 2.0: mode = 'reverse' | |
| # Render | |
| if env.loop_every(tick_every=20) or (mode=='play') or (mode=='reverse'): | |
| chain.plot_chain_mujoco( | |
| env, | |
| r_link = r_link, | |
| rgba_link = rgba_link, | |
| plot_joint = plot_joint, | |
| plot_joint_axis = plot_joint_axis, | |
| plot_joint_sphere = plot_joint_sphere, | |
| plot_joint_name = plot_joint_name, | |
| axis_len_joint = axis_len_joint, | |
| axis_width_joint = axis_width_joint, | |
| plot_rev_axis = plot_rev_axis, | |
| ) | |
| env.plot_T(p=np.array([0,0,0]),R=np.eye(3,3)) | |
| # env.plot_time(p=np.array([0,0,1]),post_str=' mode:[%s]'%(mode)) | |
| env.plot_text( | |
| p = np.array([0,0,1]), | |
| label = '[%d] tick:[%d] time:[%.2f]sec mode:[%s]'%(env.tick,tick,sec,mode) | |
| ) | |
| env.render() | |
| # Proceed | |
| if mode == 'play': | |
| if tick < len(chains)-1: tick = tick + 1 | |
| sliders.set_slider_value(slider_idx=0,slider_value=tick) | |
| elif mode == 'slider': | |
| tick = int(sliders.get_slider_values()[0]) | |
| elif mode == 'reverse': | |
| if tick > 0: tick = tick - 1 | |
| sliders.set_slider_value(slider_idx=0,slider_value=tick) | |
| # Close viewer and slider | |
| env.close_viewer() | |
| sliders.close() | |
| def animate_chains_slider_qt( | |
| env, | |
| chains, | |
| secs, | |
| ): | |
| """ | |
| Animate a sequence of chains with a slider interface (Qt). | |
| """ | |
| env.reset() | |
| env.init_viewer(height=env.monitor_height,transparent=True) | |
| L = len(chains) | |
| sliders = MultiSliderQtClass( | |
| n_slider = 1, | |
| window_width = 500, | |
| window_height = 100, | |
| slider_width = 300, | |
| label_width = 150, | |
| label_texts = ['Tick'], | |
| label_colors = ['w'], | |
| slider_mins = np.array([0]), | |
| slider_maxs = np.array([L-1]), | |
| slider_vals = np.array([0,0,0,0,0,0]), | |
| verbose = False, | |
| ) | |
| # Loop | |
| while env.is_viewer_alive(): | |
| tick = int(sliders.get_values()[0]) | |
| chains[tick].plot_chain_mujoco(env) | |
| env.viewer_text_overlay('Tick','%d'%(tick)) | |
| env.viewer_text_overlay('Time','%.2fsec'%(secs[tick])) | |
| env.plot_joint_axis() | |
| env.render() | |
| # Close viewer and slider | |
| env.close_viewer() | |
| sliders.close() | |
| def animate_env_qpos_list( | |
| env, | |
| secs, | |
| qpos_list, | |
| viewer_title = '', | |
| plot_contact_info = True, | |
| transparent = True, | |
| black_sky = True, | |
| ): | |
| """ | |
| Animate the environment using a list of configuration positions. | |
| Parameters: | |
| env: Simulation environment. | |
| secs (array): Time stamps. | |
| qpos_list (list): List of qpos configurations. | |
| viewer_title (str): Viewer window title. | |
| plot_contact_info (bool): Plot contact information. | |
| transparent (bool): Use transparent viewer background. | |
| black_sky (bool): Use black sky background. | |
| Returns: | |
| None | |
| """ | |
| # Reset | |
| env.reset(step=True) | |
| # Initialize slider | |
| L = len(secs) | |
| sliders = MultiSliderClass( | |
| n_slider = 2, | |
| title = 'Slider Tick', | |
| window_width = 900, | |
| window_height = 100, | |
| x_offset = 100, | |
| y_offset = 100, | |
| slider_width = 600, | |
| label_texts = ['tick','mode (0:play,1:slider,2:reverse)'], | |
| slider_mins = [0,0], | |
| slider_maxs = [L-1,2], | |
| slider_vals = [0,1.0], | |
| resolutions = [0.1,1.0], | |
| verbose = False, | |
| ) | |
| # Loop | |
| env.init_viewer( | |
| transparent = transparent, | |
| title = viewer_title, | |
| black_sky = black_sky, | |
| ) | |
| tick,mode = 0,'slider' # 'play' / 'slider' | |
| while env.is_viewer_alive(): | |
| # Update | |
| # env.increase_tick() | |
| sliders.update() # update slider | |
| qpos = qpos_list[tick] | |
| env.forward(q=qpos) | |
| sec = secs[tick] | |
| # Mode change | |
| if sliders.get_slider_values()[1] == 0.0: mode = 'play' | |
| elif sliders.get_slider_values()[1] == 1.0: mode = 'slider' | |
| elif sliders.get_slider_values()[1] == 2.0: mode = 'reverse' | |
| # Render | |
| if env.loop_every(tick_every=20) or (mode=='play') or (mode=='reverse'): | |
| env.plot_T(p=np.array([0,0,0]),R=np.eye(3,3)) | |
| env.viewer.add_overlay(loc='bottom left',text1='tick',text2='%d'%(env.tick)) | |
| env.viewer.add_overlay(loc='bottom left',text1='sim time (sec)',text2='%.2f'%(sec)) | |
| env.viewer.add_overlay(loc='bottom left',text1='mode',text2='%s'%(mode)) | |
| if plot_contact_info: | |
| env.plot_contact_info() | |
| env.render() | |
| # Proceed | |
| if mode == 'play': | |
| if tick < len(qpos_list)-1: tick = tick + 1 | |
| sliders.set_slider_value(slider_idx=0,slider_value=tick) | |
| if tick == (L-1): mode = 'slider' | |
| elif mode == 'slider': | |
| tick = int(sliders.get_slider_values()[0]) | |
| elif mode == 'reverse': | |
| if tick > 0: tick = tick - 1 | |
| sliders.set_slider_value(slider_idx=0,slider_value=tick) | |
| # Close viewer and slider | |
| env.close_viewer() | |
| sliders.close() | |
| def np_uv(vec): | |
| """ | |
| Compute the unit vector of the given vector. | |
| Parameters: | |
| vec (array): Input vector. | |
| Returns: | |
| np.array: Unit vector. | |
| """ | |
| x = np.array(vec) | |
| len = np.linalg.norm(x) | |
| if len <= 1e-6: | |
| return np.array([0,0,1]) # if zero vector, return default unit vector | |
| else: | |
| return x/len | |
| def uv_T_joi(T_joi,joi_fr,joi_to): | |
| """ | |
| Compute the unit vector between two JOI poses. | |
| Parameters: | |
| T_joi (array): Transformation matrices. | |
| joi_fr: Starting index. | |
| joi_to: Ending index. | |
| Returns: | |
| np.array: Unit vector from joi_fr to joi_to. | |
| """ | |
| return np_uv(t2p(T_joi[joi_to]) - t2p(T_joi[joi_fr])) | |
| def len_T_joi(T_joi,joi_fr,joi_to): | |
| """ | |
| Compute the Euclidean distance between two JOI poses. | |
| Parameters: | |
| T_joi (array): Transformation matrices. | |
| joi_fr: Starting index. | |
| joi_to: Ending index. | |
| Returns: | |
| float: Distance between the poses. | |
| """ | |
| return np.linalg.norm(t2p(T_joi[joi_to]) - t2p(T_joi[joi_fr])) | |
| def get_consecutive_subarrays(array,min_element=1): | |
| """ | |
| Extract consecutive subarrays from an array. | |
| Parameters: | |
| array (array): Input array. | |
| min_element (int): Minimum number of elements per subarray. | |
| Returns: | |
| list: List of consecutive subarrays. | |
| """ | |
| split_points = np.where(np.diff(array) != 1)[0] + 1 | |
| subarrays = np.split(array,split_points) | |
| return [subarray for subarray in subarrays if len(subarray) >= min_element] | |
| def load_image(image_path): | |
| """ | |
| Load an image from a file and return it as a NumPy array. | |
| Parameters: | |
| image_path (str): Path to the image file. | |
| Returns: | |
| np.array: Loaded image in uint8 format. | |
| """ | |
| return np.array(Image.open(image_path)) | |
| def imshows(img_list,title_list,figsize=(8,2),fontsize=8): | |
| """ | |
| Display multiple images in a row with titles. | |
| Parameters: | |
| img_list (list): List of images. | |
| title_list (list): List of titles. | |
| figsize (tuple): Figure size. | |
| fontsize (int): Font size for titles. | |
| Returns: | |
| None | |
| """ | |
| n_img = len(img_list) | |
| plt.figure(figsize=(8,2)) | |
| for img_idx in range(n_img): | |
| img = img_list[img_idx] | |
| title = title_list[img_idx] | |
| plt.subplot(1,n_img,img_idx+1) | |
| plt.imshow(img) | |
| plt.axis('off') | |
| plt.title(title,fontsize=fontsize) | |
| plt.show() | |
| def depth_to_gray_img(depth,max_val=10.0): | |
| """ | |
| Convert a 1-channel float depth image to a 3-channel uint8 gray image. | |
| Parameters: | |
| depth (array): Input depth image. | |
| max_val (float): Maximum value for normalization. | |
| Returns: | |
| np.array: 3-channel gray image. | |
| """ | |
| depth_clip = np.clip(depth,a_min=0.0,a_max=max_val) # float-type | |
| # Normalize depth image | |
| img = np.tile(255*depth_clip[:,:,np.newaxis]/depth_clip.max(),(1,1,3)).astype(np.uint8) # unit8-type | |
| return img | |
| def get_monitor_size(): | |
| """ | |
| Retrieve the current monitor size. | |
| Parameters: | |
| None | |
| Returns: | |
| tuple: (width, height) | |
| """ | |
| w,h = pyautogui.size() | |
| return w,h | |
| def get_xml_string_from_path(xml_path): | |
| """ | |
| Parse an XML file and return its content as a string. | |
| Parameters: | |
| xml_path (str): Path to the XML file. | |
| Returns: | |
| str: XML content. | |
| """ | |
| # Parse the XML file | |
| tree = ET.parse(xml_path) | |
| # Get the root element of the XML | |
| root = tree.getroot() | |
| # Convert the ElementTree object to a string | |
| xml_string = ET.tostring(root, encoding='unicode', method='xml') | |
| return xml_string | |
| def printmd(string): | |
| """ | |
| Display a markdown formatted string. | |
| Parameters: | |
| string (str): Markdown string. | |
| Returns: | |
| None | |
| """ | |
| display(Markdown(string)) | |
| def prettify(elem): | |
| """ | |
| Return a pretty-printed XML string for the given element. | |
| Parameters: | |
| elem: XML element. | |
| Returns: | |
| str: Pretty XML string. | |
| """ | |
| rough_string = ET.tostring(elem, 'utf-8') | |
| reparsed = minidom.parseString(rough_string) | |
| pretty_xml = reparsed.toprettyxml(indent=" ") | |
| # Remove empty lines | |
| lines = [line for line in pretty_xml.splitlines() if line.strip()] | |
| return "\n".join(lines) | |
| class TicTocClass(object): | |
| """ | |
| Utility class for measuring elapsed time. | |
| Usage: | |
| tt = TicTocClass() | |
| tt.tic() | |
| ~~ | |
| tt.toc() | |
| """ | |
| def __init__(self,name='tictoc',print_every=1): | |
| """ | |
| Initialize the TicTocClass. | |
| Parameters: | |
| name (str): Name identifier. | |
| print_every (int): Frequency for printing elapsed time. | |
| Returns: | |
| None | |
| """ | |
| self.name = name | |
| self.time_start = time.time() | |
| self.time_end = time.time() | |
| self.print_every = print_every | |
| self.time_elapsed = 0.0 | |
| self.cnt = 0 | |
| def tic(self): | |
| """ | |
| Record the current time as the start time. | |
| Parameters: | |
| None | |
| Returns: | |
| None | |
| """ | |
| self.time_start = time.time() | |
| def toc(self,str=None,cnt=None,print_every=None,verbose=False): | |
| """ | |
| Compute and print the elapsed time since the last tic. | |
| Parameters: | |
| str (str, optional): Custom label. | |
| cnt (int, optional): Current count for periodic printing. | |
| print_every (int, optional): Print frequency. | |
| verbose (bool): Verbosity flag. | |
| Returns: | |
| float: Elapsed time. | |
| """ | |
| self.time_end = time.time() | |
| self.time_elapsed = self.time_end - self.time_start | |
| if print_every is not None: | |
| self.print_every = print_every | |
| if verbose: | |
| if self.time_elapsed <1.0: | |
| time_show = self.time_elapsed*1000.0 | |
| time_unit = 'ms' | |
| elif self.time_elapsed <60.0: | |
| time_show = self.time_elapsed | |
| time_unit = 's' | |
| else: | |
| time_show = self.time_elapsed/60.0 | |
| time_unit = 'min' | |
| if cnt is not None: self.cnt = cnt | |
| if (self.cnt % self.print_every) == 0: | |
| if str is None: | |
| print ("%s Elapsed time:[%.2f]%s"% | |
| (self.name,time_show,time_unit)) | |
| else: | |
| print ("%s Elapsed time:[%.2f]%s"% | |
| (str,time_show,time_unit)) | |
| self.cnt = self.cnt + 1 | |
| # Return | |
| return self.time_elapsed | |
| def sleep(sec): | |
| """ | |
| Pause execution for a specified number of seconds. | |
| Parameters: | |
| sec (float): Number of seconds to sleep. | |
| Returns: | |
| None | |
| """ | |
| time.sleep(sec) | |
| class OccupancyGridMapClass: | |
| """ | |
| Occupancy grid map for environment representation. | |
| """ | |
| def __init__( | |
| self, | |
| x_range, | |
| y_range, | |
| resolution, | |
| verbose = False, | |
| ): | |
| """ | |
| Initialize the occupancy grid map. | |
| Parameters: | |
| x_range (tuple): X-axis range. | |
| y_range (tuple): Y-axis range. | |
| resolution (float): Grid resolution. | |
| verbose (bool): Verbosity flag. | |
| Returns: | |
| None | |
| """ | |
| self.x_range = x_range | |
| self.y_range = y_range | |
| self.resolution = resolution | |
| self.verbose = verbose | |
| self.width = int((x_range[1] - x_range[0]) / resolution) | |
| self.height = int((y_range[1] - y_range[0]) / resolution) | |
| self.extent = [self.y_range[0], self.y_range[1], self.x_range[1], self.x_range[0]] | |
| # Reset | |
| self.reset() | |
| self.reset_local_grid() | |
| if self.verbose: | |
| print ("grid:[%s]"%(self.grid.shape,)) | |
| def reset(self): | |
| """ | |
| Reset the occupancy grid to all zeros. | |
| Parameters: | |
| None | |
| Returns: | |
| None | |
| """ | |
| self.grid = np.zeros((self.height,self.width), dtype=np.int8) | |
| def reset_local_grid(self): | |
| """ | |
| Reset the local occupancy grid. | |
| Parameters: | |
| None | |
| Returns: | |
| None | |
| """ | |
| self.local_grid = np.zeros((self.height,self.width), dtype=np.int8) | |
| def xy_to_grid(self,x,y): | |
| """ | |
| Convert (x,y) coordinates to grid indices. | |
| Parameters: | |
| x (float): X-coordinate. | |
| y (float): Y-coordinate. | |
| Returns: | |
| tuple: (i, j) grid indices. | |
| """ | |
| i = int((y - self.y_range[0]) / self.resolution) | |
| j = int((x - self.x_range[0]) / self.resolution) | |
| return i,j | |
| def grid_to_xy(self,i,j): | |
| """ | |
| Convert grid indices to (x,y) coordinates. | |
| Parameters: | |
| i (int): Row index. | |
| j (int): Column index. | |
| Returns: | |
| tuple: (x, y) coordinates. | |
| """ | |
| y = i * self.resolution + self.y_range[0] | |
| x = j * self.resolution + self.x_range[0] | |
| return x,y | |
| def batch_xy_to_grid(self,xy_list): | |
| """ | |
| Batch convert (x,y) pairs to grid indices. | |
| Parameters: | |
| xy_list (array): List or array of (x,y) pairs. | |
| Returns: | |
| tuple: Arrays of grid indices. | |
| """ | |
| xy_array = np.asarray(xy_list) | |
| x,y = xy_array[..., 0], xy_array[..., 1] # [L] or [N x L] | |
| i = ((y - self.y_range[0]) / self.resolution).astype(int) | |
| j = ((x - self.x_range[0]) / self.resolution).astype(int) | |
| return i,j | |
| def check_point_occupancy(self,x,y,use_margin=True,use_local_grid=False): | |
| """ | |
| Check if a point is occupied in the grid. | |
| Parameters: | |
| x (float): X-coordinate. | |
| y (float): Y-coordinate. | |
| use_margin (bool): Whether to use margin. | |
| use_local_grid (bool): Whether to check the local grid. | |
| Returns: | |
| int: Occupancy value. | |
| """ | |
| i,j = self.xy_to_grid(x=x,y=y) | |
| if use_margin: | |
| occupied = self.grid_with_margin[i,j] | |
| else: | |
| occupied = self.grid[i,j] | |
| if use_local_grid: # (optional) use local grid | |
| if use_margin: # margin-added local grid | |
| occupied = max(occupied,self.local_grid_with_margin[i,j]) | |
| else: # local grid without margin | |
| occupied = max(occupied,self.local_grid[i,j]) | |
| return occupied | |
| def batch_check_point_occupancy(self,xy_list,use_margin=True,use_local_grid=False): | |
| """ | |
| Batch check occupancy for multiple points. | |
| Parameters: | |
| xy_list (array): Array of (x,y) pairs. | |
| use_margin (bool): Whether to use margin. | |
| use_local_grid (bool): Whether to check the local grid. | |
| Returns: | |
| np.array: Occupancy values. | |
| """ | |
| xy_array = np.asarray(xy_list) | |
| shape = xy_array.shape[:-1] # [L] or [N,L] | |
| i,j = self.batch_xy_to_grid(xy_list=xy_array) | |
| i = np.clip(i, 0, self.grid.shape[0] - 1) # Limit row indices | |
| j = np.clip(j, 0, self.grid.shape[1] - 1) # Limit column indices | |
| if use_margin: | |
| occupied = self.grid_with_margin[i,j] | |
| else: | |
| occupied = self.grid[i,j] | |
| if use_local_grid: # (optional) use local grid | |
| if use_margin: # margin-added local grid | |
| occupied = np.maximum(occupied,self.local_grid_with_margin[i,j]) | |
| else: # local grid without margin | |
| occupied = np.maximum(occupied,self.local_grid[i,j]) | |
| # Return | |
| return occupied.reshape(shape) # [L] or [N x L] | |
| def sample_feasible_xy(self,reduce_rate=None,use_margin=True): | |
| """ | |
| Sample a feasible (x,y) point from the grid. | |
| Parameters: | |
| reduce_rate (float, optional): Reduction rate for sampling. | |
| use_margin (bool): Whether to use margin. | |
| Returns: | |
| np.array: Sampled (x,y) point. | |
| """ | |
| while True: | |
| x_min,x_max = self.x_range[0],self.x_range[1] | |
| y_min,y_max = self.y_range[0],self.y_range[1] | |
| x_span,y_span = x_max-x_min,y_max-y_min | |
| if reduce_rate is not None: # reduce_rate=1.0: no reduce / 0.0: no span | |
| x_min = x_min + 0.5*(1-reduce_rate)*x_span | |
| y_min = y_min + 0.5*(1-reduce_rate)*y_span | |
| x_span = reduce_rate*x_span | |
| y_span = reduce_rate*y_span | |
| x_rand = x_min + x_span*np.random.rand() | |
| y_rand = y_min + y_span*np.random.rand() | |
| if not self.check_point_occupancy(x=x_rand,y=y_rand,use_margin=use_margin): | |
| break | |
| # Return | |
| return np.array([x_rand,y_rand]) | |
| def check_line_occupancy(self, x_fr, y_fr, x_to, y_to, step_size=0.1, use_margin=True): | |
| """ | |
| Check if the line between two points is free of obstacles. | |
| Parameters: | |
| x_fr (float): Starting x-coordinate. | |
| y_fr (float): Starting y-coordinate. | |
| x_to (float): Ending x-coordinate. | |
| y_to (float): Ending y-coordinate. | |
| step_size (float): Step size for checking. | |
| use_margin (bool): Whether to use margin. | |
| Returns: | |
| int: 1 if occupied, 0 if free. | |
| """ | |
| distance = np.hypot(x_to-x_fr,y_to-y_fr) | |
| num_steps = int(distance/step_size) | |
| # Generate points along the line | |
| x_values = np.linspace(x_fr,x_to,num_steps) | |
| y_values = np.linspace(y_fr,y_to,num_steps) | |
| # Check each point's occupancy | |
| for x, y in zip(x_values,y_values): | |
| if self.check_point_occupancy(x,y,use_margin=use_margin): | |
| return 1 # occupied | |
| return 0 # free | |
| def update_grid(self,xy_scan): | |
| """ | |
| Update the global occupancy grid using scan data. | |
| Parameters: | |
| xy_scan (list): List of (x,y) scan points. | |
| Returns: | |
| None | |
| """ | |
| for (x,y) in xy_scan: | |
| i, j = self.xy_to_grid(x,y) | |
| if 0 <= i < self.height and 0 <= j < self.width: | |
| self.grid[i, j] = 1 | |
| def update_local_grid(self,xy_scan,reset_first=True,add_margin=False): | |
| """ | |
| Update the local occupancy grid with scan data. | |
| Parameters: | |
| xy_scan (list): List of (x,y) points. | |
| reset_first (bool): Whether to reset the local grid first. | |
| add_margin (bool): Whether to add margin. | |
| Returns: | |
| None | |
| """ | |
| if reset_first: | |
| self.reset_local_grid() | |
| for (x,y) in xy_scan: | |
| i, j = self.xy_to_grid(x,y) | |
| if 0 <= i < self.height and 0 <= j < self.width: | |
| self.local_grid[i, j] = 1 | |
| # Add margin | |
| if add_margin: | |
| self.add_margin_to_local_grid(margin=self.safety_margin) | |
| def add_margin(self,margin): | |
| """ | |
| Add margin around occupied cells in the global grid. | |
| Parameters: | |
| margin (float): Margin distance. | |
| Returns: | |
| None | |
| """ | |
| # Set mask | |
| margin_cells = int(margin / self.resolution) | |
| mask = np.zeros((2 * margin_cells + 1, 2 * margin_cells + 1), dtype=bool) | |
| y, x = np.ogrid[-margin_cells:margin_cells+1, -margin_cells:margin_cells+1] | |
| mask[y**2 + x**2 <= margin_cells**2] = True | |
| # Add mask to grid | |
| self.grid_with_margin = binary_dilation(self.grid,structure=mask).astype(np.int8) | |
| def add_margin_to_local_grid(self,margin): | |
| """ | |
| Add margin around occupied cells in the local grid. | |
| Parameters: | |
| margin (float): Margin distance. | |
| Returns: | |
| None | |
| """ | |
| # Set mask | |
| margin_cells = int(margin / self.resolution) | |
| mask = np.zeros((2 * margin_cells + 1, 2 * margin_cells + 1), dtype=bool) | |
| y, x = np.ogrid[-margin_cells:margin_cells+1, -margin_cells:margin_cells+1] | |
| mask[y**2 + x**2 <= margin_cells**2] = True | |
| # Add mask to grid | |
| self.local_grid_with_margin = binary_dilation(self.local_grid,structure=mask).astype(np.int8) | |
| def plot( | |
| self, | |
| grid = None, | |
| figsize = (6,4), | |
| title_str = 'Occupancy Grid Map', | |
| no_show = False, | |
| return_ax = False, | |
| ): | |
| """ | |
| Plot the occupancy grid map. | |
| Parameters: | |
| grid (array, optional): Grid to plot. | |
| figsize (tuple): Figure size. | |
| title_str (str): Plot title. | |
| no_show (bool): Do not display immediately. | |
| return_ax (bool): Return axis object. | |
| Returns: | |
| None | |
| """ | |
| if grid is None: grid = self.grid | |
| plt.figure(figsize=figsize) | |
| self.ax = plt.axes() | |
| plt.imshow(grid.T,cmap="gray", origin="upper",extent=self.extent) # transpose the map (swap X and Y axis) | |
| plt.grid(color='w', linestyle='-', linewidth=0.2) | |
| plt.gca().set_xticks(np.arange(self.y_range[0],self.y_range[1],self.resolution),minor=True) | |
| plt.gca().set_yticks(np.arange(self.x_range[0],self.x_range[1],self.resolution),minor=True) | |
| plt.gca().grid(which='minor',color='w', linestyle='-',linewidth=0.1) | |
| plt.xlabel("Y (m)",fontsize=8);plt.ylabel("X (m)",fontsize=8);plt.title(title_str,fontsize=10) | |
| if not no_show: | |
| plt.show() | |
| def plot_point( | |
| self, | |
| p = np.array([0,0,]), | |
| marker = 'o', | |
| ms = 10, | |
| mec = 'r', | |
| mew = 2, | |
| mfc = 'none', | |
| label = None, | |
| fs = 8, | |
| p_offset = 0.2, | |
| no_show = True, | |
| ): | |
| """ | |
| Plot a single point on the grid. | |
| Parameters: | |
| p (np.array): Point coordinates. | |
| marker (str): Marker style. | |
| ms (int): Marker size. | |
| mec (str): Marker edge color. | |
| mew (int): Marker edge width. | |
| mfc (str): Marker face color. | |
| label (str, optional): Label text. | |
| fs (int): Font size. | |
| p_offset (float): Label offset. | |
| no_show (bool): Do not display immediately. | |
| Returns: | |
| None | |
| """ | |
| plt.plot(p[1],p[0],marker=marker,ms=ms,mec=mec,mew=mew,mfc=mfc) | |
| if label is not None: | |
| plt.text( | |
| p_offset+p[1],p[0],label,va='center',fontsize=fs, | |
| bbox=dict(fc='white',alpha=0.5,ec='k') | |
| ) | |
| if not no_show: | |
| plt.show() | |
| def plot_points( | |
| self, | |
| ps = np.zeros((10,2)), | |
| marker = 'o', | |
| ms = 3, | |
| mfc = 'none', | |
| mec = 'k', | |
| mew = 1/2, | |
| ls = 'none', | |
| lw = 0.5, | |
| color = 'b', | |
| no_show = True, | |
| ): | |
| """ | |
| Plot multiple points on the grid. | |
| Parameters: | |
| ps (array): Array of points. | |
| marker (str): Marker style. | |
| ms (int): Marker size. | |
| mfc (str): Marker face color. | |
| mec (str): Marker edge color. | |
| mew (float): Marker edge width. | |
| ls (str): Line style. | |
| lw (float): Line width. | |
| color (str): Color for points. | |
| no_show (bool): Do not display immediately. | |
| Returns: | |
| None | |
| """ | |
| plt.plot(ps[:,1],ps[:,0], | |
| marker=marker,ms=ms,mec=mec,mew=mew,mfc=mfc, | |
| ls=ls,lw=lw,color=color) # swap x and y | |
| if not no_show: | |
| plt.show() | |
| def plot_rrt_results( | |
| self, | |
| rrt, | |
| figsize = (8,6), | |
| title_str = 'RRT Results', | |
| edge_color = (1,1,0,1), | |
| edge_lw = 1/4, | |
| node_color = (1,1,0,0.5), | |
| node_ec = (0,0,0,1), | |
| node_ms = 3, | |
| path_color = (0,0,1,1), | |
| path_lw = 1, | |
| path_ms = 3, | |
| save_png = False, | |
| png_path = '', | |
| verbose = True, | |
| ): | |
| """ | |
| Plot RRT results on the occupancy grid map. | |
| Parameters: | |
| rrt: RRT object with nodes and edges. | |
| figsize (tuple): Figure size. | |
| title_str (str): Plot title. | |
| edge_color (tuple): Edge color. | |
| edge_lw (float): Edge line width. | |
| node_color (tuple): Node color. | |
| node_ec (tuple): Node edge color. | |
| node_ms (int): Node marker size. | |
| path_color (tuple): Path color. | |
| path_lw (float): Path line width. | |
| path_ms (int): Path marker size. | |
| save_png (bool): Save plot as PNG. | |
| png_path (str): File path for PNG. | |
| verbose (bool): Verbosity flag. | |
| Returns: | |
| None | |
| """ | |
| # Plot occupancy grid map | |
| self.plot( | |
| grid = self.grid_with_margin, | |
| figsize = figsize, | |
| title_str = title_str, | |
| no_show = True, | |
| return_ax = True, | |
| ) | |
| # Get RRT nodes | |
| p_nodes = np.array([rrt.get_node_point(node) for node in rrt.get_nodes()]) # [N x 2] | |
| p_nodes_swap = p_nodes[:,[1,0]] # X-Y swapped | |
| # Plot RRT edges | |
| edgelist = list(rrt.get_edges()) | |
| edge_pos = np.asarray( | |
| [(p_nodes_swap[e[0]],p_nodes_swap[e[1]]) for e in edgelist]) # [M x 2 x 2] | |
| edge_collection = mpl.collections.LineCollection( | |
| edge_pos,colors=edge_color[:3],linewidths=edge_lw,alpha=edge_color[3]) | |
| self.ax.add_collection(edge_collection) | |
| # Plot RRT nodes | |
| self.plot_points(ps=p_nodes,mfc=node_color,mec=node_ec,ms=node_ms) | |
| # Path to goal | |
| path_to_goal,node_list = rrt.get_path_to_goal() | |
| if path_to_goal is not None: | |
| plt.plot(path_to_goal[:,1],path_to_goal[:,0],'o-', | |
| color=path_color,lw=path_lw,mec=path_color,ms=path_ms,mfc='none') | |
| # Plot root and goal | |
| self.plot_point(p=rrt.point_root,mec='r',label='Start') | |
| self.plot_point(p=rrt.point_goal,mec='b',label='Final') | |
| # Show | |
| if save_png: | |
| if verbose: | |
| print ("[%s] saved."%(png_path)) | |
| directory = os.path.dirname(png_path) | |
| if not os.path.exists(directory): | |
| os.makedirs(directory) | |
| if verbose: | |
| print ("[%s] generated."%(directory)) | |
| plt.savefig(png_path, bbox_inches='tight') | |
| plt.close() | |
| else: | |
| # Show | |
| plt.show() | |
| def get_rgb_array_from_grid( | |
| self, | |
| grid = None, | |
| grid_local = None, | |
| figsize = (8,6), | |
| xy = None, | |
| marker = 'o', | |
| ms = 10, | |
| mec = 'r', | |
| mew = 2, | |
| mfc = 'none', | |
| label = None, | |
| fs = 8, | |
| p_offset = 0.2, | |
| no_show = True, | |
| ): | |
| """ | |
| Generate an RGB image array from the occupancy grid. | |
| Parameters: | |
| grid (array, optional): Global grid. | |
| grid_local (array, optional): Local grid. | |
| figsize (tuple): Figure size. | |
| xy (np.array, optional): Point to overlay. | |
| marker (str): Marker style. | |
| ms (int): Marker size. | |
| mec (str): Marker edge color. | |
| mew (int): Marker edge width. | |
| mfc (str): Marker face color. | |
| label (str, optional): Label text. | |
| fs (int): Font size. | |
| p_offset (float): Label offset. | |
| no_show (bool): Do not display immediately. | |
| Returns: | |
| np.array: RGB image array. | |
| """ | |
| fig,ax = plt.subplots(figsize=figsize) | |
| # Initialize 'color_array'. Note that both 'grid' and 'grid_local' are transposed | |
| color_array = np.zeros((grid.shape[1], grid.shape[0], 3), dtype=np.uint8) | |
| color_array[grid.T == 1] = [255,255,255] # white | |
| if grid_local is not None: | |
| color_array[(grid_local.T==1) & (grid.T==0)] = [0,255,0] # only local occpupied: green | |
| color_array[(grid_local.T==1) & (grid.T==1)] = [255,255,0] # both local and global occupied: yellow | |
| # Show image | |
| ax.imshow(color_array, origin='upper', extent=self.extent) | |
| # Plot point | |
| if xy is not None: | |
| self.plot_point( | |
| p = xy, | |
| marker = marker, | |
| ms = ms, | |
| mec = mec, | |
| mew = mew, | |
| mfc = mfc, | |
| label = label, | |
| fs = fs, | |
| p_offset = p_offset, | |
| no_show = no_show, | |
| ) | |
| plt.axis('off') # remove axis | |
| # Buffer | |
| buf = BytesIO() | |
| fig.savefig(buf, format='png', bbox_inches='tight', pad_inches=0) | |
| buf.seek(0) | |
| # RGBA array | |
| rgba_array = np.frombuffer(buf.getvalue(),dtype=np.uint8) | |
| rgba_array = plt.imread(buf, format='png') | |
| rgb_array = rgba_array[..., :3] # remove alpha channel | |
| rgb_array = 255*rgb_array.astype(np.int32) | |
| # Close | |
| plt.close(fig) | |
| return rgb_array | |
| def get_xy_meshgrid(x_range,y_range,res): | |
| """ | |
| Generate a meshgrid of (x,y) pairs given ranges and resolution. | |
| Parameters: | |
| x_range (tuple): X-axis range. | |
| y_range (tuple): Y-axis range. | |
| res (float): Resolution. | |
| Returns: | |
| np.array: Array of (x,y) pairs. | |
| """ | |
| x_values = np.arange(x_range[0], x_range[1] + res, res) | |
| y_values = np.arange(y_range[0], y_range[1] + res, res) | |
| xx, yy = np.meshgrid(x_values, y_values) | |
| xy_meshgrid = np.column_stack([xx.ravel(), yy.ravel()]) | |
| return xy_meshgrid | |
| def rot_mtx(deg): | |
| """ | |
| Return a 2x2 rotation matrix for the given angle in degrees. | |
| Parameters: | |
| deg (float): Angle in degrees. | |
| Returns: | |
| np.array: 2x2 rotation matrix. | |
| """ | |
| theta = np.radians(deg) | |
| c, s = np.cos(theta), np.sin(theta) | |
| R = np.array(((c, -s), (s, c))) | |
| return R | |
| def is_point_in_polygon(point,polygon): | |
| """ | |
| Check if a point is inside a polygon. | |
| Parameters: | |
| point (np.array or Point): Point to check. | |
| polygon: Polygon object. | |
| Returns: | |
| bool: True if inside, False otherwise. | |
| """ | |
| if isinstance(point,np.ndarray): | |
| point_check = Point(point) | |
| else: | |
| point_check = point | |
| return sp.contains(polygon,point_check) | |
| def is_point_feasible(point,obs_list): | |
| """ | |
| Determine if a point is feasible with respect to obstacles. | |
| Parameters: | |
| point (np.array): Point coordinates. | |
| obs_list: List of obstacles. | |
| Returns: | |
| bool: True if feasible, False otherwise. | |
| """ | |
| result = is_point_in_polygon(point,obs_list) # is the point inside each obstacle? | |
| if sum(result) == 0: | |
| return True | |
| else: | |
| return False | |
| def is_point_to_point_connectable(point1,point2,obs_list): | |
| """ | |
| Check if a straight line between two points is free of obstacles. | |
| Parameters: | |
| point1 (np.array): Start point. | |
| point2 (np.array): End point. | |
| obs_list: List of obstacles. | |
| Returns: | |
| bool: True if connectable, False otherwise. | |
| """ | |
| result = sp.intersects(LineString([point1,point2]),obs_list) | |
| if sum(result) == 0: | |
| return True | |
| else: | |
| return False | |
| def get_ogm_from_env( | |
| env, | |
| rf_body_name = 'body_lidar', | |
| rf_sensor_names = [], | |
| x_range = (-5,5), | |
| y_range = (-5,5), | |
| scan_resolution = 1.0, | |
| grid_resolution = 0.05, | |
| safety_margin = 0.2, | |
| plot = True, | |
| figsize = (5,4), | |
| verbose = True, | |
| ): | |
| """ | |
| Generate an occupancy grid map from environment sensor data. | |
| Parameters: | |
| env: Simulation environment. | |
| rf_body_name (str): Reference body name. | |
| rf_sensor_names (list): Sensor names. | |
| x_range (tuple): X-axis range. | |
| y_range (tuple): Y-axis range. | |
| scan_resolution (float): Scan resolution. | |
| grid_resolution (float): Grid resolution. | |
| safety_margin (float): Safety margin. | |
| plot (bool): Whether to plot the grid. | |
| figsize (tuple): Plot figure size. | |
| verbose (bool): Verbosity flag. | |
| Returns: | |
| OccupancyGridMapClass: Occupancy grid map object. | |
| """ | |
| # Scan with changing the sensor | |
| xy_meshgrid = get_xy_meshgrid(x_range=x_range,y_range=y_range,res=scan_resolution) | |
| xy_data_list = [] | |
| for xy in xy_meshgrid: | |
| env.set_p_body(body_name=rf_body_name,p=np.append(xy,0)) # move lidar | |
| p_rf_obs_list = env.get_p_rf_obs_list(sensor_names=rf_sensor_names) # scan | |
| # Append range finder results | |
| if len(p_rf_obs_list) > 0: # if list is not empty | |
| xy_data = np.array(p_rf_obs_list)[:,:2] | |
| xy_data_list.append(xy_data) | |
| # Set occupancy grid map | |
| ogm = OccupancyGridMapClass( | |
| x_range = x_range, | |
| y_range = y_range, | |
| resolution = grid_resolution, | |
| ) | |
| if verbose: | |
| print ("Occuapncy grid map instantiated from [%s] env"%(env.name)) | |
| print ("x_range:[%.1f]m~[%.1f]m y_range:[%.1f]m~[%.1f]m"% | |
| (x_range[0],x_range[1],y_range[0],y_range[1])) | |
| print ("resolution:[%.2f]m"%(grid_resolution)) | |
| print ("grid.shape:%s"%(ogm.grid.shape,)) | |
| ogm.safety_margin = safety_margin # safety margin | |
| for xy_data in xy_data_list: | |
| ogm.update_grid(xy_scan=xy_data) # update grid | |
| ogm.add_margin(margin=safety_margin) # add margin | |
| # Plot | |
| if plot: | |
| ogm.plot(grid=ogm.grid,figsize=figsize,title_str='Occupancy Grid Map') | |
| ogm.plot(grid=ogm.grid_with_margin,figsize=figsize,title_str='Occupancy Grid Map with Margin') | |
| return ogm | |
| def get_wheel_vels( | |
| dir_vel, # directional velocity of the base (m/s) | |
| ang_vel, # angular velocity of the base (rad/s) | |
| w_radius, # wheel radius (m) | |
| w2w_dist, # wheel to wheel distance (m) | |
| ): | |
| """ | |
| Compute left and right wheel velocities for a differential wheeled robot. | |
| Parameters: | |
| dir_vel (float): Base directional velocity. | |
| ang_vel (float): Base angular velocity. | |
| w_radius (float): Wheel radius. | |
| w2w_dist (float): Wheel-to-wheel distance. | |
| Returns: | |
| tuple: (left_vel, right_vel) | |
| """ | |
| left_vel = (dir_vel - (ang_vel*w2w_dist/2)) / w_radius | |
| right_vel = (dir_vel + (ang_vel*w2w_dist/2)) / w_radius | |
| return left_vel,right_vel | |
| def noramlize_angle(angle): | |
| """ | |
| Normalize an angle to the range [-pi, pi]. | |
| Parameters: | |
| angle (float): Input angle in radians. | |
| Returns: | |
| float: Normalized angle. | |
| """ | |
| while angle > np.pi: | |
| angle = angle - 2*np.pi | |
| while angle < -np.pi: | |
| angle = angle + 2*np.pi | |
| return angle | |
| def compute_xy_heading(p,R): | |
| """ | |
| Compute (x,y) coordinates and heading from position and rotation. | |
| Parameters: | |
| p (np.array): Position vector. | |
| R (np.array): Rotation matrix. | |
| Returns: | |
| tuple: (xy, heading) | |
| """ | |
| xy = p[:2] | |
| heading = np.arctan2(R[1,0],R[0,0]) | |
| return xy,heading | |
| def compute_lookahead_traj(xy,heading,dir_vel,ang_vel,dt,T): | |
| """ | |
| Compute a lookahead trajectory based on current state and velocities. | |
| Parameters: | |
| xy (np.array): Current (x,y) position. | |
| heading (float): Current heading angle. | |
| dir_vel (float): Directional velocity. | |
| ang_vel (float): Angular velocity. | |
| dt (float): Time step. | |
| T (float): Total lookahead time. | |
| Returns: | |
| tuple: (secs, xy_traj, heading_traj) | |
| """ | |
| L = int(T/dt) | |
| xy_traj,heading_traj,secs = np.zeros((L,2)),np.zeros(L),np.zeros(L) | |
| # Append current xy and heading | |
| xy_traj[0,:] = xy | |
| heading_traj[0] = heading | |
| secs[0] = 0.0 | |
| # Loop | |
| xy_curr = xy.copy() | |
| heading_curr = heading | |
| for tick in range(L-1): | |
| # Update | |
| c,s = np.cos(heading_curr),np.sin(heading_curr) | |
| xy_curr = xy_curr + dir_vel*dt*np.array([c,s]) | |
| heading_curr = heading_curr + ang_vel*dt | |
| # Append | |
| xy_traj[tick+1,:] = xy_curr.copy() | |
| heading_traj[tick+1] = heading_curr | |
| secs[tick+1] = tick*dt | |
| # Return | |
| return secs,xy_traj,heading_traj | |
| def compute_dir_ang_vels(sec,p,R): | |
| """ | |
| Compute directional and angular velocities from time, position, and orientation. | |
| Parameters: | |
| sec (float): Current time. | |
| p (np.array): Current position. | |
| R (np.array): Current rotation matrix. | |
| Returns: | |
| tuple: (dir_vel, ang_vel) | |
| """ | |
| # First call flag | |
| if not hasattr(compute_dir_ang_vels,'first_flag'): | |
| compute_dir_ang_vels.first_flag = True | |
| else: | |
| compute_dir_ang_vels.first_flag = False | |
| if compute_dir_ang_vels.first_flag: | |
| dir_vel = 0.0 | |
| ang_vel = 0.0 | |
| else: | |
| sec_prev = compute_dir_ang_vels.sec_prev | |
| p_prev = compute_dir_ang_vels.p_prev.copy() | |
| R_prev = compute_dir_ang_vels.R_prev.copy() | |
| delta_t = sec - sec_prev | |
| delta_x = p[0] - p_prev[0] | |
| delta_y = p[1] - p_prev[1] | |
| dir_vel = np.sqrt(delta_x**2 + delta_y**2) / delta_t | |
| R_rel = R_prev.T @ R | |
| theta = np.arctan2(R_rel[1, 0], R_rel[0, 0]) | |
| theta = noramlize_angle(theta) | |
| ang_vel = theta / delta_t | |
| # Backup | |
| compute_dir_ang_vels.sec_prev = sec | |
| compute_dir_ang_vels.p_prev = p | |
| compute_dir_ang_vels.R_prev = R | |
| return dir_vel,ang_vel | |
| def get_closest_xy_idx(xy,xy_traj): | |
| """ | |
| Get the index of the closest point in a trajectory to a given point. | |
| Parameters: | |
| xy (np.array): Query point. | |
| xy_traj (array): Trajectory of points. | |
| Returns: | |
| int: Index of the closest point. | |
| """ | |
| if len(xy.shape) == 1: | |
| xy_query = xy[np.newaxis,:] | |
| else: | |
| xy_query = xy.copy() | |
| dists = cdist(xy[np.newaxis,:],xy_traj) | |
| idx_closest = np.argmin(dists) | |
| return idx_closest | |
| def get_closest_xy(xy,xy_traj): | |
| """ | |
| Get the closest point and its index from a trajectory. | |
| Parameters: | |
| xy (np.array): Query point. | |
| xy_traj (array): Trajectory of points. | |
| Returns: | |
| tuple: (index, closest point) | |
| """ | |
| idx_closest = get_closest_xy_idx(xy,xy_traj) | |
| xy_closest = xy_traj[idx_closest] | |
| return idx_closest,xy_closest | |
| def compute_distance_from_xy_to_xy_traj(xy,xy_traj): | |
| """ | |
| Compute the shortest distance from a point to a trajectory. | |
| Parameters: | |
| xy (np.array): Query point. | |
| xy_traj (array): Trajectory of points. | |
| Returns: | |
| float: Minimum distance. | |
| """ | |
| idx_closest,xy_closest = get_closest_xy(xy,xy_traj) | |
| dist = np.linalg.norm(xy-xy_closest) # distance | |
| return dist | |
| def compute_angle_from_xy(xy_fr,xy_to): | |
| """ | |
| Compute the angle (in radians) from one point to another. | |
| Parameters: | |
| xy_fr (np.array): Starting point. | |
| xy_to (np.array): Ending point. | |
| Returns: | |
| float: Angle in radians. | |
| """ | |
| delta = xy_to - xy_fr | |
| angle = np.arctan2(delta[1],delta[0]) | |
| return angle | |
| def compute_point_to_segment_distance(p,p_a,p_b): | |
| """ | |
| Compute the distance from a point to a line segment and return the closest point. | |
| Parameters: | |
| p (np.array): Query point. | |
| p_a (np.array): Segment start. | |
| p_b (np.array): Segment end. | |
| Returns: | |
| tuple: (distance, closest point) | |
| """ | |
| # Vectors | |
| ab = p_b - p_a | |
| ap = p - p_a | |
| bp = p - p_b | |
| # ab squared for reuse | |
| ab_squared = np.dot(ab, ab) | |
| # If p_a==p_b | |
| if ab_squared == 0: | |
| return np.linalg.norm(ap),p_a.copy() | |
| # Check whether the closest point is on the segment | |
| t = np.dot(ap, ab) / ab_squared | |
| if 0 <= t <= 1: | |
| # on the segment | |
| closest_point = p_a + t * ab | |
| distance = np.linalg.norm(p - closest_point) | |
| else: | |
| # not on the segment | |
| dist_a = np.dot(ap, ap) | |
| dist_b = np.dot(bp, bp) | |
| if dist_a < dist_b: | |
| closest_point = p_a | |
| distance = np.sqrt(dist_a) | |
| else: | |
| closest_point = p_b | |
| distance = np.sqrt(dist_b) | |
| # Return | |
| return distance, closest_point.copy() | |
| def compute_point_to_segments_distance(p,p_traj): | |
| """ | |
| Compute the minimum distance from a point to a sequence of line segments. | |
| Parameters: | |
| p (np.array): Query point. | |
| p_traj (array): Sequence of points defining segments. | |
| Returns: | |
| tuple: (minimum distance, closest point, segment index) | |
| """ | |
| L = p_traj.shape[0] | |
| dists = np.zeros(L-1) | |
| p_closest_list = [] | |
| for idx in range(L-1): | |
| p_a = p_traj[idx,:] # [d] | |
| p_b = p_traj[idx+1,:] # [d] | |
| dist,p_closest = compute_point_to_segment_distance(p=p,p_a=p_a,p_b=p_b) | |
| # Append | |
| dists[idx] = dist | |
| p_closest_list.append(p_closest) | |
| # Return | |
| idx_seg = np.argmin(dists) | |
| distance = dists[idx_seg] | |
| p_closest = p_closest_list[idx_seg].copy() | |
| return distance,p_closest,idx_seg | |
| def get_colors_from_costs( | |
| costs, | |
| cmap='autumn', | |
| alpha=None, | |
| cost_max=np.inf, | |
| color_min=(1, 0, 0, 1.0), | |
| color_max=(0, 0, 0, 0.02), | |
| ): | |
| """ | |
| Map cost values to RGBA colors using a colormap and thresholds. | |
| Parameters: | |
| costs (array): Array of cost values. | |
| cmap (str): Colormap name. | |
| alpha (float, optional): Base alpha value. | |
| cost_max (float): Maximum cost threshold. | |
| color_min (tuple): Color for minimum cost. | |
| color_max (tuple): Color for infeasible costs. | |
| Returns: | |
| np.array: Array of RGBA colors. | |
| """ | |
| # Initialize colors array | |
| colors = np.zeros((len(costs), 4)) | |
| # Set infeasible indices to color_max | |
| infeas_mask = costs >= cost_max | |
| colors[infeas_mask] = color_max | |
| # Process feasible indices | |
| feas_mask = costs < cost_max | |
| if np.any(feas_mask): | |
| costs_feas = costs[feas_mask] | |
| costs_feas_nzd = (costs_feas - costs_feas.min()) / (costs_feas.max() - costs_feas.min() + 1e-6) | |
| cmap_colors = plt.get_cmap(cmap)(costs_feas_nzd) | |
| if alpha is not None: | |
| cmap_colors[:, 3] = alpha # Set alpha channel | |
| colors[feas_mask] = cmap_colors | |
| # Handle minimum cost color | |
| if color_min is not None: | |
| min_mask = (costs == costs.min()) & (costs < cost_max) | |
| colors[min_mask] = color_min | |
| return colors | |
| def xy2xyz(xy, z=0.0): | |
| """ | |
| Convert 2D coordinates to 3D by appending a z-value. | |
| Parameters: | |
| xy (np.array): 2D coordinates. | |
| For a single coordinate, shape=(2,). | |
| For multiple coordinates, shape=(L, 2). | |
| z (float): Z-coordinate to append (default is 0.0). | |
| Returns: | |
| np.array: 3D coordinates. | |
| For a single coordinate, shape=(3,). | |
| For multiple coordinates, shape=(L, 3). | |
| """ | |
| xy = np.asarray(xy) | |
| if xy.ndim == 1: | |
| if xy.shape[0] != 2: | |
| raise ValueError("1D array must have exactly 2 elements.") | |
| return np.concatenate((xy, np.array([z]))) | |
| elif xy.ndim == 2: | |
| if xy.shape[1] != 2: | |
| raise ValueError("2D array must have 2 columns.") | |
| z_col = np.full((xy.shape[0], 1), z) | |
| return np.concatenate((xy, z_col), axis=1) | |
| else: | |
| raise ValueError("Input must be either a 1D or 2D array.") | |
| def xyz2xy(xyz): | |
| """ | |
| Convert 3D coordinates to 2D by dropping the z-component. | |
| Parameters: | |
| xyz (np.array): 3D coordinates. | |
| For a single coordinate, shape=(3,). | |
| For multiple coordinates, shape=(L, 3). | |
| Returns: | |
| np.array: 2D coordinates. | |
| For a single coordinate, shape=(2,). | |
| For multiple coordinates, shape=(L, 2). | |
| """ | |
| xyz = np.asarray(xyz) | |
| if xyz.ndim == 1: | |
| if xyz.shape[0] != 3: | |
| raise ValueError("1D array must have exactly 3 elements.") | |
| return xyz[:2] | |
| elif xyz.ndim == 2: | |
| if xyz.shape[1] != 3: | |
| raise ValueError("2D array must have 3 columns.") | |
| return xyz[:, :2] | |
| else: | |
| raise ValueError("Input must be either a 1D or 2D array.") | |
| def lookahead_planner( | |
| xy_curr, | |
| angle_curr, | |
| xy_pursuit_traj, | |
| ogm, # occupancy grid map to check feasibility | |
| la_dir_vel_list = np.array([0.1,0.5]), # directional velocities (m/sec) | |
| la_ang_vel_list = np.deg2rad([0,-30,-20,-10,-5,+5,+10,+20,+30]), # angular velocities (rad/sec) | |
| la_dt = 0.1, # (sec) | |
| la_T = 2.0, # (sec) | |
| la_pursuit_dist = 0.5, # lookahead distance (m) | |
| la_sec_feas_check = 2.0, # feasibility check time (sec) | |
| la_sec_pursuit_check = 1.0, # pursuit check time (sec) | |
| la_dist_threshold = 0.15, # pursuit trajectory deviation distance margin (m) | |
| la_angle_threshold = np.deg2rad(20), # deviation angle margin (rad) | |
| la_turn_ang_vel = np.deg2rad(45), # in-place rotation angular velocity (rad/sec) | |
| use_local_grid = False, | |
| ): | |
| """ | |
| Plan a lookahead motion by sampling trajectories and selecting the best command. | |
| Parameters: | |
| xy_curr (np.array): Current (x,y) position. | |
| angle_curr (float): Current heading angle. | |
| xy_pursuit_traj (array): Pursuit trajectory. | |
| ogm: Occupancy grid map for feasibility checking. | |
| la_dir_vel_list (array): List of directional velocities. | |
| la_ang_vel_list (array): List of angular velocities. | |
| la_dt (float): Time step for lookahead. | |
| la_T (float): Total lookahead time. | |
| la_pursuit_dist (float): Desired pursuit distance. | |
| la_sec_feas_check (float): Time for feasibility check. | |
| la_sec_pursuit_check (float): Time for pursuit check. | |
| la_dist_threshold (float): Deviation distance threshold. | |
| la_angle_threshold (float): Deviation angle threshold. | |
| la_turn_ang_vel (float): In-place turning angular velocity. | |
| use_local_grid (bool): Whether to use local grid for feasibility. | |
| Returns: | |
| tuple: (dir_vel_cmd, ang_vel_cmd, lap_info) | |
| """ | |
| # Configuration | |
| n_la_traj = len(la_dir_vel_list)*len(la_ang_vel_list) | |
| # Buffers | |
| la_dir_ang_vels_list = [] # lookahead directional and angular velocities | |
| xy_heading_la_trajs_list = [] # lookahead trajs | |
| xy_la_pursuit_check_list = [] # pursuit check list (possibly short horizon) | |
| xy_la_feas_check_list = [] # feasibility check list (possibly long horizon) | |
| # Costs | |
| costs = np.zeros(n_la_traj) # initialize costs | |
| # Get the current pursuit point | |
| dist2traj,xy_closest,idx_seg = compute_point_to_segments_distance( | |
| p=xy_curr,p_traj=xy_pursuit_traj) | |
| # Check finished | |
| L = len(xy_pursuit_traj) # number of pursuit points | |
| if idx_seg >= L-2: # if we only have two pursuit points left | |
| is_finished = True | |
| else: | |
| is_finished = False | |
| # Lookahead planner | |
| if is_finished: # if finished, stop | |
| xy_pursuit = xy_pursuit_traj[-1,:] # when finished, let the pursuit point to be the final point | |
| dir_vel_cmd,ang_vel_cmd = 0.0,0.0 # make robot stop | |
| angle_diff = 0.0 | |
| else: | |
| # Cmopute the current pursuit point using 'la_pursuit_dist' | |
| dist_sum = np.linalg.norm(xy_pursuit_traj[idx_seg+1,:]-xy_closest) | |
| loop_cnt = 0 | |
| while True: | |
| # Increase loop counter | |
| loop_cnt = loop_cnt + 1 | |
| # Segment index to check | |
| idx_seg_check = idx_seg+loop_cnt+1 | |
| if idx_seg_check >= (L-1): | |
| xy_pursuit = xy_pursuit_traj[-1,:] # last pursuit point | |
| break | |
| else: | |
| xy_fr = xy_pursuit_traj[idx_seg_check-1,:] | |
| xy_to = xy_pursuit_traj[idx_seg_check,:] | |
| dist_sum_check = dist_sum + np.linalg.norm(xy_to-xy_fr) | |
| if dist_sum_check > la_pursuit_dist: # if 'xy_pursuit' lies within the next segment | |
| dist_remain = la_pursuit_dist - dist_sum | |
| xy_pursuit = xy_fr + dist_remain*np_uv(xy_to-xy_fr) | |
| break | |
| else: dist_sum = dist_sum_check # update 'dist_sum' | |
| # If robot devitate too much from 'xy_pursuit_traj', frist pursuit 'xy_closest' | |
| # otherwise, pursuit 'xy_pursuit_traj' using lookahead trajectories | |
| if dist2traj > la_dist_threshold: # if robot deviates too far from 'xy_pursuit_traj', move to 'xy_closest' | |
| angle_curr2closest = compute_angle_from_xy(xy_fr=xy_curr,xy_to=xy_closest) | |
| angle_diff = noramlize_angle(angle_curr2closest-angle_curr) | |
| if np.abs(angle_diff) > np.deg2rad(5): # turn to 'xy_closest' | |
| dir_vel_cmd,ang_vel_cmd = 0.0,la_turn_ang_vel*np.sign(angle_diff) # in-place rotation | |
| else: # go to 'xy_closest' | |
| dir_vel_cmd,ang_vel_cmd = 0.3,np.deg2rad(5)*np.sign(angle_diff) | |
| else: # robot is close enough to 'xy_pursuit_traj', follow the trajectory by pursuiting 'xy_pursuit' | |
| angle_closest2pursuit = compute_angle_from_xy(xy_fr=xy_closest,xy_to=xy_pursuit) | |
| angle_diff = noramlize_angle(angle_closest2pursuit-angle_curr) | |
| if np.abs(angle_diff) > la_angle_threshold: # angle deviates | |
| dir_vel_cmd,ang_vel_cmd = 0.0,la_turn_ang_vel*np.sign(angle_diff) # in-place rotation | |
| else: | |
| # lookahead planner | |
| for la_dir_vel in la_dir_vel_list: | |
| for la_ang_vel in la_ang_vel_list: | |
| la_secs,la_xy_traj,la_heading_traj = compute_lookahead_traj( | |
| xy = xy_curr, | |
| heading = angle_curr, | |
| dir_vel = la_dir_vel, | |
| ang_vel = la_ang_vel, | |
| dt = la_dt, | |
| T = la_T, | |
| ) | |
| # Append | |
| la_dir_ang_vels_list.append((la_dir_vel,la_ang_vel)) | |
| xy_heading_la_trajs_list.append((la_xy_traj,la_heading_traj)) | |
| # Compute costs of the lookahead trajectories | |
| for t_idx in range(n_la_traj): # for each lookahead trajectory | |
| (la_xy_traj,la_heading_traj) = xy_heading_la_trajs_list[t_idx] | |
| # Check the feasibility (check occupied) | |
| idx_feas_check = np.argmin(np.abs(la_secs-la_sec_feas_check)) | |
| xy_feas_check = la_xy_traj[idx_feas_check,:] | |
| is_occupied = ogm.check_point_occupancy( | |
| x = xy_feas_check[0], | |
| y = xy_feas_check[1], | |
| use_margin = True, | |
| use_local_grid = use_local_grid | |
| ) | |
| # Check the feasibility of the half-way point | |
| idx_feas_check_half = np.argmin(np.abs(la_secs-la_sec_feas_check/2.0)) | |
| xy_feas_check_half = la_xy_traj[idx_feas_check_half,:] | |
| is_occupied_half = ogm.check_point_occupancy( | |
| x = xy_feas_check_half[0], | |
| y = xy_feas_check_half[1], | |
| use_margin = True, | |
| use_local_grid = use_local_grid | |
| ) | |
| if is_occupied or is_occupied_half: # if occupied | |
| costs[t_idx] = costs[t_idx] + 1.0 # add big cost to infeasible lookahead trajectory | |
| else: # if not occupied (feasible) | |
| xy_la_pursuit_check = la_xy_traj[np.argmin(np.abs(la_secs-la_sec_pursuit_check)),:] | |
| costs[t_idx] = costs[t_idx] + np.linalg.norm(xy_la_pursuit_check-xy_pursuit) | |
| # Append | |
| xy_la_feas_check_list.append(xy_feas_check) | |
| xy_la_pursuit_check_list.append(xy_la_pursuit_check) | |
| # Get command with the smallest cost | |
| idx_min = np.argmin(costs) | |
| (dir_vel_cmd,ang_vel_cmd) = la_dir_ang_vels_list[idx_min] | |
| # Append 'lap_info' | |
| lap_info = {} | |
| lap_info['xy_curr'] = xy_curr # current point | |
| lap_info['xy_pursuit_traj'] = xy_pursuit_traj # pursuit trajectory | |
| lap_info['costs'] = costs # costs of the lookahead trajectories | |
| lap_info['dist2traj'] = dist2traj | |
| lap_info['xy_closest'] = xy_closest | |
| lap_info['xy_pursuit'] = xy_pursuit | |
| lap_info['angle_diff'] = angle_diff | |
| lap_info['la_dir_ang_vels_list'] = la_dir_ang_vels_list | |
| lap_info['xy_heading_la_trajs_list'] = xy_heading_la_trajs_list | |
| lap_info['xy_la_pursuit_check_list'] = xy_la_pursuit_check_list # pursuit check positions | |
| lap_info['xy_la_feas_check_list'] = xy_la_feas_check_list # feasibility check positions | |
| # Return | |
| return dir_vel_cmd,ang_vel_cmd,lap_info | |
| def plot_lookahead_planner_on_env( | |
| env, | |
| lap_info, | |
| plot_la_traj = True, # this takes a lot of time | |
| plot_pursuit_check_points = True, | |
| plot_feas_check_points = True, | |
| plot_pursuit_traj = True, | |
| plot_pursuit_point = True, | |
| plot_current_point = True, | |
| plot_closest_point = True, | |
| ): | |
| """ | |
| Plot lookahead planner trajectories and check points on the environment. | |
| Parameters: | |
| env: Simulation environment. | |
| lap_info (dict): Lookahead planner information. | |
| plot_la_traj (bool): Plot lookahead trajectories. | |
| plot_pursuit_check_points (bool): Plot pursuit check points. | |
| plot_feas_check_points (bool): Plot feasibility check points. | |
| plot_pursuit_traj (bool): Plot pursuit trajectory. | |
| plot_pursuit_point (bool): Plot pursuit point. | |
| plot_current_point (bool): Plot current position. | |
| plot_closest_point (bool): Plot closest point on pursuit trajectory. | |
| Returns: | |
| None | |
| """ | |
| # Parse | |
| costs = lap_info['costs'] | |
| xy_heading_la_trajs_list = lap_info['xy_heading_la_trajs_list'] | |
| xy_la_pursuit_check_list = lap_info['xy_la_pursuit_check_list'] | |
| xy_la_feas_check_list = lap_info['xy_la_feas_check_list'] | |
| xy_curr = lap_info['xy_curr'] | |
| xy_closest = lap_info['xy_closest'] | |
| xy_pursuit = lap_info['xy_pursuit'] | |
| xy_pursuit_traj = lap_info['xy_pursuit_traj'] | |
| # Colors | |
| colors = get_colors_from_costs( | |
| costs = costs, | |
| cmap = 'summer', | |
| alpha = 0.5, | |
| cost_max = 1.0, | |
| color_max = (0.5,0.5,0.5,0.1), # max_cost==infeasible | |
| color_min = (1,0,0,0.5), | |
| ) | |
| # Plot look-ahead trajectories | |
| if plot_la_traj: | |
| for traj_idx,(la_xy_traj,la_heading_traj) in enumerate(xy_heading_la_trajs_list): | |
| color = colors[traj_idx] | |
| if traj_idx == np.argmin(costs): # minimum cost trajectory | |
| env.plot_xy_heading_traj(la_xy_traj,la_heading_traj, | |
| r=0.005,plot_arrow=False,plot_cylinder=True,rgba=color) | |
| else: # other trajectories | |
| env.plot_xy_heading_traj(la_xy_traj,la_heading_traj, | |
| r=0.002,plot_arrow=False,plot_cylinder=True,rgba=color) | |
| # Plot pursuit check points | |
| if plot_pursuit_check_points: | |
| for xy in xy_la_pursuit_check_list: # pursuit check | |
| env.plot_sphere(p=xy2xyz(xy),r=0.005,rgba=(1,0,0,0.5)) | |
| # Plot feasibility check points | |
| if plot_feas_check_points: | |
| for xy in xy_la_feas_check_list: # feasibility check | |
| env.plot_sphere(p=xy2xyz(xy),r=0.005,rgba=(0,0,0,0.5)) | |
| # Plot pursuit trajectory | |
| if plot_pursuit_traj: | |
| env.plot_traj( | |
| xy_pursuit_traj, | |
| rgba = (0,0,0,0.25), | |
| plot_line = False, | |
| plot_cylinder = True, | |
| cylinder_r = 0.01, | |
| plot_sphere = True, | |
| sphere_r = 0.025, | |
| ) | |
| # Plot pursuit point 'xy_pursuit | |
| if plot_pursuit_point: | |
| env.plot_sphere(p=xy2xyz(xy_pursuit),r=0.03,rgba=(1,0,0,0.5),label='') | |
| # Plot the current point | |
| if plot_current_point: | |
| env.plot_sphere(p=xy2xyz(xy_curr),r=0.02,rgba=(0,0,1,0.5),label='') | |
| # Plot the closest point 'xy_closest' | |
| if plot_closest_point: | |
| env.plot_cylinder_fr2to( | |
| p_fr = xy2xyz(xy_curr), | |
| p_to = xy2xyz(xy_closest), | |
| r = 0.002, | |
| rgba = (0,0,1,0.5), | |
| ) | |
| env.plot_sphere(p=xy2xyz(xy_closest),r=0.02,rgba=(0,0,1,0.5),label='') | |
| def get_xy_scan_from_env(env,sensor_name_prefix='rf_'): | |
| """ | |
| Retrieve a 2D scan (XY coordinates) from the environment sensors. | |
| Parameters: | |
| env: Simulation environment. | |
| sensor_name_prefix (str): Prefix for sensor names. | |
| Returns: | |
| np.array: Array of scanned (x,y) points. | |
| """ | |
| p_rf_list = env.get_p_rf_obs_list(sensor_names=env.get_sensor_names(prefix=sensor_name_prefix)) | |
| xy_scan = np.array(p_rf_list)[:,:2] # [L x 2] | |
| return xy_scan | |
| class GaussianRandomPathClass(object): | |
| """ | |
| Gaussian Random Path (GRP) class for generating random paths via Gaussian processes. | |
| """ | |
| def __init__( | |
| self, | |
| name = 'GRP', | |
| kernel = kernel_levse, | |
| hyp_mean = {'g':1.0,'l':1.0,'w':1e-6}, | |
| hyp_var = {'g':1.0,'l':1.0,'w':1e-6} | |
| ): | |
| self.name = name | |
| self.kernel = kernel | |
| self.hyp_mean = hyp_mean | |
| self.hyp_var = hyp_var | |
| # Initialize with default anchors/test | |
| self.set_data() | |
| def set_data( | |
| self, | |
| t_anchor = np.linspace(0.0, 1.0, 10).reshape((-1,1)), | |
| x_anchor = np.zeros((10,2)), | |
| l_anchor = np.ones((10,1)), | |
| t_test = np.linspace(0.0, 1.0, 100).reshape((-1,1)), | |
| l_test = np.ones((100,1)), | |
| hyp_mean = None, | |
| hyp_var = None, | |
| w_chol = 1e-10, | |
| APPLY_EPSRU = False, | |
| t_eps = 0.0001, | |
| l_eps = 1.0, | |
| x_diff_start = None, | |
| x_diff_end = None, | |
| SKIP_GP_VAR = False | |
| ): | |
| """ | |
| Set anchor and test data, compute GP mean/variance. | |
| """ | |
| # Anchors | |
| self.t_anchor = t_anchor.astype(float) | |
| self.x_anchor = x_anchor.astype(float) | |
| self.l_anchor = l_anchor.astype(float) | |
| self.n_anchor = self.x_anchor.shape[0] | |
| self.d_anchor = self.x_anchor.shape[1] | |
| # Test | |
| self.t_test = t_test.astype(float) | |
| self.l_test = l_test.astype(float) | |
| self.n_test = self.t_test.shape[0] | |
| if hyp_mean is not None: self.hyp_mean = hyp_mean | |
| if hyp_var is not None: self.hyp_var = hyp_var | |
| # Epsilon run-up | |
| self.APPLY_EPSRU = APPLY_EPSRU | |
| if APPLY_EPSRU: | |
| # compute x_diff_start/end if needed and insert runup points | |
| # (omitted here for brevity) | |
| pass | |
| # GP mean | |
| if self.kernel.__name__ == 'kernel_levse': | |
| l_anchor_mean = np.ones((self.n_anchor,1)) | |
| K_tt = self.kernel(self.t_test, self.t_anchor, | |
| self.l_test, l_anchor_mean, | |
| self.hyp_mean) | |
| K_aa = self.kernel(self.t_anchor, self.t_anchor, | |
| l_anchor_mean, l_anchor_mean, | |
| self.hyp_mean) + self.hyp_mean['w']*np.eye(self.n_anchor) | |
| else: | |
| K_tt = self.kernel(self.t_test, self.t_anchor, self.hyp_mean) | |
| K_aa = self.kernel(self.t_anchor, self.t_anchor, self.hyp_mean) + self.hyp_mean['w']*np.eye(self.n_anchor) | |
| self.x_anchor_mean = self.x_anchor.mean(axis=0) | |
| self.gamma_test = np.linalg.solve(K_aa, self.x_anchor - self.x_anchor_mean) | |
| self.mean_test = K_tt @ self.gamma_test + self.x_anchor_mean | |
| # GP variance | |
| if SKIP_GP_VAR: | |
| return | |
| if self.kernel.__name__ == 'kernel_levse': | |
| K_tt_var = self.kernel(self.t_test, self.t_test, | |
| self.l_test, self.l_test, | |
| self.hyp_var) | |
| K_ta_var = self.kernel(self.t_test, self.t_anchor, | |
| self.l_test, self.l_anchor, | |
| self.hyp_var) | |
| K_aa_var = self.kernel(self.t_anchor, self.t_anchor, | |
| self.l_anchor, self.l_anchor, | |
| self.hyp_var) + self.hyp_mean['w']*np.eye(self.n_anchor) | |
| else: | |
| K_tt_var = self.kernel(self.t_test, self.t_test, self.hyp_var) | |
| K_ta_var = self.kernel(self.t_test, self.t_anchor, self.hyp_var) | |
| K_aa_var = self.kernel(self.t_anchor, self.t_anchor, self.hyp_var) + self.hyp_mean['w']*np.eye(self.n_anchor) | |
| self.var_test = K_tt_var - K_ta_var @ np.linalg.solve(K_aa_var, K_ta_var.T) | |
| self.std_diag_test = np.sqrt(np.diag(self.var_test)).reshape(-1,1) | |
| self.var_chol_test = np.linalg.cholesky(self.var_test + w_chol*np.eye(self.n_test)) | |
| def sample(self, n_sample=10, rand_type='Gaussian'): | |
| """ | |
| Sample multiple trajectories (mean + variance). | |
| """ | |
| samples = [] | |
| for _ in range(n_sample): | |
| if rand_type=='Gaussian': | |
| R = np.random.randn(self.n_test, self.d_anchor) | |
| else: | |
| R = 2*(np.random.rand(self.n_test, self.d_anchor)-0.5) | |
| samples.append(self.mean_test + self.var_chol_test @ R) | |
| return samples, self.t_test | |
| def sample_one_traj( | |
| self, | |
| rand_type='Gaussian', | |
| ORG_PERTURB=False, | |
| perturb_gain=1.0, | |
| ss_x_min=None, | |
| ss_x_max=None, | |
| ss_margin=None | |
| ): | |
| """ | |
| Sample a single trajectory with optional perturb/squash. | |
| """ | |
| samples, _ = self.sample(n_sample=1, rand_type=rand_type) | |
| traj = samples[0] | |
| if ORG_PERTURB: | |
| traj += (2*perturb_gain*np.random.rand(1,self.d_anchor)-perturb_gain) | |
| if ss_x_min is not None and ss_x_max is not None and ss_margin is not None: | |
| traj = soft_squash_multidim(traj, ss_x_min, ss_x_max, ss_margin) | |
| return traj, self.t_test | |
| def plot( | |
| self, | |
| n_sample = 10, | |
| ss_x_min = None, | |
| ss_x_max = None, | |
| ss_margin = None, | |
| figsize = (6,3), | |
| subplot_rc = None, | |
| lw_sample = 0.25, | |
| ylim = None, | |
| title_str = None | |
| ): | |
| """ | |
| Plot mean and samples with 2-sigma. | |
| """ | |
| samples, t = self.sample(n_sample=n_sample) | |
| for d in range(self.d_anchor): | |
| plt.figure(figsize=figsize) | |
| for traj in samples: | |
| plt.plot(t, traj[:,d], '-', alpha=0.4) | |
| plt.plot(t, self.mean_test[:,d], '-', lw=2) | |
| plt.fill_between(t.squeeze(), | |
| self.mean_test[:,d]-2*self.std_diag_test.squeeze(), | |
| self.mean_test[:,d]+2*self.std_diag_test.squeeze(), | |
| alpha=0.2) | |
| if ylim: plt.ylim(ylim) | |
| plt.title(title_str or f"Dim-{d}") | |
| plt.show() | |
| def set_prior( | |
| self, | |
| q_init = None, | |
| dur_sec = 2.0, | |
| HZ = 50, | |
| hyp = None, | |
| eps_sec = 0.01 | |
| ): | |
| """ | |
| Initialize prior with uniform or run-up anchors. | |
| """ | |
| if q_init is None: q_init = np.zeros(self.d_anchor) | |
| if eps_sec==0: | |
| t_anchor = np.array([[0.0,dur_sec]]).T | |
| x_anchor = np.tile(q_init, (2,1)) | |
| l_anchor = np.ones((2,1)) | |
| else: | |
| t_anchor = np.array([[0.0,eps_sec,dur_sec-eps_sec,dur_sec]]).T | |
| x_anchor = np.tile(q_init, (4,1)) | |
| l_anchor = np.ones((4,1)) | |
| t_test = np.linspace(0.0, dur_sec, int(dur_sec*HZ)).reshape(-1,1) | |
| l_test = np.ones_like(t_test) | |
| self.set_data(t_anchor, x_anchor, l_anchor, | |
| t_test, l_test, hyp_mean=hyp, hyp_var=hyp) | |
| def interpolate( | |
| self, | |
| t_anchor, | |
| x_anchor, | |
| t_test, | |
| hyp={'g':1,'l':1/5,'w':1e-8}, | |
| APPLY_EPSRU=False, | |
| t_eps=0.05, | |
| x_diff_start=None, | |
| x_diff_end=None, | |
| x_min=None, | |
| x_max=None, | |
| margin=1.0 | |
| ): | |
| """ | |
| Interpolate trajectory given anchors. | |
| """ | |
| self.set_data( | |
| t_anchor=t_anchor, | |
| x_anchor=x_anchor, | |
| l_anchor=np.ones((len(t_anchor),1)), | |
| t_test=t_test, | |
| l_test=np.ones((len(t_test),1)), | |
| hyp_mean=hyp, | |
| hyp_var=hyp, | |
| APPLY_EPSRU=APPLY_EPSRU, | |
| t_eps=t_eps, | |
| x_diff_start=x_diff_start, | |
| x_diff_end=x_diff_end, | |
| SKIP_GP_VAR=True | |
| ) | |
| traj = self.mean_test | |
| if x_min is not None and x_max is not None: | |
| traj = soft_squash_multidim(traj, x_min, x_max, margin) | |
| return traj | |
| def lev_interpolate( | |
| self, | |
| t_anchor, | |
| x_anchor, | |
| t_test, | |
| hyp={'g':1,'l':1/5,'w':1e-8}, | |
| lev_val=0.9, | |
| APPLY_EPSRU=False, | |
| t_eps=0.05, | |
| x_diff_start=None, | |
| x_diff_end=None, | |
| x_min=None, | |
| x_max=None, | |
| margin=1.0 | |
| ): | |
| """ | |
| Leveraged interpolation with variance. | |
| """ | |
| l_anchor = lev_val*np.ones((len(t_anchor),1)) | |
| self.set_data( | |
| t_anchor=t_anchor, | |
| x_anchor=x_anchor, | |
| l_anchor=l_anchor, | |
| t_test=t_test, | |
| l_test=np.ones((len(t_test),1)), | |
| hyp_mean=hyp, | |
| hyp_var=hyp, | |
| APPLY_EPSRU=APPLY_EPSRU, | |
| t_eps=t_eps, | |
| x_diff_start=x_diff_start, | |
| x_diff_end=x_diff_end, | |
| SKIP_GP_VAR=False | |
| ) | |
| traj, _ = self.sample_one_traj() | |
| if x_min is not None and x_max is not None: | |
| traj = soft_squash_multidim(traj, x_min, x_max, margin) | |
| return traj | |
| def doubly_log_scale(start,stop,num,t_min=1.0,t_max=10.0): | |
| """ | |
| Generate a doubly logarithmically scaled sequence. | |
| Parameters: | |
| start (float): Start value. | |
| stop (float): End value. | |
| num (int): Number of points. | |
| t_min (float): Minimum scale value. | |
| t_max (float): Maximum scale value. | |
| Returns: | |
| np.array: Doubly log-scaled sequence. | |
| """ | |
| # Mid point | |
| t_mid = t_min + (t_max-t_min)/2.0 | |
| # Front half | |
| log_min = np.log10(t_min) | |
| log_mid = np.log10(t_mid) | |
| dense_front = np.logspace(log_min,log_mid,num//2,endpoint=False) | |
| # Latter half | |
| dense_back = np.logspace(log_min,log_mid,num//2,endpoint=True) | |
| dense_back = t_max - (dense_back[::-1] - t_min) | |
| # Concat | |
| result = np.concatenate([dense_front, dense_back]) | |
| # Un-normalize | |
| result = start + (result-t_min) * (stop-start) / (t_max-t_min) | |
| # Return | |
| return result | |
| def get_jnt_range_for_free_jnt(jnt_type, jnt_range, names=None): | |
| """ | |
| Get joint range settings for free joints using roll-pitch-yaw configuration. | |
| Parameters: | |
| jnt_type (list): List of joint types. | |
| jnt_range (list): List of joint range arrays. | |
| names (list, optional): Joint names. | |
| Returns: | |
| np.array: Combined joint ranges. | |
| """ | |
| joint_ranges = [] # 데이터를 리스트에 저장 | |
| for i, t in enumerate(jnt_type): | |
| if t == 0: | |
| rng = np.array([[-3.0, 3.0], | |
| [-3.0, 3.0], | |
| [-3.0, 3.0], | |
| [-3.14, 3.14], | |
| [-3.14, 3.14], | |
| [-3.14, 3.14]]) | |
| joint_ranges.append(rng) | |
| elif t == 3 or t==1: | |
| joint_ranges.append(jnt_range[i].reshape(-1, 2)) | |
| # 최종적으로 리스트를 NumPy 배열로 결합 | |
| return np.vstack(joint_ranges) if joint_ranges else np.array([]).reshape(-1, 2) | |
| def post_processing_jnt_value_for_free_jnt(output, jnt_type): | |
| """ | |
| Post-process joint values for free joints (convert RPY to quaternion). | |
| Parameters: | |
| output (array): Raw joint values. | |
| jnt_type (list): List of joint types. | |
| Returns: | |
| np.array: Processed joint values. | |
| """ | |
| processed_output = [] # 데이터를 리스트에 저장 | |
| stacked_idx = 0 | |
| for i, t in enumerate(jnt_type): | |
| current_jnt_idx = i+stacked_idx | |
| # for free_joint | |
| if t == 0: | |
| temp_xyz = output[current_jnt_idx:current_jnt_idx+3] | |
| temp_quat = r2quat(rpy2r(output[current_jnt_idx+3:current_jnt_idx+6])) | |
| processed_output.append(temp_xyz) | |
| processed_output.append(temp_quat) | |
| stacked_idx += 5 | |
| # for revolute_joint | |
| else: | |
| temp_output = output[current_jnt_idx:current_jnt_idx+1] | |
| processed_output.append(temp_output) | |
| # 최종적으로 리스트를 NumPy 배열로 결합 | |
| return np.hstack(processed_output) if processed_output else np.array([]) | |
| def post_processing_ctrl_value_for_free_jnt(output, ctrl_type): | |
| """ | |
| Post-process control values for free joints (convert RPY to quaternion). | |
| Parameters: | |
| output (array): Raw control values. | |
| ctrl_type (list): List of control types. | |
| Returns: | |
| np.array: Processed control values. | |
| """ | |
| processed_output = [] # 데이터를 리스트에 저장 | |
| stacked_idx = 0 | |
| for i, t in enumerate(ctrl_type): | |
| current_jnt_idx = i+stacked_idx | |
| # for free_joint | |
| if t == 0: | |
| temp_xyz = output[current_jnt_idx:current_jnt_idx+3] | |
| temp_quat = r2quat(rpy2r(output[current_jnt_idx+3:current_jnt_idx+6])) | |
| processed_output.append(temp_xyz) | |
| processed_output.append(temp_quat) | |
| stacked_idx += 5 | |
| # for revolute_joint | |
| else: | |
| temp_output = output[current_jnt_idx:current_jnt_idx+1] | |
| processed_output.append(temp_output) | |
| # 최종적으로 리스트를 NumPy 배열로 결합 | |
| return np.hstack(processed_output) if processed_output else np.array([]) | |
| def print_red(str): | |
| """ | |
| Print a string in red color. | |
| Parameters: | |
| str (str): String to print. | |
| Returns: | |
| None | |
| """ | |
| print (colored(str,'red')) | |
| def print_yellow(str): | |
| """ | |
| Print a string in yellow color. | |
| Parameters: | |
| str (str): String to print. | |
| Returns: | |
| None | |
| """ | |
| print (colored(str,'yellow')) | |
| def print_green(str): | |
| """ | |
| Print a string in green color. | |
| Parameters: | |
| str (str): String to print. | |
| Returns: | |
| None | |
| """ | |
| print (colored(str,'green')) | |
| def print_blue(str): | |
| """ | |
| Print a string in blue color. | |
| Parameters: | |
| str (str): String to print. | |
| Returns: | |
| None | |
| """ | |
| print (colored(str,'blue')) | |
| def print_light_green(str): | |
| """ | |
| Print a string in light green color. | |
| Parameters: | |
| str (str): String to print. | |
| Returns: | |
| None | |
| """ | |
| print (colored(str,'light_green')) | |
| def print_light_blue(str): | |
| """ | |
| Print a string in light blue color. | |
| Parameters: | |
| str (str): String to print. | |
| Returns: | |
| None | |
| """ | |
| print (colored(str,'light_blue')) | |
| def generate_xy_trajs( | |
| xy_curr, # current position (m), shape: (2,) | |
| angle_curr, # current heading angle (rad), scalar | |
| v_list = [0.2], # directional velocities (m/s), list or array | |
| w_list = [-0.5*np.pi, 0.0, +0.5*np.pi], # angular velocities (rad/s), list or array | |
| dt = 0.05, # time step size (s), scalar | |
| T = 1.0, # total duration (s), scalar | |
| sparse_interval = 1, # sparse interval | |
| ): | |
| """ | |
| Generate sample trajectories based on given velocities and angular velocities. | |
| Parameters: | |
| xy_curr (array-like): Current position [x, y] in meters, shape (2,). | |
| angle_curr (float): Current heading angle in radians. | |
| v_list (list or array-like): List of linear velocities in m/s. | |
| w_list (list or array-like): List of angular velocities in rad/s. | |
| dt (float): Time interval between trajectory points in seconds. | |
| T (float): Total duration of trajectories in seconds. | |
| Returns: | |
| xy_trajs (np.ndarray): Array of trajectory points, shape (num_traj, L, 2), | |
| where num_traj = len(v_list)*len(w_list), L = T/dt. | |
| heading_trajs (np.ndarray): Array of heading angles, shape (num_traj, L). | |
| times (np.ndarray): Array of time points, shape (L,). | |
| """ | |
| # Generate a grid of linear and angular velocities | |
| v_grid, w_grid = np.meshgrid(v_list, w_list, indexing='ij') | |
| v_flat = v_grid.ravel() # shape: (num_traj,) | |
| w_flat = w_grid.ravel() # shape: (num_traj,) | |
| L = int(T / dt) # Number of time steps | |
| times = np.arange(0, T, dt) # shape: (L,) | |
| # Compute heading angles for each trajectory and timestep | |
| angles = angle_curr + np.outer(w_flat, times) # shape: (num_traj, L) | |
| eps = 1e-8 | |
| x_offsets = np.zeros((v_flat.size, L)) # shape: (num_traj, L) | |
| y_offsets = np.zeros((v_flat.size, L)) # shape: (num_traj, L) | |
| # Separate trajectories based on whether angular velocity is zero or not | |
| nonzero_mask = np.abs(w_flat) > eps | |
| zero_mask = ~nonzero_mask | |
| # Closed-form solution for nonzero angular velocities | |
| if np.any(nonzero_mask): | |
| w_nz = w_flat[nonzero_mask] | |
| v_nz = v_flat[nonzero_mask] | |
| x_offsets[nonzero_mask, :] = (v_nz / w_nz)[:, None] * ( | |
| np.sin(angle_curr + np.outer(w_nz, times)) - np.sin(angle_curr) | |
| ) | |
| y_offsets[nonzero_mask, :] = - (v_nz / w_nz)[:, None] * ( | |
| np.cos(angle_curr + np.outer(w_nz, times)) - np.cos(angle_curr) | |
| ) | |
| # Linear movement for zero angular velocities | |
| if np.any(zero_mask): | |
| v_z = v_flat[zero_mask] | |
| x_offsets[zero_mask, :] = v_z[:, None] * times * np.cos(angle_curr) | |
| y_offsets[zero_mask, :] = v_z[:, None] * times * np.sin(angle_curr) | |
| # Combine initial positions with calculated offsets to form trajectories | |
| xy_trajs = np.zeros((v_flat.size, L, 2)) # shape: (num_traj, L, 2) | |
| xy_trajs[:, :, 0] = xy_curr[0] + x_offsets | |
| xy_trajs[:, :, 1] = xy_curr[1] + y_offsets | |
| heading_trajs = angles # shape: (num_traj, L) | |
| # Sparse sampling indeces | |
| if sparse_interval > 1: | |
| sparse_indices = np.arange(0, L, sparse_interval) | |
| if sparse_indices[-1] != L - 1: | |
| sparse_indices = np.append(sparse_indices, L - 1) | |
| xy_trajs = xy_trajs[:, sparse_indices, :] | |
| heading_trajs = heading_trajs[:, sparse_indices] | |
| times = times[sparse_indices] | |
| return xy_trajs, heading_trajs, times | |
| def sample_trajs( | |
| xy_curr, # current position (m), shape: (2,) | |
| angle_curr, # current heading angle (rad), scalar | |
| v_list = [0.2], # directional velocities (m/s), list or array | |
| w_list = [-0.5*np.pi, 0.0, +0.5*np.pi], # angular velocities (rad/s), list or array | |
| dt = 0.05, # time step size (s), scalar | |
| T = 1.0, # total duration (s), scalar | |
| sparse_interval = 1, # sparse interval | |
| ): | |
| """ | |
| Generate sample trajectories based on given velocities and angular velocities. | |
| Parameters: | |
| xy_curr (array-like): Current position [x, y] in meters, shape (2,). | |
| angle_curr (float): Current heading angle in radians. | |
| v_list (list or array-like): List of linear velocities in m/s. | |
| w_list (list or array-like): List of angular velocities in rad/s. | |
| dt (float): Time interval between trajectory points in seconds. | |
| T (float): Total duration of trajectories in seconds. | |
| Returns: | |
| xy_trajs (np.ndarray): Array of trajectory points, shape (num_traj, L, 2), | |
| where num_traj = len(v_list)*len(w_list), L = T/dt. | |
| heading_trajs (np.ndarray): Array of heading angles, shape (num_traj, L). | |
| times (np.ndarray): Array of time points, shape (L,). | |
| """ | |
| # Generate a grid of linear and angular velocities | |
| v_grid, w_grid = np.meshgrid(v_list, w_list, indexing='ij') | |
| v_flat = v_grid.ravel() # shape: (num_traj,) | |
| w_flat = w_grid.ravel() # shape: (num_traj,) | |
| L = int(T / dt) # Number of time steps | |
| times = np.arange(0, T, dt) # shape: (L,) | |
| # Compute heading angles for each trajectory and timestep | |
| angles = angle_curr + np.outer(w_flat, times) # shape: (num_traj, L) | |
| eps = 1e-8 | |
| x_offsets = np.zeros((v_flat.size, L)) # shape: (num_traj, L) | |
| y_offsets = np.zeros((v_flat.size, L)) # shape: (num_traj, L) | |
| # Separate trajectories based on whether angular velocity is zero or not | |
| nonzero_mask = np.abs(w_flat) > eps | |
| zero_mask = ~nonzero_mask | |
| # Closed-form solution for nonzero angular velocities | |
| if np.any(nonzero_mask): | |
| w_nz = w_flat[nonzero_mask] | |
| v_nz = v_flat[nonzero_mask] | |
| x_offsets[nonzero_mask, :] = (v_nz / w_nz)[:, None] * ( | |
| np.sin(angle_curr + np.outer(w_nz, times)) - np.sin(angle_curr) | |
| ) | |
| y_offsets[nonzero_mask, :] = - (v_nz / w_nz)[:, None] * ( | |
| np.cos(angle_curr + np.outer(w_nz, times)) - np.cos(angle_curr) | |
| ) | |
| # Linear movement for zero angular velocities | |
| if np.any(zero_mask): | |
| v_z = v_flat[zero_mask] | |
| x_offsets[zero_mask, :] = v_z[:, None] * times * np.cos(angle_curr) | |
| y_offsets[zero_mask, :] = v_z[:, None] * times * np.sin(angle_curr) | |
| # Combine initial positions with calculated offsets to form trajectories | |
| xy_trajs = np.zeros((v_flat.size, L, 2)) # shape: (num_traj, L, 2) | |
| xy_trajs[:, :, 0] = xy_curr[0] + x_offsets | |
| xy_trajs[:, :, 1] = xy_curr[1] + y_offsets | |
| heading_trajs = angles # shape: (num_traj, L) | |
| # Sparse sampling indeces | |
| if sparse_interval > 1: | |
| sparse_indices = np.arange(0, L, sparse_interval) | |
| if sparse_indices[-1] != L - 1: | |
| sparse_indices = np.append(sparse_indices, L - 1) | |
| xy_trajs = xy_trajs[:, sparse_indices, :] | |
| heading_trajs = heading_trajs[:, sparse_indices] | |
| times = times[sparse_indices] | |
| return xy_trajs, heading_trajs, times | |
| def norm(x): | |
| """ | |
| Compute the L2 norm of a vector or matrix. | |
| Parameters: | |
| x (array-like): Input vector or matrix. | |
| Returns: | |
| float: L2 norm of the input. | |
| """ | |
| return np.linalg.norm(x) | |
| def print_type_shape(name, x): | |
| """ | |
| print type and shape | |
| """ | |
| # x에 shape 속성이 있으면 그대로 사용 | |
| if hasattr(x, 'shape'): | |
| shape = x.shape | |
| else: | |
| try: | |
| shape = np.array(x).shape | |
| except Exception: | |
| shape = "N/A" | |
| print("['%s'] type:%s shape:%s" % (name, type(x), shape)) | |
| def print_gym_env_shape(env): | |
| """ | |
| print gym observation and action shape | |
| """ | |
| obs,_ = env.reset() | |
| action = env.action_space.sample() | |
| print_type_shape('obs',obs) | |
| print_type_shape('action',action) | |
| next_obs,rewards,terminated,truncated,infos = env.step(action=action) | |
| print (" env.step()") | |
| print_type_shape('next_obs',next_obs) | |
| print_type_shape('rewards',rewards) | |
| print_type_shape('terminated',terminated) | |
| print_type_shape('truncated',truncated) | |
| def farthest_point_sampling(pcd, n_sample): | |
| """ | |
| Performs Farthest Point Sampling on a given point cloud. | |
| Args: | |
| pcd (numpy.ndarray): The input point cloud of shape (N, D), where N is the number of points and D is the dimensionality. | |
| n_sample (int): The number of points to sample. | |
| Returns: | |
| numpy.ndarray: The indices of the sampled points in the original point cloud. | |
| """ | |
| assert len(pcd.shape) == 2, "Input pcd must be a 2D array." | |
| N, D = pcd.shape | |
| # If the number of points is less than or equal to the number of samples, include all points and sample additional ones | |
| if n_sample >= N: | |
| additional_indices = np.random.choice(N, n_sample - N, replace=True) | |
| return np.concatenate([np.arange(N, dtype=np.int32), additional_indices]) | |
| # Initialize arrays | |
| sampled_indices = [] # List to store the indices of sampled points | |
| distances = np.ones(N) * 1e7 # Initialize distances to a large value | |
| # Randomly pick the first point | |
| first_index = np.random.randint(0, N) | |
| sampled_indices.append(first_index) | |
| for _ in range(n_sample - 1): | |
| # Get the last sampled point | |
| last_sampled_point = pcd[sampled_indices[-1]].reshape(1, -1) | |
| # Compute distances from the last sampled point to all other points | |
| current_distances = np.linalg.norm(pcd - last_sampled_point, axis=1) | |
| # Update the minimum distances for each point | |
| distances = np.minimum(distances, current_distances) | |
| # Select the farthest point from the already sampled points | |
| next_index = np.argmax(distances) | |
| sampled_indices.append(next_index) | |
| return np.array(sampled_indices, dtype=np.int32) | |
| def add_title_to_img(img,text='Title',margin_top=30,color=(0,0,0),font_size=20,resize=True,shape=(300,300)): | |
| """ | |
| Add title to image | |
| """ | |
| # Resize | |
| img_copied = img.copy() | |
| if resize: | |
| img_copied = cv2.resize(img_copied,shape,interpolation=cv2.INTER_NEAREST) | |
| # Convert to PIL image | |
| pil_img = Image.fromarray(img_copied)# | |
| width, height = pil_img.size | |
| new_height = margin_top + height | |
| # Create new image with top margin | |
| new_img = Image.new("RGB", (width, new_height),color=(255,255,255)) | |
| # Paste the original image | |
| new_img.paste(pil_img, (0, margin_top)) | |
| # Draw text | |
| draw = ImageDraw.Draw(new_img) | |
| font = ImageFont.load_default(size=font_size) | |
| bbox = draw.textbbox((0,0),text,font=font) | |
| # Center text | |
| text_width = bbox[2] - bbox[0] | |
| text_height = bbox[3] - bbox[1] | |
| x = (width - text_width) // 2 | |
| y = (margin_top - text_height) // 2 | |
| # Draw text | |
| draw.text((x, y), text, font=font, fill=color) | |
| img_with_title = np.array(new_img) | |
| # Return | |
| return img_with_title | |
| def block_mtx(M11,M12,M21,M22): | |
| M_upper = np.concatenate((M11,M12),axis=1) | |
| M_lower = np.concatenate((M21,M22),axis=1) | |
| M = np.concatenate((M_upper,M_lower),axis=0) | |
| return M | |
| def inv_inc(inv_A,b,c): | |
| """ | |
| Incremental inverse using matrix inverse lemma | |
| """ | |
| k = c - b.T @ inv_A @ b | |
| M11 = inv_A + 1/k * inv_A @ b @ b.T @ inv_A | |
| M12 = -1/k * inv_A @ b | |
| M21 = -1/k * b.T @ inv_A | |
| M22 = 1/k | |
| M = block_mtx(M11=M11,M12=M12,M21=M21,M22=M22) | |
| return M | |
| def det_inc(det_A,inv_A,b,c): | |
| """ | |
| Incremental determinant computation | |
| """ | |
| out = det_A * (c - b.T @ inv_A @ b) | |
| return out | |
| def ikdpp_sampling( | |
| xs_total, | |
| qs_total = None, | |
| n_select = 10, | |
| n_trunc = np.inf, | |
| hyp = {'g':1.0,'l':0.001,'w':1e-8} | |
| ): | |
| """ | |
| Performs (truncated) Incremental k-DPP (i-kDPP) sampling from a given dataset. | |
| This function incrementally selects a diverse subset of `n_select` points from `xs_total`, | |
| using a determinant-based diversity measure. It optionally supports truncation to limit | |
| computational cost by considering only a subset of the remaining candidates at each step. | |
| Args: | |
| xs_total (np.ndarray): Input data of shape (N, D), where N is the number of data points and D is the dimension. | |
| qs_total (np.ndarray or None): Optional quality scores for each data point. Default is None. | |
| n_select (int): Number of points to sample (excluding the initial random one). | |
| n_trunc (int or float): Maximum number of candidates to consider per iteration (for truncation). Default is np.inf (no truncation). | |
| hyp (dict): Hyperparameters for the RBF kernel function. Default is {'g':1.0, 'l':1.0, 'w':1e-8}. | |
| Returns: | |
| xs_ikdpp (np.ndarray): Sampled subset of shape (n_select, D). | |
| idxs_ikdpp (np.ndarray): Indices of selected points in the original dataset. | |
| """ | |
| n_total = xs_total.shape[0] | |
| idxs_remain = np.arange(0,n_total,1,dtype=np.int32) | |
| idxs_select = [] | |
| for i_idx in range(n_select+1): # loop | |
| n_remain = len(idxs_remain) | |
| if i_idx == 0: # random first | |
| idx_select = np.random.permutation(n_total)[0] | |
| if qs_total is not None: | |
| q = 1.0+qs_total[idx_select] | |
| else: | |
| q = 1.0 | |
| det_K_prev = q | |
| K_inv_prev = 1/q*np.ones(shape=(1,1)) | |
| else: | |
| xs_select = xs_total[idxs_select,:] | |
| # Compute determinants | |
| dets = np.zeros(shape=n_remain) | |
| # for r_idx in range(n_remain): # for the remained indices | |
| for r_idx in np.random.permutation(n_remain)[:min(n_remain,n_trunc)]: | |
| # Compute the determinant of the appended kernel matrix | |
| k_vec = kernel_se( | |
| X1 = xs_select, | |
| X2 = xs_total[idxs_remain[r_idx],:].reshape(1,-1), | |
| hyp = hyp) | |
| if qs_total is not None: | |
| q = qs_total[idxs_remain[r_idx]] | |
| else: | |
| q = 1.0 | |
| det_check = det_inc( | |
| det_A = det_K_prev, | |
| inv_A = K_inv_prev, | |
| b = k_vec, | |
| c = q) | |
| # Append the determinant | |
| dets[r_idx] = det_check | |
| # Get the index with the highest determinant | |
| idx_temp = np.where(dets == np.amax(dets))[0][0] | |
| idx_select = idxs_remain[idx_temp] | |
| # Compute 'det_K_prev' and 'K_inv_prev' | |
| det_K_prev = dets[idx_temp] | |
| k_vec = kernel_se( | |
| xs_select, | |
| xs_total[idx_select,:].reshape(1,-1), | |
| hyp=hyp) | |
| if qs_total is not None: | |
| q = 1+qs_total[idx_select] | |
| else: | |
| q = 1.0 | |
| K_inv_prev = inv_inc( | |
| inv_A = K_inv_prev, | |
| b = k_vec, | |
| c = q) | |
| # Remove currently selected index from 'idxs_remain' | |
| idxs_remain = idxs_remain[idxs_remain != idx_select] | |
| # Append currently selected index to 'idxs_select' | |
| idxs_select.append(idx_select) | |
| # Select the subset from 'xs_total' with removing the first sample | |
| idxs_select = idxs_select[1:] # excluding the first one | |
| idxs_ikdpp = np.array(idxs_select) | |
| xs_ikdpp = xs_total[idxs_ikdpp] | |
| return xs_ikdpp,idxs_ikdpp | |
| def find_all_file_paths(root_dir, target_filename, verbose=False): | |
| """ | |
| Recursively searches for all occurrences of a specified file within a directory tree. | |
| This function traverses all subdirectories of the given root directory and collects | |
| the full paths of files that exactly match the specified file name. It returns a list | |
| of matched file paths relative to the environment's filesystem. | |
| Args: | |
| root_dir (str): Path to the root directory where the search begins. | |
| Can be an absolute or relative path (e.g., from the Jupyter Notebook). | |
| target_filename (str): Name of the file to search for (e.g., 'data.json', 'results.csv'). | |
| Returns: | |
| List[str]: A list of full file paths (including directories) where the target file is found. | |
| Example: | |
| >>> find_all_file_paths('projects/', 'config.yaml') | |
| ['projects/run1/config.yaml', 'projects/experiment/config.yaml'] | |
| """ | |
| matched_paths = [] | |
| for dirpath, dirnames, filenames in os.walk(root_dir): # walk through all subdirectories | |
| # Check if the target filename exists in the current directory | |
| if target_filename in filenames: | |
| matched_paths.append(os.path.join(dirpath, target_filename)) | |
| if verbose: | |
| for path_idx,matched_path in enumerate(matched_paths): | |
| print("[%d/%d] object_path:[%s]"%(path_idx,len(matched_paths),matched_path)) | |
| # Return | |
| return matched_paths | |
| def indent_xml(elem): | |
| """ | |
| Convert an XML Element into a neatly formatted, indented XML string. | |
| Parameters: | |
| elem (xml.etree.ElementTree.Element): XML element to format. | |
| Returns: | |
| str: Pretty-printed XML string without empty lines. | |
| """ | |
| rough_string = ET.tostring(elem, 'utf-8') | |
| reparsed = minidom.parseString(rough_string) | |
| pretty_xml = reparsed.toprettyxml(indent=" ") | |
| # Remove empty lines | |
| lines = [line for line in pretty_xml.splitlines() if line.strip()] | |
| return "\n".join(lines) | |
| def merge_mjcfs( | |
| mjcf_model_name = 'scene', | |
| included_mjcf_files = ['a.xml', 'b.xml'], | |
| output_xml_path = 'merged_scene.xml', | |
| timestep = 0.002, | |
| integrator = 'implicitfast', | |
| memory = None, # "500M", "1G", etc. | |
| armature = None, # e.g., 1.0 | |
| damping = None, # e.g., 2.0 | |
| cone = None, # e.g., 'elliptic' | |
| verbose = True, | |
| ): | |
| """ | |
| Merges multiple MJCF files into a single MuJoCo model XML, adds optional simulation settings, | |
| pretty-prints it, saves it to a file, and returns the file path. | |
| Args: | |
| mjcf_model_name (str): The value of the 'model' attribute for the <mujoco> root element. | |
| included_mjcf_files (List[str]): List of MJCF file paths to include via <include file="..."/> tags. | |
| output_xml_path (str): File path where the generated XML will be saved. | |
| timestep (float): Simulation timestep for the <option> tag. Default is 0.002. | |
| integrator (str): Integrator type for the <option> tag. Default is "implicitfast". | |
| memory (str or None): Memory setting for the <size> tag (e.g., "500M", "1G"). | |
| If None, the <size> tag is not added. If True, defaults to "1G". | |
| armature (float or None): Default armature for joints. If None, not added. (e.g., 1.0) | |
| damping (float or None): Default damping for joints. If None, not added. (e.g., 2.0) | |
| cone (str or None): Cone friction model. If None, not added. (e.g., 'elliptic') | |
| verbose (bool): If True, print included files and output path. Default is False. | |
| Returns: | |
| str: The path to the saved XML file. | |
| """ | |
| # Create root element | |
| mujoco_elem = ET.Element("mujoco", attrib={"model": mjcf_model_name}) | |
| output_dir = os.path.dirname(os.path.abspath(output_xml_path)) | |
| # Add include elements | |
| for filepath in included_mjcf_files: | |
| abs_path = os.path.abspath(filepath) | |
| rel_path = os.path.relpath(abs_path, output_dir) | |
| ET.SubElement(mujoco_elem, "include", attrib={"file": rel_path}) | |
| # Add <default> tag if needed | |
| if armature is not None or damping is not None: | |
| default_elem = ET.SubElement(mujoco_elem, "default") | |
| joint_attrs = {} | |
| if armature is not None: | |
| joint_attrs["armature"] = str(armature) | |
| if damping is not None: | |
| joint_attrs["damping"] = str(damping) | |
| ET.SubElement(default_elem, "joint", attrib=joint_attrs) | |
| # Add <option> tag with extra attributes if given | |
| option_attrs = { | |
| "timestep": str(timestep), | |
| "integrator": integrator, | |
| } | |
| if cone is not None: | |
| option_attrs["cone"] = cone | |
| ET.SubElement(mujoco_elem, "option", attrib=option_attrs) | |
| # Add <size> tag if memory is given | |
| if memory is not None: | |
| mem_value = memory if isinstance(memory, str) else "1G" | |
| ET.SubElement(mujoco_elem, "size", attrib={"memory": mem_value}) | |
| # Convert to pretty string | |
| pretty_xml = indent_xml(mujoco_elem) | |
| # Ensure the directory exists | |
| if output_dir: | |
| os.makedirs(output_dir, exist_ok=True) | |
| # Write to file | |
| with open(output_xml_path, "w", encoding="utf-8") as f: | |
| f.write(pretty_xml) | |
| # Verbose output | |
| if verbose: | |
| print("[merge_mjcfs] Merging [%d] MJCF files:"%(len(included_mjcf_files))) | |
| for f_idx,f in enumerate(included_mjcf_files): | |
| print(" - [%d] [%s]"%(f_idx,f)) | |
| print(f"[merge_mjcfs] Saved merged XML to:[%s]"%(output_xml_path)) | |
| return output_xml_path | |
| def sample_list(list, n_sample, verbose=False): | |
| """ | |
| Randomly sample n elements from a list. | |
| Parameters: | |
| list (list): Input list to sample from. | |
| n_sample (int): Number of samples to draw. | |
| Returns: | |
| list: Sampled elements. | |
| """ | |
| subset = random.sample(list,n_sample) | |
| if verbose: | |
| print("Sampled [%d] elements from the list consists of [%d] items:"%(n_sample,len(list))) | |
| for i, item in enumerate(subset): | |
| print(" [%d] [%s]"%(i,item)) | |
| # Return | |
| return subset | |
| class PID_ControllerClass(object): | |
| def __init__( | |
| self, | |
| name = 'PID', | |
| k_p = 0.01, | |
| k_i = 0.0, | |
| k_d = 0.001, | |
| dt = 0.01, | |
| dim = 1, | |
| dt_min = 1e-6, | |
| out_min = -np.inf, | |
| out_max = np.inf, | |
| ANTIWU = True, # anti-windup | |
| out_alpha = 0.0, # output EMA (0: no EMA) | |
| ): | |
| """ | |
| Initialize PID Controller. | |
| Parameters: | |
| name (str): Name of the PID controller. | |
| k_p (float): Proportional gain. | |
| k_i (float): Integral gain. | |
| k_d (float): Derivative gain. | |
| dt (float): Default timestep. | |
| dim (int): Dimension of control signals. | |
| dt_min (float): Minimum allowed timestep. | |
| out_min (float): Minimum control output. | |
| out_max (float): Maximum control output. | |
| ANTIWU (bool): Flag to enable anti-windup. | |
| out_alpha (float): EMA smoothing factor for output (0 for no smoothing). | |
| """ | |
| self.name = name | |
| self.k_p = k_p | |
| self.k_i = k_i | |
| self.k_d = k_d | |
| self.dt = dt | |
| self.dim = dim | |
| self.dt_min = dt_min | |
| self.out_min = np.array(out_min) | |
| self.out_max = np.array(out_max) | |
| self.ANTIWU = ANTIWU | |
| self.out_alpha = out_alpha | |
| # Buffers | |
| self.cnt = 0 | |
| self.x_trgt = np.zeros(shape=self.dim) | |
| self.x_curr = np.zeros(shape=self.dim) | |
| self.out_val = np.zeros(shape=self.dim) | |
| self.out_val_prev = np.zeros(shape=self.dim) | |
| self.t_curr = 0.0 | |
| self.t_prev = 0.0 | |
| self.err_curr = np.zeros(shape=self.dim) | |
| self.err_intg = np.zeros(shape=self.dim) | |
| self.err_prev = np.zeros(shape=self.dim) | |
| self.p_term = np.zeros(shape=self.dim) | |
| self.d_term = np.zeros(shape=self.dim) | |
| self.err_out = np.zeros(shape=self.dim) | |
| def reset(self,t_curr=0.0): | |
| """ | |
| Reset PID controller states. | |
| Parameters: | |
| t_curr (float): Current timestamp to reset controller. | |
| Returns: | |
| None | |
| """ | |
| self.cnt = 0 | |
| self.x_trgt = np.zeros(shape=self.dim) | |
| self.x_curr = np.zeros(shape=self.dim) | |
| self.out_val = np.zeros(shape=self.dim) | |
| self.out_val_prev = np.zeros(shape=self.dim) | |
| self.t_curr = t_curr | |
| self.t_prev = t_curr | |
| self.err_curr = np.zeros(shape=self.dim) | |
| self.err_intg = np.zeros(shape=self.dim) | |
| self.err_prev = np.zeros(shape=self.dim) | |
| self.p_term = np.zeros(shape=self.dim) | |
| self.d_term = np.zeros(shape=self.dim) | |
| self.err_out = np.zeros(shape=self.dim) | |
| def update( | |
| self, | |
| t_curr = None, | |
| x_trgt = None, | |
| x_curr = None, | |
| verbose = False | |
| ): | |
| """ | |
| Update PID controller with new measurements. | |
| Parameters: | |
| t_curr (float): Current timestamp. | |
| x_trgt (np.array): Target setpoint. | |
| x_curr (np.array): Current measured value. | |
| verbose (bool): Flag to enable detailed logging. | |
| Returns: | |
| None | |
| """ | |
| if x_trgt is not None: | |
| self.x_trgt = np.array(x_trgt) | |
| if t_curr is not None: | |
| self.t_curr = t_curr | |
| if x_curr is not None: | |
| self.x_curr = np.array(x_curr) | |
| # PID controller updates here | |
| self.dt = max(self.t_curr - self.t_prev,self.dt_min) | |
| self.err_curr = self.x_trgt - self.x_curr | |
| self.err_intg = self.err_intg + (self.err_curr*self.dt) | |
| self.err_diff = self.err_curr - self.err_prev | |
| if self.ANTIWU: # anti-windup | |
| self.err_out = self.err_curr * self.out_val | |
| self.err_intg[self.err_out<0.0] = 0.0 | |
| if self.dt > self.dt_min: | |
| self.p_term = self.k_p * self.err_curr | |
| self.i_term = self.k_i * self.err_intg | |
| self.d_term = self.k_d * self.err_diff / self.dt | |
| self.out_val = np.clip( | |
| a = self.p_term + self.i_term + self.d_term, | |
| a_min = self.out_min, | |
| a_max = self.out_max) | |
| # Smooth the output control value using EMA | |
| self.out_val = self.out_alpha*self.out_val_prev + \ | |
| (1.0-self.out_alpha)*self.out_val | |
| self.out_val_prev = self.out_val | |
| if verbose: | |
| print ("[%s] cnt:[%d] t_curr:[%.5f] dt:[%.5f]"% | |
| (self.name,self.cnt,self.t_curr,self.dt)) | |
| print (" x_trgt: %s"%(self.x_trgt)) | |
| print (" x_curr: %s"%(self.x_curr)) | |
| print (" err_curr: %s"%(self.err_curr)) | |
| print (" err_intg: %s"%(self.err_intg)) | |
| print (" p_term: %s"%(self.p_term)) | |
| print (" i_term: %s"%(self.i_term)) | |
| print (" d_term: %s"%(self.d_term)) | |
| print (" out_val: %s"%(self.out_val)) | |
| print (" err_out: %s"%(self.err_out)) | |
| # Backup | |
| self.t_prev = self.t_curr | |
| self.err_prev = self.err_curr | |
| # Counter | |
| if (t_curr is not None) and (x_curr is not None): | |
| self.cnt = self.cnt + 1 | |
| def out(self): | |
| """ | |
| Retrieve current PID control output. | |
| Returns: | |
| np.array: Current PID controller output. | |
| """ | |
| return self.out_val.copy() | |
| class RecentBufferClass: | |
| def __init__(self, dim, maxlen): | |
| """ | |
| Initializes a buffer to store the most recent fixed-length data points. | |
| Args: | |
| dim (int): The dimension of each data entry. | |
| maxlen (int): Maximum number of recent data entries to retain. | |
| """ | |
| self.dim = dim | |
| self.maxlen = maxlen | |
| self.buffer = deque(maxlen=maxlen) | |
| def append(self, data): | |
| """ | |
| Appends a new data entry to the buffer. | |
| Args: | |
| data (array-like): New data entry of dimension `dim`. | |
| Raises: | |
| ValueError: If input data dimension does not match the specified `dim`. | |
| """ | |
| if len(data) != self.dim: | |
| raise ValueError(f"Input data dimension must be {self.dim}") | |
| self.buffer.append(np.array(data)) | |
| def data(self): | |
| """ | |
| Retrieves all stored data entries as a numpy array. | |
| Returns: | |
| numpy.ndarray: Array of shape `(current_buffer_length, dim)`. | |
| """ | |
| return np.array(self.buffer) | |
| def is_full(self): | |
| """ | |
| Checks whether the buffer has reached its maximum capacity. | |
| Returns: | |
| bool: True if buffer is full, False otherwise. | |
| """ | |
| return len(self.buffer) == self.maxlen | |
| def clear(self): | |
| """ | |
| Clears all data from the buffer. | |
| """ | |
| self.buffer.clear() | |
| class VelocityTrackerClass: | |
| def __init__( | |
| self, | |
| kp_v=0.1, | |
| kp_w=0.1, | |
| v_clip=(-2.0, 2.0), | |
| w_clip=(-np.pi, np.pi), | |
| delta_clip=0.05, | |
| wheel_radius=0.143, | |
| wheel_distance=0.556, | |
| ): | |
| self.kp_v = kp_v | |
| self.kp_w = kp_w | |
| self.v_min, self.v_max = v_clip | |
| self.w_min, self.w_max = w_clip | |
| self.delta_clip = delta_clip | |
| self.r = wheel_radius | |
| self.a = wheel_distance | |
| self.v_trgt = 0.0 | |
| self.w_trgt = 0.0 | |
| def reset(self): | |
| self.v_trgt = 0.0 | |
| self.w_trgt = 0.0 | |
| def update(self, v_cmd, w_cmd, qvel_base, R_base): | |
| v_curr_raw = qvel_base[:2] | |
| v_curr_mag = np.linalg.norm(v_curr_raw) | |
| forward_direction = R_base[:2, 0] | |
| sign_dir = np.sign(np.dot(v_curr_raw, forward_direction)) | |
| v_curr = sign_dir * v_curr_mag | |
| w_curr = qvel_base[5] | |
| v_err = v_cmd - v_curr | |
| w_err = w_cmd - w_curr | |
| self.v_trgt += np.clip(self.kp_v * v_err, -self.delta_clip, self.delta_clip) | |
| self.w_trgt += np.clip(self.kp_w * w_err, -self.delta_clip, self.delta_clip) | |
| self.v_trgt = np.clip(self.v_trgt, self.v_min, self.v_max) | |
| self.w_trgt = np.clip(self.w_trgt, self.w_min, self.w_max) | |
| if np.abs(v_cmd) < 1e-1: | |
| self.v_trgt = 0.0 | |
| if np.abs(w_cmd) < 1e-1: | |
| self.w_trgt = 0.0 | |
| wheel_velocities = self._compute_wheel_velocities() | |
| info = { | |
| 'v_trgt': self.v_trgt, | |
| 'w_trgt': self.w_trgt, | |
| 'v_curr': v_curr, | |
| 'w_curr': w_curr, | |
| } | |
| return wheel_velocities,info | |
| def _compute_wheel_velocities(self): | |
| rrv = (self.v_trgt + (self.a / 2.0) * self.w_trgt) / self.r | |
| rlv = (self.v_trgt - (self.a / 2.0) * self.w_trgt) / self.r | |
| frv = rrv | |
| flv = rlv | |
| return {'rrv': rrv, 'rlv': rlv, 'frv': frv, 'flv': flv} | |
| def rand(): | |
| """ | |
| Generate a random float between 0 and 1. | |
| Returns: | |
| float: Random float in the range [0, 1). | |
| """ | |
| return np.random.rand() | |
| def damped_pinv_svd(J, damping=1e-3): | |
| """ | |
| Compute damped pseudo-inverse using SVD (more stable for all J shapes) | |
| """ | |
| U, S, Vt = np.linalg.svd(J, full_matrices=False) | |
| S_damped = S / (S**2 + damping**2) | |
| return Vt.T @ np.diag(S_damped) @ U.T | |
| def get_arc(p_center, axis, uv_front, r, theta_min, theta_max, L): | |
| """ | |
| Generates a set of 3D points forming an arc on a plane | |
| Parameters: | |
| p_center (np.ndarray): A (3,) array representing the center of the arc. | |
| axis (np.ndarray): A (3,) unit vector representing the axis of rotation | |
| uv_front (np.ndarray): A (3,) unit vector representing the forward direction | |
| r (float): Radius of the arc. | |
| theta_min (float): Start angle of the arc in radians, relative to `uv_front` direction. | |
| theta_max (float): End angle of the arc in radians, relative to `uv_front` direction. | |
| L (int): Number of points to sample along the arc. | |
| Returns: | |
| np.ndarray: An (L, 3) array of 3D points along the arc. | |
| """ | |
| axis = axis / np.linalg.norm(axis) | |
| uv_front = uv_front / np.linalg.norm(uv_front) | |
| # Construct an orthonormal basis on the rotation plane | |
| uv_right = np.cross(axis, uv_front) | |
| uv_right /= np.linalg.norm(uv_right) | |
| uv_front = np.cross(uv_right, axis) # Ensure exact orthogonality | |
| # Create angles | |
| thetas = np.linspace(theta_min, theta_max, L).reshape(-1, 1) # (L, 1) | |
| # Compute direction vectors | |
| directions = np.cos(thetas) * uv_front + np.sin(thetas) * uv_right # (L, 3) | |
| # Compute arc points | |
| points = p_center + r * directions # (L, 3) | |
| return points | |
| class TimerClass(object): | |
| """ | |
| TimerClass is a simple timer class to control the execution time of a loop. | |
| It can be used to run a loop at a specific frequency (Hz) and can also handle maximum execution time. | |
| It is useful for controlling the execution of tasks in a simulation or real-time system. | |
| Example usage: | |
| tmr_pub = TimerClass(name='Person publisher (20Hz)', Hz=20, max_sec=np.inf, verbose=True) | |
| tmr_pub.start() | |
| while tmr_pub.is_notfinished(): # loop | |
| if tmr_pub.do_run(): # publish persons to state machines (20Hz) | |
| A.publishPersons() | |
| tmr_pub.end() | |
| """ | |
| def __init__(self,name='timer',Hz=10,max_sec=np.inf,verbose=True): | |
| """ | |
| Initialize the TimerClass with a name, frequency (Hz), maximum execution time, and verbosity. | |
| Parameters: | |
| name (str): Name of the timer instance. | |
| Hz (int): Frequency in Hertz (Hz) at which the timer should run. | |
| max_sec (float): Maximum execution time in seconds. If exceeded, the timer will stop. | |
| verbose (bool): If True, print initialization messages. | |
| """ | |
| self.name = name | |
| self.Hz = Hz | |
| self.max_sec = max_sec | |
| self.verbose = verbose | |
| self.sec_next = 0.0 | |
| self.sec_period = 1.0 / self.Hz | |
| self.sec_elps = 0.0 | |
| self.sec_elps_prev = 0.0 | |
| self.sec_elps_diff = 0.0 | |
| self.sec_elps_loop = 0.0 # exact esec | |
| self.tick = 0 | |
| self.force_finish = False | |
| self.delayed_flag = False | |
| if self.verbose: | |
| print ("[%s] INITIALIZED WITH [%d]HZ. MAX_SEC:[%.1fsec]." | |
| % (self.name, self.Hz, self.max_sec)) | |
| def start(self): | |
| """ | |
| Start the timer by initializing the start time and resetting all time-related variables. | |
| Parameters: | |
| None | |
| """ | |
| self.time_start = datetime.now() | |
| self.sec_next = 0.0 | |
| self.sec_elps = 0.0 | |
| self.sec_elps_prev = 0.0 | |
| self.sec_elps_diff = 0.0 | |
| self.tick = 0 | |
| if self.verbose: | |
| print ("[%s] START ([%d]HZ. MAX_SEC:[%.1fsec])." | |
| % (self.name, self.Hz, self.max_sec)) | |
| def finish(self): | |
| """ | |
| Force finish the timer by setting the force_finish flag to True. | |
| This will cause the timer to consider itself finished immediately, regardless of elapsed time. | |
| Parameters: | |
| None | |
| """ | |
| self.force_finish = True | |
| def is_finished(self): | |
| """ | |
| Check if the timer has finished based on elapsed time or if force_finish is set. | |
| Parameters: | |
| None | |
| Returns: | |
| bool: True if the timer is finished (either by elapsed time exceeding max_sec or force_finish is True), False otherwise. | |
| """ | |
| self.time_diff = datetime.now() - self.time_start | |
| self.sec_elps = self.time_diff.total_seconds() | |
| if self.force_finish: | |
| return True | |
| if self.sec_elps > self.max_sec: | |
| return True | |
| else: | |
| return False | |
| def is_notfinished(self): | |
| """ | |
| Check if the timer is still running, i.e., not finished. | |
| This method checks if the elapsed time is less than the maximum allowed time and if force_finish is not set. | |
| Parameters: | |
| None | |
| Returns: | |
| bool: True if the timer is still running (not finished), False if it has finished. | |
| """ | |
| self.time_diff = datetime.now() - self.time_start | |
| self.sec_elps = self.time_diff.total_seconds() | |
| if self.force_finish: | |
| return False | |
| if self.sec_elps > self.max_sec: | |
| return False | |
| else: | |
| return True | |
| def do_run(self): | |
| """ | |
| Check if the timer should run based on the specified frequency (Hz). | |
| This method calculates the elapsed time since the last tick and determines if enough time has passed to execute the next tick. | |
| Parameters: | |
| None | |
| Returns: | |
| bool: True if the timer should run (i.e., enough time has passed since the last tick), False otherwise. | |
| """ | |
| time.sleep(1e-6) # avoid busy waiting | |
| self.time_diff = datetime.now() - self.time_start | |
| self.sec_elps = self.time_diff.total_seconds() | |
| if self.sec_elps > self.sec_next: | |
| self.sec_next = self.sec_next + self.sec_period | |
| self.tick = self.tick + 1 | |
| """ | |
| Compute elapsed time since the last tick. | |
| """ | |
| self.sec_elps_diff = self.sec_elps - self.sec_elps_prev | |
| self.sec_elps_prev = self.sec_elps | |
| """ | |
| Check if the elapsed time since the last tick is significantly larger than the expected period. | |
| """ | |
| if (self.sec_elps_diff > self.sec_period*1.5) and (self.Hz != 0): | |
| if self.verbose: | |
| # print ("sec_elps_diff:[%.1fms]" % (self.sec_elps_diff*1000.0)) | |
| print ("[%s][%d][%.1fs] delayed! T:[%.1fms] But it took [%.1fms]. Acutally [%.1fms]"% | |
| (self.name,self.tick, self.sec_elps, self.sec_period*1000.0, | |
| self.sec_elps_diff*1000.0,self.sec_elps_loop*1000.0)) | |
| self.delayed_flag = True | |
| # Set the start time for the next run | |
| self.time_run_start = datetime.now() | |
| return True | |
| else: | |
| self.delayed_flag = False | |
| return False | |
| def end(self): | |
| """ | |
| End the timer by calculating the elapsed time since the last tick and checking for delays. | |
| This method updates the elapsed time since the last tick and checks if it exceeds the expected period. | |
| """ | |
| time_diff = datetime.now() - self.time_run_start | |
| self.sec_elps_loop = time_diff.total_seconds() | |
| if (self.sec_elps_loop > self.sec_period) and (self.Hz != 0): | |
| # Print all the time | |
| print ("[%s] is REALLY delayed! T:[%.1fms] BUT IT TOOK [%.1fms]"% | |
| (self.name,self.sec_period*1000.0, self.sec_elps_loop*1000.0)) | |
| class RangeOfMotionTracker: | |
| """ | |
| Tracks the range of motion (ROM) from arbitrary-dimensional motion data. | |
| This class accumulates data over time and computes the element-wise minimum, | |
| maximum, and range (max - min), which corresponds to the range of motion. | |
| Usage: | |
| rom = RangeOfMotionTracker() | |
| rom.update(data) # Can be 1D, 2D, 3D, etc. | |
| result = rom.get() | |
| print(result['min'], result['max'], result['range']) | |
| # For multiple updates | |
| rom.update(data1) | |
| rom.update(data2) # Accumulative update | |
| result = rom.get() | |
| Attributes: | |
| min_values (np.ndarray): Minimum observed values | |
| max_values (np.ndarray): Maximum observed values | |
| fitted (bool): Whether the tracker has been initialized | |
| shape (tuple): Shape of the input data | |
| """ | |
| def __init__(self): | |
| """Initialize the RangeOfMotionTracker.""" | |
| self.fitted = False | |
| self.min_values = None | |
| self.max_values = None | |
| self.shape = None | |
| def reset(self) -> None: | |
| """Reset the tracker to its initial, unfitted state.""" | |
| self.fitted = False | |
| self.min_values = None | |
| self.max_values = None | |
| self.shape = None | |
| def update(self, data: np.ndarray) -> None: | |
| """ | |
| Update the tracker with new motion data. | |
| Args: | |
| data (np.ndarray): Input data of any shape (1D, 2D, 3D, etc.) | |
| Raises: | |
| ValueError: If the input data has a different shape from previous inputs. | |
| """ | |
| if not isinstance(data, np.ndarray): | |
| data = np.array(data) | |
| if not self.fitted: | |
| # Initialize with first data | |
| self.min_values = data.copy() | |
| self.max_values = data.copy() | |
| self.shape = data.shape | |
| self.fitted = True | |
| else: | |
| if data.shape != self.shape: | |
| raise ValueError(f"Data shape {data.shape} is inconsistent with the expected shape {self.shape}") | |
| self.min_values = np.minimum(self.min_values, data) | |
| self.max_values = np.maximum(self.max_values, data) | |
| def get(self) -> dict: | |
| """ | |
| Retrieve the current range of motion data. | |
| Returns: | |
| dict: Dictionary containing: | |
| - 'min': Minimum values | |
| - 'max': Maximum values | |
| - 'range': Range of motion (max - min) | |
| Raises: | |
| ValueError: If the tracker has not received any data yet. | |
| """ | |
| if not self.fitted: | |
| return None | |
| # raise ValueError("RangeOfMotionTracker has not been fitted yet. Call update() first.") | |
| # Return | |
| ret = { | |
| 'min': self.min_values.copy(), | |
| 'max': self.max_values.copy(), | |
| 'center': (self.min_values+self.max_values)/2.0, | |
| 'range': self.max_values-self.min_values, | |
| } | |
| return ret | |
| def get_corners(self) -> np.ndarray: | |
| """ | |
| Return the 8 corner vertices (8x3) of the axis-aligned box defined by min/max. | |
| Works only for 3D data (size==3). | |
| Raises: | |
| ValueError: If the tracker is not fitted or data is not 3D. | |
| """ | |
| if not self.fitted: | |
| raise ValueError("Call update() before get_corners().") | |
| vmin = np.asarray(self.min_values).reshape(-1) | |
| vmax = np.asarray(self.max_values).reshape(-1) | |
| if vmin.size != 3: | |
| raise ValueError(f"get_corners expects 3D data (size==3), got size {vmin.size}.") | |
| center = (vmin + vmax) / 2.0 | |
| half = (vmax - vmin) / 2.0 | |
| return center + half * np.array(np.meshgrid([-1, 1], [-1, 1], [-1, 1])).T.reshape(-1, 3) # (8 x 3) | |
| def estimate_scalar_scale_vector_offset(X, Y): | |
| """ | |
| Estimate a single scalar scale and a 3D vector offset to best align two sets of 3D points. | |
| This function fits the model: Y ≈ s * X + b, where: | |
| - s (float): scalar scale factor (applied equally to all dimensions) | |
| - b (np.ndarray of shape (3,)): vector offset added to each dimension | |
| Parameters: | |
| X (np.ndarray): Source point set, shape (N, 3) | |
| Y (np.ndarray): Target point set, shape (N, 3) | |
| Returns: | |
| s (float): Scalar scale factor | |
| b (np.ndarray): Offset vector of shape (3,) | |
| Example: | |
| >>> import numpy as np | |
| >>> X = np.array([[0, 0, 0], [1, 1, 1], [2, 2, 2]]) | |
| >>> Y = np.array([[1, 2, 3], [3, 4, 5], [5, 6, 7]]) | |
| >>> s, b = estimate_scalar_scale_vector_offset(X, Y) | |
| >>> print(f"Scale: {s:.2f}, Offset: {b}") | |
| Scale: 2.00, Offset: [1. 2. 3.] | |
| >>> # Apply the transformation | |
| >>> Y_pred = s * X + b | |
| >>> print("Transformed X:") | |
| >>> print(Y_pred) | |
| [[1. 2. 3.] | |
| [3. 4. 5.] | |
| [5. 6. 7.]] | |
| """ | |
| assert X.shape == Y.shape | |
| N = X.shape[0] | |
| # Step 1: solve for scalar scale using flattened data | |
| A = np.vstack([X.flatten(), np.ones(N * 3)]).T # shape (3N, 2) | |
| y = Y.flatten() | |
| s, _ = np.linalg.lstsq(A, y, rcond=None)[0] | |
| # Step 2: compute offset as b = mean(Y - s * X) per dimension | |
| b = np.mean(Y - s * X, axis=0) | |
| return s, b | |
| def get_ip_address(): | |
| """ | |
| Get the local IP address of the machine. | |
| """ | |
| result = subprocess.run(['ipconfig','getifaddr','en0'],capture_output=True,text=True) | |
| if result.returncode == 0: | |
| return result.stdout.strip() | |
| else: | |
| return None | |
| class RealTimePlotWidget(pg.PlotWidget): | |
| """ | |
| Real-time time series plot widget using PyQtGraph. | |
| Supports curve style customization, interactive legend, and runtime style updates. | |
| """ | |
| def __init__( | |
| self, | |
| labels, | |
| max_points = 1000, | |
| title = 'Real-time Plot', | |
| colors = None, | |
| styles = None, | |
| parent = None, | |
| x_offset = 0, | |
| y_offset = 0, | |
| window_width = 500, | |
| window_height = 300, | |
| x_label = 'time (sec)', | |
| legend_offset = (10,10), | |
| symbol_size = 6, | |
| pen_width = 2, | |
| axis_font_size = 10, | |
| ): | |
| """ | |
| Initialize the real-time plot widget. | |
| Parameters: | |
| labels (list): List of labels for each curve. | |
| colors (list): List of colors for each curve. Defaults to white if None. | |
| styles (list): List of styles for each curve (e.g., 'o-', '-', 'o'). Defaults to 'o-' for all curves if None. | |
| parent (QWidget): Parent widget for the plot. | |
| x_offset (int): X offset for the plot window. | |
| y_offset (int): Y offset for the plot window. | |
| window_width (int): Width of the plot window. | |
| window_height (int): Height of the plot window. | |
| x_label (str): Label for the x-axis. | |
| legend_offset (tuple): Offset for the legend position (x, y). | |
| """ | |
| super().__init__(parent) | |
| self.labels = labels | |
| self.n = len(labels) | |
| # Set default styles | |
| self.symbol_size = symbol_size | |
| self.pen_width = pen_width | |
| self.axis_font_size = axis_font_size | |
| # Set size and title | |
| self.setGeometry(x_offset, y_offset, window_width, window_height) | |
| self.setWindowTitle(title) | |
| # Set up the plot item | |
| self.max_points = max_points | |
| if self.max_points is np.inf: | |
| self.data = [np.array([]) for _ in range(self.n)] | |
| self.times = np.array([]) | |
| else: | |
| self.times = np.zeros(self.max_points) | |
| self.data = np.zeros((self.n, self.max_points)) | |
| self.ptr = 0 | |
| self.full = False | |
| # Start time for the plot | |
| self._start = time.time() | |
| self.plotItem.setLabel('bottom', x_label) | |
| self.plotItem.showGrid(x=True, y=True) | |
| self.enable_autoscale(True) | |
| # Enable auto-range automatically | |
| self.auto_range_enabled = True | |
| # Set axis styles | |
| axis_style = {'tickFont': QFont("Arial", axis_font_size)} | |
| self.plotItem.getAxis('bottom').setStyle(**axis_style) | |
| self.plotItem.getAxis('left').setStyle(**axis_style) | |
| # Set legend | |
| self.legend = pg.LegendItem(offset=legend_offset) # 명시적으로 legend 만들기 | |
| self.legend.setParentItem(self.plotItem) # plotItem에 붙이기 | |
| # Set curve styles | |
| if colors is None: | |
| colors = ['w'] * self.n | |
| if styles is None: | |
| styles = ['o-'] * self.n | |
| self.curves = [] | |
| for i, lbl in enumerate(labels): | |
| self._add_curve(i, lbl, colors[i], styles[i]) | |
| # Show | |
| self.show() | |
| def _parse_style(self, style_str, color): | |
| """ | |
| Parse the style string and return the appropriate style kwargs for PyQtGraph. | |
| Parameters: | |
| style_str (str): Style string (e.g., 'o-', '-', 'o'). | |
| color (str or QColor): Color for the curve. | |
| """ | |
| # Ensure color is a QColor object | |
| if isinstance(color, (tuple, list)) and all(isinstance(c, (float, np.floating)) for c in color): | |
| if max(color) <= 1.0: | |
| color = tuple(int(255 * float(c)) for c in color[:4]) # RGBA 0-255 scale | |
| color = pg.mkColor(color) | |
| # Parse style string | |
| style_kwargs = {} | |
| if 'o' in style_str: | |
| style_kwargs['symbol'] = 'o' | |
| style_kwargs['symbolSize'] = self.symbol_size | |
| style_kwargs['symbolPen'] = color | |
| style_kwargs['symbolBrush'] = color | |
| if '-' in style_str: | |
| style_kwargs['pen'] = pg.mkPen(color=color, width=self.pen_width) | |
| else: | |
| style_kwargs['pen'] = None | |
| return style_kwargs | |
| def _add_curve(self, i, label, color, style_str): | |
| """ | |
| Add a new curve to the plot with the specified label, color, and style. | |
| Parameters: | |
| i (int): Index of the curve. | |
| label (str): Label for the curve. | |
| color (str or QColor): Color for the curve. | |
| style_str (str): Style string for the curve (e.g., 'o-', '-', 'o'). | |
| """ | |
| style_kwargs = self._parse_style(style_str, color) | |
| curve = self.plotItem.plot(name=label, **style_kwargs) | |
| self.curves.append(curve) | |
| # Manually add legend item | |
| self.legend.addItem(curve, label) | |
| # legend | |
| legend_label = self.legend.items[-1][1] | |
| legend_label.setFlag(legend_label.ItemIsSelectable) | |
| legend_label.setAcceptHoverEvents(True) | |
| legend_label.curve_index = i | |
| def enable_autoscale(self, flag=True): | |
| """ | |
| Enable or disable autoscaling for the plot. | |
| Parameters: | |
| flag (bool): If True, enable autoscaling. If False, disable it. | |
| """ | |
| self.plotItem.setAutoVisible(flag) | |
| if flag: | |
| self.plotItem.autoRange() | |
| def auto_range(self): | |
| """ | |
| Manually trigger autoscaling of the plot. | |
| """ | |
| self.plotItem.autoRange() | |
| def toggle_auto_range(self): | |
| """ | |
| Toggle the auto-range feature on or off. | |
| """ | |
| if self.auto_range_enabled: | |
| self.auto_range_enabled = False | |
| else: | |
| self.auto_range_enabled = True | |
| def update(self, x, timestamp=None, update_curves=True): | |
| """ | |
| Update the plot with new data points. | |
| Parameters: | |
| x (array-like): New data points to add to the plot. | |
| timestamp (float, optional): Timestamp for the new data points. | |
| """ | |
| if self.max_points is np.inf: | |
| t = timestamp if timestamp is not None else (time.time() - self._start) | |
| self.times = np.append(self.times, t) | |
| for i, val in enumerate(x): | |
| self.data[i] = np.append(self.data[i], val) | |
| # Update the curve with new data | |
| if update_curves: | |
| self.curves[i].setData(self.times, self.data[i]) | |
| else: | |
| t = timestamp if timestamp is not None else (time.time() - self._start) | |
| self.times[self.ptr] = t | |
| for i, val in enumerate(x): | |
| self.data[i, self.ptr] = val | |
| self.ptr += 1 | |
| if self.ptr >= self.max_points: | |
| self.ptr = 0 | |
| self.full = True | |
| # Update the curves with new data | |
| if update_curves: | |
| if self.full: | |
| t_disp = np.roll(self.times, -self.ptr) | |
| for i in range(self.n): | |
| y_disp = np.roll(self.data[i], -self.ptr) | |
| self.curves[i].setData(t_disp, y_disp) | |
| else: | |
| t_disp = self.times[:self.ptr] | |
| for i in range(self.n): | |
| self.curves[i].setData(t_disp, self.data[i, :self.ptr]) | |
| # Auto-range the plot after updating | |
| if self.auto_range_enabled: | |
| self.auto_range() | |
| def set_style(self, index, style_str): | |
| """ | |
| Change the curve style at runtime (e.g. '-', 'o', 'o-'). | |
| Parameters: | |
| index (int): Index of the curve to change. | |
| style_str (str): New style string for the curve (e.g., 'o-', '-', 'o'). | |
| """ | |
| if index < 0 or index >= len(self.curves): | |
| raise IndexError("Curve index out of range") | |
| old_curve = self.curves[index] | |
| self.plotItem.removeItem(old_curve) | |
| # Get color from previous curve | |
| old_pen = old_curve.opts.get('pen', None) | |
| color = old_pen.color() if old_pen is not None else 'w' | |
| # Add new curve with same label and data | |
| style_kwargs = self._parse_style(style_str, color) | |
| new_curve = self.plotItem.plot(**style_kwargs) | |
| new_curve.setData(self.times, self.data[index]) | |
| self.curves[index] = new_curve | |
| # Re-add legend item with proper style | |
| self.legend.addItem(new_curve, self.labels[index]) | |
| # Reconnect legend interactivity | |
| legend_label = self.legend.items[-1][1] | |
| legend_label.setFlag(legend_label.ItemIsSelectable) | |
| legend_label.setAcceptHoverEvents(True) | |
| legend_label.curve_index = index | |
| class RealTimePlotWidgetWithControls(QWidget): | |
| """ | |
| Real-time plot widget with controls for enabling/disabling autoscaling. | |
| """ | |
| def __init__( | |
| self, | |
| labels, | |
| max_points = 1000, | |
| title = 'Real-time Plot', | |
| colors = None, | |
| styles = None, | |
| parent = None, | |
| x_offset = 0, | |
| y_offset = 0, | |
| window_width = 500, | |
| window_height = 300, | |
| x_label = 'time (sec)', | |
| legend_offset = (10,10), | |
| symbol_size = 6, | |
| pen_width = 2, | |
| axis_font_size = 10, | |
| ): | |
| """ | |
| Initialize the real-time plot widget with controls. | |
| """ | |
| super().__init__() | |
| # Create RealTimePlotWidget instance | |
| self.plot_widget = RealTimePlotWidget( | |
| labels = labels, | |
| max_points = max_points, | |
| title = title, | |
| colors = colors, | |
| styles = styles, | |
| parent = parent, | |
| x_offset = x_offset, | |
| y_offset = y_offset, | |
| window_width = window_width, | |
| window_height = window_height, | |
| x_label = x_label, | |
| legend_offset = legend_offset, | |
| symbol_size = symbol_size, | |
| pen_width = pen_width, | |
| axis_font_size = axis_font_size, | |
| ) | |
| # Autoscale toggle checkbox | |
| self.autoscale_checkbox = QCheckBox("Enable Autoscale") | |
| self.autoscale_checkbox.setChecked(True) | |
| self.autoscale_checkbox.stateChanged.connect(self.toggle_autoscale) | |
| # Frequency label | |
| self.freq_label = QLabel("Freq: N/A") | |
| self.dt_label = QLabel("dt: N/A") | |
| self.last_time = None | |
| self.dt_history = [] | |
| self.history_len = 10 | |
| # Layout setup | |
| layout = QVBoxLayout() | |
| layout.addWidget(self.plot_widget) | |
| layout.addWidget(self.autoscale_checkbox) | |
| layout.addWidget(self.freq_label) | |
| layout.addWidget(self.dt_label) | |
| self.setLayout(layout) | |
| self.setGeometry(x_offset, y_offset, window_width, window_height + 40) | |
| self.setWindowTitle(title) | |
| self.show() | |
| def toggle_autoscale(self): | |
| """ | |
| Toggle the autoscale feature based on the checkbox state. | |
| """ | |
| self.plot_widget.toggle_auto_range() | |
| def update(self, *args, **kwargs): | |
| """ | |
| Update the plot with new data points. | |
| """ | |
| # Compute frequency if timestamps are provided | |
| now = time.time() | |
| if self.last_time is not None: | |
| self.dt = now - self.last_time | |
| if self.dt > 0: | |
| self.freq = 1.0 / self.dt | |
| # Append and trim history | |
| self.dt_history.append(self.dt) | |
| if len(self.dt_history) > self.history_len: | |
| self.dt_history.pop(0) | |
| # Compute the moving averages | |
| self.dt_avg = sum(self.dt_history) / len(self.dt_history) | |
| self.freq_avg = 1/self.dt_avg if self.dt_avg > 0 else 0 | |
| # Update labels | |
| self.freq_label.setText(f"Freq: [%.1f]Hz (%.1f) "%(self.freq,self.freq_avg)) | |
| self.dt_label.setText(f"dt: [%.1f]ms (%.1f) "%(1000*self.dt,1000*self.dt_avg)) | |
| self.last_time = now | |
| # Update the plot widget with new data | |
| self.plot_widget.update(*args, **kwargs) | |
| def close(self): | |
| """ | |
| Close the plot widget and hide the main window. | |
| """ | |
| self.plot_widget.close() | |
| self.hide() | |
| def closeEvent(self, event): | |
| """ | |
| This method is called automatically when the window is closed. | |
| """ | |
| self.plot_widget.close() | |
| super().closeEvent(event) | |
| class VectorizedKalmanFilters: | |
| """ | |
| VectorizedKalmanFilters is a class that implements a vectorized Kalman filter | |
| """ | |
| def __init__(self, shape, dt=0.05, q=0.1, r=1.0): | |
| self.shape = shape | |
| self.N = np.prod(shape) | |
| self.dt = dt | |
| self.F = np.array([[1, dt], [0, 1]]) # (2,2) | |
| self.H = np.array([[1, 0]]) # (1,2) | |
| self.Q = np.array([[q, 0], [0, q]]) | |
| self.R = np.array([[r]]) | |
| # Broadcast to (N, 2, 2), (N, 2, 1) | |
| self.Fs = np.broadcast_to(self.F, (self.N, 2, 2)) | |
| self.Hs = np.broadcast_to(self.H, (self.N, 1, 2)) | |
| self.Qs = np.broadcast_to(self.Q, (self.N, 2, 2)) | |
| self.Rs = np.broadcast_to(self.R, (self.N, 1, 1)) | |
| self.x_hat = np.zeros((self.N, 2, 1)) # [pos, vel]^T | |
| self.P_hat = np.tile(self.Q[None, :, :], (self.N, 1, 1)) # cov | |
| self._initialized = False | |
| def update(self, obs, force=False): # obs: self.shape (e.g., (5,4,2,3)) | |
| """ | |
| Update with observation. | |
| If force=True (or first call before init), hard-set state to observation: | |
| pos = obs, vel = 0, P = Q. | |
| """ | |
| obs_flat_1d = obs.reshape(self.N) # (N,) | |
| # Hard reset / first-time init | |
| if force or not self._initialized: | |
| self.x_hat[:, 0, 0] = obs_flat_1d # pos = obs | |
| self.x_hat[:, 1, 0] = 0.0 # vel = 0 | |
| self.P_hat = self.Qs.copy() # reset covariance | |
| self._initialized = True | |
| return self.x_hat[:, 0, 0].reshape(self.shape) | |
| # ------- Standard predict-update ------- | |
| obs_flat = obs_flat_1d[:, None, None] # (N,1,1) | |
| # Predict | |
| x_hat_m = self.Fs @ self.x_hat | |
| P_hat_m = self.Fs @ self.P_hat @ self.Fs.transpose(0, 2, 1) + self.Qs | |
| # Kalman gain | |
| S = self.Hs @ P_hat_m @ self.Hs.transpose(0, 2, 1) + self.Rs # (N,1,1) | |
| K = P_hat_m @ self.Hs.transpose(0, 2, 1) @ np.linalg.inv(S) # (N,2,1) | |
| # Update | |
| y_diff = obs_flat - (self.Hs @ x_hat_m) # innovation | |
| self.x_hat = x_hat_m + K @ y_diff | |
| self.P_hat = P_hat_m - K @ self.Hs @ P_hat_m | |
| return self.x_hat[:, 0, 0].reshape(self.shape) | |
| def hex_to_rgba(hex_color: str, alpha: float = 1.0): | |
| """ | |
| Convert a hex color string to an RGBA color string. | |
| Parameters: | |
| hex_color (str): The hex color string (e.g., "#RRGGBB" or "#RRGGBBAA"). | |
| alpha (float): The alpha value for the color (0.0 to 1.0). | |
| Returns: | |
| tuple: A tuple containing the RGBA values (R, G, B, A) where each value is in the range [0, 255]. | |
| """ | |
| hex_color = hex_color.lstrip('#') | |
| # Extract RGB components | |
| r = int(hex_color[0:2], 16) / 255.0 | |
| g = int(hex_color[2:4], 16) / 255.0 | |
| b = int(hex_color[4:6], 16) / 255.0 | |
| return [r, g, b, alpha] | |
| class FrequencyTracker: | |
| """ | |
| Tracks event frequency (Hz). | |
| - get_Hz(both=False): instantaneous Hz | |
| - get_Hz(both=True): (instantaneous Hz, rolling-average Hz) | |
| """ | |
| def __init__(self, name='FrequencyTracker', window=120): | |
| """ | |
| Parameters | |
| ---------- | |
| name : str | |
| Tracker name. | |
| window : int | |
| Number of recent dt samples to average over. | |
| """ | |
| self.name = name | |
| self.last_time = time.perf_counter() | |
| self._dts = deque(maxlen=int(window)) | |
| def get_Hz(self, both: bool = False): | |
| """ | |
| Update timing and return Hz. | |
| Parameters | |
| ---------- | |
| both : bool | |
| If True, return (inst_hz, avg_hz); else return inst_hz only. | |
| """ | |
| now = time.perf_counter() | |
| dt = now - self.last_time | |
| if dt <= 0: | |
| dt = 1e-12 | |
| self.last_time = now | |
| self._dts.append(dt) | |
| inst_hz = 1.0 / dt | |
| if not both: | |
| return inst_hz | |
| total_dt = sum(self._dts) | |
| avg_hz = float('inf') if total_dt <= 0 else len(self._dts) / total_dt | |
| return inst_hz, avg_hz |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment