Last active
November 29, 2024 16:26
-
-
Save jwnimmer-tri/19479ebdd860d00566b7703e96fc630c to your computer and use it in GitHub Desktop.
item locking prototype; 2024-10-17; licensed under Drake's BSD-3-Clause
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import abc | |
| from pydrake.systems.framework import Context, EventStatus | |
| class HardwareStationMonitor(abc.ABC): | |
| """Base class for all station simuation components that interact with | |
| Drake's Simulator.set_monitor feature.""" | |
| @abc.abstractmethod | |
| def Monitor(self, root_context: Context) -> EventStatus: | |
| """Callback for Simulator.set_monitor that breaks out of the simulation | |
| when needed.""" | |
| @abc.abstractmethod | |
| def HandleExternalUpdates(self, root_context: Context) -> None: | |
| """Update the simulator's context as needed.""" | |
| class CompositeMonitor(HardwareStationMonitor): | |
| """A composite monitor that can contain multiple monitors of different | |
| types. Each monitor can be added at most once. """ | |
| def __init__(self): | |
| super().__init__() | |
| self._monitors: list[HardwareStationMonitor] = [] | |
| def Monitor(self, root_context: Context) -> EventStatus: | |
| for monitor in self._monitors: | |
| status = monitor.Monitor(root_context) | |
| if status.severity() != EventStatus.Severity.kDidNothing: | |
| return status | |
| return EventStatus.DidNothing() | |
| def HandleExternalUpdates(self, root_context: Context) -> None: | |
| for monitor in self._monitors: | |
| monitor.HandleExternalUpdates(root_context) | |
| def get_monitor_by_type(self, monitor_class): | |
| """Return the only monitor of the given type if one exists, otherwise | |
| return None.""" | |
| for monitor in self._monitors: | |
| if isinstance(monitor, monitor_class): | |
| return monitor | |
| return None | |
| def add_monitor(self, monitor: HardwareStationMonitor) -> None: | |
| # Throw an error if the monitor is already added | |
| if self.get_monitor_by_type(type(monitor)) is not None: | |
| raise ValueError(f"Monitor of type {type(monitor)} already added.") | |
| self._monitors.append(monitor) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import dataclasses | |
| @dataclasses.dataclass(kw_only=True) | |
| class ItemLockingMonitorConfig: | |
| # Simulation time at which item locking can first happen. Intended to allow | |
| # for settling of objects at the start of a simulation. | |
| t_start: float = 0.1 | |
| # Scoped or non-scoped names for the geometry that triggers (un)locking. | |
| # Typically, this would be geometry on the EE, e.g., the fingers. | |
| unlock_near_geometry: list[str] = dataclasses.field(default_factory=list) | |
| # Distance threshold to trigger (un)locking. Free bodies must be at least | |
| # this far away from the `unlock_near_geometry` to be eligible for locking. | |
| # Must be non-negative. | |
| distance_threshold: float = 0.05 | |
| # Translational speed threshold to trigger (un)locking. Free bodies must be | |
| # at least this slow translationally (in units of m/s) to be eligible for | |
| # locking. (Any rotational velocity is ignored.) Must be non-negative. | |
| speed_threshold: float = 0.002 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from collections import defaultdict | |
| import dataclasses | |
| import itertools | |
| import logging | |
| import numpy as np | |
| from pydrake.geometry import ( | |
| Box, | |
| Capsule, | |
| CollisionFilterDeclaration, | |
| ConvertVolumeToSurfaceMesh, | |
| Cylinder, | |
| Ellipsoid, | |
| GeometryId, | |
| GeometryInstance, | |
| GeometrySet, | |
| ProximityProperties, | |
| SceneGraph, | |
| SceneGraphInspector, | |
| Shape, | |
| Sphere, | |
| TriangleSurfaceMesh, | |
| VolumeMesh, | |
| ) | |
| from pydrake.math import RigidTransform | |
| from pydrake.multibody.plant import MultibodyPlant | |
| from pydrake.multibody.tree import ( | |
| BodyIndex, | |
| JointIndex, | |
| ModelInstanceIndex, | |
| RigidBody, | |
| ScopedName, | |
| ) | |
| from pydrake.systems.framework import Context, EventStatus | |
| from anzu.sim.common.hardware_station_monitor import ( | |
| CompositeMonitor, | |
| HardwareStationMonitor, | |
| ) | |
| from anzu.sim.common.item_locking_monitor_config import ( | |
| ItemLockingMonitorConfig, | |
| ) | |
| @dataclasses.dataclass(kw_only=True) | |
| class BoundingBox: | |
| """A bounding box representation as a 3d center point and 3d overall size | |
| (i.e., using diameters not radii). | |
| """ | |
| center: np.array | |
| size: np.array | |
| # TODO(dale.mcconachie) Replace this dispatch once | |
| # https://github.com/RobotLocomotion/drake/issues/20565 is addressed. | |
| def _GetShapeOabb(shape: Shape) -> BoundingBox: | |
| """Returns the bounding box in the geometry frame G.""" | |
| if isinstance(shape, Box): | |
| size = shape.size() | |
| elif isinstance(shape, Capsule): | |
| diameter = 2.0 * shape.radius() | |
| size = np.array([diameter, diameter, shape.length() + diameter]) | |
| elif isinstance(shape, Cylinder): | |
| diameter = 2.0 * shape.radius() | |
| size = np.array([diameter, diameter, shape.length()]) | |
| elif isinstance(shape, Ellipsoid): | |
| size = 2.0 * np.array([shape.a(), shape.b(), shape.c()]) | |
| elif isinstance(shape, Sphere): | |
| diameter = 2.0 * shape.radius() | |
| size = np.array([diameter, diameter, diameter]) | |
| else: | |
| raise NotImplementedError(f"Unimplemented shape {type(shape)}") | |
| return BoundingBox(center=np.zeros(3), size=size) | |
| def _GetMeshOabb(mesh: TriangleSurfaceMesh | VolumeMesh) -> BoundingBox: | |
| """Returns the bounding box in the geometry frame G.""" | |
| assert mesh is not None | |
| if isinstance(mesh, VolumeMesh): | |
| mesh = ConvertVolumeToSurfaceMesh(mesh) | |
| assert isinstance(mesh, TriangleSurfaceMesh), mesh | |
| center, size = mesh.CalcBoundingBox() | |
| return BoundingBox(center=center, size=size) | |
| def _OabbToAabb(oabb_G: BoundingBox, X_BG: RigidTransform) -> BoundingBox: | |
| """Converts bounding box from geometry frame G to body frame B.""" | |
| aabb_center_B = X_BG @ oabb_G.center | |
| # Rather than creating all points and looking for the element-wise maximum | |
| # of those points, we observe that the x coordinate of the rotated points | |
| # will follow the pattern +/- r11 * e_x +/- r12 * e_y +/- r13 * e_z . | |
| # Thus the maximum of this value will occur when all of the terms are | |
| # positive, which is guaranteed to happen given the +/- in the pattern | |
| # above. Thus we can directly skip to the final result by taking the | |
| # absolute value of the elements in the rotation matrix and multiplying | |
| # them with the size vector which is already known to be positive by | |
| # definition. The same holds for the y and z components. | |
| # See https://zeux.io/2010/10/17/aabb-from-obb-with-component-wise-abs/ for | |
| # a more detailed breakdown. | |
| aabb_size_B = np.abs(X_BG.rotation().matrix()) @ oabb_G.size | |
| return BoundingBox(center=aabb_center_B, size=aabb_size_B) | |
| def _MergeAabbs(aabb_list: list[BoundingBox]) -> BoundingBox: | |
| count = len(aabb_list) | |
| max_points = np.ndarray(shape=(3, count)) | |
| min_points = np.ndarray(shape=(3, count)) | |
| for i, bbox in enumerate(aabb_list): | |
| half_size = bbox.size * 0.5 | |
| max_points[:, i] = bbox.center + half_size | |
| min_points[:, i] = bbox.center - half_size | |
| max_point = np.max(max_points, axis=1) | |
| min_point = np.min(min_points, axis=1) | |
| center = 0.5 * (max_point + min_point) | |
| size = max_point - min_point | |
| return BoundingBox(center=center, size=size) | |
| def _CalcAabb(inspector: SceneGraphInspector, body: RigidBody): | |
| aabb_B_list = list() | |
| for geom_id in body.GetParentPlant().GetCollisionGeometriesForBody(body): | |
| # Frame G is the frame of the current geometry. | |
| maybe_mesh = inspector.maybe_get_hydroelastic_mesh(geom_id) | |
| if maybe_mesh is not None: | |
| oabb_G = _GetMeshOabb(maybe_mesh) | |
| else: | |
| oabb_G = _GetShapeOabb(inspector.GetShape(geom_id)) | |
| # Convert from the frame G of each individual geometry to the frame B | |
| # of the body so that they can be aggregated in the common body frame. | |
| X_BG = inspector.GetPoseInFrame(geom_id) | |
| aabb_B = _OabbToAabb(oabb_G, X_BG) | |
| aabb_B_list.append(aabb_B) | |
| return _MergeAabbs(aabb_B_list) | |
| def _get_contact_pairs_body_indices( | |
| plant: MultibodyPlant, context: Context | |
| ) -> list[tuple[BodyIndex, BodyIndex]]: | |
| contact_results = plant.get_contact_results_output_port().Eval(context) | |
| contact_pairs = [] | |
| # Point contact pairs. | |
| for i in range(contact_results.num_point_pair_contacts()): | |
| contact_result = contact_results.point_pair_contact_info(i) | |
| index_A = contact_result.bodyA_index() | |
| index_B = contact_result.bodyB_index() | |
| contact_pairs.append((index_A, index_B)) | |
| # Hydroelastic contact pairs. | |
| query_object = plant.get_geometry_query_input_port().Eval(context) | |
| inspector = query_object.inspector() | |
| for i in range(contact_results.num_hydroelastic_contacts()): | |
| contact_result = contact_results.hydroelastic_contact_info(i) | |
| geo_id_M = contact_result.contact_surface().id_M() | |
| geo_id_N = contact_result.contact_surface().id_N() | |
| frame_id_M = inspector.GetFrameId(geo_id_M) | |
| frame_id_N = inspector.GetFrameId(geo_id_N) | |
| body_M = plant.GetBodyFromFrameId(frame_id_M) | |
| body_N = plant.GetBodyFromFrameId(frame_id_N) | |
| contact_pairs.append((body_M.index(), body_N.index())) | |
| # TODO(dale.mcconachie) Add deformable contacts. | |
| return contact_pairs | |
| def get_contact_pairs_scoped_names( | |
| plant: MultibodyPlant, context: Context | |
| ) -> list[tuple[ScopedName, ScopedName]]: | |
| """Extracts the `ScopedName`s of all point contact and hydroelastic contact | |
| pairs from the contact results output port of the `plant`. | |
| """ | |
| contact_pairs_body_indices = _get_contact_pairs_body_indices( | |
| plant, context) | |
| contact_pairs_scoped_names = [ | |
| ( | |
| plant.get_body(index_A).scoped_name(), | |
| plant.get_body(index_B).scoped_name() | |
| ) for index_A, index_B in contact_pairs_body_indices | |
| ] | |
| return contact_pairs_scoped_names | |
| def _contact_pairs_as_dict( | |
| contact_pairs: list[tuple[BodyIndex, BodyIndex]] | |
| ) -> dict[BodyIndex: set[BodyIndex]]: | |
| contact_dict = defaultdict(set) | |
| for index_A, index_B in contact_pairs: | |
| contact_dict[index_A].add(index_B) | |
| contact_dict[index_B].add(index_A) | |
| return contact_dict | |
| # TODO(dale.mcconachie) Hoist this to Drake. | |
| def _GetScopedGeometryByName(plant: MultibodyPlant, | |
| full_body_name: str) -> list[GeometryId]: | |
| scoped_name = ScopedName.Parse(full_body_name) | |
| namespace = scoped_name.get_namespace() | |
| element = scoped_name.get_element() | |
| if len(namespace): | |
| instance = plant.GetModelInstanceByName(namespace) | |
| body = plant.GetBodyByName(element, instance) | |
| return plant.GetCollisionGeometriesForBody(body) | |
| else: | |
| body = plant.GetBodyByName(element) | |
| return plant.GetCollisionGeometriesForBody(body) | |
| @dataclasses.dataclass(kw_only=True) | |
| class _TargetDetail: | |
| """A helper class for ItemLockingMonitor that collects all relevant | |
| information about each model subject to locking. | |
| """ | |
| model_instance_index: ModelInstanceIndex | |
| model_instance_name: str | |
| body_index: BodyIndex | |
| joint_index: JointIndex | |
| bounding_box_id: GeometryId | None = None | |
| is_locked: bool = False | |
| want_locked: bool = False | |
| colliding_bodies_when_fell_asleep: set[BodyIndex] | None = None | |
| def _FindAllTargets(plant) -> list[_TargetDetail]: | |
| """Returns the "target" model instances the plant that are subject to | |
| locking (along with some helpful additional details about each model). | |
| A target is: | |
| - a model instance with only a single body, where | |
| - that body has no outboard joints, and | |
| - is mobilized by a single 6dof inboard joint, and | |
| - the joint's inboard parent body is anchored to the world. | |
| In the future, we might extend the definition (e.g., to model instances | |
| with more than one body, but still welded as an isolated subgraph). | |
| """ | |
| # Precompute all world-affixed bodies. | |
| world_body_indices = set([ | |
| body.index() | |
| for body in plant.GetBodiesWeldedTo(plant.GetBodyByName("world")) | |
| ]) | |
| # Find all model instances that contain exactly one body. | |
| candidates: dict[BodyIndex, _TargetDetail] = dict() | |
| for i in range(plant.num_model_instances()): | |
| i = ModelInstanceIndex(i) | |
| body_indices = plant.GetBodyIndices(i) | |
| if len(body_indices) != 1: | |
| continue | |
| body_index = body_indices[0] | |
| candidates[body_index] = _TargetDetail( | |
| model_instance_name=plant.GetModelInstanceName(i), | |
| model_instance_index=i, | |
| body_index=body_index, | |
| # We'll set the joint_index later on within this function. | |
| joint_index=None, | |
| ) | |
| # Precompute all inboard and outboard joints for those bodies. | |
| inboards: dict[BodyIndex, list[JointIndex]] = dict() | |
| outboards: dict[BodyIndex, list[JointIndex]] = dict() | |
| for j in plant.GetJointIndices(): | |
| joint = plant.get_joint(j) | |
| outboards.setdefault(joint.parent_body().index(), list()).append(j) | |
| inboards.setdefault(joint.child_body().index(), list()).append(j) | |
| # Find bodies that meet our criteria. | |
| matches = [] | |
| for body_index in candidates.keys(): | |
| # Skip bodies that are inboard of something else. | |
| if len(outboards.get(body_index, [])) > 0: | |
| continue | |
| # Skip bodies without a single 6dof inboard joint. | |
| if len(inboards.get(body_index, [])) != 1: | |
| continue | |
| joint_index = inboards[body_index][0] | |
| joint = plant.get_joint(joint_index) | |
| if joint.num_positions() < 6: | |
| continue | |
| # Skip bodies with a non-world-affixed parent. | |
| if joint.parent_body().index() not in world_body_indices: | |
| continue | |
| candidates[body_index].joint_index = joint_index | |
| matches.append(body_index) | |
| return [ | |
| candidates[body_index] | |
| for body_index in matches | |
| ] | |
| # TODO(dale.mcconachie) Convert this to a LeafSystem with inputs and outputs | |
| # once joints can be locked/unlocked from within the systems framework and | |
| # collision filters are similarly supported. | |
| # https://github.com/RobotLocomotion/drake/issues/20571 | |
| # TODO(jeremy.nimmer) We've outgrown the name "item", so should adopt something | |
| # more conventional. The typical term of art here is "sleeping". | |
| class ItemLockingMonitor(HardwareStationMonitor): | |
| """This class is a simulation monitor designed to lock manipulands when | |
| they are far away from the grippers, unlocking when the grippers approach | |
| that manipuland. Designed purely for simulation speed improvements rather | |
| than any physical process. | |
| """ | |
| def __init__(self, | |
| config: ItemLockingMonitorConfig, | |
| plant: MultibodyPlant, | |
| scene_graph: SceneGraph): | |
| super().__init__() | |
| assert plant.is_finalized() | |
| self._t_start = config.t_start | |
| self._distance_threshold = config.distance_threshold | |
| self._speed_threshold = config.speed_threshold | |
| self._unlock_near_ids: list[GeometryId] = list(itertools.chain(*[ | |
| _GetScopedGeometryByName(plant, name) | |
| for name in config.unlock_near_geometry | |
| ])) | |
| self._plant = plant | |
| self._scene_graph = scene_graph | |
| self._targets = _FindAllTargets(plant) | |
| self._geometry_filter_id = None | |
| # Add proximity bounding box helper geometry for each lockable target. | |
| inspector = scene_graph.model_inspector() | |
| for detail in self._targets: | |
| # Calculate the AABB of all the body's proximity geometry (in body | |
| # frame B). | |
| body = plant.get_body(detail.body_index) | |
| aabb_B = _CalcAabb(inspector, body) | |
| # Add the bounding box as a proximity shape; it must be a proximity | |
| # shape to be able to call ComputeSignedDistancePairClosestPoints. | |
| # (Registering a geometry directly on the scene graph instead of | |
| # laundering it through the plant would be a problem if the added | |
| # geometry were to collide with anything; however, we will be | |
| # filtering out all collisions involving these added geometries, | |
| # thus there will never be any collisions to be resolved.) | |
| detail.bounding_box_id = scene_graph.RegisterGeometry( | |
| source_id=plant.get_source_id(), | |
| frame_id=plant.GetBodyFrameIdIfExists(detail.body_index), | |
| geometry=GeometryInstance( | |
| X_PG=RigidTransform(aabb_B.center), | |
| shape=Box(aabb_B.size), | |
| name=body.name() + "_aabb_for_joint_locking", | |
| ), | |
| ) | |
| scene_graph.AssignRole( | |
| source_id=plant.get_source_id(), | |
| geometry_id=detail.bounding_box_id, | |
| properties=ProximityProperties(), | |
| ) | |
| # Filter out all collisions between our added bounding box geometries | |
| # and everything else (including themselves). | |
| filter_manager = scene_graph.collision_filter_manager() | |
| filter_manager.Apply(CollisionFilterDeclaration().ExcludeBetween( | |
| GeometrySet([detail.bounding_box_id for detail in self._targets]), | |
| GeometrySet(inspector.GetAllGeometryIds()))) | |
| def Monitor(self, root_context: Context) -> EventStatus: | |
| """Callback for Simulator.set_monitor that computes the desired joint | |
| locking status for monitored targets, and breaks out of the simulation | |
| in case any has changed. The desired status is stored as member data; | |
| users should call the SetItemLockStates() method to update the joint | |
| locking state. | |
| """ | |
| if root_context.get_time() < self._t_start: | |
| return EventStatus.DidNothing() | |
| plant_context = self._plant.GetMyContextFromRoot(root_context) | |
| query_object = self._plant.get_geometry_query_input_port().Eval( | |
| plant_context) | |
| # Default to locking everything, unlocking only if it is close to a | |
| # finger. This considers each target independently; we'll consider | |
| # any target-on-target interaction in a second pass, below. | |
| velocities_port = self._plant.get_body_spatial_velocities_output_port() | |
| velocities = velocities_port.Eval(plant_context) | |
| for detail in self._targets: | |
| detail.want_locked = True | |
| # Check the speed limit. | |
| # TODO(jeremy-nimmer) This only considers translation. There might | |
| # be cases where rotational speed should also prevent locking even | |
| # if the target is translationally stationary. | |
| spatial_velocity = velocities[detail.body_index] | |
| speed = np.linalg.norm(spatial_velocity.translational()) | |
| if not speed < self._speed_threshold: | |
| detail.want_locked = False | |
| continue | |
| # Check the distance limit. | |
| for unlock_near_id in self._unlock_near_ids: | |
| signed_distance_pair = ( | |
| query_object.ComputeSignedDistancePairClosestPoints( | |
| detail.bounding_box_id, unlock_near_id)) | |
| if signed_distance_pair.distance <= self._distance_threshold: | |
| detail.want_locked = False | |
| # We know at this point that this object should not be | |
| # locked, so we don't need to check it against the rest | |
| # of the gripper geometries. | |
| break | |
| # Targets that are (conservatively) in contact with each other must | |
| # either all lock at once, or none of them can lock at all. First, | |
| # we'll compute the (conservative) collision status for all target | |
| # pairs (excluding self-collisions). | |
| num_targets = len(self._targets) | |
| contacts = np.zeros(shape=(num_targets, num_targets)) | |
| for i in range(num_targets): | |
| target_i = self._targets[i] | |
| for j in range(i + 1, num_targets): | |
| target_j = self._targets[j] | |
| signed_distance_pair = ( | |
| query_object.ComputeSignedDistancePairClosestPoints( | |
| target_i.bounding_box_id, target_j.bounding_box_id)) | |
| contact = 1 if signed_distance_pair.distance <= 0 else 0 | |
| contacts[i, j] = contact | |
| contacts[j, i] = contact | |
| # Second, we'll propagate the "unlocked" attribute across collisions. | |
| unlocked_to_visit = set([ | |
| i for i, x in enumerate(self._targets) | |
| if not x.want_locked | |
| ]) | |
| unlocked_visited = set() | |
| while unlocked_to_visit: | |
| i = unlocked_to_visit.pop() | |
| unlocked_visited.add(i) | |
| target_i = self._targets[i] | |
| assert not target_i.want_locked | |
| for j in range(num_targets): | |
| if contacts[i, j]: | |
| target_j = self._targets[j] | |
| if target_j.want_locked: | |
| assert j not in unlocked_to_visit | |
| assert j not in unlocked_visited | |
| target_j.want_locked = False | |
| unlocked_to_visit.add(j) | |
| if self._needs_update(): | |
| return EventStatus.ReachedTermination(None, "Joint locking update") | |
| else: | |
| return EventStatus.DidNothing() | |
| def _needs_update(self) -> bool: | |
| """Returns True iff we need to stop the sim to change the locking.""" | |
| return any([x.is_locked != x.want_locked for x in self._targets]) | |
| def HandleExternalUpdates(self, root_context: Context) -> None: | |
| """Updates the context with the lock states computed by the most recent | |
| call to the Monitor() method. | |
| """ | |
| if not self._needs_update(): | |
| # It's extremely important not to change the context when locking | |
| # details are quiescent. Changes are non-trivially expensive. | |
| return | |
| # Grab the *prior* list of contact pairs, so that we can memorize it. | |
| plant_context = self._plant.GetMyContextFromRoot(root_context) | |
| old_contact_pairs = _contact_pairs_as_dict( | |
| _get_contact_pairs_body_indices(self._plant, plant_context)) | |
| # Update the MultibodyPlant context. | |
| plant_context = self._plant.GetMyMutableContextFromRoot(root_context) | |
| locked_geom_ids = [] | |
| for detail in self._targets: | |
| if detail.want_locked: | |
| locked_geom_ids.extend( | |
| self._plant.GetCollisionGeometriesForBody( | |
| self._plant.get_body(detail.body_index))) | |
| if not detail.is_locked: | |
| # TODO(dale.mcconachie) Consider quieting this. | |
| logging.info(f"Locking {detail.model_instance_name}") | |
| joint = self._plant.get_joint(detail.joint_index) | |
| joint.Lock(plant_context) | |
| detail.is_locked = True | |
| detail.colliding_bodies_when_fell_asleep = ( | |
| old_contact_pairs[detail.body_index] | |
| ) | |
| else: | |
| if detail.is_locked: | |
| # TODO(dale.mcconachie) Consider quieting this. | |
| logging.info(f"Unlocking {detail.model_instance_name}") | |
| joint = self._plant.get_joint(detail.joint_index) | |
| joint.Unlock(plant_context) | |
| detail.is_locked = False | |
| detail.colliding_bodies_when_fell_asleep = None | |
| # Update the SceneGraph context. | |
| query_object = self._plant.get_geometry_query_input_port().Eval( | |
| plant_context) | |
| new_filter = CollisionFilterDeclaration().ExcludeBetween( | |
| GeometrySet(locked_geom_ids), | |
| GeometrySet(query_object.inspector().GetAllGeometryIds())) | |
| filter_manager = self._scene_graph.collision_filter_manager( | |
| self._scene_graph.GetMyMutableContextFromRoot(root_context)) | |
| if self._geometry_filter_id is not None: | |
| filter_manager.RemoveDeclaration(self._geometry_filter_id) | |
| self._geometry_filter_id = filter_manager.ApplyTransient(new_filter) | |
| def get_colliding_bodies_when_fell_asleep( | |
| self, body_index: BodyIndex | None = None | |
| ): | |
| """ This function operates in two modes. | |
| In the first mode when a `body_index` is passed, this function returns | |
| the set of body indices that the given body indicated by `body_index` | |
| was in contact with when it fell asleep. If the body is not asleep or | |
| it is not managed by this class then `None` is returned. | |
| In the second mode, a list of sleeping contact pairs is returned in the | |
| same format as returned by `get_contact_pairs_scoped_names(...)`. | |
| """ | |
| if body_index is not None: | |
| for detail in self._targets: | |
| if detail.body_index == body_index: | |
| return detail.colliding_bodies_when_fell_asleep | |
| else: | |
| # We don't raise an error to allow users to query for bodies | |
| # that are not managed by this class. | |
| return None | |
| else: | |
| contact_pairs = [] | |
| for detail in self._targets: | |
| if detail.colliding_bodies_when_fell_asleep: | |
| index_A = detail.body_index | |
| for index_B in detail.colliding_bodies_when_fell_asleep: | |
| body_A = self._plant.get_body(index_A) | |
| body_B = self._plant.get_body(index_B) | |
| contact_pairs.append( | |
| (body_A.scoped_name(), body_B.scoped_name())) | |
| return contact_pairs | |
| def ApplyItemLockingMonitorConfig( | |
| config: ItemLockingMonitorConfig, | |
| plant: MultibodyPlant, | |
| scene_graph: SceneGraph, | |
| monitor: CompositeMonitor) -> None: | |
| """Prepares the given plant and scene_graph for free body locking. | |
| Adds the ItemLockingMonitor to the given CompositeMonitor. | |
| @pre The plant is finalized. | |
| """ | |
| monitor.add_monitor(ItemLockingMonitor(config, plant, scene_graph)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| while simulator.get_context().get_time() < t_final: | |
| monitor.HandleExternalUpdates(simulator.get_mutable_context()) | |
| simulator.AdvanceTo(t_final) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment