Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

websocket: add reuseport support #2261

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
e7b4454
feat: added REUSEPORT support for websocket listeners #1435
chaitanyaprem Apr 26, 2023
445f860
Added support for using reuseport in connection Dialing #1435
chaitanyaprem Apr 26, 2023
4279f92
chore: cleaned-up the websocket reuse test
chaitanyaprem Apr 26, 2023
80b3b34
chore: addressed review comments for #2261
chaitanyaprem Apr 26, 2023
804ba36
chore: address linter error in websocket test script
chaitanyaprem Apr 27, 2023
f73c1ba
Merge branch 'master' into feat/websocket-reuseport
chaitanyaprem May 16, 2023
cc83e5a
chore: addressed review comments wrt test.
chaitanyaprem May 21, 2023
9c539a6
Merge branch 'libp2p:master' into feat/websocket-reuseport
chaitanyaprem May 21, 2023
153829a
chore: addressed review comments and cleanedup test script
chaitanyaprem May 23, 2023
07aae9b
Update websocket_test.go
chaitanyaprem May 26, 2023
ad59795
feat: use reuseport wihle dialing client connections if set
chaitanyaprem May 30, 2023
b170b43
Merge branch 'master' into feat/websocket-reuseport
chaitanyaprem Aug 4, 2023
863f51d
Merge branch 'master' into feat/websocket-reuseport
chaitanyaprem Aug 15, 2023
4cccef8
Merge branch 'libp2p:master' into feat/websocket-reuseport
chaitanyaprem Aug 16, 2023
4179d80
Merge branch 'libp2p:master' into feat/websocket-reuseport
chaitanyaprem Aug 17, 2023
8840f19
chore:comment lb check in reuseport test to avoid flaky results
chaitanyaprem Aug 17, 2023
f660226
chore: fix gofmt error
chaitanyaprem Aug 17, 2023
3fea95c
Merge branch 'master' into feat/websocket-reuseport
chaitanyaprem Sep 6, 2023
c01a725
chore: increase number of clients to avoid flakiness of reuseport test
chaitanyaprem Sep 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion p2p/transport/websocket/addrs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestConvertWebsocketMultiaddrToNetAddr(t *testing.T) {
}

