Skip to content

Commit

Permalink
Merge pull request #1304 from cloudwego/release-v0.9.1
Browse files Browse the repository at this point in the history
chore: release v0.9.1
  • Loading branch information
ppzqh authored Mar 18, 2024
2 parents 92f0cb7 + ff5ca14 commit 8526b3a
Show file tree
Hide file tree
Showing 43 changed files with 2,087 additions and 335 deletions.
14 changes: 9 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ Kitex is designed to be extensible to support multiple RPC messaging protocols.

- **Multi-transport Protocol**

For service governance, Kitex supports **TTHeader** and **HTTP2**. TTHeader can be used in conjunction with Thrift and Kitex Protobuf; HTTP2 is currently mainly used with the gRPC protocol, and it will support Thrift in the future.
For service governance, Kitex supports **TTHeader** and **HTTP2**. TTHeader can be used in conjunction with Thrift and Kitex Protobuf.

- **Multi-message Type**

Kitex supports **PingPong**, **One-way**, and **Bidirectional Streaming**. Among them, One-way currently only supports Thrift protocol, two-way Streaming only supports gRPC, and Kitex will support Thrift's two-way Streaming in the future.
Kitex supports **PingPong**, **One-way**, and **Bidirectional Streaming**. Among them, One-way currently only supports Thrift protocol.

- **Service Governance**

Expand Down Expand Up @@ -73,7 +73,10 @@ Kitex has built-in code generation tools that support generating **Thrift**, **P
- **Reference**

