From f40c032e4d0b87d543d934319a6d8b21421036f0 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Wed, 27 Nov 2024 19:58:54 +0000 Subject: [PATCH] Fixed s3 assist test Signed-off-by: Jimmy Moore --- .../migrator/migrator_s3_assisted_test.go | 45 +++++++++---------- pkg/storage/protocol/from_protocol.go | 3 +- 2 files changed, 23 insertions(+), 25 deletions(-) diff --git a/pkg/storage/migrator/migrator_s3_assisted_test.go b/pkg/storage/migrator/migrator_s3_assisted_test.go index 0dd4731..0515c94 100644 --- a/pkg/storage/migrator/migrator_s3_assisted_test.go +++ b/pkg/storage/migrator/migrator_s3_assisted_test.go @@ -6,7 +6,6 @@ import ( "fmt" "io" "os" - "sync" "testing" "time" @@ -142,9 +141,6 @@ func TestMigratorS3Assisted(t *testing.T) { r1, w1 := io.Pipe() r2, w2 := io.Pipe() - var wgComplete sync.WaitGroup - - wgComplete.Add(1) initDev := func(ctx context.Context, p protocol.Protocol, dev uint32) { destStorageFactory := func(_ *packets.DevInfo) storage.Provider { return provDest @@ -162,11 +158,7 @@ func TestMigratorS3Assisted(t *testing.T) { _ = destFrom.HandleDevInfo() }() go func() { - _ = destFrom.HandleEvent(func(p *packets.Event) { - if p.Type == packets.EventCompleted { - wgComplete.Done() - } - }) + _ = destFrom.HandleEvent(func(p *packets.Event) {}) }() } @@ -205,12 +197,19 @@ func TestMigratorS3Assisted(t *testing.T) { destination.SendEvent(&packets.Event{Type: packets.EventCompleted}) - // Wait for a completion event, which will include the sync / grab from S3. - wgComplete.Wait() - // This will GRAB the data in alternateSources from S3, and return when it's done. // storage.SendSiloEvent(provDest, "sync.start", device.SyncStartConfig{AlternateSources: destFrom.GetAlternateSources(), Destination: provDest}) + // WAIT for the sync to be running + for { + syncRunning := storage.SendSiloEvent(provDest, "sync.running", nil)[0].(bool) + if syncRunning { + break + } else { + time.Sleep(100 * time.Millisecond) + } + } + // This will end with migration completed, and consumer Locked. eq, err := storage.Equals(provSrc, provDest, blockSize) assert.NoError(t, err) @@ -281,10 +280,6 @@ func TestMigratorS3AssistedChangeSource(t *testing.T) { r1, w1 := io.Pipe() r2, w2 := io.Pipe() - var wgComplete sync.WaitGroup - - wgComplete.Add(1) - initDev := func(ctx context.Context, p protocol.Protocol, dev uint32) { destStorageFactory := func(_ *packets.DevInfo) storage.Provider { return provDest @@ -302,11 +297,7 @@ func TestMigratorS3AssistedChangeSource(t *testing.T) { _ = destFrom.HandleDevInfo() }() go func() { - _ = destFrom.HandleEvent(func(p *packets.Event) { - if p.Type == packets.EventCompleted { - wgComplete.Done() - } - }) + _ = destFrom.HandleEvent(func(p *packets.Event) {}) }() } @@ -358,11 +349,19 @@ func TestMigratorS3AssistedChangeSource(t *testing.T) { destination.SendEvent(&packets.Event{Type: packets.EventCompleted}) - wgComplete.Wait() // Wait for the sync/grab from S3 - // This will GRAB the data in alternateSources from S3, and return when it's done. // storage.SendSiloEvent(provDest, "sync.start", device.SyncStartConfig{AlternateSources: destFrom.GetAlternateSources(), Destination: provDest}) + // WAIT for the sync to be running + for { + syncRunning := storage.SendSiloEvent(provDest, "sync.running", nil)[0].(bool) + if syncRunning { + break + } else { + time.Sleep(100 * time.Millisecond) + } + } + // This will end with migration completed, and consumer Locked. eq, err := storage.Equals(provSrc, provDest, blockSize) assert.NoError(t, err) diff --git a/pkg/storage/protocol/from_protocol.go b/pkg/storage/protocol/from_protocol.go index 9c05584..ee9988a 100644 --- a/pkg/storage/protocol/from_protocol.go +++ b/pkg/storage/protocol/from_protocol.go @@ -168,11 +168,10 @@ func (fp *FromProtocol) getAltSourcesStartSync() { as = append(as, fp.alternateSources...) fp.alternateSourcesLock.Unlock() - // If we got a dirty WriteAt, then there's wasted work here, but it won't overwrite since we're using + // If we got a dirty WriteAt or a localWrite (DontNeedAt sent), then there's wasted work here, but it won't overwrite since we're using // WriteCombinator now. // Deal with the sync here... We don't wait... - go storage.SendSiloEvent(fp.prov, "sync.start", storage.SyncStartConfig{ AlternateSources: as, Destination: fp.provAltSources,