diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader.go index eb222e36c4..32430d7209 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader.go @@ -3,9 +3,11 @@ package ccipdata import ( "context" "fmt" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/patrickmn/go-cache" "github.com/pkg/errors" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" @@ -14,6 +16,13 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/abihelpers" ) +var ( + // shortLivedInMemLogsCacheExpiration is used for the short-lived in meme logs cache. + // Value should usually be set to just a few seconds, a larger duration will not increase performance and might + // cause performance issues on re-orged logs. + shortLivedInMemLogsCacheExpiration = 20 * time.Second +) + const ( MESSAGE_SENT_FILTER_NAME = "USDC message sent" ) @@ -36,6 +45,10 @@ type USDCReaderImpl struct { filter logpoller.Filter lggr logger.Logger transmitterAddress common.Address + + // shortLivedInMemLogs is a short-lived cache (items expire every few seconds) + // used to prevent frequent log fetching from the log poller + shortLivedInMemLogs *cache.Cache } func (u *USDCReaderImpl) Close() error { @@ -71,20 +84,38 @@ func parseUSDCMessageSent(logData []byte) ([]byte, error) { } func (u *USDCReaderImpl) GetUSDCMessagePriorToLogIndexInTx(ctx context.Context, logIndex int64, usdcTokenIndexOffset int, txHash string) ([]byte, error) { + var lpLogs []logpoller.Log + // fetch all the usdc logs for the provided tx hash - logs, err := u.lp.IndexedLogsByTxHash( - ctx, - u.usdcMessageSent, - u.transmitterAddress, - common.HexToHash(txHash), - ) - if err != nil { - return nil, err + k := fmt.Sprintf("usdc-%s", txHash) // custom prefix to avoid key collision if someone re-uses the cache + if rawLogs, foundInMem := u.shortLivedInMemLogs.Get(k); foundInMem { + inMemLogs, ok := rawLogs.([]logpoller.Log) + if !ok { + return nil, errors.Errorf("unexpected in-mem logs type %T", rawLogs) + } + u.lggr.Debugw("found logs in memory", "k", k, "len", len(inMemLogs)) + lpLogs = inMemLogs + } + + if len(lpLogs) == 0 { + u.lggr.Debugw("fetching logs from lp", "k", k) + logs, err := u.lp.IndexedLogsByTxHash( + ctx, + u.usdcMessageSent, + u.transmitterAddress, + common.HexToHash(txHash), + ) + if err != nil { + return nil, err + } + lpLogs = logs + u.shortLivedInMemLogs.Set(k, logs, cache.DefaultExpiration) + u.lggr.Debugw("fetched logs from lp", "logs", len(lpLogs)) } // collect the logs with log index less than the provided log index allUsdcTokensData := make([][]byte, 0) - for _, current := range logs { + for _, current := range lpLogs { if current.LogIndex < logIndex { u.lggr.Infow("Found USDC message", "logIndex", current.LogIndex, "txHash", current.TxHash.Hex(), "data", hexutil.Encode(current.Data)) allUsdcTokensData = append(allUsdcTokensData, current.Data) @@ -118,7 +149,8 @@ func NewUSDCReader(lggr logger.Logger, jobID string, transmitter common.Address, Addresses: []common.Address{transmitter}, Retention: CommitExecLogsRetention, }, - transmitterAddress: transmitter, + transmitterAddress: transmitter, + shortLivedInMemLogs: cache.New(shortLivedInMemLogsCacheExpiration, 2*shortLivedInMemLogsCacheExpiration), } if registerFilters { diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader_internal_test.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader_internal_test.go index ef3adfee7f..a5f0a1ffd0 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader_internal_test.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader_internal_test.go @@ -75,6 +75,35 @@ func TestLogPollerClient_GetUSDCMessagePriorToLogIndexInTx(t *testing.T) { lp.AssertExpectations(t) }) + t.Run("logs fetched from memory in subsequent calls", func(t *testing.T) { + lp := lpmocks.NewLogPoller(t) + u, _ := NewUSDCReader(lggr, "job_123", utils.RandomAddress(), lp, false) + + lp.On("IndexedLogsByTxHash", + mock.Anything, + u.usdcMessageSent, + u.transmitterAddress, + txHash, + ).Return([]logpoller.Log{ + {LogIndex: ccipLogIndex - 2, Data: hexutil.MustDecode(expectedData)}, + {LogIndex: ccipLogIndex - 1, Data: []byte("-2")}, + {LogIndex: ccipLogIndex, Data: []byte("0")}, + {LogIndex: ccipLogIndex + 1, Data: []byte("1")}, + }, nil).Once() + + // first call logs must be fetched from lp + usdcMessageData, err := u.GetUSDCMessagePriorToLogIndexInTx(context.Background(), ccipLogIndex, 1, txHash.String()) + assert.NoError(t, err) + assert.Equal(t, expectedPostParse, hexutil.Encode(usdcMessageData)) + + // subsequent call, logs must be fetched from memory + usdcMessageData, err = u.GetUSDCMessagePriorToLogIndexInTx(context.Background(), ccipLogIndex, 1, txHash.String()) + assert.NoError(t, err) + assert.Equal(t, expectedPostParse, hexutil.Encode(usdcMessageData)) + + lp.AssertExpectations(t) + }) + t.Run("none found", func(t *testing.T) { lp := lpmocks.NewLogPoller(t) u, _ := NewUSDCReader(lggr, "job_123", utils.RandomAddress(), lp, false) diff --git a/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go b/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go index e7aac86033..fe3a86d2af 100644 --- a/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go +++ b/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go @@ -187,13 +187,20 @@ func (s *TokenDataReader) ReadTokenData(ctx context.Context, msg cciptypes.EVM2E return []byte{}, errors.Wrap(err, "failed getting the USDC message body") } - s.lggr.Infow("Calling attestation API", "messageBodyHash", hexutil.Encode(messageBody[:]), "messageID", hexutil.Encode(msg.MessageID[:])) + msgID := hexutil.Encode(msg.MessageID[:]) + msgBody := hexutil.Encode(messageBody) + s.lggr.Infow("Calling attestation API", "messageBodyHash", msgBody, "messageID", msgID) // The attestation API expects the hash of the message body attestationResp, err := s.callAttestationApi(ctx, utils.Keccak256Fixed(messageBody)) if err != nil { return []byte{}, errors.Wrap(err, "failed calling usdc attestation API ") } + + s.lggr.Infow("Got response from attestation API", "messageID", msgID, + "attestationStatus", attestationResp.Status, "attestation", attestationResp.Attestation, + "attestationError", attestationResp.Error) + switch attestationResp.Status { case attestationStatusSuccess: // The USDC pool needs a combination of the message body and the attestation