From 60f8619a94c296acbefaa21d254b311c0d31ddfb Mon Sep 17 00:00:00 2001 From: YuTeh Shen Date: Thu, 16 Nov 2023 13:52:42 +0800 Subject: [PATCH] Add new stream options to support size limit and approximate match. --- stream.go | 19 ++++++++++++++++++- stream_test.go | 2 +- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/stream.go b/stream.go index d95d71f..206cf2a 100644 --- a/stream.go +++ b/stream.go @@ -9,6 +9,7 @@ import ( ) var NoExpiration = time.Duration(0) +var NoMaxLen = int64(0) // now is defined here so it can be overridden in unit tests var now = time.Now @@ -18,6 +19,8 @@ type Stream[T any] struct { client redis.Cmdable stream string ttl time.Duration + maxLen int64 + approx bool } type Options struct { @@ -26,16 +29,24 @@ type Options struct { // The default is No Expiration. // Note that TTL is performed when messages are Added, so Range requests won't clean up old messages. TTL time.Duration + // MaxLen is an optional parameter to specify the maximum length of the stream. + MaxLen int64 + // Approx causes MaxLen and TTL to be approximate instead of exact. + Approx bool } // Create a new stream with messages of type T. // Options are optional (the parameter can be nil to use defaults). func NewStream[T any](client redis.Cmdable, stream string, opt *Options) Stream[T] { + var approx bool + maxLen := NoMaxLen ttl := NoExpiration if opt != nil { ttl = opt.TTL + maxLen = opt.MaxLen + approx = opt.Approx } - return Stream[T]{client: client, stream: stream, ttl: ttl} + return Stream[T]{client: client, stream: stream, ttl: ttl, maxLen: maxLen, approx: approx} } // Key returns the redis stream key. @@ -49,6 +60,10 @@ func (s Stream[T]) Add(ctx context.Context, v T, idarg ...string) (string, error if len(idarg) > 0 { id = idarg[0] } + var maxLen int64 + if s.maxLen > NoMaxLen { + maxLen = s.maxLen + } minID := "" if s.ttl > NoExpiration { minID = strconv.Itoa(int(now().Add(-s.ttl).UnixMilli())) @@ -64,6 +79,8 @@ func (s Stream[T]) Add(ctx context.Context, v T, idarg ...string) (string, error Values: vals, ID: id, MinID: minID, + MaxLen: maxLen, + Approx: s.approx, }).Result() if err != nil { diff --git a/stream_test.go b/stream_test.go index 538094b..3880ac7 100644 --- a/stream_test.go +++ b/stream_test.go @@ -188,7 +188,7 @@ func TestStream_TTL(t *testing.T) { ms.SetTime(ts) ttl := 10 * time.Second - stream := NewStream[Person](rdb, "s1", &Options{ttl}) + stream := NewStream[Person](rdb, "s1", &Options{TTL: ttl}) // Add first entry. _, err := stream.Add(ctx, Person{Name: "First"}) assert.NoError(t, err)