diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java b/data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java
index 43bf956c..c9864b4d 100644
--- a/data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java
+++ b/data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java
@@ -167,12 +167,16 @@ private Map<String, String> updateAndGetExecuteTracingAttributes(String database
     }

     private Mono<KustoOperationResult> executeImplAsync(KustoRequest kr) {
+        String clusterEndpoint = String.format(kr.getCommandType().getEndpoint(), clusterUrl);
         return executeWithTimeout(kr, ".executeImplAsync")
-                .flatMap(response -> {
-                    String clusterEndpoint = String.format(kr.getCommandType().getEndpoint(), clusterUrl);
+                // .publishOn(Schedulers.boundedElastic()) TODO: should the following be published on a new thread?
+                .map(response -> {
                     JsonResult jsonResult = new JsonResult(response, clusterEndpoint);
-                    return processJsonResultAsync(jsonResult);
-                });
+                    return new KustoOperationResult(jsonResult.getResult(), jsonResult.getEndpoint().endsWith("v2/rest/query") ? "v2" : "v1");
+                })
+                .onErrorMap(KustoServiceQueryError.class, e -> new DataServiceException(clusterEndpoint,
+                        "Error found while parsing json response as KustoOperationResult:" + e, e, e.isPermanent()))
+                .onErrorMap(Exception.class, e -> new DataClientException(clusterEndpoint, ExceptionUtils.getMessageEx(e), e));
     }

     private Mono<String> executeWithTimeout(KustoRequest request, String nameOfSpan) {
@@ -204,14 +208,6 @@ Mono<KustoRequestContext> prepareRequestAsync(@NotNull KustoRequest kr) {
                         .switchIfEmpty(Mono.just(buildKustoRequestContext(kr, clusterEndpoint, tracing, null)))));
     }

-    private Mono<KustoOperationResult> processJsonResultAsync(JsonResult res) {
-        return Mono.fromCallable(() -> new KustoOperationResult(res.getResult(), res.getEndpoint().endsWith("v2/rest/query") ? "v2" : "v1"))
-                .subscribeOn(Schedulers.boundedElastic())
-                .onErrorMap(KustoServiceQueryError.class, e -> new DataServiceException(res.getEndpoint(),
-                        "Error found while parsing json response as KustoOperationResult:" + e, e, e.isPermanent()))
-                .onErrorMap(Exception.class, e -> new DataClientException(res.getEndpoint(), ExceptionUtils.getMessageEx(e), e));
-    }
-
     private Mono<Void> validateEndpointAsync() {
         if (endpointValidated) {
             return Mono.empty();
@@ -321,7 +317,8 @@ private Mono<KustoOperationResult> executeStreamingIngest(String clusterEndpoint
                 .withBody(data)
                 .build();
         return MonitoredActivity.wrap(postAsync(httpRequest, timeoutMs), "ClientImpl.executeStreamingIngest")
-                .flatMap(response -> Mono.fromCallable(() -> new KustoOperationResult(response, "v1")).subscribeOn(Schedulers.boundedElastic()))
+                // .publishOn(Schedulers.boundedElastic()) TODO: should the following be published on a new thread?
+                .map(response -> new KustoOperationResult(response, "v1"))
                 .onErrorMap(KustoServiceQueryError.class, e -> new DataClientException(clusterEndpoint,
                         "Error converting json response to KustoOperationResult:" + e.getMessage(), e))
diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/Utils.java b/data/src/main/java/com/microsoft/azure/kusto/data/Utils.java
index ac85027d..c761b993 100644
--- a/data/src/main/java/com/microsoft/azure/kusto/data/Utils.java
+++ b/data/src/main/java/com/microsoft/azure/kusto/data/Utils.java
@@ -14,7 +14,6 @@
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 import java.util.zip.DeflaterInputStream;
 import java.util.zip.GZIPInputStream;
@@ -35,7 +34,6 @@
 import io.github.resilience4j.core.lang.Nullable;
 import io.github.resilience4j.retry.RetryConfig;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.handler.codec.compression.ZlibCodecFactory;
@@ -45,7 +43,6 @@
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
-import reactor.netty.ByteBufFlux;

 public class Utils {
@@ -60,6 +57,7 @@ public class Utils {
     private static final long MAX_RETRY_INTERVAL = TimeUnit.SECONDS.toMillis(30);
     private static final long BASE_INTERVAL = TimeUnit.SECONDS.toMillis(2);
     private static final int DEFAULT_BUFFER_LENGTH = 1024;
+    private static final ThreadLocal<StringBuilder> sb = ThreadLocal.withInitial(StringBuilder::new);

     // added auto bigdecimal deserialization for float and double value, since the bigdecimal values seem to lose precision while auto deserialization to
     // double value
@@ -160,11 +158,43 @@ public static boolean isGzipResponse(HttpResponse response) {

     public static Mono<String> getResponseBody(HttpResponse httpResponse) {
         return isGzipResponse(httpResponse)
-                ? processGzipBody(httpResponse.getBody())
-                : processNonGzipBody(httpResponse.getBody());
+                ? processGzipBody(httpResponse.getBodyAsInputStream())
+                : httpResponse.getBodyAsString(StandardCharsets.UTF_8);
     }

-    public static Mono<String> processGzipBody(Flux<ByteBuffer> gzipBody) {
+    // TODO: delete all comments
+
+    // (Ohad, Asaf opinions?):
+    // This aggregates the entire compressed stream into memory, which GZIPInputStream requires.
+    // GZIPInputStream is also blocking.
+    // Additional memory overhead is added due to ByteArrayOutputStream copies.
+    // The same applies to all of the methods provided by the HttpResponse object, such as
+    // response.getBodyAsByteArray(), getBodyAsString() (for non-gzip), etc.
+    //
+    // The most I could test with was 2,000,000 records (dataset.csv format), and all methods, including this
+    // one and the one below as well as the methods from HttpResponse, had almost the same query time.
+    // I am not sure how representative this is for you guys.
+    public static Mono<String> processGzipBody(Mono<InputStream> body) {
+        return body
+                .map(inputStream -> {
+                    try (GZIPInputStream gzipStream = new GZIPInputStream(inputStream);
+                            ByteArrayOutputStream output = new ByteArrayOutputStream()) {
+
+                        byte[] buffer = new byte[DEFAULT_BUFFER_LENGTH];
+                        int bytesRead;
+                        while ((bytesRead = gzipStream.read(buffer)) != -1) {
+                            output.write(buffer, 0, bytesRead);
+                        }
+
+                        return new String(output.toByteArray(), StandardCharsets.UTF_8);
+                    } catch (IOException e) {
+                        throw Exceptions.propagate(e);
+                    }
+                }).subscribeOn(Schedulers.boundedElastic());
+    }
+
+    // An alternative approach using streaming decompression. I had some OOM errors locally, though, in KustoOperationResult.createFromV2Response.
+    public static Mono<String> processGzipBody1(Flux<ByteBuffer> gzipBody) {
         // To ensure efficiency in terms of performance and memory allocation, a streaming
         // chunked decompression approach is utilized using Netty's EmbeddedChannel. This allows the decompression
         // to occur in chunks, making it more memory-efficient for large payloads, as it prevents the entire
@@ -183,7 +213,7 @@ public static Mono<String> processGzipBody(Flux<ByteBuffer> gzipBody) {
             }

             // Read decompressed data from the channel and emit each chunk as a String
-            return Flux.create(sink -> {
+            return Flux.<String>create(sink -> {
                 ByteBuf decompressedByteBuf;
                 while ((decompressedByteBuf = channel.readInbound()) != null) {
                     sink.next(decompressedByteBuf.toString(StandardCharsets.UTF_8));
@@ -191,49 +221,14 @@
                     decompressedByteBuf.release();
                 }
             }
-            sink.complete();
-        }).doOnError(throwable -> channel.finish()).cast(String.class);
+        }).doOnError(ignore -> channel.finish());
         }).subscribeOn(Schedulers.boundedElastic())
-                .doOnError(throwable -> channel.finish())
-                .collect(Collectors.joining())
+                .reduce(new StringBuilder(), StringBuilder::append) // TODO: this will cause memory problems right?
+                .map(StringBuilder::toString)
                 .doFinally(ignore -> channel.finish());
     }

-    // Alternative (Ohad, Asaf opinions?):
-    // This aggregates the entire compressed stream into memory which is required by GZIPInputStream.
-    // Additional memory overhead is added due to ByteArrayOutputStream copies.
-    // Same with all methods provided from HttpResponse object such as:
-    // response.getBodyAsInputStream() etc.
-    //
-    // public static Mono<String> processGzipBody(Mono<InputStream> body) {
-    // return body
-    // .flatMap(inputStream -> Mono.fromCallable(() -> {
-    // try (GZIPInputStream gzipStream = new GZIPInputStream(inputStream);
-    // ByteArrayOutputStream output = new ByteArrayOutputStream()) {
-    public static Mono<String> processGzipBody1(Flux<ByteBuffer> body) {
-        return ByteBufFlux.fromInbound(body.map(Unpooled::wrappedBuffer)).aggregate()
-                .flatMap(byteBuf -> Mono.fromCallable(() -> {
-                    try (GZIPInputStream gzipStream = new GZIPInputStream(new ByteBufInputStream(byteBuf));
-                            ByteArrayOutputStream output = new ByteArrayOutputStream()) {
-
-                        byte[] buffer = new byte[DEFAULT_BUFFER_LENGTH];
-                        int bytesRead;
-                        while ((bytesRead = gzipStream.read(buffer)) != -1) {
-                            output.write(buffer, 0, bytesRead);
-                        }
-
-                        return new String(output.toByteArray(), StandardCharsets.UTF_8);
-                    }
-                })).subscribeOn(Schedulers.boundedElastic());
-    }
-
-    private static Mono<String> processNonGzipBody(Flux<ByteBuffer> body) {
-        return body
-                .map(byteBuffer -> StandardCharsets.UTF_8.decode(byteBuffer.asReadOnlyBuffer()).toString())
-                .collect(Collectors.joining());
-    }
-
     /**
      * Method responsible for constructing the correct InputStream type based on content encoding header
     *
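Outside the diff itself, the pattern that the new processGzipBody adopts can be exercised in isolation. The sketch below is illustrative only: the GzipBodyRoundTrip class and its sample payload are hypothetical and not part of this change; it assumes nothing beyond reactor-core and the JDK, and it mirrors the blocking GZIPInputStream drain offloaded via subscribeOn(Schedulers.boundedElastic()).

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class GzipBodyRoundTrip {

    // Mirrors the new processGzipBody: fully drain the GZIPInputStream on a
    // boundedElastic thread and emit the decoded body as a single String.
    static Mono<String> decompress(Mono<InputStream> body) {
        return body
                .map(inputStream -> {
                    try (GZIPInputStream gzip = new GZIPInputStream(inputStream);
                            ByteArrayOutputStream out = new ByteArrayOutputStream()) {
                        byte[] buffer = new byte[1024];
                        int read;
                        while ((read = gzip.read(buffer)) != -1) {
                            out.write(buffer, 0, read);
                        }
                        return new String(out.toByteArray(), StandardCharsets.UTF_8);
                    } catch (IOException e) {
                        // Checked IO errors are rethrown as a Reactor-propagated exception.
                        throw Exceptions.propagate(e);
                    }
                })
                .subscribeOn(Schedulers.boundedElastic());
    }

    public static void main(String[] args) throws IOException {
        String original = "{\"Tables\":[]}"; // hypothetical sample payload

        // Compress the sample payload so the decompression path can be exercised.
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (GZIPOutputStream gzipOut = new GZIPOutputStream(compressed)) {
            gzipOut.write(original.getBytes(StandardCharsets.UTF_8));
        }

        Mono<InputStream> bodyMono = Mono.<InputStream>fromCallable(
                () -> new ByteArrayInputStream(compressed.toByteArray()));

        // block() is used only to keep the example synchronous; prints "true".
        System.out.println(original.equals(decompress(bodyMono).block()));
    }
}

In the client the InputStream comes from httpResponse.getBodyAsInputStream() and the decoded String feeds KustoOperationResult. Note that the open question from the TODO in the diff applies to this sketch as well: subscribeOn only moves the subscription, so whether an additional publishOn is needed to keep the blocking read off the reactor-netty I/O thread depends on where the upstream Mono actually emits.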