diff --git a/.gitignore b/.gitignore
index 4a278c03ebd..a34f4837113 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,6 +14,7 @@
 */**/*tx_database*
 */**/*dapps*
 build/_vendor/pkg
+/*.a
 
 #*
 .#*
diff --git a/common/chan.go b/common/chan.go
new file mode 100644
index 00000000000..479ed807df3
--- /dev/null
+++ b/common/chan.go
@@ -0,0 +1,32 @@
+package common
+
+import "errors"
+
+// ErrStopped is the error returned once a quit channel has been closed.
+var ErrStopped = errors.New("stopped")
+
+// Stopped returns ErrStopped if the quit channel ch has been closed; it never blocks.
+func Stopped(ch chan struct{}) error {
+	if ch == nil {
+		return nil
+	}
+	select {
+	case <-ch:
+		return ErrStopped
+	default:
+	}
+	return nil
+}
+
+// SafeClose closes ch unless it is nil or already closed, making double close a no-op.
+func SafeClose(ch chan struct{}) {
+	if ch == nil {
+		return
+	}
+	select {
+	case <-ch:
+		// Channel was already closed
+	default:
+		close(ch)
+	}
+}
diff --git a/common/etl/etl.go b/common/etl/etl.go
index 992bf52abe6..fba3d0b8cd5 100644
--- a/common/etl/etl.go
+++ b/common/etl/etl.go
@@ -10,10 +10,11 @@ import (
 	"os"
 	"runtime"
 
+	"github.com/ugorji/go/codec"
+
 	"github.com/ledgerwatch/turbo-geth/common"
 	"github.com/ledgerwatch/turbo-geth/ethdb"
 	"github.com/ledgerwatch/turbo-geth/log"
-	"github.com/ugorji/go/codec"
 )
 
 var cbor codec.CborHandle
@@ -24,6 +25,7 @@ type Decoder interface {
 
 type State interface {
 	Get([]byte) ([]byte, error)
+	Stopped() error
 }
 
 type ExtractNextFunc func(k []byte, v interface{}) error
@@ -40,9 +42,10 @@ func Transform(
 	startkey []byte,
 	extractFunc ExtractFunc,
 	loadFunc LoadFunc,
+	quit chan struct{},
 ) error {
-	filenames, err := extractBucketIntoFiles(db, fromBucket, startkey, datadir, extractFunc)
+	filenames, err := extractBucketIntoFiles(db, fromBucket, startkey, datadir, extractFunc, quit)
 
 	defer func() {
 		deleteFiles(filenames)
@@ -52,7 +55,7 @@ func Transform(
 		return err
 	}
 
-	return loadFilesIntoBucket(db, toBucket, filenames, loadFunc)
+	return loadFilesIntoBucket(db, toBucket, filenames, loadFunc, quit)
 }
 
 func extractBucketIntoFiles(
@@ -61,6 +64,7 @@ func extractBucketIntoFiles(
 	startkey []byte,
 	datadir string,
 	extractFunc ExtractFunc,
+	quit chan struct{},
 ) ([]string, error) {
 	buffer := bytes.NewBuffer(make([]byte, 0))
 	encoder := codec.NewEncoder(nil, &cbor)
@@ -98,6 +102,9 @@ func extractBucketIntoFiles(
 	}
 
 	err := db.Walk(bucket, startkey, len(startkey)*8, func(k, v []byte) (bool, error) {
+		if err := common.Stopped(quit); err != nil {
+			return false, err
+		}
 		err := extractFunc(k, v, extractNextFunc)
 		return true, err
 	})
@@ -112,7 +119,7 @@ func extractBucketIntoFiles(
 	return filenames, nil
 }
 
-func loadFilesIntoBucket(db ethdb.Database, bucket []byte, files []string, loadFunc LoadFunc) error {
+func loadFilesIntoBucket(db ethdb.Database, bucket []byte, files []string, loadFunc LoadFunc, quit chan struct{}) error {
 	decoder := codec.NewDecoder(nil, &cbor)
 	var m runtime.MemStats
 	h := &Heap{}
@@ -135,7 +142,7 @@ func loadFilesIntoBucket(db ethdb.Database, bucket []byte, files []string, loadF
 		}
 	}
 	batch := db.NewBatch()
-	state := &bucketState{batch, bucket}
+	state := &bucketState{batch, bucket, quit}
 
 	loadNextFunc := func(k, v []byte) error {
 		if err := batch.Put(bucket, k, v); err != nil {
@@ -158,6 +165,10 @@ func loadFilesIntoBucket(db ethdb.Database, bucket []byte, files []string, loadF
 	}
 
 	for h.Len() > 0 {
+		if err := common.Stopped(quit); err != nil {
+			return err
+		}
+
 		element := (heap.Pop(h)).(HeapElem)
 		reader := readers[element.timeIdx]
 		decoder.ResetBytes(element.value)
@@ -269,8 +280,13 @@ func readElementFromDisk(decoder Decoder) ([]byte, []byte, error) {
 type bucketState struct {
 	getter ethdb.Getter
 	bucket []byte
+	quit   chan struct{}
 }
 
 func (s *bucketState) Get(key []byte) ([]byte, error) {
 	return s.getter.Get(s.bucket, key)
 }
+
+func (s *bucketState) Stopped() error {
+	return common.Stopped(s.quit)
+}
diff --git a/core/tx_pool.go b/core/tx_pool.go
index ebd2cc9f2aa..088c51d6c36 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -475,7 +475,10 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) {
 // Stop terminates the transaction pool.
 func (pool *TxPool) Stop() {
+	if pool == nil {
+		return
+	}
 	// Unsubscribe all subscriptions registered from txpool
 	pool.scope.Close()
 
 	// Unsubscribe subscriptions registered from blockchain
 	pool.chainHeadSub.Unsubscribe()
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 50fb76179a0..c4a9b01a18c 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -563,6 +563,7 @@ func (d *Downloader) spawnSync(fetchers []func() error) error {
 		fn := fn
 		go func() { defer d.cancelWg.Done(); errc <- fn() }()
 	}
+
 	// Wait for the first error, then terminate the others.
 	var err error
 	for i := 0; i < len(fetchers); i++ {
@@ -576,8 +577,10 @@ func (d *Downloader) spawnSync(fetchers []func() error) error {
 			break
 		}
 	}
+
 	d.queue.Close()
 	d.Cancel()
+
 	return err
 }
@@ -588,15 +591,7 @@ func (d *Downloader) cancel() {
 	// Close the current cancel channel
 	d.cancelLock.Lock()
 	defer d.cancelLock.Unlock()
-
-	if d.cancelCh != nil {
-		select {
-		case <-d.cancelCh:
-			// Channel was already closed
-		default:
-			close(d.cancelCh)
-		}
-	}
+	common.SafeClose(d.cancelCh)
 }
 
 // Cancel aborts all of the operations and waits for all download goroutines to
@@ -615,11 +610,7 @@ func (d *Downloader) Terminate() {
 	d.blockchain.Stop()
 	// Close the termination channel (make sure double close is allowed)
 	d.quitLock.Lock()
-	select {
-	case <-d.quitCh:
-	default:
-		close(d.quitCh)
-	}
+	common.SafeClose(d.quitCh)
 	d.quitLock.Unlock()
 
 	// Cancel any pending download requests
@@ -1487,10 +1478,8 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
 		gotHeaders = true
 		for len(headers) > 0 {
 			// Terminate if something failed in between processing chunks
-			select {
-			case <-d.cancelCh:
-				return errCanceled
-			default:
+			if err := common.Stopped(d.quitCh); err != nil {
+				return err
 			}
 			// Select the next chunk of headers to import
 			limit := maxHeadersProcess
@@ -1604,10 +1593,8 @@ func (d *Downloader) importBlockResults(results []*fetchResult, execute bool) (u
 	if len(results) == 0 {
 		return 0, nil
 	}
-	select {
-	case <-d.quitCh:
+	if err := common.Stopped(d.quitCh); err != nil {
 		return 0, errCancelContentProcessing
-	default:
 	}
 	// Retrieve the a batch of results to import
 	first, last := results[0].Header, results[len(results)-1].Header
diff --git a/eth/downloader/stagedsync_downloader.go b/eth/downloader/stagedsync_downloader.go
index 851ab80bdc1..66e4d42cd0a 100644
--- a/eth/downloader/stagedsync_downloader.go
+++ b/eth/downloader/stagedsync_downloader.go
@@ -8,83 +8,48 @@ import (
 )
 
 func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers []func() error) error {
-	log.Info("Sync stage 1/7. Downloading headers...")
-
 	var err error
+	defer log.Info("Staged sync finished")
 
 	/*
 	* Stage 1. Download Headers
 	*/
-	if err = d.spawnSync(headersFetchers); err != nil {
+	log.Info("Sync stage 1/7. Downloading headers...")
+	err = d.DownloadHeaders(headersFetchers)
+	if err != nil {
 		return err
 	}
-	log.Info("Sync stage 1/7. Downloading headers... Complete!")
-	log.Info("Checking for unwinding...")
-	// Check unwinds backwards and if they are outstanding, invoke corresponding functions
-	for stage := Finish - 1; stage > Headers; stage-- {
-		unwindPoint, err := GetStageUnwind(d.stateDB, stage)
-		if err != nil {
-			return err
-		}
-		if unwindPoint == 0 {
-			continue
-		}
-		switch stage {
-		case Bodies:
-			err = d.unwindBodyDownloadStage(unwindPoint)
-		case Senders:
-			err = d.unwindSendersStage(unwindPoint)
-		case Execution:
-			err = unwindExecutionStage(unwindPoint, d.stateDB)
-		case HashCheck:
-			err = unwindHashCheckStage(unwindPoint, d.stateDB)
-		case AccountHistoryIndex:
-			err = unwindAccountHistoryIndex(unwindPoint, d.stateDB, core.UsePlainStateExecution)
-		case StorageHistoryIndex:
-			err = unwindStorageHistoryIndex(unwindPoint, d.stateDB, core.UsePlainStateExecution)
-		default:
-			return fmt.Errorf("unrecognized stage for unwinding: %d", stage)
-		}
-		if err != nil {
-			return fmt.Errorf("error unwinding stage: %d: %v", stage, err)
-		}
-	}
-	log.Info("Checking for unwinding... Complete!")
-	log.Info("Sync stage 2/7. Downloading block bodies...")
-
 	/*
 	* Stage 2. Download Block bodies
 	*/
+	log.Info("Sync stage 2/7. Downloading block bodies...")
 	cont := true
 	for cont && err == nil {
 		cont, err = d.spawnBodyDownloadStage(p.id)
-	}
-	if err != nil {
-		return err
+		if err != nil {
+			return err
+		}
 	}
 	log.Info("Sync stage 2/7. Downloading block bodies... Complete!")
+
 	/*
 	* Stage 3. Recover senders from tx signatures
 	*/
 	log.Info("Sync stage 3/7. Recovering senders from tx signatures...")
-	err = d.spawnRecoverSendersStage()
-	if err != nil {
+	if err = d.spawnRecoverSendersStage(); err != nil {
 		return err
 	}
-
 	log.Info("Sync stage 3/7. Recovering senders from tx signatures... Complete!")
-	log.Info("Sync stage 4/7. Executing blocks w/o hash checks...")
 
 	/*
 	* Stage 4. Execute block bodies w/o calculating trie roots
 	*/
-
-	var syncHeadNumber uint64
-	syncHeadNumber, err = spawnExecuteBlocksStage(d.stateDB, d.blockchain)
+	log.Info("Sync stage 4/7. Executing blocks w/o hash checks...")
+	syncHeadNumber, err := spawnExecuteBlocksStage(d.stateDB, d.blockchain, d.quitCh)
 	if err != nil {
 		return err
 	}
@@ -93,7 +58,8 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers
 
 	// Further stages go there
 	log.Info("Sync stage 5/7. Validating final hash")
-	if err = spawnCheckFinalHashStage(d.stateDB, syncHeadNumber, d.datadir); err != nil {
+	err = spawnCheckFinalHashStage(d.stateDB, syncHeadNumber, d.datadir, d.quitCh)
+	if err != nil {
 		return err
 	}
@@ -101,7 +67,7 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers
 
 	if d.history {
 		log.Info("Sync stage 6/7. Generating account history index")
-		err = spawnAccountHistoryIndex(d.stateDB, d.datadir, core.UsePlainStateExecution)
+		err = spawnAccountHistoryIndex(d.stateDB, d.datadir, core.UsePlainStateExecution, d.quitCh)
 		if err != nil {
 			return err
 		}
@@ -112,7 +78,7 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers
 
 	if d.history {
 		log.Info("Sync stage 7/7. Generating storage history index")
-		err = spawnStorageHistoryIndex(d.stateDB, d.datadir, core.UsePlainStateExecution)
+		err = spawnStorageHistoryIndex(d.stateDB, d.datadir, core.UsePlainStateExecution, d.quitCh)
 		if err != nil {
 			return err
 		}
@@ -123,3 +89,48 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers
 
 	return err
 }
+
+// DownloadHeaders runs the header download stage and then processes any outstanding stage unwinds.
+func (d *Downloader) DownloadHeaders(headersFetchers []func() error) error {
+	err := d.spawnSync(headersFetchers)
+	if err != nil {
+		return err
+	}
+
+	log.Info("Sync stage 1/7. Downloading headers... Complete!")
+	log.Info("Checking for unwinding...")
+	// Check for outstanding unwinds from the highest stage down and invoke the corresponding unwind functions
+	for stage := Finish - 1; stage > Headers; stage-- {
+		unwindPoint, err := GetStageUnwind(d.stateDB, stage)
+		if err != nil {
+			return err
+		}
+
+		if unwindPoint == 0 {
+			continue
+		}
+
+		switch stage {
+		case Bodies:
+			err = d.unwindBodyDownloadStage(unwindPoint)
+		case Senders:
+			err = d.unwindSendersStage(unwindPoint)
+		case Execution:
+			err = unwindExecutionStage(unwindPoint, d.stateDB)
+		case HashCheck:
+			err = unwindHashCheckStage(unwindPoint, d.stateDB)
+		case AccountHistoryIndex:
+			err = unwindAccountHistoryIndex(unwindPoint, d.stateDB, core.UsePlainStateExecution)
+		case StorageHistoryIndex:
+			err = unwindStorageHistoryIndex(unwindPoint, d.stateDB, core.UsePlainStateExecution)
+		default:
+			return fmt.Errorf("unrecognized stage for unwinding: %d", stage)
+		}
+
+		if err != nil {
+			return fmt.Errorf("error unwinding stage: %d: %w", stage, err)
+		}
+	}
+	log.Info("Checking for unwinding... Complete!")
+	return nil
+}
diff --git a/eth/downloader/stagedsync_stage_bodies.go b/eth/downloader/stagedsync_stage_bodies.go
index 508fd74ada6..5e41d4d512b 100644
--- a/eth/downloader/stagedsync_stage_bodies.go
+++ b/eth/downloader/stagedsync_stage_bodies.go
@@ -23,7 +23,7 @@ func (d *Downloader) spawnBodyDownloadStage(id string) (bool, error) {
 	// Figure out how many blocks have already been downloaded
 	origin, err := GetStageProgress(d.stateDB, Bodies)
 	if err != nil {
-		return false, fmt.Errorf("getting Bodies stage progress: %v", err)
+		return false, fmt.Errorf("getting Bodies stage progress: %w", err)
 	}
 	// Figure out how many headers we have
 	currentNumber := origin + 1
@@ -34,6 +34,10 @@ func (d *Downloader) spawnBodyDownloadStage(id string) (bool, error) {
 	var headers = make(map[common.Hash]*types.Header) // We use map because there might be more than one header by block number
 	var hashCount = 0
 	err = d.stateDB.Walk(dbutils.HeaderPrefix, dbutils.EncodeBlockNumber(currentNumber), 0, func(k, v []byte) (bool, error) {
+		if err = common.Stopped(d.quitCh); err != nil {
+			return false, err
+		}
+
 		// Skip non relevant records
 		if len(k) == 8+len(dbutils.HeaderHashSuffix) && bytes.Equal(k[8:], dbutils.HeaderHashSuffix) {
 			// This is how we learn about canonical chain
@@ -65,11 +69,11 @@ func (d *Downloader) spawnBodyDownloadStage(id string) (bool, error) {
 		return true, nil
 	})
 	if err != nil {
-		return false, fmt.Errorf("walking over canonical hashes: %v", err)
+		return false, fmt.Errorf("walking over canonical hashes: %w", err)
 	}
 	if missingHeader != 0 {
 		if err1 := SaveStageProgress(d.stateDB, Headers, missingHeader); err1 != nil {
-			return false, fmt.Errorf("resetting SyncStage Headers to missing header: %v", err1)
+			return false, fmt.Errorf("resetting SyncStage Headers to missing header: %w", err1)
 		}
 		// This will cause the sync return to the header stage
 		return false, nil
@@ -83,15 +87,20 @@ func (d *Downloader) spawnBodyDownloadStage(id string) (bool, error) {
 	d.queue.Prepare(from, d.mode)
 	d.queue.ScheduleBodies(from, hashes[:hashCount-1], headers)
 	to := from + uint64(hashCount-1)
+
 	select {
 	case d.bodyWakeCh <- true:
 	case <-d.cancelCh:
+	case <-d.quitCh:
+		return false, errCanceled
 	}
+
 	// Now fetch all the bodies
 	fetchers := []func() error{
 		func() error { return d.fetchBodies(from) },
 		func() error { return d.processBodiesStage(to) },
 	}
+
 	return true, d.spawnSync(fetchers)
 }
 
@@ -99,6 +108,10 @@ func (d *Downloader) spawnBodyDownloadStage(id string) (bool, error) {
 // it doesn't execute blocks
 func (d *Downloader) processBodiesStage(to uint64) error {
 	for {
+		if err := common.Stopped(d.quitCh); err != nil {
+			return err
+		}
+
 		results := d.queue.Results(true)
 		if len(results) == 0 {
 			return nil
@@ -110,6 +123,7 @@ func (d *Downloader) processBodiesStage(to uint64) error {
 		if lastNumber == to {
 			select {
 			case d.bodyWakeCh <- false:
+			case <-d.quitCh:
 			case <-d.cancelCh:
 			}
 			return nil
diff --git a/eth/downloader/stagedsync_stage_execute.go b/eth/downloader/stagedsync_stage_execute.go
index 2b4c03e3a6b..ffc87521902 100644
--- a/eth/downloader/stagedsync_stage_execute.go
+++ b/eth/downloader/stagedsync_stage_execute.go
@@ -2,9 +2,9 @@ package downloader
 
 import (
 	"fmt"
-	//"os"
+	// "os"
 	"runtime"
-	//"runtime/pprof"
+	// "runtime/pprof"
 	"sync/atomic"
 	"time"
 
@@ -72,7 +72,7 @@ func (l *progressLogger) Stop() {
 const StateBatchSize = 50 * 1024 * 1024 // 50 Mb
 const ChangeBatchSize = 1024 * 2014 // 1 Mb
 
-func spawnExecuteBlocksStage(stateDB ethdb.Database, blockchain BlockChain) (uint64, error) {
+func spawnExecuteBlocksStage(stateDB ethdb.Database, blockchain BlockChain, quit chan struct{}) (uint64, error) {
 	lastProcessedBlockNumber, err := GetStageProgress(stateDB, Execution)
 	if err != nil {
 		return 0, err
@@ -109,6 +109,10 @@ func spawnExecuteBlocksStage(stateDB ethdb.Database, blockchain BlockChain) (uin
 	engine := blockchain.Engine()
 	vmConfig := blockchain.GetVMConfig()
 	for {
+		if err = common.Stopped(quit); err != nil {
+			return 0, err
+		}
+
 		blockNum := atomic.LoadUint64(&nextBlockNumber)
 		block := blockchain.GetBlockByNumber(blockNum)
@@ -178,15 +182,19 @@ func spawnExecuteBlocksStage(stateDB ethdb.Database, blockchain BlockChain) (uin
 		}
 		*/
 	}
+
+	// the last processed block
+	syncHeadNumber := atomic.LoadUint64(&nextBlockNumber) - 1
+
 	_, err = stateBatch.Commit()
 	if err != nil {
-		return atomic.LoadUint64(&nextBlockNumber) - 1, fmt.Errorf("sync Execute: failed to write state batch commit: %v", err)
+		return syncHeadNumber, fmt.Errorf("sync Execute: failed to write state batch commit: %v", err)
 	}
 	_, err = changeBatch.Commit()
 	if err != nil {
-		return atomic.LoadUint64(&nextBlockNumber) - 1, fmt.Errorf("sync Execute: failed to write change batch commit: %v", err)
+		return syncHeadNumber, fmt.Errorf("sync Execute: failed to write change batch commit: %v", err)
 	}
-	return atomic.LoadUint64(&nextBlockNumber) - 1 /* the last processed block */, nil
+
+	return syncHeadNumber, nil
 }
 
 func unwindExecutionStage(unwindPoint uint64, stateDB ethdb.Database) error {
@@ -194,6 +202,7 @@ func unwindExecutionStage(unwindPoint uint64, stateDB ethdb.Database) error {
 	if err != nil {
 		return fmt.Errorf("unwind Execution: get stage progress: %v", err)
 	}
+
 	if unwindPoint >= lastProcessedBlockNumber {
 		err = SaveStageUnwind(stateDB, Execution, 0)
 		if err != nil {
@@ -212,6 +221,7 @@ func unwindExecutionStage(unwindPoint uint64, stateDB ethdb.Database) error {
 	deleteAccountFunc := deleteAccountHashed
 	writeAccountFunc := writeAccountHashed
 	recoverCodeHashFunc := recoverCodeHashHashed
+
 	if core.UsePlainStateExecution {
 		rewindFunc = ethdb.RewindDataPlain
 		stateBucket = dbutils.PlainStateBucket
@@ -223,16 +233,18 @@ func unwindExecutionStage(unwindPoint uint64, stateDB ethdb.Database) error {
 		recoverCodeHashFunc = recoverCodeHashPlain
 	}
 
-	accountMap, storageMap, err2 := rewindFunc(stateDB, lastProcessedBlockNumber, unwindPoint)
-	if err2 != nil {
+	accountMap, storageMap, err := rewindFunc(stateDB, lastProcessedBlockNumber, unwindPoint)
+	if err != nil {
 		return fmt.Errorf("unwind Execution: getting rewind data: %v", err)
 	}
+
 	for key, value := range accountMap {
 		if len(value) > 0 {
 			var acc accounts.Account
 			if err = acc.DecodeForStorage(value); err != nil {
 				return err
 			}
+
 			// Fetch the code hash
 			recoverCodeHashFunc(&acc, stateDB, key)
 			if err = writeAccountFunc(mutation, key, acc); err != nil {
@@ -261,14 +273,17 @@ func unwindExecutionStage(unwindPoint uint64, stateDB ethdb.Database) error {
 			return err
 		}
 	}
+
 	err = SaveStageUnwind(mutation, Execution, 0)
 	if err != nil {
 		return fmt.Errorf("unwind Execution: reset: %v", err)
 	}
+
 	_, err = mutation.Commit()
 	if err != nil {
 		return fmt.Errorf("unwind Execute: failed to write db commit: %v", err)
 	}
+
 	return nil
 }
diff --git a/eth/downloader/stagedsync_stage_hashcheck.go b/eth/downloader/stagedsync_stage_hashcheck.go
index 2cf008960cc..d9ce1c43fb0 100644
--- a/eth/downloader/stagedsync_stage_hashcheck.go
+++ b/eth/downloader/stagedsync_stage_hashcheck.go
@@ -18,7 +18,7 @@ import (
 
 var cbor codec.CborHandle
 
-func spawnCheckFinalHashStage(stateDB ethdb.Database, syncHeadNumber uint64, datadir string) error {
+func spawnCheckFinalHashStage(stateDB ethdb.Database, syncHeadNumber uint64, datadir string, quit chan struct{}) error {
 	hashProgress, err := GetStageProgress(stateDB, HashCheck)
 	if err != nil {
 		return err
@@ -31,7 +31,7 @@ func spawnCheckFinalHashStage(stateDB ethdb.Database, syncHeadNumber uint64, dat
 	}
 
 	if core.UsePlainStateExecution {
-		err = promoteHashedState(stateDB, hashProgress, datadir)
+		err = promoteHashedState(stateDB, hashProgress, datadir, quit)
 		if err != nil {
 			return err
 		}
@@ -90,15 +90,17 @@ func unwindHashCheckStage(unwindPoint uint64, stateDB ethdb.Database) error {
 	return nil
 }
 
-func promoteHashedState(db ethdb.Database, progress uint64, datadir string) error {
+func promoteHashedState(db ethdb.Database, progress uint64, datadir string, quit chan struct{}) error {
 	if progress == 0 {
-		return promoteHashedStateCleanly(db, datadir)
+		return promoteHashedStateCleanly(db, datadir, quit)
 	}
 	return errors.New("incremental state promotion not implemented")
 }
 
-func promoteHashedStateCleanly(db ethdb.Database, datadir string) error {
-
+func promoteHashedStateCleanly(db ethdb.Database, datadir string, quit chan struct{}) error {
+	if err := common.Stopped(quit); err != nil {
+		return err
+	}
 	err := etl.Transform(
 		db,
 		dbutils.PlainStateBucket,
@@ -107,6 +109,7 @@ func promoteHashedStateCleanly(db ethdb.Database, datadir string) error {
 		nil,
 		keyTransformExtractFunc(transformPlainStateKey),
 		identityLoadFunc,
+		quit,
 	)
 
 	if err != nil {
@@ -121,6 +124,7 @@ func promoteHashedStateCleanly(db ethdb.Database, datadir string) error {
 		nil,
 		keyTransformExtractFunc(transformContractCodeKey),
 		identityLoadFunc,
+		quit,
 	)
 }
diff --git a/eth/downloader/stagedsync_stage_hashcheck_test.go b/eth/downloader/stagedsync_stage_hashcheck_test.go
index 9cedef36f31..2541e5e55d6 100644
--- a/eth/downloader/stagedsync_stage_hashcheck_test.go
+++ b/eth/downloader/stagedsync_stage_hashcheck_test.go
@@ -25,7 +25,7 @@ func TestPromoteHashedStateClearState(t *testing.T) {
 	generateBlocks(t, 1, 50, plainWriterGen(db2), changeCodeWithIncarnations)
 
 	m2 := db2.NewBatch()
-	err := promoteHashedState(m2, 0, getDataDir())
+	err := promoteHashedState(m2, 0, getDataDir(), nil)
 	if err != nil {
 		t.Errorf("error while promoting state: %v", err)
 	}
@@ -46,7 +46,7 @@ func TestPromoteHashedStateIncremental(t *testing.T) {
 	generateBlocks(t, 1, 50, plainWriterGen(db2), changeCodeWithIncarnations)
 
 	m2 := db2.NewBatch()
-	err := promoteHashedState(m2, 0, getDataDir())
+	err := promoteHashedState(m2, 0, getDataDir(), nil)
 	if err != nil {
 		t.Errorf("error while promoting state: %v", err)
 	}
@@ -59,7 +59,7 @@ func TestPromoteHashedStateIncremental(t *testing.T) {
 	generateBlocks(t, 51, 50, plainWriterGen(db2), changeCodeWithIncarnations)
 
 	m2 = db2.NewBatch()
-	err = promoteHashedState(m2, 50, getDataDir())
+	err = promoteHashedState(m2, 50, getDataDir(), nil)
 	if err != nil {
 		t.Errorf("error while promoting state: %v", err)
 	}
@@ -81,7 +81,7 @@ func TestPromoteHashedStateIncrementalMixed(t *testing.T) {
 	generateBlocks(t, 51, 50, plainWriterGen(db2), changeCodeWithIncarnations)
 
 	m2 := db2.NewBatch()
-	err := promoteHashedState(m2, 50, getDataDir())
+	err := promoteHashedState(m2, 50, getDataDir(), nil)
 	if err != nil {
 		t.Errorf("error while promoting state: %v", err)
 	}
diff --git a/eth/downloader/stagedsync_stage_indexes.go b/eth/downloader/stagedsync_stage_indexes.go
index 622ad191267..19ba31bb035 100644
--- a/eth/downloader/stagedsync_stage_indexes.go
+++ b/eth/downloader/stagedsync_stage_indexes.go
@@ -16,7 +16,7 @@ const (
 	emptyValBit uint64 = 0x8000000000000000
 )
 
-func spawnAccountHistoryIndex(db ethdb.Database, datadir string, plainState bool) error {
+func spawnAccountHistoryIndex(db ethdb.Database, datadir string, plainState bool, quit chan struct{}) error {
 	if plainState {
 		log.Info("Skipped account index generation for plain state")
 		return nil
@@ -44,6 +44,7 @@ func spawnAccountHistoryIndex(db ethdb.Database, datadir string, plainState bool
 		startkey,
 		getExtractFunc(bytes2walker),
 		loadFunc,
+		quit,
 	)
 	if err != nil {
 		return err
@@ -55,7 +56,7 @@ func spawnAccountHistoryIndex(db ethdb.Database, datadir string, plainState bool
 	return nil
 }
 
-func spawnStorageHistoryIndex(db ethdb.Database, datadir string, plainState bool) error {
+func spawnStorageHistoryIndex(db ethdb.Database, datadir string, plainState bool, quit chan struct{}) error {
 	if plainState {
 		log.Info("Skipped storage index generation for plain state")
 		return nil
@@ -82,7 +83,8 @@ func spawnStorageHistoryIndex(db ethdb.Database, datadir string, plainState bool
 		datadir,
 		startkey,
 		getExtractFunc(bytes2walker),
-		loadFunc)
+		loadFunc,
+		quit)
 	if err != nil {
 		return err
 	}
@@ -134,6 +136,10 @@ func loadFunc(k []byte, valueDecoder etl.Decoder, state etl.State, next etl.Load
 		return err
 	}
 	for _, b := range blockNumbers {
+		if err = state.Stopped(); err != nil {
+			return err
+		}
+
 		vzero := (b & emptyValBit) != 0
 		blockNr := b &^ emptyValBit
 		currentChunkKey := dbutils.IndexChunkKey(k, ^uint64(0))
@@ -162,6 +168,10 @@ func loadFunc(k []byte, valueDecoder etl.Decoder, state etl.State, next etl.Load
 		}
 
 		index = index.Append(blockNr, vzero)
+		if err = state.Stopped(); err != nil {
+			return err
+		}
+
 		err = next(currentChunkKey, index)
 		if err != nil {
 			return err
diff --git a/eth/downloader/stagedsync_stage_senders.go b/eth/downloader/stagedsync_stage_senders.go
index 8d3277f8a21..1c7bef71dbe 100644
--- a/eth/downloader/stagedsync_stage_senders.go
+++ b/eth/downloader/stagedsync_stage_senders.go
@@ -63,12 +63,16 @@ func (d *Downloader) spawnRecoverSendersStage() error {
 
 	for i := 0; i < numOfGoroutines; i++ {
 		// each goroutine gets it's own crypto context to make sure they are really parallel
-		go recoverSenders(cryptoContexts[i], jobs, out)
+		go recoverSenders(cryptoContexts[i], jobs, out, d.quitCh)
 	}
 	log.Info("Sync (Senders): Started recoverer goroutines", "numOfGoroutines", numOfGoroutines)
 
 	needExit := false
 	for !needExit {
+		if err = common.Stopped(d.quitCh); err != nil {
+			return err
+		}
+
 		written := 0
 		for i := 0; i < batchSize; i++ {
 			hash := rawdb.ReadCanonicalHash(mutation, nextBlockNumber)
@@ -122,9 +126,15 @@ type senderRecoveryJob struct {
 	err error
 }
 
-func recoverSenders(cryptoContext *secp256k1.Context, in chan *senderRecoveryJob, out chan *senderRecoveryJob) {
+func recoverSenders(cryptoContext *secp256k1.Context, in chan *senderRecoveryJob, out chan *senderRecoveryJob, quit chan struct{}) {
 	var job *senderRecoveryJob
 	for {
-		job = <-in
+		// Receive with select so the goroutine also wakes up when quit is
+		// closed, instead of blocking on the jobs channel indefinitely.
+		select {
+		case <-quit:
+			return
+		case job = <-in:
+		}
 		if job == nil {
 			return
diff --git a/eth/downloader/stagedsync_stages.go b/eth/downloader/stagedsync_stages.go
index cda48815e3e..948d4247bbc 100644
--- a/eth/downloader/stagedsync_stages.go
+++ b/eth/downloader/stagedsync_stages.go
@@ -28,14 +28,14 @@ import (
 type SyncStage byte
 
 const (
-	Headers SyncStage = iota // Headers are downloaded, their Proof-Of-Work validity and chaining is verified
-	Bodies // Block bodies are downloaded, TxHash and UncleHash are getting verified
-	Senders // "From" recovered from signatures, bodies re-written
-	Execution // Executing each block w/o buildinf a trie
-	HashCheck // Checking the root hash
-	AccountHistoryIndex // Generating history index for accounts
-	StorageHistoryIndex // Generating history index for storage
-	Finish // Nominal stage after all other stages
+	Headers             SyncStage = iota // Headers are downloaded, their Proof-Of-Work validity and chaining is verified
+	Bodies                               // Block bodies are downloaded, TxHash and UncleHash are getting verified
+	Senders                              // "From" recovered from signatures, bodies re-written
+	Execution                            // Executing each block w/o building a trie
+	HashCheck                            // Checking the root hash
+	AccountHistoryIndex                  // Generating history index for accounts
+	StorageHistoryIndex                  // Generating history index for storage
+	Finish                               // Nominal stage after all other stages
 )
 
 // GetStageProcess retrieves saved progress of given sync stage from the database
@@ -55,9 +55,7 @@ func GetStageProgress(db ethdb.Getter, stage SyncStage) (uint64, error) {
 
 // SaveStageProgress saves the progress of the given stage in the database
 func SaveStageProgress(db ethdb.Putter, stage SyncStage, progress uint64) error {
-	var v [8]byte
-	binary.BigEndian.PutUint64(v[:], progress)
-	return db.Put(dbutils.SyncStageProgress, []byte{byte(stage)}, v[:])
+	return db.Put(dbutils.SyncStageProgress, []byte{byte(stage)}, encodeBigEndian(progress))
 }
 
 // UnwindAllStages marks all the stages after the Headers stage (where unwinding is initiated) to be unwound
@@ -68,10 +66,12 @@ func UnwindAllStages(db ethdb.GetterPutter, unwindPoint uint64) error {
 		if err != nil {
 			return err
 		}
-		progress, err1 := GetStageProgress(db, stage)
-		if err1 != nil {
-			return err1
+
+		progress, err := GetStageProgress(db, stage)
+		if err != nil {
+			return err
 		}
+
 		if (existingUnwindPoint == 0 || existingUnwindPoint > unwindPoint) && unwindPoint < progress {
 			// Only lower, not higher
 			err = SaveStageUnwind(db, stage, unwindPoint)
@@ -83,7 +83,7 @@ func UnwindAllStages(db ethdb.GetterPutter, unwindPoint uint64) error {
 	return nil
 }
 
-// GetStageInvalidation retrives the invalidation for the given stage
+// GetStageUnwind retrieves the invalidation for the given stage
 // Invalidation means that that stage needs to rollback to the invalidation
 // point and be redone
 func GetStageUnwind(db ethdb.Getter, stage SyncStage) (uint64, error) {
@@ -102,7 +102,11 @@ func GetStageUnwind(db ethdb.Getter, stage SyncStage) (uint64, error) {
 
 // SaveStageInvalidation saves the progress of the given stage in the database
 func SaveStageUnwind(db ethdb.Putter, stage SyncStage, invalidation uint64) error {
+	return db.Put(dbutils.SyncStageUnwind, []byte{byte(stage)}, encodeBigEndian(invalidation))
+}
+
+func encodeBigEndian(n uint64) []byte {
 	var v [8]byte
-	binary.BigEndian.PutUint64(v[:], invalidation)
-	return db.Put(dbutils.SyncStageUnwind, []byte{byte(stage)}, v[:])
+	binary.BigEndian.PutUint64(v[:], n)
+	return v[:]
 }
diff --git a/eth/handler.go b/eth/handler.go
index aa7c171b3a3..e19c01afecd 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -350,21 +350,27 @@ func (pm *ProtocolManager) Start(maxPeers int) {
 	pm.maxPeers = maxPeers
 
 	// broadcast transactions
-	pm.wg.Add(1)
 	pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
 	pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
 	if pm.txsSub != nil {
+		pm.wg.Add(1)
 		go pm.txBroadcastLoop()
 	}
 
 	// broadcast mined blocks
-	pm.wg.Add(1)
 	pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
-	go pm.minedBroadcastLoop()
+	if pm.minedBlockSub != nil {
+		pm.wg.Add(1)
+		go pm.minedBroadcastLoop()
+	}
 
 	// start sync handlers
-	pm.wg.Add(2)
-	go pm.chainSync.loop()
+	if pm.chainSync != nil {
+		pm.wg.Add(1)
+		go pm.chainSync.loop()
+	}
+
+	pm.wg.Add(1)
 	go pm.txsyncLoop64() // TODO(karalabe): Legacy initial tx echange, drop with eth/64.
 }
@@ -372,7 +378,9 @@ func (pm *ProtocolManager) Stop() {
 	if pm.txsSub != nil {
 		pm.txsSub.Unsubscribe() // quits txBroadcastLoop
 	}
-	pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
+	if pm.minedBlockSub != nil {
+		pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
+	}
 
 	// Quit chainSync and txsync64.
 	// After this is done, no new peers will be accepted.
@@ -1197,6 +1205,8 @@ func (pm *ProtocolManager) txBroadcastLoop() {
 	for {
 		select {
+		case <-pm.quitSync:
+			return
 		case event := <-pm.txsCh:
 			// For testing purpose only, disable propagation
 			if pm.broadcastTxAnnouncesOnly {
diff --git a/miner/miner.go b/miner/miner.go
index f78aa2ddc70..06339c750e6 100644
--- a/miner/miner.go
+++ b/miner/miner.go
@@ -133,6 +133,9 @@ func (miner *Miner) Start(coinbase common.Address) {
 }
 
 func (miner *Miner) Stop() {
+	if miner == nil || miner.worker == nil {
+		return
+	}
 	miner.worker.stop()
 	atomic.StoreInt32(&miner.shouldStart, 0)
 }
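Below is a minimal, self-contained sketch (not part of the patch) of the quit-channel pattern this change introduces: long-running stage loops poll common.Stopped at the top of each iteration, and the shutdown path closes the channel via common.SafeClose, which is idempotent so multiple shutdown paths (e.g. cancel() and Terminate()) can race safely. The runStage name and the sleep standing in for real work are illustrative assumptions only.

```go
package main

import (
	"fmt"
	"time"

	"github.com/ledgerwatch/turbo-geth/common"
)

// runStage mirrors the loops in the staged-sync code: it checks the quit
// channel at the top of every iteration and returns common.ErrStopped
// once the channel has been closed.
func runStage(quit chan struct{}) error {
	for {
		if err := common.Stopped(quit); err != nil {
			return err
		}
		time.Sleep(10 * time.Millisecond) // placeholder for one unit of stage work
	}
}

func main() {
	quit := make(chan struct{})
	go func() {
		time.Sleep(50 * time.Millisecond)
		common.SafeClose(quit)
		common.SafeClose(quit) // second close is a no-op, not a panic
	}()
	if err := runStage(quit); err != nil {
		fmt.Println("stage exited:", err) // prints "stage exited: stopped"
	}
}
```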