Skip to content

Commit

Permalink
Add block_number >= lower
Browse files Browse the repository at this point in the history
  • Loading branch information
reductionista committed Sep 29, 2024
1 parent ae4cb7a commit 11f3b78
Showing 1 changed file with 20 additions and 19 deletions.
39 changes: 20 additions & 19 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,20 +294,20 @@ func (o *DSORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig
return &l, nil
}

// ExecPagedQuery runs a query accepting an upper block number limit in a fast paged way. limit is the maximum number of results to be returned,
// but it is also used to break the query up into smaller queries restricted to limit # of blocks. The first range
// of blocks will be from MIN(block_number) to MIN(block_number) + limit. The iterative process ends either once
// ExecPagedQuery runs a query accepting an upper limit block (end) in a fast paged way. limit is the maximum number
// of results to be returned, but it is also used to break the query up into smaller queries restricted to limit # of blocks.
// The first range of blocks will be from MIN(block_number) to MIN(block_number) + limit. The iterative process ends either once
// the limit on results is reached or block_number = end. The query will never be exeucted on blocks where
// block_number > end, and it will never be executed on block_number = B unless it has also been executed on all
// blocks with block_number < B
func (o *DSORM) ExecPagedQuery(ctx context.Context, limit int64, end int64, query func(limitBlock int64) (int64, error)) (numResults int64, err error) {
func (o *DSORM) ExecPagedQuery(ctx context.Context, limit int64, end int64, query func(lower, upper int64) (int64, error)) (numResults int64, err error) {
if limit == 0 {
return query(end)
return query(0, end)
}

var limitBlock int64
err = o.ds.GetContext(ctx, &limitBlock, `SELECT MIN(block_number) FROM evm.log_poller_blocks
WHERE evm_chain_id = $1`, ubig.New(o.chainID))
var start int64
err = o.ds.GetContext(ctx, &start, `SELECT MIN(block_number) FROM evm.log_poller_blocks
WHERE evm_chain_id = $1`, ubig.New(o.chainID))
if err != nil {
if err == sql.ErrNoRows {
return 0, nil
Expand All @@ -316,17 +316,18 @@ func (o *DSORM) ExecPagedQuery(ctx context.Context, limit int64, end int64, quer
}

// Remove up to limit blocks at a time, until we've reached the limit or removed everything eligible for deletion
for limitBlock += (limit - 1); numResults < limit; limitBlock += limit {
if limitBlock > end {
limitBlock = end
for lower, upper := start, start+limit-1; numResults < limit; lower = upper + 1 {
upper = lower + limit - 1
if upper > end {
upper = end
}
rows, err2 := query(limitBlock)
rows, err2 := query(lower, upper)
if err2 != nil {
return numResults, err
}
numResults += rows

if limitBlock == end {
if upper == end {
break
}
}
Expand All @@ -336,9 +337,9 @@ func (o *DSORM) ExecPagedQuery(ctx context.Context, limit int64, end int64, quer
// DeleteBlocksBefore delete blocks before and including end. When limit is set, it will delete at most limit blocks.
// Otherwise, it will delete all blocks at once.
func (o *DSORM) DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error) {
return o.ExecPagedQuery(ctx, limit, end, func(int64) (int64, error) {
result, err := o.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks WHERE evm_chain_id = $1 AND block_number <= $2`,
ubig.New(o.chainID), end)
return o.ExecPagedQuery(ctx, limit, end, func(lower, upper int64) (int64, error) {
result, err := o.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks WHERE evm_chain_id = $1 AND block_number >= $2 AND block_number <= $3`,
ubig.New(o.chainID), lower, upper)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -429,7 +430,7 @@ func (o *DSORM) SelectExcessLogIDs(ctx context.Context, limit int64) (results []
(f.topic2 IS NULL OR l.topics[1] = ANY(f.topic2)) AND
(f.topic3 IS NULL OR l.topics[2] = ANY(f.topic3)) AND
(f.topic4 IS NULL OR l.topics[3] = ANY(f.topic4))
WHERE evm_chain_id = $1 AND block_number <= $2
WHERE evm_chain_id = $1 AND block_number >= $2 AND block_number <= $3
`

// Return all logs considered "old" by every filter they match
Expand All @@ -441,9 +442,9 @@ func (o *DSORM) SelectExcessLogIDs(ctx context.Context, limit int64) (results []
return results, err
}

o.ExecPagedQuery(ctx, limit, latestBlock.FinalizedBlockNumber, func(limitBlock int64) (int64, error) {
o.ExecPagedQuery(ctx, limit, latestBlock.FinalizedBlockNumber, func(lower, upper int64) (int64, error) {
var rowIDs []uint64
err = o.ds.SelectContext(ctx, &rowIDs, query, ubig.New(o.chainID), limitBlock)
err = o.ds.SelectContext(ctx, &rowIDs, query, ubig.New(o.chainID), lower, upper)
if err != nil {
return 0, err
}
Expand Down

0 comments on commit 11f3b78

Please sign in to comment.