Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve P2P Stream Handling & Stream Sync Enhancements #4762

Merged
merged 32 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
e775f52
remove error logs if stream already exist
GheisMohammadi Sep 24, 2024
b4a5245
improve pickAvailableStream to continue looking for other available s…
GheisMohammadi Sep 24, 2024
e95caf8
add thread safe map to request manager in p2p stream layer
GheisMohammadi Sep 26, 2024
fd47604
improve stream sync switching from long range to short range
GheisMohammadi Sep 26, 2024
9fba37c
improve context for doSync loop in stream sync
GheisMohammadi Sep 26, 2024
57b5f2a
add error handling for context deadline exceed issue
GheisMohammadi Sep 26, 2024
ba70765
fix request manager test
GheisMohammadi Sep 27, 2024
42afaa4
improve computeLongestHashChain and its whitelist
GheisMohammadi Oct 1, 2024
0e1af64
improve countHashMaxVote
GheisMohammadi Oct 1, 2024
69eb0e7
introduce new configs to StagedStreamSync struct
GheisMohammadi Oct 1, 2024
e11a4c3
initialize new configs in stream sync and add them to logs
GheisMohammadi Oct 1, 2024
bf6db5f
pass node config to stream sync downloader, increase stream sync tick…
GheisMohammadi Oct 1, 2024
73a68cd
add two new metrics to stream sync
GheisMohammadi Oct 1, 2024
7cb2cd0
pass node configs to service and downloaders to have access to node c…
GheisMohammadi Oct 1, 2024
30d0450
using BeaconValidator and BecaonShard instead of BeaconNode, using Ep…
GheisMohammadi Oct 1, 2024
21916f6
pass nodeconfig to downloaders
GheisMohammadi Oct 1, 2024
0745b12
remove BeaconNode from protocol, fix shortrange tests
GheisMohammadi Oct 1, 2024
d1cabbb
improve lock in request manager test
GheisMohammadi Oct 1, 2024
64cc717
update Failures option names, remove beaconNode from stream protocol
GheisMohammadi Oct 1, 2024
1fdeacd
fix shard id detection
GheisMohammadi Oct 3, 2024
f22f4ce
improve getBlockByMaxVote
GheisMohammadi Oct 4, 2024
f6ca98c
add one more checking for prestaking
GheisMohammadi Oct 9, 2024
773376a
improve epoch and beacon node detection for stream sync
GheisMohammadi Oct 9, 2024
8b20262
fix passing roles to sync protocol
GheisMohammadi Oct 10, 2024
e84dc98
remove nil error from logs
GheisMohammadi Oct 10, 2024
a897b18
fix switch between long range and short range in stream sync
GheisMohammadi Oct 10, 2024
8ca6eff
fix stream sync slowness
GheisMohammadi Oct 10, 2024
1dea695
improve short range helpers concurrency
GheisMohammadi Oct 11, 2024
244480b
stop readMsgLoop once stream is closed
GheisMohammadi Oct 11, 2024
ddd2f87
stop discovery loop once stream manager is closing
GheisMohammadi Oct 11, 2024
60f41dc
fix peer id in stream manager logs
GheisMohammadi Oct 11, 2024
d646c3a
separate stream set and stream manager
GheisMohammadi Oct 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions api/service/stagedstreamsync/block_hash_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ type (
)

