diff --git a/pkg/server/service/coordinator/client.go b/pkg/server/service/coordinator/client.go index 1efacd04..ca1ce9ac 100644 --- a/pkg/server/service/coordinator/client.go +++ b/pkg/server/service/coordinator/client.go @@ -17,6 +17,8 @@ import ( n "github.com/ethpandaops/xatu/pkg/server/service/coordinator/node" "github.com/sirupsen/logrus" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const ( @@ -66,7 +68,7 @@ func (c *Client) Start(ctx context.Context, grpcServer *grpc.Server) error { xatu.RegisterCoordinatorServer(grpcServer, c) if err := c.nodeRecord.Start(ctx); err != nil { - return err + return status.Error(codes.Internal, err.Error()) } return nil @@ -91,7 +93,7 @@ func (c *Client) CreateNodeRecords(ctx context.Context, req *xatu.CreateNodeReco pRecord, err := node.Parse(record) if err != nil { - return nil, err + return nil, status.Error(codes.InvalidArgument, err.Error()) } if c.geoipProvider != nil { @@ -140,7 +142,7 @@ func (c *Client) ListStalledExecutionNodeRecords(ctx context.Context, req *xatu. nodeRecords, err := c.persistence.CheckoutStalledExecutionNodeRecords(ctx, pageSize) if err != nil { - return nil, err + return nil, status.Error(codes.Internal, err.Error()) } response := &xatu.ListStalledExecutionNodeRecordsResponse{ @@ -156,10 +158,10 @@ func (c *Client) ListStalledExecutionNodeRecords(ctx context.Context, req *xatu. func (c *Client) CreateExecutionNodeRecordStatus(ctx context.Context, req *xatu.CreateExecutionNodeRecordStatusRequest) (*xatu.CreateExecutionNodeRecordStatusResponse, error) { if req.Status == nil { - return nil, fmt.Errorf("status is required") + return nil, status.Errorf(codes.InvalidArgument, "status is required") } - status := node.Execution{ + st := node.Execution{ Enr: req.Status.NodeRecord, Name: req.Status.Name, ProtocolVersion: fmt.Sprintf("%v", req.Status.ProtocolVersion), @@ -170,8 +172,8 @@ func (c *Client) CreateExecutionNodeRecordStatus(ctx context.Context, req *xatu. } if req.Status.ForkId != nil { - status.ForkIDHash = req.Status.ForkId.Hash - status.ForkIDNext = fmt.Sprintf("%v", req.Status.ForkId.Next) + st.ForkIDHash = req.Status.ForkId.Hash + st.ForkIDNext = fmt.Sprintf("%v", req.Status.ForkId.Next) } if req.Status.Capabilities != nil { @@ -180,19 +182,19 @@ func (c *Client) CreateExecutionNodeRecordStatus(ctx context.Context, req *xatu. capabilitiesStr = append(capabilitiesStr, cap.GetName()+"/"+fmt.Sprint(cap.GetVersion())) } - status.Capabilities = strings.Join(capabilitiesStr, ",") + st.Capabilities = strings.Join(capabilitiesStr, ",") } result := "error" defer func() { // TODO(sam.calder-mason): Derive client id/name from the request jwt - c.metrics.AddExecutionNodeRecordStatusReceived(1, "unknown", result, status.NetworkID, fmt.Sprintf("0x%x", status.ForkIDHash)) + c.metrics.AddExecutionNodeRecordStatusReceived(1, "unknown", result, st.NetworkID, fmt.Sprintf("0x%x", st.ForkIDHash)) }() - err := c.persistence.InsertNodeRecordExecution(ctx, &status) + err := c.persistence.InsertNodeRecordExecution(ctx, &st) if err != nil { - return nil, err + return nil, status.Error(codes.Internal, err.Error()) } result = "success" @@ -205,7 +207,7 @@ func (c *Client) CreateExecutionNodeRecordStatus(ctx context.Context, req *xatu. err = c.persistence.UpdateNodeRecord(ctx, nodeRecord) if err != nil { - return nil, err + return nil, status.Error(codes.Internal, err.Error()) } return &xatu.CreateExecutionNodeRecordStatusResponse{}, nil @@ -217,7 +219,7 @@ func (c *Client) CoordinateExecutionNodeRecords(ctx context.Context, req *xatu.C activities := []*node.Activity{} if req.ClientId == "" { - return nil, fmt.Errorf("client id is required") + return nil, status.Errorf(codes.InvalidArgument, "client id is required") } for _, record := range req.NodeRecords { @@ -252,7 +254,7 @@ func (c *Client) CoordinateExecutionNodeRecords(ctx context.Context, req *xatu.C if limit > 0 { newNodeRecords, err := c.persistence.ListAvailableExecutionNodeRecords(ctx, req.ClientId, ignoredNodeRecords, req.NetworkIds, req.ForkIdHashes, int(limit)) if err != nil { - return nil, err + return nil, status.Error(codes.Internal, err.Error()) } for _, record := range newNodeRecords { @@ -271,7 +273,7 @@ func (c *Client) CoordinateExecutionNodeRecords(ctx context.Context, req *xatu.C if len(activities) != 0 { err := c.persistence.UpsertNodeRecordActivities(ctx, activities) if err != nil { - return nil, err + return nil, status.Error(codes.Internal, err.Error()) } } @@ -284,11 +286,11 @@ func (c *Client) CoordinateExecutionNodeRecords(ctx context.Context, req *xatu.C func (c *Client) GetDiscoveryNodeRecord(ctx context.Context, req *xatu.GetDiscoveryNodeRecordRequest) (*xatu.GetDiscoveryNodeRecordResponse, error) { records, err := c.persistence.ListNodeRecordExecutions(ctx, req.NetworkIds, req.ForkIdHashes, 100) if err != nil { - return nil, err + return nil, status.Error(codes.Internal, err.Error()) } if len(records) == 0 { - return nil, fmt.Errorf("no records found") + return nil, status.Errorf(codes.NotFound, "no records found") } //nolint:gosec // not a security issue @@ -302,7 +304,7 @@ func (c *Client) GetDiscoveryNodeRecord(ctx context.Context, req *xatu.GetDiscov func (c *Client) GetCannonLocation(ctx context.Context, req *xatu.GetCannonLocationRequest) (*xatu.GetCannonLocationResponse, error) { location, err := c.persistence.GetCannonLocationByNetworkIDAndType(ctx, req.NetworkId, req.Type.Enum().String()) if err != nil && err != persistence.ErrCannonLocationNotFound { - return nil, err + return nil, status.Error(codes.Internal, err.Error()) } rsp := &xatu.GetCannonLocationResponse{} @@ -313,7 +315,7 @@ func (c *Client) GetCannonLocation(ctx context.Context, req *xatu.GetCannonLocat protoLoc, err := location.Unmarshal() if err != nil { - return nil, err + return nil, status.Error(codes.Internal, err.Error()) } return &xatu.GetCannonLocationResponse{ @@ -326,12 +328,12 @@ func (c *Client) UpsertCannonLocation(ctx context.Context, req *xatu.UpsertCanno err := newLocation.Marshal(req.Location) if err != nil { - return nil, err + return nil, status.Error(codes.InvalidArgument, err.Error()) } err = c.persistence.UpsertCannonLocation(ctx, newLocation) if err != nil { - return nil, err + return nil, status.Error(codes.Internal, err.Error()) } return &xatu.UpsertCannonLocationResponse{}, nil diff --git a/pkg/server/service/event-ingester/handler.go b/pkg/server/service/event-ingester/handler.go index e8578da5..ba4b8168 100644 --- a/pkg/server/service/event-ingester/handler.go +++ b/pkg/server/service/event-ingester/handler.go @@ -3,6 +3,7 @@ package eventingester import ( "context" "errors" + "fmt" "net" "strings" "time" @@ -135,13 +136,13 @@ func (h *Handler) Events(ctx context.Context, clientID string, events []*xatu.De if err != nil { h.log.WithError(err).WithField("event", eventName).Warn("failed to create event handler") - continue + return nil, fmt.Errorf("failed to create event for %s event handler: %w ", eventName, err) } if err := e.Validate(ctx); err != nil { h.log.WithError(err).WithField("event", eventName).Warn("failed to validate event") - continue + return nil, fmt.Errorf("%s event failed validation: %w", eventName, err) } if shouldFilter := e.Filter(ctx); shouldFilter { diff --git a/pkg/server/service/event-ingester/ingester.go b/pkg/server/service/event-ingester/ingester.go index c1008d2d..cbe9dc8d 100644 --- a/pkg/server/service/event-ingester/ingester.go +++ b/pkg/server/service/event-ingester/ingester.go @@ -11,6 +11,8 @@ import ( "github.com/ethpandaops/xatu/pkg/server/store" "github.com/sirupsen/logrus" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const ( @@ -57,7 +59,7 @@ func (e *Ingester) Stop(ctx context.Context) error { for _, sink := range e.sinks { if err := sink.Stop(ctx); err != nil { - return err + return status.Error(codes.Internal, err.Error()) } } @@ -72,12 +74,12 @@ func (e *Ingester) CreateEvents(ctx context.Context, req *xatu.CreateEventsReque filteredEvents, err := e.handler.Events(ctx, clientID, req.Events) if err != nil { - return nil, err + return nil, status.Error(codes.Internal, err.Error()) } for _, sink := range e.sinks { if err := sink.HandleNewDecoratedEvents(ctx, filteredEvents); err != nil { - return nil, err + return nil, status.Error(codes.Internal, err.Error()) } }