Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(output): Add filtering on module #375

Merged
merged 10 commits into from
Sep 16, 2024
38 changes: 13 additions & 25 deletions .github/cannon/seeding.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -130,18 +130,6 @@ networks:
AND execution_payload_transactions_total_bytes = 49669
AND execution_payload_transactions_total_bytes_compressed = 27703"
expected: "1"
- name: BEACON_API_ETH_V1_PROPOSER_DUTY
finalizedEpoch: 71002
assert:
query: "SELECT COUNT(*) FROM canonical_beacon_proposer_duty FINAL
WHERE
slot = 2272098
AND slot_start_date_time = '2024-08-09 01:39:36'
AND epoch = 71003
AND epoch_start_date_time = '2024-08-09 01:39:12'
AND proposer_validator_index = 1337851
AND proposer_pubkey = '0xb69cc87f9610eeaff816b2c0182e3320fd04c8be572a3adba2b40d7f11a6d84dadeb24cf2829d10eda0d611b46da0907'"
expected: "1"
- name: BEACON_API_ETH_V2_BEACON_BLOCK_EXECUTION_TRANSACTION
finalizedEpoch: 71010
assert:
Expand Down Expand Up @@ -188,19 +176,19 @@ networks:
AND withdrawal_address = '0x428614Fb30e3007e5d628D09e8BDB0CE9720FAdB'
AND withdrawal_amount = '7498159'"
expected: "1"
- name: BEACON_API_ETH_V1_BEACON_COMMITTEE
finalizedEpoch: 71016
assert:
query: "SELECT COUNT(*) FROM canonical_beacon_committee FINAL
WHERE
slot = 2272557
AND slot_start_date_time = '2024-08-09 03:11:24'
AND committee_index = '21'
AND epoch = 71017
AND epoch_start_date_time = '2024-08-09 03:08:48'
AND has(validators, 1119923)
"
expected: "1"
# - name: BEACON_API_ETH_V1_BEACON_COMMITTEE
# finalizedEpoch: 71016
# assert:
# query: "SELECT COUNT(*) FROM canonical_beacon_committee FINAL
# WHERE
# slot = 2272557
# AND slot_start_date_time = '2024-08-09 03:11:24'
# AND committee_index = '21'
# AND epoch = 71017
# AND epoch_start_date_time = '2024-08-09 03:08:48'
# AND has(validators, 1119923)
# "
# expected: "1"
# - name: BEACON_API_ETH_V1_BEACON_VALIDATORS
# finalizedEpoch: 71005
# assert:
Expand Down
3 changes: 2 additions & 1 deletion pkg/cannon/cannon.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (c *Cannon) Start(ctx context.Context) error {
if c.Config.Tracing.Enabled {
c.log.Info("Tracing enabled")

res, err := observability.NewResource(xatu.WithMode(xatu.ModeCannon), xatu.Short())
res, err := observability.NewResource(xatu.WithModule(xatu.ModuleName_CANNON), xatu.Short())
if err != nil {
return perrors.Wrap(err, "failed to create tracing resource")
}
Expand Down Expand Up @@ -265,6 +265,7 @@ func (c *Cannon) createNewClientMeta(ctx context.Context) (*xatu.ClientMeta, err
Id: c.id.String(),
Implementation: xatu.Implementation,
Os: runtime.GOOS,
ModuleName: xatu.ModuleName_CANNON,
ClockDrift: uint64(c.clockDrift.Milliseconds()),
Ethereum: &xatu.ClientMeta_Ethereum{
Network: networkMeta,
Expand Down
1 change: 1 addition & 0 deletions pkg/clmimicry/mimicry.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ func (m *Mimicry) createNewClientMeta(ctx context.Context) (*xatu.ClientMeta, er
Id: m.id.String(),
Implementation: xatu.Implementation,
Os: runtime.GOOS,
ModuleName: xatu.ModuleName_CL_MIMICRY,
ClockDrift: uint64(m.clockDrift.Milliseconds()),
Ethereum: &xatu.ClientMeta_Ethereum{
Network: &xatu.ClientMeta_Ethereum_Network{
Expand Down
1 change: 1 addition & 0 deletions pkg/mimicry/mimicry.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func (m *Mimicry) createNewClientMeta(ctx context.Context) (*xatu.ClientMeta, er
Version: xatu.Short(),
Id: m.id.String(),
Implementation: xatu.Implementation,
ModuleName: xatu.ModuleName_EL_MIMICRY,
Os: runtime.GOOS,
ClockDrift: uint64(m.clockDrift.Milliseconds()),
Labels: m.Config.Labels,
Expand Down
4,449 changes: 2,233 additions & 2,216 deletions pkg/proto/xatu/event_ingester.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions pkg/proto/xatu/event_ingester.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import "pkg/proto/mevrelay/bids.proto";
import "pkg/proto/mevrelay/relay.proto";
import "pkg/proto/mevrelay/payloads.proto";

import "pkg/proto/xatu/module.proto";

import "pkg/proto/libp2p/peer.proto";
import "pkg/proto/libp2p/trace.proto";
import "pkg/proto/libp2p/gossipsub/eth.proto";
Expand Down Expand Up @@ -1174,6 +1176,9 @@ message ClientMeta {
// AdditionalMevRelayPayloadDeliveredData contains additional data about the proposer payload delivered event.
AdditionalMevRelayPayloadDeliveredData mev_relay_payload_delivered = 62 [ json_name = "MEV_RELAY_PROPOSER_PAYLOAD_DELIVERED" ];
}

// ModuleName contains the name of the module that sent the event.
ModuleName module_name = 63 [ json_name = "module_name" ];
}

message ServerMeta {
Expand Down
59 changes: 55 additions & 4 deletions pkg/proto/xatu/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
type EventFilter interface {
// EventNames returns the list of event names to filter on.
EventNames() []string

// Modules returns the list of modules to filter on.
Modules() []string
// ShouldBeDropped returns true if the event should be dropped.
ShouldBeDropped(event *DecoratedEvent) (bool, error)
}

type EventFilterConfig struct {
EventNames []string `yaml:"eventNames"`
Modules []string `yaml:"modules"`
}

func (f *EventFilterConfig) Validate() error {
Expand All @@ -39,21 +41,34 @@ func NewEventFilter(config *EventFilterConfig) (EventFilter, error) {
eventNames[eventName] = struct{}{}
}

modules := make(map[string]struct{}, len(config.Modules))

for _, module := range config.Modules {
modules[module] = struct{}{}
}

return &eventFilter{
config: config,
eventNames: eventNames,
modules: modules,
}, nil
}

type eventFilter struct {
config *EventFilterConfig

eventNames map[string]struct{}
modules map[string]struct{}
}

func (f *eventFilter) EventNames() []string {
return f.config.EventNames
}

func (f *eventFilter) Modules() []string {
return f.config.Modules
}

func (f *eventFilter) ShouldBeDropped(event *DecoratedEvent) (bool, error) {
if event == nil {
return true, errors.New("event is nil")
Expand All @@ -63,14 +78,36 @@ func (f *eventFilter) ShouldBeDropped(event *DecoratedEvent) (bool, error) {
return true, errors.New("event.event is nil")
}

if len(f.eventNames) == 0 {
if len(f.eventNames) == 0 && len(f.modules) == 0 {
return false, nil
}

return f.applyEventNamesFilter(event)
if len(f.eventNames) > 0 {
shouldDrop, err := f.shouldDropFromEventNames(event)
if err != nil {
return true, errors.Wrap(err, "failed to apply event names filter")
}

if shouldDrop {
return true, nil
}
}

if len(f.modules) > 0 {
shouldDrop, err := f.shouldDropFromModules(event)
if err != nil {
return true, errors.Wrap(err, "failed to apply modules filter")
}

if shouldDrop {
return true, nil
}
}

return false, nil
}

func (f *eventFilter) applyEventNamesFilter(event *DecoratedEvent) (bool, error) {
func (f *eventFilter) shouldDropFromEventNames(event *DecoratedEvent) (bool, error) {
if len(f.eventNames) == 0 {
return false, nil
}
Expand All @@ -83,3 +120,17 @@ func (f *eventFilter) applyEventNamesFilter(event *DecoratedEvent) (bool, error)

return !ok, nil
}

func (f *eventFilter) shouldDropFromModules(event *DecoratedEvent) (bool, error) {
if len(f.modules) == 0 {
return false, nil
}

if event.GetMeta().GetClient().GetModuleName() == 0 {
return true, errors.New("event.meta.client.module is empty")
}

_, ok := f.modules[event.GetMeta().GetClient().GetModuleName().String()]

return !ok, nil
}
Loading
Loading