-
Notifications
You must be signed in to change notification settings - Fork 452
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
First draft of using redis streams in bold (for discussion) #2226
Conversation
type jsonMarshaller[T any] struct{} | ||
|
||
// Marshal converts a GetLeavesHashRequest into a JSON byte slice. | ||
func (j jsonMarshaller[T]) Marshal(v T) []byte { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a good example why we should change Marshaller to return []byte + error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That has already been changed week ago in c8101c2
} | ||
|
||
// Generic marshaller | ||
type jsonMarshaller[T any] struct{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like jsonMarshaller..
it brings up the question: should Publisher/subscriber avoid Marshaller and instead use json.Marshall / json.Unmarshal?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes that is the intention, but I want to make incremental changes to speed up the review process a bit. Code isn't in use, so unless you insist on it, I'll follow up in in separate PR.
@@ -122,6 +126,27 @@ func (a *ExecServerAPI) removeOldRuns(ctx context.Context) time.Duration { | |||
func (a *ExecServerAPI) Start(ctx_in context.Context) { | |||
a.StopWaiter.Start(ctx_in, a) | |||
a.CallIteratively(a.removeOldRuns) | |||
if a.consumer != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We want multiple (hot-configurale number) of threads consuming and working on messages concurrently
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to multiple consumers.
As for hot-configurable, you mean we need to have ability to stop some/ start more consumers on the fly without restart ?
if err := a.consumer.SetResult(ctx, msg.ID, hashes); err != nil { | ||
log.Error("Error setting result", "error", err) | ||
} | ||
return time.Second |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
waiting a second makes sense in case of an error, but if no error you should return 0 so it would go back to work immediately
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
producerCfg *pubsub.ProducerConfig | ||
} | ||
|
||
type ExecutionClientOpts struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
config structs should only contain config values. Not fetchers and certainly not node/stack/et.
You want to have:
- ExecutionClientConfig struct would include ProducerConfig and a ClientConfig
- ExecutionClientConfigFetcher which is just a function returning ExecutionClientConfig
- NewExecutionClient recieves ExecutionClientConfigFetcher and *node.Node
-
- The function can create a ClientConfigFetcher from it's ExecutionClientConfigFetcher
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a config struct, that's an option struct (https://google.github.io/styleguide/go/best-practices#option-structure), but that part wasn't meant for a review, dropped it altogether to draw your attention to review using pubsub part only.
Closing in favor of: #2354 |
Please ignore pubsub/ folder, that's being reviewed in #2196. This is just a draft for discussion before I fully integrate it to avoid too many iteration over the implementation.