Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BCI-3988] - FilteredLogs receive []Expression instead of whole KeyFilter #14109

Merged
merged 13 commits into from
Sep 4, 2024
5 changes: 5 additions & 0 deletions .changeset/brown-geese-boil.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

FilteredLogs receive Expression instead of whole KeyFilter. #internal
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (d disabled) LogsDataWordBetween(ctx context.Context, eventSig common.Hash,
return nil, ErrDisabled
}

func (d disabled) FilteredLogs(_ context.Context, _ query.KeyFilter, _ query.LimitAndSort, _ string) ([]Log, error) {
func (d disabled) FilteredLogs(_ context.Context, _ []query.Expression, _ query.LimitAndSort, _ string) ([]Log, error) {
return nil, ErrDisabled
}

Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type LogPoller interface {
LogsDataWordBetween(ctx context.Context, eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs evmtypes.Confirmations) ([]Log, error)

// chainlink-common query filtering
FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error)
FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error)
}

type LogPollerTest interface {
Expand Down Expand Up @@ -1518,6 +1518,6 @@ func EvmWord(i uint64) common.Hash {
return common.BytesToHash(b)
}

func (lp *logPoller) FilteredLogs(ctx context.Context, queryFilter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) {
func (lp *logPoller) FilteredLogs(ctx context.Context, queryFilter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) {
return lp.orm.FilteredLogs(ctx, queryFilter, limitAndSort, queryName)
}
16 changes: 8 additions & 8 deletions core/chains/evm/logpoller/mocks/log_poller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (o *ObservedORM) SelectIndexedLogsTopicRange(ctx context.Context, address c
})
}

func (o *ObservedORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) {
func (o *ObservedORM) FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) {
return withObservedQueryAndResults(o, queryName, func() ([]Log, error) {
return o.ORM.FilteredLogs(ctx, filter, limitAndSort, queryName)
})
Expand Down
7 changes: 3 additions & 4 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type ORM interface {
SelectLogsDataWordBetween(ctx context.Context, address common.Address, eventSig common.Hash, wordIndexMin int, wordIndexMax int, wordValue common.Hash, confs evmtypes.Confirmations) ([]Log, error)

// FilteredLogs accepts chainlink-common filtering DSL.
FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error)
FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error)
}

