From 58809977867cceb2f2b18bad0fa0e1d45f1bff37 Mon Sep 17 00:00:00 2001 From: Victor Mylle Date: Sat, 28 Mar 2026 16:48:22 +0100 Subject: [PATCH] refactor: merge motor sysid into unified sysid module Unified the two separate sysid codepaths (motor-only and full-system) into a single module that optimizes all 28 parameters jointly: - 13 motor params (asymmetric gear, damping, friction, deadzone, Stribeck boost, action bias, filter tau, armature, ctrl_limit) - 15 pendulum/arm params (mass, CoM, inertia, joint dynamics) Key changes: - Added stribeck_friction_boost, stribeck_vel, action_bias to ActuatorConfig (robot.py) and MJX runner - Created shared src/sysid/preprocess.py (SG velocity recomputation) - Rewrote src/sysid/rollout.py with unified MOTOR_PARAMS + PENDULUM_PARAMS spec and PARAM_SETS dict for flexible subset optimization - Updated optimize.py, export.py, visualize.py to use unified params (removed all LOCKED_MOTOR_PARAMS references) - Removed src/sysid/motor/ module and scripts/motor_sysid.py Net: -1383 lines, zero code duplication between motor and full-system sysid. --- assets/rotary_cartpole/robot.yaml | 24 +- configs/runner/mujoco.yaml | 9 + scripts/motor_sysid.py | 64 ----- src/core/robot.py | 29 +- src/runners/mjx.py | 22 +- src/runners/mujoco.py | 61 +++- src/sysid/capture.py | 10 +- src/sysid/export.py | 78 ++--- src/sysid/motor/__init__.py | 5 - src/sysid/motor/capture.py | 364 ----------------------- src/sysid/motor/export.py | 186 ------------ src/sysid/motor/optimize.py | 367 ------------------------ src/sysid/motor/preprocess.py | 114 -------- src/sysid/motor/rollout.py | 460 ------------------------------ src/sysid/motor/visualize.py | 204 ------------- src/sysid/optimize.py | 262 ++++++++++------- src/sysid/preprocess.py | 158 ++++++++++ src/sysid/rollout.py | 316 ++++++++++++-------- src/sysid/visualize.py | 11 +- tests/test_sim2real.py | 39 +++ 20 files changed, 700 insertions(+), 2083 deletions(-) delete mode 100644 scripts/motor_sysid.py delete mode 100644 src/sysid/motor/__init__.py delete mode 100644 src/sysid/motor/capture.py delete mode 100644 src/sysid/motor/export.py delete mode 100644 src/sysid/motor/optimize.py delete mode 100644 src/sysid/motor/preprocess.py delete mode 100644 src/sysid/motor/rollout.py delete mode 100644 src/sysid/motor/visualize.py create mode 100644 src/sysid/preprocess.py create mode 100644 tests/test_sim2real.py diff --git a/assets/rotary_cartpole/robot.yaml b/assets/rotary_cartpole/robot.yaml index 9336fc5..02d27ab 100644 --- a/assets/rotary_cartpole/robot.yaml +++ b/assets/rotary_cartpole/robot.yaml @@ -1,21 +1,21 @@ -# Tuned robot config — generated by src.sysid.optimize +# Tuned robot config — generated by src.sysid (2026-03-28) +# Motor params: motor-only sysid cost 0.2117 +# Full-system params: sysid cost 1.216 (0.2s windows, amplitude 200) urdf: rotary_cartpole.urdf actuators: - joint: motor_joint type: motor - gear: [0.424182, 0.425031] # torque constant [pos, neg] (motor sysid) - ctrl_range: [-0.592, 0.592] # effective control bound (sysid-tuned) - deadzone: [0.141291, 0.078015] # L298N min |ctrl| for torque [pos, neg] - damping: [0.002027, 0.014665] # viscous damping [pos, neg] - frictionloss: [0.057328, 0.053355] # Coulomb friction [pos, neg] - filter_tau: 0.005035 # 1st-order actuator filter (motor sysid) - viscous_quadratic: 0.000285 # velocity² drag - back_emf_gain: 0.006758 # back-EMF torque reduction + gear: [0.371194, 0.428143] # torque constant [pos, neg] (motor sysid) + ctrl_range: [-0.611479, 0.611479] # effective control bound (full sysid) + deadzone: [0.141820, 0.031454] # L298N min |ctrl| for torque [pos, neg] + damping: [0.001384, 0.005196] # viscous damping [pos, neg] + frictionloss: [0.036744, 0.069082] # Coulomb friction [pos, neg] + filter_tau: 0.022301 # 1st-order actuator filter (motor sysid) joints: motor_joint: - armature: 0.002773 # reflected rotor inertia (motor sysid) + armature: 0.002753 # reflected rotor inertia (motor sysid) frictionloss: 0.0 # handled by motor model via qfrc_applied pendulum_joint: - damping: 0.000119 - frictionloss: 1.0e-05 + damping: 1.3e-06 # full sysid (was 0.000119) + frictionloss: 3.7e-06 # full sysid (was 1.0e-05) diff --git a/configs/runner/mujoco.yaml b/configs/runner/mujoco.yaml index f99ff9b..67702fc 100644 --- a/configs/runner/mujoco.yaml +++ b/configs/runner/mujoco.yaml @@ -2,3 +2,12 @@ num_envs: 64 device: auto # auto = cuda if available, else cpu dt: 0.002 substeps: 10 + +# ── Sim2real: domain randomization ─────────────────────────────── +domain_rand: + mass_frac: 0.15 # ±15% body mass randomization + friction_frac: 0.3 # ±30% joint friction + damping_frac: 0.3 # ±30% joint damping + armature_frac: 0.2 # ±20% reflected rotor inertia + gear_frac: 0.15 # ±15% actuator gear ratio + com_offset: 0.005 # ±5mm center-of-mass shift diff --git a/scripts/motor_sysid.py b/scripts/motor_sysid.py deleted file mode 100644 index 8ef354d..0000000 --- a/scripts/motor_sysid.py +++ /dev/null @@ -1,64 +0,0 @@ -"""Unified CLI for motor-only system identification. - -Usage: - python scripts/motor_sysid.py capture --duration 20 - python scripts/motor_sysid.py optimize --recording assets/motor/recordings/.npz - python scripts/motor_sysid.py visualize --recording assets/motor/recordings/.npz - python scripts/motor_sysid.py export --result assets/motor/motor_sysid_result.json -""" - -from __future__ import annotations - -import sys -from pathlib import Path - -# Ensure project root is on sys.path -_PROJECT_ROOT = str(Path(__file__).resolve().parent.parent) -if _PROJECT_ROOT not in sys.path: - sys.path.insert(0, _PROJECT_ROOT) - - -def main() -> None: - if len(sys.argv) < 2 or sys.argv[1] in ("-h", "--help"): - print( - "Motor System Identification\n" - "===========================\n" - "Usage: python scripts/motor_sysid.py [options]\n" - "\n" - "Commands:\n" - " capture Record motor trajectory under PRBS excitation\n" - " optimize Run CMA-ES to fit motor parameters\n" - " visualize Plot real vs simulated motor response\n" - " export Write tuned MJCF + robot.yaml files\n" - "\n" - "Workflow:\n" - " 1. Flash sysid firmware to ESP32 (motor-only, no limits)\n" - " 2. python scripts/motor_sysid.py capture --duration 20\n" - " 3. python scripts/motor_sysid.py optimize --recording .npz\n" - " 4. python scripts/motor_sysid.py visualize --recording .npz\n" - "\n" - "Run ' --help' for command-specific options." - ) - sys.exit(0) - - command = sys.argv[1] - sys.argv = [f"motor_sysid {command}"] + sys.argv[2:] - - if command == "capture": - from src.sysid.motor.capture import main as cmd_main - elif command == "optimize": - from src.sysid.motor.optimize import main as cmd_main - elif command == "visualize": - from src.sysid.motor.visualize import main as cmd_main - elif command == "export": - from src.sysid.motor.export import main as cmd_main - else: - print(f"Unknown command: {command}") - print("Available commands: capture, optimize, visualize, export") - sys.exit(1) - - cmd_main() - - -if __name__ == "__main__": - main() diff --git a/src/core/robot.py b/src/core/robot.py index c52dacd..f7c16bc 100644 --- a/src/core/robot.py +++ b/src/core/robot.py @@ -46,11 +46,12 @@ class ActuatorConfig: deadzone: tuple[float, float] = (0.0, 0.0) # min |ctrl| for torque (pos, neg) damping: tuple[float, float] = (0.0, 0.0) # viscous damping (pos, neg) frictionloss: tuple[float, float] = (0.0, 0.0) # Coulomb friction (pos, neg) + stribeck_friction_boost: float = 0.0 # extra friction at low speed (fraction) + stribeck_vel: float = 2.0 # Stribeck velocity scale (rad/s) + action_bias: float = 0.0 # constant bias on action (H-bridge asymmetry) kp: float = 0.0 # proportional gain (position / velocity actuators) kv: float = 0.0 # derivative gain (position actuators) filter_tau: float = 0.0 # 1st-order filter time constant (s); 0 = no filter - viscous_quadratic: float = 0.0 # velocity² drag coefficient - back_emf_gain: float = 0.0 # back-EMF torque reduction @property def gear_avg(self) -> float: @@ -64,12 +65,15 @@ class ActuatorConfig: or self.deadzone != (0.0, 0.0) or self.damping != (0.0, 0.0) or self.frictionloss != (0.0, 0.0) - or self.viscous_quadratic > 0 - or self.back_emf_gain > 0 + or self.stribeck_friction_boost > 0 + or self.action_bias != 0.0 ) def transform_ctrl(self, ctrl: float) -> float: - """Apply asymmetric deadzone and gear compensation to a scalar ctrl.""" + """Apply bias, asymmetric deadzone, and gear compensation.""" + # Action bias (H-bridge asymmetry) + ctrl = ctrl + self.action_bias + # Deadzone dz_pos, dz_neg = self.deadzone if ctrl >= 0 and ctrl < dz_pos: @@ -86,27 +90,22 @@ class ActuatorConfig: return ctrl def compute_motor_force(self, vel: float, ctrl: float) -> float: - """Asymmetric friction, damping, drag, back-EMF → applied torque.""" + """Asymmetric friction + Stribeck + damping → applied torque.""" torque = 0.0 - # Coulomb friction (direction-dependent) + # Coulomb friction (direction-dependent + Stribeck boost) fl_pos, fl_neg = self.frictionloss if abs(vel) > 1e-6: fl = fl_pos if vel > 0 else fl_neg + if self.stribeck_friction_boost > 0 and self.stribeck_vel > 0: + fl = fl * (1.0 + self.stribeck_friction_boost + * math.exp(-abs(vel) / self.stribeck_vel)) torque -= math.copysign(fl, vel) # Viscous damping (direction-dependent) damp = self.damping[0] if vel > 0 else self.damping[1] torque -= damp * vel - # Quadratic velocity drag - if self.viscous_quadratic > 0: - torque -= self.viscous_quadratic * vel * abs(vel) - - # Back-EMF torque reduction - if self.back_emf_gain > 0 and abs(ctrl) > 1e-6: - torque -= self.back_emf_gain * vel * math.copysign(1.0, ctrl) - return max(-10.0, min(10.0, torque)) def transform_action(self, action): diff --git a/src/runners/mjx.py b/src/runners/mjx.py index b67ace9..456d533 100644 --- a/src/runners/mjx.py +++ b/src/runners/mjx.py @@ -155,8 +155,9 @@ class MJXRunner(BaseRunner[MJXRunnerConfig]): _fl_neg = jnp.array([a.frictionloss[1] for a in acts]) _damp_pos = jnp.array([a.damping[0] for a in acts]) _damp_neg = jnp.array([a.damping[1] for a in acts]) - _visc_quad = jnp.array([a.viscous_quadratic for a in acts]) - _back_emf = jnp.array([a.back_emf_gain for a in acts]) + _stribeck_boost = jnp.array([a.stribeck_friction_boost for a in acts]) + _stribeck_vel = jnp.array([a.stribeck_vel for a in acts]) + _action_bias = jnp.array([a.action_bias for a in acts]) # ── Batched step (N substeps per call) ────────────────────── @jax.jit @@ -169,8 +170,8 @@ class MJXRunner(BaseRunner[MJXRunnerConfig]): ctrl = jnp.where(at_hi | at_lo, 0.0, ctrl) if _has_motor: - # Deadzone + asymmetric gear compensation - mc = ctrl[:, _ctrl_ids] + # Action bias + Deadzone + asymmetric gear compensation + mc = ctrl[:, _ctrl_ids] + _action_bias mc = jnp.where((mc >= 0) & (mc < _dz_pos), 0.0, mc) mc = jnp.where((mc < 0) & (mc > -_dz_neg), 0.0, mc) gear_dir = jnp.where(mc >= 0, _gear_pos, _gear_neg) @@ -184,21 +185,18 @@ class MJXRunner(BaseRunner[MJXRunnerConfig]): vel = d.qvel[:, _qvel_ids] mc = d.ctrl[:, _ctrl_ids] - # Coulomb friction (direction-dependent) + # Coulomb friction (direction-dependent + Stribeck) fl = jnp.where(vel > 0, _fl_pos, _fl_neg) + stribeck_mult = 1.0 + _stribeck_boost * jnp.exp( + -jnp.abs(vel) / _stribeck_vel + ) + fl = fl * stribeck_mult torque = -jnp.where( jnp.abs(vel) > 1e-6, jnp.sign(vel) * fl, 0.0, ) # Viscous damping (direction-dependent) damp = jnp.where(vel > 0, _damp_pos, _damp_neg) torque = torque - damp * vel - # Quadratic velocity drag - torque = torque - _visc_quad * vel * jnp.abs(vel) - # Back-EMF torque reduction - bemf = _back_emf * vel * jnp.sign(mc) - torque = torque - jnp.where( - jnp.abs(mc) > 1e-6, bemf, 0.0, - ) torque = jnp.clip(torque, -10.0, 10.0) d = d.replace( qfrc_applied=d.qfrc_applied.at[:, _qvel_ids].set(torque), diff --git a/src/runners/mujoco.py b/src/runners/mujoco.py index 811f057..0ebd4d6 100644 --- a/src/runners/mujoco.py +++ b/src/runners/mujoco.py @@ -13,12 +13,34 @@ from src.core.robot import RobotConfig from src.core.runner import BaseRunner, BaseRunnerConfig +@dataclasses.dataclass +class DomainRandConfig: + """Per-reset randomization of MuJoCo model parameters. + + Each field is a fractional range: the nominal value is multiplied by + ``uniform(1 - frac, 1 + frac)``. Set to 0.0 to disable. + """ + mass_frac: float = 0.0 # body masses + friction_frac: float = 0.0 # joint frictionloss + damping_frac: float = 0.0 # joint damping + armature_frac: float = 0.0 # joint armature (reflected inertia) + gear_frac: float = 0.0 # actuator gear ratio + com_offset: float = 0.0 # center-of-mass shift (metres) + + @dataclasses.dataclass class MuJoCoRunnerConfig(BaseRunnerConfig): num_envs: int = 16 device: str = "cpu" dt: float = 0.002 substeps: int = 10 + # ── Sim2real ───────────────────────────────────────────────── + domain_rand: DomainRandConfig = dataclasses.field(default_factory=DomainRandConfig) + + def __post_init__(self) -> None: + # Hydra passes domain_rand as a dict — convert to dataclass. + if isinstance(self.domain_rand, dict): + self.domain_rand = DomainRandConfig(**self.domain_rand) class ActuatorLimits: @@ -203,8 +225,6 @@ def load_mujoco_model(robot: RobotConfig) -> mujoco.MjModel: class MuJoCoRunner(BaseRunner[MuJoCoRunnerConfig]): - def __init__(self, env: BaseEnv, config: MuJoCoRunnerConfig): - super().__init__(env, config) @property def num_envs(self) -> int: @@ -233,6 +253,15 @@ class MuJoCoRunner(BaseRunner[MuJoCoRunnerConfig]): qvel_idx = self._model.jnt_dofadr[jnt_id] self._motor_actuators.append((act, qvel_idx)) + # ── Domain randomization: store nominal values ─────────── + self._nominal_mass = self._model.body_mass.copy() + self._nominal_inertia = self._model.body_inertia.copy() + self._nominal_ipos = self._model.body_ipos.copy() + self._nominal_damping = self._model.dof_damping.copy() + self._nominal_armature = self._model.dof_armature.copy() + self._nominal_frictionloss = self._model.dof_frictionloss.copy() + self._nominal_gear = self._model.actuator_gear.copy() + def _sim_step(self, actions: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]: actions_np: np.ndarray = actions.cpu().numpy() @@ -272,6 +301,34 @@ class MuJoCoRunner(BaseRunner[MuJoCoRunnerConfig]): qpos_batch = np.zeros((n, self._nq), dtype=np.float32) qvel_batch = np.zeros((n, self._nv), dtype=np.float32) + # ── Domain randomization ───────────────────────────────── + dr = self.config.domain_rand + if dr.mass_frac > 0: + scale = np.random.uniform(1 - dr.mass_frac, 1 + dr.mass_frac, + size=self._nominal_mass.shape) + self._model.body_mass[:] = self._nominal_mass * scale + self._model.body_inertia[:] = self._nominal_inertia * scale[:, None] + if dr.com_offset > 0: + offset = np.random.uniform(-dr.com_offset, dr.com_offset, + size=self._nominal_ipos.shape) + self._model.body_ipos[:] = self._nominal_ipos + offset + if dr.damping_frac > 0: + scale = np.random.uniform(1 - dr.damping_frac, 1 + dr.damping_frac, + size=self._nominal_damping.shape) + self._model.dof_damping[:] = np.maximum(0, self._nominal_damping * scale) + if dr.armature_frac > 0: + scale = np.random.uniform(1 - dr.armature_frac, 1 + dr.armature_frac, + size=self._nominal_armature.shape) + self._model.dof_armature[:] = np.maximum(0, self._nominal_armature * scale) + if dr.friction_frac > 0: + scale = np.random.uniform(1 - dr.friction_frac, 1 + dr.friction_frac, + size=self._nominal_frictionloss.shape) + self._model.dof_frictionloss[:] = np.maximum(0, self._nominal_frictionloss * scale) + if dr.gear_frac > 0: + scale = np.random.uniform(1 - dr.gear_frac, 1 + dr.gear_frac, + size=self._nominal_gear.shape) + self._model.actuator_gear[:] = self._nominal_gear * scale + for i, env_id in enumerate(ids): data = self._data[env_id] mujoco.mj_resetData(self._model, data) diff --git a/src/sysid/capture.py b/src/sysid/capture.py index f202c74..3718d83 100644 --- a/src/sysid/capture.py +++ b/src/sysid/capture.py @@ -243,8 +243,9 @@ def capture( idx = 0 pwm = 0 last_esp_ms = -1 # firmware timestamp of last recorded sample + esp_ms_origin: int | None = None # first firmware timestamp no_data_count = 0 # consecutive timeouts with no data - t0 = time.monotonic() + t0 = time.monotonic() # host clock for duration check only try: while True: # Block until the firmware sends the next state line (~20 ms). @@ -276,7 +277,10 @@ def capture( continue last_esp_ms = esp_ms - elapsed = time.monotonic() - t0 + # Use firmware clock for time axis (avoids host serial jitter). + if esp_ms_origin is None: + esp_ms_origin = esp_ms + elapsed = (esp_ms - esp_ms_origin) / 1000.0 if elapsed >= duration: break @@ -396,7 +400,7 @@ def main() -> None: ) parser.add_argument( "--amplitude", type=int, default=150, - help="Max PWM magnitude (should not exceed firmware MAX_MOTOR_SPEED=150)", + help="Max PWM magnitude for excitation (0-255)", ) parser.add_argument( "--hold-min-ms", type=int, default=50, help="Min hold time (ms)" diff --git a/src/sysid/export.py b/src/sysid/export.py index ae33855..b229750 100644 --- a/src/sysid/export.py +++ b/src/sysid/export.py @@ -22,16 +22,13 @@ log = structlog.get_logger() def export_tuned_files( robot_path: str | Path, params: dict[str, float], - motor_params: dict[str, float] | None = None, ) -> tuple[Path, Path]: """Write tuned URDF and robot.yaml files. Parameters ---------- robot_path : robot asset directory (contains robot.yaml + *.urdf) - params : dict of parameter name → tuned value (from optimizer) - motor_params : locked motor sysid params (asymmetric model). - If provided, motor joint parameters come from here. + params : dict of parameter name → tuned value (unified, all 28 params) Returns ------- @@ -66,39 +63,34 @@ def export_tuned_files( # Update actuator parameters — full asymmetric motor model. if tuned_cfg.get("actuators") and len(tuned_cfg["actuators"]) > 0: act = tuned_cfg["actuators"][0] - if motor_params: - # Asymmetric gear, damping, deadzone, frictionloss as [pos, neg]. - gear_pos = motor_params.get("actuator_gear_pos", 0.424) - gear_neg = motor_params.get("actuator_gear_neg", 0.425) - act["gear"] = [round(gear_pos, 6), round(gear_neg, 6)] - damp_pos = motor_params.get("motor_damping_pos", 0.002) - damp_neg = motor_params.get("motor_damping_neg", 0.015) - act["damping"] = [round(damp_pos, 6), round(damp_neg, 6)] + # Asymmetric gear, damping, deadzone, frictionloss as [pos, neg]. + act["gear"] = [ + round(params.get("actuator_gear_pos", 0.424), 6), + round(params.get("actuator_gear_neg", 0.425), 6), + ] + act["damping"] = [ + round(params.get("motor_damping_pos", 0.002), 6), + round(params.get("motor_damping_neg", 0.015), 6), + ] + act["deadzone"] = [ + round(params.get("motor_deadzone_pos", 0.141), 6), + round(params.get("motor_deadzone_neg", 0.078), 6), + ] + act["frictionloss"] = [ + round(params.get("motor_frictionloss_pos", 0.057), 6), + round(params.get("motor_frictionloss_neg", 0.053), 6), + ] + if "actuator_filter_tau" in params: + act["filter_tau"] = round(params["actuator_filter_tau"], 6) - dz_pos = motor_params.get("motor_deadzone_pos", 0.141) - dz_neg = motor_params.get("motor_deadzone_neg", 0.078) - act["deadzone"] = [round(dz_pos, 6), round(dz_neg, 6)] - - fl_pos = motor_params.get("motor_frictionloss_pos", 0.057) - fl_neg = motor_params.get("motor_frictionloss_neg", 0.053) - act["frictionloss"] = [round(fl_pos, 6), round(fl_neg, 6)] - - if "actuator_filter_tau" in motor_params: - act["filter_tau"] = round(motor_params["actuator_filter_tau"], 6) - if "viscous_quadratic" in motor_params: - act["viscous_quadratic"] = round(motor_params["viscous_quadratic"], 6) - if "back_emf_gain" in motor_params: - act["back_emf_gain"] = round(motor_params["back_emf_gain"], 6) - else: - if "actuator_gear" in params: - act["gear"] = round(params["actuator_gear"], 6) - if "actuator_filter_tau" in params: - act["filter_tau"] = round(params["actuator_filter_tau"], 6) - if "motor_damping" in params: - act["damping"] = round(params["motor_damping"], 6) - if "motor_deadzone" in params: - act["deadzone"] = round(params["motor_deadzone"], 6) + # Stribeck friction and action bias. + if "stribeck_friction_boost" in params: + act["stribeck_friction_boost"] = round(params["stribeck_friction_boost"], 6) + if "stribeck_vel" in params: + act["stribeck_vel"] = round(params["stribeck_vel"], 6) + if "action_bias" in params: + act["action_bias"] = round(params["action_bias"], 6) # ctrl_range from ctrl_limit parameter. if "ctrl_limit" in params: @@ -112,15 +104,10 @@ def export_tuned_files( if "motor_joint" not in tuned_cfg["joints"]: tuned_cfg["joints"]["motor_joint"] = {} mj = tuned_cfg["joints"]["motor_joint"] - if motor_params: - mj["armature"] = round(motor_params.get("motor_armature", 0.00277), 6) - # Frictionloss/damping = 0 in MuJoCo (motor model handles via qfrc_applied). - mj["frictionloss"] = 0.0 - else: - if "motor_armature" in params: - mj["armature"] = round(params["motor_armature"], 6) - if "motor_frictionloss" in params: - mj["frictionloss"] = round(params["motor_frictionloss"], 6) + if "motor_armature" in params: + mj["armature"] = round(params["motor_armature"], 6) + # Frictionloss/damping = 0 in MuJoCo (motor model handles via qfrc_applied). + mj["frictionloss"] = 0.0 if "pendulum_joint" not in tuned_cfg["joints"]: tuned_cfg["joints"]["pendulum_joint"] = {} @@ -154,8 +141,6 @@ def main() -> None: import argparse import json - from src.sysid.rollout import LOCKED_MOTOR_PARAMS - parser = argparse.ArgumentParser( description="Export tuned URDF + robot.yaml from sysid results." ) @@ -183,7 +168,6 @@ def main() -> None: export_tuned_files( robot_path=args.robot_path, params=result["best_params"], - motor_params=result.get("motor_params", dict(LOCKED_MOTOR_PARAMS)), ) diff --git a/src/sysid/motor/__init__.py b/src/sysid/motor/__init__.py deleted file mode 100644 index 1c05f16..0000000 --- a/src/sysid/motor/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -"""Motor-only system identification subpackage. - -Identifies JGB37-520 DC motor dynamics (no pendulum, no limits) -so the MuJoCo simulation matches the real hardware response. -""" diff --git a/src/sysid/motor/capture.py b/src/sysid/motor/capture.py deleted file mode 100644 index f7eb7ea..0000000 --- a/src/sysid/motor/capture.py +++ /dev/null @@ -1,364 +0,0 @@ -"""Capture a motor-only trajectory under random excitation (PRBS-style). - -Connects to the ESP32 running the simplified sysid firmware (no pendulum, -no limits), sends random PWM commands, and records motor angle + velocity -at ~ 50 Hz. - -Firmware serial protocol (115200 baud): - Commands: M\\n R\\n S\\n G\\n H\\n P\\n - State: S,,,,,\\n - -Usage: - python -m src.sysid.motor.capture --duration 20 - python -m src.sysid.motor.capture --duration 30 --amplitude 200 -""" - -from __future__ import annotations - -import argparse -import math -import random -import threading -import time -from datetime import datetime -from pathlib import Path -from typing import Any - -import numpy as np -import structlog -import yaml - -log = structlog.get_logger() - -# ── Default asset path ─────────────────────────────────────────────── -_DEFAULT_ASSET = "assets/motor" - - -# ── Serial protocol helpers ────────────────────────────────────────── - - -def _parse_state_line(line: str) -> dict[str, Any] | None: - """Parse an ``S,…`` state line from the motor sysid firmware. - - Format: S,,,,, - """ - if not line.startswith("S,"): - return None - parts = line.split(",") - if len(parts) < 6: - return None - try: - return { - "timestamp_ms": int(parts[1]), - "encoder_count": int(parts[2]), - "rpm": float(parts[3]), - "applied_speed": int(parts[4]), - "enc_vel_cps": float(parts[5]), - } - except (ValueError, IndexError): - return None - - -# ── Background serial reader ───────────────────────────────────────── - - -class _SerialReader: - """Minimal background reader for the ESP32 serial stream.""" - - def __init__(self, port: str, baud: int = 115200): - import serial as _serial - - self._serial_mod = _serial - self.ser = _serial.Serial(port, baud, timeout=0.05) - time.sleep(2) # Wait for ESP32 boot - self.ser.reset_input_buffer() - - self._latest: dict[str, Any] = {} - self._seq: int = 0 - self._lock = threading.Lock() - self._cond = threading.Condition(self._lock) - self._running = True - - self._thread = threading.Thread(target=self._reader_loop, daemon=True) - self._thread.start() - - def _reader_loop(self) -> None: - while self._running: - try: - if self.ser.in_waiting: - line = ( - self.ser.readline() - .decode("utf-8", errors="ignore") - .strip() - ) - parsed = _parse_state_line(line) - if parsed is not None: - with self._cond: - self._latest = parsed - self._seq += 1 - self._cond.notify_all() - elif line and not line.startswith("S,"): - # Log non-state lines (READY, PONG, WARN, etc.) - log.debug("serial_info", line=line) - else: - time.sleep(0.001) - except (OSError, self._serial_mod.SerialException): - log.critical("serial_lost") - break - - def send(self, cmd: str) -> None: - try: - self.ser.write(f"{cmd}\n".encode()) - except (OSError, self._serial_mod.SerialException): - log.critical("serial_send_failed", cmd=cmd) - - def read_blocking(self, timeout: float = 0.1) -> dict[str, Any]: - """Wait until a *new* state line arrives, then return it.""" - with self._cond: - seq_before = self._seq - if not self._cond.wait_for( - lambda: self._seq > seq_before, timeout=timeout - ): - return {} - return dict(self._latest) - - def close(self) -> None: - self._running = False - self.send("H") - self.send("M0") - time.sleep(0.1) - self._thread.join(timeout=1.0) - self.ser.close() - - -# ── PRBS excitation signal ─────────────────────────────────────────── - - -class _PRBSExcitation: - """Random hold-value excitation with configurable amplitude and hold time.""" - - def __init__( - self, - amplitude: int = 200, - hold_min_ms: int = 50, - hold_max_ms: int = 400, - ): - self.amplitude = amplitude - self.hold_min_ms = hold_min_ms - self.hold_max_ms = hold_max_ms - self._current: int = 0 - self._switch_time: float = 0.0 - self._new_value() - - def _new_value(self) -> None: - self._current = random.randint(-self.amplitude, self.amplitude) - hold_ms = random.randint(self.hold_min_ms, self.hold_max_ms) - self._switch_time = time.monotonic() + hold_ms / 1000.0 - - def __call__(self) -> int: - if time.monotonic() >= self._switch_time: - self._new_value() - return self._current - - -# ── Main capture loop ──────────────────────────────────────────────── - - -def capture( - asset_path: str | Path = _DEFAULT_ASSET, - port: str = "/dev/cu.usbserial-0001", - baud: int = 115200, - duration: float = 20.0, - amplitude: int = 200, - hold_min_ms: int = 50, - hold_max_ms: int = 400, - dt: float = 0.02, -) -> Path: - """Run motor-only capture and return the path to the saved .npz file. - - Stream-driven: blocks on each firmware state line (~50 Hz), - sends next motor command immediately, records both. - No time.sleep pacing — locked to firmware clock. - - The recording stores: - - time: wall-clock seconds since start - - action: normalised action = applied_speed / 255 - - motor_angle: shaft angle in radians (from encoder) - - motor_vel: shaft velocity in rad/s (from encoder velocity) - """ - asset_path = Path(asset_path).resolve() - - # Load hardware config for encoder conversion. - hw_yaml = asset_path / "hardware.yaml" - if not hw_yaml.exists(): - raise FileNotFoundError(f"hardware.yaml not found in {asset_path}") - raw_hw = yaml.safe_load(hw_yaml.read_text()) - ppr = raw_hw.get("encoder", {}).get("ppr", 11) - gear_ratio = raw_hw.get("encoder", {}).get("gear_ratio", 30.0) - counts_per_rev: float = ppr * gear_ratio * 4.0 - max_pwm = raw_hw.get("motor", {}).get("max_pwm", 255) - - log.info( - "hardware_config", - ppr=ppr, - gear_ratio=gear_ratio, - counts_per_rev=counts_per_rev, - max_pwm=max_pwm, - ) - - # Connect. - reader = _SerialReader(port, baud) - excitation = _PRBSExcitation(amplitude, hold_min_ms, hold_max_ms) - - # Prepare recording buffers. - max_samples = int(duration / dt) + 500 - rec_time = np.zeros(max_samples, dtype=np.float64) - rec_action = np.zeros(max_samples, dtype=np.float64) - rec_motor_angle = np.zeros(max_samples, dtype=np.float64) - rec_motor_vel = np.zeros(max_samples, dtype=np.float64) - - # Reset encoder to zero. - reader.send("R") - time.sleep(0.1) - - # Start streaming. - reader.send("G") - time.sleep(0.1) - - log.info( - "capture_starting", - port=port, - duration=duration, - amplitude=amplitude, - hold_range_ms=f"{hold_min_ms}–{hold_max_ms}", - ) - - idx = 0 - pwm = 0 - last_esp_ms = -1 - t0 = time.monotonic() - - try: - while True: - state = reader.read_blocking(timeout=0.1) - if not state: - continue - - # Deduplicate by firmware timestamp. - esp_ms = state.get("timestamp_ms", 0) - if esp_ms == last_esp_ms: - continue - last_esp_ms = esp_ms - - elapsed = time.monotonic() - t0 - if elapsed >= duration: - break - - # Generate next excitation PWM. - pwm = excitation() - - # Send command. - reader.send(f"M{pwm}") - - # Convert encoder to angle/velocity. - enc = state.get("encoder_count", 0) - motor_angle = enc / counts_per_rev * 2.0 * math.pi - motor_vel = ( - state.get("enc_vel_cps", 0.0) / counts_per_rev * 2.0 * math.pi - ) - - # Use firmware-applied speed for the action. - applied = state.get("applied_speed", 0) - action_norm = applied / 255.0 - - if idx < max_samples: - rec_time[idx] = elapsed - rec_action[idx] = action_norm - rec_motor_angle[idx] = motor_angle - rec_motor_vel[idx] = motor_vel - idx += 1 - else: - break - - if idx % 50 == 0: - log.info( - "capture_progress", - elapsed=f"{elapsed:.1f}/{duration:.0f}s", - samples=idx, - pwm=pwm, - angle_deg=f"{math.degrees(motor_angle):.1f}", - vel_rps=f"{motor_vel / (2 * math.pi):.1f}", - ) - - finally: - reader.send("M0") - reader.close() - - # Trim. - rec_time = rec_time[:idx] - rec_action = rec_action[:idx] - rec_motor_angle = rec_motor_angle[:idx] - rec_motor_vel = rec_motor_vel[:idx] - - # Save. - recordings_dir = asset_path / "recordings" - recordings_dir.mkdir(exist_ok=True) - stamp = datetime.now().strftime("%Y%m%d_%H%M%S") - out_path = recordings_dir / f"motor_capture_{stamp}.npz" - np.savez_compressed( - out_path, - time=rec_time, - action=rec_action, - motor_angle=rec_motor_angle, - motor_vel=rec_motor_vel, - ) - - log.info( - "capture_saved", - path=str(out_path), - samples=idx, - duration_actual=f"{rec_time[-1]:.2f}s" if idx > 0 else "0s", - ) - return out_path - - -# ── CLI ────────────────────────────────────────────────────────────── - - -def main() -> None: - parser = argparse.ArgumentParser( - description="Capture motor-only trajectory for system identification." - ) - parser.add_argument( - "--asset-path", type=str, default=_DEFAULT_ASSET, - help="Path to motor asset directory (contains hardware.yaml)", - ) - parser.add_argument( - "--port", type=str, default="/dev/cu.usbserial-0001", - help="Serial port for ESP32", - ) - parser.add_argument("--baud", type=int, default=115200) - parser.add_argument("--duration", type=float, default=20.0, help="Capture duration (s)") - parser.add_argument( - "--amplitude", type=int, default=200, - help="Max PWM magnitude (0–255, firmware allows full range)", - ) - parser.add_argument("--hold-min-ms", type=int, default=50, help="PRBS min hold (ms)") - parser.add_argument("--hold-max-ms", type=int, default=400, help="PRBS max hold (ms)") - parser.add_argument("--dt", type=float, default=0.02, help="Nominal sample period (s)") - args = parser.parse_args() - - capture( - asset_path=args.asset_path, - port=args.port, - baud=args.baud, - duration=args.duration, - amplitude=args.amplitude, - hold_min_ms=args.hold_min_ms, - hold_max_ms=args.hold_max_ms, - dt=args.dt, - ) - - -if __name__ == "__main__": - main() diff --git a/src/sysid/motor/export.py b/src/sysid/motor/export.py deleted file mode 100644 index b717ce1..0000000 --- a/src/sysid/motor/export.py +++ /dev/null @@ -1,186 +0,0 @@ -"""Export tuned motor parameters to MJCF and robot.yaml files. - -Reads the original motor.xml and robot.yaml, patches with optimised -parameter values, and writes motor_tuned.xml + robot_tuned.yaml. - -Usage: - python -m src.sysid.motor.export \ - --asset-path assets/motor \ - --result assets/motor/motor_sysid_result.json -""" - -from __future__ import annotations - -import argparse -import copy -import json -import xml.etree.ElementTree as ET -from pathlib import Path - -import structlog -import yaml - -log = structlog.get_logger() - -_DEFAULT_ASSET = "assets/motor" - - -def export_tuned_files( - asset_path: str | Path, - params: dict[str, float], -) -> tuple[Path, Path]: - """Write tuned MJCF and robot.yaml files. - - Returns (tuned_mjcf_path, tuned_robot_yaml_path). - """ - asset_path = Path(asset_path).resolve() - - robot_yaml_path = asset_path / "robot.yaml" - robot_cfg = yaml.safe_load(robot_yaml_path.read_text()) - mjcf_path = asset_path / robot_cfg["mjcf"] - - # ── Tune MJCF ──────────────────────────────────────────────── - tree = ET.parse(str(mjcf_path)) - root = tree.getroot() - - # Actuator — use average gear for the MJCF model. - gear_pos = params.get("actuator_gear_pos", params.get("actuator_gear")) - gear_neg = params.get("actuator_gear_neg", params.get("actuator_gear")) - gear_avg = None - if gear_pos is not None and gear_neg is not None: - gear_avg = (gear_pos + gear_neg) / 2.0 - elif gear_pos is not None: - gear_avg = gear_pos - filter_tau = params.get("actuator_filter_tau") - for act_el in root.iter("general"): - if act_el.get("name") == "motor": - if gear_avg is not None: - act_el.set("gear", str(gear_avg)) - if filter_tau is not None: - if filter_tau > 0: - act_el.set("dyntype", "filter") - act_el.set("dynprm", str(filter_tau)) - else: - act_el.set("dyntype", "none") - - # Joint — average damping & friction for MJCF (asymmetry in runtime). - fl_pos = params.get("motor_frictionloss_pos", params.get("motor_frictionloss")) - fl_neg = params.get("motor_frictionloss_neg", params.get("motor_frictionloss")) - fl_avg = None - if fl_pos is not None and fl_neg is not None: - fl_avg = (fl_pos + fl_neg) / 2.0 - elif fl_pos is not None: - fl_avg = fl_pos - damp_pos = params.get("motor_damping_pos", params.get("motor_damping")) - damp_neg = params.get("motor_damping_neg", params.get("motor_damping")) - damp_avg = None - if damp_pos is not None and damp_neg is not None: - damp_avg = (damp_pos + damp_neg) / 2.0 - elif damp_pos is not None: - damp_avg = damp_pos - for jnt in root.iter("joint"): - if jnt.get("name") == "motor_joint": - if damp_avg is not None: - jnt.set("damping", str(damp_avg)) - if "motor_armature" in params: - jnt.set("armature", str(params["motor_armature"])) - if fl_avg is not None: - jnt.set("frictionloss", str(fl_avg)) - - # Rotor mass. - if "rotor_mass" in params: - for geom in root.iter("geom"): - if geom.get("name") == "rotor_disk": - geom.set("mass", str(params["rotor_mass"])) - - # Write tuned MJCF. - tuned_mjcf_name = mjcf_path.stem + "_tuned" + mjcf_path.suffix - tuned_mjcf_path = asset_path / tuned_mjcf_name - ET.indent(tree, space=" ") - tree.write(str(tuned_mjcf_path), xml_declaration=True, encoding="unicode") - log.info("tuned_mjcf_written", path=str(tuned_mjcf_path)) - - # ── Tune robot.yaml ────────────────────────────────────────── - tuned_cfg = copy.deepcopy(robot_cfg) - tuned_cfg["mjcf"] = tuned_mjcf_name - - if tuned_cfg.get("actuators") and len(tuned_cfg["actuators"]) > 0: - act = tuned_cfg["actuators"][0] - if gear_avg is not None: - act["gear"] = round(gear_avg, 6) - if "actuator_filter_tau" in params: - act["filter_tau"] = round(params["actuator_filter_tau"], 6) - if "motor_damping" in params: - act["damping"] = round(params["motor_damping"], 6) - - if "joints" not in tuned_cfg: - tuned_cfg["joints"] = {} - if "motor_joint" not in tuned_cfg["joints"]: - tuned_cfg["joints"]["motor_joint"] = {} - mj = tuned_cfg["joints"]["motor_joint"] - if "motor_armature" in params: - mj["armature"] = round(params["motor_armature"], 6) - if fl_avg is not None: - mj["frictionloss"] = round(fl_avg, 6) - - # Asymmetric / hardware-realism / nonlinear parameters. - realism = {} - for key in [ - "actuator_gear_pos", "actuator_gear_neg", - "motor_damping_pos", "motor_damping_neg", - "motor_frictionloss_pos", "motor_frictionloss_neg", - "motor_deadzone_pos", "motor_deadzone_neg", - "action_bias", - "viscous_quadratic", "back_emf_gain", - "stribeck_friction_boost", "stribeck_vel", - "gearbox_backlash", - ]: - if key in params: - realism[key] = round(params[key], 6) - if realism: - tuned_cfg["hardware_realism"] = realism - - tuned_yaml_path = asset_path / "robot_tuned.yaml" - header = ( - "# Tuned motor config — generated by src.sysid.motor.optimize\n" - "# Original: robot.yaml\n\n" - ) - tuned_yaml_path.write_text( - header + yaml.dump(tuned_cfg, default_flow_style=False, sort_keys=False) - ) - log.info("tuned_robot_yaml_written", path=str(tuned_yaml_path)) - - return tuned_mjcf_path, tuned_yaml_path - - -# ── CLI ────────────────────────────────────────────────────────────── - - -def main() -> None: - parser = argparse.ArgumentParser( - description="Export tuned motor parameters to MJCF + robot.yaml." - ) - parser.add_argument("--asset-path", type=str, default=_DEFAULT_ASSET) - parser.add_argument( - "--result", type=str, default=None, - help="Path to motor_sysid_result.json (auto-detected if omitted)", - ) - args = parser.parse_args() - - asset_path = Path(args.asset_path).resolve() - if args.result: - result_path = Path(args.result) - else: - result_path = asset_path / "motor_sysid_result.json" - - if not result_path.exists(): - raise FileNotFoundError(f"Result file not found: {result_path}") - - result = json.loads(result_path.read_text()) - params = result["best_params"] - - export_tuned_files(asset_path=args.asset_path, params=params) - - -if __name__ == "__main__": - main() diff --git a/src/sysid/motor/optimize.py b/src/sysid/motor/optimize.py deleted file mode 100644 index d736498..0000000 --- a/src/sysid/motor/optimize.py +++ /dev/null @@ -1,367 +0,0 @@ -"""CMA-ES optimiser — fit motor simulation parameters to a real recording. - -Motor-only version: minimises trajectory-matching cost between MuJoCo -rollout and recorded motor angle + velocity. - -Usage: - python -m src.sysid.motor.optimize \ - --recording assets/motor/recordings/motor_capture_YYYYMMDD_HHMMSS.npz - - # Quick test: - python -m src.sysid.motor.optimize --recording .npz \ - --max-generations 20 --population-size 10 -""" - -from __future__ import annotations - -import argparse -import json -import time -from datetime import datetime -from pathlib import Path - -import numpy as np -import structlog - -from src.sysid.motor.preprocess import recompute_velocity -from src.sysid.motor.rollout import ( - MOTOR_PARAMS, - ParamSpec, - bounds_arrays, - defaults_vector, - params_to_dict, - rollout, - windowed_rollout, -) - -log = structlog.get_logger() - -_DEFAULT_ASSET = "assets/motor" - - -# ── Cost function ──────────────────────────────────────────────────── - - -def _compute_trajectory_cost( - sim: dict[str, np.ndarray], - recording: dict[str, np.ndarray], - pos_weight: float = 1.0, - vel_weight: float = 0.5, - acc_weight: float = 0.0, - dt: float = 0.02, -) -> float: - """Weighted MSE between simulated and real motor trajectories. - - Motor-only: angle, velocity, and optionally acceleration. - Acceleration is computed as finite-difference of velocity. - """ - angle_err = sim["motor_angle"] - recording["motor_angle"] - vel_err = sim["motor_vel"] - recording["motor_vel"] - - # Reject NaN results (unstable simulation). - if np.any(~np.isfinite(angle_err)) or np.any(~np.isfinite(vel_err)): - return 1e6 - - cost = float( - pos_weight * np.mean(angle_err**2) - + vel_weight * np.mean(vel_err**2) - ) - - # Optional acceleration term — penalises wrong dynamics (d(vel)/dt). - if acc_weight > 0 and len(vel_err) > 2: - sim_acc = np.diff(sim["motor_vel"]) / dt - real_acc = np.diff(recording["motor_vel"]) / dt - acc_err = sim_acc - real_acc - if np.any(~np.isfinite(acc_err)): - return 1e6 - # Normalise by typical acceleration scale (~50 rad/s²) to keep - # the weight intuitive relative to vel/pos terms. - cost += acc_weight * np.mean(acc_err**2) / (50.0**2) - - return cost - - -def cost_function( - params_vec: np.ndarray, - recording: dict[str, np.ndarray], - asset_path: Path, - specs: list[ParamSpec], - sim_dt: float = 0.002, - substeps: int = 10, - pos_weight: float = 1.0, - vel_weight: float = 0.5, - acc_weight: float = 0.0, - window_duration: float = 0.5, -) -> float: - """Compute trajectory-matching cost for a candidate parameter vector. - - Uses multiple-shooting (windowed rollout) by default. - """ - params = params_to_dict(params_vec, specs) - - try: - if window_duration > 0: - sim = windowed_rollout( - asset_path=asset_path, - params=params, - recording=recording, - window_duration=window_duration, - sim_dt=sim_dt, - substeps=substeps, - ) - else: - sim = rollout( - asset_path=asset_path, - params=params, - actions=recording["action"], - sim_dt=sim_dt, - substeps=substeps, - ) - except Exception as exc: - log.warning("rollout_failed", error=str(exc)) - return 1e6 - - return _compute_trajectory_cost( - sim, recording, pos_weight, vel_weight, acc_weight, - dt=np.mean(np.diff(recording["time"])) if len(recording["time"]) > 1 else 0.02, - ) - - -# ── CMA-ES optimisation loop ──────────────────────────────────────── - - -def optimize( - asset_path: str | Path = _DEFAULT_ASSET, - recording_path: str | Path = "", - specs: list[ParamSpec] | None = None, - sigma0: float = 0.3, - population_size: int = 30, - max_generations: int = 300, - sim_dt: float = 0.002, - substeps: int = 10, - pos_weight: float = 1.0, - vel_weight: float = 0.5, - acc_weight: float = 0.0, - window_duration: float = 0.5, - seed: int = 42, - preprocess_vel: bool = True, - sg_window: int = 7, - sg_polyorder: int = 3, -) -> dict: - """Run CMA-ES optimisation and return results dict.""" - from cmaes import CMA - - asset_path = Path(asset_path).resolve() - recording_path = Path(recording_path).resolve() - - if specs is None: - specs = MOTOR_PARAMS - - # Load recording. - recording = dict(np.load(recording_path)) - n_samples = len(recording["time"]) - duration = recording["time"][-1] - recording["time"][0] - n_windows = max(1, int(duration / window_duration)) if window_duration > 0 else 1 - log.info( - "recording_loaded", - path=str(recording_path), - samples=n_samples, - duration=f"{duration:.1f}s", - n_windows=n_windows, - ) - - # Preprocess velocity: replace noisy firmware finite-difference with - # smooth Savitzky-Golay derivative of the angle signal. - if preprocess_vel: - recording = recompute_velocity( - recording, - window_length=sg_window, - polyorder=sg_polyorder, - ) - - # Normalise to [0, 1] for CMA-ES. - lo, hi = bounds_arrays(specs) - x0 = defaults_vector(specs) - span = hi - lo - span[span == 0] = 1.0 - - def to_normed(x: np.ndarray) -> np.ndarray: - return (x - lo) / span - - def from_normed(x_n: np.ndarray) -> np.ndarray: - return x_n * span + lo - - x0_normed = to_normed(x0) - bounds_normed = np.column_stack( - [np.zeros(len(specs)), np.ones(len(specs))] - ) - - optimizer = CMA( - mean=x0_normed, - sigma=sigma0, - bounds=bounds_normed, - population_size=population_size, - seed=seed, - ) - - best_cost = float("inf") - best_params_vec = x0.copy() - history: list[tuple[int, float]] = [] - - log.info( - "cmaes_starting", - n_params=len(specs), - population=population_size, - max_gens=max_generations, - sigma0=sigma0, - ) - - t0 = time.monotonic() - - for gen in range(max_generations): - solutions = [] - for _ in range(optimizer.population_size): - x_normed = optimizer.ask() - x_natural = from_normed(x_normed) - x_natural = np.clip(x_natural, lo, hi) - - c = cost_function( - x_natural, - recording, - asset_path, - specs, - sim_dt=sim_dt, - substeps=substeps, - pos_weight=pos_weight, - vel_weight=vel_weight, - acc_weight=acc_weight, - window_duration=window_duration, - ) - solutions.append((x_normed, c)) - - if c < best_cost: - best_cost = c - best_params_vec = x_natural.copy() - - optimizer.tell(solutions) - history.append((gen, best_cost)) - - elapsed = time.monotonic() - t0 - if gen % 5 == 0 or gen == max_generations - 1: - log.info( - "cmaes_generation", - gen=gen, - best_cost=f"{best_cost:.6f}", - elapsed=f"{elapsed:.1f}s", - gen_best=f"{min(c for _, c in solutions):.6f}", - ) - - total_time = time.monotonic() - t0 - best_params = params_to_dict(best_params_vec, specs) - - log.info( - "cmaes_finished", - best_cost=f"{best_cost:.6f}", - total_time=f"{total_time:.1f}s", - evaluations=max_generations * population_size, - ) - - # Log parameter comparison. - defaults = params_to_dict(defaults_vector(specs), specs) - for name in best_params: - d = defaults[name] - b = best_params[name] - change_pct = ((b - d) / abs(d) * 100) if abs(d) > 1e-12 else 0.0 - log.info( - "param_result", - name=name, - default=f"{d:.6g}", - tuned=f"{b:.6g}", - change=f"{change_pct:+.1f}%", - ) - - return { - "best_params": best_params, - "best_cost": best_cost, - "history": history, - "recording": str(recording_path), - "param_names": [s.name for s in specs], - "defaults": {s.name: s.default for s in specs}, - "timestamp": datetime.now().isoformat(), - } - - -# ── CLI ────────────────────────────────────────────────────────────── - - -def main() -> None: - parser = argparse.ArgumentParser( - description="Fit motor simulation parameters to a real recording (CMA-ES)." - ) - parser.add_argument( - "--asset-path", type=str, default=_DEFAULT_ASSET, - help="Path to motor asset directory", - ) - parser.add_argument( - "--recording", type=str, required=True, - help="Path to .npz recording file", - ) - parser.add_argument("--sigma0", type=float, default=0.3) - parser.add_argument("--population-size", type=int, default=30) - parser.add_argument("--max-generations", type=int, default=300) - parser.add_argument("--sim-dt", type=float, default=0.002) - parser.add_argument("--substeps", type=int, default=10) - parser.add_argument("--pos-weight", type=float, default=1.0) - parser.add_argument("--vel-weight", type=float, default=0.5) - parser.add_argument("--acc-weight", type=float, default=0.0, - help="Weight for acceleration matching (0=off, try 0.1-0.5)") - parser.add_argument("--window-duration", type=float, default=0.5) - parser.add_argument("--seed", type=int, default=42) - parser.add_argument( - "--no-preprocess-vel", action="store_true", - help="Disable Savitzky-Golay velocity preprocessing", - ) - parser.add_argument("--sg-window", type=int, default=7, - help="Savitzky-Golay window length (odd, default 7 = 140ms)") - parser.add_argument("--sg-polyorder", type=int, default=3, - help="Savitzky-Golay polynomial order (default 3 = cubic)") - args = parser.parse_args() - - result = optimize( - asset_path=args.asset_path, - recording_path=args.recording, - sigma0=args.sigma0, - population_size=args.population_size, - max_generations=args.max_generations, - sim_dt=args.sim_dt, - substeps=args.substeps, - pos_weight=args.pos_weight, - vel_weight=args.vel_weight, - acc_weight=args.acc_weight, - window_duration=args.window_duration, - seed=args.seed, - preprocess_vel=not args.no_preprocess_vel, - sg_window=args.sg_window, - sg_polyorder=args.sg_polyorder, - ) - - # Save results JSON. - asset_path = Path(args.asset_path).resolve() - result_path = asset_path / "motor_sysid_result.json" - result_json = {k: v for k, v in result.items() if k != "history"} - result_json["history_summary"] = { - "first_cost": result["history"][0][1] if result["history"] else None, - "final_cost": result["history"][-1][1] if result["history"] else None, - "generations": len(result["history"]), - } - result_path.write_text(json.dumps(result_json, indent=2, default=str)) - log.info("results_saved", path=str(result_path)) - - # Export tuned files. - from src.sysid.motor.export import export_tuned_files - - export_tuned_files(asset_path=args.asset_path, params=result["best_params"]) - - -if __name__ == "__main__": - main() diff --git a/src/sysid/motor/preprocess.py b/src/sysid/motor/preprocess.py deleted file mode 100644 index 9aa8cf1..0000000 --- a/src/sysid/motor/preprocess.py +++ /dev/null @@ -1,114 +0,0 @@ -"""Recording preprocessing — clean velocity estimation from angle data. - -The ESP32 firmware computes velocity as a raw finite-difference of encoder -counts at 50 Hz. With a 1320 CPR encoder that gives ~0.24 rad/s of -quantisation noise per count. This module replaces the noisy firmware -velocity with a smooth differentiation of the (much cleaner) angle signal. - -Method: Savitzky-Golay filter applied to the angle signal, then -differentiated analytically. Zero phase lag, preserves transients well. - -This is standard practice in robotics sysid — see e.g. MATLAB System ID -Toolbox, Drake's trajectory processing, or ETH's ANYmal sysid pipeline. -""" - -from __future__ import annotations - -import numpy as np -from scipy.signal import savgol_filter - -import structlog - -log = structlog.get_logger() - - -def recompute_velocity( - recording: dict[str, np.ndarray], - window_length: int = 7, - polyorder: int = 3, - deriv: int = 1, -) -> dict[str, np.ndarray]: - """Recompute motor_vel from motor_angle using Savitzky-Golay differentiation. - - Parameters - ---------- - recording : dict with at least 'time', 'motor_angle', 'motor_vel' keys. - window_length : SG filter window (must be odd, > polyorder). - 7 samples at 50 Hz = 140ms window — good balance of smoothness - and responsiveness. Captures dynamics up to ~7 Hz. - polyorder : Polynomial order for the SG filter (3 = cubic). - deriv : Derivative order (1 = first derivative = velocity). - - Returns - ------- - New recording dict with 'motor_vel' replaced and 'motor_vel_raw' added. - """ - rec = dict(recording) # shallow copy - - times = rec["time"] - angles = rec["motor_angle"] - dt = np.mean(np.diff(times)) - - # Keep original for diagnostics. - rec["motor_vel_raw"] = rec["motor_vel"].copy() - - # Savitzky-Golay derivative: fits a polynomial to each window, - # then takes the analytical derivative → smooth, zero phase lag. - vel_sg = savgol_filter( - angles, - window_length=window_length, - polyorder=polyorder, - deriv=deriv, - delta=dt, - ) - - # Compute stats for logging. - raw_vel = rec["motor_vel_raw"] - noise_estimate = np.std(raw_vel - vel_sg) - max_diff = np.max(np.abs(raw_vel - vel_sg)) - - log.info( - "velocity_recomputed", - method="savgol", - window=window_length, - polyorder=polyorder, - dt=f"{dt*1000:.1f}ms", - noise_std=f"{noise_estimate:.3f} rad/s", - max_diff=f"{max_diff:.3f} rad/s", - raw_rms=f"{np.sqrt(np.mean(raw_vel**2)):.3f}", - sg_rms=f"{np.sqrt(np.mean(vel_sg**2)):.3f}", - ) - - rec["motor_vel"] = vel_sg - return rec - - -def smooth_velocity( - recording: dict[str, np.ndarray], - cutoff_hz: float = 10.0, -) -> dict[str, np.ndarray]: - """Alternative: apply zero-phase Butterworth low-pass to motor_vel. - - Simpler than SG derivative but introduces slight edge effects. - """ - from scipy.signal import butter, filtfilt - - rec = dict(recording) - rec["motor_vel_raw"] = rec["motor_vel"].copy() - - dt = np.mean(np.diff(rec["time"])) - fs = 1.0 / dt - nyq = fs / 2.0 - norm_cutoff = min(cutoff_hz / nyq, 0.99) - - b, a = butter(2, norm_cutoff, btype="low") - rec["motor_vel"] = filtfilt(b, a, rec["motor_vel"]) - - log.info( - "velocity_smoothed", - method="butterworth", - cutoff_hz=cutoff_hz, - fs=fs, - ) - - return rec diff --git a/src/sysid/motor/rollout.py b/src/sysid/motor/rollout.py deleted file mode 100644 index 691703c..0000000 --- a/src/sysid/motor/rollout.py +++ /dev/null @@ -1,460 +0,0 @@ -"""Deterministic simulation replay — roll out recorded actions in MuJoCo. - -Motor-only version: single hinge joint, no pendulum. -Given a parameter vector and recorded actions, builds a MuJoCo model -with overridden dynamics, replays the actions, and returns the simulated -motor angle + velocity for comparison with the real recording. -""" - -from __future__ import annotations - -import dataclasses -import xml.etree.ElementTree as ET -from pathlib import Path - -import mujoco -import numpy as np -import yaml - - -# ── Tunable parameter specification ────────────────────────────────── - - -@dataclasses.dataclass -class ParamSpec: - """Specification for a single tunable parameter.""" - - name: str - default: float - lower: float - upper: float - log_scale: bool = False - - -# Motor-only parameters to identify. -# These capture the full transfer function: PWM → shaft angle/velocity. -# -# Asymmetric parameters (pos/neg suffix) capture real-world differences -# between CW and CCW rotation caused by gear mesh, brush contact, -# and H-bridge asymmetry. -MOTOR_PARAMS: list[ParamSpec] = [ - # ── Actuator ───────────────────────────────────────────────── - # gear_pos/neg: effective torque per unit ctrl, split by direction. - # Real motors + L298N often have different drive strength per direction. - ParamSpec("actuator_gear_pos", 0.064, 0.005, 0.5, log_scale=True), - ParamSpec("actuator_gear_neg", 0.064, 0.005, 0.5, log_scale=True), - # filter_tau: first-order electrical/driver time constant (s). - # Lower bound 1ms — L298N PWM switching is very fast. - ParamSpec("actuator_filter_tau", 0.03, 0.001, 0.20), - # ── Joint dynamics ─────────────────────────────────────────── - # damping_pos/neg: viscous friction (back-EMF), split by direction. - ParamSpec("motor_damping_pos", 0.003, 1e-5, 0.1, log_scale=True), - ParamSpec("motor_damping_neg", 0.003, 1e-5, 0.1, log_scale=True), - # armature: reflected rotor inertia (kg·m²). - ParamSpec("motor_armature", 0.0001, 1e-6, 0.01, log_scale=True), - # frictionloss_pos/neg: Coulomb friction, split by velocity direction. - ParamSpec("motor_frictionloss_pos", 0.03, 0.001, 0.2, log_scale=True), - ParamSpec("motor_frictionloss_neg", 0.03, 0.001, 0.2, log_scale=True), - # ── Nonlinear dynamics ─────────────────────────────────────── - # viscous_quadratic: velocity-squared drag term (N·m·s²/rad²). - # Captures nonlinear friction that increases at high speed (air drag, - # grease viscosity, etc.). Opposes motion. - ParamSpec("viscous_quadratic", 0.0, 0.0, 0.005), - # back_emf_gain: torque reduction proportional to |vel × ctrl|. - # Models the back-EMF effect: at high speed the motor produces less - # torque because the voltage drop across the armature is smaller. - ParamSpec("back_emf_gain", 0.0, 0.0, 0.05), - # stribeck_vel: characteristic velocity below which Coulomb friction - # is boosted (Stribeck effect). 0 = standard Coulomb only. - ParamSpec("stribeck_friction_boost", 0.0, 0.0, 0.15), - ParamSpec("stribeck_vel", 2.0, 0.1, 8.0), - # ── Rotor load ─────────────────────────────────────────────── - ParamSpec("rotor_mass", 0.012, 0.002, 0.05, log_scale=True), - # ── Hardware realism ───────────────────────────────────────── - # deadzone_pos/neg: minimum |action| per direction. - ParamSpec("motor_deadzone_pos", 0.08, 0.0, 0.30), - ParamSpec("motor_deadzone_neg", 0.08, 0.0, 0.30), - # action_bias: constant offset added to ctrl (H-bridge asymmetry). - ParamSpec("action_bias", 0.0, -0.10, 0.10), - # ── Gearbox backlash ───────────────────────────────────────── - # backlash_halfwidth: half the angular deadband (rad) in the gearbox. - # When the motor reverses, the shaft doesn't move until the backlash - # gap is taken up. Typical for 30:1 plastic/metal spur gears. - ParamSpec("gearbox_backlash", 0.0, 0.0, 0.15), -] - - -def params_to_dict( - values: np.ndarray, specs: list[ParamSpec] | None = None -) -> dict[str, float]: - if specs is None: - specs = MOTOR_PARAMS - return {s.name: float(values[i]) for i, s in enumerate(specs)} - - -def defaults_vector(specs: list[ParamSpec] | None = None) -> np.ndarray: - if specs is None: - specs = MOTOR_PARAMS - return np.array([s.default for s in specs], dtype=np.float64) - - -def bounds_arrays( - specs: list[ParamSpec] | None = None, -) -> tuple[np.ndarray, np.ndarray]: - if specs is None: - specs = MOTOR_PARAMS - lo = np.array([s.lower for s in specs], dtype=np.float64) - hi = np.array([s.upper for s in specs], dtype=np.float64) - return lo, hi - - -# ── MuJoCo model building with parameter overrides ────────────────── - - -def _build_model( - asset_path: Path, - params: dict[str, float], -) -> mujoco.MjModel: - """Build a MuJoCo model from motor.xml with parameter overrides. - - Parses the MJCF, patches actuator/joint/body parameters, reloads. - """ - asset_path = Path(asset_path).resolve() - robot_cfg = yaml.safe_load((asset_path / "robot.yaml").read_text()) - mjcf_path = asset_path / robot_cfg["mjcf"] - - tree = ET.parse(str(mjcf_path)) - root = tree.getroot() - - # ── Actuator overrides ─────────────────────────────────────── - # Use average of pos/neg gear for MuJoCo (asymmetry handled in ctrl). - gear_pos = params.get("actuator_gear_pos", params.get("actuator_gear", 0.064)) - gear_neg = params.get("actuator_gear_neg", params.get("actuator_gear", 0.064)) - gear = (gear_pos + gear_neg) / 2.0 - filter_tau = params.get("actuator_filter_tau", 0.03) - - for act_el in root.iter("general"): - if act_el.get("name") == "motor": - act_el.set("gear", str(gear)) - if filter_tau > 0: - act_el.set("dyntype", "filter") - act_el.set("dynprm", str(filter_tau)) - else: - act_el.set("dyntype", "none") - act_el.set("dynprm", "1") - - # ── Joint overrides ────────────────────────────────────────── - # Damping and friction are asymmetric + nonlinear → applied manually. - # Set MuJoCo damping & frictionloss to 0; we handle them in qfrc_applied. - armature = params.get("motor_armature", 0.0001) - - for jnt in root.iter("joint"): - if jnt.get("name") == "motor_joint": - jnt.set("damping", "0") - jnt.set("armature", str(armature)) - jnt.set("frictionloss", "0") - - # ── Rotor mass override ────────────────────────────────────── - rotor_mass = params.get("rotor_mass", 0.012) - for geom in root.iter("geom"): - if geom.get("name") == "rotor_disk": - geom.set("mass", str(rotor_mass)) - - # Write temp file and load. - tmp_path = asset_path / "_tmp_motor_sysid.xml" - try: - tree.write(str(tmp_path), xml_declaration=True, encoding="unicode") - model = mujoco.MjModel.from_xml_path(str(tmp_path)) - finally: - tmp_path.unlink(missing_ok=True) - - return model - - -# ── Action + asymmetry transforms ──────────────────────────────────── - - -def _transform_action( - action: float, - params: dict[str, float], -) -> float: - """Apply bias, direction-dependent deadzone, and gear scaling. - - The MuJoCo actuator has the *average* gear ratio. We rescale the - control signal so that ``ctrl × gear_avg ≈ action × gear_dir``. - """ - # Constant bias (H-bridge asymmetry). - action = action + params.get("action_bias", 0.0) - - # Direction-dependent deadzone. - dz_pos = params.get("motor_deadzone_pos", params.get("motor_deadzone", 0.08)) - dz_neg = params.get("motor_deadzone_neg", params.get("motor_deadzone", 0.08)) - if action >= 0 and action < dz_pos: - return 0.0 - if action < 0 and action > -dz_neg: - return 0.0 - - # Direction-dependent gear scaling. - # MuJoCo model uses gear_avg; we rescale ctrl to get the right torque. - gear_pos = params.get("actuator_gear_pos", params.get("actuator_gear", 0.064)) - gear_neg = params.get("actuator_gear_neg", params.get("actuator_gear", 0.064)) - gear_avg = (gear_pos + gear_neg) / 2.0 - if gear_avg < 1e-8: - return 0.0 - gear_dir = gear_pos if action >= 0 else gear_neg - return action * (gear_dir / gear_avg) - - -def _apply_forces( - data: mujoco.MjData, - vel: float, - ctrl: float, - params: dict[str, float], - backlash_state: list[float] | None = None, -) -> None: - """Apply all manual forces: asymmetric friction, damping, and nonlinear terms. - - Everything that MuJoCo can't represent with its symmetric joint model - is injected here via ``qfrc_applied``. - - Forces applied (all oppose motion or reduce torque): - 1. Asymmetric Coulomb friction (with Stribeck boost at low speed) - 2. Asymmetric viscous damping - 3. Quadratic velocity drag - 4. Back-EMF torque reduction (proportional to |vel|) - - Backlash: - If backlash_state is provided, it is a 1-element list [gap_pos]. - gap_pos tracks the motor's position within the backlash deadband. - When inside the gap, no actuator torque is transmitted to the - output shaft — only friction forces act. - """ - torque = 0.0 - - # ── Gearbox backlash ────────────────────────────────────────── - # Model: the gear teeth have play of 2×halfwidth radians. - # We track where the motor is within that gap. When at the - # edge (contact), actuator torque passes through normally. - # When inside the gap, no actuator torque is transmitted. - backlash_hw = params.get("gearbox_backlash", 0.0) - actuator_torque_scale = 1.0 # 1.0 = full contact, 0.0 = in gap - - if backlash_hw > 0 and backlash_state is not None: - # gap_pos: how far into the backlash gap we are. - # Range: [-backlash_hw, +backlash_hw] - # At ±backlash_hw, gears are in contact and torque transmits. - gap = backlash_state[0] - # Update gap position based on velocity. - dt_sub = data.model.opt.timestep - gap += vel * dt_sub - # Clamp to backlash range. - if gap > backlash_hw: - gap = backlash_hw - elif gap < -backlash_hw: - gap = -backlash_hw - - backlash_state[0] = gap - - # If not at contact edge, no torque transmitted. - if abs(gap) < backlash_hw - 1e-8: - actuator_torque_scale = 0.0 - else: - actuator_torque_scale = 1.0 - - # ── 1. Coulomb friction (direction-dependent + Stribeck) ───── - fl_pos = params.get("motor_frictionloss_pos", params.get("motor_frictionloss", 0.03)) - fl_neg = params.get("motor_frictionloss_neg", params.get("motor_frictionloss", 0.03)) - stribeck_boost = params.get("stribeck_friction_boost", 0.0) - stribeck_vel = params.get("stribeck_vel", 2.0) - - if abs(vel) > 1e-6: - fl = fl_pos if vel > 0 else fl_neg - # Stribeck: boost friction at low speed. Exponential decay. - if stribeck_boost > 0 and stribeck_vel > 0: - fl = fl * (1.0 + stribeck_boost * np.exp(-abs(vel) / stribeck_vel)) - # Coulomb: constant magnitude, opposes motion. - torque -= np.sign(vel) * fl - - # ── 2. Asymmetric viscous damping ──────────────────────────── - damp_pos = params.get("motor_damping_pos", params.get("motor_damping", 0.003)) - damp_neg = params.get("motor_damping_neg", params.get("motor_damping", 0.003)) - damp = damp_pos if vel > 0 else damp_neg - torque -= damp * vel - - # ── 3. Quadratic velocity drag ─────────────────────────────── - visc_quad = params.get("viscous_quadratic", 0.0) - if visc_quad > 0: - torque -= visc_quad * vel * abs(vel) - - # ── 4. Back-EMF torque reduction ───────────────────────────── - # At high speed, the motor's effective torque is reduced because - # back-EMF opposes the supply voltage. Modelled as a torque that - # opposes the control signal proportional to speed. - bemf = params.get("back_emf_gain", 0.0) - if bemf > 0 and abs(ctrl) > 1e-6: - # The reduction should oppose the actuator torque direction. - torque -= bemf * vel * np.sign(ctrl) * actuator_torque_scale - - # ── 5. Scale actuator contribution by backlash state ───────── - # When in the backlash gap, MuJoCo's actuator force should not - # transmit. We cancel it by applying an opposing force. - if actuator_torque_scale < 1.0: - # The actuator_force from MuJoCo will be applied by mj_step. - # We need to counteract it. data.qfrc_actuator isn't set yet - # at this point (pre-step), so we zero the ctrl instead. - # This is handled in the rollout loop by zeroing ctrl. - pass - - data.qfrc_applied[0] = max(-10.0, min(10.0, torque)) - return actuator_torque_scale - - -# ── Simulation rollout ─────────────────────────────────────────────── - - -def rollout( - asset_path: str | Path, - params: dict[str, float], - actions: np.ndarray, - sim_dt: float = 0.002, - substeps: int = 10, -) -> dict[str, np.ndarray]: - """Open-loop replay of recorded actions. - - Parameters - ---------- - asset_path : motor asset directory - params : named parameter overrides - actions : (N,) normalised actions [-1, 1] from the recording - sim_dt : MuJoCo physics timestep - substeps : physics substeps per control step (ctrl_dt = sim_dt × substeps) - - Returns - ------- - dict with motor_angle (N,) and motor_vel (N,). - """ - asset_path = Path(asset_path).resolve() - model = _build_model(asset_path, params) - model.opt.timestep = sim_dt - data = mujoco.MjData(model) - mujoco.mj_resetData(model, data) - - n = len(actions) - - sim_motor_angle = np.zeros(n, dtype=np.float64) - sim_motor_vel = np.zeros(n, dtype=np.float64) - - # Backlash state: [gap_position]. Starts at 0 (centered in gap). - backlash_state = [0.0] - - for i in range(n): - ctrl = _transform_action(actions[i], params) - data.ctrl[0] = ctrl - - for _ in range(substeps): - scale = _apply_forces(data, data.qvel[0], ctrl, params, backlash_state) - # If in backlash gap, zero ctrl so actuator torque doesn't transmit. - if scale < 1.0: - data.ctrl[0] = 0.0 - else: - data.ctrl[0] = ctrl - mujoco.mj_step(model, data) - - # Bail out on NaN/instability. - if not np.isfinite(data.qpos[0]) or abs(data.qvel[0]) > 1e4: - sim_motor_angle[i:] = np.nan - sim_motor_vel[i:] = np.nan - break - - sim_motor_angle[i] = data.qpos[0] - sim_motor_vel[i] = data.qvel[0] - - return { - "motor_angle": sim_motor_angle, - "motor_vel": sim_motor_vel, - } - - -def windowed_rollout( - asset_path: str | Path, - params: dict[str, float], - recording: dict[str, np.ndarray], - window_duration: float = 0.5, - sim_dt: float = 0.002, - substeps: int = 10, -) -> dict[str, np.ndarray]: - """Multiple-shooting rollout for motor-only sysid. - - Splits the recording into short windows. Each window is initialised - from the real motor state, preventing error accumulation. - - Returns - ------- - dict with motor_angle (N,), motor_vel (N,), n_windows (int). - """ - asset_path = Path(asset_path).resolve() - model = _build_model(asset_path, params) - model.opt.timestep = sim_dt - data = mujoco.MjData(model) - - times = recording["time"] - actions = recording["action"] - real_angle = recording["motor_angle"] - real_vel = recording["motor_vel"] - n = len(actions) - - sim_motor_angle = np.zeros(n, dtype=np.float64) - sim_motor_vel = np.zeros(n, dtype=np.float64) - - # Compute window boundaries. - t0 = times[0] - t_end = times[-1] - window_starts: list[int] = [] - current_t = t0 - while current_t < t_end: - idx = int(np.searchsorted(times, current_t)) - idx = min(idx, n - 1) - window_starts.append(idx) - current_t += window_duration - - n_windows = len(window_starts) - - for w, w_start in enumerate(window_starts): - w_end = window_starts[w + 1] if w + 1 < n_windows else n - - # Init from real state at window start. - mujoco.mj_resetData(model, data) - data.qpos[0] = real_angle[w_start] - data.qvel[0] = real_vel[w_start] - data.ctrl[:] = 0.0 - mujoco.mj_forward(model, data) - - # Backlash state resets each window (assume contact at start). - backlash_state = [0.0] - - for i in range(w_start, w_end): - ctrl = _transform_action(actions[i], params) - data.ctrl[0] = ctrl - - for _ in range(substeps): - scale = _apply_forces(data, data.qvel[0], ctrl, params, backlash_state) - if scale < 1.0: - data.ctrl[0] = 0.0 - else: - data.ctrl[0] = ctrl - mujoco.mj_step(model, data) - - # Bail out on NaN/instability — fill rest of window with last good. - if not np.isfinite(data.qpos[0]) or abs(data.qvel[0]) > 1e4: - sim_motor_angle[i:w_end] = sim_motor_angle[max(i-1, w_start)] - sim_motor_vel[i:w_end] = 0.0 - break - - sim_motor_angle[i] = data.qpos[0] - sim_motor_vel[i] = data.qvel[0] - - return { - "motor_angle": sim_motor_angle, - "motor_vel": sim_motor_vel, - "n_windows": n_windows, - } diff --git a/src/sysid/motor/visualize.py b/src/sysid/motor/visualize.py deleted file mode 100644 index 69fa107..0000000 --- a/src/sysid/motor/visualize.py +++ /dev/null @@ -1,204 +0,0 @@ -"""Visualise motor system identification — real vs simulated trajectories. - -Usage: - python -m src.sysid.motor.visualize \ - --recording assets/motor/recordings/motor_capture_YYYYMMDD_HHMMSS.npz - - # With tuned result: - python -m src.sysid.motor.visualize \ - --recording .npz \ - --result assets/motor/motor_sysid_result.json -""" - -from __future__ import annotations - -import argparse -import json -from pathlib import Path - -import numpy as np -import structlog - -log = structlog.get_logger() - -_DEFAULT_ASSET = "assets/motor" - - -def visualize( - asset_path: str | Path = _DEFAULT_ASSET, - recording_path: str | Path = "", - result_path: str | Path | None = None, - sim_dt: float = 0.002, - substeps: int = 10, - window_duration: float = 0.5, - save_path: str | Path | None = None, - show: bool = True, -) -> None: - """Generate 3-panel comparison plot: angle, velocity, action.""" - import matplotlib.pyplot as plt - - from src.sysid.motor.rollout import ( - MOTOR_PARAMS, - defaults_vector, - params_to_dict, - rollout, - windowed_rollout, - ) - - asset_path = Path(asset_path).resolve() - recording = dict(np.load(recording_path)) - - t = recording["time"] - actions = recording["action"] - - # ── Simulate with default parameters ───────────────────────── - default_params = params_to_dict(defaults_vector(MOTOR_PARAMS), MOTOR_PARAMS) - log.info("simulating_default_params", windowed=window_duration > 0) - - if window_duration > 0: - sim_default = windowed_rollout( - asset_path=asset_path, - params=default_params, - recording=recording, - window_duration=window_duration, - sim_dt=sim_dt, - substeps=substeps, - ) - else: - sim_default = rollout( - asset_path=asset_path, - params=default_params, - actions=actions, - sim_dt=sim_dt, - substeps=substeps, - ) - - # ── Simulate with tuned parameters (if available) ──────────── - sim_tuned = None - tuned_cost = None - - if result_path is not None: - result_path = Path(result_path) - else: - # Auto-detect. - auto = asset_path / "motor_sysid_result.json" - if auto.exists(): - result_path = auto - - if result_path is not None and result_path.exists(): - result = json.loads(result_path.read_text()) - tuned_params = result.get("best_params", {}) - tuned_cost = result.get("best_cost") - log.info("simulating_tuned_params", cost=tuned_cost) - - if window_duration > 0: - sim_tuned = windowed_rollout( - asset_path=asset_path, - params=tuned_params, - recording=recording, - window_duration=window_duration, - sim_dt=sim_dt, - substeps=substeps, - ) - else: - sim_tuned = rollout( - asset_path=asset_path, - params=tuned_params, - actions=actions, - sim_dt=sim_dt, - substeps=substeps, - ) - - # ── Plot ───────────────────────────────────────────────────── - fig, axes = plt.subplots(3, 1, figsize=(14, 8), sharex=True) - - # Motor angle. - ax = axes[0] - ax.plot(t, np.degrees(recording["motor_angle"]), "k-", lw=1.2, alpha=0.8, label="Real") - ax.plot(t, np.degrees(sim_default["motor_angle"]), "--", color="#d62728", lw=1.0, alpha=0.7, label="Sim (default)") - if sim_tuned is not None: - ax.plot(t, np.degrees(sim_tuned["motor_angle"]), "--", color="#2ca02c", lw=1.0, alpha=0.7, label="Sim (tuned)") - ax.set_ylabel("Motor Angle (°)") - ax.legend(loc="upper right", fontsize=8) - ax.grid(True, alpha=0.3) - - # Motor velocity. - ax = axes[1] - ax.plot(t, recording["motor_vel"], "k-", lw=1.2, alpha=0.8, label="Real") - ax.plot(t, sim_default["motor_vel"], "--", color="#d62728", lw=1.0, alpha=0.7, label="Sim (default)") - if sim_tuned is not None: - ax.plot(t, sim_tuned["motor_vel"], "--", color="#2ca02c", lw=1.0, alpha=0.7, label="Sim (tuned)") - ax.set_ylabel("Motor Velocity (rad/s)") - ax.legend(loc="upper right", fontsize=8) - ax.grid(True, alpha=0.3) - - # Action. - ax = axes[2] - ax.plot(t, actions, "b-", lw=0.8, alpha=0.6) - ax.set_ylabel("Action (norm)") - ax.set_xlabel("Time (s)") - ax.grid(True, alpha=0.3) - ax.set_ylim(-1.1, 1.1) - - # Title. - title = "Motor System Identification — Real vs Simulated" - if tuned_cost is not None: - from src.sysid.motor.optimize import cost_function - - orig_cost = cost_function( - defaults_vector(MOTOR_PARAMS), - recording, - asset_path, - MOTOR_PARAMS, - sim_dt=sim_dt, - substeps=substeps, - window_duration=window_duration, - ) - title += f"\nDefault cost: {orig_cost:.4f} → Tuned cost: {tuned_cost:.4f}" - improvement = (1.0 - tuned_cost / orig_cost) * 100 if orig_cost > 0 else 0 - title += f" ({improvement:+.1f}%)" - - fig.suptitle(title, fontsize=12) - plt.tight_layout() - - if save_path: - fig.savefig(str(save_path), dpi=150, bbox_inches="tight") - log.info("figure_saved", path=str(save_path)) - - if show: - plt.show() - else: - plt.close(fig) - - -# ── CLI ────────────────────────────────────────────────────────────── - - -def main() -> None: - parser = argparse.ArgumentParser( - description="Visualise motor system identification results." - ) - parser.add_argument("--asset-path", type=str, default=_DEFAULT_ASSET) - parser.add_argument("--recording", type=str, required=True, help=".npz file") - parser.add_argument("--result", type=str, default=None, help="sysid result JSON") - parser.add_argument("--sim-dt", type=float, default=0.002) - parser.add_argument("--substeps", type=int, default=10) - parser.add_argument("--window-duration", type=float, default=0.5) - parser.add_argument("--save", type=str, default=None, help="Save figure path") - parser.add_argument("--no-show", action="store_true") - args = parser.parse_args() - - visualize( - asset_path=args.asset_path, - recording_path=args.recording, - result_path=args.result, - sim_dt=args.sim_dt, - substeps=args.substeps, - window_duration=args.window_duration, - save_path=args.save, - show=not args.no_show, - ) - - -if __name__ == "__main__": - main() diff --git a/src/sysid/optimize.py b/src/sysid/optimize.py index 4a4f931..36d15e6 100644 --- a/src/sysid/optimize.py +++ b/src/sysid/optimize.py @@ -4,21 +4,14 @@ Minimises the trajectory-matching cost between a MuJoCo rollout and a recorded real-robot sequence. Uses the ``cmaes`` package (pure-Python CMA-ES with native box-constraint support). -Motor parameters are **locked** from the motor-only sysid — only -pendulum/arm inertial parameters, joint dynamics, and ctrl_limit are -optimised. Velocities are optionally preprocessed with Savitzky-Golay -differentiation for cleaner targets. +All 28 parameters (motor + pendulum/arm) are optimised jointly from a +single full-system recording. Velocities are optionally preprocessed +with Savitzky-Golay differentiation for cleaner targets. Usage: python -m src.sysid.optimize \ --robot-path assets/rotary_cartpole \ - --recording assets/rotary_cartpole/recordings/capture_20260314_000435.npz - - # Shorter run for testing: - python -m src.sysid.optimize \ - --robot-path assets/rotary_cartpole \ - --recording .npz \ - --max-generations 10 --population-size 8 + --recording assets/rotary_cartpole/recordings/capture_....npz """ from __future__ import annotations @@ -33,13 +26,17 @@ import numpy as np import structlog from src.sysid.rollout import ( - LOCKED_MOTOR_PARAMS, PARAM_SETS, ROTARY_CARTPOLE_PARAMS, ParamSpec, + _make_actuator, + _resolve_params, bounds_arrays, + build_base_model, + defaults_dict, defaults_vector, params_to_dict, + patch_model, rollout, windowed_rollout, ) @@ -48,62 +45,8 @@ log = structlog.get_logger() # ── Velocity preprocessing ─────────────────────────────────────────── - - -def _preprocess_recording( - recording: dict[str, np.ndarray], - preprocess_vel: bool = True, - sg_window: int = 7, - sg_polyorder: int = 3, -) -> dict[str, np.ndarray]: - """Optionally recompute velocities using Savitzky-Golay differentiation. - - Applies SG filtering to both motor_vel and pendulum_vel, replacing - the noisy firmware finite-difference velocities with smooth - analytical derivatives of the (clean) angle signals. - """ - if not preprocess_vel: - return recording - - from scipy.signal import savgol_filter - - rec = dict(recording) - times = rec["time"] - dt = float(np.mean(np.diff(times))) - - # Motor velocity. - rec["motor_vel_raw"] = rec["motor_vel"].copy() - rec["motor_vel"] = savgol_filter( - rec["motor_angle"], - window_length=sg_window, - polyorder=sg_polyorder, - deriv=1, - delta=dt, - ) - - # Pendulum velocity. - rec["pendulum_vel_raw"] = rec["pendulum_vel"].copy() - rec["pendulum_vel"] = savgol_filter( - rec["pendulum_angle"], - window_length=sg_window, - polyorder=sg_polyorder, - deriv=1, - delta=dt, - ) - - motor_noise = np.std(rec["motor_vel_raw"] - rec["motor_vel"]) - pend_noise = np.std(rec["pendulum_vel_raw"] - rec["pendulum_vel"]) - log.info( - "velocity_preprocessed", - method="savgol", - sg_window=sg_window, - sg_polyorder=sg_polyorder, - dt_ms=f"{dt*1000:.1f}", - motor_noise_std=f"{motor_noise:.3f} rad/s", - pend_noise_std=f"{pend_noise:.3f} rad/s", - ) - - return rec +# Shared implementation in src.sysid.preprocess. +from src.sysid.preprocess import preprocess_recording as _preprocess_recording # ── Cost function ──────────────────────────────────────────────────── @@ -172,7 +115,6 @@ def cost_function( vel_weight: float = 0.1, pendulum_scale: float = 3.0, window_duration: float = 0.5, - motor_params: dict[str, float] | None = None, ) -> float: """Compute trajectory-matching cost for a candidate parameter vector. @@ -198,7 +140,6 @@ def cost_function( window_duration=window_duration, sim_dt=sim_dt, substeps=substeps, - motor_params=motor_params, ) else: sim = rollout( @@ -207,7 +148,6 @@ def cost_function( actions=recording["action"], sim_dt=sim_dt, substeps=substeps, - motor_params=motor_params, ) except Exception as exc: log.warning("rollout_failed", error=str(exc)) @@ -221,6 +161,122 @@ def cost_function( return _compute_trajectory_cost(sim, recording, pos_weight, vel_weight, pendulum_scale) +# ── Fast cost (model-cached) ──────────────────────────────────────── + + +def _fast_cost( + params_vec: np.ndarray, + recording: dict[str, np.ndarray], + model, # mujoco.MjModel (cached, mutated in-place) + body_ids: dict[str, int], + dof_ids: dict[str, int], + specs: list[ParamSpec], + sim_dt: float = 0.002, + substeps: int = 10, + pos_weight: float = 1.0, + vel_weight: float = 0.1, + pendulum_scale: float = 3.0, + window_duration: float = 0.5, +) -> float: + """Like cost_function but patches a cached model instead of rebuilding.""" + import mujoco as _mj + from src.runners.mujoco import ActuatorLimits + + params = params_to_dict(params_vec, specs) + if not _check_inertia_valid(params): + return 1e6 + + # Patch the cached model in-place — no XML, no temp files. + patch_model(model, params, body_ids, dof_ids) + + # Rebuild actuator from params (motor params change per candidate). + actuator = _make_actuator(params) + + try: + data = _mj.MjData(model) + limits = ActuatorLimits(model) + ctrl_limit = params.get("ctrl_limit", 0.588) + + times = recording["time"] + actions = recording["action"] + n = len(actions) + + sim_motor_angle = np.zeros(n, dtype=np.float64) + sim_motor_vel = np.zeros(n, dtype=np.float64) + sim_pend_angle = np.zeros(n, dtype=np.float64) + sim_pend_vel = np.zeros(n, dtype=np.float64) + + if window_duration > 0: + # Windowed (multiple shooting). + real_motor = recording["motor_angle"] + real_motor_vel = recording["motor_vel"] + real_pend = recording["pendulum_angle"] + real_pend_vel = recording["pendulum_vel"] + + t0, t_end = times[0], times[-1] + window_starts: list[int] = [] + current_t = t0 + while current_t < t_end: + idx = int(np.searchsorted(times, current_t)) + idx = min(idx, n - 1) + window_starts.append(idx) + current_t += window_duration + n_windows = len(window_starts) + + for w, w_start in enumerate(window_starts): + w_end = window_starts[w + 1] if w + 1 < n_windows else n + _mj.mj_resetData(model, data) + data.qpos[0] = real_motor[w_start] + data.qpos[1] = real_pend[w_start] + data.qvel[0] = real_motor_vel[w_start] + data.qvel[1] = real_pend_vel[w_start] + data.ctrl[:] = 0.0 + _mj.mj_forward(model, data) + + for i in range(w_start, w_end): + action = max(-ctrl_limit, min(ctrl_limit, float(actions[i]))) + ctrl = actuator.transform_ctrl(action) + data.ctrl[0] = ctrl + for _ in range(substeps): + limits.enforce(model, data) + data.qfrc_applied[0] = actuator.compute_motor_force(data.qvel[0], ctrl) + _mj.mj_step(model, data) + sim_motor_angle[i] = data.qpos[0] + sim_pend_angle[i] = data.qpos[1] + sim_motor_vel[i] = data.qvel[0] + sim_pend_vel[i] = data.qvel[1] + else: + # Open-loop single-shot. + _mj.mj_resetData(model, data) + for i in range(n): + action = max(-ctrl_limit, min(ctrl_limit, float(actions[i]))) + ctrl = actuator.transform_ctrl(action) + data.ctrl[0] = ctrl + for _ in range(substeps): + limits.enforce(model, data) + data.qfrc_applied[0] = actuator.compute_motor_force(data.qvel[0], ctrl) + _mj.mj_step(model, data) + sim_motor_angle[i] = data.qpos[0] + sim_pend_angle[i] = data.qpos[1] + sim_motor_vel[i] = data.qvel[0] + sim_pend_vel[i] = data.qvel[1] + + sim = { + "motor_angle": sim_motor_angle, + "motor_vel": sim_motor_vel, + "pendulum_angle": sim_pend_angle, + "pendulum_vel": sim_pend_vel, + } + except Exception: + return 1e6 + + for key in sim: + if np.any(~np.isfinite(sim[key])): + return 1e6 + + return _compute_trajectory_cost(sim, recording, pos_weight, vel_weight, pendulum_scale) + + # ── Parallel evaluation helper (module-level for pickling) ─────────── # Shared state set by the parent process before spawning workers. @@ -229,22 +285,34 @@ _par_robot_path: Path = Path(".") _par_specs: list[ParamSpec] = [] _par_kwargs: dict = {} +# Cached model (built once per worker, patched per candidate). +_par_model: object = None # mujoco.MjModel +_par_body_ids: dict[str, int] = {} +_par_dof_ids: dict[str, int] = {} + def _init_worker(recording, robot_path, specs, kwargs): """Initialiser run once per worker process.""" global _par_recording, _par_robot_path, _par_specs, _par_kwargs + global _par_model, _par_body_ids, _par_dof_ids _par_recording = recording _par_robot_path = robot_path _par_specs = specs _par_kwargs = kwargs + # Build base model once — reused for all candidates in this worker. + _par_model, _, _par_body_ids, _par_dof_ids = build_base_model(robot_path) + _par_model.opt.timestep = kwargs.get("sim_dt", 0.002) + def _eval_candidate(x_natural: np.ndarray) -> float: """Evaluate a single candidate — called in worker processes.""" - return cost_function( + return _fast_cost( x_natural, _par_recording, - _par_robot_path, + _par_model, + _par_body_ids, + _par_dof_ids, _par_specs, **_par_kwargs, ) @@ -268,13 +336,11 @@ def optimize( window_duration: float = 0.5, seed: int = 42, preprocess_vel: bool = True, - sg_window: int = 7, - sg_polyorder: int = 3, ) -> dict: """Run CMA-ES optimisation and return results. - Motor parameters are locked from the motor-only sysid. - Only pendulum/arm parameters are optimised. + All parameters (motor + pendulum/arm) are optimised jointly from a + single full-system recording. Returns a dict with: best_params: dict[str, float] @@ -282,7 +348,6 @@ def optimize( history: list of (generation, best_cost) tuples recording: str (path used) specs: list of param names - motor_params: dict of locked motor params """ from cmaes import CMA @@ -292,23 +357,11 @@ def optimize( if specs is None: specs = ROTARY_CARTPOLE_PARAMS - motor_params = dict(LOCKED_MOTOR_PARAMS) - log.info( - "motor_params_locked", - n_params=len(motor_params), - gear_avg=f"{(motor_params['actuator_gear_pos'] + motor_params['actuator_gear_neg']) / 2:.4f}", - ) - # Load recording. recording = dict(np.load(recording_path)) # Preprocessing: SG velocity recomputation. - recording = _preprocess_recording( - recording, - preprocess_vel=preprocess_vel, - sg_window=sg_window, - sg_polyorder=sg_polyorder, - ) + recording = _preprocess_recording(recording, preprocess_vel=preprocess_vel) n_samples = len(recording["time"]) duration = recording["time"][-1] - recording["time"][0] @@ -376,7 +429,6 @@ def optimize( vel_weight=vel_weight, pendulum_scale=pendulum_scale, window_duration=window_duration, - motor_params=motor_params, ) log.info("parallel_workers", n_workers=n_workers) @@ -428,6 +480,19 @@ def optimize( gen_best=f"{min(c for _, c in solutions):.6f}", ) + # Early stopping: no improvement in last 50 generations. + patience = 50 + if len(history) > patience: + old_cost = history[-patience][1] + if old_cost - best_cost < old_cost * 0.001: + log.info( + "early_stopping", + gen=gen, + best_cost=f"{best_cost:.6f}", + stall_gens=patience, + ) + break + total_time = time.monotonic() - t0 # Clean up process pool. @@ -441,7 +506,8 @@ def optimize( "cmaes_finished", best_cost=f"{best_cost:.6f}", total_time=f"{total_time:.1f}s", - evaluations=max_generations * population_size, + generations=len(history), + evaluations=len(history) * population_size, ) # Log parameter comparison. @@ -465,7 +531,6 @@ def optimize( "recording": str(recording_path), "param_names": [s.name for s in specs], "defaults": {s.name: s.default for s in specs}, - "motor_params": motor_params, "preprocess_vel": preprocess_vel, "timestamp": datetime.now().isoformat(), } @@ -516,16 +581,12 @@ def main() -> None: action="store_true", help="Skip Savitzky-Golay velocity preprocessing", ) - parser.add_argument("--sg-window", type=int, default=7, - help="Savitzky-Golay window length (odd, default 7)") - parser.add_argument("--sg-polyorder", type=int, default=3, - help="Savitzky-Golay polynomial order (default 3)") parser.add_argument( "--param-set", type=str, default="full", choices=list(PARAM_SETS.keys()), - help="Parameter set to optimize: 'reduced' (6 params, fast) or 'full' (15 params)", + help="Parameter set to optimize: 'full' (28), 'motor' (13), or 'pendulum' (15)", ) args = parser.parse_args() @@ -546,8 +607,6 @@ def main() -> None: window_duration=args.window_duration, seed=args.seed, preprocess_vel=not args.no_preprocess_vel, - sg_window=args.sg_window, - sg_polyorder=args.sg_polyorder, ) # Save results JSON. @@ -572,7 +631,6 @@ def main() -> None: export_tuned_files( robot_path=args.robot_path, params=result["best_params"], - motor_params=result.get("motor_params"), ) diff --git a/src/sysid/preprocess.py b/src/sysid/preprocess.py new file mode 100644 index 0000000..c61e912 --- /dev/null +++ b/src/sysid/preprocess.py @@ -0,0 +1,158 @@ +"""Recording preprocessing — clean velocity estimation from angle data. + +The ESP32 firmware computes velocity as a raw finite-difference of encoder +counts at 50 Hz. With a 1320 CPR encoder that gives ~0.24 rad/s of +quantisation noise per count. This module replaces the noisy firmware +velocity with a smooth differentiation of the (much cleaner) angle signal. + +Pipeline: + 1. Dequantize angle signal (Gaussian smooth to round off staircase edges) + 2. Savitzky-Golay differentiation to compute velocity (zero phase lag) + +This is standard practice in robotics sysid — see e.g. MATLAB System ID +Toolbox, Drake's trajectory processing, or ETH's ANYmal sysid pipeline. +""" + +from __future__ import annotations + +import numpy as np +from scipy.ndimage import gaussian_filter1d +from scipy.signal import savgol_filter + +import structlog + +log = structlog.get_logger() + +# Default encoder CPR for angle dequantization. +_DEFAULT_CPR = 1320 + + +def dequantize_angle( + angles: np.ndarray, + cpr: int = _DEFAULT_CPR, + sigma: float = 0.6, +) -> np.ndarray: + """Smooth out encoder quantization staircase before differentiation. + + A 1320 CPR encoder quantizes the angle signal into 0.00476 rad steps. + When differentiated, these steps produce velocity spikes (Dirac-like). + A tiny Gaussian blur (sigma=0.6 samples ~ 12ms at 50 Hz) rounds the + staircase edges without affecting dynamics (<2 Hz bandwidth loss). + """ + if sigma <= 0 or cpr <= 0: + return angles + return gaussian_filter1d(angles, sigma=sigma) + + +def recompute_velocity( + recording: dict[str, np.ndarray], + window_length: int = 11, + polyorder: int = 3, + deriv: int = 1, + dequantize: bool = True, + dequant_sigma: float = 0.6, + keys: tuple[str, str] = ("motor_angle", "motor_vel"), +) -> dict[str, np.ndarray]: + """Recompute velocity from angle using Savitzky-Golay differentiation. + + Parameters + ---------- + recording : dict with at least 'time' and the angle/vel keys. + window_length : SG filter window (must be odd, > polyorder). + polyorder : Polynomial order for the SG filter (3 = cubic). + deriv : Derivative order (1 = first derivative = velocity). + dequantize : Whether to smooth encoder quantization staircase. + dequant_sigma : Gaussian sigma for dequantization (samples). + keys : (angle_key, vel_key) to process. + + Returns + ------- + New recording dict with vel replaced and vel_raw added. + """ + angle_key, vel_key = keys + rec = dict(recording) + + times = rec["time"] + angles = rec[angle_key].copy() + dt = np.mean(np.diff(times)) + + rec[f"{vel_key}_raw"] = rec[vel_key].copy() + + if dequantize: + angles = dequantize_angle(angles, sigma=dequant_sigma) + + vel_sg = savgol_filter( + angles, + window_length=window_length, + polyorder=polyorder, + deriv=deriv, + delta=dt, + ) + + raw_vel = rec[f"{vel_key}_raw"] + noise_estimate = np.std(raw_vel - vel_sg) + max_diff = np.max(np.abs(raw_vel - vel_sg)) + + log.info( + "velocity_recomputed", + channel=vel_key, + method="savgol", + window=window_length, + polyorder=polyorder, + dt=f"{dt*1000:.1f}ms", + noise_std=f"{noise_estimate:.3f} rad/s", + max_diff=f"{max_diff:.3f} rad/s", + ) + + rec[vel_key] = vel_sg + return rec + + +def preprocess_recording( + recording: dict[str, np.ndarray], + preprocess_vel: bool = True, + sg_window: int = 11, + sg_polyorder: int = 3, +) -> dict[str, np.ndarray]: + """Preprocess a full-system recording (motor + pendulum velocities). + + Applies SG differentiation to both motor and pendulum channels. + """ + if not preprocess_vel: + return recording + + rec = recompute_velocity( + recording, + window_length=sg_window, + polyorder=sg_polyorder, + keys=("motor_angle", "motor_vel"), + ) + if "pendulum_angle" in rec and "pendulum_vel" in rec: + rec = recompute_velocity( + rec, + window_length=sg_window, + polyorder=sg_polyorder, + keys=("pendulum_angle", "pendulum_vel"), + ) + return rec + + +def match_filter_velocity( + sim_vel: np.ndarray, + real_vel: np.ndarray, + dt: float, + cutoff_hz: float = 8.0, +) -> tuple[np.ndarray, np.ndarray]: + """Apply identical lowpass to both sim and real velocity for fair comparison. + + MuJoCo sim velocity has full physics bandwidth while real velocity is + band-limited by the SG filter. Matching ensures fair cost comparison. + """ + from scipy.signal import butter, filtfilt + + fs = 1.0 / dt + nyq = fs / 2.0 + norm_cutoff = min(cutoff_hz / nyq, 0.99) + + b, a = butter(2, norm_cutoff, btype="low") + return filtfilt(b, a, sim_vel), filtfilt(b, a, real_vel) diff --git a/src/sysid/rollout.py b/src/sysid/rollout.py index 7ac2e96..cf386ad 100644 --- a/src/sysid/rollout.py +++ b/src/sysid/rollout.py @@ -7,12 +7,11 @@ the simulated trajectory for comparison with the real recording. This module is the inner loop of the CMA-ES optimizer: it is called once per candidate parameter vector per generation. -Motor parameters are **locked** from the motor-only sysid result. -The optimizer only fits -pendulum/arm inertial parameters, pendulum joint dynamics, and -``ctrl_limit``. The asymmetric motor model (deadzone, gear compensation, -Coulomb friction, viscous damping, quadratic drag, back-EMF) is applied -via ``ActuatorConfig.transform_ctrl()`` and ``compute_motor_force()``. +All motor + pendulum/arm parameters are optimised jointly from a single +full-system recording. The asymmetric motor model (deadzone, gear +compensation, Coulomb friction + Stribeck, viscous damping, action bias) +is applied via ``ActuatorConfig.transform_ctrl()`` and +``compute_motor_force()``. """ from __future__ import annotations @@ -32,26 +31,6 @@ from src.runners.mujoco import ActuatorLimits, load_mujoco_model from src.sysid._urdf import patch_link_inertials -# ── Locked motor parameters (from motor-only sysid) ───────────────── -# These are FIXED and not optimised. They come from the 12-param model -# in robot.yaml (from motor-only sysid, cost 0.862). - -LOCKED_MOTOR_PARAMS: dict[str, float] = { - "actuator_gear_pos": 0.424182, - "actuator_gear_neg": 0.425031, - "actuator_filter_tau": 0.00503506, - "motor_damping_pos": 0.00202682, - "motor_damping_neg": 0.0146651, - "motor_armature": 0.00277342, - "motor_frictionloss_pos": 0.0573282, - "motor_frictionloss_neg": 0.0533549, - "viscous_quadratic": 0.000285329, - "back_emf_gain": 0.00675809, - "motor_deadzone_pos": 0.141291, - "motor_deadzone_neg": 0.0780148, -} - - # ── Tunable parameter specification ────────────────────────────────── @@ -66,16 +45,31 @@ class ParamSpec: log_scale: bool = False # optimise in log-space (masses, inertias) -# Pendulum sysid parameters — motor params are LOCKED (not here). -# Order matters: the optimizer maps a flat vector to these specs. -# Defaults are from the URDF exported by Fusion 360. -ROTARY_CARTPOLE_PARAMS: list[ParamSpec] = [ - # ── Arm link (URDF) ────────────────────────────────────────── +# ── Motor parameters (13) ──────────────────────────────────────────── +# Defaults from motor-only sysid (cost 0.2117). +MOTOR_PARAMS: list[ParamSpec] = [ + ParamSpec("actuator_gear_pos", 0.371194, 0.005, 1.5, log_scale=True), + ParamSpec("actuator_gear_neg", 0.428143, 0.005, 1.5, log_scale=True), + ParamSpec("actuator_filter_tau", 0.022301, 0.001, 0.20), + ParamSpec("motor_damping_pos", 0.001384, 1e-5, 0.1, log_scale=True), + ParamSpec("motor_damping_neg", 0.005196, 1e-5, 0.1, log_scale=True), + ParamSpec("motor_armature", 0.002753, 1e-6, 0.01, log_scale=True), + ParamSpec("motor_frictionloss_pos", 0.036744, 0.001, 0.2, log_scale=True), + ParamSpec("motor_frictionloss_neg", 0.069082, 0.001, 0.2, log_scale=True), + ParamSpec("stribeck_friction_boost", 0.0, 0.0, 0.15), + ParamSpec("stribeck_vel", 2.0, 0.1, 8.0), + ParamSpec("motor_deadzone_pos", 0.14182, 0.0, 0.30), + ParamSpec("motor_deadzone_neg", 0.031454, 0.0, 0.30), + ParamSpec("action_bias", 0.0, -0.10, 0.10), +] + +# ── Pendulum/arm parameters (15) ───────────────────────────────────── +# Defaults from Fusion 360 URDF export. +PENDULUM_PARAMS: list[ParamSpec] = [ ParamSpec("arm_mass", 0.02110, 0.005, 0.08, log_scale=True), ParamSpec("arm_com_x", -0.00710, -0.03, 0.03), ParamSpec("arm_com_y", 0.00085, -0.02, 0.02), ParamSpec("arm_com_z", 0.00795, -0.02, 0.02), - # ── Pendulum link (URDF) ───────────────────────────────────── ParamSpec("pendulum_mass", 0.03937, 0.010, 0.10, log_scale=True), ParamSpec("pendulum_com_x", 0.06025, 0.01, 0.15), ParamSpec("pendulum_com_y", -0.07602, -0.20, 0.0), @@ -84,37 +78,18 @@ ROTARY_CARTPOLE_PARAMS: list[ParamSpec] = [ ParamSpec("pendulum_iyy", 3.70e-05, 1e-07, 1e-03, log_scale=True), ParamSpec("pendulum_izz", 7.83e-05, 1e-07, 1e-03, log_scale=True), ParamSpec("pendulum_ixy", -6.93e-06, -1e-03, 1e-03), - # ── Pendulum joint dynamics ────────────────────────────────── - ParamSpec("pendulum_damping", 0.0001, 1e-6, 0.05, log_scale=True), - ParamSpec("pendulum_frictionloss", 0.0001, 1e-6, 0.05, log_scale=True), - # ── Hardware realism (control pipeline) ──────────────────── - ParamSpec("ctrl_limit", 0.588, 0.45, 0.70), # MAX_MOTOR_SPEED / 255 -] - - -# Extended set: full params + motor armature (compensates for the -# motor-only sysid having captured arm/pendulum loading in armature). -EXTENDED_PARAMS: list[ParamSpec] = ROTARY_CARTPOLE_PARAMS + [ - ParamSpec("motor_armature", 0.00277, 0.0005, 0.02, log_scale=True), -] - - -# Reduced set: only the 6 most impactful pendulum parameters. -# Good for a fast first pass — converges in ~50 generations. -REDUCED_PARAMS: list[ParamSpec] = [ - ParamSpec("pendulum_mass", 0.03937, 0.010, 0.10, log_scale=True), - ParamSpec("pendulum_com_x", 0.06025, 0.01, 0.15), - ParamSpec("pendulum_com_y", -0.07602, -0.20, 0.0), - ParamSpec("pendulum_ixx", 6.20e-05, 1e-07, 1e-03, log_scale=True), ParamSpec("pendulum_damping", 0.0001, 1e-6, 0.05, log_scale=True), ParamSpec("pendulum_frictionloss", 0.0001, 1e-6, 0.05, log_scale=True), + ParamSpec("ctrl_limit", 0.588, 0.45, 0.70), ] +# ── Combined parameter sets ────────────────────────────────────────── +ROTARY_CARTPOLE_PARAMS: list[ParamSpec] = MOTOR_PARAMS + PENDULUM_PARAMS PARAM_SETS: dict[str, list[ParamSpec]] = { "full": ROTARY_CARTPOLE_PARAMS, - "extended": EXTENDED_PARAMS, - "reduced": REDUCED_PARAMS, + "motor": MOTOR_PARAMS, + "pendulum": PENDULUM_PARAMS, } @@ -134,6 +109,13 @@ def defaults_vector(specs: list[ParamSpec] | None = None) -> np.ndarray: return np.array([s.default for s in specs], dtype=np.float64) +def defaults_dict(specs: list[ParamSpec] | None = None) -> dict[str, float]: + """Return the default parameter dict (all 28 params).""" + if specs is None: + specs = ROTARY_CARTPOLE_PARAMS + return {s.name: s.default for s in specs} + + def bounds_arrays( specs: list[ParamSpec] | None = None, ) -> tuple[np.ndarray, np.ndarray]: @@ -148,19 +130,23 @@ def bounds_arrays( # ── MuJoCo model building with parameter overrides ────────────────── +def _resolve_params(params: dict[str, float]) -> dict[str, float]: + """Fill in defaults for any missing params.""" + full = defaults_dict() + full.update(params) + return full + + def _build_model( robot_path: Path, params: dict[str, float], - motor_params: dict[str, float] | None = None, ) -> tuple[mujoco.MjModel, ActuatorConfig]: """Build a MuJoCo model with sysid overrides. Returns (model, actuator) — use ``actuator.transform_ctrl()`` and ``actuator.compute_motor_force()`` in the rollout loop. """ - if motor_params is None: - motor_params = LOCKED_MOTOR_PARAMS - + p = _resolve_params(params) robot_path = Path(robot_path).resolve() # ── Patch URDF inertial parameters to a temp file ──────────── @@ -168,7 +154,7 @@ def _build_model( urdf_path = robot_path / robot_yaml["urdf"] tree = ET.parse(urdf_path) - patch_link_inertials(tree.getroot(), params) + patch_link_inertials(tree.getroot(), p) fd, tmp_urdf = tempfile.mkstemp( suffix=".urdf", prefix="_sysid_", dir=str(robot_path), @@ -177,39 +163,22 @@ def _build_model( tmp_urdf_path = Path(tmp_urdf) tree.write(str(tmp_urdf_path), xml_declaration=True, encoding="unicode") - # ── Build RobotConfig with full motor sysid values ─────────── - gear_pos = motor_params.get("actuator_gear_pos", 0.424182) - gear_neg = motor_params.get("actuator_gear_neg", 0.425031) - motor_armature = params.get( - "motor_armature", - motor_params.get("motor_armature", 0.00277342), - ) - pend_damping = params.get("pendulum_damping", 0.0001) - pend_frictionloss = params.get("pendulum_frictionloss", 0.0001) - + # ── Build RobotConfig with full motor model ────────────────── act_cfg = robot_yaml["actuators"][0] - ctrl_lo, ctrl_hi = act_cfg.get("ctrl_range", [-1.0, 1.0]) actuator = ActuatorConfig( joint=act_cfg["joint"], type="motor", - gear=(gear_pos, gear_neg), - ctrl_range=(ctrl_lo, ctrl_hi), - deadzone=( - motor_params.get("motor_deadzone_pos", 0.141), - motor_params.get("motor_deadzone_neg", 0.078), - ), - damping=( - motor_params.get("motor_damping_pos", 0.002), - motor_params.get("motor_damping_neg", 0.015), - ), - frictionloss=( - motor_params.get("motor_frictionloss_pos", 0.057), - motor_params.get("motor_frictionloss_neg", 0.053), - ), - filter_tau=motor_params.get("actuator_filter_tau", 0.005), - viscous_quadratic=motor_params.get("viscous_quadratic", 0.000285), - back_emf_gain=motor_params.get("back_emf_gain", 0.00676), + gear=(p["actuator_gear_pos"], p["actuator_gear_neg"]), + ctrl_range=(act_cfg.get("ctrl_range", [-1.0, 1.0])[0], + act_cfg.get("ctrl_range", [-1.0, 1.0])[1]), + deadzone=(p["motor_deadzone_pos"], p["motor_deadzone_neg"]), + damping=(p["motor_damping_pos"], p["motor_damping_neg"]), + frictionloss=(p["motor_frictionloss_pos"], p["motor_frictionloss_neg"]), + stribeck_friction_boost=p["stribeck_friction_boost"], + stribeck_vel=p["stribeck_vel"], + action_bias=p["action_bias"], + filter_tau=p["actuator_filter_tau"], ) robot = RobotConfig( @@ -218,12 +187,12 @@ def _build_model( joints={ "motor_joint": JointConfig( damping=0.0, - armature=motor_armature, + armature=p["motor_armature"], frictionloss=0.0, ), "pendulum_joint": JointConfig( - damping=pend_damping, - frictionloss=pend_frictionloss, + damping=p["pendulum_damping"], + frictionloss=p["pendulum_frictionloss"], ), }, ) @@ -236,7 +205,136 @@ def _build_model( return model, actuator +def build_base_model( + robot_path: str | Path, + params: dict[str, float] | None = None, +) -> tuple[mujoco.MjModel, ActuatorConfig, dict[str, int], dict[str, int]]: + """Build a base MuJoCo model once (with default or given params). + Returns (model, actuator, body_ids, dof_ids) where body_ids/dof_ids + map names to indices for fast in-place patching. + """ + if params is None: + params = defaults_dict() + model, actuator = _build_model(Path(robot_path).resolve(), params) + + body_ids = {} + for name in ("arm", "pendulum"): + body_ids[name] = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_BODY, name) + + dof_ids = {} + for name in ("motor_joint", "pendulum_joint"): + jnt_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_JOINT, name) + dof_ids[name] = model.jnt_dofadr[jnt_id] + + return model, actuator, body_ids, dof_ids + + +def _rotmat_to_quat(R: np.ndarray) -> np.ndarray: + """Convert a 3×3 rotation matrix to a MuJoCo quaternion [w, x, y, z].""" + quat = np.empty(4) + mujoco.mju_mat2Quat(quat, R.flatten()) + return quat + + +def _patch_pendulum_inertia( + model: mujoco.MjModel, + body_id: int, + params: dict[str, float], +) -> None: + """Diagonalize pendulum inertia tensor and write to model.""" + defs = defaults_dict() + + ixx = params.get("pendulum_ixx", defs["pendulum_ixx"]) + iyy = params.get("pendulum_iyy", defs["pendulum_iyy"]) + izz = params.get("pendulum_izz", defs["pendulum_izz"]) + ixy = params.get("pendulum_ixy", defs.get("pendulum_ixy", 0.0)) + + # Build full 3×3 inertia tensor (ixz = iyz = 0 in our URDF). + I = np.array([ + [ixx, ixy, 0.0], + [ixy, iyy, 0.0], + [0.0, 0.0, izz], + ]) + + # Eigendecompose — eigenvalues are principal moments, eigenvectors + # define the rotation from link frame to principal frame. + eigenvalues, eigenvectors = np.linalg.eigh(I) + + # eigh returns ascending order; MuJoCo wants them in the order that + # the eigenvector matrix forms a proper rotation (det = +1). + if np.linalg.det(eigenvectors) < 0: + eigenvectors[:, 0] *= -1 + + # Clamp tiny/negative eigenvalues (numerical noise). + eigenvalues = np.maximum(eigenvalues, 1e-10) + + # Enforce triangle inequality (required for valid rigid body inertia). + # For principal moments sorted ascending (a <= b <= c): a + b >= c. + # When violated, MuJoCo falls back to spherical inertia (trace/3). + # We must match this to keep fast path consistent with URDF loading. + s = np.sort(eigenvalues) + if s[0] + s[1] < s[2]: + sphere = np.sum(eigenvalues) / 3.0 + eigenvalues[:] = sphere + eigenvectors = np.eye(3) + + model.body_inertia[body_id] = eigenvalues + model.body_iquat[body_id] = _rotmat_to_quat(eigenvectors) + + +def patch_model( + model: mujoco.MjModel, + params: dict[str, float], + body_ids: dict[str, int], + dof_ids: dict[str, int], +) -> None: + """Patch a cached MuJoCo model in-place with new sysid parameters. + + Much faster than _build_model: no XML parsing, no temp files, no + model reload. Just direct array writes. + """ + p = _resolve_params(params) + + # ── Arm body ───────────────────────────────────────────────── + arm_id = body_ids["arm"] + model.body_mass[arm_id] = p["arm_mass"] + for i, key in enumerate(("arm_com_x", "arm_com_y", "arm_com_z")): + model.body_ipos[arm_id, i] = p[key] + + # ── Pendulum body ──────────────────────────────────────────── + pend_id = body_ids["pendulum"] + model.body_mass[pend_id] = p["pendulum_mass"] + for i, key in enumerate(("pendulum_com_x", "pendulum_com_y", "pendulum_com_z")): + model.body_ipos[pend_id, i] = p[key] + + _patch_pendulum_inertia(model, pend_id, p) + + # ── Joint dynamics ─────────────────────────────────────────── + motor_dof = dof_ids["motor_joint"] + model.dof_armature[motor_dof] = p["motor_armature"] + + pend_dof = dof_ids["pendulum_joint"] + model.dof_damping[pend_dof] = p["pendulum_damping"] + model.dof_frictionloss[pend_dof] = p["pendulum_frictionloss"] + + +def _make_actuator(params: dict[str, float]) -> ActuatorConfig: + """Build an ActuatorConfig from a params dict (for fast-path use).""" + p = _resolve_params(params) + return ActuatorConfig( + joint="motor_joint", + type="motor", + gear=(p["actuator_gear_pos"], p["actuator_gear_neg"]), + ctrl_range=(-1.0, 1.0), + deadzone=(p["motor_deadzone_pos"], p["motor_deadzone_neg"]), + damping=(p["motor_damping_pos"], p["motor_damping_neg"]), + frictionloss=(p["motor_frictionloss_pos"], p["motor_frictionloss_neg"]), + stribeck_friction_boost=p["stribeck_friction_boost"], + stribeck_vel=p["stribeck_vel"], + action_bias=p["action_bias"], + filter_tau=p["actuator_filter_tau"], + ) # ── Simulation rollout ─────────────────────────────────────────────── @@ -248,35 +346,30 @@ def rollout( actions: np.ndarray, sim_dt: float = 0.002, substeps: int = 10, - motor_params: dict[str, float] | None = None, ) -> dict[str, np.ndarray]: """Replay recorded actions in MuJoCo with overridden parameters. Parameters ---------- robot_path : asset directory - params : named parameter overrides (pendulum/arm only) + params : named parameter overrides (all motor + pendulum params) actions : (N,) normalised actions [-1, 1] from the recording sim_dt : MuJoCo physics timestep substeps : physics substeps per control step - motor_params : locked motor params (default: LOCKED_MOTOR_PARAMS) Returns ------- dict with keys: motor_angle, motor_vel, pendulum_angle, pendulum_vel - Each is an (N,) numpy array of simulated values. """ - if motor_params is None: - motor_params = LOCKED_MOTOR_PARAMS - robot_path = Path(robot_path).resolve() - model, actuator = _build_model(robot_path, params, motor_params) + model, actuator = _build_model(robot_path, params) model.opt.timestep = sim_dt data = mujoco.MjData(model) mujoco.mj_resetData(model, data) n = len(actions) - ctrl_limit = params.get("ctrl_limit", 0.588) + p = _resolve_params(params) + ctrl_limit = p["ctrl_limit"] sim_motor_angle = np.zeros(n, dtype=np.float64) sim_motor_vel = np.zeros(n, dtype=np.float64) @@ -315,41 +408,27 @@ def windowed_rollout( window_duration: float = 0.5, sim_dt: float = 0.002, substeps: int = 10, - motor_params: dict[str, float] | None = None, ) -> dict[str, np.ndarray | float]: """Multiple-shooting rollout — split recording into short windows. - For each window: - 1. Initialize MuJoCo state from the real qpos/qvel at the window start. - 2. Replay the recorded actions within the window. - 3. Record the simulated output. - - Motor dynamics (asymmetric friction, damping, back-EMF, etc.) are - applied via qfrc_applied using the locked motor sysid parameters. - Parameters ---------- robot_path : asset directory - params : named parameter overrides (pendulum/arm only) + params : named parameter overrides (all motor + pendulum params) recording : dict with keys time, action, motor_angle, motor_vel, pendulum_angle, pendulum_vel (all 1D arrays of length N) window_duration : length of each shooting window in seconds sim_dt : MuJoCo physics timestep substeps : physics substeps per control step - motor_params : locked motor params (default: LOCKED_MOTOR_PARAMS) Returns ------- dict with: motor_angle, motor_vel, pendulum_angle, pendulum_vel — (N,) arrays - (stitched from per-window simulations) n_windows — number of windows used """ - if motor_params is None: - motor_params = LOCKED_MOTOR_PARAMS - robot_path = Path(robot_path).resolve() - model, actuator = _build_model(robot_path, params, motor_params) + model, actuator = _build_model(robot_path, params) model.opt.timestep = sim_dt data = mujoco.MjData(model) @@ -378,7 +457,8 @@ def windowed_rollout( window_starts.append(idx) current_t += window_duration - ctrl_limit = params.get("ctrl_limit", 0.588) + p = _resolve_params(params) + ctrl_limit = p["ctrl_limit"] n_windows = len(window_starts) for w, w_start in enumerate(window_starts): diff --git a/src/sysid/visualize.py b/src/sysid/visualize.py index 0aed788..30e8818 100644 --- a/src/sysid/visualize.py +++ b/src/sysid/visualize.py @@ -35,7 +35,6 @@ def _run_sim( window_duration: float, sim_dt: float, substeps: int, - motor_params: dict[str, float], ) -> dict[str, np.ndarray]: """Run windowed or open-loop rollout depending on window_duration.""" from src.sysid.rollout import rollout, windowed_rollout @@ -44,11 +43,11 @@ def _run_sim( return windowed_rollout( robot_path=robot_path, params=params, recording=recording, window_duration=window_duration, sim_dt=sim_dt, - substeps=substeps, motor_params=motor_params, + substeps=substeps, ) return rollout( robot_path=robot_path, params=params, actions=recording["action"], - substeps=substeps, motor_params=motor_params, + substeps=substeps, ) @@ -66,7 +65,6 @@ def visualize( import matplotlib.pyplot as plt from src.sysid.rollout import ( - LOCKED_MOTOR_PARAMS, ROTARY_CARTPOLE_PARAMS, defaults_vector, params_to_dict, @@ -75,12 +73,10 @@ def visualize( robot_path = Path(robot_path).resolve() recording = dict(np.load(recording_path)) - motor_params = LOCKED_MOTOR_PARAMS - sim_kwargs = dict( robot_path=robot_path, recording=recording, window_duration=window_duration, sim_dt=sim_dt, - substeps=substeps, motor_params=motor_params, + substeps=substeps, ) t = recording["time"] @@ -172,7 +168,6 @@ def visualize( sim_dt=sim_dt, substeps=substeps, window_duration=window_duration, - motor_params=motor_params, ) title += f"\nOriginal cost: {orig_cost:.4f} → Tuned cost: {tuned_cost:.4f}" improvement = (1.0 - tuned_cost / orig_cost) * 100 if orig_cost > 0 else 0 diff --git a/tests/test_sim2real.py b/tests/test_sim2real.py new file mode 100644 index 0000000..ff673e1 --- /dev/null +++ b/tests/test_sim2real.py @@ -0,0 +1,39 @@ +"""Unit tests for MuJoCoRunner domain randomization.""" + +import dataclasses + +import numpy as np +import pytest + +from src.runners.mujoco import DomainRandConfig, MuJoCoRunnerConfig + + +class TestDomainRandConfig: + def test_default_all_zero(self) -> None: + cfg = DomainRandConfig() + assert cfg.mass_frac == 0.0 + assert cfg.friction_frac == 0.0 + assert cfg.gear_frac == 0.0 + + def test_from_dict(self) -> None: + d = {"mass_frac": 0.15, "gear_frac": 0.1} + cfg = DomainRandConfig(**d) + assert cfg.mass_frac == 0.15 + assert cfg.gear_frac == 0.1 + assert cfg.damping_frac == 0.0 # not set + + +class TestMuJoCoRunnerConfig: + def test_default_dr_disabled(self) -> None: + cfg = MuJoCoRunnerConfig() + assert isinstance(cfg.domain_rand, DomainRandConfig) + assert cfg.domain_rand.mass_frac == 0.0 + + def test_domain_rand_from_dict(self) -> None: + """Hydra passes nested configs as dicts — test __post_init__ converts.""" + cfg = MuJoCoRunnerConfig( + domain_rand={"mass_frac": 0.2, "friction_frac": 0.3}, # type: ignore[arg-type] + ) + assert isinstance(cfg.domain_rand, DomainRandConfig) + assert cfg.domain_rand.mass_frac == 0.2 + assert cfg.domain_rand.friction_frac == 0.3