From 932ac47a76bc9269223a7a9072da7ea481bc29ab Mon Sep 17 00:00:00 2001 From: Alok Date: Fri, 3 May 2024 18:09:19 +0530 Subject: [PATCH] fix: oracle changes after decay merge --- oracle/pkg/node/node.go | 1 - oracle/pkg/store/store.go | 8 +- oracle/pkg/store/store_test.go | 6 +- oracle/pkg/updater/updater.go | 179 ++++++++++------------------- oracle/pkg/updater/updater_test.go | 48 +++----- x/contracts/events/events.go | 5 +- 6 files changed, 82 insertions(+), 165 deletions(-) diff --git a/oracle/pkg/node/node.go b/oracle/pkg/node/node.go index 2a76ab227..cc2915c8e 100644 --- a/oracle/pkg/node/node.go +++ b/oracle/pkg/node/node.go @@ -213,7 +213,6 @@ func NewNode(opts *Options) (*Node, error) { updtr, err := updater.NewUpdater( nd.logger.With("component", "updater"), l1Client, - settlementClient, st, evtMgr, oracleTransactorSession, diff --git a/oracle/pkg/store/store.go b/oracle/pkg/store/store.go index 7a73b45c6..cf989b064 100644 --- a/oracle/pkg/store/store.go +++ b/oracle/pkg/store/store.go @@ -40,7 +40,7 @@ CREATE TABLE IF NOT EXISTS encrypted_commitments ( committer BYTEA, commitment_hash BYTEA, commitment_signature BYTEA, - block_number BIGINT + dispatch_timestamp BIGINT );` var winnersTable = ` @@ -125,21 +125,21 @@ func (s *Store) AddEncryptedCommitment( committer []byte, commitmentHash []byte, commitmentSignature []byte, - blockNum int64, + dispatchTimestamp uint64, ) error { columns := []string{ "commitment_index", "committer", "commitment_hash", "commitment_signature", - "block_number", + "dispatch_timestamp", } values := []interface{}{ commitmentIdx, committer, commitmentHash, commitmentSignature, - blockNum, + dispatchTimestamp, } placeholder := make([]string, len(values)) for i := range columns { diff --git a/oracle/pkg/store/store_test.go b/oracle/pkg/store/store_test.go index becb863a6..d3d3ae932 100644 --- a/oracle/pkg/store/store_test.go +++ b/oracle/pkg/store/store_test.go @@ -5,6 +5,7 @@ import ( "database/sql" "fmt" "testing" + "time" "github.com/ethereum/go-ethereum/common" "github.com/google/go-cmp/cmp" @@ -210,15 +211,14 @@ func TestStore(t *testing.T) { t.Fatalf("Failed to create store: %s", err) } - for i, settlement := range settlements { - blkNo := int64(i/3) + 1 + for _, settlement := range settlements { err = st.AddEncryptedCommitment( context.Background(), settlement.CommitmentIdx, settlement.Builder, []byte("hash"), []byte("signature"), - blkNo, + uint64(time.Now().Unix()), ) if err != nil { t.Fatalf("Failed to add encrypted commitment: %s", err) diff --git a/oracle/pkg/updater/updater.go b/oracle/pkg/updater/updater.go index c0d6bd878..44215b312 100644 --- a/oracle/pkg/updater/updater.go +++ b/oracle/pkg/updater/updater.go @@ -9,7 +9,6 @@ import ( "math" "math/big" "strings" - "sync" "sync/atomic" "github.com/ethereum/go-ethereum/common" @@ -53,7 +52,7 @@ type WinnerRegister interface { committer []byte, commitmentHash []byte, commitmentSignature []byte, - blockNum int64, + dispatchTimestamp uint64, ) error IsSettled(ctx context.Context, commitmentIdx []byte) (bool, error) GetWinner(ctx context.Context, blockNum int64) (Winner, error) @@ -88,18 +87,16 @@ type EVMClient interface { } type Updater struct { - logger *slog.Logger - l1Client EVMClient - l2Client EVMClient - winnerRegister WinnerRegister - oracle Oracle - evtMgr events.EventManager - l1BlockCache *lru.Cache[uint64, map[string]int] - l2BlockTimeCache *lru.Cache[uint64, uint64] - encryptedCmts chan *preconf.PreconfcommitmentstoreEncryptedCommitmentStored - openedCmts chan *preconf.PreconfcommitmentstoreCommitmentStored - currentWindow atomic.Int64 - metrics *metrics + logger *slog.Logger + l1Client EVMClient + winnerRegister WinnerRegister + oracle Oracle + evtMgr events.EventManager + l1BlockCache *lru.Cache[uint64, map[string]int] + encryptedCmts chan *preconf.PreconfcommitmentstoreEncryptedCommitmentStored + openedCmts chan *preconf.PreconfcommitmentstoreCommitmentStored + currentWindow atomic.Int64 + metrics *metrics } func NewUpdater( @@ -113,22 +110,16 @@ func NewUpdater( if err != nil { return nil, fmt.Errorf("failed to create L1 block cache: %w", err) } - l2BlockTimeCache, err := lru.New[uint64, uint64](1024) - if err != nil { - return nil, fmt.Errorf("failed to create L2 block time cache: %w", err) - } return &Updater{ - logger: logger, - l1Client: l1Client, - l2Client: l2Client, - l1BlockCache: l1BlockCache, - l2BlockTimeCache: l2BlockTimeCache, - winnerRegister: winnerRegister, - evtMgr: evtMgr, - oracle: oracle, - metrics: newMetrics(), - openedCmts: make(chan *preconf.PreconfcommitmentstoreCommitmentStored), - encryptedCmts: make(chan *preconf.PreconfcommitmentstoreEncryptedCommitmentStored), + logger: logger, + l1Client: l1Client, + l1BlockCache: l1BlockCache, + winnerRegister: winnerRegister, + evtMgr: evtMgr, + oracle: oracle, + metrics: newMetrics(), + openedCmts: make(chan *preconf.PreconfcommitmentstoreCommitmentStored), + encryptedCmts: make(chan *preconf.PreconfcommitmentstoreEncryptedCommitmentStored), }, nil } @@ -141,74 +132,51 @@ func (u *Updater) Start(ctx context.Context) <-chan struct{} { eg, egCtx := errgroup.WithContext(ctx) - // Waits for events to be subscribed before returning - startWg := sync.WaitGroup{} - startWg.Add(2) - - eg.Go(func() error { - ev1 := events.NewEventHandler( - "EncryptedCommitmentStored", - func(update *preconf.PreconfcommitmentstoreEncryptedCommitmentStored) { - select { - case <-egCtx.Done(): - case u.encryptedCmts <- update: - } - }, - ) - sub1, err := u.evtMgr.Subscribe(ev1) - if err != nil { - startWg.Done() - return fmt.Errorf("failed to subscribe to encrypted commitments: %w", err) - } - defer sub1.Unsubscribe() - - ev2 := events.NewEventHandler( - "CommitmentStored", - func(update *preconf.PreconfcommitmentstoreCommitmentStored) { - select { - case <-egCtx.Done(): - case u.openedCmts <- update: - } - }, - ) - sub2, err := u.evtMgr.Subscribe(ev2) - if err != nil { - startWg.Done() - return fmt.Errorf("failed to subscribe to opened commitments: %w", err) - } - defer sub2.Unsubscribe() + ev1 := events.NewEventHandler( + "EncryptedCommitmentStored", + func(update *preconf.PreconfcommitmentstoreEncryptedCommitmentStored) { + select { + case <-egCtx.Done(): + case u.encryptedCmts <- update: + } + }, + ) - ev3 := events.NewEventHandler( - "NewWindow", - func(update *blocktracker.BlocktrackerNewWindow) { - u.currentWindow.Store(update.Window.Int64()) - }, - ) + ev2 := events.NewEventHandler( + "CommitmentStored", + func(update *preconf.PreconfcommitmentstoreCommitmentStored) { + select { + case <-egCtx.Done(): + case u.openedCmts <- update: + } + }, + ) - sub3, err := u.evtMgr.Subscribe(ev3) - if err != nil { - startWg.Done() - return fmt.Errorf("failed to subscribe to new window: %w", err) - } - defer sub3.Unsubscribe() + ev3 := events.NewEventHandler( + "NewWindow", + func(update *blocktracker.BlocktrackerNewWindow) { + u.currentWindow.Store(update.Window.Int64()) + }, + ) - startWg.Done() + sub, err := u.evtMgr.Subscribe(ev1, ev2, ev3) + if err != nil { + u.logger.Error("failed to subscribe to events", "error", err) + close(doneChan) + return doneChan + } + eg.Go(func() error { + defer sub.Unsubscribe() select { case <-egCtx.Done(): return nil - case err := <-sub1.Err(): - return err - case err := <-sub2.Err(): - return err - case err := <-sub3.Err(): + case err := <-sub.Err(): return err } }) eg.Go(func() error { - startWg.Done() - for { select { case <-egCtx.Done(): @@ -232,8 +200,6 @@ func (u *Updater) Start(ctx context.Context) <-chan struct{} { } }() - startWg.Wait() - return doneChan } @@ -247,7 +213,7 @@ func (u *Updater) handleEncryptedCommitment( update.Commiter.Bytes(), update.CommitmentDigest[:], update.CommitmentSignature, - update.BlockCommitedAt.Int64(), + update.DispatchTimestamp, ) if err != nil { u.logger.Error( @@ -261,7 +227,7 @@ func (u *Updater) handleEncryptedCommitment( u.logger.Debug( "added encrypted commitment", "commitmentIdx", common.Bytes2Hex(update.CommitmentIndex[:]), - "blockNumber", update.BlockCommitedAt.Int64(), + "dispatch timestamp", update.DispatchTimestamp, ) return nil } @@ -300,7 +266,7 @@ func (u *Updater) handleOpenedCommitment( } u.logger.Error( "failed to get winner", - "blockNumber", update.BlockCommitedAt.Int64(), + "blockNumber", update.BlockNumber, "error", err, ) return err @@ -340,20 +306,10 @@ func (u *Updater) handleOpenedCommitment( } // Compute the decay percentage - l2BlockTime, err := u.getL2BlockTime(ctx, update.BlockCommitedAt.Uint64()) - if err != nil { - u.logger.Error( - "failed to get L2 block time", - "blockNumber", update.BlockCommitedAt.Int64(), - "error", err, - ) - return err - } - decayPercentage := u.computeDecayPercentage( update.DecayStartTimeStamp, update.DecayEndTimeStamp, - l2BlockTime, + update.DispatchTimestamp, ) commitmentTxnHashes := strings.Split(update.TxnHash, ",") @@ -506,31 +462,12 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]in return txnsInBlock, nil } -func (u *Updater) getL2BlockTime(ctx context.Context, blockNum uint64) (uint64, error) { - time, ok := u.l2BlockTimeCache.Get(blockNum) - if ok { - u.metrics.BlockTimeCacheHits.Inc() - return time, nil - } - - u.metrics.BlockTimeCacheMisses.Inc() - - blk, err := u.l2Client.BlockByNumber(ctx, big.NewInt(0).SetUint64(blockNum)) - if err != nil { - return 0, fmt.Errorf("failed to get block by number: %w", err) - } - - _ = u.l2BlockTimeCache.Add(blockNum, blk.Header().Time) - - return blk.Header().Time, nil -} - // computeDecayPercentage takes startTimestamp, endTimestamp, commitTimestamp and computes a linear decay percentage // The computation does not care what format the timestamps are in, as long as they are consistent // (e.g they could be unix or unixMili timestamps) func (u *Updater) computeDecayPercentage(startTimestamp, endTimestamp, commitTimestamp uint64) int64 { if startTimestamp >= endTimestamp || startTimestamp > commitTimestamp || endTimestamp <= commitTimestamp { - u.logger.Info("invalid timestamps", "startTimestamp", startTimestamp, "endTimestamp", endTimestamp, "commitTimestamp", commitTimestamp) + u.logger.Info("timestamp out of range", "startTimestamp", startTimestamp, "endTimestamp", endTimestamp, "commitTimestamp", commitTimestamp) return 0 } diff --git a/oracle/pkg/updater/updater_test.go b/oracle/pkg/updater/updater_test.go index f2fadd367..e71b74a4e 100644 --- a/oracle/pkg/updater/updater_test.go +++ b/oracle/pkg/updater/updater_test.go @@ -63,6 +63,7 @@ func TestUpdater(t *testing.T) { // timestamp of the First block commitment is X startTimestamp := time.UnixMilli(1615195200000) + midTimestamp := startTimestamp.Add(time.Duration(2.5 * float64(time.Second))) endTimestamp := startTimestamp.Add(5 * time.Second) key, err := crypto.GenerateKey() @@ -95,7 +96,7 @@ func TestUpdater(t *testing.T) { CommitmentIndex: idxBytes, CommitmentDigest: common.HexToHash(fmt.Sprintf("0x%02d", i)), CommitmentSignature: []byte("signature"), - BlockCommitedAt: big.NewInt(0), + DispatchTimestamp: uint64(midTimestamp.UnixMilli()), } commitment := preconf.PreconfcommitmentstoreCommitmentStored{ CommitmentIndex: idxBytes, @@ -103,9 +104,9 @@ func TestUpdater(t *testing.T) { BlockNumber: 5, CommitmentHash: common.HexToHash(fmt.Sprintf("0x%02d", i)), CommitmentSignature: []byte("signature"), - BlockCommitedAt: big.NewInt(0), DecayStartTimeStamp: uint64(startTimestamp.UnixMilli()), DecayEndTimeStamp: uint64(endTimestamp.UnixMilli()), + DispatchTimestamp: uint64(midTimestamp.UnixMilli()), } if i%2 == 0 { @@ -135,7 +136,7 @@ func TestUpdater(t *testing.T) { Commiter: builderAddr, CommitmentDigest: common.HexToHash(fmt.Sprintf("0x%02d", i)), CommitmentSignature: []byte("signature"), - BlockCommitedAt: big.NewInt(0), + DispatchTimestamp: uint64(midTimestamp.UnixMilli()), } commitment := preconf.PreconfcommitmentstoreCommitmentStored{ CommitmentIndex: idxBytes, @@ -143,10 +144,10 @@ func TestUpdater(t *testing.T) { TxnHash: bundle, BlockNumber: 5, CommitmentHash: common.HexToHash(fmt.Sprintf("0x%02d", i)), - DispatchTimestamp: uint64((startTimestamp.UnixMilli() + endTimestamp.UnixMilli()) / 2), CommitmentSignature: []byte("signature"), DecayStartTimeStamp: uint64(startTimestamp.UnixMilli()), DecayEndTimeStamp: uint64(endTimestamp.UnixMilli()), + DispatchTimestamp: uint64(midTimestamp.UnixMilli()), } encCommitments = append(encCommitments, encCommitment) commitments = append(commitments, commitment) @@ -172,12 +173,6 @@ func TestUpdater(t *testing.T) { }, } - l2Client := &testEVMClient{ - blocks: map[int64]*types.Block{ - 0: types.NewBlock(&types.Header{Time: uint64(midTimestamp.UnixMilli())}, txns, nil, nil, NewHasher()), - }, - } - pcABI, err := abi.JSON(strings.NewReader(preconf.PreconfcommitmentstoreABI)) if err != nil { t.Fatal(err) @@ -201,7 +196,6 @@ func TestUpdater(t *testing.T) { updtr, err := updater.NewUpdater( slog.New(slog.NewTextHandler(io.Discard, nil)), l1Client, - l2Client, register, evtMgr, oracle, @@ -239,8 +233,8 @@ func TestUpdater(t *testing.T) { if !bytes.Equal(enc.commitmentSignature, ec.CommitmentSignature) { t.Fatal("wrong commitment signature") } - if enc.blockNum != 0 { - t.Fatal("wrong block number") + if enc.dispatchTimestamp != ec.DispatchTimestamp { + t.Fatal("wrong dispatch timestamp") } } } @@ -358,9 +352,9 @@ func TestUpdaterBundlesFailure(t *testing.T) { BlockNumber: 5, CommitmentHash: common.HexToHash(fmt.Sprintf("0x%02d", i)), CommitmentSignature: []byte("signature"), - BlockCommitedAt: big.NewInt(0), DecayStartTimeStamp: uint64(startTimestamp.UnixMilli()), DecayEndTimeStamp: uint64(endTimestamp.UnixMilli()), + DispatchTimestamp: uint64(midTimestamp.UnixMilli()), } commitments = append(commitments, commitment) @@ -385,12 +379,6 @@ func TestUpdaterBundlesFailure(t *testing.T) { }, } - l2Client := &testEVMClient{ - blocks: map[int64]*types.Block{ - 0: types.NewBlock(&types.Header{Time: uint64(midTimestamp.UnixMilli())}, txns, nil, nil, NewHasher()), - }, - } - oracle := &testOracle{ commitments: make(chan processedCommitment, 1), } @@ -414,7 +402,6 @@ func TestUpdaterBundlesFailure(t *testing.T) { updtr, err := updater.NewUpdater( slog.New(slog.NewTextHandler(io.Discard, nil)), l1Client, - l2Client, register, evtMgr, oracle, @@ -545,9 +532,9 @@ func TestUpdaterIgnoreCommitments(t *testing.T) { BlockNumber: blockNum, CommitmentHash: common.HexToHash(fmt.Sprintf("0x%02d", i)), CommitmentSignature: []byte("signature"), - DispatchTimestamp: 0, DecayStartTimeStamp: uint64(startTimestamp.UnixMilli()), DecayEndTimeStamp: uint64(endTimestamp.UnixMilli()), + DispatchTimestamp: uint64(midTimestamp.UnixMilli()), } if i == 9 { @@ -587,12 +574,6 @@ func TestUpdaterIgnoreCommitments(t *testing.T) { }, } - l2Client := &testEVMClient{ - blocks: map[int64]*types.Block{ - 0: types.NewBlock(&types.Header{Time: uint64(midTimestamp.UnixMilli())}, txns, nil, nil, NewHasher()), - }, - } - pcABI, err := abi.JSON(strings.NewReader(preconf.PreconfcommitmentstoreABI)) if err != nil { t.Fatal(err) @@ -616,7 +597,6 @@ func TestUpdaterIgnoreCommitments(t *testing.T) { updtr, err := updater.NewUpdater( slog.New(slog.NewTextHandler(io.Discard, nil)), l1Client, - l2Client, register, evtMgr, oracle, @@ -725,7 +705,7 @@ type testEncryptedCommitment struct { committer []byte commitmentHash []byte commitmentSignature []byte - blockNum int64 + dispatchTimestamp uint64 } type testWinner struct { @@ -801,14 +781,14 @@ func (t *testWinnerRegister) AddEncryptedCommitment( committer []byte, commitmentHash []byte, commitmentSignature []byte, - blockNum int64, + dispatchTimestamp uint64, ) error { t.encCommit <- testEncryptedCommitment{ commitmentIdx: commitmentIdx, committer: committer, commitmentHash: commitmentHash, commitmentSignature: commitmentSignature, - blockNum: blockNum, + dispatchTimestamp: dispatchTimestamp, } return nil } @@ -864,7 +844,7 @@ func publishEncCommitment( ec.Commiter, ec.CommitmentDigest, ec.CommitmentSignature, - ec.BlockCommitedAt, + ec.DispatchTimestamp, ) if err != nil { return err @@ -904,7 +884,7 @@ func publishCommitment( c.CommitmentHash, c.BidSignature, c.CommitmentSignature, - c.BlockCommitedAt, + c.DispatchTimestamp, c.SharedSecretKey, ) if err != nil { diff --git a/x/contracts/events/events.go b/x/contracts/events/events.go index 9950708dc..316a2ba51 100644 --- a/x/contracts/events/events.go +++ b/x/contracts/events/events.go @@ -170,12 +170,14 @@ func (l *Listener) Subscribe(ev ...EventHandler) (Subscription, error) { l.subMu.Lock() defer l.subMu.Unlock() + errC := make(chan error, len(ev)) sub := &subscription{ - errCh: make(chan error, len(ev)), + errCh: errC, unsub: func() { for _, event := range ev { l.unsubscribe(event.topic(), event) } + close(errC) }, } @@ -197,7 +199,6 @@ func (l *Listener) unsubscribe(topic common.Hash, event EventHandler) { for i, e := range events { if e.evt == event { events = append(events[:i], events[i+1:]...) - close(e.errCh) break } }