Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BCI-3988] - FilteredLogs receive []Expression instead of whole KeyFilter #14109

Merged
merged 13 commits into from
Sep 4, 2024
5 changes: 5 additions & 0 deletions .changeset/brown-geese-boil.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

FilteredLogs receive Expression instead of whole KeyFilter. #internal
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (d disabled) LogsDataWordBetween(ctx context.Context, eventSig common.Hash,
return nil, ErrDisabled
}

func (d disabled) FilteredLogs(_ context.Context, _ query.KeyFilter, _ query.LimitAndSort, _ string) ([]Log, error) {
func (d disabled) FilteredLogs(_ context.Context, _ []query.Expression, _ query.LimitAndSort, _ string) ([]Log, error) {
return nil, ErrDisabled
}

Expand Down
23 changes: 21 additions & 2 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type LogPoller interface {
LogsDataWordBetween(ctx context.Context, eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs evmtypes.Confirmations) ([]Log, error)

// chainlink-common query filtering
FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error)
FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error)
}

type LogPollerTest interface {
Expand Down Expand Up @@ -1518,6 +1518,25 @@ func EvmWord(i uint64) common.Hash {
return common.BytesToHash(b)
}

func (lp *logPoller) FilteredLogs(ctx context.Context, queryFilter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) {
func (lp *logPoller) FilteredLogs(ctx context.Context, queryFilter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) {
return lp.orm.FilteredLogs(ctx, queryFilter, limitAndSort, queryName)
}

// Where is a query.Where wrapper that ignores the Key and returns a slice of query.Expression rather than query.KeyFilter.
// If no expressions are provided, or an error occurs, an empty slice is returned.
func Where(expressions ...query.Expression) ([]query.Expression, error) {
filter, err := query.Where(
"",
expressions...,
)

if err != nil {
return []query.Expression{}, err
}

if filter.Expressions == nil {
return []query.Expression{}, nil
}

return filter.Expressions, nil
}
38 changes: 38 additions & 0 deletions core/chains/evm/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"go.uber.org/zap/zapcore"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/types/query"
"github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives"
commonutils "github.com/smartcontractkit/chainlink-common/pkg/utils"

htMocks "github.com/smartcontractkit/chainlink/v2/common/headtracker/mocks"
Expand Down Expand Up @@ -2091,3 +2093,39 @@ func TestFindLCA(t *testing.T) {
})
}
}

func TestWhere(t *testing.T) {
address := common.HexToAddress("0x1234567890abcdef1234567890abcdef12345678")
eventSig := common.HexToHash("0xabcdef1234567890abcdef1234567890abcdef1234")
ts := time.Now()

expr1 := logpoller.NewAddressFilter(address)
expr2 := logpoller.NewEventSigFilter(eventSig)
expr3 := query.Timestamp(uint64(ts.Unix()), primitives.Gte)
expr4 := logpoller.NewConfirmationsFilter(evmtypes.Confirmations(0))

t.Run("Valid combination of filters", func(t *testing.T) {
result, err := logpoller.Where(expr1, expr2, expr3, expr4)
assert.NoError(t, err)
assert.Equal(t, []query.Expression{expr1, expr2, expr3, expr4}, result)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume these filters are being combined as a logical "AND" - the events need to match all filters. What if you want a logical "OR" - events that match at least one of the filters?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was testing this function at a high level, maybe it's redundant given the underlying tests. More details in my previous comment here: #14109 (comment)

})

t.Run("No expressions (should return empty slice)", func(t *testing.T) {
result, err := logpoller.Where()
assert.NoError(t, err)
assert.Equal(t, []query.Expression{}, result)
})

t.Run("Invalid boolean expression", func(t *testing.T) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are the valid and invalid expressions documented somewhere? Would it make sense to go through all them in these tests and show how to properly use them?

Copy link
Contributor Author

@Farber98 Farber98 Sep 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logpoller.Where() function it's just a wrapper of the original one query.Where() where you don't need to provide a key param and just the expressions

The underlying function has extensive tests here which uses the test cases defined here. Also the usage eg. defined in the function comment is clear

Let me know your thoughts if this seems enough

invalidExpr := query.Expression{
BoolExpression: query.BoolExpression{
Expressions: []query.Expression{},
},
}

result, err := logpoller.Where(invalidExpr)
assert.Error(t, err)
assert.EqualError(t, err, "all boolean expressions should have at least 2 expressions")
assert.Equal(t, []query.Expression{}, result)
})
}
16 changes: 8 additions & 8 deletions core/chains/evm/logpoller/mocks/log_poller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (o *ObservedORM) SelectIndexedLogsTopicRange(ctx context.Context, address c
})
}

