Skip to content

Commit

Permalink
feat(tracker): split preconf protocol for better organisation (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
aloknerurkar authored May 2, 2024
1 parent bfbd778 commit bd43a9b
Show file tree
Hide file tree
Showing 7 changed files with 871 additions and 299 deletions.
151 changes: 74 additions & 77 deletions p2p/pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/primevprotocol/mev-commit/p2p/pkg/p2p"
"github.com/primevprotocol/mev-commit/p2p/pkg/p2p/libp2p"
"github.com/primevprotocol/mev-commit/p2p/pkg/preconfirmation"
preconftracker "github.com/primevprotocol/mev-commit/p2p/pkg/preconfirmation/tracker"
bidderapi "github.com/primevprotocol/mev-commit/p2p/pkg/rpc/bidder"
providerapi "github.com/primevprotocol/mev-commit/p2p/pkg/rpc/provider"
"github.com/primevprotocol/mev-commit/p2p/pkg/signer"
Expand All @@ -47,7 +48,6 @@ import (
"github.com/primevprotocol/mev-commit/p2p/pkg/topology"
"github.com/primevprotocol/mev-commit/x/contracts/events"
"github.com/primevprotocol/mev-commit/x/contracts/events/publisher"
"github.com/primevprotocol/mev-commit/x/contracts/txmonitor"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -81,8 +81,8 @@ type Options struct {
}

type Node struct {
waitClose func()
closers []io.Closer
cancelFunc context.CancelFunc
closers []io.Closer
}

func NewNode(opts *Options) (*Node, error) {
Expand All @@ -93,11 +93,24 @@ func NewNode(opts *Options) (*Node, error) {
srv := apiserver.New(opts.Version, opts.Logger.With("component", "apiserver"))
peerType := p2p.FromString(opts.PeerType)

contractRPC, err := ethclient.Dial(opts.RPCEndpoint)
if err != nil {
opts.Logger.Error("failed to connect to rpc", "error", err)
return nil, err
var (
contractRPC *ethclient.Client
err error
)
if opts.WSRPCEndpoint != "" {
contractRPC, err = ethclient.Dial(opts.WSRPCEndpoint)
if err != nil {
opts.Logger.Error("failed to connect to ws rpc", "error", err)
return nil, err
}
} else {
contractRPC, err = ethclient.Dial(opts.RPCEndpoint)
if err != nil {
opts.Logger.Error("failed to connect to rpc", "error", err)
return nil, err
}
}

evmClient, err := evmclient.New(
opts.KeySigner,
evmclient.WrapEthClient(contractRPC),
Expand All @@ -110,24 +123,7 @@ func NewNode(opts *Options) (*Node, error) {
nd.closers = append(nd.closers, evmClient)
srv.MetricsRegistry().MustRegister(evmClient.Metrics()...)

wsRPC, err := ethclient.Dial(opts.WSRPCEndpoint)
if err != nil {
opts.Logger.Error("failed to connect to ws rpc", "error", err)
return nil, err
}
wsEvmClient, err := evmclient.New(
opts.KeySigner,
evmclient.WrapEthClient(wsRPC),
opts.Logger.With("component", "wsevmclient"),
)
if err != nil {
opts.Logger.Error("failed to create ws evm client", "error", err)
return nil, err
}
nd.closers = append(nd.closers, wsEvmClient)

bidderRegistryContractAddr := common.HexToAddress(opts.BidderRegistryContract)

bidderRegistry := bidder_registrycontract.New(
opts.KeySigner.GetAddress(),
bidderRegistryContractAddr,
Expand All @@ -136,7 +132,6 @@ func NewNode(opts *Options) (*Node, error) {
)

providerRegistryContractAddr := common.HexToAddress(opts.ProviderRegistryContract)

providerRegistry := provider_registrycontract.New(
providerRegistryContractAddr,
evmClient,
Expand All @@ -160,6 +155,7 @@ func NewNode(opts *Options) (*Node, error) {
default:
keyKeeper = keykeeper.NewBaseKeyKeeper(opts.KeySigner)
}

p2pSvc, err := libp2p.New(&libp2p.Options{
KeyKeeper: keyKeeper,
Secret: opts.Secret,
Expand Down Expand Up @@ -196,8 +192,6 @@ func NewNode(opts *Options) (*Node, error) {

ctx, cancel := context.WithCancel(context.Background())

var preconfProtoClosed <-chan struct{}

contracts, err := getContractABIs(opts)
if err != nil {
opts.Logger.Error("failed to get contract ABIs", "error", err)
Expand All @@ -218,23 +212,23 @@ func NewNode(opts *Options) (*Node, error) {
abis...,
)

wsPub := publisher.NewWSPublisher(
testStore{},
opts.Logger.With("component", "ws_publisher"),
wsRPC,
evtMgr,
)

monitor := txmonitor.New(
opts.KeySigner.GetAddress(),
contractRPC,
txmonitor.NewEVMHelper(contractRPC.Client()),
nil,
opts.Logger.With("component", "tx_monitor"),
128,
)

monitorClosed := monitor.Start(ctx)
var evtPublisher Starter
if opts.WSRPCEndpoint != "" {
// Use WS publisher if WSRPCEndpoint is set
evtPublisher = publisher.NewWSPublisher(
testStore{},
opts.Logger.With("component", "ws_publisher"),
contractRPC,
evtMgr,
)
} else {
evtPublisher = publisher.NewHTTPPublisher(
testStore{},
opts.Logger.With("component", "http_publisher"),
contractRPC,
evtMgr,
)
}

if opts.PeerType != p2p.PeerTypeBootnode.String() {
lis, err := net.Listen("tcp", opts.RPCAddr)
Expand Down Expand Up @@ -305,6 +299,15 @@ func NewNode(opts *Options) (*Node, error) {
return nil, err
}

tracker := preconftracker.NewTracker(
peerType,
evtMgr,
store,
commitmentDA,
opts.Logger.With("component", "tracker"),
)
nd.closers = append(nd.closers, channelCloserFunc(tracker.Start(ctx)))

switch opts.PeerType {
case p2p.PeerTypeProvider.String():
providerAPI := providerapi.NewService(
Expand All @@ -324,7 +327,7 @@ func NewNode(opts *Options) (*Node, error) {
evtMgr,
opts.Logger.With("component", "depositmanager"),
)
depositMgr.Start(ctx)
nd.closers = append(nd.closers, channelCloserFunc(depositMgr.Start(ctx)))
preconfProto := preconfirmation.New(
keyKeeper.GetAddress(),
topo,
Expand All @@ -333,13 +336,10 @@ func NewNode(opts *Options) (*Node, error) {
depositMgr,
bidProcessor,
commitmentDA,
evtMgr,
store,
tracker,
opts.Logger.With("component", "preconfirmation_protocol"),
)

preconfProtoClosed = preconfProto.Start(ctx)

// Only register handler for provider
p2pSvc.AddStreamHandlers(preconfProto.Streams()...)
keyexchange := keyexchange.New(
Expand All @@ -361,13 +361,10 @@ func NewNode(opts *Options) (*Node, error) {
depositMgr,
bidProcessor,
commitmentDA,
evtMgr,
store,
tracker,
opts.Logger.With("component", "preconfirmation_protocol"),
)

preconfProtoClosed = preconfProto.Start(ctx)

srv.RegisterMetricsCollectors(preconfProto.Metrics()...)

bidderAPI := bidderapi.NewService(
Expand Down Expand Up @@ -515,23 +512,9 @@ func NewNode(opts *Options) (*Node, error) {
}
}()
nd.closers = append(nd.closers, server)
nd.closers = append(nd.closers, channelCloserFunc(evtPublisher.Start(ctx, contractAddrs...)))

pubDone := wsPub.Start(ctx, contractAddrs...)

nd.waitClose = func() {
cancel()

closeChan := make(chan struct{})
go func() {
defer close(closeChan)

<-preconfProtoClosed
<-monitorClosed
<-pubDone
}()

<-closeChan
}
nd.cancelFunc = cancel

return nd, nil
}
Expand Down Expand Up @@ -561,14 +544,9 @@ func getContractABIs(opts *Options) (map[common.Address]*abi.ABI, error) {
}

func (n *Node) Close() error {
workersClosed := make(chan struct{})
go func() {
defer close(workersClosed)

if n.waitClose != nil {
n.waitClose()
}
}()
if n.cancelFunc != nil {
n.cancelFunc()
}

var err error
for _, c := range n.closers {
Expand Down Expand Up @@ -615,3 +593,22 @@ func (t testStore) LastBlock() (uint64, error) {
func (t testStore) SetLastBlock(_ uint64) error {
return nil
}

type channelCloser <-chan struct{}

func channelCloserFunc(c <-chan struct{}) io.Closer {
return channelCloser(c)
}

func (c channelCloser) Close() error {
select {
case <-c:
return nil
case <-time.After(5 * time.Second):
return errors.New("timeout waiting for channel to close")
}
}

type Starter interface {
Start(ctx context.Context, contracts ...common.Address) <-chan struct{}
}
Loading

0 comments on commit bd43a9b

Please sign in to comment.