Skip to content

Commit

Permalink
node: Add unit test for flow cancelling when a pending transfer is
Browse files Browse the repository at this point in the history
released

- Add a unit test to ensure that, when a pending transfer is released,
  it also does flow-cancelling on the TargetChain (previously we had a
  bug here)
- Add documentation for CheckPendingForTime to clarify that it has
  side-effects
  • Loading branch information
johnsaigle committed Jul 2, 2024
1 parent 088a77f commit 6aae792
Show file tree
Hide file tree
Showing 2 changed files with 287 additions and 0 deletions.
9 changes: 9 additions & 0 deletions node/pkg/governor/governor.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,15 @@ func (gov *ChainGovernor) CheckPending() ([]*common.MessagePublication, error) {
return gov.CheckPendingForTime(time.Now())
}

// Iterates over all pending transfers for all of the chain entries configured for the Governor.
// If a pending message is ready to be released, modifies the chain entry's `pending` and `transfers` slices by
// moving a `dbTransfer` element from `pending` to `transfers`. Returns a slice of Messages that will be published.
// A transfer is ready to be released when one of the following conditions holds:
// - The 'release time' duration has passed since `now` (i.e. the transfer has been queued for 24 hours, regardless of
// the Governor's current capacity)
// - Within the release time duration, other transfers have been processed and have freed up outbound Governor capacity.
// This happens either because other transfers get released after 24 hours or because incoming transfers of
// flow-cancelling assets have freed up outbound capacity.
func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessagePublication, error) {
gov.mutex.Lock()
defer gov.mutex.Unlock()
Expand Down
278 changes: 278 additions & 0 deletions node/pkg/governor/governor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1544,6 +1544,284 @@ func TestPendingTransferBeingReleased(t *testing.T) {
assert.Equal(t, 3, len(gov.msgsSeen))
}

// Test that, when a small transfer (under the 'big tx limit') of a flow-cancelling asset is queued and
// later released, it causes a reduction in the Governor usage for the destination chain.
func TestPendingTransferFlowCancelsWhenReleased(t *testing.T) {

ctx := context.Background()
gov, err := newChainGovernorForTest(ctx)

require.NoError(t, err)
assert.NotNil(t, gov)

// Set-up time
gov.setDayLengthInMinutes(24 * 60)
transferTime := time.Unix(int64(1654543099), 0)

// Solana USDC used as the flow cancelling asset. This ensures that the flow cancel mechanism works
// when the Origin chain of the asset does not match the emitter chain
// NOTE: Replace this Chain:Address pair if the Flow Cancel Token List is modified
var flowCancelTokenOriginAddress vaa.Address
flowCancelTokenOriginAddress, err = vaa.StringToAddress("c6fa7af3bedbad3a3d65f36aabc97431b1bbe4c2d2f6e0e47ca60203452f5d61")
require.NoError(t, err)

// var notFlowCancelTokenOriginAddress vaa.Address
// notFlowCancelTokenOriginAddress, err = vaa.StringToAddress("77777af3bedbad3a3d65f36aabc97431b1bbe4c2d2f6e0e47ca60203452f7777")
require.NoError(t, err)

// Data for Ethereum
tokenBridgeAddrStrEthereum := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec
tokenBridgeAddrEthereum, err := vaa.StringToAddress(tokenBridgeAddrStrEthereum)
require.NoError(t, err)
recipientEthereum := "0x707f9118e33a9b8998bea41dd0d46f38bb963fc8" //nolint:gosec

// Data for Sui
tokenBridgeAddrStrSui := "0xc57508ee0d4595e5a8728974a4a93a787d38f339757230d441e895422c07aba9" //nolint:gosec
tokenBridgeAddrSui, err := vaa.StringToAddress(tokenBridgeAddrStrSui)
require.NoError(t, err)
recipientSui := "0x84a5f374d29fc77e370014dce4fd6a55b58ad608de8074b0be5571701724da31"

// Data for Solana. Only used to represent the flow cancel asset.
// "wormDTUJ6AWPNvk59vGQbDvGJmqbDTdgWgAqcLBCgUb"
tokenBridgeAddrStrSolana := "0x0e0a589e6488147a94dcfa592b90fdd41152bb2ca77bf6016758a6f4df9d21b4" //nolint:gosec

// Add chain entries to `gov`
dailyLimit := uint64(10000)
err = gov.setChainForTesting(vaa.ChainIDEthereum, tokenBridgeAddrStrEthereum, dailyLimit, 0)
require.NoError(t, err)
err = gov.setChainForTesting(vaa.ChainIDSui, tokenBridgeAddrStrSui, dailyLimit, 0)
require.NoError(t, err)
err = gov.setChainForTesting(vaa.ChainIDSolana, tokenBridgeAddrStrSolana, dailyLimit, 0)
require.NoError(t, err)

// Add flow cancel asset and non-flow cancelable asset to the token entry for `gov`
err = gov.setTokenForTesting(vaa.ChainIDSolana, flowCancelTokenOriginAddress.String(), "USDC", 1.0, true)
require.NoError(t, err)
assert.NotNil(t, gov.tokens[tokenKey{chain: vaa.ChainIDSolana, addr: flowCancelTokenOriginAddress}])
// err = gov.setTokenForTesting(vaa.ChainIDEthereum, notFlowCancelTokenOriginAddress.String(), "NOTCANCELABLE", 1.0, false)
// require.NoError(t, err)

// First message: consume most of the dailyLimit for the emitter chain
msg1 := common.MessagePublication{
TxHash: hashFromString("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"),
Timestamp: time.Unix(int64(transferTime.Unix()+1), 0),
Nonce: uint32(1),
Sequence: uint64(1),
EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: tokenBridgeAddrEthereum,
ConsistencyLevel: uint8(32),
Payload: buildMockTransferPayloadBytes(1,
vaa.ChainIDSolana, // The origin asset for the token being transferred
flowCancelTokenOriginAddress.String(),
vaa.ChainIDSui,
recipientSui,
10000,
),
}

// Second message: This transfer gets queued because the limit is exhausted
msg2 := common.MessagePublication{
TxHash: hashFromString("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"),
Timestamp: time.Unix(int64(transferTime.Unix()+2), 0),
Nonce: uint32(2),
Sequence: uint64(2),
EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: tokenBridgeAddrEthereum,
ConsistencyLevel: uint8(32),
Payload: buildMockTransferPayloadBytes(1,
vaa.ChainIDSolana,
flowCancelTokenOriginAddress.String(),
vaa.ChainIDSui,
recipientSui,
500,
),
}

// Third message: Incoming flow cancelling transfer to the emitter chain for the previous messages. This
// reduces the Governor usage for that chain.
msg3 := common.MessagePublication{
TxHash: hashFromString("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"),
Timestamp: time.Unix(int64(transferTime.Unix()+3), 0),
Nonce: uint32(3),
Sequence: uint64(3),
EmitterChain: vaa.ChainIDSui,
EmitterAddress: tokenBridgeAddrSui,
ConsistencyLevel: uint8(0), // Sui has a consistency level of 0 (instant)
Payload: buildMockTransferPayloadBytes(1,
vaa.ChainIDSolana,
flowCancelTokenOriginAddress.String(),
vaa.ChainIDEthereum,
recipientEthereum,
1000,
),
}

// Stage 0: No transfers sent
chainEntryEthereum, exists := gov.chains[vaa.ChainIDEthereum]
assert.True(t, exists)
assert.NotNil(t, chainEntryEthereum)
chainEntrySui, exists := gov.chains[vaa.ChainIDSui]
assert.True(t, exists)
assert.NotNil(t, chainEntrySui)
sumEth, ethTransfers, err := gov.TrimAndSumValue(chainEntryEthereum.transfers, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Zero(t, len(ethTransfers))
assert.Zero(t, len(chainEntryEthereum.pending))
assert.Zero(t, sumEth)
require.NoError(t, err)
sumSui, suiTransfers, err := gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(1654543099), 0))
assert.Zero(t, len(suiTransfers))
assert.Zero(t, sumSui)
require.NoError(t, err)

// Perform a FIRST transfer (Ethereum --> Sui)
result, err := gov.ProcessMsgForTime(&msg1, time.Now())
assert.True(t, result)
require.NoError(t, err)

numTrans, netValueTrans, numPending, valuePending := gov.getStatsForAllChainsCancelFlow()
assert.Equal(t, 2, numTrans) // One for the positive and one for the negative
assert.Equal(t, int64(0), netValueTrans) // Zero, because the asset flow cancels
assert.Equal(t, 0, numPending)
assert.Equal(t, uint64(0), valuePending)
assert.Equal(t, 1, len(gov.msgsSeen))

// Check the state of the governor
chainEntryEthereum = gov.chains[vaa.ChainIDEthereum]
chainEntrySui = gov.chains[vaa.ChainIDSui]
assert.Equal(t, int(1), len(chainEntryEthereum.transfers))
assert.Equal(t, int(0), len(chainEntryEthereum.pending)) // One for inbound refund and another for outbound
assert.Equal(t, int(1), len(chainEntrySui.transfers))
sumEth, ethTransfers, err = gov.TrimAndSumValue(chainEntryEthereum.transfers, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Equal(t, int64(10000), sumEth) // Equal to total dailyLimit
assert.Equal(t, int(1), len(ethTransfers))
require.NoError(t, err)

// Outbound check:
// - ensure that the sum of the transfers is equal to the value of the inverse transfer
// - ensure the actual governor usage is Zero (any negative value is converted to zero by TrimAndSumValueForChain)
sumSui, suiTransfers, err = gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Equal(t, 1, len(suiTransfers)) // A single NEGATIVE transfer
assert.Equal(t, int64(-10000), sumSui) // Ensure the inverse (negative) transfer is in the Sui chain Entry
require.NoError(t, err)
suiGovernorUsage, err := gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Zero(t, suiGovernorUsage) // Actual governor usage must not be negative.
require.NoError(t, err)

// Perform a SECOND transfer (Ethereum --> Sui again)
// When a transfer is queued, ProcessMsgForTime should return false.
result, err = gov.ProcessMsgForTime(&msg2, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.False(t, result)
require.NoError(t, err)

// Stage 2: Transfer sent from Ethereum to Sui gets queued
numTrans, netValueTrans, numPending, valuePending = gov.getStatsForAllChainsCancelFlow()
assert.Equal(t, 2, len(gov.msgsSeen)) // Two messages observed
assert.Equal(t, 2, numTrans) // Two transfers (same as previous step)
assert.Equal(t, int64(0), netValueTrans) // The two transfers and their inverses cancel each other out.
assert.Equal(t, 1, numPending) // Second transfer is queued because the limit is exhausted
assert.Equal(t, uint64(500), valuePending)

// Verify the stats that are non flow-cancelling.
// In practice this is the sum of the absolute value of all the transfers.
// 5000 * 2 + 1000 * 2 = 12000
// _, absValueTrans, _, _ := gov.getStatsForAllChains()
// assert.Equal(t, uint64(12000), absValueTrans)

// Check the state of the governor.
chainEntryEthereum = gov.chains[vaa.ChainIDEthereum]
chainEntrySui = gov.chains[vaa.ChainIDSui]
assert.Equal(t, int(1), len(chainEntryEthereum.transfers)) // One from previous step
assert.Equal(t, int(1), len(chainEntryEthereum.pending)) // One for inbound refund and another for outbound
assert.Equal(t, int(1), len(chainEntrySui.transfers)) // One inverse transfer. Inverse from pending not added yet
sumEth, ethTransfers, err = gov.TrimAndSumValue(chainEntryEthereum.transfers, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Equal(t, int64(10000), sumEth) // Same as before: full dailyLimit
assert.Equal(t, int(1), len(ethTransfers)) // Same as before
require.NoError(t, err)
sumSui, suiTransfers, err = gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Equal(t, int(1), len(suiTransfers)) // just the inverse from before
assert.Equal(t, int64(-10000), sumSui) // Unchanged.
require.NoError(t, err)
suiGovernorUsage, err = gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Zero(t, suiGovernorUsage) // Actual governor usage must not be negative.
require.NoError(t, err)

// Stage 3: Message that reduces Governor usage for Ethereum (Sui --> Ethereum)
result, err = gov.ProcessMsgForTime(&msg3, time.Now())
assert.True(t, result)
require.NoError(t, err)

// Stage 3: Governor usage reduced on Ethereum due to incoming from Sui
numTrans, netValueTrans, numPending, valuePending = gov.getStatsForAllChainsCancelFlow()
assert.Equal(t, 3, len(gov.msgsSeen))
assert.Equal(t, 4, numTrans) // Two transfers and their inverses
assert.Equal(t, int64(0), netValueTrans) // Still zero because everything flow cancels
assert.Equal(t, 1, numPending) // Not released yet
assert.Equal(t, uint64(500), valuePending)
// Verify the stats that are non flow-cancelling.
// In practice this is the sum of the absolute value of all the transfers.
// 5000 * 2 + 1000 * 2 + 1500 = 13500
// _, absValueTrans, _, _ = gov.getStatsForAllChains()
// assert.Equal(t, uint64(13500), absValueTrans) // The net actual flow of assets is 4000 (after cancelling) plus 1500

// Check the state of the governor
chainEntryEthereum = gov.chains[vaa.ChainIDEthereum]
chainEntrySui = gov.chains[vaa.ChainIDSui]
assert.Equal(t, int(2), len(chainEntryEthereum.transfers))
assert.Equal(t, int(1), len(chainEntryEthereum.pending)) // We have not yet released the pending transfer
assert.Equal(t, int(2), len(chainEntrySui.transfers))
sumEth, ethTransfers, err = gov.TrimAndSumValue(chainEntryEthereum.transfers, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Equal(t, int64(9000), sumEth) // We freed up room because of Sui incoming
assert.Equal(t, int(2), len(ethTransfers)) // Two transfers cancel each other out
require.NoError(t, err)
sumSui, suiTransfers, err = gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Equal(t, int(2), len(suiTransfers))
assert.Equal(t, int64(-9000), sumSui) // We consumed some outbound capacity
require.NoError(t, err)
suiGovernorUsage, err = gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Equal(t, uint64(0), suiGovernorUsage) // Still zero because it's still negative
require.NoError(t, err)

// Stage 4: Release the pending transfer. We deliberately do not advance the time here because we are relying
// on the pending transfer being released as a result of flow-cancelling and not because 24 hours have passed.
// NOTE that even though the function says "Checked..." it modifies `gov` as a side-effect when a pending
// transfer is ready to be released
toBePublished, err := gov.CheckPendingForTime(time.Unix(int64(transferTime.Unix()-1000), 0))
require.NoError(t, err)
assert.Equal(t, 1, len(toBePublished))

// Stage 4: Pending transfer released. This increases the Ethereum Governor usage again and reduces Sui.
numTrans, netValueTrans, numPending, valuePending = gov.getStatsForAllChainsCancelFlow()
assert.Equal(t, 3, len(gov.msgsSeen))
assert.Equal(t, 6, numTrans) // Two new transfers created from previous pending transfer
assert.Equal(t, int64(0), netValueTrans) // Still zero because everything flow cancels
assert.Equal(t, 0, numPending) // Pending transfer has been released
assert.Equal(t, uint64(0), valuePending)

// Verify the stats that are non flow-cancelling.
// In practice this is the sum of the absolute value of all the transfers, including the inverses.
// 2 * (10000 + 1000 + 500) = 23000
_, absValueTrans, _, _ := gov.getStatsForAllChains()
assert.Equal(t, uint64(23000), absValueTrans)

// Check the state of the governor
chainEntryEthereum = gov.chains[vaa.ChainIDEthereum]
chainEntrySui = gov.chains[vaa.ChainIDSui]
assert.Equal(t, int(3), len(chainEntryEthereum.transfers)) // Two outbound, one inverse from Sui
assert.Equal(t, int(0), len(chainEntryEthereum.pending)) // Released
assert.Equal(t, int(3), len(chainEntrySui.transfers)) // One outbound, two inverses from Ethereum
sumEth, ethTransfers, err = gov.TrimAndSumValue(chainEntryEthereum.transfers, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Equal(t, int64(9500), sumEth)
assert.Equal(t, int(3), len(ethTransfers))
require.NoError(t, err)
sumSui, suiTransfers, err = gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Equal(t, int(3), len(suiTransfers)) // New inverse transfer added after pending transfer was released
assert.Equal(t, int64(-9500), sumSui) // Flow-cancelling inverse transfer added to Sui after released
require.NoError(t, err)
suiGovernorUsage, err = gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Equal(t, uint64(0), suiGovernorUsage) // Still zero
require.NoError(t, err)
}

func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
ctx := context.Background()
gov, err := newChainGovernorForTest(ctx)
Expand Down

0 comments on commit 6aae792

Please sign in to comment.