Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
anodar committed Apr 11, 2024
1 parent 0d35932 commit ed97ebf
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 84 deletions.
6 changes: 1 addition & 5 deletions staker/stateless_block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,7 @@ func NewStatelessBlockValidator(
validationSpawners[i] = server_api.NewValidationClient(valConfFetcher, stack)
}
valConfFetcher := func() *rpcclient.ClientConfig { return &config().ValidationServerConfigs[0] }
execClient := server_api.NewExecutionClient(
&server_api.ExecutionClientOpts{
Config: valConfFetcher,
Stack: stack,
})
execClient := server_api.NewExecutionClient(valConfFetcher, stack)
validator := &StatelessBlockValidator{
config: config(),
execSpawner: execClient,
Expand Down
18 changes: 4 additions & 14 deletions system_tests/validation_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,7 @@ func TestValidationServerAPI(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, validationDefault := createMockValidationNode(t, ctx, nil)
client := server_api.NewExecutionClient(
&server_api.ExecutionClientOpts{
Config: StaticFetcherFrom(t, &rpcclient.TestClientConfig),
Stack: validationDefault,
},
)
client := server_api.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), validationDefault)
err := client.Start(ctx)
Require(t, err)

Expand Down Expand Up @@ -266,12 +261,7 @@ func TestValidationClientRoom(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mockSpawner, spawnerStack := createMockValidationNode(t, ctx, nil)
client := server_api.NewExecutionClient(
&server_api.ExecutionClientOpts{
Config: StaticFetcherFrom(t, &rpcclient.TestClientConfig),
Stack: spawnerStack,
},
)
client := server_api.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), spawnerStack)
err := client.Start(ctx)
Require(t, err)

Expand Down Expand Up @@ -358,10 +348,10 @@ func TestExecutionKeepAlive(t *testing.T) {
_, validationShortTO := createMockValidationNode(t, ctx, &shortTimeoutConfig)
configFetcher := StaticFetcherFrom(t, &rpcclient.TestClientConfig)

clientDefault := server_api.NewExecutionClient(&server_api.ExecutionClientOpts{Config: configFetcher, Stack: validationDefault})
clientDefault := server_api.NewExecutionClient(configFetcher, validationDefault)
err := clientDefault.Start(ctx)
Require(t, err)
clientShortTO := server_api.NewExecutionClient(&server_api.ExecutionClientOpts{Config: configFetcher, Stack: validationShortTO})
clientShortTO := server_api.NewExecutionClient(configFetcher, validationShortTO)
err = clientShortTO.Start(ctx)
Require(t, err)

Expand Down
41 changes: 22 additions & 19 deletions validator/server_api/validation_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type ExecServerAPI struct {
nextId uint64
runs map[uint64]*execRunEntry

consumer *pubsub.Consumer[GetLeavesHashRequest, []common.Hash]
consumers []*pubsub.Consumer[GetLeavesHashRequest, []common.Hash]
}

func NewExecutionServerAPI(valSpawner validator.ValidationSpawner, execution validator.ExecutionSpawner, config server_arb.ArbitratorSpawnerConfigFecher) *ExecServerAPI {
Expand Down Expand Up @@ -126,26 +126,29 @@ func (a *ExecServerAPI) removeOldRuns(ctx context.Context) time.Duration {
func (a *ExecServerAPI) Start(ctx_in context.Context) {
a.StopWaiter.Start(ctx_in, a)
a.CallIteratively(a.removeOldRuns)
if a.consumer != nil {
a.consumer.Start(ctx_in)
a.StopWaiter.CallIteratively(func(ctx context.Context) time.Duration {
msg, err := a.consumer.Consume(ctx)
if err != nil {
log.Error("Consuming request", "error", err)
if len(a.consumers) != 0 {
for i := 0; i < len(a.consumers); i++ {
a.consumers[i].Start(ctx_in)
a.StopWaiter.CallIteratively(func(ctx context.Context) time.Duration {
msg, err := a.consumers[i].Consume(ctx)
if err != nil {
log.Error("Consuming request", "error", err)
return 0
}
hashes, err := a.GetLeavesWithStepSize(ctx, msg.Value.ExecutionID, msg.Value.FromBatch, msg.Value.MachineStartIndex, msg.Value.StepSize, msg.Value.NumDesiredLeaves)
if err != nil {
// TODO: log other fields as well.
log.Error("Error getting leaves with stepsize", "error", err)
return 0
}
if err := a.consumers[i].SetResult(ctx, msg.ID, hashes); err != nil {
log.Error("Error setting result", "error", err)
return 0
}
// TODO: consider having this as config param.
return time.Second
}
hashes, err := a.GetLeavesWithStepSize(ctx, msg.Value.ExecutionID, msg.Value.FromBatch, msg.Value.MachineStartIndex, msg.Value.StepSize, msg.Value.NumDesiredLeaves)
if err != nil {
// TODO: log other fields as well.
log.Error("Error getting leaves with stepsize", "error", err)
}
if err := a.consumer.SetResult(ctx, msg.ID, hashes); err != nil {
log.Error("Error setting result", "error", err)
}
return time.Second
})

})
}
}
}

Expand Down
49 changes: 3 additions & 46 deletions validator/server_api/validation_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package server_api
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"sync/atomic"
"time"
Expand Down Expand Up @@ -106,58 +105,16 @@ type GetLeavesHashRequest struct {
NumDesiredLeaves uint64
}

// Generic marshaller
type jsonMarshaller[T any] struct{}

// Marshal converts a GetLeavesHashRequest into a JSON byte slice.
func (j jsonMarshaller[T]) Marshal(v T) []byte {
data, err := json.Marshal(v)
if err != nil {
log.Error("error marshaling", "value", v, "error", err)
return nil
}
return data
}

// Unmarshal converts a JSON byte slice into a GetLeavesHashRequest.
func (j jsonMarshaller[T]) Unmarshal(val []byte) (T, error) {
var v T
err := json.Unmarshal(val, &v)
if err != nil {
return v, err
}
return v, nil
}

type ExecutionClient struct {
ValidationClient

producer *pubsub.Producer[GetLeavesHashRequest, []common.Hash]
}

type RedisStreamCfg struct {
producerCfg *pubsub.ProducerConfig
}

type ExecutionClientOpts struct {
Config rpcclient.ClientConfigFetcher
Stack *node.Node
RedisStreamCfg *RedisStreamCfg
}

func NewExecutionClient(opts *ExecutionClientOpts) *ExecutionClient {
ret := &ExecutionClient{ValidationClient: *NewValidationClient(opts.Config, opts.Stack)}
if opts.RedisStreamCfg != nil {
p, err := pubsub.NewProducer[GetLeavesHashRequest, []common.Hash](
opts.RedisStreamCfg.producerCfg, jsonMarshaller[GetLeavesHashRequest]{}, jsonMarshaller[[]common.Hash]{},
)
if err != nil {
log.Error("Creating new producer", "error", err)
return ret
}
ret.producer = p
func NewExecutionClient(config rpcclient.ClientConfigFetcher, stack *node.Node) *ExecutionClient {
return &ExecutionClient{
ValidationClient: *NewValidationClient(config, stack),
}
return ret
}

func (c *ExecutionClient) CreateBoldExecutionRun(wasmModuleRoot common.Hash, stepSize uint64, input *validator.ValidationInput) containers.PromiseInterface[validator.ExecutionRun] {
Expand Down

0 comments on commit ed97ebf

Please sign in to comment.