Skip to content

Commit

Permalink
chore: add fallbacks to restart streamWorker if kafka connection brea…
Browse files Browse the repository at this point in the history
…ks (#34)
  • Loading branch information
danielbdias authored Dec 22, 2023
1 parent f28757f commit 6bc8eb1
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 16 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,6 @@ run: ## run Pokeshop API on Docker Compose and run Trace-based tests with Tracet

down: ## stop Pokeshop API on Docker Compose and run Trace-based tests with Tracetest
docker compose -f docker-compose.yml -f ./docker-compose.stream.yml -f ./tracetest/docker-compose.yml down

build/docker: # build docker image locally
docker build . -t kubeshop/demo-pokemon-api:latest
23 changes: 7 additions & 16 deletions api/src/services/stream.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ class InstrumentedKafkaStreamService<T> extends InstrumentedComponent implements
const headers = this.extractHeaders(message);
const parentContext = propagation.extract(context.active(), headers);

console.log('Extracting headers from message to get OTel data...')
console.log('Message headers: ', headers)
console.log('Context: ', parentContext)

const span = await createSpanFromContext(
`${this.topic} ${MessagingOperationValues.PROCESS}`,
parentContext,
Expand Down Expand Up @@ -109,10 +105,8 @@ class KafkaStreamService<T> implements StreamingService<T> {
brokers: [KAFKA_BROKER]
});

console.log(`Checking if need to create topic...`)
await this.waitForTopicCreation();

console.log(`Starting consumer for groupId 'test-group'...`)
this.consumer = this.client.consumer({ groupId: 'test-group' });
await this.consumer.connect();
} catch (ex) {
Expand All @@ -125,16 +119,17 @@ class KafkaStreamService<T> implements StreamingService<T> {
public async subscribe(callback: Function): Promise<void> {
const consumer = await this.connect();

console.log(`Subscribing consumer for topic '${this.topic}'...`)
await consumer.subscribe({ topic: this.topic, fromBeginning: true });

const { CRASH } = consumer.events;
await consumer.on(CRASH, () => {
// make the node process crash on purpose,
// so we can restart the worker
process.exit(-1);
});

await consumer.run({
eachMessage: async ({ message }) => {
console.log(`Consuming message...`);
console.log(`Message headers: ${message.headers?.toString()}`)
console.log(`Message key: ${message.key?.toString()}`)
console.log(`Message value: ${message.value?.toString()}`)

await callback(message);
},
})
Expand All @@ -145,21 +140,17 @@ class KafkaStreamService<T> implements StreamingService<T> {
return
}

console.log(`Connecting to Kafka admin to check topics...`);
const admin = this.client.admin()
await admin.connect()

while (true) {
const topics = await admin.listTopics()
console.log(`Topics registered for broker '${topics}' ...`);

if (topics.includes(this.topic)) {
console.log(`Topic '${this.topic}' exists.`);
await admin?.disconnect()
return
}

console.log(`Topic '${this.topic}' does not exists. Waiting for producer to create it.`);
await sleep(5_000); //wait for 5 seconds
}
}
Expand Down
1 change: 1 addition & 0 deletions docker-compose.stream.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ services:
KAFKA_TOPIC: 'pokemon'
KAFKA_CLIENT_ID: 'streaming-worker'
REDIS_URL: cache
restart: on-failure
depends_on:
db:
condition: service_healthy
Expand Down

0 comments on commit 6bc8eb1

Please sign in to comment.