DataStream API
In this session, you will learn how to implement a stateful stream processing job with Flink's DataStream API.
We mix a lecture-style presentation with interactive coding sessions in which you will extend a basic DataStream job with additional functionality. The initial code is provided as a Maven project and is available in the fraud-detection-job folder of the workshop material. The material is based on the DataStream API code walkthrough in the Flink documentation.
You should have prepared the tutorial environment at this point.
If you haven't done this yet, please do it now by following the preparation instructions.
The presentation slides are available online.
In this session, we work with a DataStream job that aims to detect fraudulent credit card transactions. The job is mainly implemented in two Java classes, FraudDetectionJob and FraudDetector. Both classes are briefly discussed below.
The FraudDetectionJob class defines the data flow of the fraud detection job.
public class FraudDetectionJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // source: generated credit card transactions
        DataStream<Transaction> transactions = env
            .addSource(new TransactionSource())
            .name("transactions");

        // partition by account id and apply the fraud detection logic
        DataStream<Alert> alerts = transactions
            .keyBy(Transaction::getAccountId)
            .process(new FraudDetector())
            .name("fraud-detector");

        // sink: print alerts to standard output
        alerts
            .addSink(new AlertStdOutSink())
            .name("write-alerts");

        env.execute("Fraud Detection");
    }
}
The job starts with a data source that provides credit card transaction events. Usually, a data source would ingest the events from some kind of external system, such as Apache Kafka, Kinesis, or Apache Pulsar. Our demo job uses a data generator to simplify the setup.
The DataStream<Transaction> emitted by the data source is partitioned on the account id of the transaction with a keyBy() transformation. This ensures that all transactions of the same account are processed by the same parallel instance of the subsequent function. This function is applied with the process() method and provided by the FraudDetector class. The FraudDetector function transforms the DataStream<Transaction> into a DataStream<Alert> that contains all alerts generated for the stream of transaction events.
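The routing guarantee of keyBy() can be illustrated without Flink. The following is a minimal plain-Java sketch, not Flink's implementation: the class KeyPartitioningSketch and the method instanceFor() are hypothetical names, and the simple modulo is an assumption made for illustration. Flink itself assigns keys to parallel instances via hashed key groups, but the property that matters here is the same: a given key always maps to the same instance.

```java
/** Hypothetical illustration of key-based routing; not part of Flink. */
final class KeyPartitioningSketch {

    /** Picks a parallel instance for a key. The same key always yields the same index. */
    static int instanceFor(long accountId, int parallelism) {
        // floorMod keeps the result in [0, parallelism) even for negative hash codes
        return Math.floorMod(Long.hashCode(accountId), parallelism);
    }

    public static void main(String[] args) {
        long[] accountIds = {1L, 2L, 3L, 1L, 2L, 1L};
        int parallelism = 2;
        for (long id : accountIds) {
            System.out.println("account " + id + " -> instance " + instanceFor(id, parallelism));
        }
    }
}
```

Because every transaction of an account reaches the same instance, that instance can keep per-account information locally, which the later exercises rely on.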
The DataStream<Alert> is finally emitted via the AlertStdOutSink, which simply prints the alerts to standard output. Usually, the alerts would be emitted to an external system such as Apache Kafka, Kinesis, or Apache Pulsar.
Finally, the execution of the job is triggered by the execute() call.
The FraudDetector class implements the actual business logic to generate alerts from transaction events.
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;

    private static final double SMALL_AMOUNT = 1.00;
    private static final double LARGE_AMOUNT = 500.00;
    private static final long ONE_MINUTE = 60 * 1000;

    @Override
    public void processElement(
            Transaction transaction,
            Context context,
            Collector<Alert> collector) throws Exception {

        // emit one alert per incoming transaction
        Alert alert = new Alert();
        alert.setId(transaction.getAccountId());

        collector.collect(alert);
    }
}
The function extends KeyedProcessFunction and must be applied on a KeyedStream, which is created with the keyBy() transformation. For each transaction event, the processElement() method of the function is called. All transactions with the same key are processed by the same instance of the FraudDetector class.
The logic of the FraudDetector function is very simple: it generates one alert for each incoming transaction. In the course of this session, we will replace it with more meaningful business logic.
Improve the FraudDetector function to emit an Alert for every transaction of more than $500 that follows a transaction of less than $1. No other transaction of the same account should have happened between the small and the large transaction.
In order to implement the logic, the FraudDetector function needs to "remember" whether the last transaction of an account was a small transaction (less than $1) or not. Hence, the function requires state.
Flink provides keyed state to store and access state per key. Keyed state is also checkpointed and restored in case of a failure. For our use case, we can use Flink's ValueState. ValueState<X> is a simple wrapper that stores a single value of an arbitrary data type X. It provides the following methods:
- X value() gets the value of the state, or null if the state is not set
- void update(X) sets the value of the state
- void clear() clears the state
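To make the "per key" scoping of these three methods concrete, here is a small in-memory stand-in written in plain Java. It is only a sketch under stated assumptions: KeyedValueStateSketch and setCurrentKey() are hypothetical names, not Flink classes or methods, and nothing here is checkpointed. In a Flink job the current key is set by the runtime before each processElement() call; in the sketch it is set by hand.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for keyed ValueState; not a Flink class. */
final class KeyedValueStateSketch<K, X> {

    private final Map<K, X> valuesByKey = new HashMap<>();
    private K currentKey;

    /** Selects the key whose value the following calls read and write. */
    void setCurrentKey(K key) {
        currentKey = key;
    }

    /** Returns the value stored for the current key, or null if none is set. */
    X value() {
        return valuesByKey.get(currentKey);
    }

    /** Stores a value for the current key only. */
    void update(X newValue) {
        valuesByKey.put(currentKey, newValue);
    }

    /** Removes the value of the current key; other keys are unaffected. */
    void clear() {
        valuesByKey.remove(currentKey);
    }
}
```

The point of the sketch is that a single state handle behaves like a separate variable for every key, so the function code never has to manage a map of accounts itself.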
Before you can use keyed state, you need to register it with Flink. This is done in the open() method by passing a state descriptor to the RuntimeContext:
public class MyFunc extends KeyedProcessFunction<Long, X, Y> {

    // handle to the keyed state; scoped to the key of the current element
    private ValueState<X> lastVal;

    // open() is called before the first event is processed
    @Override
    public void open(Configuration conf) {
        lastVal = getRuntimeContext().getState(new ValueStateDescriptor<>("lastVal", X.class));
    }

    @Override
    public void processElement(X val, Context ctx, Collector<Y> out) throws Exception {
        // value() returns null if no value was stored for the current key yet
        X lastValue = lastVal.value();
        // do something
        lastVal.update(val);
    }
}
To implement the requested logic, we need a simple boolean flag (ValueState<Boolean>).
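The decision logic behind the flag can be tried out in plain Java before wiring it into Flink. The sketch below is an illustration, not the workshop's reference solution: FraudFlagLogicSketch and process() are hypothetical names, and a HashMap keyed by account id stands in for the ValueState<Boolean> that a real job would use. The thresholds mirror the SMALL_AMOUNT and LARGE_AMOUNT constants of the FraudDetector class.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical plain-Java simulation of the flag logic; not Flink code. */
final class FraudFlagLogicSketch {

    static final double SMALL_AMOUNT = 1.00;
    static final double LARGE_AMOUNT = 500.00;

    // stands in for ValueState<Boolean>: an entry exists only if the last transaction was small
    private final Map<Long, Boolean> lastWasSmallByAccount = new HashMap<>();

    /** Processes one transaction and returns true if it should raise an alert. */
    boolean process(long accountId, double amount) {
        // read the flag and clear it in one step (value() followed by clear() in Flink terms)
        boolean lastWasSmall = lastWasSmallByAccount.remove(accountId) != null;

        boolean alert = lastWasSmall && amount > LARGE_AMOUNT;

        if (amount < SMALL_AMOUNT) {
            // remember the small transaction for the next event of this account (update(true))
            lastWasSmallByAccount.put(accountId, Boolean.TRUE);
        }
        return alert;
    }
}
```

Clearing the flag on every transaction is what enforces the "no other transaction in between" condition: any event, large or not, consumes the flag set by the preceding small transaction.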
Improve the FraudDetector function to emit an Alert for every transaction of more than $500 that follows a transaction of less than $1 that happened at most one minute earlier. No other transaction of the same account should have happened between the small and the large transaction.
In order to implement the requested logic, we need a way to access information about time. A ProcessFunction provides a TimerService via its context objects. Using the TimerService, you can
- retrieve the current event-time and processing-time
- set a timer for some future event-time or processing-time
- delete a timer before it triggers
When a timer triggers, it invokes the onTimer() callback method. To implement the logic that is executed when a timer fires, you need to override this method.
The following code snippet shows how to use timers.
public class MyFunc extends KeyedProcessFunction<Long, X, Y> {

    @Override
    public void processElement(X val, Context ctx, Collector<Y> out) {
        // register a processing-time timer ten minutes from now
        long curTime = ctx.timerService().currentProcessingTime();
        long inTenMins = curTime + (10 * 60 * 1000);
        ctx.timerService().registerProcessingTimeTimer(inTenMins);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Y> out) {
        // do something 10 mins after timer was set
    }
}
Note: In order to be able to delete a timer, you need to remember the time for which the timer was registered. This should also be done via keyed state.
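The interplay of the flag, the timer, and the remembered timer timestamp can again be simulated in plain Java. This is a sketch under assumptions, not the reference solution: TimedFraudLogicSketch, process(), and advanceTo() are hypothetical names, time is passed in explicitly instead of being read from a TimerService, and a map from account id to deadline stands in for both the flag state and the timer-timestamp state. The sketch treats a large transaction exactly one minute after the small one as still in time.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical plain-Java simulation of the timed flag logic; not Flink code. */
final class TimedFraudLogicSketch {

    static final double SMALL_AMOUNT = 1.00;
    static final double LARGE_AMOUNT = 500.00;
    static final long ONE_MINUTE = 60 * 1000;

    // account id -> deadline of the pending "last transaction was small" flag
    private final Map<Long, Long> deadlineByAccount = new HashMap<>();

    /** Drops every flag whose deadline has passed; stands in for the onTimer() callback. */
    void advanceTo(long nowMillis) {
        deadlineByAccount.values().removeIf(deadline -> deadline < nowMillis);
    }

    /** Processes one transaction at the given time and returns true if it should raise an alert. */
    boolean process(long accountId, double amount, long nowMillis) {
        advanceTo(nowMillis);

        // removing the entry both clears the flag and "deletes" the pending timer,
        // which is only possible because the deadline was remembered per account
        boolean lastWasSmall = deadlineByAccount.remove(accountId) != null;

        boolean alert = lastWasSmall && amount > LARGE_AMOUNT;

        if (amount < SMALL_AMOUNT) {
            // set the flag and "register a timer" one minute from now
            deadlineByAccount.put(accountId, nowMillis + ONE_MINUTE);
        }
        return alert;
    }
}
```

In a Flink implementation, the map entry would typically be split into two pieces of keyed state (the boolean flag and the timer timestamp), with the expiry performed in onTimer() rather than by an explicit advanceTo() call.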
Apache Flink, Flink®, Apache®, the squirrel logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation.