The purpose of this project is to create a Streams Designer application that listens to the event stream generated by the kafka-producer-for-simulated-data project and applies a Python machine learning model to score each purchase line item for the probability that it will be cancelled.
- This project has a dependency on the kafka-producer-for-simulated-data project being deployed.
- You have followed the instructions to deploy the project kafka-producer-for-simulated-data
- You have an IBM Cloud account
- Recommended learning:
In this step we run the notebook LogisticRegressionModel.ipynb. This notebook creates a very basic LogisticRegression model that classifies purchase line items by their likelihood of being cancelled.
Streams Designer requires the LogisticRegression model to be created with the same version of Python that is available in the Streams environment. To make setting up the environment easy, we use DSX. Import the notebook into DSX, change the variables to point to the COS S3 bucket where you want to upload the model, and then run all of the cells to build the model and deploy it to COS S3.
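If you just want to see the shape of what the notebook produces, the sketch below shows a minimal version of the same idea: train a basic scikit-learn LogisticRegression on the line-item features used later in the flow (UnitPrice, Quantity, CustomerID) and pickle it for upload to COS S3. The CSV file name and the 'Cancelled' label column are illustrative, not the notebook's actual variable names.

```python
# Minimal sketch of what LogisticRegressionModel.ipynb does (illustrative names).
import pickle
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

# Hypothetical training extract with the three features used by the flow
# plus a binary label indicating whether the line item was later cancelled.
df = pd.read_csv('transactions.csv')
X = df[['UnitPrice', 'Quantity', 'CustomerID']]
y = df['Cancelled']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

model = LogisticRegression()
model.fit(X_train, y_train)
print('Test accuracy:', model.score(X_test, y_test))

# Serialise the model so it can be uploaded to COS S3 and loaded by the
# Python Machine Learning node in Streams Designer.
with open('logistic_model.pkl', 'wb') as f:
    pickle.dump(model, f)
```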
In this section we walk step-by-step through setting up the Streams environment.
The overall streams flow looks like this:
- Message Hub: Events arrive into our streaming application via IBM Message Hub.
- Python Machine Learning: Each event is scored for its probability of being cancelled using a Logistic Regression machine learning model.
- Filter: Events with a probability below 65% are ignored; the rest are sent to the SMTP endpoint.
- SMTP: The SMTP node sends an email containing the event details.
- COS S3: Events are also persisted to COS S3 for an audit trail and offline analysis.
There are two ways you can deploy this flow. The first is to build the flow manually, dragging and dropping nodes onto the canvas; this process is documented below. The other is to import retail_streaming.stp as a starting point and populate the settings on each node with the service credentials from your IBM Cloud account. To do this, from your project click 'New Streams Flow' and select the 'From File' option. Drag and drop the .stp file into the form and fill in the remaining details. In the flow dashboard, click the edit button (pencil icon) at the top right, then follow the notifications to add the required parameters to each of the nodes.
From the 'Sources' node group, drag and drop the Message Hub node onto the canvas. In the settings on the right-hand side, click 'Add Connection'. If you are connecting to a Message Hub instance provisioned in your IBM Cloud account, selecting the instance will automatically populate the form with the connection credentials. Otherwise, select 'IBM Message Hub', and copy and paste the connection details from the 'Service Credentials' tab of the Message Hub instance you wish to connect to.
Select the Topic 'transactions_load' from the Topic dropdown. Click 'Edit Schema', then 'Detect Schema' to automatically generate the schema for the incoming data. Click 'Save' then 'Close'.
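For reference, each message on the 'transactions_load' topic carries the fields used later by the Python Machine Learning node. An event has roughly the following shape (the values are made up, and InvoiceDate is assumed here to be a numeric timestamp):

```python
# Illustrative event from the 'transactions_load' topic (values are made up).
sample_event = {
    'TransactionID': 536365,
    'InvoiceDate': 1512518400000,  # assumed numeric timestamp
    'CustomerID': 17850,
    'Quantity': 6,
    'UnitPrice': 2.55
}
```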
From the 'Processing and Analytics' node group, drag and drop the 'Python Machine Learning' node onto the canvas. In the settings of this node under 'File Objects', enter a File Reference Name, e.g. 'logistic_model', add a connection to your IBM COS instance, and enter the file path to the machine learning model you built in the 'Build the machine learning model' section. You can enter the path manually, or locate it in the GUI by clicking the 'Select Data Asset' button to the right of this text field.
Copy and paste the below code into the 'Code' section, selecting Python 3.5 as the language:
```python
import sys
import os
import pickle
import numpy as np
import sklearn
import pandas as pd
from collections import OrderedDict


def process(event, state):
    # Pull the fields we need from the incoming event
    transaction_id = event['TransactionID']
    tx_datetime = event['InvoiceDate']
    customer_id = event['CustomerID']
    quantity = event['Quantity']
    price = event['UnitPrice']

    # Score the event with the model loaded by load_logistic_model()
    data = np.array([[price, quantity, customer_id]])
    predict_prob = state['logistic_model'].predict_proba(data)
    prob_cancelled = predict_prob[0][1]

    prediction = OrderedDict([
        ('transaction_id', transaction_id),
        ('invoice_date', tx_datetime),
        ('prob_cancelled', prob_cancelled)
    ])
    return prediction


def load_logistic_model(state, path_logistic_model):
    # Load the pickled LogisticRegression model referenced by 'logistic_model'
    state['logistic_model'] = pickle.load(open(path_logistic_model, 'rb'))
```
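If you want to sanity-check these two functions before pasting them into the node, you can run them locally against the pickled model from the notebook. This assumes a local copy of the model saved as 'logistic_model.pkl' and an event shaped like the sample shown earlier:

```python
# Local sanity check (run alongside the process/load_logistic_model definitions above).
state = {}
load_logistic_model(state, 'logistic_model.pkl')  # assumed local copy of the model

event = {
    'TransactionID': 536365,
    'InvoiceDate': 1512518400000,
    'CustomerID': 17850,
    'Quantity': 6,
    'UnitPrice': 2.55
}
print(process(event, state))
# -> OrderedDict([('transaction_id', 536365), ('invoice_date', 1512518400000),
#                 ('prob_cancelled', ...)])
```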
Still in the Python Machine Learning node settings, click 'Edit Output Schema'. Enter three attributes, as below:
Attribute name | Type |
---|---|
transaction_id | Number |
invoice_date | Number |
prob_cancelled | Number |
Click Save, then Close.
Finally, connect the two nodes by drawing a line from one to the other on the canvas.
From the 'Processing and Analytics' node group, drag and drop the 'Filter' node onto the canvas. In the settings of the node, paste the below code into the condition expression:
prob_cancelled >= 0.65
You can learn more about filter expressions in [the documentation](https://dataplatform.ibm.com/docs/content/streaming-pipelines/filter.html).
Connect the filter node to the Python Machine Learning node.
NB: E-mail alerts require credentials to an SMTP server.
From the 'Alerts' node group, drag and drop the 'E-mail' node onto the canvas. In the settings of the node, enter the details of your SMTP server (this could be via the SendGrid service on IBM Cloud, or your own SMTP server.)
Example Subject:
ACTION REQUIRED : Risky Transaction Identified (ID: {{transaction_id}})
Example Body:
Hello,
The Risky Transaction Service has identified a risky transaction, details below:
Transaction ID : {{transaction_id}}
Invoice Date : {{invoice_date}}
Please action ASAP.
DO NOT REPLY TO THIS EMAIL
Connect the E-mail node to the filter node.
Navigate to your IBM Cloud Dashboard, and under Services click on your Cloud Object Store. Create two new buckets, `streaming-data-output` and `streaming-risky-transactions`.
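If you prefer to create the buckets from code rather than in the dashboard, a sketch using the ibm-cos-sdk (ibm_boto3) could look like the following; the endpoint URL and credential values are placeholders taken from your COS service credentials:

```python
import ibm_boto3
from ibm_botocore.client import Config

# Placeholders: take these values from your COS service credentials.
cos = ibm_boto3.client(
    's3',
    ibm_api_key_id='<COS API key>',
    ibm_service_instance_id='<COS resource instance id>',
    config=Config(signature_version='oauth'),
    endpoint_url='<COS endpoint URL for your region>'
)

for bucket in ('streaming-data-output', 'streaming-risky-transactions'):
    cos.create_bucket(Bucket=bucket)
```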
From the 'Targets' node group, drag and drop two 'Cloud Object Store' nodes onto the canvas. For each node, add the connection details for your COS instance and a file path referencing the two buckets that you just created (the `streaming-data-output` bucket for the Python Machine Learning output, and `streaming-risky-transactions` for the filtered output). Set the format and the file creation variables (e.g. CSV, 10k events per file). Connect the COS nodes to the respective nodes in the flow.
Deploy the flow by clicking the 'Save and Run the Streams Flow' button in the toolbar (▶). If you are prompted to start the Streaming Analytics service, click yes. You will now see the flow deploy and run, similar to the gif [above](https://github.com/ibm-cloud-streaming-retail-demo/wdp-streams-designer-ml-alert#deploy-the-streams-flow). You can see the rate of incoming data in the available graphs. To view a sample of incoming data at any point during the flow, click the pipe between two nodes.
The kafka-producer-for-simulated-data app allows you to generate a risky transaction to demo the machine learning model being triggered. To generate a risky transaction, open a new browser tab and go to http://your-app-url.mybluemix.net/simulate_risky_transaction. You will see the details of the risky transaction in the new tab. Switch back to the streaming flow, and you will see the transaction flow through (hover over the filter or e-mail node to see the total throughput).
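If you would rather trigger it from a script than the browser, a minimal sketch using the requests library (substituting your own application URL) might look like this:

```python
import requests

# Trigger a simulated risky transaction and print the producer's response.
url = 'http://your-app-url.mybluemix.net/simulate_risky_transaction'
response = requests.get(url)
print(response.status_code)
print(response.text)
```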