Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

websocket: add reuseport support #2261

Closed
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
e7b4454
feat: added REUSEPORT support for websocket listeners #1435
chaitanyaprem Apr 26, 2023
445f860
Added support for using reuseport in connection Dialing #1435
chaitanyaprem Apr 26, 2023
4279f92
chore: cleaned-up the websocket reuse test
chaitanyaprem Apr 26, 2023
80b3b34
chore: addressed review comments for #2261
chaitanyaprem Apr 26, 2023
804ba36
chore: address linter error in websocket test script
chaitanyaprem Apr 27, 2023
f73c1ba
Merge branch 'master' into feat/websocket-reuseport
chaitanyaprem May 16, 2023
cc83e5a
chore: addressed review comments wrt test.
chaitanyaprem May 21, 2023
9c539a6
Merge branch 'libp2p:master' into feat/websocket-reuseport
chaitanyaprem May 21, 2023
153829a
chore: addressed review comments and cleanedup test script
chaitanyaprem May 23, 2023
07aae9b
Update websocket_test.go
chaitanyaprem May 26, 2023
ad59795
feat: use reuseport wihle dialing client connections if set
chaitanyaprem May 30, 2023
b170b43
Merge branch 'master' into feat/websocket-reuseport
chaitanyaprem Aug 4, 2023
863f51d
Merge branch 'master' into feat/websocket-reuseport
chaitanyaprem Aug 15, 2023
4cccef8
Merge branch 'libp2p:master' into feat/websocket-reuseport
chaitanyaprem Aug 16, 2023
4179d80
Merge branch 'libp2p:master' into feat/websocket-reuseport
chaitanyaprem Aug 17, 2023
8840f19
chore:comment lb check in reuseport test to avoid flaky results
chaitanyaprem Aug 17, 2023
f660226
chore: fix gofmt error
chaitanyaprem Aug 17, 2023
3fea95c
Merge branch 'master' into feat/websocket-reuseport
chaitanyaprem Sep 6, 2023
c01a725
chore: increase number of clients to avoid flakiness of reuseport test
chaitanyaprem Sep 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion p2p/transport/websocket/addrs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestConvertWebsocketMultiaddrToNetAddr(t *testing.T) {
}

