From 7aa19ac4f64a811bb2d54c0068c4e4eede1dfbac Mon Sep 17 00:00:00 2001 From: Conflux Date: Fri, 22 Mar 2024 15:57:43 +0800 Subject: [PATCH 1/2] Abandon pubsub usage for chain sync also remove kv sync and update README --- README.md | 5 +- cmd/sync.go | 64 +---- cmd/util/data_context.go | 6 +- docker-compose.yml | 2 +- sync/epoch_sub.go | 210 ----------------- sync/sync_db.go | 199 ++-------------- sync/sync_kv.go | 493 --------------------------------------- 7 files changed, 24 insertions(+), 955 deletions(-) delete mode 100644 sync/epoch_sub.go delete mode 100644 sync/sync_kv.go diff --git a/README.md b/README.md index 320b9d7f..a744b256 100644 --- a/README.md +++ b/README.md @@ -105,7 +105,7 @@ Confura is comprised of serveral components as below: ### Blockchain Sync -You can use the `sync` subcommand to start sync service, including DB/KV/ETH sync as well as fast catchup. +You can use the `sync` subcommand to start sync service, including DB/ETH sync as well as fast catchup. > Usage: > confura sync [flags] @@ -114,7 +114,6 @@ You can use the `sync` subcommand to start sync service, including DB/KV/ETH syn > > --db start core space DB sync server > --eth start ETH sync server -> --kv start core space KV sync server > --catchup start core space fast catchup server > --adaptive automatically adjust target epoch number to the latest stable epoch > --benchmark benchmarking the performance during fast catch-up sync (default true) @@ -246,7 +245,7 @@ confura-ethsync ./confura sync --eth Up confura-node-management ./confura nm --cfx Up 0.0.0.0:22530->22530/tcp,:::22530->22530/tcp confura-redis docker-entrypoint.sh redis ... Up 0.0.0.0:53779->6379/tcp confura-rpc ./confura rpc --cfx Up 0.0.0.0:22537->22537/tcp,:::22537->22537/tcp -confura-sync ./confura sync --db --kv Up +confura-sync ./confura sync --db Up confura-virtual-filter ./confura vf Up 0.0.0.0:48545->48545/tcp,:::48545->48545/tcp ``` diff --git a/cmd/sync.go b/cmd/sync.go index 1a8776ea..2d448f36 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -5,7 +5,6 @@ import ( "sync" "github.com/Conflux-Chain/confura/cmd/util" - "github.com/Conflux-Chain/confura/store" cisync "github.com/Conflux-Chain/confura/sync" "github.com/Conflux-Chain/confura/sync/catchup" "github.com/sirupsen/logrus" @@ -16,7 +15,6 @@ var ( // sync boot options syncOpt struct { dbSyncEnabled bool - kvSyncEnabled bool ethSyncEnabled bool catchupEnabled bool } @@ -30,7 +28,7 @@ var ( syncCmd = &cobra.Command{ Use: "sync", - Short: "Start sync service, including DB/KV/ETH sync, as well as fast catchup", + Short: "Start sync service, including DB/ETH sync, as well as fast catchup", Run: startSyncService, } ) @@ -41,11 +39,6 @@ func init() { &syncOpt.dbSyncEnabled, "db", false, "start core space DB sync server", ) - // boot flag for core space DB sync - syncCmd.Flags().BoolVar( - &syncOpt.kvSyncEnabled, "kv", false, "start core space KV sync server", - ) - // boot flag for evm space sync syncCmd.Flags().BoolVar( &syncOpt.ethSyncEnabled, "eth", false, "start ETH sync server", @@ -78,7 +71,7 @@ func init() { } func startSyncService(*cobra.Command, []string) { - if !syncOpt.dbSyncEnabled && !syncOpt.kvSyncEnabled && + if !syncOpt.dbSyncEnabled && !syncOpt.ethSyncEnabled && !syncOpt.catchupEnabled { logrus.Fatal("No Sync server specified") } @@ -92,22 +85,8 @@ func startSyncService(*cobra.Command, []string) { syncCtx := util.MustInitSyncContext(storeCtx) defer syncCtx.Close() - var subs []cisync.EpochSubscriber - if syncOpt.dbSyncEnabled { // start DB sync - syncer := startSyncCfxDatabase(ctx, &wg, syncCtx) - subs = append(subs, syncer) - } - - if syncOpt.kvSyncEnabled { // start KV sync - if syncer := startSyncCfxCache(ctx, &wg, syncCtx); syncer != nil { - subs = append(subs, syncer) - } - } - - if len(subs) > 0 { // monitor pivot chain switch via pub/sub - logrus.Info("Start to pub/sub epoch to monitor pivot chain switch") - go cisync.MustSubEpoch(ctx, &wg, syncCtx.SubCfx, subs...) + startSyncCfxDatabase(ctx, &wg, syncCtx) } if syncOpt.catchupEnabled { // start fast catchup @@ -127,22 +106,8 @@ func startSyncServiceAdaptively(ctx context.Context, wg *sync.WaitGroup, syncCtx logrus.Fatal("No data sync configured") } - var subs []cisync.EpochSubscriber - if syncCtx.CfxDB != nil { // start DB sync - syncer := startSyncCfxDatabase(ctx, wg, syncCtx) - subs = append(subs, syncer) - } - - if syncCtx.CfxCache != nil { // start KV sync - if syncer := startSyncCfxCache(ctx, wg, syncCtx); syncer != nil { - subs = append(subs, syncer) - } - } - - if len(subs) > 0 { // monitor pivot chain switch via pub/sub - logrus.Info("Start to pub/sub epoch to monitor pivot chain switch") - go cisync.MustSubEpoch(ctx, wg, syncCtx.SubCfx, subs...) + startSyncCfxDatabase(ctx, wg, syncCtx) } if syncCtx.EthDB != nil { // start ETH sync @@ -162,27 +127,6 @@ func startSyncCfxDatabase(ctx context.Context, wg *sync.WaitGroup, syncCtx util. return syncer } -func startSyncCfxCache(ctx context.Context, wg *sync.WaitGroup, syncCtx util.SyncContext) *cisync.KVCacheSyncer { - if store.StoreConfig().IsChainBlockDisabled() && - store.StoreConfig().IsChainTxnDisabled() && - store.StoreConfig().IsChainReceiptDisabled() { - // KV sync only syncs block, transaction and receipt data. If all of them are disabled, - // nothing needs to sync, just stop right here. - return nil - } - - logrus.Info("Start to sync core space blockchain data into cache") - - csyncer := cisync.MustNewKVCacheSyncer(syncCtx.SyncCfx, syncCtx.CfxCache) - go csyncer.Sync(ctx, wg) - - // start core space cache prune - cpruner := cisync.MustNewKVCachePruner(syncCtx.CfxCache) - go cpruner.Prune(ctx, wg) - - return csyncer -} - func startSyncEthDatabase(ctx context.Context, wg *sync.WaitGroup, syncCtx util.SyncContext) { logrus.Info("Start to sync evm space blockchain data into database") diff --git a/cmd/util/data_context.go b/cmd/util/data_context.go index 53b54145..27b0e95c 100644 --- a/cmd/util/data_context.go +++ b/cmd/util/data_context.go @@ -97,6 +97,7 @@ func MustInitSyncContext(storeCtx StoreContext) SyncContext { func (ctx *SyncContext) Close() { // Usually, storeContext will be defer closed by itself // ctx.storeContext.Close() + if ctx.SyncCfx != nil { ctx.SyncCfx.Close() } @@ -105,6 +106,7 @@ func (ctx *SyncContext) Close() { ctx.SubCfx.Close() } - // not provided yet! - // ctx.syncEth.Close() + if ctx.SyncEth != nil { + ctx.SyncEth.Close() + } } diff --git a/docker-compose.yml b/docker-compose.yml index c05ad5f6..8b881620 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -26,7 +26,7 @@ services: # blockchain sync chain-sync: image: conflux/confura:latest - command: sync --db --kv + command: sync --db restart: unless-stopped depends_on: - db diff --git a/sync/epoch_sub.go b/sync/epoch_sub.go deleted file mode 100644 index e543512a..00000000 --- a/sync/epoch_sub.go +++ /dev/null @@ -1,210 +0,0 @@ -package sync - -import ( - "context" - "fmt" - "math/big" - "sync" - "time" - - sdk "github.com/Conflux-Chain/go-conflux-sdk" - "github.com/Conflux-Chain/go-conflux-sdk/types" - "github.com/ethereum/go-ethereum/common" - "github.com/openweb3/go-rpc-provider" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - "github.com/spf13/viper" -) - -const ( - // Sleep for a while after resub error - resubWaitDuration time.Duration = 5 * time.Second -) - -var ( - errSubAlreadyStopped = errors.New("epoch subscription already stopped") -) - -// EpochSubscriber is an interface to consume subscribed epochs. -type EpochSubscriber interface { - // any object implemented this method should handle in async mode - onEpochReceived(epoch types.WebsocketEpochResponse) - onEpochSubStart() -} - -type epochSubChan chan types.WebsocketEpochResponse - -// Epoch subscription manager -type epochSubMan struct { - subCh epochSubChan // channel to receive epoch - cfxSub *rpc.ClientSubscription // conflux subscription stub - cfxSdk *sdk.Client // conflux sdk - subscribers []EpochSubscriber // subscription observers - stopped bool // subscription stopped or not -} - -// Create epoch subscription manager -func newEpochSubMan(cfx *sdk.Client, subscribers ...EpochSubscriber) *epochSubMan { - bufferSize := viper.GetInt("sync.sub.buffer") - - return &epochSubMan{ - subCh: make(epochSubChan, bufferSize), - cfxSdk: cfx, - subscribers: subscribers, - stopped: false, - } -} - -// Start subscribing -func (subMan *epochSubMan) doSub() error { - if subMan.stopped { - return errSubAlreadyStopped - } - - sub, err := subMan.cfxSdk.SubscribeEpochs(subMan.subCh, *types.EpochLatestState) - subMan.cfxSub = sub - - return err -} - -// Run subscribing to handle channel signals in block mode -func (subMan *epochSubMan) runSub(ctx context.Context) error { - if subMan.stopped { - return errSubAlreadyStopped - } - - logrus.Debug("Epoch subscription starting to handle channel signals") - - // Notify all subscribers epoch sub started - for _, s := range subMan.subscribers { - s.onEpochSubStart() - } - - for { // Start handling epoch subscription - select { - case <-ctx.Done(): - subMan.stopped = true - return nil - case err := <-subMan.cfxSub.Err(): - logrus.WithError(err).Info("Epoch subscription error") - return err - case epoch := <-subMan.subCh: - for _, s := range subMan.subscribers { - s.onEpochReceived(epoch) - } - } - } -} - -// Retry subscribing -func (subMan *epochSubMan) reSub() error { - if subMan.stopped { - return errSubAlreadyStopped - } - - logrus.Debug("Epoch subscription restarting") - - subMan.close() - return subMan.doSub() -} - -// Close to reclaim resource -func (subMan *epochSubMan) close() { - logrus.Debug("Epoch subscription closing") - - if subMan.cfxSub != nil { // unsubscribe old epoch sub - subMan.cfxSub.Unsubscribe() - } - - for len(subMan.subCh) > 0 { // empty channel - <-subMan.subCh - } -} - -// MustSubEpoch subscribes the latest mined epoch. -// Note, it will block the current thread. -func MustSubEpoch(ctx context.Context, wg *sync.WaitGroup, cfx *sdk.Client, subscribers ...EpochSubscriber) { - subMan := newEpochSubMan(cfx, subscribers...) - if err := subMan.doSub(); err != nil { - logrus.WithError(err).Fatal("Failed to subscribe epoch") - } - - wg.Add(1) - defer func() { - subMan.close() - wg.Done() - - logrus.Info("Epoch subscription shutdown ok") - }() - - for { - // blocks until sub error or stopped - if err := subMan.runSub(ctx); err == nil { - return - } - - // resub until suceess or stopped - for failures, err := 0, subMan.reSub(); err != nil; { - logrus.WithError(err).Debug("Failed to resub epoch") - - failures++ - if failures%3 == 0 { // trigger error for every 3 failures - logrus.WithField("failures", failures).WithError(err).Error("Failed to try too many epoch resubs") - } - - tc := time.After(resubWaitDuration) - select { - case <-ctx.Done(): - return - case <-tc: - err = subMan.reSub() - } - } - - logrus.Info("Epoch resub ok!") - } -} - -type consoleEpochSubscriber struct { - cfx sdk.ClientOperator - lastEpoch *big.Int -} - -// NewConsoleEpochSubscriber creates an instance of EpochSubscriber to consume epoch. -func NewConsoleEpochSubscriber(cfx sdk.ClientOperator) EpochSubscriber { - return &consoleEpochSubscriber{cfx, nil} -} - -func (sub *consoleEpochSubscriber) onEpochReceived(epoch types.WebsocketEpochResponse) { - latestMined, err := sub.cfx.GetEpochNumber(types.EpochLatestMined) - if err != nil { - fmt.Println("[ERROR] failed to get epoch number:", err.Error()) - latestMined = epoch.EpochNumber - } - - newEpoch := epoch.EpochNumber.ToInt() - - fmt.Printf("[LATEST_MINED] %v", newEpoch) - if latestMined.ToInt().Cmp(newEpoch) != 0 { - fmt.Printf(" (gap %v)", subBig(newEpoch, latestMined.ToInt())) - } - - if sub.lastEpoch != nil { - if sub.lastEpoch.Cmp(newEpoch) >= 0 { - fmt.Printf(" (reverted %v)", subBig(newEpoch, sub.lastEpoch)) - } else if delta := subBig(newEpoch, sub.lastEpoch); delta.Cmp(common.Big1) > 0 { - panic(fmt.Sprintf("some epoch missed in subscription, last = %v, new = %v", sub.lastEpoch, newEpoch)) - } - } - - fmt.Println() - - sub.lastEpoch = newEpoch -} - -func (sub *consoleEpochSubscriber) onEpochSubStart() { - // Nothing to do for the moment (no concern) -} - -// func addBig(x, y *big.Int) *big.Int { return new(big.Int).Add(x, y) } -func subBig(x, y *big.Int) *big.Int { return new(big.Int).Sub(x, y) } diff --git a/sync/sync_db.go b/sync/sync_db.go index 5271b4a4..77bba53f 100644 --- a/sync/sync_db.go +++ b/sync/sync_db.go @@ -3,7 +3,6 @@ package sync import ( "context" "sync" - "sync/atomic" "time" "github.com/Conflux-Chain/confura/store" @@ -25,11 +24,6 @@ const ( syncPivotInfoWinCapacity = 50 ) -type pivotSwitchEvent struct { - revertFrom uint64 - revertTo uint64 -} - // db sync configuration type syncConfig struct { FromEpoch uint64 `default:"0"` @@ -58,16 +52,8 @@ type DatabaseSyncer struct { syncIntervalNormal time.Duration // interval to sync data in catching up mode syncIntervalCatchUp time.Duration - // last received epoch number from pubsub for pivot chain switch detection - lastSubEpochNo uint64 - // channel to receive pivot chain switch events - pivotSwitchEventCh chan *pivotSwitchEvent - // checkpoint channel received to check sync data - checkPointCh chan bool // window to cache epoch pivot info epochPivotWin *epochPivotWindow - // sync is ready only after fast catch-up is completed - catchupCompleted uint32 // error tolerant logger etLogger *logutil.ErrorTolerantLogger } @@ -85,9 +71,6 @@ func MustNewDatabaseSyncer(cfx sdk.ClientOperator, db *mysql.MysqlStore) *Databa maxSyncEpochs: conf.MaxEpochs, syncIntervalNormal: time.Second, syncIntervalCatchUp: time.Millisecond, - lastSubEpochNo: citypes.EpochNumberNil, - pivotSwitchEventCh: make(chan *pivotSwitchEvent, conf.Sub.Buffer), - checkPointCh: make(chan bool, 2), epochPivotWin: newEpochPivotWindow(syncPivotInfoWinCapacity), etLogger: logutil.NewErrorTolerantLogger(logutil.DefaultETConfig), } @@ -110,44 +93,22 @@ func (syncer *DatabaseSyncer) Sync(ctx context.Context, wg *sync.WaitGroup) { wg.Add(1) defer wg.Done() - checkpoint := func() { - if err := syncer.doCheckPoint(); err != nil { - logrus.WithError(err).Error("Db syncer failed to do checkpoint") - syncer.triggerCheckpoint() // re-trigger checkpoint - } - } - - breakLoop := false - quit := func() { - breakLoop = true - logrus.Info("DB syncer shutdown ok") - } - syncer.fastCatchup(ctx) - atomic.StoreUint32(&syncer.catchupCompleted, 1) ticker := time.NewTicker(syncer.syncIntervalCatchUp) defer ticker.Stop() - for !breakLoop { - select { // first class priority + for { + select { case <-ctx.Done(): - quit() - case <-syncer.checkPointCh: - checkpoint() - default: - select { // second class priority - case <-ctx.Done(): - quit() - case <-syncer.checkPointCh: - checkpoint() - case <-ticker.C: - err := syncer.doTicker(ticker) - syncer.etLogger.Log( - logrus.WithField("epochFrom", syncer.epochFrom), - err, "Db syncer failed to sync epoch data", - ) - } + logrus.Info("DB syncer shutdown ok") + return + case <-ticker.C: + err := syncer.doTicker(ticker) + syncer.etLogger.Log( + logrus.WithField("epochFrom", syncer.epochFrom), + err, "Db syncer failed to sync epoch data", + ) } } } @@ -194,11 +155,6 @@ func (syncer *DatabaseSyncer) loadLastSyncEpoch() (loaded bool, err error) { // Sync data once and return true if catch up to the latest confirmed epoch, otherwise false. func (syncer *DatabaseSyncer) syncOnce() (bool, error) { - // Drain pivot switch reorg event channel to handle pivot chain reorg - if err := syncer.drainPivotReorgEvents(); err != nil { - return false, err - } - // Fetch latest confirmed epoch from blockchain epoch, err := syncer.cfx.GetEpochNumber(types.EpochLatestConfirmed) if err != nil { @@ -209,8 +165,9 @@ func (syncer *DatabaseSyncer) syncOnce() (bool, error) { maxEpochTo := epoch.ToInt().Uint64() if syncer.epochFrom > maxEpochTo { // cached up to the latest confirmed epoch? - logrus.WithFields(logrus.Fields{ - "epochRange": citypes.RangeUint64{From: syncer.epochFrom, To: maxEpochTo}, + logrus.WithField("epochRange", citypes.RangeUint64{ + From: syncer.epochFrom, + To: maxEpochTo, }).Debug("Db syncer skipped due to already catch-up") return true, nil } @@ -321,35 +278,6 @@ func (syncer *DatabaseSyncer) syncOnce() (bool, error) { return false, nil } -func (syncer *DatabaseSyncer) doCheckPoint() error { - logger := logrus.WithFields(logrus.Fields{ - "epochFrom": syncer.epochFrom, - "lastSubEpochNo": atomic.LoadUint64(&syncer.lastSubEpochNo), - }) - - logger.Info("Db syncer ensuring epoch data validity on pubsub checkpoint") - - if err := ensureStoreEpochDataOk(syncer.cfx, syncer.db); err != nil { - logger.WithError(err).Info( - "Db syncer failed to ensure epoch data validity on checkpoint", - ) - - return errors.WithMessage(err, "failed to ensure data validity") - } - - if _, err := syncer.loadLastSyncEpoch(); err != nil { - logger.WithError(err).Info( - "Db syncer failed to reload last sync point on checkpoint", - ) - - return errors.WithMessage(err, "failed to reload last sync point") - } - - syncer.epochPivotWin.popn(syncer.epochFrom) - - return nil -} - func (syncer *DatabaseSyncer) doTicker(ticker *time.Ticker) error { logrus.Debug("DB sync ticking") @@ -369,37 +297,6 @@ func (syncer *DatabaseSyncer) doTicker(ticker *time.Ticker) error { return nil } -// implement the EpochSubscriber interface. -func (syncer *DatabaseSyncer) onEpochReceived(epoch types.WebsocketEpochResponse) { - if atomic.LoadUint32(&syncer.catchupCompleted) != 1 { // not ready for sync yet - return - } - - epochNo := epoch.EpochNumber.ToInt().Uint64() - - logger := logrus.WithField("epoch", epochNo) - logger.Debug("Db syncer onEpochReceived new epoch received") - - if err := syncer.detectPivotSwitchFromPubsub(&epoch); err != nil { - logger.WithError(err).Error( - "Db syncer failed to detect pivot chain switch from pubsub", - ) - } -} - -func (syncer *DatabaseSyncer) onEpochSubStart() { - if atomic.LoadUint32(&syncer.catchupCompleted) != 1 { - // not ready for sync yet - return - } - - logrus.Debug("DB sync onEpochSubStart event received") - - // reset lastSubEpochNo - atomic.StoreUint64(&(syncer.lastSubEpochNo), citypes.EpochNumberNil) - syncer.triggerCheckpoint() -} - func (syncer *DatabaseSyncer) nextEpochTo(maxEpochTo uint64) (uint64, uint64) { epochTo := util.MinUint64(syncer.epochFrom+syncer.maxSyncEpochs-1, maxEpochTo) @@ -411,30 +308,6 @@ func (syncer *DatabaseSyncer) nextEpochTo(maxEpochTo uint64) (uint64, uint64) { return epochTo, syncSize } -func (syncer *DatabaseSyncer) drainPivotReorgEvents() error { - for { - select { - case psevent := <-syncer.pivotSwitchEventCh: - if psevent.revertTo >= syncer.epochFrom { - break - } - - logrus.WithFields(logrus.Fields{ - "revertFromEpoch": psevent.revertFrom, - "revertToEpoch": psevent.revertTo, - "syncFromEpoch": syncer.epochFrom, - }).Warn("Db syncer detected pivot chain reorg for the latest confirmed epoch from pubsub") - - // pivot switch reorg for the latest confirmed epoch - if err := syncer.pivotSwitchRevert(psevent.revertTo); err != nil { - return errors.WithMessage(err, "failed to revert epoch(s) from pivot switch reorg channel") - } - default: - return nil - } - } -} - func (syncer *DatabaseSyncer) pivotSwitchRevert(revertTo uint64) error { if revertTo == 0 { return errors.New("genesis epoch must not be reverted") @@ -471,52 +344,6 @@ func (syncer *DatabaseSyncer) pivotSwitchRevert(revertTo uint64) error { return nil } -func (syncer *DatabaseSyncer) triggerCheckpoint() { - if len(syncer.checkPointCh) == 0 { - syncer.checkPointCh <- true - } -} - -// Detect pivot chain switch by new received epoch from pubsub. Besides, it also validates if -// the new received epoch is continuous to the last received subscription epoch number. -func (syncer *DatabaseSyncer) detectPivotSwitchFromPubsub(epoch *types.WebsocketEpochResponse) error { - newEpoch := epoch.EpochNumber.ToInt().Uint64() - - addrPtr := &(syncer.lastSubEpochNo) - lastSubEpochNo := atomic.LoadUint64(addrPtr) - - var pivotHash types.Hash - if len(epoch.EpochHashesOrdered) > 0 { - pivotHash = epoch.EpochHashesOrdered[len(epoch.EpochHashesOrdered)-1] - } - - logger := logrus.WithFields(logrus.Fields{ - "newEpoch": newEpoch, "lastSubEpochNo": lastSubEpochNo, "pivotHash": pivotHash, - }) - - switch { - case lastSubEpochNo == citypes.EpochNumberNil: // initial state - logger.Debug("Db syncer initially set last sub epoch number for pivot switch detection") - - atomic.StoreUint64(addrPtr, newEpoch) - case lastSubEpochNo >= newEpoch: // pivot switch - logger.Info("Db syncer detected pubsub new epoch pivot switched") - - atomic.StoreUint64(addrPtr, newEpoch) - syncer.pivotSwitchEventCh <- &pivotSwitchEvent{ - revertFrom: lastSubEpochNo, revertTo: newEpoch, - } - case lastSubEpochNo+1 == newEpoch: // continuous - logger.Debug("Db syncer validated continuous new epoch from pubsub") - - atomic.StoreUint64(addrPtr, newEpoch) - default: // bad incontinuous epoch - return errors.Errorf("bad incontinuous epoch, expect %v got %v", lastSubEpochNo+1, newEpoch) - } - - return nil -} - func (syncer *DatabaseSyncer) getStoreLatestPivotHash() (types.Hash, error) { if syncer.epochFrom == 0 { // no epoch synchronized yet return "", nil diff --git a/sync/sync_kv.go b/sync/sync_kv.go deleted file mode 100644 index d79c86f4..00000000 --- a/sync/sync_kv.go +++ /dev/null @@ -1,493 +0,0 @@ -package sync - -import ( - "context" - "sync" - "sync/atomic" - "time" - - "github.com/Conflux-Chain/confura/store" - citypes "github.com/Conflux-Chain/confura/types" - "github.com/Conflux-Chain/confura/util/metrics" - sdk "github.com/Conflux-Chain/go-conflux-sdk" - "github.com/Conflux-Chain/go-conflux-sdk/types" - logutil "github.com/Conflux-Chain/go-conflux-util/log" - viperutil "github.com/Conflux-Chain/go-conflux-util/viper" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" -) - -const ( - // The threshold gap between the latest epoch and some epoch before - // which the epochs are regarded as decayed. - decayedEpochGapThreshold = 20000 -) - -// KVCacheSyncer is used to sync core space blockchain data into kv cache against -// the latest state epoch. -type KVCacheSyncer struct { - conf *syncConfig - // conflux sdk client - cfx sdk.ClientOperator - // redis store - cache store.CacheStore - // interval to sync epoch data in normal status - syncIntervalNormal time.Duration - // interval to sync epoch data in catching up mode - syncIntervalCatchUp time.Duration - // maximum number of epochs to sync once - maxSyncEpochs uint64 - // epoch sync window on which the sync polling depends - syncWindow *epochWindow - // last received epoch number from subscription, which is used for - // pubsub validation - lastSubEpochNo uint64 - // receive the epoch from pub/sub to detect pivot chain switch or - // to update epoch sync window - subEpochCh chan uint64 - // checkpoint channel received to check epoch data - checkPointCh chan bool - // timer channel received to trigger sync task - syncTimerCh <-chan time.Time - // window to cache epoch pivot info - epochPivotWin *epochPivotWindow - // error tolerant logger - etLogger *logutil.ErrorTolerantLogger -} - -// MustNewKVCacheSyncer creates an instance of KVCacheSyncer to sync -// the latest state epoch data. -func MustNewKVCacheSyncer(cfx sdk.ClientOperator, cache store.CacheStore) *KVCacheSyncer { - var conf syncConfig - viperutil.MustUnmarshalKey("sync", &conf) - - syncer := &KVCacheSyncer{ - conf: &conf, - cfx: cfx, - cache: cache, - syncIntervalNormal: time.Second, - syncIntervalCatchUp: time.Millisecond, - maxSyncEpochs: conf.MaxEpochs, - syncWindow: newEpochWindow(decayedEpochGapThreshold), - lastSubEpochNo: citypes.EpochNumberNil, - subEpochCh: make(chan uint64, conf.Sub.Buffer), - checkPointCh: make(chan bool, 2), - epochPivotWin: newEpochPivotWindow(syncPivotInfoWinCapacity), - etLogger: logutil.NewErrorTolerantLogger(logutil.DefaultETConfig), - } - - // Ensure epoch data validity in cache - if err := ensureStoreEpochDataOk(cfx, cache); err != nil { - logrus.WithError(err).Fatal( - "KV syncer failed to ensure epoch data validity in redis", - ) - } - - // Load last sync epoch information - if _, err := syncer.loadLastSyncEpoch(); err != nil { - logrus.WithError(err).Fatal( - "Failed to load last sync epoch range from cache", - ) - } - - return syncer -} - -// Sync starts to sync epoch data from blockchain to cache. -func (syncer *KVCacheSyncer) Sync(ctx context.Context, wg *sync.WaitGroup) { - wg.Add(1) - defer wg.Done() - - logger := logrus.WithField("syncWindow", syncer.syncWindow) - logger.Info("Cache syncer starting to sync epoch data...") - - ticker := time.NewTicker(syncer.syncIntervalNormal) - defer ticker.Stop() - - checkpoint := func() { - if err := syncer.doCheckPoint(); err != nil { - logger.WithError(err).Error("Cache syncer failed to do checkpoint") - - syncer.triggerCheckpoint() // re-trigger checkpoint - } - } - - breakLoop := false - quit := func() { - breakLoop = true - logrus.Info("Cache syncer shutdown ok") - } - - for !breakLoop { - select { // first class priority - case <-ctx.Done(): - quit() - case <-syncer.checkPointCh: - checkpoint() - default: - select { // second class priority - case <-ctx.Done(): - quit() - case <-syncer.checkPointCh: - checkpoint() - case newEpoch := <-syncer.subEpochCh: - if err := syncer.handleNewEpoch(newEpoch, ticker); err != nil { - syncer.syncTimerCh = nil - logger.WithField("newEpoch", newEpoch).WithError(err).Error( - "Cache syncer failed to handle new received epoch", - ) - } - case <-syncer.syncTimerCh: - start := time.Now() - err := syncer.syncOnce() - metrics.Registry.Sync.SyncOnceQps("cfx", "cache", err).UpdateSince(start) - - syncer.etLogger.Log(logger, err, "Cache syncer failed to sync epoch data") - - if syncer.syncWindow.isEmpty() { - syncer.syncTimerCh = nil - } - } - } - } -} - -// doCheckPoint pubsub checkpoint to validate epoch data in cache -func (syncer *KVCacheSyncer) doCheckPoint() error { - logger := logrus.WithFields(logrus.Fields{ - "syncWindow": syncer.syncWindow, - "lastSubEpochNo": atomic.LoadUint64(&syncer.lastSubEpochNo), - }) - - logger.Info("Cache syncer ensuring epoch data validity on pubsub checkpoint") - - if err := ensureStoreEpochDataOk(syncer.cfx, syncer.cache); err != nil { - logger.WithError(err).Info( - "Cache syncer failed to ensure epoch data validity on checkpoint", - ) - - return errors.WithMessage(err, "failed to ensure data validity") - } - - if _, err := syncer.loadLastSyncEpoch(); err != nil { - logger.WithError(err).Info( - "Cache syncer failed to reload last sync point on checkpoint", - ) - - return errors.WithMessage(err, "failed to reload last sync point") - } - - syncer.epochPivotWin.popn(syncer.syncWindow.epochFrom) - - return nil -} - -// Revert the epoch data in cache store until to some epoch -func (syncer *KVCacheSyncer) pivotSwitchRevert(revertTo uint64) error { - if revertTo == 0 { - return errors.New("genesis epoch must not be reverted") - } - - logger := logrus.WithFields(logrus.Fields{ - "revertTo": revertTo, - "syncWindow": syncer.syncWindow, - }) - - logger.Info("Cache syncer reverting epoch data due to pivot chain switch") - - // remove epoch data from database due to pivot switch - if err := syncer.cache.Popn(revertTo); err != nil { - logger.WithError(err).Info( - "Cache syncer failed to pop epoch data from redis due to pivot switch", - ) - - return errors.WithMessage(err, "failed to pop epoch data from redis") - } - - // remove pivot data of reverted epoch from cache window - syncer.epochPivotWin.popn(revertTo) - // reset sync window to start from the revert point again - syncer.syncWindow.reset(revertTo, revertTo) - - return nil -} - -// Handle new epoch received to detect pivot switch or update epoch sync window -func (syncer *KVCacheSyncer) handleNewEpoch(newEpoch uint64, syncTicker *time.Ticker) error { - logger := logrus.WithFields(logrus.Fields{ - "newEpoch": newEpoch, - "beforeSyncWindow": *(syncer.syncWindow), - }) - - if syncer.syncWindow.peekWillOverflow(newEpoch) { // peek overflow - logger.Info("Cache syncer sync window overflow detected") - - if err := syncer.cache.Flush(); err != nil { - return errors.WithMessage( - err, "failed to flush decayed data in cache due to window overflow", - ) - } - - syncer.syncWindow.reset(newEpoch, newEpoch) - - } else if syncer.syncWindow.peekWillPivotSwitch(newEpoch) { // peek pivot switch - logger.Info("Cache syncer pivot switch detected") - - if err := syncer.pivotSwitchRevert(newEpoch); err != nil { - return errors.WithMessage( - err, "failed to remove epoch data in cache due to pivot switch", - ) - } - } else { // expand the sync window to the new epoch received - syncer.syncWindow.updateTo(newEpoch) - } - - // dynamically adjust the sync frequency - syncWinSize := uint64(syncer.syncWindow.size()) - switch { - case syncWinSize == 0: - syncer.syncTimerCh = nil - case syncWinSize > syncer.maxSyncEpochs: - syncTicker.Reset(syncer.syncIntervalCatchUp) - syncer.syncTimerCh = syncTicker.C - default: - syncTicker.Reset(syncer.syncIntervalNormal) - syncer.syncTimerCh = syncTicker.C - } - - return nil -} - -func (syncer *KVCacheSyncer) syncOnce() error { - logger := logrus.WithField("syncWindow", syncer.syncWindow) - - if syncer.syncWindow.isEmpty() { - logger.Debug("Cache syncer syncOnce skipped with epoch sync window empty") - return nil - } - - syncFrom, syncSize := syncer.syncWindow.peekShrinkFrom(uint32(syncer.maxSyncEpochs)) - - logger = logger.WithFields(logrus.Fields{ - "syncFrom": syncFrom, - "syncSize": syncSize, - }) - logger.Debug("Cache syncer starting to sync epoch(s)...") - - epochDataSlice := make([]*store.EpochData, 0, syncSize) - for i := uint32(0); i < syncSize; i++ { - epochNo := syncFrom + uint64(i) - eplogger := logger.WithField("epoch", epochNo) - - data, err := store.QueryEpochData(syncer.cfx, epochNo, syncer.conf.UseBatch) - - // If epoch pivot switched, stop the querying right now since it's pointless to query epoch data - // that will be reverted late. - if errors.Is(err, store.ErrEpochPivotSwitched) { - eplogger.WithError(err).Info("Cache syncer failed to query epoch data due to pivot switch") - break - } - - if err != nil { - return errors.WithMessagef(err, "failed to query epoch data for epoch %v", epochNo) - } - - if i == 0 { // the first epoch must be continuous to the latest epoch in cache store - latestPivotHash, err := syncer.getStoreLatestPivotHash() - if err != nil { - eplogger.WithError(err).Error( - "Cache syncer failed to get latest pivot hash from cache for parent hash check", - ) - return errors.WithMessage(err, "failed to get latest pivot hash") - } - - if len(latestPivotHash) > 0 && data.GetPivotBlock().ParentHash != latestPivotHash { - latestStoreEpoch := syncer.latestStoreEpoch() - - eplogger.WithFields(logrus.Fields{ - "latestStoreEpoch": latestStoreEpoch, - "latestPivotHash": latestPivotHash, - }).Info("Cache syncer popping latest epoch from cache store due to parent hash mismatched") - - if err := syncer.pivotSwitchRevert(latestStoreEpoch); err != nil { - eplogger.WithError(err).Error( - "Cache syncer failed to pop latest epoch from cache store due to parent hash mismatched", - ) - - return errors.WithMessage( - err, "failed to pop latest epoch from cache store due to parent hash mismatched", - ) - } - - return nil - } - } else { // otherwise non-first epoch must also be continuous to previous one - continuous, desc := data.IsContinuousTo(epochDataSlice[i-1]) - if !continuous { - // truncate the batch synced epoch data until the previous epoch - epochDataSlice = epochDataSlice[:i-1] - - eplogger.WithField("i", i).Infof( - "Cache syncer truncated batch synced data due to epoch not continuous for %v", desc, - ) - break - } - } - - epochDataSlice = append(epochDataSlice, &data) - - eplogger.Debug("Cache syncer succeeded to query epoch data") - } - - metrics.Registry.Sync.SyncOnceSize("cfx", "cache").Update(int64(len(epochDataSlice))) - - if len(epochDataSlice) == 0 { // empty epoch data query - logger.Debug("Cache syncer skipped due to empty sync range") - return nil - } - - if err := syncer.cache.Pushn(epochDataSlice); err != nil { - logger.WithError(err).Error("Cache syncer failed to push epoch data to cache store") - return errors.WithMessage(err, "failed to push epoch data to cache store") - } - - for _, epdata := range epochDataSlice { // cache epoch pivot info for late use - err := syncer.epochPivotWin.push(epdata.GetPivotBlock()) - if err != nil { - logger.WithField("pivotBlockEpoch", epdata.Number).WithError(err).Info( - "Cache syncer failed to push pivot block into epoch cache window", - ) - - syncer.epochPivotWin.reset() - break - } - } - - syncFrom, syncSize = syncer.syncWindow.shrinkFrom(uint32(len(epochDataSlice))) - - logger.WithFields(logrus.Fields{ - "newSyncFrom": syncFrom, "finalSyncSize": syncSize, - }).Debug("Cache syncer succeeded to sync epoch data range") - - return nil -} - -// Validate new received epoch from pubsub to check if it's continous to the last one or pivot chain switch. -func (syncer *KVCacheSyncer) validateNewReceivedEpoch(epoch *types.WebsocketEpochResponse) error { - newEpoch := epoch.EpochNumber.ToInt().Uint64() - - addrPtr := &(syncer.lastSubEpochNo) - lastSubEpochNo := atomic.LoadUint64(addrPtr) - - logger := logrus.WithFields(logrus.Fields{ - "newEpoch": newEpoch, - "lastSubEpochNo": lastSubEpochNo, - }) - - switch { - case lastSubEpochNo == citypes.EpochNumberNil: // initial state - logger.Debug("Cache syncer initially set last sub epoch number for validation") - - atomic.StoreUint64(addrPtr, newEpoch) - return nil - case lastSubEpochNo >= newEpoch: // pivot switch - logger.Info("Cache syncer validated pubsub new epoch pivot switched") - - atomic.StoreUint64(addrPtr, newEpoch) - return nil - case lastSubEpochNo+1 == newEpoch: // continuous - logger.Debug("Cache syncer validated pubsub new epoch continuous") - - atomic.StoreUint64(addrPtr, newEpoch) - return nil - default: // bad incontinuous epoch - return errors.Errorf("bad incontinuous epoch, expect %v got %v", lastSubEpochNo+1, newEpoch) - } -} - -func (syncer *KVCacheSyncer) getStoreLatestPivotHash() (types.Hash, error) { - if !syncer.syncWindow.isSet() { - return types.Hash(""), nil - } - - latestEpochNo := syncer.latestStoreEpoch() - - // load from in-memory cache first - if pivotHash, ok := syncer.epochPivotWin.getPivotHash(latestEpochNo); ok { - return pivotHash, nil - } - - // load from cache store - pivotBlock, err := syncer.cache.GetBlockSummaryByEpoch(context.Background(), latestEpochNo) - if err == nil { - return pivotBlock.CfxBlockSummary.Hash, nil - } - - if syncer.cache.IsRecordNotFound(err) { - return types.Hash(""), nil - } - - return types.Hash(""), errors.WithMessagef( - err, "failed to get block by epoch %v", latestEpochNo, - ) -} - -func (syncer *KVCacheSyncer) triggerCheckpoint() { - if len(syncer.checkPointCh) == 0 { - syncer.checkPointCh <- true - } -} - -// Load last sync epoch from cache store to continue synchronization. -func (syncer *KVCacheSyncer) loadLastSyncEpoch() (loaded bool, err error) { - _, maxEpoch, err := syncer.cache.GetGlobalEpochRange() - if err == nil { - syncer.syncWindow.reset(maxEpoch+1, maxEpoch) - return true, nil - } - - if !syncer.cache.IsRecordNotFound(err) { - return false, errors.WithMessage( - err, "failed to get global epoch range from cache", - ) - } - - return false, nil -} - -// implement the EpochSubscriber interface. - -func (syncer *KVCacheSyncer) onEpochReceived(epoch types.WebsocketEpochResponse) { - epochNo := epoch.EpochNumber.ToInt().Uint64() - - logger := logrus.WithField("epoch", epochNo) - logger.Debug("Cache syncer onEpochReceived new epoch received") - - if err := syncer.validateNewReceivedEpoch(&epoch); err != nil { - logger.WithError(err).Error( - "Cache syncer failed to validate new received epoch from pubsub", - ) - - // reset lastSubEpochNo - atomic.StoreUint64(&(syncer.lastSubEpochNo), citypes.EpochNumberNil) - return - } - - syncer.subEpochCh <- epochNo -} - -func (syncer *KVCacheSyncer) onEpochSubStart() { - logrus.Debug("Cache syncer onEpochSubStart event received") - - // reset lastSubEpochNo - atomic.StoreUint64(&(syncer.lastSubEpochNo), citypes.EpochNumberNil) - syncer.triggerCheckpoint() -} - -func (syncer *KVCacheSyncer) latestStoreEpoch() uint64 { - if syncer.syncWindow.epochFrom > 0 { - return syncer.syncWindow.epochFrom - 1 - } - - return 0 -} From bb3494a22809177af39497ca4e9f503facec5fac Mon Sep 17 00:00:00 2001 From: Conflux Date: Fri, 22 Mar 2024 16:20:31 +0800 Subject: [PATCH 2/2] minor mod --- cmd/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/sync.go b/cmd/sync.go index 2d448f36..81493fb6 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -102,7 +102,7 @@ func startSyncService(*cobra.Command, []string) { // startSyncServiceAdaptively adaptively starts kinds of sync server per to store instances. func startSyncServiceAdaptively(ctx context.Context, wg *sync.WaitGroup, syncCtx util.SyncContext) { - if syncCtx.CfxDB == nil && syncCtx.CfxCache == nil && syncCtx.EthDB == nil { + if syncCtx.CfxDB == nil && syncCtx.EthDB == nil { logrus.Fatal("No data sync configured") }