Skip to content

Commit

Permalink
swarm: integrate webrtc dialing
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Sep 25, 2023
1 parent 9baf4ad commit 5472eb9
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 14 deletions.
7 changes: 7 additions & 0 deletions core/network/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ func WithForceDirectDial(ctx context.Context, reason string) context.Context {
return context.WithValue(ctx, forceDirectDial, reason)
}

// WithoutForceDirectDial constructs a new context with the ForceDirectDial option dropped.
// This is useful in case establishing a direct connection first requires establishing a
// relayed connection e.g. dialing /webrtc addresses.
func WithoutForceDirectDial(ctx context.Context) context.Context {
return context.WithValue(ctx, forceDirectDial, nil)
}

// EXPERIMENTAL
// GetForceDirectDial returns true if the force direct dial option is set in the context.
func GetForceDirectDial(ctx context.Context) (forceDirect bool, reason string) {
Expand Down
6 changes: 5 additions & 1 deletion p2p/net/swarm/dial_ranker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ func NoDelayDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
// no additional latency in the vast majority of cases.
//
// Private and public address groups are dialed in parallel.
//
// Dialing relay addresses is delayed by 500 ms, if we have any non-relay alternatives.
// We treat webrtc addresses the same as relay addresses as we need a relay connection to establish a
// webrtc connection. So any available direct addresses are preferred over webrtc addresses.
//
// Within each group (private, public, relay addresses) we apply the following ranking logic:
//
Expand All @@ -61,7 +64,8 @@ func NoDelayDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
//
// We dial lowest ports first for QUIC addresses as they are more likely to be the listen port.
func DefaultDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
relay, addrs := filterAddrs(addrs, isRelayAddr)
// includes /webrtc addresses too
relay, addrs := filterAddrs(addrs, func(a ma.Multiaddr) bool { return isProtocolAddr(a, ma.P_CIRCUIT) })
pvt, addrs := filterAddrs(addrs, manet.IsPrivateAddr)
public, addrs := filterAddrs(addrs, func(a ma.Multiaddr) bool { return isProtocolAddr(a, ma.P_IP4) || isProtocolAddr(a, ma.P_IP6) })

Expand Down
5 changes: 0 additions & 5 deletions p2p/net/swarm/swarm_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,11 +591,6 @@ func isFdConsumingAddr(addr ma.Multiaddr) bool {
return err1 == nil || err2 == nil
}

func isRelayAddr(addr ma.Multiaddr) bool {
_, err := addr.ValueForProtocol(ma.P_CIRCUIT)
return err == nil
}

// filterLowPriorityAddresses removes addresses inplace for which we have a better alternative
// 1. If a /quic-v1 address is present, filter out /quic and /webtransport address on the same 2-tuple:
// QUIC v1 is preferred over the deprecated QUIC draft-29, and given the choice, we prefer using
Expand Down
5 changes: 4 additions & 1 deletion p2p/net/swarm/swarm_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ func (s *Swarm) TransportForDialing(a ma.Multiaddr) transport.Transport {
}
return nil
}
if isRelayAddr(a) {
if isProtocolAddr(a, ma.P_WEBRTC) {
return s.transports.m[ma.P_WEBRTC]
}
if isProtocolAddr(a, ma.P_CIRCUIT) {
return s.transports.m[ma.P_CIRCUIT]
}
for _, t := range s.transports.m {
Expand Down
80 changes: 80 additions & 0 deletions p2p/test/swarm/swarm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
libp2pwebrtcprivate "github.com/libp2p/go-libp2p/p2p/transport/webrtcprivate"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -68,3 +70,81 @@ func TestDialPeerTransientConnection(t *testing.T) {
require.Error(t, err)
require.Nil(t, conn)
}

func TestDialPeerWebRTC(t *testing.T) {
h1, err := libp2p.New(
libp2p.NoListenAddrs,
libp2p.EnableRelay(),
)
require.NoError(t, err)

h2, err := libp2p.New(
libp2p.NoListenAddrs,
libp2p.EnableRelay(),
)
require.NoError(t, err)

relay1, err := libp2p.New()
require.NoError(t, err)

_, err = relay.New(relay1)
require.NoError(t, err)

relay1info := peer.AddrInfo{
ID: relay1.ID(),
Addrs: relay1.Addrs(),
}

err = h2.Connect(context.Background(), relay1info)
require.NoError(t, err)

_, err = client.Reserve(context.Background(), h2, relay1info)
require.NoError(t, err)

_, err = libp2pwebrtcprivate.AddTransport(h1, nil)
require.NoError(t, err)
_, err = libp2pwebrtcprivate.AddTransport(h2, nil)
require.NoError(t, err)

webrtcAddr := ma.StringCast(relay1info.Addrs[0].String() + "/p2p/" + relay1info.ID.String() + "/p2p-circuit/webrtc/p2p/" + h2.ID().String())
relayAddrs := ma.StringCast(relay1info.Addrs[0].String() + "/p2p/" + relay1info.ID.String() + "/p2p-circuit/p2p/" + h2.ID().String())

h1.Peerstore().AddAddrs(h2.ID(), []ma.Multiaddr{webrtcAddr, relayAddrs}, peerstore.TempAddrTTL)

// swarm.DialPeer should connect over transient connections
conn1, err := h1.Network().DialPeer(context.Background(), h2.ID())
require.NoError(t, err)
require.NotNil(t, conn1)
require.Condition(t, func() bool {
_, err1 := conn1.RemoteMultiaddr().ValueForProtocol(ma.P_CIRCUIT)
_, err2 := conn1.RemoteMultiaddr().ValueForProtocol(ma.P_WEBRTC)
return err1 == nil && err2 != nil
})

// should connect to webrtc address
ctx := network.WithForceDirectDial(context.Background(), "test")
conn, err := h1.Network().DialPeer(ctx, h2.ID())
require.NoError(t, err)
require.NotNil(t, conn)
require.Condition(t, func() bool {
_, err1 := conn.RemoteMultiaddr().ValueForProtocol(ma.P_CIRCUIT)
_, err2 := conn.RemoteMultiaddr().ValueForProtocol(ma.P_WEBRTC)
return err1 != nil && err2 == nil
})

done := make(chan struct{})
h2.SetStreamHandler("test-addr", func(s network.Stream) {
s.Conn().LocalMultiaddr()
_, err1 := conn.RemoteMultiaddr().ValueForProtocol(ma.P_CIRCUIT)
assert.Error(t, err1)
_, err2 := conn.RemoteMultiaddr().ValueForProtocol(ma.P_WEBRTC)
assert.NoError(t, err2)
s.Reset()
close(done)
})

s, err := h1.NewStream(context.Background(), h2.ID(), "test-addr")
require.NoError(t, err)
s.Write([]byte("test"))
<-done
}
14 changes: 9 additions & 5 deletions p2p/transport/webrtcprivate/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ type transport struct {

var _ tpt.Transport = &transport{}

func AddTransport(h host.Host) (*transport, error) {
func AddTransport(h host.Host, gater connmgr.ConnectionGater) (*transport, error) {
n, ok := h.Network().(tpt.TransportNetwork)
if !ok {
return nil, fmt.Errorf("%v is not a transport network", h.Network())
}

t, err := newTransport(h)
t, err := newTransport(h, gater)
if err != nil {
return nil, err
}
Expand All @@ -82,7 +82,7 @@ func AddTransport(h host.Host) (*transport, error) {
return t, nil
}

func newTransport(h host.Host) (*transport, error) {
func newTransport(h host.Host, gater connmgr.ConnectionGater) (*transport, error) {
// We use elliptic P-256 since it is widely supported by browsers.
//
// Implementation note: Testing with the browser,
Expand Down Expand Up @@ -111,6 +111,7 @@ func newTransport(h host.Host) (*transport, error) {
rcmgr: h.Network().ResourceManager(),
webrtcConfig: config,
maxInFlightConnections: 16,
gater: gater,
}, nil
}

Expand All @@ -136,6 +137,11 @@ func (t *transport) CanDial(addr ma.Multiaddr) bool {
func (t *transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tpt.CapableConn, error) {
// Connect to the peer on the circuit address
relayAddr := getRelayAddr(raddr)
// We drop the ForceDirectDial option as we need a relayed connection before we can
// setup a direct connection
ctx = network.WithoutForceDirectDial(ctx)
// We need this for the signaling stream
ctx = network.WithUseTransient(ctx, "webrtcprivate dial")
err := t.host.Connect(ctx, peer.AddrInfo{ID: p, Addrs: []ma.Multiaddr{relayAddr}})
if err != nil {
return nil, fmt.Errorf("failed to open %s stream: %w", SignalingProtocol, err)
Expand All @@ -160,8 +166,6 @@ func (t *transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tp
}

func (t *transport) dialWithScope(ctx context.Context, p peer.ID, scope network.ConnManagementScope) (tpt.CapableConn, error) {
// Start signaling protocol stream
ctx = network.WithUseTransient(ctx, "webrtcprivate dial")
s, err := t.host.NewStream(ctx, p, SignalingProtocol)
if err != nil {
return nil, fmt.Errorf("error opening stream %s: %w", SignalingProtocol, err)
Expand Down
4 changes: 2 additions & 2 deletions p2p/transport/webrtcprivate/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func newWebRTCHost(t *testing.T) *webrtcHost {
upg := swarmt.GenUpgrader(t, as, nil)
err := client.AddTransport(a, upg)
require.NoError(t, err)
ta, err := newTransport(a)
ta, err := newTransport(a, nil)
require.NoError(t, err)
return &webrtcHost{
Host: a,
Expand All @@ -68,7 +68,7 @@ func newRelayedHost(t *testing.T) *relayedHost {
client.AddTransport(p, upg)
_, err = client.Reserve(context.Background(), p, peer.AddrInfo{ID: rh.ID(), Addrs: rh.Addrs()})
require.NoError(t, err)
tp, err := newTransport(p)
tp, err := newTransport(p, nil)
require.NoError(t, err)
return &relayedHost{
webrtcHost: webrtcHost{
Expand Down

0 comments on commit 5472eb9

Please sign in to comment.