-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathtrain_workflow.py
76 lines (67 loc) · 2.31 KB
/
train_workflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import mlrun
from kfp import dsl
@dsl.pipeline(name="train_workflow")
def pipeline(
    dataset: str = "https://s3.us-east-1.wasabisys.com/iguazio/data/nyc-taxi/train.csv",
    project_name: str = None,
):
    # Train workflow: prepare data -> train -> evaluate -> deploy serving
    # (with model monitoring enabled) -> test the deployed endpoint.
    #
    # Parameters:
    #   dataset:      location of the source CSV; passed as the "dataset"
    #                 input to both the data-prep and server_tester steps.
    #   project_name: accepted for interface compatibility; not read anywhere
    #                 in this function body.
    #
    # NOTE(review): documented with comments rather than a docstring because
    # kfp may pick up a function docstring as pipeline description metadata --
    # confirm before converting.

    # Values shared by several steps below.
    label_column = "fare_amount"
    serving_image = "quay.io/eyaligu/mlrun-api:nyc-demo"

    # The current project gives access to the registered serving function
    # and to deployment.
    current_project = mlrun.get_current_project()

    # Step 1 - dataset preparation. Declares three outputs; the train and
    # test datasets are consumed by the steps that follow.
    prep_step = mlrun.run_function(
        function="data-prep",
        name="data-prep",
        inputs={"dataset": dataset},
        outputs=["train_dataset", "test_dataset", "label"],
        auto_build=True,
    )

    # Step 2 - training on the prepared train set; exposes a "model" output.
    # The hyperparameter grid is currently disabled and kept for reference:
    #   hyperparams={
    #       "boosting_type": ["gbdt"],
    #       "subsample": [0.2, 0.5, 0.8],
    #       "min_split_gain": [0.2, 0.5, 0.7],
    #       "min_child_samples": [5, 10, 15],
    #   },
    # NOTE(review): `selector` is still passed although no hyperparams are
    # supplied -- presumably only meaningful for hyperparameter runs; confirm.
    train_inputs = {"train_set": prep_step.outputs["train_dataset"]}
    trainer_step = mlrun.run_function(
        function="trainer",
        name="trainer",
        inputs=train_inputs,
        selector="min.mean_squared_error",
        outputs=["model"],
        auto_build=True,
    )

    # Step 3 - evaluate the trained model against the held-out test set.
    evaluate_inputs = {"dataset": prep_step.outputs["test_dataset"]}
    evaluate_params = {
        "model": trainer_step.outputs["model"],
        "label_columns": label_column,
    }
    mlrun.run_function(
        function="evaluate",
        name="evaluate",
        handler="evaluate",
        inputs=evaluate_inputs,
        params=evaluate_params,
    )

    # Step 4 - point the serving graph's "predict_fare" step at the model
    # produced by the trainer (stored as its string form).
    serving_fn = current_project.get_function("serving")
    predict_step = serving_fn.spec.graph["predict_fare"]
    predict_step.class_args["model_path"] = str(trainer_step.outputs["model"])

    # Step 5 - enable model monitoring; the same image is used for the
    # stream, the batch job, and the serving function itself.
    monitoring_policy = {
        "default_batch_intervals": "0 */2 * * *",
        "stream_image": serving_image,
        "default_batch_image": serving_image,
    }
    serving_fn.set_tracking(tracking_policy=monitoring_policy)
    serving_fn.spec.build.image = serving_image
    serving_fn.spec.image = serving_image

    # Step 6 - deploy the serving function, ordered after training.
    deploy_step = current_project.deploy_function("serving")
    deploy_step = deploy_step.after(trainer_step)

    # Step 7 - exercise the deployed endpoint with the original dataset,
    # ordered after the deployment.
    tester_params = {
        "label_column": label_column,
        "endpoint": deploy_step.outputs["endpoint"],
    }
    tester_step = mlrun.run_function(
        function="server_tester",
        name="server_tester",
        inputs={"dataset": dataset},
        params=tester_params,
        auto_build=True,
    )
    tester_step.after(deploy_step)