Skip to content

Commit

Permalink
fix: oracle changes after decay merge
Browse files Browse the repository at this point in the history
  • Loading branch information
Alok authored and Mikelle committed May 3, 2024
1 parent 5fa92db commit 932ac47
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 165 deletions.
1 change: 0 additions & 1 deletion oracle/pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ func NewNode(opts *Options) (*Node, error) {
updtr, err := updater.NewUpdater(
nd.logger.With("component", "updater"),
l1Client,
settlementClient,
st,
evtMgr,
oracleTransactorSession,
Expand Down
8 changes: 4 additions & 4 deletions oracle/pkg/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ CREATE TABLE IF NOT EXISTS encrypted_commitments (
committer BYTEA,
commitment_hash BYTEA,
commitment_signature BYTEA,
block_number BIGINT
dispatch_timestamp BIGINT
);`

var winnersTable = `
Expand Down Expand Up @@ -125,21 +125,21 @@ func (s *Store) AddEncryptedCommitment(
committer []byte,
commitmentHash []byte,
commitmentSignature []byte,
blockNum int64,
dispatchTimestamp uint64,
) error {
columns := []string{
"commitment_index",
"committer",
"commitment_hash",
"commitment_signature",
"block_number",
"dispatch_timestamp",
}
values := []interface{}{
commitmentIdx,
committer,
commitmentHash,
commitmentSignature,
blockNum,
dispatchTimestamp,
}
placeholder := make([]string, len(values))
for i := range columns {
Expand Down
6 changes: 3 additions & 3 deletions oracle/pkg/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"fmt"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -210,15 +211,14 @@ func TestStore(t *testing.T) {
t.Fatalf("Failed to create store: %s", err)
}

for i, settlement := range settlements {
blkNo := int64(i/3) + 1
for _, settlement := range settlements {
err = st.AddEncryptedCommitment(
context.Background(),
settlement.CommitmentIdx,
settlement.Builder,
[]byte("hash"),
[]byte("signature"),
blkNo,
uint64(time.Now().Unix()),
)
if err != nil {
t.Fatalf("Failed to add encrypted commitment: %s", err)
Expand Down
179 changes: 58 additions & 121 deletions oracle/pkg/updater/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"math"
"math/big"
"strings"
"sync"
"sync/atomic"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -53,7 +52,7 @@ type WinnerRegister interface {
committer []byte,
commitmentHash []byte,
commitmentSignature []byte,
blockNum int64,
dispatchTimestamp uint64,
) error
IsSettled(ctx context.Context, commitmentIdx []byte) (bool, error)
GetWinner(ctx context.Context, blockNum int64) (Winner, error)
Expand Down Expand Up @@ -88,18 +87,16 @@ type EVMClient interface {
}

type Updater struct {
logger *slog.Logger
l1Client EVMClient
l2Client EVMClient
winnerRegister WinnerRegister
oracle Oracle
evtMgr events.EventManager
l1BlockCache *lru.Cache[uint64, map[string]int]
l2BlockTimeCache *lru.Cache[uint64, uint64]
encryptedCmts chan *preconf.PreconfcommitmentstoreEncryptedCommitmentStored
openedCmts chan *preconf.PreconfcommitmentstoreCommitmentStored
currentWindow atomic.Int64
metrics *metrics
logger *slog.Logger
l1Client EVMClient
winnerRegister WinnerRegister
oracle Oracle
evtMgr events.EventManager
l1BlockCache *lru.Cache[uint64, map[string]int]
encryptedCmts chan *preconf.PreconfcommitmentstoreEncryptedCommitmentStored
openedCmts chan *preconf.PreconfcommitmentstoreCommitmentStored
currentWindow atomic.Int64
metrics *metrics
}

func NewUpdater(
Expand All @@ -113,22 +110,16 @@ func NewUpdater(
if err != nil {
return nil, fmt.Errorf("failed to create L1 block cache: %w", err)
}
l2BlockTimeCache, err := lru.New[uint64, uint64](1024)
if err != nil {
return nil, fmt.Errorf("failed to create L2 block time cache: %w", err)
}
return &Updater{
logger: logger,
l1Client: l1Client,
l2Client: l2Client,
l1BlockCache: l1BlockCache,
l2BlockTimeCache: l2BlockTimeCache,
winnerRegister: winnerRegister,
evtMgr: evtMgr,
oracle: oracle,
metrics: newMetrics(),
openedCmts: make(chan *preconf.PreconfcommitmentstoreCommitmentStored),
encryptedCmts: make(chan *preconf.PreconfcommitmentstoreEncryptedCommitmentStored),
logger: logger,
l1Client: l1Client,
l1BlockCache: l1BlockCache,
winnerRegister: winnerRegister,
evtMgr: evtMgr,
oracle: oracle,
metrics: newMetrics(),
openedCmts: make(chan *preconf.PreconfcommitmentstoreCommitmentStored),
encryptedCmts: make(chan *preconf.PreconfcommitmentstoreEncryptedCommitmentStored),
}, nil
}

Expand All @@ -141,74 +132,51 @@ func (u *Updater) Start(ctx context.Context) <-chan struct{} {

eg, egCtx := errgroup.WithContext(ctx)

// Waits for events to be subscribed before returning
startWg := sync.WaitGroup{}
startWg.Add(2)

eg.Go(func() error {
ev1 := events.NewEventHandler(
"EncryptedCommitmentStored",
func(update *preconf.PreconfcommitmentstoreEncryptedCommitmentStored) {
select {
case <-egCtx.Done():
case u.encryptedCmts <- update:
}
},
)
sub1, err := u.evtMgr.Subscribe(ev1)
if err != nil {
startWg.Done()
return fmt.Errorf("failed to subscribe to encrypted commitments: %w", err)
}
defer sub1.Unsubscribe()

ev2 := events.NewEventHandler(
"CommitmentStored",
func(update *preconf.PreconfcommitmentstoreCommitmentStored) {
select {
case <-egCtx.Done():
case u.openedCmts <- update:
}
},
)
sub2, err := u.evtMgr.Subscribe(ev2)
if err != nil {
startWg.Done()
return fmt.Errorf("failed to subscribe to opened commitments: %w", err)
}
defer sub2.Unsubscribe()
ev1 := events.NewEventHandler(
"EncryptedCommitmentStored",
func(update *preconf.PreconfcommitmentstoreEncryptedCommitmentStored) {
select {
case <-egCtx.Done():
case u.encryptedCmts <- update:
}
},
)

ev3 := events.NewEventHandler(
"NewWindow",
func(update *blocktracker.BlocktrackerNewWindow) {
u.currentWindow.Store(update.Window.Int64())
},
)
ev2 := events.NewEventHandler(
"CommitmentStored",
func(update *preconf.PreconfcommitmentstoreCommitmentStored) {
select {
case <-egCtx.Done():
case u.openedCmts <- update:
}
},
)

sub3, err := u.evtMgr.Subscribe(ev3)
if err != nil {
startWg.Done()
return fmt.Errorf("failed to subscribe to new window: %w", err)
}
defer sub3.Unsubscribe()
ev3 := events.NewEventHandler(
"NewWindow",
func(update *blocktracker.BlocktrackerNewWindow) {
u.currentWindow.Store(update.Window.Int64())
},
)

startWg.Done()
sub, err := u.evtMgr.Subscribe(ev1, ev2, ev3)
if err != nil {
u.logger.Error("failed to subscribe to events", "error", err)
close(doneChan)
return doneChan
}

eg.Go(func() error {
defer sub.Unsubscribe()
select {
case <-egCtx.Done():
return nil
case err := <-sub1.Err():
return err
case err := <-sub2.Err():
return err
case err := <-sub3.Err():
case err := <-sub.Err():
return err
}
})

eg.Go(func() error {
startWg.Done()

for {
select {
case <-egCtx.Done():
Expand All @@ -232,8 +200,6 @@ func (u *Updater) Start(ctx context.Context) <-chan struct{} {
}
}()

startWg.Wait()

return doneChan
}

Expand All @@ -247,7 +213,7 @@ func (u *Updater) handleEncryptedCommitment(
update.Commiter.Bytes(),
update.CommitmentDigest[:],
update.CommitmentSignature,
update.BlockCommitedAt.Int64(),
update.DispatchTimestamp,
)
if err != nil {
u.logger.Error(
Expand All @@ -261,7 +227,7 @@ func (u *Updater) handleEncryptedCommitment(
u.logger.Debug(
"added encrypted commitment",
"commitmentIdx", common.Bytes2Hex(update.CommitmentIndex[:]),
"blockNumber", update.BlockCommitedAt.Int64(),
"dispatch timestamp", update.DispatchTimestamp,
)
return nil
}
Expand Down Expand Up @@ -300,7 +266,7 @@ func (u *Updater) handleOpenedCommitment(
}
u.logger.Error(
"failed to get winner",
"blockNumber", update.BlockCommitedAt.Int64(),
"blockNumber", update.BlockNumber,
"error", err,
)
return err
Expand Down Expand Up @@ -340,20 +306,10 @@ func (u *Updater) handleOpenedCommitment(
}

// Compute the decay percentage
l2BlockTime, err := u.getL2BlockTime(ctx, update.BlockCommitedAt.Uint64())
if err != nil {
u.logger.Error(
"failed to get L2 block time",
"blockNumber", update.BlockCommitedAt.Int64(),
"error", err,
)
return err
}

decayPercentage := u.computeDecayPercentage(
update.DecayStartTimeStamp,
update.DecayEndTimeStamp,
l2BlockTime,
update.DispatchTimestamp,
)

commitmentTxnHashes := strings.Split(update.TxnHash, ",")
Expand Down Expand Up @@ -506,31 +462,12 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]in
return txnsInBlock, nil
}

func (u *Updater) getL2BlockTime(ctx context.Context, blockNum uint64) (uint64, error) {
time, ok := u.l2BlockTimeCache.Get(blockNum)
if ok {
u.metrics.BlockTimeCacheHits.Inc()
return time, nil
}

u.metrics.BlockTimeCacheMisses.Inc()

blk, err := u.l2Client.BlockByNumber(ctx, big.NewInt(0).SetUint64(blockNum))
if err != nil {
return 0, fmt.Errorf("failed to get block by number: %w", err)
}

_ = u.l2BlockTimeCache.Add(blockNum, blk.Header().Time)

return blk.Header().Time, nil
}

// computeDecayPercentage takes startTimestamp, endTimestamp, commitTimestamp and computes a linear decay percentage
// The computation does not care what format the timestamps are in, as long as they are consistent
// (e.g they could be unix or unixMili timestamps)
func (u *Updater) computeDecayPercentage(startTimestamp, endTimestamp, commitTimestamp uint64) int64 {
if startTimestamp >= endTimestamp || startTimestamp > commitTimestamp || endTimestamp <= commitTimestamp {
u.logger.Info("invalid timestamps", "startTimestamp", startTimestamp, "endTimestamp", endTimestamp, "commitTimestamp", commitTimestamp)
u.logger.Info("timestamp out of range", "startTimestamp", startTimestamp, "endTimestamp", endTimestamp, "commitTimestamp", commitTimestamp)
return 0
}

Expand Down
Loading

0 comments on commit 932ac47

Please sign in to comment.