From fb37059d31ff01ab6bdaee578859bf500f4df963 Mon Sep 17 00:00:00 2001 From: zhangzeyi Date: Tue, 11 Oct 2022 10:54:52 +0800 Subject: [PATCH 01/19] =?UTF-8?q?=E7=BC=93=E5=AD=98=E7=AD=96=E7=95=A5demo?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/apinto/worker.go | 6 +- .../cache-strategy/actuator-handle.go | 23 ++ drivers/strategy/cache-strategy/actuator.go | 205 ++++++++++++++++++ .../strategy/cache-strategy/cache/cache.go | 16 ++ .../cache-strategy/cache_valid_time.go | 79 +++++++ drivers/strategy/cache-strategy/config.go | 12 + .../controller.go | 2 +- drivers/strategy/cache-strategy/driver.go | 60 +++++ drivers/strategy/cache-strategy/factory.go | 45 ++++ drivers/strategy/cache-strategy/handler.go | 28 +++ .../strategy/cache-strategy/http-handler.go | 47 ++++ .../actuator-handle.go | 4 +- .../actuator.go | 4 +- .../config.go | 4 +- .../strategy/limiting-strategy/controller.go | 85 ++++++++ .../driver.go | 2 +- .../factory.go | 2 +- .../handler.go | 5 +- .../http-handler.go | 4 +- .../limiting.go | 2 +- .../scalar/scalar.go | 0 .../scalar/vectors.go | 0 22 files changed, 619 insertions(+), 16 deletions(-) create mode 100644 drivers/strategy/cache-strategy/actuator-handle.go create mode 100644 drivers/strategy/cache-strategy/actuator.go create mode 100644 drivers/strategy/cache-strategy/cache/cache.go create mode 100644 drivers/strategy/cache-strategy/cache_valid_time.go create mode 100644 drivers/strategy/cache-strategy/config.go rename drivers/strategy/{limiting-stragety => cache-strategy}/controller.go (98%) create mode 100644 drivers/strategy/cache-strategy/driver.go create mode 100644 drivers/strategy/cache-strategy/factory.go create mode 100644 drivers/strategy/cache-strategy/handler.go create mode 100644 drivers/strategy/cache-strategy/http-handler.go rename drivers/strategy/{limiting-stragety => limiting-strategy}/actuator-handle.go (84%) rename drivers/strategy/{limiting-stragety => limiting-strategy}/actuator.go (95%) rename drivers/strategy/{limiting-stragety => limiting-strategy}/config.go (93%) create mode 100644 drivers/strategy/limiting-strategy/controller.go rename drivers/strategy/{limiting-stragety => limiting-strategy}/driver.go (97%) rename drivers/strategy/{limiting-stragety => limiting-strategy}/factory.go (97%) rename drivers/strategy/{limiting-stragety => limiting-strategy}/handler.go (96%) rename drivers/strategy/{limiting-stragety => limiting-strategy}/http-handler.go (97%) rename drivers/strategy/{limiting-stragety => limiting-strategy}/limiting.go (97%) rename drivers/strategy/{limiting-stragety => limiting-strategy}/scalar/scalar.go (100%) rename drivers/strategy/{limiting-stragety => limiting-strategy}/scalar/vectors.go (100%) diff --git a/app/apinto/worker.go b/app/apinto/worker.go index 7945a49b..9d060de3 100644 --- a/app/apinto/worker.go +++ b/app/apinto/worker.go @@ -28,7 +28,8 @@ import ( "github.com/eolinker/apinto/drivers/plugins/strategy" http_router "github.com/eolinker/apinto/drivers/router/http-router" service "github.com/eolinker/apinto/drivers/service" - limiting_stragety "github.com/eolinker/apinto/drivers/strategy/limiting-stragety" + cache_strategy "github.com/eolinker/apinto/drivers/strategy/cache-strategy" + limiting_strategy "github.com/eolinker/apinto/drivers/strategy/limiting-strategy" template "github.com/eolinker/apinto/drivers/template" "github.com/eolinker/eosc" @@ -89,5 +90,6 @@ func Register(extenderRegister eosc.IExtenderDriverRegister) { proxy_rewriteV2.Register(extenderRegister) strategy.Register(extenderRegister) - limiting_stragety.Register(extenderRegister) + limiting_strategy.Register(extenderRegister) + cache_strategy.Register(extenderRegister) } diff --git a/drivers/strategy/cache-strategy/actuator-handle.go b/drivers/strategy/cache-strategy/actuator-handle.go new file mode 100644 index 00000000..f2d668e1 --- /dev/null +++ b/drivers/strategy/cache-strategy/actuator-handle.go @@ -0,0 +1,23 @@ +package cache_strategy + +import ( + "github.com/eolinker/eosc/eocontext" +) + +type ActuatorsHandler interface { + Assert(ctx eocontext.EoContext) bool + Check(ctx eocontext.EoContext, handlers []*CacheValidTimeHandler) error +} + +var ( + actuatorsHandlers []ActuatorsHandler +) + +func RegisterActuator(handler ActuatorsHandler) { + + actuatorsHandlers = append(actuatorsHandlers, handler) +} +func getActuatorsHandlers() []ActuatorsHandler { + + return actuatorsHandlers +} diff --git a/drivers/strategy/cache-strategy/actuator.go b/drivers/strategy/cache-strategy/actuator.go new file mode 100644 index 00000000..07f5e401 --- /dev/null +++ b/drivers/strategy/cache-strategy/actuator.go @@ -0,0 +1,205 @@ +package cache_strategy + +import ( + "github.com/eolinker/apinto/drivers/strategy/cache-strategy/cache" + "github.com/eolinker/apinto/strategy" + "github.com/eolinker/eosc/eocontext" + http_service "github.com/eolinker/eosc/eocontext/http-context" + "net/http" + "sort" + "strconv" + "strings" + "sync" +) + +var ( + actuatorSet ActuatorSet +) + +func init() { + actuator := newtActuator() + actuatorSet = actuator + strategy.AddStrategyHandler(actuator) +} + +type ActuatorSet interface { + Set(string, *CacheValidTimeHandler) + Del(id string) +} + +type tActuator struct { + lock sync.RWMutex + all map[string]*CacheValidTimeHandler + handlers []*CacheValidTimeHandler +} + +func (a *tActuator) Destroy() { + +} + +func (a *tActuator) Set(id string, val *CacheValidTimeHandler) { + // 调用来源有锁 + a.all[id] = val + a.rebuild() + +} + +func (a *tActuator) Del(id string) { + // 调用来源有锁 + delete(a.all, id) + a.rebuild() +} + +func (a *tActuator) rebuild() { + + handlers := make([]*CacheValidTimeHandler, 0, len(a.all)) + for _, h := range a.all { + if !h.stop { + handlers = append(handlers, h) + } + } + sort.Sort(handlerListSort(handlers)) + a.lock.Lock() + defer a.lock.Unlock() + a.handlers = handlers +} +func newtActuator() *tActuator { + return &tActuator{ + all: make(map[string]*CacheValidTimeHandler), + } +} + +func (a *tActuator) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) error { + + httpCtx, err := http_service.Assert(ctx) + if err != nil { + return err + } + + uri := httpCtx.Request().URI().RequestURI() + + isCache := false + controlMap := parseCacheControl(httpCtx) + if !controlMap.NoCache() && controlMap.IsPublic() && httpCtx.Request().Method() == http.MethodGet { + isCache = true + } + + if isCache { + a.lock.RLock() + handlers := a.handlers + a.lock.RUnlock() + + for _, handler := range handlers { + if handler.stop { + continue + } + if handler.filter.Check(ctx) { + + maxAge := controlMap.MaxAge() + if maxAge > 0 { //todo 已经设置过缓存时间 + + } + + localCache := cache.GetCache(uri) + if localCache != nil { + httpCtx.Response().SetBody(localCache.Body) + for key, val := range localCache.Header { + httpCtx.Response().SetHeader(key, val) + } + return nil + } else { + //拿不到说明已经过期了 + if next != nil { + if err = next.DoChain(ctx); err != nil { + return err + } + } + httpCtx, err = http_service.Assert(ctx) + if err != nil { + return err + } + + header := make(map[string]string) + for key, values := range httpCtx.Response().Headers() { + if len(values) > 0 { + header[key] = values[0] + } + } + + localCache = &cache.Cache{ + Header: header, + Body: httpCtx.Response().GetBody(), + } + cache.SetCache(uri, localCache, handler.validTime) + return nil + } + + } + + } + } + + if next != nil { + return next.DoChain(ctx) + } + return nil +} + +type handlerListSort []*CacheValidTimeHandler + +func (hs handlerListSort) Len() int { + return len(hs) +} + +func (hs handlerListSort) Less(i, j int) bool { + + return hs[i].priority < hs[j].priority +} + +func (hs handlerListSort) Swap(i, j int) { + hs[i], hs[j] = hs[j], hs[i] +} + +type cacheControlMap map[string]string + +func (c cacheControlMap) NoCache() bool { + if _, ok := c["no-cache"]; ok { + return true + } + return false +} + +func (c cacheControlMap) MaxAge() int { + if maxAgeStr, ok := c["max-age"]; ok { + maxAge, _ := strconv.Atoi(maxAgeStr) + return maxAge + } + return 0 +} + +func (c cacheControlMap) IsPublic() bool { + //只要不是私有的 都算公有 + if _, ok := c["private"]; ok { + return false + } + return true +} + +func parseCacheControl(httpCtx http_service.IHttpContext) cacheControlMap { + cc := cacheControlMap{} + + header := httpCtx.Request().Header().GetHeader("Cache-Control") + for _, part := range strings.Split(header, ",") { + part = strings.Trim(part, " ") + if part == "" { + continue + } + if strings.ContainsRune(part, '=') { + keyVal := strings.Split(part, "=") + cc[strings.Trim(keyVal[0], " ")] = strings.Trim(keyVal[1], ",") + } else { + cc[part] = "" + } + } + return cc +} diff --git a/drivers/strategy/cache-strategy/cache/cache.go b/drivers/strategy/cache-strategy/cache/cache.go new file mode 100644 index 00000000..534f9e59 --- /dev/null +++ b/drivers/strategy/cache-strategy/cache/cache.go @@ -0,0 +1,16 @@ +package cache + +//todo 本地缓存demo + +type Cache struct { + Header map[string]string + Body []byte +} + +func SetCache(uri string, cache *Cache, validTime int) { + +} + +func GetCache(uri string) *Cache { + return nil +} diff --git a/drivers/strategy/cache-strategy/cache_valid_time.go b/drivers/strategy/cache-strategy/cache_valid_time.go new file mode 100644 index 00000000..759d1922 --- /dev/null +++ b/drivers/strategy/cache-strategy/cache_valid_time.go @@ -0,0 +1,79 @@ +package cache_strategy + +import ( + "fmt" + "github.com/eolinker/eosc" + "reflect" +) + +var ( + _ eosc.IWorker = (*CacheValidTime)(nil) + _ eosc.IWorkerDestroy = (*CacheValidTime)(nil) +) + +type CacheValidTime struct { + id string + name string + handler *CacheValidTimeHandler + config *Config + isRunning int +} + +func (l *CacheValidTime) Destroy() error { + controller.Del(l.id) + return nil +} + +func (l *CacheValidTime) Id() string { + return l.id +} + +func (l *CacheValidTime) Start() error { + if l.isRunning == 0 { + l.isRunning = 1 + actuatorSet.Set(l.id, l.handler) + } + + return nil +} + +func (l *CacheValidTime) Reset(v interface{}, workers map[eosc.RequireId]eosc.IWorker) error { + conf, ok := v.(*Config) + if !ok { + return eosc.ErrorConfigIsNil + } + if conf.Priority > 999 || conf.Priority < 1 { + return fmt.Errorf("priority value %d not allow ", conf.Priority) + } + if conf.ValidTime < 1 { + return fmt.Errorf("validTime value %d not allow ", conf.ValidTime) + } + + confCore := conf + if reflect.DeepEqual(l.config, confCore) { + return nil + } + handler, err := NewCacheValidTimeHandler(confCore) + if err != nil { + return err + } + l.config = confCore + l.handler = handler + if l.isRunning != 0 { + actuatorSet.Set(l.id, l.handler) + } + return nil +} + +func (l *CacheValidTime) Stop() error { + if l.isRunning != 0 { + l.isRunning = 0 + actuatorSet.Del(l.id) + } + + return nil +} + +func (l *CacheValidTime) CheckSkill(skill string) bool { + return false +} diff --git a/drivers/strategy/cache-strategy/config.go b/drivers/strategy/cache-strategy/config.go new file mode 100644 index 00000000..821eb5ef --- /dev/null +++ b/drivers/strategy/cache-strategy/config.go @@ -0,0 +1,12 @@ +package cache_strategy + +import "github.com/eolinker/apinto/strategy" + +type Config struct { + Name string `json:"name" skip:"skip"` + Description string `json:"description" skip:"skip"` + Stop bool `json:"stop" ` + Priority int `json:"priority" label:"优先级" description:"1-999"` + Filters strategy.FilterConfig `json:"filters" label:"过滤规则"` + ValidTime int `json:"valid_time" label:"有效期" description:"有效期"` +} diff --git a/drivers/strategy/limiting-stragety/controller.go b/drivers/strategy/cache-strategy/controller.go similarity index 98% rename from drivers/strategy/limiting-stragety/controller.go rename to drivers/strategy/cache-strategy/controller.go index 80cc8d7a..534c096c 100644 --- a/drivers/strategy/limiting-stragety/controller.go +++ b/drivers/strategy/cache-strategy/controller.go @@ -1,4 +1,4 @@ -package limiting_stragety +package cache_strategy import ( "github.com/eolinker/eosc" diff --git a/drivers/strategy/cache-strategy/driver.go b/drivers/strategy/cache-strategy/driver.go new file mode 100644 index 00000000..7eea119c --- /dev/null +++ b/drivers/strategy/cache-strategy/driver.go @@ -0,0 +1,60 @@ +package cache_strategy + +import ( + "fmt" + "github.com/eolinker/apinto/strategy" + "github.com/eolinker/eosc" + "reflect" +) + +func checkConfig(conf *Config) error { + if conf.Priority > 999 || conf.Priority < 1 { + return fmt.Errorf("priority value %d not allow ", conf.Priority) + } + + if conf.ValidTime < 1 { + return fmt.Errorf("validTime value %d not allow ", conf.ValidTime) + } + + _, err := strategy.ParseFilter(conf.Filters) + if err != nil { + return err + } + + return nil +} + +type driver struct { +} + +func (d *driver) Check(v interface{}, workers map[eosc.RequireId]eosc.IWorker) error { + cfg, ok := v.(*Config) + if !ok { + return eosc.ErrorConfigIsNil + } + + return checkConfig(cfg) +} + +func (d *driver) ConfigType() reflect.Type { + return configType +} + +func (d *driver) Create(id, name string, v interface{}, workers map[eosc.RequireId]eosc.IWorker) (eosc.IWorker, error) { + if err := d.Check(v, workers); err != nil { + return nil, err + } + + lg := &CacheValidTime{ + id: id, + name: name, + } + + err := lg.Reset(v, workers) + if err != nil { + return nil, err + } + + controller.Store(id) + return lg, nil +} diff --git a/drivers/strategy/cache-strategy/factory.go b/drivers/strategy/cache-strategy/factory.go new file mode 100644 index 00000000..ee3e7c6d --- /dev/null +++ b/drivers/strategy/cache-strategy/factory.go @@ -0,0 +1,45 @@ +package cache_strategy + +import ( + "github.com/eolinker/eosc" + "github.com/eolinker/eosc/setting" + "github.com/eolinker/eosc/utils/schema" + "reflect" +) + +const Name = "strategy-cache" + +var ( + configType = reflect.TypeOf((*Config)(nil)) +) + +//Register 注册http路由驱动工厂 +func Register(register eosc.IExtenderDriverRegister) { + + register.RegisterExtenderDriver(Name, newFactory()) + setting.RegisterSetting("strategies-cache", controller) +} + +type factory struct { + render interface{} +} + +func newFactory() *factory { + render, err := schema.Generate(configType, nil) + if err != nil { + panic(err) + } + return &factory{ + render: render, + } +} + +func (f *factory) Render() interface{} { + return f.render +} + +func (f *factory) Create(profession string, name string, label string, desc string, params map[string]interface{}) (eosc.IExtenderDriver, error) { + controller.driver = name + controller.profession = profession + return &driver{}, nil +} diff --git a/drivers/strategy/cache-strategy/handler.go b/drivers/strategy/cache-strategy/handler.go new file mode 100644 index 00000000..8f9e2f4a --- /dev/null +++ b/drivers/strategy/cache-strategy/handler.go @@ -0,0 +1,28 @@ +package cache_strategy + +import ( + "github.com/eolinker/apinto/strategy" +) + +type CacheValidTimeHandler struct { + name string + filter strategy.IFilter + validTime int + priority int + stop bool +} + +func NewCacheValidTimeHandler(conf *Config) (*CacheValidTimeHandler, error) { + filter, err := strategy.ParseFilter(conf.Filters) + if err != nil { + return nil, err + } + + return &CacheValidTimeHandler{ + name: conf.Name, + filter: filter, + validTime: conf.ValidTime, + priority: conf.Priority, + stop: conf.Stop, + }, nil +} diff --git a/drivers/strategy/cache-strategy/http-handler.go b/drivers/strategy/cache-strategy/http-handler.go new file mode 100644 index 00000000..4dbb52d3 --- /dev/null +++ b/drivers/strategy/cache-strategy/http-handler.go @@ -0,0 +1,47 @@ +package cache_strategy + +import ( + "github.com/eolinker/eosc/eocontext" + http_service "github.com/eolinker/eosc/eocontext/http-context" +) + +func init() { + RegisterActuator(newActuator()) +} + +type actuatorHttp struct { +} + +func newActuator() *actuatorHttp { + return &actuatorHttp{} +} + +func (hd *actuatorHttp) Assert(ctx eocontext.EoContext) bool { + _, err := http_service.Assert(ctx) + if err != nil { + return false + } + return true +} + +func (hd *actuatorHttp) Check(ctx eocontext.EoContext, handlers []*CacheValidTimeHandler) error { + //httpCtx, err := http_service.Assert(ctx) + //if err != nil { + // return err + //} + + return nil +} + +type Set map[string]struct{} + +func newSet(l int) Set { + return make(Set, l) +} +func (s Set) Has(key string) bool { + _, has := s[key] + return has +} +func (s Set) Add(key string) { + s[key] = struct{}{} +} diff --git a/drivers/strategy/limiting-stragety/actuator-handle.go b/drivers/strategy/limiting-strategy/actuator-handle.go similarity index 84% rename from drivers/strategy/limiting-stragety/actuator-handle.go rename to drivers/strategy/limiting-strategy/actuator-handle.go index 7daa3e03..c90c88b3 100644 --- a/drivers/strategy/limiting-stragety/actuator-handle.go +++ b/drivers/strategy/limiting-strategy/actuator-handle.go @@ -1,7 +1,7 @@ -package limiting_stragety +package limiting_strategy import ( - "github.com/eolinker/apinto/drivers/strategy/limiting-stragety/scalar" + "github.com/eolinker/apinto/drivers/strategy/limiting-strategy/scalar" "github.com/eolinker/eosc/eocontext" ) diff --git a/drivers/strategy/limiting-stragety/actuator.go b/drivers/strategy/limiting-strategy/actuator.go similarity index 95% rename from drivers/strategy/limiting-stragety/actuator.go rename to drivers/strategy/limiting-strategy/actuator.go index 451c2223..5a7408be 100644 --- a/drivers/strategy/limiting-stragety/actuator.go +++ b/drivers/strategy/limiting-strategy/actuator.go @@ -1,7 +1,7 @@ -package limiting_stragety +package limiting_strategy import ( - "github.com/eolinker/apinto/drivers/strategy/limiting-stragety/scalar" + "github.com/eolinker/apinto/drivers/strategy/limiting-strategy/scalar" "github.com/eolinker/apinto/strategy" "github.com/eolinker/eosc/eocontext" "sort" diff --git a/drivers/strategy/limiting-stragety/config.go b/drivers/strategy/limiting-strategy/config.go similarity index 93% rename from drivers/strategy/limiting-stragety/config.go rename to drivers/strategy/limiting-strategy/config.go index 5e543ed6..87cdc7a5 100644 --- a/drivers/strategy/limiting-stragety/config.go +++ b/drivers/strategy/limiting-strategy/config.go @@ -1,4 +1,4 @@ -package limiting_stragety +package limiting_strategy import "github.com/eolinker/apinto/strategy" @@ -16,7 +16,7 @@ type Rule struct { type Config struct { Name string `json:"name" skip:"skip"` Description string `json:"description" skip:"skip"` - Stop bool `json:"stop" ` + Stop bool `json:"stop"` Priority int `json:"priority" label:"优先级" description:"1-999"` Filters strategy.FilterConfig `json:"filters" label:"过滤规则"` Rule Rule `json:"limiting" label:"限流规则" description:"限流规则"` diff --git a/drivers/strategy/limiting-strategy/controller.go b/drivers/strategy/limiting-strategy/controller.go new file mode 100644 index 00000000..35e79fba --- /dev/null +++ b/drivers/strategy/limiting-strategy/controller.go @@ -0,0 +1,85 @@ +package limiting_strategy + +import ( + "github.com/eolinker/eosc" + "reflect" +) + +var ( + controller = NewController() + _ eosc.ISetting = controller + _ IController = controller +) + +type IController interface { + Store(id string) + Del(id string) +} +type Controller struct { + profession string + driver string + all map[string]struct{} +} + +func (c *Controller) Store(id string) { + c.all[id] = struct{}{} +} + +func (c *Controller) Del(id string) { + delete(c.all, id) +} + +func (c *Controller) ConfigType() reflect.Type { + return configType +} + +func (c *Controller) Set(conf interface{}) (err error) { + return eosc.ErrorUnsupportedKind +} + +func (c *Controller) Get() interface{} { + return nil +} + +func (c *Controller) Mode() eosc.SettingMode { + return eosc.SettingModeBatch +} + +func (c *Controller) Check(cfg interface{}) (profession, name, driver, desc string, err error) { + conf, ok := cfg.(*Config) + if !ok { + err = eosc.ErrorConfigIsNil + return + } + if empty(conf.Name) { + err = eosc.ErrorConfigFieldUnknown + return + } + err = checkConfig(conf) + if err != nil { + return + } + return c.profession, conf.Name, c.driver, conf.Description, nil + +} +func empty(vs ...string) bool { + for _, v := range vs { + if len(v) == 0 { + return true + } + } + return false +} +func (c *Controller) AllWorkers() []string { + ws := make([]string, 0, len(c.all)) + for id := range c.all { + ws = append(ws, id) + } + return ws +} + +func NewController() *Controller { + return &Controller{ + all: map[string]struct{}{}, + } +} diff --git a/drivers/strategy/limiting-stragety/driver.go b/drivers/strategy/limiting-strategy/driver.go similarity index 97% rename from drivers/strategy/limiting-stragety/driver.go rename to drivers/strategy/limiting-strategy/driver.go index ae49175f..9126d6ca 100644 --- a/drivers/strategy/limiting-stragety/driver.go +++ b/drivers/strategy/limiting-strategy/driver.go @@ -1,4 +1,4 @@ -package limiting_stragety +package limiting_strategy import ( "fmt" diff --git a/drivers/strategy/limiting-stragety/factory.go b/drivers/strategy/limiting-strategy/factory.go similarity index 97% rename from drivers/strategy/limiting-stragety/factory.go rename to drivers/strategy/limiting-strategy/factory.go index 3479777c..eb0a0b27 100644 --- a/drivers/strategy/limiting-stragety/factory.go +++ b/drivers/strategy/limiting-strategy/factory.go @@ -1,4 +1,4 @@ -package limiting_stragety +package limiting_strategy import ( "github.com/eolinker/eosc" diff --git a/drivers/strategy/limiting-stragety/handler.go b/drivers/strategy/limiting-strategy/handler.go similarity index 96% rename from drivers/strategy/limiting-stragety/handler.go rename to drivers/strategy/limiting-strategy/handler.go index d77ed0e8..e80be9c9 100644 --- a/drivers/strategy/limiting-stragety/handler.go +++ b/drivers/strategy/limiting-strategy/handler.go @@ -1,4 +1,4 @@ -package limiting_stragety +package limiting_strategy import ( "github.com/eolinker/apinto/metrics" @@ -57,11 +57,12 @@ func NewLimitingHandler(conf *Config) (*LimitingHandler, error) { mts := metrics.Parse(conf.Rule.Metrics) return &LimitingHandler{ + name: conf.Name, filter: filter, metrics: mts, - stop: conf.Stop, query: parseThreshold(conf.Rule.Query), traffic: parseThreshold(conf.Rule.Traffic), priority: conf.Priority, + stop: conf.Stop, }, nil } diff --git a/drivers/strategy/limiting-stragety/http-handler.go b/drivers/strategy/limiting-strategy/http-handler.go similarity index 97% rename from drivers/strategy/limiting-stragety/http-handler.go rename to drivers/strategy/limiting-strategy/http-handler.go index caf02dde..3a9188d9 100644 --- a/drivers/strategy/limiting-stragety/http-handler.go +++ b/drivers/strategy/limiting-strategy/http-handler.go @@ -1,8 +1,8 @@ -package limiting_stragety +package limiting_strategy import ( "errors" - "github.com/eolinker/apinto/drivers/strategy/limiting-stragety/scalar" + "github.com/eolinker/apinto/drivers/strategy/limiting-strategy/scalar" "github.com/eolinker/eosc/eocontext" http_service "github.com/eolinker/eosc/eocontext/http-context" "github.com/eolinker/eosc/log" diff --git a/drivers/strategy/limiting-stragety/limiting.go b/drivers/strategy/limiting-strategy/limiting.go similarity index 97% rename from drivers/strategy/limiting-stragety/limiting.go rename to drivers/strategy/limiting-strategy/limiting.go index b8db9c65..dfb54910 100644 --- a/drivers/strategy/limiting-stragety/limiting.go +++ b/drivers/strategy/limiting-strategy/limiting.go @@ -1,4 +1,4 @@ -package limiting_stragety +package limiting_strategy import ( "fmt" diff --git a/drivers/strategy/limiting-stragety/scalar/scalar.go b/drivers/strategy/limiting-strategy/scalar/scalar.go similarity index 100% rename from drivers/strategy/limiting-stragety/scalar/scalar.go rename to drivers/strategy/limiting-strategy/scalar/scalar.go diff --git a/drivers/strategy/limiting-stragety/scalar/vectors.go b/drivers/strategy/limiting-strategy/scalar/vectors.go similarity index 100% rename from drivers/strategy/limiting-stragety/scalar/vectors.go rename to drivers/strategy/limiting-strategy/scalar/vectors.go From 3dd380ca00c45f6fda24f59f64e7557fd7de804d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=AD=9F=E6=9F=B1?= Date: Tue, 11 Oct 2022 14:07:10 +0800 Subject: [PATCH 02/19] cache --- cache/cache.go | 5 +++++ go.mod | 16 ++++------------ 2 files changed, 9 insertions(+), 12 deletions(-) create mode 100644 cache/cache.go diff --git a/cache/cache.go b/cache/cache.go new file mode 100644 index 00000000..75af7ab9 --- /dev/null +++ b/cache/cache.go @@ -0,0 +1,5 @@ +package cache + +type ICache interface { + Set(key string, value) +} diff --git a/go.mod b/go.mod index 9725acb9..8797e405 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/Shopify/sarama v1.32.0 github.com/eolinker/eosc v0.6.2 github.com/go-basic/uuid v1.0.0 + github.com/go-redis/redis/v8 v8.11.5 github.com/hashicorp/consul/api v1.9.1 github.com/nsqio/go-nsq v1.1.0 github.com/ohler55/ojg v1.12.9 @@ -20,11 +21,12 @@ require ( github.com/andybalholm/brotli v1.0.2 // indirect github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.1.1 // indirect + github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dustin/go-humanize v1.0.0 // indirect github.com/eapache/go-resiliency v1.2.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect @@ -32,12 +34,10 @@ require ( github.com/fatih/color v1.9.0 // indirect github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect github.com/ghodss/yaml v1.0.0 // indirect - github.com/go-ole/go-ole v1.2.6 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/btree v1.0.1 // indirect - github.com/google/gops v0.3.25 // indirect github.com/google/uuid v1.1.2 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect @@ -56,10 +56,8 @@ require ( github.com/jonboulle/clockwork v0.2.2 // indirect github.com/json-iterator/go v1.1.11 // indirect github.com/julienschmidt/httprouter v1.3.0 // indirect - github.com/keybase/go-ps v0.0.0-20190827175125-91aafc93ba19 // indirect github.com/klauspost/compress v1.14.4 // indirect github.com/kr/fs v0.1.0 // indirect - github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/mattn/go-colorable v0.1.6 // indirect github.com/mattn/go-isatty v0.0.12 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect @@ -68,23 +66,18 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.1 // indirect github.com/pierrec/lz4 v2.6.1+incompatible // indirect - github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/prometheus/client_golang v1.11.1 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.26.0 // indirect github.com/prometheus/procfs v0.6.0 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/russross/blackfriday/v2 v2.0.1 // indirect - github.com/shirou/gopsutil/v3 v3.22.4 // indirect github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect github.com/soheilhy/cmux v0.1.5 // indirect - github.com/tklauser/go-sysconf v0.3.10 // indirect - github.com/tklauser/numcpus v0.4.0 // indirect + github.com/stretchr/testify v1.7.1 // indirect github.com/urfave/cli/v2 v2.3.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect - github.com/xlab/treeprint v1.1.0 // indirect - github.com/yusufpapurcu/wmi v1.2.2 // indirect go.etcd.io/bbolt v1.3.6 // indirect go.etcd.io/etcd/api/v3 v3.5.4 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect @@ -111,7 +104,6 @@ require ( gopkg.in/sourcemap.v1 v1.0.5 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect - rsc.io/goversion v1.2.0 // indirect ) replace github.com/eolinker/eosc => ../eosc From e60684dc4c80402e76897dd46fc818ae173f6fe1 Mon Sep 17 00:00:00 2001 From: zhangzeyi Date: Tue, 11 Oct 2022 14:36:48 +0800 Subject: [PATCH 03/19] =?UTF-8?q?cacheControl=E6=96=B0=E5=A2=9EisCache?= =?UTF-8?q?=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- drivers/strategy/cache-strategy/actuator.go | 33 ++++++++++++++++----- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/drivers/strategy/cache-strategy/actuator.go b/drivers/strategy/cache-strategy/actuator.go index 07f5e401..62db2997 100644 --- a/drivers/strategy/cache-strategy/actuator.go +++ b/drivers/strategy/cache-strategy/actuator.go @@ -79,9 +79,8 @@ func (a *tActuator) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) err uri := httpCtx.Request().URI().RequestURI() isCache := false - controlMap := parseCacheControl(httpCtx) - if !controlMap.NoCache() && controlMap.IsPublic() && httpCtx.Request().Method() == http.MethodGet { - isCache = true + if httpCtx.Request().Method() == http.MethodGet { + isCache = parseCacheControl(httpCtx).IsCache() } if isCache { @@ -95,11 +94,6 @@ func (a *tActuator) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) err } if handler.filter.Check(ctx) { - maxAge := controlMap.MaxAge() - if maxAge > 0 { //todo 已经设置过缓存时间 - - } - localCache := cache.GetCache(uri) if localCache != nil { httpCtx.Response().SetBody(localCache.Body) @@ -177,7 +171,30 @@ func (c cacheControlMap) MaxAge() int { return 0 } +func (c cacheControlMap) IsCache() bool { + if c.MaxAge() == 0 { + return false + } + if c.NoCache() { + return false + } + if !c.IsPublic() { + return false + } + if _, ok := c["no-store"]; ok { + return false + } + return true +} + func (c cacheControlMap) IsPublic() bool { + if _, ok := c["Authorization"]; ok { + if _, pOk := c["public"]; pOk { + return true + } else { + return false + } + } //只要不是私有的 都算公有 if _, ok := c["private"]; ok { return false From aa29d00096604baba029e74462839b4f5995f34f Mon Sep 17 00:00:00 2001 From: zhangzeyi Date: Tue, 11 Oct 2022 15:09:26 +0800 Subject: [PATCH 04/19] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E7=BC=93=E5=AD=98freeC?= =?UTF-8?q?ache?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- drivers/strategy/cache-strategy/actuator.go | 3 ++- .../strategy/cache-strategy/cache/cache.go | 27 ++++++++++++++----- go.mod | 15 +++-------- 3 files changed, 26 insertions(+), 19 deletions(-) diff --git a/drivers/strategy/cache-strategy/actuator.go b/drivers/strategy/cache-strategy/actuator.go index 62db2997..e256363d 100644 --- a/drivers/strategy/cache-strategy/actuator.go +++ b/drivers/strategy/cache-strategy/actuator.go @@ -20,6 +20,7 @@ func init() { actuator := newtActuator() actuatorSet = actuator strategy.AddStrategyHandler(actuator) + cache.NewCache() } type ActuatorSet interface { @@ -120,7 +121,7 @@ func (a *tActuator) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) err } } - localCache = &cache.Cache{ + localCache = &cache.ResponseData{ Header: header, Body: httpCtx.Response().GetBody(), } diff --git a/drivers/strategy/cache-strategy/cache/cache.go b/drivers/strategy/cache-strategy/cache/cache.go index 534f9e59..f2e1f4f8 100644 --- a/drivers/strategy/cache-strategy/cache/cache.go +++ b/drivers/strategy/cache-strategy/cache/cache.go @@ -1,16 +1,31 @@ package cache -//todo 本地缓存demo +import ( + "encoding/json" + "github.com/coocood/freecache" +) -type Cache struct { +var freeCache *freecache.Cache + +func NewCache() { + freeCache = freecache.NewCache(0) +} + +type ResponseData struct { Header map[string]string Body []byte } -func SetCache(uri string, cache *Cache, validTime int) { - +func SetCache(uri string, data *ResponseData, validTime int) { + bytes, _ := json.Marshal(data) + _ = freeCache.Set([]byte(uri), bytes, validTime) } -func GetCache(uri string) *Cache { - return nil +func GetCache(uri string) *ResponseData { + bytes, _ := freeCache.Get([]byte(uri)) + data := new(ResponseData) + if err := json.Unmarshal(bytes, data); err != nil { + return nil + } + return data } diff --git a/go.mod b/go.mod index 9725acb9..245a3a7a 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.17 require ( github.com/Shopify/sarama v1.32.0 + github.com/coocood/freecache v1.2.2 github.com/eolinker/eosc v0.6.2 github.com/go-basic/uuid v1.0.0 github.com/hashicorp/consul/api v1.9.1 @@ -20,7 +21,7 @@ require ( github.com/andybalholm/brotli v1.0.2 // indirect github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.1.1 // indirect + github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect @@ -32,12 +33,10 @@ require ( github.com/fatih/color v1.9.0 // indirect github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect github.com/ghodss/yaml v1.0.0 // indirect - github.com/go-ole/go-ole v1.2.6 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/btree v1.0.1 // indirect - github.com/google/gops v0.3.25 // indirect github.com/google/uuid v1.1.2 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect @@ -56,10 +55,8 @@ require ( github.com/jonboulle/clockwork v0.2.2 // indirect github.com/json-iterator/go v1.1.11 // indirect github.com/julienschmidt/httprouter v1.3.0 // indirect - github.com/keybase/go-ps v0.0.0-20190827175125-91aafc93ba19 // indirect github.com/klauspost/compress v1.14.4 // indirect github.com/kr/fs v0.1.0 // indirect - github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/mattn/go-colorable v0.1.6 // indirect github.com/mattn/go-isatty v0.0.12 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect @@ -68,23 +65,18 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.1 // indirect github.com/pierrec/lz4 v2.6.1+incompatible // indirect - github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/prometheus/client_golang v1.11.1 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.26.0 // indirect github.com/prometheus/procfs v0.6.0 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/russross/blackfriday/v2 v2.0.1 // indirect - github.com/shirou/gopsutil/v3 v3.22.4 // indirect github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect github.com/soheilhy/cmux v0.1.5 // indirect - github.com/tklauser/go-sysconf v0.3.10 // indirect - github.com/tklauser/numcpus v0.4.0 // indirect + github.com/stretchr/testify v1.7.1 // indirect github.com/urfave/cli/v2 v2.3.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect - github.com/xlab/treeprint v1.1.0 // indirect - github.com/yusufpapurcu/wmi v1.2.2 // indirect go.etcd.io/bbolt v1.3.6 // indirect go.etcd.io/etcd/api/v3 v3.5.4 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect @@ -111,7 +103,6 @@ require ( gopkg.in/sourcemap.v1 v1.0.5 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect - rsc.io/goversion v1.2.0 // indirect ) replace github.com/eolinker/eosc => ../eosc From ab17080fd42fa149db034135e7afc88d62fdb31b Mon Sep 17 00:00:00 2001 From: zhangzeyi Date: Tue, 11 Oct 2022 16:09:06 +0800 Subject: [PATCH 05/19] =?UTF-8?q?cache-control=E6=94=B9=E4=B8=BA=E4=BB=8Er?= =?UTF-8?q?esponse=E4=B8=AD=E8=AF=BB=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- drivers/strategy/cache-strategy/actuator.go | 42 +++++++++---------- .../strategy/cache-strategy/cache/cache.go | 4 +- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/drivers/strategy/cache-strategy/actuator.go b/drivers/strategy/cache-strategy/actuator.go index e256363d..808cbb5b 100644 --- a/drivers/strategy/cache-strategy/actuator.go +++ b/drivers/strategy/cache-strategy/actuator.go @@ -77,14 +77,7 @@ func (a *tActuator) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) err return err } - uri := httpCtx.Request().URI().RequestURI() - - isCache := false if httpCtx.Request().Method() == http.MethodGet { - isCache = parseCacheControl(httpCtx).IsCache() - } - - if isCache { a.lock.RLock() handlers := a.handlers a.lock.RUnlock() @@ -95,10 +88,12 @@ func (a *tActuator) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) err } if handler.filter.Check(ctx) { - localCache := cache.GetCache(uri) - if localCache != nil { - httpCtx.Response().SetBody(localCache.Body) - for key, val := range localCache.Header { + uri := httpCtx.Request().URI().RequestURI() + responseData := cache.GetResponseData(uri) + + if responseData != nil { + httpCtx.Response().SetBody(responseData.Body) + for key, val := range responseData.Header { httpCtx.Response().SetHeader(key, val) } return nil @@ -109,23 +104,28 @@ func (a *tActuator) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) err return err } } + httpCtx, err = http_service.Assert(ctx) if err != nil { return err } - header := make(map[string]string) - for key, values := range httpCtx.Response().Headers() { - if len(values) > 0 { - header[key] = values[0] + //从cache-control中判断是否需要缓存 + if parseCacheControl(httpCtx).IsCache() { + header := make(map[string]string) + for key, values := range httpCtx.Response().Headers() { + if len(values) > 0 { + header[key] = values[0] + } } - } - localCache = &cache.ResponseData{ - Header: header, - Body: httpCtx.Response().GetBody(), + responseData = &cache.ResponseData{ + Header: header, + Body: httpCtx.Response().GetBody(), + } + cache.SetResponseData(uri, responseData, handler.validTime) } - cache.SetCache(uri, localCache, handler.validTime) + return nil } @@ -206,7 +206,7 @@ func (c cacheControlMap) IsPublic() bool { func parseCacheControl(httpCtx http_service.IHttpContext) cacheControlMap { cc := cacheControlMap{} - header := httpCtx.Request().Header().GetHeader("Cache-Control") + header := httpCtx.Response().GetHeader("Cache-Control") for _, part := range strings.Split(header, ",") { part = strings.Trim(part, " ") if part == "" { diff --git a/drivers/strategy/cache-strategy/cache/cache.go b/drivers/strategy/cache-strategy/cache/cache.go index f2e1f4f8..1245dcb3 100644 --- a/drivers/strategy/cache-strategy/cache/cache.go +++ b/drivers/strategy/cache-strategy/cache/cache.go @@ -16,12 +16,12 @@ type ResponseData struct { Body []byte } -func SetCache(uri string, data *ResponseData, validTime int) { +func SetResponseData(uri string, data *ResponseData, validTime int) { bytes, _ := json.Marshal(data) _ = freeCache.Set([]byte(uri), bytes, validTime) } -func GetCache(uri string) *ResponseData { +func GetResponseData(uri string) *ResponseData { bytes, _ := freeCache.Get([]byte(uri)) data := new(ResponseData) if err := json.Unmarshal(bytes, data); err != nil { From 0246181df6da857a30bb7730ab34f51ccffc4e66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=AD=9F=E6=9F=B1?= Date: Tue, 11 Oct 2022 17:28:45 +0800 Subject: [PATCH 06/19] redis --- cache/cache.go | 5 --- drivers/resources/redis/config.go | 4 ++ drivers/resources/redis/controller.go | 52 +++++++++++++++++++++++++ drivers/resources/redis/factory.go | 21 ++++++++++ drivers/resources/redis/redis.go | 51 ++++++++++++++++++++++++ resources/cache-no.go | 39 +++++++++++++++++++ resources/cache.go | 56 +++++++++++++++++++++++++++ resources/cache_test.go | 23 +++++++++++ 8 files changed, 246 insertions(+), 5 deletions(-) delete mode 100644 cache/cache.go create mode 100644 drivers/resources/redis/config.go create mode 100644 drivers/resources/redis/controller.go create mode 100644 drivers/resources/redis/factory.go create mode 100644 drivers/resources/redis/redis.go create mode 100644 resources/cache-no.go create mode 100644 resources/cache.go create mode 100644 resources/cache_test.go diff --git a/cache/cache.go b/cache/cache.go deleted file mode 100644 index 75af7ab9..00000000 --- a/cache/cache.go +++ /dev/null @@ -1,5 +0,0 @@ -package cache - -type ICache interface { - Set(key string, value) -} diff --git a/drivers/resources/redis/config.go b/drivers/resources/redis/config.go new file mode 100644 index 00000000..0e4882b0 --- /dev/null +++ b/drivers/resources/redis/config.go @@ -0,0 +1,4 @@ +package redis + +type Config struct { +} diff --git a/drivers/resources/redis/controller.go b/drivers/resources/redis/controller.go new file mode 100644 index 00000000..c3095364 --- /dev/null +++ b/drivers/resources/redis/controller.go @@ -0,0 +1,52 @@ +package redis + +import ( + "github.com/eolinker/eosc" + "github.com/eolinker/eosc/env" + "github.com/go-redis/redis/v8" + "reflect" +) + +type Controller struct { + current *_Cacher + config Config +} + +func (m *Controller) ConfigType() reflect.Type { + return configType +} + +func (m *Controller) Set(conf interface{}) (err error) { + config, ok := conf.(*Config) + if ok && config != nil { + old := m.config + m.config = *config + + if env.Process() == eosc.ProcessWorker { + // todo open or close redis + } + redis.NewClusterClient().Close() + } + return nil +} + +func (m *Controller) Get() interface{} { + return m.config +} + +func (m *Controller) Mode() eosc.SettingMode { + return eosc.SettingModeSingleton +} + +func (m *Controller) Check(cfg interface{}) (profession, name, driver, desc string, err error) { + err = eosc.ErrorUnsupportedKind + return +} + +func (m *Controller) AllWorkers() []string { + return []string{"redis@setting"} +} + +func NewController() *Controller { + return &Controller{} +} diff --git a/drivers/resources/redis/factory.go b/drivers/resources/redis/factory.go new file mode 100644 index 00000000..9362711b --- /dev/null +++ b/drivers/resources/redis/factory.go @@ -0,0 +1,21 @@ +package redis + +import ( + "github.com/eolinker/eosc" + "github.com/eolinker/eosc/setting" + "reflect" +) + +var ( + singleton *Controller + _ eosc.ISetting = singleton + configType = reflect.TypeOf(new(Config)) +) + +func init() { + singleton = NewController() +} + +func Register(register eosc.IExtenderDriverRegister) { + setting.RegisterSetting("redis", singleton) +} diff --git a/drivers/resources/redis/redis.go b/drivers/resources/redis/redis.go new file mode 100644 index 00000000..afea93cf --- /dev/null +++ b/drivers/resources/redis/redis.go @@ -0,0 +1,51 @@ +package redis + +import ( + "context" + "github.com/go-redis/redis/v8" + "time" +) + +type _Cacher struct { + client *redis.ClusterClient +} + +func (r *_Cacher) Set(ctx context.Context, key string, value []byte, expiration time.Duration) error { + + return r.client.Set(ctx, key, value, expiration).Err() +} + +func (r *_Cacher) SetNX(ctx context.Context, key string, value []byte, expiration time.Duration) (bool, error) { + + return r.client.SetNX(ctx, key, value, expiration).Result() +} + +func (r *_Cacher) DecrBy(ctx context.Context, key string, decrement int64) (int64, error) { + + return r.client.DecrBy(ctx, key, decrement).Result() +} + +func (r *_Cacher) IncrBy(ctx context.Context, key string, decrement int64) (int64, error) { + return r.client.IncrBy(ctx, key, decrement).Result() +} + +func (r *_Cacher) Get(ctx context.Context, key string) ([]byte, error) { + return r.client.Get(ctx, key).Bytes() + +} + +func (r *_Cacher) GetDel(ctx context.Context, key string) ([]byte, error) { + return r.client.GetDel(ctx, key).Bytes() + +} + +func (r *_Cacher) Del(ctx context.Context, keys ...string) (int64, error) { + return r.client.Del(ctx, keys...).Result() +} + +func newCacher(client *redis.ClusterClient) *_Cacher { + if client == nil { + return nil + } + return &_Cacher{client: client} +} diff --git a/resources/cache-no.go b/resources/cache-no.go new file mode 100644 index 00000000..ba9adbc6 --- /dev/null +++ b/resources/cache-no.go @@ -0,0 +1,39 @@ +package resources + +import ( + "context" + "time" +) + +type NoCache struct { +} + +func (n *NoCache) Set(ctx context.Context, key string, value []byte, expiration time.Duration) error { + return ErrorNoCache +} + +func (n *NoCache) SetNX(ctx context.Context, key string, value []byte, expiration time.Duration) (bool, error) { + return false, ErrorNoCache +} + +func (n *NoCache) DecrBy(ctx context.Context, key string, decrement int64) (int64, error) { + return 0, ErrorNoCache +} + +func (n *NoCache) IncrBy(ctx context.Context, key string, decrement int64) (int64, error) { + return 0, ErrorNoCache + +} + +func (n *NoCache) Get(ctx context.Context, key string) ([]byte, error) { + return nil, ErrorNoCache + +} + +func (n *NoCache) GetDel(ctx context.Context, key string) ([]byte, error) { + return nil, ErrorNoCache +} + +func (n *NoCache) Del(ctx context.Context, keys ...string) (int64, error) { + return 0, ErrorNoCache +} diff --git a/resources/cache.go b/resources/cache.go new file mode 100644 index 00000000..e5ac0680 --- /dev/null +++ b/resources/cache.go @@ -0,0 +1,56 @@ +package resources + +import ( + "context" + "errors" + "time" +) + +var ( + ErrorNoCache = errors.New("no cache") + _ ICaches = (*_Proxy)(nil) +) +var ( + singCacheProxy *_Proxy +) + +func init() { + singCacheProxy = newProxy(new(NoCache)) +} +func Replace(caches ...ICache) { + if len(caches) < 1 || caches[0] == nil { + singCacheProxy.ICache, singCacheProxy.had = new(NoCache), false + return + } + singCacheProxy.ICache, singCacheProxy.had = caches[0], false +} + +func Cacher() ICaches { + return singCacheProxy +} + +type ICache interface { + Set(ctx context.Context, key string, value []byte, expiration time.Duration) error + SetNX(ctx context.Context, key string, value []byte, expiration time.Duration) (bool, error) + DecrBy(ctx context.Context, key string, decrement int64) (int64, error) + IncrBy(ctx context.Context, key string, decrement int64) (int64, error) + Get(ctx context.Context, key string) ([]byte, error) + GetDel(ctx context.Context, key string) ([]byte, error) + Del(ctx context.Context, keys ...string) (int64, error) +} +type ICaches interface { + ICache + HasCache() bool +} +type _Proxy struct { + had bool + ICache +} + +func (p *_Proxy) HasCache() bool { + return p.had +} + +func newProxy(target ICache) *_Proxy { + return &_Proxy{ICache: target} +} diff --git a/resources/cache_test.go b/resources/cache_test.go new file mode 100644 index 00000000..b0eda2ba --- /dev/null +++ b/resources/cache_test.go @@ -0,0 +1,23 @@ +package resources + +import "testing" + +func TestReplace(t *testing.T) { + type args struct { + caches []ICache + } + tests := []struct { + name string + args args + }{ + { + name: "nil", + args: args{nil}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + Replace(tt.args.caches...) + }) + } +} From 8cd6a0550f7e79f12847add161e4c59b85d8b7af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=AD=9F=E6=9F=B1?= Date: Wed, 12 Oct 2022 10:43:42 +0800 Subject: [PATCH 07/19] =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- drivers/strategy/cache-strategy/actuator.go | 110 +++++++++--------- .../strategy/cache-strategy/cache/cache.go | 14 +++ 2 files changed, 72 insertions(+), 52 deletions(-) diff --git a/drivers/strategy/cache-strategy/actuator.go b/drivers/strategy/cache-strategy/actuator.go index 808cbb5b..d8411664 100644 --- a/drivers/strategy/cache-strategy/actuator.go +++ b/drivers/strategy/cache-strategy/actuator.go @@ -77,60 +77,28 @@ func (a *tActuator) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) err return err } - if httpCtx.Request().Method() == http.MethodGet { - a.lock.RLock() - handlers := a.handlers - a.lock.RUnlock() - - for _, handler := range handlers { - if handler.stop { - continue - } - if handler.filter.Check(ctx) { - - uri := httpCtx.Request().URI().RequestURI() - responseData := cache.GetResponseData(uri) - - if responseData != nil { - httpCtx.Response().SetBody(responseData.Body) - for key, val := range responseData.Header { - httpCtx.Response().SetHeader(key, val) - } - return nil - } else { - //拿不到说明已经过期了 - if next != nil { - if err = next.DoChain(ctx); err != nil { - return err - } - } - - httpCtx, err = http_service.Assert(ctx) - if err != nil { - return err - } - - //从cache-control中判断是否需要缓存 - if parseCacheControl(httpCtx).IsCache() { - header := make(map[string]string) - for key, values := range httpCtx.Response().Headers() { - if len(values) > 0 { - header[key] = values[0] - } - } - - responseData = &cache.ResponseData{ - Header: header, - Body: httpCtx.Response().GetBody(), - } - cache.SetResponseData(uri, responseData, handler.validTime) - } - - return nil - } + if httpCtx.Request().Method() != http.MethodGet { + return next.DoChain(ctx) + } + a.lock.RLock() + handlers := a.handlers + a.lock.RUnlock() - } + for _, handler := range handlers { + if handler.stop { + continue + } + if handler.filter.Check(httpCtx) { + + uri := httpCtx.Request().URI().RequestURI() + responseData := cache.GetResponseData(uri) + if responseData != nil { + httpCtx.SetCompleteHandler(responseData) + } else { + httpCtx.SetCompleteHandler(NewCacheGetCompleteHandler(httpCtx.GetComplete(), handler.validTime)) + } + break } } @@ -140,6 +108,44 @@ func (a *tActuator) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) err return nil } +type CacheGetCompleteHandler struct { + orgHandler eocontext.CompleteHandler + validTime int +} + +func NewCacheGetCompleteHandler(orgHandler eocontext.CompleteHandler, validTime int) *CacheGetCompleteHandler { + return &CacheGetCompleteHandler{orgHandler: orgHandler, validTime: validTime} +} + +func (c *CacheGetCompleteHandler) Complete(ctx eocontext.EoContext) error { + + err := c.orgHandler.Complete(ctx) + if err != nil { + return err + } + httpCtx, err2 := http_service.Assert(ctx) + if err2 != nil { + return nil + } + + //从cache-control中判断是否需要缓存 + if parseCacheControl(httpCtx).IsCache() { + header := make(map[string]string) + for key, values := range httpCtx.Response().Headers() { + if len(values) > 0 { + header[key] = values[0] + } + } + + responseData := &cache.ResponseData{ + Header: header, + Body: httpCtx.Response().GetBody(), + } + cache.SetResponseData(httpCtx.Request().URI().RequestURI(), responseData, c.validTime) + } + return nil +} + type handlerListSort []*CacheValidTimeHandler func (hs handlerListSort) Len() int { diff --git a/drivers/strategy/cache-strategy/cache/cache.go b/drivers/strategy/cache-strategy/cache/cache.go index 1245dcb3..e336404b 100644 --- a/drivers/strategy/cache-strategy/cache/cache.go +++ b/drivers/strategy/cache-strategy/cache/cache.go @@ -3,6 +3,8 @@ package cache import ( "encoding/json" "github.com/coocood/freecache" + "github.com/eolinker/eosc/eocontext" + http_service "github.com/eolinker/eosc/eocontext/http-context" ) var freeCache *freecache.Cache @@ -16,6 +18,18 @@ type ResponseData struct { Body []byte } +func (r *ResponseData) Complete(ctx eocontext.EoContext) error { + httpCtx, err := http_service.Assert(ctx) + if err != nil { + return err + } + httpCtx.Response().SetBody(r.Body) + for key, val := range r.Header { + httpCtx.Response().SetHeader(key, val) + } + return nil +} + func SetResponseData(uri string, data *ResponseData, validTime int) { bytes, _ := json.Marshal(data) _ = freeCache.Set([]byte(uri), bytes, validTime) From 6dcc992cebf765563b56f2f5834898ed2ce78722 Mon Sep 17 00:00:00 2001 From: zhangzeyi Date: Wed, 12 Oct 2022 14:24:37 +0800 Subject: [PATCH 08/19] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- drivers/strategy/cache-strategy/actuator.go | 106 ++++++++++-------- .../strategy/cache-strategy/cache/cache.go | 21 +++- drivers/strategy/cache-strategy/config.go | 2 +- 3 files changed, 80 insertions(+), 49 deletions(-) diff --git a/drivers/strategy/cache-strategy/actuator.go b/drivers/strategy/cache-strategy/actuator.go index d8411664..e8af4c5f 100644 --- a/drivers/strategy/cache-strategy/actuator.go +++ b/drivers/strategy/cache-strategy/actuator.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" "sync" + "time" ) var ( @@ -78,8 +79,12 @@ func (a *tActuator) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) err } if httpCtx.Request().Method() != http.MethodGet { - return next.DoChain(ctx) + if next != nil { + return next.DoChain(ctx) + } + return nil } + a.lock.RLock() handlers := a.handlers a.lock.RUnlock() @@ -96,7 +101,7 @@ func (a *tActuator) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) err if responseData != nil { httpCtx.SetCompleteHandler(responseData) } else { - httpCtx.SetCompleteHandler(NewCacheGetCompleteHandler(httpCtx.GetComplete(), handler.validTime)) + httpCtx.SetCompleteHandler(NewCacheGetCompleteHandler(httpCtx.GetComplete(), handler.validTime, uri)) } break } @@ -111,35 +116,37 @@ func (a *tActuator) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) err type CacheGetCompleteHandler struct { orgHandler eocontext.CompleteHandler validTime int + uri string } -func NewCacheGetCompleteHandler(orgHandler eocontext.CompleteHandler, validTime int) *CacheGetCompleteHandler { - return &CacheGetCompleteHandler{orgHandler: orgHandler, validTime: validTime} +func NewCacheGetCompleteHandler(orgHandler eocontext.CompleteHandler, validTime int, uri string) *CacheGetCompleteHandler { + return &CacheGetCompleteHandler{ + orgHandler: orgHandler, + validTime: validTime, + uri: uri, + } } func (c *CacheGetCompleteHandler) Complete(ctx eocontext.EoContext) error { - err := c.orgHandler.Complete(ctx) - if err != nil { - return err + if c.orgHandler != nil { + if err := c.orgHandler.Complete(ctx); err != nil { + return err + } } - httpCtx, err2 := http_service.Assert(ctx) - if err2 != nil { + + httpCtx, err := http_service.Assert(ctx) + if err != nil { return nil } //从cache-control中判断是否需要缓存 - if parseCacheControl(httpCtx).IsCache() { - header := make(map[string]string) - for key, values := range httpCtx.Response().Headers() { - if len(values) > 0 { - header[key] = values[0] - } - } - + if parseHttpContext(httpCtx).IsCache() { responseData := &cache.ResponseData{ - Header: header, - Body: httpCtx.Response().GetBody(), + Header: httpCtx.Response().Headers(), + Body: httpCtx.Response().GetBody(), + ValidTime: c.validTime, + Now: time.Now(), } cache.SetResponseData(httpCtx.Request().URI().RequestURI(), responseData, c.validTime) } @@ -161,69 +168,78 @@ func (hs handlerListSort) Swap(i, j int) { hs[i], hs[j] = hs[j], hs[i] } -type cacheControlMap map[string]string +type httpContext struct { + cacheControl map[string]string + reqHeader http.Header + resHeader http.Header +} -func (c cacheControlMap) NoCache() bool { - if _, ok := c["no-cache"]; ok { +func (c httpContext) NoCache() bool { + if _, ok := c.cacheControl["no-cache"]; ok { return true } return false } -func (c cacheControlMap) MaxAge() int { - if maxAgeStr, ok := c["max-age"]; ok { +func (c httpContext) IsCache() bool { + if maxAgeStr, ok := c.cacheControl["max-age"]; ok { maxAge, _ := strconv.Atoi(maxAgeStr) - return maxAge + if maxAge == 0 { + return false + } } - return 0 -} -func (c cacheControlMap) IsCache() bool { - if c.MaxAge() == 0 { - return false - } if c.NoCache() { return false } + if !c.IsPublic() { return false } - if _, ok := c["no-store"]; ok { + + if _, ok := c.cacheControl["no-store"]; ok { return false } + return true } -func (c cacheControlMap) IsPublic() bool { - if _, ok := c["Authorization"]; ok { - if _, pOk := c["public"]; pOk { +func (c httpContext) IsPublic() bool { + if _, ok := c.reqHeader["Authorization"]; ok { + if _, pOk := c.cacheControl["public"]; pOk { return true - } else { - return false } + return false } + //只要不是私有的 都算公有 - if _, ok := c["private"]; ok { + if _, ok := c.cacheControl["private"]; ok { return false } return true } -func parseCacheControl(httpCtx http_service.IHttpContext) cacheControlMap { - cc := cacheControlMap{} +func parseHttpContext(httpCtx http_service.IHttpContext) httpContext { + hc := httpContext{ + cacheControl: make(map[string]string), + } + + hc.resHeader = httpCtx.Response().Headers() + hc.reqHeader = httpCtx.Request().Header().Headers() - header := httpCtx.Response().GetHeader("Cache-Control") - for _, part := range strings.Split(header, ",") { + cacheControlHeader := httpCtx.Response().GetHeader("Cache-Control") + for _, part := range strings.Split(cacheControlHeader, ",") { part = strings.Trim(part, " ") if part == "" { continue } if strings.ContainsRune(part, '=') { keyVal := strings.Split(part, "=") - cc[strings.Trim(keyVal[0], " ")] = strings.Trim(keyVal[1], ",") + hc.cacheControl[strings.Trim(keyVal[0], " ")] = strings.Trim(keyVal[1], ",") + } else { - cc[part] = "" + hc.cacheControl[part] = "" } } - return cc + return hc } diff --git a/drivers/strategy/cache-strategy/cache/cache.go b/drivers/strategy/cache-strategy/cache/cache.go index e336404b..aae36073 100644 --- a/drivers/strategy/cache-strategy/cache/cache.go +++ b/drivers/strategy/cache-strategy/cache/cache.go @@ -2,9 +2,12 @@ package cache import ( "encoding/json" + "fmt" "github.com/coocood/freecache" "github.com/eolinker/eosc/eocontext" http_service "github.com/eolinker/eosc/eocontext/http-context" + "net/http" + "time" ) var freeCache *freecache.Cache @@ -14,8 +17,10 @@ func NewCache() { } type ResponseData struct { - Header map[string]string - Body []byte + Header http.Header + Body []byte + ValidTime int + Now time.Time // 缓存存放的时间 } func (r *ResponseData) Complete(ctx eocontext.EoContext) error { @@ -25,8 +30,18 @@ func (r *ResponseData) Complete(ctx eocontext.EoContext) error { } httpCtx.Response().SetBody(r.Body) for key, val := range r.Header { - httpCtx.Response().SetHeader(key, val) + if len(val) > 0 { + httpCtx.Response().SetHeader(key, val[0]) + } } + httpCtx.Response().SetHeader("Date", time.Now().Format(time.RFC822)) + + //计算Age Age 的值通常接近于 0。表示此对象刚刚从原始服务器获取不久;其他的值则是表示代理服务器当前的系统时间与此应答中的通用头 Date 的值之差 + age := int(time.Now().Sub(r.Now).Seconds()) + + httpCtx.Response().Headers().Set("Age", fmt.Sprintf("%d", age)) + httpCtx.Response().Headers().Set("Cache-Control", fmt.Sprintf("%s=%d", "max-age", r.ValidTime)) + return nil } diff --git a/drivers/strategy/cache-strategy/config.go b/drivers/strategy/cache-strategy/config.go index 821eb5ef..0e8dd083 100644 --- a/drivers/strategy/cache-strategy/config.go +++ b/drivers/strategy/cache-strategy/config.go @@ -5,7 +5,7 @@ import "github.com/eolinker/apinto/strategy" type Config struct { Name string `json:"name" skip:"skip"` Description string `json:"description" skip:"skip"` - Stop bool `json:"stop" ` + Stop bool `json:"stop"` Priority int `json:"priority" label:"优先级" description:"1-999"` Filters strategy.FilterConfig `json:"filters" label:"过滤规则"` ValidTime int `json:"valid_time" label:"有效期" description:"有效期"` From c7415928e2e2323bfe55f29367c2ef5e291d16dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=AD=9F=E6=9F=B1?= Date: Wed, 12 Oct 2022 15:07:01 +0800 Subject: [PATCH 09/19] =?UTF-8?q?=E7=BC=93=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- drivers/resources/redis/config.go | 3 ++ drivers/resources/redis/controller.go | 55 +++++++++++++++++++++++++-- 2 files changed, 55 insertions(+), 3 deletions(-) diff --git a/drivers/resources/redis/config.go b/drivers/resources/redis/config.go index 0e4882b0..f8618b52 100644 --- a/drivers/resources/redis/config.go +++ b/drivers/resources/redis/config.go @@ -1,4 +1,7 @@ package redis type Config struct { + Addrs []string `json:"addrs" label:"redis 节点列表"` + Username string `json:"username"` + Password string `json:"password"` } diff --git a/drivers/resources/redis/controller.go b/drivers/resources/redis/controller.go index c3095364..584f44b1 100644 --- a/drivers/resources/redis/controller.go +++ b/drivers/resources/redis/controller.go @@ -1,8 +1,11 @@ package redis import ( + "context" + "github.com/eolinker/apinto/resources" "github.com/eolinker/eosc" "github.com/eolinker/eosc/env" + "github.com/eolinker/eosc/log" "github.com/go-redis/redis/v8" "reflect" ) @@ -15,17 +18,63 @@ type Controller struct { func (m *Controller) ConfigType() reflect.Type { return configType } - +func (m *Controller) shutdown() { + oldClient := m.current + if oldClient != nil { + m.current = nil + resources.Replace() + oldClient.client.Close() + } +} func (m *Controller) Set(conf interface{}) (err error) { config, ok := conf.(*Config) if ok && config != nil { old := m.config m.config = *config + if reflect.DeepEqual(old, m.config) { + return nil + } + + if len(m.config.Addrs) == 0 { + oldClient := m.current + if oldClient != nil { + resources.Replace() + m.current = nil + oldClient.client.Close() + } + return nil + } + + client := redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: m.config.Addrs, + Username: m.config.Username, + Password: m.config.Password, + }) + + if res, errPing := client.Ping(context.Background()).Result(); errPing != nil { + log.Info("ping redis:", res, " error:", err) + client.Close() + return errPing + } + if env.Process() == eosc.ProcessWorker { - // todo open or close redis + if m.current == nil { + m.current = newCacher(client) + resources.Replace(m.current) + } else { + m.current.client = client + } + } else { + client.Close() + } + } else { + oldClient := m.current + if oldClient != nil { + resources.Replace() + m.current = nil + oldClient.client.Close() } - redis.NewClusterClient().Close() } return nil } From 5eb3a1904ff4733bf5cf94de21a04939f45e006c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=AD=9F=E6=9F=B1?= Date: Wed, 12 Oct 2022 15:14:17 +0800 Subject: [PATCH 10/19] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=BC=93=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- drivers/resources/redis/controller.go | 8 ++++---- resources/cache.go | 22 +++++++--------------- resources/cache_test.go | 2 +- 3 files changed, 12 insertions(+), 20 deletions(-) diff --git a/drivers/resources/redis/controller.go b/drivers/resources/redis/controller.go index 584f44b1..7c9f64cb 100644 --- a/drivers/resources/redis/controller.go +++ b/drivers/resources/redis/controller.go @@ -22,7 +22,7 @@ func (m *Controller) shutdown() { oldClient := m.current if oldClient != nil { m.current = nil - resources.Replace() + resources.ReplaceCacher() oldClient.client.Close() } } @@ -39,7 +39,7 @@ func (m *Controller) Set(conf interface{}) (err error) { if len(m.config.Addrs) == 0 { oldClient := m.current if oldClient != nil { - resources.Replace() + resources.ReplaceCacher() m.current = nil oldClient.client.Close() } @@ -61,7 +61,7 @@ func (m *Controller) Set(conf interface{}) (err error) { if env.Process() == eosc.ProcessWorker { if m.current == nil { m.current = newCacher(client) - resources.Replace(m.current) + resources.ReplaceCacher(m.current) } else { m.current.client = client } @@ -71,7 +71,7 @@ func (m *Controller) Set(conf interface{}) (err error) { } else { oldClient := m.current if oldClient != nil { - resources.Replace() + resources.ReplaceCacher() m.current = nil oldClient.client.Close() } diff --git a/resources/cache.go b/resources/cache.go index e5ac0680..3f8c3083 100644 --- a/resources/cache.go +++ b/resources/cache.go @@ -7,8 +7,8 @@ import ( ) var ( - ErrorNoCache = errors.New("no cache") - _ ICaches = (*_Proxy)(nil) + ErrorNoCache = errors.New("no cache") + _ ICache = (*_Proxy)(nil) ) var ( singCacheProxy *_Proxy @@ -17,15 +17,15 @@ var ( func init() { singCacheProxy = newProxy(new(NoCache)) } -func Replace(caches ...ICache) { +func ReplaceCacher(caches ...ICache) { if len(caches) < 1 || caches[0] == nil { - singCacheProxy.ICache, singCacheProxy.had = new(NoCache), false + singCacheProxy.ICache = new(NoCache) return } - singCacheProxy.ICache, singCacheProxy.had = caches[0], false + singCacheProxy.ICache = caches[0] } -func Cacher() ICaches { +func Cacher() ICache { return singCacheProxy } @@ -38,19 +38,11 @@ type ICache interface { GetDel(ctx context.Context, key string) ([]byte, error) Del(ctx context.Context, keys ...string) (int64, error) } -type ICaches interface { - ICache - HasCache() bool -} + type _Proxy struct { - had bool ICache } -func (p *_Proxy) HasCache() bool { - return p.had -} - func newProxy(target ICache) *_Proxy { return &_Proxy{ICache: target} } diff --git a/resources/cache_test.go b/resources/cache_test.go index b0eda2ba..af19db67 100644 --- a/resources/cache_test.go +++ b/resources/cache_test.go @@ -17,7 +17,7 @@ func TestReplace(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - Replace(tt.args.caches...) + ReplaceCacher(tt.args.caches...) }) } } From 9a98b84eeb292b05ec1908267774e5a39fbb0431 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=AD=9F=E6=9F=B1?= Date: Thu, 13 Oct 2022 14:35:21 +0800 Subject: [PATCH 11/19] =?UTF-8?q?redis=20=E5=A2=9E=E5=8A=A0=20enable?= =?UTF-8?q?=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- drivers/resources/redis/config.go | 1 + drivers/resources/redis/controller.go | 60 ++++++++++++--------------- 2 files changed, 28 insertions(+), 33 deletions(-) diff --git a/drivers/resources/redis/config.go b/drivers/resources/redis/config.go index f8618b52..29c34d06 100644 --- a/drivers/resources/redis/config.go +++ b/drivers/resources/redis/config.go @@ -1,6 +1,7 @@ package redis type Config struct { + Enable bool `json:"enable"` Addrs []string `json:"addrs" label:"redis 节点列表"` Username string `json:"username"` Password string `json:"password"` diff --git a/drivers/resources/redis/controller.go b/drivers/resources/redis/controller.go index 7c9f64cb..1455c216 100644 --- a/drivers/resources/redis/controller.go +++ b/drivers/resources/redis/controller.go @@ -36,46 +36,40 @@ func (m *Controller) Set(conf interface{}) (err error) { return nil } - if len(m.config.Addrs) == 0 { - oldClient := m.current - if oldClient != nil { - resources.ReplaceCacher() - m.current = nil - oldClient.client.Close() - } - return nil - } + if m.config.Enable && len(m.config.Addrs) > 0 { - client := redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: m.config.Addrs, - Username: m.config.Username, - Password: m.config.Password, - }) + client := redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: m.config.Addrs, + Username: m.config.Username, + Password: m.config.Password, + }) - if res, errPing := client.Ping(context.Background()).Result(); errPing != nil { - log.Info("ping redis:", res, " error:", err) - client.Close() - return errPing - } + if res, errPing := client.Ping(context.Background()).Result(); errPing != nil { + log.Info("ping redis:", res, " error:", err) + client.Close() + return errPing + } - if env.Process() == eosc.ProcessWorker { - if m.current == nil { - m.current = newCacher(client) - resources.ReplaceCacher(m.current) + if env.Process() == eosc.ProcessWorker { + if m.current == nil { + m.current = newCacher(client) + resources.ReplaceCacher(m.current) + } else { + m.current.client = client + } } else { - m.current.client = client + client.Close() } - } else { - client.Close() - } - } else { - oldClient := m.current - if oldClient != nil { - resources.ReplaceCacher() - m.current = nil - oldClient.client.Close() + return nil } } + oldClient := m.current + if oldClient != nil { + resources.ReplaceCacher() + m.current = nil + oldClient.client.Close() + } + return nil } From a4d84b6b5c7bb0ee599c895c46aaf8e1e2d0e591 Mon Sep 17 00:00:00 2001 From: zhangzeyi Date: Thu, 13 Oct 2022 14:40:36 +0800 Subject: [PATCH 12/19] cache-no.go --- drivers/resources/redis/controller.go | 1 - .../cache-strategy/actuator-handle.go | 23 --------- drivers/strategy/cache-strategy/actuator.go | 4 -- .../strategy/cache-strategy/cache/cache.go | 13 ++--- .../strategy/cache-strategy/http-handler.go | 47 ------------------- resources/cache-no.go | 33 +++++++++++-- 6 files changed, 35 insertions(+), 86 deletions(-) delete mode 100644 drivers/strategy/cache-strategy/actuator-handle.go delete mode 100644 drivers/strategy/cache-strategy/http-handler.go diff --git a/drivers/resources/redis/controller.go b/drivers/resources/redis/controller.go index 7c9f64cb..ee2f11f1 100644 --- a/drivers/resources/redis/controller.go +++ b/drivers/resources/redis/controller.go @@ -51,7 +51,6 @@ func (m *Controller) Set(conf interface{}) (err error) { Username: m.config.Username, Password: m.config.Password, }) - if res, errPing := client.Ping(context.Background()).Result(); errPing != nil { log.Info("ping redis:", res, " error:", err) client.Close() diff --git a/drivers/strategy/cache-strategy/actuator-handle.go b/drivers/strategy/cache-strategy/actuator-handle.go deleted file mode 100644 index f2d668e1..00000000 --- a/drivers/strategy/cache-strategy/actuator-handle.go +++ /dev/null @@ -1,23 +0,0 @@ -package cache_strategy - -import ( - "github.com/eolinker/eosc/eocontext" -) - -type ActuatorsHandler interface { - Assert(ctx eocontext.EoContext) bool - Check(ctx eocontext.EoContext, handlers []*CacheValidTimeHandler) error -} - -var ( - actuatorsHandlers []ActuatorsHandler -) - -func RegisterActuator(handler ActuatorsHandler) { - - actuatorsHandlers = append(actuatorsHandlers, handler) -} -func getActuatorsHandlers() []ActuatorsHandler { - - return actuatorsHandlers -} diff --git a/drivers/strategy/cache-strategy/actuator.go b/drivers/strategy/cache-strategy/actuator.go index e8af4c5f..5122745c 100644 --- a/drivers/strategy/cache-strategy/actuator.go +++ b/drivers/strategy/cache-strategy/actuator.go @@ -21,7 +21,6 @@ func init() { actuator := newtActuator() actuatorSet = actuator strategy.AddStrategyHandler(actuator) - cache.NewCache() } type ActuatorSet interface { @@ -90,9 +89,6 @@ func (a *tActuator) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) err a.lock.RUnlock() for _, handler := range handlers { - if handler.stop { - continue - } if handler.filter.Check(httpCtx) { uri := httpCtx.Request().URI().RequestURI() diff --git a/drivers/strategy/cache-strategy/cache/cache.go b/drivers/strategy/cache-strategy/cache/cache.go index aae36073..b6ee8201 100644 --- a/drivers/strategy/cache-strategy/cache/cache.go +++ b/drivers/strategy/cache-strategy/cache/cache.go @@ -1,19 +1,20 @@ package cache import ( + "context" "encoding/json" "fmt" - "github.com/coocood/freecache" + "github.com/eolinker/apinto/resources" "github.com/eolinker/eosc/eocontext" http_service "github.com/eolinker/eosc/eocontext/http-context" "net/http" "time" ) -var freeCache *freecache.Cache +var iCache resources.ICache -func NewCache() { - freeCache = freecache.NewCache(0) +func init() { + iCache = resources.NewCacher() } type ResponseData struct { @@ -47,11 +48,11 @@ func (r *ResponseData) Complete(ctx eocontext.EoContext) error { func SetResponseData(uri string, data *ResponseData, validTime int) { bytes, _ := json.Marshal(data) - _ = freeCache.Set([]byte(uri), bytes, validTime) + _ = iCache.Set(context.TODO(), uri, bytes, time.Second*time.Duration(validTime)) } func GetResponseData(uri string) *ResponseData { - bytes, _ := freeCache.Get([]byte(uri)) + bytes, _ := iCache.Get(context.TODO(), uri) data := new(ResponseData) if err := json.Unmarshal(bytes, data); err != nil { return nil diff --git a/drivers/strategy/cache-strategy/http-handler.go b/drivers/strategy/cache-strategy/http-handler.go deleted file mode 100644 index 4dbb52d3..00000000 --- a/drivers/strategy/cache-strategy/http-handler.go +++ /dev/null @@ -1,47 +0,0 @@ -package cache_strategy - -import ( - "github.com/eolinker/eosc/eocontext" - http_service "github.com/eolinker/eosc/eocontext/http-context" -) - -func init() { - RegisterActuator(newActuator()) -} - -type actuatorHttp struct { -} - -func newActuator() *actuatorHttp { - return &actuatorHttp{} -} - -func (hd *actuatorHttp) Assert(ctx eocontext.EoContext) bool { - _, err := http_service.Assert(ctx) - if err != nil { - return false - } - return true -} - -func (hd *actuatorHttp) Check(ctx eocontext.EoContext, handlers []*CacheValidTimeHandler) error { - //httpCtx, err := http_service.Assert(ctx) - //if err != nil { - // return err - //} - - return nil -} - -type Set map[string]struct{} - -func newSet(l int) Set { - return make(Set, l) -} -func (s Set) Has(key string) bool { - _, has := s[key] - return has -} -func (s Set) Add(key string) { - s[key] = struct{}{} -} diff --git a/resources/cache-no.go b/resources/cache-no.go index ba9adbc6..d7225e0b 100644 --- a/resources/cache-no.go +++ b/resources/cache-no.go @@ -2,18 +2,27 @@ package resources import ( "context" + "github.com/coocood/freecache" "time" ) type NoCache struct { + client *freecache.Cache } func (n *NoCache) Set(ctx context.Context, key string, value []byte, expiration time.Duration) error { - return ErrorNoCache + + return n.client.Set([]byte(key), value, int(expiration.Seconds())) } func (n *NoCache) SetNX(ctx context.Context, key string, value []byte, expiration time.Duration) (bool, error) { - return false, ErrorNoCache + + _, err := n.client.GetOrSet([]byte(key), value, int(expiration.Seconds())) + if err != nil { + return false, err + } + + return true, nil } func (n *NoCache) DecrBy(ctx context.Context, key string, decrement int64) (int64, error) { @@ -26,14 +35,28 @@ func (n *NoCache) IncrBy(ctx context.Context, key string, decrement int64) (int6 } func (n *NoCache) Get(ctx context.Context, key string) ([]byte, error) { - return nil, ErrorNoCache + return n.client.Get([]byte(key)) } func (n *NoCache) GetDel(ctx context.Context, key string) ([]byte, error) { - return nil, ErrorNoCache + bytes, err := n.client.Get([]byte(key)) + if err != nil { + return nil, err + } + n.client.Del([]byte(key)) + return bytes, nil } func (n *NoCache) Del(ctx context.Context, keys ...string) (int64, error) { - return 0, ErrorNoCache + + for _, key := range keys { + n.client.Del([]byte(key)) + } + + return 1, nil +} + +func NewCacher() *NoCache { + return &NoCache{client: freecache.NewCache(0)} } From 1931c6cdb33703e2e6717891bea8a1849eb8eb3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=AD=9F=E6=9F=B1?= Date: Thu, 13 Oct 2022 14:48:42 +0800 Subject: [PATCH 13/19] =?UTF-8?q?=E7=BC=93=E5=AD=98=E5=AF=B9=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- drivers/resources/redis/redis.go | 14 ++++++++++++++ drivers/strategy/cache-strategy/cache/cache.go | 10 ++-------- resources/cache-no.go | 9 +++++++++ resources/cache.go | 6 +++++- 4 files changed, 30 insertions(+), 9 deletions(-) diff --git a/drivers/resources/redis/redis.go b/drivers/resources/redis/redis.go index afea93cf..46817fab 100644 --- a/drivers/resources/redis/redis.go +++ b/drivers/resources/redis/redis.go @@ -2,14 +2,28 @@ package redis import ( "context" + "github.com/eolinker/apinto/resources" "github.com/go-redis/redis/v8" "time" ) +var ( + _ resources.ICache = (*_Cacher)(nil) +) + type _Cacher struct { client *redis.ClusterClient } +func (r *_Cacher) Close() error { + if r.client == nil { + e := r.client.Close() + r.client = nil + return e + } + return nil +} + func (r *_Cacher) Set(ctx context.Context, key string, value []byte, expiration time.Duration) error { return r.client.Set(ctx, key, value, expiration).Err() diff --git a/drivers/strategy/cache-strategy/cache/cache.go b/drivers/strategy/cache-strategy/cache/cache.go index b6ee8201..24e0e827 100644 --- a/drivers/strategy/cache-strategy/cache/cache.go +++ b/drivers/strategy/cache-strategy/cache/cache.go @@ -11,12 +11,6 @@ import ( "time" ) -var iCache resources.ICache - -func init() { - iCache = resources.NewCacher() -} - type ResponseData struct { Header http.Header Body []byte @@ -48,11 +42,11 @@ func (r *ResponseData) Complete(ctx eocontext.EoContext) error { func SetResponseData(uri string, data *ResponseData, validTime int) { bytes, _ := json.Marshal(data) - _ = iCache.Set(context.TODO(), uri, bytes, time.Second*time.Duration(validTime)) + _ = resources.Cacher().Set(context.TODO(), uri, bytes, time.Second*time.Duration(validTime)) } func GetResponseData(uri string) *ResponseData { - bytes, _ := iCache.Get(context.TODO(), uri) + bytes, _ := resources.Cacher().Get(context.TODO(), uri) data := new(ResponseData) if err := json.Unmarshal(bytes, data); err != nil { return nil diff --git a/resources/cache-no.go b/resources/cache-no.go index d7225e0b..529b21d1 100644 --- a/resources/cache-no.go +++ b/resources/cache-no.go @@ -6,10 +6,19 @@ import ( "time" ) +var ( + _ ICache = (*NoCache)(nil) +) + type NoCache struct { client *freecache.Cache } +func (n *NoCache) Close() error { + n.client.Clear() + return nil +} + func (n *NoCache) Set(ctx context.Context, key string, value []byte, expiration time.Duration) error { return n.client.Set([]byte(key), value, int(expiration.Seconds())) diff --git a/resources/cache.go b/resources/cache.go index 3f8c3083..2cdb01c3 100644 --- a/resources/cache.go +++ b/resources/cache.go @@ -19,7 +19,10 @@ func init() { } func ReplaceCacher(caches ...ICache) { if len(caches) < 1 || caches[0] == nil { - singCacheProxy.ICache = new(NoCache) + if singCacheProxy.ICache != nil { + singCacheProxy.ICache.Close() + } + singCacheProxy.ICache = NewCacher() return } singCacheProxy.ICache = caches[0] @@ -37,6 +40,7 @@ type ICache interface { Get(ctx context.Context, key string) ([]byte, error) GetDel(ctx context.Context, key string) ([]byte, error) Del(ctx context.Context, keys ...string) (int64, error) + Close() error } type _Proxy struct { From d509c09b65479d073c1d5c826cf52b5270b7b677 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=AD=9F=E6=9F=B1?= Date: Thu, 13 Oct 2022 17:34:26 +0800 Subject: [PATCH 14/19] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=86=85=E5=AD=98?= =?UTF-8?q?=E7=BC=93=E5=AD=98=20setNX=E7=9A=84bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- resources/cache-no.go | 5 ++--- resources/cache-no_test.go | 29 +++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 3 deletions(-) create mode 100644 resources/cache-no_test.go diff --git a/resources/cache-no.go b/resources/cache-no.go index 529b21d1..1336a3f8 100644 --- a/resources/cache-no.go +++ b/resources/cache-no.go @@ -26,12 +26,11 @@ func (n *NoCache) Set(ctx context.Context, key string, value []byte, expiration func (n *NoCache) SetNX(ctx context.Context, key string, value []byte, expiration time.Duration) (bool, error) { - _, err := n.client.GetOrSet([]byte(key), value, int(expiration.Seconds())) + old, err := n.client.GetOrSet([]byte(key), value, int(expiration.Seconds())) if err != nil { return false, err } - - return true, nil + return old == nil, nil } func (n *NoCache) DecrBy(ctx context.Context, key string, decrement int64) (int64, error) { diff --git a/resources/cache-no_test.go b/resources/cache-no_test.go new file mode 100644 index 00000000..6bd39e7b --- /dev/null +++ b/resources/cache-no_test.go @@ -0,0 +1,29 @@ +package resources + +import ( + "fmt" + "github.com/coocood/freecache" +) + +func ExampleFreeCache() { + key := []byte("test") + value := []byte("value") + + client := freecache.NewCache(0) + set, err := client.GetOrSet(key, value, 100) + if err != nil { + fmt.Println(err) + return + } + fmt.Printf("value1=%s\n", string(set)) + set, err = client.GetOrSet(key, value, 100) + if err != nil { + fmt.Println(err) + return + } + fmt.Printf("value=%s\n", string(set)) + // output: + //value1= + //value=value + +} From 1613dd335c5cd3be2cf6f748b8483dfa0491d40d Mon Sep 17 00:00:00 2001 From: zhangzeyi Date: Thu, 13 Oct 2022 18:10:25 +0800 Subject: [PATCH 15/19] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E7=81=B0=E5=BA=A6?= =?UTF-8?q?=E7=AD=96=E7=95=A5=20=E4=BB=A5=E5=8F=8A=E8=BF=87=E6=BB=A4?= =?UTF-8?q?=E8=A7=84=E5=88=99=E6=96=B0=E5=A2=9Eip=E8=BF=87=E6=BB=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/apinto/worker.go | 2 + drivers/router/http-router/handler.go | 11 +- drivers/strategy/grey-strategy/actuator.go | 163 ++++++++++++++++++ drivers/strategy/grey-strategy/config.go | 111 ++++++++++++ drivers/strategy/grey-strategy/controller.go | 85 +++++++++ drivers/strategy/grey-strategy/driver.go | 68 ++++++++ drivers/strategy/grey-strategy/factory.go | 45 +++++ drivers/strategy/grey-strategy/grey.go | 76 ++++++++ drivers/strategy/grey-strategy/handler.go | 161 +++++++++++++++++ drivers/strategy/grey-strategy/round-robin.go | 82 +++++++++ 10 files changed, 800 insertions(+), 4 deletions(-) create mode 100644 drivers/strategy/grey-strategy/actuator.go create mode 100644 drivers/strategy/grey-strategy/config.go create mode 100644 drivers/strategy/grey-strategy/controller.go create mode 100644 drivers/strategy/grey-strategy/driver.go create mode 100644 drivers/strategy/grey-strategy/factory.go create mode 100644 drivers/strategy/grey-strategy/grey.go create mode 100644 drivers/strategy/grey-strategy/handler.go create mode 100644 drivers/strategy/grey-strategy/round-robin.go diff --git a/app/apinto/worker.go b/app/apinto/worker.go index 9d060de3..b81c032f 100644 --- a/app/apinto/worker.go +++ b/app/apinto/worker.go @@ -29,6 +29,7 @@ import ( http_router "github.com/eolinker/apinto/drivers/router/http-router" service "github.com/eolinker/apinto/drivers/service" cache_strategy "github.com/eolinker/apinto/drivers/strategy/cache-strategy" + grey_strategy "github.com/eolinker/apinto/drivers/strategy/grey-strategy" limiting_strategy "github.com/eolinker/apinto/drivers/strategy/limiting-strategy" template "github.com/eolinker/apinto/drivers/template" @@ -92,4 +93,5 @@ func Register(extenderRegister eosc.IExtenderDriverRegister) { strategy.Register(extenderRegister) limiting_strategy.Register(extenderRegister) cache_strategy.Register(extenderRegister) + grey_strategy.Register(extenderRegister) } diff --git a/drivers/router/http-router/handler.go b/drivers/router/http-router/handler.go index 77c02d41..cba8e7f9 100644 --- a/drivers/router/http-router/handler.go +++ b/drivers/router/http-router/handler.go @@ -23,11 +23,13 @@ type Handler struct { } func (h *Handler) ServeHTTP(ctx eocontext.EoContext) { + + httpContext, err := http_context.Assert(ctx) + if err != nil { + return + } + if h.disable { - httpContext, err := http_context.Assert(ctx) - if err != nil { - return - } httpContext.Response().SetStatus(http.StatusNotFound, "") httpContext.Response().SetBody([]byte("router disable")) httpContext.FastFinish() @@ -36,6 +38,7 @@ func (h *Handler) ServeHTTP(ctx eocontext.EoContext) { //Set Label ctx.SetLabel("api", h.routerName) ctx.SetLabel("service", h.serviceName) + ctx.SetLabel("ip", httpContext.Request().ReadIP()) ctx.SetFinish(&h.finisher) ctx.SetCompleteHandler(h.completeHandler) ctx.SetApp(h.service) diff --git a/drivers/strategy/grey-strategy/actuator.go b/drivers/strategy/grey-strategy/actuator.go new file mode 100644 index 00000000..5deffcc0 --- /dev/null +++ b/drivers/strategy/grey-strategy/actuator.go @@ -0,0 +1,163 @@ +package grey_strategy + +import ( + "fmt" + "github.com/eolinker/apinto/strategy" + "github.com/eolinker/eosc/eocontext" + http_service "github.com/eolinker/eosc/eocontext/http-context" + "sort" + "sync" +) + +var ( + actuatorSet ActuatorSet +) + +const cookieName = "grey-cookie" + +func init() { + actuator := newtActuator() + actuatorSet = actuator + strategy.AddStrategyHandler(actuator) +} + +type ActuatorSet interface { + Set(string, *GreyHandler) + Del(id string) +} + +type tActuator struct { + lock sync.RWMutex + all map[string]*GreyHandler + handlers []*GreyHandler +} + +func (a *tActuator) Destroy() { + +} + +func (a *tActuator) Set(id string, val *GreyHandler) { + // 调用来源有锁 + a.all[id] = val + a.rebuild() + +} + +func (a *tActuator) Del(id string) { + // 调用来源有锁 + delete(a.all, id) + a.rebuild() +} + +func (a *tActuator) rebuild() { + + handlers := make([]*GreyHandler, 0, len(a.all)) + for _, h := range a.all { + if !h.stop { + handlers = append(handlers, h) + } + } + sort.Sort(handlerListSort(handlers)) + a.lock.Lock() + defer a.lock.Unlock() + a.handlers = handlers +} +func newtActuator() *tActuator { + return &tActuator{ + all: make(map[string]*GreyHandler), + } +} + +func (a *tActuator) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) error { + + httpCtx, err := http_service.Assert(ctx) + if err != nil { + return err + } + + a.lock.RLock() + handlers := a.handlers + a.lock.RUnlock() + + for _, handler := range handlers { + //check筛选条件 + if handler.filter.Check(httpCtx) { + ctx.SetBalance(newGreyBalanceHandler(ctx.GetBalance(), handler)) + break + } + } + + if next != nil { + return next.DoChain(ctx) + } + return nil +} + +type handlerListSort []*GreyHandler + +func (hs handlerListSort) Len() int { + return len(hs) +} + +func (hs handlerListSort) Less(i, j int) bool { + + return hs[i].priority < hs[j].priority +} + +func (hs handlerListSort) Swap(i, j int) { + hs[i], hs[j] = hs[j], hs[i] +} + +type GreyBalanceHandler struct { + orgHandler eocontext.BalanceHandler + greyHandler *GreyHandler +} + +func newGreyBalanceHandler(orgHandler eocontext.BalanceHandler, greyHandler *GreyHandler) *GreyBalanceHandler { + return &GreyBalanceHandler{orgHandler: orgHandler, greyHandler: greyHandler} +} + +func (g *GreyBalanceHandler) Select(ctx eocontext.EoContext) (eocontext.INode, error) { + httpCtx, err := http_service.Assert(ctx) + if err != nil { + return nil, err + } + + if g.greyHandler.rule.keepSession { + cookie := httpCtx.Request().Header().GetCookie(cookieName) + if cookie != "" { + return g.greyHandler.selectNodes(), nil + } + } + + if g.greyHandler.rule.distribution == percent { + + //round-robin算法判断是走灰度流量还是正常流量 + flow := g.greyHandler.rule.flowRobin.Select() + + if flow.GetId() == 1 { //灰度流量 + if g.greyHandler.rule.keepSession { + httpCtx.Response().Headers().Add("Set-Cookie", fmt.Sprintf("%s=%s", cookieName, cookieName)) + } + return g.greyHandler.selectNodes(), nil + } + + } else { + + //按匹配规则 + if !g.greyHandler.ruleFilter.Check(ctx) { + //匹配失败走正常节点 + return g.orgHandler.Select(ctx) + } + + //匹配成功 + if g.greyHandler.rule.keepSession { + httpCtx.Response().Headers().Add("Set-Cookie", fmt.Sprintf("%s=%s", cookieName, cookieName)) + } + + return g.greyHandler.selectNodes(), nil + } + + //走正常节点 + return g.orgHandler.Select(ctx) +} diff --git a/drivers/strategy/grey-strategy/config.go b/drivers/strategy/grey-strategy/config.go new file mode 100644 index 00000000..150244a4 --- /dev/null +++ b/drivers/strategy/grey-strategy/config.go @@ -0,0 +1,111 @@ +package grey_strategy + +import ( + "fmt" + "github.com/eolinker/apinto/strategy" + "github.com/eolinker/eosc/eocontext" + "strconv" + "strings" +) + +const ( + percent = "percent" + match = "match" +) + +type Config struct { + Name string `json:"name" skip:"skip"` + Description string `json:"description" skip:"skip"` + Stop bool `json:"stop"` + Priority int `json:"priority" label:"优先级" description:"1-999"` + Filters strategy.FilterConfig `json:"filters" label:"过滤规则"` + Rule *Rule `json:"rule" label:"灰度规则"` +} + +type Rule struct { + KeepSession bool `json:"keep_session"` + Nodes []string `json:"nodes"` // + Distribution string `json:"distribution"` // percent match + Percent int `json:"percent"` // 灰度的百分比 四位数 + Matching []*Matching `json:"matching"` +} + +type Matching struct { + Type string `json:"type" label:"类型" enum:"header,query,cookie"` + Name string `json:"name" label:"参数名"` + Value string `json:"value" label:"值规" ` +} + +func (r *Rule) GetNodes() []eocontext.INode { + nodes := make([]eocontext.INode, 0) + for _, node := range r.Nodes { + addrSlide := strings.Split(node, ":") + + ip := addrSlide[0] + port := 0 + if len(addrSlide) > 1 { + port, _ = strconv.Atoi(addrSlide[1]) + } + + nodes = append(nodes, newGreyNode(fmt.Sprintf("%s:%d", ip, port), ip, port)) + } + + return nodes +} + +type GreyNode struct { + labels eocontext.Attrs + id string + ip string + port int + status eocontext.NodeStatus +} + +func newGreyNode(id string, ip string, port int) *GreyNode { + return &GreyNode{labels: map[string]string{}, id: id, ip: ip, port: port, status: eocontext.Running} +} + +func (g *GreyNode) GetAttrs() eocontext.Attrs { + return g.labels +} + +func (g *GreyNode) GetAttrByName(name string) (string, bool) { + v, ok := g.labels[name] + return v, ok +} + +func (g *GreyNode) ID() string { + return g.id +} + +func (g *GreyNode) IP() string { + return g.ip +} + +func (g *GreyNode) Port() int { + return g.port + +} + +func (g *GreyNode) Addr() string { + if g.port == 0 { + return g.ip + } + return fmt.Sprintf("%s:%d", g.ip, g.port) +} + +func (g *GreyNode) Status() eocontext.NodeStatus { + return g.status +} + +func (g *GreyNode) Up() { + g.status = eocontext.Running +} + +func (g *GreyNode) Down() { + g.status = eocontext.Down +} + +func (g *GreyNode) Leave() { + g.status = eocontext.Leave +} diff --git a/drivers/strategy/grey-strategy/controller.go b/drivers/strategy/grey-strategy/controller.go new file mode 100644 index 00000000..b6e9e5a8 --- /dev/null +++ b/drivers/strategy/grey-strategy/controller.go @@ -0,0 +1,85 @@ +package grey_strategy + +import ( + "github.com/eolinker/eosc" + "reflect" +) + +var ( + controller = NewController() + _ eosc.ISetting = controller + _ IController = controller +) + +type IController interface { + Store(id string) + Del(id string) +} +type Controller struct { + profession string + driver string + all map[string]struct{} +} + +func (c *Controller) Store(id string) { + c.all[id] = struct{}{} +} + +func (c *Controller) Del(id string) { + delete(c.all, id) +} + +func (c *Controller) ConfigType() reflect.Type { + return configType +} + +func (c *Controller) Set(conf interface{}) (err error) { + return eosc.ErrorUnsupportedKind +} + +func (c *Controller) Get() interface{} { + return nil +} + +func (c *Controller) Mode() eosc.SettingMode { + return eosc.SettingModeBatch +} + +func (c *Controller) Check(cfg interface{}) (profession, name, driver, desc string, err error) { + conf, ok := cfg.(*Config) + if !ok { + err = eosc.ErrorConfigIsNil + return + } + if empty(conf.Name) { + err = eosc.ErrorConfigFieldUnknown + return + } + err = checkConfig(conf) + if err != nil { + return + } + return c.profession, conf.Name, c.driver, conf.Description, nil + +} +func empty(vs ...string) bool { + for _, v := range vs { + if len(v) == 0 { + return true + } + } + return false +} +func (c *Controller) AllWorkers() []string { + ws := make([]string, 0, len(c.all)) + for id := range c.all { + ws = append(ws, id) + } + return ws +} + +func NewController() *Controller { + return &Controller{ + all: map[string]struct{}{}, + } +} diff --git a/drivers/strategy/grey-strategy/driver.go b/drivers/strategy/grey-strategy/driver.go new file mode 100644 index 00000000..a889a80a --- /dev/null +++ b/drivers/strategy/grey-strategy/driver.go @@ -0,0 +1,68 @@ +package grey_strategy + +import ( + "fmt" + "github.com/eolinker/apinto/strategy" + "github.com/eolinker/eosc" + "reflect" + "strings" +) + +func checkConfig(conf *Config) error { + if conf.Priority > 999 || conf.Priority < 1 { + return fmt.Errorf("priority value %d not allow ", conf.Priority) + } + + if conf.Rule.Distribution == percent && (conf.Rule.Percent < 0 || conf.Rule.Percent > 10000) { + return fmt.Errorf("percent value %d not allow ", conf.Rule.Percent) + } + + //检查灰度节点是否正确 + for _, node := range conf.Rule.Nodes { + if strings.Count(node, "http") > 0 || strings.Count(node, "https") > 0 { + return fmt.Errorf("node value %s cannot be http or https ", node) + } + } + + _, err := strategy.ParseFilter(conf.Filters) + if err != nil { + return err + } + + return nil +} + +type driver struct { +} + +func (d *driver) Check(v interface{}, workers map[eosc.RequireId]eosc.IWorker) error { + cfg, ok := v.(*Config) + if !ok { + return eosc.ErrorConfigIsNil + } + + return checkConfig(cfg) +} + +func (d *driver) ConfigType() reflect.Type { + return configType +} + +func (d *driver) Create(id, name string, v interface{}, workers map[eosc.RequireId]eosc.IWorker) (eosc.IWorker, error) { + if err := d.Check(v, workers); err != nil { + return nil, err + } + + lg := &Grey{ + id: id, + name: name, + } + + err := lg.Reset(v, workers) + if err != nil { + return nil, err + } + + controller.Store(id) + return lg, nil +} diff --git a/drivers/strategy/grey-strategy/factory.go b/drivers/strategy/grey-strategy/factory.go new file mode 100644 index 00000000..094f692e --- /dev/null +++ b/drivers/strategy/grey-strategy/factory.go @@ -0,0 +1,45 @@ +package grey_strategy + +import ( + "github.com/eolinker/eosc" + "github.com/eolinker/eosc/setting" + "github.com/eolinker/eosc/utils/schema" + "reflect" +) + +const Name = "strategy-grey" + +var ( + configType = reflect.TypeOf((*Config)(nil)) +) + +//Register 注册http路由驱动工厂 +func Register(register eosc.IExtenderDriverRegister) { + + register.RegisterExtenderDriver(Name, newFactory()) + setting.RegisterSetting("strategies-grey", controller) +} + +type factory struct { + render interface{} +} + +func newFactory() *factory { + render, err := schema.Generate(configType, nil) + if err != nil { + panic(err) + } + return &factory{ + render: render, + } +} + +func (f *factory) Render() interface{} { + return f.render +} + +func (f *factory) Create(profession string, name string, label string, desc string, params map[string]interface{}) (eosc.IExtenderDriver, error) { + controller.driver = name + controller.profession = profession + return &driver{}, nil +} diff --git a/drivers/strategy/grey-strategy/grey.go b/drivers/strategy/grey-strategy/grey.go new file mode 100644 index 00000000..2edc711e --- /dev/null +++ b/drivers/strategy/grey-strategy/grey.go @@ -0,0 +1,76 @@ +package grey_strategy + +import ( + "fmt" + "github.com/eolinker/eosc" + "reflect" +) + +var ( + _ eosc.IWorker = (*Grey)(nil) + _ eosc.IWorkerDestroy = (*Grey)(nil) +) + +type Grey struct { + id string + name string + handler *GreyHandler + config *Config + isRunning int +} + +func (l *Grey) Destroy() error { + controller.Del(l.id) + return nil +} + +func (l *Grey) Id() string { + return l.id +} + +func (l *Grey) Start() error { + if l.isRunning == 0 { + l.isRunning = 1 + actuatorSet.Set(l.id, l.handler) + } + + return nil +} + +func (l *Grey) Reset(v interface{}, workers map[eosc.RequireId]eosc.IWorker) error { + conf, ok := v.(*Config) + if !ok { + return eosc.ErrorConfigIsNil + } + if conf.Priority > 999 || conf.Priority < 1 { + return fmt.Errorf("priority value %d not allow ", conf.Priority) + } + + confCore := conf + if reflect.DeepEqual(l.config, confCore) { + return nil + } + handler, err := NewGreyHandler(confCore) + if err != nil { + return err + } + l.config = confCore + l.handler = handler + if l.isRunning != 0 { + actuatorSet.Set(l.id, l.handler) + } + return nil +} + +func (l *Grey) Stop() error { + if l.isRunning != 0 { + l.isRunning = 0 + actuatorSet.Del(l.id) + } + + return nil +} + +func (l *Grey) CheckSkill(skill string) bool { + return false +} diff --git a/drivers/strategy/grey-strategy/handler.go b/drivers/strategy/grey-strategy/handler.go new file mode 100644 index 00000000..74620387 --- /dev/null +++ b/drivers/strategy/grey-strategy/handler.go @@ -0,0 +1,161 @@ +package grey_strategy + +import ( + "github.com/eolinker/apinto/checker" + "github.com/eolinker/apinto/strategy" + "github.com/eolinker/eosc/eocontext" + http_service "github.com/eolinker/eosc/eocontext/http-context" + "sync" +) + +type GreyHandler struct { + name string + filter strategy.IFilter + priority int + stop bool + rule *ruleHandler + ruleFilter strategy.IFilter +} + +type ruleHandler struct { + selectNodeLock *sync.Mutex + index int + keepSession bool + nodes []eocontext.INode + distribution string + flowRobin *Robin + matching []*matchingHandler +} + +type matchingHandler struct { + Type string + name string + value string + checker checker.Checker +} + +type matchingHandlerFilters []*matchingHandler + +func (m matchingHandlerFilters) Check(ctx eocontext.EoContext) bool { + for _, handler := range m { + if !handler.Check(ctx) { + return false + } + } + return true +} + +func (m *matchingHandler) Check(ctx eocontext.EoContext) bool { + httpCtx, err := http_service.Assert(ctx) + if err != nil { + return false + } + + value := "" + request := httpCtx.Request() + switch m.Type { + case "header": + value = request.Header().GetHeader(m.name) + case "query": + value = request.URI().GetQuery(m.name) + case "cookie": + value = request.Header().GetCookie(m.name) + default: + return false + } + + return m.checker.Check(value, true) +} + +type flowHandler struct { + id int //1为灰度流量 2为正常流量 + weight int +} + +func (f *flowHandler) GetId() uint32 { + return uint32(f.id) +} + +func (f *flowHandler) GetWeight() int { + return f.weight +} + +// ABCABCABCABC 轮询从nodes中拿一个节点信息 +func (g *GreyHandler) selectNodes() eocontext.INode { + if len(g.rule.nodes) == 1 { + return g.rule.nodes[0] + } + g.rule.selectNodeLock.Lock() + defer g.rule.selectNodeLock.Unlock() + + var node eocontext.INode + if g.rule.index == len(g.rule.nodes)-1 { + node = g.rule.nodes[g.rule.index] + g.rule.index = 0 + } else { + node = g.rule.nodes[g.rule.index] + g.rule.index++ + } + + return node +} + +func NewGreyHandler(conf *Config) (*GreyHandler, error) { + filter, err := strategy.ParseFilter(conf.Filters) + if err != nil { + return nil, err + } + + rule := &ruleHandler{ + selectNodeLock: &sync.Mutex{}, + keepSession: conf.Rule.KeepSession, + nodes: conf.Rule.GetNodes(), + distribution: conf.Rule.Distribution, + } + + matchHandlers := make([]*matchingHandler, 0) + ruleFilter := make(matchingHandlerFilters, 0) + for _, matching := range conf.Rule.Matching { + + check, err := checker.Parse(matching.Value) + if err != nil { + return nil, err + } + + matchingHandlerVal := &matchingHandler{ + Type: matching.Type, + name: matching.Name, + value: matching.Value, + checker: check, + } + + matchHandlers = append(matchHandlers, matchingHandlerVal) + ruleFilter = append(ruleFilter, matchingHandlerVal) + } + rule.matching = matchHandlers + + //robin算法所需要的数据 + if conf.Rule.Distribution == percent { + + greyFlow := &flowHandler{ + id: 1, + weight: conf.Rule.Percent, + } + normalFlow := &flowHandler{ + id: 2, + weight: 10000 - greyFlow.weight, + } + + //总权重10000 + rule.flowRobin = NewRobin(greyFlow, normalFlow) + } + + return &GreyHandler{ + name: conf.Name, + filter: filter, + priority: conf.Priority, + stop: conf.Stop, + rule: rule, + ruleFilter: ruleFilter, + }, nil +} diff --git a/drivers/strategy/grey-strategy/round-robin.go b/drivers/strategy/grey-strategy/round-robin.go new file mode 100644 index 00000000..83c2984b --- /dev/null +++ b/drivers/strategy/grey-strategy/round-robin.go @@ -0,0 +1,82 @@ +package grey_strategy + +type Weighted interface { + GetId() uint32 + GetWeight() int +} + +// NewRobin 初始化一个池子 +func NewRobin(servers ...Weighted) *Robin { + newRobin := &Robin{} + newRobin.updateServers(servers) + return newRobin +} + +type Training struct { + Server Weighted + Weight int //初始化设置权重值 + CurrentWeight int //目前的权重值 +} + +type Robin struct { + Weighted []Weighted + Training []*Training +} + +func (l *Robin) updateServers(servers []Weighted) { + weighted := make([]*Training, 0) + for _, v := range servers { + w := &Training{ + Server: v, + Weight: v.GetWeight(), + CurrentWeight: 0, + } + weighted = append(weighted, w) + } + l.Training = weighted + l.Weighted = servers +} + +// Select remove为需要屏蔽的ID, +func (l *Robin) Select(remove ...uint) Weighted { + if len(l.Training) == 0 { + return nil + } + w := l.nextWeighted(remove) + if w == nil { + return nil + } + return w.Server +} +func (l *Robin) nextWeighted(remove []uint) (best *Training) { + total := 0 + for i := 0; i < len(l.Training); i++ { + w := l.Training[i] + if w == nil { + continue + } + isFind := false + for _, v := range remove { + if v == uint(w.Server.GetId()) { + isFind = true + } + } + if isFind { + continue + } + //每次都加原始的权重值 + w.CurrentWeight += w.Weight + //所有权重之和 + total += w.Weight + //判断当前最大的权重。不管有没有最大 先取第一个、然后依次对比、取出最大 + if best == nil || w.CurrentWeight > best.CurrentWeight { + best = w + } + } + if best == nil { + return best + } + //抽出后-最大权重值 + best.CurrentWeight -= total + return best +} From be7f38b0cc2c487fb98bf07cc71a49118e7b8a50 Mon Sep 17 00:00:00 2001 From: zhangzeyi Date: Thu, 13 Oct 2022 18:11:02 +0800 Subject: [PATCH 16/19] =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E5=8C=96=E4=BB=A3?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- drivers/plugins/ip-restriction/restriction.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/drivers/plugins/ip-restriction/restriction.go b/drivers/plugins/ip-restriction/restriction.go index 46e0da5d..20fe083a 100644 --- a/drivers/plugins/ip-restriction/restriction.go +++ b/drivers/plugins/ip-restriction/restriction.go @@ -41,12 +41,12 @@ func (I *IPHandler) Start() error { } func (I *IPHandler) Reset(conf interface{}, workers map[eosc.RequireId]eosc.IWorker) error { -confObj, err := I.check(conf) -if err != nil { -return err -} -I.filter = confObj.genFilter() -return nil + confObj, err := I.check(conf) + if err != nil { + return err + } + I.filter = confObj.genFilter() + return nil } func (I *IPHandler) Stop() error { From bb3b886d20ac5ee6f4fc94b07571690afb44ef6e Mon Sep 17 00:00:00 2001 From: zhangzeyi Date: Fri, 14 Oct 2022 14:25:13 +0800 Subject: [PATCH 17/19] =?UTF-8?q?checkConfig=E6=96=B0=E5=A2=9E=E8=A7=84?= =?UTF-8?q?=E5=88=99=E6=A0=A1=E9=AA=8C=20drivers=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=E7=BC=93=E5=AD=98=E7=AD=96=E7=95=A5=E5=92=8C=E7=81=B0=E5=BA=A6?= =?UTF-8?q?=E7=AD=96=E7=95=A5=20=E4=BC=98=E5=8C=96=E7=81=B0=E5=BA=A6?= =?UTF-8?q?=E7=AD=96=E7=95=A5=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/apinto/profession.go | 12 +++ drivers/strategy/grey-strategy/actuator.go | 43 +++------- drivers/strategy/grey-strategy/config.go | 2 + drivers/strategy/grey-strategy/driver.go | 5 ++ drivers/strategy/grey-strategy/handler.go | 92 +++++++++++++--------- 5 files changed, 84 insertions(+), 70 deletions(-) diff --git a/app/apinto/profession.go b/app/apinto/profession.go index c8586f67..5187168a 100644 --- a/app/apinto/profession.go +++ b/app/apinto/profession.go @@ -123,6 +123,18 @@ func ApintoProfession() []*eosc.ProfessionConfig { Label: "限流策略", Desc: "限流策略", }, + { + Id: "eolinker.com:apinto:strategy-cache", + Name: "cache", + Label: "缓存策略", + Desc: "缓存策略", + }, + { + Id: "eolinker.com:apinto:strategy-grey", + Name: "grey", + Label: "灰度策略", + Desc: "灰度策略", + }, }, Mod: eosc.ProfessionConfig_Worker, }, diff --git a/drivers/strategy/grey-strategy/actuator.go b/drivers/strategy/grey-strategy/actuator.go index 5deffcc0..d4d6195c 100644 --- a/drivers/strategy/grey-strategy/actuator.go +++ b/drivers/strategy/grey-strategy/actuator.go @@ -13,7 +13,7 @@ var ( actuatorSet ActuatorSet ) -const cookieName = "grey-cookie" +const cookieName = "grey-cookie-%s" func init() { actuator := newtActuator() @@ -123,41 +123,22 @@ func (g *GreyBalanceHandler) Select(ctx eocontext.EoContext) (eocontext.INode, e return nil, err } - if g.greyHandler.rule.keepSession { - cookie := httpCtx.Request().Header().GetCookie(cookieName) - if cookie != "" { - return g.greyHandler.selectNodes(), nil - } - } + cookieKey := fmt.Sprintf(cookieName, g.greyHandler.name) - if g.greyHandler.rule.distribution == percent { - - //round-robin算法判断是走灰度流量还是正常流量 - flow := g.greyHandler.rule.flowRobin.Select() - - if flow.GetId() == 1 { //灰度流量 - if g.greyHandler.rule.keepSession { - httpCtx.Response().Headers().Add("Set-Cookie", fmt.Sprintf("%s=%s", cookieName, cookieName)) - } + if g.greyHandler.rule.keepSession { + cookie := httpCtx.Request().Header().GetCookie(cookieKey) + if cookie == grey { return g.greyHandler.selectNodes(), nil - } - - } else { - - //按匹配规则 - if !g.greyHandler.ruleFilter.Check(ctx) { - //匹配失败走正常节点 + } else if cookie == normal { return g.orgHandler.Select(ctx) } + } - //匹配成功 - if g.greyHandler.rule.keepSession { - httpCtx.Response().Headers().Add("Set-Cookie", fmt.Sprintf("%s=%s", cookieName, cookieName)) - } - + if g.greyHandler.rule.greyMatch.Match(ctx) { //灰度 + httpCtx.Response().Headers().Add("Set-Cookie", fmt.Sprintf("%s=%v", cookieKey, grey)) return g.greyHandler.selectNodes(), nil + } else { + httpCtx.Response().Headers().Add("Set-Cookie", fmt.Sprintf("%s=%v", cookieKey, normal)) + return g.orgHandler.Select(ctx) } - - //走正常节点 - return g.orgHandler.Select(ctx) } diff --git a/drivers/strategy/grey-strategy/config.go b/drivers/strategy/grey-strategy/config.go index 150244a4..9ce37b58 100644 --- a/drivers/strategy/grey-strategy/config.go +++ b/drivers/strategy/grey-strategy/config.go @@ -11,6 +11,8 @@ import ( const ( percent = "percent" match = "match" + grey = "grey" + normal = "normal" ) type Config struct { diff --git a/drivers/strategy/grey-strategy/driver.go b/drivers/strategy/grey-strategy/driver.go index a889a80a..99d634af 100644 --- a/drivers/strategy/grey-strategy/driver.go +++ b/drivers/strategy/grey-strategy/driver.go @@ -15,8 +15,13 @@ func checkConfig(conf *Config) error { if conf.Rule.Distribution == percent && (conf.Rule.Percent < 0 || conf.Rule.Percent > 10000) { return fmt.Errorf("percent value %d not allow ", conf.Rule.Percent) + } else if conf.Rule.Distribution == match && len(conf.Rule.Matching) == 0 { + return fmt.Errorf("matching rule len is 0 ") } + if len(conf.Rule.Nodes) == 0 { + return fmt.Errorf("nodes len is 0 ") + } //检查灰度节点是否正确 for _, node := range conf.Rule.Nodes { if strings.Count(node, "http") > 0 || strings.Count(node, "https") > 0 { diff --git a/drivers/strategy/grey-strategy/handler.go b/drivers/strategy/grey-strategy/handler.go index 74620387..ac88676b 100644 --- a/drivers/strategy/grey-strategy/handler.go +++ b/drivers/strategy/grey-strategy/handler.go @@ -9,12 +9,15 @@ import ( ) type GreyHandler struct { - name string - filter strategy.IFilter - priority int - stop bool - rule *ruleHandler - ruleFilter strategy.IFilter + name string + filter strategy.IFilter + priority int + stop bool + rule *ruleHandler +} + +type greyMatch interface { + Match(ctx eocontext.EoContext) bool } type ruleHandler struct { @@ -23,8 +26,24 @@ type ruleHandler struct { keepSession bool nodes []eocontext.INode distribution string - flowRobin *Robin - matching []*matchingHandler + greyMatch greyMatch +} + +type ruleGreyFlow struct { + flowRobin *Robin +} + +type ruleGreyMatch struct { + ruleFilter strategy.IFilter +} + +func (r *ruleGreyFlow) Match(ctx eocontext.EoContext) bool { + flow := r.flowRobin.Select() + return flow.GetId() == 1 +} + +func (r *ruleGreyMatch) Match(ctx eocontext.EoContext) bool { + return r.ruleFilter.Check(ctx) } type matchingHandler struct { @@ -113,30 +132,7 @@ func NewGreyHandler(conf *Config) (*GreyHandler, error) { distribution: conf.Rule.Distribution, } - matchHandlers := make([]*matchingHandler, 0) - ruleFilter := make(matchingHandlerFilters, 0) - for _, matching := range conf.Rule.Matching { - - check, err := checker.Parse(matching.Value) - if err != nil { - return nil, err - } - - matchingHandlerVal := &matchingHandler{ - Type: matching.Type, - name: matching.Name, - value: matching.Value, - checker: check, - } - - matchHandlers = append(matchHandlers, matchingHandlerVal) - ruleFilter = append(ruleFilter, matchingHandlerVal) - } - rule.matching = matchHandlers - - //robin算法所需要的数据 if conf.Rule.Distribution == percent { - greyFlow := &flowHandler{ id: 1, weight: conf.Rule.Percent, @@ -145,17 +141,35 @@ func NewGreyHandler(conf *Config) (*GreyHandler, error) { id: 2, weight: 10000 - greyFlow.weight, } - //总权重10000 - rule.flowRobin = NewRobin(greyFlow, normalFlow) + rule.greyMatch = &ruleGreyFlow{flowRobin: NewRobin(greyFlow, normalFlow)} + } else { + ruleFilter := make(matchingHandlerFilters, 0) + for _, matching := range conf.Rule.Matching { + + check, err := checker.Parse(matching.Value) + if err != nil { + return nil, err + } + + matchingHandlerVal := &matchingHandler{ + Type: matching.Type, + name: matching.Name, + value: matching.Value, + checker: check, + } + + ruleFilter = append(ruleFilter, matchingHandlerVal) + } + + rule.greyMatch = &ruleGreyMatch{ruleFilter: ruleFilter} } return &GreyHandler{ - name: conf.Name, - filter: filter, - priority: conf.Priority, - stop: conf.Stop, - rule: rule, - ruleFilter: ruleFilter, + name: conf.Name, + filter: filter, + priority: conf.Priority, + stop: conf.Stop, + rule: rule, }, nil } From 4c633df650635a73c94c49e9702b57a0d60034de Mon Sep 17 00:00:00 2001 From: chenjiekun Date: Fri, 14 Oct 2022 17:23:05 +0800 Subject: [PATCH 18/19] fix consul --- drivers/discovery/consul/clients.go | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/drivers/discovery/consul/clients.go b/drivers/discovery/consul/clients.go index 9db9a4d9..1057b545 100644 --- a/drivers/discovery/consul/clients.go +++ b/drivers/discovery/consul/clients.go @@ -1,7 +1,6 @@ package consul import ( - "strconv" "strings" "github.com/eolinker/apinto/discovery" @@ -75,19 +74,7 @@ func getNodesFromClient(client *api.Client, service string) []discovery.INode { nodes := make([]discovery.INode, 0, len(serviceEntryArr)) for _, serviceEntry := range serviceEntryArr { - nodeAddr := serviceEntry.Node.Address - addrSlide := append(strings.Split(nodeAddr, ":")) - ip := addrSlide[0] - var port int - if len(addrSlide) > 1 { - port, err = strconv.Atoi(addrSlide[1]) - if err != nil { - log.Error(err) - continue - } - } - - newNode := discovery.NewNode(serviceEntry.Service.Meta, serviceEntry.Node.ID, ip, port) + newNode := discovery.NewNode(serviceEntry.Service.Meta, serviceEntry.Node.ID, serviceEntry.Service.Address, serviceEntry.Service.Port) nodes = append(nodes, newNode) } From c9602cef515b30d0822bd39494b1bab9726a9f2f Mon Sep 17 00:00:00 2001 From: Liujian <824010343@qq.com> Date: Tue, 18 Oct 2022 19:28:17 +0800 Subject: [PATCH 19/19] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dak/sk=E3=80=81jwt?= =?UTF-8?q?=E6=BC=8F=E6=B4=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/auth/aksk/factory.go | 2 +- application/auth/jwt/jwt.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/application/auth/aksk/factory.go b/application/auth/aksk/factory.go index ae77fab6..334a4b39 100644 --- a/application/auth/aksk/factory.go +++ b/application/auth/aksk/factory.go @@ -59,7 +59,7 @@ func NewFactory() auth.IAuthFactory { typ := reflect.TypeOf((*Config)(nil)) render, _ := schema.Generate(typ, nil) - return &factory{configType: typ, render: render} + return &factory{configType: typ, render: render, userType: reflect.TypeOf((*User)(nil))} } func toId(tokenName, position string) string { diff --git a/application/auth/jwt/jwt.go b/application/auth/jwt/jwt.go index fde4e7ea..9d1c00e8 100644 --- a/application/auth/jwt/jwt.go +++ b/application/auth/jwt/jwt.go @@ -63,6 +63,7 @@ func (j *jwt) Set(app application.IApp, users []application.ITransformConfig) { HideCredential: v.HideCredential, TokenName: j.tokenName, Position: j.position, + App: app, }) } j.users.Set(app.Id(), infos)