clean up lot of stuff

This commit is contained in:
2026-03-22 15:49:13 +01:00
parent d3ed1c25ad
commit ca0e7b8b03
37 changed files with 3613 additions and 1223 deletions

View File

@@ -0,0 +1,5 @@
"""Motor-only system identification subpackage.
Identifies JGB37-520 DC motor dynamics (no pendulum, no limits)
so the MuJoCo simulation matches the real hardware response.
"""

364
src/sysid/motor/capture.py Normal file
View File

@@ -0,0 +1,364 @@
"""Capture a motor-only trajectory under random excitation (PRBS-style).
Connects to the ESP32 running the simplified sysid firmware (no pendulum,
no limits), sends random PWM commands, and records motor angle + velocity
at ~ 50 Hz.
Firmware serial protocol (115200 baud):
Commands: M<speed>\\n R\\n S\\n G\\n H\\n P\\n
State: S,<millis>,<encoder_count>,<rpm>,<applied_speed>,<enc_vel_cps>\\n
Usage:
python -m src.sysid.motor.capture --duration 20
python -m src.sysid.motor.capture --duration 30 --amplitude 200
"""
from __future__ import annotations
import argparse
import math
import random
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Any
import numpy as np
import structlog
import yaml
log = structlog.get_logger()
# ── Default asset path ───────────────────────────────────────────────
_DEFAULT_ASSET = "assets/motor"
# ── Serial protocol helpers ──────────────────────────────────────────
def _parse_state_line(line: str) -> dict[str, Any] | None:
"""Parse an ``S,…`` state line from the motor sysid firmware.
Format: S,<millis>,<encoder_count>,<rpm>,<applied_speed>,<enc_vel_cps>
"""
if not line.startswith("S,"):
return None
parts = line.split(",")
if len(parts) < 6:
return None
try:
return {
"timestamp_ms": int(parts[1]),
"encoder_count": int(parts[2]),
"rpm": float(parts[3]),
"applied_speed": int(parts[4]),
"enc_vel_cps": float(parts[5]),
}
except (ValueError, IndexError):
return None
# ── Background serial reader ─────────────────────────────────────────
class _SerialReader:
"""Minimal background reader for the ESP32 serial stream."""
def __init__(self, port: str, baud: int = 115200):
import serial as _serial
self._serial_mod = _serial
self.ser = _serial.Serial(port, baud, timeout=0.05)
time.sleep(2) # Wait for ESP32 boot
self.ser.reset_input_buffer()
self._latest: dict[str, Any] = {}
self._seq: int = 0
self._lock = threading.Lock()
self._cond = threading.Condition(self._lock)
self._running = True
self._thread = threading.Thread(target=self._reader_loop, daemon=True)
self._thread.start()
def _reader_loop(self) -> None:
while self._running:
try:
if self.ser.in_waiting:
line = (
self.ser.readline()
.decode("utf-8", errors="ignore")
.strip()
)
parsed = _parse_state_line(line)
if parsed is not None:
with self._cond:
self._latest = parsed
self._seq += 1
self._cond.notify_all()
elif line and not line.startswith("S,"):
# Log non-state lines (READY, PONG, WARN, etc.)
log.debug("serial_info", line=line)
else:
time.sleep(0.001)
except (OSError, self._serial_mod.SerialException):
log.critical("serial_lost")
break
def send(self, cmd: str) -> None:
try:
self.ser.write(f"{cmd}\n".encode())
except (OSError, self._serial_mod.SerialException):
log.critical("serial_send_failed", cmd=cmd)
def read_blocking(self, timeout: float = 0.1) -> dict[str, Any]:
"""Wait until a *new* state line arrives, then return it."""
with self._cond:
seq_before = self._seq
if not self._cond.wait_for(
lambda: self._seq > seq_before, timeout=timeout
):
return {}
return dict(self._latest)
def close(self) -> None:
self._running = False
self.send("H")
self.send("M0")
time.sleep(0.1)
self._thread.join(timeout=1.0)
self.ser.close()
# ── PRBS excitation signal ───────────────────────────────────────────
class _PRBSExcitation:
"""Random hold-value excitation with configurable amplitude and hold time."""
def __init__(
self,
amplitude: int = 200,
hold_min_ms: int = 50,
hold_max_ms: int = 400,
):
self.amplitude = amplitude
self.hold_min_ms = hold_min_ms
self.hold_max_ms = hold_max_ms
self._current: int = 0
self._switch_time: float = 0.0
self._new_value()
def _new_value(self) -> None:
self._current = random.randint(-self.amplitude, self.amplitude)
hold_ms = random.randint(self.hold_min_ms, self.hold_max_ms)
self._switch_time = time.monotonic() + hold_ms / 1000.0
def __call__(self) -> int:
if time.monotonic() >= self._switch_time:
self._new_value()
return self._current
# ── Main capture loop ────────────────────────────────────────────────
def capture(
asset_path: str | Path = _DEFAULT_ASSET,
port: str = "/dev/cu.usbserial-0001",
baud: int = 115200,
duration: float = 20.0,
amplitude: int = 200,
hold_min_ms: int = 50,
hold_max_ms: int = 400,
dt: float = 0.02,
) -> Path:
"""Run motor-only capture and return the path to the saved .npz file.
Stream-driven: blocks on each firmware state line (~50 Hz),
sends next motor command immediately, records both.
No time.sleep pacing — locked to firmware clock.
The recording stores:
- time: wall-clock seconds since start
- action: normalised action = applied_speed / 255
- motor_angle: shaft angle in radians (from encoder)
- motor_vel: shaft velocity in rad/s (from encoder velocity)
"""
asset_path = Path(asset_path).resolve()
# Load hardware config for encoder conversion.
hw_yaml = asset_path / "hardware.yaml"
if not hw_yaml.exists():
raise FileNotFoundError(f"hardware.yaml not found in {asset_path}")
raw_hw = yaml.safe_load(hw_yaml.read_text())
ppr = raw_hw.get("encoder", {}).get("ppr", 11)
gear_ratio = raw_hw.get("encoder", {}).get("gear_ratio", 30.0)
counts_per_rev: float = ppr * gear_ratio * 4.0
max_pwm = raw_hw.get("motor", {}).get("max_pwm", 255)
log.info(
"hardware_config",
ppr=ppr,
gear_ratio=gear_ratio,
counts_per_rev=counts_per_rev,
max_pwm=max_pwm,
)
# Connect.
reader = _SerialReader(port, baud)
excitation = _PRBSExcitation(amplitude, hold_min_ms, hold_max_ms)
# Prepare recording buffers.
max_samples = int(duration / dt) + 500
rec_time = np.zeros(max_samples, dtype=np.float64)
rec_action = np.zeros(max_samples, dtype=np.float64)
rec_motor_angle = np.zeros(max_samples, dtype=np.float64)
rec_motor_vel = np.zeros(max_samples, dtype=np.float64)
# Reset encoder to zero.
reader.send("R")
time.sleep(0.1)
# Start streaming.
reader.send("G")
time.sleep(0.1)
log.info(
"capture_starting",
port=port,
duration=duration,
amplitude=amplitude,
hold_range_ms=f"{hold_min_ms}{hold_max_ms}",
)
idx = 0
pwm = 0
last_esp_ms = -1
t0 = time.monotonic()
try:
while True:
state = reader.read_blocking(timeout=0.1)
if not state:
continue
# Deduplicate by firmware timestamp.
esp_ms = state.get("timestamp_ms", 0)
if esp_ms == last_esp_ms:
continue
last_esp_ms = esp_ms
elapsed = time.monotonic() - t0
if elapsed >= duration:
break
# Generate next excitation PWM.
pwm = excitation()
# Send command.
reader.send(f"M{pwm}")
# Convert encoder to angle/velocity.
enc = state.get("encoder_count", 0)
motor_angle = enc / counts_per_rev * 2.0 * math.pi
motor_vel = (
state.get("enc_vel_cps", 0.0) / counts_per_rev * 2.0 * math.pi
)
# Use firmware-applied speed for the action.
applied = state.get("applied_speed", 0)
action_norm = applied / 255.0
if idx < max_samples:
rec_time[idx] = elapsed
rec_action[idx] = action_norm
rec_motor_angle[idx] = motor_angle
rec_motor_vel[idx] = motor_vel
idx += 1
else:
break
if idx % 50 == 0:
log.info(
"capture_progress",
elapsed=f"{elapsed:.1f}/{duration:.0f}s",
samples=idx,
pwm=pwm,
angle_deg=f"{math.degrees(motor_angle):.1f}",
vel_rps=f"{motor_vel / (2 * math.pi):.1f}",
)
finally:
reader.send("M0")
reader.close()
# Trim.
rec_time = rec_time[:idx]
rec_action = rec_action[:idx]
rec_motor_angle = rec_motor_angle[:idx]
rec_motor_vel = rec_motor_vel[:idx]
# Save.
recordings_dir = asset_path / "recordings"
recordings_dir.mkdir(exist_ok=True)
stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
out_path = recordings_dir / f"motor_capture_{stamp}.npz"
np.savez_compressed(
out_path,
time=rec_time,
action=rec_action,
motor_angle=rec_motor_angle,
motor_vel=rec_motor_vel,
)
log.info(
"capture_saved",
path=str(out_path),
samples=idx,
duration_actual=f"{rec_time[-1]:.2f}s" if idx > 0 else "0s",
)
return out_path
# ── CLI ──────────────────────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser(
description="Capture motor-only trajectory for system identification."
)
parser.add_argument(
"--asset-path", type=str, default=_DEFAULT_ASSET,
help="Path to motor asset directory (contains hardware.yaml)",
)
parser.add_argument(
"--port", type=str, default="/dev/cu.usbserial-0001",
help="Serial port for ESP32",
)
parser.add_argument("--baud", type=int, default=115200)
parser.add_argument("--duration", type=float, default=20.0, help="Capture duration (s)")
parser.add_argument(
"--amplitude", type=int, default=200,
help="Max PWM magnitude (0255, firmware allows full range)",
)
parser.add_argument("--hold-min-ms", type=int, default=50, help="PRBS min hold (ms)")
parser.add_argument("--hold-max-ms", type=int, default=400, help="PRBS max hold (ms)")
parser.add_argument("--dt", type=float, default=0.02, help="Nominal sample period (s)")
args = parser.parse_args()
capture(
asset_path=args.asset_path,
port=args.port,
baud=args.baud,
duration=args.duration,
amplitude=args.amplitude,
hold_min_ms=args.hold_min_ms,
hold_max_ms=args.hold_max_ms,
dt=args.dt,
)
if __name__ == "__main__":
main()

