Files
RL-Sim-Framework/scripts/hpo.py
2026-03-11 23:07:37 +01:00

350 lines
12 KiB
Python

"""Hyperparameter optimization for RL-Framework using ClearML + SMAC3.
Automatically creates a base training task (via Task.create), reads HPO
search ranges from the Hydra config's `training.hpo` and `env.hpo` blocks,
and launches SMAC3 Successive Halving optimization.
Usage:
python scripts/hpo.py \
--env rotary_cartpole \
--runner mujoco_single \
--training ppo_single \
--queue gpu-queue
# Or use an existing base task:
python scripts/hpo.py --base-task-id <TASK_ID>
"""
from __future__ import annotations
import argparse
import sys
import time
from pathlib import Path
# Ensure project root is on sys.path so that `src.*` imports resolve when
# this file is executed directly as a script (rather than via `python -m`).
_PROJECT_ROOT = str(Path(__file__).resolve().parent.parent)
if _PROJECT_ROOT not in sys.path:
    sys.path.insert(0, _PROJECT_ROOT)
import structlog
from clearml import Task
from clearml.automation import (
DiscreteParameterRange,
HyperParameterOptimizer,
UniformIntegerParameterRange,
UniformParameterRange,
)
from omegaconf import OmegaConf
logger = structlog.get_logger()
def _load_hydra_config(
    env: str, runner: str, training: str
) -> dict:
    """Load and merge Hydra configs to extract HPO ranges.

    We read the YAML files directly (without running Hydra) so this script
    doesn't need @hydra.main — it's a ClearML optimizer, not a training job.

    NOTE(review): ``runner`` is accepted for interface symmetry but is not
    read here — only the training and env YAMLs carry ``hpo:`` blocks.
    """
    configs_dir = Path(__file__).resolve().parent.parent / "configs"

    # Load training config (handles defaults: [ppo] inheritance)
    training_cfg = OmegaConf.load(configs_dir / "training" / f"{training}.yaml")

    # If the training config has defaults pointing to a base, load + merge.
    if "defaults" in training_cfg:
        merged_base = OmegaConf.create({})
        for entry in OmegaConf.to_container(training_cfg.defaults):
            # Only plain string entries name a sibling YAML in this group;
            # dict-style entries (e.g. overrides) are ignored.
            if not isinstance(entry, str):
                continue
            candidate = configs_dir / "training" / f"{entry}.yaml"
            if candidate.exists():
                merged_base = OmegaConf.merge(merged_base, OmegaConf.load(candidate))
        # Strip the defaults key, then overlay the concrete values on the base.
        overrides = OmegaConf.to_container(training_cfg)
        overrides.pop("defaults", None)
        training_cfg = OmegaConf.merge(merged_base, OmegaConf.create(overrides))

    # The env config is optional; fall back to an empty config when absent.
    env_path = configs_dir / "env" / f"{env}.yaml"
    env_cfg = OmegaConf.load(env_path) if env_path.exists() else OmegaConf.create({})

    return {
        "training": OmegaConf.to_container(training_cfg, resolve=True),
        "env": OmegaConf.to_container(env_cfg, resolve=True),
    }
def _build_hyper_parameters(config: dict) -> list:
    """Build ClearML parameter ranges from ``hpo:`` blocks in config.

    Reads ``training.hpo`` and ``env.hpo`` dicts and creates appropriate
    ClearML parameter range objects.

    Each hpo entry can have:
        {min, max}            → UniformParameterRange (float)
        {min, max, type: int} → UniformIntegerParameterRange
        {min, max, log: true} → UniformParameterRange with log scale
        {values: [...]}       → DiscreteParameterRange

    Args:
        config: Merged config dict with optional ``training`` / ``env``
            sections (as produced by ``_load_hydra_config``).

    Returns:
        List of ClearML parameter-range objects; empty when no ``hpo:``
        blocks are present.
    """
    params = []
    for section in ("training", "env"):
        hpo_ranges = config.get(section, {}).get("hpo", {})
        if not hpo_ranges:
            continue
        for param_name, spec in hpo_ranges.items():
            # Trials are clones of the base task, so parameters are addressed
            # through the ClearML "Hydra" parameter section.
            hydra_key = f"Hydra/{section}.{param_name}"
            # Robustness fix: a malformed entry that is not a mapping (e.g. a
            # bare scalar) used to raise TypeError on `"values" in spec`.
            # Route it to the existing warning branch instead.
            if not isinstance(spec, dict):
                logger.warning("skipping_unknown_hpo_spec", param=param_name, spec=spec)
                continue
            if "values" in spec:
                params.append(
                    DiscreteParameterRange(hydra_key, values=spec["values"])
                )
            elif "min" in spec and "max" in spec:
                if spec.get("type") == "int":
                    params.append(
                        UniformIntegerParameterRange(
                            hydra_key,
                            min_value=int(spec["min"]),
                            max_value=int(spec["max"]),
                        )
                    )
                else:
                    # step is optional; None lets ClearML pick its default.
                    params.append(
                        UniformParameterRange(
                            hydra_key,
                            min_value=float(spec["min"]),
                            max_value=float(spec["max"]),
                            step_size=spec.get("step", None),
                        )
                    )
            else:
                logger.warning("skipping_unknown_hpo_spec", param=param_name, spec=spec)
    return params
