Skip to content

Commit

Permalink
wip hack
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Laub committed Nov 15, 2024
1 parent 5f3b905 commit 7e88a7c
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ final class CautiousIncreaseAggressiveDecreaseConcurrencyLimiter {

private static final SafeLogger log =
SafeLoggerFactory.get(CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.class);
private static final double INITIAL_LIMIT = 20;
private static final double INITIAL_LIMIT = 1;
private static final double BACKOFF_RATIO = .9D;
private static final double MIN_LIMIT = 1;
private static final double MAX_LIMIT = 1_000_000D;
Expand Down Expand Up @@ -84,8 +84,15 @@ Optional<Permit> acquire(LimitEnforcement limitEnforcement) {
int currentLimit = (int) getLimit();
while (true) {
int currentInFlight = localInFlight.get();
if (limitEnforcement.enforceLimits() && currentInFlight >= currentLimit) {
return Optional.empty();
// if (limitEnforcement.enforceLimits() && currentInFlight >= currentLimit) {
// return Optional.empty();
// }
if (currentInFlight >= currentLimit) {
if (limitEnforcement.enforceLimits()) {
return Optional.empty();
} else {
log.warn("Bypassing concurrency limits that would have otherwise rejected this request");
}
}

int newInFlight = currentInFlight + 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,9 @@ Optional<ListenableFuture<Response>> maybeExecute(Endpoint endpoint, Request req
// Optimistically avoid the queue in the fast path.
// Queuing adds contention between threads and should be avoided unless we need to shed load.
if (queueSizeEstimate.get() <= 0) {
Optional<ListenableFuture<Response>> maybeResult =
delegate.maybeExecute(endpoint, request, limitEnforcement());
LimitEnforcement enforcement = limitEnforcement();
log.warn("limit enforcement set to {}", SafeArg.of("enforcement", enforcement));
Optional<ListenableFuture<Response>> maybeResult = delegate.maybeExecute(endpoint, request, enforcement);
if (maybeResult.isPresent()) {
inFlight.incrementAndGet();
ListenableFuture<Response> result = maybeResult.get();
Expand Down Expand Up @@ -261,8 +262,10 @@ private boolean scheduleNextTask() {
}
try (CloseableSpan ignored = queueHead.span().attach()) {
Endpoint endpoint = queueHead.endpoint();
LimitEnforcement enforcement = limitEnforcement();
log.warn("limit enforcement set to {}", SafeArg.of("enforcement", enforcement));
Optional<ListenableFuture<Response>> maybeResponse =
delegate.maybeExecute(endpoint, queueHead.request(), limitEnforcement());
delegate.maybeExecute(endpoint, queueHead.request(), enforcement);

if (maybeResponse.isPresent()) {
inFlight.incrementAndGet();
Expand Down
18 changes: 18 additions & 0 deletions simulation/src/main/java/com/palantir/dialogue/core/Benchmark.java
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,24 @@ public void onFailure(Throwable throwable) {

Futures.addCallback(
future, accumulateStatusCodes, DialogueFutures.safeDirectExecutor());
Futures.addCallback(
future,
new FutureCallback<Response>() {
@Override
public void onSuccess(Response result) {
log.warn(
"req #{} finished with status {}",
req.number(),
result.code());
}

@Override
public void onFailure(Throwable t) {
log.warn("req #{} finished with status ERROR", req.number(), t);
}
},
DialogueFutures.safeDirectExecutor());

future.addListener(
() -> {
responsesReceived[0] += 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.codahale.metrics.Meter;
import com.google.common.base.Stopwatch;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.palantir.dialogue.Channel;
import com.palantir.dialogue.Endpoint;
import com.palantir.dialogue.EndpointChannel;
Expand All @@ -46,8 +47,10 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -348,10 +351,94 @@ public void all_nodes_500(Strategy strategy) {
.requestsPerSecond(100)
.sendUntil(Duration.ofSeconds(20))
.clients(10, _i -> strategy.getChannel(simulation, servers))
.abortAfter(Duration.ofMinutes(10))
.abortAfter(Duration.ofMinutes(5))
.run();
}

private static final class WtfCase {
int servers;
int clients;
int rps;
int duration;

WtfCase(int servers, int clients, int rps, int duration) {
this.servers = servers;
this.clients = clients;
this.rps = rps;
this.duration = duration;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
WtfCase wtfCase = (WtfCase) o;
return servers == wtfCase.servers
&& clients == wtfCase.clients
&& rps == wtfCase.rps
&& duration == wtfCase.duration;
}

@Override
public int hashCode() {
return Objects.hash(servers, clients, rps, duration);
}
}

@SimulationCase
public void wtf(Strategy strategy) {
List<WtfCase> tests = ImmutableList.of(new WtfCase(1, 2, 50, 40));
// new WtfCase(1, 2, 50, 40));
// new WtfCase(1, 10, 50, 40),
// new WtfCase(2, 1, 50, 40),
// new WtfCase(2, 2, 50, 40),
// new WtfCase(2, 10, 50, 40),
// new WtfCase(1, 1, 100, 20),
// new WtfCase(1, 2, 100, 20),
// new WtfCase(1, 10, 100, 20),
// new WtfCase(2, 1, 100, 20),
// new WtfCase(2, 2, 100, 20),
// new WtfCase(2, 10, 100, 20));

Map<WtfCase, Map<String, Integer>> results = new HashMap<>();

for (WtfCase wtf : tests) {
Simulation sim = new Simulation();
sim.metricsReporter().onlyRecordMetricsFor(MetricNames::reportedMetricsPredicate);

List<SimulationServer> allServers = new ArrayList<>();
for (int i = 1; i <= wtf.servers; ++i) {
allServers.add(SimulationServer.builder()
.serverName(String.format("node%d", i))
.simulation(sim)
.handler(h -> h.response(500).responseTime(Duration.ofMillis(600)))
.until(Duration.ofSeconds(10), "revert badness")
.handler(h -> h.response(200).responseTime(Duration.ofMillis(600)))
.build());
}
Supplier<Map<String, SimulationServer>> serv = servers(allServers.toArray(new SimulationServer[0]));
Strategy strat = strategy;
result = Benchmark.builder()
.simulation(sim)
.requestsPerSecond(wtf.rps)
.sendUntil(Duration.ofSeconds(wtf.duration))
.clients(wtf.clients, _i -> strat.getChannel(sim, serv))
.abortAfter(Duration.ofMinutes(5))
.run();

results.put(wtf, result.statusCodes());
}

System.out.println("============================");
for (Map.Entry<WtfCase, Map<String, Integer>> e : results.entrySet()) {
WtfCase wtf = e.getKey();
System.out.printf(
"servers=%d, clients=%d, rps=%d, duration=%d: %s\n",
wtf.servers, wtf.clients, wtf.rps, wtf.duration, e.getValue());
}
System.out.println("============================");
}

@SimulationCase
public void black_hole(Strategy strategy) {
servers = servers(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@
import java.util.function.Consumer;
import java.util.function.Supplier;

@SuppressWarnings("ImmutableEnumChecker")
@SuppressWarnings({"ImmutableEnumChecker", "unused"})
public enum Strategy {
CONCURRENCY_LIMITER_ROUND_ROBIN(Strategy::concurrencyLimiter),
CONCURRENCY_LIMITER_PIN_UNTIL_ERROR(Strategy::pinUntilError),
UNLIMITED_ROUND_ROBIN(Strategy::unlimitedRoundRobin);
CONCURRENCY_LIMITER_ROUND_ROBIN(Strategy::concurrencyLimiter);
// CONCURRENCY_LIMITER_PIN_UNTIL_ERROR(Strategy::pinUntilError),
// UNLIMITED_ROUND_ROBIN(Strategy::unlimitedRoundRobin);

private static final ClientConfiguration STUB_CONFIG = stubConfig();
private final Consumer<ClientConfiguration.Builder> applyConfig;
Expand Down

0 comments on commit 7e88a7c

Please sign in to comment.