From 56c1c34d36666c4c2986e9ce7d1449e041c46e63 Mon Sep 17 00:00:00 2001 From: Andrew Davis <1709934+Savid@users.noreply.github.com> Date: Wed, 20 Sep 2023 11:52:01 +1000 Subject: [PATCH] feat(mimicry): add probe and capture delay config --- docs/mimicry.md | 5 +++ example_mimicry.yaml | 6 +++ pkg/mimicry/config.go | 5 +++ pkg/mimicry/coordinator/config.go | 7 +-- pkg/mimicry/coordinator/static/static.go | 19 ++++---- pkg/mimicry/coordinator/xatu/peer/peer.go | 24 +++++----- pkg/mimicry/coordinator/xatu/xatu.go | 24 +++++----- pkg/mimicry/mimicry.go | 55 ++++++++++++++++++++--- pkg/mimicry/p2p/execution/execution.go | 22 ++++----- 9 files changed, 117 insertions(+), 50 deletions(-) diff --git a/docs/mimicry.md b/docs/mimicry.md index f5a0478a..fb89b1ea 100644 --- a/docs/mimicry.md +++ b/docs/mimicry.md @@ -52,9 +52,11 @@ Mimicry requires a single `yaml` config file. An example file can be found [here | logging | string | `warn` | Log level (`panic`, `fatal`, `warn`, `info`, `debug`, `trace`) | | metricsAddr | string | `:9090` | The address the metrics server will listen on | | pprofAddr | string | | The address the [pprof](https://github.com/google/pprof) server will listen on. When ommited, the pprof server will not be started | +| probeAddr | string | | The address for health probes. 
When omitted, the probe server will not be started | | name | string | | Unique name of the mimicry | | labels | object | | A key value map of labels to append to every mimicry event | | ntpServer | string | `pool.ntp.org` | NTP server to calculate clock drift for events | +| captureDelay | string | `3m` | Delay before starting to capture transactions | | coordinator.type | string | | Type of output (`xatu`, `static`) | | coordinator.config | object | | Coordinator type configuration [`xatu`](#coordinator-xatu-configuration)/[`static`](#coordinator-static-configuration) | | outputs | array | | List of outputs for the mimicry to send data to | @@ -196,6 +198,7 @@ outputs: logging: "debug" metricsAddr: ":9090" pprofAddr: ":6060" +probeAddr: ":8080" name: example-instance @@ -204,6 +207,8 @@ labels: ntpServer: time.google.com +captureDelay: 3m + coordinator: type: xatu config: diff --git a/example_mimicry.yaml b/example_mimicry.yaml index abe658ce..37924ac6 100644 --- a/example_mimicry.yaml +++ b/example_mimicry.yaml @@ -1,6 +1,7 @@ logging: "debug" # panic,fatal,warn,info,debug,trace metricsAddr: ":9090" # pprofAddr: ":6060" # optional. if supplied it enables pprof server +# probeAddr: ":8080" # optional. if supplied it enables health probe server name: example-instance @@ -14,6 +15,11 @@ labels: # pool.ntp.org - https://www.pool.ntp.org/zone/@ ntpServer: time.google.com +# Delay before capturing transactions from a peer. +# This is to avoid the initial deluge of transactions +# when a peer is first connected to. 
+captureDelay: 3m + coordinator: type: static config: diff --git a/pkg/mimicry/config.go b/pkg/mimicry/config.go index 4df86fa0..b0eade13 100644 --- a/pkg/mimicry/config.go +++ b/pkg/mimicry/config.go @@ -3,6 +3,7 @@ package mimicry import ( "errors" "fmt" + "time" "github.com/ethpandaops/xatu/pkg/mimicry/coordinator" "github.com/ethpandaops/xatu/pkg/output" @@ -14,6 +15,7 @@ type Config struct { LoggingLevel string `yaml:"logging" default:"info"` MetricsAddr string `yaml:"metricsAddr" default:":9090"` PProfAddr *string `yaml:"pprofAddr"` + ProbeAddr *string `yaml:"probeAddr"` // The name of the mimicry Name string `yaml:"name"` @@ -29,6 +31,9 @@ type Config struct { // NTP Server to use for clock drift correction NTPServer string `yaml:"ntpServer" default:"time.google.com"` + + // CaptureDelay is the Delay before capturing transactions from a peer + CaptureDelay time.Duration `yaml:"captureDelay" default:"3m"` } func (c *Config) Validate() error { diff --git a/pkg/mimicry/coordinator/config.go b/pkg/mimicry/coordinator/config.go index ce5df561..ac81a05c 100644 --- a/pkg/mimicry/coordinator/config.go +++ b/pkg/mimicry/coordinator/config.go @@ -3,6 +3,7 @@ package coordinator import ( "errors" "fmt" + "time" "github.com/creasty/defaults" "github.com/ethpandaops/xatu/pkg/mimicry/coordinator/static" @@ -26,7 +27,7 @@ func (c *Config) Validate() error { return nil } -func NewCoordinator(name string, coordinatorType Type, config *RawMessage, handlers *handler.Peer, log logrus.FieldLogger) (Coordinator, error) { +func NewCoordinator(name string, coordinatorType Type, config *RawMessage, handlers *handler.Peer, captureDelay time.Duration, log logrus.FieldLogger) (Coordinator, error) { if coordinatorType == TypeUnknown { return nil, errors.New("coordinator type is required") } @@ -43,7 +44,7 @@ func NewCoordinator(name string, coordinatorType Type, config *RawMessage, handl return nil, err } - return static.New(name, conf, handlers, log) + return static.New(name, conf, 
handlers, captureDelay, log) case TypeXatu: conf := &xatuCoordinator.Config{} @@ -55,7 +56,7 @@ func NewCoordinator(name string, coordinatorType Type, config *RawMessage, handl return nil, err } - return xatu.New(name, conf, handlers, log) + return xatu.New(name, conf, handlers, captureDelay, log) default: return nil, fmt.Errorf("coordinator type %s is unknown", coordinatorType) } diff --git a/pkg/mimicry/coordinator/static/static.go b/pkg/mimicry/coordinator/static/static.go index 3ddc2876..a61ec062 100644 --- a/pkg/mimicry/coordinator/static/static.go +++ b/pkg/mimicry/coordinator/static/static.go @@ -21,6 +21,8 @@ type Static struct { handlers *handler.Peer + captureDelay time.Duration + log logrus.FieldLogger cache *cache.SharedCache @@ -30,7 +32,7 @@ type Static struct { metrics *Metrics } -func New(name string, config *Config, handlers *handler.Peer, log logrus.FieldLogger) (*Static, error) { +func New(name string, config *Config, handlers *handler.Peer, captureDelay time.Duration, log logrus.FieldLogger) (*Static, error) { if config == nil { return nil, errors.New("config is required") } @@ -40,12 +42,13 @@ func New(name string, config *Config, handlers *handler.Peer, log logrus.FieldLo } return &Static{ - config: config, - handlers: handlers, - log: log, - cache: cache.NewSharedCache(), - peers: &map[string]bool{}, - metrics: NewMetrics("xatu_mimicry_coordinator_static"), + config: config, + handlers: handlers, + captureDelay: captureDelay, + log: log, + cache: cache.NewSharedCache(), + peers: &map[string]bool{}, + metrics: NewMetrics("xatu_mimicry_coordinator_static"), }, nil } @@ -59,7 +62,7 @@ func (s *Static) Start(ctx context.Context) error { go func(record string, peers *map[string]bool) { _ = retry.Do( func() error { - peer, err := execution.New(ctx, s.log, record, s.handlers, s.cache) + peer, err := execution.New(ctx, s.log, record, s.handlers, s.captureDelay, s.cache) if err != nil { return err } diff --git 
a/pkg/mimicry/coordinator/xatu/peer/peer.go b/pkg/mimicry/coordinator/xatu/peer/peer.go index 530d82d9..e112c7af 100644 --- a/pkg/mimicry/coordinator/xatu/peer/peer.go +++ b/pkg/mimicry/coordinator/xatu/peer/peer.go @@ -15,10 +15,11 @@ import ( ) type Peer struct { - log logrus.FieldLogger - handlers *handler.Peer - cache *cache.SharedCache - retryDelay time.Duration + log logrus.FieldLogger + handlers *handler.Peer + cache *cache.SharedCache + retryDelay time.Duration + captureDelay time.Duration stopped bool mu sync.Mutex @@ -26,13 +27,14 @@ type Peer struct { Record *xatu.CoordinatedNodeRecord } -func NewPeer(log logrus.FieldLogger, handlers *handler.Peer, sharedCache *cache.SharedCache, record string, retryDelay time.Duration) *Peer { +func NewPeer(log logrus.FieldLogger, handlers *handler.Peer, sharedCache *cache.SharedCache, record string, retryDelay, captureDelay time.Duration) *Peer { return &Peer{ - log: log, - handlers: handlers, - cache: sharedCache, - retryDelay: retryDelay, - stopped: false, + log: log, + handlers: handlers, + cache: sharedCache, + retryDelay: retryDelay, + captureDelay: captureDelay, + stopped: false, Record: &xatu.CoordinatedNodeRecord{ NodeRecord: record, Connected: false, @@ -59,7 +61,7 @@ func (p *Peer) Start(ctx context.Context) error { p.mu.Unlock() - peer, err := execution.New(ctx, p.log, p.Record.NodeRecord, p.handlers, p.cache) + peer, err := execution.New(ctx, p.log, p.Record.NodeRecord, p.handlers, p.captureDelay, p.cache) if err != nil { return err } diff --git a/pkg/mimicry/coordinator/xatu/xatu.go b/pkg/mimicry/coordinator/xatu/xatu.go index 64cee159..9af41f2b 100644 --- a/pkg/mimicry/coordinator/xatu/xatu.go +++ b/pkg/mimicry/coordinator/xatu/xatu.go @@ -18,8 +18,9 @@ import ( const Type = "xatu" type Xatu struct { - handlers *handler.Peer - log logrus.FieldLogger + handlers *handler.Peer + captureDelay time.Duration + log logrus.FieldLogger cache *cache.SharedCache coordinator *xatuCoordinator.Coordinator @@ -30,7 
+31,7 @@ type Xatu struct { metrics *Metrics } -func New(name string, config *xatuCoordinator.Config, handlers *handler.Peer, log logrus.FieldLogger) (*Xatu, error) { +func New(name string, config *xatuCoordinator.Config, handlers *handler.Peer, captureDelay time.Duration, log logrus.FieldLogger) (*Xatu, error) { if config == nil { return nil, errors.New("config is required") } @@ -47,13 +48,14 @@ func New(name string, config *xatuCoordinator.Config, handlers *handler.Peer, lo handlers.ExecutionStatus = coordinator.HandleExecutionNodeRecordStatus return &Xatu{ - handlers: handlers, - log: log, - cache: cache.NewSharedCache(), - coordinator: coordinator, - mu: sync.Mutex{}, - peers: make(map[string]*xatuPeer.Peer), - metrics: NewMetrics("xatu_mimicry_coordinator_xatu"), + handlers: handlers, + captureDelay: captureDelay, + log: log, + cache: cache.NewSharedCache(), + coordinator: coordinator, + mu: sync.Mutex{}, + peers: make(map[string]*xatuPeer.Peer), + metrics: NewMetrics("xatu_mimicry_coordinator_xatu"), }, nil } @@ -149,7 +151,7 @@ func (x *Xatu) startCrons(ctx context.Context) error { for _, record := range res.NodeRecords { if _, ok := x.peers[record]; !ok { - x.peers[record] = xatuPeer.NewPeer(x.log, x.handlers, x.cache, record, retryDelay) + x.peers[record] = xatuPeer.NewPeer(x.log, x.handlers, x.cache, record, retryDelay, x.captureDelay) if err := x.peers[record].Start(ctx); err != nil { x.log.WithError(err).Error("failed to start peer") delete(x.peers, record) diff --git a/pkg/mimicry/mimicry.go b/pkg/mimicry/mimicry.go index bf03ee63..a920177a 100644 --- a/pkg/mimicry/mimicry.go +++ b/pkg/mimicry/mimicry.go @@ -39,6 +39,8 @@ type Mimicry struct { id uuid.UUID metrics *Metrics + + startupTime time.Time } func New(ctx context.Context, log logrus.FieldLogger, config *Config) (*Mimicry, error) { @@ -56,18 +58,19 @@ func New(ctx context.Context, log logrus.FieldLogger, config *Config) (*Mimicry, } mimicry := &Mimicry{ - Config: config, - sinks: sinks, - 
clockDrift: time.Duration(0), - log: log, - id: uuid.New(), - metrics: NewMetrics("xatu_mimicry"), + Config: config, + sinks: sinks, + clockDrift: time.Duration(0), + log: log, + id: uuid.New(), + metrics: NewMetrics("xatu_mimicry"), + startupTime: time.Now(), } mimicry.coordinator, err = coordinator.NewCoordinator(config.Name, config.Coordinator.Type, config.Coordinator.Config, &handler.Peer{ CreateNewClientMeta: mimicry.createNewClientMeta, DecoratedEvent: mimicry.handleNewDecoratedEvent, - }, log) + }, config.CaptureDelay, log) if err != nil { return nil, err } @@ -86,6 +89,12 @@ func (m *Mimicry) Start(ctx context.Context) error { } } + if m.Config.ProbeAddr != nil { + if err := m.ServeProbe(ctx); err != nil { + return err + } + } + m.log. WithField("version", xatu.Full()). WithField("id", m.id.String()). @@ -160,6 +169,38 @@ func (m *Mimicry) ServePProf(ctx context.Context) error { return nil } +func (m *Mimicry) ServeProbe(ctx context.Context) error { + probeServer := &http.Server{ + Addr: *m.Config.ProbeAddr, + ReadHeaderTimeout: 120 * time.Second, + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if time.Since(m.startupTime) > m.Config.CaptureDelay { + w.WriteHeader(http.StatusOK) + _, err := w.Write([]byte("OK")) + if err != nil { + m.log.Error("Failed to write response: ", err) + } + } else { + w.WriteHeader(http.StatusServiceUnavailable) + _, err := w.Write([]byte("Service is not ready yet")) + if err != nil { + m.log.Error("Failed to write response: ", err) + } + } + }), + } + + go func() { + m.log.Infof("Serving probe at %s", *m.Config.ProbeAddr) + + if err := probeServer.ListenAndServe(); err != nil { + m.log.Fatal(err) + } + }() + + return nil +} + func (m *Mimicry) createNewClientMeta(ctx context.Context) (*xatu.ClientMeta, error) { return &xatu.ClientMeta{ Name: m.Config.Name, diff --git a/pkg/mimicry/p2p/execution/execution.go b/pkg/mimicry/p2p/execution/execution.go index 79bb9677..ea91c145 100644 --- 
a/pkg/mimicry/p2p/execution/execution.go +++ b/pkg/mimicry/p2p/execution/execution.go @@ -29,8 +29,9 @@ const PeerType = "execution" type Peer struct { log logrus.FieldLogger - nodeRecord string - handlers *handler.Peer + nodeRecord string + handlers *handler.Peer + captureDelay time.Duration client *mimicry.Client @@ -54,18 +55,19 @@ type Peer struct { ignoreBefore *time.Time } -func New(ctx context.Context, log logrus.FieldLogger, nodeRecord string, handlers *handler.Peer, sharedCache *coordCache.SharedCache) (*Peer, error) { +func New(ctx context.Context, log logrus.FieldLogger, nodeRecord string, handlers *handler.Peer, captureDelay time.Duration, sharedCache *coordCache.SharedCache) (*Peer, error) { client, err := mimicry.New(ctx, log, nodeRecord, "xatu") if err != nil { return nil, err } return &Peer{ - log: log.WithField("node_record", nodeRecord), - nodeRecord: nodeRecord, - handlers: handlers, - client: client, - sharedCache: sharedCache, + log: log.WithField("node_record", nodeRecord), + nodeRecord: nodeRecord, + handlers: handlers, + captureDelay: captureDelay, + client: client, + sharedCache: sharedCache, network: &networks.Network{ Name: networks.NetworkNameNone, }, @@ -194,8 +196,8 @@ func (p *Peer) Start(ctx context.Context) (<-chan error, error) { "fork_id_next": fmt.Sprintf("%d", status.ForkID.Next), }).Debug("got client status") - // set the ignore before time to 3 minute in the future - ignoreBefore := time.Now().Add(3 * time.Minute) + // This is to avoid the initial deluge of transactions when a peer is first connected to. + ignoreBefore := time.Now().Add(p.captureDelay) p.ignoreBefore = &ignoreBefore return nil