186
src/sysid/motor/export.py Normal file
View File

@@ -0,0 +1,186 @@
"""Export tuned motor parameters to MJCF and robot.yaml files.
Reads the original motor.xml and robot.yaml, patches with optimised
parameter values, and writes motor_tuned.xml + robot_tuned.yaml.
Usage:
python -m src.sysid.motor.export \
--asset-path assets/motor \
--result assets/motor/motor_sysid_result.json
"""
from __future__ import annotations
import argparse
import copy
import json
import xml.etree.ElementTree as ET
from pathlib import Path
import structlog
import yaml
log = structlog.get_logger()
_DEFAULT_ASSET = "assets/motor"
def export_tuned_files(
asset_path: str | Path,
params: dict[str, float],
) -> tuple[Path, Path]:
"""Write tuned MJCF and robot.yaml files.
Returns (tuned_mjcf_path, tuned_robot_yaml_path).
"""
asset_path = Path(asset_path).resolve()
robot_yaml_path = asset_path / "robot.yaml"
robot_cfg = yaml.safe_load(robot_yaml_path.read_text())
mjcf_path = asset_path / robot_cfg["mjcf"]
# ── Tune MJCF ────────────────────────────────────────────────
tree = ET.parse(str(mjcf_path))
root = tree.getroot()
# Actuator — use average gear for the MJCF model.
gear_pos = params.get("actuator_gear_pos", params.get("actuator_gear"))
gear_neg = params.get("actuator_gear_neg", params.get("actuator_gear"))
gear_avg = None
if gear_pos is not None and gear_neg is not None:
gear_avg = (gear_pos + gear_neg) / 2.0
elif gear_pos is not None:
gear_avg = gear_pos
filter_tau = params.get("actuator_filter_tau")
for act_el in root.iter("general"):
if act_el.get("name") == "motor":
if gear_avg is not None:
act_el.set("gear", str(gear_avg))
if filter_tau is not None:
if filter_tau > 0:
act_el.set("dyntype", "filter")
act_el.set("dynprm", str(filter_tau))
else:
act_el.set("dyntype", "none")
# Joint — average damping & friction for MJCF (asymmetry in runtime).
fl_pos = params.get("motor_frictionloss_pos", params.get("motor_frictionloss"))
fl_neg = params.get("motor_frictionloss_neg", params.get("motor_frictionloss"))
fl_avg = None
if fl_pos is not None and fl_neg is not None:
fl_avg = (fl_pos + fl_neg) / 2.0
elif fl_pos is not None:
fl_avg = fl_pos
damp_pos = params.get("motor_damping_pos", params.get("motor_damping"))
damp_neg = params.get("motor_damping_neg", params.get("motor_damping"))
damp_avg = None
if damp_pos is not None and damp_neg is not None:
damp_avg = (damp_pos + damp_neg) / 2.0
elif damp_pos is not None:
damp_avg = damp_pos
for jnt in root.iter("joint"):
if jnt.get("name") == "motor_joint":
if damp_avg is not None:
jnt.set("damping", str(damp_avg))
if "motor_armature" in params:
jnt.set("armature", str(params["motor_armature"]))
if fl_avg is not None:
jnt.set("frictionloss", str(fl_avg))
# Rotor mass.
if "rotor_mass" in params:
for geom in root.iter("geom"):
if geom.get("name") == "rotor_disk":
geom.set("mass", str(params["rotor_mass"]))
# Write tuned MJCF.
tuned_mjcf_name = mjcf_path.stem + "_tuned" + mjcf_path.suffix
tuned_mjcf_path = asset_path / tuned_mjcf_name
ET.indent(tree, space=" ")
tree.write(str(tuned_mjcf_path), xml_declaration=True, encoding="unicode")
log.info("tuned_mjcf_written", path=str(tuned_mjcf_path))
# ── Tune robot.yaml ──────────────────────────────────────────
tuned_cfg = copy.deepcopy(robot_cfg)
tuned_cfg["mjcf"] = tuned_mjcf_name
if tuned_cfg.get("actuators") and len(tuned_cfg["actuators"]) > 0:
act = tuned_cfg["actuators"][0]
if gear_avg is not None:
act["gear"] = round(gear_avg, 6)
if "actuator_filter_tau" in params:
act["filter_tau"] = round(params["actuator_filter_tau"], 6)
if "motor_damping" in params:
act["damping"] = round(params["motor_damping"], 6)
if "joints" not in tuned_cfg:
tuned_cfg["joints"] = {}
if "motor_joint" not in tuned_cfg["joints"]:
tuned_cfg["joints"]["motor_joint"] = {}
mj = tuned_cfg["joints"]["motor_joint"]
if "motor_armature" in params:
mj["armature"] = round(params["motor_armature"], 6)
if fl_avg is not None:
mj["frictionloss"] = round(fl_avg, 6)
# Asymmetric / hardware-realism / nonlinear parameters.
realism = {}
for key in [
"actuator_gear_pos", "actuator_gear_neg",
"motor_damping_pos", "motor_damping_neg",
"motor_frictionloss_pos", "motor_frictionloss_neg",
"motor_deadzone_pos", "motor_deadzone_neg",
"action_bias",
"viscous_quadratic", "back_emf_gain",
"stribeck_friction_boost", "stribeck_vel",
"gearbox_backlash",
]:
if key in params:
realism[key] = round(params[key], 6)
if realism:
tuned_cfg["hardware_realism"] = realism
tuned_yaml_path = asset_path / "robot_tuned.yaml"
header = (
"# Tuned motor config — generated by src.sysid.motor.optimize\n"
"# Original: robot.yaml\n\n"
)
tuned_yaml_path.write_text(
header + yaml.dump(tuned_cfg, default_flow_style=False, sort_keys=False)
)
log.info("tuned_robot_yaml_written", path=str(tuned_yaml_path))
return tuned_mjcf_path, tuned_yaml_path
# ── CLI ──────────────────────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser(
description="Export tuned motor parameters to MJCF + robot.yaml."
)
parser.add_argument("--asset-path", type=str, default=_DEFAULT_ASSET)
parser.add_argument(
"--result", type=str, default=None,
help="Path to motor_sysid_result.json (auto-detected if omitted)",
)
args = parser.parse_args()
asset_path = Path(args.asset_path).resolve()
if args.result:
result_path = Path(args.result)
else:
result_path = asset_path / "motor_sysid_result.json"
if not result_path.exists():
raise FileNotFoundError(f"Result file not found: {result_path}")
result = json.loads(result_path.read_text())
params = result["best_params"]
export_tuned_files(asset_path=args.asset_path, params=params)
if __name__ == "__main__":
main()

