Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libp2p + HTTP an exploratory refactor #102

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions dagsync/httpsync/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,26 @@ func NewPublisherWithoutServer(address string, handlerPath string, lsys ipld.Lin
}, nil
}

// NewPublisherHandler returns a Publisher for use as an http.Handler. Doesn't listen or know about a url prefix.
func NewPublisherHandler(lsys ipld.LinkSystem, privKey ic.PrivKey) (*Publisher, error) {
if privKey == nil {
return nil, errors.New("private key required to sign head requests")
}
peerID, err := peer.IDFromPrivateKey(privKey)
if err != nil {
return nil, fmt.Errorf("could not get peer id from private key: %w", err)
}

return &Publisher{
addr: nil,
closer: io.NopCloser(nil),
lsys: lsys,
handlerPath: "",
peerID: peerID,
privKey: privKey,
}, nil
}

// Addrs returns the addresses, as []multiaddress, that the Publisher is
// listening on.
func (p *Publisher) Addrs() []multiaddr.Multiaddr {
Expand Down
130 changes: 130 additions & 0 deletions dagsync/httpsync/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,23 @@ import (
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagjson"
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/ipld/go-ipld-prime/fluent"
"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipld/go-ipld-prime/storage/memstore"
"github.com/ipld/go-ipld-prime/traversal"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
"github.com/ipni/go-libipni/announce"
"github.com/ipni/go-libipni/announce/message"
"github.com/ipni/go-libipni/dagsync/httpsync"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
libp2phttp "github.com/libp2p/go-libp2p/p2p/http"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multicodec"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -96,6 +103,129 @@ func TestNewPublisherForListener(t *testing.T) {
}
}

func TestPublisherWithLibp2pHTTP(t *testing.T) {
ctx := context.Background()
req := require.New(t)

publisherStore := &correctedMemStore{&memstore.Store{
Bag: make(map[string][]byte),
}}
publisherLsys := cidlink.DefaultLinkSystem()
publisherLsys.TrustedStorage = true
publisherLsys.SetReadStorage(publisherStore)
publisherLsys.SetWriteStorage(publisherStore)

privKey, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 256, rand.Reader)
req.NoError(err)

publisher, err := httpsync.NewPublisherHandler(publisherLsys, privKey)
req.NoError(err)

publisherStreamHost, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
req.NoError(err)

publisherHost, err := libp2phttp.New(
libp2phttp.StreamHost(publisherStreamHost),
libp2phttp.ListenAddrs([]multiaddr.Multiaddr{multiaddr.StringCast("/ip4/127.0.0.1/tcp/0/http")}),
)
req.NoError(err)

go publisherHost.Serve()
defer publisherHost.Close()
Comment on lines +127 to +134
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the "HTTP Host". It's like the libp2p "stream host" (aka core host.Host), but it uses HTTP semantics instead of stream semantics.

You can pass in options on creation like a stream host to do HTTP over libp2p streams, and multiaddrs to create listeners on.


protoID := protocol.ID("/ipni-sync/1")

serverStreamMa := publisherHost.Addrs()[0]
serverHTTPMa := publisherHost.Addrs()[1]
req.Contains(serverHTTPMa.String(), "/http")

publisherHost.SetHttpHandlerAtPath(protoID, "/ipni/", http.StripPrefix("/ipni/", publisher))
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is where we attach our request handler. Note that we are mounting the "/ipni-sync/1" protocol at /ipni/. libp2phttp manages this mapping and clients can learn about the mapping at .well-known/libp2p.

In this case we also want out HTTP handler to not even know about the prefix, so we use the stdlib http.StripPrefix.


link, err := publisherLsys.Store(
ipld.LinkContext{Ctx: ctx},
cidlink.LinkPrototype{
Prefix: cid.Prefix{
Version: 1,
Codec: uint64(multicodec.DagJson),
MhType: uint64(multicodec.Sha2_256),
MhLength: -1,
},
},
fluent.MustBuildMap(basicnode.Prototype.Map, 4, func(na fluent.MapAssembler) {
na.AssembleEntry("fish").AssignString("lobster")
na.AssembleEntry("fish1").AssignString("lobster1")
na.AssembleEntry("fish2").AssignString("lobster2")
na.AssembleEntry("fish0").AssignString("lobster0")
}))
req.NoError(err)
publisher.SetRoot(link.(cidlink.Link).Cid)

