Skip to content

Commit

Permalink
hide exposed dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
DMwangnima committed Nov 21, 2024
1 parent 3cc884a commit e153228
Show file tree
Hide file tree
Showing 10 changed files with 246 additions and 331 deletions.
13 changes: 8 additions & 5 deletions internal/remote/trans/grpc/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,6 @@ type Error struct {
kerr error
}

// GetMappingErr returns the Kitex custom error that status Error maps to
func (e *Error) GetMappingErr() error {
return e.kerr
}

func (e *Error) Error() string {
str := fmt.Sprintf("rpc error: code = %d desc = %s", codes.Code(e.e.GetCode()), e.e.GetMessage())
if e.kerr == nil {
Expand Down Expand Up @@ -221,6 +216,14 @@ func (e *Error) Is(target error) bool {
return false
}

// GetMappingErr retrieves the Kitex error that status maps to
func GetMappingErr(e *Error) error {
if e != nil {
return e.kerr
}
return nil
}

// FromError returns a Status representing err if it was produced from this
// package or has a method `GRPCStatus() *Status`. Otherwise, ok is false and a
// Status is returned with codes.Unknown and the original error message.
Expand Down
10 changes: 5 additions & 5 deletions pkg/remote/trans/nphttp2/grpc/err_handling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ func TestErrorHandling(t *testing.T) {
{
desc: "normal RstCode",
errs: expectedErrs{
errBizCanceledVal: {kerrors.ErrBizCanceled, errRecvRstStream},
errMiddleHeaderVal: {errMiddleHeader, errRecvRstStream},
errDecodeHeaderVal: {errDecodeHeader, errRecvRstStream},
errHTTP2StreamVal: {errHTTP2Stream, errRecvRstStream},
errBizCanceledVal: {kerrors.ErrBizCanceled, errRecvUpstreamRstStream},
errMiddleHeaderVal: {errMiddleHeader, errRecvUpstreamRstStream},
errDecodeHeaderVal: {errDecodeHeader, errRecvUpstreamRstStream},
errHTTP2StreamVal: {errHTTP2Stream, errRecvUpstreamRstStream},
errClosedWithoutTrailerVal: {errClosedWithoutTrailer, nil},
errRecvRstStreamVal: {errRecvRstStream, nil},
errRecvRstStreamVal: {errRecvDownStreamRstStream, nil},
},
},
{
Expand Down
259 changes: 189 additions & 70 deletions pkg/remote/trans/nphttp2/grpc/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,56 +21,34 @@
package grpc

import (
"context"
"errors"
"fmt"
"io"
"time"

"github.com/cloudwego/kitex/pkg/kerrors"
)
"golang.org/x/net/http2"

type ErrType = uint32

const (
ErrTypeGracefulShutdown = iota + 1
ErrTypeBizCanceled
ErrTypeHandlerReturn
ErrTypeStreamingCanceled
ErrTypeStreamTimeout
ErrTypeMetaSizeExceeded

ErrTypeHTTP2Stream
ErrTypeClosedWithoutTrailer
ErrTypeMiddleHeader
ErrTypeDecodeHeader
ErrTypeRecvRstStream
ErrTypeStreamDrain
ErrTypeStreamFlowControl
ErrTypeIllegalHeaderWrite
ErrTypeStreamIsDone
ErrTypeMaxStreamExceeded

ErrTypeHTTP2Connection
ErrTypeEstablishConnection
ErrTypeHandleGoAway
ErrTypeKeepAlive
ErrTypeOperateHeaders
ErrTypeNoActiveStream
ErrTypeControlBufFinished
ErrTypeNotReachable
ErrTypeConnectionIsClosing
istatus "github.com/cloudwego/kitex/internal/remote/trans/grpc/status"
"github.com/cloudwego/kitex/pkg/kerrors"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/codes"
)

// This file contains all the errors suitable for Kitex errors model.
var (
// stream error
errHTTP2Stream = fmt.Errorf("%w - %s", kerrors.ErrStreamingProtocol, "HTTP2Stream err when parsing HTTP2 frame")
errClosedWithoutTrailer = fmt.Errorf("%w - %s", kerrors.ErrStreamingProtocol, "client received Data frame with END_STREAM flag")
errMiddleHeader = fmt.Errorf("%w - %s", kerrors.ErrStreamingProtocol, "Headers frame appeared in the middle of a stream")
errDecodeHeader = fmt.Errorf("%w - %s", kerrors.ErrStreamingProtocol, "decoded Headers frame failed")
errRecvRstStream = fmt.Errorf("%w - %s", kerrors.ErrStreamingProtocol, "received RstStream frame")
errStreamDrain = fmt.Errorf("%w - %s", kerrors.ErrStreamingProtocol, "stream rejected by draining connection")
errStreamFlowControl = fmt.Errorf("%w - %s", kerrors.ErrStreamingProtocol, "stream-level flow control")
errIllegalHeaderWrite = fmt.Errorf("%w - %s", kerrors.ErrStreamingProtocol, "Headers frame has been already sent by server")
errStreamIsDone = fmt.Errorf("%w - %s", kerrors.ErrStreamingProtocol, "stream is done")
errMaxStreamExceeded = fmt.Errorf("%w - %s", kerrors.ErrStreamingProtocol, "max stream exceeded")
errHTTP2Stream = fmt.Errorf("%w - %s", kerrors.ErrStreamingProtocol, "HTTP2Stream err when parsing HTTP2 frame")
errClosedWithoutTrailer = fmt.Errorf("%w - %s", kerrors.ErrStreamingProtocol, "client received Data frame with END_STREAM flag")
errMiddleHeader = fmt.Errorf("%w - %s", kerrors.ErrStreamingProtocol, "Headers frame appeared in the middle of a stream")
errDecodeHeader = fmt.Errorf("%w - %s", kerrors.ErrStreamingProtocol, "decoded Headers frame failed")
errRecvDownStreamRstStream = fmt.Errorf("%w - %s", kerrors.ErrStreamingProtocol, "received RstStream frame from downstream")
errRecvUpstreamRstStream = fmt.Errorf("%w - %s", kerrors.ErrStreamingProtocol, "receive RstStream frame fron upstream")
errStreamDrain = fmt.Errorf("%w - %s", kerrors.ErrStreamingProtocol, "stream rejected by draining connection")
errStreamFlowControl = fmt.Errorf("%w - %s", kerrors.ErrStreamingProtocol, "stream-level flow control")
errIllegalHeaderWrite = fmt.Errorf("%w - %s", kerrors.ErrStreamingProtocol, "Headers frame has been already sent by server")
errStreamIsDone = fmt.Errorf("%w - %s", kerrors.ErrStreamingProtocol, "stream is done")
errMaxStreamExceeded = fmt.Errorf("%w - %s", kerrors.ErrStreamingProtocol, "max stream exceeded")

// connection error
errHTTP2Connection = fmt.Errorf("%w - %s", kerrors.ErrStreamingProtocol, "HTTP2Connection err when parsing HTTP2 frame")
Expand All @@ -84,32 +62,173 @@ var (
errConnectionIsClosing = fmt.Errorf("%w - %s", kerrors.ErrStreamingProtocol, "connection is closing")
)

var errType2ErrMap = map[ErrType]error{
ErrTypeGracefulShutdown: kerrors.ErrGracefulShutdown,
ErrTypeBizCanceled: kerrors.ErrBizCanceled,
ErrTypeHandlerReturn: kerrors.ErrHandlerReturn,
ErrTypeStreamingCanceled: kerrors.ErrStreamingCanceled,
ErrTypeStreamTimeout: kerrors.ErrStreamTimeout,
ErrTypeMetaSizeExceeded: kerrors.ErrMetaSizeExceeded,
// stream error type
ErrTypeHTTP2Stream: errHTTP2Stream,
ErrTypeClosedWithoutTrailer: errClosedWithoutTrailer,
ErrTypeMiddleHeader: errMiddleHeader,
ErrTypeDecodeHeader: errDecodeHeader,
ErrTypeRecvRstStream: errRecvRstStream,
ErrTypeStreamDrain: errStreamDrain,
ErrTypeStreamFlowControl: errStreamFlowControl,
ErrTypeIllegalHeaderWrite: errIllegalHeaderWrite,
ErrTypeStreamIsDone: errStreamIsDone,
ErrTypeMaxStreamExceeded: errMaxStreamExceeded,
// connection error type
ErrTypeHTTP2Connection: errHTTP2Connection,
ErrTypeEstablishConnection: errEstablishConnection,
ErrTypeHandleGoAway: errHandleGoAway,
ErrTypeKeepAlive: errKeepAlive,
ErrTypeOperateHeaders: errOperateHeaders,
ErrTypeNoActiveStream: errNoActiveStream,
ErrTypeControlBufFinished: errControlBufFinished,
ErrTypeNotReachable: errNotReachable,
ErrTypeConnectionIsClosing: errConnectionIsClosing,
var rstCode2ErrMap = map[http2.ErrCode]error{
http2.ErrCode(1000): kerrors.ErrGracefulShutdown,
http2.ErrCode(1001): kerrors.ErrBizCanceled,
http2.ErrCode(1002): kerrors.ErrHandlerReturn,
http2.ErrCode(1003): kerrors.ErrStreamingCanceled,
http2.ErrCode(1004): errRecvDownStreamRstStream,
http2.ErrCode(1005): errRecvUpstreamRstStream,
http2.ErrCode(1006): kerrors.ErrStreamTimeout,
http2.ErrCode(1007): errMiddleHeader,
http2.ErrCode(1008): errDecodeHeader,
http2.ErrCode(1009): errHTTP2Stream,
http2.ErrCode(1010): errClosedWithoutTrailer,
http2.ErrCode(1011): errStreamFlowControl,
http2.ErrCode(1012): kerrors.ErrMetaSizeExceeded,
}

var err2RstCodeMap = map[error]http2.ErrCode{
kerrors.ErrGracefulShutdown: http2.ErrCode(1000),
kerrors.ErrBizCanceled: http2.ErrCode(1001),
kerrors.ErrHandlerReturn: http2.ErrCode(1002),
kerrors.ErrStreamingCanceled: http2.ErrCode(1003),
errRecvDownStreamRstStream: http2.ErrCode(1004),
errRecvUpstreamRstStream: http2.ErrCode(1005),
kerrors.ErrStreamTimeout: http2.ErrCode(1006),
errMiddleHeader: http2.ErrCode(1007),
errDecodeHeader: http2.ErrCode(1008),
errHTTP2Stream: http2.ErrCode(1009),
errClosedWithoutTrailer: http2.ErrCode(1010),
errStreamFlowControl: http2.ErrCode(1011),
kerrors.ErrMetaSizeExceeded: http2.ErrCode(1012),
}

var err2StatusCodeMap = map[error]codes.Code{
// kitex global error
kerrors.ErrGracefulShutdown: codes.Code(200),
kerrors.ErrBizCanceled: codes.Code(201),
kerrors.ErrHandlerReturn: codes.Code(202),
kerrors.ErrStreamingCanceled: codes.Code(203),
kerrors.ErrStreamTimeout: codes.Code(204),
kerrors.ErrMetaSizeExceeded: codes.Code(205),
// stream error
errHTTP2Stream: codes.Code(10000),
errClosedWithoutTrailer: codes.Code(10001),
errMiddleHeader: codes.Code(10002),
errDecodeHeader: codes.Code(10003),
errRecvDownStreamRstStream: codes.Code(10004),
errRecvUpstreamRstStream: codes.Code(10005),
errStreamDrain: codes.Code(10006),
errStreamFlowControl: codes.Code(10007),
errIllegalHeaderWrite: codes.Code(10008),
errStreamIsDone: codes.Code(10009),
errMaxStreamExceeded: codes.Code(10010),
// connection error
errHTTP2Connection: codes.Code(11000),
errEstablishConnection: codes.Code(11001),
errHandleGoAway: codes.Code(11002),
errKeepAlive: codes.Code(11003),
errOperateHeaders: codes.Code(11004),
errNoActiveStream: codes.Code(11005),
errControlBufFinished: codes.Code(11006),
errNotReachable: codes.Code(11007),
errConnectionIsClosing: codes.Code(11008),
}

var crossStreamErrMap = map[error]bool{
kerrors.ErrStreamTimeout: true,
kerrors.ErrBizCanceled: true,
kerrors.ErrGracefulShutdown: true,
kerrors.ErrHandlerReturn: true,
kerrors.ErrStreamingCanceled: true,
errRecvUpstreamRstStream: true,
}

var customRstCodeEnabled = false

// SetCustomRstCodeEnabled enables/disables the ErrType and custom RstCode mapping.
// it is off by default.
func SetCustomRstCodeEnabled(flag bool) {
customRstCodeEnabled = flag
}

func isCustomRstCodeEnabled() bool {
return customRstCodeEnabled
}

func getMappingErrAndStatusCode(ctx context.Context, rstCode http2.ErrCode, s side) (error, codes.Code) {
mappingErr := getMappingErr(rstCode, s)
stCode := retrieveStatusCode(ctx, rstCode, mappingErr)
return mappingErr, stCode
}

func getMappingErr(rstCode http2.ErrCode, s side) error {
err, ok := rstCode2ErrMap[rstCode]
if !isCustomRstCodeEnabled() || !ok {
if s == clientSide {
return errRecvDownStreamRstStream
}
return errRecvUpstreamRstStream
}
return err
}

func retrieveStatusCode(ctx context.Context, errCode http2.ErrCode, mappingErr error) codes.Code {
stCode, ok := http2ErrConvTab[errCode]
if !ok {
if !isCustomRstCodeEnabled() {
klog.Warnf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received nhttp2 error %v", errCode)
stCode = codes.Unknown
} else {
stCode = codes.Canceled
if errors.Is(mappingErr, kerrors.ErrGracefulShutdown) {
stCode = codes.Unavailable
}
if errors.Is(mappingErr, kerrors.ErrStreamTimeout) {
stCode = codes.DeadlineExceeded
}
}
}
if stCode == codes.Canceled {
if d, ok := ctx.Deadline(); ok && !d.After(time.Now()) {
// Our deadline was already exceeded, and that was likely the cause
// of this cancelation. Alter the status code accordingly.
stCode = codes.DeadlineExceeded
}
}
return stCode
}

func getRstCode(err error) (rstCode http2.ErrCode) {
if err == nil || err == io.EOF {
return http2.ErrCodeNo
}
rstCode = http2.ErrCodeCancel
statusErr, ok := err.(*istatus.Error)
if !ok {
return
}
mappingErr := istatus.GetMappingErr(statusErr)
if mappingErr == nil {
return
}
code, ok := err2RstCodeMap[mappingErr]
if !ok {
return
}
return code
}

var customStatusCodeEnabled = false

// SetCustomStatusCodeEnabled enables/disables the ErrType and custom Status code mapping.
// it is off by default.
func SetCustomStatusCodeEnabled(flag bool) {
customStatusCodeEnabled = flag
}

func isCustomStatusCodeEnabled() bool {
return customStatusCodeEnabled
}

func getStatusCode(def codes.Code, mappingErr error) codes.Code {
if !isCustomStatusCodeEnabled() {
return def
}
code, ok := err2StatusCodeMap[mappingErr]
if ok {
return code
}
return def
}
3 changes: 2 additions & 1 deletion pkg/remote/trans/nphttp2/grpc/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ var errs = []error{
errClosedWithoutTrailer,
errMiddleHeader,
errDecodeHeader,
errRecvRstStream,
errRecvDownStreamRstStream,
errRecvUpstreamRstStream,
errStreamDrain,
errStreamFlowControl,
errIllegalHeaderWrite,
Expand Down
2 changes: 1 addition & 1 deletion pkg/remote/trans/nphttp2/grpc/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
atomic.StoreUint32(&s.unprocessed, 1)
}

mappingErr, stCode := getMappingErrAndStatusCode(s.ctx, f.ErrCode)
mappingErr, stCode := getMappingErrAndStatusCode(s.ctx, f.ErrCode, clientSide)

st := newStatusf(stCode, mappingErr, "stream terminated by RST_STREAM with error code: %v", f.ErrCode)
t.closeStream(s, io.EOF, false, http2.ErrCodeNo, st, nil, false)
Expand Down
2 changes: 1 addition & 1 deletion pkg/remote/trans/nphttp2/grpc/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ func (t *http2Server) handleData(f *grpcframe.DataFrame) {
func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
// If the stream is not deleted from the transport's active streams map, then do a regular close stream.
if s, ok := t.getStream(f); ok {
mappingErr, stCode := getMappingErrAndStatusCode(s.ctx, f.ErrCode)
mappingErr, stCode := getMappingErrAndStatusCode(s.ctx, f.ErrCode, serverSide)
stErr := newStatusf(stCode, mappingErr, "transport: RSTStream Frame received with error code: %d [triggered by %s]", f.ErrCode, s.sourceService).Err()
klog.CtxInfof(s.ctx, "transport: http2Server.handleRSTStream received RSTStream Frame with error code: %v", f.ErrCode)
t.closeStream(s, stErr, false, 0, false)
Expand Down
Loading

0 comments on commit e153228

Please sign in to comment.