diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 69f904f6..f68de119 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -3,16 +3,17 @@ stages: # - check - build - deploy - - publish +# - publish variables: - PATH: /usr/local/sonar-scanner/sonar-scanner-4.4.0.2170-linux/bin:/usr/local/bin:/usr/local/sbin:/usr/sbin:/usr/bin:/data/golang/go/bin/:/root/go/bin - GOROOT: /data/golang/go + PATH: /opt/go/go/bin/:/opt/node/node/bin/:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin + GOROOT: /opt/go/go GOPROXY: https://goproxy.cn - APP: apinto - VERSION: $CI_COMMIT_SHORT_SHA SAVE_DIR: /opt/apinto + VERSION: $CI_COMMIT_SHORT_SHA + APP: apinto APP_PRE: ${APP}-${VERSION} + BUILD_DIR: apserver-build default: before_script: @@ -30,25 +31,14 @@ merge-informer: # 飞书回调 curl -X POST -H "Content-Type: application/json" \ -d "{\"msg_type\":\"text\",\"content\":{\"text\":\"项目:${CI_PROJECT_NAME}\\n提交人:${GITLAB_USER_NAME}\\n提交信息:${CI_MERGE_REQUEST_TITLE}\\n合并分支信息:${CI_MERGE_REQUEST_SOURCE_BRANCH_NAME} -> ${CI_MERGE_REQUEST_TARGET_BRANCH_NAME}\\n差异性地址:${DIFF_URL}\\n请及时review代码\"}}" \ https://open.feishu.cn/open-apis/bot/v2/hook/1c334752-2874-41a1-8f1b-3060f2d46b6c -# checker: -# stage: check -# rules: -# - if: $CI_COMMIT_BRANCH=="develop" -# script: -# - set +e -# - go mod tidy -# - go test -covermode=atomic -coverpkg=./... -coverprofile=coverage.data -timeout=1m ./... -# - golangci-lint run --timeout=3m --out-format checkstyle --issues-exit-code 0 ./... > report.xml -# - sonar-scanner + builder: stage: build rules: - - if: $CI_COMMIT_BRANCH=="develop" - # when: on_success - - if: $CI_COMMIT_BRANCH=="test" - # when: on_success + - if: $CI_COMMIT_BRANCH=="main" script: - - sed -i '/replace github.com\/eolinker\/eosc => */d' go.mod +# - sed -i '/replace github.com\/eolinker\/eosc => */d' go.mod + - cd ../eosc && git pull && cd ../apinto - go mod tidy - cd build/cmd && ./package.sh $VERSION cache: @@ -57,20 +47,21 @@ builder: deployer: stage: deploy rules: - - if: $CI_COMMIT_BRANCH=="develop" + - if: $CI_COMMIT_BRANCH=="main" variables: - DEPLOY_SERVER: root@172.18.189.43 DEPLOY_DESC: "DEV 环境" - - if: $CI_COMMIT_BRANCH=="test" - variables: - DEPLOY_SERVER: root@172.18.65.60 - DEPLOY_DESC: "TEST 环境" dependencies: - builder script: - - set -e - - scp out/${APP_PRE}.linux.x64.tar.gz ${DEPLOY_SERVER}:${SAVE_DIR} - - ssh ${DEPLOY_SERVER} "set -e; cd ${SAVE_DIR}; mkdir -p ${APP_PRE};tar -zxvf ${APP_PRE}.linux.x64.tar.gz -C ${APP_PRE};cd ${APP_PRE}/${APP}; ./install.sh upgrade;cd ${SAVE_DIR}; ./clean.sh ${APP_PRE}" + - cp out/${APP_PRE}.linux.x64.tar.gz ${SAVE_DIR} + - cd ${SAVE_DIR};cd ${SAVE_DIR}; mkdir -p ${APP_PRE};tar -zxvf ${APP_PRE}.linux.x64.tar.gz -C ${APP_PRE};cd ${APP_PRE}/${APP} + - ln -snF ${SAVE_DIR}/etc/tmp ./ && cp ${SAVE_DIR}/etc/apinto.yml ./ && cp ${SAVE_DIR}/etc/config.yml ./ + - pwd + - ./apinto stop + - sleep 10s + - ./apinto start + - ps -ef | grep apinto + - cd ${SAVE_DIR}; ./clean.sh ${APP_PRE} - | curl -X POST -H "Content-Type: application/json" \ -d "{\"msg_type\":\"text\",\"content\":{\"text\":\"项目:apinto\\n环境:${DEPLOY_DESC}\\n更新部署完成.\"}}" \ @@ -78,14 +69,4 @@ deployer: when: on_success cache: paths: - - out/ -publisher: - stage: publish - only: - - tags - script: - - sed -i '/replace github.com\/eolinker\/eosc => */d' go.mod - - go mod tidy - - GOVERSION=$(go version) EoscVersion=$(sed -n 's/.*eosc v/v/p' go.mod) goreleaser release --skip-validate --rm-dist --skip-publish - - mkdir -p /data/pkg/apinto/${CI_COMMIT_TAG} - - cp -if dist/*.tar.gz /data/pkg/apinto/${CI_COMMIT_TAG} \ No newline at end of file + - out/ \ No newline at end of file diff --git a/discovery/node.go b/discovery/node.go index 965c4a17..cdcd12ac 100644 --- a/discovery/node.go +++ b/discovery/node.go @@ -54,8 +54,8 @@ func (n *_BaseNode) Last() time.Time { } -func newBaseNode(ip string, port int, statusChecker _INodeStatusCheck) *_BaseNode { - return &_BaseNode{ip: ip, port: port, status: Running, statusChecker: statusChecker} +func newBaseNode(id string, ip string, port int, statusChecker _INodeStatusCheck) *_BaseNode { + return &_BaseNode{id: id, ip: ip, port: port, status: Running, statusChecker: statusChecker} } func (n *_BaseNode) ID() string { diff --git a/discovery/nodes.go b/discovery/nodes.go index e2e4f9bd..3eaddb87 100644 --- a/discovery/nodes.go +++ b/discovery/nodes.go @@ -29,7 +29,7 @@ func (ac *appContainer) Get(ip string, port int) INode { return node } - ac.nodes.Set(id, newBaseNode(ip, port, ac)) + ac.nodes.Set(id, newBaseNode(id, ip, port, ac)) node, _ = ac.nodes.Get(id) return node } diff --git a/drivers/discovery/consul/clients.go b/drivers/discovery/consul/clients.go index fe9254cf..add92f15 100644 --- a/drivers/discovery/consul/clients.go +++ b/drivers/discovery/consul/clients.go @@ -18,11 +18,11 @@ func newClients(addrs []string, param map[string]string) *consulClients { clients := make([]*api.Client, 0, len(addrs)) defaultConfig := api.DefaultConfig() - if _, has := param["token"]; has { - defaultConfig.Token = param["token"] + if v, has := param["token"]; has { + defaultConfig.Token = v } - if _, has := param["namespace"]; has { - defaultConfig.Namespace = param["namespace"] + if v, has := param["namespace"]; has { + defaultConfig.Namespace = v } for _, addr := range addrs { @@ -53,8 +53,9 @@ func (c *consulClients) getNodes(service string) ([]discovery.NodeInfo, error) { nodeList := make([]discovery.NodeInfo, 0, 2) nodeIDSet := make(map[string]struct{}) for _, client := range c.clients { - clientNodes := getNodesFromClient(client, service) - if len(clientNodes) == 0 { + clientNodes, err := getNodesFromClient(client, service) + if err != nil { + log.Warnf("consul client down for service %s", service) continue } for _, n := range clientNodes { @@ -64,19 +65,16 @@ func (c *consulClients) getNodes(service string) ([]discovery.NodeInfo, error) { nodeIDSet[n.id] = struct{}{} } } - if len(nodeList) == 0 { - return nil, discovery.ErrDiscoveryDown - } return nodeList, nil } // getNodesFromClient 从连接的客户端返回健康的节点信息 -func getNodesFromClient(client *api.Client, service string) []*consulNodeInfo { +func getNodesFromClient(client *api.Client, service string) ([]*consulNodeInfo, error) { queryOptions := &api.QueryOptions{} serviceEntryArr, _, err := client.Health().Service(service, "", true, queryOptions) if err != nil { - return nil + return nil, err } nodes := make([]*consulNodeInfo, 0, len(serviceEntryArr)) @@ -91,5 +89,5 @@ func getNodesFromClient(client *api.Client, service string) []*consulNodeInfo { }) } - return nodes + return nodes, nil } diff --git a/drivers/discovery/consul/consul.go b/drivers/discovery/consul/consul.go index 5f851c08..7760fe38 100644 --- a/drivers/discovery/consul/consul.go +++ b/drivers/discovery/consul/consul.go @@ -47,7 +47,6 @@ func (c *consul) Start() error { nodeSet, err := c.clients.getNodes(serviceName) if err != nil { log.Warnf("consul %s:%s for service %s", c.Name(), discovery.ErrDiscoveryDown, serviceName) - continue } //更新目标服务的节点列表 c.services.Set(serviceName, nodeSet) @@ -83,8 +82,6 @@ func (c *consul) Stop() error { // GetApp 获取服务发现中目标服务的app func (c *consul) GetApp(serviceName string) (discovery.IApp, error) { - var err error - var has bool c.locker.RLock() app, has := c.services.GetApp(serviceName) c.locker.RUnlock() diff --git a/drivers/discovery/eureka/client.go b/drivers/discovery/eureka/client.go index 9e517345..3a33c270 100644 --- a/drivers/discovery/eureka/client.go +++ b/drivers/discovery/eureka/client.go @@ -22,16 +22,14 @@ func newClient(address []string, params url.Values) *client { // GetNodeList 从eureka接入地址中获取对应服务的节点列表 func (c *client) GetNodeList(serviceName string) ([]discovery.NodeInfo, error) { - isOk := false nodes := make([]discovery.NodeInfo, 0, 5) sets := make(map[string]struct{}) for _, addr := range c.address { app, err := c.GetApplication(addr, serviceName) if err != nil { - log.Info("eureka get node instance list error:", err) + log.Warnf("eureka get node instance list fail. err: %w", err) continue } - isOk = true for _, ins := range app.Instances { if ins.Status != eurekaStatusUp { continue @@ -54,12 +52,9 @@ func (c *client) GetNodeList(serviceName string) ([]discovery.NodeInfo, error) { } nodes = append(nodes, node) } - } } - if !isOk { - return nil, discovery.ErrDiscoveryDown - } + return nodes, nil } diff --git a/drivers/discovery/eureka/eureka.go b/drivers/discovery/eureka/eureka.go index f3137e01..6637ba63 100644 --- a/drivers/discovery/eureka/eureka.go +++ b/drivers/discovery/eureka/eureka.go @@ -73,12 +73,9 @@ func (e *eureka) Start() error { res, err := e.client.GetNodeList(serviceName) if err != nil { log.Warnf("eureka %s:%w for service %s", e.Name(), discovery.ErrDiscoveryDown, serviceName) - continue } //更新目标服务的节点列表 - e.locker.Lock() e.services.Set(serviceName, res) - e.locker.Unlock() } } } diff --git a/drivers/discovery/nacos/client.go b/drivers/discovery/nacos/client.go index ecf8dd73..9caee217 100644 --- a/drivers/discovery/nacos/client.go +++ b/drivers/discovery/nacos/client.go @@ -1,96 +1,128 @@ package nacos import ( - "encoding/json" "fmt" - "io" - "net/http" - "net/url" + "github.com/nacos-group/nacos-sdk-go/v2/clients" + "github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client" + "github.com/nacos-group/nacos-sdk-go/v2/common/constant" + "github.com/nacos-group/nacos-sdk-go/v2/vo" "strconv" "strings" - "github.com/eolinker/eosc/log" - "github.com/eolinker/apinto/discovery" ) type client struct { - address []string - params url.Values + namingClient naming_client.INamingClient + group string + clusters []string } -func newClient(address []string, params url.Values) *client { - adds := make([]string, 0, len(address)) - for _, a := range address { - if !strings.HasPrefix(a, "http://") && !strings.HasPrefix(a, "https://") { - a = fmt.Sprintf("%s://%s", defaultScheme, a) - } - adds = append(adds, a) +func newClient(name string, address []string, params map[string]string) (*client, error) { + clientConfig := &constant.ClientConfig{ + LogDir: fmt.Sprintf("/var/log/apinto/nacos/%s", name), + CacheDir: fmt.Sprintf("/var/cache/apinto/nacos/%s", name), + LogLevel: "error", + } + //获取namespaceId, username,password + if v, has := params["namespaceId"]; has { + clientConfig.NamespaceId = v + } + if v, has := params["username"]; has { + clientConfig.Username = v + } + if v, has := params["password"]; has { + clientConfig.Password = v } - return &client{adds, params} -} - -// GetNodeList 从nacos接入地址中获取对应服务的节点列表 -func (c *client) GetNodeList(serviceName string) ([]discovery.NodeInfo, error) { - nodes := make([]discovery.NodeInfo, 0) - set := make(map[string]struct{}) - for _, addr := range c.address { - ins, err := c.GetInstanceList(addr, serviceName) - if err != nil { - log.Info("nacos get node instance list error:", err) - continue + serverConfigs := make([]constant.ServerConfig, 0, len(address)) + var ( + scheme, ipAddr string + port uint64 + err error + ) + for _, addr := range address { + schemeIdx := strings.Index(addr, "://") + if schemeIdx < 0 { + scheme = defaultScheme + } else { + scheme = addr[:schemeIdx] + addr = addr[schemeIdx+3:] } - - for _, host := range ins.Hosts { - label := map[string]string{ - "valid": strconv.FormatBool(host.Valid), - "marked": strconv.FormatBool(host.Marked), - "weight": strconv.FormatFloat(host.Weight, 'f', -1, 64), - } - if _, exist := set[host.InstanceID]; !exist { - set[host.InstanceID] = struct{}{} - nodes = append(nodes, discovery.NodeInfo{ - Ip: host.IP, - Port: host.Port, - Labels: label, - }) + portIdx := strings.Index(addr, ":") + if portIdx < 0 { + ipAddr = addr + port = 0 + } else { + ipAddr = addr[:portIdx] + portStr := addr[portIdx+1:] + port, err = strconv.ParseUint(portStr, 10, 64) + if err != nil { + return nil, err } } + serverConfigs = append(serverConfigs, constant.ServerConfig{ + Scheme: scheme, + IpAddr: ipAddr, + Port: port, + }) } - if len(nodes) == 0 { - return nil, discovery.ErrDiscoveryDown - } - return nodes, nil -} -// GetInstanceList 获取目标地址指定服务名的实例列表 -func (c *client) GetInstanceList(addr string, serviceName string) (*Instance, error) { - addr = addr + instancePath - paramsURL := c.params - paramsURL.Set("serviceName", serviceName) - req, err := http.NewRequest("GET", addr, nil) + namingClient, err := clients.NewNamingClient(vo.NacosClientParam{ + ClientConfig: clientConfig, + ServerConfigs: serverConfigs, + }) if err != nil { return nil, err } - req.URL.RawQuery = paramsURL.Encode() - response, err := http.DefaultClient.Do(req) - if err != nil { - return nil, err + + c := &client{namingClient: namingClient} + //获取group, clusters + if v, has := params["group"]; has { + c.group = v } - // 解析响应数据 - rawResponseData, err := io.ReadAll(response.Body) - if err != nil { - return nil, err + if v, has := params["clusters"]; has { + clusters := strings.Split(strings.TrimSpace(v), ",") + c.clusters = clusters } - err = response.Body.Close() + + return c, nil +} + +// GetNodeList 从nacosClient获取对应服务的节点列表 +func (c *client) GetNodeList(serviceName string) ([]discovery.NodeInfo, error) { + nodes := make([]discovery.NodeInfo, 0) + set := make(map[string]struct{}) + + instances, err := c.namingClient.SelectInstances(vo.SelectInstancesParam{ + ServiceName: serviceName, + Clusters: c.clusters, + GroupName: c.group, + HealthyOnly: true, + }) if err != nil { return nil, err } - var instance = &Instance{} - err = json.Unmarshal(rawResponseData, instance) - if err != nil { - return nil, err + + for _, ins := range instances { + label := map[string]string{ + "weight": strconv.FormatFloat(ins.Weight, 'f', -1, 64), + } + //ins的instanceID可能为空 + instanceID := fmt.Sprintf("%s:%d", ins.Ip, ins.Port) + if _, exist := set[instanceID]; !exist { + set[instanceID] = struct{}{} + + for k, v := range ins.Metadata { + label[k] = v + } + nodes = append(nodes, discovery.NodeInfo{ + Ip: ins.Ip, + Port: int(ins.Port), + Labels: label, + }) + } } - return instance, nil + + return nodes, nil } diff --git a/drivers/discovery/nacos/config.go b/drivers/discovery/nacos/config.go index 50f5ed35..1c342156 100644 --- a/drivers/discovery/nacos/config.go +++ b/drivers/discovery/nacos/config.go @@ -1,27 +1,14 @@ package nacos -import ( - "net/url" -) - const defaultScheme = "http" -//Config nacos驱动配置 +// Config nacos驱动配置 type Config struct { Config AccessConfig `json:"config" label:"配置信息"` } -//AccessConfig 接入地址配置 +// AccessConfig 接入地址配置 type AccessConfig struct { Address []string `json:"address" label:"nacos地址"` Params map[string]string `json:"params" label:"参数"` } - -func (c *Config) getParams() url.Values { - p := url.Values{} - p.Set("healthyOnly", "true") - for k, v := range c.Config.Params { - p.Set(k, v) - } - return p -} diff --git a/drivers/discovery/nacos/driver.go b/drivers/discovery/nacos/driver.go index 678ee644..823e20e5 100644 --- a/drivers/discovery/nacos/driver.go +++ b/drivers/discovery/nacos/driver.go @@ -1,6 +1,7 @@ package nacos import ( + "fmt" "github.com/eolinker/apinto/drivers" "sync" @@ -14,13 +15,15 @@ const ( // Create 创建nacos驱动实例 func Create(id, name string, cfg *Config, workers map[eosc.RequireId]eosc.IWorker) (eosc.IWorker, error) { - + c, err := newClient(name, cfg.Config.Address, cfg.Config.Params) + if err != nil { + return nil, fmt.Errorf("create nacos client fail. err: %w", err) + } return &nacos{ WorkerBase: drivers.Worker(id, name), - client: newClient(cfg.Config.Address, cfg.getParams()), - - services: discovery.NewAppContainer(), - locker: sync.RWMutex{}, + client: c, + services: discovery.NewAppContainer(), + locker: sync.RWMutex{}, }, nil } diff --git a/drivers/discovery/nacos/nacos.go b/drivers/discovery/nacos/nacos.go index dd3a8ad1..9d0f8953 100644 --- a/drivers/discovery/nacos/nacos.go +++ b/drivers/discovery/nacos/nacos.go @@ -15,10 +15,6 @@ import ( "github.com/eolinker/eosc" ) -const ( - instancePath = "/nacos/v1/ns/instance/list" -) - var _ discovery.IDiscovery = (*nacos)(nil) type nacos struct { @@ -68,13 +64,9 @@ func (n *nacos) Start() error { res, err := n.client.GetNodeList(serviceName) if err != nil { log.Warnf("nacos %s:%w for service %s", n.Name(), discovery.ErrDiscoveryDown, serviceName) - continue } //更新目标服务的节点列表 - n.locker.Lock() n.services.Set(serviceName, res) - n.locker.Unlock() - } } } @@ -90,7 +82,11 @@ func (n *nacos) Reset(conf interface{}, workers map[eosc.RequireId]eosc.IWorker) if !ok { return fmt.Errorf("need %s,now %s", config.TypeNameOf((*Config)(nil)), config.TypeNameOf(conf)) } - n.client = newClient(cfg.Config.Address, cfg.getParams()) + nClient, err := newClient(n.Name(), cfg.Config.Address, cfg.Config.Params) + if err != nil { + return fmt.Errorf("create nacos client fail. err: %w", err) + } + n.client = nClient return nil } @@ -110,6 +106,7 @@ func (n *nacos) GetApp(serviceName string) (discovery.IApp, error) { } n.locker.Lock() + defer n.locker.Unlock() app, ok = n.services.GetApp(serviceName) if ok { return app.Agent(), nil @@ -117,13 +114,10 @@ func (n *nacos) GetApp(serviceName string) (discovery.IApp, error) { ns, err := n.client.GetNodeList(serviceName) if err != nil { - log.Errorf("%s get %s node list error: %v", driverName, serviceName, err) - + log.Warnf("%s get %s node list error: %v", driverName, serviceName, err) } app = n.services.Set(serviceName, ns) - n.locker.Unlock() - return app.Agent(), nil } diff --git a/drivers/discovery/nacos/nacos_test.go b/drivers/discovery/nacos/nacos_test.go index bd2c953d..8b971fdd 100644 --- a/drivers/discovery/nacos/nacos_test.go +++ b/drivers/discovery/nacos/nacos_test.go @@ -19,10 +19,10 @@ func TestGetApp(t *testing.T) { }, }, } + c, _ := newClient("asd", cfg.Config.Address, cfg.Config.Params) n := &nacos{ - client: newClient(cfg.Config.Address, cfg.getParams()), - nodes: discovery.NewNodesData(), - services: discovery.NewServices(), + client: c, + services: discovery.NewAppContainer(), locker: sync.RWMutex{}, } app, err := n.GetApp(serviceName) @@ -32,9 +32,9 @@ func TestGetApp(t *testing.T) { for _, node := range app.Nodes() { t.Log(node.ID()) } - ns, bo := n.nodes.Get(serviceName) - if bo { - t.Log(len(ns)) + ns, err := n.GetApp(serviceName) + if err == nil { + t.Log(len(ns.Nodes())) } else { t.Error("nodes error") } diff --git a/drivers/output/fileoutput/output.go b/drivers/output/fileoutput/output.go index 50df8db3..4dfe040a 100644 --- a/drivers/output/fileoutput/output.go +++ b/drivers/output/fileoutput/output.go @@ -1,7 +1,10 @@ package fileoutput import ( + "fmt" scope_manager "github.com/eolinker/apinto/scope-manager" + "github.com/eolinker/eosc/router" + "net/http" "reflect" "github.com/eolinker/apinto/drivers" @@ -19,6 +22,14 @@ type FileOutput struct { isRunning bool } +func (a *FileOutput) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if a.writer == nil || a.writer.fileHandler == nil { + w.WriteHeader(http.StatusNotFound) + return + } + a.writer.fileHandler.ServeHTTP(w, r) +} + func (a *FileOutput) Output(entry eosc.IEntry) error { w := a.writer if w != nil { @@ -45,7 +56,7 @@ func (a *FileOutput) Reset(conf interface{}, workers map[eosc.RequireId]eosc.IWo w = new(FileWriter) } - err = w.reset(cfg) + err = w.reset(cfg, a.Name()) if err != nil { return err } @@ -58,7 +69,9 @@ func (a *FileOutput) Reset(conf interface{}, workers map[eosc.RequireId]eosc.IWo func (a *FileOutput) Stop() error { scope_manager.Del(a.Id()) + router.DeletePath(a.Id()) a.isRunning = false + w := a.writer if w != nil { err := w.stop() @@ -74,13 +87,18 @@ func (a *FileOutput) Start() error { if w == nil { w = new(FileWriter) } - - err := w.reset(a.config) + err := router.SetPath(a.Id(), fmt.Sprintf("/log/access/%s/", a.Name()), a) if err != nil { return err } + err = w.reset(a.config, a.Name()) + if err != nil { + return err + } + a.writer = w scope_manager.Set(a.Id(), a, a.config.Scopes...) + return nil } diff --git a/drivers/output/fileoutput/write.go b/drivers/output/fileoutput/write.go index f1f4659a..3980cce0 100644 --- a/drivers/output/fileoutput/write.go +++ b/drivers/output/fileoutput/write.go @@ -1,15 +1,22 @@ package fileoutput import ( - file_transport "github.com/eolinker/apinto/output/file-transport" + "fmt" "github.com/eolinker/eosc" "github.com/eolinker/eosc/formatter" + "github.com/eolinker/eosc/log/filelog" + "github.com/eolinker/eosc/router" + "net/http" + + "time" ) type FileWriter struct { formatter eosc.IFormatter - transport *file_transport.FileWriterByPeriod + transport *filelog.FileWriterByPeriod //id string + + fileHandler http.Handler } func (a *FileWriter) output(entry eosc.IEntry) error { @@ -24,7 +31,7 @@ func (a *FileWriter) output(entry eosc.IEntry) error { return nil } -func (a *FileWriter) reset(cfg *Config) (err error) { +func (a *FileWriter) reset(cfg *Config, name string) (err error) { factory, has := formatter.GetFormatterFactory(cfg.Type) if !has { @@ -37,14 +44,15 @@ func (a *FileWriter) reset(cfg *Config) (err error) { } transport := a.transport - c := &file_transport.Config{ + c := filelog.Config{ Dir: cfg.Dir, File: cfg.File, - Expire: cfg.Expire, - Period: file_transport.ParsePeriod(cfg.Period), + Expire: time.Duration(cfg.Expire) * 24 * time.Hour, + Period: filelog.ParsePeriod(cfg.Period), } if transport == nil { - transport = file_transport.NewFileWriteByPeriod(c) + transport = filelog.NewFileWriteByPeriod(c) + a.fileHandler = transport.ServeHTTP(fmt.Sprintf("/%slog/access/%s", router.RouterPrefix, name)) } else { transport.Reset(c) } diff --git a/drivers/output/prometheus/check.go b/drivers/output/prometheus/check.go index 5ab3ee95..377c7002 100644 --- a/drivers/output/prometheus/check.go +++ b/drivers/output/prometheus/check.go @@ -2,7 +2,6 @@ package prometheus import ( "sort" - "strings" ) // checkScopesChange 检查scopes配置是否有改变 @@ -27,15 +26,6 @@ func checkScopesChange(oldScopes, newScopes []string) bool { return false } -func checkPathChange(oldPath, newPath string) bool { - oldPath = "/" + strings.Trim(oldPath, "/") - newPath = "/" + strings.Trim(newPath, "/") - if oldPath != newPath { - return true - } - return false -} - func checkMetricConfigChange(oldMC, newMC *MetricConfig) bool { if oldMC.Collector != newMC.Collector { return true diff --git a/drivers/output/prometheus/config.go b/drivers/output/prometheus/config.go index 46a48d6c..ac6bebbd 100644 --- a/drivers/output/prometheus/config.go +++ b/drivers/output/prometheus/config.go @@ -2,7 +2,6 @@ package prometheus type Config struct { Scopes []string `json:"scopes" label:"作用域"` - Path string `json:"path" yaml:"path" required:"true" label:"Metrics路径"` Metrics []*MetricConfig `json:"metrics" yaml:"metrics" required:"true" label:"指标列表"` } diff --git a/drivers/output/prometheus/driver.go b/drivers/output/prometheus/driver.go index 7129fbd2..d856e0cb 100644 --- a/drivers/output/prometheus/driver.go +++ b/drivers/output/prometheus/driver.go @@ -21,10 +21,6 @@ func Check(v *Config, workers map[eosc.RequireId]eosc.IWorker) error { func doCheck(promConf *Config) (map[string]*metricInfoCfg, error) { metricLabels := make(map[string]*metricInfoCfg, len(promConf.Metrics)) - if match := utils.CheckUrlPath(promConf.Path); !match { - return nil, fmt.Errorf(errorPathFormat, promConf.Path) - } - if len(promConf.Metrics) == 0 { return nil, errorNullMetrics } @@ -175,7 +171,8 @@ func Create(id, name string, cfg *Config, workers map[eosc.RequireId]eosc.IWorke p.registry, promhttp.HandlerFor(p.registry, promhttp.HandlerOpts{}), ) - err = router.SetPath(p.Id(), p.config.Path, p) + //metrics路径为 /apinto/metrics/prometheus/{worker_name} 前面的/apinto/在router.SetPath里做拼接 + err = router.SetPath(p.Id(), fmt.Sprintf("/metrics/prometheus/%s", name), p) if err != nil { return nil, fmt.Errorf("create output %s fail: %w", p.Id(), err) } diff --git a/drivers/output/prometheus/output.go b/drivers/output/prometheus/output.go index 24250d4e..e97c77cc 100644 --- a/drivers/output/prometheus/output.go +++ b/drivers/output/prometheus/output.go @@ -110,15 +110,6 @@ func (p *PromOutput) Reset(conf interface{}, workers map[eosc.RequireId]eosc.IWo ) } - //若path有变,更新router - if checkPathChange(p.config.Path, cfg.Path) { - //重新设置路由 - err = router.SetPath(p.Id(), cfg.Path, p) - if err != nil { - return fmt.Errorf("reset output %s fail: %w", p.Id(), err) - } - } - if isMetricsUpdate { p.registry = newRegistry p.handler = handler diff --git a/go.mod b/go.mod index 834b58e7..983b3b09 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/brianvoe/gofakeit/v6 v6.20.1 github.com/coocood/freecache v1.2.2 github.com/dubbogo/gost v1.13.1 - github.com/eolinker/eosc v0.13.0 + github.com/eolinker/eosc v0.14.0 github.com/fasthttp/websocket v1.5.0 github.com/fullstorydev/grpcurl v1.8.7 github.com/go-redis/redis/v8 v8.11.5 @@ -16,6 +16,7 @@ require ( github.com/hashicorp/consul/api v1.9.1 github.com/influxdata/influxdb-client-go/v2 v2.12.1 github.com/jhump/protoreflect v1.14.1 + github.com/nacos-group/nacos-sdk-go/v2 v2.1.2 github.com/nsqio/go-nsq v1.1.0 github.com/ohler55/ojg v1.12.9 github.com/pkg/sftp v1.13.4 @@ -32,8 +33,10 @@ require ( contrib.go.opencensus.io/exporter/prometheus v0.4.1 // indirect github.com/RoaringBitmap/roaring v0.7.1 // indirect github.com/Workiva/go-datastructures v1.0.52 // indirect + github.com/aliyun/alibaba-cloud-sdk-go v1.61.1704 // indirect github.com/apache/dubbo-getty v1.4.8 // indirect github.com/bits-and-blooms/bitset v1.2.0 // indirect + github.com/buger/jsonparser v1.1.1 // indirect github.com/creasty/defaults v1.5.2 // indirect github.com/dubbogo/triple v1.1.8 // indirect github.com/go-kit/log v0.1.0 // indirect @@ -46,7 +49,9 @@ require ( github.com/go-playground/validator/v10 v10.11.0 // indirect github.com/golang-jwt/jwt/v4 v4.4.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect + github.com/golang/mock v1.6.0 // indirect github.com/jinzhu/copier v0.3.5 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/k0kubun/pp v3.0.1+incompatible // indirect github.com/knadh/koanf v1.4.1 // indirect github.com/leodido/go-urn v1.2.1 // indirect @@ -67,6 +72,9 @@ require ( github.com/yusufpapurcu/wmi v1.2.2 // indirect go.opencensus.io v0.23.0 // indirect golang.org/x/net v0.8.0 // indirect + golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect + gopkg.in/ini.v1 v1.66.2 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect ) require ( diff --git a/output/file-transport/config.go b/output/file-transport/config.go deleted file mode 100644 index c494cfef..00000000 --- a/output/file-transport/config.go +++ /dev/null @@ -1,16 +0,0 @@ -package file_transport - -//Config filelog-Transporter所需配置 -type Config struct { - Dir string - File string - Expire int - Period LogPeriod -} - -func (c *Config) IsUpdate(cfg *Config) bool { - if cfg.File != c.File || cfg.Dir != c.Dir || cfg.Period != c.Period || cfg.Expire != c.Expire { - return true - } - return false -} diff --git a/output/file-transport/file.go b/output/file-transport/file.go deleted file mode 100644 index bf93e088..00000000 --- a/output/file-transport/file.go +++ /dev/null @@ -1,48 +0,0 @@ -package file_transport - -import ( - "fmt" - "os" - "path/filepath" - "time" -) - -type FileController struct { - expire time.Duration - dir string - file string - period LogPeriod -} - -func (w *FileController) timeTag(t time.Time) string { - - tag := t.Format(w.period.FormatLayout()) - - return filepath.Join(w.dir, fmt.Sprintf("%s-%s.log", w.file, tag)) -} -func (w *FileController) fileName() string { - return filepath.Join(w.dir, fmt.Sprintf("%s.log", w.file)) -} -func (w *FileController) history(history string) { - - path := w.fileName() - os.Rename(path, history) - -} - -func (w *FileController) dropHistory() { - - expireTime := time.Now().Add(-w.expire) - pathPatten := filepath.Join(w.dir, fmt.Sprintf("%s-*", w.file)) - files, err := filepath.Glob(pathPatten) - if err == nil { - for _, f := range files { - if info, e := os.Stat(f); e == nil { - - if expireTime.After(info.ModTime()) { - _ = os.Remove(f) - } - } - } - } -} diff --git a/output/file-transport/period.go b/output/file-transport/period.go deleted file mode 100644 index 61c872cb..00000000 --- a/output/file-transport/period.go +++ /dev/null @@ -1,71 +0,0 @@ -package file_transport - -import ( - "strings" -) - -//LogPeriod 日志周期 -type LogPeriod interface { - String() string - FormatLayout() string -} - -//LogPeriodType 日志周期类型 -type LogPeriodType int - -//ParsePeriod 解析周期 -func ParsePeriod(v string) LogPeriod { - switch strings.ToLower(v) { - //case "month": - // return PeriodMonth, nil - case "day": - return PeriodDay - case "hour": - return PeriodHour - default: - return PeriodDay - } - - //return nil, fmt.Errorf("not a valid period: %q", v) -} -func (period LogPeriodType) String() string { - switch period { - //case PeriodMonth: - // return "month" - case PeriodDay: - return "day" - case PeriodHour: - return "hour" - default: - return "unknown" - } -} - -const ( - //PeriodMonth 月 - PeriodMonth LogPeriodType = iota - //PeriodDay 日 - PeriodDay - //PeriodHour 时 - PeriodHour -) - -//FormatLayout 格式化 -func (period LogPeriodType) FormatLayout() string { - switch period { - case PeriodHour: - { - return "2006-01-02-15" - } - case PeriodDay: - { - return "2006-01-02" - } - case PeriodMonth: - { - return "2006-01" - } - default: - return "2006-01-02-15" - } -} diff --git a/output/file-transport/transporter.go b/output/file-transport/transporter.go deleted file mode 100644 index b3570c84..00000000 --- a/output/file-transport/transporter.go +++ /dev/null @@ -1,30 +0,0 @@ -package file_transport - -import "github.com/eolinker/eosc/formatter" - -//Transporter filelog-Transporter结构 -type Transporter struct { - writer *FileWriterByPeriod -} - -func (t *Transporter) Write(bytes []byte) error { - _, err := t.writer.Write(bytes) - return err -} - -//Close 关闭 -func (t *Transporter) Close() error { - t.writer.Close() - return nil -} - -//NewtTransporter 创建file-Transporter -func NewtTransporter(cfg *Config) formatter.ITransport { - - fileWriterByPeriod := NewFileWriteByPeriod(cfg) - - return &Transporter{ - writer: fileWriterByPeriod, - } - -} diff --git a/output/file-transport/writer.go b/output/file-transport/writer.go deleted file mode 100644 index 21415e88..00000000 --- a/output/file-transport/writer.go +++ /dev/null @@ -1,241 +0,0 @@ -package file_transport - -import ( - "bufio" - "bytes" - "context" - "github.com/eolinker/eosc/log" - "os" - "sync" - "time" -) - -// MaxBuffer buffer最大值 -const MaxBuffer = 1024 * 500 - -var ( - bufferPool = &sync.Pool{ - New: func() interface{} { - return new(bytes.Buffer) - }, - } -) - -// FileWriterByPeriod 文件周期写入 -type FileWriterByPeriod struct { - wC chan *bytes.Buffer - - enable bool - cancelFunc context.CancelFunc - locker sync.RWMutex - wg sync.WaitGroup - resetChan chan FileController -} - -// NewFileWriteByPeriod 获取新的FileWriterByPeriod -func NewFileWriteByPeriod(cfg *Config) *FileWriterByPeriod { - w := &FileWriterByPeriod{ - locker: sync.RWMutex{}, - wg: sync.WaitGroup{}, - enable: false, - resetChan: make(chan FileController), - } - - w.Open(&FileController{ - dir: cfg.Dir, - file: cfg.File, - period: cfg.Period, - expire: time.Duration(cfg.Expire) * 24 * time.Hour, - }) - return w -} - -func (w *FileWriterByPeriod) Reset(cfg *Config) { - w.resetChan <- FileController{ - dir: cfg.Dir, - file: cfg.File, - period: cfg.Period, - expire: time.Duration(cfg.Expire) * 24 * time.Hour, - } -} - -// Open 打开 -func (w *FileWriterByPeriod) Open(config *FileController) { - w.locker.Lock() - defer w.locker.Unlock() - - if w.enable { - return - } - - ctx, cancel := context.WithCancel(context.Background()) - w.cancelFunc = cancel - w.wC = make(chan *bytes.Buffer, 100) - - w.enable = true - go func() { - w.wg.Add(1) - w.do(ctx, config) - w.wg.Done() - }() -} - -// Close 关闭 -func (w *FileWriterByPeriod) Close() { - - isClose := false - w.locker.Lock() - if !w.enable { - w.locker.Unlock() - return - } - - if w.cancelFunc != nil { - isClose = true - w.cancelFunc() - w.cancelFunc = nil - } - w.enable = false - w.locker.Unlock() - if isClose { - w.wg.Wait() - } -} -func (w *FileWriterByPeriod) isEnable() bool { - w.locker.Lock() - defer w.locker.Unlock() - return w.enable -} -func (w *FileWriterByPeriod) Write(p []byte) (n int, err error) { - - l := len(p) - - if l == 0 { - return - } - if !w.isEnable() { - return l, nil - } - buffer := bufferPool.Get().(*bytes.Buffer) - buffer.Reset() - buffer.Write(p) - if p[l-1] != '\n' { - buffer.WriteByte('\n') - } - w.wC <- buffer - return l, nil -} - -func (w *FileWriterByPeriod) do(ctx context.Context, config *FileController) { - fileController := *config - fileController.initFile() - f, lastTag, e := fileController.openFile() - if e != nil { - log.Errorf("open log file:%s\n", e.Error()) - return - } - - buf := bufio.NewWriter(f) - t := time.NewTicker(time.Second * 5) - defer t.Stop() - tFlush := time.NewTimer(time.Second) - - resetFunc := func(controller FileController) { - if lastTag != fileController.timeTag(time.Now()) { - if buf.Buffered() > 0 { - buf.Flush() - tFlush.Reset(time.Second) - } - f.Close() - fileController.history(lastTag) - fnew, tag, err := fileController.openFile() - if err != nil { - return - } - lastTag = tag - f = fnew - buf.Reset(f) - - go fileController.dropHistory() - } - } - - for { - select { - case <-ctx.Done(): - { - for len(w.wC) > 0 { - p := <-w.wC - buf.Write(p.Bytes()) - bufferPool.Put(p) - } - buf.Flush() - f.Close() - t.Stop() - //w.wg.Done() - return - } - - case <-t.C: - { - - resetFunc(fileController) - - } - case <-tFlush.C: - { - if buf.Buffered() > 0 { - buf.Flush() - } - tFlush.Reset(time.Second) - } - case p := <-w.wC: - { - buf.Write(p.Bytes()) - bufferPool.Put(p) - if buf.Buffered() > MaxBuffer { - buf.Flush() - } - tFlush.Reset(time.Second) - } - case controller, ok := <-w.resetChan: - { - if ok { - resetFunc(controller) - fileController = controller - } - } - } - } -} - -func (w *FileController) initFile() { - err := os.MkdirAll(w.dir, 0666) - if err != nil { - log.Error(err) - } - path := w.fileName() - nowHistoryName := w.timeTag(time.Now()) - if info, e := os.Stat(path); e == nil { - - timeTag := w.timeTag(info.ModTime()) - if timeTag != nowHistoryName { - w.history(timeTag) - } - } - - w.dropHistory() - -} - -func (w *FileController) openFile() (*os.File, string, error) { - path := w.fileName() - nowTag := w.timeTag(time.Now()) - f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) - - if err != nil { - return nil, "", err - } - return f, nowTag, err - -}