From a1636a98637ce67b4457916c6bee4cd98873cbce Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Thu, 21 Nov 2024 08:41:36 +0100 Subject: [PATCH] Restoring previous version of pruning (without left join) (#1544) ## Motivation Outer joins are putting too much pressure on the database. There were some fixes done to improve that, but in 2.18 release. In the meantime, I'm bringing back the previous implementation until we merge 2.18 to CCIP repo. The only difference is that, we not gonna drop the orphaned logs (it would work as the currently released version) ### Using outer join image ### Restoring inner join image --- .../evm/logpoller/observability_test.go | 7 +--- core/chains/evm/logpoller/orm.go | 32 +++++++++++-------- core/chains/evm/logpoller/orm_test.go | 15 +++------ .../tomls/ccip1.4-stress/baseline.toml | 2 +- 4 files changed, 25 insertions(+), 31 deletions(-) diff --git a/core/chains/evm/logpoller/observability_test.go b/core/chains/evm/logpoller/observability_test.go index 2f502438bb..826c39d3a2 100644 --- a/core/chains/evm/logpoller/observability_test.go +++ b/core/chains/evm/logpoller/observability_test.go @@ -119,12 +119,7 @@ func TestCountersAreProperlyPopulatedForWrites(t *testing.T) { assert.Equal(t, float64(20), testutil.ToFloat64(orm.logsInserted.WithLabelValues("420"))) assert.Equal(t, float64(2), testutil.ToFloat64(orm.blocksInserted.WithLabelValues("420"))) - rowsAffected, err := orm.DeleteExpiredLogs(ctx, 3) - require.NoError(t, err) - require.Equal(t, int64(3), rowsAffected) - assert.Equal(t, 3, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteExpiredLogs", "delete")) - - rowsAffected, err = orm.DeleteBlocksBefore(ctx, 30, 0) + rowsAffected, err := orm.DeleteBlocksBefore(ctx, 30, 0) require.NoError(t, err) require.Equal(t, int64(2), rowsAffected) assert.Equal(t, 2, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteBlocksBefore", "delete")) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 0b5a8f4bd4..c574124b98 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -314,30 +314,34 @@ type Exp struct { ShouldDelete bool } -// DeleteExpiredLogs removes any logs which either: -// - don't match any currently registered filters, or -// - have a timestamp older than any matching filter's retention, UNLESS there is at -// least one matching filter with retention=0 func (o *DSORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) { var err error var result sql.Result - query := `DELETE FROM evm.logs + if limit > 0 { + result, err = o.ds.ExecContext(ctx, ` + DELETE FROM evm.logs WHERE (evm_chain_id, address, event_sig, block_number) IN ( SELECT l.evm_chain_id, l.address, l.event_sig, l.block_number FROM evm.logs l - LEFT JOIN ( - SELECT address, event, CASE WHEN MIN(retention) = 0 THEN 0 ELSE MAX(retention) END AS retention + INNER JOIN ( + SELECT address, event, MAX(retention) AS retention FROM evm.log_poller_filters WHERE evm_chain_id = $1 GROUP BY evm_chain_id, address, event - ) r ON l.address = r.address AND l.event_sig = r.event - WHERE l.evm_chain_id = $1 AND -- Must be WHERE rather than ON due to LEFT JOIN - r.retention IS NULL OR (r.retention != 0 AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')) %s)` - - if limit > 0 { - result, err = o.ds.ExecContext(ctx, fmt.Sprintf(query, "LIMIT $2"), ubig.New(o.chainID), limit) + HAVING NOT 0 = ANY(ARRAY_AGG(retention)) + ) r ON l.evm_chain_id = $1 AND l.address = r.address AND l.event_sig = r.event + AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second') + LIMIT $2 + )`, ubig.New(o.chainID), limit) } else { - result, err = o.ds.ExecContext(ctx, fmt.Sprintf(query, ""), ubig.New(o.chainID)) + result, err = o.ds.ExecContext(ctx, `WITH r AS + ( SELECT address, event, MAX(retention) AS retention + FROM evm.log_poller_filters WHERE evm_chain_id=$1 + GROUP BY evm_chain_id,address, event HAVING NOT 0 = ANY(ARRAY_AGG(retention)) + ) DELETE FROM evm.logs l USING r + WHERE l.evm_chain_id = $1 AND l.address=r.address AND l.event_sig=r.event + AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')`, // retention is in nanoseconds (time.Duration aka BIGINT) + ubig.New(o.chainID)) } if err != nil { diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index ba66e166eb..5809b55d45 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -489,12 +489,7 @@ func TestORM(t *testing.T) { time.Sleep(2 * time.Millisecond) // just in case we haven't reached the end of the 1ms retention period deleted, err := o1.DeleteExpiredLogs(ctx, 2) require.NoError(t, err) - assert.Equal(t, int64(2), deleted) - - // Delete expired logs without page limit - deleted, err = o1.DeleteExpiredLogs(ctx, 0) - require.NoError(t, err) - assert.Equal(t, int64(2), deleted) + assert.Equal(t, int64(1), deleted) // Ensure that both of the logs from the second chain are still there logs, err = o2.SelectLogs(ctx, 0, 100, common.HexToAddress("0x1236"), topic2) @@ -506,10 +501,10 @@ func TestORM(t *testing.T) { logs, err = o1.SelectLogsByBlockRange(ctx, 1, latest.BlockNumber) require.NoError(t, err) - // It should have retained the log matching filter0 (due to ret=0 meaning permanent retention) as well as all - // 3 logs matching filter12 (ret=1 hour). It should have deleted 3 logs not matching any filter, as well as 1 - // of the 2 logs matching filter1 (ret=1ms)--the one that doesn't also match filter12. - assert.Len(t, logs, 4) + // The only log which should be deleted is the one which matches filter1 (ret=1ms) but not filter12 (ret=1 hour) + // Importantly, it shouldn't delete any logs matching only filter0 (ret=0 meaning permanent retention). Anything + // matching filter12 should be kept regardless of what other filters it matches. + assert.Len(t, logs, 7) // Delete logs after should delete all logs. err = o1.DeleteLogsAndBlocksAfter(ctx, 1) diff --git a/integration-tests/ccip-tests/testconfig/tomls/ccip1.4-stress/baseline.toml b/integration-tests/ccip-tests/testconfig/tomls/ccip1.4-stress/baseline.toml index d78cd12595..6244eaccff 100644 --- a/integration-tests/ccip-tests/testconfig/tomls/ccip1.4-stress/baseline.toml +++ b/integration-tests/ccip-tests/testconfig/tomls/ccip1.4-stress/baseline.toml @@ -15,7 +15,7 @@ # If you want to use a specific commit or a branch you need to switch to the internal ECR in `~/.testsecrets` # E2E_TEST_CHAINLINK_IMAGE=".dkr.ecr..amazonaws.com/chainlink-ccip" [CCIP.Env.NewCLCluster.Common.ChainlinkImage] -version = "2.14.0-ccip1.5.0" +version = "2.17.0-ccip1.5.11-beta.0" [CCIP] [CCIP.ContractVersions]