Skip to content

Commit

Permalink
fix data race in connection shutdown and channel close
Browse files Browse the repository at this point in the history
  • Loading branch information
imkira committed Nov 24, 2016
1 parent ab5060f commit 2c4cb9d
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 10 deletions.
11 changes: 7 additions & 4 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,11 @@ func (me *Channel) open() error {
// Performs a request/response call for when the message is not NoWait and is
// specified as Synchronous.
func (me *Channel) call(req message, res ...message) error {
if err := me.send(me, req); err != nil {
me.m.Lock()
send := me.send
me.m.Unlock()

if err := send(me, req); err != nil {
return err
}

Expand Down Expand Up @@ -1476,17 +1480,16 @@ exception could occur if the server does not support this method.
*/
func (me *Channel) Confirm(noWait bool) error {
me.m.Lock()
defer me.m.Unlock()

if err := me.call(
&confirmSelect{Nowait: noWait},
&confirmSelectOk{},
); err != nil {
return err
}

me.m.Lock()
me.confirming = true
me.m.Unlock()

return nil
}
Expand Down
27 changes: 27 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,3 +601,30 @@ func TestPublishAndShutdownDeadlockIssue84(t *testing.T) {
}
}
}

func TestChannelCloseRace(t *testing.T) {
rwc, srv := newSession(t)

done := make(chan bool)

go func() {
srv.connectionOpen()
srv.channelOpen(1)

rwc.Close()
done <- true
}()

c, err := Open(rwc, defaultConfig())
if err != nil {
t.Fatalf("could not create connection: %v (%s)", c, err)
}

ch, err := c.Channel()
if err != nil {
t.Fatalf("could not open channel: %v (%s)", ch, err)
}
<-done
ch.Close()
c.Close()
}
11 changes: 5 additions & 6 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,14 +344,16 @@ func (me *Connection) shutdown(err *Error) {
closes := make([]chan *Error, len(me.closes))
copy(closes, me.closes)
me.m.Unlock()

if err != nil {
for _, c := range closes {
c <- err
}
}

for _, ch := range me.channels {
me.closeChannel(ch, err)
ch.shutdown(err)
me.releaseChannel(ch.id)
}

if err != nil {
Expand All @@ -368,9 +370,7 @@ func (me *Connection) shutdown(err *Error) {
close(c)
}

me.m.Lock()
me.noNotify = true
me.m.Unlock()
})
}

Expand Down Expand Up @@ -557,9 +557,6 @@ func (me *Connection) allocateChannel() (*Channel, error) {
// releaseChannel removes a channel from the registry as the final part of the
// channel lifecycle
func (me *Connection) releaseChannel(id uint16) {
me.m.Lock()
defer me.m.Unlock()

delete(me.channels, id)
me.allocator.release(int(id))
}
Expand All @@ -582,7 +579,9 @@ func (me *Connection) openChannel() (*Channel, error) {
// this connection.
func (me *Connection) closeChannel(ch *Channel, e *Error) {
ch.shutdown(e)
me.m.Lock()
me.releaseChannel(ch.id)
me.m.Unlock()
}

/*
Expand Down

0 comments on commit 2c4cb9d

Please sign in to comment.