| | import torch |
| | import torch.nn as nn |
| | import numpy as np |
| | import pandas as pd |
| | import os |
| | import joblib |
| | import math |
| | import datetime |
| | from tqdm import tqdm |
| | import matplotlib.pyplot as plt |
| | import matplotlib.dates as mdates |
| |
|
| | |
| | |
| | |
| |
|
# Import the model architecture classes from the companion training module.
# All of the U-Net building blocks are imported (not just the top-level model)
# because torch.load / unpickling of checkpoints may need them resolvable.
try:
    from hierarchical_diffusion_model import (
        HierarchicalDiffusionModel, ConditionalUnet, ResnetBlock1D,
        AttentionBlock1D, DownBlock1D, UpBlock1D,
        SinusoidalPositionEmbeddings, ImprovedDiffusionModel
    )
    print("Diffusion model classes imported.")
except ImportError:
    # Fail fast with a visible banner: nothing below can run without the model code.
    print("="*50)
    print("ERROR: Could not import model classes from 'hierarchical_diffusion_model.py'.")
    print("="*50)
    exit()
| |
|
| |
|
| | |
| | |
| | |
| |
|
def add_amplitude_jitter(series, daily_samples=48, scale=0.05):
    """Scale each full day of *series* by an independent Gaussian factor.

    One multiplicative factor ~ N(1.0, scale) is drawn per complete day of
    ``daily_samples`` samples. A trailing partial day (fewer than
    ``daily_samples`` samples) is left unmodified, matching the original
    behaviour.

    Fixes: the previous in-place slice multiply (``series[start:end] *= f``)
    raised a casting error for integer input arrays; the input is now
    promoted to float first. The per-day Python loop is also replaced by a
    single vectorized multiply via ``np.repeat``.

    Args:
        series: 1-D array-like of values to jitter (not modified in place).
        daily_samples: number of samples that make up one day.
        scale: standard deviation of the per-day jitter factor.

    Returns:
        A new float ndarray with the jitter applied.
    """
    out = np.array(series, dtype=float, copy=True)
    num_days = len(out) // daily_samples
    if num_days == 0:
        return out
    # Single RNG draw, identical call signature/order to the original loop.
    factors = np.random.normal(1.0, scale, size=num_days)
    span = num_days * daily_samples
    out[:span] *= np.repeat(factors, daily_samples)
    return out
| |
|
def add_cloud_variability(pv, timestamps, base_sigma=0.25):
    """Apply a random per-day cloud attenuation factor to a PV series.

    For every calendar day (in sorted date order) one log-normal factor is
    drawn and applied to the daylight samples (hours 6..18 inclusive);
    samples outside that window are zeroed.

    Args:
        pv: 1-D array of solar generation values.
        timestamps: DatetimeIndex aligned with *pv*.
        base_sigma: sigma of the log-normal cloud factor.

    Returns:
        A new ndarray the same length as *pv* (empty array if no days).
    """
    working = pv.copy()
    if len(working) == 0:
        return working

    as_series = pd.Series(working, index=timestamps)
    pieces = []
    for _, day_vals in as_series.groupby(timestamps.date):
        # One cloudiness multiplier per day; mean slightly below 1.0.
        factor = np.random.lognormal(mean=-0.02, sigma=base_sigma)
        hrs = day_vals.index.hour
        daylight = (hrs >= 6) & (hrs <= 18)
        pieces.append(np.where(daylight, day_vals * factor, 0.0))

    if not pieces:
        return np.array([])
    return np.concatenate(pieces)
