diff --git a/configs/datamodule/default.yaml b/configs/datamodule/default.yaml
index 4e9f6af..d3b23b3 100644
--- a/configs/datamodule/default.yaml
+++ b/configs/datamodule/default.yaml
@@ -1,6 +1,6 @@
 _target_: pvnet_summation.data.datamodule.DataModule
 batch_dir: "/mnt/disks/bigbatches/concurrent_batches_v3.6_-60mins"
 gsp_zarr_path: "/mnt/disks/nwp/pv_gsp.zarr"
-batch_size: 8
+batch_size: 32
 num_workers: 20
 prefetch_factor: 2
diff --git a/configs/model/default.yaml b/configs/model/default.yaml
index d481aaf..a2e8beb 100644
--- a/configs/model/default.yaml
+++ b/configs/model/default.yaml
@@ -14,12 +14,10 @@ output_network:
   _partial_: True
 output_network_kwargs:
   fc_hidden_features: 128
-  n_res_blocks: 6
+  n_res_blocks: 2
   res_block_layers: 2
   dropout_frac: 0.0
-
-# Foreast and time settings
-forecast_minutes: 480
+predict_difference_from_sum: False

 # ----------------------------------------------
diff --git a/configs/trainer/default.yaml b/configs/trainer/default.yaml
index 5f05108..dd0d7c4 100644
--- a/configs/trainer/default.yaml
+++ b/configs/trainer/default.yaml
@@ -11,7 +11,7 @@ num_sanity_val_steps: 8
 fast_dev_run: false
 #profiler: 'simple'

-accumulate_grad_batches: 4
+#accumulate_grad_batches: 4
 #val_check_interval: 800
 #limit_val_batches: 800
 log_every_n_steps: 50
diff --git a/pvnet_summation/data/datamodule.py b/pvnet_summation/data/datamodule.py
index 029a952..c8a92a3 100644
--- a/pvnet_summation/data/datamodule.py
+++ b/pvnet_summation/data/datamodule.py
@@ -60,7 +60,7 @@ def __init__(self, source_datapipe):
         """Convert list of dicts to dict of lists

         Args:
-            source_datapipe:
+            source_datapipe: Datapipe yielding lists of dicts
         """

         self.source_datapipe = source_datapipe
@@ -105,7 +105,17 @@ def __iter__(self):
         for outputs in self.source_datapipes:
-            yield {key: value for key, value in zip(self.keys, outputs)}
+            yield {key: value for key, value in zip(self.keys, outputs)}  # noqa: B905
+
+
+def get_capacity(batch):
+    """Extract the capacity from the numpy batch"""
+    return batch[BatchKey.gsp_effective_capacity_mwp]
+
+
+def divide(args):
+    """Divide first argument by second"""
+    return args[0] / args[1]


 class DataModule(LightningDataModule):
@@ -161,17 +171,25 @@ def _get_premade_batches_datapipe(self, subdir, shuffle=False, add_filename=Fals
         )
         sample_pipeline, sample_pipeline_copy = sample_pipeline.fork(2, buffer_size=5)
+        times_datapipe = GetBatchTime(sample_pipeline_copy)

-        times_datapipe, times_datapipe_copy = GetBatchTime(sample_pipeline_copy).fork(
-            2, buffer_size=5
+        times_datapipe, times_datapipe_copy = times_datapipe.fork(2, buffer_size=5)
+        national_targets_datapipe = GetNationalPVLive(gsp_data, times_datapipe_copy)
+
+        times_datapipe, times_datapipe_copy = times_datapipe.fork(2, buffer_size=5)
+        national_capacity_datapipe = GetNationalPVLive(
+            gsp_data.effective_capacity_mwp, times_datapipe_copy
         )
+        sample_pipeline, sample_pipeline_copy = sample_pipeline.fork(2, buffer_size=5)
+        gsp_capacity_pipeline = sample_pipeline_copy.map(get_capacity)

-        national_targets_datapipe = GetNationalPVLive(gsp_data, times_datapipe_copy)
+        capacity_pipeline = gsp_capacity_pipeline.zip(national_capacity_datapipe).map(divide)

         # Compile the samples
         if add_filename:
             data_pipeline = ZipperDict(
                 pvnet_inputs=sample_pipeline,
+                effective_capacity=capacity_pipeline,
                 national_targets=national_targets_datapipe,
                 times=times_datapipe,
                 filepath=file_pipeline_copy,
             )
@@ -179,6 +197,7 @@ def _get_premade_batches_datapipe(self, subdir, shuffle=False, add_filename=Fals
         else:
             data_pipeline = ZipperDict(
                 pvnet_inputs=sample_pipeline,
+                effective_capacity=capacity_pipeline,
                 national_targets=national_targets_datapipe,
                 times=times_datapipe,
             )
@@ -187,6 +206,7 @@ def _get_premade_batches_datapipe(self, subdir, shuffle=False, add_filename=Fals
         data_pipeline = PivotDictList(data_pipeline.batch(self.batch_size))
         data_pipeline = DictApply(
             data_pipeline,
+            effective_capacity=torch.stack,
             national_targets=torch.stack,
             times=torch.stack,
         )
@@ -256,6 +276,7 @@ def _get_premade_batches_datapipe(self, subdir, shuffle=False):
         batch_pipeline = PivotDictList(sample_pipeline.batch(self.batch_size))
         batch_pipeline = DictApply(
             batch_pipeline,
+            effective_capacity=torch.stack,
             pvnet_outputs=torch.stack,
            national_targets=torch.stack,
            times=torch.stack,
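Note on the datamodule change above: the new capacity branch zips each sample's per-GSP
effective capacity (pulled out of the premade batch by `get_capacity`) with the national
effective capacity at the same valid times, and `divide` turns the pair into per-GSP
fractions of the national total. A minimal sketch of that zip-and-divide step with plain
numpy and made-up capacity values (the real pipeline streams these through torchdata
datapipes):

import numpy as np

# Hypothetical capacities for one sample; in the datamodule these come from
# BatchKey.gsp_effective_capacity_mwp and from GetNationalPVLive respectively
gsp_capacity_mwp = np.array([120.0, 85.0, 240.0])  # per-GSP effective capacity
national_capacity_mwp = 13600.0  # national effective capacity at the same times

# Equivalent of gsp_capacity_pipeline.zip(national_capacity_datapipe).map(divide)
args = (gsp_capacity_mwp, national_capacity_mwp)
effective_capacity = args[0] / args[1]  # per-GSP fraction of national capacity
print(effective_capacity)  # -> [0.00882353 0.00625    0.01764706]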
diff --git a/pvnet_summation/models/base_model.py b/pvnet_summation/models/base_model.py
index 16debf4..4ffe49c 100644
--- a/pvnet_summation/models/base_model.py
+++ b/pvnet_summation/models/base_model.py
@@ -16,9 +16,6 @@
 from pvnet_summation.utils import plot_forecasts

-# from pvnet.models.base_model import BaseModel as PVNetBaseModel
-
-
 logger = logging.getLogger(__name__)

 activities = [torch.profiler.ProfilerActivity.CPU]
@@ -31,7 +28,6 @@ class BaseModel(PVNetBaseModel):

     def __init__(
         self,
-        forecast_minutes: int,
         model_name: str,
         model_version: Optional[str],
         optimizer: AbstractOptimizer,
@@ -40,7 +36,6 @@ def __init__(
         """Abtstract base class for PVNet summation submodels.

         Args:
-            forecast_minutes (int): Length of the GSP forecast period in minutes
             model_name: Model path either locally or on huggingface.
             model_version: Model version if using huggingface. Set to None if using local.
             optimizer (AbstractOptimizer): Optimizer
@@ -50,46 +45,58 @@ def __init__(
         pl.LightningModule.__init__(self)
         PVNetModelHubMixin.__init__(self)

+        self.pvnet_model = PVNetBaseModel.from_pretrained(
+            model_name,
+            revision=model_version,
+        )
+        self.pvnet_model.requires_grad_(False)
+
         self._optimizer = optimizer

         # Model must have lr to allow tuning
         # This setting is only used when lr is tuned with callback
         self.lr = None

-        self.forecast_minutes = forecast_minutes
+        self.forecast_minutes = self.pvnet_model.forecast_minutes
         self.output_quantiles = output_quantiles

         # Number of timestemps for 30 minutely data
-        self.forecast_len_30 = forecast_minutes // 30
+        self.forecast_len_30 = self.forecast_minutes // 30

         self.weighted_losses = WeightedLosses(forecast_length=self.forecast_len_30)

         self._accumulated_metrics = MetricAccumulator()
         self._accumulated_y = PredAccumulator()
         self._accumulated_y_hat = PredAccumulator()
+        self._accumulated_y_sum = PredAccumulator()
         self._accumulated_times = PredAccumulator()

-        self.pvnet_model = PVNetBaseModel.from_pretrained(
-            model_name,
-            revision=model_version,
-        )
-        self.pvnet_model.requires_grad_(False)
-
     def predict_pvnet_batch(self, batch):
+        """Use PVNet model to create predictions for batch"""
         gsp_batches = []
         for sample in batch:
             preds = self.pvnet_model(sample)
             gsp_batches += [preds]
         return torch.stack(gsp_batches)

+    def sum_of_gsps(self, x):
+        """Compute the sum of the GSP-level predictions"""
+        if self.pvnet_model.use_quantile_regression:
+            y_hat = self.pvnet_model._quantiles_to_prediction(x["pvnet_outputs"])
+        else:
+            y_hat = x["pvnet_outputs"]
+
+        return (y_hat * x["effective_capacity"]).sum(dim=1)
+
     @property
     def pvnet_output_shape(self):
+        """Return the expected shape of the PVNet outputs"""
         if self.pvnet_model.use_quantile_regression:
             return (317, self.pvnet_model.forecast_len_30, len(self.pvnet_model.output_quantiles))
         else:
             return (317, self.pvnet_model.forecast_len_30)

-    def _training_accumulate_log(self, batch_idx, losses, y_hat, y, times):
+    def _training_accumulate_log(self, batch_idx, losses, y_hat, y, y_sum, times):
         """Internal function to accumulate training batches and log results.

         This is used when accummulating grad batches. Should make the variability in logged training
@@ -103,12 +110,14 @@ def _training_accumulate_log(self, batch_idx, losses, y_hat, y, y_sum, times):
         self._accumulated_metrics.append(losses)
         self._accumulated_y_hat.append(y_hat)
         self._accumulated_y.append(y)
+        self._accumulated_y_sum.append(y_sum)
         self._accumulated_times.append(times)

         if not self.trainer.fit_loop._should_accumulate():
             losses = self._accumulated_metrics.flush()
             y_hat = self._accumulated_y_hat.flush()
             y = self._accumulated_y.flush()
+            y_sum = self._accumulated_y_sum.flush()
             times = self._accumulated_times.flush()

             self.log_dict(
@@ -123,7 +132,14 @@ def _training_accumulate_log(self, batch_idx, losses, y_hat, y, y_sum, times):
             # We only create the figure every 8 log steps
             # This was reduced as it was creating figures too often
             if grad_batch_num % (8 * self.trainer.log_every_n_steps) == 0:
-                fig = plot_forecasts(y, y_hat, times, batch_idx, quantiles=self.output_quantiles)
+                fig = plot_forecasts(
+                    y,
+                    y_hat,
+                    times,
+                    batch_idx,
+                    quantiles=self.output_quantiles,
+                    y_sum=y_sum,
+                )

                 fig.savefig("latest_logged_train_batch.png")

@@ -132,11 +148,12 @@ def training_step(self, batch, batch_idx):
         y_hat = self.forward(batch)
         y = batch["national_targets"]
         times = batch["times"]
+        y_sum = self.sum_of_gsps(batch)

         losses = self._calculate_common_losses(y, y_hat)
         losses = {f"{k}/train": v for k, v in losses.items()}

-        self._training_accumulate_log(batch_idx, losses, y_hat, y, times)
+        self._training_accumulate_log(batch_idx, losses, y_hat, y, y_sum, times)

         if self.use_quantile_regression:
             opt_target = losses["quantile_loss/train"]
@@ -150,6 +167,7 @@ def validation_step(self, batch: dict, batch_idx):
         y_hat = self.forward(batch)
         y = batch["national_targets"]
         times = batch["times"]
+        y_sum = self.sum_of_gsps(batch)

         losses = self._calculate_common_losses(y, y_hat)
         losses.update(self._calculate_val_losses(y, y_hat))
@@ -169,19 +187,29 @@ def validation_step(self, batch: dict, batch_idx):
         if not hasattr(self, "_val_y_hats"):
             self._val_y_hats = PredAccumulator()
             self._val_y = PredAccumulator()
+            self._val_y_sum = PredAccumulator()
             self._val_times = PredAccumulator()

         self._val_y_hats.append(y_hat)
         self._val_y.append(y)
+        self._val_y_sum.append(y_sum)
         self._val_times.append(times)

         # if batch had accumulated
         if (batch_idx + 1) % self.trainer.accumulate_grad_batches == 0:
             y_hat = self._val_y_hats.flush()
             y = self._val_y.flush()
+            y_sum = self._val_y_sum.flush()
             times = self._val_times.flush()

-            fig = plot_forecasts(y, y_hat, times, batch_idx, quantiles=self.output_quantiles)
+            fig = plot_forecasts(
+                y,
+                y_hat,
+                times,
+                batch_idx,
+                quantiles=self.output_quantiles,
+                y_sum=y_sum,
+            )

             self.logger.experiment.log(
                 {
@@ -190,6 +218,7 @@ def validation_step(self, batch: dict, batch_idx):
             )
             del self._val_y_hats
             del self._val_y
+            del self._val_y_sum
             del self._val_times

         return logged_losses
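Note on `sum_of_gsps` above: for quantile models the quantile axis is first collapsed to a
point prediction, then the GSP forecasts are weighted by their fractional capacities and
summed over the GSP dimension to give a baseline national estimate. A standalone sketch
with dummy tensors, assuming shapes of (batch, n_gsps, forecast_len) for the PVNet outputs
and (batch, n_gsps, 1) for the capacities:

import torch

batch, n_gsps, forecast_len = 2, 317, 16
pvnet_outputs = torch.rand(batch, n_gsps, forecast_len)  # stand-in GSP forecasts
effective_capacity = torch.rand(batch, n_gsps, 1)  # stand-in capacity fractions

# Equivalent of (y_hat * x["effective_capacity"]).sum(dim=1): the capacity
# broadcasts over the time axis and the sum collapses the GSP axis
y_sum = (pvnet_outputs * effective_capacity).sum(dim=1)
print(y_sum.shape)  # torch.Size([2, 16])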
diff --git a/pvnet_summation/models/model.py b/pvnet_summation/models/model.py
index 9214508..42b426f 100644
--- a/pvnet_summation/models/model.py
+++ b/pvnet_summation/models/model.py
@@ -5,12 +5,16 @@
 import numpy as np
 import pvnet
 import torch
+import torch.nn.functional as F
 from pvnet.models.multimodal.linear_networks.basic_blocks import AbstractLinearNetwork
 from pvnet.models.multimodal.linear_networks.networks import DefaultFCNet
 from pvnet.optimizers import AbstractOptimizer
+from torch import nn

 from pvnet_summation.models.base_model import BaseModel

+_default_optimizer = pvnet.optimizers.Adam()
+

 class Model(BaseModel):
     """Neural network which combines GSP predictions from PVNet"""
@@ -20,52 +24,89 @@ class Model(BaseModel):

     def __init__(
         self,
         model_name: str,
-        forecast_minutes: int,
         model_version: Optional[str],
         output_quantiles: Optional[list[float]] = None,
         output_network: AbstractLinearNetwork = DefaultFCNet,
-        output_network_kwargs: dict = dict(),
-        optimizer: AbstractOptimizer = pvnet.optimizers.Adam(),
+        output_network_kwargs: Optional[dict] = None,
+        scale_pvnet_outputs: bool = False,
+        predict_difference_from_sum: bool = False,
+        optimizer: AbstractOptimizer = _default_optimizer,
     ):
         """Neural network which combines GSP predictions from PVNet

         Args:
+            model_name: Model path either locally or on huggingface.
+            model_version: Model version if using huggingface. Set to None if using local.
             output_quantiles: A list of float (0.0, 1.0) quantiles to predict values for. If set
                 to None the output is a single value.
             output_network: Pytorch Module class used to combine the 1D features to produce the
                 forecast.
             output_network_kwargs: Dictionary of optional kwargs for the `output_network` module.
-            model_name: Model path either locally or on huggingface.
-            model_version: Model version if using huggingface. Set to None if using local.
-            forecast_minutes (int): Length of the GSP forecast period in minutes
+            scale_pvnet_outputs: If true, the PVNet predictions are scaled by the capacities.
+            predict_difference_from_sum: Whether to use the sum of GSPs as an estimate for the
+                national sum and train the model to correct this estimate. Otherwise the model tries
+                to learn the national sum from the PVNet outputs directly.
             optimizer (AbstractOptimizer): Optimizer
         """
-        super().__init__(forecast_minutes, model_name, model_version, optimizer, output_quantiles)
+        super().__init__(model_name, model_version, optimizer, output_quantiles)
+
+        self.scale_pvnet_outputs = scale_pvnet_outputs
+        self.predict_difference_from_sum = predict_difference_from_sum

-        in_features = np.product(self.pvnet_output_shape)
+        if output_network_kwargs is None:
+            output_network_kwargs = dict()

         self.model = output_network(
-            in_features=in_features,
+            in_features=np.product(self.pvnet_output_shape),
             out_features=self.num_output_features,
             **output_network_kwargs,
         )

+        # Add linear layer if predicting difference from sum
+        # This allows difference to be positive or negative
+        if predict_difference_from_sum:
+            self.model = nn.Sequential(
+                self.model, nn.Linear(self.num_output_features, self.num_output_features)
+            )
+
         self.save_hyperparameters()

     def forward(self, x):
         """Run model forward"""
-        if "pvnet_outputs" in x:
-            pvnet_out = x["pvnet_outputs"]
+        if "pvnet_outputs" not in x:
+            x["pvnet_outputs"] = self.predict_pvnet_batch(x["pvnet_inputs"])
+
+        if self.scale_pvnet_outputs:
+            if self.pvnet_model.use_quantile_regression:
+                eff_cap = x["effective_capacity"].unsqueeze(-1)
+            else:
+                eff_cap = x["effective_capacity"]
+            x_in = x["pvnet_outputs"] * eff_cap
         else:
-            pvnet_out = self.predict_pvnet_batch(x["pvnet_inputs"])
+            x_in = x["pvnet_outputs"]

-        pvnet_out = torch.flatten(pvnet_out, start_dim=1)
-        out = self.model(pvnet_out)
+        x_in = torch.flatten(x_in, start_dim=1)
+        out = self.model(x_in)

         if self.use_quantile_regression:
             # Shape: batch_size, seq_length * num_quantiles
             out = out.reshape(out.shape[0], self.forecast_len_30, len(self.output_quantiles))

+        if self.predict_difference_from_sum:
+            # If PVNet model uses quantiles, need to expand to match shape
+            if self.pvnet_model.use_quantile_regression:
+                eff_cap = x["effective_capacity"].unsqueeze(-1)
+            else:
+                eff_cap = x["effective_capacity"]
+
+            gsp_sum = (x["pvnet_outputs"] * eff_cap).sum(dim=1)
+
+            # If this model outputs quantiles, expand the GSP sum to broadcast over them
+            if self.use_quantile_regression:
+                gsp_sum = gsp_sum.unsqueeze(-1)
+
+            out = F.leaky_relu(gsp_sum + out)
+
         return out
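Note on `predict_difference_from_sum` above: the output network (plus the extra
unconstrained linear layer) learns a correction to the capacity-weighted GSP sum rather
than the national total itself, and the final leaky ReLU passes positive values through
unchanged while strongly damping negative ones. A toy sketch of just that residual step,
with made-up tensors standing in for the GSP sum and the learned correction:

import torch
import torch.nn.functional as F

batch, forecast_len = 2, 16
gsp_sum = torch.rand(batch, forecast_len)  # baseline national estimate from the GSP sum
correction = 0.1 * torch.randn(batch, forecast_len)  # learned correction, may be negative

# Equivalent of out = F.leaky_relu(gsp_sum + out): the default negative slope of
# 0.01 keeps the corrected forecast from going far below zero
national_forecast = F.leaky_relu(gsp_sum + correction)
print(national_forecast.shape)  # torch.Size([2, 16])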
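A quick usage sketch for the extended `plot_forecasts` signature, with dummy data. The
times tensor is assumed to hold int64 epoch seconds (the function casts it to
datetime64[s]); everything else here is made up for illustration:

import numpy as np
import torch

from pvnet_summation.utils import plot_forecasts

batch, forecast_len = 2, 16
# 30-minute steps from an arbitrary start time, as int64 epoch seconds
start = np.datetime64("2023-06-01T00:00:00").astype("datetime64[s]").astype(np.int64)
steps = torch.arange(forecast_len, dtype=torch.int64) * 1800
times = (int(start) + steps).repeat(batch, 1)

y = torch.rand(batch, forecast_len)  # national outturn
y_hat = torch.rand(batch, forecast_len)  # model forecast
y_sum = torch.rand(batch, forecast_len)  # capacity-weighted GSP sum baseline

fig = plot_forecasts(y, y_hat, times, batch_idx=0, quantiles=None, y_sum=y_sum)
fig.savefig("example_forecast.png")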
model_version="898630f3f8cd4e8506525d813dd61c6d8de86144", ) diff --git a/tests/models/test_model.py b/tests/models/test_model.py index f4bc30d..aec06e8 100644 --- a/tests/models/test_model.py +++ b/tests/models/test_model.py @@ -3,7 +3,7 @@ def test_model_forward(model, sample_batch): - y = model.forward(sample_batch["pvnet_inputs"]) + y = model.forward(sample_batch) # check output is the correct shape # batch size=2, forecast_len=16 @@ -13,14 +13,14 @@ def test_model_forward(model, sample_batch): def test_model_backward(model, sample_batch): opt = SGD(model.parameters(), lr=0.001) - y = model(sample_batch["pvnet_inputs"]) + y = model(sample_batch) # Backwards on sum drives sum to zero y.sum().backward() def test_quantile_model_forward(quantile_model, sample_batch): - y_quantiles = quantile_model(sample_batch["pvnet_inputs"]) + y_quantiles = quantile_model(sample_batch) # check output is the correct shape # batch size=2, forecast_len=15, num_quantiles=3 @@ -30,7 +30,7 @@ def test_quantile_model_forward(quantile_model, sample_batch): def test_quantile_model_backward(quantile_model, sample_batch): opt = SGD(quantile_model.parameters(), lr=0.001) - y_quantiles = quantile_model(sample_batch["pvnet_inputs"]) + y_quantiles = quantile_model(sample_batch) # Backwards on sum drives sum to zero y_quantiles.sum().backward()