Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ipni-sync http over libp2p #113

Merged
merged 29 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
cb47480
Implement ipni-sync http over libp2p
gammazero Aug 17, 2023
58a980e
Move libp2phttp functionality inside ipnisync
gammazero Aug 18, 2023
a8fe746
Subscriber does not use existing http client
gammazero Aug 18, 2023
37ef775
Fix handler path processing
gammazero Aug 18, 2023
4c6762a
Change option name
gammazero Aug 19, 2023
f9322b0
Update comments
gammazero Aug 19, 2023
b91b8e8
Remove usused field
gammazero Aug 19, 2023
696de20
Sync gets addrs from peerstore if none supplied
gammazero Aug 19, 2023
6fae9ff
Use dtsync if publisher server if libp2p without HTTP
gammazero Aug 19, 2023
d7b76b3
If server no libp2phttp, then use address to choose plain HTTP or dts…
gammazero Aug 20, 2023
03da4f6
Fix error handling in syncer creation
gammazero Aug 20, 2023
62961ad
Fix race when accessing libp2phttp.HTTPHost
gammazero Aug 20, 2023
ab495c8
If publisher HTTP no availabe at IPNI path, retry without IPNI path. …
gammazero Aug 21, 2023
ef38410
Use latest libp2phttp
gammazero Aug 21, 2023
faa5859
latest libp2phttp
gammazero Aug 21, 2023
f1e3263
ignore emtpy http listen addr
gammazero Aug 23, 2023
0669ee8
update to latest libp2phttp
gammazero Aug 24, 2023
999067b
Update log messages
gammazero Aug 24, 2023
309ba51
Log peer.ID consistently as peer
gammazero Aug 24, 2023
1e89b86
Update libp2p
gammazero Aug 25, 2023
352032b
Convert most tests to use ipnisync publisher
gammazero Aug 25, 2023
f5acb82
update comments
gammazero Aug 25, 2023
2194d3a
Update libp2phttp
gammazero Aug 26, 2023
6828f75
Update libp2p
gammazero Aug 29, 2023
1ee93ea
gostream relocated
gammazero Aug 29, 2023
0c9c4ca
update graphsync
gammazero Aug 29, 2023
c4fda5e
Update to release version of libp2p
gammazero Aug 30, 2023
b2617d4
update comment
gammazero Aug 30, 2023
e6d63d9
Use IPNIPath for libp2p protocol ID
gammazero Aug 31, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions announce/httpsender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ type Sender struct {
userAgent string
}

// New creates a new Sender that sends announce messages over HTTP. Announce
// messages are sent to the specified URLs. The addresses in announce messages
// are modified to include the specified peerID, which is necessary to
// communicate the publisher ID over HTTP.
// New creates a new Sender that sends advertisement announcement messages over
// HTTP. Announcements are sent directly to the specified URLs. The specified
// peerID is added to the multiaddrs contained in the announcements, which is
// how the publisher ID is communicated over HTTP.
func New(announceURLs []*url.URL, peerID peer.ID, options ...Option) (*Sender, error) {
if len(announceURLs) == 0 {
return nil, errors.New("no announce urls")
Expand Down
55 changes: 25 additions & 30 deletions dagsync/announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,29 @@ import (

func TestAnnounceReplace(t *testing.T) {
t.Parallel()
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
dstStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcHost := test.MkTestHost(t)
srcHostInfo := peer.AddrInfo{
ID: srcHost.ID(),
Addrs: srcHost.Addrs(),
}
srcLnkS := test.MkLinkSystem(srcStore)
dstHost := test.MkTestHost(t)

srcHost.Peerstore().AddAddrs(dstHost.ID(), dstHost.Addrs(), time.Hour)
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)
//dstLnkS := test.MkLinkSystem(dstStore)

dstLnkS, blocked := test.MkBlockedLinkSystem(dstStore)
blocksSeenByHook := make(map[cid.Cid]struct{})
blockHook := func(p peer.ID, c cid.Cid, _ dagsync.SegmentSyncActions) {
blocksSeenByHook[c] = struct{}{}
}

pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic)
require.NoError(t, err)
defer pub.Close()

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.RecvAnnounce(),
dagsync.BlockHook(blockHook))
require.NoError(t, err)
defer sub.Close()

require.NoError(t, test.WaitForP2PPublisher(pub, dstHost, testTopic))
srcHost, srcPrivKey := test.MkTestHostPK(t)
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcLnkS := test.MkLinkSystem(srcStore)

pub, err := ipnisync.NewPublisher(srcLnkS, srcPrivKey, ipnisync.WithStreamHost(srcHost), ipnisync.WithHeadTopic(testTopic))
require.NoError(t, err)
defer pub.Close()

srcHost.Peerstore().AddAddrs(dstHost.ID(), dstHost.Addrs(), time.Hour)
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)

