Skip to content

Commit

Permalink
webrtc: fix flaky TestReadWriteDeadlines/WebRTC/SetDeadline/Write
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Sep 22, 2023
1 parent 4ae549a commit 8b09446
Showing 1 changed file with 24 additions and 4 deletions.
28 changes: 24 additions & 4 deletions p2p/transport/webrtc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,23 @@ func newStream(
}
}

s.maybeDeclareStreamDone()
if s.isDone() {
// onDone removes the stream from the connection and requires the connection lock.
// This callback(onBufferedAmountLow) is executing in the sctp readLoop goroutine.
// If the connection is closed concurrently, the closing goroutine will acquire
// the connection lock and wait for sctp readLoop to exit, the sctp readLoop will
// wait for the connection lock before exiting, causing a deadlock.
// Run this in a different goroutine to avoid the deadlock.
go func() {
s.mx.Lock()
defer s.mx.Unlock()
// TODO: we should be closing the underlying datachannel, but this resets the stream
// See https://github.com/libp2p/specs/issues/575 for details.
// _ = s.dataChannel.Close()
// TODO: write for the spawned reader to return
s.onDone()
}()
}

select {
case s.writeAvailable <- struct{}{}:
Expand Down Expand Up @@ -188,9 +204,7 @@ func (s *stream) processIncomingFlag(flag *pb.Message_Flag) {

// this is used to force reset a stream
func (s *stream) maybeDeclareStreamDone() {
if (s.sendState == sendStateReset || s.sendState == sendStateDataSent) &&
(s.receiveState == receiveStateReset || s.receiveState == receiveStateDataRead) &&
len(s.controlMsgQueue) == 0 {
if s.isDone() {
_ = s.SetReadDeadline(time.Now().Add(-1 * time.Hour)) // pion ignores zero times
// TODO: we should be closing the underlying datachannel, but this resets the stream
// See https://github.com/libp2p/specs/issues/575 for details.
Expand All @@ -200,6 +214,12 @@ func (s *stream) maybeDeclareStreamDone() {
}
}

func (s *stream) isDone() bool {
return (s.sendState == sendStateReset || s.sendState == sendStateDataSent) &&
(s.receiveState == receiveStateReset || s.receiveState == receiveStateDataRead) &&
len(s.controlMsgQueue) == 0
}

func (s *stream) setCloseError(e error) {
s.mx.Lock()
defer s.mx.Unlock()
Expand Down

0 comments on commit 8b09446

Please sign in to comment.