Skip to content

Commit

Permalink
feat(cannon): Connect to Coordinator (#179)
Browse files Browse the repository at this point in the history
* feat(coordinator): Add new Cannon types

* feat: Split out derivers and hook up coordinator rpc

* feat: Split out derivers and hook up coordinator rpc

* refactor: set maximum interval for backoff to 1 minute
  • Loading branch information
samcm authored Sep 6, 2023
1 parent aad6116 commit e68a9b2
Show file tree
Hide file tree
Showing 32 changed files with 3,074 additions and 1,530 deletions.
201 changes: 127 additions & 74 deletions pkg/cannon/cannon.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ import (
_ "net/http/pprof"

"github.com/beevik/ntp"
"github.com/ethpandaops/xatu/pkg/cannon/coordinator"
"github.com/ethpandaops/xatu/pkg/cannon/deriver"
v2 "github.com/ethpandaops/xatu/pkg/cannon/deriver/beacon/eth/v2"
"github.com/ethpandaops/xatu/pkg/cannon/ethereum"
v2 "github.com/ethpandaops/xatu/pkg/cannon/event/beacon/eth/v2"
"github.com/ethpandaops/xatu/pkg/cannon/iterator"
"github.com/ethpandaops/xatu/pkg/output"
"github.com/ethpandaops/xatu/pkg/proto/xatu"
"github.com/go-co-op/gocron"
Expand All @@ -42,7 +45,9 @@ type Cannon struct {

scheduler *gocron.Scheduler

beaconBlockDerivers []v2.BeaconBlockEventDeriver
eventDerivers []deriver.EventDeriver

coordinatorClient *coordinator.Client
}

func New(ctx context.Context, log logrus.FieldLogger, config *Config) (*Cannon, error) {
Expand All @@ -64,25 +69,22 @@ func New(ctx context.Context, log logrus.FieldLogger, config *Config) (*Cannon,
return nil, err
}

beaconBlockDerivers := []v2.BeaconBlockEventDeriver{
v2.NewAttesterSlashingDeriver(log),
v2.NewProposerSlashingDeriver(log),
v2.NewVoluntaryExitDeriver(log),
v2.NewDepositDeriver(log),
v2.NewBLSToExecutionChangeDeriver(log),
v2.NewExecutionTransactionDeriver(log),
coordinatorClient, err := coordinator.New(&config.Coordinator, log)
if err != nil {
return nil, err
}

return &Cannon{
Config: config,
sinks: sinks,
beacon: beacon,
clockDrift: time.Duration(0),
log: log,
id: uuid.New(),
metrics: NewMetrics("xatu_cannon"),
scheduler: gocron.NewScheduler(time.Local),
beaconBlockDerivers: beaconBlockDerivers,
Config: config,
sinks: sinks,
beacon: beacon,
clockDrift: time.Duration(0),
log: log,
id: uuid.New(),
metrics: NewMetrics("xatu_cannon"),
scheduler: gocron.NewScheduler(time.Local),
eventDerivers: nil, // Derivers are created once the beacon node is ready
coordinatorClient: coordinatorClient,
}, nil
}

Expand Down Expand Up @@ -249,20 +251,6 @@ func (c *Cannon) handleNewDecoratedEvent(ctx context.Context, event *xatu.Decora
return err
}

network := event.GetMeta().GetClient().GetEthereum().GetNetwork().GetId()
networkStr := fmt.Sprintf("%d", network)

if networkStr == "" || networkStr == "0" {
networkStr = "unknown"
}

eventType := event.GetEvent().GetName().String()
if eventType == "" {
eventType = "unknown"
}

c.metrics.AddDecoratedEvent(1, eventType, networkStr)

for _, sink := range c.sinks {
if err := sink.HandleNewDecoratedEvent(ctx, event); err != nil {
c.log.
Expand All @@ -278,59 +266,124 @@ func (c *Cannon) handleNewDecoratedEvent(ctx context.Context, event *xatu.Decora

func (c *Cannon) startBeaconBlockProcessor(ctx context.Context) error {
c.beacon.OnReady(ctx, func(ctx context.Context) error {
c.log.Info("Internal beacon node is ready, firing up beacon block processor")
c.log.Info("Internal beacon node is ready, firing up event derivers")

// TODO: Fetch our starting point from xatu-server
start := uint64(5193791)
networkID := fmt.Sprintf("%d", c.beacon.Metadata().Network.ID)
wallclock := c.beacon.Metadata().Wallclock()

for {
select {
case <-ctx.Done():
return nil
default:
time.Sleep(1 * time.Second)

c.log.WithField("slot", start).Info("Processing beacon block")

if err := c.processBeaconBlock(ctx, start); err != nil {
c.log.WithError(err).Error("Failed to process beacon block")

// TODO: Make sure we don't tell xatu-server we've processed this block
// If we can't process it, we should stop here and wait for
// human intervention.
}
clientMeta, err := c.createNewClientMeta(ctx)
if err != nil {
return err
}

start++
}
eventDerivers := []deriver.EventDeriver{
v2.NewAttesterSlashingDeriver(
c.log,
&c.Config.Derivers.AttesterSlashingConfig,
iterator.NewSlotIterator(
c.log,
networkID,
xatu.CannonType_BEACON_API_ETH_V2_BEACON_BLOCK_ATTESTER_SLASHING,
c.coordinatorClient,
wallclock,
),
c.beacon,
clientMeta,
),
v2.NewProposerSlashingDeriver(
c.log,
&c.Config.Derivers.ProposerSlashingConfig,
iterator.NewSlotIterator(
c.log,
networkID,
xatu.CannonType_BEACON_API_ETH_V2_BEACON_BLOCK_PROPOSER_SLASHING,
c.coordinatorClient,
wallclock,
),
c.beacon,
clientMeta,
),
v2.NewVoluntaryExitDeriver(
c.log,
&c.Config.Derivers.VoluntaryExitConfig,
iterator.NewSlotIterator(
c.log,
networkID,
xatu.CannonType_BEACON_API_ETH_V2_BEACON_BLOCK_VOLUNTARY_EXIT,
c.coordinatorClient,
wallclock,
),
c.beacon,
clientMeta,
),
v2.NewDepositDeriver(
c.log,
&c.Config.Derivers.DepositConfig,
iterator.NewSlotIterator(
c.log,
networkID,
xatu.CannonType_BEACON_API_ETH_V2_BEACON_BLOCK_DEPOSIT,
c.coordinatorClient,
wallclock,
),
c.beacon,
clientMeta,
),
v2.NewBLSToExecutionChangeDeriver(
c.log,
&c.Config.Derivers.BLSToExecutionConfig,
iterator.NewSlotIterator(
c.log,
networkID,
xatu.CannonType_BEACON_API_ETH_V2_BEACON_BLOCK_BLS_TO_EXECUTION_CHANGE,
c.coordinatorClient,
wallclock,
),
c.beacon,
clientMeta,
),
v2.NewExecutionTransactionDeriver(
c.log,
&c.Config.Derivers.ExecutionTransactionConfig,
iterator.NewSlotIterator(
c.log,
networkID,
xatu.CannonType_BEACON_API_ETH_V2_BEACON_BLOCK_EXECUTION_TRANSACTION,
c.coordinatorClient,
wallclock,
),
c.beacon,
clientMeta,
),
}
})

return nil
}
c.eventDerivers = eventDerivers

func (c *Cannon) processBeaconBlock(ctx context.Context, slot uint64) error {
block, err := c.beacon.Node().FetchBlock(ctx, fmt.Sprintf("%d", slot))
if err != nil {
return err
}
for _, deriver := range c.eventDerivers {
d := deriver

meta, err := c.createNewClientMeta(ctx)
if err != nil {
return err
}
d.OnEventDerived(ctx, func(ctx context.Context, event *xatu.DecoratedEvent) error {
return c.handleNewDecoratedEvent(ctx, event)
})

event := v2.NewBeaconBlockMetadata(c.log, block, time.Now(), c.beacon, meta, c.beaconBlockDerivers)
d.OnLocationUpdated(ctx, func(ctx context.Context, location uint64) error {
c.metrics.SetDeriverLocation(location, d.CannonType())

events, err := event.Process(ctx)
if err != nil {
return err
}
return nil
})

for _, event := range events {
if err := c.handleNewDecoratedEvent(ctx, event); err != nil {
return err
c.log.
WithField("deriver", deriver.Name()).
WithField("type", deriver.CannonType()).
Info("Starting cannon event deriver")

if err := deriver.Start(ctx); err != nil {
return err
}
}
}

return nil
})

return nil
}
18 changes: 17 additions & 1 deletion pkg/cannon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"errors"
"fmt"

"github.com/ethpandaops/xatu/pkg/cannon/coordinator"
"github.com/ethpandaops/xatu/pkg/cannon/deriver"
"github.com/ethpandaops/xatu/pkg/cannon/ethereum"
"github.com/ethpandaops/xatu/pkg/output"
"github.com/sirupsen/logrus"
Expand All @@ -28,6 +30,12 @@ type Config struct {

// NTP Server to use for clock drift correction
NTPServer string `yaml:"ntpServer" default:"time.google.com"`

// Derivers configures the cannon with event derivers
Derivers deriver.Config `yaml:"derivers"`

// Coordinator configuration
Coordinator coordinator.Config `yaml:"coordinator"`
}

func (c *Config) Validate() error {
Expand All @@ -41,10 +49,18 @@ func (c *Config) Validate() error {

for _, output := range c.Outputs {
if err := output.Validate(); err != nil {
return fmt.Errorf("output %s: %w", output.Name, err)
return fmt.Errorf("invalid output config %s: %w", output.Name, err)
}
}

if err := c.Derivers.Validate(); err != nil {
return fmt.Errorf("invalid derivers config: %w", err)
}

if err := c.Coordinator.Validate(); err != nil {
return fmt.Errorf("invalid coordinator config: %w", err)
}

return nil
}

Expand Down
106 changes: 106 additions & 0 deletions pkg/cannon/coordinator/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package coordinator

import (
"context"
"errors"
"fmt"
"net"

"github.com/ethpandaops/xatu/pkg/proto/xatu"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/metadata"
)

type Client struct {
config *Config
log logrus.FieldLogger

conn *grpc.ClientConn
pb xatu.CoordinatorClient
}

func New(config *Config, log logrus.FieldLogger) (*Client, error) {
if config == nil {
return nil, errors.New("config is required")
}

if err := config.Validate(); err != nil {
return nil, err
}

var opts []grpc.DialOption

if config.TLS {
host, _, err := net.SplitHostPort(config.Address)
if err != nil {
return nil, fmt.Errorf("fail to get host from address: %v", err)
}

opts = append(opts, grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, host)))
} else {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

conn, err := grpc.Dial(config.Address, opts...)
if err != nil {
return nil, fmt.Errorf("fail to dial: %v", err)
}

pbClient := xatu.NewCoordinatorClient(conn)

return &Client{
config: config,
log: log,
conn: conn,
pb: pbClient,
}, nil
}

func (c *Client) Start(ctx context.Context) error {
return nil
}

func (c *Client) Stop(ctx context.Context) error {
if err := c.conn.Close(); err != nil {
return err
}

return nil
}

func (c *Client) GetCannonLocation(ctx context.Context, typ xatu.CannonType, networkID string) (*xatu.CannonLocation, error) {
req := xatu.GetCannonLocationRequest{
Type: typ,
NetworkId: networkID,
}

md := metadata.New(c.config.Headers)
ctx = metadata.NewOutgoingContext(ctx, md)

res, err := c.pb.GetCannonLocation(ctx, &req, grpc.UseCompressor(gzip.Name))
if err != nil {
return nil, err
}

return res.Location, nil
}

func (c *Client) UpsertCannonLocationRequest(ctx context.Context, location *xatu.CannonLocation) error {
req := xatu.UpsertCannonLocationRequest{
Location: location,
}

md := metadata.New(c.config.Headers)
ctx = metadata.NewOutgoingContext(ctx, md)

_, err := c.pb.UpsertCannonLocation(ctx, &req, grpc.UseCompressor(gzip.Name))
if err != nil {
return err
}

return nil
}
Loading

0 comments on commit e68a9b2

Please sign in to comment.