"""Hyperparameter optimization for RL-Framework using ClearML + SMAC3. Automatically creates a base training task (via Task.create), reads HPO search ranges from the Hydra config's `training.hpo` and `env.hpo` blocks, and launches SMAC3 Successive Halving optimization. Usage: python scripts/hpo.py env=rotary_cartpole runner=mujoco_single training=ppo_single # With HPO-specific options: python scripts/hpo.py env=rotary_cartpole runner=mujoco_single training=ppo_single \\ --queue gpu-queue --total-trials 100 # Or use an existing base task: python scripts/hpo.py --base-task-id # Dry run (print search space only): python scripts/hpo.py env=rotary_cartpole --dry-run """ from __future__ import annotations import argparse import sys import time from pathlib import Path # Ensure project root is on sys.path _PROJECT_ROOT = str(Path(__file__).resolve().parent.parent) if _PROJECT_ROOT not in sys.path: sys.path.insert(0, _PROJECT_ROOT) import structlog from clearml import Task from clearml.automation import ( DiscreteParameterRange, HyperParameterOptimizer, UniformIntegerParameterRange, UniformParameterRange, ) from omegaconf import OmegaConf logger = structlog.get_logger() def _load_hydra_config( env: str, runner: str, training: str ) -> dict: """Load and merge Hydra configs to extract HPO ranges. We read the YAML files directly (without running Hydra) so this script doesn't need @hydra.main — it's a ClearML optimizer, not a training job. """ configs_dir = Path(__file__).resolve().parent.parent / "configs" # Load training config (handles defaults: [ppo] inheritance) training_path = configs_dir / "training" / f"{training}.yaml" training_cfg = OmegaConf.load(training_path) # If the training config has defaults pointing to a base, load + merge if "defaults" in training_cfg: defaults = OmegaConf.to_container(training_cfg.defaults) base_cfg = OmegaConf.create({}) for d in defaults: if isinstance(d, str): base_path = configs_dir / "training" / f"{d}.yaml" if base_path.exists(): loaded = OmegaConf.load(base_path) base_cfg = OmegaConf.merge(base_cfg, loaded) # Remove defaults key and merge training_no_defaults = { k: v for k, v in OmegaConf.to_container(training_cfg).items() if k != "defaults" } training_cfg = OmegaConf.merge(base_cfg, OmegaConf.create(training_no_defaults)) # Load env config env_path = configs_dir / "env" / f"{env}.yaml" env_cfg = OmegaConf.load(env_path) if env_path.exists() else OmegaConf.create({}) return { "training": OmegaConf.to_container(training_cfg, resolve=True), "env": OmegaConf.to_container(env_cfg, resolve=True), } def _build_hyper_parameters(config: dict) -> list: """Build ClearML parameter ranges from hpo: blocks in config. Reads training.hpo and env.hpo dicts and creates appropriate ClearML parameter range objects. Each hpo entry can have: {min, max} → UniformParameterRange (float) {min, max, type: int} → UniformIntegerParameterRange {min, max, log: true} → UniformParameterRange with log scale {values: [...]} → DiscreteParameterRange """ params = [] for section in ("training", "env"): hpo_ranges = config.get(section, {}).get("hpo", {}) if not hpo_ranges: continue for param_name, spec in hpo_ranges.items(): hydra_key = f"Hydra/{section}.{param_name}" if "values" in spec: params.append( DiscreteParameterRange(hydra_key, values=spec["values"]) ) elif "min" in spec and "max" in spec: if spec.get("type") == "int": params.append( UniformIntegerParameterRange( hydra_key, min_value=int(spec["min"]), max_value=int(spec["max"]), ) ) else: step = spec.get("step", None) params.append( UniformParameterRange( hydra_key, min_value=float(spec["min"]), max_value=float(spec["max"]), step_size=step, ) ) else: logger.warning("skipping_unknown_hpo_spec", param=param_name, spec=spec) return params def _flatten_dict(d: dict, parent_key: str = "", sep: str = ".") -> dict: """Flatten a nested dict into dot-separated keys. Example: {"a": {"b": 1}} → {"a.b": 1} """ items = {} for k, v in d.items(): new_key = f"{parent_key}{sep}{k}" if parent_key else k if isinstance(v, dict): items.update(_flatten_dict(v, new_key, sep=sep)) else: items[new_key] = v return items def _create_base_task( env: str, runner: str, training: str, queue: str ) -> str: """Create a base ClearML task without executing it. Uses Task.create() to register a task pointing at scripts/train.py with the correct Hydra overrides. The HPO optimizer will clone this. The full resolved OmegaConf config is attached as Hydra/* parameters so cloned trial tasks inherit the complete configuration. """ script_path = str(Path(__file__).resolve().parent / "train.py") project_root = str(Path(__file__).resolve().parent.parent) base_task = Task.create( project_name="RL-Framework", task_name=f"{env}-{runner}-{training} (HPO base)", task_type=Task.TaskTypes.training, script=script_path, working_directory=project_root, argparse_args=[ f"env={env}", f"runner={runner}", f"training={training}", ], add_task_init_call=False, ) # ── Attach full resolved OmegaConf config ───────────────────── # ClearML's Hydra binding normally does this when the script runs, # but Task.create() never executes Hydra. We replicate the binding # manually: config group choices + all resolved values. base_task.set_parameter("Hydra/env", env) base_task.set_parameter("Hydra/runner", runner) base_task.set_parameter("Hydra/training", training) # Load and resolve the full config for each group configs_dir = Path(__file__).resolve().parent.parent / "configs" for section, name in [("training", training), ("env", env), ("runner", runner)]: cfg_path = configs_dir / section / f"{name}.yaml" if not cfg_path.exists(): continue cfg = OmegaConf.load(cfg_path) # Handle Hydra defaults: inheritance (e.g. ppo_single → ppo) if "defaults" in cfg: defaults = OmegaConf.to_container(cfg.defaults) base_cfg = OmegaConf.create({}) for d in defaults: if isinstance(d, str): base_path = configs_dir / section / f"{d}.yaml" if base_path.exists(): loaded = OmegaConf.load(base_path) base_cfg = OmegaConf.merge(base_cfg, loaded) cfg_no_defaults = { k: v for k, v in OmegaConf.to_container(cfg).items() if k != "defaults" } cfg = OmegaConf.merge(base_cfg, OmegaConf.create(cfg_no_defaults)) resolved = OmegaConf.to_container(cfg, resolve=True) # Remove hpo metadata — not a real config value resolved.pop("hpo", None) flat = _flatten_dict(resolved) for key, value in flat.items(): base_task.set_parameter(f"Hydra/{section}.{key}", value) # Set docker config base_task.set_base_docker( "registry.kube.optimize/worker-image:latest", docker_setup_bash_script=( "apt-get update && apt-get install -y --no-install-recommends " "libosmesa6-dev libgl1-mesa-glx libglfw3 && rm -rf /var/lib/apt/lists/* " "&& pip install 'jax[cuda12]' mujoco-mjx PyOpenGL PyOpenGL-accelerate" ), docker_arguments=[ "-e", "MUJOCO_GL=osmesa", ], ) req_file = Path(__file__).resolve().parent.parent / "requirements.txt" base_task.set_packages(str(req_file)) task_id = base_task.id logger.info("base_task_created", task_id=task_id, task_name=base_task.name) return task_id def _parse_overrides(argv: list[str]) -> dict[str, str]: """Parse Hydra-style key=value overrides from argv. Returns a dict of parsed key-value pairs. Unknown args (--flags) are left in argv for argparse to handle. """ overrides = {} remaining = [] for arg in argv: if "=" in arg and not arg.startswith("-"): key, value = arg.split("=", 1) overrides[key] = value else: remaining.append(arg) argv.clear() argv.extend(remaining) return overrides def main() -> None: # First pass: extract Hydra-style key=value overrides from sys.argv raw_args = sys.argv[1:] overrides = _parse_overrides(raw_args) parser = argparse.ArgumentParser( description="Hyperparameter optimization for RL-Framework", usage="%(prog)s env= runner= training= [options]", ) parser.add_argument( "--base-task-id", type=str, default=None, help="Existing ClearML task ID to use as base (skip auto-creation)", ) parser.add_argument("--queue", type=str, default="gpu-queue") parser.add_argument( "--max-concurrent", type=int, default=2, help="Maximum concurrent trial tasks", ) parser.add_argument( "--total-trials", type=int, default=200, help="Total HPO trial budget", ) parser.add_argument( "--min-budget", type=int, default=50_000, help="Minimum budget (total_timesteps) per trial", ) parser.add_argument( "--max-budget", type=int, default=500_000, help="Maximum budget (total_timesteps) for promoted trials", ) parser.add_argument("--eta", type=int, default=3, help="Successive halving reduction factor") parser.add_argument( "--max-consecutive-failures", type=int, default=3, help="Abort HPO after N consecutive trial failures (0 = never abort)", ) parser.add_argument( "--time-limit-hours", type=float, default=72, help="Total wall-clock time limit in hours", ) parser.add_argument( "--objective-metric", type=str, default="Reward / Total reward (mean)", help="ClearML scalar metric title to optimize", ) parser.add_argument( "--objective-series", type=str, default=None, help="ClearML scalar metric series (default: same as title)", ) parser.add_argument( "--maximize", action="store_true", default=True, help="Maximize the objective (default)", ) parser.add_argument( "--minimize", action="store_true", default=False, help="Minimize the objective", ) parser.add_argument( "--dry-run", action="store_true", help="Print search space and exit without running", ) args = parser.parse_args(raw_args) # Resolve env/runner/training from Hydra-style overrides (same as train.py) env = overrides.get("env", "rotary_cartpole") runner = overrides.get("runner", "mujoco_single") training = overrides.get("training", "ppo_single") objective_sign = "min" if args.minimize else "max" # ── Load config and build search space ──────────────────────── config = _load_hydra_config(env, runner, training) hyper_parameters = _build_hyper_parameters(config) if not hyper_parameters: logger.error( "no_hpo_ranges_found", hint="Add 'hpo:' blocks to your training and/or env YAML configs", ) return if args.dry_run: print(f"\nSearch space ({len(hyper_parameters)} parameters):") for p in hyper_parameters: print(f" {p.name}: {p}") print(f"\nObjective: {args.objective_metric} ({objective_sign})") return # ── Initialize ClearML HPO task ─────────────────────────────── Task.ignore_requirements("torch") task = Task.init( project_name="RL-Framework", task_name=f"HPO {env}-{runner}-{training}", task_type=Task.TaskTypes.optimizer, reuse_last_task_id=False, ) task.set_base_docker( docker_image="registry.kube.optimize/worker-image:latest", docker_arguments=[ "-e", "CLEARML_AGENT_SKIP_PYTHON_ENV_INSTALL=1", "-e", "CLEARML_AGENT_SKIP_PIP_VENV_INSTALL=1", "-e", "CLEARML_AGENT_FORCE_SYSTEM_SITE_PACKAGES=1", ], ) req_file = Path(__file__).resolve().parent.parent / "requirements.txt" task.set_packages(str(req_file)) # ── Create or reuse base task ───────────────────────────────── # Store the base_task_id on the HPO task so that when the services # worker re-runs this script it reuses the same base task instead # of creating a duplicate. if args.base_task_id: base_task_id = args.base_task_id logger.info("using_existing_base_task", task_id=base_task_id) else: existing = task.get_parameter("General/base_task_id") if existing: base_task_id = existing logger.info("reusing_base_task_from_param", task_id=base_task_id) else: base_task_id = _create_base_task( env, runner, training, args.queue ) task.set_parameter("General/base_task_id", base_task_id) # ── Build objective metric ──────────────────────────────────── # skrl's SequentialTrainer logs "Reward / Total reward (mean)" by default objective_title = args.objective_metric objective_series = args.objective_series or objective_title # ── Launch optimizer ────────────────────────────────────────── from src.hpo.smac3 import OptimizerSMAC optimizer = HyperParameterOptimizer( base_task_id=base_task_id, hyper_parameters=hyper_parameters, objective_metric_title=objective_title, objective_metric_series=objective_series, objective_metric_sign=objective_sign, optimizer_class=OptimizerSMAC, execution_queue=args.queue, max_number_of_concurrent_tasks=args.max_concurrent, total_max_jobs=args.total_trials, min_iteration_per_job=args.min_budget, max_iteration_per_job=args.max_budget, pool_period_min=1, time_limit_per_job=240, # 4 hours per trial max eta=args.eta, budget_param_name="Hydra/training.total_timesteps", max_consecutive_failures=args.max_consecutive_failures, ) # Send this HPO controller to a remote services worker task.execute_remotely(queue_name="services", exit_process=True) # Reporting and time limits optimizer.set_report_period(1) optimizer.set_time_limit(in_minutes=int(args.time_limit_hours * 60)) # Start and wait optimizer.start() optimizer.wait() # Get top experiments max_retries = 5 for attempt in range(max_retries): try: top_exp = optimizer.get_top_experiments(top_k=10) logger.info("top_experiments_retrieved", count=len(top_exp)) for i, t in enumerate(top_exp): logger.info("top_experiment", rank=i + 1, task_id=t.id, name=t.name) break except Exception as e: logger.warning("retry_get_top_experiments", attempt=attempt + 1, error=str(e)) if attempt < max_retries - 1: time.sleep(5.0 * (2 ** attempt)) else: logger.error("could_not_retrieve_top_experiments") optimizer.stop() logger.info("hpo_complete") if __name__ == "__main__": main()