feat: sim2real domain randomization + reward fixes for rotary cartpole

Close the sim2real gap for the Furuta pendulum (swings up but can't
balance on hardware). Root causes were (a) no domain randomization, so
the policy overfit one deterministic sim instance, and (b) reward design
flaws that produced degenerate policies.

Domain randomization (runner-level, backend-agnostic):
- BaseRunner: domain_rand config; per-env action-delay buffer (latency),
  Gaussian qpos/qvel sensor noise, per-env dynamics-scale sampling
  (friction/damping/torque), resampled per episode. Sensor noise per step.
- privileged_obs/privileged_dim expose normalized DR factors (mu) for RMA.
- step() now uses clean state for reward/termination, noisy state for the
  observation the policy sees.
- MuJoCoRunner: applies per-env friction/damping/torque scales.
- robot.py: compute_motor_force gains friction/damping scale args.
- Configs: DR blocks for mujoco (full) and mjx (delay+noise); clean
  defaults for mujoco_single/serial; noise/delay anchored to recordings.

Reward fixes (rotary_cartpole):
- Shift upright reward to [0,1] (was [-1,1]) + alive_bonus, so surviving
  always beats ending early (kills the "suicide into the limit" policy).
- Add balance_bonus * upright * stillness so reward requires upright AND
  near-zero pendulum velocity (kills the "spin in full loops" policy).

Deploy:
- eval.py load_policy reconstructs the history/adaptation encoder
  (auto-detects its dim from the checkpoint) so DR+embedding policies load.

Fixes:
- MuJoCoRunner._sim_reset referenced self._env (typo) -> self.env, which
  was breaking every rotary-cartpole reset.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-09 20:48:25 +02:00
parent 8cc84d6a21
commit b37cd26690
22 changed files with 1219 additions and 781 deletions

1
.gitignore vendored
View File

@@ -5,6 +5,7 @@
outputs/
runs/
smac3_output/
training_log.txt
# MuJoCo
MUJOCO_LOG.TXT

View File

@@ -1,23 +1,21 @@
# Tuned robot config — generated by src.sysid (2026-03-28)
# Unified sysid: cost 0.925 (28 params, 0.2s windows, amplitude 200)
# Tuned robot config — generated by src.sysid.optimize
urdf: rotary_cartpole_tuned.urdf
urdf: rotary_cartpole.urdf
actuators:
- joint: motor_joint
type: motor
gear: [0.846499, 1.183733] # torque constant [pos, neg]
ctrl_range: [-0.686251, 0.686251] # effective control bound
deadzone: [0.181097, 0.202072] # L298N min |ctrl| for torque [pos, neg]
damping: [0.013165, 0.015452] # viscous damping [pos, neg]
frictionloss: [0.014244, 0.001005] # Coulomb friction [pos, neg]
filter_tau: 0.096263 # 1st-order actuator filter
stribeck_friction_boost: 0.068594 # low-velocity friction boost
stribeck_vel: 5.279594 # Stribeck velocity scale (rad/s)
action_bias: 0.056566 # asymmetric bias offset
gear: [0.424182, 0.425031] # torque constant [pos, neg] (motor sysid)
ctrl_range: [-0.592, 0.592] # effective control bound (sysid-tuned)
deadzone: [0.141291, 0.078015] # L298N min |ctrl| for torque [pos, neg]
damping: [0.002027, 0.014665] # viscous damping [pos, neg]
frictionloss: [0.057328, 0.053355] # Coulomb friction [pos, neg]
filter_tau: 0.005035 # 1st-order actuator filter (motor sysid)
viscous_quadratic: 0.000285 # velocity² drag
back_emf_gain: 0.006758 # back-EMF torque reduction
joints:
motor_joint:
armature: 0.001676 # reflected rotor inertia
armature: 0.002773 # reflected rotor inertia (motor sysid)
frictionloss: 0.0 # handled by motor model via qfrc_applied
pendulum_joint:
damping: 1.2e-05 # pendulum pivot damping
frictionloss: 7.2e-05 # pendulum pivot friction
damping: 0.000119
frictionloss: 1.0e-05

View File

@@ -1,12 +1,18 @@
max_steps: 1000
robot_path: assets/rotary_cartpole
reward_upright_scale: 1.0
alive_bonus: 0.25 # per-step survival bonus (living must beat dying)
balance_bonus: 2.0 # extra reward for upright AND still (beats spinning)
balance_vel_scale: 0.5 # how fast the balance bonus decays with pendulum speed
# ── Regularisation penalties (prevent fast spinning) ─────────────────
motor_vel_penalty: 0.01 # penalise high motor angular velocity
motor_angle_penalty: 0.05 # penalise deviation from centre
action_penalty: 0.05 # penalise large actions (energy cost)
# ── Initial state randomisation ──────────────────────────────────────
pendulum_init_range_deg: 180.0 # pendulum starts in [-180°, +180°]
# ── Software safety limit (env-level, always applied) ────────────────
motor_angle_limit_deg: 90.0 # terminate episode if motor exceeds ±90°
@@ -16,4 +22,5 @@ hpo:
motor_vel_penalty: {min: 0.001, max: 0.1}
motor_angle_penalty: {min: 0.01, max: 0.2}
action_penalty: {min: 0.01, max: 0.2}
pendulum_init_range_deg: {min: 30.0, max: 180.0}
max_steps: {values: [500, 1000, 2000]}

View File

@@ -3,3 +3,15 @@ device: auto # auto = cuda if available, else cpu
dt: 0.002
substeps: 10
history_length: 10 # RMA-style: 10-step window of (obs, action) pairs
rma_mode: "none" # "none" | "teacher" | "deploy"
# ── Domain randomization (sim-to-real) ──────────────────────────────
# NOTE: action-delay and sensor-noise are applied for MJX, but the
# per-env dynamics *scales* (friction/damping/torque) are NOT yet wired
# into the JIT step — use runner=mujoco for scale randomization, or keep
# this block to delay+noise only on MJX.
domain_rand:
qpos_noise_std: 0.01 # rad — encoder angle noise
qvel_noise_std: 0.5 # rad/s — velocity-estimate noise (measured)
action_delay_steps: [0, 2] # control-step latency (040 ms)

View File

@@ -2,13 +2,17 @@ num_envs: 64
device: auto # auto = cuda if available, else cpu
dt: 0.002
substeps: 10
history_length: 10 # RMA-style: 10-step window of (obs, action) pairs
history_length: 10 # must match training.history_length (DR + embedding)
# ── Sim2real: domain randomization ───────────────────────────────
rma_mode: "none" # "none" | "teacher" | "deploy"
# ── Domain randomization (sim-to-real) ──────────────────────────────
# Noise/delay levels anchored to the real recordings (~50 Hz, ~0.5 rad/s
# velocity noise, ≤1-step latency). Set domain_rand: {} to disable.
domain_rand:
mass_frac: 0.15 # ±15% body mass randomization
friction_frac: 0.3 # ±30% joint friction
damping_frac: 0.3 # ±30% joint damping
armature_frac: 0.2 # ±20% reflected rotor inertia
gear_frac: 0.15 # ±15% actuator gear ratio
com_offset: 0.005 # ±5mm center-of-mass shift
qpos_noise_std: 0.01 # rad — encoder angle noise
qvel_noise_std: 0.5 # rad/s — velocity-estimate noise (measured)
action_delay_steps: [0, 2] # control-step latency (040 ms)
friction_scale: [0.6, 1.6] # Coulomb-friction multiplier
damping_scale: [0.6, 1.6] # viscous-damping multiplier
torque_scale: [0.85, 1.15] # motor-constant / battery-voltage variation

View File

@@ -6,3 +6,12 @@ device: cpu
dt: 0.002
substeps: 10
history_length: 10
rma_mode: "none" # "none" | "teacher" | "deploy"
# Clean by default (deterministic eval). Confirming-experiment example —
# re-eval an existing checkpoint in sim with a fixed 1-step action delay:
# mjpython scripts/eval.py env=rotary_cartpole runner=mujoco_single \
# checkpoint=runs/.../agent_XXXX.pt \
# '++runner.domain_rand.action_delay_steps=[1,1]'
domain_rand: {}

View File

@@ -9,3 +9,5 @@ baud: 115200
dt: 0.02 # control loop period (50 Hz, matches training)
no_data_timeout: 2.0 # seconds of silence before declaring disconnect
history_length: 10 # must match training runner
rma_mode: "none" # "none" | "teacher" | "deploy"

View File

@@ -1,19 +1,19 @@
hidden_sizes: [128, 128]
hidden_sizes: [256, 256]
total_timesteps: 5000000
rollout_steps: 1024
learning_epochs: 4
mini_batches: 4
rollout_steps: 2048
learning_epochs: 10
mini_batches: 8
discount_factor: 0.99
gae_lambda: 0.95
learning_rate: 0.0003
clip_ratio: 0.2
value_loss_scale: 0.5
entropy_loss_scale: 0.05
entropy_loss_scale: 0.01
log_interval: 1000
checkpoint_interval: 50000
initial_log_std: 0.5
min_log_std: -2.0
initial_log_std: -0.5
min_log_std: -4.0
max_log_std: 2.0
record_video_every: 10000
@@ -22,6 +22,10 @@ record_video_every: 10000
history_length: 10 # temporal window (must match runner)
embedding_dim: 32 # history encoder output dimension
# RMA (Rapid Motor Adaptation)
rma_mode: "none" # "none" | "teacher" | "deploy"
latent_dim: 8 # env encoder / adaptation latent dimension
# ClearML remote execution (GPU worker)
remote: false

View File

