Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release/extra metrics #343

Merged
merged 28 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ed56ee9
feat(observability): Additional traces
samcm Jun 27, 2024
f30f96f
feat: Add observability context merging function
samcm Jun 27, 2024
b5eaf32
feat: Add metrics and attributes to batch item processor
samcm Jun 27, 2024
ad7fffe
refactor: Remove unused imports and fields
samcm Jun 27, 2024
c5cd69c
refactor: Add observability to batch item processor
samcm Jun 27, 2024
d6d087a
Merge
samcm Jun 27, 2024
99a2ccb
refactor: Remove redundant code and optimize code structure
samcm Jun 27, 2024
a833e32
feat: Add metrics tracking to batch processor
samcm Jun 27, 2024
1b86183
feat: Add start method for each sink
samcm Jun 27, 2024
741a427
feat: Add method to start processing in Record
samcm Jun 27, 2024
fcbe764
feat: Add worker metrics to BatchItemProcessor
samcm Jun 28, 2024
595e8fe
feat: Add handling time histogram with custom buckets
samcm Jun 28, 2024
5582734
style: Update handling time histogram buckets
samcm Jun 28, 2024
c78dccc
feat: Add option to override shipping method for writing items
samcm Jun 28, 2024
5d8ffb6
feat: Add keepalive parameters to gRPC client configuration
samcm Jun 29, 2024
3409b89
fix: Update MaxConnectionAge to 1 minute
samcm Jun 29, 2024
c4f31c1
refactor: Simplify log field assignment in NewItemExporter
samcm Jun 29, 2024
4ff9775
fix: Update MaxConnectionAge to 10 minutes
samcm Jul 1, 2024
658fc58
Merge master
samcm Jul 1, 2024
7b49ceb
refactor: Remove unnecessary code and simplify Write function
samcm Jul 2, 2024
e2c4088
style: Update maxExportBatchSize to 64 and compression to zstd
samcm Jul 2, 2024
46cd0a2
style: Remove pointer from Tracing in Config struct
samcm Jul 2, 2024
068be06
refactor: Remove unused code and variables
samcm Jul 3, 2024
347101a
refactor: Add duration field to log error with method in RPC
samcm Jul 3, 2024
3bbc4b3
feat: Add github.com/prometheus/client_model v0.6.0
samcm Jul 3, 2024
0f34b6d
fix: Correct method name in getAdditionalData function
samcm Jul 3, 2024
cf4efd1
fix: Update shipping method to async
samcm Jul 3, 2024
b32794f
style: update logging level to "info"
samcm Jul 3, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,11 @@ datasources:
sslmode: disable
tlsSkipVerify: true
postgresVersion: 1500
database: xatu
database: xatu
- name: Tempo
type: tempo
access: proxy
uid: EbPG8fYoz
url: http://tempo:3200
jsonData:
httpMethod: GET
59 changes: 59 additions & 0 deletions deploy/local/docker-compose/tempo.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
stream_over_http_enabled: true
server:
http_listen_port: 3200
log_level: info

query_frontend:
search:
duration_slo: 5s
throughput_bytes_slo: 1.073741824e+09
trace_by_id:
duration_slo: 5s

distributor:
receivers: # this configuration will listen on all ports and protocols that tempo is capable of.
jaeger: # the receives all come from the OpenTelemetry collector. more configuration information can
protocols: # be found there: https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver
thrift_http: #
grpc: # for a production deployment you should only enable the receivers you need!
thrift_binary:
thrift_compact:
zipkin:
otlp:
protocols:
http:
grpc:
opencensus:

ingester:
max_block_duration: 5m # cut the headblock when this much time passes. this is being set for demo purposes and should probably be left alone normally

compactor:
compaction:
block_retention: 15m # overall Tempo trace retention. set for demo purposes

metrics_generator:
registry:
external_labels:
source: tempo
cluster: docker-compose
storage:
path: /var/tempo/generator/wal
remote_write:
- url: http://prometheus:9090/api/v1/write
send_exemplars: true
traces_storage:
path: /var/tempo/generator/traces

storage:
trace:
backend: local # backend configuration to use
wal:
path: /var/tempo/wal # where to store the wal locally
local:
path: /var/tempo/blocks

overrides:
defaults:
metrics_generator:
processors: [service-graphs, span-metrics, local-blocks] # enables metrics generator
10 changes: 9 additions & 1 deletion deploy/local/docker-compose/xatu-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ store:
geoip:
enabled: false

tracing:
enabled: true
endpoint: tempo:4318
insecure: true
sampling:
rate: 0.1

