# Controller/data_generator/trajectory_generator.py
# Gen-HVAC's picture
# Upload 2 files
# 831718a verified
from __future__ import annotations
import time
import os
import json
import math
import hashlib
import traceback
from dataclasses import dataclass
from typing import Dict, Any, List, Tuple, Optional
from concurrent.futures import ProcessPoolExecutor, as_completed
import numpy as np
import pandas as pd
from contextlib import contextmanager
import sys
@contextmanager
def suppress_output(enabled: bool = True):
    """Temporarily silence ``sys.stdout`` and ``sys.stderr``.

    When *enabled* is false the managed body runs with the streams untouched.
    Otherwise both names are pointed at a handle on ``os.devnull`` for the
    duration of the ``with`` body, and the previous stream objects are put
    back afterwards even if the body raises.

    Only the Python-level ``sys.stdout`` / ``sys.stderr`` objects are swapped.
    """
    if enabled:
        with open(os.devnull, "w") as sink:
            saved_streams = (sys.stdout, sys.stderr)
            sys.stdout = sys.stderr = sink
            try:
                yield
            finally:
                # Always hand the original streams back to the caller.
                sys.stdout, sys.stderr = saved_streams
    else:
        yield
import gymnasium as gym
import sinergym
from unihvac.find_files import (
detect_paths,
find_manifest,
load_manifest_records,
get_paths_from_manifest_record,
)
from unihvac.rollout import run_rollout_to_df
from unihvac.rewards import RewardConfig, compute_rewards_vectorized, compute_terminals, config_to_meta
# ======================================================================================
# USER CONFIG
# ======================================================================================
# Building type: passed to find_manifest() and used by record_id() as the
# fallback "building_type" for manifest records that do not carry one.
BUILDING = "OfficeSmall"
# Forwarded to find_manifest(prefer_patched=...).
PREFER_PATCHED = True
# Passed to detect_paths() and used as the sub-directory (under the outputs
# root) in which run_one_episode() creates its per-episode folder.
OUTPUTS_DIRNAME = "traj_results"
# Sub-directory of the outputs root where trajectory .npz/.csv files are saved.
SAVE_DIRNAME = "TrajectoryData_officesmall"
# Number of episodes generated per (manifest record, behavior) pair.
EPISODES_PER_RECORD = 1
# When True, each rollout runs inside suppress_output(), which hides
# Python-level stdout/stderr.
QUIET_WORKERS = False
# Behavior names understood by PolicyRecorder.policy(); main() creates one
# task per entry for every manifest record.
BEHAVIORS = [
"rbc_21_24",
"random_walk",
"piecewise",
"sinusoid",
"aggressive",
]
# Simulation step length in hours (900 s divided by 3600 s per hour).
TIME_STEP_HOURS = 900.0 / 3600.0 # 0.25
# Allowed heating / cooling setpoint ranges applied by _enforce_bounds()
# (presumably degrees Celsius — confirm against the building model).
HTG_MIN, HTG_MAX = 18.0, 24.0
CLG_MIN, CLG_MAX = 22.0, 30.0
# Minimum gap _enforce_bounds() keeps between cooling and heating setpoints.
DEADBAND_MIN = 1.0
# Forwarded to run_rollout_to_df(max_steps=...); None presumably means
# "no cap" — confirm against run_rollout_to_df.
MAX_STEPS = None
# Forwarded to run_rollout_to_df(verbose=...).
VERBOSE_ROLLOUT = True
# main() runs tasks serially when this is <= 1, otherwise in a
# ProcessPoolExecutor with this many workers.
NUM_WORKERS = 16
# Base value of the per-task seed computed in main().
BASE_SEED = 123
# When True, run_one_episode() returns early for episodes whose .npz file
# already exists.
RESUME = True
# Reward configuration used by build_npz_payload() to compute rewards; it is
# also stored in the saved metadata.
REWARD_CFG = RewardConfig(version="v1_energy_only", w_energy=1.0, w_comfort=0.0)
# ======================================================================================
# VARIABLES / ACTUATORS (copy from your baseline runner)
# ======================================================================================
# Named actuators handed to run_rollout_to_df(actuators=...). Every entry
# targets the "Zone Temperature Control" heating or cooling setpoint of one
# zone. The insertion order (heating then cooling, for the core zone followed
# by perimeter zones 1-4) mirrors the 10-element action vector produced by
# action_from_setpoints() and the action_keys list in build_npz_payload().
# NOTE(review): the tuple layout looks like (component type, control type,
# actuator key) — confirm against run_rollout_to_df.
hot_actuators = {
"Htg_Core": ("Zone Temperature Control", "Heating Setpoint", "CORE_ZN"),
"Clg_Core": ("Zone Temperature Control", "Cooling Setpoint", "CORE_ZN"),
"Htg_P1": ("Zone Temperature Control", "Heating Setpoint", "PERIMETER_ZN_1"),
"Clg_P1": ("Zone Temperature Control", "Cooling Setpoint", "PERIMETER_ZN_1"),
"Htg_P2": ("Zone Temperature Control", "Heating Setpoint", "PERIMETER_ZN_2"),
"Clg_P2": ("Zone Temperature Control", "Cooling Setpoint", "PERIMETER_ZN_2"),
"Htg_P3": ("Zone Temperature Control", "Heating Setpoint", "PERIMETER_ZN_3"),
"Clg_P3": ("Zone Temperature Control", "Cooling Setpoint", "PERIMETER_ZN_3"),
"Htg_P4": ("Zone Temperature Control", "Heating Setpoint", "PERIMETER_ZN_4"),
"Clg_P4": ("Zone Temperature Control", "Cooling Setpoint", "PERIMETER_ZN_4"),
}
# Named output variables handed to run_rollout_to_df(variables=...). The keys
# are also the preferred state columns picked by select_state_columns().
# NOTE(review): each value looks like (output variable name, key) — confirm
# against run_rollout_to_df.
# NOTE(review): zone keys mix "Core_ZN"/"Perimeter_ZN_n" with
# "CORE_ZN"/"PERIMETER_ZN_n" spellings; presumably the simulator matches them
# case-insensitively — confirm.
hot_variables = {
# Outdoor dry-bulb temperature and per-zone air temperatures.
"outdoor_temp": ("Site Outdoor Air DryBulb Temperature", "Environment"),
"core_temp": ("Zone Air Temperature", "Core_ZN"),
"perim1_temp": ("Zone Air Temperature", "Perimeter_ZN_1"),
"perim2_temp": ("Zone Air Temperature", "Perimeter_ZN_2"),
"perim3_temp": ("Zone Air Temperature", "Perimeter_ZN_3"),
"perim4_temp": ("Zone Air Temperature", "Perimeter_ZN_4"),
# Whole-building HVAC electricity demand.
"elec_power": ("Facility Total HVAC Electricity Demand Rate", "Whole Building"),
# Per-zone occupant counts.
"core_occ_count": ("Zone People Occupant Count", "CORE_ZN"),
"perim1_occ_count": ("Zone People Occupant Count", "PERIMETER_ZN_1"),
"perim2_occ_count": ("Zone People Occupant Count", "PERIMETER_ZN_2"),
"perim3_occ_count": ("Zone People Occupant Count", "PERIMETER_ZN_3"),
"perim4_occ_count": ("Zone People Occupant Count", "PERIMETER_ZN_4"),
# Outdoor humidity-related temperatures and per-zone relative humidity.
"outdoor_dewpoint": ("Site Outdoor Air Dewpoint Temperature", "Environment"),
"outdoor_wetbulb": ("Site Outdoor Air Wetbulb Temperature", "Environment"),
"core_rh": ("Zone Air Relative Humidity", "CORE_ZN"),
"perim1_rh": ("Zone Air Relative Humidity", "PERIMETER_ZN_1"),
"perim2_rh": ("Zone Air Relative Humidity", "PERIMETER_ZN_2"),
"perim3_rh": ("Zone Air Relative Humidity", "PERIMETER_ZN_3"),
"perim4_rh": ("Zone Air Relative Humidity", "PERIMETER_ZN_4"),
# ASHRAE 55 simple-model "not comfortable" time. The core zone has the
# summer, winter and combined variants; perimeter zones only the combined one.
"core_ash55_notcomfortable_summer": (
"Zone Thermal Comfort ASHRAE 55 Simple Model Summer Clothes Not Comfortable Time",
"CORE_ZN",
),
"core_ash55_notcomfortable_winter": (
"Zone Thermal Comfort ASHRAE 55 Simple Model Winter Clothes Not Comfortable Time",
"CORE_ZN",
),
"core_ash55_notcomfortable_any": (
"Zone Thermal Comfort ASHRAE 55 Simple Model Summer or Winter Clothes Not Comfortable Time",
"CORE_ZN",
),
"p1_ash55_notcomfortable_any": (
"Zone Thermal Comfort ASHRAE 55 Simple Model Summer or Winter Clothes Not Comfortable Time",
"PERIMETER_ZN_1",
),
"p2_ash55_notcomfortable_any": (
"Zone Thermal Comfort ASHRAE 55 Simple Model Summer or Winter Clothes Not Comfortable Time",
"PERIMETER_ZN_2",
),
"p3_ash55_notcomfortable_any": (
"Zone Thermal Comfort ASHRAE 55 Simple Model Summer or Winter Clothes Not Comfortable Time",
"PERIMETER_ZN_3",
),
"p4_ash55_notcomfortable_any": (
"Zone Thermal Comfort ASHRAE 55 Simple Model Summer or Winter Clothes Not Comfortable Time",
"PERIMETER_ZN_4",
),
}
def stable_hash_int(s: str, mod: int = 1000) -> int:
    """Map *s* to a reproducible integer reduced modulo *mod*.

    The value is taken from the MD5 digest of the UTF-8 encoded string: its
    first four bytes are read as a big-endian unsigned integer and then
    reduced with ``% mod``. The result therefore depends only on the string
    contents.
    """
    digest = hashlib.md5(s.encode("utf-8")).digest()
    leading = int.from_bytes(digest[:4], byteorder="big")
    return leading % mod
