DataStream API

Fabian Hueske edited this page Oct 21, 2019 · 5 revisions

Implementing a DataStream Job

In this session, you will learn how to implement a stateful stream processing job with Flink's DataStream API.

We mix a lecture-style presentation with interactive coding sessions in which you will extend a basic DataStream job with additional functionality. The initial code is provided as a Maven project and available in the fraud-detection-job folder of the workshop material. The material is based on the DataStream API code walkthrough of the Flink documentation.


You should have prepared the tutorial environment at this point.
If you haven't done this yet, please do it now by following the preparation instructions.


The presentation slides are available online.

Original Fraud Detection Job

In this session, we work with a DataStream job that aims to detect fraudulent credit card transactions. The job is mainly implemented in two Java classes, FraudDetectionJob and FraudDetector. Both classes are briefly discussed below.

FraudDetectionJob

The FraudDetectionJob class defines the data flow of the fraud detection job.

public class FraudDetectionJob {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Transaction> transactions = env
      .addSource(new TransactionSource())
      .name("transactions");

    DataStream<Alert> alerts = transactions
      .keyBy(Transaction::getAccountId)
      .process(new FraudDetector())
      .name("fraud-detector");

    alerts
      .addSink(new AlertStdOutSink())
      .name("write-alerts");

    env.execute("Fraud Detection");
  }
}

The job starts with a data source that provides credit card transaction events. Usually, a data source would ingest the events from some kind of external system, such as Apache Kafka, Kinesis, or Apache Pulsar. Our demo job uses a data generator to simplify the setup.

The DataStream<Transaction> that is emitted by the data source is partitioned on the account id of the transaction with a keyBy() transformation. This ensures that all transactions of the same account are processed by the same parallel instance of the subsequent function. That function is applied with the process() method and implemented by the FraudDetector class. The FraudDetector function transforms the DataStream<Transaction> into a DataStream<Alert> that contains all alerts that are generated for the stream of transaction events.
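To make the partitioning guarantee concrete, the following plain-Java sketch routes account ids to parallel instances. It is only an illustration: the class and the instanceFor helper are hypothetical, and the hash-modulo rule is a simplification (Flink internally assigns keys to key groups using its own hashing). What it shows is that the target instance depends only on the key, so equal keys always end up at the same instance.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified illustration of key-based partitioning; NOT Flink's actual routing code.
class KeyPartitioningSketch {

  // Hypothetical helper: picks a parallel instance (0 .. parallelism-1) for an account id.
  // The result is a pure function of the key, which is what keyBy() relies on.
  static int instanceFor(long accountId, int parallelism) {
    return Math.floorMod(Long.hashCode(accountId), parallelism);
  }

  public static void main(String[] args) {
    long[] accountIds = {1L, 2L, 1L, 3L, 2L, 1L};
    int parallelism = 2;

    // Collect which account ids each simulated instance receives.
    Map<Integer, List<Long>> received = new HashMap<>();
    for (long id : accountIds) {
      received
          .computeIfAbsent(instanceFor(id, parallelism), k -> new ArrayList<>())
          .add(id);
    }
    System.out.println(received);
  }
}
```

Because every transaction of an account reaches the same instance, that instance can keep per-account information locally, which the later sections rely on.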

The DataStream<Alert> is then emitted via the AlertStdOutSink, which simply prints the alerts to standard output. Usually, the alerts would be emitted to an external system such as Apache Kafka, Kinesis, or Apache Pulsar.

Finally, the execute() call triggers the execution of the job.

FraudDetector

The FraudDetector class implements the actual business logic to generate alerts from transaction events.

public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

  private static final long serialVersionUID = 1L;

  private static final double SMALL_AMOUNT = 1.00;
  private static final double LARGE_AMOUNT = 500.00;
  private static final long ONE_MINUTE = 60 * 1000;

  @Override
  public void processElement(
      Transaction transaction,
      Context context,
      Collector<Alert> collector) throws Exception {

    Alert alert = new Alert();
    alert.setId(transaction.getAccountId());

    collector.collect(alert);
  }
}

The function extends KeyedProcessFunction and must be applied to a KeyedStream, which is created with the keyBy() transformation. For each transaction event, the processElement() method of the function is called. All transactions with the same key are processed by the same instance of the FraudDetector class.

The logic of the FraudDetector function is very simple. It generates one alert for each incoming transaction. In the course of this session, we will improve the logic of the function and implement more meaningful business logic.

Adding State

Goal

Improve the FraudDetector function to emit an Alert for every transaction of more than $500 that follows a transaction of less than $1. No other transaction should have happened between the large and the small transaction.

Approach

In order to implement the logic, the FraudDetector function needs to "remember" whether the last transaction of an account was a small transaction (less than $1) or not. Hence, the function requires state.

Flink provides keyed state to store and access state per key. Keyed state is also checkpointed and restored in case of a failure. For our use case, we can use Flink's ValueState. ValueState<X> is a simple wrapper and stores a single value of an arbitrary data type X. It provides the following methods:

  • X value() gets the value of the state or null if state is not set
  • void update(X) sets the value of the state
  • void clear() clears the state

Before you can use keyed state, you need to register it with Flink's environment. This is done with a state descriptor via the RuntimeContext:

public class MyFunc extends KeyedProcessFunction<Long, X, Y> {

  private ValueState<X> lastVal;

  // open() is called before the first event is processed
  @Override
  public void open(Configuration conf) {
    lastVal = getRuntimeContext().getState(new ValueStateDescriptor<>("lastVal", X.class));
  }


  @Override
  public void processElement(X val, Context ctx, Collector<Y> out) throws Exception {
    // value() returns null until update() has been called for the current key
    X lastValue = lastVal.value();
    // do something
    lastVal.update(val);
  }
}

To implement the requested logic, we need a simple boolean flag (ValueState<Boolean>).
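As a rough sketch of this flag logic, the following plain-Java simulation keeps one Boolean per account id in a HashMap, which stands in for keyed ValueState<Boolean>. The class, the process method, and its boolean return value are assumptions made for illustration; they are not part of the workshop code or of Flink's API. The comments indicate which ValueState call each map operation corresponds to.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation of the flag logic; the map stands in for keyed ValueState<Boolean>.
class FlagLogicSketch {

  static final double SMALL_AMOUNT = 1.00;
  static final double LARGE_AMOUNT = 500.00;

  // One entry per account id; a missing entry corresponds to value() returning null.
  private final Map<Long, Boolean> flagByAccount = new HashMap<>();

  // Hypothetical method: returns true if this transaction should raise an alert.
  boolean process(long accountId, double amount) {
    boolean alert = false;

    Boolean lastWasSmall = flagByAccount.get(accountId);   // like flagState.value()
    if (lastWasSmall != null) {
      // The previous transaction of this account was small.
      alert = amount > LARGE_AMOUNT;
      flagByAccount.remove(accountId);                     // like flagState.clear()
    }

    if (amount < SMALL_AMOUNT) {
      // Remember the small transaction for the next event of this account.
      flagByAccount.put(accountId, Boolean.TRUE);          // like flagState.update(true)
    }
    return alert;
  }

  public static void main(String[] args) {
    FlagLogicSketch detector = new FlagLogicSketch();
    System.out.println(detector.process(1L, 0.50));    // small transaction, sets the flag
    System.out.println(detector.process(2L, 600.00));  // other account, flag not set
    System.out.println(detector.process(1L, 600.00));  // large right after small
  }
}
```

In the actual FraudDetector, the same steps would run inside processElement(), with the alert emitted through the Collector instead of being returned.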

Adding a Timer

Goal

Improve the FraudDetector function to emit an Alert for every transaction of more than $500 that follows a transaction of less than $1 that happened at most one minute earlier. No other transaction should have happened between the large and small transaction.

Approach

In order to implement the requested logic, we need a way to access information about time. A ProcessFunction provides a TimerService via its context objects. Using the TimerService, you can:

  • retrieve the current event-time and processing-time
  • set a timer for some future event-time or processing-time
  • delete a timer before it triggers

When a timer triggers, it invokes the onTimer() callback method. To implement the logic that is executed when a timer fires, you need to override this method.

The following code snippet shows how to use timers.

public class MyFunc extends KeyedProcessFunction<Long, X, Y> {

  @Override
  public void processElement(X val, Context ctx, Collector<Y> out) {
    // read the current processing time and register a timer ten minutes ahead
    long curTime = ctx.timerService().currentProcessingTime();
    long inTenMins = curTime + (10 * 60 * 1000);
    ctx.timerService().registerProcessingTimeTimer(inTenMins);
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Y> out) { 
    // do something 10 mins after timer was set
  }
}

Note: In order to be able to delete a timer, you need to remember the time for which the timer was registered. This should also be done via keyed state.
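The interplay of flag, timer, and remembered timer timestamp can be sketched in plain Java as follows. This is a simulation under stated assumptions, not Flink code: the class and method names are hypothetical, time is passed in explicitly as the now parameter instead of being read from a TimerService, and a HashMap holding the pending timer timestamp per account stands in for keyed state. An entry in the map means "the last transaction was small and its timer has not fired yet".

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation of the timer logic; NOT Flink's TimerService or state API.
class TimerLogicSketch {

  static final double SMALL_AMOUNT = 1.00;
  static final double LARGE_AMOUNT = 500.00;
  static final long ONE_MINUTE = 60 * 1000;

  // account id -> timestamp of the pending timer (stands in for keyed state)
  private final Map<Long, Long> timerByAccount = new HashMap<>();

  // Simulates onTimer(): every timer whose timestamp has been reached fires
  // and clears the remembered small transaction of its account.
  void advanceTimeTo(long now) {
    timerByAccount.values().removeIf(timestamp -> timestamp <= now);
  }

  // Hypothetical method: returns true if this transaction should raise an alert.
  boolean process(long accountId, double amount, long now) {
    advanceTimeTo(now);

    boolean alert = false;
    // Removing the entry models both clearing the flag and deleting the timer,
    // which is possible only because the timer timestamp was remembered.
    Long pendingTimer = timerByAccount.remove(accountId);
    if (pendingTimer != null) {
      alert = amount > LARGE_AMOUNT;
    }

    if (amount < SMALL_AMOUNT) {
      // Remember the small transaction together with the time its timer fires.
      timerByAccount.put(accountId, now + ONE_MINUTE);
    }
    return alert;
  }

  public static void main(String[] args) {
    TimerLogicSketch detector = new TimerLogicSketch();
    System.out.println(detector.process(1L, 0.50, 0L));         // small, timer set
    System.out.println(detector.process(1L, 600.00, 30_000L));  // large within a minute
    System.out.println(detector.process(1L, 0.50, 100_000L));   // small, timer set
    System.out.println(detector.process(1L, 600.00, 200_000L)); // large, timer already fired
  }
}
```

In a real KeyedProcessFunction, the timer would be registered and deleted through the TimerService, and the clean-up in advanceTimeTo() would live in onTimer().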
