Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize threading when batch fetching DeArrow parallelly #651

Merged
merged 1 commit into from
Jul 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/me/kavin/piped/server/ServerLauncher.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ AsyncServlet mainServlet(Executor executor) {
var videoIds = getArray(request.getQueryParameter("videoIds"));

return getJsonResponse(
SponsorBlockUtils.getDeArrowedInfo(List.of(videoIds))
SponsorBlockUtils.getDeArrowedInfo(videoIds)
.thenApplyAsync(json -> {
try {
return mapper.writeValueAsBytes(json);
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/me/kavin/piped/utils/Multithreading.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package me.kavin.piped.utils;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.*;
import java.util.function.Supplier;

public class Multithreading {
Expand All @@ -12,11 +10,16 @@ public class Multithreading {
.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 8);
private static final ExecutorService esLimitedPubSub = Executors
.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static final ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());

public static void runAsync(final Runnable runnable) {
es.submit(runnable);
}

public static void runAsyncTask(final ForkJoinTask<?> task) {
forkJoinPool.submit(task);
}

public static void runAsyncLimited(final Runnable runnable) {
esLimited.submit(runnable);
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/me/kavin/piped/utils/RequestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ public static JsonNode getJsonNode(OkHttpClient client, Request request) throws
}
}

public static CompletableFuture<JsonNode> sendGetJson(String url) throws Exception {
public static CompletableFuture<JsonNode> sendGetJson(String url) {
return ReqwestUtils.fetch(url, "GET", null, Map.of()).thenApply(Response::body).thenApplyAsync(resp -> {
try {
return mapper.readTree(resp);
} catch (Exception e) {
throw new RuntimeException("Failed to parse JSON", e);
}
}, Multithreading.getCachedExecutor());
});
}
}
65 changes: 39 additions & 26 deletions src/main/java/me/kavin/piped/utils/SponsorBlockUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.util.List;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinTask;

import static me.kavin.piped.consts.Constants.SPONSORBLOCK_SERVERS;
import static me.kavin.piped.consts.Constants.mapper;

public class SponsorBlockUtils {
Expand Down Expand Up @@ -54,47 +56,58 @@ public static String getSponsors(String id, String categories)
return null;
}

public static CompletableFuture<ObjectNode> getDeArrowedInfo(List<String> videoIds) {
public static CompletableFuture<ObjectNode> getDeArrowedInfo(String[] videoIds) {
ObjectNode objectNode = mapper.createObjectNode();

var futures = videoIds.stream()
.map(id -> getDeArrowedInfo(id).thenAcceptAsync(jsonNode -> objectNode.set(id, jsonNode.orElse(NullNode.getInstance()))))
var futures = Arrays.stream(videoIds)
.map(id -> getDeArrowedInfo(id, SPONSORBLOCK_SERVERS.toArray(new String[0]))
.thenAcceptAsync(jsonNode -> objectNode.set(id, jsonNode.orElse(NullNode.getInstance())))
)
.toArray(CompletableFuture[]::new);

return CompletableFuture.allOf(futures)
.thenApplyAsync(v -> objectNode, Multithreading.getCachedExecutor());
}

private static CompletableFuture<Optional<JsonNode>> getDeArrowedInfo(String videoId) {
private static CompletableFuture<Optional<JsonNode>> getDeArrowedInfo(String videoId, String[] servers) {

String hash = DigestUtils.sha256Hex(videoId);

CompletableFuture<Optional<JsonNode>> future = new CompletableFuture<>();

Multithreading.runAsync(() -> {
for (String url : Constants.SPONSORBLOCK_SERVERS)
try {
Optional<JsonNode> optional = RequestUtils.sendGetJson(url + "/api/branding/" + URLUtils.silentEncode(hash.substring(0, 4)))
.thenApplyAsync(json -> json.has(videoId) ? Optional.of(json.get(videoId)) : Optional.<JsonNode>empty())
.get();

optional.ifPresent(jsonNode -> {
ArrayNode nodes = (ArrayNode) jsonNode.get("thumbnails");
for (JsonNode node : nodes) {
if (!node.get("original").booleanValue())
((ObjectNode) node).set("thumbnail", new TextNode(URLUtils.rewriteURL("https://dearrow-thumb.ajay.app/api/v1/getThumbnail?videoID=" + videoId + "&time=" + node.get("timestamp").asText())));
}
});


future.complete(optional);
return;
} catch (Exception ignored) {
}
future.completeExceptionally(new Exception("All SponsorBlock servers are down"));
var task = ForkJoinTask.adapt(() -> {
fetchDeArrowedCf(future, videoId, hash, servers);
});

Multithreading.runAsyncTask(task);

return future;

}

private static void fetchDeArrowedCf(CompletableFuture<Optional<JsonNode>> future, String videoId, String hash, String[] servers) {

var completableFuture = RequestUtils.sendGetJson(servers[0] + "/api/branding/" + URLUtils.silentEncode(hash.substring(0, 4)))
.thenApplyAsync(json -> json.has(videoId) ? Optional.of(json.get(videoId)) : Optional.<JsonNode>empty());

completableFuture.thenAcceptAsync(optional -> optional.ifPresent(jsonNode -> {
ArrayNode nodes = (ArrayNode) jsonNode.get("thumbnails");
for (JsonNode node : nodes) {
if (!node.get("original").booleanValue())
((ObjectNode) node).set("thumbnail", new TextNode(URLUtils.rewriteURL("https://dearrow-thumb.ajay.app/api/v1/getThumbnail?videoID=" + videoId + "&time=" + node.get("timestamp").asText())));
}
}));


completableFuture.whenComplete((optional, throwable) -> {
if (throwable == null)
future.complete(optional);
else {
if (servers.length == 1)
future.completeExceptionally(new Exception("All SponsorBlock servers are down"));
else
fetchDeArrowedCf(future, videoId, hash, Arrays.copyOfRange(servers, 1, servers.length));
}
});
}
}