From 2c1f1253029817dbc46d47818f20516cf069f41a Mon Sep 17 00:00:00 2001
From: Ralph Rassweiler
Date: Mon, 13 Nov 2023 11:27:38 -0300
Subject: [PATCH] feat(MLOP-1985): optional params (#347)

* feat: optional params
---
 butterfree/extract/source.py                   | 13 +++++++++++--
 butterfree/transform/aggregated_feature_set.py | 17 ++++++++++++++---
 butterfree/transform/feature_set.py            | 16 ++++++++++++++--
 3 files changed, 39 insertions(+), 7 deletions(-)

diff --git a/butterfree/extract/source.py b/butterfree/extract/source.py
index 6d905c6b5..1209e9162 100644
--- a/butterfree/extract/source.py
+++ b/butterfree/extract/source.py
@@ -49,13 +49,22 @@ class Source(HookableComponent):
     temporary views regarding each reader and, after, will run the
     desired query and return a dataframe.

+    The `eager_evaluation` param forces Spark to apply the currently
+    mapped changes to the DataFrame. When this parameter is set to
+    False, Spark follows its standard behaviour of lazy evaluation.
+    Lazy evaluation can improve Spark's performance as it allows
+    Spark to build the best version of the execution plan.
+
     """

-    def __init__(self, readers: List[Reader], query: str) -> None:
+    def __init__(
+        self, readers: List[Reader], query: str, eager_evaluation: bool = True,
+    ) -> None:
         super().__init__()
         self.enable_pre_hooks = False
         self.readers = readers
         self.query = query
+        self.eager_evaluation = eager_evaluation

     def construct(
         self, client: SparkClient, start_date: str = None, end_date: str = None
@@ -87,7 +96,7 @@ def construct(

         dataframe = client.sql(self.query)

-        if not dataframe.isStreaming:
+        if not dataframe.isStreaming and self.eager_evaluation:
             dataframe.cache().count()

         post_hook_df = self.run_post_hooks(dataframe)
diff --git a/butterfree/transform/aggregated_feature_set.py b/butterfree/transform/aggregated_feature_set.py
index 133195d72..0bff33c65 100644
--- a/butterfree/transform/aggregated_feature_set.py
+++ b/butterfree/transform/aggregated_feature_set.py
@@ -197,6 +197,8 @@ def __init__(
         keys: List[KeyFeature],
         timestamp: TimestampFeature,
         features: List[Feature],
+        deduplicate_rows: bool = True,
+        eager_evaluation: bool = True,
     ):
         self._windows: List[Any] = []
         self._pivot_column: Optional[str] = None
@@ -204,7 +206,14 @@ def __init__(
         self._distinct_subset: List[Any] = []
         self._distinct_keep: Optional[str] = None
         super(AggregatedFeatureSet, self).__init__(
-            name, entity, description, keys, timestamp, features,
+            name,
+            entity,
+            description,
+            keys,
+            timestamp,
+            features,
+            deduplicate_rows,
+            eager_evaluation,
         )

     @property
@@ -626,8 +635,10 @@ def construct(
             float("nan"), None
         )
         if not output_df.isStreaming:
-            output_df = self._filter_duplicated_rows(output_df)
-            output_df.cache().count()
+            if self.deduplicate_rows:
+                output_df = self._filter_duplicated_rows(output_df)
+            if self.eager_evaluation:
+                output_df.cache().count()

         post_hook_df = self.run_post_hooks(output_df)

diff --git a/butterfree/transform/feature_set.py b/butterfree/transform/feature_set.py
index c2e40a498..469a353a8 100644
--- a/butterfree/transform/feature_set.py
+++ b/butterfree/transform/feature_set.py
@@ -97,6 +97,12 @@ class FeatureSet(HookableComponent):
     values over key columns and timestamp column, we do this in order to reduce
     our dataframe (regarding the number of rows). A detailed explanation of this
     method can be found at filter_duplicated_rows docstring.
+
+    The `eager_evaluation` param forces Spark to apply the currently
+    mapped changes to the DataFrame. When this parameter is set to
+    False, Spark follows its standard behaviour of lazy evaluation.
+    Lazy evaluation can improve Spark's performance as it allows
+    Spark to build the best version of the execution plan.
     """

     def __init__(
@@ -107,6 +113,8 @@ def __init__(
         keys: List[KeyFeature],
         timestamp: TimestampFeature,
         features: List[Feature],
+        deduplicate_rows: bool = True,
+        eager_evaluation: bool = True,
     ) -> None:
         super().__init__()
         self.name = name
@@ -116,6 +124,8 @@ def __init__(
         self.timestamp = timestamp
         self.features = features
         self.incremental_strategy = IncrementalStrategy(column=TIMESTAMP_COLUMN)
+        self.deduplicate_rows = deduplicate_rows
+        self.eager_evaluation = eager_evaluation

     @property
     def name(self) -> str:
@@ -426,8 +436,10 @@ def construct(
         ).select(*self.columns)

         if not output_df.isStreaming:
-            output_df = self._filter_duplicated_rows(output_df)
-            output_df.cache().count()
+            if self.deduplicate_rows:
+                output_df = self._filter_duplicated_rows(output_df)
+            if self.eager_evaluation:
+                output_df.cache().count()

         output_df = self.incremental_strategy.filter_with_incremental_strategy(
             dataframe=output_df, start_date=start_date, end_date=end_date
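
Usage sketch (illustration only, not part of the patch): both new flags
default to True, so existing pipelines keep the old behaviour of eager
caching and row deduplication unless they opt out; the snippet below opts
out of both. It assumes butterfree's public API (Source, TableReader,
FeatureSet, SparkClient, and the feature classes); the database, table,
and column names are hypothetical.

    from butterfree.clients import SparkClient
    from butterfree.constants import DataType
    from butterfree.extract import Source
    from butterfree.extract.readers import TableReader
    from butterfree.transform import FeatureSet
    from butterfree.transform.features import (
        Feature,
        KeyFeature,
        TimestampFeature,
    )

    # Lazy source: with eager_evaluation=False the cache().count() call
    # guarded in Source.construct is skipped for batch dataframes.
    source = Source(
        readers=[TableReader(id="orders", database="bronze", table="orders")],
        query="SELECT * FROM orders",  # hypothetical query over the reader's view
        eager_evaluation=False,
    )

    # Feature set that keeps duplicated rows and also stays lazy.
    feature_set = FeatureSet(
        name="orders_feature_set",
        entity="orders",
        description="Example feature set exercising the new optional params.",
        keys=[
            KeyFeature(
                name="order_id",
                description="Order identifier.",
                dtype=DataType.INTEGER,
            )
        ],
        # Assumes the query output already has a "timestamp" column.
        timestamp=TimestampFeature(),
        features=[
            Feature(
                name="amount",
                description="Order amount.",
                dtype=DataType.FLOAT,
            )
        ],
        deduplicate_rows=False,  # skip _filter_duplicated_rows on batch data
        eager_evaluation=False,  # defer execution until a Spark action runs
    )

    client = SparkClient()
    source_df = source.construct(client)
    output_df = feature_set.construct(source_df, client)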