From 56549fdc1533bceb4ce529537e4af1f48a3f0d21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= Date: Thu, 20 Oct 2022 14:14:25 +0800 Subject: [PATCH] feat: refactor etcd registry protocol (#485) --- pkg/ecode/code.go | 17 --- pkg/env.go | 11 ++ pkg/registry/etcdv3/config.go | 3 +- pkg/registry/etcdv3/registry.go | 181 ++++++---------------------- pkg/registry/etcdv3/update.go | 36 ++++++ pkg/registry/registry.go | 21 ---- pkg/{ => server}/governor/config.go | 0 pkg/{ => server}/governor/http.go | 0 pkg/{ => server}/governor/init.go | 0 pkg/{ => server}/governor/server.go | 2 +- 10 files changed, 90 insertions(+), 181 deletions(-) create mode 100644 pkg/registry/etcdv3/update.go rename pkg/{ => server}/governor/config.go (100%) rename pkg/{ => server}/governor/http.go (100%) rename pkg/{ => server}/governor/init.go (100%) rename pkg/{ => server}/governor/server.go (97%) diff --git a/pkg/ecode/code.go b/pkg/ecode/code.go index 61bc3fad25..4edb8ff8e8 100644 --- a/pkg/ecode/code.go +++ b/pkg/ecode/code.go @@ -15,11 +15,8 @@ package ecode import ( - "encoding/json" - "net/http" "sync" - "github.com/douyu/jupiter/pkg/governor" "github.com/douyu/jupiter/pkg/xlog" spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc/codes" @@ -38,20 +35,6 @@ var ( OK = add(int(codes.OK), "OK") ) -func init() { - // status code list - governor.HandleFunc("/status/code/list", func(w http.ResponseWriter, r *http.Request) { - var rets = make(map[int]*spbStatus) - _codes.Range(func(key, val interface{}) bool { - code := key.(int) - status := val.(*spbStatus) - rets[code] = status - return true - }) - _ = json.NewEncoder(w).Encode(rets) - }) -} - // Add ... func Add(code int, message string) *spbStatus { if code > maxCustomizeCode { diff --git a/pkg/env.go b/pkg/env.go index 65b0d7a0a7..9a4f052e5f 100644 --- a/pkg/env.go +++ b/pkg/env.go @@ -5,6 +5,7 @@ import ( "fmt" "os" + "github.com/douyu/jupiter/pkg/conf" "github.com/douyu/jupiter/pkg/constant" ) @@ -42,7 +43,17 @@ func SetAppLogDir(logDir string) { appLogDir = logDir } +// AppMode returns the current application mode. func AppMode() string { + confMode := conf.GetString(constant.ConfigKey("mode")) + if appMode == "" { + if confMode == "" { + return "unkown-mode" + } + + return confMode + } + return appMode } diff --git a/pkg/registry/etcdv3/config.go b/pkg/registry/etcdv3/config.go index fcecd46a10..13d0d2dfa7 100644 --- a/pkg/registry/etcdv3/config.go +++ b/pkg/registry/etcdv3/config.go @@ -24,6 +24,7 @@ import ( "github.com/douyu/jupiter/pkg/registry" "github.com/douyu/jupiter/pkg/singleton" "github.com/douyu/jupiter/pkg/xlog" + "github.com/spf13/cast" "go.uber.org/zap" ) @@ -53,7 +54,7 @@ func DefaultConfig() *Config { ReadTimeout: time.Second * 3, Prefix: "wsd-reg", logger: xlog.Jupiter(), - ServiceTTL: 0, + ServiceTTL: cast.ToDuration("20s"), } } diff --git a/pkg/registry/etcdv3/registry.go b/pkg/registry/etcdv3/registry.go index c06bc4a269..5afe9de009 100644 --- a/pkg/registry/etcdv3/registry.go +++ b/pkg/registry/etcdv3/registry.go @@ -19,13 +19,13 @@ import ( "encoding/json" "fmt" "net" - "net/url" "strings" "sync" "time" "github.com/douyu/jupiter/pkg" "github.com/douyu/jupiter/pkg/client/etcdv3" + "github.com/douyu/jupiter/pkg/conf" "github.com/douyu/jupiter/pkg/constant" "github.com/douyu/jupiter/pkg/ecode" "github.com/douyu/jupiter/pkg/registry" @@ -46,6 +46,12 @@ type etcdv3Registry struct { sessions map[string]*concurrency.Session } +const ( + servicePrefix = "%s:%s:%s:%s/" + // schema:appname:version:mode/host:port + registerService = "%s:%s:%s:%s/%s" +) + func newETCDRegistry(config *Config) (*etcdv3Registry, error) { if config.logger == nil { config.logger = xlog.Jupiter() @@ -84,7 +90,7 @@ func (reg *etcdv3Registry) UnregisterService(ctx context.Context, info *server.S // ListServices list service registered in registry with name `name` func (reg *etcdv3Registry) ListServices(ctx context.Context, name string, scheme string) (services []*server.ServiceInfo, err error) { - target := fmt.Sprintf("/%s/%s/providers/%s://", reg.Prefix, name, scheme) + target := fmt.Sprintf(servicePrefix, scheme, name, "v1", conf.GetString("app.mode")) getResp, getErr := reg.client.Get(ctx, target, clientv3.WithPrefix()) if getErr != nil { reg.logger.Error(ecode.MsgWatchRequestErr, xlog.FieldErrKind(ecode.ErrKindRequestErr), xlog.FieldErr(getErr), xlog.FieldAddr(target)) @@ -105,7 +111,7 @@ func (reg *etcdv3Registry) ListServices(ctx context.Context, name string, scheme // WatchServices watch service change event, then return address list func (reg *etcdv3Registry) WatchServices(ctx context.Context, name string, scheme string) (chan registry.Endpoints, error) { - prefix := fmt.Sprintf("/%s/%s/", reg.Prefix, name) + prefix := fmt.Sprintf(servicePrefix, scheme, name, "v1", pkg.AppMode()) watch, err := reg.client.WatchPrefix(context.Background(), prefix) if err != nil { return nil, err @@ -213,8 +219,8 @@ func (reg *etcdv3Registry) registerMetric(ctx context.Context, info *server.Serv opOptions := make([]clientv3.OpOption, 0) // opOptions = append(opOptions, clientv3.WithSerializable()) if ttl := reg.Config.ServiceTTL.Seconds(); ttl > 0 { - //todo ctx without timeout for same as service life? - sess, err := reg.getSession(key, concurrency.WithTTL(int(ttl))) + // 这里基于应用名为key做缓存,每个服务实例应该只需要创建一个lease,降低etcd的压力 + sess, err := reg.getSession(info.Name, concurrency.WithTTL(int(ttl))) if err != nil { return err } @@ -244,8 +250,8 @@ func (reg *etcdv3Registry) registerBiz(ctx context.Context, info *server.Service opOptions := make([]clientv3.OpOption, 0) // opOptions = append(opOptions, clientv3.WithSerializable()) if ttl := reg.Config.ServiceTTL.Seconds(); ttl > 0 { - //todo ctx without timeout for same as service life? - sess, err := reg.getSession(key, concurrency.WithTTL(int(ttl))) + // 这里基于应用名为key做缓存,每个服务实例应该只需要创建一个lease,降低etcd的压力 + sess, err := reg.getSession(info.Name, concurrency.WithTTL(int(ttl))) if err != nil { return err } @@ -262,19 +268,21 @@ func (reg *etcdv3Registry) registerBiz(ctx context.Context, info *server.Service } func (reg *etcdv3Registry) getSession(k string, opts ...concurrency.SessionOption) (*concurrency.Session, error) { - reg.rmu.RLock() + // 需要对整个方法加锁,防止并发创建session + reg.rmu.Lock() + defer reg.rmu.Unlock() sess, ok := reg.sessions[k] - reg.rmu.RUnlock() if ok { return sess, nil } + sess, err := concurrency.NewSession(reg.client.Client, opts...) if err != nil { + xlog.Jupiter().Error("create session failed", xlog.FieldKeyAny(k)) return sess, err } - reg.rmu.Lock() reg.sessions[k] = sess - reg.rmu.Unlock() + xlog.Jupiter().Info("create session", xlog.FieldKeyAny(k), xlog.FieldValueAny(sess)) return sess, nil } @@ -296,43 +304,23 @@ func (reg *etcdv3Registry) delSession(k string) error { } func (reg *etcdv3Registry) registerKey(info *server.ServiceInfo) string { - return registry.GetServiceKey(reg.Prefix, info) + return fmt.Sprintf(registerService, info.Scheme, info.Name, "v1", pkg.AppMode(), info.Address) } func (reg *etcdv3Registry) registerValue(info *server.ServiceInfo) string { - return registry.GetServiceValue(info) + update := Update{ + Op: Add, + Addr: info.Address, + } + + val, _ := json.Marshal(update) + + return string(val) } func deleteAddrList(al *registry.Endpoints, prefix, scheme string, kvs ...*mvccpb.KeyValue) { for _, kv := range kvs { var addr = strings.TrimPrefix(string(kv.Key), prefix) - if strings.HasPrefix(addr, "providers/"+scheme) { - // 解析服务注册键 - addr = strings.TrimPrefix(addr, "providers/") - if addr == "" { - continue - } - uri, err := url.Parse(addr) - if err != nil { - xlog.Jupiter().Error("parse uri", xlog.FieldErrKind(ecode.ErrKindUriErr), xlog.FieldErr(err), xlog.FieldKey(string(kv.Key))) - continue - } - delete(al.Nodes, uri.String()) - } - - if strings.HasPrefix(addr, "configurators/"+scheme) { - // 解析服务配置键 - addr = strings.TrimPrefix(addr, "configurators/") - if addr == "" { - continue - } - uri, err := url.Parse(addr) - if err != nil { - xlog.Jupiter().Error("parse uri", xlog.FieldErrKind(ecode.ErrKindUriErr), xlog.FieldErr(err), xlog.FieldKey(string(kv.Key))) - continue - } - delete(al.RouteConfigs, uri.String()) - } if isIPPort(addr) { // 直接删除addr 因为Delete操作的value值为空 @@ -345,117 +333,28 @@ func deleteAddrList(al *registry.Endpoints, prefix, scheme string, kvs ...*mvccp func updateAddrList(al *registry.Endpoints, prefix, scheme string, kvs ...*mvccpb.KeyValue) { for _, kv := range kvs { var addr = strings.TrimPrefix(string(kv.Key), prefix) - switch { - // 解析服务注册键 - case strings.HasPrefix(addr, "providers/"+scheme): - addr = strings.TrimPrefix(addr, "providers/") - uri, err := url.Parse(addr) - if err != nil { - xlog.Jupiter().Error("parse uri", xlog.FieldErrKind(ecode.ErrKindUriErr), xlog.FieldErr(err), xlog.FieldKey(string(kv.Key))) - continue - } - var serviceInfo server.ServiceInfo - if err := json.Unmarshal(kv.Value, &serviceInfo); err != nil { - xlog.Jupiter().Error("parse uri", xlog.FieldErrKind(ecode.ErrKindUriErr), xlog.FieldErr(err), xlog.FieldKey(string(kv.Key))) - continue - } - if serviceInfo.Enable { - al.Nodes[uri.String()] = serviceInfo - } else { - delete(al.Nodes, uri.String()) - } - - case strings.HasPrefix(addr, "configurators/"+scheme): - addr = strings.TrimPrefix(addr, "configurators/") - - uri, err := url.Parse(addr) - if err != nil { - xlog.Jupiter().Error("parse uri", xlog.FieldErrKind(ecode.ErrKindUriErr), xlog.FieldErr(err), xlog.FieldKey(string(kv.Key))) + if isIPPort(addr) { + var meta Update + if err := json.Unmarshal(kv.Value, &meta); err != nil { + xlog.Jupiter().Error("unmarshal meta", xlog.FieldErr(err), + xlog.FieldExtMessage("value", string(kv.Value), "key", string(kv.Key))) continue } - if strings.HasPrefix(uri.Path, "/routes/") { // 路由配置 - var routeConfig registry.RouteConfig - if err := json.Unmarshal(kv.Value, &routeConfig); err != nil { - xlog.Jupiter().Error("parse uri", xlog.FieldErrKind(ecode.ErrKindUriErr), xlog.FieldErr(err), xlog.FieldKey(string(kv.Key))) - continue - } - routeConfig.ID = strings.TrimPrefix(uri.Path, "/routes/") - routeConfig.Scheme = uri.Scheme - routeConfig.Host = uri.Host - al.RouteConfigs[uri.String()] = routeConfig - } - - if strings.HasPrefix(uri.Path, "/providers/") { - var providerConfig registry.ProviderConfig - if err := json.Unmarshal(kv.Value, &providerConfig); err != nil { - xlog.Jupiter().Error("parse uri", xlog.FieldErrKind(ecode.ErrKindUriErr), xlog.FieldErr(err), xlog.FieldKey(string(kv.Key))) - continue - } - providerConfig.ID = strings.TrimPrefix(uri.Path, "/providers/") - providerConfig.Scheme = uri.Scheme - providerConfig.Host = uri.Host - al.ProviderConfigs[uri.String()] = providerConfig - } - - if strings.HasPrefix(uri.Path, "/consumers/") { - var consumerConfig registry.ConsumerConfig - if err := json.Unmarshal(kv.Value, &consumerConfig); err != nil { - xlog.Jupiter().Error("parse uri", xlog.FieldErrKind(ecode.ErrKindUriErr), xlog.FieldErr(err), xlog.FieldKey(string(kv.Key))) - continue + switch meta.Op { + case Add: + al.Nodes[addr] = server.ServiceInfo{ + Address: addr, } - consumerConfig.ID = strings.TrimPrefix(uri.Path, "/consumers/") - consumerConfig.Scheme = uri.Scheme - consumerConfig.Host = uri.Host - al.ConsumerConfigs[uri.String()] = consumerConfig + case Delete: + delete(al.Nodes, addr) } } } } + func isIPPort(addr string) bool { _, _, err := net.SplitHostPort(addr) return err == nil } - -/* -key: /jupiter/main/configurator/grpc:///routes/1 -val: -{ - "upstream": { // 客户端配置 - "nodes": { // 按照node负载均衡 - "127.0.0.1:1980": 1, - "127.0.0.1:1981": 4 - }, - "group": { // 按照group负载均衡 - "red": 2, - "green": 1 - } - }, - "uri": "/hello", - "deployment": "open_api" -} - -key: /jupiter/main/configurator/grpc://127.0.0.1/routes/2 -val: -{ - "upstream": { // 客户端配置 - "nodes": { // 按照node负载均衡 - "127.0.0.1:1980": 1, - "127.0.0.1:1981": 1 - }, - "group": { // 按照group负载均衡 - "red": 1, - "green": 2 - } - }, - "uri": "/hello", - "deployment": "core_api" // 部署组 -} - -key: /jupiter/main/configurator/grpc:///consumers/client-demo -val: -{ - -} -*/ diff --git a/pkg/registry/etcdv3/update.go b/pkg/registry/etcdv3/update.go new file mode 100644 index 0000000000..91dd996546 --- /dev/null +++ b/pkg/registry/etcdv3/update.go @@ -0,0 +1,36 @@ +// Copyright 2022 Douyu +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdv3 + +type Operation uint8 + +const ( + // Add indicates an Endpoint is added. + Add Operation = iota + // Delete indicates an existing address is deleted. + Delete +) + +// Update defines a name resolution update. Notice that it is not valid having both +// empty string Addr and nil Metadata in an Update. +type Update struct { + // Op indicates the operation of the update. + Op Operation + // Addr is the updated address. It is empty string if there is no address update. + Addr string + // Metadata is the updated metadata. It is nil if there is no metadata update. + // Metadata is not required for a custom naming implementation. + Metadata interface{} +} diff --git a/pkg/registry/registry.go b/pkg/registry/registry.go index c4ef11cfca..5998a697f6 100644 --- a/pkg/registry/registry.go +++ b/pkg/registry/registry.go @@ -16,8 +16,6 @@ package registry import ( "context" - "encoding/json" - "fmt" "io" "github.com/douyu/jupiter/pkg/server" @@ -106,25 +104,6 @@ type Registry interface { io.Closer } -//GetServiceKey .. -func GetServiceKey(prefix string, s *server.ServiceInfo) string { - return fmt.Sprintf("/%s/%s/%s/%s://%s", prefix, s.Name, s.Kind.String(), s.Scheme, s.Address) -} - -//GetServiceValue .. -func GetServiceValue(s *server.ServiceInfo) string { - val, _ := json.Marshal(s) - return string(val) -} - -//GetService .. -func GetService(s string) *server.ServiceInfo { - var si server.ServiceInfo - // TODO: 查询服务 - _ = json.Unmarshal([]byte(s), &si) - return &si -} - // Configuration ... type Configuration struct { Routes []Route `json:"routes"` // 配置客户端路由策略 diff --git a/pkg/governor/config.go b/pkg/server/governor/config.go similarity index 100% rename from pkg/governor/config.go rename to pkg/server/governor/config.go diff --git a/pkg/governor/http.go b/pkg/server/governor/http.go similarity index 100% rename from pkg/governor/http.go rename to pkg/server/governor/http.go diff --git a/pkg/governor/init.go b/pkg/server/governor/init.go similarity index 100% rename from pkg/governor/init.go rename to pkg/server/governor/init.go diff --git a/pkg/governor/server.go b/pkg/server/governor/server.go similarity index 97% rename from pkg/governor/server.go rename to pkg/server/governor/server.go index 2fbe079b60..4cd00a6a98 100644 --- a/pkg/governor/server.go +++ b/pkg/server/governor/server.go @@ -67,7 +67,7 @@ func (s *Server) Info() *server.ServiceInfo { } info := server.ApplyOptions( - server.WithScheme("http"), + server.WithScheme("govern"), server.WithAddress(serviceAddr), server.WithKind(constant.ServiceGovernor), )