From 971195d16ee20561530a11bce044814075abc31b Mon Sep 17 00:00:00 2001 From: Christian Rocha Date: Sun, 9 Aug 2020 20:33:14 -0400 Subject: [PATCH] Initial commit; add everything --- .github/workflows/go.yml | 36 ++++ .gitignore | 3 + LICENSE | 21 +++ README.md | 112 +++++++++++ example/main.go | 49 +++++ go.mod | 14 ++ go.sum | 168 +++++++++++++++++ pipedream.go | 380 ++++++++++++++++++++++++++++++++++++++ pipedream/.goreleaser.yml | 73 ++++++++ pipedream/Makefile | 4 + pipedream/main.go | 185 +++++++++++++++++++ 11 files changed, 1045 insertions(+) create mode 100644 .github/workflows/go.yml create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 README.md create mode 100644 example/main.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 pipedream.go create mode 100644 pipedream/.goreleaser.yml create mode 100644 pipedream/Makefile create mode 100644 pipedream/main.go diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml new file mode 100644 index 0000000..a6f61ec --- /dev/null +++ b/.github/workflows/go.yml @@ -0,0 +1,36 @@ +name: build + +on: + push: + branches: [ master ] + pull_request: + branches: [ master ] + +jobs: + + build: + name: Build + runs-on: ${{ matrix.platform }} + strategy: + matrix: + platform: [ubuntu-latest, macos-latest, windows-latest] + go-version: [1.11.x, 1.12.x, 1.13.x, 1.14.x] + env: + GO111MODULE: "on" + steps: + + - name: Set up Go 1.x + uses: actions/setup-go@v2 + with: + go-version: ${{ matrix.go-version }} + id: go + + - name: Check out code into the Go module directory + uses: actions/checkout@v2 + + - name: Get dependencies + run: | + go get -v -t -d ./... + + - name: Build + run: go build -v ./... diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9c5a81b --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/pipedream/pipedream +/example/example +/pipedream/dist diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..02f26b0 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2020 Christian Rocha + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..2b758b1 --- /dev/null +++ b/README.md @@ -0,0 +1,112 @@ +Pipe Dream +========== + +

+ Latest Release + GoDoc + Build Status +

