Skip to content

Commit

Permalink
fix: make registry unit test works (#498)
Browse files Browse the repository at this point in the history
  • Loading branch information
sysulq committed Oct 21, 2022
1 parent 6e00bca commit 388bc3c
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 39 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.13.0
github.com/robfig/cron/v3 v3.0.1
github.com/samber/lo v1.33.0
github.com/smallnest/weighted v0.0.0-20200122032019-adf21c9b8bd1
github.com/smartystreets/goconvey v1.7.2
github.com/spf13/cast v1.5.0
Expand Down Expand Up @@ -139,7 +140,7 @@ require (
go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 // indirect
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 // indirect
Expand Down
6 changes: 5 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,8 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/samber/lo v1.33.0 h1:2aKucr+rQV6gHpY3bpeZu69uYoQOzVhGT3J22Op6Cjk=
github.com/samber/lo v1.33.0/go.mod h1:HLeWcJRRyLKp3+/XBJvOrerCQn9mhdKMHyd7IRlgeQ8=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
Expand Down Expand Up @@ -596,6 +598,7 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/thoas/go-funk v0.9.1 h1:O549iLZqPpTUQ10ykd26sZhzD+rmR5pWhuElrhbC20M=
github.com/tidwall/gjson v1.13.0 h1:3TFY9yxOQShrvmjdM76K+jc66zJeT6D3/VFFYCGQf7M=
github.com/tidwall/gjson v1.13.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
Expand Down Expand Up @@ -714,8 +717,9 @@ golang.org/x/exp v0.0.0-20191129062945-2f5052295587/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 h1:QE6XYQK6naiK1EPAe1g/ILLxN5RBoH5xkJk3CqlMI/Y=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand Down
41 changes: 23 additions & 18 deletions pkg/registry/etcdv3/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ const (
// defaultRetryTimes default retry times
defaultRetryTimes = 3
// defaultKeepAliveTimeout is the default timeout for keepalive requests.
defaultKeepaliveTimeout = 5 * time.Second
defaultRegisterTimeout = 5 * time.Second
// servicePrefix is the prefix of service key
servicePrefix = "%s:%s:%s:%s/"
// registerService is servicePrefix+host:port
Expand Down Expand Up @@ -110,12 +110,15 @@ func (reg *etcdv3Registry) ListServices(ctx context.Context, name string, scheme
}

for _, kv := range getResp.Kvs {
var service server.ServiceInfo
var service Update
if err := json.Unmarshal(kv.Value, &service); err != nil {
reg.logger.Warn("invalid service", xlog.FieldErr(err))
continue
}
services = append(services, &service)

services = append(services, &server.ServiceInfo{
Address: service.Addr,
})
}

return
Expand Down Expand Up @@ -234,7 +237,7 @@ func (reg *etcdv3Registry) registerKV(ctx context.Context, key, val string) erro
// opOptions = append(opOptions, clientv3.WithSerializable())
if ttl := reg.Config.ServiceTTL.Seconds(); ttl > 0 {
// 这里基于应用名为key做缓存,每个服务实例应该只需要创建一个lease,降低etcd的压力
lease, err := reg.getLeaseID(ctx, int64(reg.ServiceTTL.Seconds()))
lease, err := reg.getLeaseID(ctx)
if err != nil {
reg.logger.Error("getSession", xlog.FieldErrKind(ecode.ErrKindRegisterErr), xlog.FieldErr(err),
xlog.FieldKeyAny(key), xlog.FieldValueAny(val))
Expand All @@ -259,15 +262,15 @@ func (reg *etcdv3Registry) registerKV(ctx context.Context, key, val string) erro
return nil
}

func (reg *etcdv3Registry) getLeaseID(ctx context.Context, ttl int64) (clientv3.LeaseID, error) {
func (reg *etcdv3Registry) getLeaseID(ctx context.Context) (clientv3.LeaseID, error) {
reg.rmu.Lock()
defer reg.rmu.Unlock()

if reg.leaseID != 0 {
return reg.leaseID, nil
}

grant, err := reg.client.Grant(ctx, ttl)
grant, err := reg.client.Grant(ctx, int64(reg.ServiceTTL.Seconds()))
if err != nil {
reg.logger.Error("reg.client.Grant failed", xlog.FieldErrKind(ecode.ErrKindRegisterErr), xlog.FieldErr(err))
return 0, err
Expand Down Expand Up @@ -301,26 +304,17 @@ func (reg *etcdv3Registry) doKeepalive(ctx context.Context) {
// do register again, and retry 3 times
err := reg.registerAllKvs(cancelCtx)
if err != nil {
cancel()
return
}

// try do keepalive again
// when error or timeout happens, just exit the goroutine
kac, err = reg.client.KeepAlive(cancelCtx, reg.leaseID)
if err != nil {
reg.logger.Error("reg.client.KeepAlive failed", xlog.FieldErrKind(ecode.ErrKindRegisterErr), xlog.FieldErr(err))
return
}

reg.logger.Debug("reg.client.KeepAlive finished", xlog.String("leaseid", fmt.Sprintf("%x", reg.leaseID)))

done <- struct{}{}
}()

// wait keepalive success
select {
case <-time.After(defaultKeepaliveTimeout):
// when timeout or error happens
case <-time.After(defaultRegisterTimeout):
// when timeout happens
// we should cancel the context and retry again
cancel()
// mark leaseID as 0 to retry register
Expand All @@ -331,6 +325,17 @@ func (reg *etcdv3Registry) doKeepalive(ctx context.Context) {
// when done happens, we just receive the kac channel
// or wait the registry context done
}

// try do keepalive again
// when error or timeout happens, just continue
kac, err = reg.client.KeepAlive(ctx, reg.leaseID)
if err != nil {
reg.logger.Error("reg.client.KeepAlive failed", xlog.FieldErrKind(ecode.ErrKindRegisterErr), xlog.FieldErr(err))

continue
}

reg.logger.Debug("reg.client.KeepAlive finished", xlog.String("leaseid", fmt.Sprintf("%x", reg.leaseID)))
}

select {
Expand Down
83 changes: 64 additions & 19 deletions pkg/registry/etcdv3/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,21 @@ package etcdv3
import (
"context"
"fmt"
"log"
"testing"
"time"

"github.com/samber/lo"
"github.com/douyu/jupiter/pkg/client/etcdv3"
"github.com/douyu/jupiter/pkg/core/constant"
"github.com/douyu/jupiter/pkg/registry"
"github.com/douyu/jupiter/pkg/server"
"github.com/douyu/jupiter/pkg/xlog"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/client/v3/mock/mockserver"
)

func startMockServer() {
ms, err := mockserver.StartMockServers(1)
if err != nil {
log.Fatal(err)
}

if err := ms.StartAt(0); err != nil {
log.Fatal(err)
}
}

func TestMain(m *testing.M) {
go startMockServer()
}

func Test_etcdv3Registry(t *testing.T) {
etcdConfig := etcdv3.DefaultConfig()
etcdConfig.Endpoints = []string{"localhost:0"}
etcdConfig.Endpoints = []string{"localhost:2379"}
registry, err := newETCDRegistry(&Config{
Config: etcdConfig,
ReadTimeout: time.Second * 10,
Expand Down Expand Up @@ -112,7 +96,7 @@ func Test_etcdv3Registry(t *testing.T) {

func Test_etcdv3registry_UpdateAddressList(t *testing.T) {
etcdConfig := etcdv3.DefaultConfig()
etcdConfig.Endpoints = []string{"localhost:0"}
etcdConfig.Endpoints = []string{"localhost:2379"}
reg, err := newETCDRegistry(&Config{
Config: etcdConfig,
ReadTimeout: time.Second * 10,
Expand Down Expand Up @@ -151,3 +135,64 @@ func Test_etcdv3registry_UpdateAddressList(t *testing.T) {
_ = reg.Close()
time.Sleep(time.Second * 1)
}

func TestKeepalive(t *testing.T) {
etcdConfig := etcdv3.DefaultConfig()
etcdConfig.Endpoints = []string{"localhost:2379"}
reg, err := newETCDRegistry(&Config{
Config: etcdConfig,
ReadTimeout: time.Second * 10,
Prefix: "jupiter",
logger: xlog.Jupiter(),
ServiceTTL: time.Second,
})
assert.Nil(t, err)
assert.Nil(t, reg.RegisterService(context.Background(), &server.ServiceInfo{
Name: "service_2",
AppID: "",
Scheme: "grpc",
Address: "10.10.10.1:9091",
Weight: 0,
Enable: true,
Healthy: true,
Metadata: map[string]string{},
Region: "default",
Zone: "default",
Kind: constant.ServiceProvider,
Deployment: "default",
Group: "",
}))
assert.Nil(t, reg.RegisterService(context.Background(), &server.ServiceInfo{
Name: "service_2",
AppID: "",
Scheme: "grpc",
Address: "10.10.10.1:9092",
Weight: 0,
Enable: true,
Healthy: true,
Metadata: map[string]string{},
Region: "default",
Zone: "default",
Kind: constant.ServiceProvider,
Deployment: "default",
Group: "",
}))

lease := reg.leaseID
reg.client.Revoke(reg.ctx, lo.Must(reg.getLeaseID(reg.ctx)))

time.Sleep(1 * time.Second)
assert.NotZero(t,lo.Must(reg.getLeaseID(reg.ctx)))
assert.True(t, lease != lo.Must(reg.getLeaseID(reg.ctx)))

ttl, err := reg.client.TimeToLive(reg.ctx, lease)
assert.Nil(t, err)
assert.Equal(t, int64(-1), ttl.TTL)

ttl, err = reg.client.TimeToLive(reg.ctx, lo.Must(reg.getLeaseID(reg.ctx)))
assert.Nil(t, err)
assert.Equal(t, int64(1), ttl.TTL)

_ = reg.Close()
time.Sleep(time.Second * 1)
}

0 comments on commit 388bc3c

Please sign in to comment.