Skip to content

Commit

Permalink
Exploratory refactor-2 of libp2p + HTTP
Browse files Browse the repository at this point in the history
This is another version of the libp2p + HTTP refactor in #102. The key difference is that a new `NamespacedClient`, having the publisher's address, is created for each sync within the per-sync Syncer.
  • Loading branch information
gammazero committed Aug 9, 2023
1 parent 52d79f6 commit bc6a41f
Show file tree
Hide file tree
Showing 5 changed files with 255 additions and 22 deletions.
21 changes: 21 additions & 0 deletions dagsync/httpsync/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,27 @@ func NewPublisherWithoutServer(address string, handlerPath string, lsys ipld.Lin
}, nil
}

// NewPublisherHandler returns a Publisher for use as an http.Handler. Doesn't
// listen or know about a url prefix.
func NewPublisherHandler(lsys ipld.LinkSystem, privKey ic.PrivKey) (*Publisher, error) {
if privKey == nil {
return nil, errors.New("private key required to sign head requests")
}
peerID, err := peer.IDFromPrivateKey(privKey)
if err != nil {
return nil, fmt.Errorf("could not get peer id from private key: %w", err)
}

return &Publisher{
addr: nil,
closer: io.NopCloser(nil),
lsys: lsys,
handlerPath: "",
peerID: peerID,
privKey: privKey,
}, nil
}

// Addrs returns the addresses, as []multiaddress, that the Publisher is
// listening on.
func (p *Publisher) Addrs() []multiaddr.Multiaddr {
Expand Down
150 changes: 150 additions & 0 deletions dagsync/httpsync/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,23 @@ import (
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagjson"
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/ipld/go-ipld-prime/fluent"
"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipld/go-ipld-prime/storage/memstore"
"github.com/ipld/go-ipld-prime/traversal"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
"github.com/ipni/go-libipni/announce"
"github.com/ipni/go-libipni/announce/message"
"github.com/ipni/go-libipni/dagsync/httpsync"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
libp2phttp "github.com/libp2p/go-libp2p/p2p/http"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multicodec"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -96,6 +103,149 @@ func TestNewPublisherForListener(t *testing.T) {
}
}

func TestPublisherWithLibp2pHTTP(t *testing.T) {
ctx := context.Background()
req := require.New(t)

publisherStore := &correctedMemStore{&memstore.Store{
Bag: make(map[string][]byte),
}}
publisherLsys := cidlink.DefaultLinkSystem()
publisherLsys.TrustedStorage = true
publisherLsys.SetReadStorage(publisherStore)
publisherLsys.SetWriteStorage(publisherStore)

privKey, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 256, rand.Reader)
req.NoError(err)

publisher, err := httpsync.NewPublisherHandler(publisherLsys, privKey)
req.NoError(err)

