Skip to content

Commit

Permalink
feat(mimicry): add probe and capture delay config
Browse files Browse the repository at this point in the history
  • Loading branch information
Savid committed Sep 20, 2023
1 parent 616134d commit 56c1c34
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 50 deletions.
5 changes: 5 additions & 0 deletions docs/mimicry.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ Mimicry requires a single `yaml` config file. An example file can be found [here
| logging | string | `warn` | Log level (`panic`, `fatal`, `warn`, `info`, `debug`, `trace`) |
| metricsAddr | string | `:9090` | The address the metrics server will listen on |
| pprofAddr | string | | The address the [pprof](https://github.com/google/pprof) server will listen on. When ommited, the pprof server will not be started |
| probeAddr | string | | The address for health probes. When ommited, the probe server will not be started |
| name | string | | Unique name of the mimicry |
| labels | object | | A key value map of labels to append to every mimicry event |
| ntpServer | string | `pool.ntp.org` | NTP server to calculate clock drift for events |
| captureDelay | string | `3m` | Delay before starting to capture transactions |
| coordinator.type | string | | Type of output (`xatu`, `static`) |
| coordinator.config | object | | Coordinator type configuration [`xatu`](#coordinator-xatu-configuration)/[`static`](#coordinator-static-configuration) |
| outputs | array<object> | | List of outputs for the mimicry to send data to |
Expand Down Expand Up @@ -196,6 +198,7 @@ outputs:
logging: "debug"
metricsAddr: ":9090"
pprofAddr: ":6060"
probeAddr: ":8080"

name: example-instance

Expand All @@ -204,6 +207,8 @@ labels:

ntpServer: time.google.com

captureDelay: 3m

coordinator:
type: xatu
config:
Expand Down
6 changes: 6 additions & 0 deletions example_mimicry.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
logging: "debug" # panic,fatal,warn,info,debug,trace
metricsAddr: ":9090"
# pprofAddr: ":6060" # optional. if supplied it enables pprof server
# probeAddr: ":8080" # optional. if supplied it enables health probe server

name: example-instance

Expand All @@ -14,6 +15,11 @@ labels:
# pool.ntp.org - https://www.pool.ntp.org/zone/@
ntpServer: time.google.com

# Delay before capturing transactions from a peer.
# This is the avoid the initial deluge of transactions
# when a peer is first connected to.
captureDelay: 3m

coordinator:
type: static
config:
Expand Down
5 changes: 5 additions & 0 deletions pkg/mimicry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mimicry
import (
"errors"
"fmt"
"time"

"github.com/ethpandaops/xatu/pkg/mimicry/coordinator"
"github.com/ethpandaops/xatu/pkg/output"
Expand All @@ -14,6 +15,7 @@ type Config struct {
LoggingLevel string `yaml:"logging" default:"info"`
MetricsAddr string `yaml:"metricsAddr" default:":9090"`
PProfAddr *string `yaml:"pprofAddr"`
ProbeAddr *string `yaml:"probeAddr"`

// The name of the mimicry
Name string `yaml:"name"`
Expand All @@ -29,6 +31,9 @@ type Config struct {

// NTP Server to use for clock drift correction
NTPServer string `yaml:"ntpServer" default:"time.google.com"`

// CaptureDelay is the Delay before capturing transactions from a peer
CaptureDelay time.Duration `yaml:"captureDelay" default:"3m"`
}

func (c *Config) Validate() error {
Expand Down
7 changes: 4 additions & 3 deletions pkg/mimicry/coordinator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package coordinator
import (
"errors"
"fmt"
"time"

"github.com/creasty/defaults"
"github.com/ethpandaops/xatu/pkg/mimicry/coordinator/static"
Expand All @@ -26,7 +27,7 @@ func (c *Config) Validate() error {
return nil
}

func NewCoordinator(name string, coordinatorType Type, config *RawMessage, handlers *handler.Peer, log logrus.FieldLogger) (Coordinator, error) {
func NewCoordinator(name string, coordinatorType Type, config *RawMessage, handlers *handler.Peer, captureDelay time.Duration, log logrus.FieldLogger) (Coordinator, error) {
if coordinatorType == TypeUnknown {
return nil, errors.New("coordinator type is required")
}
Expand All @@ -43,7 +44,7 @@ func NewCoordinator(name string, coordinatorType Type, config *RawMessage, handl
return nil, err
}

return static.New(name, conf, handlers, log)
return static.New(name, conf, handlers, captureDelay, log)
case TypeXatu:
conf := &xatuCoordinator.Config{}

Expand All @@ -55,7 +56,7 @@ func NewCoordinator(name string, coordinatorType Type, config *RawMessage, handl
return nil, err
}

return xatu.New(name, conf, handlers, log)
return xatu.New(name, conf, handlers, captureDelay, log)
default:
return nil, fmt.Errorf("coordinator type %s is unknown", coordinatorType)
}
Expand Down
19 changes: 11 additions & 8 deletions pkg/mimicry/coordinator/static/static.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type Static struct {

handlers *handler.Peer

captureDelay time.Duration

log logrus.FieldLogger

cache *cache.SharedCache
Expand All @@ -30,7 +32,7 @@ type Static struct {
metrics *Metrics
}

func New(name string, config *Config, handlers *handler.Peer, log logrus.FieldLogger) (*Static, error) {
func New(name string, config *Config, handlers *handler.Peer, captureDelay time.Duration, log logrus.FieldLogger) (*Static, error) {
if config == nil {
return nil, errors.New("config is required")
}
Expand All @@ -40,12 +42,13 @@ func New(name string, config *Config, handlers *handler.Peer, log logrus.FieldLo
}

return &Static{
config: config,
handlers: handlers,
log: log,
cache: cache.NewSharedCache(),
peers: &map[string]bool{},
metrics: NewMetrics("xatu_mimicry_coordinator_static"),
config: config,
handlers: handlers,
captureDelay: captureDelay,
log: log,
cache: cache.NewSharedCache(),
peers: &map[string]bool{},
metrics: NewMetrics("xatu_mimicry_coordinator_static"),
}, nil
}

Expand All @@ -59,7 +62,7 @@ func (s *Static) Start(ctx context.Context) error {
go func(record string, peers *map[string]bool) {
_ = retry.Do(
func() error {
peer, err := execution.New(ctx, s.log, record, s.handlers, s.cache)
peer, err := execution.New(ctx, s.log, record, s.handlers, s.captureDelay, s.cache)
if err != nil {
return err
}
Expand Down
24 changes: 13 additions & 11 deletions pkg/mimicry/coordinator/xatu/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,26 @@ import (
)

type Peer struct {
log logrus.FieldLogger
handlers *handler.Peer
cache *cache.SharedCache
retryDelay time.Duration
log logrus.FieldLogger
handlers *handler.Peer
cache *cache.SharedCache
retryDelay time.Duration
captureDelay time.Duration

stopped bool
mu sync.Mutex

Record *xatu.CoordinatedNodeRecord
}

func NewPeer(log logrus.FieldLogger, handlers *handler.Peer, sharedCache *cache.SharedCache, record string, retryDelay time.Duration) *Peer {
func NewPeer(log logrus.FieldLogger, handlers *handler.Peer, sharedCache *cache.SharedCache, record string, retryDelay, captureDelay time.Duration) *Peer {
return &Peer{
log: log,
handlers: handlers,
cache: sharedCache,
retryDelay: retryDelay,
stopped: false,
log: log,
handlers: handlers,
cache: sharedCache,
retryDelay: retryDelay,
captureDelay: captureDelay,
stopped: false,
Record: &xatu.CoordinatedNodeRecord{
NodeRecord: record,
Connected: false,
Expand All @@ -59,7 +61,7 @@ func (p *Peer) Start(ctx context.Context) error {

p.mu.Unlock()

peer, err := execution.New(ctx, p.log, p.Record.NodeRecord, p.handlers, p.cache)
peer, err := execution.New(ctx, p.log, p.Record.NodeRecord, p.handlers, p.captureDelay, p.cache)
if err != nil {
return err
}
Expand Down
24 changes: 13 additions & 11 deletions pkg/mimicry/coordinator/xatu/xatu.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ import (
const Type = "xatu"

type Xatu struct {
handlers *handler.Peer
log logrus.FieldLogger
handlers *handler.Peer
captureDelay time.Duration
log logrus.FieldLogger

cache *cache.SharedCache
coordinator *xatuCoordinator.Coordinator
Expand All @@ -30,7 +31,7 @@ type Xatu struct {
metrics *Metrics
}

func New(name string, config *xatuCoordinator.Config, handlers *handler.Peer, log logrus.FieldLogger) (*Xatu, error) {
func New(name string, config *xatuCoordinator.Config, handlers *handler.Peer, captureDelay time.Duration, log logrus.FieldLogger) (*Xatu, error) {
if config == nil {
return nil, errors.New("config is required")
}
Expand All @@ -47,13 +48,14 @@ func New(name string, config *xatuCoordinator.Config, handlers *handler.Peer, lo
handlers.ExecutionStatus = coordinator.HandleExecutionNodeRecordStatus

return &Xatu{
handlers: handlers,
log: log,
cache: cache.NewSharedCache(),
coordinator: coordinator,
mu: sync.Mutex{},
peers: make(map[string]*xatuPeer.Peer),
metrics: NewMetrics("xatu_mimicry_coordinator_xatu"),
handlers: handlers,
captureDelay: captureDelay,
log: log,
cache: cache.NewSharedCache(),
coordinator: coordinator,
mu: sync.Mutex{},
peers: make(map[string]*xatuPeer.Peer),
metrics: NewMetrics("xatu_mimicry_coordinator_xatu"),
}, nil
}

Expand Down Expand Up @@ -149,7 +151,7 @@ func (x *Xatu) startCrons(ctx context.Context) error {

for _, record := range res.NodeRecords {
if _, ok := x.peers[record]; !ok {
x.peers[record] = xatuPeer.NewPeer(x.log, x.handlers, x.cache, record, retryDelay)
x.peers[record] = xatuPeer.NewPeer(x.log, x.handlers, x.cache, record, retryDelay, x.captureDelay)
if err := x.peers[record].Start(ctx); err != nil {
x.log.WithError(err).Error("failed to start peer")
delete(x.peers, record)
Expand Down
55 changes: 48 additions & 7 deletions pkg/mimicry/mimicry.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type Mimicry struct {
id uuid.UUID

metrics *Metrics

startupTime time.Time
}

func New(ctx context.Context, log logrus.FieldLogger, config *Config) (*Mimicry, error) {
Expand All @@ -56,18 +58,19 @@ func New(ctx context.Context, log logrus.FieldLogger, config *Config) (*Mimicry,
}

mimicry := &Mimicry{
Config: config,
sinks: sinks,
clockDrift: time.Duration(0),
log: log,
id: uuid.New(),
metrics: NewMetrics("xatu_mimicry"),
Config: config,
sinks: sinks,
clockDrift: time.Duration(0),
log: log,
id: uuid.New(),
metrics: NewMetrics("xatu_mimicry"),
startupTime: time.Now(),
}

mimicry.coordinator, err = coordinator.NewCoordinator(config.Name, config.Coordinator.Type, config.Coordinator.Config, &handler.Peer{
CreateNewClientMeta: mimicry.createNewClientMeta,
DecoratedEvent: mimicry.handleNewDecoratedEvent,
}, log)
}, config.CaptureDelay, log)
if err != nil {
return nil, err
}
Expand All @@ -86,6 +89,12 @@ func (m *Mimicry) Start(ctx context.Context) error {
}
}

if m.Config.ProbeAddr != nil {
if err := m.ServeProbe(ctx); err != nil {
return err
}
}

m.log.
WithField("version", xatu.Full()).
WithField("id", m.id.String()).
Expand Down Expand Up @@ -160,6 +169,38 @@ func (m *Mimicry) ServePProf(ctx context.Context) error {
return nil
}

func (m *Mimicry) ServeProbe(ctx context.Context) error {
probeServer := &http.Server{
Addr: *m.Config.ProbeAddr,
ReadHeaderTimeout: 120 * time.Second,
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if time.Since(m.startupTime) > m.Config.CaptureDelay {
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte("OK"))
if err != nil {
m.log.Error("Failed to write response: ", err)
}
} else {
w.WriteHeader(http.StatusServiceUnavailable)
_, err := w.Write([]byte("Service is not ready yet"))
if err != nil {
m.log.Error("Failed to write response: ", err)
}
}
}),
}

go func() {
m.log.Infof("Serving probe at %s", *m.Config.ProbeAddr)

if err := probeServer.ListenAndServe(); err != nil {
m.log.Fatal(err)
}
}()

return nil
}

func (m *Mimicry) createNewClientMeta(ctx context.Context) (*xatu.ClientMeta, error) {
return &xatu.ClientMeta{
Name: m.Config.Name,
Expand Down
22 changes: 12 additions & 10 deletions pkg/mimicry/p2p/execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ const PeerType = "execution"
type Peer struct {
log logrus.FieldLogger

nodeRecord string
handlers *handler.Peer
nodeRecord string
handlers *handler.Peer
captureDelay time.Duration

client *mimicry.Client

Expand All @@ -54,18 +55,19 @@ type Peer struct {
ignoreBefore *time.Time
}

func New(ctx context.Context, log logrus.FieldLogger, nodeRecord string, handlers *handler.Peer, sharedCache *coordCache.SharedCache) (*Peer, error) {
func New(ctx context.Context, log logrus.FieldLogger, nodeRecord string, handlers *handler.Peer, captureDelay time.Duration, sharedCache *coordCache.SharedCache) (*Peer, error) {
client, err := mimicry.New(ctx, log, nodeRecord, "xatu")
if err != nil {
return nil, err
}

return &Peer{
log: log.WithField("node_record", nodeRecord),
nodeRecord: nodeRecord,
handlers: handlers,
client: client,
sharedCache: sharedCache,
log: log.WithField("node_record", nodeRecord),
nodeRecord: nodeRecord,
handlers: handlers,
captureDelay: captureDelay,
client: client,
sharedCache: sharedCache,
network: &networks.Network{
Name: networks.NetworkNameNone,
},
Expand Down Expand Up @@ -194,8 +196,8 @@ func (p *Peer) Start(ctx context.Context) (<-chan error, error) {
"fork_id_next": fmt.Sprintf("%d", status.ForkID.Next),
}).Debug("got client status")

// set the ignore before time to 3 minute in the future
ignoreBefore := time.Now().Add(3 * time.Minute)
// This is the avoid the initial deluge of transactions when a peer is first connected to.
ignoreBefore := time.Now().Add(p.captureDelay)
p.ignoreBefore = &ignoreBefore

return nil
Expand Down

0 comments on commit 56c1c34

Please sign in to comment.