Skip to content

Commit

Permalink
#104 future_frame - parallel processing
Browse files Browse the repository at this point in the history
  • Loading branch information
mdancho84 committed Oct 11, 2023
1 parent aabc1ff commit a0f1ca1
Showing 1 changed file with 68 additions and 58 deletions.
126 changes: 68 additions & 58 deletions src/pytimetk/core/make_future_timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

from pytimetk.utils.parallel_helpers import conditional_tqdm

from concurrent.futures import ThreadPoolExecutor

@pf.register_series_method
def make_future_timeseries(
idx: Union[str, List[str], pd.Series, pd.DatetimeIndex],
Expand Down Expand Up @@ -154,6 +156,7 @@ def future_frame(
The `future_frame` function extends a given DataFrame or GroupBy object with future dates based on a specified length, optionally binding the original data.
Parameters
----------
data : pd.DataFrame or pd.core.groupby.generic.DataFrameGroupBy
Expand All @@ -176,7 +179,21 @@ def future_frame(
-------
pd.DataFrame
An extended DataFrame with future dates.
Notes
-----
## Performance
This function uses a number of techniques to speed up computation for large datasets with many time series groups:
- We vectorize where possible and use parallel processing to speed up computation.
- The `threads` parameter controls the number of threads to use for parallel processing.
- Set threads = -1 to use all available processors.
- Set threads = 1 to disable parallel processing.
See Also
--------
make_future_timeseries: Generate future dates for a time series.
Expand All @@ -189,10 +206,8 @@ def future_frame(
df = tk.load_dataset('m4_hourly', parse_dates = ['date'])
df
```
```{python}
# Extend the data for a single time series group by 12 hours
# Example 1 - Extend the data for a single time series group by 12 hours
extended_df = (
df
.query('id == "H10"')
Expand All @@ -206,23 +221,24 @@ def future_frame(
```
```{python}
# Extend the data for each group by 12 hours
# Example 2 - Extend the data for each group by 12 hours
extended_df = (
df
.groupby('id')
.groupby('id', sort = False) # Use sort = False to preserve the original order of the data
.future_frame(
date_column = 'date',
length_out = 12
length_out = 12,
threads = 2 # Use 2 threads for parallel processing
)
)
extended_df
```
```{python}
# Same as above, but just return the extended data with bind_data=False
# Example 3 - Same as above, but just return the extended data with bind_data=False
extended_df = (
df
.groupby('id')
.groupby('id', sort = False)
.future_frame(
date_column = 'date',
length_out = 12,
Expand All @@ -233,21 +249,25 @@ def future_frame(
```
```{python}
# Working with irregular dates: Business Days (Stocks Data)
# Example 4 - Working with irregular dates: Business Days (Stocks Data)
import pytimetk as tk
import pandas as pd
# Stock data
df = tk.load_dataset('stocks_daily', parse_dates = ['date'])
df
```
```{python}
# Allow irregular future dates (i.e. business days)
extended_df = (
df
.groupby('symbol')
.groupby('symbol', sort = False)
.future_frame(
date_column = 'date',
length_out = 12,
force_regular = False, # Allow irregular future dates (i.e. business days)
bind_data = True
bind_data = True,
threads = 1
)
)
extended_df
Expand All @@ -257,7 +277,7 @@ def future_frame(
# Force regular: Include Weekends
extended_df = (
df
.groupby('symbol')
.groupby('symbol', sort = False)
.future_frame(
date_column = 'date',
length_out = 12,
Expand All @@ -273,10 +293,7 @@ def future_frame(
check_dataframe_or_groupby(data)
check_date_column(data, date_column)

# DATAFRAME EXTENSION - If data is a Pandas DataFrame, extend with future dates

if isinstance(data, pd.DataFrame):

ts_series = data[date_column]

new_dates = make_future_timeseries(
Expand All @@ -294,42 +311,31 @@ def future_frame(

return extended_df


# GROUPED EXTENSION - If data is a GroupBy object, extend with future dates by group

elif isinstance(data, pd.core.groupby.generic.DataFrameGroupBy):

group_names = data.grouper.names

# If freq is None, infer the frequency from the first series in the data
if freq is None:

label_of_first_group = list(data.groups.keys())[0]
first_group = data.get_group(label_of_first_group)

freq = get_frequency(first_group[date_column].sort_values(), force_regular=force_regular)

# Use agg to get the last date of each group in a vectorized manner
last_dates_df = data.agg({date_column: 'max'}).reset_index()

future_dates_list = []

iterable = conditional_tqdm(last_dates_df.iterrows(), total=len(last_dates_df), display=show_progress)
chunk_size = int(len(last_dates_df) / threads)
subsets = [last_dates_df.iloc[i:i + chunk_size] for i in range(0, len(last_dates_df), chunk_size)]

for _, row in iterable:
future_dates = make_future_timeseries(
idx=pd.Series(row[date_column]),
length_out=length_out,
freq=freq,
force_regular=force_regular
)

future_dates_df = pd.DataFrame({date_column: future_dates})

for group_name in group_names:
future_dates_df[group_name] = row[group_name]

future_dates_list.append(future_dates_df)
future_dates_list = []
with ThreadPoolExecutor(max_workers=threads) as executor:
results = list(conditional_tqdm(executor.map(_process_future_frame_subset, subsets,
[date_column] * len(subsets),
[group_names] * len(subsets),
[length_out] * len(subsets),
[freq] * len(subsets),
[force_regular] * len(subsets)),
total=len(subsets), display=show_progress))
for future_dates_subset in results:
future_dates_list.extend(future_dates_subset)

future_dates_df = pd.concat(future_dates_list, axis=0).reset_index(drop=True)

Expand All @@ -338,27 +344,31 @@ def future_frame(
else:
extended_df = future_dates_df

# Sort
# extended_df = extended_df.sort_values(by=[*group_names, date_column]).reset_index(drop=True)

return extended_df


# Monkey patch the method to pandas groupby objects
pd.core.groupby.generic.DataFrameGroupBy.future_frame = future_frame

# UTILITIES ------------------------------------------------------------------
def _parallel_group_extension(group_data, date_column, length_out, freq, force_regular, group_names):
    """Build the future-date rows for a single group.

    Parameters
    ----------
    group_data : pd.DataFrame
        Rows belonging to one group; must contain `date_column` and every
        column listed in `group_names`.
    date_column : str
        Name of the column holding the dates.
    length_out : int
        Forwarded unchanged to `make_future_timeseries`.
    freq : str or None
        Forwarded unchanged to `make_future_timeseries`.
    force_regular : bool
        Forwarded unchanged to `make_future_timeseries`.
    group_names : iterable
        Column names identifying the group.

    Returns
    -------
    pd.DataFrame
        One column named `date_column` holding whatever
        `make_future_timeseries` returned, plus one column per entry in
        `group_names` filled with the value from the first row of
        `group_data`.
    """
    # Only the latest date of the group is used as the anchor for the
    # future sequence; it is wrapped in a one-element Series for the call.
    anchor = pd.Series(group_data[date_column].max())

    horizon = make_future_timeseries(
        idx=anchor,
        length_out=length_out,
        freq=freq,
        force_regular=force_regular,
    )

    extension = pd.DataFrame({date_column: horizon})

    # Label every future row with the group's identifying values, taken
    # from the first row of the group.
    for key in group_names:
        extension[key] = group_data[key].iloc[0]

    return extension
def _process_future_frame_subset(subset, date_column, group_names, length_out, freq, force_regular):
    """Create one future-dates DataFrame per row of `subset`.

    Parameters
    ----------
    subset : pd.DataFrame
        Rows to process; each row must provide `date_column` and every
        column listed in `group_names`.
    date_column : str
        Name of the column holding the date used as the starting point.
    group_names : iterable
        Column names whose row values are copied onto the generated frame.
    length_out : int
        Forwarded unchanged to `make_future_timeseries`.
    freq : str or None
        Forwarded unchanged to `make_future_timeseries`.
    force_regular : bool
        Forwarded unchanged to `make_future_timeseries`.

    Returns
    -------
    list of pd.DataFrame
        One frame per row of `subset`, in row order.
    """

    def _frame_for(record):
        # The row's date is wrapped in a one-element Series before being
        # handed to make_future_timeseries.
        horizon = make_future_timeseries(
            idx=pd.Series(record[date_column]),
            length_out=length_out,
            freq=freq,
            force_regular=force_regular,
        )

        frame = pd.DataFrame({date_column: horizon})

        # Copy the row's group identifiers onto every generated date.
        for key in group_names:
            frame[key] = record[key]

        return frame

    return [_frame_for(record) for _, record in subset.iterrows()]

0 comments on commit a0f1ca1

Please sign in to comment.