addressing review comments
manu-sj committed Jan 31, 2025
1 parent 3aa510d commit a17901b
Showing 5 changed files with 191 additions and 74 deletions.
84 changes: 54 additions & 30 deletions python/hsfs/core/vector_server.py
@@ -652,45 +652,77 @@ def assemble_feature_vector(
for fname in self._untransformed_feature_vector_col_name
]

def _check_feature_vectors_type_and_convert_to_dict(
def _validate_input_features(
self,
feature_vectors: Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame],
on_demand_features: bool = False,
) -> Tuple[Dict[str, Any], Literal["pandas", "polars", "list"]]:
) -> None:
"""
Function that converts an input feature vector into a list of dictionaries.
Validate that the provided feature vector contains all required features.
# Arguments
feature_vectors: `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`. The feature vectors to be converted.
on_demand_features : `bool`. Specifies whether on-demand features are provided in the input feature vector.
# Returns
`Tuple[Dict[str, Any], Literal["pandas", "polars", "list"]]`: A tuple that contains the feature vector as a dictionary and a string denoting the data type of the input feature vector.
"""

required_features = (
set(self._untransformed_feature_vector_col_name)
if not on_demand_features
else set(self._on_demand_feature_vector_col_name)
)
if isinstance(feature_vectors, pd.DataFrame):
return_type = "pandas"

if isinstance(feature_vectors, pd.DataFrame) or (HAS_POLARS and isinstance(feature_vectors, pl.DataFrame)):
missing_features = required_features - set(feature_vectors.columns)
if missing_features:
raise exceptions.FeatureStoreException(
f"The input feature vector is missing the following required features: `{'`, `'.join(missing_features)}`. Please include these features in the feature vector."
)
else:
if isinstance(feature_vectors, list):
if feature_vectors and all(
isinstance(feature_vector, list)
for feature_vector in feature_vectors
):
if any(
len(feature_vector) != len(required_features)
for feature_vector in feature_vectors
):
raise exceptions.FeatureStoreException(
f"Input feature vector is missing required features. Please ensure the following features are included: `{'`, `'.join(required_features)}`."
)
else:
if len(feature_vectors) != len(required_features):
raise exceptions.FeatureStoreException(
f"Input feature vector is missing required features. Please ensure the following features are included: '{', '.join(required_features)}'."
)
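For reference, the validation above can be summarised in a minimal standalone sketch (hypothetical names and a plain `ValueError` in place of `FeatureStoreException`; not the actual hsfs API): DataFrame inputs are checked by column name, while positional list inputs can only be checked by length.

```python
from typing import Any, List, Union

import pandas as pd


def validate_input_features(
    feature_vectors: Union[List[Any], List[List[Any]], pd.DataFrame],
    required_features: List[str],
) -> None:
    """Raise if the input is missing any of the required features."""
    if isinstance(feature_vectors, pd.DataFrame):
        # DataFrames carry column names, so missing features can be named explicitly.
        missing = set(required_features) - set(feature_vectors.columns)
        if missing:
            raise ValueError(f"Missing required features: {sorted(missing)}")
    elif isinstance(feature_vectors, list):
        # Positional vectors carry no names, so only the length can be checked.
        vectors = (
            feature_vectors
            if feature_vectors and all(isinstance(v, list) for v in feature_vectors)
            else [feature_vectors]
        )
        for vector in vectors:
            if len(vector) != len(required_features):
                raise ValueError(
                    f"Expected {len(required_features)} features, got {len(vector)}"
                )


try:
    validate_input_features([1.0, 2.0], ["f1", "f2", "f3"])
except ValueError as err:
    print(err)  # Expected 3 features, got 2
```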

def _check_feature_vectors_type_and_convert_to_dict(
self,
feature_vectors: Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame],
on_demand_features: bool = False,
) -> Tuple[Dict[str, Any], Literal["pandas", "polars", "list"]]:
"""
Function that converts an input feature vector into a list of dictionaries.
# Arguments
feature_vectors: `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`. The feature vectors to be converted.
on_demand_features : `bool`. Specifies whether on-demand features are provided in the input feature vector.
# Returns
`Tuple[Dict[str, Any], Literal["pandas", "polars", "list"]]`: A tuple containing the feature vectors as a list of dictionaries and a string denoting the data type of the input feature vector.
"""

if isinstance(feature_vectors, pd.DataFrame):
return_type = "pandas"
feature_vectors = feature_vectors.to_dict(orient="records")

elif HAS_POLARS and isinstance(feature_vectors, pl.DataFrame):
return_type = "polars"
feature_vectors = feature_vectors.to_pandas()
missing_features = required_features - set(feature_vectors.columns)
if missing_features:
raise exceptions.FeatureStoreException(
f"The input feature vector is missing the following required features: `{'`, `'.join(missing_features)}`. Please include these features in the feature vector."
)
feature_vectors = feature_vectors.to_dict(orient="records")
feature_vectors = feature_vectors.to_pandas().to_dict(orient="records")

elif isinstance(feature_vectors, list):
if feature_vectors and all(
@@ -870,10 +902,6 @@ def get_untransformed_features_map(
`Dict[str, Any]` : Dictionary mapping features name to values.
"""
if on_demand_features:
if len(features) != len(self._on_demand_feature_vector_col_name):
raise exceptions.FeatureStoreException(
f"Input feature vector is missing required features. Please ensure the following features are included: `{'`, `'.join(self._on_demand_feature_vector_col_name)}`."
)
return dict(
[
(fname, fvalue)
Expand All @@ -883,10 +911,6 @@ def get_untransformed_features_map(
]
)
else:
if len(features) != len(self._untransformed_feature_vector_col_name):
raise exceptions.FeatureStoreException(
f"Input feature vector is missing required features. Please ensure the following features are included: '{', '.join(self._untransformed_feature_vector_col_name)}'."
)
return dict(
[
(fname, fvalue)
@@ -1135,13 +1159,13 @@ def apply_on_demand_transformations(
else:
prefixed_feature = unprefixed_feature

# Check if the prefixed feature name is provided as a request parameter, if so then use it. Otherwise if the unprefixed feature name is provided as a request parameter and use it. Else fetch the feature from the retrieved feature vector.
if prefixed_feature in request_parameter.keys():
feature_value = request_parameter[prefixed_feature]
elif unprefixed_feature in request_parameter.keys():
feature_value = request_parameter[unprefixed_feature]
else:
feature_value = rows[prefixed_feature]
# Check if the prefixed feature name is provided as a request parameter and use it if so. Otherwise, if the unprefixed feature name is provided as a request parameter, use it. Else, fall back to the value in the retrieved feature vector.
feature_value = request_parameter.get(
prefixed_feature,
request_parameter.get(
unprefixed_feature, rows.get(prefixed_feature)
),
)

if (
tf.hopsworks_udf.execution_mode.get_current_execution_mode(
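The nested `request_parameter.get(...)` call above encodes a simple priority order: a prefixed request parameter wins, then an unprefixed one, then the value already present in the retrieved row. A minimal sketch of that lookup with hypothetical names (not the actual hsfs code):

```python
from typing import Any, Dict


def resolve_feature_value(
    prefixed_name: str,
    unprefixed_name: str,
    request_parameters: Dict[str, Any],
    row: Dict[str, Any],
) -> Any:
    """Prefer request parameters (prefixed first, then unprefixed) over the stored row value."""
    return request_parameters.get(
        prefixed_name,
        request_parameters.get(unprefixed_name, row.get(prefixed_name)),
    )


# The unprefixed request parameter overrides the value stored in the retrieved row.
value = resolve_feature_value(
    "fg1_amount",
    "amount",
    request_parameters={"amount": 10},
    row={"fg1_amount": 5},
)
assert value == 10
```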
52 changes: 32 additions & 20 deletions python/hsfs/hopsworks_udf.py
@@ -430,10 +430,9 @@ def _parse_function_signature(source_code: str) -> Tuple[List[str], str, int, in
]

# Extracting keywords like `context` and `statistics` from the function signature.
if UDFKeyWords.STATISTICS.value in arg_list:
arg_list.remove(UDFKeyWords.STATISTICS.value)
if UDFKeyWords.CONTEXT.value in arg_list:
arg_list.remove(UDFKeyWords.CONTEXT.value)
keywords_to_remove = [keyword.value for keyword in UDFKeyWords]
arg_list = [arg for arg in arg_list if arg not in keywords_to_remove]

return arg_list, signature, signature_start_line, signature_end_line

@staticmethod
@@ -525,6 +524,28 @@ def _create_pandas_udf_return_schema_from_list(self) -> str:
else:
return self.return_types[0]

def _prepare_transformation_function_scope(self, **kwargs) -> Dict[str, Any]:
"""
Function that prepares the scope in which the transformation function is executed. This scope includes any variables that need to be injected into the transformation function.
By default, the output column names, transformation statistics, and transformation context are injected into the scope if the transformation function requires them. Any additional variables can be passed as keyword arguments.
"""
# A shallow copy of the scope is made because updating injected variables, such as statistics, must not affect other instances.
scope = __import__("__main__").__dict__.copy()

# Adding variables required to be injected into the scope.
variables_to_inject = {
UDFKeyWords.STATISTICS.value: self.transformation_statistics,
UDFKeyWords.CONTEXT.value: self.transformation_context,
"_output_col_names": self.output_column_names,
}
variables_to_inject.update(**kwargs)

# Injecting variables that have a value into scope.
scope.update({k: v for k, v in variables_to_inject.items() if v is not None})

return scope

def python_udf_wrapper(self, rename_outputs) -> Callable:
"""
Function that creates a dynamic wrapper function for the defined UDF. The wrapper function is used to specify column names in Spark engines and to localize timezones.
@@ -587,13 +608,9 @@ def python_udf_wrapper(self, rename_outputs) -> Callable:
code += " return transformed_features"

# Inject required parameters into the scope
scope = __import__("__main__").__dict__.copy()
if self.transformation_statistics is not None:
scope.update({UDFKeyWords.STATISTICS.value: self.transformation_statistics})
if self.transformation_context:
scope.update({UDFKeyWords.CONTEXT.value: self.transformation_context})
scope.update({"_output_col_names": self.output_column_names})
scope.update({"_date_time_output_index": date_time_output_index})
scope = self._prepare_transformation_function_scope(
_date_time_output_index=date_time_output_index
)

# executing code
exec(code, scope)
@@ -665,16 +682,11 @@ def renaming_wrapper(*args):
return df"""
)

# injecting variables into scope used to execute wrapper function.
# Inject required parameters into the scope
scope = self._prepare_transformation_function_scope(
_date_time_output_columns=date_time_output_columns
)

# Shallow copy of scope performed because updating statistics argument of scope must not affect other instances.
scope = __import__("__main__").__dict__.copy()
if self.transformation_statistics:
scope.update({UDFKeyWords.STATISTICS.value: self.transformation_statistics})
if self.transformation_context:
scope.update({UDFKeyWords.CONTEXT.value: self.transformation_context})
scope.update({"_output_col_names": self.output_column_names})
scope.update({"_date_time_output_columns": date_time_output_columns})
# executing code
exec(code, scope)

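The `_prepare_transformation_function_scope` refactor above centralises a pattern both UDF wrappers rely on: copy the `__main__` namespace, inject only the variables that have a value, and `exec` the generated wrapper source against that scope. A minimal sketch of the idea, with generic names rather than the actual hsfs implementation:

```python
from typing import Any, Callable, Dict


def prepare_scope(**variables: Any) -> Dict[str, Any]:
    """Copy the __main__ namespace and inject only the variables that have a value."""
    scope = __import__("__main__").__dict__.copy()
    scope.update({name: value for name, value in variables.items() if value is not None})
    return scope


def build_wrapper(source: str, func_name: str, **variables: Any) -> Callable:
    """Execute generated wrapper source in a prepared scope and return the resulting function."""
    scope = prepare_scope(**variables)
    exec(source, scope)  # defines `func_name` inside `scope`
    return scope[func_name]


# The generated wrapper can reference injected names such as `_output_col_names`.
wrapper = build_wrapper(
    "def wrapper(values):\n    return dict(zip(_output_col_names, values))",
    "wrapper",
    _output_col_names=["min_max_scaler_amount_"],
)
assert wrapper([0.5]) == {"min_max_scaler_amount_": 0.5}
```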
28 changes: 14 additions & 14 deletions python/hsfs/transformation_function.py
@@ -274,21 +274,21 @@ def _get_output_column_names(self) -> str:
else:
if self.transformation_type == TransformationType.MODEL_DEPENDENT:
_BASE_COLUMN_NAME = f'{self.__hopsworks_udf.function_name}_{"_".join(self.__hopsworks_udf.transformation_features)}_'
if len(self.__hopsworks_udf.return_types) > 1:
output_col_names = [
f"{_BASE_COLUMN_NAME}{i}"
for i in range(len(self.__hopsworks_udf.return_types))
]
else:
output_col_names = [f"{_BASE_COLUMN_NAME}"]
elif self.transformation_type == TransformationType.ON_DEMAND:
if len(self.__hopsworks_udf.return_types) > 1:
output_col_names = [
f"{self.__hopsworks_udf.function_name}_{i}"
for i in range(0, len(self.__hopsworks_udf.return_types))
]
else:
output_col_names = [self.__hopsworks_udf.function_name]
_BASE_COLUMN_NAME = (
self.__hopsworks_udf.function_name
if len(self.__hopsworks_udf.return_types) == 1
else f"{self.__hopsworks_udf.function_name}_"
)

output_col_names = (
[
f"{_BASE_COLUMN_NAME}{i}"
for i in range(len(self.__hopsworks_udf.return_types))
]
if len(self.__hopsworks_udf.return_types) > 1
else [_BASE_COLUMN_NAME]
)

if any(
len(output_col_name) > FEATURES.MAX_LENGTH_NAME
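The consolidated branch above derives a single base column name per transformation type and appends a positional suffix only when the UDF returns more than one column. A simplified sketch of the naming rule (illustrative only; it omits the `FEATURES.MAX_LENGTH_NAME` check that follows in the real code):

```python
from typing import List


def output_column_names(
    function_name: str,
    transformation_features: List[str],
    n_return_types: int,
    model_dependent: bool,
) -> List[str]:
    """Derive output column names from the UDF name, its input features, and its arity."""
    if model_dependent:
        # Model-dependent transformations embed the input feature names in the base name.
        base = f"{function_name}_{'_'.join(transformation_features)}_"
    else:
        # On-demand transformations use the bare function name, suffixed only when needed.
        base = function_name if n_return_types == 1 else f"{function_name}_"

    if n_return_types > 1:
        return [f"{base}{i}" for i in range(n_return_types)]
    return [base]


assert output_column_names("plus_one", ["col_0"], 1, model_dependent=True) == [
    "plus_one_col_0_"
]
assert output_column_names("plus_two", ["col_0"], 2, model_dependent=True) == [
    "plus_two_col_0_0",
    "plus_two_col_0_1",
]
```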
20 changes: 10 additions & 10 deletions python/tests/engine/test_spark.py
@@ -4641,7 +4641,7 @@ def plus_one(col1):
"col_2": [True, False],
"plus_one_col_0_": [2, 3],
}
) # todo why it doesnt return int?
)

expected_spark_df = spark_engine._spark_session.createDataFrame(expected_df)

@@ -4702,7 +4702,7 @@ def plus_one(col1):
"col_2": [True, False],
"plus_one_col_0_": [2, 3],
}
) # todo why it doesnt return int?
)

expected_spark_df = spark_engine._spark_session.createDataFrame(expected_df)

@@ -4763,7 +4763,7 @@ def plus_one(col1):
"col_2": [True, False],
"plus_one_col_0_": [2, 3],
}
) # todo why it doesnt return int?
)

expected_spark_df = spark_engine._spark_session.createDataFrame(expected_df)

@@ -4827,7 +4827,7 @@ def plus_one(col1, context):
"col_2": [True, False],
"plus_one_col_0_": [21, 22],
}
) # todo why it doesnt return int?
)

expected_spark_df = spark_engine._spark_session.createDataFrame(expected_df)

@@ -4892,7 +4892,7 @@ def plus_two(col1):
"plus_two_col_0_0": [2, 3],
"plus_two_col_0_1": [3, 4],
}
) # todo why it doesnt return int?
)

expected_spark_df = spark_engine._spark_session.createDataFrame(expected_df)

@@ -4956,7 +4956,7 @@ def plus_two(col1):
"plus_two_col_0_0": [2, 3],
"plus_two_col_0_1": [3, 4],
}
) # todo why it doesnt return int?
)

expected_spark_df = spark_engine._spark_session.createDataFrame(expected_df)

@@ -5020,7 +5020,7 @@ def plus_two(col1):
"plus_two_col_0_0": [2, 3],
"plus_two_col_0_1": [3, 4],
}
) # todo why it doesnt return int?
)

expected_spark_df = spark_engine._spark_session.createDataFrame(expected_df)

@@ -5470,7 +5470,7 @@ def test(col1, col2):
"test_col_0_col_2_0": [2, 3],
"test_col_0_col_2_1": [12, 13],
}
) # todo why it doesnt return int?
)

expected_spark_df = spark_engine._spark_session.createDataFrame(expected_df)

@@ -5533,7 +5533,7 @@ def test(col1, col2):
"test_col_0_col_2_0": [2, 3],
"test_col_0_col_2_1": [12, 13],
}
) # todo why it doesnt return int?
)

expected_spark_df = spark_engine._spark_session.createDataFrame(expected_df)

@@ -5596,7 +5596,7 @@ def test(col1, col2):
"test_col_0_col_2_0": [2, 3],
"test_col_0_col_2_1": [12, 13],
}
) # todo why it doesnt return int?
)

expected_spark_df = spark_engine._spark_session.createDataFrame(expected_df)

