from clearml import Task from tqdm import tqdm from src.policies.simple_baseline import BaselinePolicy import pandas as pd import numpy as np import torch import plotly.express as px from src.utils.imbalance_price_calculator import ImbalancePriceCalculator class PolicyEvaluator: def __init__(self, baseline_policy: BaselinePolicy, task: Task = None): self.baseline_policy = baseline_policy self.ipc = ImbalancePriceCalculator(data_path="") self.dates = baseline_policy.test_data["DateTime"].dt.date.unique() self.dates = pd.to_datetime(self.dates) ### Load Imbalance Prices ### imbalance_prices = pd.read_csv('data/imbalance_prices.csv', sep=';') imbalance_prices["DateTime"] = pd.to_datetime(imbalance_prices['DateTime'], utc=True) self.imbalance_prices = imbalance_prices.sort_values(by=['DateTime']) self.penalties = [0, 100, 300, 500, 800, 1000, 1500] self.profits = [] self.task = task def get_imbanlance_prices_for_date(self, date): imbalance_prices_day = self.imbalance_prices[self.imbalance_prices["DateTime"].dt.date == date] return imbalance_prices_day['Positive imbalance price'].values def evaluate_for_date(self, date, idx_samples, test_loader): charge_thresholds = np.arange(-100, 250, 25) discharge_thresholds = np.arange(-100, 250, 25) idx = test_loader.dataset.get_idx_for_date(date.date()) (initial, samples) = idx_samples[idx] initial = initial.cpu().numpy()[0][-1] samples = samples.cpu().numpy() initial = np.repeat(initial, samples.shape[0]) combined = np.concatenate((initial.reshape(-1, 1), samples), axis=1) reconstructed_imbalance_prices = self.ipc.get_imbalance_prices_2023_for_date_vectorized(date, combined) reconstructed_imbalance_prices = torch.tensor(reconstructed_imbalance_prices, device="cuda") real_imbalance_prices = self.get_imbanlance_prices_for_date(date.date()) for penalty in self.penalties: found_charge_thresholds, found_discharge_thresholds = self.baseline_policy.get_optimal_thresholds( reconstructed_imbalance_prices, charge_thresholds, discharge_thresholds, penalty ) predicted_charge_threshold = found_charge_thresholds.mean(axis=0) predicted_discharge_threshold = found_discharge_thresholds.mean(axis=0) ### Determine Profits and Charge Cycles ### simulated_profit, simulated_charge_cycles = self.baseline_policy.simulate( torch.tensor([[real_imbalance_prices]]), torch.tensor([predicted_charge_threshold]), torch.tensor([predicted_discharge_threshold]) ) self.profits.append([date, penalty, simulated_profit[0][0].item(), simulated_charge_cycles[0][0].item(), predicted_charge_threshold.item(), predicted_discharge_threshold.item()]) def evaluate_test_set(self, idx_samples, test_loader): self.profits = [] try: for date in tqdm(self.dates): self.evaluate_for_date(date, idx_samples, test_loader) except KeyboardInterrupt: print("Interrupted") raise KeyboardInterrupt except Exception as e: print(e) pass self.profits = pd.DataFrame(self.profits, columns=["Date", "Penalty", "Profit", "Charge Cycles", "Charge Threshold", "Discharge Threshold"]) def plot_profits_table(self): # Check if task or penalties are not set if self.task is None or not hasattr(self, 'penalties') or not hasattr(self, 'profits'): print("Task, penalties, or profits not defined.") return if self.profits.empty: print("Profits DataFrame is empty.") return # Aggregate profits and charge cycles by penalty, calculating totals and per-year values aggregated = self.profits.groupby("Penalty").agg( Total_Profit=("Profit", "sum"), Total_Charge_Cycles=("Charge Cycles", "sum"), Num_Days=("Date", "nunique") ) aggregated["Profit_Per_Year"] = aggregated["Total_Profit"] / aggregated["Num_Days"] * 365 aggregated["Charge_Cycles_Per_Year"] = aggregated["Total_Charge_Cycles"] / aggregated["Num_Days"] * 365 # Reset index to make 'Penalty' a column again and drop unnecessary columns final_df = aggregated.reset_index().drop(columns=["Total_Profit", "Total_Charge_Cycles", "Num_Days"]) # Rename columns to match expected output final_df.columns = ["Penalty", "Total Profit", "Total Charge Cycles"] # Log the final results table self.task.get_logger().report_table( "Policy Results", "Policy Results", iteration=0, table_plot=final_df ) def plot_thresholds_per_day(self): if self.task is None: return fig = px.line( self.profits[self.profits["Penalty"] == 0], x="Date", y=["Charge Threshold", "Discharge Threshold"], title="Charge and Discharge Thresholds per Day" ) fig.update_layout( width=1000, height=600, title_x=0.5, ) self.task.get_logger().report_plotly( "Thresholds per Day", "Thresholds per Day", iteration=0, figure=fig ) def get_profits_as_scalars(self): aggregated = self.profits.groupby("Penalty").agg( Total_Profit=("Profit", "sum"), Total_Charge_Cycles=("Charge Cycles", "sum"), Num_Days=("Date", "nunique") ) aggregated["Profit_Per_Year"] = aggregated["Total_Profit"] / aggregated["Num_Days"] * 365 aggregated["Charge_Cycles_Per_Year"] = aggregated["Total_Charge_Cycles"] / aggregated["Num_Days"] * 365 # Reset index to make 'Penalty' a column again and drop unnecessary columns final_df = aggregated.reset_index().drop(columns=["Total_Profit", "Total_Charge_Cycles", "Num_Days"]) # Rename columns to match expected output final_df.columns = ["Penalty", "Total Profit", "Total Charge Cycles"] return final_df