| |
|
| | def enforce_physics(df: pd.DataFrame, pv_cap_kw: float | None = None) -> pd.DataFrame: |
| | df = df.copy() |
| | df['solar_generation'] = np.clip(df['solar_generation'], 0.0, None) |
| | hour = df.index.hour |
| | night = (hour < 7) | (hour > 18) |
| | df.loc[night, 'solar_generation'] = 0.0 |
| | export_mask = df['grid_usage'] < 0 |
| | if export_mask.any(): |
| | limited_export = -np.minimum(-df.loc[export_mask, 'grid_usage'], df.loc[export_mask, 'solar_generation']) |
| | df.loc[export_mask, 'grid_usage'] = limited_export |
| | zero_pv_neg_grid = export_mask & (df['solar_generation'] <= 1e-6) |
| | df.loc[zero_pv_neg_grid, 'grid_usage'] = 0.0 |
| | if pv_cap_kw is not None: |
| | df['solar_generation'] = np.clip(df['solar_generation'], 0.0, pv_cap_kw) |
| | return df |
| |
|
def calculate_generation_length(duration: str, samples_per_day: int) -> int:
    """Return the number of samples needed to cover *duration*.

    Replaces the long if/elif constant chain with an idiomatic dict lookup.
    Unknown duration strings fall back to one year, with the same warning
    message as before.

    Args:
        duration: one of '1_year', '6_months', '2_months', '1_month',
            '14_days', '7_days', '2_days'.
        samples_per_day: sampling resolution (e.g. 48 for 30-minute data).

    Returns:
        Total number of samples (days * samples_per_day).
    """
    duration_days = {
        '1_year': 365,
        '6_months': 182,
        '2_months': 60,
        '1_month': 30,
        '14_days': 14,
        '7_days': 7,
        '2_days': 2,
    }
    days = duration_days.get(duration)
    if days is None:
        print(f"Warning: Unknown duration '{duration}'. Defaulting to 1 year.")
        days = 365
    return days * samples_per_day