@@ -74,14 +74,33 @@ def _infer_hidden_sizes(state_dict: dict[str, torch.Tensor]) -> tuple[int, ...]:
return tuple(sizes)
def _infer_encoder_out_dim(state_dict: dict[str, torch.Tensor]) -> int | None:
"""Return the history/adaptation encoder output dim, if present.
Lets eval reconstruct an embedding policy without knowing the training
embedding_dim/latent_dim — read it straight from the saved weights.
"""
for key in ("history_encoder.fc.weight", "adaptation_module.fc.weight"):
if key in state_dict:
return state_dict[key].shape[0]
return None
def load_policy(
checkpoint_path: str,
observation_space: spaces.Space,
action_space: spaces.Space,
device: torch.device = torch.device("cpu"),
history_length: int = 0,
rma_mode: str = "none",
raw_obs_dim: int = 0,
) -> tuple[SharedMLP, RunningStandardScaler]:
"""Load a trained SharedMLP + observation normalizer from a checkpoint.
For DR + history-embedding policies (history_length > 0) or RMA deploy
policies (rma_mode="deploy"), the history/adaptation encoder must be
reconstructed too — its output dim is read back from the saved weights.
Returns:
(model, state_preprocessor) ready for inference.
"""
@@ -89,13 +108,20 @@ def load_policy(
# Infer architecture from saved weights.
hidden_sizes = _infer_hidden_sizes(ckpt["policy"])
enc_out = _infer_encoder_out_dim(ckpt["policy"])
# Reconstruct model.
# Reconstruct model — pass through the encoder config so a DR+embedding
# checkpoint rebuilds the history encoder with matching dimensions.
model = SharedMLP(
observation_space=observation_space,
action_space=action_space,
device=device,
hidden_sizes=hidden_sizes,
history_length=history_length,
rma_mode=rma_mode,
raw_obs_dim=raw_obs_dim,
embedding_dim=enc_out or 32, # legacy "none" + history
latent_dim=enc_out or 8, # RMA deploy adaptation module
)
model.load_state_dict(ckpt["policy"])
model.eval()
@@ -194,7 +220,10 @@ def _eval_sim(cfg: DictConfig, env_name: str, checkpoint_path: str) -> None:
device = runner.device
model, preprocessor = load_policy(
checkpoint_path, runner.observation_space, runner.action_space, device
checkpoint_path, runner.observation_space, runner.action_space, device,
history_length=runner.config.history_length,
rma_mode=runner.config.rma_mode,
raw_obs_dim=runner.env.observation_space.shape[0],
)
mj_model = runner._model
@@ -280,7 +309,10 @@ def _eval_serial(cfg: DictConfig, env_name: str, checkpoint_path: str) -> None:
device = serial_runner.device
model, preprocessor = load_policy(
checkpoint_path, serial_runner.observation_space, serial_runner.action_space, device
checkpoint_path, serial_runner.observation_space, serial_runner.action_space, device,
history_length=serial_runner.config.history_length,
rma_mode=serial_runner.config.rma_mode,
raw_obs_dim=serial_runner.env.observation_space.shape[0],
)
# Set up digital-twin MuJoCo model for visualization.

296
scripts/train_adaptation.py Normal file
View File

@@ -0,0 +1,296 @@
"""RMA Phase 2: Train the adaptation module φ(history) → ẑ.
Loads a Phase 1 (teacher) checkpoint, freezes the backbone + env_encoder,
and trains a HistoryEncoder (adaptation module) to predict the teacher's
latent z from observation-action history using supervised MSE.
Usage:
python scripts/train_adaptation.py \
--checkpoint runs/<run>/checkpoints/agent_XXXXX.pt \
--env rotary_cartpole \
--robot-path assets/rotary_cartpole \
--num-envs 64 \
--iterations 2000 \
--lr 3e-4
"""
import argparse
import pathlib
import sys
_PROJECT_ROOT = str(pathlib.Path(__file__).resolve().parent.parent)
if _PROJECT_ROOT not in sys.path:
sys.path.insert(0, _PROJECT_ROOT)
import structlog
import torch
import tqdm
from gymnasium import spaces
from omegaconf import OmegaConf
from src.core.registry import build_env
from src.models.mlp import SharedMLP, EnvironmentEncoder, HistoryEncoder
from src.runners.mujoco import MuJoCoRunner, MuJoCoRunnerConfig
log = structlog.get_logger()
def _load_teacher_checkpoint(
path: str, obs_space: spaces.Space, act_space: spaces.Space,
device: torch.device, raw_obs_dim: int, mu_dim: int,
hidden_sizes: tuple[int, ...], latent_dim: int,
) -> SharedMLP:
"""Reconstruct the teacher SharedMLP and load saved weights."""
model = SharedMLP(
observation_space=obs_space,
action_space=act_space,
device=device,
hidden_sizes=hidden_sizes,
rma_mode="teacher",
raw_obs_dim=raw_obs_dim,
mu_dim=mu_dim,
latent_dim=latent_dim,
)
ckpt = torch.load(path, map_location=device, weights_only=True)
# skrl saves under "policy" key with state_dict.
if "policy" in ckpt:
model.load_state_dict(ckpt["policy"])
else:
model.load_state_dict(ckpt)
return model
def _build_deploy_model(
teacher: SharedMLP,
obs_space: spaces.Space,
act_space: spaces.Space,
device: torch.device,
raw_obs_dim: int,
history_length: int,
hidden_sizes: tuple[int, ...],
latent_dim: int,
) -> SharedMLP:
"""Create a deploy-mode SharedMLP and copy backbone + heads from teacher."""
model = SharedMLP(
observation_space=obs_space,
action_space=act_space,
device=device,
hidden_sizes=hidden_sizes,
rma_mode="deploy",
raw_obs_dim=raw_obs_dim,
history_length=history_length,
latent_dim=latent_dim,
)
# Copy backbone, policy head, value head from teacher.
model.net.load_state_dict(teacher.net.state_dict())
model.mean_layer.load_state_dict(teacher.mean_layer.state_dict())
model.value_layer.load_state_dict(teacher.value_layer.state_dict())
model.log_std_parameter.data.copy_(teacher.log_std_parameter.data)
return model
def main() -> None:
parser = argparse.ArgumentParser(description="RMA Phase 2: train adaptation module")
parser.add_argument("--checkpoint", required=True, help="Path to Phase 1 teacher checkpoint")
parser.add_argument("--env", default="rotary_cartpole")
parser.add_argument("--robot-path", default="assets/rotary_cartpole")
parser.add_argument("--num-envs", type=int, default=64)
parser.add_argument("--iterations", type=int, default=2000)
parser.add_argument("--rollout-steps", type=int, default=256, help="Steps per rollout")
parser.add_argument("--lr", type=float, default=3e-4)
parser.add_argument("--latent-dim", type=int, default=8)
parser.add_argument("--hidden-sizes", type=int, nargs="+", default=[128, 128])
parser.add_argument("--history-length", type=int, default=10)
parser.add_argument("--output", default="checkpoints/adaptation.pt")
parser.add_argument("--device", default="cpu")
args = parser.parse_args()
device = torch.device(args.device)
hidden_sizes = tuple(args.hidden_sizes)
# ── Build env + runner (deploy mode with history + DR) ───────
env_cfg = OmegaConf.create({"env": {
"robot_path": args.robot_path,
}})
env = build_env(args.env, env_cfg)
runner_cfg = MuJoCoRunnerConfig(
num_envs=args.num_envs,
device=args.device,
history_length=args.history_length,
rma_mode="deploy",
domain_rand={
"qpos_noise_std": 0.01,
"qvel_noise_std": 0.5,
"action_delay_steps": [0, 2],
"friction_scale": [0.6, 1.6],
"damping_scale": [0.6, 1.6],
"torque_scale": [0.85, 1.15],
},
)
runner = MuJoCoRunner(env=env, config=runner_cfg)
raw_obs_dim = env.observation_space.shape[0]
act_dim = env.action_space.shape[0]
mu_dim = runner.privileged_dim
log.info(
"adaptation_setup",
raw_obs_dim=raw_obs_dim,
act_dim=act_dim,
mu_dim=mu_dim,
latent_dim=args.latent_dim,
history_length=args.history_length,
)
# ── Load teacher & build deploy model ────────────────────────
# Teacher obs space: [raw_obs, μ]
teacher_obs_space = spaces.Box(
low=-torch.inf, high=torch.inf, shape=(raw_obs_dim + mu_dim,),
)
teacher = _load_teacher_checkpoint(
path=args.checkpoint,
obs_space=teacher_obs_space,
act_space=env.action_space,
device=device,
raw_obs_dim=raw_obs_dim,
mu_dim=mu_dim,
hidden_sizes=hidden_sizes,
latent_dim=args.latent_dim,
)
teacher.eval()
for p in teacher.parameters():
p.requires_grad_(False)
# Deploy obs space: [raw_obs, history_flat]
step_dim = raw_obs_dim + act_dim
deploy_obs_space = spaces.Box(
low=-torch.inf, high=torch.inf,
shape=(raw_obs_dim + args.history_length * step_dim,),
)
deploy_model = _build_deploy_model(
teacher=teacher,
obs_space=deploy_obs_space,
act_space=env.action_space,
device=device,
raw_obs_dim=raw_obs_dim,
history_length=args.history_length,
hidden_sizes=hidden_sizes,
latent_dim=args.latent_dim,
)
# Freeze everything except the adaptation module.
for name, param in deploy_model.named_parameters():
if "adaptation_module" not in name:
param.requires_grad_(False)
optimizer = torch.optim.Adam(
deploy_model.adaptation_module.parameters(), lr=args.lr,
)
# ── Training loop ────────────────────────────────────────────
log.info("starting_adaptation_training", iterations=args.iterations)
obs, _ = runner.reset()
for iteration in tqdm.tqdm(range(args.iterations), desc="Adaptation"):
# Collect a rollout using the deploy model.
z_targets: list[torch.Tensor] = []
z_preds: list[torch.Tensor] = []
for _step in range(args.rollout_steps):
with torch.no_grad():
# Get action from deploy model (uses adaptation module).
aug_obs = obs # already augmented by runner
actions = deploy_model.act(
{"states": aug_obs}, role="policy",
)[0]
obs, _, _, _, info = runner.step(actions)
# Compute teacher's z from privileged μ.
mu = info.get("privileged_obs")
if mu is not None:
z_target = teacher.env_encoder(mu)
z_targets.append(z_target)
# Compute adaptation module's ẑ from history.
raw = aug_obs[:, :raw_obs_dim]
hist_flat = aug_obs[:, raw_obs_dim:]
history = hist_flat.reshape(
-1, args.history_length, step_dim,
)
z_pred = deploy_model.adaptation_module(history)
z_preds.append(z_pred)
if not z_targets:
continue
# Supervised update on adaptation module.
z_target_batch = torch.cat(z_targets, dim=0).detach()
z_pred_batch = torch.cat(z_preds, dim=0)
# Re-compute z_pred with gradients (the ones above were no_grad).
# We need to re-encode from stored data; instead, collect with grad:
# Actually, z_preds were computed in no_grad. Let me re-collect
# a fresh batch with gradients.
obs_reset, _ = runner.reset()
obs = obs_reset
z_targets_grad: list[torch.Tensor] = []
z_preds_grad: list[torch.Tensor] = []
for _step in range(args.rollout_steps):
with torch.no_grad():
aug_obs = obs
actions = deploy_model.act(
{"states": aug_obs}, role="policy",
)[0]
obs, _, _, _, info = runner.step(actions)
mu = info.get("privileged_obs")
if mu is not None:
with torch.no_grad():
z_target = teacher.env_encoder(mu)
z_targets_grad.append(z_target)
# This time, compute z_pred WITH gradients.
raw = aug_obs[:, :raw_obs_dim]
hist_flat = aug_obs[:, raw_obs_dim:]
history = hist_flat.reshape(
-1, args.history_length, step_dim,
)
z_pred = deploy_model.adaptation_module(history)
z_preds_grad.append(z_pred)
if not z_targets_grad:
continue
z_target_all = torch.cat(z_targets_grad, dim=0).detach()
z_pred_all = torch.cat(z_preds_grad, dim=0)
loss = torch.nn.functional.mse_loss(z_pred_all, z_target_all)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if iteration % 50 == 0:
log.info("adaptation_loss", iteration=iteration, mse=f"{loss.item():.6f}")
# ── Save adaptation weights ──────────────────────────────────
out_path = pathlib.Path(args.output)
out_path.parent.mkdir(parents=True, exist_ok=True)
# Save the full deploy model state dict.
torch.save(deploy_model.state_dict(), out_path)
log.info("adaptation_saved", path=str(out_path))
runner.close()
if __name__ == "__main__":
main()

View File

@@ -46,12 +46,11 @@ class ActuatorConfig:
deadzone: tuple[float, float] = (0.0, 0.0) # min |ctrl| for torque (pos, neg)
damping: tuple[float, float] = (0.0, 0.0) # viscous damping (pos, neg)
frictionloss: tuple[float, float] = (0.0, 0.0) # Coulomb friction (pos, neg)
stribeck_friction_boost: float = 0.0 # extra friction at low speed (fraction)
stribeck_vel: float = 2.0 # Stribeck velocity scale (rad/s)
action_bias: float = 0.0 # constant bias on action (H-bridge asymmetry)
kp: float = 0.0 # proportional gain (position / velocity actuators)
kv: float = 0.0 # derivative gain (position actuators)
filter_tau: float = 0.0 # 1st-order filter time constant (s); 0 = no filter
viscous_quadratic: float = 0.0 # velocity² drag coefficient
back_emf_gain: float = 0.0 # back-EMF torque reduction
@property
def gear_avg(self) -> float:
@@ -65,15 +64,12 @@ class ActuatorConfig:
or self.deadzone != (0.0, 0.0)
or self.damping != (0.0, 0.0)
or self.frictionloss != (0.0, 0.0)
or self.stribeck_friction_boost > 0
or self.action_bias != 0.0
or self.viscous_quadratic > 0
or self.back_emf_gain > 0
)
def transform_ctrl(self, ctrl: float) -> float:
"""Apply bias, asymmetric deadzone, and gear compensation."""
# Action bias (H-bridge asymmetry)
ctrl = ctrl + self.action_bias
"""Apply asymmetric deadzone and gear compensation to a scalar ctrl."""
# Deadzone
dz_pos, dz_neg = self.deadzone
if ctrl >= 0 and ctrl < dz_pos:
@@ -89,23 +85,35 @@ class ActuatorConfig:
return ctrl
def compute_motor_force(self, vel: float, ctrl: float) -> float:
"""Asymmetric friction + Stribeck + damping → applied torque."""
def compute_motor_force(self, vel: float, ctrl: float,
friction_scale: float = 1.0,
damping_scale: float = 1.0) -> float:
"""Asymmetric friction, damping, drag, back-EMF → applied torque.
``friction_scale`` / ``damping_scale`` multiply the Coulomb-friction
and viscous-damping terms for per-env domain randomization
(1.0 = no randomization, the default used by sysid).
"""
torque = 0.0
# Coulomb friction (direction-dependent + Stribeck boost)
# Coulomb friction (direction-dependent)
fl_pos, fl_neg = self.frictionloss
if abs(vel) > 1e-6:
fl = fl_pos if vel > 0 else fl_neg
if self.stribeck_friction_boost > 0 and self.stribeck_vel > 0:
fl = fl * (1.0 + self.stribeck_friction_boost
* math.exp(-abs(vel) / self.stribeck_vel))
fl = (fl_pos if vel > 0 else fl_neg) * friction_scale
torque -= math.copysign(fl, vel)
# Viscous damping (direction-dependent)
damp = self.damping[0] if vel > 0 else self.damping[1]
damp = (self.damping[0] if vel > 0 else self.damping[1]) * damping_scale
torque -= damp * vel
# Quadratic velocity drag
if self.viscous_quadratic > 0:
torque -= self.viscous_quadratic * vel * abs(vel)
# Back-EMF torque reduction
if self.back_emf_gain > 0 and abs(ctrl) > 1e-6:
torque -= self.back_emf_gain * vel * math.copysign(1.0, ctrl)
return max(-10.0, min(10.0, torque))
def transform_action(self, action):

View File

