diff --git a/app/apinto/profession.go b/app/apinto/profession.go index c8586f67..5187168a 100644 --- a/app/apinto/profession.go +++ b/app/apinto/profession.go @@ -123,6 +123,18 @@ func ApintoProfession() []*eosc.ProfessionConfig { Label: "限流策略", Desc: "限流策略", }, + { + Id: "eolinker.com:apinto:strategy-cache", + Name: "cache", + Label: "缓存策略", + Desc: "缓存策略", + }, + { + Id: "eolinker.com:apinto:strategy-grey", + Name: "grey", + Label: "灰度策略", + Desc: "灰度策略", + }, }, Mod: eosc.ProfessionConfig_Worker, }, diff --git a/app/apinto/worker.go b/app/apinto/worker.go index 7945a49b..b81c032f 100644 --- a/app/apinto/worker.go +++ b/app/apinto/worker.go @@ -28,7 +28,9 @@ import ( "github.com/eolinker/apinto/drivers/plugins/strategy" http_router "github.com/eolinker/apinto/drivers/router/http-router" service "github.com/eolinker/apinto/drivers/service" - limiting_stragety "github.com/eolinker/apinto/drivers/strategy/limiting-stragety" + cache_strategy "github.com/eolinker/apinto/drivers/strategy/cache-strategy" + grey_strategy "github.com/eolinker/apinto/drivers/strategy/grey-strategy" + limiting_strategy "github.com/eolinker/apinto/drivers/strategy/limiting-strategy" template "github.com/eolinker/apinto/drivers/template" "github.com/eolinker/eosc" @@ -89,5 +91,7 @@ func Register(extenderRegister eosc.IExtenderDriverRegister) { proxy_rewriteV2.Register(extenderRegister) strategy.Register(extenderRegister) - limiting_stragety.Register(extenderRegister) + limiting_strategy.Register(extenderRegister) + cache_strategy.Register(extenderRegister) + grey_strategy.Register(extenderRegister) } diff --git a/application/auth/aksk/factory.go b/application/auth/aksk/factory.go index ae77fab6..334a4b39 100644 --- a/application/auth/aksk/factory.go +++ b/application/auth/aksk/factory.go @@ -59,7 +59,7 @@ func NewFactory() auth.IAuthFactory { typ := reflect.TypeOf((*Config)(nil)) render, _ := schema.Generate(typ, nil) - return &factory{configType: typ, render: render} + return &factory{configType: typ, render: render, userType: reflect.TypeOf((*User)(nil))} } func toId(tokenName, position string) string { diff --git a/application/auth/jwt/jwt.go b/application/auth/jwt/jwt.go index fde4e7ea..9d1c00e8 100644 --- a/application/auth/jwt/jwt.go +++ b/application/auth/jwt/jwt.go @@ -63,6 +63,7 @@ func (j *jwt) Set(app application.IApp, users []application.ITransformConfig) { HideCredential: v.HideCredential, TokenName: j.tokenName, Position: j.position, + App: app, }) } j.users.Set(app.Id(), infos) diff --git a/drivers/discovery/consul/clients.go b/drivers/discovery/consul/clients.go index 9db9a4d9..1057b545 100644 --- a/drivers/discovery/consul/clients.go +++ b/drivers/discovery/consul/clients.go @@ -1,7 +1,6 @@ package consul import ( - "strconv" "strings" "github.com/eolinker/apinto/discovery" @@ -75,19 +74,7 @@ func getNodesFromClient(client *api.Client, service string) []discovery.INode { nodes := make([]discovery.INode, 0, len(serviceEntryArr)) for _, serviceEntry := range serviceEntryArr { - nodeAddr := serviceEntry.Node.Address - addrSlide := append(strings.Split(nodeAddr, ":")) - ip := addrSlide[0] - var port int - if len(addrSlide) > 1 { - port, err = strconv.Atoi(addrSlide[1]) - if err != nil { - log.Error(err) - continue - } - } - - newNode := discovery.NewNode(serviceEntry.Service.Meta, serviceEntry.Node.ID, ip, port) + newNode := discovery.NewNode(serviceEntry.Service.Meta, serviceEntry.Node.ID, serviceEntry.Service.Address, serviceEntry.Service.Port) nodes = append(nodes, newNode) } diff --git a/drivers/plugins/ip-restriction/restriction.go b/drivers/plugins/ip-restriction/restriction.go index 46e0da5d..20fe083a 100644 --- a/drivers/plugins/ip-restriction/restriction.go +++ b/drivers/plugins/ip-restriction/restriction.go @@ -41,12 +41,12 @@ func (I *IPHandler) Start() error { } func (I *IPHandler) Reset(conf interface{}, workers map[eosc.RequireId]eosc.IWorker) error { -confObj, err := I.check(conf) -if err != nil { -return err -} -I.filter = confObj.genFilter() -return nil + confObj, err := I.check(conf) + if err != nil { + return err + } + I.filter = confObj.genFilter() + return nil } func (I *IPHandler) Stop() error { diff --git a/drivers/resources/redis/config.go b/drivers/resources/redis/config.go new file mode 100644 index 00000000..29c34d06 --- /dev/null +++ b/drivers/resources/redis/config.go @@ -0,0 +1,8 @@ +package redis + +type Config struct { + Enable bool `json:"enable"` + Addrs []string `json:"addrs" label:"redis 节点列表"` + Username string `json:"username"` + Password string `json:"password"` +} diff --git a/drivers/resources/redis/controller.go b/drivers/resources/redis/controller.go new file mode 100644 index 00000000..1455c216 --- /dev/null +++ b/drivers/resources/redis/controller.go @@ -0,0 +1,95 @@ +package redis + +import ( + "context" + "github.com/eolinker/apinto/resources" + "github.com/eolinker/eosc" + "github.com/eolinker/eosc/env" + "github.com/eolinker/eosc/log" + "github.com/go-redis/redis/v8" + "reflect" +) + +type Controller struct { + current *_Cacher + config Config +} + +func (m *Controller) ConfigType() reflect.Type { + return configType +} +func (m *Controller) shutdown() { + oldClient := m.current + if oldClient != nil { + m.current = nil + resources.ReplaceCacher() + oldClient.client.Close() + } +} +func (m *Controller) Set(conf interface{}) (err error) { + config, ok := conf.(*Config) + if ok && config != nil { + old := m.config + m.config = *config + + if reflect.DeepEqual(old, m.config) { + return nil + } + + if m.config.Enable && len(m.config.Addrs) > 0 { + + client := redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: m.config.Addrs, + Username: m.config.Username, + Password: m.config.Password, + }) + + if res, errPing := client.Ping(context.Background()).Result(); errPing != nil { + log.Info("ping redis:", res, " error:", err) + client.Close() + return errPing + } + + if env.Process() == eosc.ProcessWorker { + if m.current == nil { + m.current = newCacher(client) + resources.ReplaceCacher(m.current) + } else { + m.current.client = client + } + } else { + client.Close() + } + return nil + } + } + oldClient := m.current + if oldClient != nil { + resources.ReplaceCacher() + m.current = nil + oldClient.client.Close() + } + + return nil +} + +func (m *Controller) Get() interface{} { + return m.config +} + +func (m *Controller) Mode() eosc.SettingMode { + return eosc.SettingModeSingleton +} + +func (m *Controller) Check(cfg interface{}) (profession, name, driver, desc string, err error) { + err = eosc.ErrorUnsupportedKind + return +} + +func (m *Controller) AllWorkers() []string { + return []string{"redis@setting"} +} + +func NewController() *Controller { + return &Controller{} +} diff --git a/drivers/resources/redis/factory.go b/drivers/resources/redis/factory.go new file mode 100644 index 00000000..9362711b --- /dev/null +++ b/drivers/resources/redis/factory.go @@ -0,0 +1,21 @@ +package redis + +import ( + "github.com/eolinker/eosc" + "github.com/eolinker/eosc/setting" + "reflect" +) + +var ( + singleton *Controller + _ eosc.ISetting = singleton + configType = reflect.TypeOf(new(Config)) +) + +func init() { + singleton = NewController() +} + +func Register(register eosc.IExtenderDriverRegister) { + setting.RegisterSetting("redis", singleton) +} diff --git a/drivers/resources/redis/redis.go b/drivers/resources/redis/redis.go new file mode 100644 index 00000000..46817fab --- /dev/null +++ b/drivers/resources/redis/redis.go @@ -0,0 +1,65 @@ +package redis + +import ( + "context" + "github.com/eolinker/apinto/resources" + "github.com/go-redis/redis/v8" + "time" +) + +var ( + _ resources.ICache = (*_Cacher)(nil) +) + +type _Cacher struct { + client *redis.ClusterClient +} + +func (r *_Cacher) Close() error { + if r.client == nil { + e := r.client.Close() + r.client = nil + return e + } + return nil +} + +func (r *_Cacher) Set(ctx context.Context, key string, value []byte, expiration time.Duration) error { + + return r.client.Set(ctx, key, value, expiration).Err() +} + +func (r *_Cacher) SetNX(ctx context.Context, key string, value []byte, expiration time.Duration) (bool, error) { + + return r.client.SetNX(ctx, key, value, expiration).Result() +} + +func (r *_Cacher) DecrBy(ctx context.Context, key string, decrement int64) (int64, error) { + + return r.client.DecrBy(ctx, key, decrement).Result() +} + +func (r *_Cacher) IncrBy(ctx context.Context, key string, decrement int64) (int64, error) { + return r.client.IncrBy(ctx, key, decrement).Result() +} + +func (r *_Cacher) Get(ctx context.Context, key string) ([]byte, error) { + return r.client.Get(ctx, key).Bytes() + +} + +func (r *_Cacher) GetDel(ctx context.Context, key string) ([]byte, error) { + return r.client.GetDel(ctx, key).Bytes() + +} + +func (r *_Cacher) Del(ctx context.Context, keys ...string) (int64, error) { + return r.client.Del(ctx, keys...).Result() +} + +func newCacher(client *redis.ClusterClient) *_Cacher { + if client == nil { + return nil + } + return &_Cacher{client: client} +} diff --git a/drivers/router/http-router/handler.go b/drivers/router/http-router/handler.go index 77c02d41..cba8e7f9 100644 --- a/drivers/router/http-router/handler.go +++ b/drivers/router/http-router/handler.go @@ -23,11 +23,13 @@ type Handler struct { } func (h *Handler) ServeHTTP(ctx eocontext.EoContext) { + + httpContext, err := http_context.Assert(ctx) + if err != nil { + return + } + if h.disable { - httpContext, err := http_context.Assert(ctx) - if err != nil { - return - } httpContext.Response().SetStatus(http.StatusNotFound, "") httpContext.Response().SetBody([]byte("router disable")) httpContext.FastFinish() @@ -36,6 +38,7 @@ func (h *Handler) ServeHTTP(ctx eocontext.EoContext) { //Set Label ctx.SetLabel("api", h.routerName) ctx.SetLabel("service", h.serviceName) + ctx.SetLabel("ip", httpContext.Request().ReadIP()) ctx.SetFinish(&h.finisher) ctx.SetCompleteHandler(h.completeHandler) ctx.SetApp(h.service) diff --git a/drivers/strategy/cache-strategy/actuator.go b/drivers/strategy/cache-strategy/actuator.go new file mode 100644 index 00000000..5122745c --- /dev/null +++ b/drivers/strategy/cache-strategy/actuator.go @@ -0,0 +1,241 @@ +package cache_strategy + +import ( + "github.com/eolinker/apinto/drivers/strategy/cache-strategy/cache" + "github.com/eolinker/apinto/strategy" + "github.com/eolinker/eosc/eocontext" + http_service "github.com/eolinker/eosc/eocontext/http-context" + "net/http" + "sort" + "strconv" + "strings" + "sync" + "time" +) + +var ( + actuatorSet ActuatorSet +) + +func init() { + actuator := newtActuator() + actuatorSet = actuator + strategy.AddStrategyHandler(actuator) +} + +type ActuatorSet interface { + Set(string, *CacheValidTimeHandler) + Del(id string) +} + +type tActuator struct { + lock sync.RWMutex + all map[string]*CacheValidTimeHandler + handlers []*CacheValidTimeHandler +} + +func (a *tActuator) Destroy() { + +} + +func (a *tActuator) Set(id string, val *CacheValidTimeHandler) { + // 调用来源有锁 + a.all[id] = val + a.rebuild() + +} + +func (a *tActuator) Del(id string) { + // 调用来源有锁 + delete(a.all, id) + a.rebuild() +} + +func (a *tActuator) rebuild() { + + handlers := make([]*CacheValidTimeHandler, 0, len(a.all)) + for _, h := range a.all { + if !h.stop { + handlers = append(handlers, h) + } + } + sort.Sort(handlerListSort(handlers)) + a.lock.Lock() + defer a.lock.Unlock() + a.handlers = handlers +} +func newtActuator() *tActuator { + return &tActuator{ + all: make(map[string]*CacheValidTimeHandler), + } +} + +func (a *tActuator) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) error { + + httpCtx, err := http_service.Assert(ctx) + if err != nil { + return err + } + + if httpCtx.Request().Method() != http.MethodGet { + if next != nil { + return next.DoChain(ctx) + } + return nil + } + + a.lock.RLock() + handlers := a.handlers + a.lock.RUnlock() + + for _, handler := range handlers { + if handler.filter.Check(httpCtx) { + + uri := httpCtx.Request().URI().RequestURI() + responseData := cache.GetResponseData(uri) + + if responseData != nil { + httpCtx.SetCompleteHandler(responseData) + } else { + httpCtx.SetCompleteHandler(NewCacheGetCompleteHandler(httpCtx.GetComplete(), handler.validTime, uri)) + } + break + } + } + + if next != nil { + return next.DoChain(ctx) + } + return nil +} + +type CacheGetCompleteHandler struct { + orgHandler eocontext.CompleteHandler + validTime int + uri string +} + +func NewCacheGetCompleteHandler(orgHandler eocontext.CompleteHandler, validTime int, uri string) *CacheGetCompleteHandler { + return &CacheGetCompleteHandler{ + orgHandler: orgHandler, + validTime: validTime, + uri: uri, + } +} + +func (c *CacheGetCompleteHandler) Complete(ctx eocontext.EoContext) error { + + if c.orgHandler != nil { + if err := c.orgHandler.Complete(ctx); err != nil { + return err + } + } + + httpCtx, err := http_service.Assert(ctx) + if err != nil { + return nil + } + + //从cache-control中判断是否需要缓存 + if parseHttpContext(httpCtx).IsCache() { + responseData := &cache.ResponseData{ + Header: httpCtx.Response().Headers(), + Body: httpCtx.Response().GetBody(), + ValidTime: c.validTime, + Now: time.Now(), + } + cache.SetResponseData(httpCtx.Request().URI().RequestURI(), responseData, c.validTime) + } + return nil +} + +type handlerListSort []*CacheValidTimeHandler + +func (hs handlerListSort) Len() int { + return len(hs) +} + +func (hs handlerListSort) Less(i, j int) bool { + + return hs[i].priority < hs[j].priority +} + +func (hs handlerListSort) Swap(i, j int) { + hs[i], hs[j] = hs[j], hs[i] +} + +type httpContext struct { + cacheControl map[string]string + reqHeader http.Header + resHeader http.Header +} + +func (c httpContext) NoCache() bool { + if _, ok := c.cacheControl["no-cache"]; ok { + return true + } + return false +} + +func (c httpContext) IsCache() bool { + if maxAgeStr, ok := c.cacheControl["max-age"]; ok { + maxAge, _ := strconv.Atoi(maxAgeStr) + if maxAge == 0 { + return false + } + } + + if c.NoCache() { + return false + } + + if !c.IsPublic() { + return false + } + + if _, ok := c.cacheControl["no-store"]; ok { + return false + } + + return true +} + +func (c httpContext) IsPublic() bool { + if _, ok := c.reqHeader["Authorization"]; ok { + if _, pOk := c.cacheControl["public"]; pOk { + return true + } + return false + } + + //只要不是私有的 都算公有 + if _, ok := c.cacheControl["private"]; ok { + return false + } + return true +} + +func parseHttpContext(httpCtx http_service.IHttpContext) httpContext { + hc := httpContext{ + cacheControl: make(map[string]string), + } + + hc.resHeader = httpCtx.Response().Headers() + hc.reqHeader = httpCtx.Request().Header().Headers() + + cacheControlHeader := httpCtx.Response().GetHeader("Cache-Control") + for _, part := range strings.Split(cacheControlHeader, ",") { + part = strings.Trim(part, " ") + if part == "" { + continue + } + if strings.ContainsRune(part, '=') { + keyVal := strings.Split(part, "=") + hc.cacheControl[strings.Trim(keyVal[0], " ")] = strings.Trim(keyVal[1], ",") + + } else { + hc.cacheControl[part] = "" + } + } + return hc +} diff --git a/drivers/strategy/cache-strategy/cache/cache.go b/drivers/strategy/cache-strategy/cache/cache.go new file mode 100644 index 00000000..24e0e827 --- /dev/null +++ b/drivers/strategy/cache-strategy/cache/cache.go @@ -0,0 +1,55 @@ +package cache + +import ( + "context" + "encoding/json" + "fmt" + "github.com/eolinker/apinto/resources" + "github.com/eolinker/eosc/eocontext" + http_service "github.com/eolinker/eosc/eocontext/http-context" + "net/http" + "time" +) + +type ResponseData struct { + Header http.Header + Body []byte + ValidTime int + Now time.Time // 缓存存放的时间 +} + +func (r *ResponseData) Complete(ctx eocontext.EoContext) error { + httpCtx, err := http_service.Assert(ctx) + if err != nil { + return err + } + httpCtx.Response().SetBody(r.Body) + for key, val := range r.Header { + if len(val) > 0 { + httpCtx.Response().SetHeader(key, val[0]) + } + } + httpCtx.Response().SetHeader("Date", time.Now().Format(time.RFC822)) + + //计算Age Age 的值通常接近于 0。表示此对象刚刚从原始服务器获取不久;其他的值则是表示代理服务器当前的系统时间与此应答中的通用头 Date 的值之差 + age := int(time.Now().Sub(r.Now).Seconds()) + + httpCtx.Response().Headers().Set("Age", fmt.Sprintf("%d", age)) + httpCtx.Response().Headers().Set("Cache-Control", fmt.Sprintf("%s=%d", "max-age", r.ValidTime)) + + return nil +} + +func SetResponseData(uri string, data *ResponseData, validTime int) { + bytes, _ := json.Marshal(data) + _ = resources.Cacher().Set(context.TODO(), uri, bytes, time.Second*time.Duration(validTime)) +} + +func GetResponseData(uri string) *ResponseData { + bytes, _ := resources.Cacher().Get(context.TODO(), uri) + data := new(ResponseData) + if err := json.Unmarshal(bytes, data); err != nil { + return nil + } + return data +} diff --git a/drivers/strategy/cache-strategy/cache_valid_time.go b/drivers/strategy/cache-strategy/cache_valid_time.go new file mode 100644 index 00000000..759d1922 --- /dev/null +++ b/drivers/strategy/cache-strategy/cache_valid_time.go @@ -0,0 +1,79 @@ +package cache_strategy + +import ( + "fmt" + "github.com/eolinker/eosc" + "reflect" +) + +var ( + _ eosc.IWorker = (*CacheValidTime)(nil) + _ eosc.IWorkerDestroy = (*CacheValidTime)(nil) +) + +type CacheValidTime struct { + id string + name string + handler *CacheValidTimeHandler + config *Config + isRunning int +} + +func (l *CacheValidTime) Destroy() error { + controller.Del(l.id) + return nil +} + +func (l *CacheValidTime) Id() string { + return l.id +} + +func (l *CacheValidTime) Start() error { + if l.isRunning == 0 { + l.isRunning = 1 + actuatorSet.Set(l.id, l.handler) + } + + return nil +} + +func (l *CacheValidTime) Reset(v interface{}, workers map[eosc.RequireId]eosc.IWorker) error { + conf, ok := v.(*Config) + if !ok { + return eosc.ErrorConfigIsNil + } + if conf.Priority > 999 || conf.Priority < 1 { + return fmt.Errorf("priority value %d not allow ", conf.Priority) + } + if conf.ValidTime < 1 { + return fmt.Errorf("validTime value %d not allow ", conf.ValidTime) + } + + confCore := conf + if reflect.DeepEqual(l.config, confCore) { + return nil + } + handler, err := NewCacheValidTimeHandler(confCore) + if err != nil { + return err + } + l.config = confCore + l.handler = handler + if l.isRunning != 0 { + actuatorSet.Set(l.id, l.handler) + } + return nil +} + +func (l *CacheValidTime) Stop() error { + if l.isRunning != 0 { + l.isRunning = 0 + actuatorSet.Del(l.id) + } + + return nil +} + +func (l *CacheValidTime) CheckSkill(skill string) bool { + return false +} diff --git a/drivers/strategy/cache-strategy/config.go b/drivers/strategy/cache-strategy/config.go new file mode 100644 index 00000000..0e8dd083 --- /dev/null +++ b/drivers/strategy/cache-strategy/config.go @@ -0,0 +1,12 @@ +package cache_strategy + +import "github.com/eolinker/apinto/strategy" + +type Config struct { + Name string `json:"name" skip:"skip"` + Description string `json:"description" skip:"skip"` + Stop bool `json:"stop"` + Priority int `json:"priority" label:"优先级" description:"1-999"` + Filters strategy.FilterConfig `json:"filters" label:"过滤规则"` + ValidTime int `json:"valid_time" label:"有效期" description:"有效期"` +} diff --git a/drivers/strategy/limiting-stragety/controller.go b/drivers/strategy/cache-strategy/controller.go similarity index 98% rename from drivers/strategy/limiting-stragety/controller.go rename to drivers/strategy/cache-strategy/controller.go index 80cc8d7a..534c096c 100644 --- a/drivers/strategy/limiting-stragety/controller.go +++ b/drivers/strategy/cache-strategy/controller.go @@ -1,4 +1,4 @@ -package limiting_stragety +package cache_strategy import ( "github.com/eolinker/eosc" diff --git a/drivers/strategy/cache-strategy/driver.go b/drivers/strategy/cache-strategy/driver.go new file mode 100644 index 00000000..7eea119c --- /dev/null +++ b/drivers/strategy/cache-strategy/driver.go @@ -0,0 +1,60 @@ +package cache_strategy + +import ( + "fmt" + "github.com/eolinker/apinto/strategy" + "github.com/eolinker/eosc" + "reflect" +) + +func checkConfig(conf *Config) error { + if conf.Priority > 999 || conf.Priority < 1 { + return fmt.Errorf("priority value %d not allow ", conf.Priority) + } + + if conf.ValidTime < 1 { + return fmt.Errorf("validTime value %d not allow ", conf.ValidTime) + } + + _, err := strategy.ParseFilter(conf.Filters) + if err != nil { + return err + } + + return nil +} + +type driver struct { +} + +func (d *driver) Check(v interface{}, workers map[eosc.RequireId]eosc.IWorker) error { + cfg, ok := v.(*Config) + if !ok { + return eosc.ErrorConfigIsNil + } + + return checkConfig(cfg) +} + +func (d *driver) ConfigType() reflect.Type { + return configType +} + +func (d *driver) Create(id, name string, v interface{}, workers map[eosc.RequireId]eosc.IWorker) (eosc.IWorker, error) { + if err := d.Check(v, workers); err != nil { + return nil, err + } + + lg := &CacheValidTime{ + id: id, + name: name, + } + + err := lg.Reset(v, workers) + if err != nil { + return nil, err + } + + controller.Store(id) + return lg, nil +} diff --git a/drivers/strategy/cache-strategy/factory.go b/drivers/strategy/cache-strategy/factory.go new file mode 100644 index 00000000..ee3e7c6d --- /dev/null +++ b/drivers/strategy/cache-strategy/factory.go @@ -0,0 +1,45 @@ +package cache_strategy + +import ( + "github.com/eolinker/eosc" + "github.com/eolinker/eosc/setting" + "github.com/eolinker/eosc/utils/schema" + "reflect" +) + +const Name = "strategy-cache" + +var ( + configType = reflect.TypeOf((*Config)(nil)) +) + +//Register 注册http路由驱动工厂 +func Register(register eosc.IExtenderDriverRegister) { + + register.RegisterExtenderDriver(Name, newFactory()) + setting.RegisterSetting("strategies-cache", controller) +} + +type factory struct { + render interface{} +} + +func newFactory() *factory { + render, err := schema.Generate(configType, nil) + if err != nil { + panic(err) + } + return &factory{ + render: render, + } +} + +func (f *factory) Render() interface{} { + return f.render +} + +func (f *factory) Create(profession string, name string, label string, desc string, params map[string]interface{}) (eosc.IExtenderDriver, error) { + controller.driver = name + controller.profession = profession + return &driver{}, nil +} diff --git a/drivers/strategy/cache-strategy/handler.go b/drivers/strategy/cache-strategy/handler.go new file mode 100644 index 00000000..8f9e2f4a --- /dev/null +++ b/drivers/strategy/cache-strategy/handler.go @@ -0,0 +1,28 @@ +package cache_strategy + +import ( + "github.com/eolinker/apinto/strategy" +) + +type CacheValidTimeHandler struct { + name string + filter strategy.IFilter + validTime int + priority int + stop bool +} + +func NewCacheValidTimeHandler(conf *Config) (*CacheValidTimeHandler, error) { + filter, err := strategy.ParseFilter(conf.Filters) + if err != nil { + return nil, err + } + + return &CacheValidTimeHandler{ + name: conf.Name, + filter: filter, + validTime: conf.ValidTime, + priority: conf.Priority, + stop: conf.Stop, + }, nil +} diff --git a/drivers/strategy/grey-strategy/actuator.go b/drivers/strategy/grey-strategy/actuator.go new file mode 100644 index 00000000..d4d6195c --- /dev/null +++ b/drivers/strategy/grey-strategy/actuator.go @@ -0,0 +1,144 @@ +package grey_strategy + +import ( + "fmt" + "github.com/eolinker/apinto/strategy" + "github.com/eolinker/eosc/eocontext" + http_service "github.com/eolinker/eosc/eocontext/http-context" + "sort" + "sync" +) + +var ( + actuatorSet ActuatorSet +) + +const cookieName = "grey-cookie-%s" + +func init() { + actuator := newtActuator() + actuatorSet = actuator + strategy.AddStrategyHandler(actuator) +} + +type ActuatorSet interface { + Set(string, *GreyHandler) + Del(id string) +} + +type tActuator struct { + lock sync.RWMutex + all map[string]*GreyHandler + handlers []*GreyHandler +} + +func (a *tActuator) Destroy() { + +} + +func (a *tActuator) Set(id string, val *GreyHandler) { + // 调用来源有锁 + a.all[id] = val + a.rebuild() + +} + +func (a *tActuator) Del(id string) { + // 调用来源有锁 + delete(a.all, id) + a.rebuild() +} + +func (a *tActuator) rebuild() { + + handlers := make([]*GreyHandler, 0, len(a.all)) + for _, h := range a.all { + if !h.stop { + handlers = append(handlers, h) + } + } + sort.Sort(handlerListSort(handlers)) + a.lock.Lock() + defer a.lock.Unlock() + a.handlers = handlers +} +func newtActuator() *tActuator { + return &tActuator{ + all: make(map[string]*GreyHandler), + } +} + +func (a *tActuator) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) error { + + httpCtx, err := http_service.Assert(ctx) + if err != nil { + return err + } + + a.lock.RLock() + handlers := a.handlers + a.lock.RUnlock() + + for _, handler := range handlers { + //check筛选条件 + if handler.filter.Check(httpCtx) { + ctx.SetBalance(newGreyBalanceHandler(ctx.GetBalance(), handler)) + break + } + } + + if next != nil { + return next.DoChain(ctx) + } + return nil +} + +type handlerListSort []*GreyHandler + +func (hs handlerListSort) Len() int { + return len(hs) +} + +func (hs handlerListSort) Less(i, j int) bool { + + return hs[i].priority < hs[j].priority +} + +func (hs handlerListSort) Swap(i, j int) { + hs[i], hs[j] = hs[j], hs[i] +} + +type GreyBalanceHandler struct { + orgHandler eocontext.BalanceHandler + greyHandler *GreyHandler +} + +func newGreyBalanceHandler(orgHandler eocontext.BalanceHandler, greyHandler *GreyHandler) *GreyBalanceHandler { + return &GreyBalanceHandler{orgHandler: orgHandler, greyHandler: greyHandler} +} + +func (g *GreyBalanceHandler) Select(ctx eocontext.EoContext) (eocontext.INode, error) { + httpCtx, err := http_service.Assert(ctx) + if err != nil { + return nil, err + } + + cookieKey := fmt.Sprintf(cookieName, g.greyHandler.name) + + if g.greyHandler.rule.keepSession { + cookie := httpCtx.Request().Header().GetCookie(cookieKey) + if cookie == grey { + return g.greyHandler.selectNodes(), nil + } else if cookie == normal { + return g.orgHandler.Select(ctx) + } + } + + if g.greyHandler.rule.greyMatch.Match(ctx) { //灰度 + httpCtx.Response().Headers().Add("Set-Cookie", fmt.Sprintf("%s=%v", cookieKey, grey)) + return g.greyHandler.selectNodes(), nil + } else { + httpCtx.Response().Headers().Add("Set-Cookie", fmt.Sprintf("%s=%v", cookieKey, normal)) + return g.orgHandler.Select(ctx) + } +} diff --git a/drivers/strategy/grey-strategy/config.go b/drivers/strategy/grey-strategy/config.go new file mode 100644 index 00000000..9ce37b58 --- /dev/null +++ b/drivers/strategy/grey-strategy/config.go @@ -0,0 +1,113 @@ +package grey_strategy + +import ( + "fmt" + "github.com/eolinker/apinto/strategy" + "github.com/eolinker/eosc/eocontext" + "strconv" + "strings" +) + +const ( + percent = "percent" + match = "match" + grey = "grey" + normal = "normal" +) + +type Config struct { + Name string `json:"name" skip:"skip"` + Description string `json:"description" skip:"skip"` + Stop bool `json:"stop"` + Priority int `json:"priority" label:"优先级" description:"1-999"` + Filters strategy.FilterConfig `json:"filters" label:"过滤规则"` + Rule *Rule `json:"rule" label:"灰度规则"` +} + +type Rule struct { + KeepSession bool `json:"keep_session"` + Nodes []string `json:"nodes"` // + Distribution string `json:"distribution"` // percent match + Percent int `json:"percent"` // 灰度的百分比 四位数 + Matching []*Matching `json:"matching"` +} + +type Matching struct { + Type string `json:"type" label:"类型" enum:"header,query,cookie"` + Name string `json:"name" label:"参数名"` + Value string `json:"value" label:"值规" ` +} + +func (r *Rule) GetNodes() []eocontext.INode { + nodes := make([]eocontext.INode, 0) + for _, node := range r.Nodes { + addrSlide := strings.Split(node, ":") + + ip := addrSlide[0] + port := 0 + if len(addrSlide) > 1 { + port, _ = strconv.Atoi(addrSlide[1]) + } + + nodes = append(nodes, newGreyNode(fmt.Sprintf("%s:%d", ip, port), ip, port)) + } + + return nodes +} + +type GreyNode struct { + labels eocontext.Attrs + id string + ip string + port int + status eocontext.NodeStatus +} + +func newGreyNode(id string, ip string, port int) *GreyNode { + return &GreyNode{labels: map[string]string{}, id: id, ip: ip, port: port, status: eocontext.Running} +} + +func (g *GreyNode) GetAttrs() eocontext.Attrs { + return g.labels +} + +func (g *GreyNode) GetAttrByName(name string) (string, bool) { + v, ok := g.labels[name] + return v, ok +} + +func (g *GreyNode) ID() string { + return g.id +} + +func (g *GreyNode) IP() string { + return g.ip +} + +func (g *GreyNode) Port() int { + return g.port + +} + +func (g *GreyNode) Addr() string { + if g.port == 0 { + return g.ip + } + return fmt.Sprintf("%s:%d", g.ip, g.port) +} + +func (g *GreyNode) Status() eocontext.NodeStatus { + return g.status +} + +func (g *GreyNode) Up() { + g.status = eocontext.Running +} + +func (g *GreyNode) Down() { + g.status = eocontext.Down +} + +func (g *GreyNode) Leave() { + g.status = eocontext.Leave +} diff --git a/drivers/strategy/grey-strategy/controller.go b/drivers/strategy/grey-strategy/controller.go new file mode 100644 index 00000000..b6e9e5a8 --- /dev/null +++ b/drivers/strategy/grey-strategy/controller.go @@ -0,0 +1,85 @@ +package grey_strategy + +import ( + "github.com/eolinker/eosc" + "reflect" +) + +var ( + controller = NewController() + _ eosc.ISetting = controller + _ IController = controller +) + +type IController interface { + Store(id string) + Del(id string) +} +type Controller struct { + profession string + driver string + all map[string]struct{} +} + +func (c *Controller) Store(id string) { + c.all[id] = struct{}{} +} + +func (c *Controller) Del(id string) { + delete(c.all, id) +} + +func (c *Controller) ConfigType() reflect.Type { + return configType +} + +func (c *Controller) Set(conf interface{}) (err error) { + return eosc.ErrorUnsupportedKind +} + +func (c *Controller) Get() interface{} { + return nil +} + +func (c *Controller) Mode() eosc.SettingMode { + return eosc.SettingModeBatch +} + +func (c *Controller) Check(cfg interface{}) (profession, name, driver, desc string, err error) { + conf, ok := cfg.(*Config) + if !ok { + err = eosc.ErrorConfigIsNil + return + } + if empty(conf.Name) { + err = eosc.ErrorConfigFieldUnknown + return + } + err = checkConfig(conf) + if err != nil { + return + } + return c.profession, conf.Name, c.driver, conf.Description, nil + +} +func empty(vs ...string) bool { + for _, v := range vs { + if len(v) == 0 { + return true + } + } + return false +} +func (c *Controller) AllWorkers() []string { + ws := make([]string, 0, len(c.all)) + for id := range c.all { + ws = append(ws, id) + } + return ws +} + +func NewController() *Controller { + return &Controller{ + all: map[string]struct{}{}, + } +} diff --git a/drivers/strategy/grey-strategy/driver.go b/drivers/strategy/grey-strategy/driver.go new file mode 100644 index 00000000..99d634af --- /dev/null +++ b/drivers/strategy/grey-strategy/driver.go @@ -0,0 +1,73 @@ +package grey_strategy + +import ( + "fmt" + "github.com/eolinker/apinto/strategy" + "github.com/eolinker/eosc" + "reflect" + "strings" +) + +func checkConfig(conf *Config) error { + if conf.Priority > 999 || conf.Priority < 1 { + return fmt.Errorf("priority value %d not allow ", conf.Priority) + } + + if conf.Rule.Distribution == percent && (conf.Rule.Percent < 0 || conf.Rule.Percent > 10000) { + return fmt.Errorf("percent value %d not allow ", conf.Rule.Percent) + } else if conf.Rule.Distribution == match && len(conf.Rule.Matching) == 0 { + return fmt.Errorf("matching rule len is 0 ") + } + + if len(conf.Rule.Nodes) == 0 { + return fmt.Errorf("nodes len is 0 ") + } + //检查灰度节点是否正确 + for _, node := range conf.Rule.Nodes { + if strings.Count(node, "http") > 0 || strings.Count(node, "https") > 0 { + return fmt.Errorf("node value %s cannot be http or https ", node) + } + } + + _, err := strategy.ParseFilter(conf.Filters) + if err != nil { + return err + } + + return nil +} + +type driver struct { +} + +func (d *driver) Check(v interface{}, workers map[eosc.RequireId]eosc.IWorker) error { + cfg, ok := v.(*Config) + if !ok { + return eosc.ErrorConfigIsNil + } + + return checkConfig(cfg) +} + +func (d *driver) ConfigType() reflect.Type { + return configType +} + +func (d *driver) Create(id, name string, v interface{}, workers map[eosc.RequireId]eosc.IWorker) (eosc.IWorker, error) { + if err := d.Check(v, workers); err != nil { + return nil, err + } + + lg := &Grey{ + id: id, + name: name, + } + + err := lg.Reset(v, workers) + if err != nil { + return nil, err + } + + controller.Store(id) + return lg, nil +} diff --git a/drivers/strategy/grey-strategy/factory.go b/drivers/strategy/grey-strategy/factory.go new file mode 100644 index 00000000..094f692e --- /dev/null +++ b/drivers/strategy/grey-strategy/factory.go @@ -0,0 +1,45 @@ +package grey_strategy + +import ( + "github.com/eolinker/eosc" + "github.com/eolinker/eosc/setting" + "github.com/eolinker/eosc/utils/schema" + "reflect" +) + +const Name = "strategy-grey" + +var ( + configType = reflect.TypeOf((*Config)(nil)) +) + +//Register 注册http路由驱动工厂 +func Register(register eosc.IExtenderDriverRegister) { + + register.RegisterExtenderDriver(Name, newFactory()) + setting.RegisterSetting("strategies-grey", controller) +} + +type factory struct { + render interface{} +} + +func newFactory() *factory { + render, err := schema.Generate(configType, nil) + if err != nil { + panic(err) + } + return &factory{ + render: render, + } +} + +func (f *factory) Render() interface{} { + return f.render +} + +func (f *factory) Create(profession string, name string, label string, desc string, params map[string]interface{}) (eosc.IExtenderDriver, error) { + controller.driver = name + controller.profession = profession + return &driver{}, nil +} diff --git a/drivers/strategy/grey-strategy/grey.go b/drivers/strategy/grey-strategy/grey.go new file mode 100644 index 00000000..2edc711e --- /dev/null +++ b/drivers/strategy/grey-strategy/grey.go @@ -0,0 +1,76 @@ +package grey_strategy + +import ( + "fmt" + "github.com/eolinker/eosc" + "reflect" +) + +var ( + _ eosc.IWorker = (*Grey)(nil) + _ eosc.IWorkerDestroy = (*Grey)(nil) +) + +type Grey struct { + id string + name string + handler *GreyHandler + config *Config + isRunning int +} + +func (l *Grey) Destroy() error { + controller.Del(l.id) + return nil +} + +func (l *Grey) Id() string { + return l.id +} + +func (l *Grey) Start() error { + if l.isRunning == 0 { + l.isRunning = 1 + actuatorSet.Set(l.id, l.handler) + } + + return nil +} + +func (l *Grey) Reset(v interface{}, workers map[eosc.RequireId]eosc.IWorker) error { + conf, ok := v.(*Config) + if !ok { + return eosc.ErrorConfigIsNil + } + if conf.Priority > 999 || conf.Priority < 1 { + return fmt.Errorf("priority value %d not allow ", conf.Priority) + } + + confCore := conf + if reflect.DeepEqual(l.config, confCore) { + return nil + } + handler, err := NewGreyHandler(confCore) + if err != nil { + return err + } + l.config = confCore + l.handler = handler + if l.isRunning != 0 { + actuatorSet.Set(l.id, l.handler) + } + return nil +} + +func (l *Grey) Stop() error { + if l.isRunning != 0 { + l.isRunning = 0 + actuatorSet.Del(l.id) + } + + return nil +} + +func (l *Grey) CheckSkill(skill string) bool { + return false +} diff --git a/drivers/strategy/grey-strategy/handler.go b/drivers/strategy/grey-strategy/handler.go new file mode 100644 index 00000000..ac88676b --- /dev/null +++ b/drivers/strategy/grey-strategy/handler.go @@ -0,0 +1,175 @@ +package grey_strategy + +import ( + "github.com/eolinker/apinto/checker" + "github.com/eolinker/apinto/strategy" + "github.com/eolinker/eosc/eocontext" + http_service "github.com/eolinker/eosc/eocontext/http-context" + "sync" +) + +type GreyHandler struct { + name string + filter strategy.IFilter + priority int + stop bool + rule *ruleHandler +} + +type greyMatch interface { + Match(ctx eocontext.EoContext) bool +} + +type ruleHandler struct { + selectNodeLock *sync.Mutex + index int + keepSession bool + nodes []eocontext.INode + distribution string + greyMatch greyMatch +} + +type ruleGreyFlow struct { + flowRobin *Robin +} + +type ruleGreyMatch struct { + ruleFilter strategy.IFilter +} + +func (r *ruleGreyFlow) Match(ctx eocontext.EoContext) bool { + flow := r.flowRobin.Select() + return flow.GetId() == 1 +} + +func (r *ruleGreyMatch) Match(ctx eocontext.EoContext) bool { + return r.ruleFilter.Check(ctx) +} + +type matchingHandler struct { + Type string + name string + value string + checker checker.Checker +} + +type matchingHandlerFilters []*matchingHandler + +func (m matchingHandlerFilters) Check(ctx eocontext.EoContext) bool { + for _, handler := range m { + if !handler.Check(ctx) { + return false + } + } + return true +} + +func (m *matchingHandler) Check(ctx eocontext.EoContext) bool { + httpCtx, err := http_service.Assert(ctx) + if err != nil { + return false + } + + value := "" + request := httpCtx.Request() + switch m.Type { + case "header": + value = request.Header().GetHeader(m.name) + case "query": + value = request.URI().GetQuery(m.name) + case "cookie": + value = request.Header().GetCookie(m.name) + default: + return false + } + + return m.checker.Check(value, true) +} + +type flowHandler struct { + id int //1为灰度流量 2为正常流量 + weight int +} + +func (f *flowHandler) GetId() uint32 { + return uint32(f.id) +} + +func (f *flowHandler) GetWeight() int { + return f.weight +} + +// ABCABCABCABC 轮询从nodes中拿一个节点信息 +func (g *GreyHandler) selectNodes() eocontext.INode { + if len(g.rule.nodes) == 1 { + return g.rule.nodes[0] + } + g.rule.selectNodeLock.Lock() + defer g.rule.selectNodeLock.Unlock() + + var node eocontext.INode + if g.rule.index == len(g.rule.nodes)-1 { + node = g.rule.nodes[g.rule.index] + g.rule.index = 0 + } else { + node = g.rule.nodes[g.rule.index] + g.rule.index++ + } + + return node +} + +func NewGreyHandler(conf *Config) (*GreyHandler, error) { + filter, err := strategy.ParseFilter(conf.Filters) + if err != nil { + return nil, err + } + + rule := &ruleHandler{ + selectNodeLock: &sync.Mutex{}, + keepSession: conf.Rule.KeepSession, + nodes: conf.Rule.GetNodes(), + distribution: conf.Rule.Distribution, + } + + if conf.Rule.Distribution == percent { + greyFlow := &flowHandler{ + id: 1, + weight: conf.Rule.Percent, + } + normalFlow := &flowHandler{ + id: 2, + weight: 10000 - greyFlow.weight, + } + //总权重10000 + rule.greyMatch = &ruleGreyFlow{flowRobin: NewRobin(greyFlow, normalFlow)} + } else { + ruleFilter := make(matchingHandlerFilters, 0) + for _, matching := range conf.Rule.Matching { + + check, err := checker.Parse(matching.Value) + if err != nil { + return nil, err + } + + matchingHandlerVal := &matchingHandler{ + Type: matching.Type, + name: matching.Name, + value: matching.Value, + checker: check, + } + + ruleFilter = append(ruleFilter, matchingHandlerVal) + } + + rule.greyMatch = &ruleGreyMatch{ruleFilter: ruleFilter} + } + + return &GreyHandler{ + name: conf.Name, + filter: filter, + priority: conf.Priority, + stop: conf.Stop, + rule: rule, + }, nil +} diff --git a/drivers/strategy/grey-strategy/round-robin.go b/drivers/strategy/grey-strategy/round-robin.go new file mode 100644 index 00000000..83c2984b --- /dev/null +++ b/drivers/strategy/grey-strategy/round-robin.go @@ -0,0 +1,82 @@ +package grey_strategy + +type Weighted interface { + GetId() uint32 + GetWeight() int +} + +// NewRobin 初始化一个池子 +func NewRobin(servers ...Weighted) *Robin { + newRobin := &Robin{} + newRobin.updateServers(servers) + return newRobin +} + +type Training struct { + Server Weighted + Weight int //初始化设置权重值 + CurrentWeight int //目前的权重值 +} + +type Robin struct { + Weighted []Weighted + Training []*Training +} + +func (l *Robin) updateServers(servers []Weighted) { + weighted := make([]*Training, 0) + for _, v := range servers { + w := &Training{ + Server: v, + Weight: v.GetWeight(), + CurrentWeight: 0, + } + weighted = append(weighted, w) + } + l.Training = weighted + l.Weighted = servers +} + +// Select remove为需要屏蔽的ID, +func (l *Robin) Select(remove ...uint) Weighted { + if len(l.Training) == 0 { + return nil + } + w := l.nextWeighted(remove) + if w == nil { + return nil + } + return w.Server +} +func (l *Robin) nextWeighted(remove []uint) (best *Training) { + total := 0 + for i := 0; i < len(l.Training); i++ { + w := l.Training[i] + if w == nil { + continue + } + isFind := false + for _, v := range remove { + if v == uint(w.Server.GetId()) { + isFind = true + } + } + if isFind { + continue + } + //每次都加原始的权重值 + w.CurrentWeight += w.Weight + //所有权重之和 + total += w.Weight + //判断当前最大的权重。不管有没有最大 先取第一个、然后依次对比、取出最大 + if best == nil || w.CurrentWeight > best.CurrentWeight { + best = w + } + } + if best == nil { + return best + } + //抽出后-最大权重值 + best.CurrentWeight -= total + return best +} diff --git a/drivers/strategy/limiting-stragety/actuator-handle.go b/drivers/strategy/limiting-strategy/actuator-handle.go similarity index 84% rename from drivers/strategy/limiting-stragety/actuator-handle.go rename to drivers/strategy/limiting-strategy/actuator-handle.go index 7daa3e03..c90c88b3 100644 --- a/drivers/strategy/limiting-stragety/actuator-handle.go +++ b/drivers/strategy/limiting-strategy/actuator-handle.go @@ -1,7 +1,7 @@ -package limiting_stragety +package limiting_strategy import ( - "github.com/eolinker/apinto/drivers/strategy/limiting-stragety/scalar" + "github.com/eolinker/apinto/drivers/strategy/limiting-strategy/scalar" "github.com/eolinker/eosc/eocontext" ) diff --git a/drivers/strategy/limiting-stragety/actuator.go b/drivers/strategy/limiting-strategy/actuator.go similarity index 95% rename from drivers/strategy/limiting-stragety/actuator.go rename to drivers/strategy/limiting-strategy/actuator.go index 451c2223..5a7408be 100644 --- a/drivers/strategy/limiting-stragety/actuator.go +++ b/drivers/strategy/limiting-strategy/actuator.go @@ -1,7 +1,7 @@ -package limiting_stragety +package limiting_strategy import ( - "github.com/eolinker/apinto/drivers/strategy/limiting-stragety/scalar" + "github.com/eolinker/apinto/drivers/strategy/limiting-strategy/scalar" "github.com/eolinker/apinto/strategy" "github.com/eolinker/eosc/eocontext" "sort" diff --git a/drivers/strategy/limiting-stragety/config.go b/drivers/strategy/limiting-strategy/config.go similarity index 93% rename from drivers/strategy/limiting-stragety/config.go rename to drivers/strategy/limiting-strategy/config.go index 5e543ed6..87cdc7a5 100644 --- a/drivers/strategy/limiting-stragety/config.go +++ b/drivers/strategy/limiting-strategy/config.go @@ -1,4 +1,4 @@ -package limiting_stragety +package limiting_strategy import "github.com/eolinker/apinto/strategy" @@ -16,7 +16,7 @@ type Rule struct { type Config struct { Name string `json:"name" skip:"skip"` Description string `json:"description" skip:"skip"` - Stop bool `json:"stop" ` + Stop bool `json:"stop"` Priority int `json:"priority" label:"优先级" description:"1-999"` Filters strategy.FilterConfig `json:"filters" label:"过滤规则"` Rule Rule `json:"limiting" label:"限流规则" description:"限流规则"` diff --git a/drivers/strategy/limiting-strategy/controller.go b/drivers/strategy/limiting-strategy/controller.go new file mode 100644 index 00000000..35e79fba --- /dev/null +++ b/drivers/strategy/limiting-strategy/controller.go @@ -0,0 +1,85 @@ +package limiting_strategy + +import ( + "github.com/eolinker/eosc" + "reflect" +) + +var ( + controller = NewController() + _ eosc.ISetting = controller + _ IController = controller +) + +type IController interface { + Store(id string) + Del(id string) +} +type Controller struct { + profession string + driver string + all map[string]struct{} +} + +func (c *Controller) Store(id string) { + c.all[id] = struct{}{} +} + +func (c *Controller) Del(id string) { + delete(c.all, id) +} + +func (c *Controller) ConfigType() reflect.Type { + return configType +} + +func (c *Controller) Set(conf interface{}) (err error) { + return eosc.ErrorUnsupportedKind +} + +func (c *Controller) Get() interface{} { + return nil +} + +func (c *Controller) Mode() eosc.SettingMode { + return eosc.SettingModeBatch +} + +func (c *Controller) Check(cfg interface{}) (profession, name, driver, desc string, err error) { + conf, ok := cfg.(*Config) + if !ok { + err = eosc.ErrorConfigIsNil + return + } + if empty(conf.Name) { + err = eosc.ErrorConfigFieldUnknown + return + } + err = checkConfig(conf) + if err != nil { + return + } + return c.profession, conf.Name, c.driver, conf.Description, nil + +} +func empty(vs ...string) bool { + for _, v := range vs { + if len(v) == 0 { + return true + } + } + return false +} +func (c *Controller) AllWorkers() []string { + ws := make([]string, 0, len(c.all)) + for id := range c.all { + ws = append(ws, id) + } + return ws +} + +func NewController() *Controller { + return &Controller{ + all: map[string]struct{}{}, + } +} diff --git a/drivers/strategy/limiting-stragety/driver.go b/drivers/strategy/limiting-strategy/driver.go similarity index 97% rename from drivers/strategy/limiting-stragety/driver.go rename to drivers/strategy/limiting-strategy/driver.go index ae49175f..9126d6ca 100644 --- a/drivers/strategy/limiting-stragety/driver.go +++ b/drivers/strategy/limiting-strategy/driver.go @@ -1,4 +1,4 @@ -package limiting_stragety +package limiting_strategy import ( "fmt" diff --git a/drivers/strategy/limiting-stragety/factory.go b/drivers/strategy/limiting-strategy/factory.go similarity index 97% rename from drivers/strategy/limiting-stragety/factory.go rename to drivers/strategy/limiting-strategy/factory.go index 3479777c..eb0a0b27 100644 --- a/drivers/strategy/limiting-stragety/factory.go +++ b/drivers/strategy/limiting-strategy/factory.go @@ -1,4 +1,4 @@ -package limiting_stragety +package limiting_strategy import ( "github.com/eolinker/eosc" diff --git a/drivers/strategy/limiting-stragety/handler.go b/drivers/strategy/limiting-strategy/handler.go similarity index 96% rename from drivers/strategy/limiting-stragety/handler.go rename to drivers/strategy/limiting-strategy/handler.go index d77ed0e8..e80be9c9 100644 --- a/drivers/strategy/limiting-stragety/handler.go +++ b/drivers/strategy/limiting-strategy/handler.go @@ -1,4 +1,4 @@ -package limiting_stragety +package limiting_strategy import ( "github.com/eolinker/apinto/metrics" @@ -57,11 +57,12 @@ func NewLimitingHandler(conf *Config) (*LimitingHandler, error) { mts := metrics.Parse(conf.Rule.Metrics) return &LimitingHandler{ + name: conf.Name, filter: filter, metrics: mts, - stop: conf.Stop, query: parseThreshold(conf.Rule.Query), traffic: parseThreshold(conf.Rule.Traffic), priority: conf.Priority, + stop: conf.Stop, }, nil } diff --git a/drivers/strategy/limiting-stragety/http-handler.go b/drivers/strategy/limiting-strategy/http-handler.go similarity index 97% rename from drivers/strategy/limiting-stragety/http-handler.go rename to drivers/strategy/limiting-strategy/http-handler.go index caf02dde..3a9188d9 100644 --- a/drivers/strategy/limiting-stragety/http-handler.go +++ b/drivers/strategy/limiting-strategy/http-handler.go @@ -1,8 +1,8 @@ -package limiting_stragety +package limiting_strategy import ( "errors" - "github.com/eolinker/apinto/drivers/strategy/limiting-stragety/scalar" + "github.com/eolinker/apinto/drivers/strategy/limiting-strategy/scalar" "github.com/eolinker/eosc/eocontext" http_service "github.com/eolinker/eosc/eocontext/http-context" "github.com/eolinker/eosc/log" diff --git a/drivers/strategy/limiting-stragety/limiting.go b/drivers/strategy/limiting-strategy/limiting.go similarity index 97% rename from drivers/strategy/limiting-stragety/limiting.go rename to drivers/strategy/limiting-strategy/limiting.go index b8db9c65..dfb54910 100644 --- a/drivers/strategy/limiting-stragety/limiting.go +++ b/drivers/strategy/limiting-strategy/limiting.go @@ -1,4 +1,4 @@ -package limiting_stragety +package limiting_strategy import ( "fmt" diff --git a/drivers/strategy/limiting-stragety/scalar/scalar.go b/drivers/strategy/limiting-strategy/scalar/scalar.go similarity index 100% rename from drivers/strategy/limiting-stragety/scalar/scalar.go rename to drivers/strategy/limiting-strategy/scalar/scalar.go diff --git a/drivers/strategy/limiting-stragety/scalar/vectors.go b/drivers/strategy/limiting-strategy/scalar/vectors.go similarity index 100% rename from drivers/strategy/limiting-stragety/scalar/vectors.go rename to drivers/strategy/limiting-strategy/scalar/vectors.go diff --git a/go.mod b/go.mod index 5d8f1e66..1afb554d 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/Shopify/sarama v1.32.0 github.com/eolinker/eosc v0.7.1 github.com/go-basic/uuid v1.0.0 + github.com/go-redis/redis/v8 v8.11.5 github.com/hashicorp/consul/api v1.9.1 github.com/nsqio/go-nsq v1.1.0 github.com/ohler55/ojg v1.12.9 @@ -20,11 +21,12 @@ require ( github.com/andybalholm/brotli v1.0.2 // indirect github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.1.1 // indirect + github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dustin/go-humanize v1.0.0 // indirect github.com/eapache/go-resiliency v1.2.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect diff --git a/resources/cache-no.go b/resources/cache-no.go new file mode 100644 index 00000000..1336a3f8 --- /dev/null +++ b/resources/cache-no.go @@ -0,0 +1,70 @@ +package resources + +import ( + "context" + "github.com/coocood/freecache" + "time" +) + +var ( + _ ICache = (*NoCache)(nil) +) + +type NoCache struct { + client *freecache.Cache +} + +func (n *NoCache) Close() error { + n.client.Clear() + return nil +} + +func (n *NoCache) Set(ctx context.Context, key string, value []byte, expiration time.Duration) error { + + return n.client.Set([]byte(key), value, int(expiration.Seconds())) +} + +func (n *NoCache) SetNX(ctx context.Context, key string, value []byte, expiration time.Duration) (bool, error) { + + old, err := n.client.GetOrSet([]byte(key), value, int(expiration.Seconds())) + if err != nil { + return false, err + } + return old == nil, nil +} + +func (n *NoCache) DecrBy(ctx context.Context, key string, decrement int64) (int64, error) { + return 0, ErrorNoCache +} + +func (n *NoCache) IncrBy(ctx context.Context, key string, decrement int64) (int64, error) { + return 0, ErrorNoCache + +} + +func (n *NoCache) Get(ctx context.Context, key string) ([]byte, error) { + return n.client.Get([]byte(key)) + +} + +func (n *NoCache) GetDel(ctx context.Context, key string) ([]byte, error) { + bytes, err := n.client.Get([]byte(key)) + if err != nil { + return nil, err + } + n.client.Del([]byte(key)) + return bytes, nil +} + +func (n *NoCache) Del(ctx context.Context, keys ...string) (int64, error) { + + for _, key := range keys { + n.client.Del([]byte(key)) + } + + return 1, nil +} + +func NewCacher() *NoCache { + return &NoCache{client: freecache.NewCache(0)} +} diff --git a/resources/cache-no_test.go b/resources/cache-no_test.go new file mode 100644 index 00000000..6bd39e7b --- /dev/null +++ b/resources/cache-no_test.go @@ -0,0 +1,29 @@ +package resources + +import ( + "fmt" + "github.com/coocood/freecache" +) + +func ExampleFreeCache() { + key := []byte("test") + value := []byte("value") + + client := freecache.NewCache(0) + set, err := client.GetOrSet(key, value, 100) + if err != nil { + fmt.Println(err) + return + } + fmt.Printf("value1=%s\n", string(set)) + set, err = client.GetOrSet(key, value, 100) + if err != nil { + fmt.Println(err) + return + } + fmt.Printf("value=%s\n", string(set)) + // output: + //value1= + //value=value + +} diff --git a/resources/cache.go b/resources/cache.go new file mode 100644 index 00000000..2cdb01c3 --- /dev/null +++ b/resources/cache.go @@ -0,0 +1,52 @@ +package resources + +import ( + "context" + "errors" + "time" +) + +var ( + ErrorNoCache = errors.New("no cache") + _ ICache = (*_Proxy)(nil) +) +var ( + singCacheProxy *_Proxy +) + +func init() { + singCacheProxy = newProxy(new(NoCache)) +} +func ReplaceCacher(caches ...ICache) { + if len(caches) < 1 || caches[0] == nil { + if singCacheProxy.ICache != nil { + singCacheProxy.ICache.Close() + } + singCacheProxy.ICache = NewCacher() + return + } + singCacheProxy.ICache = caches[0] +} + +func Cacher() ICache { + return singCacheProxy +} + +type ICache interface { + Set(ctx context.Context, key string, value []byte, expiration time.Duration) error + SetNX(ctx context.Context, key string, value []byte, expiration time.Duration) (bool, error) + DecrBy(ctx context.Context, key string, decrement int64) (int64, error) + IncrBy(ctx context.Context, key string, decrement int64) (int64, error) + Get(ctx context.Context, key string) ([]byte, error) + GetDel(ctx context.Context, key string) ([]byte, error) + Del(ctx context.Context, keys ...string) (int64, error) + Close() error +} + +type _Proxy struct { + ICache +} + +func newProxy(target ICache) *_Proxy { + return &_Proxy{ICache: target} +} diff --git a/resources/cache_test.go b/resources/cache_test.go new file mode 100644 index 00000000..af19db67 --- /dev/null +++ b/resources/cache_test.go @@ -0,0 +1,23 @@ +package resources + +import "testing" + +func TestReplace(t *testing.T) { + type args struct { + caches []ICache + } + tests := []struct { + name string + args args + }{ + { + name: "nil", + args: args{nil}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ReplaceCacher(tt.args.caches...) + }) + } +}