diff --git a/CHANGELOG.md b/CHANGELOG.md index 4bbab708..6394ad4b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Moved HTTP request building logic into a builder class. - [BREAKING] Redirects are disabled by default. Use ClientRequestProperties "client_max_redirect_count" option to enable. Default changed to 0. +- [BREAKING] Removed maxConnectionsPerRoute as it was not easily provided by azure-core. ## [5.2.0] - 2024-08-27 ### Fixed diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/BaseClient.java b/data/src/main/java/com/microsoft/azure/kusto/data/BaseClient.java index d47ba44d..2fd42a37 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/BaseClient.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/BaseClient.java @@ -15,10 +15,17 @@ import java.io.IOException; import java.io.InputStream; +import java.io.UncheckedIOException; import java.lang.invoke.MethodHandles; +import java.time.Duration; import java.util.Optional; +import java.util.concurrent.TimeUnit; public abstract class BaseClient implements Client, StreamingClient { + + private static final int MAX_REDIRECT_COUNT = 1; + private static final int EXTRA_TIMEOUT_FOR_CLIENT_SIDE = (int) TimeUnit.SECONDS.toMillis(30); + // Make logger available to implementations protected static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -28,10 +35,14 @@ public BaseClient(HttpClient httpClient) { this.httpClient = httpClient; } - protected String post(HttpRequest request) throws DataServiceException { + protected String post(HttpRequest request, long timeoutMs) throws DataServiceException { // Execute and get the response - try (HttpResponse response = httpClient.sendSync(request, Context.NONE)) { + try (HttpResponse response = httpClient.sendSync(request, getContextTimeout(timeoutMs))) { return processResponseBody(response); + } catch (DataServiceException e) { + throw e; + } catch (Exception e) { + throw ExceptionUtils.createExceptionOnPost(e, request.getUrl(), "sync"); } } @@ -42,6 +53,7 @@ private String processResponseBody(HttpResponse response) throws DataServiceExce if (responseBody == null) { return null; } + switch (response.getStatusCode()) { case HttpStatus.OK: return responseBody; @@ -53,14 +65,14 @@ private String processResponseBody(HttpResponse response) throws DataServiceExce } // Todo: Implement async version of this method - protected InputStream postToStreamingOutput(HttpRequest request, int currentRedirectCounter, int maxRedirectCount) throws DataServiceException { + protected InputStream postToStreamingOutput(HttpRequest request, long timeoutMs, int currentRedirectCounter, int maxRedirectCount) + throws DataServiceException { boolean returnInputStream = false; String errorFromResponse = null; HttpResponse httpResponse = null; try { - - httpResponse = httpClient.sendSync(request, Context.NONE); + httpResponse = httpClient.sendSync(request, getContextTimeout(timeoutMs)); int responseStatusCode = httpResponse.getStatusCode(); @@ -73,19 +85,22 @@ protected InputStream postToStreamingOutput(HttpRequest request, int currentRedi // Ideal to close here (as opposed to finally) so that if any data can't be flushed, the exception will be properly thrown and handled httpResponse.close(); - if (shouldPostToOriginalUrlDueToRedirect(currentRedirectCounter, responseStatusCode, maxRedirectCount)) { + if (shouldPostToOriginalUrlDueToRedirect(responseStatusCode, currentRedirectCounter, maxRedirectCount)) { Optional redirectLocation = Optional.ofNullable(httpResponse.getHeaders().get(HttpHeaderName.LOCATION)); if (redirectLocation.isPresent() && !redirectLocation.get().getValue().equals(request.getUrl().toString())) { HttpRequest redirectRequest = HttpRequestBuilder .fromExistingRequest(request) .withURL(redirectLocation.get().getValue()) .build(); - return postToStreamingOutput(redirectRequest, currentRedirectCounter + 1, maxRedirectCount); + return postToStreamingOutput(redirectRequest, timeoutMs, currentRedirectCounter + 1, maxRedirectCount); } } } catch (IOException ex) { + // Thrown from new CloseParentResourcesStream(httpResponse) throw new DataServiceException(request.getUrl().toString(), "postToStreamingOutput failed to get or decompress response stream", ex, false); + } catch (UncheckedIOException e) { + throw ExceptionUtils.createExceptionOnPost(e, request.getUrl(), "streaming sync"); } catch (Exception ex) { throw createExceptionFromResponse(request.getUrl().toString(), httpResponse, ex, errorFromResponse); } finally { @@ -134,6 +149,13 @@ public static DataServiceException createExceptionFromResponse(String url, HttpR isPermanent); } + private static Context getContextTimeout(long timeoutMs) { + int requestTimeout = timeoutMs > Integer.MAX_VALUE ? Integer.MAX_VALUE : Math.toIntExact(timeoutMs) + EXTRA_TIMEOUT_FOR_CLIENT_SIDE; + + // See https://github.com/Azure/azure-sdk-for-java/blob/azure-core-http-netty_1.10.2/sdk/core/azure-core-http-netty/CHANGELOG.md#features-added + return Context.NONE.addData("azure-response-timeout", Duration.ofMillis(requestTimeout)); + } + private static void closeResourcesIfNeeded(boolean returnInputStream, HttpResponse httpResponse) { // If we close the resources after returning the InputStream to the user, he won't be able to read from it - used in streaming query if (!returnInputStream) { @@ -143,7 +165,7 @@ private static void closeResourcesIfNeeded(boolean returnInputStream, HttpRespon } } - private static boolean shouldPostToOriginalUrlDueToRedirect(int redirectCount, int status, int maxRedirectCount) { + private static boolean shouldPostToOriginalUrlDueToRedirect(int status, int redirectCount, int maxRedirectCount) { return (status == HttpStatus.FOUND || status == HttpStatus.TEMP_REDIRECT) && redirectCount + 1 <= maxRedirectCount; } diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java b/data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java index 523ac91e..320759fd 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java @@ -19,18 +19,23 @@ import com.microsoft.azure.kusto.data.res.JsonResult; import org.apache.commons.lang3.StringUtils; +import org.apache.http.ParseException; import org.jetbrains.annotations.NotNull; import java.io.IOException; import java.io.InputStream; import java.net.URISyntaxException; import java.util.*; +import java.util.concurrent.TimeUnit; class ClientImpl extends BaseClient { public static final String MGMT_ENDPOINT_VERSION = "v1"; public static final String QUERY_ENDPOINT_VERSION = "v2"; public static final String STREAMING_VERSION = "v1"; private static final String DEFAULT_DATABASE_NAME = "NetDefaultDb"; + private static final Long COMMAND_TIMEOUT_IN_MILLISECS = TimeUnit.MINUTES.toMillis(10); + private static final Long QUERY_TIMEOUT_IN_MILLISECS = TimeUnit.MINUTES.toMillis(4); + private static final Long STREAMING_INGEST_TIMEOUT_IN_MILLISECS = TimeUnit.MINUTES.toMillis(10); private final TokenProviderBase aadAuthenticationHelper; @@ -126,7 +131,6 @@ private KustoOperationResult processJsonResult(JsonResult res) throws DataServic } KustoRequestContext prepareRequest(@NotNull KustoRequest kr) throws DataServiceException, DataClientException { - // Validate and optimize the query object kr.validateAndOptimize(); @@ -174,12 +178,12 @@ public String executeToJsonResult(String database, String command, ClientRequest } private String executeToJsonResult(KustoRequest kr) throws DataServiceException, DataClientException { - KustoRequestContext request = prepareRequest(kr); + long timeoutMs = determineTimeout(kr.getProperties(), kr.getCommandType(), clusterUrl); // Get the response and trace the call return MonitoredActivity.invoke( - (SupplierOneException) () -> post(request.getHttpRequest()), + (SupplierOneException) () -> post(request.getHttpRequest(), timeoutMs), request.getSdkRequest().getCommandType().getActivityTypeSuffix().concat(".executeToJsonResult")); } @@ -228,6 +232,8 @@ private KustoOperationResult executeStreamingIngestImpl(String clusterEndpoint, contentEncoding = "gzip"; } + long timeoutMs = determineTimeout(properties, CommandType.STREAMING_INGEST, clusterUrl); + // This was a separate method but was moved into the body of this method because it performs a side effect if (properties != null) { Iterator> iterator = properties.getOptions(); @@ -273,10 +279,9 @@ private KustoOperationResult executeStreamingIngestImpl(String clusterEndpoint, // Get the response, and trace the call. String response = MonitoredActivity.invoke( - (SupplierOneException) () -> post(request), "ClientImpl.executeStreamingIngest"); + (SupplierOneException) () -> post(request, timeoutMs), "ClientImpl.executeStreamingIngest"); return new KustoOperationResult(response, "v1"); - } catch (KustoServiceQueryError e) { throw new DataClientException(clusterEndpoint, "Error converting json response to KustoOperationResult:" + e.getMessage(), e); } catch (IOException e) { @@ -346,13 +351,39 @@ private InputStream executeStreamingQuery(@NotNull KustoRequest kr) throws DataS .withTracing(tracing) .withAuthorization(authorization) .build(); + long timeoutMs = determineTimeout(kr.getProperties(), kr.getCommandType(), clusterUrl); // Get the response and trace the call return MonitoredActivity.invoke( - (SupplierOneException) () -> postToStreamingOutput(request, 0, kr.getProperties().getRedirectCount()), + (SupplierOneException) () -> postToStreamingOutput(request, timeoutMs, 0, + kr.getProperties().getRedirectCount()), "ClientImpl.executeStreamingQuery", updateAndGetExecuteTracingAttributes(kr.getDatabase(), kr.getProperties())); } + private long determineTimeout(ClientRequestProperties properties, CommandType commandType, String clusterUrl) throws DataClientException { + Long timeoutMs; + try { + timeoutMs = properties == null ? null : properties.getTimeoutInMilliSec(); + } catch (ParseException e) { + throw new DataClientException(clusterUrl, "Failed to parse timeout from ClientRequestProperties"); + } + + if (timeoutMs == null) { + switch (commandType) { + case ADMIN_COMMAND: + timeoutMs = COMMAND_TIMEOUT_IN_MILLISECS; + break; + case STREAMING_INGEST: + timeoutMs = STREAMING_INGEST_TIMEOUT_IN_MILLISECS; + break; + default: + timeoutMs = QUERY_TIMEOUT_IN_MILLISECS; + } + } + + return timeoutMs; + } + private String getAuthorizationHeaderValue() throws DataServiceException, DataClientException { if (aadAuthenticationHelper != null) { return String.format("Bearer %s", aadAuthenticationHelper.acquireAccessToken()); diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/exceptions/ExceptionUtils.java b/data/src/main/java/com/microsoft/azure/kusto/data/exceptions/ExceptionUtils.java new file mode 100644 index 00000000..0f2099c4 --- /dev/null +++ b/data/src/main/java/com/microsoft/azure/kusto/data/exceptions/ExceptionUtils.java @@ -0,0 +1,27 @@ +package com.microsoft.azure.kusto.data.exceptions; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URL; + +public class ExceptionUtils { + public static DataServiceException createExceptionOnPost(Exception e, URL url, String kind) { + boolean permanent = false; + boolean isIO = false; + if (e instanceof IOException) { + isIO = true; + } + + if (e instanceof UncheckedIOException) { + e = ((UncheckedIOException) e).getCause(); + isIO = true; + } + + if (isIO) { + return new IODataServiceException(url.toString(), (IOException) e, kind); + + } + + return new DataServiceException(url.toString(), String.format("Exception in %s post request: %s", kind, e.getMessage()), permanent); + } +} diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/exceptions/IODataServiceException.java b/data/src/main/java/com/microsoft/azure/kusto/data/exceptions/IODataServiceException.java new file mode 100644 index 00000000..45c336c2 --- /dev/null +++ b/data/src/main/java/com/microsoft/azure/kusto/data/exceptions/IODataServiceException.java @@ -0,0 +1,13 @@ +package com.microsoft.azure.kusto.data.exceptions; + +import com.microsoft.azure.kusto.data.Utils; + +import java.io.IOException; + +public class IODataServiceException extends DataServiceException { + public IODataServiceException(String ingestionSource, IOException e, String kind) { + super(ingestionSource, String.format("IOException in %s post request: %s", kind, e.getMessage()), + e, + !Utils.isRetriableIOException(e)); + } +} diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/http/HttpClientFactory.java b/data/src/main/java/com/microsoft/azure/kusto/data/http/HttpClientFactory.java index 3dcfd904..17330016 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/http/HttpClientFactory.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/http/HttpClientFactory.java @@ -31,7 +31,6 @@ public static HttpClient create(HttpClientProperties properties) { // If all properties are null, create with default client options if (properties == null) { - options.setResponseTimeout(Duration.ofMinutes(10)); return HttpClient.createDefault(options); } diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/http/HttpClientProperties.java b/data/src/main/java/com/microsoft/azure/kusto/data/http/HttpClientProperties.java index 00d0cc8b..37e1f9e7 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/http/HttpClientProperties.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/http/HttpClientProperties.java @@ -13,7 +13,6 @@ public class HttpClientProperties { private final boolean keepAlive; private final Integer maxKeepAliveTime; private final Integer maxConnectionTotal; - private final Integer maxConnectionRoute; private final Class provider; private final ProxyOptions proxy; @@ -22,7 +21,6 @@ private HttpClientProperties(HttpClientPropertiesBuilder builder) { this.keepAlive = builder.keepAlive; this.maxKeepAliveTime = builder.maxKeepAliveTime; this.maxConnectionTotal = builder.maxConnectionsTotal; - this.maxConnectionRoute = builder.maxConnectionsPerRoute; this.provider = builder.provider; this.proxy = builder.proxy; } @@ -92,15 +90,6 @@ public Class provider() { return provider; } - /** - * The maximum number of connections the client may keep open at the same time per route. - * - * @return the maximum number of connections per route - */ - public Integer maxConnectionRoute() { - return maxConnectionRoute; - } - /** * The proxy to use when connecting to the remote server. * @@ -116,7 +105,6 @@ public static class HttpClientPropertiesBuilder { private boolean keepAlive; private Integer maxKeepAliveTime = 120; private Integer maxConnectionsTotal = 40; - private Integer maxConnectionsPerRoute = 40; private Duration timeout = Duration.ofMinutes(10); private Class provider = null; private ProxyOptions proxy = null; @@ -181,17 +169,6 @@ public HttpClientPropertiesBuilder maxConnectionsTotal(Integer maxConnectionsTot return this; } - /** - * Sets the maximum number of connections the client may keep open at the same time for the same route (endpoint). - * - * @param maxConnections the maximum number of connections per route - * @return the builder instance - */ - public HttpClientPropertiesBuilder maxConnectionsPerRoute(Integer maxConnections) { - this.maxConnectionsPerRoute = maxConnections; - return this; - } - /** * Sets the HTTP Client Provider used by Azure Core when constructing HTTP Client instances. * diff --git a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/E2ETest.java b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/E2ETest.java index df0877c5..a8bcbd22 100644 --- a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/E2ETest.java +++ b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/E2ETest.java @@ -111,7 +111,6 @@ public static void setUp() { .keepAlive(true) .maxKeepAliveTime(120) .maxIdleTime(60) - .maxConnectionsPerRoute(50) .maxConnectionsTotal(50) .build()); } catch (URISyntaxException ex) { @@ -638,7 +637,7 @@ void testSameHttpClientInstance() throws DataClientException, DataServiceExcepti clientImpl.executeQuery(DB_NAME, query, clientRequestProperties); clientImpl.executeQuery(DB_NAME, query, clientRequestProperties); - try (HttpResponse httpResponse = Mockito.verify(httpClientSpy, atLeast(2)).sendSync(any(), eq(Context.NONE))) { + try (HttpResponse httpResponse = Mockito.verify(httpClientSpy, atLeast(2)).sendSync(any(), any())) { } } diff --git a/samples/README.md b/samples/README.md index d34602b7..ac5e9439 100644 --- a/samples/README.md +++ b/samples/README.md @@ -45,7 +45,6 @@ ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCr HttpClientProperties properties = HttpClientProperties.builder() .keepAlive(true) .maxKeepAliveTime(120) - .maxConnectionsPerRoute(40) .maxConnectionsTotal(40) .build(); diff --git a/samples/src/main/java/AdvancedQuery.java b/samples/src/main/java/AdvancedQuery.java index 97299192..99918483 100644 --- a/samples/src/main/java/AdvancedQuery.java +++ b/samples/src/main/java/AdvancedQuery.java @@ -26,7 +26,6 @@ public static void main(String[] args) { HttpClientProperties properties = HttpClientProperties.builder() .keepAlive(true) .maxKeepAliveTime(120) - .maxConnectionsPerRoute(40) .maxConnectionsTotal(40) .build(); diff --git a/samples/src/main/java/Jmeter/jmeterStressTest.jmx b/samples/src/main/java/Jmeter/jmeterStressTest.jmx index fbc740a4..e9dcc173 100644 --- a/samples/src/main/java/Jmeter/jmeterStressTest.jmx +++ b/samples/src/main/java/Jmeter/jmeterStressTest.jmx @@ -87,7 +87,6 @@ ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCr HttpClientProperties properties = HttpClientProperties.builder() .keepAlive(true) .maxKeepAliveTime(120) - .maxConnectionsPerRoute(40) .maxConnectionsTotal(40) .build(); @@ -203,7 +202,6 @@ ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCr HttpClientProperties properties = HttpClientProperties.builder() .keepAlive(true) .maxKeepAliveTime(120) - .maxConnectionsPerRoute(40) .maxConnectionsTotal(40) .build(); log.info("Start " + testName); @@ -371,7 +369,6 @@ Client cslClient = ClientFactory.createClient(engineCsb, (HttpClientProperties) HttpClientProperties properties = HttpClientProperties.builder() .keepAlive(true) .maxKeepAliveTime(120) - .maxConnectionsPerRoute(40) .maxConnectionsTotal(40) .build(); diff --git a/samples/src/main/java/Query.java b/samples/src/main/java/Query.java index 0420314d..4f6769c7 100644 --- a/samples/src/main/java/Query.java +++ b/samples/src/main/java/Query.java @@ -25,7 +25,6 @@ public static void main(String[] args) { HttpClientProperties properties = HttpClientProperties.builder() .keepAlive(true) .maxKeepAliveTime(120) - .maxConnectionsPerRoute(40) .maxConnectionsTotal(40) .build();