Skip to content

Commit

Permalink
Drop generic marshaller, implement jsonMarshaller instead
Browse files Browse the repository at this point in the history
  • Loading branch information
anodar committed Apr 12, 2024
1 parent ed97ebf commit c857658
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 53 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible
github.com/OffchainLabs/bold v0.0.0-00010101000000-000000000000
github.com/Shopify/toxiproxy v2.1.4+incompatible
github.com/alicebob/miniredis/v2 v2.21.0
github.com/alicebob/miniredis/v2 v2.32.1
github.com/andybalholm/brotli v1.0.4
github.com/aws/aws-sdk-go-v2 v1.16.4
github.com/aws/aws-sdk-go-v2/config v1.15.5
Expand Down Expand Up @@ -266,7 +266,7 @@ require (
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel v1.7.0 // indirect
Expand Down Expand Up @@ -323,7 +323,7 @@ require (
github.com/go-redis/redis/v8 v8.11.4
github.com/go-stack/stack v1.8.1 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/google/uuid v1.3.1
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/go-bexpr v0.1.10 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 h1:iW0a5
github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5/go.mod h1:Y2QMoi1vgtOIfc+6DhrMOGkLoGzqSV2rKp4Sm+opsyA=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis/v2 v2.21.0 h1:CdmwIlKUWFBDS+4464GtQiQ0R1vpzOgu4Vnd74rBL7M=
github.com/alicebob/miniredis/v2 v2.21.0/go.mod h1:XNqvJdQJv5mSuVMc0ynneafpnL/zv52acZ6kqeS0t88=
github.com/alicebob/miniredis/v2 v2.32.1 h1:Bz7CciDnYSaa0mX5xODh6GUITRSx+cVhjNoOR4JssBo=
github.com/alicebob/miniredis/v2 v2.32.1/go.mod h1:AqkLNAfUm0K07J28hnAyyQKf/x0YkCY/g5DCtuL01Mw=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY=
Expand Down Expand Up @@ -1693,8 +1693,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLCur2yWnJzwQEKRcAHXo6seXGuSwWw=
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA=
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg=
github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
Expand Down
10 changes: 5 additions & 5 deletions pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,16 @@ type Consumer[Request any, Response any] struct {
id string
client redis.UniversalClient
cfg *ConsumerConfig
mReq Marshaller[Request]
mResp Marshaller[Response]
mReq jsonMarshaller[Request]
mResp jsonMarshaller[Response]
}

type Message[Request any] struct {
ID string
Value Request
}

func NewConsumer[Request any, Response any](ctx context.Context, cfg *ConsumerConfig, mReq Marshaller[Request], mResp Marshaller[Response]) (*Consumer[Request, Response], error) {
func NewConsumer[Request any, Response any](ctx context.Context, cfg *ConsumerConfig) (*Consumer[Request, Response], error) {
if cfg.RedisURL == "" {
return nil, fmt.Errorf("redis url cannot be empty")
}
Expand All @@ -78,8 +78,8 @@ func NewConsumer[Request any, Response any](ctx context.Context, cfg *ConsumerCo
id: uuid.NewString(),
client: c,
cfg: cfg,
mReq: mReq,
mResp: mResp,
mReq: jsonMarshaller[Request]{},
mResp: jsonMarshaller[Response]{},
}
return consumer, nil
}
Expand Down
45 changes: 33 additions & 12 deletions pubsub/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package pubsub

import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
Expand All @@ -29,18 +30,36 @@ const (
defaultGroup = "default_consumer_group"
)

type Marshaller[T any] interface {
Marshal(T) []byte
Unmarshal(val []byte) (T, error)
// Generic marshaller for Request and Response generic types.
// Note: unexported fields will be silently ignored.
type jsonMarshaller[T any] struct{}

// Marshal marshals generic type object with json marshal.
func (m jsonMarshaller[T]) Marshal(v T) []byte {
data, err := json.Marshal(v)
if err != nil {
log.Error("error marshaling", "value", v, "error", err)
return nil
}
return data
}

// Unmarshal converts a JSON byte slice back to the generic type object.
func (j jsonMarshaller[T]) Unmarshal(val []byte) (T, error) {
var v T
if err := json.Unmarshal(val, &v); err != nil {
return v, err
}
return v, nil
}

type Producer[Request any, Response any] struct {
stopwaiter.StopWaiter
id string
client redis.UniversalClient
cfg *ProducerConfig
mReq Marshaller[Request]
mResp Marshaller[Response]
mReq jsonMarshaller[Request]
mResp jsonMarshaller[Response]

promisesLock sync.RWMutex
promises map[string]*containers.Promise[Response]
Expand Down Expand Up @@ -85,7 +104,7 @@ var DefaultTestProducerConfig = &ProducerConfig{
RedisStream: "default",
RedisGroup: defaultGroup,
CheckPendingInterval: 10 * time.Millisecond,
KeepAliveTimeout: 20 * time.Millisecond,
KeepAliveTimeout: 100 * time.Millisecond,
CheckResultInterval: 5 * time.Millisecond,
}

Expand All @@ -98,7 +117,7 @@ func ProducerAddConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.String(prefix+".redis-group", DefaultProducerConfig.RedisGroup, "redis stream consumer group name")
}

func NewProducer[Request any, Response any](cfg *ProducerConfig, mReq Marshaller[Request], mResp Marshaller[Response]) (*Producer[Request, Response], error) {
func NewProducer[Request any, Response any](cfg *ProducerConfig) (*Producer[Request, Response], error) {
if cfg.RedisURL == "" {
return nil, fmt.Errorf("redis url cannot be empty")
}
Expand All @@ -110,8 +129,8 @@ func NewProducer[Request any, Response any](cfg *ProducerConfig, mReq Marshaller
id: uuid.NewString(),
client: c,
cfg: cfg,
mReq: mReq,
mResp: mResp,
mReq: jsonMarshaller[Request]{},
mResp: jsonMarshaller[Response]{},
promises: make(map[string]*containers.Promise[Response]),
}, nil
}
Expand All @@ -120,8 +139,8 @@ func (p *Producer[Request, Response]) errorPromisesFor(msgs []*Message[Request])
p.promisesLock.Lock()
defer p.promisesLock.Unlock()
for _, msg := range msgs {
if msg != nil {
p.promises[msg.ID].ProduceError(fmt.Errorf("internal error, consumer died while serving the request"))
if promise, found := p.promises[msg.ID]; found {
promise.ProduceError(fmt.Errorf("internal error, consumer died while serving the request"))
delete(p.promises, msg.ID)
}
}
Expand Down Expand Up @@ -208,7 +227,9 @@ func (p *Producer[Request, Response]) reproduce(ctx context.Context, value Reque
defer p.promisesLock.Unlock()
promise := p.promises[oldKey]
if oldKey != "" && promise == nil {
return nil, fmt.Errorf("errror reproducing the message, could not find existing one")
// This will happen if the old consumer became inactive but then ack_d
// the message afterwards.
return nil, fmt.Errorf("error reproducing the message, could not find existing one")
}
if oldKey == "" || promise == nil {
pr := containers.NewPromise[Response](nil)
Expand Down
48 changes: 19 additions & 29 deletions pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,12 @@ var (
messagesCount = 100
)

type testRequestMarshaller struct{}

func (t *testRequestMarshaller) Marshal(val string) []byte {
return []byte(val)
}

func (t *testRequestMarshaller) Unmarshal(val []byte) (string, error) {
return string(val), nil
}

type testResponseMarshaller struct{}

func (t *testResponseMarshaller) Marshal(val string) []byte {
return []byte(val)
type testRequest struct {
Request string
}

func (t *testResponseMarshaller) Unmarshal(val []byte) (string, error) {
return string(val), nil
type testResponse struct {
Response string
}

func createGroup(ctx context.Context, t *testing.T, streamName, groupName string, client redis.UniversalClient) {
Expand All @@ -50,7 +38,7 @@ func createGroup(ctx context.Context, t *testing.T, streamName, groupName string
func destroyGroup(ctx context.Context, t *testing.T, streamName, groupName string, client redis.UniversalClient) {
t.Helper()
if _, err := client.XGroupDestroy(ctx, streamName, groupName).Result(); err != nil {
log.Debug("Error creating stream group: %v", err)
log.Debug("Error destroying a stream group", "error", err)
}
}

Expand Down Expand Up @@ -80,7 +68,7 @@ func consumerCfg() *ConsumerConfig {
}
}

func newProducerConsumers(ctx context.Context, t *testing.T, opts ...configOpt) (*Producer[string, string], []*Consumer[string, string]) {
func newProducerConsumers(ctx context.Context, t *testing.T, opts ...configOpt) (*Producer[testRequest, testResponse], []*Consumer[testRequest, testResponse]) {
t.Helper()
redisURL := redisutil.CreateTestRedis(ctx, t)
prodCfg, consCfg := producerCfg(), consumerCfg()
Expand All @@ -92,14 +80,14 @@ func newProducerConsumers(ctx context.Context, t *testing.T, opts ...configOpt)
for _, o := range opts {
o.apply(consCfg, prodCfg)
}
producer, err := NewProducer[string, string](prodCfg, &testRequestMarshaller{}, &testResponseMarshaller{})
producer, err := NewProducer[testRequest, testResponse](prodCfg)
if err != nil {
t.Fatalf("Error creating new producer: %v", err)
}

var consumers []*Consumer[string, string]
var consumers []*Consumer[testRequest, testResponse]
for i := 0; i < consumersCount; i++ {
c, err := NewConsumer[string, string](ctx, consCfg, &testRequestMarshaller{}, &testResponseMarshaller{})
c, err := NewConsumer[testRequest, testResponse](ctx, consCfg)
if err != nil {
t.Fatalf("Error creating new consumer: %v", err)
}
Expand Down Expand Up @@ -146,10 +134,10 @@ func flatten(responses [][]string) []string {
return ret
}

func produceMessages(ctx context.Context, msgs []string, producer *Producer[string, string]) ([]*containers.Promise[string], error) {
var promises []*containers.Promise[string]
func produceMessages(ctx context.Context, msgs []string, producer *Producer[testRequest, testResponse]) ([]*containers.Promise[testResponse], error) {
var promises []*containers.Promise[testResponse]
for i := 0; i < messagesCount; i++ {
promise, err := producer.Produce(ctx, msgs[i])
promise, err := producer.Produce(ctx, testRequest{Request: msgs[i]})
if err != nil {
return nil, err
}
Expand All @@ -158,7 +146,7 @@ func produceMessages(ctx context.Context, msgs []string, producer *Producer[stri
return promises, nil
}

func awaitResponses(ctx context.Context, promises []*containers.Promise[string]) ([]string, error) {
func awaitResponses(ctx context.Context, promises []*containers.Promise[testResponse]) ([]string, error) {
var (
responses []string
errs []error
Expand All @@ -169,13 +157,13 @@ func awaitResponses(ctx context.Context, promises []*containers.Promise[string])
errs = append(errs, err)
continue
}
responses = append(responses, res)
responses = append(responses, res.Response)
}
return responses, errors.Join(errs...)
}

// consume messages from every consumer except stopped ones.
func consume(ctx context.Context, t *testing.T, consumers []*Consumer[string, string]) ([]map[string]string, [][]string) {
func consume(ctx context.Context, t *testing.T, consumers []*Consumer[testRequest, testResponse]) ([]map[string]string, [][]string) {
t.Helper()
gotMessages := messagesMaps(consumersCount)
wantResponses := make([][]string, consumersCount)
Expand All @@ -200,9 +188,9 @@ func consume(ctx context.Context, t *testing.T, consumers []*Consumer[string, st
if res == nil {
continue
}
gotMessages[idx][res.ID] = res.Value
gotMessages[idx][res.ID] = res.Value.Request
resp := fmt.Sprintf("result for: %v", res.ID)
if err := c.SetResult(ctx, res.ID, resp); err != nil {
if err := c.SetResult(ctx, res.ID, testResponse{Response: resp}); err != nil {
t.Errorf("Error setting a result: %v", err)
}
wantResponses[idx] = append(wantResponses[idx], resp)
Expand Down Expand Up @@ -253,6 +241,7 @@ func TestRedisProduce(t *testing.T) {
if err != nil {
t.Fatalf("Error awaiting responses: %v", err)
}
producer.StopAndWait()
for _, c := range consumers {
c.StopWaiter.StopAndWait()
}
Expand Down Expand Up @@ -302,6 +291,7 @@ func TestRedisReproduceDisabled(t *testing.T) {
if err == nil {
t.Fatalf("All promises were fullfilled with reproduce disabled and some consumers killed")
}
producer.StopAndWait()
for _, c := range consumers {
c.StopWaiter.StopAndWait()
}
Expand Down

0 comments on commit c857658

Please sign in to comment.