Skip to content

Commit

Permalink
Use single internal method for POST requests.
Browse files Browse the repository at this point in the history
  • Loading branch information
mcdee committed Dec 11, 2024
1 parent dc6247b commit 727775a
Show file tree
Hide file tree
Showing 22 changed files with 237 additions and 146 deletions.
3 changes: 1 addition & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ linters:
- cyclop
- depguard
- dupl
- execinquery
- exhaustive
- exhaustruct
- exportloopref
Expand All @@ -193,13 +192,13 @@ linters:
- gocognit
- goconst
- err113
- gomnd
- ireturn
- lll
- maintidx
- mnd
- musttag
- perfsprint
- recvcheck
- varnamelen
- wrapcheck
- wsl
15 changes: 12 additions & 3 deletions http/attesterduties.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,22 @@ func (s *Service) AttesterDuties(ctx context.Context,
return nil, errors.Join(errors.New("failed to write end of validator index array"), err)
}

url := fmt.Sprintf("/eth/v1/validator/duties/attester/%d", opts.Epoch)
respBodyReader, err := s.post(ctx, url, &reqBodyReader)
endpoint := fmt.Sprintf("/eth/v1/validator/duties/attester/%d", opts.Epoch)
query := ""

httpResponse, err := s.post(ctx,
endpoint,
query,
&api.CommonOpts{},
&reqBodyReader,
ContentTypeJSON,
map[string]string{},
)
if err != nil {
return nil, errors.Join(errors.New("failed to request attester duties"), err)
}

