Skip to content

Commit

Permalink
Handle PubSub better with a queue and actually waiting
Browse files Browse the repository at this point in the history
  • Loading branch information
FireMasterK committed Jul 23, 2023
1 parent 5bdf667 commit c027fbe
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 14 deletions.
36 changes: 31 additions & 5 deletions src/main/java/me/kavin/piped/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import rocks.kavin.reqwest4j.ReqwestUtils;

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -102,15 +104,39 @@ public void run() {

Collections.shuffle(channelIds);

channelIds.stream()
.parallel()
.forEach(id -> Multithreading.runAsyncLimitedPubSub(() -> {
var queue = new ConcurrentLinkedQueue<>(channelIds);

System.out.println("PubSub: queue size - " + queue.size() + " channels");

for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) {
new Thread(() -> {

Object o = new Object();

String channelId;
while ((channelId = queue.poll()) != null) {
try {
PubSubHelper.subscribePubSub(id);
CompletableFuture<?> future = PubSubHelper.subscribePubSub(channelId);

if (future == null)
continue;

future.whenComplete((resp, throwable) -> {
synchronized (o) {
o.notify();
}
});

synchronized (o) {
o.wait();
}

} catch (Exception e) {
ExceptionHandler.handle(e);
}
}));
}
}, "PubSub-" + i).start();
}

} catch (Exception e) {
e.printStackTrace();
Expand Down
28 changes: 19 additions & 9 deletions src/main/java/me/kavin/piped/utils/PubSubHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,21 @@
import okio.Buffer;
import org.hibernate.StatelessSession;
import rocks.kavin.reqwest4j.ReqwestUtils;
import rocks.kavin.reqwest4j.Response;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class PubSubHelper {
public static void subscribePubSub(String channelId) throws IOException {

@Nullable
public static CompletableFuture<Response> subscribePubSub(String channelId) throws IOException {

if (!ChannelHelpers.isValidId(channelId))
return;
return null;

PubSub pubsub = DatabaseHelper.getPubSubFromId(channelId);

Expand Down Expand Up @@ -44,16 +49,21 @@ public static void subscribePubSub(String channelId) throws IOException {
var buffer = new Buffer();
formBuilder.build().writeTo(buffer);

ReqwestUtils.fetch(Constants.PUBSUB_HUB_URL, "POST", buffer.readByteArray(), Map.of())
.thenAccept(resp -> {
if (resp.status() != 202)
var completableFuture = ReqwestUtils.fetch(Constants.PUBSUB_HUB_URL, "POST", buffer.readByteArray(), Map.of());

completableFuture
.whenComplete((resp, e) -> {
if (e != null) {
ExceptionHandler.handle((Exception) e);
return;
}
if (resp != null && resp.status() != 202)
System.out.println("Failed to subscribe: " + resp.status() + "\n" + new String(resp.body()));
})
.exceptionally(e -> {
ExceptionHandler.handle((Exception) e);
return null;
});

return completableFuture;
}
return null;
}

public static void updatePubSub(String channelId) {
Expand Down

0 comments on commit c027fbe

Please sign in to comment.