def record_id(rec: Dict[str, Any]) -> str:
    """Build a filesystem-friendly identifier for a manifest record.

    The identifier joins the record's ``building_type`` (falling back to the
    module-level ``BUILDING``), ``location`` and ``variation_name`` (each
    falling back to ``"UNKNOWN"``) with double underscores. Every character
    that is neither alphanumeric nor one of ``. _ - =`` is then replaced by
    an underscore.
    """
    location = rec.get("location", "UNKNOWN")
    variation = rec.get("variation_name", "UNKNOWN")
    building = rec.get("building_type", BUILDING)
    raw = "{}__{}__{}".format(building, location, variation)

    allowed_punct = "._-="
    cleaned = []
    for ch in raw:
        keep = ch.isalnum() or ch in allowed_punct
        cleaned.append(ch if keep else "_")
    return "".join(cleaned)
def _enforce_bounds(htg: float, clg: float) -> Tuple[float, float]:
    """Clamp a heating/cooling setpoint pair to the configured limits.

    The heating setpoint is clipped to ``[HTG_MIN, HTG_MAX]`` and the cooling
    setpoint to ``[CLG_MIN, CLG_MAX]``. If the clipped cooling value is less
    than ``DEADBAND_MIN`` above the heating value, it is raised to
    ``heating + DEADBAND_MIN``, capped at ``CLG_MAX``.

    Returns:
        ``(heating, cooling)`` as plain Python floats.
    """
    heat = float(np.clip(htg, HTG_MIN, HTG_MAX))
    cool = float(np.clip(clg, CLG_MIN, CLG_MAX))

    lowest_cool = heat + DEADBAND_MIN
    if cool < lowest_cool:
        cool = min(CLG_MAX, lowest_cool)
    return heat, cool
