Files
RL-Sim-Framework/tests/test_sim2real.py
Victor Mylle 8cc84d6a21 feat: RMA-style history-conditioned policy for sim2real adaptation
Added a temporal observation history buffer and 1D-CNN encoder so the
policy can implicitly infer environment parameters (mass, friction,
gear ratios, etc.) from recent (obs, action) dynamics.

Architecture:
  history window [(obs₀,a₀), ..., (obs_{H-1},a_{H-1})]
      → 1D-CNN HistoryEncoder → embedding (32-dim)
      → concat [current_obs, embedding] → MLP → action

Components:
- BaseRunner: history ring buffer, _push_history/_reset_history,
  augmented obs space (6 + H×7 = 76 with H=10)
- HistoryEncoder (src/models/mlp.py): 2-layer temporal Conv1d + GAP
- SharedMLP: optional history_length/raw_obs_dim/embedding_dim params;
  splits augmented obs, encodes history, feeds [obs, emb] to MLP
- TrainerConfig: history_length, embedding_dim fields
- All runner configs: history_length=10 by default
- Tests: encoder shape, model with/without history, config defaults
2026-03-28 18:58:24 +01:00

95 lines
3.4 KiB
Python

"""Unit tests for MuJoCoRunner domain randomization and history buffer."""
import dataclasses
import numpy as np
import pytest
import torch
from gymnasium import spaces
from src.runners.mujoco import DomainRandConfig, MuJoCoRunnerConfig
from src.models.mlp import SharedMLP, HistoryEncoder
class TestDomainRandConfig:
    """Checks on ``DomainRandConfig`` construction and its field defaults."""

    def test_default_all_zero(self) -> None:
        """A config built with no arguments reports 0.0 for the checked fractions."""
        default_cfg = DomainRandConfig()
        assert default_cfg.mass_frac == 0.0
        assert default_cfg.friction_frac == 0.0
        assert default_cfg.gear_frac == 0.0

    def test_from_dict(self) -> None:
        """Keyword-expanding a partial dict sets exactly the supplied fields."""
        overrides = {"mass_frac": 0.15, "gear_frac": 0.1}
        partial_cfg = DomainRandConfig(**overrides)
        assert partial_cfg.mass_frac == 0.15
        assert partial_cfg.gear_frac == 0.1
        # damping_frac is absent from the dict, so it is expected to stay at 0.0.
        assert partial_cfg.damping_frac == 0.0
class TestMuJoCoRunnerConfig:
    """Checks on ``MuJoCoRunnerConfig`` defaults and nested config handling."""

    def test_default_dr_disabled(self) -> None:
        """The default runner config carries a ``DomainRandConfig`` with zero mass_frac."""
        runner_cfg = MuJoCoRunnerConfig()
        assert isinstance(runner_cfg.domain_rand, DomainRandConfig)
        assert runner_cfg.domain_rand.mass_frac == 0.0

    def test_domain_rand_from_dict(self) -> None:
        """A plain dict given as ``domain_rand`` ends up as a ``DomainRandConfig``.

        Hydra passes nested configs as dicts, so this exercises the
        ``__post_init__`` conversion.
        """
        nested = {"mass_frac": 0.2, "friction_frac": 0.3}
        runner_cfg = MuJoCoRunnerConfig(
            domain_rand=nested,  # type: ignore[arg-type]
        )
        assert isinstance(runner_cfg.domain_rand, DomainRandConfig)
        assert runner_cfg.domain_rand.mass_frac == 0.2
        assert runner_cfg.domain_rand.friction_frac == 0.3

    def test_history_length_default(self) -> None:
        """A runner config built with no arguments has ``history_length`` equal to 0."""
        runner_cfg = MuJoCoRunnerConfig()
        assert runner_cfg.history_length == 0
class TestHistoryEncoder:
    """Output-shape checks for ``HistoryEncoder``."""

    def test_output_shape(self) -> None:
        """A (4, 10, 7) input yields a (4, 32) output when embedding_dim=32."""
        batch, horizon, step_dim, emb_dim = 4, 10, 7, 32
        encoder = HistoryEncoder(
            history_length=horizon, step_dim=step_dim, embedding_dim=emb_dim
        )
        window = torch.randn(batch, horizon, step_dim)
        embedding = encoder(window)
        assert embedding.shape == (batch, emb_dim)

    def test_different_embedding_dim(self) -> None:
        """A (2, 5, 7) input yields a (2, 16) output when embedding_dim=16."""
        batch, horizon, step_dim, emb_dim = 2, 5, 7, 16
        encoder = HistoryEncoder(
            history_length=horizon, step_dim=step_dim, embedding_dim=emb_dim
        )
        window = torch.randn(batch, horizon, step_dim)
        embedding = encoder(window)
        assert embedding.shape == (batch, emb_dim)
class TestSharedMLPWithHistory:
    """Shape checks for ``SharedMLP`` built with and without history arguments."""

    def test_no_history(self) -> None:
        """Built without history arguments, the model has no history encoder."""
        batch, obs_dim, act_dim = 4, 6, 1
        observation_space = spaces.Box(low=-1.0, high=1.0, shape=(obs_dim,))
        action_space = spaces.Box(low=-1.0, high=1.0, shape=(act_dim,))
        net = SharedMLP(
            observation_space,
            action_space,
            torch.device("cpu"),
            hidden_sizes=(32, 32),
        )
        assert net.history_encoder is None

        batch_inputs = {"states": torch.randn(batch, obs_dim)}
        # The policy role is unpacked as a 3-tuple; only the mean is checked.
        mean, _log_std, _ = net.compute(batch_inputs, role="policy")
        assert mean.shape == (batch, act_dim)

    def test_with_history(self) -> None:
        """Built with history arguments, the model accepts the augmented observation."""
        batch = 4
        raw_obs_dim = 6
        act_dim = 1
        horizon = 10
        step_dim = raw_obs_dim + act_dim  # 7 values per history step
        aug_dim = raw_obs_dim + horizon * step_dim  # 6 + 70 = 76

        observation_space = spaces.Box(low=-1.0, high=1.0, shape=(aug_dim,))
        action_space = spaces.Box(low=-1.0, high=1.0, shape=(act_dim,))
        net = SharedMLP(
            observation_space,
            action_space,
            torch.device("cpu"),
            hidden_sizes=(32, 32),
            history_length=horizon,
            raw_obs_dim=raw_obs_dim,
            embedding_dim=32,
        )
        assert net.history_encoder is not None

        batch_inputs = {"states": torch.randn(batch, aug_dim)}
        mean, _log_std, _ = net.compute(batch_inputs, role="policy")
        assert mean.shape == (batch, act_dim)

        # The value role is unpacked as a 2-tuple; one value per batch row.
        value, _ = net.compute(batch_inputs, role="value")
        assert value.shape == (batch, 1)