Skip to content

Commit

Permalink
fix data race in connection shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
imkira committed Mar 28, 2016
1 parent 2e25825 commit bc44d43
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ services:
env:
- AMQP_URL=amqp://guest:guest@127.0.0.1:5672/ GOMAXPROCS=2

script: go test -v -tags integration ./...
script: go test -v -race -tags integration ./...
8 changes: 5 additions & 3 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,12 @@ func (me *Channel) open() error {
// Performs a request/response call for when the message is not NoWait and is
// specified as Synchronous.
func (me *Channel) call(req message, res ...message) error {
me.m.Lock()
if err := me.send(me, req); err != nil {
me.m.Unlock()
return err
}
me.m.Unlock()

if req.wait() {
select {
Expand Down Expand Up @@ -1476,17 +1479,16 @@ exception could occur if the server does not support this method.
*/
func (me *Channel) Confirm(noWait bool) error {
me.m.Lock()
defer me.m.Unlock()

if err := me.call(
&confirmSelect{Nowait: noWait},
&confirmSelectOk{},
); err != nil {
return err
}

me.m.Lock()
me.confirming = true
me.m.Unlock()

return nil
}
Expand Down
27 changes: 27 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,3 +601,30 @@ func TestPublishAndShutdownDeadlockIssue84(t *testing.T) {
}
}
}

func TestChannelCloseRace(t *testing.T) {
rwc, srv := newSession(t)

done := make(chan bool)

go func() {
srv.connectionOpen()
srv.channelOpen(1)

rwc.Close()
done <- true
}()

c, err := Open(rwc, defaultConfig())
if err != nil {
t.Fatalf("could not create connection: %v (%s)", c, err)
}

ch, err := c.Channel()
if err != nil {
t.Fatalf("could not open channel: %v (%s)", ch, err)
}
<-done
ch.Close()
c.Close()
}
13 changes: 7 additions & 6 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,14 +340,18 @@ func (me *Connection) send(f frame) error {

func (me *Connection) shutdown(err *Error) {
me.destructor.Do(func() {
me.m.Lock()
defer me.m.Unlock()

if err != nil {
for _, c := range me.closes {
c <- err
}
}

for _, ch := range me.channels {
me.closeChannel(ch, err)
ch.shutdown(err)
me.releaseChannel(ch.id)
}

if err != nil {
Expand All @@ -364,9 +368,7 @@ func (me *Connection) shutdown(err *Error) {
close(c)
}

me.m.Lock()
me.noNotify = true
me.m.Unlock()
})
}

Expand Down Expand Up @@ -553,9 +555,6 @@ func (me *Connection) allocateChannel() (*Channel, error) {
// releaseChannel removes a channel from the registry as the final part of the
// channel lifecycle
func (me *Connection) releaseChannel(id uint16) {
me.m.Lock()
defer me.m.Unlock()

delete(me.channels, id)
me.allocator.release(int(id))
}
Expand All @@ -578,6 +577,8 @@ func (me *Connection) openChannel() (*Channel, error) {
// this connection.
func (me *Connection) closeChannel(ch *Channel, e *Error) {
ch.shutdown(e)
me.m.Lock()
defer me.m.Unlock()
me.releaseChannel(ch.id)
}

Expand Down

0 comments on commit bc44d43

Please sign in to comment.