Skip to content

Commit

Permalink
[FSSDK-10095] fix events dropped on staled connections. (#545)
Browse files Browse the repository at this point in the history
Events can be discarded for staled connections with httpclient connection pooling.
This PR fixs it with -

- reduce the time for connection validation from 5 to 1sec.
- enable retries (x3) for event POST.
  • Loading branch information
jaeopt authored May 8, 2024
1 parent 8fdbfbf commit 71f9e75
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,6 @@ public class ODPEventManagerTest {
@Captor
ArgumentCaptor<String> payloadCaptor;

@Before
public void setup() {
mockApiManager = mock(ODPApiManager.class);
}

@Test
public void logAndDiscardEventWhenEventManagerIsNotRunning() {
ODPConfig odpConfig = new ODPConfig("key", "host", null);
Expand Down
10 changes: 5 additions & 5 deletions core-httpclient-impl/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,23 +107,23 @@ The number of workers determines the number of threads the thread pool uses.
The following builder methods can be used to custom configure the `AsyncEventHandler`.

|Method Name|Default Value|Description|
|---|---|---|
|---|---|-----------------------------------------------|
|`withQueueCapacity(int)`|10000|Queue size for pending logEvents|
|`withNumWorkers(int)`|2|Number of worker threads|
|`withMaxTotalConnections(int)`|200|Maximum number of connections|
|`withMaxPerRoute(int)`|20|Maximum number of connections per route|
|`withValidateAfterInactivity(int)`|5000|Time to maintain idol connections (in milliseconds)|
|`withValidateAfterInactivity(int)`|1000|Time to maintain idle connections (in milliseconds)|

### Advanced configuration
The following properties can be set to override the default configuration.

|Property Name|Default Value|Description|
|---|---|---|
|---|---|-----------------------------------------------|
|**async.event.handler.queue.capacity**|10000|Queue size for pending logEvents|
|**async.event.handler.num.workers**|2|Number of worker threads|
|**async.event.handler.max.connections**|200|Maximum number of connections|
|**async.event.handler.event.max.per.route**|20|Maximum number of connections per route|
|**async.event.handler.validate.after**|5000|Time to maintain idol connections (in milliseconds)|
|**async.event.handler.validate.after**|1000|Time to maintain idle connections (in milliseconds)|

## HttpProjectConfigManager

Expand Down Expand Up @@ -243,4 +243,4 @@ Optimizely optimizely = OptimizelyFactory.newDefaultInstance();
to enable request batching to the Optimizely logging endpoint. By default, a maximum of 10 events are included in each batch
for a maximum interval of 30 seconds. These parameters are configurable via systems properties or through the
`OptimizelyFactory#setMaxEventBatchSize` and `OptimizelyFactory#setMaxEventBatchInterval` methods.


3 changes: 1 addition & 2 deletions core-httpclient-impl/build.gradle
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
dependencies {
compile project(':core-api')

compileOnly group: 'com.google.code.gson', name: 'gson', version: gsonVersion

compile group: 'org.apache.httpcomponents', name: 'httpclient', version: httpClientVersion
testCompile 'org.mock-server:mockserver-netty:5.1.1'
}

task exhaustiveTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ public final class HttpClientUtils {
public static final int CONNECTION_TIMEOUT_MS = 10000;
public static final int CONNECTION_REQUEST_TIMEOUT_MS = 5000;
public static final int SOCKET_TIMEOUT_MS = 10000;

public static final int DEFAULT_VALIDATE_AFTER_INACTIVITY = 1000;
public static final int DEFAULT_MAX_CONNECTIONS = 200;
public static final int DEFAULT_MAX_PER_ROUTE = 20;
private static RequestConfig requestConfigWithTimeout;

private HttpClientUtils() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@
package com.optimizely.ab;

import com.optimizely.ab.annotations.VisibleForTesting;
import com.optimizely.ab.HttpClientUtils;

import org.apache.http.client.HttpClient;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
Expand Down Expand Up @@ -73,16 +77,20 @@ public static class Builder {
// The following static values are public so that they can be tweaked if necessary.
// These are the recommended settings for http protocol. https://hc.apache.org/httpcomponents-client-ga/tutorial/html/connmgmt.html
// The maximum number of connections allowed across all routes.
private int maxTotalConnections = 200;
int maxTotalConnections = HttpClientUtils.DEFAULT_MAX_CONNECTIONS;
// The maximum number of connections allowed for a route
private int maxPerRoute = 20;
int maxPerRoute = HttpClientUtils.DEFAULT_MAX_PER_ROUTE;
// Defines period of inactivity in milliseconds after which persistent connections must be re-validated prior to being leased to the consumer.
private int validateAfterInactivity = 5000;
// If this is too long, it's expected to see more requests dropped on staled connections (dropped by the server or networks).
// We can configure retries (POST for AsyncEventDispatcher) to cover the staled connections.
int validateAfterInactivity = HttpClientUtils.DEFAULT_VALIDATE_AFTER_INACTIVITY;
// force-close the connection after this idle time (with 0, eviction is disabled by default)
long evictConnectionIdleTimePeriod = 0;
HttpRequestRetryHandler customRetryHandler = null;
TimeUnit evictConnectionIdleTimeUnit = TimeUnit.MILLISECONDS;
private int timeoutMillis = HttpClientUtils.CONNECTION_TIMEOUT_MS;


private Builder() {

}
Expand All @@ -107,6 +115,12 @@ public Builder withEvictIdleConnections(long maxIdleTime, TimeUnit maxIdleTimeUn
this.evictConnectionIdleTimeUnit = maxIdleTimeUnit;
return this;
}

// customize retryHandler (DefaultHttpRequestRetryHandler will be used by default)
public Builder withRetryHandler(HttpRequestRetryHandler retryHandler) {
this.customRetryHandler = retryHandler;
return this;
}

public Builder setTimeoutMillis(int timeoutMillis) {
this.timeoutMillis = timeoutMillis;
Expand All @@ -124,6 +138,9 @@ public OptimizelyHttpClient build() {
.setConnectionManager(poolingHttpClientConnectionManager)
.disableCookieManagement()
.useSystemProperties();
if (customRetryHandler != null) {
builder.setRetryHandler(customRetryHandler);
}

logger.debug("Creating HttpClient with timeout: " + timeoutMillis);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.optimizely.ab.event;

import com.optimizely.ab.HttpClientUtils;
import com.optimizely.ab.NamedThreadFactory;
import com.optimizely.ab.OptimizelyHttpClient;
import com.optimizely.ab.annotations.VisibleForTesting;
Expand All @@ -31,6 +32,7 @@
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,7 +47,6 @@
import java.util.concurrent.TimeUnit;

import javax.annotation.CheckForNull;
import javax.annotation.Nullable;

/**
* {@link EventHandler} implementation that queues events and has a separate pool of threads responsible
Expand All @@ -61,9 +62,7 @@ public class AsyncEventHandler implements EventHandler, AutoCloseable {

public static final int DEFAULT_QUEUE_CAPACITY = 10000;
public static final int DEFAULT_NUM_WORKERS = 2;
public static final int DEFAULT_MAX_CONNECTIONS = 200;
public static final int DEFAULT_MAX_PER_ROUTE = 20;
public static final int DEFAULT_VALIDATE_AFTER_INACTIVITY = 5000;


private static final Logger logger = LoggerFactory.getLogger(AsyncEventHandler.class);
private static final ProjectConfigResponseHandler EVENT_RESPONSE_HANDLER = new ProjectConfigResponseHandler();
Expand Down Expand Up @@ -135,15 +134,17 @@ public AsyncEventHandler(int queueCapacity,
if (httpClient != null) {
this.httpClient = httpClient;
} else {
maxConnections = validateInput("maxConnections", maxConnections, DEFAULT_MAX_CONNECTIONS);
connectionsPerRoute = validateInput("connectionsPerRoute", connectionsPerRoute, DEFAULT_MAX_PER_ROUTE);
validateAfter = validateInput("validateAfter", validateAfter, DEFAULT_VALIDATE_AFTER_INACTIVITY);
maxConnections = validateInput("maxConnections", maxConnections, HttpClientUtils.DEFAULT_MAX_CONNECTIONS);
connectionsPerRoute = validateInput("connectionsPerRoute", connectionsPerRoute, HttpClientUtils.DEFAULT_MAX_PER_ROUTE);
validateAfter = validateInput("validateAfter", validateAfter, HttpClientUtils.DEFAULT_VALIDATE_AFTER_INACTIVITY);
this.httpClient = OptimizelyHttpClient.builder()
.withMaxTotalConnections(maxConnections)
.withMaxPerRoute(connectionsPerRoute)
.withValidateAfterInactivity(validateAfter)
// infrequent event discards observed. staled connections force-closed after a long idle time.
.withEvictIdleConnections(1L, TimeUnit.MINUTES)
// enable retry on event POST (default: retry on GET only)
.withRetryHandler(new DefaultHttpRequestRetryHandler(3, true))
.build();
}

Expand Down Expand Up @@ -310,9 +311,9 @@ public static class Builder {

int queueCapacity = PropertyUtils.getInteger(CONFIG_QUEUE_CAPACITY, DEFAULT_QUEUE_CAPACITY);
int numWorkers = PropertyUtils.getInteger(CONFIG_NUM_WORKERS, DEFAULT_NUM_WORKERS);
int maxTotalConnections = PropertyUtils.getInteger(CONFIG_MAX_CONNECTIONS, DEFAULT_MAX_CONNECTIONS);
int maxPerRoute = PropertyUtils.getInteger(CONFIG_MAX_PER_ROUTE, DEFAULT_MAX_PER_ROUTE);
int validateAfterInactivity = PropertyUtils.getInteger(CONFIG_VALIDATE_AFTER_INACTIVITY, DEFAULT_VALIDATE_AFTER_INACTIVITY);
int maxTotalConnections = PropertyUtils.getInteger(CONFIG_MAX_CONNECTIONS, HttpClientUtils.DEFAULT_MAX_CONNECTIONS);
int maxPerRoute = PropertyUtils.getInteger(CONFIG_MAX_PER_ROUTE, HttpClientUtils.DEFAULT_MAX_PER_ROUTE);
int validateAfterInactivity = PropertyUtils.getInteger(CONFIG_VALIDATE_AFTER_INACTIVITY, HttpClientUtils.DEFAULT_VALIDATE_AFTER_INACTIVITY);
private long closeTimeout = Long.MAX_VALUE;
private TimeUnit closeTimeoutUnit = TimeUnit.MILLISECONDS;
private OptimizelyHttpClient httpClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,39 @@
*/
package com.optimizely.ab;

import org.apache.http.HttpException;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.methods.RequestBuilder;
import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.impl.client.CloseableHttpClient;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
import org.apache.http.protocol.HttpContext;
import org.junit.*;
import org.mockserver.integration.ClientAndServer;
import org.mockserver.model.ConnectionOptions;
import org.mockserver.model.HttpError;
import org.mockserver.model.HttpRequest;
import org.mockserver.model.HttpResponse;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static com.optimizely.ab.OptimizelyHttpClient.builder;
import static java.util.concurrent.TimeUnit.*;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;
import static org.mockserver.model.HttpForward.forward;
import static org.mockserver.model.HttpRequest.request;
import static org.mockserver.model.HttpResponse.*;
import static org.mockserver.model.HttpResponse.response;
import static org.mockserver.verify.VerificationTimes.exactly;

public class OptimizelyHttpClientTest {

@Before
public void setUp() {
System.setProperty("https.proxyHost", "localhost");
Expand All @@ -51,7 +63,13 @@ public void tearDown() {

@Test
public void testDefaultConfiguration() {
OptimizelyHttpClient optimizelyHttpClient = builder().build();
OptimizelyHttpClient.Builder builder = builder();
assertEquals(builder.validateAfterInactivity, 1000);
assertEquals(builder.maxTotalConnections, 200);
assertEquals(builder.maxPerRoute, 20);
assertNull(builder.customRetryHandler);

OptimizelyHttpClient optimizelyHttpClient = builder.build();
assertTrue(optimizelyHttpClient.getHttpClient() instanceof CloseableHttpClient);
}

Expand Down Expand Up @@ -101,4 +119,74 @@ public void testExecute() throws IOException {
OptimizelyHttpClient optimizelyHttpClient = new OptimizelyHttpClient(mockHttpClient);
assertTrue(optimizelyHttpClient.execute(httpUriRequest, responseHandler));
}

@Test
public void testRetriesWithCustomRetryHandler() throws IOException {

// [NOTE] Request retries are all handled inside HttpClient. Not easy for unit test.
// - "DefaultHttpRetryHandler" in HttpClient retries only with special types of Exceptions
// like "NoHttpResponseException", etc.
// Other exceptions (SocketTimeout, ProtocolException, etc.) all ignored.
// - Not easy to force the specific exception type in the low-level.
// - This test just validates custom retry handler injected ok by validating the number of retries.

class CustomRetryHandler implements HttpRequestRetryHandler {
private final int maxRetries;

public CustomRetryHandler(int maxRetries) {
this.maxRetries = maxRetries;
}

@Override
public boolean retryRequest(IOException exception, int executionCount, HttpContext context) {
// override to retry for any type of exceptions
return executionCount < maxRetries;
}
}

int port = 9999;
ClientAndServer mockServer;
int retryCount;

// default httpclient (retries enabled by default, but no retry for timeout connection)

mockServer = ClientAndServer.startClientAndServer(port);
mockServer
.when(request().withMethod("GET").withPath("/"))
.error(HttpError.error());

OptimizelyHttpClient clientDefault = OptimizelyHttpClient.builder()
.setTimeoutMillis(100)
.build();

try {
clientDefault.execute(new HttpGet("http://localhost:" + port));
fail();
} catch (Exception e) {
retryCount = mockServer.retrieveRecordedRequests(request()).length;
assertEquals(1, retryCount);
}
mockServer.stop();

// httpclient with custom retry handler (5 times retries for any request)

mockServer = ClientAndServer.startClientAndServer(port);
mockServer
.when(request().withMethod("GET").withPath("/"))
.error(HttpError.error());

OptimizelyHttpClient clientWithRetries = OptimizelyHttpClient.builder()
.withRetryHandler(new CustomRetryHandler(5))
.setTimeoutMillis(100)
.build();

try {
clientWithRetries.execute(new HttpGet("http://localhost:" + port));
fail();
} catch (Exception e) {
retryCount = mockServer.retrieveRecordedRequests(request()).length;
assertEquals(5, retryCount);
}
mockServer.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public void testBuilderWithCustomHttpClient() {

AsyncEventHandler eventHandler = builder()
.withOptimizelyHttpClient(customHttpClient)
// these params will be ignored when customHttpClient is injected
.withMaxTotalConnections(1)
.withMaxPerRoute(2)
.withCloseTimeout(10, TimeUnit.SECONDS)
Expand All @@ -134,6 +135,17 @@ public void testBuilderWithCustomHttpClient() {

@Test
public void testBuilderWithDefaultHttpClient() {
AsyncEventHandler.Builder builder = builder();
assertEquals(builder.validateAfterInactivity, 1000);
assertEquals(builder.maxTotalConnections, 200);
assertEquals(builder.maxPerRoute, 20);

AsyncEventHandler eventHandler = builder.build();
assert(eventHandler.httpClient != null);
}

@Test
public void testBuilderWithDefaultHttpClientAndCustomParams() {
AsyncEventHandler eventHandler = builder()
.withMaxTotalConnections(3)
.withMaxPerRoute(4)
Expand Down

0 comments on commit 71f9e75

Please sign in to comment.