Skip to content

Commit

Permalink
Merge pull request #155 from opengovern/fix-create-consumer
Browse files Browse the repository at this point in the history
fix: add create or update consumer function
  • Loading branch information
artaasadi authored Nov 22, 2024
2 parents 74bcb3d + 5446b84 commit a516e35
Showing 1 changed file with 19 additions and 0 deletions.
19 changes: 19 additions & 0 deletions pkg/jq/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,25 @@ func (jq *JobQueue) ConsumeWithConfig(
return consumeCtx, nil
}

func (jq *JobQueue) CreateOrUpdateConsumer(
ctx context.Context,
service string,
stream string,
topics []string,
config jetstream.ConsumerConfig,
) error {
config.Name = fmt.Sprintf("%s-service", service)
config.Description = fmt.Sprintf("%s Service", strings.ToTitle(service))
config.FilterSubjects = topics

_, err := jq.js.CreateOrUpdateConsumer(ctx, stream, config)
if err != nil {
return err
}

return nil
}

func (jq *JobQueue) Produce(ctx context.Context, topic string, data []byte, id string) (*uint64, error) {
pubAck, err := jq.js.Publish(ctx, topic, data, jetstream.WithMsgID(id))
if err != nil {
Expand Down

0 comments on commit a516e35

Please sign in to comment.