Skip to content

Commit

Permalink
Issue #000 feat: Send individual events to error topic from batch event
Browse files Browse the repository at this point in the history
  • Loading branch information
Anand committed Mar 1, 2019
1 parent a7ca546 commit 0af1d5f
Showing 1 changed file with 14 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,21 @@ public void process(String message, TelemetryExtractorSink sink) {
String syncTimestamp = df.print(syncts);
List<Map<String, Object>> events = (List<Map<String, Object>>) batchEvent.get("events");
for (Map<String, Object> event : events) {
event.put("syncts", syncts);
event.put("@timestamp", syncTimestamp);
Map<String, Object> context = (Map<String, Object>)event.get("context");
String channel = (String)context.get("channel");
if(StringUtils.isEmpty(channel)){
event.put("context", context);
String json = "";
try {
event.put("syncts", syncts);
event.put("@timestamp", syncTimestamp);
Map<String, Object> context = (Map<String, Object>) event.get("context");
String channel = (String) context.get("channel");
if (StringUtils.isEmpty(channel)) {
event.put("context", context);
}
json = new Gson().toJson(event);
sink.toSuccessTopic(json);
} catch (Throwable t) {
LOGGER.info("", "Failed to send extracted event to success topic: " + t.getMessage());
sink.toErrorTopic(json);
}
String json = new Gson().toJson(event);
sink.toSuccessTopic(json);
}
metrics.incSuccessCounter();
generateAuditEvent(batchEvent, syncts, syncTimestamp, sink, defaultChannel);
Expand Down

0 comments on commit 0af1d5f

Please sign in to comment.