Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restoring previous version of pruning (without left join) #1544

Merged
merged 2 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions core/chains/evm/logpoller/observability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,7 @@ func TestCountersAreProperlyPopulatedForWrites(t *testing.T) {
assert.Equal(t, float64(20), testutil.ToFloat64(orm.logsInserted.WithLabelValues("420")))
assert.Equal(t, float64(2), testutil.ToFloat64(orm.blocksInserted.WithLabelValues("420")))

rowsAffected, err := orm.DeleteExpiredLogs(ctx, 3)
require.NoError(t, err)
require.Equal(t, int64(3), rowsAffected)
assert.Equal(t, 3, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteExpiredLogs", "delete"))

rowsAffected, err = orm.DeleteBlocksBefore(ctx, 30, 0)
rowsAffected, err := orm.DeleteBlocksBefore(ctx, 30, 0)
require.NoError(t, err)
require.Equal(t, int64(2), rowsAffected)
assert.Equal(t, 2, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteBlocksBefore", "delete"))
Expand Down
32 changes: 18 additions & 14 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,30 +314,34 @@ type Exp struct {
ShouldDelete bool
}

// DeleteExpiredLogs removes any logs which either:
// - don't match any currently registered filters, or
// - have a timestamp older than any matching filter's retention, UNLESS there is at
// least one matching filter with retention=0
func (o *DSORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) {
var err error
var result sql.Result
query := `DELETE FROM evm.logs
if limit > 0 {
result, err = o.ds.ExecContext(ctx, `
DELETE FROM evm.logs
WHERE (evm_chain_id, address, event_sig, block_number) IN (
SELECT l.evm_chain_id, l.address, l.event_sig, l.block_number
FROM evm.logs l
LEFT JOIN (
SELECT address, event, CASE WHEN MIN(retention) = 0 THEN 0 ELSE MAX(retention) END AS retention
INNER JOIN (
SELECT address, event, MAX(retention) AS retention
FROM evm.log_poller_filters
WHERE evm_chain_id = $1
GROUP BY evm_chain_id, address, event
) r ON l.address = r.address AND l.event_sig = r.event
WHERE l.evm_chain_id = $1 AND -- Must be WHERE rather than ON due to LEFT JOIN
r.retention IS NULL OR (r.retention != 0 AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')) %s)`

if limit > 0 {
result, err = o.ds.ExecContext(ctx, fmt.Sprintf(query, "LIMIT $2"), ubig.New(o.chainID), limit)
HAVING NOT 0 = ANY(ARRAY_AGG(retention))
) r ON l.evm_chain_id = $1 AND l.address = r.address AND l.event_sig = r.event
AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')
LIMIT $2
)`, ubig.New(o.chainID), limit)
} else {
result, err = o.ds.ExecContext(ctx, fmt.Sprintf(query, ""), ubig.New(o.chainID))
result, err = o.ds.ExecContext(ctx, `WITH r AS
( SELECT address, event, MAX(retention) AS retention
FROM evm.log_poller_filters WHERE evm_chain_id=$1
GROUP BY evm_chain_id,address, event HAVING NOT 0 = ANY(ARRAY_AGG(retention))
) DELETE FROM evm.logs l USING r
WHERE l.evm_chain_id = $1 AND l.address=r.address AND l.event_sig=r.event
AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')`, // retention is in nanoseconds (time.Duration aka BIGINT)
ubig.New(o.chainID))
}

if err != nil {
Expand Down
15 changes: 5 additions & 10 deletions core/chains/evm/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,12 +489,7 @@ func TestORM(t *testing.T) {
time.Sleep(2 * time.Millisecond) // just in case we haven't reached the end of the 1ms retention period
deleted, err := o1.DeleteExpiredLogs(ctx, 2)
require.NoError(t, err)
assert.Equal(t, int64(2), deleted)

// Delete expired logs without page limit
deleted, err = o1.DeleteExpiredLogs(ctx, 0)
require.NoError(t, err)
assert.Equal(t, int64(2), deleted)
assert.Equal(t, int64(1), deleted)

// Ensure that both of the logs from the second chain are still there
logs, err = o2.SelectLogs(ctx, 0, 100, common.HexToAddress("0x1236"), topic2)
Expand All @@ -506,10 +501,10 @@ func TestORM(t *testing.T) {

logs, err = o1.SelectLogsByBlockRange(ctx, 1, latest.BlockNumber)
require.NoError(t, err)
// It should have retained the log matching filter0 (due to ret=0 meaning permanent retention) as well as all
// 3 logs matching filter12 (ret=1 hour). It should have deleted 3 logs not matching any filter, as well as 1
// of the 2 logs matching filter1 (ret=1ms)--the one that doesn't also match filter12.
assert.Len(t, logs, 4)
Comment on lines -509 to -512
Copy link
Collaborator

@reductionista reductionista Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think line 512 and the comment above it are the only lines that actually need to be reverted here. The rest of the changes to this file are all useful improvements to the test that would be relevant with either version.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Marking as approved since this doesn't matter much either way--I'll leave it up to you, if it's easier just to revert the whole thing for now and then re-apply the same commit later that's fine

// The only log which should be deleted is the one which matches filter1 (ret=1ms) but not filter12 (ret=1 hour)
// Importantly, it shouldn't delete any logs matching only filter0 (ret=0 meaning permanent retention). Anything
// matching filter12 should be kept regardless of what other filters it matches.
assert.Len(t, logs, 7)

// Delete logs after should delete all logs.
err = o1.DeleteLogsAndBlocksAfter(ctx, 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# If you want to use a specific commit or a branch you need to switch to the internal ECR in `~/.testsecrets`
# E2E_TEST_CHAINLINK_IMAGE="<aws account number>.dkr.ecr.<aws region>.amazonaws.com/chainlink-ccip"
[CCIP.Env.NewCLCluster.Common.ChainlinkImage]
version = "2.14.0-ccip1.5.0"
version = "2.17.0-ccip1.5.11-beta.0"

[CCIP]
[CCIP.ContractVersions]
Expand Down
Loading