Skip to content

Commit

Permalink
Merge pull request #17 from ClarkChenc/adjust-checkpoint-interval
Browse files Browse the repository at this point in the history
Adjust checkpoint interval
  • Loading branch information
zhang0125 authored Aug 16, 2022
2 parents 45b9b0a + 83cdb9c commit 65885b4
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 95 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@v1
with:
go-version: 1.13
go-version: 1.16
- name: "Build binaries"
run: make build
- name: "Run tests"
Expand Down
16 changes: 13 additions & 3 deletions bridge/setu/listener/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Listener interface {

StartHeaderProcess(context.Context)

StartPolling(context.Context, time.Duration)
StartPolling(context.Context, time.Duration, bool)

StartSubscription(context.Context, ethereum.Subscription)

Expand Down Expand Up @@ -156,23 +156,33 @@ func (bl *BaseListener) StartHeaderProcess(ctx context.Context) {
}

// startPolling starts polling
func (bl *BaseListener) StartPolling(ctx context.Context, pollInterval time.Duration) {
// needAlign is used to decide whether the ticker is align to 1970 UTC.
// if true, the ticker will always tick as it begins at 1970 UTC.
func (bl *BaseListener) StartPolling(ctx context.Context, pollInterval time.Duration, needAlign bool) {
// How often to fire the passed in function in second
interval := pollInterval
firstInterval := interval
if needAlign {
now := time.Now()
baseTime := time.Unix(0, 0)
firstInterval = interval - (now.UTC().Sub(baseTime) % interval)
}

// Setup the ticket and the channel to signal
// the ending of the interval
ticker := time.NewTicker(interval)
ticker := time.NewTicker(firstInterval)

// start listening
for {
select {
case <-ticker.C:
ticker.Reset(interval)
header, err := bl.chainClient.HeaderByNumber(ctx, nil)
if err == nil && header != nil {
// send data to channel
bl.HeaderChannel <- header
}

case <-ctx.Done():
bl.Logger.Info("Polling stopped")
ticker.Stop()
Expand Down
15 changes: 12 additions & 3 deletions bridge/setu/listener/heimdall.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (hl *HeimdallListener) Start() error {
}

hl.Logger.Info("Start polling for events", "pollInterval", pollInterval)
hl.StartPolling(headerCtx, pollInterval)
hl.StartPolling(headerCtx, pollInterval, false)
return nil
}

Expand All @@ -57,13 +57,21 @@ func (hl *HeimdallListener) ProcessHeader(*types.Header) {
}

// StartPolling - starts polling for heimdall events
func (hl *HeimdallListener) StartPolling(ctx context.Context, pollInterval time.Duration) {
// needAlign is used to decide whether the ticker is align to 1970 UTC.
// if true, the ticker will always tick as it begins at 1970 UTC.
func (hl *HeimdallListener) StartPolling(ctx context.Context, pollInterval time.Duration, needAlign bool) {
// How often to fire the passed in function in second
interval := pollInterval
firstInterval := interval
if needAlign {
now := time.Now()
baseTime := time.Unix(0, 0)
firstInterval = interval - (now.UTC().Sub(baseTime) % interval)
}

// Setup the ticket and the channel to signal
// the ending of the interval
ticker := time.NewTicker(interval)
ticker := time.NewTicker(firstInterval)

// var eventTypes []string
// eventTypes = append(eventTypes, "message.action='checkpoint'")
Expand All @@ -75,6 +83,7 @@ func (hl *HeimdallListener) StartPolling(ctx context.Context, pollInterval time.
for {
select {
case <-ticker.C:
ticker.Reset(interval)
fromBlock, toBlock, err := hl.fetchFromAndToBlock()
if err != nil {
hl.Logger.Error("Error fetching fromBlock and toBlock...skipping events query", "error", err)
Expand Down
2 changes: 1 addition & 1 deletion bridge/setu/listener/maticchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (ml *MaticChainListener) Start() error {
if err != nil {
// start go routine to poll for new header using client object
ml.Logger.Info("Start polling for header blocks", "pollInterval", helper.GetConfig().CheckpointerPollInterval)
go ml.StartPolling(ctx, helper.GetConfig().CheckpointerPollInterval)
go ml.StartPolling(ctx, helper.GetConfig().CheckpointerPollInterval, true)
} else {
// start go routine to listen new header using subscription
go ml.StartSubscription(ctx, subscription)
Expand Down
2 changes: 1 addition & 1 deletion bridge/setu/listener/rootchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (rl *RootChainListener) Start() error {
// start go routine to poll for new header using client object
rl.Logger.Info("Start polling for root chain header blocks",
"root", rl.rootChainType, "pollInterval", rl.pollInterval)
go rl.StartPolling(ctx, rl.pollInterval)
go rl.StartPolling(ctx, rl.pollInterval, false)
} else {
// start go routine to listen new header using subscription
go rl.StartSubscription(ctx, subscription)
Expand Down
27 changes: 19 additions & 8 deletions bridge/setu/listener/tron.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,30 +71,41 @@ func (tl *TronListener) Start() error {

tl.Logger.Info("Start polling for events", "pollInterval", pollInterval)
// poll for new header using client object
go tl.StartPolling(headerCtx, pollInterval)
go tl.StartPolling(headerCtx, pollInterval, false)
return nil
}

// startPolling starts polling
func (tl *TronListener) StartPolling(ctx context.Context, pollInterval time.Duration) {
// needAlign is used to decide whether the ticker is align to 1970 UTC.
// if true, the ticker will always tick as it begins at 1970 UTC.
func (tl *TronListener) StartPolling(ctx context.Context, pollInterval time.Duration, needAlign bool) {
// How often to fire the passed in function in second
interval := pollInterval
firstInterval := interval

if needAlign {
now := time.Now()
baseTime := time.Unix(0, 0)
firstInterval = interval - (now.UTC().Sub(baseTime) % interval)
}

// Setup the ticket and the channel to signal
// the ending of the interval
ticker := time.NewTicker(interval)
ticker := time.NewTicker(firstInterval)

// start listening
for {
select {
case <-ticker.C:
ticker.Reset(interval)
headerNum, err := tl.contractConnector.GetTronLatestBlockNumber()
if err == nil {
// send data to channel
tl.HeaderChannel <- &(ethTypes.Header{
Number: big.NewInt(headerNum),
})
}

case <-ctx.Done():
tl.Logger.Info("Polling stopped")
ticker.Stop()
Expand Down Expand Up @@ -214,7 +225,7 @@ func (tl *TronListener) queryAndBroadcastEvents(chainManagerParams *chainmanager
// topup has to be processed first before validator join. so adding delay.
delay := util.TaskDelayBetweenEachVal
tl.sendTaskWithDelay("sendValidatorJoinToHeimdall", selectedEvent.Name, logBytes, delay)
} else if isCurrentValidator, delay := util.CalculateTaskDelayWithOffset(tl.cliCtx,1); isCurrentValidator {
} else if isCurrentValidator, delay := util.CalculateTaskDelayWithOffset(tl.cliCtx, 1); isCurrentValidator {
// topup has to be processed first before validator join. so adding delay.
delay = delay + util.TaskDelayBetweenEachVal
tl.sendTaskWithDelay("sendValidatorJoinToHeimdall", selectedEvent.Name, logBytes, delay)
Expand All @@ -237,7 +248,7 @@ func (tl *TronListener) queryAndBroadcastEvents(chainManagerParams *chainmanager
}
if bytes.Equal(event.SignerPubkey, pubkeyBytes) {
tl.sendTaskWithDelay("sendSignerChangeToHeimdall", selectedEvent.Name, logBytes, 0)
} else if isCurrentValidator, delay := util.CalculateTaskDelayWithOffset(tl.cliCtx,1); isCurrentValidator {
} else if isCurrentValidator, delay := util.CalculateTaskDelayWithOffset(tl.cliCtx, 1); isCurrentValidator {
tl.sendTaskWithDelay("sendSignerChangeToHeimdall", selectedEvent.Name, logBytes, delay)
}

Expand All @@ -248,7 +259,7 @@ func (tl *TronListener) queryAndBroadcastEvents(chainManagerParams *chainmanager
}
if util.IsEventSender(tl.cliCtx, event.ValidatorId.Uint64()) {
tl.sendTaskWithDelay("sendUnstakeInitToHeimdall", selectedEvent.Name, logBytes, 0)
} else if isCurrentValidator, delay := util.CalculateTaskDelayWithOffset(tl.cliCtx,1); isCurrentValidator {
} else if isCurrentValidator, delay := util.CalculateTaskDelayWithOffset(tl.cliCtx, 1); isCurrentValidator {
tl.sendTaskWithDelay("sendUnstakeInitToHeimdall", selectedEvent.Name, logBytes, delay)
}

Expand All @@ -264,7 +275,7 @@ func (tl *TronListener) queryAndBroadcastEvents(chainManagerParams *chainmanager
}
if bytes.Equal(event.User.Bytes(), helper.GetAddress()) {
tl.sendTaskWithDelay("sendTopUpFeeToHeimdall", selectedEvent.Name, logBytes, 0)
} else if isCurrentValidator, delay := util.CalculateTaskDelayWithOffset(tl.cliCtx,1); isCurrentValidator {
} else if isCurrentValidator, delay := util.CalculateTaskDelayWithOffset(tl.cliCtx, 1); isCurrentValidator {
tl.sendTaskWithDelay("sendTopUpFeeToHeimdall", selectedEvent.Name, logBytes, delay)
}

Expand All @@ -280,7 +291,7 @@ func (tl *TronListener) queryAndBroadcastEvents(chainManagerParams *chainmanager
}
if util.IsEventSender(tl.cliCtx, event.ValidatorId.Uint64()) {
tl.sendTaskWithDelay("sendUnjailToHeimdall", selectedEvent.Name, logBytes, 0)
} else if isCurrentValidator, delay := util.CalculateTaskDelayWithOffset(tl.cliCtx,1); isCurrentValidator {
} else if isCurrentValidator, delay := util.CalculateTaskDelayWithOffset(tl.cliCtx, 1); isCurrentValidator {
tl.sendTaskWithDelay("sendUnjailToHeimdall", selectedEvent.Name, logBytes, delay)
}

Expand Down
5 changes: 1 addition & 4 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
module github.com/maticnetwork/heimdall

go 1.12
go 1.16

require (
github.com/RichardKnop/machinery v1.10.6
github.com/allegro/bigcache v1.2.1 // indirect
github.com/aristanetworks/goarista v0.0.0-20191206003309-5d8d36c240c9 // indirect
github.com/btcsuite/btcd v0.20.1-beta // indirect
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect
github.com/cbergoon/merkletree v0.2.0
github.com/cespare/cp v1.1.1 // indirect
github.com/cosmos/cosmos-sdk v0.37.4
github.com/deckarep/golang-set v1.7.1 // indirect
github.com/docker/docker v1.13.1 // indirect
github.com/edsrzf/mmap-go v1.0.0 // indirect
github.com/elastic/gosigar v0.10.5 // indirect
github.com/fatih/color v1.9.0 // indirect
Expand All @@ -22,7 +20,6 @@ require (
github.com/gogo/protobuf v1.3.0
github.com/golang/protobuf v1.5.0
github.com/gorilla/mux v1.7.3
github.com/graph-gophers/graphql-go v0.0.0-20200207002730-8334863f2c8b // indirect
github.com/hashicorp/golang-lru v0.5.3
github.com/huin/goupnp v1.0.0 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
Expand Down
Loading

0 comments on commit 65885b4

Please sign in to comment.