Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[P2P] refactor: peerstore provider (part 1) #804

Merged
merged 42 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
34d42bf
refactor: move `persistencePeerstoreProvider` & implement `#GetUnstak…
bryanchriswhite Jun 5, 2023
142071d
feat: add `p2pPeerstoreProvider`
bryanchriswhite Jun 5, 2023
bcf3f65
refactor: only embed IntegratableModule in PeerstoreProvider
bryanchriswhite Jun 5, 2023
00730d4
refactor: remove unused `PeerstoreProvider#GetP2PConfig()` method
bryanchriswhite Jun 5, 2023
41d18ae
feat: add `PeerstoreProvider#GetUnstakedPeerstore()` interface method
bryanchriswhite Jun 5, 2023
507d8af
chore: implement `rpcPeerstoreProvider#GetUnstakedPeerstore()`
bryanchriswhite Jun 5, 2023
e74fea4
chore: add TECHDEBT comment
bryanchriswhite Jun 5, 2023
6128d84
chore: add `Factory` generic type
bryanchriswhite Jun 5, 2023
4bc605a
chore: update comments
bryanchriswhite Jun 5, 2023
094a95e
chore: update changelogs
bryanchriswhite Jun 5, 2023
2a5b300
chore: update changelogs
bryanchriswhite Jun 6, 2023
1d03f38
empty commit
bryanchriswhite Jun 6, 2023
c1318da
chore: replace empty interfaces with `any`
bryanchriswhite Jun 6, 2023
8d1f62a
chore: edit comment
bryanchriswhite Jun 6, 2023
08bcafd
chore: remove unused `GetP2PConfig()` method
bryanchriswhite Jun 7, 2023
5a54400
chore: add godoc comments
bryanchriswhite Jun 7, 2023
5000ad8
fix: retrieve p2p mdoule from bus on each call
bryanchriswhite Jun 7, 2023
c587afa
refactor: persistence peerstor provider
bryanchriswhite Jun 7, 2023
e70b99c
refactor: embed `p2pPStoreProviderFactory`
bryanchriswhite Jun 7, 2023
b3ce271
chore: oneline function signature
bryanchriswhite Jun 7, 2023
8a476a4
refactor: consolidate p2pPeerstoreProvider into persistencePeerstorPr…
bryanchriswhite Jun 7, 2023
650d432
refactor: rename persistence.go back to provider.go
bryanchriswhite Jun 7, 2023
59fffb5
refactor: `p2pPeerstoreProvider` to a single function'
bryanchriswhite Jun 7, 2023
a1e22c4
refactor: update peerstore provider method receivers
bryanchriswhite Jun 7, 2023
ccd9fb7
refactor: re-implement `GetUnstakedPeerstore`
bryanchriswhite Jun 7, 2023
6e1a5b8
chore: add TECHDEBT comments
bryanchriswhite Jun 8, 2023
60302a4
chore: add issue numbers to TECHDEBT comments
bryanchriswhite Jun 8, 2023
1ac2e00
chore: improve comment
bryanchriswhite Jun 8, 2023
3a8e138
refactor: rename `T` & `K` type params to `M` &`C`
bryanchriswhite Jun 8, 2023
0cbca43
chore: update changelogs
bryanchriswhite Jun 8, 2023
9cee9dd
fix: bugs
bryanchriswhite Jun 8, 2023
2489b92
Merge remote-tracking branch 'pokt/main' into refactor/peerstore-prov…
bryanchriswhite Jun 8, 2023
c98e8d1
chore: add TECHDEBT comments
bryanchriswhite Jun 12, 2023
84ce1bd
chore: combine `NewRPCPeerstoreProvider()` & `Create()`
bryanchriswhite Jun 12, 2023
341d982
refactor: rename `NewPersistencePeerstoreProvider()` to `Create()`
bryanchriswhite Jun 12, 2023
a9ffa56
chore: persistence peerstore provider includes all staked actors
bryanchriswhite Jun 12, 2023
0d92a09
chore: fix consensus test
bryanchriswhite Jun 12, 2023
024d88b
fix: p2p test
bryanchriswhite Jun 12, 2023
22ff0fd
chore: update changelogs
bryanchriswhite Jun 12, 2023
044f6f1
revert: `readCtx.GetAllStakedActors()`
bryanchriswhite Jun 13, 2023
b273e87
chore: update changelogs
bryanchriswhite Jun 13, 2023
2e4d60d
chore: fix nit
bryanchriswhite Jun 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/client/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func persistentPreRun(cmd *cobra.Command, _ []string) {
func setupPeerstoreProvider(rm runtime.Manager, rpcURL string) {
bus := rm.GetBus()
modulesRegistry := bus.GetModulesRegistry()
pstoreProvider := rpcABP.NewRPCPeerstoreProvider(
pstoreProvider := rpcABP.Create(
rpcABP.WithP2PConfig(rm.GetConfig().P2P),
rpcABP.WithCustomRPCURL(rpcURL),
)
Expand Down
4 changes: 4 additions & 0 deletions app/client/doc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.0.0.33] - 2023-06-13

- Renamed `NewRPCPeerstoreProvider()` and `NewPersistencePeerstoreProvider()` to `Create()` (per package)

## [0.0.0.32] - 2023-05-25

- Add the `nonInteractive` flag in a couple spots where it was missing
Expand Down
4 changes: 4 additions & 0 deletions consensus/doc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.0.0.54] - 2023-06-13

- Fix tests

## [0.0.0.53] - 2023-06-08

- Add consensus README
Expand Down
16 changes: 16 additions & 0 deletions consensus/e2e_tests/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (

"github.com/benbjohnson/clock"
"github.com/golang/mock/gomock"

"github.com/pokt-network/pocket/consensus"
typesCons "github.com/pokt-network/pocket/consensus/types"
"github.com/pokt-network/pocket/internal/testutil"
persistenceMocks "github.com/pokt-network/pocket/persistence/types/mocks"
"github.com/pokt-network/pocket/runtime"
"github.com/pokt-network/pocket/runtime/configs"
Expand Down Expand Up @@ -432,6 +434,20 @@ func basePersistenceMock(t *testing.T, _ modules.EventsChannel, bus modules.Bus)
Return(bus.GetRuntimeMgr().GetGenesis().Validators, nil).
AnyTimes()

persistenceReadContextMock.
EXPECT().
GetAllStakedActors(gomock.Any()).
DoAndReturn(func(height int64) ([]*coreTypes.Actor, error) {
genesisState := bus.GetRuntimeMgr().GetGenesis()
return testutil.Concatenate[*coreTypes.Actor](
genesisState.Validators,
genesisState.Servicers,
genesisState.Fishermen,
genesisState.Applications,
), nil
}).
AnyTimes()

persistenceReadContextMock.
EXPECT().
GetBlockHash(gomock.Any()).
Expand Down
10 changes: 10 additions & 0 deletions internal/testutil/slice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package testutil

// Concatenate appends the contents of multiple slices of any type (T) into a
// single slice of type T.
func Concatenate[T any](tt ...[]T) (result []T) {
for _, t := range tt {
result = append(result, t...)
}
return result
}
8 changes: 8 additions & 0 deletions p2p/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.0.0.54] - 2023-06-13

- Replaced embedded `modules.Module` with simpler `modules.IntegratableModule` in `PeerstoreProvider` interface
- Removed unused `PeerstoreProvider#GetP2PConfig()` method
- Added `PeerstoreProvider#GetUnstakedPeerstore()` method
- Added temporary `unstakedPeerstoreProvider` interface
- Renamed `NewRPCPeerstoreProvider()` and `NewPersistencePeerstoreProvider()` to `Create()` (per package)

## [0.0.0.53] - 2023-06-01

- Moved nonce field from RainTreeMessage to PocketEnvelope protobuf types
Expand Down
2 changes: 1 addition & 1 deletion p2p/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (m *p2pModule) bootstrap() error {
continue
}

pstoreProvider := rpcABP.NewRPCPeerstoreProvider(
pstoreProvider := rpcABP.Create(
rpcABP.WithP2PConfig(
m.GetBus().GetRuntimeMgr().GetConfig().P2P,
),
Expand Down
21 changes: 15 additions & 6 deletions p2p/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@ package p2p
import (
"errors"
"fmt"

"github.com/libp2p/go-libp2p"
libp2pHost "github.com/libp2p/go-libp2p/core/host"
"github.com/multiformats/go-multiaddr"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"github.com/pokt-network/pocket/logger"
"github.com/pokt-network/pocket/p2p/config"
"github.com/pokt-network/pocket/p2p/providers"
"github.com/pokt-network/pocket/p2p/providers/current_height_provider"
"github.com/pokt-network/pocket/p2p/providers/peerstore_provider"
persABP "github.com/pokt-network/pocket/p2p/providers/peerstore_provider/persistence"
persPSP "github.com/pokt-network/pocket/p2p/providers/peerstore_provider/persistence"
"github.com/pokt-network/pocket/p2p/raintree"
typesP2P "github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/p2p/utils"
Expand All @@ -24,8 +28,6 @@ import (
"github.com/pokt-network/pocket/shared/modules"
"github.com/pokt-network/pocket/shared/modules/base_modules"
"github.com/pokt-network/pocket/telemetry"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)

var _ modules.P2PModule = &p2pModule{}
Expand Down Expand Up @@ -231,14 +233,21 @@ func (m *p2pModule) setupDependencies() error {
// bus, if one is registered, otherwise returns a new `persistencePeerstoreProvider`.
func (m *p2pModule) setupPeerstoreProvider() error {
m.logger.Debug().Msg("setupPeerstoreProvider")

// TECHDEBT(#810): simplify once submodules are more convenient to retrieve.
pstoreProviderModule, err := m.GetBus().GetModulesRegistry().GetModule(peerstore_provider.ModuleName)
if err != nil {
m.logger.Debug().Msg("creating new persistence peerstore...")
pstoreProviderModule = persABP.NewPersistencePeerstoreProvider(m.GetBus())
} else if pstoreProviderModule != nil {
m.logger.Debug().Msg("loaded persistence peerstore...")
pstoreProvider, err := persPSP.Create(m.GetBus())
if err != nil {
return err
}

m.pstoreProvider = pstoreProvider
return nil
}

m.logger.Debug().Msg("loaded persistence peerstore...")
pstoreProvider, ok := pstoreProviderModule.(providers.PeerstoreProvider)
if !ok {
return fmt.Errorf("unknown peerstore provider type: %T", pstoreProviderModule)
Expand Down
10 changes: 7 additions & 3 deletions p2p/providers/peerstore_provider/peerstore_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package peerstore_provider
import (
"github.com/pokt-network/pocket/logger"
typesP2P "github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/runtime/configs"
coreTypes "github.com/pokt-network/pocket/shared/core/types"
cryptoPocket "github.com/pokt-network/pocket/shared/crypto"
"github.com/pokt-network/pocket/shared/modules"
Expand All @@ -16,10 +15,15 @@ const ModuleName = "peerstore_provider"

// PeerstoreProvider is an interface that provides Peerstore accessors
type PeerstoreProvider interface {
modules.Module
modules.IntegratableModule

// GetStakedPeerstoreAtHeight returns a peerstore containing all staked peers
// at a given height. These peers communicate via the p2p module's staked actor
// router.
GetStakedPeerstoreAtHeight(height uint64) (typesP2P.Peerstore, error)
GetP2PConfig() *configs.P2PConfig
// GetUnstakedPeerstore returns a peerstore containing all peers which
// communicate via the p2p module's unstaked actor router.
GetUnstakedPeerstore() (typesP2P.Peerstore, error)
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
}

func ActorsToPeerstore(abp PeerstoreProvider, actors []*coreTypes.Actor) (pstore typesP2P.Peerstore, errs error) {
Expand Down
40 changes: 22 additions & 18 deletions p2p/providers/peerstore_provider/persistence/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,28 @@ package persistence
import (
"github.com/pokt-network/pocket/p2p/providers/peerstore_provider"
typesP2P "github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/runtime/configs"
"github.com/pokt-network/pocket/shared/modules"
"github.com/pokt-network/pocket/shared/modules/base_modules"
)

var _ peerstore_provider.PeerstoreProvider = &persistencePeerstoreProvider{}
var (
_ peerstore_provider.PeerstoreProvider = &persistencePeerstoreProvider{}
_ persistencePStoreProviderFactory = &persistencePeerstoreProvider{}
)

type persistencePStoreProviderOption func(*persistencePeerstoreProvider)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the topic of "not being afraid to change everything", take into consideration that maybe "providers" aren't the best approach with our bus based system.

Not making a comment on removing/extending them, but just sharing that it is not set in stone so don't feel constrained.

type persistencePStoreProviderFactory = modules.FactoryWithOptions[peerstore_provider.PeerstoreProvider, persistencePStoreProviderOption]

// TECHDEBT(#810): refactor to implement `Submodule` interface.
type persistencePeerstoreProvider struct {
base_modules.IntegratableModule
base_modules.InterruptableModule
}

func NewPersistencePeerstoreProvider(bus modules.Bus, options ...func(*persistencePeerstoreProvider)) *persistencePeerstoreProvider {
func Create(bus modules.Bus, options ...persistencePStoreProviderOption) (peerstore_provider.PeerstoreProvider, error) {
return new(persistencePeerstoreProvider).Create(bus, options...)
}

func (*persistencePeerstoreProvider) Create(bus modules.Bus, options ...persistencePStoreProviderOption) (peerstore_provider.PeerstoreProvider, error) {
pabp := &persistencePeerstoreProvider{
IntegratableModule: *base_modules.NewIntegratableModule(bus),
}
Expand All @@ -24,35 +33,30 @@ func NewPersistencePeerstoreProvider(bus modules.Bus, options ...func(*persisten
o(pabp)
}

return pabp
}

func Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) {
return new(persistencePeerstoreProvider).Create(bus, options...)
}

func (*persistencePeerstoreProvider) Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) {
return NewPersistencePeerstoreProvider(bus), nil
return pabp, nil
}

func (*persistencePeerstoreProvider) GetModuleName() string {
return peerstore_provider.ModuleName
}

func (pabp *persistencePeerstoreProvider) GetStakedPeerstoreAtHeight(height uint64) (typesP2P.Peerstore, error) {
readCtx, err := pabp.GetBus().GetPersistenceModule().NewReadContext(int64(height))
// GetStakedPeerstoreAtHeight implements the respective `PeerstoreProvider` interface method.
func (persistencePSP *persistencePeerstoreProvider) GetStakedPeerstoreAtHeight(height uint64) (typesP2P.Peerstore, error) {
readCtx, err := persistencePSP.GetBus().GetPersistenceModule().NewReadContext(int64(height))
if err != nil {
return nil, err
}
defer readCtx.Release()

// TECHDEBT(#818): consider all staked actors, not just validators.
validators, err := readCtx.GetAllValidators(int64(height))
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
return peerstore_provider.ActorsToPeerstore(pabp, validators)
return peerstore_provider.ActorsToPeerstore(persistencePSP, validators)
}

func (pabp *persistencePeerstoreProvider) GetP2PConfig() *configs.P2PConfig {
return pabp.GetBus().GetRuntimeMgr().GetConfig().P2P
// GetStakedPeerstoreAtHeight implements the respective `PeerstoreProvider` interface method.
func (persistencePSP *persistencePeerstoreProvider) GetUnstakedPeerstore() (typesP2P.Peerstore, error) {
return peerstore_provider.GetUnstakedPeerstore(persistencePSP.GetBus())
}
30 changes: 13 additions & 17 deletions p2p/providers/peerstore_provider/rpc/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ func init() {
rpcHost = runtime.GetEnv("RPC_HOST", defaults.DefaultRPCHost)
}

// TECHDEBT(#810): refactor to implement `Submodule` interface.
type rpcPeerstoreProvider struct {
// TECHDEBT(#810): simplify once submodules are more convenient to retrieve.
base_modules.IntegratableModule
base_modules.InterruptableModule

Expand All @@ -37,7 +39,7 @@ type rpcPeerstoreProvider struct {
rpcClient *rpc.ClientWithResponses
}

func NewRPCPeerstoreProvider(options ...modules.ModuleOption) *rpcPeerstoreProvider {
func Create(options ...modules.ModuleOption) *rpcPeerstoreProvider {
rabp := &rpcPeerstoreProvider{
rpcURL: fmt.Sprintf("http://%s:%s", rpcHost, defaults.DefaultRPCPort), // TODO: Make port configurable
}
Expand All @@ -51,27 +53,24 @@ func NewRPCPeerstoreProvider(options ...modules.ModuleOption) *rpcPeerstoreProvi
return rabp
}

func Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) {
return new(rpcPeerstoreProvider).Create(bus, options...)
}

// TECHDEBT(#810): refactor to implement `Submodule` interface.
func (*rpcPeerstoreProvider) Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) {
return NewRPCPeerstoreProvider(options...), nil
return Create(options...), nil
}

func (*rpcPeerstoreProvider) GetModuleName() string {
return peerstore_provider.ModuleName
}

func (rabp *rpcPeerstoreProvider) GetStakedPeerstoreAtHeight(height uint64) (typesP2P.Peerstore, error) {
func (rpcPSP *rpcPeerstoreProvider) GetStakedPeerstoreAtHeight(height uint64) (typesP2P.Peerstore, error) {
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()

var (
h int64 = int64(height)
actorType rpc.ActorTypesEnum = "validator"
)
response, err := rabp.rpcClient.GetV1P2pStakedActorsAddressBookWithResponse(ctx, &rpc.GetV1P2pStakedActorsAddressBookParams{Height: &h, ActorType: &actorType})
response, err := rpcPSP.rpcClient.GetV1P2pStakedActorsAddressBookWithResponse(ctx, &rpc.GetV1P2pStakedActorsAddressBookParams{Height: &h, ActorType: &actorType})
if err != nil {
return nil, err
}
Expand All @@ -91,22 +90,19 @@ func (rabp *rpcPeerstoreProvider) GetStakedPeerstoreAtHeight(height uint64) (typ
})
}

return peerstore_provider.ActorsToPeerstore(rabp, coreActors)
return peerstore_provider.ActorsToPeerstore(rpcPSP, coreActors)
}

func (rabp *rpcPeerstoreProvider) GetP2PConfig() *configs.P2PConfig {
if rabp.p2pCfg == nil {
return rabp.GetBus().GetRuntimeMgr().GetConfig().P2P
}
return rabp.p2pCfg
func (rpcPSP *rpcPeerstoreProvider) GetUnstakedPeerstore() (typesP2P.Peerstore, error) {
return peerstore_provider.GetUnstakedPeerstore(rpcPSP.GetBus())
}

func (rabp *rpcPeerstoreProvider) initRPCClient() {
rpcClient, err := rpc.NewClientWithResponses(rabp.rpcURL)
func (rpcPSP *rpcPeerstoreProvider) initRPCClient() {
rpcClient, err := rpc.NewClientWithResponses(rpcPSP.rpcURL)
if err != nil {
log.Fatalf("could not create RPC client: %v", err)
}
rabp.rpcClient = rpcClient
rpcPSP.rpcClient = rpcClient
}

// options
Expand Down
34 changes: 34 additions & 0 deletions p2p/providers/peerstore_provider/unstaked.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package peerstore_provider

import (
"fmt"

typesP2P "github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/shared/modules"
)

// unstakedPeerstoreProvider is an interface which the p2p module supports in
// order to allow access to the unstaked-actor-router's peerstore.
//
// NB: this peerstore includes all actors which participate in P2P (e.g. full
// and light clients but also validators, servicers, etc.).
//
// TECHDEBT(#811): will become unnecessary after `modules.P2PModule#GetUnstakedPeerstore` is added.`
// CONSIDERATION: split `PeerstoreProvider` into `StakedPeerstoreProvider` and `UnstakedPeerstoreProvider`.
// (see: https://github.com/pokt-network/pocket/pull/804#issuecomment-1576531916)
type unstakedPeerstoreProvider interface {
GetUnstakedPeerstore() (typesP2P.Peerstore, error)
}

func GetUnstakedPeerstore(bus modules.Bus) (typesP2P.Peerstore, error) {
p2pModule := bus.GetP2PModule()
if p2pModule == nil {
return nil, fmt.Errorf("p2p module is not registered to bus and is required")
}

unstakedPSP, ok := p2pModule.(unstakedPeerstoreProvider)
if !ok {
return nil, fmt.Errorf("p2p module does not implement unstakedPeerstoreProvider")
}
return unstakedPSP.GetUnstakedPeerstore()
}
Loading