feat(MLOP-1985): optional params (#347)
* feat: optional params
ralphrass committed Nov 14, 2023
1 parent 99791c9 commit 2c1f125
Showing 3 changed files with 39 additions and 7 deletions.
13 changes: 11 additions & 2 deletions butterfree/extract/source.py
@@ -49,13 +49,22 @@ class Source(HookableComponent):
     temporary views regarding each reader and, after, will run the
     desired query and return a dataframe.
+
+    The `eager_evaluation` param forces Spark to apply the currently
+    mapped changes to the DataFrame. When this parameter is set to
+    False, Spark follows its standard behaviour of lazy evaluation.
+    Lazy evaluation can improve Spark's performance as it allows
+    Spark to build the best version of the execution plan.
     """
 
-    def __init__(self, readers: List[Reader], query: str) -> None:
+    def __init__(
+        self, readers: List[Reader], query: str, eager_evaluation: bool = True,
+    ) -> None:
         super().__init__()
         self.enable_pre_hooks = False
         self.readers = readers
         self.query = query
+        self.eager_evaluation = eager_evaluation
 
     def construct(
         self, client: SparkClient, start_date: str = None, end_date: str = None
@@ -87,7 +96,7 @@ def construct(
 
         dataframe = client.sql(self.query)
 
-        if not dataframe.isStreaming:
+        if not dataframe.isStreaming and self.eager_evaluation:
             dataframe.cache().count()
 
         post_hook_df = self.run_post_hooks(dataframe)
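
The change is backward compatible: `eager_evaluation` defaults to True, so existing pipelines keep the old cache-and-count behaviour. A minimal sketch of opting out, assuming butterfree's documented extract API (reader, table, and query names are illustrative):

    from butterfree.clients import SparkClient
    from butterfree.extract import Source
    from butterfree.extract.readers import TableReader

    # Illustrative names; assumes a metastore table bronze.orders exists.
    source = Source(
        readers=[TableReader(id="orders", database="bronze", table="orders")],
        query="SELECT * FROM orders",  # runs over the temp view created by the reader
        eager_evaluation=False,        # new optional param; defaults to True
    )

    # With eager_evaluation=False the batch DataFrame is returned un-cached,
    # leaving Spark free to optimize the full plan lazily.
    dataframe = source.construct(client=SparkClient())
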
17 changes: 14 additions & 3 deletions butterfree/transform/aggregated_feature_set.py
@@ -197,14 +197,23 @@ def __init__(
         keys: List[KeyFeature],
         timestamp: TimestampFeature,
         features: List[Feature],
+        deduplicate_rows: bool = True,
+        eager_evaluation: bool = True,
     ):
         self._windows: List[Any] = []
         self._pivot_column: Optional[str] = None
         self._pivot_values: Optional[List[Union[bool, float, int, str]]] = []
         self._distinct_subset: List[Any] = []
         self._distinct_keep: Optional[str] = None
         super(AggregatedFeatureSet, self).__init__(
-            name, entity, description, keys, timestamp, features,
+            name,
+            entity,
+            description,
+            keys,
+            timestamp,
+            features,
+            deduplicate_rows,
+            eager_evaluation,
         )
 
     @property
@@ -626,8 +635,10 @@ def construct(
             float("nan"), None
         )
         if not output_df.isStreaming:
-            output_df = self._filter_duplicated_rows(output_df)
-            output_df.cache().count()
+            if self.deduplicate_rows:
+                output_df = self._filter_duplicated_rows(output_df)
+            if self.eager_evaluation:
+                output_df.cache().count()
 
         post_hook_df = self.run_post_hooks(output_df)
 
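
A sketch of the two new kwargs on AggregatedFeatureSet, assuming butterfree's documented transform API (entity, key, and aggregation choices are illustrative). Both default to True, preserving the previous deduplicate-and-cache behaviour on batch output:

    from pyspark.sql import functions as F

    from butterfree.constants import DataType
    from butterfree.transform.aggregated_feature_set import AggregatedFeatureSet
    from butterfree.transform.features import Feature, KeyFeature, TimestampFeature
    from butterfree.transform.transformations import AggregatedTransform
    from butterfree.transform.utils import Function

    # Illustrative feature set; names and the aggregation are made up.
    feature_set = AggregatedFeatureSet(
        name="orders_agg",
        entity="orders",
        description="Average order value per customer.",
        keys=[
            KeyFeature(
                name="customer_id",
                description="Customer identifier.",
                dtype=DataType.INTEGER,
            )
        ],
        timestamp=TimestampFeature(),
        features=[
            Feature(
                name="order_value",
                description="Order value aggregations.",
                transformation=AggregatedTransform(
                    functions=[Function(F.avg, DataType.DOUBLE)]
                ),
            )
        ],
        deduplicate_rows=False,   # skip _filter_duplicated_rows on batch output
        eager_evaluation=False,   # skip output_df.cache().count()
    )
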
16 changes: 14 additions & 2 deletions butterfree/transform/feature_set.py
@@ -97,6 +97,12 @@ class FeatureSet(HookableComponent):
     values over key columns and timestamp column, we do this in order to reduce
     our dataframe (regarding the number of rows). A detailed explanation of this
     method can be found at filter_duplicated_rows docstring.
+
+    The `eager_evaluation` param forces Spark to apply the currently
+    mapped changes to the DataFrame. When this parameter is set to
+    False, Spark follows its standard behaviour of lazy evaluation.
+    Lazy evaluation can improve Spark's performance as it allows
+    Spark to build the best version of the execution plan.
     """
 
     def __init__(
@@ -107,6 +113,8 @@ def __init__(
         keys: List[KeyFeature],
         timestamp: TimestampFeature,
         features: List[Feature],
+        deduplicate_rows: bool = True,
+        eager_evaluation: bool = True,
     ) -> None:
         super().__init__()
         self.name = name
@@ -116,6 +124,8 @@ def __init__(
         self.timestamp = timestamp
         self.features = features
         self.incremental_strategy = IncrementalStrategy(column=TIMESTAMP_COLUMN)
+        self.deduplicate_rows = deduplicate_rows
+        self.eager_evaluation = eager_evaluation
 
     @property
     def name(self) -> str:
@@ -426,8 +436,10 @@ def construct(
         ).select(*self.columns)
 
         if not output_df.isStreaming:
-            output_df = self._filter_duplicated_rows(output_df)
-            output_df.cache().count()
+            if self.deduplicate_rows:
+                output_df = self._filter_duplicated_rows(output_df)
+            if self.eager_evaluation:
+                output_df.cache().count()
 
         output_df = self.incremental_strategy.filter_with_incremental_strategy(
             dataframe=output_df, start_date=start_date, end_date=end_date
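
For the trade-off the docstring describes, a self-contained PySpark illustration (paths and column names are made up): cache().count() forces the mapped changes to run immediately, while the lazy path lets Spark optimize the whole plan before anything executes.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.range(1_000_000).withColumnRenamed("id", "order_id")

    # Eager path (the previous hard-coded behaviour): the plan runs now
    # and the result is pinned in memory.
    eager_df = df.withColumn("is_even", df.order_id % 2 == 0)
    eager_df.cache().count()

    # Lazy path (eager_evaluation=False): nothing executes until an action
    # is called, so Spark can collapse both steps into one optimized plan.
    lazy_df = df.withColumn("is_even", df.order_id % 2 == 0).filter("is_even")
    lazy_df.write.mode("overwrite").parquet("/tmp/orders_even")
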
