From fa5c83f36fcd15fad44b0842ed7eed11629a6250 Mon Sep 17 00:00:00 2001 From: Nicolas Vollmar Date: Thu, 7 Mar 2024 10:48:29 +0100 Subject: [PATCH] Add handling for tcp register timeout leaving connection dead --- .../io/dns/internal/TcpDnsClientSpec.scala | 23 +++++++++++++++++-- .../pekko/io/dns/internal/TcpDnsClient.scala | 7 ++++-- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/actor-tests/src/test/scala/org/apache/pekko/io/dns/internal/TcpDnsClientSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/io/dns/internal/TcpDnsClientSpec.scala index 9164c14121a..e756d2b24aa 100644 --- a/actor-tests/src/test/scala/org/apache/pekko/io/dns/internal/TcpDnsClientSpec.scala +++ b/actor-tests/src/test/scala/org/apache/pekko/io/dns/internal/TcpDnsClientSpec.scala @@ -18,12 +18,12 @@ import java.net.InetSocketAddress import scala.collection.immutable.Seq import org.apache.pekko -import pekko.actor.Props +import pekko.actor.{ ActorKilledException, Kill, Props } import pekko.io.Tcp import pekko.io.Tcp.{ Connected, PeerClosed, Register } import pekko.io.dns.{ RecordClass, RecordType } import pekko.io.dns.internal.DnsClient.Answer -import pekko.testkit.{ ImplicitSender, PekkoSpec, TestProbe } +import pekko.testkit.{ EventFilter, ImplicitSender, PekkoSpec, TestProbe } class TcpDnsClientSpec extends PekkoSpec with ImplicitSender { import TcpDnsClient._ @@ -107,5 +107,24 @@ class TcpDnsClientSpec extends PekkoSpec with ImplicitSender { answerProbe.expectMsg(Answer(42, Nil)) answerProbe.expectMsg(Answer(43, Nil)) } + + "fail when the connection just terminates" in { + val tcpExtensionProbe = TestProbe() + val answerProbe = TestProbe() + val connectionProbe = TestProbe() + + val client = system.actorOf(Props(new TcpDnsClient(tcpExtensionProbe.ref, dnsServerAddress, answerProbe.ref))) + + client ! exampleRequestMessage + + tcpExtensionProbe.expectMsg(Tcp.Connect(dnsServerAddress)) + connectionProbe.send(tcpExtensionProbe.lastSender, Connected(dnsServerAddress, localAddress)) + connectionProbe.expectMsgType[Register] + + EventFilter[ActorKilledException](occurrences = 1).intercept { + // simulate connection stopping due to register timeout => client must fail + connectionProbe.ref ! Kill + } + } } } diff --git a/actor/src/main/scala/org/apache/pekko/io/dns/internal/TcpDnsClient.scala b/actor/src/main/scala/org/apache/pekko/io/dns/internal/TcpDnsClient.scala index b8022cca102..07fa1ad9482 100644 --- a/actor/src/main/scala/org/apache/pekko/io/dns/internal/TcpDnsClient.scala +++ b/actor/src/main/scala/org/apache/pekko/io/dns/internal/TcpDnsClient.scala @@ -14,10 +14,9 @@ package org.apache.pekko.io.dns.internal import java.net.InetSocketAddress - import org.apache.pekko import pekko.PekkoException -import pekko.actor.{ Actor, ActorLogging, ActorRef, Stash } +import pekko.actor.{ Actor, ActorLogging, ActorRef, Stash, Terminated } import pekko.annotation.InternalApi import pekko.io.Tcp import pekko.io.dns.internal.DnsClient.Answer @@ -49,6 +48,7 @@ import pekko.util.ByteString log.debug("Connected to TCP address [{}]", ns) val connection = sender() context.become(ready(connection)) + context.watch(connection) connection ! Tcp.Register(self) unstashAll() case _: Message => @@ -80,7 +80,10 @@ import pekko.util.ByteString } } case Tcp.PeerClosed => + context.unwatch(connection) context.become(idle) + case Terminated(`connection`) => + throwFailure("TCP connection terminated without closing (register timeout?)", None) } private def parseResponse(data: ByteString) = {