diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/DiscoveryPeer.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/DiscoveryPeer.java index 9b5981b4978..1888e0df274 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/DiscoveryPeer.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/DiscoveryPeer.java @@ -17,12 +17,12 @@ import org.hyperledger.besu.ethereum.forkid.ForkId; import org.hyperledger.besu.ethereum.p2p.peers.DefaultPeer; import org.hyperledger.besu.ethereum.p2p.peers.Peer; -import org.hyperledger.besu.ethereum.p2p.peers.PeerId; import org.hyperledger.besu.ethereum.rlp.RLPInput; import org.hyperledger.besu.ethereum.rlp.RLPOutput; import org.hyperledger.besu.plugin.data.EnodeURL; import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; import org.apache.tuweni.bytes.Bytes; import org.ethereum.beacon.discovery.schema.NodeRecord; @@ -37,9 +37,7 @@ public class DiscoveryPeer extends DefaultPeer { private final Endpoint endpoint; // Timestamps. - private long firstDiscovered = 0; - private long lastContacted = 0; - private long lastSeen = 0; + private final AtomicLong firstDiscovered = new AtomicLong(0L); private long lastAttemptedConnection = 0; private NodeRecord nodeRecord; @@ -96,20 +94,11 @@ public void setStatus(final PeerDiscoveryStatus status) { } public long getFirstDiscovered() { - return firstDiscovered; + return firstDiscovered.get(); } - public PeerId setFirstDiscovered(final long firstDiscovered) { - this.firstDiscovered = firstDiscovered; - return this; - } - - public long getLastContacted() { - return lastContacted; - } - - public void setLastContacted(final long lastContacted) { - this.lastContacted = lastContacted; + public void setFirstDiscovered(final long firstDiscovered) { + this.firstDiscovered.compareAndExchange(0L, firstDiscovered); } public long getLastAttemptedConnection() { @@ -120,14 +109,6 @@ public void setLastAttemptedConnection(final long lastAttemptedConnection) { this.lastAttemptedConnection = lastAttemptedConnection; } - public long getLastSeen() { - return lastSeen; - } - - public void setLastSeen(final long lastSeen) { - this.lastSeen = lastSeen; - } - public Endpoint getEndpoint() { return endpoint; } @@ -163,8 +144,6 @@ public String toString() { sb.append("status=").append(status); sb.append(", enode=").append(this.getEnodeURL()); sb.append(", firstDiscovered=").append(firstDiscovered); - sb.append(", lastContacted=").append(lastContacted); - sb.append(", lastSeen=").append(lastSeen); sb.append('}'); return sb.toString(); } diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgent.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgent.java index 7efc50750fb..703b702e8e7 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgent.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgent.java @@ -365,9 +365,7 @@ protected void handleOutgoingPacket(final DiscoveryPeer peer, final Packet packe (res, err) -> { if (err != null) { handleOutgoingPacketError(err, peer, packet); - return; } - peer.setLastContacted(System.currentTimeMillis()); }); } diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryStatus.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryStatus.java index 5311a2212a5..28ccac70360 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryStatus.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryStatus.java @@ -35,10 +35,7 @@ public enum PeerDiscoveryStatus { * We have successfully bonded with this {@link DiscoveryPeer}, and we are able to exchange * messages with them. */ - BONDED, - - /** We have requested the ENR record from this {@link DiscoveryPeer} */ - ENR_REQUESTED; + BONDED; @Override public String toString() { diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryController.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryController.java index 11b5216a82b..2261f813546 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryController.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryController.java @@ -27,6 +27,7 @@ import org.hyperledger.besu.ethereum.p2p.peers.PeerId; import org.hyperledger.besu.ethereum.p2p.permissions.PeerPermissions; import org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent; +import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.metrics.BesuMetricCategory; import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.metrics.Counter; @@ -43,6 +44,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Predicate; @@ -321,7 +323,6 @@ public void onMessage(final Packet packet, final DiscoveryPeer sender) { switch (packet.getType()) { case PING: if (peerPermissions.allowInboundBonding(peer)) { - peer.setLastSeen(System.currentTimeMillis()); final PingPacketData ping = packet.getPacketData(PingPacketData.class).get(); if (!PeerDiscoveryStatus.BONDED.equals(peer.getStatus()) && (bondingPeers.getIfPresent(sender.getId()) == null)) { @@ -338,7 +339,7 @@ public void onMessage(final Packet packet, final DiscoveryPeer sender) { requestENR(peer); } bondingPeers.invalidate(peerId); - addToPeerTable(peer); + checkBeforeAddingToPeerTable(peer); recursivePeerRefreshState.onBondingComplete(peer); Optional.ofNullable(cachedEnrRequests.getIfPresent(peerId)) .ifPresent(cachedEnrRequest -> processEnrRequest(peer, cachedEnrRequest)); @@ -405,38 +406,45 @@ private List getPeersFromNeighborsPacket(final Packet packet) { .collect(Collectors.toList()); } - private boolean addToPeerTable(final DiscoveryPeer peer) { - final PeerTable.AddResult result = peerTable.tryAdd(peer); - if (result.getOutcome() != PeerTable.AddResult.AddOutcome.INVALID) { - - // Reset the last seen timestamp. - final long now = System.currentTimeMillis(); - if (peer.getFirstDiscovered() == 0) { - peer.setFirstDiscovered(now); - } - peer.setLastSeen(now); + private void checkBeforeAddingToPeerTable(final DiscoveryPeer peer) { + if (peerTable.isIpAddressInvalid(peer.getEndpoint())) { + return; + } - if (peer.getStatus() != PeerDiscoveryStatus.BONDED) { - peer.setStatus(PeerDiscoveryStatus.BONDED); - connectOnRlpxLayer(peer); - } + if (peer.getFirstDiscovered() == 0L) { + connectOnRlpxLayer(peer) + .whenComplete( + (pc, th) -> { + if (th == null || !(th.getCause() instanceof TimeoutException)) { + peer.setStatus(PeerDiscoveryStatus.BONDED); + peer.setFirstDiscovered(System.currentTimeMillis()); + addToPeerTable(peer); + } else { + LOG.debug("Handshake timed out with peer {}", peer.getLoggableId(), th); + peerTable.invalidateIP(peer.getEndpoint()); + } + }); + } else { + peer.setStatus(PeerDiscoveryStatus.BONDED); + addToPeerTable(peer); + } + } - if (result.getOutcome() == PeerTable.AddResult.AddOutcome.ALREADY_EXISTED) { - // Bump peer. - peerTable.tryEvict(peer); - peerTable.tryAdd(peer); - } else if (result.getOutcome() == PeerTable.AddResult.AddOutcome.BUCKET_FULL) { - peerTable.tryEvict(result.getEvictionCandidate()); - peerTable.tryAdd(peer); - } + public void addToPeerTable(final DiscoveryPeer peer) { + final PeerTable.AddResult result = peerTable.tryAdd(peer); - return true; + if (result.getOutcome() == PeerTable.AddResult.AddOutcome.ALREADY_EXISTED) { + // Bump peer. + peerTable.tryEvict(peer); + peerTable.tryAdd(peer); + } else if (result.getOutcome() == PeerTable.AddResult.AddOutcome.BUCKET_FULL) { + peerTable.tryEvict(result.getEvictionCandidate()); + peerTable.tryAdd(peer); } - return false; } - void connectOnRlpxLayer(final DiscoveryPeer peer) { - rlpxAgent.connect(peer); + CompletableFuture connectOnRlpxLayer(final DiscoveryPeer peer) { + return rlpxAgent.connect(peer); } private Optional matchInteraction(final Packet packet) { @@ -512,7 +520,6 @@ void bond(final DiscoveryPeer peer) { return; } - peer.setFirstDiscovered(System.currentTimeMillis()); peer.setStatus(PeerDiscoveryStatus.BONDING); bondingPeers.put(peer.getId(), peer); @@ -719,7 +726,7 @@ public void handleBondingRequest(final DiscoveryPeer peer) { // Load the peer first from the table, then from bonding cache or use the instance that comes in. private DiscoveryPeer resolvePeer(final DiscoveryPeer peer) { - if (peerTable.ipAddressIsInvalid(peer.getEndpoint())) { + if (peerTable.isIpAddressInvalid(peer.getEndpoint())) { return null; } final Optional maybeKnownPeer = diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTable.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTable.java index 50250e1c2a0..9f58ab1af97 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTable.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTable.java @@ -112,7 +112,7 @@ public Optional get(final PeerId peer) { * @see AddOutcome */ public AddResult tryAdd(final DiscoveryPeer peer) { - if (ipAddressIsInvalid(peer.getEndpoint())) { + if (isIpAddressInvalid(peer.getEndpoint())) { return AddResult.invalid(); } final Bytes id = peer.getId(); @@ -212,7 +212,7 @@ public Stream streamAllPeers() { return Arrays.stream(table).flatMap(e -> e.getPeers().stream()); } - boolean ipAddressIsInvalid(final Endpoint endpoint) { + public boolean isIpAddressInvalid(final Endpoint endpoint) { final String key = getKey(endpoint); if (invalidIPs.contains(key)) { return true; @@ -223,7 +223,7 @@ boolean ipAddressIsInvalid(final Endpoint endpoint) { for (final Bucket bucket : table) { bucket.getPeers().stream() .filter(p -> p.getEndpoint().getHost().equals(endpoint.getHost())) - .forEach(p -> evictAndStore(p, bucket, key)); + .forEach(bucket::evict); } return true; } else { @@ -231,13 +231,13 @@ boolean ipAddressIsInvalid(final Endpoint endpoint) { } } - private void evictAndStore(final DiscoveryPeer peer, final Bucket bucket, final String key) { - bucket.evict(peer); + public void invalidateIP(final Endpoint endpoint) { + final String key = getKey(endpoint); invalidIPs.add(key); } private static String getKey(final Endpoint endpoint) { - return endpoint.getHost() + endpoint.getFunctionalTcpPort(); + return endpoint.getHost() + ":" + endpoint.getFunctionalTcpPort(); } /** diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/RecursivePeerRefreshState.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/RecursivePeerRefreshState.java index 3329dad4ce8..c0442e891f7 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/RecursivePeerRefreshState.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/RecursivePeerRefreshState.java @@ -202,7 +202,7 @@ private boolean satisfiesMapAdditionCriteria(final DiscoveryPeer discoPeer) { return !oneTrueMap.containsKey(discoPeer.getId()) && (initialPeers.contains(discoPeer) || !peerTable.get(discoPeer).isPresent()) && !discoPeer.getId().equals(localPeer.getId()) - && !peerTable.ipAddressIsInvalid(discoPeer.getEndpoint()); + && !peerTable.isIpAddressInvalid(discoPeer.getEndpoint()); } void onNeighboursReceived(final DiscoveryPeer peer, final List peers) { diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/NettyConnectionInitializer.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/NettyConnectionInitializer.java index f386c59a384..045b6a9b6b2 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/NettyConnectionInitializer.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/NettyConnectionInitializer.java @@ -16,7 +16,6 @@ import org.hyperledger.besu.cryptoservices.NodeKey; import org.hyperledger.besu.ethereum.p2p.config.RlpxConfiguration; -import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer; import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable; import org.hyperledger.besu.ethereum.p2p.peers.LocalNode; import org.hyperledger.besu.ethereum.p2p.peers.Peer; @@ -174,10 +173,6 @@ public void subscribeIncomingConnect(final ConnectCallback callback) { public CompletableFuture connect(final Peer peer) { final CompletableFuture connectionFuture = new CompletableFuture<>(); - if (peer instanceof DiscoveryPeer) { - ((DiscoveryPeer) peer).setLastAttemptedConnection(System.currentTimeMillis()); - } - final EnodeURL enode = peer.getEnodeURL(); new Bootstrap() .group(workers) diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgentTest.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgentTest.java index 269eb92b084..57eb6f518b9 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgentTest.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgentTest.java @@ -182,7 +182,7 @@ public void neighborsPacketFromUnbondedPeerIsDropped() { } @Test - public void neighborsPacketLimited() { + public void neighborsPacketLimited() throws InterruptedException { // Start 20 agents with no bootstrap peers. final List otherAgents = helper.startDiscoveryAgents(20, Collections.emptyList()); @@ -192,8 +192,9 @@ public void neighborsPacketLimited() { .map(Optional::get) .collect(Collectors.toList()); - // Start another peer pointing to those 20 agents. + // Start another peer final MockPeerDiscoveryAgent agent = helper.startDiscoveryAgent(otherPeers); + // We used to do a hasSize match but we had issues with duplicate peers getting added to the // list. By moving to a contains we make sure that all the peers are loaded with tolerance for // duplicates. If we fix the duplication problem we should use containsExactlyInAnyOrder to @@ -222,7 +223,7 @@ public void neighborsPacketLimited() { final List incomingPackets = testAgent.getIncomingPackets().stream() .filter(p -> p.packet.getType().equals(PacketType.NEIGHBORS)) - .collect(toList()); + .toList(); assertThat(incomingPackets.size()).isEqualTo(1); final IncomingPacket neighborsPacket = incomingPackets.get(0); assertThat(neighborsPacket.fromAgent).isEqualTo(agent); diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryBondingTest.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryBondingTest.java index 898a415d8a3..64f159c58bc 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryBondingTest.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryBondingTest.java @@ -47,11 +47,15 @@ public void pongSentUponPing() { final List otherAgentIncomingPongs = otherAgent.getIncomingPackets().stream() .filter(p -> p.packet.getType().equals(PacketType.PONG)) - .collect(Collectors.toList()); + .toList(); assertThat(otherAgentIncomingPongs.size()).isEqualTo(1); assertThat( - otherAgentIncomingPongs.get(0).packet.getPacketData(PongPacketData.class).isPresent()) + otherAgentIncomingPongs + .getFirst() + .packet + .getPacketData(PongPacketData.class) + .isPresent()) .isTrue(); final PongPacketData pong = otherAgentIncomingPongs.get(0).packet.getPacketData(PongPacketData.class).get(); diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryTestHelper.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryTestHelper.java index c29855374ea..dbfb34de28c 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryTestHelper.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryTestHelper.java @@ -16,6 +16,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static java.util.Arrays.asList; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -44,6 +45,7 @@ import java.util.Map; import java.util.Optional; import java.util.OptionalInt; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -301,15 +303,12 @@ public MockPeerDiscoveryAgent build() { final ForkId forkId = new ForkId(Bytes.EMPTY, Bytes.EMPTY); when(mockForkIdManager.getForkIdForChainHead()).thenReturn(forkId); when(mockForkIdManager.peerCheck(forkId)).thenReturn(true); + final RlpxAgent rlpxAgent = mock(RlpxAgent.class); + when(rlpxAgent.connect(any())) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException())); final MockPeerDiscoveryAgent mockPeerDiscoveryAgent = new MockPeerDiscoveryAgent( - nodeKey, - config, - peerPermissions, - agents, - natService, - mockForkIdManager, - mock(RlpxAgent.class)); + nodeKey, config, peerPermissions, agents, natService, mockForkIdManager, rlpxAgent); mockPeerDiscoveryAgent.getAdvertisedPeer().ifPresent(peer -> peer.setNodeRecord(nodeRecord)); return mockPeerDiscoveryAgent; diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryTimestampsTest.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryTimestampsTest.java index d75847050d4..7a5fea14d67 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryTimestampsTest.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryTimestampsTest.java @@ -40,16 +40,13 @@ public void lastSeenAndFirstDiscoveredTimestampsUpdatedOnMessage() { final Packet pong = helper.createPongPacket(agent, Hash.hash(agentPing.getHash())); helper.sendMessageBetweenAgents(testAgent, agent, pong); - long lastSeen; long firstDiscovered; assertThat(agent.streamDiscoveredPeers()).hasSize(1); DiscoveryPeer p = agent.streamDiscoveredPeers().iterator().next(); - assertThat(p.getLastSeen()).isGreaterThan(0); assertThat(p.getFirstDiscovered()).isGreaterThan(0); - lastSeen = p.getLastSeen(); firstDiscovered = p.getFirstDiscovered(); helper.sendMessageBetweenAgents(testAgent, agent, testAgentPing); @@ -57,52 +54,6 @@ public void lastSeenAndFirstDiscoveredTimestampsUpdatedOnMessage() { assertThat(agent.streamDiscoveredPeers()).hasSize(1); p = agent.streamDiscoveredPeers().iterator().next(); - assertThat(p.getLastSeen()).isGreaterThan(lastSeen); assertThat(p.getFirstDiscovered()).isEqualTo(firstDiscovered); } - - @Test - public void lastContactedTimestampUpdatedOnOutboundMessage() { - final MockPeerDiscoveryAgent agent = helper.startDiscoveryAgent(Collections.emptyList()); - assertThat(agent.streamDiscoveredPeers()).hasSize(0); - - // Start a test peer and send a PING packet to the agent under test. - final MockPeerDiscoveryAgent testAgent = helper.startDiscoveryAgent(); - final Packet ping = helper.createPingPacket(testAgent, agent); - helper.sendMessageBetweenAgents(testAgent, agent, ping); - - assertThat(agent.streamDiscoveredPeers()).hasSize(1); - - final long lastContacted; - final long lastSeen; - final long firstDiscovered; - - DiscoveryPeer peer = agent.streamDiscoveredPeers().iterator().next(); - final long lc = peer.getLastContacted(); - final long ls = peer.getLastSeen(); - final long fd = peer.getFirstDiscovered(); - - assertThat(lc).isGreaterThan(0); - assertThat(ls).isGreaterThan(0); - assertThat(fd).isGreaterThan(0); - - lastContacted = lc; - lastSeen = ls; - firstDiscovered = fd; - - // Send another packet and ensure that timestamps are updated accordingly. - // Sleep beforehand to make sure timestamps will be different. - try { - Thread.sleep(1); - } catch (InterruptedException e) { - // Swallow exception because we only want to pause the test. - } - helper.sendMessageBetweenAgents(testAgent, agent, ping); - - peer = agent.streamDiscoveredPeers().iterator().next(); - - assertThat(peer.getLastContacted()).isGreaterThan(lastContacted); - assertThat(peer.getLastSeen()).isGreaterThan(lastSeen); - assertThat(peer.getFirstDiscovered()).isEqualTo(firstDiscovered); - } } diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java index 1064da78e58..4005673e3d3 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java @@ -55,7 +55,9 @@ import java.util.HashMap; import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -468,14 +470,13 @@ public void findNeighborsSentAfterBondingFinished() { .build(); // Mock the creation of the PING packet, so that we can control the hash, which gets validated - // when - // processing the PONG. + // when processing the PONG. final PingPacketData mockPing = PingPacketData.create( Optional.ofNullable(localPeer.getEndpoint()), peers.get(0).getEndpoint(), UInt64.ONE); final Packet mockPacket = Packet.create(PacketType.PING, mockPing, nodeKeys.get(0)); mockPingPacketCreation(mockPacket); - controller.setRetryDelayFunction((prev) -> 999999999L); + controller.setRetryDelayFunction(PeerDiscoveryControllerTest::longDelayFunction); controller.start(); // Verify that the PING was sent. @@ -506,11 +507,68 @@ public void findNeighborsSentAfterBondingFinished() { .isEqualTo(PeerDiscoveryStatus.BONDED); } + @Test + public void addedToInvalidIpsWhenConnectTimedOut() { + // Create a peer + final List nodeKeys = PeerDiscoveryTestHelper.generateNodeKeys(1); + final NodeKey nodeKey = nodeKeys.getFirst(); + final DiscoveryPeer peerThatTimesOut = helper.createDiscoveryPeers(nodeKeys).getFirst(); + + // Initialize the peer controller, using a rlpx agent that times out when asked to connect. + // Set a high controller refresh interval and a high timeout threshold, to avoid retries + // getting in the way of this test. + final OutboundMessageHandler outboundMessageHandler = mock(OutboundMessageHandler.class); + RlpxAgent rlpxAgentMock = mock(RlpxAgent.class); + when(rlpxAgentMock.connect(any())) + .thenReturn(CompletableFuture.failedFuture(new Exception(new TimeoutException()))); + controller = + getControllerBuilder() + .outboundMessageHandler(outboundMessageHandler) + .rlpxAgent(rlpxAgentMock) + .build(); + + // Mock the creation of the PING packet, so that we can control the hash, which gets validated + // when processing the PONG. + final PingPacketData mockPing = + PingPacketData.create( + Optional.ofNullable(localPeer.getEndpoint()), + peerThatTimesOut.getEndpoint(), + UInt64.ONE); + final Packet mockPacket = Packet.create(PacketType.PING, mockPing, nodeKey); + mockPingPacketCreation(mockPacket); + controller.setRetryDelayFunction(PeerDiscoveryControllerTest::longDelayFunction); + controller.start(); + + controller.handleBondingRequest(peerThatTimesOut); + + // Verify that the PING was sent. + verify(outboundMessageHandler, times(1)) + .send(eq(peerThatTimesOut), matchPacketOfType(PacketType.PING)); + + // Simulate a PONG message from the peer. + respondWithPong(peerThatTimesOut, nodeKey, mockPacket.getHash()); + + final List peersInTable = controller.streamDiscoveredPeers().toList(); + assertThat(peersInTable).hasSize(0); + assertThat(peersInTable).doesNotContain(peerThatTimesOut); + + // Try bonding again, and check that the peer is not sent the PING packet again + controller.handleBondingRequest(peerThatTimesOut); + + // verify that the ping was not sent, no additional interaction + verify(outboundMessageHandler, times(1)) + .send(eq(peerThatTimesOut), matchPacketOfType(PacketType.PING)); + } + private ControllerBuilder getControllerBuilder() { + final RlpxAgent rlpxAgent = mock(RlpxAgent.class); + when(rlpxAgent.connect(any())) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException())); return ControllerBuilder.create() .nodeKey(localNodeKey) .localPeer(localPeer) - .peerTable(peerTable); + .peerTable(peerTable) + .rlpxAgent(rlpxAgent); } private void respondWithPong( @@ -544,7 +602,7 @@ public void peerSeenTwice() { mockPingPacketCreation(pingPacket); - controller.setRetryDelayFunction((prev) -> 999999999L); + controller.setRetryDelayFunction(PeerDiscoveryControllerTest::longDelayFunction); controller.start(); verify(outboundMessageHandler, times(1)) @@ -994,7 +1052,7 @@ public void shouldAddNewPeerWhenReceivedPongAndPeerTableBucketIsNotFull() { .build(); mockPingPacketCreation(pingPacket); - controller.setRetryDelayFunction((prev) -> 999999999L); + controller.setRetryDelayFunction(PeerDiscoveryControllerTest::longDelayFunction); controller.start(); verify(outboundMessageHandler, times(1)).send(any(), matchPacketOfType(PacketType.PING)); @@ -1689,6 +1747,7 @@ static class ControllerBuilder { private Cache enrs = CacheBuilder.newBuilder().maximumSize(50).expireAfterWrite(10, TimeUnit.SECONDS).build(); private boolean filterOnForkId = false; + private RlpxAgent rlpxAgent; public static ControllerBuilder create() { return new ControllerBuilder(); @@ -1744,6 +1803,11 @@ public ControllerBuilder filterOnForkId(final boolean filterOnForkId) { return this; } + public ControllerBuilder rlpxAgent(final RlpxAgent rlpxAgent) { + this.rlpxAgent = rlpxAgent; + return this; + } + PeerDiscoveryController build() { checkNotNull(nodeKey); if (localPeer == null) { @@ -1752,6 +1816,7 @@ PeerDiscoveryController build() { if (peerTable == null) { peerTable = new PeerTable(localPeer.getId()); } + return spy( PeerDiscoveryController.builder() .nodeKey(nodeKey) @@ -1767,7 +1832,7 @@ PeerDiscoveryController build() { .metricsSystem(new NoOpMetricsSystem()) .cacheForEnrRequests(enrs) .filterOnEnrForkId(filterOnForkId) - .rlpxAgent(mock(RlpxAgent.class)) + .rlpxAgent(rlpxAgent) .build()); } } diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryTableRefreshTest.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryTableRefreshTest.java index 6320c909622..b200239fee5 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryTableRefreshTest.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryTableRefreshTest.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import org.hyperledger.besu.cryptoservices.NodeKey; import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer; @@ -34,8 +35,8 @@ import java.util.HashSet; import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; -import java.util.stream.Collectors; import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.units.bigints.UInt64; @@ -59,6 +60,9 @@ public void tableRefreshSingleNode() { final OutboundMessageHandler outboundMessageHandler = mock(OutboundMessageHandler.class); final MockTimerUtil timer = new MockTimerUtil(); + final RlpxAgent rlpxAgent = mock(RlpxAgent.class); + when(rlpxAgent.connect(any())) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException())); final PeerDiscoveryController controller = spy( PeerDiscoveryController.builder() @@ -70,7 +74,7 @@ public void tableRefreshSingleNode() { .workerExecutor(new BlockingAsyncExecutor()) .tableRefreshIntervalMs(0) .metricsSystem(new NoOpMetricsSystem()) - .rlpxAgent(mock(RlpxAgent.class)) + .rlpxAgent(rlpxAgent) .build()); controller.start(); @@ -117,7 +121,7 @@ public void tableRefreshSingleNode() { final List capturedFindNeighborsPackets = captor.getAllValues().stream() .filter(p -> p.getType().equals(PacketType.FIND_NEIGHBORS)) - .collect(Collectors.toList()); + .toList(); assertThat(capturedFindNeighborsPackets.size()).isEqualTo(5); // Collect targets from find neighbors packets diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTableTest.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTableTest.java index bcef09da817..331711a07f6 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTableTest.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTableTest.java @@ -196,7 +196,7 @@ public void ipAddressIsInvalidReturnsTrue() { final PeerTable.AddResult addResult1 = table.tryAdd(peer1); assertThat(addResult1.getOutcome()).isEqualTo(PeerTable.AddResult.added().getOutcome()); - assertThat(table.ipAddressIsInvalid(peer2.getEndpoint())).isEqualTo(true); + assertThat(table.isIpAddressInvalid(peer2.getEndpoint())).isEqualTo(true); } @Test @@ -210,7 +210,7 @@ public void ipAddressIsInvalidReturnsFalse() { final PeerTable.AddResult addResult1 = table.tryAdd(peer1); assertThat(addResult1.getOutcome()).isEqualTo(PeerTable.AddResult.added().getOutcome()); - assertThat(table.ipAddressIsInvalid(peer2.getEndpoint())).isEqualTo(false); + assertThat(table.isIpAddressInvalid(peer2.getEndpoint())).isEqualTo(false); } @Test