Skip to content

Commit

Permalink
fix: misc changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Alok committed Jul 27, 2024
1 parent 100d3e5 commit 69f821c
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 38 deletions.
55 changes: 33 additions & 22 deletions p2p/integrationtest/real-bidder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,24 +173,31 @@ func main() {
txns []string
}

blockChans := make([]chan blockWithTxns, *bidWorkers)
blockChans := make([]chan *blockWithTxns, *bidWorkers)
for i := 0; i < *bidWorkers; i++ {
blockChans[i] = make(chan blockWithTxns, 1)
blockChans[i] = make(chan *blockWithTxns, 1)
}

wg.Add(1)
go func(logger *slog.Logger) {
defer wg.Done()

currentBlkNum := int64(0)
for {
block, blkNum, err := RetreivedBlock(rpcClient)
if err != nil || len(block) == 0 {
logger.Error("failed to get block", "err", err)
currentBlkNum := uint64(0)
ticker := time.NewTicker(2 * time.Second)
for _ = range ticker.C {

Check failure on line 187 in p2p/integrationtest/real-bidder/main.go

View workflow job for this annotation

GitHub Actions / Test and Build Go Modules

S1005: unnecessary assignment to the blank identifier (gosimple)

Check failure on line 187 in p2p/integrationtest/real-bidder/main.go

View workflow job for this annotation

GitHub Actions / Test and Build Go Modules

S1005: unnecessary assignment to the blank identifier (gosimple)
blkNum, err := rpcClient.BlockNumber(context.Background())
if err != nil {
logger.Error("failed to get block number", "err", err)
continue
}

if blkNum <= currentBlkNum {
continue
}

if currentBlkNum == blkNum {
time.Sleep(1 * time.Second)
block, err := RetrieveTxns(rpcClient, blkNum)
if err != nil {
logger.Error("failed to get block", "err", err)
continue
}

Expand All @@ -199,8 +206,8 @@ func main() {
txns := make([]string, len(block))
copy(txns, block)

ch <- blockWithTxns{
blockNum: blkNum,
ch <- &blockWithTxns{
blockNum: int64(blkNum),
txns: txns,
}
}
Expand All @@ -209,13 +216,16 @@ func main() {

for i := 0; i < *bidWorkers; i++ {
wg.Add(1)
go func(logger *slog.Logger, newBlockChan <-chan blockWithTxns) {
go func(logger *slog.Logger, newBlockChan <-chan *blockWithTxns) {
defer wg.Done()
ticker := time.NewTicker(200 * time.Millisecond)
currentBlock := blockWithTxns{}
currentBlock := &blockWithTxns{}
for {
select {
case block := <-newBlockChan:
if block.blockNum <= currentBlock.blockNum {
continue
}
currentBlock = block
case <-ticker.C:
}
Expand All @@ -238,7 +248,6 @@ func main() {
err = sendBid(
bidderClient,
logger,
rpcClient,
currentBlock.txns[bundleStart:bundleEnd],
currentBlock.blockNum,
(time.Now().UnixMilli())-int64(startTimeDiff),
Expand All @@ -254,14 +263,10 @@ func main() {
wg.Wait()
}

func RetreivedBlock(rpcClient *ethclient.Client) ([]string, int64, error) {
blkNum, err := rpcClient.BlockNumber(context.Background())
if err != nil {
return nil, -1, err
}
func RetrieveTxns(rpcClient *ethclient.Client, blkNum uint64) ([]string, error) {
fullBlock, err := rpcClient.BlockByNumber(context.Background(), big.NewInt(int64(blkNum)))
if err != nil {
return nil, -1, err
return nil, err
}

blockTxns := []string{}
Expand All @@ -270,18 +275,24 @@ func RetreivedBlock(rpcClient *ethclient.Client) ([]string, int64, error) {
blockTxns = append(blockTxns, strings.TrimPrefix(txn.Hash().Hex(), "0x"))
}

return blockTxns, int64(blkNum), nil
if len(blockTxns) == 0 {
return nil, errors.New("no txns in block")
}

return blockTxns, nil
}

func sendBid(
bidderClient pb.BidderClient,
logger *slog.Logger,
rpcClient *ethclient.Client,
txnHashes []string,
blkNum int64,
decayStartTimestamp int64,
decayEndTimestamp int64,
) error {
if len(txnHashes) == 0 {
return errors.New("no txns to send")
}
amount := rand.Intn(200000)
amount += 100000

Expand Down
98 changes: 82 additions & 16 deletions p2p/pkg/preconfirmation/store/store.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package store

import (
"bytes"
"encoding/binary"
"fmt"
"sync"

Expand All @@ -16,6 +16,8 @@ const (

// block winners
blockWinnerNS = "bw/"

cmtIndexNS = "ci/"
)

var (
Expand All @@ -28,6 +30,9 @@ var (
blockWinnerKey = func(blockNumber int64) string {
return fmt.Sprintf("%s%d", blockWinnerNS, blockNumber)
}
cmtIndexKey = func(cIndex []byte) string {
return fmt.Sprintf("%s%s", cmtIndexNS, string(cIndex))
}
)

type Store struct {
Expand All @@ -52,17 +57,40 @@ func New(st storage.Storage) *Store {
}
}

func (s *Store) AddCommitment(commitment *EncryptedPreConfirmationWithDecrypted) error {
func (s *Store) AddCommitment(commitment *EncryptedPreConfirmationWithDecrypted) (err error) {
s.mu.Lock()
defer s.mu.Unlock()

var writer storage.Writer
if w, ok := s.st.(storage.Batcher); ok {
batch := w.Batch()
writer = batch
defer func() {
if err == nil {
err = batch.Write()
}
}()

} else {
writer = s.st
}

key := commitmentKey(commitment.Bid.BlockNumber, commitment.EncryptedPreConfirmation.Commitment)

buf, err := msgpack.Marshal(commitment)
if err != nil {
return err
}
return s.st.Put(key, buf)

if err := writer.Put(key, buf); err != nil {
return err
}

cIndexKey := cmtIndexKey(commitment.EncryptedPreConfirmation.Commitment)
blkNumBuf := make([]byte, 8)
binary.LittleEndian.PutUint64(blkNumBuf, uint64(commitment.Bid.BlockNumber))

return writer.Put(cIndexKey, blkNumBuf)
}

func (s *Store) GetCommitments(blockNum int64) ([]*EncryptedPreConfirmationWithDecrypted, error) {
Expand Down Expand Up @@ -107,18 +135,50 @@ func (s *Store) DeleteCommitmentByDigest(blockNum int64, digest [32]byte) error
}

func (s *Store) SetCommitmentIndexByDigest(cDigest, cIndex [32]byte) error {
var cmt *EncryptedPreConfirmationWithDecrypted
s.mu.RLock()
blkNumBuf, err := s.st.Get(cmtIndexKey(cDigest[:]))
s.mu.RUnlock()
switch {
case err == storage.ErrKeyNotFound:
// this would happen for most of the commitments as the node only
// stores the commitments it is involved in.
return nil
case err != nil:
return err
}

s.mu.Lock()
defer s.mu.Unlock()

blkNum := binary.LittleEndian.Uint64(blkNumBuf)
commitmentKey := commitmentKey(int64(blkNum), cDigest[:])
cmtBuf, err := s.st.Get(commitmentKey)
if err != nil {
return err
}

cmt := new(EncryptedPreConfirmationWithDecrypted)
err = msgpack.Unmarshal(cmtBuf, cmt)
if err != nil {
return err
}

cmt.EncryptedPreConfirmation.CommitmentIndex = cIndex[:]
buf, err := msgpack.Marshal(cmt)
if err != nil {
return err
}

return s.st.Put(commitmentKey, buf)
}

func (s *Store) ClearCommitmentIndexes(uptoBlock int64) error {
keys := make([]string, 0)
s.mu.RLock()
err := s.st.WalkPrefix(commitmentNS, func(key string, value []byte) bool {
c := new(EncryptedPreConfirmationWithDecrypted)
err := msgpack.Unmarshal(value, c)
if err != nil {
return false
}
if bytes.Equal(c.EncryptedPreConfirmation.Commitment, cDigest[:]) {
cmt = c
return true
err := s.st.WalkPrefix(cmtIndexNS, func(key string, val []byte) bool {
blkNum := binary.LittleEndian.Uint64([]byte(val))
if blkNum < uint64(uptoBlock) {
keys = append(keys, key)
}
return false
})
Expand All @@ -127,10 +187,16 @@ func (s *Store) SetCommitmentIndexByDigest(cDigest, cIndex [32]byte) error {
return err
}

if cmt != nil {
cmt.EncryptedPreConfirmation.CommitmentIndex = cIndex[:]
return s.AddCommitment(cmt)
s.mu.Lock()
defer s.mu.Unlock()

for _, key := range keys {
err := s.st.Delete(key)
if err != nil {
return err
}
}

return nil
}

Expand Down
37 changes: 37 additions & 0 deletions p2p/pkg/preconfirmation/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,43 @@ func TestStore_ClearBlockNumber(t *testing.T) {
}
}

func TestStore_ClearCommitmentIndex(t *testing.T) {
inmemstore := inmem.New()
st := store.New(inmemstore)
commitment := &store.EncryptedPreConfirmationWithDecrypted{
EncryptedPreConfirmation: &preconfpb.EncryptedPreConfirmation{
Commitment: []byte("commitment"),
},
PreConfirmation: &preconfpb.PreConfirmation{
Bid: &preconfpb.Bid{
BlockNumber: 1,
},
},
}
err := st.AddCommitment(commitment)
if err != nil {
t.Fatal(err)
}

err = st.ClearCommitmentIndexes(2)
if err != nil {
t.Fatal(err)
}

entries := 0
err = inmemstore.WalkPrefix("ci/", func(_ string, _ []byte) bool {
entries++
return false
})
if err != nil {
t.Fatal(err)
}

if entries != 0 {
t.Fatalf("expected 0 entries, got %d", entries)
}
}

func TestStore_DeleteCommitmentByDigest(t *testing.T) {
st := store.New(inmem.New())
digest := [32]byte{}
Expand Down
34 changes: 34 additions & 0 deletions p2p/pkg/preconfirmation/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type CommitmentStore interface {
commitmentDigest,
commitmentIndex [32]byte,
) error
ClearCommitmentIndexes(upto int64) error
AddWinner(winner *store.BlockWinner) error
BlockWinners() ([]*store.BlockWinner, error)
}
Expand Down Expand Up @@ -276,6 +277,10 @@ func (t *Tracker) Start(ctx context.Context) <-chan struct{} {
}
})

eg.Go(func() error {
return t.clearCommitments(egCtx)
})

if t.peerType == p2p.PeerTypeBidder {
eg.Go(func() error {
for {
Expand Down Expand Up @@ -449,6 +454,35 @@ func (t *Tracker) openCommitments(
return nil
}

func (t *Tracker) clearCommitments(ctx context.Context) error {
ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}
winners, err := t.store.BlockWinners()
if err != nil {
return err
}

if len(winners) == 0 {
continue
}

// clear commitment indexes for all the blocks before the oldest winner
err = t.store.ClearCommitmentIndexes(winners[0].BlockNumber)
if err != nil {
return err
}

t.logger.Info("commitment indexes cleared", "blockNumber", winners[0].BlockNumber)
}
}

func (t *Tracker) handleEncryptedCommitmentStored(
ctx context.Context,
ec *preconfcommstore.PreconfcommitmentstoreEncryptedCommitmentStored,
Expand Down

0 comments on commit 69f821c

Please sign in to comment.