testCases := []struct {
name string
newClient func(t *testing.T) *http.Client
}{
{"HTTP transport", func(t *testing.T) *http.Client {
clientHost, err := libp2phttp.New()
req.NoError(err)

c, err := clientHost.NamespacedClient(protoID, peer.AddrInfo{Addrs: []multiaddr.Multiaddr{serverHTTPMa}})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This appears to mean that a separate NamespacedClient is needed for each separate server that a request is sent to. This is somewhat different than the semantics of the standard http.Client where the same client instance can be used to send requests to servers at different URLs. Is this correct?

This matters because with the structure of the httpsync code as it currently is, the sync is a long-lived object (created once) that contains the http.Client which is reused for separate Syncer requests. What will need to change is that a new NamespacedClient will need to be associated with each per-sync Syncer instance and not with the Sync.

Can NamespacedClient instances be closed and removed from their clientHost?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a quick note: Connections are managed by the underlying Roundtripper. NamespacedClients are a thin wrapper around an underlying RoundTripper that adds a prefix path to http requests. They don't change how connections are managed or cached.

This matters because with the structure of the httpsync code as it currently is, the sync is a long-lived object (created once) that contains the http.Client which is reused for separate Syncer requests. What will need to change is that a new NamespacedClient will need to be associated with each per-sync Syncer instance and not with the Sync.

Yep that makes sense to me. 👍

an NamespacedClient instances be closed and removed from their clientHost?

Not necessary to remove them from the clientHost (the client http host doesn't track them). You may optionally close the idle connections on them with client.CloseIdleConnections, but it's not necessary. The HTTP transport or stream host is responsible for managing/closing these connections. Clients are relatively cheap to make.

req.NoError(err)
return &c
}},
{"libp2p stream transport", func(t *testing.T) *http.Client {
clientStreamHost, err := libp2p.New(libp2p.NoListenAddrs)
req.NoError(err)
clientHost, err := libp2phttp.New(libp2phttp.StreamHost(clientStreamHost))
req.NoError(err)

c, err := clientHost.NamespacedClient(protoID, peer.AddrInfo{ID: publisherStreamHost.ID(), Addrs: []multiaddr.Multiaddr{serverStreamMa}})
req.NoError(err)

wk, err := clientHost.GetAndStorePeerProtoMap(c.Transport, publisherStreamHost.ID())
req.NoError(err)
// Assert we see the ipni protocol in the well known map
req.Contains(wk, protoID)

return &c
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing special here. Just plumbing to set up the test.

clientStore := &correctedMemStore{&memstore.Store{
Bag: make(map[string][]byte),
}}
clientLsys := cidlink.DefaultLinkSystem()
clientLsys.TrustedStorage = true
clientLsys.SetReadStorage(clientStore)
clientLsys.SetWriteStorage(clientStore)
clientSync := httpsync.NewSync(clientLsys, tc.newClient(t), nil)

clientSyncer, err := clientSync.NewSyncerWithoutAddrs(publisher.ID())
req.NoError(err)

headCid, err := clientSyncer.GetHead(ctx)
req.NoError(err)

req.Equal(link.(cidlink.Link).Cid, headCid)

clientSyncer.Sync(ctx, headCid, selectorparse.CommonSelector_MatchPoint)
require.NoError(t, err)

// Assert that data is loadable from the link system.
wantLink := cidlink.Link{Cid: headCid}
node, err := clientLsys.Load(ipld.LinkContext{Ctx: ctx}, wantLink, basicnode.Prototype.Any)
require.NoError(t, err)

// Assert synced node link matches the computed link, i.e. is spec-compliant.
gotLink, err := clientLsys.ComputeLink(wantLink.Prototype(), node)
require.NoError(t, err)
require.Equal(t, gotLink, wantLink, "computed %s but got %s", gotLink.String(), wantLink.String())
})
}
}

