Skip to content

Commit

Permalink
Retain endpoint concurrency limits on node refresh (#2418)
Browse files Browse the repository at this point in the history
Proof of concept retaining endpoint concurrency limits on refresh
  • Loading branch information
carterkozak authored Nov 18, 2024
1 parent 1e8fb68 commit b8dc39c
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 8 deletions.
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-2418.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: improvement
improvement:
description: Retain endpoint concurrency limits on node refresh
links:
- https://github.com/palantir/dialogue/pull/2418
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,13 @@ final class ConcurrencyLimitedChannel implements LimitedChannel {
static final ChannelState.Key<CautiousIncreaseAggressiveDecreaseConcurrencyLimiter> HOST_SPECIFIC_STATE_KEY =
new ChannelState.Key<>(
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.class,
ConcurrencyLimitedChannel::createHostSpecificState);
() -> new CautiousIncreaseAggressiveDecreaseConcurrencyLimiter(Behavior.HOST_LEVEL));

@VisibleForTesting
static final ChannelState.Key<CautiousIncreaseAggressiveDecreaseConcurrencyLimiter> ENDPOINT_SPECIFIC_STATE_KEY =
new ChannelState.Key<>(
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.class,
() -> new CautiousIncreaseAggressiveDecreaseConcurrencyLimiter(Behavior.ENDPOINT_LEVEL));

private final NeverThrowChannel delegate;
private final CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter;
Expand All @@ -63,10 +69,11 @@ static LimitedChannel createForHost(Config cf, Channel channel, int uriIndex, Ch
* Creates a concurrency limited channel for per-endpoint limiting.
* Metrics are not reported by this component per-endpoint, only by the per-endpoint queue.
*/
static LimitedChannel createForEndpoint(Channel channel, String channelName, int uriIndex, Endpoint endpoint) {
static LimitedChannel createForEndpoint(
Channel channel, String channelName, int uriIndex, Endpoint endpoint, ChannelState endpointChannelState) {
return new ConcurrencyLimitedChannel(
channel,
new CautiousIncreaseAggressiveDecreaseConcurrencyLimiter(Behavior.ENDPOINT_LEVEL),
endpointChannelState.getState(ENDPOINT_SPECIFIC_STATE_KEY),
new EndpointConcurrencyLimitedChannelInstrumentation(channelName, uriIndex, endpoint));
}

Expand All @@ -79,10 +86,6 @@ static LimitedChannel createForEndpoint(Channel channel, String channelName, int
this.channelNameForLogging = instrumentation.channelNameForLogging();
}

static CautiousIncreaseAggressiveDecreaseConcurrencyLimiter createHostSpecificState() {
return new CautiousIncreaseAggressiveDecreaseConcurrencyLimiter(Behavior.HOST_LEVEL);
}

@Override
public Optional<ListenableFuture<Response>> maybeExecute(
Endpoint endpoint, Request request, LimitEnforcement limitEnforcement) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.palantir.dialogue.core;

import com.codahale.metrics.Meter;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.Ticker;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -247,12 +249,20 @@ private static ImmutableList<LimitedChannel> createHostChannels(
LimitedChannel limitedChannel;
if (cf.isConcurrencyLimitingEnabled()) {
Channel unlimited = channel;
EndpointChannelState endpointChannelState = channelState.getState(EndpointChannelState.KEY);
channel = new ChannelToEndpointChannel(endpoint -> {
if (endpoint.tags().contains("dialogue-disable-endpoint-concurrency-limiting")) {
return unlimited;
}
LimitedChannel limited = ConcurrencyLimitedChannel.createForEndpoint(
unlimited, cf.channelName(), uriIndexForInstrumentation, endpoint);
unlimited,
cf.channelName(),
uriIndexForInstrumentation,
endpoint,
endpointChannelState.get(endpoint));
// Note that because the queue is recreated when nodes are refreshed, it's critical that
// the queue can force at least one request through at a time using the behavior introduced
// by https://github.com/palantir/dialogue/pull/2422
return QueuedChannel.create(cf, endpoint, limited);
});
limitedChannel = ConcurrencyLimitedChannel.createForHost(
Expand All @@ -266,6 +276,33 @@ private static ImmutableList<LimitedChannel> createHostChannels(
return perUriChannels.build();
}

/**
* {@link ChannelState} provider for per-endpoint channels like the endpoint concurrency limiter.
* This object is held in the per-host state, and can be used to look up a {@link ChannelState}
* scoped to an individual {@link Endpoint}.
* {@link Endpoint} state is held in a weak-keyed cache, equivalent to the one used in
* {@link ChannelToEndpointChannel}.
* {@link Endpoint} objects are usually enums, which will never be garbage collected, however it's possible
* that callers may build an endpoint instances on a per-call basis, so the weak-keyed map is defensive
* against short-lived endpoints.
* We don't use the same map because the {@link ChannelToEndpointChannel} retains full channel state which
* may or may not be designed to be reused across reloads, and we aim to be more precise with state that is
* kept across uri changes.
*/
private record EndpointChannelState(LoadingCache<Endpoint, ChannelState> cache) {
private static final ChannelState.Key<EndpointChannelState> KEY =
new ChannelState.Key<>(EndpointChannelState.class, EndpointChannelState::create);

ChannelState get(Endpoint endpoint) {
return cache.get(endpoint);
}

private static EndpointChannelState create() {
return new EndpointChannelState(
Caffeine.newBuilder().weakKeys().maximumSize(10_000).build(_key -> new ChannelState()));
}
}

private static EndpointChannelFactory createEndpointChannelFactory(Channel multiHostQueuedChannel, Config cf) {
Channel queuedChannel = new QueueOverrideChannel(multiHostQueuedChannel);
return endpoint -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,30 @@ public void testReuseCachedLimiterState_host() {
assertThat(limiter.getInflight()).isEqualTo(2);
}

@Test
public void testReuseCachedLimiterState_endpoint() {
String channelName = "channel";
ChannelState state = new ChannelState();

// create two channels for the same endpoint, which should re-use the same AIMD state
LimitedChannel forEndpoint =
ConcurrencyLimitedChannel.createForEndpoint(delegate, channelName, 0, endpoint, state);
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter =
state.getState(ConcurrencyLimitedChannel.ENDPOINT_SPECIFIC_STATE_KEY);

assertThat(limiter.getInflight()).isEqualTo(0);

forEndpoint.maybeExecute(endpoint, request, LimitEnforcement.DEFAULT_ENABLED);
assertThat(limiter.getInflight()).isEqualTo(1);

// different uriIndex has no impact on whether state is shared, as indexes will shuffle when nodes go down
LimitedChannel forEndpoint2 =
ConcurrencyLimitedChannel.createForEndpoint(delegate, channelName, 1, endpoint, state);
forEndpoint2.maybeExecute(endpoint, request, LimitEnforcement.DEFAULT_ENABLED);

assertThat(limiter.getInflight()).isEqualTo(2);
}

@Test
public void testLimiterAvailable_successfulRequest_host() {
mockHostLimitAvailable();
Expand Down

0 comments on commit b8dc39c

Please sign in to comment.