diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index bba8c23b6c..5305582b97 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -186,27 +186,33 @@ func (o *DSORM) LoadFilters(ctx context.Context) (map[string]Filter, error) { return filters, err } -const ( - blocksFields = `evm_chain_id, block_hash, block_number, block_timestamp, finalized_block_number, created_at` - logsFields = `evm_chain_id, log_index, block_hash, block_number, address, event_sig, topics, tx_hash, data, created_at, block_timestamp` -) - func blocksQuery(clause string) string { - return fmt.Sprintf(`SELECT %s FROM evm.log_poller_blocks %s`, blocksFields, clause) + return fmt.Sprintf(`SELECT %s FROM evm.log_poller_blocks %s`, strings.Join(blocksFields[:], ", "), clause) } func logsQuery(clause string) string { - return fmt.Sprintf(`SELECT %s FROM evm.logs %s`, logsFields, clause) + return fmt.Sprintf(`SELECT %s FROM evm.logs %s`, strings.Join(logsFields[:], ", "), clause) +} + +func logsQueryWithTablePrefix(schema string, clause string) string { + var s strings.Builder + for i, field := range logsFields { + if i > 0 { + s.WriteString(", ") + } + s.WriteString(fmt.Sprintf("%s.%s", schema, field)) + } + return fmt.Sprintf(`SELECT %s FROM evm.logs AS %s %s`, s.String(), schema, clause) } func withConfs(query string, confs evmtypes.Confirmations) string { var lastConfirmedBlock string if confs == evmtypes.Finalized { - lastConfirmedBlock = `MAX(finalized_block)` + lastConfirmedBlock = `MAX(finalized_block_number)` } else { lastConfirmedBlock = `MAX(block_number) - :confs` } - return fmt.Sprintf(`%s AND block_number <= ( + return fmt.Sprintf(`%sblock_number <= ( SELECT %s FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id)`, query, lastConfirmedBlock) @@ -236,7 +242,7 @@ func (o *DSORM) SelectBlockByHash(ctx context.Context, hash common.Hash) (*LogPo func (o *DSORM) SelectBlockByNumber(ctx context.Context, n int64) (*LogPollerBlock, error) { var b LogPollerBlock if err := o.ds.GetContext(ctx, &b, - blocksQuery(`WHERE block_number = $1 AND evm_chain_id = $2)`), n, ubig.New(o.chainID), + blocksQuery(`WHERE block_number = $1 AND evm_chain_id = $2`), n, ubig.New(o.chainID), ); err != nil { return nil, err } @@ -274,7 +280,7 @@ func (o *DSORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig query := logsQueryWithConfs( `WHERE evm_chain_id = :evm_chain_id AND event_sig = :event_sig - AND address = :address`, confs) + + AND address = :address AND `, confs) + `ORDER BY block_number desc, log_index DESC LIMIT 1` var l Log @@ -578,7 +584,7 @@ func (o *DSORM) SelectLogsCreatedAfter(ctx context.Context, address common.Addre `WHERE evm_chain_id = :evm_chain_id AND address = :address AND event_sig = :event_sig - AND block_timestamp > :block_timestamp_after`, confs) + + AND block_timestamp > :block_timestamp_after AND `, confs) + `ORDER BY block_number, log_index` var logs []Log @@ -670,7 +676,7 @@ func (o *DSORM) SelectLatestLogEventSigsAddrsWithConfs(ctx context.Context, from WHERE evm_chain_id = :evm_chain_id AND event_sig = ANY(:event_sig_array) AND address = ANY(:address_array) - AND block_number > :start_block`, confs) + + AND block_number > :start_block AND `, confs) + `GROUP BY event_sig, address) ORDER BY block_number ASC` @@ -701,7 +707,7 @@ func (o *DSORM) SelectLatestBlockByEventSigsAddrsWithConfs(ctx context.Context, WHERE evm_chain_id = :evm_chain_id AND event_sig = ANY(:event_sig_array) AND address = ANY(:address_array) - AND block_number > :start_block`, confs) + AND block_number > :start_block AND `, confs) var blockNumber int64 query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -730,7 +736,7 @@ func (o *DSORM) SelectLogsDataWordRange(ctx context.Context, address common.Addr AND address = :address AND event_sig = :event_sig AND substring(data from 32*:word_index+1 for 32) >= :word_value_min - AND substring(data from 32*:word_index+1 for 32) <= :word_value_max`, confs) + + AND substring(data from 32*:word_index+1 for 32) <= :word_value_max AND `, confs) + `ORDER BY block_number, log_index` var logs []Log @@ -758,7 +764,7 @@ func (o *DSORM) SelectLogsDataWordGreaterThan(ctx context.Context, address commo query := logsQueryWithConfs(`WHERE evm_chain_id = :evm_chain_id AND address = :address AND event_sig = :event_sig - AND substring(data from 32*:word_index+1 for 32) >= :word_value_min`, confs) + + AND substring(data from 32*:word_index+1 for 32) >= :word_value_min AND `, confs) + `ORDER BY block_number, log_index` var logs []Log @@ -787,7 +793,7 @@ func (o *DSORM) SelectLogsDataWordBetween(ctx context.Context, address common.Ad AND address = :address AND event_sig = :event_sig AND substring(data from 32*:word_index_min+1 for 32) <= :word_value - AND substring(data from 32*:word_index_max+1 for 32) >= :word_value`, confs) + + AND substring(data from 32*:word_index_max+1 for 32) >= :word_value AND `, confs) + `ORDER BY block_number, log_index` var logs []Log @@ -815,7 +821,7 @@ func (o *DSORM) SelectIndexedLogsTopicGreaterThan(ctx context.Context, address c query := logsQueryWithConfs(`WHERE evm_chain_id = :evm_chain_id AND address = :address AND event_sig = :event_sig - AND topics[:topic_index] >= :topic_value_min`, confs) + + AND topics[:topic_index] >= :topic_value_min AND `, confs) + `ORDER BY block_number, log_index` var logs []Log @@ -845,7 +851,7 @@ func (o *DSORM) SelectIndexedLogsTopicRange(ctx context.Context, address common. AND address = :address AND event_sig = :event_sig AND topics[:topic_index] >= :topic_value_min - AND topics[:topic_index] <= :topic_value_max`, confs) + + AND topics[:topic_index] <= :topic_value_max AND `, confs) + `ORDER BY block_number, log_index` var logs []Log @@ -874,7 +880,7 @@ func (o *DSORM) SelectIndexedLogs(ctx context.Context, address common.Address, e WHERE evm_chain_id = :evm_chain_id AND address = :address AND event_sig = :event_sig - AND topics[:topic_index] = ANY(:topic_values)`, confs) + + AND topics[:topic_index] = ANY(:topic_values) AND `, confs) + `ORDER BY block_number, log_index` var logs []Log @@ -939,7 +945,7 @@ func (o *DSORM) SelectIndexedLogsCreatedAfter(ctx context.Context, address commo AND address = :address AND event_sig = :event_sig AND topics[:topic_index] = ANY(:topic_values) - AND block_timestamp > :block_timestamp_after`, confs) + + AND block_timestamp > :block_timestamp_after AND `, confs) + `ORDER BY block_number, log_index` var logs []Log @@ -1003,17 +1009,18 @@ func (o *DSORM) SelectIndexedLogsWithSigsExcluding(ctx context.Context, sigA, si WHERE evm_chain_id = :evm_chain_id AND address = :address AND event_sig = :sigA - AND block_number BETWEEN :start_block AND :end_block`, confs) + - withConfs(`EXCEPT - SELECT a.* FROM evm.logs AS a - INNER JOIN evm.logs B + AND block_number BETWEEN :start_block AND :end_block AND `, confs) + + ` EXCEPT ` + + withConfs(logsQueryWithTablePrefix("a", ` + INNER JOIN evm.logs AS b ON a.evm_chain_id = b.evm_chain_id AND a.address = b.address AND a.topics[:topic_index] = b.topics[:topic_index] AND a.event_sig = :sigA AND b.event_sig = :sigB - AND b.block_number BETWEEN :start_block AND :end_block`, confs) + - `ORDER BY block_number, log_index` + AND b.block_number BETWEEN :start_block AND :end_block + AND b.`), confs) + + ` ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) diff --git a/core/chains/evm/logpoller/parser.go b/core/chains/evm/logpoller/parser.go index e08ea93da7..0acac07575 100644 --- a/core/chains/evm/logpoller/parser.go +++ b/core/chains/evm/logpoller/parser.go @@ -13,6 +13,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" ) @@ -26,6 +27,10 @@ const ( var ( ErrUnexpectedCursorFormat = errors.New("unexpected cursor format") + logsFields = [...]string{"evm_chain_id", "log_index", "block_hash", "block_number", + "address", "event_sig", "topics", "tx_hash", "data", "created_at", "block_timestamp"} + blocksFields = [...]string{"evm_chain_id", "block_hash", "block_number", "block_timestamp", + "finalized_block_number", "created_at"} ) // The parser builds SQL expressions piece by piece for each Accept function call and resets the error and expression @@ -220,7 +225,7 @@ func (v *pgDSLParser) buildQuery(chainID *big.Int, expressions []query.Expressio v.err = nil // build the query string - clauses := []string{"SELECT evm.logs.* FROM evm.logs"} + clauses := []string{logsQuery("")} where, err := v.whereClause(expressions, limiter) if err != nil { diff --git a/core/chains/evm/logpoller/parser_test.go b/core/chains/evm/logpoller/parser_test.go index 5e99ec7ba8..b4099e000d 100644 --- a/core/chains/evm/logpoller/parser_test.go +++ b/core/chains/evm/logpoller/parser_test.go @@ -10,6 +10,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" ) @@ -34,7 +35,7 @@ func TestDSLParser(t *testing.T) { result, args, err := parser.buildQuery(chainID, expressions, limiter) require.NoError(t, err) - assert.Equal(t, "SELECT evm.logs.* FROM evm.logs WHERE evm_chain_id = :evm_chain_id ORDER BY "+defaultSort, result) + assert.Equal(t, logsQuery(" WHERE evm_chain_id = :evm_chain_id ORDER BY "+defaultSort), result) assertArgs(t, args, 1) }) @@ -52,15 +53,14 @@ func TestDSLParser(t *testing.T) { limiter := query.NewLimitAndSort(query.CursorLimit("10-5-0x42", query.CursorFollowing, 20)) result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND (address = :address_0 AND event_sig = :event_sig_0 " + - "AND block_number <= " + - "(SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1)) " + - "AND (block_number > :cursor_block_number OR (block_number = :cursor_block_number AND log_index > :cursor_log_index)) " + - "ORDER BY block_number ASC, log_index ASC, tx_hash ASC " + - "LIMIT 20" + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND (address = :address_0 AND event_sig = :event_sig_0 " + + "AND block_number <= " + + "(SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1)) " + + "AND (block_number > :cursor_block_number OR (block_number = :cursor_block_number AND log_index > :cursor_log_index)) " + + "ORDER BY block_number ASC, log_index ASC, tx_hash ASC " + + "LIMIT 20") require.NoError(t, err) assert.Equal(t, expected, result) @@ -80,12 +80,11 @@ func TestDSLParser(t *testing.T) { limiter := query.NewLimitAndSort(query.CountLimit(20)) result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND (address = :address_0 AND event_sig = :event_sig_0) " + - "ORDER BY " + defaultSort + " " + - "LIMIT 20" + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND (address = :address_0 AND event_sig = :event_sig_0) " + + "ORDER BY " + defaultSort + " " + + "LIMIT 20") require.NoError(t, err) assert.Equal(t, expected, result) @@ -102,10 +101,9 @@ func TestDSLParser(t *testing.T) { limiter := query.NewLimitAndSort(query.Limit{}, query.NewSortBySequence(query.Desc)) result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "ORDER BY block_number DESC, log_index DESC, tx_hash DESC" + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "ORDER BY block_number DESC, log_index DESC, tx_hash DESC") require.NoError(t, err) assert.Equal(t, expected, result) @@ -122,10 +120,9 @@ func TestDSLParser(t *testing.T) { limiter := query.NewLimitAndSort(query.Limit{}, query.NewSortByBlock(query.Asc), query.NewSortByTimestamp(query.Desc)) result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "ORDER BY block_number ASC, block_timestamp DESC" + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "ORDER BY block_number ASC, block_timestamp DESC") require.NoError(t, err) assert.Equal(t, expected, result) @@ -147,16 +144,15 @@ func TestDSLParser(t *testing.T) { limiter := query.NewLimitAndSort(query.CursorLimit("10-20-0x42", query.CursorPrevious, 20)) result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND (block_timestamp = :block_timestamp_0 " + - "AND tx_hash = :tx_hash_0 " + - "AND block_number != :block_number_0 " + - "AND block_number <= " + - "(SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1)) " + - "AND (block_number < :cursor_block_number OR (block_number = :cursor_block_number AND log_index < :cursor_log_index)) " + - "ORDER BY block_number DESC, log_index DESC, tx_hash DESC LIMIT 20" + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND (block_timestamp = :block_timestamp_0 " + + "AND tx_hash = :tx_hash_0 " + + "AND block_number != :block_number_0 " + + "AND block_number <= " + + "(SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1)) " + + "AND (block_number < :cursor_block_number OR (block_number = :cursor_block_number AND log_index < :cursor_log_index)) " + + "ORDER BY block_number DESC, log_index DESC, tx_hash DESC LIMIT 20") require.NoError(t, err) assert.Equal(t, expected, result) @@ -175,10 +171,9 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND block_number <= (SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND block_number <= (SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result) @@ -194,10 +189,9 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result) @@ -213,10 +207,9 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result) @@ -243,10 +236,9 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND substring(data from 32*:word_index_0+1 for 32) > :word_value_0 ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND substring(data from 32*:word_index_0+1 for 32) > :word_value_0 ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result) @@ -268,10 +260,9 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND topics[:topic_index_0] > :topic_value_0 AND topics[:topic_index_0] < :topic_value_1 ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND topics[:topic_index_0] > :topic_value_0 AND topics[:topic_index_0] < :topic_value_1 ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result) @@ -304,12 +295,11 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND (block_timestamp >= :block_timestamp_0 " + - "AND (tx_hash = :tx_hash_0 " + - "OR block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1))) ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND (block_timestamp >= :block_timestamp_0 " + + "AND (tx_hash = :tx_hash_0 " + + "OR block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1))) ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result) @@ -353,14 +343,13 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND (block_timestamp = :block_timestamp_0 " + - "AND (tx_hash = :tx_hash_0 " + - "OR (block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) " + - "AND substring(data from 32*:word_index_0+1 for 32) > :word_value_0 " + - "AND substring(data from 32*:word_index_0+1 for 32) <= :word_value_1))) ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND (block_timestamp = :block_timestamp_0 " + + "AND (tx_hash = :tx_hash_0 " + + "OR (block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) " + + "AND substring(data from 32*:word_index_0+1 for 32) > :word_value_0 " + + "AND substring(data from 32*:word_index_0+1 for 32) <= :word_value_1))) ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result)