Skip to content

Commit

Permalink
fix resource action with retries
Browse files Browse the repository at this point in the history
  • Loading branch information
georgebanasios committed Nov 30, 2024
1 parent a288d43 commit f34d3a4
Show file tree
Hide file tree
Showing 9 changed files with 331 additions and 306 deletions.
102 changes: 51 additions & 51 deletions data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -302,59 +302,59 @@ private Mono<KustoOperationResult> executeStreamingIngest(String clusterEndpoint
String contentEncoding = isStreamSource ? "gzip" : null;
String contentType = isStreamSource ? "application/octet-stream" : "application/json";

try {
long timeoutMs = determineTimeout(properties, CommandType.STREAMING_INGEST, clusterUrl);

// This was a separate method but was moved into the body of this method because it performs a side effect
if (properties != null) {
Iterator<Map.Entry<String, Object>> iterator = properties.getOptions();
while (iterator.hasNext()) {
Map.Entry<String, Object> pair = iterator.next();
headers.put(pair.getKey(), pair.getValue().toString());
}
}
return Mono.fromCallable(() -> determineTimeout(properties, CommandType.STREAMING_INGEST, clusterUrl)) // Step 1: Determine timeout
.flatMap(timeoutMs -> {

// This was a separate method but was moved into the body of this method because it performs a side effect
if (properties != null) {
Iterator<Map.Entry<String, Object>> iterator = properties.getOptions();
while (iterator.hasNext()) {
Map.Entry<String, Object> pair = iterator.next();
headers.put(pair.getKey(), pair.getValue().toString());
}
}

try (InputStream ignored = (isStreamSource && !leaveOpen) ? stream : null) {
BinaryData data;

if (isStreamSource) {

// We use UncloseableStream to prevent HttpClient From closing it
data = BinaryData.fromStream(new UncloseableStream(stream));
} else {
data = BinaryData.fromString(new IngestionSourceStorage(blobUrl).toString());
}

HttpTracing tracing = HttpTracing
.newBuilder()
.withProperties(properties)
.withRequestPrefix("KJC.executeStreamingIngest" + (isStreamSource ? "" : "FromBlob"))
.withActivitySuffix(CommandType.STREAMING_INGEST.getActivityTypeSuffix())
.withClientDetails(clientDetails)
.build();

// Build the HTTP request. Since this is an ingestion and not a command, content headers aren't auto-applied.
HttpRequest request = HttpRequestBuilder
.newPost(clusterEndpoint)
.withTracing(tracing)
.withHeaders(headers)
.withAuthorization(authorizationToken)
.withContentType(contentType)
.withContentEncoding(contentEncoding)
.withBody(data)
.build();

// Get the response, and trace the call.
return MonitoredActivity.wrap(postAsync(request, timeoutMs), "ClientImpl.executeStreamingIngest")
.flatMap(response ->
Mono.fromCallable(() -> new KustoOperationResult(response, "v1")))
.onErrorMap(KustoServiceQueryError.class, e -> new DataClientException(clusterEndpoint, "Error converting json response to KustoOperationResult:" + e.getMessage(), e))
.onErrorMap(IOException.class, e -> new DataClientException(clusterUrl, e.getMessage(), e));
}
} catch (Exception e) {
return Mono.error(e);
}
return Mono.fromCallable(() -> {
BinaryData data;
if (isStreamSource) {
// We use UncloseableStream to prevent HttpClient from closing the stream
data = BinaryData.fromStream(new UncloseableStream(stream));
} else {
data = BinaryData.fromString(new IngestionSourceStorage(blobUrl).toString());
}

HttpTracing tracing = HttpTracing
.newBuilder()
.withProperties(properties)
.withRequestPrefix("KJC.executeStreamingIngest" + (isStreamSource ? "" : "FromBlob"))
.withActivitySuffix(CommandType.STREAMING_INGEST.getActivityTypeSuffix())
.withClientDetails(clientDetails)
.build();

return HttpRequestBuilder
.newPost(clusterEndpoint)
.withTracing(tracing)
.withHeaders(headers)
.withAuthorization(authorizationToken)
.withContentType(contentType)
.withContentEncoding(contentEncoding)
.withBody(data)
.build();
}).flatMap(httpRequest -> MonitoredActivity.wrap(postAsync(httpRequest, timeoutMs), "ClientImpl.executeStreamingIngest")
.flatMap(response ->
Mono.fromCallable(() -> new KustoOperationResult(response, "v1")))
.onErrorMap(KustoServiceQueryError.class, e -> new DataClientException(clusterEndpoint, "Error converting json response to KustoOperationResult:" + e.getMessage(), e))
.onErrorMap(IOException.class, e -> new DataClientException(clusterUrl, e.getMessage(), e)));
})
.doFinally(signalType -> {
if (isStreamSource && !leaveOpen) {
try {
stream.close();
} catch (IOException e) {
LOGGER.debug("executeStreamingIngest: Error while closing the stream.", e);
}
}
});
}

private String buildClusterEndpoint(String database, String table, String format, String mappingName) {
Expand Down
3 changes: 2 additions & 1 deletion data/src/main/java/com/microsoft/azure/kusto/data/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.github.resilience4j.retry.RetryConfig;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import javax.net.ssl.SSLException;
import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -154,7 +155,7 @@ private static Mono<String> processGzipBody(Flux<ByteBuffer> body) {
try (GZIPInputStream gzipStream = new GZIPInputStream(new SequenceInputStream(Collections.enumeration(inputStreams)))) {
return readStreamToString(gzipStream);
}
})
}).subscribeOn(Schedulers.boundedElastic()) //TODO: same as ingest module
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ public static <T> Mono<T> wrap(Mono<T> mono, String nameOfSpan, Map<String, Stri
.flatMap(span -> mono.doOnTerminate(span::close));
}

