Skip to content

Commit

Permalink
feat: 新增HTTP请求失败重试 #59
Browse files Browse the repository at this point in the history
  • Loading branch information
cnlkl authored Sep 14, 2023
1 parent a1b9c21 commit aa5c8e8
Show file tree
Hide file tree
Showing 11 changed files with 137 additions and 110 deletions.
39 changes: 18 additions & 21 deletions analysis-tool-sdk-golang/api/bkrepo.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"github.com/TencentBlueKing/ci-repoAnalysis/analysis-tool-sdk-golang/object"
"github.com/TencentBlueKing/ci-repoAnalysis/analysis-tool-sdk-golang/util"
"github.com/hashicorp/go-retryablehttp"
"io"
"net/http"
"net/url"
Expand Down Expand Up @@ -102,16 +103,16 @@ func (c *BkRepoClient) Finish(cancel context.CancelFunc, toolOutput *object.Tool
if err != nil {
panic("Finish analyze failed: " + err.Error())
}
req, err := http.NewRequest("POST", reqUrl, bytes.NewReader(reqBody))
req, err := retryablehttp.NewRequest("POST", reqUrl, bytes.NewReader(reqBody))
if err != nil {
panic("Finish analyze failed: " + err.Error())
}
req.Header.Add("Content-Type", "application/json; charset=UTF-8")
res, err := http.DefaultClient.Do(req)
res, err := util.DefaultClient.Do(req)
if err != nil {
panic("Report analysis result failed, taskId: " + toolOutput.TaskId + ", err: " + err.Error())
}
defer res.Body.Close()
defer util.DrainBody(res.Body)
if res.StatusCode != 200 {
panic("Report analysis result failed, taskId: " + toolOutput.TaskId)
}
Expand All @@ -126,15 +127,15 @@ func (c *BkRepoClient) Failed(cancel context.CancelFunc, err error) {
}

// GenerateInputFile 生成待分析文件
func (c *BkRepoClient) GenerateInputFile(client *http.Client) (*os.File, error) {
downloader, err := c.createDownloader(client)
func (c *BkRepoClient) GenerateInputFile() (*os.File, error) {
downloader, err := c.createDownloader()
if err != nil {
return nil, err
}
return util.GenerateInputFile(c.ToolInput, downloader)
}

func (c *BkRepoClient) createDownloader(client *http.Client) (util.Downloader, error) {
func (c *BkRepoClient) createDownloader() (util.Downloader, error) {
var downloader util.Downloader
workerCount, _ := c.ToolInput.ToolConfig.GetIntArg(util.ArgKeyDownloaderWorkerCount)
if workerCount > 0 {
Expand All @@ -152,30 +153,25 @@ func (c *BkRepoClient) createDownloader(client *http.Client) (util.Downloader, e
}
}
// 创建下载器并生成待分析文件
downloader = util.NewChunkDownloader(
int(workerCount),
util.WorkDir,
headers,
client,
)
downloader = util.NewChunkDownloader(int(workerCount), util.WorkDir, headers)
} else {
downloader = util.NewDownloader(client)
downloader = util.NewDownloader()
}
return downloader, nil
}

// updateSubtaskStatus 更新任务状态为执行中
func (c *BkRepoClient) updateSubtaskStatus() error {
reqUrl := c.Args.Url + analystTemporaryPrefix + "/scan/subtask/" + c.ToolInput.TaskId + "/status?token=" + c.Args.Token + "&status=EXECUTING"
request, err := http.NewRequest("PUT", reqUrl, nil)
request, err := retryablehttp.NewRequest("PUT", reqUrl, nil)
if err != nil {
return err
}
response, err := http.DefaultClient.Do(request)
response, err := util.DefaultClient.Do(request)
if err != nil {
return err
}
defer response.Body.Close()
defer util.DrainBody(response.Body)
if response.StatusCode != 200 {
return errors.New("更新扫描任务[" + c.ToolInput.TaskId + "]状态失败, status: " + response.Status)
}
Expand Down Expand Up @@ -206,20 +202,21 @@ func (c *BkRepoClient) heartbeat(ctx context.Context) {
ticker.Stop()
return
case <-ticker.C:
request, err := http.NewRequest(http.MethodPost, reqUrl, strings.NewReader(body))
request, err := retryablehttp.NewRequest(http.MethodPost, reqUrl, strings.NewReader(body))
if err != nil {
util.Error("heartbeat failed: " + err.Error())
}
request.Header.Add("Content-Type", "application/x-www-form-urlencoded")
response, err := http.DefaultClient.Do(request)
response, err := util.DefaultClient.Do(request)
if err != nil {
util.Error("heartbeat failed: " + err.Error())
return
}
if response.StatusCode != http.StatusOK {
b, _ := io.ReadAll(response.Body)
util.Error("heartbeat failed: " + response.Status + ", message: " + string(b))
}
response.Body.Close()
util.DrainBody(response.Body)
}
}
}
Expand Down Expand Up @@ -280,11 +277,11 @@ func (c *BkRepoClient) pullToolInput() (*object.ToolInput, error) {
}

func (c *BkRepoClient) doFetchToolInput(url string) (*object.ToolInput, error) {
response, err := http.DefaultClient.Get(url)
response, err := util.DefaultClient.Get(url)
if err != nil {
return nil, err
}
defer response.Body.Close()
defer util.DrainBody(response.Body)
if response.StatusCode != http.StatusOK {
errBody, _ := io.ReadAll(response.Body)
errMsg := fmt.Sprintf(
Expand Down
4 changes: 2 additions & 2 deletions analysis-tool-sdk-golang/api/bkrepo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestCreateDownloader(t *testing.T) {
dialContext := func(ctx context.Context, network, addr string) (net.Conn, error) {
return dialer.DialContext(ctx, network, addr)
}
downloaderClient := (&object.Arguments{}).CustomDownloaderHttpClientDialContext(dialContext)
util.SetDefault(util.CreateHttpClient(util.CreateTransport(dialContext)))
client := createClient()
args := make([]object.Argument, 2)
args = append(args, object.Argument{
Expand All @@ -47,7 +47,7 @@ func TestCreateDownloader(t *testing.T) {
client.ToolInput = &object.ToolInput{
ToolConfig: object.ToolConfig{Args: args},
}
downloader, err := client.createDownloader(downloaderClient)
downloader, err := client.createDownloader()
if err != nil {
t.Fatalf(err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion analysis-tool-sdk-golang/framework/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func doAnalyze(executor Executor, arguments *object.Arguments) {
util.Info("no subtask found, exit")
os.Exit(0)
}
file, err := client.GenerateInputFile(arguments.GetDownloaderClient())
file, err := client.GenerateInputFile()
if err != nil {
client.Failed(cancel, errors.New("Generate input file failed: "+err.Error()))
os.Exit(1)
Expand Down
9 changes: 7 additions & 2 deletions analysis-tool-sdk-golang/go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
module github.com/TencentBlueKing/ci-repoAnalysis/analysis-tool-sdk-golang

go 1.20
go 1.21.0

require golang.org/x/sync v0.3.0 // indirect
require (
github.com/hashicorp/go-retryablehttp v0.7.4
golang.org/x/sync v0.3.0
)

require github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
9 changes: 9 additions & 0 deletions analysis-tool-sdk-golang/go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +1,11 @@
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48=
github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI=
github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
github.com/hashicorp/go-retryablehttp v0.7.4 h1:ZQgVdpTdAL7WpMIwLzCfbalOcSUdkDZnpUv3/+BxzFA=
github.com/hashicorp/go-retryablehttp v0.7.4/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
38 changes: 0 additions & 38 deletions analysis-tool-sdk-golang/object/argument.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
package object

import (
"context"
"flag"
"fmt"
"net"
"net/http"
"time"
)

// Arguments 输入参数
Expand All @@ -20,7 +16,6 @@ type Arguments struct {
OutputFilePath string
KeepRunning bool
Heartbeat int
downloaderClient *http.Client
}

var args *Arguments
Expand Down Expand Up @@ -81,36 +76,3 @@ func (arg *Arguments) Online() bool {
func (arg *Arguments) ShouldKeepRunning() bool {
return arg.Online() && arg.KeepRunning && arg.TaskId == ""
}

// CustomDownloaderHttpClientDialContext 自定义下载器使用的http客户端DNS解析
func (arg *Arguments) CustomDownloaderHttpClientDialContext(
dialContext func(ctx context.Context, network, addr string) (net.Conn, error),
) *http.Client {
if dialContext == nil {
arg.downloaderClient = http.DefaultClient
} else {
transport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: dialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}

arg.downloaderClient = &http.Client{
Transport: transport,
}
}
return arg.downloaderClient
}

// GetDownloaderClient 获取下载器HTTP客户端
func (arg *Arguments) GetDownloaderClient() *http.Client {
if arg.downloaderClient == nil {
return http.DefaultClient
} else {
return arg.downloaderClient
}
}
29 changes: 9 additions & 20 deletions analysis-tool-sdk-golang/util/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/hashicorp/go-retryablehttp"
"golang.org/x/sync/errgroup"
"io"
"net/http"
Expand All @@ -18,26 +19,14 @@ type ChunkDownloader struct {
WorkerCount int
TmpDir string
Headers map[string]string
client *http.Client
}

// NewChunkDownloader 创建分片下载器
func NewChunkDownloader(
workerCount int,
tmpDir string,
headers map[string]string,
client *http.Client,
) *ChunkDownloader {
var c = client
if client == nil {
c = http.DefaultClient
}

func NewChunkDownloader(workerCount int, tmpDir string, headers map[string]string) *ChunkDownloader {
return &ChunkDownloader{
WorkerCount: workerCount,
TmpDir: tmpDir,
Headers: headers,
client: c,
}
}

Expand Down Expand Up @@ -92,19 +81,19 @@ func (d *ChunkDownloader) chunkDownload(url string, outputFile *os.File) error {
func (d *ChunkDownloader) doDownload(ctx context.Context, url string, file *os.File, start int, end int) error {
defer timer(fmt.Sprintf("download chunk %d-%d success,", start, end))()
Info("start download chunk %d-%d", start, end)
req, err := http.NewRequest("GET", url, nil)
req, err := retryablehttp.NewRequest("GET", url, nil)
if err != nil {
return err
}
d.setHeaders(req)
rangeHeader := "bytes=" + strconv.Itoa(start) + "-" + strconv.Itoa(end)
req.Header.Set("Range", rangeHeader)

res, err := d.client.Do(req)
res, err := DefaultClient.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
defer DrainBody(res.Body)

if res.StatusCode != http.StatusPartialContent {
return errors.New("download chunk failed: " + res.Status)
Expand Down Expand Up @@ -136,13 +125,13 @@ func (d *ChunkDownloader) doDownload(ctx context.Context, url string, file *os.F
}

func (d *ChunkDownloader) getFileSize(url string) (int, error) {
req, _ := http.NewRequest("HEAD", url, nil)
req, _ := retryablehttp.NewRequest("HEAD", url, nil)
d.setHeaders(req)
res, err := d.client.Do(req)
res, err := DefaultClient.Do(req)
if err != nil {
return 0, err
}
defer res.Body.Close()
defer DrainBody(res.Body)

if res.StatusCode != http.StatusOK {
return 0, errors.New("get file size failed, status: " + res.Status)
Expand All @@ -157,7 +146,7 @@ func (d *ChunkDownloader) getFileSize(url string) (int, error) {
return size, nil
}

func (d *ChunkDownloader) setHeaders(req *http.Request) {
func (d *ChunkDownloader) setHeaders(req *retryablehttp.Request) {
for k, v := range d.Headers {
req.Header.Set(k, v)
}
Expand Down
13 changes: 4 additions & 9 deletions analysis-tool-sdk-golang/util/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,10 @@ func TestDownload(t *testing.T) {
errorToPanic(func() error { return os.RemoveAll(tmpDir) })
errorToPanic(func() error { return os.Mkdir(tmpDir, 0766) })

downloader := NewChunkDownloader(
8,
tmpDir,
map[string]string{
"X-BKREPO-DOWNLOAD-REDIRECT-TO": "INNERCOS",
"Authorization": os.Getenv("AUTHORIZATION"),
},
nil,
)
downloader := NewChunkDownloader(8, tmpDir, map[string]string{
"X-BKREPO-DOWNLOAD-REDIRECT-TO": "INNERCOS",
"Authorization": os.Getenv("AUTHORIZATION"),
})
file, err := downloader.Download(os.Getenv("URL"))
h := sha256.New()
if _, err := io.Copy(h, file); err != nil {
Expand Down
18 changes: 5 additions & 13 deletions analysis-tool-sdk-golang/util/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,22 @@ type Downloader interface {
}

// DefaultDownloader 默认下载器实现
type DefaultDownloader struct {
client *http.Client
}
type DefaultDownloader struct{}

// NewDownloader 创建默认下载器
func NewDownloader(client *http.Client) Downloader {
var c = client
if client == nil {
c = http.DefaultClient
}

return &DefaultDownloader{
client: c,
}
func NewDownloader() Downloader {
return &DefaultDownloader{}
}

// Download 从指定url获取输入流
func (d *DefaultDownloader) Download(url string) (io.ReadCloser, error) {
Info("downloading %s", url)
response, err := d.client.Get(url)
response, err := DefaultClient.Get(url)
if err != nil {
return nil, err
}
if response.StatusCode != http.StatusOK {
DrainBody(response.Body)
return nil, errors.New("download failed, status: " + response.Status)
}

Expand Down
11 changes: 7 additions & 4 deletions analysis-tool-sdk-golang/util/log.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
package util

import "log"
import (
"fmt"
"log/slog"
)

// Info 输出Info级别日志
func Info(format string, v ...any) {
log.Printf(format+"\n", v...)
slog.Info(fmt.Sprintf(format+"\n", v...))
}

// Warn 输出Warn级别日志
func Warn(format string, v ...any) {
log.Printf(format+"\n", v...)
slog.Warn(fmt.Sprintf(format+"\n", v...))
}

// Error 输出Error级别日志
func Error(format string, v ...any) {
log.Printf(format+"\n", v...)
slog.Error(fmt.Sprintf(format+"\n", v...))
}
Loading

0 comments on commit aa5c8e8

Please sign in to comment.