// Use same identity as publisher. This is necessary so that same ID that
// the publisher uses to sign head/ query responses is the same as the ID
// used to identify the publisherStreamHost. Otherwise, it would be
// necessary for the sync client to know both IDs: one for the stream host
// to connect to, and one for the publisher to validate the dignatuse with.
publisherStreamHost, err := libp2p.New(libp2p.Identity(privKey), libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
req.NoError(err)

// This is the "HTTP Host". It's like the libp2p "stream host" (aka core
// host.Host), but it uses HTTP semantics instead of stream semantics.
//
// You can pass in options on creation like a stream host to do HTTP over
// libp2p streams, and multiaddrs to create listeners on.
publisherHost, err := libp2phttp.New(
libp2phttp.StreamHost(publisherStreamHost),
libp2phttp.ListenAddrs([]multiaddr.Multiaddr{multiaddr.StringCast("/ip4/127.0.0.1/tcp/0/http")}),
)
req.NoError(err)

go publisherHost.Serve()
defer publisherHost.Close()

protoID := protocol.ID("/ipni-sync/1")

serverStreamMa := publisherHost.Addrs()[0]
serverHTTPMa := publisherHost.Addrs()[1]
req.Contains(serverHTTPMa.String(), "/http")

t.Log("libp2p stream server address:", serverStreamMa.String())
t.Log("libp2p http server address:", serverHTTPMa.String())

// Here is where we attach our request handler. Note that we are mounting
// the "/ipni-sync/1" protocol at /ipni/. libp2phttp manages this mapping
// and clients can learn about the mapping at .well-known/libp2p.
//
// In this case we also want out HTTP handler to not even know about the
// prefix, so we use the stdlib http.StripPrefix.
publisherHost.SetHttpHandlerAtPath(protoID, "/ipni/", http.StripPrefix("/ipni/", publisher))

link, err := publisherLsys.Store(
ipld.LinkContext{Ctx: ctx},
cidlink.LinkPrototype{
Prefix: cid.Prefix{
Version: 1,
Codec: uint64(multicodec.DagJson),
MhType: uint64(multicodec.Sha2_256),
MhLength: -1,
},
},
fluent.MustBuildMap(basicnode.Prototype.Map, 4, func(na fluent.MapAssembler) {
na.AssembleEntry("fish").AssignString("lobster")
na.AssembleEntry("fish1").AssignString("lobster1")
na.AssembleEntry("fish2").AssignString("lobster2")
na.AssembleEntry("fish0").AssignString("lobster0")
}))
req.NoError(err)
publisher.SetRoot(link.(cidlink.Link).Cid)

testCases := []struct {
name string
publisher peer.AddrInfo
newClientHost func(t *testing.T) *libp2phttp.HTTPHost
}{
{
"HTTP transport",
peer.AddrInfo{Addrs: []multiaddr.Multiaddr{serverHTTPMa}},
func(t *testing.T) *libp2phttp.HTTPHost {
clientHost, err := libp2phttp.New()
req.NoError(err)
return clientHost
},
},
{
"libp2p stream transport",
peer.AddrInfo{ID: publisherStreamHost.ID(), Addrs: []multiaddr.Multiaddr{serverStreamMa}},
func(t *testing.T) *libp2phttp.HTTPHost {
clientStreamHost, err := libp2p.New(libp2p.NoListenAddrs)
req.NoError(err)
clientHost, err := libp2phttp.New(libp2phttp.StreamHost(clientStreamHost))
req.NoError(err)
return clientHost
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Plumbing to set up the test.
clientStore := &correctedMemStore{&memstore.Store{
Bag: make(map[string][]byte),
}}
clientLsys := cidlink.DefaultLinkSystem()
clientLsys.TrustedStorage = true
clientLsys.SetReadStorage(clientStore)
clientLsys.SetWriteStorage(clientStore)
clientSync := httpsync.NewLibp2pSync(clientLsys, tc.newClientHost(t), protoID, nil)

clientSyncer, err := clientSync.NewSyncer(tc.publisher.ID, tc.publisher.Addrs)
req.NoError(err)
wk := clientSyncer.PeerProtoMap()
if wk != nil {
req.Contains(wk, protoID)
}

headCid, err := clientSyncer.GetHead(ctx)
req.NoError(err)

req.Equal(link.(cidlink.Link).Cid, headCid)

clientSyncer.Sync(ctx, headCid, selectorparse.CommonSelector_MatchPoint)
require.NoError(t, err)

// Assert that data is loadable from the link system.
wantLink := cidlink.Link{Cid: headCid}
node, err := clientLsys.Load(ipld.LinkContext{Ctx: ctx}, wantLink, basicnode.Prototype.Any)
require.NoError(t, err)

// Assert synced node link matches the computed link, i.e. is spec-compliant.
gotLink, err := clientLsys.ComputeLink(wantLink.Prototype(), node)
require.NoError(t, err)
require.Equal(t, gotLink, wantLink, "computed %s but got %s", gotLink.String(), wantLink.String())
})
}
}

func mapKeys(t *testing.T, n ipld.Node) []string {
var keys []string
require.Equal(t, n.Kind(), datamodel.Kind_Map)
Expand Down
94 changes: 78 additions & 16 deletions dagsync/httpsync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/ipni/go-libipni/maurl"
ic "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
libp2phttp "github.com/libp2p/go-libp2p/p2p/http"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multihash"
)
Expand All @@ -34,6 +36,10 @@ type Sync struct {
blockHook func(peer.ID, cid.Cid)
client *http.Client
lsys ipld.LinkSystem

// libp2phttp
clientHost *libp2phttp.HTTPHost
protoID protocol.ID
}

// NewSync creates a new Sync.
Expand All @@ -50,19 +56,79 @@ func NewSync(lsys ipld.LinkSystem, client *http.Client, blockHook func(peer.ID,
}
}

var errHeadFromUnexpectedPeer = errors.New("found head signed from an unexpected peer")

