Skip to content

Commit

Permalink
fix data race in connection shutdown and channel close
Browse files Browse the repository at this point in the history
  • Loading branch information
imkira committed Nov 29, 2016
1 parent 9831e04 commit c6a5ab1
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 11 deletions.
15 changes: 11 additions & 4 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,11 @@ func (me *Channel) open() error {
// Performs a request/response call for when the message is not NoWait and is
// specified as Synchronous.
func (me *Channel) call(req message, res ...message) error {
if err := me.send(me, req); err != nil {
me.m.Lock()
send := me.send
me.m.Unlock()

if err := send(me, req); err != nil {
return err
}

Expand Down Expand Up @@ -273,22 +277,26 @@ func (me *Channel) dispatch(msg message) {
}

case *basicAck:
me.m.Lock()
if me.confirming {
if m.Multiple {
me.confirms.Multiple(Confirmation{m.DeliveryTag, true})
} else {
me.confirms.One(Confirmation{m.DeliveryTag, true})
}
}
me.m.Unlock()

case *basicNack:
me.m.Lock()
if me.confirming {
if m.Multiple {
me.confirms.Multiple(Confirmation{m.DeliveryTag, false})
} else {
me.confirms.One(Confirmation{m.DeliveryTag, false})
}
}
me.m.Unlock()

case *basicDeliver:
me.consumers.send(m.ConsumerTag, newDelivery(me, m))
Expand Down Expand Up @@ -1476,17 +1484,16 @@ exception could occur if the server does not support this method.
*/
func (me *Channel) Confirm(noWait bool) error {
me.m.Lock()
defer me.m.Unlock()

if err := me.call(
&confirmSelect{Nowait: noWait},
&confirmSelectOk{},
); err != nil {
return err
}

me.m.Lock()
me.confirming = true
me.m.Unlock()

return nil
}
Expand Down
27 changes: 27 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,3 +601,30 @@ func TestPublishAndShutdownDeadlockIssue84(t *testing.T) {
}
}
}

func TestChannelCloseRace(t *testing.T) {
rwc, srv := newSession(t)

done := make(chan bool)

go func() {
srv.connectionOpen()
srv.channelOpen(1)

rwc.Close()
done <- true
}()

c, err := Open(rwc, defaultConfig())
if err != nil {
t.Fatalf("could not create connection: %v (%s)", c, err)
}

ch, err := c.Channel()
if err != nil {
t.Fatalf("could not open channel: %v (%s)", ch, err)
}
<-done
ch.Close()
c.Close()
}
11 changes: 5 additions & 6 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,14 +357,16 @@ func (me *Connection) shutdown(err *Error) {
closes := make([]chan *Error, len(me.closes))
copy(closes, me.closes)
me.m.Unlock()

if err != nil {
for _, c := range closes {
c <- err
}
}

for _, ch := range me.channels {
me.closeChannel(ch, err)
ch.shutdown(err)
me.releaseChannel(ch.id)
}

if err != nil {
Expand All @@ -381,9 +383,7 @@ func (me *Connection) shutdown(err *Error) {
close(c)
}

me.m.Lock()
me.noNotify = true
me.m.Unlock()
})
}

Expand Down Expand Up @@ -570,9 +570,6 @@ func (me *Connection) allocateChannel() (*Channel, error) {
// releaseChannel removes a channel from the registry as the final part of the
// channel lifecycle
func (me *Connection) releaseChannel(id uint16) {
me.m.Lock()
defer me.m.Unlock()

delete(me.channels, id)
me.allocator.release(int(id))
}
Expand All @@ -595,7 +592,9 @@ func (me *Connection) openChannel() (*Channel, error) {
// this connection.
func (me *Connection) closeChannel(ch *Channel, e *Error) {
ch.shutdown(e)
me.m.Lock()
me.releaseChannel(ch.id)
me.m.Unlock()
}

/*
Expand Down
2 changes: 1 addition & 1 deletion shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (me *logIO) Close() (err error) {
if err != nil {
me.t.Logf("%s close : %v\n", me.prefix, err)
} else {
me.t.Logf("%s close\n", me.prefix, err)
me.t.Logf("%s close\n", me.prefix)
}
return
}

0 comments on commit c6a5ab1

Please sign in to comment.