From 6cc3a5dfe8ecf248c950264684612cdce958caf7 Mon Sep 17 00:00:00 2001
From: Tsahi Zidenberg
Date: Wed, 11 Sep 2024 10:12:38 -0600
Subject: [PATCH 01/15] block_validator: read batches once per batch

---
 staker/block_validator.go           |  60 +++++++---
 staker/stateless_block_validator.go | 171 ++++++++++++++++------------
 2 files changed, 145 insertions(+), 86 deletions(-)

diff --git a/staker/block_validator.go b/staker/block_validator.go
index 7a7efca846..18411c6163 100644
--- a/staker/block_validator.go
+++ b/staker/block_validator.go
@@ -56,12 +56,12 @@ type BlockValidator struct {
 	chainCaughtUp bool

 	// can only be accessed from creation thread or if holding reorg-write
-	nextCreateBatch          []byte
-	nextCreateBatchBlockHash common.Hash
-	nextCreateBatchMsgCount  arbutil.MessageIndex
-	nextCreateBatchReread    bool
-	nextCreateStartGS        validator.GoGlobalState
-	nextCreatePrevDelayed    uint64
+	nextCreateBatch       *FullBatchInfo
+	nextCreateBatchReread bool
+	prevBatchCache        map[uint64]*FullBatchInfo
+
+	nextCreateStartGS     validator.GoGlobalState
+	nextCreatePrevDelayed uint64

 	// can only be accessed from validation thread or if holding reorg-write
 	lastValidGS validator.GoGlobalState
@@ -108,6 +108,7 @@ type BlockValidatorConfig struct {
 	PrerecordedBlocks        uint64 `koanf:"prerecorded-blocks" reload:"hot"`
 	RecordingIterLimit       uint64 `koanf:"recording-iter-limit"`
 	ForwardBlocks            uint64 `koanf:"forward-blocks" reload:"hot"`
+	BatchCacheLimit          uint32 `koanf:"batch-cache-limit"`
 	CurrentModuleRoot        string `koanf:"current-module-root"`         // TODO(magic) requires reinitialization on hot reload
 	PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` // TODO(magic) requires StatelessBlockValidator recreation on hot reload
 	FailureIsFatal           bool   `koanf:"failure-is-fatal" reload:"hot"`
@@ -174,6 +175,7 @@ func BlockValidatorConfigAddOptions(prefix string, f *pflag.FlagSet) {
 	f.Duration(prefix+".validation-poll", DefaultBlockValidatorConfig.ValidationPoll, "poll time to check validations")
 	f.Uint64(prefix+".forward-blocks", DefaultBlockValidatorConfig.ForwardBlocks, "prepare entries for up to that many blocks ahead of validation (small footprint)")
 	f.Uint64(prefix+".prerecorded-blocks", DefaultBlockValidatorConfig.PrerecordedBlocks, "record that many blocks ahead of validation (larger footprint)")
+	f.Uint32(prefix+".batch-cache-limit", DefaultBlockValidatorConfig.BatchCacheLimit, "limit number of old batches to keep in block-validator")
 	f.String(prefix+".current-module-root", DefaultBlockValidatorConfig.CurrentModuleRoot, "current wasm module root ('current' read from chain, 'latest' from machines/latest dir, or provide hash)")
 	f.Uint64(prefix+".recording-iter-limit", DefaultBlockValidatorConfig.RecordingIterLimit, "limit on block recordings sent per iteration")
 	f.String(prefix+".pending-upgrade-module-root", DefaultBlockValidatorConfig.PendingUpgradeModuleRoot, "pending upgrade wasm module root to additionally validate (hash, 'latest' or empty)")
@@ -194,6 +196,7 @@ var DefaultBlockValidatorConfig = BlockValidatorConfig{
 	ValidationPoll:              time.Second,
 	ForwardBlocks:               1024,
 	PrerecordedBlocks:           uint64(2 * runtime.NumCPU()),
+	BatchCacheLimit:             20,
 	CurrentModuleRoot:           "current",
 	PendingUpgradeModuleRoot:    "latest",
 	FailureIsFatal:              true,
@@ -209,6 +212,7 @@ var TestBlockValidatorConfig = BlockValidatorConfig{
 	RedisValidationClientConfig: redis.TestValidationClientConfig,
 	ValidationPoll:              100 * time.Millisecond,
 	ForwardBlocks:               128,
+	BatchCacheLimit:             20,
 	PrerecordedBlocks:           uint64(2 * runtime.NumCPU()),
 	RecordingIterLimit:          20,
 	CurrentModuleRoot:           "latest",
@@ -271,6 +275,7 @@ func NewBlockValidator(
 		progressValidationsChan: make(chan struct{}, 1),
 		config:                  config,
 		fatalErr:                fatalErr,
+		prevBatchCache:          make(map[uint64]*FullBatchInfo),
 	}
 	if !config().Dangerous.ResetBlockValidation {
 		validated, err := ret.ReadLastValidatedInfo()
@@ -571,33 +576,58 @@ func (v *BlockValidator) createNextValidationEntry(ctx context.Context) (bool, e
 	}
 	if v.nextCreateStartGS.PosInBatch == 0 || v.nextCreateBatchReread {
 		// new batch
-		found, batch, batchBlockHash, count, err := v.readBatch(ctx, v.nextCreateStartGS.Batch)
+		found, fullBatchInfo, err := v.readBatch(ctx, v.nextCreateStartGS.Batch)
 		if !found {
 			return false, err
 		}
-		v.nextCreateBatch = batch
-		v.nextCreateBatchBlockHash = batchBlockHash
-		v.nextCreateBatchMsgCount = count
+		if v.nextCreateBatch != nil {
+			v.prevBatchCache[v.nextCreateBatch.Number] = v.nextCreateBatch
+		}
+		v.nextCreateBatch = fullBatchInfo
 		// #nosec G115
-		validatorMsgCountCurrentBatch.Update(int64(count))
+		validatorMsgCountCurrentBatch.Update(int64(fullBatchInfo.MsgCount))
+		batchCacheLimit := v.config().BatchCacheLimit
+		if len(v.prevBatchCache) > int(batchCacheLimit) {
+			for num := range v.prevBatchCache {
+				if num < v.nextCreateStartGS.Batch-uint64(batchCacheLimit) {
+					delete(v.prevBatchCache, num)
+				}
+			}
+		}
 		v.nextCreateBatchReread = false
 	}
 	endGS := validator.GoGlobalState{
 		BlockHash: endRes.BlockHash,
 		SendRoot:  endRes.SendRoot,
 	}
-	if pos+1 < v.nextCreateBatchMsgCount {
+	if pos+1 < v.nextCreateBatch.MsgCount {
 		endGS.Batch = v.nextCreateStartGS.Batch
 		endGS.PosInBatch = v.nextCreateStartGS.PosInBatch + 1
-	} else if pos+1 == v.nextCreateBatchMsgCount {
+	} else if pos+1 == v.nextCreateBatch.MsgCount {
 		endGS.Batch = v.nextCreateStartGS.Batch + 1
 		endGS.PosInBatch = 0
 	} else {
-		return false, fmt.Errorf("illegal batch msg count %d pos %d batch %d", v.nextCreateBatchMsgCount, pos, endGS.Batch)
+		return false, fmt.Errorf("illegal batch msg count %d pos %d batch %d", v.nextCreateBatch.MsgCount, pos, endGS.Batch)
 	}
 	chainConfig := v.streamer.ChainConfig()
+	batchReader := func(batchNum uint64) (*FullBatchInfo, error) {
+		if batchNum == v.nextCreateBatch.Number {
+			return v.nextCreateBatch, nil
+		}
+		if entry, found := v.prevBatchCache[batchNum]; found {
+			return entry, nil
+		}
+		found, entry, err := v.readBatch(ctx, batchNum)
+		if err != nil {
+			return nil, err
+		}
+		if !found {
+			return nil, fmt.Errorf("batch %d not found", batchNum)
+		}
+		return entry, nil
+	}
 	entry, err := newValidationEntry(
-		pos, v.nextCreateStartGS, endGS, msg, v.nextCreateBatch, v.nextCreateBatchBlockHash, v.nextCreatePrevDelayed, chainConfig,
+		pos, v.nextCreateStartGS, endGS, msg, batchReader, v.nextCreatePrevDelayed, chainConfig,
 	)
 	if err != nil {
 		return false, err
diff --git a/staker/stateless_block_validator.go b/staker/stateless_block_validator.go
index f54ec8b58c..857bc255ad 100644
--- a/staker/stateless_block_validator.go
+++ b/staker/stateless_block_validator.go
@@ -114,6 +114,14 @@ const (
 	Ready
 )

+type FullBatchInfo struct {
+	Number     uint64
+	BlockHash  common.Hash
+	PostedData []byte
+	MsgCount   arbutil.MessageIndex
+	Preimages  map[arbutil.PreimageType]map[common.Hash][]byte
+}
+
 type validationEntry struct {
 	Stage ValidationEntryStage
 	// Valid since ReadyforRecord:
@@ -171,16 +179,27 @@ func newValidationEntry(
 	start validator.GoGlobalState,
 	end validator.GoGlobalState,
 	msg *arbostypes.MessageWithMetadata,
-	batch []byte,
-	batchBlockHash common.Hash,
+	fullBatchFetcher func(uint64) (*FullBatchInfo, error),
 	prevDelayed uint64,
 	chainConfig *params.ChainConfig,
 ) (*validationEntry, error) {
-	batchInfo := validator.BatchInfo{
-		Number:    start.Batch,
-		BlockHash: batchBlockHash,
-		Data:      batch,
+	preimages := make(map[arbutil.PreimageType]map[common.Hash][]byte)
+	valBatches := make([]validator.BatchInfo, 0)
+
+	valBatchFetcher := func(batchNum uint64) ([]byte, error) {
+		fullBatchInfo, err := fullBatchFetcher(batchNum)
+		if err != nil {
+			return nil, err
+		}
+		valBatches = append(valBatches, validator.BatchInfo{
+			Number:    batchNum,
+			BlockHash: fullBatchInfo.BlockHash,
+			Data:      fullBatchInfo.PostedData,
+		})
+		clonePreimagesInto(preimages, fullBatchInfo.Preimages)
+		return fullBatchInfo.PostedData, nil
 	}
+
 	hasDelayed := false
 	var delayedNum uint64
 	if msg.DelayedMessagesRead == prevDelayed+1 {
@@ -189,6 +208,15 @@ func newValidationEntry(
 	} else if msg.DelayedMessagesRead != prevDelayed {
 		return nil, fmt.Errorf("illegal validation entry delayedMessage %d, previous %d", msg.DelayedMessagesRead, prevDelayed)
 	}
+
+	if _, err := valBatchFetcher(start.Batch); err != nil {
+		return nil, err
+	}
+	msg.Message.BatchGasCost = nil
+	if err := msg.Message.FillInBatchGasCost(valBatchFetcher); err != nil {
+		return nil, err
+	}
+
 	return &validationEntry{
 		Stage:         ReadyForRecord,
 		Pos:           pos,
@@ -197,8 +225,9 @@ func newValidationEntry(
 		HasDelayedMsg: hasDelayed,
 		DelayedMsgNr:  delayedNum,
 		msg:           msg,
-		BatchInfo:     []validator.BatchInfo{batchInfo},
+		BatchInfo:     valBatches,
 		ChainConfig:   chainConfig,
+		Preimages:     preimages,
 	}, nil
 }

@@ -246,30 +275,73 @@ func NewStatelessBlockValidator(
 	}, nil
 }

-func (v *StatelessBlockValidator) readBatch(ctx context.Context, batchNum uint64) (bool, []byte, common.Hash, arbutil.MessageIndex, error) {
+func (v *StatelessBlockValidator) readBatch(ctx context.Context, batchNum uint64) (bool, *FullBatchInfo, error) {
 	batchCount, err := v.inboxTracker.GetBatchCount()
 	if err != nil {
-		return false, nil, common.Hash{}, 0, err
+		return false, nil, err
 	}
 	if batchCount <= batchNum {
-		return false, nil, common.Hash{}, 0, nil
+		return false, nil, nil
 	}
 	batchMsgCount, err := v.inboxTracker.GetBatchMessageCount(batchNum)
 	if err != nil {
-		return false, nil, common.Hash{}, 0, err
+		return false, nil, err
 	}
-	batch, batchBlockHash, err := v.inboxReader.GetSequencerMessageBytes(ctx, batchNum)
+	postedData, batchBlockHash, err := v.inboxReader.GetSequencerMessageBytes(ctx, batchNum)
 	if err != nil {
-		return false, nil, common.Hash{}, 0, err
+		return false, nil, err
+	}
+	preimages := make(map[arbutil.PreimageType]map[common.Hash][]byte)
+	if len(postedData) > 40 {
+		foundDA := false
+		for _, dapReader := range v.dapReaders {
+			if dapReader != nil && dapReader.IsValidHeaderByte(postedData[40]) {
+				preimageRecorder := daprovider.RecordPreimagesTo(preimages)
+				_, err := dapReader.RecoverPayloadFromBatch(ctx, batchNum, batchBlockHash, postedData, preimageRecorder, true)
+				if err != nil {
+					// Matches the way keyset validation was done inside DAS readers, i.e. logging the error
+					// But other daproviders might just want to return the error
+					if errors.Is(err, daprovider.ErrSeqMsgValidation) && daprovider.IsDASMessageHeaderByte(postedData[40]) {
+						log.Error(err.Error())
+					} else {
+						return false, nil, err
+					}
+				}
+				foundDA = true
+				break
+			}
+		}
+		if !foundDA {
+			if daprovider.IsDASMessageHeaderByte(postedData[40]) {
+				log.Error("No DAS Reader configured, but sequencer message found with DAS header")
+			}
+		}
+	}
+	fullInfo := FullBatchInfo{
+		Number:     batchNum,
+		BlockHash:  batchBlockHash,
+		PostedData: postedData,
+		MsgCount:   batchMsgCount,
+		Preimages:  preimages,
+	}
+	return true, &fullInfo, nil
+}
+
+func clonePreimagesInto(dest, source map[arbutil.PreimageType]map[common.Hash][]byte) {
+	for piType, piMap := range source {
+		if dest[piType] == nil {
+			dest[piType] = make(map[common.Hash][]byte, len(source[piType]))
+		}
+		for hash, preimage := range piMap {
+			dest[piType][hash] = preimage
+		}
 	}
-	return true, batch, batchBlockHash, batchMsgCount, nil
 }

 func (v *StatelessBlockValidator) ValidationEntryRecord(ctx context.Context, e *validationEntry) error {
 	if e.Stage != ReadyForRecord {
 		return fmt.Errorf("validation entry should be ReadyForRecord, is: %v", e.Stage)
 	}
-	e.Preimages = make(map[arbutil.PreimageType]map[common.Hash][]byte)
 	if e.Pos != 0 {
 		recording, err := v.recorder.RecordBlockCreation(ctx, e.Pos, e.msg)
 		if err != nil {
@@ -278,30 +350,10 @@ func (v *StatelessBlockValidator) ValidationEntryRecord(ctx context.Context, e *
 		if recording.BlockHash != e.End.BlockHash {
 			return fmt.Errorf("recording failed: pos %d, hash expected %v, got %v", e.Pos, e.End.BlockHash, recording.BlockHash)
 		}
-		// record any additional batch fetching
-		batchFetcher := func(batchNum uint64) ([]byte, error) {
-			found, data, hash, _, err := v.readBatch(ctx, batchNum)
-			if err != nil {
-				return nil, err
-			}
-			if !found {
-				return nil, errors.New("batch not found")
-			}
-			e.BatchInfo = append(e.BatchInfo, validator.BatchInfo{
-				Number:    batchNum,
-				BlockHash: hash,
-				Data:      data,
-			})
-			return data, nil
-		}
-		e.msg.Message.BatchGasCost = nil
-		err = e.msg.Message.FillInBatchGasCost(batchFetcher)
-		if err != nil {
-			return err
-		}
 		if recording.Preimages != nil {
-			e.Preimages[arbutil.Keccak256PreimageType] = recording.Preimages
+			recordingPreimages := make(map[arbutil.PreimageType]map[common.Hash][]byte)
+			recordingPreimages[arbutil.Keccak256PreimageType] = recording.Preimages
+			clonePreimagesInto(e.Preimages, recordingPreimages)
 		}
 		e.UserWasms = recording.UserWasms
 	}
@@ -316,35 +368,6 @@ func (v *StatelessBlockValidator) ValidationEntryRecord(ctx context.Context, e *
 		}
 		e.DelayedMsg = delayedMsg
 	}
-	for _, batch := range e.BatchInfo {
-		if len(batch.Data) <= 40 {
-			continue
-		}
-		foundDA := false
-		for _, dapReader := range v.dapReaders {
-			if dapReader != nil && dapReader.IsValidHeaderByte(batch.Data[40]) {
-				preimageRecorder := daprovider.RecordPreimagesTo(e.Preimages)
-				_, err := dapReader.RecoverPayloadFromBatch(ctx, batch.Number, batch.BlockHash, batch.Data, preimageRecorder, true)
-				if err != nil {
-					// Matches the way keyset validation was done inside DAS readers i.e logging the error
-					// But other daproviders might just want to return the error
-					if errors.Is(err, daprovider.ErrSeqMsgValidation) && daprovider.IsDASMessageHeaderByte(batch.Data[40]) {
-						log.Error(err.Error())
-					} else {
-						return err
-					}
-				}
-				foundDA = true
-				break
-			}
-		}
-		if !foundDA {
-			if daprovider.IsDASMessageHeaderByte(batch.Data[40]) {
-				log.Error("No DAS Reader configured, but sequencer message found with DAS header")
-			}
-		}
-	}
-
 	e.msg = nil // no longer needed
 	e.Stage = Ready
 	return nil
@@ -404,11 +427,17 @@ func (v *StatelessBlockValidator) CreateReadyValidationEntry(ctx context.Context
 	}
 	start := buildGlobalState(*prevResult, startPos)
 	end := buildGlobalState(*result, endPos)
-	seqMsg, batchBlockHash, err := v.inboxReader.GetSequencerMessageBytes(ctx, startPos.BatchNumber)
-	if err != nil {
-		return nil, err
+	fullBatchReader := func(batchNum uint64) (*FullBatchInfo, error) {
+		found, fullBlockInfo, err := v.readBatch(ctx, batchNum)
+		if err != nil {
+			return nil, err
+		}
+		if !found {
+			return nil, fmt.Errorf("batch %d not found", startPos.BatchNumber)
+		}
+		return fullBlockInfo, nil
 	}
-	entry, err := newValidationEntry(pos, start, end, msg, seqMsg, batchBlockHash, prevDelayed, v.streamer.ChainConfig())
+	entry, err := newValidationEntry(pos, start, end, msg, fullBatchReader, prevDelayed, v.streamer.ChainConfig())
 	if err != nil {
 		return nil, err
 	}

From d752445bfac0da673fcfe7ec7f3dbe209223ac71 Mon Sep 17 00:00:00 2001
From: Tsahi Zidenberg
Date: Wed, 11 Sep 2024 10:16:50 -0600
Subject: [PATCH 02/15] remove blockhash from validator.BatchInfo

---
 staker/stateless_block_validator.go | 5 ++---
 validator/validation_entry.go       | 5 ++---
 2 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/staker/stateless_block_validator.go b/staker/stateless_block_validator.go
index 857bc255ad..3d92b9df59 100644
--- a/staker/stateless_block_validator.go
+++ b/staker/stateless_block_validator.go
@@ -192,9 +192,8 @@ func newValidationEntry(
 			return nil, err
 		}
 		valBatches = append(valBatches, validator.BatchInfo{
-			Number:    batchNum,
-			BlockHash: fullBatchInfo.BlockHash,
-			Data:      fullBatchInfo.PostedData,
+			Number: batchNum,
+			Data:   fullBatchInfo.PostedData,
 		})
 		clonePreimagesInto(preimages, fullBatchInfo.Preimages)
 		return fullBatchInfo.PostedData, nil
diff --git a/validator/validation_entry.go b/validator/validation_entry.go
index d340993fa2..4ec6919d3b 100644
--- a/validator/validation_entry.go
+++ b/validator/validation_entry.go
@@ -7,9 +7,8 @@ import (
 )

 type BatchInfo struct {
-	Number    uint64
-	BlockHash common.Hash
-	Data      []byte
+	Number uint64
+	Data   []byte
 }

 type ValidationInput struct {

From 25b4afe0363f65a9df15ee27a20917151dddf3f8 Mon Sep 17 00:00:00 2001
From: Tsahi Zidenberg
Date: Wed, 11 Sep 2024 10:31:01 -0600
Subject: [PATCH 03/15] update default forward-blocks value

---
 staker/block_validator.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/staker/block_validator.go b/staker/block_validator.go
index 18411c6163..847b6f6349 100644
--- a/staker/block_validator.go
+++ b/staker/block_validator.go
@@ -173,7 +173,7 @@ func BlockValidatorConfigAddOptions(prefix string, f *pflag.FlagSet) {
 	redis.ValidationClientConfigAddOptions(prefix+".redis-validation-client-config", f)
 	f.String(prefix+".validation-server-configs-list", DefaultBlockValidatorConfig.ValidationServerConfigsList, "array of execution rpc configs given as a json string. time duration should be supplied in number indicating nanoseconds")
 	f.Duration(prefix+".validation-poll", DefaultBlockValidatorConfig.ValidationPoll, "poll time to check validations")
-	f.Uint64(prefix+".forward-blocks", DefaultBlockValidatorConfig.ForwardBlocks, "prepare entries for up to that many blocks ahead of validation (small footprint)")
+	f.Uint64(prefix+".forward-blocks", DefaultBlockValidatorConfig.ForwardBlocks, "prepare entries for up to that many blocks ahead of validation (stores batch-copy per block)")
 	f.Uint64(prefix+".prerecorded-blocks", DefaultBlockValidatorConfig.PrerecordedBlocks, "record that many blocks ahead of validation (larger footprint)")
 	f.Uint32(prefix+".batch-cache-limit", DefaultBlockValidatorConfig.BatchCacheLimit, "limit number of old batches to keep in block-validator")
 	f.String(prefix+".current-module-root", DefaultBlockValidatorConfig.CurrentModuleRoot, "current wasm module root ('current' read from chain, 'latest' from machines/latest dir, or provide hash)")
@@ -194,7 +194,7 @@ var DefaultBlockValidatorConfig = BlockValidatorConfig{
 	ValidationServer:            rpcclient.DefaultClientConfig,
 	RedisValidationClientConfig: redis.DefaultValidationClientConfig,
 	ValidationPoll:              time.Second,
-	ForwardBlocks:               1024,
+	ForwardBlocks:               128,
 	PrerecordedBlocks:           uint64(2 * runtime.NumCPU()),
 	BatchCacheLimit:             20,
 	CurrentModuleRoot:           "current",

From c98d652ce97e99063596096f5b0bd993c3cf22d4 Mon Sep 17 00:00:00 2001
From: Tsahi Zidenberg
Date: Wed, 11 Sep 2024 10:38:08 -0600
Subject: [PATCH 04/15] block_validator: prevBatchCache in reorgs

---
 staker/block_validator.go | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/staker/block_validator.go b/staker/block_validator.go
index 847b6f6349..49dc59c4a2 100644
--- a/staker/block_validator.go
+++ b/staker/block_validator.go
@@ -1027,6 +1027,9 @@ func (v *BlockValidator) UpdateLatestStaked(count arbutil.MessageIndex, globalSt
 		v.nextCreateStartGS = globalState
 		v.nextCreatePrevDelayed = msg.DelayedMessagesRead
 		v.nextCreateBatchReread = true
+		if v.nextCreateBatch != nil {
+			v.prevBatchCache[v.nextCreateBatch.Number] = v.nextCreateBatch
+		}
 		v.createdA.Store(countUint64)
 	}
 	// under the reorg mutex we don't need atomic access
@@ -1053,6 +1056,7 @@ func (v *BlockValidator) ReorgToBatchCount(count uint64) {
 	defer v.reorgMutex.Unlock()
 	if v.nextCreateStartGS.Batch >= count {
 		v.nextCreateBatchReread = true
+		v.prevBatchCache = make(map[uint64]*FullBatchInfo)
 	}
 }

@@ -1093,6 +1097,7 @@ func (v *BlockValidator) Reorg(ctx context.Context, count arbutil.MessageIndex)
 	v.nextCreateStartGS = buildGlobalState(*res, endPosition)
 	v.nextCreatePrevDelayed = msg.DelayedMessagesRead
 	v.nextCreateBatchReread = true
+	v.prevBatchCache = make(map[uint64]*FullBatchInfo)
 	countUint64 := uint64(count)
 	v.createdA.Store(countUint64)
 	// under the reorg mutex we don't need atomic access

From b71fd7b5befb34901b2fdfbe1ead03dd1b5eeb26 Mon Sep 17 00:00:00 2001
From: Tsahi Zidenberg
Date: Wed, 11 Sep 2024 11:03:59 -0600
Subject: [PATCH 05/15] block_recorder: separate config, add place to max blocks

---
 execution/gethexec/block_recorder.go | 32 +++++++++++++++++++++++++---
 execution/gethexec/node.go           | 30 +++++++++++++--------------
 2 files changed, 44 insertions(+), 18 deletions(-)

diff --git a/execution/gethexec/block_recorder.go b/execution/gethexec/block_recorder.go
index 8879c90702..a31b6b3736 100644
--- a/execution/gethexec/block_recorder.go
+++ b/execution/gethexec/block_recorder.go
@@ -16,6 +16,7 @@ import (
 	"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/execution" + flag "github.com/spf13/pflag" ) // BlockRecorder uses a separate statedatabase from the blockchain. @@ -25,6 +26,8 @@ import ( // Most recent/advanced header we ever computed (lastHdr) // Hopefully - some recent valid block. For that we always keep one candidate block until it becomes validated. type BlockRecorder struct { + config *BlockRecorderConfig + recordingDatabase *arbitrum.RecordingDatabase execEngine *ExecutionEngine @@ -39,10 +42,33 @@ type BlockRecorder struct { preparedLock sync.Mutex } -func NewBlockRecorder(config *arbitrum.RecordingDatabaseConfig, execEngine *ExecutionEngine, ethDb ethdb.Database) *BlockRecorder { +type BlockRecorderConfig struct { + TrieDirtyCache int `koanf:"trie-dirty-cache"` + TrieCleanCache int `koanf:"trie-clean-cache"` + MaxPrepared int `koanf:"max-prepared"` +} + +var DefaultBlockRecorderConfig = BlockRecorderConfig{ + TrieDirtyCache: 1024, + TrieCleanCache: 16, + MaxPrepared: 1000, +} + +func BlockRecorderConfigAddOptions(prefix string, f *flag.FlagSet) { + f.Int(prefix+".trie-dirty-cache", DefaultBlockRecorderConfig.TrieDirtyCache, "like trie-dirty-cache for the separate, recording database (used for validation)") + f.Int(prefix+".trie-clean-cache", DefaultBlockRecorderConfig.TrieCleanCache, "like trie-clean-cache for the separate, recording database (used for validation)") + f.Int(prefix+".max-prepared", DefaultBlockRecorderConfig.MaxPrepared, "max references to store in the recording database") +} + +func NewBlockRecorder(config *BlockRecorderConfig, execEngine *ExecutionEngine, ethDb ethdb.Database) *BlockRecorder { + dbConfig := arbitrum.RecordingDatabaseConfig{ + TrieDirtyCache: config.TrieDirtyCache, + TrieCleanCache: config.TrieCleanCache, + } recorder := &BlockRecorder{ + config: config, execEngine: execEngine, - recordingDatabase: arbitrum.NewRecordingDatabase(config, ethDb, execEngine.bc), + recordingDatabase: arbitrum.NewRecordingDatabase(&dbConfig, ethDb, execEngine.bc), } execEngine.SetRecorder(recorder) return recorder @@ -303,7 +329,7 @@ func (r *BlockRecorder) PrepareForRecord(ctx context.Context, start, end arbutil r.updateLastHdr(header) hdrNum++ } - r.preparedAddTrim(references, 1000) + r.preparedAddTrim(references, r.config.MaxPrepared) return nil } diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index 21c2b4bece..5847f392b1 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -78,19 +78,19 @@ func StylusTargetConfigAddOptions(prefix string, f *flag.FlagSet) { } type Config struct { - ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"` - Sequencer SequencerConfig `koanf:"sequencer" reload:"hot"` - RecordingDatabase arbitrum.RecordingDatabaseConfig `koanf:"recording-database"` - TxPreChecker TxPreCheckerConfig `koanf:"tx-pre-checker" reload:"hot"` - Forwarder ForwarderConfig `koanf:"forwarder"` - ForwardingTarget string `koanf:"forwarding-target"` - SecondaryForwardingTarget []string `koanf:"secondary-forwarding-target"` - Caching CachingConfig `koanf:"caching"` - RPC arbitrum.Config `koanf:"rpc"` - TxLookupLimit uint64 `koanf:"tx-lookup-limit"` - EnablePrefetchBlock bool `koanf:"enable-prefetch-block"` - SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"` - StylusTarget StylusTargetConfig `koanf:"stylus-target"` + ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"` + Sequencer SequencerConfig `koanf:"sequencer" reload:"hot"` + RecordingDatabase 
+	TxPreChecker              TxPreCheckerConfig  `koanf:"tx-pre-checker" reload:"hot"`
+	Forwarder                 ForwarderConfig     `koanf:"forwarder"`
+	ForwardingTarget          string              `koanf:"forwarding-target"`
+	SecondaryForwardingTarget []string            `koanf:"secondary-forwarding-target"`
+	Caching                   CachingConfig       `koanf:"caching"`
+	RPC                       arbitrum.Config     `koanf:"rpc"`
+	TxLookupLimit             uint64              `koanf:"tx-lookup-limit"`
+	EnablePrefetchBlock       bool                `koanf:"enable-prefetch-block"`
+	SyncMonitor               SyncMonitorConfig   `koanf:"sync-monitor"`
+	StylusTarget              StylusTargetConfig  `koanf:"stylus-target"`

 	forwardingTarget string
 }
@@ -123,7 +123,7 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) {
 	arbitrum.ConfigAddOptions(prefix+".rpc", f)
 	SequencerConfigAddOptions(prefix+".sequencer", f)
 	headerreader.AddOptions(prefix+".parent-chain-reader", f)
-	arbitrum.RecordingDatabaseConfigAddOptions(prefix+".recording-database", f)
+	BlockRecorderConfigAddOptions(prefix+".recording-database", f)
 	f.String(prefix+".forwarding-target", ConfigDefault.ForwardingTarget, "transaction forwarding target URL, or \"null\" to disable forwarding (iff not sequencer)")
 	f.StringSlice(prefix+".secondary-forwarding-target", ConfigDefault.SecondaryForwardingTarget, "secondary transaction forwarding target URL")
 	AddOptionsForNodeForwarderConfig(prefix+".forwarder", f)
@@ -139,7 +139,7 @@ var ConfigDefault = Config{
 	RPC:                       arbitrum.DefaultConfig,
 	Sequencer:                 DefaultSequencerConfig,
 	ParentChainReader:         headerreader.DefaultConfig,
-	RecordingDatabase:         arbitrum.DefaultRecordingDatabaseConfig,
+	RecordingDatabase:         DefaultBlockRecorderConfig,
 	ForwardingTarget:          "",
 	SecondaryForwardingTarget: []string{},
 	TxPreChecker:              DefaultTxPreCheckerConfig,

From 747a0f37c0f72a1135ecfb3eb4d1eab6820b0ca1 Mon Sep 17 00:00:00 2001
From: Tsahi Zidenberg
Date: Wed, 11 Sep 2024 13:31:09 -0600
Subject: [PATCH 06/15] block_validator: delete batch from cache after used by report

---
 staker/block_validator.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/staker/block_validator.go b/staker/block_validator.go
index 49dc59c4a2..4a5bd740a8 100644
--- a/staker/block_validator.go
+++ b/staker/block_validator.go
@@ -614,7 +614,9 @@ func (v *BlockValidator) createNextValidationEntry(ctx context.Context) (bool, e
 		if batchNum == v.nextCreateBatch.Number {
 			return v.nextCreateBatch, nil
 		}
+		// only batch-posting-reports will get here, and there's only one per batch
 		if entry, found := v.prevBatchCache[batchNum]; found {
+			delete(v.prevBatchCache, batchNum)
 			return entry, nil
 		}
 		found, entry, err := v.readBatch(ctx, batchNum)

From 6d0ee874def40855f2be93bf89625bc56854f48b Mon Sep 17 00:00:00 2001
From: Tsahi Zidenberg
Date: Wed, 11 Sep 2024 19:18:36 -0600
Subject: [PATCH 07/15] block_validator: nits

---
 staker/stateless_block_validator.go | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/staker/stateless_block_validator.go b/staker/stateless_block_validator.go
index 3d92b9df59..5c92fb4025 100644
--- a/staker/stateless_block_validator.go
+++ b/staker/stateless_block_validator.go
@@ -195,7 +195,7 @@ func newValidationEntry(
 			Number: batchNum,
 			Data:   fullBatchInfo.PostedData,
 		})
-		clonePreimagesInto(preimages, fullBatchInfo.Preimages)
+		copyPreimagesInto(preimages, fullBatchInfo.Preimages)
 		return fullBatchInfo.PostedData, nil
 	}

@@ -326,10 +326,10 @@ func (v *StatelessBlockValidator) readBatch(ctx context.Context, batchNum uint64
 	return true, &fullInfo, nil
 }

-func clonePreimagesInto(dest, source map[arbutil.PreimageType]map[common.Hash][]byte) {
+func copyPreimagesInto(dest, source map[arbutil.PreimageType]map[common.Hash][]byte) {
 	for piType, piMap := range source {
 		if dest[piType] == nil {
-			dest[piType] = make(map[common.Hash][]byte, len(source[piType]))
+			dest[piType] = make(map[common.Hash][]byte, len(piMap))
 		}
 		for hash, preimage := range piMap {
 			dest[piType][hash] = preimage
@@ -350,9 +350,10 @@ func (v *StatelessBlockValidator) ValidationEntryRecord(ctx context.Context, e *
 			return fmt.Errorf("recording failed: pos %d, hash expected %v, got %v", e.Pos, e.End.BlockHash, recording.BlockHash)
 		}
 		if recording.Preimages != nil {
-			recordingPreimages := make(map[arbutil.PreimageType]map[common.Hash][]byte)
-			recordingPreimages[arbutil.Keccak256PreimageType] = recording.Preimages
-			clonePreimagesInto(e.Preimages, recordingPreimages)
+			recordingPreimages := map[arbutil.PreimageType]map[common.Hash][]byte{
+				arbutil.Keccak256PreimageType: recording.Preimages,
+			}
+			copyPreimagesInto(e.Preimages, recordingPreimages)
 		}
 		e.UserWasms = recording.UserWasms
 	}

From 5591e5a1c4e787c493e4957b3f73248a2ec704c1 Mon Sep 17 00:00:00 2001
From: Tsahi Zidenberg
Date: Wed, 11 Sep 2024 21:05:35 -0600
Subject: [PATCH 08/15] block_validator: separate reading full batches from just posted data

---
 arbos/arbostypes/incomingmessage.go | 11 ++++
 staker/block_validator.go           | 51 ++++++++++---------
 staker/stateless_block_validator.go | 79 ++++++++++++++++-----------
 3 files changed, 86 insertions(+), 55 deletions(-)

diff --git a/arbos/arbostypes/incomingmessage.go b/arbos/arbostypes/incomingmessage.go
index 04ce8ebe2e..c4c2dc037b 100644
--- a/arbos/arbostypes/incomingmessage.go
+++ b/arbos/arbostypes/incomingmessage.go
@@ -182,6 +182,17 @@ func (msg *L1IncomingMessage) FillInBatchGasCost(batchFetcher FallibleBatchFetch
 	return nil
 }

+func (msg *L1IncomingMessage) PastBatchesRequired() ([]uint64, error) {
+	if msg.Header.Kind != L1MessageType_BatchPostingReport {
+		return nil, nil
+	}
+	_, _, _, batchNum, _, _, err := ParseBatchPostingReportMessageFields(bytes.NewReader(msg.L2msg))
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse batch posting report: %w", err)
+	}
+	return []uint64{batchNum}, nil
+}
+
 func ParseIncomingL1Message(rd io.Reader, batchFetcher FallibleBatchFetcher) (*L1IncomingMessage, error) {
 	var kindBuf [1]byte
 	_, err := rd.Read(kindBuf[:])
diff --git a/staker/block_validator.go b/staker/block_validator.go
index 4a5bd740a8..e1b2c75b84 100644
--- a/staker/block_validator.go
+++ b/staker/block_validator.go
@@ -58,7 +58,7 @@ type BlockValidator struct {
 	// can only be accessed from creation thread or if holding reorg-write
 	nextCreateBatch       *FullBatchInfo
 	nextCreateBatchReread bool
-	prevBatchCache        map[uint64]*FullBatchInfo
+	prevBatchCache        map[uint64][]byte

 	nextCreateStartGS     validator.GoGlobalState
 	nextCreatePrevDelayed uint64
@@ -275,7 +275,7 @@ func NewBlockValidator(
 		progressValidationsChan: make(chan struct{}, 1),
 		config:                  config,
 		fatalErr:                fatalErr,
-		prevBatchCache:          make(map[uint64]*FullBatchInfo),
+		prevBatchCache:          make(map[uint64][]byte),
 	}
 	if !config().Dangerous.ResetBlockValidation {
 		validated, err := ret.ReadLastValidatedInfo()
@@ -576,12 +576,12 @@ func (v *BlockValidator) createNextValidationEntry(ctx context.Context) (bool, e
 	}
 	if v.nextCreateStartGS.PosInBatch == 0 || v.nextCreateBatchReread {
 		// new batch
-		found, fullBatchInfo, err := v.readBatch(ctx, v.nextCreateStartGS.Batch)
+		found, fullBatchInfo, err := v.readFullBatch(ctx, v.nextCreateStartGS.Batch)
 		if !found {
 			return false, err
 		}
 		if v.nextCreateBatch != nil {
-			v.prevBatchCache[v.nextCreateBatch.Number] = v.nextCreateBatch
+			v.prevBatchCache[v.nextCreateBatch.Number] = v.nextCreateBatch.PostedData
 		}
 		v.nextCreateBatch = fullBatchInfo
 		// #nosec G115
@@ -589,7 +589,7 @@ func (v *BlockValidator) createNextValidationEntry(ctx context.Context) (bool, e
 		batchCacheLimit := v.config().BatchCacheLimit
 		if len(v.prevBatchCache) > int(batchCacheLimit) {
 			for num := range v.prevBatchCache {
-				if num < v.nextCreateStartGS.Batch-uint64(batchCacheLimit) {
+				if num+uint64(batchCacheLimit) < v.nextCreateStartGS.Batch {
 					delete(v.prevBatchCache, num)
 				}
 			}
 		}
@@ -610,26 +610,29 @@ func (v *BlockValidator) createNextValidationEntry(ctx context.Context) (bool, e
 		return false, fmt.Errorf("illegal batch msg count %d pos %d batch %d", v.nextCreateBatch.MsgCount, pos, endGS.Batch)
 	}
 	chainConfig := v.streamer.ChainConfig()
-	batchReader := func(batchNum uint64) (*FullBatchInfo, error) {
-		if batchNum == v.nextCreateBatch.Number {
-			return v.nextCreateBatch, nil
-		}
-		// only batch-posting-reports will get here, and there's only one per batch
-		if entry, found := v.prevBatchCache[batchNum]; found {
+	prevBatchNums, err := msg.Message.PastBatchesRequired()
+	if err != nil {
+		return false, err
+	}
+	prevBatches := make([]validator.BatchInfo, 0, len(prevBatchNums))
+	// prevBatchNums are only used for batch reports, each is only used once
+	for _, batchNum := range prevBatchNums {
+		data, found := v.prevBatchCache[batchNum]
+		if found {
 			delete(v.prevBatchCache, batchNum)
-			return entry, nil
-		}
-		found, entry, err := v.readBatch(ctx, batchNum)
-		if err != nil {
-			return nil, err
-		}
-		if !found {
-			return nil, fmt.Errorf("batch %d not found", batchNum)
+		} else {
+			data, err = v.readPostedBatch(ctx, batchNum)
+			if err != nil {
+				return false, err
+			}
 		}
-		return entry, nil
+		prevBatches = append(prevBatches, validator.BatchInfo{
+			Number: batchNum,
+			Data:   data,
+		})
 	}
 	entry, err := newValidationEntry(
-		pos, v.nextCreateStartGS, endGS, msg, batchReader, v.nextCreatePrevDelayed, chainConfig,
+		pos, v.nextCreateStartGS, endGS, msg, v.nextCreateBatch, prevBatches, v.nextCreatePrevDelayed, chainConfig,
 	)
 	if err != nil {
 		return false, err
@@ -1030,7 +1033,7 @@ func (v *BlockValidator) UpdateLatestStaked(count arbutil.MessageIndex, globalSt
 		v.nextCreatePrevDelayed = msg.DelayedMessagesRead
 		v.nextCreateBatchReread = true
 		if v.nextCreateBatch != nil {
-			v.prevBatchCache[v.nextCreateBatch.Number] = v.nextCreateBatch
+			v.prevBatchCache[v.nextCreateBatch.Number] = v.nextCreateBatch.PostedData
 		}
 		v.createdA.Store(countUint64)
 	}
@@ -1058,7 +1061,7 @@ func (v *BlockValidator) ReorgToBatchCount(count uint64) {
 	defer v.reorgMutex.Unlock()
 	if v.nextCreateStartGS.Batch >= count {
 		v.nextCreateBatchReread = true
-		v.prevBatchCache = make(map[uint64]*FullBatchInfo)
+		v.prevBatchCache = make(map[uint64][]byte)
 	}
 }

@@ -1099,7 +1102,7 @@ func (v *BlockValidator) Reorg(ctx context.Context, count arbutil.MessageIndex)
 	v.nextCreateStartGS = buildGlobalState(*res, endPosition)
 	v.nextCreatePrevDelayed = msg.DelayedMessagesRead
 	v.nextCreateBatchReread = true
-	v.prevBatchCache = make(map[uint64]*FullBatchInfo)
+	v.prevBatchCache = make(map[uint64][]byte)
 	countUint64 := uint64(count)
 	v.createdA.Store(countUint64)
 	// under the reorg mutex we don't need atomic access
diff --git a/staker/stateless_block_validator.go b/staker/stateless_block_validator.go
index 5c92fb4025..60306d712f 100644
--- a/staker/stateless_block_validator.go
+++ b/staker/stateless_block_validator.go
@@ -116,7 +116,6 @@ const (

 type FullBatchInfo struct {
 	Number     uint64
-	BlockHash  common.Hash
 	PostedData []byte
 	MsgCount   arbutil.MessageIndex
 	Preimages  map[arbutil.PreimageType]map[common.Hash][]byte
@@ -179,25 +178,27 @@ func newValidationEntry(
 	start validator.GoGlobalState,
 	end validator.GoGlobalState,
 	msg *arbostypes.MessageWithMetadata,
-	fullBatchFetcher func(uint64) (*FullBatchInfo, error),
+	fullBatchInfo *FullBatchInfo,
+	prevBatches []validator.BatchInfo,
 	prevDelayed uint64,
 	chainConfig *params.ChainConfig,
 ) (*validationEntry, error) {
 	preimages := make(map[arbutil.PreimageType]map[common.Hash][]byte)
-	valBatches := make([]validator.BatchInfo, 0)
-
-	valBatchFetcher := func(batchNum uint64) ([]byte, error) {
-		fullBatchInfo, err := fullBatchFetcher(batchNum)
-		if err != nil {
-			return nil, err
-		}
-		valBatches = append(valBatches, validator.BatchInfo{
-			Number: batchNum,
+	if fullBatchInfo == nil {
+		return nil, fmt.Errorf("fullBatchInfo cannot be nil")
+	}
+	if fullBatchInfo.Number != start.Batch {
+		return nil, fmt.Errorf("got wrong batch expected: %d got: %d", start.Batch, fullBatchInfo.Number)
+	}
+	valBatches := []validator.BatchInfo{
+		{
+			Number: fullBatchInfo.Number,
 			Data:   fullBatchInfo.PostedData,
-		})
-		copyPreimagesInto(preimages, fullBatchInfo.Preimages)
-		return fullBatchInfo.PostedData, nil
+		},
 	}
+	valBatches = append(valBatches, prevBatches...)
+
+	copyPreimagesInto(preimages, fullBatchInfo.Preimages)

 	hasDelayed := false
 	var delayedNum uint64
@@ -208,14 +209,6 @@ func newValidationEntry(
 		return nil, fmt.Errorf("illegal validation entry delayedMessage %d, previous %d", msg.DelayedMessagesRead, prevDelayed)
 	}

-	if _, err := valBatchFetcher(start.Batch); err != nil {
-		return nil, err
-	}
-	msg.Message.BatchGasCost = nil
-	if err := msg.Message.FillInBatchGasCost(valBatchFetcher); err != nil {
-		return nil, err
-	}
-
 	return &validationEntry{
 		Stage:         ReadyForRecord,
 		Pos:           pos,
@@ -274,7 +267,19 @@ func NewStatelessBlockValidator(
 	}, nil
 }

-func (v *StatelessBlockValidator) readBatch(ctx context.Context, batchNum uint64) (bool, *FullBatchInfo, error) {
+func (v *StatelessBlockValidator) readPostedBatch(ctx context.Context, batchNum uint64) ([]byte, error) {
+	batchCount, err := v.inboxTracker.GetBatchCount()
+	if err != nil {
+		return nil, err
+	}
+	if batchCount <= batchNum {
+		return nil, fmt.Errorf("batch not found: %d", batchNum)
+	}
+	postedData, _, err := v.inboxReader.GetSequencerMessageBytes(ctx, batchNum)
+	return postedData, err
+}
+
+func (v *StatelessBlockValidator) readFullBatch(ctx context.Context, batchNum uint64) (bool, *FullBatchInfo, error) {
 	batchCount, err := v.inboxTracker.GetBatchCount()
 	if err != nil {
 		return false, nil, err
@@ -318,7 +323,6 @@ func (v *StatelessBlockValidator) readFullBatch(ctx context.Context, batchNum ui
 	}
 	fullInfo := FullBatchInfo{
 		Number:     batchNum,
-		BlockHash:  batchBlockHash,
 		PostedData: postedData,
 		MsgCount:   batchMsgCount,
 		Preimages:  preimages,
@@ -427,17 +431,30 @@ func (v *StatelessBlockValidator) CreateReadyValidationEntry(ctx context.Context
 	}
 	start := buildGlobalState(*prevResult, startPos)
 	end := buildGlobalState(*result, endPos)
-	fullBatchReader := func(batchNum uint64) (*FullBatchInfo, error) {
-		found, fullBlockInfo, err := v.readBatch(ctx, batchNum)
-		if err != nil {
-			return nil, err
-		}
-		if !found {
-			return nil, fmt.Errorf("batch %d not found", startPos.BatchNumber)
-		}
-		return fullBlockInfo, nil
+	found, fullBatchInfo, err := v.readFullBatch(ctx, start.Batch)
+	if err != nil {
+		return nil, err
+	}
+	if !found {
+		return nil, fmt.Errorf("batch %d not found", startPos.BatchNumber)
 	}
+
+	prevBatchNums, err := msg.Message.PastBatchesRequired()
+	if err != nil {
+		return nil, err
+	}
+	prevBatches := make([]validator.BatchInfo, 0, len(prevBatchNums))
+	for _, batchNum := range prevBatchNums {
+		data, err := v.readPostedBatch(ctx, batchNum)
+		if err != nil {
+			return nil, err
+		}
+		prevBatches = append(prevBatches, validator.BatchInfo{
+			Number: batchNum,
+			Data:   data,
+		})
+	}
-	entry, err := newValidationEntry(pos, start, end, msg, fullBatchReader, prevDelayed, v.streamer.ChainConfig())
+	entry, err := newValidationEntry(pos, start, end, msg, fullBatchInfo, prevBatches, prevDelayed, v.streamer.ChainConfig())
 	if err != nil {
 		return nil, err
 	}

From 7553cec93b4d29c6ee68d0192ab586ad3fcac9d6 Mon Sep 17 00:00:00 2001
From: Tsahi Zidenberg
Date: Wed, 11 Sep 2024 21:19:48 -0600
Subject: [PATCH 09/15] go-ethereum: remove koanf from recordingdb

---
 go-ethereum | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/go-ethereum b/go-ethereum
index 81114dde8a..c7539d8f7d 160000
--- a/go-ethereum
+++ b/go-ethereum
@@ -1 +1 @@
-Subproject commit 81114dde8a26bae90c188605c4a36d5919a4a265
+Subproject commit c7539d8f7dfa3070a050220588cbe3fa20e2ce08

From d156b8081ebc5ebad47b6634317033fe21f7ad12 Mon Sep 17 00:00:00 2001
From: Tsahi Zidenberg
Date: Thu, 12 Sep 2024 12:40:39 -0600
Subject: [PATCH 10/15] add traces to workers

---
 pubsub/consumer.go                  | 6 ++++++
 validator/valnode/redis/consumer.go | 8 ++++++++
 2 files changed, 14 insertions(+)

diff --git a/pubsub/consumer.go b/pubsub/consumer.go
index df3695606d..1fccae56ad 100644
--- a/pubsub/consumer.go
+++ b/pubsub/consumer.go
@@ -77,6 +77,10 @@ func (c *Consumer[Request, Response]) Start(ctx context.Context) {
 	)
 }

+func (c *Consumer[Request, Response]) Id() string {
+	return c.id
+}
+
 func (c *Consumer[Request, Response]) StopAndWait() {
 	c.StopWaiter.StopAndWait()
 	c.deleteHeartBeat(c.GetParentContext())
@@ -164,10 +168,12 @@ func (c *Consumer[Request, Response]) SetResult(ctx context.Context, messageID s
 	if err != nil {
 		return fmt.Errorf("marshaling result: %w", err)
 	}
+	log.Trace("consumer: setting result", "cid", c.id, "messageId", messageID)
 	acquired, err := c.client.SetNX(ctx, messageID, resp, c.cfg.ResponseEntryTimeout).Result()
 	if err != nil || !acquired {
 		return fmt.Errorf("setting result for message: %v, error: %w", messageID, err)
 	}
+	log.Trace("consumer: xack", "cid", c.id, "messageId", messageID)
 	if _, err := c.client.XAck(ctx, c.redisStream, c.redisGroup, messageID).Result(); err != nil {
 		return fmt.Errorf("acking message: %v, error: %w", messageID, err)
 	}
diff --git a/validator/valnode/redis/consumer.go b/validator/valnode/redis/consumer.go
index 2b025600cc..90bffbb704 100644
--- a/validator/valnode/redis/consumer.go
+++ b/validator/valnode/redis/consumer.go
@@ -103,11 +103,13 @@ func (s *ValidationServer) Start(ctx_in context.Context) {
 			case <-ready: // Wait until the stream exists and start consuming iteratively.
 			}
 			s.StopWaiter.CallIteratively(func(ctx context.Context) time.Duration {
+				log.Trace("waiting for request token", "cid", c.Id())
 				select {
 				case <-ctx.Done():
 					return 0
 				case <-requestTokenQueue:
 				}
+				log.Trace("got request token", "cid", c.Id())
 				req, err := c.Consume(ctx)
 				if err != nil {
 					log.Error("Consuming request", "error", err)
 					return 0
 				}
 				if req == nil {
+					log.Trace("consumed nil", "cid", c.Id())
 					// There's nothing in the queue
 					requestTokenQueue <- struct{}{}
 					return time.Second
 				}
+				log.Trace("forwarding work", "cid", c.Id(), "workid", req.ID)
 				select {
 				case <-ctx.Done():
 				case workQueue <- workUnit{req, moduleRoot}:
@@ -144,20 +148,24 @@ func (s *ValidationServer) Start(ctx_in context.Context) {
 	for i := 0; i < workers; i++ {
 		s.StopWaiter.LaunchThread(func(ctx context.Context) {
 			for {
+				log.Trace("waiting for work", "thread", i)
 				var work workUnit
 				select {
 				case <-ctx.Done():
 					return
 				case work = <-workQueue:
 				}
+				log.Trace("got work", "thread", i, "workid", work.req.ID)
 				valRun := s.spawner.Launch(work.req.Value, work.moduleRoot)
 				res, err := valRun.Await(ctx)
 				if err != nil {
 					log.Error("Error validating", "request value", work.req.Value, "error", err)
 				} else {
+					log.Trace("done work", "thread", i, "workid", work.req.ID)
 					if err := s.consumers[work.moduleRoot].SetResult(ctx, work.req.ID, res); err != nil {
 						log.Error("Error setting result for request", "id", work.req.ID, "result", res, "error", err)
 					}
+					log.Trace("set result", "thread", i, "workid", work.req.ID)
 				}
 				select {
 				case <-ctx.Done():

From 1ab23ac4fcb8bf088244254a2db8b56dd2b4dc90 Mon Sep 17 00:00:00 2001
From: Tsahi Zidenberg
Date: Thu, 12 Sep 2024 12:43:13 -0600
Subject: [PATCH 11/15] redis consumer logs to Debug

---
 pubsub/consumer.go                  |  4 ++--
 validator/valnode/redis/consumer.go | 18 +++++++++---------
 2 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/pubsub/consumer.go b/pubsub/consumer.go
index 1fccae56ad..bd73e729e7 100644
--- a/pubsub/consumer.go
+++ b/pubsub/consumer.go
@@ -168,12 +168,12 @@ func (c *Consumer[Request, Response]) SetResult(ctx context.Context, messageID s
 	if err != nil {
 		return fmt.Errorf("marshaling result: %w", err)
 	}
-	log.Trace("consumer: setting result", "cid", c.id, "messageId", messageID)
+	log.Debug("consumer: setting result", "cid", c.id, "messageId", messageID)
 	acquired, err := c.client.SetNX(ctx, messageID, resp, c.cfg.ResponseEntryTimeout).Result()
 	if err != nil || !acquired {
 		return fmt.Errorf("setting result for message: %v, error: %w", messageID, err)
 	}
-	log.Trace("consumer: xack", "cid", c.id, "messageId", messageID)
+	log.Debug("consumer: xack", "cid", c.id, "messageId", messageID)
 	if _, err := c.client.XAck(ctx, c.redisStream, c.redisGroup, messageID).Result(); err != nil {
 		return fmt.Errorf("acking message: %v, error: %w", messageID, err)
 	}
diff --git a/validator/valnode/redis/consumer.go b/validator/valnode/redis/consumer.go
index 90bffbb704..49e72477f5 100644
--- a/validator/valnode/redis/consumer.go
+++ b/validator/valnode/redis/consumer.go
@@ -103,13 +103,13 @@ func (s *ValidationServer) Start(ctx_in context.Context) {
 			case <-ready: // Wait until the stream exists and start consuming iteratively.
 			}
 			s.StopWaiter.CallIteratively(func(ctx context.Context) time.Duration {
-				log.Trace("waiting for request token", "cid", c.Id())
+				log.Debug("waiting for request token", "cid", c.Id())
 				select {
 				case <-ctx.Done():
 					return 0
 				case <-requestTokenQueue:
 				}
-				log.Trace("got request token", "cid", c.Id())
+				log.Debug("got request token", "cid", c.Id())
 				req, err := c.Consume(ctx)
 				if err != nil {
 					log.Error("Consuming request", "error", err)
 					return 0
 				}
 				if req == nil {
-					log.Trace("consumed nil", "cid", c.Id())
+					log.Debug("consumed nil", "cid", c.Id())
 					// There's nothing in the queue
 					requestTokenQueue <- struct{}{}
 					return time.Second
 				}
-				log.Trace("forwarding work", "cid", c.Id(), "workid", req.ID)
+				log.Debug("forwarding work", "cid", c.Id(), "workid", req.ID)
 				select {
 				case <-ctx.Done():
 				case workQueue <- workUnit{req, moduleRoot}:
@@ -135,7 +135,7 @@ func (s *ValidationServer) Start(ctx_in context.Context) {
 		for {
 			select {
 			case <-readyStreams:
-				log.Trace("At least one stream is ready")
+				log.Debug("At least one stream is ready")
 				return // Don't block Start if at least one of the stream is ready.
 			case <-time.After(s.config.StreamTimeout):
 				log.Error("Waiting for redis streams timed out")
@@ -148,24 +148,24 @@ func (s *ValidationServer) Start(ctx_in context.Context) {
 	for i := 0; i < workers; i++ {
 		s.StopWaiter.LaunchThread(func(ctx context.Context) {
 			for {
-				log.Trace("waiting for work", "thread", i)
+				log.Debug("waiting for work", "thread", i)
 				var work workUnit
 				select {
 				case <-ctx.Done():
 					return
 				case work = <-workQueue:
 				}
-				log.Trace("got work", "thread", i, "workid", work.req.ID)
+				log.Debug("got work", "thread", i, "workid", work.req.ID)
 				valRun := s.spawner.Launch(work.req.Value, work.moduleRoot)
 				res, err := valRun.Await(ctx)
 				if err != nil {
 					log.Error("Error validating", "request value", work.req.Value, "error", err)
 				} else {
-					log.Trace("done work", "thread", i, "workid", work.req.ID)
+					log.Debug("done work", "thread", i, "workid", work.req.ID)
 					if err := s.consumers[work.moduleRoot].SetResult(ctx, work.req.ID, res); err != nil {
 						log.Error("Error setting result for request", "id", work.req.ID, "result", res, "error", err)
 					}
-					log.Trace("set result", "thread", i, "workid", work.req.ID)
+					log.Debug("set result", "thread", i, "workid", work.req.ID)
 				}
 				select {
 				case <-ctx.Done():

From 7075c7f2e02c8404fadf0243da9282577a2406b6 Mon Sep 17 00:00:00 2001
From: Tsahi Zidenberg
Date: Thu, 12 Sep 2024 19:21:42 -0600
Subject: [PATCH 12/15] redis producer: delete entry after result exists

---
 pubsub/producer.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pubsub/producer.go b/pubsub/producer.go
index 2b1cdb5e3f..e7f2ac938c 100644
--- a/pubsub/producer.go
+++ b/pubsub/producer.go
@@ -234,6 +234,7 @@ func (p *Producer[Request, Response]) checkResponses(ctx context.Context) time.D
 			promise.Produce(resp)
 			responded++
 		}
+		p.client.Del(ctx, id)
 		delete(p.promises, id)
 	}
 	var trimmed int64

From f5750e7ad22812cf193aabb959badf4a8b25d0cb Mon Sep 17 00:00:00 2001
From: Tsahi Zidenberg
Date: Thu, 12 Sep 2024 19:23:52 -0600
Subject: [PATCH 13/15] producer: debug check responses loop

---
 pubsub/producer.go | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/pubsub/producer.go b/pubsub/producer.go
index e7f2ac938c..5eec3a4b52 100644
--- a/pubsub/producer.go
+++ b/pubsub/producer.go
@@ -205,30 +205,33 @@ func setMinIdInt(min *[2]uint64, id string) error {
 // checkResponses checks iteratively whether response for the promise is ready.
 func (p *Producer[Request, Response]) checkResponses(ctx context.Context) time.Duration {
 	minIdInt := [2]uint64{math.MaxUint64, math.MaxUint64}
+	log.Debug("redis producer: check responses starting")
 	p.promisesLock.Lock()
 	defer p.promisesLock.Unlock()
 	responded := 0
 	errored := 0
+	checked := 0
 	for id, promise := range p.promises {
 		if ctx.Err() != nil {
 			return 0
 		}
+		checked++
 		res, err := p.client.Get(ctx, id).Result()
 		if err != nil {
 			errSetId := setMinIdInt(&minIdInt, id)
 			if errSetId != nil {
-				log.Error("error setting minId", "err", err)
+				log.Error("redis producer: error setting minId", "err", err)
 				return p.cfg.CheckResultInterval
 			}
 			if !errors.Is(err, redis.Nil) {
-				log.Error("Error reading value in redis", "key", id, "error", err)
+				log.Error("redis producer: Error reading value in redis", "key", id, "error", err)
 			}
 			continue
 		}
 		var resp Response
 		if err := json.Unmarshal([]byte(res), &resp); err != nil {
 			promise.ProduceError(fmt.Errorf("error unmarshalling: %w", err))
-			log.Error("Error unmarshaling", "value", res, "error", err)
+			log.Error("redis producer: Error unmarshaling", "value", res, "error", err)
 			errored++
 		} else {
 			promise.Produce(resp)
@@ -246,7 +249,7 @@ func (p *Producer[Request, Response]) checkResponses(ctx context.Context) time.D
 	} else {
 		trimmed, trimErr = p.client.XTrimMaxLen(ctx, p.redisStream, 0).Result()
 	}
-	log.Trace("trimming", "id", minId, "trimmed", trimmed, "responded", responded, "errored", errored, "trim-err", trimErr)
+	log.Debug("trimming", "id", minId, "trimmed", trimmed, "responded", responded, "errored", errored, "trim-err", trimErr, "checked", checked)
 	return p.cfg.CheckResultInterval
 }

From 0975c44a39e207dffb5f1f480112e5ed6da5f012 Mon Sep 17 00:00:00 2001
From: Tsahi Zidenberg
Date: Thu, 12 Sep 2024 19:29:27 -0600
Subject: [PATCH 14/15] update geth pin

---
 go-ethereum | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/go-ethereum b/go-ethereum
index c7539d8f7d..9ca65b8610 160000
--- a/go-ethereum
+++ b/go-ethereum
@@ -1 +1 @@
-Subproject commit c7539d8f7dfa3070a050220588cbe3fa20e2ce08
+Subproject commit 9ca65b86101b4c87df188923d6eeff4db992520f

From b5fa17d9a447d39b08680fdf9807301623aa2d2e Mon Sep 17 00:00:00 2001
From: Tsahi Zidenberg
Date: Thu, 12 Sep 2024 20:25:42 -0600
Subject: [PATCH 15/15] fix race in test

---
 validator/valnode/redis/consumer.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/validator/valnode/redis/consumer.go b/validator/valnode/redis/consumer.go
index 49e72477f5..e0d53ffb2e 100644
--- a/validator/valnode/redis/consumer.go
+++ b/validator/valnode/redis/consumer.go
@@ -146,6 +146,7 @@ func (s *ValidationServer) Start(ctx_in context.Context) {
 		}
 	})
 	for i := 0; i < workers; i++ {
+		i := i
 		s.StopWaiter.LaunchThread(func(ctx context.Context) {
 			for {
 				log.Debug("waiting for work", "thread", i)
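
--
Reviewer note (illustration only, not part of the patch series): two small behaviors in this series are easy to misread. First, patch 08 rewrites the prevBatchCache eviction predicate from `num < current-limit` to `num+limit < current`, which is algebraically the same comparison but, presumably, safe against uint64 underflow when the current batch number is smaller than the limit. Second, copyPreimagesInto (patch 07) merges per-type preimage maps into the destination without dropping entries already present. Below is a minimal, self-contained Go sketch of both behaviors; the plain map types and int/uint64 keys are hypothetical stand-ins for the real FullBatchInfo, arbutil.PreimageType, and common.Hash types, so treat it as a sketch of the logic, not the actual implementation.

package main

import "fmt"

// trimCache mirrors the eviction rule from createNextValidationEntry: once the
// cache holds more than `limit` entries, drop every batch that is more than
// `limit` batches behind the current one. Written as num+uint64(limit) < current
// so the comparison cannot underflow when current < limit.
func trimCache(cache map[uint64][]byte, current uint64, limit uint32) {
	if len(cache) <= int(limit) {
		return
	}
	for num := range cache { // deleting while ranging over a map is allowed in Go
		if num+uint64(limit) < current {
			delete(cache, num)
		}
	}
}

// copyPreimagesInto mirrors the merge helper from patch 07, with simplified key
// types (int, uint64) standing in for arbutil.PreimageType and common.Hash.
func copyPreimagesInto(dest, source map[int]map[uint64][]byte) {
	for piType, piMap := range source {
		if dest[piType] == nil {
			dest[piType] = make(map[uint64][]byte, len(piMap))
		}
		for hash, preimage := range piMap {
			dest[piType][hash] = preimage
		}
	}
}

func main() {
	cache := map[uint64][]byte{7: nil, 8: nil, 9: nil}
	trimCache(cache, 10, 2)
	fmt.Println(len(cache)) // 2: batch 7 is evicted (7+2 < 10), batches 8 and 9 stay

	dest := map[int]map[uint64][]byte{0: {1: []byte("a")}}
	copyPreimagesInto(dest, map[int]map[uint64][]byte{0: {2: []byte("b")}, 1: {3: []byte("c")}})
	fmt.Println(len(dest[0]), len(dest[1])) // 2 1: existing entries kept, new ones merged in
}

Under these assumptions, the sketch shows why patch 06 can safely delete a cached batch the first time a batch-posting report consumes it: each prior batch number is requested at most once per report, so the cache only ever needs to bridge the gap between posting and validation.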