Skip to content

Commit

Permalink
Merge pull request #9 from nathan-cormier/pr
Browse files Browse the repository at this point in the history
TTL support (and a few enhancements)
  • Loading branch information
dranikpg authored Sep 8, 2023
2 parents 3d517d7 + 824a716 commit 9c6a580
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 37 deletions.
21 changes: 20 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,31 @@ errors.Is(msg.Err, errMyTypeFailedToParse)
Streams are simple wrappers for basic redis commands on a stream.

```go
stream := NewStream[Event](rdb, "my-stream")
stream := NewStream[Event](rdb, "my-stream", &Options{TTL: time.Hour})
stream.Add(ctx, Event{
Kind: "Example event",
Priority: 1,
})
```
The Options.TTL parameter will evict stream entries after the specified duration has elapsed (or it can be set to `NoExpiration`).

#### Metadata

The package defines a Metadata type as:
```
type Metadata map[string]any
```

This allows serialization (and deserialization) of generic structured metadata within the stream entries.
Any value that can be serialized to JSON can be inserted from a field of this type (it uses JSON marshaller under the hood).
For example:
```
stream.Add(ctx, EventWithMetadata{
Kind: "Example event",
Priority: 1,
Meta: Metadata{"string": "foobar", "float": float64(1234.5)},
})
```

### Installation

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/dranikpg/gtrs
go 1.18

require (
github.com/alicebob/miniredis/v2 v2.22.0
github.com/alicebob/miniredis/v2 v2.30.5
github.com/redis/go-redis/v9 v9.0.2
github.com/stretchr/testify v1.8.1
)
Expand All @@ -14,6 +14,6 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect
github.com/yuin/gopher-lua v1.1.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis/v2 v2.22.0 h1:lIHHiSkEyS1MkKHCHzN+0mWrA4YdbGdimE5iZ2sHSzo=
github.com/alicebob/miniredis/v2 v2.22.0/go.mod h1:XNqvJdQJv5mSuVMc0ynneafpnL/zv52acZ6kqeS0t88=
github.com/alicebob/miniredis/v2 v2.30.5 h1:3r6kTHdKnuP4fkS8k2IrvSfxpxUTcW1SOL0wN7b7Dt0=
github.com/alicebob/miniredis/v2 v2.30.5/go.mod h1:b25qWj4fCEsBeAAR2mlb0ufImGC6uH3VlUfb/HS5zKg=
github.com/bsm/ginkgo/v2 v2.5.0 h1:aOAnND1T40wEdAtkGSkvSICWeQ8L3UASX7YVCqQx+eQ=
github.com/bsm/gomega v1.20.0 h1:JhAwLmtRzXFTx2AkALSLa8ijZafntmhSoU63Ok18Uq8=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
Expand All @@ -25,8 +25,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLCur2yWnJzwQEKRcAHXo6seXGuSwWw=
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA=
github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE=
github.com/yuin/gopher-lua v1.1.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
60 changes: 56 additions & 4 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,40 @@ package gtrs

import (
"context"
"strconv"
"time"

"github.com/redis/go-redis/v9"
)

var NoExpiration = time.Duration(0)

// now is defined here so it can be overridden in unit tests
var now = time.Now

// Stream represents a redis stream with messages of type T.
type Stream[T any] struct {
client redis.Cmdable
stream string
ttl time.Duration
}

type Options struct {
// TTL is an optional parameter to specify how long entries stay in the stream before expiring,
// it only only works as expected when a non-custom id is used to Add a message.
// The default is No Expiration.
// Note that TTL is performed when messages are Added, so Range requests won't clean up old messages.
TTL time.Duration
}

