Skip to content

Commit

Permalink
CCIP-1496 Delete expired logs by the block_timestamp (#12040)
Browse files Browse the repository at this point in the history
* Removing created_at index

* SonarQube fix
Post rebase fix
Added debug log
  • Loading branch information
mateusz-sekara authored Feb 28, 2024
1 parent e9a2dd0 commit aa22ad5
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 74 deletions.
1 change: 1 addition & 0 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -916,6 +916,7 @@ func (lp *logPoller) latestBlocks(ctx context.Context) (*evmtypes.Head, int64, e
}
latest := blocks[0]
finalized := blocks[1]
lp.lggr.Debugw("Latest blocks read from chain", "latest", latest.Number, "finalized", finalized.Number)
return latest, finalized.Number, nil
}

Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (o *DbORM) DeleteExpiredLogs(limit int64, qopts ...pg.QOpt) (int64, error)
GROUP BY evm_chain_id,address, event HAVING NOT 0 = ANY(ARRAY_AGG(retention))
) DELETE FROM evm.logs l USING r
WHERE l.evm_chain_id = $1 AND l.address=r.address AND l.event_sig=r.event
AND l.created_at <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')`, // retention is in nanoseconds (time.Duration aka BIGINT)
AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')`, // retention is in nanoseconds (time.Duration aka BIGINT)
ubig.New(o.chainID))
}

