diff --git a/data-pipeline/telemetry-extractor/src/main/java/org/ekstep/ep/samza/service/TelemetryExtractorService.java b/data-pipeline/telemetry-extractor/src/main/java/org/ekstep/ep/samza/service/TelemetryExtractorService.java index ff69bf4462..868c45936a 100644 --- a/data-pipeline/telemetry-extractor/src/main/java/org/ekstep/ep/samza/service/TelemetryExtractorService.java +++ b/data-pipeline/telemetry-extractor/src/main/java/org/ekstep/ep/samza/service/TelemetryExtractorService.java @@ -36,15 +36,21 @@ public void process(String message, TelemetryExtractorSink sink) { String syncTimestamp = df.print(syncts); List> events = (List>) batchEvent.get("events"); for (Map event : events) { - event.put("syncts", syncts); - event.put("@timestamp", syncTimestamp); - Map context = (Map)event.get("context"); - String channel = (String)context.get("channel"); - if(StringUtils.isEmpty(channel)){ - event.put("context", context); + String json = ""; + try { + event.put("syncts", syncts); + event.put("@timestamp", syncTimestamp); + Map context = (Map) event.get("context"); + String channel = (String) context.get("channel"); + if (StringUtils.isEmpty(channel)) { + event.put("context", context); + } + json = new Gson().toJson(event); + sink.toSuccessTopic(json); + } catch (Throwable t) { + LOGGER.info("", "Failed to send extracted event to success topic: " + t.getMessage()); + sink.toErrorTopic(json); } - String json = new Gson().toJson(event); - sink.toSuccessTopic(json); } metrics.incSuccessCounter(); generateAuditEvent(batchEvent, syncts, syncTimestamp, sink, defaultChannel);