Skip to content

Commit

Permalink
Restoring previous version of pruning (without left join) (#1544)
Browse files Browse the repository at this point in the history
## Motivation

Outer joins are putting too much pressure on the database. There were
some fixes done to improve that, but in 2.18 release. In the meantime,
I'm bringing back the previous implementation until we merge 2.18 to
CCIP repo. The only difference is that, we not gonna drop the orphaned
logs (it would work as the currently released version)

### Using outer join

<img width="945" alt="image"
src="https://github.com/user-attachments/assets/622c2877-f0fe-4b7b-a9c2-7d785902f88a">


### Restoring inner join

<img width="941" alt="image"
src="https://github.com/user-attachments/assets/074176d1-93f1-4fab-a6a5-2a665da2093f">
  • Loading branch information
mateusz-sekara authored Nov 21, 2024
1 parent 2a76297 commit a1636a9
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 31 deletions.
7 changes: 1 addition & 6 deletions core/chains/evm/logpoller/observability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,7 @@ func TestCountersAreProperlyPopulatedForWrites(t *testing.T) {
assert.Equal(t, float64(20), testutil.ToFloat64(orm.logsInserted.WithLabelValues("420")))
assert.Equal(t, float64(2), testutil.ToFloat64(orm.blocksInserted.WithLabelValues("420")))

rowsAffected, err := orm.DeleteExpiredLogs(ctx, 3)
require.NoError(t, err)
require.Equal(t, int64(3), rowsAffected)
assert.Equal(t, 3, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteExpiredLogs", "delete"))

rowsAffected, err = orm.DeleteBlocksBefore(ctx, 30, 0)
rowsAffected, err := orm.DeleteBlocksBefore(ctx, 30, 0)
require.NoError(t, err)
require.Equal(t, int64(2), rowsAffected)
assert.Equal(t, 2, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteBlocksBefore", "delete"))
Expand Down
32 changes: 18 additions & 14 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,30 +314,34 @@ type Exp struct {
ShouldDelete bool
}

// DeleteExpiredLogs removes any logs which either:
// - don't match any currently registered filters, or
// - have a timestamp older than any matching filter's retention, UNLESS there is at
// least one matching filter with retention=0
func (o *DSORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) {
var err error
var result sql.Result
query := `DELETE FROM evm.logs
if limit > 0 {
result, err = o.ds.ExecContext(ctx, `
DELETE FROM evm.logs
WHERE (evm_chain_id, address, event_sig, block_number) IN (
SELECT l.evm_chain_id, l.address, l.event_sig, l.block_number
FROM evm.logs l
LEFT JOIN (
SELECT address, event, CASE WHEN MIN(retention) = 0 THEN 0 ELSE MAX(retention) END AS retention
INNER JOIN (
SELECT address, event, MAX(retention) AS retention
FROM evm.log_poller_filters
WHERE evm_chain_id = $1
GROUP BY evm_chain_id, address, event
) r ON l.address = r.address AND l.event_sig = r.event
WHERE l.evm_chain_id = $1 AND -- Must be WHERE rather than ON due to LEFT JOIN
r.retention IS NULL OR (r.retention != 0 AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')) %s)`

if limit > 0 {
result, err = o.ds.ExecContext(ctx, fmt.Sprintf(query, "LIMIT $2"), ubig.New(o.chainID), limit)
HAVING NOT 0 = ANY(ARRAY_AGG(retention))
) r ON l.evm_chain_id = $1 AND l.address = r.address AND l.event_sig = r.event
AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')
LIMIT $2
)`, ubig.New(o.chainID), limit)
} else {
result, err = o.ds.ExecContext(ctx, fmt.Sprintf(query, ""), ubig.New(o.chainID))
result, err = o.ds.ExecContext(ctx, `WITH r AS
( SELECT address, event, MAX(retention) AS retention
FROM evm.log_poller_filters WHERE evm_chain_id=$1
GROUP BY evm_chain_id,address, event HAVING NOT 0 = ANY(ARRAY_AGG(retention))
) DELETE FROM evm.logs l USING r
WHERE l.evm_chain_id = $1 AND l.address=r.address AND l.event_sig=r.event
AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')`, // retention is in nanoseconds (time.Duration aka BIGINT)
ubig.New(o.chainID))
}

if err != nil {
Expand Down
15 changes: 5 additions & 10 deletions core/chains/evm/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,12 +489,7 @@ func TestORM(t *testing.T) {
time.Sleep(2 * time.Millisecond) // just in case we haven't reached the end of the 1ms retention period
deleted, err := o1.DeleteExpiredLogs(ctx, 2)
require.NoError(t, err)
assert.Equal(t, int64(2), deleted)

// Delete expired logs without page limit
deleted, err = o1.DeleteExpiredLogs(ctx, 0)
require.NoError(t, err)
assert.Equal(t, int64(2), deleted)
assert.Equal(t, int64(1), deleted)

// Ensure that both of the logs from the second chain are still there
logs, err = o2.SelectLogs(ctx, 0, 100, common.HexToAddress("0x1236"), topic2)
Expand All @@ -506,10 +501,10 @@ func TestORM(t *testing.T) {

logs, err = o1.SelectLogsByBlockRange(ctx, 1, latest.BlockNumber)
require.NoError(t, err)
// It should have retained the log matching filter0 (due to ret=0 meaning permanent retention) as well as all
// 3 logs matching filter12 (ret=1 hour). It should have deleted 3 logs not matching any filter, as well as 1
// of the 2 logs matching filter1 (ret=1ms)--the one that doesn't also match filter12.
assert.Len(t, logs, 4)
// The only log which should be deleted is the one which matches filter1 (ret=1ms) but not filter12 (ret=1 hour)
// Importantly, it shouldn't delete any logs matching only filter0 (ret=0 meaning permanent retention). Anything
// matching filter12 should be kept regardless of what other filters it matches.
assert.Len(t, logs, 7)

// Delete logs after should delete all logs.
err = o1.DeleteLogsAndBlocksAfter(ctx, 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# If you want to use a specific commit or a branch you need to switch to the internal ECR in `~/.testsecrets`
# E2E_TEST_CHAINLINK_IMAGE="<aws account number>.dkr.ecr.<aws region>.amazonaws.com/chainlink-ccip"
[CCIP.Env.NewCLCluster.Common.ChainlinkImage]
version = "2.14.0-ccip1.5.0"
version = "2.17.0-ccip1.5.11-beta.0"

[CCIP]
[CCIP.ContractVersions]
Expand Down

0 comments on commit a1636a9

Please sign in to comment.