Skip to content

Commit

Permalink
Fixed s3 assist test
Browse files Browse the repository at this point in the history
Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>
  • Loading branch information
jimmyaxod committed Nov 27, 2024
1 parent fd254ec commit f40c032
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 25 deletions.
45 changes: 22 additions & 23 deletions pkg/storage/migrator/migrator_s3_assisted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"io"
"os"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -142,9 +141,6 @@ func TestMigratorS3Assisted(t *testing.T) {
r1, w1 := io.Pipe()
r2, w2 := io.Pipe()

var wgComplete sync.WaitGroup

wgComplete.Add(1)
initDev := func(ctx context.Context, p protocol.Protocol, dev uint32) {
destStorageFactory := func(_ *packets.DevInfo) storage.Provider {
return provDest
Expand All @@ -162,11 +158,7 @@ func TestMigratorS3Assisted(t *testing.T) {
_ = destFrom.HandleDevInfo()
}()
go func() {
_ = destFrom.HandleEvent(func(p *packets.Event) {
if p.Type == packets.EventCompleted {
wgComplete.Done()
}
})
_ = destFrom.HandleEvent(func(p *packets.Event) {})

Check failure on line 161 in pkg/storage/migrator/migrator_s3_assisted_test.go

View workflow job for this annotation

GitHub Actions / golang

unused-parameter: parameter 'p' seems to be unused, consider removing or renaming it as _ (revive)
}()
}

Expand Down Expand Up @@ -205,12 +197,19 @@ func TestMigratorS3Assisted(t *testing.T) {

destination.SendEvent(&packets.Event{Type: packets.EventCompleted})

// Wait for a completion event, which will include the sync / grab from S3.
wgComplete.Wait()

// This will GRAB the data in alternateSources from S3, and return when it's done.
// storage.SendSiloEvent(provDest, "sync.start", device.SyncStartConfig{AlternateSources: destFrom.GetAlternateSources(), Destination: provDest})

// WAIT for the sync to be running
for {
syncRunning := storage.SendSiloEvent(provDest, "sync.running", nil)[0].(bool)
if syncRunning {
break
} else {

Check failure on line 208 in pkg/storage/migrator/migrator_s3_assisted_test.go

View workflow job for this annotation

GitHub Actions / golang

superfluous-else: if block ends with a break statement, so drop this else and outdent its block (revive)
time.Sleep(100 * time.Millisecond)
}
}

// This will end with migration completed, and consumer Locked.
eq, err := storage.Equals(provSrc, provDest, blockSize)
assert.NoError(t, err)
Expand Down Expand Up @@ -281,10 +280,6 @@ func TestMigratorS3AssistedChangeSource(t *testing.T) {
r1, w1 := io.Pipe()
r2, w2 := io.Pipe()

var wgComplete sync.WaitGroup

wgComplete.Add(1)

initDev := func(ctx context.Context, p protocol.Protocol, dev uint32) {
destStorageFactory := func(_ *packets.DevInfo) storage.Provider {
return provDest
Expand All @@ -302,11 +297,7 @@ func TestMigratorS3AssistedChangeSource(t *testing.T) {
_ = destFrom.HandleDevInfo()
}()
go func() {
_ = destFrom.HandleEvent(func(p *packets.Event) {
if p.Type == packets.EventCompleted {
wgComplete.Done()
}
})
_ = destFrom.HandleEvent(func(p *packets.Event) {})

Check failure on line 300 in pkg/storage/migrator/migrator_s3_assisted_test.go

View workflow job for this annotation

GitHub Actions / golang

unused-parameter: parameter 'p' seems to be unused, consider removing or renaming it as _ (revive)
}()
}

Expand Down Expand Up @@ -358,11 +349,19 @@ func TestMigratorS3AssistedChangeSource(t *testing.T) {

destination.SendEvent(&packets.Event{Type: packets.EventCompleted})

wgComplete.Wait() // Wait for the sync/grab from S3

// This will GRAB the data in alternateSources from S3, and return when it's done.
// storage.SendSiloEvent(provDest, "sync.start", device.SyncStartConfig{AlternateSources: destFrom.GetAlternateSources(), Destination: provDest})

// WAIT for the sync to be running
for {
syncRunning := storage.SendSiloEvent(provDest, "sync.running", nil)[0].(bool)
if syncRunning {
break
} else {

Check failure on line 360 in pkg/storage/migrator/migrator_s3_assisted_test.go

View workflow job for this annotation

GitHub Actions / golang

superfluous-else: if block ends with a break statement, so drop this else and outdent its block (revive)
time.Sleep(100 * time.Millisecond)
}
}

// This will end with migration completed, and consumer Locked.
eq, err := storage.Equals(provSrc, provDest, blockSize)
assert.NoError(t, err)
Expand Down
3 changes: 1 addition & 2 deletions pkg/storage/protocol/from_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,10 @@ func (fp *FromProtocol) getAltSourcesStartSync() {
as = append(as, fp.alternateSources...)
fp.alternateSourcesLock.Unlock()

// If we got a dirty WriteAt, then there's wasted work here, but it won't overwrite since we're using
// If we got a dirty WriteAt or a localWrite (DontNeedAt sent), then there's wasted work here, but it won't overwrite since we're using
// WriteCombinator now.

// Deal with the sync here... We don't wait...

go storage.SendSiloEvent(fp.prov, "sync.start", storage.SyncStartConfig{
AlternateSources: as,
Destination: fp.provAltSources,
Expand Down

0 comments on commit f40c032

Please sign in to comment.