From fb565b3cbd171d0860b4b2216388c72f80c87af2 Mon Sep 17 00:00:00 2001 From: Soar Qin Date: Fri, 26 Oct 2018 14:19:33 +0800 Subject: [PATCH] apply new functions from master branch --- main.go | 225 +++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 150 insertions(+), 75 deletions(-) diff --git a/main.go b/main.go index 4c11a6f..0032fb7 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ import ( "github.com/vbauerster/mpb" "github.com/vbauerster/mpb/decor" "io" + "log" "io/ioutil" "net/http" "net/url" @@ -17,8 +18,11 @@ import ( "sync" ) +const workerCount = 4 +const jobsFile = "jobs.json" + type j struct { - index int + Index int `json:"index"` Url string `json:"l"` } @@ -37,16 +41,24 @@ type lroot struct { } `json:"log"` } +type downloadContext struct { + fn, url string +} + type jslice []j func (a jslice) Len() int { return len(a) } func (a jslice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a jslice) Less(i, j int) bool { return a[i].index < a[j].index } +func (a jslice) Less(i, j int) bool { return a[i].Index < a[j].Index } func main() { if len(os.Args) < 2 { return } + if os.Args[1] == "r" { + resume() + return + } cont, err := ioutil.ReadFile(os.Args[1]) if err != nil { panic(err) @@ -82,87 +94,150 @@ func main() { if err != nil { panic(err) } - jd.index, err = strconv.Atoi(u.Query().Get("qd_index")) + jd.Index, err = strconv.Atoi(u.Query().Get("qd_index")) if err != nil { panic(err) } key := res[1] + res[2] + res[3] data[key] = append(data[key], jd) } - for name, slices := range data { - fmt.Printf("\nDumping files for %s\n", name) - sort.Sort(slices) - cfn := name + ".txt" - c, _ := os.Create(cfn) - lastIndex := 0 - var wg sync.WaitGroup - p := mpb.New(mpb.WithWaitGroup(&wg)) - downloads := make([][2]string, 0) - for _, v := range slices { - if v.index == lastIndex { - fmt.Printf("!!!!WARNING: Duplicate index %d\n", lastIndex) - continue - } - if v.index != lastIndex+1 { - fmt.Printf("!!!!WARNING: Missing index %d\n", lastIndex+1) - } - lastIndex = v.index - fn := fmt.Sprintf("%s_%d.f4v", name, v.index) - fmt.Fprintf(c, "file '%s'\n", fn) - downloads = append(downloads, [2]string{fn, v.Url}) - } - wg.Add(len(downloads)) - for _, v := range downloads { - fn := v[0] - b := p.AddBar(100, - mpb.PrependDecorators( - decor.StaticName(fn, 0, decor.DwidthSync|decor.DidentRight), - decor.Elapsed(3, decor.DSyncSpace), - ), - mpb.AppendDecorators( - decor.CountersKiloByte("%.1f / %.1f", 10, decor.DSyncSpace), - decor.StaticName(" ", 0, 0), - ), - ) - go downloadFunc(fn, v[1], &wg, b) - } - p.Stop() - c.Close() - fname := name + ".mp4" - fmt.Printf("->Merging to %s...", fname) - exec.Command("ffmpeg", "-y", "-f", "concat", "-i", cfn, "-c", "copy", fname).Run() - fmt.Println("finished") - } + j, err := json.Marshal(data) + if err != nil { + log.Fatalln(err) + } + f, err := os.Create(jobsFile) + if err != nil { + log.Fatalln(err) + } + f.Write(j) + f.Close() + dumpFunc(data) } -func downloadFunc(fn string, url string, wg *sync.WaitGroup, b *mpb.Bar) { - defer wg.Done() - resp, err := http.Get(url) - if err != nil { - panic(err) - } - defer resp.Body.Close() - out, err := os.Create(fn) - if err != nil { - panic(err) - } - defer out.Close() +func resume() { + f, err := os.Open(jobsFile) + if err != nil { + log.Fatalln(err) + } + buf, err := ioutil.ReadAll(f) + f.Close() + if err != nil { + log.Fatalln(err) + } + data := make(map[string]jslice) + err = json.Unmarshal(buf, &data) + if err != nil { + log.Fatalln(err) + } + dumpFunc(data) +} - b.SetTotal(int64(resp.ContentLength), true) +func dumpFunc(data map[string]jslice) { + for name, slices := range data { + fmt.Printf("\nDumping files for %s\n", name) + sort.Sort(slices) + cfn := name + ".txt" + c, _ := os.Create(cfn) + lastIndex := 0 + var wg sync.WaitGroup + p := mpb.New(mpb.WithWaitGroup(&wg)) + downloads := make([][2]string, 0) + for _, v := range slices { + if v.Index == lastIndex { + fmt.Printf("!!!!WARNING: Duplicate index %d\n", lastIndex) + continue + } + if v.Index != lastIndex+1 { + fmt.Printf("!!!!WARNING: Missing index %d\n", lastIndex+1) + } + lastIndex = v.Index + fn := fmt.Sprintf("%s_%d.f4v", name, v.Index) + fmt.Fprintf(c, "file '%s'\n", fn) + downloads = append(downloads, [2]string{fn, v.Url}) + } + wg.Add(workerCount) + downloadChan := make(chan *downloadContext, 256) + for i := 0; i < workerCount; i++ { + go downloadFunc(&wg, p, downloadChan) + } + for _, v := range downloads { + downloadChan <- &downloadContext{v[0], v[1]} + } + close(downloadChan) + p.Wait() + c.Close() + fname := name + ".mp4" + fmt.Printf("->Merging to %s...", fname) + exec.Command("ffmpeg", "-y", "-f", "concat", "-i", cfn, "-c", "copy", fname).Run() + fmt.Println("finished") + } + os.Remove(jobsFile) +} - cache := make([]byte, 65536) - for { - cnt, err := resp.Body.Read(cache) - if cnt > 0 { - out.Write(cache[0:cnt]) - b.Incr(cnt) - } - if err != nil { - if err == io.EOF { - b.Complete() - break - } - panic(err) - } - } +func downloadFunc(wg *sync.WaitGroup, p *mpb.Progress, dchan chan *downloadContext) { + defer wg.Done() + + for { + dc, ok := <-dchan + if !ok { + break + } + b := p.AddBar(100, + mpb.PrependDecorators( + decor.StaticName(dc.fn, decor.WC{W: 0, C: decor.DSyncWidth | decor.DidentRight}), + decor.Elapsed(0, decor.WC{W: 3, C: decor.DSyncSpace}), + ), + mpb.AppendDecorators( + decor.Counters(decor.UnitKiB, "% .1f / % .1f"), + ), + ) + doDownload(dc.fn, dc.url, b) + } +} + +func doDownload(fn, url string, b *mpb.Bar) { + client := &http.Client{} + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + log.Fatalln(err) + } + out, err := os.OpenFile(fn, os.O_RDWR|os.O_CREATE, 0666) + if err != nil { + log.Fatalln(err) + } + defer out.Close() + off, err := out.Seek(0, io.SeekEnd) + if err != nil { + log.Fatalln(err) + } + if off > 0 { + req.Header.Set("Range", fmt.Sprintf("bytes=%d-", off)) + } + resp, err := client.Do(req) + if err != nil { + log.Fatalln(err) + } + defer resp.Body.Close() + + if off > 0 { + b.SetTotal(int64(resp.ContentLength+off), false) + b.IncrBy(int(off)) + } else { + b.SetTotal(int64(resp.ContentLength), false) + } + + cache := make([]byte, 65536) + for { + cnt, err := resp.Body.Read(cache) + if cnt > 0 { + out.Write(cache[0:cnt]) + b.IncrBy(cnt) + } + if err != nil { + if err == io.EOF { + break + } + log.Fatalln(err) + } + } }