def action_from_setpoints(htg: float, clg: float) -> np.ndarray:
    """Expand one heating/cooling pair into the 10-element action vector.

    The pair is first passed through ``_enforce_bounds`` and then repeated
    five times as ``[htg, clg, htg, clg, ...]``, so every zone receives the
    same setpoints. The result is a ``float32`` array of length 10.
    """
    bounded_pair = np.asarray(_enforce_bounds(htg, clg), dtype=np.float32)
    return np.tile(bounded_pair, 5)
@dataclass
class PolicyRecorder:
    """Scripted setpoint policy that records every action it emits.

    ``policy`` is meant to be used as the per-step callback of a rollout. It
    produces a heating/cooling setpoint pair according to ``behavior``, turns
    it into the 10-element action vector via ``action_from_setpoints`` and
    appends that vector to ``self.actions``.

    Supported ``behavior`` values:

    * ``"rbc_21_24"``  - constant 21.0 / 24.0 setpoints.
    * ``"random_walk"`` - small Gaussian steps on the previous setpoints, plus
      a larger Gaussian kick once every 6 simulated hours; the walk is reset
      to 21.0 / 24.0 whenever ``step == 0``.
    * ``"piecewise"``  - a uniformly sampled pair held for a random duration
      drawn from {2, 3, 4, 6, 8, 12} hours.
    * ``"sinusoid"``   - 24-hour sine waves around 21.0 and 24.5 with small
      Gaussian noise.
    * ``"aggressive"`` - alternates every 6 hours between a narrow and a wide
      setpoint band, resampling uniformly within the band at each step.

    Any other value falls back to the constant 21.0 / 24.0 pair.
    """

    # Name of the scripted behavior (see the class docstring).
    behavior: str
    # Random source for all stochastic behaviors.
    rng: np.random.Generator
    # Length of one simulation step, in hours.
    timestep_hours: float
    # Previous setpoints of the random walk.
    last_htg: float = 21.0
    last_clg: float = 24.0
    # First step index at which the piecewise behavior samples a new pair.
    piece_until: int = 0
    # Setpoints currently held by the piecewise behavior.
    piece_htg: float = 21.0
    piece_clg: float = 24.0

    def __post_init__(self):
        # Assigned here rather than declared as a field, so the action log is
        # not part of the generated __init__/__repr__/__eq__.
        self.actions: List[np.ndarray] = []

    def policy(self, obs: Any, info: Dict[str, Any], step: int) -> np.ndarray:
        """Return the action for ``step`` and append it to ``self.actions``.

        Args:
            obs: Current observation; not used by any scripted behavior.
            info: Environment info dict; not used.
            step: Zero-based step index within the episode.

        Returns:
            ``float32`` array of length 10 built by ``action_from_setpoints``.
        """
        b = self.behavior
        if b == "rbc_21_24":
            htg, clg = 21.0, 24.0
        elif b == "random_walk":
            if step == 0:
                self.last_htg, self.last_clg = 21.0, 24.0
            dh = self.rng.normal(0.0, 0.15)
            dc = self.rng.normal(0.0, 0.20)
            # Number of steps in 6 hours. Rounded (not truncated) and floored
            # at 1, matching the piecewise branch: a plain int() would give 0
            # for timesteps longer than 6 h and make the modulo below raise
            # ZeroDivisionError.
            jump_period = max(1, int(round(6.0 / self.timestep_hours)))
            if (step % jump_period) == 0:
                dh += self.rng.normal(0.0, 0.6)
                dc += self.rng.normal(0.0, 0.8)
            htg = self.last_htg + dh
            clg = self.last_clg + dc
            htg, clg = _enforce_bounds(htg, clg)
            self.last_htg, self.last_clg = htg, clg
        elif b == "piecewise":
            if step >= self.piece_until:
                # Current piece has expired: draw a new duration and pair.
                hours = float(self.rng.choice([2, 3, 4, 6, 8, 12]))
                dur_steps = max(1, int(round(hours / self.timestep_hours)))
                self.piece_until = step + dur_steps
                htg = float(self.rng.uniform(HTG_MIN, HTG_MAX))
                clg = float(self.rng.uniform(max(CLG_MIN, htg + DEADBAND_MIN), CLG_MAX))
                self.piece_htg, self.piece_clg = _enforce_bounds(htg, clg)
            htg, clg = self.piece_htg, self.piece_clg
        elif b == "sinusoid":
            t_hours = step * self.timestep_hours
            phase = 2.0 * math.pi * (t_hours / 24.0)
            htg = 21.0 + 1.0 * math.sin(phase - 0.5) + self.rng.normal(0.0, 0.10)
            clg = 24.5 + 1.5 * math.sin(phase) + self.rng.normal(0.0, 0.12)
            htg, clg = _enforce_bounds(htg, clg)
        elif b == "aggressive":
            # 0 or 1, flipping every 6 simulated hours.
            block = int((step * self.timestep_hours) // 6) % 2
            if block == 0:
                htg = float(self.rng.uniform(21.0, 23.5))
                clg = float(self.rng.uniform(23.5, 25.5))
            else:
                htg = float(self.rng.uniform(HTG_MIN, 20.5))
                clg = float(self.rng.uniform(26.0, CLG_MAX))
            htg, clg = _enforce_bounds(htg, clg)
        else:
            # Unknown behavior name: fall back to the constant baseline.
            htg, clg = 21.0, 24.0
        a = action_from_setpoints(htg, clg)
        self.actions.append(a)
        return a
def select_state_columns(df: pd.DataFrame) -> List[str]:
    """Choose which columns of *df* form the observation vector.

    Preferred columns are the keys of ``hot_variables`` followed by a fixed
    list of calendar/time column names; those present in *df* are returned in
    that order. If none of them is present, every numeric column except
    ``done`` / ``terminated`` / ``truncated`` is returned instead, in frame
    order.
    """
    preferred = [
        *hot_variables,
        "month", "day", "hour",
        "day_of_week", "is_weekend",
        "minute", "time", "timestep",
    ]
    present = [name for name in preferred if name in df.columns]
    if present:
        return present

    # Fallback: nothing expected was found, so take any numeric non-flag column.
    excluded = {"done", "terminated", "truncated"}
    return [
        name
        for name in df.columns
        if name not in excluded and pd.api.types.is_numeric_dtype(df[name])
    ]
def build_npz_payload(
    df: pd.DataFrame,
    actions: np.ndarray,
    meta: Dict[str, Any],
) -> Dict[str, Any]:
    """Assemble the arrays written to one trajectory ``.npz`` file.

    Args:
        df: Rollout time series; its state columns become the observations.
        actions: Action array to store (cast to ``float32``).
        meta: Episode metadata. It is copied, extended with the reward
            configuration under ``"reward_cfg"`` and stored as a JSON string.

    Returns:
        Dict with ``observations``, ``actions``, ``rewards``, ``terminals``,
        ``state_keys``, ``action_keys`` and ``meta`` entries.
    """
    state_cols = select_state_columns(df)
    observations = df[state_cols].to_numpy(dtype=np.float32)
    rewards = compute_rewards_vectorized(df, timestep_hours=TIME_STEP_HOURS, cfg=REWARD_CFG)
    terminals = compute_terminals(df)

    # Work on a copy so the caller's dict is left unchanged.
    enriched_meta = {**meta, "reward_cfg": config_to_meta(REWARD_CFG)}

    # htg_core, clg_core, htg_p1, clg_p1, ... in the same order as the action vector.
    zone_tags = ("core", "p1", "p2", "p3", "p4")
    action_keys = [f"{kind}_{zone}" for zone in zone_tags for kind in ("htg", "clg")]

    return {
        "observations": observations,
        "actions": actions.astype(np.float32),
        "rewards": rewards,
        "terminals": terminals,
        "state_keys": np.array(state_cols, dtype=object),
        "action_keys": np.array(action_keys, dtype=object),
        "meta": np.array([json.dumps(enriched_meta)], dtype=object),
    }
def save_npz(path: str, payload: Dict[str, Any]) -> None:
    """Write *payload* to a compressed ``.npz`` archive at *path*.

    As with ``np.savez_compressed``, ``.npz`` is appended to *path* when it
    is missing. Missing parent directories are created; a bare filename
    (no directory component) is written to the current working directory.

    The archive is first written to a temporary file in the same directory
    and then moved into place with ``os.replace``. An interrupted run thus
    never leaves a truncated file under the final name, which matters because
    the resume logic in this file treats an existing ``.npz`` as finished.

    Args:
        path: Destination file path.
        payload: Mapping of array names to array-like values.
    """
    path = os.fspath(path)
    final_path = path if path.endswith(".npz") else f"{path}.npz"

    parent = os.path.dirname(final_path)
    if parent:  # os.makedirs("") raises, so skip it for bare filenames
        os.makedirs(parent, exist_ok=True)

    # PID in the name keeps concurrent worker processes from sharing a temp file.
    tmp_path = f"{final_path}.{os.getpid()}.tmp"
    try:
        # Passing a file object stops NumPy from appending ".npz" to the temp name.
        with open(tmp_path, "wb") as fh:
            np.savez_compressed(fh, **payload)
        os.replace(tmp_path, final_path)
    finally:
        # Only still present if writing or replacing failed.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
def run_one_episode(
    rec: Dict[str, Any],
    behavior: str,
    episode_idx: int,
    outputs_root: str,
    save_root: str,
    seed: int,
) -> Optional[str]:
    """Simulate one episode for a manifest record and save its trajectory.

    A ``PolicyRecorder`` seeded with *seed* drives ``run_rollout_to_df``. The
    recorded actions are aligned to the length of the returned frame,
    packaged with observations/rewards/terminals into a ``.npz`` file, and
    the frame itself is written next to it as CSV.

    Args:
        rec: Manifest record describing the building/weather pair.
        behavior: Name of the scripted behavior to run.
        episode_idx: Episode number, used in directory and file names.
        outputs_root: Root under which the per-episode output folder is made.
        save_root: Root under which trajectory files are stored.
        seed: Seed for the policy's random generator (also part of the
            file name).

    Returns:
        Path of the trajectory ``.npz``. When ``RESUME`` is set and that file
        already exists, its path is returned without running a rollout.
    """
    rid = record_id(rec)
    bpath, wpath = get_paths_from_manifest_record(rec)

    episode_tag = f"ep{episode_idx:03d}"
    # The folder is created here but not referenced again in this function
    # (presumably consumed by the rollout machinery — confirm).
    out_dir = os.path.join(outputs_root, OUTPUTS_DIRNAME, rid, behavior, episode_tag)
    os.makedirs(out_dir, exist_ok=True)

    traj_dir = os.path.join(save_root, rid, behavior)
    traj_path = os.path.join(traj_dir, f"traj_{episode_tag}_seed{seed}.npz")
    if RESUME and os.path.exists(traj_path):
        return traj_path

    recorder = PolicyRecorder(
        behavior=behavior,
        rng=np.random.default_rng(seed),
        timestep_hours=TIME_STEP_HOURS,
    )
    with suppress_output(QUIET_WORKERS):
        df = run_rollout_to_df(
            building_path=str(bpath),
            weather_path=str(wpath),
            variables=hot_variables,
            actuators=hot_actuators,
            policy_fn=recorder.policy,
            location=str(rec.get("location", rec.get("climate", "UNKNOWN"))),
            timestep_hours=TIME_STEP_HOURS,
            heating_sp=21.0,
            cooling_sp=24.0,
            reward=None,
            max_steps=MAX_STEPS,
            verbose=VERBOSE_ROLLOUT,
        )

    # Bring the action log to exactly one row per frame row.
    n_rows = len(df)
    if recorder.actions:
        actions = np.stack(recorder.actions, axis=0)
    else:
        actions = np.zeros((n_rows, 10), dtype=np.float32)

    n_actions = actions.shape[0]
    if n_actions > n_rows:
        # More actions than rows: drop the surplus at the end.
        actions = actions[:n_rows]
    elif n_actions < n_rows:
        # Fewer actions than rows: repeat the final action to fill the gap.
        if n_actions > 0:
            filler = np.repeat(actions[-1][None, :], n_rows - n_actions, axis=0)
        else:
            filler = np.zeros((n_rows, 10), dtype=np.float32)
        actions = np.concatenate([actions, filler], axis=0)

    if n_rows > 0 and actions.shape[0] == n_rows:
        # All zones share one pair, so the first two entries describe it.
        df["setpoint_htg"] = actions[:, 0]
        df["setpoint_clg"] = actions[:, 1]

    meta = {
        "record_id": rid,
        "behavior": behavior,
        "episode_idx": episode_idx,
        "seed": seed,
        "building_path": str(bpath),
        "weather_path": str(wpath),
        "location": rec.get("location", rec.get("climate")),
        "thermal": rec.get("thermal", rec.get("thermal_variation")),
        "occupancy": rec.get("occupancy", rec.get("occupancy_variation")),
        "timestep_hours": TIME_STEP_HOURS,
        "state_cols": select_state_columns(df),
    }
    save_npz(traj_path, build_npz_payload(df=df, actions=actions, meta=meta))

    csv_path = os.path.join(traj_dir, f"timeseries_{episode_tag}_seed{seed}.csv")
    df.to_csv(csv_path, index=False)
    return traj_path
def _print_progress(done: int, total: int, successes: int, failures: int, t0: float) -> None:
    """Print one ``[progress]`` line with counts, throughput and elapsed time."""
    elapsed = time.time() - t0
    rate = done / elapsed if elapsed > 0 else 0.0
    print(f"[progress] done={done}/{total} success={successes} fail={failures} rate={rate:.2f} eps/s elapsed={elapsed:.1f}s")


def main():
    """Generate trajectories for every (record, behavior, episode) task.

    Manifest records for ``BUILDING`` are loaded and one task is created per
    record, per entry of ``BEHAVIORS``, per episode. Each task's seed is
    ``BASE_SEED + rec_idx * 100000 + stable_hash_int(behavior, 100000) + ep``.
    Tasks run serially when ``NUM_WORKERS <= 1`` and in a process pool
    otherwise. A failing task is reported with its identity and traceback and
    does not stop the remaining tasks.
    """
    paths = detect_paths(outputs_dirname=OUTPUTS_DIRNAME)
    manifest_path = find_manifest(paths, building=BUILDING, prefer_patched=PREFER_PATCHED)
    records = load_manifest_records(manifest_path)
    outputs_root = str(paths.outputs_root)
    save_root = os.path.join(outputs_root, SAVE_DIRNAME)
    os.makedirs(save_root, exist_ok=True)

    tasks = []
    for rec_idx, rec in enumerate(records):
        for behavior in BEHAVIORS:
            for ep in range(EPISODES_PER_RECORD):
                seed = BASE_SEED + (rec_idx * 100000) + (stable_hash_int(behavior, 100000)) + ep
                # The running list length doubles as the task id.
                tasks.append((len(tasks), rec, behavior, ep, seed))
    total = len(tasks)

    t0 = time.time()
    successes = 0
    failures = 0
    saved_paths: List[str] = []

    if NUM_WORKERS <= 1:
        for done, (tid, rec, behavior, ep, seed) in enumerate(tasks, 1):
            try:
                p = run_one_episode(
                    rec=rec,
                    behavior=behavior,
                    episode_idx=ep,
                    outputs_root=outputs_root,
                    save_root=save_root,
                    seed=seed,
                )
            except Exception as e:
                # Top-level boundary: report and continue with the next task.
                failures += 1
                rid = record_id(rec)
                print(f"[ERROR] tid={tid} record={rid} behavior={behavior} ep={ep}: {e}")
                print(traceback.format_exc())
            else:
                if p:
                    saved_paths.append(p)
                successes += 1
            # Previously the rate was computed here but never printed.
            if done % 10 == 0 or done == total:
                _print_progress(done, total, successes, failures, t0)
    else:
        with ProcessPoolExecutor(max_workers=NUM_WORKERS) as ex:
            # Remember which task each future belongs to, so a failure can be
            # attributed to it.
            fut_to_task = {
                ex.submit(
                    run_one_episode,
                    rec, behavior, ep, outputs_root, save_root, seed
                ): (tid, rec, behavior, ep)
                for tid, rec, behavior, ep, seed in tasks
            }
            for i, fut in enumerate(as_completed(fut_to_task), 1):
                try:
                    p = fut.result()
                except Exception as e:
                    failures += 1
                    tid, rec, behavior, ep = fut_to_task[fut]
                    rid = record_id(rec)
                    print(f"[ERROR] tid={tid} record={rid} behavior={behavior} ep={ep}: {e}")
                    print(traceback.format_exc())
                else:
                    if p:
                        saved_paths.append(p)
                    successes += 1
                if i % 25 == 0 or i == total:
                    _print_progress(i, total, successes, failures, t0)

    print("\nDONE")
    if saved_paths:
        print("Example saved file:", saved_paths[0])
    print("Save root:", save_root)
if __name__ == "__main__":
main()