acknowledge is not working as expected. #23391
Unanswered
boopesh-byterat
asked this question in
Q&A
Replies: 0 comments
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
We have shared consumer approach, now after the consumer consume the message, I take the messageId covert it to the string and pass it along the ETL process, now after the ETL process is completed. I want to deserialize the message string to the Pulsar MessageId object and the use it for acknowledgement. The weird issue is that it's getting acknowledged but the backlog count in the topic is not getting reduced nor the messageAckCount in the topic is incrementing. So it's actually acknowledging it. Also I verified the consumer is instance is same when pulling the message and acknowledging the message as well. One more weird thing that I found is that if I acknowledge the message immediately once I pull the message, like if I call await consumer.acknowledgeId(messageId); immediately after await consumer.receive(400); then its working fine. I am not able to understand the behavior of Pulsar here, Can someone explain what am doing wrong here ? Btw am using "pulsar-client": "^1.11.0" for Node
Message pulling logic, triggered with a grpahQL query
async pull_message(
namespace: string,
topic: string,
subscription_key: string,
) {
const consumer = await this.ensure_consumer(
namespace,
topic,
subscription_key,
);
console.info(
pull_message using consumerId ${(consumer as any).consumerId}
,);
try {
const message = await consumer.receive(400);
}
Message Acknowledgement logic.
async acknowledge_message(
namespace: string,
topic: string,
subscription_key: string,
messageIdStr: string,
) {
const consumer = await this.ensure_consumer(
namespace,
topic,
subscription_key,
);
console.info(
acknowledge_message using consumerId ${(consumer as any).consumerId}
,);
}
async ensure_consumer(namespace, topic, subscription_key) {
const stream_key =
${byterat_iris_apollo_stream_one_tenant}/${namespace}/${topic}
;}
Beta Was this translation helpful? Give feedback.
All reactions