Skip to content

Commit

Permalink
usdc - less LP queries and logging improvements (#1177)
Browse files Browse the repository at this point in the history
Keep USDC related logs in-mem, short-lived (20sec) to prevent frequent fetching of the exact same logs when a ccip msg contains multiple usdc token transfers.
  • Loading branch information
dimkouv authored Jul 16, 2024
1 parent f77e6f0 commit 2f2dc93
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 11 deletions.
52 changes: 42 additions & 10 deletions core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package ccipdata
import (
"context"
"fmt"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/patrickmn/go-cache"
"github.com/pkg/errors"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
Expand All @@ -14,6 +16,13 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/abihelpers"
)

var (
// shortLivedInMemLogsCacheExpiration is used for the short-lived in meme logs cache.
// Value should usually be set to just a few seconds, a larger duration will not increase performance and might
// cause performance issues on re-orged logs.
shortLivedInMemLogsCacheExpiration = 20 * time.Second
)

const (
MESSAGE_SENT_FILTER_NAME = "USDC message sent"
)
Expand All @@ -36,6 +45,10 @@ type USDCReaderImpl struct {
filter logpoller.Filter
lggr logger.Logger
transmitterAddress common.Address

// shortLivedInMemLogs is a short-lived cache (items expire every few seconds)
// used to prevent frequent log fetching from the log poller
shortLivedInMemLogs *cache.Cache
}

func (u *USDCReaderImpl) Close() error {
Expand Down Expand Up @@ -71,20 +84,38 @@ func parseUSDCMessageSent(logData []byte) ([]byte, error) {
}

func (u *USDCReaderImpl) GetUSDCMessagePriorToLogIndexInTx(ctx context.Context, logIndex int64, usdcTokenIndexOffset int, txHash string) ([]byte, error) {
var lpLogs []logpoller.Log

// fetch all the usdc logs for the provided tx hash
logs, err := u.lp.IndexedLogsByTxHash(
ctx,
u.usdcMessageSent,
u.transmitterAddress,
common.HexToHash(txHash),
)
if err != nil {
return nil, err
k := fmt.Sprintf("usdc-%s", txHash) // custom prefix to avoid key collision if someone re-uses the cache
if rawLogs, foundInMem := u.shortLivedInMemLogs.Get(k); foundInMem {
inMemLogs, ok := rawLogs.([]logpoller.Log)
if !ok {
return nil, errors.Errorf("unexpected in-mem logs type %T", rawLogs)
}
u.lggr.Debugw("found logs in memory", "k", k, "len", len(inMemLogs))
lpLogs = inMemLogs
}

if len(lpLogs) == 0 {
u.lggr.Debugw("fetching logs from lp", "k", k)
logs, err := u.lp.IndexedLogsByTxHash(
ctx,
u.usdcMessageSent,
u.transmitterAddress,
common.HexToHash(txHash),
)
if err != nil {
return nil, err
}
lpLogs = logs
u.shortLivedInMemLogs.Set(k, logs, cache.DefaultExpiration)
u.lggr.Debugw("fetched logs from lp", "logs", len(lpLogs))
}

// collect the logs with log index less than the provided log index
allUsdcTokensData := make([][]byte, 0)
for _, current := range logs {
for _, current := range lpLogs {
if current.LogIndex < logIndex {
u.lggr.Infow("Found USDC message", "logIndex", current.LogIndex, "txHash", current.TxHash.Hex(), "data", hexutil.Encode(current.Data))
allUsdcTokensData = append(allUsdcTokensData, current.Data)
Expand Down Expand Up @@ -118,7 +149,8 @@ func NewUSDCReader(lggr logger.Logger, jobID string, transmitter common.Address,
Addresses: []common.Address{transmitter},
Retention: CommitExecLogsRetention,
},
transmitterAddress: transmitter,
transmitterAddress: transmitter,
shortLivedInMemLogs: cache.New(shortLivedInMemLogsCacheExpiration, 2*shortLivedInMemLogsCacheExpiration),
}

if registerFilters {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,35 @@ func TestLogPollerClient_GetUSDCMessagePriorToLogIndexInTx(t *testing.T) {
lp.AssertExpectations(t)
})

t.Run("logs fetched from memory in subsequent calls", func(t *testing.T) {
lp := lpmocks.NewLogPoller(t)
u, _ := NewUSDCReader(lggr, "job_123", utils.RandomAddress(), lp, false)

lp.On("IndexedLogsByTxHash",
mock.Anything,
u.usdcMessageSent,
u.transmitterAddress,
txHash,
).Return([]logpoller.Log{
{LogIndex: ccipLogIndex - 2, Data: hexutil.MustDecode(expectedData)},
{LogIndex: ccipLogIndex - 1, Data: []byte("-2")},
{LogIndex: ccipLogIndex, Data: []byte("0")},
{LogIndex: ccipLogIndex + 1, Data: []byte("1")},
}, nil).Once()

// first call logs must be fetched from lp
usdcMessageData, err := u.GetUSDCMessagePriorToLogIndexInTx(context.Background(), ccipLogIndex, 1, txHash.String())
assert.NoError(t, err)
assert.Equal(t, expectedPostParse, hexutil.Encode(usdcMessageData))

// subsequent call, logs must be fetched from memory
usdcMessageData, err = u.GetUSDCMessagePriorToLogIndexInTx(context.Background(), ccipLogIndex, 1, txHash.String())
assert.NoError(t, err)
assert.Equal(t, expectedPostParse, hexutil.Encode(usdcMessageData))

lp.AssertExpectations(t)
})

t.Run("none found", func(t *testing.T) {
lp := lpmocks.NewLogPoller(t)
u, _ := NewUSDCReader(lggr, "job_123", utils.RandomAddress(), lp, false)
Expand Down
9 changes: 8 additions & 1 deletion core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,20 @@ func (s *TokenDataReader) ReadTokenData(ctx context.Context, msg cciptypes.EVM2E
return []byte{}, errors.Wrap(err, "failed getting the USDC message body")
}

s.lggr.Infow("Calling attestation API", "messageBodyHash", hexutil.Encode(messageBody[:]), "messageID", hexutil.Encode(msg.MessageID[:]))
msgID := hexutil.Encode(msg.MessageID[:])
msgBody := hexutil.Encode(messageBody)
s.lggr.Infow("Calling attestation API", "messageBodyHash", msgBody, "messageID", msgID)

// The attestation API expects the hash of the message body
attestationResp, err := s.callAttestationApi(ctx, utils.Keccak256Fixed(messageBody))
if err != nil {
return []byte{}, errors.Wrap(err, "failed calling usdc attestation API ")
}

s.lggr.Infow("Got response from attestation API", "messageID", msgID,
"attestationStatus", attestationResp.Status, "attestation", attestationResp.Attestation,
"attestationError", attestationResp.Error)

switch attestationResp.Status {
case attestationStatusSuccess:
// The USDC pool needs a combination of the message body and the attestation
Expand Down

0 comments on commit 2f2dc93

Please sign in to comment.