Skip to content

Commit

Permalink
[rest] Add http conf and ExponentialHttpRetryInterceptor to handle re…
Browse files Browse the repository at this point in the history
…try In RESTCatalog (#4929)
  • Loading branch information
jerry-024 authored Jan 17, 2025
1 parent dbd129d commit 587fa28
Show file tree
Hide file tree
Showing 6 changed files with 377 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.rest;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableSet;
import org.apache.paimon.shade.guava30.com.google.common.net.HttpHeaders;

import okhttp3.Interceptor;
import okhttp3.Request;
import okhttp3.Response;

import javax.net.ssl.SSLException;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.net.UnknownHostException;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

/**
* Defines exponential HTTP request retry interceptor.
*
* <p>The following retrievable IOException
*
* <ul>
* <li>InterruptedIOException
* <li>UnknownHostException
* <li>ConnectException
* <li>NoRouteToHostException
* <li>SSLException
* </ul>
*
* <p>The following retrievable HTTP status codes are defined:
*
* <ul>
* <li>TOO_MANY_REQUESTS (429)
* <li>BAD_GATEWAY (502)
* <li>SERVICE_UNAVAILABLE (503)
* <li>GATEWAY_TIMEOUT (504)
* </ul>
*
* <p>The following retrievable HTTP method which is idempotent are defined:
*
* <ul>
* <li>GET
* <li>HEAD
* <li>PUT
* <li>DELETE
* <li>TRACE
* <li>OPTIONS
* </ul>
*/
public class ExponentialHttpRetryInterceptor implements Interceptor {

private final int maxRetries;
private final Set<Class<? extends IOException>> nonRetriableExceptions;
private final Set<Integer> retrievableCodes;
private final Set<String> retrievableMethods;

public ExponentialHttpRetryInterceptor(int maxRetries) {
this.maxRetries = maxRetries;
this.retrievableMethods =
ImmutableSet.of("GET", "HEAD", "PUT", "DELETE", "TRACE", "OPTIONS");
this.retrievableCodes = ImmutableSet.of(429, 502, 503, 504);
this.nonRetriableExceptions =
ImmutableSet.of(
InterruptedIOException.class,
UnknownHostException.class,
ConnectException.class,
NoRouteToHostException.class,
SSLException.class);
}

@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
Response response = null;

for (int retryCount = 1; ; retryCount++) {
try {
response = chain.proceed(request);
} catch (IOException e) {
if (needRetry(request.method(), e, retryCount)) {
wait(response, retryCount);
continue;
}
}
if (needRetry(response, retryCount)) {
if (response != null) {
response.close();
}
wait(response, retryCount);
} else {
return response;
}
}
}

public boolean needRetry(Response response, int execCount) {
if (execCount > maxRetries) {
return false;
}
return response == null
|| (!response.isSuccessful() && retrievableCodes.contains(response.code()));
}

public boolean needRetry(String method, IOException e, int execCount) {
if (execCount > maxRetries) {
return false;
}
if (!retrievableMethods.contains(method)) {
return false;
}
if (nonRetriableExceptions.contains(e.getClass())) {
return false;
} else {
for (Class<? extends IOException> rejectException : nonRetriableExceptions) {
if (rejectException.isInstance(e)) {
return false;
}
}
}
return true;
}

public long getRetryIntervalInMilliseconds(Response response, int execCount) {
// a server may send a 429 / 503 with a Retry-After header
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After
String retryAfterStrInSecond =
response == null ? null : response.header(HttpHeaders.RETRY_AFTER);
Long retryAfter = null;
if (retryAfterStrInSecond != null) {
try {
retryAfter = Long.parseLong(retryAfterStrInSecond) * 1000;
} catch (Throwable ignore) {
}

if (retryAfter != null && retryAfter > 0) {
return retryAfter;
}
}

int delayMillis = 1000 * (int) Math.min(Math.pow(2.0, (long) execCount - 1.0), 64.0);
int jitter = ThreadLocalRandom.current().nextInt(Math.max(1, (int) (delayMillis * 0.1)));

return delayMillis + jitter;
}

private void wait(Response response, int retryCount) throws InterruptedIOException {
try {
Thread.sleep(getRetryIntervalInMilliseconds(response, retryCount));
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new InterruptedIOException();
}
}
}
29 changes: 24 additions & 5 deletions paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;

import okhttp3.ConnectionPool;
import okhttp3.Dispatcher;
import okhttp3.Headers;
import okhttp3.MediaType;
Expand All @@ -40,6 +41,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

