From e50854bdc203200cb6920f76869aaf84a73c49a4 Mon Sep 17 00:00:00 2001 From: Bohdan Bilonoh Date: Tue, 9 Apr 2024 12:01:04 +0300 Subject: [PATCH] add TimesSeries.from_group_dataframe parallel mode (#2292) * add TimesSeries.from_group_dataframe parallel mode * remove code mess * add doc string for new parameters * update CHANGELOG.md * add miss dtype * fix static covariates * make parallel function as local and fix tests * fix parallel utils imports * update changelog * Update CHANGELOG.md --------- Co-authored-by: Bohdan Bilonoh Co-authored-by: dennisbader --- CHANGELOG.md | 9 +-- .../test_timeseries_static_covariates.py | 10 ++++ darts/timeseries.py | 57 +++++++++++-------- 3 files changed, 49 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 258086b94e..dd0f9f2b58 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -78,16 +78,17 @@ but cannot always guarantee backwards compatibility. Changes that may **break co - `TimeSeries`: Residual `TimeSeries` for a single `series` and `historical_forecasts` generated with `last_points_only=True`. - `List[TimeSeries]` A list of residual `TimeSeries` for a sequence (list) of `series` with `last_points_only=True`. The residual list has length `len(series)`. - `List[List[TimeSeries]]` A list of lists of residual `TimeSeries` for a sequence of `series` with `last_points_only=False`. The outer residual list has length `len(series)`. The inner lists consist of the residuals from all possible series-specific historical forecasts. -- Improvements to `TimeSeries`: [#2284](https://github.com/unit8co/darts/pull/2284) by [Dennis Bader](https://github.com/dennisbader). - - Performance boost for methods: `slice_intersect()`, `has_same_time_as()` - - New method `slice_intersect_values()`, which returns the sliced values of a series, where the time index has been intersected with another series. 
+- Improvements to `TimeSeries`: + - `from_group_dataframe()` now supports parallelized creation from a grouped `pandas.DataFrame`. This can be enabled with parameter `n_jobs`. [#2292](https://github.com/unit8co/darts/pull/2292) by [Bohdan Bilonoh](https://github.com/BohdanBilonoh). + - Performance boost for methods: `slice_intersect()`, `has_same_time_as()`. [#2284](https://github.com/unit8co/darts/pull/2284) by [Dennis Bader](https://github.com/dennisbader). + - New method `slice_intersect_values()`, which returns the sliced values of a series, where the time index has been intersected with another series. [#2284](https://github.com/unit8co/darts/pull/2284) by [Dennis Bader](https://github.com/dennisbader). - 🔴 Moved utils functions to clearly separate Darts-specific from non-Darts-specific logic: [#2284](https://github.com/unit8co/darts/pull/2284) by [Dennis Bader](https://github.com/dennisbader). - Moved function `generate_index()` from `darts.utils.timeseries_generation` to `darts.utils.utils` - Moved functions `retain_period_common_to_all()`, `series2seq()`, `seq2series()`, `get_single_series()` from `darts.utils.utils` to `darts.utils.ts_utils`. - Improvements to `ForecastingModel`: [#2269](https://github.com/unit8co/darts/pull/2269) by [Felix Divo](https://github.com/felixdivo). - Renamed the private `_is_probabilistic` property to a public `supports_probabilistic_prediction`. - Improvements to `DataTransformer`: [#2267](https://github.com/unit8co/darts/pull/2267) by [Alicja Krzeminska-Sciga](https://github.com/alicjakrzeminska). - - `InvertibleDataTransformer` now supports parallelized inverse transformation for `series` being a list of lists of `TimeSeries` (`Sequence[Sequence[TimeSeries]]`). This `series` type represents for example the output from `historical_forecasts()` when using multiple series. 
+ - `InvertibleDataTransformer` now supports parallelized inverse transformation for `series` being a list of lists of `TimeSeries` (`Sequence[Sequence[TimeSeries]]`). This `series` type represents for example the output from `historical_forecasts()` when using multiple series. **Fixed** - Fixed a bug in `quantile_loss`, where the loss was computed on all samples rather than only on the predicted quantiles. [#2284](https://github.com/unit8co/darts/pull/2284) by [Dennis Bader](https://github.com/dennisbader). diff --git a/darts/tests/test_timeseries_static_covariates.py b/darts/tests/test_timeseries_static_covariates.py index 463c751305..2f923120d9 100644 --- a/darts/tests/test_timeseries_static_covariates.py +++ b/darts/tests/test_timeseries_static_covariates.py @@ -215,6 +215,16 @@ def test_timeseries_from_longitudinal_df(self): for ts in ts_groups7: assert ts.static_covariates is None + ts_groups7_parallel = TimeSeries.from_group_dataframe( + df=self.df_long_multi, + group_cols=["st1", "st2"], + time_col="times", + value_cols=value_cols, + drop_group_cols=["st1", "st2"], + n_jobs=-1, + ) + assert ts_groups7_parallel == ts_groups7 + def test_from_group_dataframe_invalid_drop_cols(self): # drop col is not part of `group_cols` with pytest.raises(ValueError) as err: diff --git a/darts/timeseries.py b/darts/timeseries.py index 124cc6ffe7..640ff3b7b0 100644 --- a/darts/timeseries.py +++ b/darts/timeseries.py @@ -53,6 +53,7 @@ from darts.utils.utils import generate_index, n_steps_between from .logging import get_logger, raise_if, raise_if_not, raise_log +from .utils import _build_tqdm_iterator, _parallel_apply try: from typing import Literal @@ -759,6 +760,8 @@ def from_group_dataframe( freq: Optional[Union[str, int]] = None, fillna_value: Optional[float] = None, drop_group_cols: Optional[Union[List[str], str]] = None, + n_jobs: Optional[int] = 1, + verbose: Optional[bool] = False, ) -> List[Self]: """ Build a list of TimeSeries instances grouped by a selection of 
columns from a DataFrame. @@ -808,6 +811,11 @@ def from_group_dataframe( Optionally, a numeric value to fill missing values (NaNs) with. drop_group_cols Optionally, a string or list of strings with `group_cols` column(s) to exclude from the static covariates. + n_jobs + Optionally, an integer representing the number of parallel jobs to run. Behavior is the same as in the + `joblib.Parallel` class. + verbose + Optionally, a boolean value indicating whether to display a progress bar. Returns ------- @@ -867,12 +875,18 @@ def from_group_dataframe( df = df.drop(columns=time_col) df = df.sort_index() - # split df by groups, and store group values and static values (static covariates) - # single elements group columns must be unpacked for same groupby() behavior across different pandas versions - splits = [] - for static_cov_vals, group in df.groupby( - group_cols[0] if len(group_cols) == 1 else group_cols - ): + groups = df.groupby(group_cols[0] if len(group_cols) == 1 else group_cols) + + iterator = _build_tqdm_iterator( + groups, + verbose=verbose, + total=len(groups), + desc="Creating TimeSeries", + ) + + def from_group(static_cov_vals, group): + split = group[extract_value_cols] + static_cov_vals = ( (static_cov_vals,) if not isinstance(static_cov_vals, tuple) @@ -910,29 +924,26 @@ def from_group_dataframe( ) # add the static covariates to the group values static_cov_vals += tuple(group[static_cols].values[0]) - # store static covariate Series and group DataFrame (without static cov columns) - splits.append( - ( - ( - pd.DataFrame([static_cov_vals], columns=extract_static_cov_cols) - if extract_static_cov_cols - else None - ), - group[extract_value_cols], - ) - ) - # create a list with multiple TimeSeries and add static covariates - return [ - cls.from_dataframe( + return cls.from_dataframe( df=split, fill_missing_dates=fill_missing_dates, freq=freq, fillna_value=fillna_value, - static_covariates=static_covs, + static_covariates=( + pd.DataFrame([static_cov_vals], 
columns=extract_static_cov_cols) + if extract_static_cov_cols + else None + ), ) - for static_covs, split in splits - ] + + return _parallel_apply( + iterator, + from_group, + n_jobs, + fn_args=dict(), + fn_kwargs=dict(), + ) @classmethod def from_series(