Skip to content

Commit

Permalink
Daily commit
Browse files Browse the repository at this point in the history
  • Loading branch information
pyropy committed Nov 21, 2024
1 parent 079bd3e commit b14b377
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 121 deletions.
182 changes: 89 additions & 93 deletions p2p/test/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,9 @@ import (
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/sec"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/libp2p/go-libp2p/p2p/security/noise"
tls "github.com/libp2p/go-libp2p/p2p/security/tls"
libp2pmemory "github.com/libp2p/go-libp2p/p2p/transport/memory"
libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc"
"go.uber.org/mock/gomock"

ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -68,95 +64,95 @@ func transformOpts(opts TransportTestCaseOpts) []config.Option {
}

var transportsToTest = []TransportTestCase{
{
Name: "TCP / Noise / Yamux",
HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host {
libp2pOpts := transformOpts(opts)
libp2pOpts = append(libp2pOpts, libp2p.Security(noise.ID, noise.New))
libp2pOpts = append(libp2pOpts, libp2p.Muxer(yamux.ID, yamux.DefaultTransport))
if opts.NoListen {
libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs)
} else {
libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
}
h, err := libp2p.New(libp2pOpts...)
require.NoError(t, err)
return h
},
},
{
Name: "TCP / TLS / Yamux",
HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host {
libp2pOpts := transformOpts(opts)
libp2pOpts = append(libp2pOpts, libp2p.Security(tls.ID, tls.New))
libp2pOpts = append(libp2pOpts, libp2p.Muxer(yamux.ID, yamux.DefaultTransport))
if opts.NoListen {
libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs)
} else {
libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
}
h, err := libp2p.New(libp2pOpts...)
require.NoError(t, err)
return h
},
},
{
Name: "WebSocket",
HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host {
libp2pOpts := transformOpts(opts)
if opts.NoListen {
libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs)
} else {
libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0/ws"))
}
h, err := libp2p.New(libp2pOpts...)
require.NoError(t, err)
return h
},
},
{
Name: "QUIC",
HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host {
libp2pOpts := transformOpts(opts)
if opts.NoListen {
libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs)
} else {
libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/udp/0/quic-v1"))
}
h, err := libp2p.New(libp2pOpts...)
require.NoError(t, err)
return h
},
},
{
Name: "WebTransport",
HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host {
libp2pOpts := transformOpts(opts)
if opts.NoListen {
libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs)
} else {
libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/udp/0/quic-v1/webtransport"))
}
h, err := libp2p.New(libp2pOpts...)
require.NoError(t, err)
return h
},
},
{
Name: "WebRTC",
HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host {
libp2pOpts := transformOpts(opts)
libp2pOpts = append(libp2pOpts, libp2p.Transport(libp2pwebrtc.New))
if opts.NoListen {
libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs)
} else {
libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/udp/0/webrtc-direct"))
}
h, err := libp2p.New(libp2pOpts...)
require.NoError(t, err)
return h
},
},
// {
// Name: "TCP / Noise / Yamux",
// HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host {
// libp2pOpts := transformOpts(opts)
// libp2pOpts = append(libp2pOpts, libp2p.Security(noise.ID, noise.New))
// libp2pOpts = append(libp2pOpts, libp2p.Muxer(yamux.ID, yamux.DefaultTransport))
// if opts.NoListen {
// libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs)
// } else {
// libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
// }
// h, err := libp2p.New(libp2pOpts...)
// require.NoError(t, err)
// return h
// },
// },
// {
// Name: "TCP / TLS / Yamux",
// HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host {
// libp2pOpts := transformOpts(opts)
// libp2pOpts = append(libp2pOpts, libp2p.Security(tls.ID, tls.New))
// libp2pOpts = append(libp2pOpts, libp2p.Muxer(yamux.ID, yamux.DefaultTransport))
// if opts.NoListen {
// libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs)
// } else {
// libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
// }
// h, err := libp2p.New(libp2pOpts...)
// require.NoError(t, err)
// return h
// },
// },
// {
// Name: "WebSocket",
// HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host {
// libp2pOpts := transformOpts(opts)
// if opts.NoListen {
// libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs)
// } else {
// libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0/ws"))
// }
// h, err := libp2p.New(libp2pOpts...)
// require.NoError(t, err)
// return h
// },
// },
// {
// Name: "QUIC",
// HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host {
// libp2pOpts := transformOpts(opts)
// if opts.NoListen {
// libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs)
// } else {
// libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/udp/0/quic-v1"))
// }
// h, err := libp2p.New(libp2pOpts...)
// require.NoError(t, err)
// return h
// },
// },
// {
// Name: "WebTransport",
// HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host {
// libp2pOpts := transformOpts(opts)
// if opts.NoListen {
// libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs)
// } else {
// libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/udp/0/quic-v1/webtransport"))
// }
// h, err := libp2p.New(libp2pOpts...)
// require.NoError(t, err)
// return h
// },
// },
// {
// Name: "WebRTC",
// HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host {
// libp2pOpts := transformOpts(opts)
// libp2pOpts = append(libp2pOpts, libp2p.Transport(libp2pwebrtc.New))
// if opts.NoListen {
// libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs)
// } else {
// libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/udp/0/webrtc-direct"))
// }
// h, err := libp2p.New(libp2pOpts...)
// require.NoError(t, err)
// return h
// },
// },
{
Name: "Memory",
HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host {
Expand Down
6 changes: 3 additions & 3 deletions p2p/transport/memory/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ func newConnection(

func (c *conn) Close() error {
c.closed.Store(true)
for _, s := range c.streams {
for id, s := range c.streams {
c.removeStream(id)
s.Close()
}

Expand All @@ -83,8 +84,7 @@ func (c *conn) OpenStream(ctx context.Context) (network.MuxedStream, error) {

func (c *conn) AcceptStream() (network.MuxedStream, error) {
in := <-c.streamC
id := streamCounter.Add(1)
c.addStream(id, in)
c.addStream(in.id, in)
return in, nil
}

Expand Down
58 changes: 34 additions & 24 deletions p2p/transport/memory/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ var ErrClosed = errors.New("stream closed")

func newStreamPair() (*stream, *stream) {
ra, rb := make(chan byte, 4096), make(chan byte, 4096)
wa, wb := rb, ra

in := newStream(rb, wb, network.DirInbound)
out := newStream(ra, wa, network.DirOutbound)
in := newStream(rb, ra, network.DirInbound)
out := newStream(ra, rb, network.DirOutbound)
return in, out
}

Expand All @@ -47,56 +46,67 @@ func newStream(r, w chan byte, _ network.Direction) *stream {
return s
}

// How to handle errors with writes?
func (s *stream) Write(p []byte) (n int, err error) {
if s.closed.Load() {
return 0, ErrClosed
}

for i := 0; i < len(p); i++ {
select {
case <-s.reset:
return 0, network.ErrReset
case <-s.closeWrite:
return 0, ErrClosed
default:
}

for n < len(p) {
select {
case <-s.reset:
err = network.ErrReset
return
case <-s.closeWrite:
err = ErrClosed
return
case s.write <- p[i]:
n = i
return n, ErrClosed
case <-s.reset:
return n, network.ErrReset
case s.write <- p[n]:
n++
default:
err = io.ErrClosedPipe
return
}
}

return n + 1, err
return
}

func (s *stream) Read(p []byte) (n int, err error) {
if s.closed.Load() {
return 0, ErrClosed
}

for n = 0; n < len(p); n++ {
select {
case <-s.reset:
return 0, network.ErrReset
case <-s.closeRead:
return 0, ErrClosed
default:
}

for n < len(p) {
select {
case <-s.reset:
err = network.ErrReset
return
case <-s.closeRead:
err = ErrClosed
return
return n, ErrClosed
case <-s.reset:
return n, network.ErrReset
case b, ok := <-s.read:
if !ok {
err = io.EOF
return
return n, ErrClosed
}
p[n] = b
n++
default:
err = io.EOF
return
return n, io.EOF
}
}

return
return n, nil
}

func (s *stream) CloseWrite() error {
Expand Down
24 changes: 23 additions & 1 deletion p2p/transport/memory/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
)

func TestStreamSimpleReadWriteClose(t *testing.T) {
// t.Parallel()
clientStr, serverStr := newStreamPair()

// send a foobar from the client
Expand All @@ -24,6 +25,7 @@ func TestStreamSimpleReadWriteClose(t *testing.T) {
b, err := io.ReadAll(serverStr)
require.NoError(t, err)
require.Equal(t, []byte("foobar"), b)

// reading again should give another io.EOF
n, err = serverStr.Read(make([]byte, 10))
require.Zero(t, n)
Expand All @@ -35,7 +37,6 @@ func TestStreamSimpleReadWriteClose(t *testing.T) {
require.NoError(t, serverStr.CloseWrite())

// and read it at the client
//require.False(t, clientDone.Load())
b, err = io.ReadAll(clientStr)
require.NoError(t, err)
require.Equal(t, []byte("lorem ipsum"), b)
Expand All @@ -46,3 +47,24 @@ func TestStreamSimpleReadWriteClose(t *testing.T) {
// Need to call Close for cleanup. Otherwise the FIN_ACK is never read
require.NoError(t, serverStr.Close())
}

func TestStreamPartialReads(t *testing.T) {
// t.Parallel()
clientStr, serverStr := newStreamPair()

_, err := serverStr.Write([]byte("foobar"))
require.NoError(t, err)
require.NoError(t, serverStr.CloseWrite())

n, err := clientStr.Read([]byte{}) // empty read
require.NoError(t, err)
require.Zero(t, n)
b := make([]byte, 3)
n, err = clientStr.Read(b)
require.Equal(t, 3, n)
require.NoError(t, err)
require.Equal(t, []byte("foo"), b)
b, err = io.ReadAll(clientStr)
require.NoError(t, err)
require.Equal(t, []byte("bar"), b)
}

0 comments on commit b14b377

Please sign in to comment.