"""Deterministic simulation replay — roll out recorded actions in MuJoCo. Given a parameter vector and a recorded action sequence, builds a MuJoCo model with overridden physics parameters, replays the actions, and returns the simulated trajectory for comparison with the real recording. This module is the inner loop of the CMA-ES optimizer: it is called once per candidate parameter vector per generation. All motor + pendulum/arm parameters are optimised jointly from a single full-system recording. The asymmetric motor model (deadzone, gear compensation, Coulomb friction + Stribeck, viscous damping, action bias) is applied via ``ActuatorConfig.transform_ctrl()`` and ``compute_motor_force()``. """ from __future__ import annotations import dataclasses import os import tempfile import xml.etree.ElementTree as ET from pathlib import Path import mujoco import numpy as np import yaml from src.core.robot import ActuatorConfig, JointConfig, RobotConfig from src.runners.mujoco import ActuatorLimits, load_mujoco_model from src.sysid._urdf import patch_link_inertials # ── Tunable parameter specification ────────────────────────────────── @dataclasses.dataclass class ParamSpec: """Specification for a single tunable parameter.""" name: str default: float lower: float upper: float log_scale: bool = False # optimise in log-space (masses, inertias) # ── Motor parameters (13) ──────────────────────────────────────────── # Defaults from motor-only sysid (cost 0.2117). MOTOR_PARAMS: list[ParamSpec] = [ ParamSpec("actuator_gear_pos", 0.371194, 0.005, 1.5, log_scale=True), ParamSpec("actuator_gear_neg", 0.428143, 0.005, 1.5, log_scale=True), ParamSpec("actuator_filter_tau", 0.022301, 0.001, 0.20), ParamSpec("motor_damping_pos", 0.001384, 1e-5, 0.1, log_scale=True), ParamSpec("motor_damping_neg", 0.005196, 1e-5, 0.1, log_scale=True), ParamSpec("motor_armature", 0.002753, 1e-6, 0.01, log_scale=True), ParamSpec("motor_frictionloss_pos", 0.036744, 0.001, 0.2, log_scale=True), ParamSpec("motor_frictionloss_neg", 0.069082, 0.001, 0.2, log_scale=True), ParamSpec("stribeck_friction_boost", 0.0, 0.0, 0.15), ParamSpec("stribeck_vel", 2.0, 0.1, 8.0), ParamSpec("motor_deadzone_pos", 0.14182, 0.0, 0.30), ParamSpec("motor_deadzone_neg", 0.031454, 0.0, 0.30), ParamSpec("action_bias", 0.0, -0.10, 0.10), ] # ── Pendulum/arm parameters (15) ───────────────────────────────────── # Defaults from Fusion 360 URDF export. PENDULUM_PARAMS: list[ParamSpec] = [ ParamSpec("arm_mass", 0.02110, 0.005, 0.08, log_scale=True), ParamSpec("arm_com_x", -0.00710, -0.03, 0.03), ParamSpec("arm_com_y", 0.00085, -0.02, 0.02), ParamSpec("arm_com_z", 0.00795, -0.02, 0.02), ParamSpec("pendulum_mass", 0.03937, 0.010, 0.10, log_scale=True), ParamSpec("pendulum_com_x", 0.06025, 0.01, 0.15), ParamSpec("pendulum_com_y", -0.07602, -0.20, 0.0), ParamSpec("pendulum_com_z", -0.00346, -0.05, 0.05), ParamSpec("pendulum_ixx", 6.20e-05, 1e-07, 1e-03, log_scale=True), ParamSpec("pendulum_iyy", 3.70e-05, 1e-07, 1e-03, log_scale=True), ParamSpec("pendulum_izz", 7.83e-05, 1e-07, 1e-03, log_scale=True), ParamSpec("pendulum_ixy", -6.93e-06, -1e-03, 1e-03), ParamSpec("pendulum_damping", 0.0001, 1e-6, 0.05, log_scale=True), ParamSpec("pendulum_frictionloss", 0.0001, 1e-6, 0.05, log_scale=True), ParamSpec("ctrl_limit", 0.588, 0.45, 0.70), ] # ── Combined parameter sets ────────────────────────────────────────── ROTARY_CARTPOLE_PARAMS: list[ParamSpec] = MOTOR_PARAMS + PENDULUM_PARAMS PARAM_SETS: dict[str, list[ParamSpec]] = { "full": ROTARY_CARTPOLE_PARAMS, "motor": MOTOR_PARAMS, "pendulum": PENDULUM_PARAMS, } def params_to_dict( values: np.ndarray, specs: list[ParamSpec] | None = None ) -> dict[str, float]: """Convert a flat parameter vector to a named dict.""" if specs is None: specs = ROTARY_CARTPOLE_PARAMS return {s.name: float(values[i]) for i, s in enumerate(specs)} def defaults_vector(specs: list[ParamSpec] | None = None) -> np.ndarray: """Return the default parameter vector (in natural space).""" if specs is None: specs = ROTARY_CARTPOLE_PARAMS return np.array([s.default for s in specs], dtype=np.float64) def defaults_dict(specs: list[ParamSpec] | None = None) -> dict[str, float]: """Return the default parameter dict (all 28 params).""" if specs is None: specs = ROTARY_CARTPOLE_PARAMS return {s.name: s.default for s in specs} def bounds_arrays( specs: list[ParamSpec] | None = None, ) -> tuple[np.ndarray, np.ndarray]: """Return (lower, upper) bound arrays.""" if specs is None: specs = ROTARY_CARTPOLE_PARAMS lo = np.array([s.lower for s in specs], dtype=np.float64) hi = np.array([s.upper for s in specs], dtype=np.float64) return lo, hi # ── MuJoCo model building with parameter overrides ────────────────── def _resolve_params(params: dict[str, float]) -> dict[str, float]: """Fill in defaults for any missing params.""" full = defaults_dict() full.update(params) return full def _build_model( robot_path: Path, params: dict[str, float], ) -> tuple[mujoco.MjModel, ActuatorConfig]: """Build a MuJoCo model with sysid overrides. Returns (model, actuator) — use ``actuator.transform_ctrl()`` and ``actuator.compute_motor_force()`` in the rollout loop. """ p = _resolve_params(params) robot_path = Path(robot_path).resolve() # ── Patch URDF inertial parameters to a temp file ──────────── robot_yaml = yaml.safe_load((robot_path / "robot.yaml").read_text()) urdf_path = robot_path / robot_yaml["urdf"] tree = ET.parse(urdf_path) patch_link_inertials(tree.getroot(), p) fd, tmp_urdf = tempfile.mkstemp( suffix=".urdf", prefix="_sysid_", dir=str(robot_path), ) os.close(fd) tmp_urdf_path = Path(tmp_urdf) tree.write(str(tmp_urdf_path), xml_declaration=True, encoding="unicode") # ── Build RobotConfig with full motor model ────────────────── act_cfg = robot_yaml["actuators"][0] actuator = ActuatorConfig( joint=act_cfg["joint"], type="motor", gear=(p["actuator_gear_pos"], p["actuator_gear_neg"]), ctrl_range=(act_cfg.get("ctrl_range", [-1.0, 1.0])[0], act_cfg.get("ctrl_range", [-1.0, 1.0])[1]), deadzone=(p["motor_deadzone_pos"], p["motor_deadzone_neg"]), damping=(p["motor_damping_pos"], p["motor_damping_neg"]), frictionloss=(p["motor_frictionloss_pos"], p["motor_frictionloss_neg"]), stribeck_friction_boost=p["stribeck_friction_boost"], stribeck_vel=p["stribeck_vel"], action_bias=p["action_bias"], filter_tau=p["actuator_filter_tau"], ) robot = RobotConfig( urdf_path=tmp_urdf_path, actuators=[actuator], joints={ "motor_joint": JointConfig( damping=0.0, armature=p["motor_armature"], frictionloss=0.0, ), "pendulum_joint": JointConfig( damping=p["pendulum_damping"], frictionloss=p["pendulum_frictionloss"], ), }, ) try: model = load_mujoco_model(robot) finally: tmp_urdf_path.unlink(missing_ok=True) return model, actuator def build_base_model( robot_path: str | Path, params: dict[str, float] | None = None, ) -> tuple[mujoco.MjModel, ActuatorConfig, dict[str, int], dict[str, int]]: """Build a base MuJoCo model once (with default or given params). Returns (model, actuator, body_ids, dof_ids) where body_ids/dof_ids map names to indices for fast in-place patching. """ if params is None: params = defaults_dict() model, actuator = _build_model(Path(robot_path).resolve(), params) body_ids = {} for name in ("arm", "pendulum"): body_ids[name] = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_BODY, name) dof_ids = {} for name in ("motor_joint", "pendulum_joint"): jnt_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_JOINT, name) dof_ids[name] = model.jnt_dofadr[jnt_id] return model, actuator, body_ids, dof_ids def _rotmat_to_quat(R: np.ndarray) -> np.ndarray: """Convert a 3×3 rotation matrix to a MuJoCo quaternion [w, x, y, z].""" quat = np.empty(4) mujoco.mju_mat2Quat(quat, R.flatten()) return quat def _patch_pendulum_inertia( model: mujoco.MjModel, body_id: int, params: dict[str, float], ) -> None: """Diagonalize pendulum inertia tensor and write to model.""" defs = defaults_dict() ixx = params.get("pendulum_ixx", defs["pendulum_ixx"]) iyy = params.get("pendulum_iyy", defs["pendulum_iyy"]) izz = params.get("pendulum_izz", defs["pendulum_izz"]) ixy = params.get("pendulum_ixy", defs.get("pendulum_ixy", 0.0)) # Build full 3×3 inertia tensor (ixz = iyz = 0 in our URDF). I = np.array([ [ixx, ixy, 0.0], [ixy, iyy, 0.0], [0.0, 0.0, izz], ]) # Eigendecompose — eigenvalues are principal moments, eigenvectors # define the rotation from link frame to principal frame. eigenvalues, eigenvectors = np.linalg.eigh(I) # eigh returns ascending order; MuJoCo wants them in the order that # the eigenvector matrix forms a proper rotation (det = +1). if np.linalg.det(eigenvectors) < 0: eigenvectors[:, 0] *= -1 # Clamp tiny/negative eigenvalues (numerical noise). eigenvalues = np.maximum(eigenvalues, 1e-10) # Enforce triangle inequality (required for valid rigid body inertia). # For principal moments sorted ascending (a <= b <= c): a + b >= c. # When violated, MuJoCo falls back to spherical inertia (trace/3). # We must match this to keep fast path consistent with URDF loading. s = np.sort(eigenvalues) if s[0] + s[1] < s[2]: sphere = np.sum(eigenvalues) / 3.0 eigenvalues[:] = sphere eigenvectors = np.eye(3) model.body_inertia[body_id] = eigenvalues model.body_iquat[body_id] = _rotmat_to_quat(eigenvectors) def patch_model( model: mujoco.MjModel, params: dict[str, float], body_ids: dict[str, int], dof_ids: dict[str, int], ) -> None: """Patch a cached MuJoCo model in-place with new sysid parameters. Much faster than _build_model: no XML parsing, no temp files, no model reload. Just direct array writes. """ p = _resolve_params(params) # ── Arm body ───────────────────────────────────────────────── arm_id = body_ids["arm"] model.body_mass[arm_id] = p["arm_mass"] for i, key in enumerate(("arm_com_x", "arm_com_y", "arm_com_z")): model.body_ipos[arm_id, i] = p[key] # ── Pendulum body ──────────────────────────────────────────── pend_id = body_ids["pendulum"] model.body_mass[pend_id] = p["pendulum_mass"] for i, key in enumerate(("pendulum_com_x", "pendulum_com_y", "pendulum_com_z")): model.body_ipos[pend_id, i] = p[key] _patch_pendulum_inertia(model, pend_id, p) # ── Joint dynamics ─────────────────────────────────────────── motor_dof = dof_ids["motor_joint"] model.dof_armature[motor_dof] = p["motor_armature"] pend_dof = dof_ids["pendulum_joint"] model.dof_damping[pend_dof] = p["pendulum_damping"] model.dof_frictionloss[pend_dof] = p["pendulum_frictionloss"] def _make_actuator(params: dict[str, float]) -> ActuatorConfig: """Build an ActuatorConfig from a params dict (for fast-path use).""" p = _resolve_params(params) return ActuatorConfig( joint="motor_joint", type="motor", gear=(p["actuator_gear_pos"], p["actuator_gear_neg"]), ctrl_range=(-1.0, 1.0), deadzone=(p["motor_deadzone_pos"], p["motor_deadzone_neg"]), damping=(p["motor_damping_pos"], p["motor_damping_neg"]), frictionloss=(p["motor_frictionloss_pos"], p["motor_frictionloss_neg"]), stribeck_friction_boost=p["stribeck_friction_boost"], stribeck_vel=p["stribeck_vel"], action_bias=p["action_bias"], filter_tau=p["actuator_filter_tau"], ) # ── Simulation rollout ─────────────────────────────────────────────── def rollout( robot_path: str | Path, params: dict[str, float], actions: np.ndarray, sim_dt: float = 0.002, substeps: int = 10, ) -> dict[str, np.ndarray]: """Replay recorded actions in MuJoCo with overridden parameters. Parameters ---------- robot_path : asset directory params : named parameter overrides (all motor + pendulum params) actions : (N,) normalised actions [-1, 1] from the recording sim_dt : MuJoCo physics timestep substeps : physics substeps per control step Returns ------- dict with keys: motor_angle, motor_vel, pendulum_angle, pendulum_vel """ robot_path = Path(robot_path).resolve() model, actuator = _build_model(robot_path, params) model.opt.timestep = sim_dt data = mujoco.MjData(model) mujoco.mj_resetData(model, data) n = len(actions) p = _resolve_params(params) ctrl_limit = p["ctrl_limit"] sim_motor_angle = np.zeros(n, dtype=np.float64) sim_motor_vel = np.zeros(n, dtype=np.float64) sim_pend_angle = np.zeros(n, dtype=np.float64) sim_pend_vel = np.zeros(n, dtype=np.float64) limits = ActuatorLimits(model) for i in range(n): action = max(-ctrl_limit, min(ctrl_limit, float(actions[i]))) ctrl = actuator.transform_ctrl(action) data.ctrl[0] = ctrl for _ in range(substeps): limits.enforce(model, data) data.qfrc_applied[0] = actuator.compute_motor_force(data.qvel[0], ctrl) mujoco.mj_step(model, data) sim_motor_angle[i] = data.qpos[0] sim_pend_angle[i] = data.qpos[1] sim_motor_vel[i] = data.qvel[0] sim_pend_vel[i] = data.qvel[1] return { "motor_angle": sim_motor_angle, "motor_vel": sim_motor_vel, "pendulum_angle": sim_pend_angle, "pendulum_vel": sim_pend_vel, } def windowed_rollout( robot_path: str | Path, params: dict[str, float], recording: dict[str, np.ndarray], window_duration: float = 0.5, sim_dt: float = 0.002, substeps: int = 10, ) -> dict[str, np.ndarray | float]: """Multiple-shooting rollout — split recording into short windows. Parameters ---------- robot_path : asset directory params : named parameter overrides (all motor + pendulum params) recording : dict with keys time, action, motor_angle, motor_vel, pendulum_angle, pendulum_vel (all 1D arrays of length N) window_duration : length of each shooting window in seconds sim_dt : MuJoCo physics timestep substeps : physics substeps per control step Returns ------- dict with: motor_angle, motor_vel, pendulum_angle, pendulum_vel — (N,) arrays n_windows — number of windows used """ robot_path = Path(robot_path).resolve() model, actuator = _build_model(robot_path, params) model.opt.timestep = sim_dt data = mujoco.MjData(model) times = recording["time"] actions = recording["action"] real_motor = recording["motor_angle"] real_motor_vel = recording["motor_vel"] real_pend = recording["pendulum_angle"] real_pend_vel = recording["pendulum_vel"] n = len(actions) sim_motor_angle = np.zeros(n, dtype=np.float64) sim_motor_vel = np.zeros(n, dtype=np.float64) sim_pend_angle = np.zeros(n, dtype=np.float64) sim_pend_vel = np.zeros(n, dtype=np.float64) limits = ActuatorLimits(model) t0 = times[0] t_end = times[-1] window_starts: list[int] = [] current_t = t0 while current_t < t_end: idx = int(np.searchsorted(times, current_t)) idx = min(idx, n - 1) window_starts.append(idx) current_t += window_duration p = _resolve_params(params) ctrl_limit = p["ctrl_limit"] n_windows = len(window_starts) for w, w_start in enumerate(window_starts): w_end = window_starts[w + 1] if w + 1 < n_windows else n mujoco.mj_resetData(model, data) data.qpos[0] = real_motor[w_start] data.qpos[1] = real_pend[w_start] data.qvel[0] = real_motor_vel[w_start] data.qvel[1] = real_pend_vel[w_start] data.ctrl[:] = 0.0 mujoco.mj_forward(model, data) for i in range(w_start, w_end): action = max(-ctrl_limit, min(ctrl_limit, float(actions[i]))) ctrl = actuator.transform_ctrl(action) data.ctrl[0] = ctrl for _ in range(substeps): limits.enforce(model, data) data.qfrc_applied[0] = actuator.compute_motor_force(data.qvel[0], ctrl) mujoco.mj_step(model, data) sim_motor_angle[i] = data.qpos[0] sim_pend_angle[i] = data.qpos[1] sim_motor_vel[i] = data.qvel[0] sim_pend_vel[i] = data.qvel[1] return { "motor_angle": sim_motor_angle, "motor_vel": sim_motor_vel, "pendulum_angle": sim_pend_angle, "pendulum_vel": sim_pend_vel, "n_windows": n_windows, }