Skip to content

Commit

Permalink
Merge pull request #11 from shenyute/dev/stream_opt
Browse files Browse the repository at this point in the history
Add new stream options to support size limit and approximate match.
  • Loading branch information
dranikpg authored Nov 16, 2023
2 parents 9c6a580 + 60f8619 commit df2c754
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 2 deletions.
19 changes: 18 additions & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
)

var NoExpiration = time.Duration(0)
var NoMaxLen = int64(0)

// now is defined here so it can be overridden in unit tests
var now = time.Now
Expand All @@ -18,6 +19,8 @@ type Stream[T any] struct {
client redis.Cmdable
stream string
ttl time.Duration
maxLen int64
approx bool
}

type Options struct {
Expand All @@ -26,16 +29,24 @@ type Options struct {
// The default is No Expiration.
// Note that TTL is performed when messages are Added, so Range requests won't clean up old messages.
TTL time.Duration
// MaxLen is an optional parameter to specify the maximum length of the stream.
MaxLen int64
// Approx causes MaxLen and TTL to be approximate instead of exact.
Approx bool
}

// Create a new stream with messages of type T.
// Options are optional (the parameter can be nil to use defaults).
func NewStream[T any](client redis.Cmdable, stream string, opt *Options) Stream[T] {
var approx bool
maxLen := NoMaxLen
ttl := NoExpiration
if opt != nil {
ttl = opt.TTL
maxLen = opt.MaxLen
approx = opt.Approx
}
return Stream[T]{client: client, stream: stream, ttl: ttl}
return Stream[T]{client: client, stream: stream, ttl: ttl, maxLen: maxLen, approx: approx}
}

// Key returns the redis stream key.
Expand All @@ -49,6 +60,10 @@ func (s Stream[T]) Add(ctx context.Context, v T, idarg ...string) (string, error
if len(idarg) > 0 {
id = idarg[0]
}
var maxLen int64
if s.maxLen > NoMaxLen {
maxLen = s.maxLen
}
minID := ""
if s.ttl > NoExpiration {
minID = strconv.Itoa(int(now().Add(-s.ttl).UnixMilli()))
Expand All @@ -64,6 +79,8 @@ func (s Stream[T]) Add(ctx context.Context, v T, idarg ...string) (string, error
Values: vals,
ID: id,
MinID: minID,
MaxLen: maxLen,
Approx: s.approx,
}).Result()

if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func TestStream_TTL(t *testing.T) {
ms.SetTime(ts)

ttl := 10 * time.Second
stream := NewStream[Person](rdb, "s1", &Options{ttl})
stream := NewStream[Person](rdb, "s1", &Options{TTL: ttl})
// Add first entry.
_, err := stream.Add(ctx, Person{Name: "First"})
assert.NoError(t, err)
Expand Down

0 comments on commit df2c754

Please sign in to comment.