Skip to content

Commit

Permalink
feat: update consume with config
Browse files Browse the repository at this point in the history
  • Loading branch information
Mahanmmi committed Aug 13, 2024
1 parent e4d357e commit ff1eee4
Showing 1 changed file with 4 additions and 4 deletions.
8 changes: 4 additions & 4 deletions pkg/jq/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (jq *JobQueue) Consume(
queue string,
handler func(jetstream.Msg),
) (jetstream.ConsumeContext, error) {
return jq.ConsumeWithConfig(ctx, service, stream, topics, queue, jetstream.ConsumerConfig{
return jq.ConsumeWithConfig(ctx, service, stream, topics, jetstream.ConsumerConfig{
Name: fmt.Sprintf("%s-service", service),
Description: fmt.Sprintf("%s Service", strings.ToTitle(service)),
FilterSubjects: topics,
Expand All @@ -107,7 +107,7 @@ func (jq *JobQueue) Consume(
DeliverPolicy: jetstream.DeliverAllPolicy,
MaxAckPending: -1,
InactiveThreshold: time.Hour,
}, handler)
}, nil, handler)
}

// ConsumeWithConfig consumes messages from the given topic using the specified consumer config
Expand All @@ -118,8 +118,8 @@ func (jq *JobQueue) ConsumeWithConfig(
service string,
stream string,
topics []string,
queue string,
config jetstream.ConsumerConfig,
pullConsumerOpts []jetstream.PullConsumeOpt,
handler func(jetstream.Msg),
) (jetstream.ConsumeContext, error) {
config.Name = fmt.Sprintf("%s-service", service)
Expand All @@ -131,7 +131,7 @@ func (jq *JobQueue) ConsumeWithConfig(
return nil, err
}

consumeCtx, err := consumer.Consume(handler)
consumeCtx, err := consumer.Consume(handler, pullConsumerOpts...)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit ff1eee4

Please sign in to comment.