From ed56ee9b4170c6dcd4d06d40d0ac65d0225b1b92 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Thu, 27 Jun 2024 13:26:17 +1000 Subject: [PATCH 01/26] feat(observability): Additional traces --- .../grafana/datasources/datasources.yaml | 9 +- deploy/local/docker-compose/xatu-server.yaml | 12 ++- docker-compose.yml | 35 ++++++++ pkg/cannon/cannon.go | 8 +- pkg/observability/config.go | 5 ++ pkg/observability/tracing.go | 10 ++- pkg/output/http/exporter.go | 7 +- pkg/processor/batch.go | 48 ++++++++-- pkg/processor/batch_test.go | 15 +++- pkg/processor/metrics.go | 90 +++++++++++++++++-- pkg/sentry/config.go | 8 ++ pkg/sentry/sentry.go | 41 +++++++++ pkg/server/config.go | 8 ++ pkg/server/server.go | 45 ++++++++++ .../service/event-ingester/event/event.go | 12 +-- 15 files changed, 321 insertions(+), 32 deletions(-) diff --git a/deploy/local/docker-compose/grafana/datasources/datasources.yaml b/deploy/local/docker-compose/grafana/datasources/datasources.yaml index 91ddd016..65a1a076 100644 --- a/deploy/local/docker-compose/grafana/datasources/datasources.yaml +++ b/deploy/local/docker-compose/grafana/datasources/datasources.yaml @@ -30,4 +30,11 @@ datasources: sslmode: disable tlsSkipVerify: true postgresVersion: 1500 - database: xatu \ No newline at end of file + database: xatu + - name: Tempo + type: tempo + access: proxy + uid: EbPG8fYoz + url: http://tempo:3200 + jsonData: + httpMethod: GET diff --git a/deploy/local/docker-compose/xatu-server.yaml b/deploy/local/docker-compose/xatu-server.yaml index fb08b855..40fed074 100644 --- a/deploy/local/docker-compose/xatu-server.yaml +++ b/deploy/local/docker-compose/xatu-server.yaml @@ -1,4 +1,4 @@ -logging: "info" # panic,fatal,warn,info,debug,trace +logging: "debug" # panic,fatal,warn,info,debug,trace addr: ":8080" metricsAddr: ":9090" # pprofAddr: ":6060" # optional. if supplied it enables pprof server @@ -26,6 +26,13 @@ store: geoip: enabled: false +tracing: + enabled: true + endpoint: tempo:4318 + insecure: true + sampling: + rate: 0.1 + services: coordinator: enabled: true # requires persistence to be enabled @@ -44,6 +51,7 @@ services: maxQueueSize: 102400 batchTimeout: 3s exportTimeout: 30s - maxExportBatchSize: 5000 + maxExportBatchSize: 512 compression: none keepAlive: true + workers: 50 \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 5aedbc4c..b7708728 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -474,6 +474,39 @@ services: condition: service_healthy networks: - xatu-net + tempo-init: + image: &tempoImage grafana/tempo:latest + user: root + entrypoint: + - "chown" + - "10001:10001" + - "/var/tempo" + volumes: + - tempo-data:/var/tempo + + tempo: + image: *tempoImage + container_name: xatu-tempo + command: [ "-config.file=/etc/tempo.yaml" ] + volumes: + - ./deploy/local/docker-compose/tempo.yaml:/etc/tempo.yaml + - tempo-data:/var/tempo + ports: + - "14268:14268" # jaeger ingest + - "3200:3200" # tempo + - "9095:9095" # tempo grpc + - "4317:4317" # otlp grpc + - "4318:4318" # otlp http + - "9411:9411" # zipkin + networks: + - xatu-net + depends_on: + - tempo-init + healthcheck: + test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://localhost:3200/ready"] + interval: 5s + timeout: 5s + retries: 5 networks: xatu-net: @@ -506,3 +539,5 @@ volumes: driver: local prometheus-data: driver: local + tempo-data: + driver: local diff --git a/pkg/cannon/cannon.go b/pkg/cannon/cannon.go index 261bb0fd..74bf4e5f 100644 --- a/pkg/cannon/cannon.go +++ b/pkg/cannon/cannon.go @@ -33,6 +33,7 @@ import ( perrors "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/sdk/trace" ) type Cannon struct { @@ -108,9 +109,14 @@ func (c *Cannon) Start(ctx context.Context) error { return perrors.Wrap(err, "failed to create tracing resource") } + opts := []trace.TracerProviderOption{ + trace.WithSampler(trace.ParentBased(trace.TraceIDRatioBased(c.Config.Tracing.Sampling.Rate))), + } + tracer, err := observability.NewHTTPTraceProvider(ctx, res, - c.Config.Tracing.AsOTelOpts()..., + c.Config.Tracing.AsOTelOpts(), + opts..., ) if err != nil { return perrors.Wrap(err, "failed to create tracing provider") diff --git a/pkg/observability/config.go b/pkg/observability/config.go index ccaf2ce8..ef5ea684 100644 --- a/pkg/observability/config.go +++ b/pkg/observability/config.go @@ -7,6 +7,10 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" ) +type SamplingConfig struct { + Rate float64 `yaml:"rate" default:"0.01"` +} + type TracingConfig struct { Enabled bool `yaml:"enabled" default:"false"` Endpoint string `yaml:"endpoint" default:""` @@ -17,6 +21,7 @@ type TracingConfig struct { Insecure bool `yaml:"insecure" default:"false"` Retry *otlptracehttp.RetryConfig `yaml:"retry"` TLS *tls.Config `yaml:"tls"` + Sampling SamplingConfig `yaml:"sampling"` } func (t *TracingConfig) Validate() error { diff --git a/pkg/observability/tracing.go b/pkg/observability/tracing.go index d2193dd8..596858b2 100644 --- a/pkg/observability/tracing.go +++ b/pkg/observability/tracing.go @@ -54,18 +54,20 @@ func NewResource(serviceName, serviceVersion string) (*resource.Resource, error) ) } -func NewHTTPTraceProvider(ctx context.Context, res *resource.Resource, opts ...otlptracehttp.Option) (*trace.TracerProvider, error) { - client := otlptracehttp.NewClient(opts...) +func NewHTTPTraceProvider(ctx context.Context, res *resource.Resource, httpOpts []otlptracehttp.Option, opts ...trace.TracerProviderOption) (*trace.TracerProvider, error) { + client := otlptracehttp.NewClient(httpOpts...) exporter, err := otlptrace.New(ctx, client) if err != nil { return nil, fmt.Errorf("creating OTLP trace exporter: %w", err) } - traceProvider := trace.NewTracerProvider( + options := append([]trace.TracerProviderOption{ trace.WithBatcher(exporter), trace.WithResource(res), - ) + }, opts...) + + traceProvider := trace.NewTracerProvider(options...) return traceProvider, nil } diff --git a/pkg/output/http/exporter.go b/pkg/output/http/exporter.go index 415ca44a..dc0b4b0c 100644 --- a/pkg/output/http/exporter.go +++ b/pkg/output/http/exporter.go @@ -83,7 +83,7 @@ func (e *ItemExporter) sendUpstream(ctx context.Context, items []*xatu.Decorated buf := bytes.NewBufferString(body) if e.config.Compression == CompressionStrategyGzip { - compressed, err := e.gzip(buf) + compressed, err := e.gzip(ctx, buf) if err != nil { return err } @@ -126,7 +126,10 @@ func (e *ItemExporter) sendUpstream(ctx context.Context, items []*xatu.Decorated return nil } -func (e *ItemExporter) gzip(in *bytes.Buffer) (*bytes.Buffer, error) { +func (e *ItemExporter) gzip(ctx context.Context, in *bytes.Buffer) (*bytes.Buffer, error) { + _, span := observability.Tracer().Start(ctx, "HTTPItemExporter.gzip") + defer span.End() + out := &bytes.Buffer{} g := gzip.NewWriter(out) diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index 1e5ad380..14131ce1 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -6,11 +6,11 @@ import ( "fmt" "runtime" "sync" - "sync/atomic" "time" "github.com/ethpandaops/xatu/pkg/observability" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" ) // ItemExporter is an interface for exporting items. @@ -106,22 +106,25 @@ type BatchItemProcessor[T any] struct { queue chan traceableItem[T] batchCh chan []traceableItem[T] - dropped uint32 name string - metrics *Metrics - timer *time.Timer stopWait sync.WaitGroup stopOnce sync.Once stopCh chan struct{} stopWorkersCh chan struct{} + + // Metrics + metrics *Metrics } type traceableItem[T any] struct { item *T errCh chan error completedCh chan struct{} + queuedAt time.Time + //nolint:containedctx // we need to pass the context to the workers + ctx context.Context } // NewBatchItemProcessor creates a new batch item processor. @@ -202,11 +205,12 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log // configured to use the async shipping method, the items will be written to // the queue and this function will return immediately. func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { - _, span := observability.Tracer().Start(ctx, "BatchItemProcessor.Write") - defer span.End() - bvp.metrics.SetItemsQueued(bvp.name, float64(len(bvp.queue))) + defer func() { + bvp.metrics.SetItemsQueued(bvp.name, float64(len(bvp.queue))) + }() + if bvp.e == nil { return errors.New("exporter is nil") } @@ -226,7 +230,9 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { for _, i := range s[start:end] { item := traceableItem[T]{ - item: i, + item: i, + queuedAt: time.Now(), + ctx: ctx, } if bvp.o.ShippingMethod == ShippingMethodSync { @@ -264,6 +270,19 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { // exportWithTimeout exports items with a timeout. func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBatch []traceableItem[T]) error { + contexts := make([]context.Context, len(itemsBatch)) + for i, item := range itemsBatch { + contexts[i] = item.ctx + } + + ctx = observability.MergeContexts(ctx, contexts...) + + _, span := observability.Tracer().Start(ctx, "BatchItemProcessor.exportWithTimeout") + defer span.End() + + span.SetAttributes(attribute.String("processor", bvp.name)) + span.SetAttributes(attribute.Int("batch_size", len(itemsBatch))) + if bvp.o.ExportTimeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, bvp.o.ExportTimeout) @@ -276,14 +295,26 @@ func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBa items[i] = item.item } + startTime := time.Now() + err := bvp.e.ExportItems(ctx, items) + + duration := time.Since(startTime) + + bvp.metrics.ObserveExportDuration(bvp.name, duration.Seconds()) + if err != nil { bvp.metrics.IncItemsFailedBy(bvp.name, float64(len(itemsBatch))) } else { bvp.metrics.IncItemsExportedBy(bvp.name, float64(len(itemsBatch))) + bvp.metrics.ObserveBatchSize(bvp.name, float64(len(itemsBatch))) + bvp.metrics.ObserveProcessingDuration(bvp.name, duration.Seconds()) } for _, item := range itemsBatch { + waitTime := startTime.Sub(item.queuedAt) + bvp.metrics.ObserveQueueWaitTime(bvp.name, waitTime.Seconds()) + if item.errCh != nil { item.errCh <- err close(item.errCh) @@ -479,7 +510,6 @@ func (bvp *BatchItemProcessor[T]) enqueueOrDrop(ctx context.Context, item tracea case bvp.queue <- item: return nil default: - atomic.AddUint32(&bvp.dropped, 1) bvp.metrics.IncItemsDroppedBy(bvp.name, float64(1)) } diff --git a/pkg/processor/batch_test.go b/pkg/processor/batch_test.go index 82e49faa..f1f264d9 100644 --- a/pkg/processor/batch_test.go +++ b/pkg/processor/batch_test.go @@ -16,6 +16,8 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -673,7 +675,7 @@ func TestBatchItemProcessorQueueSize(t *testing.T) { // Write items to the processor for i := 0; i < itemsToExport; i++ { - err := bsp.Write(context.Background(), []*TestItem{items[i]}) + err = bsp.Write(context.Background(), []*TestItem{items[i]}) if i < maxQueueSize { require.NoError(t, err, "Expected no error for item %d", i) } else { @@ -683,6 +685,15 @@ func TestBatchItemProcessorQueueSize(t *testing.T) { // Ensure that the queue size is respected require.Equal(t, maxQueueSize, len(bsp.queue), "Queue size should be equal to maxQueueSize") + // Ensure that the dropped count is correct - require.Equal(t, uint32(itemsToExport-maxQueueSize), bsp.dropped, "Dropped count should be equal to the number of items that exceeded the queue size") + counter, err := bsp.metrics.itemsDropped.GetMetricWith(prometheus.Labels{"processor": "processor"}) + require.NoError(t, err) + + metric := &dto.Metric{} + + err = counter.Write(metric) + require.NoError(t, err) + + require.Equal(t, float64(itemsToExport-maxQueueSize), *metric.Counter.Value, "Dropped count should be equal to the number of items that exceeded the queue size") } diff --git a/pkg/processor/metrics.go b/pkg/processor/metrics.go index 82ff8947..5ffc6582 100644 --- a/pkg/processor/metrics.go +++ b/pkg/processor/metrics.go @@ -1,16 +1,25 @@ package processor -import "github.com/prometheus/client_golang/prometheus" +import ( + "github.com/prometheus/client_golang/prometheus" +) var ( DefaultMetrics = NewMetrics("xatu") ) type Metrics struct { - itemsQueued *prometheus.GaugeVec - itemsDropped *prometheus.CounterVec - itemsFailed *prometheus.CounterVec - itemsExported *prometheus.CounterVec + itemsQueued *prometheus.GaugeVec + itemsDropped *prometheus.CounterVec + itemsFailed *prometheus.CounterVec + itemsExported *prometheus.CounterVec + exportDuration *prometheus.HistogramVec + itemsProcessed *prometheus.CounterVec + batchesProcessed *prometheus.CounterVec + processingDuration *prometheus.HistogramVec + queueWaitTime *prometheus.HistogramVec + batchSize *prometheus.HistogramVec + exportRetries *prometheus.CounterVec } func NewMetrics(namespace string) *Metrics { @@ -41,12 +50,59 @@ func NewMetrics(namespace string) *Metrics { Namespace: namespace, Help: "Number of items exported", }, []string{"processor"}), + exportDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "export_duration_seconds", + Namespace: namespace, + Help: "Duration of export operations in seconds", + Buckets: prometheus.LinearBuckets(1, 3, 10), + }, []string{"processor"}), + + itemsProcessed: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "items_processed_total", + Namespace: namespace, + Help: "Number of items processed", + }, []string{"processor"}), + batchesProcessed: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "batches_processed_total", + Namespace: namespace, + Help: "Number of batches processed", + }, []string{"processor"}), + processingDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "processing_duration_seconds", + Namespace: namespace, + Help: "Duration of processing operations in seconds", + Buckets: prometheus.LinearBuckets(0.1, 0.5, 10), + }, []string{"processor"}), + queueWaitTime: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "queue_wait_time_seconds", + Namespace: namespace, + Help: "Time items spend waiting in the queue in seconds", + Buckets: prometheus.LinearBuckets(0.1, 0.5, 10), + }, []string{"processor"}), + batchSize: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "batch_size", + Namespace: namespace, + Help: "Size of processed batches", + Buckets: prometheus.LinearBuckets(10, 50, 10), + }, []string{"processor"}), + exportRetries: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "export_retries_total", + Namespace: namespace, + Help: "Number of export retries", + }, []string{"processor"}), } prometheus.MustRegister(m.itemsQueued) prometheus.MustRegister(m.itemsDropped) prometheus.MustRegister(m.itemsFailed) prometheus.MustRegister(m.itemsExported) + prometheus.MustRegister(m.exportDuration) + prometheus.MustRegister(m.itemsProcessed) + prometheus.MustRegister(m.batchesProcessed) + prometheus.MustRegister(m.processingDuration) + prometheus.MustRegister(m.queueWaitTime) + prometheus.MustRegister(m.batchSize) + prometheus.MustRegister(m.exportRetries) return m } @@ -66,3 +122,27 @@ func (m *Metrics) IncItemsExportedBy(name string, count float64) { func (m *Metrics) IncItemsFailedBy(name string, count float64) { m.itemsFailed.WithLabelValues(name).Add(count) } + +func (m *Metrics) ObserveExportDuration(name string, duration float64) { + m.exportDuration.WithLabelValues(name).Observe(duration) +} + +func (m *Metrics) IncItemsProcessedBy(name string, count float64) { + m.itemsProcessed.WithLabelValues(name).Add(count) +} + +func (m *Metrics) IncBatchesProcessedBy(name string, count float64) { + m.batchesProcessed.WithLabelValues(name).Add(count) +} + +func (m *Metrics) ObserveProcessingDuration(name string, duration float64) { + m.processingDuration.WithLabelValues(name).Observe(duration) +} + +func (m *Metrics) ObserveQueueWaitTime(name string, duration float64) { + m.queueWaitTime.WithLabelValues(name).Observe(duration) +} + +func (m *Metrics) ObserveBatchSize(name string, size float64) { + m.batchSize.WithLabelValues(name).Observe(size) +} diff --git a/pkg/sentry/config.go b/pkg/sentry/config.go index 3ff516d9..cce19bd3 100644 --- a/pkg/sentry/config.go +++ b/pkg/sentry/config.go @@ -6,6 +6,7 @@ import ( "time" "github.com/ethpandaops/beacon/pkg/human" + "github.com/ethpandaops/xatu/pkg/observability" "github.com/ethpandaops/xatu/pkg/output" "github.com/ethpandaops/xatu/pkg/processor" "github.com/ethpandaops/xatu/pkg/sentry/ethereum" @@ -43,6 +44,9 @@ type Config struct { // ProposerDuty configuration ProposerDuty *ProposerDutyConfig `yaml:"proposerDuty" default:"{'enabled': true}"` + + // Tracing configuration + Tracing *observability.TracingConfig `yaml:"tracing"` } func (c *Config) Validate() error { @@ -64,6 +68,10 @@ func (c *Config) Validate() error { return fmt.Errorf("invalid forkChoice config: %w", err) } + if err := c.Tracing.Validate(); err != nil { + return fmt.Errorf("invalid tracing config: %w", err) + } + return nil } diff --git a/pkg/sentry/sentry.go b/pkg/sentry/sentry.go index 57b30447..e1b5c87c 100644 --- a/pkg/sentry/sentry.go +++ b/pkg/sentry/sentry.go @@ -20,6 +20,7 @@ import ( "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/beevik/ntp" "github.com/ethpandaops/xatu/pkg/networks" + "github.com/ethpandaops/xatu/pkg/observability" "github.com/ethpandaops/xatu/pkg/output" xatuethv1 "github.com/ethpandaops/xatu/pkg/proto/eth/v1" "github.com/ethpandaops/xatu/pkg/proto/xatu" @@ -29,8 +30,10 @@ import ( v2 "github.com/ethpandaops/xatu/pkg/sentry/event/beacon/eth/v2" "github.com/go-co-op/gocron" "github.com/google/uuid" + perrors "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/sdk/trace" ) type Sentry struct { @@ -54,6 +57,8 @@ type Sentry struct { latestForkChoice *v1.ForkChoice latestForkChoiceMu sync.RWMutex + + shutdownFuncs []func(context.Context) error } func New(ctx context.Context, log logrus.FieldLogger, config *Config) (*Sentry, error) { @@ -90,6 +95,7 @@ func New(ctx context.Context, log logrus.FieldLogger, config *Config) (*Sentry, scheduler: gocron.NewScheduler(time.Local), latestForkChoice: nil, latestForkChoiceMu: sync.RWMutex{}, + shutdownFuncs: []func(context.Context) error{}, }, nil } @@ -110,6 +116,35 @@ func (s *Sentry) Start(ctx context.Context) error { WithField("id", s.id.String()). Info("Starting Xatu in sentry mode") + // Start tracing if enabled + if s.Config.Tracing.Enabled { + s.log.Info("Tracing enabled") + + res, err := observability.NewResource(xatu.WithMode(xatu.ModeSentry), xatu.Short()) + if err != nil { + return perrors.Wrap(err, "failed to create tracing resource") + } + opts := []trace.TracerProviderOption{ + trace.WithSampler(trace.ParentBased(trace.TraceIDRatioBased(s.Config.Tracing.Sampling.Rate))), + } + + tracer, err := observability.NewHTTPTraceProvider(ctx, + res, + s.Config.Tracing.AsOTelOpts(), + opts..., + ) + if err != nil { + return perrors.Wrap(err, "failed to create tracing provider") + } + + shutdown, err := observability.SetupOTelSDK(ctx, tracer) + if err != nil { + return perrors.Wrap(err, "failed to setup tracing SDK") + } + + s.shutdownFuncs = append(s.shutdownFuncs, shutdown) + } + if err := s.startBeaconCommitteesWatcher(ctx); err != nil { return err } @@ -439,6 +474,12 @@ func (s *Sentry) Start(ctx context.Context) error { } } + for _, f := range s.shutdownFuncs { + if err := f(ctx); err != nil { + return err + } + } + return nil } diff --git a/pkg/server/config.go b/pkg/server/config.go index 224528b3..e8d5c4e4 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -1,6 +1,7 @@ package server import ( + "github.com/ethpandaops/xatu/pkg/observability" "github.com/ethpandaops/xatu/pkg/server/geoip" "github.com/ethpandaops/xatu/pkg/server/persistence" "github.com/ethpandaops/xatu/pkg/server/service" @@ -36,6 +37,9 @@ type Config struct { // Services is the list of services to run. Services service.Config `yaml:"services"` + + // Tracing configuration + Tracing observability.TracingConfig `yaml:"tracing"` } func (c *Config) Validate() error { @@ -55,5 +59,9 @@ func (c *Config) Validate() error { return err } + if err := c.Tracing.Validate(); err != nil { + return err + } + return nil } diff --git a/pkg/server/server.go b/pkg/server/server.go index 94ccec5b..9fd2df60 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -13,14 +13,18 @@ import ( _ "net/http/pprof" "github.com/beevik/ntp" + "github.com/ethpandaops/xatu/pkg/observability" + "github.com/ethpandaops/xatu/pkg/proto/xatu" "github.com/ethpandaops/xatu/pkg/server/geoip" "github.com/ethpandaops/xatu/pkg/server/persistence" "github.com/ethpandaops/xatu/pkg/server/service" "github.com/ethpandaops/xatu/pkg/server/store" "github.com/go-co-op/gocron" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/sdk/trace" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -43,6 +47,10 @@ type Xatu struct { geoipProvider geoip.Provider clockDrift *time.Duration + + tracer *trace.TracerProvider + + shutdownFuncs []func(ctx context.Context) error } func NewXatu(ctx context.Context, log logrus.FieldLogger, conf *Config) (*Xatu, error) { @@ -89,6 +97,7 @@ func NewXatu(ctx context.Context, log logrus.FieldLogger, conf *Config) (*Xatu, geoipProvider: g, services: services, clockDrift: &clockDrift, + shutdownFuncs: []func(ctx context.Context) error{}, }, nil } @@ -96,6 +105,36 @@ func (x *Xatu) Start(ctx context.Context) error { nctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) defer stop() + // Start tracing if enabled + if x.config.Tracing.Enabled { + x.log.Info("Tracing enabled") + + res, err := observability.NewResource(xatu.WithMode(xatu.ModeServer), xatu.Short()) + if err != nil { + return errors.Wrap(err, "failed to create tracing resource") + } + + opts := []trace.TracerProviderOption{ + trace.WithSampler(trace.ParentBased(trace.TraceIDRatioBased(x.config.Tracing.Sampling.Rate))), + } + + tracer, err := observability.NewHTTPTraceProvider(ctx, + res, + x.config.Tracing.AsOTelOpts(), + opts..., + ) + if err != nil { + return errors.Wrap(err, "failed to create tracing provider") + } + + shutdown, err := observability.SetupOTelSDK(ctx, tracer) + if err != nil { + return errors.Wrap(err, "failed to setup tracing SDK") + } + + x.shutdownFuncs = append(x.shutdownFuncs, shutdown) + } + if err := x.startCrons(ctx); err != nil { x.log.WithError(err).Fatal("Failed to start crons") } @@ -199,6 +238,12 @@ func (x *Xatu) stop(ctx context.Context) error { } } + for _, f := range x.shutdownFuncs { + if err := f(ctx); err != nil { + return err + } + } + if x.pprofServer != nil { if err := x.pprofServer.Shutdown(ctx); err != nil { return err diff --git a/pkg/server/service/event-ingester/event/event.go b/pkg/server/service/event-ingester/event/event.go index 2800ceb1..6faa3e71 100644 --- a/pkg/server/service/event-ingester/event/event.go +++ b/pkg/server/service/event-ingester/event/event.go @@ -87,14 +87,16 @@ func New(eventType Type, log logrus.FieldLogger, event *xatu.DecoratedEvent, cac } switch eventType { - case TypeBeaconP2PAttestation: - return v1.NewBeaconP2PAttestation(log, event, geoipProvider), nil + case TypeBeaconETHV1EventsAttestationV2: + return v1.NewEventsAttestationV2(log, event), nil case TypeLibP2PTraceGossipSubBeaconAttestation: return libp2p.NewTraceGossipSubBeaconAttestation(log, event), nil + case TypeBeaconETHV1BeaconValidators: + return v1.NewBeaconValidators(log, event), nil + case TypeBeaconP2PAttestation: + return v1.NewBeaconP2PAttestation(log, event, geoipProvider), nil case TypeBeaconETHV1EventsAttestation: return v1.NewEventsAttestation(log, event), nil - case TypeBeaconETHV1EventsAttestationV2: - return v1.NewEventsAttestationV2(log, event), nil case TypeBeaconETHV1EventsBlock: return v1.NewEventsBlock(log, event), nil case TypeBeaconETHV1EventsBlockV2: @@ -185,8 +187,6 @@ func New(eventType Type, log logrus.FieldLogger, event *xatu.DecoratedEvent, cac return libp2p.NewTraceGossipSubBeaconBlock(log, event), nil case TypeLibP2PTraceGossipSubBlobSidecar: return libp2p.NewTraceGossipSubBlobSidecar(log, event), nil - case TypeBeaconETHV1BeaconValidators: - return v1.NewBeaconValidators(log, event), nil default: return nil, fmt.Errorf("event type %s is unknown", eventType) } From f30f96f8be72080ce0c61fca4fbbbbfccc5ad033 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Thu, 27 Jun 2024 13:31:10 +1000 Subject: [PATCH 02/26] feat: Add observability context merging function --- deploy/local/docker-compose/tempo.yaml | 59 ++++++++++++++++++++++++++ pkg/observability/context.go | 34 +++++++++++++++ 2 files changed, 93 insertions(+) create mode 100644 deploy/local/docker-compose/tempo.yaml create mode 100644 pkg/observability/context.go diff --git a/deploy/local/docker-compose/tempo.yaml b/deploy/local/docker-compose/tempo.yaml new file mode 100644 index 00000000..30718cdc --- /dev/null +++ b/deploy/local/docker-compose/tempo.yaml @@ -0,0 +1,59 @@ +stream_over_http_enabled: true +server: + http_listen_port: 3200 + log_level: info + +query_frontend: + search: + duration_slo: 5s + throughput_bytes_slo: 1.073741824e+09 + trace_by_id: + duration_slo: 5s + +distributor: + receivers: # this configuration will listen on all ports and protocols that tempo is capable of. + jaeger: # the receives all come from the OpenTelemetry collector. more configuration information can + protocols: # be found there: https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver + thrift_http: # + grpc: # for a production deployment you should only enable the receivers you need! + thrift_binary: + thrift_compact: + zipkin: + otlp: + protocols: + http: + grpc: + opencensus: + +ingester: + max_block_duration: 5m # cut the headblock when this much time passes. this is being set for demo purposes and should probably be left alone normally + +compactor: + compaction: + block_retention: 15m # overall Tempo trace retention. set for demo purposes + +metrics_generator: + registry: + external_labels: + source: tempo + cluster: docker-compose + storage: + path: /var/tempo/generator/wal + remote_write: + - url: http://prometheus:9090/api/v1/write + send_exemplars: true + traces_storage: + path: /var/tempo/generator/traces + +storage: + trace: + backend: local # backend configuration to use + wal: + path: /var/tempo/wal # where to store the wal locally + local: + path: /var/tempo/blocks + +overrides: + defaults: + metrics_generator: + processors: [service-graphs, span-metrics, local-blocks] # enables metrics generator \ No newline at end of file diff --git a/pkg/observability/context.go b/pkg/observability/context.go new file mode 100644 index 00000000..0b858c1e --- /dev/null +++ b/pkg/observability/context.go @@ -0,0 +1,34 @@ +package observability + +import ( + "context" + + "go.opentelemetry.io/otel/trace" +) + +func MergeContexts(parent context.Context, contexts ...context.Context) context.Context { + spanContexts := make([]trace.SpanContext, 0, len(contexts)) + + for _, ctx := range contexts { + if spanContext := trace.SpanContextFromContext(ctx); spanContext.IsValid() { + spanContexts = append(spanContexts, spanContext) + } + } + + if len(spanContexts) == 0 { + return parent + } + + // Create a new span that links to all the original spans + tracer := Tracer() + + links := make([]trace.Link, len(spanContexts)) + for i, spanContext := range spanContexts { + links[i] = trace.Link{SpanContext: spanContext} + } + + ctx, span := tracer.Start(parent, "Observability.MergedSpan", trace.WithLinks(links...)) + defer span.End() + + return ctx +} From b5eaf32dc0edbf1a1a4fbf934d4dcf3f8395c9a0 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Thu, 27 Jun 2024 14:35:45 +1000 Subject: [PATCH 03/26] feat: Add metrics and attributes to batch item processor --- pkg/processor/batch.go | 40 ++++++++++++++++----- pkg/processor/batch_test.go | 15 ++++++-- pkg/processor/metrics.go | 72 ++++++++++++++++++++++++++++++++++--- 3 files changed, 111 insertions(+), 16 deletions(-) diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index 1e5ad380..c6a808d4 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -6,11 +6,11 @@ import ( "fmt" "runtime" "sync" - "sync/atomic" "time" "github.com/ethpandaops/xatu/pkg/observability" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" ) // ItemExporter is an interface for exporting items. @@ -106,22 +106,25 @@ type BatchItemProcessor[T any] struct { queue chan traceableItem[T] batchCh chan []traceableItem[T] - dropped uint32 name string - metrics *Metrics - timer *time.Timer stopWait sync.WaitGroup stopOnce sync.Once stopCh chan struct{} stopWorkersCh chan struct{} + + // Metrics + metrics *Metrics } type traceableItem[T any] struct { item *T errCh chan error completedCh chan struct{} + queuedAt time.Time + //nolint:containedctx // we need to pass the context to the workers + ctx context.Context } // NewBatchItemProcessor creates a new batch item processor. @@ -202,11 +205,12 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log // configured to use the async shipping method, the items will be written to // the queue and this function will return immediately. func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { - _, span := observability.Tracer().Start(ctx, "BatchItemProcessor.Write") - defer span.End() - bvp.metrics.SetItemsQueued(bvp.name, float64(len(bvp.queue))) + defer func() { + bvp.metrics.SetItemsQueued(bvp.name, float64(len(bvp.queue))) + }() + if bvp.e == nil { return errors.New("exporter is nil") } @@ -226,7 +230,9 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { for _, i := range s[start:end] { item := traceableItem[T]{ - item: i, + item: i, + queuedAt: time.Now(), + ctx: ctx, } if bvp.o.ShippingMethod == ShippingMethodSync { @@ -264,6 +270,12 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { // exportWithTimeout exports items with a timeout. func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBatch []traceableItem[T]) error { + _, span := observability.Tracer().Start(ctx, "BatchItemProcessor.exportWithTimeout") + defer span.End() + + span.SetAttributes(attribute.String("processor", bvp.name)) + span.SetAttributes(attribute.Int("batch_size", len(itemsBatch))) + if bvp.o.ExportTimeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, bvp.o.ExportTimeout) @@ -276,14 +288,25 @@ func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBa items[i] = item.item } + startTime := time.Now() + err := bvp.e.ExportItems(ctx, items) + + duration := time.Since(startTime) + + bvp.metrics.ObserveExportDuration(bvp.name, duration.Seconds()) + if err != nil { bvp.metrics.IncItemsFailedBy(bvp.name, float64(len(itemsBatch))) } else { bvp.metrics.IncItemsExportedBy(bvp.name, float64(len(itemsBatch))) + bvp.metrics.ObserveBatchSize(bvp.name, float64(len(itemsBatch))) } for _, item := range itemsBatch { + waitTime := startTime.Sub(item.queuedAt) + bvp.metrics.ObserveQueueWaitTime(bvp.name, waitTime.Seconds()) + if item.errCh != nil { item.errCh <- err close(item.errCh) @@ -479,7 +502,6 @@ func (bvp *BatchItemProcessor[T]) enqueueOrDrop(ctx context.Context, item tracea case bvp.queue <- item: return nil default: - atomic.AddUint32(&bvp.dropped, 1) bvp.metrics.IncItemsDroppedBy(bvp.name, float64(1)) } diff --git a/pkg/processor/batch_test.go b/pkg/processor/batch_test.go index 82e49faa..f1f264d9 100644 --- a/pkg/processor/batch_test.go +++ b/pkg/processor/batch_test.go @@ -16,6 +16,8 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -673,7 +675,7 @@ func TestBatchItemProcessorQueueSize(t *testing.T) { // Write items to the processor for i := 0; i < itemsToExport; i++ { - err := bsp.Write(context.Background(), []*TestItem{items[i]}) + err = bsp.Write(context.Background(), []*TestItem{items[i]}) if i < maxQueueSize { require.NoError(t, err, "Expected no error for item %d", i) } else { @@ -683,6 +685,15 @@ func TestBatchItemProcessorQueueSize(t *testing.T) { // Ensure that the queue size is respected require.Equal(t, maxQueueSize, len(bsp.queue), "Queue size should be equal to maxQueueSize") + // Ensure that the dropped count is correct - require.Equal(t, uint32(itemsToExport-maxQueueSize), bsp.dropped, "Dropped count should be equal to the number of items that exceeded the queue size") + counter, err := bsp.metrics.itemsDropped.GetMetricWith(prometheus.Labels{"processor": "processor"}) + require.NoError(t, err) + + metric := &dto.Metric{} + + err = counter.Write(metric) + require.NoError(t, err) + + require.Equal(t, float64(itemsToExport-maxQueueSize), *metric.Counter.Value, "Dropped count should be equal to the number of items that exceeded the queue size") } diff --git a/pkg/processor/metrics.go b/pkg/processor/metrics.go index 82ff8947..ee42c902 100644 --- a/pkg/processor/metrics.go +++ b/pkg/processor/metrics.go @@ -1,16 +1,24 @@ package processor -import "github.com/prometheus/client_golang/prometheus" +import ( + "github.com/prometheus/client_golang/prometheus" +) var ( DefaultMetrics = NewMetrics("xatu") ) type Metrics struct { - itemsQueued *prometheus.GaugeVec - itemsDropped *prometheus.CounterVec - itemsFailed *prometheus.CounterVec - itemsExported *prometheus.CounterVec + itemsQueued *prometheus.GaugeVec + itemsDropped *prometheus.CounterVec + itemsFailed *prometheus.CounterVec + itemsExported *prometheus.CounterVec + exportDuration *prometheus.HistogramVec + itemsProcessed *prometheus.CounterVec + batchesProcessed *prometheus.CounterVec + processingDuration *prometheus.HistogramVec + queueWaitTime *prometheus.HistogramVec + batchSize *prometheus.HistogramVec } func NewMetrics(namespace string) *Metrics { @@ -41,12 +49,46 @@ func NewMetrics(namespace string) *Metrics { Namespace: namespace, Help: "Number of items exported", }, []string{"processor"}), + exportDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "export_duration_seconds", + Namespace: namespace, + Help: "Duration of export operations in seconds", + Buckets: prometheus.ExponentialBuckets(0.1, 2, 10), + }, []string{"processor"}), + + itemsProcessed: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "items_processed_total", + Namespace: namespace, + Help: "Number of items processed", + }, []string{"processor"}), + batchesProcessed: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "batches_processed_total", + Namespace: namespace, + Help: "Number of batches processed", + }, []string{"processor"}), + queueWaitTime: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "queue_wait_time_seconds", + Namespace: namespace, + Help: "Time items spend waiting in the queue in seconds", + Buckets: prometheus.ExponentialBuckets(0.1, 2, 10), + }, []string{"processor"}), + batchSize: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "batch_size", + Namespace: namespace, + Help: "Size of processed batches", + Buckets: prometheus.ExponentialBuckets(0.1, 2, 10), + }, []string{"processor"}), } prometheus.MustRegister(m.itemsQueued) prometheus.MustRegister(m.itemsDropped) prometheus.MustRegister(m.itemsFailed) prometheus.MustRegister(m.itemsExported) + prometheus.MustRegister(m.exportDuration) + prometheus.MustRegister(m.itemsProcessed) + prometheus.MustRegister(m.batchesProcessed) + prometheus.MustRegister(m.queueWaitTime) + prometheus.MustRegister(m.batchSize) return m } @@ -66,3 +108,23 @@ func (m *Metrics) IncItemsExportedBy(name string, count float64) { func (m *Metrics) IncItemsFailedBy(name string, count float64) { m.itemsFailed.WithLabelValues(name).Add(count) } + +func (m *Metrics) ObserveExportDuration(name string, duration float64) { + m.exportDuration.WithLabelValues(name).Observe(duration) +} + +func (m *Metrics) IncItemsProcessedBy(name string, count float64) { + m.itemsProcessed.WithLabelValues(name).Add(count) +} + +func (m *Metrics) IncBatchesProcessedBy(name string, count float64) { + m.batchesProcessed.WithLabelValues(name).Add(count) +} + +func (m *Metrics) ObserveQueueWaitTime(name string, duration float64) { + m.queueWaitTime.WithLabelValues(name).Observe(duration) +} + +func (m *Metrics) ObserveBatchSize(name string, size float64) { + m.batchSize.WithLabelValues(name).Observe(size) +} From ad7fffe5cda6d9bd9e58217572d8e933c5f6d08e Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Thu, 27 Jun 2024 14:39:00 +1000 Subject: [PATCH 04/26] refactor: Remove unused imports and fields --- pkg/processor/batch.go | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index c6a808d4..17f068db 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -8,9 +8,7 @@ import ( "sync" "time" - "github.com/ethpandaops/xatu/pkg/observability" "github.com/sirupsen/logrus" - "go.opentelemetry.io/otel/attribute" ) // ItemExporter is an interface for exporting items. @@ -123,8 +121,6 @@ type traceableItem[T any] struct { errCh chan error completedCh chan struct{} queuedAt time.Time - //nolint:containedctx // we need to pass the context to the workers - ctx context.Context } // NewBatchItemProcessor creates a new batch item processor. @@ -232,7 +228,6 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { item := traceableItem[T]{ item: i, queuedAt: time.Now(), - ctx: ctx, } if bvp.o.ShippingMethod == ShippingMethodSync { @@ -270,12 +265,6 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { // exportWithTimeout exports items with a timeout. func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBatch []traceableItem[T]) error { - _, span := observability.Tracer().Start(ctx, "BatchItemProcessor.exportWithTimeout") - defer span.End() - - span.SetAttributes(attribute.String("processor", bvp.name)) - span.SetAttributes(attribute.Int("batch_size", len(itemsBatch))) - if bvp.o.ExportTimeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, bvp.o.ExportTimeout) From c5cd69c54805694f1012bd5af15d7631a5db6ef0 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Thu, 27 Jun 2024 16:06:18 +1000 Subject: [PATCH 05/26] refactor: Add observability to batch item processor --- pkg/processor/batch.go | 54 +++++++++++++++++++++------------------- pkg/processor/metrics.go | 50 ++++++------------------------------- 2 files changed, 35 insertions(+), 69 deletions(-) diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index 17f068db..851f053d 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/ethpandaops/xatu/pkg/observability" "github.com/sirupsen/logrus" ) @@ -120,7 +121,6 @@ type traceableItem[T any] struct { item *T errCh chan error completedCh chan struct{} - queuedAt time.Time } // NewBatchItemProcessor creates a new batch item processor. @@ -201,11 +201,8 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log // configured to use the async shipping method, the items will be written to // the queue and this function will return immediately. func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { - bvp.metrics.SetItemsQueued(bvp.name, float64(len(bvp.queue))) - - defer func() { - bvp.metrics.SetItemsQueued(bvp.name, float64(len(bvp.queue))) - }() + _, span := observability.Tracer().Start(ctx, "BatchItemProcessor.Write") + defer span.End() if bvp.e == nil { return errors.New("exporter is nil") @@ -226,8 +223,7 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { for _, i := range s[start:end] { item := traceableItem[T]{ - item: i, - queuedAt: time.Now(), + item: i, } if bvp.o.ShippingMethod == ShippingMethodSync { @@ -245,17 +241,8 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { } if bvp.o.ShippingMethod == ShippingMethodSync { - for _, item := range prepared { - select { - case err := <-item.errCh: - if err != nil { - return err - } - case <-item.completedCh: - continue - case <-ctx.Done(): - return ctx.Err() - } + if err := bvp.waitForBatchCompletion(ctx, prepared); err != nil { + return err } } } @@ -293,9 +280,6 @@ func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBa } for _, item := range itemsBatch { - waitTime := startTime.Sub(item.queuedAt) - bvp.metrics.ObserveQueueWaitTime(bvp.name, waitTime.Seconds()) - if item.errCh != nil { item.errCh <- err close(item.errCh) @@ -383,6 +367,23 @@ func WithWorkers(workers int) BatchItemProcessorOption { } } +func (bvp *BatchItemProcessor[T]) waitForBatchCompletion(ctx context.Context, items []traceableItem[T]) error { + for _, item := range items { + select { + case err := <-item.errCh: + if err != nil { + return err + } + case <-item.completedCh: + continue + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil +} + func (bvp *BatchItemProcessor[T]) batchBuilder(ctx context.Context) { log := bvp.log.WithField("module", "batch_builder") @@ -416,12 +417,9 @@ func (bvp *BatchItemProcessor[T]) sendBatch(batch []traceableItem[T], reason str log := bvp.log.WithField("reason", reason) log.Tracef("Creating a batch of %d items", len(batch)) - batchCopy := make([]traceableItem[T], len(batch)) - copy(batchCopy, batch) - log.Tracef("Batch items copied") - bvp.batchCh <- batchCopy + bvp.batchCh <- batch log.Tracef("Batch sent to batch channel") } @@ -441,6 +439,8 @@ func (bvp *BatchItemProcessor[T]) worker(ctx context.Context, number int) { if err := bvp.exportWithTimeout(ctx, batch); err != nil { bvp.log.WithError(err).Error("failed to export items") } + + bvp.metrics.SetItemsQueued(bvp.name, float64(len(bvp.queue))) } } } @@ -489,6 +489,8 @@ func (bvp *BatchItemProcessor[T]) enqueueOrDrop(ctx context.Context, item tracea select { case bvp.queue <- item: + bvp.metrics.SetItemsQueued(bvp.name, float64(len(bvp.queue))) + return nil default: bvp.metrics.IncItemsDroppedBy(bvp.name, float64(1)) diff --git a/pkg/processor/metrics.go b/pkg/processor/metrics.go index ee42c902..d6188bd8 100644 --- a/pkg/processor/metrics.go +++ b/pkg/processor/metrics.go @@ -9,16 +9,12 @@ var ( ) type Metrics struct { - itemsQueued *prometheus.GaugeVec - itemsDropped *prometheus.CounterVec - itemsFailed *prometheus.CounterVec - itemsExported *prometheus.CounterVec - exportDuration *prometheus.HistogramVec - itemsProcessed *prometheus.CounterVec - batchesProcessed *prometheus.CounterVec - processingDuration *prometheus.HistogramVec - queueWaitTime *prometheus.HistogramVec - batchSize *prometheus.HistogramVec + itemsQueued *prometheus.GaugeVec + itemsDropped *prometheus.CounterVec + itemsFailed *prometheus.CounterVec + itemsExported *prometheus.CounterVec + exportDuration *prometheus.HistogramVec + batchSize *prometheus.HistogramVec } func NewMetrics(namespace string) *Metrics { @@ -55,28 +51,11 @@ func NewMetrics(namespace string) *Metrics { Help: "Duration of export operations in seconds", Buckets: prometheus.ExponentialBuckets(0.1, 2, 10), }, []string{"processor"}), - - itemsProcessed: prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "items_processed_total", - Namespace: namespace, - Help: "Number of items processed", - }, []string{"processor"}), - batchesProcessed: prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "batches_processed_total", - Namespace: namespace, - Help: "Number of batches processed", - }, []string{"processor"}), - queueWaitTime: prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: "queue_wait_time_seconds", - Namespace: namespace, - Help: "Time items spend waiting in the queue in seconds", - Buckets: prometheus.ExponentialBuckets(0.1, 2, 10), - }, []string{"processor"}), batchSize: prometheus.NewHistogramVec(prometheus.HistogramOpts{ Name: "batch_size", Namespace: namespace, Help: "Size of processed batches", - Buckets: prometheus.ExponentialBuckets(0.1, 2, 10), + Buckets: prometheus.ExponentialBucketsRange(1, 50000, 10), }, []string{"processor"}), } @@ -85,9 +64,6 @@ func NewMetrics(namespace string) *Metrics { prometheus.MustRegister(m.itemsFailed) prometheus.MustRegister(m.itemsExported) prometheus.MustRegister(m.exportDuration) - prometheus.MustRegister(m.itemsProcessed) - prometheus.MustRegister(m.batchesProcessed) - prometheus.MustRegister(m.queueWaitTime) prometheus.MustRegister(m.batchSize) return m @@ -113,18 +89,6 @@ func (m *Metrics) ObserveExportDuration(name string, duration float64) { m.exportDuration.WithLabelValues(name).Observe(duration) } -func (m *Metrics) IncItemsProcessedBy(name string, count float64) { - m.itemsProcessed.WithLabelValues(name).Add(count) -} - -func (m *Metrics) IncBatchesProcessedBy(name string, count float64) { - m.batchesProcessed.WithLabelValues(name).Add(count) -} - -func (m *Metrics) ObserveQueueWaitTime(name string, duration float64) { - m.queueWaitTime.WithLabelValues(name).Observe(duration) -} - func (m *Metrics) ObserveBatchSize(name string, size float64) { m.batchSize.WithLabelValues(name).Observe(size) } From 99a2ccb603d3455883b9e491216da91332c641d6 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Thu, 27 Jun 2024 16:27:13 +1000 Subject: [PATCH 06/26] refactor: Remove redundant code and optimize code structure --- pkg/output/http/http.go | 2 ++ pkg/output/kafka/kafka.go | 2 ++ pkg/output/stdout/stdout.go | 2 ++ pkg/output/xatu/xatu.go | 2 ++ pkg/processor/batch.go | 27 ++++++++++----------------- 5 files changed, 18 insertions(+), 17 deletions(-) diff --git a/pkg/output/http/http.go b/pkg/output/http/http.go index 0b40fc33..ed8473db 100644 --- a/pkg/output/http/http.go +++ b/pkg/output/http/http.go @@ -66,6 +66,8 @@ func (h *HTTP) Type() string { } func (h *HTTP) Start(ctx context.Context) error { + h.proc.Start(ctx) + return nil } diff --git a/pkg/output/kafka/kafka.go b/pkg/output/kafka/kafka.go index 7db4becb..fb2a0907 100644 --- a/pkg/output/kafka/kafka.go +++ b/pkg/output/kafka/kafka.go @@ -66,6 +66,8 @@ func (h *Kafka) Type() string { } func (h *Kafka) Start(ctx context.Context) error { + h.proc.Start(ctx) + return nil } diff --git a/pkg/output/stdout/stdout.go b/pkg/output/stdout/stdout.go index b1c2e376..7db813ad 100644 --- a/pkg/output/stdout/stdout.go +++ b/pkg/output/stdout/stdout.go @@ -66,6 +66,8 @@ func (h *StdOut) Type() string { } func (h *StdOut) Start(ctx context.Context) error { + h.proc.Start(ctx) + return nil } diff --git a/pkg/output/xatu/xatu.go b/pkg/output/xatu/xatu.go index 6c6dd801..20a0628f 100644 --- a/pkg/output/xatu/xatu.go +++ b/pkg/output/xatu/xatu.go @@ -70,6 +70,8 @@ func (h *Xatu) Name() string { } func (h *Xatu) Start(ctx context.Context) error { + h.proc.Start(ctx) + return nil } diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index c55082d3..b8f9b481 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -122,7 +122,6 @@ type traceableItem[T any] struct { item *T errCh chan error completedCh chan struct{} - queuedAt time.Time //nolint:containedctx // we need to pass the context to the workers ctx context.Context } @@ -182,21 +181,23 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log }, ).Info("Batch item processor initialized") - bvp.stopWait.Add(o.Workers) + return &bvp, nil +} + +func (bvp *BatchItemProcessor[T]) Start(ctx context.Context) { + bvp.stopWait.Add(bvp.o.Workers) - for i := 0; i < o.Workers; i++ { + for i := 0; i < bvp.o.Workers; i++ { go func(num int) { defer bvp.stopWait.Done() - bvp.worker(context.Background(), num) + bvp.worker(ctx, num) }(i) } go func() { - bvp.batchBuilder(context.Background()) + bvp.batchBuilder(ctx) bvp.log.Info("Batch builder exited") }() - - return &bvp, nil } // Write writes items to the queue. If the Processor is configured to use @@ -227,9 +228,8 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { for _, i := range s[start:end] { item := traceableItem[T]{ - item: i, - queuedAt: time.Now(), - ctx: ctx, + item: i, + ctx: ctx, } if bvp.o.ShippingMethod == ShippingMethodSync { @@ -258,13 +258,6 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { // exportWithTimeout exports items with a timeout. func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBatch []traceableItem[T]) error { - contexts := make([]context.Context, len(itemsBatch)) - for i, item := range itemsBatch { - contexts[i] = item.ctx - } - - ctx = observability.MergeContexts(ctx, contexts...) - _, span := observability.Tracer().Start(ctx, "BatchItemProcessor.exportWithTimeout") defer span.End() From a833e32fba336dabd1c53b54878f4ce7d943485d Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Thu, 27 Jun 2024 17:32:05 +1000 Subject: [PATCH 07/26] feat: Add metrics tracking to batch processor --- pkg/processor/batch.go | 48 +++++++++++++++++++----------- pkg/processor/batch_test.go | 59 +++++++++++++++++++++++++++++++++++-- 2 files changed, 86 insertions(+), 21 deletions(-) diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index b8f9b481..815ef414 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -79,6 +79,8 @@ type BatchItemProcessorOptions struct { // Workers is the number of workers to process batches. // The default value of Workers is 5. Workers int + // Metrics is the metrics instance to use. + Metrics *Metrics } func (o *BatchItemProcessorOptions) Validate() error { @@ -104,8 +106,8 @@ type BatchItemProcessor[T any] struct { log logrus.FieldLogger - queue chan traceableItem[T] - batchCh chan []traceableItem[T] + queue chan *TraceableItem[T] + batchCh chan []*TraceableItem[T] name string timer *time.Timer @@ -118,12 +120,10 @@ type BatchItemProcessor[T any] struct { metrics *Metrics } -type traceableItem[T any] struct { +type TraceableItem[T any] struct { item *T errCh chan error completedCh chan struct{} - //nolint:containedctx // we need to pass the context to the workers - ctx context.Context } // NewBatchItemProcessor creates a new batch item processor. @@ -155,7 +155,10 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log return nil, fmt.Errorf("invalid batch item processor options: %w: %s", err, name) } - metrics := DefaultMetrics + metrics := o.Metrics + if metrics == nil { + metrics = DefaultMetrics + } bvp := BatchItemProcessor[T]{ e: exporter, @@ -164,8 +167,8 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log name: name, metrics: metrics, timer: time.NewTimer(o.BatchTimeout), - queue: make(chan traceableItem[T], o.MaxQueueSize), - batchCh: make(chan []traceableItem[T], o.Workers), + queue: make(chan *TraceableItem[T], o.MaxQueueSize), + batchCh: make(chan []*TraceableItem[T], o.Workers), stopCh: make(chan struct{}), stopWorkersCh: make(chan struct{}), } @@ -224,12 +227,11 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { end = len(s) } - prepared := []traceableItem[T]{} + prepared := []*TraceableItem[T]{} for _, i := range s[start:end] { - item := traceableItem[T]{ + item := &TraceableItem[T]{ item: i, - ctx: ctx, } if bvp.o.ShippingMethod == ShippingMethodSync { @@ -257,7 +259,11 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { } // exportWithTimeout exports items with a timeout. -func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBatch []traceableItem[T]) error { +func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBatch []*TraceableItem[T]) error { + if len(itemsBatch) == 0 { + return nil + } + _, span := observability.Tracer().Start(ctx, "BatchItemProcessor.exportWithTimeout") defer span.End() @@ -379,7 +385,13 @@ func WithWorkers(workers int) BatchItemProcessorOption { } } -func (bvp *BatchItemProcessor[T]) waitForBatchCompletion(ctx context.Context, items []traceableItem[T]) error { +func WithMetrics(metrics *Metrics) BatchItemProcessorOption { + return func(o *BatchItemProcessorOptions) { + o.Metrics = metrics + } +} + +func (bvp *BatchItemProcessor[T]) waitForBatchCompletion(ctx context.Context, items []*TraceableItem[T]) error { for _, item := range items { select { case err := <-item.errCh: @@ -399,7 +411,7 @@ func (bvp *BatchItemProcessor[T]) waitForBatchCompletion(ctx context.Context, it func (bvp *BatchItemProcessor[T]) batchBuilder(ctx context.Context) { log := bvp.log.WithField("module", "batch_builder") - var batch []traceableItem[T] + var batch []*TraceableItem[T] for { select { @@ -413,19 +425,19 @@ func (bvp *BatchItemProcessor[T]) batchBuilder(ctx context.Context) { if len(batch) >= bvp.o.MaxExportBatchSize { bvp.sendBatch(batch, "max_export_batch_size") - batch = []traceableItem[T]{} + batch = []*TraceableItem[T]{} } case <-bvp.timer.C: if len(batch) > 0 { bvp.sendBatch(batch, "timer") - batch = []traceableItem[T]{} + batch = []*TraceableItem[T]{} } else { bvp.timer.Reset(bvp.o.BatchTimeout) } } } } -func (bvp *BatchItemProcessor[T]) sendBatch(batch []traceableItem[T], reason string) { +func (bvp *BatchItemProcessor[T]) sendBatch(batch []*TraceableItem[T], reason string) { log := bvp.log.WithField("reason", reason) log.Tracef("Creating a batch of %d items", len(batch)) @@ -488,7 +500,7 @@ func recoverSendOnClosedChan() { panic(x) } -func (bvp *BatchItemProcessor[T]) enqueueOrDrop(ctx context.Context, item traceableItem[T]) error { +func (bvp *BatchItemProcessor[T]) enqueueOrDrop(ctx context.Context, item *TraceableItem[T]) error { // This ensures the bvp.queue<- below does not panic as the // processor shuts down. defer recoverSendOnClosedChan() diff --git a/pkg/processor/batch_test.go b/pkg/processor/batch_test.go index f1f264d9..3c7e35c4 100644 --- a/pkg/processor/batch_test.go +++ b/pkg/processor/batch_test.go @@ -108,6 +108,8 @@ func TestNewBatchItemProcessorWithNilExporter(t *testing.T) { bsp, err := NewBatchItemProcessor[TestItem](nil, "processor", nullLogger()) require.NoError(t, err) + bsp.Start(context.Background()) + err = bsp.Write(context.Background(), []*TestItem{{ name: "test", }}) @@ -237,6 +239,8 @@ func TestBatchItemProcessorExportTimeout(t *testing.T) { ) require.NoError(t, err) + bvp.Start(context.Background()) + if err := bvp.Write(context.Background(), []*TestItem{{ name: "test", }}); err != nil { @@ -251,7 +255,14 @@ func TestBatchItemProcessorExportTimeout(t *testing.T) { } func createAndRegisterBatchSP[T TestItem](options []BatchItemProcessorOption, te *testBatchExporter[T]) (*BatchItemProcessor[T], error) { - return NewBatchItemProcessor[T](te, "processor", nullLogger(), options...) + bvp, err := NewBatchItemProcessor[T](te, "processor", nullLogger(), options...) + if err != nil { + return nil, err + } + + bvp.Start(context.Background()) + + return bvp, nil } func TestBatchItemProcessorShutdown(t *testing.T) { @@ -259,6 +270,8 @@ func TestBatchItemProcessorShutdown(t *testing.T) { bvp, err := NewBatchItemProcessor[TestItem](&bp, "processor", nullLogger()) require.NoError(t, err) + bvp.Start(context.Background()) + err = bvp.Shutdown(context.Background()) if err != nil { t.Error("Error shutting the BatchItemProcessor down\n") @@ -281,6 +294,8 @@ func TestBatchItemProcessorDrainQueue(t *testing.T) { bsp, err := NewBatchItemProcessor[TestItem](&be, "processor", log, WithMaxExportBatchSize(5), WithBatchTimeout(1*time.Second), WithWorkers(2), WithShippingMethod(ShippingMethodAsync)) require.NoError(t, err) + bsp.Start(context.Background()) + itemsToExport := 5000 for i := 0; i < itemsToExport; i++ { @@ -301,6 +316,8 @@ func TestBatchItemProcessorPostShutdown(t *testing.T) { bsp, err := NewBatchItemProcessor[TestItem](&be, "processor", nullLogger(), WithMaxExportBatchSize(50), WithBatchTimeout(5*time.Millisecond)) require.NoError(t, err) + bsp.Start(context.Background()) + for i := 0; i < 60; i++ { if err := bsp.Write(context.Background(), []*TestItem{{ name: strconv.Itoa(i), @@ -343,6 +360,8 @@ func TestMultipleWorkersConsumeConcurrently(t *testing.T) { bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(10), WithBatchTimeout(5*time.Minute), WithWorkers(20)) require.NoError(t, err) + bsp.Start(context.Background()) + itemsToExport := 100 for i := 0; i < itemsToExport; i++ { @@ -363,6 +382,8 @@ func TestWorkersProcessBatches(t *testing.T) { bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(10), WithWorkers(5)) require.NoError(t, err) + bsp.Start(context.Background()) + itemsToExport := 50 for i := 0; i < itemsToExport; i++ { @@ -387,6 +408,8 @@ func TestDrainQueueWithMultipleWorkers(t *testing.T) { bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(10), WithWorkers(5)) require.NoError(t, err) + bsp.Start(context.Background()) + itemsToExport := 100 for i := 0; i < itemsToExport; i++ { @@ -410,6 +433,8 @@ func TestBatchItemProcessorTimerFunctionality(t *testing.T) { bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(50), WithBatchTimeout(batchTimeout), WithWorkers(5)) require.NoError(t, err) + bsp.Start(context.Background()) + // Add items less than the max batch size itemsToExport := 25 @@ -453,6 +478,8 @@ func TestBatchItemProcessorTimeout(t *testing.T) { t.Fatalf("failed to create batch processor: %v", err) } + bsp.Start(context.Background()) + if got, want := bsp.Write(ctx, []*TestItem{{}}), context.DeadlineExceeded; !errors.Is(got, want) { t.Errorf("expected %q error, got %v", want, got) } @@ -472,6 +499,8 @@ func TestBatchItemProcessorCancellation(t *testing.T) { t.Fatalf("failed to create batch processor: %v", err) } + bsp.Start(context.Background()) + if got, want := bsp.Write(ctx, []*TestItem{{}}), context.Canceled; !errors.Is(got, want) { t.Errorf("expected %q error, got %v", want, got) } @@ -500,6 +529,8 @@ func TestBatchItemProcessorWithSyncErrorExporter(t *testing.T) { t.Fatalf("failed to create batch processor: %v", err) } + bsp.Start(context.Background()) + err = bsp.Write(context.Background(), []*TestItem{{name: "test"}}) if err == nil { t.Errorf("Expected write to fail") @@ -528,6 +559,8 @@ func TestBatchItemProcessorSyncShipping(t *testing.T) { ) require.NoError(t, err) + bsp.Start(context.Background()) + items := make([]*TestItem, itemsToExport) for i := 0; i < itemsToExport; i++ { items[i] = &TestItem{name: strconv.Itoa(i)} @@ -566,6 +599,8 @@ func TestBatchItemProcessorExportCancellationOnFailure(t *testing.T) { bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(maxBatchSize), WithWorkers(workers), WithShippingMethod(ShippingMethodSync)) require.NoError(t, err) + bsp.Start(context.Background()) + items := make([]*TestItem, itemsToExport) for i := 0; i < itemsToExport; i++ { items[i] = &TestItem{name: strconv.Itoa(i)} @@ -627,6 +662,8 @@ func TestBatchItemProcessorExportWithTimeout(t *testing.T) { bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(10), WithWorkers(5), WithExportTimeout(1*time.Second), WithShippingMethod(ShippingMethodSync)) require.NoError(t, err) + bsp.Start(context.Background()) + itemsToExport := 10 items := make([]*TestItem, itemsToExport) @@ -644,6 +681,8 @@ func TestBatchItemProcessorWithBatchTimeout(t *testing.T) { bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(10), WithWorkers(5), WithBatchTimeout(1*time.Second)) require.NoError(t, err) + bsp.Start(context.Background()) + itemsToExport := 5 items := make([]*TestItem, itemsToExport) @@ -662,11 +701,25 @@ func TestBatchItemProcessorWithBatchTimeout(t *testing.T) { func TestBatchItemProcessorQueueSize(t *testing.T) { te := indefiniteExporter[TestItem]{} + + metrics := NewMetrics("test") maxQueueSize := 5 - bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithBatchTimeout(10*time.Minute), WithMaxQueueSize(maxQueueSize), WithMaxExportBatchSize(maxQueueSize), WithWorkers(1), WithShippingMethod(ShippingMethodAsync)) + bsp, err := NewBatchItemProcessor[TestItem]( + &te, + "processor", + nullLogger(), + WithBatchTimeout(10*time.Minute), + WithMaxQueueSize(maxQueueSize), + WithMaxExportBatchSize(maxQueueSize), + WithWorkers(1), + WithShippingMethod(ShippingMethodSync), + WithMetrics(metrics), + ) require.NoError(t, err) - itemsToExport := 10 + bsp.Start(context.Background()) + + itemsToExport := 5 items := make([]*TestItem, itemsToExport) for i := 0; i < itemsToExport; i++ { From 1b861833b80f5a562d6d566a3c168d77c1a5f95e Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Thu, 27 Jun 2024 18:34:22 +1000 Subject: [PATCH 08/26] feat: Add start method for each sink --- pkg/server/service/event-ingester/ingester.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/server/service/event-ingester/ingester.go b/pkg/server/service/event-ingester/ingester.go index 3407808e..24ec5bd9 100644 --- a/pkg/server/service/event-ingester/ingester.go +++ b/pkg/server/service/event-ingester/ingester.go @@ -51,6 +51,12 @@ func (e *Ingester) Start(ctx context.Context, grpcServer *grpc.Server) error { xatu.RegisterEventIngesterServer(grpcServer, e) + for _, sink := range e.sinks { + if err := sink.Start(ctx); err != nil { + return err + } + } + return nil } From 741a4278035070d1a149ead881de0f509f5fd48a Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Thu, 27 Jun 2024 19:07:41 +1000 Subject: [PATCH 09/26] feat: Add method to start processing in Record --- pkg/server/service/coordinator/node/record.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/server/service/coordinator/node/record.go b/pkg/server/service/coordinator/node/record.go index e47e1d2f..0a1b9e57 100644 --- a/pkg/server/service/coordinator/node/record.go +++ b/pkg/server/service/coordinator/node/record.go @@ -45,6 +45,8 @@ func (r *Record) Start(ctx context.Context) error { return err } + r.proc.Start(ctx) + return nil } From fcbe764036f20973f25556a2fbfbdab3aa34d451 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Fri, 28 Jun 2024 15:41:56 +1000 Subject: [PATCH 10/26] feat: Add worker metrics to BatchItemProcessor --- pkg/processor/batch.go | 7 +++++- pkg/processor/batch_test.go | 34 ++++++++++++++++++++++++++++ pkg/processor/metrics.go | 44 ++++++++++++++++++++++++++++++------- 3 files changed, 76 insertions(+), 9 deletions(-) diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index 815ef414..bcd1a2b6 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -190,6 +190,8 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log func (bvp *BatchItemProcessor[T]) Start(ctx context.Context) { bvp.stopWait.Add(bvp.o.Workers) + bvp.metrics.SetWorkerCount(bvp.name, float64(bvp.o.Workers)) + for i := 0; i < bvp.o.Workers; i++ { go func(num int) { defer bvp.stopWait.Done() @@ -264,6 +266,9 @@ func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBa return nil } + bvp.metrics.IncWorkerExportInProgress(bvp.name) + defer bvp.metrics.DecWorkerExportInProgress(bvp.name) + _, span := observability.Tracer().Start(ctx, "BatchItemProcessor.exportWithTimeout") defer span.End() @@ -288,7 +293,7 @@ func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBa duration := time.Since(startTime) - bvp.metrics.ObserveExportDuration(bvp.name, duration.Seconds()) + bvp.metrics.ObserveExportDuration(bvp.name, duration) if err != nil { bvp.metrics.IncItemsFailedBy(bvp.name, float64(len(itemsBatch))) diff --git a/pkg/processor/batch_test.go b/pkg/processor/batch_test.go index 3c7e35c4..40e8c263 100644 --- a/pkg/processor/batch_test.go +++ b/pkg/processor/batch_test.go @@ -699,6 +699,40 @@ func TestBatchItemProcessorWithBatchTimeout(t *testing.T) { require.Equal(t, itemsToExport, te.len(), "Expected all items to be exported after batch timeout") } +func TestBatchItemProcessorDrainOnShutdownAfterContextCancellation(t *testing.T) { + te := testBatchExporter[TestItem]{} + bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(10), WithWorkers(5), WithBatchTimeout(1*time.Second)) + require.NoError(t, err) + + // Create a cancellable context for Start + ctx, cancel := context.WithCancel(context.Background()) + bsp.Start(ctx) + + itemsToExport := 50 + items := make([]*TestItem, itemsToExport) + + for i := 0; i < itemsToExport; i++ { + items[i] = &TestItem{name: strconv.Itoa(i)} + } + + // Write items to the processor + err = bsp.Write(context.Background(), items) + require.NoError(t, err) + + // Cancel the context immediately after writing + cancel() + + // Allow some time for the cancellation to propagate + time.Sleep(100 * time.Millisecond) + + // Shutdown the processor + err = bsp.Shutdown(context.Background()) + require.NoError(t, err) + + // Check if any items were exported + require.Greater(t, itemsToExport, 0, "No items should have been exported on shutdown") +} + func TestBatchItemProcessorQueueSize(t *testing.T) { te := indefiniteExporter[TestItem]{} diff --git a/pkg/processor/metrics.go b/pkg/processor/metrics.go index d6188bd8..e882904c 100644 --- a/pkg/processor/metrics.go +++ b/pkg/processor/metrics.go @@ -1,6 +1,8 @@ package processor import ( + "time" + "github.com/prometheus/client_golang/prometheus" ) @@ -9,12 +11,14 @@ var ( ) type Metrics struct { - itemsQueued *prometheus.GaugeVec - itemsDropped *prometheus.CounterVec - itemsFailed *prometheus.CounterVec - itemsExported *prometheus.CounterVec - exportDuration *prometheus.HistogramVec - batchSize *prometheus.HistogramVec + itemsQueued *prometheus.GaugeVec + itemsDropped *prometheus.CounterVec + itemsFailed *prometheus.CounterVec + itemsExported *prometheus.CounterVec + exportDuration *prometheus.HistogramVec + batchSize *prometheus.HistogramVec + workerCount *prometheus.GaugeVec + workerExportInProgress *prometheus.GaugeVec } func NewMetrics(namespace string) *Metrics { @@ -57,6 +61,16 @@ func NewMetrics(namespace string) *Metrics { Help: "Size of processed batches", Buckets: prometheus.ExponentialBucketsRange(1, 50000, 10), }, []string{"processor"}), + workerCount: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "worker_count", + Namespace: namespace, + Help: "Number of active workers", + }, []string{"processor"}), + workerExportInProgress: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "worker_export_in_progress", + Namespace: namespace, + Help: "Number of workers currently exporting", + }, []string{"processor"}), } prometheus.MustRegister(m.itemsQueued) @@ -65,6 +79,8 @@ func NewMetrics(namespace string) *Metrics { prometheus.MustRegister(m.itemsExported) prometheus.MustRegister(m.exportDuration) prometheus.MustRegister(m.batchSize) + prometheus.MustRegister(m.workerCount) + prometheus.MustRegister(m.workerExportInProgress) return m } @@ -85,10 +101,22 @@ func (m *Metrics) IncItemsFailedBy(name string, count float64) { m.itemsFailed.WithLabelValues(name).Add(count) } -func (m *Metrics) ObserveExportDuration(name string, duration float64) { - m.exportDuration.WithLabelValues(name).Observe(duration) +func (m *Metrics) ObserveExportDuration(name string, duration time.Duration) { + m.exportDuration.WithLabelValues(name).Observe(duration.Seconds()) } func (m *Metrics) ObserveBatchSize(name string, size float64) { m.batchSize.WithLabelValues(name).Observe(size) } + +func (m *Metrics) SetWorkerCount(name string, count float64) { + m.workerCount.WithLabelValues(name).Set(count) +} + +func (m *Metrics) IncWorkerExportInProgress(name string) { + m.workerExportInProgress.WithLabelValues(name).Inc() +} + +func (m *Metrics) DecWorkerExportInProgress(name string) { + m.workerExportInProgress.WithLabelValues(name).Dec() +} From 595e8fe03bbf6b40afe3b6edd10a42c36b86a840 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Fri, 28 Jun 2024 17:02:54 +1000 Subject: [PATCH 11/26] feat: Add handling time histogram with custom buckets --- pkg/output/xatu/exporter.go | 1 + pkg/server/server.go | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/output/xatu/exporter.go b/pkg/output/xatu/exporter.go index 99b22d53..5be16236 100644 --- a/pkg/output/xatu/exporter.go +++ b/pkg/output/xatu/exporter.go @@ -33,6 +33,7 @@ func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemE opts := []grpc.DialOption{ grpc.WithChainUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor, retry.UnaryClientInterceptor()), grpc.WithChainStreamInterceptor(grpc_prometheus.StreamClientInterceptor, retry.StreamClientInterceptor()), + grpc.WithTimeout(config.ExportTimeout), } if config.TLS { diff --git a/pkg/server/server.go b/pkg/server/server.go index 9fd2df60..d1d33532 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -22,6 +22,7 @@ import ( "github.com/go-co-op/gocron" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/sdk/trace" @@ -269,7 +270,7 @@ func (x *Xatu) startGrpcServer(ctx context.Context) error { mb100 := 1024 * 1024 * 100 - grpc_prometheus.EnableHandlingTimeHistogram() + grpc_prometheus.EnableHandlingTimeHistogram(grpc_prometheus.WithHistogramBuckets(prometheus.ExponentialBucketsRange(0.1, 30, 10))) opts := []grpc.ServerOption{ grpc.MaxRecvMsgSize(mb100), From 55827349210dec21525b5c5353d0ae65229e7b83 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Fri, 28 Jun 2024 17:27:56 +1000 Subject: [PATCH 12/26] style: Update handling time histogram buckets --- pkg/server/server.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/server/server.go b/pkg/server/server.go index d1d33532..e9ff226d 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -22,7 +22,6 @@ import ( "github.com/go-co-op/gocron" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/sdk/trace" @@ -270,7 +269,13 @@ func (x *Xatu) startGrpcServer(ctx context.Context) error { mb100 := 1024 * 1024 * 100 - grpc_prometheus.EnableHandlingTimeHistogram(grpc_prometheus.WithHistogramBuckets(prometheus.ExponentialBucketsRange(0.1, 30, 10))) + grpc_prometheus.EnableHandlingTimeHistogram( + grpc_prometheus.WithHistogramBuckets( + []float64{ + 0.01, 0.03, 0.1, 0.3, 1, 3, 6, 9, 12, 15, 18, 21, 24, 27, 30, 33, + }, + ), + ) opts := []grpc.ServerOption{ grpc.MaxRecvMsgSize(mb100), From c78dccc1fe9ad7d5d63012d917afbb892d9b29fe Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Fri, 28 Jun 2024 18:36:52 +1000 Subject: [PATCH 13/26] feat: Add option to override shipping method for writing items --- pkg/output/http/http.go | 15 ++++++++++++++- pkg/processor/batch.go | 15 ++++++++++++--- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/pkg/output/http/http.go b/pkg/output/http/http.go index ed8473db..102659f4 100644 --- a/pkg/output/http/http.go +++ b/pkg/output/http/http.go @@ -3,6 +3,7 @@ package http import ( "context" "errors" + "strings" "github.com/ethpandaops/xatu/pkg/processor" "github.com/ethpandaops/xatu/pkg/proto/xatu" @@ -106,5 +107,17 @@ func (h *HTTP) HandleNewDecoratedEvents(ctx context.Context, events []*xatu.Deco } } - return h.proc.Write(ctx, filtered) + if len(filtered) == 0 { + return nil + } + + shippingMethod := processor.ShippingMethodSync + + if strings.Contains(filtered[0].Meta.Client.Name, "sentry") || strings.Contains(filtered[0].Meta.Client.Name, "cl-mimicry") { + shippingMethod = processor.ShippingMethodAsync + } + + return h.proc.Write(ctx, filtered, processor.WriteOptions{ + OverrideShippingMethod: &shippingMethod, + }) } diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index bcd1a2b6..c1ca4bde 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -205,12 +205,16 @@ func (bvp *BatchItemProcessor[T]) Start(ctx context.Context) { }() } +type WriteOptions struct { + OverrideShippingMethod *ShippingMethod +} + // Write writes items to the queue. If the Processor is configured to use // the sync shipping method, the items will be written to the queue and this // function will return when all items have been processed. If the Processor is // configured to use the async shipping method, the items will be written to // the queue and this function will return immediately. -func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { +func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T, opts ...WriteOptions) error { _, span := observability.Tracer().Start(ctx, "BatchItemProcessor.Write") defer span.End() @@ -218,6 +222,11 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { return errors.New("exporter is nil") } + shippingMethod := bvp.o.ShippingMethod + if len(opts) > 0 { + shippingMethod = *opts[0].OverrideShippingMethod + } + // Break our items up in to chunks that can be processed at // one time by our workers. This is to prevent wasting // resources sending items if we've failed an earlier @@ -236,7 +245,7 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { item: i, } - if bvp.o.ShippingMethod == ShippingMethodSync { + if shippingMethod == ShippingMethodSync { item.errCh = make(chan error, 1) item.completedCh = make(chan struct{}, 1) } @@ -250,7 +259,7 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { } } - if bvp.o.ShippingMethod == ShippingMethodSync { + if shippingMethod == ShippingMethodSync { if err := bvp.waitForBatchCompletion(ctx, prepared); err != nil { return err } From 5d8ffb64d2635bfdfbd3782efa77b43ee637ee48 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Sat, 29 Jun 2024 11:26:52 +1000 Subject: [PATCH 14/26] feat: Add keepalive parameters to gRPC client configuration --- pkg/output/xatu/exporter.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/output/xatu/exporter.go b/pkg/output/xatu/exporter.go index 5be16236..a2229dd9 100644 --- a/pkg/output/xatu/exporter.go +++ b/pkg/output/xatu/exporter.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net" + "time" "github.com/ethpandaops/xatu/pkg/observability" pb "github.com/ethpandaops/xatu/pkg/proto/xatu" @@ -18,6 +19,7 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/encoding/gzip" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" ) @@ -33,7 +35,11 @@ func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemE opts := []grpc.DialOption{ grpc.WithChainUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor, retry.UnaryClientInterceptor()), grpc.WithChainStreamInterceptor(grpc_prometheus.StreamClientInterceptor, retry.StreamClientInterceptor()), - grpc.WithTimeout(config.ExportTimeout), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 10 * time.Second, + Timeout: 30 * time.Second, + PermitWithoutStream: true, + }), } if config.TLS { From 3409b8917e3166f6a62034cfa0ab6222b42337b3 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Sat, 29 Jun 2024 11:34:51 +1000 Subject: [PATCH 15/26] fix: Update MaxConnectionAge to 1 minute --- pkg/server/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/server/server.go b/pkg/server/server.go index e9ff226d..87505585 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -281,7 +281,7 @@ func (x *Xatu) startGrpcServer(ctx context.Context) error { grpc.MaxRecvMsgSize(mb100), grpc.KeepaliveParams(keepalive.ServerParameters{ MaxConnectionIdle: 5 * time.Minute, - MaxConnectionAge: 10 * time.Minute, + MaxConnectionAge: 1 * time.Minute, MaxConnectionAgeGrace: 2 * time.Minute, Time: 1 * time.Minute, Timeout: 15 * time.Second, From c4f31c175e269eaef7eddb105db4690318eb47b1 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Sat, 29 Jun 2024 13:26:15 +1000 Subject: [PATCH 16/26] refactor: Simplify log field assignment in NewItemExporter --- pkg/output/http/exporter.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/output/http/exporter.go b/pkg/output/http/exporter.go index dc0b4b0c..65107c16 100644 --- a/pkg/output/http/exporter.go +++ b/pkg/output/http/exporter.go @@ -25,14 +25,19 @@ type ItemExporter struct { } func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemExporter, error) { + log = log.WithField("output_name", name).WithField("output_type", SinkType) + t := http.DefaultTransport.(*http.Transport).Clone() + if config.KeepAlive != nil && !*config.KeepAlive { + log.WithField("keep_alive", *config.KeepAlive).Warn("Disabling keep-alives") + t.DisableKeepAlives = true } return ItemExporter{ config: config, - log: log.WithField("output_name", name).WithField("output_type", SinkType), + log: log, client: &http.Client{ Transport: t, From 4ff9775ced5905c89c5728e8111759f4eb1c4a64 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 1 Jul 2024 16:49:03 +1000 Subject: [PATCH 17/26] fix: Update MaxConnectionAge to 10 minutes --- pkg/server/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/server/server.go b/pkg/server/server.go index 87505585..e9ff226d 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -281,7 +281,7 @@ func (x *Xatu) startGrpcServer(ctx context.Context) error { grpc.MaxRecvMsgSize(mb100), grpc.KeepaliveParams(keepalive.ServerParameters{ MaxConnectionIdle: 5 * time.Minute, - MaxConnectionAge: 1 * time.Minute, + MaxConnectionAge: 10 * time.Minute, MaxConnectionAgeGrace: 2 * time.Minute, Time: 1 * time.Minute, Timeout: 15 * time.Second, From 7b49ceb31a1c9235d430516098781adde3733232 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Tue, 2 Jul 2024 11:57:13 +1000 Subject: [PATCH 18/26] refactor: Remove unnecessary code and simplify Write function --- pkg/output/http/http.go | 11 +---------- pkg/processor/batch.go | 15 +++------------ 2 files changed, 4 insertions(+), 22 deletions(-) diff --git a/pkg/output/http/http.go b/pkg/output/http/http.go index 102659f4..f015f3c8 100644 --- a/pkg/output/http/http.go +++ b/pkg/output/http/http.go @@ -3,7 +3,6 @@ package http import ( "context" "errors" - "strings" "github.com/ethpandaops/xatu/pkg/processor" "github.com/ethpandaops/xatu/pkg/proto/xatu" @@ -111,13 +110,5 @@ func (h *HTTP) HandleNewDecoratedEvents(ctx context.Context, events []*xatu.Deco return nil } - shippingMethod := processor.ShippingMethodSync - - if strings.Contains(filtered[0].Meta.Client.Name, "sentry") || strings.Contains(filtered[0].Meta.Client.Name, "cl-mimicry") { - shippingMethod = processor.ShippingMethodAsync - } - - return h.proc.Write(ctx, filtered, processor.WriteOptions{ - OverrideShippingMethod: &shippingMethod, - }) + return h.proc.Write(ctx, filtered) } diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index c1ca4bde..bcd1a2b6 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -205,16 +205,12 @@ func (bvp *BatchItemProcessor[T]) Start(ctx context.Context) { }() } -type WriteOptions struct { - OverrideShippingMethod *ShippingMethod -} - // Write writes items to the queue. If the Processor is configured to use // the sync shipping method, the items will be written to the queue and this // function will return when all items have been processed. If the Processor is // configured to use the async shipping method, the items will be written to // the queue and this function will return immediately. -func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T, opts ...WriteOptions) error { +func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { _, span := observability.Tracer().Start(ctx, "BatchItemProcessor.Write") defer span.End() @@ -222,11 +218,6 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T, opts ...Wri return errors.New("exporter is nil") } - shippingMethod := bvp.o.ShippingMethod - if len(opts) > 0 { - shippingMethod = *opts[0].OverrideShippingMethod - } - // Break our items up in to chunks that can be processed at // one time by our workers. This is to prevent wasting // resources sending items if we've failed an earlier @@ -245,7 +236,7 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T, opts ...Wri item: i, } - if shippingMethod == ShippingMethodSync { + if bvp.o.ShippingMethod == ShippingMethodSync { item.errCh = make(chan error, 1) item.completedCh = make(chan struct{}, 1) } @@ -259,7 +250,7 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T, opts ...Wri } } - if shippingMethod == ShippingMethodSync { + if bvp.o.ShippingMethod == ShippingMethodSync { if err := bvp.waitForBatchCompletion(ctx, prepared); err != nil { return err } From e2c40881e77049d3945090787dc20b607e1861c9 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Tue, 2 Jul 2024 11:59:40 +1000 Subject: [PATCH 19/26] style: Update maxExportBatchSize to 64 and compression to zstd --- deploy/local/docker-compose/xatu-server.yaml | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/deploy/local/docker-compose/xatu-server.yaml b/deploy/local/docker-compose/xatu-server.yaml index 600e60e8..1969b6d8 100644 --- a/deploy/local/docker-compose/xatu-server.yaml +++ b/deploy/local/docker-compose/xatu-server.yaml @@ -51,12 +51,7 @@ services: maxQueueSize: 102400 batchTimeout: 3s exportTimeout: 30s -<<<<<<< HEAD - maxExportBatchSize: 512 - compression: none -======= - maxExportBatchSize: 5000 + maxExportBatchSize: 64 compression: zstd ->>>>>>> master keepAlive: true workers: 50 \ No newline at end of file From 46cd0a29c36f9e51c387e2f0503d1dd49347d162 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Tue, 2 Jul 2024 15:10:32 +1000 Subject: [PATCH 20/26] style: Remove pointer from Tracing in Config struct --- pkg/sentry/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sentry/config.go b/pkg/sentry/config.go index cce19bd3..0cae5835 100644 --- a/pkg/sentry/config.go +++ b/pkg/sentry/config.go @@ -46,7 +46,7 @@ type Config struct { ProposerDuty *ProposerDutyConfig `yaml:"proposerDuty" default:"{'enabled': true}"` // Tracing configuration - Tracing *observability.TracingConfig `yaml:"tracing"` + Tracing observability.TracingConfig `yaml:"tracing"` } func (c *Config) Validate() error { From 068be06ca545617d26851a51c63599da0b43b7fe Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Wed, 3 Jul 2024 14:09:44 +1000 Subject: [PATCH 21/26] refactor: Remove unused code and variables --- pkg/cannon/deriver/beacon/eth/v1/beacon_validators.go | 9 --------- pkg/sentry/sentry.go | 1 + pkg/server/server.go | 2 -- 3 files changed, 1 insertion(+), 11 deletions(-) diff --git a/pkg/cannon/deriver/beacon/eth/v1/beacon_validators.go b/pkg/cannon/deriver/beacon/eth/v1/beacon_validators.go index 3c31c804..6dfbbd96 100644 --- a/pkg/cannon/deriver/beacon/eth/v1/beacon_validators.go +++ b/pkg/cannon/deriver/beacon/eth/v1/beacon_validators.go @@ -5,7 +5,6 @@ import ( "fmt" "time" - client "github.com/attestantio/go-eth2-client" apiv1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/attestantio/go-eth2-client/spec/phase0" backoff "github.com/cenkalti/backoff/v4" @@ -295,14 +294,6 @@ func (b *BeaconValidatorsDeriver) processEpoch(ctx context.Context, epoch phase0 return allEvents, boundarySlot, nil } -func (b *BeaconValidatorsDeriver) getValidatorsClient(ctx context.Context) (client.ValidatorsProvider, error) { - if provider, isProvider := b.beacon.Node().Service().(client.ValidatorsProvider); isProvider { - return provider, nil - } - - return nil, errors.New("validator states client not found") -} - func (b *BeaconValidatorsDeriver) createEventFromValidators(ctx context.Context, validators []*apiv1.Validator, epoch phase0.Epoch) (*xatu.DecoratedEvent, error) { metadata, ok := proto.Clone(b.clientMeta).(*xatu.ClientMeta) if !ok { diff --git a/pkg/sentry/sentry.go b/pkg/sentry/sentry.go index e1b5c87c..4b30c11f 100644 --- a/pkg/sentry/sentry.go +++ b/pkg/sentry/sentry.go @@ -124,6 +124,7 @@ func (s *Sentry) Start(ctx context.Context) error { if err != nil { return perrors.Wrap(err, "failed to create tracing resource") } + opts := []trace.TracerProviderOption{ trace.WithSampler(trace.ParentBased(trace.TraceIDRatioBased(s.Config.Tracing.Sampling.Rate))), } diff --git a/pkg/server/server.go b/pkg/server/server.go index e9ff226d..bb38a905 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -48,8 +48,6 @@ type Xatu struct { clockDrift *time.Duration - tracer *trace.TracerProvider - shutdownFuncs []func(ctx context.Context) error } From 347101a8a673c24ef2b16f823c3cf1c2f35e883c Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Wed, 3 Jul 2024 14:23:31 +1000 Subject: [PATCH 22/26] refactor: Add duration field to log error with method in RPC --- pkg/server/server.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/server/server.go b/pkg/server/server.go index bb38a905..1fd7883a 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -290,10 +290,13 @@ func (x *Xatu) startGrpcServer(ctx context.Context) error { grpc.ChainUnaryInterceptor( grpc.UnaryServerInterceptor(grpc_prometheus.UnaryServerInterceptor), func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + start := time.Now() + resp, err := handler(ctx, req) if err != nil { x.log. WithField("method", info.FullMethod). + WithField("duration", time.Since(start)). WithError(err). Error("RPC Error") } From 3bbc4b37ff800a0ab7118461d895f4a1dfe169c1 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Wed, 3 Jul 2024 14:30:35 +1000 Subject: [PATCH 23/26] feat: Add github.com/prometheus/client_model v0.6.0 --- go.mod | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index e2d82c5a..5671e3fb 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/huandu/go-sqlbuilder v1.25.0 github.com/jellydator/ttlcache/v3 v3.2.0 + github.com/klauspost/compress v1.17.7 github.com/lib/pq v1.10.9 github.com/libp2p/go-libp2p v0.33.1 github.com/mitchellh/hashstructure/v2 v2.0.2 @@ -31,6 +32,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/probe-lab/hermes v0.0.0-20240327153144-a2528356b4f7 github.com/prometheus/client_golang v1.19.0 + github.com/prometheus/client_model v0.6.0 github.com/prysmaticlabs/prysm/v5 v5.0.2 github.com/r3labs/sse/v2 v2.10.0 github.com/redis/go-redis/v9 v9.5.1 @@ -143,7 +145,6 @@ require ( github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213 // indirect - github.com/klauspost/compress v1.17.7 // indirect github.com/klauspost/cpuid/v2 v2.2.7 // indirect github.com/koron/go-ssdp v0.0.4 // indirect github.com/kr/pretty v0.3.1 // indirect @@ -192,7 +193,6 @@ require ( github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_model v0.6.0 // indirect github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/prysmaticlabs/fastssz v0.0.0-20221107182844-78142813af44 // indirect From 0f34b6def29eac6e057eb672bfdebed5f8ab1552 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Wed, 3 Jul 2024 14:43:45 +1000 Subject: [PATCH 24/26] fix: Correct method name in getAdditionalData function --- pkg/sentry/event/beacon/eth/v1/events_voluntary_exit.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sentry/event/beacon/eth/v1/events_voluntary_exit.go b/pkg/sentry/event/beacon/eth/v1/events_voluntary_exit.go index 9d7f6251..4b282042 100644 --- a/pkg/sentry/event/beacon/eth/v1/events_voluntary_exit.go +++ b/pkg/sentry/event/beacon/eth/v1/events_voluntary_exit.go @@ -102,7 +102,7 @@ func (e *EventsVoluntaryExit) ShouldIgnore(ctx context.Context) (bool, error) { func (e *EventsVoluntaryExit) getAdditionalData(_ context.Context) (*xatu.ClientMeta_AdditionalEthV1EventsVoluntaryExitV2Data, error) { extra := &xatu.ClientMeta_AdditionalEthV1EventsVoluntaryExitV2Data{} - epoch := e.beacon.Metadata().Wallclock().Epochs().FromSlot(uint64(e.event.Message.Epoch)) + epoch := e.beacon.Metadata().Wallclock().Epochs().FromNumber(uint64(e.event.Message.Epoch)) extra.Epoch = &xatu.EpochV2{ Number: &wrapperspb.UInt64Value{Value: epoch.Number()}, From cf4efd1e8590df98efa082516d4349101cb2b46a Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Wed, 3 Jul 2024 15:02:31 +1000 Subject: [PATCH 25/26] fix: Update shipping method to async --- pkg/processor/batch_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/processor/batch_test.go b/pkg/processor/batch_test.go index 40e8c263..98e58ccf 100644 --- a/pkg/processor/batch_test.go +++ b/pkg/processor/batch_test.go @@ -746,7 +746,7 @@ func TestBatchItemProcessorQueueSize(t *testing.T) { WithMaxQueueSize(maxQueueSize), WithMaxExportBatchSize(maxQueueSize), WithWorkers(1), - WithShippingMethod(ShippingMethodSync), + WithShippingMethod(ShippingMethodAsync), WithMetrics(metrics), ) require.NoError(t, err) From b32794fc1fd0192da976c2d4ec330080ee78fb2e Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Wed, 3 Jul 2024 15:16:02 +1000 Subject: [PATCH 26/26] style: update logging level to "info" --- deploy/local/docker-compose/xatu-server.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deploy/local/docker-compose/xatu-server.yaml b/deploy/local/docker-compose/xatu-server.yaml index 1969b6d8..ed36b26d 100644 --- a/deploy/local/docker-compose/xatu-server.yaml +++ b/deploy/local/docker-compose/xatu-server.yaml @@ -1,4 +1,4 @@ -logging: "debug" # panic,fatal,warn,info,debug,trace +logging: "info" # panic,fatal,warn,info,debug,trace addr: ":8080" metricsAddr: ":9090" # pprofAddr: ":6060" # optional. if supplied it enables pprof server