From abea008cf538281b93fda5c9b685e7bf7ac5a969 Mon Sep 17 00:00:00 2001 From: James Fulton Date: Mon, 24 Jul 2023 08:50:51 +0000 Subject: [PATCH 1/7] add deviation from GSP sum as option to model --- configs/model/default.yaml | 7 +-- pvnet_summation/data/datamodule.py | 30 ++++++++++--- pvnet_summation/models/base_model.py | 57 ++++++++++++++++-------- pvnet_summation/models/model.py | 65 +++++++++++++++++++++------- pvnet_summation/utils.py | 6 ++- 5 files changed, 121 insertions(+), 44 deletions(-) diff --git a/configs/model/default.yaml b/configs/model/default.yaml index 0418c70..a2e8beb 100644 --- a/configs/model/default.yaml +++ b/configs/model/default.yaml @@ -14,13 +14,10 @@ output_network: _partial_: True output_network_kwargs: fc_hidden_features: 128 - n_res_blocks: 6 + n_res_blocks: 2 res_block_layers: 2 dropout_frac: 0.0 - - -# Foreast and time settings -forecast_minutes: 480 +predict_difference_from_sum: False # ---------------------------------------------- diff --git a/pvnet_summation/data/datamodule.py b/pvnet_summation/data/datamodule.py index b91d0c4..07e6510 100644 --- a/pvnet_summation/data/datamodule.py +++ b/pvnet_summation/data/datamodule.py @@ -113,6 +113,14 @@ def __iter__(self): for outputs in self.source_datapipes: yield {key: value for key, value in zip(self.keys, outputs)} + +def get_capacity(batch): + """Extract the capacity from the numpy batch""" + return batch[BatchKey.gsp_effective_capacity_mwp] + +def divide(args): + return args[0]/args[1] + class DataModule(LightningDataModule): """Datamodule for training pvnet_summation.""" @@ -171,24 +179,34 @@ def _get_premade_batches_datapipe(self, subdir, shuffle=False, add_filename=Fals ) sample_pipeline, sample_pipeline_copy = sample_pipeline.fork(2, buffer_size=5) + times_datapipe = GetBatchTime(sample_pipeline_copy) + + times_datapipe, times_datapipe_copy = times_datapipe.fork(2, buffer_size=5) + national_targets_datapipe = GetNationalPVLive(gsp_data, times_datapipe_copy) - times_datapipe, times_datapipe_copy = ( - GetBatchTime(sample_pipeline_copy).fork(2, buffer_size=5) + times_datapipe, times_datapipe_copy = times_datapipe.fork(2, buffer_size=5) + national_capacity_datapipe = GetNationalPVLive( + gsp_data.effective_capacity_mwp, + times_datapipe_copy ) + sample_pipeline, sample_pipeline_copy = sample_pipeline.fork(2, buffer_size=5) + gsp_capacity_pipeline = sample_pipeline_copy.map(get_capacity) - national_targets_datapipe = GetNationalPVLive(gsp_data, times_datapipe_copy) + capacity_pipeline = gsp_capacity_pipeline.zip(national_capacity_datapipe).map(divide) # Compile the samples if add_filename: data_pipeline = ZipperDict( pvnet_inputs = sample_pipeline, + effective_capacity = capacity_pipeline, national_targets = national_targets_datapipe, - times = times_datapipe, + times = times_datapipe, filepath = file_pipeline_copy, ) else: data_pipeline = ZipperDict( pvnet_inputs = sample_pipeline, + effective_capacity = capacity_pipeline, national_targets = national_targets_datapipe, times = times_datapipe, ) @@ -197,7 +215,8 @@ def _get_premade_batches_datapipe(self, subdir, shuffle=False, add_filename=Fals data_pipeline = PivotDictList(data_pipeline.batch(self.batch_size)) data_pipeline = DictApply( - data_pipeline, + data_pipeline, + effective_capacity=torch.stack, national_targets=torch.stack, times=torch.stack, ) @@ -276,6 +295,7 @@ def _get_premade_batches_datapipe(self, subdir, shuffle=False): batch_pipeline = PivotDictList(sample_pipeline.batch(self.batch_size)) batch_pipeline = DictApply( batch_pipeline, + 
effective_capacity=torch.stack, pvnet_outputs=torch.stack, national_targets=torch.stack, times=torch.stack, diff --git a/pvnet_summation/models/base_model.py b/pvnet_summation/models/base_model.py index dec091f..f67bddd 100644 --- a/pvnet_summation/models/base_model.py +++ b/pvnet_summation/models/base_model.py @@ -37,7 +37,6 @@ class BaseModel(PVNetBaseModel): def __init__( self, - forecast_minutes: int, model_name: str, model_version: Optional[str], optimizer: AbstractOptimizer, @@ -46,7 +45,6 @@ def __init__( """Abtstract base class for PVNet summation submodels. Args: - forecast_minutes (int): Length of the GSP forecast period in minutes model_name: Model path either locally or on huggingface. model_version: Model version if using huggingface. Set to None if using local. optimizer (AbstractOptimizer): Optimizer @@ -54,33 +52,35 @@ def __init__( None the output is a single value. """ pl.LightningModule.__init__(self) - PVNetModelHubMixin.__init__(self) - - self._optimizer = optimizer + PVNetModelHubMixin.__init__(self) + + self.pvnet_model = PVNetBaseModel.from_pretrained( + model_name, + revision=model_version, + ) + self.pvnet_model.requires_grad_(False) + + self._optimizer = optimizer # Model must have lr to allow tuning # This setting is only used when lr is tuned with callback self.lr = None - self.forecast_minutes = forecast_minutes + self.forecast_minutes = self.pvnet_model.forecast_minutes self.output_quantiles = output_quantiles # Number of timestemps for 30 minutely data - self.forecast_len_30 = forecast_minutes // 30 + self.forecast_len_30 = self.forecast_minutes // 30 self.weighted_losses = WeightedLosses(forecast_length=self.forecast_len_30) self._accumulated_metrics = MetricAccumulator() self._accumulated_y = PredAccumulator() self._accumulated_y_hat = PredAccumulator() + self._accumulated_y_sum = PredAccumulator() self._accumulated_times = PredAccumulator() - self.pvnet_model = PVNetBaseModel.from_pretrained( - model_name, - revision=model_version, - ) - self.pvnet_model.requires_grad_(False) - + def predict_pvnet_batch(self, batch): gsp_batches = [] for sample in batch: @@ -88,6 +88,14 @@ def predict_pvnet_batch(self, batch): gsp_batches += [preds] return torch.stack(gsp_batches) + def sum_of_gsps(self, x): + if self.pvnet_model.use_quantile_regression: + y_hat = self.pvnet_model._quantiles_to_prediction(x["pvnet_outputs"]) + else: + y_hat = x["pvnet_outputs"] + + return (y_hat * x["effective_capacity"]).sum(dim=1) + @property def pvnet_output_shape(self): if self.pvnet_model.use_quantile_regression: @@ -95,8 +103,7 @@ def pvnet_output_shape(self): else: return (317, self.pvnet_model.forecast_len_30) - - def _training_accumulate_log(self, batch_idx, losses, y_hat, y, times): + def _training_accumulate_log(self, batch_idx, losses, y_hat, y, y_sum, times): """Internal function to accumulate training batches and log results. This is used when accummulating grad batches. 
Should make the variability in logged training @@ -110,12 +117,14 @@ def _training_accumulate_log(self, batch_idx, losses, y_hat, y, times): self._accumulated_metrics.append(losses) self._accumulated_y_hat.append(y_hat) self._accumulated_y.append(y) + self._accumulated_y_sum.append(y_sum) self._accumulated_times.append(times) if not self.trainer.fit_loop._should_accumulate(): losses = self._accumulated_metrics.flush() y_hat = self._accumulated_y_hat.flush() y = self._accumulated_y.flush() + y_sum = self._accumulated_y_sum.flush() times = self._accumulated_times.flush() self.log_dict( @@ -130,7 +139,10 @@ def _training_accumulate_log(self, batch_idx, losses, y_hat, y, times): # We only create the figure every 8 log steps # This was reduced as it was creating figures too often if grad_batch_num % (8 * self.trainer.log_every_n_steps) == 0: - fig = plot_forecasts(y, y_hat, times, batch_idx, quantiles=self.output_quantiles) + fig = plot_forecasts(y, y_hat, times, batch_idx, + quantiles=self.output_quantiles, + y_sum=y_sum, + ) fig.savefig("latest_logged_train_batch.png") def training_step(self, batch, batch_idx): @@ -139,11 +151,12 @@ def training_step(self, batch, batch_idx): y_hat = self.forward(batch) y = batch["national_targets"] times = batch["times"] + y_sum = self.sum_of_gsps(batch) losses = self._calculate_common_losses(y, y_hat) losses = {f"{k}/train": v for k, v in losses.items()} - self._training_accumulate_log(batch_idx, losses, y_hat, y, times) + self._training_accumulate_log(batch_idx, losses, y_hat, y, y_sum, times) if self.use_quantile_regression: opt_target = losses["quantile_loss/train"] @@ -157,6 +170,7 @@ def validation_step(self, batch: dict, batch_idx): y_hat = self.forward(batch) y = batch["national_targets"] times = batch["times"] + y_sum = self.sum_of_gsps(batch) losses = self._calculate_common_losses(y, y_hat) losses.update(self._calculate_val_losses(y, y_hat)) @@ -176,19 +190,25 @@ def validation_step(self, batch: dict, batch_idx): if not hasattr(self, "_val_y_hats"): self._val_y_hats = PredAccumulator() self._val_y = PredAccumulator() + self._val_y_sum = PredAccumulator() self._val_times = PredAccumulator() self._val_y_hats.append(y_hat) self._val_y.append(y) + self._val_y_sum.append(y_sum) self._val_times.append(times) # if batch had accumulated if (batch_idx + 1) % self.trainer.accumulate_grad_batches == 0: y_hat = self._val_y_hats.flush() y = self._val_y.flush() + y_sum = self._val_y_sum.flush() times = self._val_times.flush() - fig = plot_forecasts(y, y_hat, times, batch_idx, quantiles=self.output_quantiles) + fig = plot_forecasts(y, y_hat, times, batch_idx, + quantiles=self.output_quantiles, + y_sum=y_sum, + ) self.logger.experiment.log( { @@ -197,6 +217,7 @@ def validation_step(self, batch: dict, batch_idx): ) del self._val_y_hats del self._val_y + del self._val_y_sum del self._val_times return logged_losses diff --git a/pvnet_summation/models/model.py b/pvnet_summation/models/model.py index 901621e..479191b 100644 --- a/pvnet_summation/models/model.py +++ b/pvnet_summation/models/model.py @@ -4,6 +4,8 @@ import numpy as np import torch +import torch.nn.functional as F +from torch import nn import pvnet from pvnet_summation.models.base_model import BaseModel @@ -16,7 +18,6 @@ class Model(BaseModel): """Neural network which combines GSP predictions from PVNet - """ name = "pvnet_summation_model" @@ -24,43 +25,55 @@ class Model(BaseModel): def __init__( self, model_name: str, - forecast_minutes: int, model_version: Optional[str], output_quantiles: 
Optional[list[float]] = None, output_network: AbstractLinearNetwork = DefaultFCNet, output_network_kwargs: dict = dict(), + scale_pvnet_outputs: bool = False, + predict_difference_from_sum: bool = False, optimizer: AbstractOptimizer = pvnet.optimizers.Adam(), ): """Neural network which combines GSP predictions from PVNet Args: + model_name: Model path either locally or on huggingface. + model_version: Model version if using huggingface. Set to None if using local. output_quantiles: A list of float (0.0, 1.0) quantiles to predict values for. If set to None the output is a single value. output_network: Pytorch Module class used to combine the 1D features to produce the forecast. output_network_kwargs: Dictionary of optional kwargs for the `output_network` module. - model_name: Model path either locally or on huggingface. - model_version: Model version if using huggingface. Set to None if using local. - forecast_minutes (int): Length of the GSP forecast period in minutes + scale_pvnet_outputs: If true, the PVNet predictions are scaled by the capacities. + predict_difference_from_sum: Whether to use the sum of GSPs as an estimate for the + national sum and train the model to correct this estimate. Otherwise the model tries + to learn the national sum from the PVNet outputs directly. optimizer (AbstractOptimizer): Optimizer """ - super().__init__( - forecast_minutes, + super().__init__( model_name, model_version, optimizer, output_quantiles ) - - in_features = np.product(self.pvnet_output_shape) + + self.scale_pvnet_outputs = scale_pvnet_outputs + self.predict_difference_from_sum = predict_difference_from_sum self.model = output_network( - in_features=in_features, + in_features=np.product(self.pvnet_output_shape), out_features=self.num_output_features, **output_network_kwargs, ) + + # Add linear layer if predicting difference from sum + # This allows difference to be positive or negative + if predict_difference_from_sum: + self.model = nn.Sequential( + self.model, + nn.Linear(self.num_output_features, self.num_output_features) + ) self.save_hyperparameters() @@ -68,17 +81,39 @@ def __init__( def forward(self, x): """Run model forward""" - if "pvnet_outputs" in x: - pvnet_out = x["pvnet_outputs"] + if "pvnet_outputs" not in x: + x["pvnet_outputs"] = self.predict_pvnet_batch(x['pvnet_inputs']) + + if self.scale_pvnet_outputs: + if self.pvnet_model.use_quantile_regression: + eff_cap = x["effective_capacity"].unsqueeze(-1) + else: + eff_cap = x["effective_capacity"] + x_in = x["pvnet_outputs"]*eff_cap else: - pvnet_out = self.predict_pvnet_batch(x['pvnet_inputs']) + x_in = x["pvnet_outputs"] - pvnet_out = torch.flatten(pvnet_out, start_dim=1) - out = self.model(pvnet_out) + x_in = torch.flatten(x_in, start_dim=1) + out = self.model(x_in) if self.use_quantile_regression: # Shape: batch_size, seq_length * num_quantiles out = out.reshape(out.shape[0], self.forecast_len_30, len(self.output_quantiles)) + if self.predict_difference_from_sum: + # If PVNet model uses quantiles, need to expand to match shape + if self.pvnet_model.use_quantile_regression: + eff_cap = x["effective_capacity"].unsqueeze(-1) + else: + eff_cap = x["effective_capacity"] + + gsp_sum = (x["pvnet_outputs"] * eff_cap).sum(dim=1) + + # + if self.use_quantile_regression: + gsp_sum = gsp_sum.unsqueeze(-1) + + out = F.leaky_relu(gsp_sum + out) + return out diff --git a/pvnet_summation/utils.py b/pvnet_summation/utils.py index 711dc3e..5cf45f7 100644 --- a/pvnet_summation/utils.py +++ b/pvnet_summation/utils.py @@ -5,7 +5,7 @@ import pylab 
-def plot_forecasts(y, y_hat, times, batch_idx=None, quantiles=None): +def plot_forecasts(y, y_hat, times, batch_idx=None, quantiles=None, y_sum=None): """Plot a batch of data and the forecast from that batch""" @@ -13,6 +13,7 @@ def plot_forecasts(y, y_hat, times, batch_idx=None, quantiles=None): times_utc = [pd.to_datetime(t) for t in times_utc] y = y.cpu().numpy() y_hat = y_hat.cpu().numpy() + y_sum = y_sum.cpu().numpy() if (y_sum is not None) else None batch_size = y.shape[0] @@ -22,7 +23,10 @@ def plot_forecasts(y, y_hat, times, batch_idx=None, quantiles=None): if i >= batch_size: ax.axis("off") continue + ax.plot(times_utc[i], y[i], marker=".", color="k", label=r"$y$") + if y_sum is not None: + ax.plot(times_utc[i], y_sum[i], marker=".", linestyle="--", color="k", label=r"$y_{sum}$") if quantiles is None: ax.plot( From 73e9607512d9a06580e944ec57308498e427a71a Mon Sep 17 00:00:00 2001 From: James Fulton Date: Mon, 24 Jul 2023 08:51:50 +0000 Subject: [PATCH 2/7] update configs --- configs/datamodule/default.yaml | 2 +- configs/trainer/default.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/configs/datamodule/default.yaml b/configs/datamodule/default.yaml index 3ed7921..510d54a 100644 --- a/configs/datamodule/default.yaml +++ b/configs/datamodule/default.yaml @@ -1,6 +1,6 @@ _target_: pvnet_summation.data.datamodule.DataModule batch_dir: "/mnt/disks/bigbatches/concurrent_batches_v3.6_-60mins" gsp_zarr_path: "/mnt/disks/nwp/pv_gsp.zarr" -batch_size: 8 +batch_size: 32 num_workers: 20 prefetch_factor: 2 \ No newline at end of file diff --git a/configs/trainer/default.yaml b/configs/trainer/default.yaml index 5f05108..dd0d7c4 100644 --- a/configs/trainer/default.yaml +++ b/configs/trainer/default.yaml @@ -11,7 +11,7 @@ num_sanity_val_steps: 8 fast_dev_run: false #profiler: 'simple' -accumulate_grad_batches: 4 +#accumulate_grad_batches: 4 #val_check_interval: 800 #limit_val_batches: 800 log_every_n_steps: 50 From 6205530cc7a9e6c31c95f4e25f3c5b86b323e645 Mon Sep 17 00:00:00 2001 From: James Fulton Date: Mon, 24 Jul 2023 10:00:42 +0000 Subject: [PATCH 3/7] update tests --- tests/conftest.py | 1 - tests/models/test_model.py | 8 ++++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 1ffb578..5abc0ab 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -110,7 +110,6 @@ def sample_batch(sample_datamodule): @pytest.fixture() def model_kwargs(): kwargs = dict( - forecast_minutes=480, model_name= "openclimatefix/pvnet_v2", model_version= "898630f3f8cd4e8506525d813dd61c6d8de86144", ) diff --git a/tests/models/test_model.py b/tests/models/test_model.py index f4bc30d..aec06e8 100644 --- a/tests/models/test_model.py +++ b/tests/models/test_model.py @@ -3,7 +3,7 @@ def test_model_forward(model, sample_batch): - y = model.forward(sample_batch["pvnet_inputs"]) + y = model.forward(sample_batch) # check output is the correct shape # batch size=2, forecast_len=16 @@ -13,14 +13,14 @@ def test_model_forward(model, sample_batch): def test_model_backward(model, sample_batch): opt = SGD(model.parameters(), lr=0.001) - y = model(sample_batch["pvnet_inputs"]) + y = model(sample_batch) # Backwards on sum drives sum to zero y.sum().backward() def test_quantile_model_forward(quantile_model, sample_batch): - y_quantiles = quantile_model(sample_batch["pvnet_inputs"]) + y_quantiles = quantile_model(sample_batch) # check output is the correct shape # batch size=2, forecast_len=15, num_quantiles=3 @@ -30,7 +30,7 @@ def 
test_quantile_model_forward(quantile_model, sample_batch): def test_quantile_model_backward(quantile_model, sample_batch): opt = SGD(quantile_model.parameters(), lr=0.001) - y_quantiles = quantile_model(sample_batch["pvnet_inputs"]) + y_quantiles = quantile_model(sample_batch) # Backwards on sum drives sum to zero y_quantiles.sum().backward() From a3cba0b825d1b65d5be650e71dc7ea04f323a975 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 24 Jul 2023 10:04:00 +0000 Subject: [PATCH 4/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- configs/config.yaml | 4 +- configs/datamodule/default.yaml | 2 +- pvnet_summation/__init__.py | 2 +- pvnet_summation/data/datamodule.py | 167 ++++++++++++--------------- pvnet_summation/models/base_model.py | 66 +++++------ pvnet_summation/models/model.py | 62 ++++------ pvnet_summation/training.py | 40 +++---- pvnet_summation/utils.py | 13 +-- requirements.txt | 2 +- run.py | 5 +- tests/conftest.py | 39 +++---- tests/data/test_datamodule.py | 32 ++--- tests/test_end2end.py | 2 +- 13 files changed, 197 insertions(+), 239 deletions(-) diff --git a/configs/config.yaml b/configs/config.yaml index 08c8fb4..0cb36a1 100644 --- a/configs/config.yaml +++ b/configs/config.yaml @@ -10,8 +10,8 @@ defaults: - hydra: default.yaml # Whether to loop through the PVNet outputs and save them out before training -presave_pvnet_outputs: True - +presave_pvnet_outputs: + True # enable color logging # - override hydra/hydra_logging: colorlog diff --git a/configs/datamodule/default.yaml b/configs/datamodule/default.yaml index 510d54a..d3b23b3 100644 --- a/configs/datamodule/default.yaml +++ b/configs/datamodule/default.yaml @@ -3,4 +3,4 @@ batch_dir: "/mnt/disks/bigbatches/concurrent_batches_v3.6_-60mins" gsp_zarr_path: "/mnt/disks/nwp/pv_gsp.zarr" batch_size: 32 num_workers: 20 -prefetch_factor: 2 \ No newline at end of file +prefetch_factor: 2 diff --git a/pvnet_summation/__init__.py b/pvnet_summation/__init__.py index ed2c3c5..ed53582 100644 --- a/pvnet_summation/__init__.py +++ b/pvnet_summation/__init__.py @@ -1 +1 @@ -"""PVNet_summation""" \ No newline at end of file +"""PVNet_summation""" diff --git a/pvnet_summation/data/datamodule.py b/pvnet_summation/data/datamodule.py index 07e6510..3fc1b9d 100644 --- a/pvnet_summation/data/datamodule.py +++ b/pvnet_summation/data/datamodule.py @@ -2,36 +2,30 @@ import torch from lightning.pytorch import LightningDataModule -from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService -from torchdata.datapipes.iter import FileLister, IterDataPipe -from ocf_datapipes.utils.consts import BatchKey from ocf_datapipes.load import OpenGSP from ocf_datapipes.training.pvnet import normalize_gsp -from torchdata.datapipes.iter import Zipper +from ocf_datapipes.utils.consts import BatchKey +from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService +from torchdata.datapipes.iter import FileLister, IterDataPipe, Zipper -from pvnet.data.datamodule import ( - copy_batch_to_device, - batch_to_tensor, - split_batches, -) # https://github.com/pytorch/pytorch/issues/973 -torch.multiprocessing.set_sharing_strategy('file_system') +torch.multiprocessing.set_sharing_strategy("file_system") class GetNationalPVLive(IterDataPipe): """Select national output targets for given times""" + def __init__(self, gsp_data, times_datapipe): """Select national output targets for given times - + Args: gsp_data: xarray 
Dataarray of the national outputs times_datapipe: IterDataPipe yeilding arrays of target times. """ self.gsp_data = gsp_data self.times_datapipe = times_datapipe - + def __iter__(self): - gsp_data = self.gsp_data for times in self.times_datapipe: national_outputs = torch.as_tensor( @@ -42,54 +36,54 @@ def __iter__(self): class GetBatchTime(IterDataPipe): """Extract the valid times from the concurrent sample batch""" - + def __init__(self, sample_datapipe): """Extract the valid times from the concurrent sample batch - + Args: sample_datapipe: IterDataPipe yeilding concurrent sample batches """ self.sample_datapipe = sample_datapipe - + def __iter__(self): for sample in self.sample_datapipe: - # Times for each GSP in the sample batch should be the same - take first + # Times for each GSP in the sample batch should be the same - take first id0 = sample[BatchKey.gsp_t0_idx] - times = sample[BatchKey.gsp_time_utc][0, id0+1:] + times = sample[BatchKey.gsp_time_utc][0, id0 + 1 :] yield times - + class PivotDictList(IterDataPipe): """Convert list of dicts to dict of lists""" - + def __init__(self, source_datapipe): """Convert list of dicts to dict of lists - + Args: - source_datapipe: + source_datapipe: """ self.source_datapipe = source_datapipe - + def __iter__(self): for list_of_dicts in self.source_datapipe: keys = list_of_dicts[0].keys() batch_dict = {k: [d[k] for d in list_of_dicts] for k in keys} yield batch_dict - - + + class DictApply(IterDataPipe): """Apply functions to elements of a dictionary and return processed dictionary.""" - + def __init__(self, source_datapipe, **transforms): """Apply functions to elements of a dictionary and return processed dictionary. - + Args: source_datapipe: Datapipe which yields dicts **transforms: key-function pairs """ self.source_datapipe = source_datapipe self.transforms = transforms - + def __iter__(self): for d in self.source_datapipe: for key, function in self.transforms.items(): @@ -99,16 +93,16 @@ def __iter__(self): class ZipperDict(IterDataPipe): """Yield samples from multiple datapipes as a dict""" - + def __init__(self, **datapipes): """Yield samples from multiple datapipes as a dict. 
- + Args: **datapipes: Named datapipes """ self.keys = list(datapipes.keys()) self.source_datapipes = Zipper(*[datapipes[key] for key in self.keys]) - + def __iter__(self): for outputs in self.source_datapipes: yield {key: value for key, value in zip(self.keys, outputs)} @@ -118,10 +112,11 @@ def get_capacity(batch): """Extract the capacity from the numpy batch""" return batch[BatchKey.gsp_effective_capacity_mwp] + def divide(args): - return args[0]/args[1] + return args[0] / args[1] + - class DataModule(LightningDataModule): """Datamodule for training pvnet_summation.""" @@ -152,106 +147,93 @@ def __init__( multiprocessing_context="spawn", worker_prefetch_cnt=prefetch_factor, ) - + def _get_premade_batches_datapipe(self, subdir, shuffle=False, add_filename=False): - # Load presaved concurrent sample batches file_pipeline = FileLister(f"{self.batch_dir}/{subdir}", masks="*.pt", recursive=False) - + if shuffle: file_pipeline = file_pipeline.shuffle(buffer_size=1000) - + file_pipeline = file_pipeline.sharding_filter() - + if add_filename: file_pipeline, file_pipeline_copy = file_pipeline.fork(2, buffer_size=5) - + sample_pipeline = file_pipeline.map(torch.load) - + # Find national outout simultaneous to concurrent samples gsp_data = ( - next(iter( - OpenGSP(gsp_pv_power_zarr_path=self.gsp_zarr_path) - .map(normalize_gsp) - )) + next(iter(OpenGSP(gsp_pv_power_zarr_path=self.gsp_zarr_path).map(normalize_gsp))) .sel(gsp_id=0) .compute() ) - + sample_pipeline, sample_pipeline_copy = sample_pipeline.fork(2, buffer_size=5) times_datapipe = GetBatchTime(sample_pipeline_copy) - + times_datapipe, times_datapipe_copy = times_datapipe.fork(2, buffer_size=5) national_targets_datapipe = GetNationalPVLive(gsp_data, times_datapipe_copy) - + times_datapipe, times_datapipe_copy = times_datapipe.fork(2, buffer_size=5) national_capacity_datapipe = GetNationalPVLive( - gsp_data.effective_capacity_mwp, - times_datapipe_copy + gsp_data.effective_capacity_mwp, times_datapipe_copy ) sample_pipeline, sample_pipeline_copy = sample_pipeline.fork(2, buffer_size=5) gsp_capacity_pipeline = sample_pipeline_copy.map(get_capacity) - + capacity_pipeline = gsp_capacity_pipeline.zip(national_capacity_datapipe).map(divide) - + # Compile the samples if add_filename: data_pipeline = ZipperDict( - pvnet_inputs = sample_pipeline, - effective_capacity = capacity_pipeline, - national_targets = national_targets_datapipe, - times = times_datapipe, - filepath = file_pipeline_copy, + pvnet_inputs=sample_pipeline, + effective_capacity=capacity_pipeline, + national_targets=national_targets_datapipe, + times=times_datapipe, + filepath=file_pipeline_copy, ) else: data_pipeline = ZipperDict( - pvnet_inputs = sample_pipeline, - effective_capacity = capacity_pipeline, - national_targets = national_targets_datapipe, - times = times_datapipe, - ) - + pvnet_inputs=sample_pipeline, + effective_capacity=capacity_pipeline, + national_targets=national_targets_datapipe, + times=times_datapipe, + ) + if self.batch_size is not None: - data_pipeline = PivotDictList(data_pipeline.batch(self.batch_size)) data_pipeline = DictApply( data_pipeline, - effective_capacity=torch.stack, - national_targets=torch.stack, + effective_capacity=torch.stack, + national_targets=torch.stack, times=torch.stack, ) - + return data_pipeline - def train_dataloader(self, shuffle=True, add_filename=False): """Construct train dataloader""" datapipe = self._get_premade_batches_datapipe( - "train", - shuffle=shuffle, - add_filename=add_filename + "train", shuffle=shuffle, 
add_filename=add_filename ) rs = MultiProcessingReadingService(**self.readingservice_config) return DataLoader2(datapipe, reading_service=rs) - def val_dataloader(self, shuffle=False, add_filename=False): """Construct val dataloader""" datapipe = self._get_premade_batches_datapipe( - "val", - shuffle=shuffle, - add_filename=add_filename - ) + "val", shuffle=shuffle, add_filename=add_filename + ) rs = MultiProcessingReadingService(**self.readingservice_config) return DataLoader2(datapipe, reading_service=rs) - def test_dataloader(self): """Construct test dataloader""" raise NotImplementedError - - + + class PVNetPresavedDataModule(LightningDataModule): """Datamodule for loading pre-saved PVNet predictions to train pvnet_summation.""" @@ -279,35 +261,33 @@ def __init__( multiprocessing_context="spawn", worker_prefetch_cnt=prefetch_factor, ) - + def _get_premade_batches_datapipe(self, subdir, shuffle=False): - # Load presaved concurrent sample batches file_pipeline = FileLister(f"{self.batch_dir}/{subdir}", masks="*.pt", recursive=False) - + if shuffle: file_pipeline = file_pipeline.shuffle(buffer_size=1000) - - sample_pipeline = file_pipeline.sharding_filter().map(torch.load) - + + sample_pipeline = file_pipeline.sharding_filter().map(torch.load) + if self.batch_size is not None: - batch_pipeline = PivotDictList(sample_pipeline.batch(self.batch_size)) batch_pipeline = DictApply( batch_pipeline, - effective_capacity=torch.stack, + effective_capacity=torch.stack, pvnet_outputs=torch.stack, - national_targets=torch.stack, + national_targets=torch.stack, times=torch.stack, ) - + return batch_pipeline def train_dataloader(self, shuffle=True): """Construct train dataloader""" datapipe = self._get_premade_batches_datapipe( - "train", - shuffle=shuffle, + "train", + shuffle=shuffle, ) rs = MultiProcessingReadingService(**self.readingservice_config) @@ -316,13 +296,12 @@ def train_dataloader(self, shuffle=True): def val_dataloader(self, shuffle=False): """Construct val dataloader""" datapipe = self._get_premade_batches_datapipe( - "val", - shuffle=shuffle, - ) + "val", + shuffle=shuffle, + ) rs = MultiProcessingReadingService(**self.readingservice_config) return DataLoader2(datapipe, reading_service=rs) def test_dataloader(self): """Construct test dataloader""" raise NotImplementedError - diff --git a/pvnet_summation/models/base_model.py b/pvnet_summation/models/base_model.py index f67bddd..211818a 100644 --- a/pvnet_summation/models/base_model.py +++ b/pvnet_summation/models/base_model.py @@ -1,28 +1,22 @@ """Base model for all PVNet submodels""" -import json import logging -import os -from pathlib import Path -from typing import Dict, Optional, Union +from typing import Optional -import hydra +import lightning.pytorch as pl import torch import wandb from nowcasting_utils.models.loss import WeightedLosses -import lightning.pytorch as pl -from torch import nn -from pvnet.models.base_model import PVNetModelHubMixin, BaseModel as PVNetBaseModel - -from pvnet.models.utils import ( +from pvnet.models.base_model import BaseModel as PVNetBaseModel +from pvnet.models.base_model import PVNetModelHubMixin +from pvnet.models.utils import ( MetricAccumulator, PredAccumulator, ) - - from pvnet.optimizers import AbstractOptimizer + from pvnet_summation.utils import plot_forecasts -#from pvnet.models.base_model import BaseModel as PVNetBaseModel +# from pvnet.models.base_model import BaseModel as PVNetBaseModel logger = logging.getLogger(__name__) @@ -52,15 +46,15 @@ def __init__( None the output is a single 
value. """ pl.LightningModule.__init__(self) - PVNetModelHubMixin.__init__(self) - + PVNetModelHubMixin.__init__(self) + self.pvnet_model = PVNetBaseModel.from_pretrained( model_name, revision=model_version, ) self.pvnet_model.requires_grad_(False) - - self._optimizer = optimizer + + self._optimizer = optimizer # Model must have lr to allow tuning # This setting is only used when lr is tuned with callback @@ -79,7 +73,6 @@ def __init__( self._accumulated_y_hat = PredAccumulator() self._accumulated_y_sum = PredAccumulator() self._accumulated_times = PredAccumulator() - def predict_pvnet_batch(self, batch): gsp_batches = [] @@ -87,15 +80,15 @@ def predict_pvnet_batch(self, batch): preds = self.pvnet_model(sample) gsp_batches += [preds] return torch.stack(gsp_batches) - + def sum_of_gsps(self, x): if self.pvnet_model.use_quantile_regression: y_hat = self.pvnet_model._quantiles_to_prediction(x["pvnet_outputs"]) else: y_hat = x["pvnet_outputs"] - + return (y_hat * x["effective_capacity"]).sum(dim=1) - + @property def pvnet_output_shape(self): if self.pvnet_model.use_quantile_regression: @@ -139,15 +132,19 @@ def _training_accumulate_log(self, batch_idx, losses, y_hat, y, y_sum, times): # We only create the figure every 8 log steps # This was reduced as it was creating figures too often if grad_batch_num % (8 * self.trainer.log_every_n_steps) == 0: - fig = plot_forecasts(y, y_hat, times, batch_idx, - quantiles=self.output_quantiles, - y_sum=y_sum, + fig = plot_forecasts( + y, + y_hat, + times, + batch_idx, + quantiles=self.output_quantiles, + y_sum=y_sum, ) fig.savefig("latest_logged_train_batch.png") def training_step(self, batch, batch_idx): """Run training step""" - + y_hat = self.forward(batch) y = batch["national_targets"] times = batch["times"] @@ -163,10 +160,10 @@ def training_step(self, batch, batch_idx): else: opt_target = losses["MAE/train"] return opt_target - + def validation_step(self, batch: dict, batch_idx): """Run validation step""" - + y_hat = self.forward(batch) y = batch["national_targets"] times = batch["times"] @@ -205,9 +202,13 @@ def validation_step(self, batch: dict, batch_idx): y_sum = self._val_y_sum.flush() times = self._val_times.flush() - fig = plot_forecasts(y, y_hat, times, batch_idx, - quantiles=self.output_quantiles, - y_sum=y_sum, + fig = plot_forecasts( + y, + y_hat, + times, + batch_idx, + quantiles=self.output_quantiles, + y_sum=y_sum, ) self.logger.experiment.log( @@ -224,7 +225,7 @@ def validation_step(self, batch: dict, batch_idx): def test_step(self, batch, batch_idx): """Run test step""" - + y_hat = self.forward(batch) y = batch["national_targets"] @@ -241,10 +242,9 @@ def test_step(self, batch, batch_idx): return logged_losses - def configure_optimizers(self): """Configure the optimizers using learning rate found with LR finder if used""" if self.lr is not None: # Use learning rate found by learning rate finder callback self._optimizer.lr = self.lr - return self._optimizer(self.parameters()) \ No newline at end of file + return self._optimizer(self.parameters()) diff --git a/pvnet_summation/models/model.py b/pvnet_summation/models/model.py index 479191b..30e9486 100644 --- a/pvnet_summation/models/model.py +++ b/pvnet_summation/models/model.py @@ -3,22 +3,19 @@ from typing import Optional import numpy as np +import pvnet import torch import torch.nn.functional as F +from pvnet.models.multimodal.linear_networks.basic_blocks import AbstractLinearNetwork +from pvnet.models.multimodal.linear_networks.networks import DefaultFCNet +from pvnet.optimizers 
import AbstractOptimizer from torch import nn -import pvnet from pvnet_summation.models.base_model import BaseModel -from pvnet.optimizers import AbstractOptimizer -from pvnet.models.multimodal.linear_networks.networks import DefaultFCNet -from pvnet.models.multimodal.linear_networks.basic_blocks import AbstractLinearNetwork - - class Model(BaseModel): - """Neural network which combines GSP predictions from PVNet - """ + """Neural network which combines GSP predictions from PVNet""" name = "pvnet_summation_model" @@ -31,8 +28,7 @@ def __init__( output_network_kwargs: dict = dict(), scale_pvnet_outputs: bool = False, predict_difference_from_sum: bool = False, - optimizer: AbstractOptimizer = pvnet.optimizers.Adam(), - + optimizer: AbstractOptimizer = pvnet.optimizers.Adam(), ): """Neural network which combines GSP predictions from PVNet @@ -45,75 +41,67 @@ def __init__( forecast. output_network_kwargs: Dictionary of optional kwargs for the `output_network` module. scale_pvnet_outputs: If true, the PVNet predictions are scaled by the capacities. - predict_difference_from_sum: Whether to use the sum of GSPs as an estimate for the + predict_difference_from_sum: Whether to use the sum of GSPs as an estimate for the national sum and train the model to correct this estimate. Otherwise the model tries to learn the national sum from the PVNet outputs directly. optimizer (AbstractOptimizer): Optimizer """ - super().__init__( - model_name, - model_version, - optimizer, - output_quantiles - ) - + super().__init__(model_name, model_version, optimizer, output_quantiles) + self.scale_pvnet_outputs = scale_pvnet_outputs self.predict_difference_from_sum = predict_difference_from_sum - + self.model = output_network( in_features=np.product(self.pvnet_output_shape), out_features=self.num_output_features, **output_network_kwargs, ) - + # Add linear layer if predicting difference from sum # This allows difference to be positive or negative if predict_difference_from_sum: self.model = nn.Sequential( - self.model, - nn.Linear(self.num_output_features, self.num_output_features) + self.model, nn.Linear(self.num_output_features, self.num_output_features) ) self.save_hyperparameters() - def forward(self, x): """Run model forward""" - + if "pvnet_outputs" not in x: - x["pvnet_outputs"] = self.predict_pvnet_batch(x['pvnet_inputs']) - + x["pvnet_outputs"] = self.predict_pvnet_batch(x["pvnet_inputs"]) + if self.scale_pvnet_outputs: if self.pvnet_model.use_quantile_regression: eff_cap = x["effective_capacity"].unsqueeze(-1) else: - eff_cap = x["effective_capacity"] - x_in = x["pvnet_outputs"]*eff_cap + eff_cap = x["effective_capacity"] + x_in = x["pvnet_outputs"] * eff_cap else: x_in = x["pvnet_outputs"] - + x_in = torch.flatten(x_in, start_dim=1) out = self.model(x_in) - + if self.use_quantile_regression: # Shape: batch_size, seq_length * num_quantiles out = out.reshape(out.shape[0], self.forecast_len_30, len(self.output_quantiles)) - + if self.predict_difference_from_sum: # If PVNet model uses quantiles, need to expand to match shape if self.pvnet_model.use_quantile_regression: eff_cap = x["effective_capacity"].unsqueeze(-1) else: eff_cap = x["effective_capacity"] - + gsp_sum = (x["pvnet_outputs"] * eff_cap).sum(dim=1) - - # + + # if self.use_quantile_regression: gsp_sum = gsp_sum.unsqueeze(-1) - + out = F.leaky_relu(gsp_sum + out) - - return out + return out diff --git a/pvnet_summation/training.py b/pvnet_summation/training.py index 5e65549..8388be7 100644 --- a/pvnet_summation/training.py +++ 
b/pvnet_summation/training.py @@ -15,9 +15,8 @@ from lightning.pytorch.loggers import Logger from lightning.pytorch.loggers.wandb import WandbLogger from omegaconf import DictConfig, OmegaConf -from tqdm import tqdm - from pvnet import utils +from tqdm import tqdm from pvnet_summation.data.datamodule import PVNetPresavedDataModule @@ -67,45 +66,42 @@ def train(config: DictConfig) -> Optional[float]: # Init lightning model log.info(f"Instantiating model <{config.model._target_}>") model: LightningModule = hydra.utils.instantiate(config.model) - + # Presave batches if config.get("presave_pvnet_outputs", False): - - - save_dir = ( f"{config.datamodule.batch_dir}/" f"{config.model.model_name}/" f"{config.model.model_version}" ) - - - + if os.path.isdir(save_dir): log.info( f"PVNet output directory already exists: {save_dir}\n" "Skipping saving new outputs. The existing saved outputs will be loaded." ) - + else: log.info(f"Saving PVNet outputs to {save_dir}") - - os.makedirs(f"{save_dir}/train") + + os.makedirs(f"{save_dir}/train") os.makedirs(f"{save_dir}/val") - - # Set batch size to None so batching is skipped + + # Set batch size to None so batching is skipped datamodule.batch_size = None for dataloader_func, split in [ - (datamodule.train_dataloader, "train"), - (datamodule.val_dataloader, "val") + (datamodule.train_dataloader, "train"), + (datamodule.val_dataloader, "val"), ]: log.info(f"Saving {split} outputs") dataloader = dataloader_func(shuffle=False, add_filename=True) for concurrent_sample_dict in tqdm(dataloader): # Run though model and remove - pvnet_out = model.predict_pvnet_batch([concurrent_sample_dict["pvnet_inputs"]])[0] + pvnet_out = model.predict_pvnet_batch([concurrent_sample_dict["pvnet_inputs"]])[ + 0 + ] del concurrent_sample_dict["pvnet_inputs"] concurrent_sample_dict["pvnet_outputs"] = pvnet_out @@ -114,14 +110,12 @@ def train(config: DictConfig) -> Optional[float]: sample_rel_path = filepath.removeprefix(config.datamodule.batch_dir) sample_path = f"{save_dir}{sample_rel_path}" torch.save(concurrent_sample_dict, sample_path) - - - + datamodule = PVNetPresavedDataModule( batch_dir=save_dir, - batch_size=config.datamodule.batch_size, + batch_size=config.datamodule.batch_size, num_workers=config.datamodule.num_workers, - prefetch_factor=config.datamodule.prefetch_factor + prefetch_factor=config.datamodule.prefetch_factor, ) # Init lightning loggers @@ -163,7 +157,6 @@ def train(config: DictConfig) -> Optional[float]: OmegaConf.save(config.model, f"{callback.dirpath}/model_config.yaml") break - trainer: Trainer = hydra.utils.instantiate( config.trainer, logger=loggers, @@ -174,7 +167,6 @@ def train(config: DictConfig) -> Optional[float]: # Train the model completely trainer.fit(model=model, datamodule=datamodule) - # Make sure everything closed properly log.info("Finalizing!") utils.finish( diff --git a/pvnet_summation/utils.py b/pvnet_summation/utils.py index 5cf45f7..9f76946 100644 --- a/pvnet_summation/utils.py +++ b/pvnet_summation/utils.py @@ -8,7 +8,6 @@ def plot_forecasts(y, y_hat, times, batch_idx=None, quantiles=None, y_sum=None): """Plot a batch of data and the forecast from that batch""" - times_utc = times.cpu().numpy().squeeze().astype("datetime64[s]") times_utc = [pd.to_datetime(t) for t in times_utc] y = y.cpu().numpy() @@ -23,15 +22,15 @@ def plot_forecasts(y, y_hat, times, batch_idx=None, quantiles=None, y_sum=None): if i >= batch_size: ax.axis("off") continue - + ax.plot(times_utc[i], y[i], marker=".", color="k", label=r"$y$") if y_sum is not None: - 
ax.plot(times_utc[i], y_sum[i], marker=".", linestyle="--", color="k", label=r"$y_{sum}$") - - if quantiles is None: ax.plot( - times_utc[i], y_hat[i], marker=".", color="r", label=r"$\hat{y}$" + times_utc[i], y_sum[i], marker=".", linestyle="--", color="k", label=r"$y_{sum}$" ) + + if quantiles is None: + ax.plot(times_utc[i], y_hat[i], marker=".", color="r", label=r"$\hat{y}$") else: cm = pylab.get_cmap("twilight") for nq, q in enumerate(quantiles): @@ -61,4 +60,4 @@ def plot_forecasts(y, y_hat, times, batch_idx=None, quantiles=None, y_sum=None): plt.suptitle(title) plt.tight_layout() - return fig \ No newline at end of file + return fig diff --git a/requirements.txt b/requirements.txt index d80d384..bf8a5e9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -27,4 +27,4 @@ tqdm rich omegaconf hydra-core -python-dotenv \ No newline at end of file +python-dotenv diff --git a/run.py b/run.py index 0c2b3c4..dbcceb2 100644 --- a/run.py +++ b/run.py @@ -12,8 +12,8 @@ pass import logging -import sys import os +import sys # Tired of seeing these warnings import warnings @@ -34,9 +34,10 @@ def main(config: DictConfig): """Runs training""" # Imports should be nested inside @hydra.main to optimize tab completion # Read more here: https://github.com/facebookresearch/hydra/issues/934 - from pvnet_summation.training import train from pvnet.utils import extras, print_config + from pvnet_summation.training import train + # A couple of optional utilities: # - disabling python warnings # - easier access to debug mode diff --git a/tests/conftest.py b/tests/conftest.py index 5abc0ab..a3343e7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -17,44 +17,41 @@ from pvnet_summation.data.datamodule import DataModule - @pytest.fixture() def sample_data(): - # Copy small batches to fake 317 GSPs in each with tempfile.TemporaryDirectory() as tmpdirname: os.makedirs(f"{tmpdirname}/train") os.makedirs(f"{tmpdirname}/val") - + # Grab times from batch to make national output zarr times = [] - + file_n = 0 for file in glob.glob("tests/data/sample_batches/train/*.pt"): - batch = torch.load(file) - + this_batch = {} for i in range(batch[BatchKey.gsp_time_utc].shape[0]): - # Duplicate sample to fake 317 GSPs + # Duplicate sample to fake 317 GSPs for key in batch.keys(): if isinstance(batch[key], torch.Tensor): n_dims = len(batch[key].shape) - repeats = (317,) + tuple(1 for dim in range(n_dims-1)) - this_batch[key] = batch[key][i:i+1].repeat(repeats)[:317] + repeats = (317,) + tuple(1 for dim in range(n_dims - 1)) + this_batch[key] = batch[key][i : i + 1].repeat(repeats)[:317] else: this_batch[key] = batch[key] - + # Save fopr both train and val torch.save(this_batch, f"{tmpdirname}/train/{file_n:06}.pt") torch.save(this_batch, f"{tmpdirname}/val/{file_n:06}.pt") - + file_n += 1 times += [batch[BatchKey.gsp_time_utc][i].numpy().astype("datetime64[s]")] - + times = np.unique(np.sort(np.concatenate(times))) - + da_output = xr.DataArray( data=np.random.uniform(size=(len(times), 1)), dims=["datetime_gmt", "gsp_id"], @@ -63,7 +60,7 @@ def sample_data(): gsp_id=[0], ), ) - + da_cap = xr.DataArray( data=np.ones((len(times), 1)), dims=["datetime_gmt", "gsp_id"], @@ -72,7 +69,7 @@ def sample_data(): gsp_id=[0], ), ) - + ds = xr.Dataset( data_vars=dict( generation_mw=da_output, @@ -80,9 +77,9 @@ def sample_data(): capacity_mwp=da_cap, ), ) - + ds.to_zarr(f"{tmpdirname}/gsp.zarr") - + yield tmpdirname, f"{tmpdirname}/gsp.zarr" @@ -97,7 +94,7 @@ def sample_datamodule(sample_data): num_workers=0, prefetch_factor=2, ) - + 
return dm @@ -110,8 +107,8 @@ def sample_batch(sample_datamodule): @pytest.fixture() def model_kwargs(): kwargs = dict( - model_name= "openclimatefix/pvnet_v2", - model_version= "898630f3f8cd4e8506525d813dd61c6d8de86144", + model_name="openclimatefix/pvnet_v2", + model_version="898630f3f8cd4e8506525d813dd61c6d8de86144", ) return kwargs @@ -125,4 +122,4 @@ def model(model_kwargs): @pytest.fixture() def quantile_model(model_kwargs): model = Model(output_quantiles=[0.1, 0.5, 0.9], **model_kwargs) - return model \ No newline at end of file + return model diff --git a/tests/data/test_datamodule.py b/tests/data/test_datamodule.py index b23c3bc..9aa2c27 100644 --- a/tests/data/test_datamodule.py +++ b/tests/data/test_datamodule.py @@ -13,6 +13,7 @@ def test_init(sample_data): prefetch_factor=2, ) + def test_iter(sample_data): batch_dir, gsp_zarr_dir = sample_data @@ -23,20 +24,21 @@ def test_iter(sample_data): num_workers=0, prefetch_factor=2, ) - + batch = next(iter(dm.train_dataloader())) - + # batch size is 2 - assert len(batch['pvnet_inputs'])==2 - + assert len(batch["pvnet_inputs"]) == 2 + # 317 GSPs in each sample # 21 timestamps for each GSP from -120 mins to +480 mins - assert batch['pvnet_inputs'][0][BatchKey.gsp_time_utc].shape==(317,21) - - assert batch['times'].shape==(2, 16) - - assert batch['national_targets'].shape==(2, 16) - + assert batch["pvnet_inputs"][0][BatchKey.gsp_time_utc].shape == (317, 21) + + assert batch["times"].shape == (2, 16) + + assert batch["national_targets"].shape == (2, 16) + + def test_iter_multiprocessing(sample_data): batch_dir, gsp_zarr_dir = sample_data @@ -47,15 +49,15 @@ def test_iter_multiprocessing(sample_data): num_workers=2, prefetch_factor=2, ) - + for batch in dm.train_dataloader(): # batch size is 2 - assert len(batch['pvnet_inputs'])==2 + assert len(batch["pvnet_inputs"]) == 2 # 317 GSPs in each sample # 21 timestamps for each GSP from -120 mins to +480 mins - assert batch['pvnet_inputs'][0][BatchKey.gsp_time_utc].shape==(317,21) + assert batch["pvnet_inputs"][0][BatchKey.gsp_time_utc].shape == (317, 21) - assert batch['times'].shape==(2, 16) + assert batch["times"].shape == (2, 16) - assert batch['national_targets'].shape==(2, 16) \ No newline at end of file + assert batch["national_targets"].shape == (2, 16) diff --git a/tests/test_end2end.py b/tests/test_end2end.py index 856f883..b054d01 100644 --- a/tests/test_end2end.py +++ b/tests/test_end2end.py @@ -3,4 +3,4 @@ def test_model_trainer_fit(model, sample_datamodule): trainer = lightning.pytorch.trainer.trainer.Trainer(fast_dev_run=True) - trainer.fit(model=model, datamodule=sample_datamodule) \ No newline at end of file + trainer.fit(model=model, datamodule=sample_datamodule) From a21404fadae333e438a013ce7009165871ae8d2a Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 24 Jul 2023 10:36:39 +0000 Subject: [PATCH 5/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pvnet_summation/models/base_model.py | 2 -- pvnet_summation/models/model.py | 2 +- pvnet_summation/utils.py | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/pvnet_summation/models/base_model.py b/pvnet_summation/models/base_model.py index 93ff2e2..6db2eb3 100644 --- a/pvnet_summation/models/base_model.py +++ b/pvnet_summation/models/base_model.py @@ -16,7 +16,6 @@ from pvnet_summation.utils import plot_forecasts - logger = logging.getLogger(__name__) activities = 
[torch.profiler.ProfilerActivity.CPU]

@@ -52,7 +51,6 @@ def __init__(
         )
         self.pvnet_model.requires_grad_(False)

-
         self._optimizer = optimizer

         # Model must have lr to allow tuning
diff --git a/pvnet_summation/models/model.py b/pvnet_summation/models/model.py
index 47132f1..30e9486 100644
--- a/pvnet_summation/models/model.py
+++ b/pvnet_summation/models/model.py
@@ -104,4 +104,4 @@ def forward(self, x):

         out = F.leaky_relu(gsp_sum + out)

-        return out
\ No newline at end of file
+        return out
diff --git a/pvnet_summation/utils.py b/pvnet_summation/utils.py
index 91214de..9f7d179 100644
--- a/pvnet_summation/utils.py
+++ b/pvnet_summation/utils.py
@@ -24,7 +24,7 @@ def plot_forecasts(y, y_hat, times, batch_idx=None, quantiles=None, y_sum=None):
             continue

         ax.plot(times_utc[i], y[i], marker=".", color="k", label=r"$y$")
-
+
         if y_sum is not None:
             ax.plot(
                 times_utc[i], y_sum[i], marker=".", linestyle="--", color="k", label=r"$y_{sum}$"

From 884595559befd6b42ba45a23adcaf65a61d6245c Mon Sep 17 00:00:00 2001
From: James Fulton
Date: Mon, 24 Jul 2023 10:52:31 +0000
Subject: [PATCH 6/7] linting

---
 pvnet_summation/data/datamodule.py   |  5 +++--
 pvnet_summation/models/base_model.py |  3 +++
 pvnet_summation/models/model.py      | 10 ++++++++--
 3 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/pvnet_summation/data/datamodule.py b/pvnet_summation/data/datamodule.py
index 3fc1b9d..0f5f0e5 100644
--- a/pvnet_summation/data/datamodule.py
+++ b/pvnet_summation/data/datamodule.py
@@ -60,7 +60,7 @@ def __init__(self, source_datapipe):
         """Convert list of dicts to dict of lists

         Args:
-            source_datapipe:
+            source_datapipe: Datapipe yielding lists of dicts
         """
         self.source_datapipe = source_datapipe

@@ -105,7 +105,7 @@ def __init__(self, **datapipes):

     def __iter__(self):
         for outputs in self.source_datapipes:
-            yield {key: value for key, value in zip(self.keys, outputs)}
+            yield {key: value for key, value in zip(self.keys, outputs)}  # noqa: B905


 def get_capacity(batch):
@@ -114,6 +114,7 @@ def get_capacity(batch):


 def divide(args):
+    """Divide first argument by second"""
     return args[0] / args[1]


diff --git a/pvnet_summation/models/base_model.py b/pvnet_summation/models/base_model.py
index 93ff2e2..758dc65 100644
--- a/pvnet_summation/models/base_model.py
+++ b/pvnet_summation/models/base_model.py
@@ -74,6 +74,7 @@ def __init__(
         self._accumulated_times = PredAccumulator()

     def predict_pvnet_batch(self, batch):
+        """Use PVNet model to create predictions for batch"""
         gsp_batches = []
         for sample in batch:
             preds = self.pvnet_model(sample)
@@ -81,6 +82,7 @@ def predict_pvnet_batch(self, batch):
         return torch.stack(gsp_batches)

     def sum_of_gsps(self, x):
+        """Compute the sum of the GSP-level predictions"""
         if self.pvnet_model.use_quantile_regression:
             y_hat = self.pvnet_model._quantiles_to_prediction(x["pvnet_outputs"])
         else:
@@ -90,6 +92,7 @@ def sum_of_gsps(self, x):

     @property
     def pvnet_output_shape(self):
+        """Return the expected shape of the PVNet outputs"""
         if self.pvnet_model.use_quantile_regression:
             return (317, self.pvnet_model.forecast_len_30, len(self.pvnet_model.output_quantiles))
         else:
diff --git a/pvnet_summation/models/model.py b/pvnet_summation/models/model.py
index 47132f1..0c9c953 100644
--- a/pvnet_summation/models/model.py
+++ b/pvnet_summation/models/model.py
@@ -14,6 +14,9 @@
 from pvnet_summation.models.base_model import BaseModel


+_default_optimizer = pvnet.optimizers.Adam()
+
+
 class Model(BaseModel):
     """Neural network which combines GSP predictions from PVNet"""

@@ -25,10 +28,10 @@ def __init__(
model_version: Optional[str], output_quantiles: Optional[list[float]] = None, output_network: AbstractLinearNetwork = DefaultFCNet, - output_network_kwargs: dict = dict(), + output_network_kwargs: Optional[dict] = None, scale_pvnet_outputs: bool = False, predict_difference_from_sum: bool = False, - optimizer: AbstractOptimizer = pvnet.optimizers.Adam(), + optimizer: AbstractOptimizer = _default_optimizer, ): """Neural network which combines GSP predictions from PVNet @@ -51,6 +54,9 @@ def __init__( self.scale_pvnet_outputs = scale_pvnet_outputs self.predict_difference_from_sum = predict_difference_from_sum + + if output_network_kwargs is None: + output_network_kwargs = dict() self.model = output_network( in_features=np.product(self.pvnet_output_shape), From 86ae9169c550a07990103c4b9087fbc3932dd193 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 24 Jul 2023 10:53:27 +0000 Subject: [PATCH 7/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pvnet_summation/data/datamodule.py | 2 +- pvnet_summation/models/model.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pvnet_summation/data/datamodule.py b/pvnet_summation/data/datamodule.py index 0f5f0e5..c8a92a3 100644 --- a/pvnet_summation/data/datamodule.py +++ b/pvnet_summation/data/datamodule.py @@ -105,7 +105,7 @@ def __init__(self, **datapipes): def __iter__(self): for outputs in self.source_datapipes: - yield {key: value for key, value in zip(self.keys, outputs)} # noqa: B905 + yield {key: value for key, value in zip(self.keys, outputs)} # noqa: B905 def get_capacity(batch): diff --git a/pvnet_summation/models/model.py b/pvnet_summation/models/model.py index 5256be8..42b426f 100644 --- a/pvnet_summation/models/model.py +++ b/pvnet_summation/models/model.py @@ -13,7 +13,6 @@ from pvnet_summation.models.base_model import BaseModel - _default_optimizer = pvnet.optimizers.Adam() @@ -54,7 +53,7 @@ def __init__( self.scale_pvnet_outputs = scale_pvnet_outputs self.predict_difference_from_sum = predict_difference_from_sum - + if output_network_kwargs is None: output_network_kwargs = dict()
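A note on the capacity inputs wired in by PATCH 1/7: `get_capacity` pulls the per-GSP effective capacities out of each numpy batch, and the zipped `divide` step re-expresses them as fractions of the simultaneous national capacity taken from the GSP zarr (gsp_id=0). A minimal sketch of that normalization, with made-up capacity values and assuming, for illustration only, that the national capacity equals the sum of the GSP capacities:

import torch

# Illustrative stand-ins: effective capacities for one sample's 317 GSPs, and
# the matching national capacity. In the datamodule the national value is read
# from the GSP zarr; here we assume it is the sum of the GSP values.
gsp_capacity_mwp = torch.rand(317) * 100.0
national_capacity_mwp = gsp_capacity_mwp.sum()

# Mirrors `gsp_capacity_pipeline.zip(national_capacity_datapipe).map(divide)`:
# each GSP capacity becomes a fraction of the national total.
effective_capacity = gsp_capacity_mwp / national_capacity_mwp

# Under the sum assumption above the fractions add to one, so weighting the
# per-GSP outputs by them gives a correctly normalised national estimate.
assert torch.isclose(effective_capacity.sum(), torch.tensor(1.0))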
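A note on `predict_difference_from_sum` (PATCH 1/7): rather than regressing the national total directly from the flattened PVNet outputs, the model starts from the capacity-weighted sum of the per-GSP forecasts and learns a signed correction to it, with a final leaky ReLU keeping the forecast from going meaningfully negative. The sketch below traces that forward path for the non-quantile case; the `nn.Linear` stands in for the configured output network, and all shapes are illustrative (batch of 2, 317 GSPs, 16 half-hourly steps). In the quantile case, `effective_capacity` and `gsp_sum` are additionally unsqueezed so they broadcast over the quantile dimension.

import torch
import torch.nn.functional as F
from torch import nn

batch_size, n_gsps, forecast_len = 2, 317, 16

# Stand-ins for the real pieces: frozen per-GSP PVNet forecasts, capacity
# fractions from the datamodule, and the summation model's output network.
pvnet_outputs = torch.rand(batch_size, n_gsps, forecast_len)
effective_capacity = torch.rand(batch_size, n_gsps, 1)
output_network = nn.Linear(n_gsps * forecast_len, forecast_len)

# Capacity-weighted sum of the GSP forecasts: a physically motivated first
# guess at the national total (also what `sum_of_gsps` computes for plotting).
gsp_sum = (pvnet_outputs * effective_capacity).sum(dim=1)

# The network predicts a correction to that guess rather than the total
# itself; the extra trailing Linear layer added in the patch lets the
# correction take either sign.
correction = output_network(torch.flatten(pvnet_outputs, start_dim=1))

out = F.leaky_relu(gsp_sum + correction)
assert out.shape == (batch_size, forecast_len)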