Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(MLOP-1985): optional params #347

Merged
merged 4 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions butterfree/extract/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,22 @@ class Source(HookableComponent):
temporary views regarding each reader and, after, will run the
desired query and return a dataframe.

The `eager_evaluation` param forces Spark to apply the currently
mapped changes to the DataFrame. When this parameter is set to
False, Spark follows its standard behaviour of lazy evaluation.
Lazy evaluation can improve Spark's performance as it allows
Spark to build the best version of the execution plan.

"""

def __init__(self, readers: List[Reader], query: str) -> None:
def __init__(
self, readers: List[Reader], query: str, eager_evaluation: bool = True,
) -> None:
super().__init__()
self.enable_pre_hooks = False
self.readers = readers
self.query = query
self.eager_evaluation = eager_evaluation

def construct(
self, client: SparkClient, start_date: str = None, end_date: str = None
Expand Down Expand Up @@ -87,7 +96,7 @@ def construct(

dataframe = client.sql(self.query)

if not dataframe.isStreaming:
if not dataframe.isStreaming and self.eager_evaluation:
dataframe.cache().count()

post_hook_df = self.run_post_hooks(dataframe)
Expand Down
17 changes: 14 additions & 3 deletions butterfree/transform/aggregated_feature_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,14 +197,23 @@ def __init__(
keys: List[KeyFeature],
timestamp: TimestampFeature,
features: List[Feature],
deduplicate_rows: bool = True,
eager_evaluation: bool = True,
):
self._windows: List[Any] = []
self._pivot_column: Optional[str] = None
self._pivot_values: Optional[List[Union[bool, float, int, str]]] = []
self._distinct_subset: List[Any] = []
self._distinct_keep: Optional[str] = None
super(AggregatedFeatureSet, self).__init__(
name, entity, description, keys, timestamp, features,
name,
entity,
description,
keys,
timestamp,
features,
deduplicate_rows,
eager_evaluation,
)

@property
Expand Down Expand Up @@ -626,8 +635,10 @@ def construct(
float("nan"), None
)
if not output_df.isStreaming:
output_df = self._filter_duplicated_rows(output_df)
output_df.cache().count()
if self.deduplicate_rows:
output_df = self._filter_duplicated_rows(output_df)
if self.eager_evaluation:
output_df.cache().count()

post_hook_df = self.run_post_hooks(output_df)

Expand Down
16 changes: 14 additions & 2 deletions butterfree/transform/feature_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ class FeatureSet(HookableComponent):
values over key columns and timestamp column, we do this in order to reduce
our dataframe (regarding the number of rows). A detailed explation of this
method can be found at filter_duplicated_rows docstring.

The `eager_evaluation` param forces Spark to apply the currently
mapped changes to the DataFrame. When this parameter is set to
False, Spark follows its standard behaviour of lazy evaluation.
Lazy evaluation can improve Spark's performance as it allows
Spark to build the best version of the execution plan.
"""

def __init__(
Expand All @@ -107,6 +113,8 @@ def __init__(
keys: List[KeyFeature],
timestamp: TimestampFeature,
features: List[Feature],
deduplicate_rows: bool = True,
eager_evaluation: bool = True,
) -> None:
super().__init__()
self.name = name
Expand All @@ -116,6 +124,8 @@ def __init__(
self.timestamp = timestamp
self.features = features
self.incremental_strategy = IncrementalStrategy(column=TIMESTAMP_COLUMN)
self.deduplicate_rows = deduplicate_rows
self.eager_evaluation = eager_evaluation

@property
def name(self) -> str:
Expand Down Expand Up @@ -426,8 +436,10 @@ def construct(
).select(*self.columns)

if not output_df.isStreaming:
output_df = self._filter_duplicated_rows(output_df)
output_df.cache().count()
if self.deduplicate_rows:
output_df = self._filter_duplicated_rows(output_df)
if self.eager_evaluation:
output_df.cache().count()

output_df = self.incremental_strategy.filter_with_incremental_strategy(
dataframe=output_df, start_date=start_date, end_date=end_date
Expand Down
Loading