Redis val fixes #2486

Merged - 11 commits, Jul 18, 2024
5 changes: 4 additions & 1 deletion pubsub/common.go
@@ -2,6 +2,7 @@ package pubsub

import (
"context"
"strings"

"github.com/ethereum/go-ethereum/log"
"github.com/go-redis/redis/v8"
@@ -22,7 +23,9 @@ func CreateStream(ctx context.Context, streamName string, client redis.Universal
func StreamExists(ctx context.Context, streamName string, client redis.UniversalClient) bool {
got, err := client.Do(ctx, "XINFO", "STREAM", streamName).Result()
if err != nil {
log.Error("Reading redis streams", "error", err)
if !strings.Contains(err.Error(), "no such key") {
log.Error("redis error", "err", err, "searching stream", streamName)
}
return false
}
return got != nil
182 changes: 121 additions & 61 deletions pubsub/producer.go
@@ -13,6 +13,9 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"strconv"
"strings"
"sync"
"time"

@@ -59,27 +62,31 @@ type ProducerConfig struct {
KeepAliveTimeout time.Duration `koanf:"keepalive-timeout"`
// Interval duration for checking the result set by consumers.
CheckResultInterval time.Duration `koanf:"check-result-interval"`
CheckPendingItems int64 `koanf:"check-pending-items"`
}

var DefaultProducerConfig = ProducerConfig{
EnableReproduce: true,
CheckPendingInterval: time.Second,
KeepAliveTimeout: 5 * time.Minute,
CheckResultInterval: 5 * time.Second,
CheckPendingItems: 256,
}

var TestProducerConfig = ProducerConfig{
EnableReproduce: true,
EnableReproduce: false,
CheckPendingInterval: 10 * time.Millisecond,
KeepAliveTimeout: 100 * time.Millisecond,
CheckResultInterval: 5 * time.Millisecond,
CheckPendingItems: 256,
}

func ProducerAddConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".enable-reproduce", DefaultProducerConfig.EnableReproduce, "when enabled, messages with dead consumer will be re-inserted into the stream")
f.Duration(prefix+".check-pending-interval", DefaultProducerConfig.CheckPendingInterval, "interval in which producer checks pending messages whether consumer processing them is inactive")
f.Duration(prefix+".check-result-interval", DefaultProducerConfig.CheckResultInterval, "interval in which producer checks pending messages whether consumer processing them is inactive")
f.Duration(prefix+".keepalive-timeout", DefaultProducerConfig.KeepAliveTimeout, "timeout after which consumer is considered inactive if heartbeat wasn't performed")
f.Int64(prefix+".check-pending-items", DefaultProducerConfig.CheckPendingItems, "items to screen during check-pending")
}

func NewProducer[Request any, Response any](client redis.UniversalClient, streamName string, cfg *ProducerConfig) (*Producer[Request, Response], error) {
@@ -99,70 +106,146 @@ func NewProducer[Request any, Response any](client redis.UniversalClient, stream
}, nil
}

func (p *Producer[Request, Response]) errorPromisesFor(msgs []*Message[Request]) {
func (p *Producer[Request, Response]) errorPromisesFor(msgIds []string) {
p.promisesLock.Lock()
defer p.promisesLock.Unlock()
for _, msg := range msgs {
if promise, found := p.promises[msg.ID]; found {
for _, msg := range msgIds {
if promise, found := p.promises[msg]; found {
promise.ProduceError(fmt.Errorf("internal error, consumer died while serving the request"))
delete(p.promises, msg.ID)
delete(p.promises, msg)
}
}
}

// checkAndReproduce reproduces pending messages that were sent to consumers
// that are currently inactive.
func (p *Producer[Request, Response]) checkAndReproduce(ctx context.Context) time.Duration {
msgs, err := p.checkPending(ctx)
staleIds, err := p.checkPending(ctx)
if err != nil {
log.Error("Checking pending messages", "error", err)
return p.cfg.CheckPendingInterval
}
if len(msgs) == 0 {
if len(staleIds) == 0 {
return p.cfg.CheckPendingInterval
}
if !p.cfg.EnableReproduce {
p.errorPromisesFor(msgs)
return p.cfg.CheckPendingInterval
if p.cfg.EnableReproduce {
err = p.reproduceIds(ctx, staleIds)
if err != nil {
log.Warn("filed reproducing messages", "err", err)
}
} else {
p.errorPromisesFor(staleIds)
}
acked := make(map[string]Request)
for _, msg := range msgs {
return p.cfg.CheckPendingInterval
}

func (p *Producer[Request, Response]) reproduceIds(ctx context.Context, staleIds []string) error {
log.Info("Attempting to claim", "messages", staleIds)
claimedMsgs, err := p.client.XClaim(ctx, &redis.XClaimArgs{
Review comment (Collaborator): Would it be possible for two clients calling reproduceIds to claim the same ids, before either client acks them? Or is an id no longer claimable after one client claims it? I haven't fully figured out the redis streams model.

Reply (Contributor Author): First of all, this is called from the (single) producer, not from the consumers. As far as I understand, what prevents race conditions here is MinIdle: once a message is claimed by anyone, its idle timer resets, so the next claim will fail. So if the producer is trying to reproduce a message while a consumer is trying to write a response, one of them will "probably" fail (we shouldn't get there anyway, because the producer only tries to claim a message once the worker's keepalive stops).

Reply (Collaborator, @PlasmaPower, Jul 16, 2024): Oh, I was unaware the model here is a single producer. We don't support running multiple stakers against a single redis stream?

Edit: just saw your later response on why we only support a single producer. I'll think about this some more.
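
To make the MinIdle behavior discussed above concrete, here is a small standalone sketch (not part of this PR); the stream, group, and consumer names are made up, and it assumes go-redis v8 against a local Redis:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-redis/redis/v8"
)

// claimStale claims the given pending entries for `consumer`, but only those
// that have been idle for at least MinIdle.
func claimStale(ctx context.Context, client redis.UniversalClient, consumer string, ids []string) ([]redis.XMessage, error) {
	return client.XClaim(ctx, &redis.XClaimArgs{
		Stream:   "example-stream",
		Group:    "example-group",
		Consumer: consumer,
		MinIdle:  5 * time.Minute,
		Messages: ids,
	}).Result()
}

func main() {
	ctx := context.Background()
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	ids := []string{"1526919030474-55"} // stream IDs have the form <ms>-<seq>

	// First claim succeeds if the entry is pending and idle for >= MinIdle.
	first, _ := claimStale(ctx, client, "producer-a", ids)
	// The successful claim resets the entry's idle time, so an immediate
	// second claim (by anyone) finds nothing idle enough and returns empty.
	second, _ := claimStale(ctx, client, "producer-b", ids)
	fmt.Println(len(first), len(second)) // typically prints: 1 0
}
```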

Stream: p.redisStream,
Group: p.redisGroup,
Consumer: p.id,
MinIdle: p.cfg.KeepAliveTimeout,
Messages: staleIds,
}).Result()
if err != nil {
return fmt.Errorf("claiming ownership on messages: %v, error: %w", staleIds, err)
}
for _, msg := range claimedMsgs {
data, ok := (msg.Values[messageKey]).(string)
if !ok {
log.Error("redis producer reproduce: message not string", "id", msg.ID, "value", msg.Values[messageKey])
continue
}
var req Request
if err := json.Unmarshal([]byte(data), &req); err != nil {
log.Error("redis producer reproduce: message not a request", "id", msg.ID, "err", err, "value", msg.Values[messageKey])
continue
}
if _, err := p.client.XAck(ctx, p.redisStream, p.redisGroup, msg.ID).Result(); err != nil {
Review comment (Collaborator): Should we also continue if this returns zero, meaning it wasn't successfully ack'd?

Follow-up (Collaborator): Checking in on this again, we seem to still be ignoring the int returned by XAck.
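
A hedged sketch of what the reviewer is asking for (not what this diff does): XAck's integer result is the number of entries actually acknowledged, so a zero could be treated as "no longer ours" and skipped:

```go
// Sketch only: honor the count returned by XACK before re-producing.
acked, err := p.client.XAck(ctx, p.redisStream, p.redisGroup, msg.ID).Result()
if err != nil {
	log.Error("redis producer reproduce: could not ACK", "id", msg.ID, "err", err)
	continue
}
if acked == 0 {
	// Nothing was acknowledged: the entry is no longer pending for this
	// group (e.g. it was already acked elsewhere), so skip reproducing it.
	continue
}
```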

log.Error("ACKing message", "error", err)
log.Error("redis producer reproduce: could not ACK", "id", msg.ID, "err", err)
continue
}
acked[msg.ID] = msg.Value
}
for k, v := range acked {
// Only re-insert messages that were removed the the pending list first.
_, err := p.reproduce(ctx, v, k)
if err != nil {
log.Error("Re-inserting pending messages with inactive consumers", "error", err)
if _, err := p.reproduce(ctx, req, msg.ID); err != nil {
log.Error("redis producer reproduce: error", "err", err)
}
}
return p.cfg.CheckPendingInterval
return nil
}

func setMinIdInt(min *[2]uint64, id string) error {
idParts := strings.Split(id, "-")
if len(idParts) != 2 {
return fmt.Errorf("invalid i.d: %v", id)
}
idTimeStamp, err := strconv.ParseUint(idParts[0], 10, 64)
if err != nil {
return fmt.Errorf("invalid i.d: %v err: %w", id, err)
}
if idTimeStamp > min[0] {
return nil
}
idSerial, err := strconv.ParseUint(idParts[1], 10, 64)
if err != nil {
return fmt.Errorf("invalid i.d serial: %v err: %w", id, err)
}
if idTimeStamp < min[0] {
min[0] = idTimeStamp
min[1] = idSerial
return nil
}
// idTimeStamp == min[0]
if idSerial < min[1] {
min[1] = idSerial
}
return nil
}
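
As a quick illustration (not part of the diff), Redis stream IDs have the form "<millisecond-timestamp>-<sequence>", and setMinIdInt folds a series of them into a running minimum; the snippet below assumes it runs inside package pubsub next to the function:

```go
minId := [2]uint64{math.MaxUint64, math.MaxUint64}
for _, id := range []string{"1526919030474-55", "1526919030474-3", "1526919029999-7"} {
	if err := setMinIdInt(&minId, id); err != nil {
		fmt.Println("invalid id:", err)
	}
}
fmt.Printf("%d-%d\n", minId[0], minId[1]) // prints 1526919029999-7
```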

// checkResponses checks iteratively whether response for the promise is ready.
func (p *Producer[Request, Response]) checkResponses(ctx context.Context) time.Duration {
minIdInt := [2]uint64{math.MaxUint64, math.MaxUint64}
p.promisesLock.Lock()
defer p.promisesLock.Unlock()
responded := 0
errored := 0
for id, promise := range p.promises {
if ctx.Err() != nil {
return 0
}
res, err := p.client.Get(ctx, id).Result()
if err != nil {
if errors.Is(err, redis.Nil) {
continue
errSetId := setMinIdInt(&minIdInt, id)
if errSetId != nil {
log.Error("error setting minId", "err", err)
return p.cfg.CheckResultInterval
}
if !errors.Is(err, redis.Nil) {
log.Error("Error reading value in redis", "key", id, "error", err)
}
log.Error("Error reading value in redis", "key", id, "error", err)
continue
}
var resp Response
if err := json.Unmarshal([]byte(res), &resp); err != nil {
promise.ProduceError(fmt.Errorf("error unmarshalling: %w", err))
log.Error("Error unmarshaling", "value", res, "error", err)
continue
errored++
} else {
promise.Produce(resp)
responded++
}
promise.Produce(resp)
delete(p.promises, id)
}
var trimmed int64
var trimErr error
minId := "+"
if minIdInt[0] < math.MaxUint64 {
minId = fmt.Sprintf("%d-%d", minIdInt[0], minIdInt[1])
trimmed, trimErr = p.client.XTrimMinID(ctx, p.redisStream, minId).Result()
Review comment (Collaborator): Won't this potentially prune others' requests, because they're not in our p.promises map?

Reply (Contributor Author): Short answer: it would.
Long answer: Generally, we only expect to use this for arbitrator validations, and we only expect to need one producer in the system - otherwise we would be wasting work. The current approach is aligned with a single nitro node and multiple workers. I have added the "prefix" field to allow multiple nitro nodes to use the same redis; that would require each to have its own pool of workers and its own queue - not great, but I don't think we'll really use it.

If we go for a multi-producer, multi-consumer queue, there are two main options (a sketch of the second follows below):

  1. Create a mechanism for syncing the multiple producers, marking which messages are done, walking the queue, and checking what to trim.
  2. Define the queue with a max number of entries, and add a mechanism for producers to re-send a message if it was trimmed before it reached a worker.

Both add complexity; the first adds more of it, the second makes the redis footprint larger during work. Considering the alternatives, I think the single-producer version makes sense, at least for now.
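
A minimal sketch of option 2 above (not implemented in this PR): cap the stream length at XAdd time so no cross-producer trim coordination is needed, accepting that a request may occasionally be trimmed before a worker reads it and must be re-sent. addBounded and maxQueueLen are hypothetical names; only go-redis v8 calls already used in this file appear here:

```go
package pubsub

import (
	"context"
	"fmt"

	"github.com/go-redis/redis/v8"
)

// addBounded adds a request to the stream and trims the stream to at most
// maxQueueLen entries, dropping the oldest ones.
func addBounded(ctx context.Context, client redis.UniversalClient, stream string, values map[string]any, maxQueueLen int64) (string, error) {
	id, err := client.XAdd(ctx, &redis.XAddArgs{
		Stream: stream,
		Values: values,
	}).Result()
	if err != nil {
		return "", fmt.Errorf("adding values to redis: %w", err)
	}
	// Under multiple producers this may drop an entry no worker has read yet;
	// the owning producer would notice the missing response (e.g. via a
	// deadline) and re-send the request.
	if _, err := client.XTrimMaxLen(ctx, stream, maxQueueLen).Result(); err != nil {
		return id, fmt.Errorf("trimming stream: %w", err)
	}
	return id, nil
}
```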

} else {
trimmed, trimErr = p.client.XTrimMaxLen(ctx, p.redisStream, 0).Result()
}
log.Trace("trimming", "id", minId, "trimmed", trimmed, "responded", responded, "errored", errored, "trim-err", trimErr)
return p.cfg.CheckResultInterval
}

@@ -184,20 +267,23 @@ func (p *Producer[Request, Response]) reproduce(ctx context.Context, value Reque
if err != nil {
return nil, fmt.Errorf("marshaling value: %w", err)
}
// taking promisesLock before we call XAdd makes sure promise ids will
// always be ascending
p.promisesLock.Lock()
defer p.promisesLock.Unlock()
id, err := p.client.XAdd(ctx, &redis.XAddArgs{
Stream: p.redisStream,
Values: map[string]any{messageKey: val},
}).Result()
if err != nil {
return nil, fmt.Errorf("adding values to redis: %w", err)
}
p.promisesLock.Lock()
defer p.promisesLock.Unlock()
promise := p.promises[oldKey]
if oldKey != "" && promise == nil {
// This will happen if the old consumer became inactive but then ack_d
// the message afterwards.
return nil, fmt.Errorf("error reproducing the message, could not find existing one")
// don't error
log.Warn("tried reproducing a message but it wasn't found - probably got response", "oldKey", oldKey)
}
if oldKey == "" || promise == nil {
pr := containers.NewPromise[Response](nil)
@@ -232,13 +318,14 @@ func (p *Producer[Request, Response]) havePromiseFor(messageID string) bool {
return found
}

func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Message[Request], error) {
// returns ids of pending messages whose worker doesn't appear alive
func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]string, error) {
pendingMessages, err := p.client.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: p.redisStream,
Group: p.redisGroup,
Start: "-",
End: "+",
Count: 100,
Count: p.cfg.CheckPendingItems,
}).Result()

if err != nil && !errors.Is(err, redis.Nil) {
@@ -247,6 +334,9 @@ func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Mess
if len(pendingMessages) == 0 {
return nil, nil
}
if len(pendingMessages) >= int(p.cfg.CheckPendingItems) {
log.Warn("redis producer: many pending items found", "stream", p.redisStream, "check-pending-items", p.cfg.CheckPendingItems)
}
// IDs of the pending messages with inactive consumers.
var ids []string
active := make(map[string]bool)
@@ -265,35 +355,5 @@ func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Mess
}
ids = append(ids, msg.ID)
}
if len(ids) == 0 {
log.Trace("There are no pending messages with inactive consumers")
return nil, nil
}
log.Info("Attempting to claim", "messages", ids)
claimedMsgs, err := p.client.XClaim(ctx, &redis.XClaimArgs{
Stream: p.redisStream,
Group: p.redisGroup,
Consumer: p.id,
MinIdle: p.cfg.KeepAliveTimeout,
Messages: ids,
}).Result()
if err != nil {
return nil, fmt.Errorf("claiming ownership on messages: %v, error: %w", ids, err)
}
var res []*Message[Request]
for _, msg := range claimedMsgs {
data, ok := (msg.Values[messageKey]).(string)
if !ok {
return nil, fmt.Errorf("casting request: %v to bytes", msg.Values[messageKey])
}
var req Request
if err := json.Unmarshal([]byte(data), &req); err != nil {
return nil, fmt.Errorf("marshaling value: %v, error: %w", msg.Values[messageKey], err)
}
res = append(res, &Message[Request]{
ID: msg.ID,
Value: req,
})
}
return res, nil
return ids, nil
}