Redis val fixes #2486
Conversation
Off by default; this will allow multiple streams to use the same Redis instance.
pubsub/producer.go (outdated)
if len(idParts) != 2 {
	return errors.New("invalid id")
}
idTimeStamp, err := strconv.Atoi(idParts[0])
nit: in future, strconv.ParseUint(idParts[1], 10, 64) can do this directly
nice!
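As a hedged illustration of the suggestion above (a sketch only; the helper name and the sample ID are assumptions, not the actual producer code), a Redis stream ID of the form `<milliseconds>-<sequence>` can be parsed directly with `strconv.ParseUint`:

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// parseStreamID is a hypothetical helper: it splits a Redis stream ID of the
// form "<milliseconds>-<sequence>" and parses both parts as unsigned integers.
func parseStreamID(id string) (ts, seq uint64, err error) {
	parts := strings.Split(id, "-")
	if len(parts) != 2 {
		return 0, 0, errors.New("invalid id")
	}
	ts, err = strconv.ParseUint(parts[0], 10, 64)
	if err != nil {
		return 0, 0, fmt.Errorf("parsing timestamp part: %w", err)
	}
	seq, err = strconv.ParseUint(parts[1], 10, 64)
	if err != nil {
		return 0, 0, fmt.Errorf("parsing sequence part: %w", err)
	}
	return ts, seq, nil
}

func main() {
	ts, seq, err := parseStreamID("1718030400000-7")
	fmt.Println(ts, seq, err) // 1718030400000 7 <nil>
}
```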
pubsub/producer.go (outdated)
@@ -140,29 +178,81 @@ func (p *Producer[Request, Response]) checkAndReproduce(ctx context.Context) tim
log.Error("Re-inserting pending messages with inactive consumers", "error", err)
Couldn't we merge this `acked` loop into the previous loop? It looks like we can't merge all the loops together because the first loop might exit early to return an error, but these last two loops should be mergeable, I think, since neither exits early.
Also, while unlikely, couldn't a request get dropped between when it gets acknowledged and when it gets reproduced?
Merging loops - done.
Message dropped - I fixed the edge case by avoiding the error in reproduce. There will be a new promise + request created that nobody will read, but it should behave OK.
func (p *Producer[Request, Response]) reproduceIds(ctx context.Context, staleIds []string) error {
	log.Info("Attempting to claim", "messages", staleIds)
	claimedMsgs, err := p.client.XClaim(ctx, &redis.XClaimArgs{
Would it be possible for two clients calling reproduceIds to claim the same IDs before either client acks them? Or is a message no longer claimable after one client claims it? I haven't fully figured out the Redis streams model.
First of all - this is called from the (single) producer and not from the consumers.
As far as I understand, what prevents race conditions here is the "MinIdle" value: if a message was claimed by anyone, the idle timer resets, so the next claim will fail.
So if the producer is trying to reproduce a message while the client is trying to write a response, one of them will "probably" fail (we shouldn't get there, because the producer will only try to claim a message if the worker's keepalive stops).
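To make the MinIdle point concrete, here is a minimal hedged sketch using the go-redis client (the library version, address, stream/group/consumer names, and message ID are all assumptions for illustration): XCLAIM only returns entries that have been idle for at least MinIdle, and claiming an entry resets its idle timer, so a second claim attempt with the same MinIdle issued right afterwards will not get the same entry.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9" // assumed client library version
)

func main() {
	ctx := context.Background()
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) // assumed address

	// Claim only messages that have been idle (not read/claimed/acked) for at
	// least MinIdle. Claiming resets the idle timer on the entry, so another
	// XCLAIM with the same MinIdle right after this one will not return it.
	msgs, err := client.XClaim(ctx, &redis.XClaimArgs{
		Stream:   "validation_stream", // assumed stream/group/consumer names
		Group:    "validation_group",
		Consumer: "producer",
		MinIdle:  time.Minute,
		Messages: []string{"1718030400000-7"},
	}).Result()
	if err != nil {
		panic(err)
	}
	for _, m := range msgs {
		fmt.Println("claimed", m.ID, m.Values)
	}
}
```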
Oh, I was unaware that the model here is a single producer. We don't support running multiple stakers against a single redis stream?
Edit: just saw your later response on why we only support a single producer. I'll think about this some more.
acked := make(map[string]Request)
for _, msg := range messages {
	if _, err := p.client.XAck(ctx, p.redisStream, p.redisGroup, msg.ID).Result(); err != nil {
Should we also `continue` if this returns zero, meaning it wasn't successfully acked?
Checking in on this again: we still seem to be ignoring the int returned by XAck.
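For reference, a hedged sketch of what acting on that return value could look like (the helper name, stream/group names, and message ID are assumptions, not the producer's actual code): XACK reports how many entries it actually acknowledged, so a zero result can be detected and the message skipped.

```go
package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9" // assumed client library version
)

// ackOrSkip is a hypothetical helper: XACK returns how many entries were
// actually acknowledged, so a zero result (the ID was not pending for this
// group, or was already acked) can be detected instead of being ignored.
func ackOrSkip(ctx context.Context, client redis.Cmdable, stream, group, id string) (bool, error) {
	acked, err := client.XAck(ctx, stream, group, id).Result()
	if err != nil {
		return false, err
	}
	return acked > 0, nil
}

func main() {
	ctx := context.Background()
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) // assumed address
	ok, err := ackOrSkip(ctx, client, "validation_stream", "validation_group", "1718030400000-7")
	if err != nil {
		panic(err)
	}
	if !ok {
		fmt.Println("not acked; skipping") // a caller inside a loop would `continue` here
	}
}
```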
minId := "+"
if minIdInt[0] < math.MaxUint64 {
	minId = fmt.Sprintf("%d-%d", minIdInt[0], minIdInt[1])
	trimmed, trimErr = p.client.XTrimMinID(ctx, p.redisStream, minId).Result()
Won't this potentially prune others' requests because they're not in our `p.promises` map?
Short answer: it would.
Long answer:
Generally, we only expect to use this for arbitrator validations, and we only expect to need one producer in the system - otherwise we would be wasting work.
The current approach is aligned with a single nitro node and multiple workers.
I have added the "prefix" field to allow multiple nitro nodes to use the same Redis. That would require each to have its own pool of workers and its own queue - not great, but I don't think we'll really use it.
I think if we go for a multi-producer, multi-consumer queue, there are two main options:
- create a mechanism for syncing the multiple producers, marking which messages are done, walking the queue, and checking what to trim.
- define the queue with a max number of entries, and add a mechanism for the producers to re-send a message if it was trimmed before it reached a worker.
Both add complexity. The first adds more complexity; the second makes the Redis footprint larger during work.
Considering the alternatives, I think the single-producer version makes sense, at least for now.
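To illustrate why the single-producer assumption matters for trimming, here is a hedged sketch (the helper, stream name, address, and sample IDs are assumptions; ID comparison is simplified): the producer trims everything below the smallest stream ID it is still tracking, which is only safe when no other producer has outstanding entries in the same stream.

```go
package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9" // assumed client library version
)

// trimBelowOutstanding is a hypothetical sketch of the single-producer trim
// model: find the smallest outstanding entry ID tracked locally and trim every
// stream entry below it. With a single producer this is safe; with multiple
// producers it would also discard entries that only the others are tracking.
func trimBelowOutstanding(ctx context.Context, client redis.Cmdable, stream string, outstanding []string) (int64, error) {
	if len(outstanding) == 0 {
		// Nothing outstanding locally; skip trimming to keep the sketch conservative.
		return 0, nil
	}
	minID := outstanding[0]
	for _, id := range outstanding[1:] {
		if id < minID { // lexicographic compare is a rough stand-in; real code parses the numeric parts
			minID = id
		}
	}
	return client.XTrimMinID(ctx, stream, minID).Result()
}

func main() {
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) // assumed address
	n, err := trimBelowOutstanding(context.Background(), client, "validation_stream",
		[]string{"1718030400000-3", "1718030400000-7"})
	fmt.Println(n, err)
}
```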
LGTM on the single producer model. We should document that, and I'd still like to see multiple producer support in the future.
Added an issue to support multiple producers.
Fixes: NIT-2630
Also, some fixes for the Redis validator: