-
Notifications
You must be signed in to change notification settings - Fork 88
Save sender in request log while disconnected #90
base: scala2.11
Are you sure you want to change the base?
Conversation
log.info(s"got channel $channel") | ||
statusListeners.map(a => a ! Connected) | ||
context.become(connected(channel, forwarder)) | ||
} | ||
case Record(request: Request) => { | ||
requestLog :+= request | ||
requestLog :+= request -> Some(sender()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we get requests while disconnected, save the sender so that we reply to them later
@@ -185,13 +185,13 @@ class ChannelOwner(init: Seq[Request] = Seq.empty[Request], channelParams: Optio | |||
forwarder ! AddShutdownListener(self) | |||
forwarder ! AddReturnListener(self) | |||
onChannel(channel, forwarder) | |||
requestLog.map(r => self forward r) | |||
requestLog.foreach { case (request, sender) => self.tell(request, sender.getOrElse(context.sender())) } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we saved a sender (which we do for non-init
requests), forward that sender. Otherwise, do current behavior (forward current sender)
@@ -13,11 +13,11 @@ import com.rabbitmq.client.ConnectionFactory | |||
import com.github.sstone.amqp.Amqp._ | |||
import scala.util.Random | |||
|
|||
class ChannelSpec extends TestKit(ActorSystem("TestSystem")) with WordSpecLike with ShouldMatchers with BeforeAndAfter with ImplicitSender { | |||
trait ChannelSpecNoTestKit extends WordSpecLike with ShouldMatchers with BeforeAndAfter { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The tests didn't pass for me. Specifically, I had issues with ConsumerSpec
. The test suite extends TestKit
, and then uses TestProbe
... but that actually reuses the same actor across all tests in the suite (I think due to mixing in ImplicitSender; that uses the actor from the testkit). The tests mostly worked when run individually, but would run into issues when run all together via sbt test
.
This change lets each test create its own actor. See also https://hseeberger.gitlab.io/2017/09/13/how-to-use-akka-testkit.html
@@ -32,12 +39,16 @@ class ConsumerSpec extends ChannelSpec { | |||
"be able to set their channel's prefetch size" in { | |||
val queue = randomQueue | |||
val probe = TestProbe() | |||
implicit val sender = probe.ref | |||
probe.ignoreMsg { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this test was doing Publish
es but wasn't expecting Ok(Publish)
probe.expectMsgAllOf(1 second, ChannelOwner.Disconnected, ChannelOwner.Connected) | ||
val Ok(AddBinding(Binding(`exchange`, `queue`, "my_key")), Some(_)) = probe.receiveOne(1 second) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
after the channel crashes and reconnects, it plays back the AddBinding
as well
|
||
producer ! Publish("", queue1.name, "test1".getBytes("UTF-8")) | ||
probe.expectNoMsg() | ||
} | ||
"send consumer cancellation notifications" in { | ||
val probe = TestProbe() | ||
implicit val sender = probe.ref | ||
probe.ignoreMsg { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this test publishes without expecting Ok
probe.expectMsg(1 second, ConsumerCancelled(consumerTag)) | ||
} | ||
"create exclusive consumers" in { | ||
val probe = TestProbe() | ||
implicit val sender = probe.ref | ||
probe.ignoreMsg { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this test also had a publish without expecting Ok(Publish)
val Amqp.Error(_, reason) = receiveOne(1 second) | ||
val Amqp.Error(_, reason) = probe.receiveOne(1 second) | ||
} | ||
"save sender for requests while disconnected" in { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a new test to make sure ask
works if you make a request while the actor is in disconnected state.
previously, it would time out (because the reply didn't go back to the ask
er)
Could you take a look @sstone ? Is this PR reasonable? |
This allows the original sender to receive back the reply, rather than the actor that connected. Otherwise, things like the
ask
pattern won't work while disconnected (because the reply goes somewhere else).