Skip to content

Commit

Permalink
Merge branch 'master' into release/testbed
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Sep 22, 2023
2 parents a1c44e4 + 2da39d4 commit 7fe62df
Show file tree
Hide file tree
Showing 36 changed files with 1,104 additions and 654 deletions.
5 changes: 5 additions & 0 deletions docs/mimicry.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ Mimicry requires a single `yaml` config file. An example file can be found [here
| logging | string | `warn` | Log level (`panic`, `fatal`, `warn`, `info`, `debug`, `trace`) |
| metricsAddr | string | `:9090` | The address the metrics server will listen on |
| pprofAddr | string | | The address the [pprof](https://github.com/google/pprof) server will listen on. When ommited, the pprof server will not be started |
| probeAddr | string | | The address for health probes. When ommited, the probe server will not be started |
| name | string | | Unique name of the mimicry |
| labels | object | | A key value map of labels to append to every mimicry event |
| ntpServer | string | `pool.ntp.org` | NTP server to calculate clock drift for events |
| captureDelay | string | `3m` | Delay before starting to capture transactions |
| coordinator.type | string | | Type of output (`xatu`, `static`) |
| coordinator.config | object | | Coordinator type configuration [`xatu`](#coordinator-xatu-configuration)/[`static`](#coordinator-static-configuration) |
| outputs | array<object> | | List of outputs for the mimicry to send data to |
Expand Down Expand Up @@ -196,6 +198,7 @@ outputs:
logging: "debug"
metricsAddr: ":9090"
pprofAddr: ":6060"
probeAddr: ":8080"

name: example-instance

Expand All @@ -204,6 +207,8 @@ labels:

ntpServer: time.google.com

captureDelay: 3m

coordinator:
type: xatu
config:
Expand Down
6 changes: 6 additions & 0 deletions example_mimicry.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
logging: "debug" # panic,fatal,warn,info,debug,trace
metricsAddr: ":9090"
# pprofAddr: ":6060" # optional. if supplied it enables pprof server
# probeAddr: ":8080" # optional. if supplied it enables health probe server

name: example-instance

Expand All @@ -14,6 +15,11 @@ labels:
# pool.ntp.org - https://www.pool.ntp.org/zone/@
ntpServer: time.google.com

# Delay before capturing transactions from a peer.
# This is the avoid the initial deluge of transactions
# when a peer is first connected to.
captureDelay: 3m

coordinator:
type: static
config:
Expand Down
7 changes: 2 additions & 5 deletions pkg/cannon/cannon.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/ethpandaops/xatu/pkg/proto/xatu"
"github.com/go-co-op/gocron"
"github.com/google/uuid"
perrors "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -249,11 +250,7 @@ func (c *Cannon) syncClockDrift(ctx context.Context) error {
func (c *Cannon) handleNewDecoratedEvents(ctx context.Context, events []*xatu.DecoratedEvent) error {
for _, sink := range c.sinks {
if err := sink.HandleNewDecoratedEvents(ctx, events); err != nil {
c.log.
WithError(err).
WithField("sink", sink.Type()).
WithField("events", len(events)).
Error("Failed to send events to sink")
return perrors.Wrapf(err, "failed to handle new decorated events in sink %s", sink.Name())
}
}

Expand Down
31 changes: 30 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/attester_slashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,14 @@ func (a *AttesterSlashingDeriver) run(ctx context.Context) {
}

// Get the next slot
location, err := a.iterator.Next(ctx)
location, lookAhead, err := a.iterator.Next(ctx)
if err != nil {
return err
}

// Look ahead
a.lookAheadAtLocations(ctx, lookAhead)

for _, fn := range a.onLocationCallbacks {
if errr := fn(ctx, location.GetEthV2BeaconBlockAttesterSlashing().GetEpoch()); errr != nil {
a.log.WithError(errr).Error("Failed to send location")
Expand Down Expand Up @@ -145,6 +148,32 @@ func (a *AttesterSlashingDeriver) run(ctx context.Context) {
}
}

// lookAheadAtLocation takes the upcoming locations and looks ahead to do any pre-processing that might be required.
func (a *AttesterSlashingDeriver) lookAheadAtLocations(ctx context.Context, locations []*xatu.CannonLocation) {
if locations == nil {
return
}

for _, location := range locations {
// Get the next look ahead epoch
epoch := phase0.Epoch(location.GetEthV2BeaconBlockVoluntaryExit().GetEpoch())

sp, err := a.beacon.Node().Spec()
if err != nil {
a.log.WithError(err).WithField("epoch", epoch).Warn("Failed to look ahead at epoch")

return
}

for i := uint64(0); i <= uint64(sp.SlotsPerEpoch); i++ {
slot := phase0.Slot(i + uint64(epoch)*uint64(sp.SlotsPerEpoch))

// Add the block to the preload queue so it's available when we need it
a.beacon.LazyLoadBeaconBlock(xatuethv1.SlotAsString(slot))
}
}
}

func (a *AttesterSlashingDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) {
sp, err := a.beacon.Node().Spec()
if err != nil {
Expand Down
31 changes: 30 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/bls_to_execution_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,14 @@ func (b *BLSToExecutionChangeDeriver) run(ctx context.Context) {
}

// Get the next slot
location, err := b.iterator.Next(ctx)
location, lookAheads, err := b.iterator.Next(ctx)
if err != nil {
return err
}

// Look ahead
b.lookAheadAtLocation(ctx, lookAheads)

for _, fn := range b.onLocationCallbacks {
if errr := fn(ctx, location.GetEthV2BeaconBlockBlsToExecutionChange().GetEpoch()); errr != nil {
b.log.WithError(errr).Error("Failed to send location")
Expand Down Expand Up @@ -148,6 +151,32 @@ func (b *BLSToExecutionChangeDeriver) run(ctx context.Context) {
}
}

// lookAheadAtLocation takes the upcoming locations and looks ahead to do any pre-processing that might be required.
func (b *BLSToExecutionChangeDeriver) lookAheadAtLocation(ctx context.Context, locations []*xatu.CannonLocation) {
if locations == nil {
return
}

for _, location := range locations {
// Get the next look ahead epoch
epoch := phase0.Epoch(location.GetEthV2BeaconBlockVoluntaryExit().GetEpoch())

sp, err := b.beacon.Node().Spec()
if err != nil {
b.log.WithError(err).WithField("epoch", epoch).Warn("Failed to look ahead at epoch")

return
}

for i := uint64(0); i <= uint64(sp.SlotsPerEpoch); i++ {
slot := phase0.Slot(i + uint64(epoch)*uint64(sp.SlotsPerEpoch))

// Add the block to the preload queue so it's available when we need it
b.beacon.LazyLoadBeaconBlock(xatuethv1.SlotAsString(slot))
}
}
}

func (b *BLSToExecutionChangeDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) {
sp, err := b.beacon.Node().Spec()
if err != nil {
Expand Down
31 changes: 30 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/deposit.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,14 @@ func (b *DepositDeriver) run(ctx context.Context) {
}

// Get the next slot
location, err := b.iterator.Next(ctx)
location, lookAhead, err := b.iterator.Next(ctx)
if err != nil {
return err
}

// Look ahead
b.lookAheadAtLocation(ctx, lookAhead)

for _, fn := range b.onLocationCallbacks {
if errr := fn(ctx, location.GetEthV2BeaconBlockDeposit().GetEpoch()); errr != nil {
b.log.WithError(errr).Error("Failed to send location")
Expand Down Expand Up @@ -146,6 +149,32 @@ func (b *DepositDeriver) run(ctx context.Context) {
}
}

// lookAheadAtLocation takes the upcoming locations and looks ahead to do any pre-processing that might be required.
func (b *DepositDeriver) lookAheadAtLocation(ctx context.Context, locations []*xatu.CannonLocation) {
if locations == nil {
return
}

for _, location := range locations {
// Get the next look ahead epoch
epoch := phase0.Epoch(location.GetEthV2BeaconBlockVoluntaryExit().GetEpoch())

sp, err := b.beacon.Node().Spec()
if err != nil {
b.log.WithError(err).WithField("epoch", epoch).Warn("Failed to look ahead at epoch")

return
}

for i := uint64(0); i <= uint64(sp.SlotsPerEpoch); i++ {
slot := phase0.Slot(i + uint64(epoch)*uint64(sp.SlotsPerEpoch))

// Add the block to the preload queue so it's available when we need it
b.beacon.LazyLoadBeaconBlock(xatuethv1.SlotAsString(slot))
}
}
}

func (b *DepositDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) {
sp, err := b.beacon.Node().Spec()
if err != nil {
Expand Down
27 changes: 26 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,14 @@ func (b *ExecutionTransactionDeriver) run(ctx context.Context) {
}

// Get the next slot
location, err := b.iterator.Next(ctx)
location, lookAhead, err := b.iterator.Next(ctx)
if err != nil {
return err
}

// Look ahead
b.lookAheadAtLocation(ctx, lookAhead)

for _, fn := range b.onLocationCallbacks {
if errr := fn(ctx, location.GetEthV2BeaconBlockExecutionTransaction().GetEpoch()); errr != nil {
b.log.WithError(errr).Error("Failed to send location")
Expand Down Expand Up @@ -171,6 +174,28 @@ func (b *ExecutionTransactionDeriver) processEpoch(ctx context.Context, epoch ph
return allEvents, nil
}

// lookAheadAtLocation takes the upcoming locations and looks ahead to do any pre-processing that might be required.
func (b *ExecutionTransactionDeriver) lookAheadAtLocation(ctx context.Context, locations []*xatu.CannonLocation) {
for _, location := range locations {
// Get the next look ahead epoch
epoch := phase0.Epoch(location.GetEthV2BeaconBlockVoluntaryExit().GetEpoch())

sp, err := b.beacon.Node().Spec()
if err != nil {
b.log.WithError(err).WithField("epoch", epoch).Warn("Failed to look ahead at epoch")

return
}

for i := uint64(0); i <= uint64(sp.SlotsPerEpoch); i++ {
slot := phase0.Slot(i + uint64(epoch)*uint64(sp.SlotsPerEpoch))

// Add the block to the preload queue so it's available when we need it
b.beacon.LazyLoadBeaconBlock(xatuethv1.SlotAsString(slot))
}
}
}

func (b *ExecutionTransactionDeriver) processSlot(ctx context.Context, slot phase0.Slot) ([]*xatu.DecoratedEvent, error) {
// Get the block
block, err := b.beacon.GetBeaconBlock(ctx, xatuethv1.SlotAsString(slot))
Expand Down
31 changes: 30 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/proposer_slashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,14 @@ func (b *ProposerSlashingDeriver) run(ctx context.Context) {
}

// Get the next slot
location, err := b.iterator.Next(ctx)
location, lookAhead, err := b.iterator.Next(ctx)
if err != nil {
return err
}

// Look ahead
b.lookAheadAtLocation(ctx, lookAhead)

for _, fn := range b.onLocationCallbacks {
if errr := fn(ctx, location.GetEthV2BeaconBlockProposerSlashing().GetEpoch()); errr != nil {
b.log.WithError(errr).Error("Failed to send location")
Expand Down Expand Up @@ -240,6 +243,32 @@ func (b *ProposerSlashingDeriver) getProposerSlashings(ctx context.Context, bloc
return slashings, nil
}

// lookAheadAtLocation takes the upcoming locations and looks ahead to do any pre-processing that might be required.
func (b *ProposerSlashingDeriver) lookAheadAtLocation(ctx context.Context, locations []*xatu.CannonLocation) {
if locations == nil {
return
}

for _, location := range locations {
// Get the next look ahead epoch
epoch := phase0.Epoch(location.GetEthV2BeaconBlockVoluntaryExit().GetEpoch())

sp, err := b.beacon.Node().Spec()
if err != nil {
b.log.WithError(err).WithField("epoch", epoch).Warn("Failed to look ahead at epoch")

return
}

for i := uint64(0); i <= uint64(sp.SlotsPerEpoch); i++ {
slot := phase0.Slot(i + uint64(epoch)*uint64(sp.SlotsPerEpoch))

// Add the block to the preload queue so it's available when we need it
b.beacon.LazyLoadBeaconBlock(xatuethv1.SlotAsString(slot))
}
}
}

func (b *ProposerSlashingDeriver) createEvent(ctx context.Context, slashing *xatuethv1.ProposerSlashingV2, identifier *xatu.BlockIdentifier) (*xatu.DecoratedEvent, error) {
// Make a clone of the metadata
metadata, ok := proto.Clone(b.clientMeta).(*xatu.ClientMeta)
Expand Down
31 changes: 30 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/voluntary_exit.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,14 @@ func (b *VoluntaryExitDeriver) run(ctx context.Context) {
}

// Get the next slot
location, err := b.iterator.Next(ctx)
location, lookAhead, err := b.iterator.Next(ctx)
if err != nil {
return err
}

// Look ahead
b.lookAheadAtLocation(ctx, lookAhead)

for _, fn := range b.onLocationCallbacks {
if errr := fn(ctx, location.GetEthV2BeaconBlockVoluntaryExit().GetEpoch()); errr != nil {
b.log.WithError(errr).Error("Failed to send location")
Expand Down Expand Up @@ -146,6 +149,32 @@ func (b *VoluntaryExitDeriver) run(ctx context.Context) {
}
}

// lookAheadAtLocation takes the upcoming locations and looks ahead to do any pre-processing that might be required.
func (b *VoluntaryExitDeriver) lookAheadAtLocation(ctx context.Context, locations []*xatu.CannonLocation) {
if locations == nil {
return
}

for _, location := range locations {
// Get the next look ahead epoch
epoch := phase0.Epoch(location.GetEthV2BeaconBlockVoluntaryExit().GetEpoch())

sp, err := b.beacon.Node().Spec()
if err != nil {
b.log.WithError(err).WithField("epoch", epoch).Warn("Failed to look ahead at epoch")

return
}

for i := uint64(0); i <= uint64(sp.SlotsPerEpoch); i++ {
slot := phase0.Slot(i + uint64(epoch)*uint64(sp.SlotsPerEpoch))

// Add the block to the preload queue so it's available when we need it
b.beacon.LazyLoadBeaconBlock(xatuethv1.SlotAsString(slot))
}
}
}

func (b *VoluntaryExitDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) {
sp, err := b.beacon.Node().Spec()
if err != nil {
Expand Down
Loading

0 comments on commit 7fe62df

Please sign in to comment.