diff --git a/app/apinto/plugin.go b/app/apinto/plugin.go index 5e2b9a95..d7c478cf 100644 --- a/app/apinto/plugin.go +++ b/app/apinto/plugin.go @@ -2,6 +2,7 @@ package main import ( "github.com/eolinker/apinto/drivers/plugins/app" + body_record_truncation "github.com/eolinker/apinto/drivers/plugins/body-record-truncation" "github.com/eolinker/apinto/drivers/plugins/cors" data_transform "github.com/eolinker/apinto/drivers/plugins/data-transform" dubbo2_proxy_rewrite "github.com/eolinker/apinto/drivers/plugins/dubbo2-proxy-rewrite" @@ -10,7 +11,9 @@ import ( "github.com/eolinker/apinto/drivers/plugins/gzip" params_check "github.com/eolinker/apinto/drivers/plugins/params-check" "github.com/eolinker/apinto/drivers/plugins/prometheus" + request_file_parse "github.com/eolinker/apinto/drivers/plugins/request-file-parse" request_interception "github.com/eolinker/apinto/drivers/plugins/request-interception" + response_file_parse "github.com/eolinker/apinto/drivers/plugins/response-file-parse" response_filter "github.com/eolinker/apinto/drivers/plugins/response-filter" response_rewrite_v2 "github.com/eolinker/apinto/drivers/plugins/response-rewrite_v2" @@ -71,12 +74,14 @@ func pluginRegister(extenderRegister eosc.IExtenderDriverRegister) { params_check.Register(extenderRegister) data_transform.Register(extenderRegister) request_interception.Register(extenderRegister) + request_file_parse.Register(extenderRegister) // 响应处理插件 response_rewrite.Register(extenderRegister) response_rewrite_v2.Register(extenderRegister) response_filter.Register(extenderRegister) gzip.Register(extenderRegister) + response_file_parse.Register(extenderRegister) // 安全相关插件 ip_restriction.Register(extenderRegister) @@ -90,6 +95,7 @@ func pluginRegister(extenderRegister eosc.IExtenderDriverRegister) { prometheus.Register(extenderRegister) monitor.Register(extenderRegister) proxy_mirror.Register(extenderRegister) + body_record_truncation.Register(extenderRegister) // 计数插件 counter.Register(extenderRegister) diff --git a/drivers/output/fileoutput/config.go b/drivers/output/fileoutput/config.go index caf79fa7..e3b83949 100644 --- a/drivers/output/fileoutput/config.go +++ b/drivers/output/fileoutput/config.go @@ -5,11 +5,17 @@ import ( ) type Config struct { - Scopes []string `json:"scopes" label:"作用域"` - File string `json:"file" yaml:"file" label:"文件名称"` - Dir string `json:"dir" yaml:"dir" label:"文件存放目录"` - Period string `json:"period" yaml:"period" enum:"hour,day" label:"日志分割周期"` - Expire int `json:"expire" yaml:"expire" label:"日志保存时间" description:"单位:天" default:"7" minimum:"1"` - Type string `json:"type" yaml:"type" enum:"json,line" label:"输出格式"` + Scopes []string `json:"scopes" label:"作用域"` + File string `json:"file" yaml:"file" label:"文件名称"` + Dir string `json:"dir" yaml:"dir" label:"文件存放目录"` + Period string `json:"period" yaml:"period" enum:"hour,day" label:"日志分割周期"` + Expire int `json:"expire" yaml:"expire" label:"日志保存时间" description:"单位:天" default:"7" minimum:"1"` + Type string `json:"type" yaml:"type" enum:"json,line" label:"输出格式"` + //BodyConfig BodyConfig `json:"body_config" yaml:"body_config" label:"请求体/响应体配置" description:"请求体/响应体配置" switch:"type===json"` Formatter eosc.FormatterConfig `json:"formatter" yaml:"formatter" label:"格式化配置"` } + +//type BodyConfig struct { +// BodySize int `json:"body_size" label:"请求体/响应体截取长度" description:"单位:M" default:"10" minimum:"0"` +// BodyCode string `json:"body_code" label:"请求体/响应体编码" enum:"latin,utf8,gbk" default:"utf8"` +//} diff --git a/drivers/output/fileoutput/write.go b/drivers/output/fileoutput/write.go index 3980cce0..357a5b42 100644 --- a/drivers/output/fileoutput/write.go +++ b/drivers/output/fileoutput/write.go @@ -2,11 +2,12 @@ package fileoutput import ( "fmt" + "net/http" + "github.com/eolinker/eosc" "github.com/eolinker/eosc/formatter" "github.com/eolinker/eosc/log/filelog" "github.com/eolinker/eosc/router" - "net/http" "time" ) @@ -15,7 +16,6 @@ type FileWriter struct { formatter eosc.IFormatter transport *filelog.FileWriterByPeriod //id string - fileHandler http.Handler } @@ -37,7 +37,10 @@ func (a *FileWriter) reset(cfg *Config, name string) (err error) { if !has { return errorFormatterType } - + //var extendCfg []byte + //if cfg.Type == "json" { + // extendCfg, _ = json.Marshal(cfg.BodyConfig) + //} fm, err := factory.Create(cfg.Formatter) if err != nil { return err diff --git a/drivers/output/httpoutput/http.go b/drivers/output/httpoutput/http.go index be4b145a..a5165da5 100644 --- a/drivers/output/httpoutput/http.go +++ b/drivers/output/httpoutput/http.go @@ -73,6 +73,7 @@ func create(config *Config) (formatter.ITransport, eosc.IFormatter, error) { if !has { return nil, nil, errFormatterType } + fm, err := factory.Create(config.Formatter) if err != nil { return nil, nil, err diff --git a/drivers/output/kafka/producer.go b/drivers/output/kafka/producer.go index 906f54fc..2dfa23e6 100644 --- a/drivers/output/kafka/producer.go +++ b/drivers/output/kafka/producer.go @@ -32,6 +32,7 @@ func (o *tProducer) reset(cfg *ProducerConfig) (err error) { if !has { return errorFormatterType } + o.formatter, err = factory.Create(cfg.Formatter) if o.producer != nil { diff --git a/drivers/output/nsq/write.go b/drivers/output/nsq/write.go index 1b010eb2..9d0661dd 100644 --- a/drivers/output/nsq/write.go +++ b/drivers/output/nsq/write.go @@ -54,6 +54,7 @@ func (n *Writer) reset(config *Config) error { if !has { return errFormatterType } + fm, err := factory.Create(config.Formatter) if err != nil { return err diff --git a/drivers/output/prometheus/driver.go b/drivers/output/prometheus/driver.go index d856e0cb..782a6e75 100644 --- a/drivers/output/prometheus/driver.go +++ b/drivers/output/prometheus/driver.go @@ -2,6 +2,8 @@ package prometheus import ( "fmt" + "strings" + "github.com/eolinker/apinto/drivers" scope_manager "github.com/eolinker/apinto/scope-manager" "github.com/eolinker/apinto/utils" @@ -9,7 +11,6 @@ import ( "github.com/eolinker/eosc/router" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" - "strings" ) func Check(v *Config, workers map[eosc.RequireId]eosc.IWorker) error { @@ -164,7 +165,6 @@ func Create(id, name string, cfg *Config, workers map[eosc.RequireId]eosc.IWorke } metrics[metric.Metric] = m } - //注册路由 p.registry = registry p.handler = promhttp.InstrumentMetricHandler( diff --git a/drivers/plugins/body-check/body-check.go b/drivers/plugins/body-check/body-check.go index 60794348..eb0318ae 100644 --- a/drivers/plugins/body-check/body-check.go +++ b/drivers/plugins/body-check/body-check.go @@ -30,7 +30,8 @@ func (b *BodyCheck) DoHttpFilter(ctx http_service.IHttpContext, next eocontext.I if err != nil { return err } - bodySize := len([]rune(string(body))) + // 计算body大小 + bodySize := len(body) if !b.isEmpty && bodySize < 1 { ctx.Response().SetStatus(400, "400") ctx.Response().SetBody([]byte("Body is required")) diff --git a/drivers/plugins/body-record-truncation/config.go b/drivers/plugins/body-record-truncation/config.go new file mode 100644 index 00000000..243553ec --- /dev/null +++ b/drivers/plugins/body-record-truncation/config.go @@ -0,0 +1,5 @@ +package body_record_truncation + +type Config struct { + BodySize int64 `json:"body_size" label:"截断大小"` +} diff --git a/drivers/plugins/body-record-truncation/executor.go b/drivers/plugins/body-record-truncation/executor.go new file mode 100644 index 00000000..913a152f --- /dev/null +++ b/drivers/plugins/body-record-truncation/executor.go @@ -0,0 +1,73 @@ +package body_record_truncation + +import ( + "github.com/eolinker/apinto/drivers" + "github.com/eolinker/eosc" + "github.com/eolinker/eosc/eocontext" + http_service "github.com/eolinker/eosc/eocontext/http-context" +) + +var () + +var _ http_service.HttpFilter = (*executor)(nil) +var _ eocontext.IFilter = (*executor)(nil) +var _ eosc.IWorker = (*executor)(nil) + +type executor struct { + drivers.WorkerBase + bodySize int64 +} + +func (e *executor) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) (err error) { + return http_service.DoHttpFilter(e, ctx, next) +} + +func (e *executor) DoHttpFilter(ctx http_service.IHttpContext, next eocontext.IChain) (err error) { + if ctx.Request().Method() == "POST" || ctx.Request().Method() == "PUT" || ctx.Request().Method() == "PATCH" { + if e.bodySize != 0 && int64(ctx.Request().ContentLength()) > e.bodySize { + // 当请求体大小大于限制时,截断请求体 + entry := ctx.GetEntry() + body := entry.Read("ctx_request_body") + v, _ := body.(string) + ctx.SetLabel("request_body", v[:e.bodySize]) + ctx.WithValue("request_body_complete", 0) + } else { + ctx.WithValue("request_body_complete", 1) + } + } + if next != nil { + err = next.DoChain(ctx) + } + if e.bodySize != 0 && int64(ctx.Response().ContentLength()) > e.bodySize { + // 当响应体大小大于限制时,截断响应体 + entry := ctx.GetEntry() + body := entry.Read("ctx_response_body") + v, _ := body.(string) + ctx.SetLabel("response_body", v[:e.bodySize]) + ctx.WithValue("response_body_complete", 0) + } else { + ctx.WithValue("response_body_complete", 1) + } + return err +} + +func (e *executor) Destroy() { + return +} + +func (e *executor) Start() error { + return nil +} + +func (e *executor) Reset(conf interface{}, workers map[eosc.RequireId]eosc.IWorker) error { + return nil +} + +func (e *executor) Stop() error { + e.Destroy() + return nil +} + +func (e *executor) CheckSkill(skill string) bool { + return http_service.FilterSkillName == skill +} diff --git a/drivers/plugins/body-record-truncation/factory.go b/drivers/plugins/body-record-truncation/factory.go new file mode 100644 index 00000000..b5e28f23 --- /dev/null +++ b/drivers/plugins/body-record-truncation/factory.go @@ -0,0 +1,26 @@ +package body_record_truncation + +import ( + "github.com/eolinker/apinto/drivers" + "github.com/eolinker/eosc" +) + +const ( + Name = "body-record-truncation" +) + +func Register(register eosc.IExtenderDriverRegister) { + register.RegisterExtenderDriver(Name, NewFactory()) +} + +func NewFactory() eosc.IExtenderDriverFactory { + return drivers.NewFactory[Config](Create) +} + +func Create(id, name string, conf *Config, workers map[eosc.RequireId]eosc.IWorker) (eosc.IWorker, error) { + + return &executor{ + WorkerBase: drivers.Worker(id, name), + bodySize: conf.BodySize << 20, + }, nil +} diff --git a/drivers/plugins/gRPC-to-http/complete.go b/drivers/plugins/gRPC-to-http/complete.go index 025c4acf..4d64b4ce 100644 --- a/drivers/plugins/gRPC-to-http/complete.go +++ b/drivers/plugins/gRPC-to-http/complete.go @@ -126,7 +126,8 @@ func (h *complete) Complete(org eocontext.EoContext) error { request.URI().SetHost(targetHost) } response := fasthttp.AcquireResponse() - lastErr = fasthttp_client.ProxyTimeout(scheme, node, request, response, timeOut) + + _, lastErr = fasthttp_client.ProxyTimeout(scheme, node, request, response, timeOut) if lastErr == nil { return newGRPCResponse(ctx, response, methodDesc) } diff --git a/drivers/plugins/request-file-parse/config.go b/drivers/plugins/request-file-parse/config.go new file mode 100644 index 00000000..d265846f --- /dev/null +++ b/drivers/plugins/request-file-parse/config.go @@ -0,0 +1,8 @@ +package request_file_parse + +type Config struct { + FileKey string `json:"file_key" label:"文件Key"` + FileSuffix []string `json:"file_suffix" label:"文件有效后缀列表"` + LargeWarn int64 `json:"large_warn" label:"文件大小警告阈值"` + LargeWarnText string `json:"large_warn_text" label:"文件大小警告标签值"` +} diff --git a/drivers/plugins/request-file-parse/executor.go b/drivers/plugins/request-file-parse/executor.go new file mode 100644 index 00000000..f143e376 --- /dev/null +++ b/drivers/plugins/request-file-parse/executor.go @@ -0,0 +1,158 @@ +package request_file_parse + +import ( + "errors" + "io" + "mime" + "mime/multipart" + "strings" + + "golang.org/x/text/encoding/charmap" + + "github.com/eolinker/eosc/log" + + "github.com/eolinker/apinto/drivers" + "github.com/eolinker/eosc" + "github.com/eolinker/eosc/eocontext" + http_service "github.com/eolinker/eosc/eocontext/http-context" +) + +var ( + MultipartForm = "multipart/form-data" + //csv,tar,bz2,xz,jar,pdf,doc,docx,xls,ppt,xlsx,pptx,zip,txt,rar,gz,dot + defaultValidSuf = map[string]struct{}{ + "csv": {}, + "tar": {}, + "bz2": {}, + "xz": {}, + "jar": {}, + "pdf": {}, + "doc": {}, + "docx": {}, + "xls": {}, + "ppt": {}, + "xlsx": {}, + "pptx": {}, + "zip": {}, + "txt": {}, + "rar": {}, + "gz": {}, + "dot": {}, + } +) + +var _ http_service.HttpFilter = (*executor)(nil) +var _ eocontext.IFilter = (*executor)(nil) +var _ eosc.IWorker = (*executor)(nil) + +type executor struct { + drivers.WorkerBase + fileKey string + validSuf map[string]struct{} + largeWarn int64 + largeWarnStr string +} + +func (e *executor) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) (err error) { + return http_service.DoHttpFilter(e, ctx, next) +} + +func (e *executor) DoHttpFilter(ctx http_service.IHttpContext, next eocontext.IChain) (err error) { + if ctx.Request().Method() == "POST" || ctx.Request().Method() == "PUT" || ctx.Request().Method() == "PATCH" { + contentType, _, err := mime.ParseMediaType(ctx.Request().ContentType()) + if err != nil { + return err + } + if contentType == MultipartForm { + // 当请求为文件请求时,解析文件 + fh, has := ctx.Request().Body().GetFile(e.fileKey) + if has { + for _, h := range fh { + suffix, err := getFileSuffix(h) + if err != nil { + log.Errorf("get file suffix error: %v,name is %s", err, e.fileKey) + continue + } + if _, ok := e.validSuf[suffix]; !ok { + log.Errorf("file suffix is not valid,name is %s,suffix is %s", e.fileKey, suffix) + continue + } + f, err := h.Open() + if err != nil { + log.Errorf("file open error: %v,name is %s", err, e.fileKey) + continue + } + + body, err := io.ReadAll(f) + if err != nil { + log.Errorf("read file body error: %v,name is %s", err, e.fileKey) + f.Close() + continue + } + f.Close() + + // body此处要做latin1编码 + out := make([]byte, 0, len(body)) + for _, t := range body { + if v, ok := charmap.ISO8859_1.EncodeRune(rune(t)); ok { + out = append(out, v) + } + } + //if e.largeWarn > 0 && h.Size > e.largeWarn { + // ctx.WithValue("file_size_warn", e.largeWarnStr) + // out = out[:e.largeWarn] + // ctx.WithValue("request_body_complete", 0) + //} + + ctx.SetLabel("request_body", string(out)) + ctx.SetLabel("file_direction", "upload") + ctx.SetLabel("file_name", h.Filename) + ctx.SetLabel("file_suffix", suffix) + ctx.WithValue("file_size", h.Size) + + break + } + } + } + } + + if next != nil { + return next.DoChain(ctx) + } + return nil +} + +func (e *executor) Destroy() { + return +} + +func (e *executor) Start() error { + return nil +} + +func (e *executor) Reset(conf interface{}, workers map[eosc.RequireId]eosc.IWorker) error { + return nil +} + +func (e *executor) Stop() error { + e.Destroy() + return nil +} + +func (e *executor) CheckSkill(skill string) bool { + return http_service.FilterSkillName == skill +} + +func getFileSuffix(f *multipart.FileHeader) (string, error) { + // 获取文件后缀 + fileName := f.Filename + // 获取文件后缀 + suffix := fileName[strings.LastIndex(fileName, ".")+1:] + if len(suffix) == 0 { + contentType := f.Header.Get("Content-Type") + if len(contentType) == 0 { + return "", errors.New("file suffix is empty") + } + } + return suffix, nil +} diff --git a/drivers/plugins/request-file-parse/factory.go b/drivers/plugins/request-file-parse/factory.go new file mode 100644 index 00000000..0ea58973 --- /dev/null +++ b/drivers/plugins/request-file-parse/factory.go @@ -0,0 +1,39 @@ +package request_file_parse + +import ( + "github.com/eolinker/apinto/drivers" + "github.com/eolinker/eosc" +) + +const ( + Name = "request_file_parse" +) + +func Register(register eosc.IExtenderDriverRegister) { + register.RegisterExtenderDriver(Name, NewFactory()) +} + +func NewFactory() eosc.IExtenderDriverFactory { + return drivers.NewFactory[Config](Create) +} + +func Create(id, name string, conf *Config, workers map[eosc.RequireId]eosc.IWorker) (eosc.IWorker, error) { + largeWarnText := "large" + if conf.LargeWarnText != "" { + largeWarnText = conf.LargeWarnText + } + validSuffix := make(map[string]struct{}) + for key := range defaultValidSuf { + validSuffix[key] = struct{}{} + } + for _, s := range conf.FileSuffix { + validSuffix[s] = struct{}{} + } + return &executor{ + WorkerBase: drivers.Worker(id, name), + fileKey: conf.FileKey, + validSuf: validSuffix, + largeWarn: conf.LargeWarn << 20, + largeWarnStr: largeWarnText, + }, nil +} diff --git a/drivers/plugins/response-file-parse/config.go b/drivers/plugins/response-file-parse/config.go new file mode 100644 index 00000000..d35b5180 --- /dev/null +++ b/drivers/plugins/response-file-parse/config.go @@ -0,0 +1,8 @@ +package response_file_parse + +type Config struct { + FileKey string `json:"file_key" label:"文件Key"` + FileSuffix []string `json:"file_suffix" label:"文件有效后缀列表"` + LargeWarn int64 `json:"large_warn" label:"文件大小警告阈值"` + LargeWarnText string `json:"large_warn_text" label:"文件大小警告标签值"` +} diff --git a/drivers/plugins/response-file-parse/executor.go b/drivers/plugins/response-file-parse/executor.go new file mode 100644 index 00000000..5699217a --- /dev/null +++ b/drivers/plugins/response-file-parse/executor.go @@ -0,0 +1,130 @@ +package response_file_parse + +import ( + "strings" + + "golang.org/x/text/encoding/charmap" + + "github.com/eolinker/eosc/log" + + "github.com/eolinker/apinto/drivers" + "github.com/eolinker/eosc" + "github.com/eolinker/eosc/eocontext" + http_service "github.com/eolinker/eosc/eocontext/http-context" +) + +var ( + //csv,tar,bz2,xz,jar,pdf,doc,docx,xls,ppt,xlsx,pptx,zip,txt,rar,gz,dot + defaultValidSuf = map[string]struct{}{ + "csv": {}, + "tar": {}, + "bz2": {}, + "xz": {}, + "jar": {}, + "pdf": {}, + "doc": {}, + "docx": {}, + "xls": {}, + "ppt": {}, + "xlsx": {}, + "pptx": {}, + "zip": {}, + "txt": {}, + "rar": {}, + "gz": {}, + "dot": {}, + } +) + +var _ http_service.HttpFilter = (*executor)(nil) +var _ eocontext.IFilter = (*executor)(nil) +var _ eosc.IWorker = (*executor)(nil) + +type executor struct { + drivers.WorkerBase + fileKey string + validSuf map[string]struct{} + largeWarn int64 + largeWarnStr string +} + +func (e *executor) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) (err error) { + return http_service.DoHttpFilter(e, ctx, next) +} + +func (e *executor) DoHttpFilter(ctx http_service.IHttpContext, next eocontext.IChain) (err error) { + if next != nil { + err = next.DoChain(ctx) + if err != nil { + return err + } + } + contentDisposition := ctx.Response().Headers().Get("Content-Disposition") + if contentDisposition != "" { + params := strings.Split(contentDisposition, ";") + paramsMap := make(map[string]string, len(params)) + for _, param := range params { + param = strings.TrimSpace(param) + ps := strings.Split(param, "=") + if ps[0] != "" { + if len(ps) > 1 { + paramsMap[ps[0]] = ps[1] + } else { + paramsMap[ps[0]] = "" + } + } + } + if err != nil { + log.Errorf("parse content disposition error: %v", err) + return nil + } + if fileName, ok := paramsMap[e.fileKey]; ok { + if fileName != "" { + suffix := fileName[strings.LastIndex(fileName, ".")+1:] + if _, ok := e.validSuf[suffix]; !ok { + log.Errorf("file suffix is not valid,name is %s,suffix is %s", e.fileKey, suffix) + return nil + } + body := ctx.Response().GetBody() + // body此处要做latin1编码 + out := make([]byte, 0, len(body)) + for _, t := range body { + if v, ok := charmap.ISO8859_1.EncodeRune(rune(t)); ok { + out = append(out, v) + } + } + size := len(out) + ctx.WithValue("response_body", string(out)) + ctx.WithValue("file_direction", "download") + ctx.WithValue("file_name", fileName) + ctx.WithValue("file_suffix", suffix) + ctx.WithValue("file_size", size) + if int64(size) > e.largeWarn { + ctx.WithValue("file_large_warn", e.largeWarnStr) + } + } + } + } + return nil +} + +func (e *executor) Destroy() { + return +} + +func (e *executor) Start() error { + return nil +} + +func (e *executor) Reset(conf interface{}, workers map[eosc.RequireId]eosc.IWorker) error { + return nil +} + +func (e *executor) Stop() error { + e.Destroy() + return nil +} + +func (e *executor) CheckSkill(skill string) bool { + return http_service.FilterSkillName == skill +} diff --git a/drivers/plugins/response-file-parse/factory.go b/drivers/plugins/response-file-parse/factory.go new file mode 100644 index 00000000..ae0e8a56 --- /dev/null +++ b/drivers/plugins/response-file-parse/factory.go @@ -0,0 +1,39 @@ +package response_file_parse + +import ( + "github.com/eolinker/apinto/drivers" + "github.com/eolinker/eosc" +) + +const ( + Name = "response_file_parse" +) + +func Register(register eosc.IExtenderDriverRegister) { + register.RegisterExtenderDriver(Name, NewFactory()) +} + +func NewFactory() eosc.IExtenderDriverFactory { + return drivers.NewFactory[Config](Create) +} + +func Create(id, name string, conf *Config, workers map[eosc.RequireId]eosc.IWorker) (eosc.IWorker, error) { + largeWarnText := "large" + if conf.LargeWarnText != "" { + largeWarnText = conf.LargeWarnText + } + validSuffix := make(map[string]struct{}) + for key := range defaultValidSuf { + validSuffix[key] = struct{}{} + } + for _, s := range conf.FileSuffix { + validSuffix[s] = struct{}{} + } + return &executor{ + WorkerBase: drivers.Worker(id, name), + fileKey: conf.FileKey, + validSuf: validSuffix, + largeWarn: conf.LargeWarn << 20, + largeWarnStr: largeWarnText, + }, nil +} diff --git a/entries/http-entry/entry.go b/entries/http-entry/entry.go index 06c7e234..02d3a831 100644 --- a/entries/http-entry/entry.go +++ b/entries/http-entry/entry.go @@ -26,6 +26,7 @@ func (e *Entry) Read(pattern string) interface{} { if !ok { return "" } + return v } diff --git a/entries/http-entry/reader.go b/entries/http-entry/reader.go index 84f405ed..6135b2b4 100644 --- a/entries/http-entry/reader.go +++ b/entries/http-entry/reader.go @@ -47,8 +47,32 @@ func (f Fields) Read(name string, ctx http_service.IHttpContext) (interface{}, b return label, label != "" } +type CtxRule struct { + fields Fields +} + +func (l *CtxRule) Read(name string, ctx http_service.IHttpContext) (interface{}, bool) { + value := ctx.Value(name) + if value != nil { + return value, true + } + // 先从Label中获取值 + value = ctx.GetLabel(name) + if value != "" { + return value, true + } + + return l.fields.Read(name, ctx) +} +func init() { + ctxRule = &CtxRule{ + fields: rule, + } +} + var ( - rule Fields = map[string]IReader{ + ctxRule *CtxRule + rule Fields = map[string]IReader{ "request_id": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { return ctx.RequestId(), true }), @@ -58,15 +82,18 @@ var ( "cluster": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { return os.Getenv("cluster_id"), true }), - "api_id": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { - return ctx.GetLabel("api_id"), true - }), "query": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { if name == "" { return utils.QueryUrlEncode(ctx.Request().URI().RawQuery()), true } return url.QueryEscape(ctx.Request().URI().GetQuery(name)), true }), + "src_ip": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { + return ctx.Request().RealIp(), true + }), + "src_port": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { + return ctx.Request().RemotePort(), true + }), "uri": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { //不带请求参数的uri return ctx.Request().URI().Path(), true @@ -95,6 +122,10 @@ var ( "remote_port": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { return ctx.Request().RemotePort(), true }), + "ctx": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { + return ctxRule.Read(name, ctx) + + }), "request": Fields{ "body": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { @@ -104,6 +135,17 @@ var ( } return string(body), true }), + "body_filter": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { + value := ctx.GetLabel("xxx") + if value == "" { + body, err := ctx.Request().Body().RawBody() + if err != nil { + return "", false + } + return string(body), true + } + return ctx.GetLabel("xxx"), true + }), "length": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { return ctx.Request().ContentLength(), true @@ -139,12 +181,18 @@ var ( //return time.Now().Format("2006-01-02 15:04:05"), true return ctx.AcceptTime().Format("2006-01-02 15:04:05"), true }), + "timestamp": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { + return ctx.AcceptTime().Unix(), true + }), "header": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { if name == "" { return url.Values(ctx.Request().Header().Headers()).Encode(), true } return ctx.Request().Header().GetHeader(strings.Replace(name, "_", "-", -1)), true }), + "headers": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { + return ctx.Request().Header().Headers(), true + }), "http": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { return ctx.Request().Header().GetHeader(strings.Replace(name, "_", "-", -1)), true }), @@ -160,15 +208,20 @@ var ( "": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { return ctx.Response().String(), true }), - "body": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { - return string(ctx.Response().GetBody()), true - }), + "body": Fields{ + "": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { + return string(ctx.Response().GetBody()), true + }), + }, "header": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { if name == "" { return url.Values(ctx.Response().Headers()).Encode(), true } return ctx.Response().GetHeader(strings.Replace(name, "_", "-", -1)), true }), + "headers": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { + return ctx.Response().Headers(), true + }), "status": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { return ctx.Response().ProxyStatus(), true }), @@ -179,6 +232,15 @@ var ( return strconv.Itoa(ctx.Response().ContentLength()), true }), }, + "set_cookies": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { + return strings.Split(ctx.Response().GetHeader("Set-Cookie"), "; "), true + }), + "dst_ip": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { + return ctx.Response().RemoteIP(), true + }), + "dst_port": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { + return ctx.Response().RemotePort(), true + }), "proxy": proxyFields, } @@ -209,6 +271,12 @@ var ( "addr": ProxyReadFunc(func(name string, proxy http_service.IProxy) (interface{}, bool) { return proxy.URI().Host(), true }), + "dst_ip": ProxyReadFunc(func(name string, proxy http_service.IProxy) (interface{}, bool) { + return proxy.RemotePort(), true + }), + "dst_port": ProxyReadFunc(func(name string, proxy http_service.IProxy) (interface{}, bool) { + return proxy.RemotePort(), true + }), "scheme": ProxyReadFunc(func(name string, proxy http_service.IProxy) (interface{}, bool) { return proxy.URI().Scheme(), true }), diff --git a/go.mod b/go.mod index d28a1222..376990a3 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/clbanning/mxj v1.8.4 github.com/coocood/freecache v1.2.2 github.com/dubbogo/gost v1.13.1 - github.com/eolinker/eosc v0.14.6 + github.com/eolinker/eosc v0.14.7 github.com/fasthttp/websocket v1.5.0 github.com/fullstorydev/grpcurl v1.8.7 github.com/go-redis/redis/v8 v8.11.5 @@ -160,7 +160,7 @@ require ( go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.23.0 golang.org/x/sys v0.6.0 // indirect - golang.org/x/text v0.8.0 // indirect + golang.org/x/text v0.8.0 golang.org/x/time v0.1.0 // indirect google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect gopkg.in/sourcemap.v1 v1.0.5 // indirect diff --git a/node/fasthttp-client/client.go b/node/fasthttp-client/client.go index 0ce90bb4..6b786b34 100644 --- a/node/fasthttp-client/client.go +++ b/node/fasthttp-client/client.go @@ -13,13 +13,41 @@ import ( "github.com/valyala/fasthttp" ) -func ProxyTimeout(scheme string, node eocontext.INode, req *fasthttp.Request, resp *fasthttp.Response, timeout time.Duration) error { - addr := fmt.Sprintf("%s://%s", scheme, node.Addr()) - err := defaultClient.ProxyTimeout(addr, req, resp, timeout) +type Addr struct { + IP net.IP + Port int +} + +func resolveAddr(scheme string, addr string) (*Addr, error) { + tcpAddr, err := net.ResolveTCPAddr("tcp", addr) + if err != nil { + return nil, err + } + port := tcpAddr.Port + if port == 0 { + if scheme == "http" { + port = 80 + } else if scheme == "https" { + port = 443 + } + } + return &Addr{ + IP: tcpAddr.IP, + Port: port, + }, nil +} + +func ProxyTimeout(scheme string, node eocontext.INode, req *fasthttp.Request, resp *fasthttp.Response, timeout time.Duration) (*Addr, error) { + tcpAddr, err := resolveAddr(scheme, node.Addr()) + if err != nil { + return nil, err + } + addr := fmt.Sprintf("%s://%s:%d", scheme, tcpAddr.IP.String(), tcpAddr.Port) + err = defaultClient.ProxyTimeout(addr, req, resp, timeout) if err != nil { node.Down() } - return err + return tcpAddr, err } var defaultClient Client @@ -101,6 +129,32 @@ func (c *Client) getHostClient(addr string) (*fasthttp.HostClient, string, error return hc, scheme, nil } +//func (c *Client) getDialFunc() fasthttp.DialFunc { +// return func(addr string) (net.Conn, error) { +// atomic.AddInt64(&dialCount, 1) +// conn, err := tcpDial.Dial(addr) +// if err != nil { +// return nil, err +// } +// c.conn = conn +// return &debugConn{Conn: conn}, nil +// } +//} +// +//func (c *Client) RemoteAddr() string { +// if c.conn != nil { +// return c.conn.RemoteAddr().String() +// } +// return "unknown" +//} +// +//func (c *Client) LocalAddr() string { +// if c.conn != nil { +// return c.conn.RemoteAddr().String() +// } +// return "unknown" +//} + // ProxyTimeout performs the given request and waits for response during // the given timeout duration. // diff --git a/node/fasthttp-client/dial.go b/node/fasthttp-client/dial.go index f9c0e5b3..b23690b4 100644 --- a/node/fasthttp-client/dial.go +++ b/node/fasthttp-client/dial.go @@ -2,13 +2,14 @@ package fasthttp_client import ( "fmt" - "github.com/eolinker/eosc/debug" - "github.com/valyala/fasthttp" "net" "net/http" "sync" "sync/atomic" "time" + + "github.com/eolinker/eosc/debug" + "github.com/valyala/fasthttp" ) var ( diff --git a/node/http-context/clone.go b/node/http-context/clone.go index cf56ee32..cb729515 100644 --- a/node/http-context/clone.go +++ b/node/http-context/clone.go @@ -139,9 +139,9 @@ func (ctx *cloneContext) SendTo(scheme string, node eoscContext.INode, timeout t case eoscContext.ReWriteHost: request.URI().SetHost(targetHost) } - + var tcpAddr *fasthttp_client.Addr beginTime := time.Now() - ctx.responseError = fasthttp_client.ProxyTimeout(scheme, node, request, ctx.response.Response, timeout) + tcpAddr, ctx.responseError = fasthttp_client.ProxyTimeout(scheme, node, request, ctx.response.Response, timeout) agent := newRequestAgent(&ctx.proxyRequest, host, scheme, beginTime, time.Now()) if ctx.responseError != nil { agent.setStatusCode(504) @@ -149,7 +149,8 @@ func (ctx *cloneContext) SendTo(scheme string, node eoscContext.INode, timeout t agent.setStatusCode(ctx.response.Response.StatusCode()) } agent.responseBody = string(ctx.response.Response.Body()) - + agent.setRemoteIP(tcpAddr.IP.String()) + agent.setRemotePort(tcpAddr.Port) agent.setResponseLength(ctx.response.Response.Header.ContentLength()) ctx.proxyRequests = append(ctx.proxyRequests, agent) diff --git a/node/http-context/context.go b/node/http-context/context.go index 1394118a..867c5386 100644 --- a/node/http-context/context.go +++ b/node/http-context/context.go @@ -143,9 +143,9 @@ func (ctx *HttpContext) SendTo(scheme string, node eoscContext.INode, timeout ti case eoscContext.ReWriteHost: request.URI().SetHost(targetHost) } - + var tcpAddr *fasthttp_client.Addr beginTime := time.Now() - ctx.response.responseError = fasthttp_client.ProxyTimeout(scheme, node, request, &ctx.fastHttpRequestCtx.Response, timeout) + tcpAddr, ctx.response.responseError = fasthttp_client.ProxyTimeout(scheme, node, request, &ctx.fastHttpRequestCtx.Response, timeout) agent := newRequestAgent(&ctx.proxyRequest, host, scheme, beginTime, time.Now()) if ctx.response.responseError != nil { agent.setStatusCode(504) @@ -154,8 +154,12 @@ func (ctx *HttpContext) SendTo(scheme string, node eoscContext.INode, timeout ti agent.setStatusCode(ctx.fastHttpRequestCtx.Response.StatusCode()) } agent.responseBody = string(ctx.response.Response.Body()) + agent.setRemoteIP(tcpAddr.IP.String()) + agent.setRemotePort(tcpAddr.Port) agent.setResponseLength(ctx.fastHttpRequestCtx.Response.Header.ContentLength()) + ctx.response.remoteIP = tcpAddr.IP.String() + ctx.response.remotePort = tcpAddr.Port ctx.proxyRequests = append(ctx.proxyRequests, agent) return ctx.response.responseError diff --git a/node/http-context/proxy-agent.go b/node/http-context/proxy-agent.go index 24beff0d..4cee6e9d 100644 --- a/node/http-context/proxy-agent.go +++ b/node/http-context/proxy-agent.go @@ -20,6 +20,8 @@ type requestAgent struct { beginTime time.Time endTime time.Time hostAgent *UrlAgent + remoteIP string + remotePort int } func (a *requestAgent) ResponseBody() string { @@ -53,6 +55,14 @@ func (a *requestAgent) setResponseLength(length int) { } } +func (a *requestAgent) setRemoteIP(ip string) { + a.remoteIP = ip +} + +func (a *requestAgent) setRemotePort(port int) { + a.remotePort = port +} + func newRequestAgent(IRequest http_service.IRequest, host string, scheme string, beginTime, endTime time.Time) *requestAgent { return &requestAgent{IRequest: IRequest, host: host, scheme: scheme, beginTime: beginTime, endTime: endTime} } @@ -61,6 +71,14 @@ func (a *requestAgent) ResponseTime() int64 { return a.endTime.Sub(a.beginTime).Milliseconds() } +func (a *requestAgent) RemoteIP() string { + return a.remoteIP +} + +func (a *requestAgent) RemotePort() int { + return a.remotePort +} + func (a *requestAgent) URI() http_service.IURIWriter { if a.hostAgent == nil { a.hostAgent = NewUrlAgent(a.IRequest.URI(), a.host, a.scheme) diff --git a/node/http-context/response.go b/node/http-context/response.go index 5cbb6ef7..4de40cba 100644 --- a/node/http-context/response.go +++ b/node/http-context/response.go @@ -19,6 +19,8 @@ type Response struct { responseTime time.Duration proxyStatusCode int responseError error + remoteIP string + remotePort int } func (r *Response) ContentLength() int { @@ -119,3 +121,11 @@ func (r *Response) SetResponseTime(t time.Duration) { func (r *Response) ResponseTime() time.Duration { return r.responseTime } + +func (r *Response) RemoteIP() string { + return r.remoteIP +} + +func (r *Response) RemotePort() int { + return r.remotePort +}