diff --git a/src/main/java/me/kavin/piped/Main.java b/src/main/java/me/kavin/piped/Main.java index a8647c0f..a5a8b57b 100644 --- a/src/main/java/me/kavin/piped/Main.java +++ b/src/main/java/me/kavin/piped/Main.java @@ -23,6 +23,8 @@ import rocks.kavin.reqwest4j.ReqwestUtils; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -102,15 +104,39 @@ public void run() { Collections.shuffle(channelIds); - channelIds.stream() - .parallel() - .forEach(id -> Multithreading.runAsyncLimitedPubSub(() -> { + var queue = new ConcurrentLinkedQueue<>(channelIds); + + System.out.println("PubSub: queue size - " + queue.size() + " channels"); + + for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) { + new Thread(() -> { + + Object o = new Object(); + + String channelId; + while ((channelId = queue.poll()) != null) { try { - PubSubHelper.subscribePubSub(id); + CompletableFuture future = PubSubHelper.subscribePubSub(channelId); + + if (future == null) + continue; + + future.whenComplete((resp, throwable) -> { + synchronized (o) { + o.notify(); + } + }); + + synchronized (o) { + o.wait(); + } + } catch (Exception e) { ExceptionHandler.handle(e); } - })); + } + }, "PubSub-" + i).start(); + } } catch (Exception e) { e.printStackTrace(); diff --git a/src/main/java/me/kavin/piped/utils/PubSubHelper.java b/src/main/java/me/kavin/piped/utils/PubSubHelper.java index e1b66066..f74bb5d2 100644 --- a/src/main/java/me/kavin/piped/utils/PubSubHelper.java +++ b/src/main/java/me/kavin/piped/utils/PubSubHelper.java @@ -6,16 +6,21 @@ import okio.Buffer; import org.hibernate.StatelessSession; import rocks.kavin.reqwest4j.ReqwestUtils; +import rocks.kavin.reqwest4j.Response; +import javax.annotation.Nullable; import java.io.IOException; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; public class PubSubHelper { - public static void subscribePubSub(String channelId) throws IOException { + + @Nullable + public static CompletableFuture subscribePubSub(String channelId) throws IOException { if (!ChannelHelpers.isValidId(channelId)) - return; + return null; PubSub pubsub = DatabaseHelper.getPubSubFromId(channelId); @@ -44,16 +49,21 @@ public static void subscribePubSub(String channelId) throws IOException { var buffer = new Buffer(); formBuilder.build().writeTo(buffer); - ReqwestUtils.fetch(Constants.PUBSUB_HUB_URL, "POST", buffer.readByteArray(), Map.of()) - .thenAccept(resp -> { - if (resp.status() != 202) + var completableFuture = ReqwestUtils.fetch(Constants.PUBSUB_HUB_URL, "POST", buffer.readByteArray(), Map.of()); + + completableFuture + .whenComplete((resp, e) -> { + if (e != null) { + ExceptionHandler.handle((Exception) e); + return; + } + if (resp != null && resp.status() != 202) System.out.println("Failed to subscribe: " + resp.status() + "\n" + new String(resp.body())); - }) - .exceptionally(e -> { - ExceptionHandler.handle((Exception) e); - return null; }); + + return completableFuture; } + return null; } public static void updatePubSub(String channelId) {