diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java index e39d2a8dd..97c080ce8 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java @@ -174,13 +174,30 @@ public DialogueChannel build() { new StickyValidationChannel(NodeSelectionStrategyChannel.create(cf, channels)); Channel multiHostQueuedChannel = QueuedChannel.create(cf, nodeSelectionChannel); - Channel queuedChannel = new QueueOverrideChannel(multiHostQueuedChannel); + EndpointChannelFactory channelFactory = createEndpointChannelFactory(multiHostQueuedChannel, cf); + + Supplier stickyChannelSupplier = + StickyEndpointChannels2.create(cf, nodeSelectionChannel, channelFactory); + + Meter createMeter = DialogueClientMetrics.of(cf.clientConf().taggedMetricRegistry()) + .create() + .clientName(cf.channelName()) + .clientType("dialogue-channel-non-reloading") + .build(); + createMeter.mark(); + + return new DialogueChannel(cf, channelFactory, stickyChannelSupplier); + } - EndpointChannelFactory channelFactory = endpoint -> { - EndpointChannel channel = new EndpointChannelAdapter(endpoint, queuedChannel); + private static EndpointChannelFactory createEndpointChannelFactory(Channel multiHostQueuedChannel, Config cf) { + Channel queuedChannel = new QueueOverrideChannel(multiHostQueuedChannel); + return endpoint -> { + EndpointChannel endpointChannel = new EndpointChannelAdapter(endpoint, queuedChannel); + EndpointChannel channel = cf.clientConf() + .userAgent() + .map(userAgent -> UserAgentEndpointChannel.create(endpointChannel, endpoint, userAgent)) + .orElse(endpointChannel); channel = RetryingChannel.create(cf, channel, endpoint); - channel = UserAgentEndpointChannel.create( - channel, endpoint, cf.clientConf().userAgent().get()); channel = DeprecationWarningChannel.create(cf, channel, endpoint); channel = ContentDecodingChannel.create(cf, channel, endpoint); channel = new RangeAcceptsIdentityEncodingChannel(channel); @@ -195,18 +212,6 @@ public DialogueChannel build() { channel = new InterruptionChannel(channel); return new NeverThrowEndpointChannel(channel); // this must come last as a defensive backstop }; - - Supplier stickyChannelSupplier = - StickyEndpointChannels2.create(cf, nodeSelectionChannel, channelFactory); - - Meter createMeter = DialogueClientMetrics.of(cf.clientConf().taggedMetricRegistry()) - .create() - .clientName(cf.channelName()) - .clientType("dialogue-channel-non-reloading") - .build(); - createMeter.mark(); - - return new DialogueChannel(cf, channelFactory, stickyChannelSupplier); } /** diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/RetryingChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/RetryingChannel.java index 04de3f3d3..245ad6352 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/RetryingChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/RetryingChannel.java @@ -104,8 +104,8 @@ final class RetryingChannel implements EndpointChannel { private final ClientConfiguration.RetryOnTimeout retryOnTimeout; private final Duration backoffSlotSize; private final DoubleSupplier jitter; - private final Meter retryDueToServerError; - private final Meter retryDueToQosResponse; + private final Supplier retryDueToServerError; + private final Supplier retryDueToQosResponse; private final Function retryDueToThrowable; static EndpointChannel create(Config cf, EndpointChannel channel, Endpoint endpoint) { @@ -180,16 +180,16 @@ private RetryingChannel( this.scheduler = instrument(scheduler, metrics); this.jitter = jitter; DialogueClientMetrics dialogueClientMetrics = DialogueClientMetrics.of(metrics); - this.retryDueToServerError = dialogueClientMetrics + this.retryDueToServerError = Suppliers.memoize(() -> dialogueClientMetrics .requestRetry() .channelName(channelName) .reason("serverError") - .build(); - this.retryDueToQosResponse = dialogueClientMetrics + .build()); + this.retryDueToQosResponse = Suppliers.memoize(() -> dialogueClientMetrics .requestRetry() .channelName(channelName) .reason("qosResponse") - .build(); + .build()); this.retryDueToThrowable = throwable -> dialogueClientMetrics .requestRetry() .channelName(channelName) @@ -271,11 +271,11 @@ private ListenableFuture wrap(ListenableFuture input) { private ListenableFuture handleHttpResponse(Response response) { boolean canRetryRequest = requestCanBeRetried(); if (canRetryRequest && isRetryableQosStatus(response)) { - return incrementFailuresAndMaybeRetry(response, qosThrowable, retryDueToQosResponse); + return incrementFailuresAndMaybeRetry(response, qosThrowable, retryDueToQosResponse.get()); } if (canRetryRequest && Responses.isInternalServerError(response) && safeToRetry(endpoint.httpMethod())) { - return incrementFailuresAndMaybeRetry(response, serverErrorThrowable, retryDueToServerError); + return incrementFailuresAndMaybeRetry(response, serverErrorThrowable, retryDueToServerError.get()); } return Futures.immediateFuture(response); diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/TimingEndpointChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/TimingEndpointChannel.java index 668b98588..9384d76c7 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/TimingEndpointChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/TimingEndpointChannel.java @@ -18,6 +18,7 @@ import com.codahale.metrics.Timer; import com.github.benmanes.caffeine.cache.Ticker; +import com.google.common.base.Suppliers; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.RateLimiter; @@ -32,6 +33,7 @@ import com.palantir.tritium.metrics.registry.TaggedMetricRegistry; import java.io.IOException; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; final class TimingEndpointChannel implements EndpointChannel { @@ -39,8 +41,8 @@ final class TimingEndpointChannel implements EndpointChannel { private static final RateLimiter unknownThrowableLoggingRateLimiter = RateLimiter.create(1); private final EndpointChannel delegate; - private final Timer successTimer; - private final Timer failureTimer; + private final Supplier successTimer; + private final Supplier failureTimer; private final Ticker ticker; TimingEndpointChannel( @@ -52,18 +54,18 @@ final class TimingEndpointChannel implements EndpointChannel { this.delegate = delegate; this.ticker = ticker; ClientMetrics metrics = ClientMetrics.of(taggedMetrics); - this.successTimer = metrics.response() + this.successTimer = Suppliers.memoize(() -> metrics.response() .channelName(channelName) .serviceName(endpoint.serviceName()) .endpoint(endpoint.endpointName()) .status("success") - .build(); - this.failureTimer = metrics.response() + .build()); + this.failureTimer = Suppliers.memoize(() -> metrics.response() .channelName(channelName) .serviceName(endpoint.serviceName()) .endpoint(endpoint.endpointName()) .status("failure") - .build(); + .build()); } static EndpointChannel create(Config cf, EndpointChannel delegate, Endpoint endpoint) { @@ -76,7 +78,7 @@ public ListenableFuture execute(Request request) { long beforeNanos = ticker.read(); ListenableFuture response = delegate.execute(request); - return DialogueFutures.addDirectCallback(response, new FutureCallback() { + return DialogueFutures.addDirectCallback(response, new FutureCallback<>() { @Override @SuppressWarnings("PreferJavaTimeOverload") public void onSuccess(Response response) { @@ -102,8 +104,8 @@ public void onFailure(Throwable throwable) { } @SuppressWarnings("PreferJavaTimeOverload") // performance sensitive - private void updateTimer(Timer timer) { - timer.update(ticker.read() - beforeNanos, TimeUnit.NANOSECONDS); + private void updateTimer(Supplier timer) { + timer.get().update(ticker.read() - beforeNanos, TimeUnit.NANOSECONDS); } }); }