+ +Easy multipart uploads for Amazon S3, DigitalOcean Spaces and S3-compatible +services. Available as a CLI and Go library. + +## CLI + +### Install It + +Download a build from the [releases][releases] page. macOS, Linux and Windows builds are available for various architectures. + +macOS users can also use Homebrew: + +``` +brew install meowgorithm/homebrew-tap/pipedream +``` + +Or you can just use `go get`: + +```bash +go get github.com/meowgorithm/pipedream/pipedream +``` + +[releases]: https://github.com/meowgorithm/pipedream/releases + +### Usage + +```bash +# Set your secrets, region and endpoint in the environment +export ACCESS_KEY="..." +export SECRET_KEY="..." +export ENDPOINT="sfo2.digitaloceanspaces.com" # for AWS set REGION + +# Pipe in data or redirect in a file +pipedream --bucket images --path pets/puppy.jpg < puppy.jpg + +# Get fancy +export now=$(date +"%Y-%m-%d_%H:%M:%S_%Z") +cat /data/dump.rdb | gzip | pipedream -bucket backups -path dump-$(now).rdb.gz + +# For more info +pipedream -h +``` + +## Library + +The library uses an event based model, sending events through a channel. + +```go +import "github.com/meowgorithm/pipedream" + +// Create a new multipart upload object +m := pipedream.MultipartUpload{ + AccessKey: os.Getenv("ACCESS_KEY"), + SecretKey: os.Getenv("SECRET_KEY"), + Endpoint: "sfo2.digitaloceanspaces.com", // you could use Region for AWS + Bucket: "my-fave-bucket", +} + +// Get an io.Reader, like an *os.File or os.Stdout +f, err := os.Open("big-redis-dump.rdb") +if err != nil { + fmt.Printf("Rats: %v\n", err) + os.Exit(1) +} +defer f.Close() + +// Send up the data. Pipdream returns a channel where you can listen for events +ch := m.Send(f, "backups/dump.rdb") +done := make(chan struct{}) + +// Listen for activity. For more detailed reporting, see the docs +go func() { + for { + e := <-ch + switch e.(type) { + case pipedream.Complete: + fmt.Println("It worked!") + close(done) + return + case pipedream.Error: + fmt.Println("Rats, it didn't work.") + close(done) + return + } + } +}() + +<-done +``` + +[Full source][example] of this example. For an example with more detailed +reporting, see the source code in the [CLI][cli]. + +[example]: https://github.com/meowgorithm/pipedream/blob/master/example/main.go +[cli]: https://github.com/meowgorithm/pipedream/tree/master/pipedream + +## Awknowledgements + +Thanks to to Apoorva Manjunath‘s [S3 multipart upload example](https://github.com/apoorvam/aws-s3-multipart-upload) +for the S3 implementation details. + +## License + +[MIT](https://github.com/meowgorithm/pipedream/raw/master/LICENSE) diff --git a/example/main.go b/example/main.go new file mode 100644 index 0000000..867a66f --- /dev/null +++ b/example/main.go @@ -0,0 +1,49 @@ +package main + +import ( + "fmt" + "os" + + "github.com/meowgorithm/pipedream" +) + +func main() { + + m := pipedream.MultipartUpload{ + AccessKey: os.Getenv("ACCESS_KEY"), + SecretKey: os.Getenv("SECRET_KEY"), + Endpoint: "sfo2.digitaloceanspaces.com", // you could use Region for AWS + Bucket: "my-fave-bucket", + } + + // Get an io.Reader + f, err := os.Open("big-redis-dump.rdb") + if err != nil { + fmt.Printf("Rats: %v\n", err) + os.Exit(1) + } + defer f.Close() + + // Send it up! Pipdream returns a channel where you can listen for events. + ch := m.Send(f, "backups/dump.rdb") + done := make(chan struct{}) + + // Listen for activity. For more detailed reporting, see the docs. + go func() { + for { + e := <-ch + switch e.(type) { + case pipedream.Complete: + fmt.Println("It worked!") + close(done) + return + case pipedream.Error: + fmt.Println("Rats, it didn't work.") + close(done) + return + } + } + }() + + <-done +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..03c13e5 --- /dev/null +++ b/go.mod @@ -0,0 +1,14 @@ +module github.com/meowgorithm/pipedream + +go 1.14 + +require ( + github.com/aws/aws-sdk-go v1.33.7 + github.com/dustin/go-humanize v1.0.0 + github.com/meowgorithm/babyenv v1.3.0 + github.com/muesli/reflow v0.1.0 + github.com/muesli/termenv v0.7.0 + github.com/spf13/cobra v1.0.0 + github.com/spf13/pflag v1.0.5 // indirect + golang.org/x/sys v0.0.0-20200806125547-5acd03effb82 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..5816329 --- /dev/null +++ b/go.sum @@ -0,0 +1,168 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= +github.com/aws/aws-sdk-go v1.33.7 h1:vOozL5hmWHHriRviVTQnUwz8l05RS0rehmEFymI+/x8= +github.com/aws/aws-sdk-go v1.33.7/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= +github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= +github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= +github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= +github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= +github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= +github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= +github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/goterm v0.0.0-20190703233501-fc88cf888a3f h1:5CjVwnuUcp5adK4gmY6i72gpVFVnZDP2h5TmPScB6u4= +github.com/google/goterm v0.0.0-20190703233501-fc88cf888a3f/go.mod h1:nOFQdrUlIlx6M6ODdSpBj1NVA+VgLC6kmw60mkw34H4= +github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= +github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= +github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= +github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= +github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc= +github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= +github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= +github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= +github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/lucasb-eyer/go-colorful v1.0.3 h1:QIbQXiugsb+q10B+MI+7DI1oQLdmnep86tWFlaaUAac= +github.com/lucasb-eyer/go-colorful v1.0.3/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= +github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= +github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= +github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= +github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/meowgorithm/babyenv v1.3.0 h1:klb7ugoZt0/Xlqkd5kLxM7eLZX8waiwxHZWW5nfEZ0Q= +github.com/meowgorithm/babyenv v1.3.0/go.mod h1:lwNX+J6AGBFqNrMZ2PTLkM6SO+W4X8DOg9zBDO4j3Ig= +github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/muesli/reflow v0.1.0 h1:oQdpLfO56lr5pgLvqD0TcjW85rDjSYSBVdiG1Ch1ddM= +github.com/muesli/reflow v0.1.0/go.mod h1:I9bWAt7QTg/que/qmUCJBGlj7wEq8OAFBjPNjc6xK4I= +github.com/muesli/termenv v0.7.0 h1:KcLfgg/KICGxOxNr+P9gmkrJ5azxOm3WzkfXMePGTq4= +github.com/muesli/termenv v0.7.0/go.mod h1:SohX91w6swWA4AYU+QmPx+aSgXhWO0juiyID9UZmbpA= +github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= +github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= +github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= +github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= +github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= +github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/cobra v1.0.0 h1:6m/oheQuQ13N9ks4hubMG6BnvwOeaJrqSPLahSnczz8= +github.com/spf13/cobra v1.0.0/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE= +github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= +github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= +github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= +github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= +github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200806125547-5acd03effb82 h1:6cBnXxYO+CiRVrChvCosSv7magqTPbyAgz1M8iOv5wM= +golang.org/x/sys v0.0.0-20200806125547-5acd03effb82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= +gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= +gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pipedream.go b/pipedream.go new file mode 100644 index 0000000..e7b3192 --- /dev/null +++ b/pipedream.go @@ -0,0 +1,380 @@ +// Package pipedream provides a simple interface to multipart Amazon S3 +// uploads. It also works with other S3 compatible services such as +// DigitalOcean's spaces. +// +// The general workflow is to create MultipartUpload struct, run Send on the +// struct, and then listen for events on the returned channel. +// +// Example usage: +// +// package main +// +// import ( +// "fmt" +// "os" +// +// "github.com/meowgorithm/pipedream" +// ) +// +// func main() { +// +// // Prep the multipart upload +// m := pipedream.MultipartUpload{ +// AccessKey: os.Getenv("ACCESS_KEY"), +// SecretKey: os.Getenv("SECRET_KEY"), +// Endpoint: "sfo2.digitaloceanspaces.com", // you could use Region for AWS +// Bucket: "my-fave-bucket", +// } +// +// // Get an io.Reader +// f, err := os.Open("big-redis-dump.rdb") +// if err != nil { +// fmt.Printf("Rats: %v\n", err) +// os.Exit(1) +// } +// defer f.Close() +// +// // Send it up! Pipdream returns a channel where you can listen for events. +// ch := m.Send(f, "backups/dump.rdb") +// done := make(chan struct{}) +// +// // Listen for activity. For more detailed reporting, see the docs below. +// go func() { +// for { +// e := <-ch +// switch e.(type) { +// case pipedream.Complete: +// fmt.Println("It worked!") +// close(done) +// return +// case pipedream.Error: +// fmt.Println("Rats, it didn't work.") +// close(done) +// return +// } +// } +// }() +// +// <-done +// } +// +// There's also a command line interface available at +// https://github.com/meowgorithm/pipedream/pipedream +package pipedream + +import ( + "bytes" + "errors" + "fmt" + "io" + "net/http" + "strings" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" +) + +const ( + Kilobyte int64 = 1024 + Megabyte int64 = Kilobyte * 1024 + DefaultRegion = "us-east-1" +) + +// Event represents activity that occurred during the upload. Events are sent +// through the channel returned by MultipartUpload.Send(). To figure out which +// event was received use a type switch or type assertion. +type Event interface { + // This is a dummy method for type safety. + event() +} + +// Progress is an Event indicating upload progress. It's sent when a part has +// successfully uploaded. +type Progress struct { + PartNumber int + Bytes int +} + +// Retry is an Event indicating there was an error uploading a part and the +// part is being retried. An Error will be send if the retries are exhaused and +// the upload fails. +type Retry struct { + PartNumber int + RetryNumber int + MaxRetries int +} + +// Complete is an Event sent when an upload has completed successfully. When +// a Complete is received there will be no further activity send on the +// channel, so you can confidently move on. +type Complete struct { + Bytes int + Result *s3.CompleteMultipartUploadOutput +} + +// Error is an event indicating that an Error occurred during the upload. When +// an Error is received the operation has failed and no further activity will +// be send, so you can confidently move on. +type Error struct { + Err error +} + +// Error returns the a string representation of the error. It satisfies the +// Error interface. +func (e Error) Error() string { + return e.Err.Error() +} + +// Implement dummy methods to satisfy Event interface. We're doing this for +// type safety. +func (x Progress) event() {} +func (x Retry) event() {} +func (x Complete) event() {} +func (x Error) event() {} + +// MultipartUpload handles multipart uploads to S3 and S3-compatible systems. +type MultipartUpload struct { + Endpoint string + Region string + Bucket string + AccessKey string + SecretKey string + MaxRetries int + MaxPartSize int64 + + svc *s3.S3 + res *s3.CreateMultipartUploadOutput + completedParts []*s3.CompletedPart + currentPartNumber int + path string + reader io.Reader +} + +// Send uploads data from a given io.Reader (such as an *os.File or os.Stdin) +// to a given path in a bucket. +func (m *MultipartUpload) Send(reader io.Reader, path string) chan Event { + m.reader = reader + m.path = path + ch := make(chan Event) + go m.run(ch) + return ch +} + +func (m *MultipartUpload) run(ch chan Event) { + + // Set defaults + if m.MaxRetries == 0 { + m.MaxRetries = 3 + } + if m.MaxPartSize == 0 { + m.MaxPartSize = Megabyte * 5 + } + if m.Endpoint == "" { + m.Endpoint = "nyc3.digitaloceanspaces.com" + } + if m.Region == "" { + m.Region = DefaultRegion + } + + // Validate + var missing []string + if m.AccessKey == "" { + missing = append(missing, "AccessKey") + } + if m.SecretKey == "" { + missing = append(missing, "SecretKey") + } + if m.Bucket == "" { + missing = append(missing, "Bucket") + } + if len(missing) > 0 { + ch <- Error{ + Err: errors.New("missing " + EnglishJoin(missing, true)), + } + return + } + + // Make S3 config + s3Config := &aws.Config{ + Credentials: credentials.NewStaticCredentials(m.AccessKey, m.SecretKey, ""), + Endpoint: aws.String(m.Endpoint), + Region: aws.String(m.Region), + } + + // Init S3 session + newSession := session.New(s3Config) + m.svc = s3.New(newSession) + + // Upload parts + totalBytes := 0 + m.currentPartNumber = 1 + buf := make([]byte, m.MaxPartSize) + for { + + n, err := m.reader.Read(buf) + if err != nil && err == io.EOF { + // There's no more data, so we've successfully uploaded all parts. + break + } + if err != nil { + if abortErr := m.Abort(); abortErr != nil { + ch <- Error{ + Err: fmt.Errorf("upload error: %v, as well as an error aborting the upload: %v", err, abortErr), + } + return + } + ch <- Error{err} + return + } + + // Request the upload if we haven't already. We wait until we've read + // some bytes so we can detect the file type. + if m.res == nil { + input := &s3.CreateMultipartUploadInput{ + Bucket: aws.String(m.Bucket), + Key: aws.String(m.path), + ContentType: aws.String(http.DetectContentType(buf[:n])), + } + + m.res, err = m.svc.CreateMultipartUpload(input) + if err != nil { + ch <- Error{err} + return + } + } + + // Perform the upload + part, err := m.uploadPart(ch, buf[:n], m.currentPartNumber) + if err != nil { + if abortErr := m.Abort(); abortErr != nil { + ch <- Error{ + Err: fmt.Errorf("upload error: %v, as well as an error aborting the upload: %v", err, abortErr), + } + return + } + ch <- Error{err} + return + } + + ch <- Progress{ + PartNumber: m.currentPartNumber, + Bytes: n, + } + + totalBytes += n + m.completedParts = append(m.completedParts, part) + m.currentPartNumber++ + } + + res, err := m.complete() + if err != nil { + ch <- Error{err} + } + ch <- Complete{ + Bytes: totalBytes, + Result: res, + } + +} + +// uploadPart performs the technical S3 stuff to upload one part of the +// multipart upload. If it fails we'll retry based on the number set in +// multipartUploadManager.MaxRetries. +func (m MultipartUpload) uploadPart(ch chan Event, chunk []byte, partNum int) (*s3.CompletedPart, error) { + partInput := &s3.UploadPartInput{ + Body: bytes.NewReader(chunk), + Bucket: m.res.Bucket, + Key: m.res.Key, + PartNumber: aws.Int64(int64(partNum)), + UploadId: m.res.UploadId, + ContentLength: aws.Int64(int64(len(chunk))), + } + + tryNum := 1 + for tryNum <= m.MaxRetries { + + // Attempt to upload part + res, err := m.svc.UploadPart(partInput) + if err != nil { + + // Fail + if tryNum == m.MaxRetries { + if aerr, ok := err.(awserr.Error); ok { + return nil, aerr + } + return nil, err + } + + ch <- Retry{ + PartNumber: m.currentPartNumber, + RetryNumber: tryNum, + MaxRetries: m.MaxRetries, + } + + tryNum++ + + } else { + + // Success + return &s3.CompletedPart{ + ETag: res.ETag, + PartNumber: aws.Int64(int64(partNum)), + }, nil + + } + } + + // This should never happen + return nil, errors.New("could not upload part") +} + +// complete finishes up the upload. This must be called after all parts have +// been sent. +func (m MultipartUpload) complete() (*s3.CompleteMultipartUploadOutput, error) { + return m.svc.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{ + Bucket: m.res.Bucket, + Key: m.res.Key, + UploadId: m.res.UploadId, + MultipartUpload: &s3.CompletedMultipartUpload{ + Parts: m.completedParts, + }, + }) +} + +// Abort cancels the upload. +func (m MultipartUpload) Abort() error { + _, err := m.svc.AbortMultipartUpload(&s3.AbortMultipartUploadInput{ + Bucket: m.res.Bucket, + Key: m.res.Key, + UploadId: m.res.UploadId, + }) + return err +} + +// EnglishJoin joins a slice of strings with commas and the word "and" like one +// would in English. Oxford comma optional. +func EnglishJoin(words []string, oxfordComma bool) string { + b := strings.Builder{} + for i, w := range words { + + if i == 0 { + b.WriteString(w) + continue + } + + if len(words) > 1 && i == len(words)-1 { + if oxfordComma && i > 2 { + b.WriteString(",") + } + b.WriteString(" and") + b.WriteString(" " + w) + continue + } + + b.WriteString(", " + w) + } + return b.String() +} diff --git a/pipedream/.goreleaser.yml b/pipedream/.goreleaser.yml new file mode 100644 index 0000000..e82cb27 --- /dev/null +++ b/pipedream/.goreleaser.yml @@ -0,0 +1,73 @@ +project_name: pipedream + +before: + hooks: + - go mod download + +builds: + - id: "pipedream" + binary: pipedream + ldflags: -s -w -X main.Version={{ .Version }} + env: + - CGO_ENABLED=0 + goos: + - linux + - windows + goarch: + - amd64 + - arm64 + - 386 + - arm + goarm: + - 6 + - 7 + - id: "darwin" + binary: pipedream + ldflags: -s -w -X main.Version={{ .Version }} + goos: + - darwin + goarch: + - amd64 + + +archives: + - id: "default" + builds: + - pipedream + format_overrides: + - goos: windows + format: zip + replacements: + linux: Linux + windows: Windows + 386: i386 + amd64: x86_64 + - id: "darwin" + builds: + - darwin + replacements: + darwin: macOS + amd64: x86_64 + +checksum: + name_template: 'checksums.txt' +snapshot: + name_template: "{{ .Tag }}-next" +changelog: + sort: asc + filters: + exclude: + - '^docs:' + - '^test:' + +brews: + - ids: + - darwin + tap: + owner: meowgorithm + name: homebrew-tap + commit_author: + name: "Christian Rocha" + email: "christian@rocha.is" + homepage: "https://github.com/meowgorithm/pipedream" + description: "A multipart uploader for S3 and S3-compatible services." diff --git a/pipedream/Makefile b/pipedream/Makefile new file mode 100644 index 0000000..b3e1552 --- /dev/null +++ b/pipedream/Makefile @@ -0,0 +1,4 @@ +.PHONY: pipedream + +pipedream: + go build -ldflags="-X 'main.Version=$(shell git describe)-next'" diff --git a/pipedream/main.go b/pipedream/main.go new file mode 100644 index 0000000..a6fc73d --- /dev/null +++ b/pipedream/main.go @@ -0,0 +1,185 @@ +package main + +import ( + "errors" + "fmt" + "log" + "os" + "strings" + "time" + + "github.com/dustin/go-humanize" + "github.com/meowgorithm/babyenv" + "github.com/meowgorithm/pipedream" + "github.com/muesli/reflow/indent" + "github.com/muesli/reflow/wordwrap" + "github.com/muesli/termenv" + "github.com/spf13/cobra" +) + +const ( + wrapAt = 78 + errorIndent = 4 +) + +var ( + // Version stores the version of the application. It's set during build + // time. + Version = "(unknown; built from source)" + + color = termenv.ColorProfile().Color + check = termenv.String("✔").Foreground(color("78")).String() + ex = termenv.String("✘").Foreground(color("203")).String() + subtle = termenv.Style{}.Foreground(color("240")).Styled + arrow = subtle(">") + + // Flags + endpoint string + region string + bucket string + remotePath string + maxRetries int + maxPartSize int + silent bool + showVersion bool +) + +type config struct { + AccessKey string `env:"ACCESS_KEY,required"` + SecretKey string `env:"SECRET_KEY,required"` + Endpoint string `env:"ENDPOINT" default:"s3.amazonaws.com"` + Region string `env:"REGION" default:"us-east-1"` +} + +var rootCmd = &cobra.Command{ + Use: "INPUT | pipedream [flags]\n pipedream [flags] < INPUT", + Short: "An S3 multipart uploader", + Long: info(), + Args: cobra.NoArgs, + RunE: run, +} + +func init() { + rootCmd.PersistentFlags().StringVarP(&endpoint, "endpoint", "e", "", "the endpoint to upload to (default \"s3.amazonaws.com\")") + rootCmd.PersistentFlags().StringVarP(®ion, "region", "r", "", "the region to use; AWS only (default \"us-east-1\")") + rootCmd.PersistentFlags().StringVarP(&bucket, "bucket", "b", "", "the bucket/space to upload to") + rootCmd.PersistentFlags().StringVarP(&remotePath, "path", "p", "", "the remote path at which we should put the file") + rootCmd.PersistentFlags().IntVarP(&maxRetries, "retries", "t", 3, "the maximum number of times to retry uploading a part") + rootCmd.PersistentFlags().IntVarP(&maxPartSize, "part-size", "m", 5, "the maximum size per part, in megabytes") + rootCmd.PersistentFlags().BoolVarP(&silent, "silent", "s", false, "silence output, except errors") + rootCmd.PersistentFlags().BoolVarP(&showVersion, "version", "v", false, "output version information") +} + +func info() string { + b := strings.Builder{} + b.WriteString(wordwrap.String("A multipart uploader for Amazon S3, DigitalOcean Spaces, and S3-compatible systems.\n\n", wrapAt)) + b.WriteString("Example:\n\n") + b.WriteString(wordwrap.String(" cat dump.rdb | gzip | pipedream -bucket backups -path dump.rdb.gz\n\n", wrapAt)) + b.WriteString(wordwrap.String("ACCESS_KEY and SECRET_KEY must be set in the environment. ENDPOINT and REGION can also be set in the environment, but corresponding flags will take precedence. Also note that if you're using AWS you don't need to set the endpoint. Conversely, if you're using DigitalOcean you don't need to set the region.\n", wrapAt)) + return b.String() +} + +func run(cmd *cobra.Command, args []string) error { + if showVersion { + fmt.Println(Version) + os.Exit(0) + } + + // Get environment + var cfg config + if err := babyenv.Parse(&cfg); err != nil { + return fmt.Errorf("Could not parse config: %v", err) + } + + var missing []string + + // Validate CLI args + if endpoint == "" && cfg.Endpoint != "" { + endpoint = cfg.Endpoint + } else if endpoint == "" { + missing = append(missing, "endpoint") + } + if region == "" && cfg.Region != "" { + region = cfg.Region + } else if region == "" { + missing = append(missing, "region") + } + if bucket == "" { + missing = append(missing, "bucket") + } + if remotePath == "" { + missing = append(missing, "path") + } + if len(missing) > 0 { + return errors.New(fmt.Sprintf("missing %s", pipedream.EnglishJoin(missing, true))) + } + log.Println(endpoint) + log.Println(region) + + // Is stdin a pipe? + info, err := os.Stdin.Stat() + if err != nil { + return err + } + if info.Mode()&os.ModeCharDevice != 0 || info.Size() <= 0 { + return errors.New("input must be through a pipe") + } + + m := pipedream.MultipartUpload{ + AccessKey: cfg.AccessKey, + SecretKey: cfg.SecretKey, + Endpoint: endpoint, + Region: region, + MaxRetries: maxRetries, + MaxPartSize: pipedream.Megabyte * int64(maxPartSize), + Bucket: bucket, + } + + now := time.Now() + + ch := m.Send(os.Stdin, remotePath) + done := make(chan struct{}) + + fmt.Printf("%s Starting upload...\n", arrow) + + go func() { + for { + select { + case e := <-ch: + switch e := e.(type) { + case pipedream.Progress: + if !silent { + bytes := humanize.Bytes(uint64(e.Bytes)) + fmt.Printf("%s Uploaded part #%d %s\n", arrow, e.PartNumber, subtle(bytes)) + } + case pipedream.Retry: + if !silent { + details := fmt.Sprintf("try %d of %d", e.RetryNumber, e.MaxRetries) + fmt.Printf("Retrying part #%d %s\n", e.PartNumber, subtle(details)) + } + case pipedream.Error: + if !silent { + errMsg := strings.Replace(e.Error(), "\n", "", -1) + errMsg = strings.Replace(errMsg, "\t", " ", -1) + errMsg = indent.String(wordwrap.String(errMsg, wrapAt-errorIndent), errorIndent) + fmt.Printf("%s Upload failed:\n\n%s\n\n", ex, errMsg) + } + close(done) + case pipedream.Complete: + if !silent { + fmt.Printf("%s Done. Sent %s in %s.\n", check, humanize.Bytes(uint64(e.Bytes)), time.Since(now).Round(time.Millisecond)) + } + close(done) + } + } + } + }() + + <-done + + return nil +} + +func main() { + rootCmd.Execute() +}