Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New client missing features #393

Merged
merged 13 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Moved HTTP request building logic into a builder class.
- [BREAKING] Redirects are disabled by default. Use ClientRequestProperties "client_max_redirect_count" option
to enable. Default changed to 0.
- [BREAKING] Removed maxConnectionsPerRoute as it was not easily provided by azure-core.

## [5.2.0] - 2024-08-27
### Fixed
Expand Down
38 changes: 30 additions & 8 deletions data/src/main/java/com/microsoft/azure/kusto/data/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,17 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

public abstract class BaseClient implements Client, StreamingClient {

private static final int MAX_REDIRECT_COUNT = 1;
private static final int EXTRA_TIMEOUT_FOR_CLIENT_SIDE = (int) TimeUnit.SECONDS.toMillis(30);

// Make logger available to implementations
protected static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

Expand All @@ -28,10 +35,14 @@ public BaseClient(HttpClient httpClient) {
this.httpClient = httpClient;
}

protected String post(HttpRequest request) throws DataServiceException {
protected String post(HttpRequest request, long timeoutMs) throws DataServiceException {
// Execute and get the response
try (HttpResponse response = httpClient.sendSync(request, Context.NONE)) {
try (HttpResponse response = httpClient.sendSync(request, getContextTimeout(timeoutMs))) {
return processResponseBody(response);
} catch (DataServiceException e) {
throw e;
} catch (Exception e) {
throw ExceptionUtils.createExceptionOnPost(e, request.getUrl(), "sync");
}
}

Expand All @@ -42,6 +53,7 @@ private String processResponseBody(HttpResponse response) throws DataServiceExce
if (responseBody == null) {
return null;
}

switch (response.getStatusCode()) {
case HttpStatus.OK:
return responseBody;
Expand All @@ -53,14 +65,14 @@ private String processResponseBody(HttpResponse response) throws DataServiceExce
}

// Todo: Implement async version of this method
protected InputStream postToStreamingOutput(HttpRequest request, int currentRedirectCounter, int maxRedirectCount) throws DataServiceException {
protected InputStream postToStreamingOutput(HttpRequest request, long timeoutMs, int currentRedirectCounter, int maxRedirectCount)
throws DataServiceException {
boolean returnInputStream = false;
String errorFromResponse = null;

HttpResponse httpResponse = null;
try {

httpResponse = httpClient.sendSync(request, Context.NONE);
httpResponse = httpClient.sendSync(request, getContextTimeout(timeoutMs));

int responseStatusCode = httpResponse.getStatusCode();

Expand All @@ -73,19 +85,22 @@ protected InputStream postToStreamingOutput(HttpRequest request, int currentRedi
// Ideal to close here (as opposed to finally) so that if any data can't be flushed, the exception will be properly thrown and handled
httpResponse.close();

if (shouldPostToOriginalUrlDueToRedirect(currentRedirectCounter, responseStatusCode, maxRedirectCount)) {
if (shouldPostToOriginalUrlDueToRedirect(responseStatusCode, currentRedirectCounter, maxRedirectCount)) {
Optional<HttpHeader> redirectLocation = Optional.ofNullable(httpResponse.getHeaders().get(HttpHeaderName.LOCATION));
if (redirectLocation.isPresent() && !redirectLocation.get().getValue().equals(request.getUrl().toString())) {
HttpRequest redirectRequest = HttpRequestBuilder
.fromExistingRequest(request)
.withURL(redirectLocation.get().getValue())
.build();
return postToStreamingOutput(redirectRequest, currentRedirectCounter + 1, maxRedirectCount);
return postToStreamingOutput(redirectRequest, timeoutMs, currentRedirectCounter + 1, maxRedirectCount);
}
}
} catch (IOException ex) {
// Thrown from new CloseParentResourcesStream(httpResponse)
throw new DataServiceException(request.getUrl().toString(),
"postToStreamingOutput failed to get or decompress response stream", ex, false);
} catch (UncheckedIOException e) {
throw ExceptionUtils.createExceptionOnPost(e, request.getUrl(), "streaming sync");
} catch (Exception ex) {
throw createExceptionFromResponse(request.getUrl().toString(), httpResponse, ex, errorFromResponse);
} finally {
Expand Down Expand Up @@ -134,6 +149,13 @@ public static DataServiceException createExceptionFromResponse(String url, HttpR
isPermanent);
}

private static Context getContextTimeout(long timeoutMs) {
int requestTimeout = timeoutMs > Integer.MAX_VALUE ? Integer.MAX_VALUE : Math.toIntExact(timeoutMs) + EXTRA_TIMEOUT_FOR_CLIENT_SIDE;

// See https://github.com/Azure/azure-sdk-for-java/blob/azure-core-http-netty_1.10.2/sdk/core/azure-core-http-netty/CHANGELOG.md#features-added
return Context.NONE.addData("azure-response-timeout", Duration.ofMillis(requestTimeout));
ohadbitt marked this conversation as resolved.
Show resolved Hide resolved
}

private static void closeResourcesIfNeeded(boolean returnInputStream, HttpResponse httpResponse) {
// If we close the resources after returning the InputStream to the user, he won't be able to read from it - used in streaming query
if (!returnInputStream) {
Expand All @@ -143,7 +165,7 @@ private static void closeResourcesIfNeeded(boolean returnInputStream, HttpRespon
}
}

private static boolean shouldPostToOriginalUrlDueToRedirect(int redirectCount, int status, int maxRedirectCount) {
private static boolean shouldPostToOriginalUrlDueToRedirect(int status, int redirectCount, int maxRedirectCount) {
return (status == HttpStatus.FOUND || status == HttpStatus.TEMP_REDIRECT) && redirectCount + 1 <= maxRedirectCount;
}

Expand Down
43 changes: 37 additions & 6 deletions data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,23 @@
import com.microsoft.azure.kusto.data.res.JsonResult;
import org.apache.commons.lang3.StringUtils;

import org.apache.http.ParseException;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.util.*;
import java.util.concurrent.TimeUnit;

class ClientImpl extends BaseClient {
public static final String MGMT_ENDPOINT_VERSION = "v1";
public static final String QUERY_ENDPOINT_VERSION = "v2";
public static final String STREAMING_VERSION = "v1";
private static final String DEFAULT_DATABASE_NAME = "NetDefaultDb";
private static final Long COMMAND_TIMEOUT_IN_MILLISECS = TimeUnit.MINUTES.toMillis(10);
private static final Long QUERY_TIMEOUT_IN_MILLISECS = TimeUnit.MINUTES.toMillis(4);
private static final Long STREAMING_INGEST_TIMEOUT_IN_MILLISECS = TimeUnit.MINUTES.toMillis(10);

private final TokenProviderBase aadAuthenticationHelper;

Expand Down Expand Up @@ -126,7 +131,6 @@ private KustoOperationResult processJsonResult(JsonResult res) throws DataServic
}

KustoRequestContext prepareRequest(@NotNull KustoRequest kr) throws DataServiceException, DataClientException {

// Validate and optimize the query object
kr.validateAndOptimize();

Expand Down Expand Up @@ -174,12 +178,12 @@ public String executeToJsonResult(String database, String command, ClientRequest
}

private String executeToJsonResult(KustoRequest kr) throws DataServiceException, DataClientException {

KustoRequestContext request = prepareRequest(kr);
long timeoutMs = determineTimeout(kr.getProperties(), kr.getCommandType(), clusterUrl);

// Get the response and trace the call
return MonitoredActivity.invoke(
(SupplierOneException<String, DataServiceException>) () -> post(request.getHttpRequest()),
(SupplierOneException<String, DataServiceException>) () -> post(request.getHttpRequest(), timeoutMs),
request.getSdkRequest().getCommandType().getActivityTypeSuffix().concat(".executeToJsonResult"));
}

Expand Down Expand Up @@ -228,6 +232,8 @@ private KustoOperationResult executeStreamingIngestImpl(String clusterEndpoint,
contentEncoding = "gzip";
}

long timeoutMs = determineTimeout(properties, CommandType.STREAMING_INGEST, clusterUrl);

// This was a separate method but was moved into the body of this method because it performs a side effect
if (properties != null) {
Iterator<Map.Entry<String, Object>> iterator = properties.getOptions();
Expand Down Expand Up @@ -273,10 +279,9 @@ private KustoOperationResult executeStreamingIngestImpl(String clusterEndpoint,

// Get the response, and trace the call.
String response = MonitoredActivity.invoke(
(SupplierOneException<String, DataServiceException>) () -> post(request), "ClientImpl.executeStreamingIngest");
(SupplierOneException<String, DataServiceException>) () -> post(request, timeoutMs), "ClientImpl.executeStreamingIngest");

return new KustoOperationResult(response, "v1");

} catch (KustoServiceQueryError e) {
throw new DataClientException(clusterEndpoint, "Error converting json response to KustoOperationResult:" + e.getMessage(), e);
} catch (IOException e) {
Expand Down Expand Up @@ -346,13 +351,39 @@ private InputStream executeStreamingQuery(@NotNull KustoRequest kr) throws DataS
.withTracing(tracing)
.withAuthorization(authorization)
.build();
long timeoutMs = determineTimeout(kr.getProperties(), kr.getCommandType(), clusterUrl);

// Get the response and trace the call
return MonitoredActivity.invoke(
(SupplierOneException<InputStream, DataServiceException>) () -> postToStreamingOutput(request, 0, kr.getProperties().getRedirectCount()),
(SupplierOneException<InputStream, DataServiceException>) () -> postToStreamingOutput(request, timeoutMs, 0,
kr.getProperties().getRedirectCount()),
"ClientImpl.executeStreamingQuery", updateAndGetExecuteTracingAttributes(kr.getDatabase(), kr.getProperties()));
}

private long determineTimeout(ClientRequestProperties properties, CommandType commandType, String clusterUrl) throws DataClientException {
Long timeoutMs;
try {
timeoutMs = properties == null ? null : properties.getTimeoutInMilliSec();
} catch (ParseException e) {
throw new DataClientException(clusterUrl, "Failed to parse timeout from ClientRequestProperties");
}

if (timeoutMs == null) {
switch (commandType) {
case ADMIN_COMMAND:
timeoutMs = COMMAND_TIMEOUT_IN_MILLISECS;
break;
case STREAMING_INGEST:
timeoutMs = STREAMING_INGEST_TIMEOUT_IN_MILLISECS;
break;
default:
timeoutMs = QUERY_TIMEOUT_IN_MILLISECS;
}
}

return timeoutMs;
}

private String getAuthorizationHeaderValue() throws DataServiceException, DataClientException {
if (aadAuthenticationHelper != null) {
return String.format("Bearer %s", aadAuthenticationHelper.acquireAccessToken());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.microsoft.azure.kusto.data.exceptions;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;

public class ExceptionUtils {
public static DataServiceException createExceptionOnPost(Exception e, URL url, String kind) {
boolean permanent = false;
boolean isIO = false;
if (e instanceof IOException) {
ohadbitt marked this conversation as resolved.
Show resolved Hide resolved
isIO = true;
}

if (e instanceof UncheckedIOException) {
e = ((UncheckedIOException) e).getCause();
isIO = true;
}

if (isIO) {
return new IODataServiceException(url.toString(), (IOException) e, kind);

}

return new DataServiceException(url.toString(), String.format("Exception in %s post request: %s", kind, e.getMessage()), permanent);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.microsoft.azure.kusto.data.exceptions;

import com.microsoft.azure.kusto.data.Utils;

import java.io.IOException;

public class IODataServiceException extends DataServiceException {
public IODataServiceException(String ingestionSource, IOException e, String kind) {
super(ingestionSource, String.format("IOException in %s post request: %s", kind, e.getMessage()),
e,
!Utils.isRetriableIOException(e));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public static HttpClient create(HttpClientProperties properties) {

// If all properties are null, create with default client options
if (properties == null) {
options.setResponseTimeout(Duration.ofMinutes(10));
return HttpClient.createDefault(options);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ public class HttpClientProperties {
private final boolean keepAlive;
private final Integer maxKeepAliveTime;
private final Integer maxConnectionTotal;
private final Integer maxConnectionRoute;
private final Class<? extends HttpClientProvider> provider;
private final ProxyOptions proxy;

Expand All @@ -22,7 +21,6 @@ private HttpClientProperties(HttpClientPropertiesBuilder builder) {
this.keepAlive = builder.keepAlive;
this.maxKeepAliveTime = builder.maxKeepAliveTime;
this.maxConnectionTotal = builder.maxConnectionsTotal;
this.maxConnectionRoute = builder.maxConnectionsPerRoute;
this.provider = builder.provider;
this.proxy = builder.proxy;
}
Expand Down Expand Up @@ -92,15 +90,6 @@ public Class<? extends HttpClientProvider> provider() {
return provider;
}

/**
* The maximum number of connections the client may keep open at the same time per route.
*
* @return the maximum number of connections per route
*/
public Integer maxConnectionRoute() {
return maxConnectionRoute;
}

/**
* The proxy to use when connecting to the remote server.
*
Expand All @@ -116,7 +105,6 @@ public static class HttpClientPropertiesBuilder {
private boolean keepAlive;
private Integer maxKeepAliveTime = 120;
private Integer maxConnectionsTotal = 40;
private Integer maxConnectionsPerRoute = 40;
private Duration timeout = Duration.ofMinutes(10);
private Class<? extends HttpClientProvider> provider = null;
private ProxyOptions proxy = null;
Expand Down Expand Up @@ -181,17 +169,6 @@ public HttpClientPropertiesBuilder maxConnectionsTotal(Integer maxConnectionsTot
return this;
}

/**
* Sets the maximum number of connections the client may keep open at the same time for the same route (endpoint).
*
* @param maxConnections the maximum number of connections per route
* @return the builder instance
*/
public HttpClientPropertiesBuilder maxConnectionsPerRoute(Integer maxConnections) {
this.maxConnectionsPerRoute = maxConnections;
return this;
}

/**
* Sets the HTTP Client Provider used by Azure Core when constructing HTTP Client instances.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ public static void setUp() {
.keepAlive(true)
.maxKeepAliveTime(120)
.maxIdleTime(60)
.maxConnectionsPerRoute(50)
.maxConnectionsTotal(50)
.build());
} catch (URISyntaxException ex) {
Expand Down Expand Up @@ -638,7 +637,7 @@ void testSameHttpClientInstance() throws DataClientException, DataServiceExcepti
clientImpl.executeQuery(DB_NAME, query, clientRequestProperties);
clientImpl.executeQuery(DB_NAME, query, clientRequestProperties);

try (HttpResponse httpResponse = Mockito.verify(httpClientSpy, atLeast(2)).sendSync(any(), eq(Context.NONE))) {
try (HttpResponse httpResponse = Mockito.verify(httpClientSpy, atLeast(2)).sendSync(any(), any())) {
}

}
Expand Down
1 change: 0 additions & 1 deletion samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCr
HttpClientProperties properties = HttpClientProperties.builder()
.keepAlive(true)
.maxKeepAliveTime(120)
.maxConnectionsPerRoute(40)
.maxConnectionsTotal(40)
.build();

Expand Down
1 change: 0 additions & 1 deletion samples/src/main/java/AdvancedQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public static void main(String[] args) {
HttpClientProperties properties = HttpClientProperties.builder()
.keepAlive(true)
.maxKeepAliveTime(120)
.maxConnectionsPerRoute(40)
.maxConnectionsTotal(40)
.build();

Expand Down
3 changes: 0 additions & 3 deletions samples/src/main/java/Jmeter/jmeterStressTest.jmx
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCr
HttpClientProperties properties = HttpClientProperties.builder()
.keepAlive(true)
.maxKeepAliveTime(120)
.maxConnectionsPerRoute(40)
.maxConnectionsTotal(40)
.build();

Expand Down Expand Up @@ -203,7 +202,6 @@ ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCr
HttpClientProperties properties = HttpClientProperties.builder()
.keepAlive(true)
.maxKeepAliveTime(120)
.maxConnectionsPerRoute(40)
.maxConnectionsTotal(40)
.build();
log.info(&quot;Start &quot; + testName);
Expand Down Expand Up @@ -371,7 +369,6 @@ Client cslClient = ClientFactory.createClient(engineCsb, (HttpClientProperties)
HttpClientProperties properties = HttpClientProperties.builder()
.keepAlive(true)
.maxKeepAliveTime(120)
.maxConnectionsPerRoute(40)
.maxConnectionsTotal(40)
.build();

Expand Down
1 change: 0 additions & 1 deletion samples/src/main/java/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public static void main(String[] args) {
HttpClientProperties properties = HttpClientProperties.builder()
.keepAlive(true)
.maxKeepAliveTime(120)
.maxConnectionsPerRoute(40)
.maxConnectionsTotal(40)
.build();

Expand Down
Loading