Skip to content

Commit

Permalink
Fix tests that were not doing test correctly (#129)
Browse files Browse the repository at this point in the history
Cleanup some test helper functions.
  • Loading branch information
gammazero authored Sep 27, 2023
1 parent f686b6d commit c886fd4
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 120 deletions.
3 changes: 1 addition & 2 deletions dagsync/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,5 @@ func TestSyncFnHttp(t *testing.T) {
}
cancelWatcher()

err = assertLatestSyncEquals(te.sub, te.srcHost.ID(), newHead)
require.NoError(t, err)
assertLatestSyncEquals(t, te.sub, te.srcHost.ID(), newHead)
}
3 changes: 2 additions & 1 deletion dagsync/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,9 +757,10 @@ func TestMaxAsyncSyncs(t *testing.T) {
}

func waitForSync(t *testing.T, logPrefix string, store *dssync.MutexDatastore, expectedCid cidlink.Link, watcher <-chan dagsync.SyncFinished) {
t.Helper()
select {
case <-time.After(updateTimeout):
t.Fatal("timed out waiting for sync to propogate")
require.FailNow(t, "timed out waiting for sync to propogate")
case downstream := <-watcher:
require.Equal(t, expectedCid.Cid, downstream.Cid, "sync'd cid unexpected")
_, err := store.Get(context.Background(), datastore.NewKey(downstream.Cid.String()))
Expand Down
216 changes: 99 additions & 117 deletions dagsync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package dagsync_test

import (
"context"
"errors"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -54,12 +52,9 @@ func TestLatestSyncSuccess(t *testing.T) {
// Store the whole chain in source node
chainLnks := test.MkChain(srcLnkS, true)

err = newUpdateTest(pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[2], false, chainLnks[2].(cidlink.Link).Cid)
require.NoError(t, err)
err = newUpdateTest(pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[1], false, chainLnks[1].(cidlink.Link).Cid)
require.NoError(t, err)
err = newUpdateTest(pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[0], false, chainLnks[0].(cidlink.Link).Cid)
require.NoError(t, err)
senderAnnounceTest(t, pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[2])
senderAnnounceTest(t, pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[1])
senderAnnounceTest(t, pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[0])
}

func TestSyncFn(t *testing.T) {
Expand Down Expand Up @@ -173,8 +168,7 @@ func TestSyncFn(t *testing.T) {
require.NoError(t, err, "data not in receiver store")
syncncl()

err = assertLatestSyncEquals(sub, srcHost.ID(), newHead)
require.NoError(t, err)
assertLatestSyncEquals(t, sub, srcHost.ID(), newHead)
}

func TestPartialSync(t *testing.T) {
Expand Down Expand Up @@ -218,8 +212,7 @@ func TestPartialSync(t *testing.T) {
defer cncl()

// Fetching first few nodes.
err = newUpdateTest(pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[2], false, chainLnks[2].(cidlink.Link).Cid)
require.NoError(t, err)
senderAnnounceTest(t, pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[2])

// Check that first nodes hadn't been synced
_, err = dstStore.Get(context.Background(), datastore.NewKey(chainLnks[3].(cidlink.Link).Cid.String()))
Expand All @@ -228,12 +221,10 @@ func TestPartialSync(t *testing.T) {
// Set latest sync so we pass through one of the links
err = sub.SetLatestSync(srcHost.ID(), chainLnks[1].(cidlink.Link).Cid)
require.NoError(t, err)
err = assertLatestSyncEquals(sub, srcHost.ID(), chainLnks[1].(cidlink.Link).Cid)
require.NoError(t, err)
assertLatestSyncEquals(t, sub, srcHost.ID(), chainLnks[1].(cidlink.Link).Cid)

// Update all the chain from scratch again.
err = newUpdateTest(pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[0], false, chainLnks[0].(cidlink.Link).Cid)
require.NoError(t, err)
senderAnnounceTest(t, pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[0])

// Check if the node we pass through was retrieved
_, err = dstStore.Get(context.Background(), datastore.NewKey(chainLnks[1].(cidlink.Link).Cid.String()))
Expand Down Expand Up @@ -271,15 +262,12 @@ func TestStepByStepSync(t *testing.T) {
// Store the whole chain in source node
chainLnks := test.MkChain(srcLnkS, true)

// Store half of the chain already in destination
// to simulate the partial sync.
test.MkChain(dstLnkS, true)
// Store half of the chain in destination to simulate the partial sync.
test.MkChain(dstLnkS, false)

// Sync the rest of the chain
err = newUpdateTest(pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[1], false, chainLnks[1].(cidlink.Link).Cid)
require.NoError(t, err)
err = newUpdateTest(pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[0], false, chainLnks[0].(cidlink.Link).Cid)
require.NoError(t, err)
senderAnnounceTest(t, pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[1])
senderAnnounceTest(t, pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[0])
}

func TestLatestSyncFailure(t *testing.T) {
Expand Down Expand Up @@ -310,30 +298,68 @@ func TestLatestSyncFailure(t *testing.T) {
err = srcHost.Connect(context.Background(), dstHost.Peerstore().PeerInfo(dstHost.ID()))
require.NoError(t, err)

err = sub.SetLatestSync(srcHost.ID(), chainLnks[3].(cidlink.Link).Cid)
require.NoError(t, err)

watcher, cncl := sub.OnSyncFinished()
defer cncl()

t.Log("Testing sync fail when the other end does not have the data")
err = newUpdateTest(pub, nil, sub, dstStore, watcher, srcHost.ID(), cidlink.Link{Cid: cid.Undef}, true, chainLnks[3].(cidlink.Link).Cid)
cncl()
pubInfo := peer.AddrInfo{
ID: pub.ID(),
Addrs: pub.Addrs(),
}
// Announce bad CID.
badCid := test.RandomCids(1)[0]
err = sub.Announce(context.Background(), badCid, pubInfo)
require.NoError(t, err)
// Check for fetch failure.
select {
case <-time.After(3 * time.Second):
t.Fatal("timeout waiting for sync finished event")
case event, open := <-watcher:
require.True(t, open)
require.Equal(t, badCid, event.Cid)
require.Error(t, event.Err)
require.ErrorContains(t, event.Err, "non success http fetch response")
}

cncl()
sub.Close()

dstStore = dssync.MutexWrap(datastore.NewMapDatastore())
sub2, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.StrictAdsSelector(false))
sub2, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic,
dagsync.RecvAnnounce(), dagsync.StrictAdsSelector(false))
require.NoError(t, err)
defer sub2.Close()

err = sub2.SetLatestSync(srcHost.ID(), chainLnks[3].(cidlink.Link).Cid)
t.Log("Testing hitting stop node works correctly")

err = sub2.SetLatestSync(srcHost.ID(), chainLnks[2].(cidlink.Link).Cid)
require.NoError(t, err)
watcher, cncl = sub2.OnSyncFinished()
defer cncl()

t.Log("Testing sync fail when not able to run the full exchange")
err = newUpdateTest(pub, nil, sub2, dstStore, watcher, srcHost.ID(), chainLnks[2], true, chainLnks[3].(cidlink.Link).Cid)
cncl()
rootCid := chainLnks[0].(cidlink.Link).Cid
pub.SetRoot(rootCid)
err = sub2.Announce(context.Background(), rootCid, pubInfo)
require.NoError(t, err)

select {
case <-time.After(3 * time.Second):
t.Fatal("timeout waiting for sync finished event")
case event, open := <-watcher:
require.True(t, open)
require.Equal(t, rootCid, event.Cid)
require.Equal(t, 3, event.Count)
}

t.Log("Testing no update when node to sync is stop node")
err = sub2.Announce(context.Background(), rootCid, pubInfo)
require.NoError(t, err)

select {
case <-time.After(time.Second):
case <-watcher:
require.FailNow(t, "should not have received a sync finished event")
}
}

func TestSyncOnAnnounceDataTransfer(t *testing.T) {
Expand Down Expand Up @@ -368,12 +394,9 @@ func TestSyncOnAnnounceDataTransfer(t *testing.T) {
ID: pub.ID(),
Addrs: pub.Addrs(),
}
err = newAnnounceTest(pub, sub, dstStore, watcher, pubInfo, chainLnks[2], chainLnks[2].(cidlink.Link).Cid)
require.NoError(t, err)
err = newAnnounceTest(pub, sub, dstStore, watcher, pubInfo, chainLnks[1], chainLnks[1].(cidlink.Link).Cid)
require.NoError(t, err)
err = newAnnounceTest(pub, sub, dstStore, watcher, pubInfo, chainLnks[0], chainLnks[0].(cidlink.Link).Cid)
require.NoError(t, err)
announceTest(t, pub, sub, dstStore, watcher, pubInfo, chainLnks[2])
announceTest(t, pub, sub, dstStore, watcher, pubInfo, chainLnks[1])
announceTest(t, pub, sub, dstStore, watcher, pubInfo, chainLnks[0])
}

func TestSyncOnAnnounceIPNI(t *testing.T) {
Expand Down Expand Up @@ -407,12 +430,9 @@ func TestSyncOnAnnounceIPNI(t *testing.T) {
ID: pub.ID(),
Addrs: pub.Addrs(),
}
err = newAnnounceTest(pub, sub, dstStore, watcher, pubInfo, chainLnks[2], chainLnks[2].(cidlink.Link).Cid)
require.NoError(t, err)
err = newAnnounceTest(pub, sub, dstStore, watcher, pubInfo, chainLnks[1], chainLnks[1].(cidlink.Link).Cid)
require.NoError(t, err)
err = newAnnounceTest(pub, sub, dstStore, watcher, pubInfo, chainLnks[0], chainLnks[0].(cidlink.Link).Cid)
require.NoError(t, err)
announceTest(t, pub, sub, dstStore, watcher, pubInfo, chainLnks[2])
announceTest(t, pub, sub, dstStore, watcher, pubInfo, chainLnks[1])
announceTest(t, pub, sub, dstStore, watcher, pubInfo, chainLnks[0])
}

func TestSyncOnAnnounceHTTP(t *testing.T) {
Expand Down Expand Up @@ -443,12 +463,9 @@ func TestSyncOnAnnounceHTTP(t *testing.T) {
ID: pub.ID(),
Addrs: pub.Addrs(),
}
err = newAnnounceTest(pub, sub, dstStore, watcher, pubInfo, chainLnks[2], chainLnks[2].(cidlink.Link).Cid)
require.NoError(t, err)
err = newAnnounceTest(pub, sub, dstStore, watcher, pubInfo, chainLnks[1], chainLnks[1].(cidlink.Link).Cid)
require.NoError(t, err)
err = newAnnounceTest(pub, sub, dstStore, watcher, pubInfo, chainLnks[0], chainLnks[0].(cidlink.Link).Cid)
require.NoError(t, err)
announceTest(t, pub, sub, dstStore, watcher, pubInfo, chainLnks[2])
announceTest(t, pub, sub, dstStore, watcher, pubInfo, chainLnks[1])
announceTest(t, pub, sub, dstStore, watcher, pubInfo, chainLnks[0])
}

func TestCancelDeadlock(t *testing.T) {
Expand Down Expand Up @@ -514,86 +531,51 @@ func TestCancelDeadlock(t *testing.T) {
}
}

func newAnnounceTest(pub dagsync.Publisher, sub *dagsync.Subscriber, dstStore datastore.Batching, watcher <-chan dagsync.SyncFinished, peerInfo peer.AddrInfo, lnk ipld.Link, expectedSync cid.Cid) error {
var err error
func announceTest(t *testing.T, pub dagsync.Publisher, sub *dagsync.Subscriber, dstStore datastore.Batching, watcher <-chan dagsync.SyncFinished, peerInfo peer.AddrInfo, lnk ipld.Link) {
t.Helper()
c := lnk.(cidlink.Link).Cid
if c != cid.Undef {
pub.SetRoot(c)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := sub.Announce(ctx, c, peerInfo)
require.NoError(t, err)
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = sub.Announce(ctx, c, peerInfo)
if err != nil {
return err
}

select {
case <-time.After(updateTimeout):
return errors.New("timed out waiting for sync to propagate")
case downstream, open := <-watcher:
if !open {
return errors.New("event channel closed without receiving event")
}
if !downstream.Cid.Equals(c) {
return fmt.Errorf("sync returned unexpected cid %s, expected %s", downstream.Cid, c)
}
if _, err = dstStore.Get(context.Background(), datastore.NewKey(downstream.Cid.String())); err != nil {
return fmt.Errorf("data not in receiver store: %s", err)
}
}

return assertLatestSyncEquals(sub, peerInfo.ID, expectedSync)
waitLatestSync(t, dstStore, watcher, c)
assertLatestSyncEquals(t, sub, peerInfo.ID, c)
}

func newUpdateTest(pub dagsync.Publisher, sender announce.Sender, sub *dagsync.Subscriber, dstStore datastore.Batching, watcher <-chan dagsync.SyncFinished, peerID peer.ID, lnk ipld.Link, withFailure bool, expectedSync cid.Cid) error {
var err error
func senderAnnounceTest(t *testing.T, pub dagsync.Publisher, sender announce.Sender, sub *dagsync.Subscriber, dstStore datastore.Batching, watcher <-chan dagsync.SyncFinished, peerID peer.ID, lnk ipld.Link) {
t.Helper()
c := lnk.(cidlink.Link).Cid
if c != cid.Undef {
pub.SetRoot(c)
err = announce.Send(context.Background(), c, pub.Addrs(), sender)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := announce.Send(ctx, c, pub.Addrs(), sender)
require.NoError(t, err)
}
waitLatestSync(t, dstStore, watcher, c)
assertLatestSyncEquals(t, sub, peerID, c)
}

// If failure. then latestSync should not be updated.
if withFailure {
select {
case <-time.After(3 * time.Second):
case changeEvent, open := <-watcher:
if !open {
return nil
}
return fmt.Errorf("no exchange should have been performed, but got change from peer %s for cid %s", changeEvent.PeerID, changeEvent.Cid)
}
} else {
select {
case <-time.After(updateTimeout):
return errors.New("timed out waiting for sync to propagate")
case downstream, open := <-watcher:
if !open {
return errors.New("event channle closed without receiving event")
}
if !downstream.Cid.Equals(c) {
return fmt.Errorf("sync returned unexpected cid %s, expected %s", downstream.Cid, c)
}
if _, err = dstStore.Get(context.Background(), datastore.NewKey(downstream.Cid.String())); err != nil {
return fmt.Errorf("data not in receiver store: %s", err)
}
}
func waitLatestSync(t *testing.T, dstStore datastore.Batching, watcher <-chan dagsync.SyncFinished, expectedSync cid.Cid) {
t.Helper()
select {
case <-time.After(updateTimeout):
require.FailNow(t, "timed out waiting for sync to propagate")
case downstream, open := <-watcher:
require.True(t, open, "event channel closed without receiving event")
require.Equal(t, expectedSync, downstream.Cid, "sync returned unexpected cid")
_, err := dstStore.Get(context.Background(), datastore.NewKey(downstream.Cid.String()))
require.NoError(t, err, "data not in receiver store")
}
return assertLatestSyncEquals(sub, peerID, expectedSync)
}

func assertLatestSyncEquals(sub *dagsync.Subscriber, peerID peer.ID, want cid.Cid) error {
func assertLatestSyncEquals(t *testing.T, sub *dagsync.Subscriber, peerID peer.ID, want cid.Cid) {
t.Helper()
latest := sub.GetLatestSync(peerID)
if latest == nil {
return errors.New("latest sync is nil")
}
require.NotNil(t, latest, "latest sync is nil")
got := latest.(cidlink.Link)
if got.Cid != want {
return fmt.Errorf("latestSync not updated correctly, got %s want %s", got, want)
}
return nil
require.Equal(t, want, got.Cid, "latestSync not updated correctly")
}

0 comments on commit c886fd4

Please sign in to comment.