Home
About Streamroot.
Problem at hand / Data pipeline WHY
Raw data -> group by something (stream, isp, ...) -> aggregate over 5-minute windows
Data pipeline HOW.
Detail Flink mechanisms for resilient sinks.
Detail code organisation (available on github, maven deps, testable with local docker compose).
First of all, let's write a simple implementation of a sink that writes data points to InfluxDB.
public class InfluxSinkV1 extends RichSinkFunction<Point> {
private static final String RETENTION_POLICY = "";
private transient InfluxDB influx;
private final String connUrl;
private final String user;
private final String password;
private final String database;
public InfluxSinkV1(String connUrl, String user, String password, String database) {
this.connUrl = connUrl;
this.user = user;
this.password = password;
this.database = database;
}
@Override
public void open(Configuration parameters) {
influx = InfluxDBFactory.connect(connUrl, user, password);
}
@Override
public void invoke(Point point, Context context) {
influx.write(database, RETENTION_POLICY, point);
}
@Override
public void close() {
influx.close();
}
}
A sink class must implement SinkFunction<IN>. Here we choose to extend RichSinkFunction<IN> instead, because the rich version provides a public void open(...) method that can be used to initialize any kind of complex or non-serializable state. In this case the InfluxDB client is not Serializable, so the field is marked transient and the client is created in this open method.
The main sink method, invoke, is quite simple: it takes a Point and uses the influx client to send it.
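To make the role of transient and open more concrete, here is a minimal plain-Java sketch. It does not use Flink or the InfluxDB client: FakeSink and FakeClient are hypothetical stand-ins, and the round trip through Java serialization only imitates what happens when a function instance is shipped to a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class TransientFieldDemo {

    // Hypothetical stand-in for a non-serializable client (e.g. a DB connection).
    static final class FakeClient {
        final String url;
        FakeClient(String url) { this.url = url; }
    }

    // Hypothetical stand-in for the sink: serializable config plus a transient client.
    static final class FakeSink implements Serializable {
        private static final long serialVersionUID = 1L;
        final String connUrl;          // travels with the serialized object
        transient FakeClient client;   // skipped by Java serialization

        FakeSink(String connUrl) { this.connUrl = connUrl; }

        // Plays the role of open(): creates the client after deserialization.
        void open() { client = new FakeClient(connUrl); }
    }

    // Serializes and deserializes the sink in memory.
    static FakeSink roundTrip(FakeSink sink) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(sink);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (FakeSink) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        FakeSink copy = roundTrip(new FakeSink("http://localhost:8086"));
        System.out.println("client after deserialization: " + copy.client); // null
        copy.open();
        System.out.println("client after open(): " + copy.client.url);
    }
}
```

The configuration string survives the round trip, while the transient client comes back as null and has to be rebuilt, which is the job open does in the sink.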
Let's check that everything is alright with a quick unit test:
@Test
public void testWritingData() {
// create an InfluxSink
InfluxSinkV1 sink = new InfluxSinkV1("http://localhost:" + INFLUX_PORT, USER, PASSWORD, DB_DATA);
sink.open(null); // initialize without any special config
// create a data point and send it through the sink
sink.invoke(Point.measurement("telemetry")
.time(Instant.now().toEpochMilli(), TimeUnit.MILLISECONDS)
.addField("temperature", 42)
.tag("location", "Paris")
.build(), null);
// query influxdb to select all telemetry data
QueryResult res = influx.query(new Query("SELECT * FROM telemetry", DB_DATA));
QueryResult.Series series = res.getResults()
.get(0)
.getSeries()
.stream()
.filter(s -> s.getName().equals("telemetry"))
.findFirst()
.orElseThrow(() -> new AssertionError("No telemetry series"));
Map<String, Object> data = zip(series.getColumns(), series.getValues().get(0));
// check that the data point is existing and correct
assertEquals("Paris", data.get("location"));
assertEquals(42.0, data.get("temperature"));
sink.close();
}
Assuming that you have docker-compose running, as mentioned in the introduction, run this test with:
mvn clean test -Dtest=InfluxSinkV1Test
...
[INFO] Running io.streamroot.InfluxSinkV1Test
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.58 s - in io.streamroot.InfluxSinkV1Test
[INFO]
[INFO] Results:
[INFO]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 4.244 s
[INFO] Finished at: 2019-04-15T16:43:49+02:00
[INFO] ------------------------------------------------------------------------
Everything looks good! However, there's an issue: writing data points one by one, at each call of invoke, is quite inefficient for InfluxDB, so let's see what we can do about that.
It turns out that the influx client has built-in support for batching! To use it we just need to call enableBatch as follows:
@Override
public void open(Configuration parameters) {
influx = InfluxDBFactory.connect(connUrl, user, password);
influx.enableBatch(BatchOptions.DEFAULTS.flushDuration(100)); // flush batch every 100 ms
}
As for the unit test we need to change it a bit to take into account the batching duration:
// ... same as before ...
Thread.sleep(150); // before querying influx, wait 100ms for flushDuration + 50ms for safety
QueryResult res = influx.query(new Query("SELECT * FROM telemetry", DB_DATA));
// ... same as before ...
You can test it with:
mvn clean test -Dtest=InfluxSinkV2Test
However, is it really that simple to handle batching properly? What happens if a failure occurs between two batches? Most likely data points will be lost: Flink has no idea that we are batching them, so points still sitting in the client's internal buffer are not part of any checkpoint. So let's see how we can handle a stateful sink properly with Flink.
To achieve resiliency we need to have the following mechanisms in place:
- Explicitly handle buffered points (leverage Flink's checkpoints)
- Control batches of data points sent to InfluxDB
- Retry in case of failure (don't process any more data until recovery)
Flink's documentation mentions managed operator state. This mechanism requires us to implement CheckpointedFunction in order to leverage Flink's checkpoints, so that we can snapshot and recover the data points that are buffered and sent in batches.
Here are the two new methods of interest that we will have to implement:
/**
* This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to
* ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when
* the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself.
*
* @param context the context for drawing a snapshot of the operator
* @throws Exception
*/
void snapshotState(FunctionSnapshotContext context) throws Exception;
/**
* This method is called when the parallel function instance is created during distributed
* execution. Functions typically set up their state storing data structures in this method.
*
* @param context the context for initializing the operator
* @throws Exception
*/
void initializeState(FunctionInitializationContext context) throws Exception;
So we have to buffer points within our sink implementation. Let's simply do that with a List<Point>:
// Let's buffer data points ourselves now (instead of relying on the influx client built-in mechanism)
private final List<Point> bufferedPoints = new ArrayList<>();
private final int batchSize;
// Let's also add an internal checkpointedState that will be used by Flink
private transient ListState<Point> checkpointedState;
private final String descriptorId;
In this new version, our sink's main method invoke now simply buffers points up to batchSize:
@Override
public void invoke(Point point, Context context) throws Exception {
bufferedPoints.add(point);
if (bufferedPoints.size() >= batchSize) { // >= also covers a restored buffer that is already over the limit
// This will be detailed just after,
// but as the name suggests it sends data points to InfluxDB
batchWrite(bufferedPoints);
bufferedPoints.clear();
}
}
Let's implement snapshotState(...) and initializeState(...) to respectively back up and restore bufferedPoints through checkpointedState:
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
for (Point point : bufferedPoints) {
checkpointedState.add(point);
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<Point> descriptor = new ListStateDescriptor<>(descriptorId, TypeInformation.of(Point.class));
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
if (context.isRestored()) {
for (Point point : checkpointedState.get()) {
bufferedPoints.add(point);
}
}
}
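The interplay between buffering, snapshotting and restoring can be sketched without Flink. The following is only a plain-Java model under simplifying assumptions: BufferingSink, snapshot and restore are hypothetical names standing in for the sink, snapshotState and initializeState, and String stands in for Point.

```java
import java.util.ArrayList;
import java.util.List;

final class CheckpointModel {

    // Plain-Java model of the buffering sink (no Flink involved).
    static final class BufferingSink {
        final List<String> bufferedPoints = new ArrayList<>();
        final List<String> written = new ArrayList<>(); // stands in for InfluxDB
        final int batchSize;

        BufferingSink(int batchSize) { this.batchSize = batchSize; }

        // Buffers a point and writes the whole batch once it is full.
        void invoke(String point) {
            bufferedPoints.add(point);
            if (bufferedPoints.size() >= batchSize) {
                written.addAll(bufferedPoints);
                bufferedPoints.clear();
            }
        }

        // Models snapshotState(): copy the pending points into the checkpoint.
        List<String> snapshot() { return new ArrayList<>(bufferedPoints); }

        // Models initializeState() on recovery: refill the buffer from the checkpoint.
        void restore(List<String> checkpoint) { bufferedPoints.addAll(checkpoint); }
    }

    public static void main(String[] args) {
        BufferingSink sink = new BufferingSink(3);
        sink.invoke("a");
        sink.invoke("b");
        List<String> checkpoint = sink.snapshot(); // two points are still pending

        // Simulated crash: the old instance is gone, a new one is restored.
        BufferingSink recovered = new BufferingSink(3);
        recovered.restore(checkpoint);
        recovered.invoke("c");

        System.out.println(recovered.written); // [a, b, c]
    }
}
```

Without the restore step, the recovered instance would start with an empty buffer and the two pending points would never be written.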
Since we stopped relying on the influx client's built-in batching feature, we have to roll our own. Luckily we can leverage OkHttpClient, on top of which the influx client is built, to achieve batching. To this end we'll start by defining a custom InfluxBatchService as follows:
// A helper for http requests
private static final MediaType MEDIA_TYPE_STRING = MediaType.parse("text/plain");
// Our new batching service
private transient InfluxBatchService influx;
@Override
public void open(Configuration parameters) {
// Create our batching service here
influx = makeBatchService(connUrl);
}
// Batching service creation using OkHttpClient
private static InfluxBatchService makeBatchService(String url) {
return new Retrofit.Builder()
.baseUrl(url)
.client(new OkHttpClient.Builder().build())
.addConverterFactory(MoshiConverterFactory.create())
.build()
.create(InfluxBatchService.class);
}
// Batching service definition, this is InfluxDB http requests format
public interface InfluxBatchService {
String U = "u";
String P = "p";
String DB = "db";
String RP = "rp";
String PRECISION = "precision";
String CONSISTENCY = "consistency";
@POST("/write")
Call<ResponseBody> writePoints(
@Query(U) String username,
@Query(P) String password,
@Query(DB) String database,
@Query(RP) String retentionPolicy,
@Query(PRECISION) String precision,
@Query(CONSISTENCY) String consistency,
@Body RequestBody batchPoints);
}
Now let's come back to the mysterious batchWrite(bufferedPoints) that we skipped earlier in the sink's new invoke method, and see how it uses our new InfluxBatchService:
private Response<ResponseBody> batchWrite(Iterable<Point> points) throws IOException {
return influx.writePoints(
user, password, database,
RETENTION_POLICY,
TimeUtil.toTimePrecision(TimeUnit.NANOSECONDS),
InfluxDB.ConsistencyLevel.ONE.value(),
RequestBody.create(
MEDIA_TYPE_STRING,
StreamSupport.stream(points.spliterator(), false)
.map(Point::lineProtocol)
.collect(Collectors.joining("\n"))))
.execute();
}
It simply iterates through the data points and calls lineProtocol to format them to the InfluxDB line protocol before sending them as a batch with a single POST request.
There have been a lot of changes since InfluxSinkV1 already, but are we done? Not yet!
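To visualize what ends up in the request body, here is a small stdlib-only sketch. The two lines are hand-written in the general shape of InfluxDB line protocol (measurement, tags, fields, timestamp) and are illustrative only; they are not captured output of Point.lineProtocol(), and toBody is a hypothetical helper.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class LineProtocolBody {

    // Joins already formatted line protocol entries into a single request body,
    // one point per line, as the batch write does with Collectors.joining("\n").
    static String toBody(List<String> lines) {
        return lines.stream().collect(Collectors.joining("\n"));
    }

    public static void main(String[] args) {
        // Illustrative, hand-written entries (assumed values, nanosecond timestamps).
        List<String> lines = Arrays.asList(
            "telemetry,location=Paris temperature=42i 1555339429000000000",
            "telemetry,location=Lyon temperature=39i 1555339430000000000");
        System.out.println(toBody(lines));
    }
}
```

Because the whole batch is a single newline-separated string, one HTTP request carries all buffered points.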
With our new batching approach we're flushing data points only when our buffer is full.
What if we buffer points that are the result of a 10-minute windowed aggregation and our buffer isn't full?
Well, they won't be flushed until the next 10-minute window... Not very nice.
So let's add a final piece to our custom batching: a time limit.
The idea is that we want to flush our buffered points as soon as one of the two following conditions is met: either batchSize is reached, or batchFreqMs has elapsed.
To achieve that we can use a single-thread ScheduledExecutorService (created with Executors.newSingleThreadScheduledExecutor()) that will regularly flush the buffered points.
To make sure that everything is thread-safe, we can simply add synchronized to all methods interacting with bufferedPoints.
private transient ScheduledExecutorService scheduledExec;
@Override
public void open(Configuration parameters) {
influx = makeBatchService(connUrl);
// Simple flushPoints at batchFreqMs interval
scheduledExec = Executors.newSingleThreadScheduledExecutor();
scheduledExec.scheduleAtFixedRate(this::flushPoints, 0, batchFreqMs, TimeUnit.MILLISECONDS);
}
Let's not forget to clean up the scheduled executor when the sink is being stopped:
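@Override
public void close() {
boolean isStopped = false;
// Stop scheduling new flushes; without this, awaitTermination can only time out
scheduledExec.shutdown();
try {
isStopped = scheduledExec.awaitTermination(batchFreqMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // preserve the interrupt status
} finally {
if (!isStopped) {
scheduledExec.shutdownNow();
}
}
}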
With this new mechanism, invoke also has to make use of flushPoints():
@Override
public synchronized void invoke(Point point, Context context) {
bufferedPoints.add(point);
if (bufferedPoints.size() >= batchSize) { // >= also covers a restored buffer that is already over the limit
flushPoints(); // delegates flushing
}
}
And now flushPoints() is responsible for triggering batchWrite(bufferedPoints) and clearing the buffer:
private synchronized void flushPoints() {
if (bufferedPoints.size() > 0) {
try {
batchWrite(bufferedPoints); // May throw IOException ...
bufferedPoints.clear();
} catch (IOException e) {
LOG.error("We will handle that correctly in the next section ...");
}
}
}
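The size-or-time flushing logic can be tried out in isolation. The sketch below is a stdlib-only model, not the sink itself: SizeOrTimeBuffer, awaitBatches and flushedBatches are hypothetical names, String stands in for Point, and "flushing" just moves points into an in-memory list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class SizeOrTimeBuffer {
    private final List<String> buffered = new ArrayList<>();
    private final List<List<String>> flushedBatches = new CopyOnWriteArrayList<>();
    private final int batchSize;
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    SizeOrTimeBuffer(int batchSize, long batchFreqMs) {
        this.batchSize = batchSize;
        // Time trigger: flush whatever is pending every batchFreqMs.
        scheduler.scheduleAtFixedRate(this::flush, batchFreqMs, batchFreqMs, TimeUnit.MILLISECONDS);
    }

    // Size trigger: flush as soon as the buffer is full.
    synchronized void add(String point) {
        buffered.add(point);
        if (buffered.size() >= batchSize) {
            flush();
        }
    }

    // Called from both the caller thread and the scheduler thread, hence synchronized.
    synchronized void flush() {
        if (!buffered.isEmpty()) {
            flushedBatches.add(new ArrayList<>(buffered));
            buffered.clear();
        }
    }

    List<List<String>> flushedBatches() { return flushedBatches; }

    // Polls until at least `count` batches were flushed or the timeout expires.
    boolean awaitBatches(int count, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (flushedBatches.size() < count) {
            if (System.nanoTime() > deadline) return false;
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    // Stops the timer, then flushes whatever is still pending.
    void close() {
        scheduler.shutdown();
        try {
            if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) scheduler.shutdownNow();
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
        flush();
    }

    public static void main(String[] args) {
        SizeOrTimeBuffer buffer = new SizeOrTimeBuffer(2, 50);
        buffer.add("a");
        buffer.add("b");              // buffer full: flushed by the size trigger
        buffer.add("c");              // left pending until the timer fires
        buffer.awaitBatches(2, 2000);
        buffer.close();
        System.out.println(buffer.flushedBatches());
    }
}
```

Both triggers go through the same synchronized flush method, so a timer tick and a full buffer can never write the same points twice.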
Finally we have a complete batching mechanism in place! But as you can see from the catch block of flushPoints(), we're not doing much when an exception occurs... This brings us to our last feature: proper error handling and retries.
The last piece of this InfluxSinkV3 is the retry mechanism, which makes use of the Failsafe library.
What we want is to stop processing when an InfluxDB-related error occurs, and to have a retry policy that helps recover from the situation.
Let's define a retry policy that simply keeps retrying to send the current batch of data at regular intervals:
private transient RetryPolicy<Response<ResponseBody>> retryPolicy;
private final int retryFreqMs;
private final AtomicLong retrying = new AtomicLong(0);
@Override
public void open(Configuration parameters) {
// ... same as before ...
retryPolicy = new RetryPolicy<Response<ResponseBody>>()
.withMaxRetries(-1) // infinite retries
.handle(IOException.class)
.handleResultIf((Response<ResponseBody> r) -> {
if (!r.isSuccessful()) {
String errMessage = "";
try (ResponseBody errorBody = r.errorBody()) {
if (null != errorBody) errMessage = errorBody.string();
} catch (IOException e) {
LOG.error("Couldn't read response errorBody: {}", e.getMessage());
}
LOG.error("Error {} from Influx: {}", r.code(), errMessage);
return true; // will retry
} else {
return false; // don't retry
}
})
.withDelay(Duration.ofMillis(retryFreqMs));
}
Now we wrap our call to batchWrite(bufferedPoints) with a Failsafe.with(retryPolicy) call. It will handle IOExceptions and errors returned by InfluxDB, and retry indefinitely.
private synchronized void flushPoints() {
if (bufferedPoints.size() > 0) {
retrying.getAndSet(0);
Failsafe.with(retryPolicy).get(batchWrite(bufferedPoints)); // blocks until success
if (retrying.get() > 1) {
LOG.info("Batch successfully recovered");
}
bufferedPoints.clear();
}
}
Last but not least, let's modify batchWrite(...) so that it can be used from Failsafe. Let's also keep track of the number of retries.
private CheckedSupplier<Response<ResponseBody>> batchWrite(Iterable<Point> points) {
// Wraps the call to influx.writePoints(...) in a CheckedSupplier for Failsafe
return () -> {
long retryNb = retrying.getAndIncrement();
if (retryNb > 0) {
LOG.warn("Retrying batch (" + retryNb + ")");
}
return influx.writePoints(
user, password, database,
RETENTION_POLICY,
TimeUtil.toTimePrecision(TimeUnit.NANOSECONDS),
InfluxDB.ConsistencyLevel.ONE.value(),
RequestBody.create(
MEDIA_TYPE_STRING,
StreamSupport.stream(points.spliterator(), false)
.map(Point::lineProtocol)
.collect(Collectors.joining("\n"))))
.execute();
};
}
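For readers who want to see the retry behaviour without pulling in Failsafe, here is a rough plain-Java equivalent of "retry forever with a fixed delay". It is only a sketch of the idea: retryForever and attemptsUntilSuccess are hypothetical helpers, and the failing action is simulated with an IOException.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

final class FixedDelayRetry {

    // Calls the action until it returns without throwing, sleeping delayMs
    // between attempts. Blocks the caller for as long as the action keeps failing.
    static <T> T retryForever(Callable<T> action, long delayMs) {
        while (true) {
            try {
                return action.call();
            } catch (Exception e) {
                // Failed attempt: wait, then try again.
                try {
                    Thread.sleep(delayMs);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while retrying", interrupted);
                }
            }
        }
    }

    // Simulates a write that fails `failures` times before succeeding,
    // and returns the total number of attempts that were needed.
    static int attemptsUntilSuccess(int failures, long delayMs) {
        AtomicInteger attempts = new AtomicInteger();
        retryForever(() -> {
            if (attempts.incrementAndGet() <= failures) {
                throw new IOException("simulated network failure");
            }
            return "ok";
        }, delayMs);
        return attempts.get();
    }

    public static void main(String[] args) {
        System.out.println("attempts: " + attemptsUntilSuccess(3, 10)); // attempts: 4
    }
}
```

Since the call blocks until it succeeds, no new points are processed while the batch is being retried, which matches the behaviour wanted from the sink.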
And at last we're all done! For completeness' sake, here is the final version of InfluxSinkV3.
Let's summarize all the features we implemented:
- Backup and restore of state with bufferedPoints/checkpointedState
- Custom InfluxBatchService with configurable flushing based on batchSize/batchFreqMs
- Error handling and retries with Failsafe
Our shiny features look good and all; now ideally we'd like to verify them with some unit tests. At least let's check that our retry mechanism works well.
For that we'll use a library called TcpCrusher that can simulate a network failure.
Let's modify our test setup and proxy connections to our local InfluxDB through a TcpCrusher so that we can stop and restart TCP traffic at will.
@BeforeClass
public static void setup() throws Exception {
influx = makeInfluxConn();
initInflux(influx);
reactor = new NioReactor(10); // tick in ms
tcpCrusher = TcpCrusherBuilder.builder()
.withReactor(reactor)
.withBindAddress("localhost", PROXIED_PORT)
.withConnectAddress("localhost", INFLUX_PORT)
.buildAndOpen();
}
Our new resilience test looks like this:
@Test
public void testResilience() throws Exception {
// Now connect to InfluxDB through the TcpCrusher proxy
InfluxSinkV3 sink = new InfluxSinkV3("http://localhost:" + PROXIED_PORT, USER, PASSWORD, DB_DATA);
sink.open(null);
// stop proxying to influx
tcpCrusher.freeze();
// try writing point
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); // the point is written with millisecond precision
sink.invoke(Point.measurement("monitoring")
.time(now.toEpochMilli(), TimeUnit.MILLISECONDS)
.addField("warnings", 12)
.tag("datacenter", "abc")
.build(), null);
Thread.sleep(150); // wait for batch to complete (a bit more than batchFreqMs)
// simulate influx/network failure
tcpCrusher.close();
Thread.sleep(300);
tcpCrusher.open();
Thread.sleep(200); // wait for retry to complete (a bit more than batchFreqMs + retryFreqMs)
QueryResult res = influx.query(new Query("SELECT * FROM monitoring", DB_DATA));
QueryResult.Series series = res.getResults()
.get(0)
.getSeries()
.stream()
.filter(s -> s.getName().equals("monitoring"))
.findFirst()
.orElseThrow(() -> new AssertionError("No monitoring series"));
Map<String, Object> data = zip(series.getColumns(), series.getValues().get(0));
assertEquals("abc", data.get("datacenter"));
assertEquals(12.0, data.get("warnings"));
assertEquals(now, Instant.parse((String) data.get("time")));
sink.close();
}
Again assuming that you have docker-compose running, run this test with:
mvn clean test -Dtest=InfluxSinkV3Test
The test should pass and we can see in the output that it has been retrying many times before successfully recovering the current batch:
[INFO] Running io.streamroot.InfluxSinkV3Test
18:36:19,286 INFO org.netcrusher.tcp.TcpCrusher - TcpCrusher <localhost/127.0.0.1:8085>-<localhost/127.0.0.1:8086> is open
18:36:19,784 INFO org.netcrusher.tcp.TcpCrusher - TcpCrusher <localhost/127.0.0.1:8085>-<localhost/127.0.0.1:8086> is closed
18:36:19,811 WARN io.streamroot.InfluxSinkV3 - Retrying batch (1)
18:36:19,840 WARN io.streamroot.InfluxSinkV3 - Retrying batch (2)
18:36:19,869 WARN io.streamroot.InfluxSinkV3 - Retrying batch (3)
18:36:19,902 WARN io.streamroot.InfluxSinkV3 - Retrying batch (4)
18:36:19,934 WARN io.streamroot.InfluxSinkV3 - Retrying batch (5)
18:36:19,966 WARN io.streamroot.InfluxSinkV3 - Retrying batch (6)
18:36:19,994 WARN io.streamroot.InfluxSinkV3 - Retrying batch (7)
18:36:20,024 WARN io.streamroot.InfluxSinkV3 - Retrying batch (8)
18:36:20,054 WARN io.streamroot.InfluxSinkV3 - Retrying batch (9)
18:36:20,081 WARN io.streamroot.InfluxSinkV3 - Retrying batch (10)
18:36:20,085 INFO org.netcrusher.tcp.TcpCrusher - TcpCrusher <localhost/127.0.0.1:8085>-<localhost/127.0.0.1:8086> is open
18:36:20,110 WARN io.streamroot.InfluxSinkV3 - Retrying batch (11)
18:36:20,199 INFO io.streamroot.InfluxSinkV3 - Batch successfully recovered
18:36:20,431 INFO org.netcrusher.tcp.TcpCrusher - TcpCrusher <localhost/127.0.0.1:8085>-<localhost/127.0.0.1:8086> is closed
We ended up with a lot of parameters for our InfluxSink, which makes it a bit cumbersome to use.
A simple solution is to use a builder to set up all parameters in a clean and fluent fashion.
Let's see it in action:
public static class Builder implements Serializable {
private String descriptorId;
private String connUrl;
private String user;
private String password;
private String database;
private int batchSize;
private int batchFreqMs;
private int retryFreqMs;
private Builder() {}
public Builder connUrl(String connUrl) {
this.connUrl = connUrl;
return this;
}
// Similar methods returning this and setting some field
public InfluxSinkV4 build() {
return new InfluxSinkV4(this);
}
}
// Factory method for the Builder
public static Builder builder() {
return new Builder();
}
// Now the InfluxSinkV4 constructor is private and uses a Builder
private InfluxSinkV4(Builder b) {
this.connUrl = b.connUrl;
this.user = b.user;
this.password = b.password;
this.database = b.database;
this.descriptorId = b.descriptorId;
this.batchSize = b.batchSize;
this.batchFreqMs = b.batchFreqMs;
this.retryFreqMs = b.retryFreqMs;
}
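The call site is not shown above, so here is a self-contained sketch of what fluent usage of such a builder could look like. SinkSettings is a hypothetical cut-down stand-in for InfluxSinkV4; the setter names other than connUrl, the default values and the validation in build() are assumptions made for illustration.

```java
final class SinkSettings {
    final String connUrl;
    final String database;
    final int batchSize;
    final int batchFreqMs;

    // Private constructor: instances can only be created through the Builder.
    private SinkSettings(Builder b) {
        this.connUrl = b.connUrl;
        this.database = b.database;
        this.batchSize = b.batchSize;
        this.batchFreqMs = b.batchFreqMs;
    }

    // Factory method for the Builder
    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private String connUrl;
        private String database;
        private int batchSize = 1000;   // assumed default, for illustration only
        private int batchFreqMs = 100;  // assumed default, for illustration only

        private Builder() {}

        // Each setter stores a value and returns this, so calls can be chained.
        Builder connUrl(String connUrl) { this.connUrl = connUrl; return this; }
        Builder database(String database) { this.database = database; return this; }
        Builder batchSize(int batchSize) { this.batchSize = batchSize; return this; }
        Builder batchFreqMs(int batchFreqMs) { this.batchFreqMs = batchFreqMs; return this; }

        // Checks required parameters before creating the immutable settings object.
        SinkSettings build() {
            if (connUrl == null || database == null) {
                throw new IllegalStateException("connUrl and database are required");
            }
            return new SinkSettings(this);
        }
    }

    public static void main(String[] args) {
        SinkSettings settings = SinkSettings.builder()
            .connUrl("http://localhost:8086")
            .database("data")
            .batchSize(500)
            .build();
        System.out.println(settings.connUrl + " " + settings.database + " "
            + settings.batchSize + " " + settings.batchFreqMs);
    }
}
```

Compared with a long constructor, each parameter is named at the call site, and optional ones can be left at their defaults.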
Explain the demo job