From d69694493ca2c0629c659031f7f005bdff3f4c28 Mon Sep 17 00:00:00 2001 From: amr Date: Wed, 4 Oct 2023 23:06:29 -0700 Subject: [PATCH 1/2] Overhaul augment_rolling in rolling.py to streamline window function handling --- src/pytimetk/core/rolling.py | 277 ++++++++++++++++++++++++----------- 1 file changed, 190 insertions(+), 87 deletions(-) diff --git a/src/pytimetk/core/rolling.py b/src/pytimetk/core/rolling.py index 45982a8d..ba2c3769 100644 --- a/src/pytimetk/core/rolling.py +++ b/src/pytimetk/core/rolling.py @@ -11,9 +11,9 @@ def augment_rolling( data: Union[pd.DataFrame, pd.core.groupby.generic.DataFrameGroupBy], date_column: str, value_column: Union[str, list], - use_independent_variables: bool = False, window: Union[int, tuple, list] = 2, window_func: Union[str, list, Tuple[str, Callable]] = 'mean', + window_func_with_iv: Union[str, list, Tuple[str, Callable]] = [], min_periods: Optional[int] = None, center: bool = False, **kwargs, @@ -23,32 +23,40 @@ def augment_rolling( Parameters ---------- data : Union[pd.DataFrame, pd.core.groupby.generic.DataFrameGroupBy] - The `data` parameter is the input DataFrame or GroupBy object that contains the data to be processed. It can be either a Pandas DataFrame or a GroupBy object. + Input data to be processed. Can be a Pandas DataFrame or a GroupBy object. date_column : str - The `date_column` parameter is the name of the datetime column in the DataFrame by which the data should be sorted within each group. + Name of the datetime column. Data is sorted by this column within each group. value_column : Union[str, list] - The `value_column` parameter is the name of the column(s) in the DataFrame to which the rolling window function(s) should be applied. It can be a single column name or a list of column names. 
- use_independent_variables : bool - The `use_independent_variables` parameter is an optional parameter that specifies whether the rolling function(s) require independent variables, such as rolling correlation or rolling regression. (See Examples below.) + Column(s) to which the rolling window functions should be applied. Can be a single column name or a list. window : Union[int, tuple, list], optional - The `window` parameter in the `augment_rolling` function is used to specify the size of the rolling windows. It can be either an integer or a list of integers. - - - If it is an integer, the same window size will be applied to all columns specified in the `value_column`. - - - If it is a tuple, it will generate windows from the first to the second value (inclusive). - - - If it is a list of integers, each integer in the list will be used as the window size for the corresponding column in the `value_column` list. + Specifies the size of the rolling windows. + - An integer applies the same window size to all columns in `value_column`. + - A tuple generates windows from the first to the second value (inclusive). + - A list of integers designates multiple window sizes for each respective column. window_func : Union[str, list, Tuple[str, Callable]], optional - The `window_func` parameter in the `augment_rolling` function is used to specify the function(s) to be applied to the rolling windows. - - 1. It can be a string or a list of strings, where each string represents the name of the function to be applied. - - 2. Alternatively, it can be a list of tuples, where each tuple contains the name of the function to be applied and the function itself. The function is applied as a Pandas Series. (See Examples below.) - - 3. If the function requires independent variables, the `use_independent_variables` parameter must be specified. The independent variables will be passed to the function as a DataFrame containing the window of rows. (See Examples below.) 
- + The `window_func` parameter in the `augment_rolling` function specifies the function(s) to apply to the rolling windows. + + 1. If your function operates solely on a single value column: + - It can be a string representing the name of a standard function (e.g., 'mean', 'sum'). + - For custom functions, provide a list of tuples, where each tuple consists of a custom name and the function itself. The function should accept a Pandas Series as input. (See Examples below.) + + 2. If your function requires independent variables (i.e., it uses more than just the single value column), you should use 'window_func_with_iv' instead. Such functions should expect a DataFrame input, representing the current window of rows. (Refer to the Examples section below.) + + window_func_with_iv : Union[str, list, Tuple[str, Callable]], optional + The `window_func_with_iv` parameter in the `augment_rolling` function specifies function(s) requiring independent variables for the rolling windows. + + 1. It can be either: + - A string representing the name of a predefined function. + - A list of strings, each specifying a function name. + - A list of tuples, where each tuple contains a custom function name and the function itself. + + 2. Functions specified under `window_func_with_iv` take a Pandas DataFrame, representing the current rolling window of rows, as input. They operate on more than just one value column, utilizing multiple columns or contextual data from the entire window. + + 3. If your function processes only a single value column and doesn't need other columns as context, consider using 'window_func' instead. (Refer to the Examples section below.) + min_periods : int, optional, default None + Minimum observations in the window to have a value. Defaults to the window size. If set, a value will be produced even if fewer observations are present than the window size. 
center : bool, optional - The `center` parameter in the `augment_rolling` function determines whether the rolling window is centered or not. If `center` is set to `True`, the rolling window will be centered, meaning that the value at the center of the window will be used as the result. If ` + If `True`, the rolling window will be centered on the current value. For even-sized windows, the window will be left-biased. Otherwise, it uses a trailing window. Returns ------- @@ -66,40 +74,53 @@ def augment_rolling( ``` ```{python} - # String Function Name and Series Lambda Function (no independent variables) - # window = [2,7] yields only 2 and 7 + # This example demonstrates the use of both string-named functions + # and lambda functions on a rolling window with no independent variables. + # We specify a list of window sizes: [2,7]. + # As a result, the output will have computations for both window sizes 2 and 7. + rolled_df = ( df .groupby('id') .augment_rolling( date_column = 'date', value_column = 'value', - window = [2,7], - window_func = ['mean', ('std', lambda x: x.std())] + window = [2,7], # Specifying multiple window sizes + window_func = [ + 'mean', # Built-in mean function + ('std', lambda x: x.std()) # Lambda function to compute standard deviation + ] ) ) - rolled_df + display(rolled_df) ``` ```{python} - # String Function Name and Series Lambda Function (no independent variables) - # window = (1,3) yields 1, 2, and 3 + # Example showcasing the use of string function names and lambda functions + # applied on rolling windows. In this case, no independent variables are required. + # The `window` tuple (1,3) will generate window sizes of 1, 2, and 3. 
+ rolled_df = ( df .groupby('id') .augment_rolling( date_column = 'date', value_column = 'value', - window = (1,3), - window_func = ['mean', ('std', lambda x: x.std())] + window = (1,3), # Specifying a range of window sizes + window_func = [ + 'mean', # Using built-in mean function + ('std', lambda x: x.std()) # Lambda function for standard deviation + ] ) ) - rolled_df + display(rolled_df) ``` ```{python} - # Rolling Correlation: Uses independent variables (value2) - + # Example showcasing the rolling correlation between two columns (`value1` and `value2`). + # The correlation requires both columns as input (i.e., independent variables). + + # Sample DataFrame with id, date, value1, and value2 columns. df = pd.DataFrame({ 'id': [1, 1, 1, 2, 2, 2], 'date': pd.to_datetime(['2023-01-01', '2023-01-02', '2023-01-03', '2023-01-04', '2023-01-05', '2023-01-06']), @@ -107,27 +128,31 @@ def augment_rolling( 'value2': [2, 16, 20, 40, 41, 50], }) - result_df = ( + # Compute the rolling correlation for each group of 'id' + # Using a rolling window of size 3 and a lambda function to calculate the correlation. + rolling_df = ( df.groupby('id') .augment_rolling( date_column='date', value_column='value1', - use_independent_variables=True, window=3, - window_func=[('corr', lambda df: df['value1'].corr(df['value2']))], - center = False + window_func=[], # No standard window functions specified + window_func_with_iv=[('corr', lambda x: x['value1'].corr(x['value2']))], # Lambda function for correlation + center = False # Not centering the rolling window ) ) - result_df + display(rolling_df) ``` ```{python} - # Rolling Regression: Using independent variables (value2 and value3) - - # Requires: scikit-learn + # Rolling Regression Example: Using independent variables (`value2` and `value3`) + # This example demonstrates how to perform a rolling regression using two independent variables. + + # Required module (scikit-learn) for regression. 
from sklearn.linear_model import LinearRegression + # Sample DataFrame with `id`, `date`, `value1`, `value2`, and `value3` columns. df = pd.DataFrame({ 'id': [1, 1, 1, 2, 2, 2], 'date': pd.to_datetime(['2023-01-01', '2023-01-02', '2023-01-03', '2023-01-04', '2023-01-05', '2023-01-06']), @@ -136,64 +161,142 @@ def augment_rolling( 'value3': [2, 3, 6, 9, 10, 13] }) - # Define Regression Function + # Define Regression Function to be applied on the rolling window. def regression(df): model = LinearRegression() - X = df[['value2', 'value3']] # Extract X values (independent variables) - y = df['value1'] # Extract y values (dependent variable) + X = df[['value2', 'value3']] # Independent variables + y = df['value1'] # Dependent variable model.fit(X, y) ret = pd.Series([model.intercept_, model.coef_[0]], index=['Intercept', 'Slope']) return ret # Return intercept and slope as a Series - # Example to call the function + # Compute the rolling regression for each group of `id` + # Using a rolling window of size 3 and the regression function. result_df = ( df.groupby('id') .augment_rolling( date_column='date', value_column='value1', - use_independent_variables=True, window=3, - window_func=[('regression', regression)] + window_func=[], + window_func_with_iv=[('regression', regression)] ) .dropna() ) - # Display Results in Wide Format since returning multiple values - regression_wide_df = pd.concat(result_df['value1_rolling_regression_win_3'].to_list(), axis=1).T - + # Format the results to have each regression output (slope and intercept) in separate columns. + regression_wide_df = pd.concat(result_df['rolling_regression_win_3'].to_list(), axis=1).T regression_wide_df = pd.concat([result_df.reset_index(drop = True), regression_wide_df], axis=1) + display(regression_wide_df) + + ``` + + ```{python} + # This example is a showcase of the diverse functionalities available through the `augment_rolling` function. 
+ # Key Highlights: + # - Use of built-in Pandas rolling window functions: mean and std. + # - Incorporation of custom-defined functions for more specific tasks: sample and population standard deviations. + # - Advanced rolling operations requiring independent variables, represented by correlation and regression. + # - Handling of multiple value columns, capturing broader data dynamics. + + # Required module (scikit-learn) for regression. + from sklearn.linear_model import LinearRegression + + # Sample DataFrame with `id`, `date`, `value1`, `value2`, and `value3` columns. + df = pd.DataFrame({ + 'id': [1, 1, 1, 2, 2, 2], + 'date': pd.to_datetime(['2023-01-01', '2023-01-02', '2023-01-03', '2023-01-04', '2023-01-05', '2023-01-06']), + 'value1': [10, 20, 29, 42, 53, 59], + 'value2': [5, 16, 24, 35, 45, 58], + 'value3': [2, 3, 6, 9, 10, 13] + }) + + # Define Regression Function to be applied on the rolling window. + def regression(df): + + model = LinearRegression() + X = df[['value2', 'value3']] # Independent variables + y = df['value1'] # Dependent variable + model.fit(X, y) + ret = pd.Series([model.intercept_, model.coef_[0]], index=['Intercept', 'Slope']) + + return ret # Return intercept and slope as a Series - regression_wide_df + # Define a function to calculate the sample standard deviation. + def sample_std(data): + n = len(data) + if n < 2: + return float('nan') + mean = sum(data) / n + variance = sum((x - mean) ** 2 for x in data) / (n - 1) + return variance ** 0.5 + + # Define a function to calculate the population standard deviation. + def population_std(data): + n = len(data) + if n == 0: + return float('nan') + mean = sum(data) / n + variance = sum((x - mean) ** 2 for x in data) / n + return variance ** 0.5 + + # Use the `augment_rolling` function to compute various rolling window metrics: + # mean, standard deviation, sample std, population std, correlation, and regression. 
+ rolled_df = ( + df.augment_rolling( + date_column = 'date', + value_column = ['value1', 'value2', 'value3'], + window=[2, 4], + window_func = [ + 'mean', + 'std', + ('sample_std', lambda x: sample_std(x)), + ('pop_std', lambda x: population_std(x)) + ], + window_func_with_iv = [ + ('corr', lambda x: x['value1'].corr(x['value2'])), + ('regression', regression) + ], + min_periods=1, + center=True + ) +) + rolled_df ``` ''' - # Common checks + # Ensure data is a DataFrame or a GroupBy object check_dataframe_or_groupby(data) + + # Ensure date column exists and is properly formatted check_date_column(data, date_column) + + # Ensure value column(s) exist check_value_column(data, value_column) - # Specific checks + # Convert string value column to list for consistency if isinstance(value_column, str): value_column = [value_column] + # Validate window argument and convert it to a consistent list format if not isinstance(window, (int, tuple, list)): raise TypeError("`window` must be an integer, tuple, or list.") - if isinstance(window, int): window = [window] elif isinstance(window, tuple): window = list(range(window[0], window[1] + 1)) - + + # Convert single window function to list for consistent processing if isinstance(window_func, (str, tuple)): window_func = [window_func] - # Copy Data + # Create a fresh copy of the data, leaving the original untouched data_copy = data.copy() if isinstance(data, pd.DataFrame) else data.obj.copy() - # Format Data + # Group data if it's a GroupBy object; otherwise, prepare it for the rolling calculations if isinstance(data, pd.core.groupby.generic.DataFrameGroupBy): group_names = data.grouper.names grouped = data_copy.sort_values(by=[*group_names, date_column]).groupby(group_names) @@ -202,8 +305,8 @@ def regression(df): grouped = [([], data_copy.sort_values(by=[date_column]))] - # Rolling Apply Function when independent variables are required - def rolling_apply_2(func, df, window_size, min_periods, center): + # Helper function to 
apply rolling calculations that require independent variables + def rolling_apply(func, df, window_size, min_periods, center): results = [np.nan] * len(df) adjusted_window = window_size // 2 if center else window_size - 1 # determine the offset for centering @@ -212,7 +315,7 @@ def rolling_apply_2(func, df, window_size, min_periods, center): if window_size % 2 == 0: # left biased window if window size is even start = max(0, center_point - adjusted_window) end = min(len(df), center_point + adjusted_window) - else: + else: start = max(0, center_point - adjusted_window) end = min(len(df), center_point + adjusted_window + 1) else: @@ -224,49 +327,49 @@ def rolling_apply_2(func, df, window_size, min_periods, center): if len(window_df) >= min_periods: results[center_point if center else end - 1] = func(window_df) - return pd.DataFrame(results, columns=['result'], index=df.index) - + return pd.DataFrame({'result': results}, index=df.index) - # Apply Rolling Functions + # Apply rolling window functions result_dfs = [] for _, group_df in grouped: + + # Apply the basic window functions for value_col in value_column: for window_size in window: - - if min_periods is None: - min_periods = window_size - + # Set min_periods to window_size if not specified + min_periods = window_size if min_periods is None else min_periods + for func in window_func: if isinstance(func, tuple): func_name, func = func new_column_name = f"{value_col}_rolling_{func_name}_win_{window_size}" - - if use_independent_variables: - group_df[new_column_name] = rolling_apply_2(func, group_df, window_size, min_periods=min_periods, center=center) - else: - try: - group_df[new_column_name] = group_df[value_col].rolling(window=window_size, min_periods=min_periods, center=center, **kwargs).apply(func, raw=True) - except Exception as e: - try: # try independent variables incase user mistakenly did not set to True - group_df[new_column_name] = rolling_apply_2(func, group_df, window_size, min_periods=min_periods, 
center=center) - except: - raise e - + group_df[new_column_name] = group_df[value_col].rolling(window=window_size, min_periods=min_periods, center=center, **kwargs).apply(func, raw=True) + elif isinstance(func, str): new_column_name = f"{value_col}_rolling_{func}_win_{window_size}" - - rolling_method = getattr(group_df[value_col].rolling(window=window_size, min_periods=min_periods, center=center, **kwargs), func, None) - - if rolling_method: - group_df[new_column_name] = rolling_method() + # Get the rolling function (like mean, sum, etc.) specified by `func` for the given column and window settings + rolling_function = getattr(group_df[value_col].rolling(window=window_size, min_periods=min_periods, center=center, **kwargs), func, None) + # Apply rolling function to data and store in new column + if rolling_function: + group_df[new_column_name] = rolling_function() else: raise ValueError(f"Invalid function name: {func}") - else: raise TypeError(f"Invalid function type: {type(func)}") + # Apply the functions that require independent variables + for window_size in window: + for func in window_func_with_iv: + if isinstance(func, tuple): + func_name, func = func + new_column_name = f"rolling_{func_name}_win_{window_size}" + group_df[new_column_name] = rolling_apply(func, group_df, window_size, min_periods=min_periods, center=center) + else: + raise TypeError(f"Expected 'tuple', but got invalid function type: {type(func)}") + result_dfs.append(group_df) - + + # Combine processed dataframes and sort by index result_df = pd.concat(result_dfs).sort_index() # Sort by the original index return result_df From 652aad3cc1bda48f230346d01abb525cb7d1d707 Mon Sep 17 00:00:00 2001 From: amr Date: Fri, 6 Oct 2023 11:01:16 -0700 Subject: [PATCH 2/2] Refactor: Split into augment_rolling and augment_rolling_apply --- src/pytimetk/core/rolling.py | 312 ++++++++++++++++++----------------- 1 file changed, 157 insertions(+), 155 deletions(-) diff --git a/src/pytimetk/core/rolling.py 
b/src/pytimetk/core/rolling.py index ba2c3769..189f92b8 100644 --- a/src/pytimetk/core/rolling.py +++ b/src/pytimetk/core/rolling.py @@ -10,10 +10,9 @@ def augment_rolling( data: Union[pd.DataFrame, pd.core.groupby.generic.DataFrameGroupBy], date_column: str, - value_column: Union[str, list], - window: Union[int, tuple, list] = 2, + value_column: Union[str, list], window_func: Union[str, list, Tuple[str, Callable]] = 'mean', - window_func_with_iv: Union[str, list, Tuple[str, Callable]] = [], + window: Union[int, tuple, list] = 2, min_periods: Optional[int] = None, center: bool = False, **kwargs, @@ -28,34 +27,28 @@ def augment_rolling( Name of the datetime column. Data is sorted by this column within each group. value_column : Union[str, list] Column(s) to which the rolling window functions should be applied. Can be a single column name or a list. - window : Union[int, tuple, list], optional + window_func : Union[str, list, Tuple[str, Callable]], optional, default 'mean' + The `window_func` parameter in the `augment_rolling` function specifies the function(s) to be applied to the rolling windows of the value column(s). + + 1. It can be either: + - A string representing the name of a standard function (e.g., 'mean', 'sum'). + + 2. For custom functions: + - Provide a list of tuples. Each tuple should contain a custom name for the function and the function itself. + - Each custom function should accept a Pandas Series as its input and operate on that series. + Example: ("range", lambda x: x.max() - x.min()) + + (See more Examples below.) + + Note: If your function needs to operate on multiple columns (i.e., it requires access to a DataFrame rather than just a Series), consider using the `augment_rolling_apply` function in this library. + window : Union[int, tuple, list], optional, default 2 Specifies the size of the rolling windows. - An integer applies the same window size to all columns in `value_column`. 
- A tuple generates windows from the first to the second value (inclusive). - A list of integers designates multiple window sizes for each respective column. - window_func : Union[str, list, Tuple[str, Callable]], optional - The `window_func` parameter in the `augment_rolling` function specifies the function(s) to apply to the rolling windows. - - 1. If your function operates solely on a single value column: - - It can be a string representing the name of a standard function (e.g., 'mean', 'sum'). - - For custom functions, provide a list of tuples, where each tuple consists of a custom name and the function itself. The function should accept a Pandas Series as input. (See Examples below.) - - 2. If your function requires independent variables (i.e., it uses more than just the single value column), you should use 'window_func_with_iv' instead. Such functions should expect a DataFrame input, representing the current window of rows. (Refer to the Examples section below.) - - window_func_with_iv : Union[str, list, Tuple[str, Callable]], optional - The `window_func_with_iv` parameter in the `augment_rolling` function specifies function(s) requiring independent variables for the rolling windows. - - 1. It can be either: - - A string representing the name of a predefined function. - - A list of strings, each specifying a function name. - - A list of tuples, where each tuple contains a custom function name and the function itself. - - 2. Functions specified under `window_func_with_iv` take a Pandas DataFrame, representing the current rolling window of rows, as input. They operate on more than just one value column, utilizing multiple columns or contextual data from the entire window. - - 3. If your function processes only a single value column and doesn't need other columns as context, consider using 'window_func' instead. (Refer to the Examples section below.) min_periods : int, optional, default None Minimum observations in the window to have a value. 
Defaults to the window size. If set, a value will be produced even if fewer observations are present than the window size. - center : bool, optional + center : bool, optional, default False If `True`, the rolling window will be centered on the current value. For even-sized windows, the window will be left-biased. Otherwise, it uses a trailing window. Returns @@ -75,7 +68,7 @@ def augment_rolling( ```{python} # This example demonstrates the use of both string-named functions - # and lambda functions on a rolling window with no independent variables. + # and lambda functions on a rolling window. # We specify a list of window sizes: [2,7]. # As a result, the output will have computations for both window sizes 2 and 7. @@ -97,7 +90,7 @@ def augment_rolling( ```{python} # Example showcasing the use of string function names and lambda functions - # applied on rolling windows. In this case, no independent variables are required. + # applied on rolling windows. # The `window` tuple (1,3) will generate window sizes of 1, 2, and 3. 
rolled_df = ( @@ -114,12 +107,132 @@ def augment_rolling( ) ) display(rolled_df) + ``` + ''' + # Ensure data is a DataFrame or a GroupBy object + check_dataframe_or_groupby(data) + + # Ensure date column exists and is properly formatted + check_date_column(data, date_column) + + # Ensure value column(s) exist + check_value_column(data, value_column) + + # Convert string value column to list for consistency + if isinstance(value_column, str): + value_column = [value_column] + + # Validate window argument and convert it to a consistent list format + if not isinstance(window, (int, tuple, list)): + raise TypeError("`window` must be an integer, tuple, or list.") + if isinstance(window, int): + window = [window] + elif isinstance(window, tuple): + window = list(range(window[0], window[1] + 1)) + + # Convert single window function to list for consistent processing + if isinstance(window_func, (str, tuple)): + window_func = [window_func] + + # Create a fresh copy of the data, leaving the original untouched + data_copy = data.copy() if isinstance(data, pd.DataFrame) else data.obj.copy() + + # Group data if it's a GroupBy object; otherwise, prepare it for the rolling calculations + if isinstance(data, pd.core.groupby.generic.DataFrameGroupBy): + group_names = data.grouper.names + grouped = data_copy.sort_values(by=[*group_names, date_column]).groupby(group_names) + else: + group_names = None + grouped = [([], data_copy.sort_values(by=[date_column]))] + + # Apply Series-based rolling window functions + result_dfs = [] + for _, group_df in grouped: + for value_col in value_column: + for window_size in window: + min_periods = window_size if min_periods is None else min_periods + for func in window_func: + if isinstance(func, tuple): + func_name, func = func + new_column_name = f"{value_col}_rolling_{func_name}_win_{window_size}" + group_df[new_column_name] = group_df[value_col].rolling(window=window_size, min_periods=min_periods, center=center, **kwargs).apply(func, raw=True) 
+ + elif isinstance(func, str): + new_column_name = f"{value_col}_rolling_{func}_win_{window_size}" + # Get the rolling function (like mean, sum, etc.) specified by `func` for the given column and window settings + rolling_function = getattr(group_df[value_col].rolling(window=window_size, min_periods=min_periods, center=center, **kwargs), func, None) + # Apply rolling function to data and store in new column + if rolling_function: + group_df[new_column_name] = rolling_function() + else: + raise ValueError(f"Invalid function name: {func}") + else: + raise TypeError(f"Invalid function type: {type(func)}") + + result_dfs.append(group_df) + + # Combine processed dataframes and sort by index + result_df = pd.concat(result_dfs).sort_index() # Sort by the original index + + return result_df + +# Monkey patch the method to pandas groupby objects +pd.core.groupby.generic.DataFrameGroupBy.augment_rolling = augment_rolling + + +@pf.register_dataframe_method +def augment_rolling_apply( + data: Union[pd.DataFrame, pd.core.groupby.generic.DataFrameGroupBy], + date_column: str, + window_func: Union[Tuple[str, Callable], List[Tuple[str, Callable]]], + window: Union[int, tuple, list] = 2, + min_periods: Optional[int] = None, + center: bool = False, +) -> pd.DataFrame: + '''Apply one or more rolling functions and window sizes to one or more columns of a DataFrame. + + Parameters + ---------- + data : Union[pd.DataFrame, pd.core.groupby.generic.DataFrameGroupBy] + Input data to be processed. Can be a Pandas DataFrame or a GroupBy object. + date_column : str + Name of the datetime column. Data is sorted by this column within each group. + window_func : Union[Tuple[str, Callable], List[Tuple[str, Callable]]] + The `window_func` parameter in the `augment_rolling_apply` function specifies the function(s) that operate on a rolling window with the consideration of multiple columns. 
+ + The specification can be: + - A tuple where the first element is a string representing the function's name and the second element is the callable function itself. + - A list of such tuples for multiple functions. + + Note: For functions targeting only a single value column without the need for contextual data from other columns, consider using the `augment_rolling` function in this library. + window : Union[int, tuple, list], optional + Specifies the size of the rolling windows. + - An integer applies the same window size to all columns in `value_column`. + - A tuple generates windows from the first to the second value (inclusive). + - A list of integers designates multiple window sizes for each respective column. + min_periods : int, optional, default None + Minimum observations in the window to have a value. Defaults to the window size. If set, a value will be produced even if fewer observations are present than the window size. + center : bool, optional + If `True`, the rolling window will be centered on the current value. For even-sized windows, the window will be left-biased. Otherwise, it uses a trailing window. + + Returns + ------- + pd.DataFrame + The `augment_rolling_apply` function returns a DataFrame with new columns for each applied function and window size. + + Examples + -------- + ```{python} + import pytimetk as tk + import pandas as pd + import numpy as np + ``` ```{python} # Example showcasing the rolling correlation between two columns (`value1` and `value2`). - # The correlation requires both columns as input (i.e., independent variables). - + # The correlation requires both columns as input. + # Sample DataFrame with id, date, value1, and value2 columns. df = pd.DataFrame({ 'id': [1, 1, 1, 2, 2, 2], @@ -132,12 +245,10 @@ def augment_rolling( # Using a rolling window of size 3 and a lambda function to calculate the correlation. 
rolling_df = ( df.groupby('id') - .augment_rolling( + .augment_rolling_apply( date_column='date', - value_column='value1', window=3, - window_func=[], # No standard window functions specified - window_func_with_iv=[('corr', lambda x: x['value1'].corr(x['value2']))], # Lambda function for correlation + window_func=[('corr', lambda x: x['value1'].corr(x['value2']))], # Lambda function for correlation center = False # Not centering the rolling window ) ) @@ -146,7 +257,7 @@ def augment_rolling( ``` ```{python} - # Rolling Regression Example: Using independent variables (`value2` and `value3`) + # Rolling Regression Example: Using `value1` as the dependent variable and `value2` and `value3` as the independent variables. # This example demonstrates how to perform a rolling regression using two independent variables. # Required module (scikit-learn) for regression. @@ -172,17 +283,14 @@ def regression(df): return ret # Return intercept and slope as a Series - # Compute the rolling regression for each group of `id` # Using a rolling window of size 3 and the regression function. result_df = ( df.groupby('id') - .augment_rolling( + .augment_rolling_apply( date_column='date', - value_column='value1', window=3, - window_func=[], - window_func_with_iv=[('regression', regression)] + window_func=[('regression', regression)] ) .dropna() ) @@ -192,95 +300,14 @@ def regression(df): regression_wide_df = pd.concat([result_df.reset_index(drop = True), regression_wide_df], axis=1) display(regression_wide_df) - ``` - - ```{python} - # This example is a showcase of the diverse functionalities available through the `augment_rolling` function. - # Key Highlights: - # - Use of built-in Pandas rolling window functions: mean and std. - # - Incorporation of custom-defined functions for more specific tasks: sample and population standard deviations. - # - Advanced rolling operations requiring independent variables, represented by correlation and regression. 
- # - Handling of multiple value columns, capturing broader data dynamics. - - # Required module (scikit-learn) for regression. - from sklearn.linear_model import LinearRegression - - # Sample DataFrame with `id`, `date`, `value1`, `value2`, and `value3` columns. - df = pd.DataFrame({ - 'id': [1, 1, 1, 2, 2, 2], - 'date': pd.to_datetime(['2023-01-01', '2023-01-02', '2023-01-03', '2023-01-04', '2023-01-05', '2023-01-06']), - 'value1': [10, 20, 29, 42, 53, 59], - 'value2': [5, 16, 24, 35, 45, 58], - 'value3': [2, 3, 6, 9, 10, 13] - }) - - # Define Regression Function to be applied on the rolling window. - def regression(df): - - model = LinearRegression() - X = df[['value2', 'value3']] # Independent variables - y = df['value1'] # Dependent variable - model.fit(X, y) - ret = pd.Series([model.intercept_, model.coef_[0]], index=['Intercept', 'Slope']) - - return ret # Return intercept and slope as a Series - - # Define a function to calculate the sample standard deviation. - def sample_std(data): - n = len(data) - if n < 2: - return float('nan') - mean = sum(data) / n - variance = sum((x - mean) ** 2 for x in data) / (n - 1) - return variance ** 0.5 - - # Define a function to calculate the population standard deviation. - def population_std(data): - n = len(data) - if n == 0: - return float('nan') - mean = sum(data) / n - variance = sum((x - mean) ** 2 for x in data) / n - return variance ** 0.5 - - # Use the `augment_rolling` function to compute various rolling window metrics: - # mean, standard deviation, sample std, population std, correlation, and regression. 
- rolled_df = ( - df.augment_rolling( - date_column = 'date', - value_column = ['value1', 'value2', 'value3'], - window=[2, 4], - window_func = [ - 'mean', - 'std', - ('sample_std', lambda x: sample_std(x)), - ('pop_std', lambda x: population_std(x)) - ], - window_func_with_iv = [ - ('corr', lambda x: x['value1'].corr(x['value2'])), - ('regression', regression) - ], - min_periods=1, - center=True - ) -) - rolled_df ``` ''' - # Ensure data is a DataFrame or a GroupBy object check_dataframe_or_groupby(data) # Ensure date column exists and is properly formatted check_date_column(data, date_column) - # Ensure value column(s) exist - check_value_column(data, value_column) - - # Convert string value column to list for consistency - if isinstance(value_column, str): - value_column = [value_column] - # Validate window argument and convert it to a consistent list format if not isinstance(window, (int, tuple, list)): raise TypeError("`window` must be an integer, tuple, or list.") @@ -304,20 +331,20 @@ def population_std(data): group_names = None grouped = [([], data_copy.sort_values(by=[date_column]))] - - # Helper function to apply rolling calculations that require independent variables + # Helper function to apply rolling calculations on a dataframe def rolling_apply(func, df, window_size, min_periods, center): - results = [np.nan] * len(df) + num_rows = len(df) + results = [np.nan] * num_rows adjusted_window = window_size // 2 if center else window_size - 1 # determine the offset for centering - for center_point in range(len(df)): + for center_point in range(num_rows): if center: if window_size % 2 == 0: # left biased window if window size is even start = max(0, center_point - adjusted_window) - end = min(len(df), center_point + adjusted_window) + end = min(num_rows, center_point + adjusted_window) else: start = max(0, center_point - adjusted_window) - end = min(len(df), center_point + adjusted_window + 1) + end = min(num_rows, center_point + adjusted_window + 1) else: 
start = max(0, center_point - adjusted_window) end = center_point + 1 @@ -329,37 +356,12 @@ def rolling_apply(func, df, window_size, min_periods, center): return pd.DataFrame({'result': results}, index=df.index) - # Apply rolling window functions + # Apply DataFrame-based rolling window functions result_dfs = [] for _, group_df in grouped: - - # Apply the basic window functions - for value_col in value_column: - for window_size in window: - # Set min_periods to window_size if not specified - min_periods = window_size if min_periods is None else min_periods - - for func in window_func: - if isinstance(func, tuple): - func_name, func = func - new_column_name = f"{value_col}_rolling_{func_name}_win_{window_size}" - group_df[new_column_name] = group_df[value_col].rolling(window=window_size, min_periods=min_periods, center=center, **kwargs).apply(func, raw=True) - - elif isinstance(func, str): - new_column_name = f"{value_col}_rolling_{func}_win_{window_size}" - # Get the rolling function (like mean, sum, etc.) 
specified by `func` for the given column and window settings - rolling_function = getattr(group_df[value_col].rolling(window=window_size, min_periods=min_periods, center=center, **kwargs), func, None) - # Apply rolling function to data and store in new column - if rolling_function: - group_df[new_column_name] = rolling_function() - else: - raise ValueError(f"Invalid function name: {func}") - else: - raise TypeError(f"Invalid function type: {type(func)}") - - # Apply the functions that require independent variables for window_size in window: - for func in window_func_with_iv: + min_periods = window_size if min_periods is None else min_periods + for func in window_func: if isinstance(func, tuple): func_name, func = func new_column_name = f"rolling_{func_name}_win_{window_size}" @@ -375,4 +377,4 @@ def rolling_apply(func, df, window_size, min_periods, center): return result_df # Monkey patch the method to pandas groupby objects -pd.core.groupby.generic.DataFrameGroupBy.augment_rolling = augment_rolling +pd.core.groupby.generic.DataFrameGroupBy.augment_rolling_apply = augment_rolling_apply \ No newline at end of file