From e7b44547e8d6a5674cd77d5d92aed8e3e22b941b Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Wed, 26 Apr 2023 11:08:59 +0530 Subject: [PATCH 01/12] feat: added REUSEPORT support for websocket listeners #1435 --- p2p/transport/websocket/addrs_test.go | 2 +- p2p/transport/websocket/listener.go | 21 ++++++--- p2p/transport/websocket/websocket.go | 17 +++++++- p2p/transport/websocket/websocket_test.go | 52 +++++++++++++++++++++++ 4 files changed, 85 insertions(+), 7 deletions(-) diff --git a/p2p/transport/websocket/addrs_test.go b/p2p/transport/websocket/addrs_test.go index 33a4839d06..436fe6fd34 100644 --- a/p2p/transport/websocket/addrs_test.go +++ b/p2p/transport/websocket/addrs_test.go @@ -74,7 +74,7 @@ func TestConvertWebsocketMultiaddrToNetAddr(t *testing.T) { } func TestListeningOnDNSAddr(t *testing.T) { - ln, err := newListener(ma.StringCast("/dns/localhost/tcp/0/ws"), nil) + ln, err := newListener(ma.StringCast("/dns/localhost/tcp/0/ws"), nil, false) require.NoError(t, err) addr := ln.Multiaddr() first, rest := ma.SplitFirst(addr) diff --git a/p2p/transport/websocket/listener.go b/p2p/transport/websocket/listener.go index ab9a73f8ab..e14b7dc97e 100644 --- a/p2p/transport/websocket/listener.go +++ b/p2p/transport/websocket/listener.go @@ -10,6 +10,7 @@ import ( "net/url" "github.com/libp2p/go-libp2p/core/transport" + "github.com/libp2p/go-reuseport" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" @@ -44,7 +45,8 @@ func (pwma *parsedWebsocketMultiaddr) toMultiaddr() ma.Multiaddr { // newListener creates a new listener from a raw net.Listener. // tlsConf may be nil (for unencrypted websockets). -func newListener(a ma.Multiaddr, tlsConf *tls.Config) (*listener, error) { +func newListener(a ma.Multiaddr, tlsConf *tls.Config, reuseportAvailable bool) (*listener, error) { + var nl net.Listener parsed, err := parseWebsocketMultiaddr(a) if err != nil { return nil, err @@ -58,11 +60,20 @@ func newListener(a ma.Multiaddr, tlsConf *tls.Config) (*listener, error) { if err != nil { return nil, err } - nl, err := net.Listen(lnet, lnaddr) - if err != nil { - return nil, err + if reuseportAvailable { + nl, err = reuseport.Listen(lnet, lnaddr) + if err != nil { + nl, err = net.Listen(lnet, lnaddr) + if err != nil { + return nil, err + } + } + } else { + nl, err = net.Listen(lnet, lnaddr) + if err != nil { + return nil, err + } } - laddr, err := manet.FromNetAddr(nl.Addr()) if err != nil { return nil, err diff --git a/p2p/transport/websocket/websocket.go b/p2p/transport/websocket/websocket.go index a78add9782..554329af68 100644 --- a/p2p/transport/websocket/websocket.go +++ b/p2p/transport/websocket/websocket.go @@ -14,6 +14,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/transport" + "github.com/libp2p/go-reuseport" ma "github.com/multiformats/go-multiaddr" mafmt "github.com/multiformats/go-multiaddr-fmt" @@ -55,6 +56,13 @@ func init() { type Option func(*WebsocketTransport) error +func EnableReuseport() Option { + return func(tr *WebsocketTransport) error { + tr.reuseport = true + return nil + } +} + // WithTLSClientConfig sets a TLS client configuration on the WebSocket Dialer. Only // relevant for non-browser usages. // @@ -82,6 +90,7 @@ type WebsocketTransport struct { tlsClientConf *tls.Config tlsConf *tls.Config + reuseport bool //reuseport is disabled by default, can be enabled by passing it as an option. } var _ transport.Transport = (*WebsocketTransport)(nil) @@ -90,6 +99,7 @@ func New(u transport.Upgrader, rcmgr network.ResourceManager, opts ...Option) (* if rcmgr == nil { rcmgr = &network.NullResourceManager{} } + t := &WebsocketTransport{ upgrader: u, rcmgr: rcmgr, @@ -267,7 +277,7 @@ func (t *WebsocketTransport) maDial(ctx context.Context, raddr ma.Multiaddr) (ma } func (t *WebsocketTransport) maListen(a ma.Multiaddr) (manet.Listener, error) { - l, err := newListener(a, t.tlsConf) + l, err := newListener(a, t.tlsConf, t.reuseport) if err != nil { return nil, err } @@ -282,3 +292,8 @@ func (t *WebsocketTransport) Listen(a ma.Multiaddr) (transport.Listener, error) } return &transportListener{Listener: t.upgrader.UpgradeListener(t, malist)}, nil } + +// UseReuseport returns true if reuseport is enabled and available. +func (t *WebsocketTransport) UseReuseport() bool { + return t.reuseport && reuseport.Available() +} diff --git a/p2p/transport/websocket/websocket_test.go b/p2p/transport/websocket/websocket_test.go index 10878397cf..6846015d26 100644 --- a/p2p/transport/websocket/websocket_test.go +++ b/p2p/transport/websocket/websocket_test.go @@ -16,6 +16,7 @@ import ( "net" "net/http" "strings" + "sync" "testing" "time" @@ -548,3 +549,54 @@ func TestResolveMultiaddr(t *testing.T) { }) } } + +func TestListenerResusePort(t *testing.T) { + laddr := ma.StringCast("/ip4/127.0.0.1/tcp/5002/ws") + fmt.Println("Starting Reuse Port test.") + var wg sync.WaitGroup + var opts []Option + opts = append(opts, EnableReuseport()) + _, u := newUpgrader(t) + tpt, err := New(u, &network.NullResourceManager{}, opts...) + require.NoError(t, err) + fmt.Println("Invoking Go routines.") + for i := 0; i < 2; i++ { + wg.Add(1) + go func(index int) { + l, err := tpt.Listen(laddr) + if err != nil { + fmt.Println("Failed to listen on websocket due to error ", err) + } + require.NoError(t, err) + require.Equal(t, lastComponent(t, l.Multiaddr()), wsComponent) + defer l.Close() + fmt.Println("Routine-", index, " Calling Accept...") + for j := 0; j < 2; j++ { + conn, err := l.Accept() + if err != nil { + fmt.Println("Routine-", index, " Failed accepting connection due to error ", err) + } + //require.NoError(t, err) + fmt.Println("Routine-", index, " Accepting connection ", conn) + defer conn.Close() + } + }(i) + } + time.Sleep(2 * time.Second) + fmt.Println("Invoking Connector Go routines.") + + for i := 0; i < 4; i++ { + go func(index int) { + fmt.Println("Routine-", index, " Initiating connection ") + c, err := tpt.maDial(context.Background(), laddr) + if err != nil { + t.Error(err) + return + } + defer c.Close() + fmt.Println("Sleeping for 10 seconds after connection intiation") + time.Sleep(10 * time.Second) + }(i) + } + wg.Wait() +} From 445f860ba32d4cdb7cd53267914b316ed80a7191 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Wed, 26 Apr 2023 12:41:00 +0530 Subject: [PATCH 02/12] Added support for using reuseport in connection Dialing #1435 --- p2p/transport/websocket/websocket.go | 11 ++++++- p2p/transport/websocket/websocket_test.go | 39 ++++++++++++++++------- 2 files changed, 38 insertions(+), 12 deletions(-) diff --git a/p2p/transport/websocket/websocket.go b/p2p/transport/websocket/websocket.go index 554329af68..8f793fbb75 100644 --- a/p2p/transport/websocket/websocket.go +++ b/p2p/transport/websocket/websocket.go @@ -16,6 +16,7 @@ import ( "github.com/libp2p/go-libp2p/core/transport" "github.com/libp2p/go-reuseport" + reusetransport "github.com/libp2p/go-libp2p/p2p/net/reuseport" ma "github.com/multiformats/go-multiaddr" mafmt "github.com/multiformats/go-multiaddr-fmt" manet "github.com/multiformats/go-multiaddr/net" @@ -91,6 +92,7 @@ type WebsocketTransport struct { tlsClientConf *tls.Config tlsConf *tls.Config reuseport bool //reuseport is disabled by default, can be enabled by passing it as an option. + reuse reusetransport.Transport } var _ transport.Transport = (*WebsocketTransport)(nil) @@ -198,7 +200,14 @@ func (t *WebsocketTransport) maDial(ctx context.Context, raddr ma.Multiaddr) (ma transport := &http.Transport{ DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { - conn, err := net.Dial(network, addr) + var conn manet.Conn + var err error + if t.UseReuseport() { + conn, err = t.reuse.Dial(raddr) + } else { + var d manet.Dialer + conn, err = d.Dial(raddr) + } if err != nil { close(localAddrChan) return nil, err diff --git a/p2p/transport/websocket/websocket_test.go b/p2p/transport/websocket/websocket_test.go index 6846015d26..9de7e3e530 100644 --- a/p2p/transport/websocket/websocket_test.go +++ b/p2p/transport/websocket/websocket_test.go @@ -552,51 +552,68 @@ func TestResolveMultiaddr(t *testing.T) { func TestListenerResusePort(t *testing.T) { laddr := ma.StringCast("/ip4/127.0.0.1/tcp/5002/ws") - fmt.Println("Starting Reuse Port test.") + //fmt.Println("Starting Reuse Port test.") var wg sync.WaitGroup var opts []Option opts = append(opts, EnableReuseport()) _, u := newUpgrader(t) tpt, err := New(u, &network.NullResourceManager{}, opts...) require.NoError(t, err) - fmt.Println("Invoking Go routines.") + //fmt.Println("Invoking Go routines.") for i := 0; i < 2; i++ { wg.Add(1) go func(index int) { - l, err := tpt.Listen(laddr) + l, err := tpt.maListen(laddr) if err != nil { fmt.Println("Failed to listen on websocket due to error ", err) } require.NoError(t, err) require.Equal(t, lastComponent(t, l.Multiaddr()), wsComponent) defer l.Close() - fmt.Println("Routine-", index, " Calling Accept...") + //fmt.Println("Routine-", index, " Calling Accept...") for j := 0; j < 2; j++ { conn, err := l.Accept() if err != nil { fmt.Println("Routine-", index, " Failed accepting connection due to error ", err) } - //require.NoError(t, err) - fmt.Println("Routine-", index, " Accepting connection ", conn) + require.NoError(t, err) + //fmt.Println("Routine-", index, " Accepting connection ", conn) defer conn.Close() + buf := make([]byte, 6) + n, err := conn.Read(buf) + if n != 6 { + t.Errorf("read %d bytes, expected 2", n) + } + require.NoError(t, err) + fmt.Println("Read bytes:", buf) } }(i) } time.Sleep(2 * time.Second) - fmt.Println("Invoking Connector Go routines.") + //fmt.Println("Invoking Connector Go routines.") for i := 0; i < 4; i++ { go func(index int) { - fmt.Println("Routine-", index, " Initiating connection ") + //fmt.Println("Routine-", index, " Initiating connection ") c, err := tpt.maDial(context.Background(), laddr) if err != nil { t.Error(err) return } + require.NoError(t, err) defer c.Close() - fmt.Println("Sleeping for 10 seconds after connection intiation") - time.Sleep(10 * time.Second) + //fmt.Println("Sleeping for 2 seconds after connection intiation") + msg := fmt.Sprintf("Hello%d", index) + n, err := c.Write([]byte(msg)) + if n != 6 { + t.Errorf("expected to write 0 bytes, wrote %d", n) + } + if err != nil { + t.Error(err) + return + } + time.Sleep(2 * time.Second) }(i) } - wg.Wait() + time.Sleep(2 * time.Second) } From 4279f926071db790f632c5850c54c12bf38f8f88 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Wed, 26 Apr 2023 12:47:06 +0530 Subject: [PATCH 03/12] chore: cleaned-up the websocket reuse test --- p2p/transport/websocket/websocket_test.go | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/p2p/transport/websocket/websocket_test.go b/p2p/transport/websocket/websocket_test.go index 9de7e3e530..303e75511f 100644 --- a/p2p/transport/websocket/websocket_test.go +++ b/p2p/transport/websocket/websocket_test.go @@ -552,14 +552,12 @@ func TestResolveMultiaddr(t *testing.T) { func TestListenerResusePort(t *testing.T) { laddr := ma.StringCast("/ip4/127.0.0.1/tcp/5002/ws") - //fmt.Println("Starting Reuse Port test.") var wg sync.WaitGroup var opts []Option opts = append(opts, EnableReuseport()) _, u := newUpgrader(t) tpt, err := New(u, &network.NullResourceManager{}, opts...) require.NoError(t, err) - //fmt.Println("Invoking Go routines.") for i := 0; i < 2; i++ { wg.Add(1) go func(index int) { @@ -570,14 +568,13 @@ func TestListenerResusePort(t *testing.T) { require.NoError(t, err) require.Equal(t, lastComponent(t, l.Multiaddr()), wsComponent) defer l.Close() - //fmt.Println("Routine-", index, " Calling Accept...") - for j := 0; j < 2; j++ { + //Looping 4 times to ensure all 4 connections are handled. + for j := 0; j < 4; j++ { conn, err := l.Accept() if err != nil { fmt.Println("Routine-", index, " Failed accepting connection due to error ", err) } require.NoError(t, err) - //fmt.Println("Routine-", index, " Accepting connection ", conn) defer conn.Close() buf := make([]byte, 6) n, err := conn.Read(buf) @@ -585,16 +582,13 @@ func TestListenerResusePort(t *testing.T) { t.Errorf("read %d bytes, expected 2", n) } require.NoError(t, err) - fmt.Println("Read bytes:", buf) } }(i) } time.Sleep(2 * time.Second) - //fmt.Println("Invoking Connector Go routines.") for i := 0; i < 4; i++ { go func(index int) { - //fmt.Println("Routine-", index, " Initiating connection ") c, err := tpt.maDial(context.Background(), laddr) if err != nil { t.Error(err) @@ -602,7 +596,6 @@ func TestListenerResusePort(t *testing.T) { } require.NoError(t, err) defer c.Close() - //fmt.Println("Sleeping for 2 seconds after connection intiation") msg := fmt.Sprintf("Hello%d", index) n, err := c.Write([]byte(msg)) if n != 6 { From 80b3b34cfc1b4a2c01a00c99cff7d4e624de26e9 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Thu, 27 Apr 2023 05:24:07 +0530 Subject: [PATCH 04/12] chore: addressed review comments for #2261 --- p2p/transport/websocket/websocket.go | 4 +-- p2p/transport/websocket/websocket_test.go | 32 ++++++++++++++--------- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/p2p/transport/websocket/websocket.go b/p2p/transport/websocket/websocket.go index 8f793fbb75..ac34d4787b 100644 --- a/p2p/transport/websocket/websocket.go +++ b/p2p/transport/websocket/websocket.go @@ -203,10 +203,10 @@ func (t *WebsocketTransport) maDial(ctx context.Context, raddr ma.Multiaddr) (ma var conn manet.Conn var err error if t.UseReuseport() { - conn, err = t.reuse.Dial(raddr) + conn, err = t.reuse.DialContext(ctx, raddr) } else { var d manet.Dialer - conn, err = d.Dial(raddr) + conn, err = d.DialContext(ctx, raddr) } if err != nil { close(localAddrChan) diff --git a/p2p/transport/websocket/websocket_test.go b/p2p/transport/websocket/websocket_test.go index 303e75511f..5431f46383 100644 --- a/p2p/transport/websocket/websocket_test.go +++ b/p2p/transport/websocket/websocket_test.go @@ -16,7 +16,6 @@ import ( "net" "net/http" "strings" - "sync" "testing" "time" @@ -552,15 +551,16 @@ func TestResolveMultiaddr(t *testing.T) { func TestListenerResusePort(t *testing.T) { laddr := ma.StringCast("/ip4/127.0.0.1/tcp/5002/ws") - var wg sync.WaitGroup + var opts []Option opts = append(opts, EnableReuseport()) _, u := newUpgrader(t) tpt, err := New(u, &network.NullResourceManager{}, opts...) require.NoError(t, err) + c := make(chan int) + for i := 0; i < 2; i++ { - wg.Add(1) - go func(index int) { + go func(index int, ch chan int) { l, err := tpt.maListen(laddr) if err != nil { fmt.Println("Failed to listen on websocket due to error ", err) @@ -568,7 +568,10 @@ func TestListenerResusePort(t *testing.T) { require.NoError(t, err) require.Equal(t, lastComponent(t, l.Multiaddr()), wsComponent) defer l.Close() - //Looping 4 times to ensure all 4 connections are handled. + c <- index + /* Looping 4 times to ensure all 4 connections are handled. + Noticed that sometimes the distribution of connections was + not clearly load-balanced leading to more than 2 being delivered to 1 listener. */ for j := 0; j < 4; j++ { conn, err := l.Accept() if err != nil { @@ -582,13 +585,15 @@ func TestListenerResusePort(t *testing.T) { t.Errorf("read %d bytes, expected 2", n) } require.NoError(t, err) + c <- j } - }(i) + }(i, c) + } + for i := 0; i < 2; i++ { + <-c } - time.Sleep(2 * time.Second) - for i := 0; i < 4; i++ { - go func(index int) { + go func(index int, ch chan int) { c, err := tpt.maDial(context.Background(), laddr) if err != nil { t.Error(err) @@ -596,7 +601,7 @@ func TestListenerResusePort(t *testing.T) { } require.NoError(t, err) defer c.Close() - msg := fmt.Sprintf("Hello%d", index) + msg := fmt.Sprintf("Hello%d", i) n, err := c.Write([]byte(msg)) if n != 6 { t.Errorf("expected to write 0 bytes, wrote %d", n) @@ -605,8 +610,9 @@ func TestListenerResusePort(t *testing.T) { t.Error(err) return } - time.Sleep(2 * time.Second) - }(i) + }(i, c) + } + for i := 0; i < 4; i++ { + <-c } - time.Sleep(2 * time.Second) } From 804ba36082339bb995b2c18e0b182d700562c242 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Thu, 27 Apr 2023 17:21:26 +0530 Subject: [PATCH 05/12] chore: address linter error in websocket test script --- p2p/transport/websocket/websocket_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/transport/websocket/websocket_test.go b/p2p/transport/websocket/websocket_test.go index 5431f46383..020627055b 100644 --- a/p2p/transport/websocket/websocket_test.go +++ b/p2p/transport/websocket/websocket_test.go @@ -601,7 +601,7 @@ func TestListenerResusePort(t *testing.T) { } require.NoError(t, err) defer c.Close() - msg := fmt.Sprintf("Hello%d", i) + msg := fmt.Sprintf("Hello%d", index) n, err := c.Write([]byte(msg)) if n != 6 { t.Errorf("expected to write 0 bytes, wrote %d", n) From cc83e5adf68e04cd72135addf49b03816b3f20d1 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Date: Sun, 21 May 2023 06:56:11 +0530 Subject: [PATCH 06/12] chore: addressed review comments wrt test. Listen on unspecified port. Verify that client connections are distributed amongst multiple listeners. --- p2p/transport/websocket/websocket_test.go | 114 ++++++++++++++++------ 1 file changed, 83 insertions(+), 31 deletions(-) diff --git a/p2p/transport/websocket/websocket_test.go b/p2p/transport/websocket/websocket_test.go index 34e0dda901..0e37292b7f 100644 --- a/p2p/transport/websocket/websocket_test.go +++ b/p2p/transport/websocket/websocket_test.go @@ -32,6 +32,7 @@ import ( ttransport "github.com/libp2p/go-libp2p/p2p/transport/testsuite" ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" "github.com/stretchr/testify/require" ) @@ -549,70 +550,121 @@ func TestResolveMultiaddr(t *testing.T) { } } -func TestListenerResusePort(t *testing.T) { - laddr := ma.StringCast("/ip4/127.0.0.1/tcp/5002/ws") +func startListeners(tpt *WebsocketTransport) (ma.Multiaddr, []*manet.Listener, error) { + laddr := ma.StringCast("/ip4/127.0.0.1/tcp/0/ws") + listeners := make([]*manet.Listener, 2) + + l, err := tpt.maListen(laddr) + if err != nil { + fmt.Println("Failed to listen on websocket due to error ", err) + return nil, nil, err + } + listeners[0] = &l + + port := l.Addr().(*net.TCPAddr).Port + fmt.Println("Port allocated for listener:", port) + laddr = ma.StringCast(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d/ws", port)) + l1, err := tpt.maListen(laddr) + if err != nil { + fmt.Println("Failed to listen on websocket due to error ", err) + return nil, nil, err + } + listeners[1] = &l1 + return laddr, listeners, nil +} + +func TestListenerResusePort(t *testing.T) { + noOfClientConns := 4 var opts []Option opts = append(opts, EnableReuseport()) _, u := newUpgrader(t) tpt, err := New(u, &network.NullResourceManager{}, opts...) require.NoError(t, err) - c := make(chan int) + c := make(chan int, noOfClientConns) + raddr, listeners, err := startListeners(tpt) + //fmt.Println("Started Listeners") for i := 0; i < 2; i++ { - go func(index int, ch chan int) { - l, err := tpt.maListen(laddr) - if err != nil { - fmt.Println("Failed to listen on websocket due to error ", err) - } - require.NoError(t, err) - require.Equal(t, lastComponent(t, l.Multiaddr()), wsComponent) + go func(index int, ln *manet.Listener, ch chan int) { + l := *ln defer l.Close() - c <- index - /* Looping 4 times to ensure all 4 connections are handled. - Noticed that sometimes the distribution of connections was - not clearly load-balanced leading to more than 2 being delivered to 1 listener. */ - for j := 0; j < 4; j++ { + /* Looping noOfClientConns times to ensure all 4 connections are handled. + With SO_REUSEPORT the distribution happens based on threads that are waiting on Accept call as mentioned below. + We cannot gaurantee when Server go-routines would block on Accept. + Sometimes the client routines get scheduled first causing unequal distribution of connections. + Ref: https://lwn.net/Articles/542629/ + By contrast, the SO_REUSEPORT implementation distributes connections evenly across all of the threads (or processes) + that are blocked in accept() on the same port. */ + for j := 0; j < noOfClientConns; j++ { + //j := 0 conn, err := l.Accept() if err != nil { - fmt.Println("Routine-", index, " Failed accepting connection due to error ", err) + fmt.Println("Server Routine-", index, " Failed accepting connection ", j, " due to error ", err) + return } require.NoError(t, err) + //fmt.Println("Server Routine-", index, " accepting connection-", j) defer conn.Close() buf := make([]byte, 6) n, err := conn.Read(buf) if n != 6 { - t.Errorf("read %d bytes, expected 2", n) + t.Errorf("read %d bytes, expected 6", n) } require.NoError(t, err) - c <- j + n, err = conn.Write(buf) + if n != 6 { + t.Errorf("expected to write 6 bytes, wrote %d", n) + } + if err != nil { + t.Error(err) + return + } + ch <- index } - }(i, c) - } - for i := 0; i < 2; i++ { - <-c + + }(i, listeners[i], c) } - for i := 0; i < 4; i++ { - go func(index int, ch chan int) { - c, err := tpt.maDial(context.Background(), laddr) + + fmt.Println("Initiating Connections to addr:", raddr) + + for i := 0; i < noOfClientConns; i++ { + go func(index int) { + conn, err := tpt.maDial(context.Background(), raddr) if err != nil { t.Error(err) return } + //fmt.Println("Initiated Conn from Client routine-", index) require.NoError(t, err) - defer c.Close() + defer conn.Close() msg := fmt.Sprintf("Hello%d", index) - n, err := c.Write([]byte(msg)) + n, err := conn.Write([]byte(msg)) if n != 6 { - t.Errorf("expected to write 0 bytes, wrote %d", n) + t.Errorf("expected to write 6 bytes, wrote %d", n) } if err != nil { t.Error(err) return } - }(i, c) + buf := make([]byte, 6) + n, err = conn.Read(buf) + if n != 6 { + t.Errorf("read %d bytes, expected 6", n) + } + }(i) } - for i := 0; i < 4; i++ { - <-c + var connsHandled [2]int + //Waiting to ensure all 4 connections are handled. + for i := 0; i < noOfClientConns; i++ { + temp := <-c + connsHandled[temp]++ + } + for i := 0; i < 2; i++ { + /*Not checking for equal distribution of connections due to above explanation.*/ + if connsHandled[i] == 0 { + t.Fatalf("No connections handled by listener %d", i) + } + fmt.Printf("Listener %d handled %d connections.", i, connsHandled[i]) } } From 153829a19cb85c921a95e80ce9d465c54936352b Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Date: Tue, 23 May 2023 10:33:47 +0530 Subject: [PATCH 07/12] chore: addressed review comments and cleanedup test script --- p2p/transport/websocket/websocket.go | 2 +- p2p/transport/websocket/websocket_test.go | 68 ++++++----------------- 2 files changed, 19 insertions(+), 51 deletions(-) diff --git a/p2p/transport/websocket/websocket.go b/p2p/transport/websocket/websocket.go index 544b5fabd4..b0f7f5b6cd 100644 --- a/p2p/transport/websocket/websocket.go +++ b/p2p/transport/websocket/websocket.go @@ -240,7 +240,7 @@ func (t *WebsocketTransport) maDial(ctx context.Context, raddr ma.Multiaddr) (ma } func (t *WebsocketTransport) maListen(a ma.Multiaddr) (manet.Listener, error) { - l, err := newListener(a, t.tlsConf, t.reuseport) + l, err := newListener(a, t.tlsConf, t.UseReuseport()) if err != nil { return nil, err } diff --git a/p2p/transport/websocket/websocket_test.go b/p2p/transport/websocket/websocket_test.go index 0e37292b7f..2de414823e 100644 --- a/p2p/transport/websocket/websocket_test.go +++ b/p2p/transport/websocket/websocket_test.go @@ -550,31 +550,26 @@ func TestResolveMultiaddr(t *testing.T) { } } -func startListeners(tpt *WebsocketTransport) (ma.Multiaddr, []*manet.Listener, error) { +func startListeners(t *testing.T, tpt *WebsocketTransport) (ma.Multiaddr, []*manet.Listener, error) { + t.Helper() laddr := ma.StringCast("/ip4/127.0.0.1/tcp/0/ws") listeners := make([]*manet.Listener, 2) l, err := tpt.maListen(laddr) - if err != nil { - fmt.Println("Failed to listen on websocket due to error ", err) - return nil, nil, err - } + require.NoError(t, err) listeners[0] = &l port := l.Addr().(*net.TCPAddr).Port - fmt.Println("Port allocated for listener:", port) + t.Logf("Port allocated for listener: %d", port) laddr = ma.StringCast(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d/ws", port)) l1, err := tpt.maListen(laddr) - if err != nil { - fmt.Println("Failed to listen on websocket due to error ", err) - return nil, nil, err - } + require.NoError(t, err) listeners[1] = &l1 return laddr, listeners, nil } -func TestListenerResusePort(t *testing.T) { +func TestListenerReusePort(t *testing.T) { noOfClientConns := 4 var opts []Option opts = append(opts, EnableReuseport()) @@ -583,8 +578,8 @@ func TestListenerResusePort(t *testing.T) { require.NoError(t, err) c := make(chan int, noOfClientConns) - raddr, listeners, err := startListeners(tpt) - //fmt.Println("Started Listeners") + raddr, listeners, err := startListeners(t, tpt) + require.NoErrorf(t, err, "failed to start listeners") for i := 0; i < 2; i++ { go func(index int, ln *manet.Listener, ch chan int) { l := *ln @@ -599,59 +594,34 @@ func TestListenerResusePort(t *testing.T) { for j := 0; j < noOfClientConns; j++ { //j := 0 conn, err := l.Accept() - if err != nil { - fmt.Println("Server Routine-", index, " Failed accepting connection ", j, " due to error ", err) - return - } - require.NoError(t, err) - //fmt.Println("Server Routine-", index, " accepting connection-", j) + require.NoErrorf(t, err, "Server Routine-", index, " Failed accepting connection ", j, " due to error ") defer conn.Close() buf := make([]byte, 6) n, err := conn.Read(buf) - if n != 6 { - t.Errorf("read %d bytes, expected 6", n) - } require.NoError(t, err) + require.Equal(t, 6, n) n, err = conn.Write(buf) - if n != 6 { - t.Errorf("expected to write 6 bytes, wrote %d", n) - } - if err != nil { - t.Error(err) - return - } + require.NoError(t, err) + require.Equal(t, 6, n) ch <- index } }(i, listeners[i], c) } - fmt.Println("Initiating Connections to addr:", raddr) - for i := 0; i < noOfClientConns; i++ { go func(index int) { conn, err := tpt.maDial(context.Background(), raddr) - if err != nil { - t.Error(err) - return - } - //fmt.Println("Initiated Conn from Client routine-", index) require.NoError(t, err) defer conn.Close() msg := fmt.Sprintf("Hello%d", index) n, err := conn.Write([]byte(msg)) - if n != 6 { - t.Errorf("expected to write 6 bytes, wrote %d", n) - } - if err != nil { - t.Error(err) - return - } + require.Equal(t, 6, n) + require.NoError(t, err) buf := make([]byte, 6) n, err = conn.Read(buf) - if n != 6 { - t.Errorf("read %d bytes, expected 6", n) - } + require.NoError(t, err) + require.Equal(t, 6, n) }(i) } var connsHandled [2]int @@ -662,9 +632,7 @@ func TestListenerResusePort(t *testing.T) { } for i := 0; i < 2; i++ { /*Not checking for equal distribution of connections due to above explanation.*/ - if connsHandled[i] == 0 { - t.Fatalf("No connections handled by listener %d", i) - } - fmt.Printf("Listener %d handled %d connections.", i, connsHandled[i]) + require.NotEqualf(t, 0, connsHandled[i], "No connections handled by listener %d", i) + t.Logf("Listener %d handled %d connections.", i, connsHandled[i]) } } From 07aae9b8cd67415fde2f7a46c6890ea8ff630c37 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Fri, 26 May 2023 16:19:23 +0530 Subject: [PATCH 08/12] Update websocket_test.go chore: reuseport test script-Skipping connection load balancing check for architectures other than linux as it is not gauranteed. --- p2p/transport/websocket/websocket_test.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/p2p/transport/websocket/websocket_test.go b/p2p/transport/websocket/websocket_test.go index 2de414823e..2c19f018d1 100644 --- a/p2p/transport/websocket/websocket_test.go +++ b/p2p/transport/websocket/websocket_test.go @@ -15,6 +15,7 @@ import ( "math/big" "net" "net/http" + "runtime" "strings" "testing" "time" @@ -565,7 +566,6 @@ func startListeners(t *testing.T, tpt *WebsocketTransport) (ma.Multiaddr, []*man l1, err := tpt.maListen(laddr) require.NoError(t, err) listeners[1] = &l1 - return laddr, listeners, nil } @@ -577,7 +577,6 @@ func TestListenerReusePort(t *testing.T) { tpt, err := New(u, &network.NullResourceManager{}, opts...) require.NoError(t, err) c := make(chan int, noOfClientConns) - raddr, listeners, err := startListeners(t, tpt) require.NoErrorf(t, err, "failed to start listeners") for i := 0; i < 2; i++ { @@ -630,9 +629,20 @@ func TestListenerReusePort(t *testing.T) { temp := <-c connsHandled[temp]++ } - for i := 0; i < 2; i++ { - /*Not checking for equal distribution of connections due to above explanation.*/ - require.NotEqualf(t, 0, connsHandled[i], "No connections handled by listener %d", i) - t.Logf("Listener %d handled %d connections.", i, connsHandled[i]) + /* + For windows and macOS load balancing is not done by kernel as per references below. + For other architectures, behaviour is not known. + Hence, Check for load balancing only for linux based architectures. + Refer https://learn.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse?redirectedfrom=MSDN for windows + References for MACOS + Ref - (https://stackoverflow.com/questions/41247790/so-reuseport-on-macos-with-libuv) + Ref -(https://github.com/uNetworking/uWebSockets/issues/1194) + */ + if runtime.GOOS == "linux" { + for i := 0; i < 2; i++ { + /*Not checking for equal distribution of connections due to above explanation.*/ + require.NotEqualf(t, 0, connsHandled[i], "No connections handled by listener %d", i) + t.Logf("Listener %d handled %d connections.", i, connsHandled[i]) + } } } From ad59795404bd3d93f8c1524572664d8fbe516aa7 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Date: Tue, 30 May 2023 10:52:06 +0530 Subject: [PATCH 09/12] feat: use reuseport wihle dialing client connections if set --- p2p/transport/websocket/websocket.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/p2p/transport/websocket/websocket.go b/p2p/transport/websocket/websocket.go index b0f7f5b6cd..319137fbb5 100644 --- a/p2p/transport/websocket/websocket.go +++ b/p2p/transport/websocket/websocket.go @@ -13,11 +13,11 @@ import ( "github.com/libp2p/go-libp2p/core/transport" "github.com/libp2p/go-reuseport" + ws "github.com/gorilla/websocket" + reuseTransport "github.com/libp2p/go-libp2p/p2p/net/reuseport" ma "github.com/multiformats/go-multiaddr" mafmt "github.com/multiformats/go-multiaddr-fmt" manet "github.com/multiformats/go-multiaddr/net" - - ws "github.com/gorilla/websocket" ) // WsFmt is multiaddr formatter for WsProtocol @@ -93,9 +93,10 @@ type WebsocketTransport struct { upgrader transport.Upgrader rcmgr network.ResourceManager - tlsClientConf *tls.Config - tlsConf *tls.Config - reuseport bool //reuseport is disabled by default, can be enabled by passing it as an option. + tlsClientConf *tls.Config + tlsConf *tls.Config + reuseport bool //reuseport is disabled by default, can be enabled by passing it as an option. + reuseTransport reuseTransport.Transport } var _ transport.Transport = (*WebsocketTransport)(nil) @@ -199,6 +200,11 @@ func (t *WebsocketTransport) maDial(ctx context.Context, raddr ma.Multiaddr) (ma isWss := wsurl.Scheme == "wss" dialer := ws.Dialer{HandshakeTimeout: 30 * time.Second} + if t.UseReuseport() { + dialer.NetDial = func(network, address string) (net.Conn, error) { + return t.reuseTransport.DialContext(ctx, raddr) + } + } if isWss { sni := "" sni, err = raddr.ValueForProtocol(ma.P_SNI) From 8840f1906441a4374dfd2430fedb4be47a74835d Mon Sep 17 00:00:00 2001 From: Prem Prathi Date: Thu, 17 Aug 2023 08:33:56 +0000 Subject: [PATCH 10/12] chore:comment lb check in reuseport test to avoid flaky results --- p2p/transport/websocket/websocket_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/p2p/transport/websocket/websocket_test.go b/p2p/transport/websocket/websocket_test.go index 2c19f018d1..4687e46712 100644 --- a/p2p/transport/websocket/websocket_test.go +++ b/p2p/transport/websocket/websocket_test.go @@ -15,7 +15,6 @@ import ( "math/big" "net" "net/http" - "runtime" "strings" "testing" "time" @@ -632,17 +631,19 @@ func TestListenerReusePort(t *testing.T) { /* For windows and macOS load balancing is not done by kernel as per references below. For other architectures, behaviour is not known. + For ubuntu load balancing doesn't seem to happen consistently. + In order to not have a flaky test, commenting this additiona check. Hence, Check for load balancing only for linux based architectures. Refer https://learn.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse?redirectedfrom=MSDN for windows References for MACOS Ref - (https://stackoverflow.com/questions/41247790/so-reuseport-on-macos-with-libuv) Ref -(https://github.com/uNetworking/uWebSockets/issues/1194) - */ if runtime.GOOS == "linux" { for i := 0; i < 2; i++ { /*Not checking for equal distribution of connections due to above explanation.*/ - require.NotEqualf(t, 0, connsHandled[i], "No connections handled by listener %d", i) + /*require.NotEqualf(t, 0, connsHandled[i], "No connections handled by listener %d", i) t.Logf("Listener %d handled %d connections.", i, connsHandled[i]) } } + */ } From f6602267da42ec02d3e558c5a74be2d5461b3439 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Thu, 17 Aug 2023 14:19:14 +0530 Subject: [PATCH 11/12] chore: fix gofmt error --- p2p/transport/websocket/websocket_test.go | 26 +++++++++++------------ 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/p2p/transport/websocket/websocket_test.go b/p2p/transport/websocket/websocket_test.go index 4687e46712..debf7fbbc5 100644 --- a/p2p/transport/websocket/websocket_test.go +++ b/p2p/transport/websocket/websocket_test.go @@ -629,19 +629,19 @@ func TestListenerReusePort(t *testing.T) { connsHandled[temp]++ } /* - For windows and macOS load balancing is not done by kernel as per references below. - For other architectures, behaviour is not known. - For ubuntu load balancing doesn't seem to happen consistently. - In order to not have a flaky test, commenting this additiona check. - Hence, Check for load balancing only for linux based architectures. - Refer https://learn.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse?redirectedfrom=MSDN for windows - References for MACOS - Ref - (https://stackoverflow.com/questions/41247790/so-reuseport-on-macos-with-libuv) - Ref -(https://github.com/uNetworking/uWebSockets/issues/1194) - if runtime.GOOS == "linux" { - for i := 0; i < 2; i++ { - /*Not checking for equal distribution of connections due to above explanation.*/ - /*require.NotEqualf(t, 0, connsHandled[i], "No connections handled by listener %d", i) + For windows and macOS load balancing is not done by kernel as per references below. + For other architectures, behaviour is not known. + For ubuntu load balancing doesn't seem to happen consistently. + In order to not have a flaky test, commenting this additiona check. + Hence, Check for load balancing only for linux based architectures. + Refer https://learn.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse?redirectedfrom=MSDN for windows + References for MACOS + Ref - (https://stackoverflow.com/questions/41247790/so-reuseport-on-macos-with-libuv) + Ref -(https://github.com/uNetworking/uWebSockets/issues/1194) + if runtime.GOOS == "linux" { + for i := 0; i < 2; i++ { + /*Not checking for equal distribution of connections due to above explanation.*/ + /*require.NotEqualf(t, 0, connsHandled[i], "No connections handled by listener %d", i) t.Logf("Listener %d handled %d connections.", i, connsHandled[i]) } } From c01a72571ef2fe497e0b78725cd48f1231243219 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Wed, 6 Sep 2023 11:39:27 +0530 Subject: [PATCH 12/12] chore: increase number of clients to avoid flakiness of reuseport test --- p2p/transport/websocket/websocket_test.go | 46 ++++++++++++----------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/p2p/transport/websocket/websocket_test.go b/p2p/transport/websocket/websocket_test.go index debf7fbbc5..ef836d7ab6 100644 --- a/p2p/transport/websocket/websocket_test.go +++ b/p2p/transport/websocket/websocket_test.go @@ -15,6 +15,7 @@ import ( "math/big" "net" "net/http" + "runtime" "strings" "testing" "time" @@ -569,7 +570,7 @@ func startListeners(t *testing.T, tpt *WebsocketTransport) (ma.Multiaddr, []*man } func TestListenerReusePort(t *testing.T) { - noOfClientConns := 4 + noOfClientConns := 20 var opts []Option opts = append(opts, EnableReuseport()) _, u := newUpgrader(t) @@ -594,13 +595,13 @@ func TestListenerReusePort(t *testing.T) { conn, err := l.Accept() require.NoErrorf(t, err, "Server Routine-", index, " Failed accepting connection ", j, " due to error ") defer conn.Close() - buf := make([]byte, 6) + buf := make([]byte, 5) n, err := conn.Read(buf) require.NoError(t, err) - require.Equal(t, 6, n) + require.Equal(t, 5, n) n, err = conn.Write(buf) require.NoError(t, err) - require.Equal(t, 6, n) + require.Equal(t, 5, n) ch <- index } @@ -612,14 +613,14 @@ func TestListenerReusePort(t *testing.T) { conn, err := tpt.maDial(context.Background(), raddr) require.NoError(t, err) defer conn.Close() - msg := fmt.Sprintf("Hello%d", index) + msg := "Hello" n, err := conn.Write([]byte(msg)) - require.Equal(t, 6, n) + require.Equal(t, 5, n) require.NoError(t, err) - buf := make([]byte, 6) + buf := make([]byte, 5) n, err = conn.Read(buf) require.NoError(t, err) - require.Equal(t, 6, n) + require.Equal(t, 5, n) }(i) } var connsHandled [2]int @@ -629,21 +630,22 @@ func TestListenerReusePort(t *testing.T) { connsHandled[temp]++ } /* - For windows and macOS load balancing is not done by kernel as per references below. - For other architectures, behaviour is not known. - For ubuntu load balancing doesn't seem to happen consistently. - In order to not have a flaky test, commenting this additiona check. - Hence, Check for load balancing only for linux based architectures. - Refer https://learn.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse?redirectedfrom=MSDN for windows - References for MACOS - Ref - (https://stackoverflow.com/questions/41247790/so-reuseport-on-macos-with-libuv) - Ref -(https://github.com/uNetworking/uWebSockets/issues/1194) - if runtime.GOOS == "linux" { - for i := 0; i < 2; i++ { - /*Not checking for equal distribution of connections due to above explanation.*/ - /*require.NotEqualf(t, 0, connsHandled[i], "No connections handled by listener %d", i) + For windows and macOS load balancing is not done by kernel as per references below. + For other architectures, behaviour is not known. + For ubuntu load balancing doesn't seem to happen consistently. + In order to not have a flaky test, commenting this additiona check. + Hence, Check for load balancing only for linux based architectures. + Refer https://learn.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse?redirectedfrom=MSDN for windows + References for MACOS + Ref - (https://stackoverflow.com/questions/41247790/so-reuseport-on-macos-with-libuv) + Ref -(https://github.com/uNetworking/uWebSockets/issues/1194) + */ + if runtime.GOOS == "linux" { + for i := 0; i < 2; i++ { + /*Not checking for equal distribution of connections due to above explanation.*/ + require.NotEqualf(t, 0, connsHandled[i], "No connections handled by listener %d", i) t.Logf("Listener %d handled %d connections.", i, connsHandled[i]) } } - */ + }