367
src/sysid/motor/optimize.py Normal file
View File

@@ -0,0 +1,367 @@
"""CMA-ES optimiser — fit motor simulation parameters to a real recording.
Motor-only version: minimises trajectory-matching cost between MuJoCo
rollout and recorded motor angle + velocity.
Usage:
python -m src.sysid.motor.optimize \
--recording assets/motor/recordings/motor_capture_YYYYMMDD_HHMMSS.npz
# Quick test:
python -m src.sysid.motor.optimize --recording <file>.npz \
--max-generations 20 --population-size 10
"""
from __future__ import annotations
import argparse
import json
import time
from datetime import datetime
from pathlib import Path
import numpy as np
import structlog
from src.sysid.motor.preprocess import recompute_velocity
from src.sysid.motor.rollout import (
MOTOR_PARAMS,
ParamSpec,
bounds_arrays,
defaults_vector,
params_to_dict,
rollout,
windowed_rollout,
)
log = structlog.get_logger()
_DEFAULT_ASSET = "assets/motor"
# ── Cost function ────────────────────────────────────────────────────
def _compute_trajectory_cost(
sim: dict[str, np.ndarray],
recording: dict[str, np.ndarray],
pos_weight: float = 1.0,
vel_weight: float = 0.5,
acc_weight: float = 0.0,
dt: float = 0.02,
) -> float:
"""Weighted MSE between simulated and real motor trajectories.
Motor-only: angle, velocity, and optionally acceleration.
Acceleration is computed as finite-difference of velocity.
"""
angle_err = sim["motor_angle"] - recording["motor_angle"]
vel_err = sim["motor_vel"] - recording["motor_vel"]
# Reject NaN results (unstable simulation).
if np.any(~np.isfinite(angle_err)) or np.any(~np.isfinite(vel_err)):
return 1e6
cost = float(
pos_weight * np.mean(angle_err**2)
+ vel_weight * np.mean(vel_err**2)
)
# Optional acceleration term — penalises wrong dynamics (d(vel)/dt).
if acc_weight > 0 and len(vel_err) > 2:
sim_acc = np.diff(sim["motor_vel"]) / dt
real_acc = np.diff(recording["motor_vel"]) / dt
acc_err = sim_acc - real_acc
if np.any(~np.isfinite(acc_err)):
return 1e6
# Normalise by typical acceleration scale (~50 rad/s²) to keep
# the weight intuitive relative to vel/pos terms.
cost += acc_weight * np.mean(acc_err**2) / (50.0**2)
return cost
def cost_function(
params_vec: np.ndarray,
recording: dict[str, np.ndarray],
asset_path: Path,
specs: list[ParamSpec],
sim_dt: float = 0.002,
substeps: int = 10,
pos_weight: float = 1.0,
vel_weight: float = 0.5,
acc_weight: float = 0.0,
window_duration: float = 0.5,
) -> float:
"""Compute trajectory-matching cost for a candidate parameter vector.
Uses multiple-shooting (windowed rollout) by default.
"""
params = params_to_dict(params_vec, specs)
try:
if window_duration > 0:
sim = windowed_rollout(
asset_path=asset_path,
params=params,
recording=recording,
window_duration=window_duration,
sim_dt=sim_dt,
substeps=substeps,
)
else:
sim = rollout(
asset_path=asset_path,
params=params,
actions=recording["action"],
sim_dt=sim_dt,
substeps=substeps,
)
except Exception as exc:
log.warning("rollout_failed", error=str(exc))
return 1e6
return _compute_trajectory_cost(
sim, recording, pos_weight, vel_weight, acc_weight,
dt=np.mean(np.diff(recording["time"])) if len(recording["time"]) > 1 else 0.02,
)
# ── CMA-ES optimisation loop ────────────────────────────────────────
def optimize(
asset_path: str | Path = _DEFAULT_ASSET,
recording_path: str | Path = "",
specs: list[ParamSpec] | None = None,
sigma0: float = 0.3,
population_size: int = 30,
max_generations: int = 300,
sim_dt: float = 0.002,
substeps: int = 10,
pos_weight: float = 1.0,
vel_weight: float = 0.5,
acc_weight: float = 0.0,
window_duration: float = 0.5,
seed: int = 42,
preprocess_vel: bool = True,
sg_window: int = 7,
sg_polyorder: int = 3,
) -> dict:
"""Run CMA-ES optimisation and return results dict."""
from cmaes import CMA
asset_path = Path(asset_path).resolve()
recording_path = Path(recording_path).resolve()
if specs is None:
specs = MOTOR_PARAMS
# Load recording.
recording = dict(np.load(recording_path))
n_samples = len(recording["time"])
duration = recording["time"][-1] - recording["time"][0]
n_windows = max(1, int(duration / window_duration)) if window_duration > 0 else 1
log.info(
"recording_loaded",
path=str(recording_path),
samples=n_samples,
duration=f"{duration:.1f}s",
n_windows=n_windows,
)
# Preprocess velocity: replace noisy firmware finite-difference with
# smooth Savitzky-Golay derivative of the angle signal.
if preprocess_vel:
recording = recompute_velocity(
recording,
window_length=sg_window,
polyorder=sg_polyorder,
)
# Normalise to [0, 1] for CMA-ES.
lo, hi = bounds_arrays(specs)
x0 = defaults_vector(specs)
span = hi - lo
span[span == 0] = 1.0
def to_normed(x: np.ndarray) -> np.ndarray:
return (x - lo) / span
def from_normed(x_n: np.ndarray) -> np.ndarray:
return x_n * span + lo
x0_normed = to_normed(x0)
bounds_normed = np.column_stack(
[np.zeros(len(specs)), np.ones(len(specs))]
)
optimizer = CMA(
mean=x0_normed,
sigma=sigma0,
bounds=bounds_normed,
population_size=population_size,
seed=seed,
)
best_cost = float("inf")
best_params_vec = x0.copy()
history: list[tuple[int, float]] = []
log.info(
"cmaes_starting",
n_params=len(specs),
population=population_size,
max_gens=max_generations,
sigma0=sigma0,
)
t0 = time.monotonic()
for gen in range(max_generations):
solutions = []
for _ in range(optimizer.population_size):
x_normed = optimizer.ask()
x_natural = from_normed(x_normed)
x_natural = np.clip(x_natural, lo, hi)
c = cost_function(
x_natural,
recording,
asset_path,
specs,
sim_dt=sim_dt,
substeps=substeps,
pos_weight=pos_weight,
vel_weight=vel_weight,
acc_weight=acc_weight,
window_duration=window_duration,
)
solutions.append((x_normed, c))
if c < best_cost:
best_cost = c
best_params_vec = x_natural.copy()
optimizer.tell(solutions)
history.append((gen, best_cost))
elapsed = time.monotonic() - t0
if gen % 5 == 0 or gen == max_generations - 1:
log.info(
"cmaes_generation",
gen=gen,
best_cost=f"{best_cost:.6f}",
elapsed=f"{elapsed:.1f}s",
gen_best=f"{min(c for _, c in solutions):.6f}",
)
total_time = time.monotonic() - t0
best_params = params_to_dict(best_params_vec, specs)
log.info(
"cmaes_finished",
best_cost=f"{best_cost:.6f}",
total_time=f"{total_time:.1f}s",
evaluations=max_generations * population_size,
)
# Log parameter comparison.
defaults = params_to_dict(defaults_vector(specs), specs)
for name in best_params:
d = defaults[name]
b = best_params[name]
change_pct = ((b - d) / abs(d) * 100) if abs(d) > 1e-12 else 0.0
log.info(
"param_result",
name=name,
default=f"{d:.6g}",
tuned=f"{b:.6g}",
change=f"{change_pct:+.1f}%",
)
return {
"best_params": best_params,
"best_cost": best_cost,
"history": history,
"recording": str(recording_path),
"param_names": [s.name for s in specs],
"defaults": {s.name: s.default for s in specs},
"timestamp": datetime.now().isoformat(),
}
# ── CLI ──────────────────────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser(
description="Fit motor simulation parameters to a real recording (CMA-ES)."
)
parser.add_argument(
"--asset-path", type=str, default=_DEFAULT_ASSET,
help="Path to motor asset directory",
)
parser.add_argument(
"--recording", type=str, required=True,
help="Path to .npz recording file",
)
parser.add_argument("--sigma0", type=float, default=0.3)
parser.add_argument("--population-size", type=int, default=30)
parser.add_argument("--max-generations", type=int, default=300)
parser.add_argument("--sim-dt", type=float, default=0.002)
parser.add_argument("--substeps", type=int, default=10)
parser.add_argument("--pos-weight", type=float, default=1.0)
parser.add_argument("--vel-weight", type=float, default=0.5)
parser.add_argument("--acc-weight", type=float, default=0.0,
help="Weight for acceleration matching (0=off, try 0.1-0.5)")
parser.add_argument("--window-duration", type=float, default=0.5)
parser.add_argument("--seed", type=int, default=42)
parser.add_argument(
"--no-preprocess-vel", action="store_true",
help="Disable Savitzky-Golay velocity preprocessing",
)
parser.add_argument("--sg-window", type=int, default=7,
help="Savitzky-Golay window length (odd, default 7 = 140ms)")
parser.add_argument("--sg-polyorder", type=int, default=3,
help="Savitzky-Golay polynomial order (default 3 = cubic)")
args = parser.parse_args()
result = optimize(
asset_path=args.asset_path,
recording_path=args.recording,
sigma0=args.sigma0,
population_size=args.population_size,
max_generations=args.max_generations,
sim_dt=args.sim_dt,
substeps=args.substeps,
pos_weight=args.pos_weight,
vel_weight=args.vel_weight,
acc_weight=args.acc_weight,
window_duration=args.window_duration,
seed=args.seed,
preprocess_vel=not args.no_preprocess_vel,
sg_window=args.sg_window,
sg_polyorder=args.sg_polyorder,
)
# Save results JSON.
asset_path = Path(args.asset_path).resolve()
result_path = asset_path / "motor_sysid_result.json"
result_json = {k: v for k, v in result.items() if k != "history"}
result_json["history_summary"] = {
"first_cost": result["history"][0][1] if result["history"] else None,
"final_cost": result["history"][-1][1] if result["history"] else None,
"generations": len(result["history"]),
}
result_path.write_text(json.dumps(result_json, indent=2, default=str))
log.info("results_saved", path=str(result_path))
# Export tuned files.
from src.sysid.motor.export import export_tuned_files
export_tuned_files(asset_path=args.asset_path, params=result["best_params"])
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,114 @@
"""Recording preprocessing — clean velocity estimation from angle data.
The ESP32 firmware computes velocity as a raw finite-difference of encoder
counts at 50 Hz. With a 1320 CPR encoder that gives ~0.24 rad/s of
quantisation noise per count. This module replaces the noisy firmware
velocity with a smooth differentiation of the (much cleaner) angle signal.
Method: Savitzky-Golay filter applied to the angle signal, then
differentiated analytically. Zero phase lag, preserves transients well.
This is standard practice in robotics sysid — see e.g. MATLAB System ID
Toolbox, Drake's trajectory processing, or ETH's ANYmal sysid pipeline.
"""
from __future__ import annotations
import numpy as np
from scipy.signal import savgol_filter
import structlog
log = structlog.get_logger()
def recompute_velocity(
recording: dict[str, np.ndarray],
window_length: int = 7,
polyorder: int = 3,
deriv: int = 1,
) -> dict[str, np.ndarray]:
"""Recompute motor_vel from motor_angle using Savitzky-Golay differentiation.
Parameters
----------
recording : dict with at least 'time', 'motor_angle', 'motor_vel' keys.
window_length : SG filter window (must be odd, > polyorder).
7 samples at 50 Hz = 140ms window — good balance of smoothness
and responsiveness. Captures dynamics up to ~7 Hz.
polyorder : Polynomial order for the SG filter (3 = cubic).
deriv : Derivative order (1 = first derivative = velocity).
Returns
-------
New recording dict with 'motor_vel' replaced and 'motor_vel_raw' added.
"""
rec = dict(recording) # shallow copy
times = rec["time"]
angles = rec["motor_angle"]
dt = np.mean(np.diff(times))
# Keep original for diagnostics.
rec["motor_vel_raw"] = rec["motor_vel"].copy()
# Savitzky-Golay derivative: fits a polynomial to each window,
# then takes the analytical derivative → smooth, zero phase lag.
vel_sg = savgol_filter(
angles,
window_length=window_length,
polyorder=polyorder,
deriv=deriv,
delta=dt,
)
# Compute stats for logging.
raw_vel = rec["motor_vel_raw"]
noise_estimate = np.std(raw_vel - vel_sg)
max_diff = np.max(np.abs(raw_vel - vel_sg))
log.info(
"velocity_recomputed",
method="savgol",
window=window_length,
polyorder=polyorder,
dt=f"{dt*1000:.1f}ms",
noise_std=f"{noise_estimate:.3f} rad/s",
max_diff=f"{max_diff:.3f} rad/s",
raw_rms=f"{np.sqrt(np.mean(raw_vel**2)):.3f}",
sg_rms=f"{np.sqrt(np.mean(vel_sg**2)):.3f}",
)
rec["motor_vel"] = vel_sg
return rec
def smooth_velocity(
recording: dict[str, np.ndarray],
cutoff_hz: float = 10.0,
) -> dict[str, np.ndarray]:
"""Alternative: apply zero-phase Butterworth low-pass to motor_vel.
Simpler than SG derivative but introduces slight edge effects.
"""
from scipy.signal import butter, filtfilt
rec = dict(recording)
rec["motor_vel_raw"] = rec["motor_vel"].copy()
dt = np.mean(np.diff(rec["time"]))
fs = 1.0 / dt
nyq = fs / 2.0
norm_cutoff = min(cutoff_hz / nyq, 0.99)
b, a = butter(2, norm_cutoff, btype="low")
rec["motor_vel"] = filtfilt(b, a, rec["motor_vel"])
log.info(
"velocity_smoothed",
method="butterworth",
cutoff_hz=cutoff_hz,
fs=fs,
)
return rec