watcher, cncl := sub.OnSyncFinished()
defer cncl()
Expand All @@ -67,6 +60,11 @@ func TestAnnounceReplace(t *testing.T) {
firstCid := chainLnks[2].(cidlink.Link).Cid
pub.SetRoot(firstCid)

srcHostInfo := peer.AddrInfo{
ID: srcHost.ID(),
Addrs: srcHost.Addrs(),
}

// Have the subscriber receive an announce. This is the same as if it was
// published by the publisher without having to wait for it to arrive.
err = sub.Announce(context.Background(), firstCid, srcHostInfo)
Expand Down Expand Up @@ -154,7 +152,7 @@ func TestAnnounce_LearnsHttpPublisherAddr(t *testing.T) {
pubh := test.MkTestHost(t)
pubds := dssync.MutexWrap(datastore.NewMapDatastore())
publs := test.MkLinkSystem(pubds)
pub, err := ipnisync.NewPublisher("0.0.0.0:0", publs, pubh.Peerstore().PrivKey(pubh.ID()))
pub, err := ipnisync.NewPublisher(publs, pubh.Peerstore().PrivKey(pubh.ID()), ipnisync.WithHTTPListenAddrs("0.0.0.0:0"))
require.NoError(t, err)
defer pub.Close()

Expand Down Expand Up @@ -214,11 +212,7 @@ func TestAnnounce_LearnsHttpPublisherAddr(t *testing.T) {
func TestAnnounceRepublish(t *testing.T) {
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
dstStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcHost := test.MkTestHost(t)
srcHostInfo := peer.AddrInfo{
ID: srcHost.ID(),
Addrs: srcHost.Addrs(),
}
srcHost, srcPrivKey := test.MkTestHostPK(t)
srcLnkS := test.MkLinkSystem(srcStore)
dstHost := test.MkTestHost(t)

Expand All @@ -243,10 +237,9 @@ func TestAnnounceRepublish(t *testing.T) {
require.NoError(t, err)
defer sub1.Close()

pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic)
pub, err := ipnisync.NewPublisher(srcLnkS, srcPrivKey, ipnisync.WithStreamHost(srcHost), ipnisync.WithHeadTopic(testTopic))
require.NoError(t, err)
defer pub.Close()
require.NoError(t, test.WaitForP2PPublisher(pub, dstHost, testTopic))

watcher2, cncl := sub2.OnSyncFinished()
defer cncl()
Expand All @@ -258,7 +251,11 @@ func TestAnnounceRepublish(t *testing.T) {
pub.SetRoot(firstCid)

// Announce one CID to subscriber1.
err = sub1.Announce(context.Background(), firstCid, srcHostInfo)
pubInfo := peer.AddrInfo{
ID: pub.ID(),
Addrs: pub.Addrs(),
}
err = sub1.Announce(context.Background(), firstCid, pubInfo)
require.NoError(t, err)
t.Log("Sent announce for first CID", firstCid)

Expand Down Expand Up @@ -444,7 +441,7 @@ func mkLnk(t *testing.T, srcStore datastore.Batching) cid.Cid {
}

func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching, allowPeer func(peer.ID) bool) (host.Host, host.Host, dagsync.Publisher, *dagsync.Subscriber, announce.Sender) {
srcHost := test.MkTestHost(t)
srcHost, srcPrivKey := test.MkTestHostPK(t)
dstHost := test.MkTestHost(t)
topics := test.WaitForMeshWithMessage(t, testTopic, srcHost, dstHost)

Expand All @@ -453,7 +450,7 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching, allowPeer f
p2pSender, err := p2psender.New(nil, "", p2psender.WithTopic(topics[0]), p2psender.WithExtraData([]byte("t01000")))
require.NoError(t, err)

pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic)
pub, err := ipnisync.NewPublisher(srcLnkS, srcPrivKey, ipnisync.WithStreamHost(srcHost), ipnisync.WithHeadTopic(testTopic))
require.NoError(t, err)

srcHost.Peerstore().AddAddrs(dstHost.ID(), dstHost.Addrs(), time.Hour)
Expand All @@ -467,7 +464,5 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching, allowPeer f
err = srcHost.Connect(context.Background(), dstHost.Peerstore().PeerInfo(dstHost.ID()))
require.NoError(t, err)

require.NoError(t, test.WaitForP2PPublisher(pub, dstHost, testTopic))

return srcHost, dstHost, pub, sub, p2pSender
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ import (

"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
gostream "github.com/libp2p/go-libp2p-gostream"
"github.com/libp2p/go-libp2p/core/host"
peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
multistream "github.com/multiformats/go-multistream"
"github.com/libp2p/go-libp2p/p2p/net/gostream"
"github.com/multiformats/go-multistream"
)

const closeTimeout = 30 * time.Second
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
_ "github.com/ipld/go-ipld-prime/codec/dagjson"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipni/go-libipni/dagsync/p2p/protocol/head"
"github.com/ipni/go-libipni/dagsync/dtsync/head"
"github.com/ipni/go-libipni/dagsync/test"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
Expand Down
2 changes: 1 addition & 1 deletion dagsync/dtsync/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipld/go-ipld-prime"
"github.com/ipni/go-libipni/dagsync/p2p/protocol/head"
"github.com/ipni/go-libipni/dagsync/dtsync/head"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
Expand Down
2 changes: 1 addition & 1 deletion dagsync/dtsync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipni/go-libipni/dagsync/p2p/protocol/head"
"github.com/ipni/go-libipni/dagsync/dtsync/head"
"github.com/libp2p/go-libp2p/core/peer"
)

Expand Down
13 changes: 8 additions & 5 deletions dagsync/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dagsync_test
import (
"bytes"
"context"
"crypto/rand"
"fmt"
"io"
"log"
Expand All @@ -15,8 +16,9 @@ import (
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipni/go-libipni/dagsync"
"github.com/ipni/go-libipni/dagsync/dtsync"
"github.com/ipni/go-libipni/dagsync/ipnisync"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/multiformats/go-multicodec"
)
Expand All @@ -25,12 +27,13 @@ var srcHost host.Host

func ExamplePublisher() {
// Init dagsync publisher and subscriber.
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcHost, _ = libp2p.New()
srcPrivKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
srcHost, _ = libp2p.New(libp2p.Identity(srcPrivKey))
defer srcHost.Close()
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcLnkS := makeLinkSystem(srcStore)

pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic)
pub, err := ipnisync.NewPublisher(srcLnkS, srcPrivKey, ipnisync.WithStreamHost(srcHost), ipnisync.WithHeadTopic("/indexer/ingest/testnet"))
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -65,7 +68,7 @@ func ExampleSubscriber() {
srcHost.Peerstore().AddAddrs(dstHost.ID(), dstHost.Addrs(), time.Hour)
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkSys, "/indexer/ingest/testnet", nil)
sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkSys, "/indexer/ingest/testnet")
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion dagsync/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func setupPublisherSubscriber(t *testing.T, subscriberOptions []dagsync.Option)
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcLinkSys := test.MkLinkSystem(srcStore)

pub, err := ipnisync.NewPublisher("127.0.0.1:0", srcLinkSys, srcPrivKey, ipnisync.WithServer(true))
pub, err := ipnisync.NewPublisher(srcLinkSys, srcPrivKey, ipnisync.WithHTTPListenAddrs("127.0.0.1:0"))
require.NoError(t, err)
t.Cleanup(func() {
pub.Close()
Expand Down
16 changes: 12 additions & 4 deletions dagsync/ipnisync/ipnipath.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
package ipnisync

import "path"
import "github.com/libp2p/go-libp2p/core/protocol"

const protoVersion = "v1"

var IpniPath = path.Join("/ipni", protoVersion, "ad")
const (
// IPNIPath is the path that the Publisher expects as the last port of the
// HTTP request URL path. The sync client automatically adds this to the
// request path.
IPNIPath = "/ipni/v1/ad"
// ProtocolID is the libp2p protocol ID used to identify the ipni-sync
// protocol. With libp2phttp this protocol ID maps directly to a single
// HTTP path, so the value of the protocol ID is the same as the IPNI path
// for the ipni-sync protocol.
ProtocolID = protocol.ID(IPNIPath)
)
Loading