Add TTL to Operator RPC Client #259
Conversation
Shows a clear data race
// TODO: We never close `httpRpcClient`
httpRpcClient, err := NewHTTPAggregatorRpcClient(c.AggregatorServerIpPortAddress, operatorId, registryCoordinatorAddress, logger)
Up to discussion: how do we want to handle this dependency, in particular closing it?
listener RpcClientEventListener

type RpcClient interface {
	Call(serviceMethod string, args any, reply any) error
	// TODO: Do we also want a `Close` method?
I'm against the idea of adding a Close method here, but it would allow us to add a Close method to the AggregatorRpcClient such that it would close the provided RpcClient. Ideally, a component should not be responsible for closing the provided dependencies, but it could be a quick & dirty solution.
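For reference, a minimal sketch of that quick & dirty option, assuming the interface and wrapper keep the names shown in the snippet above (the actual types in the PR may differ):

```go
// Hypothetical extension of the interface discussed above: Close lets the
// wrapper release the underlying connection. Go's *rpc.Client already has a
// matching Close() error method, so it would keep satisfying this interface.
type RpcClient interface {
	Call(serviceMethod string, args any, reply any) error
	Close() error
}

// AggregatorRpcClient wraps the low-level client; only the field relevant
// to closing is shown here.
type AggregatorRpcClient struct {
	rpcClient RpcClient
}

// Close forwards to the wrapped client. This is the "component closes its
// provided dependency" shortcut the comment above warns about.
func (c *AggregatorRpcClient) Close() error {
	return c.rpcClient.Close()
}
```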
nodeConfig, _, _ := genOperatorConfig(t, ctx, "3", mainnetAnvil, rollupAnvils, rabbitMq)
operator := startOperator(t, ctx, nodeConfig)
We had to change the order of initialization in the integration test: if the operator cannot connect to the aggregator on start, it will quickly crash. Note that this has no effect during processing: if the connection to the aggregator fails, requests will be retried according to the provided strategy.
operator/rpc_client.go
// By default, retry with a delay of 2 seconds between calls,
// at most 10 times, and only if the error is recent enough (24 hours)
// TODO: Discuss the "recent enough" part
func DefaultAggregatorRpcRetry() RetryStrategy {
	return RetryAnd(
		RetryWithDelay(2*time.Second), RetryAnd(
			RetryAtMost(10),
			RetryIfRecentEnough(24*time.Hour)))
}
What would be a reasonable TTL for messages? This is a very conservative default just to start the discussion.
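The diff only shows how these combinators compose, not how they are implemented. As a minimal sketch, assuming RetryStrategy is a predicate over the attempt number and the time of the first failure (the real type in the PR may differ):

```go
import "time"

// RetryStrategy (assumed shape): given the attempt number and the time the
// message first failed, decide whether another attempt should be made.
type RetryStrategy func(attempt int, firstFailure time.Time) bool

// RetryAnd retries only if both strategies agree.
func RetryAnd(a, b RetryStrategy) RetryStrategy {
	return func(attempt int, firstFailure time.Time) bool {
		return a(attempt, firstFailure) && b(attempt, firstFailure)
	}
}

// RetryWithDelay always retries, sleeping between attempts.
func RetryWithDelay(delay time.Duration) RetryStrategy {
	return func(attempt int, firstFailure time.Time) bool {
		time.Sleep(delay)
		return true
	}
}

// RetryAtMost retries up to n attempts.
func RetryAtMost(n int) RetryStrategy {
	return func(attempt int, firstFailure time.Time) bool {
		return attempt < n
	}
}

// RetryIfRecentEnough retries only while the first failure is within the TTL.
func RetryIfRecentEnough(ttl time.Duration) RetryStrategy {
	return func(attempt int, firstFailure time.Time) bool {
		return time.Since(firstFailure) < ttl
	}
}
```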
Let me first start with the current design and the misunderstandings regarding it, and then I will share my opinion about your approach. There have been some evolutions since the first rework, but I will describe the current one. Now we come to this part:
We were aware of this problem, and in one of the revisions @Hyodar introduced message expiration here, so a message is a candidate for resending only 10 times. Now about this proposal and its important flaw, which is the reason why, in my opinion, we can't proceed with it. It pretty much comes down to this part:
Instead of one goroutine trying to resend a message once in
Overall I'm open to discussions here, but I would also like to hear @Hyodar's opinion on this radical change and would propose to wait with this PR until then.
Due to the comment above
Appreciate the feedback. This is mentioned in the PR description, but I believe it is mostly a tradeoff:
It was my understanding that #212 was asking for three retry conditions: try at most 10 times, with a 2-second timeout between each retry, while the messages are recent enough (TTL). Note that
I don't particularly dislike this - the scalability thing is concerning, but maybe not that much; we would need a benchmark for that - but I think it's indeed not necessary, and the current solution is already more robust in this sense. In #212 I already described and suggested solutions - IMO for the operator we just need to tidy up the retry mechanism a bit, maybe, and use a queue that already drops expired messages automatically, and for the aggregator just ignore old messages and fix the timing check. Those are simple changes and should be tackled before anything more complex.
Maybe I'm missing something, but we're already dropping messages after 10 retries with 2-second delays in between. Do you suggest adding an additional goroutine that periodically inspects the queue and removes messages that are too old?
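For clarity on what is being asked, a hypothetical periodic sweep over such a queue could look like the following (queuedMessage and its fields are made-up names, not the repository's types):

```go
import (
	"sync"
	"time"
)

// queuedMessage is a hypothetical stand-in for whatever the retry queue stores.
type queuedMessage struct {
	enqueuedAt time.Time
	payload    any
}

// sweepExpired periodically drops queued messages older than ttl.
func sweepExpired(mu *sync.Mutex, queue *[]queuedMessage, ttl, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for range ticker.C {
		mu.Lock()
		kept := (*queue)[:0]
		for _, m := range *queue {
			if time.Since(m.enqueuedAt) < ttl {
				kept = append(kept, m)
			}
		}
		*queue = kept
		mu.Unlock()
	}
}
```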
Moving the discussion regarding the aggregator back to #212.
Current Behavior
Currently, failed messages are stored in a queue to be processed later by a background thread. The current design does not take the age of a message into consideration, meaning that old messages can linger around for a long time. See #212 for more details.
New Behavior
This PR completely changes the design of the RPC client. It introduces a new interface, RpcClient, that performs the actual RPC call; Go's native rpc.Client implements this interface.
The reasoning behind these changes is as follows: calls to the aggregator are already made in their own goroutines (go client.SendX), so we can handle retrying in each individual goroutine without requiring a background goroutine. A rough sketch of this follows below.
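Purely as an illustration of the per-goroutine retry idea (this is not the PR's actual code; the retryStrategy and logger fields, and the RetryStrategy shape from the earlier sketch, are assumptions), a send could loop over attempts right where the call is made:

```go
// Illustrative sketch: each SendX call already runs in its own goroutine
// (go client.SendX(...)), so the retry loop can live next to the call
// instead of in a shared background worker.
func (c *AggregatorRpcClient) sendWithRetry(serviceMethod string, args any) {
	firstAttempt := time.Now()
	for attempt := 0; ; attempt++ {
		var reply bool
		err := c.rpcClient.Call(serviceMethod, args, &reply)
		if err == nil {
			return
		}
		// The strategy decides whether this goroutine keeps trying; once it
		// says no, the message is dropped (this is where the TTL applies).
		if !c.retryStrategy(attempt, firstAttempt) {
			c.logger.Warn("dropping message after retries", "method", serviceMethod, "err", err)
			return
		}
	}
}
```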
Breaking Changes
If the order of messages is a requirement, then this is a breaking change, since we now definitely do not guarantee it (the old design may have tried, unsuccessfully, to guarantee it).
Since we are now adding a TTL to messages, messages that used to be sent after large delays will now be dropped altogether.
As usual with refactors of this kind, there might be other implicit details that have been unintentionally changed, so a thorough review is required.