460
src/sysid/motor/rollout.py Normal file
View File

@@ -0,0 +1,460 @@
"""Deterministic simulation replay — roll out recorded actions in MuJoCo.
Motor-only version: single hinge joint, no pendulum.
Given a parameter vector and recorded actions, builds a MuJoCo model
with overridden dynamics, replays the actions, and returns the simulated
motor angle + velocity for comparison with the real recording.
"""
from __future__ import annotations
import dataclasses
import xml.etree.ElementTree as ET
from pathlib import Path
import mujoco
import numpy as np
import yaml
# ── Tunable parameter specification ──────────────────────────────────
@dataclasses.dataclass
class ParamSpec:
"""Specification for a single tunable parameter."""
name: str
default: float
lower: float
upper: float
log_scale: bool = False
# Motor-only parameters to identify.
# These capture the full transfer function: PWM → shaft angle/velocity.
#
# Asymmetric parameters (pos/neg suffix) capture real-world differences
# between CW and CCW rotation caused by gear mesh, brush contact,
# and H-bridge asymmetry.
MOTOR_PARAMS: list[ParamSpec] = [
# ── Actuator ─────────────────────────────────────────────────
# gear_pos/neg: effective torque per unit ctrl, split by direction.
# Real motors + L298N often have different drive strength per direction.
ParamSpec("actuator_gear_pos", 0.064, 0.005, 0.5, log_scale=True),
ParamSpec("actuator_gear_neg", 0.064, 0.005, 0.5, log_scale=True),
# filter_tau: first-order electrical/driver time constant (s).
# Lower bound 1ms — L298N PWM switching is very fast.
ParamSpec("actuator_filter_tau", 0.03, 0.001, 0.20),
# ── Joint dynamics ───────────────────────────────────────────
# damping_pos/neg: viscous friction (back-EMF), split by direction.
ParamSpec("motor_damping_pos", 0.003, 1e-5, 0.1, log_scale=True),
ParamSpec("motor_damping_neg", 0.003, 1e-5, 0.1, log_scale=True),
# armature: reflected rotor inertia (kg·m²).
ParamSpec("motor_armature", 0.0001, 1e-6, 0.01, log_scale=True),
# frictionloss_pos/neg: Coulomb friction, split by velocity direction.
ParamSpec("motor_frictionloss_pos", 0.03, 0.001, 0.2, log_scale=True),
ParamSpec("motor_frictionloss_neg", 0.03, 0.001, 0.2, log_scale=True),
# ── Nonlinear dynamics ───────────────────────────────────────
# viscous_quadratic: velocity-squared drag term (N·m·s²/rad²).
# Captures nonlinear friction that increases at high speed (air drag,
# grease viscosity, etc.). Opposes motion.
ParamSpec("viscous_quadratic", 0.0, 0.0, 0.005),
# back_emf_gain: torque reduction proportional to |vel × ctrl|.
# Models the back-EMF effect: at high speed the motor produces less
# torque because the voltage drop across the armature is smaller.
ParamSpec("back_emf_gain", 0.0, 0.0, 0.05),
# stribeck_vel: characteristic velocity below which Coulomb friction
# is boosted (Stribeck effect). 0 = standard Coulomb only.
ParamSpec("stribeck_friction_boost", 0.0, 0.0, 0.15),
ParamSpec("stribeck_vel", 2.0, 0.1, 8.0),
# ── Rotor load ───────────────────────────────────────────────
ParamSpec("rotor_mass", 0.012, 0.002, 0.05, log_scale=True),
# ── Hardware realism ─────────────────────────────────────────
# deadzone_pos/neg: minimum |action| per direction.
ParamSpec("motor_deadzone_pos", 0.08, 0.0, 0.30),
ParamSpec("motor_deadzone_neg", 0.08, 0.0, 0.30),
# action_bias: constant offset added to ctrl (H-bridge asymmetry).
ParamSpec("action_bias", 0.0, -0.10, 0.10),
# ── Gearbox backlash ─────────────────────────────────────────
# backlash_halfwidth: half the angular deadband (rad) in the gearbox.
# When the motor reverses, the shaft doesn't move until the backlash
# gap is taken up. Typical for 30:1 plastic/metal spur gears.
ParamSpec("gearbox_backlash", 0.0, 0.0, 0.15),
]
def params_to_dict(
values: np.ndarray, specs: list[ParamSpec] | None = None
) -> dict[str, float]:
if specs is None:
specs = MOTOR_PARAMS
return {s.name: float(values[i]) for i, s in enumerate(specs)}
def defaults_vector(specs: list[ParamSpec] | None = None) -> np.ndarray:
if specs is None:
specs = MOTOR_PARAMS
return np.array([s.default for s in specs], dtype=np.float64)
def bounds_arrays(
specs: list[ParamSpec] | None = None,
) -> tuple[np.ndarray, np.ndarray]:
if specs is None:
specs = MOTOR_PARAMS
lo = np.array([s.lower for s in specs], dtype=np.float64)
hi = np.array([s.upper for s in specs], dtype=np.float64)
return lo, hi
# ── MuJoCo model building with parameter overrides ──────────────────
def _build_model(
asset_path: Path,
params: dict[str, float],
) -> mujoco.MjModel:
"""Build a MuJoCo model from motor.xml with parameter overrides.
Parses the MJCF, patches actuator/joint/body parameters, reloads.
"""
asset_path = Path(asset_path).resolve()
robot_cfg = yaml.safe_load((asset_path / "robot.yaml").read_text())
mjcf_path = asset_path / robot_cfg["mjcf"]
tree = ET.parse(str(mjcf_path))
root = tree.getroot()
# ── Actuator overrides ───────────────────────────────────────
# Use average of pos/neg gear for MuJoCo (asymmetry handled in ctrl).
gear_pos = params.get("actuator_gear_pos", params.get("actuator_gear", 0.064))
gear_neg = params.get("actuator_gear_neg", params.get("actuator_gear", 0.064))
gear = (gear_pos + gear_neg) / 2.0
filter_tau = params.get("actuator_filter_tau", 0.03)
for act_el in root.iter("general"):
if act_el.get("name") == "motor":
act_el.set("gear", str(gear))
if filter_tau > 0:
act_el.set("dyntype", "filter")
act_el.set("dynprm", str(filter_tau))
else:
act_el.set("dyntype", "none")
act_el.set("dynprm", "1")
# ── Joint overrides ──────────────────────────────────────────
# Damping and friction are asymmetric + nonlinear → applied manually.
# Set MuJoCo damping & frictionloss to 0; we handle them in qfrc_applied.
armature = params.get("motor_armature", 0.0001)
for jnt in root.iter("joint"):
if jnt.get("name") == "motor_joint":
jnt.set("damping", "0")
jnt.set("armature", str(armature))
jnt.set("frictionloss", "0")
# ── Rotor mass override ──────────────────────────────────────
rotor_mass = params.get("rotor_mass", 0.012)
for geom in root.iter("geom"):
if geom.get("name") == "rotor_disk":
geom.set("mass", str(rotor_mass))
# Write temp file and load.
tmp_path = asset_path / "_tmp_motor_sysid.xml"
try:
tree.write(str(tmp_path), xml_declaration=True, encoding="unicode")
model = mujoco.MjModel.from_xml_path(str(tmp_path))
finally:
tmp_path.unlink(missing_ok=True)
return model
# ── Action + asymmetry transforms ────────────────────────────────────
def _transform_action(
action: float,
params: dict[str, float],
) -> float:
"""Apply bias, direction-dependent deadzone, and gear scaling.
The MuJoCo actuator has the *average* gear ratio. We rescale the
control signal so that ``ctrl × gear_avg ≈ action × gear_dir``.
"""
# Constant bias (H-bridge asymmetry).
action = action + params.get("action_bias", 0.0)
# Direction-dependent deadzone.
dz_pos = params.get("motor_deadzone_pos", params.get("motor_deadzone", 0.08))
dz_neg = params.get("motor_deadzone_neg", params.get("motor_deadzone", 0.08))
if action >= 0 and action < dz_pos:
return 0.0
if action < 0 and action > -dz_neg:
return 0.0
# Direction-dependent gear scaling.
# MuJoCo model uses gear_avg; we rescale ctrl to get the right torque.
gear_pos = params.get("actuator_gear_pos", params.get("actuator_gear", 0.064))
gear_neg = params.get("actuator_gear_neg", params.get("actuator_gear", 0.064))
gear_avg = (gear_pos + gear_neg) / 2.0
if gear_avg < 1e-8:
return 0.0
gear_dir = gear_pos if action >= 0 else gear_neg
return action * (gear_dir / gear_avg)
def _apply_forces(
data: mujoco.MjData,
vel: float,
ctrl: float,
params: dict[str, float],
backlash_state: list[float] | None = None,
) -> None:
"""Apply all manual forces: asymmetric friction, damping, and nonlinear terms.
Everything that MuJoCo can't represent with its symmetric joint model
is injected here via ``qfrc_applied``.
Forces applied (all oppose motion or reduce torque):
1. Asymmetric Coulomb friction (with Stribeck boost at low speed)
2. Asymmetric viscous damping
3. Quadratic velocity drag
4. Back-EMF torque reduction (proportional to |vel|)
Backlash:
If backlash_state is provided, it is a 1-element list [gap_pos].
gap_pos tracks the motor's position within the backlash deadband.
When inside the gap, no actuator torque is transmitted to the
output shaft — only friction forces act.
"""
torque = 0.0
# ── Gearbox backlash ──────────────────────────────────────────
# Model: the gear teeth have play of 2×halfwidth radians.
# We track where the motor is within that gap. When at the
# edge (contact), actuator torque passes through normally.
# When inside the gap, no actuator torque is transmitted.
backlash_hw = params.get("gearbox_backlash", 0.0)
actuator_torque_scale = 1.0 # 1.0 = full contact, 0.0 = in gap
if backlash_hw > 0 and backlash_state is not None:
# gap_pos: how far into the backlash gap we are.
# Range: [-backlash_hw, +backlash_hw]
# At ±backlash_hw, gears are in contact and torque transmits.
gap = backlash_state[0]
# Update gap position based on velocity.
dt_sub = data.model.opt.timestep
gap += vel * dt_sub
# Clamp to backlash range.
if gap > backlash_hw:
gap = backlash_hw
elif gap < -backlash_hw:
gap = -backlash_hw
backlash_state[0] = gap
# If not at contact edge, no torque transmitted.
if abs(gap) < backlash_hw - 1e-8:
actuator_torque_scale = 0.0
else:
actuator_torque_scale = 1.0
# ── 1. Coulomb friction (direction-dependent + Stribeck) ─────
fl_pos = params.get("motor_frictionloss_pos", params.get("motor_frictionloss", 0.03))
fl_neg = params.get("motor_frictionloss_neg", params.get("motor_frictionloss", 0.03))
stribeck_boost = params.get("stribeck_friction_boost", 0.0)
stribeck_vel = params.get("stribeck_vel", 2.0)
if abs(vel) > 1e-6:
fl = fl_pos if vel > 0 else fl_neg
# Stribeck: boost friction at low speed. Exponential decay.
if stribeck_boost > 0 and stribeck_vel > 0:
fl = fl * (1.0 + stribeck_boost * np.exp(-abs(vel) / stribeck_vel))
# Coulomb: constant magnitude, opposes motion.
torque -= np.sign(vel) * fl
# ── 2. Asymmetric viscous damping ────────────────────────────
damp_pos = params.get("motor_damping_pos", params.get("motor_damping", 0.003))
damp_neg = params.get("motor_damping_neg", params.get("motor_damping", 0.003))
damp = damp_pos if vel > 0 else damp_neg
torque -= damp * vel
# ── 3. Quadratic velocity drag ───────────────────────────────
visc_quad = params.get("viscous_quadratic", 0.0)
if visc_quad > 0:
torque -= visc_quad * vel * abs(vel)
# ── 4. Back-EMF torque reduction ─────────────────────────────
# At high speed, the motor's effective torque is reduced because
# back-EMF opposes the supply voltage. Modelled as a torque that
# opposes the control signal proportional to speed.
bemf = params.get("back_emf_gain", 0.0)
if bemf > 0 and abs(ctrl) > 1e-6:
# The reduction should oppose the actuator torque direction.
torque -= bemf * vel * np.sign(ctrl) * actuator_torque_scale
# ── 5. Scale actuator contribution by backlash state ─────────
# When in the backlash gap, MuJoCo's actuator force should not
# transmit. We cancel it by applying an opposing force.
if actuator_torque_scale < 1.0:
# The actuator_force from MuJoCo will be applied by mj_step.
# We need to counteract it. data.qfrc_actuator isn't set yet
# at this point (pre-step), so we zero the ctrl instead.
# This is handled in the rollout loop by zeroing ctrl.
pass
data.qfrc_applied[0] = max(-10.0, min(10.0, torque))
return actuator_torque_scale
# ── Simulation rollout ───────────────────────────────────────────────
def rollout(
asset_path: str | Path,
params: dict[str, float],
actions: np.ndarray,
sim_dt: float = 0.002,
substeps: int = 10,
) -> dict[str, np.ndarray]:
"""Open-loop replay of recorded actions.
Parameters
----------
asset_path : motor asset directory
params : named parameter overrides
actions : (N,) normalised actions [-1, 1] from the recording
sim_dt : MuJoCo physics timestep
substeps : physics substeps per control step (ctrl_dt = sim_dt × substeps)
Returns
-------
dict with motor_angle (N,) and motor_vel (N,).
"""
asset_path = Path(asset_path).resolve()
model = _build_model(asset_path, params)
model.opt.timestep = sim_dt
data = mujoco.MjData(model)
mujoco.mj_resetData(model, data)
n = len(actions)
sim_motor_angle = np.zeros(n, dtype=np.float64)
sim_motor_vel = np.zeros(n, dtype=np.float64)
# Backlash state: [gap_position]. Starts at 0 (centered in gap).
backlash_state = [0.0]
for i in range(n):
ctrl = _transform_action(actions[i], params)
data.ctrl[0] = ctrl
for _ in range(substeps):
scale = _apply_forces(data, data.qvel[0], ctrl, params, backlash_state)
# If in backlash gap, zero ctrl so actuator torque doesn't transmit.
if scale < 1.0:
data.ctrl[0] = 0.0
else:
data.ctrl[0] = ctrl
mujoco.mj_step(model, data)
# Bail out on NaN/instability.
if not np.isfinite(data.qpos[0]) or abs(data.qvel[0]) > 1e4:
sim_motor_angle[i:] = np.nan
sim_motor_vel[i:] = np.nan
break
sim_motor_angle[i] = data.qpos[0]
sim_motor_vel[i] = data.qvel[0]
return {
"motor_angle": sim_motor_angle,
"motor_vel": sim_motor_vel,
}
def windowed_rollout(
asset_path: str | Path,
params: dict[str, float],
recording: dict[str, np.ndarray],
window_duration: float = 0.5,
sim_dt: float = 0.002,
substeps: int = 10,
) -> dict[str, np.ndarray]:
"""Multiple-shooting rollout for motor-only sysid.
Splits the recording into short windows. Each window is initialised
from the real motor state, preventing error accumulation.
Returns
-------
dict with motor_angle (N,), motor_vel (N,), n_windows (int).
"""
asset_path = Path(asset_path).resolve()
model = _build_model(asset_path, params)
model.opt.timestep = sim_dt
data = mujoco.MjData(model)
times = recording["time"]
actions = recording["action"]
real_angle = recording["motor_angle"]
real_vel = recording["motor_vel"]
n = len(actions)
sim_motor_angle = np.zeros(n, dtype=np.float64)
sim_motor_vel = np.zeros(n, dtype=np.float64)
# Compute window boundaries.
t0 = times[0]
t_end = times[-1]
window_starts: list[int] = []
current_t = t0
while current_t < t_end:
idx = int(np.searchsorted(times, current_t))
idx = min(idx, n - 1)
window_starts.append(idx)
current_t += window_duration
n_windows = len(window_starts)
for w, w_start in enumerate(window_starts):
w_end = window_starts[w + 1] if w + 1 < n_windows else n
# Init from real state at window start.
mujoco.mj_resetData(model, data)
data.qpos[0] = real_angle[w_start]
data.qvel[0] = real_vel[w_start]
data.ctrl[:] = 0.0
mujoco.mj_forward(model, data)
# Backlash state resets each window (assume contact at start).
backlash_state = [0.0]
for i in range(w_start, w_end):
ctrl = _transform_action(actions[i], params)
data.ctrl[0] = ctrl
for _ in range(substeps):
scale = _apply_forces(data, data.qvel[0], ctrl, params, backlash_state)
if scale < 1.0:
data.ctrl[0] = 0.0
else:
data.ctrl[0] = ctrl
mujoco.mj_step(model, data)
# Bail out on NaN/instability — fill rest of window with last good.
if not np.isfinite(data.qpos[0]) or abs(data.qvel[0]) > 1e4:
sim_motor_angle[i:w_end] = sim_motor_angle[max(i-1, w_start)]
sim_motor_vel[i:w_end] = 0.0
break
sim_motor_angle[i] = data.qpos[0]
sim_motor_vel[i] = data.qvel[0]
return {
"motor_angle": sim_motor_angle,
"motor_vel": sim_motor_vel,
"n_windows": n_windows,
}