import static okhttp3.ConnectionSpec.CLEARTEXT;
import static okhttp3.ConnectionSpec.COMPATIBLE_TLS;
Expand All @@ -52,6 +54,7 @@ public class HttpClient implements RESTClient {

private static final String THREAD_NAME = "REST-CATALOG-HTTP-CLIENT-THREAD-POOL";
private static final MediaType MEDIA_TYPE = MediaType.parse("application/json");
private static final int CONNECTION_KEEP_ALIVE_DURATION_MS = 300_000;

private final OkHttpClient okHttpClient;
private final String uri;
Expand Down Expand Up @@ -191,14 +194,30 @@ private static OkHttpClient createHttpClient(HttpClientOptions httpClientOptions
BlockingQueue<Runnable> workQueue = new SynchronousQueue<>();
ExecutorService executorService =
createCachedThreadPool(httpClientOptions.threadPoolSize(), THREAD_NAME, workQueue);

ConnectionPool connectionPool =
new ConnectionPool(
httpClientOptions.maxConnections(),
CONNECTION_KEEP_ALIVE_DURATION_MS,
TimeUnit.MILLISECONDS);
Dispatcher dispatcher = new Dispatcher(executorService);
// set max requests per host use max connections
dispatcher.setMaxRequestsPerHost(httpClientOptions.maxConnections());
OkHttpClient.Builder builder =
new OkHttpClient.Builder()
.dispatcher(new Dispatcher(executorService))
.dispatcher(dispatcher)
.retryOnConnectionFailure(true)
.connectionSpecs(Arrays.asList(MODERN_TLS, COMPATIBLE_TLS, CLEARTEXT));
httpClientOptions.connectTimeout().ifPresent(builder::connectTimeout);
httpClientOptions.readTimeout().ifPresent(builder::readTimeout);
.connectionPool(connectionPool)
.connectionSpecs(Arrays.asList(MODERN_TLS, COMPATIBLE_TLS, CLEARTEXT))
.addInterceptor(
new ExponentialHttpRetryInterceptor(
httpClientOptions.maxRetries()));
httpClientOptions
.connectTimeout()
.ifPresent(
timeoutDuration -> {
builder.connectTimeout(timeoutDuration);
builder.readTimeout(timeoutDuration);
});

return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,30 @@ public class HttpClientOptions {

private final String uri;
@Nullable private final Duration connectTimeout;
@Nullable private final Duration readTimeout;
private final int threadPoolSize;
private final int maxConnections;
private final int maxRetries;

public HttpClientOptions(
String uri,
@Nullable Duration connectTimeout,
@Nullable Duration readTimeout,
int threadPoolSize) {
int threadPoolSize,
int maxConnections,
int maxRetries) {
this.uri = uri;
this.connectTimeout = connectTimeout;
this.readTimeout = readTimeout;
this.threadPoolSize = threadPoolSize;
this.maxConnections = maxConnections;
this.maxRetries = maxRetries;
}

public static HttpClientOptions create(Options options) {
return new HttpClientOptions(
options.get(RESTCatalogOptions.URI),
options.get(RESTCatalogOptions.CONNECTION_TIMEOUT),
options.get(RESTCatalogOptions.READ_TIMEOUT),
options.get(RESTCatalogOptions.THREAD_POOL_SIZE));
options.get(RESTCatalogOptions.THREAD_POOL_SIZE),
options.get(RESTCatalogOptions.MAX_CONNECTIONS),
options.get(RESTCatalogOptions.MAX_RETIES));
}

public String uri() {
Expand All @@ -60,11 +64,15 @@ public Optional<Duration> connectTimeout() {
return Optional.ofNullable(connectTimeout);
}

public Optional<Duration> readTimeout() {
return Optional.ofNullable(readTimeout);
}

public int threadPoolSize() {
return threadPoolSize;
}

public int maxConnections() {
return maxConnections;
}

public int maxRetries() {
return Math.max(maxRetries, 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,20 @@ public class RESTCatalogOptions {
public static final ConfigOption<Duration> CONNECTION_TIMEOUT =
ConfigOptions.key("rest.client.connection-timeout")
.durationType()
.noDefaultValue()
.defaultValue(Duration.ofSeconds(180))
.withDescription("REST Catalog http client connect timeout.");

public static final ConfigOption<Duration> READ_TIMEOUT =
ConfigOptions.key("rest.client.read-timeout")
.durationType()
.noDefaultValue()
.withDescription("REST Catalog http client read timeout.");
public static final ConfigOption<Integer> MAX_CONNECTIONS =
ConfigOptions.key("rest.client.max-connections")
.intType()
.defaultValue(100)
.withDescription("REST Catalog http client's max connections.");

public static final ConfigOption<Integer> MAX_RETIES =
ConfigOptions.key("rest.client.max-retries")
.intType()
.defaultValue(5)
.withDescription("REST Catalog http client's max retry times.");

public static final ConfigOption<Integer> THREAD_POOL_SIZE =
ConfigOptions.key("rest.client.num-threads")
Expand Down
Loading

0 comments on commit 587fa28

Please sign in to comment.