diff --git a/staker/stateless_block_validator.go b/staker/stateless_block_validator.go index ea45e76dde..82ca052efe 100644 --- a/staker/stateless_block_validator.go +++ b/staker/stateless_block_validator.go @@ -236,11 +236,7 @@ func NewStatelessBlockValidator( validationSpawners[i] = server_api.NewValidationClient(valConfFetcher, stack) } valConfFetcher := func() *rpcclient.ClientConfig { return &config().ValidationServerConfigs[0] } - execClient := server_api.NewExecutionClient( - &server_api.ExecutionClientOpts{ - Config: valConfFetcher, - Stack: stack, - }) + execClient := server_api.NewExecutionClient(valConfFetcher, stack) validator := &StatelessBlockValidator{ config: config(), execSpawner: execClient, diff --git a/system_tests/validation_mock_test.go b/system_tests/validation_mock_test.go index 580d54dd09..ff896006e2 100644 --- a/system_tests/validation_mock_test.go +++ b/system_tests/validation_mock_test.go @@ -195,12 +195,7 @@ func TestValidationServerAPI(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() _, validationDefault := createMockValidationNode(t, ctx, nil) - client := server_api.NewExecutionClient( - &server_api.ExecutionClientOpts{ - Config: StaticFetcherFrom(t, &rpcclient.TestClientConfig), - Stack: validationDefault, - }, - ) + client := server_api.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), validationDefault) err := client.Start(ctx) Require(t, err) @@ -266,12 +261,7 @@ func TestValidationClientRoom(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() mockSpawner, spawnerStack := createMockValidationNode(t, ctx, nil) - client := server_api.NewExecutionClient( - &server_api.ExecutionClientOpts{ - Config: StaticFetcherFrom(t, &rpcclient.TestClientConfig), - Stack: spawnerStack, - }, - ) + client := server_api.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), spawnerStack) err := client.Start(ctx) Require(t, err) @@ -358,10 +348,10 @@ func TestExecutionKeepAlive(t *testing.T) { _, validationShortTO := createMockValidationNode(t, ctx, &shortTimeoutConfig) configFetcher := StaticFetcherFrom(t, &rpcclient.TestClientConfig) - clientDefault := server_api.NewExecutionClient(&server_api.ExecutionClientOpts{Config: configFetcher, Stack: validationDefault}) + clientDefault := server_api.NewExecutionClient(configFetcher, validationDefault) err := clientDefault.Start(ctx) Require(t, err) - clientShortTO := server_api.NewExecutionClient(&server_api.ExecutionClientOpts{Config: configFetcher, Stack: validationShortTO}) + clientShortTO := server_api.NewExecutionClient(configFetcher, validationShortTO) err = clientShortTO.Start(ctx) Require(t, err) diff --git a/validator/server_api/validation_api.go b/validator/server_api/validation_api.go index 3a225be5d7..c8f738f169 100644 --- a/validator/server_api/validation_api.go +++ b/validator/server_api/validation_api.go @@ -60,7 +60,7 @@ type ExecServerAPI struct { nextId uint64 runs map[uint64]*execRunEntry - consumer *pubsub.Consumer[GetLeavesHashRequest, []common.Hash] + consumers []*pubsub.Consumer[GetLeavesHashRequest, []common.Hash] } func NewExecutionServerAPI(valSpawner validator.ValidationSpawner, execution validator.ExecutionSpawner, config server_arb.ArbitratorSpawnerConfigFecher) *ExecServerAPI { @@ -126,26 +126,29 @@ func (a *ExecServerAPI) removeOldRuns(ctx context.Context) time.Duration { func (a *ExecServerAPI) Start(ctx_in context.Context) { a.StopWaiter.Start(ctx_in, a) a.CallIteratively(a.removeOldRuns) - if a.consumer != nil { - a.consumer.Start(ctx_in) - a.StopWaiter.CallIteratively(func(ctx context.Context) time.Duration { - msg, err := a.consumer.Consume(ctx) - if err != nil { - log.Error("Consuming request", "error", err) + if len(a.consumers) != 0 { + for i := 0; i < len(a.consumers); i++ { + a.consumers[i].Start(ctx_in) + a.StopWaiter.CallIteratively(func(ctx context.Context) time.Duration { + msg, err := a.consumers[i].Consume(ctx) + if err != nil { + log.Error("Consuming request", "error", err) + return 0 + } + hashes, err := a.GetLeavesWithStepSize(ctx, msg.Value.ExecutionID, msg.Value.FromBatch, msg.Value.MachineStartIndex, msg.Value.StepSize, msg.Value.NumDesiredLeaves) + if err != nil { + // TODO: log other fields as well. + log.Error("Error getting leaves with stepsize", "error", err) + return 0 + } + if err := a.consumers[i].SetResult(ctx, msg.ID, hashes); err != nil { + log.Error("Error setting result", "error", err) + return 0 + } // TODO: consider having this as config param. return time.Second - } - hashes, err := a.GetLeavesWithStepSize(ctx, msg.Value.ExecutionID, msg.Value.FromBatch, msg.Value.MachineStartIndex, msg.Value.StepSize, msg.Value.NumDesiredLeaves) - if err != nil { - // TODO: log other fields as well. - log.Error("Error getting leaves with stepsize", "error", err) - } - if err := a.consumer.SetResult(ctx, msg.ID, hashes); err != nil { - log.Error("Error setting result", "error", err) - } - return time.Second - }) - + }) + } } } diff --git a/validator/server_api/validation_client.go b/validator/server_api/validation_client.go index 3c02b9b9d5..e9aea588d9 100644 --- a/validator/server_api/validation_client.go +++ b/validator/server_api/validation_client.go @@ -3,7 +3,6 @@ package server_api import ( "context" "encoding/base64" - "encoding/json" "errors" "sync/atomic" "time" @@ -106,58 +105,16 @@ type GetLeavesHashRequest struct { NumDesiredLeaves uint64 } -// Generic marshaller -type jsonMarshaller[T any] struct{} - -// Marshal converts a GetLeavesHashRequest into a JSON byte slice. -func (j jsonMarshaller[T]) Marshal(v T) []byte { - data, err := json.Marshal(v) - if err != nil { - log.Error("error marshaling", "value", v, "error", err) - return nil - } - return data -} - -// Unmarshal converts a JSON byte slice into a GetLeavesHashRequest. -func (j jsonMarshaller[T]) Unmarshal(val []byte) (T, error) { - var v T - err := json.Unmarshal(val, &v) - if err != nil { - return v, err - } - return v, nil -} - type ExecutionClient struct { ValidationClient producer *pubsub.Producer[GetLeavesHashRequest, []common.Hash] } -type RedisStreamCfg struct { - producerCfg *pubsub.ProducerConfig -} - -type ExecutionClientOpts struct { - Config rpcclient.ClientConfigFetcher - Stack *node.Node - RedisStreamCfg *RedisStreamCfg -} - -func NewExecutionClient(opts *ExecutionClientOpts) *ExecutionClient { - ret := &ExecutionClient{ValidationClient: *NewValidationClient(opts.Config, opts.Stack)} - if opts.RedisStreamCfg != nil { - p, err := pubsub.NewProducer[GetLeavesHashRequest, []common.Hash]( - opts.RedisStreamCfg.producerCfg, jsonMarshaller[GetLeavesHashRequest]{}, jsonMarshaller[[]common.Hash]{}, - ) - if err != nil { - log.Error("Creating new producer", "error", err) - return ret - } - ret.producer = p +func NewExecutionClient(config rpcclient.ClientConfigFetcher, stack *node.Node) *ExecutionClient { + return &ExecutionClient{ + ValidationClient: *NewValidationClient(config, stack), } - return ret } func (c *ExecutionClient) CreateBoldExecutionRun(wasmModuleRoot common.Hash, stepSize uint64, input *validator.ValidationInput) containers.PromiseInterface[validator.ExecutionRun] {