from __future__ import annotations import time import os import json import math import hashlib import traceback from dataclasses import dataclass from typing import Dict, Any, List, Tuple, Optional from concurrent.futures import ProcessPoolExecutor, as_completed import numpy as np import pandas as pd from contextlib import contextmanager import sys @contextmanager def suppress_output(enabled: bool = True): if not enabled: yield return with open(os.devnull, "w") as devnull: old_out, old_err = sys.stdout, sys.stderr sys.stdout, sys.stderr = devnull, devnull try: yield finally: sys.stdout, sys.stderr = old_out, old_err import gymnasium as gym import sinergym from unihvac.find_files import ( detect_paths, find_manifest, load_manifest_records, get_paths_from_manifest_record, ) from unihvac.rollout import run_rollout_to_df from unihvac.rewards import RewardConfig, compute_rewards_vectorized, compute_terminals, config_to_meta # ====================================================================================== # USER CONFIG # ====================================================================================== BUILDING = "OfficeSmall" PREFER_PATCHED = True OUTPUTS_DIRNAME = "traj_results" SAVE_DIRNAME = "TrajectoryData_officesmall" EPISODES_PER_RECORD = 1 QUIET_WORKERS = False BEHAVIORS = [ "rbc_21_24", "random_walk", "piecewise", "sinusoid", "aggressive", ] TIME_STEP_HOURS = 900.0 / 3600.0 # 0.25 HTG_MIN, HTG_MAX = 18.0, 24.0 CLG_MIN, CLG_MAX = 22.0, 30.0 DEADBAND_MIN = 1.0 MAX_STEPS = None VERBOSE_ROLLOUT = True NUM_WORKERS = 16 BASE_SEED = 123 RESUME = True REWARD_CFG = RewardConfig(version="v1_energy_only", w_energy=1.0, w_comfort=0.0) # ====================================================================================== # VARIABLES / ACTUATORS (copy from your baseline runner) # ====================================================================================== hot_actuators = { "Htg_Core": ("Zone Temperature Control", "Heating Setpoint", "CORE_ZN"), "Clg_Core": ("Zone Temperature Control", "Cooling Setpoint", "CORE_ZN"), "Htg_P1": ("Zone Temperature Control", "Heating Setpoint", "PERIMETER_ZN_1"), "Clg_P1": ("Zone Temperature Control", "Cooling Setpoint", "PERIMETER_ZN_1"), "Htg_P2": ("Zone Temperature Control", "Heating Setpoint", "PERIMETER_ZN_2"), "Clg_P2": ("Zone Temperature Control", "Cooling Setpoint", "PERIMETER_ZN_2"), "Htg_P3": ("Zone Temperature Control", "Heating Setpoint", "PERIMETER_ZN_3"), "Clg_P3": ("Zone Temperature Control", "Cooling Setpoint", "PERIMETER_ZN_3"), "Htg_P4": ("Zone Temperature Control", "Heating Setpoint", "PERIMETER_ZN_4"), "Clg_P4": ("Zone Temperature Control", "Cooling Setpoint", "PERIMETER_ZN_4"), } hot_variables = { "outdoor_temp": ("Site Outdoor Air DryBulb Temperature", "Environment"), "core_temp": ("Zone Air Temperature", "Core_ZN"), "perim1_temp": ("Zone Air Temperature", "Perimeter_ZN_1"), "perim2_temp": ("Zone Air Temperature", "Perimeter_ZN_2"), "perim3_temp": ("Zone Air Temperature", "Perimeter_ZN_3"), "perim4_temp": ("Zone Air Temperature", "Perimeter_ZN_4"), "elec_power": ("Facility Total HVAC Electricity Demand Rate", "Whole Building"), "core_occ_count": ("Zone People Occupant Count", "CORE_ZN"), "perim1_occ_count": ("Zone People Occupant Count", "PERIMETER_ZN_1"), "perim2_occ_count": ("Zone People Occupant Count", "PERIMETER_ZN_2"), "perim3_occ_count": ("Zone People Occupant Count", "PERIMETER_ZN_3"), "perim4_occ_count": ("Zone People Occupant Count", "PERIMETER_ZN_4"), "outdoor_dewpoint": ("Site Outdoor Air Dewpoint Temperature", "Environment"), "outdoor_wetbulb": ("Site Outdoor Air Wetbulb Temperature", "Environment"), "core_rh": ("Zone Air Relative Humidity", "CORE_ZN"), "perim1_rh": ("Zone Air Relative Humidity", "PERIMETER_ZN_1"), "perim2_rh": ("Zone Air Relative Humidity", "PERIMETER_ZN_2"), "perim3_rh": ("Zone Air Relative Humidity", "PERIMETER_ZN_3"), "perim4_rh": ("Zone Air Relative Humidity", "PERIMETER_ZN_4"), "core_ash55_notcomfortable_summer": ( "Zone Thermal Comfort ASHRAE 55 Simple Model Summer Clothes Not Comfortable Time", "CORE_ZN", ), "core_ash55_notcomfortable_winter": ( "Zone Thermal Comfort ASHRAE 55 Simple Model Winter Clothes Not Comfortable Time", "CORE_ZN", ), "core_ash55_notcomfortable_any": ( "Zone Thermal Comfort ASHRAE 55 Simple Model Summer or Winter Clothes Not Comfortable Time", "CORE_ZN", ), "p1_ash55_notcomfortable_any": ( "Zone Thermal Comfort ASHRAE 55 Simple Model Summer or Winter Clothes Not Comfortable Time", "PERIMETER_ZN_1", ), "p2_ash55_notcomfortable_any": ( "Zone Thermal Comfort ASHRAE 55 Simple Model Summer or Winter Clothes Not Comfortable Time", "PERIMETER_ZN_2", ), "p3_ash55_notcomfortable_any": ( "Zone Thermal Comfort ASHRAE 55 Simple Model Summer or Winter Clothes Not Comfortable Time", "PERIMETER_ZN_3", ), "p4_ash55_notcomfortable_any": ( "Zone Thermal Comfort ASHRAE 55 Simple Model Summer or Winter Clothes Not Comfortable Time", "PERIMETER_ZN_4", ), } def stable_hash_int(s: str, mod: int = 1000) -> int: h = hashlib.md5(s.encode("utf-8")).hexdigest() return int(h[:8], 16) % mod def record_id(rec: Dict[str, Any]) -> str: loc = rec.get("location", "UNKNOWN") vname = rec.get("variation_name", "UNKNOWN") btype = rec.get("building_type", BUILDING) raw = f"{btype}__{loc}__{vname}" safe = "".join(c if c.isalnum() or c in "._-=" else "_" for c in raw) return safe def _enforce_bounds(htg: float, clg: float) -> Tuple[float, float]: h = float(np.clip(htg, HTG_MIN, HTG_MAX)) c = float(np.clip(clg, CLG_MIN, CLG_MAX)) if c < h + DEADBAND_MIN: c = min(CLG_MAX, h + DEADBAND_MIN) return h, c def action_from_setpoints(htg: float, clg: float) -> np.ndarray: h, c = _enforce_bounds(htg, clg) return np.array([h, c] * 5, dtype=np.float32) @dataclass class PolicyRecorder: behavior: str rng: np.random.Generator timestep_hours: float last_htg: float = 21.0 last_clg: float = 24.0 piece_until: int = 0 piece_htg: float = 21.0 piece_clg: float = 24.0 def __post_init__(self): self.actions: List[np.ndarray] = [] def policy(self, obs: Any, info: Dict[str, Any], step: int) -> np.ndarray: b = self.behavior if b == "rbc_21_24": htg, clg = 21.0, 24.0 elif b == "random_walk": if step == 0: self.last_htg, self.last_clg = 21.0, 24.0 dh = self.rng.normal(0.0, 0.15) dc = self.rng.normal(0.0, 0.20) if (step % int(6 / self.timestep_hours)) == 0: dh += self.rng.normal(0.0, 0.6) dc += self.rng.normal(0.0, 0.8) htg = self.last_htg + dh clg = self.last_clg + dc htg, clg = _enforce_bounds(htg, clg) self.last_htg, self.last_clg = htg, clg elif b == "piecewise": if step >= self.piece_until: hours = float(self.rng.choice([2, 3, 4, 6, 8, 12])) dur_steps = max(1, int(round(hours / self.timestep_hours))) self.piece_until = step + dur_steps htg = float(self.rng.uniform(HTG_MIN, HTG_MAX)) clg = float(self.rng.uniform(max(CLG_MIN, htg + DEADBAND_MIN), CLG_MAX)) self.piece_htg, self.piece_clg = _enforce_bounds(htg, clg) htg, clg = self.piece_htg, self.piece_clg elif b == "sinusoid": t_hours = step * self.timestep_hours phase = 2.0 * math.pi * (t_hours / 24.0) htg = 21.0 + 1.0 * math.sin(phase - 0.5) + self.rng.normal(0.0, 0.10) clg = 24.5 + 1.5 * math.sin(phase) + self.rng.normal(0.0, 0.12) htg, clg = _enforce_bounds(htg, clg) elif b == "aggressive": block = int((step * self.timestep_hours) // 6) % 2 if block == 0: htg = float(self.rng.uniform(21.0, 23.5)) clg = float(self.rng.uniform(23.5, 25.5)) else: htg = float(self.rng.uniform(HTG_MIN, 20.5)) clg = float(self.rng.uniform(26.0, CLG_MAX)) htg, clg = _enforce_bounds(htg, clg) else: htg, clg = 21.0, 24.0 a = action_from_setpoints(htg, clg) self.actions.append(a) return a def select_state_columns(df: pd.DataFrame) -> List[str]: base = list(hot_variables.keys()) time_candidates = [ "month", "day", "hour", "day_of_week", "is_weekend", "minute", "time", "timestep", ] cols = [] for c in base + time_candidates: if c in df.columns: cols.append(c) if not cols: bad = set(["done", "terminated", "truncated"]) cols = [c for c in df.columns if c not in bad and pd.api.types.is_numeric_dtype(df[c])] return cols def build_npz_payload( df: pd.DataFrame, actions: np.ndarray, meta: Dict[str, Any], ) -> Dict[str, Any]: state_cols = select_state_columns(df) obs = df[state_cols].to_numpy(dtype=np.float32) rewards = compute_rewards_vectorized(df, timestep_hours=TIME_STEP_HOURS, cfg=REWARD_CFG) terminals = compute_terminals(df) meta = dict(meta) meta["reward_cfg"] = config_to_meta(REWARD_CFG) action_keys = [ "htg_core", "clg_core", "htg_p1", "clg_p1", "htg_p2", "clg_p2", "htg_p3", "clg_p3", "htg_p4", "clg_p4", ] payload = { "observations": obs, "actions": actions.astype(np.float32), "rewards": rewards, "terminals": terminals, "state_keys": np.array(state_cols, dtype=object), "action_keys": np.array(action_keys, dtype=object), "meta": np.array([json.dumps(meta)], dtype=object), } return payload def save_npz(path: str, payload: Dict[str, Any]) -> None: os.makedirs(os.path.dirname(path), exist_ok=True) np.savez_compressed(path, **payload) def run_one_episode( rec: Dict[str, Any], behavior: str, episode_idx: int, outputs_root: str, save_root: str, seed: int, ) -> Optional[str]: rid = record_id(rec) bpath, wpath = get_paths_from_manifest_record(rec) out_dir = os.path.join(outputs_root, OUTPUTS_DIRNAME, rid, behavior, f"ep{episode_idx:03d}") os.makedirs(out_dir, exist_ok=True) traj_dir = os.path.join(save_root, rid, behavior) traj_path = os.path.join(traj_dir, f"traj_ep{episode_idx:03d}_seed{seed}.npz") if RESUME and os.path.exists(traj_path): return traj_path rng = np.random.default_rng(seed) recorder = PolicyRecorder(behavior=behavior, rng=rng, timestep_hours=TIME_STEP_HOURS) with suppress_output(QUIET_WORKERS): df = run_rollout_to_df( building_path=str(bpath), weather_path=str(wpath), variables=hot_variables, actuators=hot_actuators, policy_fn=recorder.policy, location=str(rec.get("location", rec.get("climate", "UNKNOWN"))), timestep_hours=TIME_STEP_HOURS, heating_sp=21.0, cooling_sp=24.0, reward=None, max_steps=MAX_STEPS, verbose=VERBOSE_ROLLOUT, ) actions = np.stack(recorder.actions, axis=0) if recorder.actions else np.zeros((len(df), 10), dtype=np.float32) T = len(df) if actions.shape[0] > T: actions = actions[:T] elif actions.shape[0] < T: pad = np.repeat(actions[-1][None, :], T - actions.shape[0], axis=0) if actions.shape[0] > 0 else np.zeros((T, 10), dtype=np.float32) actions = np.concatenate([actions, pad], axis=0) if len(df) == actions.shape[0] and len(df) > 0: df["setpoint_htg"] = actions[:, 0] df["setpoint_clg"] = actions[:, 1] meta = { "record_id": rid, "behavior": behavior, "episode_idx": episode_idx, "seed": seed, "building_path": str(bpath), "weather_path": str(wpath), "location": rec.get("location", rec.get("climate")), "thermal": rec.get("thermal", rec.get("thermal_variation")), "occupancy": rec.get("occupancy", rec.get("occupancy_variation")), "timestep_hours": TIME_STEP_HOURS, "state_cols": select_state_columns(df), } payload = build_npz_payload(df=df, actions=actions, meta=meta) save_npz(traj_path, payload) df.to_csv(os.path.join(traj_dir, f"timeseries_ep{episode_idx:03d}_seed{seed}.csv"), index=False) return traj_path def main(): paths = detect_paths(outputs_dirname=OUTPUTS_DIRNAME) manifest_path = find_manifest(paths, building=BUILDING, prefer_patched=PREFER_PATCHED) records = load_manifest_records(manifest_path) outputs_root = str(paths.outputs_root) save_root = os.path.join(outputs_root, SAVE_DIRNAME) os.makedirs(save_root, exist_ok=True) tasks = [] task_id = 0 for rec_idx, rec in enumerate(records): for behavior in BEHAVIORS: for ep in range(EPISODES_PER_RECORD): seed = BASE_SEED + (rec_idx * 100000) + (stable_hash_int(behavior, 100000)) + ep tasks.append((task_id, rec, behavior, ep, seed)) task_id += 1 t0 = time.time() successes = 0 failures = 0 saved_paths: List[str] = [] if NUM_WORKERS <= 1: for tid, rec, behavior, ep, seed in tasks: try: p = run_one_episode( rec=rec, behavior=behavior, episode_idx=ep, outputs_root=outputs_root, save_root=save_root, seed=seed, ) if p: saved_paths.append(p) successes += 1 if successes % 10 == 0: elapsed = time.time() - t0 done = successes + failures rate = done / elapsed if elapsed > 0 else 0.0 except Exception as e: failures += 1 rid = record_id(rec) print(f"[ERROR] tid={tid} record={rid} behavior={behavior} ep={ep}: {e}") print(traceback.format_exc()) else: with ProcessPoolExecutor(max_workers=NUM_WORKERS) as ex: futs = [] for tid, rec, behavior, ep, seed in tasks: futs.append(ex.submit( run_one_episode, rec, behavior, ep, outputs_root, save_root, seed )) for i, fut in enumerate(as_completed(futs), 1): try: p = fut.result() if p: saved_paths.append(p) successes += 1 except Exception as e: failures += 1 print(f"[ERROR] future failed: {e}") if i % 25 == 0 or i == len(futs): elapsed = time.time() - t0 rate = i / elapsed if elapsed > 0 else 0.0 print(f"[progress] done={i}/{len(futs)} success={successes} fail={failures} rate={rate:.2f} eps/s elapsed={elapsed:.1f}s") print("\nDONE") if saved_paths: print("Example saved file:", saved_paths[0]) print("Save root:", save_root) if __name__ == "__main__": main()