Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abandon pubsub usage for chain sync #178

Merged
merged 2 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ Confura is comprised of serveral components as below:

### Blockchain Sync

You can use the `sync` subcommand to start sync service, including DB/KV/ETH sync as well as fast catchup.
You can use the `sync` subcommand to start sync service, including DB/ETH sync as well as fast catchup.

> Usage:
> confura sync [flags]
Expand All @@ -114,7 +114,6 @@ You can use the `sync` subcommand to start sync service, including DB/KV/ETH syn
>
> --db start core space DB sync server
> --eth start ETH sync server
> --kv start core space KV sync server
> --catchup start core space fast catchup server
> --adaptive automatically adjust target epoch number to the latest stable epoch
> --benchmark benchmarking the performance during fast catch-up sync (default true)
Expand Down Expand Up @@ -246,7 +245,7 @@ confura-ethsync ./confura sync --eth Up
confura-node-management ./confura nm --cfx Up 0.0.0.0:22530->22530/tcp,:::22530->22530/tcp
confura-redis docker-entrypoint.sh redis ... Up 0.0.0.0:53779->6379/tcp
confura-rpc ./confura rpc --cfx Up 0.0.0.0:22537->22537/tcp,:::22537->22537/tcp
confura-sync ./confura sync --db --kv Up
confura-sync ./confura sync --db Up
confura-virtual-filter ./confura vf Up 0.0.0.0:48545->48545/tcp,:::48545->48545/tcp
```

Expand Down
66 changes: 5 additions & 61 deletions cmd/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"sync"

"github.com/Conflux-Chain/confura/cmd/util"
"github.com/Conflux-Chain/confura/store"
cisync "github.com/Conflux-Chain/confura/sync"
"github.com/Conflux-Chain/confura/sync/catchup"
"github.com/sirupsen/logrus"
Expand All @@ -16,7 +15,6 @@ var (
// sync boot options
syncOpt struct {
dbSyncEnabled bool
kvSyncEnabled bool
ethSyncEnabled bool
catchupEnabled bool
}
Expand All @@ -30,7 +28,7 @@ var (

syncCmd = &cobra.Command{
Use: "sync",
Short: "Start sync service, including DB/KV/ETH sync, as well as fast catchup",
Short: "Start sync service, including DB/ETH sync, as well as fast catchup",
Run: startSyncService,
}
)
Expand All @@ -41,11 +39,6 @@ func init() {
&syncOpt.dbSyncEnabled, "db", false, "start core space DB sync server",
)

// boot flag for core space DB sync
syncCmd.Flags().BoolVar(
&syncOpt.kvSyncEnabled, "kv", false, "start core space KV sync server",
)

// boot flag for evm space sync
syncCmd.Flags().BoolVar(
&syncOpt.ethSyncEnabled, "eth", false, "start ETH sync server",
Expand Down Expand Up @@ -78,7 +71,7 @@ func init() {
}

func startSyncService(*cobra.Command, []string) {
if !syncOpt.dbSyncEnabled && !syncOpt.kvSyncEnabled &&
if !syncOpt.dbSyncEnabled &&
!syncOpt.ethSyncEnabled && !syncOpt.catchupEnabled {
logrus.Fatal("No Sync server specified")
}
Expand All @@ -92,22 +85,8 @@ func startSyncService(*cobra.Command, []string) {
syncCtx := util.MustInitSyncContext(storeCtx)
defer syncCtx.Close()

var subs []cisync.EpochSubscriber

if syncOpt.dbSyncEnabled { // start DB sync
syncer := startSyncCfxDatabase(ctx, &wg, syncCtx)
subs = append(subs, syncer)
}

if syncOpt.kvSyncEnabled { // start KV sync
if syncer := startSyncCfxCache(ctx, &wg, syncCtx); syncer != nil {
subs = append(subs, syncer)
}
}

if len(subs) > 0 { // monitor pivot chain switch via pub/sub
logrus.Info("Start to pub/sub epoch to monitor pivot chain switch")
go cisync.MustSubEpoch(ctx, &wg, syncCtx.SubCfx, subs...)
startSyncCfxDatabase(ctx, &wg, syncCtx)
}

if syncOpt.catchupEnabled { // start fast catchup
Expand All @@ -123,26 +102,12 @@ func startSyncService(*cobra.Command, []string) {

// startSyncServiceAdaptively adaptively starts kinds of sync server per to store instances.
func startSyncServiceAdaptively(ctx context.Context, wg *sync.WaitGroup, syncCtx util.SyncContext) {
if syncCtx.CfxDB == nil && syncCtx.CfxCache == nil && syncCtx.EthDB == nil {
if syncCtx.CfxDB == nil && syncCtx.EthDB == nil {
logrus.Fatal("No data sync configured")
}

var subs []cisync.EpochSubscriber

if syncCtx.CfxDB != nil { // start DB sync
syncer := startSyncCfxDatabase(ctx, wg, syncCtx)
subs = append(subs, syncer)
}

if syncCtx.CfxCache != nil { // start KV sync
if syncer := startSyncCfxCache(ctx, wg, syncCtx); syncer != nil {
subs = append(subs, syncer)
}
}

if len(subs) > 0 { // monitor pivot chain switch via pub/sub
logrus.Info("Start to pub/sub epoch to monitor pivot chain switch")
go cisync.MustSubEpoch(ctx, wg, syncCtx.SubCfx, subs...)
startSyncCfxDatabase(ctx, wg, syncCtx)
}

if syncCtx.EthDB != nil { // start ETH sync
Expand All @@ -162,27 +127,6 @@ func startSyncCfxDatabase(ctx context.Context, wg *sync.WaitGroup, syncCtx util.
return syncer
}

func startSyncCfxCache(ctx context.Context, wg *sync.WaitGroup, syncCtx util.SyncContext) *cisync.KVCacheSyncer {
if store.StoreConfig().IsChainBlockDisabled() &&
store.StoreConfig().IsChainTxnDisabled() &&
store.StoreConfig().IsChainReceiptDisabled() {
// KV sync only syncs block, transaction and receipt data. If all of them are disabled,
// nothing needs to sync, just stop right here.
return nil
}

logrus.Info("Start to sync core space blockchain data into cache")

csyncer := cisync.MustNewKVCacheSyncer(syncCtx.SyncCfx, syncCtx.CfxCache)
go csyncer.Sync(ctx, wg)

// start core space cache prune
cpruner := cisync.MustNewKVCachePruner(syncCtx.CfxCache)
go cpruner.Prune(ctx, wg)

return csyncer
}

func startSyncEthDatabase(ctx context.Context, wg *sync.WaitGroup, syncCtx util.SyncContext) {
logrus.Info("Start to sync evm space blockchain data into database")

Expand Down
6 changes: 4 additions & 2 deletions cmd/util/data_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func MustInitSyncContext(storeCtx StoreContext) SyncContext {
func (ctx *SyncContext) Close() {
// Usually, storeContext will be defer closed by itself
// ctx.storeContext.Close()

if ctx.SyncCfx != nil {
ctx.SyncCfx.Close()
}
Expand All @@ -105,6 +106,7 @@ func (ctx *SyncContext) Close() {
ctx.SubCfx.Close()
}

// not provided yet!
// ctx.syncEth.Close()
if ctx.SyncEth != nil {
ctx.SyncEth.Close()
}
}
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ services:
# blockchain sync
chain-sync:
image: conflux/confura:latest
command: sync --db --kv
command: sync --db
restart: unless-stopped
depends_on:
- db
Expand Down
210 changes: 0 additions & 210 deletions sync/epoch_sub.go

This file was deleted.

Loading
Loading