From 70550ec2429438e0788cb6bd94b50cca60122afb Mon Sep 17 00:00:00 2001 From: kunlongli <16629885+cnlkl@users.noreply.github.com> Date: Fri, 4 Aug 2023 22:58:39 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81=E8=87=AA=E5=AE=9A?= =?UTF-8?q?=E4=B9=89DNS=E8=A7=A3=E6=9E=90=E8=BF=87=E7=A8=8B=20#54?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- analysis-tool-sdk-golang/api/bkrepo.go | 39 +++++++++++- analysis-tool-sdk-golang/api/bkrepo_test.go | 61 ++++++++++++++++--- .../framework/executor.go | 2 +- analysis-tool-sdk-golang/object/argument.go | 38 ++++++++++++ analysis-tool-sdk-golang/util/chunk.go | 25 +++++++- analysis-tool-sdk-golang/util/chunk_test.go | 11 ++-- analysis-tool-sdk-golang/util/downloader.go | 18 +++++- analysis-tool-sdk-golang/util/file.go | 2 + 8 files changed, 177 insertions(+), 19 deletions(-) diff --git a/analysis-tool-sdk-golang/api/bkrepo.go b/analysis-tool-sdk-golang/api/bkrepo.go index 1dccbd4..04e81a4 100644 --- a/analysis-tool-sdk-golang/api/bkrepo.go +++ b/analysis-tool-sdk-golang/api/bkrepo.go @@ -11,6 +11,7 @@ import ( "net/http" "os" "strconv" + "strings" "time" ) @@ -120,8 +121,42 @@ func (c *BkRepoClient) Failed(err error) { } // GenerateInputFile 生成待分析文件 -func (c *BkRepoClient) GenerateInputFile() (*os.File, error) { - return util.GenerateInputFile(c.ToolInput, &util.DefaultDownloader{}) +func (c *BkRepoClient) GenerateInputFile(client *http.Client) (*os.File, error) { + downloader, err := c.createDownloader(client) + if err != nil { + return nil, err + } + return util.GenerateInputFile(c.ToolInput, downloader) +} + +func (c *BkRepoClient) createDownloader(client *http.Client) (util.Downloader, error) { + var downloader util.Downloader + workerCount, _ := c.ToolInput.ToolConfig.GetIntArg(util.ArgKeyDownloaderWorkerCount) + if workerCount > 0 { + // 解析header + downloaderHeadersStr := c.ToolInput.ToolConfig.GetStringArg(util.ArgKeyDownloaderWorkerHeaders) + headers := make(map[string]string) + if len(downloaderHeadersStr) > 0 { + downloaderHeaders := strings.Split(downloaderHeadersStr, ",") + for i := range downloaderHeaders { + h := strings.Split(downloaderHeaders[i], ":") + if len(h) != 2 { + return nil, errors.New("headers error: " + downloaderHeaders[i]) + } + headers[strings.TrimSpace(h[0])] = strings.TrimSpace(h[1]) + } + } + // 创建下载器并生成待分析文件 + downloader = util.NewChunkDownloader( + int(workerCount), + util.WorkDir, + headers, + client, + ) + } else { + downloader = util.NewDownloader(client) + } + return downloader, nil } // updateSubtaskStatus 更新任务状态为执行中 diff --git a/analysis-tool-sdk-golang/api/bkrepo_test.go b/analysis-tool-sdk-golang/api/bkrepo_test.go index 8d04762..48bf1d4 100644 --- a/analysis-tool-sdk-golang/api/bkrepo_test.go +++ b/analysis-tool-sdk-golang/api/bkrepo_test.go @@ -1,20 +1,18 @@ package api import ( + "context" "fmt" "github.com/TencentBlueKing/ci-repoAnalysis/analysis-tool-sdk-golang/object" + "github.com/TencentBlueKing/ci-repoAnalysis/analysis-tool-sdk-golang/util" + "net" "os" "testing" + "time" ) func TestPullTask(t *testing.T) { - client := BkRepoClient{} - client.Args = &object.Arguments{ - Url: os.Getenv("URL"), - Token: os.Getenv("TOKEN"), - ExecutionCluster: os.Getenv("EXECUTION_CLUSTER"), - PullRetry: -1, - } + client := createClient() toolInput, err := client.pullToolInput() if err != nil { fmt.Println(err.Error()) @@ -24,3 +22,52 @@ func TestPullTask(t *testing.T) { fmt.Println(toolInput.TaskId) } } + +func TestCreateDownloader(t *testing.T) { + dialer := &net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + } + dialContext := func(ctx context.Context, network, addr string) (net.Conn, error) { + return dialer.DialContext(ctx, network, addr) + } + downloaderClient := (&object.Arguments{}).CustomDownloaderHttpClientDialContext(dialContext) + client := createClient() + args := make([]object.Argument, 2) + args = append(args, object.Argument{ + Type: "STRING", + Key: util.ArgKeyDownloaderWorkerHeaders, + Value: "X-Test-Header: test, X-Test-Header2: test2", + }) + args = append(args, object.Argument{ + Type: "NUMBER", + Key: util.ArgKeyDownloaderWorkerCount, + Value: "2", + }) + client.ToolInput = &object.ToolInput{ + ToolConfig: object.ToolConfig{Args: args}, + } + downloader, err := client.createDownloader(downloaderClient) + if err != nil { + t.Fatalf(err.Error()) + } + reader, err := downloader.Download(os.Getenv("ARTIFACT_URL")) + if err != nil { + fmt.Println(err.Error()) + } else { + _ = reader.Close() + } + + _ = os.RemoveAll(util.WorkDir) +} + +func createClient() *BkRepoClient { + client := BkRepoClient{} + client.Args = &object.Arguments{ + Url: os.Getenv("URL"), + Token: os.Getenv("TOKEN"), + ExecutionCluster: os.Getenv("EXECUTION_CLUSTER"), + PullRetry: -1, + } + return &client +} diff --git a/analysis-tool-sdk-golang/framework/executor.go b/analysis-tool-sdk-golang/framework/executor.go index 26f24b1..7bcc712 100644 --- a/analysis-tool-sdk-golang/framework/executor.go +++ b/analysis-tool-sdk-golang/framework/executor.go @@ -44,7 +44,7 @@ func doAnalyze(executor Executor, arguments *object.Arguments) { util.Info("no subtask found, exit") os.Exit(0) } - file, err := client.GenerateInputFile() + file, err := client.GenerateInputFile(arguments.GetDownloaderClient()) defer file.Close() if err != nil { client.Failed(errors.New("Generate input file failed: " + err.Error())) diff --git a/analysis-tool-sdk-golang/object/argument.go b/analysis-tool-sdk-golang/object/argument.go index d30a91f..08c28a8 100644 --- a/analysis-tool-sdk-golang/object/argument.go +++ b/analysis-tool-sdk-golang/object/argument.go @@ -1,8 +1,12 @@ package object import ( + "context" "flag" "fmt" + "net" + "net/http" + "time" ) // Arguments 输入参数 @@ -15,6 +19,7 @@ type Arguments struct { InputFilePath string OutputFilePath string KeepRunning bool + downloaderClient *http.Client } var args *Arguments @@ -73,3 +78,36 @@ func (arg *Arguments) Online() bool { func (arg *Arguments) ShouldKeepRunning() bool { return arg.Online() && arg.KeepRunning && arg.TaskId == "" } + +// CustomDownloaderHttpClientDialContext 自定义下载器使用的http客户端DNS解析 +func (arg *Arguments) CustomDownloaderHttpClientDialContext( + dialContext func(ctx context.Context, network, addr string) (net.Conn, error), +) *http.Client { + if dialContext == nil { + arg.downloaderClient = http.DefaultClient + } else { + transport := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: dialContext, + ForceAttemptHTTP2: true, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + } + + arg.downloaderClient = &http.Client{ + Transport: transport, + } + } + return arg.downloaderClient +} + +// GetDownloaderClient 获取下载器HTTP客户端 +func (arg *Arguments) GetDownloaderClient() *http.Client { + if arg.downloaderClient == nil { + return http.DefaultClient + } else { + return arg.downloaderClient + } +} diff --git a/analysis-tool-sdk-golang/util/chunk.go b/analysis-tool-sdk-golang/util/chunk.go index 3e31317..fd6c41a 100644 --- a/analysis-tool-sdk-golang/util/chunk.go +++ b/analysis-tool-sdk-golang/util/chunk.go @@ -18,6 +18,27 @@ type ChunkDownloader struct { WorkerCount int TmpDir string Headers map[string]string + client *http.Client +} + +// NewChunkDownloader 创建分片下载器 +func NewChunkDownloader( + workerCount int, + tmpDir string, + headers map[string]string, + client *http.Client, +) *ChunkDownloader { + var c = client + if client == nil { + c = http.DefaultClient + } + + return &ChunkDownloader{ + WorkerCount: workerCount, + TmpDir: tmpDir, + Headers: headers, + client: c, + } } // Download 分片下载 @@ -79,7 +100,7 @@ func (d *ChunkDownloader) doDownload(ctx context.Context, url string, file *os.F rangeHeader := "bytes=" + strconv.Itoa(start) + "-" + strconv.Itoa(end) req.Header.Set("Range", rangeHeader) - res, err := http.DefaultClient.Do(req) + res, err := d.client.Do(req) if err != nil { return err } @@ -117,7 +138,7 @@ func (d *ChunkDownloader) doDownload(ctx context.Context, url string, file *os.F func (d *ChunkDownloader) getFileSize(url string) (int, error) { req, _ := http.NewRequest("HEAD", url, nil) d.setHeaders(req) - res, err := http.DefaultClient.Do(req) + res, err := d.client.Do(req) if err != nil { return 0, err } diff --git a/analysis-tool-sdk-golang/util/chunk_test.go b/analysis-tool-sdk-golang/util/chunk_test.go index 9acdcdc..529727a 100644 --- a/analysis-tool-sdk-golang/util/chunk_test.go +++ b/analysis-tool-sdk-golang/util/chunk_test.go @@ -14,14 +14,15 @@ func TestDownload(t *testing.T) { errorToPanic(func() error { return os.RemoveAll(tmpDir) }) errorToPanic(func() error { return os.Mkdir(tmpDir, 0766) }) - downloader := ChunkDownloader{ - WorkerCount: 8, - TmpDir: tmpDir, - Headers: map[string]string{ + downloader := NewChunkDownloader( + 8, + tmpDir, + map[string]string{ "X-BKREPO-DOWNLOAD-REDIRECT-TO": "INNERCOS", "Authorization": os.Getenv("AUTHORIZATION"), }, - } + nil, + ) file, err := downloader.Download(os.Getenv("URL")) h := sha256.New() if _, err := io.Copy(h, file); err != nil { diff --git a/analysis-tool-sdk-golang/util/downloader.go b/analysis-tool-sdk-golang/util/downloader.go index c1f94d3..20dfde6 100644 --- a/analysis-tool-sdk-golang/util/downloader.go +++ b/analysis-tool-sdk-golang/util/downloader.go @@ -13,12 +13,26 @@ type Downloader interface { } // DefaultDownloader 默认下载器实现 -type DefaultDownloader struct{} +type DefaultDownloader struct { + client *http.Client +} + +// NewDownloader 创建默认下载器 +func NewDownloader(client *http.Client) Downloader { + var c = client + if client == nil { + c = http.DefaultClient + } + + return &DefaultDownloader{ + client: c, + } +} // Download 从指定url获取输入流 func (d *DefaultDownloader) Download(url string) (io.ReadCloser, error) { Info("downloading %s", url) - response, err := http.DefaultClient.Get(url) + response, err := d.client.Get(url) if err != nil { return nil, err } diff --git a/analysis-tool-sdk-golang/util/file.go b/analysis-tool-sdk-golang/util/file.go index d5cff73..d2b554f 100644 --- a/analysis-tool-sdk-golang/util/file.go +++ b/analysis-tool-sdk-golang/util/file.go @@ -17,6 +17,8 @@ import ( "strings" ) +const ArgKeyDownloaderWorkerCount = "downloaderWorker" +const ArgKeyDownloaderWorkerHeaders = "downloaderHeaders" const ArgKeyPkgType = "packageType" const PackageTypeDocker = "DOCKER" const WorkDir = "/bkrepo/workspace"