def _create_base_task(
    env: str, runner: str, training: str, queue: str
) -> str:
    """Create a base ClearML task without executing it.

    Uses Task.create() to register a task pointing at scripts/train.py
    with the correct Hydra overrides. The HPO optimizer will clone this.

    NOTE(review): ``queue`` is accepted but not used here — presumably the
    optimizer enqueues the clones itself; confirm before removing.

    Returns:
        The ClearML task ID of the newly created base task.
    """
    scripts_dir = Path(__file__).resolve().parent
    base_task = Task.create(
        project_name="RL-Framework",
        task_name=f"{env}-{runner}-{training} (HPO base)",
        task_type=Task.TaskTypes.training,
        script=str(scripts_dir / "train.py"),
        working_directory=str(scripts_dir.parent),
        argparse_args=[
            f"env={env}",
            f"runner={runner}",
            f"training={training}",
        ],
        add_task_init_call=False,
    )

    # Explicitly set Hydra config-group choices so cloned tasks pick up the
    # correct env / runner / training groups. Task.create() does not populate
    # the Hydra parameter section because Hydra never actually runs during
    # creation.
    for group, choice in (("env", env), ("runner", runner), ("training", training)):
        base_task.set_parameter(f"Hydra/{group}", choice)

    # Docker image plus the system/pip deps the trial containers need.
    base_task.set_base_docker(
        "registry.kube.optimize/worker-image:latest",
        docker_setup_bash_script=(
            "apt-get update && apt-get install -y --no-install-recommends "
            "libosmesa6 libgl1-mesa-glx libglfw3 && rm -rf /var/lib/apt/lists/* "
            "&& pip install 'jax[cuda12]' mujoco-mjx"
        ),
    )
    base_task.set_packages(str(scripts_dir.parent / "requirements.txt"))

    logger.info("base_task_created", task_id=base_task.id, task_name=base_task.name)
    return base_task.id
