
removes redundant mono creations v3 & fixes on getResponseBody
georgebanasios committed Dec 26, 2024
1 parent ea3b13e commit 1558b02
Showing 2 changed files with 50 additions and 58 deletions.
23 changes: 10 additions & 13 deletions data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java
@@ -167,12 +167,16 @@ private Map<String, String> updateAndGetExecuteTracingAttributes(String database
     }
 
     private Mono<KustoOperationResult> executeImplAsync(KustoRequest kr) {
+        String clusterEndpoint = String.format(kr.getCommandType().getEndpoint(), clusterUrl);
         return executeWithTimeout(kr, ".executeImplAsync")
-                .flatMap(response -> {
-                    String clusterEndpoint = String.format(kr.getCommandType().getEndpoint(), clusterUrl);
+                // .publishOn(Schedulers.boundedElastic()) TODO: should the following be published on a new thread?
+                .map(response -> {
                     JsonResult jsonResult = new JsonResult(response, clusterEndpoint);
-                    return processJsonResultAsync(jsonResult);
-                });
+                    return new KustoOperationResult(jsonResult.getResult(), jsonResult.getEndpoint().endsWith("v2/rest/query") ? "v2" : "v1");
+                })
+                .onErrorMap(KustoServiceQueryError.class, e -> new DataServiceException(clusterEndpoint,
+                        "Error found while parsing json response as KustoOperationResult:" + e, e, e.isPermanent()))
+                .onErrorMap(Exception.class, e -> new DataClientException(clusterEndpoint, ExceptionUtils.getMessageEx(e), e));
     }
 
     private Mono<String> executeWithTimeout(KustoRequest request, String nameOfSpan) {
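
A note on the pattern being replaced here: wrapping a synchronous transformation in Mono.fromCallable inside a flatMap allocates an extra Mono per response and hops to another scheduler, while a plain map applies the function inline on the current thread. A minimal, self-contained sketch of the two shapes (illustrative values only, not from this repository):

    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;

    public class MapVsFlatMapSketch {
        public static void main(String[] args) {
            // Old shape: a redundant inner Mono plus a scheduler hop for a synchronous parse.
            Integer viaFlatMap = Mono.just("42")
                    .flatMap(s -> Mono.fromCallable(() -> Integer.parseInt(s))
                            .subscribeOn(Schedulers.boundedElastic()))
                    .block();

            // New shape: the same synchronous transform as a plain map, no extra allocation.
            Integer viaMap = Mono.just("42")
                    .map(Integer::parseInt)
                    .block();

            System.out.println(viaFlatMap + " == " + viaMap);
        }
    }

The trade-off is that map keeps the work on whatever thread emitted the response, which is what the TODO comments in this commit are asking about.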
@@ -204,14 +208,6 @@ Mono<KustoRequestContext> prepareRequestAsync(@NotNull KustoRequest kr) {
                         .switchIfEmpty(Mono.just(buildKustoRequestContext(kr, clusterEndpoint, tracing, null))));
     }
 
-    private Mono<KustoOperationResult> processJsonResultAsync(JsonResult res) {
-        return Mono.fromCallable(() -> new KustoOperationResult(res.getResult(), res.getEndpoint().endsWith("v2/rest/query") ? "v2" : "v1"))
-                .subscribeOn(Schedulers.boundedElastic())
-                .onErrorMap(KustoServiceQueryError.class, e -> new DataServiceException(res.getEndpoint(),
-                        "Error found while parsing json response as KustoOperationResult:" + e, e, e.isPermanent()))
-                .onErrorMap(Exception.class, e -> new DataClientException(res.getEndpoint(), ExceptionUtils.getMessageEx(e), e));
-    }
-
     private Mono<Void> validateEndpointAsync() {
         if (endpointValidated) {
             return Mono.empty();
@@ -321,7 +317,8 @@ private Mono<KustoOperationResult> executeStreamingIngest(String clusterEndpoint
                 .withBody(data)
                 .build();
         return MonitoredActivity.wrap(postAsync(httpRequest, timeoutMs), "ClientImpl.executeStreamingIngest")
-                .flatMap(response -> Mono.fromCallable(() -> new KustoOperationResult(response, "v1")).subscribeOn(Schedulers.boundedElastic()))
+                // .publishOn(Schedulers.boundedElastic()) TODO: should the following be published on a new thread?
+                .map(response -> new KustoOperationResult(response, "v1"))
                 .onErrorMap(KustoServiceQueryError.class,
                         e -> new DataClientException(clusterEndpoint, "Error converting json response to KustoOperationResult:" + e.getMessage(),
                                 e))
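
The TODO left in both hunks asks whether the now-synchronous map should be pushed off the emitting thread. If the parse ever proves heavy enough to stall the HTTP client's I/O threads, publishOn would be the usual answer: it moves all downstream operators onto another scheduler without recreating a Mono per response. A hedged, self-contained sketch of that option (names and values are illustrative, not part of the commit):

    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;

    public class PublishOnSketch {
        public static void main(String[] args) {
            Integer length = Mono.just("{\"Tables\":[]}")   // stand-in for the HTTP response body
                    .publishOn(Schedulers.boundedElastic()) // operators below run on bounded-elastic workers
                    .map(body -> {
                        // stand-in for the KustoOperationResult parse, now off the I/O thread
                        System.out.println("map runs on " + Thread.currentThread().getName());
                        return body.length();
                    })
                    .block(); // block only to keep the demo deterministic
            System.out.println("parsed length = " + length);
        }
    }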
85 changes: 40 additions & 45 deletions data/src/main/java/com/microsoft/azure/kusto/data/Utils.java
@@ -14,7 +14,6 @@
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 import java.util.zip.DeflaterInputStream;
 import java.util.zip.GZIPInputStream;
 
@@ -35,7 +34,6 @@
 import io.github.resilience4j.core.lang.Nullable;
 import io.github.resilience4j.retry.RetryConfig;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.handler.codec.compression.ZlibCodecFactory;
@@ -45,7 +43,6 @@
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
-import reactor.netty.ByteBufFlux;
 
 public class Utils {
 
@@ -60,6 +57,7 @@ public class Utils {
     private static final long MAX_RETRY_INTERVAL = TimeUnit.SECONDS.toMillis(30);
     private static final long BASE_INTERVAL = TimeUnit.SECONDS.toMillis(2);
     private static final int DEFAULT_BUFFER_LENGTH = 1024;
+    private static final ThreadLocal<StringBuilder> sb = ThreadLocal.withInitial(StringBuilder::new);
 
     // added auto bigdecimal deserialization for float and double value, since the bigdecimal values seem to lose precision while auto deserialization to
     // double value
@@ -160,11 +158,43 @@ public static boolean isGzipResponse(HttpResponse response) {
 
     public static Mono<String> getResponseBody(HttpResponse httpResponse) {
         return isGzipResponse(httpResponse)
-                ? processGzipBody(httpResponse.getBody())
-                : processNonGzipBody(httpResponse.getBody());
+                ? processGzipBody(httpResponse.getBodyAsInputStream())
+                : httpResponse.getBodyAsString(StandardCharsets.UTF_8);
     }
 
-    public static Mono<String> processGzipBody(Flux<ByteBuffer> gzipBody) {
+    // TODO: delete all comments
+
+    // (Ohad, Asaf, opinions?)
+    // This aggregates the entire compressed stream into memory, which GZIPInputStream requires.
+    // GZIPInputStream is also blocking.
+    // The ByteArrayOutputStream copies add further memory overhead.
+    // The same applies to all the body accessors on the HttpResponse object, such as
+    // response.getBodyAsByteArray(), getBodyAsString() (for non-gzip), etc.
+    //
+    // The most I could test with was 2,000,000 records (dataset.csv format), and all methods, including this one,
+    // the one below, and the HttpResponse accessors, had almost the same query time.
+    // I am not sure how representative that is for you.
+    public static Mono<String> processGzipBody(Mono<InputStream> body) {
+        return body
+                .map(inputStream -> {
+                    try (GZIPInputStream gzipStream = new GZIPInputStream(inputStream);
+                            ByteArrayOutputStream output = new ByteArrayOutputStream()) {
+
+                        byte[] buffer = new byte[DEFAULT_BUFFER_LENGTH];
+                        int bytesRead;
+                        while ((bytesRead = gzipStream.read(buffer)) != -1) {
+                            output.write(buffer, 0, bytesRead);
+                        }
+
+                        return new String(output.toByteArray(), StandardCharsets.UTF_8);
+                    } catch (IOException e) {
+                        throw Exceptions.propagate(e);
+                    }
+                }).subscribeOn(Schedulers.boundedElastic());
+    }
+
+    // An alternative, streaming approach to decompression. I hit some OOM errors locally, though, in KustoOperationResult.createFromV2Response.
+    public static Mono<String> processGzipBody1(Flux<ByteBuffer> gzipBody) {
         // To ensure efficiency in terms of performance and memory allocation, a streaming
         // chunked decompression approach is utilized using Netty's EmbeddedChannel. This allows the decompression
         // to occur in chunks, making it more memory-efficient for large payloads, as it prevents the entire
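
One way to sanity-check the aggregate-then-decompress path above is a local round trip: gzip a payload in memory, wrap it as the Mono<InputStream> the method expects, and confirm the original string comes back. A hypothetical test snippet, assuming this Utils class is on the classpath (the snippet itself is not part of the commit):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.zip.GZIPOutputStream;
    import com.microsoft.azure.kusto.data.Utils;
    import reactor.core.publisher.Mono;

    public class GzipRoundTripSketch {
        public static void main(String[] args) throws Exception {
            // Gzip a payload in memory to stand in for a compressed HTTP body.
            ByteArrayOutputStream compressed = new ByteArrayOutputStream();
            try (GZIPOutputStream gzip = new GZIPOutputStream(compressed)) {
                gzip.write("{\"Tables\":[]}".getBytes(StandardCharsets.UTF_8));
            }
            Mono<InputStream> body = Mono.fromSupplier(
                    () -> new ByteArrayInputStream(compressed.toByteArray()));
            // Should print the original JSON payload.
            System.out.println(Utils.processGzipBody(body).block());
        }
    }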
@@ -183,57 +213,22 @@
         }
 
         // Read decompressed data from the channel and emit each chunk as a String
-        return Flux.create(sink -> {
+        return Flux.<String>create(sink -> {
             ByteBuf decompressedByteBuf;
             while ((decompressedByteBuf = channel.readInbound()) != null) {
                 sink.next(decompressedByteBuf.toString(StandardCharsets.UTF_8));
                 if (decompressedByteBuf.refCnt() > 0) {
                     decompressedByteBuf.release();
                 }
             }
 
             sink.complete();
-        }).doOnError(throwable -> channel.finish()).cast(String.class);
+        }).doOnError(ignore -> channel.finish());
     }).subscribeOn(Schedulers.boundedElastic())
-            .doOnError(throwable -> channel.finish())
-            .collect(Collectors.joining())
+            .reduce(new StringBuilder(), StringBuilder::append) // TODO: this will cause memory problems, right?
+            .map(StringBuilder::toString)
+            .doFinally(ignore -> channel.finish());
     }
-
-    // Alternative (Ohad, Asaf opinions?):
-    // This aggregates the entire compressed stream into memory which is required by GZIPInputStream.
-    // Additional memory overhead is added due to ByteArrayOutputStream copies.
-    // Same with all methods provided from HttpResponse object such as:
-    // response.getBodyAsInputStream() etc.
-    //
-    // public static Mono<String> processGzipBody(Mono<InputStream> body) {
-    // return body
-    // .flatMap(inputStream -> Mono.fromCallable(() -> {
-    // try (GZIPInputStream gzipStream = new GZIPInputStream(inputStream);
-    // ByteArrayOutputStream output = new ByteArrayOutputStream()) {
-
-    public static Mono<String> processGzipBody1(Flux<ByteBuffer> body) {
-        return ByteBufFlux.fromInbound(body.map(Unpooled::wrappedBuffer)).aggregate()
-                .flatMap(byteBuf -> Mono.fromCallable(() -> {
-                    try (GZIPInputStream gzipStream = new GZIPInputStream(new ByteBufInputStream(byteBuf));
-                            ByteArrayOutputStream output = new ByteArrayOutputStream()) {
-
-                        byte[] buffer = new byte[DEFAULT_BUFFER_LENGTH];
-                        int bytesRead;
-                        while ((bytesRead = gzipStream.read(buffer)) != -1) {
-                            output.write(buffer, 0, bytesRead);
-                        }
-
-                        return new String(output.toByteArray(), StandardCharsets.UTF_8);
-                    }
-                })).subscribeOn(Schedulers.boundedElastic());
-    }
-
-    private static Mono<String> processNonGzipBody(Flux<ByteBuffer> body) {
-        return body
-                .map(byteBuffer -> StandardCharsets.UTF_8.decode(byteBuffer.asReadOnlyBuffer()).toString())
-                .collect(Collectors.joining());
-    }
 
     /**
      * Method responsible for constructing the correct InputStream type based on content encoding header
     *
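
The channel setup at the top of processGzipBody1 is folded out of the hunk above; judging from the imports that survive (EmbeddedChannel, ZlibCodecFactory, Unpooled), it presumably builds a pipeline around Netty's gzip decoder. A self-contained sketch of that decompression loop, written as an assumption about the folded code rather than a copy of it:

    import java.io.ByteArrayOutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.zip.GZIPOutputStream;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.embedded.EmbeddedChannel;
    import io.netty.handler.codec.compression.ZlibCodecFactory;
    import io.netty.handler.codec.compression.ZlibWrapper;

    public class EmbeddedChannelGzipSketch {
        public static void main(String[] args) throws Exception {
            // Gzip a payload in memory to act as the compressed network bytes.
            ByteArrayOutputStream compressed = new ByteArrayOutputStream();
            try (GZIPOutputStream gzip = new GZIPOutputStream(compressed)) {
                gzip.write("hello gzip".getBytes(StandardCharsets.UTF_8));
            }

            // A channel whose pipeline is just a gzip decoder, fed the compressed chunks.
            EmbeddedChannel channel = new EmbeddedChannel(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
            channel.writeInbound(Unpooled.wrappedBuffer(compressed.toByteArray()));
            channel.finish();

            // Drain decompressed chunks, releasing each buffer after reading it.
            StringBuilder out = new StringBuilder();
            ByteBuf chunk;
            while ((chunk = channel.readInbound()) != null) {
                out.append(chunk.toString(StandardCharsets.UTF_8));
                chunk.release();
            }
            System.out.println(out); // prints "hello gzip"
        }
    }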
