Skip to content

Commit

Permalink
rebase and resolve conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
DMwangnima committed Nov 22, 2024
1 parent 1f6b631 commit a9dee0c
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 15 deletions.
3 changes: 2 additions & 1 deletion pkg/remote/trans/nphttp2/grpc/controlbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"golang.org/x/net/http2/hpack"

"github.com/cloudwego/kitex/internal/utils/safemcache"
"github.com/cloudwego/kitex/pkg/kerrors"
"github.com/cloudwego/kitex/pkg/klog"
)

Expand Down Expand Up @@ -569,7 +570,7 @@ func (l *loopyWriter) run(remoteAddr string) (err error) {
err = nil
}
// make sure the Graceful Shutdown behaviour triggered
if errors.Is(err, errGracefulShutdown) {
if errors.Is(err, kerrors.ErrGracefulShutdown) {
l.framer.writer.Flush()
}
}()
Expand Down
2 changes: 1 addition & 1 deletion pkg/remote/trans/nphttp2/grpc/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func Test_getMappingErrAndStatusCode(t *testing.T) {
}{
{
err: kerrors.ErrGracefulShutdown,
stCode: codes.Canceled,
stCode: codes.Unavailable,
},
{
err: kerrors.ErrBizCanceled,
Expand Down
8 changes: 7 additions & 1 deletion pkg/remote/trans/nphttp2/grpc/graceful_shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@ import (
"testing"

"github.com/cloudwego/kitex/internal/test"
"github.com/cloudwego/kitex/pkg/kerrors"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/codes"
)

func TestGracefulShutdown(t *testing.T) {
SetCustomRstCodeEnabled(true)
defer func() {
SetCustomRstCodeEnabled(false)
}()
onGoAwayCh := make(chan struct{})
srv, cli := setUpWithOnGoAway(t, 10000, &ServerConfig{MaxStreams: math.MaxUint32}, gracefulShutdown, ConnectOptions{}, func(reason GoAwayReason) {
close(onGoAwayCh)
Expand Down Expand Up @@ -55,6 +60,7 @@ func TestGracefulShutdown(t *testing.T) {
_, err = stream.Read(msg)
test.Assert(t, err != nil, err)
st := stream.Status()
test.Assert(t, strings.Contains(st.Message(), gracefulShutdownMsg), st.Message())
test.Assert(t, errors.Is(st.Err(), kerrors.ErrGracefulShutdown), st)
test.Assert(t, strings.Contains(st.Err().Error(), gracefulShutdownMsg), st.Message())
test.Assert(t, st.Code() == codes.Unavailable, st.Code())
}
12 changes: 4 additions & 8 deletions pkg/remote/trans/nphttp2/grpc/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package grpc

import (
"context"
"errors"
"io"
"math"
"net"
Expand Down Expand Up @@ -542,11 +541,7 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
)
if err != nil {
rst = true
if errors.Is(err, errGracefulShutdown) {
rstCode = gracefulShutdownCode
} else {
rstCode = http2.ErrCodeCancel
}
rstCode = http2.ErrCodeCancel
klog.CtxInfof(s.ctx, "KITEX: stream closed by ctx canceled, err: %v"+sendRSTStreamFrameSuffix, err)
}
t.closeStream(s, err, rst, rstCode, istatus.Convert(err), nil, false)
Expand Down Expand Up @@ -835,7 +830,7 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {

mappingErr, stCode := getMappingErrAndStatusCode(s.ctx, f.ErrCode, clientSide)

st := newStatusf(stCode, mappingErr, "stream terminated by RST_STREAM with error code: %v", f.ErrCode)
st := newStatusf(stCode, mappingErr, "stream terminated by RST_STREAM with error code: %d", f.ErrCode)
t.closeStream(s, io.EOF, false, http2.ErrCodeNo, st, nil, false)
}

Expand Down Expand Up @@ -978,7 +973,8 @@ func (t *http2Client) handleGoAway(f *grpcframe.GoAwayFrame) {
// Pls refer to checkForStreamQuota in NewStream, it gets the controlbuf.mu and
// wants to get the t.mu.
for _, stream := range unprocessedStream {
t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
st := newStatus(codes.Unavailable, errStreamDrain, "the stream is rejected because server is draining the connection")
t.closeStream(stream, st.Err(), false, http2.ErrCodeNo, st, nil, false)
}
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/remote/trans/nphttp2/grpc/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ var (
Err()
errStatusHandlerReturn = newStatus(codes.Canceled, kerrors.ErrServerStreamFinished, "transport: handler return"+triggeredByHandlerSideSuffix).
Err()
errStatusGracefulShutdown = newStatus(codes.Canceled, kerrors.ErrGracefulShutdown, gracefulShutdownMsg).
Err()
)

func init() {
Expand Down Expand Up @@ -1011,7 +1013,7 @@ func (t *http2Server) Close() error {
t.activeStreams = nil
t.mu.Unlock()

finishErr := errGracefulShutdown
finishErr := errStatusGracefulShutdown
finishCh := make(chan struct{}, 1)
activeNums := t.rstActiveStreams(streams, finishErr, gracefulShutdownCode, finishCh)
if activeNums == 0 {
Expand Down
7 changes: 4 additions & 3 deletions pkg/remote/trans/nphttp2/grpc/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,10 @@ func (h *testStreamHandler) gracefulShutdown(t *testing.T, s *Stream) {
_, err = s.Read(msg)
test.Assert(t, err != nil, err)
test.Assert(t, strings.Contains(err.Error(), gracefulShutdownMsg), err)
st, ok := status.FromError(err)
test.Assert(t, ok, err)
test.Assert(t, st.Code() == codes.Unavailable, st)
test.Assert(t, errors.Is(err, kerrors.ErrGracefulShutdown), err)
//st, ok := status.FromError(err)
//test.Assert(t, ok, err)
//test.Assert(t, st.Code() == codes.Unavailable, st)
}

// start starts server. Other goroutines should block on s.readyChan for further operations.
Expand Down

0 comments on commit a9dee0c

Please sign in to comment.