View File

@@ -0,0 +1,204 @@
"""Visualise motor system identification — real vs simulated trajectories.
Usage:
python -m src.sysid.motor.visualize \
--recording assets/motor/recordings/motor_capture_YYYYMMDD_HHMMSS.npz
# With tuned result:
python -m src.sysid.motor.visualize \
--recording <file>.npz \
--result assets/motor/motor_sysid_result.json
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
import numpy as np
import structlog
log = structlog.get_logger()
_DEFAULT_ASSET = "assets/motor"
def visualize(
asset_path: str | Path = _DEFAULT_ASSET,
recording_path: str | Path = "",
result_path: str | Path | None = None,
sim_dt: float = 0.002,
substeps: int = 10,
window_duration: float = 0.5,
save_path: str | Path | None = None,
show: bool = True,
) -> None:
"""Generate 3-panel comparison plot: angle, velocity, action."""
import matplotlib.pyplot as plt
from src.sysid.motor.rollout import (
MOTOR_PARAMS,
defaults_vector,
params_to_dict,
rollout,
windowed_rollout,
)
asset_path = Path(asset_path).resolve()
recording = dict(np.load(recording_path))
t = recording["time"]
actions = recording["action"]
# ── Simulate with default parameters ─────────────────────────
default_params = params_to_dict(defaults_vector(MOTOR_PARAMS), MOTOR_PARAMS)
log.info("simulating_default_params", windowed=window_duration > 0)
if window_duration > 0:
sim_default = windowed_rollout(
asset_path=asset_path,
params=default_params,
recording=recording,
window_duration=window_duration,
sim_dt=sim_dt,
substeps=substeps,
)
else:
sim_default = rollout(
asset_path=asset_path,
params=default_params,
actions=actions,
sim_dt=sim_dt,
substeps=substeps,
)
# ── Simulate with tuned parameters (if available) ────────────
sim_tuned = None
tuned_cost = None
if result_path is not None:
result_path = Path(result_path)
else:
# Auto-detect.
auto = asset_path / "motor_sysid_result.json"
if auto.exists():
result_path = auto
if result_path is not None and result_path.exists():
result = json.loads(result_path.read_text())
tuned_params = result.get("best_params", {})
tuned_cost = result.get("best_cost")
log.info("simulating_tuned_params", cost=tuned_cost)
if window_duration > 0:
sim_tuned = windowed_rollout(
asset_path=asset_path,
params=tuned_params,
recording=recording,
window_duration=window_duration,
sim_dt=sim_dt,
substeps=substeps,
)
else:
sim_tuned = rollout(
asset_path=asset_path,
params=tuned_params,
actions=actions,
sim_dt=sim_dt,
substeps=substeps,
)
# ── Plot ─────────────────────────────────────────────────────
fig, axes = plt.subplots(3, 1, figsize=(14, 8), sharex=True)
# Motor angle.
ax = axes[0]
ax.plot(t, np.degrees(recording["motor_angle"]), "k-", lw=1.2, alpha=0.8, label="Real")
ax.plot(t, np.degrees(sim_default["motor_angle"]), "--", color="#d62728", lw=1.0, alpha=0.7, label="Sim (default)")
if sim_tuned is not None:
ax.plot(t, np.degrees(sim_tuned["motor_angle"]), "--", color="#2ca02c", lw=1.0, alpha=0.7, label="Sim (tuned)")
ax.set_ylabel("Motor Angle (°)")
ax.legend(loc="upper right", fontsize=8)
ax.grid(True, alpha=0.3)
# Motor velocity.
ax = axes[1]
ax.plot(t, recording["motor_vel"], "k-", lw=1.2, alpha=0.8, label="Real")
ax.plot(t, sim_default["motor_vel"], "--", color="#d62728", lw=1.0, alpha=0.7, label="Sim (default)")
if sim_tuned is not None:
ax.plot(t, sim_tuned["motor_vel"], "--", color="#2ca02c", lw=1.0, alpha=0.7, label="Sim (tuned)")
ax.set_ylabel("Motor Velocity (rad/s)")
ax.legend(loc="upper right", fontsize=8)
ax.grid(True, alpha=0.3)
# Action.
ax = axes[2]
ax.plot(t, actions, "b-", lw=0.8, alpha=0.6)
ax.set_ylabel("Action (norm)")
ax.set_xlabel("Time (s)")
ax.grid(True, alpha=0.3)
ax.set_ylim(-1.1, 1.1)
# Title.
title = "Motor System Identification — Real vs Simulated"
if tuned_cost is not None:
from src.sysid.motor.optimize import cost_function
orig_cost = cost_function(
defaults_vector(MOTOR_PARAMS),
recording,
asset_path,
MOTOR_PARAMS,
sim_dt=sim_dt,
substeps=substeps,
window_duration=window_duration,
)
title += f"\nDefault cost: {orig_cost:.4f} → Tuned cost: {tuned_cost:.4f}"
improvement = (1.0 - tuned_cost / orig_cost) * 100 if orig_cost > 0 else 0
title += f" ({improvement:+.1f}%)"
fig.suptitle(title, fontsize=12)
plt.tight_layout()
if save_path:
fig.savefig(str(save_path), dpi=150, bbox_inches="tight")
log.info("figure_saved", path=str(save_path))
if show:
plt.show()
else:
plt.close(fig)
# ── CLI ──────────────────────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser(
description="Visualise motor system identification results."
)
parser.add_argument("--asset-path", type=str, default=_DEFAULT_ASSET)
parser.add_argument("--recording", type=str, required=True, help=".npz file")
parser.add_argument("--result", type=str, default=None, help="sysid result JSON")
parser.add_argument("--sim-dt", type=float, default=0.002)
parser.add_argument("--substeps", type=int, default=10)
parser.add_argument("--window-duration", type=float, default=0.5)
parser.add_argument("--save", type=str, default=None, help="Save figure path")
parser.add_argument("--no-show", action="store_true")
args = parser.parse_args()
visualize(
asset_path=args.asset_path,
recording_path=args.recording,
result_path=args.result,
sim_dt=args.sim_dt,
substeps=args.substeps,
window_duration=args.window_duration,
save_path=args.save,
show=not args.no_show,
)
if __name__ == "__main__":
main()