func (o *ObservedORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) {
func (o *ObservedORM) FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) {
return withObservedQueryAndResults(o, queryName, func() ([]Log, error) {
return o.ORM.FilteredLogs(ctx, filter, limitAndSort, queryName)
})
Expand Down
7 changes: 3 additions & 4 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type ORM interface {
SelectLogsDataWordBetween(ctx context.Context, address common.Address, eventSig common.Hash, wordIndexMin int, wordIndexMax int, wordValue common.Hash, confs evmtypes.Confirmations) ([]Log, error)

// FilteredLogs accepts chainlink-common filtering DSL.
FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error)
FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error)
}

type DSORM struct {
Expand Down Expand Up @@ -964,9 +964,8 @@ func (o *DSORM) SelectIndexedLogsWithSigsExcluding(ctx context.Context, sigA, si
return logs, nil
}

// TODO flaky BCF-3258
func (o *DSORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, _ string) ([]Log, error) {
qs, args, err := (&pgDSLParser{}).buildQuery(o.chainID, filter.Expressions, limitAndSort)
func (o *DSORM) FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, _ string) ([]Log, error) {
qs, args, err := (&pgDSLParser{}).buildQuery(o.chainID, filter, limitAndSort)
if err != nil {
return nil, err
}
Expand Down
110 changes: 50 additions & 60 deletions core/chains/evm/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ func TestORM_IndexedLogs(t *testing.T) {
require.Equal(t, 1, len(lgs))
assert.Equal(t, logpoller.EvmWord(1).Bytes(), lgs[0].GetTopics()[1].Bytes())

lgs, err = o1.FilteredLogs(ctx, standardFilter(1, []uint64{1}), limiter, "")
lgs, err = o1.FilteredLogs(ctx, standardFilter(1, []uint64{1}).Expressions, limiter, "")
require.NoError(t, err)
require.Equal(t, 1, len(lgs))
assert.Equal(t, logpoller.EvmWord(1).Bytes(), lgs[0].GetTopics()[1].Bytes())
Expand All @@ -640,19 +640,17 @@ func TestORM_IndexedLogs(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, 2, len(lgs))

lgs, err = o1.FilteredLogs(ctx, standardFilter(1, []uint64{1, 2}), limiter, "")
lgs, err = o1.FilteredLogs(ctx, standardFilter(1, []uint64{1, 2}).Expressions, limiter, "")
require.NoError(t, err)
assert.Equal(t, 2, len(lgs))

blockRangeFilter := func(start, end string, topicIdx uint64, topicValues []uint64) query.KeyFilter {
return query.KeyFilter{
Expressions: []query.Expression{
logpoller.NewAddressFilter(addr),
logpoller.NewEventSigFilter(eventSig),
filtersForTopics(topicIdx, topicValues),
query.Block(start, primitives.Gte),
query.Block(end, primitives.Lte),
},
blockRangeFilter := func(start, end string, topicIdx uint64, topicValues []uint64) []query.Expression {
return []query.Expression{
logpoller.NewAddressFilter(addr),
logpoller.NewEventSigFilter(eventSig),
filtersForTopics(topicIdx, topicValues),
query.Block(start, primitives.Gte),
query.Block(end, primitives.Lte),
}
}

Expand Down Expand Up @@ -711,23 +709,21 @@ func TestORM_IndexedLogs(t *testing.T) {
},
}

lgs, err = o1.FilteredLogs(ctx, filter, limiter, "")
lgs, err = o1.FilteredLogs(ctx, filter.Expressions, limiter, "")
require.NoError(t, err)
assert.Equal(t, 2, len(lgs))

rangeFilter := func(topicIdx uint64, min, max uint64) query.KeyFilter {
return query.KeyFilter{
Expressions: []query.Expression{
logpoller.NewAddressFilter(addr),
logpoller.NewEventSigFilter(eventSig),
logpoller.NewEventByTopicFilter(topicIdx, []primitives.ValueComparator{
{Value: logpoller.EvmWord(min).Hex(), Operator: primitives.Gte},
}),
logpoller.NewEventByTopicFilter(topicIdx, []primitives.ValueComparator{
{Value: logpoller.EvmWord(max).Hex(), Operator: primitives.Lte},
}),
query.Confidence(primitives.Unconfirmed),
},
rangeFilter := func(topicIdx uint64, min, max uint64) []query.Expression {
return []query.Expression{
logpoller.NewAddressFilter(addr),
logpoller.NewEventSigFilter(eventSig),
logpoller.NewEventByTopicFilter(topicIdx, []primitives.ValueComparator{
{Value: logpoller.EvmWord(min).Hex(), Operator: primitives.Gte},
}),
logpoller.NewEventByTopicFilter(topicIdx, []primitives.ValueComparator{
{Value: logpoller.EvmWord(max).Hex(), Operator: primitives.Lte},
}),
query.Confidence(primitives.Unconfirmed),
}
}

Expand Down Expand Up @@ -835,7 +831,7 @@ func TestORM_SelectIndexedLogsByTxHash(t *testing.T) {
},
}

retrievedLogs, err = o1.FilteredLogs(ctx, filter, limiter, "")
retrievedLogs, err = o1.FilteredLogs(ctx, filter.Expressions, limiter, "")
require.NoError(t, err)

require.Equal(t, 2, len(retrievedLogs))
Expand Down Expand Up @@ -876,19 +872,17 @@ func TestORM_DataWords(t *testing.T) {
},
}))

wordFilter := func(wordIdx uint8, word1, word2 uint64) query.KeyFilter {
return query.KeyFilter{
Expressions: []query.Expression{
logpoller.NewAddressFilter(addr),
logpoller.NewEventSigFilter(eventSig),
logpoller.NewEventByWordFilter(eventSig, wordIdx, []primitives.ValueComparator{
{Value: logpoller.EvmWord(word1).Hex(), Operator: primitives.Gte},
}),
logpoller.NewEventByWordFilter(eventSig, wordIdx, []primitives.ValueComparator{
{Value: logpoller.EvmWord(word2).Hex(), Operator: primitives.Lte},
}),
query.Confidence(primitives.Unconfirmed),
},
wordFilter := func(wordIdx uint8, word1, word2 uint64) []query.Expression {
return []query.Expression{
logpoller.NewAddressFilter(addr),
logpoller.NewEventSigFilter(eventSig),
logpoller.NewEventByWordFilter(eventSig, wordIdx, []primitives.ValueComparator{
{Value: logpoller.EvmWord(word1).Hex(), Operator: primitives.Gte},
}),
logpoller.NewEventByWordFilter(eventSig, wordIdx, []primitives.ValueComparator{
{Value: logpoller.EvmWord(word2).Hex(), Operator: primitives.Lte},
}),
query.Confidence(primitives.Unconfirmed),
}
}

Expand Down Expand Up @@ -947,15 +941,13 @@ func TestORM_DataWords(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, 2, len(lgs))

filter := query.KeyFilter{
Expressions: []query.Expression{
logpoller.NewAddressFilter(addr),
logpoller.NewEventSigFilter(eventSig),
logpoller.NewEventByWordFilter(eventSig, 0, []primitives.ValueComparator{
{Value: logpoller.EvmWord(1).Hex(), Operator: primitives.Gte},
}),
query.Confidence(primitives.Unconfirmed),
},
filter := []query.Expression{
logpoller.NewAddressFilter(addr),
logpoller.NewEventSigFilter(eventSig),
logpoller.NewEventByWordFilter(eventSig, 0, []primitives.ValueComparator{
{Value: logpoller.EvmWord(1).Hex(), Operator: primitives.Gte},
}),
query.Confidence(primitives.Unconfirmed),
}

lgs, err = o1.FilteredLogs(ctx, filter, limiter, "")
Farber98 marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -1099,7 +1091,7 @@ func TestORM_SelectLogsWithSigsByBlockRangeFilter(t *testing.T) {
})

assertion(t, logs, err, startBlock, endBlock)
logs, err = th.ORM.FilteredLogs(ctx, filter([]common.Hash{topic, topic2}, strconv.Itoa(int(startBlock)), strconv.Itoa(int(endBlock))), limiter, "")
logs, err = th.ORM.FilteredLogs(ctx, filter([]common.Hash{topic, topic2}, strconv.Itoa(int(startBlock)), strconv.Itoa(int(endBlock))).Expressions, limiter, "")

assertion(t, logs, err, startBlock, endBlock)
}
Expand Down Expand Up @@ -1161,14 +1153,12 @@ func TestLogPoller_Logs(t *testing.T) {
assert.Equal(t, "0x0000000000000000000000000000000000000000000000000000000000000005", lgs[4].BlockHash.String())
assert.Equal(t, "0x0000000000000000000000000000000000000000000000000000000000000005", lgs[5].BlockHash.String())

logFilter := func(start, end string, address common.Address) query.KeyFilter {
return query.KeyFilter{
Expressions: []query.Expression{
logpoller.NewAddressFilter(address),
logpoller.NewEventSigFilter(event1),
query.Block(start, primitives.Gte),
query.Block(end, primitives.Lte),
},
logFilter := func(start, end string, address common.Address) []query.Expression {
return []query.Expression{
logpoller.NewAddressFilter(address),
logpoller.NewEventSigFilter(event1),
query.Block(start, primitives.Gte),
query.Block(end, primitives.Lte),
}
}

Expand Down Expand Up @@ -1722,7 +1712,7 @@ func TestSelectLogsCreatedAfter(t *testing.T) {

assertion(t, logs, err, tt.expectedLogs)

logs, err = th.ORM.FilteredLogs(ctx, filter(tt.after, tt.confs, 0, nil), limiter, "")
logs, err = th.ORM.FilteredLogs(ctx, filter(tt.after, tt.confs, 0, nil).Expressions, limiter, "")

assertion(t, logs, err, tt.expectedLogs)
})
Expand All @@ -1735,7 +1725,7 @@ func TestSelectLogsCreatedAfter(t *testing.T) {

assertion(t, logs, err, tt.expectedLogs)

logs, err = th.ORM.FilteredLogs(ctx, filter(tt.after, tt.confs, 1, []common.Hash{event}), limiter, "")
logs, err = th.ORM.FilteredLogs(ctx, filter(tt.after, tt.confs, 1, []common.Hash{event}).Expressions, limiter, "")

assertion(t, logs, err, tt.expectedLogs)
})
Expand Down Expand Up @@ -1991,7 +1981,7 @@ func TestSelectLogsDataWordBetween(t *testing.T) {

assertion(t, logs, err, tt.expectedLogs)

logs, err = th.ORM.FilteredLogs(ctx, wordFilter(tt.wordValue), limiter, "")
logs, err = th.ORM.FilteredLogs(ctx, wordFilter(tt.wordValue).Expressions, limiter, "")

assertion(t, logs, err, tt.expectedLogs)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,7 @@ func (c *CommitStore) GetAcceptedCommitReportsGteTimestamp(ctx context.Context,
return nil, err
}

reportsQuery, err := query.Where(
c.address.String(),
reportsQuery, err := logpoller.Where(
logpoller.NewAddressFilter(c.address),
logpoller.NewEventSigFilter(c.reportAcceptedSig),
query.Timestamp(uint64(ts.Unix()), primitives.Gte),
Expand Down
Loading
Loading