data, metadata, err := decodeJSONResponse(respBodyReader, []*apiv1.AttesterDuty{})
data, metadata, err := decodeJSONResponse(bytes.NewReader(httpResponse.body), []*apiv1.AttesterDuty{})
if err != nil {
return nil, err
}
Expand Down
125 changes: 29 additions & 96 deletions http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,96 +27,18 @@ import (

"github.com/attestantio/go-eth2-client/api"
"github.com/attestantio/go-eth2-client/spec"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

// defaultUserAgent is sent with requests if no other user agent has been supplied.
const defaultUserAgent = "go-eth2-client/0.21.11"
const defaultUserAgent = "go-eth2-client/0.22.0"

// post sends an HTTP post request and returns the body.
func (s *Service) post(ctx context.Context, endpoint string, body io.Reader) (io.Reader, error) {
ctx, span := otel.Tracer("attestantio.go-eth2-client.http").Start(ctx, "post")
defer span.End()

// #nosec G404
log := s.log.With().Str("id", fmt.Sprintf("%02x", rand.Int31())).Str("address", s.address).Str("endpoint", endpoint).Logger()
if e := log.Trace(); e.Enabled() {
bodyBytes, err := io.ReadAll(body)
if err != nil {
return nil, errors.New("failed to read request body")
}
body = bytes.NewReader(bodyBytes)

e.RawJSON("body", bodyBytes).Msg("POST request")
}

callURL := urlForCall(s.base, endpoint, "")
log.Trace().Str("url", callURL.String()).Msg("URL to POST")
span.SetAttributes(attribute.String("url", callURL.String()))

opCtx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()
req, err := http.NewRequestWithContext(opCtx, http.MethodPost, callURL.String(), body)
if err != nil {
span.SetStatus(codes.Error, "Failed to create request")

return nil, errors.Join(errors.New("failed to create POST request"), err)
}

s.addExtraHeaders(req)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")
if req.Header.Get("User-Agent") == "" {
req.Header.Set("User-Agent", defaultUserAgent)
}

resp, err := s.client.Do(req)
if err != nil {
go s.CheckConnectionState(ctx)

span.SetStatus(codes.Error, err.Error())
s.monitorPostComplete(ctx, callURL.Path, "failed")

return nil, errors.Join(errors.New("failed to call POST endpoint"), err)
}
defer resp.Body.Close()
log = log.With().Int("status_code", resp.StatusCode).Logger()

data, err := io.ReadAll(resp.Body)
if err != nil {
span.SetStatus(codes.Error, err.Error())
s.monitorPostComplete(ctx, callURL.Path, "failed")

return nil, errors.Join(errors.New("failed to read POST response"), err)
}

statusFamily := statusCodeFamily(resp.StatusCode)
if statusFamily != 2 {
trimmedResponse := bytes.ReplaceAll(bytes.ReplaceAll(data, []byte{0x0a}, []byte{}), []byte{0x0d}, []byte{})
log.Debug().Int("status_code", resp.StatusCode).RawJSON("response", trimmedResponse).Msg("POST failed")

span.SetStatus(codes.Error, fmt.Sprintf("Status code %d", resp.StatusCode))
s.monitorPostComplete(ctx, callURL.Path, "failed")

return nil, &api.Error{
Method: http.MethodPost,
StatusCode: resp.StatusCode,
Endpoint: endpoint,
Data: data,
}
}

log.Trace().Str("response", string(data)).Msg("POST response")
s.monitorPostComplete(ctx, callURL.Path, "succeeded")

return bytes.NewReader(data), nil
}

// post2 sends an HTTP post request and returns the body.
func (s *Service) post2(ctx context.Context,
func (s *Service) post(ctx context.Context,
endpoint string,
query string,
opts *api.CommonOpts,
Expand Down Expand Up @@ -244,12 +166,7 @@ func (s *Service) post2(ctx context.Context,

statusFamily := statusCodeFamily(resp.StatusCode)
if statusFamily != 2 {
if res.contentType == ContentTypeJSON {
trimmedResponse := bytes.ReplaceAll(bytes.ReplaceAll(res.body, []byte{0x0a}, []byte{}), []byte{0x0d}, []byte{})
log.Debug().Int("status_code", resp.StatusCode).RawJSON("response", trimmedResponse).Msg("POST failed")
} else {
log.Debug().Int("status_code", resp.StatusCode).Msg("POST failed")
}
s.logBadStatus(ctx, "POST", res, log)

span.SetStatus(codes.Error, fmt.Sprintf("Status code %d", resp.StatusCode))
s.monitorPostComplete(ctx, callURL.Path, "failed")
Expand All @@ -267,6 +184,23 @@ func (s *Service) post2(ctx context.Context,
return res, nil
}

func (*Service) logBadStatus(_ context.Context,
method string,
res *httpResponse,
log zerolog.Logger,
) {
if res.contentType == ContentTypeJSON {
trimmedResponse := bytes.ReplaceAll(bytes.ReplaceAll(res.body, []byte{0x0a}, []byte{}), []byte{0x0d}, []byte{})
if bytes.HasPrefix(res.body, []byte("{")) {
log.Debug().Int("status_code", res.statusCode).RawJSON("response", trimmedResponse).Msg(method + " failed")
} else {
log.Debug().Int("status_code", res.statusCode).Str("response", string(trimmedResponse)).Msg(method + " failed")
}
} else {
log.Debug().Int("status_code", res.statusCode).Msg(method + " failed")
}
}

func (s *Service) addExtraHeaders(req *http.Request) {
for k, v := range s.extraHeaders {
req.Header.Add(k, v)
Expand Down Expand Up @@ -400,17 +334,9 @@ func (s *Service) get(ctx context.Context,
attribute.String("content-type", res.contentType.String()),
))

if res.contentType == ContentTypeJSON {
if e := log.Trace(); e.Enabled() {
trimmedResponse := bytes.ReplaceAll(bytes.ReplaceAll(res.body, []byte{0x0a}, []byte{}), []byte{0x0d}, []byte{})
e.RawJSON("body", trimmedResponse).Msg("GET response")
}
}

statusFamily := statusCodeFamily(resp.StatusCode)
if statusFamily != 2 {
trimmedResponse := bytes.ReplaceAll(bytes.ReplaceAll(res.body, []byte{0x0a}, []byte{}), []byte{0x0d}, []byte{})
log.Debug().Int("status_code", resp.StatusCode).RawJSON("response", trimmedResponse).Msg("GET failed")
s.logBadStatus(ctx, "GET", res, log)

span.SetStatus(codes.Error, fmt.Sprintf("Status code %d", resp.StatusCode))
s.monitorGetComplete(ctx, callURL.Path, "failed")
Expand All @@ -423,6 +349,13 @@ func (s *Service) get(ctx context.Context,
}
}

if res.contentType == ContentTypeJSON {
if e := log.Trace(); e.Enabled() {
trimmedResponse := bytes.ReplaceAll(bytes.ReplaceAll(res.body, []byte{0x0a}, []byte{}), []byte{0x0d}, []byte{})
e.RawJSON("body", trimmedResponse).Msg("GET response")
}
}

if err := populateConsensusVersion(res, resp); err != nil {
return nil, errors.Join(errors.New("failed to parse consensus version"), err)
}
Expand Down
14 changes: 12 additions & 2 deletions http/submitaggregateattestations.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"errors"

"github.com/attestantio/go-eth2-client/api"
"github.com/attestantio/go-eth2-client/spec/phase0"
)

Expand All @@ -33,8 +34,17 @@ func (s *Service) SubmitAggregateAttestations(ctx context.Context, aggregateAndP
return errors.Join(errors.New("failed to marshal JSON"), err)
}

_, err = s.post(ctx, "/eth/v1/validator/aggregate_and_proofs", bytes.NewBuffer(specJSON))
if err != nil {
endpoint := "/eth/v1/validator/aggregate_and_proofs"
query := ""

if _, err := s.post(ctx,
endpoint,
query,
&api.CommonOpts{},
bytes.NewReader(specJSON),
ContentTypeJSON,
map[string]string{},
); err != nil {
return errors.Join(errors.New("failed to submit aggregate and proofs"), err)
}

Expand Down
14 changes: 12 additions & 2 deletions http/submitattestations.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"errors"

"github.com/attestantio/go-eth2-client/api"
"github.com/attestantio/go-eth2-client/spec/phase0"
)

Expand All @@ -33,8 +34,17 @@ func (s *Service) SubmitAttestations(ctx context.Context, attestations []*phase0
return errors.Join(errors.New("failed to marshal JSON"), err)
}

_, err = s.post(ctx, "/eth/v1/beacon/pool/attestations", bytes.NewBuffer(specJSON))
if err != nil {
endpoint := "/eth/v1/beacon/pool/attestations"
query := ""

if _, err := s.post(ctx,
endpoint,
query,
&api.CommonOpts{},
bytes.NewReader(specJSON),
ContentTypeJSON,
map[string]string{},
); err != nil {
return errors.Join(errors.New("failed to submit beacon attestations"), err)
}

Expand Down
14 changes: 12 additions & 2 deletions http/submitattesterslashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"errors"

"github.com/attestantio/go-eth2-client/api"
"github.com/attestantio/go-eth2-client/spec/phase0"
)

Expand All @@ -33,8 +34,17 @@ func (s *Service) SubmitAttesterSlashing(ctx context.Context, slashing *phase0.A
return errors.Join(errors.New("failed to marshal JSON"), err)
}

_, err = s.post(ctx, "/eth/v1/beacon/pool/attester_slashings", bytes.NewBuffer(specJSON))
if err != nil {
endpoint := "/eth/v1/beacon/pool/attester_slashings"
query := ""

if _, err := s.post(ctx,
endpoint,
query,
&api.CommonOpts{},
bytes.NewReader(specJSON),
ContentTypeJSON,
map[string]string{},
); err != nil {
return errors.Join(errors.New("failed to submit proposal slashing"), err)
}

Expand Down
14 changes: 12 additions & 2 deletions http/submitbeaconblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"

client "github.com/attestantio/go-eth2-client"
"github.com/attestantio/go-eth2-client/api"
"github.com/attestantio/go-eth2-client/spec"
)

Expand Down Expand Up @@ -56,8 +57,17 @@ func (s *Service) SubmitBeaconBlock(ctx context.Context, block *spec.VersionedSi
return errors.Join(errors.New("failed to marshal JSON"), err)
}

_, err = s.post(ctx, "/eth/v1/beacon/blocks", bytes.NewBuffer(specJSON))
if err != nil {
endpoint := "/eth/v1/beacon/blocks"
query := ""

if _, err := s.post(ctx,
endpoint,
query,
&api.CommonOpts{},
bytes.NewReader(specJSON),
ContentTypeJSON,
map[string]string{},
); err != nil {
return errors.Join(errors.New("failed to submit beacon block"), err)
}

Expand Down
24 changes: 18 additions & 6 deletions http/submitbeaconcommitteesubscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,34 @@ import (
"encoding/json"
"errors"

api "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/api"
apiv1 "github.com/attestantio/go-eth2-client/api/v1"
)

// SubmitBeaconCommitteeSubscriptions subscribes to beacon committees.
func (s *Service) SubmitBeaconCommitteeSubscriptions(ctx context.Context, subscriptions []*api.BeaconCommitteeSubscription) error {
func (s *Service) SubmitBeaconCommitteeSubscriptions(ctx context.Context,
subscriptions []*apiv1.BeaconCommitteeSubscription,
) error {
if err := s.assertIsSynced(ctx); err != nil {
return err
}

var reqBodyReader bytes.Buffer
if err := json.NewEncoder(&reqBodyReader).Encode(subscriptions); err != nil {
specJSON, err := json.Marshal(subscriptions)
if err != nil {
return errors.Join(errors.New("failed to encode beacon committee subscriptions"), err)
}

_, err := s.post(ctx, "/eth/v1/validator/beacon_committee_subscriptions", &reqBodyReader)
if err != nil {
endpoint := "/eth/v1/validator/beacon_committee_subscriptions"
query := ""

if _, err := s.post(ctx,
endpoint,
query,
&api.CommonOpts{},
bytes.NewReader(specJSON),
ContentTypeJSON,
map[string]string{},
); err != nil {
return errors.Join(errors.New("failed to request beacon committee subscriptions"), err)
}

Expand Down
13 changes: 11 additions & 2 deletions http/submitblindedbeaconblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,17 @@ func (s *Service) SubmitBlindedBeaconBlock(ctx context.Context, block *api.Versi
return errors.Join(errors.New("failed to marshal JSON"), err)
}

_, err = s.post(ctx, "/eth/v1/beacon/blinded_blocks", bytes.NewBuffer(specJSON))
if err != nil {
endpoint := "/eth/v1/beacon/blinded_blocks"
query := ""

if _, err := s.post(ctx,
endpoint,
query,
&api.CommonOpts{},
bytes.NewReader(specJSON),
ContentTypeJSON,
map[string]string{},
); err != nil {
return errors.Join(errors.New("failed to submit blinded beacon block"), err)
}

Expand Down
Loading

0 comments on commit 727775a

Please sign in to comment.