diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java b/data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java index 1ee7180d..43bf956c 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java @@ -243,9 +243,7 @@ public KustoOperationResult executeStreamingIngest(String database, String table @Override public Mono executeStreamingIngestAsync(String database, String table, InputStream stream, ClientRequestProperties properties, String streamFormat, String mappingName, boolean leaveOpen) { - if (stream == null) { - throw new IllegalArgumentException("The provided stream is null."); - } + Ensure.argIsNotNull(stream, "stream"); return Mono.defer(() -> { String clusterEndpoint = buildClusterEndpoint(database, table, streamFormat, mappingName); @@ -262,9 +260,7 @@ public KustoOperationResult executeStreamingIngestFromBlob(String database, Stri @Override public Mono executeStreamingIngestFromBlobAsync(String database, String table, String blobUrl, ClientRequestProperties properties, String dataFormat, String mappingName) { - if (blobUrl == null) { - throw new IllegalArgumentException("The provided blobUrl is null."); - } + Ensure.argIsNotNull(blobUrl, "blobUrl"); return Mono.defer(() -> { String clusterEndpoint = buildClusterEndpoint(database, table, dataFormat, mappingName) @@ -299,38 +295,37 @@ private Mono executeStreamingIngest(String clusterEndpoint } } - return Mono.fromCallable(() -> { - BinaryData data; - if (isStreamSource) { - // We use UncloseableStream to prevent HttpClient from closing the stream - data = BinaryData.fromStream(new UncloseableStream(stream)); - } else { - data = BinaryData.fromString(new IngestionSourceStorage(blobUrl).toString()); - } + BinaryData data; + if (isStreamSource) { + // We use UncloseableStream to prevent HttpClient from closing the stream + data = BinaryData.fromStream(new UncloseableStream(stream)); + } else { + data = BinaryData.fromString(new IngestionSourceStorage(blobUrl).toString()); + } - HttpTracing tracing = HttpTracing - .newBuilder() - .withProperties(properties) - .withRequestPrefix("KJC.executeStreamingIngest" + (isStreamSource ? "" : "FromBlob")) - .withActivitySuffix(CommandType.STREAMING_INGEST.getActivityTypeSuffix()) - .withClientDetails(clientDetails) - .build(); - - return HttpRequestBuilder - .newPost(clusterEndpoint) - .withTracing(tracing) - .withHeaders(headers) - .withAuthorization(authorizationToken) - .withContentType(contentType) - .withContentEncoding(contentEncoding) - .withBody(data) - .build(); - }).flatMap(httpRequest -> MonitoredActivity.wrap(postAsync(httpRequest, timeoutMs), "ClientImpl.executeStreamingIngest") + HttpTracing tracing = HttpTracing + .newBuilder() + .withProperties(properties) + .withRequestPrefix("KJC.executeStreamingIngest" + (isStreamSource ? "" : "FromBlob")) + .withActivitySuffix(CommandType.STREAMING_INGEST.getActivityTypeSuffix()) + .withClientDetails(clientDetails) + .build(); + + HttpRequest httpRequest = HttpRequestBuilder + .newPost(clusterEndpoint) + .withTracing(tracing) + .withHeaders(headers) + .withAuthorization(authorizationToken) + .withContentType(contentType) + .withContentEncoding(contentEncoding) + .withBody(data) + .build(); + return MonitoredActivity.wrap(postAsync(httpRequest, timeoutMs), "ClientImpl.executeStreamingIngest") .flatMap(response -> Mono.fromCallable(() -> new KustoOperationResult(response, "v1")).subscribeOn(Schedulers.boundedElastic())) .onErrorMap(KustoServiceQueryError.class, e -> new DataClientException(clusterEndpoint, "Error converting json response to KustoOperationResult:" + e.getMessage(), e)) - .onErrorMap(IOException.class, e -> new DataClientException(clusterUrl, e.getMessage(), e))) + .onErrorMap(IOException.class, e -> new DataClientException(clusterUrl, e.getMessage(), e)) .doFinally(signalType -> { if (isStreamSource && !leaveOpen) { try { diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/auth/CloudInfo.java b/data/src/main/java/com/microsoft/azure/kusto/data/auth/CloudInfo.java index db2210b7..d6649360 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/auth/CloudInfo.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/auth/CloudInfo.java @@ -35,6 +35,8 @@ import reactor.core.Exceptions; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; import reactor.util.retry.Retry; public class CloudInfo implements TraceableAttributes, Serializable { @@ -90,23 +92,31 @@ public static CloudInfo retrieveCloudInfoForCluster(String clusterUrl) { public static Mono retrieveCloudInfoForClusterAsync(String clusterUrl, @Nullable HttpClient givenHttpClient) { - // Ensure that if multiple threads request the cloud info for the same cluster url, only one http call will be made + // We ensure that if multiple threads request the cloud info for the same cluster url, only one http call will be made // for all corresponding threads - return Mono.fromCallable(() -> new UrlPair(UriUtils.setPathForUri(clusterUrl, ""), UriUtils.setPathForUri(clusterUrl, METADATA_ENDPOINT))) - .flatMap(urls -> cache.computeIfAbsent(urls.clusterEndpoint, key -> fetchCloudInfoAsync(urls, givenHttpClient) - .retryWhen(RETRY_CONFIG) - .onErrorMap(e -> ExceptionUtils.unwrapCloudInfoException(urls.clusterEndpoint, e)))); + try { + Tuple2 clusterUrls = Tuples.of( + UriUtils.setPathForUri(clusterUrl, ""), // Cluster endpoint + UriUtils.setPathForUri(clusterUrl, METADATA_ENDPOINT) // Metadata endpoint is always on the root of the cluster + ); + return cache.computeIfAbsent(clusterUrls.getT1(), key -> fetchCloudInfoAsync(clusterUrls, givenHttpClient) + .retryWhen(RETRY_CONFIG) + .onErrorMap(e -> ExceptionUtils.unwrapCloudInfoException(clusterUrls.getT1(), e))); + } catch (URISyntaxException e) { + throw new DataServiceException(clusterUrl, + "URISyntaxException when trying to retrieve cluster metadata: " + e.getMessage(), e, true); + } } - private static Mono fetchCloudInfoAsync(UrlPair clusterUrls, @Nullable HttpClient givenHttpClient) { + private static Mono fetchCloudInfoAsync(Tuple2 clusterUrls, @Nullable HttpClient givenHttpClient) { HttpClient localHttpClient = givenHttpClient == null ? HttpClientFactory.create(null) : givenHttpClient; - HttpRequest request = new HttpRequest(HttpMethod.GET, clusterUrls.metadataEndpoint); + HttpRequest request = new HttpRequest(HttpMethod.GET, clusterUrls.getT2()); request.setHeader(HttpHeaderName.ACCEPT_ENCODING, "gzip,deflate"); request.setHeader(HttpHeaderName.ACCEPT, "application/json"); return MonitoredActivity.wrap(localHttpClient.send(request, RequestUtils.contextWithTimeout(CLOUD_INFO_TIMEOUT)), "CloudInfo.httpCall") - .flatMap(response -> getCloudInfo(response, clusterUrls.clusterEndpoint)) + .flatMap(response -> getCloudInfo(response, clusterUrls.getT1())) .doFinally(ignore -> { if (givenHttpClient == null && localHttpClient instanceof Closeable) { try { @@ -225,13 +235,4 @@ public String determineScope() throws URISyntaxException { return resourceUrl + ".default"; } - private static class UrlPair { - final String clusterEndpoint; - final String metadataEndpoint; - - UrlPair(String clusterEndpoint, String metadataEndpoint) { - this.clusterEndpoint = clusterEndpoint; - this.metadataEndpoint = metadataEndpoint; - } - } } diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/exceptions/ExceptionUtils.java b/data/src/main/java/com/microsoft/azure/kusto/data/exceptions/ExceptionUtils.java index 3910d757..c3091ee9 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/exceptions/ExceptionUtils.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/exceptions/ExceptionUtils.java @@ -2,7 +2,6 @@ import java.io.IOException; import java.io.UncheckedIOException; -import java.net.URISyntaxException; import java.net.URL; import com.microsoft.azure.kusto.data.Utils; @@ -34,11 +33,6 @@ public static String getMessageEx(Exception e) { } public static Exception unwrapCloudInfoException(String clusterUrl, Throwable throwable) { - if (throwable instanceof URISyntaxException) { - return new DataServiceException(clusterUrl, "URISyntaxException when trying to retrieve cluster metadata: " + throwable.getMessage(), - (URISyntaxException) throwable, true); - } - if (throwable instanceof IOException) { IOException ex = (IOException) throwable; if (!Utils.isRetriableIOException(ex)) {