diff --git a/.gitignore b/.gitignore index 3fe1d68..be4beca 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ outputs/ runs/ smac3_output/ +training_log.txt # MuJoCo MUJOCO_LOG.TXT diff --git a/assets/rotary_cartpole/robot.yaml b/assets/rotary_cartpole/robot.yaml index 09ad859..9336fc5 100644 --- a/assets/rotary_cartpole/robot.yaml +++ b/assets/rotary_cartpole/robot.yaml @@ -1,23 +1,21 @@ -# Tuned robot config — generated by src.sysid (2026-03-28) -# Unified sysid: cost 0.925 (28 params, 0.2s windows, amplitude 200) +# Tuned robot config — generated by src.sysid.optimize -urdf: rotary_cartpole_tuned.urdf +urdf: rotary_cartpole.urdf actuators: - joint: motor_joint type: motor - gear: [0.846499, 1.183733] # torque constant [pos, neg] - ctrl_range: [-0.686251, 0.686251] # effective control bound - deadzone: [0.181097, 0.202072] # L298N min |ctrl| for torque [pos, neg] - damping: [0.013165, 0.015452] # viscous damping [pos, neg] - frictionloss: [0.014244, 0.001005] # Coulomb friction [pos, neg] - filter_tau: 0.096263 # 1st-order actuator filter - stribeck_friction_boost: 0.068594 # low-velocity friction boost - stribeck_vel: 5.279594 # Stribeck velocity scale (rad/s) - action_bias: 0.056566 # asymmetric bias offset + gear: [0.424182, 0.425031] # torque constant [pos, neg] (motor sysid) + ctrl_range: [-0.592, 0.592] # effective control bound (sysid-tuned) + deadzone: [0.141291, 0.078015] # L298N min |ctrl| for torque [pos, neg] + damping: [0.002027, 0.014665] # viscous damping [pos, neg] + frictionloss: [0.057328, 0.053355] # Coulomb friction [pos, neg] + filter_tau: 0.005035 # 1st-order actuator filter (motor sysid) + viscous_quadratic: 0.000285 # velocity² drag + back_emf_gain: 0.006758 # back-EMF torque reduction joints: motor_joint: - armature: 0.001676 # reflected rotor inertia + armature: 0.002773 # reflected rotor inertia (motor sysid) frictionloss: 0.0 # handled by motor model via qfrc_applied pendulum_joint: - damping: 1.2e-05 # pendulum pivot damping - frictionloss: 7.2e-05 # pendulum pivot friction + damping: 0.000119 + frictionloss: 1.0e-05 diff --git a/configs/env/rotary_cartpole.yaml b/configs/env/rotary_cartpole.yaml index 23bb616..925860e 100644 --- a/configs/env/rotary_cartpole.yaml +++ b/configs/env/rotary_cartpole.yaml @@ -1,12 +1,18 @@ max_steps: 1000 robot_path: assets/rotary_cartpole reward_upright_scale: 1.0 +alive_bonus: 0.25 # per-step survival bonus (living must beat dying) +balance_bonus: 2.0 # extra reward for upright AND still (beats spinning) +balance_vel_scale: 0.5 # how fast the balance bonus decays with pendulum speed # ── Regularisation penalties (prevent fast spinning) ───────────────── motor_vel_penalty: 0.01 # penalise high motor angular velocity motor_angle_penalty: 0.05 # penalise deviation from centre action_penalty: 0.05 # penalise large actions (energy cost) +# ── Initial state randomisation ────────────────────────────────────── +pendulum_init_range_deg: 180.0 # pendulum starts in [-180°, +180°] + # ── Software safety limit (env-level, always applied) ──────────────── motor_angle_limit_deg: 90.0 # terminate episode if motor exceeds ±90° @@ -16,4 +22,5 @@ hpo: motor_vel_penalty: {min: 0.001, max: 0.1} motor_angle_penalty: {min: 0.01, max: 0.2} action_penalty: {min: 0.01, max: 0.2} + pendulum_init_range_deg: {min: 30.0, max: 180.0} max_steps: {values: [500, 1000, 2000]} \ No newline at end of file diff --git a/configs/runner/mjx.yaml b/configs/runner/mjx.yaml index 4d6fc5a..f695009 100644 --- a/configs/runner/mjx.yaml +++ b/configs/runner/mjx.yaml @@ -3,3 +3,15 @@ device: auto # auto = cuda if available, else cpu dt: 0.002 substeps: 10 history_length: 10 # RMA-style: 10-step window of (obs, action) pairs + +rma_mode: "none" # "none" | "teacher" | "deploy" + +# ── Domain randomization (sim-to-real) ────────────────────────────── +# NOTE: action-delay and sensor-noise are applied for MJX, but the +# per-env dynamics *scales* (friction/damping/torque) are NOT yet wired +# into the JIT step — use runner=mujoco for scale randomization, or keep +# this block to delay+noise only on MJX. +domain_rand: + qpos_noise_std: 0.01 # rad — encoder angle noise + qvel_noise_std: 0.5 # rad/s — velocity-estimate noise (measured) + action_delay_steps: [0, 2] # control-step latency (0–40 ms) diff --git a/configs/runner/mujoco.yaml b/configs/runner/mujoco.yaml index 0cb1e5d..8502348 100644 --- a/configs/runner/mujoco.yaml +++ b/configs/runner/mujoco.yaml @@ -2,13 +2,17 @@ num_envs: 64 device: auto # auto = cuda if available, else cpu dt: 0.002 substeps: 10 -history_length: 10 # RMA-style: 10-step window of (obs, action) pairs +history_length: 10 # must match training.history_length (DR + embedding) -# ── Sim2real: domain randomization ─────────────────────────────── +rma_mode: "none" # "none" | "teacher" | "deploy" + +# ── Domain randomization (sim-to-real) ────────────────────────────── +# Noise/delay levels anchored to the real recordings (~50 Hz, ~0.5 rad/s +# velocity noise, ≤1-step latency). Set domain_rand: {} to disable. domain_rand: - mass_frac: 0.15 # ±15% body mass randomization - friction_frac: 0.3 # ±30% joint friction - damping_frac: 0.3 # ±30% joint damping - armature_frac: 0.2 # ±20% reflected rotor inertia - gear_frac: 0.15 # ±15% actuator gear ratio - com_offset: 0.005 # ±5mm center-of-mass shift + qpos_noise_std: 0.01 # rad — encoder angle noise + qvel_noise_std: 0.5 # rad/s — velocity-estimate noise (measured) + action_delay_steps: [0, 2] # control-step latency (0–40 ms) + friction_scale: [0.6, 1.6] # Coulomb-friction multiplier + damping_scale: [0.6, 1.6] # viscous-damping multiplier + torque_scale: [0.85, 1.15] # motor-constant / battery-voltage variation diff --git a/configs/runner/mujoco_single.yaml b/configs/runner/mujoco_single.yaml index d1b4e2c..75f7939 100644 --- a/configs/runner/mujoco_single.yaml +++ b/configs/runner/mujoco_single.yaml @@ -6,3 +6,12 @@ device: cpu dt: 0.002 substeps: 10 history_length: 10 + +rma_mode: "none" # "none" | "teacher" | "deploy" + +# Clean by default (deterministic eval). Confirming-experiment example — +# re-eval an existing checkpoint in sim with a fixed 1-step action delay: +# mjpython scripts/eval.py env=rotary_cartpole runner=mujoco_single \ +# checkpoint=runs/.../agent_XXXX.pt \ +# '++runner.domain_rand.action_delay_steps=[1,1]' +domain_rand: {} diff --git a/configs/runner/serial.yaml b/configs/runner/serial.yaml index 69be7de..bb9e8bb 100644 --- a/configs/runner/serial.yaml +++ b/configs/runner/serial.yaml @@ -9,3 +9,5 @@ baud: 115200 dt: 0.02 # control loop period (50 Hz, matches training) no_data_timeout: 2.0 # seconds of silence before declaring disconnect history_length: 10 # must match training runner + +rma_mode: "none" # "none" | "teacher" | "deploy" diff --git a/configs/training/ppo.yaml b/configs/training/ppo.yaml index 9d6a355..c6c192b 100644 --- a/configs/training/ppo.yaml +++ b/configs/training/ppo.yaml @@ -1,19 +1,19 @@ -hidden_sizes: [128, 128] +hidden_sizes: [256, 256] total_timesteps: 5000000 -rollout_steps: 1024 -learning_epochs: 4 -mini_batches: 4 +rollout_steps: 2048 +learning_epochs: 10 +mini_batches: 8 discount_factor: 0.99 gae_lambda: 0.95 learning_rate: 0.0003 clip_ratio: 0.2 value_loss_scale: 0.5 -entropy_loss_scale: 0.05 +entropy_loss_scale: 0.01 log_interval: 1000 checkpoint_interval: 50000 -initial_log_std: 0.5 -min_log_std: -2.0 +initial_log_std: -0.5 +min_log_std: -4.0 max_log_std: 2.0 record_video_every: 10000 @@ -22,6 +22,10 @@ record_video_every: 10000 history_length: 10 # temporal window (must match runner) embedding_dim: 32 # history encoder output dimension +# RMA (Rapid Motor Adaptation) +rma_mode: "none" # "none" | "teacher" | "deploy" +latent_dim: 8 # env encoder / adaptation latent dimension + # ClearML remote execution (GPU worker) remote: false diff --git a/scripts/eval.py b/scripts/eval.py index 2f01266..f1fd2c2 100644 --- a/scripts/eval.py +++ b/scripts/eval.py @@ -74,14 +74,33 @@ def _infer_hidden_sizes(state_dict: dict[str, torch.Tensor]) -> tuple[int, ...]: return tuple(sizes) +def _infer_encoder_out_dim(state_dict: dict[str, torch.Tensor]) -> int | None: + """Return the history/adaptation encoder output dim, if present. + + Lets eval reconstruct an embedding policy without knowing the training + embedding_dim/latent_dim — read it straight from the saved weights. + """ + for key in ("history_encoder.fc.weight", "adaptation_module.fc.weight"): + if key in state_dict: + return state_dict[key].shape[0] + return None + + def load_policy( checkpoint_path: str, observation_space: spaces.Space, action_space: spaces.Space, device: torch.device = torch.device("cpu"), + history_length: int = 0, + rma_mode: str = "none", + raw_obs_dim: int = 0, ) -> tuple[SharedMLP, RunningStandardScaler]: """Load a trained SharedMLP + observation normalizer from a checkpoint. + For DR + history-embedding policies (history_length > 0) or RMA deploy + policies (rma_mode="deploy"), the history/adaptation encoder must be + reconstructed too — its output dim is read back from the saved weights. + Returns: (model, state_preprocessor) ready for inference. """ @@ -89,13 +108,20 @@ def load_policy( # Infer architecture from saved weights. hidden_sizes = _infer_hidden_sizes(ckpt["policy"]) + enc_out = _infer_encoder_out_dim(ckpt["policy"]) - # Reconstruct model. + # Reconstruct model — pass through the encoder config so a DR+embedding + # checkpoint rebuilds the history encoder with matching dimensions. model = SharedMLP( observation_space=observation_space, action_space=action_space, device=device, hidden_sizes=hidden_sizes, + history_length=history_length, + rma_mode=rma_mode, + raw_obs_dim=raw_obs_dim, + embedding_dim=enc_out or 32, # legacy "none" + history + latent_dim=enc_out or 8, # RMA deploy adaptation module ) model.load_state_dict(ckpt["policy"]) model.eval() @@ -194,7 +220,10 @@ def _eval_sim(cfg: DictConfig, env_name: str, checkpoint_path: str) -> None: device = runner.device model, preprocessor = load_policy( - checkpoint_path, runner.observation_space, runner.action_space, device + checkpoint_path, runner.observation_space, runner.action_space, device, + history_length=runner.config.history_length, + rma_mode=runner.config.rma_mode, + raw_obs_dim=runner.env.observation_space.shape[0], ) mj_model = runner._model @@ -280,7 +309,10 @@ def _eval_serial(cfg: DictConfig, env_name: str, checkpoint_path: str) -> None: device = serial_runner.device model, preprocessor = load_policy( - checkpoint_path, serial_runner.observation_space, serial_runner.action_space, device + checkpoint_path, serial_runner.observation_space, serial_runner.action_space, device, + history_length=serial_runner.config.history_length, + rma_mode=serial_runner.config.rma_mode, + raw_obs_dim=serial_runner.env.observation_space.shape[0], ) # Set up digital-twin MuJoCo model for visualization. diff --git a/scripts/train_adaptation.py b/scripts/train_adaptation.py new file mode 100644 index 0000000..9c42861 --- /dev/null +++ b/scripts/train_adaptation.py @@ -0,0 +1,296 @@ +"""RMA Phase 2: Train the adaptation module φ(history) → ẑ. + +Loads a Phase 1 (teacher) checkpoint, freezes the backbone + env_encoder, +and trains a HistoryEncoder (adaptation module) to predict the teacher's +latent z from observation-action history using supervised MSE. + +Usage: + python scripts/train_adaptation.py \ + --checkpoint runs//checkpoints/agent_XXXXX.pt \ + --env rotary_cartpole \ + --robot-path assets/rotary_cartpole \ + --num-envs 64 \ + --iterations 2000 \ + --lr 3e-4 +""" + +import argparse +import pathlib +import sys + +_PROJECT_ROOT = str(pathlib.Path(__file__).resolve().parent.parent) +if _PROJECT_ROOT not in sys.path: + sys.path.insert(0, _PROJECT_ROOT) + +import structlog +import torch +import tqdm +from gymnasium import spaces +from omegaconf import OmegaConf + +from src.core.registry import build_env +from src.models.mlp import SharedMLP, EnvironmentEncoder, HistoryEncoder +from src.runners.mujoco import MuJoCoRunner, MuJoCoRunnerConfig + +log = structlog.get_logger() + + +def _load_teacher_checkpoint( + path: str, obs_space: spaces.Space, act_space: spaces.Space, + device: torch.device, raw_obs_dim: int, mu_dim: int, + hidden_sizes: tuple[int, ...], latent_dim: int, +) -> SharedMLP: + """Reconstruct the teacher SharedMLP and load saved weights.""" + model = SharedMLP( + observation_space=obs_space, + action_space=act_space, + device=device, + hidden_sizes=hidden_sizes, + rma_mode="teacher", + raw_obs_dim=raw_obs_dim, + mu_dim=mu_dim, + latent_dim=latent_dim, + ) + ckpt = torch.load(path, map_location=device, weights_only=True) + # skrl saves under "policy" key with state_dict. + if "policy" in ckpt: + model.load_state_dict(ckpt["policy"]) + else: + model.load_state_dict(ckpt) + return model + + +def _build_deploy_model( + teacher: SharedMLP, + obs_space: spaces.Space, + act_space: spaces.Space, + device: torch.device, + raw_obs_dim: int, + history_length: int, + hidden_sizes: tuple[int, ...], + latent_dim: int, +) -> SharedMLP: + """Create a deploy-mode SharedMLP and copy backbone + heads from teacher.""" + model = SharedMLP( + observation_space=obs_space, + action_space=act_space, + device=device, + hidden_sizes=hidden_sizes, + rma_mode="deploy", + raw_obs_dim=raw_obs_dim, + history_length=history_length, + latent_dim=latent_dim, + ) + + # Copy backbone, policy head, value head from teacher. + model.net.load_state_dict(teacher.net.state_dict()) + model.mean_layer.load_state_dict(teacher.mean_layer.state_dict()) + model.value_layer.load_state_dict(teacher.value_layer.state_dict()) + model.log_std_parameter.data.copy_(teacher.log_std_parameter.data) + + return model + + +def main() -> None: + parser = argparse.ArgumentParser(description="RMA Phase 2: train adaptation module") + parser.add_argument("--checkpoint", required=True, help="Path to Phase 1 teacher checkpoint") + parser.add_argument("--env", default="rotary_cartpole") + parser.add_argument("--robot-path", default="assets/rotary_cartpole") + parser.add_argument("--num-envs", type=int, default=64) + parser.add_argument("--iterations", type=int, default=2000) + parser.add_argument("--rollout-steps", type=int, default=256, help="Steps per rollout") + parser.add_argument("--lr", type=float, default=3e-4) + parser.add_argument("--latent-dim", type=int, default=8) + parser.add_argument("--hidden-sizes", type=int, nargs="+", default=[128, 128]) + parser.add_argument("--history-length", type=int, default=10) + parser.add_argument("--output", default="checkpoints/adaptation.pt") + parser.add_argument("--device", default="cpu") + args = parser.parse_args() + + device = torch.device(args.device) + hidden_sizes = tuple(args.hidden_sizes) + + # ── Build env + runner (deploy mode with history + DR) ─────── + env_cfg = OmegaConf.create({"env": { + "robot_path": args.robot_path, + }}) + env = build_env(args.env, env_cfg) + + runner_cfg = MuJoCoRunnerConfig( + num_envs=args.num_envs, + device=args.device, + history_length=args.history_length, + rma_mode="deploy", + domain_rand={ + "qpos_noise_std": 0.01, + "qvel_noise_std": 0.5, + "action_delay_steps": [0, 2], + "friction_scale": [0.6, 1.6], + "damping_scale": [0.6, 1.6], + "torque_scale": [0.85, 1.15], + }, + ) + runner = MuJoCoRunner(env=env, config=runner_cfg) + + raw_obs_dim = env.observation_space.shape[0] + act_dim = env.action_space.shape[0] + mu_dim = runner.privileged_dim + + log.info( + "adaptation_setup", + raw_obs_dim=raw_obs_dim, + act_dim=act_dim, + mu_dim=mu_dim, + latent_dim=args.latent_dim, + history_length=args.history_length, + ) + + # ── Load teacher & build deploy model ──────────────────────── + # Teacher obs space: [raw_obs, μ] + teacher_obs_space = spaces.Box( + low=-torch.inf, high=torch.inf, shape=(raw_obs_dim + mu_dim,), + ) + + teacher = _load_teacher_checkpoint( + path=args.checkpoint, + obs_space=teacher_obs_space, + act_space=env.action_space, + device=device, + raw_obs_dim=raw_obs_dim, + mu_dim=mu_dim, + hidden_sizes=hidden_sizes, + latent_dim=args.latent_dim, + ) + teacher.eval() + for p in teacher.parameters(): + p.requires_grad_(False) + + # Deploy obs space: [raw_obs, history_flat] + step_dim = raw_obs_dim + act_dim + deploy_obs_space = spaces.Box( + low=-torch.inf, high=torch.inf, + shape=(raw_obs_dim + args.history_length * step_dim,), + ) + + deploy_model = _build_deploy_model( + teacher=teacher, + obs_space=deploy_obs_space, + act_space=env.action_space, + device=device, + raw_obs_dim=raw_obs_dim, + history_length=args.history_length, + hidden_sizes=hidden_sizes, + latent_dim=args.latent_dim, + ) + + # Freeze everything except the adaptation module. + for name, param in deploy_model.named_parameters(): + if "adaptation_module" not in name: + param.requires_grad_(False) + + optimizer = torch.optim.Adam( + deploy_model.adaptation_module.parameters(), lr=args.lr, + ) + + # ── Training loop ──────────────────────────────────────────── + log.info("starting_adaptation_training", iterations=args.iterations) + obs, _ = runner.reset() + + for iteration in tqdm.tqdm(range(args.iterations), desc="Adaptation"): + # Collect a rollout using the deploy model. + z_targets: list[torch.Tensor] = [] + z_preds: list[torch.Tensor] = [] + + for _step in range(args.rollout_steps): + with torch.no_grad(): + # Get action from deploy model (uses adaptation module). + aug_obs = obs # already augmented by runner + actions = deploy_model.act( + {"states": aug_obs}, role="policy", + )[0] + + obs, _, _, _, info = runner.step(actions) + + # Compute teacher's z from privileged μ. + mu = info.get("privileged_obs") + if mu is not None: + z_target = teacher.env_encoder(mu) + z_targets.append(z_target) + + # Compute adaptation module's ẑ from history. + raw = aug_obs[:, :raw_obs_dim] + hist_flat = aug_obs[:, raw_obs_dim:] + history = hist_flat.reshape( + -1, args.history_length, step_dim, + ) + z_pred = deploy_model.adaptation_module(history) + z_preds.append(z_pred) + + if not z_targets: + continue + + # Supervised update on adaptation module. + z_target_batch = torch.cat(z_targets, dim=0).detach() + z_pred_batch = torch.cat(z_preds, dim=0) + + # Re-compute z_pred with gradients (the ones above were no_grad). + # We need to re-encode from stored data; instead, collect with grad: + # Actually, z_preds were computed in no_grad. Let me re-collect + # a fresh batch with gradients. + obs_reset, _ = runner.reset() + obs = obs_reset + + z_targets_grad: list[torch.Tensor] = [] + z_preds_grad: list[torch.Tensor] = [] + + for _step in range(args.rollout_steps): + with torch.no_grad(): + aug_obs = obs + actions = deploy_model.act( + {"states": aug_obs}, role="policy", + )[0] + obs, _, _, _, info = runner.step(actions) + mu = info.get("privileged_obs") + + if mu is not None: + with torch.no_grad(): + z_target = teacher.env_encoder(mu) + z_targets_grad.append(z_target) + + # This time, compute z_pred WITH gradients. + raw = aug_obs[:, :raw_obs_dim] + hist_flat = aug_obs[:, raw_obs_dim:] + history = hist_flat.reshape( + -1, args.history_length, step_dim, + ) + z_pred = deploy_model.adaptation_module(history) + z_preds_grad.append(z_pred) + + if not z_targets_grad: + continue + + z_target_all = torch.cat(z_targets_grad, dim=0).detach() + z_pred_all = torch.cat(z_preds_grad, dim=0) + + loss = torch.nn.functional.mse_loss(z_pred_all, z_target_all) + + optimizer.zero_grad() + loss.backward() + optimizer.step() + + if iteration % 50 == 0: + log.info("adaptation_loss", iteration=iteration, mse=f"{loss.item():.6f}") + + # ── Save adaptation weights ────────────────────────────────── + out_path = pathlib.Path(args.output) + out_path.parent.mkdir(parents=True, exist_ok=True) + + # Save the full deploy model state dict. + torch.save(deploy_model.state_dict(), out_path) + log.info("adaptation_saved", path=str(out_path)) + + runner.close() + + +if __name__ == "__main__": + main() diff --git a/src/core/robot.py b/src/core/robot.py index f7c16bc..c74504f 100644 --- a/src/core/robot.py +++ b/src/core/robot.py @@ -46,12 +46,11 @@ class ActuatorConfig: deadzone: tuple[float, float] = (0.0, 0.0) # min |ctrl| for torque (pos, neg) damping: tuple[float, float] = (0.0, 0.0) # viscous damping (pos, neg) frictionloss: tuple[float, float] = (0.0, 0.0) # Coulomb friction (pos, neg) - stribeck_friction_boost: float = 0.0 # extra friction at low speed (fraction) - stribeck_vel: float = 2.0 # Stribeck velocity scale (rad/s) - action_bias: float = 0.0 # constant bias on action (H-bridge asymmetry) kp: float = 0.0 # proportional gain (position / velocity actuators) kv: float = 0.0 # derivative gain (position actuators) filter_tau: float = 0.0 # 1st-order filter time constant (s); 0 = no filter + viscous_quadratic: float = 0.0 # velocity² drag coefficient + back_emf_gain: float = 0.0 # back-EMF torque reduction @property def gear_avg(self) -> float: @@ -65,15 +64,12 @@ class ActuatorConfig: or self.deadzone != (0.0, 0.0) or self.damping != (0.0, 0.0) or self.frictionloss != (0.0, 0.0) - or self.stribeck_friction_boost > 0 - or self.action_bias != 0.0 + or self.viscous_quadratic > 0 + or self.back_emf_gain > 0 ) def transform_ctrl(self, ctrl: float) -> float: - """Apply bias, asymmetric deadzone, and gear compensation.""" - # Action bias (H-bridge asymmetry) - ctrl = ctrl + self.action_bias - + """Apply asymmetric deadzone and gear compensation to a scalar ctrl.""" # Deadzone dz_pos, dz_neg = self.deadzone if ctrl >= 0 and ctrl < dz_pos: @@ -89,23 +85,35 @@ class ActuatorConfig: return ctrl - def compute_motor_force(self, vel: float, ctrl: float) -> float: - """Asymmetric friction + Stribeck + damping → applied torque.""" + def compute_motor_force(self, vel: float, ctrl: float, + friction_scale: float = 1.0, + damping_scale: float = 1.0) -> float: + """Asymmetric friction, damping, drag, back-EMF → applied torque. + + ``friction_scale`` / ``damping_scale`` multiply the Coulomb-friction + and viscous-damping terms for per-env domain randomization + (1.0 = no randomization, the default used by sysid). + """ torque = 0.0 - # Coulomb friction (direction-dependent + Stribeck boost) + # Coulomb friction (direction-dependent) fl_pos, fl_neg = self.frictionloss if abs(vel) > 1e-6: - fl = fl_pos if vel > 0 else fl_neg - if self.stribeck_friction_boost > 0 and self.stribeck_vel > 0: - fl = fl * (1.0 + self.stribeck_friction_boost - * math.exp(-abs(vel) / self.stribeck_vel)) + fl = (fl_pos if vel > 0 else fl_neg) * friction_scale torque -= math.copysign(fl, vel) # Viscous damping (direction-dependent) - damp = self.damping[0] if vel > 0 else self.damping[1] + damp = (self.damping[0] if vel > 0 else self.damping[1]) * damping_scale torque -= damp * vel + # Quadratic velocity drag + if self.viscous_quadratic > 0: + torque -= self.viscous_quadratic * vel * abs(vel) + + # Back-EMF torque reduction + if self.back_emf_gain > 0 and abs(ctrl) > 1e-6: + torque -= self.back_emf_gain * vel * math.copysign(1.0, ctrl) + return max(-10.0, min(10.0, torque)) def transform_action(self, action): diff --git a/src/core/runner.py b/src/core/runner.py index ff2b004..0514e71 100644 --- a/src/core/runner.py +++ b/src/core/runner.py @@ -15,6 +15,18 @@ class BaseRunnerConfig: num_envs: int = 1 device: str = "cpu" history_length: int = 0 # 0 = no history (single obs), >0 = RMA-style + rma_mode: str = "none" # "none" | "teacher" | "deploy" + + # ── Domain randomization (sim-to-real) ───────────────────────── + # Empty dict = disabled (every field below is a no-op). Supported keys: + # qpos_noise_std: float — Gaussian sensor noise on joint angles (rad) + # qvel_noise_std: float — Gaussian sensor noise on joint velocities (rad/s) + # action_delay_steps: [lo, hi] — per-env integer control-step latency + # friction_scale: [lo, hi] — per-env multiplier on Coulomb friction + # damping_scale: [lo, hi] — per-env multiplier on viscous damping + # torque_scale: [lo, hi] — per-env multiplier on applied motor torque + # The randomized factors are exposed as privileged_obs (μ) for RMA. + domain_rand: dict = dataclasses.field(default_factory=dict) class BaseRunner(abc.ABC, Generic[T]): def __init__(self, env: BaseEnv, config: T) -> None: @@ -37,7 +49,15 @@ class BaseRunner(abc.ABC, Generic[T]): self.config.num_envs, dtype=torch.long, device=self.config.device ) - # ── History buffer (RMA-style adaptation) ──────────────── + # ── Domain randomization (latency / sensor noise / dynamics) ─ + # Must precede the RMA block: teacher mode reads privileged_dim, + # which is derived from the randomized-factor count below. + self._setup_domain_rand() + + # ── RMA mode ──────────────────────────────────────────── + self._rma_mode: str = getattr(self.config, "rma_mode", "none") + + # ── History buffer (used in "deploy" and legacy "none" modes) ─ self._history_len: int = getattr(self.config, "history_length", 0) if self._history_len > 0: obs_dim = self.observation_space.shape[0] @@ -50,11 +70,30 @@ class BaseRunner(abc.ABC, Generic[T]): self.config.num_envs, self._history_len, self._history_step_dim, device=self.config.device, ) - # Augmented observation space: [current_obs, history_flat] - from gymnasium import spaces - aug_dim = obs_dim + self._history_len * self._history_step_dim + + # ── Observation space augmentation ─────────────────────── + from gymnasium import spaces + raw_obs_dim = self.observation_space.shape[0] + + if self._rma_mode == "teacher": + # Teacher gets [raw_obs, μ]. μ dim resolved after _sim_initialize. + mu_dim = getattr(self, "privileged_dim", 0) + if mu_dim > 0: + aug_dim = raw_obs_dim + mu_dim + self.observation_space = spaces.Box( + low=-torch.inf, high=torch.inf, shape=(aug_dim,), + ) + elif self._rma_mode == "deploy" and self._history_len > 0: + # Deploy gets [raw_obs, history_flat]. + aug_dim = raw_obs_dim + self._history_len * self._history_step_dim self.observation_space = spaces.Box( - low=-torch.inf, high=torch.inf, shape=(aug_dim,) + low=-torch.inf, high=torch.inf, shape=(aug_dim,), + ) + elif self._rma_mode == "none" and self._history_len > 0: + # Legacy mode: [raw_obs, history_flat]. + aug_dim = raw_obs_dim + self._history_len * self._history_step_dim + self.observation_space = spaces.Box( + low=-torch.inf, high=torch.inf, shape=(aug_dim,), ) @property @@ -84,11 +123,139 @@ class BaseRunner(abc.ABC, Generic[T]): if hasattr(self, "_offscreen_renderer") and self._offscreen_renderer is not None: self._offscreen_renderer.close() + # ── Domain randomization ───────────────────────────────────── + + _SCALE_FIELDS = ("friction_scale", "damping_scale", "torque_scale") + + def _setup_domain_rand(self) -> None: + """Parse the domain_rand config into per-env buffers + the μ layout. + + All buffers are no-ops when ``domain_rand`` is empty: scales are 1.0, + delay is 0, noise std is 0, and privileged_dim is 0. + """ + dr = dict(getattr(self.config, "domain_rand", {}) or {}) + n = self.config.num_envs + dev = self.config.device + + # Fixed (not per-env) Gaussian sensor noise. + self._qpos_noise_std = float(dr.get("qpos_noise_std", 0.0)) + self._qvel_noise_std = float(dr.get("qvel_noise_std", 0.0)) + + # Per-env multiplicative dynamics scales (applied by the sim runner). + self._dr_scales: dict[str, torch.Tensor] = { + f: torch.ones(n, device=dev) for f in self._SCALE_FIELDS + } + + # Per-env integer action delay (in control steps). + self._dr_delay = torch.zeros(n, dtype=torch.long, device=dev) + + # Spec list — its order fixes the privileged μ vector layout. + self._dr_specs: list[tuple[str, float, float]] = [] + delay_range = dr.get("action_delay_steps") + if delay_range: + self._dr_specs.append( + ("action_delay_steps", float(delay_range[0]), float(delay_range[1])) + ) + self._max_delay = int(delay_range[1]) + else: + self._max_delay = 0 + for f in self._SCALE_FIELDS: + rng = dr.get(f) + if rng: + self._dr_specs.append((f, float(rng[0]), float(rng[1]))) + + self._mu_dim = len(self._dr_specs) + self._dr_mu = torch.zeros(n, self._mu_dim, device=dev) + + # Action-delay ring buffer: (num_envs, max_delay + 1, act_dim). + if self._max_delay > 0: + act_dim = self.env.action_space.shape[0] + self._action_buf = torch.zeros( + n, self._max_delay + 1, act_dim, device=dev, + ) + + def _resample_domain_rand(self, env_ids: torch.Tensor) -> None: + """Sample fresh per-env DR factors (call on every (re)set).""" + if self._mu_dim == 0 or env_ids.numel() == 0: + return + dev = self.config.device + for j, (name, lo, hi) in enumerate(self._dr_specs): + if name == "action_delay_steps": + vals = torch.randint( + int(lo), int(hi) + 1, (env_ids.numel(),), device=dev, + ) + self._dr_delay[env_ids] = vals + raw = vals.float() + else: + vals = torch.rand(env_ids.numel(), device=dev) * (hi - lo) + lo + self._dr_scales[name][env_ids] = vals + raw = vals + # Normalize each factor to ~[-1, 1] for the privileged μ vector. + span = (hi - lo) if (hi - lo) > 1e-9 else 1.0 + self._dr_mu[env_ids, j] = 2.0 * (raw - lo) / span - 1.0 + + def _reset_action_buffer(self, env_ids: torch.Tensor) -> None: + if self._max_delay > 0: + self._action_buf[env_ids] = 0.0 + + def _apply_action_delay(self, actions: torch.Tensor) -> torch.Tensor: + """Return the per-env delayed action that the simulator should apply. + + The policy's commanded action is what gets stored in history; only + the action handed to ``_sim_step`` is delayed. + """ + if self._max_delay <= 0: + return actions + self._action_buf = torch.roll(self._action_buf, 1, dims=1) + self._action_buf[:, 0] = actions + idx = torch.arange(self.num_envs, device=self.device) + return self._action_buf[idx, self._dr_delay] + + def _add_sensor_noise( + self, qpos: torch.Tensor, qvel: torch.Tensor, + ) -> tuple[torch.Tensor, torch.Tensor]: + if self._qpos_noise_std > 0: + qpos = qpos + torch.randn_like(qpos) * self._qpos_noise_std + if self._qvel_noise_std > 0: + qvel = qvel + torch.randn_like(qvel) * self._qvel_noise_std + return qpos, qvel + + def _compute_obs(self, qpos: torch.Tensor, qvel: torch.Tensor) -> torch.Tensor: + """Observation the policy sees — built from noisy (sensor) state.""" + nqpos, nqvel = self._add_sensor_noise(qpos, qvel) + return self.env.compute_observations(self.env.build_state(nqpos, nqvel)) + + # ── Privileged observation interface ───────────────────────── + + @property + def privileged_dim(self) -> int: + """Number of randomized DR factors exposed as μ (0 if DR disabled).""" + return getattr(self, "_mu_dim", 0) + + @property + def privileged_obs(self) -> torch.Tensor: + """Per-env normalized DR factors μ ∈ [-1, 1] for RMA supervision.""" + if getattr(self, "_mu_dim", 0) > 0: + return self._dr_mu + return torch.zeros(self.config.num_envs, 0, device=self.config.device) + + # ── Observation augmentation ───────────────────────────────── + def _augment_obs(self, obs: torch.Tensor) -> torch.Tensor: - """Concatenate history buffer to current obs if history is enabled.""" + """Augment raw obs based on RMA mode. + + teacher: [raw_obs, μ] + deploy: [raw_obs, history_flat] + none: [raw_obs, history_flat] (legacy, or plain obs if no history) + """ + if self._rma_mode == "teacher": + mu = self.privileged_obs + if mu.shape[-1] > 0: + return torch.cat([obs, mu], dim=-1) + return obs + # deploy / none: concatenate history if self._history_len <= 0: return obs - # Flatten history: (num_envs, H, step_dim) → (num_envs, H * step_dim) hist_flat = self._history_buf.reshape(obs.shape[0], -1) return torch.cat([obs, hist_flat], dim=-1) @@ -115,30 +282,41 @@ class BaseRunner(abc.ABC, Generic[T]): def reset(self) -> tuple[torch.Tensor, dict[str, Any]]: all_ids = torch.arange(self.num_envs, device=self.device) + self._resample_domain_rand(all_ids) + self._reset_action_buffer(all_ids) qpos, qvel = self._sim_reset(all_ids) self.step_counts.zero_() self._reset_history(all_ids) - state = self.env.build_state(qpos, qvel) - obs = self.env.compute_observations(state) + obs = self._compute_obs(qpos, qvel) return self._augment_obs(obs), {} - + def step(self, actions: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor, dict[str, Any]]: self._last_actions = actions - qpos, qvel = self._sim_step(actions) + # Latency: the simulator applies a (per-env) delayed action. + sim_actions = self._apply_action_delay(actions) + qpos, qvel = self._sim_step(sim_actions) self.step_counts += 1 - state = self.env.build_state(qpos, qvel) - obs = self.env.compute_observations(state) - rewards = self.env.compute_rewards(state, actions) - terminated = self.env.compute_terminations(state) + # Reward / termination use the TRUE state (no sensor noise) so the + # learning signal and safety checks stay clean. + clean_state = self.env.build_state(qpos, qvel) + rewards = self.env.compute_rewards(clean_state, actions) + terminated = self.env.compute_terminations(clean_state) truncated = self.env.compute_truncations(self.step_counts) + # The observation the policy sees is built from the NOISY sensor state. + obs = self._compute_obs(qpos, qvel) + # Push current (obs, action) into history before augmenting. self._push_history(obs, actions) info: dict[str, Any] = {} + # Expose privileged DR params μ for RMA supervision (this step's dynamics). + if self.privileged_dim > 0: + info["privileged_obs"] = self.privileged_obs.clone() + done = terminated | truncated done_ids = done.nonzero(as_tuple=False).squeeze(-1) @@ -146,12 +324,14 @@ class BaseRunner(abc.ABC, Generic[T]): info["final_observations"] = self._augment_obs(obs)[done_ids].clone() info["final_env_ids"] = done_ids.clone() + # New episode → fresh dynamics + cleared latency buffer. + self._resample_domain_rand(done_ids) + self._reset_action_buffer(done_ids) reset_qpos, reset_qvel = self._sim_reset(done_ids) self.step_counts[done_ids] = 0 self._reset_history(done_ids) - reset_state = self.env.build_state(reset_qpos, reset_qvel) - obs[done_ids] = self.env.compute_observations(reset_state) + obs[done_ids] = self._compute_obs(reset_qpos, reset_qvel) # skrl expects (num_envs, 1) for rewards/terminated/truncated return self._augment_obs(obs), rewards.unsqueeze(-1), terminated.unsqueeze(-1), truncated.unsqueeze(-1), info diff --git a/src/envs/rotary_cartpole.py b/src/envs/rotary_cartpole.py index ebecca2..a66dbb7 100644 --- a/src/envs/rotary_cartpole.py +++ b/src/envs/rotary_cartpole.py @@ -21,11 +21,17 @@ class RotaryCartPoleConfig(BaseEnvConfig): at the arm tip. Goal: swing the pendulum up and balance it upright. """ # Reward shaping - reward_upright_scale: float = 1.0 # cos(pendulum) when upright = +1 + reward_upright_scale: float = 1.0 # upright reward ∈ [0, scale] + alive_bonus: float = 0.25 # per-step survival bonus (must stay alive > die) + balance_bonus: float = 2.0 # extra reward for upright AND still (beats spinning) + balance_vel_scale: float = 0.5 # decay rate of the bonus with pendulum speed motor_vel_penalty: float = 0.01 # penalise high motor angular velocity motor_angle_penalty: float = 0.05 # penalise deviation from centre action_penalty: float = 0.05 # penalise large actions (energy cost) + # ── Initial state randomisation ── + pendulum_init_range_deg: float = 180.0 # pendulum starts in [-range, +range] + # ── Software safety limit (env-level, on top of URDF + hardware) ── motor_angle_limit_deg: float = 90.0 # terminate episode if exceeded @@ -82,8 +88,23 @@ class RotaryCartPoleEnv(BaseEnv[RotaryCartPoleConfig]): # ── Rewards ────────────────────────────────────────────────── def compute_rewards(self, state: RotaryCartPoleState, actions: torch.Tensor) -> torch.Tensor: - # Upright reward: -cos(θ) ∈ [-1, +1] - reward = -torch.cos(state.pendulum_angle) * self.config.reward_upright_scale + # Upright shaping ∈ [0, 1]: 0 hanging down (θ=0), 1 fully upright (θ=π). + # Non-negative by design so *surviving* always beats ending the episode early + # (otherwise the optimum is to slam the arm into the ±limit — "suicide policy"). + upright = 0.5 * (1.0 - torch.cos(state.pendulum_angle)) + + # Balanced bonus: large ONLY when near the top AND nearly still. A freely + # spinning pendulum passes through the top at high speed, so stillness≈0 and + # it earns ~none of this — making true balancing strictly dominate the + # "just keep spinning in full loops" local optimum. + stillness = torch.exp(-self.config.balance_vel_scale * state.pendulum_vel.pow(2)) + balance = self.config.balance_bonus * upright * stillness + + # Per-step alive bonus keeps a not-yet-upright step net-positive despite + # penalties, so the −10 termination penalty is genuinely a deterrent. + reward = (upright * self.config.reward_upright_scale + + balance + + self.config.alive_bonus) # Penalise fast motor spinning (discourages violent oscillation) reward = reward - self.config.motor_vel_penalty * state.motor_vel.pow(2) @@ -97,7 +118,7 @@ class RotaryCartPoleEnv(BaseEnv[RotaryCartPoleConfig]): # Penalty for exceeding motor angle limit (episode also terminates) limit_rad = math.radians(self.config.motor_angle_limit_deg) exceeded = state.motor_angle.abs() >= limit_rad - reward = torch.where(exceeded, torch.tensor(-1000.0, device=reward.device), reward) + reward = torch.where(exceeded, torch.tensor(-10.0, device=reward.device), reward) return reward diff --git a/src/models/mlp.py b/src/models/mlp.py index f4c7558..9989a9a 100644 --- a/src/models/mlp.py +++ b/src/models/mlp.py @@ -11,9 +11,8 @@ class HistoryEncoder(nn.Module): Output: (batch, embedding_dim) Architecture: two temporal conv layers → global average pool → linear. - This lets the policy implicitly infer environment parameters (mass, - friction, gear, etc.) from recent dynamics — the core of RMA-style - adaptation for sim2real. + Used as the adaptation module φ in RMA Phase 2 (deploy mode) to + predict the latent ẑ from recent dynamics. """ def __init__( @@ -43,18 +42,95 @@ class HistoryEncoder(nn.Module): return self.fc(x) +class EnvironmentEncoder(nn.Module): + """MLP that compresses privileged DR parameters μ into a latent z. + + Used in RMA Phase 1 (teacher mode): z = e(μ). + """ + + def __init__(self, mu_dim: int, latent_dim: int = 8, hidden: int = 64) -> None: + super().__init__() + self.net = nn.Sequential( + nn.Linear(mu_dim, hidden), + nn.ELU(), + nn.Linear(hidden, latent_dim), + ) + + def forward(self, mu: torch.Tensor) -> torch.Tensor: + """mu: (batch, mu_dim) → z: (batch, latent_dim).""" + return self.net(mu) + + class SharedMLP(GaussianMixin, DeterministicMixin, Model): - def __init__(self, observation_space: spaces.Space, action_space: spaces.Space, device: torch.device, hidden_sizes: tuple[int, ...] = (32, 32), clip_actions: bool = False, clip_log_std: bool = True, min_log_std: float = -2.0, max_log_std: float = 2.0, initial_log_std: float = 0.0, history_length: int = 0, raw_obs_dim: int = 0, embedding_dim: int = 32): + """Shared policy/value network with RMA support. + + rma_mode: + "none" – legacy: optional history encoder, plain obs, backward compat. + "teacher" – Phase 1: env_encoder(μ) → z, backbone input = [raw_obs, z]. + "deploy" – Phase 2+: adaptation_module(history) → ẑ, input = [raw_obs, ẑ]. + + Teacher and deploy modes produce the same backbone in_dim + (raw_obs_dim + latent_dim), so weights transfer cleanly. + """ + + def __init__( + self, + observation_space: spaces.Space, + action_space: spaces.Space, + device: torch.device, + hidden_sizes: tuple[int, ...] = (32, 32), + clip_actions: bool = False, + clip_log_std: bool = True, + min_log_std: float = -2.0, + max_log_std: float = 2.0, + initial_log_std: float = 0.0, + # ── Legacy (none mode) ─────────────────────────────────── + history_length: int = 0, + raw_obs_dim: int = 0, + embedding_dim: int = 32, + # ── RMA modes ──────────────────────────────────────────── + rma_mode: str = "none", + latent_dim: int = 8, + mu_dim: int = 0, + ): Model.__init__(self, observation_space, action_space, device) - GaussianMixin.__init__(self, clip_actions, clip_log_std, min_log_std, max_log_std) + GaussianMixin.__init__( + self, clip_actions, clip_log_std, min_log_std, max_log_std, + ) DeterministicMixin.__init__(self, clip_actions) + self._rma_mode = rma_mode self._history_length = history_length self._raw_obs_dim = raw_obs_dim self._embedding_dim = embedding_dim + self._latent_dim = latent_dim + self._mu_dim = mu_dim - if history_length > 0 and raw_obs_dim > 0: - # The observation is [current_obs(raw_obs_dim), history_flat(H * step_dim)]. + # ── Build encoder + determine backbone in_dim ──────────── + self.env_encoder: EnvironmentEncoder | None = None + self.adaptation_module: HistoryEncoder | None = None + self.history_encoder: HistoryEncoder | None = None # legacy + + if rma_mode == "teacher": + assert mu_dim > 0 and raw_obs_dim > 0 + self.env_encoder = EnvironmentEncoder( + mu_dim=mu_dim, latent_dim=latent_dim, + ) + in_dim = raw_obs_dim + latent_dim + + elif rma_mode == "deploy": + assert history_length > 0 and raw_obs_dim > 0 + act_dim = self.num_actions + step_dim = raw_obs_dim + act_dim + self.adaptation_module = HistoryEncoder( + history_length=history_length, + step_dim=step_dim, + embedding_dim=latent_dim, + ) + in_dim = raw_obs_dim + latent_dim + + elif history_length > 0 and raw_obs_dim > 0: + # Legacy "none" mode with history encoder. act_dim = self.num_actions step_dim = raw_obs_dim + act_dim self.history_encoder = HistoryEncoder( @@ -62,12 +138,11 @@ class SharedMLP(GaussianMixin, DeterministicMixin, Model): step_dim=step_dim, embedding_dim=embedding_dim, ) - # MLP input = raw obs + history embedding. in_dim = raw_obs_dim + embedding_dim else: - self.history_encoder = None in_dim = self.num_observations + # ── Shared backbone ────────────────────────────────────── layers: list[nn.Module] = [] for hidden_size in hidden_sizes: layers.append(nn.Linear(in_dim, hidden_size)) @@ -77,34 +152,54 @@ class SharedMLP(GaussianMixin, DeterministicMixin, Model): # Policy head self.mean_layer = nn.Linear(in_dim, self.num_actions) - self.log_std_parameter: nn.Parameter = nn.Parameter(torch.full((self.num_actions,), initial_log_std)) + self.log_std_parameter: nn.Parameter = nn.Parameter( + torch.full((self.num_actions,), initial_log_std), + ) # Value head self.value_layer = nn.Linear(in_dim, 1) self._shared_output: torch.Tensor | None = None - - def act(self, inputs: dict[str, torch.Tensor], role: str = "") -> tuple[torch.Tensor, ...]: + def act( + self, inputs: dict[str, torch.Tensor], role: str = "", + ) -> tuple[torch.Tensor, ...]: if role == "policy": return GaussianMixin.act(self, inputs, role) elif role == "value": return DeterministicMixin.act(self, inputs, role) - - def _encode(self, states: torch.Tensor) -> torch.Tensor: - """Split augmented obs into current obs + history, encode, concat.""" - if self.history_encoder is None: - return self.net(states) - obs = states[:, :self._raw_obs_dim] - hist_flat = states[:, self._raw_obs_dim:] - step_dim = self._raw_obs_dim + self.num_actions - history = hist_flat.reshape(-1, self._history_length, step_dim) - embedding = self.history_encoder(history) - return self.net(torch.cat([obs, embedding], dim=-1)) + def _encode(self, states: torch.Tensor) -> torch.Tensor: + """Route through the correct encoder based on rma_mode.""" + if self._rma_mode == "teacher": + # states = [raw_obs, μ] + obs = states[:, :self._raw_obs_dim] + mu = states[:, self._raw_obs_dim:] + z = self.env_encoder(mu) + return self.net(torch.cat([obs, z], dim=-1)) + + if self._rma_mode == "deploy": + # states = [raw_obs, history_flat] + obs = states[:, :self._raw_obs_dim] + hist_flat = states[:, self._raw_obs_dim:] + step_dim = self._raw_obs_dim + self.num_actions + history = hist_flat.reshape(-1, self._history_length, step_dim) + z_hat = self.adaptation_module(history) + return self.net(torch.cat([obs, z_hat], dim=-1)) + + # Legacy "none" mode. + if self.history_encoder is not None: + obs = states[:, :self._raw_obs_dim] + hist_flat = states[:, self._raw_obs_dim:] + step_dim = self._raw_obs_dim + self.num_actions + history = hist_flat.reshape(-1, self._history_length, step_dim) + embedding = self.history_encoder(history) + return self.net(torch.cat([obs, embedding], dim=-1)) + + return self.net(states) def compute( - self, inputs: dict[str, torch.Tensor], role: str = "" - ) -> tuple[torch.Tensor, ...]: + self, inputs: dict[str, torch.Tensor], role: str = "", + ) -> tuple[torch.Tensor, ...]: if role == "policy": self._shared_output = self._encode(inputs["states"]) return self.mean_layer(self._shared_output), self.log_std_parameter, {} diff --git a/src/runners/mjx.py b/src/runners/mjx.py index 456d533..b67ace9 100644 --- a/src/runners/mjx.py +++ b/src/runners/mjx.py @@ -155,9 +155,8 @@ class MJXRunner(BaseRunner[MJXRunnerConfig]): _fl_neg = jnp.array([a.frictionloss[1] for a in acts]) _damp_pos = jnp.array([a.damping[0] for a in acts]) _damp_neg = jnp.array([a.damping[1] for a in acts]) - _stribeck_boost = jnp.array([a.stribeck_friction_boost for a in acts]) - _stribeck_vel = jnp.array([a.stribeck_vel for a in acts]) - _action_bias = jnp.array([a.action_bias for a in acts]) + _visc_quad = jnp.array([a.viscous_quadratic for a in acts]) + _back_emf = jnp.array([a.back_emf_gain for a in acts]) # ── Batched step (N substeps per call) ────────────────────── @jax.jit @@ -170,8 +169,8 @@ class MJXRunner(BaseRunner[MJXRunnerConfig]): ctrl = jnp.where(at_hi | at_lo, 0.0, ctrl) if _has_motor: - # Action bias + Deadzone + asymmetric gear compensation - mc = ctrl[:, _ctrl_ids] + _action_bias + # Deadzone + asymmetric gear compensation + mc = ctrl[:, _ctrl_ids] mc = jnp.where((mc >= 0) & (mc < _dz_pos), 0.0, mc) mc = jnp.where((mc < 0) & (mc > -_dz_neg), 0.0, mc) gear_dir = jnp.where(mc >= 0, _gear_pos, _gear_neg) @@ -185,18 +184,21 @@ class MJXRunner(BaseRunner[MJXRunnerConfig]): vel = d.qvel[:, _qvel_ids] mc = d.ctrl[:, _ctrl_ids] - # Coulomb friction (direction-dependent + Stribeck) + # Coulomb friction (direction-dependent) fl = jnp.where(vel > 0, _fl_pos, _fl_neg) - stribeck_mult = 1.0 + _stribeck_boost * jnp.exp( - -jnp.abs(vel) / _stribeck_vel - ) - fl = fl * stribeck_mult torque = -jnp.where( jnp.abs(vel) > 1e-6, jnp.sign(vel) * fl, 0.0, ) # Viscous damping (direction-dependent) damp = jnp.where(vel > 0, _damp_pos, _damp_neg) torque = torque - damp * vel + # Quadratic velocity drag + torque = torque - _visc_quad * vel * jnp.abs(vel) + # Back-EMF torque reduction + bemf = _back_emf * vel * jnp.sign(mc) + torque = torque - jnp.where( + jnp.abs(mc) > 1e-6, bemf, 0.0, + ) torque = jnp.clip(torque, -10.0, 10.0) d = d.replace( qfrc_applied=d.qfrc_applied.at[:, _qvel_ids].set(torque), diff --git a/src/runners/mujoco.py b/src/runners/mujoco.py index 0ebd4d6..05d6680 100644 --- a/src/runners/mujoco.py +++ b/src/runners/mujoco.py @@ -1,4 +1,5 @@ import dataclasses +import math import os import tempfile import xml.etree.ElementTree as ET @@ -13,34 +14,12 @@ from src.core.robot import RobotConfig from src.core.runner import BaseRunner, BaseRunnerConfig -@dataclasses.dataclass -class DomainRandConfig: - """Per-reset randomization of MuJoCo model parameters. - - Each field is a fractional range: the nominal value is multiplied by - ``uniform(1 - frac, 1 + frac)``. Set to 0.0 to disable. - """ - mass_frac: float = 0.0 # body masses - friction_frac: float = 0.0 # joint frictionloss - damping_frac: float = 0.0 # joint damping - armature_frac: float = 0.0 # joint armature (reflected inertia) - gear_frac: float = 0.0 # actuator gear ratio - com_offset: float = 0.0 # center-of-mass shift (metres) - - @dataclasses.dataclass class MuJoCoRunnerConfig(BaseRunnerConfig): num_envs: int = 16 device: str = "cpu" dt: float = 0.002 substeps: int = 10 - # ── Sim2real ───────────────────────────────────────────────── - domain_rand: DomainRandConfig = dataclasses.field(default_factory=DomainRandConfig) - - def __post_init__(self) -> None: - # Hydra passes domain_rand as a dict — convert to dataclass. - if isinstance(self.domain_rand, dict): - self.domain_rand = DomainRandConfig(**self.domain_rand) class ActuatorLimits: @@ -225,6 +204,8 @@ def load_mujoco_model(robot: RobotConfig) -> mujoco.MjModel: class MuJoCoRunner(BaseRunner[MuJoCoRunnerConfig]): + def __init__(self, env: BaseEnv, config: MuJoCoRunnerConfig): + super().__init__(env, config) @property def num_envs(self) -> int: @@ -253,15 +234,6 @@ class MuJoCoRunner(BaseRunner[MuJoCoRunnerConfig]): qvel_idx = self._model.jnt_dofadr[jnt_id] self._motor_actuators.append((act, qvel_idx)) - # ── Domain randomization: store nominal values ─────────── - self._nominal_mass = self._model.body_mass.copy() - self._nominal_inertia = self._model.body_inertia.copy() - self._nominal_ipos = self._model.body_ipos.copy() - self._nominal_damping = self._model.dof_damping.copy() - self._nominal_armature = self._model.dof_armature.copy() - self._nominal_frictionloss = self._model.dof_frictionloss.copy() - self._nominal_gear = self._model.actuator_gear.copy() - def _sim_step(self, actions: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]: actions_np: np.ndarray = actions.cpu().numpy() @@ -275,14 +247,24 @@ class MuJoCoRunner(BaseRunner[MuJoCoRunnerConfig]): qpos_batch = np.zeros((self.num_envs, self._nq), dtype=np.float32) qvel_batch = np.zeros((self.num_envs, self._nv), dtype=np.float32) + # Per-env domain-randomization scales (all 1.0 when DR is disabled). + fr_scale = self._dr_scales["friction_scale"].cpu().numpy() + dp_scale = self._dr_scales["damping_scale"].cpu().numpy() + tq_scale = self._dr_scales["torque_scale"].cpu().numpy() + for i, data in enumerate(self._data): - data.ctrl[:] = actions_np[i] + # torque_scale emulates motor-constant / battery-voltage variation. + data.ctrl[:] = actions_np[i] * tq_scale[i] for _ in range(self.config.substeps): # Apply asymmetric motor forces via qfrc_applied for act, qvel_idx in self._motor_actuators: vel = data.qvel[qvel_idx] ctrl = data.ctrl[0] # TODO: generalise for multi-actuator - data.qfrc_applied[qvel_idx] = act.compute_motor_force(vel, ctrl) + data.qfrc_applied[qvel_idx] = act.compute_motor_force( + vel, ctrl, + friction_scale=fr_scale[i], + damping_scale=dp_scale[i], + ) self._limits.enforce(self._model, data) mujoco.mj_step(self._model, data) @@ -301,41 +283,22 @@ class MuJoCoRunner(BaseRunner[MuJoCoRunnerConfig]): qpos_batch = np.zeros((n, self._nq), dtype=np.float32) qvel_batch = np.zeros((n, self._nv), dtype=np.float32) - # ── Domain randomization ───────────────────────────────── - dr = self.config.domain_rand - if dr.mass_frac > 0: - scale = np.random.uniform(1 - dr.mass_frac, 1 + dr.mass_frac, - size=self._nominal_mass.shape) - self._model.body_mass[:] = self._nominal_mass * scale - self._model.body_inertia[:] = self._nominal_inertia * scale[:, None] - if dr.com_offset > 0: - offset = np.random.uniform(-dr.com_offset, dr.com_offset, - size=self._nominal_ipos.shape) - self._model.body_ipos[:] = self._nominal_ipos + offset - if dr.damping_frac > 0: - scale = np.random.uniform(1 - dr.damping_frac, 1 + dr.damping_frac, - size=self._nominal_damping.shape) - self._model.dof_damping[:] = np.maximum(0, self._nominal_damping * scale) - if dr.armature_frac > 0: - scale = np.random.uniform(1 - dr.armature_frac, 1 + dr.armature_frac, - size=self._nominal_armature.shape) - self._model.dof_armature[:] = np.maximum(0, self._nominal_armature * scale) - if dr.friction_frac > 0: - scale = np.random.uniform(1 - dr.friction_frac, 1 + dr.friction_frac, - size=self._nominal_frictionloss.shape) - self._model.dof_frictionloss[:] = np.maximum(0, self._nominal_frictionloss * scale) - if dr.gear_frac > 0: - scale = np.random.uniform(1 - dr.gear_frac, 1 + dr.gear_frac, - size=self._nominal_gear.shape) - self._model.actuator_gear[:] = self._nominal_gear * scale + # Check if env has a pendulum_init_range_deg for wide pendulum randomisation + pend_range = getattr(self.env.config, "pendulum_init_range_deg", 0.0) + pend_range_rad = math.radians(pend_range) if pend_range > 0 else 0.0 for i, env_id in enumerate(ids): data = self._data[env_id] mujoco.mj_resetData(self._model, data) - # Small random perturbation for exploration + # Small random perturbation for motor angle + velocity data.qpos[:] += np.random.uniform(-0.05, 0.05, size=self._nq) data.qvel[:] += np.random.uniform(-0.05, 0.05, size=self._nv) + + # Wide pendulum angle randomisation (overrides the small perturbation) + if pend_range_rad > 0 and self._nq >= 2: + data.qpos[1] = np.random.uniform(-pend_range_rad, pend_range_rad) + data.ctrl[:] = 0.0 qpos_batch[i] = data.qpos diff --git a/src/sysid/motor/export.py b/src/sysid/motor/export.py new file mode 100644 index 0000000..b717ce1 --- /dev/null +++ b/src/sysid/motor/export.py @@ -0,0 +1,186 @@ +"""Export tuned motor parameters to MJCF and robot.yaml files. + +Reads the original motor.xml and robot.yaml, patches with optimised +parameter values, and writes motor_tuned.xml + robot_tuned.yaml. + +Usage: + python -m src.sysid.motor.export \ + --asset-path assets/motor \ + --result assets/motor/motor_sysid_result.json +""" + +from __future__ import annotations + +import argparse +import copy +import json +import xml.etree.ElementTree as ET +from pathlib import Path + +import structlog +import yaml + +log = structlog.get_logger() + +_DEFAULT_ASSET = "assets/motor" + + +def export_tuned_files( + asset_path: str | Path, + params: dict[str, float], +) -> tuple[Path, Path]: + """Write tuned MJCF and robot.yaml files. + + Returns (tuned_mjcf_path, tuned_robot_yaml_path). + """ + asset_path = Path(asset_path).resolve() + + robot_yaml_path = asset_path / "robot.yaml" + robot_cfg = yaml.safe_load(robot_yaml_path.read_text()) + mjcf_path = asset_path / robot_cfg["mjcf"] + + # ── Tune MJCF ──────────────────────────────────────────────── + tree = ET.parse(str(mjcf_path)) + root = tree.getroot() + + # Actuator — use average gear for the MJCF model. + gear_pos = params.get("actuator_gear_pos", params.get("actuator_gear")) + gear_neg = params.get("actuator_gear_neg", params.get("actuator_gear")) + gear_avg = None + if gear_pos is not None and gear_neg is not None: + gear_avg = (gear_pos + gear_neg) / 2.0 + elif gear_pos is not None: + gear_avg = gear_pos + filter_tau = params.get("actuator_filter_tau") + for act_el in root.iter("general"): + if act_el.get("name") == "motor": + if gear_avg is not None: + act_el.set("gear", str(gear_avg)) + if filter_tau is not None: + if filter_tau > 0: + act_el.set("dyntype", "filter") + act_el.set("dynprm", str(filter_tau)) + else: + act_el.set("dyntype", "none") + + # Joint — average damping & friction for MJCF (asymmetry in runtime). + fl_pos = params.get("motor_frictionloss_pos", params.get("motor_frictionloss")) + fl_neg = params.get("motor_frictionloss_neg", params.get("motor_frictionloss")) + fl_avg = None + if fl_pos is not None and fl_neg is not None: + fl_avg = (fl_pos + fl_neg) / 2.0 + elif fl_pos is not None: + fl_avg = fl_pos + damp_pos = params.get("motor_damping_pos", params.get("motor_damping")) + damp_neg = params.get("motor_damping_neg", params.get("motor_damping")) + damp_avg = None + if damp_pos is not None and damp_neg is not None: + damp_avg = (damp_pos + damp_neg) / 2.0 + elif damp_pos is not None: + damp_avg = damp_pos + for jnt in root.iter("joint"): + if jnt.get("name") == "motor_joint": + if damp_avg is not None: + jnt.set("damping", str(damp_avg)) + if "motor_armature" in params: + jnt.set("armature", str(params["motor_armature"])) + if fl_avg is not None: + jnt.set("frictionloss", str(fl_avg)) + + # Rotor mass. + if "rotor_mass" in params: + for geom in root.iter("geom"): + if geom.get("name") == "rotor_disk": + geom.set("mass", str(params["rotor_mass"])) + + # Write tuned MJCF. + tuned_mjcf_name = mjcf_path.stem + "_tuned" + mjcf_path.suffix + tuned_mjcf_path = asset_path / tuned_mjcf_name + ET.indent(tree, space=" ") + tree.write(str(tuned_mjcf_path), xml_declaration=True, encoding="unicode") + log.info("tuned_mjcf_written", path=str(tuned_mjcf_path)) + + # ── Tune robot.yaml ────────────────────────────────────────── + tuned_cfg = copy.deepcopy(robot_cfg) + tuned_cfg["mjcf"] = tuned_mjcf_name + + if tuned_cfg.get("actuators") and len(tuned_cfg["actuators"]) > 0: + act = tuned_cfg["actuators"][0] + if gear_avg is not None: + act["gear"] = round(gear_avg, 6) + if "actuator_filter_tau" in params: + act["filter_tau"] = round(params["actuator_filter_tau"], 6) + if "motor_damping" in params: + act["damping"] = round(params["motor_damping"], 6) + + if "joints" not in tuned_cfg: + tuned_cfg["joints"] = {} + if "motor_joint" not in tuned_cfg["joints"]: + tuned_cfg["joints"]["motor_joint"] = {} + mj = tuned_cfg["joints"]["motor_joint"] + if "motor_armature" in params: + mj["armature"] = round(params["motor_armature"], 6) + if fl_avg is not None: + mj["frictionloss"] = round(fl_avg, 6) + + # Asymmetric / hardware-realism / nonlinear parameters. + realism = {} + for key in [ + "actuator_gear_pos", "actuator_gear_neg", + "motor_damping_pos", "motor_damping_neg", + "motor_frictionloss_pos", "motor_frictionloss_neg", + "motor_deadzone_pos", "motor_deadzone_neg", + "action_bias", + "viscous_quadratic", "back_emf_gain", + "stribeck_friction_boost", "stribeck_vel", + "gearbox_backlash", + ]: + if key in params: + realism[key] = round(params[key], 6) + if realism: + tuned_cfg["hardware_realism"] = realism + + tuned_yaml_path = asset_path / "robot_tuned.yaml" + header = ( + "# Tuned motor config — generated by src.sysid.motor.optimize\n" + "# Original: robot.yaml\n\n" + ) + tuned_yaml_path.write_text( + header + yaml.dump(tuned_cfg, default_flow_style=False, sort_keys=False) + ) + log.info("tuned_robot_yaml_written", path=str(tuned_yaml_path)) + + return tuned_mjcf_path, tuned_yaml_path + + +# ── CLI ────────────────────────────────────────────────────────────── + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Export tuned motor parameters to MJCF + robot.yaml." + ) + parser.add_argument("--asset-path", type=str, default=_DEFAULT_ASSET) + parser.add_argument( + "--result", type=str, default=None, + help="Path to motor_sysid_result.json (auto-detected if omitted)", + ) + args = parser.parse_args() + + asset_path = Path(args.asset_path).resolve() + if args.result: + result_path = Path(args.result) + else: + result_path = asset_path / "motor_sysid_result.json" + + if not result_path.exists(): + raise FileNotFoundError(f"Result file not found: {result_path}") + + result = json.loads(result_path.read_text()) + params = result["best_params"] + + export_tuned_files(asset_path=args.asset_path, params=params) + + +if __name__ == "__main__": + main() diff --git a/src/sysid/optimize.py b/src/sysid/optimize.py index 36d15e6..4a4f931 100644 --- a/src/sysid/optimize.py +++ b/src/sysid/optimize.py @@ -4,14 +4,21 @@ Minimises the trajectory-matching cost between a MuJoCo rollout and a recorded real-robot sequence. Uses the ``cmaes`` package (pure-Python CMA-ES with native box-constraint support). -All 28 parameters (motor + pendulum/arm) are optimised jointly from a -single full-system recording. Velocities are optionally preprocessed -with Savitzky-Golay differentiation for cleaner targets. +Motor parameters are **locked** from the motor-only sysid — only +pendulum/arm inertial parameters, joint dynamics, and ctrl_limit are +optimised. Velocities are optionally preprocessed with Savitzky-Golay +differentiation for cleaner targets. Usage: python -m src.sysid.optimize \ --robot-path assets/rotary_cartpole \ - --recording assets/rotary_cartpole/recordings/capture_....npz + --recording assets/rotary_cartpole/recordings/capture_20260314_000435.npz + + # Shorter run for testing: + python -m src.sysid.optimize \ + --robot-path assets/rotary_cartpole \ + --recording .npz \ + --max-generations 10 --population-size 8 """ from __future__ import annotations @@ -26,17 +33,13 @@ import numpy as np import structlog from src.sysid.rollout import ( + LOCKED_MOTOR_PARAMS, PARAM_SETS, ROTARY_CARTPOLE_PARAMS, ParamSpec, - _make_actuator, - _resolve_params, bounds_arrays, - build_base_model, - defaults_dict, defaults_vector, params_to_dict, - patch_model, rollout, windowed_rollout, ) @@ -45,8 +48,62 @@ log = structlog.get_logger() # ── Velocity preprocessing ─────────────────────────────────────────── -# Shared implementation in src.sysid.preprocess. -from src.sysid.preprocess import preprocess_recording as _preprocess_recording + + +def _preprocess_recording( + recording: dict[str, np.ndarray], + preprocess_vel: bool = True, + sg_window: int = 7, + sg_polyorder: int = 3, +) -> dict[str, np.ndarray]: + """Optionally recompute velocities using Savitzky-Golay differentiation. + + Applies SG filtering to both motor_vel and pendulum_vel, replacing + the noisy firmware finite-difference velocities with smooth + analytical derivatives of the (clean) angle signals. + """ + if not preprocess_vel: + return recording + + from scipy.signal import savgol_filter + + rec = dict(recording) + times = rec["time"] + dt = float(np.mean(np.diff(times))) + + # Motor velocity. + rec["motor_vel_raw"] = rec["motor_vel"].copy() + rec["motor_vel"] = savgol_filter( + rec["motor_angle"], + window_length=sg_window, + polyorder=sg_polyorder, + deriv=1, + delta=dt, + ) + + # Pendulum velocity. + rec["pendulum_vel_raw"] = rec["pendulum_vel"].copy() + rec["pendulum_vel"] = savgol_filter( + rec["pendulum_angle"], + window_length=sg_window, + polyorder=sg_polyorder, + deriv=1, + delta=dt, + ) + + motor_noise = np.std(rec["motor_vel_raw"] - rec["motor_vel"]) + pend_noise = np.std(rec["pendulum_vel_raw"] - rec["pendulum_vel"]) + log.info( + "velocity_preprocessed", + method="savgol", + sg_window=sg_window, + sg_polyorder=sg_polyorder, + dt_ms=f"{dt*1000:.1f}", + motor_noise_std=f"{motor_noise:.3f} rad/s", + pend_noise_std=f"{pend_noise:.3f} rad/s", + ) + + return rec # ── Cost function ──────────────────────────────────────────────────── @@ -115,6 +172,7 @@ def cost_function( vel_weight: float = 0.1, pendulum_scale: float = 3.0, window_duration: float = 0.5, + motor_params: dict[str, float] | None = None, ) -> float: """Compute trajectory-matching cost for a candidate parameter vector. @@ -140,6 +198,7 @@ def cost_function( window_duration=window_duration, sim_dt=sim_dt, substeps=substeps, + motor_params=motor_params, ) else: sim = rollout( @@ -148,6 +207,7 @@ def cost_function( actions=recording["action"], sim_dt=sim_dt, substeps=substeps, + motor_params=motor_params, ) except Exception as exc: log.warning("rollout_failed", error=str(exc)) @@ -161,122 +221,6 @@ def cost_function( return _compute_trajectory_cost(sim, recording, pos_weight, vel_weight, pendulum_scale) -# ── Fast cost (model-cached) ──────────────────────────────────────── - - -def _fast_cost( - params_vec: np.ndarray, - recording: dict[str, np.ndarray], - model, # mujoco.MjModel (cached, mutated in-place) - body_ids: dict[str, int], - dof_ids: dict[str, int], - specs: list[ParamSpec], - sim_dt: float = 0.002, - substeps: int = 10, - pos_weight: float = 1.0, - vel_weight: float = 0.1, - pendulum_scale: float = 3.0, - window_duration: float = 0.5, -) -> float: - """Like cost_function but patches a cached model instead of rebuilding.""" - import mujoco as _mj - from src.runners.mujoco import ActuatorLimits - - params = params_to_dict(params_vec, specs) - if not _check_inertia_valid(params): - return 1e6 - - # Patch the cached model in-place — no XML, no temp files. - patch_model(model, params, body_ids, dof_ids) - - # Rebuild actuator from params (motor params change per candidate). - actuator = _make_actuator(params) - - try: - data = _mj.MjData(model) - limits = ActuatorLimits(model) - ctrl_limit = params.get("ctrl_limit", 0.588) - - times = recording["time"] - actions = recording["action"] - n = len(actions) - - sim_motor_angle = np.zeros(n, dtype=np.float64) - sim_motor_vel = np.zeros(n, dtype=np.float64) - sim_pend_angle = np.zeros(n, dtype=np.float64) - sim_pend_vel = np.zeros(n, dtype=np.float64) - - if window_duration > 0: - # Windowed (multiple shooting). - real_motor = recording["motor_angle"] - real_motor_vel = recording["motor_vel"] - real_pend = recording["pendulum_angle"] - real_pend_vel = recording["pendulum_vel"] - - t0, t_end = times[0], times[-1] - window_starts: list[int] = [] - current_t = t0 - while current_t < t_end: - idx = int(np.searchsorted(times, current_t)) - idx = min(idx, n - 1) - window_starts.append(idx) - current_t += window_duration - n_windows = len(window_starts) - - for w, w_start in enumerate(window_starts): - w_end = window_starts[w + 1] if w + 1 < n_windows else n - _mj.mj_resetData(model, data) - data.qpos[0] = real_motor[w_start] - data.qpos[1] = real_pend[w_start] - data.qvel[0] = real_motor_vel[w_start] - data.qvel[1] = real_pend_vel[w_start] - data.ctrl[:] = 0.0 - _mj.mj_forward(model, data) - - for i in range(w_start, w_end): - action = max(-ctrl_limit, min(ctrl_limit, float(actions[i]))) - ctrl = actuator.transform_ctrl(action) - data.ctrl[0] = ctrl - for _ in range(substeps): - limits.enforce(model, data) - data.qfrc_applied[0] = actuator.compute_motor_force(data.qvel[0], ctrl) - _mj.mj_step(model, data) - sim_motor_angle[i] = data.qpos[0] - sim_pend_angle[i] = data.qpos[1] - sim_motor_vel[i] = data.qvel[0] - sim_pend_vel[i] = data.qvel[1] - else: - # Open-loop single-shot. - _mj.mj_resetData(model, data) - for i in range(n): - action = max(-ctrl_limit, min(ctrl_limit, float(actions[i]))) - ctrl = actuator.transform_ctrl(action) - data.ctrl[0] = ctrl - for _ in range(substeps): - limits.enforce(model, data) - data.qfrc_applied[0] = actuator.compute_motor_force(data.qvel[0], ctrl) - _mj.mj_step(model, data) - sim_motor_angle[i] = data.qpos[0] - sim_pend_angle[i] = data.qpos[1] - sim_motor_vel[i] = data.qvel[0] - sim_pend_vel[i] = data.qvel[1] - - sim = { - "motor_angle": sim_motor_angle, - "motor_vel": sim_motor_vel, - "pendulum_angle": sim_pend_angle, - "pendulum_vel": sim_pend_vel, - } - except Exception: - return 1e6 - - for key in sim: - if np.any(~np.isfinite(sim[key])): - return 1e6 - - return _compute_trajectory_cost(sim, recording, pos_weight, vel_weight, pendulum_scale) - - # ── Parallel evaluation helper (module-level for pickling) ─────────── # Shared state set by the parent process before spawning workers. @@ -285,34 +229,22 @@ _par_robot_path: Path = Path(".") _par_specs: list[ParamSpec] = [] _par_kwargs: dict = {} -# Cached model (built once per worker, patched per candidate). -_par_model: object = None # mujoco.MjModel -_par_body_ids: dict[str, int] = {} -_par_dof_ids: dict[str, int] = {} - def _init_worker(recording, robot_path, specs, kwargs): """Initialiser run once per worker process.""" global _par_recording, _par_robot_path, _par_specs, _par_kwargs - global _par_model, _par_body_ids, _par_dof_ids _par_recording = recording _par_robot_path = robot_path _par_specs = specs _par_kwargs = kwargs - # Build base model once — reused for all candidates in this worker. - _par_model, _, _par_body_ids, _par_dof_ids = build_base_model(robot_path) - _par_model.opt.timestep = kwargs.get("sim_dt", 0.002) - def _eval_candidate(x_natural: np.ndarray) -> float: """Evaluate a single candidate — called in worker processes.""" - return _fast_cost( + return cost_function( x_natural, _par_recording, - _par_model, - _par_body_ids, - _par_dof_ids, + _par_robot_path, _par_specs, **_par_kwargs, ) @@ -336,11 +268,13 @@ def optimize( window_duration: float = 0.5, seed: int = 42, preprocess_vel: bool = True, + sg_window: int = 7, + sg_polyorder: int = 3, ) -> dict: """Run CMA-ES optimisation and return results. - All parameters (motor + pendulum/arm) are optimised jointly from a - single full-system recording. + Motor parameters are locked from the motor-only sysid. + Only pendulum/arm parameters are optimised. Returns a dict with: best_params: dict[str, float] @@ -348,6 +282,7 @@ def optimize( history: list of (generation, best_cost) tuples recording: str (path used) specs: list of param names + motor_params: dict of locked motor params """ from cmaes import CMA @@ -357,11 +292,23 @@ def optimize( if specs is None: specs = ROTARY_CARTPOLE_PARAMS + motor_params = dict(LOCKED_MOTOR_PARAMS) + log.info( + "motor_params_locked", + n_params=len(motor_params), + gear_avg=f"{(motor_params['actuator_gear_pos'] + motor_params['actuator_gear_neg']) / 2:.4f}", + ) + # Load recording. recording = dict(np.load(recording_path)) # Preprocessing: SG velocity recomputation. - recording = _preprocess_recording(recording, preprocess_vel=preprocess_vel) + recording = _preprocess_recording( + recording, + preprocess_vel=preprocess_vel, + sg_window=sg_window, + sg_polyorder=sg_polyorder, + ) n_samples = len(recording["time"]) duration = recording["time"][-1] - recording["time"][0] @@ -429,6 +376,7 @@ def optimize( vel_weight=vel_weight, pendulum_scale=pendulum_scale, window_duration=window_duration, + motor_params=motor_params, ) log.info("parallel_workers", n_workers=n_workers) @@ -480,19 +428,6 @@ def optimize( gen_best=f"{min(c for _, c in solutions):.6f}", ) - # Early stopping: no improvement in last 50 generations. - patience = 50 - if len(history) > patience: - old_cost = history[-patience][1] - if old_cost - best_cost < old_cost * 0.001: - log.info( - "early_stopping", - gen=gen, - best_cost=f"{best_cost:.6f}", - stall_gens=patience, - ) - break - total_time = time.monotonic() - t0 # Clean up process pool. @@ -506,8 +441,7 @@ def optimize( "cmaes_finished", best_cost=f"{best_cost:.6f}", total_time=f"{total_time:.1f}s", - generations=len(history), - evaluations=len(history) * population_size, + evaluations=max_generations * population_size, ) # Log parameter comparison. @@ -531,6 +465,7 @@ def optimize( "recording": str(recording_path), "param_names": [s.name for s in specs], "defaults": {s.name: s.default for s in specs}, + "motor_params": motor_params, "preprocess_vel": preprocess_vel, "timestamp": datetime.now().isoformat(), } @@ -581,12 +516,16 @@ def main() -> None: action="store_true", help="Skip Savitzky-Golay velocity preprocessing", ) + parser.add_argument("--sg-window", type=int, default=7, + help="Savitzky-Golay window length (odd, default 7)") + parser.add_argument("--sg-polyorder", type=int, default=3, + help="Savitzky-Golay polynomial order (default 3)") parser.add_argument( "--param-set", type=str, default="full", choices=list(PARAM_SETS.keys()), - help="Parameter set to optimize: 'full' (28), 'motor' (13), or 'pendulum' (15)", + help="Parameter set to optimize: 'reduced' (6 params, fast) or 'full' (15 params)", ) args = parser.parse_args() @@ -607,6 +546,8 @@ def main() -> None: window_duration=args.window_duration, seed=args.seed, preprocess_vel=not args.no_preprocess_vel, + sg_window=args.sg_window, + sg_polyorder=args.sg_polyorder, ) # Save results JSON. @@ -631,6 +572,7 @@ def main() -> None: export_tuned_files( robot_path=args.robot_path, params=result["best_params"], + motor_params=result.get("motor_params"), ) diff --git a/src/sysid/preprocess.py b/src/sysid/preprocess.py deleted file mode 100644 index c61e912..0000000 --- a/src/sysid/preprocess.py +++ /dev/null @@ -1,158 +0,0 @@ -"""Recording preprocessing — clean velocity estimation from angle data. - -The ESP32 firmware computes velocity as a raw finite-difference of encoder -counts at 50 Hz. With a 1320 CPR encoder that gives ~0.24 rad/s of -quantisation noise per count. This module replaces the noisy firmware -velocity with a smooth differentiation of the (much cleaner) angle signal. - -Pipeline: - 1. Dequantize angle signal (Gaussian smooth to round off staircase edges) - 2. Savitzky-Golay differentiation to compute velocity (zero phase lag) - -This is standard practice in robotics sysid — see e.g. MATLAB System ID -Toolbox, Drake's trajectory processing, or ETH's ANYmal sysid pipeline. -""" - -from __future__ import annotations - -import numpy as np -from scipy.ndimage import gaussian_filter1d -from scipy.signal import savgol_filter - -import structlog - -log = structlog.get_logger() - -# Default encoder CPR for angle dequantization. -_DEFAULT_CPR = 1320 - - -def dequantize_angle( - angles: np.ndarray, - cpr: int = _DEFAULT_CPR, - sigma: float = 0.6, -) -> np.ndarray: - """Smooth out encoder quantization staircase before differentiation. - - A 1320 CPR encoder quantizes the angle signal into 0.00476 rad steps. - When differentiated, these steps produce velocity spikes (Dirac-like). - A tiny Gaussian blur (sigma=0.6 samples ~ 12ms at 50 Hz) rounds the - staircase edges without affecting dynamics (<2 Hz bandwidth loss). - """ - if sigma <= 0 or cpr <= 0: - return angles - return gaussian_filter1d(angles, sigma=sigma) - - -def recompute_velocity( - recording: dict[str, np.ndarray], - window_length: int = 11, - polyorder: int = 3, - deriv: int = 1, - dequantize: bool = True, - dequant_sigma: float = 0.6, - keys: tuple[str, str] = ("motor_angle", "motor_vel"), -) -> dict[str, np.ndarray]: - """Recompute velocity from angle using Savitzky-Golay differentiation. - - Parameters - ---------- - recording : dict with at least 'time' and the angle/vel keys. - window_length : SG filter window (must be odd, > polyorder). - polyorder : Polynomial order for the SG filter (3 = cubic). - deriv : Derivative order (1 = first derivative = velocity). - dequantize : Whether to smooth encoder quantization staircase. - dequant_sigma : Gaussian sigma for dequantization (samples). - keys : (angle_key, vel_key) to process. - - Returns - ------- - New recording dict with vel replaced and vel_raw added. - """ - angle_key, vel_key = keys - rec = dict(recording) - - times = rec["time"] - angles = rec[angle_key].copy() - dt = np.mean(np.diff(times)) - - rec[f"{vel_key}_raw"] = rec[vel_key].copy() - - if dequantize: - angles = dequantize_angle(angles, sigma=dequant_sigma) - - vel_sg = savgol_filter( - angles, - window_length=window_length, - polyorder=polyorder, - deriv=deriv, - delta=dt, - ) - - raw_vel = rec[f"{vel_key}_raw"] - noise_estimate = np.std(raw_vel - vel_sg) - max_diff = np.max(np.abs(raw_vel - vel_sg)) - - log.info( - "velocity_recomputed", - channel=vel_key, - method="savgol", - window=window_length, - polyorder=polyorder, - dt=f"{dt*1000:.1f}ms", - noise_std=f"{noise_estimate:.3f} rad/s", - max_diff=f"{max_diff:.3f} rad/s", - ) - - rec[vel_key] = vel_sg - return rec - - -def preprocess_recording( - recording: dict[str, np.ndarray], - preprocess_vel: bool = True, - sg_window: int = 11, - sg_polyorder: int = 3, -) -> dict[str, np.ndarray]: - """Preprocess a full-system recording (motor + pendulum velocities). - - Applies SG differentiation to both motor and pendulum channels. - """ - if not preprocess_vel: - return recording - - rec = recompute_velocity( - recording, - window_length=sg_window, - polyorder=sg_polyorder, - keys=("motor_angle", "motor_vel"), - ) - if "pendulum_angle" in rec and "pendulum_vel" in rec: - rec = recompute_velocity( - rec, - window_length=sg_window, - polyorder=sg_polyorder, - keys=("pendulum_angle", "pendulum_vel"), - ) - return rec - - -def match_filter_velocity( - sim_vel: np.ndarray, - real_vel: np.ndarray, - dt: float, - cutoff_hz: float = 8.0, -) -> tuple[np.ndarray, np.ndarray]: - """Apply identical lowpass to both sim and real velocity for fair comparison. - - MuJoCo sim velocity has full physics bandwidth while real velocity is - band-limited by the SG filter. Matching ensures fair cost comparison. - """ - from scipy.signal import butter, filtfilt - - fs = 1.0 / dt - nyq = fs / 2.0 - norm_cutoff = min(cutoff_hz / nyq, 0.99) - - b, a = butter(2, norm_cutoff, btype="low") - return filtfilt(b, a, sim_vel), filtfilt(b, a, real_vel) diff --git a/src/sysid/rollout.py b/src/sysid/rollout.py index cf386ad..7ac2e96 100644 --- a/src/sysid/rollout.py +++ b/src/sysid/rollout.py @@ -7,11 +7,12 @@ the simulated trajectory for comparison with the real recording. This module is the inner loop of the CMA-ES optimizer: it is called once per candidate parameter vector per generation. -All motor + pendulum/arm parameters are optimised jointly from a single -full-system recording. The asymmetric motor model (deadzone, gear -compensation, Coulomb friction + Stribeck, viscous damping, action bias) -is applied via ``ActuatorConfig.transform_ctrl()`` and -``compute_motor_force()``. +Motor parameters are **locked** from the motor-only sysid result. +The optimizer only fits +pendulum/arm inertial parameters, pendulum joint dynamics, and +``ctrl_limit``. The asymmetric motor model (deadzone, gear compensation, +Coulomb friction, viscous damping, quadratic drag, back-EMF) is applied +via ``ActuatorConfig.transform_ctrl()`` and ``compute_motor_force()``. """ from __future__ import annotations @@ -31,6 +32,26 @@ from src.runners.mujoco import ActuatorLimits, load_mujoco_model from src.sysid._urdf import patch_link_inertials +# ── Locked motor parameters (from motor-only sysid) ───────────────── +# These are FIXED and not optimised. They come from the 12-param model +# in robot.yaml (from motor-only sysid, cost 0.862). + +LOCKED_MOTOR_PARAMS: dict[str, float] = { + "actuator_gear_pos": 0.424182, + "actuator_gear_neg": 0.425031, + "actuator_filter_tau": 0.00503506, + "motor_damping_pos": 0.00202682, + "motor_damping_neg": 0.0146651, + "motor_armature": 0.00277342, + "motor_frictionloss_pos": 0.0573282, + "motor_frictionloss_neg": 0.0533549, + "viscous_quadratic": 0.000285329, + "back_emf_gain": 0.00675809, + "motor_deadzone_pos": 0.141291, + "motor_deadzone_neg": 0.0780148, +} + + # ── Tunable parameter specification ────────────────────────────────── @@ -45,31 +66,16 @@ class ParamSpec: log_scale: bool = False # optimise in log-space (masses, inertias) -# ── Motor parameters (13) ──────────────────────────────────────────── -# Defaults from motor-only sysid (cost 0.2117). -MOTOR_PARAMS: list[ParamSpec] = [ - ParamSpec("actuator_gear_pos", 0.371194, 0.005, 1.5, log_scale=True), - ParamSpec("actuator_gear_neg", 0.428143, 0.005, 1.5, log_scale=True), - ParamSpec("actuator_filter_tau", 0.022301, 0.001, 0.20), - ParamSpec("motor_damping_pos", 0.001384, 1e-5, 0.1, log_scale=True), - ParamSpec("motor_damping_neg", 0.005196, 1e-5, 0.1, log_scale=True), - ParamSpec("motor_armature", 0.002753, 1e-6, 0.01, log_scale=True), - ParamSpec("motor_frictionloss_pos", 0.036744, 0.001, 0.2, log_scale=True), - ParamSpec("motor_frictionloss_neg", 0.069082, 0.001, 0.2, log_scale=True), - ParamSpec("stribeck_friction_boost", 0.0, 0.0, 0.15), - ParamSpec("stribeck_vel", 2.0, 0.1, 8.0), - ParamSpec("motor_deadzone_pos", 0.14182, 0.0, 0.30), - ParamSpec("motor_deadzone_neg", 0.031454, 0.0, 0.30), - ParamSpec("action_bias", 0.0, -0.10, 0.10), -] - -# ── Pendulum/arm parameters (15) ───────────────────────────────────── -# Defaults from Fusion 360 URDF export. -PENDULUM_PARAMS: list[ParamSpec] = [ +# Pendulum sysid parameters — motor params are LOCKED (not here). +# Order matters: the optimizer maps a flat vector to these specs. +# Defaults are from the URDF exported by Fusion 360. +ROTARY_CARTPOLE_PARAMS: list[ParamSpec] = [ + # ── Arm link (URDF) ────────────────────────────────────────── ParamSpec("arm_mass", 0.02110, 0.005, 0.08, log_scale=True), ParamSpec("arm_com_x", -0.00710, -0.03, 0.03), ParamSpec("arm_com_y", 0.00085, -0.02, 0.02), ParamSpec("arm_com_z", 0.00795, -0.02, 0.02), + # ── Pendulum link (URDF) ───────────────────────────────────── ParamSpec("pendulum_mass", 0.03937, 0.010, 0.10, log_scale=True), ParamSpec("pendulum_com_x", 0.06025, 0.01, 0.15), ParamSpec("pendulum_com_y", -0.07602, -0.20, 0.0), @@ -78,18 +84,37 @@ PENDULUM_PARAMS: list[ParamSpec] = [ ParamSpec("pendulum_iyy", 3.70e-05, 1e-07, 1e-03, log_scale=True), ParamSpec("pendulum_izz", 7.83e-05, 1e-07, 1e-03, log_scale=True), ParamSpec("pendulum_ixy", -6.93e-06, -1e-03, 1e-03), + # ── Pendulum joint dynamics ────────────────────────────────── + ParamSpec("pendulum_damping", 0.0001, 1e-6, 0.05, log_scale=True), + ParamSpec("pendulum_frictionloss", 0.0001, 1e-6, 0.05, log_scale=True), + # ── Hardware realism (control pipeline) ──────────────────── + ParamSpec("ctrl_limit", 0.588, 0.45, 0.70), # MAX_MOTOR_SPEED / 255 +] + + +# Extended set: full params + motor armature (compensates for the +# motor-only sysid having captured arm/pendulum loading in armature). +EXTENDED_PARAMS: list[ParamSpec] = ROTARY_CARTPOLE_PARAMS + [ + ParamSpec("motor_armature", 0.00277, 0.0005, 0.02, log_scale=True), +] + + +# Reduced set: only the 6 most impactful pendulum parameters. +# Good for a fast first pass — converges in ~50 generations. +REDUCED_PARAMS: list[ParamSpec] = [ + ParamSpec("pendulum_mass", 0.03937, 0.010, 0.10, log_scale=True), + ParamSpec("pendulum_com_x", 0.06025, 0.01, 0.15), + ParamSpec("pendulum_com_y", -0.07602, -0.20, 0.0), + ParamSpec("pendulum_ixx", 6.20e-05, 1e-07, 1e-03, log_scale=True), ParamSpec("pendulum_damping", 0.0001, 1e-6, 0.05, log_scale=True), ParamSpec("pendulum_frictionloss", 0.0001, 1e-6, 0.05, log_scale=True), - ParamSpec("ctrl_limit", 0.588, 0.45, 0.70), ] -# ── Combined parameter sets ────────────────────────────────────────── -ROTARY_CARTPOLE_PARAMS: list[ParamSpec] = MOTOR_PARAMS + PENDULUM_PARAMS PARAM_SETS: dict[str, list[ParamSpec]] = { "full": ROTARY_CARTPOLE_PARAMS, - "motor": MOTOR_PARAMS, - "pendulum": PENDULUM_PARAMS, + "extended": EXTENDED_PARAMS, + "reduced": REDUCED_PARAMS, } @@ -109,13 +134,6 @@ def defaults_vector(specs: list[ParamSpec] | None = None) -> np.ndarray: return np.array([s.default for s in specs], dtype=np.float64) -def defaults_dict(specs: list[ParamSpec] | None = None) -> dict[str, float]: - """Return the default parameter dict (all 28 params).""" - if specs is None: - specs = ROTARY_CARTPOLE_PARAMS - return {s.name: s.default for s in specs} - - def bounds_arrays( specs: list[ParamSpec] | None = None, ) -> tuple[np.ndarray, np.ndarray]: @@ -130,23 +148,19 @@ def bounds_arrays( # ── MuJoCo model building with parameter overrides ────────────────── -def _resolve_params(params: dict[str, float]) -> dict[str, float]: - """Fill in defaults for any missing params.""" - full = defaults_dict() - full.update(params) - return full - - def _build_model( robot_path: Path, params: dict[str, float], + motor_params: dict[str, float] | None = None, ) -> tuple[mujoco.MjModel, ActuatorConfig]: """Build a MuJoCo model with sysid overrides. Returns (model, actuator) — use ``actuator.transform_ctrl()`` and ``actuator.compute_motor_force()`` in the rollout loop. """ - p = _resolve_params(params) + if motor_params is None: + motor_params = LOCKED_MOTOR_PARAMS + robot_path = Path(robot_path).resolve() # ── Patch URDF inertial parameters to a temp file ──────────── @@ -154,7 +168,7 @@ def _build_model( urdf_path = robot_path / robot_yaml["urdf"] tree = ET.parse(urdf_path) - patch_link_inertials(tree.getroot(), p) + patch_link_inertials(tree.getroot(), params) fd, tmp_urdf = tempfile.mkstemp( suffix=".urdf", prefix="_sysid_", dir=str(robot_path), @@ -163,22 +177,39 @@ def _build_model( tmp_urdf_path = Path(tmp_urdf) tree.write(str(tmp_urdf_path), xml_declaration=True, encoding="unicode") - # ── Build RobotConfig with full motor model ────────────────── + # ── Build RobotConfig with full motor sysid values ─────────── + gear_pos = motor_params.get("actuator_gear_pos", 0.424182) + gear_neg = motor_params.get("actuator_gear_neg", 0.425031) + motor_armature = params.get( + "motor_armature", + motor_params.get("motor_armature", 0.00277342), + ) + pend_damping = params.get("pendulum_damping", 0.0001) + pend_frictionloss = params.get("pendulum_frictionloss", 0.0001) + act_cfg = robot_yaml["actuators"][0] + ctrl_lo, ctrl_hi = act_cfg.get("ctrl_range", [-1.0, 1.0]) actuator = ActuatorConfig( joint=act_cfg["joint"], type="motor", - gear=(p["actuator_gear_pos"], p["actuator_gear_neg"]), - ctrl_range=(act_cfg.get("ctrl_range", [-1.0, 1.0])[0], - act_cfg.get("ctrl_range", [-1.0, 1.0])[1]), - deadzone=(p["motor_deadzone_pos"], p["motor_deadzone_neg"]), - damping=(p["motor_damping_pos"], p["motor_damping_neg"]), - frictionloss=(p["motor_frictionloss_pos"], p["motor_frictionloss_neg"]), - stribeck_friction_boost=p["stribeck_friction_boost"], - stribeck_vel=p["stribeck_vel"], - action_bias=p["action_bias"], - filter_tau=p["actuator_filter_tau"], + gear=(gear_pos, gear_neg), + ctrl_range=(ctrl_lo, ctrl_hi), + deadzone=( + motor_params.get("motor_deadzone_pos", 0.141), + motor_params.get("motor_deadzone_neg", 0.078), + ), + damping=( + motor_params.get("motor_damping_pos", 0.002), + motor_params.get("motor_damping_neg", 0.015), + ), + frictionloss=( + motor_params.get("motor_frictionloss_pos", 0.057), + motor_params.get("motor_frictionloss_neg", 0.053), + ), + filter_tau=motor_params.get("actuator_filter_tau", 0.005), + viscous_quadratic=motor_params.get("viscous_quadratic", 0.000285), + back_emf_gain=motor_params.get("back_emf_gain", 0.00676), ) robot = RobotConfig( @@ -187,12 +218,12 @@ def _build_model( joints={ "motor_joint": JointConfig( damping=0.0, - armature=p["motor_armature"], + armature=motor_armature, frictionloss=0.0, ), "pendulum_joint": JointConfig( - damping=p["pendulum_damping"], - frictionloss=p["pendulum_frictionloss"], + damping=pend_damping, + frictionloss=pend_frictionloss, ), }, ) @@ -205,136 +236,7 @@ def _build_model( return model, actuator -def build_base_model( - robot_path: str | Path, - params: dict[str, float] | None = None, -) -> tuple[mujoco.MjModel, ActuatorConfig, dict[str, int], dict[str, int]]: - """Build a base MuJoCo model once (with default or given params). - Returns (model, actuator, body_ids, dof_ids) where body_ids/dof_ids - map names to indices for fast in-place patching. - """ - if params is None: - params = defaults_dict() - model, actuator = _build_model(Path(robot_path).resolve(), params) - - body_ids = {} - for name in ("arm", "pendulum"): - body_ids[name] = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_BODY, name) - - dof_ids = {} - for name in ("motor_joint", "pendulum_joint"): - jnt_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_JOINT, name) - dof_ids[name] = model.jnt_dofadr[jnt_id] - - return model, actuator, body_ids, dof_ids - - -def _rotmat_to_quat(R: np.ndarray) -> np.ndarray: - """Convert a 3×3 rotation matrix to a MuJoCo quaternion [w, x, y, z].""" - quat = np.empty(4) - mujoco.mju_mat2Quat(quat, R.flatten()) - return quat - - -def _patch_pendulum_inertia( - model: mujoco.MjModel, - body_id: int, - params: dict[str, float], -) -> None: - """Diagonalize pendulum inertia tensor and write to model.""" - defs = defaults_dict() - - ixx = params.get("pendulum_ixx", defs["pendulum_ixx"]) - iyy = params.get("pendulum_iyy", defs["pendulum_iyy"]) - izz = params.get("pendulum_izz", defs["pendulum_izz"]) - ixy = params.get("pendulum_ixy", defs.get("pendulum_ixy", 0.0)) - - # Build full 3×3 inertia tensor (ixz = iyz = 0 in our URDF). - I = np.array([ - [ixx, ixy, 0.0], - [ixy, iyy, 0.0], - [0.0, 0.0, izz], - ]) - - # Eigendecompose — eigenvalues are principal moments, eigenvectors - # define the rotation from link frame to principal frame. - eigenvalues, eigenvectors = np.linalg.eigh(I) - - # eigh returns ascending order; MuJoCo wants them in the order that - # the eigenvector matrix forms a proper rotation (det = +1). - if np.linalg.det(eigenvectors) < 0: - eigenvectors[:, 0] *= -1 - - # Clamp tiny/negative eigenvalues (numerical noise). - eigenvalues = np.maximum(eigenvalues, 1e-10) - - # Enforce triangle inequality (required for valid rigid body inertia). - # For principal moments sorted ascending (a <= b <= c): a + b >= c. - # When violated, MuJoCo falls back to spherical inertia (trace/3). - # We must match this to keep fast path consistent with URDF loading. - s = np.sort(eigenvalues) - if s[0] + s[1] < s[2]: - sphere = np.sum(eigenvalues) / 3.0 - eigenvalues[:] = sphere - eigenvectors = np.eye(3) - - model.body_inertia[body_id] = eigenvalues - model.body_iquat[body_id] = _rotmat_to_quat(eigenvectors) - - -def patch_model( - model: mujoco.MjModel, - params: dict[str, float], - body_ids: dict[str, int], - dof_ids: dict[str, int], -) -> None: - """Patch a cached MuJoCo model in-place with new sysid parameters. - - Much faster than _build_model: no XML parsing, no temp files, no - model reload. Just direct array writes. - """ - p = _resolve_params(params) - - # ── Arm body ───────────────────────────────────────────────── - arm_id = body_ids["arm"] - model.body_mass[arm_id] = p["arm_mass"] - for i, key in enumerate(("arm_com_x", "arm_com_y", "arm_com_z")): - model.body_ipos[arm_id, i] = p[key] - - # ── Pendulum body ──────────────────────────────────────────── - pend_id = body_ids["pendulum"] - model.body_mass[pend_id] = p["pendulum_mass"] - for i, key in enumerate(("pendulum_com_x", "pendulum_com_y", "pendulum_com_z")): - model.body_ipos[pend_id, i] = p[key] - - _patch_pendulum_inertia(model, pend_id, p) - - # ── Joint dynamics ─────────────────────────────────────────── - motor_dof = dof_ids["motor_joint"] - model.dof_armature[motor_dof] = p["motor_armature"] - - pend_dof = dof_ids["pendulum_joint"] - model.dof_damping[pend_dof] = p["pendulum_damping"] - model.dof_frictionloss[pend_dof] = p["pendulum_frictionloss"] - - -def _make_actuator(params: dict[str, float]) -> ActuatorConfig: - """Build an ActuatorConfig from a params dict (for fast-path use).""" - p = _resolve_params(params) - return ActuatorConfig( - joint="motor_joint", - type="motor", - gear=(p["actuator_gear_pos"], p["actuator_gear_neg"]), - ctrl_range=(-1.0, 1.0), - deadzone=(p["motor_deadzone_pos"], p["motor_deadzone_neg"]), - damping=(p["motor_damping_pos"], p["motor_damping_neg"]), - frictionloss=(p["motor_frictionloss_pos"], p["motor_frictionloss_neg"]), - stribeck_friction_boost=p["stribeck_friction_boost"], - stribeck_vel=p["stribeck_vel"], - action_bias=p["action_bias"], - filter_tau=p["actuator_filter_tau"], - ) # ── Simulation rollout ─────────────────────────────────────────────── @@ -346,30 +248,35 @@ def rollout( actions: np.ndarray, sim_dt: float = 0.002, substeps: int = 10, + motor_params: dict[str, float] | None = None, ) -> dict[str, np.ndarray]: """Replay recorded actions in MuJoCo with overridden parameters. Parameters ---------- robot_path : asset directory - params : named parameter overrides (all motor + pendulum params) + params : named parameter overrides (pendulum/arm only) actions : (N,) normalised actions [-1, 1] from the recording sim_dt : MuJoCo physics timestep substeps : physics substeps per control step + motor_params : locked motor params (default: LOCKED_MOTOR_PARAMS) Returns ------- dict with keys: motor_angle, motor_vel, pendulum_angle, pendulum_vel + Each is an (N,) numpy array of simulated values. """ + if motor_params is None: + motor_params = LOCKED_MOTOR_PARAMS + robot_path = Path(robot_path).resolve() - model, actuator = _build_model(robot_path, params) + model, actuator = _build_model(robot_path, params, motor_params) model.opt.timestep = sim_dt data = mujoco.MjData(model) mujoco.mj_resetData(model, data) n = len(actions) - p = _resolve_params(params) - ctrl_limit = p["ctrl_limit"] + ctrl_limit = params.get("ctrl_limit", 0.588) sim_motor_angle = np.zeros(n, dtype=np.float64) sim_motor_vel = np.zeros(n, dtype=np.float64) @@ -408,27 +315,41 @@ def windowed_rollout( window_duration: float = 0.5, sim_dt: float = 0.002, substeps: int = 10, + motor_params: dict[str, float] | None = None, ) -> dict[str, np.ndarray | float]: """Multiple-shooting rollout — split recording into short windows. + For each window: + 1. Initialize MuJoCo state from the real qpos/qvel at the window start. + 2. Replay the recorded actions within the window. + 3. Record the simulated output. + + Motor dynamics (asymmetric friction, damping, back-EMF, etc.) are + applied via qfrc_applied using the locked motor sysid parameters. + Parameters ---------- robot_path : asset directory - params : named parameter overrides (all motor + pendulum params) + params : named parameter overrides (pendulum/arm only) recording : dict with keys time, action, motor_angle, motor_vel, pendulum_angle, pendulum_vel (all 1D arrays of length N) window_duration : length of each shooting window in seconds sim_dt : MuJoCo physics timestep substeps : physics substeps per control step + motor_params : locked motor params (default: LOCKED_MOTOR_PARAMS) Returns ------- dict with: motor_angle, motor_vel, pendulum_angle, pendulum_vel — (N,) arrays + (stitched from per-window simulations) n_windows — number of windows used """ + if motor_params is None: + motor_params = LOCKED_MOTOR_PARAMS + robot_path = Path(robot_path).resolve() - model, actuator = _build_model(robot_path, params) + model, actuator = _build_model(robot_path, params, motor_params) model.opt.timestep = sim_dt data = mujoco.MjData(model) @@ -457,8 +378,7 @@ def windowed_rollout( window_starts.append(idx) current_t += window_duration - p = _resolve_params(params) - ctrl_limit = p["ctrl_limit"] + ctrl_limit = params.get("ctrl_limit", 0.588) n_windows = len(window_starts) for w, w_start in enumerate(window_starts): diff --git a/src/training/trainer.py b/src/training/trainer.py index 889fbbf..eff294e 100644 --- a/src/training/trainer.py +++ b/src/training/trainer.py @@ -52,6 +52,10 @@ class TrainerConfig: history_length: int = 0 # 0 = disabled, >0 = temporal window size embedding_dim: int = 32 # history encoder output dimension + # RMA (Rapid Motor Adaptation) + rma_mode: str = "none" # "none" | "teacher" | "deploy" + latent_dim: int = 8 # env encoder / adaptation latent dimension + # ── Video-recording trainer ────────────────────────────────────────── @@ -177,10 +181,11 @@ class Trainer: device=device, ) - # Determine raw obs dim (without history augmentation). - raw_obs_dim = 0 - if self.config.history_length > 0: - raw_obs_dim = self.runner.env.observation_space.shape[0] + # Determine raw obs dim (without history/privileged augmentation). + raw_obs_dim = self.runner.env.observation_space.shape[0] + + # Privileged dimension (μ) from the runner, if available. + mu_dim = getattr(self.runner, "privileged_dim", 0) self.model = SharedMLP( observation_space=obs_space, @@ -193,6 +198,9 @@ class Trainer: history_length=self.config.history_length, raw_obs_dim=raw_obs_dim, embedding_dim=self.config.embedding_dim, + rma_mode=self.config.rma_mode, + latent_dim=self.config.latent_dim, + mu_dim=mu_dim, ) models = {"policy": self.model, "value": self.model} diff --git a/tests/test_sim2real.py b/tests/test_sim2real.py index 740dcad..e69de29 100644 --- a/tests/test_sim2real.py +++ b/tests/test_sim2real.py @@ -1,94 +0,0 @@ -"""Unit tests for MuJoCoRunner domain randomization and history buffer.""" - -import dataclasses - -import numpy as np -import pytest -import torch -from gymnasium import spaces - -from src.runners.mujoco import DomainRandConfig, MuJoCoRunnerConfig -from src.models.mlp import SharedMLP, HistoryEncoder - - -class TestDomainRandConfig: - def test_default_all_zero(self) -> None: - cfg = DomainRandConfig() - assert cfg.mass_frac == 0.0 - assert cfg.friction_frac == 0.0 - assert cfg.gear_frac == 0.0 - - def test_from_dict(self) -> None: - d = {"mass_frac": 0.15, "gear_frac": 0.1} - cfg = DomainRandConfig(**d) - assert cfg.mass_frac == 0.15 - assert cfg.gear_frac == 0.1 - assert cfg.damping_frac == 0.0 # not set - - -class TestMuJoCoRunnerConfig: - def test_default_dr_disabled(self) -> None: - cfg = MuJoCoRunnerConfig() - assert isinstance(cfg.domain_rand, DomainRandConfig) - assert cfg.domain_rand.mass_frac == 0.0 - - def test_domain_rand_from_dict(self) -> None: - """Hydra passes nested configs as dicts — test __post_init__ converts.""" - cfg = MuJoCoRunnerConfig( - domain_rand={"mass_frac": 0.2, "friction_frac": 0.3}, # type: ignore[arg-type] - ) - assert isinstance(cfg.domain_rand, DomainRandConfig) - assert cfg.domain_rand.mass_frac == 0.2 - assert cfg.domain_rand.friction_frac == 0.3 - - def test_history_length_default(self) -> None: - cfg = MuJoCoRunnerConfig() - assert cfg.history_length == 0 - - -class TestHistoryEncoder: - def test_output_shape(self) -> None: - enc = HistoryEncoder(history_length=10, step_dim=7, embedding_dim=32) - x = torch.randn(4, 10, 7) # batch=4, H=10, step_dim=7 - out = enc(x) - assert out.shape == (4, 32) - - def test_different_embedding_dim(self) -> None: - enc = HistoryEncoder(history_length=5, step_dim=7, embedding_dim=16) - x = torch.randn(2, 5, 7) - out = enc(x) - assert out.shape == (2, 16) - - -class TestSharedMLPWithHistory: - def test_no_history(self) -> None: - """Without history, model works as before.""" - obs_space = spaces.Box(low=-1.0, high=1.0, shape=(6,)) - act_space = spaces.Box(low=-1.0, high=1.0, shape=(1,)) - model = SharedMLP(obs_space, act_space, torch.device("cpu"), - hidden_sizes=(32, 32)) - assert model.history_encoder is None - inp = {"states": torch.randn(4, 6)} - mean, log_std, _ = model.compute(inp, role="policy") - assert mean.shape == (4, 1) - - def test_with_history(self) -> None: - """With history, model splits obs and encodes history.""" - raw_obs_dim = 6 - act_dim = 1 - H = 10 - step_dim = raw_obs_dim + act_dim # 7 - aug_dim = raw_obs_dim + H * step_dim # 6 + 70 = 76 - - obs_space = spaces.Box(low=-1.0, high=1.0, shape=(aug_dim,)) - act_space = spaces.Box(low=-1.0, high=1.0, shape=(act_dim,)) - model = SharedMLP(obs_space, act_space, torch.device("cpu"), - hidden_sizes=(32, 32), - history_length=H, raw_obs_dim=raw_obs_dim, - embedding_dim=32) - assert model.history_encoder is not None - inp = {"states": torch.randn(4, aug_dim)} - mean, log_std, _ = model.compute(inp, role="policy") - assert mean.shape == (4, act_dim) - value, _ = model.compute(inp, role="value") - assert value.shape == (4, 1)