services:
coordinator:
enabled: true # requires persistence to be enabled
Expand All @@ -44,6 +51,7 @@ services:
maxQueueSize: 102400
batchTimeout: 3s
exportTimeout: 30s
maxExportBatchSize: 5000
maxExportBatchSize: 64
compression: zstd
keepAlive: true
workers: 50
35 changes: 35 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,39 @@ services:
condition: service_healthy
networks:
- xatu-net
tempo-init:
image: &tempoImage grafana/tempo:latest
user: root
entrypoint:
- "chown"
- "10001:10001"
- "/var/tempo"
volumes:
- tempo-data:/var/tempo

tempo:
image: *tempoImage
container_name: xatu-tempo
command: [ "-config.file=/etc/tempo.yaml" ]
volumes:
- ./deploy/local/docker-compose/tempo.yaml:/etc/tempo.yaml
- tempo-data:/var/tempo
ports:
- "14268:14268" # jaeger ingest
- "3200:3200" # tempo
- "9095:9095" # tempo grpc
- "4317:4317" # otlp grpc
- "4318:4318" # otlp http
- "9411:9411" # zipkin
networks:
- xatu-net
depends_on:
- tempo-init
healthcheck:
test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://localhost:3200/ready"]
interval: 5s
timeout: 5s
retries: 5

networks:
xatu-net:
Expand Down Expand Up @@ -506,3 +539,5 @@ volumes:
driver: local
prometheus-data:
driver: local
tempo-data:
driver: local
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/huandu/go-sqlbuilder v1.25.0
github.com/jellydator/ttlcache/v3 v3.2.0
github.com/klauspost/compress v1.17.7
github.com/lib/pq v1.10.9
github.com/libp2p/go-libp2p v0.33.1
github.com/mitchellh/hashstructure/v2 v2.0.2
Expand All @@ -31,6 +32,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/probe-lab/hermes v0.0.0-20240327153144-a2528356b4f7
github.com/prometheus/client_golang v1.19.0
github.com/prometheus/client_model v0.6.0
github.com/prysmaticlabs/prysm/v5 v5.0.2
github.com/r3labs/sse/v2 v2.10.0
github.com/redis/go-redis/v9 v9.5.1
Expand Down Expand Up @@ -143,7 +145,6 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213 // indirect
github.com/klauspost/compress v1.17.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
github.com/kr/pretty v0.3.1 // indirect
Expand Down Expand Up @@ -192,7 +193,6 @@ require (
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.6.0 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/prysmaticlabs/fastssz v0.0.0-20221107182844-78142813af44 // indirect
Expand Down
8 changes: 7 additions & 1 deletion pkg/cannon/cannon.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
perrors "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/sdk/trace"
)