def main() -> None:
    """CLI entry point: build the search space and launch the HPO controller."""
    parser = argparse.ArgumentParser(
        description="Hyperparameter optimization for RL-Framework"
    )
    parser.add_argument(
        "--base-task-id",
        type=str,
        default=None,
        help="Existing ClearML task ID to use as base (skip auto-creation)",
    )
    parser.add_argument("--env", type=str, default="rotary_cartpole")
    parser.add_argument("--runner", type=str, default="mujoco_single")
    parser.add_argument("--training", type=str, default="ppo_single")
    parser.add_argument("--queue", type=str, default="gpu-queue")
    parser.add_argument(
        "--max-concurrent", type=int, default=2,
        help="Maximum concurrent trial tasks",
    )
    parser.add_argument(
        "--total-trials", type=int, default=200,
        help="Total HPO trial budget",
    )
    parser.add_argument(
        "--min-budget", type=int, default=50_000,
        help="Minimum budget (total_timesteps) per trial",
    )
    parser.add_argument(
        "--max-budget", type=int, default=500_000,
        help="Maximum budget (total_timesteps) for promoted trials",
    )
    parser.add_argument("--eta", type=int, default=3, help="Successive halving reduction factor")
    parser.add_argument(
        "--time-limit-hours", type=float, default=72,
        help="Total wall-clock time limit in hours",
    )
    parser.add_argument(
        "--objective-metric", type=str, default="Reward / Total reward (mean)",
        help="ClearML scalar metric title to optimize",
    )
    parser.add_argument(
        "--objective-series", type=str, default=None,
        help="ClearML scalar metric series (default: same as title)",
    )
    # NOTE(review): --maximize is never read; the sign depends solely on
    # --minimize below. Kept for CLI backward compatibility.
    parser.add_argument(
        "--maximize", action="store_true", default=True,
        help="Maximize the objective (default)",
    )
    parser.add_argument(
        "--minimize", action="store_true", default=False,
        help="Minimize the objective",
    )
    parser.add_argument(
        "--dry-run", action="store_true",
        help="Print search space and exit without running",
    )
    args = parser.parse_args()
    objective_sign = "min" if args.minimize else "max"

    # ── Load config and build search space ────────────────────────
    config = _load_hydra_config(args.env, args.runner, args.training)
    hyper_parameters = _build_hyper_parameters(config)
    if not hyper_parameters:
        logger.error(
            "no_hpo_ranges_found",
            hint="Add 'hpo:' blocks to your training and/or env YAML configs",
        )
        return
    if args.dry_run:
        print(f"\nSearch space ({len(hyper_parameters)} parameters):")
        for param in hyper_parameters:
            print(f" {param.name}: {param}")
        print(f"\nObjective: {args.objective_metric} ({objective_sign})")
        return

    # ── Create or reuse base task ─────────────────────────────────
    if args.base_task_id:
        base_task_id = args.base_task_id
        logger.info("using_existing_base_task", task_id=base_task_id)
    else:
        base_task_id = _create_base_task(
            args.env, args.runner, args.training, args.queue
        )

    # ── Initialize ClearML HPO task ───────────────────────────────
    Task.ignore_requirements("torch")
    task = Task.init(
        project_name="RL-Framework",
        task_name=f"HPO {args.env}-{args.runner}-{args.training}",
        task_type=Task.TaskTypes.optimizer,
        reuse_last_task_id=False,
    )
    # The controller reuses the worker image's pre-installed environment.
    task.set_base_docker(
        docker_image="registry.kube.optimize/worker-image:latest",
        docker_arguments=[
            "-e", "CLEARML_AGENT_SKIP_PYTHON_ENV_INSTALL=1",
            "-e", "CLEARML_AGENT_SKIP_PIP_VENV_INSTALL=1",
            "-e", "CLEARML_AGENT_FORCE_SYSTEM_SITE_PACKAGES=1",
        ],
    )
    task.set_packages(str(Path(__file__).resolve().parent.parent / "requirements.txt"))

    # ── Build objective metric ────────────────────────────────────
    # skrl's SequentialTrainer logs "Reward / Total reward (mean)" by default
    objective_title = args.objective_metric
    objective_series = args.objective_series or objective_title

    # ── Launch optimizer ──────────────────────────────────────────
    from src.hpo.smac3 import OptimizerSMAC
    optimizer = HyperParameterOptimizer(
        base_task_id=base_task_id,
        hyper_parameters=hyper_parameters,
        objective_metric_title=objective_title,
        objective_metric_series=objective_series,
        objective_metric_sign=objective_sign,
        optimizer_class=OptimizerSMAC,
        execution_queue=args.queue,
        max_number_of_concurrent_tasks=args.max_concurrent,
        total_max_jobs=args.total_trials,
        min_iteration_per_job=args.min_budget,
        max_iteration_per_job=args.max_budget,
        pool_period_min=1,
        time_limit_per_job=240,  # 4 hours per trial max
        eta=args.eta,
        budget_param_name="Hydra/training.total_timesteps",
    )

    # Send this HPO controller to a remote services worker.
    task.execute_remotely(queue_name="services", exit_process=True)

    # Reporting and time limits
    optimizer.set_report_period(1)
    optimizer.set_time_limit(in_minutes=int(args.time_limit_hours * 60))

    # Start and block until the budget or time limit is exhausted.
    optimizer.start()
    optimizer.wait()

    # Retrieve the leaderboard with exponential-backoff retries (the backend
    # may lag briefly after the optimizer finishes).
    max_retries = 5
    for attempt in range(1, max_retries + 1):
        try:
            leaders = optimizer.get_top_experiments(top_k=10)
        except Exception as e:
            logger.warning("retry_get_top_experiments", attempt=attempt, error=str(e))
            if attempt < max_retries:
                time.sleep(5.0 * (2 ** (attempt - 1)))
            else:
                logger.error("could_not_retrieve_top_experiments")
        else:
            logger.info("top_experiments_retrieved", count=len(leaders))
            for rank, trial in enumerate(leaders, start=1):
                logger.info("top_experiment", rank=rank, task_id=trial.id, name=trial.name)
            break

    optimizer.stop()
    logger.info("hpo_complete")
# Script entry point: run the optimizer only when executed directly.
if __name__ == "__main__":
    main()