Skip to content

Commit

Permalink
[ADDED] Pause and resume jetstream consumer (#1571)
Browse files Browse the repository at this point in the history
Signed-off-by: Yordis Prieto <[email protected]>
  • Loading branch information
yordis authored and piotrpio committed Aug 15, 2024
1 parent 0138e7a commit dd5b462
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 0 deletions.
3 changes: 3 additions & 0 deletions jetstream/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ const (
// apiConsumerDeleteT is used to delete consumers.
apiConsumerDeleteT = "CONSUMER.DELETE.%s.%s"

// apiConsumerPauseT is used to pause a consumer.
apiConsumerPauseT = "CONSUMER.PAUSE.%s.%s"

// apiConsumerListT is used to return all detailed consumer information
apiConsumerListT = "CONSUMER.LIST.%s"

Expand Down
38 changes: 38 additions & 0 deletions jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/nats-io/nats.go/internal/syncx"
"github.com/nats-io/nuid"
Expand Down Expand Up @@ -321,6 +322,43 @@ func deleteConsumer(ctx context.Context, js *jetStream, stream, consumer string)
return nil
}

func pauseConsumer(ctx context.Context, js *jetStream, stream, consumer string, pauseUntil *time.Time) (*ConsumerPauseResponse, error) {
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
if err := validateConsumerName(consumer); err != nil {
return nil, err
}
subject := apiSubj(js.apiPrefix, fmt.Sprintf(apiConsumerPauseT, stream, consumer))

var resp consumerPauseApiResponse
req, err := json.Marshal(consumerPauseRequest{
PauseUntil: pauseUntil,
})
if err != nil {
return nil, err
}
if _, err := js.apiRequestJSON(ctx, subject, &resp, req); err != nil {
return nil, err
}
if resp.Error != nil {
if resp.Error.ErrorCode == JSErrCodeConsumerNotFound {
return nil, ErrConsumerNotFound
}
return nil, resp.Error
}
return &ConsumerPauseResponse{
Paused: resp.Paused,
PauseUntil: resp.PauseUntil,
PauseRemaining: resp.PauseRemaining,
}, nil
}

func resumeConsumer(ctx context.Context, js *jetStream, stream, consumer string) (*ConsumerPauseResponse, error) {
return pauseConsumer(ctx, js, stream, consumer, nil)
}

func validateConsumerName(dur string) error {
if dur == "" {
return fmt.Errorf("%w: '%s'", ErrInvalidConsumerName, "name is required")
Expand Down
10 changes: 10 additions & 0 deletions jetstream/consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ type (

// TimeStamp indicates when the info was gathered by the server.
TimeStamp time.Time `json:"ts"`

// Paused indicates whether the consumer is paused.
Paused bool `json:"paused,omitempty"`

// PauseRemaining contains the amount of time left until the consumer
// unpauses. It will only be non-zero if the consumer is currently paused.
PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
}

// ConsumerConfig is the configuration of a JetStream consumer.
Expand Down Expand Up @@ -217,6 +224,9 @@ type (
// associating metadata on the consumer. This feature requires
// nats-server v2.10.0 or later.
Metadata map[string]string `json:"metadata,omitempty"`

// PauseUntil is for suspending the consumer until the deadline.
PauseUntil *time.Time `json:"pause_until,omitempty"`
}

// OrderedConsumerConfig is the configuration of an ordered JetStream
Expand Down
20 changes: 20 additions & 0 deletions jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@ type (
// DeleteConsumer removes a consumer with given name from a stream.
// If consumer does not exist, ErrConsumerNotFound is returned.
DeleteConsumer(ctx context.Context, stream string, consumer string) error

// PauseConsumer pauses a consumer until the given time.
PauseConsumer(ctx context.Context, stream string, consumer string, pauseUntil time.Time) (*ConsumerPauseResponse, error)

// ResumeConsumer resumes a paused consumer.
ResumeConsumer(ctx context.Context, stream string, consumer string) (*ConsumerPauseResponse, error)
}

// StreamListOpt is a functional option for [StreamManager.ListStreams] and
Expand Down Expand Up @@ -781,6 +787,20 @@ func (js *jetStream) DeleteConsumer(ctx context.Context, stream string, name str
return deleteConsumer(ctx, js, stream, name)
}

func (js *jetStream) PauseConsumer(ctx context.Context, stream string, consumer string, pauseUntil time.Time) (*ConsumerPauseResponse, error) {
if err := validateStreamName(stream); err != nil {
return nil, err
}
return pauseConsumer(ctx, js, stream, consumer, &pauseUntil)
}

func (js *jetStream) ResumeConsumer(ctx context.Context, stream string, consumer string) (*ConsumerPauseResponse, error) {
if err := validateStreamName(stream); err != nil {
return nil, err
}
return resumeConsumer(ctx, js, stream, consumer)
}

func validateStreamName(stream string) error {
if stream == "" {
return ErrStreamNameRequired
Expand Down
34 changes: 34 additions & 0 deletions jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ type (
// If consumer does not exist, ErrConsumerNotFound is returned.
DeleteConsumer(ctx context.Context, consumer string) error

// PauseConsumer pauses a consumer.
PauseConsumer(ctx context.Context, consumer string, pauseUntil time.Time) (*ConsumerPauseResponse, error)

// ResumeConsumer resumes a consumer.
ResumeConsumer(ctx context.Context, consumer string) (*ConsumerPauseResponse, error)

// ListConsumers returns ConsumerInfoLister enabling iterating over a
// channel of consumer infos.
ListConsumers(context.Context) ConsumerInfoLister
Expand Down Expand Up @@ -163,6 +169,24 @@ type (
Success bool `json:"success,omitempty"`
}

consumerPauseRequest struct {
PauseUntil *time.Time `json:"pause_until,omitempty"`
}

ConsumerPauseResponse struct {
// Paused is true if the consumer is paused.
Paused bool `json:"paused"`
// PauseUntil is the time until the consumer is paused.
PauseUntil time.Time `json:"pause_until"`
// PauseRemaining is the time remaining until the consumer is paused.
PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
}

consumerPauseApiResponse struct {
apiResponse
ConsumerPauseResponse
}

// GetMsgOpt is a function setting options for [Stream.GetMsg]
GetMsgOpt func(*apiMsgGetRequest) error

Expand Down Expand Up @@ -296,6 +320,16 @@ func (s *stream) DeleteConsumer(ctx context.Context, name string) error {
return deleteConsumer(ctx, s.jetStream, s.name, name)
}

// PauseConsumer pauses a consumer.
func (s *stream) PauseConsumer(ctx context.Context, name string, pauseUntil time.Time) (*ConsumerPauseResponse, error) {
return pauseConsumer(ctx, s.jetStream, s.name, name, &pauseUntil)
}

// ResumeConsumer resumes a consumer.
func (s *stream) ResumeConsumer(ctx context.Context, name string) (*ConsumerPauseResponse, error) {
return resumeConsumer(ctx, s.jetStream, s.name, name)
}

// Info returns StreamInfo from the server.
func (s *stream) Info(ctx context.Context, opts ...StreamInfoOpt) (*StreamInfo, error) {
ctx, cancel := wrapContextWithoutDeadline(ctx)
Expand Down
9 changes: 9 additions & 0 deletions jetstream/test/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ func TestConsumerInfo(t *testing.T) {
if info.Config.Description != "test consumer" {
t.Fatalf("Invalid consumer description; expected: 'test consumer'; got: %s", info.Config.Description)
}
if info.Config.PauseUntil != nil {
t.Fatalf("Consumer should not be paused")
}
if info.Paused != false {
t.Fatalf("Consumer should not be paused")
}
if info.PauseRemaining != 0 {
t.Fatalf("Consumer should not be paused")
}

// update consumer and see if info is updated
_, err = s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Expand Down
129 changes: 129 additions & 0 deletions jetstream/test/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1483,3 +1483,132 @@ func TestPurgeStream(t *testing.T) {
})
}
}

func TestPauseConsumer(t *testing.T) {
srv := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, srv)

nc, err := nats.Connect(srv.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

defer nc.Close()

s, err := js.CreateStream(context.TODO(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

t.Run("create a paused consumer", func(t *testing.T) {
const consumerName = "durr"
pauseUntil := time.Now().Add(1 * time.Minute)
consumer, err := s.CreateOrUpdateConsumer(context.TODO(), jetstream.ConsumerConfig{
Durable: consumerName,
AckPolicy: jetstream.AckAllPolicy,
Description: "desc",
PauseUntil: &pauseUntil,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

info, err := consumer.Info(context.TODO())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !info.Paused {
t.Fatalf("Consumer should be paused")
}
if info.PauseRemaining <= time.Duration(0) {
t.Fatalf("PauseRemaining should be greater than 0")
}
})

t.Run("pausing a consumer that does not exists", func(t *testing.T) {
const consumerName = "durr1"
pauseUntil := time.Now().Add(1 * time.Minute)
_, err := s.PauseConsumer(context.TODO(), consumerName, pauseUntil)
if err == nil {
t.Fatalf("Expected error; got: %v", err)
}
if !errors.Is(err, jetstream.ErrConsumerNotFound) {
t.Fatalf("Expected error: %v; got: %v", jetstream.ErrConsumerNotFound, err)
}
})

t.Run("pausing consumer", func(t *testing.T) {
const consumerName = "durr2"
consumer, err := s.CreateOrUpdateConsumer(context.TODO(), jetstream.ConsumerConfig{
Durable: consumerName,
AckPolicy: jetstream.AckAllPolicy,
Description: "desc",
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

info, err := consumer.Info(context.TODO())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if info.Paused {
t.Fatalf("Consumer should not be paused")
}

pauseUntil := time.Now().Add(1 * time.Minute)
resp, err := s.PauseConsumer(context.TODO(), consumerName, pauseUntil)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if !resp.Paused {
t.Fatalf("Consumer should be paused")
}
if !resp.PauseUntil.Equal(pauseUntil) {
t.Fatalf("Invalid pause until; want: %v; got: %v", pauseUntil, resp.PauseUntil)
}
if resp.PauseRemaining <= time.Duration(0) {
t.Fatalf("PauseRemaining should be greater than 0")
}
})

t.Run("resuming consumer", func(t *testing.T) {
const consumerName = "durr3"
pauseUntil := time.Now().Add(20 * time.Minute)
consumer, err := s.CreateOrUpdateConsumer(context.TODO(), jetstream.ConsumerConfig{
Durable: consumerName,
AckPolicy: jetstream.AckAllPolicy,
Description: "desc",
PauseUntil: &pauseUntil,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

info, err := consumer.Info(context.TODO())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !info.Paused {
t.Fatalf("Consumer should be paused")
}

resp, err := s.ResumeConsumer(context.TODO(), consumerName)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if resp.Paused {
t.Fatalf("Consumer should not be paused")
}
if resp.PauseRemaining != time.Duration(0) {
t.Fatalf("PauseRemaining should be 0")
}
})
}

0 comments on commit dd5b462

Please sign in to comment.