Commit 74b0fbd

05 log & register fe model 100%

cremerf committed Nov 5, 2024
1 parent e0ec1a5 commit 74b0fbd

Showing 1 changed file with 143 additions and 66 deletions: notebooks/05.log_and_register_fe_model.py

# Define table names and function name
feature_table_name = f"{catalog_name}.{schema_name}.hotel_features"
function_name = f"{catalog_name}.{schema_name}.calculate_total_guests"

loyalty_function_name = f"{catalog_name}.{schema_name}.calculate_loyalty_score"
#days_until_arrival_function_name = f"{catalog_name}.{schema_name}.calculate_days_until_arrival"
#holiday_season_function_name = f"{catalog_name}.{schema_name}.is_holiday_season"

# COMMAND ----------

test_set = spark.table(f"{catalog_name}.{schema_name}.test_set")


# COMMAND ----------

from pandas.api.types import CategoricalDtype
import numpy as np
import pandas as pd

df = spark.read.csv(
    f'{ALLPATHS.data_volume}/hotel_reservations.csv',
    header=True,
    inferSchema=True,
).toPandas()

# Treat zero room prices as missing and impute them with the median of the non-zero values
non_zero_values = df['avg_price_per_room'][(df['avg_price_per_room'] != 0) & (~df['avg_price_per_room'].isna())]
median_value = non_zero_values.median()
df['avg_price_per_room'] = df['avg_price_per_room'].replace(0, np.nan)
df['avg_price_per_room'] = df['avg_price_per_room'].fillna(median_value)
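
# Sanity check (an illustrative addition, not in the original notebook):
# imputation should leave no zero or missing room prices behind
assert df['avg_price_per_room'].notna().all() and (df['avg_price_per_room'] != 0).all()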

# Encode the binary target: 'Not_Canceled' -> 0, 'Canceled' -> 1
df[config.target] = df[config.target].map({'Not_Canceled': 0, 'Canceled': 1})

# Handle numeric features
num_features = config.num_features
for col in num_features:
    df[col] = pd.to_numeric(df[col], errors='coerce')

# Handle categorical features
cat_features = config.cat_features
for col in cat_features:
    # Ensure the column is of type 'category'
    if not isinstance(df[col].dtype, CategoricalDtype):
        df[col] = df[col].astype('category')

    # Add 'Unknown' to categories if not already present
    if 'Unknown' not in df[col].cat.categories:
        df[col] = df[col].cat.add_categories(['Unknown'])

    # Fill NaN values with 'Unknown'
    df[col] = df[col].fillna('Unknown')

# Extract target and relevant features
id_field = config.id_field
target = config.target
relevant_columns = cat_features + num_features + [target] + [id_field]
df = df[relevant_columns]

# COMMAND ----------

target = config.target
target

# COMMAND ----------

spark.table(f"{catalog_name}.{schema_name}.train_set").toPandas().dtypes

# COMMAND ----------

# Create or replace the hotel_features table
spark.sql(f"""
CREATE OR REPLACE TABLE {feature_table_name}(
    Booking_ID STRING NOT NULL,
    lead_time INT,
    no_of_special_requests INT,
    avg_price_per_room FLOAT);
""")

# COMMAND ----------

spark.sql(f"""ALTER TABLE {feature_table_name}
ADD CONSTRAINT hotel_pk PRIMARY KEY(booking_id);""")
ADD CONSTRAINT hotel_pk PRIMARY KEY(Booking_ID);""")

spark.sql(f"""ALTER TABLE {feature_table_name}
SET TBLPROPERTIES (delta.enableChangeDataFeed = true);""")
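
# COMMAND ----------

# Optional sanity check (an illustrative addition, not part of the original notebook):
# confirm change data feed is enabled -- the Feature Engineering client relies on CDF
# to track updates to the feature table.
spark.sql(f"SHOW TBLPROPERTIES {feature_table_name}").show(truncate=False)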



# COMMAND ----------

# Insert data into the feature table from both the train and test sets
spark.sql(f"""
INSERT INTO {feature_table_name}
SELECT
    Booking_ID, lead_time, no_of_special_requests, avg_price_per_room
FROM {catalog_name}.{schema_name}.train_set
""")
# COMMAND ----------

spark.sql(f"""
INSERT INTO {feature_table_name}
SELECT
    Booking_ID, lead_time, no_of_special_requests, avg_price_per_room
FROM {catalog_name}.{schema_name}.test_set""")

# COMMAND ----------

# Define a Python UDF that computes a loyalty score from booking history
spark.sql(f"""
CREATE OR REPLACE FUNCTION {loyalty_function_name}(
    no_of_previous_cancellations DOUBLE,
    no_of_previous_bookings_not_canceled DOUBLE
)
RETURNS DOUBLE
LANGUAGE PYTHON AS
$$
# Define weightings
w1 = 1.5  # Weight for previous bookings that were NOT cancelled
w2 = 1.0  # Weight for previous bookings that were cancelled
# Calculate loyalty score
loyalty_score = (w1 * no_of_previous_bookings_not_canceled) - (w2 * no_of_previous_cancellations)
return loyalty_score
$$
""")

# COMMAND ----------

# Load the training and test sets; drop the feature columns that FeatureLookup
# will re-attach from the feature table
train_set = spark.table(f"{catalog_name}.{schema_name}.train_set").drop(
    "lead_time", "no_of_special_requests", "avg_price_per_room"
)
test_set = spark.table(f"{catalog_name}.{schema_name}.test_set").toPandas()

# Cast relevant columns to double for the function input
train_set = train_set.withColumn(
    "no_of_previous_bookings_not_canceled", train_set["no_of_previous_bookings_not_canceled"].cast("double")
)
train_set = train_set.withColumn(
    "no_of_previous_cancellations", train_set["no_of_previous_cancellations"].cast("double")
)
train_set = train_set.withColumn("Booking_ID", train_set["Booking_ID"].cast("string"))

# COMMAND ----------

# Feature engineering setup
training_set = fe.create_training_set(
    df=train_set,
    label=target,
    feature_lookups=[
        FeatureLookup(
            table_name=feature_table_name,
            feature_names=["lead_time", "no_of_special_requests", "avg_price_per_room"],
            lookup_key="Booking_ID",
        ),
        FeatureFunction(
            udf_name=loyalty_function_name,
            output_name="loyalty_score",
            input_bindings={
                "no_of_previous_cancellations": "no_of_previous_cancellations",
                "no_of_previous_bookings_not_canceled": "no_of_previous_bookings_not_canceled",
            },
        ),
    ],
)

# COMMAND ----------

# Load feature-engineered DataFrame
training_df = training_set.load_df().toPandas()

# COMMAND ----------

test_set["loyalty_score"] = (test_set["no_of_previous_bookings_not_canceled"] * 1.5) + (
test_set["no_of_week_nights"] * 1.0
)
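
# COMMAND ----------

# Optional consistency check (an illustrative addition, not in the original notebook):
# recompute the score in pandas for a few rows and compare it against the
# FeatureFunction output materialized in training_df.
sample = training_df[
    ["no_of_previous_cancellations", "no_of_previous_bookings_not_canceled", "loyalty_score"]
].head()
expected = 1.5 * sample["no_of_previous_bookings_not_canceled"] - 1.0 * sample["no_of_previous_cancellations"]
assert (expected - sample["loyalty_score"]).abs().max() < 1e-9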

# COMMAND ----------

# Split features and target
X_train = training_df[num_features + cat_features + ["loyalty_score"]]
y_train = training_df[target]
X_test = test_set[num_features + cat_features + ["loyalty_score"]] # Ensure test set has the same features
y_test = test_set[target]

# COMMAND ----------

# Setup preprocessing and model pipeline
preprocessor = ColumnTransformer(
    transformers=[("cat", OneHotEncoder(handle_unknown="ignore"), cat_features)], remainder="passthrough"
)
pipeline = Pipeline(steps=[("preprocessor", preprocessor), ("classifier", LGBMClassifier(**parameters))])

# COMMAND ----------

# Set and start MLflow experiment
mlflow.set_experiment(experiment_name="/Shared/hotel-reservations-cremerf")
git_sha = "e0ec1a5902a2d8c60c434766c0013d47c3879237"

with mlflow.start_run(tags={"branch": "week2", "git_sha": f"{git_sha}"}) as run:
    run_id = run.info.run_id
    pipeline.fit(X_train, y_train)
    y_pred = pipeline.predict(X_test)

    # Calculate and print metrics
    accuracy = accuracy_score(y_test, y_pred)
    report = classification_report(y_test, y_pred, output_dict=True)
    print(f"Accuracy: {accuracy}")
    print("Classification Report:")
    print(report)

    # Log model parameters, metrics, and model
    mlflow.log_param("model_type", "LightGBM with preprocessing")
    mlflow.log_params(parameters)
    mlflow.log_metric("accuracy", accuracy)

    # Log classification report metrics
    for class_label, metrics in report.items():
        if isinstance(metrics, dict):
            mlflow.log_metric(f"precision_{class_label}", metrics["precision"])
            mlflow.log_metric(f"recall_{class_label}", metrics["recall"])
            mlflow.log_metric(f"f1-score_{class_label}", metrics["f1-score"])

    # Log model with feature engineering
    signature = infer_signature(model_input=X_train, model_output=y_pred)

    fe.log_model(
        model=pipeline,
        flavor=mlflow.sklearn,
        artifact_path="lightgbm-pipeline-model-fe",
        training_set=training_set,
        signature=signature,
    )
    mlflow.register_model(
        model_uri=f"runs:/{run_id}/lightgbm-pipeline-model-fe",
        name=f"{catalog_name}.{schema_name}.hotel_reservations_model_fe",
    )
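
# COMMAND ----------

# Illustrative follow-up (an assumption, not part of the original notebook): batch-score
# the test split with the model logged via fe.log_model. fe.score_batch re-resolves the
# FeatureLookup and FeatureFunction from the logged training-set metadata, so the input
# DataFrame only needs the lookup key plus the raw UDF inputs.
test_set_spark = spark.table(f"{catalog_name}.{schema_name}.test_set").drop(
    "lead_time", "no_of_special_requests", "avg_price_per_room"
)
predictions = fe.score_batch(
    model_uri=f"runs:/{run_id}/lightgbm-pipeline-model-fe",
    df=test_set_spark,
)
display(predictions.select("Booking_ID", "prediction"))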
