-
Notifications
You must be signed in to change notification settings - Fork 620
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
connection shutdown, channel close data race #196
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -141,7 +141,11 @@ func (me *Channel) open() error { | |
// Performs a request/response call for when the message is not NoWait and is | ||
// specified as Synchronous. | ||
func (me *Channel) call(req message, res ...message) error { | ||
if err := me.send(me, req); err != nil { | ||
me.m.Lock() | ||
send := me.send | ||
me.m.Unlock() | ||
|
||
if err := send(me, req); err != nil { | ||
return err | ||
} | ||
|
||
|
@@ -273,22 +277,26 @@ func (me *Channel) dispatch(msg message) { | |
} | ||
|
||
case *basicAck: | ||
me.m.Lock() | ||
if me.confirming { | ||
if m.Multiple { | ||
me.confirms.Multiple(Confirmation{m.DeliveryTag, true}) | ||
} else { | ||
me.confirms.One(Confirmation{m.DeliveryTag, true}) | ||
} | ||
} | ||
me.m.Unlock() | ||
|
||
case *basicNack: | ||
me.m.Lock() | ||
if me.confirming { | ||
if m.Multiple { | ||
me.confirms.Multiple(Confirmation{m.DeliveryTag, false}) | ||
} else { | ||
me.confirms.One(Confirmation{m.DeliveryTag, false}) | ||
} | ||
} | ||
me.m.Unlock() | ||
|
||
case *basicDeliver: | ||
me.consumers.send(m.ConsumerTag, newDelivery(me, m)) | ||
|
@@ -1476,17 +1484,16 @@ exception could occur if the server does not support this method. | |
|
||
*/ | ||
func (me *Channel) Confirm(noWait bool) error { | ||
me.m.Lock() | ||
defer me.m.Unlock() | ||
|
||
if err := me.call( | ||
&confirmSelect{Nowait: noWait}, | ||
&confirmSelectOk{}, | ||
); err != nil { | ||
return err | ||
} | ||
|
||
me.m.Lock() | ||
me.confirming = true | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bet advised - the Without any sort of mutex. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for pointing out. I fixed it in c6a5ab1 |
||
me.m.Unlock() | ||
|
||
return nil | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a separate mutex for sending stuff,
me.sendM
. I think we should use it here.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original code is very misleading.
Please note that
m.send
starts assendOpen
and then switches tosendClosed
:https://github.com/imkira/amqp/blob/0563adcdb2a14308873fce26b14d3637dc2ac685/channel.go#L85
https://github.com/imkira/amqp/blob/0563adcdb2a14308873fce26b14d3637dc2ac685/channel.go#L104
Please, also note that each of those 2 functions use the mutex you specified for actually sending data.
What I am doing here is to protect a race condition that could happen if we used me.send directly.
That would collide with the following modification:
https://github.com/imkira/amqp/blob/0563adcdb2a14308873fce26b14d3637dc2ac685/channel.go#L104