Skip to content

Commit

Permalink
refactor: rainTreeRouter as submodule
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanchriswhite committed Jul 11, 2023
1 parent a1e8cc1 commit 7e600c6
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 89 deletions.
16 changes: 6 additions & 10 deletions p2p/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,9 @@ type BackgroundConfig struct {

// RainTreeConfig implements `RouterConfig` for use with `RainTreeRouter`.
type RainTreeConfig struct {
Host host.Host
Addr crypto.Address
CurrentHeightProvider providers.CurrentHeightProvider
PeerstoreProvider providers.PeerstoreProvider
Handler func(data []byte) error
Host host.Host
Addr crypto.Address
Handler func(data []byte) error
}

// IsValid implements the respective member of the `RouterConfig` interface.
Expand Down Expand Up @@ -119,11 +117,9 @@ func (cfg *BackgroundConfig) IsValid() error {
// IsValid implements the respective member of the `RouterConfig` interface.
func (cfg *RainTreeConfig) IsValid() error {
baseCfg := baseConfig{
Host: cfg.Host,
Addr: cfg.Addr,
CurrentHeightProvider: cfg.CurrentHeightProvider,
PeerstoreProvider: cfg.PeerstoreProvider,
Handler: cfg.Handler,
Host: cfg.Host,
Addr: cfg.Addr,
Handler: cfg.Handler,
}
return baseCfg.IsValid()
}
10 changes: 4 additions & 6 deletions p2p/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,14 +397,12 @@ func (m *p2pModule) setupStakedRouter() (err error) {
}

m.logger.Debug().Msg("setting up staked actor router")
m.stakedActorRouter, err = raintree.NewRainTreeRouter(
m.stakedActorRouter, err = raintree.Create(
m.GetBus(),
&config.RainTreeConfig{
Addr: m.address,
CurrentHeightProvider: m.currentHeightProvider,
PeerstoreProvider: m.pstoreProvider,
Host: m.host,
Handler: m.handlePocketEnvelope,
Addr: m.address,
Host: m.host,
Handler: m.handlePocketEnvelope,
},
)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions p2p/module_raintree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,14 @@ func testRainTreeCalls(t *testing.T, origNode string, networkSimulationConfig Te

persistenceMock := preparePersistenceMock(t, busMocks[i], genesisMock)
consensusMock := prepareConsensusMock(t, busMocks[i])
currentHeightProviderMock := prepareCurrentHeightProviderMock(t, busMocks[i])

busMocks[i].RegisterModule(currentHeightProviderMock)
busMocks[i].EXPECT().
GetCurrentHeightProvider().
Return(currentHeightProviderMock).
AnyTimes()

telemetryMock := prepareTelemetryMock(t, busMocks[i], valId, &readWriteWaitGroup, expectedWrites)

prepareBusMock(busMocks[i], persistenceMock, consensusMock, telemetryMock)
Expand Down
58 changes: 33 additions & 25 deletions p2p/raintree/peers_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import (
"github.com/pokt-network/pocket/p2p/config"
typesP2P "github.com/pokt-network/pocket/p2p/types"
mocksP2P "github.com/pokt-network/pocket/p2p/types/mocks"
"github.com/pokt-network/pocket/runtime"
"github.com/pokt-network/pocket/runtime/configs"
cryptoPocket "github.com/pokt-network/pocket/shared/crypto"
"github.com/pokt-network/pocket/shared/modules"
mockModules "github.com/pokt-network/pocket/shared/modules/mocks"
)

Expand Down Expand Up @@ -93,22 +95,21 @@ func TestRainTree_Peerstore_HandleUpdate(t *testing.T) {
})
require.NoError(t, err)

mockBus := mockBus(ctrl)
pstoreProviderMock := mockPeerstoreProvider(ctrl, pstore)
currentHeightProviderMock := mockCurrentHeightProvider(ctrl, 0)
mockBus := mockBus(ctrl, pstore)
mockBus.EXPECT().RegisterModule(gomock.Any()).DoAndReturn(func(m modules.Submodule) {
m.SetBus(mockBus)
}).AnyTimes()

libp2pMockNet, err := mocknet.WithNPeers(1)
require.NoError(t, err)

rtCfg := &config.RainTreeConfig{
Host: libp2pMockNet.Hosts()[0],
Addr: pubKey.Address(),
PeerstoreProvider: pstoreProviderMock,
CurrentHeightProvider: currentHeightProviderMock,
Handler: noopHandler,
Host: libp2pMockNet.Hosts()[0],
Addr: pubKey.Address(),
Handler: noopHandler,
}

router, err := NewRainTreeRouter(mockBus, rtCfg)
router, err := Create(mockBus, rtCfg)
require.NoError(t, err)

rainTree := router.(*rainTreeRouter)
Expand Down Expand Up @@ -142,7 +143,7 @@ func BenchmarkPeerstoreUpdates(b *testing.B) {
// {1000000000, 19},
}

// the test will add this arbitrary number of addresses after the initial initialization (done via NewRainTreeRouter)
// the test will add this arbitrary number of addresses after the initial initialization (done via Create)
// this is to add extra subsequent work that -should- grow linearly and it's actually going to test AddressBook updates
// not simply initializations.
numAddressesToBeAdded := 1000
Expand All @@ -158,9 +159,7 @@ func BenchmarkPeerstoreUpdates(b *testing.B) {
})
require.NoError(b, err)

mockBus := mockBus(ctrl)
pstoreProviderMock := mockPeerstoreProvider(ctrl, pstore)
currentHeightProviderMock := mockCurrentHeightProvider(ctrl, 0)
mockBus := mockBus(ctrl, pstore)

libp2pPStore, err := pstoremem.NewPeerstore()
require.NoError(b, err)
Expand All @@ -169,14 +168,12 @@ func BenchmarkPeerstoreUpdates(b *testing.B) {
hostMock.EXPECT().Peerstore().Return(libp2pPStore).AnyTimes()

rtCfg := &config.RainTreeConfig{
Host: hostMock,
Addr: pubKey.Address(),
PeerstoreProvider: pstoreProviderMock,
CurrentHeightProvider: currentHeightProviderMock,
Handler: noopHandler,
Host: hostMock,
Addr: pubKey.Address(),
Handler: noopHandler,
}

router, err := NewRainTreeRouter(mockBus, rtCfg)
router, err := Create(mockBus, rtCfg)
require.NoError(b, err)

rainTree := router.(*rainTreeRouter)
Expand Down Expand Up @@ -272,7 +269,14 @@ func TestRainTree_MessageTargets_TwentySevenNodes(t *testing.T) {

func testRainTreeMessageTargets(t *testing.T, expectedMsgProp *ExpectedRainTreeMessageProp) {
ctrl := gomock.NewController(t)
modulesRegistry := runtime.NewModulesRegistry()
busMock := mockModules.NewMockBus(ctrl)
busMock.EXPECT().GetModulesRegistry().Return(modulesRegistry).AnyTimes()
busMock.EXPECT().RegisterModule(gomock.Any()).Do(func(arg modules.Submodule) {
module := arg.(modules.Submodule)
modulesRegistry.RegisterModule(module)
module.SetBus(busMock)
}).AnyTimes()
consensusMock := mockModules.NewMockConsensusModule(ctrl)
consensusMock.EXPECT().CurrentHeight().Return(uint64(1)).AnyTimes()
busMock.EXPECT().GetConsensusModule().Return(consensusMock).AnyTimes()
Expand All @@ -283,10 +287,16 @@ func testRainTreeMessageTargets(t *testing.T, expectedMsgProp *ExpectedRainTreeM
runtimeMgrMock.EXPECT().GetConfig().Return(configs.NewDefaultConfig()).AnyTimes()

mockAlphabetValidatorServiceURLsDNS(t)

// TECHDEBT(#810): simplify once `bus.GetPeerstoreProvider()` is available.
pstore := getAlphabetPeerstore(t, expectedMsgProp.numNodes)
pstoreProviderMock := mockPeerstoreProvider(ctrl, pstore)
busMock.RegisterModule(pstoreProviderMock)

currentHeightProviderMock := mockCurrentHeightProvider(ctrl, 1)

busMock.EXPECT().GetCurrentHeightProvider().Return(currentHeightProviderMock).AnyTimes()

libp2pPStore, err := pstoremem.NewPeerstore()
require.NoError(t, err)

Expand All @@ -296,14 +306,12 @@ func testRainTreeMessageTargets(t *testing.T, expectedMsgProp *ExpectedRainTreeM
hostMock.EXPECT().ID().Return(libp2pPeer.ID("")).AnyTimes()

rtCfg := &config.RainTreeConfig{
Host: hostMock,
Addr: []byte{expectedMsgProp.orig},
PeerstoreProvider: pstoreProviderMock,
CurrentHeightProvider: currentHeightProviderMock,
Handler: noopHandler,
Host: hostMock,
Addr: []byte{expectedMsgProp.orig},
Handler: noopHandler,
}

router, err := NewRainTreeRouter(busMock, rtCfg)
router, err := Create(busMock, rtCfg)
require.NoError(t, err)
rainTree := router.(*rainTreeRouter)

Expand Down
45 changes: 28 additions & 17 deletions p2p/raintree/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type rainTreeRouter struct {
currentHeightProvider modules.CurrentHeightProvider
}

func NewRainTreeRouter(bus modules.Bus, cfg *config.RainTreeConfig) (typesP2P.Router, error) {
func Create(bus modules.Bus, cfg *config.RainTreeConfig) (typesP2P.Router, error) {
return new(rainTreeRouter).Create(bus, cfg)
}

Expand All @@ -59,17 +59,29 @@ func (*rainTreeRouter) Create(bus modules.Bus, cfg *config.RainTreeConfig) (type
}

rtr := &rainTreeRouter{
host: cfg.Host,
selfAddr: cfg.Addr,
pstoreProvider: cfg.PeerstoreProvider,
currentHeightProvider: cfg.CurrentHeightProvider,
logger: rainTreeLogger,
handler: cfg.Handler,
host: cfg.Host,
selfAddr: cfg.Addr,
logger: rainTreeLogger,
handler: cfg.Handler,
}
rtr.SetBus(bus)
bus.RegisterModule(rtr)

height := rtr.currentHeightProvider.CurrentHeight()
pstore, err := rtr.pstoreProvider.GetStakedPeerstoreAtHeight(height)
currentHeightProvider := bus.GetCurrentHeightProvider()
// TECHDEBT(#810, 811): 🙄 cleanup; avoid holding a reference
rtr.currentHeightProvider = currentHeightProvider
// TECHDEBT(#810, 811): use `bus.GetPeerstoreProvider()` once available.
pstoreProviderModule, err := bus.GetModulesRegistry().GetModule(peerstore_provider.PeerstoreProviderSubmoduleName)
if err != nil {
return nil, err
}

pstoreProvider, ok := pstoreProviderModule.(peerstore_provider.PeerstoreProvider)
if !ok {
return nil, fmt.Errorf("unexpected peerstore provider module type: %T", pstoreProviderModule)
}

height := currentHeightProvider.CurrentHeight()
pstore, err := pstoreProvider.GetStakedPeerstoreAtHeight(height)
if err != nil {
return nil, fmt.Errorf("getting staked peerstore at height %d: %w", height, err)
}
Expand All @@ -81,7 +93,7 @@ func (*rainTreeRouter) Create(bus modules.Bus, cfg *config.RainTreeConfig) (type
"peerstore_size": pstore.Size(),
}).Msg("initializing raintree router")

if err := rtr.setupDependencies(); err != nil {
if err := rtr.setupDependencies(pstore); err != nil {
return nil, err
}

Expand All @@ -92,6 +104,10 @@ func (rtr *rainTreeRouter) Close() error {
return nil
}

func (rtr *rainTreeRouter) GetModuleName() string {
return typesP2P.StakedActorRouterSubmoduleName
}

// NetworkBroadcast implements the respective member of `typesP2P.Router`.
func (rtr *rainTreeRouter) Broadcast(data []byte) error {
return rtr.broadcastAtLevel(data, rtr.peersManager.GetMaxNumLevels())
Expand Down Expand Up @@ -317,16 +333,11 @@ func (rtr *rainTreeRouter) setupUnicastRouter() error {
return nil
}

func (rtr *rainTreeRouter) setupDependencies() error {
func (rtr *rainTreeRouter) setupDependencies(pstore typesP2P.Peerstore) error {
if err := rtr.setupUnicastRouter(); err != nil {
return err
}

pstore, err := rtr.pstoreProvider.GetStakedPeerstoreAtHeight(rtr.currentHeightProvider.CurrentHeight())
if err != nil {
return fmt.Errorf("getting staked peerstore: %w", err)
}

if err := rtr.setupPeerManager(pstore); err != nil {
return fmt.Errorf("setting up peer manager: %w", err)
}
Expand Down
35 changes: 16 additions & 19 deletions p2p/raintree/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
libp2pCrypto "github.com/libp2p/go-libp2p/core/crypto"
libp2pHost "github.com/libp2p/go-libp2p/core/host"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"

"github.com/pokt-network/pocket/p2p/config"
typesP2P "github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/p2p/utils"
"github.com/pokt-network/pocket/runtime/defaults"
cryptoPocket "github.com/pokt-network/pocket/shared/crypto"
"github.com/stretchr/testify/require"
"github.com/pokt-network/pocket/shared/modules"
)

// TECHDEBT(#609): move & de-dup.
Expand Down Expand Up @@ -48,19 +50,18 @@ func TestRainTreeRouter_AddPeer(t *testing.T) {
require.NoError(t, err)
expectedPStoreSize++

busMock := mockBus(ctrl)
peerstoreProviderMock := mockPeerstoreProvider(ctrl, pstore)
currentHeightProviderMock := mockCurrentHeightProvider(ctrl, 0)
busMock := mockBus(ctrl, pstore)
busMock.EXPECT().RegisterModule(gomock.Any()).DoAndReturn(func(m modules.Submodule) {
m.SetBus(busMock)
}).AnyTimes()

rtCfg := &config.RainTreeConfig{
Host: host,
Addr: selfAddr,
PeerstoreProvider: peerstoreProviderMock,
CurrentHeightProvider: currentHeightProviderMock,
Handler: noopHandler,
Host: host,
Addr: selfAddr,
Handler: noopHandler,
}

router, err := NewRainTreeRouter(busMock, rtCfg)
router, err := Create(busMock, rtCfg)
require.NoError(t, err)

rtRouter := router.(*rainTreeRouter)
Expand Down Expand Up @@ -112,18 +113,14 @@ func TestRainTreeRouter_RemovePeer(t *testing.T) {
require.NoError(t, err)
expectedPStoreSize++

busMock := mockBus(ctrl)
peerstoreProviderMock := mockPeerstoreProvider(ctrl, pstore)
currentHeightProviderMock := mockCurrentHeightProvider(ctrl, 0)
busMock := mockBus(ctrl, pstore)
rtCfg := &config.RainTreeConfig{
Host: host,
Addr: selfAddr,
PeerstoreProvider: peerstoreProviderMock,
CurrentHeightProvider: currentHeightProviderMock,
Handler: noopHandler,
Host: host,
Addr: selfAddr,
Handler: noopHandler,
}

router, err := NewRainTreeRouter(busMock, rtCfg)
router, err := Create(busMock, rtCfg)
require.NoError(t, err)
rainTree := router.(*rainTreeRouter)

Expand Down
Loading

0 comments on commit 7e600c6

Please sign in to comment.