Skip to content

Commit

Permalink
Improve sync error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
GRMrGecko committed Feb 7, 2024
1 parent c1c7ee3 commit 50823ce
Showing 1 changed file with 60 additions and 24 deletions.
84 changes: 60 additions & 24 deletions command/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (s Sync) Run(c *cli.Context) error {
pipeReader, pipeWriter := io.Pipe() // create a reader, writer pipe to pass commands to run

// Create commands in background.
go s.planRun(c, onlySource, onlyDest, commonObjects, dsturl, strategy, pipeWriter, isBatch)
go s.planRun(ctx, c, onlySource, onlyDest, commonObjects, dsturl, strategy, pipeWriter, isBatch)

err = NewRun(c, pipeReader).Run(ctx)
return multierror.Append(err, merrorWaiter).ErrorOrNil()
Expand Down Expand Up @@ -434,6 +434,7 @@ func (s Sync) getSourceAndDestinationObjects(ctx context.Context, cancel context

// planRun prepares the commands and writes them to writer 'w'.
func (s Sync) planRun(
ctx context.Context,
c *cli.Context,
onlySource, onlyDest chan *url.URL,
common chan *ObjectPair,
Expand All @@ -459,36 +460,58 @@ func (s Sync) planRun(
wg.Add(1)
go func() {
defer wg.Done()
for srcurl := range onlySource {
curDestURL := generateDestinationURL(srcurl, dsturl, isBatch)
command, err := generateCommand(c, "cp", defaultFlags, srcurl, curDestURL)
if err != nil {
printDebug(s.op, err, srcurl, curDestURL)
continue
for {
select {
case srcurl := <-onlySource:
if srcurl == nil {
return
}
curDestURL := generateDestinationURL(srcurl, dsturl, isBatch)
command, err := generateCommand(c, "cp", defaultFlags, srcurl, curDestURL)
if err != nil {
printDebug(s.op, err, srcurl, curDestURL)
continue
}
fmt.Fprintln(w, command)
case <-ctx.Done():
// If the context was cancelled, there was an error.
if err := ctx.Err(); err != nil {
return
}
}
fmt.Fprintln(w, command)
}
}()

// both in source and destination
wg.Add(1)
go func() {
defer wg.Done()
for commonObject := range common {
sourceObject, destObject := commonObject.src, commonObject.dst
curSourceURL, curDestURL := sourceObject.URL, destObject.URL
err := strategy.ShouldSync(sourceObject, destObject) // check if object should be copied.
if err != nil {
printDebug(s.op, err, curSourceURL, curDestURL)
continue
}
for {
select {
case commonObject := <-common:
if commonObject == nil {
return
}
sourceObject, destObject := commonObject.src, commonObject.dst
curSourceURL, curDestURL := sourceObject.URL, destObject.URL
err := strategy.ShouldSync(sourceObject, destObject) // check if object should be copied.
if err != nil {
printDebug(s.op, err, curSourceURL, curDestURL)
continue
}

command, err := generateCommand(c, "cp", defaultFlags, curSourceURL, curDestURL)
if err != nil {
printDebug(s.op, err, curSourceURL, curDestURL)
continue
command, err := generateCommand(c, "cp", defaultFlags, curSourceURL, curDestURL)
if err != nil {
printDebug(s.op, err, curSourceURL, curDestURL)
continue
}
fmt.Fprintln(w, command)
case <-ctx.Done():
// If the context was cancelled, there was an error.
if err := ctx.Err(); err != nil {
return
}
}
fmt.Fprintln(w, command)
}
}()

Expand All @@ -501,8 +524,21 @@ func (s Sync) planRun(
// or rewrite generateCommand function?
dstURLs := make([]*url.URL, 0, extsortChunkSize)

for d := range onlyDest {
dstURLs = append(dstURLs, d)
finished := false
for !finished {
select {
case d := <-onlyDest:
if d == nil {
finished = true
break
}
dstURLs = append(dstURLs, d)
case <-ctx.Done():
// If the context was cancelled, there was an error.
if err := ctx.Err(); err != nil {
return
}
}
}

if len(dstURLs) == 0 {
Expand Down Expand Up @@ -576,7 +612,7 @@ func (s Sync) shouldStopSync(err error) bool {
}
if awsErr, ok := err.(awserr.Error); ok {
switch awsErr.Code() {
case "AccessDenied", "NoSuchBucket":
case "AccessDenied", "NoSuchBucket", "RequestError", "SerializationError":
return true
}
}
Expand Down

0 comments on commit 50823ce

Please sign in to comment.