Skip to content

Commit

Permalink
chore: refactor config center (#4339)
Browse files Browse the repository at this point in the history
Signed-off-by: kevin <wanjunfeng@gmail.com>
  • Loading branch information
kevwan authored Aug 28, 2024
1 parent 44cddec commit 24d6150
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 93 deletions.
47 changes: 21 additions & 26 deletions core/configcenter/configurator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
)

var (
errorEmptyConfig = errors.New("empty config value")
errorMissUnmarshalerType = errors.New("miss unmarshaler type")
errEmptyConfig = errors.New("empty config value")
errMissingUnmarshalerType = errors.New("missing unmarshaler type")
)

// Configurator is the interface for configuration center.
Expand All @@ -32,19 +32,17 @@ type (
Config struct {
// Type is the value type, yaml, json or toml.
Type string `json:",default=yaml,options=[yaml,json,toml]"`
// Log indicates whether to log the configuration.
Log bool `json:",default=ture"`
// Log is the flag to control logging.
Log bool `json:",default=true"`
}

configCenter[T any] struct {
conf Config
unmarshaler LoaderFn

subscriber subscriber.Subscriber

listeners []func()
lock sync.Mutex
snapshot atomic.Value
subscriber subscriber.Subscriber
listeners []func()
lock sync.Mutex
snapshot atomic.Value
}

value[T any] struct {
Expand All @@ -61,7 +59,6 @@ var _ Configurator[any] = (*configCenter[any])(nil)
func MustNewConfigCenter[T any](c Config, subscriber subscriber.Subscriber) Configurator[T] {
cc, err := NewConfigCenter[T](c, subscriber)
logx.Must(err)

return cc
}

Expand All @@ -76,9 +73,6 @@ func NewConfigCenter[T any](c Config, subscriber subscriber.Subscriber) (Configu
conf: c,
unmarshaler: unmarshaler,
subscriber: subscriber,
listeners: nil,
lock: sync.Mutex{},
snapshot: atomic.Value{},
}

if err := cc.loadConfig(); err != nil {
Expand All @@ -105,10 +99,10 @@ func (c *configCenter[T]) AddListener(listener func()) {

// GetConfig return structured config.
func (c *configCenter[T]) GetConfig() (T, error) {
var r T
v := c.value()
if v == nil || len(v.data) < 1 {
return r, errorEmptyConfig
if v == nil || len(v.data) == 0 {
var empty T
return empty, errEmptyConfig
}

return v.marshalData, v.err
Expand Down Expand Up @@ -141,7 +135,9 @@ func (c *configCenter[T]) loadConfig() error {
}

func (c *configCenter[T]) onChange() {
_ = c.loadConfig()
if err := c.loadConfig(); err != nil {
return
}

c.lock.Lock()
listeners := make([]func(), len(c.listeners))
Expand All @@ -165,40 +161,39 @@ func (c *configCenter[T]) genValue(data string) *value[T] {
v := &value[T]{
data: data,
}
if len(data) <= 0 {
if len(data) == 0 {
return v
}

t := reflect.TypeOf(v.marshalData)
// if the type is nil, it means that the user has not set the type of the configuration.
if t == nil {
v.err = errorMissUnmarshalerType
v.err = errMissingUnmarshalerType
return v
}

t = mapping.Deref(t)

switch t.Kind() {
case reflect.Struct, reflect.Array, reflect.Slice:
err := c.unmarshaler([]byte(data), &v.marshalData)
if err != nil {
if err := c.unmarshaler([]byte(data), &v.marshalData); err != nil {
v.err = err
if c.conf.Log {
logx.Errorf("ConfigCenter unmarshal configuration failed, err: %+v, content [%s]", err.Error(), data)
logx.Errorf("ConfigCenter unmarshal configuration failed, err: %+v, content [%s]",
err.Error(), data)
}
}
case reflect.String:
if str, ok := any(data).(T); ok {
v.marshalData = str
} else {
v.err = errorMissUnmarshalerType
v.err = errMissingUnmarshalerType
}
default:
if c.conf.Log {
logx.Errorf("ConfigCenter unmarshal configuration missing unmarshaler for type: %s, content [%s]",
t.Kind(), data)
}
v.err = errorMissUnmarshalerType
v.err = errMissingUnmarshalerType
}

return v
Expand Down
16 changes: 8 additions & 8 deletions core/configcenter/configurator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func TestConfigCenter_AddListener(t *testing.T) {
func TestConfigCenter_genValue(t *testing.T) {
t.Run("data is empty", func(t *testing.T) {
c := &configCenter[string]{
unmarshaler: defaultRegistry.unmarshalers["json"],
unmarshaler: registry.unmarshalers["json"],
conf: Config{Log: true},
}
v := c.genValue("")
Expand All @@ -159,25 +159,25 @@ func TestConfigCenter_genValue(t *testing.T) {

t.Run("invalid template type", func(t *testing.T) {
c := &configCenter[any]{
unmarshaler: defaultRegistry.unmarshalers["json"],
unmarshaler: registry.unmarshalers["json"],
conf: Config{Log: true},
}
v := c.genValue("xxxx")
assert.Equal(t, errorMissUnmarshalerType, v.err)
assert.Equal(t, errMissingUnmarshalerType, v.err)
})

t.Run("unsupported template type", func(t *testing.T) {
c := &configCenter[int]{
unmarshaler: defaultRegistry.unmarshalers["json"],
unmarshaler: registry.unmarshalers["json"],
conf: Config{Log: true},
}
v := c.genValue("1")
assert.Equal(t, errorMissUnmarshalerType, v.err)
assert.Equal(t, errMissingUnmarshalerType, v.err)
})

t.Run("supported template string type", func(t *testing.T) {
c := &configCenter[string]{
unmarshaler: defaultRegistry.unmarshalers["json"],
unmarshaler: registry.unmarshalers["json"],
conf: Config{Log: true},
}
v := c.genValue("12345")
Expand All @@ -189,7 +189,7 @@ func TestConfigCenter_genValue(t *testing.T) {
c := &configCenter[struct {
Name string `json:"name"`
}]{
unmarshaler: defaultRegistry.unmarshalers["json"],
unmarshaler: registry.unmarshalers["json"],
conf: Config{Log: true},
}
v := c.genValue(`{"name":"new name}`)
Expand All @@ -201,7 +201,7 @@ func TestConfigCenter_genValue(t *testing.T) {
c := &configCenter[struct {
Name string `json:"name"`
}]{
unmarshaler: defaultRegistry.unmarshalers["json"],
unmarshaler: registry.unmarshalers["json"],
conf: Config{Log: true},
}
v := c.genValue(`{"name":"new name"}`)
Expand Down
35 changes: 21 additions & 14 deletions core/configcenter/subscriber/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,19 @@ type (
}

// EtcdConf is the configuration for etcd.
EtcdConf discov.EtcdConf
EtcdConf = discov.EtcdConf
)

// MustNewEtcdSubscriber returns an etcd Subscriber, exits on errors.
func MustNewEtcdSubscriber(conf EtcdConf) Subscriber {
s, err := NewEtcdSubscriber(conf)
logx.Must(err)

return s
}

// NewEtcdSubscriber returns an etcd Subscriber.
func NewEtcdSubscriber(conf EtcdConf) (Subscriber, error) {
var opts = []discov.SubOption{
discov.WithDisablePrefix(),
}
if len(conf.User) != 0 {
opts = append(opts, discov.WithSubEtcdAccount(conf.User, conf.Pass))
}
if len(conf.CertFile) != 0 || len(conf.CertKeyFile) != 0 || len(conf.CACertFile) != 0 {
opts = append(opts,
discov.WithSubEtcdTLS(conf.CertFile, conf.CertKeyFile, conf.CACertFile, conf.InsecureSkipVerify))
}

opts := buildSubOptions(conf)
s, err := discov.NewSubscriber(conf.Hosts, conf.Key, opts...)
if err != nil {
return nil, err
Expand All @@ -44,6 +33,23 @@ func NewEtcdSubscriber(conf EtcdConf) (Subscriber, error) {
return &etcdSubscriber{Subscriber: s}, nil
}

// buildSubOptions constructs the options for creating a new etcd subscriber.
func buildSubOptions(conf EtcdConf) []discov.SubOption {
opts := []discov.SubOption{
discov.WithExactMatch(),
}

if len(conf.User) > 0 {
opts = append(opts, discov.WithSubEtcdAccount(conf.User, conf.Pass))
}
if len(conf.CertFile) > 0 || len(conf.CertKeyFile) > 0 || len(conf.CACertFile) > 0 {
opts = append(opts, discov.WithSubEtcdTLS(conf.CertFile, conf.CertKeyFile,
conf.CACertFile, conf.InsecureSkipVerify))
}

return opts
}

// AddListener adds a listener to the subscriber.
func (s *etcdSubscriber) AddListener(listener func()) error {
s.Subscriber.AddListener(listener)
Expand All @@ -53,8 +59,9 @@ func (s *etcdSubscriber) AddListener(listener func()) error {
// Value returns the value of the subscriber.
func (s *etcdSubscriber) Value() (string, error) {
vs := s.Subscriber.Values()
if len(vs) != 0 {
if len(vs) > 0 {
return vs[len(vs)-1], nil
}

return "", nil
}
41 changes: 18 additions & 23 deletions core/configcenter/unmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,41 +6,36 @@ import (
"github.com/zeromicro/go-zero/core/conf"
)

var registry = &unmarshalerRegistry{
unmarshalers: map[string]LoaderFn{
"json": conf.LoadFromJsonBytes,
"toml": conf.LoadFromTomlBytes,
"yaml": conf.LoadFromYamlBytes,
},
}

type (
// LoaderFn is the function type for loading configuration.
LoaderFn func([]byte, any) error

// unmarshalerRegistry is the registry for unmarshalers.
unmarshalerRegistry struct {
unmarshalers map[string]LoaderFn

mu sync.RWMutex
mu sync.RWMutex
}

// LoaderFn is the function type for loading configuration.
LoaderFn func([]byte, any) error
)

var defaultRegistry *unmarshalerRegistry

func init() {
defaultRegistry = &unmarshalerRegistry{
unmarshalers: map[string]LoaderFn{
"json": conf.LoadFromJsonBytes,
"toml": conf.LoadFromTomlBytes,
"yaml": conf.LoadFromYamlBytes,
},
}
}

// RegisterUnmarshaler registers an unmarshaler.
func RegisterUnmarshaler(name string, fn LoaderFn) {
defaultRegistry.mu.Lock()
defaultRegistry.unmarshalers[name] = fn
defaultRegistry.mu.Unlock()
registry.mu.Lock()
defer registry.mu.Unlock()
registry.unmarshalers[name] = fn
}

// Unmarshaler returns the unmarshaler by name.
func Unmarshaler(name string) (LoaderFn, bool) {
defaultRegistry.mu.RLock()
fn, ok := defaultRegistry.unmarshalers[name]
defaultRegistry.mu.RUnlock()
registry.mu.RLock()
defer registry.mu.RUnlock()
fn, ok := registry.unmarshalers[name]
return fn, ok
}
28 changes: 14 additions & 14 deletions core/discov/internal/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (r *Registry) GetConn(endpoints []string) (EtcdClient, error) {
}

// Monitor monitors the key on given etcd endpoints, notify with the given UpdateListener.
func (r *Registry) Monitor(endpoints []string, key string, l UpdateListener, disablePrefix bool) error {
func (r *Registry) Monitor(endpoints []string, key string, l UpdateListener, exactMatch bool) error {
c, exists := r.getCluster(endpoints)
// if exists, the existing values should be updated to the listener.
if exists {
Expand All @@ -56,7 +56,7 @@ func (r *Registry) Monitor(endpoints []string, key string, l UpdateListener, dis
}
}

return c.monitor(key, l, disablePrefix)
return c.monitor(key, l, exactMatch)
}

func (r *Registry) getCluster(endpoints []string) (c *cluster, exists bool) {
Expand All @@ -80,14 +80,14 @@ func (r *Registry) getCluster(endpoints []string) (c *cluster, exists bool) {
}

type cluster struct {
endpoints []string
key string
values map[string]map[string]string
listeners map[string][]UpdateListener
watchGroup *threading.RoutineGroup
done chan lang.PlaceholderType
lock sync.RWMutex
disablePrefix bool
endpoints []string
key string
values map[string]map[string]string
listeners map[string][]UpdateListener
watchGroup *threading.RoutineGroup
done chan lang.PlaceholderType
lock sync.RWMutex
exactMatch bool
}

func newCluster(endpoints []string) *cluster {
Expand Down Expand Up @@ -226,7 +226,7 @@ func (c *cluster) load(cli EtcdClient, key string) int64 {
for {
var err error
ctx, cancel := context.WithTimeout(c.context(cli), RequestTimeout)
if c.disablePrefix {
if c.exactMatch {
resp, err = cli.Get(ctx, key)
} else {
resp, err = cli.Get(ctx, makeKeyPrefix(key), clientv3.WithPrefix())
Expand Down Expand Up @@ -254,10 +254,10 @@ func (c *cluster) load(cli EtcdClient, key string) int64 {
return resp.Header.Revision
}

func (c *cluster) monitor(key string, l UpdateListener, disablePrefix bool) error {
func (c *cluster) monitor(key string, l UpdateListener, exactMatch bool) error {
c.lock.Lock()
c.listeners[key] = append(c.listeners[key], l)
c.disablePrefix = disablePrefix
c.exactMatch = exactMatch
c.lock.Unlock()

cli, err := c.getClient()
Expand Down Expand Up @@ -328,7 +328,7 @@ func (c *cluster) watchStream(cli EtcdClient, key string, rev int64) error {
ops []clientv3.OpOption
watchKey = key
)
if !c.disablePrefix {
if !c.exactMatch {
watchKey = makeKeyPrefix(key)
ops = append(ops, clientv3.WithPrefix())
}
Expand Down
Loading

0 comments on commit 24d6150

Please sign in to comment.