// Syncer provides sync functionality for a single sync with a peer.
type Syncer struct {
client *http.Client
peerID peer.ID
protos libp2phttp.WellKnownProtoMap
rootURL url.URL
urls []*url.URL
sync *Sync
}

func NewLibp2pSync(lsys ipld.LinkSystem, clientHost *libp2phttp.HTTPHost, protoID protocol.ID, blockHook func(peer.ID, cid.Cid)) *Sync {
return &Sync{
blockHook: blockHook,
lsys: lsys,

clientHost: clientHost,
protoID: protoID,
}
}

// NewSyncer creates a new Syncer to use for a single sync operation against a peer.
func (s *Sync) NewSyncer(peerID peer.ID, peerAddrs []multiaddr.Multiaddr) (*Syncer, error) {
urls := make([]*url.URL, len(peerAddrs))
for i := range peerAddrs {
//
// TODO: Replace arguments with peer.AddrInfo
func (s *Sync) NewSyncer(peerID peer.ID, addrs []multiaddr.Multiaddr) (*Syncer, error) {
peerInfo := peer.AddrInfo{
ID: peerID,
Addrs: addrs,
}
if s.clientHost != nil {
return s.newLibp2pSyncer(peerInfo)
}
return s.newSyncer(peerInfo)
}

func (s *Sync) newLibp2pSyncer(peerInfo peer.AddrInfo) (*Syncer, error) {
httpClient, err := s.clientHost.NamespacedClient(s.protoID, peerInfo)
if err != nil {
return nil, err
}

sr := &Syncer{
client: &httpClient,
peerID: peerInfo.ID,
rootURL: url.URL{Path: "/"},
urls: nil,
sync: s,
}

if peerInfo.ID != "" {
sr.protos, err = s.clientHost.GetAndStorePeerProtoMap(httpClient.Transport, peerInfo.ID)
if err != nil {
return nil, err
}
}

return sr, nil
}

func (s *Sync) newSyncer(peerInfo peer.AddrInfo) (*Syncer, error) {
urls := make([]*url.URL, len(peerInfo.Addrs))
for i, addr := range peerInfo.Addrs {
var err error
urls[i], err = maurl.ToURL(peerAddrs[i])
urls[i], err = maurl.ToURL(addr)
if err != nil {
return nil, err
}
}

return &Syncer{
peerID: peerID,
client: s.client,
peerID: peerInfo.ID,
rootURL: *urls[0],
urls: urls[1:],
sync: s,
Expand All @@ -73,14 +139,8 @@ func (s *Sync) Close() {
s.client.CloseIdleConnections()
}

var errHeadFromUnexpectedPeer = errors.New("found head signed from an unexpected peer")

// Syncer provides sync functionality for a single sync with a peer.
type Syncer struct {
peerID peer.ID
rootURL url.URL
urls []*url.URL
sync *Sync
func (s *Syncer) PeerProtoMap() libp2phttp.WellKnownProtoMap {
return s.protos
}

// GetHead fetches the head of the peer's advertisement chain.
Expand All @@ -102,7 +162,9 @@ func (s *Syncer) GetHead(ctx context.Context) (cid.Cid, error) {
return cid.Undef, err
}

if peerIDFromSig != s.peerID {
if s.peerID == "" {
log.Warn("cannot verify publisher signature without peer ID")
} else if peerIDFromSig != s.peerID {
return cid.Undef, errHeadFromUnexpectedPeer
}

Expand Down Expand Up @@ -136,7 +198,7 @@ func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error
}
}

s.sync.client.CloseIdleConnections()
s.client.CloseIdleConnections()
return nil
}

Expand Down Expand Up @@ -205,7 +267,7 @@ nextURL:
return err
}

resp, err := s.sync.client.Do(req)
resp, err := s.client.Do(req)
if err != nil {
if len(s.urls) != 0 {
log.Errorw("Fetch request failed, will retry with next address", "err", err)
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/ipfs/go-ipld-format v0.3.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipld/go-ipld-prime v0.20.0
github.com/libp2p/go-libp2p v0.29.1
github.com/libp2p/go-libp2p v0.29.1-0.20230804182920-49d7db486c5e
github.com/libp2p/go-libp2p-gostream v0.6.0
github.com/libp2p/go-libp2p-pubsub v0.9.3
github.com/libp2p/go-msgio v0.3.0
Expand Down Expand Up @@ -113,7 +113,7 @@ require (
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/qtls-go1-19 v0.3.3 // indirect
github.com/quic-go/qtls-go1-20 v0.2.3 // indirect
github.com/quic-go/quic-go v0.36.3 // indirect
github.com/quic-go/quic-go v0.36.4 // indirect
github.com/quic-go/webtransport-go v0.5.3 // indirect
github.com/raulk/go-watchdog v1.3.0 // indirect
github.com/russross/blackfriday/v2 v2.0.1 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,8 @@ github.com/libp2p/go-cidranger v1.1.0 h1:ewPN8EZ0dd1LSnrtuwd4709PXVcITVeuwbag38y
github.com/libp2p/go-cidranger v1.1.0/go.mod h1:KWZTfSr+r9qEo9OkI9/SIEeAtw+NNoU0dXIXt15Okic=
github.com/libp2p/go-flow-metrics v0.1.0 h1:0iPhMI8PskQwzh57jB9WxIuIOQ0r+15PChFGkx3Q3WM=
github.com/libp2p/go-flow-metrics v0.1.0/go.mod h1:4Xi8MX8wj5aWNDAZttg6UPmc0ZrnFNsMtpsYUClFtro=
github.com/libp2p/go-libp2p v0.29.1 h1:yNeg6XgP8gbdc4YSrwiIt5T1TGOrVjH8dzl8h0GIOfQ=
github.com/libp2p/go-libp2p v0.29.1/go.mod h1:20El+LLy3/YhdUYIvGbLnvVJN32nMdqY6KXBENRAfLY=
github.com/libp2p/go-libp2p v0.29.1-0.20230804182920-49d7db486c5e h1:OGUuDNhPAEt58YoUW5fSSB1XeQB3k5OFPokuLc1KVeA=
github.com/libp2p/go-libp2p v0.29.1-0.20230804182920-49d7db486c5e/go.mod h1:iNKL7mEnZ9wAss+03IjAwM9ZAQXfVUAPUUmOACQfQ/g=
github.com/libp2p/go-libp2p-asn-util v0.3.0 h1:gMDcMyYiZKkocGXDQ5nsUQyquC9+H+iLEQHwOCZ7s8s=
github.com/libp2p/go-libp2p-asn-util v0.3.0/go.mod h1:B1mcOrKUE35Xq/ASTmQ4tN3LNzVVaMNmq2NACuqyB9w=
github.com/libp2p/go-libp2p-gostream v0.6.0 h1:QfAiWeQRce6pqnYfmIVWJFXNdDyfiR/qkCnjyaZUPYU=
Expand Down Expand Up @@ -426,8 +426,8 @@ github.com/quic-go/qtls-go1-19 v0.3.3 h1:wznEHvJwd+2X3PqftRha0SUKmGsnb6dfArMhy9P
github.com/quic-go/qtls-go1-19 v0.3.3/go.mod h1:ySOI96ew8lnoKPtSqx2BlI5wCpUVPT05RMAlajtnyOI=
github.com/quic-go/qtls-go1-20 v0.2.3 h1:m575dovXn1y2ATOb1XrRFcrv0F+EQmlowTkoraNkDPI=
github.com/quic-go/qtls-go1-20 v0.2.3/go.mod h1:JKtK6mjbAVcUTN/9jZpvLbGxvdWIKS8uT7EiStoU1SM=
github.com/quic-go/quic-go v0.36.3 h1:f+yOqeGhMoRX7/M3wmEw/djhzKWr15FtQysox85/834=
github.com/quic-go/quic-go v0.36.3/go.mod h1:qxQumdeKw5GmWs1OsTZZnOxzSI+RJWuhf1O8FN35L2o=
github.com/quic-go/quic-go v0.36.4 h1:CXn/ZLN5Vntlk53fjR+kUMC8Jt7flfQe+I5Ty5A+k0o=
github.com/quic-go/quic-go v0.36.4/go.mod h1:qxQumdeKw5GmWs1OsTZZnOxzSI+RJWuhf1O8FN35L2o=
github.com/quic-go/webtransport-go v0.5.3 h1:5XMlzemqB4qmOlgIus5zB45AcZ2kCgCy2EptUrfOPWU=
github.com/quic-go/webtransport-go v0.5.3/go.mod h1:OhmmgJIzTTqXK5xvtuX0oBpLV2GkLWNDA+UeTGJXErU=
github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtBsk=
Expand Down

0 comments on commit bc6a41f

Please sign in to comment.