diff --git a/.changeset/brown-geese-boil.md b/.changeset/brown-geese-boil.md new file mode 100644 index 00000000000..fa7f65f733c --- /dev/null +++ b/.changeset/brown-geese-boil.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +FilteredLogs receive Expression instead of whole KeyFilter. #internal diff --git a/core/chains/evm/logpoller/disabled.go b/core/chains/evm/logpoller/disabled.go index f30837bcfee..a29dd5ea11a 100644 --- a/core/chains/evm/logpoller/disabled.go +++ b/core/chains/evm/logpoller/disabled.go @@ -118,7 +118,7 @@ func (d disabled) LogsDataWordBetween(ctx context.Context, eventSig common.Hash, return nil, ErrDisabled } -func (d disabled) FilteredLogs(_ context.Context, _ query.KeyFilter, _ query.LimitAndSort, _ string) ([]Log, error) { +func (d disabled) FilteredLogs(_ context.Context, _ []query.Expression, _ query.LimitAndSort, _ string) ([]Log, error) { return nil, ErrDisabled } diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index a4560c967c4..2a551c12378 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -69,7 +69,7 @@ type LogPoller interface { LogsDataWordBetween(ctx context.Context, eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs evmtypes.Confirmations) ([]Log, error) // chainlink-common query filtering - FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) + FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) } type LogPollerTest interface { @@ -1518,6 +1518,25 @@ func EvmWord(i uint64) common.Hash { return common.BytesToHash(b) } -func (lp *logPoller) FilteredLogs(ctx context.Context, queryFilter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) { +func (lp *logPoller) FilteredLogs(ctx context.Context, queryFilter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) { return lp.orm.FilteredLogs(ctx, queryFilter, limitAndSort, queryName) } + +// Where is a query.Where wrapper that ignores the Key and returns a slice of query.Expression rather than query.KeyFilter. +// If no expressions are provided, or an error occurs, an empty slice is returned. +func Where(expressions ...query.Expression) ([]query.Expression, error) { + filter, err := query.Where( + "", + expressions..., + ) + + if err != nil { + return []query.Expression{}, err + } + + if filter.Expressions == nil { + return []query.Expression{}, nil + } + + return filter.Expressions, nil +} diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go index 73302877f95..6ad76030bb5 100644 --- a/core/chains/evm/logpoller/log_poller_test.go +++ b/core/chains/evm/logpoller/log_poller_test.go @@ -26,6 +26,8 @@ import ( "go.uber.org/zap/zapcore" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/types/query" + "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" commonutils "github.com/smartcontractkit/chainlink-common/pkg/utils" htMocks "github.com/smartcontractkit/chainlink/v2/common/headtracker/mocks" @@ -2091,3 +2093,39 @@ func TestFindLCA(t *testing.T) { }) } } + +func TestWhere(t *testing.T) { + address := common.HexToAddress("0x1234567890abcdef1234567890abcdef12345678") + eventSig := common.HexToHash("0xabcdef1234567890abcdef1234567890abcdef1234") + ts := time.Now() + + expr1 := logpoller.NewAddressFilter(address) + expr2 := logpoller.NewEventSigFilter(eventSig) + expr3 := query.Timestamp(uint64(ts.Unix()), primitives.Gte) + expr4 := logpoller.NewConfirmationsFilter(evmtypes.Confirmations(0)) + + t.Run("Valid combination of filters", func(t *testing.T) { + result, err := logpoller.Where(expr1, expr2, expr3, expr4) + assert.NoError(t, err) + assert.Equal(t, []query.Expression{expr1, expr2, expr3, expr4}, result) + }) + + t.Run("No expressions (should return empty slice)", func(t *testing.T) { + result, err := logpoller.Where() + assert.NoError(t, err) + assert.Equal(t, []query.Expression{}, result) + }) + + t.Run("Invalid boolean expression", func(t *testing.T) { + invalidExpr := query.Expression{ + BoolExpression: query.BoolExpression{ + Expressions: []query.Expression{}, + }, + } + + result, err := logpoller.Where(invalidExpr) + assert.Error(t, err) + assert.EqualError(t, err, "all boolean expressions should have at least 2 expressions") + assert.Equal(t, []query.Expression{}, result) + }) +} diff --git a/core/chains/evm/logpoller/mocks/log_poller.go b/core/chains/evm/logpoller/mocks/log_poller.go index 4ce68839d16..9ae4d9767c9 100644 --- a/core/chains/evm/logpoller/mocks/log_poller.go +++ b/core/chains/evm/logpoller/mocks/log_poller.go @@ -124,7 +124,7 @@ func (_c *LogPoller_DeleteLogsAndBlocksAfter_Call) RunAndReturn(run func(context } // FilteredLogs provides a mock function with given fields: ctx, filter, limitAndSort, queryName -func (_m *LogPoller) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]logpoller.Log, error) { +func (_m *LogPoller) FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]logpoller.Log, error) { ret := _m.Called(ctx, filter, limitAndSort, queryName) if len(ret) == 0 { @@ -133,10 +133,10 @@ func (_m *LogPoller) FilteredLogs(ctx context.Context, filter query.KeyFilter, l var r0 []logpoller.Log var r1 error - if rf, ok := ret.Get(0).(func(context.Context, query.KeyFilter, query.LimitAndSort, string) ([]logpoller.Log, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, []query.Expression, query.LimitAndSort, string) ([]logpoller.Log, error)); ok { return rf(ctx, filter, limitAndSort, queryName) } - if rf, ok := ret.Get(0).(func(context.Context, query.KeyFilter, query.LimitAndSort, string) []logpoller.Log); ok { + if rf, ok := ret.Get(0).(func(context.Context, []query.Expression, query.LimitAndSort, string) []logpoller.Log); ok { r0 = rf(ctx, filter, limitAndSort, queryName) } else { if ret.Get(0) != nil { @@ -144,7 +144,7 @@ func (_m *LogPoller) FilteredLogs(ctx context.Context, filter query.KeyFilter, l } } - if rf, ok := ret.Get(1).(func(context.Context, query.KeyFilter, query.LimitAndSort, string) error); ok { + if rf, ok := ret.Get(1).(func(context.Context, []query.Expression, query.LimitAndSort, string) error); ok { r1 = rf(ctx, filter, limitAndSort, queryName) } else { r1 = ret.Error(1) @@ -160,16 +160,16 @@ type LogPoller_FilteredLogs_Call struct { // FilteredLogs is a helper method to define mock.On call // - ctx context.Context -// - filter query.KeyFilter +// - filter []query.Expression // - limitAndSort query.LimitAndSort // - queryName string func (_e *LogPoller_Expecter) FilteredLogs(ctx interface{}, filter interface{}, limitAndSort interface{}, queryName interface{}) *LogPoller_FilteredLogs_Call { return &LogPoller_FilteredLogs_Call{Call: _e.mock.On("FilteredLogs", ctx, filter, limitAndSort, queryName)} } -func (_c *LogPoller_FilteredLogs_Call) Run(run func(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string)) *LogPoller_FilteredLogs_Call { +func (_c *LogPoller_FilteredLogs_Call) Run(run func(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string)) *LogPoller_FilteredLogs_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(query.KeyFilter), args[2].(query.LimitAndSort), args[3].(string)) + run(args[0].(context.Context), args[1].([]query.Expression), args[2].(query.LimitAndSort), args[3].(string)) }) return _c } @@ -179,7 +179,7 @@ func (_c *LogPoller_FilteredLogs_Call) Return(_a0 []logpoller.Log, _a1 error) *L return _c } -func (_c *LogPoller_FilteredLogs_Call) RunAndReturn(run func(context.Context, query.KeyFilter, query.LimitAndSort, string) ([]logpoller.Log, error)) *LogPoller_FilteredLogs_Call { +func (_c *LogPoller_FilteredLogs_Call) RunAndReturn(run func(context.Context, []query.Expression, query.LimitAndSort, string) ([]logpoller.Log, error)) *LogPoller_FilteredLogs_Call { _c.Call.Return(run) return _c } diff --git a/core/chains/evm/logpoller/observability.go b/core/chains/evm/logpoller/observability.go index 782307e7d06..e0ed0cc4786 100644 --- a/core/chains/evm/logpoller/observability.go +++ b/core/chains/evm/logpoller/observability.go @@ -262,7 +262,7 @@ func (o *ObservedORM) SelectIndexedLogsTopicRange(ctx context.Context, address c }) } -func (o *ObservedORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) { +func (o *ObservedORM) FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) { return withObservedQueryAndResults(o, queryName, func() ([]Log, error) { return o.ORM.FilteredLogs(ctx, filter, limitAndSort, queryName) }) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 22870efccf3..1c119c9f9ac 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -64,7 +64,7 @@ type ORM interface { SelectLogsDataWordBetween(ctx context.Context, address common.Address, eventSig common.Hash, wordIndexMin int, wordIndexMax int, wordValue common.Hash, confs evmtypes.Confirmations) ([]Log, error) // FilteredLogs accepts chainlink-common filtering DSL. - FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) + FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) } type DSORM struct { @@ -964,9 +964,8 @@ func (o *DSORM) SelectIndexedLogsWithSigsExcluding(ctx context.Context, sigA, si return logs, nil } -// TODO flaky BCF-3258 -func (o *DSORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, _ string) ([]Log, error) { - qs, args, err := (&pgDSLParser{}).buildQuery(o.chainID, filter.Expressions, limitAndSort) +func (o *DSORM) FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, _ string) ([]Log, error) { + qs, args, err := (&pgDSLParser{}).buildQuery(o.chainID, filter, limitAndSort) if err != nil { return nil, err } diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index ab8d126e10a..f5f44acf3df 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -631,7 +631,7 @@ func TestORM_IndexedLogs(t *testing.T) { require.Equal(t, 1, len(lgs)) assert.Equal(t, logpoller.EvmWord(1).Bytes(), lgs[0].GetTopics()[1].Bytes()) - lgs, err = o1.FilteredLogs(ctx, standardFilter(1, []uint64{1}), limiter, "") + lgs, err = o1.FilteredLogs(ctx, standardFilter(1, []uint64{1}).Expressions, limiter, "") require.NoError(t, err) require.Equal(t, 1, len(lgs)) assert.Equal(t, logpoller.EvmWord(1).Bytes(), lgs[0].GetTopics()[1].Bytes()) @@ -640,19 +640,17 @@ func TestORM_IndexedLogs(t *testing.T) { require.NoError(t, err) assert.Equal(t, 2, len(lgs)) - lgs, err = o1.FilteredLogs(ctx, standardFilter(1, []uint64{1, 2}), limiter, "") + lgs, err = o1.FilteredLogs(ctx, standardFilter(1, []uint64{1, 2}).Expressions, limiter, "") require.NoError(t, err) assert.Equal(t, 2, len(lgs)) - blockRangeFilter := func(start, end string, topicIdx uint64, topicValues []uint64) query.KeyFilter { - return query.KeyFilter{ - Expressions: []query.Expression{ - logpoller.NewAddressFilter(addr), - logpoller.NewEventSigFilter(eventSig), - filtersForTopics(topicIdx, topicValues), - query.Block(start, primitives.Gte), - query.Block(end, primitives.Lte), - }, + blockRangeFilter := func(start, end string, topicIdx uint64, topicValues []uint64) []query.Expression { + return []query.Expression{ + logpoller.NewAddressFilter(addr), + logpoller.NewEventSigFilter(eventSig), + filtersForTopics(topicIdx, topicValues), + query.Block(start, primitives.Gte), + query.Block(end, primitives.Lte), } } @@ -711,23 +709,21 @@ func TestORM_IndexedLogs(t *testing.T) { }, } - lgs, err = o1.FilteredLogs(ctx, filter, limiter, "") + lgs, err = o1.FilteredLogs(ctx, filter.Expressions, limiter, "") require.NoError(t, err) assert.Equal(t, 2, len(lgs)) - rangeFilter := func(topicIdx uint64, min, max uint64) query.KeyFilter { - return query.KeyFilter{ - Expressions: []query.Expression{ - logpoller.NewAddressFilter(addr), - logpoller.NewEventSigFilter(eventSig), - logpoller.NewEventByTopicFilter(topicIdx, []primitives.ValueComparator{ - {Value: logpoller.EvmWord(min).Hex(), Operator: primitives.Gte}, - }), - logpoller.NewEventByTopicFilter(topicIdx, []primitives.ValueComparator{ - {Value: logpoller.EvmWord(max).Hex(), Operator: primitives.Lte}, - }), - query.Confidence(primitives.Unconfirmed), - }, + rangeFilter := func(topicIdx uint64, min, max uint64) []query.Expression { + return []query.Expression{ + logpoller.NewAddressFilter(addr), + logpoller.NewEventSigFilter(eventSig), + logpoller.NewEventByTopicFilter(topicIdx, []primitives.ValueComparator{ + {Value: logpoller.EvmWord(min).Hex(), Operator: primitives.Gte}, + }), + logpoller.NewEventByTopicFilter(topicIdx, []primitives.ValueComparator{ + {Value: logpoller.EvmWord(max).Hex(), Operator: primitives.Lte}, + }), + query.Confidence(primitives.Unconfirmed), } } @@ -835,7 +831,7 @@ func TestORM_SelectIndexedLogsByTxHash(t *testing.T) { }, } - retrievedLogs, err = o1.FilteredLogs(ctx, filter, limiter, "") + retrievedLogs, err = o1.FilteredLogs(ctx, filter.Expressions, limiter, "") require.NoError(t, err) require.Equal(t, 2, len(retrievedLogs)) @@ -876,19 +872,17 @@ func TestORM_DataWords(t *testing.T) { }, })) - wordFilter := func(wordIdx uint8, word1, word2 uint64) query.KeyFilter { - return query.KeyFilter{ - Expressions: []query.Expression{ - logpoller.NewAddressFilter(addr), - logpoller.NewEventSigFilter(eventSig), - logpoller.NewEventByWordFilter(eventSig, wordIdx, []primitives.ValueComparator{ - {Value: logpoller.EvmWord(word1).Hex(), Operator: primitives.Gte}, - }), - logpoller.NewEventByWordFilter(eventSig, wordIdx, []primitives.ValueComparator{ - {Value: logpoller.EvmWord(word2).Hex(), Operator: primitives.Lte}, - }), - query.Confidence(primitives.Unconfirmed), - }, + wordFilter := func(wordIdx uint8, word1, word2 uint64) []query.Expression { + return []query.Expression{ + logpoller.NewAddressFilter(addr), + logpoller.NewEventSigFilter(eventSig), + logpoller.NewEventByWordFilter(eventSig, wordIdx, []primitives.ValueComparator{ + {Value: logpoller.EvmWord(word1).Hex(), Operator: primitives.Gte}, + }), + logpoller.NewEventByWordFilter(eventSig, wordIdx, []primitives.ValueComparator{ + {Value: logpoller.EvmWord(word2).Hex(), Operator: primitives.Lte}, + }), + query.Confidence(primitives.Unconfirmed), } } @@ -947,15 +941,13 @@ func TestORM_DataWords(t *testing.T) { require.NoError(t, err) assert.Equal(t, 2, len(lgs)) - filter := query.KeyFilter{ - Expressions: []query.Expression{ - logpoller.NewAddressFilter(addr), - logpoller.NewEventSigFilter(eventSig), - logpoller.NewEventByWordFilter(eventSig, 0, []primitives.ValueComparator{ - {Value: logpoller.EvmWord(1).Hex(), Operator: primitives.Gte}, - }), - query.Confidence(primitives.Unconfirmed), - }, + filter := []query.Expression{ + logpoller.NewAddressFilter(addr), + logpoller.NewEventSigFilter(eventSig), + logpoller.NewEventByWordFilter(eventSig, 0, []primitives.ValueComparator{ + {Value: logpoller.EvmWord(1).Hex(), Operator: primitives.Gte}, + }), + query.Confidence(primitives.Unconfirmed), } lgs, err = o1.FilteredLogs(ctx, filter, limiter, "") @@ -1099,7 +1091,7 @@ func TestORM_SelectLogsWithSigsByBlockRangeFilter(t *testing.T) { }) assertion(t, logs, err, startBlock, endBlock) - logs, err = th.ORM.FilteredLogs(ctx, filter([]common.Hash{topic, topic2}, strconv.Itoa(int(startBlock)), strconv.Itoa(int(endBlock))), limiter, "") + logs, err = th.ORM.FilteredLogs(ctx, filter([]common.Hash{topic, topic2}, strconv.Itoa(int(startBlock)), strconv.Itoa(int(endBlock))).Expressions, limiter, "") assertion(t, logs, err, startBlock, endBlock) } @@ -1161,14 +1153,12 @@ func TestLogPoller_Logs(t *testing.T) { assert.Equal(t, "0x0000000000000000000000000000000000000000000000000000000000000005", lgs[4].BlockHash.String()) assert.Equal(t, "0x0000000000000000000000000000000000000000000000000000000000000005", lgs[5].BlockHash.String()) - logFilter := func(start, end string, address common.Address) query.KeyFilter { - return query.KeyFilter{ - Expressions: []query.Expression{ - logpoller.NewAddressFilter(address), - logpoller.NewEventSigFilter(event1), - query.Block(start, primitives.Gte), - query.Block(end, primitives.Lte), - }, + logFilter := func(start, end string, address common.Address) []query.Expression { + return []query.Expression{ + logpoller.NewAddressFilter(address), + logpoller.NewEventSigFilter(event1), + query.Block(start, primitives.Gte), + query.Block(end, primitives.Lte), } } @@ -1722,7 +1712,7 @@ func TestSelectLogsCreatedAfter(t *testing.T) { assertion(t, logs, err, tt.expectedLogs) - logs, err = th.ORM.FilteredLogs(ctx, filter(tt.after, tt.confs, 0, nil), limiter, "") + logs, err = th.ORM.FilteredLogs(ctx, filter(tt.after, tt.confs, 0, nil).Expressions, limiter, "") assertion(t, logs, err, tt.expectedLogs) }) @@ -1735,7 +1725,7 @@ func TestSelectLogsCreatedAfter(t *testing.T) { assertion(t, logs, err, tt.expectedLogs) - logs, err = th.ORM.FilteredLogs(ctx, filter(tt.after, tt.confs, 1, []common.Hash{event}), limiter, "") + logs, err = th.ORM.FilteredLogs(ctx, filter(tt.after, tt.confs, 1, []common.Hash{event}).Expressions, limiter, "") assertion(t, logs, err, tt.expectedLogs) }) @@ -1991,7 +1981,7 @@ func TestSelectLogsDataWordBetween(t *testing.T) { assertion(t, logs, err, tt.expectedLogs) - logs, err = th.ORM.FilteredLogs(ctx, wordFilter(tt.wordValue), limiter, "") + logs, err = th.ORM.FilteredLogs(ctx, wordFilter(tt.wordValue).Expressions, limiter, "") assertion(t, logs, err, tt.expectedLogs) }) diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/commit_store.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/commit_store.go index f6e957746e6..f53e44a2894 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/commit_store.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/commit_store.go @@ -337,8 +337,7 @@ func (c *CommitStore) GetAcceptedCommitReportsGteTimestamp(ctx context.Context, return nil, err } - reportsQuery, err := query.Where( - c.address.String(), + reportsQuery, err := logpoller.Where( logpoller.NewAddressFilter(c.address), logpoller.NewEventSigFilter(c.reportAcceptedSig), query.Timestamp(uint64(ts.Unix()), primitives.Gte), diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/commit_store.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/commit_store.go index 2b87a7913ac..d330cab3a5d 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/commit_store.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/commit_store.go @@ -349,8 +349,7 @@ func (c *CommitStore) GetAcceptedCommitReportsGteTimestamp(ctx context.Context, return nil, err } - reportsQuery, err := query.Where( - c.address.String(), + reportsQuery, err := logpoller.Where( logpoller.NewAddressFilter(c.address), logpoller.NewEventSigFilter(c.reportAcceptedSig), query.Timestamp(uint64(ts.Unix()), primitives.Gte), diff --git a/core/services/relay/evm/event_binding.go b/core/services/relay/evm/event_binding.go index 7b62d862b35..ae67fae9fcd 100644 --- a/core/services/relay/evm/event_binding.go +++ b/core/services/relay/evm/event_binding.go @@ -169,7 +169,7 @@ func (e *eventBinding) QueryKey(ctx context.Context, filter query.KeyFilter, lim } remapped.Expressions = append(defaultExpressions, remapped.Expressions...) - logs, err := e.lp.FilteredLogs(ctx, remapped, limitAndSort, e.contractName+"-"+e.address.String()+"-"+e.eventName) + logs, err := e.lp.FilteredLogs(ctx, remapped.Expressions, limitAndSort, e.contractName+"-"+e.address.String()+"-"+e.eventName) if err != nil { return nil, err } @@ -229,8 +229,7 @@ func (e *eventBinding) getLatestValueWithFilters( // Create limiter and filter for the query. limiter := query.NewLimitAndSort(query.CountLimit(1), query.NewSortBySequence(query.Desc)) - filter, err := query.Where( - "", + filter, err := logpoller.Where( logpoller.NewAddressFilter(e.address), logpoller.NewEventSigFilter(e.hash), logpoller.NewConfirmationsFilter(confs),