diff --git a/pkg/jq/nats.go b/pkg/jq/nats.go index be5e311..a4c5f8d 100644 --- a/pkg/jq/nats.go +++ b/pkg/jq/nats.go @@ -139,6 +139,25 @@ func (jq *JobQueue) ConsumeWithConfig( return consumeCtx, nil } +func (jq *JobQueue) CreateOrUpdateConsumer( + ctx context.Context, + service string, + stream string, + topics []string, + config jetstream.ConsumerConfig, +) error { + config.Name = fmt.Sprintf("%s-service", service) + config.Description = fmt.Sprintf("%s Service", strings.ToTitle(service)) + config.FilterSubjects = topics + + _, err := jq.js.CreateOrUpdateConsumer(ctx, stream, config) + if err != nil { + return err + } + + return nil +} + func (jq *JobQueue) Produce(ctx context.Context, topic string, data []byte, id string) (*uint64, error) { pubAck, err := jq.js.Publish(ctx, topic, data, jetstream.WithMsgID(id)) if err != nil {