This is a example for building a scikit-learn-based machine learning pipeline trainer that can be run on AI Platform, which is built on top of the scikit-learn template. The pipeline can be trained locally or remotely on AI platform. The trained model can be further deployed on AI platform to serve online traffic. The entire pipeline includes three major components:
- Transformer: generate new features from raw_data with user defined logic (function).
- Pre-processer: handle typical standard pre-processing e.g. scaling, imputation, one-hot-encoding and etc.
- Estimator: exact machine learning model e.g. RandomForestClassifier.
Compared with the scikit-learn template, this example has the following additional feature:
- Support both Classification and Regression, which can be specified in the configuration
- Support serving for both JSON and List of Value formats
- Support additional custome transformation logics besides typical pre-processing provided by scikit-learn
Google Cloud tools used:
- Google Cloud Platform (GCP) lets you build and host applications and websites, store data, and analyze data on Google's scalable infrastructure.
- Cloud ML Engine is a managed service that enables you to easily build machine learning models that work on any type of data, of any size. This is now part of AI Platform.
- Google Cloud Storage (GCS) is a unified object storage for developers and enterprises, from live data serving to data analytics/ML to data archiving.
- Cloud SDK is a set of tools for Google Cloud Platform, which contains e.g. gcloud, gsutil, and bq command-line tools to interact with Google Cloud products and services.
- Google BigQuery A fast, highly scalable, cost-effective, and fully managed cloud data warehouse for analytics, with even built-in machine learning.
The overall flow of the pipeline can be summarized as follows and illustrated in the flowchart:
Raw Data -> Transformer -> Pre-processor -> Estimator -> Trained Pipeline
template
|__ config
|__ config.yaml # for running normal training job on AI Platform
|__ hptuning_config.yaml # for running hyperparameter tunning job on AI Platform
|__ scripts
|__ train.sh # convenience script for running machine learning training jobs
|__ deploy.sh # convenience script for deploying trained scikit-learn model
|__ predict.sh # convenience script for requesting online prediction
|__ predict.py # helper function for requesting online prediction using python
|__ trainer # trainer package
|__ util # utility functions
|__ utils.py # utility functions including e.g. loading data from bigquery and cloud storage
|__ preprocess_utils.py # utility functions for constructing preprocessing pipeline
|__ transform_utils.py # utility functions for constructing transform pipeline
|__ metadata.py # dataset metadata and feature columns definitions
|__ constants.py # constants used in the project
|__ model.py # pre-processing and machine learning model pipeline definition
|__ task.py # training job entry point, handling the parameters passed from command line
|__ transform_config.py # configuration for transform pipeline construction"
|__ predictor.py # define custom prediction behavior
|__ setup.py # specify necessary dependency for running job on AI Platform
|__ requirements.txt # specify necessary dependency, helper for setup environemnt for local development
Before you follow the instructions below to adapt the tempate to your machine learning job, you need a Google cloud project if you don't have one. You can find detailed instructions here.
-
Make sure the following API & Services are enabled.
- Cloud Storage
- Cloud Machine Learning Engine
- BigQuery API
- Cloud Build API (for CI/CD integration)
- Cloud Source Repositories API (for CI/CD integration)
-
Configure project id and bucket id as environment variable.
$ export PROJECT_ID=[your-google-project-id] $ export BUCKET_ID=[your-google-cloud-storage-bucket-name]
-
Set up a service account for calls to GCP APIs.
More information on setting up a service account can be found here.
metadata.py
is where the dataset's metadata is defined.
By default, the file is configured to train on the Census dataset, which can be found at
bigquery-public-data.ml_datasets.census_adult_income
.
# Usage: Modify below based on the dataset used.
CSV_COLUMNS = None # Schema of the data. Necessary for data stored in GCS
# In the following, I provided an example based on census dataset.
NUMERIC_FEATURES = [
'age',
'hours_per_week',
]
CATEGORICAL_FEATURES = [
'workclass',
'education',
'marital_status',
'occupation',
'relationship',
'race',
'sex',
'native_country'
]
FEATURE_COLUMNS = NUMERIC_FEATURES + CATEGORICAL_FEATURES
LABEL = 'income_bracket'
PROBLEM_TYPE = 'classification' # 'regression' or 'classification'
In most cases, only the following items need to be modified, in order to adapt to the target dataset.
- COLUMNS: the schema of ths data, only required for data stored in GCS
- NUMERIC_FEATURES: columns those will be treated as numerical features
- CATEGORICAL_FEATURES: columns those will be treated as categorical features
- LABEL: column that will be treated as label
transform_config.py
is where the logic of generating new features out of raw dataset is defined.
There are two parts need to be provided for each new feature generating logic:
- User defined function that handle the generation of new feature. There would be four cases in terms of the
combinations of the dimensions of input and output as listed below:
- ()->(): scalar to scalar
- (n) -> (): multi-inputs to scalar
- () -> (n): scalar to multi-outputs
- (n) -> (n): multi-inputs to multi-outputs
The example below takes in age
and converts it into age bucket, which is an example of "scalar to scalar" function.
def _age_class(age):
"""Example scalar processing function
Args:
age: (int), age in integer
Returns:
"""
if age < 10:
return 1
elif 10 <= age < 18:
return 2
elif 18 <= age < 30:
return 3
elif 30 <= age < 50:
return 4
else:
return 5
- An entry in
TRANSFORM_CONFIG
. After the user defined function is done, to incorporate the transformation into the entire pipeline, an additional entry need to be added toTRANSFORM_CONFIG
with- input_columns: name of columns needed for as inputs to the transform function
- process_functions: transform function
- output_columns: names assigned to the output columns, data type indicator (N: for numerical, C: for categorical)
The example below is the counter part of the user defined function in previous section.
# this is an example for generating new categorical feature using single
# column from the raw data
{
'input_columns': ['age'],
'process_function': _age_class,
'output_columns': [('age_class', constants.CATEGORICAL_INDICATOR)]
},
For more examples, please refer to the Appendix.
The files are located in config
:
config.yaml
: for running normal training job on AI Platform.hptuning_config.yaml
: for running hyperparameter tuning job on AI Platform.
The YAML files share some configuration parameters. In particular, runtimeVersion
and pythonVersion
should
correspond in both files. Note that both Python 2.7 and Python 3.5 are supported, but Python 3.5 is the recommended
one since Python 2.7 is deprecated soon.
trainingInput:
scaleTier: STANDARD_1 # Machine type
runtimeVersion: "1.13" # Scikit-learn version
# Note that both Python 2.7 and Python 3.5 are supported, but Python 3.5 is the
# recommended one since 2.7 is deprecated soon
pythonVersion: "3.5"
More information on supported runtime version can be found here.
You can run ML training jobs through the train.sh
Bash script.
bash scripts/train.sh [INPUT] [RUN_ENV] [RUN_TYPE] [EXTRA_TRAINER_ARGS]
- INPUT: Dataset to use for training and evaluation, which can be BigQuery table or a file (CSV).
BigQuery table should be specified as
PROJECT_ID.DATASET.TABLE_NAME
. - RUN_ENV: (Optional), whether to run
local
(on-prem) orremote
(GCP). Default value islocal
. - RUN_TYPE: (Optional), whether to run
train
orhptuning
. Default value istrain
. - EXTRA_TRAINER_ARGS: (Optional), additional arguments to pass to the trainer.
Note: Please make sure the REGION is set to a supported Cloud region for your project in train.sh
REGION=us-central1
The trained model can then be deployed to AI Platform for online serving using the deploy.sh
script.
bash scripts/deploy.sh [MODEL_NAME] [VERSION_NAME] [MODEL_DIR]
where:
- MODEL_NAME: Name of the model to be deployed.
- VERSION_NAME: Version of the model to be deployed.
- MODEL_DIR: Path to directory containing trained and exported scikit-learn model.
Note: Please make sure the following parameters are properly set in deploy.sh
REGION=us-central1
# The following two parameters should be aligned with those used during
# training job, i.e., specified in the yaml files under config/
RUN_TIME=1.13
# Note that both Python 2.7 and Python 3.5 are supported,
# but Python 3.5 is the recommended one since 2.7 is deprecated soon
PYTHON_VERSION=3.5
After the model is successfully deployed, you can send small samples of new data to the API associated with the model,
and it would return predictions in the response.
There are two helper scripts available, predict.sh
and predict.py
, which use gcloud and Python API for
requesting predictions respectively.
bash scripts/predict.sh [INPUT_DATA_FILE] [MODEL_NAME] [VERSION_NAME]
where:
- INPUT_DATA_FILE: Path to sample file contained data in line-delimited JSON format.
See
sample_data/sample_list.txt
orsample_data/sample_json.txt
for an example. More information can be found here. - MODEL_NAME: Name of the deployed model to use.
- VERSION_NAME: Version of the deployed model to use.
Note that two data formats are supported for online prediction:
- List of values:
[39,34," Private"," 9th"," Married-civ-spouse"," Other-service"," Wife"," Black"," Female"," United-States"]
- JSON:
{
"age": 39,
"hours_per_week": 34,
"workclass": " Private",
"education": " 9th",
"marital_status": " Married-civ-spouse",
"occupation": " Other-service",
"relationship": " Wife",
"race": " Black",
"sex": " Female",
"native_country": " United-States"
}
In this section, I have provided a complete example for iris dataset and demonstrate all four cases of user defined functions.
import numpy as np
def _numeric_square(num):
"""Example scalar processing function
Args:
num: (float)
Returns:
float
"""
return np.power(num, 2)
def _numeric_square_root(num):
"""Example scalar processing function
Args:
num: (float)
Returns:
float
"""
return np.sqrt(num)
def _numeric_sq_sr(num):
"""Example function that take scala and return an array
Args:
num: (float)
Returns:
numpy.array
"""
return np.array([_numeric_square(num), _numeric_square_root(num)])
def _area(args):
"""Examples function that take an array and return a scalar
Args:
args: (numpy.array), args[0] -> length, args[1] -> width
Returns:
float
"""
return args[0] * args[1]
def _area_class(args):
"""Examples function that take an array and return a scalar
Args:
args: (numpy.array), args[0] -> length, args[1] -> width
Returns:
int
"""
area = args[0] * args[1]
cl = 1 if area > 2 else 0
return cl
def _area_and_class(args):
"""Examples function that take an array and return an array
Args:
args: (numpy.array), args[0] -> length, args[1] -> width
Returns:
numpy.array
"""
area = args[0] * args[1]
cl = 1 if area > 2 else 0
return np.array([area, cl])
TRANSFORM_CONFIG = [
# this is an example for pass through features,
# i.e., those doesn't need any processing
{
'input_columns': ['sepal_length', 'sepal_width', 'petal_length',
'petal_width'],
# the raw feature types are defined in the metadata,
# no need to do it here
'process_function': None,
'output_columns': ['sepal_length', 'sepal_width', 'petal_length',
'petal_width']
},
# this is an example for generating new numerical feature using a single
# column from the raw data
{
'input_columns': ['sepal_length'],
'process_function': _numeric_square,
'output_columns': [('sepal_length_sq', constants.NUMERICAL_INDICATOR)]
# 'N' stands for numerical feature
},
# this is an example for generating new numerical feature using a single
# column from the raw data
{
'input_columns': ['sepal_width'],
'process_function': _numeric_square_root,
'output_columns': [('sepal_width_sr', constants.NUMERICAL_INDICATOR)]
},
# this is an example for generating new numerical feature using multiple
# columns from the raw data
{
'input_columns': ['petal_length', 'petal_width'],
'process_function': _area,
'output_columns': [('petal_area', constants.NUMERICAL_INDICATOR)]
},
# this is an example for generating new categorical feature using multiple
# columns from the raw data
{
'input_columns': ['petal_length', 'petal_width'],
'process_function': _area_class,
'output_columns': [('petal_area_cl', constants.CATEGORICAL_INDICATOR)]
# 'C' stands for categorical feature
},
# this is an example for generating multiple features using a single column
# from the raw data
{
'input_columns': ['petal_length'],
'process_function': _numeric_sq_sr,
'output_columns': [('petal_length_sq', constants.NUMERICAL_INDICATOR),
('petal_width_sr', constants.NUMERICAL_INDICATOR)]
},
# this is an example for generating multiple features using multiple columns
# from the raw data
{
'input_columns': ['sepal_length', 'sepal_width'],
'process_function': _area_and_class,
'output_columns': [('sepal_area', constants.NUMERICAL_INDICATOR),
('sepal_area_cl', constants.CATEGORICAL_INDICATOR)]
},
]