From c886fd4f9431ee2d4b5ca5047678274a13ed7b48 Mon Sep 17 00:00:00 2001 From: Andrew Gillis Date: Wed, 27 Sep 2023 04:25:15 -0700 Subject: [PATCH] Fix tests that were not doing test correctly (#129) Cleanup some test helper functions. --- dagsync/http_test.go | 3 +- dagsync/subscriber_test.go | 3 +- dagsync/sync_test.go | 216 +++++++++++++++++-------------------- 3 files changed, 102 insertions(+), 120 deletions(-) diff --git a/dagsync/http_test.go b/dagsync/http_test.go index 84a561d..cd35c15 100644 --- a/dagsync/http_test.go +++ b/dagsync/http_test.go @@ -201,6 +201,5 @@ func TestSyncFnHttp(t *testing.T) { } cancelWatcher() - err = assertLatestSyncEquals(te.sub, te.srcHost.ID(), newHead) - require.NoError(t, err) + assertLatestSyncEquals(t, te.sub, te.srcHost.ID(), newHead) } diff --git a/dagsync/subscriber_test.go b/dagsync/subscriber_test.go index 31d6a46..623c531 100644 --- a/dagsync/subscriber_test.go +++ b/dagsync/subscriber_test.go @@ -757,9 +757,10 @@ func TestMaxAsyncSyncs(t *testing.T) { } func waitForSync(t *testing.T, logPrefix string, store *dssync.MutexDatastore, expectedCid cidlink.Link, watcher <-chan dagsync.SyncFinished) { + t.Helper() select { case <-time.After(updateTimeout): - t.Fatal("timed out waiting for sync to propogate") + require.FailNow(t, "timed out waiting for sync to propogate") case downstream := <-watcher: require.Equal(t, expectedCid.Cid, downstream.Cid, "sync'd cid unexpected") _, err := store.Get(context.Background(), datastore.NewKey(downstream.Cid.String())) diff --git a/dagsync/sync_test.go b/dagsync/sync_test.go index 00c7ffa..49f1985 100644 --- a/dagsync/sync_test.go +++ b/dagsync/sync_test.go @@ -2,8 +2,6 @@ package dagsync_test import ( "context" - "errors" - "fmt" "testing" "time" @@ -54,12 +52,9 @@ func TestLatestSyncSuccess(t *testing.T) { // Store the whole chain in source node chainLnks := test.MkChain(srcLnkS, true) - err = newUpdateTest(pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[2], false, chainLnks[2].(cidlink.Link).Cid) - require.NoError(t, err) - err = newUpdateTest(pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[1], false, chainLnks[1].(cidlink.Link).Cid) - require.NoError(t, err) - err = newUpdateTest(pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[0], false, chainLnks[0].(cidlink.Link).Cid) - require.NoError(t, err) + senderAnnounceTest(t, pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[2]) + senderAnnounceTest(t, pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[1]) + senderAnnounceTest(t, pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[0]) } func TestSyncFn(t *testing.T) { @@ -173,8 +168,7 @@ func TestSyncFn(t *testing.T) { require.NoError(t, err, "data not in receiver store") syncncl() - err = assertLatestSyncEquals(sub, srcHost.ID(), newHead) - require.NoError(t, err) + assertLatestSyncEquals(t, sub, srcHost.ID(), newHead) } func TestPartialSync(t *testing.T) { @@ -218,8 +212,7 @@ func TestPartialSync(t *testing.T) { defer cncl() // Fetching first few nodes. - err = newUpdateTest(pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[2], false, chainLnks[2].(cidlink.Link).Cid) - require.NoError(t, err) + senderAnnounceTest(t, pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[2]) // Check that first nodes hadn't been synced _, err = dstStore.Get(context.Background(), datastore.NewKey(chainLnks[3].(cidlink.Link).Cid.String())) @@ -228,12 +221,10 @@ func TestPartialSync(t *testing.T) { // Set latest sync so we pass through one of the links err = sub.SetLatestSync(srcHost.ID(), chainLnks[1].(cidlink.Link).Cid) require.NoError(t, err) - err = assertLatestSyncEquals(sub, srcHost.ID(), chainLnks[1].(cidlink.Link).Cid) - require.NoError(t, err) + assertLatestSyncEquals(t, sub, srcHost.ID(), chainLnks[1].(cidlink.Link).Cid) // Update all the chain from scratch again. - err = newUpdateTest(pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[0], false, chainLnks[0].(cidlink.Link).Cid) - require.NoError(t, err) + senderAnnounceTest(t, pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[0]) // Check if the node we pass through was retrieved _, err = dstStore.Get(context.Background(), datastore.NewKey(chainLnks[1].(cidlink.Link).Cid.String())) @@ -271,15 +262,12 @@ func TestStepByStepSync(t *testing.T) { // Store the whole chain in source node chainLnks := test.MkChain(srcLnkS, true) - // Store half of the chain already in destination - // to simulate the partial sync. - test.MkChain(dstLnkS, true) + // Store half of the chain in destination to simulate the partial sync. + test.MkChain(dstLnkS, false) // Sync the rest of the chain - err = newUpdateTest(pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[1], false, chainLnks[1].(cidlink.Link).Cid) - require.NoError(t, err) - err = newUpdateTest(pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[0], false, chainLnks[0].(cidlink.Link).Cid) - require.NoError(t, err) + senderAnnounceTest(t, pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[1]) + senderAnnounceTest(t, pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[0]) } func TestLatestSyncFailure(t *testing.T) { @@ -310,30 +298,68 @@ func TestLatestSyncFailure(t *testing.T) { err = srcHost.Connect(context.Background(), dstHost.Peerstore().PeerInfo(dstHost.ID())) require.NoError(t, err) - err = sub.SetLatestSync(srcHost.ID(), chainLnks[3].(cidlink.Link).Cid) - require.NoError(t, err) - watcher, cncl := sub.OnSyncFinished() + defer cncl() t.Log("Testing sync fail when the other end does not have the data") - err = newUpdateTest(pub, nil, sub, dstStore, watcher, srcHost.ID(), cidlink.Link{Cid: cid.Undef}, true, chainLnks[3].(cidlink.Link).Cid) - cncl() + pubInfo := peer.AddrInfo{ + ID: pub.ID(), + Addrs: pub.Addrs(), + } + // Announce bad CID. + badCid := test.RandomCids(1)[0] + err = sub.Announce(context.Background(), badCid, pubInfo) require.NoError(t, err) + // Check for fetch failure. + select { + case <-time.After(3 * time.Second): + t.Fatal("timeout waiting for sync finished event") + case event, open := <-watcher: + require.True(t, open) + require.Equal(t, badCid, event.Cid) + require.Error(t, event.Err) + require.ErrorContains(t, event.Err, "non success http fetch response") + } + + cncl() sub.Close() dstStore = dssync.MutexWrap(datastore.NewMapDatastore()) - sub2, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.StrictAdsSelector(false)) + sub2, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, + dagsync.RecvAnnounce(), dagsync.StrictAdsSelector(false)) require.NoError(t, err) defer sub2.Close() - err = sub2.SetLatestSync(srcHost.ID(), chainLnks[3].(cidlink.Link).Cid) + t.Log("Testing hitting stop node works correctly") + + err = sub2.SetLatestSync(srcHost.ID(), chainLnks[2].(cidlink.Link).Cid) require.NoError(t, err) watcher, cncl = sub2.OnSyncFinished() + defer cncl() - t.Log("Testing sync fail when not able to run the full exchange") - err = newUpdateTest(pub, nil, sub2, dstStore, watcher, srcHost.ID(), chainLnks[2], true, chainLnks[3].(cidlink.Link).Cid) - cncl() + rootCid := chainLnks[0].(cidlink.Link).Cid + pub.SetRoot(rootCid) + err = sub2.Announce(context.Background(), rootCid, pubInfo) + require.NoError(t, err) + + select { + case <-time.After(3 * time.Second): + t.Fatal("timeout waiting for sync finished event") + case event, open := <-watcher: + require.True(t, open) + require.Equal(t, rootCid, event.Cid) + require.Equal(t, 3, event.Count) + } + + t.Log("Testing no update when node to sync is stop node") + err = sub2.Announce(context.Background(), rootCid, pubInfo) require.NoError(t, err) + + select { + case <-time.After(time.Second): + case <-watcher: + require.FailNow(t, "should not have received a sync finished event") + } } func TestSyncOnAnnounceDataTransfer(t *testing.T) { @@ -368,12 +394,9 @@ func TestSyncOnAnnounceDataTransfer(t *testing.T) { ID: pub.ID(), Addrs: pub.Addrs(), } - err = newAnnounceTest(pub, sub, dstStore, watcher, pubInfo, chainLnks[2], chainLnks[2].(cidlink.Link).Cid) - require.NoError(t, err) - err = newAnnounceTest(pub, sub, dstStore, watcher, pubInfo, chainLnks[1], chainLnks[1].(cidlink.Link).Cid) - require.NoError(t, err) - err = newAnnounceTest(pub, sub, dstStore, watcher, pubInfo, chainLnks[0], chainLnks[0].(cidlink.Link).Cid) - require.NoError(t, err) + announceTest(t, pub, sub, dstStore, watcher, pubInfo, chainLnks[2]) + announceTest(t, pub, sub, dstStore, watcher, pubInfo, chainLnks[1]) + announceTest(t, pub, sub, dstStore, watcher, pubInfo, chainLnks[0]) } func TestSyncOnAnnounceIPNI(t *testing.T) { @@ -407,12 +430,9 @@ func TestSyncOnAnnounceIPNI(t *testing.T) { ID: pub.ID(), Addrs: pub.Addrs(), } - err = newAnnounceTest(pub, sub, dstStore, watcher, pubInfo, chainLnks[2], chainLnks[2].(cidlink.Link).Cid) - require.NoError(t, err) - err = newAnnounceTest(pub, sub, dstStore, watcher, pubInfo, chainLnks[1], chainLnks[1].(cidlink.Link).Cid) - require.NoError(t, err) - err = newAnnounceTest(pub, sub, dstStore, watcher, pubInfo, chainLnks[0], chainLnks[0].(cidlink.Link).Cid) - require.NoError(t, err) + announceTest(t, pub, sub, dstStore, watcher, pubInfo, chainLnks[2]) + announceTest(t, pub, sub, dstStore, watcher, pubInfo, chainLnks[1]) + announceTest(t, pub, sub, dstStore, watcher, pubInfo, chainLnks[0]) } func TestSyncOnAnnounceHTTP(t *testing.T) { @@ -443,12 +463,9 @@ func TestSyncOnAnnounceHTTP(t *testing.T) { ID: pub.ID(), Addrs: pub.Addrs(), } - err = newAnnounceTest(pub, sub, dstStore, watcher, pubInfo, chainLnks[2], chainLnks[2].(cidlink.Link).Cid) - require.NoError(t, err) - err = newAnnounceTest(pub, sub, dstStore, watcher, pubInfo, chainLnks[1], chainLnks[1].(cidlink.Link).Cid) - require.NoError(t, err) - err = newAnnounceTest(pub, sub, dstStore, watcher, pubInfo, chainLnks[0], chainLnks[0].(cidlink.Link).Cid) - require.NoError(t, err) + announceTest(t, pub, sub, dstStore, watcher, pubInfo, chainLnks[2]) + announceTest(t, pub, sub, dstStore, watcher, pubInfo, chainLnks[1]) + announceTest(t, pub, sub, dstStore, watcher, pubInfo, chainLnks[0]) } func TestCancelDeadlock(t *testing.T) { @@ -514,86 +531,51 @@ func TestCancelDeadlock(t *testing.T) { } } -func newAnnounceTest(pub dagsync.Publisher, sub *dagsync.Subscriber, dstStore datastore.Batching, watcher <-chan dagsync.SyncFinished, peerInfo peer.AddrInfo, lnk ipld.Link, expectedSync cid.Cid) error { - var err error +func announceTest(t *testing.T, pub dagsync.Publisher, sub *dagsync.Subscriber, dstStore datastore.Batching, watcher <-chan dagsync.SyncFinished, peerInfo peer.AddrInfo, lnk ipld.Link) { + t.Helper() c := lnk.(cidlink.Link).Cid if c != cid.Undef { pub.SetRoot(c) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + err := sub.Announce(ctx, c, peerInfo) + require.NoError(t, err) } - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - err = sub.Announce(ctx, c, peerInfo) - if err != nil { - return err - } - - select { - case <-time.After(updateTimeout): - return errors.New("timed out waiting for sync to propagate") - case downstream, open := <-watcher: - if !open { - return errors.New("event channel closed without receiving event") - } - if !downstream.Cid.Equals(c) { - return fmt.Errorf("sync returned unexpected cid %s, expected %s", downstream.Cid, c) - } - if _, err = dstStore.Get(context.Background(), datastore.NewKey(downstream.Cid.String())); err != nil { - return fmt.Errorf("data not in receiver store: %s", err) - } - } - - return assertLatestSyncEquals(sub, peerInfo.ID, expectedSync) + waitLatestSync(t, dstStore, watcher, c) + assertLatestSyncEquals(t, sub, peerInfo.ID, c) } -func newUpdateTest(pub dagsync.Publisher, sender announce.Sender, sub *dagsync.Subscriber, dstStore datastore.Batching, watcher <-chan dagsync.SyncFinished, peerID peer.ID, lnk ipld.Link, withFailure bool, expectedSync cid.Cid) error { - var err error +func senderAnnounceTest(t *testing.T, pub dagsync.Publisher, sender announce.Sender, sub *dagsync.Subscriber, dstStore datastore.Batching, watcher <-chan dagsync.SyncFinished, peerID peer.ID, lnk ipld.Link) { + t.Helper() c := lnk.(cidlink.Link).Cid if c != cid.Undef { pub.SetRoot(c) - err = announce.Send(context.Background(), c, pub.Addrs(), sender) - if err != nil { - return err - } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + err := announce.Send(ctx, c, pub.Addrs(), sender) + require.NoError(t, err) } + waitLatestSync(t, dstStore, watcher, c) + assertLatestSyncEquals(t, sub, peerID, c) +} - // If failure. then latestSync should not be updated. - if withFailure { - select { - case <-time.After(3 * time.Second): - case changeEvent, open := <-watcher: - if !open { - return nil - } - return fmt.Errorf("no exchange should have been performed, but got change from peer %s for cid %s", changeEvent.PeerID, changeEvent.Cid) - } - } else { - select { - case <-time.After(updateTimeout): - return errors.New("timed out waiting for sync to propagate") - case downstream, open := <-watcher: - if !open { - return errors.New("event channle closed without receiving event") - } - if !downstream.Cid.Equals(c) { - return fmt.Errorf("sync returned unexpected cid %s, expected %s", downstream.Cid, c) - } - if _, err = dstStore.Get(context.Background(), datastore.NewKey(downstream.Cid.String())); err != nil { - return fmt.Errorf("data not in receiver store: %s", err) - } - } +func waitLatestSync(t *testing.T, dstStore datastore.Batching, watcher <-chan dagsync.SyncFinished, expectedSync cid.Cid) { + t.Helper() + select { + case <-time.After(updateTimeout): + require.FailNow(t, "timed out waiting for sync to propagate") + case downstream, open := <-watcher: + require.True(t, open, "event channel closed without receiving event") + require.Equal(t, expectedSync, downstream.Cid, "sync returned unexpected cid") + _, err := dstStore.Get(context.Background(), datastore.NewKey(downstream.Cid.String())) + require.NoError(t, err, "data not in receiver store") } - return assertLatestSyncEquals(sub, peerID, expectedSync) } -func assertLatestSyncEquals(sub *dagsync.Subscriber, peerID peer.ID, want cid.Cid) error { +func assertLatestSyncEquals(t *testing.T, sub *dagsync.Subscriber, peerID peer.ID, want cid.Cid) { + t.Helper() latest := sub.GetLatestSync(peerID) - if latest == nil { - return errors.New("latest sync is nil") - } + require.NotNil(t, latest, "latest sync is nil") got := latest.(cidlink.Link) - if got.Cid != want { - return fmt.Errorf("latestSync not updated correctly, got %s want %s", got, want) - } - return nil + require.Equal(t, want, got.Cid, "latestSync not updated correctly") }