Skip to content

Commit

Permalink
improves cloud info retrieval & kusto formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
georgebanasios committed Dec 4, 2024
1 parent 7894680 commit 46a3e92
Show file tree
Hide file tree
Showing 22 changed files with 452 additions and 468 deletions.
19 changes: 9 additions & 10 deletions data/src/main/java/com/microsoft/azure/kusto/data/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,10 @@ public BaseClient(HttpClient httpClient) {

protected Mono<String> postAsync(HttpRequest request, long timeoutMs) {
return httpClient.send(request, getContextTimeout(timeoutMs))
.flatMap(response ->
Mono.using(
() -> response,
this::processResponseBodyAsync,
HttpResponse::close
))
.flatMap(response -> Mono.using(
() -> response,
this::processResponseBodyAsync,
HttpResponse::close))
.onErrorMap(e -> {
if (e instanceof DataServiceException) {
return e;
Expand All @@ -82,19 +80,20 @@ public Mono<String> processResponseBodyAsync(HttpResponse response) {
}

protected Mono<InputStream> postToStreamingOutputAsync(HttpRequest request, long timeoutMs,
int currentRedirectCounter, int maxRedirectCount) {
int currentRedirectCounter, int maxRedirectCount) {
ResponseState state = new ResponseState();
return httpClient.send(request, getContextTimeout(timeoutMs))
.flatMap(httpResponse -> processHttpResponse(httpResponse, state, request, timeoutMs, currentRedirectCounter, maxRedirectCount))
.onErrorMap(IOException.class, e -> new DataServiceException(request.getUrl().toString(),
"postToStreamingOutput failed to get or decompress response stream", e, false))
.onErrorMap(UncheckedIOException.class, e -> ExceptionUtils.createExceptionOnPost(e, request.getUrl(), "streaming async"))
.onErrorMap(Exception.class, e -> createExceptionFromResponse(request.getUrl().toString(), state.getHttpResponse(), e, state.getErrorFromResponse()))
.onErrorMap(Exception.class,
e -> createExceptionFromResponse(request.getUrl().toString(), state.getHttpResponse(), e, state.getErrorFromResponse()))
.doFinally(ignored -> closeResourcesIfNeeded(state.isReturnInputStream(), state.getHttpResponse()));
}

private Mono<InputStream> processHttpResponse(HttpResponse httpResponse, ResponseState state, HttpRequest request,
long timeoutMs, int currentRedirectCounter, int maxRedirectCount) {
long timeoutMs, int currentRedirectCounter, int maxRedirectCount) {
try {
state.setHttpResponse(httpResponse);
int responseStatusCode = httpResponse.getStatusCode();
Expand Down Expand Up @@ -122,7 +121,7 @@ private static Mono<InputStream> handleSuccessfulResponse(HttpResponse httpRespo
}

private Mono<InputStream> handleErrorResponse(HttpResponse httpResponse, ResponseState state, HttpRequest request,
long timeoutMs, int currentRedirectCounter, int maxRedirectCount) {
long timeoutMs, int currentRedirectCounter, int maxRedirectCount) {
return httpResponse.getBodyAsByteArray()
.flatMap(bytes -> {
try {
Expand Down
37 changes: 18 additions & 19 deletions data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,7 @@ Mono<KustoRequestContext> prepareRequestAsync(@NotNull KustoRequest kr) {
HttpTracing tracing = buildTracing(kr);

return validateEndpointAsync()
.then(authorization.flatMap(token ->
buildKustoRequestContext(kr, clusterEndpoint, tracing, token))
.then(authorization.flatMap(token -> buildKustoRequestContext(kr, clusterEndpoint, tracing, token))
.switchIfEmpty(buildKustoRequestContext(kr, clusterEndpoint, tracing, null)));
}

Expand Down Expand Up @@ -236,26 +235,26 @@ private Mono<Void> validateEndpointAsync() {
if (endpointValidated) {
return Mono.empty();
}

return CloudInfo.retrieveCloudInfoForClusterAsync(clusterUrl, null)
.map(CloudInfo::getLoginEndpoint)
.flatMap(loginEndpoint -> Mono.fromCallable(() -> {
KustoTrustedEndpoints.validateTrustedEndpoint(clusterUrl, loginEndpoint);
return true;
}
))
KustoTrustedEndpoints.validateTrustedEndpoint(clusterUrl, loginEndpoint);
return true;
}))
.doOnSuccess(ignored -> endpointValidated = true)
.then();
}

@Override
public KustoOperationResult executeStreamingIngest(String database, String table, InputStream stream, ClientRequestProperties properties,
String streamFormat, String mappingName, boolean leaveOpen) {
String streamFormat, String mappingName, boolean leaveOpen) {
return executeStreamingIngestAsync(database, table, stream, properties, streamFormat, mappingName, leaveOpen).block();
}

@Override
public Mono<KustoOperationResult> executeStreamingIngestAsync(String database, String table, InputStream stream, ClientRequestProperties properties,
String streamFormat, String mappingName, boolean leaveOpen) {
String streamFormat, String mappingName, boolean leaveOpen) {
if (stream == null) {
return Mono.error(new IllegalArgumentException("The provided stream is null."));
}
Expand All @@ -268,13 +267,13 @@ public Mono<KustoOperationResult> executeStreamingIngestAsync(String database, S

@Override
public KustoOperationResult executeStreamingIngestFromBlob(String database, String table, String blobUrl, ClientRequestProperties properties,
String dataFormat, String mappingName) {
String dataFormat, String mappingName) {
return executeStreamingIngestFromBlobAsync(database, table, blobUrl, properties, dataFormat, mappingName).block();
}

@Override
public Mono<KustoOperationResult> executeStreamingIngestFromBlobAsync(String database, String table, String blobUrl, ClientRequestProperties properties,
String dataFormat, String mappingName) {
String dataFormat, String mappingName) {
if (blobUrl == null) {
return Mono.error(new IllegalArgumentException("The provided blobUrl is null."));
}
Expand All @@ -287,22 +286,21 @@ public Mono<KustoOperationResult> executeStreamingIngestFromBlobAsync(String dat
}

private Mono<KustoOperationResult> executeStreamingIngestImplAsync(String clusterEndpoint, InputStream stream, String blobUrl,
ClientRequestProperties properties, boolean leaveOpen) {
ClientRequestProperties properties, boolean leaveOpen) {
return validateEndpointAsync()
.then(getAuthorizationHeaderValueAsync()
.flatMap(token ->
executeStreamingIngest(clusterEndpoint, stream, blobUrl, properties, leaveOpen, token))
.flatMap(token -> executeStreamingIngest(clusterEndpoint, stream, blobUrl, properties, leaveOpen, token))
.switchIfEmpty(executeStreamingIngest(clusterEndpoint, stream, blobUrl, properties, leaveOpen, null)));
}

private Mono<KustoOperationResult> executeStreamingIngest(String clusterEndpoint, InputStream stream, String blobUrl,
ClientRequestProperties properties, boolean leaveOpen, String authorizationToken) {
ClientRequestProperties properties, boolean leaveOpen, String authorizationToken) {
boolean isStreamSource = stream != null;
Map<String, String> headers = new HashMap<>();
String contentEncoding = isStreamSource ? "gzip" : null;
String contentType = isStreamSource ? "application/octet-stream" : "application/json";

return Mono.fromCallable(() -> determineTimeout(properties, CommandType.STREAMING_INGEST, clusterUrl)) // Step 1: Determine timeout
return Mono.fromCallable(() -> determineTimeout(properties, CommandType.STREAMING_INGEST, clusterUrl)) // Step 1: Determine timeout
.flatMap(timeoutMs -> {

// This was a separate method but was moved into the body of this method because it performs a side effect
Expand Down Expand Up @@ -341,9 +339,10 @@ private Mono<KustoOperationResult> executeStreamingIngest(String clusterEndpoint
.withBody(data)
.build();
}).flatMap(httpRequest -> MonitoredActivity.wrap(postAsync(httpRequest, timeoutMs), "ClientImpl.executeStreamingIngest")
.flatMap(response ->
Mono.fromCallable(() -> new KustoOperationResult(response, "v1")))
.onErrorMap(KustoServiceQueryError.class, e -> new DataClientException(clusterEndpoint, "Error converting json response to KustoOperationResult:" + e.getMessage(), e))
.flatMap(response -> Mono.fromCallable(() -> new KustoOperationResult(response, "v1")))
.onErrorMap(KustoServiceQueryError.class,
e -> new DataClientException(clusterEndpoint, "Error converting json response to KustoOperationResult:" + e.getMessage(),
e))
.onErrorMap(IOException.class, e -> new DataClientException(clusterUrl, e.getMessage(), e)));
})
.doFinally(signalType -> {
Expand Down Expand Up @@ -420,7 +419,7 @@ private Mono<InputStream> executeStreamingQueryAsync(@NotNull KustoRequest kr) {
}

private Mono<InputStream> executeStreamingQuery(String clusterEndpoint, KustoRequest kr,
HttpTracing tracing, String authorizationToken) {
HttpTracing tracing, String authorizationToken) {
try {
HttpRequest request = HttpRequestBuilder
.newPost(clusterEndpoint)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,34 +35,32 @@ public ExponentialRetry(ExponentialRetry other) {
}

public Retry retry() {
return Retry.from(retrySignals ->
retrySignals.flatMap(retrySignal -> {
return Retry.from(retrySignals -> retrySignals.flatMap(retrySignal -> {

Retry.RetrySignal signalCopy = retrySignal.copy();
long currentAttempt = signalCopy.totalRetries();
log.info("Retry attempt {}.", currentAttempt);
Retry.RetrySignal signalCopy = retrySignal.copy();
long currentAttempt = signalCopy.totalRetries();
log.info("Retry attempt {}.", currentAttempt);

Throwable failure = signalCopy.failure();
if (failure instanceof DataServiceException && ((DataServiceException) failure).isPermanent()) {
log.error("Error is permanent, stopping.", failure);
return Mono.error(failure);
}
Throwable failure = signalCopy.failure();
if (failure instanceof DataServiceException && ((DataServiceException) failure).isPermanent()) {
log.error("Error is permanent, stopping.", failure);
return Mono.error(failure);
}

if (currentAttempt >= maxAttempts) {
log.info("Max retry attempts reached: {}.", currentAttempt);
return Mono.error(failure);
}
if (currentAttempt >= maxAttempts) {
log.info("Max retry attempts reached: {}.", currentAttempt);
return Mono.error(failure);
}

double currentSleepSecs = sleepBaseSecs * (float) Math.pow(2, currentAttempt);
double jitterSecs = (float) Math.random() * maxJitterSecs;
double sleepMs = (currentSleepSecs + jitterSecs) * 1000;
double currentSleepSecs = sleepBaseSecs * (float) Math.pow(2, currentAttempt);
double jitterSecs = (float) Math.random() * maxJitterSecs;
double sleepMs = (currentSleepSecs + jitterSecs) * 1000;

log.info("Attempt {} failed, trying again after sleep of {} seconds.", currentAttempt, sleepMs / 1000);
log.info("Attempt {} failed, trying again after sleep of {} seconds.", currentAttempt, sleepMs / 1000);

// Each retry can occur on a different thread
return Mono.delay(Duration.ofMillis((long) sleepMs));
})
);
// Each retry can occur on a different thread
return Mono.delay(Duration.ofMillis((long) sleepMs));
}));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ public interface StreamingClient {
* @return {@link KustoOperationResult} object including the ingestion result
*/
KustoOperationResult executeStreamingIngest(String database, String table, InputStream stream, ClientRequestProperties properties, String streamFormat,
String mappingName, boolean leaveOpen);
String mappingName, boolean leaveOpen);

Mono<KustoOperationResult> executeStreamingIngestAsync(String database, String table, InputStream stream, ClientRequestProperties properties, String streamFormat,
String mappingName, boolean leaveOpen);
Mono<KustoOperationResult> executeStreamingIngestAsync(String database, String table, InputStream stream, ClientRequestProperties properties,
String streamFormat,
String mappingName, boolean leaveOpen);

/**
* <p>Query directly from Kusto database using streaming output.</p>
Expand All @@ -46,15 +47,16 @@ Mono<KustoOperationResult> executeStreamingIngestAsync(String database, String t
InputStream executeStreamingQuery(String command);

KustoOperationResult executeStreamingIngestFromBlob(String databaseName, String tableName, String blobUrl, ClientRequestProperties clientRequestProperties,
String dataFormat, String ingestionMappingReference);
String dataFormat, String ingestionMappingReference);

Mono<InputStream> executeStreamingQueryAsync(String command);

Mono<InputStream> executeStreamingQueryAsync(String database, String command);

Mono<InputStream> executeStreamingQueryAsync(String database, String command, ClientRequestProperties properties);

Mono<KustoOperationResult> executeStreamingIngestFromBlobAsync(String databaseName, String tableName, String blobUrl, ClientRequestProperties clientRequestProperties,
String dataFormat, String ingestionMappingReference);
Mono<KustoOperationResult> executeStreamingIngestFromBlobAsync(String databaseName, String tableName, String blobUrl,
ClientRequestProperties clientRequestProperties,
String dataFormat, String ingestionMappingReference);

}
13 changes: 6 additions & 7 deletions data/src/main/java/com/microsoft/azure/kusto/data/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class Utils {
public static ObjectMapper getObjectMapper() {
return JsonMapper.builder().configure(MapperFeature.PROPAGATE_TRANSIENT_MARKER, true).addModule(new JavaTimeModule()).build().configure(
DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true).configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, true).setNodeFactory(
JsonNodeFactory.withExactBigDecimals(true));
JsonNodeFactory.withExactBigDecimals(true));
}

private static final HashSet<Class<? extends IOException>> nonRetriableClasses = new HashSet<Class<? extends IOException>>() {
Expand Down Expand Up @@ -150,12 +150,11 @@ private static Mono<String> processGzipBody(Flux<ByteBuffer> body) {
return new ByteArrayInputStream(bytes);
})
.collectList()
.flatMap(inputStreams ->
Mono.fromCallable(() -> {
try (GZIPInputStream gzipStream = new GZIPInputStream(new SequenceInputStream(Collections.enumeration(inputStreams)))) {
return readStreamToString(gzipStream);
}
}).subscribeOn(Schedulers.boundedElastic()) //TODO: same as ingest module
.flatMap(inputStreams -> Mono.fromCallable(() -> {
try (GZIPInputStream gzipStream = new GZIPInputStream(new SequenceInputStream(Collections.enumeration(inputStreams)))) {
return readStreamToString(gzipStream);
}
}).subscribeOn(Schedulers.boundedElastic()) // TODO: same as ingest module
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public ApplicationKeyTokenProvider(@NotNull String clusterUrl, String clientId,

@Override
protected TokenCredential createTokenCredential(CredentialBuilderBase<?> builder) {
return ((ClientSecretCredentialBuilder)builder)
return ((ClientSecretCredentialBuilder) builder)
.clientSecret(clientSecret)
.build();
}
Expand Down
Loading

0 comments on commit 46a3e92

Please sign in to comment.