Skip to content

Commit

Permalink
Solver Remote Refactoring (#589)
Browse files Browse the repository at this point in the history
* refactoring solver remote class

* fixing compiling errors

* fixing compiling errors

* reverting

* fixing compiling errors

* fixing compiling errors

* release notes

* [Gradle Release Plugin] - new version commit:  '3.30.3-snapshot'.

* refactoring
  • Loading branch information
mageddo authored Oct 21, 2024
1 parent 5a15362 commit 383bc46
Show file tree
Hide file tree
Showing 21 changed files with 387 additions and 309 deletions.
3 changes: 3 additions & 0 deletions RELEASE-NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 3.30.3
* Solver Remote Refactoring. #589

## 3.30.2
* Detect which condition didn't match and log the cause when configuring DPS as default DNS #580
* Fixed backend develop docker-compose.yml
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=3.30.2-snapshot
version=3.30.3-snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,8 @@ public CircuitCheckException(String message) {
public CircuitCheckException(Throwable e) {
super(e.getMessage(), e);
}

public CircuitCheckException(String message, Throwable e) {
super(message, e);
}
}
93 changes: 5 additions & 88 deletions src/main/java/com/mageddo/dnsproxyserver/solver/SolverRemote.java
Original file line number Diff line number Diff line change
@@ -1,42 +1,31 @@
package com.mageddo.dnsproxyserver.solver;

import com.mageddo.commons.circuitbreaker.CircuitCheckException;
import com.mageddo.dns.utils.Messages;
import com.mageddo.dnsproxyserver.solver.remote.Request;
import com.mageddo.dnsproxyserver.solver.remote.Result;
import com.mageddo.dnsproxyserver.solver.remote.application.CircuitBreakerService;
import com.mageddo.dnsproxyserver.solver.remote.application.RemoteResultSupplier;
import com.mageddo.dnsproxyserver.solver.remote.application.ResolverStatsFactory;
import com.mageddo.dnsproxyserver.solver.remote.application.ResultSupplier;
import com.mageddo.net.NetExecutorWatchdog;
import com.mageddo.utils.Executors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.xbill.DNS.Flags;
import org.xbill.DNS.Message;

import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Stream;

import static com.mageddo.dns.utils.Messages.simplePrint;

@Slf4j
@Singleton
@RequiredArgsConstructor(onConstructor = @__({@Inject}))
public class SolverRemote implements Solver, AutoCloseable {

static final String QUERY_TIMED_OUT_MSG = "Query timed out";
public static final int PING_TIMEOUT_IN_MS = 1_500;

private final CircuitBreakerService circuitBreakerService;
private final ResolverStatsFactory resolverStatsFactory;
private final NetExecutorWatchdog netWatchdog = new NetExecutorWatchdog();
Expand Down Expand Up @@ -93,83 +82,11 @@ Request buildRequest(Message query, int resolverIndex, StopWatch stopWatch, Reso

Result safeQueryResult(Request req) {
req.splitStopWatch();
return this.queryUsingCircuitBreaker(req, () -> this.queryResult(req));
}

Result queryUsingCircuitBreaker(Request req, Supplier<Result> sup) {
return this.circuitBreakerService.safeHandle(req.getResolverAddress(), sup);
}

Result queryResult(Request req) {
final var resFuture = this.sendQueryAsyncToResolver(req);
if (this.isPingWhileGettingQueryResponseActive()) {
this.pingWhileGettingQueryResponse(req, resFuture);
}
return this.transformToResult(resFuture, req);
}

CompletableFuture<Message> sendQueryAsyncToResolver(Request req) {
return req.sendQueryAsyncToResolver(this.executor);
return this.queryUsingCircuitBreaker(new RemoteResultSupplier(req, this.executor, this.netWatchdog));
}

void pingWhileGettingQueryResponse(Request req, CompletableFuture<Message> resFuture) {
this.netWatchdog.watch(req.getResolverAddr(), resFuture, PING_TIMEOUT_IN_MS);
}

boolean isPingWhileGettingQueryResponseActive() {
return Boolean.getBoolean("mg.solverRemote.pingWhileGettingQueryResponse");
}

Result transformToResult(CompletableFuture<Message> resFuture, Request request) {
final var res = this.findFutureRes(resFuture, request);
if (res == null) {
return Result.empty();
}

if (Messages.isSuccess(res)) {
log.trace(
"status=found, i={}, time={}, req={}, res={}, server={}",
request.getResolverIndex(), request.getTime(), simplePrint(request.getQuery()),
simplePrint(res), request.getResolverAddress()
);
return Result.fromSuccessResponse(Response.success(res));
} else {
log.trace(
"status=notFound, i={}, time={}, req={}, res={}, server={}",
request.getResolverIndex(), request.getTime(), simplePrint(request.getQuery()),
simplePrint(res), request.getResolverAddress()
);
return Result.fromErrorMessage(res);
}
}

Message findFutureRes(CompletableFuture<Message> resFuture, Request request) {
try {
return Messages.setFlag(resFuture.get(), Flags.RA);
} catch (InterruptedException | ExecutionException e) {
this.checkCircuitError(e, request);
return null;
}
}

void checkCircuitError(Exception e, Request request) {
if (e.getCause() instanceof IOException) {
final var time = request.getElapsedTimeInMs();
if (e.getMessage().contains(QUERY_TIMED_OUT_MSG)) {
log.info(
"status=timedOut, i={}, time={}, req={}, msg={} class={}",
request.getResolverIndex(), time, simplePrint(request.getQuery()), e.getMessage(), ClassUtils.getSimpleName(e)
);
throw new CircuitCheckException(e);
}
log.warn(
"status=failed, i={}, time={}, req={}, server={}, errClass={}, msg={}",
request.getResolverIndex(), time, simplePrint(request.getQuery()), request.getResolverAddress(),
ClassUtils.getSimpleName(e), e.getMessage(), e
);
} else {
throw new RuntimeException(e.getMessage(), e);
}
Result queryUsingCircuitBreaker(ResultSupplier sup) {
return this.circuitBreakerService.safeHandle(sup);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
package com.mageddo.dnsproxyserver.solver.remote;

import com.mageddo.dns.utils.Messages;
import com.mageddo.dnsproxyserver.solver.Resolver;
import com.mageddo.net.IpAddr;
import com.mageddo.net.IpAddrs;
import lombok.Builder;
import lombok.NonNull;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.StopWatch;
import org.xbill.DNS.Message;

import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

@Slf4j
@Value
@Builder
public class Request {
Expand Down Expand Up @@ -42,6 +45,7 @@ public void splitStopWatch() {
}

public CompletableFuture<Message> sendQueryAsyncToResolver(Executor executor) {
log.trace("status=querying, server={}, req={}", this.resolver.getAddress(), Messages.simplePrint(this.query));
return this.resolver.sendAsync(this.query, executor).toCompletableFuture();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@ public class CircuitBreakerService {

private String status;

public Result safeHandle(InetSocketAddress resolverAddress, Supplier<Result> sup) {
public Result safeHandle(ResultSupplier sup) {
try {
return this.handle(resolverAddress, sup);
return this.handle(sup);
} catch (CircuitCheckException | CircuitIsOpenException e) {
final var clazz = ClassUtils.getSimpleName(e);
log.debug("status=circuitEvent, server={}, type={}", resolverAddress, clazz);
this.status = String.format("%s for %s", clazz, resolverAddress);
log.debug("status=circuitEvent, server={}, type={}", sup.getRemoteAddress(), clazz);
this.status = String.format("%s for %s", clazz, sup.getRemoteAddress());
return Result.empty();
}
}

private Result handle(InetSocketAddress resolverAddress, Supplier<Result> sup) {
return this.circuitBreakerFactory.check(resolverAddress, sup);
private Result handle(ResultSupplier sup) {
return this.circuitBreakerFactory.check(sup);
}

public String getStatus() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.mageddo.dnsproxyserver.solver.remote.application;

import com.mageddo.dnsproxyserver.solver.remote.Request;
import com.mageddo.dnsproxyserver.solver.remote.Result;
import com.mageddo.dnsproxyserver.solver.remote.application.mapper.ResultMapper;
import com.mageddo.net.IpAddr;
import com.mageddo.net.NetExecutorWatchdog;
import lombok.extern.slf4j.Slf4j;
import org.xbill.DNS.Message;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

@Slf4j
public class RemoteResultSupplier implements ResultSupplier {

public static final int PING_TIMEOUT_IN_MS = 1_500;

private final Request req;
private final Executor executor;
private final NetExecutorWatchdog netWatchdog;

public RemoteResultSupplier(Request req, Executor executor, NetExecutorWatchdog netWatchdog) {
this.req = req;
this.executor = executor;
this.netWatchdog = netWatchdog;
}

@Override
public Result get() {
return this.queryResult(this.req);
}

Result queryResult(Request req) {
final var resFuture = this.sendQueryAsyncToResolver(req);
if (this.isPingWhileGettingQueryResponseActive()) {
this.pingWhileGettingQueryResponse(req, resFuture);
}
return ResultMapper.from(resFuture, req);
}

CompletableFuture<Message> sendQueryAsyncToResolver(Request req) {
return req.sendQueryAsyncToResolver(this.executor);
}

boolean isPingWhileGettingQueryResponseActive() {
return Boolean.getBoolean("mg.solverRemote.pingWhileGettingQueryResponse");
}

void pingWhileGettingQueryResponse(Request req, CompletableFuture<Message> resFuture) {
this.netWatchdog.watch(req.getResolverAddr(), resFuture, PING_TIMEOUT_IN_MS);
}

@Override
public String toString() {
return String.format("server=%s", this.req.getResolverAddr());
}

@Override
public IpAddr getRemoteAddress() {
return this.req.getResolverAddr();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.mageddo.dnsproxyserver.solver.remote.application;

import com.mageddo.dnsproxyserver.solver.remote.Result;
import com.mageddo.net.IpAddr;

import java.util.function.Supplier;

public interface ResultSupplier extends Supplier<Result> {
IpAddr getRemoteAddress();
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
import com.mageddo.dnsproxyserver.solver.remote.CircuitStatus;
import com.mageddo.dnsproxyserver.solver.remote.Result;
import com.mageddo.dnsproxyserver.solver.remote.application.FailsafeCircuitBreakerFactory;
import com.mageddo.dnsproxyserver.solver.remote.application.ResultSupplier;
import com.mageddo.dnsproxyserver.solver.remote.circuitbreaker.application.CircuitBreakerDelegate;
import com.mageddo.dnsproxyserver.solver.remote.circuitbreaker.application.CircuitBreakerDelegateNonResilient;
import com.mageddo.dnsproxyserver.solver.remote.circuitbreaker.application.CircuitBreakerDelegateStaticThresholdFailsafe;
import com.mageddo.dnsproxyserver.solver.remote.mapper.ResolverMapper;
import com.mageddo.net.IpAddr;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -21,7 +24,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

@Slf4j
@Singleton
Expand All @@ -34,17 +36,20 @@ public class CircuitBreakerFactory {
private final FailsafeCircuitBreakerFactory failsafeCircuitBreakerFactory;
private final com.mageddo.dnsproxyserver.solver.remote.circuitbreaker.canaryratethreshold.CircuitBreakerFactory canaryThresholdFactory;

public Result check(InetSocketAddress remoteAddress, Supplier<Result> sup) {
final var circuitBreaker = this.findCircuitBreaker(remoteAddress);
public Result check(ResultSupplier sup) {
final var circuitBreaker = this.findCircuitBreaker(sup.getRemoteAddress());
return circuitBreaker.execute(sup);
}

public CircuitBreakerDelegate findCircuitBreaker(InetSocketAddress address) {
final var strategy = this.findCircuitBreakerHotLoad(address);
return this.circuitBreakerMap.computeIfAbsent(address, addr -> strategy);
public CircuitBreakerDelegate findCircuitBreaker(IpAddr serverAddress) {
final var strategy = this.findCircuitBreakerHotLoad(serverAddress);
return this.circuitBreakerMap.computeIfAbsent(
ResolverMapper.toInetSocketAddress(serverAddress),
addr -> strategy
);
}

CircuitBreakerDelegate findCircuitBreakerHotLoad(InetSocketAddress address) {
CircuitBreakerDelegate findCircuitBreakerHotLoad(IpAddr address) {
final var config = this.findCircuitBreakerConfig();
return switch (config.name()) {
case STATIC_THRESHOLD -> this.buildStaticThresholdFailSafeCircuitBreaker(address, config);
Expand All @@ -54,10 +59,10 @@ CircuitBreakerDelegate findCircuitBreakerHotLoad(InetSocketAddress address) {
}

private CircuitBreakerDelegateStaticThresholdFailsafe buildStaticThresholdFailSafeCircuitBreaker(
InetSocketAddress address, CircuitBreakerStrategyConfig config
IpAddr address, CircuitBreakerStrategyConfig config
) {
return new CircuitBreakerDelegateStaticThresholdFailsafe(this.failsafeCircuitBreakerFactory.build(
address,
ResolverMapper.toInetSocketAddress(address),
(StaticThresholdCircuitBreakerStrategyConfig) config
));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.mageddo.dnsproxyserver.solver.remote.application.failsafe;

import com.mageddo.commons.circuitbreaker.CircuitCheckException;
import com.mageddo.dnsproxyserver.solver.SolverRemote;
import com.mageddo.dnsproxyserver.solver.remote.application.RemoteResultSupplier;
import com.mageddo.dnsproxyserver.solver.remote.circuitbreaker.application.CircuitBreakerDelegate;
import com.mageddo.net.Networks;
import dev.failsafe.CircuitBreakerOpenException;
Expand Down Expand Up @@ -46,6 +46,6 @@ void check(InetSocketAddress server, CircuitBreakerDelegate circuitBreaker) {
* @see https://github.com/mageddo/dns-proxy-server/issues/526#issuecomment-2261421618
*/
boolean ping(InetSocketAddress server) {
return Networks.ping(server, SolverRemote.PING_TIMEOUT_IN_MS);
return Networks.ping(server, RemoteResultSupplier.PING_TIMEOUT_IN_MS);
}
}
Loading

0 comments on commit 383bc46

Please sign in to comment.