wdp-streams-designer-ml-alert: Streams Designer demo project using the Python Machine Learning operator and the E-mail operator
Introduction

The purpose of this project is to create a Streams Designer application that listens to the event stream generated by the kafka-producer-for-simulated-data project and applies a Python machine learning model to score each purchase line item for the probability that it will be cancelled.

Dependencies

Prerequisites

Deploy

Build the machine learning model

In this step we run the notebook LogisticRegressionModel.ipynb. This notebook creates a very basic LogisticRegression model that classifies purchase line items by their likelihood of being cancelled.

Streams Designer requires the LogisticRegression model to be created with the same version of Python that is available in the Streams environment. To make setting up the environment easy, we use DSX. Import the notebook into DSX, change the variables to point to the COS S3 bucket where you want to upload the model, and then run all of the cells to build the model and deploy it to COS S3.
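For reference, the model-building step boils down to something like the sketch below, assuming a hypothetical transactions.csv with UnitPrice, Quantity, CustomerID columns and a 0/1 Cancelled label (the notebook's actual data preparation and column names may differ):

# Minimal sketch of the model-building step. The input file and column
# names are hypothetical; see LogisticRegressionModel.ipynb for the real
# data preparation.
import pickle
import pandas as pd
from sklearn.linear_model import LogisticRegression

df = pd.read_csv('transactions.csv')

# Feature order must match what the Streams Python operator passes at
# scoring time: [UnitPrice, Quantity, CustomerID].
X = df[['UnitPrice', 'Quantity', 'CustomerID']].values
y = df['Cancelled'].values

model = LogisticRegression()
model.fit(X, y)

# Serialise the model so it can be uploaded to the COS S3 bucket that
# the Streams flow will reference.
with open('logistic_model.pkl', 'wb') as f:
    pickle.dump(model, f)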

Deploy the Streams flow

In this section we walk step-by-step through setting up the Streams environment.

The overall streams flow looks like this:

Streams Designer Flow
  • Message Hub: Events arrive into the streaming application via IBM Message Hub.
  • Python Machine Learning: Each event is scored for its probability of being cancelled using a Logistic Regression machine learning model.
  • Filter: Events with a probability of cancellation below 65% are ignored; the rest are passed on to the SMTP endpoint.
  • SMTP: Sends an email containing the event details.
  • COS S3: Events are also persisted to COS S3 for an audit trail and offline analysis.

There are two ways to deploy this flow. The first is to build the flow manually, dragging and dropping nodes onto the canvas; this process is documented below. The other is to import retail_streaming.stp as a starting point and populate the settings on each node with the service credentials from your IBM Cloud account. To do this, from your project click 'New Streams Flow' and select the 'From File' option. Drag and drop the .stp file into the form and populate the remaining details. In the flow dashboard, click the edit button (pencil icon) at the top right, and then follow the notifications to add the required parameters to each of the nodes.

Message Hub source setup

From the 'Sources' node group, drag and drop the Message Hub node onto the canvas. In the settings on the right hand side, click 'Add Connection'. If you are connecting to a Message Hub instance provisioned in your IBM Cloud account, selecting the instance will automatically populate the form with the connection credentials. Otherwise, select 'IBM Message Hub' and copy and paste the connection details from the 'Service Credentials' tab of the Message Hub instance you wish to connect to.

Select the Topic 'transactions_load' from the Topic dropdown. Click 'Edit Schema', then 'Detect Schema' to automatically generate the schema for the incoming data. Click 'Save' then 'Close'.
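The downstream Python Machine Learning node reads five fields from each event, so the detected schema should include at least these. A hypothetical example event is shown below; the values and the epoch-milliseconds timestamp encoding are assumptions, and the real messages from kafka-producer-for-simulated-data may carry additional fields.

# Hypothetical example of one event from the 'transactions_load' topic.
# Values and the timestamp encoding are illustrative only.
event = {
    'TransactionID': 536365,
    'InvoiceDate': 1512057600000,   # assumed epoch milliseconds
    'CustomerID': 17850,
    'Quantity': 6,
    'UnitPrice': 2.55
}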

Python Machine Learning setup

From the 'Processing and Analytics' node group, drag and drop the 'Python Machine Learning' node onto the canvas. In the settings of this node, under 'File Objects', enter a File Reference Name, e.g. 'logistic_model', add a connection to your IBM COS instance, and enter the file path to the machine learning model you built in the 'Build the machine learning model' section. You can enter the path manually, or locate it in the GUI by clicking the 'Select Data Asset' button to the right of this text field.

Copy and paste the code below into the 'Code' section, selecting Python 3.5 as the language:

import sys
import os
import pickle
import numpy as np
import sklearn
import pandas as pd
from collections import OrderedDict


def process(event, state):
    # Pull the fields we need from the incoming event.
    transaction_id = event['TransactionID']
    tx_datetime = event['InvoiceDate']

    customer_id = event['CustomerID']
    quantity = event['Quantity']
    price = event['UnitPrice']

    # Feature order must match the order used when the model was trained.
    data = np.array([[price, quantity, customer_id]])

    # predict_proba returns one row of class probabilities;
    # index 1 is taken as the probability of cancellation.
    predict_prob = state['logistic_model'].predict_proba(data)
    prob_cancelled = predict_prob[0][1]

    prediction = OrderedDict([
        ('transaction_id', transaction_id),
        ('invoice_date',   tx_datetime),
        ('prob_cancelled', prob_cancelled)
    ])

    return prediction


def load_logistic_model(state, path_logistic_model):
    # Load the pickled LogisticRegression model from the given path and
    # keep it in the shared state dict under the file reference name.
    state['logistic_model'] = pickle.load(open(path_logistic_model, 'rb'))
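
If you want to sanity-check this code outside of Streams Designer, a rough local harness is sketched below. It assumes the runtime's behaviour of calling load_logistic_model once to populate state and then process once per event; 'logistic_model.pkl' and the event values are placeholders.

# Hypothetical local smoke test for the operator code above.
state = {}
load_logistic_model(state, 'logistic_model.pkl')

sample_event = {
    'TransactionID': 536365,
    'InvoiceDate': 1512057600000,
    'CustomerID': 17850,
    'Quantity': 6,
    'UnitPrice': 2.55
}

# Prints an OrderedDict with transaction_id, invoice_date and prob_cancelled.
print(process(sample_event, state))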

Still in the Python Machine Learning node settings, click 'Edit Output Schema'. Enter three attributes, as below:

Attribute name Type
transaction_id Number
invoice_date Number
prob_cancelled Number

Click Save, then Close.

Finally, connect the two nodes by drawing a line from one to the other on the canvas.

Connecting nodes on the canvas

Filter setup

From the 'Processing and Analytics' node group, drag and drop the 'Filter' node onto the canvas. In the settings of the node, paste the condition below into the condition expression field:

prob_cancelled >= 0.65

You can learn more about filter expressions in the documentation: https://dataplatform.ibm.com/docs/content/streaming-pipelines/filter.html

Connect the filter node to the Python Machine Learning node.

SMTP setup

NB: E-mail alerts require credentials for an SMTP server.

From the 'Alerts' node group, drag and drop the 'E-mail' node onto the canvas. In the settings of the node, enter the details of your SMTP server (this could be the SendGrid service on IBM Cloud, or your own SMTP server).

Example Subject:
ACTION REQUIRED : Risky Transaction Identified (ID: {{transaction_id}})

Example Body:
Hello,
The Risky Transaction Service has identified a risky transaction; details below:
Transaction ID : {{transaction_id}}
Invoice Date : {{invoice_date}}
Please action ASAP.

DO NOT REPLY TO THIS EMAIL

Connect the E-mail node to the filter node.

COS S3 setup

Bucket setup

Navigate to your IBM Cloud Dashboard, and under Services click on your Cloud Object Store. Create two new buckets, streaming-data-output and streaming-risky-transactions.

Streaming Flows node setup

From the 'Targets' node group, drag and drop two 'Cloud Object Store' nodes onto the canvas. For each node, add the connection details for your COS instance and a file path referencing the two buckets you just created (the streaming-data-output bucket for the Python Machine Learning output, and streaming-risky-transactions for the filtered output). Set the format and the file creation variables (e.g. CSV, 10k events per file). Connect the COS nodes to the respective nodes in the flow.
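
Once files start landing in the buckets, you can pull them back out for the offline analysis mentioned earlier. Below is a minimal sketch using boto3 and pandas, assuming HMAC credentials are enabled on the COS instance and the node was configured to write CSV; the endpoint, keys and object key are placeholders.

# Hypothetical offline-analysis sketch: list the objects written by the COS
# target node and load one CSV file into pandas. Endpoint, credentials and
# the object key are placeholders for your own COS instance.
import io

import boto3
import pandas as pd

cos = boto3.client(
    's3',
    endpoint_url='https://s3.us-south.cloud-object-storage.appdomain.cloud',
    aws_access_key_id='YOUR_HMAC_ACCESS_KEY',
    aws_secret_access_key='YOUR_HMAC_SECRET_KEY'
)

# List everything the flow has persisted so far.
for obj in cos.list_objects_v2(Bucket='streaming-data-output').get('Contents', []):
    print(obj['Key'], obj['Size'])

# Load one of the CSV files for analysis.
raw = cos.get_object(Bucket='streaming-data-output', Key='some-output-file.csv')['Body'].read()
df = pd.read_csv(io.BytesIO(raw))
print(df.head())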

Deploying and Monitoring the Streams flow

Deploy the flow by clicking the 'Save and Run the Streams Flow' button in the toolbar (▶). If you are prompted to start the Streaming Analytics service, click yes. You will now see the flow deploy and run, similar to the gif in the 'Deploy the Streams flow' section above (https://github.com/ibm-cloud-streaming-retail-demo/wdp-streams-designer-ml-alert#deploy-the-streams-flow). You can see the rate of incoming data in the available graphs. To view a sample of the data at any point in the flow, click the pipe between two nodes.

Simulating a risky transaction

The kafka-producer-for-simulated-data project allows you to generate a risky transaction to demo the machine learning model being triggered. To generate a risky transaction, open a new browser tab and go to http://your-app-url.mybluemix.net/simulate_risky_transaction. You will see the details of the risky transaction in the new tab. Switch back to the streaming flow and you will see the transaction flow through (hover over the filter or e-mail node to see the total throughput).
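
The same endpoint can also be hit from a script rather than the browser, for example (your-app-url is the placeholder hostname of your deployed producer app):

# Trigger a simulated risky transaction from Python instead of the browser.
# Replace your-app-url with the hostname of your producer app.
import requests

resp = requests.get('http://your-app-url.mybluemix.net/simulate_risky_transaction')
print(resp.status_code)
print(resp.text)  # details of the generated risky transaction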
