Skip to content

Commit

Permalink
Pausable uptime manager (#1372)
Browse files Browse the repository at this point in the history
* add validator state

* add pausable uptime manager

* remove stuttering name

* rename state listener

* Update plugin/evm/validators/state.go

Co-authored-by: Darioush Jalali <[email protected]>
Signed-off-by: Ceyhun Onur <[email protected]>

* use update enum

* Update plugin/evm/validators/state.go

Co-authored-by: Darioush Jalali <[email protected]>
Signed-off-by: Ceyhun Onur <[email protected]>

* Update plugin/evm/validators/state.go

Co-authored-by: Darioush Jalali <[email protected]>
Signed-off-by: Ceyhun Onur <[email protected]>

* respond to comments

* update avalanchego dep branch

* reviews

* reword errs

* fix test changes

* fix upgrades after deactivating latest in context

* use branch commit for ava version

* reviews

* add listener mock

* remove errs from resume and pause

* check after stopping

* use expectedTime in tests

* reviews

* fix requires

* underscore unused params

---------

Signed-off-by: Ceyhun Onur <[email protected]>
Co-authored-by: Darioush Jalali <[email protected]>
  • Loading branch information
ceyonur and darioush authored Nov 5, 2024
1 parent 951e875 commit 8a6df9e
Show file tree
Hide file tree
Showing 2 changed files with 385 additions and 0 deletions.
144 changes: 144 additions & 0 deletions plugin/evm/uptime/pausable_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package uptime

import (
"errors"

"github.com/ava-labs/subnet-evm/plugin/evm/validators"
"github.com/ethereum/go-ethereum/log"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/uptime"
"github.com/ava-labs/avalanchego/utils/set"
)

var _ validators.StateCallbackListener = &pausableManager{}

var errPausedDisconnect = errors.New("paused node cannot be disconnected")

type PausableManager interface {
uptime.Manager
validators.StateCallbackListener
IsPaused(nodeID ids.NodeID) bool
}

type pausableManager struct {
uptime.Manager
pausedVdrs set.Set[ids.NodeID]
// connectedVdrs is a set of nodes that are connected to the manager.
// This is used to immediately connect nodes when they are unpaused.
connectedVdrs set.Set[ids.NodeID]
}

// NewPausableManager takes an uptime.Manager and returns a PausableManager
func NewPausableManager(manager uptime.Manager) PausableManager {
return &pausableManager{
pausedVdrs: make(set.Set[ids.NodeID]),
connectedVdrs: make(set.Set[ids.NodeID]),
Manager: manager,
}
}

// Connect connects the node with the given ID to the uptime.Manager
// If the node is paused, it will not be connected
func (p *pausableManager) Connect(nodeID ids.NodeID) error {
p.connectedVdrs.Add(nodeID)
if !p.IsPaused(nodeID) && !p.Manager.IsConnected(nodeID) {
return p.Manager.Connect(nodeID)
}
return nil
}

// Disconnect disconnects the node with the given ID from the uptime.Manager
// If the node is paused, it will not be disconnected
// Invariant: we should never have a connected paused node that is disconnecting
func (p *pausableManager) Disconnect(nodeID ids.NodeID) error {
p.connectedVdrs.Remove(nodeID)
if p.Manager.IsConnected(nodeID) {
if p.IsPaused(nodeID) {
// We should never see this case
return errPausedDisconnect
}
return p.Manager.Disconnect(nodeID)
}
return nil
}

// StartTracking starts tracking uptime for the nodes with the given IDs
// If a node is paused, it will not be tracked
func (p *pausableManager) StartTracking(nodeIDs []ids.NodeID) error {
activeNodeIDs := make([]ids.NodeID, 0, len(nodeIDs))
for _, nodeID := range nodeIDs {
if !p.IsPaused(nodeID) {
activeNodeIDs = append(activeNodeIDs, nodeID)
}
}
return p.Manager.StartTracking(activeNodeIDs)
}

// OnValidatorAdded is called when a validator is added.
// If the node is inactive, it will be paused.
func (p *pausableManager) OnValidatorAdded(_ ids.ID, nodeID ids.NodeID, _ uint64, isActive bool) {
if !isActive {
err := p.pause(nodeID)
if err != nil {
log.Error("failed to handle added validator %s: %s", nodeID, err)
}
}
}

// OnValidatorRemoved is called when a validator is removed.
// If the node is already paused, it will be resumed.
func (p *pausableManager) OnValidatorRemoved(_ ids.ID, nodeID ids.NodeID) {
if p.IsPaused(nodeID) {
err := p.resume(nodeID)
if err != nil {
log.Error("failed to handle validator removed %s: %s", nodeID, err)
}
}
}

// OnValidatorStatusUpdated is called when the status of a validator is updated.
// If the node is active, it will be resumed. If the node is inactive, it will be paused.
func (p *pausableManager) OnValidatorStatusUpdated(_ ids.ID, nodeID ids.NodeID, isActive bool) {
var err error
if isActive {
err = p.resume(nodeID)
} else {
err = p.pause(nodeID)
}
if err != nil {
log.Error("failed to update status for node %s: %s", nodeID, err)
}
}

// IsPaused returns true if the node with the given ID is paused.
func (p *pausableManager) IsPaused(nodeID ids.NodeID) bool {
return p.pausedVdrs.Contains(nodeID)
}

// pause pauses uptime tracking for the node with the given ID
// pause can disconnect the node from the uptime.Manager if it is connected.
func (p *pausableManager) pause(nodeID ids.NodeID) error {
p.pausedVdrs.Add(nodeID)
if p.Manager.IsConnected(nodeID) {
// If the node is connected, then we need to disconnect it from
// manager
// This should be fine in case tracking has not started yet since
// the inner manager should handle disconnects accordingly
return p.Manager.Disconnect(nodeID)
}
return nil
}

// resume resumes uptime tracking for the node with the given ID
// resume can connect the node to the uptime.Manager if it was connected.
func (p *pausableManager) resume(nodeID ids.NodeID) error {
p.pausedVdrs.Remove(nodeID)
if p.connectedVdrs.Contains(nodeID) && !p.Manager.IsConnected(nodeID) {
return p.Manager.Connect(nodeID)
}
return nil
}
241 changes: 241 additions & 0 deletions plugin/evm/uptime/pausable_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package uptime

import (
"testing"
"time"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/uptime"
"github.com/ava-labs/avalanchego/utils/timer/mockable"
"github.com/stretchr/testify/require"
)

func TestPausableManager(t *testing.T) {
vID := ids.GenerateTestID()
nodeID0 := ids.GenerateTestNodeID()
startTime := time.Now()

tests := []struct {
name string
testFunc func(t *testing.T, up PausableManager, clk *mockable.Clock, s uptime.State)
}{
{
name: "Case 1: Connect, pause, start tracking",
testFunc: func(t *testing.T, up PausableManager, clk *mockable.Clock, s uptime.State) {
require := require.New(t)

// Connect before tracking
require.NoError(up.Connect(nodeID0))
addTime(clk, time.Second)

// Pause before tracking
up.OnValidatorStatusUpdated(vID, nodeID0, false)
require.True(up.IsPaused(nodeID0))

// Elapse Time
addTime(clk, time.Second)

// Start tracking
require.NoError(up.StartTracking([]ids.NodeID{nodeID0}))
currentTime := addTime(clk, time.Second)
// Uptime should not have increased since the node was paused
expectedUptime := 0 * time.Second
checkUptime(t, up, nodeID0, expectedUptime, currentTime)

// Disconnect
require.NoError(up.Disconnect(nodeID0))
// Uptime should not have increased
checkUptime(t, up, nodeID0, expectedUptime, currentTime)
},
},
{
name: "Case 2: Start tracking, connect, pause, re-connect, resume",
testFunc: func(t *testing.T, up PausableManager, clk *mockable.Clock, s uptime.State) {
require := require.New(t)

// Start tracking
require.NoError(up.StartTracking([]ids.NodeID{nodeID0}))

// Connect
addTime(clk, 1*time.Second)
require.NoError(up.Connect(nodeID0))

// Pause
addTime(clk, 1*time.Second)
up.OnValidatorStatusUpdated(vID, nodeID0, false)
require.True(up.IsPaused(nodeID0))

// Elapse time
currentTime := addTime(clk, 2*time.Second)
// Uptime should be 1 second since the node was paused after 1 sec
expectedUptime := 1 * time.Second
checkUptime(t, up, nodeID0, expectedUptime, currentTime)

// Disconnect and check uptime
currentTime = addTime(clk, 3*time.Second)
require.NoError(up.Disconnect(nodeID0))
// Uptime should not have increased since the node was paused
checkUptime(t, up, nodeID0, expectedUptime, currentTime)

// Connect again and check uptime
addTime(clk, 4*time.Second)
require.NoError(up.Connect(nodeID0))
currentTime = addTime(clk, 5*time.Second)
// Uptime should not have increased since the node was paused
checkUptime(t, up, nodeID0, expectedUptime, currentTime)

// Resume and check uptime
currentTime = addTime(clk, 6*time.Second)
up.OnValidatorStatusUpdated(vID, nodeID0, true)
require.False(up.IsPaused(nodeID0))
// Uptime should not have increased since the node was paused
// and we just resumed it
checkUptime(t, up, nodeID0, expectedUptime, currentTime)

// Elapsed time check
currentTime = addTime(clk, 7*time.Second)
// Uptime should increase by 7 seconds above since the node was resumed
expectedUptime += 7 * time.Second
checkUptime(t, up, nodeID0, expectedUptime, currentTime)
},
},
{
name: "Case 3: Pause, start tracking, connect, re-connect, resume",
testFunc: func(t *testing.T, up PausableManager, clk *mockable.Clock, s uptime.State) {
require := require.New(t)

// Pause before tracking
up.OnValidatorStatusUpdated(vID, nodeID0, false)
require.True(up.IsPaused(nodeID0))

// Start tracking
addTime(clk, time.Second)
require.NoError(up.StartTracking([]ids.NodeID{nodeID0}))

// Connect and check uptime
addTime(clk, 1*time.Second)
require.NoError(up.Connect(nodeID0))

currentTime := addTime(clk, 2*time.Second)
// Uptime should not have increased since the node was paused
expectedUptime := 0 * time.Second
checkUptime(t, up, nodeID0, expectedUptime, currentTime)

// Disconnect and check uptime
currentTime = addTime(clk, 3*time.Second)
require.NoError(up.Disconnect(nodeID0))
// Uptime should not have increased since the node was paused
checkUptime(t, up, nodeID0, expectedUptime, currentTime)

// Connect again and resume
addTime(clk, 4*time.Second)
require.NoError(up.Connect(nodeID0))
addTime(clk, 5*time.Second)
up.OnValidatorStatusUpdated(vID, nodeID0, true)
require.False(up.IsPaused(nodeID0))

// Check uptime after resume
currentTime = addTime(clk, 6*time.Second)
// Uptime should have increased by 6 seconds since the node was resumed
expectedUptime += 6 * time.Second
checkUptime(t, up, nodeID0, expectedUptime, currentTime)
},
},
{
name: "Case 4: Start tracking, connect, pause, stop tracking, resume tracking",
testFunc: func(t *testing.T, up PausableManager, clk *mockable.Clock, s uptime.State) {
require := require.New(t)

// Start tracking and connect
require.NoError(up.StartTracking([]ids.NodeID{nodeID0}))
addTime(clk, time.Second)
require.NoError(up.Connect(nodeID0))

// Pause and check uptime
currentTime := addTime(clk, 2*time.Second)
up.OnValidatorStatusUpdated(vID, nodeID0, false)
require.True(up.IsPaused(nodeID0))
// Uptime should be 2 seconds since the node was paused after 2 seconds
expectedUptime := 2 * time.Second

checkUptime(t, up, nodeID0, expectedUptime, currentTime)

// Stop tracking and reinitialize manager
currentTime = addTime(clk, 3*time.Second)
require.NoError(up.StopTracking([]ids.NodeID{nodeID0}))
checkUptime(t, up, nodeID0, expectedUptime, currentTime)
up = NewPausableManager(uptime.NewManager(s, clk))

// Uptime should not have increased since the node was paused
// and we have not started tracking again
checkUptime(t, up, nodeID0, expectedUptime, currentTime)

// Pause and check uptime
up.OnValidatorStatusUpdated(vID, nodeID0, false)
require.True(up.IsPaused(nodeID0))
// Uptime should not have increased since the node was paused
checkUptime(t, up, nodeID0, expectedUptime, currentTime)

// Resume and check uptime
currentTime = addTime(clk, 5*time.Second)
up.OnValidatorStatusUpdated(vID, nodeID0, true)
require.False(up.IsPaused(nodeID0))
// Uptime should have increased by 5 seconds since the node was resumed
expectedUptime += 5 * time.Second
checkUptime(t, up, nodeID0, expectedUptime, currentTime)

// Start tracking and check elapsed time
currentTime = addTime(clk, 6*time.Second)
require.NoError(up.StartTracking([]ids.NodeID{nodeID0}))
// Uptime should have increased by 6 seconds since we started tracking
// and node was resumed (we assume the node was online until we started tracking)
expectedUptime += 6 * time.Second
checkUptime(t, up, nodeID0, expectedUptime, currentTime)

// Elapsed time
currentTime = addTime(clk, 7*time.Second)
// Uptime should not have increased since the node was not connected
checkUptime(t, up, nodeID0, expectedUptime, currentTime)

// Connect and final uptime check
require.NoError(up.Connect(nodeID0))
currentTime = addTime(clk, 8*time.Second)
// Uptime should have increased by 8 seconds since the node was connected
expectedUptime += 8 * time.Second
checkUptime(t, up, nodeID0, expectedUptime, currentTime)
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
up, clk, s := setupTestEnv(nodeID0, startTime)
test.testFunc(t, up, clk, s)
})
}
}

func setupTestEnv(nodeID ids.NodeID, startTime time.Time) (PausableManager, *mockable.Clock, uptime.State) {
clk := mockable.Clock{}
clk.Set(startTime)
s := uptime.NewTestState()
s.AddNode(nodeID, startTime)
up := NewPausableManager(uptime.NewManager(s, &clk))
return up, &clk, s
}

func addTime(clk *mockable.Clock, duration time.Duration) time.Time {
clk.Set(clk.Time().Add(duration))
return clk.Time()
}

func checkUptime(t *testing.T, up PausableManager, nodeID ids.NodeID, expectedUptime time.Duration, expectedLastUpdate time.Time) {
t.Helper()
uptime, lastUpdated, err := up.CalculateUptime(nodeID)
require.NoError(t, err)
require.Equal(t, expectedLastUpdate.Unix(), lastUpdated.Unix())
require.Equal(t, expectedUptime, uptime)
}

0 comments on commit 8a6df9e

Please sign in to comment.