- For Transport Protocol, Exception Instruction and Version Specification, please refer to [doc](https://www.cloudwego.io/docs/kitex/reference/).


- **Best Practice**
- Kitex best practices in production, such as graceful shutdown, error handling, integration testing. [More](https://www.cloudwego.io/docs/kitex/best-practice/)

- **FAQ**

- Please refer to [FAQ](https://www.cloudwego.io/docs/kitex/faq/).
Expand All @@ -92,6 +95,7 @@ We provide the [kitex-benchmark](https://github.com/cloudwego/kitex-benchmark) p
- [biz-demo](https://github.com/cloudwego/biz-demo): Business demos using Kitex.

## Blogs
- [Enhancing Performance in Microservice Architecture with Kitex](https://www.cloudwego.io/blog/2024/01/29/enhancing-performance-in-microservice-architecture-with-kitex/)
- [CloudWeGo: A leading practice for building enterprise cloud native middleware!](https://www.cloudwego.io/blog/2023/06/15/cloudwego-a-leading-practice-for-building-enterprise-cloud-native-middleware/)
- [Kitex: Unifying Open Source Practice for a High-Performance RPC Framework](https://www.cloudwego.io/blog/2022/09/30/kitex-unifying-open-source-practice-for-a-high-performance-rpc-framework/)
- [Performance Optimization on Kitex](https://www.cloudwego.io/blog/2021/09/23/performance-optimization-on-kitex/)
Expand All @@ -100,7 +104,7 @@ We provide the [kitex-benchmark](https://github.com/cloudwego/kitex-benchmark) p

## Contributing

[Contributing](https://github.com/cloudwego/kitex/blob/develop/CONTRIBUTING.md).
Contributor guide: [Contributing](https://github.com/cloudwego/kitex/blob/develop/CONTRIBUTING.md).

## License

Expand All @@ -118,7 +122,7 @@ Kitex is distributed under the [Apache License, version 2.0](https://github.com/
## Landscapes

<p align="center">
<img src="https://landscape.cncf.io/images/left-logo.svg" width="150"/>&nbsp;&nbsp;<img src="https://landscape.cncf.io/images/right-logo.svg" width="200"/>
<img src="https://landscape.cncf.io/images/cncf-landscape-horizontal-color.svg" width="150"/>&nbsp;&nbsp;<img src="https://www.cncf.io/wp-content/uploads/2023/04/cncf-main-site-logo.svg" width="200"/>
<br/><br/>
CloudWeGo enriches the <a href="https://landscape.cncf.io/">CNCF CLOUD NATIVE Landscape</a>.
</p>
14 changes: 9 additions & 5 deletions README_cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ Kitex[kaɪt'eks] 字节跳动内部的 Golang 微服务 RPC 框架,具有**高

- **多传输协议**

传输协议封装消息协议进行 RPC 互通,传输协议可以额外透传元信息,用于服务治理,Kitex 支持的传输协议有 **TTHeader****HTTP2**。TTHeader 可以和 Thrift、Kitex Protobuf 结合使用;HTTP2 目前主要是结合 gRPC 协议使用,后续也会支持 Thrift
传输协议封装消息协议进行 RPC 互通,传输协议可以额外透传元信息,用于服务治理,Kitex 支持的传输协议有 **TTHeader****HTTP2**。TTHeader 可以和 Thrift、Kitex Protobuf 结合使用。

- **多种消息类型**

支持 **PingPong****Oneway****双向 Streaming**。其中 Oneway 目前只对 Thrift 协议支持,双向 Streaming 只对 gRPC 支持,后续会考虑支持 Thrift 的双向 Streaming
支持 **PingPong****Oneway****双向 Streaming**。其中 Oneway 目前只对 Thrift 协议支持。

- **服务治理**

Expand Down Expand Up @@ -74,6 +74,9 @@ Kitex[kaɪt'eks] 字节跳动内部的 Golang 微服务 RPC 框架,具有**高

- 关于应用层传输协议 TTHeader、异常说明与版本管理,请参考[文档](https://www.cloudwego.io/zh/docs/kitex/reference/)

- **最佳实践**
- Kitex 在生产环境下的最佳实践,如优雅停机、错误处理、集成测试,详见:[文档](https://www.cloudwego.io/zh/docs/kitex/best-practice/)

- **FAQ**
- 请参考 [FAQ](https://www.cloudwego.io/zh/docs/kitex/faq/)
## 框架性能
Expand All @@ -91,14 +94,15 @@ Kitex[kaɪt'eks] 字节跳动内部的 Golang 微服务 RPC 框架,具有**高

## 相关文章

- [Kitex 两周年回顾 — 能力升级、社区合作与未来展望](https://www.cloudwego.io/zh/blog/2023/11/30/kitex-%E4%B8%A4%E5%91%A8%E5%B9%B4%E5%9B%9E%E9%A1%BE-%E8%83%BD%E5%8A%9B%E5%8D%87%E7%BA%A7%E7%A4%BE%E5%8C%BA%E5%90%88%E4%BD%9C%E4%B8%8E%E6%9C%AA%E6%9D%A5%E5%B1%95%E6%9C%9B/)
- [高性能 RPC 框架 CloudWeGo-Kitex 内外统一的开源实践](https://www.cloudwego.io/zh/blog/2022/09/20/%E9%AB%98%E6%80%A7%E8%83%BD-rpc-%E6%A1%86%E6%9E%B6-cloudwego-kitex-%E5%86%85%E5%A4%96%E7%BB%9F%E4%B8%80%E7%9A%84%E5%BC%80%E6%BA%90%E5%AE%9E%E8%B7%B5/)
- [字节跳动 Go RPC 框架 Kitex 性能优化实践](https://www.cloudwego.io/zh/blog/2021/09/23/%E5%AD%97%E8%8A%82%E8%B7%B3%E5%8A%A8-go-rpc-%E6%A1%86%E6%9E%B6-kitex-%E6%80%A7%E8%83%BD%E4%BC%98%E5%8C%96%E5%AE%9E%E8%B7%B5/)
- [字节跳动在 Go 网络库上的实践](https://www.cloudwego.io/zh/blog/2021/10/09/%E5%AD%97%E8%8A%82%E8%B7%B3%E5%8A%A8%E5%9C%A8-go-%E7%BD%91%E7%BB%9C%E5%BA%93%E4%B8%8A%E7%9A%84%E5%AE%9E%E8%B7%B5/)
- [RPC 框架 Kitex 实践入门:性能测试指南](https://www.cloudwego.io/zh/blog/2021/11/24/rpc-%E6%A1%86%E6%9E%B6-kitex-%E5%AE%9E%E8%B7%B5%E5%85%A5%E9%97%A8%E6%80%A7%E8%83%BD%E6%B5%8B%E8%AF%95%E6%8C%87%E5%8D%97/)

## 贡献代码

[Contributing](CONTRIBUTING.md)
贡献者指南:[Contributing](CONTRIBUTING.md)

## 开源许可

Expand All @@ -109,14 +113,14 @@ Kitex 基于[Apache License 2.0](LICENSE) 许可证,其依赖的三方组件
- 如何成为 member: [COMMUNITY MEMBERSHIP](https://github.com/cloudwego/community/blob/main/COMMUNITY_MEMBERSHIP.md)
- Issues: [Issues](https://github.com/cloudwego/kitex/issues)
- Slack: 加入我们的 [Slack 频道](https://join.slack.com/t/cloudwego/shared_invite/zt-tmcbzewn-UjXMF3ZQsPhl7W3tEDZboA)
- 飞书用户群([注册飞书](https://www.feishu.cn/)进群
- 飞书用户群([注册飞书](https://www.feishu.cn/)后扫码进群

![LarkGroup](images/lark_group_cn.png)

## Landscapes

<p align="center">
<img src="https://landscape.cncf.io/images/left-logo.svg" width="150"/>&nbsp;&nbsp;<img src="https://landscape.cncf.io/images/right-logo.svg" width="200"/>
<img src="https://landscape.cncf.io/images/cncf-landscape-horizontal-color.svg" width="150"/>&nbsp;&nbsp;<img src="https://www.cncf.io/wp-content/uploads/2023/04/cncf-main-site-logo.svg" width="200"/>
<br/><br/>
CloudWeGo 丰富了 <a href="https://landscape.cncf.io/">CNCF 云原生生态</a>。
</p>
104 changes: 13 additions & 91 deletions client/service_inline.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,11 @@ package client
import (
"context"
"errors"
"net"
"runtime/debug"

"github.com/bytedance/gopkg/cloud/metainfo"
"unsafe"

"github.com/cloudwego/kitex/client/callopt"
"github.com/cloudwego/kitex/internal/client"
internal_server "github.com/cloudwego/kitex/internal/server"
"github.com/cloudwego/kitex/pkg/consts"
"github.com/cloudwego/kitex/pkg/endpoint"
"github.com/cloudwego/kitex/pkg/klog"
Expand All @@ -35,15 +32,9 @@ import (
"github.com/cloudwego/kitex/pkg/utils"
)

var localAddr net.Addr

func init() {
localAddr = utils.NewNetAddr("tcp", "127.0.0.1")
}

type ContextServiceInlineHandler interface {
WriteMeta(cliCtx, svrCtx context.Context, req interface{}) (newSvrCtx context.Context, err error)
ReadMeta(cliCtx, svrCtx context.Context, resp interface{}) (newCliCtx context.Context, err error)
WriteMeta(cliCtx context.Context, req interface{}) (newCliCtx context.Context, err error)
ReadMeta(cliCtx context.Context, resp interface{}) (err error)
}

type serviceInlineClient struct {
Expand All @@ -57,15 +48,12 @@ type serviceInlineClient struct {

// server info
serverEps endpoint.Endpoint
serverOpt *internal_server.Options

contextServiceInlineHandler ContextServiceInlineHandler
}

type ServerInitialInfo interface {
Endpoints() endpoint.Endpoint
Option() *internal_server.Options
GetServiceInfos() map[string]*serviceinfo.ServiceInfo
BuildServiceInlineInvokeChain() endpoint.Endpoint
}

// NewServiceInlineClient creates a kitex.Client with the given ServiceInfo, it is from generated code.
Expand All @@ -76,10 +64,7 @@ func NewServiceInlineClient(svcInfo *serviceinfo.ServiceInfo, s ServerInitialInf
kc := &serviceInlineClient{}
kc.svcInfo = svcInfo
kc.opt = client.NewOptions(opts)
kc.serverEps = s.Endpoints()
kc.serverOpt = s.Option()
kc.serverOpt.RemoteOpt.TargetSvcInfo = svcInfo
kc.serverOpt.RemoteOpt.SvcSearchMap = s.GetServiceInfos()
kc.serverEps = s.BuildServiceInlineInvokeChain()
if err := kc.init(); err != nil {
_ = kc.Close()
return nil, err
Expand Down Expand Up @@ -176,88 +161,25 @@ func (kc *serviceInlineClient) buildInvokeChain() error {
return nil
}

func (kc *serviceInlineClient) constructServerCtxWithMetadata(cliCtx context.Context) (serverCtx context.Context) {
serverCtx = context.Background()
// metainfo
// forward transmission
kvs := make(map[string]string, 16)
metainfo.SaveMetaInfoToMap(cliCtx, kvs)
if len(kvs) > 0 {
serverCtx = metainfo.SetMetaInfoFromMap(serverCtx, kvs)
}
serverCtx = metainfo.TransferForward(serverCtx)
// reverse transmission, backward mark
serverCtx = metainfo.WithBackwardValuesToSend(serverCtx)
return serverCtx
}

func (kc *serviceInlineClient) constructServerRPCInfo(svrCtx, cliCtx context.Context) (newServerCtx context.Context, svrRPCInfo rpcinfo.RPCInfo) {
rpcStats := rpcinfo.AsMutableRPCStats(rpcinfo.NewRPCStats())
if kc.serverOpt.StatsLevel != nil {
rpcStats.SetLevel(*kc.serverOpt.StatsLevel)
}
// Export read-only views to external users and keep a mapping for internal users.
ri := rpcinfo.NewRPCInfo(
rpcinfo.EmptyEndpointInfo(),
rpcinfo.FromBasicInfo(kc.serverOpt.Svr),
rpcinfo.NewServerInvocation(),
rpcinfo.AsMutableRPCConfig(kc.serverOpt.Configs).Clone().ImmutableView(),
rpcStats.ImmutableView(),
)
rpcinfo.AsMutableEndpointInfo(ri.From()).SetAddress(localAddr)
svrCtx = rpcinfo.NewCtxWithRPCInfo(svrCtx, ri)

cliRpcInfo := rpcinfo.GetRPCInfo(cliCtx)
// handle common rpcinfo
method := cliRpcInfo.To().Method()
if ink, ok := ri.Invocation().(rpcinfo.InvocationSetter); ok {
ink.SetMethodName(method)
ink.SetServiceName(kc.svcInfo.ServiceName)
}
rpcinfo.AsMutableEndpointInfo(ri.To()).SetMethod(method)
svrCtx = context.WithValue(svrCtx, consts.CtxKeyMethod, method)
return svrCtx, ri
}

func (kc *serviceInlineClient) invokeHandleEndpoint() (endpoint.Endpoint, error) {
svrTraceCtl := kc.serverOpt.TracerCtl
if svrTraceCtl == nil {
svrTraceCtl = &rpcinfo.TraceController{}
}

return func(ctx context.Context, req, resp interface{}) (err error) {
serverCtx := kc.constructServerCtxWithMetadata(ctx)
defer func() {
// backward key
kvs := metainfo.AllBackwardValuesToSend(serverCtx)
if len(kvs) > 0 {
metainfo.SetBackwardValuesFromMap(ctx, kvs)
}
}()
serverCtx, svrRPCInfo := kc.constructServerRPCInfo(serverCtx, ctx)
defer func() {
rpcinfo.PutRPCInfo(svrRPCInfo)
}()

// server trace
serverCtx = svrTraceCtl.DoStart(serverCtx, svrRPCInfo)

cliRpcInfo := rpcinfo.GetRPCInfo(ctx)
if v, ok := cliRpcInfo.Invocation().(rpcinfo.InvocationSetter); ok {
v.SetExtra(consts.SERVICE_INLINE_SERVICE_NAME, kc.svcInfo.ServiceName)
}
ctx = context.WithValue(ctx, consts.SERVICE_INLINE_RPCINFO_KEY, unsafe.Pointer(&cliRpcInfo))
if kc.contextServiceInlineHandler != nil {
serverCtx, err = kc.contextServiceInlineHandler.WriteMeta(ctx, serverCtx, req)
ctx, err = kc.contextServiceInlineHandler.WriteMeta(ctx, req)
if err != nil {
return err
}
}

// server logic
err = kc.serverEps(serverCtx, req, resp)
// finish server trace
// contextServiceInlineHandler may convert nil err to non nil err, so handle trace here
svrTraceCtl.DoFinish(serverCtx, svrRPCInfo, err)
err = kc.serverEps(ctx, req, resp)

if kc.contextServiceInlineHandler != nil {
var err1 error
ctx, err1 = kc.contextServiceInlineHandler.ReadMeta(ctx, serverCtx, resp)
err1 := kc.contextServiceInlineHandler.ReadMeta(ctx, resp)
if err1 != nil {
return err1
}
Expand Down
38 changes: 1 addition & 37 deletions client/service_inline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,18 @@ import (
"github.com/cloudwego/kitex/client/callopt"
"github.com/cloudwego/kitex/internal/client"
"github.com/cloudwego/kitex/internal/mocks"
internal_server "github.com/cloudwego/kitex/internal/server"
"github.com/cloudwego/kitex/internal/test"
"github.com/cloudwego/kitex/pkg/consts"
"github.com/cloudwego/kitex/pkg/endpoint"
"github.com/cloudwego/kitex/pkg/kerrors"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/pkg/rpcinfo/remoteinfo"
"github.com/cloudwego/kitex/pkg/serviceinfo"
)

type serverInitialInfoImpl struct {
EndpointsFunc func(ctx context.Context, req, resp interface{}) (err error)
}

func (s serverInitialInfoImpl) Endpoints() endpoint.Endpoint {
func (s serverInitialInfoImpl) BuildServiceInlineInvokeChain() endpoint.Endpoint {
if s.EndpointsFunc != nil {
return s.EndpointsFunc
}
Expand All @@ -53,14 +50,6 @@ func (s serverInitialInfoImpl) Endpoints() endpoint.Endpoint {
}
}

func (s serverInitialInfoImpl) Option() *internal_server.Options {
return internal_server.NewOptions(nil)
}

func (s serverInitialInfoImpl) GetServiceInfos() map[string]*serviceinfo.ServiceInfo {
return nil
}

func newMockServerInitialInfo() ServerInitialInfo {
return &serverInitialInfoImpl{}
}
Expand Down Expand Up @@ -347,28 +336,3 @@ func TestServiceInlineClientFinalizer(t *testing.T) {
t.Logf("After second GC, allocation: %f Mb, Number of allocation: %d\n", secondGCHeapAlloc, secondGCHeapObjects)
test.Assert(t, secondGCHeapAlloc < firstGCHeapAlloc/2 && secondGCHeapObjects < firstGCHeapObjects/2)
}

func TestServiceInlineMethodKeyCall(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mtd := mocks.MockMethod
opts := []Option{
WithTransHandlerFactory(newMockCliTransHandlerFactory(ctrl)),
WithResolver(resolver404(ctrl)),
WithDialer(newDialer(ctrl)),
WithDestService("destService"),
}
svcInfo := mocks.ServiceInfo()
s := serverInitialInfoImpl{}
s.EndpointsFunc = func(ctx context.Context, req, resp interface{}) (err error) {
test.Assert(t, ctx.Value(consts.CtxKeyMethod) == mtd)
return nil
}
cli, err := NewServiceInlineClient(svcInfo, s, opts...)
test.Assert(t, err == nil)
ctx := context.Background()
req := new(MockTStruct)
res := new(MockTStruct)
err = cli.Call(ctx, mtd, req, res)
test.Assert(t, err == nil, err)
}
18 changes: 17 additions & 1 deletion client/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"io"
"sync/atomic"

"github.com/cloudwego/kitex/pkg/kerrors"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/metadata"
"github.com/cloudwego/kitex/pkg/serviceinfo"

Expand Down Expand Up @@ -164,6 +165,8 @@ func (s *stream) Context() context.Context {
func (s *stream) RecvMsg(m interface{}) (err error) {
err = s.recvEndpoint(s.stream, m)
if err == nil {
// BizStatusErr is returned by the server handle, meaning the stream is ended;
// And it should be returned to the calling business code for error handling
err = s.ri.Invocation().BizStatusErr()
}
if err != nil || s.streamingMode == serviceinfo.StreamingClient {
Expand Down Expand Up @@ -193,10 +196,23 @@ func (s *stream) DoFinish(err error) {
// already called
return
}
if err == io.EOF {
if !isRPCError(err) {
// only rpc errors are reported
err = nil
}
ctx := s.Context()
ri := rpcinfo.GetRPCInfo(ctx)
s.kc.opt.TracerCtl.DoFinish(ctx, ri, err)
}

func isRPCError(err error) bool {
if err == nil {
return false
}
if err == io.EOF {
return false
}
_, isBizStatusError := err.(kerrors.BizStatusErrorIface)
// if a tracer needs to get the BizStatusError, it should read from rpcinfo.invocation.bizStatusErr
return !isBizStatusError
}
Loading

0 comments on commit 8526b3a

Please sign in to comment.