diff --git a/newton/examples/example_mjwarp_tendon.py b/newton/examples/example_mjwarp_tendon.py new file mode 100644 index 000000000..e79ef6dbd --- /dev/null +++ b/newton/examples/example_mjwarp_tendon.py @@ -0,0 +1,216 @@ +import os +import tempfile + +import numpy as np +import warp as wp + +import newton +import newton.solvers +import newton.utils + + +class Example: + def __init__(self, stage_path="example_mjwarp_tendon.usd", num_frames=300, headless=False): + # Simulation parameters + self.sim_time = 0.0 + self.frame_dt = 1.0 / 60 # 60 FPS + self.sim_substeps = 5 + self.sim_dt = self.frame_dt / self.sim_substeps + self.num_frames = num_frames + + # MJCF content with tendon + mjcf_content = """ + + + + + + + + + + + + + + + + + + + + + + + + + """ + + # Create temporary MJCF file + self.tmpdir = tempfile.TemporaryDirectory() + mjcf_path = os.path.join(self.tmpdir.name, "test-tendon.xml") + with open(mjcf_path, "w") as f: + f.write(mjcf_content) + + # Build model + builder = newton.ModelBuilder() + newton.utils.parse_mjcf( + mjcf_path, + builder, + collapse_fixed_joints=True, + up_axis="Z", + enable_self_collisions=False, + ) + self.model = builder.finalize() + + print("Model statistics:") + print(f" Sites: {self.model.site_count}") + print(f" Tendons: {self.model.tendon_count}") + print(f" Tendon actuators: {self.model.tendon_actuator_count}") + + # Create solver + self.solver = newton.solvers.MuJoCoSolver(self.model) + print("\nMuJoCo solver created successfully!") + + # Create states and control + self.state_0 = self.model.state() + self.state_1 = self.model.state() + self.control = self.model.control() + + # Create renderer + if not headless: + self.renderer = newton.utils.SimRendererOpenGL( + model=self.model, + path=stage_path, + scaling=1.0, + up_axis="Z", + screen_width=1280, + screen_height=720, + camera_pos=(0, 1, 3), # View from negative Y direction, looking at the pendulum + ) + elif stage_path: + self.renderer = newton.utils.SimRendererUsd(self.model, stage_path) + else: + self.renderer = None + + # Set initial tendon target to contract the cable + tendon_targets = self.control.tendon_target.numpy() + tendon_targets[0] = -0.3 # Contract by 30cm + self.control.tendon_target = wp.array(tendon_targets, dtype=wp.float32, device=self.model.device) + print(f"Set tendon target to: {tendon_targets[0]}") + + # Record initial state + self.initial_angle = self.state_0.joint_q.numpy()[0] + + def simulate(self): + for _ in range(self.sim_substeps): + self.solver.step(self.state_0, self.state_1, self.control, None, self.sim_dt) + self.state_0, self.state_1 = self.state_1, self.state_0 + + def step(self): + with wp.ScopedTimer("step", active=False): + self.simulate() + self.sim_time += self.frame_dt + + def render(self): + if self.renderer is None: + return + + with wp.ScopedTimer("render", active=False): + self.renderer.begin_frame(self.sim_time) + self.renderer.render(self.state_0) + + # Visualize the tendon as a line between sites + if self.model.site_count >= 2: + # Get site positions in world space + site_positions = [] + site_body_np = self.model.site_body.numpy() + site_xform_np = self.model.site_xform.numpy() + body_q_np = self.state_0.body_q.numpy() + + for i in range(2): # anchor and attach sites + body_idx = int(site_body_np[i]) + site_xform = site_xform_np[i] + + if body_idx >= 0: + # Site is attached to a body, transform to world space + body_q = body_q_np[body_idx] + body_transform = wp.transform(body_q[:3], body_q[3:7]) + site_world_pos = wp.transform_point(body_transform, wp.vec3(site_xform[:3])) + else: + # Site is in world space + site_world_pos = site_xform[:3] + + site_positions.append(site_world_pos) + + # Render line between sites + self.renderer.render_line_strip( + "tendon_cable", + site_positions, + color=(1.0, 0.5, 0.0), # Orange color for the cable + radius=0.02, + ) + + self.renderer.end_frame() + + def run(self): + print(f"\nRunning simulation for {self.num_frames} frames...") + print("Controls: SPACE to pause, TAB to skip rendering, ESC to exit") + print("Camera: Use WASD + mouse drag to move, mouse wheel to zoom") + print("Orange cable shows the tendon, gets redder/thicker with more force") + + for i in range(self.num_frames): + self.step() + self.render() + + # Print status every 0.5 seconds (30 frames at 60 FPS) + if i % 30 == 0: + angle = self.state_0.joint_q.numpy()[0] + velocity = self.state_0.joint_qd.numpy()[0] + print(f" t={self.sim_time:.1f}s: angle={np.degrees(angle):6.1f}°, velocity={velocity:6.2f} rad/s") + + # Final results + final_angle = self.state_0.joint_q.numpy()[0] + angle_change = np.degrees(final_angle - self.initial_angle) + + print("\nFinal result:") + print(f" Angle changed by: {angle_change:.1f}°") + + if abs(angle_change) > 1.0: + print("✓ SUCCESS: Tendon control is working!") + else: + print("✗ FAIL: Tendon control is NOT working - pendulum didn't move") + + # Save renderer output + if self.renderer and hasattr(self.renderer, "save"): + self.renderer.save() + print(f"\nAnimation saved to: {self.renderer.stage_path}") + + def __del__(self): + # Clean up temporary directory + if hasattr(self, "tmpdir") and self.tmpdir: + try: + self.tmpdir.cleanup() + except Exception: + pass # Ignore cleanup errors + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument("--device", type=str, default=None, help="Override the default Warp device.") + parser.add_argument( + "--stage-path", + type=lambda x: None if x == "None" else str(x), + default="example_mjwarp_tendon.usd", + help="Path to the output USD file.", + ) + parser.add_argument("--num-frames", type=int, default=300, help="Total number of frames (default: 300).") + parser.add_argument("--headless", action="store_true", help="Run in headless mode without visualization.") + + args = parser.parse_known_args()[0] + + with wp.ScopedDevice(args.device): + example = Example(stage_path=args.stage_path, num_frames=args.num_frames, headless=args.headless) + example.run() diff --git a/newton/sim/builder.py b/newton/sim/builder.py index f3dc06e76..a63bc6bdf 100644 --- a/newton/sim/builder.py +++ b/newton/sim/builder.py @@ -439,6 +439,30 @@ def __init__(self, up_axis: AxisType = Axis.Z, gravity: float = -9.81): # if setting is None, the number of worst-case number of contacts will be calculated in self.finalize() self.num_rigid_contacts_per_env = None + self.body_armature = self.default_body_armature + + # sites (reference points on bodies for tendons) + self.site_key = [] + self.site_body = [] + self.site_xform = [] + + # tendons + self.tendon_key = [] + self.tendon_type = [] + self.tendon_site_ids = [] + self.tendon_damping = [] + self.tendon_stiffness = [] + self.tendon_rest_length = [] + + # tendon actuators + self.tendon_actuator_tendon_id = [] + self.tendon_actuator_key = [] + self.tendon_actuator_ke = [] + self.tendon_actuator_kd = [] + self.tendon_actuator_force_range = [] + + # rigid articulations + # equality constraints self.equality_constraint_type = [] self.equality_constraint_body1 = [] @@ -1888,6 +1912,99 @@ def add_muscle( # return the index of the muscle return len(self.muscle_start) - 1 + def add_site( + self, + body: int, + xform: Transform | None = None, + key: str | None = None, + ) -> int: + """ + Adds a site (reference frame) to the model. + + Args: + body: The body index to which the site is attached + xform: The transform of the site relative to the body (default: identity) + key: Key of the site (optional) + + Returns: + The index of the site + """ + if xform is None: + xform = wp.transform() + + i = len(self.site_key) + self.site_key.append(key or f"site_{i}") + self.site_body.append(body) + self.site_xform.append(xform) + + return i + + def add_tendon( + self, + tendon_type: str, + site_ids: list[int], + damping: float = 0.0, + stiffness: float = 0.0, + rest_length: float | None = None, + key: str | None = None, + ) -> int: + """ + Adds a tendon to the model. + + Args: + tendon_type: The type of tendon (e.g., 'spatial') + site_ids: List of site indices that the tendon connects + damping: Damping coefficient for the tendon + stiffness: Stiffness coefficient for the tendon + rest_length: Rest length of the tendon (default: computed from initial configuration) + key: Key of the tendon (optional) + + Returns: + The index of the tendon + """ + i = len(self.tendon_key) + self.tendon_key.append(key or f"tendon_{i}") + self.tendon_type.append(tendon_type) + self.tendon_site_ids.append(site_ids) + self.tendon_damping.append(damping) + self.tendon_stiffness.append(stiffness) + self.tendon_rest_length.append(rest_length if rest_length is not None else 0.0) + + return i + + def add_tendon_actuator( + self, + tendon_id: int, + ke: float = 0.0, + kd: float = 0.0, + force_range: tuple[float, float] | None = None, + key: str | None = None, + ) -> int: + """ + Adds a tendon actuator to the model. + + Args: + tendon_id: The index of the tendon to actuate + ke: Elastic/stiffness gain for the actuator + kd: Damping gain for the actuator + force_range: Force range [min, max] for the actuator (default: [-inf, inf]) + key: Key of the actuator (optional) + + Returns: + The index of the tendon actuator + """ + if force_range is None: + force_range = (-float("inf"), float("inf")) + + i = len(self.tendon_actuator_key) + self.tendon_actuator_tendon_id.append(tendon_id) + self.tendon_actuator_key.append(key or f"tendon_actuator_{i}") + self.tendon_actuator_ke.append(ke) + self.tendon_actuator_kd.append(kd) + self.tendon_actuator_force_range.append(force_range) + + return i + # region shapes def add_shape( @@ -3707,6 +3824,43 @@ def finalize(self, device: Devicelike | None = None, requires_grad: bool = False m.muscle_points = wp.array(self.muscle_points, dtype=wp.vec3, requires_grad=requires_grad) m.muscle_activations = wp.array(self.muscle_activations, dtype=wp.float32, requires_grad=requires_grad) + # ----------------------- + # sites, tendons, and tendon actuators + + m.site_key = self.site_key + m.site_body = wp.array(self.site_body, dtype=wp.int32) if self.site_body else None + m.site_xform = wp.array(self.site_xform, dtype=wp.transform) if self.site_xform else None + + m.tendon_key = self.tendon_key + m.tendon_type = self.tendon_type + m.tendon_site_ids = self.tendon_site_ids + m.tendon_damping = wp.array(self.tendon_damping, dtype=wp.float32) if self.tendon_damping else None + m.tendon_stiffness = wp.array(self.tendon_stiffness, dtype=wp.float32) if self.tendon_stiffness else None + m.tendon_rest_length = ( + wp.array(self.tendon_rest_length, dtype=wp.float32) if self.tendon_rest_length else None + ) + + m.tendon_actuator_tendon_id = ( + wp.array(self.tendon_actuator_tendon_id, dtype=wp.int32) if self.tendon_actuator_tendon_id else None + ) + m.tendon_actuator_key = self.tendon_actuator_key + m.tendon_actuator_ke = ( + wp.array(self.tendon_actuator_ke, dtype=wp.float32) if self.tendon_actuator_ke else None + ) + m.tendon_actuator_kd = ( + wp.array(self.tendon_actuator_kd, dtype=wp.float32) if self.tendon_actuator_kd else None + ) + m.tendon_actuator_force_range = ( + wp.array(self.tendon_actuator_force_range, dtype=wp.vec2) if self.tendon_actuator_force_range else None + ) + + # Initialize tendon control arrays + if self.tendon_actuator_key: + # Initialize with zeros (no default targets for tendons unlike joints) + m.tendon_target = wp.zeros( + len(self.tendon_actuator_key), dtype=wp.float32, device=device, requires_grad=requires_grad + ) + # -------------------------------------- # rigid bodies @@ -3874,6 +4028,9 @@ def finalize(self, device: Devicelike | None = None, requires_grad: bool = False m.edge_count = len(self.edge_rest_angle) m.spring_count = len(self.spring_rest_length) m.muscle_count = len(self.muscle_start) + m.site_count = len(self.site_key) + m.tendon_count = len(self.tendon_key) + m.tendon_actuator_count = len(self.tendon_actuator_key) m.articulation_count = len(self.articulation_start) m.equality_constraint_count = len(self.equality_constraint_type) diff --git a/newton/sim/control.py b/newton/sim/control.py index fb944dc1b..bc07ef20c 100644 --- a/newton/sim/control.py +++ b/newton/sim/control.py @@ -50,6 +50,13 @@ def __init__(self): The joint targets are defined for any joint type, except for free joints. """ + self.tendon_target: wp.array | None = None + """ + Array of tendon targets with shape ``(tendon_count,)`` and type ``float``. + Tendon targets define the target length or target velocity for each tendon actuator, + depending on the tendon actuator configuration. + """ + self.tri_activations: wp.array | None = None """Array of triangle element activations with shape ``(tri_count,)`` and type ``float``.""" diff --git a/newton/sim/model.py b/newton/sim/model.py index 84e498d31..4a6db8fca 100644 --- a/newton/sim/model.py +++ b/newton/sim/model.py @@ -190,6 +190,37 @@ def __init__(self, device: Devicelike | None = None): self.muscle_activations = None """Muscle activations, shape [muscle_count], float.""" + self.site_key = [] + """List of site keys.""" + self.site_body = None + """Body index for each site, shape [site_count], int.""" + self.site_xform = None + """Transform (position and orientation) for each site relative to its body, shape [site_count, 7], float.""" + + self.tendon_key = [] + """List of tendon keys.""" + self.tendon_type = [] + """Type of each tendon ('spatial', etc.), shape [tendon_count], string.""" + self.tendon_site_ids = [] + """List of site IDs for each tendon. For spatial tendons, this is a list of site indices.""" + self.tendon_damping = None + """Damping coefficient for each tendon, shape [tendon_count], float.""" + self.tendon_stiffness = None + """Stiffness coefficient for each tendon, shape [tendon_count], float.""" + self.tendon_rest_length = None + """Rest length for each tendon, shape [tendon_count], float.""" + + self.tendon_actuator_tendon_id = None + """Tendon index for each tendon actuator, shape [tendon_actuator_count], int.""" + self.tendon_actuator_key = [] + """List of tendon actuator keys.""" + self.tendon_actuator_ke = None + """Elastic/stiffness gain for each tendon actuator, shape [tendon_actuator_count], float.""" + self.tendon_actuator_kd = None + """Damping gain for each tendon actuator, shape [tendon_actuator_count], float.""" + self.tendon_actuator_force_range = None + """Force range [min, max] for each tendon actuator, shape [tendon_actuator_count, 2], float.""" + self.body_q = None """Poses of rigid bodies used for state initialization, shape [body_count, 7], float.""" self.body_qd = None @@ -215,6 +246,8 @@ def __init__(self, device: Devicelike | None = None): """Generalized joint forces used for state initialization, shape [joint_dof_count], float.""" self.joint_target = None """Generalized joint target inputs, shape [joint_dof_count], float.""" + self.tendon_target = None + """Tendon position target for control, shape [tendon_actuator_count], float.""" self.joint_type = None """Joint type, shape [joint_count], int.""" self.joint_parent = None @@ -342,6 +375,13 @@ def __init__(self, device: Devicelike | None = None): """Total number of velocity degrees of freedom of all joints in the system. Equals the number of joint axes.""" self.joint_coord_count = 0 """Total number of position degrees of freedom of all joints in the system.""" + self.site_count = 0 + """Total number of sites in the system.""" + self.tendon_count = 0 + """Total number of tendons in the system.""" + self.tendon_actuator_count = 0 + """Total number of tendon actuators in the system.""" + self.equality_constraint_count = 0 """Total number of equality constraints in the system.""" @@ -417,6 +457,16 @@ def __init__(self, device: Devicelike | None = None): self.attribute_frequency["shape_scale"] = "shape" self.attribute_frequency["shape_filter"] = "shape" + # attributes per tendon + self.attribute_frequency["tendon_damping"] = "tendon" + self.attribute_frequency["tendon_stiffness"] = "tendon" + self.attribute_frequency["tendon_rest_length"] = "tendon" + self.attribute_frequency["tendon_type"] = "tendon" + self.attribute_frequency["tendon_target"] = "tendon" + self.attribute_frequency["tendon_actuator_kp"] = "tendon" + self.attribute_frequency["tendon_actuator_kv"] = "tendon" + self.attribute_frequency["tendon_actuator_force_range"] = "tendon" + def state(self, requires_grad: bool | None = None) -> State: """Returns a state object for the model @@ -478,12 +528,17 @@ def control(self, requires_grad: bool | None = None, clone_variables: bool = Tru c.tet_activations = wp.clone(self.tet_activations, requires_grad=requires_grad) if self.muscle_count: c.muscle_activations = wp.clone(self.muscle_activations, requires_grad=requires_grad) + if self.tendon_actuator_count: + c.tendon_target = wp.clone(self.tendon_target, requires_grad=requires_grad) + else: c.joint_target = self.joint_target c.joint_f = self.joint_f c.tri_activations = self.tri_activations c.tet_activations = self.tet_activations c.muscle_activations = self.muscle_activations + c.tendon_target = self.tendon_target + return c def collide( diff --git a/newton/solvers/mujoco/solver_mujoco.py b/newton/solvers/mujoco/solver_mujoco.py index 1904faac8..acbe254ae 100644 --- a/newton/solvers/mujoco/solver_mujoco.py +++ b/newton/solvers/mujoco/solver_mujoco.py @@ -627,6 +627,20 @@ def apply_mjc_qfrc_kernel( qfrc_applied[worldid, qd_i + i] = joint_f[wqd_i + i] +@wp.kernel +def apply_mjc_tendon_control_kernel( + tendon_target: wp.array(dtype=wp.float32), + tendon_actuator_to_actuator: wp.array(dtype=wp.int32), + tendons_per_env: int, + # outputs + mj_act: wp.array2d(dtype=wp.float32), +): + worldid, tendon_idx = wp.tid() + actuator_id = tendon_actuator_to_actuator[tendon_idx] + if actuator_id != -1: # Valid mapping + mj_act[worldid, actuator_id] = tendon_target[worldid * tendons_per_env + tendon_idx] + + @wp.func def eval_single_articulation_fk( joint_start: int, @@ -1321,6 +1335,31 @@ def apply_mjc_control(model: Model, state: State, control: Control | None, mj_da ], device=model.device, ) + + # Apply tendon control if available + if control.tendon_target is not None: + # Validate tendon_target dimensions + if len(control.tendon_target) != model.tendon_actuator_count: + raise ValueError( + f"Expected {model.tendon_actuator_count} tendon targets, got {len(control.tendon_target)}" + ) + # Apply tendon control using the tendon actuator mapping + if hasattr(model, "mjc_tendon_actuator_to_actuator"): + tendons_per_env = model.tendon_actuator_count // nworld + wp.launch( + apply_mjc_tendon_control_kernel, + dim=(nworld, tendons_per_env), + inputs=[ + control.tendon_target, + model.mjc_tendon_actuator_to_actuator, + tendons_per_env, + ], + outputs=[ + ctrl, + ], + device=model.device, + ) + wp.launch( apply_mjc_qfrc_kernel, dim=(nworld, joints_per_env), @@ -1784,6 +1823,8 @@ def quat_from_mjc(q): # mapping from joint axis to actuator index axis_to_actuator = np.zeros((model.joint_dof_count,), dtype=np.int32) - 1 actuator_count = 0 + # mapping from tendon actuator to MuJoCo actuator index + tendon_actuator_to_actuator = np.zeros((model.tendon_actuator_count,), dtype=np.int32) - 1 # supported non-fixed joint types in MuJoCo (fixed joints are handled by nesting bodies) supported_joint_types = { @@ -2168,6 +2209,103 @@ def add_geoms(warp_body_id: int, incoming_xform: wp.transform | None = None): add_geoms(child, incoming_xform=child_tf) + # ----------------------- + # add sites to MuJoCo bodies + + if hasattr(model, "site_key") and model.site_count > 0: + site_key = model.site_key + site_body = model.site_body.numpy() if model.site_body is not None else [] + site_xform_array = model.site_xform.numpy() if model.site_xform is not None else [] + + for site_idx in range(model.site_count): + body_idx = site_body[site_idx] + # Extract transform components from numpy array + site_tf_data = site_xform_array[site_idx] + site_pos = site_tf_data[:3] # First 3 elements are position + site_quat = site_tf_data[3:7] # Next 4 elements are quaternion (xyzw) + + # Find the MuJoCo body to attach the site to + if body_idx == -1: # Worldbody + mj_body = spec.worldbody + else: + mj_body = mj_bodies[body_mapping[body_idx]] + + # Add the site to the MuJoCo body + mj_body.add_site( + name=site_key[site_idx], + pos=site_pos, + quat=quat_to_mjc(site_quat), + ) + + # ----------------------- + # add tendons to MuJoCo model + + if hasattr(model, "tendon_key") and model.tendon_count > 0: + for tendon_idx in range(model.tendon_count): + tendon_key = model.tendon_key[tendon_idx] + tendon_type = model.tendon_type[tendon_idx] + + if tendon_type == "spatial": + # Add tendon using the correct API + tendon = spec.add_tendon(name=tendon_key) + + # Add sites to the tendon using wrap_site + site_ids = model.tendon_site_ids[tendon_idx] + for site_id in site_ids: + tendon.wrap_site(model.site_key[site_id]) + + # Set tendon properties if available + if model.tendon_damping is not None: + tendon.damping = float(model.tendon_damping.numpy()[tendon_idx]) + if model.tendon_stiffness is not None: + tendon.stiffness = float(model.tendon_stiffness.numpy()[tendon_idx]) + else: + wp.utils.warn( + f"Tendon type '{tendon_type}' is not supported by MuJoCo solver, skipping tendon '{tendon_key}'" + ) + + # ----------------------- + # add tendon actuators + + if hasattr(model, "tendon_actuator_key") and model.tendon_actuator_count > 0: + tendon_actuator_ids = ( + model.tendon_actuator_tendon_id.numpy() if model.tendon_actuator_tendon_id is not None else [] + ) + for act_idx in range(model.tendon_actuator_count): + tendon_id = int(tendon_actuator_ids[act_idx]) + tendon_key = model.tendon_key[tendon_id] + + # Create actuator args + act_args = { + "name": model.tendon_actuator_key[act_idx], + "target": tendon_key, + "trntype": mujoco.mjtTrn.mjTRN_TENDON, + } + + # Set actuator parameters + if model.tendon_actuator_ke is not None: + ke_values = model.tendon_actuator_ke.numpy() + ke = float(ke_values[act_idx]) + if model.tendon_actuator_kd is not None: + kd_values = model.tendon_actuator_kd.numpy() + kd = float(kd_values[act_idx]) + else: + kd = 0.0 + + # For position control actuators (MuJoCo uses kp/kv terminology) + act_args["gainprm"] = [ke, 0, 0, 0, 0, 0, 0, 0, 0, 0] + act_args["biasprm"] = [0.0, -ke, -kd, 0, 0, 0, 0, 0, 0, 0] + + # Set force range if available + if model.tendon_actuator_force_range is not None: + force_range_values = model.tendon_actuator_force_range.numpy() + force_range = force_range_values[act_idx] + act_args["forcerange"] = [float(force_range[0]), float(force_range[1])] + + spec.add_actuator(**act_args) + tendon_actuator_to_actuator[act_idx] = actuator_count + actuator_count += 1 + for i, typ in enumerate(eq_constraint_type): if typ == newton.EQ_CONNECT: eq = spec.add_equality(objtype=mujoco.mjtObj.mjOBJ_BODY) @@ -2285,6 +2423,12 @@ def add_geoms(warp_body_id: int, incoming_xform: wp.transform | None = None): with wp.ScopedDevice(model.device): # mapping from Newton joint axis index to MJC actuator index model.mjc_axis_to_actuator = wp.array(axis_to_actuator, dtype=wp.int32) # pyright: ignore[reportAttributeAccessIssue] + # mapping from Newton tendon actuator index to MJC actuator index + model.mjc_tendon_actuator_to_actuator = wp.array(tendon_actuator_to_actuator, dtype=wp.int32) # pyright: ignore[reportAttributeAccessIssue] + # store joint actuator count (before tendon actuators) + model.joint_actuator_count = actuator_count - model.tendon_actuator_count # pyright: ignore[reportAttributeAccessIssue] + # store total actuator count (including tendon actuators) + model.actuator_count = actuator_count # pyright: ignore[reportAttributeAccessIssue] # mapping from MJC body index to Newton body index (skip world index -1) reverse_body_mapping = {v: k for k, v in body_mapping.items()} model.to_mjc_body_index = wp.array( # pyright: ignore[reportAttributeAccessIssue] diff --git a/newton/tests/test_import_mjcf.py b/newton/tests/test_import_mjcf.py index cd16af4ba..8d10ec812 100644 --- a/newton/tests/test_import_mjcf.py +++ b/newton/tests/test_import_mjcf.py @@ -212,6 +212,171 @@ def test_inertia_rotation(self): # Verify that the rotation was actually applied (not just identity) assert not np.allclose(actual_inertia, original_inertia, atol=1e-6) + def test_site_parsing(self): + """Test that sites are parsed from MJCF files""" + mjcf_content = """ + + + + + + + + + + + +""" + + with tempfile.TemporaryDirectory() as tmpdir: + mjcf_path = os.path.join(tmpdir, "test_sites.xml") + with open(mjcf_path, "w") as f: + f.write(mjcf_content) + + # Parse MJCF + builder = newton.ModelBuilder() + newton.utils.parse_mjcf(mjcf_path, builder) + model = builder.finalize() + + # Check sites were parsed + self.assertEqual(model.site_count, 3) + self.assertEqual(len(model.site_key), 3) + + # Check site keys + self.assertIn("world_site", model.site_key) + self.assertIn("body_site1", model.site_key) + self.assertIn("body_site2", model.site_key) + + # Check site bodies (world is -1, body1 is 0) + site_bodies = model.site_body.numpy() if model.site_body is not None else [] + world_site_idx = model.site_key.index("world_site") + body_site1_idx = model.site_key.index("body_site1") + + self.assertEqual(site_bodies[world_site_idx], -1) # worldbody + self.assertEqual(site_bodies[body_site1_idx], 0) # first body + + def test_tendon_parsing(self): + """Test that spatial tendons are parsed from MJCF files""" + mjcf_content = """ + + + + + + + + + + + + + + + + + + + + + + + +""" + + with tempfile.TemporaryDirectory() as tmpdir: + mjcf_path = os.path.join(tmpdir, "test_tendons.xml") + with open(mjcf_path, "w") as f: + f.write(mjcf_content) + + # Parse MJCF + builder = newton.ModelBuilder() + newton.utils.parse_mjcf(mjcf_path, builder) + model = builder.finalize() + + # Check tendons were parsed + self.assertEqual(model.tendon_count, 2) + self.assertEqual(len(model.tendon_key), 2) + + # Check tendon keys + self.assertIn("tendon1", model.tendon_key) + self.assertIn("tendon2", model.tendon_key) + + # Check tendon properties + tendon1_idx = model.tendon_key.index("tendon1") + self.assertEqual(model.tendon_type[tendon1_idx], "spatial") + + if model.tendon_damping is not None: + damping_values = model.tendon_damping.numpy() + self.assertAlmostEqual(damping_values[tendon1_idx], 0.5) + if model.tendon_stiffness is not None: + stiffness_values = model.tendon_stiffness.numpy() + self.assertAlmostEqual(stiffness_values[tendon1_idx], 100.0) + + # Check tendon site connections + self.assertEqual(len(model.tendon_site_ids[tendon1_idx]), 2) # 2 sites + self.assertEqual(len(model.tendon_site_ids[1]), 3) # 3 sites for tendon2 + + def test_tendon_actuator_parsing(self): + """Test that tendon actuators are parsed from MJCF files""" + mjcf_content = """ + + + + + + + + + + + + + + + + + + + + + + +""" + + with tempfile.TemporaryDirectory() as tmpdir: + mjcf_path = os.path.join(tmpdir, "test_actuators.xml") + with open(mjcf_path, "w") as f: + f.write(mjcf_content) + + # Parse MJCF + builder = newton.ModelBuilder() + newton.utils.parse_mjcf(mjcf_path, builder) + model = builder.finalize() + + # Check tendon actuator was parsed + self.assertEqual(model.tendon_actuator_count, 1) + self.assertEqual(len(model.tendon_actuator_key), 1) + + # Check actuator key + self.assertEqual(model.tendon_actuator_key[0], "tendon_act1") + + # Check actuator properties + if model.tendon_actuator_ke is not None: + ke_values = model.tendon_actuator_ke.numpy() + self.assertAlmostEqual(ke_values[0], 300.0) + if model.tendon_actuator_kd is not None: + kd_values = model.tendon_actuator_kd.numpy() + self.assertAlmostEqual(kd_values[0], 10.0) + if model.tendon_actuator_force_range is not None: + force_range = model.tendon_actuator_force_range.numpy() + self.assertAlmostEqual(force_range[0][0], -50.0) + self.assertAlmostEqual(force_range[0][1], 50.0) + + # Check actuator references correct tendon + tendon_ids = model.tendon_actuator_tendon_id.numpy() + tendon_id = tendon_ids[0] + self.assertEqual(model.tendon_key[tendon_id], "tendon1") + if __name__ == "__main__": wp.clear_kernel_cache() diff --git a/newton/tests/test_tendon_control.py b/newton/tests/test_tendon_control.py new file mode 100644 index 000000000..be353ebce --- /dev/null +++ b/newton/tests/test_tendon_control.py @@ -0,0 +1,239 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 The Newton Developers +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import tempfile +import unittest + +import warp as wp + +import newton +import newton.examples +from newton.utils.selection import ArticulationView + + +class TestTendonControl(unittest.TestCase): + def test_tendon_control_integration(self): + """Test full integration of tendon control from MJCF to simulation""" + mjcf_content = """ + + + + + + + + + + + + + + + + + + + + + + +""" + + with tempfile.TemporaryDirectory() as tmpdir: + mjcf_path = os.path.join(tmpdir, "test_tendon.xml") + with open(mjcf_path, "w") as f: + f.write(mjcf_content) + + # Parse with Newton + builder = newton.ModelBuilder() + newton.utils.parse_mjcf( + mjcf_path, + builder, + collapse_fixed_joints=True, + up_axis="Z", + enable_self_collisions=False, + ) + model = builder.finalize() + + # Verify model structure + self.assertEqual(model.site_count, 2) + self.assertEqual(model.tendon_count, 1) + self.assertEqual(model.tendon_actuator_count, 1) + self.assertEqual(model.joint_count, 1) + + # Create states and control + state_0 = model.state() + state_1 = model.state() + control = model.control() + + # Verify control has tendon arrays + self.assertIsNotNone(control.tendon_target) + self.assertEqual(len(control.tendon_target), 1) + + # Set tendon target + # Need to use numpy to modify warp array + tendon_targets = control.tendon_target.numpy() + tendon_targets[0] = -0.05 # Contract tendon by 5cm + control.tendon_target = wp.array(tendon_targets, dtype=wp.float32, device=model.device) + + # Create solver - let it handle MuJoCo model creation internally + solver = newton.solvers.MuJoCoSolver(model) + + # Record initial joint position + initial_joint_pos = state_0.joint_q.numpy()[0] + + # Simulate + dt = 0.001 + for _ in range(100): + solver.step(state_0, state_1, control, None, dt) + state_0, state_1 = state_1, state_0 + + # Verify joint moved due to tendon actuation + final_joint_pos = state_0.joint_q.numpy()[0] + self.assertNotAlmostEqual(initial_joint_pos, final_joint_pos, places=3) + # Joint should have rotated (positive direction due to tendon contraction pulling site1 towards site0) + self.assertGreater(final_joint_pos, initial_joint_pos) + + def test_control_initialization(self): + """Test that tendon control arrays are properly initialized""" + builder = newton.ModelBuilder() + + # Add a simple model with tendons + builder.add_body(xform=wp.transform()) + site1 = builder.add_site(0, wp.transform(wp.vec3(0, 0, 0)), key="site1") + site2 = builder.add_site(0, wp.transform(wp.vec3(1, 0, 0)), key="site2") + + tendon_id = builder.add_tendon( + tendon_type="spatial", site_ids=[site1, site2], stiffness=100.0, key="test_tendon" + ) + + builder.add_tendon_actuator(tendon_id=tendon_id, ke=50.0, key="test_actuator") + + model = builder.finalize() + + # Check control initialization + control = model.control() + self.assertIsNotNone(control.tendon_target) + self.assertEqual(len(control.tendon_target), 1) + + # Check initial values are zero + self.assertEqual(control.tendon_target.numpy()[0], 0.0) + + def test_tendon_control_selection(self): + """Test that tendon can be controlled via Selection API""" + mjcf_content = """ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + """ + with tempfile.TemporaryDirectory() as tmpdir: + mjcf_path = os.path.join(tmpdir, "test_tendon.xml") + with open(mjcf_path, "w") as f: + f.write(mjcf_content) + + # Parse with Newton + builder = newton.ModelBuilder() + newton.utils.parse_mjcf( + mjcf_path, + builder, + collapse_fixed_joints=True, + up_axis="Z", + enable_self_collisions=False, + ) + model = builder.finalize() + + # Verify model structure + self.assertEqual(model.site_count, 3) + self.assertEqual(model.tendon_count, 2) + self.assertEqual(model.tendon_actuator_count, 2) + self.assertEqual(model.joint_count, 2) + + # Create states and control + state_0 = model.state() + state_1 = model.state() + control = model.control() + + # Verify control has tendon arrays + self.assertIsNotNone(control.tendon_target) + self.assertEqual(len(control.tendon_target), 2) + + # Set tendon target via selection + tendons = ArticulationView(model, "articulation_1") + self.assertEqual(tendons.get_attribute("tendon_target", control).numpy()[0][0], 0) + self.assertEqual(tendons.get_attribute("tendon_target", control).numpy()[0][1], 0) + tendons.set_attribute("tendon_target", control, [[-1.0, -1.0]]) + self.assertEqual(tendons.get_attribute("tendon_target", control).numpy()[0][0], -1.0) + self.assertEqual(tendons.get_attribute("tendon_target", control).numpy()[0][1], -1.0) + + tendons = ArticulationView(model, "articulation_1", exclude_tendons=[0]) + tendons.set_attribute("tendon_target", control, [[-1.0]]) + + # Create solver - let it handle MuJoCo model creation internally + solver = newton.solvers.MuJoCoSolver(model) + + # Record initial joint position + initial_joint0_pos = state_0.joint_q.numpy()[0] + initial_joint1_pos = state_0.joint_q.numpy()[1] + + # Simulate + dt = 0.001 + for _ in range(100): + solver.step(state_0, state_1, control, None, dt) + state_0, state_1 = state_1, state_0 + + # Verify joint moved due to tendon actuation + final_joint0_pos = state_0.joint_q.numpy()[0] + final_joint1_pos = state_0.joint_q.numpy()[1] + self.assertNotAlmostEqual(initial_joint0_pos, final_joint0_pos, places=3) + self.assertNotAlmostEqual(initial_joint1_pos, final_joint1_pos, places=3) + # Joint should have rotated (positive direction due to gravity) + self.assertGreater(final_joint0_pos, initial_joint0_pos) + # Joint should have rotated (negative direction due to tendon contraction pulling site2 towards site1) + self.assertLess(final_joint1_pos, initial_joint1_pos) + + +if __name__ == "__main__": + wp.clear_kernel_cache() + unittest.main(verbosity=2) diff --git a/newton/utils/import_mjcf.py b/newton/utils/import_mjcf.py index 810767a52..956090ec0 100644 --- a/newton/utils/import_mjcf.py +++ b/newton/utils/import_mjcf.py @@ -751,6 +751,19 @@ def parse_body(body, parent, incoming_defaults: dict, childclass: str | None = N builder.body_inertia[link] = I_m builder.body_inv_inertia[link] = wp.inverse(I_m) + # ----------------- + # parse sites + + for site in body.findall("site"): + site_attrib = site.attrib + site_name = site_attrib.get("name", f"{body_name}_site_{len(builder.site_key)}") + site_pos = parse_vec(site_attrib, "pos", (0.0, 0.0, 0.0)) * scale + site_quat = parse_orientation(site_attrib) + site_xform = wp.transform(site_pos, site_quat) + + # Add the site to the builder + builder.add_site(link, site_xform, key=site_name) + # ----------------- # recurse @@ -896,6 +909,23 @@ def parse_common_attributes(element): incoming_xform=xform, ) + # ----------------- + # add worldbody sites + + for site in world.findall("site"): + site_attrib = site.attrib + site_name = site_attrib.get("name", f"world_site_{len(builder.site_key)}") + site_pos = parse_vec(site_attrib, "pos", (0.0, 0.0, 0.0)) * scale + site_quat = parse_orientation(site_attrib) + site_xform = wp.transform(site_pos, site_quat) + + # Apply world transform if provided + if xform is not None: + site_xform = xform * site_xform + + # Add the site to the builder (worldbody has index -1) + builder.add_site(-1, site_xform, key=site_name) + # ----------------- # add equality constraints @@ -916,5 +946,76 @@ def parse_common_attributes(element): for j in range(i + 1, end_shape_count): builder.shape_collision_filter_pairs.add((i, j)) + # ----------------- + # parse tendons + + # Create a mapping from site names to site indices + site_name_to_index = {name: i for i, name in enumerate(builder.site_key)} + + tendon_root = root.find("tendon") + if tendon_root is not None: + for spatial_tendon in tendon_root.findall("spatial"): + tendon_name = spatial_tendon.attrib.get("name", f"spatial_tendon_{len(builder.tendon_key)}") + + # Parse tendon properties + damping = parse_float(spatial_tendon.attrib, "damping", 0.0) + stiffness = parse_float(spatial_tendon.attrib, "stiffness", 0.0) + + # Collect site indices for this tendon + site_ids = [] + for site_element in spatial_tendon.findall("site"): + site_attr = site_element.attrib.get("site") + if site_attr and site_attr in site_name_to_index: + site_ids.append(site_name_to_index[site_attr]) + else: + print(f"Warning: Site '{site_attr}' referenced in tendon '{tendon_name}' not found") + + if len(site_ids) >= 2: # A tendon needs at least 2 sites + builder.add_tendon( + tendon_type="spatial", + site_ids=site_ids, + damping=damping, + stiffness=stiffness, + key=tendon_name, + ) + else: + print(f"Warning: Tendon '{tendon_name}' has fewer than 2 sites, skipping") + + # ----------------- + # parse actuators + + # Create a mapping from tendon names to tendon indices + tendon_name_to_index = {name: i for i, name in enumerate(builder.tendon_key)} + + actuator_root = root.find("actuator") + if actuator_root is not None: + for actuator in actuator_root: + # Check if this actuator references a tendon + tendon_ref = actuator.attrib.get("tendon") + if tendon_ref and tendon_ref in tendon_name_to_index: + actuator_name = actuator.attrib.get("name", f"tendon_actuator_{len(builder.tendon_actuator_key)}") + tendon_id = tendon_name_to_index[tendon_ref] + + # Parse actuator properties + # For position actuators, kp maps to ke (elastic gain) + ke = parse_float(actuator.attrib, "kp", 0.0) + kd = parse_float(actuator.attrib, "kv", 0.0) + + # Parse force range if specified + forcerange = actuator.attrib.get("forcerange") + if forcerange: + force_vals = np.fromstring(forcerange, sep=" ", dtype=np.float32) + force_range = (force_vals[0], force_vals[1]) + else: + force_range = None + + builder.add_tendon_actuator( + tendon_id=tendon_id, + ke=ke, + kd=kd, + force_range=force_range, + key=actuator_name, + ) + if collapse_fixed_joints: builder.collapse_fixed_joints() diff --git a/newton/utils/selection.py b/newton/utils/selection.py index 8533ceb22..8975b261a 100644 --- a/newton/utils/selection.py +++ b/newton/utils/selection.py @@ -135,6 +135,8 @@ def __init__( exclude_links: list[str | int] | None = None, include_joint_types: list[int] | None = None, exclude_joint_types: list[int] | None = None, + include_tendons: list[str | int] | None = None, + exclude_tendons: list[str | int] | None = None, verbose: bool | None = None, ): self.model = model @@ -159,6 +161,7 @@ def __init__( model_joint_q_start = model.joint_q_start.numpy() model_joint_qd_start = model.joint_qd_start.numpy() model_shape_body = model.shape_body.numpy() + model_tendon_count = model.tendon_actuator_count # FIXME: # - this assumes homogeneous envs with one selected articulation per env @@ -176,6 +179,8 @@ def __init__( arti_joint_types = [] arti_link_ids = [] arti_link_names = [] + arti_tendon_indices = [] + arti_tendon_names = [] def get_name_from_key(key): return key.split("/")[-1] @@ -189,6 +194,10 @@ def get_name_from_key(key): arti_link_ids.append(int(link_id)) arti_link_names.append(get_name_from_key(model.body_key[link_id])) + for idx in range(model_tendon_count): + arti_tendon_indices.append(int(idx)) + arti_tendon_names.append(get_name_from_key(model.tendon_key[idx])) + # create joint inclusion set if include_joints is None and include_joint_types is None: joint_include_indices = set(range(arti_joint_count)) @@ -259,15 +268,46 @@ def get_name_from_key(key): else: raise TypeError(f"Link ids must be strings or integers, got {id} of type {type(id)}") + if include_tendons is None: + tendon_include_indices = set(range(model_tendon_count)) + else: + tendon_include_indices = set() + if include_tendons is not None: + for id in include_tendons: + if isinstance(id, str): + for idx, name in enumerate(arti_tendon_names): + if fnmatch(name, id): + tendon_include_indices.add(idx) + elif isinstance(id, int): + if id >= 0 and id < model_tendon_count: + tendon_include_indices.add(id) + else: + raise TypeError(f"Tendon ids must be strings or integers, got {id} of type {type(id)}") + + tendon_exclude_indices = set() + if exclude_tendons is not None: + for id in exclude_tendons: + if isinstance(id, str): + for idx, name in enumerate(arti_tendon_names): + if fnmatch(name, id): + tendon_exclude_indices.add(idx) + elif isinstance(id, int): + if id >= 0 and id < model_tendon_count: + tendon_exclude_indices.add(id) + else: + raise TypeError(f"Tendon ids must be strings or integers, got {id} of type {type(id)}") + # compute selected indices selected_joint_indices = sorted(joint_include_indices - joint_exclude_indices) selected_link_indices = sorted(link_include_indices - link_exclude_indices) + selected_tendon_indices = sorted(tendon_include_indices - tendon_exclude_indices) selected_joint_ids = [] selected_joint_dof_ids = [] selected_joint_coord_ids = [] selected_link_ids = [] selected_shape_ids = [] + selected_tendon_ids = [] self.joint_names = [] self.joint_dof_names = [] @@ -277,6 +317,7 @@ def get_name_from_key(key): self.body_names = [] self.shape_names = [] self.body_shapes = [] + self.tendon_names = [] # populate info for selected joints and dofs for idx in selected_joint_indices: @@ -337,6 +378,10 @@ def get_name_from_key(key): self.shape_names.append(get_name_from_key(model.shape_key[shape_id])) self.body_shapes.append(shape_index_list) + for idx in selected_tendon_indices: + selected_tendon_ids.append(arti_tendon_indices[idx]) + self.tendon_names.append(arti_tendon_names[idx]) + # selected counts self.count = articulation_count self.joint_count = len(selected_joint_ids) @@ -373,6 +418,7 @@ def is_contiguous_slice(indices): self.joint_coords_contiguous = is_contiguous_slice(selected_joint_coord_ids) self.links_contiguous = is_contiguous_slice(selected_link_ids) self.shapes_contiguous = is_contiguous_slice(selected_shape_ids) + self.tendons_contiguous = is_contiguous_slice(selected_tendon_ids) # contiguous slices or indices by attribute frequency # @@ -409,6 +455,11 @@ def is_contiguous_slice(indices): else: self._frequency_indices["shape"] = wp.array(selected_shape_ids, dtype=int, device=self.device) + if self.tendons_contiguous and len(selected_tendon_ids) > 0: + self._frequency_slices["tendon"] = slice(selected_tendon_ids[0], selected_tendon_ids[-1] + 1) + else: + self._frequency_indices["tendon"] = wp.array(selected_tendon_ids, dtype=int, device=self.device) + self.articulation_indices = wp.array(articulation_ids, dtype=int, device=self.device) # TODO: zero-stride mask would use less memory