| |
|
| | |
| | |
| | |
| |
|
class Config:
    """All paths and hyperparameters for a generation run.

    The architecture fields below must match the checkpoint at MODEL_PATH —
    they are passed verbatim to HierarchicalDiffusionModel in main().
    """

    # --- File-system locations ---
    MODEL_PATH = './trained_model/best_hierarchical_model.pth'   # trained weights checkpoint
    SCALER_PATH = './data/global_scaler.gz'                      # joblib-saved feature scaler
    ORIGINAL_DATA_DIR = './data/per_house'                       # real per-house CSVs (used only for timestamps)
    OUTPUT_DIR = './generated_data'                              # root for timestamped run folders

    # --- Generation settings ---
    GENERATION_DURATION = '1_year'       # see calculate_generation_length() for valid values
    NUM_PROFILES_TO_GENERATE = 2000      # how many synthetic house profiles to emit
    PLOTS_TO_GENERATE = 20               # preview plots for the first N profiles
    GENERATION_BATCH_SIZE = 128          # profiles sampled per model forward pass

    # --- Model / training-time parameters (must match the checkpoint) ---
    TRAINING_WINDOW_DAYS = 14            # window length the model was trained on; chunks are stitched to reach the target duration

    NUM_HOUSES_TRAINED_ON = 300          # size of the house-id embedding table
    SAMPLES_PER_DAY = 48                 # 30-minute resolution
    NUM_FEATURES = 4                     # grid_usage, solar_generation, sin_time, cos_time
    DOWNSCALE_FACTOR = 4
    EMBEDDING_DIM = 64
    HIDDEN_SIZE = 512
    HIDDEN_DIMS = [HIDDEN_SIZE // 4, HIDDEN_SIZE // 2, HIDDEN_SIZE]  # U-Net channel widths per level
    DROPOUT = 0.1
    USE_ATTENTION = True
    DIFFUSION_TIMESTEPS = 500
    BLOCKS_PER_LEVEL = 3
| |
|
| |
|
| | |
| | |
| | |
| |
|
def main(cfg, run_output_dir):
    """Main generation logic.

    Loads the scaler, timestamps and trained model, then samples
    cfg.NUM_PROFILES_TO_GENERATE synthetic profiles in batches. Each profile
    is built by stitching model-sized chunks, de-normalized, post-processed
    (physics constraints + jitter + cloud noise) and written to CSV; the
    first cfg.PLOTS_TO_GENERATE profiles also get a preview plot.

    Args:
        cfg: a Config instance (paths, hyperparameters).
        run_output_dir: run-specific directory; 'csv' and 'plots'
            subdirectories are created beneath it.
    """
    DEVICE = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"
    print(f"Using device: {DEVICE}")

    csv_output_dir = os.path.join(run_output_dir, 'csv')
    plot_output_dir = os.path.join(run_output_dir, 'plots')
    os.makedirs(csv_output_dir, exist_ok=True)
    os.makedirs(plot_output_dir, exist_ok=True)

    print("Loading resources...")
    try:
        scaler = joblib.load(cfg.SCALER_PATH)
        # Mismatch is survivable here but inverse_transform below would fail.
        if scaler.n_features_in_ != cfg.NUM_FEATURES:
            print(f"WARNING: Scaler was fit on {scaler.n_features_in_} features, but model expects {cfg.NUM_FEATURES}.")

        original_files = sorted([f for f in os.listdir(cfg.ORIGINAL_DATA_DIR) if f.endswith('.csv')])
        if not original_files:
            raise FileNotFoundError("No original data files found to extract timestamps.")

        # Only the timestamp index of a real file is used; its values are ignored.
        sample_original_df = pd.read_csv(os.path.join(cfg.ORIGINAL_DATA_DIR, original_files[0]), index_col='timestamp', parse_dates=True)

        # At most one year of timestamps is borrowed from the real data.
        full_timestamps = sample_original_df.index[:(365 * cfg.SAMPLES_PER_DAY)]

        total_samples_needed = calculate_generation_length(cfg.GENERATION_DURATION, cfg.SAMPLES_PER_DAY)

        # The model generates windows of this size; longer profiles are stitched.
        TRAINING_WINDOW_SAMPLES = cfg.TRAINING_WINDOW_DAYS * cfg.SAMPLES_PER_DAY

        # The borrowed timestamp index bounds how much we can generate.
        if total_samples_needed > len(full_timestamps):
            print(f"Warning: Requested {total_samples_needed} samples, but file has {len(full_timestamps)}. Clamping to max.")
            total_samples_needed = len(full_timestamps)

        print(f"Goal: Generate {total_samples_needed} samples ({cfg.GENERATION_DURATION}) per profile.")
        print(f"Strategy: Stitching {TRAINING_WINDOW_SAMPLES}-sample chunks.")

        # Architecture arguments must match the training run that produced MODEL_PATH.
        model = HierarchicalDiffusionModel(
            in_channels=cfg.NUM_FEATURES,
            num_houses=cfg.NUM_HOUSES_TRAINED_ON,
            downscale_factor=cfg.DOWNSCALE_FACTOR,
            embedding_dim=cfg.EMBEDDING_DIM,
            hidden_dims=cfg.HIDDEN_DIMS,
            dropout=cfg.DROPOUT,
            use_attention=cfg.USE_ATTENTION,
            num_timesteps=cfg.DIFFUSION_TIMESTEPS,
            blocks_per_level=cfg.BLOCKS_PER_LEVEL
        )

        model.load_state_dict(torch.load(cfg.MODEL_PATH, map_location=DEVICE))
        model.to(DEVICE)
        model.eval()
        print("Model, scaler, timestamps ready.")

    except FileNotFoundError as e:
        print(f"ERROR: A required file was not found. Details: {e}")
        return
    except Exception as e:
        # NOTE(review): broad catch hides the traceback of unexpected setup
        # failures — consider logging.exception here.
        print(f"An error occurred during setup: {e}")
        return

    num_batches = math.ceil(cfg.NUM_PROFILES_TO_GENERATE / cfg.GENERATION_BATCH_SIZE)
    house_counter = 0  # total profiles written so far (across batches)

    pbar = tqdm(range(num_batches), desc="Generating Batches")
    for i in pbar:
        # The last batch may be smaller than GENERATION_BATCH_SIZE.
        current_batch_size = min(cfg.GENERATION_BATCH_SIZE, cfg.NUM_PROFILES_TO_GENERATE - house_counter)
        if current_batch_size <= 0: break
        pbar.set_postfix({'batch_size': current_batch_size})

        num_chunks_needed = math.ceil(total_samples_needed / TRAINING_WINDOW_SAMPLES)
        batch_chunks_list = []

        for chunk_idx in range(num_chunks_needed):
            # Final chunk is truncated so the stitched profile has exactly
            # total_samples_needed samples.
            samples_remaining = total_samples_needed - (chunk_idx * TRAINING_WINDOW_SAMPLES)
            current_chunk_length = min(TRAINING_WINDOW_SAMPLES, samples_remaining)

            shape_to_generate = (current_chunk_length, cfg.NUM_FEATURES)

            # Fresh random conditioning per chunk; note day_of_week/day_of_year
            # are NOT kept consistent across stitched chunks of one profile.
            sample_conditions = {
                "house_id": torch.randint(0, cfg.NUM_HOUSES_TRAINED_ON, (current_batch_size,), device=DEVICE),
                "day_of_week": torch.randint(0, 7, (current_batch_size,), device=DEVICE),
                "day_of_year": torch.randint(0, 365, (current_batch_size,), device=DEVICE)
            }

            with torch.no_grad():
                # Assumes model.sample returns a (batch, length, features)
                # tensor — TODO confirm against hierarchical_diffusion_model.
                generated_chunk_data = model.sample(current_batch_size, sample_conditions, shape=shape_to_generate)

            batch_chunks_list.append(generated_chunk_data.cpu().numpy())

        # Stitch chunks along the time axis -> (batch, total_samples, features).
        generated_data_np = np.concatenate(batch_chunks_list, axis=1)

        # Post-process and persist each profile in the batch.
        for j in range(current_batch_size):
            current_house_num = house_counter + 1  # 1-based file numbering

            profile_timestamps = full_timestamps[:total_samples_needed]
            normalized_series = generated_data_np[j]

            # Back to physical units via the training-time scaler.
            unscaled_series = scaler.inverse_transform(normalized_series)

            df = pd.DataFrame(
                unscaled_series,
                columns=['grid_usage', 'solar_generation', 'sin_time', 'cos_time'],
                index=profile_timestamps
            )

            # Enforce physics both before and after the stochastic augmentations,
            # since jitter/cloud noise can re-violate the constraints.
            df = enforce_physics(df)
            df['grid_usage'] = add_amplitude_jitter(df['grid_usage'].values, scale=0.08, daily_samples=cfg.SAMPLES_PER_DAY)
            df['solar_generation'] = add_cloud_variability(df['solar_generation'].values, df.index, base_sigma=0.3)
            df = enforce_physics(df)

            # Only the two power columns are persisted; the time encodings are dropped.
            df_to_save = df[['grid_usage', 'solar_generation']]
            df_to_save.to_csv(os.path.join(csv_output_dir, f'generated_house_{current_house_num}.csv'))

            # Preview plot (first 14 days) for the first PLOTS_TO_GENERATE profiles.
            if house_counter < cfg.PLOTS_TO_GENERATE:
                plot_df = df_to_save.head(cfg.SAMPLES_PER_DAY * 14)
                plt.figure(figsize=(15, 6))
                plt.plot(plot_df.index, plot_df['grid_usage'], label='Grid Usage', color='dodgerblue', alpha=0.9)
                plt.plot(plot_df.index, plot_df['solar_generation'], label='Solar Generation', color='darkorange', alpha=0.9)
                plt.title(f'Generated Data for Profile {current_house_num} (First 14 Days)')
                plt.xlabel('Timestamp'); plt.ylabel('Power (kW)'); plt.legend(); plt.grid(True, which='both', linestyle='--', linewidth=0.5)
                plt.tight_layout()
                plt.savefig(os.path.join(plot_output_dir, f'generated_profile_{current_house_num}_plot.png'))
                plt.close()

            house_counter += 1

    print(f"\nSuccessfully generated and saved {house_counter} house profiles.")
    if cfg.PLOTS_TO_GENERATE > 0:
        print(f"Plots saved to '{plot_output_dir}'.")
| |
|
| |
|
| | |
| | |
| | |
| |
|
if __name__ == '__main__':
    cfg = Config()

    # Each run gets its own timestamped folder under OUTPUT_DIR.
    stamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    run_name = f"generation_run_{cfg.GENERATION_DURATION}_{stamp}"
    out_dir = os.path.join(cfg.OUTPUT_DIR, run_name)
    os.makedirs(out_dir, exist_ok=True)

    print(f"Starting new generation run: {run_name}")
    print(f"All outputs will be saved to: {out_dir}")

    main(cfg, out_dir)

    print("\nGeneration process complete.")