From 95ff331160c03ee055ce043637267cac7c667386 Mon Sep 17 00:00:00 2001 From: Connor Stein Date: Wed, 19 Jun 2024 13:01:40 -0400 Subject: [PATCH] Add types to commit spec (#1045) ## Motivation Improving readability of spec ## Solution Add typing --- .../ocr3/plugins/ccip/commit/plugin.go | 71 ++++--- .../plugins/ccip/commit/plugin_e2e_test.go | 34 ++- .../plugins/ccip/commit/plugin_functions.go | 66 ++---- .../ccip/commit/plugin_functions_test.go | 70 +------ .../internal/libs/testhelpers/ocr3runner.go | 9 +- .../ocr3/plugins/ccip/spec/commit_plugin.py | 194 ++++++++++++------ 6 files changed, 234 insertions(+), 210 deletions(-) diff --git a/core/services/ocr3/plugins/ccip/commit/plugin.go b/core/services/ocr3/plugins/ccip/commit/plugin.go index e4657d049a..f27dfa37a4 100644 --- a/core/services/ocr3/plugins/ccip/commit/plugin.go +++ b/core/services/ocr3/plugins/ccip/commit/plugin.go @@ -50,7 +50,13 @@ func NewPlugin( ) *Plugin { knownSourceChains := mapset.NewSet[cciptypes.ChainSelector]() for _, inf := range cfg.ObserverInfo { - knownSourceChains = knownSourceChains.Union(mapset.NewSet(inf.Reads...)) + var sources []cciptypes.ChainSelector + for _, chain := range inf.Reads { + if chain != cfg.DestChain { + sources = append(sources, chain) + } + } + knownSourceChains = knownSourceChains.Union(mapset.NewSet(sources...)) } return &Plugin{ @@ -96,30 +102,10 @@ func (p *Plugin) Query(_ context.Context, _ ocr3types.OutcomeContext) (types.Que // We discover the token prices only for the tokens that are used to pay for ccip fees. // The fee tokens are configured in the plugin config. func (p *Plugin) Observation(ctx context.Context, outctx ocr3types.OutcomeContext, _ types.Query) (types.Observation, error) { - maxSeqNumsPerChain, seqNumsInSync, err := observeMaxSeqNums( - ctx, - p.lggr, - p.ccipReader, - outctx.PreviousOutcome, - p.readableChains, - p.cfg.DestChain, - p.knownSourceChainsSlice(), - ) - if err != nil { - return types.Observation{}, fmt.Errorf("observe max sequence numbers per chain: %w", err) - } - - newMsgs, err := observeNewMsgs( - ctx, - p.lggr, - p.ccipReader, - p.msgHasher, - p.readableChains, - maxSeqNumsPerChain, - p.cfg.NewMsgScanBatchSize, - ) + msgBaseDetails := make([]cciptypes.CCIPMsgBaseDetails, 0) + latestCommittedSeqNumsObservation, err := observeLatestCommittedSeqNums(ctx, p.lggr, p.ccipReader, p.readableChains, p.cfg.DestChain, p.knownSourceChains.ToSlice()) if err != nil { - return types.Observation{}, fmt.Errorf("observe new messages: %w", err) + return types.Observation{}, fmt.Errorf("observe latest committed sequence numbers: %w", err) } var tokenPrices []cciptypes.TokenPrice @@ -140,25 +126,44 @@ func (p *Plugin) Observation(ctx context.Context, outctx ocr3types.OutcomeContex if err != nil { return types.Observation{}, fmt.Errorf("observe gas prices: %w", err) } + // If there's no previous outcome (first round ever), we only observe the latest committed sequence numbers. + // and on the next round we use those to look for messages. + if outctx.PreviousOutcome == nil { + p.lggr.Debugw("first round ever, can't observe new messages yet") + return cciptypes.NewCommitPluginObservation(msgBaseDetails, gasPrices, tokenPrices, latestCommittedSeqNumsObservation, p.cfg).Encode() + } + + prevOutcome, err := cciptypes.DecodeCommitPluginOutcome(outctx.PreviousOutcome) + if err != nil { + return types.Observation{}, fmt.Errorf("decode commit plugin previous outcome: %w", err) + } + p.lggr.Debugw("previous outcome decoded", "outcome", prevOutcome.String()) + + // Always observe based on previous outcome. We'll filter out stale messages in the outcome phase. + newMsgs, err := observeNewMsgs( + ctx, + p.lggr, + p.ccipReader, + p.msgHasher, + p.readableChains, + prevOutcome.MaxSeqNums, // TODO: Chainlink common PR to rename. + p.cfg.NewMsgScanBatchSize, + ) + if err != nil { + return types.Observation{}, fmt.Errorf("observe new messages: %w", err) + } p.lggr.Infow("submitting observation", "observedNewMsgs", len(newMsgs), "gasPrices", len(gasPrices), "tokenPrices", len(tokenPrices), - "maxSeqNumsPerChain", maxSeqNumsPerChain, + "latestCommittedSeqNums", latestCommittedSeqNumsObservation, "observerInfo", p.cfg.ObserverInfo) - msgBaseDetails := make([]cciptypes.CCIPMsgBaseDetails, 0) for _, msg := range newMsgs { msgBaseDetails = append(msgBaseDetails, msg.CCIPMsgBaseDetails) } - - if !seqNumsInSync { - // If the node was not able to sync the max sequence numbers we don't want to transmit - // the potentially outdated ones. We expect that a sufficient number of nodes will be able to observe them. - maxSeqNumsPerChain = nil - } - return cciptypes.NewCommitPluginObservation(msgBaseDetails, gasPrices, tokenPrices, maxSeqNumsPerChain, p.cfg).Encode() + return cciptypes.NewCommitPluginObservation(msgBaseDetails, gasPrices, tokenPrices, latestCommittedSeqNumsObservation, p.cfg).Encode() } func (p *Plugin) ValidateObservation(_ ocr3types.OutcomeContext, _ types.Query, ao types.AttributedObservation) error { diff --git a/core/services/ocr3/plugins/ccip/commit/plugin_e2e_test.go b/core/services/ocr3/plugins/ccip/commit/plugin_e2e_test.go index 888f2a3f63..a6ae763737 100644 --- a/core/services/ocr3/plugins/ccip/commit/plugin_e2e_test.go +++ b/core/services/ocr3/plugins/ccip/commit/plugin_e2e_test.go @@ -5,6 +5,8 @@ import ( "reflect" "testing" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/assert" "github.com/smartcontractkit/libocr/commontypes" @@ -29,6 +31,7 @@ func TestPlugin(t *testing.T) { expErr func(*testing.T, error) expOutcome cciptypes.CommitPluginOutcome expTransmittedReports []cciptypes.CommitPluginReport + initialOutcome cciptypes.CommitPluginOutcome }{ { name: "EmptyOutcome", @@ -69,6 +72,15 @@ func TestPlugin(t *testing.T) { }, }, }, + initialOutcome: cciptypes.CommitPluginOutcome{ + MaxSeqNums: []cciptypes.SeqNumChain{ + {ChainSel: chainA, SeqNum: 10}, + {ChainSel: chainB, SeqNum: 20}, + }, + MerkleRoots: []cciptypes.MerkleRootChain{}, + TokenPrices: []cciptypes.TokenPrice{}, + GasPrices: []cciptypes.GasPriceChain{}, + }, }, { name: "NodesDoNotAgreeOnMsgs", @@ -98,6 +110,15 @@ func TestPlugin(t *testing.T) { }, }, }, + initialOutcome: cciptypes.CommitPluginOutcome{ + MaxSeqNums: []cciptypes.SeqNumChain{ + {ChainSel: chainA, SeqNum: 10}, + {ChainSel: chainB, SeqNum: 20}, + }, + MerkleRoots: []cciptypes.MerkleRootChain{}, + TokenPrices: []cciptypes.TokenPrice{}, + GasPrices: []cciptypes.GasPriceChain{}, + }, }, } @@ -118,7 +139,9 @@ func TestPlugin(t *testing.T) { for _, n := range nodesSetup { nodeIDs = append(nodeIDs, n.node.nodeID) } - runner := testhelpers.NewOCR3Runner(nodes, nodeIDs) + o, err := tc.initialOutcome.Encode() + require.NoError(t, err) + runner := testhelpers.NewOCR3Runner(nodes, nodeIDs, o) res, err := runner.RunRound(ctx) if tc.expErr != nil { @@ -203,10 +226,6 @@ func setupAllNodesReadAllChains(ctx context.Context, t *testing.T, lggr logger.L nodes := []nodeSetup{n1, n2, n3} for _, n := range nodes { - // all nodes observe the same sequence numbers 10 for chainA and 20 for chainB - n.ccipReader.On("NextSeqNum", ctx, []cciptypes.ChainSelector{chainA, chainB}). - Return([]cciptypes.SeqNum{10, 20}, nil) - // then they fetch new msgs, there is nothing new on chainA n.ccipReader.On( "MsgsBetweenSeqNums", @@ -231,6 +250,11 @@ func setupAllNodesReadAllChains(ctx context.Context, t *testing.T, lggr logger.L cciptypes.NewBigIntFromInt64(1000), cciptypes.NewBigIntFromInt64(20_000), }, nil) + + // all nodes observe the same sequence numbers 10 for chainA and 20 for chainB + n.ccipReader.On("NextSeqNum", ctx, []cciptypes.ChainSelector{chainA, chainB}). + Return([]cciptypes.SeqNum{10, 20}, nil) + } return nodes diff --git a/core/services/ocr3/plugins/ccip/commit/plugin_functions.go b/core/services/ocr3/plugins/ccip/commit/plugin_functions.go index a399c8c880..9bf8c58f05 100644 --- a/core/services/ocr3/plugins/ccip/commit/plugin_functions.go +++ b/core/services/ocr3/plugins/ccip/commit/plugin_functions.go @@ -19,80 +19,48 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/merklemulti" ) -// observeMaxSeqNums finds the maximum committed sequence numbers for each source chain. -// If a sequence number is pending (is not on-chain yet), it will be included in the results. -func observeMaxSeqNums( +// observeLatestCommittedSeqNums finds the maximum committed sequence numbers for each source chain. +// If we cannot observe the dest we return an empty slice and no error.. +func observeLatestCommittedSeqNums( ctx context.Context, lggr logger.Logger, ccipReader cciptypes.CCIPReader, - previousOutcomeBytes []byte, readableChains mapset.Set[cciptypes.ChainSelector], destChain cciptypes.ChainSelector, knownSourceChains []cciptypes.ChainSelector, -) ([]cciptypes.SeqNumChain, bool, error) { - seqNumsInSync := false - - // If there is a previous outcome, start with the sequence numbers of it. - seqNumPerChain := make(map[cciptypes.ChainSelector]cciptypes.SeqNum) - if previousOutcomeBytes != nil { - lggr.Debugw("observing based on previous outcome") - prevOutcome, err := cciptypes.DecodeCommitPluginOutcome(previousOutcomeBytes) - if err != nil { - return nil, false, fmt.Errorf("decode commit plugin previous outcome: %w", err) - } - lggr.Debugw("previous outcome decoded", "outcome", prevOutcome.String()) - - for _, seqNumChain := range prevOutcome.MaxSeqNums { - if seqNumChain.SeqNum > seqNumPerChain[seqNumChain.ChainSel] { - seqNumPerChain[seqNumChain.ChainSel] = seqNumChain.SeqNum - } - } - lggr.Debugw("discovered sequence numbers from prev outcome", "seqNumPerChain", seqNumPerChain) - } - - // If reading destination chain is supported find the latest sequence numbers per chain from the onchain state. +) ([]cciptypes.SeqNumChain, error) { + sort.Slice(knownSourceChains, func(i, j int) bool { return knownSourceChains[i] < knownSourceChains[j] }) + latestCommittedSeqNumsObservation := make([]cciptypes.SeqNumChain, 0) if readableChains.Contains(destChain) { - lggr.Debugw("reading sequence numbers from destination") - onChainSeqNums, err := ccipReader.NextSeqNum(ctx, knownSourceChains) + lggr.Debugw("reading latest committed sequence from destination") + onChainLatestCommittedSeqNums, err := ccipReader.NextSeqNum(ctx, knownSourceChains) if err != nil { - return nil, false, fmt.Errorf("get next seq nums: %w", err) + return latestCommittedSeqNumsObservation, fmt.Errorf("get next seq nums: %w", err) } - lggr.Debugw("discovered sequence numbers from destination", "onChainSeqNums", onChainSeqNums) - - // Update the seq nums if the on-chain sequence number is greater than previous outcome. + lggr.Debugw("observed latest committed sequence numbers on destination", "latestCommittedSeqNumsObservation", onChainLatestCommittedSeqNums) for i, ch := range knownSourceChains { - if onChainSeqNums[i] > seqNumPerChain[ch] { - seqNumPerChain[ch] = onChainSeqNums[i] - lggr.Debugw("updated sequence number", "chain", ch, "seqNum", onChainSeqNums[i]) - } + latestCommittedSeqNumsObservation = append(latestCommittedSeqNumsObservation, cciptypes.NewSeqNumChain(ch, onChainLatestCommittedSeqNums[i])) } - seqNumsInSync = true } - - maxChainSeqNums := make([]cciptypes.SeqNumChain, 0) - for ch, seqNum := range seqNumPerChain { - maxChainSeqNums = append(maxChainSeqNums, cciptypes.NewSeqNumChain(ch, seqNum)) - } - - sort.Slice(maxChainSeqNums, func(i, j int) bool { return maxChainSeqNums[i].ChainSel < maxChainSeqNums[j].ChainSel }) - return maxChainSeqNums, seqNumsInSync, nil + return latestCommittedSeqNumsObservation, nil } // observeNewMsgs finds the new messages for each supported chain based on the provided max sequence numbers. +// If latestCommitSeqNums is empty (first ever OCR round), it will return an empty slice. func observeNewMsgs( ctx context.Context, lggr logger.Logger, ccipReader cciptypes.CCIPReader, msgHasher cciptypes.MessageHasher, readableChains mapset.Set[cciptypes.ChainSelector], - maxSeqNumsPerChain []cciptypes.SeqNumChain, + latestCommittedSeqNums []cciptypes.SeqNumChain, msgScanBatchSize int, ) ([]cciptypes.CCIPMsg, error) { // Find the new msgs for each supported chain based on the discovered max sequence numbers. - newMsgsPerChain := make([][]cciptypes.CCIPMsg, len(maxSeqNumsPerChain)) + newMsgsPerChain := make([][]cciptypes.CCIPMsg, len(latestCommittedSeqNums)) eg := new(errgroup.Group) - for chainIdx, seqNumChain := range maxSeqNumsPerChain { + for chainIdx, seqNumChain := range latestCommittedSeqNums { if !readableChains.Contains(seqNumChain.ChainSel) { lggr.Debugw("reading chain is not supported", "chain", seqNumChain.ChainSel) continue @@ -140,7 +108,7 @@ func observeNewMsgs( } observedNewMsgs := make([]cciptypes.CCIPMsg, 0) - for chainIdx := range maxSeqNumsPerChain { + for chainIdx := range latestCommittedSeqNums { observedNewMsgs = append(observedNewMsgs, newMsgsPerChain[chainIdx]...) } return observedNewMsgs, nil diff --git a/core/services/ocr3/plugins/ccip/commit/plugin_functions_test.go b/core/services/ocr3/plugins/ccip/commit/plugin_functions_test.go index 45ba1c1bd5..9f9aa589fb 100644 --- a/core/services/ocr3/plugins/ccip/commit/plugin_functions_test.go +++ b/core/services/ocr3/plugins/ccip/commit/plugin_functions_test.go @@ -3,7 +3,6 @@ package commit import ( "context" "math/big" - "reflect" "slices" "testing" "time" @@ -33,57 +32,21 @@ func Test_observeMaxSeqNumsPerChain(t *testing.T) { expMaxSeqNums []cciptypes.SeqNumChain }{ { - name: "report on chain seq num when no previous outcome and can read dest", - prevOutcome: cciptypes.CommitPluginOutcome{}, + name: "report on chain seq num and can read dest", onChainSeqNums: map[cciptypes.ChainSelector]cciptypes.SeqNum{ 1: 10, 2: 20, }, - readChains: []cciptypes.ChainSelector{1, 2, 3}, - destChain: 3, - expErr: false, - expSeqNumsInSync: true, + readChains: []cciptypes.ChainSelector{1, 2, 3}, + destChain: 3, + expErr: false, expMaxSeqNums: []cciptypes.SeqNumChain{ {ChainSel: 1, SeqNum: 10}, {ChainSel: 2, SeqNum: 20}, }, }, { - name: "nothing to report when there is no previous outcome and cannot read dest", - prevOutcome: cciptypes.CommitPluginOutcome{}, - onChainSeqNums: map[cciptypes.ChainSelector]cciptypes.SeqNum{ - 1: 10, - 2: 20, - }, - readChains: []cciptypes.ChainSelector{1, 2}, - destChain: 3, - expErr: false, - expSeqNumsInSync: false, - expMaxSeqNums: []cciptypes.SeqNumChain{}, - }, - { - name: "report previous outcome seq nums and override when on chain is higher if can read dest", - prevOutcome: cciptypes.CommitPluginOutcome{ - MaxSeqNums: []cciptypes.SeqNumChain{ - {ChainSel: 1, SeqNum: 11}, // for chain 1 previous outcome is higher than on-chain state - {ChainSel: 2, SeqNum: 19}, // for chain 2 previous outcome is behind on-chain state - }, - }, - onChainSeqNums: map[cciptypes.ChainSelector]cciptypes.SeqNum{ - 1: 10, - 2: 20, - }, - readChains: []cciptypes.ChainSelector{1, 2, 3}, - destChain: 3, - expErr: false, - expSeqNumsInSync: true, - expMaxSeqNums: []cciptypes.SeqNumChain{ - {ChainSel: 1, SeqNum: 11}, - {ChainSel: 2, SeqNum: 20}, - }, - }, - { - name: "report previous outcome seq nums and mark as non synced if cannot read dest", + name: "cannot read dest", prevOutcome: cciptypes.CommitPluginOutcome{ MaxSeqNums: []cciptypes.SeqNumChain{ {ChainSel: 1, SeqNum: 11}, // for chain 1 previous outcome is higher than on-chain state @@ -94,14 +57,10 @@ func Test_observeMaxSeqNumsPerChain(t *testing.T) { 1: 10, 2: 20, }, - readChains: []cciptypes.ChainSelector{1, 2}, - destChain: 3, - expErr: false, - expSeqNumsInSync: false, - expMaxSeqNums: []cciptypes.SeqNumChain{ - {ChainSel: 1, SeqNum: 11}, - {ChainSel: 2, SeqNum: 19}, - }, + readChains: []cciptypes.ChainSelector{1, 2}, + destChain: 3, + expErr: false, + expMaxSeqNums: []cciptypes.SeqNumChain{}, }, } @@ -112,13 +71,6 @@ func Test_observeMaxSeqNumsPerChain(t *testing.T) { knownSourceChains := slicelib.Filter(tc.readChains, func(ch cciptypes.ChainSelector) bool { return ch != tc.destChain }) lggr := logger.Test(t) - var encodedPrevOutcome []byte - var err error - if !reflect.DeepEqual(tc.prevOutcome, cciptypes.CommitPluginOutcome{}) { - encodedPrevOutcome, err = tc.prevOutcome.Encode() - assert.NoError(t, err) - } - onChainSeqNums := make([]cciptypes.SeqNum, 0) for _, chain := range knownSourceChains { if v, ok := tc.onChainSeqNums[chain]; !ok { @@ -129,11 +81,10 @@ func Test_observeMaxSeqNumsPerChain(t *testing.T) { } mockReader.On("NextSeqNum", ctx, knownSourceChains).Return(onChainSeqNums, nil) - seqNums, synced, err := observeMaxSeqNums( + seqNums, err := observeLatestCommittedSeqNums( ctx, lggr, mockReader, - encodedPrevOutcome, mapset.NewSet(tc.readChains...), tc.destChain, knownSourceChains, @@ -145,7 +96,6 @@ func Test_observeMaxSeqNumsPerChain(t *testing.T) { } assert.NoError(t, err) assert.Equal(t, tc.expMaxSeqNums, seqNums) - assert.Equal(t, tc.expSeqNumsInSync, synced) }) } } diff --git a/core/services/ocr3/plugins/ccip/internal/libs/testhelpers/ocr3runner.go b/core/services/ocr3/plugins/ccip/internal/libs/testhelpers/ocr3runner.go index 13f216ce56..2f521de38c 100644 --- a/core/services/ocr3/plugins/ccip/internal/libs/testhelpers/ocr3runner.go +++ b/core/services/ocr3/plugins/ccip/internal/libs/testhelpers/ocr3runner.go @@ -36,11 +36,12 @@ type OCR3Runner[RI any] struct { previousOutcome ocr3types.Outcome } -func NewOCR3Runner[RI any](nodes []ocr3types.ReportingPlugin[RI], nodeIDs []commontypes.OracleID) *OCR3Runner[RI] { +func NewOCR3Runner[RI any](nodes []ocr3types.ReportingPlugin[RI], nodeIDs []commontypes.OracleID, initialOutcome ocr3types.Outcome) *OCR3Runner[RI] { return &OCR3Runner[RI]{ - nodes: nodes, - nodeIDs: nodeIDs, - round: 0, + nodes: nodes, + nodeIDs: nodeIDs, + round: 0, + previousOutcome: initialOutcome, } } diff --git a/core/services/ocr3/plugins/ccip/spec/commit_plugin.py b/core/services/ocr3/plugins/ccip/spec/commit_plugin.py index 6a261c3bce..c6048bc3e8 100644 --- a/core/services/ocr3/plugins/ccip/spec/commit_plugin.py +++ b/core/services/ocr3/plugins/ccip/spec/commit_plugin.py @@ -9,37 +9,86 @@ # NOTE: Even though the specification is written in a high-level programming language, it's purpose # is not to be executed. It is meant to be just a reference for the Go implementation. # +from dataclasses import dataclass +from typing import List, Dict + +ChainSelector = int + +@dataclass +class Interval: + min: int + max: int + +@dataclass +class Message: + seq_nr: int + message_id: bytes + # TODO: + +@dataclass +class Commit: + interval: Interval + root: bytes + +@dataclass +class CommitOutcome: + latest_committed_seq_nums: Dict[ChainSelector, int] + commits: Dict[ChainSelector, Commit] + token_prices: Dict[str, int] + gas_prices: Dict[ChainSelector, int] + +@dataclass +class CommitObservation: + latest_committed_seq_nums: Dict[ChainSelector, int] + new_msgs: Dict[ChainSelector, List[Message]] + token_prices: Dict[str, int] + gas_prices: Dict[ChainSelector, int] + f_chain: Dict[ChainSelector, int] + +@dataclass +class CommitConfig: + oracle: int # our own observer + dest_chain: ChainSelector + f_chain: Dict[ChainSelector, int] + # oracleIndex -> supported chains + oracle_info: Dict[int, Dict[ChainSelector, bool]] + priced_tokens: List[str] + class CommitPlugin: def __init__(self): - self.cfg = { - "dest_chain": "chainD", - "f_chain": {"chainA": 2, "chainB": 3, "chainD": 3}, - "observer_info": { - "nodeA": { - "supported_chains": {"chainA", "chainB", "chainD"}, - "token_prices_observer": True, - } - }, - "priced_tokens": {"tokenA", "tokenB"}, - } + self.cfg = CommitConfig( + oracle=1, + dest_chain=10, + f_chain={1: 2, 2: 3, 10: 3}, + oracle_info={ + 0: {1: True, 2: True, 10: True}, + # TODO: other oracles + }, + # TODO: will likely need aggregator address as well to + # actually get the price. + priced_tokens=["tokenA", "tokenB"], + ) self.keep_cfg_in_sync() - def query(self): + def get_token_prices(self): + # Read token prices which are required for the destination chain. + # We only read them if we have the capability to read from the price chain (e.g. arbitrum) + pass + + def get_gas_prices(self): + # Read all gas prices for the chains we support. pass - def observation(self, previous_outcome): - # Observe last msg sequence numbers for each source chain: {sourceChain: sequenceNumber} - observed_seq_nums = previous_outcome.get("observed_seq_nums", default={}) - if self.can_read_dest(): - on_chain_seq_nums = self.offRamp.get_sequence_numbers() - for (chain, seq_num) in on_chain_seq_nums.items(): - if chain not in observed_seq_nums or seq_num > observed_seq_nums[chain]: - observed_seq_nums[chain] = seq_num + def query(self): + pass - # Observe new msgs: {sourceChain: [(id, seq_num)]} + def observation(self, previous_outcome: CommitOutcome) -> CommitObservation: + # max_committed_seq_nr={sourceChainA: 10, sourceChainB: 20,...} + # Provided by the nodes that can read from the destination on the previous round. + # Observe msgs for our supported chains since the prev outcome. new_msgs = {} - for (chain, seq_num) in observed_seq_nums.items(): - if self.can_read(chain): + for (chain, seq_num) in previous_outcome.latest_committed_seq_nums: + if chain in self.cfg.oracle_info[self.cfg.oracle]: new_msgs[chain] = self.onRamp(chain).get_msgs(chain, start=seq_num+1, limit=256) for msg in new_msgs[chain]: assert(msg.id == msg.compute_id()) @@ -48,51 +97,55 @@ def observation(self, previous_outcome): token_prices = self.get_token_prices() # Observe gas prices. {chain: gasPrice} + # TODO: Should be a way to combine the loops over support chains for gas prices and new messages. gas_prices = self.get_gas_prices() # Observe fChain for each chain. {chain: f_chain} - f_chain = self.cfg["f_chain"] + # We observe this because configuration changes may be detected at different times by different nodes. + # We always use the configuration which is seen by a majority of nodes. + f_chain = self.cfg.f_chain - if not self.can_read_dest(): - # If node is not able to read updated sequence numbers from the destination, - # it should not observe the outdated ones that are coming from the previous outcome. - observed_seq_nums = {} + # If we support the destination chain, then we contribute an observation of the max committed seq nums. + # We use these in outcome to filter out messages that have already been committed. + latest_committed_seq_nums = {} + if self.cfg.dest_chain in self.cfg.oracle_info[self.cfg.oracle]: + latest_committed_seq_nums = self.offRamp.latest_committed_seq_nums() - return (observed_seq_nums, new_msgs, token_prices, gas_prices, f_chain) + return CommitObservation(latest_committed_seq_nums, new_msgs, token_prices, gas_prices, f_chain) def validate_observation(self, attributed_observation): observation = attributed_observation.observation - observer = attributed_observation.observer + oracle = attributed_observation.oracle - if "seq_nums" in observation: - assert observer.can_read_dest() - - observer_supported_chains = self.cfg["observer_info"][observer]["supported_chains"] - for (chain, msgs) in observation["new_msgs"].items(): - assert(chain in observer_supported_chains) - - if "seq_nums" in observation: - for msg in msgs: - assert(msg.seq_num > observation["observed_seq_nums"][msg.source_chain]) + # Only accept dest observations from nodes that support the dest chain + if observation.latest_committed_seq_nums is not None: + assert self.cfg.dest_chain in self.cfg.oracle_info[oracle] + # Only accept source observations from nodes which support those sources. + for (chain, msgs) in observation.new_msgs.items(): + assert(chain in self.cfg.oracle_info[oracle]) + # Don't allow duplicates by seqNr or id. Required to prevent double counting. assert(len(msgs) == len(set([msg.seq_num for msg in msgs]))) assert(len(msgs) == len(set([msg.id for msg in msgs]))) def observation_quorum(self): return "2F+1" - def outcome(self, observations): + def outcome(self, observations: List[CommitObservation])->CommitOutcome: f_chain = consensus_f_chain(observations) - seq_nums = consensus_seq_nums(observations, f_chain) + latest_committed_seq_nums = consensus_latest_committed_seq_nums(observations, f_chain) # all_msgs contains all messages from all observations, grouped by source chain - all_msgs = [observation["new_msgs"] for observation in observations].group_by_source_chain() + all_msgs = [observation.new_msgs for observation in observations].group_by_source_chain() - trees = {} # { chain: (root, min_seq_num, max_seq_num) } + commits = {} # { chain: (root, min_seq_num, max_seq_num) } for (chain, msgs) in all_msgs: - # keep only msgs with seq nums greater than the consensus max commited seq nums - msgs = [msg for msg in msgs if msg.seq_num > seq_nums[chain]] + # Keep only msgs with seq nums greater than the consensus max commited seq nums. + # Note right after a report has been submitted, we'll expect those same messages + # to appear in the next observation, because the message observations are built + # on the previous max committed seq nums. + msgs = [msg for msg in msgs if msg.seq_num > latest_committed_seq_nums[chain]] msgs_by_seq_num = msgs.group_by_seq_num() # { 423: [0x1, 0x1, 0x2] } # 2 nodes say that msg id is 0x1 and 1 node says it's 0x2 @@ -107,12 +160,14 @@ def outcome(self, observations): break # gap in sequence numbers, stop here msgs_for_tree.append((seq_num, id)) - trees[chain] = build_merkle_tree(msgs_for_tree, leaves="ids") + commits[chain] = Commit(root=build_merkle_tree(msgs_for_tree), interval=Interval(min=msgs_for_tree[0].seq_num, max=msgs_for_tree[-1].seq_num)) + # TODO: we only want to put token/gas prices onchain + # on a regular cadence unless huge deviation. token_prices = { tk: median(prices) for (tk, prices) in observations.group_token_prices_by_token() } gas_prices = { chain: median(prices) for (chain, prices) in observations.group_gas_prices_by_chain() } - return (seq_nums, trees, token_prices, gas_prices) + return CommitOutcome(latest_committed_seq_nums=latest_committed_seq_nums, commits=commits, token_price=token_prices, gas_prices=gas_prices) def reports(self, outcome): report = report_from_outcome(outcome) @@ -120,14 +175,14 @@ def reports(self, outcome): return [encoded] def should_accept(self, report): - if report is empty or invalid: + if len(report) == 0 or self.validate_report(report): return False def should_transmit(self, report): if not self.is_writer(): return False - if report is empty or invalid: + if len(report) == 0 or not self.validate_report(report): return False on_chain_seq_nums = self.offRamp.get_sequence_numbers() @@ -137,6 +192,9 @@ def should_transmit(self, report): return True + def validate_report(self, report): + pass + def keep_cfg_in_sync(self): # Polling the configuration on the on-chain contract. # When the config is updated on-chain, updates the plugin's local copy to the most recent version. @@ -146,12 +204,30 @@ def consensus_f_chain(observations): f_chain_votes = observations["f_chain"].group_by_chain() # { chainA: [1, 1, 16, 16, 16, 16] } return { ch: elem_most_occurrences(fs) for (ch, fs) in f_chain_votes.items() } # { chainA: 16 } -def consensus_seq_nums(observations, f_chain): - observed_seq_nums = observations["observed_seq_nums"].group_by_chain(sort="asc") # { chainA: [4, 5, 5, 5, 5, 6, 6] } - seq_nums_consensus = {} - - for chain, seq_nums in observed_seq_nums.items(): - if len(observed_seq_nums) >= 2*f_chain[chain]+1: - seq_nums_consensus[chain] = observed_seq_nums[f_chain[chain]] # with f=4 { chainA: 5 } - - return seq_nums_consensus +def consensus_latest_committed_seq_nums(observations, f_chains): + all_latest_committed_seq_nums = {} + for observation in observations: + for (chain, seq_num) in observation.latest_committed_seq_nums.items(): + if chain not in all_latest_committed_seq_nums: + all_latest_committed_seq_nums[chain] = [] + all_latest_committed_seq_nums[chain].append(seq_num) + + latest_committed_seq_nums_consensus = {} + # { chainA: [4, 5, 5, 5, 5, 6, 6] } + for (chain, latest_committed_seq_nums) in all_latest_committed_seq_nums.items(): + if len(latest_committed_seq_nums) >= 2*f_chains[chain]+1: + # 2f+1 = 2*5+1 = 11 + latest_committed_seq_nums_consensus[chain] = sorted(latest_committed_seq_nums)[f_chains[chain]]# with f=4 { chainA: 5 } + return latest_committed_seq_nums_consensus + +def elem_most_occurrences(lst): + pass + +def build_merkle_tree(messages): + pass + +def median(lst): + pass + +def report_from_outcome(outcome: CommitOutcome)->bytes: + pass