Skip to content

Commit

Permalink
removes redundant mono creations v2
Browse files Browse the repository at this point in the history
  • Loading branch information
georgebanasios committed Dec 23, 2024
2 parents 7dc1e53 + 0acc991 commit cc87828
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 27 deletions.
22 changes: 11 additions & 11 deletions data/src/main/java/com/microsoft/azure/kusto/data/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ public BaseClient(HttpClient httpClient) {
protected Mono<String> postAsync(HttpRequest request, long timeoutMs) {
return httpClient.send(request, getContextTimeout(timeoutMs))
.flatMap(response -> Utils.getResponseBody(response)
.flatMap(responseBody -> {
.map(responseBody -> {
switch (response.getStatusCode()) {
case HttpStatus.OK:
return Mono.justOrEmpty(responseBody);
return responseBody;
case HttpStatus.TOO_MANY_REQS:
return Mono.error(new ThrottleException(response.getRequest().getUrl().toString()));
throw new ThrottleException(response.getRequest().getUrl().toString());
default:
return Mono.error(createExceptionFromResponse(
response.getRequest().getUrl().toString(), response, null, responseBody));
throw createExceptionFromResponse(
response.getRequest().getUrl().toString(), response, null, responseBody);
}
}).doFinally(ignore -> response.close()))
.onErrorMap(e -> {
Expand All @@ -81,7 +81,8 @@ protected Mono<InputStream> postToStreamingOutputAsync(HttpRequest request, long
int responseStatusCode = httpResponse.getStatusCode();
if (responseStatusCode == HttpStatus.OK) {
state.setReturnInputStream(true);
return httpResponse.getBodyAsInputStream() //TODO: since we want to just close on EOF should we implement EofSensorInputStream ourselves and remove the remaining Apache dependency or not?
return httpResponse.getBodyAsInputStream() // TODO: since we want to just close on EOF should we implement EofSensorInputStream
// ourselves and remove the remaining Apache dependency or not?
.map(inputStream -> new EofSensorInputStream(new CloseParentResourcesStream(httpResponse, inputStream), null));
}

Expand All @@ -101,9 +102,9 @@ private Mono<InputStream> handleErrorResponse(HttpResponse httpResponse, Respons
.flatMap(content -> {
state.setErrorFromResponse(content);
if (content.isEmpty() || content.equals("{}")) {
return Mono.error(new DataServiceException(request.getUrl().toString(),
throw new DataServiceException(request.getUrl().toString(),
"postToStreamingOutputAsync failed to get or decompress response body.",
true));
true);
}

// Ideal to close here (as opposed to finally) so that if any data can't be flushed, the exception will be properly thrown and
Expand All @@ -122,11 +123,10 @@ private Mono<InputStream> handleErrorResponse(HttpResponse httpResponse, Respons
.build();
return postToStreamingOutputAsync(redirectRequest, timeoutMs, currentRedirectCounter + 1, maxRedirectCount);
})
.orElse(Mono.error(
createExceptionFromResponse(request.getUrl().toString(), httpResponse, null, state.getErrorFromResponse())));
.orElseThrow(() -> createExceptionFromResponse(request.getUrl().toString(), httpResponse, null, state.getErrorFromResponse()));
}

return Mono.error(createExceptionFromResponse(request.getUrl().toString(), httpResponse, null, state.getErrorFromResponse()));
throw createExceptionFromResponse(request.getUrl().toString(), httpResponse, null, state.getErrorFromResponse());
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public KustoOperationResult executeStreamingIngest(String database, String table
public Mono<KustoOperationResult> executeStreamingIngestAsync(String database, String table, InputStream stream, ClientRequestProperties properties,
String streamFormat, String mappingName, boolean leaveOpen) {
if (stream == null) {
return Mono.error(new IllegalArgumentException("The provided stream is null."));
throw new IllegalArgumentException("The provided stream is null.");
}

return Mono.defer(() -> {
Expand All @@ -263,7 +263,7 @@ public KustoOperationResult executeStreamingIngestFromBlob(String database, Stri
public Mono<KustoOperationResult> executeStreamingIngestFromBlobAsync(String database, String table, String blobUrl, ClientRequestProperties properties,
String dataFormat, String mappingName) {
if (blobUrl == null) {
return Mono.error(new IllegalArgumentException("The provided blobUrl is null."));
throw new IllegalArgumentException("The provided blobUrl is null.");
}

return Mono.defer(() -> {
Expand Down
3 changes: 2 additions & 1 deletion data/src/main/java/com/microsoft/azure/kusto/data/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public class Utils {

// Use a custom parallel scheduler for retries to avoid thread starvation in case other services
// are using the reactor parallel scheduler
public static final Scheduler ADX_PARALLEL_SCHEDULER = Schedulers.newParallel( //TODO: does that make sense? Should this be done on boundedElastic instead or not at all?
public static final Scheduler ADX_PARALLEL_SCHEDULER = Schedulers.newParallel( // TODO: does that make sense? Should this be done on boundedElastic instead
// or not at all?
"adx-kusto-parallel",
Schedulers.DEFAULT_POOL_SIZE,
true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.jetbrains.annotations.Nullable;

import com.azure.core.http.HttpClient;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import com.microsoft.azure.kusto.data.instrumentation.MonitoredActivity;

Expand Down Expand Up @@ -42,12 +41,7 @@ final Mono<Void> initialize() {
"CloudDependentTokenProviderBase.retrieveCloudInfo", getTracingAttributes())
.flatMap(cloudInfo -> {
this.cloudInfo = cloudInfo;

try {
initializeWithCloudInfo(cloudInfo);
} catch (DataClientException | DataServiceException e) {
return Mono.error(e);
}
initializeWithCloudInfo(cloudInfo);
this.initialized = true;
return Mono.empty();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public static Mono<CloudInfo> retrieveCloudInfoForClusterAsync(String clusterUrl

// Ensure that if multiple threads request the cloud info for the same cluster url, only one http call will be made
// for all corresponding threads
return Mono.fromCallable(() -> new UrlPair(UriUtils.setPathForUri(clusterUrl, ""), UriUtils.appendPathToUri(clusterUrl, METADATA_ENDPOINT)))
return Mono.fromCallable(() -> new UrlPair(UriUtils.setPathForUri(clusterUrl, ""), UriUtils.setPathForUri(clusterUrl, METADATA_ENDPOINT)))
.flatMap(urls -> cache.computeIfAbsent(urls.clusterEndpoint, key -> fetchCloudInfoAsync(urls, givenHttpClient)
.retryWhen(RETRY_CONFIG)
.onErrorMap(e -> ExceptionUtils.unwrapCloudInfoException(urls.clusterEndpoint, e))));
Expand Down Expand Up @@ -120,22 +120,22 @@ private static Mono<CloudInfo> fetchCloudInfoAsync(UrlPair clusterUrls, @Nullabl
private static Mono<CloudInfo> getCloudInfo(HttpResponse response, String clusterUrl) {
int statusCode = response.getStatusCode();
return Utils.getResponseBody(response)
.flatMap(content -> {
.map(content -> {
if (statusCode == HttpStatus.OK) {
if (content.isEmpty() || content.equals("{}")) {
throw new DataServiceException(clusterUrl, "Error in metadata endpoint, received no data", true);
}
return Mono.just(parseCloudInfo(content));
return parseCloudInfo(content);
} else if (statusCode == HttpStatus.NOT_FOUND) {
return Mono.just(DEFAULT_CLOUD);
return DEFAULT_CLOUD;
} else {
String errorFromResponse = content;
if (errorFromResponse.isEmpty()) {
// Fixme: Missing reason phrase to add to exception. Potentially want to use an enum.
errorFromResponse = "";
}
return Mono.error(new DataServiceException(clusterUrl, "Error in metadata endpoint, got code: " + statusCode +
"\nWith error: " + errorFromResponse, statusCode != HttpStatus.TOO_MANY_REQS));
throw new DataServiceException(clusterUrl, "Error in metadata endpoint, got code: " + statusCode +
"\nWith error: " + errorFromResponse, statusCode != HttpStatus.TOO_MANY_REQS);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -704,6 +705,7 @@ void testSameHttpClientInstance() throws URISyntaxException {

}

@Disabled("This test is disabled because it relies on the path part of the cluster Uri which we now ignore")
@Test
void testNoRedirectsCloudFail() {
KustoTrustedEndpoints.addTrustedHosts(Collections.singletonList(new MatchRule("statusreturner.azurewebsites.net", false)), false);
Expand Down

0 comments on commit cc87828

Please sign in to comment.