Skip to content

Commit

Permalink
feat: 支持分片下载 #52
Browse files Browse the repository at this point in the history
  • Loading branch information
cnlkl authored Aug 3, 2023
1 parent c57b611 commit 6639142
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 0 deletions.
2 changes: 2 additions & 0 deletions analysis-tool-sdk-golang/go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
module github.com/TencentBlueKing/ci-repoAnalysis/analysis-tool-sdk-golang

go 1.20

require golang.org/x/sync v0.3.0 // indirect
2 changes: 2 additions & 0 deletions analysis-tool-sdk-golang/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
150 changes: 150 additions & 0 deletions analysis-tool-sdk-golang/util/chunk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package util

import (
"context"
"errors"
"fmt"
"golang.org/x/sync/errgroup"
"io"
"net/http"
"os"
"runtime"
"strconv"
"time"
)

// ChunkDownloader 分片下载器
type ChunkDownloader struct {
WorkerCount int
TmpDir string
Headers map[string]string
}

// Download 分片下载
func (d *ChunkDownloader) Download(url string) (io.ReadCloser, error) {
defer timer("chunk download finished,")()
Info("downloading %s", url)
file, err := os.CreateTemp(d.TmpDir, "*-download.tmp")
if err != nil {
return nil, err
}

if err = d.chunkDownload(url, file); err != nil {
return nil, err
}

return file, nil
}

func (d *ChunkDownloader) chunkDownload(url string, outputFile *os.File) error {
fileSize, err := d.getFileSize(url)
if err != nil {
return err
}

var chunkSize int
workCount := runtime.NumCPU()
if d.WorkerCount > 0 {
workCount = d.WorkerCount
}
chunkSize = fileSize / workCount
g, ctx := errgroup.WithContext(context.Background())
g.SetLimit(workCount)

for i := 0; i < workCount; i++ {
start := i * chunkSize
end := start + chunkSize - 1
if i == workCount-1 {
end = fileSize - 1
}

g.Go(
func() error {
return d.doDownload(ctx, url, outputFile, start, end)
},
)
}

return g.Wait()
}

func (d *ChunkDownloader) doDownload(ctx context.Context, url string, file *os.File, start int, end int) error {
defer timer(fmt.Sprintf("download chunk %d-%d success,", start, end))()
Info("start download chunk %d-%d", start, end)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return err
}
d.setHeaders(req)
rangeHeader := "bytes=" + strconv.Itoa(start) + "-" + strconv.Itoa(end)
req.Header.Set("Range", rangeHeader)

res, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer res.Body.Close()

if res.StatusCode != http.StatusPartialContent {
return errors.New("download chunk failed: " + res.Status)
}

buf := make([]byte, 4*1024)
off := start
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
n, err := res.Body.Read(buf)
if err != nil && err != io.EOF {
return err
}
if n == 0 {
return nil
}

_, err = file.WriteAt(buf[:n], int64(off))
if err != nil {
return err
}
off += n
}

}
}

func (d *ChunkDownloader) getFileSize(url string) (int, error) {
req, _ := http.NewRequest("HEAD", url, nil)
d.setHeaders(req)
res, err := http.DefaultClient.Do(req)
if err != nil {
return 0, err
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
return 0, errors.New("get file size failed, status: " + res.Status)
}

sizeHeader := res.Header.Get("Content-Length")
size, err := strconv.Atoi(sizeHeader)
if err != nil {
return 0, err
}

return size, nil
}

func (d *ChunkDownloader) setHeaders(req *http.Request) {
for k, v := range d.Headers {
req.Header.Set(k, v)
}
}

func timer(name string) func() {
start := time.Now()
return func() {
fmt.Printf("%s took %v\n", name, time.Since(start))
}
}
47 changes: 47 additions & 0 deletions analysis-tool-sdk-golang/util/chunk_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package util

import (
"crypto/sha256"
"fmt"
"io"
"os"
"path/filepath"
"testing"
)

func TestDownload(t *testing.T) {
tmpDir := filepath.Join(os.TempDir(), "bkrepo-analysis-download")
errorToPanic(func() error { return os.RemoveAll(tmpDir) })
errorToPanic(func() error { return os.Mkdir(tmpDir, 0766) })

downloader := ChunkDownloader{
WorkerCount: 8,
TmpDir: tmpDir,
Headers: map[string]string{
"X-BKREPO-DOWNLOAD-REDIRECT-TO": "INNERCOS",
"Authorization": os.Getenv("AUTHORIZATION"),
},
}
file, err := downloader.Download(os.Getenv("URL"))
h := sha256.New()
if _, err := io.Copy(h, file); err != nil {
fmt.Println("error calculating hash:", err)
} else {
fmt.Printf("SHA256 hash of file: %x\n", h.Sum(nil))
}

if err != nil {
fmt.Printf("download failed: %s\n", err.Error())
}
if err = file.Close(); err != nil {
fmt.Printf("close file failed: %s\n", err.Error())
}

errorToPanic(func() error { return os.RemoveAll(tmpDir) })
}

func errorToPanic(f func() error) {
if err := f(); err != nil {
panic(err.Error())
}
}

0 comments on commit 6639142

Please sign in to comment.