Skip to content

Commit

Permalink
format + changelog
Browse files Browse the repository at this point in the history
  • Loading branch information
ohadbitt committed Oct 31, 2024
1 parent 2c151dd commit 446a19e
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Moved HTTP request building logic into a builder class.
- [BREAKING] Redirects are disabled by default. Use ClientRequestProperties "client_max_redirect_count" option
to enable. Default changed to 0.
- Removed maxConnectionsPerRoute as it was not easily provided by azure-core.

## [5.2.0] - 2024-08-27
### Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ protected String post(HttpRequest request, long timeoutMs) throws DataServiceExc
// Execute and get the response
try (HttpResponse response = httpClient.sendSync(request, getContextTimeout(timeoutMs))) {
return processResponseBody(response);
} catch (DataServiceException e){
} catch (DataServiceException e) {
throw e;
} catch (Exception e) {
throw ExceptionUtils.createExceptionOnPost(e, request.getUrl(), "sync");
Expand Down Expand Up @@ -103,7 +103,7 @@ protected InputStream postToStreamingOutput(HttpRequest request, long timeoutMs,
// Thrown from new CloseParentResourcesStream(httpResponse)
throw new DataServiceException(request.getUrl().toString(),
"postToStreamingOutput failed to get or decompress response stream", ex, false);
} catch (UncheckedIOException e){
} catch (UncheckedIOException e) {
throw ExceptionUtils.createExceptionOnPost(e, request.getUrl(), "streaming sync");
} catch (Exception ex) {
throw createExceptionFromResponse(request.getUrl().toString(), httpResponse, ex, errorFromResponse);
Expand Down Expand Up @@ -153,14 +153,13 @@ public static DataServiceException createExceptionFromResponse(String url, HttpR
isPermanent);
}

private Context getContextTimeout(long timeoutMs){
private Context getContextTimeout(long timeoutMs) {
int requestTimeout = timeoutMs > Integer.MAX_VALUE ? Integer.MAX_VALUE : Math.toIntExact(timeoutMs) + CLIENT_SERVER_DELTA_IN_MILLISECS;
return Context.NONE.addData("azure-response-timeout", Duration.ofMillis(requestTimeout));

}

private static void closeResourcesIfNeeded(boolean returnInputStream, HttpResponse httpResponse) {
// If we close the resources after returning the InputStream to the user, he won't be able to read from it - used in streaming query
// If we close the resources after returning the InputStream to the user, he won't be able to read from it - used in streaming query
if (!returnInputStream) {
if (httpResponse != null) {
httpResponse.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ private void validateEndpoint() throws DataServiceException, DataClientException

@Override
public KustoOperationResult executeStreamingIngest(String database, String table, InputStream stream, ClientRequestProperties properties,
String streamFormat, String mappingName, boolean leaveOpen)
String streamFormat, String mappingName, boolean leaveOpen)
throws DataServiceException, DataClientException {
if (stream == null) {
throw new IllegalArgumentException("The provided stream is null.");
Expand All @@ -210,7 +210,7 @@ public KustoOperationResult executeStreamingIngest(String database, String table

@Override
public KustoOperationResult executeStreamingIngestFromBlob(String database, String table, String blobUrl, ClientRequestProperties properties,
String dataFormat, String mappingName)
String dataFormat, String mappingName)
throws DataServiceException, DataClientException {
if (blobUrl == null) {
throw new IllegalArgumentException("The provided blobUrl is null.");
Expand All @@ -222,7 +222,7 @@ public KustoOperationResult executeStreamingIngestFromBlob(String database, Stri
}

private KustoOperationResult executeStreamingIngestImpl(String clusterEndpoint, InputStream stream, String blobUrl, ClientRequestProperties properties,
boolean leaveOpen) throws DataServiceException, DataClientException {
boolean leaveOpen) throws DataServiceException, DataClientException {
boolean isStreamSource = stream != null;

Map<String, String> headers = new HashMap<>();
Expand All @@ -233,7 +233,6 @@ private KustoOperationResult executeStreamingIngestImpl(String clusterEndpoint,
contentEncoding = "gzip";
}


Long timeoutMs = determineTimeout(properties, CommandType.STREAMING_INGEST, clusterUrl);

// This was a separate method but was moved into the body of this method because it performs a side effect
Expand Down Expand Up @@ -281,7 +280,7 @@ private KustoOperationResult executeStreamingIngestImpl(String clusterEndpoint,

// Get the response, and trace the call.
String response = MonitoredActivity.invoke(
(SupplierOneException<String, DataServiceException>) () -> post(request, timeoutMs ), "ClientImpl.executeStreamingIngest");
(SupplierOneException<String, DataServiceException>) () -> post(request, timeoutMs), "ClientImpl.executeStreamingIngest");

return new KustoOperationResult(response, "v1");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
public class ClientRequestProperties implements Serializable, TraceableAttributes {
public static final String OPTION_SERVER_TIMEOUT = "servertimeout";

// If set and positive, indicates the maximum number of HTTP redirects that the client will process. [Integer]
// If set and positive, indicates the maximum number of HTTP redirects that the client will process. [Integer]
public static final String OPTION_CLIENT_MAX_REDIRECT_COUNT = "client_max_redirect_count";
/*
* Matches valid Kusto Timespans: Optionally negative, optional number of days followed by a period, optionally up to 24 as hours followed by a colon,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ public static DataServiceException createExceptionOnPost(Exception e, URL url, S
boolean permanent = false;
String prefix = "";
if (e instanceof IOException) {
permanent = !Utils.isRetriableIOException((IOException) e);
permanent = !Utils.isRetriableIOException((IOException) e);
prefix = "IO";
}

if (e instanceof UncheckedIOException) {
e = ((UncheckedIOException) e).getCause();
permanent = !Utils.isRetriableIOException((IOException) e);
permanent = !Utils.isRetriableIOException((IOException) e);
prefix = "IO";
}

Expand Down

0 comments on commit 446a19e

Please sign in to comment.