Expand Down Expand Up @@ -397,7 +397,7 @@ func (o *DbORM) SelectLogsByBlockRange(start, end int64) ([]Log, error) {
WHERE evm_chain_id = :evm_chain_id
AND block_number >= :start_block
AND block_number <= :end_block
ORDER BY (block_number, log_index, created_at)`, args)
ORDER BY (block_number, log_index)`, args)
if err != nil {
return nil, err
}
Expand Down
152 changes: 80 additions & 72 deletions core/chains/evm/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,92 +222,100 @@ func TestORM(t *testing.T) {
topic2 := common.HexToHash("0x1600")
require.NoError(t, o1.InsertLogs([]logpoller.Log{
{
EvmChainId: ubig.New(th.ChainID),
LogIndex: 1,
BlockHash: common.HexToHash("0x1234"),
BlockNumber: int64(10),
EventSig: topic,
Topics: [][]byte{topic[:]},
Address: common.HexToAddress("0x1234"),
TxHash: common.HexToHash("0x1888"),
Data: []byte("hello"),
EvmChainId: ubig.New(th.ChainID),
LogIndex: 1,
BlockHash: common.HexToHash("0x1234"),
BlockNumber: int64(10),
EventSig: topic,
Topics: [][]byte{topic[:]},
Address: common.HexToAddress("0x1234"),
TxHash: common.HexToHash("0x1888"),
Data: []byte("hello"),
BlockTimestamp: time.Now(),
},
{
EvmChainId: ubig.New(th.ChainID),
LogIndex: 2,
BlockHash: common.HexToHash("0x1234"),
BlockNumber: int64(11),
EventSig: topic,
Topics: [][]byte{topic[:]},
Address: common.HexToAddress("0x1234"),
TxHash: common.HexToHash("0x1888"),
Data: []byte("hello"),
EvmChainId: ubig.New(th.ChainID),
LogIndex: 2,
BlockHash: common.HexToHash("0x1234"),
BlockNumber: int64(11),
EventSig: topic,
Topics: [][]byte{topic[:]},
Address: common.HexToAddress("0x1234"),
TxHash: common.HexToHash("0x1888"),
Data: []byte("hello"),
BlockTimestamp: time.Now(),
},
{
EvmChainId: ubig.New(th.ChainID),
LogIndex: 3,
BlockHash: common.HexToHash("0x1234"),
BlockNumber: int64(12),
EventSig: topic,
Topics: [][]byte{topic[:]},
Address: common.HexToAddress("0x1235"),
TxHash: common.HexToHash("0x1888"),
Data: []byte("hello"),
EvmChainId: ubig.New(th.ChainID),
LogIndex: 3,
BlockHash: common.HexToHash("0x1234"),
BlockNumber: int64(12),
EventSig: topic,
Topics: [][]byte{topic[:]},
Address: common.HexToAddress("0x1235"),
TxHash: common.HexToHash("0x1888"),
Data: []byte("hello"),
BlockTimestamp: time.Now(),
},
{
EvmChainId: ubig.New(th.ChainID),
LogIndex: 4,
BlockHash: common.HexToHash("0x1234"),
BlockNumber: int64(13),
EventSig: topic,
Topics: [][]byte{topic[:]},
Address: common.HexToAddress("0x1235"),
TxHash: common.HexToHash("0x1888"),
Data: []byte("hello"),
EvmChainId: ubig.New(th.ChainID),
LogIndex: 4,
BlockHash: common.HexToHash("0x1234"),
BlockNumber: int64(13),
EventSig: topic,
Topics: [][]byte{topic[:]},
Address: common.HexToAddress("0x1235"),
TxHash: common.HexToHash("0x1888"),
Data: []byte("hello"),
BlockTimestamp: time.Now(),
},
{
EvmChainId: ubig.New(th.ChainID),
LogIndex: 5,
BlockHash: common.HexToHash("0x1234"),
BlockNumber: int64(14),
EventSig: topic2,
Topics: [][]byte{topic2[:]},
Address: common.HexToAddress("0x1234"),
TxHash: common.HexToHash("0x1888"),
Data: []byte("hello2"),
EvmChainId: ubig.New(th.ChainID),
LogIndex: 5,
BlockHash: common.HexToHash("0x1234"),
BlockNumber: int64(14),
EventSig: topic2,
Topics: [][]byte{topic2[:]},
Address: common.HexToAddress("0x1234"),
TxHash: common.HexToHash("0x1888"),
Data: []byte("hello2"),
BlockTimestamp: time.Now(),
},
{
EvmChainId: ubig.New(th.ChainID),
LogIndex: 6,
BlockHash: common.HexToHash("0x1234"),
BlockNumber: int64(15),
EventSig: topic2,
Topics: [][]byte{topic2[:]},
Address: common.HexToAddress("0x1235"),
TxHash: common.HexToHash("0x1888"),
Data: []byte("hello2"),
EvmChainId: ubig.New(th.ChainID),
LogIndex: 6,
BlockHash: common.HexToHash("0x1234"),
BlockNumber: int64(15),
EventSig: topic2,
Topics: [][]byte{topic2[:]},
Address: common.HexToAddress("0x1235"),
TxHash: common.HexToHash("0x1888"),
Data: []byte("hello2"),
BlockTimestamp: time.Now(),
},
{
EvmChainId: ubig.New(th.ChainID),
LogIndex: 7,
BlockHash: common.HexToHash("0x1237"),
BlockNumber: int64(16),
EventSig: topic,
Topics: [][]byte{topic[:]},
Address: common.HexToAddress("0x1236"),
TxHash: common.HexToHash("0x1888"),
Data: []byte("hello short retention"),
EvmChainId: ubig.New(th.ChainID),
LogIndex: 7,
BlockHash: common.HexToHash("0x1237"),
BlockNumber: int64(16),
EventSig: topic,
Topics: [][]byte{topic[:]},
Address: common.HexToAddress("0x1236"),
TxHash: common.HexToHash("0x1888"),
Data: []byte("hello short retention"),
BlockTimestamp: time.Now(),
},
{
EvmChainId: ubig.New(th.ChainID),
LogIndex: 8,
BlockHash: common.HexToHash("0x1238"),
BlockNumber: int64(17),
EventSig: topic2,
Topics: [][]byte{topic2[:]},
Address: common.HexToAddress("0x1236"),
TxHash: common.HexToHash("0x1888"),
Data: []byte("hello2 long retention"),
EvmChainId: ubig.New(th.ChainID),
LogIndex: 8,
BlockHash: common.HexToHash("0x1238"),
BlockNumber: int64(17),
EventSig: topic2,
Topics: [][]byte{topic2[:]},
Address: common.HexToAddress("0x1236"),
TxHash: common.HexToHash("0x1888"),
Data: []byte("hello2 long retention"),
BlockTimestamp: time.Now(),
},
}))

Expand Down
5 changes: 5 additions & 0 deletions core/store/migrate/migrations/0226_log_poller_index_drop.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- +goose Up
DROP INDEX evm.evm_logs_idx_created_at;

-- +goose Down
CREATE INDEX evm_logs_idx_created_at ON evm.logs (created_at);

0 comments on commit aa22ad5

Please sign in to comment.