Skip to content

Commit

Permalink
apply new functions from master branch
Browse files Browse the repository at this point in the history
  • Loading branch information
soarqin committed Oct 26, 2018
1 parent 0666e9f commit fb565b3
Showing 1 changed file with 150 additions and 75 deletions.
225 changes: 150 additions & 75 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/vbauerster/mpb"
"github.com/vbauerster/mpb/decor"
"io"
"log"
"io/ioutil"
"net/http"
"net/url"
Expand All @@ -17,8 +18,11 @@ import (
"sync"
)

const workerCount = 4
const jobsFile = "jobs.json"

type j struct {
index int
Index int `json:"index"`
Url string `json:"l"`
}

Expand All @@ -37,16 +41,24 @@ type lroot struct {
} `json:"log"`
}

type downloadContext struct {
fn, url string
}

type jslice []j

func (a jslice) Len() int { return len(a) }
func (a jslice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a jslice) Less(i, j int) bool { return a[i].index < a[j].index }
func (a jslice) Less(i, j int) bool { return a[i].Index < a[j].Index }

func main() {
if len(os.Args) < 2 {
return
}
if os.Args[1] == "r" {
resume()
return
}
cont, err := ioutil.ReadFile(os.Args[1])
if err != nil {
panic(err)
Expand Down Expand Up @@ -82,87 +94,150 @@ func main() {
if err != nil {
panic(err)
}
jd.index, err = strconv.Atoi(u.Query().Get("qd_index"))
jd.Index, err = strconv.Atoi(u.Query().Get("qd_index"))
if err != nil {
panic(err)
}
key := res[1] + res[2] + res[3]
data[key] = append(data[key], jd)
}
for name, slices := range data {
fmt.Printf("\nDumping files for %s\n", name)
sort.Sort(slices)
cfn := name + ".txt"
c, _ := os.Create(cfn)
lastIndex := 0
var wg sync.WaitGroup
p := mpb.New(mpb.WithWaitGroup(&wg))
downloads := make([][2]string, 0)
for _, v := range slices {
if v.index == lastIndex {
fmt.Printf("!!!!WARNING: Duplicate index %d\n", lastIndex)
continue
}
if v.index != lastIndex+1 {
fmt.Printf("!!!!WARNING: Missing index %d\n", lastIndex+1)
}
lastIndex = v.index
fn := fmt.Sprintf("%s_%d.f4v", name, v.index)
fmt.Fprintf(c, "file '%s'\n", fn)
downloads = append(downloads, [2]string{fn, v.Url})
}
wg.Add(len(downloads))
for _, v := range downloads {
fn := v[0]
b := p.AddBar(100,
mpb.PrependDecorators(
decor.StaticName(fn, 0, decor.DwidthSync|decor.DidentRight),
decor.Elapsed(3, decor.DSyncSpace),
),
mpb.AppendDecorators(
decor.CountersKiloByte("%.1f / %.1f", 10, decor.DSyncSpace),
decor.StaticName(" ", 0, 0),
),
)
go downloadFunc(fn, v[1], &wg, b)
}
p.Stop()
c.Close()
fname := name + ".mp4"
fmt.Printf("->Merging to %s...", fname)
exec.Command("ffmpeg", "-y", "-f", "concat", "-i", cfn, "-c", "copy", fname).Run()
fmt.Println("finished")
}
j, err := json.Marshal(data)
if err != nil {
log.Fatalln(err)
}
f, err := os.Create(jobsFile)
if err != nil {
log.Fatalln(err)
}
f.Write(j)
f.Close()
dumpFunc(data)
}

func downloadFunc(fn string, url string, wg *sync.WaitGroup, b *mpb.Bar) {
defer wg.Done()
resp, err := http.Get(url)
if err != nil {
panic(err)
}
defer resp.Body.Close()
out, err := os.Create(fn)
if err != nil {
panic(err)
}
defer out.Close()
func resume() {
f, err := os.Open(jobsFile)
if err != nil {
log.Fatalln(err)
}
buf, err := ioutil.ReadAll(f)
f.Close()
if err != nil {
log.Fatalln(err)
}
data := make(map[string]jslice)
err = json.Unmarshal(buf, &data)
if err != nil {
log.Fatalln(err)
}
dumpFunc(data)
}

b.SetTotal(int64(resp.ContentLength), true)
// dumpFunc downloads every segment listed in data (keyed by output name),
// writes an ffmpeg concat list per stream, merges the segments into an .mp4
// with ffmpeg, and finally removes the persisted jobs file.
func dumpFunc(data map[string]jslice) {
	for name, slices := range data {
		fmt.Printf("\nDumping files for %s\n", name)
		// Segments must be concatenated in index order.
		sort.Sort(slices)
		cfn := name + ".txt"
		c, err := os.Create(cfn)
		if err != nil {
			log.Fatalln(err)
		}
		lastIndex := 0
		var wg sync.WaitGroup
		p := mpb.New(mpb.WithWaitGroup(&wg))
		downloads := make([][2]string, 0, len(slices))
		for _, v := range slices {
			// slices is sorted, so equal neighbours are duplicates.
			if v.Index == lastIndex {
				fmt.Printf("!!!!WARNING: Duplicate index %d\n", lastIndex)
				continue
			}
			if v.Index != lastIndex+1 {
				fmt.Printf("!!!!WARNING: Missing index %d\n", lastIndex+1)
			}
			lastIndex = v.Index
			fn := fmt.Sprintf("%s_%d.f4v", name, v.Index)
			fmt.Fprintf(c, "file '%s'\n", fn)
			downloads = append(downloads, [2]string{fn, v.Url})
		}
		// Fixed pool of workerCount goroutines drains downloadChan.
		wg.Add(workerCount)
		downloadChan := make(chan *downloadContext, 256)
		for i := 0; i < workerCount; i++ {
			go downloadFunc(&wg, p, downloadChan)
		}
		for _, v := range downloads {
			downloadChan <- &downloadContext{v[0], v[1]}
		}
		// Closing the channel lets every worker's range loop terminate
		// once the queue is drained.
		close(downloadChan)
		p.Wait()
		c.Close()
		fname := name + ".mp4"
		fmt.Printf("->Merging to %s...", fname)
		if err := exec.Command("ffmpeg", "-y", "-f", "concat", "-i", cfn, "-c", "copy", fname).Run(); err != nil {
			// Previously this error was silently dropped and the merge was
			// reported as finished even when ffmpeg failed.
			fmt.Printf("ffmpeg failed: %v\n", err)
			continue
		}
		fmt.Println("finished")
	}
	os.Remove(jobsFile)
}

cache := make([]byte, 65536)
for {
cnt, err := resp.Body.Read(cache)
if cnt > 0 {
out.Write(cache[0:cnt])
b.Incr(cnt)
}
if err != nil {
if err == io.EOF {
b.Complete()
break
}
panic(err)
}
}
// downloadFunc is a worker goroutine: it consumes download jobs from dchan
// until the channel is closed, creating one progress bar per file and
// downloading it. wg.Done is called exactly once, when dchan is drained.
func downloadFunc(wg *sync.WaitGroup, p *mpb.Progress, dchan chan *downloadContext) {
	defer wg.Done()

	// Ranging over the channel exits cleanly when dumpFunc closes it —
	// idiomatic replacement for the manual `dc, ok := <-dchan` loop.
	for dc := range dchan {
		b := p.AddBar(100,
			mpb.PrependDecorators(
				decor.StaticName(dc.fn, decor.WC{W: 0, C: decor.DSyncWidth | decor.DidentRight}),
				decor.Elapsed(0, decor.WC{W: 3, C: decor.DSyncSpace}),
			),
			mpb.AppendDecorators(
				decor.Counters(decor.UnitKiB, "% .1f / % .1f"),
			),
		)
		doDownload(dc.fn, dc.url, b)
	}
}

// doDownload fetches url into fn, resuming from the current file size via an
// HTTP Range request when fn already contains data. Progress is reported on
// bar b. Any I/O or HTTP error aborts the whole program (log.Fatalln),
// matching the rest of this tool's error handling.
func doDownload(fn, url string, b *mpb.Bar) {
	client := &http.Client{}
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		log.Fatalln(err)
	}
	// Open read-write without truncating so a partial download survives
	// between runs and can be resumed.
	out, err := os.OpenFile(fn, os.O_RDWR|os.O_CREATE, 0666)
	if err != nil {
		log.Fatalln(err)
	}
	defer out.Close()
	off, err := out.Seek(0, io.SeekEnd)
	if err != nil {
		log.Fatalln(err)
	}
	if off > 0 {
		req.Header.Set("Range", fmt.Sprintf("bytes=%d-", off))
	}
	resp, err := client.Do(req)
	if err != nil {
		log.Fatalln(err)
	}
	defer resp.Body.Close()

	switch {
	case resp.StatusCode >= 400:
		// Without this check an error page body would be written into the
		// segment file as if it were video data.
		log.Fatalf("GET %s: unexpected status %s", url, resp.Status)
	case off > 0 && resp.StatusCode == http.StatusOK:
		// The server ignored the Range header (200 instead of 206) and is
		// sending the whole file: appending it would corrupt the segment,
		// so drop the partial data and start over from the beginning.
		if err := out.Truncate(0); err != nil {
			log.Fatalln(err)
		}
		if _, err := out.Seek(0, io.SeekStart); err != nil {
			log.Fatalln(err)
		}
		off = 0
	}

	if off > 0 {
		b.SetTotal(resp.ContentLength+off, false)
		b.IncrBy(int(off))
	} else {
		b.SetTotal(resp.ContentLength, false)
	}

	cache := make([]byte, 65536)
	for {
		cnt, err := resp.Body.Read(cache)
		if cnt > 0 {
			// A short or failed write would silently truncate the segment,
			// so treat it as fatal.
			if _, werr := out.Write(cache[:cnt]); werr != nil {
				log.Fatalln(werr)
			}
			b.IncrBy(cnt)
		}
		if err != nil {
			if err == io.EOF {
				break
			}
			log.Fatalln(err)
		}
	}
}

0 comments on commit fb565b3

Please sign in to comment.