type DSORM struct {
Expand Down Expand Up @@ -964,9 +964,8 @@ func (o *DSORM) SelectIndexedLogsWithSigsExcluding(ctx context.Context, sigA, si
return logs, nil
}

// TODO flaky BCF-3258
func (o *DSORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, _ string) ([]Log, error) {
qs, args, err := (&pgDSLParser{}).buildQuery(o.chainID, filter.Expressions, limitAndSort)
func (o *DSORM) FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, _ string) ([]Log, error) {
qs, args, err := (&pgDSLParser{}).buildQuery(o.chainID, filter, limitAndSort)
if err != nil {
return nil, err
}
Expand Down
46 changes: 23 additions & 23 deletions core/chains/evm/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ func TestORM_IndexedLogs(t *testing.T) {
require.Equal(t, 1, len(lgs))
assert.Equal(t, logpoller.EvmWord(1).Bytes(), lgs[0].GetTopics()[1].Bytes())

lgs, err = o1.FilteredLogs(ctx, standardFilter(1, []uint64{1}), limiter, "")
lgs, err = o1.FilteredLogs(ctx, standardFilter(1, []uint64{1}).Expressions, limiter, "")
require.NoError(t, err)
require.Equal(t, 1, len(lgs))
assert.Equal(t, logpoller.EvmWord(1).Bytes(), lgs[0].GetTopics()[1].Bytes())
Expand All @@ -640,7 +640,7 @@ func TestORM_IndexedLogs(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, 2, len(lgs))

lgs, err = o1.FilteredLogs(ctx, standardFilter(1, []uint64{1, 2}), limiter, "")
lgs, err = o1.FilteredLogs(ctx, standardFilter(1, []uint64{1, 2}).Expressions, limiter, "")
require.NoError(t, err)
assert.Equal(t, 2, len(lgs))

Expand All @@ -660,39 +660,39 @@ func TestORM_IndexedLogs(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, 1, len(lgs))

lgs, err = o1.FilteredLogs(ctx, blockRangeFilter("1", "1", 1, []uint64{1}), limiter, "")
lgs, err = o1.FilteredLogs(ctx, blockRangeFilter("1", "1", 1, []uint64{1}).Expressions, limiter, "")
require.NoError(t, err)
assert.Equal(t, 1, len(lgs))

lgs, err = o1.SelectIndexedLogsByBlockRange(ctx, 1, 2, addr, eventSig, 1, []common.Hash{logpoller.EvmWord(2)})
require.NoError(t, err)
assert.Equal(t, 1, len(lgs))

lgs, err = o1.FilteredLogs(ctx, blockRangeFilter("1", "2", 1, []uint64{2}), limiter, "")
lgs, err = o1.FilteredLogs(ctx, blockRangeFilter("1", "2", 1, []uint64{2}).Expressions, limiter, "")
require.NoError(t, err)
assert.Equal(t, 1, len(lgs))

lgs, err = o1.SelectIndexedLogsByBlockRange(ctx, 1, 2, addr, eventSig, 1, []common.Hash{logpoller.EvmWord(1)})
require.NoError(t, err)
assert.Equal(t, 1, len(lgs))

lgs, err = o1.FilteredLogs(ctx, blockRangeFilter("1", "2", 1, []uint64{1}), limiter, "")
lgs, err = o1.FilteredLogs(ctx, blockRangeFilter("1", "2", 1, []uint64{1}).Expressions, limiter, "")
Farber98 marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
assert.Equal(t, 1, len(lgs))

_, err = o1.SelectIndexedLogsByBlockRange(ctx, 1, 2, addr, eventSig, 0, []common.Hash{logpoller.EvmWord(1)})
require.Error(t, err)
assert.Contains(t, err.Error(), "invalid index for topic: 0")

_, err = o1.FilteredLogs(ctx, blockRangeFilter("1", "2", 0, []uint64{1}), limiter, "")
_, err = o1.FilteredLogs(ctx, blockRangeFilter("1", "2", 0, []uint64{1}).Expressions, limiter, "")
require.Error(t, err)
assert.Contains(t, err.Error(), "invalid index for topic: 0")

_, err = o1.SelectIndexedLogsByBlockRange(ctx, 1, 2, addr, eventSig, 4, []common.Hash{logpoller.EvmWord(1)})
require.Error(t, err)
assert.Contains(t, err.Error(), "invalid index for topic: 4")

_, err = o1.FilteredLogs(ctx, blockRangeFilter("1", "2", 4, []uint64{1}), limiter, "")
_, err = o1.FilteredLogs(ctx, blockRangeFilter("1", "2", 4, []uint64{1}).Expressions, limiter, "")
require.Error(t, err)
assert.Contains(t, err.Error(), "invalid index for topic: 4")

Expand All @@ -711,7 +711,7 @@ func TestORM_IndexedLogs(t *testing.T) {
},
}

lgs, err = o1.FilteredLogs(ctx, filter, limiter, "")
lgs, err = o1.FilteredLogs(ctx, filter.Expressions, limiter, "")
require.NoError(t, err)
assert.Equal(t, 2, len(lgs))

Expand All @@ -736,7 +736,7 @@ func TestORM_IndexedLogs(t *testing.T) {
assert.Equal(t, 1, len(lgs))
assert.Equal(t, logpoller.EvmWord(3).Bytes(), lgs[0].GetTopics()[1].Bytes())

lgs, err = o1.FilteredLogs(ctx, rangeFilter(1, 3, 3), limiter, "")
lgs, err = o1.FilteredLogs(ctx, rangeFilter(1, 3, 3).Expressions, limiter, "")
require.NoError(t, err)
assert.Equal(t, 1, len(lgs))
assert.Equal(t, logpoller.EvmWord(3).Bytes(), lgs[0].GetTopics()[1].Bytes())
Expand All @@ -745,7 +745,7 @@ func TestORM_IndexedLogs(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, 3, len(lgs))

lgs, err = o1.FilteredLogs(ctx, rangeFilter(1, 1, 3), limiter, "")
lgs, err = o1.FilteredLogs(ctx, rangeFilter(1, 1, 3).Expressions, limiter, "")
Farber98 marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
assert.Equal(t, 3, len(lgs))

Expand Down Expand Up @@ -835,7 +835,7 @@ func TestORM_SelectIndexedLogsByTxHash(t *testing.T) {
},
}

retrievedLogs, err = o1.FilteredLogs(ctx, filter, limiter, "")
retrievedLogs, err = o1.FilteredLogs(ctx, filter.Expressions, limiter, "")
require.NoError(t, err)

require.Equal(t, 2, len(retrievedLogs))
Expand Down Expand Up @@ -899,7 +899,7 @@ func TestORM_DataWords(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 0, len(lgs))

lgs, err = o1.FilteredLogs(ctx, wordFilter(0, 2, 2), limiter, "")
lgs, err = o1.FilteredLogs(ctx, wordFilter(0, 2, 2).Expressions, limiter, "")
Farber98 marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
require.Equal(t, 0, len(lgs))

Expand All @@ -908,7 +908,7 @@ func TestORM_DataWords(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(lgs))

lgs, err = o1.FilteredLogs(ctx, wordFilter(0, 1, 2), limiter, "")
lgs, err = o1.FilteredLogs(ctx, wordFilter(0, 1, 2).Expressions, limiter, "")
require.NoError(t, err)
require.Equal(t, 1, len(lgs))

Expand All @@ -917,7 +917,7 @@ func TestORM_DataWords(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(lgs))

lgs, err = o1.FilteredLogs(ctx, wordFilter(0, 1, 1), limiter, "")
lgs, err = o1.FilteredLogs(ctx, wordFilter(0, 1, 1).Expressions, limiter, "")
require.NoError(t, err)
require.Equal(t, 1, len(lgs))

Expand All @@ -926,7 +926,7 @@ func TestORM_DataWords(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 0, len(lgs))

lgs, err = o1.FilteredLogs(ctx, wordFilter(1, 3, 3), limiter, "")
lgs, err = o1.FilteredLogs(ctx, wordFilter(1, 3, 3).Expressions, limiter, "")
require.NoError(t, err)
require.Equal(t, 0, len(lgs))

Expand All @@ -937,7 +937,7 @@ func TestORM_DataWords(t *testing.T) {
require.Equal(t, 1, len(lgs))
require.Equal(t, lgs[0].Data, append(logpoller.EvmWord(2).Bytes(), logpoller.EvmWord(3).Bytes()...))

lgs, err = o1.FilteredLogs(ctx, wordFilter(1, 3, 3), limiter, "")
lgs, err = o1.FilteredLogs(ctx, wordFilter(1, 3, 3).Expressions, limiter, "")
require.NoError(t, err)
require.Equal(t, 1, len(lgs))
require.Equal(t, lgs[0].Data, append(logpoller.EvmWord(2).Bytes(), logpoller.EvmWord(3).Bytes()...))
Expand All @@ -958,7 +958,7 @@ func TestORM_DataWords(t *testing.T) {
},
}

lgs, err = o1.FilteredLogs(ctx, filter, limiter, "")
Farber98 marked this conversation as resolved.
Show resolved Hide resolved
lgs, err = o1.FilteredLogs(ctx, filter.Expressions, limiter, "")
require.NoError(t, err)
assert.Equal(t, 2, len(lgs))
}
Expand Down Expand Up @@ -1099,7 +1099,7 @@ func TestORM_SelectLogsWithSigsByBlockRangeFilter(t *testing.T) {
})

