Skip to content

Commit

Permalink
Merge branch 'feat/sovereign-mainchain-header-sync' into MX-15953-cro…
Browse files Browse the repository at this point in the history
…ss-chain-header-storage-sync
  • Loading branch information
mariusmihaic authored Oct 29, 2024
2 parents 3d1706a + 969aa87 commit da1da54
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 27 deletions.
2 changes: 1 addition & 1 deletion epochStart/bootstrap/bootStrapSovereignShardProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (sbp *sovereignBootStrapShardProcessor) createEpochStartInterceptorsContain
func (bp *sovereignBootStrapShardProcessor) createCrossHeaderRequester() (updateSync.CrossHeaderRequester, error) {
extendedHeaderRequester, castOk := bp.requestHandler.(updateSync.ExtendedShardHeaderRequestHandler)
if !castOk {
return nil, fmt.Errorf("%w in sovereignBootStrapShardProcessor.createHeadersSyncer for extendedHeaderRequester", process.ErrWrongTypeAssertion)
return nil, fmt.Errorf("%w in sovereignBootStrapShardProcessor.createCrossHeaderRequester for extendedHeaderRequester", process.ErrWrongTypeAssertion)
}

return updateSync.NewExtendedHeaderRequester(extendedHeaderRequester)
Expand Down
4 changes: 2 additions & 2 deletions epochStart/bootstrap/bootStrapSovereignShardProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func TestBootStrapSovereignShardProcessor_createEpochStartInterceptorsContainers
require.Zero(t, fullContainer.Len())
}

