Improve P2P Stream Handling & Stream Sync Enhancements (#4762)
* remove error logs if stream already exists
* improve pickAvailableStream to continue looking for other available streams if any stream fails
* add thread-safe map to request manager in p2p stream layer (a minimal sketch follows the changed-file summary below)
* improve stream sync switching from long range to short range
* improve context for doSync loop in stream sync
* add error handling for context deadline exceeded issue
* fix request manager test
* improve computeLongestHashChain and its whitelist
* improve countHashMaxVote
* introduce new configs to StagedStreamSync struct
* initialize new configs in stream sync and add them to logs
* pass node config to stream sync downloader, increase stream sync ticker time, init sync if not epoch chain
* add two new metrics to stream sync
* pass node configs to service and downloaders to have access to node configurations in stream sync
* use BeaconValidator and BeaconShard instead of BeaconNode, use EpochChain config to detect epoch chain in stages, add acceptPartially to GetBlockHash to be able to get a part of hashes
* pass nodeconfig to downloaders
* remove BeaconNode from protocol, fix shortrange tests
* improve lock in request manager test
* update Failures option names, remove beaconNode from stream protocol
* fix shard id detection
* improve getBlockByMaxVote
* add one more check for prestaking
* improve epoch and beacon node detection for stream sync
* fix passing roles to sync protocol
* remove nil error from logs
* fix switch between long range and short range in stream sync
* fix stream sync slowness
* improve short range helpers concurrency
* stop readMsgLoop once stream is closed
* stop discovery loop once stream manager is closing
* fix peer id in stream manager logs
* separate stream set and stream manager
GheisMohammadi authored Oct 24, 2024
1 parent 8561e05 commit 2274c29
Showing 42 changed files with 716 additions and 476 deletions.
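The "thread safe map" bullet above is the standard mutex-guarded-map pattern. A minimal, self-contained sketch of that shape; the type name, method set, and key/value types are illustrative stand-ins, not the actual requestmanager code:

```go
package main

import (
	"fmt"
	"sync"
)

// requestMap is a mutex-guarded map, the general shape of the "thread safe
// map" the commit message refers to. Key and value types are placeholders.
type requestMap struct {
	mu   sync.RWMutex
	reqs map[uint64]string // request ID -> pending request (placeholder value type)
}

func newRequestMap() *requestMap {
	return &requestMap{reqs: make(map[uint64]string)}
}

func (m *requestMap) Set(id uint64, req string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.reqs[id] = req
}

func (m *requestMap) Get(id uint64) (string, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	req, ok := m.reqs[id]
	return req, ok
}

func (m *requestMap) Delete(id uint64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.reqs, id)
}

func main() {
	m := newRequestMap()
	m.Set(1, "getBlocksByNumber")
	if req, ok := m.Get(1); ok {
		fmt.Println("pending:", req)
	}
	m.Delete(1)
}
```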
20 changes: 13 additions & 7 deletions api/service/stagedstreamsync/block_hash_result.go
@@ -17,9 +17,9 @@ type (
)

func newBlockHashResults(bns []uint64) *blockHashResults {
-	results := make([]map[sttypes.StreamID]common.Hash, 0, len(bns))
-	for range bns {
-		results = append(results, make(map[sttypes.StreamID]common.Hash))
+	results := make([]map[sttypes.StreamID]common.Hash, len(bns))
+	for i := range bns {
+		results[i] = make(map[sttypes.StreamID]common.Hash)
}
return &blockHashResults{
bns: bns,
@@ -37,24 +37,30 @@ func (res *blockHashResults) addResult(hashes []common.Hash, stid sttypes.Stream
}
res.results[i][stid] = h
}
-	return
}

func (res *blockHashResults) computeLongestHashChain() ([]common.Hash, []sttypes.StreamID) {
var (
whitelist map[sttypes.StreamID]struct{}
hashChain []common.Hash
)

for _, result := range res.results {
hash, nextWl := countHashMaxVote(result, whitelist)
if hash == emptyHash {
-			break
+			break // Exit if emptyHash is encountered
}
hashChain = append(hashChain, hash)
-		whitelist = nextWl
+		// add nextWl stream IDs to whitelist
+		if len(nextWl) > 0 {
+			whitelist = make(map[sttypes.StreamID]struct{}, len(nextWl))
+			for st := range nextWl {
+				whitelist[st] = struct{}{}
+			}
+		}
}

-	sts := make([]sttypes.StreamID, 0, len(whitelist))
+	sts := make([]sttypes.StreamID, 0)
for st := range whitelist {
sts = append(sts, st)
}
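The longest-chain computation above is easier to see on toy data. The sketch below re-implements the voting loop under stand-in types (StreamID and Hash replace sttypes.StreamID and common.Hash, and countVote is a minimal illustrative substitute for countHashMaxVote, not the repo's function):

```go
package main

import "fmt"

type StreamID string
type Hash string // stand-in for common.Hash; "" plays the role of emptyHash

// countVote returns the majority hash among whitelisted streams and the
// set of streams that voted for it (illustrative stand-in, see lead-in).
func countVote(m map[StreamID]Hash, wl map[StreamID]struct{}) (Hash, map[StreamID]struct{}) {
	votes := make(map[Hash]int)
	var res Hash
	maxCnt := 0
	for st, h := range m {
		if h == "" {
			continue
		}
		if len(wl) != 0 {
			if _, ok := wl[st]; !ok {
				continue
			}
		}
		votes[h]++
		if votes[h] > maxCnt {
			maxCnt, res = votes[h], h
		}
	}
	next := make(map[StreamID]struct{})
	for st, h := range m {
		if h == res && res != "" {
			if _, ok := wl[st]; ok || len(wl) == 0 {
				next[st] = struct{}{}
			}
		}
	}
	return res, next
}

func main() {
	// Per-height votes from three streams. Stream "c" disagrees at height 2,
	// so the whitelist narrows to {a, b} and only their votes extend the chain.
	results := []map[StreamID]Hash{
		{"a": "h1", "b": "h1", "c": "h1"},
		{"a": "h2", "b": "h2", "c": "x2"},
		{"a": "h3", "b": "h3"},
	}
	var wl map[StreamID]struct{}
	var chain []Hash
	for _, r := range results {
		h, next := countVote(r, wl)
		if h == "" {
			break
		}
		chain = append(chain, h)
		wl = next
	}
	fmt.Println(chain) // [h1 h2 h3]
}
```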
12 changes: 12 additions & 0 deletions api/service/stagedstreamsync/const.go
@@ -54,6 +54,18 @@ const (
SnapSync // Download the chain and the state via compact snapshots
)

+func (sm SyncMode) String() string {
+	switch sm {
+	case FullSync:
+		return "Full Sync"
+	case FastSync:
+		return "Fast Sync"
+	case SnapSync:
+		return "Snap Sync"
+	}
+	return "unknown"
+}

type (
// Config is the downloader config
Config struct {
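With the Stringer in place, a SyncMode value prints readably wherever it is logged. A standalone copy of the pattern (the const block is paraphrased, not the package's exact declaration):

```go
package main

import "fmt"

type SyncMode uint32

const (
	FullSync SyncMode = iota // download and execute the full chain
	FastSync                 // download headers first, then bodies
	SnapSync                 // download the chain and the state via compact snapshots
)

func (sm SyncMode) String() string {
	switch sm {
	case FullSync:
		return "Full Sync"
	case FastSync:
		return "Fast Sync"
	case SnapSync:
		return "Snap Sync"
	}
	return "unknown"
}

func main() {
	fmt.Printf("starting downloader in %s mode\n", FastSync) // "... in Fast Sync mode"
}
```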
42 changes: 23 additions & 19 deletions api/service/stagedstreamsync/downloader.go
@@ -22,10 +22,10 @@ type (
// Downloader is responsible for sync task of one shard
Downloader struct {
bc blockChain
+		nodeConfig *nodeconfig.ConfigType
syncProtocol syncProtocol
bh *beaconHelper
stagedSyncInstance *StagedStreamSync
-		isBeaconNode bool

downloadC chan struct{}
closeC chan struct{}
@@ -38,7 +38,7 @@
)

// NewDownloader creates a new downloader
-func NewDownloader(host p2p.Host, bc core.BlockChain, consensus *consensus.Consensus, dbDir string, isBeaconNode bool, config Config) *Downloader {
+func NewDownloader(host p2p.Host, bc core.BlockChain, nodeConfig *nodeconfig.ConfigType, consensus *consensus.Consensus, dbDir string, isBeaconNode bool, config Config) *Downloader {
config.fixValues()

sp := sync.NewProtocol(sync.Config{
@@ -48,6 +48,8 @@ func NewDownloader(host p2p.Host, bc core.BlockChain, consensus *consensus.Conse
ShardID: nodeconfig.ShardID(bc.ShardID()),
Network: config.Network,
BeaconNode: isBeaconNode,
+		Validator: nodeConfig.Role() == nodeconfig.Validator,
+		Explorer: nodeConfig.Role() == nodeconfig.ExplorerNode,
MaxAdvertiseWaitTime: config.MaxAdvertiseWaitTime,
SmSoftLowCap: config.SmSoftLowCap,
SmHardLowCap: config.SmHardLowCap,
@@ -69,18 +71,18 @@ func NewDownloader(host p2p.Host, bc core.BlockChain, consensus *consensus.Conse
ctx, cancel := context.WithCancel(context.Background())

// create an instance of staged sync for the downloader
-	stagedSyncInstance, err := CreateStagedSync(ctx, bc, consensus, dbDir, isBeaconNode, sp, config, logger)
+	stagedSyncInstance, err := CreateStagedSync(ctx, bc, nodeConfig, consensus, dbDir, sp, config, isBeaconNode, logger)
if err != nil {
cancel()
return nil
}

return &Downloader{
bc: bc,
+		nodeConfig: nodeConfig,
syncProtocol: sp,
bh: bh,
stagedSyncInstance: stagedSyncInstance,
-		isBeaconNode: isBeaconNode,

downloadC: make(chan struct{}),
closeC: make(chan struct{}),
@@ -198,13 +200,13 @@ func (d *Downloader) waitForEnoughStreams(requiredStreams int) (bool, int) {
}

func (d *Downloader) loop() {
-	ticker := time.NewTicker(10 * time.Second)
+	ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()

// for shard chain and beacon chain node, first we start with initSync=true to
// make sure it goes through the long range sync first.
	// for epoch chain, we only need to go through the epoch sync process
-	initSync := d.isBeaconNode || d.bc.ShardID() != shard.BeaconChainShardID
+	initSync := !d.stagedSyncInstance.isEpochChain

trigger := func() {
select {
@@ -267,20 +269,22 @@ func (d *Downloader) loop() {
d.bh.insertSync()
}
}
-			// if last doSync needed only to add a few blocks less than LastMileBlocksThreshold and
-			// the node is fully synced now, then switch to short range
-			// the reason why we need to check distanceBeforeSync is because, if it was long distance,
-			// very likely, there are a couple of new blocks have been added to the other nodes which
-			// we should still stay in long range and check them.
-			bnAfterSync := d.bc.CurrentBlock().NumberU64()
-			distanceBeforeSync := estimatedHeight - bnBeforeSync
-			distanceAfterSync := estimatedHeight - bnAfterSync
-			if estimatedHeight > 0 && addedBN > 0 &&
-				distanceBeforeSync <= uint64(LastMileBlocksThreshold) &&
-				distanceAfterSync <= uint64(LastMileBlocksThreshold) {
-				initSync = false
+			// If the last sync operation added only a few blocks (less than LastMileBlocksThreshold)
+			// and the node is now fully synced, switch to short-range syncing.
+			// We check distanceBeforeSync to handle cases where the previous sync covered a long
+			// distance; in such cases, it's likely that new blocks were added to other nodes during
+			// the sync process, so the node should remain in long-range mode to catch up.
+			if initSync && addedBN > 0 {
+				bnAfterSync := d.bc.CurrentBlock().NumberU64()
+				distanceBeforeSync := estimatedHeight - bnBeforeSync
+				distanceAfterSync := estimatedHeight - bnAfterSync
+				// If, after completing a full sync cycle, the node is still within the
+				// last mile block range, switch to short-range sync.
+				if distanceBeforeSync <= uint64(LastMileBlocksThreshold) &&
+					distanceAfterSync <= uint64(LastMileBlocksThreshold) {
+					initSync = false
+				}
}

case <-d.closeC:
return
}
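The new branch reduces to a small predicate on the two distances. A sketch with worked values; the function name and the threshold value 50 are illustrative only (the real LastMileBlocksThreshold is a package constant), and it assumes estimatedHeight is at least the given block numbers:

```go
package main

import "fmt"

const lastMileBlocksThreshold = 50 // illustrative value, not the package constant

// switchToShortRange mirrors the shape of the decision in Downloader.loop:
// leave long-range mode only if the node was already near the tip before the
// sync cycle and is still near the tip after it.
func switchToShortRange(estimatedHeight, bnBeforeSync, bnAfterSync, addedBN uint64) bool {
	if addedBN == 0 {
		return false // nothing was added; keep the current mode
	}
	distanceBeforeSync := estimatedHeight - bnBeforeSync
	distanceAfterSync := estimatedHeight - bnAfterSync
	return distanceBeforeSync <= lastMileBlocksThreshold &&
		distanceAfterSync <= lastMileBlocksThreshold
}

func main() {
	// 10 blocks behind before, 2 behind after: safe to switch to short range.
	fmt.Println(switchToShortRange(1000, 990, 998, 8)) // true
	// 500 blocks behind before: peers likely advanced meanwhile, stay long range.
	fmt.Println(switchToShortRange(1000, 500, 998, 498)) // false
}
```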
5 changes: 3 additions & 2 deletions api/service/stagedstreamsync/downloaders.go
@@ -4,6 +4,7 @@ import (
"github.com/harmony-one/abool"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/p2p"
)

@@ -16,7 +17,7 @@ type Downloaders struct {
}

// NewDownloaders creates Downloaders for sync of multiple blockchains
-func NewDownloaders(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, dbDir string, config Config) *Downloaders {
+func NewDownloaders(host p2p.Host, bcs []core.BlockChain, nodeConfig *nodeconfig.ConfigType, consensus *consensus.Consensus, dbDir string, config Config) *Downloaders {
ds := make(map[uint32]*Downloader)
isBeaconNode := len(bcs) == 1
for _, bc := range bcs {
@@ -26,7 +27,7 @@ func NewDownloaders(host p2p.Host, bcs []core.BlockChain, consensus *consensus.C
if _, ok := ds[bc.ShardID()]; ok {
continue
}
-		ds[bc.ShardID()] = NewDownloader(host, bc, consensus, dbDir, isBeaconNode, config)
+		ds[bc.ShardID()] = NewDownloader(host, bc, nodeConfig, consensus, dbDir, isBeaconNode, config)
}
return &Downloaders{
ds: ds,
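NewDownloaders builds one Downloader per unique shard and treats a node serving a single chain as a beacon node. The shape of that loop, with a stand-in chain type instead of core.BlockChain:

```go
package main

import "fmt"

// chain is a stand-in for core.BlockChain, exposing only what the loop needs.
type chain struct{ shardID uint32 }

func (c chain) ShardID() uint32 { return c.shardID }

func main() {
	bcs := []chain{{0}, {1}, {1}} // the duplicate shard 1 entry is skipped
	isBeaconNode := len(bcs) == 1 // only a single-chain node counts as beacon
	ds := make(map[uint32]bool)   // stands in for map[uint32]*Downloader
	for _, bc := range bcs {
		if _, ok := ds[bc.ShardID()]; ok {
			continue
		}
		ds[bc.ShardID()] = true
	}
	fmt.Println(len(ds), isBeaconNode) // 2 false
}
```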
29 changes: 17 additions & 12 deletions api/service/stagedstreamsync/helpers.go
@@ -82,6 +82,9 @@ func getBlockByMaxVote(blocks []*types.Block) (*types.Block, error) {
if block == nil {
continue
}
+		if _, exist := hashesVote[block.Header().Hash()]; !exist {
+			hashesVote[block.Header().Hash()] = 0
+		}
hashesVote[block.Header().Hash()]++
if hashesVote[block.Header().Hash()] > maxVote {
maxVote = hashesVote[block.Header().Hash()]
@@ -94,6 +97,8 @@
return blocks[maxVotedBlockIndex], nil
}

+// countHashMaxVote counts the votes for each hash in the map, respecting the whitelist.
+// It returns the hash with the most votes and the next whitelist of StreamIDs.
func countHashMaxVote(m map[sttypes.StreamID]common.Hash, whitelist map[sttypes.StreamID]struct{}) (common.Hash, map[sttypes.StreamID]struct{}) {
var (
voteM = make(map[common.Hash]int)
@@ -102,34 +107,34 @@ func countHashMaxVote(m map[sttypes.StreamID]common.Hash, whitelist map[sttypes.
)

for st, h := range m {
+		if h == emptyHash {
+			continue
+		}
+		// If a whitelist is provided, skip StreamIDs not in the whitelist
if len(whitelist) != 0 {
if _, ok := whitelist[st]; !ok {
continue
}
}
-		if _, ok := voteM[h]; !ok {
-			voteM[h] = 0
-		}
voteM[h]++
if voteM[h] > maxCnt {
maxCnt = voteM[h]
res = h
}
}

+	// Build the next whitelist based on the winning hash
nextWl := make(map[sttypes.StreamID]struct{})
-	for st, h := range m {
-		if h != res {
-			continue
-		}
-		if len(whitelist) != 0 {
-			if _, ok := whitelist[st]; ok {
-				nextWl[st] = struct{}{}
-			}
-		} else {
-			nextWl[st] = struct{}{}
-		}
-	}
+	if res != emptyHash {
+		for st, h := range m {
+			if h == res {
+				if _, ok := whitelist[st]; ok || len(whitelist) == 0 {
+					nextWl[st] = struct{}{}
+				}
+			}
+		}
+	}

return res, nextWl
}

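getBlockByMaxVote (first hunk above) follows the same majority-vote shape as countHashMaxVote. A toy, self-contained version; the block type, function name, and error are stand-ins, not harmony's types.Block or its actual error value:

```go
package main

import (
	"errors"
	"fmt"
)

// block stands in for *types.Block with just a number and a header hash.
type block struct {
	number uint64
	hash   string
}

var errNotEnoughVote = errors.New("not enough vote") // illustrative sentinel

// byMaxVote mirrors the shape of getBlockByMaxVote: pick the block whose
// header hash received the most votes, skipping nil entries.
func byMaxVote(blocks []*block) (*block, error) {
	votes := make(map[string]int)
	maxVote := -1
	idx := -1
	for i, b := range blocks {
		if b == nil {
			continue
		}
		votes[b.hash]++
		if votes[b.hash] > maxVote {
			maxVote = votes[b.hash]
			idx = i
		}
	}
	if idx == -1 {
		return nil, errNotEnoughVote
	}
	return blocks[idx], nil
}

func main() {
	got, err := byMaxVote([]*block{
		{10, "aa"}, nil, {10, "bb"}, {10, "aa"},
	})
	if err == nil {
		fmt.Println(got.hash) // "aa": two of the three non-nil blocks agree
	}
}
```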
24 changes: 24 additions & 0 deletions api/service/stagedstreamsync/metric.go
@@ -13,8 +13,10 @@ func init() {
longRangeSyncedBlockCounterVec,
longRangeFailInsertedBlockCounterVec,
numShortRangeCounterVec,
+		numEpochSyncCounterVec,
numFailedDownloadCounterVec,
numBlocksInsertedShortRangeHistogramVec,
+		numBlocksInsertedEpochSyncHistogramVec,
numBlocksInsertedBeaconHelperCounter,
)
}
@@ -60,6 +62,16 @@ var (
[]string{"ShardID"},
)

+	numEpochSyncCounterVec = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "hmy",
+			Subsystem: "staged_stream_sync",
+			Name:      "num_epoch_sync",
+			Help:      "number of epoch blocks sync is triggered",
+		},
+		[]string{"ShardID"},
+	)

numFailedDownloadCounterVec = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "hmy",
@@ -82,6 +94,18 @@
[]string{"ShardID"},
)

+	numBlocksInsertedEpochSyncHistogramVec = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: "hmy",
+			Subsystem: "staged_stream_sync",
+			Name:      "num_blocks_inserted_epoch_sync",
+			Help:      "number of blocks inserted for each epoch sync",
+			// Buckets: 0.5, 1, 2, 4, 8 (ExponentialBuckets(0.5, 2, 5)); +Inf is implicit
+			Buckets: prometheus.ExponentialBuckets(0.5, 2, 5),
+		},
+		[]string{"ShardID"},
+	)

numBlocksInsertedBeaconHelperCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "hmy",
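For reference, prometheus.ExponentialBuckets(start, factor, count) returns count upper bounds start, start*factor, start*factor^2, and so on; Prometheus adds the +Inf bucket implicitly. A quick check of the buckets used by the histograms above:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Prints [0.5 1 2 4 8]; observations above 8 land in the implicit +Inf bucket.
	fmt.Println(prometheus.ExponentialBuckets(0.5, 2, 5))
}
```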
6 changes: 4 additions & 2 deletions api/service/stagedstreamsync/service.go
@@ -3,6 +3,7 @@ package stagedstreamsync
import (
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/p2p"
)

@@ -12,9 +13,10 @@ type StagedStreamSyncService struct {
}

// NewService creates a new downloader service
-func NewService(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, config Config, dbDir string) *StagedStreamSyncService {
+func NewService(host p2p.Host, bcs []core.BlockChain, nodeConfig *nodeconfig.ConfigType,
+	consensus *consensus.Consensus, config Config, dbDir string) *StagedStreamSyncService {
return &StagedStreamSyncService{
-		Downloaders: NewDownloaders(host, bcs, consensus, dbDir, config),
+		Downloaders: NewDownloaders(host, bcs, nodeConfig, consensus, dbDir, config),
}
}

[Diff truncated: the remaining 35 changed files are not shown.]