func TestListeningOnDNSAddr(t *testing.T) {
ln, err := newListener(ma.StringCast("/dns/localhost/tcp/0/ws"), nil)
ln, err := newListener(ma.StringCast("/dns/localhost/tcp/0/ws"), nil, false)
require.NoError(t, err)
addr := ln.Multiaddr()
first, rest := ma.SplitFirst(addr)
Expand Down
21 changes: 16 additions & 5 deletions p2p/transport/websocket/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"

"github.com/libp2p/go-libp2p/core/transport"
"github.com/libp2p/go-reuseport"

ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
Expand Down Expand Up @@ -40,7 +41,8 @@ func (pwma *parsedWebsocketMultiaddr) toMultiaddr() ma.Multiaddr {

// newListener creates a new listener from a raw net.Listener.
// tlsConf may be nil (for unencrypted websockets).
func newListener(a ma.Multiaddr, tlsConf *tls.Config) (*listener, error) {
func newListener(a ma.Multiaddr, tlsConf *tls.Config, reuseportAvailable bool) (*listener, error) {
var nl net.Listener
parsed, err := parseWebsocketMultiaddr(a)
if err != nil {
return nil, err
Expand All @@ -54,11 +56,20 @@ func newListener(a ma.Multiaddr, tlsConf *tls.Config) (*listener, error) {
if err != nil {
return nil, err
}
nl, err := net.Listen(lnet, lnaddr)
if err != nil {
return nil, err
if reuseportAvailable {
nl, err = reuseport.Listen(lnet, lnaddr)
if err != nil {
Comment on lines +60 to +61
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to use the transport's reuseTransport field. Otherwise this will never reuse the port we're listening on when dialing. I guess you can pass them in to newListener.

Also add a test that we are reusing port when dialing.

nl, err = net.Listen(lnet, lnaddr)
if err != nil {
return nil, err
}
}
} else {
nl, err = net.Listen(lnet, lnaddr)
if err != nil {
return nil, err
}
}

laddr, err := manet.FromNetAddr(nl.Addr())
if err != nil {
return nil, err
Expand Down
32 changes: 27 additions & 5 deletions p2p/transport/websocket/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/transport"
"github.com/libp2p/go-reuseport"

ws "github.com/gorilla/websocket"
reuseTransport "github.com/libp2p/go-libp2p/p2p/net/reuseport"
ma "github.com/multiformats/go-multiaddr"
mafmt "github.com/multiformats/go-multiaddr-fmt"
manet "github.com/multiformats/go-multiaddr/net"

ws "github.com/gorilla/websocket"
)

// WsFmt is multiaddr formatter for WsProtocol
Expand Down Expand Up @@ -60,6 +61,13 @@ var upgrader = ws.Upgrader{

type Option func(*WebsocketTransport) error

func EnableReuseport() Option {
return func(tr *WebsocketTransport) error {
tr.reuseport = true
return nil
}
}

// WithTLSClientConfig sets a TLS client configuration on the WebSocket Dialer. Only
// relevant for non-browser usages.
//
Expand All @@ -85,8 +93,10 @@ type WebsocketTransport struct {
upgrader transport.Upgrader
rcmgr network.ResourceManager

tlsClientConf *tls.Config
tlsConf *tls.Config
tlsClientConf *tls.Config
tlsConf *tls.Config
reuseport bool //reuseport is disabled by default, can be enabled by passing it as an option.
reuseTransport reuseTransport.Transport
}

var _ transport.Transport = (*WebsocketTransport)(nil)
Expand All @@ -95,6 +105,7 @@ func New(u transport.Upgrader, rcmgr network.ResourceManager, opts ...Option) (*
if rcmgr == nil {
rcmgr = &network.NullResourceManager{}
}

t := &WebsocketTransport{
upgrader: u,
rcmgr: rcmgr,
Expand Down Expand Up @@ -187,7 +198,13 @@ func (t *WebsocketTransport) maDial(ctx context.Context, raddr ma.Multiaddr) (ma
return nil, err
}
isWss := wsurl.Scheme == "wss"

dialer := ws.Dialer{HandshakeTimeout: 30 * time.Second}
if t.UseReuseport() {
dialer.NetDial = func(network, address string) (net.Conn, error) {
return t.reuseTransport.DialContext(ctx, raddr)
}
}
if isWss {
sni := ""
sni, err = raddr.ValueForProtocol(ma.P_SNI)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Expand Down Expand Up @@ -229,7 +246,7 @@ func (t *WebsocketTransport) maDial(ctx context.Context, raddr ma.Multiaddr) (ma
}

func (t *WebsocketTransport) maListen(a ma.Multiaddr) (manet.Listener, error) {
l, err := newListener(a, t.tlsConf)
l, err := newListener(a, t.tlsConf, t.UseReuseport())
if err != nil {
return nil, err
}
Expand All @@ -244,3 +261,8 @@ func (t *WebsocketTransport) Listen(a ma.Multiaddr) (transport.Listener, error)
}
return &transportListener{Listener: t.upgrader.UpgradeListener(t, malist)}, nil
}

// UseReuseport returns true if reuseport is enabled and available.
func (t *WebsocketTransport) UseReuseport() bool {
return t.reuseport && reuseport.Available()
}
101 changes: 101 additions & 0 deletions p2p/transport/websocket/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"math/big"
"net"
"net/http"
"runtime"
"strings"
"testing"
"time"
Expand All @@ -32,6 +33,7 @@ import (
ttransport "github.com/libp2p/go-libp2p/p2p/transport/testsuite"

ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -548,3 +550,102 @@ func TestResolveMultiaddr(t *testing.T) {
})
}
}

func startListeners(t *testing.T, tpt *WebsocketTransport) (ma.Multiaddr, []*manet.Listener, error) {
t.Helper()
laddr := ma.StringCast("/ip4/127.0.0.1/tcp/0/ws")
listeners := make([]*manet.Listener, 2)

l, err := tpt.maListen(laddr)
require.NoError(t, err)
listeners[0] = &l

port := l.Addr().(*net.TCPAddr).Port
t.Logf("Port allocated for listener: %d", port)
laddr = ma.StringCast(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d/ws", port))
l1, err := tpt.maListen(laddr)
require.NoError(t, err)
listeners[1] = &l1
return laddr, listeners, nil
}

func TestListenerReusePort(t *testing.T) {
noOfClientConns := 20
var opts []Option
opts = append(opts, EnableReuseport())
_, u := newUpgrader(t)
tpt, err := New(u, &network.NullResourceManager{}, opts...)
require.NoError(t, err)
c := make(chan int, noOfClientConns)
raddr, listeners, err := startListeners(t, tpt)
require.NoErrorf(t, err, "failed to start listeners")
for i := 0; i < 2; i++ {
go func(index int, ln *manet.Listener, ch chan int) {
l := *ln
defer l.Close()
/* Looping noOfClientConns times to ensure all 4 connections are handled.
With SO_REUSEPORT the distribution happens based on threads that are waiting on Accept call as mentioned below.
We cannot gaurantee when Server go-routines would block on Accept.
Sometimes the client routines get scheduled first causing unequal distribution of connections.
Ref: https://lwn.net/Articles/542629/
By contrast, the SO_REUSEPORT implementation distributes connections evenly across all of the threads (or processes)
that are blocked in accept() on the same port. */
for j := 0; j < noOfClientConns; j++ {
//j := 0
conn, err := l.Accept()
require.NoErrorf(t, err, "Server Routine-", index, " Failed accepting connection ", j, " due to error ")
defer conn.Close()
buf := make([]byte, 5)
n, err := conn.Read(buf)
require.NoError(t, err)
require.Equal(t, 5, n)
n, err = conn.Write(buf)
require.NoError(t, err)
require.Equal(t, 5, n)
ch <- index
}

}(i, listeners[i], c)
}

for i := 0; i < noOfClientConns; i++ {
go func(index int) {
conn, err := tpt.maDial(context.Background(), raddr)
require.NoError(t, err)
defer conn.Close()
msg := "Hello"
n, err := conn.Write([]byte(msg))
require.Equal(t, 5, n)
require.NoError(t, err)
buf := make([]byte, 5)
n, err = conn.Read(buf)
require.NoError(t, err)
require.Equal(t, 5, n)
}(i)
}
var connsHandled [2]int
//Waiting to ensure all 4 connections are handled.
for i := 0; i < noOfClientConns; i++ {
temp := <-c
connsHandled[temp]++
}
/*
For windows and macOS load balancing is not done by kernel as per references below.
For other architectures, behaviour is not known.
For ubuntu load balancing doesn't seem to happen consistently.
In order to not have a flaky test, commenting this additiona check.
Hence, Check for load balancing only for linux based architectures.
Refer https://learn.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse?redirectedfrom=MSDN for windows
References for MACOS
Ref - (https://stackoverflow.com/questions/41247790/so-reuseport-on-macos-with-libuv)
Ref -(https://github.com/uNetworking/uWebSockets/issues/1194)
*/
if runtime.GOOS == "linux" {
for i := 0; i < 2; i++ {
/*Not checking for equal distribution of connections due to above explanation.*/
require.NotEqualf(t, 0, connsHandled[i], "No connections handled by listener %d", i)
t.Logf("Listener %d handled %d connections.", i, connsHandled[i])
}
}

}