From 46a3e92f63358253350fed9d2a30a10b8728b83a Mon Sep 17 00:00:00 2001 From: George Banasios Date: Wed, 4 Dec 2024 09:50:15 +0200 Subject: [PATCH] improves cloud info retrieval & kusto formatting --- .../azure/kusto/data/BaseClient.java | 19 +- .../azure/kusto/data/ClientImpl.java | 37 ++- .../azure/kusto/data/ExponentialRetry.java | 42 ++-- .../azure/kusto/data/StreamingClient.java | 14 +- .../com/microsoft/azure/kusto/data/Utils.java | 13 +- .../auth/ApplicationKeyTokenProvider.java | 2 +- .../azure/kusto/data/auth/CloudInfo.java | 140 +++++------ .../data/auth/ConnectionStringBuilder.java | 2 +- .../kusto/data/auth/HttpClientWrapper.java | 2 +- .../kusto/data/auth/TokenProviderFactory.java | 2 +- .../auth/endpoints/KustoTrustedEndpoints.java | 1 + .../kusto/data/exceptions/ExceptionUtils.java | 3 +- .../instrumentation/MonitoredActivity.java | 4 +- .../azure/kusto/data/HeaderTest.java | 1 - .../ingest/ManagedStreamingIngestClient.java | 134 +++++----- .../kusto/ingest/QueuedIngestClientImpl.java | 231 +++++++++--------- .../kusto/ingest/ResourceAlgorithms.java | 45 ++-- .../kusto/ingest/StreamingIngestClient.java | 139 ++++++----- .../microsoft/azure/kusto/ingest/E2ETest.java | 28 +-- .../kusto/ingest/QueuedIngestClientTest.java | 2 +- .../ingest/StreamingIngestClientTest.java | 53 ++-- .../azure/kusto/quickstart/Utils.java | 6 +- 22 files changed, 452 insertions(+), 468 deletions(-) diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/BaseClient.java b/data/src/main/java/com/microsoft/azure/kusto/data/BaseClient.java index 3a95d1d1..7ae60131 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/BaseClient.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/BaseClient.java @@ -50,12 +50,10 @@ public BaseClient(HttpClient httpClient) { protected Mono postAsync(HttpRequest request, long timeoutMs) { return httpClient.send(request, getContextTimeout(timeoutMs)) - .flatMap(response -> - Mono.using( - () -> response, - this::processResponseBodyAsync, - HttpResponse::close - )) + .flatMap(response -> Mono.using( + () -> response, + this::processResponseBodyAsync, + HttpResponse::close)) .onErrorMap(e -> { if (e instanceof DataServiceException) { return e; @@ -82,19 +80,20 @@ public Mono processResponseBodyAsync(HttpResponse response) { } protected Mono postToStreamingOutputAsync(HttpRequest request, long timeoutMs, - int currentRedirectCounter, int maxRedirectCount) { + int currentRedirectCounter, int maxRedirectCount) { ResponseState state = new ResponseState(); return httpClient.send(request, getContextTimeout(timeoutMs)) .flatMap(httpResponse -> processHttpResponse(httpResponse, state, request, timeoutMs, currentRedirectCounter, maxRedirectCount)) .onErrorMap(IOException.class, e -> new DataServiceException(request.getUrl().toString(), "postToStreamingOutput failed to get or decompress response stream", e, false)) .onErrorMap(UncheckedIOException.class, e -> ExceptionUtils.createExceptionOnPost(e, request.getUrl(), "streaming async")) - .onErrorMap(Exception.class, e -> createExceptionFromResponse(request.getUrl().toString(), state.getHttpResponse(), e, state.getErrorFromResponse())) + .onErrorMap(Exception.class, + e -> createExceptionFromResponse(request.getUrl().toString(), state.getHttpResponse(), e, state.getErrorFromResponse())) .doFinally(ignored -> closeResourcesIfNeeded(state.isReturnInputStream(), state.getHttpResponse())); } private Mono processHttpResponse(HttpResponse httpResponse, ResponseState state, HttpRequest request, - long timeoutMs, int currentRedirectCounter, int maxRedirectCount) { + long timeoutMs, int currentRedirectCounter, int maxRedirectCount) { try { state.setHttpResponse(httpResponse); int responseStatusCode = httpResponse.getStatusCode(); @@ -122,7 +121,7 @@ private static Mono handleSuccessfulResponse(HttpResponse httpRespo } private Mono handleErrorResponse(HttpResponse httpResponse, ResponseState state, HttpRequest request, - long timeoutMs, int currentRedirectCounter, int maxRedirectCount) { + long timeoutMs, int currentRedirectCounter, int maxRedirectCount) { return httpResponse.getBodyAsByteArray() .flatMap(bytes -> { try { diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java b/data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java index 76d068e2..31ad9336 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java @@ -188,8 +188,7 @@ Mono prepareRequestAsync(@NotNull KustoRequest kr) { HttpTracing tracing = buildTracing(kr); return validateEndpointAsync() - .then(authorization.flatMap(token -> - buildKustoRequestContext(kr, clusterEndpoint, tracing, token)) + .then(authorization.flatMap(token -> buildKustoRequestContext(kr, clusterEndpoint, tracing, token)) .switchIfEmpty(buildKustoRequestContext(kr, clusterEndpoint, tracing, null))); } @@ -236,26 +235,26 @@ private Mono validateEndpointAsync() { if (endpointValidated) { return Mono.empty(); } + return CloudInfo.retrieveCloudInfoForClusterAsync(clusterUrl, null) .map(CloudInfo::getLoginEndpoint) .flatMap(loginEndpoint -> Mono.fromCallable(() -> { - KustoTrustedEndpoints.validateTrustedEndpoint(clusterUrl, loginEndpoint); - return true; - } - )) + KustoTrustedEndpoints.validateTrustedEndpoint(clusterUrl, loginEndpoint); + return true; + })) .doOnSuccess(ignored -> endpointValidated = true) .then(); } @Override public KustoOperationResult executeStreamingIngest(String database, String table, InputStream stream, ClientRequestProperties properties, - String streamFormat, String mappingName, boolean leaveOpen) { + String streamFormat, String mappingName, boolean leaveOpen) { return executeStreamingIngestAsync(database, table, stream, properties, streamFormat, mappingName, leaveOpen).block(); } @Override public Mono executeStreamingIngestAsync(String database, String table, InputStream stream, ClientRequestProperties properties, - String streamFormat, String mappingName, boolean leaveOpen) { + String streamFormat, String mappingName, boolean leaveOpen) { if (stream == null) { return Mono.error(new IllegalArgumentException("The provided stream is null.")); } @@ -268,13 +267,13 @@ public Mono executeStreamingIngestAsync(String database, S @Override public KustoOperationResult executeStreamingIngestFromBlob(String database, String table, String blobUrl, ClientRequestProperties properties, - String dataFormat, String mappingName) { + String dataFormat, String mappingName) { return executeStreamingIngestFromBlobAsync(database, table, blobUrl, properties, dataFormat, mappingName).block(); } @Override public Mono executeStreamingIngestFromBlobAsync(String database, String table, String blobUrl, ClientRequestProperties properties, - String dataFormat, String mappingName) { + String dataFormat, String mappingName) { if (blobUrl == null) { return Mono.error(new IllegalArgumentException("The provided blobUrl is null.")); } @@ -287,22 +286,21 @@ public Mono executeStreamingIngestFromBlobAsync(String dat } private Mono executeStreamingIngestImplAsync(String clusterEndpoint, InputStream stream, String blobUrl, - ClientRequestProperties properties, boolean leaveOpen) { + ClientRequestProperties properties, boolean leaveOpen) { return validateEndpointAsync() .then(getAuthorizationHeaderValueAsync() - .flatMap(token -> - executeStreamingIngest(clusterEndpoint, stream, blobUrl, properties, leaveOpen, token)) + .flatMap(token -> executeStreamingIngest(clusterEndpoint, stream, blobUrl, properties, leaveOpen, token)) .switchIfEmpty(executeStreamingIngest(clusterEndpoint, stream, blobUrl, properties, leaveOpen, null))); } private Mono executeStreamingIngest(String clusterEndpoint, InputStream stream, String blobUrl, - ClientRequestProperties properties, boolean leaveOpen, String authorizationToken) { + ClientRequestProperties properties, boolean leaveOpen, String authorizationToken) { boolean isStreamSource = stream != null; Map headers = new HashMap<>(); String contentEncoding = isStreamSource ? "gzip" : null; String contentType = isStreamSource ? "application/octet-stream" : "application/json"; - return Mono.fromCallable(() -> determineTimeout(properties, CommandType.STREAMING_INGEST, clusterUrl)) // Step 1: Determine timeout + return Mono.fromCallable(() -> determineTimeout(properties, CommandType.STREAMING_INGEST, clusterUrl)) // Step 1: Determine timeout .flatMap(timeoutMs -> { // This was a separate method but was moved into the body of this method because it performs a side effect @@ -341,9 +339,10 @@ private Mono executeStreamingIngest(String clusterEndpoint .withBody(data) .build(); }).flatMap(httpRequest -> MonitoredActivity.wrap(postAsync(httpRequest, timeoutMs), "ClientImpl.executeStreamingIngest") - .flatMap(response -> - Mono.fromCallable(() -> new KustoOperationResult(response, "v1"))) - .onErrorMap(KustoServiceQueryError.class, e -> new DataClientException(clusterEndpoint, "Error converting json response to KustoOperationResult:" + e.getMessage(), e)) + .flatMap(response -> Mono.fromCallable(() -> new KustoOperationResult(response, "v1"))) + .onErrorMap(KustoServiceQueryError.class, + e -> new DataClientException(clusterEndpoint, "Error converting json response to KustoOperationResult:" + e.getMessage(), + e)) .onErrorMap(IOException.class, e -> new DataClientException(clusterUrl, e.getMessage(), e))); }) .doFinally(signalType -> { @@ -420,7 +419,7 @@ private Mono executeStreamingQueryAsync(@NotNull KustoRequest kr) { } private Mono executeStreamingQuery(String clusterEndpoint, KustoRequest kr, - HttpTracing tracing, String authorizationToken) { + HttpTracing tracing, String authorizationToken) { try { HttpRequest request = HttpRequestBuilder .newPost(clusterEndpoint) diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/ExponentialRetry.java b/data/src/main/java/com/microsoft/azure/kusto/data/ExponentialRetry.java index 82c40aa0..4d385c32 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/ExponentialRetry.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/ExponentialRetry.java @@ -35,34 +35,32 @@ public ExponentialRetry(ExponentialRetry other) { } public Retry retry() { - return Retry.from(retrySignals -> - retrySignals.flatMap(retrySignal -> { + return Retry.from(retrySignals -> retrySignals.flatMap(retrySignal -> { - Retry.RetrySignal signalCopy = retrySignal.copy(); - long currentAttempt = signalCopy.totalRetries(); - log.info("Retry attempt {}.", currentAttempt); + Retry.RetrySignal signalCopy = retrySignal.copy(); + long currentAttempt = signalCopy.totalRetries(); + log.info("Retry attempt {}.", currentAttempt); - Throwable failure = signalCopy.failure(); - if (failure instanceof DataServiceException && ((DataServiceException) failure).isPermanent()) { - log.error("Error is permanent, stopping.", failure); - return Mono.error(failure); - } + Throwable failure = signalCopy.failure(); + if (failure instanceof DataServiceException && ((DataServiceException) failure).isPermanent()) { + log.error("Error is permanent, stopping.", failure); + return Mono.error(failure); + } - if (currentAttempt >= maxAttempts) { - log.info("Max retry attempts reached: {}.", currentAttempt); - return Mono.error(failure); - } + if (currentAttempt >= maxAttempts) { + log.info("Max retry attempts reached: {}.", currentAttempt); + return Mono.error(failure); + } - double currentSleepSecs = sleepBaseSecs * (float) Math.pow(2, currentAttempt); - double jitterSecs = (float) Math.random() * maxJitterSecs; - double sleepMs = (currentSleepSecs + jitterSecs) * 1000; + double currentSleepSecs = sleepBaseSecs * (float) Math.pow(2, currentAttempt); + double jitterSecs = (float) Math.random() * maxJitterSecs; + double sleepMs = (currentSleepSecs + jitterSecs) * 1000; - log.info("Attempt {} failed, trying again after sleep of {} seconds.", currentAttempt, sleepMs / 1000); + log.info("Attempt {} failed, trying again after sleep of {} seconds.", currentAttempt, sleepMs / 1000); - // Each retry can occur on a different thread - return Mono.delay(Duration.ofMillis((long) sleepMs)); - }) - ); + // Each retry can occur on a different thread + return Mono.delay(Duration.ofMillis((long) sleepMs)); + })); } } diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/StreamingClient.java b/data/src/main/java/com/microsoft/azure/kusto/data/StreamingClient.java index f34091ea..04cd35f1 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/StreamingClient.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/StreamingClient.java @@ -24,10 +24,11 @@ public interface StreamingClient { * @return {@link KustoOperationResult} object including the ingestion result */ KustoOperationResult executeStreamingIngest(String database, String table, InputStream stream, ClientRequestProperties properties, String streamFormat, - String mappingName, boolean leaveOpen); + String mappingName, boolean leaveOpen); - Mono executeStreamingIngestAsync(String database, String table, InputStream stream, ClientRequestProperties properties, String streamFormat, - String mappingName, boolean leaveOpen); + Mono executeStreamingIngestAsync(String database, String table, InputStream stream, ClientRequestProperties properties, + String streamFormat, + String mappingName, boolean leaveOpen); /** *

Query directly from Kusto database using streaming output.

@@ -46,7 +47,7 @@ Mono executeStreamingIngestAsync(String database, String t InputStream executeStreamingQuery(String command); KustoOperationResult executeStreamingIngestFromBlob(String databaseName, String tableName, String blobUrl, ClientRequestProperties clientRequestProperties, - String dataFormat, String ingestionMappingReference); + String dataFormat, String ingestionMappingReference); Mono executeStreamingQueryAsync(String command); @@ -54,7 +55,8 @@ KustoOperationResult executeStreamingIngestFromBlob(String databaseName, String Mono executeStreamingQueryAsync(String database, String command, ClientRequestProperties properties); - Mono executeStreamingIngestFromBlobAsync(String databaseName, String tableName, String blobUrl, ClientRequestProperties clientRequestProperties, - String dataFormat, String ingestionMappingReference); + Mono executeStreamingIngestFromBlobAsync(String databaseName, String tableName, String blobUrl, + ClientRequestProperties clientRequestProperties, + String dataFormat, String ingestionMappingReference); } diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/Utils.java b/data/src/main/java/com/microsoft/azure/kusto/data/Utils.java index 3952062c..cdd1371c 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/Utils.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/Utils.java @@ -51,7 +51,7 @@ public class Utils { public static ObjectMapper getObjectMapper() { return JsonMapper.builder().configure(MapperFeature.PROPAGATE_TRANSIENT_MARKER, true).addModule(new JavaTimeModule()).build().configure( DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true).configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, true).setNodeFactory( - JsonNodeFactory.withExactBigDecimals(true)); + JsonNodeFactory.withExactBigDecimals(true)); } private static final HashSet> nonRetriableClasses = new HashSet>() { @@ -150,12 +150,11 @@ private static Mono processGzipBody(Flux body) { return new ByteArrayInputStream(bytes); }) .collectList() - .flatMap(inputStreams -> - Mono.fromCallable(() -> { - try (GZIPInputStream gzipStream = new GZIPInputStream(new SequenceInputStream(Collections.enumeration(inputStreams)))) { - return readStreamToString(gzipStream); - } - }).subscribeOn(Schedulers.boundedElastic()) //TODO: same as ingest module + .flatMap(inputStreams -> Mono.fromCallable(() -> { + try (GZIPInputStream gzipStream = new GZIPInputStream(new SequenceInputStream(Collections.enumeration(inputStreams)))) { + return readStreamToString(gzipStream); + } + }).subscribeOn(Schedulers.boundedElastic()) // TODO: same as ingest module ); } diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/auth/ApplicationKeyTokenProvider.java b/data/src/main/java/com/microsoft/azure/kusto/data/auth/ApplicationKeyTokenProvider.java index 257e73b0..155405c5 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/auth/ApplicationKeyTokenProvider.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/auth/ApplicationKeyTokenProvider.java @@ -24,7 +24,7 @@ public ApplicationKeyTokenProvider(@NotNull String clusterUrl, String clientId, @Override protected TokenCredential createTokenCredential(CredentialBuilderBase builder) { - return ((ClientSecretCredentialBuilder)builder) + return ((ClientSecretCredentialBuilder) builder) .clientSecret(clientSecret) .build(); } diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/auth/CloudInfo.java b/data/src/main/java/com/microsoft/azure/kusto/data/auth/CloudInfo.java index 06a77a2a..53380281 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/auth/CloudInfo.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/auth/CloudInfo.java @@ -1,5 +1,18 @@ package com.microsoft.azure.kusto.data.auth; +import java.io.Closeable; +import java.io.IOException; +import java.io.Serializable; +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.lang3.StringUtils; +import org.jetbrains.annotations.Nullable; + import com.azure.core.http.HttpClient; import com.azure.core.http.HttpHeaderName; import com.azure.core.http.HttpMethod; @@ -15,24 +28,14 @@ import com.microsoft.azure.kusto.data.exceptions.ExceptionUtils; import com.microsoft.azure.kusto.data.http.HttpClientFactory; import com.microsoft.azure.kusto.data.http.HttpStatus; +import com.microsoft.azure.kusto.data.instrumentation.MonitoredActivity; import com.microsoft.azure.kusto.data.instrumentation.TraceableAttributes; import com.microsoft.azure.kusto.data.req.RequestUtils; -import org.apache.commons.lang3.StringUtils; -import org.jetbrains.annotations.Nullable; -import reactor.core.publisher.Mono; -import java.io.Closeable; -import java.io.IOException; -import java.io.Serializable; -import java.net.URISyntaxException; -import java.time.Duration; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; +import reactor.core.publisher.Mono; public class CloudInfo implements TraceableAttributes, Serializable { - private static final Map cache = new ConcurrentHashMap<>(); //TODO: is this correct? + private static final Map cache = new ConcurrentHashMap<>(); // TODO: is this correct? public static final String METADATA_ENDPOINT = "v1/rest/auth/metadata"; public static final String DEFAULT_KUSTO_CLIENT_APP_ID = "db662dc1-0cfe-4e1c-a843-19a68e65be58"; @@ -65,7 +68,7 @@ public class CloudInfo implements TraceableAttributes, Serializable { private static final ExponentialRetry exponentialRetryTemplate = new ExponentialRetry(ATTEMPT_COUNT); public CloudInfo(boolean loginMfaRequired, String loginEndpoint, String kustoClientAppId, String kustoClientRedirectUri, String kustoServiceResourceId, - String firstPartyAuthorityUrl) { + String firstPartyAuthorityUrl) { this.loginMfaRequired = loginMfaRequired; this.loginEndpoint = loginEndpoint; this.kustoClientAppId = kustoClientAppId; @@ -85,57 +88,46 @@ public static CloudInfo retrieveCloudInfoForCluster(String clusterUrl) { } public static Mono retrieveCloudInfoForClusterAsync(String clusterUrl, @Nullable HttpClient givenHttpClient) { - CloudInfo cachedCloudInfo; + return getCachedCloudInfo(clusterUrl) + .switchIfEmpty( + fetchCloudInfoAsync(clusterUrl, givenHttpClient) + .retryWhen(new ExponentialRetry(exponentialRetryTemplate).retry()) + .doOnNext(cloudInfo -> cache.put(clusterUrl, cloudInfo))) + .onErrorMap(e -> ExceptionUtils.unwrapCloudInfoException(clusterUrl, e)); + } + + private static Mono getCachedCloudInfo(String clusterUrl) { try { - cachedCloudInfo = cache.get(UriUtils.setPathForUri(clusterUrl, "")); + return Mono.justOrEmpty(cache.get(UriUtils.setPathForUri(clusterUrl, ""))); } catch (URISyntaxException ex) { return Mono.error(new DataServiceException(clusterUrl, "Error in metadata endpoint, cluster uri invalid", ex, true)); } - if (cachedCloudInfo != null) { - return Mono.just(cachedCloudInfo); - } - - return fetchCloudInfoAsync(clusterUrl, givenHttpClient) - .retryWhen(new ExponentialRetry(exponentialRetryTemplate).retry()) - .doOnNext(cloudInfo -> cache.put(clusterUrl, cloudInfo)) - .onErrorMap(e -> ExceptionUtils.unwrapCloudInfoException(clusterUrl, e)); } private static Mono fetchCloudInfoAsync(String clusterUrl, @Nullable HttpClient givenHttpClient) { HttpClient localHttpClient = givenHttpClient == null ? HttpClientFactory.create(null) : givenHttpClient; return Mono.using( - () -> localHttpClient, - client -> { - try { - HttpRequest request = new HttpRequest(HttpMethod.GET, UriUtils.appendPathToUri(clusterUrl, METADATA_ENDPOINT)); - request.setHeader(HttpHeaderName.ACCEPT_ENCODING, "gzip,deflate"); - request.setHeader(HttpHeaderName.ACCEPT, "application/json"); - - return localHttpClient.send(request, RequestUtils.contextWithTimeout(CLOUD_INFO_TIMEOUT)) - .flatMap(response -> getCloudInfo(response, clusterUrl)) - .onErrorMap(e -> ExceptionUtils.unwrapCloudInfoException(clusterUrl, e)); - } catch (URISyntaxException e) { - return Mono.error(new DataServiceException(clusterUrl, - "URISyntaxException when trying to retrieve cluster metadata:" + e.getMessage(), e, true)); - } catch (Exception e) { - return Mono.error(new DataServiceException(clusterUrl, "Error while retrieving the cluster metadata: " + e, e, true)); - } - }, - client -> { - if (givenHttpClient == null && client instanceof Closeable) { - try { - ((Closeable) client).close(); - } catch (IOException ex) { - throw new RuntimeException("Error closing HttpClient while retrieving the cluster metadata", ex); - } - } - }) - .onErrorMap(e -> { - if (e instanceof RuntimeException) { - return new DataServiceException(clusterUrl, "Failed during resource cleanup: " + e.getCause(), (Exception) e, false); - } - return e; - }); + () -> localHttpClient, + client -> fetchCloudInfo(localHttpClient, clusterUrl), + client -> closeHttpClient(localHttpClient, givenHttpClient)) + .onErrorMap(e -> ExceptionUtils.unwrapCloudInfoException(clusterUrl, e)); + } + + private static Mono fetchCloudInfo(HttpClient localHttpClient, String clusterUrl) { + return sendRequest(localHttpClient, clusterUrl) + .flatMap(response -> getCloudInfo(response, clusterUrl)) + .onErrorMap(e -> ExceptionUtils.unwrapCloudInfoException(clusterUrl, e)); + } + + private static Mono sendRequest(HttpClient localHttpClient, String clusterUrl) { + return Mono.fromCallable(() -> { + HttpRequest request = new HttpRequest(HttpMethod.GET, UriUtils.appendPathToUri(clusterUrl, METADATA_ENDPOINT)); + request.setHeader(HttpHeaderName.ACCEPT_ENCODING, "gzip,deflate"); + request.setHeader(HttpHeaderName.ACCEPT, "application/json"); + return request; + }) + .flatMap(httpRequest -> MonitoredActivity.wrap(localHttpClient.send(httpRequest, RequestUtils.contextWithTimeout(CLOUD_INFO_TIMEOUT)), + "CloudInfo.httpCall")); } private static Mono getCloudInfo(HttpResponse response, String clusterUrl) { @@ -143,18 +135,7 @@ private static Mono getCloudInfo(HttpResponse response, String cluste return response.getBodyAsByteArray() .flatMap(bodyAsBinaryData -> { if (statusCode == HttpStatus.OK) { - String content; - try { - content = Utils.getContentAsString(response, bodyAsBinaryData); - if (content.isEmpty() || content.equals("{}")) { - return Mono.error(new DataServiceException(clusterUrl, - "Error in metadata endpoint, received no data", - true)); - } - return Mono.justOrEmpty(parseCloudInfo(content)); - } catch (Exception e) { - return Mono.error(e); - } + return parseCloudInfoSafely(response, bodyAsBinaryData, clusterUrl); } else if (statusCode == HttpStatus.NOT_FOUND) { return Mono.just(DEFAULT_CLOUD); } else { @@ -169,6 +150,29 @@ private static Mono getCloudInfo(HttpResponse response, String cluste }); } + private static Mono parseCloudInfoSafely(HttpResponse response, byte[] bodyAsBinaryData, String clusterUrl) { + try { + String content = Utils.getContentAsString(response, bodyAsBinaryData); + if (content.isEmpty() || content.equals("{}")) { + return Mono.error(new DataServiceException(clusterUrl, "Error in metadata endpoint, received no data", true)); + } + + return Mono.just(parseCloudInfo(content)); + } catch (Exception e) { + return Mono.error(e); + } + } + + private static void closeHttpClient(HttpClient localHttpClient, @Nullable HttpClient givenHttpClient) { + if (givenHttpClient == null && localHttpClient instanceof Closeable) { + try { + ((Closeable) localHttpClient).close(); + } catch (IOException ex) { + throw new RuntimeException("Error closing HttpClient while retrieving the cluster metadata", ex); // TODO: fix this + } + } + } + private static CloudInfo parseCloudInfo(String content) throws JsonProcessingException { ObjectMapper objectMapper = Utils.getObjectMapper(); JsonNode jsonObject = objectMapper.readTree(content); diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/auth/ConnectionStringBuilder.java b/data/src/main/java/com/microsoft/azure/kusto/data/auth/ConnectionStringBuilder.java index 7cf18a68..4b3f6940 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/auth/ConnectionStringBuilder.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/auth/ConnectionStringBuilder.java @@ -529,7 +529,7 @@ public static ConnectionStringBuilder createWithAzureCli(String clusterUrl) { return csb; } - public static ConnectionStringBuilder createWithTokenCredential(@NotNull String clusterUrl, @Nullable TokenCredential tokenCredential) { + public static ConnectionStringBuilder createWithTokenCredential(@NotNull String clusterUrl, @Nullable TokenCredential tokenCredential) { if (StringUtils.isEmpty(clusterUrl)) { throw new IllegalArgumentException("clusterUrl cannot be null or empty"); } diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/auth/HttpClientWrapper.java b/data/src/main/java/com/microsoft/azure/kusto/data/auth/HttpClientWrapper.java index dcb58e5a..46f1b83a 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/auth/HttpClientWrapper.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/auth/HttpClientWrapper.java @@ -21,7 +21,7 @@ public HttpClientWrapper(HttpClient httpClient) { // Implementation of the synchronous HttpClient @Override - public IHttpResponse send(com.microsoft.aad.msal4j.HttpRequest httpRequest) { //TODO: ? + public IHttpResponse send(com.microsoft.aad.msal4j.HttpRequest httpRequest) { // TODO: ? HttpMethod method; switch (httpRequest.httpMethod()) { case GET: diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/auth/TokenProviderFactory.java b/data/src/main/java/com/microsoft/azure/kusto/data/auth/TokenProviderFactory.java index d7e0f0c4..6696f848 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/auth/TokenProviderFactory.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/auth/TokenProviderFactory.java @@ -45,7 +45,7 @@ public static TokenProviderBase createTokenProvider(@NotNull ConnectionStringBui } else if (asyncTokenProvider != null) { return new AsyncCallbackTokenProvider(clusterUrl, asyncTokenProvider); } else if (csb.getCustomTokenCredential() != null) { - return new TokenCredentialProvider(clusterUrl, csb.getCustomTokenCredential()); + return new TokenCredentialProvider(clusterUrl, csb.getCustomTokenCredential()); } else if (csb.isUseDeviceCodeAuth()) { return new DeviceAuthTokenProvider(clusterUrl, authorityId, httpClient); } else if (csb.isUseManagedIdentityAuth()) { diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/auth/endpoints/KustoTrustedEndpoints.java b/data/src/main/java/com/microsoft/azure/kusto/data/auth/endpoints/KustoTrustedEndpoints.java index b3b9f90d..7bc28b2a 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/auth/endpoints/KustoTrustedEndpoints.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/auth/endpoints/KustoTrustedEndpoints.java @@ -74,6 +74,7 @@ public static void validateTrustedEndpoint(String uri, String loginEndpoint) thr */ public static void validateTrustedEndpoint(String uri) throws KustoClientInvalidConnectionStringException { try { + // TODO: if this method will be used, replace the sync retrieveCloudInfoForCluster with the async one validateTrustedEndpoint(new URI(uri), CloudInfo.retrieveCloudInfoForCluster(uri).getLoginEndpoint()); } catch (URISyntaxException ex) { throw new KustoClientInvalidConnectionStringException(ex); diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/exceptions/ExceptionUtils.java b/data/src/main/java/com/microsoft/azure/kusto/data/exceptions/ExceptionUtils.java index 515ec8ef..498c3a7d 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/exceptions/ExceptionUtils.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/exceptions/ExceptionUtils.java @@ -30,7 +30,8 @@ public static DataServiceException createExceptionOnPost(Exception e, URL url, S public static Exception unwrapCloudInfoException(String clusterUrl, Throwable throwable) { if (throwable instanceof URISyntaxException) { - return new DataServiceException(clusterUrl, "URISyntaxException when trying to retrieve cluster metadata:" + throwable.getMessage(), (URISyntaxException) throwable, true); + return new DataServiceException(clusterUrl, "URISyntaxException when trying to retrieve cluster metadata:" + throwable.getMessage(), + (URISyntaxException) throwable, true); } if (throwable instanceof IOException) { diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/instrumentation/MonitoredActivity.java b/data/src/main/java/com/microsoft/azure/kusto/data/instrumentation/MonitoredActivity.java index a3f0a4c2..4adfc72b 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/instrumentation/MonitoredActivity.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/instrumentation/MonitoredActivity.java @@ -41,8 +41,8 @@ public static Mono wrap(Mono mono, String nameOfSpan, Map Mono invokeAsync(FunctionOneException, Tracer.Span, U> function, - String nameOfSpan, - Map attributes) { + String nameOfSpan, + Map attributes) { return Mono.defer(() -> { Tracer.Span span = Tracer.startSpan(nameOfSpan, attributes); try { diff --git a/data/src/test/java/com/microsoft/azure/kusto/data/HeaderTest.java b/data/src/test/java/com/microsoft/azure/kusto/data/HeaderTest.java index 72ec5aed..80b217ed 100644 --- a/data/src/test/java/com/microsoft/azure/kusto/data/HeaderTest.java +++ b/data/src/test/java/com/microsoft/azure/kusto/data/HeaderTest.java @@ -226,7 +226,6 @@ public void testHttpRequestNoAuth() throws DataClientException { } } - private Map extractHeadersFromAzureRequest(HttpRequest request) { Map uncomplicatedHeaders = new HashMap<>(); HttpHeaders headers = request.getHeaders(); diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ManagedStreamingIngestClient.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ManagedStreamingIngestClient.java index 8f3a087c..32c0bbab 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ManagedStreamingIngestClient.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ManagedStreamingIngestClient.java @@ -86,7 +86,7 @@ public static ManagedStreamingIngestClient fromDmConnectionString(ConnectionStri * For advanced usage, use {@link ManagedStreamingIngestClient#ManagedStreamingIngestClient(ConnectionStringBuilder, ConnectionStringBuilder)} */ public static ManagedStreamingIngestClient fromDmConnectionString(ConnectionStringBuilder dmConnectionString, - @Nullable HttpClientProperties properties) + @Nullable HttpClientProperties properties) throws URISyntaxException { ConnectionStringBuilder engineConnectionString = new ConnectionStringBuilder(dmConnectionString); engineConnectionString.setClusterUrl(IngestClientBase.getQueryEndpoint(engineConnectionString.getClusterUrl())); @@ -117,7 +117,7 @@ public static ManagedStreamingIngestClient fromEngineConnectionString(Connection * For advanced usage, use {@link ManagedStreamingIngestClient#ManagedStreamingIngestClient(ConnectionStringBuilder, ConnectionStringBuilder)} */ public static ManagedStreamingIngestClient fromEngineConnectionString(ConnectionStringBuilder engineConnectionString, - @Nullable HttpClientProperties properties) + @Nullable HttpClientProperties properties) throws URISyntaxException { ConnectionStringBuilder dmConnectionString = new ConnectionStringBuilder(engineConnectionString); dmConnectionString.setClusterUrl(IngestClientBase.getIngestionEndpoint(engineConnectionString.getClusterUrl())); @@ -133,7 +133,7 @@ public static ManagedStreamingIngestClient fromEngineConnectionString(Connection * instead. */ public ManagedStreamingIngestClient(ConnectionStringBuilder ingestionEndpointConnectionStringBuilder, - ConnectionStringBuilder queryEndpointConnectionStringBuilder) throws URISyntaxException { + ConnectionStringBuilder queryEndpointConnectionStringBuilder) throws URISyntaxException { this(ingestionEndpointConnectionStringBuilder, queryEndpointConnectionStringBuilder, null); } @@ -144,7 +144,7 @@ public ManagedStreamingIngestClient(ConnectionStringBuilder ingestionEndpointCon * @throws URISyntaxException if the connection string is invalid */ public ManagedStreamingIngestClient(ConnectionStringBuilder ingestionEndpointConnectionStringBuilder, - ConnectionStringBuilder queryEndpointConnectionStringBuilder, boolean autoCorrectEndpoint) throws URISyntaxException { + ConnectionStringBuilder queryEndpointConnectionStringBuilder, boolean autoCorrectEndpoint) throws URISyntaxException { this(ingestionEndpointConnectionStringBuilder, queryEndpointConnectionStringBuilder, null, autoCorrectEndpoint); } @@ -159,8 +159,8 @@ public ManagedStreamingIngestClient(ConnectionStringBuilder ingestionEndpointCon * {@link #ManagedStreamingIngestClient(ConnectionStringBuilder, HttpClientProperties)})} instead. */ public ManagedStreamingIngestClient(ConnectionStringBuilder ingestionEndpointConnectionStringBuilder, - ConnectionStringBuilder queryEndpointConnectionStringBuilder, - @Nullable HttpClientProperties properties, boolean autoCorrectEndpoint) throws URISyntaxException { + ConnectionStringBuilder queryEndpointConnectionStringBuilder, + @Nullable HttpClientProperties properties, boolean autoCorrectEndpoint) throws URISyntaxException { log.info("Creating a new ManagedStreamingIngestClient from connection strings"); queuedIngestClient = new QueuedIngestClientImpl(ingestionEndpointConnectionStringBuilder, properties, autoCorrectEndpoint); streamingIngestClient = new StreamingIngestClient(queryEndpointConnectionStringBuilder, properties, autoCorrectEndpoint); @@ -174,7 +174,7 @@ public ManagedStreamingIngestClient(ConnectionStringBuilder ingestionEndpointCon * @throws URISyntaxException if the connection string is invalid */ public ManagedStreamingIngestClient(ConnectionStringBuilder connectionStringBuilder, - @Nullable HttpClientProperties properties, boolean autoCorrectEndpoint) throws URISyntaxException { + @Nullable HttpClientProperties properties, boolean autoCorrectEndpoint) throws URISyntaxException { log.info("Creating a new ManagedStreamingIngestClient from connection strings"); queuedIngestClient = new QueuedIngestClientImpl(connectionStringBuilder, properties, autoCorrectEndpoint); streamingIngestClient = new StreamingIngestClient(connectionStringBuilder, properties, autoCorrectEndpoint); @@ -188,7 +188,7 @@ public ManagedStreamingIngestClient(ConnectionStringBuilder connectionStringBuil * @throws URISyntaxException if the connection string is invalid */ public ManagedStreamingIngestClient(ConnectionStringBuilder connectionStringBuilder, - @Nullable HttpClient httpClient, boolean autoCorrectEndpoint) throws URISyntaxException { + @Nullable HttpClient httpClient, boolean autoCorrectEndpoint) throws URISyntaxException { log.info("Creating a new ManagedStreamingIngestClient from connection strings"); queuedIngestClient = new QueuedIngestClientImpl(connectionStringBuilder, httpClient, autoCorrectEndpoint); streamingIngestClient = new StreamingIngestClient(connectionStringBuilder, httpClient, autoCorrectEndpoint); @@ -203,8 +203,8 @@ public ManagedStreamingIngestClient(ConnectionStringBuilder connectionStringBuil * @throws URISyntaxException if the connection string is invalid */ public ManagedStreamingIngestClient(ConnectionStringBuilder ingestionEndpointConnectionStringBuilder, - ConnectionStringBuilder queryEndpointConnectionStringBuilder, - @Nullable HttpClientProperties properties) throws URISyntaxException { + ConnectionStringBuilder queryEndpointConnectionStringBuilder, + @Nullable HttpClientProperties properties) throws URISyntaxException { log.info("Creating a new ManagedStreamingIngestClient from connection strings"); queuedIngestClient = new QueuedIngestClientImpl(ingestionEndpointConnectionStringBuilder, properties, true); streamingIngestClient = new StreamingIngestClient(queryEndpointConnectionStringBuilder, properties, true); @@ -217,7 +217,7 @@ public ManagedStreamingIngestClient(ConnectionStringBuilder ingestionEndpointCon * @throws URISyntaxException if the connection string is invalid */ public ManagedStreamingIngestClient(ConnectionStringBuilder connectionStringBuilder, - @Nullable HttpClientProperties properties) throws URISyntaxException { + @Nullable HttpClientProperties properties) throws URISyntaxException { log.info("Creating a new ManagedStreamingIngestClient from connection strings"); queuedIngestClient = new QueuedIngestClientImpl(connectionStringBuilder, properties, true); streamingIngestClient = new StreamingIngestClient(connectionStringBuilder, properties, true); @@ -230,7 +230,7 @@ public ManagedStreamingIngestClient(ConnectionStringBuilder connectionStringBuil * @throws URISyntaxException if the connection string is invalid */ public ManagedStreamingIngestClient(ConnectionStringBuilder connectionStringBuilder, - @Nullable HttpClient httpClient) throws URISyntaxException { + @Nullable HttpClient httpClient) throws URISyntaxException { log.info("Creating a new ManagedStreamingIngestClient from connection strings"); queuedIngestClient = new QueuedIngestClientImpl(connectionStringBuilder, httpClient, true); streamingIngestClient = new StreamingIngestClient(connectionStringBuilder, httpClient, true); @@ -246,8 +246,8 @@ public ManagedStreamingIngestClient(ConnectionStringBuilder connectionStringBuil * {@link IngestClientFactory#createManagedStreamingIngestClient(ConnectionStringBuilder)} instead. */ public ManagedStreamingIngestClient(ResourceManager resourceManager, - AzureStorageClient storageClient, - StreamingClient streamingClient) { + AzureStorageClient storageClient, + StreamingClient streamingClient) { log.info("Creating a new ManagedStreamingIngestClient from raw parts"); queuedIngestClient = new QueuedIngestClientImpl(resourceManager, storageClient); streamingIngestClient = new StreamingIngestClient(streamingClient); @@ -261,9 +261,9 @@ public ManagedStreamingIngestClient(ResourceManager resourceManager, * @param retryTemplate - retry template */ public ManagedStreamingIngestClient(ResourceManager resourceManager, - AzureStorageClient storageClient, - StreamingClient streamingClient, - ExponentialRetry retryTemplate) { + AzureStorageClient storageClient, + StreamingClient streamingClient, + ExponentialRetry retryTemplate) { log.info("Creating a new ManagedStreamingIngestClient from raw parts"); queuedIngestClient = new QueuedIngestClientImpl(resourceManager, storageClient); streamingIngestClient = new StreamingIngestClient(streamingClient); @@ -279,12 +279,12 @@ public ManagedStreamingIngestClient(ResourceManager resourceManager, @Override protected Mono ingestFromFileAsyncImpl(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) { return Mono.fromCallable(() -> { - Ensure.argIsNotNull(fileSourceInfo, "fileSourceInfo"); - Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); - fileSourceInfo.validate(); - ingestionProperties.validate(); - return true; - }) + Ensure.argIsNotNull(fileSourceInfo, "fileSourceInfo"); + Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); + fileSourceInfo.validate(); + ingestionProperties.validate(); + return true; + }) .flatMap(validInput -> Mono.fromCallable(() -> IngestionUtils.fileToStream(fileSourceInfo, true, ingestionProperties.getDataFormat())) .flatMap(streamSourceInfo -> ingestFromStreamAsync(streamSourceInfo, ingestionProperties))) .onErrorMap(FileNotFoundException.class, e -> { @@ -296,12 +296,12 @@ protected Mono ingestFromFileAsyncImpl(FileSourceInfo fileSourc @Override protected Mono ingestFromBlobAsyncImpl(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) { return Mono.fromCallable(() -> { - Ensure.argIsNotNull(blobSourceInfo, "blobSourceInfo"); - Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); - blobSourceInfo.validate(); - ingestionProperties.validate(); - return true; - }) + Ensure.argIsNotNull(blobSourceInfo, "blobSourceInfo"); + Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); + blobSourceInfo.validate(); + ingestionProperties.validate(); + return true; + }) .flatMap(valid -> { BlobClientBuilder blobClientBuilder = new BlobClientBuilder().endpoint(blobSourceInfo.getBlobPath()); if (httpClient != null) { @@ -311,12 +311,13 @@ protected Mono ingestFromBlobAsyncImpl(BlobSourceInfo blobSourc BlobClient blobClient = blobClientBuilder.buildClient(); return Mono.fromCallable(() -> { - if (blobSourceInfo.getRawSizeInBytes() <= 0) { - return blobClient.getProperties().getBlobSize(); - } - return blobSourceInfo.getRawSizeInBytes(); - }) - .subscribeOn(Schedulers.boundedElastic()) //TODO: all ingest apis have blocking calls. offload them to bounded elastic pool in order for the main reactive thread to continue operate? + if (blobSourceInfo.getRawSizeInBytes() <= 0) { + return blobClient.getProperties().getBlobSize(); + } + return blobSourceInfo.getRawSizeInBytes(); + }) + .subscribeOn(Schedulers.boundedElastic()) // TODO: all ingest apis have blocking calls. offload them to bounded elastic pool in + // order for the main reactive thread to continue operate? .onErrorMap(BlobStorageException.class, e -> new IngestionServiceException( blobSourceInfo.getBlobPath(), "Failed getting blob properties: " + ExceptionsUtils.getMessageEx(e), @@ -327,9 +328,9 @@ protected Mono ingestFromBlobAsyncImpl(BlobSourceInfo blobSourc } private Mono handleIngestion(BlobSourceInfo blobSourceInfo, - IngestionProperties ingestionProperties, - BlobClient blobClient, - long blobSize) { + IngestionProperties ingestionProperties, + BlobClient blobClient, + long blobSize) { if (queuingPolicy.shouldUseQueuedIngestion(blobSize, blobSourceInfo.getRawSizeInBytes(), blobSourceInfo.getCompressionType() != null, ingestionProperties.getDataFormat())) { log.info(FALLBACK_LOG_STRING); @@ -342,7 +343,7 @@ private Mono handleIngestion(BlobSourceInfo blobSourceInfo, } private Mono executeStream(SourceInfo sourceInfo, IngestionProperties ingestionProperties, @Nullable BlobClient blobClient) { - if (blobClient != null) { //TODO: is the currentAttempt in the clientRequestId needed? + if (blobClient != null) { // TODO: is the currentAttempt in the clientRequestId needed? String clientRequestId = String.format("KJC.executeManagedStreamingIngest.ingestFromBlob;%s", sourceInfo.getSourceId()); return streamingIngestClient.ingestFromBlobAsync((BlobSourceInfo) sourceInfo, ingestionProperties, blobClient, clientRequestId) .onErrorResume(e -> handleStreamingError(sourceInfo, e)); @@ -355,7 +356,7 @@ private Mono executeStream(SourceInfo sourceInfo, IngestionProp private Mono handleStreamingError(SourceInfo sourceInfo, Throwable e) { if (e instanceof IngestionServiceException - && e.getCause() != null //TODO: is this needed? + && e.getCause() != null // TODO: is this needed? && e.getCause() instanceof DataServiceException && e.getCause().getCause() != null && e.getCause().getCause() instanceof DataWebException) { @@ -381,12 +382,12 @@ private Mono handleStreamingError(SourceInfo sourceInfo, Throwa @Override protected Mono ingestFromResultSetAsyncImpl(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) { return Mono.fromCallable(() -> { - Ensure.argIsNotNull(resultSetSourceInfo, "resultSetSourceInfo"); - Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); - resultSetSourceInfo.validate(); - ingestionProperties.validateResultSetProperties(); - return IngestionUtils.resultSetToStream(resultSetSourceInfo); - }) + Ensure.argIsNotNull(resultSetSourceInfo, "resultSetSourceInfo"); + Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); + resultSetSourceInfo.validate(); + ingestionProperties.validateResultSetProperties(); + return IngestionUtils.resultSetToStream(resultSetSourceInfo); + }) .onErrorMap(IOException.class, e -> { String msg = "Failed to read from ResultSet."; log.error(msg, e); @@ -398,24 +399,24 @@ protected Mono ingestFromResultSetAsyncImpl(ResultSetSourceInfo @Override protected Mono ingestFromStreamAsyncImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) { return Mono.fromCallable(() -> { - Ensure.argIsNotNull(streamSourceInfo, "streamSourceInfo"); - Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); - streamSourceInfo.validate(); - ingestionProperties.validate(); + Ensure.argIsNotNull(streamSourceInfo, "streamSourceInfo"); + Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); + streamSourceInfo.validate(); + ingestionProperties.validate(); - if (streamSourceInfo.getSourceId() == null) { - streamSourceInfo.setSourceId(UUID.randomUUID()); - } - return streamSourceInfo; - }) + if (streamSourceInfo.getSourceId() == null) { + streamSourceInfo.setSourceId(UUID.randomUUID()); + } + return streamSourceInfo; + }) .flatMap(sourceInfo -> Mono.fromCallable(() -> { - int availableBytes = sourceInfo.getStream().available(); - return queuingPolicy.shouldUseQueuedIngestion( - availableBytes, - streamSourceInfo.getRawSizeInBytes(), - streamSourceInfo.getCompressionType() != null, - ingestionProperties.getDataFormat()); - }).subscribeOn(Schedulers.boundedElastic()) //TODO: same + int availableBytes = sourceInfo.getStream().available(); + return queuingPolicy.shouldUseQueuedIngestion( + availableBytes, + streamSourceInfo.getRawSizeInBytes(), + streamSourceInfo.getCompressionType() != null, + ingestionProperties.getDataFormat()); + }).subscribeOn(Schedulers.boundedElastic()) // TODO: same .flatMap(useQueued -> { if (Boolean.TRUE.equals(useQueued)) { return queuedIngestClient.ingestFromStreamAsync(streamSourceInfo, ingestionProperties); @@ -428,7 +429,7 @@ protected Mono ingestFromStreamAsyncImpl(StreamSourceInfo strea private Mono processStream(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) { return Mono.fromCallable(() -> streamSourceInfo.getStream() instanceof ByteArrayInputStream - || streamSourceInfo.getStream() instanceof ResettableFileInputStream) + || streamSourceInfo.getStream() instanceof ResettableFileInputStream) .flatMap(isKnownStreamType -> { if (Boolean.TRUE.equals(isKnownStreamType)) { StreamSourceInfo managedSourceInfo = new StreamSourceInfo(streamSourceInfo.getStream(), @@ -440,8 +441,8 @@ private Mono processStream(StreamSourceInfo streamSourceInfo, I .doFinally(signal -> closeStreamSafely(managedSourceInfo)); } else { return Mono.fromCallable(() -> IngestionUtils.readBytesFromInputStream(streamSourceInfo.getStream(), - ManagedStreamingQueuingPolicy.MAX_STREAMING_STREAM_SIZE_BYTES + 1)) - .subscribeOn(Schedulers.boundedElastic())//TODO: same + ManagedStreamingQueuingPolicy.MAX_STREAMING_STREAM_SIZE_BYTES + 1)) + .subscribeOn(Schedulers.boundedElastic())// TODO: same .flatMap(streamingBytes -> { InputStream byteArrayStream = new ByteArrayInputStream(streamingBytes); int size = streamingBytes.length; @@ -453,7 +454,8 @@ private Mono processStream(StreamSourceInfo streamSourceInfo, I ingestionProperties.getDataFormat()); if (shouldUseQueuedIngestion) { log.info(FALLBACK_LOG_STRING); - StreamSourceInfo managedSourceInfo = new StreamSourceInfo(new SequenceInputStream(byteArrayStream, streamSourceInfo.getStream()), + StreamSourceInfo managedSourceInfo = new StreamSourceInfo( + new SequenceInputStream(byteArrayStream, streamSourceInfo.getStream()), streamSourceInfo.isLeaveOpen(), streamSourceInfo.getSourceId(), streamSourceInfo.getCompressionType()); return queuedIngestClient.ingestFromStreamAsync(managedSourceInfo, ingestionProperties); diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/QueuedIngestClientImpl.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/QueuedIngestClientImpl.java index 5af8e772..16e0b814 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/QueuedIngestClientImpl.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/QueuedIngestClientImpl.java @@ -98,94 +98,93 @@ public IngestionResourceManager getResourceManager() { @Override protected Mono ingestFromBlobAsyncImpl(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) { return Mono.fromCallable(() -> { - Ensure.argIsNotNull(blobSourceInfo, "blobSourceInfo"); - Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); - - blobSourceInfo.validate(); - ingestionProperties.validate(); - ingestionProperties.setAuthorizationContextToken(resourceManager.getIdentityToken()); - - // Create the ingestion message - IngestionBlobInfo ingestionBlobInfo = new IngestionBlobInfo(blobSourceInfo.getBlobPath(), - ingestionProperties.getDatabaseName(), ingestionProperties.getTableName(), this.applicationForTracing, - this.clientVersionForTracing); - String urlWithoutSecrets = SecurityUtils.removeSecretsFromUrl(blobSourceInfo.getBlobPath()); - if (blobSourceInfo.getRawSizeInBytes() > 0L) { - ingestionBlobInfo.setRawDataSize(blobSourceInfo.getRawSizeInBytes()); - } else { - log.warn("Blob '{}' was sent for ingestion without specifying its raw data size", urlWithoutSecrets); - } - - ingestionBlobInfo.setReportLevel(ingestionProperties.getReportLevel().getKustoValue()); - ingestionBlobInfo.setReportMethod(ingestionProperties.getReportMethod().getKustoValue()); - ingestionBlobInfo.setFlushImmediately(ingestionProperties.getFlushImmediately()); - ingestionBlobInfo.setValidationPolicy(ingestionProperties.getValidationPolicy()); - ingestionBlobInfo.setAdditionalProperties(ingestionProperties.getIngestionProperties()); - if (blobSourceInfo.getSourceId() != null) { - ingestionBlobInfo.setId(blobSourceInfo.getSourceId()); - } - - String id = ingestionBlobInfo.getId().toString(); - IngestionStatus status = new IngestionStatus(); - status.setDatabase(ingestionProperties.getDatabaseName()); - status.setTable(ingestionProperties.getTableName()); - status.setStatus(OperationStatus.Queued); - status.setUpdatedOn(Instant.now()); - status.setIngestionSourceId(ingestionBlobInfo.getId()); - status.setIngestionSourcePath(urlWithoutSecrets); - - boolean reportToTable = ingestionProperties.getReportLevel() != IngestionProperties.IngestionReportLevel.NONE && - ingestionProperties.getReportMethod() != IngestionProperties.IngestionReportMethod.QUEUE; - List tableStatuses = new LinkedList<>(); - - if (reportToTable) { - status.setStatus(OperationStatus.Pending); - TableWithSas statusTable = resourceManager.getStatusTable(); - IngestionStatusInTableDescription ingestionStatusInTable = new IngestionStatusInTableDescription(); - ingestionStatusInTable.setTableClient(statusTable.getTable()); - ingestionStatusInTable.setTableConnectionString(statusTable.getUri()); - ingestionStatusInTable.setPartitionKey(ingestionBlobInfo.getId().toString()); - ingestionStatusInTable.setRowKey(ingestionBlobInfo.getId().toString()); - ingestionBlobInfo.setIngestionStatusInTable(ingestionStatusInTable); - - return Mono.fromCallable(() -> { - azureStorageClient.azureTableInsertEntity(statusTable.getTable(), new TableEntity(id, id).setProperties(status.getEntityProperties())); - tableStatuses.add(ingestionBlobInfo.getIngestionStatusInTable()); - return tableStatuses; - }) - .publishOn(Schedulers.boundedElastic()) - .flatMap(insertedTableStatuses -> - ResourceAlgorithms.postToQueueWithRetriesAsync(resourceManager, azureStorageClient, ingestionBlobInfo) - .thenReturn(new TableReportIngestionResult(insertedTableStatuses))) - .onErrorMap(e -> { - if (e instanceof BlobStorageException || e instanceof QueueStorageException || e instanceof TableServiceException) { - return new IngestionServiceException("Failed to ingest from blob", (Exception) e); - } else if (e instanceof IOException || e instanceof URISyntaxException) { - return new IngestionClientException("Failed to ingest from blob", e); - } else { - return e; - } - }); - } - - return ResourceAlgorithms.postToQueueWithRetriesAsync(resourceManager, azureStorageClient, ingestionBlobInfo) - .thenReturn(new IngestionStatusResult(status)); + Ensure.argIsNotNull(blobSourceInfo, "blobSourceInfo"); + Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); + + blobSourceInfo.validate(); + ingestionProperties.validate(); + ingestionProperties.setAuthorizationContextToken(resourceManager.getIdentityToken()); + + // Create the ingestion message + IngestionBlobInfo ingestionBlobInfo = new IngestionBlobInfo(blobSourceInfo.getBlobPath(), + ingestionProperties.getDatabaseName(), ingestionProperties.getTableName(), this.applicationForTracing, + this.clientVersionForTracing); + String urlWithoutSecrets = SecurityUtils.removeSecretsFromUrl(blobSourceInfo.getBlobPath()); + if (blobSourceInfo.getRawSizeInBytes() > 0L) { + ingestionBlobInfo.setRawDataSize(blobSourceInfo.getRawSizeInBytes()); + } else { + log.warn("Blob '{}' was sent for ingestion without specifying its raw data size", urlWithoutSecrets); + } + + ingestionBlobInfo.setReportLevel(ingestionProperties.getReportLevel().getKustoValue()); + ingestionBlobInfo.setReportMethod(ingestionProperties.getReportMethod().getKustoValue()); + ingestionBlobInfo.setFlushImmediately(ingestionProperties.getFlushImmediately()); + ingestionBlobInfo.setValidationPolicy(ingestionProperties.getValidationPolicy()); + ingestionBlobInfo.setAdditionalProperties(ingestionProperties.getIngestionProperties()); + if (blobSourceInfo.getSourceId() != null) { + ingestionBlobInfo.setId(blobSourceInfo.getSourceId()); + } + + String id = ingestionBlobInfo.getId().toString(); + IngestionStatus status = new IngestionStatus(); + status.setDatabase(ingestionProperties.getDatabaseName()); + status.setTable(ingestionProperties.getTableName()); + status.setStatus(OperationStatus.Queued); + status.setUpdatedOn(Instant.now()); + status.setIngestionSourceId(ingestionBlobInfo.getId()); + status.setIngestionSourcePath(urlWithoutSecrets); + + boolean reportToTable = ingestionProperties.getReportLevel() != IngestionProperties.IngestionReportLevel.NONE && + ingestionProperties.getReportMethod() != IngestionProperties.IngestionReportMethod.QUEUE; + List tableStatuses = new LinkedList<>(); + + if (reportToTable) { + status.setStatus(OperationStatus.Pending); + TableWithSas statusTable = resourceManager.getStatusTable(); + IngestionStatusInTableDescription ingestionStatusInTable = new IngestionStatusInTableDescription(); + ingestionStatusInTable.setTableClient(statusTable.getTable()); + ingestionStatusInTable.setTableConnectionString(statusTable.getUri()); + ingestionStatusInTable.setPartitionKey(ingestionBlobInfo.getId().toString()); + ingestionStatusInTable.setRowKey(ingestionBlobInfo.getId().toString()); + ingestionBlobInfo.setIngestionStatusInTable(ingestionStatusInTable); + + return Mono.fromCallable(() -> { + azureStorageClient.azureTableInsertEntity(statusTable.getTable(), new TableEntity(id, id).setProperties(status.getEntityProperties())); + tableStatuses.add(ingestionBlobInfo.getIngestionStatusInTable()); + return tableStatuses; }) + .publishOn(Schedulers.boundedElastic()) + .flatMap(insertedTableStatuses -> ResourceAlgorithms.postToQueueWithRetriesAsync(resourceManager, azureStorageClient, ingestionBlobInfo) + .thenReturn(new TableReportIngestionResult(insertedTableStatuses))) + .onErrorMap(e -> { + if (e instanceof BlobStorageException || e instanceof QueueStorageException || e instanceof TableServiceException) { + return new IngestionServiceException("Failed to ingest from blob", (Exception) e); + } else if (e instanceof IOException || e instanceof URISyntaxException) { + return new IngestionClientException("Failed to ingest from blob", e); + } else { + return e; + } + }); + } + + return ResourceAlgorithms.postToQueueWithRetriesAsync(resourceManager, azureStorageClient, ingestionBlobInfo) + .thenReturn(new IngestionStatusResult(status)); + }) .flatMap(Function.identity()); } @Override protected Mono ingestFromFileAsyncImpl(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) { return Mono.fromCallable(() -> { - Ensure.argIsNotNull(fileSourceInfo, "fileSourceInfo"); - Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); - fileSourceInfo.validate(); - ingestionProperties.validate(); - - String filePath = fileSourceInfo.getFilePath(); - Ensure.fileExists(filePath); - return filePath; - }) + Ensure.argIsNotNull(fileSourceInfo, "fileSourceInfo"); + Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); + fileSourceInfo.validate(); + ingestionProperties.validate(); + + String filePath = fileSourceInfo.getFilePath(); + Ensure.fileExists(filePath); + return filePath; + }) .onErrorMap(IOException.class, e -> new IngestionClientException("Failed to ingest from file", e)) .flatMap(filePath -> { CompressionType sourceCompressionType = IngestionUtils.getCompression(filePath); @@ -197,7 +196,8 @@ protected Mono ingestFromFileAsyncImpl(FileSourceInfo fileSourc file.getName(), ingestionProperties.getDatabaseName(), ingestionProperties.getTableName(), - dataFormat.getKustoValue(), // Used to use an empty string if the DataFormat was empty. Now it can't be empty, with a default of CSV. + dataFormat.getKustoValue(), // Used to use an empty string if the DataFormat was empty. Now it can't be empty, with a default of + // CSV. shouldCompress ? CompressionType.gz : sourceCompressionType); return ResourceAlgorithms.uploadLocalFileWithRetriesAsync(resourceManager, azureStorageClient, file, blobName, shouldCompress) @@ -214,22 +214,21 @@ protected Mono ingestFromFileAsyncImpl(FileSourceInfo fileSourc @Override protected Mono ingestFromStreamAsyncImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) { return Mono.fromCallable(() -> { - Ensure.argIsNotNull(streamSourceInfo, "streamSourceInfo"); - Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); + Ensure.argIsNotNull(streamSourceInfo, "streamSourceInfo"); + Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); - streamSourceInfo.validate(); - ingestionProperties.validate(); - return true; - }) + streamSourceInfo.validate(); + ingestionProperties.validate(); + return true; + }) .flatMap(valid -> Mono.fromCallable(() -> { - if (streamSourceInfo.getStream() == null) { - return Mono.error(new IngestionClientException("The provided stream is null.")); - } else if (streamSourceInfo.getStream().available() <= 0) { - return Mono.error(new IngestionClientException("The provided stream is empty.")); - } - return true; - }) - ) + if (streamSourceInfo.getStream() == null) { + return Mono.error(new IngestionClientException("The provided stream is null.")); + } else if (streamSourceInfo.getStream().available() <= 0) { + return Mono.error(new IngestionClientException("The provided stream is empty.")); + } + return true; + })) .flatMap(ignored -> { IngestionProperties.DataFormat dataFormat = ingestionProperties.getDataFormat(); boolean shouldCompress = shouldCompress(streamSourceInfo.getCompressionType(), dataFormat); @@ -238,15 +237,17 @@ protected Mono ingestFromStreamAsyncImpl(StreamSourceInfo strea "StreamUpload", ingestionProperties.getDatabaseName(), ingestionProperties.getTableName(), - dataFormat.getKustoValue(), // Used to use an empty string if the DataFormat was empty. Now it can't be empty, with a default of CSV. + dataFormat.getKustoValue(), // Used to use an empty string if the DataFormat was empty. Now it can't be empty, with a default of + // CSV. shouldCompress ? CompressionType.gz : streamSourceInfo.getCompressionType()); return ResourceAlgorithms.uploadStreamToBlobWithRetriesAsync(resourceManager, - azureStorageClient, - streamSourceInfo.getStream(), - blobName, - shouldCompress) + azureStorageClient, + streamSourceInfo.getStream(), + blobName, + shouldCompress) .flatMap(blobPath -> { - BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath, streamSourceInfo.getRawSizeInBytes(), streamSourceInfo.getSourceId()); + BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath, streamSourceInfo.getRawSizeInBytes(), + streamSourceInfo.getSourceId()); return ingestFromBlobAsync(blobSourceInfo, ingestionProperties); }) .onErrorMap(BlobStorageException.class, e -> new IngestionServiceException("Failed to ingest from stream", e)) @@ -289,20 +290,20 @@ String genBlobName(String fileName, String databaseName, String tableName, Strin @Override protected Mono ingestFromResultSetAsyncImpl(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) { return Mono.fromCallable(() -> { - Ensure.argIsNotNull(resultSetSourceInfo, "resultSetSourceInfo"); - Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); - resultSetSourceInfo.validate(); - ingestionProperties.validateResultSetProperties(); - return true; - }) + Ensure.argIsNotNull(resultSetSourceInfo, "resultSetSourceInfo"); + Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); + resultSetSourceInfo.validate(); + ingestionProperties.validateResultSetProperties(); + return true; + }) .flatMap(valid -> Mono.fromCallable(() -> { - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - new CsvRoutines().write(resultSetSourceInfo.getResultSet(), byteArrayOutputStream); - byteArrayOutputStream.flush(); - ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray()); - return new StreamSourceInfo(byteArrayInputStream, false, resultSetSourceInfo.getSourceId()); - }) - .subscribeOn(Schedulers.boundedElastic())) //TODO: same + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + new CsvRoutines().write(resultSetSourceInfo.getResultSet(), byteArrayOutputStream); + byteArrayOutputStream.flush(); + ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray()); + return new StreamSourceInfo(byteArrayInputStream, false, resultSetSourceInfo.getSourceId()); + }) + .subscribeOn(Schedulers.boundedElastic())) // TODO: same .flatMap(streamSourceInfo -> ingestFromStreamAsync(streamSourceInfo, ingestionProperties)) .onErrorMap(IOException.class, e -> { String msg = "Failed to read from ResultSet."; diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ResourceAlgorithms.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ResourceAlgorithms.java index 7c6a233c..6be95144 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ResourceAlgorithms.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ResourceAlgorithms.java @@ -83,11 +83,10 @@ private static , TOut> Mono action.apply(resource) - .doOnSuccess(ignored -> resourceManager.reportIngestionResult(resource, true)), - actionName, - attributes - ) + span -> action.apply(resource) + .doOnSuccess(ignored -> resourceManager.reportIngestionResult(resource, true)), + actionName, + attributes) .onErrorResume(e -> { log.warn(String.format("Error during attempt %d of %d for %s.", attempt, RETRY_COUNT, actionName), e); resourceManager.reportIngestionResult(resource, false); @@ -97,28 +96,27 @@ private static , TOut> Mono postToQueueWithRetriesAsync(ResourceManager resourceManager, AzureStorageClient azureStorageClient, IngestionBlobInfo blob) { return Mono.fromCallable(() -> { - ObjectMapper objectMapper = Utils.getObjectMapper(); - String message = objectMapper.writeValueAsString(blob); - List shuffledQueues = resourceManager.getShuffledQueues(); - return new AbstractMap.SimpleImmutableEntry<>(message, shuffledQueues); - }) + ObjectMapper objectMapper = Utils.getObjectMapper(); + String message = objectMapper.writeValueAsString(blob); + List shuffledQueues = resourceManager.getShuffledQueues(); + return new AbstractMap.SimpleImmutableEntry<>(message, shuffledQueues); + }) .flatMap(entry -> { String message = entry.getKey(); List shuffledQueues = entry.getValue(); return resourceActionWithRetriesAsync(resourceManager, shuffledQueues, queue -> Mono.fromCallable(() -> { - azureStorageClient.postMessageToQueue(queue.getQueue(), message); //TODO: offload all sync calls to bounded elastic? + azureStorageClient.postMessageToQueue(queue.getQueue(), message); // TODO: offload all sync calls to bounded elastic? return null; }), "ResourceAlgorithms.postToQueueWithRetriesAsync", - Collections.singletonMap("blob", SecurityUtils.removeSecretsFromUrl(blob.getBlobPath())) - ); + Collections.singletonMap("blob", SecurityUtils.removeSecretsFromUrl(blob.getBlobPath()))); }); } public static Mono uploadStreamToBlobWithRetriesAsync(ResourceManager resourceManager, AzureStorageClient azureStorageClient, InputStream stream, - String blobName, boolean shouldCompress) { + String blobName, boolean shouldCompress) { return Mono.fromCallable(resourceManager::getShuffledContainers) .flatMap(shuffledContainers -> resourceActionWithRetriesAsync(resourceManager, shuffledContainers, @@ -127,12 +125,12 @@ public static Mono uploadStreamToBlobWithRetriesAsync(ResourceManager re return (container.getContainer().getBlobContainerUrl() + "/" + blobName + container.getSas()); }), "ResourceAlgorithms.uploadStreamToBlobWithRetriesAsync", - Collections.emptyMap() - )); + Collections.emptyMap())); } - public static Mono uploadLocalFileWithRetriesAsync(ResourceManager resourceManager, AzureStorageClient azureStorageClient, File file, String blobName, - boolean shouldCompress) { + public static Mono uploadLocalFileWithRetriesAsync(ResourceManager resourceManager, AzureStorageClient azureStorageClient, File file, + String blobName, + boolean shouldCompress) { return Mono.fromCallable(resourceManager::getShuffledContainers) .flatMap(shuffledContainers -> resourceActionWithRetriesAsync(resourceManager, shuffledContainers, @@ -140,8 +138,7 @@ public static Mono uploadLocalFileWithRetriesAsync(ResourceManager resou azureStorageClient.uploadLocalFileToBlob(file, blobName, container.getContainer(), shouldCompress); return container.getContainer().getBlobContainerUrl() + "/" + blobName + container.getSas(); }), "ResourceAlgorithms.uploadLocalFileWithRetriesAsync", - Collections.emptyMap() - )); + Collections.emptyMap())); } @NotNull @@ -152,10 +149,10 @@ public static List roundRobinNestedList(@NotNull List> validResou return IntStream.range(0, longestResourceList).boxed() // This flat maps combines all the inner lists .flatMap(i -> - // For each list, get the i'th element if it exists, or null otherwise (if the list is shorter) - validResources.stream().map(r -> r.size() > i ? r.get(i) : null) - // Remove nulls - .filter(Objects::nonNull)) + // For each list, get the i'th element if it exists, or null otherwise (if the list is shorter) + validResources.stream().map(r -> r.size() > i ? r.get(i) : null) + // Remove nulls + .filter(Objects::nonNull)) // So we combine the list of the first element of each list, then the second element, etc. .collect(Collectors.toList()); } diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/StreamingIngestClient.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/StreamingIngestClient.java index 07311ef0..e32719f5 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/StreamingIngestClient.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/StreamingIngestClient.java @@ -96,12 +96,12 @@ public static String generateEngineUriSuggestion(URI existingEndpoint) throws UR @Override protected Mono ingestFromFileAsyncImpl(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) { return Mono.fromCallable(() -> { - Ensure.argIsNotNull(fileSourceInfo, "fileSourceInfo"); - Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); - fileSourceInfo.validate(); - ingestionProperties.validate(); - return IngestionUtils.fileToStream(fileSourceInfo, false, ingestionProperties.getDataFormat()); - }) + Ensure.argIsNotNull(fileSourceInfo, "fileSourceInfo"); + Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); + fileSourceInfo.validate(); + ingestionProperties.validate(); + return IngestionUtils.fileToStream(fileSourceInfo, false, ingestionProperties.getDataFormat()); + }) .onErrorMap(FileNotFoundException.class, e -> { log.error("File not found when ingesting a file.", e); return new IngestionClientException("IO exception - check file path.", e); @@ -112,12 +112,12 @@ protected Mono ingestFromFileAsyncImpl(FileSourceInfo fileSourc @Override protected Mono ingestFromBlobAsyncImpl(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) { return Mono.fromCallable(() -> { - Ensure.argIsNotNull(blobSourceInfo, "blobSourceInfo"); - Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); - blobSourceInfo.validate(); - ingestionProperties.validate(); - return true; - }) + Ensure.argIsNotNull(blobSourceInfo, "blobSourceInfo"); + Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); + blobSourceInfo.validate(); + ingestionProperties.validate(); + return true; + }) .flatMap(valid -> { BlobClient blobClient; try { @@ -143,12 +143,12 @@ protected Mono ingestFromBlobAsyncImpl(BlobSourceInfo blobSourc @Override protected Mono ingestFromResultSetAsyncImpl(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) { return Mono.fromCallable(() -> { - Ensure.argIsNotNull(resultSetSourceInfo, "resultSetSourceInfo"); - Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); - resultSetSourceInfo.validate(); - ingestionProperties.validateResultSetProperties(); - return IngestionUtils.resultSetToStream(resultSetSourceInfo); - }) + Ensure.argIsNotNull(resultSetSourceInfo, "resultSetSourceInfo"); + Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); + resultSetSourceInfo.validate(); + ingestionProperties.validateResultSetProperties(); + return IngestionUtils.resultSetToStream(resultSetSourceInfo); + }) .flatMap(streamSourceInfo -> ingestFromStreamAsync(streamSourceInfo, ingestionProperties)) .onErrorMap(IOException.class, e -> { String msg = "Failed to read from ResultSet."; @@ -177,30 +177,30 @@ Mono ingestFromStreamAsync(StreamSourceInfo streamSourceInfo, I } private Mono ingestFromStreamImplAsync(StreamSourceInfo streamSourceInfo, - IngestionProperties ingestionProperties, - @Nullable String clientRequestId) { + IngestionProperties ingestionProperties, + @Nullable String clientRequestId) { return Mono.fromCallable(() -> { - Ensure.argIsNotNull(streamSourceInfo, "streamSourceInfo"); - Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); - ingestionProperties.validate(); - streamSourceInfo.validate(); + Ensure.argIsNotNull(streamSourceInfo, "streamSourceInfo"); + Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); + ingestionProperties.validate(); + streamSourceInfo.validate(); - return ingestionProperties.getDataFormat(); - }) + return ingestionProperties.getDataFormat(); + }) .onErrorMap(IOException.class, e -> { String msg = ExceptionsUtils.getMessageEx(e); log.error(msg, e); return new IngestionClientException(msg, e); }) .flatMap(dataFormat -> Mono.fromCallable(() -> { - if (IngestClientBase.shouldCompress(streamSourceInfo.getCompressionType(), dataFormat)) { - return compressStream(streamSourceInfo.getStream(), streamSourceInfo.isLeaveOpen()); - } else { - return streamSourceInfo.getStream(); - } - }) - .subscribeOn(Schedulers.boundedElastic()) - .map(stream -> new Object[]{stream, dataFormat}) // Pass both stream and dataFormat downstream + if (IngestClientBase.shouldCompress(streamSourceInfo.getCompressionType(), dataFormat)) { + return compressStream(streamSourceInfo.getStream(), streamSourceInfo.isLeaveOpen()); + } else { + return streamSourceInfo.getStream(); + } + }) + .subscribeOn(Schedulers.boundedElastic()) + .map(stream -> new Object[] {stream, dataFormat}) // Pass both stream and dataFormat downstream ) .flatMap(tuple -> { InputStream stream = (InputStream) tuple[0]; @@ -214,17 +214,16 @@ private Mono ingestFromStreamImplAsync(StreamSourceInfo streamS ClientRequestProperties finalClientRequestProperties = clientRequestProperties; return Mono.fromCallable(() -> { - log.debug("Executing streaming ingest"); - return this.streamingClient.executeStreamingIngestAsync( - ingestionProperties.getDatabaseName(), - ingestionProperties.getTableName(), - stream, - finalClientRequestProperties, - dataFormat.getKustoValue(), - ingestionProperties.getIngestionMapping().getIngestionMappingReference(), - !(streamSourceInfo.getCompressionType() == null || !streamSourceInfo.isLeaveOpen()) - ); - }) + log.debug("Executing streaming ingest"); + return this.streamingClient.executeStreamingIngestAsync( + ingestionProperties.getDatabaseName(), + ingestionProperties.getTableName(), + stream, + finalClientRequestProperties, + dataFormat.getKustoValue(), + ingestionProperties.getIngestionMapping().getIngestionMappingReference(), + !(streamSourceInfo.getCompressionType() == null || !streamSourceInfo.isLeaveOpen())); + }) .flatMap(Function.identity()) .doOnSuccess(ignored -> log.debug("Stream was ingested successfully.")) .then(Mono.fromCallable(() -> { @@ -272,9 +271,9 @@ private InputStream compressStream(InputStream uncompressedStream, boolean leave } Mono ingestFromBlobAsync(BlobSourceInfo blobSourceInfo, - IngestionProperties ingestionProperties, - BlobClient cloudBlockBlob, - @Nullable String clientRequestId) { + IngestionProperties ingestionProperties, + BlobClient cloudBlockBlob, + @Nullable String clientRequestId) { // trace ingestFromBlobAsync return MonitoredActivity.wrap( ingestFromBlobImplAsync(blobSourceInfo, @@ -284,34 +283,34 @@ Mono ingestFromBlobAsync(BlobSourceInfo blobSourceInfo, } private Mono ingestFromBlobImplAsync(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties, BlobClient cloudBlockBlob, - @Nullable String clientRequestId) { + @Nullable String clientRequestId) { return Mono.fromCallable(() -> { - if (blobSourceInfo.getRawSizeInBytes() == 0 && cloudBlockBlob.getProperties().getBlobSize() == 0) { - String message = "Empty blob."; - log.error(message); - throw new IngestionClientException(message); - } - return true; - }).subscribeOn(Schedulers.boundedElastic())//TODO: same + if (blobSourceInfo.getRawSizeInBytes() == 0 && cloudBlockBlob.getProperties().getBlobSize() == 0) { + String message = "Empty blob."; + log.error(message); + throw new IngestionClientException(message); + } + return true; + }).subscribeOn(Schedulers.boundedElastic())// TODO: same .onErrorMap(BlobStorageException.class, e -> new IngestionClientException(String.format("Exception trying to read blob metadata,%s", e.getStatusCode() == HttpStatus.FORBIDDEN ? "this might mean the blob doesn't exist" : ""), e)) .flatMap(ignored -> Mono.fromCallable(() -> { - String blobPath = blobSourceInfo.getBlobPath(); - ClientRequestProperties clientRequestProperties = null; - if (StringUtils.isNotBlank(clientRequestId)) { - clientRequestProperties = new ClientRequestProperties(); - clientRequestProperties.setClientRequestId(clientRequestId); - } + String blobPath = blobSourceInfo.getBlobPath(); + ClientRequestProperties clientRequestProperties = null; + if (StringUtils.isNotBlank(clientRequestId)) { + clientRequestProperties = new ClientRequestProperties(); + clientRequestProperties.setClientRequestId(clientRequestId); + } - IngestionProperties.DataFormat dataFormat = ingestionProperties.getDataFormat(); - return this.streamingClient.executeStreamingIngestFromBlobAsync(ingestionProperties.getDatabaseName(), - ingestionProperties.getTableName(), - blobPath, - clientRequestProperties, - dataFormat.getKustoValue(), - ingestionProperties.getIngestionMapping().getIngestionMappingReference()); - }) + IngestionProperties.DataFormat dataFormat = ingestionProperties.getDataFormat(); + return this.streamingClient.executeStreamingIngestFromBlobAsync(ingestionProperties.getDatabaseName(), + ingestionProperties.getTableName(), + blobPath, + clientRequestProperties, + dataFormat.getKustoValue(), + ingestionProperties.getIngestionMapping().getIngestionMappingReference()); + }) .onErrorMap(DataClientException.class, e -> { log.error(e.getMessage(), e); return new IngestionClientException(e.getMessage(), e); diff --git a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/E2ETest.java b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/E2ETest.java index de6ad9c3..0f0c83d6 100644 --- a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/E2ETest.java +++ b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/E2ETest.java @@ -132,7 +132,7 @@ public static void setUp() { principalFqn = String.format("aadapp=%s;%s", APP_ID, TENANT_ID); ConnectionStringBuilder dmCsb = createConnection(DM_CONN_STR); - dmCsb.setClusterUrl(dmCsb.getClusterUrl().replaceFirst("https://dev", "https://ingest-dev")); //TODO: remove + dmCsb.setClusterUrl(dmCsb.getClusterUrl().replaceFirst("https://dev", "https://ingest-dev")); // TODO: remove dmCsb.setUserNameForTracing("testUser"); try { dmCslClient = ClientFactory.createClient(dmCsb); @@ -172,7 +172,7 @@ public static void setUp() { @AfterAll public static void tearDown() { try { - //queryClient.executeToJsonResult(DB_NAME, String.format(".drop table %s ifexists skip-seal", tableName), null); + // queryClient.executeToJsonResult(DB_NAME, String.format(".drop table %s ifexists skip-seal", tableName), null); ingestClient.close(); managedStreamingIngestClient.close(); } catch (Exception ex) { @@ -206,7 +206,7 @@ private static void createTableAndMapping() { } try { - //queryClient.executeToJsonResult(DB_NAME, ".clear database cache streamingingestion schema", null); + // queryClient.executeToJsonResult(DB_NAME, ".clear database cache streamingingestion schema", null); queryClient.executeToJsonResult(DB_NAME, ".alter table JavaTest_2024_11_30_02_14_23_653_2051561983 policy streamingingestion enable", null); } catch (Exception ex) { Assertions.fail("Failed to refresh cache", ex); @@ -235,7 +235,7 @@ private static void createTestData() { first.setPath("$.rownumber"); ColumnMapping second = new ColumnMapping("rowguid", "string"); second.setPath("$.rowguid"); - ColumnMapping[] columnMapping = new ColumnMapping[]{first, second}; + ColumnMapping[] columnMapping = new ColumnMapping[] {first, second}; ingestionPropertiesWithColumnMapping.setIngestionMapping(columnMapping, IngestionMappingKind.JSON); ingestionPropertiesWithColumnMapping.setDataFormat(DataFormat.JSON); @@ -391,11 +391,13 @@ void testCallbackAndTokenCredentialAuth() throws DataServiceException, URISyntax CloudInfo cloudInfo = CloudInfo.retrieveCloudInfoForCluster(ENG_CONN_STR); TokenRequestContext tokenRequestContext = new TokenRequestContext().addScopes(cloudInfo.determineScope()); - ConnectionStringBuilder syncCallback = ConnectionStringBuilder.createWithAadTokenProviderAuthentication(ENG_CONN_STR, () -> new AzureCliCredentialBuilder().build().getTokenSync(tokenRequestContext).getToken()); + ConnectionStringBuilder syncCallback = ConnectionStringBuilder.createWithAadTokenProviderAuthentication(ENG_CONN_STR, + () -> new AzureCliCredentialBuilder().build().getTokenSync(tokenRequestContext).getToken()); Assertions.assertEquals(1, ClientFactory.createClient(syncCallback).executeMgmt(".show version").getPrimaryResults().count()); - ConnectionStringBuilder asyncCallback = ConnectionStringBuilder.createWithAadAsyncTokenProviderAuthentication(ENG_CONN_STR, new AzureCliCredentialBuilder().build().getToken(tokenRequestContext).map(AccessToken::getToken)); + ConnectionStringBuilder asyncCallback = ConnectionStringBuilder.createWithAadAsyncTokenProviderAuthentication(ENG_CONN_STR, + new AzureCliCredentialBuilder().build().getToken(tokenRequestContext).map(AccessToken::getToken)); Assertions.assertEquals(1, ClientFactory.createClient(asyncCallback).executeMgmt(".show version").getPrimaryResults().count()); @@ -666,7 +668,7 @@ void testPerformanceKustoOperationResultVsJsonVsStreamingQuery() throws DataClie stopWatch.start(); // The InputStream *must* be closed by the caller to prevent memory leaks try (InputStream is = streamingClient.executeStreamingQuery(DB_NAME, query, clientRequestProperties); - BufferedReader br = new BufferedReader(new InputStreamReader(is))) { + BufferedReader br = new BufferedReader(new InputStreamReader(is))) { StringBuilder streamedResult = new StringBuilder(); char[] buffer = new char[65536]; String streamedLine; @@ -715,13 +717,8 @@ void testNoRedirectsCloudFail() { try { Client client = ClientFactory.createClient( ConnectionStringBuilder.createWithAadAccessTokenAuthentication("https://statusreturner.azurewebsites.net/nocloud/" + code, "token")); - try { - client.executeQuery("db", "table"); - Assertions.fail("Expected exception"); - } catch (DataServiceException e) { - Assertions.assertTrue(e.getMessage().contains("" + code)); - Assertions.assertTrue(e.getMessage().contains("metadata")); - } + client.executeQuery("db", "table"); + Assertions.fail("Expected exception"); } catch (Exception e) { return e; } @@ -744,9 +741,6 @@ void testNoRedirectsClientFail() { try { client.executeQuery("db", "table"); Assertions.fail("Expected exception"); - } catch (DataServiceException e) { - Assertions.assertTrue(e.getMessage().contains("" + code)); - Assertions.assertFalse(e.getMessage().contains("metadata")); } catch (Exception e) { return e; } diff --git a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/QueuedIngestClientTest.java b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/QueuedIngestClientTest.java index 275faf57..32e0cbe4 100644 --- a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/QueuedIngestClientTest.java +++ b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/QueuedIngestClientTest.java @@ -91,7 +91,7 @@ static void setUp() throws Exception { void setUpEach() throws IngestionServiceException { doReturn(Collections.singletonList(TestUtils.containerWithSasFromContainerName("storage")), Collections.singletonList(TestUtils.containerWithSasFromContainerName("storage2"))).when(resourceManagerMock) - .getShuffledContainers(); + .getShuffledContainers(); queuedIngestClient = new QueuedIngestClientImpl(resourceManagerMock, azureStorageClientMock); ingestionProperties = new IngestionProperties("dbName", "tableName"); diff --git a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/StreamingIngestClientTest.java b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/StreamingIngestClientTest.java index de7dee95..731f8a5a 100644 --- a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/StreamingIngestClientTest.java +++ b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/StreamingIngestClientTest.java @@ -242,9 +242,8 @@ void ingestFromStreamAsync_JsonWrongMappingKind_IngestionClientException() { ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON); ingestionProperties.setIngestionMapping("CsvMapping", IngestionMapping.IngestionMappingKind.CSV); StepVerifier.create(streamingIngestClient.ingestFromStreamAsync(streamSourceInfo, ingestionProperties)) - .expectErrorMatches(e -> - e instanceof IngestionClientException - && e.getMessage().contains("Wrong ingestion mapping for format 'json'; mapping kind should be 'Json', but was 'Csv'.")) + .expectErrorMatches(e -> e instanceof IngestionClientException + && e.getMessage().contains("Wrong ingestion mapping for format 'json'; mapping kind should be 'Json', but was 'Csv'.")) .verify(); } @@ -267,9 +266,8 @@ void ingestFromStreamAsync_AvroWrongMappingKind_IngestionClientException() { ingestionProperties.setDataFormat(IngestionProperties.DataFormat.AVRO); ingestionProperties.setIngestionMapping("CsvMapping", IngestionMapping.IngestionMappingKind.CSV); StepVerifier.create(streamingIngestClient.ingestFromStreamAsync(streamSourceInfo, ingestionProperties)) - .expectErrorMatches(e -> - e instanceof IngestionClientException - && e.getMessage().contains("Wrong ingestion mapping for format 'avro'; mapping kind should be 'Avro', but was 'Csv'.")) + .expectErrorMatches(e -> e instanceof IngestionClientException + && e.getMessage().contains("Wrong ingestion mapping for format 'avro'; mapping kind should be 'Avro', but was 'Csv'.")) .verify(); } @@ -278,8 +276,7 @@ void ingestFromStreamAsync_EmptyStream_IngestionClientException() { InputStream inputStream = new ByteArrayInputStream(new byte[0]); StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream); StepVerifier.create(streamingIngestClient.ingestFromStreamAsync(streamSourceInfo, ingestionProperties)) - .expectErrorMatches(e -> - e instanceof IngestionClientException && e.getMessage().contains("Empty stream.")) + .expectErrorMatches(e -> e instanceof IngestionClientException && e.getMessage().contains("Empty stream.")) .verify(); } @@ -292,9 +289,8 @@ void ingestFromStreamAsync_CaughtDataClientException_IngestionClientException() InputStream inputStream = new ByteArrayInputStream(StandardCharsets.UTF_8.encode(data).array()); StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream); StepVerifier.create(streamingIngestClient.ingestFromStreamAsync(streamSourceInfo, ingestionProperties)) - .expectErrorMatches(e -> - e instanceof IngestionClientException - && "DataClientException".equals(e.getMessage())) + .expectErrorMatches(e -> e instanceof IngestionClientException + && "DataClientException".equals(e.getMessage())) .verify(); } @@ -302,15 +298,14 @@ void ingestFromStreamAsync_CaughtDataClientException_IngestionClientException() void ingestFromStream_CaughtDataServiceException_IngestionServiceException() throws Exception { when(streamingClientMock.executeStreamingIngestAsync(any(String.class), any(String.class), any(InputStream.class), isNull(), any(String.class), isNull(), any(boolean.class))) - .thenReturn(Mono.error(new DataServiceException("ingestFromStream", "DataServiceException", true))); + .thenReturn(Mono.error(new DataServiceException("ingestFromStream", "DataServiceException", true))); String data = "Name, Age, Weight, Height"; InputStream inputStream = new ByteArrayInputStream(StandardCharsets.UTF_8.encode(data).array()); StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream); StepVerifier.create(streamingIngestClient.ingestFromStreamAsync(streamSourceInfo, ingestionProperties)) - .expectErrorMatches(e -> - e instanceof IngestionServiceException - && "DataServiceException".equals(e.getMessage())) + .expectErrorMatches(e -> e instanceof IngestionServiceException + && "DataServiceException".equals(e.getMessage())) .verify(); } @@ -416,9 +411,8 @@ void ingestFromFileAsync_JsonWrongMappingKind_IngestionClientException() { ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON); ingestionProperties.setIngestionMapping("CsvMapping", IngestionMapping.IngestionMappingKind.CSV); StepVerifier.create(streamingIngestClient.ingestFromFileAsync(fileSourceInfo, ingestionProperties)) - .expectErrorMatches(e -> - e instanceof IngestionClientException - && e.getMessage().contains("Wrong ingestion mapping for format 'json'; mapping kind should be 'Json', but was 'Csv'.")) + .expectErrorMatches(e -> e instanceof IngestionClientException + && e.getMessage().contains("Wrong ingestion mapping for format 'json'; mapping kind should be 'Json', but was 'Csv'.")) .verify(); } @@ -436,8 +430,7 @@ void ingestFromFileAsync_EmptyFile_IngestionClientException() { String path = resourcesDirectory + "empty.csv"; FileSourceInfo fileSourceInfo = new FileSourceInfo(path, new File(path).length()); StepVerifier.create(streamingIngestClient.ingestFromFileAsync(fileSourceInfo, ingestionProperties)) - .expectErrorMatches(e -> - e instanceof IngestionClientException && e.getMessage().contains("Empty file.")) + .expectErrorMatches(e -> e instanceof IngestionClientException && e.getMessage().contains("Empty file.")) .verify(); } @@ -511,9 +504,8 @@ void ingestFromBlobAsync_InvalidBlobPath_IngestionClientException() { String path = "wrongURI"; BlobSourceInfo blobSourceInfo1 = new BlobSourceInfo(path); StepVerifier.create(streamingIngestClient.ingestFromBlobAsync(blobSourceInfo1, ingestionProperties)) - .expectErrorMatches(e -> - e instanceof IngestionClientException - && e.getMessage().contains("Unexpected error when ingesting a blob - Invalid blob path.")) + .expectErrorMatches(e -> e instanceof IngestionClientException + && e.getMessage().contains("Unexpected error when ingesting a blob - Invalid blob path.")) .verify(); } @@ -522,9 +514,8 @@ void ingestFromBlobAsync_BlobNotFound_IngestionClientException() { String path = "https://kustotest.blob.core.windows.net/container/blob.csv"; BlobSourceInfo blobSourceInfo2 = new BlobSourceInfo(path); StepVerifier.create(streamingIngestClient.ingestFromBlobAsync(blobSourceInfo2, ingestionProperties)) - .expectErrorMatches(e -> - e instanceof IngestionClientException - && e.getMessage().contains("Exception trying to read blob metadata")) + .expectErrorMatches(e -> e instanceof IngestionClientException + && e.getMessage().contains("Exception trying to read blob metadata")) .verify(); } @@ -539,9 +530,8 @@ void ingestFromBlob_EmptyBlob_IngestClientException() { when(cloudBlockBlob.getProperties()).thenReturn(blobProperties); StepVerifier.create(streamingIngestClient.ingestFromBlobAsync(blobSourceInfo, ingestionProperties, cloudBlockBlob, null)) - .expectErrorMatches(e -> - e instanceof IngestionClientException - && e.getMessage().contains("Empty blob.")) + .expectErrorMatches(e -> e instanceof IngestionClientException + && e.getMessage().contains("Empty blob.")) .verify(); } @@ -608,9 +598,8 @@ void ingestFromResultSet_EmptyResultSet_IngestionClientException() throws Except ResultSetSourceInfo resultSetSourceInfo = new ResultSetSourceInfo(resultSet); StepVerifier.create(streamingIngestClient.ingestFromResultSetAsync(resultSetSourceInfo, ingestionProperties)) - .expectErrorMatches(e -> - e instanceof IngestionClientException - && e.getMessage().contains("Empty ResultSet.")) + .expectErrorMatches(e -> e instanceof IngestionClientException + && e.getMessage().contains("Empty ResultSet.")) .verify(); } diff --git a/quickstart/src/main/java/com/microsoft/azure/kusto/quickstart/Utils.java b/quickstart/src/main/java/com/microsoft/azure/kusto/quickstart/Utils.java index 6832ad03..10c0713a 100644 --- a/quickstart/src/main/java/com/microsoft/azure/kusto/quickstart/Utils.java +++ b/quickstart/src/main/java/com/microsoft/azure/kusto/quickstart/Utils.java @@ -197,7 +197,7 @@ protected static class Ingestion { */ @NotNull protected static IngestionProperties createIngestionProperties(String databaseName, String tableName, IngestionProperties.DataFormat dataFormat, - String mappingName, boolean ignoreFirstRecord) { + String mappingName, boolean ignoreFirstRecord) { IngestionProperties ingestionProperties = new IngestionProperties(databaseName, tableName); ingestionProperties.setDataFormat(dataFormat); // Learn More: For more information about supported data formats, see: https://docs.microsoft.com/azure/data-explorer/ingestion-supported-formats @@ -226,7 +226,7 @@ protected static IngestionProperties createIngestionProperties(String databaseNa * @param ignoreFirstRecord Flag noting whether to ignore the first record in the table */ protected static void ingestFromFile(IngestClient ingestClient, String databaseName, String tableName, String filePath, - IngestionProperties.DataFormat dataFormat, String mappingName, boolean ignoreFirstRecord) { + IngestionProperties.DataFormat dataFormat, String mappingName, boolean ignoreFirstRecord) { IngestionProperties ingestionProperties = createIngestionProperties(databaseName, tableName, dataFormat, mappingName, ignoreFirstRecord); // Tip 1: For optimal ingestion batching and performance, specify the uncompressed data size in the file descriptor (e.g. fileToIngest.length()) @@ -250,7 +250,7 @@ protected static void ingestFromFile(IngestClient ingestClient, String databaseN * @param ignoreFirstRecord Flag noting whether to ignore the first record in the table */ protected static void ingestFromBlob(IngestClient ingestClient, String databaseName, String tableName, String blobUrl, - IngestionProperties.DataFormat dataFormat, String mappingName, boolean ignoreFirstRecord) { + IngestionProperties.DataFormat dataFormat, String mappingName, boolean ignoreFirstRecord) { IngestionProperties ingestionProperties = createIngestionProperties(databaseName, tableName, dataFormat, mappingName, ignoreFirstRecord); // Tip 1: For optimal ingestion batching and performance,specify the uncompressed data size in the file descriptor instead of the default below of 0