Leroux Romain edited this page Apr 16, 2019 · 31 revisions

Introduction

About Streamroot.

Problem at hand / Data pipeline WHY

Raw data -> group by something (stream, isp, ...) -> aggregate over 5-minute windows
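To make the aggregation step concrete, here is a minimal plain-Java sketch of grouping by stream and summing over 5-minute tumbling windows. The RawEvent class, its fields, and the "stream@windowStart" key format are hypothetical and only illustrate the semantics (timestamps are assumed non-negative). In the real pipeline this work is done by Flink operators rather than by a loop like this one.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical raw event: the field names are illustrative, not taken from the actual pipeline.
class RawEvent {
    final String stream;
    final long timestampMs;
    final long bytes;

    RawEvent(String stream, long timestampMs, long bytes) {
        this.stream = stream;
        this.timestampMs = timestampMs;
        this.bytes = bytes;
    }
}

class WindowAggregator {
    static final long WINDOW_MS = 5 * 60 * 1000L;

    // Start of the 5-minute tumbling window that contains the given (non-negative) timestamp
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    // Sums bytes per (stream, window); the map key is "stream@windowStart"
    static Map<String, Long> sumByStreamAndWindow(Iterable<RawEvent> events) {
        Map<String, Long> totals = new TreeMap<>();
        for (RawEvent e : events) {
            String key = e.stream + "@" + windowStart(e.timestampMs);
            totals.merge(key, e.bytes, Long::sum);
        }
        return totals;
    }
}
```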

Solution with Flink

Data pipeline HOW.

Detail Flink mechanisms for resilient sinks.

Detail code organisation (available on GitHub, Maven deps, testable with local Docker Compose).

Basic implementation - InfluxSinkV1

First of all, let's write a simple implementation of a sink that writes data points to InfluxDB.

public class InfluxSinkV1 extends RichSinkFunction<Point> {

    private static final String RETENTION_POLICY = "";

    private transient InfluxDB influx;

    private final String connUrl;
    private final String user;
    private final String password;
    private final String database;

    public InfluxSinkV1(String connUrl, String user, String password, String database) {
        this.connUrl = connUrl;
        this.user = user;
        this.password = password;
        this.database = database;
    }

    @Override
    public void open(Configuration parameters) {
        influx = InfluxDBFactory.connect(connUrl, user, password);
    }

    @Override
    public void invoke(Point point, Context context) {
        influx.write(database, RETENTION_POLICY, point);
    }

    @Override
    public void close() {
        influx.close();
    }
}

A sink class must implement SinkFunction<IN>. Here we choose to extend RichSinkFunction<IN> instead, because the rich version provides a public void open(...) method that can be used to initialize any kind of complex or non-serializable state. The InfluxDB client is not Serializable, which is why the field is marked transient and why we have to create the client in open.

The main sink method, invoke, is quite simple: it takes a Point and uses the influx client to send it.
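To see why the client has to be created in open, here is a small Flink-free sketch. Flink ships function instances to the workers using Java serialization, and a transient field does not survive that trip. FakeClient and TransientDemoSink are hypothetical stand-ins, not part of the actual sink.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a client that is not Serializable (like the InfluxDB client)
class FakeClient {
    final String url;

    FakeClient(String url) {
        this.url = url;
    }
}

// Hypothetical sink-like class: serializable settings, transient client created in open()
class TransientDemoSink implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String connUrl;
    private transient FakeClient client;

    TransientDemoSink(String connUrl) {
        this.connUrl = connUrl;
    }

    void open() {
        client = new FakeClient(connUrl);
    }

    boolean isOpen() {
        return client != null;
    }

    String clientUrl() {
        return client.url;
    }

    // Serializes then deserializes the sink, similar to what happens when it is shipped to a worker
    static TransientDemoSink roundTrip(TransientDemoSink sink) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(sink);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (TransientDemoSink) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

After the round trip the copy still knows its connUrl but has no client, so open has to run again on the receiving side before the first invoke.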

Let's check that everything is alright with a quick unit test:

@Test
public void testWritingData() {
    // create an InfluxSink
    InfluxSinkV1 sink = new InfluxSinkV1("http://localhost:" + INFLUX_PORT, USER, PASSWORD, DB_DATA);
    sink.open(null); // initialize without any special config

    // create a data point and send it through the sink
    sink.invoke(Point.measurement("telemetry")
            .time(Instant.now().toEpochMilli(), TimeUnit.MILLISECONDS)
            .addField("temperature", 42)
            .tag("location", "Paris")
            .build(), null);

    // query influxdb to select all telemetry data
    // (influx is assumed to be an InfluxDB client set up by the test class, not shown here)
    QueryResult res = influx.query(new Query("SELECT * FROM telemetry", DB_DATA));
    QueryResult.Series series = res.getResults()
            .get(0)
            .getSeries()
            .stream()
            .filter(s -> s.getName().equals("telemetry"))
            .findFirst()
            .orElseThrow(() -> new AssertionError("No telemetry series"));

    Map<String, Object> data = zip(series.getColumns(), series.getValues().get(0));

    // check that the data point is existing and correct
    assertEquals("Paris", data.get("location"));
    assertEquals(42.0, data.get("temperature"));
    sink.close();
}
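The zip helper used in the test is not listed here. A minimal sketch of what it could look like, assuming it simply pairs each column name with the value at the same position in the row (the TestHelpers class name is hypothetical):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TestHelpers {
    // Pairs each column name with the value at the same index of one result row
    static Map<String, Object> zip(List<String> columns, List<Object> values) {
        if (columns.size() != values.size()) {
            throw new IllegalArgumentException("columns and values differ in length");
        }
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < columns.size(); i++) {
            row.put(columns.get(i), values.get(i));
        }
        return row;
    }
}
```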

Assuming that you have docker-compose running, as mentioned in the introduction, run this test with:

mvn clean test -Dtest=InfluxSinkV1Test

...
[INFO] Running io.streamroot.InfluxSinkV1Test
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.58 s - in io.streamroot.InfluxSinkV1Test
[INFO] 
[INFO] Results:
[INFO] 
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO] 
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  4.244 s
[INFO] Finished at: 2019-04-15T16:43:49+02:00
[INFO] ------------------------------------------------------------------------

Everything looks good! However, there is an issue: writing data points one by one, at each call of invoke, is quite inefficient for InfluxDB. Let's see what we can do about that.

Adding batching support - InfluxSinkV2

It turns out that the influx client has built-in support for batching! To use it, we just need to call enableBatch as follows:

@Override
public void open(Configuration parameters) {
    influx = InfluxDBFactory.connect(connUrl, user, password);
    influx.enableBatch(BatchOptions.DEFAULTS.flushDuration(100)); // flush batch every 100 ms
}

As for the unit test, we need to change it a bit to take the batching duration into account:

// ... same as before ...

Thread.sleep(150); // before querying influx, wait 100ms for flushDuration + 50ms for safety
QueryResult res = influx.query(new Query("SELECT * FROM telemetry", DB_DATA));

// ... same as before ...
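A fixed sleep can be fragile on a slow machine. One possible alternative, sketched here with a hypothetical Await helper that is not part of the project, is to poll until the expected data shows up or a timeout expires:

```java
import java.util.function.BooleanSupplier;

class Await {
    // Polls the condition every pollMs until it holds or timeoutMs has elapsed.
    // Returns true if the condition held, false on timeout or interruption.
    static boolean until(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

In the test, the condition would be a lambda that runs the SELECT query and checks that the telemetry series is present.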

You can test it with:

mvn clean test -Dtest=InfluxSinkV2Test

However, is it really that simple to handle batching properly? What happens if a failure occurs between two batches? It is very likely that data points will get lost, as Flink has no idea that we are actually batching them. So let's see how we can handle a stateful sink properly with Flink.

Fully resilient implementation - InfluxSinkV3

Now we want to:

  1. Explicitly handle buffered points (leverage Flink's checkpoints)
  2. Control batches of data points sent to InfluxDB
  3. Retry in case of failure (don't process any more data until recovery)

As for 1., Flink's documentation describes managed operator state. This mechanism requires us to implement CheckpointedFunction, which lets us leverage Flink's checkpoints to snapshot and recover the data points that are buffered before being sent in batches.

Here are the two new methods of interest that we will have to implement:

/**
 * This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to
 * ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when
 * the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself.
 *
 * @param context the context for drawing a snapshot of the operator
 * @throws Exception
 */
void snapshotState(FunctionSnapshotContext context) throws Exception;

/**
 * This method is called when the parallel function instance is created during distributed
 * execution. Functions typically set up their state storing data structures in this method.
 *
 * @param context the context for initializing the operator
 * @throws Exception
 */
void initializeState(FunctionInitializationContext context) throws Exception;

So we have to buffer points within our sink implementation. Let's simply do that with a List<Point>:

// Let's buffer data points ourselves now (instead of relying on the influx client built-in mechanism)
private final List<Point> bufferedPoints = new ArrayList<>();
private final int batchSize;

// Let's also add an internal checkpointedState that will be used by Flink
private transient ListState<Point> checkpointedState;
private final String descriptorId;

In this new version, the sink's main method invoke simply buffers points up to batchSize:

@Override
public synchronized void invoke(Point point, Context context) {
    bufferedPoints.add(point);
    // >= rather than ==, so that a buffer restored at or above batchSize is still flushed
    if (bufferedPoints.size() >= batchSize) {
        // will be detailed just after, but as the name suggests it will send data points to InfluxDB
        batchWrite(bufferedPoints);
    }
}
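The buffering logic can be tried out without Flink or InfluxDB. Below is a minimal sketch, assuming a hypothetical BatchBuffer class whose writer callback plays the role of batchWrite. It flushes when the buffer size is at or above batchSize, and only clears the buffer once the write has returned normally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, Flink-free model of the buffering done in invoke
class BatchBuffer<T> {
    private final List<T> buffered = new ArrayList<>();
    private final int batchSize;
    private final Consumer<List<T>> writer;

    BatchBuffer(int batchSize, Consumer<List<T>> writer) {
        this.batchSize = batchSize;
        this.writer = writer;
    }

    synchronized void add(T element) {
        buffered.add(element);
        if (buffered.size() >= batchSize) {
            writer.accept(new ArrayList<>(buffered)); // hand a copy to the writer
            buffered.clear();                         // only reached if the write did not throw
        }
    }

    synchronized int pending() {
        return buffered.size();
    }
}
```

With a batch size of 3, adding 7 elements triggers two writes and leaves one element pending.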

Let's implement snapshotState(...) and initializeState(...) to back up and restore bufferedPoints, respectively, through checkpointedState:

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    checkpointedState.clear();
    for (Point point : bufferedPoints) {
        checkpointedState.add(point);
    }
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    ListStateDescriptor<Point> descriptor = new ListStateDescriptor<>(descriptorId, TypeInformation.of(Point.class));
    checkpointedState = context.getOperatorStateStore().getListState(descriptor);
    if (context.isRestored()) {
        for (Point point : checkpointedState.get()) {
            bufferedPoints.add(point);
        }
    }
}
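The snapshot and restore cycle can be simulated in plain Java. The sketch below uses a hypothetical in-memory FakeListState in place of Flink's ListState and plain strings in place of Point. It only illustrates the order of operations and is not how Flink stores state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's ListState, kept in memory for illustration only
class FakeListState<T> {
    private final List<T> items = new ArrayList<>();

    void clear() {
        items.clear();
    }

    void add(T item) {
        items.add(item);
    }

    List<T> get() {
        return new ArrayList<>(items);
    }
}

// Hypothetical model of a sink instance with its buffer and checkpointed state
class CheckpointedBuffer {
    final List<String> bufferedPoints = new ArrayList<>();
    private final FakeListState<String> checkpointedState;

    // Mirrors initializeState: refill the buffer only when recovering from a checkpoint
    CheckpointedBuffer(FakeListState<String> state, boolean restored) {
        this.checkpointedState = state;
        if (restored) {
            bufferedPoints.addAll(state.get());
        }
    }

    // Mirrors snapshotState: replace the stored state with the current buffer content
    void snapshot() {
        checkpointedState.clear();
        for (String point : bufferedPoints) {
            checkpointedState.add(point);
        }
    }
}
```

A point buffered after the last snapshot is not part of the restored buffer. With a replayable source, Flink re-delivers such elements after recovery because the source restarts from the checkpointed position.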

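Now let's come back to the mysterious batchWrite(bufferedPoints) that we skipped in invoke's implementation. Our point 2. is to control the batches of data points sent to InfluxDB. To do so, we replace the influx client with a small Retrofit service that posts batches to InfluxDB's /write endpoint: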

private static final MediaType MEDIA_TYPE_STRING = MediaType.parse("text/plain");

private transient InfluxBatchService influx;

@Override
public void open(Configuration parameters) {
    influx = makeBatchService(connUrl);
}

private static InfluxBatchService makeBatchService(String url) {
    return new Retrofit.Builder()
            .baseUrl(url)
            .client(new OkHttpClient.Builder().build())
            .addConverterFactory(MoshiConverterFactory.create())
            .build()
            .create(InfluxBatchService.class);
}

public interface InfluxBatchService {
    String U = "u";
    String P = "p";
    String DB = "db";
    String RP = "rp";
    String PRECISION = "precision";
    String CONSISTENCY = "consistency";

    @POST("/write")
    Call<ResponseBody> writePoints(
            @Query(U) String username,
            @Query(P) String password,
            @Query(DB) String database,
            @Query(RP) String retentionPolicy,
            @Query(PRECISION) String precision,
            @Query(CONSISTENCY) String consistency,
            @Body RequestBody batchPoints);
}
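batchWrite itself is not listed above. Here is a minimal sketch of the retry idea from point 3., assuming a hypothetical RetryingBatchWriter whose sender callback stands in for the HTTP call and returns true on success. Points are represented as line protocol strings joined with newlines, which is the body format InfluxDB's /write endpoint expects. The retry loop blocks the caller, so no more data is processed until the write succeeds.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper, not the project's actual batchWrite
class RetryingBatchWriter {

    // Sends the whole batch until the sender reports success, sleeping backoffMs between attempts.
    // Clears the (mutable) list once the batch is written and returns the number of attempts.
    static int writeWithRetry(List<String> lines, Predicate<String> sender, long backoffMs) {
        String body = String.join("\n", lines);
        int attempts = 0;
        while (true) {
            attempts++;
            if (sender.test(body)) {
                lines.clear(); // the batch is safely written, so the buffer can be emptied
                return attempts;
            }
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while retrying batch write", e);
            }
        }
    }
}
```

In the sink, the sender could wrap the Retrofit call, for example by executing influx.writePoints(...) synchronously and checking whether the response is successful. Retrying forever is a deliberate simplification here; a real implementation might cap the attempts or grow the backoff.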

Let's summarize ... features we implemented:

  • Backup and restore state with bufferedPoints / checkpointedState

Improving usability with a builder - InfluxSinkV4