// Create a new stream with messages of type T.
func NewStream[T any](client redis.Cmdable, stream string) Stream[T] {
return Stream[T]{client: client, stream: stream}
// Options are optional (the parameter can be nil to use defaults).
func NewStream[T any](client redis.Cmdable, stream string, opt *Options) Stream[T] {
ttl := NoExpiration
if opt != nil {
ttl = opt.TTL
}
return Stream[T]{client: client, stream: stream, ttl: ttl}
}

// Key returns the redis stream key.
Expand All @@ -28,11 +49,21 @@ func (s Stream[T]) Add(ctx context.Context, v T, idarg ...string) (string, error
if len(idarg) > 0 {
id = idarg[0]
}
minID := ""
if s.ttl > NoExpiration {
minID = strconv.Itoa(int(now().Add(-s.ttl).UnixMilli()))
}

id, err := s.client.XAdd(ctx, &redis.XAddArgs{
vals, err := structToMap(v)
if err != nil {
return "", err
}

id, err = s.client.XAdd(ctx, &redis.XAddArgs{
Stream: s.stream,
Values: structToMap(v),
Values: vals,
ID: id,
MinID: minID,
}).Result()

if err != nil {
Expand Down Expand Up @@ -63,6 +94,27 @@ func (s Stream[T]) Range(ctx context.Context, from, to string, count ...int64) (
return msgs, nil
}

// RevRange returns a portion of the stream in reverse order compared to Range. Calls XREVRANGE.
func (s Stream[T]) RevRange(ctx context.Context, from, to string, count ...int64) ([]Message[T], error) {
var redisSlice []redis.XMessage
var err error
if len(count) == 0 {
redisSlice, err = s.client.XRevRange(ctx, s.stream, from, to).Result()
} else {
redisSlice, err = s.client.XRevRangeN(ctx, s.stream, from, to, count[0]).Result()
}

if err != nil {
return nil, ReadError{Err: err}
}

msgs := make([]Message[T], len(redisSlice))
for i, msg := range redisSlice {
msgs[i] = toMessage[T](msg, s.stream)
}
return msgs, nil
}

// Len returns the current stream length. Calls XLEN.
func (s Stream[T]) Len(ctx context.Context) (int64, error) {
len, err := s.client.XLen(ctx, s.stream).Result()
Expand Down
87 changes: 81 additions & 6 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gtrs
import (
"context"
"testing"
"time"

"github.com/alicebob/miniredis/v2"
"github.com/redis/go-redis/v9"
Expand Down Expand Up @@ -32,7 +33,7 @@ func TestStream_RangeLenSimple(t *testing.T) {
ms, rdb := startMiniredis(t)
ctx := context.TODO()

stream := NewStream[Person](rdb, "s1")
stream := NewStream[Person](rdb, "s1", nil)

// Just a check for codecov :)
assert.Equal(t, "s1", stream.Key())
Expand Down Expand Up @@ -62,6 +63,16 @@ func TestStream_RangeLenSimple(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, int64(2), len)

values, err = stream.RevRange(ctx, "+", "-")
assert.Nil(t, err)
assert.Equal(t, []Message[Person]{
{ID: "0-2", Stream: "s1", Data: Person{Name: "Second"}},
{ID: "0-1", Stream: "s1", Data: Person{Name: "First"}},
}, values)
len, err = stream.Len(ctx)
assert.Nil(t, err)
assert.Equal(t, int64(2), len)

// Add third entry.
ms.XAdd("s1", "0-3", []string{"name", "Third"})

Expand All @@ -71,13 +82,20 @@ func TestStream_RangeLenSimple(t *testing.T) {
{ID: "0-1", Stream: "s1", Data: Person{Name: "First"}},
{ID: "0-2", Stream: "s1", Data: Person{Name: "Second"}},
}, values)

values, err = stream.RevRange(ctx, "+", "-", 2)
assert.Nil(t, err)
assert.Equal(t, []Message[Person]{
{ID: "0-3", Stream: "s1", Data: Person{Name: "Third"}},
{ID: "0-2", Stream: "s1", Data: Person{Name: "Second"}},
}, values)
}

func TestStream_RangeInterval(t *testing.T) {
ms, rdb := startMiniredis(t)
ctx := context.TODO()

stream := NewStream[Person](rdb, "s1")
stream := NewStream[Person](rdb, "s1", nil)

ms.XAdd("s1", "0-1", []string{"name", "First"})
ms.XAdd("s1", "0-2", []string{"name", "Second"})
Expand Down Expand Up @@ -113,10 +131,11 @@ func TestStream_Add(t *testing.T) {
_, rdb := startMiniredis(t)
ctx := context.TODO()

stream := NewStream[Person](rdb, "s1")
stream := NewStream[Person](rdb, "s1", nil)

// Add first entry.
stream.Add(ctx, Person{Name: "First"}, "0-1")
_, err := stream.Add(ctx, Person{Name: "First"}, "0-1")
assert.NoError(t, err)

len, err := stream.Len(ctx)
assert.Nil(t, err)
Expand All @@ -129,7 +148,8 @@ func TestStream_Add(t *testing.T) {
}, vals)

// Add second entry.
stream.Add(ctx, Person{Name: "Second"})
_, err = stream.Add(ctx, Person{Name: "Second"})
assert.NoError(t, err)

len, err = stream.Len(ctx)
assert.Nil(t, err)
Expand All @@ -142,7 +162,7 @@ func TestStream_Error(t *testing.T) {

ms.Close()

stream := NewStream[Person](rdb, "s1")
stream := NewStream[Person](rdb, "s1", nil)

_, err := stream.Range(ctx, "-", "+")
assert.NotNil(t, err)
Expand All @@ -156,3 +176,58 @@ func TestStream_Error(t *testing.T) {
assert.NotNil(t, err)
assert.IsType(t, ReadError{}, err)
}

func TestStream_TTL(t *testing.T) {
ms, rdb := startMiniredis(t)
ctx := context.TODO()
ts := time.Date(2023, 1, 1, 4, 4, 5, 4000000, time.UTC)
defer func() { now = time.Now }()
now = func() time.Time {
return ts
}
ms.SetTime(ts)

ttl := 10 * time.Second
stream := NewStream[Person](rdb, "s1", &Options{ttl})
// Add first entry.
_, err := stream.Add(ctx, Person{Name: "First"})
assert.NoError(t, err)
vals, err := stream.Range(ctx, "-", "+")
assert.NoError(t, err)
assert.Len(t, vals, 1)

// Wait a few seconds and add a second entry.
ts = ts.Add(2 * time.Second)
ms.SetTime(ts)
_, err = stream.Add(ctx, Person{Name: "Second"})
assert.NoError(t, err)
vals, err = stream.Range(ctx, "-", "+")
assert.NoError(t, err)
assert.Len(t, vals, 2)

// Wait past the TTL and add a third entry.
ts = ts.Add(ttl)
ms.SetTime(ts)
_, err = stream.Add(ctx, Person{Name: "Third"})
assert.NoError(t, err)

vals, err = stream.Range(ctx, "-", "+")
assert.NoError(t, err)
assert.Len(t, vals, 2) // first entry should have expired already
assert.Equal(t, vals[0].Stream, "s1")
assert.Equal(t, vals[0].Data.Name, "Second")
assert.Equal(t, vals[1].Stream, "s1")
assert.Equal(t, vals[1].Data.Name, "Third")

// Wait longer and add a fourth entry.
ts = ts.Add(ttl + time.Millisecond)
ms.SetTime(ts)
_, err = stream.Add(ctx, Person{Name: "Fourth"})
assert.NoError(t, err)

vals, err = stream.Range(ctx, "-", "+")
assert.NoError(t, err)
assert.Len(t, vals, 1) // only the latest entry should still be in the stream
assert.Equal(t, vals[0].Stream, "s1")
assert.Equal(t, vals[0].Data.Name, "Fourth")
}
Loading

0 comments on commit 9c6a580

Please sign in to comment.