Skip to content

Commit

Permalink
Implement ipni-sync http over libp2p (#113)
Browse files Browse the repository at this point in the history
* Implement ipni-sync http over libp2p

Use the new libp2phttp functionality for serving and requesting ipnisync over libp2p.

* Move libp2phttp functionality inside ipnisync
* Subscriber does not use existing http client
* Add HttpTimeout option for subscriber
* Add option to use retryable http client
* Change option name from `WithServer` to `WithStartServer`
* Sync gets addrs from peerstore if none supplied
* If server is not libp2phttp, then use address to choose plain HTTP or dtsync.
* If publisher HTTP is not available at IPNI path, then retry without IPNI path. This supports legacy HTTP served without IPNI path.
* ignore emtpy http listen addrs
* Update log messages
* Log peer.ID consistently as peer
* Convert most tests to use ipnisync publisher
* Change AsyncErr to Err in SyncFinished
* Move old p2p head client/server (legs protocol ID) into dtsync, since that is the only place it is used.
* Add tests for libp2phttp
* gostream package relocated
* update graphsync
* Use IPNIPath for libp2p protocol ID
  • Loading branch information
gammazero authored Aug 31, 2023
1 parent e0792fc commit eccd3d5
Show file tree
Hide file tree
Showing 21 changed files with 1,159 additions and 392 deletions.
8 changes: 4 additions & 4 deletions announce/httpsender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ type Sender struct {
userAgent string
}

// New creates a new Sender that sends announce messages over HTTP. Announce
// messages are sent to the specified URLs. The addresses in announce messages
// are modified to include the specified peerID, which is necessary to
// communicate the publisher ID over HTTP.
// New creates a new Sender that sends advertisement announcement messages over
// HTTP. Announcements are sent directly to the specified URLs. The specified
// peerID is added to the multiaddrs contained in the announcements, which is
// how the publisher ID is communicated over HTTP.
func New(announceURLs []*url.URL, peerID peer.ID, options ...Option) (*Sender, error) {
if len(announceURLs) == 0 {
return nil, errors.New("no announce urls")
Expand Down
55 changes: 25 additions & 30 deletions dagsync/announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,29 @@ import (

func TestAnnounceReplace(t *testing.T) {
t.Parallel()
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
dstStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcHost := test.MkTestHost(t)
srcHostInfo := peer.AddrInfo{
ID: srcHost.ID(),
Addrs: srcHost.Addrs(),
}
srcLnkS := test.MkLinkSystem(srcStore)
dstHost := test.MkTestHost(t)

srcHost.Peerstore().AddAddrs(dstHost.ID(), dstHost.Addrs(), time.Hour)
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)
//dstLnkS := test.MkLinkSystem(dstStore)

dstLnkS, blocked := test.MkBlockedLinkSystem(dstStore)
blocksSeenByHook := make(map[cid.Cid]struct{})
blockHook := func(p peer.ID, c cid.Cid, _ dagsync.SegmentSyncActions) {
blocksSeenByHook[c] = struct{}{}
}

pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic)
require.NoError(t, err)
defer pub.Close()

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.RecvAnnounce(),
dagsync.BlockHook(blockHook))
require.NoError(t, err)
defer sub.Close()

require.NoError(t, test.WaitForP2PPublisher(pub, dstHost, testTopic))
srcHost, srcPrivKey := test.MkTestHostPK(t)
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcLnkS := test.MkLinkSystem(srcStore)

pub, err := ipnisync.NewPublisher(srcLnkS, srcPrivKey, ipnisync.WithStreamHost(srcHost), ipnisync.WithHeadTopic(testTopic))
require.NoError(t, err)
defer pub.Close()

srcHost.Peerstore().AddAddrs(dstHost.ID(), dstHost.Addrs(), time.Hour)
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)

watcher, cncl := sub.OnSyncFinished()
defer cncl()
Expand All @@ -67,6 +60,11 @@ func TestAnnounceReplace(t *testing.T) {
firstCid := chainLnks[2].(cidlink.Link).Cid
pub.SetRoot(firstCid)

srcHostInfo := peer.AddrInfo{
ID: srcHost.ID(),
Addrs: srcHost.Addrs(),
}

// Have the subscriber receive an announce. This is the same as if it was
// published by the publisher without having to wait for it to arrive.
err = sub.Announce(context.Background(), firstCid, srcHostInfo)
Expand Down Expand Up @@ -154,7 +152,7 @@ func TestAnnounce_LearnsHttpPublisherAddr(t *testing.T) {
pubh := test.MkTestHost(t)
pubds := dssync.MutexWrap(datastore.NewMapDatastore())
publs := test.MkLinkSystem(pubds)
pub, err := ipnisync.NewPublisher("0.0.0.0:0", publs, pubh.Peerstore().PrivKey(pubh.ID()))
pub, err := ipnisync.NewPublisher(publs, pubh.Peerstore().PrivKey(pubh.ID()), ipnisync.WithHTTPListenAddrs("0.0.0.0:0"))
require.NoError(t, err)
defer pub.Close()

Expand Down Expand Up @@ -214,11 +212,7 @@ func TestAnnounce_LearnsHttpPublisherAddr(t *testing.T) {
func TestAnnounceRepublish(t *testing.T) {
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
dstStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcHost := test.MkTestHost(t)
srcHostInfo := peer.AddrInfo{
ID: srcHost.ID(),
Addrs: srcHost.Addrs(),
}
srcHost, srcPrivKey := test.MkTestHostPK(t)
srcLnkS := test.MkLinkSystem(srcStore)
dstHost := test.MkTestHost(t)

Expand All @@ -243,10 +237,9 @@ func TestAnnounceRepublish(t *testing.T) {
require.NoError(t, err)
defer sub1.Close()

pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic)
pub, err := ipnisync.NewPublisher(srcLnkS, srcPrivKey, ipnisync.WithStreamHost(srcHost), ipnisync.WithHeadTopic(testTopic))
require.NoError(t, err)
defer pub.Close()
require.NoError(t, test.WaitForP2PPublisher(pub, dstHost, testTopic))

watcher2, cncl := sub2.OnSyncFinished()
defer cncl()
Expand All @@ -258,7 +251,11 @@ func TestAnnounceRepublish(t *testing.T) {
pub.SetRoot(firstCid)

// Announce one CID to subscriber1.
err = sub1.Announce(context.Background(), firstCid, srcHostInfo)
pubInfo := peer.AddrInfo{
ID: pub.ID(),
Addrs: pub.Addrs(),
}
err = sub1.Announce(context.Background(), firstCid, pubInfo)
require.NoError(t, err)
t.Log("Sent announce for first CID", firstCid)

Expand Down Expand Up @@ -444,7 +441,7 @@ func mkLnk(t *testing.T, srcStore datastore.Batching) cid.Cid {
}

func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching, allowPeer func(peer.ID) bool) (host.Host, host.Host, dagsync.Publisher, *dagsync.Subscriber, announce.Sender) {
srcHost := test.MkTestHost(t)
srcHost, srcPrivKey := test.MkTestHostPK(t)
dstHost := test.MkTestHost(t)
topics := test.WaitForMeshWithMessage(t, testTopic, srcHost, dstHost)

Expand All @@ -453,7 +450,7 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching, allowPeer f
p2pSender, err := p2psender.New(nil, "", p2psender.WithTopic(topics[0]), p2psender.WithExtraData([]byte("t01000")))
require.NoError(t, err)

pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic)
pub, err := ipnisync.NewPublisher(srcLnkS, srcPrivKey, ipnisync.WithStreamHost(srcHost), ipnisync.WithHeadTopic(testTopic))
require.NoError(t, err)

srcHost.Peerstore().AddAddrs(dstHost.ID(), dstHost.Addrs(), time.Hour)
Expand All @@ -467,7 +464,5 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching, allowPeer f
err = srcHost.Connect(context.Background(), dstHost.Peerstore().PeerInfo(dstHost.ID()))
require.NoError(t, err)

require.NoError(t, test.WaitForP2PPublisher(pub, dstHost, testTopic))

return srcHost, dstHost, pub, sub, p2pSender
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ import (

"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
gostream "github.com/libp2p/go-libp2p-gostream"
"github.com/libp2p/go-libp2p/core/host"
peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
multistream "github.com/multiformats/go-multistream"
"github.com/libp2p/go-libp2p/p2p/net/gostream"
"github.com/multiformats/go-multistream"
)

const closeTimeout = 30 * time.Second
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
_ "github.com/ipld/go-ipld-prime/codec/dagjson"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipni/go-libipni/dagsync/p2p/protocol/head"
"github.com/ipni/go-libipni/dagsync/dtsync/head"
"github.com/ipni/go-libipni/dagsync/test"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
Expand Down
2 changes: 1 addition & 1 deletion dagsync/dtsync/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipld/go-ipld-prime"
"github.com/ipni/go-libipni/dagsync/p2p/protocol/head"
"github.com/ipni/go-libipni/dagsync/dtsync/head"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
Expand Down
2 changes: 1 addition & 1 deletion dagsync/dtsync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipni/go-libipni/dagsync/p2p/protocol/head"
"github.com/ipni/go-libipni/dagsync/dtsync/head"
"github.com/libp2p/go-libp2p/core/peer"
)

Expand Down
13 changes: 8 additions & 5 deletions dagsync/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dagsync_test
import (
"bytes"
"context"
"crypto/rand"
"fmt"
"io"
"log"
Expand All @@ -15,8 +16,9 @@ import (
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipni/go-libipni/dagsync"
"github.com/ipni/go-libipni/dagsync/dtsync"
"github.com/ipni/go-libipni/dagsync/ipnisync"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/multiformats/go-multicodec"
)
Expand All @@ -25,12 +27,13 @@ var srcHost host.Host

func ExamplePublisher() {
// Init dagsync publisher and subscriber.
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcHost, _ = libp2p.New()
srcPrivKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
srcHost, _ = libp2p.New(libp2p.Identity(srcPrivKey))
defer srcHost.Close()
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcLnkS := makeLinkSystem(srcStore)

pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic)
pub, err := ipnisync.NewPublisher(srcLnkS, srcPrivKey, ipnisync.WithStreamHost(srcHost), ipnisync.WithHeadTopic("/indexer/ingest/testnet"))
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -65,7 +68,7 @@ func ExampleSubscriber() {
srcHost.Peerstore().AddAddrs(dstHost.ID(), dstHost.Addrs(), time.Hour)
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkSys, "/indexer/ingest/testnet", nil)
sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkSys, "/indexer/ingest/testnet")
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion dagsync/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func setupPublisherSubscriber(t *testing.T, subscriberOptions []dagsync.Option)
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcLinkSys := test.MkLinkSystem(srcStore)

pub, err := ipnisync.NewPublisher("127.0.0.1:0", srcLinkSys, srcPrivKey, ipnisync.WithServer(true))
pub, err := ipnisync.NewPublisher(srcLinkSys, srcPrivKey, ipnisync.WithHTTPListenAddrs("127.0.0.1:0"))
require.NoError(t, err)
t.Cleanup(func() {
pub.Close()
Expand Down
16 changes: 12 additions & 4 deletions dagsync/ipnisync/ipnipath.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
package ipnisync

import "path"
import "github.com/libp2p/go-libp2p/core/protocol"

const protoVersion = "v1"

var IpniPath = path.Join("/ipni", protoVersion, "ad")
const (
// IPNIPath is the path that the Publisher expects as the last port of the
// HTTP request URL path. The sync client automatically adds this to the
// request path.
IPNIPath = "/ipni/v1/ad"
// ProtocolID is the libp2p protocol ID used to identify the ipni-sync
// protocol. With libp2phttp this protocol ID maps directly to a single
// HTTP path, so the value of the protocol ID is the same as the IPNI path
// for the ipni-sync protocol.
ProtocolID = protocol.ID(IPNIPath)
)
Loading

0 comments on commit eccd3d5

Please sign in to comment.