Skip to content

Commit

Permalink
Adding concurrency to s3 operations and stripping absolute path from …
Browse files Browse the repository at this point in the history
…files
  • Loading branch information
nextrevision committed Sep 18, 2015
1 parent 1cedb60 commit 22d0f3a
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 63 deletions.
Empty file added README.md
Empty file.
43 changes: 32 additions & 11 deletions indicies.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package sapt
import (
"bytes"
"compress/gzip"
"os"
"log"
"net/http"
"path/filepath"
"regexp"
"sync"
"text/template"

"github.com/goamz/goamz/s3"
Expand All @@ -27,14 +29,34 @@ type Index struct {
Content []byte
}

// ScanBucketPackages rebuilds the package index for the bucket behind conn:
// it lists the bucket contents, requests the object headers of every package
// key concurrently, converts each header set to PackageMetadata, renders a
// single index from the collected metadata and uploads that index to every
// index path found in the listing.
//
// NOTE(review): this listing interleaves lines of the previous revision
// (RescanBucket, which used the parameter `s`) with their replacements.
func RescanBucket(s *S3) {
func ScanBucketPackages(conn *S3) {
packages := []PackageMetadata{}
contents := s.GetBucketContents()
contents := conn.getBucketContents()
packageList := getBucketPackages(contents)

// wg counts one unit per package; Done is called once per header processed.
var wg sync.WaitGroup
wg.Add(len(packageList))
headerChan := make(chan http.Header, 10)
for _, pkg := range packageList {
m := MetadataFromHeaders(s.GetObjectHeaders(pkg))
packages = append(packages, *m)
// Producer: one goroutine per package key issues the header request.
go func(pkg string) {
headerChan <- conn.getObjectHeaders(pkg)
}(pkg)
}
// Consumers: one goroutine per package drains headerChan.
// NOTE(review): every consumer appends to the shared `packages` slice with
// no mutex, which is a data race once more than one header is in flight.
// NOTE(review): headerChan is not closed anywhere in the visible code, so
// the `!ok` exit is never taken and these goroutines remain blocked on the
// receive after wg.Wait returns.
for i := 0; i < len(packageList); i++ {
go func() {
for {
headers, ok := <-headerChan
if !ok {
break
}
m := MetadataFromHeaders(headers)
packages = append(packages, *m)
wg.Done()
}
}()
}
wg.Wait()

packageIndex := createPackageIndex(packages)

indicies := getIndexPaths(contents)
Expand All @@ -43,15 +65,14 @@ func RescanBucket(s *S3) {
}
// Every discovered index location receives the same rendered content.
for _, index := range indicies {
index.Content = packageIndex
s.UploadPackageIndex(&index)
conn.uploadPackageIndex(&index)
}
}

func getIndexPaths(contents *map[string]s3.Key) []Index {
indicies := []Index{}
pathRe := regexp.MustCompile(`^(.*)/Packages.gz$`)

// TODO: err handle
for key := range *contents {
result := pathRe.FindStringSubmatch(key)
if result != nil {
Expand All @@ -65,7 +86,6 @@ func getIndexPaths(contents *map[string]s3.Key) []Index {
func getBucketPackages(contents *map[string]s3.Key) []string {
packages := []string{}

// TODO: err handle
for key := range *contents {
if filepath.Ext(key) == ".deb" {
packages = append(packages, key)
Expand All @@ -78,9 +98,10 @@ func createPackageIndex(packages []PackageMetadata) []byte {
var buf bytes.Buffer
writer := gzip.NewWriter(&buf)

// TODO: err handles
t, _ := template.New("Package Template").Parse(packagesTemplate)
t.Execute(os.Stdout, packages)
t, err := template.New("Package Template").Parse(packagesTemplate)
if err != nil {
log.Fatal(err)
}
t.Execute(writer, packages)

writer.Close()
Expand Down
13 changes: 9 additions & 4 deletions metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ type PackageMetadata struct {
Vendor string `json:"X-Amz-Meta-Vendor"`
}

func MetadataFromFile(file *os.File) *PackageMetadata {
func MetadataFromFile(path string) *PackageMetadata {
metadata := PackageMetadata{}

path := file.Name()
stat, _ := os.Stat(path)
stat, err := os.Stat(path)
if err != nil {
log.Fatal(err)
}

dpkgOut, err := exec.Command("dpkg-deb", "-f", path).Output()
if err != nil {
Expand Down Expand Up @@ -106,7 +108,10 @@ func MetadataFromHeaders(headers http.Header) *PackageMetadata {
for key, value := range headers {
mapping[key] = value[0]
}
jsonMapping, _ := json.Marshal(mapping)
jsonMapping, err := json.Marshal(mapping)
if err != nil {
log.Fatal(err)
}
json.Unmarshal(jsonMapping, &metadata)
return &metadata
}
Expand Down
88 changes: 60 additions & 28 deletions package.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package sapt

import (
"bytes"
"crypto/md5"
"crypto/sha1"
"crypto/sha256"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"path/filepath"
"strings"
"sync"
"time"
)

type Package struct {
Expand All @@ -17,46 +20,70 @@ type Package struct {
Metadata *PackageMetadata
}

// TODO: refactor...
// UploadPackages uploads Debian packages through conn. When file is a
// directory it is walked and every non-directory entry with a ".deb"
// extension is queued; when file itself has a ".deb" extension it is queued
// alone. Each queued path is turned into a Package in its own goroutine and
// handed to uploader goroutines over a buffered channel; the function returns
// once uploadPackage has returned for every queued package.
//
// NOTE(review): this listing interleaves lines of the previous revision (the
// ones calling conn.UploadPackage and os.Remove directly) with their
// replacements.
// NOTE(review): rm is read only by those previous-revision lines; the
// channel-based path never removes local files.
func UploadPackages(file *os.File, rm bool, conn *S3) {
defer file.Close()
f, _ := file.Stat()
if f.IsDir() {
filepath.Walk(file.Name(), func(p string, f os.FileInfo, err error) error {
// NOTE(review): the Stat error is discarded; stat is used unchecked below.
stat, _ := file.Stat()
path := file.Name()
fileList := []string{}
if stat.IsDir() {
// NOTE(review): the walk callback ignores its err argument; filepath.Walk
// documents that the FileInfo may be nil when err is set, in which case
// f.IsDir() would panic.
filepath.Walk(path, func(p string, f os.FileInfo, err error) error {
if !f.IsDir() && filepath.Ext(p) == ".deb" {
fn, _ := os.Open(p)
defer fn.Close()
conn.UploadPackage(NewPackage(fn))
if rm {
os.Remove(p)
}
fileList = append(fileList, p)
}
return nil
})
} else if filepath.Ext(file.Name()) == ".deb" {
conn.UploadPackage(NewPackage(file))
if rm {
os.Remove(file.Name())
}
} else if filepath.Ext(path) == ".deb" {
fileList = append(fileList, path)
}
// The handle was only needed for Stat and Name; NewPackage is given paths.
file.Close()

// wg counts one unit per queued file; Done is called after each upload.
var wg sync.WaitGroup
wg.Add(len(fileList))
uploadChan := make(chan *Package, 10)
// Producers: one goroutine per file builds the Package (path is passed as
// the base path) and sends it to uploadChan.
for _, f := range fileList {
go func(f string) {
uploadChan <- NewPackage(f, path)
}(f)
}
// Consumers: one goroutine per file receives packages and uploads them.
// NOTE(review): uploadChan is not closed anywhere in the visible code, so
// the `!ok` exit is never taken and these goroutines remain blocked on the
// receive after wg.Wait returns.
for i := 0; i < len(fileList); i++ {
go func() {
for {
pkg, ok := <-uploadChan
if !ok {
break
}
conn.uploadPackage(pkg)
// Pause for 100ms after each upload; the source gives no reason for it.
time.Sleep(time.Millisecond * 100)
wg.Done()
}
}()
}
wg.Wait()
}

// NewPackage reads the file at path and returns a Package holding its
// content, its metadata (from MetadataFromFile, extended with the upload
// name and the MD5, SHA1 and SHA256 digests of the content) and the name it
// is stored under. That name is basename(path) when path equals basePath,
// otherwise path with the first occurrence of basePath removed.
// A read failure is passed to log.Fatal.
//
// NOTE(review): this listing interleaves lines of the previous revision (the
// ones taking an *os.File and copying it into buf) with their replacements.
func NewPackage(file *os.File) *Package {
path := file.Name()
func NewPackage(path string, basePath string) *Package {
var name string
if path == basePath {
name = basename(path)
} else {
// NOTE(review): strings.Replace removes the first occurrence of basePath
// wherever it appears in path, not only at the start.
name = strings.Replace(path, basePath, "", 1)
}

// read in file contents
buf := bytes.NewBuffer(nil)
io.Copy(buf, file)
content, err := ioutil.ReadFile(path)
if err != nil {
log.Fatal(err)
}

// retrieve metadata
metadata := MetadataFromFile(file)
metadata.MD5sum = Hash(buf.Bytes(), "md5")
metadata.SHA1 = Hash(buf.Bytes(), "sha1")
metadata.SHA256 = Hash(buf.Bytes(), "sha256")
metadata := MetadataFromFile(path)
metadata.Filename = name
metadata.MD5sum = Hash(content, "md5")
metadata.SHA1 = Hash(content, "sha1")
metadata.SHA256 = Hash(content, "sha256")

// Path carries the derived upload name rather than the local path.
return &Package{
Path: path,
Content: buf.Bytes(),
Path: name,
Content: content,
Metadata: metadata,
}
}
Expand All @@ -74,3 +101,8 @@ func Hash(content []byte, crypto string) string {

return hash
}

// basename returns the final element of path together with the "/" that
// precedes it; for example "pkgs/foo.deb" yields "/foo.deb". The separator is
// kept in the result so existing callers see the same value as before.
//
// A path that contains no "/" is returned unchanged. Previously such a path
// made strings.LastIndex return -1, and the slice expression path[-1:]
// panicked at run time.
func basename(path string) string {
	i := strings.LastIndex(path, "/")
	if i < 0 {
		// No separator at all: the whole path is already the base name.
		return path
	}
	return path[i:]
}
34 changes: 24 additions & 10 deletions s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@ type S3 struct {
}

func ConnectS3(bucket string, region string, public bool) *S3 {
var acl s3.ACL
var auth aws.Auth
var err error

// set auth
auth, err := aws.EnvAuth()
auth, err = aws.EnvAuth()
if err != nil {
auth, err = aws.SharedAuth()
if err != nil {
Expand All @@ -37,7 +41,7 @@ func ConnectS3(bucket string, region string, public bool) *S3 {
}
awsRegion := aws.Regions[region]

acl := s3.ACL("private")
acl = s3.ACL("private")
if public {
acl = s3.ACL("public-read")
}
Expand All @@ -57,7 +61,13 @@ func ConnectS3(bucket string, region string, public bool) *S3 {
}
}

func (s *S3) UploadPackage(pkg *Package) {
// CreateBucket issues a PutBucket request on s.Bucket using the ACL stored in
// s.ACL. An error returned by that request is passed to log.Fatal, which
// terminates the process.
func (s *S3) CreateBucket() {
	err := s.Bucket.PutBucket(s.ACL)
	if err == nil {
		return
	}
	log.Fatal(err)
}

func (s *S3) uploadPackage(pkg *Package) {
fileType := http.DetectContentType(pkg.Content)

opts := s3.Options{
Expand All @@ -70,7 +80,7 @@ func (s *S3) UploadPackage(pkg *Package) {
fmt.Printf("Uploaded %s\n", pkg.Path)
}

func (s *S3) UploadPackageIndex(index *Index) {
func (s *S3) uploadPackageIndex(index *Index) {
path := fmt.Sprintf("%s/Packages.gz", index.Path)
fileType := http.DetectContentType(index.Content)
opts := s3.Options{}
Expand All @@ -80,15 +90,19 @@ func (s *S3) UploadPackageIndex(index *Index) {
fmt.Printf("Uploaded Package Index %s/Packages.gz\n", index.Path)
}

// getBucketContents returns the result of s.Bucket.GetBucketContents, a
// pointer to a map from string keys to s3.Key values. An error from that
// call is passed to log.Fatal instead of being returned.
//
// NOTE(review): this listing interleaves the previous revision (exported
// GetBucketContents, which discarded the error) with its replacement.
func (s *S3) GetBucketContents() *map[string]s3.Key {
contents, _ := s.Bucket.GetBucketContents()
func (s *S3) getBucketContents() *map[string]s3.Key {
contents, err := s.Bucket.GetBucketContents()
if err != nil {
log.Fatal(err)
}
return contents
}

// getObjectHeaders calls s.Bucket.Head for object with an empty set of
// request headers and returns the Header field of the response. An error
// from Head is passed to log.Fatal instead of being returned.
//
// NOTE(review): this listing interleaves the previous revision (exported
// GetObjectHeaders, which discarded the error) with its replacement.
func (s *S3) GetObjectHeaders(object string) http.Header {
func (s *S3) getObjectHeaders(object string) http.Header {
// No extra request headers are sent with the Head call.
headers := map[string][]string{}
// TODO: err handle
response, _ := s.Bucket.Head(object, headers)

response, err := s.Bucket.Head(object, headers)
if err != nil {
log.Fatal(err)
}
return response.Header
}
20 changes: 10 additions & 10 deletions sapt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,22 @@ import (
)

// Command-line definition for the sapt binary: three sub-commands
// (bootstrap, upload, rescan), each taking a required bucket argument, an
// optional region argument and a "public" flag; upload additionally takes a
// required package root and an "rm" flag.
//
// NOTE(review): this listing interleaves the previous revision, which
// registered the flags on `app`, with replacements that register them on the
// individual sub-command.
// NOTE(review): the region help strings open a parenthesis that is never
// closed.
var (
app = kingpin.New("sapt", "A humble S3 apt manager")
debug = app.Flag("debug", "Enable debug mode.").Bool()
app = kingpin.New("sapt", "A humble S3 apt manager")

// bootstrap sub-command
bootstrap = app.Command("bootstrap", "Bootstraps a new bucket")
bootstrapPublic = app.Flag("public", "Make uploaded packages public").Bool()
bootstrapPublic = bootstrap.Flag("public", "Make uploaded packages public").Bool()
bootstrapBucket = bootstrap.Arg("bucket", "Name of bucket to use").Required().String()
bootstrapRegion = bootstrap.Arg("region", "Region to use (defaults to AWS_REGION then us-east-1").String()

// upload sub-command
upload = app.Command("upload", "Uploads deb packages to S3")
uploadPublic = app.Flag("public", "Make uploaded packages public").Bool()
uploadRm = app.Flag("rm", "Remove local packages after upload").Bool()
uploadPublic = upload.Flag("public", "Make uploaded packages public").Bool()
uploadRm = upload.Flag("rm", "Remove local packages after upload").Bool()
uploadRoot = upload.Arg("package_root", "Root path to packages/directory structure for upload").Required().File()
uploadBucket = upload.Arg("bucket", "Name of bucket to use").Required().String()
uploadRegion = upload.Arg("region", "Region to use (defaults to AWS_REGION then us-east-1").String()

// rescan sub-command
rescan = app.Command("rescan", "Rescan the bucket and generate new indicies")
rescanPublic = app.Flag("public", "Make uploaded packages public").Bool()
rescanPublic = rescan.Flag("public", "Make uploaded packages public").Bool()
rescanBucket = rescan.Arg("bucket", "Name of bucket to use").Required().String()
rescanRegion = rescan.Arg("region", "Region to use (defaults to AWS_REGION then us-east-1").String()
)
Expand All @@ -33,18 +32,19 @@ func main() {
switch kingpin.MustParse(app.Parse(os.Args[1:])) {
// Bootstrap new repo
case bootstrap.FullCommand():
println(*bootstrapBucket)
println(*bootstrapRegion)
s3Conn := sapt.ConnectS3(*bootstrapBucket, *bootstrapRegion, *bootstrapPublic)
s3Conn.CreateBucket()
sapt.ScanBucketPackages(s3Conn)

// Upload packages
case upload.FullCommand():
s3Conn := sapt.ConnectS3(*uploadBucket, *uploadRegion, *uploadPublic)
sapt.UploadPackages(*uploadRoot, *uploadRm, s3Conn)
sapt.RescanBucket(s3Conn)
sapt.ScanBucketPackages(s3Conn)

// Rescan s3 and upload new apt data
case rescan.FullCommand():
s3Conn := sapt.ConnectS3(*rescanBucket, *rescanRegion, *rescanPublic)
sapt.RescanBucket(s3Conn)
sapt.ScanBucketPackages(s3Conn)
}
}

0 comments on commit 22d0f3a

Please sign in to comment.