From 7b71448942c1475ea154fa14f733926dd8f77721 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Tue, 3 Oct 2023 15:02:05 +1000 Subject: [PATCH 01/10] feat: Tracing --- .gitignore | 2 +- Dockerfile | 2 +- go.mod | 24 ++++++-- go.sum | 35 ++++++++++++ pkg/cannon/cannon.go | 53 +++++++++++++++++- pkg/cannon/config.go | 8 +++ .../beacon/eth/v2/attester_slashing.go | 28 +++++++++- .../deriver/beacon/eth/v2/beacon_block.go | 56 ++++++++++++++++++- .../beacon/eth/v2/bls_to_execution_change.go | 27 ++++++++- pkg/cannon/deriver/beacon/eth/v2/deposit.go | 27 ++++++++- .../beacon/eth/v2/execution_transaction.go | 27 ++++++++- .../beacon/eth/v2/proposer_slashing.go | 28 +++++++++- .../deriver/beacon/eth/v2/voluntary_exit.go | 27 ++++++++- .../deriver/beacon/eth/v2/withdrawal.go | 28 +++++++++- pkg/cannon/ethereum/beacon.go | 45 +++++++++++++-- pkg/cannon/iterator/checkpoint_iterator.go | 31 +++++++++- pkg/proto/xatu/xatu.go | 8 +++ 17 files changed, 424 insertions(+), 32 deletions(-) diff --git a/.gitignore b/.gitignore index 8c6d45a3..4ea2b92c 100644 --- a/.gitignore +++ b/.gitignore @@ -6,5 +6,5 @@ cannon.yaml dist GeoLite2-ASN.mmdb GeoLite2-City.mmdb -__debug_bin +__debug_bin* .vscode/launch.json diff --git a/Dockerfile b/Dockerfile index 5267680b..3ddc7725 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.19 AS builder +FROM golang:1.20 AS builder WORKDIR /src COPY go.sum go.mod ./ RUN go mod download diff --git a/go.mod b/go.mod index 1314fd72..0030d49b 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/ethpandaops/xatu -go 1.19 +go 1.20 replace github.com/attestantio/go-eth2-client v0.16.3 => github.com/samcm/go-eth2-client v0.15.12 @@ -29,9 +29,9 @@ require ( github.com/redis/go-redis/v9 v9.0.4 github.com/sirupsen/logrus v1.9.3 github.com/spf13/cobra v1.7.0 - github.com/stretchr/testify v1.8.2 + github.com/stretchr/testify v1.8.4 golang.org/x/sync v0.3.0 - google.golang.org/grpc v1.55.0 + google.golang.org/grpc v1.58.2 google.golang.org/protobuf v1.31.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -54,6 +54,8 @@ require ( github.com/fatih/color v1.14.1 // indirect github.com/ferranbt/fastssz v0.1.3 // indirect github.com/getsentry/sentry-go v0.23.0 // indirect + github.com/go-logr/logr v1.2.4 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-stack/stack v1.8.1 // indirect github.com/goccy/go-yaml v1.9.8 // indirect @@ -62,6 +64,7 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect github.com/gorilla/websocket v1.5.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect github.com/holiman/bloomfilter/v2 v2.0.3 // indirect github.com/holiman/uint256 v1.2.3 // indirect github.com/huandu/xstrings v1.4.0 // indirect @@ -107,14 +110,23 @@ require ( github.com/tklauser/go-sysconf v0.3.11 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/yusufpapurcu/wmi v1.2.3 // indirect + go.opentelemetry.io/otel v1.19.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 // indirect + go.opentelemetry.io/otel/metric v1.19.0 // indirect + go.opentelemetry.io/otel/sdk v1.19.0 // indirect + go.opentelemetry.io/otel/trace v1.19.0 // indirect + go.opentelemetry.io/proto/otlp v1.0.0 // indirect go.uber.org/atomic v1.10.0 // indirect golang.org/x/crypto v0.11.0 // indirect golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b // indirect - golang.org/x/net v0.10.0 // indirect - golang.org/x/sys v0.10.0 // indirect + golang.org/x/net v0.12.0 // indirect + golang.org/x/sys v0.12.0 // indirect golang.org/x/text v0.11.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect - google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect + google.golang.org/genproto v0.0.0-20230711160842-782d3b101e98 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index 4d0965f1..6855f8e8 100644 --- a/go.sum +++ b/go.sum @@ -135,6 +135,11 @@ github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vb github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= +github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= @@ -220,6 +225,8 @@ github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWm github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao= @@ -432,6 +439,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM= @@ -453,6 +462,20 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs= +go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 h1:Mne5On7VWdx7omSrSSZvM4Kw7cS7NQkOOmLcgscI51U= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0/go.mod h1:IPtUMKL4O3tH5y+iXVyAXqpAwMuzC1IrxVS81rummfE= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 h1:IeMeyr1aBvBiPVYihXIaeIZba6b8E1bYp7lbdxK8CQg= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0/go.mod h1:oVdCUtjq9MK9BlS7TtucsQwUcXcymNiEDjgDD2jMtZU= +go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE= +go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8= +go.opentelemetry.io/otel/sdk v1.19.0 h1:6USY6zH+L8uMH8L3t1enZPR3WFEmSTADlqldyHtJi3o= +go.opentelemetry.io/otel/sdk v1.19.0/go.mod h1:NedEbbS4w3C6zElbLdPJKOpJQOrGUJ+GfzpjUvI0v1A= +go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg= +go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo= +go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= +go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= @@ -544,6 +567,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50= +golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -628,6 +653,8 @@ golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= @@ -750,6 +777,12 @@ google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 h1:DdoeryqhaXp1LtT/emMP1BRJPHHKFi5akj/nbx/zNTA= google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4/go.mod h1:NWraEVixdDnqcqQ30jipen1STv2r/n24Wb7twVTGR4s= +google.golang.org/genproto v0.0.0-20230711160842-782d3b101e98 h1:Z0hjGZePRE0ZBWotvtrwxFNrNE9CUAGtplaDK5NNI/g= +google.golang.org/genproto v0.0.0-20230711160842-782d3b101e98/go.mod h1:S7mY02OqCJTD0E1OiQy1F72PWFB4bZJ87cAtLPYgDR0= +google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 h1:FmF5cCW94Ij59cfpoLiwTgodWmm60eEV0CjlsVg2fuw= +google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98/go.mod h1:rsr7RhLuwsDKL7RmgDDCUc6yaGr1iqceVb5Wv6f6YvQ= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 h1:bVf09lpb+OJbByTj913DRJioFFAjf/ZGxEz7MajTp2U= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98/go.mod h1:TUfxEVdsvPg18p6AslUXFoLdpED4oBnGwyqk3dV1XzM= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -764,6 +797,8 @@ google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.55.0 h1:3Oj82/tFSCeUrRTg/5E/7d/W5A1tj6Ky1ABAuZuv5ag= google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8= +google.golang.org/grpc v1.58.2 h1:SXUpjxeVF3FKrTYQI4f4KvbGD5u2xccdYdurwowix5I= +google.golang.org/grpc v1.58.2/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/pkg/cannon/cannon.go b/pkg/cannon/cannon.go index 6ecffde2..8cefdd56 100644 --- a/pkg/cannon/cannon.go +++ b/pkg/cannon/cannon.go @@ -20,6 +20,7 @@ import ( v2 "github.com/ethpandaops/xatu/pkg/cannon/deriver/beacon/eth/v2" "github.com/ethpandaops/xatu/pkg/cannon/ethereum" "github.com/ethpandaops/xatu/pkg/cannon/iterator" + "github.com/ethpandaops/xatu/pkg/observability" "github.com/ethpandaops/xatu/pkg/output" "github.com/ethpandaops/xatu/pkg/proto/xatu" "github.com/go-co-op/gocron" @@ -49,6 +50,8 @@ type Cannon struct { eventDerivers []deriver.EventDeriver coordinatorClient *coordinator.Client + + shutdownFuncs []func(ctx context.Context) error } func New(ctx context.Context, log logrus.FieldLogger, config *Config) (*Cannon, error) { @@ -86,10 +89,36 @@ func New(ctx context.Context, log logrus.FieldLogger, config *Config) (*Cannon, scheduler: gocron.NewScheduler(time.Local), eventDerivers: nil, // Derivers are created once the beacon node is ready coordinatorClient: coordinatorClient, + shutdownFuncs: []func(ctx context.Context) error{}, }, nil } func (c *Cannon) Start(ctx context.Context) error { + // Start tracing if enabled + if c.Config.Tracing.Enabled { + c.log.Info("Tracing enabled") + + res, err := observability.NewResource(xatu.WithMode(xatu.ModeCannon), xatu.Short()) + if err != nil { + return perrors.Wrap(err, "failed to create tracing resource") + } + + tracer, err := observability.NewHTTPTraceProvider(ctx, + res, + c.Config.Tracing.AsOTelOpts()..., + ) + if err != nil { + return perrors.Wrap(err, "failed to create tracing provider") + } + + shutdown, err := observability.SetupOTelSDK(ctx, tracer) + if err != nil { + return perrors.Wrap(err, "failed to setup tracing SDK") + } + + c.shutdownFuncs = append(c.shutdownFuncs, shutdown) + } + if err := c.ServeMetrics(ctx); err != nil { return err } @@ -133,7 +162,15 @@ func (c *Cannon) Start(ctx context.Context) error { sig := <-cancel c.log.Printf("Caught signal: %v", sig) - c.log.Printf("Flushing sinks") + if err := c.Shutdown(ctx); err != nil { + return err + } + + return nil +} + +func (c *Cannon) Shutdown(ctx context.Context) error { + c.log.Printf("Shutting down") for _, sink := range c.sinks { if err := sink.Stop(ctx); err != nil { @@ -141,6 +178,20 @@ func (c *Cannon) Start(ctx context.Context) error { } } + for _, fun := range c.shutdownFuncs { + if err := fun(ctx); err != nil { + return err + } + } + + c.scheduler.Stop() + + for _, deriver := range c.eventDerivers { + if err := deriver.Stop(ctx); err != nil { + return err + } + } + return nil } diff --git a/pkg/cannon/config.go b/pkg/cannon/config.go index 26c760cb..8382c101 100644 --- a/pkg/cannon/config.go +++ b/pkg/cannon/config.go @@ -7,6 +7,7 @@ import ( "github.com/ethpandaops/xatu/pkg/cannon/coordinator" "github.com/ethpandaops/xatu/pkg/cannon/deriver" "github.com/ethpandaops/xatu/pkg/cannon/ethereum" + "github.com/ethpandaops/xatu/pkg/observability" "github.com/ethpandaops/xatu/pkg/output" "github.com/ethpandaops/xatu/pkg/processor" "github.com/sirupsen/logrus" @@ -37,6 +38,9 @@ type Config struct { // Coordinator configuration Coordinator coordinator.Config `yaml:"coordinator"` + + // Tracing configuration + Tracing observability.TracingConfig `yaml:"tracing"` } func (c *Config) Validate() error { @@ -62,6 +66,10 @@ func (c *Config) Validate() error { return fmt.Errorf("invalid coordinator config: %w", err) } + if err := c.Tracing.Validate(); err != nil { + return fmt.Errorf("invalid tracing config: %w", err) + } + return nil } diff --git a/pkg/cannon/deriver/beacon/eth/v2/attester_slashing.go b/pkg/cannon/deriver/beacon/eth/v2/attester_slashing.go index 1b126575..40f8a7c1 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/attester_slashing.go +++ b/pkg/cannon/deriver/beacon/eth/v2/attester_slashing.go @@ -2,6 +2,7 @@ package v2 import ( "context" + "fmt" "time" "github.com/attestantio/go-eth2-client/spec" @@ -9,11 +10,14 @@ import ( backoff "github.com/cenkalti/backoff/v4" "github.com/ethpandaops/xatu/pkg/cannon/ethereum" "github.com/ethpandaops/xatu/pkg/cannon/iterator" + "github.com/ethpandaops/xatu/pkg/observability" xatuethv1 "github.com/ethpandaops/xatu/pkg/proto/eth/v1" "github.com/ethpandaops/xatu/pkg/proto/xatu" "github.com/google/uuid" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/wrapperspb" @@ -82,16 +86,19 @@ func (a *AttesterSlashingDeriver) Stop(ctx context.Context) error { return nil } -func (a *AttesterSlashingDeriver) run(ctx context.Context) { +func (a *AttesterSlashingDeriver) run(rctx context.Context) { bo := backoff.NewExponentialBackOff() bo.MaxInterval = 3 * time.Minute for { select { - case <-ctx.Done(): + case <-rctx.Done(): return default: operation := func() error { + ctx, span := observability.Tracer().Start(rctx, fmt.Sprintf("Derive %s", a.Name())) + defer span.End() + time.Sleep(100 * time.Millisecond) if err := a.beacon.Synced(ctx); err != nil { @@ -149,6 +156,11 @@ func (a *AttesterSlashingDeriver) run(ctx context.Context) { // lookAheadAtLocation takes the upcoming locations and looks ahead to do any pre-processing that might be required. func (a *AttesterSlashingDeriver) lookAheadAtLocations(ctx context.Context, locations []*xatu.CannonLocation) { + _, span := observability.Tracer().Start(ctx, + "AttesterSlashingDeriver.lookAheadAtLocations", + ) + defer span.End() + if locations == nil { return } @@ -174,6 +186,12 @@ func (a *AttesterSlashingDeriver) lookAheadAtLocations(ctx context.Context, loca } func (a *AttesterSlashingDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "AttesterSlashingDeriver.processEpoch", + trace.WithAttributes(attribute.Int64("epoch", int64(epoch))), + ) + defer span.End() + sp, err := a.beacon.Node().Spec() if err != nil { return nil, errors.Wrap(err, "failed to obtain spec") @@ -196,6 +214,12 @@ func (a *AttesterSlashingDeriver) processEpoch(ctx context.Context, epoch phase0 } func (a *AttesterSlashingDeriver) processSlot(ctx context.Context, slot phase0.Slot) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "AttesterSlashingDeriver.processSlot", + trace.WithAttributes(attribute.Int64("slot", int64(slot))), + ) + defer span.End() + // Get the block block, err := a.beacon.GetBeaconBlock(ctx, xatuethv1.SlotAsString(slot)) if err != nil { diff --git a/pkg/cannon/deriver/beacon/eth/v2/beacon_block.go b/pkg/cannon/deriver/beacon/eth/v2/beacon_block.go index 71e722c7..7fb92c0f 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/beacon_block.go +++ b/pkg/cannon/deriver/beacon/eth/v2/beacon_block.go @@ -11,6 +11,7 @@ import ( backoff "github.com/cenkalti/backoff/v4" "github.com/ethpandaops/xatu/pkg/cannon/ethereum" "github.com/ethpandaops/xatu/pkg/cannon/iterator" + "github.com/ethpandaops/xatu/pkg/observability" "github.com/ethpandaops/xatu/pkg/proto/eth" xatuethv1 "github.com/ethpandaops/xatu/pkg/proto/eth/v1" xatuethv2 "github.com/ethpandaops/xatu/pkg/proto/eth/v2" @@ -19,6 +20,9 @@ import ( "github.com/google/uuid" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/wrapperspb" @@ -87,57 +91,88 @@ func (b *BeaconBlockDeriver) Stop(ctx context.Context) error { return nil } -func (b *BeaconBlockDeriver) run(ctx context.Context) { +func (b *BeaconBlockDeriver) run(rctx context.Context) { bo := backoff.NewExponentialBackOff() bo.MaxInterval = 3 * time.Minute + tracer := observability.Tracer() + for { select { - case <-ctx.Done(): + case <-rctx.Done(): return default: operation := func() error { + ctx, span := tracer.Start(rctx, fmt.Sprintf("Derive %s", b.Name())) + defer span.End() + time.Sleep(100 * time.Millisecond) + span.AddEvent("Checking if beacon node is synced") + if err := b.beacon.Synced(ctx); err != nil { + span.SetStatus(codes.Error, err.Error()) + return err } + span.AddEvent("Grabbing next location") + // Get the next slot location, lookAhead, err := b.iterator.Next(ctx) if err != nil { + span.SetStatus(codes.Error, err.Error()) + return err } + span.AddEvent("Obtained next location, looking ahead...", trace.WithAttributes(attribute.Int64("location", int64(location.GetEthV2BeaconBlock().GetEpoch())))) + // Look ahead b.lookAheadAtLocation(ctx, lookAhead) + span.AddEvent("Look ahead complete. Firing location callbacks...") + for _, fn := range b.onLocationCallbacks { if errr := fn(ctx, location.GetEthV2BeaconBlock().GetEpoch()); errr != nil { b.log.WithError(errr).Error("Failed to send location") } } + span.AddEvent("Location callbacks complete. Processing epoch...") + // Process the epoch events, err := b.processEpoch(ctx, phase0.Epoch(location.GetEthV2BeaconBlock().GetEpoch())) if err != nil { b.log.WithError(err).Error("Failed to process epoch") + span.SetStatus(codes.Error, err.Error()) + return err } + span.AddEvent("Epoch processing complete. Sending events...") + // Send the events for _, fn := range b.onEventsCallbacks { if err := fn(ctx, events); err != nil { + span.SetStatus(codes.Error, err.Error()) + return errors.Wrap(err, "failed to send events") } } + span.AddEvent("Events sent. Updating location...") + // Update our location if err := b.iterator.UpdateLocation(ctx, location); err != nil { + span.SetStatus(codes.Error, err.Error()) + return err } + span.AddEvent("Location updated. Done.") + bo.Reset() return nil @@ -152,6 +187,11 @@ func (b *BeaconBlockDeriver) run(ctx context.Context) { // lookAheadAtLocation takes the upcoming locations and looks ahead to do any pre-processing that might be required. func (b *BeaconBlockDeriver) lookAheadAtLocation(ctx context.Context, locations []*xatu.CannonLocation) { + _, span := observability.Tracer().Start(ctx, + "BeaconBlockDeriver.lookAheadAtLocations", + ) + defer span.End() + if locations == nil { return } @@ -177,6 +217,12 @@ func (b *BeaconBlockDeriver) lookAheadAtLocation(ctx context.Context, locations } func (b *BeaconBlockDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "BeaconBlockDeriver.processEpoch", + trace.WithAttributes(attribute.Int64("epoch", int64(epoch))), + ) + defer span.End() + sp, err := b.beacon.Node().Spec() if err != nil { return nil, errors.Wrap(err, "failed to obtain spec") @@ -199,6 +245,12 @@ func (b *BeaconBlockDeriver) processEpoch(ctx context.Context, epoch phase0.Epoc } func (b *BeaconBlockDeriver) processSlot(ctx context.Context, slot phase0.Slot) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "BeaconBlockDeriver.processSlot", + trace.WithAttributes(attribute.Int64("slot", int64(slot))), + ) + defer span.End() + // Get the block block, err := b.beacon.GetBeaconBlock(ctx, xatuethv1.SlotAsString(slot)) if err != nil { diff --git a/pkg/cannon/deriver/beacon/eth/v2/bls_to_execution_change.go b/pkg/cannon/deriver/beacon/eth/v2/bls_to_execution_change.go index 677ba471..dd77c75a 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/bls_to_execution_change.go +++ b/pkg/cannon/deriver/beacon/eth/v2/bls_to_execution_change.go @@ -10,11 +10,14 @@ import ( backoff "github.com/cenkalti/backoff/v4" "github.com/ethpandaops/xatu/pkg/cannon/ethereum" "github.com/ethpandaops/xatu/pkg/cannon/iterator" + "github.com/ethpandaops/xatu/pkg/observability" xatuethv1 "github.com/ethpandaops/xatu/pkg/proto/eth/v1" xatuethv2 "github.com/ethpandaops/xatu/pkg/proto/eth/v2" "github.com/ethpandaops/xatu/pkg/proto/xatu" "github.com/google/uuid" "github.com/pkg/errors" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/sirupsen/logrus" "google.golang.org/protobuf/proto" @@ -85,16 +88,19 @@ func (b *BLSToExecutionChangeDeriver) Stop(ctx context.Context) error { return nil } -func (b *BLSToExecutionChangeDeriver) run(ctx context.Context) { +func (b *BLSToExecutionChangeDeriver) run(rctx context.Context) { bo := backoff.NewExponentialBackOff() bo.MaxInterval = 3 * time.Minute for { select { - case <-ctx.Done(): + case <-rctx.Done(): return default: operation := func() error { + ctx, span := observability.Tracer().Start(rctx, fmt.Sprintf("Derive %s", b.Name())) + defer span.End() + time.Sleep(100 * time.Millisecond) if err := b.beacon.Synced(ctx); err != nil { @@ -152,6 +158,11 @@ func (b *BLSToExecutionChangeDeriver) run(ctx context.Context) { // lookAheadAtLocation takes the upcoming locations and looks ahead to do any pre-processing that might be required. func (b *BLSToExecutionChangeDeriver) lookAheadAtLocation(ctx context.Context, locations []*xatu.CannonLocation) { + _, span := observability.Tracer().Start(ctx, + "BLSToExecutionChangeDeriver.lookAheadAtLocations", + ) + defer span.End() + if locations == nil { return } @@ -177,6 +188,12 @@ func (b *BLSToExecutionChangeDeriver) lookAheadAtLocation(ctx context.Context, l } func (b *BLSToExecutionChangeDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "BLSToExecutionChangeDeriver.processEpoch", + trace.WithAttributes(attribute.Int64("epoch", int64(epoch))), + ) + defer span.End() + sp, err := b.beacon.Node().Spec() if err != nil { return nil, errors.Wrap(err, "failed to obtain spec") @@ -199,6 +216,12 @@ func (b *BLSToExecutionChangeDeriver) processEpoch(ctx context.Context, epoch ph } func (b *BLSToExecutionChangeDeriver) processSlot(ctx context.Context, slot phase0.Slot) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "BLSToExecutionChangeDeriver.processSlot", + trace.WithAttributes(attribute.Int64("slot", int64(slot))), + ) + defer span.End() + // Get the block block, err := b.beacon.GetBeaconBlock(ctx, xatuethv1.SlotAsString(slot)) if err != nil { diff --git a/pkg/cannon/deriver/beacon/eth/v2/deposit.go b/pkg/cannon/deriver/beacon/eth/v2/deposit.go index bf1e7a13..38ae5d77 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/deposit.go +++ b/pkg/cannon/deriver/beacon/eth/v2/deposit.go @@ -10,11 +10,14 @@ import ( backoff "github.com/cenkalti/backoff/v4" "github.com/ethpandaops/xatu/pkg/cannon/ethereum" "github.com/ethpandaops/xatu/pkg/cannon/iterator" + "github.com/ethpandaops/xatu/pkg/observability" xatuethv1 "github.com/ethpandaops/xatu/pkg/proto/eth/v1" "github.com/ethpandaops/xatu/pkg/proto/xatu" "github.com/google/uuid" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/wrapperspb" @@ -83,16 +86,19 @@ func (b *DepositDeriver) Stop(ctx context.Context) error { return nil } -func (b *DepositDeriver) run(ctx context.Context) { +func (b *DepositDeriver) run(rctx context.Context) { bo := backoff.NewExponentialBackOff() bo.MaxInterval = 3 * time.Minute for { select { - case <-ctx.Done(): + case <-rctx.Done(): return default: operation := func() error { + ctx, span := observability.Tracer().Start(rctx, fmt.Sprintf("Derive %s", b.Name())) + defer span.End() + time.Sleep(100 * time.Millisecond) if err := b.beacon.Synced(ctx); err != nil { @@ -150,6 +156,11 @@ func (b *DepositDeriver) run(ctx context.Context) { // lookAheadAtLocation takes the upcoming locations and looks ahead to do any pre-processing that might be required. func (b *DepositDeriver) lookAheadAtLocation(ctx context.Context, locations []*xatu.CannonLocation) { + _, span := observability.Tracer().Start(ctx, + "DepositDeriver.lookAheadAtLocations", + ) + defer span.End() + if locations == nil { return } @@ -175,6 +186,12 @@ func (b *DepositDeriver) lookAheadAtLocation(ctx context.Context, locations []*x } func (b *DepositDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "DepositDeriver.processEpoch", + trace.WithAttributes(attribute.Int64("epoch", int64(epoch))), + ) + defer span.End() + sp, err := b.beacon.Node().Spec() if err != nil { return nil, errors.Wrap(err, "failed to obtain spec") @@ -197,6 +214,12 @@ func (b *DepositDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ( } func (b *DepositDeriver) processSlot(ctx context.Context, slot phase0.Slot) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "DepositDeriver.processSlot", + trace.WithAttributes(attribute.Int64("slot", int64(slot))), + ) + defer span.End() + // Get the block block, err := b.beacon.GetBeaconBlock(ctx, xatuethv1.SlotAsString(slot)) if err != nil { diff --git a/pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go b/pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go index cdea1478..3536d5fd 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go +++ b/pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go @@ -13,11 +13,14 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethpandaops/xatu/pkg/cannon/ethereum" "github.com/ethpandaops/xatu/pkg/cannon/iterator" + "github.com/ethpandaops/xatu/pkg/observability" xatuethv1 "github.com/ethpandaops/xatu/pkg/proto/eth/v1" "github.com/ethpandaops/xatu/pkg/proto/xatu" "github.com/google/uuid" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/wrapperspb" @@ -86,16 +89,19 @@ func (b *ExecutionTransactionDeriver) Stop(ctx context.Context) error { return nil } -func (b *ExecutionTransactionDeriver) run(ctx context.Context) { +func (b *ExecutionTransactionDeriver) run(rctx context.Context) { bo := backoff.NewExponentialBackOff() bo.MaxInterval = 3 * time.Minute for { select { - case <-ctx.Done(): + case <-rctx.Done(): return default: operation := func() error { + ctx, span := observability.Tracer().Start(rctx, fmt.Sprintf("Derive %s", b.Name())) + defer span.End() + time.Sleep(100 * time.Millisecond) if err := b.beacon.Synced(ctx); err != nil { @@ -152,6 +158,12 @@ func (b *ExecutionTransactionDeriver) run(ctx context.Context) { } func (b *ExecutionTransactionDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "ExecutionTransactionDeriver.processEpoch", + trace.WithAttributes(attribute.Int64("epoch", int64(epoch))), + ) + defer span.End() + sp, err := b.beacon.Node().Spec() if err != nil { return nil, errors.Wrap(err, "failed to obtain spec") @@ -175,6 +187,11 @@ func (b *ExecutionTransactionDeriver) processEpoch(ctx context.Context, epoch ph // lookAheadAtLocation takes the upcoming locations and looks ahead to do any pre-processing that might be required. func (b *ExecutionTransactionDeriver) lookAheadAtLocation(ctx context.Context, locations []*xatu.CannonLocation) { + _, span := observability.Tracer().Start(ctx, + "ExecutionTransactionDeriver.lookAheadAtLocations", + ) + defer span.End() + for _, location := range locations { // Get the next look ahead epoch epoch := phase0.Epoch(location.GetEthV2BeaconBlockExecutionTransaction().GetEpoch()) @@ -196,6 +213,12 @@ func (b *ExecutionTransactionDeriver) lookAheadAtLocation(ctx context.Context, l } func (b *ExecutionTransactionDeriver) processSlot(ctx context.Context, slot phase0.Slot) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "ExecutionTransactionDeriver.processSlot", + trace.WithAttributes(attribute.Int64("slot", int64(slot))), + ) + defer span.End() + // Get the block block, err := b.beacon.GetBeaconBlock(ctx, xatuethv1.SlotAsString(slot)) if err != nil { diff --git a/pkg/cannon/deriver/beacon/eth/v2/proposer_slashing.go b/pkg/cannon/deriver/beacon/eth/v2/proposer_slashing.go index 3bc1f92c..330b16f8 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/proposer_slashing.go +++ b/pkg/cannon/deriver/beacon/eth/v2/proposer_slashing.go @@ -2,6 +2,7 @@ package v2 import ( "context" + "fmt" "time" "github.com/attestantio/go-eth2-client/spec" @@ -9,11 +10,14 @@ import ( backoff "github.com/cenkalti/backoff/v4" "github.com/ethpandaops/xatu/pkg/cannon/ethereum" "github.com/ethpandaops/xatu/pkg/cannon/iterator" + "github.com/ethpandaops/xatu/pkg/observability" xatuethv1 "github.com/ethpandaops/xatu/pkg/proto/eth/v1" "github.com/ethpandaops/xatu/pkg/proto/xatu" "github.com/google/uuid" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/wrapperspb" @@ -82,16 +86,19 @@ func (b *ProposerSlashingDeriver) Stop(ctx context.Context) error { return nil } -func (b *ProposerSlashingDeriver) run(ctx context.Context) { +func (b *ProposerSlashingDeriver) run(rctx context.Context) { bo := backoff.NewExponentialBackOff() bo.MaxInterval = 3 * time.Minute for { select { - case <-ctx.Done(): + case <-rctx.Done(): return default: operation := func() error { + ctx, span := observability.Tracer().Start(rctx, fmt.Sprintf("Derive %s", b.Name())) + defer span.End() + time.Sleep(100 * time.Millisecond) if err := b.beacon.Synced(ctx); err != nil { @@ -148,6 +155,12 @@ func (b *ProposerSlashingDeriver) run(ctx context.Context) { } func (b *ProposerSlashingDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "ProposerSlashingDeriver.processEpoch", + trace.WithAttributes(attribute.Int64("epoch", int64(epoch))), + ) + defer span.End() + sp, err := b.beacon.Node().Spec() if err != nil { return nil, errors.Wrap(err, "failed to obtain spec") @@ -170,6 +183,12 @@ func (b *ProposerSlashingDeriver) processEpoch(ctx context.Context, epoch phase0 } func (b *ProposerSlashingDeriver) processSlot(ctx context.Context, slot phase0.Slot) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "ProposerSlashingDeriver.processSlot", + trace.WithAttributes(attribute.Int64("slot", int64(slot))), + ) + defer span.End() + // Get the block block, err := b.beacon.GetBeaconBlock(ctx, xatuethv1.SlotAsString(slot)) if err != nil { @@ -244,6 +263,11 @@ func (b *ProposerSlashingDeriver) getProposerSlashings(ctx context.Context, bloc // lookAheadAtLocation takes the upcoming locations and looks ahead to do any pre-processing that might be required. func (b *ProposerSlashingDeriver) lookAheadAtLocation(ctx context.Context, locations []*xatu.CannonLocation) { + _, span := observability.Tracer().Start(ctx, + "ProposerSlashingDeriver.lookAheadAtLocations", + ) + defer span.End() + if locations == nil { return } diff --git a/pkg/cannon/deriver/beacon/eth/v2/voluntary_exit.go b/pkg/cannon/deriver/beacon/eth/v2/voluntary_exit.go index b2bb1b67..ca48b87e 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/voluntary_exit.go +++ b/pkg/cannon/deriver/beacon/eth/v2/voluntary_exit.go @@ -10,11 +10,14 @@ import ( backoff "github.com/cenkalti/backoff/v4" "github.com/ethpandaops/xatu/pkg/cannon/ethereum" "github.com/ethpandaops/xatu/pkg/cannon/iterator" + "github.com/ethpandaops/xatu/pkg/observability" xatuethv1 "github.com/ethpandaops/xatu/pkg/proto/eth/v1" "github.com/ethpandaops/xatu/pkg/proto/xatu" "github.com/google/uuid" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/wrapperspb" @@ -83,16 +86,19 @@ func (b *VoluntaryExitDeriver) Stop(ctx context.Context) error { return nil } -func (b *VoluntaryExitDeriver) run(ctx context.Context) { +func (b *VoluntaryExitDeriver) run(rctx context.Context) { bo := backoff.NewExponentialBackOff() bo.MaxInterval = 3 * time.Minute for { select { - case <-ctx.Done(): + case <-rctx.Done(): return default: operation := func() error { + ctx, span := observability.Tracer().Start(rctx, fmt.Sprintf("Derive %s", b.Name())) + defer span.End() + time.Sleep(100 * time.Millisecond) if err := b.beacon.Synced(ctx); err != nil { @@ -150,6 +156,11 @@ func (b *VoluntaryExitDeriver) run(ctx context.Context) { // lookAheadAtLocation takes the upcoming locations and looks ahead to do any pre-processing that might be required. func (b *VoluntaryExitDeriver) lookAheadAtLocation(ctx context.Context, locations []*xatu.CannonLocation) { + _, span := observability.Tracer().Start(ctx, + "VoluntaryExitDeriver.lookAheadAtLocations", + ) + defer span.End() + if locations == nil { return } @@ -175,6 +186,12 @@ func (b *VoluntaryExitDeriver) lookAheadAtLocation(ctx context.Context, location } func (b *VoluntaryExitDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "VoluntaryExitDeriver.processEpoch", + trace.WithAttributes(attribute.Int64("epoch", int64(epoch))), + ) + defer span.End() + sp, err := b.beacon.Node().Spec() if err != nil { return nil, errors.Wrap(err, "failed to obtain spec") @@ -197,6 +214,12 @@ func (b *VoluntaryExitDeriver) processEpoch(ctx context.Context, epoch phase0.Ep } func (b *VoluntaryExitDeriver) processSlot(ctx context.Context, slot phase0.Slot) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "VoluntaryExitDeriver.processSlot", + trace.WithAttributes(attribute.Int64("slot", int64(slot))), + ) + defer span.End() + // Get the block block, err := b.beacon.GetBeaconBlock(ctx, xatuethv1.SlotAsString(slot)) if err != nil { diff --git a/pkg/cannon/deriver/beacon/eth/v2/withdrawal.go b/pkg/cannon/deriver/beacon/eth/v2/withdrawal.go index fbbbcdc2..81568fd7 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/withdrawal.go +++ b/pkg/cannon/deriver/beacon/eth/v2/withdrawal.go @@ -2,6 +2,7 @@ package v2 import ( "context" + "fmt" "time" "github.com/attestantio/go-eth2-client/spec" @@ -9,11 +10,14 @@ import ( backoff "github.com/cenkalti/backoff/v4" "github.com/ethpandaops/xatu/pkg/cannon/ethereum" "github.com/ethpandaops/xatu/pkg/cannon/iterator" + "github.com/ethpandaops/xatu/pkg/observability" xatuethv1 "github.com/ethpandaops/xatu/pkg/proto/eth/v1" "github.com/ethpandaops/xatu/pkg/proto/xatu" "github.com/google/uuid" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/wrapperspb" @@ -82,16 +86,19 @@ func (b *WithdrawalDeriver) Stop(ctx context.Context) error { return nil } -func (b *WithdrawalDeriver) run(ctx context.Context) { +func (b *WithdrawalDeriver) run(rctx context.Context) { bo := backoff.NewExponentialBackOff() bo.MaxInterval = 3 * time.Minute for { select { - case <-ctx.Done(): + case <-rctx.Done(): return default: operation := func() error { + ctx, span := observability.Tracer().Start(rctx, fmt.Sprintf("Derive %s", b.Name())) + defer span.End() + time.Sleep(100 * time.Millisecond) if err := b.beacon.Synced(ctx); err != nil { @@ -145,6 +152,12 @@ func (b *WithdrawalDeriver) run(ctx context.Context) { } func (b *WithdrawalDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "WithdrawalDeriver.processEpoch", + trace.WithAttributes(attribute.Int64("epoch", int64(epoch))), + ) + defer span.End() + sp, err := b.beacon.Node().Spec() if err != nil { return nil, errors.Wrap(err, "failed to obtain spec") @@ -167,6 +180,12 @@ func (b *WithdrawalDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch } func (b *WithdrawalDeriver) processSlot(ctx context.Context, slot phase0.Slot) ([]*xatu.DecoratedEvent, error) { + ctx, span := observability.Tracer().Start(ctx, + "WithdrawalDeriver.processSlot", + trace.WithAttributes(attribute.Int64("slot", int64(slot))), + ) + defer span.End() + // Get the block block, err := b.beacon.GetBeaconBlock(ctx, xatuethv1.SlotAsString(slot)) if err != nil { @@ -203,6 +222,11 @@ func (b *WithdrawalDeriver) processSlot(ctx context.Context, slot phase0.Slot) ( // lookAheadAtLocation takes the upcoming locations and looks ahead to do any pre-processing that might be required. func (b *WithdrawalDeriver) lookAheadAtLocation(ctx context.Context, locations []*xatu.CannonLocation) { + _, span := observability.Tracer().Start(ctx, + "WithdrawalDeriver.lookAheadAtLocations", + ) + defer span.End() + if locations == nil { return } diff --git a/pkg/cannon/ethereum/beacon.go b/pkg/cannon/ethereum/beacon.go index a1e7d181..1185cc3f 100644 --- a/pkg/cannon/ethereum/beacon.go +++ b/pkg/cannon/ethereum/beacon.go @@ -9,10 +9,14 @@ import ( "github.com/attestantio/go-eth2-client/spec" "github.com/ethpandaops/beacon/pkg/beacon" "github.com/ethpandaops/xatu/pkg/cannon/ethereum/services" + "github.com/ethpandaops/xatu/pkg/observability" "github.com/go-co-op/gocron" "github.com/jellydator/ttlcache/v3" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" "golang.org/x/sync/singleflight" ) @@ -30,6 +34,7 @@ type BeaconNode struct { sfGroup *singleflight.Group blockCache *ttlcache.Cache[string, *spec.VersionedSignedBeaconBlock] blockPreloadChan chan string + blockPreloadSem chan struct{} } func NewBeaconNode(ctx context.Context, name string, config *Config, log logrus.FieldLogger) (*BeaconNode, error) { @@ -61,6 +66,9 @@ func NewBeaconNode(ctx context.Context, name string, config *Config, log logrus. &metadata, } + // Create a buffered channel (semaphore) to limit the number of concurrent goroutines. + sem := make(chan struct{}, config.BlockPreloadWorkers) + return &BeaconNode{ config: config, log: log.WithField("module", "cannon/ethereum/beacon"), @@ -72,6 +80,7 @@ func NewBeaconNode(ctx context.Context, name string, config *Config, log logrus. ), sfGroup: &singleflight.Group{}, blockPreloadChan: make(chan string, config.BlockPreloadQueueSize), + blockPreloadSem: sem, metrics: NewMetrics(namespace, name), }, nil } @@ -119,6 +128,10 @@ func (b *BeaconNode) Start(ctx context.Context) error { return err } + b.blockCache.OnEviction(func(ctx context.Context, reason ttlcache.EvictionReason, item *ttlcache.Item[string, *spec.VersionedSignedBeaconBlock]) { + b.log.WithField("identifier", item.Key()).WithField("reason", reason).Trace("Block evicted from cache") + }) + go b.blockCache.Start() for i := 0; i < int(b.config.BlockPreloadWorkers); i++ { @@ -207,10 +220,11 @@ func (b *BeaconNode) Synced(ctx context.Context) error { // GetBeaconBlock returns a beacon block by its identifier. Blocks can be cached internally. func (b *BeaconNode) GetBeaconBlock(ctx context.Context, identifier string, ignoreMetrics ...bool) (*spec.VersionedSignedBeaconBlock, error) { - b.metrics.IncBlocksFetched(string(b.Metadata().Network.Name)) + ctx, span := observability.Tracer().Start(ctx, "ethereum.beacon.GetBeaconBlock", trace.WithAttributes(attribute.String("identifier", identifier))) - // Create a buffered channel (semaphore) to limit the number of concurrent goroutines. - sem := make(chan struct{}, b.config.BlockPreloadWorkers) + defer span.End() + + b.metrics.IncBlocksFetched(string(b.Metadata().Network.Name)) // Check the cache first. if item := b.blockCache.Get(identifier); item != nil { @@ -218,18 +232,26 @@ func (b *BeaconNode) GetBeaconBlock(ctx context.Context, identifier string, igno b.metrics.IncBlockCacheHit(string(b.Metadata().Network.Name)) } + span.SetAttributes(attribute.Bool("cached", true)) + return item.Value(), nil } + span.SetAttributes(attribute.Bool("cached", false)) + if len(ignoreMetrics) != 0 && ignoreMetrics[0] { b.metrics.IncBlockCacheMiss(string(b.Metadata().Network.Name)) } // Use singleflight to ensure we only make one request for a block at a time. - x, err, _ := b.sfGroup.Do(identifier, func() (interface{}, error) { + x, err, shared := b.sfGroup.Do(identifier, func() (interface{}, error) { + span.AddEvent("Acquiring semaphore...") + // Acquire a semaphore before proceeding. - sem <- struct{}{} - defer func() { <-sem }() + b.blockPreloadSem <- struct{}{} + defer func() { <-b.blockPreloadSem }() + + span.AddEvent("Semaphore acquired. Fetching block from beacon api...") // Not in the cache, so fetch it. block, err := b.beacon.FetchBlock(ctx, identifier) @@ -237,12 +259,16 @@ func (b *BeaconNode) GetBeaconBlock(ctx context.Context, identifier string, igno return nil, err } + span.AddEvent("Block fetched from beacon node.") + // Add it to the cache. b.blockCache.Set(identifier, block, time.Hour) return block, nil }) if err != nil { + span.SetStatus(codes.Error, err.Error()) + if len(ignoreMetrics) != 0 && ignoreMetrics[0] { b.metrics.IncBlocksFetchErrors(string(b.Metadata().Network.Name)) } @@ -250,9 +276,16 @@ func (b *BeaconNode) GetBeaconBlock(ctx context.Context, identifier string, igno return nil, err } + span.AddEvent("Block fetching complete.", trace.WithAttributes(attribute.Bool("shared", shared))) + return x.(*spec.VersionedSignedBeaconBlock), nil } func (b *BeaconNode) LazyLoadBeaconBlock(identifier string) { + // Don't add the block to the preload queue if it's already in the cache. + if item := b.blockCache.Get(identifier); item != nil { + return + } + b.blockPreloadChan <- identifier } diff --git a/pkg/cannon/iterator/checkpoint_iterator.go b/pkg/cannon/iterator/checkpoint_iterator.go index c775275f..fe4617d2 100644 --- a/pkg/cannon/iterator/checkpoint_iterator.go +++ b/pkg/cannon/iterator/checkpoint_iterator.go @@ -8,9 +8,12 @@ import ( "github.com/ethpandaops/ethwallclock" "github.com/ethpandaops/xatu/pkg/cannon/coordinator" "github.com/ethpandaops/xatu/pkg/cannon/ethereum" + "github.com/ethpandaops/xatu/pkg/observability" "github.com/ethpandaops/xatu/pkg/proto/xatu" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) type CheckpointIterator struct { @@ -46,6 +49,27 @@ func (c *CheckpointIterator) UpdateLocation(ctx context.Context, location *xatu. } func (c *CheckpointIterator) Next(ctx context.Context) (next *xatu.CannonLocation, lookAhead []*xatu.CannonLocation, err error) { + ctx, span := observability.Tracer().Start(ctx, + "CheckpointIterator.Next", + trace.WithAttributes( + attribute.String("network", c.networkName), + attribute.String("cannon_type", c.cannonType.String()), + attribute.String("network_id", c.networkID), + ), + ) + defer func() { + if err != nil { + span.RecordError(err) + } + + epoch, err := c.getEpochFromLocation(next) + if err == nil { + span.SetAttributes(attribute.Int64("next", int64(epoch))) + } + + span.End() + }() + for { // Grab the current checkpoint from the beacon node checkpoint, err := c.fetchLatestEpoch(ctx) @@ -129,7 +153,7 @@ func (c *CheckpointIterator) getLookAheads(ctx context.Context, location *xatu.C lookAheads := make([]*xatu.CannonLocation, 0) - for _, i := range []int{1} { + for _, i := range []int{1, 2, 3} { lookAheadEpoch := epoch + phase0.Epoch(i) if lookAheadEpoch > latestCheckpoint.Epoch { @@ -148,6 +172,11 @@ func (c *CheckpointIterator) getLookAheads(ctx context.Context, location *xatu.C } func (c *CheckpointIterator) fetchLatestEpoch(ctx context.Context) (*phase0.Checkpoint, error) { + _, span := observability.Tracer().Start(ctx, + "CheckpointIterator.FetchLatestEpoch", + ) + defer span.End() + finality, err := c.beaconNode.Node().Finality() if err != nil { return nil, errors.Wrap(err, "failed to fetch finality") diff --git a/pkg/proto/xatu/xatu.go b/pkg/proto/xatu/xatu.go index bb2083e8..70e3b621 100644 --- a/pkg/proto/xatu/xatu.go +++ b/pkg/proto/xatu/xatu.go @@ -16,6 +16,14 @@ func Full() string { return fmt.Sprintf("%s/%s", Implementation, Short()) } +func FullWithMode(mode Mode) string { + return fmt.Sprintf("%s-%s/%s", Implementation, mode, Short()) +} + +func WithMode(mode Mode) string { + return fmt.Sprintf("%s-%s", Implementation, mode) +} + func Short() string { return fmt.Sprintf("%s-%s", Release, GitCommit) } From deba1cbb7a0ef3ce3a793de4fc2293b9639a21da Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Tue, 3 Oct 2023 15:05:24 +1000 Subject: [PATCH 02/10] feat: Tracing --- pkg/observability/config.go | 64 ++++++++++++++++++++++++++++ pkg/observability/tracing.go | 81 ++++++++++++++++++++++++++++++++++++ pkg/proto/xatu/mode.go | 12 ++++++ 3 files changed, 157 insertions(+) create mode 100644 pkg/observability/config.go create mode 100644 pkg/observability/tracing.go create mode 100644 pkg/proto/xatu/mode.go diff --git a/pkg/observability/config.go b/pkg/observability/config.go new file mode 100644 index 00000000..ccaf2ce8 --- /dev/null +++ b/pkg/observability/config.go @@ -0,0 +1,64 @@ +package observability + +import ( + "crypto/tls" + + "github.com/ethpandaops/beacon/pkg/human" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" +) + +type TracingConfig struct { + Enabled bool `yaml:"enabled" default:"false"` + Endpoint string `yaml:"endpoint" default:""` + URLPath string `yaml:"urlPath" default:""` + Timeout human.Duration `yaml:"timeout" default:"15s"` + Compression bool `yaml:"compression" default:"true"` + Headers map[string]string `yaml:"headers"` + Insecure bool `yaml:"insecure" default:"false"` + Retry *otlptracehttp.RetryConfig `yaml:"retry"` + TLS *tls.Config `yaml:"tls"` +} + +func (t *TracingConfig) Validate() error { + return nil +} + +func (t *TracingConfig) AsOTelOpts() []otlptracehttp.Option { + var opts []otlptracehttp.Option + + if t.Endpoint != "" { + opts = append(opts, otlptracehttp.WithEndpoint(t.Endpoint)) + } + + if t.URLPath != "" { + opts = append(opts, otlptracehttp.WithURLPath(t.URLPath)) + } + + if t.Timeout.Duration != 0 { + opts = append(opts, otlptracehttp.WithTimeout(t.Timeout.Duration)) + } + + if t.Compression { + opts = append(opts, otlptracehttp.WithCompression(otlptracehttp.GzipCompression)) + } else { + opts = append(opts, otlptracehttp.WithCompression(otlptracehttp.NoCompression)) + } + + if len(t.Headers) > 0 { + opts = append(opts, otlptracehttp.WithHeaders(t.Headers)) + } + + if t.Insecure { + opts = append(opts, otlptracehttp.WithInsecure()) + } + + if t.Retry != nil && t.Retry.Enabled { + opts = append(opts, otlptracehttp.WithRetry(*t.Retry)) + } + + if t.TLS != nil { + opts = append(opts, otlptracehttp.WithTLSClientConfig(t.TLS)) + } + + return opts +} diff --git a/pkg/observability/tracing.go b/pkg/observability/tracing.go new file mode 100644 index 00000000..4ef63227 --- /dev/null +++ b/pkg/observability/tracing.go @@ -0,0 +1,81 @@ +package observability + +import ( + "context" + "errors" + "fmt" + + "github.com/ethpandaops/xatu/pkg/proto/xatu" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.21.0" + otrace "go.opentelemetry.io/otel/trace" +) + +func Tracer() otrace.Tracer { + return otel.GetTracerProvider().Tracer(fmt.Sprintf("%s/observability", xatu.Full())) +} + +// SetupOTelSDK bootstraps the OpenTelemetry pipeline. +// If it does not return an error, make sure to call shutdown for proper cleanup. +func SetupOTelSDK(ctx context.Context, tracerProvider *trace.TracerProvider) (shutdown func(context.Context) error, err error) { + var shutdownFuncs []func(context.Context) error + + // shutdown calls cleanup functions registered via shutdownFuncs. + // The errors from the calls are joined. + // Each registered cleanup will be invoked once. + shutdown = func(ctx context.Context) error { + var err error + + for _, fn := range shutdownFuncs { + err = errors.Join(err, fn(ctx)) + } + + shutdownFuncs = nil + + return err + } + + shutdownFuncs = append(shutdownFuncs, tracerProvider.Shutdown) + otel.SetTracerProvider(tracerProvider) + + return shutdown, nil +} + +func NewResource(serviceName, serviceVersion string) (*resource.Resource, error) { + res, err := resource.New(context.Background(), + resource.WithFromEnv(), + resource.WithProcess(), + resource.WithOS(), + resource.WithContainer(), + resource.WithHost()) + if err != nil { + return nil, fmt.Errorf("creating resource: %w", err) + } + + return resource.Merge(res, + resource.NewWithAttributes(semconv.SchemaURL, + semconv.ServiceName(serviceName), + semconv.ServiceVersion(serviceVersion), + ), + ) +} + +func NewHTTPTraceProvider(ctx context.Context, res *resource.Resource, opts ...otlptracehttp.Option) (*trace.TracerProvider, error) { + client := otlptracehttp.NewClient(opts...) + + exporter, err := otlptrace.New(ctx, client) + if err != nil { + return nil, fmt.Errorf("creating OTLP trace exporter: %w", err) + } + + traceProvider := trace.NewTracerProvider( + trace.WithBatcher(exporter), + trace.WithResource(res), + ) + + return traceProvider, nil +} diff --git a/pkg/proto/xatu/mode.go b/pkg/proto/xatu/mode.go new file mode 100644 index 00000000..c1c9173b --- /dev/null +++ b/pkg/proto/xatu/mode.go @@ -0,0 +1,12 @@ +package xatu + +type Mode string + +const ( + ModeUnknown Mode = "" + ModeSentry Mode = "sentry" + ModeCannon Mode = "cannon" + ModeServer Mode = "server" + ModeMimicry Mode = "mimicry" + ModeDiscovery Mode = "discovery" +) From c2ef00080f5c425c09c1841297b6080163297aab Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Tue, 3 Oct 2023 15:09:49 +1000 Subject: [PATCH 03/10] chore: Update Go version to 1.20 in GitHub workflows --- .github/workflows/golangci-lint.yaml | 2 +- .github/workflows/test.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/golangci-lint.yaml b/.github/workflows/golangci-lint.yaml index 2b3af1ea..7fb1edea 100644 --- a/.github/workflows/golangci-lint.yaml +++ b/.github/workflows/golangci-lint.yaml @@ -17,7 +17,7 @@ jobs: steps: - uses: actions/setup-go@v4 with: - go-version: 1.19 + go-version: 1.20 - uses: actions/checkout@v3 - name: golangci-lint uses: golangci/golangci-lint-action@v3 diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 9bfc69c7..556bc73f 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -10,7 +10,7 @@ jobs: full_ci: strategy: matrix: - go_version: [ 1.19.x ] + go_version: [ 1.20.x ] runs-on: ubuntu-20.04 From 6064d1143524b3d5836171f5196e52c6c3510016 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Tue, 3 Oct 2023 15:14:14 +1000 Subject: [PATCH 04/10] chore: update golangci-lint version --- .github/workflows/golangci-lint.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/golangci-lint.yaml b/.github/workflows/golangci-lint.yaml index 7fb1edea..81f9a67a 100644 --- a/.github/workflows/golangci-lint.yaml +++ b/.github/workflows/golangci-lint.yaml @@ -23,7 +23,7 @@ jobs: uses: golangci/golangci-lint-action@v3 with: # Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version - version: v1.51.2 + version: latest # Optional: working directory, useful for monorepos # working-directory: somedir From ec96f3fdc2f435629e00edf3ea5ee1ea79e18f2c Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Tue, 3 Oct 2023 15:16:22 +1000 Subject: [PATCH 05/10] chore: Disable Go modules in golangci-lint workflow --- .github/workflows/golangci-lint.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/golangci-lint.yaml b/.github/workflows/golangci-lint.yaml index 81f9a67a..c3590971 100644 --- a/.github/workflows/golangci-lint.yaml +++ b/.github/workflows/golangci-lint.yaml @@ -18,6 +18,8 @@ jobs: - uses: actions/setup-go@v4 with: go-version: 1.20 + env: + GO111MODULE: off - uses: actions/checkout@v3 - name: golangci-lint uses: golangci/golangci-lint-action@v3 From 31bd024f6707c47e10a41a178f529b572a628cca Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Tue, 3 Oct 2023 15:18:40 +1000 Subject: [PATCH 06/10] chore: update golangci-lint version to v1.54 --- .github/workflows/golangci-lint.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/golangci-lint.yaml b/.github/workflows/golangci-lint.yaml index c3590971..7fba64fe 100644 --- a/.github/workflows/golangci-lint.yaml +++ b/.github/workflows/golangci-lint.yaml @@ -18,6 +18,7 @@ jobs: - uses: actions/setup-go@v4 with: go-version: 1.20 + cache: false env: GO111MODULE: off - uses: actions/checkout@v3 @@ -25,7 +26,7 @@ jobs: uses: golangci/golangci-lint-action@v3 with: # Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version - version: latest + version: v1.54 # Optional: working directory, useful for monorepos # working-directory: somedir From 1c68045449ecac599bc36db11977c23abbf4a838 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Tue, 3 Oct 2023 15:21:51 +1000 Subject: [PATCH 07/10] chore: Update Go version to 1.19 --- .github/workflows/golangci-lint.yaml | 3 --- go.mod | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/.github/workflows/golangci-lint.yaml b/.github/workflows/golangci-lint.yaml index 7fba64fe..d1c3a925 100644 --- a/.github/workflows/golangci-lint.yaml +++ b/.github/workflows/golangci-lint.yaml @@ -18,9 +18,6 @@ jobs: - uses: actions/setup-go@v4 with: go-version: 1.20 - cache: false - env: - GO111MODULE: off - uses: actions/checkout@v3 - name: golangci-lint uses: golangci/golangci-lint-action@v3 diff --git a/go.mod b/go.mod index 0030d49b..c7c3602f 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/ethpandaops/xatu -go 1.20 +go 1.19 replace github.com/attestantio/go-eth2-client v0.16.3 => github.com/samcm/go-eth2-client v0.15.12 From 506fd2124ec528ce4df473c2a8d1c2f1b73b223d Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Tue, 3 Oct 2023 15:44:57 +1000 Subject: [PATCH 08/10] Go mod tidy --- go.mod | 13 ++++++------- go.sum | 11 +---------- 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/go.mod b/go.mod index c7c3602f..200c15b7 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/ethpandaops/ethcore v0.0.0-20230804013106-6453c36c8c30 github.com/ethpandaops/ethwallclock v0.3.0 github.com/go-co-op/gocron v1.27.1 + github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb github.com/google/uuid v1.3.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/huandu/go-sqlbuilder v1.21.0 @@ -30,6 +31,11 @@ require ( github.com/sirupsen/logrus v1.9.3 github.com/spf13/cobra v1.7.0 github.com/stretchr/testify v1.8.4 + go.opentelemetry.io/otel v1.19.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 + go.opentelemetry.io/otel/sdk v1.19.0 + go.opentelemetry.io/otel/trace v1.19.0 golang.org/x/sync v0.3.0 google.golang.org/grpc v1.58.2 google.golang.org/protobuf v1.31.0 @@ -62,7 +68,6 @@ require ( github.com/gofrs/flock v0.8.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect github.com/gorilla/websocket v1.5.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect github.com/holiman/bloomfilter/v2 v2.0.3 // indirect @@ -110,12 +115,7 @@ require ( github.com/tklauser/go-sysconf v0.3.11 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/yusufpapurcu/wmi v1.2.3 // indirect - go.opentelemetry.io/otel v1.19.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 // indirect go.opentelemetry.io/otel/metric v1.19.0 // indirect - go.opentelemetry.io/otel/sdk v1.19.0 // indirect - go.opentelemetry.io/otel/trace v1.19.0 // indirect go.opentelemetry.io/proto/otlp v1.0.0 // indirect go.uber.org/atomic v1.10.0 // indirect golang.org/x/crypto v0.11.0 // indirect @@ -124,7 +124,6 @@ require ( golang.org/x/sys v0.12.0 // indirect golang.org/x/text v0.11.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect - google.golang.org/genproto v0.0.0-20230711160842-782d3b101e98 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect diff --git a/go.sum b/go.sum index 6855f8e8..b901c233 100644 --- a/go.sum +++ b/go.sum @@ -162,6 +162,7 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/glog v1.1.0 h1:/d3pCKDPWNnvIWe0vVUpNP32qc8U3PDVxySP/y360qE= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -437,7 +438,6 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= @@ -565,8 +565,6 @@ golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= -golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= -golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50= golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -651,8 +649,6 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= -golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -775,10 +771,7 @@ google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7Fc google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= -google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 h1:DdoeryqhaXp1LtT/emMP1BRJPHHKFi5akj/nbx/zNTA= -google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4/go.mod h1:NWraEVixdDnqcqQ30jipen1STv2r/n24Wb7twVTGR4s= google.golang.org/genproto v0.0.0-20230711160842-782d3b101e98 h1:Z0hjGZePRE0ZBWotvtrwxFNrNE9CUAGtplaDK5NNI/g= -google.golang.org/genproto v0.0.0-20230711160842-782d3b101e98/go.mod h1:S7mY02OqCJTD0E1OiQy1F72PWFB4bZJ87cAtLPYgDR0= google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 h1:FmF5cCW94Ij59cfpoLiwTgodWmm60eEV0CjlsVg2fuw= google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98/go.mod h1:rsr7RhLuwsDKL7RmgDDCUc6yaGr1iqceVb5Wv6f6YvQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 h1:bVf09lpb+OJbByTj913DRJioFFAjf/ZGxEz7MajTp2U= @@ -795,8 +788,6 @@ google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKa google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= -google.golang.org/grpc v1.55.0 h1:3Oj82/tFSCeUrRTg/5E/7d/W5A1tj6Ky1ABAuZuv5ag= -google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8= google.golang.org/grpc v1.58.2 h1:SXUpjxeVF3FKrTYQI4f4KvbGD5u2xccdYdurwowix5I= google.golang.org/grpc v1.58.2/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= From 9a473148308070c817cd4b4eb562a75b5c0e13ba Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Tue, 3 Oct 2023 15:48:59 +1000 Subject: [PATCH 09/10] build: Update Go version and enable skip-cache option --- .github/workflows/golangci-lint.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/golangci-lint.yaml b/.github/workflows/golangci-lint.yaml index d1c3a925..51d42946 100644 --- a/.github/workflows/golangci-lint.yaml +++ b/.github/workflows/golangci-lint.yaml @@ -17,7 +17,7 @@ jobs: steps: - uses: actions/setup-go@v4 with: - go-version: 1.20 + go-version: '1.20' - uses: actions/checkout@v3 - name: golangci-lint uses: golangci/golangci-lint-action@v3 @@ -36,7 +36,7 @@ jobs: # Optional: if set to true then the all caching functionality will be complete disabled, # takes precedence over all other caching options. - # skip-cache: true + skip-cache: true # Optional: if set to true then the action don't cache or restore ~/go/pkg. # skip-pkg-cache: true From 798077c33fe308ed847c9647f57df53b24457004 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Tue, 3 Oct 2023 15:54:24 +1000 Subject: [PATCH 10/10] chore: Remove unused linter depguard --- .github/workflows/golangci-lint.yaml | 2 +- .golangci.yml | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/golangci-lint.yaml b/.github/workflows/golangci-lint.yaml index 51d42946..7a5b84ff 100644 --- a/.github/workflows/golangci-lint.yaml +++ b/.github/workflows/golangci-lint.yaml @@ -36,7 +36,7 @@ jobs: # Optional: if set to true then the all caching functionality will be complete disabled, # takes precedence over all other caching options. - skip-cache: true + # skip-cache: true # Optional: if set to true then the action don't cache or restore ~/go/pkg. # skip-pkg-cache: true diff --git a/.golangci.yml b/.golangci.yml index 02ea52c7..f9348a7c 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -25,7 +25,6 @@ linters: - bodyclose - containedctx - decorder - - depguard - dogsled - durationcheck - errcheck