Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FSSDK-10095] fix events dropped on staled connections. #545

Merged
merged 6 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,6 @@ public class ODPEventManagerTest {
@Captor
ArgumentCaptor<String> payloadCaptor;

@Before
public void setup() {
mockApiManager = mock(ODPApiManager.class);
}

@Test
public void logAndDiscardEventWhenEventManagerIsNotRunning() {
ODPConfig odpConfig = new ODPConfig("key", "host", null);
Expand Down
10 changes: 5 additions & 5 deletions core-httpclient-impl/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,23 +107,23 @@ The number of workers determines the number of threads the thread pool uses.
The following builder methods can be used to custom configure the `AsyncEventHandler`.

|Method Name|Default Value|Description|
|---|---|---|
|---|---|-----------------------------------------------|
|`withQueueCapacity(int)`|10000|Queue size for pending logEvents|
|`withNumWorkers(int)`|2|Number of worker threads|
|`withMaxTotalConnections(int)`|200|Maximum number of connections|
|`withMaxPerRoute(int)`|20|Maximum number of connections per route|
|`withValidateAfterInactivity(int)`|5000|Time to maintain idol connections (in milliseconds)|
|`withValidateAfterInactivity(int)`|1000|Time to maintain idle connections (in milliseconds)|

### Advanced configuration
The following properties can be set to override the default configuration.

|Property Name|Default Value|Description|
|---|---|---|
|---|---|-----------------------------------------------|
|**async.event.handler.queue.capacity**|10000|Queue size for pending logEvents|
|**async.event.handler.num.workers**|2|Number of worker threads|
|**async.event.handler.max.connections**|200|Maximum number of connections|
|**async.event.handler.event.max.per.route**|20|Maximum number of connections per route|
|**async.event.handler.validate.after**|5000|Time to maintain idol connections (in milliseconds)|
|**async.event.handler.validate.after**|1000|Time to maintain idle connections (in milliseconds)|

## HttpProjectConfigManager

Expand Down Expand Up @@ -243,4 +243,4 @@ Optimizely optimizely = OptimizelyFactory.newDefaultInstance();
to enable request batching to the Optimizely logging endpoint. By default, a maximum of 10 events are included in each batch
for a maximum interval of 30 seconds. These parameters are configurable via systems properties or through the
`OptimizelyFactory#setMaxEventBatchSize` and `OptimizelyFactory#setMaxEventBatchInterval` methods.


3 changes: 1 addition & 2 deletions core-httpclient-impl/build.gradle
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
dependencies {
compile project(':core-api')

compileOnly group: 'com.google.code.gson', name: 'gson', version: gsonVersion

compile group: 'org.apache.httpcomponents', name: 'httpclient', version: httpClientVersion
testCompile 'org.mock-server:mockserver-netty:5.1.1'
}

task exhaustiveTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ public final class HttpClientUtils {
public static final int CONNECTION_TIMEOUT_MS = 10000;
public static final int CONNECTION_REQUEST_TIMEOUT_MS = 5000;
public static final int SOCKET_TIMEOUT_MS = 10000;

public static final int DEFAULT_VALIDATE_AFTER_INACTIVITY = 1000;
public static final int DEFAULT_MAX_CONNECTIONS = 200;
public static final int DEFAULT_MAX_PER_ROUTE = 20;
private static RequestConfig requestConfigWithTimeout;

private HttpClientUtils() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@
package com.optimizely.ab;

import com.optimizely.ab.annotations.VisibleForTesting;
import com.optimizely.ab.HttpClientUtils;

import org.apache.http.client.HttpClient;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
Expand Down Expand Up @@ -73,16 +77,20 @@ public static class Builder {
// The following static values are public so that they can be tweaked if necessary.
// These are the recommended settings for http protocol. https://hc.apache.org/httpcomponents-client-ga/tutorial/html/connmgmt.html
// The maximum number of connections allowed across all routes.
private int maxTotalConnections = 200;
int maxTotalConnections = HttpClientUtils.DEFAULT_MAX_CONNECTIONS;
// The maximum number of connections allowed for a route
private int maxPerRoute = 20;
int maxPerRoute = HttpClientUtils.DEFAULT_MAX_PER_ROUTE;
// Defines period of inactivity in milliseconds after which persistent connections must be re-validated prior to being leased to the consumer.
private int validateAfterInactivity = 5000;
// If this is too long, it's expected to see more requests dropped on staled connections (dropped by the server or networks).
// We can configure retries (POST for AsyncEventDispatcher) to cover the staled connections.
int validateAfterInactivity = HttpClientUtils.DEFAULT_VALIDATE_AFTER_INACTIVITY;
// force-close the connection after this idle time (with 0, eviction is disabled by default)
long evictConnectionIdleTimePeriod = 0;
HttpRequestRetryHandler customRetryHandler = null;
TimeUnit evictConnectionIdleTimeUnit = TimeUnit.MILLISECONDS;
private int timeoutMillis = HttpClientUtils.CONNECTION_TIMEOUT_MS;


private Builder() {

}
Expand All @@ -107,6 +115,12 @@ public Builder withEvictIdleConnections(long maxIdleTime, TimeUnit maxIdleTimeUn
this.evictConnectionIdleTimeUnit = maxIdleTimeUnit;
return this;
}

// customize retryHandler (DefaultHttpRequestRetryHandler will be used by default)
public Builder withRetryHandler(HttpRequestRetryHandler retryHandler) {
this.customRetryHandler = retryHandler;
return this;
}

public Builder setTimeoutMillis(int timeoutMillis) {
this.timeoutMillis = timeoutMillis;
Expand All @@ -124,6 +138,9 @@ public OptimizelyHttpClient build() {
.setConnectionManager(poolingHttpClientConnectionManager)
.disableCookieManagement()
.useSystemProperties();
if (customRetryHandler != null) {
builder.setRetryHandler(customRetryHandler);
}

logger.debug("Creating HttpClient with timeout: " + timeoutMillis);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.optimizely.ab.event;

import com.optimizely.ab.HttpClientUtils;
import com.optimizely.ab.NamedThreadFactory;
import com.optimizely.ab.OptimizelyHttpClient;
import com.optimizely.ab.annotations.VisibleForTesting;
Expand All @@ -31,6 +32,7 @@
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,7 +47,6 @@
import java.util.concurrent.TimeUnit;

import javax.annotation.CheckForNull;
import javax.annotation.Nullable;

/**
* {@link EventHandler} implementation that queues events and has a separate pool of threads responsible
Expand All @@ -61,9 +62,7 @@ public class AsyncEventHandler implements EventHandler, AutoCloseable {

public static final int DEFAULT_QUEUE_CAPACITY = 10000;
public static final int DEFAULT_NUM_WORKERS = 2;
public static final int DEFAULT_MAX_CONNECTIONS = 200;
public static final int DEFAULT_MAX_PER_ROUTE = 20;
public static final int DEFAULT_VALIDATE_AFTER_INACTIVITY = 5000;


private static final Logger logger = LoggerFactory.getLogger(AsyncEventHandler.class);
private static final ProjectConfigResponseHandler EVENT_RESPONSE_HANDLER = new ProjectConfigResponseHandler();
Expand Down Expand Up @@ -135,15 +134,17 @@ public AsyncEventHandler(int queueCapacity,
if (httpClient != null) {
this.httpClient = httpClient;
} else {
maxConnections = validateInput("maxConnections", maxConnections, DEFAULT_MAX_CONNECTIONS);
connectionsPerRoute = validateInput("connectionsPerRoute", connectionsPerRoute, DEFAULT_MAX_PER_ROUTE);
validateAfter = validateInput("validateAfter", validateAfter, DEFAULT_VALIDATE_AFTER_INACTIVITY);
maxConnections = validateInput("maxConnections", maxConnections, HttpClientUtils.DEFAULT_MAX_CONNECTIONS);
connectionsPerRoute = validateInput("connectionsPerRoute", connectionsPerRoute, HttpClientUtils.DEFAULT_MAX_PER_ROUTE);
validateAfter = validateInput("validateAfter", validateAfter, HttpClientUtils.DEFAULT_VALIDATE_AFTER_INACTIVITY);
this.httpClient = OptimizelyHttpClient.builder()
.withMaxTotalConnections(maxConnections)
.withMaxPerRoute(connectionsPerRoute)
.withValidateAfterInactivity(validateAfter)
// infrequent event discards observed. staled connections force-closed after a long idle time.
.withEvictIdleConnections(1L, TimeUnit.MINUTES)
// enable retry on event POST (default: retry on GET only)
.withRetryHandler(new DefaultHttpRequestRetryHandler(3, true))
.build();
}

Expand Down Expand Up @@ -310,9 +311,9 @@ public static class Builder {

int queueCapacity = PropertyUtils.getInteger(CONFIG_QUEUE_CAPACITY, DEFAULT_QUEUE_CAPACITY);
int numWorkers = PropertyUtils.getInteger(CONFIG_NUM_WORKERS, DEFAULT_NUM_WORKERS);
int maxTotalConnections = PropertyUtils.getInteger(CONFIG_MAX_CONNECTIONS, DEFAULT_MAX_CONNECTIONS);
int maxPerRoute = PropertyUtils.getInteger(CONFIG_MAX_PER_ROUTE, DEFAULT_MAX_PER_ROUTE);
int validateAfterInactivity = PropertyUtils.getInteger(CONFIG_VALIDATE_AFTER_INACTIVITY, DEFAULT_VALIDATE_AFTER_INACTIVITY);
int maxTotalConnections = PropertyUtils.getInteger(CONFIG_MAX_CONNECTIONS, HttpClientUtils.DEFAULT_MAX_CONNECTIONS);
int maxPerRoute = PropertyUtils.getInteger(CONFIG_MAX_PER_ROUTE, HttpClientUtils.DEFAULT_MAX_PER_ROUTE);
int validateAfterInactivity = PropertyUtils.getInteger(CONFIG_VALIDATE_AFTER_INACTIVITY, HttpClientUtils.DEFAULT_VALIDATE_AFTER_INACTIVITY);
private long closeTimeout = Long.MAX_VALUE;
private TimeUnit closeTimeoutUnit = TimeUnit.MILLISECONDS;
private OptimizelyHttpClient httpClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,39 @@
*/
package com.optimizely.ab;

import org.apache.http.HttpException;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.methods.RequestBuilder;
import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.impl.client.CloseableHttpClient;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
import org.apache.http.protocol.HttpContext;
import org.junit.*;
import org.mockserver.integration.ClientAndServer;
import org.mockserver.model.ConnectionOptions;
import org.mockserver.model.HttpError;
import org.mockserver.model.HttpRequest;
import org.mockserver.model.HttpResponse;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static com.optimizely.ab.OptimizelyHttpClient.builder;
import static java.util.concurrent.TimeUnit.*;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;
import static org.mockserver.model.HttpForward.forward;
import static org.mockserver.model.HttpRequest.request;
import static org.mockserver.model.HttpResponse.*;
import static org.mockserver.model.HttpResponse.response;
import static org.mockserver.verify.VerificationTimes.exactly;

public class OptimizelyHttpClientTest {

@Before
public void setUp() {
System.setProperty("https.proxyHost", "localhost");
Expand All @@ -51,7 +63,13 @@ public void tearDown() {

@Test
public void testDefaultConfiguration() {
OptimizelyHttpClient optimizelyHttpClient = builder().build();
OptimizelyHttpClient.Builder builder = builder();
assertEquals(builder.validateAfterInactivity, 1000);
assertEquals(builder.maxTotalConnections, 200);
assertEquals(builder.maxPerRoute, 20);
assertNull(builder.customRetryHandler);

OptimizelyHttpClient optimizelyHttpClient = builder.build();
assertTrue(optimizelyHttpClient.getHttpClient() instanceof CloseableHttpClient);
}

Expand Down Expand Up @@ -101,4 +119,74 @@ public void testExecute() throws IOException {
OptimizelyHttpClient optimizelyHttpClient = new OptimizelyHttpClient(mockHttpClient);
assertTrue(optimizelyHttpClient.execute(httpUriRequest, responseHandler));
}

@Test
public void testRetriesWithCustomRetryHandler() throws IOException {

// [NOTE] Request retries are all handled inside HttpClient. Not easy for unit test.
// - "DefaultHttpRetryHandler" in HttpClient retries only with special types of Exceptions
// like "NoHttpResponseException", etc.
// Other exceptions (SocketTimeout, ProtocolException, etc.) all ignored.
// - Not easy to force the specific exception type in the low-level.
// - This test just validates custom retry handler injected ok by validating the number of retries.

class CustomRetryHandler implements HttpRequestRetryHandler {
private final int maxRetries;

public CustomRetryHandler(int maxRetries) {
this.maxRetries = maxRetries;
}

@Override
public boolean retryRequest(IOException exception, int executionCount, HttpContext context) {
// override to retry for any type of exceptions
return executionCount < maxRetries;
}
}

int port = 9999;
ClientAndServer mockServer;
int retryCount;

// default httpclient (retries enabled by default, but no retry for timeout connection)

mockServer = ClientAndServer.startClientAndServer(port);
mockServer
.when(request().withMethod("GET").withPath("/"))
.error(HttpError.error());

OptimizelyHttpClient clientDefault = OptimizelyHttpClient.builder()
.setTimeoutMillis(100)
.build();

try {
clientDefault.execute(new HttpGet("http://localhost:" + port));
fail();
} catch (Exception e) {
retryCount = mockServer.retrieveRecordedRequests(request()).length;
assertEquals(1, retryCount);
}
mockServer.stop();

// httpclient with custom retry handler (5 times retries for any request)

mockServer = ClientAndServer.startClientAndServer(port);
mockServer
.when(request().withMethod("GET").withPath("/"))
.error(HttpError.error());

OptimizelyHttpClient clientWithRetries = OptimizelyHttpClient.builder()
.withRetryHandler(new CustomRetryHandler(5))
.setTimeoutMillis(100)
.build();

try {
clientWithRetries.execute(new HttpGet("http://localhost:" + port));
fail();
} catch (Exception e) {
retryCount = mockServer.retrieveRecordedRequests(request()).length;
assertEquals(5, retryCount);
}
mockServer.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public void testBuilderWithCustomHttpClient() {

AsyncEventHandler eventHandler = builder()
.withOptimizelyHttpClient(customHttpClient)
// these params will be ignored when customHttpClient is injected
.withMaxTotalConnections(1)
.withMaxPerRoute(2)
.withCloseTimeout(10, TimeUnit.SECONDS)
Expand All @@ -134,6 +135,17 @@ public void testBuilderWithCustomHttpClient() {

@Test
public void testBuilderWithDefaultHttpClient() {
AsyncEventHandler.Builder builder = builder();
assertEquals(builder.validateAfterInactivity, 1000);
assertEquals(builder.maxTotalConnections, 200);
assertEquals(builder.maxPerRoute, 20);

AsyncEventHandler eventHandler = builder.build();
assert(eventHandler.httpClient != null);
}

@Test
public void testBuilderWithDefaultHttpClientAndCustomParams() {
AsyncEventHandler eventHandler = builder()
.withMaxTotalConnections(3)
.withMaxPerRoute(4)
Expand Down
Loading