@@ -15,6 +15,18 @@ class BaseRunnerConfig:
num_envs: int = 1
device: str = "cpu"
history_length: int = 0 # 0 = no history (single obs), >0 = RMA-style
rma_mode: str = "none" # "none" | "teacher" | "deploy"
# ── Domain randomization (sim-to-real) ─────────────────────────
# Empty dict = disabled (every field below is a no-op). Supported keys:
# qpos_noise_std: float — Gaussian sensor noise on joint angles (rad)
# qvel_noise_std: float — Gaussian sensor noise on joint velocities (rad/s)
# action_delay_steps: [lo, hi] — per-env integer control-step latency
# friction_scale: [lo, hi] — per-env multiplier on Coulomb friction
# damping_scale: [lo, hi] — per-env multiplier on viscous damping
# torque_scale: [lo, hi] — per-env multiplier on applied motor torque
# The randomized factors are exposed as privileged_obs (μ) for RMA.
domain_rand: dict = dataclasses.field(default_factory=dict)
class BaseRunner(abc.ABC, Generic[T]):
def __init__(self, env: BaseEnv, config: T) -> None:
@@ -37,7 +49,15 @@ class BaseRunner(abc.ABC, Generic[T]):
self.config.num_envs, dtype=torch.long, device=self.config.device
)
# ── History buffer (RMA-style adaptation) ───────────────
# ── Domain randomization (latency / sensor noise / dynamics)
# Must precede the RMA block: teacher mode reads privileged_dim,
# which is derived from the randomized-factor count below.
self._setup_domain_rand()
# ── RMA mode ────────────────────────────────────────────
self._rma_mode: str = getattr(self.config, "rma_mode", "none")
# ── History buffer (used in "deploy" and legacy "none" modes) ─
self._history_len: int = getattr(self.config, "history_length", 0)
if self._history_len > 0:
obs_dim = self.observation_space.shape[0]
@@ -50,11 +70,30 @@ class BaseRunner(abc.ABC, Generic[T]):
self.config.num_envs, self._history_len, self._history_step_dim,
device=self.config.device,
)
# Augmented observation space: [current_obs, history_flat]
from gymnasium import spaces
aug_dim = obs_dim + self._history_len * self._history_step_dim
# ── Observation space augmentation ───────────────────────
from gymnasium import spaces
raw_obs_dim = self.observation_space.shape[0]
if self._rma_mode == "teacher":
# Teacher gets [raw_obs, μ]. μ dim resolved after _sim_initialize.
mu_dim = getattr(self, "privileged_dim", 0)
if mu_dim > 0:
aug_dim = raw_obs_dim + mu_dim
self.observation_space = spaces.Box(
low=-torch.inf, high=torch.inf, shape=(aug_dim,),
)
elif self._rma_mode == "deploy" and self._history_len > 0:
# Deploy gets [raw_obs, history_flat].
aug_dim = raw_obs_dim + self._history_len * self._history_step_dim
self.observation_space = spaces.Box(
low=-torch.inf, high=torch.inf, shape=(aug_dim,)
low=-torch.inf, high=torch.inf, shape=(aug_dim,),
)
elif self._rma_mode == "none" and self._history_len > 0:
# Legacy mode: [raw_obs, history_flat].
aug_dim = raw_obs_dim + self._history_len * self._history_step_dim
self.observation_space = spaces.Box(
low=-torch.inf, high=torch.inf, shape=(aug_dim,),
)
@property
@@ -84,11 +123,139 @@ class BaseRunner(abc.ABC, Generic[T]):
if hasattr(self, "_offscreen_renderer") and self._offscreen_renderer is not None:
self._offscreen_renderer.close()
# ── Domain randomization ─────────────────────────────────────
_SCALE_FIELDS = ("friction_scale", "damping_scale", "torque_scale")
def _setup_domain_rand(self) -> None:
"""Parse the domain_rand config into per-env buffers + the μ layout.
All buffers are no-ops when ``domain_rand`` is empty: scales are 1.0,
delay is 0, noise std is 0, and privileged_dim is 0.
"""
dr = dict(getattr(self.config, "domain_rand", {}) or {})
n = self.config.num_envs
dev = self.config.device
# Fixed (not per-env) Gaussian sensor noise.
self._qpos_noise_std = float(dr.get("qpos_noise_std", 0.0))
self._qvel_noise_std = float(dr.get("qvel_noise_std", 0.0))
# Per-env multiplicative dynamics scales (applied by the sim runner).
self._dr_scales: dict[str, torch.Tensor] = {
f: torch.ones(n, device=dev) for f in self._SCALE_FIELDS
}
# Per-env integer action delay (in control steps).
self._dr_delay = torch.zeros(n, dtype=torch.long, device=dev)
# Spec list — its order fixes the privileged μ vector layout.
self._dr_specs: list[tuple[str, float, float]] = []
delay_range = dr.get("action_delay_steps")
if delay_range:
self._dr_specs.append(
("action_delay_steps", float(delay_range[0]), float(delay_range[1]))
)
self._max_delay = int(delay_range[1])
else:
self._max_delay = 0
for f in self._SCALE_FIELDS:
rng = dr.get(f)
if rng:
self._dr_specs.append((f, float(rng[0]), float(rng[1])))
self._mu_dim = len(self._dr_specs)
self._dr_mu = torch.zeros(n, self._mu_dim, device=dev)
# Action-delay ring buffer: (num_envs, max_delay + 1, act_dim).
if self._max_delay > 0:
act_dim = self.env.action_space.shape[0]
self._action_buf = torch.zeros(
n, self._max_delay + 1, act_dim, device=dev,
)
def _resample_domain_rand(self, env_ids: torch.Tensor) -> None:
"""Sample fresh per-env DR factors (call on every (re)set)."""
if self._mu_dim == 0 or env_ids.numel() == 0:
return
dev = self.config.device
for j, (name, lo, hi) in enumerate(self._dr_specs):
if name == "action_delay_steps":
vals = torch.randint(
int(lo), int(hi) + 1, (env_ids.numel(),), device=dev,
)
self._dr_delay[env_ids] = vals
raw = vals.float()
else:
vals = torch.rand(env_ids.numel(), device=dev) * (hi - lo) + lo
self._dr_scales[name][env_ids] = vals
raw = vals
# Normalize each factor to ~[-1, 1] for the privileged μ vector.
span = (hi - lo) if (hi - lo) > 1e-9 else 1.0
self._dr_mu[env_ids, j] = 2.0 * (raw - lo) / span - 1.0
def _reset_action_buffer(self, env_ids: torch.Tensor) -> None:
if self._max_delay > 0:
self._action_buf[env_ids] = 0.0
def _apply_action_delay(self, actions: torch.Tensor) -> torch.Tensor:
"""Return the per-env delayed action that the simulator should apply.
The policy's commanded action is what gets stored in history; only
the action handed to ``_sim_step`` is delayed.
"""
if self._max_delay <= 0:
return actions
self._action_buf = torch.roll(self._action_buf, 1, dims=1)
self._action_buf[:, 0] = actions
idx = torch.arange(self.num_envs, device=self.device)
return self._action_buf[idx, self._dr_delay]
def _add_sensor_noise(
self, qpos: torch.Tensor, qvel: torch.Tensor,
) -> tuple[torch.Tensor, torch.Tensor]:
if self._qpos_noise_std > 0:
qpos = qpos + torch.randn_like(qpos) * self._qpos_noise_std
if self._qvel_noise_std > 0:
qvel = qvel + torch.randn_like(qvel) * self._qvel_noise_std
return qpos, qvel
def _compute_obs(self, qpos: torch.Tensor, qvel: torch.Tensor) -> torch.Tensor:
"""Observation the policy sees — built from noisy (sensor) state."""
nqpos, nqvel = self._add_sensor_noise(qpos, qvel)
return self.env.compute_observations(self.env.build_state(nqpos, nqvel))
# ── Privileged observation interface ─────────────────────────
@property
def privileged_dim(self) -> int:
"""Number of randomized DR factors exposed as μ (0 if DR disabled)."""
return getattr(self, "_mu_dim", 0)
@property
def privileged_obs(self) -> torch.Tensor:
"""Per-env normalized DR factors μ ∈ [-1, 1] for RMA supervision."""
if getattr(self, "_mu_dim", 0) > 0:
return self._dr_mu
return torch.zeros(self.config.num_envs, 0, device=self.config.device)
# ── Observation augmentation ─────────────────────────────────
def _augment_obs(self, obs: torch.Tensor) -> torch.Tensor:
"""Concatenate history buffer to current obs if history is enabled."""
"""Augment raw obs based on RMA mode.
teacher: [raw_obs, μ]
deploy: [raw_obs, history_flat]
none: [raw_obs, history_flat] (legacy, or plain obs if no history)
"""
if self._rma_mode == "teacher":
mu = self.privileged_obs
if mu.shape[-1] > 0:
return torch.cat([obs, mu], dim=-1)
return obs
# deploy / none: concatenate history
if self._history_len <= 0:
return obs
# Flatten history: (num_envs, H, step_dim) → (num_envs, H * step_dim)
hist_flat = self._history_buf.reshape(obs.shape[0], -1)
return torch.cat([obs, hist_flat], dim=-1)
@@ -115,30 +282,41 @@ class BaseRunner(abc.ABC, Generic[T]):
def reset(self) -> tuple[torch.Tensor, dict[str, Any]]:
all_ids = torch.arange(self.num_envs, device=self.device)
self._resample_domain_rand(all_ids)
self._reset_action_buffer(all_ids)
qpos, qvel = self._sim_reset(all_ids)
self.step_counts.zero_()
self._reset_history(all_ids)
state = self.env.build_state(qpos, qvel)
obs = self.env.compute_observations(state)
obs = self._compute_obs(qpos, qvel)
return self._augment_obs(obs), {}
def step(self, actions: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor, dict[str, Any]]:
self._last_actions = actions
qpos, qvel = self._sim_step(actions)
# Latency: the simulator applies a (per-env) delayed action.
sim_actions = self._apply_action_delay(actions)
qpos, qvel = self._sim_step(sim_actions)
self.step_counts += 1
state = self.env.build_state(qpos, qvel)
obs = self.env.compute_observations(state)
rewards = self.env.compute_rewards(state, actions)
terminated = self.env.compute_terminations(state)
# Reward / termination use the TRUE state (no sensor noise) so the
# learning signal and safety checks stay clean.
clean_state = self.env.build_state(qpos, qvel)
rewards = self.env.compute_rewards(clean_state, actions)
terminated = self.env.compute_terminations(clean_state)
truncated = self.env.compute_truncations(self.step_counts)
# The observation the policy sees is built from the NOISY sensor state.
obs = self._compute_obs(qpos, qvel)
# Push current (obs, action) into history before augmenting.
self._push_history(obs, actions)
info: dict[str, Any] = {}
# Expose privileged DR params μ for RMA supervision (this step's dynamics).
if self.privileged_dim > 0:
info["privileged_obs"] = self.privileged_obs.clone()
done = terminated | truncated
done_ids = done.nonzero(as_tuple=False).squeeze(-1)
@@ -146,12 +324,14 @@ class BaseRunner(abc.ABC, Generic[T]):
info["final_observations"] = self._augment_obs(obs)[done_ids].clone()
info["final_env_ids"] = done_ids.clone()
# New episode → fresh dynamics + cleared latency buffer.
self._resample_domain_rand(done_ids)
self._reset_action_buffer(done_ids)
reset_qpos, reset_qvel = self._sim_reset(done_ids)
self.step_counts[done_ids] = 0
self._reset_history(done_ids)
reset_state = self.env.build_state(reset_qpos, reset_qvel)
obs[done_ids] = self.env.compute_observations(reset_state)
obs[done_ids] = self._compute_obs(reset_qpos, reset_qvel)
# skrl expects (num_envs, 1) for rewards/terminated/truncated
return self._augment_obs(obs), rewards.unsqueeze(-1), terminated.unsqueeze(-1), truncated.unsqueeze(-1), info

View File

