Upgrade UK to data sampler (#276)
dfulu authored Dec 17, 2024
1 parent d13312e commit 82b6009
Showing 33 changed files with 555 additions and 559 deletions.
147 changes: 103 additions & 44 deletions pvnet/data/datamodule.py
@@ -1,57 +1,116 @@
""" Data module for pytorch lightning """

import resource
from glob import glob

import torch
from ocf_datapipes.batch import batch_to_tensor, stack_np_examples_into_batch
from ocf_datapipes.training.pvnet import pvnet_datapipe
from torch.utils.data.datapipes.iter import FileLister
from lightning.pytorch import LightningDataModule
from ocf_data_sampler.torch_datasets.pvnet_uk_regional import PVNetUKRegionalDataset
from ocf_datapipes.batch import (
NumpyBatch,
TensorBatch,
batch_to_tensor,
stack_np_examples_into_batch,
)
from torch.utils.data import DataLoader, Dataset


class NumpybatchPremadeSamplesDataset(Dataset):
"""Dataset to load NumpyBatch samples"""

def __init__(self, sample_dir):
"""Dataset to load NumpyBatch samples
Args:
sample_dir: Path to the directory of pre-saved samples.
"""
self.sample_paths = glob(f"{sample_dir}/*.pt")

from pvnet.data.base import BaseDataModule
def __len__(self):
return len(self.sample_paths)

rlimit = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(resource.RLIMIT_NOFILE, (2048, rlimit[1]))
def __getitem__(self, idx):
return torch.load(self.sample_paths[idx])


class DataModule(BaseDataModule):
def collate_fn(samples: list[NumpyBatch]) -> TensorBatch:
"""Convert a list of NumpyBatch samples to a tensor batch"""
return batch_to_tensor(stack_np_examples_into_batch(samples))


class DataModule(LightningDataModule):
"""Datamodule for training pvnet and using pvnet pipeline in `ocf_datapipes`."""

def _get_datapipe(self, start_time, end_time):
data_pipeline = pvnet_datapipe(
self.configuration,
start_time=start_time,
end_time=end_time,
)
def __init__(
self,
configuration: str | None = None,
sample_dir: str | None = None,
batch_size: int = 16,
num_workers: int = 0,
prefetch_factor: int | None = None,
train_period: list[str | None] = [None, None],
val_period: list[str | None] = [None, None],
):
"""Datamodule for training pvnet architecture.
Can also be used with pre-made batches if `sample_dir` is set.
Args:
configuration: Path to datapipe configuration file.
sample_dir: Path to the directory of pre-saved samples. Cannot be used together with
`configuration` or '[train/val]_period'.
batch_size: Batch size.
num_workers: Number of workers to use in multiprocess batch loading.
prefetch_factor: Number of data will be prefetched at the end of each worker process.
train_period: Date range filter for train dataloader.
val_period: Date range filter for val dataloader.
data_pipeline = (
data_pipeline.batch(self.batch_size)
.map(stack_np_examples_into_batch)
.map(batch_to_tensor)
"""
super().__init__()

if not ((sample_dir is not None) ^ (configuration is not None)):
raise ValueError("Exactly one of `sample_dir` or `configuration` must be set.")

if sample_dir is not None:
if any([period != [None, None] for period in [train_period, val_period]]):
raise ValueError("Cannot set `(train/val)_period` with presaved samples")

self.configuration = configuration
self.sample_dir = sample_dir
self.train_period = train_period
self.val_period = val_period

self._common_dataloader_kwargs = dict(
batch_size=batch_size,
sampler=None,
batch_sampler=None,
num_workers=num_workers,
collate_fn=collate_fn,
pin_memory=False,
drop_last=False,
timeout=0,
worker_init_fn=None,
prefetch_factor=prefetch_factor,
persistent_workers=False,
)
return data_pipeline

def _get_premade_batches_datapipe(self, subdir, shuffle=False):
data_pipeline = FileLister(f"{self.batch_dir}/{subdir}", masks="*.pt", recursive=False)
if shuffle:
data_pipeline = (
data_pipeline.shuffle(buffer_size=10_000)
.sharding_filter()
.map(torch.load)
# Split the batches and reshuffle them to be combined into new batches
.split_batches()
.shuffle(buffer_size=self.shuffle_factor * self.batch_size)
)

def _get_streamed_samples_dataset(self, start_time, end_time) -> Dataset:
return PVNetUKRegionalDataset(self.configuration, start_time=start_time, end_time=end_time)

def _get_premade_samples_dataset(self, subdir) -> Dataset:
split_dir = f"{self.sample_dir}/{subdir}"
return NumpybatchPremadeSamplesDataset(split_dir)

def train_dataloader(self) -> DataLoader:
"""Construct train dataloader"""
if self.sample_dir is not None:
dataset = self._get_premade_samples_dataset("train")
else:
data_pipeline = (
data_pipeline.sharding_filter().map(torch.load)
# Split the batches so we can use any batch-size
.split_batches()
)

data_pipeline = (
data_pipeline.batch(self.batch_size)
.map(stack_np_examples_into_batch)
.map(batch_to_tensor)
)
dataset = self._get_streamed_samples_dataset(*self.train_period)
return DataLoader(dataset, shuffle=True, **self._common_dataloader_kwargs)

return data_pipeline
def val_dataloader(self) -> DataLoader:
"""Construct val dataloader"""
if self.sample_dir is not None:
dataset = self._get_premade_samples_dataset("val")
else:
dataset = self._get_streamed_samples_dataset(*self.val_period)
return DataLoader(dataset, shuffle=False, **self._common_dataloader_kwargs)
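
A minimal sketch of how the rewritten DataModule might be driven in its two modes; the config path, sample directory, and date ranges are hypothetical placeholders, not values from this commit:

    # Streamed mode: samples generated on the fly from a data configuration.
    datamodule = DataModule(
        configuration="path/to/data_config.yaml",  # hypothetical path
        batch_size=16,
        num_workers=4,
        prefetch_factor=2,
        train_period=["2020-01-01", "2022-12-31"],
        val_period=["2023-01-01", "2023-12-31"],
    )

    # Pre-made mode: samples were saved to disk earlier; the period
    # filters must be left at their [None, None] defaults here.
    datamodule = DataModule(sample_dir="path/to/saved_samples", batch_size=16)

    train_loader = datamodule.train_dataloader()  # shuffled DataLoader
    val_loader = datamodule.val_dataloader()      # deterministic DataLoader

Note the constructor enforces the two modes with an exclusive-or check: exactly one of `configuration` or `sample_dir` may be set.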
77 changes: 17 additions & 60 deletions pvnet/models/base_model.py
@@ -18,17 +18,15 @@
 from huggingface_hub.constants import CONFIG_NAME, PYTORCH_WEIGHTS_NAME
 from huggingface_hub.file_download import hf_hub_download
 from huggingface_hub.hf_api import HfApi
-from ocf_datapipes.batch import BatchKey
-from ocf_ml_metrics.evaluation.evaluation import evaluation
+from ocf_datapipes.batch import BatchKey, copy_batch_to_device
 
 from pvnet.models.utils import (
     BatchAccumulator,
     MetricAccumulator,
     PredAccumulator,
-    WeightedLosses,
 )
 from pvnet.optimizers import AbstractOptimizer
-from pvnet.utils import construct_ocf_ml_metrics_batch_df, plot_batch_forecasts
+from pvnet.utils import plot_batch_forecasts
 
 DATA_CONFIG_NAME = "data_config.yaml"
@@ -236,6 +234,11 @@ def get_data_config(
 
         return data_config_file
 
+    def _save_pretrained(self, save_directory: Path) -> None:
+        """Save weights from a Pytorch model to a local directory."""
+        model_to_save = self.module if hasattr(self, "module") else self  # type: ignore
+        torch.save(model_to_save.state_dict(), save_directory / PYTORCH_WEIGHTS_NAME)
+
     def save_pretrained(
         self,
         save_directory: Union[str, Path],
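
For context: `_save_pretrained` is the private hook that huggingface_hub's `ModelHubMixin` invokes from the public `save_pretrained` method, so a local save might look like the sketch below (directory name hypothetical):

    # Sketch: saving weights via the mixin hook added above.
    model.save_pretrained("checkpoints/pvnet_uk")  # calls _save_pretrained internally
    # PYTORCH_WEIGHTS_NAME resolves to "pytorch_model.bin" in huggingface_hub,
    # so the state dict lands at checkpoints/pvnet_uk/pytorch_model.bin.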
@@ -348,7 +351,6 @@ def __init__(
         target_key: str = "gsp",
         interval_minutes: int = 30,
         timestep_intervals_to_plot: Optional[list[int]] = None,
-        use_weighted_loss: bool = False,
         forecast_minutes_ignore: Optional[int] = 0,
     ):
         """Abstract base class for PVNet submodels.
@@ -362,7 +364,6 @@ def __init__(
             target_key: The key of the target variable in the batch
             interval_minutes: The interval in minutes between each timestep in the data
             timestep_intervals_to_plot: Intervals, in timesteps, to plot during training
-            use_weighted_loss: Whether to use a weighted loss function
             forecast_minutes_ignore: Number of forecast minutes to ignore when calculating losses.
                 For example if set to 60, the model doesn't predict the first 60 minutes
         """
@@ -394,16 +395,13 @@ def __init__(
         self.forecast_len = (forecast_minutes - forecast_minutes_ignore) // interval_minutes
         self.forecast_len_ignore = forecast_minutes_ignore // interval_minutes
 
-        self.weighted_losses = WeightedLosses(forecast_length=self.forecast_len)
-
         self._accumulated_metrics = MetricAccumulator()
         self._accumulated_batches = BatchAccumulator(key_to_keep=self._target_key_name)
         self._accumulated_y_hat = PredAccumulator()
         self._horizon_maes = MetricAccumulator()
 
         # Store whether the model should use quantile regression or simply predict the mean
         self.use_quantile_regression = self.output_quantiles is not None
-        self.use_weighted_loss = use_weighted_loss
 
         # Store the number of output features that the model should predict for
         if self.use_quantile_regression:
@@ -414,6 +412,10 @@ def __init__(
         # save all validation results to array, so we can save these to weights n biases
         self.validation_epoch_results = []
 
+    def transfer_batch_to_device(self, batch, device, dataloader_idx):
+        """Method to move custom batches to a given device"""
+        return copy_batch_to_device(batch, device)
+
     def _quantiles_to_prediction(self, y_quantiles):
         """
         Convert network prediction into a point prediction.
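
`transfer_batch_to_device` is Lightning's standard hook for moving non-tensor batch containers between devices; the dict batches from the new sampler are not plain tensors, so the default transfer no longer applies. As an illustration only (the real helper is `copy_batch_to_device` from ocf_datapipes, whose internals are not shown in this diff), such a helper plausibly walks the batch recursively:

    # Rough sketch of a recursive batch-to-device copy (illustrative only).
    import torch

    def copy_to_device_sketch(obj, device):
        if isinstance(obj, torch.Tensor):
            return obj.to(device)
        if isinstance(obj, dict):
            return {k: copy_to_device_sketch(v, device) for k, v in obj.items()}
        return obj  # leave non-tensor leaves (floats, strings, arrays) untouched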
@@ -455,13 +457,11 @@ def _calculate_quantile_loss(self, y_quantiles, y):
             errors = y - y_quantiles[..., i]
             losses.append(torch.max((q - 1) * errors, q * errors).unsqueeze(-1))
         losses = 2 * torch.cat(losses, dim=2)
-        if self.use_weighted_loss:
-            weights = self.weighted_losses.weights.unsqueeze(1).unsqueeze(0).to(y.device)
-            losses = losses * weights
 
         return losses.mean()
 
     def _calculate_common_losses(self, y, y_hat):
-        """Calculate losses common to train, test, and val"""
+        """Calculate losses common to train and val"""
 
         losses = {}
@@ -473,19 +473,13 @@ def _calculate_common_losses(self, y, y_hat):
         mse_loss = F.mse_loss(y_hat, y)
         mae_loss = F.l1_loss(y_hat, y)
 
-        # calculate mse, mae with exp weighted loss
-        mse_exp = self.weighted_losses.get_mse_exp(output=y_hat, target=y)
-        mae_exp = self.weighted_losses.get_mae_exp(output=y_hat, target=y)
-
         # TODO: Compute correlation coef using np.corrcoef(tensor with
         # shape (2, num_timesteps))[0, 1] on each example, and taking
         # the mean across the batch?
         losses.update(
             {
                 "MSE": mse_loss,
                 "MAE": mae_loss,
-                "MSE_EXP": mse_exp,
-                "MAE_EXP": mae_exp,
             }
         )
@@ -531,12 +525,6 @@ def _calculate_val_losses(self, y, y_hat):
             losses.update(self._step_mae_and_mse(y, y_persist, dict_key_root="persistence"))
         return losses
 
-    def _calculate_test_losses(self, y, y_hat):
-        """Calculate additional test losses"""
-        # No additional test losses
-        losses = {}
-        return losses
-
     def _training_accumulate_log(self, batch, batch_idx, losses, y_hat):
         """Internal function to accumulate training batches and log results.
@@ -582,7 +570,7 @@ def _training_accumulate_log(self, batch, batch_idx, losses, y_hat):
     def training_step(self, batch, batch_idx):
         """Run training step"""
         y_hat = self(batch)
-        y = batch[self._target_key][:, -self.forecast_len :, 0]
+        y = batch[self._target_key][:, -self.forecast_len :]
 
         losses = self._calculate_common_losses(y, y_hat)
         losses = {f"{k}/train": v for k, v in losses.items()}
@@ -617,7 +605,7 @@ def _log_validation_results(self, batch, y_hat, accum_batch_num):
         """Append validation results to self.validation_epoch_results"""
 
         # get truth values, shape (b, forecast_len)
-        y = batch[self._target_key][:, -self.forecast_len :, 0]
+        y = batch[self._target_key][:, -self.forecast_len :]
         y = y.detach().cpu().numpy()
         batch_size = y.shape[0]
@@ -663,8 +651,8 @@ def validation_step(self, batch: dict, batch_idx):
         accum_batch_num = batch_idx // self.trainer.accumulate_grad_batches
 
         y_hat = self(batch)
-        # Sensor seems to be in batch, station, time order
-        y = batch[self._target_key][:, -self.forecast_len :, 0]
+
+        y = batch[self._target_key][:, -self.forecast_len :]
 
         if (batch_idx + 1) % self.trainer.accumulate_grad_batches == 0:
             self._log_validation_results(batch, y_hat, accum_batch_num)
@@ -763,37 +751,6 @@ def on_validation_epoch_end(self):
             print("Failed to log horizon_loss_curve to wandb")
             print(e)
 
-    def test_step(self, batch, batch_idx):
-        """Run test step"""
-        y_hat = self(batch)
-        y = batch[self._target_key][:, -self.forecast_len :, 0]
-
-        losses = self._calculate_common_losses(y, y_hat)
-        losses.update(self._calculate_val_losses(y, y_hat))
-        losses.update(self._calculate_test_losses(y, y_hat))
-        logged_losses = {f"{k}/test": v for k, v in losses.items()}
-
-        self.log_dict(
-            logged_losses,
-            on_step=False,
-            on_epoch=True,
-        )
-
-        if self.use_quantile_regression:
-            y_hat = self._quantiles_to_prediction(y_hat)
-
-        return construct_ocf_ml_metrics_batch_df(batch, y, y_hat)
-
-    def on_test_epoch_end(self, outputs):
-        """Evaluate test results using ocf_ml_metrics"""
-        results_df = pd.concat(outputs)
-        # setting model_name="test" gives us keys like "test/mw/forecast_horizon_30_minutes/mae"
-        metrics = evaluation(results_df=results_df, model_name="test", outturn_unit="mw")
-
-        self.log_dict(
-            metrics,
-        )
-
     def configure_optimizers(self):
         """Configure the optimizers using learning rate found with LR finder if used"""
         if self.lr is not None:
2 changes: 1 addition & 1 deletion pvnet/models/baseline/last_value.py
@@ -36,7 +36,7 @@ def forward(self, x: dict):
 
         # take the last non-forecast value of the pv yield
         # (this is the pv site we are predicting for)
-        y_hat = gsp_yield[:, -self.forecast_len - 1, 0]
+        y_hat = gsp_yield[:, -self.forecast_len - 1]
 
         # expand the last valid value forward n predict steps
         out = y_hat.unsqueeze(1).repeat(1, self.forecast_len)
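
This baseline is a persistence forecast: it takes the last observed value before the forecast window and repeats it across the horizon. A toy run of the updated indexing, under the same assumed 2-D (batch, time) target layout:

    # Toy run of the persistence logic above (2-D target layout assumed).
    import torch

    forecast_len = 3
    gsp_yield = torch.tensor([[0.1, 0.2, 0.3, 0.9, 0.8, 0.7]])  # (batch=1, time=6)

    y_hat = gsp_yield[:, -forecast_len - 1]           # last observed value: 0.3
    out = y_hat.unsqueeze(1).repeat(1, forecast_len)  # tensor([[0.3, 0.3, 0.3]])
    assert out.shape == (1, forecast_len)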
2 changes: 1 addition & 1 deletion pvnet/models/baseline/single_value.py
@@ -33,5 +33,5 @@ def __init__(
     def forward(self, x: dict):
         """Run model forward on dict batch of data"""
         # Returns a single value at all steps
-        y_hat = torch.zeros_like(x[BatchKey.gsp][:, : self.forecast_len, 0]) + self._value
+        y_hat = torch.zeros_like(x[BatchKey.gsp][:, : self.forecast_len]) + self._value
         return y_hat