Skip to content

Commit

Permalink
add TimesSeries.from_group_dataframe parallel mode (#2292)
Browse files Browse the repository at this point in the history
* add TimesSeries.from_group_dataframe parallel mode

* remove code mess

* add doc string for new parameters

* update CHANGELOG.md

* add miss dtype

* fix static covariates

* make parallel function as local and fix tests

* fix parallel utils imports

* update changelog

* Update CHANGELOG.md

---------

Co-authored-by: Bohdan Bilonoh <[email protected]>
Co-authored-by: dennisbader <[email protected]>
  • Loading branch information
3 people authored Apr 9, 2024
1 parent cdff09a commit e50854b
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 27 deletions.
9 changes: 5 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,17 @@ but cannot always guarantee backwards compatibility. Changes that may **break co
- `TimeSeries`: Residual `TimeSeries` for a single `series` and `historical_forecasts` generated with `last_points_only=True`.
- `List[TimeSeries]` A list of residual `TimeSeries` for a sequence (list) of `series` with `last_points_only=True`. The residual list has length `len(series)`.
- `List[List[TimeSeries]]` A list of lists of residual `TimeSeries` for a sequence of `series` with `last_points_only=False`. The outer residual list has length `len(series)`. The inner lists consist of the residuals from all possible series-specific historical forecasts.
- Improvements to `TimeSeries`: [#2284](https://github.com/unit8co/darts/pull/2284) by [Dennis Bader](https://github.com/dennisbader).
- Performance boost for methods: `slice_intersect()`, `has_same_time_as()`
- New method `slice_intersect_values()`, which returns the sliced values of a series, where the time index has been intersected with another series.
- Improvements to `TimeSeries`:
- `from_group_dataframe()` now supports parallelized creation from a grouped `pandas.DataFrame`. This can be enabled with parameter `n_jobs`. [#2292](https://github.com/unit8co/darts/pull/2292) by [Bohdan Bilonoha](https://github.com/BohdanBilonoh).
- Performance boost for methods: `slice_intersect()`, `has_same_time_as()`. [#2284](https://github.com/unit8co/darts/pull/2284) by [Dennis Bader](https://github.com/dennisbader).
- New method `slice_intersect_values()`, which returns the sliced values of a series, where the time index has been intersected with another series. [#2284](https://github.com/unit8co/darts/pull/2284) by [Dennis Bader](https://github.com/dennisbader).
- 🔴 Moved utils functions to clearly separate Darts-specific from non-Darts-specific logic: [#2284](https://github.com/unit8co/darts/pull/2284) by [Dennis Bader](https://github.com/dennisbader).
- Moved function `generate_index()` from `darts.utils.timeseries_generation` to `darts.utils.utils`
- Moved functions `retain_period_common_to_all()`, `series2seq()`, `seq2series()`, `get_single_series()` from `darts.utils.utils` to `darts.utils.ts_utils`.
- Improvements to `ForecastingModel`: [#2269](https://github.com/unit8co/darts/pull/2269) by [Felix Divo](https://github.com/felixdivo).
- Renamed the private `_is_probabilistic` property to a public `supports_probabilistic_prediction`.
- Improvements to `DataTransformer`: [#2267](https://github.com/unit8co/darts/pull/2267) by [Alicja Krzeminska-Sciga](https://github.com/alicjakrzeminska).
- `InvertibleDataTransformer` now supports parallelized inverse transformation for `series` being a list of lists of `TimeSeries` (`Sequence[Sequence[TimeSeries]]`). This `series` type represents for example the output from `historical_forecasts()` when using multiple series.
- `InvertibleDataTransformer` now supports parallelized inverse transformation for `series` being a list of lists of `TimeSeries` (`Sequence[Sequence[TimeSeries]]`). This `series` type represents for example the output from `historical_forecasts()` when using multiple series.

**Fixed**
- Fixed a bug in `quantile_loss`, where the loss was computed on all samples rather than only on the predicted quantiles. [#2284](https://github.com/unit8co/darts/pull/2284) by [Dennis Bader](https://github.com/dennisbader).
Expand Down
10 changes: 10 additions & 0 deletions darts/tests/test_timeseries_static_covariates.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,16 @@ def test_timeseries_from_longitudinal_df(self):
for ts in ts_groups7:
assert ts.static_covariates is None

ts_groups7_parallel = TimeSeries.from_group_dataframe(
df=self.df_long_multi,
group_cols=["st1", "st2"],
time_col="times",
value_cols=value_cols,
drop_group_cols=["st1", "st2"],
n_jobs=-1,
)
assert ts_groups7_parallel == ts_groups7

def test_from_group_dataframe_invalid_drop_cols(self):
# drop col is not part of `group_cols`
with pytest.raises(ValueError) as err:
Expand Down
57 changes: 34 additions & 23 deletions darts/timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
from darts.utils.utils import generate_index, n_steps_between

from .logging import get_logger, raise_if, raise_if_not, raise_log
from .utils import _build_tqdm_iterator, _parallel_apply

try:
from typing import Literal
Expand Down Expand Up @@ -759,6 +760,8 @@ def from_group_dataframe(
freq: Optional[Union[str, int]] = None,
fillna_value: Optional[float] = None,
drop_group_cols: Optional[Union[List[str], str]] = None,
n_jobs: Optional[int] = 1,
verbose: Optional[bool] = False,
) -> List[Self]:
"""
Build a list of TimeSeries instances grouped by a selection of columns from a DataFrame.
Expand Down Expand Up @@ -808,6 +811,11 @@ def from_group_dataframe(
Optionally, a numeric value to fill missing values (NaNs) with.
drop_group_cols
Optionally, a string or list of strings with `group_cols` column(s) to exclude from the static covariates.
n_jobs
Optionally, an integer representing the number of parallel jobs to run. Behavior is the same as in the
`joblib.Parallel` class.
verbose
Optionally, a boolean value indicating whether to display a progress bar.
Returns
-------
Expand Down Expand Up @@ -867,12 +875,18 @@ def from_group_dataframe(
df = df.drop(columns=time_col)
df = df.sort_index()

# split df by groups, and store group values and static values (static covariates)
# single elements group columns must be unpacked for same groupby() behavior across different pandas versions
splits = []
for static_cov_vals, group in df.groupby(
group_cols[0] if len(group_cols) == 1 else group_cols
):
groups = df.groupby(group_cols[0] if len(group_cols) == 1 else group_cols)

iterator = _build_tqdm_iterator(
groups,
verbose=verbose,
total=len(groups),
desc="Creating TimeSeries",
)

def from_group(static_cov_vals, group):
split = group[extract_value_cols]

static_cov_vals = (
(static_cov_vals,)
if not isinstance(static_cov_vals, tuple)
Expand Down Expand Up @@ -910,29 +924,26 @@ def from_group_dataframe(
)
# add the static covariates to the group values
static_cov_vals += tuple(group[static_cols].values[0])
# store static covariate Series and group DataFrame (without static cov columns)
splits.append(
(
(
pd.DataFrame([static_cov_vals], columns=extract_static_cov_cols)
if extract_static_cov_cols
else None
),
group[extract_value_cols],
)
)

# create a list with multiple TimeSeries and add static covariates
return [
cls.from_dataframe(
return cls.from_dataframe(
df=split,
fill_missing_dates=fill_missing_dates,
freq=freq,
fillna_value=fillna_value,
static_covariates=static_covs,
static_covariates=(
pd.DataFrame([static_cov_vals], columns=extract_static_cov_cols)
if extract_static_cov_cols
else None
),
)
for static_covs, split in splits
]

return _parallel_apply(
iterator,
from_group,
n_jobs,
fn_args=dict(),
fn_kwargs=dict(),
)

@classmethod
def from_series(
Expand Down

0 comments on commit e50854b

Please sign in to comment.