From 5446b84f783e536f22a3e11f83e631c8b4ad89d1 Mon Sep 17 00:00:00 2001 From: artaasadi Date: Fri, 22 Nov 2024 20:02:53 +0100 Subject: [PATCH] fix: add create or update consumer function --- pkg/jq/nats.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/pkg/jq/nats.go b/pkg/jq/nats.go index be5e311..a4c5f8d 100644 --- a/pkg/jq/nats.go +++ b/pkg/jq/nats.go @@ -139,6 +139,25 @@ func (jq *JobQueue) ConsumeWithConfig( return consumeCtx, nil } +func (jq *JobQueue) CreateOrUpdateConsumer( + ctx context.Context, + service string, + stream string, + topics []string, + config jetstream.ConsumerConfig, +) error { + config.Name = fmt.Sprintf("%s-service", service) + config.Description = fmt.Sprintf("%s Service", strings.ToTitle(service)) + config.FilterSubjects = topics + + _, err := jq.js.CreateOrUpdateConsumer(ctx, stream, config) + if err != nil { + return err + } + + return nil +} + func (jq *JobQueue) Produce(ctx context.Context, topic string, data []byte, id string) (*uint64, error) { pubAck, err := jq.js.Publish(ctx, topic, data, jetstream.WithMsgID(id)) if err != nil {