Skip to content

Commit

Permalink
fix(client): return gRPC status error on Start and CreateNodeRecords (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Sep 18, 2023
1 parent a40e5a4 commit ef135f8
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 26 deletions.
44 changes: 23 additions & 21 deletions pkg/server/service/coordinator/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
n "github.com/ethpandaops/xatu/pkg/server/service/coordinator/node"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
Expand Down Expand Up @@ -66,7 +68,7 @@ func (c *Client) Start(ctx context.Context, grpcServer *grpc.Server) error {
xatu.RegisterCoordinatorServer(grpcServer, c)

if err := c.nodeRecord.Start(ctx); err != nil {
return err
return status.Error(codes.Internal, err.Error())
}

return nil
Expand All @@ -91,7 +93,7 @@ func (c *Client) CreateNodeRecords(ctx context.Context, req *xatu.CreateNodeReco

pRecord, err := node.Parse(record)
if err != nil {
return nil, err
return nil, status.Error(codes.InvalidArgument, err.Error())
}

if c.geoipProvider != nil {
Expand Down Expand Up @@ -140,7 +142,7 @@ func (c *Client) ListStalledExecutionNodeRecords(ctx context.Context, req *xatu.

nodeRecords, err := c.persistence.CheckoutStalledExecutionNodeRecords(ctx, pageSize)
if err != nil {
return nil, err
return nil, status.Error(codes.Internal, err.Error())
}

response := &xatu.ListStalledExecutionNodeRecordsResponse{
Expand All @@ -156,10 +158,10 @@ func (c *Client) ListStalledExecutionNodeRecords(ctx context.Context, req *xatu.

func (c *Client) CreateExecutionNodeRecordStatus(ctx context.Context, req *xatu.CreateExecutionNodeRecordStatusRequest) (*xatu.CreateExecutionNodeRecordStatusResponse, error) {
if req.Status == nil {
return nil, fmt.Errorf("status is required")
return nil, status.Errorf(codes.InvalidArgument, "status is required")
}

status := node.Execution{
st := node.Execution{
Enr: req.Status.NodeRecord,
Name: req.Status.Name,
ProtocolVersion: fmt.Sprintf("%v", req.Status.ProtocolVersion),
Expand All @@ -170,8 +172,8 @@ func (c *Client) CreateExecutionNodeRecordStatus(ctx context.Context, req *xatu.
}

if req.Status.ForkId != nil {
status.ForkIDHash = req.Status.ForkId.Hash
status.ForkIDNext = fmt.Sprintf("%v", req.Status.ForkId.Next)
st.ForkIDHash = req.Status.ForkId.Hash
st.ForkIDNext = fmt.Sprintf("%v", req.Status.ForkId.Next)
}

if req.Status.Capabilities != nil {
Expand All @@ -180,19 +182,19 @@ func (c *Client) CreateExecutionNodeRecordStatus(ctx context.Context, req *xatu.
capabilitiesStr = append(capabilitiesStr, cap.GetName()+"/"+fmt.Sprint(cap.GetVersion()))
}

status.Capabilities = strings.Join(capabilitiesStr, ",")
st.Capabilities = strings.Join(capabilitiesStr, ",")
}

result := "error"

defer func() {
// TODO(sam.calder-mason): Derive client id/name from the request jwt
c.metrics.AddExecutionNodeRecordStatusReceived(1, "unknown", result, status.NetworkID, fmt.Sprintf("0x%x", status.ForkIDHash))
c.metrics.AddExecutionNodeRecordStatusReceived(1, "unknown", result, st.NetworkID, fmt.Sprintf("0x%x", st.ForkIDHash))
}()

err := c.persistence.InsertNodeRecordExecution(ctx, &status)
err := c.persistence.InsertNodeRecordExecution(ctx, &st)
if err != nil {
return nil, err
return nil, status.Error(codes.Internal, err.Error())
}

result = "success"
Expand All @@ -205,7 +207,7 @@ func (c *Client) CreateExecutionNodeRecordStatus(ctx context.Context, req *xatu.

err = c.persistence.UpdateNodeRecord(ctx, nodeRecord)
if err != nil {
return nil, err
return nil, status.Error(codes.Internal, err.Error())
}

return &xatu.CreateExecutionNodeRecordStatusResponse{}, nil
Expand All @@ -217,7 +219,7 @@ func (c *Client) CoordinateExecutionNodeRecords(ctx context.Context, req *xatu.C
activities := []*node.Activity{}

if req.ClientId == "" {
return nil, fmt.Errorf("client id is required")
return nil, status.Errorf(codes.InvalidArgument, "client id is required")
}

for _, record := range req.NodeRecords {
Expand Down Expand Up @@ -252,7 +254,7 @@ func (c *Client) CoordinateExecutionNodeRecords(ctx context.Context, req *xatu.C
if limit > 0 {
newNodeRecords, err := c.persistence.ListAvailableExecutionNodeRecords(ctx, req.ClientId, ignoredNodeRecords, req.NetworkIds, req.ForkIdHashes, int(limit))
if err != nil {
return nil, err
return nil, status.Error(codes.Internal, err.Error())
}

for _, record := range newNodeRecords {
Expand All @@ -271,7 +273,7 @@ func (c *Client) CoordinateExecutionNodeRecords(ctx context.Context, req *xatu.C
if len(activities) != 0 {
err := c.persistence.UpsertNodeRecordActivities(ctx, activities)
if err != nil {
return nil, err
return nil, status.Error(codes.Internal, err.Error())
}
}

Expand All @@ -284,11 +286,11 @@ func (c *Client) CoordinateExecutionNodeRecords(ctx context.Context, req *xatu.C
func (c *Client) GetDiscoveryNodeRecord(ctx context.Context, req *xatu.GetDiscoveryNodeRecordRequest) (*xatu.GetDiscoveryNodeRecordResponse, error) {
records, err := c.persistence.ListNodeRecordExecutions(ctx, req.NetworkIds, req.ForkIdHashes, 100)
if err != nil {
return nil, err
return nil, status.Error(codes.Internal, err.Error())
}

if len(records) == 0 {
return nil, fmt.Errorf("no records found")
return nil, status.Errorf(codes.NotFound, "no records found")
}

//nolint:gosec // not a security issue
Expand All @@ -302,7 +304,7 @@ func (c *Client) GetDiscoveryNodeRecord(ctx context.Context, req *xatu.GetDiscov
func (c *Client) GetCannonLocation(ctx context.Context, req *xatu.GetCannonLocationRequest) (*xatu.GetCannonLocationResponse, error) {
location, err := c.persistence.GetCannonLocationByNetworkIDAndType(ctx, req.NetworkId, req.Type.Enum().String())
if err != nil && err != persistence.ErrCannonLocationNotFound {
return nil, err
return nil, status.Error(codes.Internal, err.Error())
}

rsp := &xatu.GetCannonLocationResponse{}
Expand All @@ -313,7 +315,7 @@ func (c *Client) GetCannonLocation(ctx context.Context, req *xatu.GetCannonLocat

protoLoc, err := location.Unmarshal()
if err != nil {
return nil, err
return nil, status.Error(codes.Internal, err.Error())
}

return &xatu.GetCannonLocationResponse{
Expand All @@ -326,12 +328,12 @@ func (c *Client) UpsertCannonLocation(ctx context.Context, req *xatu.UpsertCanno

err := newLocation.Marshal(req.Location)
if err != nil {
return nil, err
return nil, status.Error(codes.InvalidArgument, err.Error())
}

err = c.persistence.UpsertCannonLocation(ctx, newLocation)
if err != nil {
return nil, err
return nil, status.Error(codes.Internal, err.Error())
}

return &xatu.UpsertCannonLocationResponse{}, nil
Expand Down
5 changes: 3 additions & 2 deletions pkg/server/service/event-ingester/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package eventingester
import (
"context"
"errors"
"fmt"
"net"
"strings"
"time"
Expand Down Expand Up @@ -135,13 +136,13 @@ func (h *Handler) Events(ctx context.Context, clientID string, events []*xatu.De
if err != nil {
h.log.WithError(err).WithField("event", eventName).Warn("failed to create event handler")

continue
return nil, fmt.Errorf("failed to create event for %s event handler: %w ", eventName, err)
}

if err := e.Validate(ctx); err != nil {
h.log.WithError(err).WithField("event", eventName).Warn("failed to validate event")

continue
return nil, fmt.Errorf("%s event failed validation: %w", eventName, err)
}

if shouldFilter := e.Filter(ctx); shouldFilter {
Expand Down
8 changes: 5 additions & 3 deletions pkg/server/service/event-ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/ethpandaops/xatu/pkg/server/store"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
Expand Down Expand Up @@ -57,7 +59,7 @@ func (e *Ingester) Stop(ctx context.Context) error {

for _, sink := range e.sinks {
if err := sink.Stop(ctx); err != nil {
return err
return status.Error(codes.Internal, err.Error())
}
}

Expand All @@ -72,12 +74,12 @@ func (e *Ingester) CreateEvents(ctx context.Context, req *xatu.CreateEventsReque

filteredEvents, err := e.handler.Events(ctx, clientID, req.Events)
if err != nil {
return nil, err
return nil, status.Error(codes.Internal, err.Error())
}

for _, sink := range e.sinks {
if err := sink.HandleNewDecoratedEvents(ctx, filteredEvents); err != nil {
return nil, err
return nil, status.Error(codes.Internal, err.Error())
}
}

Expand Down

0 comments on commit ef135f8

Please sign in to comment.