"""Load the quarter-hourly CSV sources, scale them and build NRV DataLoaders."""

import os
from datetime import datetime

import numpy as np
import pandas as pd
import pytz
import torch
from sklearn.preprocessing import MinMaxScaler

from src.data.dataset import NrvDataset

# CSV locations, relative to the ``path`` prefix given to ``DataProcessor``.
history_data_path = "data/history-quarter-hour-data.csv"
forecast_data_path = "data/load_forecast.csv"
pv_forecast_data_path = "data/pv_gen_forecast.csv"
wind_forecast_data_path = "data/wind_gen_forecast.csv"
nominal_net_position_data_path = "data/nominal_net_position.csv"


class DataConfig:
    """Boolean feature toggles handed to ``NrvDataset``.

    Only ``NRV_HISTORY`` is enabled by default. How each flag is turned into
    model inputs is decided by ``NrvDataset``, which is defined outside this
    module; within this module only ``QUARTER`` and ``DAY_OF_WEEK`` are read
    (see ``DataProcessor.get_time_feature_size``).
    """

    def __init__(self):
        self.NRV_HISTORY: bool = True

        ### LOAD ###
        self.LOAD_FORECAST: bool = False
        self.LOAD_HISTORY: bool = False

        ### PV ###
        self.PV_FORECAST: bool = False
        self.PV_HISTORY: bool = False

        ### WIND ###
        self.WIND_FORECAST: bool = False
        self.WIND_HISTORY: bool = False

        ### NET POSITION ###
        self.NOMINAL_NET_POSITION: bool = False

        ### TIME ###
        self.YEAR: bool = False
        self.DAY_OF_WEEK: bool = False
        self.QUARTER: bool = False