func TestBootStrapSovereignShardProcessor_createHeadersSyncer(t *testing.T) {
func TestBootStrapSovereignShardProcessor_createCrossHeaderRequester(t *testing.T) {
t.Parallel()

sovProc := createSovBootStrapProc()
Expand All @@ -307,6 +307,6 @@ func TestBootStrapSovereignShardProcessor_createHeadersSyncer(t *testing.T) {

sovProc.requestHandler = &testscommon.ExtendedShardHeaderRequestHandlerStub{}
requester, err = sovProc.createCrossHeaderRequester()
require.NotNil(t, requester)
require.Nil(t, err)
require.Equal(t, "*sync.extendedHeaderRequester", fmt.Sprintf("%T", requester))
}
16 changes: 14 additions & 2 deletions process/block/sovereign/incomingHeader/extendedHeaderProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ import (
"github.com/multiversx/mx-chain-core-go/data/transaction"
"github.com/multiversx/mx-chain-core-go/hashing"
"github.com/multiversx/mx-chain-core-go/marshal"

"github.com/multiversx/mx-chain-go/process"
)

type extendedHeaderProcessor struct {
headersPool HeadersPool
txPool TransactionPool
marshaller marshal.Marshalizer
hasher hashing.Hasher
}
Expand Down Expand Up @@ -80,15 +83,24 @@ func (ehp *extendedHeaderProcessor) addPreGenesisExtendedHeaderToPool(incomingHe
IncomingEvents: []*transaction.Event{},
}

return ehp.addExtendedHeaderToPool(extendedHeader)
return ehp.addExtendedHeaderAndSCRsToPool(extendedHeader, make([]*scrInfo, 0))
}

func (ehp *extendedHeaderProcessor) addExtendedHeaderToPool(extendedHeader data.ShardHeaderExtendedHandler) error {
func (ehp *extendedHeaderProcessor) addExtendedHeaderAndSCRsToPool(extendedHeader data.ShardHeaderExtendedHandler, scrs []*scrInfo) error {
extendedHeaderHash, err := core.CalculateHash(ehp.marshaller, ehp.hasher, extendedHeader)
if err != nil {
return err
}

ehp.addSCRsToPool(scrs)
ehp.headersPool.AddHeaderInShard(extendedHeaderHash, extendedHeader, core.MainChainShardId)
return nil
}

func (ehp *extendedHeaderProcessor) addSCRsToPool(scrs []*scrInfo) {
cacheID := process.ShardCacherIdentifier(core.MainChainShardId, core.SovereignChainShardId)

for _, scrData := range scrs {
ehp.txPool.AddData(scrData.hash, scrData.scr, scrData.scr.Size(), cacheID)
}
}
21 changes: 5 additions & 16 deletions process/block/sovereign/incomingHeader/incomingHeaderProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@ package incomingHeader
import (
"encoding/hex"

sovereignBlock "github.com/multiversx/mx-chain-go/dataRetriever/dataPool/sovereign"
"github.com/multiversx/mx-chain-go/errors"
"github.com/multiversx/mx-chain-go/process"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/data"
"github.com/multiversx/mx-chain-core-go/data/sovereign"
"github.com/multiversx/mx-chain-core-go/hashing"
"github.com/multiversx/mx-chain-core-go/marshal"
logger "github.com/multiversx/mx-chain-logger-go"

sovereignBlock "github.com/multiversx/mx-chain-go/dataRetriever/dataPool/sovereign"
"github.com/multiversx/mx-chain-go/errors"
)

var log = logger.GetOrCreate("headerSubscriber")
Expand All @@ -34,7 +33,6 @@ type incomingHeaderProcessor struct {
eventsProc *incomingEventsProcessor
extendedHeaderProc *extendedHeaderProcessor

txPool TransactionPool
outGoingPool sovereignBlock.OutGoingOperationsPool
mainChainNotarizationStartRound uint64
}
Expand Down Expand Up @@ -74,6 +72,7 @@ func NewIncomingHeaderProcessor(args ArgsIncomingHeaderProcessor) (*incomingHead

extendedHearProc := &extendedHeaderProcessor{
headersPool: args.HeadersPool,
txPool: args.TxPool,
marshaller: args.Marshaller,
hasher: args.Hasher,
}
Expand All @@ -83,7 +82,6 @@ func NewIncomingHeaderProcessor(args ArgsIncomingHeaderProcessor) (*incomingHead
return &incomingHeaderProcessor{
eventsProc: eventsProc,
extendedHeaderProc: extendedHearProc,
txPool: args.TxPool,
outGoingPool: args.OutGoingOperationsPool,
mainChainNotarizationStartRound: args.MainChainNotarizationStartRound,
}, nil
Expand Down Expand Up @@ -127,24 +125,15 @@ func (ihp *incomingHeaderProcessor) AddHeader(headerHash []byte, header sovereig
return err
}

err = ihp.extendedHeaderProc.addExtendedHeaderToPool(extendedHeader)
err = ihp.extendedHeaderProc.addExtendedHeaderAndSCRsToPool(extendedHeader, res.scrs)
if err != nil {
return err
}

ihp.addConfirmedBridgeOpsToPool(res.confirmedBridgeOps)
ihp.addSCRsToPool(res.scrs)
return nil
}

func (ihp *incomingHeaderProcessor) addSCRsToPool(scrs []*scrInfo) {
cacheID := process.ShardCacherIdentifier(core.MainChainShardId, core.SovereignChainShardId)

for _, scrData := range scrs {
ihp.txPool.AddData(scrData.hash, scrData.scr, scrData.scr.Size(), cacheID)
}
}

func (ihp *incomingHeaderProcessor) addConfirmedBridgeOpsToPool(ops []*confirmedBridgeOp) {
for _, op := range ops {
// This is not a critical error. This might just happen when a leader tries to re-send unconfirmed confirmation
Expand Down
18 changes: 12 additions & 6 deletions process/block/sovereignChainBlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -834,23 +834,29 @@ func (scbp *sovereignChainBlockProcessor) checkExtendedShardHeadersValidity() er
return err
}

log.Trace("checkExtendedShardHeadersValidity",
"lastCrossNotarizedHeader nonce", lastCrossNotarizedHeader.GetNonce(),
"lastCrossNotarizedHeader round", lastCrossNotarizedHeader.GetRound(),
)

extendedShardHdrs := scbp.sortExtendedShardHeadersForCurrentBlockByNonce()
if len(extendedShardHdrs) == 0 {
return nil
}

log.Trace("checkExtendedShardHeadersValidity",
"lastCrossNotarizedHeader nonce", lastCrossNotarizedHeader.GetNonce(),
"lastCrossNotarizedHeader round", lastCrossNotarizedHeader.GetRound(),
)
if scbp.receivedGenesisMainChainHeaderWithoutPreGenesis(extendedShardHdrs[0]) {
if scbp.isGenesisHeaderWithNoPreviousTracking(extendedShardHdrs[0]) {
// we are missing pre-genesis header, so we can't link it to previous header
if len(extendedShardHdrs) == 1 {
return nil
}

lastCrossNotarizedHeader = extendedShardHdrs[0]
extendedShardHdrs = extendedShardHdrs[1:]

log.Debug("checkExtendedShardHeadersValidity missing pre genesis, updating lastCrossNotarizedHeader",
"lastCrossNotarizedHeader nonce", lastCrossNotarizedHeader.GetNonce(),
"lastCrossNotarizedHeader round", lastCrossNotarizedHeader.GetRound(),
)
}

for _, extendedShardHdr := range extendedShardHdrs {
Expand All @@ -873,7 +879,7 @@ func (scbp *sovereignChainBlockProcessor) checkExtendedShardHeadersValidity() er
// - no notifier is attached => we did not track main chain and don't have pre-genesis header
// - node is in re-sync/start in the exact epoch when we start to notarize main chain => no previous
// main chain tracking(notifier is also disabled)
func (scbp *sovereignChainBlockProcessor) receivedGenesisMainChainHeaderWithoutPreGenesis(incomingHeader data.HeaderHandler) bool {
func (scbp *sovereignChainBlockProcessor) isGenesisHeaderWithNoPreviousTracking(incomingHeader data.HeaderHandler) bool {
return scbp.extendedShardHeaderTracker.IsGenesisLastCrossNotarizedHeader() && incomingHeader.GetRound() == scbp.mainChainNotarizationStartRound
}

Expand Down
1 change: 1 addition & 0 deletions process/track/sovereignChainShardBlockTrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (scsbt *sovereignChainShardBlockTrack) receivedExtendedShardHeader(
scsbt.blockProcessor.ProcessReceivedHeader(extendedShardHeaderHandler)
}

// IsGenesisLastCrossNotarizedHeader returns true if the last cross chain notarized header is the dummy genesis header
func (scsbt *sovereignChainShardBlockTrack) IsGenesisLastCrossNotarizedHeader() bool {
lastNotarizedHeader, _, err := scsbt.crossNotarizer.GetLastNotarizedHeader(core.MainChainShardId)

Expand Down

0 comments on commit da1da54

Please sign in to comment.