diff --git a/.gitignore b/.gitignore index 8c6d45a3..4ea2b92c 100644 --- a/.gitignore +++ b/.gitignore @@ -6,5 +6,5 @@ cannon.yaml dist GeoLite2-ASN.mmdb GeoLite2-City.mmdb -__debug_bin +__debug_bin* .vscode/launch.json diff --git a/Dockerfile b/Dockerfile index 5267680b..3ddc7725 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.19 AS builder +FROM golang:1.20 AS builder WORKDIR /src COPY go.sum go.mod ./ RUN go mod download diff --git a/go.mod b/go.mod index 1314fd72..0030d49b 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/ethpandaops/xatu -go 1.19 +go 1.20 replace github.com/attestantio/go-eth2-client v0.16.3 => github.com/samcm/go-eth2-client v0.15.12 @@ -29,9 +29,9 @@ require ( github.com/redis/go-redis/v9 v9.0.4 github.com/sirupsen/logrus v1.9.3 github.com/spf13/cobra v1.7.0 - github.com/stretchr/testify v1.8.2 + github.com/stretchr/testify v1.8.4 golang.org/x/sync v0.3.0 - google.golang.org/grpc v1.55.0 + google.golang.org/grpc v1.58.2 google.golang.org/protobuf v1.31.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -54,6 +54,8 @@ require ( github.com/fatih/color v1.14.1 // indirect github.com/ferranbt/fastssz v0.1.3 // indirect github.com/getsentry/sentry-go v0.23.0 // indirect + github.com/go-logr/logr v1.2.4 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-stack/stack v1.8.1 // indirect github.com/goccy/go-yaml v1.9.8 // indirect @@ -62,6 +64,7 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect github.com/gorilla/websocket v1.5.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect github.com/holiman/bloomfilter/v2 v2.0.3 // indirect github.com/holiman/uint256 v1.2.3 // indirect github.com/huandu/xstrings v1.4.0 // indirect @@ -107,14 +110,23 @@ require ( github.com/tklauser/go-sysconf v0.3.11 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/yusufpapurcu/wmi v1.2.3 // indirect + go.opentelemetry.io/otel v1.19.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 // indirect + go.opentelemetry.io/otel/metric v1.19.0 // indirect + go.opentelemetry.io/otel/sdk v1.19.0 // indirect + go.opentelemetry.io/otel/trace v1.19.0 // indirect + go.opentelemetry.io/proto/otlp v1.0.0 // indirect go.uber.org/atomic v1.10.0 // indirect golang.org/x/crypto v0.11.0 // indirect golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b // indirect - golang.org/x/net v0.10.0 // indirect - golang.org/x/sys v0.10.0 // indirect + golang.org/x/net v0.12.0 // indirect + golang.org/x/sys v0.12.0 // indirect golang.org/x/text v0.11.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect - google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect + google.golang.org/genproto v0.0.0-20230711160842-782d3b101e98 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index 4d0965f1..6855f8e8 100644 --- a/go.sum +++ b/go.sum @@ -135,6 +135,11 @@ github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vb github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= +github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= @@ -220,6 +225,8 @@ github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWm github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao= @@ -432,6 +439,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM= @@ -453,6 +462,20 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs= +go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 h1:Mne5On7VWdx7omSrSSZvM4Kw7cS7NQkOOmLcgscI51U= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0/go.mod h1:IPtUMKL4O3tH5y+iXVyAXqpAwMuzC1IrxVS81rummfE= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 h1:IeMeyr1aBvBiPVYihXIaeIZba6b8E1bYp7lbdxK8CQg= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0/go.mod h1:oVdCUtjq9MK9BlS7TtucsQwUcXcymNiEDjgDD2jMtZU= +go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE= +go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8= +go.opentelemetry.io/otel/sdk v1.19.0 h1:6USY6zH+L8uMH8L3t1enZPR3WFEmSTADlqldyHtJi3o= +go.opentelemetry.io/otel/sdk v1.19.0/go.mod h1:NedEbbS4w3C6zElbLdPJKOpJQOrGUJ+GfzpjUvI0v1A= +go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg= +go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo= +go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= +go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= @@ -544,6 +567,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50= +golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -628,6 +653,8 @@ golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= @@ -750,6 +777,12 @@ google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 h1:DdoeryqhaXp1LtT/emMP1BRJPHHKFi5akj/nbx/zNTA= google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4/go.mod h1:NWraEVixdDnqcqQ30jipen1STv2r/n24Wb7twVTGR4s= +google.golang.org/genproto v0.0.0-20230711160842-782d3b101e98 h1:Z0hjGZePRE0ZBWotvtrwxFNrNE9CUAGtplaDK5NNI/g= +google.golang.org/genproto v0.0.0-20230711160842-782d3b101e98/go.mod h1:S7mY02OqCJTD0E1OiQy1F72PWFB4bZJ87cAtLPYgDR0= +google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 h1:FmF5cCW94Ij59cfpoLiwTgodWmm60eEV0CjlsVg2fuw= +google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98/go.mod h1:rsr7RhLuwsDKL7RmgDDCUc6yaGr1iqceVb5Wv6f6YvQ= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 h1:bVf09lpb+OJbByTj913DRJioFFAjf/ZGxEz7MajTp2U= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98/go.mod h1:TUfxEVdsvPg18p6AslUXFoLdpED4oBnGwyqk3dV1XzM= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -764,6 +797,8 @@ google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.55.0 h1:3Oj82/tFSCeUrRTg/5E/7d/W5A1tj6Ky1ABAuZuv5ag= google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8= +google.golang.org/grpc v1.58.2 h1:SXUpjxeVF3FKrTYQI4f4KvbGD5u2xccdYdurwowix5I= +google.golang.org/grpc v1.58.2/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/pkg/cannon/cannon.go b/pkg/cannon/cannon.go index 6ecffde2..8cefdd56 100644 --- a/pkg/cannon/cannon.go +++ b/pkg/cannon/cannon.go @@ -20,6 +20,7 @@ import ( v2 "github.com/ethpandaops/xatu/pkg/cannon/deriver/beacon/eth/v2" "github.com/ethpandaops/xatu/pkg/cannon/ethereum" "github.com/ethpandaops/xatu/pkg/cannon/iterator" + "github.com/ethpandaops/xatu/pkg/observability" "github.com/ethpandaops/xatu/pkg/output" "github.com/ethpandaops/xatu/pkg/proto/xatu" "github.com/go-co-op/gocron" @@ -49,6 +50,8 @@ type Cannon struct { eventDerivers []deriver.EventDeriver coordinatorClient *coordinator.Client + + shutdownFuncs []func(ctx context.Context) error } func New(ctx context.Context, log logrus.FieldLogger, config *Config) (*Cannon, error) { @@ -86,10 +89,36 @@ func New(ctx context.Context, log logrus.FieldLogger, config *Config) (*Cannon, scheduler: gocron.NewScheduler(time.Local), eventDerivers: nil, // Derivers are created once the beacon node is ready coordinatorClient: coordinatorClient, + shutdownFuncs: []func(ctx context.Context) error{}, }, nil } func (c *Cannon) Start(ctx context.Context) error { + // Start tracing if enabled + if c.Config.Tracing.Enabled { + c.log.Info("Tracing enabled") + + res, err := observability.NewResource(xatu.WithMode(xatu.ModeCannon), xatu.Short()) + if err != nil { + return perrors.Wrap(err, "failed to create tracing resource") + } + + tracer, err := observability.NewHTTPTraceProvider(ctx, + res, + c.Config.Tracing.AsOTelOpts()..., + ) + if err != nil { + return perrors.Wrap(err, "failed to create tracing provider") + } + + shutdown, err := observability.SetupOTelSDK(ctx, tracer) + if err != nil { + return perrors.Wrap(err, "failed to setup tracing SDK") + } + + c.shutdownFuncs = append(c.shutdownFuncs, shutdown) + } + if err := c.ServeMetrics(ctx); err != nil { return err } @@ -133,7 +162,15 @@ func (c *Cannon) Start(ctx context.Context) error { sig := <-cancel c.log.Printf("Caught signal: %v", sig) - c.log.Printf("Flushing sinks") + if err := c.Shutdown(ctx); err != nil { + return err + } + + return nil +} + +func (c *Cannon) Shutdown(ctx context.Context) error { + c.log.Printf("Shutting down") for _, sink := range c.sinks { if err := sink.Stop(ctx); err != nil { @@ -141,6 +178,20 @@ func (c *Cannon) Start(ctx context.Context) error { } } + for _, fun := range c.shutdownFuncs { + if err := fun(ctx); err != nil { + return err + } + } + + c.scheduler.Stop() + + for _, deriver := range c.eventDerivers { + if err := deriver.Stop(ctx); err != nil { + return err + } + } + return nil } diff --git a/pkg/cannon/config.go b/pkg/cannon/config.go index 26c760cb..8382c101 100644 --- a/pkg/cannon/config.go +++ b/pkg/cannon/config.go @@ -7,6 +7,7 @@ import ( "github.com/ethpandaops/xatu/pkg/cannon/coordinator" "github.com/ethpandaops/xatu/pkg/cannon/deriver" "github.com/ethpandaops/xatu/pkg/cannon/ethereum" + "github.com/ethpandaops/xatu/pkg/observability" "github.com/ethpandaops/xatu/pkg/output" "github.com/ethpandaops/xatu/pkg/processor" "github.com/sirupsen/logrus" @@ -37,6 +38,9 @@ type Config struct { // Coordinator configuration Coordinator coordinator.Config `yaml:"coordinator"` + + // Tracing configuration + Tracing observability.TracingConfig `yaml:"tracing"` } func (c *Config) Validate() error { @@ -62,6 +66,10 @@ func (c *Config) Validate() error { return fmt.Errorf("invalid coordinator config: %w", err) } + if err := c.Tracing.Validate(); err != nil { + return fmt.Errorf("invalid tracing config: %w", err) + } + return nil } diff --git a/pkg/cannon/deriver/beacon/eth/v2/attester_slashing.go b/pkg/cannon/deriver/beacon/eth/v2/attester_slashing.go index 1b126575..40f8a7c1 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/attester_slashing.go +++ b/pkg/cannon/deriver/beacon/eth/v2/attester_slashing.go @@ -2,6 +2,7 @@ package v2 import ( "context" + "fmt" "time" "github.com/attestantio/go-eth2-client/spec" @@ -9,11 +10,14 @@ import ( backoff "github.com/cenkalti/backoff/v4" "github.com/ethpandaops/xatu/pkg/cannon/ethereum" "github.com/ethpandaops/xatu/pkg/cannon/iterator" + "github.com/ethpandaops/xatu/pkg/observability" xatuethv1 "github.com/ethpandaops/xatu/pkg/proto/eth/v1" "github.com/ethpandaops/xatu/pkg/proto/xatu" "github.com/google/uuid" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/wrapperspb" @@ -82,16 +86,19 @@ func (a *AttesterSlashingDeriver) Stop(ctx context.Context) error { return nil } -func (a *AttesterSlashingDeriver) run(ctx context.Context) { +func (a *AttesterSlashingDeriver) run(rctx context.Context) { bo := backoff.NewExponentialBackOff() bo.MaxInterval = 3 * time.Minute for { select { - case <-ctx.Done(): + case <-rctx.Done(): return default: operation := func() error { + ctx, span := observability.Tracer().Start(rctx, fmt.Sprintf("Derive %s", a.Name())) + defer span.End() + time.Sleep(100 * time.Millisecond) if err := a.beacon.Synced(ctx); err != nil { @@ -149,6 +156,11 @@ func (a *AttesterSlashingDeriver) run(ctx context.Context) { // lookAheadAtLocation takes the upcoming locations and looks ahead to do any pre-processing that might be required. func (a *AttesterSlashingDeriver) lookAheadAtLocations(ctx context.Context, locations []*xatu.CannonLocation) { + _, span := observability.Tracer().Start(ctx, + "AttesterSlashingDeriver.lookAheadAtLocations", + ) + defer span.End() + if locations == nil { return } @@ -174,6 +186,12 @@ func (a *AttesterSlashingDeriver) lookAheadAtLocations(ctx context.Context, loca } func (a *AttesterSlashingDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "AttesterSlashingDeriver.processEpoch", + trace.WithAttributes(attribute.Int64("epoch", int64(epoch))), + ) + defer span.End() + sp, err := a.beacon.Node().Spec() if err != nil { return nil, errors.Wrap(err, "failed to obtain spec") @@ -196,6 +214,12 @@ func (a *AttesterSlashingDeriver) processEpoch(ctx context.Context, epoch phase0 } func (a *AttesterSlashingDeriver) processSlot(ctx context.Context, slot phase0.Slot) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "AttesterSlashingDeriver.processSlot", + trace.WithAttributes(attribute.Int64("slot", int64(slot))), + ) + defer span.End() + // Get the block block, err := a.beacon.GetBeaconBlock(ctx, xatuethv1.SlotAsString(slot)) if err != nil { diff --git a/pkg/cannon/deriver/beacon/eth/v2/beacon_block.go b/pkg/cannon/deriver/beacon/eth/v2/beacon_block.go index 71e722c7..7fb92c0f 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/beacon_block.go +++ b/pkg/cannon/deriver/beacon/eth/v2/beacon_block.go @@ -11,6 +11,7 @@ import ( backoff "github.com/cenkalti/backoff/v4" "github.com/ethpandaops/xatu/pkg/cannon/ethereum" "github.com/ethpandaops/xatu/pkg/cannon/iterator" + "github.com/ethpandaops/xatu/pkg/observability" "github.com/ethpandaops/xatu/pkg/proto/eth" xatuethv1 "github.com/ethpandaops/xatu/pkg/proto/eth/v1" xatuethv2 "github.com/ethpandaops/xatu/pkg/proto/eth/v2" @@ -19,6 +20,9 @@ import ( "github.com/google/uuid" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/wrapperspb" @@ -87,57 +91,88 @@ func (b *BeaconBlockDeriver) Stop(ctx context.Context) error { return nil } -func (b *BeaconBlockDeriver) run(ctx context.Context) { +func (b *BeaconBlockDeriver) run(rctx context.Context) { bo := backoff.NewExponentialBackOff() bo.MaxInterval = 3 * time.Minute + tracer := observability.Tracer() + for { select { - case <-ctx.Done(): + case <-rctx.Done(): return default: operation := func() error { + ctx, span := tracer.Start(rctx, fmt.Sprintf("Derive %s", b.Name())) + defer span.End() + time.Sleep(100 * time.Millisecond) + span.AddEvent("Checking if beacon node is synced") + if err := b.beacon.Synced(ctx); err != nil { + span.SetStatus(codes.Error, err.Error()) + return err } + span.AddEvent("Grabbing next location") + // Get the next slot location, lookAhead, err := b.iterator.Next(ctx) if err != nil { + span.SetStatus(codes.Error, err.Error()) + return err } + span.AddEvent("Obtained next location, looking ahead...", trace.WithAttributes(attribute.Int64("location", int64(location.GetEthV2BeaconBlock().GetEpoch())))) + // Look ahead b.lookAheadAtLocation(ctx, lookAhead) + span.AddEvent("Look ahead complete. Firing location callbacks...") + for _, fn := range b.onLocationCallbacks { if errr := fn(ctx, location.GetEthV2BeaconBlock().GetEpoch()); errr != nil { b.log.WithError(errr).Error("Failed to send location") } } + span.AddEvent("Location callbacks complete. Processing epoch...") + // Process the epoch events, err := b.processEpoch(ctx, phase0.Epoch(location.GetEthV2BeaconBlock().GetEpoch())) if err != nil { b.log.WithError(err).Error("Failed to process epoch") + span.SetStatus(codes.Error, err.Error()) + return err } + span.AddEvent("Epoch processing complete. Sending events...") + // Send the events for _, fn := range b.onEventsCallbacks { if err := fn(ctx, events); err != nil { + span.SetStatus(codes.Error, err.Error()) + return errors.Wrap(err, "failed to send events") } } + span.AddEvent("Events sent. Updating location...") + // Update our location if err := b.iterator.UpdateLocation(ctx, location); err != nil { + span.SetStatus(codes.Error, err.Error()) + return err } + span.AddEvent("Location updated. Done.") + bo.Reset() return nil @@ -152,6 +187,11 @@ func (b *BeaconBlockDeriver) run(ctx context.Context) { // lookAheadAtLocation takes the upcoming locations and looks ahead to do any pre-processing that might be required. func (b *BeaconBlockDeriver) lookAheadAtLocation(ctx context.Context, locations []*xatu.CannonLocation) { + _, span := observability.Tracer().Start(ctx, + "BeaconBlockDeriver.lookAheadAtLocations", + ) + defer span.End() + if locations == nil { return } @@ -177,6 +217,12 @@ func (b *BeaconBlockDeriver) lookAheadAtLocation(ctx context.Context, locations } func (b *BeaconBlockDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "BeaconBlockDeriver.processEpoch", + trace.WithAttributes(attribute.Int64("epoch", int64(epoch))), + ) + defer span.End() + sp, err := b.beacon.Node().Spec() if err != nil { return nil, errors.Wrap(err, "failed to obtain spec") @@ -199,6 +245,12 @@ func (b *BeaconBlockDeriver) processEpoch(ctx context.Context, epoch phase0.Epoc } func (b *BeaconBlockDeriver) processSlot(ctx context.Context, slot phase0.Slot) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "BeaconBlockDeriver.processSlot", + trace.WithAttributes(attribute.Int64("slot", int64(slot))), + ) + defer span.End() + // Get the block block, err := b.beacon.GetBeaconBlock(ctx, xatuethv1.SlotAsString(slot)) if err != nil { diff --git a/pkg/cannon/deriver/beacon/eth/v2/bls_to_execution_change.go b/pkg/cannon/deriver/beacon/eth/v2/bls_to_execution_change.go index 677ba471..dd77c75a 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/bls_to_execution_change.go +++ b/pkg/cannon/deriver/beacon/eth/v2/bls_to_execution_change.go @@ -10,11 +10,14 @@ import ( backoff "github.com/cenkalti/backoff/v4" "github.com/ethpandaops/xatu/pkg/cannon/ethereum" "github.com/ethpandaops/xatu/pkg/cannon/iterator" + "github.com/ethpandaops/xatu/pkg/observability" xatuethv1 "github.com/ethpandaops/xatu/pkg/proto/eth/v1" xatuethv2 "github.com/ethpandaops/xatu/pkg/proto/eth/v2" "github.com/ethpandaops/xatu/pkg/proto/xatu" "github.com/google/uuid" "github.com/pkg/errors" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/sirupsen/logrus" "google.golang.org/protobuf/proto" @@ -85,16 +88,19 @@ func (b *BLSToExecutionChangeDeriver) Stop(ctx context.Context) error { return nil } -func (b *BLSToExecutionChangeDeriver) run(ctx context.Context) { +func (b *BLSToExecutionChangeDeriver) run(rctx context.Context) { bo := backoff.NewExponentialBackOff() bo.MaxInterval = 3 * time.Minute for { select { - case <-ctx.Done(): + case <-rctx.Done(): return default: operation := func() error { + ctx, span := observability.Tracer().Start(rctx, fmt.Sprintf("Derive %s", b.Name())) + defer span.End() + time.Sleep(100 * time.Millisecond) if err := b.beacon.Synced(ctx); err != nil { @@ -152,6 +158,11 @@ func (b *BLSToExecutionChangeDeriver) run(ctx context.Context) { // lookAheadAtLocation takes the upcoming locations and looks ahead to do any pre-processing that might be required. func (b *BLSToExecutionChangeDeriver) lookAheadAtLocation(ctx context.Context, locations []*xatu.CannonLocation) { + _, span := observability.Tracer().Start(ctx, + "BLSToExecutionChangeDeriver.lookAheadAtLocations", + ) + defer span.End() + if locations == nil { return } @@ -177,6 +188,12 @@ func (b *BLSToExecutionChangeDeriver) lookAheadAtLocation(ctx context.Context, l } func (b *BLSToExecutionChangeDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "BLSToExecutionChangeDeriver.processEpoch", + trace.WithAttributes(attribute.Int64("epoch", int64(epoch))), + ) + defer span.End() + sp, err := b.beacon.Node().Spec() if err != nil { return nil, errors.Wrap(err, "failed to obtain spec") @@ -199,6 +216,12 @@ func (b *BLSToExecutionChangeDeriver) processEpoch(ctx context.Context, epoch ph } func (b *BLSToExecutionChangeDeriver) processSlot(ctx context.Context, slot phase0.Slot) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "BLSToExecutionChangeDeriver.processSlot", + trace.WithAttributes(attribute.Int64("slot", int64(slot))), + ) + defer span.End() + // Get the block block, err := b.beacon.GetBeaconBlock(ctx, xatuethv1.SlotAsString(slot)) if err != nil { diff --git a/pkg/cannon/deriver/beacon/eth/v2/deposit.go b/pkg/cannon/deriver/beacon/eth/v2/deposit.go index bf1e7a13..38ae5d77 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/deposit.go +++ b/pkg/cannon/deriver/beacon/eth/v2/deposit.go @@ -10,11 +10,14 @@ import ( backoff "github.com/cenkalti/backoff/v4" "github.com/ethpandaops/xatu/pkg/cannon/ethereum" "github.com/ethpandaops/xatu/pkg/cannon/iterator" + "github.com/ethpandaops/xatu/pkg/observability" xatuethv1 "github.com/ethpandaops/xatu/pkg/proto/eth/v1" "github.com/ethpandaops/xatu/pkg/proto/xatu" "github.com/google/uuid" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/wrapperspb" @@ -83,16 +86,19 @@ func (b *DepositDeriver) Stop(ctx context.Context) error { return nil } -func (b *DepositDeriver) run(ctx context.Context) { +func (b *DepositDeriver) run(rctx context.Context) { bo := backoff.NewExponentialBackOff() bo.MaxInterval = 3 * time.Minute for { select { - case <-ctx.Done(): + case <-rctx.Done(): return default: operation := func() error { + ctx, span := observability.Tracer().Start(rctx, fmt.Sprintf("Derive %s", b.Name())) + defer span.End() + time.Sleep(100 * time.Millisecond) if err := b.beacon.Synced(ctx); err != nil { @@ -150,6 +156,11 @@ func (b *DepositDeriver) run(ctx context.Context) { // lookAheadAtLocation takes the upcoming locations and looks ahead to do any pre-processing that might be required. func (b *DepositDeriver) lookAheadAtLocation(ctx context.Context, locations []*xatu.CannonLocation) { + _, span := observability.Tracer().Start(ctx, + "DepositDeriver.lookAheadAtLocations", + ) + defer span.End() + if locations == nil { return } @@ -175,6 +186,12 @@ func (b *DepositDeriver) lookAheadAtLocation(ctx context.Context, locations []*x } func (b *DepositDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "DepositDeriver.processEpoch", + trace.WithAttributes(attribute.Int64("epoch", int64(epoch))), + ) + defer span.End() + sp, err := b.beacon.Node().Spec() if err != nil { return nil, errors.Wrap(err, "failed to obtain spec") @@ -197,6 +214,12 @@ func (b *DepositDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ( } func (b *DepositDeriver) processSlot(ctx context.Context, slot phase0.Slot) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "DepositDeriver.processSlot", + trace.WithAttributes(attribute.Int64("slot", int64(slot))), + ) + defer span.End() + // Get the block block, err := b.beacon.GetBeaconBlock(ctx, xatuethv1.SlotAsString(slot)) if err != nil { diff --git a/pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go b/pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go index cdea1478..3536d5fd 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go +++ b/pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go @@ -13,11 +13,14 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethpandaops/xatu/pkg/cannon/ethereum" "github.com/ethpandaops/xatu/pkg/cannon/iterator" + "github.com/ethpandaops/xatu/pkg/observability" xatuethv1 "github.com/ethpandaops/xatu/pkg/proto/eth/v1" "github.com/ethpandaops/xatu/pkg/proto/xatu" "github.com/google/uuid" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/wrapperspb" @@ -86,16 +89,19 @@ func (b *ExecutionTransactionDeriver) Stop(ctx context.Context) error { return nil } -func (b *ExecutionTransactionDeriver) run(ctx context.Context) { +func (b *ExecutionTransactionDeriver) run(rctx context.Context) { bo := backoff.NewExponentialBackOff() bo.MaxInterval = 3 * time.Minute for { select { - case <-ctx.Done(): + case <-rctx.Done(): return default: operation := func() error { + ctx, span := observability.Tracer().Start(rctx, fmt.Sprintf("Derive %s", b.Name())) + defer span.End() + time.Sleep(100 * time.Millisecond) if err := b.beacon.Synced(ctx); err != nil { @@ -152,6 +158,12 @@ func (b *ExecutionTransactionDeriver) run(ctx context.Context) { } func (b *ExecutionTransactionDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "ExecutionTransactionDeriver.processEpoch", + trace.WithAttributes(attribute.Int64("epoch", int64(epoch))), + ) + defer span.End() + sp, err := b.beacon.Node().Spec() if err != nil { return nil, errors.Wrap(err, "failed to obtain spec") @@ -175,6 +187,11 @@ func (b *ExecutionTransactionDeriver) processEpoch(ctx context.Context, epoch ph // lookAheadAtLocation takes the upcoming locations and looks ahead to do any pre-processing that might be required. func (b *ExecutionTransactionDeriver) lookAheadAtLocation(ctx context.Context, locations []*xatu.CannonLocation) { + _, span := observability.Tracer().Start(ctx, + "ExecutionTransactionDeriver.lookAheadAtLocations", + ) + defer span.End() + for _, location := range locations { // Get the next look ahead epoch epoch := phase0.Epoch(location.GetEthV2BeaconBlockExecutionTransaction().GetEpoch()) @@ -196,6 +213,12 @@ func (b *ExecutionTransactionDeriver) lookAheadAtLocation(ctx context.Context, l } func (b *ExecutionTransactionDeriver) processSlot(ctx context.Context, slot phase0.Slot) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "ExecutionTransactionDeriver.processSlot", + trace.WithAttributes(attribute.Int64("slot", int64(slot))), + ) + defer span.End() + // Get the block block, err := b.beacon.GetBeaconBlock(ctx, xatuethv1.SlotAsString(slot)) if err != nil { diff --git a/pkg/cannon/deriver/beacon/eth/v2/proposer_slashing.go b/pkg/cannon/deriver/beacon/eth/v2/proposer_slashing.go index 3bc1f92c..330b16f8 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/proposer_slashing.go +++ b/pkg/cannon/deriver/beacon/eth/v2/proposer_slashing.go @@ -2,6 +2,7 @@ package v2 import ( "context" + "fmt" "time" "github.com/attestantio/go-eth2-client/spec" @@ -9,11 +10,14 @@ import ( backoff "github.com/cenkalti/backoff/v4" "github.com/ethpandaops/xatu/pkg/cannon/ethereum" "github.com/ethpandaops/xatu/pkg/cannon/iterator" + "github.com/ethpandaops/xatu/pkg/observability" xatuethv1 "github.com/ethpandaops/xatu/pkg/proto/eth/v1" "github.com/ethpandaops/xatu/pkg/proto/xatu" "github.com/google/uuid" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/wrapperspb" @@ -82,16 +86,19 @@ func (b *ProposerSlashingDeriver) Stop(ctx context.Context) error { return nil } -func (b *ProposerSlashingDeriver) run(ctx context.Context) { +func (b *ProposerSlashingDeriver) run(rctx context.Context) { bo := backoff.NewExponentialBackOff() bo.MaxInterval = 3 * time.Minute for { select { - case <-ctx.Done(): + case <-rctx.Done(): return default: operation := func() error { + ctx, span := observability.Tracer().Start(rctx, fmt.Sprintf("Derive %s", b.Name())) + defer span.End() + time.Sleep(100 * time.Millisecond) if err := b.beacon.Synced(ctx); err != nil { @@ -148,6 +155,12 @@ func (b *ProposerSlashingDeriver) run(ctx context.Context) { } func (b *ProposerSlashingDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "ProposerSlashingDeriver.processEpoch", + trace.WithAttributes(attribute.Int64("epoch", int64(epoch))), + ) + defer span.End() + sp, err := b.beacon.Node().Spec() if err != nil { return nil, errors.Wrap(err, "failed to obtain spec") @@ -170,6 +183,12 @@ func (b *ProposerSlashingDeriver) processEpoch(ctx context.Context, epoch phase0 } func (b *ProposerSlashingDeriver) processSlot(ctx context.Context, slot phase0.Slot) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "ProposerSlashingDeriver.processSlot", + trace.WithAttributes(attribute.Int64("slot", int64(slot))), + ) + defer span.End() + // Get the block block, err := b.beacon.GetBeaconBlock(ctx, xatuethv1.SlotAsString(slot)) if err != nil { @@ -244,6 +263,11 @@ func (b *ProposerSlashingDeriver) getProposerSlashings(ctx context.Context, bloc // lookAheadAtLocation takes the upcoming locations and looks ahead to do any pre-processing that might be required. func (b *ProposerSlashingDeriver) lookAheadAtLocation(ctx context.Context, locations []*xatu.CannonLocation) { + _, span := observability.Tracer().Start(ctx, + "ProposerSlashingDeriver.lookAheadAtLocations", + ) + defer span.End() + if locations == nil { return } diff --git a/pkg/cannon/deriver/beacon/eth/v2/voluntary_exit.go b/pkg/cannon/deriver/beacon/eth/v2/voluntary_exit.go index b2bb1b67..ca48b87e 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/voluntary_exit.go +++ b/pkg/cannon/deriver/beacon/eth/v2/voluntary_exit.go @@ -10,11 +10,14 @@ import ( backoff "github.com/cenkalti/backoff/v4" "github.com/ethpandaops/xatu/pkg/cannon/ethereum" "github.com/ethpandaops/xatu/pkg/cannon/iterator" + "github.com/ethpandaops/xatu/pkg/observability" xatuethv1 "github.com/ethpandaops/xatu/pkg/proto/eth/v1" "github.com/ethpandaops/xatu/pkg/proto/xatu" "github.com/google/uuid" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/wrapperspb" @@ -83,16 +86,19 @@ func (b *VoluntaryExitDeriver) Stop(ctx context.Context) error { return nil } -func (b *VoluntaryExitDeriver) run(ctx context.Context) { +func (b *VoluntaryExitDeriver) run(rctx context.Context) { bo := backoff.NewExponentialBackOff() bo.MaxInterval = 3 * time.Minute for { select { - case <-ctx.Done(): + case <-rctx.Done(): return default: operation := func() error { + ctx, span := observability.Tracer().Start(rctx, fmt.Sprintf("Derive %s", b.Name())) + defer span.End() + time.Sleep(100 * time.Millisecond) if err := b.beacon.Synced(ctx); err != nil { @@ -150,6 +156,11 @@ func (b *VoluntaryExitDeriver) run(ctx context.Context) { // lookAheadAtLocation takes the upcoming locations and looks ahead to do any pre-processing that might be required. func (b *VoluntaryExitDeriver) lookAheadAtLocation(ctx context.Context, locations []*xatu.CannonLocation) { + _, span := observability.Tracer().Start(ctx, + "VoluntaryExitDeriver.lookAheadAtLocations", + ) + defer span.End() + if locations == nil { return } @@ -175,6 +186,12 @@ func (b *VoluntaryExitDeriver) lookAheadAtLocation(ctx context.Context, location } func (b *VoluntaryExitDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "VoluntaryExitDeriver.processEpoch", + trace.WithAttributes(attribute.Int64("epoch", int64(epoch))), + ) + defer span.End() + sp, err := b.beacon.Node().Spec() if err != nil { return nil, errors.Wrap(err, "failed to obtain spec") @@ -197,6 +214,12 @@ func (b *VoluntaryExitDeriver) processEpoch(ctx context.Context, epoch phase0.Ep } func (b *VoluntaryExitDeriver) processSlot(ctx context.Context, slot phase0.Slot) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "VoluntaryExitDeriver.processSlot", + trace.WithAttributes(attribute.Int64("slot", int64(slot))), + ) + defer span.End() + // Get the block block, err := b.beacon.GetBeaconBlock(ctx, xatuethv1.SlotAsString(slot)) if err != nil { diff --git a/pkg/cannon/deriver/beacon/eth/v2/withdrawal.go b/pkg/cannon/deriver/beacon/eth/v2/withdrawal.go index fbbbcdc2..81568fd7 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/withdrawal.go +++ b/pkg/cannon/deriver/beacon/eth/v2/withdrawal.go @@ -2,6 +2,7 @@ package v2 import ( "context" + "fmt" "time" "github.com/attestantio/go-eth2-client/spec" @@ -9,11 +10,14 @@ import ( backoff "github.com/cenkalti/backoff/v4" "github.com/ethpandaops/xatu/pkg/cannon/ethereum" "github.com/ethpandaops/xatu/pkg/cannon/iterator" + "github.com/ethpandaops/xatu/pkg/observability" xatuethv1 "github.com/ethpandaops/xatu/pkg/proto/eth/v1" "github.com/ethpandaops/xatu/pkg/proto/xatu" "github.com/google/uuid" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/wrapperspb" @@ -82,16 +86,19 @@ func (b *WithdrawalDeriver) Stop(ctx context.Context) error { return nil } -func (b *WithdrawalDeriver) run(ctx context.Context) { +func (b *WithdrawalDeriver) run(rctx context.Context) { bo := backoff.NewExponentialBackOff() bo.MaxInterval = 3 * time.Minute for { select { - case <-ctx.Done(): + case <-rctx.Done(): return default: operation := func() error { + ctx, span := observability.Tracer().Start(rctx, fmt.Sprintf("Derive %s", b.Name())) + defer span.End() + time.Sleep(100 * time.Millisecond) if err := b.beacon.Synced(ctx); err != nil { @@ -145,6 +152,12 @@ func (b *WithdrawalDeriver) run(ctx context.Context) { } func (b *WithdrawalDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "WithdrawalDeriver.processEpoch", + trace.WithAttributes(attribute.Int64("epoch", int64(epoch))), + ) + defer span.End() + sp, err := b.beacon.Node().Spec() if err != nil { return nil, errors.Wrap(err, "failed to obtain spec") @@ -167,6 +180,12 @@ func (b *WithdrawalDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch } func (b *WithdrawalDeriver) processSlot(ctx context.Context, slot phase0.Slot) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "WithdrawalDeriver.processSlot", + trace.WithAttributes(attribute.Int64("slot", int64(slot))), + ) + defer span.End() + // Get the block block, err := b.beacon.GetBeaconBlock(ctx, xatuethv1.SlotAsString(slot)) if err != nil { @@ -203,6 +222,11 @@ func (b *WithdrawalDeriver) processSlot(ctx context.Context, slot phase0.Slot) ( // lookAheadAtLocation takes the upcoming locations and looks ahead to do any pre-processing that might be required. func (b *WithdrawalDeriver) lookAheadAtLocation(ctx context.Context, locations []*xatu.CannonLocation) { + _, span := observability.Tracer().Start(ctx, + "WithdrawalDeriver.lookAheadAtLocations", + ) + defer span.End() + if locations == nil { return } diff --git a/pkg/cannon/ethereum/beacon.go b/pkg/cannon/ethereum/beacon.go index a1e7d181..1185cc3f 100644 --- a/pkg/cannon/ethereum/beacon.go +++ b/pkg/cannon/ethereum/beacon.go @@ -9,10 +9,14 @@ import ( "github.com/attestantio/go-eth2-client/spec" "github.com/ethpandaops/beacon/pkg/beacon" "github.com/ethpandaops/xatu/pkg/cannon/ethereum/services" + "github.com/ethpandaops/xatu/pkg/observability" "github.com/go-co-op/gocron" "github.com/jellydator/ttlcache/v3" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" "golang.org/x/sync/singleflight" ) @@ -30,6 +34,7 @@ type BeaconNode struct { sfGroup *singleflight.Group blockCache *ttlcache.Cache[string, *spec.VersionedSignedBeaconBlock] blockPreloadChan chan string + blockPreloadSem chan struct{} } func NewBeaconNode(ctx context.Context, name string, config *Config, log logrus.FieldLogger) (*BeaconNode, error) { @@ -61,6 +66,9 @@ func NewBeaconNode(ctx context.Context, name string, config *Config, log logrus. &metadata, } + // Create a buffered channel (semaphore) to limit the number of concurrent goroutines. + sem := make(chan struct{}, config.BlockPreloadWorkers) + return &BeaconNode{ config: config, log: log.WithField("module", "cannon/ethereum/beacon"), @@ -72,6 +80,7 @@ func NewBeaconNode(ctx context.Context, name string, config *Config, log logrus. ), sfGroup: &singleflight.Group{}, blockPreloadChan: make(chan string, config.BlockPreloadQueueSize), + blockPreloadSem: sem, metrics: NewMetrics(namespace, name), }, nil } @@ -119,6 +128,10 @@ func (b *BeaconNode) Start(ctx context.Context) error { return err } + b.blockCache.OnEviction(func(ctx context.Context, reason ttlcache.EvictionReason, item *ttlcache.Item[string, *spec.VersionedSignedBeaconBlock]) { + b.log.WithField("identifier", item.Key()).WithField("reason", reason).Trace("Block evicted from cache") + }) + go b.blockCache.Start() for i := 0; i < int(b.config.BlockPreloadWorkers); i++ { @@ -207,10 +220,11 @@ func (b *BeaconNode) Synced(ctx context.Context) error { // GetBeaconBlock returns a beacon block by its identifier. Blocks can be cached internally. func (b *BeaconNode) GetBeaconBlock(ctx context.Context, identifier string, ignoreMetrics ...bool) (*spec.VersionedSignedBeaconBlock, error) { - b.metrics.IncBlocksFetched(string(b.Metadata().Network.Name)) + ctx, span := observability.Tracer().Start(ctx, "ethereum.beacon.GetBeaconBlock", trace.WithAttributes(attribute.String("identifier", identifier))) - // Create a buffered channel (semaphore) to limit the number of concurrent goroutines. - sem := make(chan struct{}, b.config.BlockPreloadWorkers) + defer span.End() + + b.metrics.IncBlocksFetched(string(b.Metadata().Network.Name)) // Check the cache first. if item := b.blockCache.Get(identifier); item != nil { @@ -218,18 +232,26 @@ func (b *BeaconNode) GetBeaconBlock(ctx context.Context, identifier string, igno b.metrics.IncBlockCacheHit(string(b.Metadata().Network.Name)) } + span.SetAttributes(attribute.Bool("cached", true)) + return item.Value(), nil } + span.SetAttributes(attribute.Bool("cached", false)) + if len(ignoreMetrics) != 0 && ignoreMetrics[0] { b.metrics.IncBlockCacheMiss(string(b.Metadata().Network.Name)) } // Use singleflight to ensure we only make one request for a block at a time. - x, err, _ := b.sfGroup.Do(identifier, func() (interface{}, error) { + x, err, shared := b.sfGroup.Do(identifier, func() (interface{}, error) { + span.AddEvent("Acquiring semaphore...") + // Acquire a semaphore before proceeding. - sem <- struct{}{} - defer func() { <-sem }() + b.blockPreloadSem <- struct{}{} + defer func() { <-b.blockPreloadSem }() + + span.AddEvent("Semaphore acquired. Fetching block from beacon api...") // Not in the cache, so fetch it. block, err := b.beacon.FetchBlock(ctx, identifier) @@ -237,12 +259,16 @@ func (b *BeaconNode) GetBeaconBlock(ctx context.Context, identifier string, igno return nil, err } + span.AddEvent("Block fetched from beacon node.") + // Add it to the cache. b.blockCache.Set(identifier, block, time.Hour) return block, nil }) if err != nil { + span.SetStatus(codes.Error, err.Error()) + if len(ignoreMetrics) != 0 && ignoreMetrics[0] { b.metrics.IncBlocksFetchErrors(string(b.Metadata().Network.Name)) } @@ -250,9 +276,16 @@ func (b *BeaconNode) GetBeaconBlock(ctx context.Context, identifier string, igno return nil, err } + span.AddEvent("Block fetching complete.", trace.WithAttributes(attribute.Bool("shared", shared))) + return x.(*spec.VersionedSignedBeaconBlock), nil } func (b *BeaconNode) LazyLoadBeaconBlock(identifier string) { + // Don't add the block to the preload queue if it's already in the cache. + if item := b.blockCache.Get(identifier); item != nil { + return + } + b.blockPreloadChan <- identifier } diff --git a/pkg/cannon/iterator/checkpoint_iterator.go b/pkg/cannon/iterator/checkpoint_iterator.go index c775275f..fe4617d2 100644 --- a/pkg/cannon/iterator/checkpoint_iterator.go +++ b/pkg/cannon/iterator/checkpoint_iterator.go @@ -8,9 +8,12 @@ import ( "github.com/ethpandaops/ethwallclock" "github.com/ethpandaops/xatu/pkg/cannon/coordinator" "github.com/ethpandaops/xatu/pkg/cannon/ethereum" + "github.com/ethpandaops/xatu/pkg/observability" "github.com/ethpandaops/xatu/pkg/proto/xatu" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) type CheckpointIterator struct { @@ -46,6 +49,27 @@ func (c *CheckpointIterator) UpdateLocation(ctx context.Context, location *xatu. } func (c *CheckpointIterator) Next(ctx context.Context) (next *xatu.CannonLocation, lookAhead []*xatu.CannonLocation, err error) { + ctx, span := observability.Tracer().Start(ctx, + "CheckpointIterator.Next", + trace.WithAttributes( + attribute.String("network", c.networkName), + attribute.String("cannon_type", c.cannonType.String()), + attribute.String("network_id", c.networkID), + ), + ) + defer func() { + if err != nil { + span.RecordError(err) + } + + epoch, err := c.getEpochFromLocation(next) + if err == nil { + span.SetAttributes(attribute.Int64("next", int64(epoch))) + } + + span.End() + }() + for { // Grab the current checkpoint from the beacon node checkpoint, err := c.fetchLatestEpoch(ctx) @@ -129,7 +153,7 @@ func (c *CheckpointIterator) getLookAheads(ctx context.Context, location *xatu.C lookAheads := make([]*xatu.CannonLocation, 0) - for _, i := range []int{1} { + for _, i := range []int{1, 2, 3} { lookAheadEpoch := epoch + phase0.Epoch(i) if lookAheadEpoch > latestCheckpoint.Epoch { @@ -148,6 +172,11 @@ func (c *CheckpointIterator) getLookAheads(ctx context.Context, location *xatu.C } func (c *CheckpointIterator) fetchLatestEpoch(ctx context.Context) (*phase0.Checkpoint, error) { + _, span := observability.Tracer().Start(ctx, + "CheckpointIterator.FetchLatestEpoch", + ) + defer span.End() + finality, err := c.beaconNode.Node().Finality() if err != nil { return nil, errors.Wrap(err, "failed to fetch finality") diff --git a/pkg/proto/xatu/xatu.go b/pkg/proto/xatu/xatu.go index bb2083e8..70e3b621 100644 --- a/pkg/proto/xatu/xatu.go +++ b/pkg/proto/xatu/xatu.go @@ -16,6 +16,14 @@ func Full() string { return fmt.Sprintf("%s/%s", Implementation, Short()) } +func FullWithMode(mode Mode) string { + return fmt.Sprintf("%s-%s/%s", Implementation, mode, Short()) +} + +func WithMode(mode Mode) string { + return fmt.Sprintf("%s-%s", Implementation, mode) +} + func Short() string { return fmt.Sprintf("%s-%s", Release, GitCommit) }