From 6bc8eb1e5ab988a33adf433a702bf34d64c4d14c Mon Sep 17 00:00:00 2001 From: Daniel Baptista Dias Date: Fri, 22 Dec 2023 13:50:57 -0300 Subject: [PATCH] chore: add fallbacks to restart streamWorker if kafka connection breaks (#34) --- Makefile | 3 +++ api/src/services/stream.service.ts | 23 +++++++---------------- docker-compose.stream.yml | 1 + 3 files changed, 11 insertions(+), 16 deletions(-) diff --git a/Makefile b/Makefile index b8d419e..e8a8400 100644 --- a/Makefile +++ b/Makefile @@ -38,3 +38,6 @@ run: ## run Pokeshop API on Docker Compose and run Trace-based tests with Tracet down: ## stop Pokeshop API on Docker Compose and run Trace-based tests with Tracetest docker compose -f docker-compose.yml -f ./docker-compose.stream.yml -f ./tracetest/docker-compose.yml down + +build/docker: # build docker image locally + docker build . -t kubeshop/demo-pokemon-api:latest \ No newline at end of file diff --git a/api/src/services/stream.service.ts b/api/src/services/stream.service.ts index 45cb32c..5636ac6 100644 --- a/api/src/services/stream.service.ts +++ b/api/src/services/stream.service.ts @@ -56,10 +56,6 @@ class InstrumentedKafkaStreamService extends InstrumentedComponent implements const headers = this.extractHeaders(message); const parentContext = propagation.extract(context.active(), headers); - console.log('Extracting headers from message to get OTel data...') - console.log('Message headers: ', headers) - console.log('Context: ', parentContext) - const span = await createSpanFromContext( `${this.topic} ${MessagingOperationValues.PROCESS}`, parentContext, @@ -109,10 +105,8 @@ class KafkaStreamService implements StreamingService { brokers: [KAFKA_BROKER] }); - console.log(`Checking if need to create topic...`) await this.waitForTopicCreation(); - console.log(`Starting consumer for groupId 'test-group'...`) this.consumer = this.client.consumer({ groupId: 'test-group' }); await this.consumer.connect(); } catch (ex) { @@ -125,16 +119,17 @@ class KafkaStreamService implements StreamingService { public async subscribe(callback: Function): Promise { const consumer = await this.connect(); - console.log(`Subscribing consumer for topic '${this.topic}'...`) await consumer.subscribe({ topic: this.topic, fromBeginning: true }); + const { CRASH } = consumer.events; + await consumer.on(CRASH, () => { + // make the node process crash on purpose, + // so we can restart the worker + process.exit(-1); + }); + await consumer.run({ eachMessage: async ({ message }) => { - console.log(`Consuming message...`); - console.log(`Message headers: ${message.headers?.toString()}`) - console.log(`Message key: ${message.key?.toString()}`) - console.log(`Message value: ${message.value?.toString()}`) - await callback(message); }, }) @@ -145,21 +140,17 @@ class KafkaStreamService implements StreamingService { return } - console.log(`Connecting to Kafka admin to check topics...`); const admin = this.client.admin() await admin.connect() while (true) { const topics = await admin.listTopics() - console.log(`Topics registered for broker '${topics}' ...`); if (topics.includes(this.topic)) { - console.log(`Topic '${this.topic}' exists.`); await admin?.disconnect() return } - console.log(`Topic '${this.topic}' does not exists. Waiting for producer to create it.`); await sleep(5_000); //wait for 5 seconds } } diff --git a/docker-compose.stream.yml b/docker-compose.stream.yml index 6ca6c45..f17fa69 100644 --- a/docker-compose.stream.yml +++ b/docker-compose.stream.yml @@ -42,6 +42,7 @@ services: KAFKA_TOPIC: 'pokemon' KAFKA_CLIENT_ID: 'streaming-worker' REDIS_URL: cache + restart: on-failure depends_on: db: condition: service_healthy