From cecdd5c67ed6ecbcda4122daf5fd67555fe3d029 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rafa=C5=82=20Leszko?=
Date: Thu, 25 Jul 2024 11:59:07 +0200
Subject: [PATCH] Add retries while sending messages to Kafka

---
 handlers/analytics/log_processor.go | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/handlers/analytics/log_processor.go b/handlers/analytics/log_processor.go
index 9cfafecba..f0b30f9d2 100644
--- a/handlers/analytics/log_processor.go
+++ b/handlers/analytics/log_processor.go
@@ -182,11 +182,21 @@ func (p *LogProcessor) sendEvents() {
 	}
 	p.logs = []LogData{}
 
-	err := p.writer.WriteMessages(context.Background(), msgs...)
-	if err != nil {
-		metrics.Metrics.AnalyticsMetrics.LogProcessorWriteErrors.Inc()
-		glog.Errorf("error while sending analytics log to Kafka, err=%v", err)
+	// Retry sending the messages to Kafka a fixed number of times in case of a failure.
+	// No backoff is used: events accumulate very quickly, so on a persistent failure it is
+	// better to lose analytics logs than to fill up memory and crash the whole catalyst-api.
+	kafkaWriteRetries := 3
+	var err error
+	for i := 0; i < kafkaWriteRetries; i++ {
+		err = p.writer.WriteMessages(context.Background(), msgs...)
+		if err == nil {
+			return
+		} else {
+			glog.Warningf("error while sending analytics log to Kafka, retrying, try=%d, err=%v", i, err)
+		}
 	}
+	metrics.Metrics.AnalyticsMetrics.LogProcessorWriteErrors.Inc()
+	glog.Errorf("error while sending analytics log to Kafka, the analytics logs are lost, err=%v", err)
 }
 
 func (p *LogProcessor) logWriteMetrics() {