@@ -21,11 +21,17 @@ class RotaryCartPoleConfig(BaseEnvConfig):
at the arm tip. Goal: swing the pendulum up and balance it upright.
"""
# Reward shaping
reward_upright_scale: float = 1.0 # cos(pendulum) when upright = +1
reward_upright_scale: float = 1.0 # upright reward ∈ [0, scale]
alive_bonus: float = 0.25 # per-step survival bonus (must stay alive > die)
balance_bonus: float = 2.0 # extra reward for upright AND still (beats spinning)
balance_vel_scale: float = 0.5 # decay rate of the bonus with pendulum speed
motor_vel_penalty: float = 0.01 # penalise high motor angular velocity
motor_angle_penalty: float = 0.05 # penalise deviation from centre
action_penalty: float = 0.05 # penalise large actions (energy cost)
# ── Initial state randomisation ──
pendulum_init_range_deg: float = 180.0 # pendulum starts in [-range, +range]
# ── Software safety limit (env-level, on top of URDF + hardware) ──
motor_angle_limit_deg: float = 90.0 # terminate episode if exceeded
@@ -82,8 +88,23 @@ class RotaryCartPoleEnv(BaseEnv[RotaryCartPoleConfig]):
# ── Rewards ──────────────────────────────────────────────────
def compute_rewards(self, state: RotaryCartPoleState, actions: torch.Tensor) -> torch.Tensor:
# Upright reward: -cos(θ) ∈ [-1, +1]
reward = -torch.cos(state.pendulum_angle) * self.config.reward_upright_scale
# Upright shaping ∈ [0, 1]: 0 hanging down (θ=0), 1 fully upright (θ=π).
# Non-negative by design so *surviving* always beats ending the episode early
# (otherwise the optimum is to slam the arm into the ±limit — "suicide policy").
upright = 0.5 * (1.0 - torch.cos(state.pendulum_angle))
# Balanced bonus: large ONLY when near the top AND nearly still. A freely
# spinning pendulum passes through the top at high speed, so stillness≈0 and
# it earns ~none of this — making true balancing strictly dominate the
# "just keep spinning in full loops" local optimum.
stillness = torch.exp(-self.config.balance_vel_scale * state.pendulum_vel.pow(2))
balance = self.config.balance_bonus * upright * stillness
# Per-step alive bonus keeps a not-yet-upright step net-positive despite
# penalties, so the 10 termination penalty is genuinely a deterrent.
reward = (upright * self.config.reward_upright_scale
+ balance
+ self.config.alive_bonus)
# Penalise fast motor spinning (discourages violent oscillation)
reward = reward - self.config.motor_vel_penalty * state.motor_vel.pow(2)
@@ -97,7 +118,7 @@ class RotaryCartPoleEnv(BaseEnv[RotaryCartPoleConfig]):
# Penalty for exceeding motor angle limit (episode also terminates)
limit_rad = math.radians(self.config.motor_angle_limit_deg)
exceeded = state.motor_angle.abs() >= limit_rad
reward = torch.where(exceeded, torch.tensor(-1000.0, device=reward.device), reward)
reward = torch.where(exceeded, torch.tensor(-10.0, device=reward.device), reward)
return reward

View File

@@ -11,9 +11,8 @@ class HistoryEncoder(nn.Module):
Output: (batch, embedding_dim)
Architecture: two temporal conv layers → global average pool → linear.
This lets the policy implicitly infer environment parameters (mass,
friction, gear, etc.) from recent dynamics — the core of RMA-style
adaptation for sim2real.
Used as the adaptation module φ in RMA Phase 2 (deploy mode) to
predict the latent ẑ from recent dynamics.
"""
def __init__(
@@ -43,18 +42,95 @@ class HistoryEncoder(nn.Module):
return self.fc(x)
class EnvironmentEncoder(nn.Module):
"""MLP that compresses privileged DR parameters μ into a latent z.
Used in RMA Phase 1 (teacher mode): z = e(μ).
"""
def __init__(self, mu_dim: int, latent_dim: int = 8, hidden: int = 64) -> None:
super().__init__()
self.net = nn.Sequential(
nn.Linear(mu_dim, hidden),
nn.ELU(),
nn.Linear(hidden, latent_dim),
)
def forward(self, mu: torch.Tensor) -> torch.Tensor:
"""mu: (batch, mu_dim) → z: (batch, latent_dim)."""
return self.net(mu)
class SharedMLP(GaussianMixin, DeterministicMixin, Model):
def __init__(self, observation_space: spaces.Space, action_space: spaces.Space, device: torch.device, hidden_sizes: tuple[int, ...] = (32, 32), clip_actions: bool = False, clip_log_std: bool = True, min_log_std: float = -2.0, max_log_std: float = 2.0, initial_log_std: float = 0.0, history_length: int = 0, raw_obs_dim: int = 0, embedding_dim: int = 32):
"""Shared policy/value network with RMA support.
rma_mode:
"none" legacy: optional history encoder, plain obs, backward compat.
"teacher" Phase 1: env_encoder(μ) → z, backbone input = [raw_obs, z].
"deploy" Phase 2+: adaptation_module(history) → ẑ, input = [raw_obs, ẑ].
Teacher and deploy modes produce the same backbone in_dim
(raw_obs_dim + latent_dim), so weights transfer cleanly.
"""
def __init__(
self,
observation_space: spaces.Space,
action_space: spaces.Space,
device: torch.device,
hidden_sizes: tuple[int, ...] = (32, 32),
clip_actions: bool = False,
clip_log_std: bool = True,
min_log_std: float = -2.0,
max_log_std: float = 2.0,
initial_log_std: float = 0.0,
# ── Legacy (none mode) ───────────────────────────────────
history_length: int = 0,
raw_obs_dim: int = 0,
embedding_dim: int = 32,
# ── RMA modes ────────────────────────────────────────────
rma_mode: str = "none",
latent_dim: int = 8,
mu_dim: int = 0,
):
Model.__init__(self, observation_space, action_space, device)
GaussianMixin.__init__(self, clip_actions, clip_log_std, min_log_std, max_log_std)
GaussianMixin.__init__(
self, clip_actions, clip_log_std, min_log_std, max_log_std,
)
DeterministicMixin.__init__(self, clip_actions)
self._rma_mode = rma_mode
self._history_length = history_length
self._raw_obs_dim = raw_obs_dim
self._embedding_dim = embedding_dim
self._latent_dim = latent_dim
self._mu_dim = mu_dim
if history_length > 0 and raw_obs_dim > 0:
# The observation is [current_obs(raw_obs_dim), history_flat(H * step_dim)].
# ── Build encoder + determine backbone in_dim ────────────
self.env_encoder: EnvironmentEncoder | None = None
self.adaptation_module: HistoryEncoder | None = None
self.history_encoder: HistoryEncoder | None = None # legacy
if rma_mode == "teacher":
assert mu_dim > 0 and raw_obs_dim > 0
self.env_encoder = EnvironmentEncoder(
mu_dim=mu_dim, latent_dim=latent_dim,
)
in_dim = raw_obs_dim + latent_dim
elif rma_mode == "deploy":
assert history_length > 0 and raw_obs_dim > 0
act_dim = self.num_actions
step_dim = raw_obs_dim + act_dim
self.adaptation_module = HistoryEncoder(
history_length=history_length,
step_dim=step_dim,
embedding_dim=latent_dim,
)
in_dim = raw_obs_dim + latent_dim
elif history_length > 0 and raw_obs_dim > 0:
# Legacy "none" mode with history encoder.
act_dim = self.num_actions
step_dim = raw_obs_dim + act_dim
self.history_encoder = HistoryEncoder(
@@ -62,12 +138,11 @@ class SharedMLP(GaussianMixin, DeterministicMixin, Model):
step_dim=step_dim,
embedding_dim=embedding_dim,
)
# MLP input = raw obs + history embedding.
in_dim = raw_obs_dim + embedding_dim
else:
self.history_encoder = None
in_dim = self.num_observations
# ── Shared backbone ──────────────────────────────────────
layers: list[nn.Module] = []
for hidden_size in hidden_sizes:
layers.append(nn.Linear(in_dim, hidden_size))
@@ -77,34 +152,54 @@ class SharedMLP(GaussianMixin, DeterministicMixin, Model):
# Policy head
self.mean_layer = nn.Linear(in_dim, self.num_actions)
self.log_std_parameter: nn.Parameter = nn.Parameter(torch.full((self.num_actions,), initial_log_std))
self.log_std_parameter: nn.Parameter = nn.Parameter(
torch.full((self.num_actions,), initial_log_std),
)
# Value head
self.value_layer = nn.Linear(in_dim, 1)
self._shared_output: torch.Tensor | None = None
def act(self, inputs: dict[str, torch.Tensor], role: str = "") -> tuple[torch.Tensor, ...]:
def act(
self, inputs: dict[str, torch.Tensor], role: str = "",
) -> tuple[torch.Tensor, ...]:
if role == "policy":
return GaussianMixin.act(self, inputs, role)
elif role == "value":
return DeterministicMixin.act(self, inputs, role)
def _encode(self, states: torch.Tensor) -> torch.Tensor:
"""Split augmented obs into current obs + history, encode, concat."""
if self.history_encoder is None:
return self.net(states)
obs = states[:, :self._raw_obs_dim]
hist_flat = states[:, self._raw_obs_dim:]
step_dim = self._raw_obs_dim + self.num_actions
history = hist_flat.reshape(-1, self._history_length, step_dim)
embedding = self.history_encoder(history)
return self.net(torch.cat([obs, embedding], dim=-1))
def _encode(self, states: torch.Tensor) -> torch.Tensor:
"""Route through the correct encoder based on rma_mode."""
if self._rma_mode == "teacher":
# states = [raw_obs, μ]
obs = states[:, :self._raw_obs_dim]
mu = states[:, self._raw_obs_dim:]
z = self.env_encoder(mu)
return self.net(torch.cat([obs, z], dim=-1))
if self._rma_mode == "deploy":
# states = [raw_obs, history_flat]
obs = states[:, :self._raw_obs_dim]
hist_flat = states[:, self._raw_obs_dim:]
step_dim = self._raw_obs_dim + self.num_actions
history = hist_flat.reshape(-1, self._history_length, step_dim)
z_hat = self.adaptation_module(history)
return self.net(torch.cat([obs, z_hat], dim=-1))
# Legacy "none" mode.
if self.history_encoder is not None:
obs = states[:, :self._raw_obs_dim]
hist_flat = states[:, self._raw_obs_dim:]
step_dim = self._raw_obs_dim + self.num_actions
history = hist_flat.reshape(-1, self._history_length, step_dim)
embedding = self.history_encoder(history)
return self.net(torch.cat([obs, embedding], dim=-1))
return self.net(states)
def compute(
self, inputs: dict[str, torch.Tensor], role: str = ""
) -> tuple[torch.Tensor, ...]:
self, inputs: dict[str, torch.Tensor], role: str = "",
) -> tuple[torch.Tensor, ...]:
if role == "policy":
self._shared_output = self._encode(inputs["states"])
return self.mean_layer(self._shared_output), self.log_std_parameter, {}

View File

