Skip to content

Commit

Permalink
Merge branch 'master' into streamingBetter
Browse files Browse the repository at this point in the history
  • Loading branch information
ohadbitt authored Nov 10, 2024
2 parents 4711073 + b7d2477 commit 304ff39
Show file tree
Hide file tree
Showing 12 changed files with 110 additions and 46 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ based on the size, format and compression of data. This will also allow users to
- [BREAKING] Redirects are disabled by default. Use ClientRequestProperties "client_max_redirect_count" option
to enable. Default changed to 0.
- [BREAKING] Added exception to signature of ResourceAlgorithms.postToQueueWithRetries.
- [BREAKING] Removed maxConnectionsPerRoute as it was not easily provided by azure-core.

## [5.2.0] - 2024-08-27
### Fixed
- Used Msal user prompt old code which is deprecated in the new version coming from last bom update resulted in method not found exception.
Expand Down
38 changes: 30 additions & 8 deletions data/src/main/java/com/microsoft/azure/kusto/data/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,17 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

public abstract class BaseClient implements Client, StreamingClient {

private static final int MAX_REDIRECT_COUNT = 1;
private static final int EXTRA_TIMEOUT_FOR_CLIENT_SIDE = (int) TimeUnit.SECONDS.toMillis(30);

// Make logger available to implementations
protected static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

Expand All @@ -28,10 +35,14 @@ public BaseClient(HttpClient httpClient) {
this.httpClient = httpClient;
}

protected String post(HttpRequest request) throws DataServiceException {
protected String post(HttpRequest request, long timeoutMs) throws DataServiceException {
// Execute and get the response
try (HttpResponse response = httpClient.sendSync(request, Context.NONE)) {
try (HttpResponse response = httpClient.sendSync(request, getContextTimeout(timeoutMs))) {
return processResponseBody(response);
} catch (DataServiceException e) {
throw e;
} catch (Exception e) {
throw ExceptionUtils.createExceptionOnPost(e, request.getUrl(), "sync");
}
}

Expand All @@ -42,6 +53,7 @@ private String processResponseBody(HttpResponse response) throws DataServiceExce
if (responseBody == null) {
return null;
}

switch (response.getStatusCode()) {
case HttpStatus.OK:
return responseBody;
Expand All @@ -53,14 +65,14 @@ private String processResponseBody(HttpResponse response) throws DataServiceExce
}

// Todo: Implement async version of this method
protected InputStream postToStreamingOutput(HttpRequest request, int currentRedirectCounter, int maxRedirectCount) throws DataServiceException {
protected InputStream postToStreamingOutput(HttpRequest request, long timeoutMs, int currentRedirectCounter, int maxRedirectCount)
throws DataServiceException {
boolean returnInputStream = false;
String errorFromResponse = null;

HttpResponse httpResponse = null;
try {

httpResponse = httpClient.sendSync(request, Context.NONE);
httpResponse = httpClient.sendSync(request, getContextTimeout(timeoutMs));

int responseStatusCode = httpResponse.getStatusCode();

Expand All @@ -73,19 +85,22 @@ protected InputStream postToStreamingOutput(HttpRequest request, int currentRedi
// Ideal to close here (as opposed to finally) so that if any data can't be flushed, the exception will be properly thrown and handled
httpResponse.close();

if (shouldPostToOriginalUrlDueToRedirect(currentRedirectCounter, responseStatusCode, maxRedirectCount)) {
if (shouldPostToOriginalUrlDueToRedirect(responseStatusCode, currentRedirectCounter, maxRedirectCount)) {
Optional<HttpHeader> redirectLocation = Optional.ofNullable(httpResponse.getHeaders().get(HttpHeaderName.LOCATION));
if (redirectLocation.isPresent() && !redirectLocation.get().getValue().equals(request.getUrl().toString())) {
HttpRequest redirectRequest = HttpRequestBuilder
.fromExistingRequest(request)
.withURL(redirectLocation.get().getValue())
.build();
return postToStreamingOutput(redirectRequest, currentRedirectCounter + 1, maxRedirectCount);
return postToStreamingOutput(redirectRequest, timeoutMs, currentRedirectCounter + 1, maxRedirectCount);
}
}
} catch (IOException ex) {
// Thrown from new CloseParentResourcesStream(httpResponse)
throw new DataServiceException(request.getUrl().toString(),
"postToStreamingOutput failed to get or decompress response stream", ex, false);
} catch (UncheckedIOException e) {
throw ExceptionUtils.createExceptionOnPost(e, request.getUrl(), "streaming sync");
} catch (Exception ex) {
throw createExceptionFromResponse(request.getUrl().toString(), httpResponse, ex, errorFromResponse);
} finally {
Expand Down Expand Up @@ -134,6 +149,13 @@ public static DataServiceException createExceptionFromResponse(String url, HttpR
isPermanent);
}

private static Context getContextTimeout(long timeoutMs) {
int requestTimeout = timeoutMs > Integer.MAX_VALUE ? Integer.MAX_VALUE : Math.toIntExact(timeoutMs) + EXTRA_TIMEOUT_FOR_CLIENT_SIDE;

// See https://github.com/Azure/azure-sdk-for-java/blob/azure-core-http-netty_1.10.2/sdk/core/azure-core-http-netty/CHANGELOG.md#features-added
return Context.NONE.addData("azure-response-timeout", Duration.ofMillis(requestTimeout));
}

private static void closeResourcesIfNeeded(boolean returnInputStream, HttpResponse httpResponse) {
// If we close the resources after returning the InputStream to the user, he won't be able to read from it - used in streaming query
if (!returnInputStream) {
Expand All @@ -143,7 +165,7 @@ private static void closeResourcesIfNeeded(boolean returnInputStream, HttpRespon
}
}

private static boolean shouldPostToOriginalUrlDueToRedirect(int redirectCount, int status, int maxRedirectCount) {
private static boolean shouldPostToOriginalUrlDueToRedirect(int status, int redirectCount, int maxRedirectCount) {
return (status == HttpStatus.FOUND || status == HttpStatus.TEMP_REDIRECT) && redirectCount + 1 <= maxRedirectCount;
}

Expand Down
43 changes: 37 additions & 6 deletions data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,23 @@
import com.microsoft.azure.kusto.data.instrumentation.TraceableAttributes;
import org.apache.commons.lang3.StringUtils;

import org.apache.http.ParseException;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.util.*;
import java.util.concurrent.TimeUnit;

class ClientImpl extends BaseClient {
public static final String MGMT_ENDPOINT_VERSION = "v1";
public static final String QUERY_ENDPOINT_VERSION = "v2";
public static final String STREAMING_VERSION = "v1";
private static final String DEFAULT_DATABASE_NAME = "NetDefaultDb";
private static final Long COMMAND_TIMEOUT_IN_MILLISECS = TimeUnit.MINUTES.toMillis(10);
private static final Long QUERY_TIMEOUT_IN_MILLISECS = TimeUnit.MINUTES.toMillis(4);
private static final Long STREAMING_INGEST_TIMEOUT_IN_MILLISECS = TimeUnit.MINUTES.toMillis(10);

private final TokenProviderBase aadAuthenticationHelper;

Expand Down Expand Up @@ -134,7 +139,6 @@ private KustoOperationResult processJsonResult(JsonResult res) throws DataServic
}

KustoRequestContext prepareRequest(@NotNull KustoRequest kr) throws DataServiceException, DataClientException {

// Validate and optimize the query object
kr.validateAndOptimize();

Expand Down Expand Up @@ -182,12 +186,12 @@ public String executeToJsonResult(String database, String command, ClientRequest
}

private String executeToJsonResult(KustoRequest kr) throws DataServiceException, DataClientException {

KustoRequestContext request = prepareRequest(kr);
long timeoutMs = determineTimeout(kr.getProperties(), kr.getCommandType(), clusterUrl);

// Get the response and trace the call
return MonitoredActivity.invoke(
(SupplierOneException<String, DataServiceException>) () -> post(request.getHttpRequest()),
(SupplierOneException<String, DataServiceException>) () -> post(request.getHttpRequest(), timeoutMs),
request.getSdkRequest().getCommandType().getActivityTypeSuffix().concat(".executeToJsonResult"));
}

Expand Down Expand Up @@ -236,6 +240,8 @@ private KustoOperationResult executeStreamingIngestImpl(String clusterEndpoint,
contentEncoding = "gzip";
}

long timeoutMs = determineTimeout(properties, CommandType.STREAMING_INGEST, clusterUrl);

// This was a separate method but was moved into the body of this method because it performs a side effect
if (properties != null) {
Iterator<Map.Entry<String, Object>> iterator = properties.getOptions();
Expand Down Expand Up @@ -281,10 +287,9 @@ private KustoOperationResult executeStreamingIngestImpl(String clusterEndpoint,

// Get the response, and trace the call.
String response = MonitoredActivity.invoke(
(SupplierOneException<String, DataServiceException>) () -> post(request), "ClientImpl.executeStreamingIngest");
(SupplierOneException<String, DataServiceException>) () -> post(request, timeoutMs), "ClientImpl.executeStreamingIngest");

return new KustoOperationResult(response, "v1");

} catch (KustoServiceQueryError e) {
throw new DataClientException(clusterEndpoint, "Error converting json response to KustoOperationResult:" + e.getMessage(), e);
} catch (IOException e) {
Expand Down Expand Up @@ -354,13 +359,39 @@ private InputStream executeStreamingQuery(@NotNull KustoRequest kr) throws DataS
.withTracing(tracing)
.withAuthorization(authorization)
.build();
long timeoutMs = determineTimeout(kr.getProperties(), kr.getCommandType(), clusterUrl);

// Get the response and trace the call
return MonitoredActivity.invoke(
(SupplierOneException<InputStream, DataServiceException>) () -> postToStreamingOutput(request, 0, kr.getProperties().getRedirectCount()),
(SupplierOneException<InputStream, DataServiceException>) () -> postToStreamingOutput(request, timeoutMs, 0,
kr.getProperties().getRedirectCount()),
"ClientImpl.executeStreamingQuery", updateAndGetExecuteTracingAttributes(kr.getDatabase(), kr.getProperties()));
}

private long determineTimeout(ClientRequestProperties properties, CommandType commandType, String clusterUrl) throws DataClientException {
Long timeoutMs;
try {
timeoutMs = properties == null ? null : properties.getTimeoutInMilliSec();
} catch (ParseException e) {
throw new DataClientException(clusterUrl, "Failed to parse timeout from ClientRequestProperties");
}

if (timeoutMs == null) {
switch (commandType) {
case ADMIN_COMMAND:
timeoutMs = COMMAND_TIMEOUT_IN_MILLISECS;
break;
case STREAMING_INGEST:
timeoutMs = STREAMING_INGEST_TIMEOUT_IN_MILLISECS;
break;
default:
timeoutMs = QUERY_TIMEOUT_IN_MILLISECS;
}
}

return timeoutMs;
}

private String getAuthorizationHeaderValue() throws DataServiceException, DataClientException {
if (aadAuthenticationHelper != null) {
return String.format("Bearer %s", aadAuthenticationHelper.acquireAccessToken());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.microsoft.azure.kusto.data.exceptions;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;

public class ExceptionUtils {
public static DataServiceException createExceptionOnPost(Exception e, URL url, String kind) {
boolean permanent = false;
boolean isIO = false;
if (e instanceof IOException) {
isIO = true;
}

if (e instanceof UncheckedIOException) {
e = ((UncheckedIOException) e).getCause();
isIO = true;
}

if (isIO) {
return new IODataServiceException(url.toString(), (IOException) e, kind);

}

return new DataServiceException(url.toString(), String.format("Exception in %s post request: %s", kind, e.getMessage()), permanent);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.microsoft.azure.kusto.data.exceptions;

import com.microsoft.azure.kusto.data.Utils;

import java.io.IOException;

public class IODataServiceException extends DataServiceException {
public IODataServiceException(String ingestionSource, IOException e, String kind) {
super(ingestionSource, String.format("IOException in %s post request: %s", kind, e.getMessage()),
e,
!Utils.isRetriableIOException(e));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public static HttpClient create(HttpClientProperties properties) {

// If all properties are null, create with default client options
if (properties == null) {
options.setResponseTimeout(Duration.ofMinutes(10));
return HttpClient.createDefault(options);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ public class HttpClientProperties {
private final boolean keepAlive;
private final Integer maxKeepAliveTime;
private final Integer maxConnectionTotal;
private final Integer maxConnectionRoute;
private final Class<? extends HttpClientProvider> provider;
private final ProxyOptions proxy;

Expand All @@ -22,7 +21,6 @@ private HttpClientProperties(HttpClientPropertiesBuilder builder) {
this.keepAlive = builder.keepAlive;
this.maxKeepAliveTime = builder.maxKeepAliveTime;
this.maxConnectionTotal = builder.maxConnectionsTotal;
this.maxConnectionRoute = builder.maxConnectionsPerRoute;
this.provider = builder.provider;
this.proxy = builder.proxy;
}
Expand Down Expand Up @@ -92,15 +90,6 @@ public Class<? extends HttpClientProvider> provider() {
return provider;
}

/**
* The maximum number of connections the client may keep open at the same time per route.
*
* @return the maximum number of connections per route
*/
public Integer maxConnectionRoute() {
return maxConnectionRoute;
}

/**
* The proxy to use when connecting to the remote server.
*
Expand All @@ -116,7 +105,6 @@ public static class HttpClientPropertiesBuilder {
private boolean keepAlive;
private Integer maxKeepAliveTime = 120;
private Integer maxConnectionsTotal = 40;
private Integer maxConnectionsPerRoute = 40;
private Duration timeout = Duration.ofMinutes(10);
private Class<? extends HttpClientProvider> provider = null;
private ProxyOptions proxy = null;
Expand Down Expand Up @@ -181,17 +169,6 @@ public HttpClientPropertiesBuilder maxConnectionsTotal(Integer maxConnectionsTot
return this;
}

/**
* Sets the maximum number of connections the client may keep open at the same time for the same route (endpoint).
*
* @param maxConnections the maximum number of connections per route
* @return the builder instance
*/
public HttpClientPropertiesBuilder maxConnectionsPerRoute(Integer maxConnections) {
this.maxConnectionsPerRoute = maxConnections;
return this;
}

/**
* Sets the HTTP Client Provider used by Azure Core when constructing HTTP Client instances.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ public static void setUp() {
.keepAlive(true)
.maxKeepAliveTime(120)
.maxIdleTime(60)
.maxConnectionsPerRoute(50)
.maxConnectionsTotal(50)
.build());
} catch (URISyntaxException ex) {
Expand Down Expand Up @@ -654,7 +653,7 @@ void testSameHttpClientInstance() throws DataClientException, DataServiceExcepti
clientImpl.executeQuery(DB_NAME, query, clientRequestProperties);
clientImpl.executeQuery(DB_NAME, query, clientRequestProperties);

try (HttpResponse httpResponse = Mockito.verify(httpClientSpy, atLeast(2)).sendSync(any(), eq(Context.NONE))) {
try (HttpResponse httpResponse = Mockito.verify(httpClientSpy, atLeast(2)).sendSync(any(), any())) {
}

}
Expand Down
1 change: 0 additions & 1 deletion samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCr
HttpClientProperties properties = HttpClientProperties.builder()
.keepAlive(true)
.maxKeepAliveTime(120)
.maxConnectionsPerRoute(40)
.maxConnectionsTotal(40)
.build();

Expand Down
1 change: 0 additions & 1 deletion samples/src/main/java/AdvancedQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public static void main(String[] args) {
HttpClientProperties properties = HttpClientProperties.builder()
.keepAlive(true)
.maxKeepAliveTime(120)
.maxConnectionsPerRoute(40)
.maxConnectionsTotal(40)
.build();

Expand Down
3 changes: 0 additions & 3 deletions samples/src/main/java/Jmeter/jmeterStressTest.jmx
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCr
HttpClientProperties properties = HttpClientProperties.builder()
.keepAlive(true)
.maxKeepAliveTime(120)
.maxConnectionsPerRoute(40)
.maxConnectionsTotal(40)
.build();

Expand Down Expand Up @@ -203,7 +202,6 @@ ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCr
HttpClientProperties properties = HttpClientProperties.builder()
.keepAlive(true)
.maxKeepAliveTime(120)
.maxConnectionsPerRoute(40)
.maxConnectionsTotal(40)
.build();
log.info(&quot;Start &quot; + testName);
Expand Down Expand Up @@ -371,7 +369,6 @@ Client cslClient = ClientFactory.createClient(engineCsb, (HttpClientProperties)
HttpClientProperties properties = HttpClientProperties.builder()
.keepAlive(true)
.maxKeepAliveTime(120)
.maxConnectionsPerRoute(40)
.maxConnectionsTotal(40)
.build();

Expand Down
1 change: 0 additions & 1 deletion samples/src/main/java/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ public static void main(String[] args) {
HttpClientProperties properties = HttpClientProperties.builder()
.keepAlive(true)
.maxKeepAliveTime(120)
.maxConnectionsPerRoute(40)
.maxConnectionsTotal(40)
.build();

Expand Down

0 comments on commit 304ff39

Please sign in to comment.