From e685f7e05a184e06a9a1a34abffbb3676a2d6f95 Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Mon, 8 Apr 2019 11:31:19 +0200 Subject: [PATCH] Add support for fatal errors --- kafkacat.c | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/kafkacat.c b/kafkacat.c index e7b59abd..21cb9ee9 100644 --- a/kafkacat.c +++ b/kafkacat.c @@ -1085,7 +1085,23 @@ static void term (int sig) { */ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) { +#if RD_KAFKA_VERSION >= 0x01000000 + if (err == RD_KAFKA_RESP_ERR__FATAL) { + /* A fatal error has been raised, extract the + * underlying error, and start graceful termination - + * this to make sure producer delivery reports are + * handled before exiting. */ + char fatal_errstr[512]; + rd_kafka_resp_err_t fatal_err; + + fatal_err = rd_kafka_fatal_error(rk, fatal_errstr, + sizeof(fatal_errstr)); + KC_INFO(0, "FATAL CLIENT ERROR: %s: %s: terminating\n", + rd_kafka_err2str(fatal_err), fatal_errstr); + conf.run = 0; + } else +#endif if (err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN) { KC_ERROR("%s: %s", rd_kafka_err2str(err), reason ? reason : "");