diff --git a/p2p/pkg/node/node.go b/p2p/pkg/node/node.go index f7bb12ade..378392a78 100644 --- a/p2p/pkg/node/node.go +++ b/p2p/pkg/node/node.go @@ -39,6 +39,7 @@ import ( "github.com/primevprotocol/mev-commit/p2p/pkg/p2p" "github.com/primevprotocol/mev-commit/p2p/pkg/p2p/libp2p" "github.com/primevprotocol/mev-commit/p2p/pkg/preconfirmation" + preconftracker "github.com/primevprotocol/mev-commit/p2p/pkg/preconfirmation/tracker" bidderapi "github.com/primevprotocol/mev-commit/p2p/pkg/rpc/bidder" providerapi "github.com/primevprotocol/mev-commit/p2p/pkg/rpc/provider" "github.com/primevprotocol/mev-commit/p2p/pkg/signer" @@ -47,7 +48,6 @@ import ( "github.com/primevprotocol/mev-commit/p2p/pkg/topology" "github.com/primevprotocol/mev-commit/x/contracts/events" "github.com/primevprotocol/mev-commit/x/contracts/events/publisher" - "github.com/primevprotocol/mev-commit/x/contracts/txmonitor" "google.golang.org/grpc" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" @@ -81,8 +81,8 @@ type Options struct { } type Node struct { - waitClose func() - closers []io.Closer + cancelFunc context.CancelFunc + closers []io.Closer } func NewNode(opts *Options) (*Node, error) { @@ -93,11 +93,24 @@ func NewNode(opts *Options) (*Node, error) { srv := apiserver.New(opts.Version, opts.Logger.With("component", "apiserver")) peerType := p2p.FromString(opts.PeerType) - contractRPC, err := ethclient.Dial(opts.RPCEndpoint) - if err != nil { - opts.Logger.Error("failed to connect to rpc", "error", err) - return nil, err + var ( + contractRPC *ethclient.Client + err error + ) + if opts.WSRPCEndpoint != "" { + contractRPC, err = ethclient.Dial(opts.WSRPCEndpoint) + if err != nil { + opts.Logger.Error("failed to connect to ws rpc", "error", err) + return nil, err + } + } else { + contractRPC, err = ethclient.Dial(opts.RPCEndpoint) + if err != nil { + opts.Logger.Error("failed to connect to rpc", "error", err) + return nil, err + } } + evmClient, err := evmclient.New( opts.KeySigner, evmclient.WrapEthClient(contractRPC), @@ -110,24 +123,7 @@ func NewNode(opts *Options) (*Node, error) { nd.closers = append(nd.closers, evmClient) srv.MetricsRegistry().MustRegister(evmClient.Metrics()...) - wsRPC, err := ethclient.Dial(opts.WSRPCEndpoint) - if err != nil { - opts.Logger.Error("failed to connect to ws rpc", "error", err) - return nil, err - } - wsEvmClient, err := evmclient.New( - opts.KeySigner, - evmclient.WrapEthClient(wsRPC), - opts.Logger.With("component", "wsevmclient"), - ) - if err != nil { - opts.Logger.Error("failed to create ws evm client", "error", err) - return nil, err - } - nd.closers = append(nd.closers, wsEvmClient) - bidderRegistryContractAddr := common.HexToAddress(opts.BidderRegistryContract) - bidderRegistry := bidder_registrycontract.New( opts.KeySigner.GetAddress(), bidderRegistryContractAddr, @@ -136,7 +132,6 @@ func NewNode(opts *Options) (*Node, error) { ) providerRegistryContractAddr := common.HexToAddress(opts.ProviderRegistryContract) - providerRegistry := provider_registrycontract.New( providerRegistryContractAddr, evmClient, @@ -160,6 +155,7 @@ func NewNode(opts *Options) (*Node, error) { default: keyKeeper = keykeeper.NewBaseKeyKeeper(opts.KeySigner) } + p2pSvc, err := libp2p.New(&libp2p.Options{ KeyKeeper: keyKeeper, Secret: opts.Secret, @@ -196,8 +192,6 @@ func NewNode(opts *Options) (*Node, error) { ctx, cancel := context.WithCancel(context.Background()) - var preconfProtoClosed <-chan struct{} - contracts, err := getContractABIs(opts) if err != nil { opts.Logger.Error("failed to get contract ABIs", "error", err) @@ -218,23 +212,23 @@ func NewNode(opts *Options) (*Node, error) { abis..., ) - wsPub := publisher.NewWSPublisher( - testStore{}, - opts.Logger.With("component", "ws_publisher"), - wsRPC, - evtMgr, - ) - - monitor := txmonitor.New( - opts.KeySigner.GetAddress(), - contractRPC, - txmonitor.NewEVMHelper(contractRPC.Client()), - nil, - opts.Logger.With("component", "tx_monitor"), - 128, - ) - - monitorClosed := monitor.Start(ctx) + var evtPublisher Starter + if opts.WSRPCEndpoint != "" { + // Use WS publisher if WSRPCEndpoint is set + evtPublisher = publisher.NewWSPublisher( + testStore{}, + opts.Logger.With("component", "ws_publisher"), + contractRPC, + evtMgr, + ) + } else { + evtPublisher = publisher.NewHTTPPublisher( + testStore{}, + opts.Logger.With("component", "http_publisher"), + contractRPC, + evtMgr, + ) + } if opts.PeerType != p2p.PeerTypeBootnode.String() { lis, err := net.Listen("tcp", opts.RPCAddr) @@ -305,6 +299,15 @@ func NewNode(opts *Options) (*Node, error) { return nil, err } + tracker := preconftracker.NewTracker( + peerType, + evtMgr, + store, + commitmentDA, + opts.Logger.With("component", "tracker"), + ) + nd.closers = append(nd.closers, channelCloserFunc(tracker.Start(ctx))) + switch opts.PeerType { case p2p.PeerTypeProvider.String(): providerAPI := providerapi.NewService( @@ -324,7 +327,7 @@ func NewNode(opts *Options) (*Node, error) { evtMgr, opts.Logger.With("component", "depositmanager"), ) - depositMgr.Start(ctx) + nd.closers = append(nd.closers, channelCloserFunc(depositMgr.Start(ctx))) preconfProto := preconfirmation.New( keyKeeper.GetAddress(), topo, @@ -333,13 +336,10 @@ func NewNode(opts *Options) (*Node, error) { depositMgr, bidProcessor, commitmentDA, - evtMgr, - store, + tracker, opts.Logger.With("component", "preconfirmation_protocol"), ) - preconfProtoClosed = preconfProto.Start(ctx) - // Only register handler for provider p2pSvc.AddStreamHandlers(preconfProto.Streams()...) keyexchange := keyexchange.New( @@ -361,13 +361,10 @@ func NewNode(opts *Options) (*Node, error) { depositMgr, bidProcessor, commitmentDA, - evtMgr, - store, + tracker, opts.Logger.With("component", "preconfirmation_protocol"), ) - preconfProtoClosed = preconfProto.Start(ctx) - srv.RegisterMetricsCollectors(preconfProto.Metrics()...) bidderAPI := bidderapi.NewService( @@ -515,23 +512,9 @@ func NewNode(opts *Options) (*Node, error) { } }() nd.closers = append(nd.closers, server) + nd.closers = append(nd.closers, channelCloserFunc(evtPublisher.Start(ctx, contractAddrs...))) - pubDone := wsPub.Start(ctx, contractAddrs...) - - nd.waitClose = func() { - cancel() - - closeChan := make(chan struct{}) - go func() { - defer close(closeChan) - - <-preconfProtoClosed - <-monitorClosed - <-pubDone - }() - - <-closeChan - } + nd.cancelFunc = cancel return nd, nil } @@ -561,14 +544,9 @@ func getContractABIs(opts *Options) (map[common.Address]*abi.ABI, error) { } func (n *Node) Close() error { - workersClosed := make(chan struct{}) - go func() { - defer close(workersClosed) - - if n.waitClose != nil { - n.waitClose() - } - }() + if n.cancelFunc != nil { + n.cancelFunc() + } var err error for _, c := range n.closers { @@ -615,3 +593,22 @@ func (t testStore) LastBlock() (uint64, error) { func (t testStore) SetLastBlock(_ uint64) error { return nil } + +type channelCloser <-chan struct{} + +func channelCloserFunc(c <-chan struct{}) io.Closer { + return channelCloser(c) +} + +func (c channelCloser) Close() error { + select { + case <-c: + return nil + case <-time.After(5 * time.Second): + return errors.New("timeout waiting for channel to close") + } +} + +type Starter interface { + Start(ctx context.Context, contracts ...common.Address) <-chan struct{} +} diff --git a/p2p/pkg/preconfirmation/preconfirmation.go b/p2p/pkg/preconfirmation/preconfirmation.go index e7299f874..9f4f41770 100644 --- a/p2p/pkg/preconfirmation/preconfirmation.go +++ b/p2p/pkg/preconfirmation/preconfirmation.go @@ -3,24 +3,18 @@ package preconfirmation import ( "context" "errors" - "fmt" "log/slog" "math/big" "sync" "time" "github.com/ethereum/go-ethereum/common" - blocktracker "github.com/primevprotocol/mev-commit/contracts-abi/clients/BlockTracker" - preconfcommstore "github.com/primevprotocol/mev-commit/contracts-abi/clients/PreConfCommitmentStore" preconfpb "github.com/primevprotocol/mev-commit/p2p/gen/go/preconfirmation/v1" providerapiv1 "github.com/primevprotocol/mev-commit/p2p/gen/go/providerapi/v1" - preconfcontract "github.com/primevprotocol/mev-commit/p2p/pkg/contracts/preconf" - "github.com/primevprotocol/mev-commit/x/contracts/events" "github.com/primevprotocol/mev-commit/p2p/pkg/p2p" encryptor "github.com/primevprotocol/mev-commit/p2p/pkg/signer/preconfencryptor" "github.com/primevprotocol/mev-commit/p2p/pkg/store" "github.com/primevprotocol/mev-commit/p2p/pkg/topology" - "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -37,11 +31,8 @@ type Preconfirmation struct { streamer p2p.Streamer depositMgr DepositManager processer BidProcessor - commitmentDA preconfcontract.Interface - evtMgr events.EventManager - ecds CommitmentStore - newL1Blocks chan *blocktracker.BlocktrackerNewL1Block - enryptedCmts chan *preconfcommstore.PreconfcommitmentstoreEncryptedCommitmentStored + commitmentDA PreconfContract + tracker Tracker logger *slog.Logger metrics *metrics } @@ -54,12 +45,8 @@ type BidProcessor interface { ProcessBid(context.Context, *preconfpb.Bid) (chan providerapiv1.BidResponse_Status, error) } -type CommitmentStore interface { - GetCommitmentsByBlockNumber(blockNum int64) ([]*store.EncryptedPreConfirmationWithDecrypted, error) - GetCommitmentByHash(commitmentHash string) (*store.EncryptedPreConfirmationWithDecrypted, error) - AddCommitment(commitment *store.EncryptedPreConfirmationWithDecrypted) - DeleteCommitmentByBlockNumber(blockNum int64) error - SetCommitmentIndexByCommitmentDigest(commitmentDigest, commitmentIndex [32]byte) error +type Tracker interface { + TrackCommitment(ctx context.Context, cm *store.EncryptedPreConfirmationWithDecrypted) error } type DepositManager interface { @@ -68,6 +55,14 @@ type DepositManager interface { RefundDeposit(ethAddress common.Address, amount *big.Int, blockNumber int64) error } +type PreconfContract interface { + StoreEncryptedCommitment( + ctx context.Context, + commitmentDigest []byte, + commitmentSignature []byte, + ) (common.Hash, error) +} + func New( owner common.Address, topo Topology, @@ -75,9 +70,8 @@ func New( encryptor encryptor.Encryptor, depositMgr DepositManager, processor BidProcessor, - commitmentDA preconfcontract.Interface, - evtMgr events.EventManager, - edcs CommitmentStore, + commitmentDA PreconfContract, + tracker Tracker, logger *slog.Logger, ) *Preconfirmation { return &Preconfirmation{ @@ -88,10 +82,7 @@ func New( depositMgr: depositMgr, processer: processor, commitmentDA: commitmentDA, - evtMgr: evtMgr, - ecds: edcs, - newL1Blocks: make(chan *blocktracker.BlocktrackerNewL1Block), - enryptedCmts: make(chan *preconfcommstore.PreconfcommitmentstoreEncryptedCommitmentStored), + tracker: tracker, logger: logger, metrics: newMetrics(), } @@ -109,89 +100,6 @@ func (p *Preconfirmation) Streams() []p2p.StreamDesc { return []p2p.StreamDesc{p.bidStream()} } -func (p *Preconfirmation) Start(ctx context.Context) <-chan struct{} { - doneChan := make(chan struct{}) - - eg, egCtx := errgroup.WithContext(ctx) - - startWg := sync.WaitGroup{} - startWg.Add(2) - - eg.Go(func() error { - ev1 := events.NewEventHandler( - "NewL1Block", - func(newL1Block *blocktracker.BlocktrackerNewL1Block) { - select { - case <-egCtx.Done(): - case p.newL1Blocks <- newL1Block: - } - }, - ) - - sub1, err := p.evtMgr.Subscribe(ev1) - if err != nil { - return fmt.Errorf("failed to subscribe to NewL1Block event: %w", err) - } - defer sub1.Unsubscribe() - - ev2 := events.NewEventHandler( - "EncryptedCommitmentStored", - func(ec *preconfcommstore.PreconfcommitmentstoreEncryptedCommitmentStored) { - select { - case <-egCtx.Done(): - case p.enryptedCmts <- ec: - } - }, - ) - sub2, err := p.evtMgr.Subscribe(ev2) - if err != nil { - return fmt.Errorf("failed to subscribe to EncryptedCommitmentStored event: %w", err) - } - defer sub2.Unsubscribe() - - startWg.Done() - - select { - case <-egCtx.Done(): - return nil - case err := <-sub1.Err(): - return fmt.Errorf("NewL1Block subscription error: %w", err) - case err := <-sub2.Err(): - return fmt.Errorf("EncryptedCommitmentStored subscription error: %w", err) - } - }) - - eg.Go(func() error { - startWg.Done() - - for { - select { - case <-egCtx.Done(): - return nil - case newL1Block := <-p.newL1Blocks: - if err := p.handleNewL1Block(egCtx, newL1Block); err != nil { - return err - } - case ec := <-p.enryptedCmts: - if err := p.handleEncryptedCommitmentStored(egCtx, ec); err != nil { - return err - } - } - } - }) - - go func() { - defer close(doneChan) - if err := eg.Wait(); err != nil { - p.logger.Error("failed to start preconfirmation", "error", err) - } - }() - - startWg.Wait() - - return doneChan -} - // SendBid is meant to be called by the bidder to construct and send bids to the provider. // It takes the txHash, the bid amount in wei and the maximum valid block number. // It waits for preConfirmations from all providers and then returns. @@ -205,7 +113,13 @@ func (p *Preconfirmation) SendBid( decayEndTimestamp int64, ) (chan *preconfpb.PreConfirmation, error) { startTime := time.Now() - bid, encryptedBid, err := p.encryptor.ConstructEncryptedBid(txHash, bidAmt, blockNumber, decayStartTimestamp, decayEndTimestamp) + bid, encryptedBid, err := p.encryptor.ConstructEncryptedBid( + txHash, + bidAmt, + blockNumber, + decayStartTimestamp, + decayEndTimestamp, + ) if err != nil { p.logger.Error("constructing encrypted bid", "error", err, "txHash", txHash) return nil, err @@ -263,7 +177,11 @@ func (p *Preconfirmation) SendBid( // Process preConfirmation as a bidder verifyStartTime := time.Now() - sharedSecretKey, providerAddress, err := p.encryptor.VerifyEncryptedPreConfirmation(provider.Keys.NIKEPublicKey, bid.Digest, encryptedPreConfirmation) + sharedSecretKey, providerAddress, err := p.encryptor.VerifyEncryptedPreConfirmation( + provider.Keys.NIKEPublicKey, + bid.Digest, + encryptedPreConfirmation, + ) if err != nil { logger.Error("verifying provider signature", "error", err) return @@ -286,9 +204,13 @@ func (p *Preconfirmation) SendBid( PreConfirmation: preConfirmation, } - p.ecds.AddCommitment(encryptedAndDecryptedPreconfirmation) logger.Info("received preconfirmation", "preConfirmation", preConfirmation) p.metrics.ReceivedPreconfsCount.Inc() + // Track the preconfirmation + if err := p.tracker.TrackCommitment(ctx, encryptedAndDecryptedPreconfirmation); err != nil { + logger.Error("tracking commitment", "error", err) + return + } select { case preConfirmations <- preConfirmation: @@ -390,7 +312,10 @@ func (p *Preconfirmation) handleBid( PreConfirmation: preConfirmation, } - p.ecds.AddCommitment(encryptedAndDecryptedPreconfirmation) + if err := p.tracker.TrackCommitment(ctx, encryptedAndDecryptedPreconfirmation); err != nil { + p.logger.Error("tracking commitment", "error", err) + return status.Errorf(codes.Internal, "failed to track commitment: %v", err) + } // If we reach here, the bid was successful successful = true @@ -400,50 +325,3 @@ func (p *Preconfirmation) handleBid( } return nil } - -func (p *Preconfirmation) handleNewL1Block(ctx context.Context, newL1Block *blocktracker.BlocktrackerNewL1Block) error { - p.logger.Info("New L1 Block event received", "blockNumber", newL1Block.BlockNumber, "winner", newL1Block.Winner, "window", newL1Block.Window) - commitments, err := p.ecds.GetCommitmentsByBlockNumber(newL1Block.BlockNumber.Int64()) - if err != nil { - p.logger.Error("failed to get commitments by block number", "error", err) - return err - } - for _, commitment := range commitments { - if common.BytesToAddress(commitment.ProviderAddress) != newL1Block.Winner { - p.logger.Info("provider address does not match the winner", "providerAddress", commitment.ProviderAddress, "winner", newL1Block.Winner) - continue - } - startTime := time.Now() - txHash, err := p.commitmentDA.OpenCommitment( - ctx, - commitment.EncryptedPreConfirmation.CommitmentIndex, - commitment.PreConfirmation.Bid.BidAmount, - commitment.PreConfirmation.Bid.BlockNumber, - commitment.PreConfirmation.Bid.TxHash, - commitment.PreConfirmation.Bid.DecayStartTimestamp, - commitment.PreConfirmation.Bid.DecayEndTimestamp, - commitment.PreConfirmation.Bid.Signature, - commitment.PreConfirmation.Signature, - commitment.PreConfirmation.SharedSecret, - ) - if err != nil { - // todo: retry mechanism? - p.logger.Error("failed to open commitment", "error", err) - continue - } - duration := time.Since(startTime) - p.logger.Info("opened commitment", "txHash", txHash, "duration", duration) - } - - err = p.ecds.DeleteCommitmentByBlockNumber(newL1Block.BlockNumber.Int64()) - if err != nil { - p.logger.Error("failed to delete commitments by block number", "error", err) - return err - } - return nil -} - -func (p *Preconfirmation) handleEncryptedCommitmentStored(ctx context.Context, ec *preconfcommstore.PreconfcommitmentstoreEncryptedCommitmentStored) error { - p.logger.Info("Encrypted Commitment Stored event received", "commitmentDigest", ec.CommitmentDigest, "commitmentIndex", ec.CommitmentIndex) - return p.ecds.SetCommitmentIndexByCommitmentDigest(ec.CommitmentDigest, ec.CommitmentIndex) -} diff --git a/p2p/pkg/preconfirmation/preconfirmation_test.go b/p2p/pkg/preconfirmation/preconfirmation_test.go index 157dfa6b0..6dc5c39b3 100644 --- a/p2p/pkg/preconfirmation/preconfirmation_test.go +++ b/p2p/pkg/preconfirmation/preconfirmation_test.go @@ -9,14 +9,11 @@ import ( "log/slog" "math/big" "os" - "strings" "testing" "time" - "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto/ecies" - blocktracker "github.com/primevprotocol/mev-commit/contracts-abi/clients/BlockTracker" preconfpb "github.com/primevprotocol/mev-commit/p2p/gen/go/preconfirmation/v1" providerapiv1 "github.com/primevprotocol/mev-commit/p2p/gen/go/providerapi/v1" "github.com/primevprotocol/mev-commit/p2p/pkg/p2p" @@ -24,8 +21,6 @@ import ( "github.com/primevprotocol/mev-commit/p2p/pkg/preconfirmation" "github.com/primevprotocol/mev-commit/p2p/pkg/store" "github.com/primevprotocol/mev-commit/p2p/pkg/topology" - "github.com/primevprotocol/mev-commit/x/contracts/events" - "github.com/primevprotocol/mev-commit/x/util" ) type testTopo struct { @@ -93,25 +88,6 @@ func (t *testCommitmentDA) StoreEncryptedCommitment( return common.Hash{}, nil } -func (t *testCommitmentDA) OpenCommitment( - _ context.Context, - _ []byte, - _ string, - _ int64, - _ string, - _ int64, - _ int64, - _ []byte, - _ []byte, - _ []byte, -) (common.Hash, error) { - return common.Hash{}, nil -} - -func (t *testCommitmentDA) Close() error { - return nil -} - func newTestLogger(t *testing.T, w io.Writer) *slog.Logger { t.Helper() @@ -135,6 +111,12 @@ func (t *testDepositManager) RefundDeposit(address common.Address, deductedAmoun return nil } +type testTracker struct{} + +func (t *testTracker) TrackCommitment(ctx context.Context, cm *store.EncryptedPreConfirmationWithDecrypted) error { + return nil +} + func TestPreconfBidSubmission(t *testing.T) { t.Parallel() @@ -205,20 +187,6 @@ func TestPreconfBidSubmission(t *testing.T) { preConfirmationSigner: common.HexToAddress("0x2"), } - btABI, err := abi.JSON(strings.NewReader(blocktracker.BlocktrackerABI)) - if err != nil { - t.Fatal(err) - } - - evtMgr := events.NewListener( - util.NewTestLogger(io.Discard), - &btABI, - ) - - store, err := store.NewStore() - if err != nil { - t.Fatal(err) - } depositMgr := &testDepositManager{} p := preconfirmation.New( client.EthAddress, @@ -228,8 +196,7 @@ func TestPreconfBidSubmission(t *testing.T) { depositMgr, proc, &testCommitmentDA{}, - evtMgr, - store, + &testTracker{}, newTestLogger(t, os.Stdout), ) diff --git a/p2p/pkg/preconfirmation/tracker/tracker.go b/p2p/pkg/preconfirmation/tracker/tracker.go new file mode 100644 index 000000000..e7304f434 --- /dev/null +++ b/p2p/pkg/preconfirmation/tracker/tracker.go @@ -0,0 +1,257 @@ +package preconftracker + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/ethereum/go-ethereum/common" + blocktracker "github.com/primevprotocol/mev-commit/contracts-abi/clients/BlockTracker" + preconfcommstore "github.com/primevprotocol/mev-commit/contracts-abi/clients/PreConfCommitmentStore" + "github.com/primevprotocol/mev-commit/p2p/pkg/p2p" + "github.com/primevprotocol/mev-commit/p2p/pkg/store" + "github.com/primevprotocol/mev-commit/x/contracts/events" + "golang.org/x/sync/errgroup" +) + +type Tracker struct { + peerType p2p.PeerType + evtMgr events.EventManager + store CommitmentStore + preconfContract PreconfContract + newL1Blocks chan *blocktracker.BlocktrackerNewL1Block + enryptedCmts chan *preconfcommstore.PreconfcommitmentstoreEncryptedCommitmentStored + commitments chan *preconfcommstore.PreconfcommitmentstoreCommitmentStored + winners map[int64]*blocktracker.BlocktrackerNewL1Block + logger *slog.Logger +} + +type CommitmentStore interface { + GetCommitmentsByBlockNumber(blockNum int64) ([]*store.EncryptedPreConfirmationWithDecrypted, error) + GetCommitmentByHash(commitmentHash string) (*store.EncryptedPreConfirmationWithDecrypted, error) + AddCommitment(commitment *store.EncryptedPreConfirmationWithDecrypted) + DeleteCommitmentByBlockNumber(blockNum int64) error + DeleteCommitmentByIndex(blockNum int64, index [32]byte) error + SetCommitmentIndexByCommitmentDigest(commitmentDigest, commitmentIndex [32]byte) error +} + +type PreconfContract interface { + OpenCommitment( + ctx context.Context, + encryptedCommitmentIndex []byte, + bid string, + blockNumber int64, + txnHash string, + decayStartTimeStamp int64, + decayEndTimeStamp int64, + bidSignature []byte, + commitmentSignature []byte, + sharedSecretKey []byte, + ) (common.Hash, error) +} + +func NewTracker( + peerType p2p.PeerType, + evtMgr events.EventManager, + store CommitmentStore, + preconfContract PreconfContract, + logger *slog.Logger, +) *Tracker { + return &Tracker{ + peerType: peerType, + evtMgr: evtMgr, + store: store, + preconfContract: preconfContract, + newL1Blocks: make(chan *blocktracker.BlocktrackerNewL1Block), + enryptedCmts: make(chan *preconfcommstore.PreconfcommitmentstoreEncryptedCommitmentStored), + commitments: make(chan *preconfcommstore.PreconfcommitmentstoreCommitmentStored), + winners: make(map[int64]*blocktracker.BlocktrackerNewL1Block), + logger: logger, + } +} + +func (t *Tracker) Start(ctx context.Context) <-chan struct{} { + doneChan := make(chan struct{}) + + eg, egCtx := errgroup.WithContext(ctx) + + evts := []events.EventHandler{ + events.NewEventHandler( + "NewL1Block", + func(newL1Block *blocktracker.BlocktrackerNewL1Block) { + select { + case <-egCtx.Done(): + case t.newL1Blocks <- newL1Block: + } + }, + ), + events.NewEventHandler( + "EncryptedCommitmentStored", + func(ec *preconfcommstore.PreconfcommitmentstoreEncryptedCommitmentStored) { + select { + case <-egCtx.Done(): + case t.enryptedCmts <- ec: + } + }, + ), + } + + if t.peerType == p2p.PeerTypeBidder { + evts = append(evts, events.NewEventHandler( + "CommitmentStored", + func(cs *preconfcommstore.PreconfcommitmentstoreCommitmentStored) { + select { + case <-egCtx.Done(): + case t.commitments <- cs: + } + }, + )) + } + + sub, err := t.evtMgr.Subscribe(evts...) + if err != nil { + close(doneChan) + t.logger.Error("failed to subscribe to events", "error", err) + return doneChan + } + + eg.Go(func() error { + select { + case <-egCtx.Done(): + return nil + case err := <-sub.Err(): + return fmt.Errorf("event subscription error: %w", err) + } + }) + + eg.Go(func() error { + for { + select { + case <-egCtx.Done(): + return nil + case newL1Block := <-t.newL1Blocks: + if err := t.handleNewL1Block(egCtx, newL1Block); err != nil { + return err + } + case ec := <-t.enryptedCmts: + if err := t.handleEncryptedCommitmentStored(egCtx, ec); err != nil { + return err + } + case cs := <-t.commitments: + if err := t.handleCommitmentStored(egCtx, cs); err != nil { + return err + } + } + } + }) + + go func() { + defer close(doneChan) + if err := eg.Wait(); err != nil { + t.logger.Error("failed to start preconfirmation", "error", err) + } + }() + + return doneChan +} + +func (t *Tracker) TrackCommitment( + ctx context.Context, + commitment *store.EncryptedPreConfirmationWithDecrypted, +) error { + t.store.AddCommitment(commitment) + return nil +} + +func (t *Tracker) handleNewL1Block( + ctx context.Context, + newL1Block *blocktracker.BlocktrackerNewL1Block, +) error { + t.logger.Info( + "new L1 Block event received", + "blockNumber", newL1Block.BlockNumber, + "winner", newL1Block.Winner, + "window", newL1Block.Window, + ) + + blockToProcess := newL1Block.BlockNumber + if t.peerType == p2p.PeerTypeBidder { + // Bidders should process the block 1 behind the current one. Ideally the + // provider should open the commitment as they get the reward, so the incentive + // for bidder to open is only in cases of slashes as he will get refund. Only one + // of bidder or provider should open the commitment as 1 of the txns would + // fail. This delay is to ensure this. + t.winners[newL1Block.BlockNumber.Int64()] = newL1Block + pastBlock, ok := t.winners[newL1Block.BlockNumber.Int64()-1] + if !ok { + return nil + } + blockToProcess = pastBlock.BlockNumber + for k := range t.winners { + if k < blockToProcess.Int64() { + delete(t.winners, k) + } + } + } + + commitments, err := t.store.GetCommitmentsByBlockNumber(blockToProcess.Int64()) + if err != nil { + return err + } + + for _, commitment := range commitments { + if common.BytesToAddress(commitment.ProviderAddress) != newL1Block.Winner { + t.logger.Debug( + "provider address does not match the winner", + "providerAddress", commitment.ProviderAddress, + "winner", newL1Block.Winner, + ) + continue + } + startTime := time.Now() + txHash, err := t.preconfContract.OpenCommitment( + ctx, + commitment.EncryptedPreConfirmation.CommitmentIndex, + commitment.PreConfirmation.Bid.BidAmount, + commitment.PreConfirmation.Bid.BlockNumber, + commitment.PreConfirmation.Bid.TxHash, + commitment.PreConfirmation.Bid.DecayStartTimestamp, + commitment.PreConfirmation.Bid.DecayEndTimestamp, + commitment.PreConfirmation.Bid.Signature, + commitment.PreConfirmation.Signature, + commitment.PreConfirmation.SharedSecret, + ) + if err != nil { + // todo: retry mechanism? + t.logger.Error("failed to open commitment", "error", err) + continue + } + duration := time.Since(startTime) + t.logger.Info("opened commitment", "txHash", txHash, "duration", duration) + } + + err = t.store.DeleteCommitmentByBlockNumber(blockToProcess.Int64()) + if err != nil { + t.logger.Error("failed to delete commitments by block number", "error", err) + return err + } + + return nil +} + +func (t *Tracker) handleEncryptedCommitmentStored( + ctx context.Context, + ec *preconfcommstore.PreconfcommitmentstoreEncryptedCommitmentStored, +) error { + return t.store.SetCommitmentIndexByCommitmentDigest(ec.CommitmentDigest, ec.CommitmentIndex) +} + +func (t *Tracker) handleCommitmentStored( + ctx context.Context, + cs *preconfcommstore.PreconfcommitmentstoreCommitmentStored, +) error { + // In case of bidders this event keeps track of the commitments already opened + // by the provider. + return t.store.DeleteCommitmentByIndex(int64(cs.BlockNumber), cs.CommitmentIndex) +} diff --git a/p2p/pkg/preconfirmation/tracker/tracker_test.go b/p2p/pkg/preconfirmation/tracker/tracker_test.go new file mode 100644 index 000000000..22f7ca4af --- /dev/null +++ b/p2p/pkg/preconfirmation/tracker/tracker_test.go @@ -0,0 +1,428 @@ +package preconftracker_test + +import ( + "bytes" + "context" + "fmt" + "io" + "math/big" + "os" + "strconv" + "strings" + "testing" + + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + blocktracker "github.com/primevprotocol/mev-commit/contracts-abi/clients/BlockTracker" + preconf "github.com/primevprotocol/mev-commit/contracts-abi/clients/PreConfCommitmentStore" + preconfpb "github.com/primevprotocol/mev-commit/p2p/gen/go/preconfirmation/v1" + "github.com/primevprotocol/mev-commit/p2p/pkg/p2p" + preconftracker "github.com/primevprotocol/mev-commit/p2p/pkg/preconfirmation/tracker" + "github.com/primevprotocol/mev-commit/p2p/pkg/store" + "github.com/primevprotocol/mev-commit/x/contracts/events" + "github.com/primevprotocol/mev-commit/x/util" +) + +func TestTracker(t *testing.T) { + t.Parallel() + + pcABI, err := abi.JSON(strings.NewReader(preconf.PreconfcommitmentstoreABI)) + if err != nil { + t.Fatal(err) + } + + btABI, err := abi.JSON(strings.NewReader(blocktracker.BlocktrackerABI)) + if err != nil { + t.Fatal(err) + } + + evtMgr := events.NewListener( + util.NewTestLogger(os.Stdout), + &btABI, + &pcABI, + ) + + st, err := store.NewStore() + if err != nil { + t.Fatal(err) + } + + contract := &testPreconfContract{ + openedCommitments: make(chan openedCommitment, 10), + } + + tracker := preconftracker.NewTracker( + p2p.PeerTypeBidder, + evtMgr, + st, + contract, + util.NewTestLogger(io.Discard), + ) + + ctx, cancel := context.WithCancel(context.Background()) + doneChan := tracker.Start(ctx) + + winnerProvider := common.HexToAddress("0x1234") + loserProvider := common.HexToAddress("0x5678") + + getProvider := func(blkNum int64) common.Address { + if blkNum%2 != 0 { + return winnerProvider + } + return loserProvider + } + + getBlockNum := func(idx int) int64 { + return int64(idx/2 + idx%2) + } + + commitments := make([]*store.EncryptedPreConfirmationWithDecrypted, 0) + + for i := 1; i <= 10; i++ { + idx := common.HexToHash(fmt.Sprintf("0x%x", i)) + digest := common.HexToHash(fmt.Sprintf("0x%x", i)) + + commitments = append(commitments, &store.EncryptedPreConfirmationWithDecrypted{ + EncryptedPreConfirmation: &preconfpb.EncryptedPreConfirmation{ + Commitment: digest.Bytes(), + Signature: []byte(fmt.Sprintf("signature%d", i)), + CommitmentIndex: idx.Bytes(), + }, + PreConfirmation: &preconfpb.PreConfirmation{ + Bid: &preconfpb.Bid{ + TxHash: common.HexToHash(fmt.Sprintf("0x%d", i)).String(), + BidAmount: "1000", + BlockNumber: getBlockNum(i), + DecayStartTimestamp: 1, + DecayEndTimestamp: 2, + Digest: []byte(fmt.Sprintf("digest%d", i)), + Signature: []byte(fmt.Sprintf("signature%d", i)), + NikePublicKey: []byte(fmt.Sprintf("nikePublicKey%d", i)), + }, + Digest: digest.Bytes(), + Signature: []byte(fmt.Sprintf("signature%d", i)), + ProviderAddress: getProvider(getBlockNum(i)).Bytes(), + SharedSecret: []byte(fmt.Sprintf("sharedSecret%d", i)), + }, + }) + } + + for _, c := range commitments { + err := tracker.TrackCommitment(context.Background(), c) + if err != nil { + t.Fatal(err) + } + + err = publishEncCommitment(evtMgr, &pcABI, preconf.PreconfcommitmentstoreEncryptedCommitmentStored{ + Commiter: common.BytesToAddress(c.PreConfirmation.ProviderAddress), + CommitmentIndex: common.BytesToHash(c.EncryptedPreConfirmation.CommitmentIndex), + CommitmentDigest: common.BytesToHash(c.EncryptedPreConfirmation.Commitment), + CommitmentSignature: c.EncryptedPreConfirmation.Signature, + BlockCommitedAt: big.NewInt(1), + }) + if err != nil { + t.Fatal(err) + } + } + + amount, err := strconv.ParseUint(commitments[4].PreConfirmation.Bid.BidAmount, 10, 64) + if err != nil { + t.Fatal(err) + } + + // this commitment should not be opened again + err = publishCommitment(evtMgr, &pcABI, preconf.PreconfcommitmentstoreCommitmentStored{ + CommitmentIndex: common.BytesToHash(commitments[4].EncryptedPreConfirmation.CommitmentIndex), + Bidder: common.HexToAddress("0x1234"), + Commiter: common.BytesToAddress(commitments[4].PreConfirmation.ProviderAddress), + Bid: amount, + BlockNumber: uint64(commitments[4].PreConfirmation.Bid.BlockNumber), + BidHash: common.BytesToHash(commitments[4].PreConfirmation.Bid.Digest), + DecayStartTimeStamp: uint64(commitments[4].PreConfirmation.Bid.DecayStartTimestamp), + DecayEndTimeStamp: uint64(commitments[4].PreConfirmation.Bid.DecayEndTimestamp), + TxnHash: commitments[4].PreConfirmation.Bid.TxHash, + CommitmentHash: common.BytesToHash(commitments[4].PreConfirmation.Digest), + BidSignature: commitments[4].PreConfirmation.Bid.Signature, + CommitmentSignature: commitments[4].PreConfirmation.Signature, + BlockCommitedAt: big.NewInt(1), + SharedSecretKey: commitments[4].PreConfirmation.SharedSecret, + }) + if err != nil { + t.Fatal(err) + } + + for i := 1; i <= 5; i++ { + publishNewWinner(evtMgr, &btABI, blocktracker.BlocktrackerNewL1Block{ + BlockNumber: big.NewInt(int64(i)), + Winner: winnerProvider, + Window: big.NewInt(1), + }) + } + + opened := []*store.EncryptedPreConfirmationWithDecrypted{ + commitments[0], + commitments[1], + commitments[5], + } + + for _, c := range opened { + oc := <-contract.openedCommitments + if !bytes.Equal(c.EncryptedPreConfirmation.CommitmentIndex, oc.encryptedCommitmentIndex) { + t.Fatalf( + "expected commitment index %x, got %x", + c.EncryptedPreConfirmation.CommitmentIndex, + oc.encryptedCommitmentIndex, + ) + } + if c.PreConfirmation.Bid.BidAmount != oc.bid { + t.Fatalf("expected bid %s, got %s", c.PreConfirmation.Bid.BidAmount, oc.bid) + } + if c.PreConfirmation.Bid.BlockNumber != oc.blockNumber { + t.Fatalf("expected block number %d, got %d", c.PreConfirmation.Bid.BlockNumber, oc.blockNumber) + } + if c.PreConfirmation.Bid.TxHash != oc.txnHash { + t.Fatalf("expected txn hash %s, got %s", c.PreConfirmation.Bid.TxHash, oc.txnHash) + } + if c.PreConfirmation.Bid.DecayStartTimestamp != oc.decayStartTimeStamp { + t.Fatalf( + "expected decay start timestamp %d, got %d", + c.PreConfirmation.Bid.DecayStartTimestamp, + oc.decayStartTimeStamp, + ) + } + if c.PreConfirmation.Bid.DecayEndTimestamp != oc.decayEndTimeStamp { + t.Fatalf("expected decay end timestamp %d, got %d", c.PreConfirmation.Bid.DecayEndTimestamp, oc.decayEndTimeStamp) + } + if !bytes.Equal(c.PreConfirmation.Bid.Signature, oc.bidSignature) { + t.Fatalf( + "expected bid signature %x, got %x", + c.PreConfirmation.Bid.Signature, + oc.bidSignature, + ) + } + if !bytes.Equal(c.PreConfirmation.Signature, oc.commitmentSignature) { + t.Fatalf( + "expected commitment signature %x, got %x", + c.PreConfirmation.Signature, + oc.commitmentSignature, + ) + } + if !bytes.Equal(c.PreConfirmation.SharedSecret, oc.sharedSecretKey) { + t.Fatalf( + "expected shared secret key %x, got %x", + c.PreConfirmation.SharedSecret, + oc.sharedSecretKey, + ) + } + } + + select { + case <-contract.openedCommitments: + t.Fatal("unexpected opened commitment") + default: + } + + publishNewWinner(evtMgr, &btABI, blocktracker.BlocktrackerNewL1Block{ + BlockNumber: big.NewInt(6), + Winner: winnerProvider, + Window: big.NewInt(1), + }) + + opened = []*store.EncryptedPreConfirmationWithDecrypted{ + commitments[8], + commitments[9], + } + + for _, c := range opened { + oc := <-contract.openedCommitments + if !bytes.Equal(c.EncryptedPreConfirmation.CommitmentIndex, oc.encryptedCommitmentIndex) { + t.Fatalf( + "expected commitment index %x, got %x", + c.EncryptedPreConfirmation.CommitmentIndex, + oc.encryptedCommitmentIndex, + ) + } + if c.PreConfirmation.Bid.BidAmount != oc.bid { + t.Fatalf("expected bid %s, got %s", c.PreConfirmation.Bid.BidAmount, oc.bid) + } + if c.PreConfirmation.Bid.BlockNumber != oc.blockNumber { + t.Fatalf("expected block number %d, got %d", c.PreConfirmation.Bid.BlockNumber, oc.blockNumber) + } + if c.PreConfirmation.Bid.TxHash != oc.txnHash { + t.Fatalf("expected txn hash %s, got %s", c.PreConfirmation.Bid.TxHash, oc.txnHash) + } + if c.PreConfirmation.Bid.DecayStartTimestamp != oc.decayStartTimeStamp { + t.Fatalf( + "expected decay start timestamp %d, got %d", + c.PreConfirmation.Bid.DecayStartTimestamp, + oc.decayStartTimeStamp, + ) + } + if c.PreConfirmation.Bid.DecayEndTimestamp != oc.decayEndTimeStamp { + t.Fatalf("expected decay end timestamp %d, got %d", c.PreConfirmation.Bid.DecayEndTimestamp, oc.decayEndTimeStamp) + } + if !bytes.Equal(c.PreConfirmation.Bid.Signature, oc.bidSignature) { + t.Fatalf( + "expected bid signature %x, got %x", + c.PreConfirmation.Bid.Signature, + oc.bidSignature, + ) + } + if !bytes.Equal(c.PreConfirmation.Signature, oc.commitmentSignature) { + t.Fatalf( + "expected commitment signature %x, got %x", + c.PreConfirmation.Signature, + oc.commitmentSignature, + ) + } + if !bytes.Equal(c.PreConfirmation.SharedSecret, oc.sharedSecretKey) { + t.Fatalf( + "expected shared secret key %x, got %x", + c.PreConfirmation.SharedSecret, + oc.sharedSecretKey, + ) + } + } + + cancel() + + <-doneChan +} + +type openedCommitment struct { + encryptedCommitmentIndex []byte + bid string + blockNumber int64 + txnHash string + decayStartTimeStamp int64 + decayEndTimeStamp int64 + bidSignature []byte + commitmentSignature []byte + sharedSecretKey []byte +} + +type testPreconfContract struct { + openedCommitments chan openedCommitment +} + +func (t *testPreconfContract) OpenCommitment( + ctx context.Context, + encryptedCommitmentIndex []byte, + bid string, + blockNumber int64, + txnHash string, + decayStartTimeStamp int64, + decayEndTimeStamp int64, + bidSignature []byte, + commitmentSignature []byte, + sharedSecretKey []byte, +) (common.Hash, error) { + t.openedCommitments <- openedCommitment{ + encryptedCommitmentIndex: encryptedCommitmentIndex, + bid: bid, + blockNumber: blockNumber, + txnHash: txnHash, + decayStartTimeStamp: decayStartTimeStamp, + decayEndTimeStamp: decayEndTimeStamp, + bidSignature: bidSignature, + commitmentSignature: commitmentSignature, + sharedSecretKey: sharedSecretKey, + } + return common.Hash{}, nil +} + +func publishEncCommitment( + evtMgr events.EventManager, + pcABI *abi.ABI, + ec preconf.PreconfcommitmentstoreEncryptedCommitmentStored, +) error { + event := pcABI.Events["EncryptedCommitmentStored"] + buf, err := event.Inputs.NonIndexed().Pack( + ec.Commiter, + ec.CommitmentDigest, + ec.CommitmentSignature, + ec.BlockCommitedAt, + ) + if err != nil { + return err + } + + commitmentIndex := common.BytesToHash(ec.CommitmentIndex[:]) + + // Creating a Log object + testLog := types.Log{ + Topics: []common.Hash{ + event.ID, // The first topic is the hash of the event signature + commitmentIndex, // The next topics are the indexed event parameters + }, + // Non-indexed parameters are stored in the Data field + Data: buf, + } + + evtMgr.PublishLogEvent(context.Background(), testLog) + return nil +} + +func publishCommitment( + evtMgr events.EventManager, + pcABI *abi.ABI, + c preconf.PreconfcommitmentstoreCommitmentStored, +) error { + event := pcABI.Events["CommitmentStored"] + buf, err := event.Inputs.NonIndexed().Pack( + c.Bidder, + c.Commiter, + c.Bid, + c.BlockNumber, + c.BidHash, + c.DecayStartTimeStamp, + c.DecayEndTimeStamp, + c.TxnHash, + c.CommitmentHash, + c.BidSignature, + c.CommitmentSignature, + c.BlockCommitedAt, + c.SharedSecretKey, + ) + if err != nil { + return err + } + + commitmentIndex := common.BytesToHash(c.CommitmentIndex[:]) + + // Creating a Log object + testLog := types.Log{ + Topics: []common.Hash{ + event.ID, // The first topic is the hash of the event signature + commitmentIndex, // The next topics are the indexed event parameters + }, + // Since there are no non-indexed parameters, Data is empty + Data: buf, + } + + evtMgr.PublishLogEvent(context.Background(), testLog) + return nil +} + +func publishNewWinner( + evtMgr events.EventManager, + btABI *abi.ABI, + w blocktracker.BlocktrackerNewL1Block, +) { + event := btABI.Events["NewL1Block"] + + // Creating a Log object + testLog := types.Log{ + Topics: []common.Hash{ + event.ID, // The first topic is the hash of the event signature + common.BigToHash(w.BlockNumber), // The next topics are the indexed event parameters + common.HexToHash(w.Winner.Hex()), + common.BigToHash(w.Window), + }, + // Non-indexed parameters are stored in the Data field + Data: nil, + } + + evtMgr.PublishLogEvent(context.Background(), testLog) +} diff --git a/p2p/pkg/store/store.go b/p2p/pkg/store/store.go index 25d4f2ba5..8cece2757 100644 --- a/p2p/pkg/store/store.go +++ b/p2p/pkg/store/store.go @@ -128,6 +128,29 @@ func (cs *CommitmentsStore) DeleteCommitmentByBlockNumber(blockNum int64) error return nil } +func (cs *CommitmentsStore) DeleteCommitmentByIndex(blockNum int64, index [32]byte) error { + cs.commitmentByBlockNumberMu.Lock() + defer cs.commitmentByBlockNumberMu.Unlock() + + for _, v := range cs.commitmentsByCommitmentHash { + if common.Bytes2Hex(v.EncryptedPreConfirmation.CommitmentIndex) == common.Bytes2Hex(index[:]) { + err := cs.deleteCommitmentByHash(common.Bytes2Hex(v.Commitment)) + if err != nil { + return err + } + break + } + } + + for idx, v := range cs.commitmentsByBlockNumber[blockNum] { + if common.Bytes2Hex(v.EncryptedPreConfirmation.CommitmentIndex) == common.Bytes2Hex(index[:]) { + cs.commitmentsByBlockNumber[blockNum] = append(cs.commitmentsByBlockNumber[blockNum][:idx], cs.commitmentsByBlockNumber[blockNum][idx+1:]...) + break + } + } + return nil +} + func (cs *CommitmentsStore) deleteCommitmentByHash(hash string) error { cs.commitmentsByCommitmentHashMu.Lock() defer cs.commitmentsByCommitmentHashMu.Unlock() diff --git a/x/contracts/events/events.go b/x/contracts/events/events.go index c8e90e57a..9950708dc 100644 --- a/x/contracts/events/events.go +++ b/x/contracts/events/events.go @@ -20,6 +20,7 @@ type EventHandler interface { eventName() string handle(types.Log) error setTopicAndContract(topic common.Hash, contract *abi.ABI) + topic() common.Hash } // eventHandler is a generic implementation of EventHandler for type-safe event handling. @@ -85,13 +86,17 @@ func (h *eventHandler[T]) handle(log types.Log) error { return nil } +func (h *eventHandler[T]) topic() common.Hash { + return h.topicID +} + // EventManager is an interface for subscribing to contract events. The EventHandler callback // is called when an event is received. The Subscription returned by the Subscribe // method can be used to unsubscribe from the event and also to receive any errors // that occur while parsing the event. The PublishLogEvent method is used to publish // the log events to the subscribers. type EventManager interface { - Subscribe(event EventHandler) (Subscription, error) + Subscribe(event ...EventHandler) (Subscription, error) PublishLogEvent(ctx context.Context, log types.Log) } @@ -108,7 +113,7 @@ type Subscription interface { type Listener struct { logger *slog.Logger subMu sync.RWMutex - subscribers map[common.Hash][]*subscription + subscribers map[common.Hash][]*storedEvent contracts []*abi.ABI } @@ -118,13 +123,12 @@ func NewListener( ) *Listener { return &Listener{ logger: logger, - subscribers: make(map[common.Hash][]*subscription), + subscribers: make(map[common.Hash][]*storedEvent), contracts: contracts, } } type subscription struct { - event EventHandler unsub func() errCh chan error } @@ -137,32 +141,50 @@ func (s *subscription) Err() <-chan error { return s.errCh } -func (l *Listener) Subscribe(event EventHandler) (Subscription, error) { - var topic common.Hash - for _, c := range l.contracts { - for _, e := range c.Events { - if e.Name == event.eventName() { - event.setTopicAndContract(e.ID, c) - topic = e.ID - break - } - } +type storedEvent struct { + evt EventHandler + errCh chan error +} + +func (l *Listener) Subscribe(ev ...EventHandler) (Subscription, error) { + if len(ev) == 0 { + return nil, fmt.Errorf("no events provided") } - if topic == (common.Hash{}) { - return nil, fmt.Errorf("event not found") + for _, event := range ev { + found := false + for _, c := range l.contracts { + for _, e := range c.Events { + if e.Name == event.eventName() { + event.setTopicAndContract(e.ID, c) + found = true + break + } + } + } + if !found { + return nil, fmt.Errorf("event %s not found", event.eventName()) + } } l.subMu.Lock() defer l.subMu.Unlock() sub := &subscription{ - event: event, - errCh: make(chan error, 1), - unsub: func() { l.unsubscribe(topic, event) }, + errCh: make(chan error, len(ev)), + unsub: func() { + for _, event := range ev { + l.unsubscribe(event.topic(), event) + } + }, } - l.subscribers[topic] = append(l.subscribers[topic], sub) + for _, event := range ev { + l.subscribers[event.topic()] = append(l.subscribers[event.topic()], &storedEvent{ + evt: event, + errCh: sub.errCh, + }) + } return sub, nil } @@ -173,7 +195,7 @@ func (l *Listener) unsubscribe(topic common.Hash, event EventHandler) { events := l.subscribers[topic] for i, e := range events { - if e.event == event { + if e.evt == event { events = append(events[:i], events[i+1:]...) close(e.errCh) break @@ -195,13 +217,13 @@ func (l *Listener) PublishLogEvent(ctx context.Context, log types.Log) { go func() { defer wg.Done() - if err := ev.event.handle(log); err != nil { + if err := ev.evt.handle(log); err != nil { l.logger.Error("failed to handle log", "error", err) select { - case ev.errCh <- err: + case ev.errCh <- fmt.Errorf("failed to handle event %s: %w", ev.evt.eventName(), err): case <-ctx.Done(): default: - l.logger.Error("failed to send error to subscriber", "error", err, "event", ev.event.eventName()) + l.logger.Error("failed to send error to subscriber", "error", err, "event", ev.evt.eventName()) } } }()