♻️ crazy refactor

This commit is contained in:
2026-03-11 22:52:01 +01:00
parent 35223b3560
commit 4115447022
34 changed files with 4255 additions and 102 deletions

1
src/sysid/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""System identification — tune simulation parameters to match real hardware."""

381
src/sysid/capture.py Normal file
View File

@@ -0,0 +1,381 @@
"""Capture a real-robot trajectory under random excitation (PRBS-style).
Connects to the ESP32 over serial, sends random PWM commands to excite
the system, and records motor + pendulum angles and velocities at ~50 Hz.
Saves a compressed numpy archive (.npz) that the optimizer can replay
in simulation to fit physics parameters.
Usage:
python -m src.sysid.capture \
--robot-path assets/rotary_cartpole \
--port /dev/cu.usbserial-0001 \
--duration 20
"""
from __future__ import annotations
import argparse
import math
import os
import random
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Any
import numpy as np
import structlog
import yaml
log = structlog.get_logger()
# ── Serial protocol helpers (mirrored from SerialRunner) ─────────────
def _parse_state_line(line: str) -> dict[str, Any] | None:
"""Parse an ``S,…`` state line from the ESP32."""
if not line.startswith("S,"):
return None
parts = line.split(",")
if len(parts) < 12:
return None
try:
return {
"timestamp_ms": int(parts[1]),
"encoder_count": int(parts[2]),
"rpm": float(parts[3]),
"motor_speed": int(parts[4]),
"at_limit": bool(int(parts[5])),
"pendulum_angle": float(parts[6]),
"pendulum_velocity": float(parts[7]),
"target_speed": int(parts[8]),
"braking": bool(int(parts[9])),
"enc_vel_cps": float(parts[10]),
"pendulum_ok": bool(int(parts[11])),
}
except (ValueError, IndexError):
return None
# ── Background serial reader ─────────────────────────────────────────
class _SerialReader:
"""Minimal background reader for the ESP32 serial stream."""
def __init__(self, port: str, baud: int = 115200):
import serial as _serial
self._serial_mod = _serial
self.ser = _serial.Serial(port, baud, timeout=0.05)
time.sleep(2) # Wait for ESP32 boot.
self.ser.reset_input_buffer()
self._latest: dict[str, Any] = {}
self._lock = threading.Lock()
self._event = threading.Event()
self._running = True
self._thread = threading.Thread(target=self._reader_loop, daemon=True)
self._thread.start()
def _reader_loop(self) -> None:
while self._running:
try:
if self.ser.in_waiting:
line = (
self.ser.readline()
.decode("utf-8", errors="ignore")
.strip()
)
parsed = _parse_state_line(line)
if parsed is not None:
with self._lock:
self._latest = parsed
self._event.set()
else:
time.sleep(0.001)
except (OSError, self._serial_mod.SerialException):
log.critical("serial_lost")
break
def send(self, cmd: str) -> None:
try:
self.ser.write(f"{cmd}\n".encode())
except (OSError, self._serial_mod.SerialException):
log.critical("serial_send_failed", cmd=cmd)
def read(self) -> dict[str, Any]:
with self._lock:
return dict(self._latest)
def read_blocking(self, timeout: float = 0.1) -> dict[str, Any]:
self._event.clear()
self._event.wait(timeout=timeout)
return self.read()
def close(self) -> None:
self._running = False
self.send("H")
self.send("M0")
time.sleep(0.1)
self._thread.join(timeout=1.0)
self.ser.close()
# ── PRBS excitation signal ───────────────────────────────────────────
class _PRBSExcitation:
"""Random hold-value excitation with configurable amplitude and hold time.
At each call to ``__call__``, returns the current PWM value.
The value is held for a random duration (``hold_min````hold_max`` ms),
then a new random value is drawn uniformly from ``[-amplitude, +amplitude]``.
"""
def __init__(
self,
amplitude: int = 180,
hold_min_ms: int = 50,
hold_max_ms: int = 300,
):
self.amplitude = amplitude
self.hold_min_ms = hold_min_ms
self.hold_max_ms = hold_max_ms
self._current: int = 0
self._switch_time: float = 0.0
self._new_value()
def _new_value(self) -> None:
self._current = random.randint(-self.amplitude, self.amplitude)
hold_ms = random.randint(self.hold_min_ms, self.hold_max_ms)
self._switch_time = time.monotonic() + hold_ms / 1000.0
def __call__(self) -> int:
if time.monotonic() >= self._switch_time:
self._new_value()
return self._current
# ── Main capture loop ────────────────────────────────────────────────
def capture(
robot_path: str | Path,
port: str = "/dev/cu.usbserial-0001",
baud: int = 115200,
duration: float = 20.0,
amplitude: int = 180,
hold_min_ms: int = 50,
hold_max_ms: int = 300,
dt: float = 0.02,
) -> Path:
"""Run the capture procedure and return the path to the saved .npz file.
Parameters
----------
robot_path : path to robot asset directory (contains hardware.yaml)
port : serial port for ESP32
baud : baud rate
duration : capture duration in seconds
amplitude : max PWM magnitude for excitation (0255)
hold_min_ms / hold_max_ms : random hold time range (ms)
dt : target sampling period (seconds), default 50 Hz
"""
robot_path = Path(robot_path).resolve()
# Load hardware config for encoder conversion + safety.
hw_yaml = robot_path / "hardware.yaml"
if not hw_yaml.exists():
raise FileNotFoundError(f"hardware.yaml not found in {robot_path}")
raw_hw = yaml.safe_load(hw_yaml.read_text())
ppr = raw_hw.get("encoder", {}).get("ppr", 11)
gear_ratio = raw_hw.get("encoder", {}).get("gear_ratio", 30.0)
counts_per_rev: float = ppr * gear_ratio * 4.0
max_motor_deg = raw_hw.get("safety", {}).get("max_motor_angle_deg", 90.0)
max_motor_rad = math.radians(max_motor_deg) if max_motor_deg > 0 else 0.0
# Connect.
reader = _SerialReader(port, baud)
excitation = _PRBSExcitation(amplitude, hold_min_ms, hold_max_ms)
# Prepare recording buffers.
max_samples = int(duration / dt) + 500 # headroom
rec_time = np.zeros(max_samples, dtype=np.float64)
rec_action = np.zeros(max_samples, dtype=np.float64)
rec_motor_angle = np.zeros(max_samples, dtype=np.float64)
rec_motor_vel = np.zeros(max_samples, dtype=np.float64)
rec_pend_angle = np.zeros(max_samples, dtype=np.float64)
rec_pend_vel = np.zeros(max_samples, dtype=np.float64)
# Start streaming.
reader.send("G")
time.sleep(0.1)
log.info(
"capture_starting",
port=port,
duration=duration,
amplitude=amplitude,
hold_range_ms=f"{hold_min_ms}{hold_max_ms}",
dt=dt,
)
idx = 0
t0 = time.monotonic()
try:
while True:
loop_start = time.monotonic()
elapsed = loop_start - t0
if elapsed >= duration:
break
# Get excitation PWM.
pwm = excitation()
# Safety: reverse/zero if near motor limit.
state = reader.read()
if state:
motor_angle_rad = (
state.get("encoder_count", 0) / counts_per_rev * 2.0 * math.pi
)
if max_motor_rad > 0:
margin = max_motor_rad * 0.85 # start braking at 85%
if motor_angle_rad > margin and pwm > 0:
pwm = -abs(pwm) # reverse
elif motor_angle_rad < -margin and pwm < 0:
pwm = abs(pwm) # reverse
# Send command.
reader.send(f"M{pwm}")
# Wait for fresh data.
time.sleep(max(0, dt - (time.monotonic() - loop_start) - 0.005))
state = reader.read_blocking(timeout=dt)
if state:
enc = state.get("encoder_count", 0)
motor_angle = enc / counts_per_rev * 2.0 * math.pi
motor_vel = (
state.get("enc_vel_cps", 0.0) / counts_per_rev * 2.0 * math.pi
)
pend_angle = math.radians(state.get("pendulum_angle", 0.0))
pend_vel = math.radians(state.get("pendulum_velocity", 0.0))
# Normalised action: PWM / 255 → [-1, 1]
action_norm = pwm / 255.0
if idx < max_samples:
rec_time[idx] = elapsed
rec_action[idx] = action_norm
rec_motor_angle[idx] = motor_angle
rec_motor_vel[idx] = motor_vel
rec_pend_angle[idx] = pend_angle
rec_pend_vel[idx] = pend_vel
idx += 1
# Progress.
if idx % 50 == 0:
log.info(
"capture_progress",
elapsed=f"{elapsed:.1f}/{duration:.0f}s",
samples=idx,
pwm=pwm,
)
# Pace to dt.
remaining = dt - (time.monotonic() - loop_start)
if remaining > 0:
time.sleep(remaining)
finally:
reader.send("M0")
reader.close()
# Trim to actual sample count.
rec_time = rec_time[:idx]
rec_action = rec_action[:idx]
rec_motor_angle = rec_motor_angle[:idx]
rec_motor_vel = rec_motor_vel[:idx]
rec_pend_angle = rec_pend_angle[:idx]
rec_pend_vel = rec_pend_vel[:idx]
# Save.
recordings_dir = robot_path / "recordings"
recordings_dir.mkdir(exist_ok=True)
stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
out_path = recordings_dir / f"capture_{stamp}.npz"
np.savez_compressed(
out_path,
time=rec_time,
action=rec_action,
motor_angle=rec_motor_angle,
motor_vel=rec_motor_vel,
pendulum_angle=rec_pend_angle,
pendulum_vel=rec_pend_vel,
)
log.info(
"capture_saved",
path=str(out_path),
samples=idx,
duration_actual=f"{rec_time[-1]:.2f}s" if idx > 0 else "0s",
)
return out_path
# ── CLI entry point ──────────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser(
description="Capture a real-robot trajectory for system identification."
)
parser.add_argument(
"--robot-path",
type=str,
default="assets/rotary_cartpole",
help="Path to robot asset directory (contains hardware.yaml)",
)
parser.add_argument(
"--port",
type=str,
default="/dev/cu.usbserial-0001",
help="Serial port for ESP32",
)
parser.add_argument("--baud", type=int, default=115200)
parser.add_argument(
"--duration", type=float, default=20.0, help="Capture duration (s)"
)
parser.add_argument(
"--amplitude", type=int, default=180, help="Max PWM magnitude (0255)"
)
parser.add_argument(
"--hold-min-ms", type=int, default=50, help="Min hold time (ms)"
)
parser.add_argument(
"--hold-max-ms", type=int, default=300, help="Max hold time (ms)"
)
parser.add_argument(
"--dt", type=float, default=0.02, help="Sample period (s)"
)
args = parser.parse_args()
capture(
robot_path=args.robot_path,
port=args.port,
baud=args.baud,
duration=args.duration,
amplitude=args.amplitude,
hold_min_ms=args.hold_min_ms,
hold_max_ms=args.hold_max_ms,
dt=args.dt,
)
if __name__ == "__main__":
main()

186
src/sysid/export.py Normal file
View File

@@ -0,0 +1,186 @@
"""Export tuned parameters to URDF and robot.yaml files.
Reads the original files, injects the optimised parameter values,
and writes ``rotary_cartpole_tuned.urdf`` + ``robot_tuned.yaml``
alongside the originals in the robot asset directory.
"""
from __future__ import annotations
import copy
import json
import shutil
import xml.etree.ElementTree as ET
from pathlib import Path
import structlog
import yaml
log = structlog.get_logger()
def export_tuned_files(
robot_path: str | Path,
params: dict[str, float],
) -> tuple[Path, Path]:
"""Write tuned URDF and robot.yaml files.
Parameters
----------
robot_path : robot asset directory (contains robot.yaml + *.urdf)
params : dict of parameter name → tuned value (from optimizer)
Returns
-------
(tuned_urdf_path, tuned_robot_yaml_path)
"""
robot_path = Path(robot_path).resolve()
# ── Load originals ───────────────────────────────────────────
robot_yaml_path = robot_path / "robot.yaml"
robot_cfg = yaml.safe_load(robot_yaml_path.read_text())
urdf_path = robot_path / robot_cfg["urdf"]
# ── Tune URDF ────────────────────────────────────────────────
tree = ET.parse(urdf_path)
root = tree.getroot()
for link in root.iter("link"):
link_name = link.get("name", "")
inertial = link.find("inertial")
if inertial is None:
continue
if link_name == "arm":
_set_mass(inertial, params.get("arm_mass"))
_set_com(
inertial,
params.get("arm_com_x"),
params.get("arm_com_y"),
params.get("arm_com_z"),
)
elif link_name == "pendulum":
_set_mass(inertial, params.get("pendulum_mass"))
_set_com(
inertial,
params.get("pendulum_com_x"),
params.get("pendulum_com_y"),
params.get("pendulum_com_z"),
)
_set_inertia(
inertial,
ixx=params.get("pendulum_ixx"),
iyy=params.get("pendulum_iyy"),
izz=params.get("pendulum_izz"),
ixy=params.get("pendulum_ixy"),
)
# Write tuned URDF.
tuned_urdf_name = urdf_path.stem + "_tuned" + urdf_path.suffix
tuned_urdf_path = robot_path / tuned_urdf_name
# Preserve the XML declaration and original formatting as much as possible.
ET.indent(tree, space=" ")
tree.write(str(tuned_urdf_path), xml_declaration=True, encoding="unicode")
log.info("tuned_urdf_written", path=str(tuned_urdf_path))
# ── Tune robot.yaml ──────────────────────────────────────────
tuned_cfg = copy.deepcopy(robot_cfg)
# Point to the tuned URDF.
tuned_cfg["urdf"] = tuned_urdf_name
# Update actuator parameters.
if tuned_cfg.get("actuators") and len(tuned_cfg["actuators"]) > 0:
act = tuned_cfg["actuators"][0]
if "actuator_gear" in params:
act["gear"] = round(params["actuator_gear"], 6)
if "actuator_filter_tau" in params:
act["filter_tau"] = round(params["actuator_filter_tau"], 6)
if "motor_damping" in params:
act["damping"] = round(params["motor_damping"], 6)
# Update joint overrides.
if "joints" not in tuned_cfg:
tuned_cfg["joints"] = {}
if "motor_joint" not in tuned_cfg["joints"]:
tuned_cfg["joints"]["motor_joint"] = {}
mj = tuned_cfg["joints"]["motor_joint"]
if "motor_armature" in params:
mj["armature"] = round(params["motor_armature"], 6)
if "motor_frictionloss" in params:
mj["frictionloss"] = round(params["motor_frictionloss"], 6)
if "pendulum_joint" not in tuned_cfg["joints"]:
tuned_cfg["joints"]["pendulum_joint"] = {}
pj = tuned_cfg["joints"]["pendulum_joint"]
if "pendulum_damping" in params:
pj["damping"] = round(params["pendulum_damping"], 6)
# Write tuned robot.yaml.
tuned_yaml_path = robot_path / "robot_tuned.yaml"
# Add a header comment.
header = (
"# Tuned robot config — generated by src.sysid.optimize\n"
"# Original: robot.yaml\n"
"# Run `python -m src.sysid.visualize` to compare real vs sim.\n\n"
)
tuned_yaml_path.write_text(
header + yaml.dump(tuned_cfg, default_flow_style=False, sort_keys=False)
)
log.info("tuned_robot_yaml_written", path=str(tuned_yaml_path))
return tuned_urdf_path, tuned_yaml_path
# ── XML helpers (shared with rollout.py) ─────────────────────────────
def _set_mass(inertial: ET.Element, mass: float | None) -> None:
if mass is None:
return
mass_el = inertial.find("mass")
if mass_el is not None:
mass_el.set("value", str(mass))
def _set_com(
inertial: ET.Element,
x: float | None,
y: float | None,
z: float | None,
) -> None:
origin = inertial.find("origin")
if origin is None:
return
xyz = origin.get("xyz", "0 0 0").split()
if x is not None:
xyz[0] = str(x)
if y is not None:
xyz[1] = str(y)
if z is not None:
xyz[2] = str(z)
origin.set("xyz", " ".join(xyz))
def _set_inertia(
inertial: ET.Element,
ixx: float | None = None,
iyy: float | None = None,
izz: float | None = None,
ixy: float | None = None,
iyz: float | None = None,
ixz: float | None = None,
) -> None:
ine = inertial.find("inertia")
if ine is None:
return
for attr, val in [
("ixx", ixx), ("iyy", iyy), ("izz", izz),
("ixy", ixy), ("iyz", iyz), ("ixz", ixz),
]:
if val is not None:
ine.set(attr, str(val))

376
src/sysid/optimize.py Normal file
View File

@@ -0,0 +1,376 @@
"""CMA-ES optimiser — fit simulation parameters to a real-robot recording.
Minimises the trajectory-matching cost between a MuJoCo rollout and a
recorded real-robot sequence. Uses the ``cmaes`` package (pure-Python
CMA-ES with native box-constraint support).
Usage:
python -m src.sysid.optimize \
--robot-path assets/rotary_cartpole \
--recording assets/rotary_cartpole/recordings/capture_20260311_120000.npz
# Shorter run for testing:
python -m src.sysid.optimize \
--robot-path assets/rotary_cartpole \
--recording <file>.npz \
--max-generations 10 --population-size 8
"""
from __future__ import annotations
import argparse
import json
import time
from datetime import datetime
from pathlib import Path
import numpy as np
import structlog
from src.sysid.rollout import (
ROTARY_CARTPOLE_PARAMS,
ParamSpec,
bounds_arrays,
defaults_vector,
params_to_dict,
rollout,
windowed_rollout,
)
log = structlog.get_logger()
# ── Cost function ────────────────────────────────────────────────────
def _angle_diff(a: np.ndarray, b: np.ndarray) -> np.ndarray:
"""Shortest signed angle difference, handling wrapping."""
return np.arctan2(np.sin(a - b), np.cos(a - b))
def _check_inertia_valid(params: dict[str, float]) -> bool:
"""Quick reject: pendulum inertia tensor must be positive-definite."""
ixx = params.get("pendulum_ixx", 6.16e-06)
iyy = params.get("pendulum_iyy", 6.16e-06)
izz = params.get("pendulum_izz", 1.23e-05)
ixy = params.get("pendulum_ixy", 6.10e-06)
det_xy = ixx * iyy - ixy * ixy
return det_xy > 0 and ixx > 0 and iyy > 0 and izz > 0
def _compute_trajectory_cost(
sim: dict[str, np.ndarray],
recording: dict[str, np.ndarray],
pos_weight: float = 1.0,
vel_weight: float = 0.1,
) -> float:
"""Weighted MSE between sim and real trajectories."""
motor_err = _angle_diff(sim["motor_angle"], recording["motor_angle"])
pend_err = _angle_diff(sim["pendulum_angle"], recording["pendulum_angle"])
motor_vel_err = sim["motor_vel"] - recording["motor_vel"]
pend_vel_err = sim["pendulum_vel"] - recording["pendulum_vel"]
return float(
pos_weight * np.mean(motor_err**2)
+ pos_weight * np.mean(pend_err**2)
+ vel_weight * np.mean(motor_vel_err**2)
+ vel_weight * np.mean(pend_vel_err**2)
)
def cost_function(
params_vec: np.ndarray,
recording: dict[str, np.ndarray],
robot_path: Path,
specs: list[ParamSpec],
sim_dt: float = 0.002,
substeps: int = 10,
pos_weight: float = 1.0,
vel_weight: float = 0.1,
window_duration: float = 0.5,
) -> float:
"""Compute trajectory-matching cost for a candidate parameter vector.
Uses **multiple-shooting** (windowed rollout): the recording is split
into short windows (default 0.5 s). Each window is initialised from
the real qpos/qvel, so early errors dont compound across the full
trajectory. This gives a much smoother cost landscape for CMA-ES.
Set ``window_duration=0`` to fall back to the original open-loop
single-shot rollout (not recommended).
"""
params = params_to_dict(params_vec, specs)
if not _check_inertia_valid(params):
return 1e6
try:
if window_duration > 0:
sim = windowed_rollout(
robot_path=robot_path,
params=params,
recording=recording,
window_duration=window_duration,
sim_dt=sim_dt,
substeps=substeps,
)
else:
sim = rollout(
robot_path=robot_path,
params=params,
actions=recording["action"],
timesteps=recording["time"],
sim_dt=sim_dt,
substeps=substeps,
)
except Exception as exc:
log.warning("rollout_failed", error=str(exc))
return 1e6
return _compute_trajectory_cost(sim, recording, pos_weight, vel_weight)
# ── CMA-ES optimisation loop ────────────────────────────────────────
def optimize(
robot_path: str | Path,
recording_path: str | Path,
specs: list[ParamSpec] | None = None,
sigma0: float = 0.3,
population_size: int = 20,
max_generations: int = 1000,
sim_dt: float = 0.002,
substeps: int = 10,
pos_weight: float = 1.0,
vel_weight: float = 0.1,
window_duration: float = 0.5,
seed: int = 42,
) -> dict:
"""Run CMA-ES optimisation and return results.
Returns a dict with:
best_params: dict[str, float]
best_cost: float
history: list of (generation, best_cost) tuples
recording: str (path used)
specs: list of param names
"""
from cmaes import CMA
robot_path = Path(robot_path).resolve()
recording_path = Path(recording_path).resolve()
if specs is None:
specs = ROTARY_CARTPOLE_PARAMS
# Load recording.
recording = dict(np.load(recording_path))
n_samples = len(recording["time"])
duration = recording["time"][-1] - recording["time"][0]
n_windows = max(1, int(duration / window_duration)) if window_duration > 0 else 1
log.info(
"recording_loaded",
path=str(recording_path),
samples=n_samples,
duration=f"{duration:.1f}s",
window_duration=f"{window_duration}s",
n_windows=n_windows,
)
# Initial point (defaults) — normalised to [0, 1] for CMA-ES.
lo, hi = bounds_arrays(specs)
x0 = defaults_vector(specs)
# Normalise to [0, 1] for the optimizer (better conditioned).
span = hi - lo
span[span == 0] = 1.0 # avoid division by zero
def to_normed(x: np.ndarray) -> np.ndarray:
return (x - lo) / span
def from_normed(x_n: np.ndarray) -> np.ndarray:
return x_n * span + lo
x0_normed = to_normed(x0)
bounds_normed = np.column_stack(
[np.zeros(len(specs)), np.ones(len(specs))]
)
optimizer = CMA(
mean=x0_normed,
sigma=sigma0,
bounds=bounds_normed,
population_size=population_size,
seed=seed,
)
best_cost = float("inf")
best_params_vec = x0.copy()
history: list[tuple[int, float]] = []
log.info(
"cmaes_starting",
n_params=len(specs),
population=population_size,
max_gens=max_generations,
sigma0=sigma0,
)
t0 = time.monotonic()
for gen in range(max_generations):
solutions = []
for _ in range(optimizer.population_size):
x_normed = optimizer.ask()
x_natural = from_normed(x_normed)
# Clip to bounds (CMA-ES can slightly exceed with sampling noise).
x_natural = np.clip(x_natural, lo, hi)
c = cost_function(
x_natural,
recording,
robot_path,
specs,
sim_dt=sim_dt,
substeps=substeps,
pos_weight=pos_weight,
vel_weight=vel_weight,
window_duration=window_duration,
)
solutions.append((x_normed, c))
if c < best_cost:
best_cost = c
best_params_vec = x_natural.copy()
optimizer.tell(solutions)
history.append((gen, best_cost))
elapsed = time.monotonic() - t0
if gen % 5 == 0 or gen == max_generations - 1:
log.info(
"cmaes_generation",
gen=gen,
best_cost=f"{best_cost:.6f}",
elapsed=f"{elapsed:.1f}s",
gen_best=f"{min(c for _, c in solutions):.6f}",
)
total_time = time.monotonic() - t0
best_params = params_to_dict(best_params_vec, specs)
log.info(
"cmaes_finished",
best_cost=f"{best_cost:.6f}",
total_time=f"{total_time:.1f}s",
evaluations=max_generations * population_size,
)
# Log parameter comparison.
defaults = params_to_dict(defaults_vector(specs), specs)
for name in best_params:
d = defaults[name]
b = best_params[name]
change_pct = ((b - d) / abs(d) * 100) if abs(d) > 1e-12 else 0.0
log.info(
"param_result",
name=name,
default=f"{d:.6g}",
tuned=f"{b:.6g}",
change=f"{change_pct:+.1f}%",
)
return {
"best_params": best_params,
"best_cost": best_cost,
"history": history,
"recording": str(recording_path),
"param_names": [s.name for s in specs],
"defaults": {s.name: s.default for s in specs},
"timestamp": datetime.now().isoformat(),
}
# ── CLI entry point ──────────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser(
description="Fit simulation parameters to a real-robot recording (CMA-ES)."
)
parser.add_argument(
"--robot-path",
type=str,
default="assets/rotary_cartpole",
help="Path to robot asset directory",
)
parser.add_argument(
"--recording",
type=str,
required=True,
help="Path to .npz recording file",
)
parser.add_argument("--sigma0", type=float, default=0.3)
parser.add_argument("--population-size", type=int, default=20)
parser.add_argument("--max-generations", type=int, default=200)
parser.add_argument("--sim-dt", type=float, default=0.002)
parser.add_argument("--substeps", type=int, default=10)
parser.add_argument("--pos-weight", type=float, default=1.0)
parser.add_argument("--vel-weight", type=float, default=0.1)
parser.add_argument(
"--window-duration",
type=float,
default=0.5,
help="Shooting window length in seconds (0 = open-loop, default 0.5)",
)
parser.add_argument("--seed", type=int, default=42)
parser.add_argument(
"--no-export",
action="store_true",
help="Skip exporting tuned files (results JSON only)",
)
args = parser.parse_args()
result = optimize(
robot_path=args.robot_path,
recording_path=args.recording,
sigma0=args.sigma0,
population_size=args.population_size,
max_generations=args.max_generations,
sim_dt=args.sim_dt,
substeps=args.substeps,
pos_weight=args.pos_weight,
vel_weight=args.vel_weight,
window_duration=args.window_duration,
seed=args.seed,
)
# Save results JSON.
robot_path = Path(args.robot_path).resolve()
result_path = robot_path / "sysid_result.json"
# Convert numpy types for JSON serialisation.
result_json = {
k: v for k, v in result.items() if k != "history"
}
result_json["history_summary"] = {
"first_cost": result["history"][0][1] if result["history"] else None,
"final_cost": result["history"][-1][1] if result["history"] else None,
"generations": len(result["history"]),
}
result_path.write_text(json.dumps(result_json, indent=2, default=str))
log.info("results_saved", path=str(result_path))
# Export tuned files unless --no-export.
if not args.no_export:
from src.sysid.export import export_tuned_files
export_tuned_files(
robot_path=args.robot_path,
params=result["best_params"],
)
if __name__ == "__main__":
main()

477
src/sysid/rollout.py Normal file
View File

@@ -0,0 +1,477 @@
"""Deterministic simulation replay — roll out recorded actions in MuJoCo.
Given a parameter vector and a recorded action sequence, builds a MuJoCo
model with overridden physics parameters, replays the actions, and returns
the simulated trajectory for comparison with the real recording.
This module is the inner loop of the CMA-ES optimizer: it is called once
per candidate parameter vector per generation.
"""
from __future__ import annotations
import copy
import dataclasses
import math
import tempfile
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Any
import mujoco
import numpy as np
import yaml
# ── Tunable parameter specification ──────────────────────────────────
@dataclasses.dataclass
class ParamSpec:
"""Specification for a single tunable parameter."""
name: str
default: float
lower: float
upper: float
log_scale: bool = False # optimise in log-space (masses, inertias)
# Default parameter specs for the rotary cartpole.
# Order matters: the optimizer maps a flat vector to these specs.
ROTARY_CARTPOLE_PARAMS: list[ParamSpec] = [
# ── Arm link (URDF) ──────────────────────────────────────────
ParamSpec("arm_mass", 0.010, 0.003, 0.05, log_scale=True),
ParamSpec("arm_com_x", 0.00005, -0.02, 0.02),
ParamSpec("arm_com_y", 0.0065, -0.01, 0.02),
ParamSpec("arm_com_z", 0.00563, -0.01, 0.02),
# ── Pendulum link (URDF) ─────────────────────────────────────
ParamSpec("pendulum_mass", 0.015, 0.005, 0.05, log_scale=True),
ParamSpec("pendulum_com_x", 0.1583, 0.05, 0.25),
ParamSpec("pendulum_com_y", -0.0983, -0.20, 0.0),
ParamSpec("pendulum_com_z", 0.0, -0.05, 0.05),
ParamSpec("pendulum_ixx", 6.16e-06, 1e-07, 1e-04, log_scale=True),
ParamSpec("pendulum_iyy", 6.16e-06, 1e-07, 1e-04, log_scale=True),
ParamSpec("pendulum_izz", 1.23e-05, 1e-07, 1e-04, log_scale=True),
ParamSpec("pendulum_ixy", 6.10e-06, -1e-04, 1e-04),
# ── Actuator / joint dynamics (robot.yaml) ───────────────────
ParamSpec("actuator_gear", 0.064, 0.01, 0.2, log_scale=True),
ParamSpec("actuator_filter_tau", 0.03, 0.005, 0.15),
ParamSpec("motor_damping", 0.003, 1e-4, 0.05, log_scale=True),
ParamSpec("pendulum_damping", 0.0001, 1e-5, 0.01, log_scale=True),
ParamSpec("motor_armature", 0.0001, 1e-5, 0.01, log_scale=True),
ParamSpec("motor_frictionloss", 0.03, 0.001, 0.1, log_scale=True),
]
def params_to_dict(
values: np.ndarray, specs: list[ParamSpec] | None = None
) -> dict[str, float]:
"""Convert a flat parameter vector to a named dict."""
if specs is None:
specs = ROTARY_CARTPOLE_PARAMS
return {s.name: float(values[i]) for i, s in enumerate(specs)}
def defaults_vector(specs: list[ParamSpec] | None = None) -> np.ndarray:
"""Return the default parameter vector (in natural space)."""
if specs is None:
specs = ROTARY_CARTPOLE_PARAMS
return np.array([s.default for s in specs], dtype=np.float64)
def bounds_arrays(
specs: list[ParamSpec] | None = None,
) -> tuple[np.ndarray, np.ndarray]:
"""Return (lower, upper) bound arrays."""
if specs is None:
specs = ROTARY_CARTPOLE_PARAMS
lo = np.array([s.lower for s in specs], dtype=np.float64)
hi = np.array([s.upper for s in specs], dtype=np.float64)
return lo, hi
# ── MuJoCo model building with parameter overrides ──────────────────
def _build_model(
robot_path: Path,
params: dict[str, float],
) -> mujoco.MjModel:
"""Build a MuJoCo model from URDF + robot.yaml with parameter overrides.
Follows the same two-step approach as ``MuJoCoRunner._load_model()``:
1. Parse URDF, inject meshdir, load into MuJoCo
2. Export MJCF, inject actuators + joint overrides + param overrides, reload
"""
robot_path = Path(robot_path).resolve()
robot_yaml = yaml.safe_load((robot_path / "robot.yaml").read_text())
urdf_path = robot_path / robot_yaml["urdf"]
# ── Step 1: Load URDF ────────────────────────────────────────
tree = ET.parse(urdf_path)
root = tree.getroot()
# Inject meshdir compiler directive.
meshdir = None
for mesh_el in root.iter("mesh"):
fn = mesh_el.get("filename", "")
parent = str(Path(fn).parent)
if parent and parent != ".":
meshdir = parent
break
if meshdir:
mj_ext = ET.SubElement(root, "mujoco")
ET.SubElement(
mj_ext, "compiler", attrib={"meshdir": meshdir, "balanceinertia": "true"}
)
# Override URDF inertial parameters BEFORE MuJoCo loading.
for link in root.iter("link"):
link_name = link.get("name", "")
inertial = link.find("inertial")
if inertial is None:
continue
if link_name == "arm":
_set_mass(inertial, params.get("arm_mass"))
_set_com(
inertial,
params.get("arm_com_x"),
params.get("arm_com_y"),
params.get("arm_com_z"),
)
elif link_name == "pendulum":
_set_mass(inertial, params.get("pendulum_mass"))
_set_com(
inertial,
params.get("pendulum_com_x"),
params.get("pendulum_com_y"),
params.get("pendulum_com_z"),
)
_set_inertia(
inertial,
ixx=params.get("pendulum_ixx"),
iyy=params.get("pendulum_iyy"),
izz=params.get("pendulum_izz"),
ixy=params.get("pendulum_ixy"),
)
# Write temp URDF and load.
tmp_urdf = robot_path / "_tmp_sysid_load.urdf"
tree.write(str(tmp_urdf), xml_declaration=True, encoding="unicode")
try:
model_raw = mujoco.MjModel.from_xml_path(str(tmp_urdf))
finally:
tmp_urdf.unlink(missing_ok=True)
# ── Step 2: Export MJCF, inject actuators + overrides ────────
tmp_mjcf = robot_path / "_tmp_sysid_inject.xml"
try:
mujoco.mj_saveLastXML(str(tmp_mjcf), model_raw)
mjcf_root = ET.fromstring(tmp_mjcf.read_text())
# Actuator.
gear = params.get("actuator_gear", robot_yaml["actuators"][0].get("gear", 0.064))
filter_tau = params.get(
"actuator_filter_tau",
robot_yaml["actuators"][0].get("filter_tau", 0.03),
)
act_cfg = robot_yaml["actuators"][0]
ctrl_lo, ctrl_hi = act_cfg.get("ctrl_range", [-1.0, 1.0])
act_elem = ET.SubElement(mjcf_root, "actuator")
attribs: dict[str, str] = {
"name": f"{act_cfg['joint']}_motor",
"joint": act_cfg["joint"],
"gear": str(gear),
"ctrlrange": f"{ctrl_lo} {ctrl_hi}",
}
if filter_tau > 0:
attribs["dyntype"] = "filter"
attribs["dynprm"] = str(filter_tau)
attribs["gaintype"] = "fixed"
attribs["biastype"] = "none"
ET.SubElement(act_elem, "general", attrib=attribs)
else:
ET.SubElement(act_elem, "motor", attrib=attribs)
# Joint overrides.
motor_damping = params.get("motor_damping", 0.003)
pend_damping = params.get("pendulum_damping", 0.0001)
motor_armature = params.get("motor_armature", 0.0001)
motor_frictionloss = params.get("motor_frictionloss", 0.03)
for body in mjcf_root.iter("body"):
for jnt in body.findall("joint"):
name = jnt.get("name")
if name == "motor_joint":
jnt.set("damping", str(motor_damping))
jnt.set("armature", str(motor_armature))
jnt.set("frictionloss", str(motor_frictionloss))
elif name == "pendulum_joint":
jnt.set("damping", str(pend_damping))
# Disable self-collision.
for geom in mjcf_root.iter("geom"):
geom.set("contype", "0")
geom.set("conaffinity", "0")
modified_xml = ET.tostring(mjcf_root, encoding="unicode")
tmp_mjcf.write_text(modified_xml)
model = mujoco.MjModel.from_xml_path(str(tmp_mjcf))
finally:
tmp_mjcf.unlink(missing_ok=True)
return model
def _set_mass(inertial: ET.Element, mass: float | None) -> None:
if mass is None:
return
mass_el = inertial.find("mass")
if mass_el is not None:
mass_el.set("value", str(mass))
def _set_com(
inertial: ET.Element,
x: float | None,
y: float | None,
z: float | None,
) -> None:
origin = inertial.find("origin")
if origin is None:
return
xyz = origin.get("xyz", "0 0 0").split()
if x is not None:
xyz[0] = str(x)
if y is not None:
xyz[1] = str(y)
if z is not None:
xyz[2] = str(z)
origin.set("xyz", " ".join(xyz))
def _set_inertia(
inertial: ET.Element,
ixx: float | None = None,
iyy: float | None = None,
izz: float | None = None,
ixy: float | None = None,
iyz: float | None = None,
ixz: float | None = None,
) -> None:
ine = inertial.find("inertia")
if ine is None:
return
for attr, val in [
("ixx", ixx), ("iyy", iyy), ("izz", izz),
("ixy", ixy), ("iyz", iyz), ("ixz", ixz),
]:
if val is not None:
ine.set(attr, str(val))
# ── Simulation rollout ───────────────────────────────────────────────
def rollout(
robot_path: str | Path,
params: dict[str, float],
actions: np.ndarray,
timesteps: np.ndarray,
sim_dt: float = 0.002,
substeps: int = 10,
) -> dict[str, np.ndarray]:
"""Replay recorded actions in MuJoCo with overridden parameters.
Parameters
----------
robot_path : asset directory
params : named parameter overrides
actions : (N,) normalised actions [-1, 1] from the recording
timesteps : (N,) wall-clock times (seconds) from the recording
sim_dt : MuJoCo physics timestep
substeps : physics substeps per control step
Returns
-------
dict with keys: motor_angle, motor_vel, pendulum_angle, pendulum_vel
Each is an (N,) numpy array of simulated values.
"""
robot_path = Path(robot_path).resolve()
model = _build_model(robot_path, params)
model.opt.timestep = sim_dt
data = mujoco.MjData(model)
# Start from pendulum hanging down (qpos=0 is down per URDF convention).
mujoco.mj_resetData(model, data)
# Control dt derived from actual recording sample rate.
n = len(actions)
ctrl_dt = sim_dt * substeps
# Pre-allocate output.
sim_motor_angle = np.zeros(n, dtype=np.float64)
sim_motor_vel = np.zeros(n, dtype=np.float64)
sim_pend_angle = np.zeros(n, dtype=np.float64)
sim_pend_vel = np.zeros(n, dtype=np.float64)
# Extract actuator limit info for software limit switch.
nu = model.nu
if nu > 0:
jnt_id = model.actuator_trnid[0, 0]
jnt_limited = bool(model.jnt_limited[jnt_id])
jnt_lo = model.jnt_range[jnt_id, 0]
jnt_hi = model.jnt_range[jnt_id, 1]
gear_sign = float(np.sign(model.actuator_gear[0, 0]))
else:
jnt_limited = False
jnt_lo = jnt_hi = gear_sign = 0.0
for i in range(n):
data.ctrl[0] = actions[i]
for _ in range(substeps):
# Software limit switch (mirrors MuJoCoRunner).
if jnt_limited and nu > 0:
pos = data.qpos[jnt_id]
if pos >= jnt_hi and gear_sign * data.ctrl[0] > 0:
data.ctrl[0] = 0.0
elif pos <= jnt_lo and gear_sign * data.ctrl[0] < 0:
data.ctrl[0] = 0.0
mujoco.mj_step(model, data)
sim_motor_angle[i] = data.qpos[0]
sim_motor_vel[i] = data.qvel[0]
sim_pend_angle[i] = data.qpos[1]
sim_pend_vel[i] = data.qvel[1]
return {
"motor_angle": sim_motor_angle,
"motor_vel": sim_motor_vel,
"pendulum_angle": sim_pend_angle,
"pendulum_vel": sim_pend_vel,
}
def windowed_rollout(
robot_path: str | Path,
params: dict[str, float],
recording: dict[str, np.ndarray],
window_duration: float = 0.5,
sim_dt: float = 0.002,
substeps: int = 10,
) -> dict[str, np.ndarray | float]:
"""Multiple-shooting rollout — split recording into short windows.
For each window:
1. Initialize MuJoCo state from the real qpos/qvel at the window start.
2. Replay the recorded actions within the window.
3. Record the simulated output.
This prevents error accumulation across the full trajectory, giving
a much smoother cost landscape for the optimizer.
Parameters
----------
robot_path : asset directory
params : named parameter overrides
recording : dict with keys time, action, motor_angle, motor_vel,
pendulum_angle, pendulum_vel (all 1D arrays of length N)
window_duration : length of each shooting window in seconds
sim_dt : MuJoCo physics timestep
substeps : physics substeps per control step
Returns
-------
dict with:
motor_angle, motor_vel, pendulum_angle, pendulum_vel — (N,) arrays
(stitched from per-window simulations)
n_windows — number of windows used
"""
robot_path = Path(robot_path).resolve()
model = _build_model(robot_path, params)
model.opt.timestep = sim_dt
data = mujoco.MjData(model)
times = recording["time"]
actions = recording["action"]
real_motor = recording["motor_angle"]
real_motor_vel = recording["motor_vel"]
real_pend = recording["pendulum_angle"]
real_pend_vel = recording["pendulum_vel"]
n = len(actions)
# Pre-allocate output (stitched from all windows).
sim_motor_angle = np.zeros(n, dtype=np.float64)
sim_motor_vel = np.zeros(n, dtype=np.float64)
sim_pend_angle = np.zeros(n, dtype=np.float64)
sim_pend_vel = np.zeros(n, dtype=np.float64)
# Extract actuator limit info.
nu = model.nu
if nu > 0:
jnt_id = model.actuator_trnid[0, 0]
jnt_limited = bool(model.jnt_limited[jnt_id])
jnt_lo = model.jnt_range[jnt_id, 0]
jnt_hi = model.jnt_range[jnt_id, 1]
gear_sign = float(np.sign(model.actuator_gear[0, 0]))
else:
jnt_limited = False
jnt_lo = jnt_hi = gear_sign = 0.0
# Compute window boundaries from recording timestamps.
t0 = times[0]
t_end = times[-1]
window_starts: list[int] = [] # indices into the recording
current_t = t0
while current_t < t_end:
# Find the index closest to current_t.
idx = int(np.searchsorted(times, current_t))
idx = min(idx, n - 1)
window_starts.append(idx)
current_t += window_duration
n_windows = len(window_starts)
for w, w_start in enumerate(window_starts):
# Window end: next window start, or end of recording.
w_end = window_starts[w + 1] if w + 1 < n_windows else n
# Initialize MuJoCo state from real data at window start.
mujoco.mj_resetData(model, data)
data.qpos[0] = real_motor[w_start]
data.qpos[1] = real_pend[w_start]
data.qvel[0] = real_motor_vel[w_start]
data.qvel[1] = real_pend_vel[w_start]
data.ctrl[:] = 0.0
# Forward kinematics to make state consistent.
mujoco.mj_forward(model, data)
for i in range(w_start, w_end):
data.ctrl[0] = actions[i]
for _ in range(substeps):
if jnt_limited and nu > 0:
pos = data.qpos[jnt_id]
if pos >= jnt_hi and gear_sign * data.ctrl[0] > 0:
data.ctrl[0] = 0.0
elif pos <= jnt_lo and gear_sign * data.ctrl[0] < 0:
data.ctrl[0] = 0.0
mujoco.mj_step(model, data)
sim_motor_angle[i] = data.qpos[0]
sim_motor_vel[i] = data.qvel[0]
sim_pend_angle[i] = data.qpos[1]
sim_pend_vel[i] = data.qvel[1]
return {
"motor_angle": sim_motor_angle,
"motor_vel": sim_motor_vel,
"pendulum_angle": sim_pend_angle,
"pendulum_vel": sim_pend_vel,
"n_windows": n_windows,
}

287
src/sysid/visualize.py Normal file
View File

@@ -0,0 +1,287 @@
"""Visualise system identification results — real vs simulated trajectories.
Loads a recording and runs simulation with both the original and tuned
parameters, then plots a 4-panel comparison (motor angle, motor vel,
pendulum angle, pendulum vel) over time.
Usage:
python -m src.sysid.visualize \
--robot-path assets/rotary_cartpole \
--recording assets/rotary_cartpole/recordings/capture_20260311_120000.npz
# Also compare with tuned parameters:
python -m src.sysid.visualize \
--robot-path assets/rotary_cartpole \
--recording <file>.npz \
--result assets/rotary_cartpole/sysid_result.json
"""
from __future__ import annotations
import argparse
import json
import math
from pathlib import Path
import numpy as np
import structlog
log = structlog.get_logger()
def visualize(
robot_path: str | Path,
recording_path: str | Path,
result_path: str | Path | None = None,
sim_dt: float = 0.002,
substeps: int = 10,
window_duration: float = 0.5,
save_path: str | Path | None = None,
show: bool = True,
) -> None:
"""Generate comparison plot.
Parameters
----------
robot_path : robot asset directory
recording_path : .npz file from capture
result_path : sysid_result.json with best_params (optional)
sim_dt / substeps : physics settings for rollout
window_duration : shooting window length (s); 0 = open-loop
save_path : if provided, save figure to this path (PNG, PDF, …)
show : if True, display interactive matplotlib window
"""
import matplotlib.pyplot as plt
from src.sysid.rollout import (
ROTARY_CARTPOLE_PARAMS,
defaults_vector,
params_to_dict,
rollout,
windowed_rollout,
)
robot_path = Path(robot_path).resolve()
recording = dict(np.load(recording_path))
t = recording["time"]
actions = recording["action"]
# ── Simulate with default parameters ─────────────────────────
default_params = params_to_dict(
defaults_vector(ROTARY_CARTPOLE_PARAMS), ROTARY_CARTPOLE_PARAMS
)
log.info("simulating_default_params", windowed=window_duration > 0)
if window_duration > 0:
sim_default = windowed_rollout(
robot_path=robot_path,
params=default_params,
recording=recording,
window_duration=window_duration,
sim_dt=sim_dt,
substeps=substeps,
)
else:
sim_default = rollout(
robot_path=robot_path,
params=default_params,
actions=actions,
timesteps=t,
sim_dt=sim_dt,
substeps=substeps,
)
# ── Simulate with tuned parameters (if available) ────────────
sim_tuned = None
tuned_cost = None
if result_path is not None:
result_path = Path(result_path)
if result_path.exists():
result = json.loads(result_path.read_text())
tuned_params = result.get("best_params", {})
tuned_cost = result.get("best_cost")
log.info("simulating_tuned_params", cost=tuned_cost)
if window_duration > 0:
sim_tuned = windowed_rollout(
robot_path=robot_path,
params=tuned_params,
recording=recording,
window_duration=window_duration,
sim_dt=sim_dt,
substeps=substeps,
)
else:
sim_tuned = rollout(
robot_path=robot_path,
params=tuned_params,
actions=actions,
timesteps=t,
sim_dt=sim_dt,
substeps=substeps,
)
else:
log.warning("result_file_not_found", path=str(result_path))
else:
# Auto-detect sysid_result.json in robot_path.
auto_result = robot_path / "sysid_result.json"
if auto_result.exists():
result = json.loads(auto_result.read_text())
tuned_params = result.get("best_params", {})
tuned_cost = result.get("best_cost")
log.info("auto_detected_tuned_params", cost=tuned_cost)
if window_duration > 0:
sim_tuned = windowed_rollout(
robot_path=robot_path,
params=tuned_params,
recording=recording,
window_duration=window_duration,
sim_dt=sim_dt,
substeps=substeps,
)
else:
sim_tuned = rollout(
robot_path=robot_path,
params=tuned_params,
actions=actions,
timesteps=t,
sim_dt=sim_dt,
substeps=substeps,
)
# ── Plot ─────────────────────────────────────────────────────
fig, axes = plt.subplots(5, 1, figsize=(14, 12), sharex=True)
channels = [
("motor_angle", "Motor Angle (rad)", True),
("motor_vel", "Motor Velocity (rad/s)", False),
("pendulum_angle", "Pendulum Angle (rad)", True),
("pendulum_vel", "Pendulum Velocity (rad/s)", False),
]
for ax, (key, ylabel, is_angle) in zip(axes[:4], channels):
real = recording[key]
ax.plot(t, real, "k-", linewidth=1.2, alpha=0.8, label="Real")
ax.plot(
t,
sim_default[key],
"--",
color="#d62728",
linewidth=1.0,
alpha=0.7,
label="Sim (original)",
)
if sim_tuned is not None:
ax.plot(
t,
sim_tuned[key],
"--",
color="#2ca02c",
linewidth=1.0,
alpha=0.7,
label="Sim (tuned)",
)
ax.set_ylabel(ylabel)
ax.legend(loc="upper right", fontsize=8)
ax.grid(True, alpha=0.3)
# Action plot (bottom panel).
axes[4].plot(t, actions, "b-", linewidth=0.8, alpha=0.6)
axes[4].set_ylabel("Action (norm)")
axes[4].set_xlabel("Time (s)")
axes[4].grid(True, alpha=0.3)
axes[4].set_ylim(-1.1, 1.1)
# Title with cost info.
title = "System Identification — Real vs Simulated Trajectories"
if tuned_cost is not None:
# Compute original cost for comparison.
from src.sysid.optimize import cost_function
orig_cost = cost_function(
defaults_vector(ROTARY_CARTPOLE_PARAMS),
recording,
robot_path,
ROTARY_CARTPOLE_PARAMS,
sim_dt=sim_dt,
substeps=substeps,
window_duration=window_duration,
)
title += f"\nOriginal cost: {orig_cost:.4f} → Tuned cost: {tuned_cost:.4f}"
improvement = (1.0 - tuned_cost / orig_cost) * 100 if orig_cost > 0 else 0
title += f" ({improvement:+.1f}%)"
fig.suptitle(title, fontsize=12)
plt.tight_layout()
if save_path:
save_path = Path(save_path)
fig.savefig(str(save_path), dpi=150, bbox_inches="tight")
log.info("figure_saved", path=str(save_path))
if show:
plt.show()
else:
plt.close(fig)
# ── CLI entry point ──────────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser(
description="Visualise system identification results."
)
parser.add_argument(
"--robot-path",
type=str,
default="assets/rotary_cartpole",
)
parser.add_argument(
"--recording",
type=str,
required=True,
help="Path to .npz recording file",
)
parser.add_argument(
"--result",
type=str,
default=None,
help="Path to sysid_result.json (auto-detected if omitted)",
)
parser.add_argument("--sim-dt", type=float, default=0.002)
parser.add_argument("--substeps", type=int, default=10)
parser.add_argument(
"--window-duration",
type=float,
default=0.5,
help="Shooting window length in seconds (0 = open-loop)",
)
parser.add_argument(
"--save",
type=str,
default=None,
help="Save figure to this path (PNG, PDF, …)",
)
parser.add_argument(
"--no-show",
action="store_true",
help="Don't show interactive window (useful for CI)",
)
args = parser.parse_args()
visualize(
robot_path=args.robot_path,
recording_path=args.recording,
result_path=args.result,
sim_dt=args.sim_dt,
substeps=args.substeps,
window_duration=args.window_duration,
save_path=args.save,
show=not args.no_show,
)
if __name__ == "__main__":
main()