
Redis val fixes #2486

Merged: 11 commits from redis_val_fixes into master, Jul 18, 2024
Conversation

tsahee (Contributor) commented on Jul 12, 2024:

Fixes: NIT-2630

Also, some fixes for the redis validator:

  • adding metrics
  • removing an unnecessary error print when the stream doesn't exist
  • producer: trim messages from the queue when they are no longer needed
  • allow a prefix for the validation stream (off by default); this allows using multiple validators with streams on the same redis (see the sketch below)
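
Purely as an illustration of the prefix option in the last bullet, a minimal sketch of how a configurable prefix could be folded into the stream key. The config type, field, and helper names here are assumptions, not the PR's actual API:

// Hypothetical config; the real type and field names in the PR may differ.
type RedisValidationConfig struct {
	StreamPrefix string // empty (off) by default
}

// streamKey prepends the optional prefix so that several validator
// deployments can share one redis without their streams colliding.
func streamKey(cfg RedisValidationConfig, moduleRoot string) string {
	return cfg.StreamPrefix + "stream:" + moduleRoot
}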

cla-bot added the "s" label (automatically added by the CLA bot if the creator of a PR is registered as having signed the CLA) on Jul 12, 2024.
if len(idParts) != 2 {
return errors.New("invalid i.d")
}
idTimeStamp, err := strconv.Atoi(idParts[0])
Contributor commented:
nit: in future, strconv.ParseUint(idParts[1], 10, 64) can do this directly

Contributor Author (tsahee) replied:

nice!
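
For reference, a sketch of the suggested ParseUint approach, assuming the usual redis stream id format "<ms>-<seq>". The msgID variable, the error wrapping, and the strings/strconv/errors/fmt imports are illustrative, not the PR's exact code:

idParts := strings.Split(msgID, "-")
if len(idParts) != 2 {
	return errors.New("invalid id")
}
// ParseUint reads the unsigned parts directly, with no signed round-trip
// through strconv.Atoi.
idTimeStamp, err := strconv.ParseUint(idParts[0], 10, 64)
if err != nil {
	return fmt.Errorf("invalid timestamp in id %q: %w", msgID, err)
}
idSerial, err := strconv.ParseUint(idParts[1], 10, 64)
if err != nil {
	return fmt.Errorf("invalid serial in id %q: %w", msgID, err)
}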

rauljordan previously approved these changes on Jul 12, 2024.

pubsub/common.go (resolved)
@@ -140,29 +178,81 @@ func (p *Producer[Request, Response]) checkAndReproduce(ctx context.Context) tim
log.Error("Re-inserting pending messages with inactive consumers", "error", err)
Collaborator commented:
Couldn't we merge this acked loop into the previous loop? It looks like we can't merge all the loops together, because the first loop might exit early to return an error, but these last two loops should be mergeable, I think, since neither exits early.

Also, while unlikely, couldn't a request get dropped in between when it gets acknowledged and when it gets reproduced?

Contributor Author (tsahee) replied:

Merging loops: done.

Message dropped: I fixed the edge case by avoiding the error in reproduce. There will be a new promise + request created that nobody will read, but it should behave OK.
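
A hypothetical shape of the merged loop described here, acking each claimed message and immediately re-producing its request in a single pass. The decodeRequest helper and the p.Produce signature are assumed names; the PR's actual code may differ:

for _, msg := range claimedMsgs {
	// Ack the stale message so its old pending entry goes away.
	if _, err := p.client.XAck(ctx, p.redisStream, p.redisGroup, msg.ID).Result(); err != nil {
		log.Error("acking claimed message", "id", msg.ID, "error", err)
		continue
	}
	req, err := decodeRequest(msg) // hypothetical helper for decoding the payload
	if err != nil {
		log.Error("decoding claimed message", "id", msg.ID, "error", err)
		continue
	}
	// Re-insert the request so a live consumer can pick it up. If the original
	// consumer raced us and already wrote a response, the freshly created
	// promise is simply never read (the edge case discussed above).
	if _, err := p.Produce(ctx, req); err != nil {
		log.Error("re-producing claimed message", "id", msg.ID, "error", err)
	}
}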


func (p *Producer[Request, Response]) reproduceIds(ctx context.Context, staleIds []string) error {
log.Info("Attempting to claim", "messages", staleIds)
claimedMsgs, err := p.client.XClaim(ctx, &redis.XClaimArgs{
Collaborator commented:

Would it be possible for two clients calling reproduceIds to claim the same ids, before either client acks it? Or is it no longer claimable after one client claims it? I haven't fully figured out the redis streams model.

Contributor Author (tsahee) replied:

First of all, this is called from the (single) producer and not from the consumers.
As far as I understand, what prevents race conditions here is the "minIdle": if a message was claimed by anyone, the "idle" timer resets, so the next claim will fail.
So if the producer is trying to reproduce a message while the client is trying to write a response, one of them will "probably" fail (we shouldn't get there, because the producer will only try to claim a message if the worker keepalive stops).

PlasmaPower (Collaborator) replied on Jul 16, 2024:

Oh, I was unaware that the model here was a single producer. We don't support running multiple stakers against a single redis stream?

Edit: just saw your later response on why we only support a single producer. I'll think about this some more.
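
For readers unfamiliar with the redis streams behaviour being discussed, a sketch of the MinIdle guard tsahee describes: XCLAIM only transfers messages that have been idle for at least MinIdle, and a successful claim resets the idle timer, so a second claim with the same threshold comes back empty. The p.id and keepAliveTimeout names are assumptions:

claimedMsgs, err := p.client.XClaim(ctx, &redis.XClaimArgs{
	Stream:   p.redisStream,
	Group:    p.redisGroup,
	Consumer: p.id,             // assumed: the producer claims on its own behalf
	MinIdle:  keepAliveTimeout, // assumed: roughly the worker keepalive interval
	Messages: staleIds,
}).Result()
if err != nil {
	return fmt.Errorf("claiming stale messages: %w", err)
}
// Messages touched or claimed more recently than MinIdle are not returned,
// which is what prevents two claimers from both re-producing them.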

pubsub/producer.go (outdated, resolved)
acked := make(map[string]Request)
for _, msg := range msgs {
for _, msg := range messages {
if _, err := p.client.XAck(ctx, p.redisStream, p.redisGroup, msg.ID).Result(); err != nil {
Collaborator commented:

Should we also continue if this returns zero, meaning it wasn't successfully ack'd?

Collaborator followed up:

Checking in on this again, we seem to still be ignoring the int returned by XAck
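
A sketch of the check being asked for here: XACK returns how many of the given ids were actually acknowledged, so a zero result can be treated as "not acked" and skipped. The surrounding loop is assumed from the excerpt above:

ackedCount, err := p.client.XAck(ctx, p.redisStream, p.redisGroup, msg.ID).Result()
if err != nil {
	log.Error("acking message", "id", msg.ID, "error", err)
	continue
}
if ackedCount == 0 {
	// The message was not pending for this group (e.g. already acked),
	// so skip re-processing it.
	continue
}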

minId := "+"
if minIdInt[0] < math.MaxUint64 {
minId = fmt.Sprintf("%d-%d", minIdInt[0], minIdInt[1])
trimmed, trimErr = p.client.XTrimMinID(ctx, p.redisStream, minId).Result()
Collaborator commented:

Won't this potentially prune others' requests because they're not in our p.promises map?

Contributor Author (tsahee) replied:

Short answer: it would.
Long answer:
Generally, we only expect to use this for arbitrator validations, and we only expect to need one producer in the system; otherwise we would be wasting work.
The current approach is aligned with a single nitro / multiple workers.
I added the "prefix" field to allow multiple nitro nodes to use the same redis. That would require each to have its own pool of workers and its own queue; not great, but I don't think we'll really use it.

I think if we go for a multi-producer, multi-consumer queue, there are two main options:

  1. create a mechanism for syncing the multiple producers, marking which messages are done, walking the queue, and checking what to trim.
  2. define the queue with a max number of entries, and add a mechanism for the producers to re-send a message if it was trimmed before it got to a worker.

Both add complexity: the 1st adds more complexity, the 2nd makes the redis size larger during work.

Considering the alternatives, I think the single-producer version makes sense, at least for now.
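
For context, a hedged sketch of the single-producer trimming approach described above: the producer's own promises map holds every outstanding request, so its smallest pending id is a safe trim boundary. parseStreamID is a hypothetical "<ms>-<seq>" parser, and only the trim call itself appears in the excerpt above:

minIdInt := [2]uint64{math.MaxUint64, math.MaxUint64}
for id := range p.promises {
	msTime, seq, err := parseStreamID(id) // hypothetical helper
	if err != nil {
		continue
	}
	// Keep the smallest pending stream id (ms timestamp, then sequence).
	if msTime < minIdInt[0] || (msTime == minIdInt[0] && seq < minIdInt[1]) {
		minIdInt = [2]uint64{msTime, seq}
	}
}
minId := "+" // default from the excerpt above: nothing pending to protect
if minIdInt[0] < math.MaxUint64 {
	minId = fmt.Sprintf("%d-%d", minIdInt[0], minIdInt[1])
}
// Entries older than the oldest pending promise belong to no outstanding
// request of this (single) producer, so trimming them is safe in this model.
trimmed, err := p.client.XTrimMinID(ctx, p.redisStream, minId).Result()
if err != nil {
	log.Error("trimming redis stream", "error", err)
} else {
	log.Trace("trimmed redis stream", "stream", p.redisStream, "count", trimmed, "minId", minId)
}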

pubsub/producer.go (resolved)
pubsub/producer.go (resolved)
validator/client/redis/producer.go (resolved)
tsahee requested a review from PlasmaPower on July 16, 2024, 02:49.
PlasmaPower (Collaborator) left a review:

LGTM on the single producer model. We should document that, and I'd still like to see multiple producer support in the future.

tsahee (Contributor Author) commented on Jul 18, 2024:
Added an issue to support multiple producers.
Also, I saw examples of calling XAck and only checking the error. In any case, we don't really use XAck right now and don't rely on it succeeding.

tsahee merged commit 57ac2ad into master on Jul 18, 2024; 13 checks passed.
tsahee deleted the redis_val_fixes branch on July 18, 2024, 14:50.
Labels: design-approved; s (automatically added by the CLA bot if the creator of a PR is registered as having signed the CLA)

3 participants