Restore graceful shutdown (erigontech#591)
* initial

* graceful shutdown for staged

* more wg fixes

* fmt

* linters

* remove generalization

* linters

* linters

* fix

* fix

* fmt

* quit into etl

* after CR

* after CR
JekaMas authored May 30, 2020
1 parent a49fa49 commit 32ec443
Showing 15 changed files with 239 additions and 126 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -14,6 +14,7 @@
*/**/*tx_database*
*/**/*dapps*
build/_vendor/pkg
/*.a

#*
.#*
29 changes: 29 additions & 0 deletions common/chan.go
@@ -0,0 +1,29 @@
package common

import "errors"

var ErrStopped = errors.New("stopped")

func Stopped(ch chan struct{}) error {
	if ch == nil {
		return nil
	}
	select {
	case <-ch:
		return ErrStopped
	default:
	}
	return nil
}

func SafeClose(ch chan struct{}) {
	if ch == nil {
		return
	}
	select {
	case <-ch:
		// Channel was already closed
	default:
		close(ch)
	}
}
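
A minimal usage sketch (not part of the commit) of how these two helpers are meant to be combined: a worker loop polls the quit channel with common.Stopped on every iteration, while the shutdown path closes the channel idempotently with common.SafeClose. The worker, doWork and timing values below are illustrative only.

package main

import (
	"log"
	"time"

	"github.com/ledgerwatch/turbo-geth/common"
)

// doWork stands in for one unit of real work.
func doWork() { time.Sleep(10 * time.Millisecond) }

// worker runs until the quit channel is closed, then returns common.ErrStopped.
func worker(quit chan struct{}) error {
	for {
		if err := common.Stopped(quit); err != nil {
			return err // graceful exit once quit is closed
		}
		doWork()
	}
}

func main() {
	quit := make(chan struct{})
	go func() {
		time.Sleep(100 * time.Millisecond)
		common.SafeClose(quit) // a no-op if another path already closed quit
	}()
	if err := worker(quit); err != nil {
		log.Println("worker stopped:", err)
	}
}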
26 changes: 21 additions & 5 deletions common/etl/etl.go
@@ -10,10 +10,11 @@ import (
"os"
"runtime"

"github.com/ugorji/go/codec"

"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ugorji/go/codec"
)

var cbor codec.CborHandle
@@ -24,6 +25,7 @@ type Decoder interface {

type State interface {
Get([]byte) ([]byte, error)
Stopped() error
}

type ExtractNextFunc func(k []byte, v interface{}) error
@@ -40,9 +42,10 @@ func Transform(
startkey []byte,
extractFunc ExtractFunc,
loadFunc LoadFunc,
quit chan struct{},
) error {

filenames, err := extractBucketIntoFiles(db, fromBucket, startkey, datadir, extractFunc)
filenames, err := extractBucketIntoFiles(db, fromBucket, startkey, datadir, extractFunc, quit)

defer func() {
deleteFiles(filenames)
@@ -52,7 +55,7 @@
return err
}

return loadFilesIntoBucket(db, toBucket, filenames, loadFunc)
return loadFilesIntoBucket(db, toBucket, filenames, loadFunc, quit)
}

func extractBucketIntoFiles(
@@ -61,6 +64,7 @@
startkey []byte,
datadir string,
extractFunc ExtractFunc,
quit chan struct{},
) ([]string, error) {
buffer := bytes.NewBuffer(make([]byte, 0))
encoder := codec.NewEncoder(nil, &cbor)
@@ -98,6 +102,9 @@
}

err := db.Walk(bucket, startkey, len(startkey)*8, func(k, v []byte) (bool, error) {
if err := common.Stopped(quit); err != nil {
return false, err
}
err := extractFunc(k, v, extractNextFunc)
return true, err
})
@@ -112,7 +119,7 @@
return filenames, nil
}

func loadFilesIntoBucket(db ethdb.Database, bucket []byte, files []string, loadFunc LoadFunc) error {
func loadFilesIntoBucket(db ethdb.Database, bucket []byte, files []string, loadFunc LoadFunc, quit chan struct{}) error {
decoder := codec.NewDecoder(nil, &cbor)
var m runtime.MemStats
h := &Heap{}
@@ -135,7 +142,7 @@ func loadFilesIntoBucket(db ethdb.Database, bucket []byte, files []string, loadF
}
}
batch := db.NewBatch()
state := &bucketState{batch, bucket}
state := &bucketState{batch, bucket, quit}

loadNextFunc := func(k, v []byte) error {
if err := batch.Put(bucket, k, v); err != nil {
Expand All @@ -158,6 +165,10 @@ func loadFilesIntoBucket(db ethdb.Database, bucket []byte, files []string, loadF
}

for h.Len() > 0 {
if err := common.Stopped(quit); err != nil {
return err
}

element := (heap.Pop(h)).(HeapElem)
reader := readers[element.timeIdx]
decoder.ResetBytes(element.value)
@@ -269,8 +280,13 @@ func readElementFromDisk(decoder Decoder) ([]byte, []byte, error) {
type bucketState struct {
	getter ethdb.Getter
	bucket []byte
	quit   chan struct{}
}

func (s *bucketState) Get(key []byte) ([]byte, error) {
	return s.getter.Get(s.bucket, key)
}

func (s *bucketState) Stopped() error {
	return common.Stopped(s.quit)
}
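
The bucketState change above carries the quit channel inside the state object handed to transform callbacks, so deeply nested code can ask whether shutdown was requested without adding a parameter at every layer. A self-contained sketch of that pattern follows; every name in it is illustrative and not taken from the repository.

package main

import (
	"errors"
	"fmt"
)

var errStopped = errors.New("stopped")

// workState carries the quit channel alongside whatever else callbacks need.
type workState struct {
	quit chan struct{}
}

// Stopped reports whether shutdown has been requested, mirroring bucketState.Stopped.
func (s *workState) Stopped() error {
	select {
	case <-s.quit:
		return errStopped
	default:
		return nil
	}
}

func process(s *workState, keys []string) error {
	for _, k := range keys {
		if err := s.Stopped(); err != nil {
			return err // abort cooperatively once quit is closed
		}
		fmt.Println("processing", k)
	}
	return nil
}

func main() {
	s := &workState{quit: make(chan struct{})}
	close(s.quit) // simulate a shutdown request
	if err := process(s, []string{"a", "b"}); err != nil {
		fmt.Println("stopped early:", err)
	}
}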
4 changes: 3 additions & 1 deletion core/tx_pool.go
@@ -475,7 +475,9 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) {
// Stop terminates the transaction pool.
func (pool *TxPool) Stop() {
// Unsubscribe all subscriptions registered from txpool
pool.scope.Close()
if pool == nil {
return
}

// Unsubscribe subscriptions registered from blockchain
pool.chainHeadSub.Unsubscribe()
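
The nil-receiver guard added to Stop matters because Go allows calling a method through a nil pointer; presumably a shutdown path can now reach TxPool.Stop before the pool was ever constructed, and it must not panic. A self-contained sketch of the idiom (the pool type here is illustrative, not the real TxPool):

package main

import "fmt"

type pool struct{ closed bool }

// Stop mirrors the guard added to TxPool.Stop: check the receiver for nil
// before touching any fields, so shutdown never panics on an uninitialised pool.
func (p *pool) Stop() {
	if p == nil {
		return
	}
	p.closed = true
}

func main() {
	var p *pool // startup was aborted; the pool was never created
	p.Stop()    // no panic: the nil check returns immediately
	fmt.Println("shutdown completed without panic")
}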
29 changes: 8 additions & 21 deletions eth/downloader/downloader.go
@@ -563,6 +563,7 @@ func (d *Downloader) spawnSync(fetchers []func() error) error {
fn := fn
go func() { defer d.cancelWg.Done(); errc <- fn() }()
}

// Wait for the first error, then terminate the others.
var err error
for i := 0; i < len(fetchers); i++ {
@@ -576,8 +577,10 @@
break
}
}

d.queue.Close()
d.Cancel()

return err
}

@@ -588,15 +591,7 @@ func (d *Downloader) cancel() {
// Close the current cancel channel
d.cancelLock.Lock()
defer d.cancelLock.Unlock()

if d.cancelCh != nil {
select {
case <-d.cancelCh:
// Channel was already closed
default:
close(d.cancelCh)
}
}
common.SafeClose(d.cancelCh)
}

// Cancel aborts all of the operations and waits for all download goroutines to
@@ -615,11 +610,7 @@ func (d *Downloader) Terminate() {
d.blockchain.Stop()
// Close the termination channel (make sure double close is allowed)
d.quitLock.Lock()
select {
case <-d.quitCh:
default:
close(d.quitCh)
}
common.SafeClose(d.quitCh)
d.quitLock.Unlock()

// Cancel any pending download requests
Expand Down Expand Up @@ -1487,10 +1478,8 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
gotHeaders = true
for len(headers) > 0 {
// Terminate if something failed in between processing chunks
select {
case <-d.cancelCh:
return errCanceled
default:
if err := common.Stopped(d.quitCh); err != nil {
return err
}
// Select the next chunk of headers to import
limit := maxHeadersProcess
@@ -1604,10 +1593,8 @@ func (d *Downloader) importBlockResults(results []*fetchResult, execute bool) (u
if len(results) == 0 {
return 0, nil
}
select {
case <-d.quitCh:
if err := common.Stopped(d.quitCh); err != nil {
return 0, errCancelContentProcessing
default:
}
// Retrieve the a batch of results to import
first, last := results[0].Header, results[len(results)-1].Header
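
The downloader swaps its hand-rolled select/close blocks for common.SafeClose but keeps cancelLock and quitLock around the calls. That is deliberate: SafeClose by itself is not safe against two goroutines closing concurrently, since both can pass the default branch and call close, so the mutex is what makes repeated cancel/Terminate calls race-free. A sketch of the combined idiom, with illustrative names:

package main

import (
	"sync"

	"github.com/ledgerwatch/turbo-geth/common"
)

// shutdowner mirrors the Downloader's quitLock/quitCh pair.
type shutdowner struct {
	quitLock sync.Mutex
	quitCh   chan struct{}
}

// Terminate may be called any number of times from any goroutine: the mutex
// serialises the close, and SafeClose turns a repeat call into a no-op
// instead of a double-close panic.
func (s *shutdowner) Terminate() {
	s.quitLock.Lock()
	defer s.quitLock.Unlock()
	common.SafeClose(s.quitCh)
}

func main() {
	s := &shutdowner{quitCh: make(chan struct{})}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); s.Terminate() }()
	}
	wg.Wait() // all four calls return; the channel was closed exactly once
}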
110 changes: 60 additions & 50 deletions eth/downloader/stagedsync_downloader.go
@@ -8,83 +8,48 @@ import (
)

func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers []func() error) error {
log.Info("Sync stage 1/7. Downloading headers...")

var err error
defer log.Info("Staged sync finished")

/*
* Stage 1. Download Headers
*/
if err = d.spawnSync(headersFetchers); err != nil {
log.Info("Sync stage 1/7. Downloading headers...")
err = d.DownloadHeaders(headersFetchers)
if err != nil {
return err
}

log.Info("Sync stage 1/7. Downloading headers... Complete!")
log.Info("Checking for unwinding...")
// Check unwinds backwards and if they are outstanding, invoke corresponding functions
for stage := Finish - 1; stage > Headers; stage-- {
unwindPoint, err := GetStageUnwind(d.stateDB, stage)
if err != nil {
return err
}
if unwindPoint == 0 {
continue
}
switch stage {
case Bodies:
err = d.unwindBodyDownloadStage(unwindPoint)
case Senders:
err = d.unwindSendersStage(unwindPoint)
case Execution:
err = unwindExecutionStage(unwindPoint, d.stateDB)
case HashCheck:
err = unwindHashCheckStage(unwindPoint, d.stateDB)
case AccountHistoryIndex:
err = unwindAccountHistoryIndex(unwindPoint, d.stateDB, core.UsePlainStateExecution)
case StorageHistoryIndex:
err = unwindStorageHistoryIndex(unwindPoint, d.stateDB, core.UsePlainStateExecution)
default:
return fmt.Errorf("unrecognized stage for unwinding: %d", stage)
}
if err != nil {
return fmt.Errorf("error unwinding stage: %d: %v", stage, err)
}
}
log.Info("Checking for unwinding... Complete!")
log.Info("Sync stage 2/7. Downloading block bodies...")

/*
* Stage 2. Download Block bodies
*/
log.Info("Sync stage 2/7. Downloading block bodies...")
cont := true

for cont && err == nil {
cont, err = d.spawnBodyDownloadStage(p.id)
}
if err != nil {
return err
if err != nil {
return err
}
}

log.Info("Sync stage 2/7. Downloading block bodies... Complete!")

/*
* Stage 3. Recover senders from tx signatures
*/
log.Info("Sync stage 3/7. Recovering senders from tx signatures...")

err = d.spawnRecoverSendersStage()
if err != nil {
if err = d.spawnRecoverSendersStage(); err != nil {
return err
}

log.Info("Sync stage 3/7. Recovering senders from tx signatures... Complete!")
log.Info("Sync stage 4/7. Executing blocks w/o hash checks...")

/*
* Stage 4. Execute block bodies w/o calculating trie roots
*/

var syncHeadNumber uint64
syncHeadNumber, err = spawnExecuteBlocksStage(d.stateDB, d.blockchain)
log.Info("Sync stage 4/7. Executing blocks w/o hash checks...")
syncHeadNumber, err := spawnExecuteBlocksStage(d.stateDB, d.blockchain, d.quitCh)
if err != nil {
return err
}
@@ -93,15 +58,16 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers

// Further stages go there
log.Info("Sync stage 5/7. Validating final hash")
if err = spawnCheckFinalHashStage(d.stateDB, syncHeadNumber, d.datadir); err != nil {
err = spawnCheckFinalHashStage(d.stateDB, syncHeadNumber, d.datadir, d.quitCh)
if err != nil {
return err
}

log.Info("Sync stage 5/7. Validating final hash... Complete!")

if d.history {
log.Info("Sync stage 6/7. Generating account history index")
err = spawnAccountHistoryIndex(d.stateDB, d.datadir, core.UsePlainStateExecution)
err = spawnAccountHistoryIndex(d.stateDB, d.datadir, core.UsePlainStateExecution, d.quitCh)
if err != nil {
return err
}
@@ -112,7 +78,7 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers

if d.history {
log.Info("Sync stage 7/7. Generating storage history index")
err = spawnStorageHistoryIndex(d.stateDB, d.datadir, core.UsePlainStateExecution)
err = spawnStorageHistoryIndex(d.stateDB, d.datadir, core.UsePlainStateExecution, d.quitCh)
if err != nil {
return err
}
@@ -123,3 +89,47 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers

return err
}

func (d *Downloader) DownloadHeaders(headersFetchers []func() error) error {
	err := d.spawnSync(headersFetchers)
	if err != nil {
		return err
	}

	log.Info("Sync stage 1/7. Downloading headers... Complete!")
	log.Info("Checking for unwinding...")
	// Check unwinds backwards and if they are outstanding, invoke corresponding functions
	for stage := Finish - 1; stage > Headers; stage-- {
		unwindPoint, err := GetStageUnwind(d.stateDB, stage)
		if err != nil {
			return err
		}

		if unwindPoint == 0 {
			continue
		}

		switch stage {
		case Bodies:
			err = d.unwindBodyDownloadStage(unwindPoint)
		case Senders:
			err = d.unwindSendersStage(unwindPoint)
		case Execution:
			err = unwindExecutionStage(unwindPoint, d.stateDB)
		case HashCheck:
			err = unwindHashCheckStage(unwindPoint, d.stateDB)
		case AccountHistoryIndex:
			err = unwindAccountHistoryIndex(unwindPoint, d.stateDB, core.UsePlainStateExecution)
		case StorageHistoryIndex:
			err = unwindStorageHistoryIndex(unwindPoint, d.stateDB, core.UsePlainStateExecution)
		default:
			return fmt.Errorf("unrecognized stage for unwinding: %d", stage)
		}

		if err != nil {
			return fmt.Errorf("error unwinding stage: %d: %w", stage, err)
		}
	}
	log.Info("Checking for unwinding... Complete!")
	return nil
}
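
With this commit every stage call above receives d.quitCh (spawnExecuteBlocksStage, spawnCheckFinalHashStage and the two history-index stages). The rough shape such a quit-aware stage takes is sketched below; runStage, applyBlock and the block-range arguments are hypothetical stand-ins, not the repository's real signatures.

package stages

import "github.com/ledgerwatch/turbo-geth/common"

// runStage polls the quit channel between blocks, so an interrupt stops the
// stage at a block boundary and it can resume from saved progress on restart.
func runStage(from, to uint64, quit chan struct{}) error {
	for n := from; n <= to; n++ {
		if err := common.Stopped(quit); err != nil {
			return err // shutdown requested; work up to block n-1 is kept
		}
		if err := applyBlock(n); err != nil {
			return err
		}
	}
	return nil
}

// applyBlock stands in for the real per-block work of a stage.
func applyBlock(n uint64) error { return nil }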
(The remaining 9 changed files in this commit are not shown here.)
