Skip to content

Commit

Permalink
fix: add grpc test for async connect (#544)
Browse files Browse the repository at this point in the history
  • Loading branch information
sysulq authored Nov 3, 2022
1 parent 43d88e5 commit 6eae1c1
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 14 deletions.
46 changes: 37 additions & 9 deletions pkg/client/grpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package grpc

import (
"context"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -43,12 +44,39 @@ func TestConfigBlockTrue(t *testing.T) {
})
}

// func TestConfigBlockFalse(t *testing.T) {
// t.Run("test no address and no block", func(t *testing.T) {
// cfg := DefaultConfig()
// cfg.Level = "panic"
// cfg.Block = false
// conn := newGRPCClient(cfg)
// assert.Equal(t, conn.GetState().String(), "CONNECTING")
// })
// }
func TestAsyncConnect(t *testing.T) {
t.Run("test async connect", func(t *testing.T) {
cfg := DefaultConfig()
cfg.Addr = "127.0.0.1:9530"
conn := cfg.Build()

ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()

res, err := testproto.NewGreeterClient(conn).SayHello(ctx, &testproto.HelloRequest{
Name: "hello",
})
assert.NotNil(t, err)
assert.Nil(t, res)

go func() {
startServer("127.0.0.1:9530", "test-async-server")
}()

assert.Eventually(t, func() bool {

ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()

fmt.Println(conn.GetState())
res, err := testproto.NewGreeterClient(conn).SayHello(ctx, &testproto.HelloRequest{
Name: "hello",
})
fmt.Println(err, res)
return err == nil && res != nil
}, 5*time.Second, time.Second)

})
}
2 changes: 1 addition & 1 deletion pkg/client/rocketmq/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func producerDefaultInterceptor(producer *Producer) primitive.Interceptor {
}
}

// 统一 metadata 传递
// 统一 metadata 传递.
func producerMDInterceptor(producer *Producer) primitive.Interceptor {
return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
if md, ok := imeta.FromContext(ctx); ok {
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/xtrace/otelgrpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (config *Config) Build() trace.TracerProvider {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

conn, err := grpc.DialContext(ctx, config.Endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
conn, err := grpc.DialContext(ctx, config.Endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
xlog.Jupiter().Panic("new otelgrpc", xlog.FieldMod("build"), xlog.FieldErr(err))
return nil
Expand Down
4 changes: 1 addition & 3 deletions pkg/registry/etcdv3/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,9 @@ func TestKeepalive(t *testing.T) {
reg.client.Revoke(context.Background(), lease)

assert.Eventually(t, func() bool {
return reg.getLeaseID() != 0
return reg.getLeaseID() != 0 && lease != reg.getLeaseID()
}, 5*time.Second, time.Second)

assert.True(t, lease != reg.getLeaseID())

ttl, err := reg.client.TimeToLive(context.Background(), lease)
assert.Nil(t, err)
assert.Equal(t, int64(-1), ttl.TTL)
Expand Down

0 comments on commit 6eae1c1

Please sign in to comment.