From ff1eee498a0549937754ee77a4c1b6888971fbfa Mon Sep 17 00:00:00 2001 From: Mahan Zendedel DH Date: Tue, 13 Aug 2024 17:36:38 +0400 Subject: [PATCH] feat: update consume with config --- pkg/jq/nats.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/jq/nats.go b/pkg/jq/nats.go index fd83f22..9a1eb59 100644 --- a/pkg/jq/nats.go +++ b/pkg/jq/nats.go @@ -97,7 +97,7 @@ func (jq *JobQueue) Consume( queue string, handler func(jetstream.Msg), ) (jetstream.ConsumeContext, error) { - return jq.ConsumeWithConfig(ctx, service, stream, topics, queue, jetstream.ConsumerConfig{ + return jq.ConsumeWithConfig(ctx, service, stream, topics, jetstream.ConsumerConfig{ Name: fmt.Sprintf("%s-service", service), Description: fmt.Sprintf("%s Service", strings.ToTitle(service)), FilterSubjects: topics, @@ -107,7 +107,7 @@ func (jq *JobQueue) Consume( DeliverPolicy: jetstream.DeliverAllPolicy, MaxAckPending: -1, InactiveThreshold: time.Hour, - }, handler) + }, nil, handler) } // ConsumeWithConfig consumes messages from the given topic using the specified consumer config @@ -118,8 +118,8 @@ func (jq *JobQueue) ConsumeWithConfig( service string, stream string, topics []string, - queue string, config jetstream.ConsumerConfig, + pullConsumerOpts []jetstream.PullConsumeOpt, handler func(jetstream.Msg), ) (jetstream.ConsumeContext, error) { config.Name = fmt.Sprintf("%s-service", service) @@ -131,7 +131,7 @@ func (jq *JobQueue) ConsumeWithConfig( return nil, err } - consumeCtx, err := consumer.Consume(handler) + consumeCtx, err := consumer.Consume(handler, pullConsumerOpts...) if err != nil { return nil, err }