func TestListeningOnDNSAddr(t *testing.T) {
ln, err := newListener(ma.StringCast("/dns/localhost/tcp/0/ws"), nil)
ln, err := newListener(ma.StringCast("/dns/localhost/tcp/0/ws"), nil, false)
require.NoError(t, err)
addr := ln.Multiaddr()
first, rest := ma.SplitFirst(addr)
Expand Down
21 changes: 16 additions & 5 deletions p2p/transport/websocket/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/url"

"github.com/libp2p/go-libp2p/core/transport"
"github.com/libp2p/go-reuseport"

ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
Expand Down Expand Up @@ -44,7 +45,8 @@ func (pwma *parsedWebsocketMultiaddr) toMultiaddr() ma.Multiaddr {

// newListener creates a new listener from a raw net.Listener.
// tlsConf may be nil (for unencrypted websockets).
func newListener(a ma.Multiaddr, tlsConf *tls.Config) (*listener, error) {
func newListener(a ma.Multiaddr, tlsConf *tls.Config, reuseportAvailable bool) (*listener, error) {
var nl net.Listener
parsed, err := parseWebsocketMultiaddr(a)
if err != nil {
return nil, err
Expand All @@ -58,11 +60,20 @@ func newListener(a ma.Multiaddr, tlsConf *tls.Config) (*listener, error) {
if err != nil {
return nil, err
}
nl, err := net.Listen(lnet, lnaddr)
if err != nil {
return nil, err
if reuseportAvailable {
nl, err = reuseport.Listen(lnet, lnaddr)
if err != nil {
Comment on lines +60 to +61
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to use the transport's reuseTransport field. Otherwise this will never reuse the port we're listening on when dialing. I guess you can pass them in to newListener.

Also add a test that we are reusing port when dialing.

nl, err = net.Listen(lnet, lnaddr)
if err != nil {
return nil, err
}
}
} else {
nl, err = net.Listen(lnet, lnaddr)
if err != nil {
return nil, err
}
}

laddr, err := manet.FromNetAddr(nl.Addr())
if err != nil {
return nil, err
Expand Down
28 changes: 26 additions & 2 deletions p2p/transport/websocket/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/transport"
"github.com/libp2p/go-reuseport"

reusetransport "github.com/libp2p/go-libp2p/p2p/net/reuseport"
ma "github.com/multiformats/go-multiaddr"
mafmt "github.com/multiformats/go-multiaddr-fmt"
manet "github.com/multiformats/go-multiaddr/net"
Expand Down Expand Up @@ -55,6 +57,13 @@ func init() {

type Option func(*WebsocketTransport) error

func EnableReuseport() Option {
return func(tr *WebsocketTransport) error {
tr.reuseport = true
return nil
}
}

// WithTLSClientConfig sets a TLS client configuration on the WebSocket Dialer. Only
// relevant for non-browser usages.
//
Expand Down Expand Up @@ -82,6 +91,8 @@ type WebsocketTransport struct {

tlsClientConf *tls.Config
tlsConf *tls.Config
reuseport bool //reuseport is disabled by default, can be enabled by passing it as an option.
reuse reusetransport.Transport
}

var _ transport.Transport = (*WebsocketTransport)(nil)
Expand All @@ -90,6 +101,7 @@ func New(u transport.Upgrader, rcmgr network.ResourceManager, opts ...Option) (*
if rcmgr == nil {
rcmgr = &network.NullResourceManager{}
}

t := &WebsocketTransport{
upgrader: u,
rcmgr: rcmgr,
Expand Down Expand Up @@ -188,7 +200,14 @@ func (t *WebsocketTransport) maDial(ctx context.Context, raddr ma.Multiaddr) (ma

transport := &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
conn, err := net.Dial(network, addr)
var conn manet.Conn
var err error
if t.UseReuseport() {
conn, err = t.reuse.DialContext(ctx, raddr)
} else {
var d manet.Dialer
conn, err = d.DialContext(ctx, raddr)
}
if err != nil {
close(localAddrChan)
return nil, err
Expand Down Expand Up @@ -267,7 +286,7 @@ func (t *WebsocketTransport) maDial(ctx context.Context, raddr ma.Multiaddr) (ma
}

func (t *WebsocketTransport) maListen(a ma.Multiaddr) (manet.Listener, error) {
l, err := newListener(a, t.tlsConf)
l, err := newListener(a, t.tlsConf, t.reuseport)
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
Expand All @@ -282,3 +301,8 @@ func (t *WebsocketTransport) Listen(a ma.Multiaddr) (transport.Listener, error)
}
return &transportListener{Listener: t.upgrader.UpgradeListener(t, malist)}, nil
}

// UseReuseport returns true if reuseport is enabled and available.
func (t *WebsocketTransport) UseReuseport() bool {
return t.reuseport && reuseport.Available()
}
68 changes: 68 additions & 0 deletions p2p/transport/websocket/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,3 +548,71 @@ func TestResolveMultiaddr(t *testing.T) {
})
}
}

func TestListenerResusePort(t *testing.T) {
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
laddr := ma.StringCast("/ip4/127.0.0.1/tcp/5002/ws")
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved

var opts []Option
opts = append(opts, EnableReuseport())
_, u := newUpgrader(t)
tpt, err := New(u, &network.NullResourceManager{}, opts...)
require.NoError(t, err)
c := make(chan int)

for i := 0; i < 2; i++ {
go func(index int, ch chan int) {
l, err := tpt.maListen(laddr)
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
fmt.Println("Failed to listen on websocket due to error ", err)
}
require.NoError(t, err)
require.Equal(t, lastComponent(t, l.Multiaddr()), wsComponent)
defer l.Close()
c <- index
/* Looping 4 times to ensure all 4 connections are handled.
Noticed that sometimes the distribution of connections was
not clearly load-balanced leading to more than 2 being delivered to 1 listener. */
for j := 0; j < 4; j++ {
conn, err := l.Accept()
if err != nil {
fmt.Println("Routine-", index, " Failed accepting connection due to error ", err)
}
require.NoError(t, err)
defer conn.Close()
buf := make([]byte, 6)
n, err := conn.Read(buf)
if n != 6 {
t.Errorf("read %d bytes, expected 2", n)
}
require.NoError(t, err)
c <- j
}
}(i, c)
}
for i := 0; i < 2; i++ {
<-c
}
for i := 0; i < 4; i++ {
go func(index int, ch chan int) {
c, err := tpt.maDial(context.Background(), laddr)
if err != nil {
t.Error(err)
return
}
require.NoError(t, err)
defer c.Close()
msg := fmt.Sprintf("Hello%d", index)
n, err := c.Write([]byte(msg))
if n != 6 {
t.Errorf("expected to write 0 bytes, wrote %d", n)
}
if err != nil {
t.Error(err)
return
}
}(i, c)
}
for i := 0; i < 4; i++ {
<-c
}
}