func newBlockHashResults(bns []uint64) *blockHashResults {
results := make([]map[sttypes.StreamID]common.Hash, 0, len(bns))
for range bns {
results = append(results, make(map[sttypes.StreamID]common.Hash))
results := make([]map[sttypes.StreamID]common.Hash, len(bns))
for i := range bns {
results[i] = make(map[sttypes.StreamID]common.Hash)
}
return &blockHashResults{
bns: bns,
Expand All @@ -37,24 +37,30 @@ func (res *blockHashResults) addResult(hashes []common.Hash, stid sttypes.Stream
}
res.results[i][stid] = h
}
return
}

func (res *blockHashResults) computeLongestHashChain() ([]common.Hash, []sttypes.StreamID) {
var (
whitelist map[sttypes.StreamID]struct{}
hashChain []common.Hash
)

for _, result := range res.results {
hash, nextWl := countHashMaxVote(result, whitelist)
if hash == emptyHash {
break
break // Exit if emptyHash is encountered
}
hashChain = append(hashChain, hash)
whitelist = nextWl
// add nextWl stream IDs to whitelist
if len(nextWl) > 0 {
whitelist = make(map[sttypes.StreamID]struct{}, len(nextWl))
for st := range nextWl {
whitelist[st] = struct{}{}
}
}
}

sts := make([]sttypes.StreamID, 0, len(whitelist))
sts := make([]sttypes.StreamID, 0)
for st := range whitelist {
sts = append(sts, st)
}
Expand Down
12 changes: 12 additions & 0 deletions api/service/stagedstreamsync/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,18 @@ const (
SnapSync // Download the chain and the state via compact snapshots
)

func (sm SyncMode) String() string {
switch sm {
case FullSync:
return "Full Sync"
case FastSync:
return "Fast Sync"
case SnapSync:
return "Snap Sync"
}
return "unknown"
}

type (
// Config is the downloader config
Config struct {
Expand Down
42 changes: 23 additions & 19 deletions api/service/stagedstreamsync/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ type (
// Downloader is responsible for sync task of one shard
Downloader struct {
bc blockChain
nodeConfig *nodeconfig.ConfigType
syncProtocol syncProtocol
bh *beaconHelper
stagedSyncInstance *StagedStreamSync
isBeaconNode bool

downloadC chan struct{}
closeC chan struct{}
Expand All @@ -38,7 +38,7 @@ type (
)

// NewDownloader creates a new downloader
func NewDownloader(host p2p.Host, bc core.BlockChain, consensus *consensus.Consensus, dbDir string, isBeaconNode bool, config Config) *Downloader {
func NewDownloader(host p2p.Host, bc core.BlockChain, nodeConfig *nodeconfig.ConfigType, consensus *consensus.Consensus, dbDir string, isBeaconNode bool, config Config) *Downloader {
config.fixValues()

sp := sync.NewProtocol(sync.Config{
Expand All @@ -48,6 +48,8 @@ func NewDownloader(host p2p.Host, bc core.BlockChain, consensus *consensus.Conse
ShardID: nodeconfig.ShardID(bc.ShardID()),
Network: config.Network,
BeaconNode: isBeaconNode,
Validator: nodeConfig.Role() == nodeconfig.Validator,
Explorer: nodeConfig.Role() == nodeconfig.ExplorerNode,
MaxAdvertiseWaitTime: config.MaxAdvertiseWaitTime,
SmSoftLowCap: config.SmSoftLowCap,
SmHardLowCap: config.SmHardLowCap,
Expand All @@ -69,18 +71,18 @@ func NewDownloader(host p2p.Host, bc core.BlockChain, consensus *consensus.Conse
ctx, cancel := context.WithCancel(context.Background())

// create an instance of staged sync for the downloader
stagedSyncInstance, err := CreateStagedSync(ctx, bc, consensus, dbDir, isBeaconNode, sp, config, logger)
stagedSyncInstance, err := CreateStagedSync(ctx, bc, nodeConfig, consensus, dbDir, sp, config, isBeaconNode, logger)
if err != nil {
cancel()
return nil
}

return &Downloader{
bc: bc,
nodeConfig: nodeConfig,
syncProtocol: sp,
bh: bh,
stagedSyncInstance: stagedSyncInstance,
isBeaconNode: isBeaconNode,

downloadC: make(chan struct{}),
closeC: make(chan struct{}),
Expand Down Expand Up @@ -198,13 +200,13 @@ func (d *Downloader) waitForEnoughStreams(requiredStreams int) (bool, int) {
}

func (d *Downloader) loop() {
ticker := time.NewTicker(10 * time.Second)
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()

// for shard chain and beacon chain node, first we start with initSync=true to
// make sure it goes through the long range sync first.
// for epoch chain we do only need to go through epoch sync process
initSync := d.isBeaconNode || d.bc.ShardID() != shard.BeaconChainShardID
initSync := !d.stagedSyncInstance.isEpochChain

trigger := func() {
select {
Expand Down Expand Up @@ -267,20 +269,22 @@ func (d *Downloader) loop() {
d.bh.insertSync()
}
}
// if last doSync needed only to add a few blocks less than LastMileBlocksThreshold and
// the node is fully synced now, then switch to short range
// the reason why we need to check distanceBeforeSync is because, if it was long distance,
// very likely, there are a couple of new blocks have been added to the other nodes which
// we should still stay in long range and check them.
bnAfterSync := d.bc.CurrentBlock().NumberU64()
distanceBeforeSync := estimatedHeight - bnBeforeSync
distanceAfterSync := estimatedHeight - bnAfterSync
if estimatedHeight > 0 && addedBN > 0 &&
distanceBeforeSync <= uint64(LastMileBlocksThreshold) &&
distanceAfterSync <= uint64(LastMileBlocksThreshold) {
initSync = false
// If the last sync operation only a few blocks (less than LastMileBlocksThreshold)
// and the node is now fully synced, switch to short-range syncing.
// We check distanceBeforeSync to handle cases where the previous sync covered a long distance.
// In such cases, it’s likely that new blocks were added to other nodes during the sync process,
// so the node should remain in long-range mode to catch up with those blocks.
if initSync && addedBN > 0 {
bnAfterSync := d.bc.CurrentBlock().NumberU64()
distanceBeforeSync := estimatedHeight - bnBeforeSync
distanceAfterSync := estimatedHeight - bnAfterSync
// If after completing a full sync cycle, the node is still within the last mile block range,
// switch to short-range sync.
if distanceBeforeSync <= uint64(LastMileBlocksThreshold) &&
distanceAfterSync <= uint64(LastMileBlocksThreshold) {
initSync = false
}
}

case <-d.closeC:
return
}
Expand Down
5 changes: 3 additions & 2 deletions api/service/stagedstreamsync/downloaders.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/harmony-one/abool"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/p2p"
)

Expand All @@ -16,7 +17,7 @@ type Downloaders struct {
}

// NewDownloaders creates Downloaders for sync of multiple blockchains
func NewDownloaders(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, dbDir string, config Config) *Downloaders {
func NewDownloaders(host p2p.Host, bcs []core.BlockChain, nodeConfig *nodeconfig.ConfigType, consensus *consensus.Consensus, dbDir string, config Config) *Downloaders {
ds := make(map[uint32]*Downloader)
isBeaconNode := len(bcs) == 1
for _, bc := range bcs {
Expand All @@ -26,7 +27,7 @@ func NewDownloaders(host p2p.Host, bcs []core.BlockChain, consensus *consensus.C
if _, ok := ds[bc.ShardID()]; ok {
continue
}
ds[bc.ShardID()] = NewDownloader(host, bc, consensus, dbDir, isBeaconNode, config)
ds[bc.ShardID()] = NewDownloader(host, bc, nodeConfig, consensus, dbDir, isBeaconNode, config)
}
return &Downloaders{
ds: ds,
Expand Down
29 changes: 17 additions & 12 deletions api/service/stagedstreamsync/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ func getBlockByMaxVote(blocks []*types.Block) (*types.Block, error) {
if block == nil {
continue
}
if _, exist := hashesVote[block.Header().Hash()]; !exist {
hashesVote[block.Header().Hash()] = 0
}
hashesVote[block.Header().Hash()]++
if hashesVote[block.Header().Hash()] > maxVote {
maxVote = hashesVote[block.Header().Hash()]
Expand All @@ -94,6 +97,8 @@ func getBlockByMaxVote(blocks []*types.Block) (*types.Block, error) {
return blocks[maxVotedBlockIndex], nil
}

// countHashMaxVote counts the votes for each hash in the map, respecting the whitelist.
// It returns the hash with the most votes and the next whitelist of StreamIDs.
func countHashMaxVote(m map[sttypes.StreamID]common.Hash, whitelist map[sttypes.StreamID]struct{}) (common.Hash, map[sttypes.StreamID]struct{}) {
var (
voteM = make(map[common.Hash]int)
Expand All @@ -102,34 +107,34 @@ func countHashMaxVote(m map[sttypes.StreamID]common.Hash, whitelist map[sttypes.
)

for st, h := range m {
if h == emptyHash {
continue
}
// If a whitelist is provided, skip StreamIDs not in the whitelist
if len(whitelist) != 0 {
if _, ok := whitelist[st]; !ok {
continue
}
}
if _, ok := voteM[h]; !ok {
voteM[h] = 0
}
voteM[h]++
if voteM[h] > maxCnt {
maxCnt = voteM[h]
res = h
}
}

// Build the next whitelist based on the winning hash
nextWl := make(map[sttypes.StreamID]struct{})
for st, h := range m {
if h != res {
continue
}
if len(whitelist) != 0 {
if _, ok := whitelist[st]; ok {
nextWl[st] = struct{}{}
if res != emptyHash {
for st, h := range m {
if h == res {
if len(whitelist) == 0 || (len(whitelist) != 0 && whitelist[st] != struct{}{}) {
nextWl[st] = struct{}{}
}
}
} else {
nextWl[st] = struct{}{}
}
}

return res, nextWl
}

Expand Down
24 changes: 24 additions & 0 deletions api/service/stagedstreamsync/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ func init() {
longRangeSyncedBlockCounterVec,
longRangeFailInsertedBlockCounterVec,
numShortRangeCounterVec,
numEpochSyncCounterVec,
numFailedDownloadCounterVec,
numBlocksInsertedShortRangeHistogramVec,
numBlocksInsertedEpochSyncHistogramVec,
numBlocksInsertedBeaconHelperCounter,
)
}
Expand Down Expand Up @@ -60,6 +62,16 @@ var (
[]string{"ShardID"},
)

numEpochSyncCounterVec = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "hmy",
Subsystem: "staged_stream_sync",
Name: "num_epoch_sync",
Help: "number of epoch blocks sync is triggered",
},
[]string{"ShardID"},
)

numFailedDownloadCounterVec = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "hmy",
Expand All @@ -82,6 +94,18 @@ var (
[]string{"ShardID"},
)

numBlocksInsertedEpochSyncHistogramVec = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "hmy",
Subsystem: "staged_stream_sync",
Name: "num_blocks_inserted_epoch_sync",
Help: "number of blocks inserted for each epoch sync",
GheisMohammadi marked this conversation as resolved.
Show resolved Hide resolved
// Buckets: 0, 1, 2, 4, +INF (capped at 10)
Buckets: prometheus.ExponentialBuckets(0.5, 2, 5),
},
[]string{"ShardID"},
)

numBlocksInsertedBeaconHelperCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "hmy",
Expand Down
6 changes: 4 additions & 2 deletions api/service/stagedstreamsync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package stagedstreamsync
import (
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/p2p"
)

Expand All @@ -12,9 +13,10 @@ type StagedStreamSyncService struct {
}

// NewService creates a new downloader service
func NewService(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, config Config, dbDir string) *StagedStreamSyncService {
func NewService(host p2p.Host, bcs []core.BlockChain, nodeConfig *nodeconfig.ConfigType,
consensus *consensus.Consensus, config Config, dbDir string) *StagedStreamSyncService {
return &StagedStreamSyncService{
Downloaders: NewDownloaders(host, bcs, consensus, dbDir, config),
Downloaders: NewDownloaders(host, bcs, nodeConfig, consensus, dbDir, config),
}
}

Expand Down
Loading