Skip to content

Commit

Permalink
feat(cannon): Additional tracing (#223)
Browse files Browse the repository at this point in the history
* feat(cannon): Add network to root tracing span

* Add processor tracing
  • Loading branch information
samcm authored Oct 3, 2023
1 parent 5abfb4f commit 5b81903
Show file tree
Hide file tree
Showing 12 changed files with 64 additions and 8 deletions.
6 changes: 5 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/attester_slashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@ func (a *AttesterSlashingDeriver) run(rctx context.Context) {
return
default:
operation := func() error {
ctx, span := observability.Tracer().Start(rctx, fmt.Sprintf("Derive %s", a.Name()))
ctx, span := observability.Tracer().Start(rctx,
fmt.Sprintf("Derive %s", a.Name()),
trace.WithAttributes(
attribute.String("network", string(a.beacon.Metadata().Network.Name))),
)
defer span.End()

time.Sleep(100 * time.Millisecond)
Expand Down
5 changes: 4 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/beacon_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,10 @@ func (b *BeaconBlockDeriver) run(rctx context.Context) {
return
default:
operation := func() error {
ctx, span := tracer.Start(rctx, fmt.Sprintf("Derive %s", b.Name()))
ctx, span := tracer.Start(rctx, fmt.Sprintf("Derive %s", b.Name()),
trace.WithAttributes(
attribute.String("network", string(b.beacon.Metadata().Network.Name))),
)
defer span.End()

time.Sleep(100 * time.Millisecond)
Expand Down
5 changes: 4 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/bls_to_execution_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ func (b *BLSToExecutionChangeDeriver) run(rctx context.Context) {
return
default:
operation := func() error {
ctx, span := observability.Tracer().Start(rctx, fmt.Sprintf("Derive %s", b.Name()))
ctx, span := observability.Tracer().Start(rctx, fmt.Sprintf("Derive %s", b.Name()),
trace.WithAttributes(
attribute.String("network", string(b.beacon.Metadata().Network.Name))),
)
defer span.End()

time.Sleep(100 * time.Millisecond)
Expand Down
5 changes: 4 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/deposit.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ func (b *DepositDeriver) run(rctx context.Context) {
return
default:
operation := func() error {
ctx, span := observability.Tracer().Start(rctx, fmt.Sprintf("Derive %s", b.Name()))
ctx, span := observability.Tracer().Start(rctx, fmt.Sprintf("Derive %s", b.Name()),
trace.WithAttributes(
attribute.String("network", string(b.beacon.Metadata().Network.Name))),
)
defer span.End()

time.Sleep(100 * time.Millisecond)
Expand Down
5 changes: 4 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ func (b *ExecutionTransactionDeriver) run(rctx context.Context) {
return
default:
operation := func() error {
ctx, span := observability.Tracer().Start(rctx, fmt.Sprintf("Derive %s", b.Name()))
ctx, span := observability.Tracer().Start(rctx, fmt.Sprintf("Derive %s", b.Name()),
trace.WithAttributes(
attribute.String("network", string(b.beacon.Metadata().Network.Name))),
)
defer span.End()

time.Sleep(100 * time.Millisecond)
Expand Down
5 changes: 4 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/proposer_slashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ func (b *ProposerSlashingDeriver) run(rctx context.Context) {
return
default:
operation := func() error {
ctx, span := observability.Tracer().Start(rctx, fmt.Sprintf("Derive %s", b.Name()))
ctx, span := observability.Tracer().Start(rctx, fmt.Sprintf("Derive %s", b.Name()),
trace.WithAttributes(
attribute.String("network", string(b.beacon.Metadata().Network.Name))),
)
defer span.End()

time.Sleep(100 * time.Millisecond)
Expand Down
5 changes: 4 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/voluntary_exit.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ func (b *VoluntaryExitDeriver) run(rctx context.Context) {
return
default:
operation := func() error {
ctx, span := observability.Tracer().Start(rctx, fmt.Sprintf("Derive %s", b.Name()))
ctx, span := observability.Tracer().Start(rctx, fmt.Sprintf("Derive %s", b.Name()),
trace.WithAttributes(
attribute.String("network", string(b.beacon.Metadata().Network.Name))),
)
defer span.End()

time.Sleep(100 * time.Millisecond)
Expand Down
5 changes: 4 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/withdrawal.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ func (b *WithdrawalDeriver) run(rctx context.Context) {
return
default:
operation := func() error {
ctx, span := observability.Tracer().Start(rctx, fmt.Sprintf("Derive %s", b.Name()))
ctx, span := observability.Tracer().Start(rctx, fmt.Sprintf("Derive %s", b.Name()),
trace.WithAttributes(
attribute.String("network", string(b.beacon.Metadata().Network.Name))),
)
defer span.End()

time.Sleep(100 * time.Millisecond)
Expand Down
9 changes: 9 additions & 0 deletions pkg/output/http/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ import (
"io"
"net/http"

"github.com/ethpandaops/xatu/pkg/observability"
"github.com/ethpandaops/xatu/pkg/proto/xatu"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/encoding/protojson"
)

Expand Down Expand Up @@ -38,6 +42,9 @@ func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemE
}

func (e ItemExporter) ExportItems(ctx context.Context, items []*xatu.DecoratedEvent) error {
_, span := observability.Tracer().Start(ctx, "HTTPItemExporter.ExportItems", trace.WithAttributes(attribute.Int64("num_events", int64(len(items)))))
defer span.End()

e.log.WithField("events", len(items)).Debug("Sending batch of events to HTTP sink")

if err := e.sendUpstream(ctx, items); err != nil {
Expand All @@ -46,6 +53,8 @@ func (e ItemExporter) ExportItems(ctx context.Context, items []*xatu.DecoratedEv
WithField("num_events", len(items)).
Error("Failed to send events upstream")

span.SetStatus(codes.Error, err.Error())

return err
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/output/stdout/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package stdout
import (
"context"

"github.com/ethpandaops/xatu/pkg/observability"
"github.com/ethpandaops/xatu/pkg/proto/xatu"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/encoding/protojson"
)

Expand All @@ -21,6 +24,9 @@ func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemE
}

func (e ItemExporter) ExportItems(ctx context.Context, items []*xatu.DecoratedEvent) error {
_, span := observability.Tracer().Start(ctx, "StdOutItemExporter.ExportItems", trace.WithAttributes(attribute.Int64("num_events", int64(len(items)))))
defer span.End()

e.log.WithField("events", len(items)).Debug("Sending batch of events to stdout sink")

if err := e.sendUpstream(ctx, items); err != nil {
Expand Down
9 changes: 9 additions & 0 deletions pkg/output/xatu/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ import (
"fmt"
"net"

"github.com/ethpandaops/xatu/pkg/observability"
pb "github.com/ethpandaops/xatu/pkg/proto/xatu"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -50,6 +54,9 @@ func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemE
}

func (e ItemExporter) ExportItems(ctx context.Context, items []*pb.DecoratedEvent) error {
_, span := observability.Tracer().Start(ctx, "XatuItemExporter.ExportItems", trace.WithAttributes(attribute.Int64("num_events", int64(len(items)))))
defer span.End()

e.log.WithField("events", len(items)).Debug("Sending batch of events to xatu sink")

if err := e.sendUpstream(ctx, items); err != nil {
Expand All @@ -58,6 +65,8 @@ func (e ItemExporter) ExportItems(ctx context.Context, items []*pb.DecoratedEven
WithField("num_events", len(items)).
Error("Failed to send events upstream")

span.SetStatus(codes.Error, err.Error())

return err
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/processor/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sync/atomic"
"time"

"github.com/ethpandaops/xatu/pkg/observability"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -162,6 +163,9 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log

// OnEnd method enqueues a item for later processing.
func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error {
_, span := observability.Tracer().Start(ctx, "BatchItemProcessor.Write")
defer span.End()

bvp.metrics.SetItemsQueued(bvp.name, float64(len(bvp.queue)))

if bvp.o.ShippingMethod == ShippingMethodSync {
Expand All @@ -185,6 +189,9 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error {
// ImmediatelyExportItems immediately exports the items to the exporter.
// Useful for propogating errors from the exporter.
func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, items []*T) error {
_, span := observability.Tracer().Start(ctx, "BatchItemProcessor.ImmediatelyExportItems")
defer span.End()

bvp.batchMutex.Lock()
defer bvp.batchMutex.Unlock()

Expand Down

0 comments on commit 5b81903

Please sign in to comment.