diff --git a/p2p/protocol/ping/ping.go b/p2p/protocol/ping/ping.go index 6ff5c3fb6f..309b792787 100644 --- a/p2p/protocol/ping/ping.go +++ b/p2p/protocol/ping/ping.go @@ -54,41 +54,29 @@ func (p *PingService) PingHandler(s network.Stream) { buf := pool.Get(PingSize) defer pool.Put(buf) + defer s.Close() errCh := make(chan error, 1) defer close(errCh) - timer := time.NewTimer(pingTimeout) - defer timer.Stop() - - go func() { - select { - case <-timer.C: - log.Debug("ping timeout") - case err, ok := <-errCh: - if ok { - log.Debug(err) - } else { - log.Error("ping loop failed without error") - } - } - s.Close() - }() - - for { - _, err := io.ReadFull(s, buf) - if err != nil { - errCh <- err - return - } - - _, err = s.Write(buf) - if err != nil { - errCh <- err - return - } + _, err := io.ReadFull(s, buf) + if err != nil { + errCh <- err + return + } + log.Errorln("first read done") + st := time.Now() + _, err = io.ReadFull(s, buf) + if err != nil { + log.Errorln("exiting", err, time.Since(st)) + return + } - timer.Reset(pingTimeout) + _, err = s.Write(buf) + if err != nil { + errCh <- err + return } + time.Sleep(5 * time.Second) } // Result is a result of a ping attempt, either an RTT or an error. diff --git a/p2p/transport/webrtc/stream.go b/p2p/transport/webrtc/stream.go index 0358dce56c..0bb723c7c4 100644 --- a/p2p/transport/webrtc/stream.go +++ b/p2p/transport/webrtc/stream.go @@ -123,23 +123,7 @@ func newStream( } } - if s.isDone() { - // onDone removes the stream from the connection and requires the connection lock. - // This callback(onBufferedAmountLow) is executing in the sctp readLoop goroutine. - // If Connection.Close is called concurrently, the closing goroutine will acquire - // the connection lock and wait for sctp readLoop to exit, the sctp readLoop will - // wait for the connection lock before exiting, causing a deadlock. - // Run this in a different goroutine to avoid the deadlock. - go func() { - s.mx.Lock() - defer s.mx.Unlock() - // TODO: we should be closing the underlying datachannel, but this resets the stream - // See https://github.com/libp2p/specs/issues/575 for details. - // _ = s.dataChannel.Close() - // TODO: write for the spawned reader to return - s.onDone() - }() - } + s.maybeDeclareStreamDone() select { case s.writeAvailable <- struct{}{}: @@ -186,6 +170,7 @@ func (s *stream) processIncomingFlag(flag *pb.Message_Flag) { if s.receiveState == receiveStateReceiving { s.receiveState = receiveStateDataRead } + log.Errorln("received FIN", s.id) case pb.Message_STOP_SENDING: if s.sendState == sendStateSending { s.sendState = sendStateReset @@ -211,6 +196,7 @@ func (s *stream) maybeDeclareStreamDone() { // See https://github.com/libp2p/specs/issues/575 for details. // _ = s.dataChannel.Close() // TODO: write for the spawned reader to return + s.dataChannel.Close() s.onDone() } } diff --git a/test-plans/ping-version.json b/test-plans/ping-version.json index 705934f2b7..d3f120f8db 100644 --- a/test-plans/ping-version.json +++ b/test-plans/ping-version.json @@ -2,11 +2,6 @@ "id": "go-libp2p-head", "containerImageID": "go-libp2p-head", "transports": [ - "tcp", - "ws", - "wss", - "quic-v1", - "webtransport", "webrtc-direct" ], "secureChannels": [