public static <T, U extends Exception> Mono<T> invokeAsync(FunctionOneException<Mono<T>, Tracer.Span, U> function, String nameOfSpan, Map<String, String> attributes) {
public static <T, U extends Exception> Mono<T> invokeAsync(FunctionOneException<Mono<T>, Tracer.Span, U> function,
String nameOfSpan,
Map<String, String> attributes) {
return Mono.defer(() -> {
Tracer.Span span = Tracer.startSpan(nameOfSpan, attributes);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,11 @@ static boolean isReservedHostname(String rawUri) {
}

protected abstract IngestionResult ingestFromFileImpl(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties)
throws IngestionClientException, IngestionServiceException;
throws IngestionClientException, IngestionServiceException; //TODO: remove

public IngestionResult ingestFromFile(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties)
throws IngestionClientException, IngestionServiceException {
// trace ingestFromFile
return MonitoredActivity.invoke(
(SupplierTwoExceptions<IngestionResult, IngestionClientException, IngestionServiceException>) () -> ingestFromFileImpl(fileSourceInfo,
ingestionProperties),
getClientType().concat(".ingestFromFile"));
return ingestFromFileAsync(fileSourceInfo, ingestionProperties).block();
}

protected abstract Mono<IngestionResult> ingestFromFileAsyncImpl(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,20 +295,20 @@ protected IngestionResult ingestFromFileImpl(FileSourceInfo fileSourceInfo, Inge

@Override
protected Mono<IngestionResult> ingestFromFileAsyncImpl(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) {
Ensure.argIsNotNull(fileSourceInfo, "fileSourceInfo");
Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");

try {
fileSourceInfo.validate();
ingestionProperties.validate();
StreamSourceInfo streamSourceInfo = IngestionUtils.fileToStream(fileSourceInfo, true, ingestionProperties.getDataFormat());
return ingestFromStreamAsync(streamSourceInfo, ingestionProperties);
} catch (FileNotFoundException e) {
log.error("File not found when ingesting a file.", e);
return Mono.error(new IngestionClientException("IO exception - check file path.", e));
} catch (Exception e) {
return Mono.error(e);
}
return Mono.fromCallable(() -> {
Ensure.argIsNotNull(fileSourceInfo, "fileSourceInfo");
Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
fileSourceInfo.validate();
ingestionProperties.validate();
return true;
})
.flatMap(validInput -> Mono.fromCallable(() -> IngestionUtils.fileToStream(fileSourceInfo, true, ingestionProperties.getDataFormat()))
.flatMap(streamSourceInfo -> ingestFromStreamAsync(streamSourceInfo, ingestionProperties)))
.onErrorMap(FileNotFoundException.class, e -> {
log.error("File not found when ingesting a file.", e);
return new IngestionClientException("IO exception - check file path.", e);
});
}

/**
Expand Down Expand Up @@ -357,10 +357,10 @@ protected IngestionResult ingestFromBlobImpl(BlobSourceInfo blobSourceInfo, Inge

@Override
protected Mono<IngestionResult> ingestFromBlobAsyncImpl(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) {
Ensure.argIsNotNull(blobSourceInfo, "blobSourceInfo");
Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");

return Mono.fromCallable(() -> {
Ensure.argIsNotNull(blobSourceInfo, "blobSourceInfo");
Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
blobSourceInfo.validate();
ingestionProperties.validate();
return true;
Expand Down Expand Up @@ -507,22 +507,20 @@ protected IngestionResult ingestFromResultSetImpl(ResultSetSourceInfo resultSetS

@Override
protected Mono<IngestionResult> ingestFromResultSetAsyncImpl(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) {
Ensure.argIsNotNull(resultSetSourceInfo, "resultSetSourceInfo");
Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");

try {
resultSetSourceInfo.validate();
ingestionProperties.validateResultSetProperties();

StreamSourceInfo streamSourceInfo = IngestionUtils.resultSetToStream(resultSetSourceInfo);
return ingestFromStreamAsync(streamSourceInfo, ingestionProperties);
} catch (IOException ex) {
String msg = "Failed to read from ResultSet.";
log.error(msg, ex);
return Mono.error(new IngestionClientException(msg, ex));
} catch (Exception e) {
return Mono.error(e);
}
return Mono.fromCallable(() -> {
Ensure.argIsNotNull(resultSetSourceInfo, "resultSetSourceInfo");
Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
resultSetSourceInfo.validate();
ingestionProperties.validateResultSetProperties();
return IngestionUtils.resultSetToStream(resultSetSourceInfo);
})
.onErrorMap(IOException.class, e -> {
String msg = "Failed to read from ResultSet.";
log.error(msg, e);
return new IngestionClientException(msg, e);
})
.flatMap(streamSourceInfo -> ingestFromStreamAsync(streamSourceInfo, ingestionProperties));
}

@Override
Expand Down Expand Up @@ -601,7 +599,7 @@ protected IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo

@Override
protected Mono<IngestionResult> ingestFromStreamAsyncImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) {
return null;
return null; //TODO: implement async
}

/*
Expand Down
Loading

0 comments on commit f34d3a4

Please sign in to comment.