Skip to content

Commit

Permalink
feat: refactor etcd registry protocol (#485)
Browse files Browse the repository at this point in the history
  • Loading branch information
sysulq committed Oct 20, 2022
1 parent 9f83c42 commit 56549fd
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 181 deletions.
17 changes: 0 additions & 17 deletions pkg/ecode/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
package ecode

import (
"encoding/json"
"net/http"
"sync"

"github.com/douyu/jupiter/pkg/governor"
"github.com/douyu/jupiter/pkg/xlog"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
Expand All @@ -38,20 +35,6 @@ var (
OK = add(int(codes.OK), "OK")
)

func init() {
// status code list
governor.HandleFunc("/status/code/list", func(w http.ResponseWriter, r *http.Request) {
var rets = make(map[int]*spbStatus)
_codes.Range(func(key, val interface{}) bool {
code := key.(int)
status := val.(*spbStatus)
rets[code] = status
return true
})
_ = json.NewEncoder(w).Encode(rets)
})
}

// Add ...
func Add(code int, message string) *spbStatus {
if code > maxCustomizeCode {
Expand Down
11 changes: 11 additions & 0 deletions pkg/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"

"github.com/douyu/jupiter/pkg/conf"
"github.com/douyu/jupiter/pkg/constant"
)

Expand Down Expand Up @@ -42,7 +43,17 @@ func SetAppLogDir(logDir string) {
appLogDir = logDir
}

// AppMode returns the current application mode.
func AppMode() string {
confMode := conf.GetString(constant.ConfigKey("mode"))
if appMode == "" {
if confMode == "" {
return "unkown-mode"
}

return confMode
}

return appMode
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/registry/etcdv3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/douyu/jupiter/pkg/registry"
"github.com/douyu/jupiter/pkg/singleton"
"github.com/douyu/jupiter/pkg/xlog"
"github.com/spf13/cast"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -53,7 +54,7 @@ func DefaultConfig() *Config {
ReadTimeout: time.Second * 3,
Prefix: "wsd-reg",
logger: xlog.Jupiter(),
ServiceTTL: 0,
ServiceTTL: cast.ToDuration("20s"),
}
}

Expand Down
181 changes: 40 additions & 141 deletions pkg/registry/etcdv3/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ import (
"encoding/json"
"fmt"
"net"
"net/url"
"strings"
"sync"
"time"

"github.com/douyu/jupiter/pkg"
"github.com/douyu/jupiter/pkg/client/etcdv3"
"github.com/douyu/jupiter/pkg/conf"
"github.com/douyu/jupiter/pkg/constant"
"github.com/douyu/jupiter/pkg/ecode"
"github.com/douyu/jupiter/pkg/registry"
Expand All @@ -46,6 +46,12 @@ type etcdv3Registry struct {
sessions map[string]*concurrency.Session
}

const (
servicePrefix = "%s:%s:%s:%s/"
// schema:appname:version:mode/host:port
registerService = "%s:%s:%s:%s/%s"
)

func newETCDRegistry(config *Config) (*etcdv3Registry, error) {
if config.logger == nil {
config.logger = xlog.Jupiter()
Expand Down Expand Up @@ -84,7 +90,7 @@ func (reg *etcdv3Registry) UnregisterService(ctx context.Context, info *server.S

// ListServices list service registered in registry with name `name`
func (reg *etcdv3Registry) ListServices(ctx context.Context, name string, scheme string) (services []*server.ServiceInfo, err error) {
target := fmt.Sprintf("/%s/%s/providers/%s://", reg.Prefix, name, scheme)
target := fmt.Sprintf(servicePrefix, scheme, name, "v1", conf.GetString("app.mode"))
getResp, getErr := reg.client.Get(ctx, target, clientv3.WithPrefix())
if getErr != nil {
reg.logger.Error(ecode.MsgWatchRequestErr, xlog.FieldErrKind(ecode.ErrKindRequestErr), xlog.FieldErr(getErr), xlog.FieldAddr(target))
Expand All @@ -105,7 +111,7 @@ func (reg *etcdv3Registry) ListServices(ctx context.Context, name string, scheme

// WatchServices watch service change event, then return address list
func (reg *etcdv3Registry) WatchServices(ctx context.Context, name string, scheme string) (chan registry.Endpoints, error) {
prefix := fmt.Sprintf("/%s/%s/", reg.Prefix, name)
prefix := fmt.Sprintf(servicePrefix, scheme, name, "v1", pkg.AppMode())
watch, err := reg.client.WatchPrefix(context.Background(), prefix)
if err != nil {
return nil, err
Expand Down Expand Up @@ -213,8 +219,8 @@ func (reg *etcdv3Registry) registerMetric(ctx context.Context, info *server.Serv
opOptions := make([]clientv3.OpOption, 0)
// opOptions = append(opOptions, clientv3.WithSerializable())
if ttl := reg.Config.ServiceTTL.Seconds(); ttl > 0 {
//todo ctx without timeout for same as service life?
sess, err := reg.getSession(key, concurrency.WithTTL(int(ttl)))
// 这里基于应用名为key做缓存,每个服务实例应该只需要创建一个lease,降低etcd的压力
sess, err := reg.getSession(info.Name, concurrency.WithTTL(int(ttl)))
if err != nil {
return err
}
Expand Down Expand Up @@ -244,8 +250,8 @@ func (reg *etcdv3Registry) registerBiz(ctx context.Context, info *server.Service
opOptions := make([]clientv3.OpOption, 0)
// opOptions = append(opOptions, clientv3.WithSerializable())
if ttl := reg.Config.ServiceTTL.Seconds(); ttl > 0 {
//todo ctx without timeout for same as service life?
sess, err := reg.getSession(key, concurrency.WithTTL(int(ttl)))
// 这里基于应用名为key做缓存,每个服务实例应该只需要创建一个lease,降低etcd的压力
sess, err := reg.getSession(info.Name, concurrency.WithTTL(int(ttl)))
if err != nil {
return err
}
Expand All @@ -262,19 +268,21 @@ func (reg *etcdv3Registry) registerBiz(ctx context.Context, info *server.Service
}

func (reg *etcdv3Registry) getSession(k string, opts ...concurrency.SessionOption) (*concurrency.Session, error) {
reg.rmu.RLock()
// 需要对整个方法加锁,防止并发创建session
reg.rmu.Lock()
defer reg.rmu.Unlock()
sess, ok := reg.sessions[k]
reg.rmu.RUnlock()
if ok {
return sess, nil
}

sess, err := concurrency.NewSession(reg.client.Client, opts...)
if err != nil {
xlog.Jupiter().Error("create session failed", xlog.FieldKeyAny(k))
return sess, err
}
reg.rmu.Lock()
reg.sessions[k] = sess
reg.rmu.Unlock()
xlog.Jupiter().Info("create session", xlog.FieldKeyAny(k), xlog.FieldValueAny(sess))
return sess, nil
}

Expand All @@ -296,43 +304,23 @@ func (reg *etcdv3Registry) delSession(k string) error {
}

func (reg *etcdv3Registry) registerKey(info *server.ServiceInfo) string {
return registry.GetServiceKey(reg.Prefix, info)
return fmt.Sprintf(registerService, info.Scheme, info.Name, "v1", pkg.AppMode(), info.Address)
}

func (reg *etcdv3Registry) registerValue(info *server.ServiceInfo) string {
return registry.GetServiceValue(info)
update := Update{
Op: Add,
Addr: info.Address,
}

val, _ := json.Marshal(update)

return string(val)
}

func deleteAddrList(al *registry.Endpoints, prefix, scheme string, kvs ...*mvccpb.KeyValue) {
for _, kv := range kvs {
var addr = strings.TrimPrefix(string(kv.Key), prefix)
if strings.HasPrefix(addr, "providers/"+scheme) {
// 解析服务注册键
addr = strings.TrimPrefix(addr, "providers/")
if addr == "" {
continue
}
uri, err := url.Parse(addr)
if err != nil {
xlog.Jupiter().Error("parse uri", xlog.FieldErrKind(ecode.ErrKindUriErr), xlog.FieldErr(err), xlog.FieldKey(string(kv.Key)))
continue
}
delete(al.Nodes, uri.String())
}

if strings.HasPrefix(addr, "configurators/"+scheme) {
// 解析服务配置键
addr = strings.TrimPrefix(addr, "configurators/")
if addr == "" {
continue
}
uri, err := url.Parse(addr)
if err != nil {
xlog.Jupiter().Error("parse uri", xlog.FieldErrKind(ecode.ErrKindUriErr), xlog.FieldErr(err), xlog.FieldKey(string(kv.Key)))
continue
}
delete(al.RouteConfigs, uri.String())
}

if isIPPort(addr) {
// 直接删除addr 因为Delete操作的value值为空
Expand All @@ -345,117 +333,28 @@ func deleteAddrList(al *registry.Endpoints, prefix, scheme string, kvs ...*mvccp
func updateAddrList(al *registry.Endpoints, prefix, scheme string, kvs ...*mvccpb.KeyValue) {
for _, kv := range kvs {
var addr = strings.TrimPrefix(string(kv.Key), prefix)
switch {
// 解析服务注册键
case strings.HasPrefix(addr, "providers/"+scheme):
addr = strings.TrimPrefix(addr, "providers/")
uri, err := url.Parse(addr)
if err != nil {
xlog.Jupiter().Error("parse uri", xlog.FieldErrKind(ecode.ErrKindUriErr), xlog.FieldErr(err), xlog.FieldKey(string(kv.Key)))
continue
}
var serviceInfo server.ServiceInfo
if err := json.Unmarshal(kv.Value, &serviceInfo); err != nil {
xlog.Jupiter().Error("parse uri", xlog.FieldErrKind(ecode.ErrKindUriErr), xlog.FieldErr(err), xlog.FieldKey(string(kv.Key)))
continue
}
if serviceInfo.Enable {
al.Nodes[uri.String()] = serviceInfo
} else {
delete(al.Nodes, uri.String())
}

case strings.HasPrefix(addr, "configurators/"+scheme):
addr = strings.TrimPrefix(addr, "configurators/")

uri, err := url.Parse(addr)
if err != nil {
xlog.Jupiter().Error("parse uri", xlog.FieldErrKind(ecode.ErrKindUriErr), xlog.FieldErr(err), xlog.FieldKey(string(kv.Key)))
if isIPPort(addr) {
var meta Update
if err := json.Unmarshal(kv.Value, &meta); err != nil {
xlog.Jupiter().Error("unmarshal meta", xlog.FieldErr(err),
xlog.FieldExtMessage("value", string(kv.Value), "key", string(kv.Key)))
continue
}

if strings.HasPrefix(uri.Path, "/routes/") { // 路由配置
var routeConfig registry.RouteConfig
if err := json.Unmarshal(kv.Value, &routeConfig); err != nil {
xlog.Jupiter().Error("parse uri", xlog.FieldErrKind(ecode.ErrKindUriErr), xlog.FieldErr(err), xlog.FieldKey(string(kv.Key)))
continue
}
routeConfig.ID = strings.TrimPrefix(uri.Path, "/routes/")
routeConfig.Scheme = uri.Scheme
routeConfig.Host = uri.Host
al.RouteConfigs[uri.String()] = routeConfig
}

if strings.HasPrefix(uri.Path, "/providers/") {
var providerConfig registry.ProviderConfig
if err := json.Unmarshal(kv.Value, &providerConfig); err != nil {
xlog.Jupiter().Error("parse uri", xlog.FieldErrKind(ecode.ErrKindUriErr), xlog.FieldErr(err), xlog.FieldKey(string(kv.Key)))
continue
}
providerConfig.ID = strings.TrimPrefix(uri.Path, "/providers/")
providerConfig.Scheme = uri.Scheme
providerConfig.Host = uri.Host
al.ProviderConfigs[uri.String()] = providerConfig
}

if strings.HasPrefix(uri.Path, "/consumers/") {
var consumerConfig registry.ConsumerConfig
if err := json.Unmarshal(kv.Value, &consumerConfig); err != nil {
xlog.Jupiter().Error("parse uri", xlog.FieldErrKind(ecode.ErrKindUriErr), xlog.FieldErr(err), xlog.FieldKey(string(kv.Key)))
continue
switch meta.Op {
case Add:
al.Nodes[addr] = server.ServiceInfo{
Address: addr,
}
consumerConfig.ID = strings.TrimPrefix(uri.Path, "/consumers/")
consumerConfig.Scheme = uri.Scheme
consumerConfig.Host = uri.Host
al.ConsumerConfigs[uri.String()] = consumerConfig
case Delete:
delete(al.Nodes, addr)
}
}
}
}


func isIPPort(addr string) bool {
_, _, err := net.SplitHostPort(addr)
return err == nil
}

/*
key: /jupiter/main/configurator/grpc:///routes/1
val:
{
"upstream": { // 客户端配置
"nodes": { // 按照node负载均衡
"127.0.0.1:1980": 1,
"127.0.0.1:1981": 4
},
"group": { // 按照group负载均衡
"red": 2,
"green": 1
}
},
"uri": "/hello",
"deployment": "open_api"
}
key: /jupiter/main/configurator/grpc://127.0.0.1/routes/2
val:
{
"upstream": { // 客户端配置
"nodes": { // 按照node负载均衡
"127.0.0.1:1980": 1,
"127.0.0.1:1981": 1
},
"group": { // 按照group负载均衡
"red": 1,
"green": 2
}
},
"uri": "/hello",
"deployment": "core_api" // 部署组
}
key: /jupiter/main/configurator/grpc:///consumers/client-demo
val:
{
}
*/
36 changes: 36 additions & 0 deletions pkg/registry/etcdv3/update.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2022 Douyu
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdv3

type Operation uint8

const (
// Add indicates an Endpoint is added.
Add Operation = iota
// Delete indicates an existing address is deleted.
Delete
)

// Update defines a name resolution update. Notice that it is not valid having both
// empty string Addr and nil Metadata in an Update.
type Update struct {
// Op indicates the operation of the update.
Op Operation
// Addr is the updated address. It is empty string if there is no address update.
Addr string
// Metadata is the updated metadata. It is nil if there is no metadata update.
// Metadata is not required for a custom naming implementation.
Metadata interface{}
}
Loading

0 comments on commit 56549fd

Please sign in to comment.