Skip to content

Commit

Permalink
Make sure we close connections in kademlia after queries
Browse files Browse the repository at this point in the history
  • Loading branch information
ianopolous committed Sep 28, 2024
1 parent 2c53611 commit df1324b
Showing 1 changed file with 49 additions and 20 deletions.
69 changes: 49 additions & 20 deletions src/main/java/org/peergos/protocol/dht/Kademlia.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@ public void bootstrap(Host us) {
// lookup our own peer id to keep our nearest neighbours up-to-date,
// and connect to all of them, so they know about our addresses
List<PeerAddresses> closestToUs = findClosestPeers(Multihash.deserialize(us.getPeerId().getBytes()), 20, us);
int connectedClosest = 0;
List<Multihash> connected = new ArrayList<>();
for (PeerAddresses peer : closestToUs) {
if (connectTo(us, peer))
connectedClosest++;
connected.add(peer.peerId);
}
LOG.info("Bootstrap connected to " + connectedClosest + " nodes close to us.");
LOG.info("Bootstrap connected to " + connected.size() + " nodes close to us. " + connected.stream().map(Multihash::toString).sorted().limit(5).collect(Collectors.toList()));
}

static class RoutingEntry {
Expand Down Expand Up @@ -233,12 +233,16 @@ public CompletableFuture<List<PeerAddresses>> findProviders(Multihash block, Hos
List<CompletableFuture<Providers>> futures = queryThisRound.stream()
.parallel()
.map(r -> {
KademliaController res = null;
StreamPromise<? extends KademliaController> conn = null;
try {
res = dialPeer(r.addresses, us).join();
return res.getProviders(block).orTimeout(2, TimeUnit.SECONDS);
}catch (Exception e) {
conn = dialPeer(r.addresses, us);
return conn.getController().join()
.getProviders(block).orTimeout(2, TimeUnit.SECONDS);
} catch (Exception e) {
return null;
} finally {
if (conn != null)
conn.getStream().thenApply(s -> s.close());
}
}).filter(prov -> prov != null)
.collect(Collectors.toList());
Expand Down Expand Up @@ -269,8 +273,11 @@ public CompletableFuture<List<PeerAddresses>> findProviders(Multihash block, Hos
}

private CompletableFuture<List<PeerAddresses>> getCloserPeers(byte[] key, PeerAddresses target, Host us) {
StreamPromise<? extends KademliaController> conn = null;
try {
return dialPeer(target, us).orTimeout(2, TimeUnit.SECONDS).join().closerPeers(key);
conn = dialPeer(target, us);
KademliaController contr = conn.getController().orTimeout(2, TimeUnit.SECONDS).join();
return contr.closerPeers(key);
} catch (Exception e) {
// we can't dial quic only nodes until it's implemented
if (target.addresses.stream().allMatch(a -> a.toString().contains("quic")))
Expand All @@ -282,6 +289,9 @@ private CompletableFuture<List<PeerAddresses>> getCloserPeers(byte[] key, PeerAd
else if (e.getCause() instanceof ConnectionClosedException) {}
else
e.printStackTrace();
} finally {
if (conn != null)
conn.getStream().thenApply(s -> s.close());
}
return CompletableFuture.completedFuture(Collections.emptyList());
}
Expand All @@ -292,26 +302,34 @@ private Multiaddr[] getPublic(PeerAddresses target) {
.collect(Collectors.toList()).toArray(new Multiaddr[0]);
}

private CompletableFuture<? extends KademliaController> dialPeer(PeerAddresses target, Host us) {
private StreamPromise<? extends KademliaController> dialPeer(PeerAddresses target, Host us) {
Multiaddr[] multiaddrs = target.addresses.stream()
.map(a -> Multiaddr.fromString(a.toString()))
.filter(a -> ! a.has(Protocol.DNS) && ! a.has(Protocol.DNS4) && ! a.has(Protocol.DNS6))
.collect(Collectors.toList()).toArray(new Multiaddr[0]);
return dial(us, PeerId.fromBase58(target.peerId.toBase58()), multiaddrs).getController();
return dial(us, PeerId.fromBase58(target.peerId.toBase58()), multiaddrs);
}

public CompletableFuture<Void> provideBlock(Multihash block, Host us, PeerAddresses ourAddrs) {
List<PeerAddresses> closestPeers = findClosestPeers(block, 20, us);
List<CompletableFuture<Boolean>> provides = closestPeers.stream()
.parallel()
.map(p -> dialPeer(p, us)
.thenCompose(contr -> contr.provide(block, ourAddrs))
.exceptionally(t -> {
if (t.getCause() instanceof NonCompleteException)
.map(p -> {
StreamPromise<? extends KademliaController> conn = dialPeer(p, us);
return conn.getController()
.thenCompose(contr -> contr.provide(block, ourAddrs))
.thenApply(res -> {
conn.getStream().thenApply(s -> s.close());
return res;
})
.exceptionally(t -> {
if (t.getCause() instanceof NonCompleteException)
return true;
LOG.log(Level.FINE, t, t::getMessage);
conn.getStream().thenApply(s -> s.close());
return true;
LOG.log(Level.FINE, t, t::getMessage);
return true;
}))
});
})
.collect(Collectors.toList());
return CompletableFuture.allOf(provides.toArray(new CompletableFuture[0]));
}
Expand All @@ -333,10 +351,15 @@ private CompletableFuture<Boolean> putValue(Multihash publisher,
byte[] signedRecord,
PeerAddresses peer,
Host us) {
StreamPromise<? extends KademliaController> conn = null;
try {
return dialPeer(peer, us).join()
conn = dialPeer(peer, us);
return conn.getController().join()
.putValue(publisher, signedRecord);
} catch (Exception e) {}
} catch (Exception e) {} finally {
if (conn != null)
conn.getStream().thenApply(s -> s.close());
}
return CompletableFuture.completedFuture(false);
}

Expand Down Expand Up @@ -446,15 +469,21 @@ public CompletableFuture<String> resolveIpnsValue(Multihash publisher, Host us,
}

private CompletableFuture<Optional<GetResult>> getValueFromPeer(PeerAddresses peer, Multihash publisher, Host us) {
StreamPromise<? extends KademliaController> conn = null;
try {
return dialPeer(peer, us)
conn = dialPeer(peer, us);
return conn
.getController()
.orTimeout(1, TimeUnit.SECONDS)
.join()
.getValue(publisher)
.orTimeout(1, TimeUnit.SECONDS)
.thenApply(Optional::of);
} catch (Exception e) {
return CompletableFuture.completedFuture(Optional.empty());
} finally {
if (conn != null)
conn.getStream().thenApply(s -> s.close());
}
}
public List<IpnsRecord> resolveValue(Multihash publisher, int minResults, Host us) {
Expand Down

0 comments on commit df1324b

Please sign in to comment.