Skip to content

Commit

Permalink
Memoize some channel metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
schlosna committed Oct 3, 2023
1 parent a853d3f commit da25cf6
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,30 @@ public DialogueChannel build() {
new StickyValidationChannel(NodeSelectionStrategyChannel.create(cf, channels));

Channel multiHostQueuedChannel = QueuedChannel.create(cf, nodeSelectionChannel);
Channel queuedChannel = new QueueOverrideChannel(multiHostQueuedChannel);
EndpointChannelFactory channelFactory = createEndpointChannelFactory(multiHostQueuedChannel, cf);

Supplier<Channel> stickyChannelSupplier =
StickyEndpointChannels2.create(cf, nodeSelectionChannel, channelFactory);

Meter createMeter = DialogueClientMetrics.of(cf.clientConf().taggedMetricRegistry())
.create()
.clientName(cf.channelName())
.clientType("dialogue-channel-non-reloading")
.build();
createMeter.mark();

return new DialogueChannel(cf, channelFactory, stickyChannelSupplier);
}

EndpointChannelFactory channelFactory = endpoint -> {
EndpointChannel channel = new EndpointChannelAdapter(endpoint, queuedChannel);
private static EndpointChannelFactory createEndpointChannelFactory(Channel multiHostQueuedChannel, Config cf) {
Channel queuedChannel = new QueueOverrideChannel(multiHostQueuedChannel);
return endpoint -> {
EndpointChannel endpointChannel = new EndpointChannelAdapter(endpoint, queuedChannel);
EndpointChannel channel = cf.clientConf()
.userAgent()
.map(userAgent -> UserAgentEndpointChannel.create(endpointChannel, endpoint, userAgent))
.orElse(endpointChannel);
channel = RetryingChannel.create(cf, channel, endpoint);
channel = UserAgentEndpointChannel.create(
channel, endpoint, cf.clientConf().userAgent().get());
channel = DeprecationWarningChannel.create(cf, channel, endpoint);
channel = ContentDecodingChannel.create(cf, channel, endpoint);
channel = new RangeAcceptsIdentityEncodingChannel(channel);
Expand All @@ -195,18 +212,6 @@ public DialogueChannel build() {
channel = new InterruptionChannel(channel);
return new NeverThrowEndpointChannel(channel); // this must come last as a defensive backstop
};

Supplier<Channel> stickyChannelSupplier =
StickyEndpointChannels2.create(cf, nodeSelectionChannel, channelFactory);

Meter createMeter = DialogueClientMetrics.of(cf.clientConf().taggedMetricRegistry())
.create()
.clientName(cf.channelName())
.clientType("dialogue-channel-non-reloading")
.build();
createMeter.mark();

return new DialogueChannel(cf, channelFactory, stickyChannelSupplier);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ final class RetryingChannel implements EndpointChannel {
private final ClientConfiguration.RetryOnTimeout retryOnTimeout;
private final Duration backoffSlotSize;
private final DoubleSupplier jitter;
private final Meter retryDueToServerError;
private final Meter retryDueToQosResponse;
private final Supplier<Meter> retryDueToServerError;
private final Supplier<Meter> retryDueToQosResponse;
private final Function<Throwable, Meter> retryDueToThrowable;

static EndpointChannel create(Config cf, EndpointChannel channel, Endpoint endpoint) {
Expand Down Expand Up @@ -180,16 +180,16 @@ private RetryingChannel(
this.scheduler = instrument(scheduler, metrics);
this.jitter = jitter;
DialogueClientMetrics dialogueClientMetrics = DialogueClientMetrics.of(metrics);
this.retryDueToServerError = dialogueClientMetrics
this.retryDueToServerError = Suppliers.memoize(() -> dialogueClientMetrics
.requestRetry()
.channelName(channelName)
.reason("serverError")
.build();
this.retryDueToQosResponse = dialogueClientMetrics
.build());
this.retryDueToQosResponse = Suppliers.memoize(() -> dialogueClientMetrics
.requestRetry()
.channelName(channelName)
.reason("qosResponse")
.build();
.build());
this.retryDueToThrowable = throwable -> dialogueClientMetrics
.requestRetry()
.channelName(channelName)
Expand Down Expand Up @@ -271,11 +271,11 @@ private ListenableFuture<Response> wrap(ListenableFuture<Response> input) {
private ListenableFuture<Response> handleHttpResponse(Response response) {
boolean canRetryRequest = requestCanBeRetried();
if (canRetryRequest && isRetryableQosStatus(response)) {
return incrementFailuresAndMaybeRetry(response, qosThrowable, retryDueToQosResponse);
return incrementFailuresAndMaybeRetry(response, qosThrowable, retryDueToQosResponse.get());
}

if (canRetryRequest && Responses.isInternalServerError(response) && safeToRetry(endpoint.httpMethod())) {
return incrementFailuresAndMaybeRetry(response, serverErrorThrowable, retryDueToServerError);
return incrementFailuresAndMaybeRetry(response, serverErrorThrowable, retryDueToServerError.get());
}

return Futures.immediateFuture(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.codahale.metrics.Timer;
import com.github.benmanes.caffeine.cache.Ticker;
import com.google.common.base.Suppliers;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.RateLimiter;
Expand All @@ -32,15 +33,16 @@
import com.palantir.tritium.metrics.registry.TaggedMetricRegistry;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class TimingEndpointChannel implements EndpointChannel {

private static final SafeLogger log = SafeLoggerFactory.get(TimingEndpointChannel.class);
private static final RateLimiter unknownThrowableLoggingRateLimiter = RateLimiter.create(1);

private final EndpointChannel delegate;
private final Timer successTimer;
private final Timer failureTimer;
private final Supplier<Timer> successTimer;
private final Supplier<Timer> failureTimer;
private final Ticker ticker;

TimingEndpointChannel(
Expand All @@ -52,18 +54,18 @@ final class TimingEndpointChannel implements EndpointChannel {
this.delegate = delegate;
this.ticker = ticker;
ClientMetrics metrics = ClientMetrics.of(taggedMetrics);
this.successTimer = metrics.response()
this.successTimer = Suppliers.memoize(() -> metrics.response()
.channelName(channelName)
.serviceName(endpoint.serviceName())
.endpoint(endpoint.endpointName())
.status("success")
.build();
this.failureTimer = metrics.response()
.build());
this.failureTimer = Suppliers.memoize(() -> metrics.response()
.channelName(channelName)
.serviceName(endpoint.serviceName())
.endpoint(endpoint.endpointName())
.status("failure")
.build();
.build());
}

static EndpointChannel create(Config cf, EndpointChannel delegate, Endpoint endpoint) {
Expand All @@ -76,7 +78,7 @@ public ListenableFuture<Response> execute(Request request) {
long beforeNanos = ticker.read();
ListenableFuture<Response> response = delegate.execute(request);

return DialogueFutures.addDirectCallback(response, new FutureCallback<Response>() {
return DialogueFutures.addDirectCallback(response, new FutureCallback<>() {
@Override
@SuppressWarnings("PreferJavaTimeOverload")
public void onSuccess(Response response) {
Expand All @@ -102,8 +104,8 @@ public void onFailure(Throwable throwable) {
}

@SuppressWarnings("PreferJavaTimeOverload") // performance sensitive
private void updateTimer(Timer timer) {
timer.update(ticker.read() - beforeNanos, TimeUnit.NANOSECONDS);
private void updateTimer(Supplier<Timer> timer) {
timer.get().update(ticker.read() - beforeNanos, TimeUnit.NANOSECONDS);
}
});
}
Expand Down

0 comments on commit da25cf6

Please sign in to comment.