func mapKeys(t *testing.T, n ipld.Node) []string {
var keys []string
require.Equal(t, n.Kind(), datamodel.Kind_Map)
Expand Down
9 changes: 9 additions & 0 deletions dagsync/httpsync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ func (s *Sync) NewSyncer(peerID peer.ID, peerAddrs []multiaddr.Multiaddr) (*Sync
}, nil
}

func (s *Sync) NewSyncerWithoutAddrs(peerID peer.ID) (*Syncer, error) {
return &Syncer{
peerID: peerID,
rootURL: url.URL{Path: "/"},
urls: nil,
sync: s,
}, nil
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A Syncer instance is created for each sync operation and is responsible for making a request to a specific advertisement publisher. This means that each instance of aSyncer requires the addresses to connect to since these will not be known in advance.

I think instead of NewSyncerWithoutAddrs here, that NewSyncer should be modified to create a new clientHost.NamespacedClient if its parent Sync is using a libp2phttp client. The NamespacedClient can then be used as the http.Client in Syncer functions.

Will it be necessary to close each NamespacedClient instance and remove it from the clientHost and remove its peer info from the peerstore? Will that happen automatically after some time?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think instead of NewSyncerWithoutAddrs here, that NewSyncer should be modified to create a new clientHost.NamespacedClient if its parent Sync is using a libp2phttp client. The NamespacedClient can then be used as the http.Client in Syncer functions.

👍 . I thought about this change, but wanted to make a less invasive change to demo. I like it though.

Will it be necessary to close each NamespacedClient instance and remove it from the clientHost and remove its peer info from the peerstore? Will that happen automatically after some time?

Happens automatically :) (just like the stock http.Client does it)


func (s *Sync) Close() {
s.client.CloseIdleConnections()
}
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,5 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
)

replace github.com/libp2p/go-libp2p v0.29.0 => github.com/libp2p/go-libp2p v0.29.1-0.20230804182920-49d7db486c5e
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,8 @@ github.com/libp2p/go-cidranger v1.1.0 h1:ewPN8EZ0dd1LSnrtuwd4709PXVcITVeuwbag38y
github.com/libp2p/go-cidranger v1.1.0/go.mod h1:KWZTfSr+r9qEo9OkI9/SIEeAtw+NNoU0dXIXt15Okic=
github.com/libp2p/go-flow-metrics v0.1.0 h1:0iPhMI8PskQwzh57jB9WxIuIOQ0r+15PChFGkx3Q3WM=
github.com/libp2p/go-flow-metrics v0.1.0/go.mod h1:4Xi8MX8wj5aWNDAZttg6UPmc0ZrnFNsMtpsYUClFtro=
github.com/libp2p/go-libp2p v0.29.0 h1:QduJ2XQr/Crg4EnloueWDL0Jj86N3Ezhyyj7XH+XwHI=
github.com/libp2p/go-libp2p v0.29.0/go.mod h1:iNKL7mEnZ9wAss+03IjAwM9ZAQXfVUAPUUmOACQfQ/g=
github.com/libp2p/go-libp2p v0.29.1-0.20230804182920-49d7db486c5e h1:OGUuDNhPAEt58YoUW5fSSB1XeQB3k5OFPokuLc1KVeA=
github.com/libp2p/go-libp2p v0.29.1-0.20230804182920-49d7db486c5e/go.mod h1:iNKL7mEnZ9wAss+03IjAwM9ZAQXfVUAPUUmOACQfQ/g=
github.com/libp2p/go-libp2p-asn-util v0.3.0 h1:gMDcMyYiZKkocGXDQ5nsUQyquC9+H+iLEQHwOCZ7s8s=
github.com/libp2p/go-libp2p-asn-util v0.3.0/go.mod h1:B1mcOrKUE35Xq/ASTmQ4tN3LNzVVaMNmq2NACuqyB9w=
github.com/libp2p/go-libp2p-gostream v0.6.0 h1:QfAiWeQRce6pqnYfmIVWJFXNdDyfiR/qkCnjyaZUPYU=
Expand Down