class DataProcessor:
    """Read all CSV sources, merge them on ``datetime`` and serve DataLoaders.

    The constructor eagerly reads every CSV and left-joins the forecast
    frames onto the NRV history. Scalers are only fitted when a training
    dataloader is requested with ``transform=True``; validation and test
    dataloaders reuse those fitted scalers.
    """

    def __init__(self, data_config: DataConfig, lstm: bool = False, path: str = "./"):
        """Load and merge all sources.

        Args:
            data_config: Feature toggles forwarded to every ``NrvDataset``.
            lstm: Forwarded unchanged to ``NrvDataset``.
            path: Directory prefix under which the ``data/`` CSV files live.
        """
        self.batch_size = 2048
        self.path = path
        self.lstm = lstm

        # Ranges are (start, end) with +/-np.inf meaning "unbounded".
        # Both bounds are applied inclusively when filtering.
        self.train_range = (
            -np.inf,
            datetime(year=2022, month=11, day=30, tzinfo=pytz.UTC),
        )
        self.val_range = (
            datetime(year=2022, month=11, day=1, tzinfo=pytz.UTC),
            datetime(year=2022, month=12, day=30, tzinfo=pytz.UTC),
        )
        self.test_range = (datetime(year=2023, month=1, day=1, tzinfo=pytz.UTC), np.inf)

        self.update_range_str()

        self.history_features = self.get_nrv_history()
        self.future_features = self.get_load_forecast()
        self.pv_forecast = self.get_pv_forecast()
        self.wind_forecast = self.get_wind_forecast()

        # Left joins: the NRV history decides which timestamps exist.
        merged = self.history_features
        for extra in (
            self.future_features,
            self.pv_forecast,
            self.wind_forecast,
            self.get_nominal_net_position(),
        ):
            merged = merged.merge(extra, on="datetime", how="left")

        # Quarter-hour index within the day (true division, so float-valued).
        merged["quarter"] = (
            merged["datetime"].dt.hour * 4 + merged["datetime"].dt.minute / 15
        )
        merged["day_of_week"] = merged["datetime"].dt.dayofweek
        self.all_features = merged

        self.output_size = 96
        self.data_config = data_config

        # One scaler per signal family; the measured ("history") column of a
        # family is transformed with the scaler fitted on its forecast column.
        self.nrv_scaler = MinMaxScaler(feature_range=(-1, 1))
        self.load_forecast_scaler = MinMaxScaler(feature_range=(-1, 1))
        self.pv_forecast_scaler = MinMaxScaler(feature_range=(-1, 1))
        self.wind_forecast_scaler = MinMaxScaler(feature_range=(-1, 1))
        self.nominal_net_position_scaler = MinMaxScaler(feature_range=(-1, 1))

        self.full_day_skip = False

    # ------------------------------------------------------------------ #
    # Simple configuration setters
    # ------------------------------------------------------------------ #
    def set_data_config(self, data_config: DataConfig):
        """Replace the feature toggles used for subsequently built datasets."""
        self.data_config = data_config

    def set_full_day_skip(self, full_day_skip: bool):
        """Set the default ``full_day_skip`` flag forwarded to ``NrvDataset``."""
        self.full_day_skip = full_day_skip

    def set_output_size(self, output_size: int):
        """Set the sequence length used by ``get_input_size``."""
        self.output_size = output_size

    def set_train_range(self, train_range: tuple):
        """Set the (start, end) training range and refresh the range strings."""
        self.train_range = train_range
        self.update_range_str()

    def set_test_range(self, test_range: tuple):
        """Set the (start, end) test range and refresh the range strings."""
        self.test_range = test_range
        self.update_range_str()

    def set_batch_size(self, batch_size: int):
        """Set the batch size; ``None`` means one batch spanning the dataset."""
        self.batch_size = batch_size

    def update_range_str(self):
        """Refresh the string forms of the train/test range bounds.

        Unbounded ends are rendered as ``"-inf"`` / ``"inf"``.
        """
        train_start, train_end = self.train_range
        test_start, test_end = self.test_range

        self.train_range_start = str(train_start) if train_start != -np.inf else "-inf"
        self.train_range_end = str(train_end) if train_end != np.inf else "inf"
        self.test_range_start = str(test_start) if test_start != -np.inf else "-inf"
        self.test_range_end = str(test_end) if test_end != np.inf else "inf"

    # ------------------------------------------------------------------ #
    # CSV loaders
    # ------------------------------------------------------------------ #
    def _csv_path(self, relative_path: str) -> str:
        """Join ``self.path`` and a data file path.

        Uses ``os.path.join`` so a prefix without a trailing separator works.
        """
        return os.path.join(self.path, relative_path)

    def get_nrv_history(self) -> pd.DataFrame:
        """Read the NRV history, keeping only dates that have exactly 96 rows.

        Returns:
            Frame with columns ``datetime`` and ``nrv``, sorted by datetime.
        """
        df = pd.read_csv(self._csv_path(history_data_path), delimiter=";")
        df = df[["datetime", "netregulationvolume"]]
        df = df.rename(columns={"netregulationvolume": "nrv"})
        # NOTE(review): parsed without utc=True, unlike the other loaders;
        # confirm the file's timestamps yield a dtype that merges with them.
        df["datetime"] = pd.to_datetime(df["datetime"])

        # Drop incomplete days (anything other than 96 quarter-hours).
        rows_per_day = df["datetime"].dt.date.value_counts().sort_index()
        complete_days = rows_per_day[rows_per_day == 96].index
        df = df[df["datetime"].dt.date.isin(complete_days)]

        return df.sort_values(by="datetime")

    def get_load_forecast(self) -> pd.DataFrame:
        """Read the load file.

        Returns:
            Frame with ``datetime`` (UTC), ``load_forecast`` and ``total_load``,
            sorted by datetime.
        """
        df = pd.read_csv(self._csv_path(forecast_data_path), delimiter=";")
        df = df.rename(
            columns={
                "Day-ahead 6PM forecast": "load_forecast",
                "Datetime": "datetime",
                "Total Load": "total_load",
            }
        )
        df = df[["datetime", "load_forecast", "total_load"]].copy()
        df["datetime"] = pd.to_datetime(df["datetime"], utc=True)
        return df.sort_values(by="datetime")

    def get_pv_forecast(self) -> pd.DataFrame:
        """Read the PV file for region ``"Belgium"``.

        Missing values are replaced by zero and duplicate timestamps are
        averaged.

        Returns:
            Frame with ``datetime`` (UTC), ``pv_forecast`` and ``pv_history``,
            sorted by datetime.
        """
        df = pd.read_csv(self._csv_path(pv_forecast_data_path), delimiter=";")
        df = df[df["region"] == "Belgium"]
        df = df.rename(
            columns={
                "dayahead11hforecast": "pv_forecast",
                "Datetime": "datetime",
                "measured": "pv_history",
            }
        )
        df = df[["datetime", "pv_forecast", "pv_history"]]

        # Replace NaN by zero.
        df = df.fillna(0)

        df = df.groupby("datetime").mean().reset_index()
        df["datetime"] = pd.to_datetime(df["datetime"], utc=True)
        return df.sort_values(by="datetime")

    def get_wind_forecast(self) -> pd.DataFrame:
        """Read the wind file.

        Rows without a forecast are dropped and duplicate timestamps are
        averaged.

        Returns:
            Frame with ``datetime`` (UTC), ``wind_forecast`` and
            ``wind_history``, sorted by datetime.
        """
        df = pd.read_csv(self._csv_path(wind_forecast_data_path), delimiter=";")
        df = df.rename(
            columns={
                "measured": "wind_history",
                "dayaheadforecast": "wind_forecast",
            }
        )
        df = df[["datetime", "wind_forecast", "wind_history"]]

        # Remove rows whose forecast is NaN.
        df = df[df["wind_forecast"].notnull()]

        df = df.groupby("datetime").mean().reset_index()
        df["datetime"] = pd.to_datetime(df["datetime"], utc=True)
        return df.sort_values(by="datetime")

    def get_nominal_net_position(self) -> pd.DataFrame:
        """Read the nominal net position, resampled to 15 minutes.

        Returns:
            Frame with ``datetime`` (UTC) and ``nominal_net_position`` (plus
            any other columns present in the file), one row per 15 minutes.
        """
        df = pd.read_csv(
            self._csv_path(nominal_net_position_data_path), delimiter=";"
        )

        # Remove the resolution column.
        df = df.drop(columns=["Resolution code"])

        df = df.rename(
            columns={
                "Datetime": "datetime",
                "Implicit net position": "nominal_net_position",
            }
        )
        df["datetime"] = pd.to_datetime(df["datetime"], utc=True)

        # Make every row quarter-hourly; gaps copy the previous value.
        return df.set_index("datetime").resample("15min").ffill().reset_index()

    # ------------------------------------------------------------------ #
    # Private helpers shared by the train / val / test dataloaders
    # ------------------------------------------------------------------ #
    def _select_range(self, date_range: tuple) -> pd.DataFrame:
        """Return a copy of ``all_features`` limited to ``date_range`` (inclusive)."""
        start, end = date_range
        df = self.all_features
        if start != -np.inf:
            df = df[df["datetime"] >= start]
        if end != np.inf:
            df = df[df["datetime"] <= end]
        # Copy so later column assignments never touch self.all_features.
        return df.copy()

    def _scale_features(self, df: pd.DataFrame, fit: bool) -> pd.DataFrame:
        """Scale all numeric feature columns of ``df`` in place.

        With ``fit=True`` each scaler is fitted on its primary column (nrv,
        the three forecast columns and the nominal net position). The measured
        columns are always only transformed, using the scaler of the matching
        forecast column.
        """

        def apply(scaler: MinMaxScaler, column: str, fit_scaler: bool) -> None:
            values = df[column].values.reshape(-1, 1)
            if fit_scaler:
                scaled = scaler.fit_transform(values)
            else:
                scaled = scaler.transform(values)
            df[column] = scaled.reshape(-1)

        # Order matters: a forecast column must be fitted before its
        # companion measured column is transformed.
        apply(self.nrv_scaler, "nrv", fit)
        apply(self.load_forecast_scaler, "load_forecast", fit)
        apply(self.load_forecast_scaler, "total_load", False)
        apply(self.pv_forecast_scaler, "pv_forecast", fit)
        apply(self.pv_forecast_scaler, "pv_history", False)
        apply(self.wind_forecast_scaler, "wind_forecast", fit)
        apply(self.wind_forecast_scaler, "wind_history", False)
        apply(self.nominal_net_position_scaler, "nominal_net_position", fit)
        return df

    def _make_dataset(
        self, df: pd.DataFrame, predict_sequence_length: int, full_day_skip: bool
    ) -> NrvDataset:
        """Wrap ``df`` in an ``NrvDataset`` using the processor's settings."""
        return NrvDataset(
            df,
            data_config=self.data_config,
            full_day_skip=full_day_skip,
            predict_sequence_length=predict_sequence_length,
            lstm=self.lstm,
        )

    # ------------------------------------------------------------------ #
    # DataLoaders
    # ------------------------------------------------------------------ #
    def get_dataloader(self, dataset, shuffle: bool = True):
        """Wrap ``dataset`` in a ``DataLoader`` with 4 workers.

        If ``self.batch_size`` is ``None`` the whole dataset forms one batch.
        """
        batch_size = len(dataset) if self.batch_size is None else self.batch_size
        return torch.utils.data.DataLoader(
            dataset, batch_size=batch_size, shuffle=shuffle, num_workers=4
        )

    def get_train_dataloader(
        self,
        transform: bool = True,
        predict_sequence_length: int = 96,
        shuffle: bool = True,
        with_validation: bool = False,
    ):
        """Build the training DataLoader and, if ``transform``, fit the scalers.

        Args:
            transform: Fit the scalers on the training rows and scale them.
            predict_sequence_length: Forwarded to ``NrvDataset``.
            shuffle: Whether the DataLoader shuffles.
            with_validation: End the training window one day before
                ``val_range`` starts instead of at ``train_range[1]``.
        """
        train_range = self.train_range
        if with_validation:
            train_range = (
                self.train_range[0],
                self.val_range[0] - pd.Timedelta(days=1),
            )

        # Filter on the *effective* range. Checking self.train_range here
        # would skip the validation cut-off when train_range is open-ended,
        # leaking validation/test rows into training.
        train_df = self._select_range(train_range)

        if transform:
            self._scale_features(train_df, fit=True)

        train_dataset = self._make_dataset(
            train_df, predict_sequence_length, self.full_day_skip
        )
        return self.get_dataloader(train_dataset, shuffle=shuffle)

    def get_val_dataloader(
        self,
        transform: bool = True,
        predict_sequence_length: int = 96,
        full_day_skip: bool = False,
    ):
        """Build the (unshuffled) validation DataLoader.

        With ``transform=True`` the scalers must already have been fitted by
        ``get_train_dataloader``.
        """
        val_df = self._select_range(self.val_range)

        if transform:
            self._scale_features(val_df, fit=False)

        val_dataset = self._make_dataset(
            val_df, predict_sequence_length, self.full_day_skip or full_day_skip
        )
        return self.get_dataloader(val_dataset, shuffle=False)

    def get_test_dataloader(
        self,
        transform: bool = True,
        predict_sequence_length: int = 96,
        full_day_skip: bool = False,
    ):
        """Build the (unshuffled) test DataLoader.

        With ``transform=True`` the scalers must already have been fitted by
        ``get_train_dataloader``.
        """
        test_df = self._select_range(self.test_range)

        if transform:
            self._scale_features(test_df, fit=False)

        test_dataset = self._make_dataset(
            test_df, predict_sequence_length, self.full_day_skip or full_day_skip
        )
        return self.get_dataloader(test_dataset, shuffle=False)

    def get_dataloaders(
        self,
        transform: bool = True,
        predict_sequence_length: int = 96,
        full_day_skip: bool = False,
        validation: bool = False,
    ):
        """Build the train/test or train/val/test DataLoaders.

        Returns:
            ``(train, test)`` when ``validation`` is false, otherwise
            ``(train, val, test)`` with the training window ending before the
            validation range. The train loader is always built first so the
            scalers are fitted before the other splits are transformed.
        """
        train_loader = self.get_train_dataloader(
            transform=transform,
            predict_sequence_length=predict_sequence_length,
            with_validation=validation,
        )

        if not validation:
            test_loader = self.get_test_dataloader(
                transform=transform,
                predict_sequence_length=predict_sequence_length,
                full_day_skip=full_day_skip,
            )
            return train_loader, test_loader

        val_loader = self.get_val_dataloader(
            transform=transform,
            predict_sequence_length=predict_sequence_length,
            full_day_skip=full_day_skip,
        )
        test_loader = self.get_test_dataloader(
            transform=transform,
            predict_sequence_length=predict_sequence_length,
            full_day_skip=full_day_skip,
        )
        return train_loader, val_loader, test_loader

    # ------------------------------------------------------------------ #
    # Utilities
    # ------------------------------------------------------------------ #
    def inverse_transform(self, input_data):
        """Map scaled NRV values back to original units with ``nrv_scaler``.

        Args:
            input_data: ``torch.Tensor`` (any device) or ``np.ndarray``.

        Returns:
            A CPU tensor or an array (matching the input kind) with the
            input's shape.

        Raises:
            RuntimeError: On any failure, including an unsupported input
                type; the original exception is chained as the cause.
        """
        try:
            is_tensor = isinstance(input_data, torch.Tensor)
            if is_tensor:
                # .cpu() is a no-op on CPU tensors and covers every device.
                input_np = input_data.detach().cpu().numpy()
            elif isinstance(input_data, np.ndarray):
                input_np = input_data
            else:
                raise TypeError("Input must be a PyTorch tensor or a NumPy array")

            # The scaler needs 2-D input: flatten leading dims, restore after.
            original_shape = input_np.shape
            input_2d = input_np.reshape(-1, original_shape[-1])
            transformed_2d = self.nrv_scaler.inverse_transform(input_2d)

            if is_tensor:
                return torch.from_numpy(transformed_2d).view(original_shape)
            return transformed_2d.reshape(original_shape)
        except Exception as e:
            raise RuntimeError(f"Error in inverse_transform: {e}") from e

    def get_input_size(self):
        """Return the shape of the first input batch of the training loader.

        Note: this builds a training DataLoader with default arguments, so
        the scalers are refitted on the training range as a side effect.
        """
        data_loader = self.get_train_dataloader(
            predict_sequence_length=self.output_size
        )
        first_inputs, _, _ = next(iter(data_loader))
        return first_inputs.shape

    def get_time_feature_size(self):
        """Return the number of distinct time-feature combinations.

        The result is the product of 96 (if ``QUARTER``) and 7 (if
        ``DAY_OF_WEEK``), or 0 when neither is enabled.
        """
        time_feature_size = 1
        if self.data_config.QUARTER:
            time_feature_size *= 96
        if self.data_config.DAY_OF_WEEK:
            time_feature_size *= 7

        if time_feature_size == 1:
            return 0
        return time_feature_size