type Cannon struct {
Expand Down Expand Up @@ -108,9 +109,14 @@ func (c *Cannon) Start(ctx context.Context) error {
return perrors.Wrap(err, "failed to create tracing resource")
}

opts := []trace.TracerProviderOption{
trace.WithSampler(trace.ParentBased(trace.TraceIDRatioBased(c.Config.Tracing.Sampling.Rate))),
}

tracer, err := observability.NewHTTPTraceProvider(ctx,
res,
c.Config.Tracing.AsOTelOpts()...,
c.Config.Tracing.AsOTelOpts(),
opts...,
)
if err != nil {
return perrors.Wrap(err, "failed to create tracing provider")
Expand Down
9 changes: 0 additions & 9 deletions pkg/cannon/deriver/beacon/eth/v1/beacon_validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"time"

client "github.com/attestantio/go-eth2-client"
apiv1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec/phase0"
backoff "github.com/cenkalti/backoff/v4"
Expand Down Expand Up @@ -295,14 +294,6 @@ func (b *BeaconValidatorsDeriver) processEpoch(ctx context.Context, epoch phase0
return allEvents, boundarySlot, nil
}

func (b *BeaconValidatorsDeriver) getValidatorsClient(ctx context.Context) (client.ValidatorsProvider, error) {
if provider, isProvider := b.beacon.Node().Service().(client.ValidatorsProvider); isProvider {
return provider, nil
}

return nil, errors.New("validator states client not found")
}

func (b *BeaconValidatorsDeriver) createEventFromValidators(ctx context.Context, validators []*apiv1.Validator, epoch phase0.Epoch) (*xatu.DecoratedEvent, error) {
metadata, ok := proto.Clone(b.clientMeta).(*xatu.ClientMeta)
if !ok {
Expand Down
5 changes: 5 additions & 0 deletions pkg/observability/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
)

type SamplingConfig struct {
Rate float64 `yaml:"rate" default:"0.01"`
}

type TracingConfig struct {
Enabled bool `yaml:"enabled" default:"false"`
Endpoint string `yaml:"endpoint" default:""`
Expand All @@ -17,6 +21,7 @@ type TracingConfig struct {
Insecure bool `yaml:"insecure" default:"false"`
Retry *otlptracehttp.RetryConfig `yaml:"retry"`
TLS *tls.Config `yaml:"tls"`
Sampling SamplingConfig `yaml:"sampling"`
}

func (t *TracingConfig) Validate() error {
Expand Down
34 changes: 34 additions & 0 deletions pkg/observability/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package observability

import (
"context"

"go.opentelemetry.io/otel/trace"
)

func MergeContexts(parent context.Context, contexts ...context.Context) context.Context {
spanContexts := make([]trace.SpanContext, 0, len(contexts))

for _, ctx := range contexts {
if spanContext := trace.SpanContextFromContext(ctx); spanContext.IsValid() {
spanContexts = append(spanContexts, spanContext)
}
}

if len(spanContexts) == 0 {
return parent
}

// Create a new span that links to all the original spans
tracer := Tracer()

links := make([]trace.Link, len(spanContexts))
for i, spanContext := range spanContexts {
links[i] = trace.Link{SpanContext: spanContext}
}

ctx, span := tracer.Start(parent, "Observability.MergedSpan", trace.WithLinks(links...))
defer span.End()

return ctx
}
10 changes: 6 additions & 4 deletions pkg/observability/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,20 @@ func NewResource(serviceName, serviceVersion string) (*resource.Resource, error)
)
}

func NewHTTPTraceProvider(ctx context.Context, res *resource.Resource, opts ...otlptracehttp.Option) (*trace.TracerProvider, error) {
client := otlptracehttp.NewClient(opts...)
func NewHTTPTraceProvider(ctx context.Context, res *resource.Resource, httpOpts []otlptracehttp.Option, opts ...trace.TracerProviderOption) (*trace.TracerProvider, error) {
client := otlptracehttp.NewClient(httpOpts...)

exporter, err := otlptrace.New(ctx, client)
if err != nil {
return nil, fmt.Errorf("creating OTLP trace exporter: %w", err)
}

traceProvider := trace.NewTracerProvider(
options := append([]trace.TracerProviderOption{
trace.WithBatcher(exporter),
trace.WithResource(res),
)
}, opts...)

traceProvider := trace.NewTracerProvider(options...)

return traceProvider, nil
}
7 changes: 6 additions & 1 deletion pkg/output/http/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,19 @@ type ItemExporter struct {
}

func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemExporter, error) {
log = log.WithField("output_name", name).WithField("output_type", SinkType)

t := http.DefaultTransport.(*http.Transport).Clone()

if config.KeepAlive != nil && !*config.KeepAlive {
log.WithField("keep_alive", *config.KeepAlive).Warn("Disabling keep-alives")

t.DisableKeepAlives = true
}

return ItemExporter{
config: config,
log: log.WithField("output_name", name).WithField("output_type", SinkType),
log: log,

client: &http.Client{
Transport: t,
Expand Down
6 changes: 6 additions & 0 deletions pkg/output/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ func (h *HTTP) Type() string {
}

func (h *HTTP) Start(ctx context.Context) error {
h.proc.Start(ctx)

return nil
}

Expand Down Expand Up @@ -104,5 +106,9 @@ func (h *HTTP) HandleNewDecoratedEvents(ctx context.Context, events []*xatu.Deco
}
}

if len(filtered) == 0 {
return nil
}

return h.proc.Write(ctx, filtered)
}
2 changes: 2 additions & 0 deletions pkg/output/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ func (h *Kafka) Type() string {
}

func (h *Kafka) Start(ctx context.Context) error {
h.proc.Start(ctx)

return nil
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/output/stdout/stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ func (h *StdOut) Type() string {
}

func (h *StdOut) Start(ctx context.Context) error {
h.proc.Start(ctx)

return nil
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/output/xatu/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"time"

"github.com/ethpandaops/xatu/pkg/observability"
pb "github.com/ethpandaops/xatu/pkg/proto/xatu"
Expand All @@ -18,6 +19,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
)

Expand All @@ -33,6 +35,11 @@ func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemE
opts := []grpc.DialOption{
grpc.WithChainUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor, retry.UnaryClientInterceptor()),
grpc.WithChainStreamInterceptor(grpc_prometheus.StreamClientInterceptor, retry.StreamClientInterceptor()),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 30 * time.Second,
PermitWithoutStream: true,
}),
}

if config.TLS {
Expand Down
2 changes: 2 additions & 0 deletions pkg/output/xatu/xatu.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ func (h *Xatu) Name() string {
}

func (h *Xatu) Start(ctx context.Context) error {
h.proc.Start(ctx)

return nil
}

Expand Down
Loading
Loading