diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index f7ed30859e5..69070e46f1e 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -294,20 +294,20 @@ func (o *DSORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig return &l, nil } -// ExecPagedQuery runs a query accepting an upper block number limit in a fast paged way. limit is the maximum number of results to be returned, -// but it is also used to break the query up into smaller queries restricted to limit # of blocks. The first range -// of blocks will be from MIN(block_number) to MIN(block_number) + limit. The iterative process ends either once +// ExecPagedQuery runs a query accepting an upper limit block (end) in a fast paged way. limit is the maximum number +// of results to be returned, but it is also used to break the query up into smaller queries restricted to limit # of blocks. +// The first range of blocks will be from MIN(block_number) to MIN(block_number) + limit. The iterative process ends either once // the limit on results is reached or block_number = end. The query will never be exeucted on blocks where // block_number > end, and it will never be executed on block_number = B unless it has also been executed on all // blocks with block_number < B -func (o *DSORM) ExecPagedQuery(ctx context.Context, limit int64, end int64, query func(limitBlock int64) (int64, error)) (numResults int64, err error) { +func (o *DSORM) ExecPagedQuery(ctx context.Context, limit int64, end int64, query func(lower, upper int64) (int64, error)) (numResults int64, err error) { if limit == 0 { - return query(end) + return query(0, end) } - var limitBlock int64 - err = o.ds.GetContext(ctx, &limitBlock, `SELECT MIN(block_number) FROM evm.log_poller_blocks - WHERE evm_chain_id = $1`, ubig.New(o.chainID)) + var start int64 + err = o.ds.GetContext(ctx, &start, `SELECT MIN(block_number) FROM evm.log_poller_blocks + WHERE evm_chain_id = $1`, ubig.New(o.chainID)) if err != nil { if err == sql.ErrNoRows { return 0, nil @@ -316,17 +316,18 @@ func (o *DSORM) ExecPagedQuery(ctx context.Context, limit int64, end int64, quer } // Remove up to limit blocks at a time, until we've reached the limit or removed everything eligible for deletion - for limitBlock += (limit - 1); numResults < limit; limitBlock += limit { - if limitBlock > end { - limitBlock = end + for lower, upper := start, start+limit-1; numResults < limit; lower = upper + 1 { + upper = lower + limit - 1 + if upper > end { + upper = end } - rows, err2 := query(limitBlock) + rows, err2 := query(lower, upper) if err2 != nil { return numResults, err } numResults += rows - if limitBlock == end { + if upper == end { break } } @@ -336,9 +337,9 @@ func (o *DSORM) ExecPagedQuery(ctx context.Context, limit int64, end int64, quer // DeleteBlocksBefore delete blocks before and including end. When limit is set, it will delete at most limit blocks. // Otherwise, it will delete all blocks at once. func (o *DSORM) DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error) { - return o.ExecPagedQuery(ctx, limit, end, func(int64) (int64, error) { - result, err := o.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks WHERE evm_chain_id = $1 AND block_number <= $2`, - ubig.New(o.chainID), end) + return o.ExecPagedQuery(ctx, limit, end, func(lower, upper int64) (int64, error) { + result, err := o.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks WHERE evm_chain_id = $1 AND block_number >= $2 AND block_number <= $3`, + ubig.New(o.chainID), lower, upper) if err != nil { return 0, err } @@ -429,7 +430,7 @@ func (o *DSORM) SelectExcessLogIDs(ctx context.Context, limit int64) (results [] (f.topic2 IS NULL OR l.topics[1] = ANY(f.topic2)) AND (f.topic3 IS NULL OR l.topics[2] = ANY(f.topic3)) AND (f.topic4 IS NULL OR l.topics[3] = ANY(f.topic4)) - WHERE evm_chain_id = $1 AND block_number <= $2 + WHERE evm_chain_id = $1 AND block_number >= $2 AND block_number <= $3 ` // Return all logs considered "old" by every filter they match @@ -441,9 +442,9 @@ func (o *DSORM) SelectExcessLogIDs(ctx context.Context, limit int64) (results [] return results, err } - o.ExecPagedQuery(ctx, limit, latestBlock.FinalizedBlockNumber, func(limitBlock int64) (int64, error) { + o.ExecPagedQuery(ctx, limit, latestBlock.FinalizedBlockNumber, func(lower, upper int64) (int64, error) { var rowIDs []uint64 - err = o.ds.SelectContext(ctx, &rowIDs, query, ubig.New(o.chainID), limitBlock) + err = o.ds.SelectContext(ctx, &rowIDs, query, ubig.New(o.chainID), lower, upper) if err != nil { return 0, err }