diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java b/data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java index 55aff53d..5b42dc5a 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java @@ -305,7 +305,7 @@ private Mono executeStreamingIngest(String clusterEndpoint String contentEncoding = isStreamSource ? "gzip" : null; String contentType = isStreamSource ? "application/octet-stream" : "application/json"; - return Mono.fromCallable(() -> determineTimeout(properties, CommandType.STREAMING_INGEST, clusterUrl)) // Step 1: Determine timeout + return Mono.fromCallable(() -> determineTimeout(properties, CommandType.STREAMING_INGEST, clusterUrl)) .flatMap(timeoutMs -> { // This was a separate method but was moved into the body of this method because it performs a side effect @@ -344,7 +344,7 @@ private Mono executeStreamingIngest(String clusterEndpoint .withBody(data) .build(); }).flatMap(httpRequest -> MonitoredActivity.wrap(postAsync(httpRequest, timeoutMs), "ClientImpl.executeStreamingIngest") - .flatMap(response -> Mono.fromCallable(() -> new KustoOperationResult(response, "v1"))) + .flatMap(response -> Mono.fromCallable(() -> new KustoOperationResult(response, "v1")).subscribeOn(Schedulers.boundedElastic())) .onErrorMap(KustoServiceQueryError.class, e -> new DataClientException(clusterEndpoint, "Error converting json response to KustoOperationResult:" + e.getMessage(), e))