assertion(t, logs, err, startBlock, endBlock)
logs, err = th.ORM.FilteredLogs(ctx, filter([]common.Hash{topic, topic2}, strconv.Itoa(int(startBlock)), strconv.Itoa(int(endBlock))), limiter, "")
logs, err = th.ORM.FilteredLogs(ctx, filter([]common.Hash{topic, topic2}, strconv.Itoa(int(startBlock)), strconv.Itoa(int(endBlock))).Expressions, limiter, "")

assertion(t, logs, err, startBlock, endBlock)
}
Expand Down Expand Up @@ -1182,7 +1182,7 @@ func TestLogPoller_Logs(t *testing.T) {
assert.Equal(t, "0x0000000000000000000000000000000000000000000000000000000000000005", lgs[1].BlockHash.String())
assert.Equal(t, address1, lgs[1].Address)

lgs, err = th.ORM.FilteredLogs(ctx, logFilter("1", "3", address1), query.LimitAndSort{
lgs, err = th.ORM.FilteredLogs(ctx, logFilter("1", "3", address1).Expressions, query.LimitAndSort{
Farber98 marked this conversation as resolved.
Show resolved Hide resolved
SortBy: []query.SortBy{query.NewSortBySequence(query.Asc)},
}, "")
require.NoError(t, err)
Expand All @@ -1202,7 +1202,7 @@ func TestLogPoller_Logs(t *testing.T) {
assert.Equal(t, address2, lgs[0].Address)
assert.Equal(t, event1.Bytes(), lgs[0].Topics[0])

lgs, err = th.ORM.FilteredLogs(ctx, logFilter("2", "2", address2), query.LimitAndSort{
lgs, err = th.ORM.FilteredLogs(ctx, logFilter("2", "2", address2).Expressions, query.LimitAndSort{
SortBy: []query.SortBy{query.NewSortBySequence(query.Asc)},
}, "")
require.NoError(t, err)
Expand Down Expand Up @@ -1722,7 +1722,7 @@ func TestSelectLogsCreatedAfter(t *testing.T) {

assertion(t, logs, err, tt.expectedLogs)

logs, err = th.ORM.FilteredLogs(ctx, filter(tt.after, tt.confs, 0, nil), limiter, "")
logs, err = th.ORM.FilteredLogs(ctx, filter(tt.after, tt.confs, 0, nil).Expressions, limiter, "")

assertion(t, logs, err, tt.expectedLogs)
})
Expand All @@ -1735,7 +1735,7 @@ func TestSelectLogsCreatedAfter(t *testing.T) {

assertion(t, logs, err, tt.expectedLogs)

logs, err = th.ORM.FilteredLogs(ctx, filter(tt.after, tt.confs, 1, []common.Hash{event}), limiter, "")
logs, err = th.ORM.FilteredLogs(ctx, filter(tt.after, tt.confs, 1, []common.Hash{event}).Expressions, limiter, "")

assertion(t, logs, err, tt.expectedLogs)
})
Expand Down Expand Up @@ -1991,7 +1991,7 @@ func TestSelectLogsDataWordBetween(t *testing.T) {

assertion(t, logs, err, tt.expectedLogs)

logs, err = th.ORM.FilteredLogs(ctx, wordFilter(tt.wordValue), limiter, "")
logs, err = th.ORM.FilteredLogs(ctx, wordFilter(tt.wordValue).Expressions, limiter, "")

assertion(t, logs, err, tt.expectedLogs)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,8 @@ func (c *CommitStore) GetAcceptedCommitReportsGteTimestamp(ctx context.Context,
}

reportsQuery, err := query.Where(
c.address.String(),
// no key needed as logpoller.NewAddressFilter is used
"",
ilija42 marked this conversation as resolved.
Show resolved Hide resolved
logpoller.NewAddressFilter(c.address),
logpoller.NewEventSigFilter(c.reportAcceptedSig),
query.Timestamp(uint64(ts.Unix()), primitives.Gte),
Expand All @@ -350,7 +351,7 @@ func (c *CommitStore) GetAcceptedCommitReportsGteTimestamp(ctx context.Context,

logs, err := c.lp.FilteredLogs(
ctx,
reportsQuery,
reportsQuery.Expressions,
query.NewLimitAndSort(query.Limit{}, query.NewSortBySequence(query.Asc)),
"GetAcceptedCommitReportsGteTimestamp",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ func (c *CommitStore) GetAcceptedCommitReportsGteTimestamp(ctx context.Context,
}

reportsQuery, err := query.Where(
c.address.String(),
// no key needed as logpoller.NewAddressFilter is used
"",
Farber98 marked this conversation as resolved.
Show resolved Hide resolved
logpoller.NewAddressFilter(c.address),
logpoller.NewEventSigFilter(c.reportAcceptedSig),
query.Timestamp(uint64(ts.Unix()), primitives.Gte),
Expand All @@ -362,7 +363,7 @@ func (c *CommitStore) GetAcceptedCommitReportsGteTimestamp(ctx context.Context,

logs, err := c.lp.FilteredLogs(
ctx,
reportsQuery,
reportsQuery.Expressions,
query.NewLimitAndSort(query.Limit{}, query.NewSortBySequence(query.Asc)),
"GetAcceptedCommitReportsGteTimestamp",
)
Expand Down
4 changes: 2 additions & 2 deletions core/services/relay/evm/event_binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (e *eventBinding) QueryKey(ctx context.Context, filter query.KeyFilter, lim
}
remapped.Expressions = append(defaultExpressions, remapped.Expressions...)

logs, err := e.lp.FilteredLogs(ctx, remapped, limitAndSort, e.contractName+"-"+e.address.String()+"-"+e.eventName)
logs, err := e.lp.FilteredLogs(ctx, remapped.Expressions, limitAndSort, e.contractName+"-"+e.address.String()+"-"+e.eventName)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -241,7 +241,7 @@ func (e *eventBinding) getLatestValueWithFilters(
}

// Gets the latest log that matches the filter and limiter.
logs, err := e.lp.FilteredLogs(ctx, filter, limiter, e.contractName+"-"+e.address.String()+"-"+e.eventName)
logs, err := e.lp.FilteredLogs(ctx, filter.Expressions, limiter, e.contractName+"-"+e.address.String()+"-"+e.eventName)
if err != nil {
return wrapInternalErr(err)
}
Expand Down
Loading