Skip to content

Commit

Permalink
better error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
georgebanasios committed Dec 24, 2024
1 parent cc87828 commit 509a1c2
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 50 deletions.
53 changes: 26 additions & 27 deletions data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -299,38 +299,37 @@ private Mono<KustoOperationResult> executeStreamingIngest(String clusterEndpoint
}
}

return Mono.fromCallable(() -> {
BinaryData data;
if (isStreamSource) {
// We use UncloseableStream to prevent HttpClient from closing the stream
data = BinaryData.fromStream(new UncloseableStream(stream));
} else {
data = BinaryData.fromString(new IngestionSourceStorage(blobUrl).toString());
}
BinaryData data;
if (isStreamSource) {
// We use UncloseableStream to prevent HttpClient from closing the stream
data = BinaryData.fromStream(new UncloseableStream(stream));
} else {
data = BinaryData.fromString(new IngestionSourceStorage(blobUrl).toString());
}

HttpTracing tracing = HttpTracing
.newBuilder()
.withProperties(properties)
.withRequestPrefix("KJC.executeStreamingIngest" + (isStreamSource ? "" : "FromBlob"))
.withActivitySuffix(CommandType.STREAMING_INGEST.getActivityTypeSuffix())
.withClientDetails(clientDetails)
.build();

return HttpRequestBuilder
.newPost(clusterEndpoint)
.withTracing(tracing)
.withHeaders(headers)
.withAuthorization(authorizationToken)
.withContentType(contentType)
.withContentEncoding(contentEncoding)
.withBody(data)
.build();
}).flatMap(httpRequest -> MonitoredActivity.wrap(postAsync(httpRequest, timeoutMs), "ClientImpl.executeStreamingIngest")
HttpTracing tracing = HttpTracing
.newBuilder()
.withProperties(properties)
.withRequestPrefix("KJC.executeStreamingIngest" + (isStreamSource ? "" : "FromBlob"))
.withActivitySuffix(CommandType.STREAMING_INGEST.getActivityTypeSuffix())
.withClientDetails(clientDetails)
.build();

HttpRequest httpRequest = HttpRequestBuilder
.newPost(clusterEndpoint)
.withTracing(tracing)
.withHeaders(headers)
.withAuthorization(authorizationToken)
.withContentType(contentType)
.withContentEncoding(contentEncoding)
.withBody(data)
.build();
return MonitoredActivity.wrap(postAsync(httpRequest, timeoutMs), "ClientImpl.executeStreamingIngest")
.flatMap(response -> Mono.fromCallable(() -> new KustoOperationResult(response, "v1")).subscribeOn(Schedulers.boundedElastic()))
.onErrorMap(KustoServiceQueryError.class,
e -> new DataClientException(clusterEndpoint, "Error converting json response to KustoOperationResult:" + e.getMessage(),
e))
.onErrorMap(IOException.class, e -> new DataClientException(clusterUrl, e.getMessage(), e)))
.onErrorMap(IOException.class, e -> new DataClientException(clusterUrl, e.getMessage(), e))
.doFinally(signalType -> {
if (isStreamSource && !leaveOpen) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import reactor.util.retry.Retry;

public class CloudInfo implements TraceableAttributes, Serializable {
Expand Down Expand Up @@ -90,23 +92,31 @@ public static CloudInfo retrieveCloudInfoForCluster(String clusterUrl) {

public static Mono<CloudInfo> retrieveCloudInfoForClusterAsync(String clusterUrl, @Nullable HttpClient givenHttpClient) {

// Ensure that if multiple threads request the cloud info for the same cluster url, only one http call will be made
// We ensure that if multiple threads request the cloud info for the same cluster url, only one http call will be made
// for all corresponding threads
return Mono.fromCallable(() -> new UrlPair(UriUtils.setPathForUri(clusterUrl, ""), UriUtils.setPathForUri(clusterUrl, METADATA_ENDPOINT)))
.flatMap(urls -> cache.computeIfAbsent(urls.clusterEndpoint, key -> fetchCloudInfoAsync(urls, givenHttpClient)
.retryWhen(RETRY_CONFIG)
.onErrorMap(e -> ExceptionUtils.unwrapCloudInfoException(urls.clusterEndpoint, e))));
try {
Tuple2<String, String> clusterUrls = Tuples.of(
UriUtils.setPathForUri(clusterUrl, ""), // Cluster endpoint
UriUtils.setPathForUri(clusterUrl, METADATA_ENDPOINT) // Metadata endpoint is always on the root of the cluster
);
return cache.computeIfAbsent(clusterUrls.getT1(), key -> fetchCloudInfoAsync(clusterUrls, givenHttpClient)
.retryWhen(RETRY_CONFIG)
.onErrorMap(e -> ExceptionUtils.unwrapCloudInfoException(clusterUrls.getT1(), e)));
} catch (URISyntaxException e) {
throw new DataServiceException(clusterUrl,
"URISyntaxException when trying to retrieve cluster metadata: " + e.getMessage(), e, true);
}
}

private static Mono<CloudInfo> fetchCloudInfoAsync(UrlPair clusterUrls, @Nullable HttpClient givenHttpClient) {
private static Mono<CloudInfo> fetchCloudInfoAsync(Tuple2<String, String> clusterUrls, @Nullable HttpClient givenHttpClient) {
HttpClient localHttpClient = givenHttpClient == null ? HttpClientFactory.create(null) : givenHttpClient;
HttpRequest request = new HttpRequest(HttpMethod.GET, clusterUrls.metadataEndpoint);
HttpRequest request = new HttpRequest(HttpMethod.GET, clusterUrls.getT2());
request.setHeader(HttpHeaderName.ACCEPT_ENCODING, "gzip,deflate");
request.setHeader(HttpHeaderName.ACCEPT, "application/json");

return MonitoredActivity.wrap(localHttpClient.send(request, RequestUtils.contextWithTimeout(CLOUD_INFO_TIMEOUT)),
"CloudInfo.httpCall")
.flatMap(response -> getCloudInfo(response, clusterUrls.clusterEndpoint))
.flatMap(response -> getCloudInfo(response, clusterUrls.getT1()))
.doFinally(ignore -> {
if (givenHttpClient == null && localHttpClient instanceof Closeable) {
try {
Expand Down Expand Up @@ -225,13 +235,4 @@ public String determineScope() throws URISyntaxException {
return resourceUrl + ".default";
}

private static class UrlPair {
final String clusterEndpoint;
final String metadataEndpoint;

UrlPair(String clusterEndpoint, String metadataEndpoint) {
this.clusterEndpoint = clusterEndpoint;
this.metadataEndpoint = metadataEndpoint;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URISyntaxException;
import java.net.URL;

import com.microsoft.azure.kusto.data.Utils;
Expand Down Expand Up @@ -34,11 +33,6 @@ public static String getMessageEx(Exception e) {
}

public static Exception unwrapCloudInfoException(String clusterUrl, Throwable throwable) {
if (throwable instanceof URISyntaxException) {
return new DataServiceException(clusterUrl, "URISyntaxException when trying to retrieve cluster metadata: " + throwable.getMessage(),
(URISyntaxException) throwable, true);
}

if (throwable instanceof IOException) {
IOException ex = (IOException) throwable;
if (!Utils.isRetriableIOException(ex)) {
Expand Down

0 comments on commit 509a1c2

Please sign in to comment.