From 366883cd5104cab6f5321ffffcc1bded249249d4 Mon Sep 17 00:00:00 2001 From: sukun Date: Thu, 21 Sep 2023 22:07:42 +0530 Subject: [PATCH 1/2] webrtc: fix flaky TestReadWriteDeadlines/WebRTC/SetDeadline/Write --- p2p/transport/webrtc/stream.go | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/p2p/transport/webrtc/stream.go b/p2p/transport/webrtc/stream.go index 7e873f5634..f777f5b8fd 100644 --- a/p2p/transport/webrtc/stream.go +++ b/p2p/transport/webrtc/stream.go @@ -123,7 +123,23 @@ func newStream( } } - s.maybeDeclareStreamDone() + if s.isDone() { + // onDone removes the stream from the connection and requires the connection lock. + // This callback(onBufferedAmountLow) is executing in the sctp readLoop goroutine. + // If the connection is closed concurrently, the closing goroutine will acquire + // the connection lock and wait for sctp readLoop to exit, the sctp readLoop will + // wait for the connection lock before exiting, causing a deadlock. + // Run this in a different goroutine to avoid the deadlock. + go func() { + s.mx.Lock() + defer s.mx.Unlock() + // TODO: we should be closing the underlying datachannel, but this resets the stream + // See https://github.com/libp2p/specs/issues/575 for details. + // _ = s.dataChannel.Close() + // TODO: write for the spawned reader to return + s.onDone() + }() + } select { case s.writeAvailable <- struct{}{}: @@ -188,9 +204,7 @@ func (s *stream) processIncomingFlag(flag *pb.Message_Flag) { // this is used to force reset a stream func (s *stream) maybeDeclareStreamDone() { - if (s.sendState == sendStateReset || s.sendState == sendStateDataSent) && - (s.receiveState == receiveStateReset || s.receiveState == receiveStateDataRead) && - len(s.controlMsgQueue) == 0 { + if s.isDone() { _ = s.SetReadDeadline(time.Now().Add(-1 * time.Hour)) // pion ignores zero times // TODO: we should be closing the underlying datachannel, but this resets the stream // See https://github.com/libp2p/specs/issues/575 for details. @@ -200,6 +214,12 @@ func (s *stream) maybeDeclareStreamDone() { } } +func (s *stream) isDone() bool { + return (s.sendState == sendStateReset || s.sendState == sendStateDataSent) && + (s.receiveState == receiveStateReset || s.receiveState == receiveStateDataRead) && + len(s.controlMsgQueue) == 0 +} + func (s *stream) setCloseError(e error) { s.mx.Lock() defer s.mx.Unlock() From 67d1746eb88b37711b2046d46a2172861d13a7be Mon Sep 17 00:00:00 2001 From: sukun Date: Mon, 25 Sep 2023 00:15:58 +0530 Subject: [PATCH 2/2] address review comments --- p2p/transport/webrtc/stream.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/p2p/transport/webrtc/stream.go b/p2p/transport/webrtc/stream.go index f777f5b8fd..0358dce56c 100644 --- a/p2p/transport/webrtc/stream.go +++ b/p2p/transport/webrtc/stream.go @@ -126,7 +126,7 @@ func newStream( if s.isDone() { // onDone removes the stream from the connection and requires the connection lock. // This callback(onBufferedAmountLow) is executing in the sctp readLoop goroutine. - // If the connection is closed concurrently, the closing goroutine will acquire + // If Connection.Close is called concurrently, the closing goroutine will acquire // the connection lock and wait for sctp readLoop to exit, the sctp readLoop will // wait for the connection lock before exiting, causing a deadlock. // Run this in a different goroutine to avoid the deadlock. @@ -202,7 +202,8 @@ func (s *stream) processIncomingFlag(flag *pb.Message_Flag) { s.maybeDeclareStreamDone() } -// this is used to force reset a stream +// maybeDeclareStreamDone is used to force reset a stream. It should be called with +// the stream lock acquired. It calls stream.onDone which requires the connection lock. func (s *stream) maybeDeclareStreamDone() { if s.isDone() { _ = s.SetReadDeadline(time.Now().Add(-1 * time.Hour)) // pion ignores zero times @@ -214,6 +215,8 @@ func (s *stream) maybeDeclareStreamDone() { } } +// isDone indicates whether the stream is completed and all the control messages have also been +// flushed. It must be called with the stream lock acquired. func (s *stream) isDone() bool { return (s.sendState == sendStateReset || s.sendState == sendStateDataSent) && (s.receiveState == receiveStateReset || s.receiveState == receiveStateDataRead) &&