Skip to content

Commit

Permalink
feat: add predicate to filter destination (#131)
Browse files Browse the repository at this point in the history
* feat: add predicate to filter destination

* refactor: rename predicate type
  • Loading branch information
taco-paco authored Apr 30, 2024
1 parent 42dd24b commit 2ca4742
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 28 deletions.
41 changes: 21 additions & 20 deletions operator/attestor/attestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,27 @@ func (attestor *Attestor) processRollupHeaders(rollupId uint32, headersC chan *e
func (attestor *Attestor) processHeader(rollupId uint32, rollupHeader *ethtypes.Header, ctx context.Context) {
attestor.logger.Info("Processing header", "rollupId", rollupId, "height", rollupHeader.Number.Uint64())

mqBlocksC, id := attestor.notifier.Subscribe(rollupId)
predicate := func(mqBlock consumer.BlockData) bool {
if mqBlock.RollupId != rollupId {
attestor.logger.Warnf("Subscriber expected rollupId: %v, but got %v", rollupId, mqBlock.RollupId)
return false
}

if rollupHeader.Number.Cmp(mqBlock.Block.Header().Number) != 0 {
return false
}

if mqBlock.Block.Header().Root != rollupHeader.Root {
attestor.logger.Warnf("StateRoot from MQ doesn't match one from Node")
attestor.listener.OnBlockMismatch()

return false
}

return true
}

mqBlocksC, id := attestor.notifier.Subscribe(rollupId, predicate)
defer attestor.notifier.Unsubscribe(rollupId, id)

transactionId := [32]byte{0}
Expand All @@ -257,25 +277,6 @@ loop:
break loop

case mqBlock := <-mqBlocksC:
attestor.logger.Info("Received MQ block", "height", mqBlock.Block.Header().Number.Uint64(), "expectedHeight", rollupHeader.Number.Uint64(), "rollupId", mqBlock.RollupId)

if mqBlock.RollupId != rollupId {
attestor.logger.Warnf("Subscriber expected rollupId: %v, but got %v", rollupId, mqBlock.RollupId)
continue loop
}

// Filter notifications
if rollupHeader.Number.Cmp(mqBlock.Block.Header().Number) != 0 {
continue loop
}

if mqBlock.Block.Header().Root != rollupHeader.Root {
attestor.logger.Warnf("StateRoot from MQ doesn't match one from Node")
attestor.listener.OnBlockMismatch()

break loop
}

attestor.logger.Info("MQ block found", "height", mqBlock.Block.Header().Number.Uint64(), "rollupId", mqBlock.RollupId)

daCommitment = mqBlock.Commitment
Expand Down
24 changes: 18 additions & 6 deletions operator/attestor/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ import (
"github.com/NethermindEth/near-sffl/operator/consumer"
)

type BlockPredicate = func(consumer.BlockData) bool
type subscriberData struct {
predicate BlockPredicate
notifierC chan consumer.BlockData
}

// Notifier Broadcasts block from some rollup
// to subscribers
type Notifier struct {
Expand All @@ -20,18 +26,21 @@ func NewNotifier() Notifier {
}
}

func (notifier *Notifier) Subscribe(rollupId uint32) (<-chan consumer.BlockData, *list.Element) {
func (notifier *Notifier) Subscribe(rollupId uint32, predicate BlockPredicate) (<-chan consumer.BlockData, *list.Element) {
notifier.notifierLock.Lock()
defer notifier.notifierLock.Unlock()

if _, exists := notifier.rollupIdsToSubscribers[rollupId]; !exists {
notifier.rollupIdsToSubscribers[rollupId] = list.New()
}

notifierC := make(chan consumer.BlockData, 150)
id := notifier.rollupIdsToSubscribers[rollupId].PushBack(notifierC)
subscriber := subscriberData{
predicate: predicate,
notifierC: make(chan consumer.BlockData, 100),
}
id := notifier.rollupIdsToSubscribers[rollupId].PushBack(subscriber)

return notifierC, id
return subscriber.notifierC, id
}

func (notifier *Notifier) Notify(rollupId uint32, block consumer.BlockData) error {
Expand All @@ -44,12 +53,15 @@ func (notifier *Notifier) Notify(rollupId uint32, block consumer.BlockData) erro
}

for el := subscribers.Front(); el != nil; el = el.Next() {
subscriber, ok := el.Value.(chan consumer.BlockData)
subscriber, ok := el.Value.(subscriberData)
if !ok {
panic("Notifier: unreachable")
}
if !subscriber.predicate(block) {
continue
}

subscriber <- block
subscriber.notifierC <- block
}

return nil
Expand Down
19 changes: 17 additions & 2 deletions operator/attestor/notifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,15 @@ func subscribe(notifier *Notifier, blocks []consumer.BlockData, subscribedWg *sy
unsubscribedWg.Add(1)

go func(block consumer.BlockData, notifier *Notifier) {
blocksC, id := notifier.Subscribe(block.RollupId)
predicate := func(mqBlock consumer.BlockData) bool {
if block.Block.Header().Number.Cmp(mqBlock.Block.Header().Number) != 0 {
return false
}

return true
}

blocksC, id := notifier.Subscribe(block.RollupId, predicate)
subscribedWg.Done()

defer func() {
Expand Down Expand Up @@ -98,7 +106,14 @@ func TestNotifierSubscribeAndUnsubscribe(t *testing.T) {
block := generateBlockData()
notifier := NewNotifier()

_, id := notifier.Subscribe(block.RollupId)
predicate := func(mqBlock consumer.BlockData) bool {
if block.Block.Header().Number.Cmp(mqBlock.Block.Header().Number) != 0 {
return false
}

return true
}
_, id := notifier.Subscribe(block.RollupId, predicate)
assert.Equal(t, notifier.rollupIdsToSubscribers[block.RollupId].Len(), 1)

notifier.Unsubscribe(block.RollupId, id)
Expand Down

0 comments on commit 2ca4742

Please sign in to comment.