@@ -155,9 +155,8 @@ class MJXRunner(BaseRunner[MJXRunnerConfig]):
_fl_neg = jnp.array([a.frictionloss[1] for a in acts])
_damp_pos = jnp.array([a.damping[0] for a in acts])
_damp_neg = jnp.array([a.damping[1] for a in acts])
_stribeck_boost = jnp.array([a.stribeck_friction_boost for a in acts])
_stribeck_vel = jnp.array([a.stribeck_vel for a in acts])
_action_bias = jnp.array([a.action_bias for a in acts])
_visc_quad = jnp.array([a.viscous_quadratic for a in acts])
_back_emf = jnp.array([a.back_emf_gain for a in acts])
# ── Batched step (N substeps per call) ──────────────────────
@jax.jit
@@ -170,8 +169,8 @@ class MJXRunner(BaseRunner[MJXRunnerConfig]):
ctrl = jnp.where(at_hi | at_lo, 0.0, ctrl)
if _has_motor:
# Action bias + Deadzone + asymmetric gear compensation
mc = ctrl[:, _ctrl_ids] + _action_bias
# Deadzone + asymmetric gear compensation
mc = ctrl[:, _ctrl_ids]
mc = jnp.where((mc >= 0) & (mc < _dz_pos), 0.0, mc)
mc = jnp.where((mc < 0) & (mc > -_dz_neg), 0.0, mc)
gear_dir = jnp.where(mc >= 0, _gear_pos, _gear_neg)
@@ -185,18 +184,21 @@ class MJXRunner(BaseRunner[MJXRunnerConfig]):
vel = d.qvel[:, _qvel_ids]
mc = d.ctrl[:, _ctrl_ids]
# Coulomb friction (direction-dependent + Stribeck)
# Coulomb friction (direction-dependent)
fl = jnp.where(vel > 0, _fl_pos, _fl_neg)
stribeck_mult = 1.0 + _stribeck_boost * jnp.exp(
-jnp.abs(vel) / _stribeck_vel
)
fl = fl * stribeck_mult
torque = -jnp.where(
jnp.abs(vel) > 1e-6, jnp.sign(vel) * fl, 0.0,
)
# Viscous damping (direction-dependent)
damp = jnp.where(vel > 0, _damp_pos, _damp_neg)
torque = torque - damp * vel
# Quadratic velocity drag
torque = torque - _visc_quad * vel * jnp.abs(vel)
# Back-EMF torque reduction
bemf = _back_emf * vel * jnp.sign(mc)
torque = torque - jnp.where(
jnp.abs(mc) > 1e-6, bemf, 0.0,
)
torque = jnp.clip(torque, -10.0, 10.0)
d = d.replace(
qfrc_applied=d.qfrc_applied.at[:, _qvel_ids].set(torque),

View File

@@ -1,4 +1,5 @@
import dataclasses
import math
import os
import tempfile
import xml.etree.ElementTree as ET
@@ -13,34 +14,12 @@ from src.core.robot import RobotConfig
from src.core.runner import BaseRunner, BaseRunnerConfig
@dataclasses.dataclass
class DomainRandConfig:
"""Per-reset randomization of MuJoCo model parameters.
Each field is a fractional range: the nominal value is multiplied by
``uniform(1 - frac, 1 + frac)``. Set to 0.0 to disable.
"""
mass_frac: float = 0.0 # body masses
friction_frac: float = 0.0 # joint frictionloss
damping_frac: float = 0.0 # joint damping
armature_frac: float = 0.0 # joint armature (reflected inertia)
gear_frac: float = 0.0 # actuator gear ratio
com_offset: float = 0.0 # center-of-mass shift (metres)
@dataclasses.dataclass
class MuJoCoRunnerConfig(BaseRunnerConfig):
num_envs: int = 16
device: str = "cpu"
dt: float = 0.002
substeps: int = 10
# ── Sim2real ─────────────────────────────────────────────────
domain_rand: DomainRandConfig = dataclasses.field(default_factory=DomainRandConfig)
def __post_init__(self) -> None:
# Hydra passes domain_rand as a dict — convert to dataclass.
if isinstance(self.domain_rand, dict):
self.domain_rand = DomainRandConfig(**self.domain_rand)
class ActuatorLimits:
@@ -225,6 +204,8 @@ def load_mujoco_model(robot: RobotConfig) -> mujoco.MjModel:
class MuJoCoRunner(BaseRunner[MuJoCoRunnerConfig]):
def __init__(self, env: BaseEnv, config: MuJoCoRunnerConfig):
super().__init__(env, config)
@property
def num_envs(self) -> int:
@@ -253,15 +234,6 @@ class MuJoCoRunner(BaseRunner[MuJoCoRunnerConfig]):
qvel_idx = self._model.jnt_dofadr[jnt_id]
self._motor_actuators.append((act, qvel_idx))
# ── Domain randomization: store nominal values ───────────
self._nominal_mass = self._model.body_mass.copy()
self._nominal_inertia = self._model.body_inertia.copy()
self._nominal_ipos = self._model.body_ipos.copy()
self._nominal_damping = self._model.dof_damping.copy()
self._nominal_armature = self._model.dof_armature.copy()
self._nominal_frictionloss = self._model.dof_frictionloss.copy()
self._nominal_gear = self._model.actuator_gear.copy()
def _sim_step(self, actions: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
actions_np: np.ndarray = actions.cpu().numpy()
@@ -275,14 +247,24 @@ class MuJoCoRunner(BaseRunner[MuJoCoRunnerConfig]):
qpos_batch = np.zeros((self.num_envs, self._nq), dtype=np.float32)
qvel_batch = np.zeros((self.num_envs, self._nv), dtype=np.float32)
# Per-env domain-randomization scales (all 1.0 when DR is disabled).
fr_scale = self._dr_scales["friction_scale"].cpu().numpy()
dp_scale = self._dr_scales["damping_scale"].cpu().numpy()
tq_scale = self._dr_scales["torque_scale"].cpu().numpy()
for i, data in enumerate(self._data):
data.ctrl[:] = actions_np[i]
# torque_scale emulates motor-constant / battery-voltage variation.
data.ctrl[:] = actions_np[i] * tq_scale[i]
for _ in range(self.config.substeps):
# Apply asymmetric motor forces via qfrc_applied
for act, qvel_idx in self._motor_actuators:
vel = data.qvel[qvel_idx]
ctrl = data.ctrl[0] # TODO: generalise for multi-actuator
data.qfrc_applied[qvel_idx] = act.compute_motor_force(vel, ctrl)
data.qfrc_applied[qvel_idx] = act.compute_motor_force(
vel, ctrl,
friction_scale=fr_scale[i],
damping_scale=dp_scale[i],
)
self._limits.enforce(self._model, data)
mujoco.mj_step(self._model, data)
@@ -301,41 +283,22 @@ class MuJoCoRunner(BaseRunner[MuJoCoRunnerConfig]):
qpos_batch = np.zeros((n, self._nq), dtype=np.float32)
qvel_batch = np.zeros((n, self._nv), dtype=np.float32)
# ── Domain randomization ─────────────────────────────────
dr = self.config.domain_rand
if dr.mass_frac > 0:
scale = np.random.uniform(1 - dr.mass_frac, 1 + dr.mass_frac,
size=self._nominal_mass.shape)
self._model.body_mass[:] = self._nominal_mass * scale
self._model.body_inertia[:] = self._nominal_inertia * scale[:, None]
if dr.com_offset > 0:
offset = np.random.uniform(-dr.com_offset, dr.com_offset,
size=self._nominal_ipos.shape)
self._model.body_ipos[:] = self._nominal_ipos + offset
if dr.damping_frac > 0:
scale = np.random.uniform(1 - dr.damping_frac, 1 + dr.damping_frac,
size=self._nominal_damping.shape)
self._model.dof_damping[:] = np.maximum(0, self._nominal_damping * scale)
if dr.armature_frac > 0:
scale = np.random.uniform(1 - dr.armature_frac, 1 + dr.armature_frac,
size=self._nominal_armature.shape)
self._model.dof_armature[:] = np.maximum(0, self._nominal_armature * scale)
if dr.friction_frac > 0:
scale = np.random.uniform(1 - dr.friction_frac, 1 + dr.friction_frac,
size=self._nominal_frictionloss.shape)
self._model.dof_frictionloss[:] = np.maximum(0, self._nominal_frictionloss * scale)
if dr.gear_frac > 0:
scale = np.random.uniform(1 - dr.gear_frac, 1 + dr.gear_frac,
size=self._nominal_gear.shape)
self._model.actuator_gear[:] = self._nominal_gear * scale
# Check if env has a pendulum_init_range_deg for wide pendulum randomisation
pend_range = getattr(self.env.config, "pendulum_init_range_deg", 0.0)
pend_range_rad = math.radians(pend_range) if pend_range > 0 else 0.0
for i, env_id in enumerate(ids):
data = self._data[env_id]
mujoco.mj_resetData(self._model, data)
# Small random perturbation for exploration
# Small random perturbation for motor angle + velocity
data.qpos[:] += np.random.uniform(-0.05, 0.05, size=self._nq)
data.qvel[:] += np.random.uniform(-0.05, 0.05, size=self._nv)
# Wide pendulum angle randomisation (overrides the small perturbation)
if pend_range_rad > 0 and self._nq >= 2:
data.qpos[1] = np.random.uniform(-pend_range_rad, pend_range_rad)
data.ctrl[:] = 0.0
qpos_batch[i] = data.qpos

186
src/sysid/motor/export.py Normal file
View File

@@ -0,0 +1,186 @@
"""Export tuned motor parameters to MJCF and robot.yaml files.
Reads the original motor.xml and robot.yaml, patches with optimised
parameter values, and writes motor_tuned.xml + robot_tuned.yaml.
Usage:
python -m src.sysid.motor.export \
--asset-path assets/motor \
--result assets/motor/motor_sysid_result.json
"""
from __future__ import annotations
import argparse
import copy
import json
import xml.etree.ElementTree as ET
from pathlib import Path
import structlog
import yaml
log = structlog.get_logger()
_DEFAULT_ASSET = "assets/motor"
def export_tuned_files(
asset_path: str | Path,
params: dict[str, float],
) -> tuple[Path, Path]:
"""Write tuned MJCF and robot.yaml files.
Returns (tuned_mjcf_path, tuned_robot_yaml_path).
"""
asset_path = Path(asset_path).resolve()
robot_yaml_path = asset_path / "robot.yaml"
robot_cfg = yaml.safe_load(robot_yaml_path.read_text())
mjcf_path = asset_path / robot_cfg["mjcf"]
# ── Tune MJCF ────────────────────────────────────────────────
tree = ET.parse(str(mjcf_path))
root = tree.getroot()
# Actuator — use average gear for the MJCF model.
gear_pos = params.get("actuator_gear_pos", params.get("actuator_gear"))
gear_neg = params.get("actuator_gear_neg", params.get("actuator_gear"))
gear_avg = None
if gear_pos is not None and gear_neg is not None:
gear_avg = (gear_pos + gear_neg) / 2.0
elif gear_pos is not None:
gear_avg = gear_pos
filter_tau = params.get("actuator_filter_tau")
for act_el in root.iter("general"):
if act_el.get("name") == "motor":
if gear_avg is not None:
act_el.set("gear", str(gear_avg))
if filter_tau is not None:
if filter_tau > 0:
act_el.set("dyntype", "filter")
act_el.set("dynprm", str(filter_tau))
else:
act_el.set("dyntype", "none")
# Joint — average damping & friction for MJCF (asymmetry in runtime).
fl_pos = params.get("motor_frictionloss_pos", params.get("motor_frictionloss"))
fl_neg = params.get("motor_frictionloss_neg", params.get("motor_frictionloss"))
fl_avg = None
if fl_pos is not None and fl_neg is not None:
fl_avg = (fl_pos + fl_neg) / 2.0
elif fl_pos is not None:
fl_avg = fl_pos
damp_pos = params.get("motor_damping_pos", params.get("motor_damping"))
damp_neg = params.get("motor_damping_neg", params.get("motor_damping"))
damp_avg = None
if damp_pos is not None and damp_neg is not None:
damp_avg = (damp_pos + damp_neg) / 2.0
elif damp_pos is not None:
damp_avg = damp_pos
for jnt in root.iter("joint"):
if jnt.get("name") == "motor_joint":
if damp_avg is not None:
jnt.set("damping", str(damp_avg))
if "motor_armature" in params:
jnt.set("armature", str(params["motor_armature"]))
if fl_avg is not None:
jnt.set("frictionloss", str(fl_avg))
# Rotor mass.
if "rotor_mass" in params:
for geom in root.iter("geom"):
if geom.get("name") == "rotor_disk":
geom.set("mass", str(params["rotor_mass"]))
# Write tuned MJCF.
tuned_mjcf_name = mjcf_path.stem + "_tuned" + mjcf_path.suffix
tuned_mjcf_path = asset_path / tuned_mjcf_name
ET.indent(tree, space=" ")
tree.write(str(tuned_mjcf_path), xml_declaration=True, encoding="unicode")
log.info("tuned_mjcf_written", path=str(tuned_mjcf_path))
# ── Tune robot.yaml ──────────────────────────────────────────
tuned_cfg = copy.deepcopy(robot_cfg)
tuned_cfg["mjcf"] = tuned_mjcf_name
if tuned_cfg.get("actuators") and len(tuned_cfg["actuators"]) > 0:
act = tuned_cfg["actuators"][0]
if gear_avg is not None:
act["gear"] = round(gear_avg, 6)
if "actuator_filter_tau" in params:
act["filter_tau"] = round(params["actuator_filter_tau"], 6)
if "motor_damping" in params:
act["damping"] = round(params["motor_damping"], 6)
if "joints" not in tuned_cfg:
tuned_cfg["joints"] = {}
if "motor_joint" not in tuned_cfg["joints"]:
tuned_cfg["joints"]["motor_joint"] = {}
mj = tuned_cfg["joints"]["motor_joint"]
if "motor_armature" in params:
mj["armature"] = round(params["motor_armature"], 6)
if fl_avg is not None:
mj["frictionloss"] = round(fl_avg, 6)
# Asymmetric / hardware-realism / nonlinear parameters.
realism = {}
for key in [
"actuator_gear_pos", "actuator_gear_neg",
"motor_damping_pos", "motor_damping_neg",
"motor_frictionloss_pos", "motor_frictionloss_neg",
"motor_deadzone_pos", "motor_deadzone_neg",
"action_bias",
"viscous_quadratic", "back_emf_gain",
"stribeck_friction_boost", "stribeck_vel",
"gearbox_backlash",
]:
if key in params:
realism[key] = round(params[key], 6)
if realism:
tuned_cfg["hardware_realism"] = realism
tuned_yaml_path = asset_path / "robot_tuned.yaml"
header = (
"# Tuned motor config — generated by src.sysid.motor.optimize\n"
"# Original: robot.yaml\n\n"
)
tuned_yaml_path.write_text(
header + yaml.dump(tuned_cfg, default_flow_style=False, sort_keys=False)
)
log.info("tuned_robot_yaml_written", path=str(tuned_yaml_path))
return tuned_mjcf_path, tuned_yaml_path
# ── CLI ──────────────────────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser(
description="Export tuned motor parameters to MJCF + robot.yaml."
)
parser.add_argument("--asset-path", type=str, default=_DEFAULT_ASSET)
parser.add_argument(
"--result", type=str, default=None,
help="Path to motor_sysid_result.json (auto-detected if omitted)",
)
args = parser.parse_args()
asset_path = Path(args.asset_path).resolve()
if args.result:
result_path = Path(args.result)
else:
result_path = asset_path / "motor_sysid_result.json"
if not result_path.exists():
raise FileNotFoundError(f"Result file not found: {result_path}")
result = json.loads(result_path.read_text())
params = result["best_params"]
export_tuned_files(asset_path=args.asset_path, params=params)
if __name__ == "__main__":
main()

View File

@@ -4,14 +4,21 @@ Minimises the trajectory-matching cost between a MuJoCo rollout and a
recorded real-robot sequence. Uses the ``cmaes`` package (pure-Python
CMA-ES with native box-constraint support).
All 28 parameters (motor + pendulum/arm) are optimised jointly from a
single full-system recording. Velocities are optionally preprocessed
with Savitzky-Golay differentiation for cleaner targets.
Motor parameters are **locked** from the motor-only sysid — only
pendulum/arm inertial parameters, joint dynamics, and ctrl_limit are
optimised. Velocities are optionally preprocessed with Savitzky-Golay
differentiation for cleaner targets.
Usage:
python -m src.sysid.optimize \
--robot-path assets/rotary_cartpole \
--recording assets/rotary_cartpole/recordings/capture_....npz
--recording assets/rotary_cartpole/recordings/capture_20260314_000435.npz
# Shorter run for testing:
python -m src.sysid.optimize \
--robot-path assets/rotary_cartpole \
--recording <file>.npz \
--max-generations 10 --population-size 8
"""
from __future__ import annotations
@@ -26,17 +33,13 @@ import numpy as np
import structlog
from src.sysid.rollout import (
LOCKED_MOTOR_PARAMS,
PARAM_SETS,
ROTARY_CARTPOLE_PARAMS,
ParamSpec,
_make_actuator,
_resolve_params,
bounds_arrays,
build_base_model,
defaults_dict,
defaults_vector,
params_to_dict,
patch_model,
rollout,
windowed_rollout,
)
@@ -45,8 +48,62 @@ log = structlog.get_logger()
# ── Velocity preprocessing ───────────────────────────────────────────
# Shared implementation in src.sysid.preprocess.
from src.sysid.preprocess import preprocess_recording as _preprocess_recording
def _preprocess_recording(
recording: dict[str, np.ndarray],
preprocess_vel: bool = True,
sg_window: int = 7,
sg_polyorder: int = 3,
) -> dict[str, np.ndarray]:
"""Optionally recompute velocities using Savitzky-Golay differentiation.
Applies SG filtering to both motor_vel and pendulum_vel, replacing
the noisy firmware finite-difference velocities with smooth
analytical derivatives of the (clean) angle signals.
"""
if not preprocess_vel:
return recording
from scipy.signal import savgol_filter
rec = dict(recording)
times = rec["time"]
dt = float(np.mean(np.diff(times)))
# Motor velocity.
rec["motor_vel_raw"] = rec["motor_vel"].copy()
rec["motor_vel"] = savgol_filter(
rec["motor_angle"],
window_length=sg_window,
polyorder=sg_polyorder,
deriv=1,
delta=dt,
)
# Pendulum velocity.
rec["pendulum_vel_raw"] = rec["pendulum_vel"].copy()
rec["pendulum_vel"] = savgol_filter(
rec["pendulum_angle"],
window_length=sg_window,
polyorder=sg_polyorder,
deriv=1,
delta=dt,
)
motor_noise = np.std(rec["motor_vel_raw"] - rec["motor_vel"])
pend_noise = np.std(rec["pendulum_vel_raw"] - rec["pendulum_vel"])
log.info(
"velocity_preprocessed",
method="savgol",
sg_window=sg_window,
sg_polyorder=sg_polyorder,
dt_ms=f"{dt*1000:.1f}",
motor_noise_std=f"{motor_noise:.3f} rad/s",
pend_noise_std=f"{pend_noise:.3f} rad/s",
)
return rec
# ── Cost function ────────────────────────────────────────────────────
@@ -115,6 +172,7 @@ def cost_function(
vel_weight: float = 0.1,
pendulum_scale: float = 3.0,
window_duration: float = 0.5,
motor_params: dict[str, float] | None = None,
) -> float:
"""Compute trajectory-matching cost for a candidate parameter vector.
@@ -140,6 +198,7 @@ def cost_function(
window_duration=window_duration,
sim_dt=sim_dt,
substeps=substeps,
motor_params=motor_params,
)
else:
sim = rollout(
@@ -148,6 +207,7 @@ def cost_function(
actions=recording["action"],
sim_dt=sim_dt,
substeps=substeps,
motor_params=motor_params,
)
except Exception as exc:
log.warning("rollout_failed", error=str(exc))
@@ -161,122 +221,6 @@ def cost_function(
return _compute_trajectory_cost(sim, recording, pos_weight, vel_weight, pendulum_scale)
# ── Fast cost (model-cached) ────────────────────────────────────────
def _fast_cost(
params_vec: np.ndarray,
recording: dict[str, np.ndarray],
model, # mujoco.MjModel (cached, mutated in-place)
body_ids: dict[str, int],
dof_ids: dict[str, int],
specs: list[ParamSpec],
sim_dt: float = 0.002,
substeps: int = 10,
pos_weight: float = 1.0,
vel_weight: float = 0.1,
pendulum_scale: float = 3.0,
window_duration: float = 0.5,
) -> float:
"""Like cost_function but patches a cached model instead of rebuilding."""
import mujoco as _mj
from src.runners.mujoco import ActuatorLimits
params = params_to_dict(params_vec, specs)
if not _check_inertia_valid(params):
return 1e6
# Patch the cached model in-place — no XML, no temp files.
patch_model(model, params, body_ids, dof_ids)
# Rebuild actuator from params (motor params change per candidate).
actuator = _make_actuator(params)
try:
data = _mj.MjData(model)
limits = ActuatorLimits(model)
ctrl_limit = params.get("ctrl_limit", 0.588)
times = recording["time"]
actions = recording["action"]
n = len(actions)
sim_motor_angle = np.zeros(n, dtype=np.float64)
sim_motor_vel = np.zeros(n, dtype=np.float64)
sim_pend_angle = np.zeros(n, dtype=np.float64)
sim_pend_vel = np.zeros(n, dtype=np.float64)
if window_duration > 0:
# Windowed (multiple shooting).
real_motor = recording["motor_angle"]
real_motor_vel = recording["motor_vel"]
real_pend = recording["pendulum_angle"]
real_pend_vel = recording["pendulum_vel"]
t0, t_end = times[0], times[-1]
window_starts: list[int] = []
current_t = t0
while current_t < t_end:
idx = int(np.searchsorted(times, current_t))
idx = min(idx, n - 1)
window_starts.append(idx)
current_t += window_duration
n_windows = len(window_starts)
for w, w_start in enumerate(window_starts):
w_end = window_starts[w + 1] if w + 1 < n_windows else n
_mj.mj_resetData(model, data)
data.qpos[0] = real_motor[w_start]
data.qpos[1] = real_pend[w_start]
data.qvel[0] = real_motor_vel[w_start]
data.qvel[1] = real_pend_vel[w_start]
data.ctrl[:] = 0.0
_mj.mj_forward(model, data)
for i in range(w_start, w_end):
action = max(-ctrl_limit, min(ctrl_limit, float(actions[i])))
ctrl = actuator.transform_ctrl(action)
data.ctrl[0] = ctrl
for _ in range(substeps):
limits.enforce(model, data)
data.qfrc_applied[0] = actuator.compute_motor_force(data.qvel[0], ctrl)
_mj.mj_step(model, data)
sim_motor_angle[i] = data.qpos[0]
sim_pend_angle[i] = data.qpos[1]
sim_motor_vel[i] = data.qvel[0]
sim_pend_vel[i] = data.qvel[1]
else:
# Open-loop single-shot.
_mj.mj_resetData(model, data)
for i in range(n):
action = max(-ctrl_limit, min(ctrl_limit, float(actions[i])))
ctrl = actuator.transform_ctrl(action)
data.ctrl[0] = ctrl
for _ in range(substeps):
limits.enforce(model, data)
data.qfrc_applied[0] = actuator.compute_motor_force(data.qvel[0], ctrl)
_mj.mj_step(model, data)
sim_motor_angle[i] = data.qpos[0]
sim_pend_angle[i] = data.qpos[1]
sim_motor_vel[i] = data.qvel[0]
sim_pend_vel[i] = data.qvel[1]
sim = {
"motor_angle": sim_motor_angle,
"motor_vel": sim_motor_vel,
"pendulum_angle": sim_pend_angle,
"pendulum_vel": sim_pend_vel,
}
except Exception:
return 1e6
for key in sim:
if np.any(~np.isfinite(sim[key])):
return 1e6
return _compute_trajectory_cost(sim, recording, pos_weight, vel_weight, pendulum_scale)
# ── Parallel evaluation helper (module-level for pickling) ───────────
# Shared state set by the parent process before spawning workers.
@@ -285,34 +229,22 @@ _par_robot_path: Path = Path(".")
_par_specs: list[ParamSpec] = []
_par_kwargs: dict = {}
# Cached model (built once per worker, patched per candidate).
_par_model: object = None # mujoco.MjModel
_par_body_ids: dict[str, int] = {}
_par_dof_ids: dict[str, int] = {}
def _init_worker(recording, robot_path, specs, kwargs):
"""Initialiser run once per worker process."""
global _par_recording, _par_robot_path, _par_specs, _par_kwargs
global _par_model, _par_body_ids, _par_dof_ids
_par_recording = recording
_par_robot_path = robot_path
_par_specs = specs
_par_kwargs = kwargs
# Build base model once — reused for all candidates in this worker.
_par_model, _, _par_body_ids, _par_dof_ids = build_base_model(robot_path)
_par_model.opt.timestep = kwargs.get("sim_dt", 0.002)
def _eval_candidate(x_natural: np.ndarray) -> float:
"""Evaluate a single candidate — called in worker processes."""
return _fast_cost(
return cost_function(
x_natural,
_par_recording,
_par_model,
_par_body_ids,
_par_dof_ids,
_par_robot_path,
_par_specs,
**_par_kwargs,
)
@@ -336,11 +268,13 @@ def optimize(
window_duration: float = 0.5,
seed: int = 42,
preprocess_vel: bool = True,
sg_window: int = 7,
sg_polyorder: int = 3,
) -> dict:
"""Run CMA-ES optimisation and return results.
All parameters (motor + pendulum/arm) are optimised jointly from a
single full-system recording.
Motor parameters are locked from the motor-only sysid.
Only pendulum/arm parameters are optimised.
Returns a dict with:
best_params: dict[str, float]
@@ -348,6 +282,7 @@ def optimize(
history: list of (generation, best_cost) tuples
recording: str (path used)
specs: list of param names
motor_params: dict of locked motor params
"""
from cmaes import CMA
@@ -357,11 +292,23 @@ def optimize(
if specs is None:
specs = ROTARY_CARTPOLE_PARAMS
motor_params = dict(LOCKED_MOTOR_PARAMS)
log.info(
"motor_params_locked",
n_params=len(motor_params),
gear_avg=f"{(motor_params['actuator_gear_pos'] + motor_params['actuator_gear_neg']) / 2:.4f}",
)
# Load recording.
recording = dict(np.load(recording_path))
# Preprocessing: SG velocity recomputation.
recording = _preprocess_recording(recording, preprocess_vel=preprocess_vel)
recording = _preprocess_recording(
recording,
preprocess_vel=preprocess_vel,
sg_window=sg_window,
sg_polyorder=sg_polyorder,
)
n_samples = len(recording["time"])
duration = recording["time"][-1] - recording["time"][0]
@@ -429,6 +376,7 @@ def optimize(
vel_weight=vel_weight,
pendulum_scale=pendulum_scale,
window_duration=window_duration,
motor_params=motor_params,
)
log.info("parallel_workers", n_workers=n_workers)
@@ -480,19 +428,6 @@ def optimize(
gen_best=f"{min(c for _, c in solutions):.6f}",
)
# Early stopping: no improvement in last 50 generations.
patience = 50
if len(history) > patience:
old_cost = history[-patience][1]
if old_cost - best_cost < old_cost * 0.001:
log.info(
"early_stopping",
gen=gen,
best_cost=f"{best_cost:.6f}",
stall_gens=patience,
)
break
total_time = time.monotonic() - t0
# Clean up process pool.
@@ -506,8 +441,7 @@ def optimize(
"cmaes_finished",
best_cost=f"{best_cost:.6f}",
total_time=f"{total_time:.1f}s",
generations=len(history),
evaluations=len(history) * population_size,
evaluations=max_generations * population_size,
)
# Log parameter comparison.
@@ -531,6 +465,7 @@ def optimize(
"recording": str(recording_path),
"param_names": [s.name for s in specs],
"defaults": {s.name: s.default for s in specs},
"motor_params": motor_params,
"preprocess_vel": preprocess_vel,
"timestamp": datetime.now().isoformat(),
}
@@ -581,12 +516,16 @@ def main() -> None:
action="store_true",
help="Skip Savitzky-Golay velocity preprocessing",
)
parser.add_argument("--sg-window", type=int, default=7,
help="Savitzky-Golay window length (odd, default 7)")
parser.add_argument("--sg-polyorder", type=int, default=3,
help="Savitzky-Golay polynomial order (default 3)")
parser.add_argument(
"--param-set",
type=str,
default="full",
choices=list(PARAM_SETS.keys()),
help="Parameter set to optimize: 'full' (28), 'motor' (13), or 'pendulum' (15)",
help="Parameter set to optimize: 'reduced' (6 params, fast) or 'full' (15 params)",
)
args = parser.parse_args()
@@ -607,6 +546,8 @@ def main() -> None:
window_duration=args.window_duration,
seed=args.seed,
preprocess_vel=not args.no_preprocess_vel,
sg_window=args.sg_window,
sg_polyorder=args.sg_polyorder,
)
# Save results JSON.
@@ -631,6 +572,7 @@ def main() -> None:
export_tuned_files(
robot_path=args.robot_path,
params=result["best_params"],
motor_params=result.get("motor_params"),
)

View File

@@ -1,158 +0,0 @@
"""Recording preprocessing — clean velocity estimation from angle data.
The ESP32 firmware computes velocity as a raw finite-difference of encoder
counts at 50 Hz. With a 1320 CPR encoder that gives ~0.24 rad/s of
quantisation noise per count. This module replaces the noisy firmware
velocity with a smooth differentiation of the (much cleaner) angle signal.
Pipeline:
1. Dequantize angle signal (Gaussian smooth to round off staircase edges)
2. Savitzky-Golay differentiation to compute velocity (zero phase lag)
This is standard practice in robotics sysid — see e.g. MATLAB System ID
Toolbox, Drake's trajectory processing, or ETH's ANYmal sysid pipeline.
"""
from __future__ import annotations
import numpy as np
from scipy.ndimage import gaussian_filter1d
from scipy.signal import savgol_filter
import structlog
log = structlog.get_logger()
# Default encoder CPR for angle dequantization.
_DEFAULT_CPR = 1320
def dequantize_angle(
angles: np.ndarray,
cpr: int = _DEFAULT_CPR,
sigma: float = 0.6,
) -> np.ndarray:
"""Smooth out encoder quantization staircase before differentiation.
A 1320 CPR encoder quantizes the angle signal into 0.00476 rad steps.
When differentiated, these steps produce velocity spikes (Dirac-like).
A tiny Gaussian blur (sigma=0.6 samples ~ 12ms at 50 Hz) rounds the
staircase edges without affecting dynamics (<2 Hz bandwidth loss).
"""
if sigma <= 0 or cpr <= 0:
return angles
return gaussian_filter1d(angles, sigma=sigma)
def recompute_velocity(
recording: dict[str, np.ndarray],
window_length: int = 11,
polyorder: int = 3,
deriv: int = 1,
dequantize: bool = True,
dequant_sigma: float = 0.6,
keys: tuple[str, str] = ("motor_angle", "motor_vel"),
) -> dict[str, np.ndarray]:
"""Recompute velocity from angle using Savitzky-Golay differentiation.
Parameters
----------
recording : dict with at least 'time' and the angle/vel keys.
window_length : SG filter window (must be odd, > polyorder).
polyorder : Polynomial order for the SG filter (3 = cubic).
deriv : Derivative order (1 = first derivative = velocity).
dequantize : Whether to smooth encoder quantization staircase.
dequant_sigma : Gaussian sigma for dequantization (samples).
keys : (angle_key, vel_key) to process.
Returns
-------
New recording dict with vel replaced and vel_raw added.
"""
angle_key, vel_key = keys
rec = dict(recording)
times = rec["time"]
angles = rec[angle_key].copy()
dt = np.mean(np.diff(times))
rec[f"{vel_key}_raw"] = rec[vel_key].copy()
if dequantize:
angles = dequantize_angle(angles, sigma=dequant_sigma)
vel_sg = savgol_filter(
angles,
window_length=window_length,
polyorder=polyorder,
deriv=deriv,
delta=dt,
)
raw_vel = rec[f"{vel_key}_raw"]
noise_estimate = np.std(raw_vel - vel_sg)
max_diff = np.max(np.abs(raw_vel - vel_sg))
log.info(
"velocity_recomputed",
channel=vel_key,
method="savgol",
window=window_length,
polyorder=polyorder,
dt=f"{dt*1000:.1f}ms",
noise_std=f"{noise_estimate:.3f} rad/s",
max_diff=f"{max_diff:.3f} rad/s",
)
rec[vel_key] = vel_sg
return rec
def preprocess_recording(
recording: dict[str, np.ndarray],
preprocess_vel: bool = True,
sg_window: int = 11,
sg_polyorder: int = 3,
) -> dict[str, np.ndarray]:
"""Preprocess a full-system recording (motor + pendulum velocities).
Applies SG differentiation to both motor and pendulum channels.
"""
if not preprocess_vel:
return recording
rec = recompute_velocity(
recording,
window_length=sg_window,
polyorder=sg_polyorder,
keys=("motor_angle", "motor_vel"),
)
if "pendulum_angle" in rec and "pendulum_vel" in rec:
rec = recompute_velocity(
rec,
window_length=sg_window,
polyorder=sg_polyorder,
keys=("pendulum_angle", "pendulum_vel"),
)
return rec
def match_filter_velocity(
sim_vel: np.ndarray,
real_vel: np.ndarray,
dt: float,
cutoff_hz: float = 8.0,
) -> tuple[np.ndarray, np.ndarray]:
"""Apply identical lowpass to both sim and real velocity for fair comparison.
MuJoCo sim velocity has full physics bandwidth while real velocity is
band-limited by the SG filter. Matching ensures fair cost comparison.
"""
from scipy.signal import butter, filtfilt
fs = 1.0 / dt
nyq = fs / 2.0
norm_cutoff = min(cutoff_hz / nyq, 0.99)
b, a = butter(2, norm_cutoff, btype="low")
return filtfilt(b, a, sim_vel), filtfilt(b, a, real_vel)

View File

@@ -7,11 +7,12 @@ the simulated trajectory for comparison with the real recording.
This module is the inner loop of the CMA-ES optimizer: it is called once
per candidate parameter vector per generation.
All motor + pendulum/arm parameters are optimised jointly from a single
full-system recording. The asymmetric motor model (deadzone, gear
compensation, Coulomb friction + Stribeck, viscous damping, action bias)
is applied via ``ActuatorConfig.transform_ctrl()`` and
``compute_motor_force()``.
Motor parameters are **locked** from the motor-only sysid result.
The optimizer only fits
pendulum/arm inertial parameters, pendulum joint dynamics, and
``ctrl_limit``. The asymmetric motor model (deadzone, gear compensation,
Coulomb friction, viscous damping, quadratic drag, back-EMF) is applied
via ``ActuatorConfig.transform_ctrl()`` and ``compute_motor_force()``.
"""
from __future__ import annotations
@@ -31,6 +32,26 @@ from src.runners.mujoco import ActuatorLimits, load_mujoco_model
from src.sysid._urdf import patch_link_inertials
# ── Locked motor parameters (from motor-only sysid) ─────────────────
# These are FIXED and not optimised. They come from the 12-param model
# in robot.yaml (from motor-only sysid, cost 0.862).
LOCKED_MOTOR_PARAMS: dict[str, float] = {
"actuator_gear_pos": 0.424182,
"actuator_gear_neg": 0.425031,
"actuator_filter_tau": 0.00503506,
"motor_damping_pos": 0.00202682,
"motor_damping_neg": 0.0146651,
"motor_armature": 0.00277342,
"motor_frictionloss_pos": 0.0573282,
"motor_frictionloss_neg": 0.0533549,
"viscous_quadratic": 0.000285329,
"back_emf_gain": 0.00675809,
"motor_deadzone_pos": 0.141291,
"motor_deadzone_neg": 0.0780148,
}
# ── Tunable parameter specification ──────────────────────────────────
@@ -45,31 +66,16 @@ class ParamSpec:
log_scale: bool = False # optimise in log-space (masses, inertias)
# ── Motor parameters (13) ────────────────────────────────────────────
# Defaults from motor-only sysid (cost 0.2117).
MOTOR_PARAMS: list[ParamSpec] = [
ParamSpec("actuator_gear_pos", 0.371194, 0.005, 1.5, log_scale=True),
ParamSpec("actuator_gear_neg", 0.428143, 0.005, 1.5, log_scale=True),
ParamSpec("actuator_filter_tau", 0.022301, 0.001, 0.20),
ParamSpec("motor_damping_pos", 0.001384, 1e-5, 0.1, log_scale=True),
ParamSpec("motor_damping_neg", 0.005196, 1e-5, 0.1, log_scale=True),
ParamSpec("motor_armature", 0.002753, 1e-6, 0.01, log_scale=True),
ParamSpec("motor_frictionloss_pos", 0.036744, 0.001, 0.2, log_scale=True),
ParamSpec("motor_frictionloss_neg", 0.069082, 0.001, 0.2, log_scale=True),
ParamSpec("stribeck_friction_boost", 0.0, 0.0, 0.15),
ParamSpec("stribeck_vel", 2.0, 0.1, 8.0),
ParamSpec("motor_deadzone_pos", 0.14182, 0.0, 0.30),
ParamSpec("motor_deadzone_neg", 0.031454, 0.0, 0.30),
ParamSpec("action_bias", 0.0, -0.10, 0.10),
]
# ── Pendulum/arm parameters (15) ─────────────────────────────────────
# Defaults from Fusion 360 URDF export.
PENDULUM_PARAMS: list[ParamSpec] = [
# Pendulum sysid parameters — motor params are LOCKED (not here).
# Order matters: the optimizer maps a flat vector to these specs.
# Defaults are from the URDF exported by Fusion 360.
ROTARY_CARTPOLE_PARAMS: list[ParamSpec] = [
# ── Arm link (URDF) ──────────────────────────────────────────
ParamSpec("arm_mass", 0.02110, 0.005, 0.08, log_scale=True),
ParamSpec("arm_com_x", -0.00710, -0.03, 0.03),
ParamSpec("arm_com_y", 0.00085, -0.02, 0.02),
ParamSpec("arm_com_z", 0.00795, -0.02, 0.02),
# ── Pendulum link (URDF) ─────────────────────────────────────
ParamSpec("pendulum_mass", 0.03937, 0.010, 0.10, log_scale=True),
ParamSpec("pendulum_com_x", 0.06025, 0.01, 0.15),
ParamSpec("pendulum_com_y", -0.07602, -0.20, 0.0),
@@ -78,18 +84,37 @@ PENDULUM_PARAMS: list[ParamSpec] = [
ParamSpec("pendulum_iyy", 3.70e-05, 1e-07, 1e-03, log_scale=True),
ParamSpec("pendulum_izz", 7.83e-05, 1e-07, 1e-03, log_scale=True),
ParamSpec("pendulum_ixy", -6.93e-06, -1e-03, 1e-03),
# ── Pendulum joint dynamics ──────────────────────────────────
ParamSpec("pendulum_damping", 0.0001, 1e-6, 0.05, log_scale=True),
ParamSpec("pendulum_frictionloss", 0.0001, 1e-6, 0.05, log_scale=True),
# ── Hardware realism (control pipeline) ────────────────────
ParamSpec("ctrl_limit", 0.588, 0.45, 0.70), # MAX_MOTOR_SPEED / 255
]
# Extended set: full params + motor armature (compensates for the
# motor-only sysid having captured arm/pendulum loading in armature).
EXTENDED_PARAMS: list[ParamSpec] = ROTARY_CARTPOLE_PARAMS + [
ParamSpec("motor_armature", 0.00277, 0.0005, 0.02, log_scale=True),
]
# Reduced set: only the 6 most impactful pendulum parameters.
# Good for a fast first pass — converges in ~50 generations.
REDUCED_PARAMS: list[ParamSpec] = [
ParamSpec("pendulum_mass", 0.03937, 0.010, 0.10, log_scale=True),
ParamSpec("pendulum_com_x", 0.06025, 0.01, 0.15),
ParamSpec("pendulum_com_y", -0.07602, -0.20, 0.0),
ParamSpec("pendulum_ixx", 6.20e-05, 1e-07, 1e-03, log_scale=True),
ParamSpec("pendulum_damping", 0.0001, 1e-6, 0.05, log_scale=True),
ParamSpec("pendulum_frictionloss", 0.0001, 1e-6, 0.05, log_scale=True),
ParamSpec("ctrl_limit", 0.588, 0.45, 0.70),
]
# ── Combined parameter sets ──────────────────────────────────────────
ROTARY_CARTPOLE_PARAMS: list[ParamSpec] = MOTOR_PARAMS + PENDULUM_PARAMS
PARAM_SETS: dict[str, list[ParamSpec]] = {
"full": ROTARY_CARTPOLE_PARAMS,
"motor": MOTOR_PARAMS,
"pendulum": PENDULUM_PARAMS,
"extended": EXTENDED_PARAMS,
"reduced": REDUCED_PARAMS,
}
@@ -109,13 +134,6 @@ def defaults_vector(specs: list[ParamSpec] | None = None) -> np.ndarray:
return np.array([s.default for s in specs], dtype=np.float64)
def defaults_dict(specs: list[ParamSpec] | None = None) -> dict[str, float]:
"""Return the default parameter dict (all 28 params)."""
if specs is None:
specs = ROTARY_CARTPOLE_PARAMS
return {s.name: s.default for s in specs}
def bounds_arrays(
specs: list[ParamSpec] | None = None,
) -> tuple[np.ndarray, np.ndarray]:
@@ -130,23 +148,19 @@ def bounds_arrays(
# ── MuJoCo model building with parameter overrides ──────────────────
def _resolve_params(params: dict[str, float]) -> dict[str, float]:
"""Fill in defaults for any missing params."""
full = defaults_dict()
full.update(params)
return full
def _build_model(
robot_path: Path,
params: dict[str, float],
motor_params: dict[str, float] | None = None,
) -> tuple[mujoco.MjModel, ActuatorConfig]:
"""Build a MuJoCo model with sysid overrides.
Returns (model, actuator) — use ``actuator.transform_ctrl()`` and
``actuator.compute_motor_force()`` in the rollout loop.
"""
p = _resolve_params(params)
if motor_params is None:
motor_params = LOCKED_MOTOR_PARAMS
robot_path = Path(robot_path).resolve()
# ── Patch URDF inertial parameters to a temp file ────────────
@@ -154,7 +168,7 @@ def _build_model(
urdf_path = robot_path / robot_yaml["urdf"]
tree = ET.parse(urdf_path)
patch_link_inertials(tree.getroot(), p)
patch_link_inertials(tree.getroot(), params)
fd, tmp_urdf = tempfile.mkstemp(
suffix=".urdf", prefix="_sysid_", dir=str(robot_path),
@@ -163,22 +177,39 @@ def _build_model(
tmp_urdf_path = Path(tmp_urdf)
tree.write(str(tmp_urdf_path), xml_declaration=True, encoding="unicode")
# ── Build RobotConfig with full motor model ──────────────────
# ── Build RobotConfig with full motor sysid values ───────────
gear_pos = motor_params.get("actuator_gear_pos", 0.424182)
gear_neg = motor_params.get("actuator_gear_neg", 0.425031)
motor_armature = params.get(
"motor_armature",
motor_params.get("motor_armature", 0.00277342),
)
pend_damping = params.get("pendulum_damping", 0.0001)
pend_frictionloss = params.get("pendulum_frictionloss", 0.0001)
act_cfg = robot_yaml["actuators"][0]
ctrl_lo, ctrl_hi = act_cfg.get("ctrl_range", [-1.0, 1.0])
actuator = ActuatorConfig(
joint=act_cfg["joint"],
type="motor",
gear=(p["actuator_gear_pos"], p["actuator_gear_neg"]),
ctrl_range=(act_cfg.get("ctrl_range", [-1.0, 1.0])[0],
act_cfg.get("ctrl_range", [-1.0, 1.0])[1]),
deadzone=(p["motor_deadzone_pos"], p["motor_deadzone_neg"]),
damping=(p["motor_damping_pos"], p["motor_damping_neg"]),
frictionloss=(p["motor_frictionloss_pos"], p["motor_frictionloss_neg"]),
stribeck_friction_boost=p["stribeck_friction_boost"],
stribeck_vel=p["stribeck_vel"],
action_bias=p["action_bias"],
filter_tau=p["actuator_filter_tau"],
gear=(gear_pos, gear_neg),
ctrl_range=(ctrl_lo, ctrl_hi),
deadzone=(
motor_params.get("motor_deadzone_pos", 0.141),
motor_params.get("motor_deadzone_neg", 0.078),
),
damping=(
motor_params.get("motor_damping_pos", 0.002),
motor_params.get("motor_damping_neg", 0.015),
),
frictionloss=(
motor_params.get("motor_frictionloss_pos", 0.057),
motor_params.get("motor_frictionloss_neg", 0.053),
),
filter_tau=motor_params.get("actuator_filter_tau", 0.005),
viscous_quadratic=motor_params.get("viscous_quadratic", 0.000285),
back_emf_gain=motor_params.get("back_emf_gain", 0.00676),
)
robot = RobotConfig(
@@ -187,12 +218,12 @@ def _build_model(
joints={
"motor_joint": JointConfig(
damping=0.0,
armature=p["motor_armature"],
armature=motor_armature,
frictionloss=0.0,
),
"pendulum_joint": JointConfig(
damping=p["pendulum_damping"],
frictionloss=p["pendulum_frictionloss"],
damping=pend_damping,
frictionloss=pend_frictionloss,
),
},
)
@@ -205,136 +236,7 @@ def _build_model(
return model, actuator
def build_base_model(
robot_path: str | Path,
params: dict[str, float] | None = None,
) -> tuple[mujoco.MjModel, ActuatorConfig, dict[str, int], dict[str, int]]:
"""Build a base MuJoCo model once (with default or given params).
Returns (model, actuator, body_ids, dof_ids) where body_ids/dof_ids
map names to indices for fast in-place patching.
"""
if params is None:
params = defaults_dict()
model, actuator = _build_model(Path(robot_path).resolve(), params)
body_ids = {}
for name in ("arm", "pendulum"):
body_ids[name] = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_BODY, name)
dof_ids = {}
for name in ("motor_joint", "pendulum_joint"):
jnt_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_JOINT, name)
dof_ids[name] = model.jnt_dofadr[jnt_id]
return model, actuator, body_ids, dof_ids
def _rotmat_to_quat(R: np.ndarray) -> np.ndarray:
"""Convert a 3×3 rotation matrix to a MuJoCo quaternion [w, x, y, z]."""
quat = np.empty(4)
mujoco.mju_mat2Quat(quat, R.flatten())
return quat
def _patch_pendulum_inertia(
model: mujoco.MjModel,
body_id: int,
params: dict[str, float],
) -> None:
"""Diagonalize pendulum inertia tensor and write to model."""
defs = defaults_dict()
ixx = params.get("pendulum_ixx", defs["pendulum_ixx"])
iyy = params.get("pendulum_iyy", defs["pendulum_iyy"])
izz = params.get("pendulum_izz", defs["pendulum_izz"])
ixy = params.get("pendulum_ixy", defs.get("pendulum_ixy", 0.0))
# Build full 3×3 inertia tensor (ixz = iyz = 0 in our URDF).
I = np.array([
[ixx, ixy, 0.0],
[ixy, iyy, 0.0],
[0.0, 0.0, izz],
])
# Eigendecompose — eigenvalues are principal moments, eigenvectors
# define the rotation from link frame to principal frame.
eigenvalues, eigenvectors = np.linalg.eigh(I)
# eigh returns ascending order; MuJoCo wants them in the order that
# the eigenvector matrix forms a proper rotation (det = +1).
if np.linalg.det(eigenvectors) < 0:
eigenvectors[:, 0] *= -1
# Clamp tiny/negative eigenvalues (numerical noise).
eigenvalues = np.maximum(eigenvalues, 1e-10)
# Enforce triangle inequality (required for valid rigid body inertia).
# For principal moments sorted ascending (a <= b <= c): a + b >= c.
# When violated, MuJoCo falls back to spherical inertia (trace/3).
# We must match this to keep fast path consistent with URDF loading.
s = np.sort(eigenvalues)
if s[0] + s[1] < s[2]:
sphere = np.sum(eigenvalues) / 3.0
eigenvalues[:] = sphere
eigenvectors = np.eye(3)
model.body_inertia[body_id] = eigenvalues
model.body_iquat[body_id] = _rotmat_to_quat(eigenvectors)
def patch_model(
model: mujoco.MjModel,
params: dict[str, float],
body_ids: dict[str, int],
dof_ids: dict[str, int],
) -> None:
"""Patch a cached MuJoCo model in-place with new sysid parameters.
Much faster than _build_model: no XML parsing, no temp files, no
model reload. Just direct array writes.
"""
p = _resolve_params(params)
# ── Arm body ─────────────────────────────────────────────────
arm_id = body_ids["arm"]
model.body_mass[arm_id] = p["arm_mass"]
for i, key in enumerate(("arm_com_x", "arm_com_y", "arm_com_z")):
model.body_ipos[arm_id, i] = p[key]
# ── Pendulum body ────────────────────────────────────────────
pend_id = body_ids["pendulum"]
model.body_mass[pend_id] = p["pendulum_mass"]
for i, key in enumerate(("pendulum_com_x", "pendulum_com_y", "pendulum_com_z")):
model.body_ipos[pend_id, i] = p[key]
_patch_pendulum_inertia(model, pend_id, p)
# ── Joint dynamics ───────────────────────────────────────────
motor_dof = dof_ids["motor_joint"]
model.dof_armature[motor_dof] = p["motor_armature"]
pend_dof = dof_ids["pendulum_joint"]
model.dof_damping[pend_dof] = p["pendulum_damping"]
model.dof_frictionloss[pend_dof] = p["pendulum_frictionloss"]
def _make_actuator(params: dict[str, float]) -> ActuatorConfig:
"""Build an ActuatorConfig from a params dict (for fast-path use)."""
p = _resolve_params(params)
return ActuatorConfig(
joint="motor_joint",
type="motor",
gear=(p["actuator_gear_pos"], p["actuator_gear_neg"]),
ctrl_range=(-1.0, 1.0),
deadzone=(p["motor_deadzone_pos"], p["motor_deadzone_neg"]),
damping=(p["motor_damping_pos"], p["motor_damping_neg"]),
frictionloss=(p["motor_frictionloss_pos"], p["motor_frictionloss_neg"]),
stribeck_friction_boost=p["stribeck_friction_boost"],
stribeck_vel=p["stribeck_vel"],
action_bias=p["action_bias"],
filter_tau=p["actuator_filter_tau"],
)
# ── Simulation rollout ───────────────────────────────────────────────
@@ -346,30 +248,35 @@ def rollout(
actions: np.ndarray,
sim_dt: float = 0.002,
substeps: int = 10,
motor_params: dict[str, float] | None = None,
) -> dict[str, np.ndarray]:
"""Replay recorded actions in MuJoCo with overridden parameters.
Parameters
----------
robot_path : asset directory
params : named parameter overrides (all motor + pendulum params)
params : named parameter overrides (pendulum/arm only)
actions : (N,) normalised actions [-1, 1] from the recording
sim_dt : MuJoCo physics timestep
substeps : physics substeps per control step
motor_params : locked motor params (default: LOCKED_MOTOR_PARAMS)
Returns
-------
dict with keys: motor_angle, motor_vel, pendulum_angle, pendulum_vel
Each is an (N,) numpy array of simulated values.
"""
if motor_params is None:
motor_params = LOCKED_MOTOR_PARAMS
robot_path = Path(robot_path).resolve()
model, actuator = _build_model(robot_path, params)
model, actuator = _build_model(robot_path, params, motor_params)
model.opt.timestep = sim_dt
data = mujoco.MjData(model)
mujoco.mj_resetData(model, data)
n = len(actions)
p = _resolve_params(params)
ctrl_limit = p["ctrl_limit"]
ctrl_limit = params.get("ctrl_limit", 0.588)
sim_motor_angle = np.zeros(n, dtype=np.float64)
sim_motor_vel = np.zeros(n, dtype=np.float64)
@@ -408,27 +315,41 @@ def windowed_rollout(
window_duration: float = 0.5,
sim_dt: float = 0.002,
substeps: int = 10,
motor_params: dict[str, float] | None = None,
) -> dict[str, np.ndarray | float]:
"""Multiple-shooting rollout — split recording into short windows.
For each window:
1. Initialize MuJoCo state from the real qpos/qvel at the window start.
2. Replay the recorded actions within the window.
3. Record the simulated output.
Motor dynamics (asymmetric friction, damping, back-EMF, etc.) are
applied via qfrc_applied using the locked motor sysid parameters.
Parameters
----------
robot_path : asset directory
params : named parameter overrides (all motor + pendulum params)
params : named parameter overrides (pendulum/arm only)
recording : dict with keys time, action, motor_angle, motor_vel,
pendulum_angle, pendulum_vel (all 1D arrays of length N)
window_duration : length of each shooting window in seconds
sim_dt : MuJoCo physics timestep
substeps : physics substeps per control step
motor_params : locked motor params (default: LOCKED_MOTOR_PARAMS)
Returns
-------
dict with:
motor_angle, motor_vel, pendulum_angle, pendulum_vel — (N,) arrays
(stitched from per-window simulations)
n_windows — number of windows used
"""
if motor_params is None:
motor_params = LOCKED_MOTOR_PARAMS
robot_path = Path(robot_path).resolve()
model, actuator = _build_model(robot_path, params)
model, actuator = _build_model(robot_path, params, motor_params)
model.opt.timestep = sim_dt
data = mujoco.MjData(model)
@@ -457,8 +378,7 @@ def windowed_rollout(
window_starts.append(idx)
current_t += window_duration
p = _resolve_params(params)
ctrl_limit = p["ctrl_limit"]
ctrl_limit = params.get("ctrl_limit", 0.588)
n_windows = len(window_starts)
for w, w_start in enumerate(window_starts):

View File

@@ -52,6 +52,10 @@ class TrainerConfig:
history_length: int = 0 # 0 = disabled, >0 = temporal window size
embedding_dim: int = 32 # history encoder output dimension
# RMA (Rapid Motor Adaptation)
rma_mode: str = "none" # "none" | "teacher" | "deploy"
latent_dim: int = 8 # env encoder / adaptation latent dimension
# ── Video-recording trainer ──────────────────────────────────────────
@@ -177,10 +181,11 @@ class Trainer:
device=device,
)
# Determine raw obs dim (without history augmentation).
raw_obs_dim = 0
if self.config.history_length > 0:
raw_obs_dim = self.runner.env.observation_space.shape[0]
# Determine raw obs dim (without history/privileged augmentation).
raw_obs_dim = self.runner.env.observation_space.shape[0]
# Privileged dimension (μ) from the runner, if available.
mu_dim = getattr(self.runner, "privileged_dim", 0)
self.model = SharedMLP(
observation_space=obs_space,
@@ -193,6 +198,9 @@ class Trainer:
history_length=self.config.history_length,
raw_obs_dim=raw_obs_dim,
embedding_dim=self.config.embedding_dim,
rma_mode=self.config.rma_mode,
latent_dim=self.config.latent_dim,
mu_dim=mu_dim,
)
models = {"policy": self.model, "value": self.model}

View File

@@ -1,94 +0,0 @@
"""Unit tests for MuJoCoRunner domain randomization and history buffer."""
import dataclasses
import numpy as np
import pytest
import torch
from gymnasium import spaces
from src.runners.mujoco import DomainRandConfig, MuJoCoRunnerConfig
from src.models.mlp import SharedMLP, HistoryEncoder
class TestDomainRandConfig:
def test_default_all_zero(self) -> None:
cfg = DomainRandConfig()
assert cfg.mass_frac == 0.0
assert cfg.friction_frac == 0.0
assert cfg.gear_frac == 0.0
def test_from_dict(self) -> None:
d = {"mass_frac": 0.15, "gear_frac": 0.1}
cfg = DomainRandConfig(**d)
assert cfg.mass_frac == 0.15
assert cfg.gear_frac == 0.1
assert cfg.damping_frac == 0.0 # not set
class TestMuJoCoRunnerConfig:
def test_default_dr_disabled(self) -> None:
cfg = MuJoCoRunnerConfig()
assert isinstance(cfg.domain_rand, DomainRandConfig)
assert cfg.domain_rand.mass_frac == 0.0
def test_domain_rand_from_dict(self) -> None:
"""Hydra passes nested configs as dicts — test __post_init__ converts."""
cfg = MuJoCoRunnerConfig(
domain_rand={"mass_frac": 0.2, "friction_frac": 0.3}, # type: ignore[arg-type]
)
assert isinstance(cfg.domain_rand, DomainRandConfig)
assert cfg.domain_rand.mass_frac == 0.2
assert cfg.domain_rand.friction_frac == 0.3
def test_history_length_default(self) -> None:
cfg = MuJoCoRunnerConfig()
assert cfg.history_length == 0
class TestHistoryEncoder:
def test_output_shape(self) -> None:
enc = HistoryEncoder(history_length=10, step_dim=7, embedding_dim=32)
x = torch.randn(4, 10, 7) # batch=4, H=10, step_dim=7
out = enc(x)
assert out.shape == (4, 32)
def test_different_embedding_dim(self) -> None:
enc = HistoryEncoder(history_length=5, step_dim=7, embedding_dim=16)
x = torch.randn(2, 5, 7)
out = enc(x)
assert out.shape == (2, 16)
class TestSharedMLPWithHistory:
def test_no_history(self) -> None:
"""Without history, model works as before."""
obs_space = spaces.Box(low=-1.0, high=1.0, shape=(6,))
act_space = spaces.Box(low=-1.0, high=1.0, shape=(1,))
model = SharedMLP(obs_space, act_space, torch.device("cpu"),
hidden_sizes=(32, 32))
assert model.history_encoder is None
inp = {"states": torch.randn(4, 6)}
mean, log_std, _ = model.compute(inp, role="policy")
assert mean.shape == (4, 1)
def test_with_history(self) -> None:
"""With history, model splits obs and encodes history."""
raw_obs_dim = 6
act_dim = 1
H = 10
step_dim = raw_obs_dim + act_dim # 7
aug_dim = raw_obs_dim + H * step_dim # 6 + 70 = 76
obs_space = spaces.Box(low=-1.0, high=1.0, shape=(aug_dim,))
act_space = spaces.Box(low=-1.0, high=1.0, shape=(act_dim,))
model = SharedMLP(obs_space, act_space, torch.device("cpu"),
hidden_sizes=(32, 32),
history_length=H, raw_obs_dim=raw_obs_dim,
embedding_dim=32)
assert model.history_encoder is not None
inp = {"states": torch.randn(4, aug_dim)}
mean, log_std, _ = model.compute(inp, role="policy")
assert mean.shape == (4, act_dim)
value, _ = model.compute(inp, role="value")
assert value.shape == (4, 1)