diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/ExponentialRetry.java b/data/src/main/java/com/microsoft/azure/kusto/data/ExponentialRetry.java index 4d385c327..60ee64a5a 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/ExponentialRetry.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/ExponentialRetry.java @@ -1,15 +1,11 @@ package com.microsoft.azure.kusto.data; -import com.microsoft.azure.kusto.data.exceptions.DataServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.publisher.Mono; -import reactor.util.retry.Retry; import java.lang.invoke.MethodHandles; -import java.time.Duration; -public class ExponentialRetry { +public class ExponentialRetry { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final int maxAttempts; @@ -34,33 +30,36 @@ public ExponentialRetry(ExponentialRetry other) { this.maxJitterSecs = other.maxJitterSecs; } - public Retry retry() { - return Retry.from(retrySignals -> retrySignals.flatMap(retrySignal -> { + // Caller should throw only permanent errors, returning null if a retry is needed + public T execute(KustoCheckedFunction function) throws E1, E2 { + for (int currentAttempt = 0; currentAttempt < maxAttempts; currentAttempt++) { + log.info("execute: Attempt {}", currentAttempt); - Retry.RetrySignal signalCopy = retrySignal.copy(); - long currentAttempt = signalCopy.totalRetries(); - log.info("Retry attempt {}.", currentAttempt); - - Throwable failure = signalCopy.failure(); - if (failure instanceof DataServiceException && ((DataServiceException) failure).isPermanent()) { - log.error("Error is permanent, stopping.", failure); - return Mono.error(failure); - } - - if (currentAttempt >= maxAttempts) { - log.info("Max retry attempts reached: {}.", currentAttempt); - return Mono.error(failure); + try { + T result = function.apply(currentAttempt); + if (result != null) { + return result; + } + } catch (Exception e) { + log.error("execute: Error is permanent, stopping", e); + throw e; } double currentSleepSecs = sleepBaseSecs * (float) Math.pow(2, currentAttempt); double jitterSecs = (float) Math.random() * maxJitterSecs; double sleepMs = (currentSleepSecs + jitterSecs) * 1000; - log.info("Attempt {} failed, trying again after sleep of {} seconds.", currentAttempt, sleepMs / 1000); + log.info("execute: Attempt {} failed, trying again after sleep of {} seconds", currentAttempt, sleepMs / 1000); + + try { + Thread.sleep((long) sleepMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("execute: Interrupted while sleeping", e); + } + } - // Each retry can occur on a different thread - return Mono.delay(Duration.ofMillis((long) sleepMs)); - })); + return null; } } diff --git a/ingest/pom.xml b/ingest/pom.xml index 5f9b2dad2..6e906fcad 100644 --- a/ingest/pom.xml +++ b/ingest/pom.xml @@ -281,17 +281,5 @@ 6.0.0 compile - - - io.projectreactor - reactor-test - ${reactor-test.version} - test - - - org.reactivestreams - reactive-streams - ${reactive-streams.version} - diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClient.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClient.java index d17d8d784..96c45cb4f 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClient.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClient.java @@ -3,12 +3,13 @@ package com.microsoft.azure.kusto.ingest; +import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException; +import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException; import com.microsoft.azure.kusto.ingest.result.IngestionResult; import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo; import com.microsoft.azure.kusto.ingest.source.FileSourceInfo; import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo; import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo; -import reactor.core.publisher.Mono; import java.io.Closeable; @@ -22,12 +23,13 @@ public interface IngestClient extends Closeable { * @param fileSourceInfo The specific SourceInfo to be ingested * @param ingestionProperties Settings used to customize the ingestion operation * @return {@link IngestionResult} object including the ingestion result + * @throws IngestionClientException An exception originating from a client activity + * @throws IngestionServiceException An exception returned from the service * @see FileSourceInfo * @see IngestionProperties */ - IngestionResult ingestFromFile(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties); - - Mono ingestFromFileAsync(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties); + IngestionResult ingestFromFile(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) + throws IngestionClientException, IngestionServiceException; /** *

Ingest data from a blob storage into Kusto database.

@@ -37,12 +39,13 @@ public interface IngestClient extends Closeable { * @param blobSourceInfo The specific SourceInfo to be ingested * @param ingestionProperties Settings used to customize the ingestion operation * @return {@link IngestionResult} object including the ingestion result + * @throws IngestionClientException An exception originating from a client activity + * @throws IngestionServiceException An exception returned from the service * @see BlobSourceInfo * @see IngestionProperties */ - IngestionResult ingestFromBlob(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties); - - Mono ingestFromBlobAsync(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties); + IngestionResult ingestFromBlob(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) + throws IngestionClientException, IngestionServiceException; /** *

Ingest data from a Result Set into Kusto database.

@@ -55,12 +58,13 @@ public interface IngestClient extends Closeable { * @param resultSetSourceInfo The specific SourceInfo to be ingested * @param ingestionProperties Settings used to customize the ingestion operation * @return {@link IngestionResult} object including the ingestion result + * @throws IngestionClientException An exception originating from a client activity + * @throws IngestionServiceException An exception returned from the service * @see ResultSetSourceInfo * @see IngestionProperties */ - IngestionResult ingestFromResultSet(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties); - - Mono ingestFromResultSetAsync(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties); + IngestionResult ingestFromResultSet(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) + throws IngestionClientException, IngestionServiceException; /** *

Ingest data from an input stream, into Kusto database.

@@ -70,10 +74,11 @@ public interface IngestClient extends Closeable { * @param streamSourceInfo The specific SourceInfo to be ingested * @param ingestionProperties Settings used to customize the ingestion operation * @return {@link IngestionResult} object including the ingestion result + * @throws IngestionClientException An exception originating from a client activity + * @throws IngestionServiceException An exception returned from the service * @see StreamSourceInfo * @see IngestionProperties */ - IngestionResult ingestFromStream(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties); - - Mono ingestFromStreamAsync(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties); + IngestionResult ingestFromStream(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) + throws IngestionClientException, IngestionServiceException; } diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClientBase.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClientBase.java index eae268f13..7833d4048 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClientBase.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClientBase.java @@ -1,17 +1,19 @@ package com.microsoft.azure.kusto.ingest; -import com.microsoft.azure.kusto.data.instrumentation.MonitoredActivity; -import com.microsoft.azure.kusto.data.instrumentation.TraceableAttributes; -import com.microsoft.azure.kusto.ingest.result.IngestionResult; -import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo; +import com.microsoft.azure.kusto.data.exceptions.ExceptionsUtils; import com.microsoft.azure.kusto.ingest.source.CompressionType; -import com.microsoft.azure.kusto.ingest.source.FileSourceInfo; -import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo; -import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo; import org.apache.http.conn.util.InetAddressUtils; -import reactor.core.publisher.Mono; +import java.io.IOException; import java.net.URI; +import com.microsoft.azure.kusto.data.instrumentation.SupplierTwoExceptions; +import com.microsoft.azure.kusto.data.instrumentation.TraceableAttributes; +import com.microsoft.azure.kusto.data.instrumentation.MonitoredActivity; +import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException; +import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException; +import com.microsoft.azure.kusto.ingest.result.IngestionResult; +import com.microsoft.azure.kusto.ingest.source.*; + import java.util.HashMap; import java.util.Map; @@ -57,18 +59,16 @@ static boolean isReservedHostname(String rawUri) { return isLocalFlag || isIPFlag || authority.equalsIgnoreCase("onebox.dev.kusto.windows.net"); } - public IngestionResult ingestFromFile(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) { - return ingestFromFileAsync(fileSourceInfo, ingestionProperties).block(); - } - - protected abstract Mono ingestFromFileAsyncImpl(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties); + protected abstract IngestionResult ingestFromFileImpl(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) + throws IngestionClientException, IngestionServiceException; - public Mono ingestFromFileAsync(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) { - // trace ingestFromFileAsync - return Mono.defer(() -> MonitoredActivity.wrap( - ingestFromFileAsyncImpl(fileSourceInfo, + public IngestionResult ingestFromFile(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) + throws IngestionClientException, IngestionServiceException { + // trace ingestFromFile + return MonitoredActivity.invoke( + (SupplierTwoExceptions) () -> ingestFromFileImpl(fileSourceInfo, ingestionProperties), - getClientType().concat(".ingestFromFile"))); + getClientType().concat(".ingestFromFile")); } /** @@ -79,21 +79,21 @@ public Mono ingestFromFileAsync(FileSourceInfo fileSourceInfo, * @param blobSourceInfo The specific SourceInfo to be ingested * @param ingestionProperties Settings used to customize the ingestion operation * @return {@link IngestionResult} object including the ingestion result + * @throws IngestionClientException An exception originating from a client activity + * @throws IngestionServiceException An exception returned from the service * @see BlobSourceInfo * @see IngestionProperties */ - public IngestionResult ingestFromBlob(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) { - return ingestFromBlobAsync(blobSourceInfo, ingestionProperties).block(); - } + protected abstract IngestionResult ingestFromBlobImpl(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) + throws IngestionClientException, IngestionServiceException; - protected abstract Mono ingestFromBlobAsyncImpl(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties); - - public Mono ingestFromBlobAsync(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) { + public IngestionResult ingestFromBlob(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) + throws IngestionClientException, IngestionServiceException { // trace ingestFromBlob - return Mono.defer(() -> MonitoredActivity.wrap( - ingestFromBlobAsyncImpl(blobSourceInfo, + return MonitoredActivity.invoke( + (SupplierTwoExceptions) () -> ingestFromBlobImpl(blobSourceInfo, ingestionProperties), - getClientType().concat(".ingestFromBlob"))); + getClientType().concat(".ingestFromBlob")); } /** @@ -107,21 +107,21 @@ public Mono ingestFromBlobAsync(BlobSourceInfo blobSourceInfo, * @param resultSetSourceInfo The specific SourceInfo to be ingested * @param ingestionProperties Settings used to customize the ingestion operation * @return {@link IngestionResult} object including the ingestion result + * @throws IngestionClientException An exception originating from a client activity + * @throws IngestionServiceException An exception returned from the service * @see ResultSetSourceInfo * @see IngestionProperties */ - public IngestionResult ingestFromResultSet(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) { - return ingestFromResultSetAsync(resultSetSourceInfo, ingestionProperties).block(); - } + protected abstract IngestionResult ingestFromResultSetImpl(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) + throws IngestionClientException, IngestionServiceException; - protected abstract Mono ingestFromResultSetAsyncImpl(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties); - - public Mono ingestFromResultSetAsync(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) { + public IngestionResult ingestFromResultSet(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) + throws IngestionClientException, IngestionServiceException { // trace ingestFromResultSet - return Mono.defer(() -> MonitoredActivity.wrap( - ingestFromResultSetAsyncImpl(resultSetSourceInfo, + return MonitoredActivity.invoke( + (SupplierTwoExceptions) () -> ingestFromResultSetImpl(resultSetSourceInfo, ingestionProperties), - getClientType().concat(".ingestFromResultSet"))); + getClientType().concat(".ingestFromResultSet")); } /** @@ -132,21 +132,27 @@ public Mono ingestFromResultSetAsync(ResultSetSourceInfo result * @param streamSourceInfo The specific SourceInfo to be ingested * @param ingestionProperties Settings used to customize the ingestion operation * @return {@link IngestionResult} object including the ingestion result + * @throws IngestionClientException An exception originating from a client activity + * @throws IngestionServiceException An exception returned from the service * @see StreamSourceInfo * @see IngestionProperties */ - public IngestionResult ingestFromStream(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) { - return ingestFromStreamAsync(streamSourceInfo, ingestionProperties).block(); - } + protected abstract IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) + throws IngestionClientException, IngestionServiceException, IOException; - protected abstract Mono ingestFromStreamAsyncImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties); - - public Mono ingestFromStreamAsync(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) { + public IngestionResult ingestFromStream(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) + throws IngestionClientException, IngestionServiceException { // trace ingestFromStream - return Mono.defer(() -> MonitoredActivity.wrap( - ingestFromStreamAsyncImpl(streamSourceInfo, - ingestionProperties), - getClientType().concat(".ingestFromStream"))); + return MonitoredActivity.invoke( + (SupplierTwoExceptions) () -> { + try { + return ingestFromStreamImpl(streamSourceInfo, + ingestionProperties); + } catch (IOException e) { + throw new IngestionServiceException(ExceptionsUtils.getMessageEx(e), e); + } + }, + getClientType().concat(".ingestFromStream")); } protected Map getIngestionTraceAttributes(TraceableAttributes sourceInfo, TraceableAttributes ingestionProperties) { diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ManagedStreamingIngestClient.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ManagedStreamingIngestClient.java index 32c0bbabe..af37c23c7 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ManagedStreamingIngestClient.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ManagedStreamingIngestClient.java @@ -6,34 +6,25 @@ import com.azure.storage.blob.models.BlobStorageException; import com.azure.storage.common.policy.RequestRetryOptions; import com.microsoft.azure.kusto.data.Ensure; -import com.microsoft.azure.kusto.data.ExponentialRetry; +import com.microsoft.azure.kusto.data.http.HttpClientProperties; import com.microsoft.azure.kusto.data.StreamingClient; import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; import com.microsoft.azure.kusto.data.exceptions.DataServiceException; import com.microsoft.azure.kusto.data.exceptions.DataWebException; -import com.microsoft.azure.kusto.data.exceptions.ExceptionsUtils; import com.microsoft.azure.kusto.data.exceptions.OneApiError; -import com.microsoft.azure.kusto.data.http.HttpClientProperties; +import com.microsoft.azure.kusto.data.exceptions.ExceptionsUtils; import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException; import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException; import com.microsoft.azure.kusto.ingest.result.IngestionResult; -import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo; -import com.microsoft.azure.kusto.ingest.source.FileSourceInfo; -import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo; -import com.microsoft.azure.kusto.ingest.source.SourceInfo; -import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo; +import com.microsoft.azure.kusto.ingest.source.*; +import com.microsoft.azure.kusto.data.ExponentialRetry; + import com.microsoft.azure.kusto.ingest.utils.IngestionUtils; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; - -import java.io.ByteArrayInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.io.SequenceInputStream; + +import java.io.*; import java.lang.invoke.MethodHandles; import java.net.URISyntaxException; import java.util.UUID; @@ -60,7 +51,7 @@ public class ManagedStreamingIngestClient extends IngestClientBase implements Qu private final ExponentialRetry exponentialRetryTemplate; private HttpClient httpClient = null; private ManagedStreamingQueuingPolicy queuingPolicy = ManagedStreamingQueuingPolicy.Default; - private static final String FALLBACK_LOG_STRING = "Data size is greater than max streaming size according to the policy. Falling back to queued."; + private static final String fallbackLogString = "Data size is greater than max streaming size according to the policy. Falling back to queued."; /** * @param dmConnectionString dm connection string @@ -277,217 +268,196 @@ public ManagedStreamingIngestClient(ResourceManager resourceManager, } @Override - protected Mono ingestFromFileAsyncImpl(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) { - return Mono.fromCallable(() -> { - Ensure.argIsNotNull(fileSourceInfo, "fileSourceInfo"); - Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); - fileSourceInfo.validate(); - ingestionProperties.validate(); - return true; - }) - .flatMap(validInput -> Mono.fromCallable(() -> IngestionUtils.fileToStream(fileSourceInfo, true, ingestionProperties.getDataFormat())) - .flatMap(streamSourceInfo -> ingestFromStreamAsync(streamSourceInfo, ingestionProperties))) - .onErrorMap(FileNotFoundException.class, e -> { - log.error("File not found when ingesting a file.", e); - return new IngestionClientException("IO exception - check file path.", e); - }); + protected IngestionResult ingestFromFileImpl(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) + throws IngestionClientException, IngestionServiceException { + Ensure.argIsNotNull(fileSourceInfo, "fileSourceInfo"); + Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); + + fileSourceInfo.validate(); + ingestionProperties.validate(); + try { + StreamSourceInfo streamSourceInfo = IngestionUtils.fileToStream(fileSourceInfo, true, ingestionProperties.getDataFormat()); + return ingestFromStream(streamSourceInfo, ingestionProperties); + } catch (FileNotFoundException e) { + log.error("File not found when ingesting a file.", e); + throw new IngestionClientException("IO exception - check file path.", e); + } } + /** + * {@inheritDoc} + *

+ */ @Override - protected Mono ingestFromBlobAsyncImpl(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) { - return Mono.fromCallable(() -> { - Ensure.argIsNotNull(blobSourceInfo, "blobSourceInfo"); - Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); - blobSourceInfo.validate(); - ingestionProperties.validate(); - return true; - }) - .flatMap(valid -> { - BlobClientBuilder blobClientBuilder = new BlobClientBuilder().endpoint(blobSourceInfo.getBlobPath()); - if (httpClient != null) { - blobClientBuilder.httpClient(httpClient); - } + protected IngestionResult ingestFromBlobImpl(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) + throws IngestionClientException, IngestionServiceException { + Ensure.argIsNotNull(blobSourceInfo, "blobSourceInfo"); + Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); - BlobClient blobClient = blobClientBuilder.buildClient(); - - return Mono.fromCallable(() -> { - if (blobSourceInfo.getRawSizeInBytes() <= 0) { - return blobClient.getProperties().getBlobSize(); - } - return blobSourceInfo.getRawSizeInBytes(); - }) - .subscribeOn(Schedulers.boundedElastic()) // TODO: all ingest apis have blocking calls. offload them to bounded elastic pool in - // order for the main reactive thread to continue operate? - .onErrorMap(BlobStorageException.class, e -> new IngestionServiceException( - blobSourceInfo.getBlobPath(), - "Failed getting blob properties: " + ExceptionsUtils.getMessageEx(e), - e)) - .flatMap(blobSize -> handleIngestion(blobSourceInfo, ingestionProperties, blobClient, blobSize)); - }) - .onErrorResume(Mono::error); - } + blobSourceInfo.validate(); + ingestionProperties.validate(); - private Mono handleIngestion(BlobSourceInfo blobSourceInfo, - IngestionProperties ingestionProperties, - BlobClient blobClient, - long blobSize) { - if (queuingPolicy.shouldUseQueuedIngestion(blobSize, blobSourceInfo.getRawSizeInBytes(), - blobSourceInfo.getCompressionType() != null, ingestionProperties.getDataFormat())) { - log.info(FALLBACK_LOG_STRING); - return queuedIngestClient.ingestFromBlobAsync(blobSourceInfo, ingestionProperties); + BlobClientBuilder blobClientBuilder = new BlobClientBuilder().endpoint(blobSourceInfo.getBlobPath()); + if (httpClient != null) { + blobClientBuilder.httpClient(httpClient); } - return executeStream(blobSourceInfo, ingestionProperties, blobClient) - .retryWhen(new ExponentialRetry(exponentialRetryTemplate).retry()) - .onErrorResume(ignored -> queuedIngestClient.ingestFromBlobAsync(blobSourceInfo, ingestionProperties)); // Fall back to queued ingestion - } + BlobClient blobClient = blobClientBuilder.buildClient(); + long blobSize = 0; + if (blobSourceInfo.getRawSizeInBytes() <= 0) { + try { + blobSize = blobClient.getProperties().getBlobSize(); + } catch (BlobStorageException e) { + throw new IngestionServiceException( + blobSourceInfo.getBlobPath(), + "Failed getting blob properties: " + ExceptionsUtils.getMessageEx(e), + e); + } + } - private Mono executeStream(SourceInfo sourceInfo, IngestionProperties ingestionProperties, @Nullable BlobClient blobClient) { - if (blobClient != null) { // TODO: is the currentAttempt in the clientRequestId needed? - String clientRequestId = String.format("KJC.executeManagedStreamingIngest.ingestFromBlob;%s", sourceInfo.getSourceId()); - return streamingIngestClient.ingestFromBlobAsync((BlobSourceInfo) sourceInfo, ingestionProperties, blobClient, clientRequestId) - .onErrorResume(e -> handleStreamingError(sourceInfo, e)); - } else { - String clientRequestId = String.format("KJC.executeManagedStreamingIngest.ingestFromStream;%s", sourceInfo.getSourceId()); - return streamingIngestClient.ingestFromStreamAsync((StreamSourceInfo) sourceInfo, ingestionProperties, clientRequestId) - .onErrorResume(e -> handleStreamingError(sourceInfo, e)); + if (queuingPolicy.shouldUseQueuedIngestion(blobSize, blobSourceInfo.getRawSizeInBytes(), + blobSourceInfo.getCompressionType() != null, ingestionProperties.getDataFormat())) { + log.info(fallbackLogString); + return queuedIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties); } - } - private Mono handleStreamingError(SourceInfo sourceInfo, Throwable e) { - if (e instanceof IngestionServiceException - && e.getCause() != null // TODO: is this needed? - && e.getCause() instanceof DataServiceException - && e.getCause().getCause() != null - && e.getCause().getCause() instanceof DataWebException) { - DataWebException webException = (DataWebException) e.getCause().getCause(); - OneApiError oneApiError = webException.getApiError(); - if (oneApiError.isPermanent()) { - return Mono.error(e); - } + IngestionResult result = streamWithRetries(blobSourceInfo, ingestionProperties, blobClient); + if (result != null) { + return result; } - log.info("Streaming ingestion failed.", e); + return queuedIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties); + } - if (sourceInfo instanceof StreamSourceInfo) { + private IngestionResult streamWithRetries(SourceInfo sourceInfo, IngestionProperties ingestionProperties, @Nullable BlobClient blobClient) + throws IngestionClientException, IngestionServiceException { + ExponentialRetry retry = new ExponentialRetry<>( + exponentialRetryTemplate); + return retry.execute(currentAttempt -> { try { - ((StreamSourceInfo) sourceInfo).getStream().reset(); - } catch (IOException ioException) { - return Mono.error(new IngestionClientException("Failed to reset stream", ioException)); - } - } + if (blobClient != null) { + String clientRequestId = String.format("KJC.executeManagedStreamingIngest.ingestFromBlob;%s;%d", sourceInfo.getSourceId(), currentAttempt); + return streamingIngestClient.ingestFromBlob((BlobSourceInfo) sourceInfo, ingestionProperties, blobClient, clientRequestId); + } else { + String clientRequestId = String.format("KJC.executeManagedStreamingIngest.ingestFromStream;%s;%d", sourceInfo.getSourceId(), + currentAttempt); + return streamingIngestClient.ingestFromStream((StreamSourceInfo) sourceInfo, ingestionProperties, clientRequestId); + } + } catch (Exception e) { + if (e instanceof IngestionServiceException + && e.getCause() != null + && e.getCause() instanceof DataServiceException + && e.getCause().getCause() != null + && e.getCause().getCause() instanceof DataWebException) { + DataWebException webException = (DataWebException) e.getCause().getCause(); + OneApiError oneApiError = webException.getApiError(); + if (oneApiError.isPermanent()) { + throw e; + } + } + log.info(String.format("Streaming ingestion failed attempt %d", currentAttempt), e); + + if (sourceInfo instanceof StreamSourceInfo) { + try { + ((StreamSourceInfo) sourceInfo).getStream().reset(); + } catch (IOException ioException) { + throw new IngestionClientException("Failed to reset stream", ioException); + } + } - return Mono.empty(); + } + return null; + }); } @Override - protected Mono ingestFromResultSetAsyncImpl(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) { - return Mono.fromCallable(() -> { - Ensure.argIsNotNull(resultSetSourceInfo, "resultSetSourceInfo"); - Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); - resultSetSourceInfo.validate(); - ingestionProperties.validateResultSetProperties(); - return IngestionUtils.resultSetToStream(resultSetSourceInfo); - }) - .onErrorMap(IOException.class, e -> { - String msg = "Failed to read from ResultSet."; - log.error(msg, e); - return new IngestionClientException(msg, e); - }) - .flatMap(streamSourceInfo -> ingestFromStreamAsync(streamSourceInfo, ingestionProperties)); + protected IngestionResult ingestFromResultSetImpl(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) + throws IngestionClientException, IngestionServiceException { + Ensure.argIsNotNull(resultSetSourceInfo, "resultSetSourceInfo"); + Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); + + resultSetSourceInfo.validate(); + ingestionProperties.validateResultSetProperties(); + try { + StreamSourceInfo streamSourceInfo = IngestionUtils.resultSetToStream(resultSetSourceInfo); + return ingestFromStream(streamSourceInfo, ingestionProperties); + } catch (IOException ex) { + String msg = "Failed to read from ResultSet."; + log.error(msg, ex); + throw new IngestionClientException(msg, ex); + } } @Override - protected Mono ingestFromStreamAsyncImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) { - return Mono.fromCallable(() -> { - Ensure.argIsNotNull(streamSourceInfo, "streamSourceInfo"); - Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); - streamSourceInfo.validate(); - ingestionProperties.validate(); - - if (streamSourceInfo.getSourceId() == null) { - streamSourceInfo.setSourceId(UUID.randomUUID()); - } - return streamSourceInfo; - }) - .flatMap(sourceInfo -> Mono.fromCallable(() -> { - int availableBytes = sourceInfo.getStream().available(); - return queuingPolicy.shouldUseQueuedIngestion( - availableBytes, - streamSourceInfo.getRawSizeInBytes(), - streamSourceInfo.getCompressionType() != null, - ingestionProperties.getDataFormat()); - }).subscribeOn(Schedulers.boundedElastic()) // TODO: same - .flatMap(useQueued -> { - if (Boolean.TRUE.equals(useQueued)) { - return queuedIngestClient.ingestFromStreamAsync(streamSourceInfo, ingestionProperties); - } - return Mono.empty(); - })) - .switchIfEmpty(processStream(streamSourceInfo, ingestionProperties)) - .onErrorMap(IOException.class, e -> new IngestionClientException("Failed to read from stream.", e)); - } + protected IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) + throws IngestionClientException, IngestionServiceException, IOException { + Ensure.argIsNotNull(streamSourceInfo, "streamSourceInfo"); + Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); - private Mono processStream(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) { - return Mono.fromCallable(() -> streamSourceInfo.getStream() instanceof ByteArrayInputStream - || streamSourceInfo.getStream() instanceof ResettableFileInputStream) - .flatMap(isKnownStreamType -> { - if (Boolean.TRUE.equals(isKnownStreamType)) { - StreamSourceInfo managedSourceInfo = new StreamSourceInfo(streamSourceInfo.getStream(), - true, streamSourceInfo.getSourceId(), streamSourceInfo.getCompressionType(), - streamSourceInfo.getRawSizeInBytes()); - return executeStream(managedSourceInfo, ingestionProperties, null) - .retryWhen(new ExponentialRetry(exponentialRetryTemplate).retry()) - .onErrorResume(ignored -> queuedIngestClient.ingestFromStreamAsync(managedSourceInfo, ingestionProperties)) - .doFinally(signal -> closeStreamSafely(managedSourceInfo)); - } else { - return Mono.fromCallable(() -> IngestionUtils.readBytesFromInputStream(streamSourceInfo.getStream(), - ManagedStreamingQueuingPolicy.MAX_STREAMING_STREAM_SIZE_BYTES + 1)) - .subscribeOn(Schedulers.boundedElastic())// TODO: same - .flatMap(streamingBytes -> { - InputStream byteArrayStream = new ByteArrayInputStream(streamingBytes); - int size = streamingBytes.length; - - boolean shouldUseQueuedIngestion = queuingPolicy.shouldUseQueuedIngestion( - size, - streamSourceInfo.getRawSizeInBytes(), - streamSourceInfo.getCompressionType() != null, - ingestionProperties.getDataFormat()); - if (shouldUseQueuedIngestion) { - log.info(FALLBACK_LOG_STRING); - StreamSourceInfo managedSourceInfo = new StreamSourceInfo( - new SequenceInputStream(byteArrayStream, streamSourceInfo.getStream()), - streamSourceInfo.isLeaveOpen(), streamSourceInfo.getSourceId(), streamSourceInfo.getCompressionType()); - - return queuedIngestClient.ingestFromStreamAsync(managedSourceInfo, ingestionProperties); - } - - if (!streamSourceInfo.isLeaveOpen()) { - - // From this point we don't need the original stream anymore, we cached it - try { - streamSourceInfo.getStream().close(); - } catch (IOException e) { - log.warn("Failed to close stream", e); - } - } - - StreamSourceInfo managedSourceInfo = new StreamSourceInfo(byteArrayStream, - true, streamSourceInfo.getSourceId(), streamSourceInfo.getCompressionType(), - streamSourceInfo.getRawSizeInBytes()); - return executeStream(managedSourceInfo, ingestionProperties, null) - .retryWhen(new ExponentialRetry(exponentialRetryTemplate).retry()) - .onErrorResume(ignored -> queuedIngestClient.ingestFromStreamAsync(managedSourceInfo, ingestionProperties)) - .doFinally(signal -> closeStreamSafely(managedSourceInfo)); - }); - } - }); - } + streamSourceInfo.validate(); + ingestionProperties.validate(); + + UUID sourceId = streamSourceInfo.getSourceId(); + if (sourceId == null) { + sourceId = UUID.randomUUID(); + } + + streamSourceInfo.setSourceId(sourceId); + byte[] streamingBytes; + InputStream byteArrayStream; + + if (queuingPolicy.shouldUseQueuedIngestion(streamSourceInfo.getStream().available(), streamSourceInfo.getRawSizeInBytes(), + streamSourceInfo.getCompressionType() != null, ingestionProperties.getDataFormat())) { + log.info(fallbackLogString); + return queuedIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties); + } - private void closeStreamSafely(StreamSourceInfo streamSourceInfo) { try { - streamSourceInfo.getStream().close(); + if (streamSourceInfo.getStream() instanceof ByteArrayInputStream || streamSourceInfo.getStream() instanceof ResettableFileInputStream) { + byteArrayStream = streamSourceInfo.getStream(); + } else { + // If its not a ByteArrayInputStream: + // Read 10mb (max streaming size), decide with that if we should stream + streamingBytes = IngestionUtils.readBytesFromInputStream(streamSourceInfo.getStream(), + ManagedStreamingQueuingPolicy.MAX_STREAMING_STREAM_SIZE_BYTES + 1); + byteArrayStream = new ByteArrayInputStream(streamingBytes); + int size = streamingBytes.length; + if (queuingPolicy.shouldUseQueuedIngestion(size, streamSourceInfo.getRawSizeInBytes(), + streamSourceInfo.getCompressionType() != null, ingestionProperties.getDataFormat())) { + log.info(fallbackLogString); + StreamSourceInfo managedSourceInfo = new StreamSourceInfo(new SequenceInputStream(byteArrayStream, streamSourceInfo.getStream()), + streamSourceInfo.isLeaveOpen(), sourceId, streamSourceInfo.getCompressionType()); + + return queuedIngestClient.ingestFromStream(managedSourceInfo, ingestionProperties); + } + + if (!streamSourceInfo.isLeaveOpen()) { + // From this point we don't need the original stream anymore, we cached it + try { + streamSourceInfo.getStream().close(); + } catch (IOException e) { + log.warn("Failed to close stream", e); + } + } + } } catch (IOException e) { - log.warn("Failed to close byte stream", e); + throw new IngestionClientException("Failed to read from stream.", e); + } + + StreamSourceInfo managedSourceInfo = new StreamSourceInfo(byteArrayStream, true, sourceId, streamSourceInfo.getCompressionType(), + streamSourceInfo.getRawSizeInBytes()); + try { + IngestionResult result = streamWithRetries(managedSourceInfo, ingestionProperties, null); + if (result != null) { + return result; + } + + return queuedIngestClient.ingestFromStream(managedSourceInfo, ingestionProperties); + } finally { + try { + managedSourceInfo.getStream().close(); + } catch (IOException e) { + log.warn("Failed to close byte stream", e); + } } } diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/QueuedIngestClientImpl.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/QueuedIngestClientImpl.java index 16e0b8148..5eae5fb91 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/QueuedIngestClientImpl.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/QueuedIngestClientImpl.java @@ -9,27 +9,14 @@ import com.azure.storage.blob.models.BlobStorageException; import com.azure.storage.common.policy.RequestRetryOptions; import com.azure.storage.queue.models.QueueStorageException; -import com.microsoft.azure.kusto.data.Client; -import com.microsoft.azure.kusto.data.ClientDetails; -import com.microsoft.azure.kusto.data.ClientFactory; -import com.microsoft.azure.kusto.data.Ensure; -import com.microsoft.azure.kusto.data.UriUtils; +import com.microsoft.azure.kusto.data.*; import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; import com.microsoft.azure.kusto.data.http.HttpClientFactory; import com.microsoft.azure.kusto.data.http.HttpClientProperties; import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException; import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException; -import com.microsoft.azure.kusto.ingest.result.IngestionResult; -import com.microsoft.azure.kusto.ingest.result.IngestionStatus; -import com.microsoft.azure.kusto.ingest.result.IngestionStatusInTableDescription; -import com.microsoft.azure.kusto.ingest.result.IngestionStatusResult; -import com.microsoft.azure.kusto.ingest.result.OperationStatus; -import com.microsoft.azure.kusto.ingest.result.TableReportIngestionResult; -import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo; -import com.microsoft.azure.kusto.ingest.source.CompressionType; -import com.microsoft.azure.kusto.ingest.source.FileSourceInfo; -import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo; -import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo; +import com.microsoft.azure.kusto.ingest.result.*; +import com.microsoft.azure.kusto.ingest.source.*; import com.microsoft.azure.kusto.ingest.utils.IngestionUtils; import com.microsoft.azure.kusto.ingest.utils.SecurityUtils; import com.microsoft.azure.kusto.ingest.utils.TableWithSas; @@ -37,8 +24,6 @@ import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -50,7 +35,6 @@ import java.util.LinkedList; import java.util.List; import java.util.UUID; -import java.util.function.Function; public class QueuedIngestClientImpl extends IngestClientBase implements QueuedIngestClient { @@ -96,14 +80,18 @@ public IngestionResourceManager getResourceManager() { } @Override - protected Mono ingestFromBlobAsyncImpl(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) { - return Mono.fromCallable(() -> { - Ensure.argIsNotNull(blobSourceInfo, "blobSourceInfo"); - Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); + protected IngestionResult ingestFromBlobImpl(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) + throws IngestionClientException, IngestionServiceException { + // Argument validation: + Ensure.argIsNotNull(blobSourceInfo, "blobSourceInfo"); + Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); - blobSourceInfo.validate(); - ingestionProperties.validate(); + blobSourceInfo.validate(); + ingestionProperties.validate(); + + try { ingestionProperties.setAuthorizationContextToken(resourceManager.getIdentityToken()); + List tableStatuses = new LinkedList<>(); // Create the ingestion message IngestionBlobInfo ingestionBlobInfo = new IngestionBlobInfo(blobSourceInfo.getBlobPath(), @@ -133,138 +121,119 @@ protected Mono ingestFromBlobAsyncImpl(BlobSourceInfo blobSourc status.setUpdatedOn(Instant.now()); status.setIngestionSourceId(ingestionBlobInfo.getId()); status.setIngestionSourcePath(urlWithoutSecrets); - boolean reportToTable = ingestionProperties.getReportLevel() != IngestionProperties.IngestionReportLevel.NONE && ingestionProperties.getReportMethod() != IngestionProperties.IngestionReportMethod.QUEUE; - List tableStatuses = new LinkedList<>(); - if (reportToTable) { status.setStatus(OperationStatus.Pending); - TableWithSas statusTable = resourceManager.getStatusTable(); + TableWithSas statusTable = resourceManager + .getStatusTable(); IngestionStatusInTableDescription ingestionStatusInTable = new IngestionStatusInTableDescription(); ingestionStatusInTable.setTableClient(statusTable.getTable()); ingestionStatusInTable.setTableConnectionString(statusTable.getUri()); ingestionStatusInTable.setPartitionKey(ingestionBlobInfo.getId().toString()); ingestionStatusInTable.setRowKey(ingestionBlobInfo.getId().toString()); ingestionBlobInfo.setIngestionStatusInTable(ingestionStatusInTable); - - return Mono.fromCallable(() -> { - azureStorageClient.azureTableInsertEntity(statusTable.getTable(), new TableEntity(id, id).setProperties(status.getEntityProperties())); - tableStatuses.add(ingestionBlobInfo.getIngestionStatusInTable()); - return tableStatuses; - }) - .publishOn(Schedulers.boundedElastic()) - .flatMap(insertedTableStatuses -> ResourceAlgorithms.postToQueueWithRetriesAsync(resourceManager, azureStorageClient, ingestionBlobInfo) - .thenReturn(new TableReportIngestionResult(insertedTableStatuses))) - .onErrorMap(e -> { - if (e instanceof BlobStorageException || e instanceof QueueStorageException || e instanceof TableServiceException) { - return new IngestionServiceException("Failed to ingest from blob", (Exception) e); - } else if (e instanceof IOException || e instanceof URISyntaxException) { - return new IngestionClientException("Failed to ingest from blob", e); - } else { - return e; - } - }); + azureStorageClient.azureTableInsertEntity(statusTable.getTable(), new TableEntity(id, id).setProperties(status.getEntityProperties())); + tableStatuses.add(ingestionBlobInfo.getIngestionStatusInTable()); } - return ResourceAlgorithms.postToQueueWithRetriesAsync(resourceManager, azureStorageClient, ingestionBlobInfo) - .thenReturn(new IngestionStatusResult(status)); - }) - .flatMap(Function.identity()); + ResourceAlgorithms.postToQueueWithRetries(resourceManager, azureStorageClient, ingestionBlobInfo); + + return reportToTable + ? new TableReportIngestionResult(tableStatuses) + : new IngestionStatusResult(status); + } catch (BlobStorageException | QueueStorageException | TableServiceException e) { + throw new IngestionServiceException("Failed to ingest from blob", e); + } catch (IOException | URISyntaxException e) { + throw new IngestionClientException("Failed to ingest from blob", e); + } } @Override - protected Mono ingestFromFileAsyncImpl(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) { - return Mono.fromCallable(() -> { - Ensure.argIsNotNull(fileSourceInfo, "fileSourceInfo"); - Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); - fileSourceInfo.validate(); - ingestionProperties.validate(); + protected IngestionResult ingestFromFileImpl(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) + throws IngestionClientException, IngestionServiceException { + // Argument validation: + Ensure.argIsNotNull(fileSourceInfo, "fileSourceInfo"); + Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); + + fileSourceInfo.validate(); + ingestionProperties.validate(); + try { String filePath = fileSourceInfo.getFilePath(); Ensure.fileExists(filePath); - return filePath; - }) - .onErrorMap(IOException.class, e -> new IngestionClientException("Failed to ingest from file", e)) - .flatMap(filePath -> { - CompressionType sourceCompressionType = IngestionUtils.getCompression(filePath); - IngestionProperties.DataFormat dataFormat = ingestionProperties.getDataFormat(); - boolean shouldCompress = shouldCompress(sourceCompressionType, dataFormat); - - File file = new File(filePath); - String blobName = genBlobName( - file.getName(), - ingestionProperties.getDatabaseName(), - ingestionProperties.getTableName(), - dataFormat.getKustoValue(), // Used to use an empty string if the DataFormat was empty. Now it can't be empty, with a default of - // CSV. - shouldCompress ? CompressionType.gz : sourceCompressionType); - - return ResourceAlgorithms.uploadLocalFileWithRetriesAsync(resourceManager, azureStorageClient, file, blobName, shouldCompress) - .flatMap(blobPath -> { - long rawDataSize = fileSourceInfo.getRawSizeInBytes() > 0L ? fileSourceInfo.getRawSizeInBytes() - : estimateFileRawSize(filePath, ingestionProperties.getDataFormat().isCompressible()); - BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath, rawDataSize, fileSourceInfo.getSourceId()); - return ingestFromBlobAsync(blobSourceInfo, ingestionProperties); - }) - .onErrorMap(BlobStorageException.class, e -> new IngestionServiceException("Failed to ingest from file", e)); - }); + CompressionType sourceCompressionType = IngestionUtils.getCompression(filePath); + IngestionProperties.DataFormat dataFormat = ingestionProperties.getDataFormat(); + boolean shouldCompress = shouldCompress(sourceCompressionType, dataFormat); + + File file = new File(filePath); + String blobName = genBlobName( + file.getName(), + ingestionProperties.getDatabaseName(), + ingestionProperties.getTableName(), + dataFormat.getKustoValue(), // Used to use an empty string if the DataFormat was empty. Now it can't be empty, with a default of CSV. + shouldCompress ? CompressionType.gz : sourceCompressionType); + + String blobPath = ResourceAlgorithms.uploadLocalFileWithRetries(resourceManager, azureStorageClient, file, blobName, shouldCompress); + + long rawDataSize = fileSourceInfo.getRawSizeInBytes() > 0L ? fileSourceInfo.getRawSizeInBytes() + : estimateFileRawSize(filePath, ingestionProperties.getDataFormat().isCompressible()); + + BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath, rawDataSize, fileSourceInfo.getSourceId()); + + return ingestFromBlob(blobSourceInfo, ingestionProperties); + } catch (BlobStorageException e) { + throw new IngestionServiceException("Failed to ingest from file", e); + } catch (IOException e) { + throw new IngestionClientException("Failed to ingest from file", e); + } } @Override - protected Mono ingestFromStreamAsyncImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) { - return Mono.fromCallable(() -> { - Ensure.argIsNotNull(streamSourceInfo, "streamSourceInfo"); - Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); - - streamSourceInfo.validate(); - ingestionProperties.validate(); - return true; - }) - .flatMap(valid -> Mono.fromCallable(() -> { - if (streamSourceInfo.getStream() == null) { - return Mono.error(new IngestionClientException("The provided stream is null.")); - } else if (streamSourceInfo.getStream().available() <= 0) { - return Mono.error(new IngestionClientException("The provided stream is empty.")); - } - return true; - })) - .flatMap(ignored -> { - IngestionProperties.DataFormat dataFormat = ingestionProperties.getDataFormat(); - boolean shouldCompress = shouldCompress(streamSourceInfo.getCompressionType(), dataFormat); - - String blobName = genBlobName( - "StreamUpload", - ingestionProperties.getDatabaseName(), - ingestionProperties.getTableName(), - dataFormat.getKustoValue(), // Used to use an empty string if the DataFormat was empty. Now it can't be empty, with a default of - // CSV. - shouldCompress ? CompressionType.gz : streamSourceInfo.getCompressionType()); - return ResourceAlgorithms.uploadStreamToBlobWithRetriesAsync(resourceManager, - azureStorageClient, - streamSourceInfo.getStream(), - blobName, - shouldCompress) - .flatMap(blobPath -> { - BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath, streamSourceInfo.getRawSizeInBytes(), - streamSourceInfo.getSourceId()); - return ingestFromBlobAsync(blobSourceInfo, ingestionProperties); - }) - .onErrorMap(BlobStorageException.class, e -> new IngestionServiceException("Failed to ingest from stream", e)) - .doFinally(signalType -> { - if (!streamSourceInfo.isLeaveOpen()) { - Mono.fromCallable(() -> { - try { - streamSourceInfo.getStream().close(); - return Mono.empty(); - } catch (IOException e) { - return Mono.error(new IngestionClientException("Failed to close stream after ingestion", e)); - } - }); - } - }); - }); - + protected IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) + throws IngestionClientException, IngestionServiceException { + // Argument validation: + Ensure.argIsNotNull(streamSourceInfo, "streamSourceInfo"); + Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); + + streamSourceInfo.validate(); + ingestionProperties.validate(); + + try { + IngestionResult ingestionResult; + if (streamSourceInfo.getStream() == null) { + throw new IngestionClientException("The provided stream is null."); + } else if (streamSourceInfo.getStream().available() <= 0) { + throw new IngestionClientException("The provided stream is empty."); + } + IngestionProperties.DataFormat dataFormat = ingestionProperties.getDataFormat(); + boolean shouldCompress = shouldCompress(streamSourceInfo.getCompressionType(), dataFormat); + + String blobName = genBlobName( + "StreamUpload", + ingestionProperties.getDatabaseName(), + ingestionProperties.getTableName(), + dataFormat.getKustoValue(), // Used to use an empty string if the DataFormat was empty. Now it can't be empty, with a default of CSV. + shouldCompress ? CompressionType.gz : streamSourceInfo.getCompressionType()); + + String blobPath = ResourceAlgorithms.uploadStreamToBlobWithRetries(resourceManager, + azureStorageClient, + streamSourceInfo.getStream(), + blobName, + shouldCompress); + + BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath, streamSourceInfo.getRawSizeInBytes(), streamSourceInfo.getSourceId()); + + ingestionResult = ingestFromBlob(blobSourceInfo, ingestionProperties); + if (!streamSourceInfo.isLeaveOpen()) { + streamSourceInfo.getStream().close(); + } + return ingestionResult; + } catch (BlobStorageException e) { + throw new IngestionServiceException("Failed to ingest from stream", e); + } catch (IOException e) { + throw new IngestionClientException("Failed to ingest from stream", e); + } } @Override @@ -288,28 +257,27 @@ String genBlobName(String fileName, String databaseName, String tableName, Strin } @Override - protected Mono ingestFromResultSetAsyncImpl(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) { - return Mono.fromCallable(() -> { - Ensure.argIsNotNull(resultSetSourceInfo, "resultSetSourceInfo"); - Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); - resultSetSourceInfo.validate(); - ingestionProperties.validateResultSetProperties(); - return true; - }) - .flatMap(valid -> Mono.fromCallable(() -> { - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - new CsvRoutines().write(resultSetSourceInfo.getResultSet(), byteArrayOutputStream); - byteArrayOutputStream.flush(); - ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray()); - return new StreamSourceInfo(byteArrayInputStream, false, resultSetSourceInfo.getSourceId()); - }) - .subscribeOn(Schedulers.boundedElastic())) // TODO: same - .flatMap(streamSourceInfo -> ingestFromStreamAsync(streamSourceInfo, ingestionProperties)) - .onErrorMap(IOException.class, e -> { - String msg = "Failed to read from ResultSet."; - log.error(msg, e); - return new IngestionClientException(msg, e); - }); + protected IngestionResult ingestFromResultSetImpl(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) + throws IngestionClientException, IngestionServiceException { + // Argument validation: + Ensure.argIsNotNull(resultSetSourceInfo, "resultSetSourceInfo"); + Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); + + resultSetSourceInfo.validate(); + ingestionProperties.validateResultSetProperties(); + try { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + new CsvRoutines().write(resultSetSourceInfo.getResultSet(), byteArrayOutputStream); + byteArrayOutputStream.flush(); + ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray()); + + StreamSourceInfo streamSourceInfo = new StreamSourceInfo(byteArrayInputStream, false, resultSetSourceInfo.getSourceId()); + return ingestFromStream(streamSourceInfo, ingestionProperties); + } catch (IOException ex) { + String msg = "Failed to read from ResultSet."; + log.error(msg, ex); + throw new IngestionClientException(msg, ex); + } } protected void setConnectionDataSource(String connectionDataSource) { diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ResourceAlgorithms.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ResourceAlgorithms.java index 6be95144a..486e78ea9 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ResourceAlgorithms.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ResourceAlgorithms.java @@ -1,30 +1,25 @@ package com.microsoft.azure.kusto.ingest; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.microsoft.azure.kusto.data.Utils; import com.microsoft.azure.kusto.data.exceptions.ExceptionsUtils; import com.microsoft.azure.kusto.data.instrumentation.FunctionOneException; import com.microsoft.azure.kusto.data.instrumentation.MonitoredActivity; +import com.microsoft.azure.kusto.data.instrumentation.Tracer; import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException; -import com.microsoft.azure.kusto.ingest.resources.QueueWithSas; +import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException; import com.microsoft.azure.kusto.ingest.resources.RankedStorageAccount; import com.microsoft.azure.kusto.ingest.resources.ResourceWithSas; import com.microsoft.azure.kusto.ingest.utils.SecurityUtils; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.publisher.Mono; import java.io.File; import java.io.InputStream; import java.lang.invoke.MethodHandles; -import java.util.AbstractMap; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -35,110 +30,75 @@ public class ResourceAlgorithms { private ResourceAlgorithms() { } - public static , TOut> Mono resourceActionWithRetriesAsync( - ResourceManager resourceManager, - List resources, - FunctionOneException, TWrapper, Exception> action, - String actionName, - Map additionalAttributes) { + private static , TOut> TOut resourceActionWithRetries(ResourceManager resourceManager, + List resources, FunctionOneException action, String actionName, Map additionalAttributes) + throws IngestionClientException { if (resources.isEmpty()) { - return Mono.error(new IngestionClientException(String.format("%s: No resources were provided.", actionName))); + throw new IngestionClientException(String.format("%s: No resources were provided.", actionName)); } List> totalAttributes = new ArrayList<>(); - - return attemptAction(1, resources, resourceManager, action, actionName, additionalAttributes, null, totalAttributes); - } - - private static , TOut> Mono attemptAction( - int attempt, - List resources, - ResourceManager resourceManager, - FunctionOneException, TWrapper, Exception> action, - String actionName, - Map additionalAttributes, - Exception ex, - List> totalAttributes) { - - if (attempt > RETRY_COUNT) { - String errorMessage = String.format("%s: All %d retries failed with last error: %s\n. Used resources: %s", - actionName, - RETRY_COUNT, - ex != null ? ExceptionsUtils.getMessageEx(ex) : "", - totalAttributes.stream() - .map(x -> String.format("%s (%s)", x.get("resource"), x.get("account"))) - .collect(Collectors.joining(", "))); - return Mono.error(new IngestionClientException(errorMessage)); + Exception ex = null; + for (int i = 0; i < RETRY_COUNT; i++) { + TWrapper resource = resources.get(i % resources.size()); + try { + Map attributes = new HashMap<>(); + attributes.put("resource", resource.getEndpointWithoutSas()); + attributes.put("account", resource.getAccountName()); + attributes.put("type", resource.getClass().getName()); + attributes.put("retry", String.valueOf(i)); + attributes.putAll(additionalAttributes); + totalAttributes.add(attributes); + + return MonitoredActivity.invoke((FunctionOneException) (Tracer.Span span) -> { + try { + TOut result = action.apply(resource); + resourceManager.reportIngestionResult(resource, true); + return result; + } catch (Exception e) { + resourceManager.reportIngestionResult(resource, false); + span.addException(e); + throw e; + } + }, actionName, attributes); + } catch (Exception e) { + ex = e; + log.warn(String.format("Error during retry %d of %d for %s", i + 1, RETRY_COUNT, actionName), e); + } } - - TWrapper resource = resources.get((attempt - 1) % resources.size()); - Map attributes = new HashMap<>(); - attributes.put("resource", resource.getEndpointWithoutSas()); - attributes.put("account", resource.getAccountName()); - attributes.put("type", resource.getClass().getName()); - attributes.put("retry", String.valueOf(attempt)); - attributes.putAll(additionalAttributes); - totalAttributes.add(attributes); - - log.info(String.format("Attempt %d of %d for %s.", attempt, RETRY_COUNT, actionName)); - return MonitoredActivity.invokeAsync( - span -> action.apply(resource) - .doOnSuccess(ignored -> resourceManager.reportIngestionResult(resource, true)), - actionName, - attributes) - .onErrorResume(e -> { - log.warn(String.format("Error during attempt %d of %d for %s.", attempt, RETRY_COUNT, actionName), e); - resourceManager.reportIngestionResult(resource, false); - return attemptAction(attempt + 1, resources, resourceManager, action, actionName, additionalAttributes, (Exception) e, totalAttributes); - }); + throw new IngestionClientException(String.format("%s: All %d retries failed with last error: %s\n. Used resources: %s", actionName, RETRY_COUNT, + totalAttributes.stream().map(x -> String.format("%s (%s)", x.get("resource"), x.get("account"))).collect(Collectors.joining(", ")), + ExceptionsUtils.getMessageEx(ex))); } - public static Mono postToQueueWithRetriesAsync(ResourceManager resourceManager, AzureStorageClient azureStorageClient, IngestionBlobInfo blob) { - return Mono.fromCallable(() -> { - ObjectMapper objectMapper = Utils.getObjectMapper(); - String message = objectMapper.writeValueAsString(blob); - List shuffledQueues = resourceManager.getShuffledQueues(); - return new AbstractMap.SimpleImmutableEntry<>(message, shuffledQueues); - }) - .flatMap(entry -> { - String message = entry.getKey(); - List shuffledQueues = entry.getValue(); - return resourceActionWithRetriesAsync(resourceManager, - shuffledQueues, - queue -> Mono.fromCallable(() -> { - azureStorageClient.postMessageToQueue(queue.getQueue(), message); // TODO: offload all sync calls to bounded elastic? - return null; - }), - "ResourceAlgorithms.postToQueueWithRetriesAsync", - Collections.singletonMap("blob", SecurityUtils.removeSecretsFromUrl(blob.getBlobPath()))); - }); + public static void postToQueueWithRetries(ResourceManager resourceManager, AzureStorageClient azureStorageClient, IngestionBlobInfo blob) + throws IngestionClientException, IngestionServiceException, JsonProcessingException { + ObjectMapper objectMapper = Utils.getObjectMapper(); + String message = objectMapper.writeValueAsString(blob); + resourceActionWithRetries(resourceManager, resourceManager.getShuffledQueues(), queue -> { + azureStorageClient.postMessageToQueue(queue.getQueue(), message); + return null; + }, "ResourceAlgorithms.postToQueueWithRetries", + Collections.singletonMap("blob", SecurityUtils.removeSecretsFromUrl(blob.getBlobPath()))); } - public static Mono uploadStreamToBlobWithRetriesAsync(ResourceManager resourceManager, AzureStorageClient azureStorageClient, InputStream stream, - String blobName, boolean shouldCompress) { - return Mono.fromCallable(resourceManager::getShuffledContainers) - .flatMap(shuffledContainers -> resourceActionWithRetriesAsync(resourceManager, - shuffledContainers, - container -> Mono.fromCallable(() -> { - azureStorageClient.uploadStreamToBlob(stream, blobName, container.getContainer(), shouldCompress); - return (container.getContainer().getBlobContainerUrl() + "/" + blobName + container.getSas()); - }), - "ResourceAlgorithms.uploadStreamToBlobWithRetriesAsync", - Collections.emptyMap())); + public static String uploadStreamToBlobWithRetries(ResourceManager resourceManager, AzureStorageClient azureStorageClient, InputStream stream, + String blobName, boolean shouldCompress) + throws IngestionClientException, IngestionServiceException { + return resourceActionWithRetries(resourceManager, resourceManager.getShuffledContainers(), container -> { + azureStorageClient.uploadStreamToBlob(stream, blobName, container.getContainer(), shouldCompress); + return (container.getContainer().getBlobContainerUrl() + "/" + blobName + container.getSas()); + }, "ResourceAlgorithms.uploadLocalFileWithRetries", Collections.emptyMap()); } - public static Mono uploadLocalFileWithRetriesAsync(ResourceManager resourceManager, AzureStorageClient azureStorageClient, File file, - String blobName, - boolean shouldCompress) { - return Mono.fromCallable(resourceManager::getShuffledContainers) - .flatMap(shuffledContainers -> resourceActionWithRetriesAsync(resourceManager, - shuffledContainers, - container -> Mono.fromCallable(() -> { - azureStorageClient.uploadLocalFileToBlob(file, blobName, container.getContainer(), shouldCompress); - return container.getContainer().getBlobContainerUrl() + "/" + blobName + container.getSas(); - }), "ResourceAlgorithms.uploadLocalFileWithRetriesAsync", - Collections.emptyMap())); + public static String uploadLocalFileWithRetries(ResourceManager resourceManager, AzureStorageClient azureStorageClient, File file, String blobName, + boolean shouldCompress) + throws IngestionClientException, IngestionServiceException { + return resourceActionWithRetries(resourceManager, resourceManager.getShuffledContainers(), container -> { + azureStorageClient.uploadLocalFileToBlob(file, blobName, container.getContainer(), shouldCompress); + return (container.getContainer().getBlobContainerUrl() + "/" + blobName + container.getSas()); + }, "ResourceAlgorithms.uploadLocalFileWithRetries", Collections.emptyMap()); } @NotNull diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/StreamingIngestClient.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/StreamingIngestClient.java index e32719f5a..8bc52c8b0 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/StreamingIngestClient.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/StreamingIngestClient.java @@ -7,17 +7,14 @@ import com.azure.storage.blob.BlobClient; import com.azure.storage.blob.BlobClientBuilder; import com.azure.storage.blob.models.BlobStorageException; -import com.microsoft.azure.kusto.data.ClientFactory; -import com.microsoft.azure.kusto.data.ClientRequestProperties; -import com.microsoft.azure.kusto.data.Ensure; -import com.microsoft.azure.kusto.data.StreamingClient; +import com.microsoft.azure.kusto.data.*; import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; import com.microsoft.azure.kusto.data.exceptions.DataClientException; import com.microsoft.azure.kusto.data.exceptions.DataServiceException; import com.microsoft.azure.kusto.data.exceptions.ExceptionsUtils; import com.microsoft.azure.kusto.data.http.HttpClientProperties; -import com.microsoft.azure.kusto.data.http.HttpStatus; import com.microsoft.azure.kusto.data.instrumentation.MonitoredActivity; +import com.microsoft.azure.kusto.data.instrumentation.SupplierTwoExceptions; import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException; import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException; import com.microsoft.azure.kusto.ingest.result.IngestionResult; @@ -33,19 +30,12 @@ import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; + +import java.io.*; import java.lang.invoke.MethodHandles; import java.net.URI; import java.net.URISyntaxException; import java.util.Objects; -import java.util.function.Function; import java.util.zip.GZIPOutputStream; public class StreamingIngestClient extends IngestClientBase implements IngestClient { @@ -94,72 +84,70 @@ public static String generateEngineUriSuggestion(URI existingEndpoint) throws UR } @Override - protected Mono ingestFromFileAsyncImpl(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) { - return Mono.fromCallable(() -> { - Ensure.argIsNotNull(fileSourceInfo, "fileSourceInfo"); - Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); - fileSourceInfo.validate(); - ingestionProperties.validate(); - return IngestionUtils.fileToStream(fileSourceInfo, false, ingestionProperties.getDataFormat()); - }) - .onErrorMap(FileNotFoundException.class, e -> { - log.error("File not found when ingesting a file.", e); - return new IngestionClientException("IO exception - check file path.", e); - }) - .flatMap(streamSourceInfo -> ingestFromStreamAsync(streamSourceInfo, ingestionProperties)); + protected IngestionResult ingestFromFileImpl(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) + throws IngestionClientException, IngestionServiceException { + Ensure.argIsNotNull(fileSourceInfo, "fileSourceInfo"); + Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); + + fileSourceInfo.validate(); + ingestionProperties.validate(); + + try { + StreamSourceInfo streamSourceInfo = IngestionUtils.fileToStream(fileSourceInfo, false, ingestionProperties.getDataFormat()); + return ingestFromStream(streamSourceInfo, ingestionProperties); + } catch (FileNotFoundException e) { + log.error("File not found when ingesting a file.", e); + throw new IngestionClientException("IO exception - check file path.", e); + } } @Override - protected Mono ingestFromBlobAsyncImpl(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) { - return Mono.fromCallable(() -> { - Ensure.argIsNotNull(blobSourceInfo, "blobSourceInfo"); - Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); - blobSourceInfo.validate(); - ingestionProperties.validate(); - return true; - }) - .flatMap(valid -> { - BlobClient blobClient; - try { - blobClient = new BlobClientBuilder().endpoint(blobSourceInfo.getBlobPath()).buildClient(); - } catch (IllegalArgumentException e) { - - // Handle IllegalArgumentException from BlobClient here to avoid overriding the exception - // thrown by the argument validations. - String msg = "Unexpected error when ingesting a blob - Invalid blob path."; - log.error(msg, e); - return Mono.error(new IngestionClientException(msg, e)); - } - - return ingestFromBlobAsync(blobSourceInfo, ingestionProperties, blobClient, null); - }) - .onErrorMap(BlobStorageException.class, e -> { - String msg = "Unexpected Storage error when ingesting a blob."; - log.error(msg, e); - return new IngestionClientException(msg, e); - }); + protected IngestionResult ingestFromBlobImpl(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) + throws IngestionClientException, IngestionServiceException { + Ensure.argIsNotNull(blobSourceInfo, "blobSourceInfo"); + Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); + + blobSourceInfo.validate(); + ingestionProperties.validate(); + + try { + BlobClient blobClient = new BlobClientBuilder().endpoint(blobSourceInfo.getBlobPath()).buildClient(); + return ingestFromBlob(blobSourceInfo, ingestionProperties, blobClient, null); + } catch (IllegalArgumentException e) { + String msg = "Unexpected error when ingesting a blob - Invalid blob path."; + log.error(msg, e); + throw new IngestionClientException(msg, e); + } catch (BlobStorageException e) { + String msg = "Unexpected Storage error when ingesting a blob."; + log.error(msg, e); + throw new IngestionClientException(msg, e); + } } @Override - protected Mono ingestFromResultSetAsyncImpl(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) { - return Mono.fromCallable(() -> { - Ensure.argIsNotNull(resultSetSourceInfo, "resultSetSourceInfo"); - Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); - resultSetSourceInfo.validate(); - ingestionProperties.validateResultSetProperties(); - return IngestionUtils.resultSetToStream(resultSetSourceInfo); - }) - .flatMap(streamSourceInfo -> ingestFromStreamAsync(streamSourceInfo, ingestionProperties)) - .onErrorMap(IOException.class, e -> { - String msg = "Failed to read from ResultSet."; - log.error(msg, e); - return new IngestionClientException(msg, e); - }); + protected IngestionResult ingestFromResultSetImpl(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) + throws IngestionClientException, IngestionServiceException { + // Argument validation: + Ensure.argIsNotNull(resultSetSourceInfo, "resultSetSourceInfo"); + Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); + + resultSetSourceInfo.validate(); + ingestionProperties.validateResultSetProperties(); + + try { + StreamSourceInfo streamSourceInfo = IngestionUtils.resultSetToStream(resultSetSourceInfo); + return ingestFromStream(streamSourceInfo, ingestionProperties); + } catch (IOException ex) { + String msg = "Failed to read from ResultSet."; + log.error(msg, ex); + throw new IngestionClientException(msg, ex); + } } @Override - protected Mono ingestFromStreamAsyncImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) { - return ingestFromStreamImplAsync(streamSourceInfo, ingestionProperties, null); + protected IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) + throws IngestionClientException, IngestionServiceException { + return ingestFromStreamImpl(streamSourceInfo, ingestionProperties, null); } @Override @@ -167,83 +155,59 @@ protected String getClientType() { return CLASS_NAME; } - Mono ingestFromStreamAsync(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties, @Nullable String clientRequestId) { - // trace ingestFromStreamAsync - return MonitoredActivity.wrap( - ingestFromStreamImplAsync(streamSourceInfo, + IngestionResult ingestFromStream(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties, @Nullable String clientRequestId) + throws IngestionClientException, IngestionServiceException { + // trace ingestFromStream + return MonitoredActivity.invoke( + (SupplierTwoExceptions) () -> ingestFromStreamImpl(streamSourceInfo, ingestionProperties, clientRequestId), getClientType().concat(".ingestFromStream"), getIngestionTraceAttributes(streamSourceInfo, ingestionProperties)); } - private Mono ingestFromStreamImplAsync(StreamSourceInfo streamSourceInfo, - IngestionProperties ingestionProperties, - @Nullable String clientRequestId) { - return Mono.fromCallable(() -> { - Ensure.argIsNotNull(streamSourceInfo, "streamSourceInfo"); - Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); - ingestionProperties.validate(); - streamSourceInfo.validate(); - - return ingestionProperties.getDataFormat(); - }) - .onErrorMap(IOException.class, e -> { - String msg = ExceptionsUtils.getMessageEx(e); - log.error(msg, e); - return new IngestionClientException(msg, e); - }) - .flatMap(dataFormat -> Mono.fromCallable(() -> { - if (IngestClientBase.shouldCompress(streamSourceInfo.getCompressionType(), dataFormat)) { - return compressStream(streamSourceInfo.getStream(), streamSourceInfo.isLeaveOpen()); - } else { - return streamSourceInfo.getStream(); - } - }) - .subscribeOn(Schedulers.boundedElastic()) - .map(stream -> new Object[] {stream, dataFormat}) // Pass both stream and dataFormat downstream - ) - .flatMap(tuple -> { - InputStream stream = (InputStream) tuple[0]; - IngestionProperties.DataFormat dataFormat = (IngestionProperties.DataFormat) tuple[1]; - - ClientRequestProperties clientRequestProperties = null; - if (StringUtils.isNotBlank(clientRequestId)) { - clientRequestProperties = new ClientRequestProperties(); - clientRequestProperties.setClientRequestId(clientRequestId); - } - - ClientRequestProperties finalClientRequestProperties = clientRequestProperties; - return Mono.fromCallable(() -> { - log.debug("Executing streaming ingest"); - return this.streamingClient.executeStreamingIngestAsync( - ingestionProperties.getDatabaseName(), - ingestionProperties.getTableName(), - stream, - finalClientRequestProperties, - dataFormat.getKustoValue(), - ingestionProperties.getIngestionMapping().getIngestionMappingReference(), - !(streamSourceInfo.getCompressionType() == null || !streamSourceInfo.isLeaveOpen())); - }) - .flatMap(Function.identity()) - .doOnSuccess(ignored -> log.debug("Stream was ingested successfully.")) - .then(Mono.fromCallable(() -> { - log.debug("Stream was ingested successfully."); - IngestionStatus ingestionStatus = new IngestionStatus(); - ingestionStatus.status = OperationStatus.Succeeded; - ingestionStatus.table = ingestionProperties.getTableName(); - ingestionStatus.database = ingestionProperties.getDatabaseName(); - return (IngestionResult) new IngestionStatusResult(ingestionStatus); - })) - .onErrorMap(DataClientException.class, e -> { - String msg = ExceptionsUtils.getMessageEx(e); - log.error(msg, e); - return new IngestionClientException(msg, e); - }) - .onErrorMap(DataServiceException.class, e -> { - log.error(e.getMessage(), e); - return new IngestionServiceException(e.getMessage(), e); - }); - }); + private IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties, @Nullable String clientRequestId) + throws IngestionClientException, IngestionServiceException { + Ensure.argIsNotNull(streamSourceInfo, "streamSourceInfo"); + Ensure.argIsNotNull(ingestionProperties, "ingestionProperties"); + + IngestionProperties.DataFormat dataFormat = ingestionProperties.getDataFormat(); + + streamSourceInfo.validate(); + ingestionProperties.validate(); + + ClientRequestProperties clientRequestProperties = null; + if (StringUtils.isNotBlank(clientRequestId)) { + clientRequestProperties = new ClientRequestProperties(); + clientRequestProperties.setClientRequestId(clientRequestId); + } + + try { + InputStream stream = IngestClientBase.shouldCompress(streamSourceInfo.getCompressionType(), dataFormat) + ? compressStream(streamSourceInfo.getStream(), streamSourceInfo.isLeaveOpen()) + : streamSourceInfo.getStream(); + log.debug("Executing streaming ingest"); + this.streamingClient.executeStreamingIngest(ingestionProperties.getDatabaseName(), + ingestionProperties.getTableName(), + stream, + clientRequestProperties, + dataFormat.getKustoValue(), + ingestionProperties.getIngestionMapping().getIngestionMappingReference(), + !(streamSourceInfo.getCompressionType() == null || !streamSourceInfo.isLeaveOpen())); + } catch (DataClientException | IOException e) { + String msg = ExceptionsUtils.getMessageEx(e); + log.error(msg, e); + throw new IngestionClientException(msg, e); + } catch (DataServiceException e) { + log.error(e.getMessage(), e); + throw new IngestionServiceException(e.getMessage(), e); + } + + log.debug("Stream was ingested successfully."); + IngestionStatus ingestionStatus = new IngestionStatus(); + ingestionStatus.status = OperationStatus.Succeeded; + ingestionStatus.table = ingestionProperties.getTableName(); + ingestionStatus.database = ingestionProperties.getDatabaseName(); + return new IngestionStatusResult(ingestionStatus); } private InputStream compressStream(InputStream uncompressedStream, boolean leaveOpen) throws IngestionClientException, IOException { @@ -270,63 +234,61 @@ private InputStream compressStream(InputStream uncompressedStream, boolean leave return inputStream; } - Mono ingestFromBlobAsync(BlobSourceInfo blobSourceInfo, + IngestionResult ingestFromBlob(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties, BlobClient cloudBlockBlob, - @Nullable String clientRequestId) { - // trace ingestFromBlobAsync - return MonitoredActivity.wrap( - ingestFromBlobImplAsync(blobSourceInfo, + @Nullable String clientRequestId) + throws IngestionClientException, IngestionServiceException { + // trace ingestFromBlob + return MonitoredActivity.invoke( + (SupplierTwoExceptions) () -> ingestFromBlobImpl(blobSourceInfo, ingestionProperties, cloudBlockBlob, clientRequestId), getClientType().concat(".ingestFromBlob"), getIngestionTraceAttributes(blobSourceInfo, ingestionProperties)); } - private Mono ingestFromBlobImplAsync(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties, BlobClient cloudBlockBlob, - @Nullable String clientRequestId) { - - return Mono.fromCallable(() -> { + private IngestionResult ingestFromBlobImpl(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties, BlobClient cloudBlockBlob, + @Nullable String clientRequestId) + throws IngestionClientException, IngestionServiceException { + String blobPath = blobSourceInfo.getBlobPath(); + try { + // No need to check blob size if it was given to us that it's not empty if (blobSourceInfo.getRawSizeInBytes() == 0 && cloudBlockBlob.getProperties().getBlobSize() == 0) { String message = "Empty blob."; log.error(message); throw new IngestionClientException(message); } - return true; - }).subscribeOn(Schedulers.boundedElastic())// TODO: same - .onErrorMap(BlobStorageException.class, e -> new IngestionClientException(String.format("Exception trying to read blob metadata,%s", - e.getStatusCode() == HttpStatus.FORBIDDEN ? "this might mean the blob doesn't exist" : ""), e)) - .flatMap(ignored -> Mono.fromCallable(() -> { - String blobPath = blobSourceInfo.getBlobPath(); - ClientRequestProperties clientRequestProperties = null; - if (StringUtils.isNotBlank(clientRequestId)) { - clientRequestProperties = new ClientRequestProperties(); - clientRequestProperties.setClientRequestId(clientRequestId); - } - - IngestionProperties.DataFormat dataFormat = ingestionProperties.getDataFormat(); - return this.streamingClient.executeStreamingIngestFromBlobAsync(ingestionProperties.getDatabaseName(), - ingestionProperties.getTableName(), - blobPath, - clientRequestProperties, - dataFormat.getKustoValue(), - ingestionProperties.getIngestionMapping().getIngestionMappingReference()); - }) - .onErrorMap(DataClientException.class, e -> { - log.error(e.getMessage(), e); - return new IngestionClientException(e.getMessage(), e); - }) - .onErrorMap(DataServiceException.class, e -> { - log.error(e.getMessage(), e); - return new IngestionServiceException(e.getMessage(), e); - }) - .doOnSuccess(ignored1 -> log.debug("Blob was ingested successfully.")) - .then(Mono.fromCallable(() -> { - IngestionStatus ingestionStatus = new IngestionStatus(); - ingestionStatus.status = OperationStatus.Succeeded; - ingestionStatus.table = ingestionProperties.getTableName(); - ingestionStatus.database = ingestionProperties.getDatabaseName(); - return new IngestionStatusResult(ingestionStatus); - }))); + } catch (BlobStorageException ex) { + throw new IngestionClientException(String.format("Exception trying to read blob metadata,%s", + ex.getStatusCode() == 403 ? "this might mean the blob doesn't exist" : ""), ex); + } + ClientRequestProperties clientRequestProperties = null; + if (StringUtils.isNotBlank(clientRequestId)) { + clientRequestProperties = new ClientRequestProperties(); + clientRequestProperties.setClientRequestId(clientRequestId); + } + IngestionProperties.DataFormat dataFormat = ingestionProperties.getDataFormat(); + try { + this.streamingClient.executeStreamingIngestFromBlob(ingestionProperties.getDatabaseName(), + ingestionProperties.getTableName(), + blobPath, + clientRequestProperties, + dataFormat.getKustoValue(), + ingestionProperties.getIngestionMapping().getIngestionMappingReference()); + } catch (DataClientException e) { + log.error(e.getMessage(), e); + throw new IngestionClientException(e.getMessage(), e); + } catch (DataServiceException e) { + log.error(e.getMessage(), e); + throw new IngestionServiceException(e.getMessage(), e); + } + + log.debug("Blob was ingested successfully."); + IngestionStatus ingestionStatus = new IngestionStatus(); + ingestionStatus.status = OperationStatus.Succeeded; + ingestionStatus.table = ingestionProperties.getTableName(); + ingestionStatus.database = ingestionProperties.getDatabaseName(); + return new IngestionStatusResult(ingestionStatus); } protected void setConnectionDataSource(String connectionDataSource) { diff --git a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/ManagedStreamingTest.java b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/ManagedStreamingTest.java index 2995b306c..13a1e50f6 100644 --- a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/ManagedStreamingTest.java +++ b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/ManagedStreamingTest.java @@ -13,27 +13,19 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; -import reactor.core.publisher.Mono; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.isNull; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; public class ManagedStreamingTest { private static final ResourceManager resourceManagerMock = mock(ResourceManager.class); @@ -177,9 +169,9 @@ void ManagedStreaming_BigFile_ShouldQueueTheFullStream() throws IOException, Ing int size = inputStream.bb.available(); StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream); ArgumentCaptor streamSourceInfoCaptor = ArgumentCaptor.forClass(StreamSourceInfo.class); - when(queuedIngestClientMock.ingestFromStreamAsync(any(), any())).thenReturn(Mono.empty()); + managedStreamingIngestClientSpy.ingestFromStream(streamSourceInfo, ingestionProperties); - verify(queuedIngestClientMock, times(1)).ingestFromStreamAsync(streamSourceInfoCaptor.capture(), any()); + verify(queuedIngestClientMock, times(1)).ingestFromStream(streamSourceInfoCaptor.capture(), any()); StreamSourceInfo value = streamSourceInfoCaptor.getValue(); int queuedStreamSize = getStreamSize(value.getStream()); diff --git a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/QueuedIngestClientTest.java b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/QueuedIngestClientTest.java index 32e0cbe49..1cf67a205 100644 --- a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/QueuedIngestClientTest.java +++ b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/QueuedIngestClientTest.java @@ -12,11 +12,7 @@ import com.microsoft.azure.kusto.ingest.result.IngestionStatus; import com.microsoft.azure.kusto.ingest.result.OperationStatus; import com.microsoft.azure.kusto.ingest.result.ValidationPolicy; -import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo; -import com.microsoft.azure.kusto.ingest.source.CompressionType; -import com.microsoft.azure.kusto.ingest.source.FileSourceInfo; -import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo; -import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo; +import com.microsoft.azure.kusto.ingest.source.*; import com.microsoft.azure.kusto.ingest.utils.IngestionUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -26,40 +22,19 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; -import reactor.core.publisher.Mono; -import reactor.test.StepVerifier; import java.io.IOException; import java.io.InputStream; import java.net.URISyntaxException; import java.nio.file.Files; import java.nio.file.Paths; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; +import java.sql.*; import java.util.Collections; import java.util.function.BiFunction; import java.util.stream.Stream; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyBoolean; -import static org.mockito.Mockito.anyString; -import static org.mockito.Mockito.atLeast; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; class QueuedIngestClientTest { private static final ResourceManager resourceManagerMock = mock(ResourceManager.class); @@ -215,12 +190,11 @@ void ingestFromFile_NullFileSourceInfo_IllegalArgumentException() { } @Test - void ingestFromFileAsync_FileDoesNotExist_IngestionClientException() { + void ingestFromFile_FileDoesNotExist_IngestionClientException() { FileSourceInfo fileSourceInfo = new FileSourceInfo("file.path", 100); - Mono result = queuedIngestClient.ingestFromFileAsync(fileSourceInfo, ingestionProperties); - StepVerifier.create(result) - .expectError(IngestionClientException.class) - .verify(); + assertThrows( + IngestionClientException.class, + () -> queuedIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties)); } @Test @@ -263,40 +237,40 @@ void ingestFromResultSet_NullResultSetSourceInfo_IllegalArgumentException() { } @Test - void ingestFromResultSetAsync_StreamIngest_IngestionClientException() throws Exception { + void ingestFromResultSet_StreamIngest_IngestionClientException() throws Exception { try (IngestClient ingestClient = new QueuedIngestClientImpl(resourceManagerMock, azureStorageClientMock)) { // we need a spy to intercept the call to ingestFromStream so it wouldn't be called IngestClient ingestClientSpy = spy(ingestClient); IngestionClientException ingestionClientException = new IngestionClientException( "Client exception in ingestFromFile"); - doReturn(Mono.error(ingestionClientException)).when(ingestClientSpy).ingestFromStreamAsync(any(), any()); + doThrow(ingestionClientException).when(ingestClientSpy).ingestFromStream(any(), any()); ResultSet resultSet = getSampleResultSet(); ResultSetSourceInfo resultSetSourceInfo = new ResultSetSourceInfo(resultSet); - StepVerifier.create(ingestClientSpy.ingestFromResultSetAsync(resultSetSourceInfo, ingestionProperties)) - .expectError(IngestionClientException.class) - .verify(); + assertThrows( + IngestionClientException.class, + () -> ingestClientSpy.ingestFromResultSet(resultSetSourceInfo, ingestionProperties)); } } @Test - void ingestFromResultSetAsync_StreamIngest_IngestionServiceException() throws Exception { + void ingestFromResultSet_StreamIngest_IngestionServiceException() throws Exception { try (IngestClient ingestClient = new QueuedIngestClientImpl(resourceManagerMock, azureStorageClientMock)) { // we need a spy to intercept the call to ingestFromStream so it wouldn't be called IngestClient ingestClientSpy = spy(ingestClient); IngestionServiceException ingestionServiceException = new IngestionServiceException( "Service exception in ingestFromFile"); - doReturn(Mono.error(ingestionServiceException)).when(ingestClientSpy).ingestFromStreamAsync(any(), any()); + doThrow(ingestionServiceException).when(ingestClientSpy).ingestFromStream(any(), any()); ResultSet resultSet = getSampleResultSet(); ResultSetSourceInfo resultSetSourceInfo = new ResultSetSourceInfo(resultSet); - StepVerifier.create(ingestClientSpy.ingestFromResultSetAsync(resultSetSourceInfo, ingestionProperties)) - .expectError(IngestionServiceException.class) - .verify(); + assertThrows( + IngestionServiceException.class, + () -> ingestClientSpy.ingestFromResultSet(resultSetSourceInfo, ingestionProperties)); } } @@ -306,7 +280,7 @@ void ingestFromResultSet_StreamIngest_VerifyStreamContent() throws Exception { // we need a spy to intercept the call to ingestFromStream so it wouldn't be called IngestClient ingestClientSpy = spy(ingestClient); - doReturn(Mono.empty()).when(ingestClientSpy).ingestFromStreamAsync(any(), any()); + doReturn(null).when(ingestClientSpy).ingestFromStream(any(), any()); ResultSet resultSet = getSampleResultSet(); ResultSetSourceInfo resultSetSourceInfo = new ResultSetSourceInfo(resultSet); @@ -315,7 +289,7 @@ void ingestFromResultSet_StreamIngest_VerifyStreamContent() throws Exception { ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(StreamSourceInfo.class); - verify(ingestClientSpy, atLeastOnce()).ingestFromStreamAsync(argumentCaptor.capture(), any()); + verify(ingestClientSpy, atLeastOnce()).ingestFromStream(argumentCaptor.capture(), any()); InputStream ingestFromStreamReceivedStream = argumentCaptor.getValue().getStream(); int len = ingestFromStreamReceivedStream.available(); diff --git a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/ResourceManagerTimerTest.java b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/ResourceManagerTimerTest.java index 0721465d4..13ee2381a 100644 --- a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/ResourceManagerTimerTest.java +++ b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/ResourceManagerTimerTest.java @@ -3,6 +3,8 @@ import com.microsoft.azure.kusto.data.BaseClient; import com.microsoft.azure.kusto.data.Client; import com.microsoft.azure.kusto.data.KustoOperationResult; +import com.microsoft.azure.kusto.data.exceptions.DataClientException; +import com.microsoft.azure.kusto.data.exceptions.DataServiceException; import com.microsoft.azure.kusto.data.exceptions.KustoServiceQueryError; import org.junit.jupiter.api.Test; import org.mockito.stubbing.Answer; @@ -15,16 +17,14 @@ import static com.microsoft.azure.kusto.ingest.ResourceManagerTest.generateIngestionAuthTokenResult; import static com.microsoft.azure.kusto.ingest.ResourceManagerTest.generateIngestionResourcesResult; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; class ResourceManagerTimerTest { @Test - void timerTest() throws InterruptedException, KustoServiceQueryError, IOException { + void timerTest() throws DataClientException, DataServiceException, InterruptedException, KustoServiceQueryError, IOException { Client mockedClient = mock(Client.class); final List refreshTimestamps = new ArrayList<>(); AtomicBoolean gotHere = new AtomicBoolean(false); @@ -61,17 +61,17 @@ void timerTest() throws InterruptedException, KustoServiceQueryError, IOExceptio } @Test - void timerTestFailureGettingResources() throws InterruptedException { + void timerTestFailureGettingResources() throws DataClientException, DataServiceException, InterruptedException { Client mockedClient = mock(Client.class); final List refreshTimestamps = new ArrayList<>(); AtomicBoolean gotHere = new AtomicBoolean(false); when(mockedClient.executeMgmt(Commands.IDENTITY_GET_COMMAND)) - .thenThrow(new RuntimeException(BaseClient.createExceptionFromResponse("https://sample.kusto.windows.net", null, new Exception(), "error"))); + .thenThrow(BaseClient.createExceptionFromResponse("https://sample.kusto.windows.net", null, new Exception(), "error")); when(mockedClient.executeMgmt(Commands.INGESTION_RESOURCES_SHOW_COMMAND)) - .thenAnswer(invocation -> { - refreshTimestamps.add(new Date()); + .then((Answer) invocationOnMock -> { + refreshTimestamps.add((new Date())); gotHere.set(true); - throw new RuntimeException(BaseClient.createExceptionFromResponse("https://sample.kusto.windows.net", null, new Exception(), "error")); + throw BaseClient.createExceptionFromResponse("https://sample.kusto.windows.net", null, new Exception(), "error"); }); ResourceManager resourceManager = new ResourceManager(mockedClient, 1000L, 500L, null); diff --git a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/StreamingIngestClientTest.java b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/StreamingIngestClientTest.java index 731f8a5ad..6af048369 100644 --- a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/StreamingIngestClientTest.java +++ b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/StreamingIngestClientTest.java @@ -15,11 +15,7 @@ import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException; import com.microsoft.azure.kusto.ingest.result.IngestionResult; import com.microsoft.azure.kusto.ingest.result.OperationStatus; -import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo; -import com.microsoft.azure.kusto.ingest.source.CompressionType; -import com.microsoft.azure.kusto.ingest.source.FileSourceInfo; -import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo; -import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo; +import com.microsoft.azure.kusto.ingest.source.*; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -30,8 +26,6 @@ import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; -import reactor.core.publisher.Mono; -import reactor.test.StepVerifier; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -47,15 +41,8 @@ import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.isNull; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; class StreamingIngestClientTest { private static StreamingIngestClient streamingIngestClient; @@ -79,11 +66,11 @@ static void setUp() { @BeforeEach void setUpEach() throws Exception { ingestionProperties = new IngestionProperties("dbName", "tableName"); - when(streamingClientMock.executeStreamingIngestAsync(any(String.class), any(String.class), any(InputStream.class), - isNull(), any(String.class), any(String.class), any(boolean.class))).thenReturn(Mono.empty()); + when(streamingClientMock.executeStreamingIngest(any(String.class), any(String.class), any(InputStream.class), + isNull(), any(String.class), any(String.class), any(boolean.class))).thenReturn(null); - when(streamingClientMock.executeStreamingIngestAsync(any(String.class), any(String.class), any(InputStream.class), - isNull(), any(String.class), isNull(), any(boolean.class))).thenReturn(Mono.empty()); + when(streamingClientMock.executeStreamingIngest(any(String.class), any(String.class), any(InputStream.class), + isNull(), any(String.class), isNull(), any(boolean.class))).thenReturn(null); } @Test @@ -93,7 +80,7 @@ void IngestFromStream_CsvStream() throws Exception { StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream); OperationStatus status = streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status; assertEquals(OperationStatus.Succeeded, status); - verify(streamingClientMock, atLeastOnce()).executeStreamingIngestAsync(any(String.class), any(String.class), argumentCaptor.capture(), + verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(), isNull(), any(String.class), isNull(), any(boolean.class)); /* @@ -111,13 +98,11 @@ void ingestFromStream_CsvStream_WithClientRequestId() throws Exception { InputStream inputStream = new ByteArrayInputStream(StandardCharsets.UTF_8.encode(data).array()); StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream); String clientRequestId = "clientRequestId"; - IngestionResult ingestionResult = streamingIngestClient.ingestFromStreamAsync(streamSourceInfo, ingestionProperties, clientRequestId).block(); - assertNotNull(ingestionResult); - OperationStatus status = ingestionResult.getIngestionStatusCollection() + OperationStatus status = streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties, clientRequestId).getIngestionStatusCollection() .get(0).status; assertEquals(OperationStatus.Succeeded, status); ArgumentCaptor clientRequestPropertiesArgumentCaptor = ArgumentCaptor.forClass(ClientRequestProperties.class); - verify(streamingClientMock, atLeastOnce()).executeStreamingIngestAsync(any(String.class), any(String.class), argumentCaptor.capture(), + verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(), clientRequestPropertiesArgumentCaptor.capture(), any(String.class), isNull(), any(boolean.class)); /* @@ -146,7 +131,7 @@ void ingestFromStream_CompressedCsvStream() throws Exception { streamSourceInfo.setCompressionType(CompressionType.gz); OperationStatus status = streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status; assertEquals(OperationStatus.Succeeded, status); - verify(streamingClientMock, atLeastOnce()).executeStreamingIngestAsync(any(String.class), any(String.class), argumentCaptor.capture(), + verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(), isNull(), any(String.class), isNull(), any(boolean.class)); InputStream stream = argumentCaptor.getValue(); @@ -162,7 +147,7 @@ void ingestFromStream_JsonStream() throws Exception { ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.JSON); OperationStatus status = streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status; assertEquals(OperationStatus.Succeeded, status); - verify(streamingClientMock, atLeastOnce()).executeStreamingIngestAsync(any(String.class), any(String.class), argumentCaptor.capture(), + verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(), isNull(), any(String.class), any(String.class), any(boolean.class)); InputStream stream = argumentCaptor.getValue(); @@ -185,7 +170,7 @@ void ingestFromStream_CompressedJsonStream() throws Exception { ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.JSON); OperationStatus status = streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status; assertEquals(OperationStatus.Succeeded, status); - verify(streamingClientMock, atLeastOnce()).executeStreamingIngestAsync(any(String.class), any(String.class), argumentCaptor.capture(), + verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(), isNull(), any(String.class), any(String.class), any(boolean.class)); InputStream stream = argumentCaptor.getValue(); @@ -235,16 +220,16 @@ void ingestFromStream_JsonNoMappingReference_IngestionSucceeds() } @Test - void ingestFromStreamAsync_JsonWrongMappingKind_IngestionClientException() { + void ingestFromStream_JsonWrongMappingKind_IngestionClientException() { String data = "{\"Name\": \"name\", \"Age\": \"age\", \"Weight\": \"weight\", \"Height\": \"height\"}"; InputStream inputStream = new ByteArrayInputStream(StandardCharsets.UTF_8.encode(data).array()); StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream); ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON); ingestionProperties.setIngestionMapping("CsvMapping", IngestionMapping.IngestionMappingKind.CSV); - StepVerifier.create(streamingIngestClient.ingestFromStreamAsync(streamSourceInfo, ingestionProperties)) - .expectErrorMatches(e -> e instanceof IngestionClientException - && e.getMessage().contains("Wrong ingestion mapping for format 'json'; mapping kind should be 'Json', but was 'Csv'.")) - .verify(); + IngestionClientException ingestionClientException = assertThrows(IngestionClientException.class, + () -> streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties), + "Expected IngestionClientException to be thrown, but it didn't"); + assertTrue(ingestionClientException.getMessage().contains("Wrong ingestion mapping for format 'json'; mapping kind should be 'Json', but was 'Csv'.")); } @Test @@ -260,53 +245,51 @@ void ingestFromStream_AvroNoMappingReference_IngestionSucceeds() } @Test - void ingestFromStreamAsync_AvroWrongMappingKind_IngestionClientException() { + void ingestFromStream_AvroWrongMappingKind_IngestionClientException() { InputStream inputStream = new ByteArrayInputStream(new byte[10]); StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream); ingestionProperties.setDataFormat(IngestionProperties.DataFormat.AVRO); ingestionProperties.setIngestionMapping("CsvMapping", IngestionMapping.IngestionMappingKind.CSV); - StepVerifier.create(streamingIngestClient.ingestFromStreamAsync(streamSourceInfo, ingestionProperties)) - .expectErrorMatches(e -> e instanceof IngestionClientException - && e.getMessage().contains("Wrong ingestion mapping for format 'avro'; mapping kind should be 'Avro', but was 'Csv'.")) - .verify(); + IngestionClientException ingestionClientException = assertThrows(IngestionClientException.class, + () -> streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties), + "Expected IngestionClientException to be thrown, but it didn't"); + assertTrue(ingestionClientException.getMessage().contains("Wrong ingestion mapping for format 'avro'; mapping kind should be 'Avro', but was 'Csv'.")); } @Test - void ingestFromStreamAsync_EmptyStream_IngestionClientException() { + void ingestFromStream_EmptyStream_IngestionClientException() { InputStream inputStream = new ByteArrayInputStream(new byte[0]); StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream); - StepVerifier.create(streamingIngestClient.ingestFromStreamAsync(streamSourceInfo, ingestionProperties)) - .expectErrorMatches(e -> e instanceof IngestionClientException && e.getMessage().contains("Empty stream.")) - .verify(); + IngestionClientException ingestionClientException = assertThrows(IngestionClientException.class, + () -> streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties), + "Expected IngestionClientException to be thrown, but it didn't"); + assertTrue(ingestionClientException.getMessage().contains("Empty stream.")); } @Test - void ingestFromStreamAsync_CaughtDataClientException_IngestionClientException() throws Exception { - when(streamingClientMock.executeStreamingIngestAsync(any(String.class), any(String.class), any(InputStream.class), - isNull(), any(String.class), isNull(), any(boolean.class))).thenReturn(Mono.error(new DataClientException("DataClientException"))); + void ingestFromStream_CaughtDataClientException_IngestionClientException() throws Exception { + when(streamingClientMock.executeStreamingIngest(any(String.class), any(String.class), any(InputStream.class), + isNull(), any(String.class), isNull(), any(boolean.class))).thenThrow(DataClientException.class); String data = "Name, Age, Weight, Height"; InputStream inputStream = new ByteArrayInputStream(StandardCharsets.UTF_8.encode(data).array()); StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream); - StepVerifier.create(streamingIngestClient.ingestFromStreamAsync(streamSourceInfo, ingestionProperties)) - .expectErrorMatches(e -> e instanceof IngestionClientException - && "DataClientException".equals(e.getMessage())) - .verify(); + assertThrows(IngestionClientException.class, + () -> streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties), + "Expected IllegalArgumentException to be thrown, but it didn't"); } @Test void ingestFromStream_CaughtDataServiceException_IngestionServiceException() throws Exception { - when(streamingClientMock.executeStreamingIngestAsync(any(String.class), any(String.class), any(InputStream.class), - isNull(), any(String.class), isNull(), any(boolean.class))) - .thenReturn(Mono.error(new DataServiceException("ingestFromStream", "DataServiceException", true))); + when(streamingClientMock.executeStreamingIngest(any(String.class), any(String.class), any(InputStream.class), + isNull(), any(String.class), isNull(), any(boolean.class))).thenThrow(DataServiceException.class); String data = "Name, Age, Weight, Height"; InputStream inputStream = new ByteArrayInputStream(StandardCharsets.UTF_8.encode(data).array()); StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream); - StepVerifier.create(streamingIngestClient.ingestFromStreamAsync(streamSourceInfo, ingestionProperties)) - .expectErrorMatches(e -> e instanceof IngestionServiceException - && "DataServiceException".equals(e.getMessage())) - .verify(); + assertThrows(IngestionServiceException.class, + () -> streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties), + "Expected IllegalArgumentException to be thrown, but it didn't"); } @Test @@ -315,7 +298,7 @@ void ingestFromFile_Csv() throws Exception { FileSourceInfo fileSourceInfo = new FileSourceInfo(path, new File(path).length()); OperationStatus status = streamingIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status; assertEquals(OperationStatus.Succeeded, status); - verify(streamingClientMock, atLeastOnce()).executeStreamingIngestAsync(any(String.class), any(String.class), any(InputStream.class), + verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), any(InputStream.class), isNull(), any(String.class), isNull(), any(boolean.class)); } @@ -329,7 +312,7 @@ void ingestFromFile_Json() throws Exception { ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.JSON); OperationStatus status = streamingIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status; assertEquals(OperationStatus.Succeeded, status); - verify(streamingClientMock, atLeastOnce()).executeStreamingIngestAsync(any(String.class), any(String.class), argumentCaptor.capture(), + verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(), isNull(), any(String.class), any(String.class), any(boolean.class)); verifyCompressedStreamContent(argumentCaptor.getValue(), contents); @@ -343,7 +326,7 @@ void ingestFromFile_CompressedJson() throws Exception { ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.JSON); OperationStatus status = streamingIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status; assertEquals(OperationStatus.Succeeded, status); - verify(streamingClientMock, atLeastOnce()).executeStreamingIngestAsync(any(String.class), any(String.class), argumentCaptor.capture(), + verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(), isNull(), any(String.class), any(String.class), any(boolean.class)); verifyCompressedStreamContent(argumentCaptor.getValue(), jsonDataUncompressed); @@ -405,15 +388,15 @@ void ingestFromFile_JsonNoMappingReference_IngestionSuccess() } @Test - void ingestFromFileAsync_JsonWrongMappingKind_IngestionClientException() { + void ingestFromFile_JsonWrongMappingKind_IngestionClientException() { String path = resourcesDirectory + "testdata.json"; FileSourceInfo fileSourceInfo = new FileSourceInfo(path, new File(path).length()); ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON); ingestionProperties.setIngestionMapping("CsvMapping", IngestionMapping.IngestionMappingKind.CSV); - StepVerifier.create(streamingIngestClient.ingestFromFileAsync(fileSourceInfo, ingestionProperties)) - .expectErrorMatches(e -> e instanceof IngestionClientException - && e.getMessage().contains("Wrong ingestion mapping for format 'json'; mapping kind should be 'Json', but was 'Csv'.")) - .verify(); + IngestionClientException ingestionClientException = assertThrows(IngestionClientException.class, + () -> streamingIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties), + "Expected IngestionClientException to be thrown, but it didn't"); + assertTrue(ingestionClientException.getMessage().contains("Wrong ingestion mapping for format 'json'; mapping kind should be 'Json', but was 'Csv'.")); } @Test @@ -426,12 +409,13 @@ void ingestFromFile_JsonNoMappingKind_IngestionSuccess() throws IngestionClientE } @Test - void ingestFromFileAsync_EmptyFile_IngestionClientException() { + void ingestFromFile_EmptyFile_IngestionClientException() { String path = resourcesDirectory + "empty.csv"; FileSourceInfo fileSourceInfo = new FileSourceInfo(path, new File(path).length()); - StepVerifier.create(streamingIngestClient.ingestFromFileAsync(fileSourceInfo, ingestionProperties)) - .expectErrorMatches(e -> e instanceof IngestionClientException && e.getMessage().contains("Empty file.")) - .verify(); + IngestionClientException ingestionClientException = assertThrows(IngestionClientException.class, + () -> streamingIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties), + "Expected IngestionClientException to be thrown, but it didn't"); + assertTrue(ingestionClientException.getMessage().contains("Empty file.")); } @Test @@ -448,75 +432,76 @@ void ingestFromBlob() throws Exception { when(cloudBlockBlob.getProperties()).thenReturn(blobProperties); when(cloudBlockBlob.openInputStream()).thenReturn(blobInputStream); - OperationStatus status = streamingIngestClient.ingestFromBlobAsync(blobSourceInfo, ingestionProperties, cloudBlockBlob, null).block() - .getIngestionStatusCollection() + OperationStatus status = streamingIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties, cloudBlockBlob, null).getIngestionStatusCollection() .get(0).status; assertEquals(OperationStatus.Succeeded, status); - verify(streamingClientMock, atLeastOnce()).executeStreamingIngestFromBlobAsync(any(String.class), any(String.class), any(String.class), - isNull(), any(String.class), any()); + verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), any(InputStream.class), + isNull(), any(String.class), isNull(), any(boolean.class)); } @Test - void ingestFromBlobAsync_NullBlobSourceInfo_IllegalArgumentException() { - StepVerifier.create(streamingIngestClient.ingestFromBlobAsync(null, ingestionProperties)) - .expectError(IllegalArgumentException.class) - .verify(); + void ingestFromBlob_NullBlobSourceInfo_IllegalArgumentException() { + assertThrows(IllegalArgumentException.class, + () -> streamingIngestClient.ingestFromBlob(null, ingestionProperties), + "Expected IllegalArgumentException to be thrown, but it didn't"); } @Test - void ingestFromBlobAsync_BlobSourceInfoWithNullBlobPath_IllegalArgumentException() { + void ingestFromBlob_BlobSourceInfoWithNullBlobPath_IllegalArgumentException() { BlobSourceInfo blobSourceInfo1 = new BlobSourceInfo(null); - StepVerifier.create(streamingIngestClient.ingestFromBlobAsync(blobSourceInfo1, ingestionProperties)) - .expectError(IllegalArgumentException.class) - .verify(); + assertThrows(IllegalArgumentException.class, + () -> streamingIngestClient.ingestFromBlob(blobSourceInfo1, ingestionProperties), + "Expected IllegalArgumentException to be thrown, but it didn't"); } @Test - void ingestFromBlobAsync_BlobSourceInfoWithBlankBlobPath_IllegalArgumentException() { + void ingestFromBlob_BlobSourceInfoWithBlankBlobPath_IllegalArgumentException() { BlobSourceInfo blobSourceInfo2 = new BlobSourceInfo(""); - StepVerifier.create(streamingIngestClient.ingestFromBlobAsync(blobSourceInfo2, ingestionProperties)) - .expectError(IllegalArgumentException.class) - .verify(); + assertThrows(IllegalArgumentException.class, + () -> streamingIngestClient.ingestFromBlob(blobSourceInfo2, ingestionProperties), + "Expected IllegalArgumentException to be thrown, but it didn't"); } @Test - void ingestFromBlobAsync_NullIngestionProperties_IllegalArgumentException() { + void ingestFromBlob_NullIngestionProperties_IllegalArgumentException() { String path = "blobPath"; BlobSourceInfo blobSourceInfo = new BlobSourceInfo(path); - StepVerifier.create(streamingIngestClient.ingestFromBlobAsync(blobSourceInfo, null)) - .expectError(IllegalArgumentException.class) - .verify(); + assertThrows(IllegalArgumentException.class, + () -> streamingIngestClient.ingestFromBlob(blobSourceInfo, null), + "Expected IllegalArgumentException to be thrown, but it didn't"); } @ParameterizedTest @CsvSource(value = {"null,table", "'',table", "database,null", "database,''"}, nullValues = {"null"}) - void ingestFromBlobAsync_IngestionPropertiesWithIllegalDatabaseOrTableNames_IllegalArgumentException(String databaseName, String tableName) { + void ingestFromBlob_IngestionPropertiesWithIllegalDatabaseOrTableNames_IllegalArgumentException(String databaseName, String tableName) { String path = "blobPath"; BlobSourceInfo blobSourceInfo = new BlobSourceInfo(path); ingestionProperties = new IngestionProperties(databaseName, tableName); - StepVerifier.create(streamingIngestClient.ingestFromBlobAsync(blobSourceInfo, ingestionProperties)) - .expectError(IllegalArgumentException.class) - .verify(); + assertThrows(IllegalArgumentException.class, + () -> streamingIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties), + "Expected IllegalArgumentException to be thrown, but it didn't"); } @Test - void ingestFromBlobAsync_InvalidBlobPath_IngestionClientException() { + void ingestFromBlob_InvalidBlobPath_IngestionClientException() { String path = "wrongURI"; BlobSourceInfo blobSourceInfo1 = new BlobSourceInfo(path); - StepVerifier.create(streamingIngestClient.ingestFromBlobAsync(blobSourceInfo1, ingestionProperties)) - .expectErrorMatches(e -> e instanceof IngestionClientException - && e.getMessage().contains("Unexpected error when ingesting a blob - Invalid blob path.")) - .verify(); + IngestionClientException ingestionClientException = assertThrows(IngestionClientException.class, + () -> streamingIngestClient.ingestFromBlob(blobSourceInfo1, ingestionProperties), + "Expected IngestionClientException to be thrown, but it didn't"); + + assertTrue(ingestionClientException.getMessage().contains("Unexpected error when ingesting a blob - Invalid blob path.")); } @Test - void ingestFromBlobAsync_BlobNotFound_IngestionClientException() { + void ingestFromBlob_BlobNotFound_IngestionClientException() { String path = "https://kustotest.blob.core.windows.net/container/blob.csv"; BlobSourceInfo blobSourceInfo2 = new BlobSourceInfo(path); - StepVerifier.create(streamingIngestClient.ingestFromBlobAsync(blobSourceInfo2, ingestionProperties)) - .expectErrorMatches(e -> e instanceof IngestionClientException - && e.getMessage().contains("Exception trying to read blob metadata")) - .verify(); + + IngestionClientException ingestionClientException = assertThrows(IngestionClientException.class, + () -> streamingIngestClient.ingestFromBlob(blobSourceInfo2, ingestionProperties), + "Expected IngestionClientException to be thrown, but it didn't"); + assertTrue(ingestionClientException.getMessage().contains("Exception trying to read blob metadata")); } @Test @@ -529,10 +514,11 @@ void ingestFromBlob_EmptyBlob_IngestClientException() { when(blobProperties.getBlobSize()).thenReturn((long) 0); when(cloudBlockBlob.getProperties()).thenReturn(blobProperties); - StepVerifier.create(streamingIngestClient.ingestFromBlobAsync(blobSourceInfo, ingestionProperties, cloudBlockBlob, null)) - .expectErrorMatches(e -> e instanceof IngestionClientException - && e.getMessage().contains("Empty blob.")) - .verify(); + + IngestionClientException ingestionClientException = assertThrows(IngestionClientException.class, + () -> streamingIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties, cloudBlockBlob, null), + "Expected IngestionClientException to be thrown, but it didn't"); + assertTrue(ingestionClientException.getMessage().contains("Empty blob.")); } @Test @@ -554,7 +540,7 @@ void ingestFromResultSet() throws Exception { OperationStatus status = streamingIngestClient.ingestFromResultSet(resultSetSourceInfo, ingestionProperties).getIngestionStatusCollection() .get(0).status; assertEquals(OperationStatus.Succeeded, status); - verify(streamingClientMock, atLeastOnce()).executeStreamingIngestAsync(any(String.class), any(String.class), argumentCaptor.capture(), + verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(), isNull(), any(String.class), isNull(), any(boolean.class)); InputStream stream = argumentCaptor.getValue(); @@ -562,30 +548,30 @@ void ingestFromResultSet() throws Exception { } @Test - void ingestFromResultSetAsync_NullResultSetSourceInfo_IllegalArgumentException() { - StepVerifier.create(streamingIngestClient.ingestFromResultSetAsync(null, ingestionProperties)) - .expectError(IllegalArgumentException.class) - .verify(); + void ingestFromResultSet_NullResultSetSourceInfo_IllegalArgumentException() { + assertThrows(IllegalArgumentException.class, + () -> streamingIngestClient.ingestFromResultSet(null, ingestionProperties), + "Expected IllegalArgumentException to be thrown, but it didn't"); } @Test - void ingestFromResultSetAsync_NullIngestionProperties_IllegalArgumentException() { + void ingestFromResultSet_NullIngestionProperties_IllegalArgumentException() { ResultSet resultSet = mock(ResultSet.class); ResultSetSourceInfo resultSetSourceInfo = new ResultSetSourceInfo(resultSet); - StepVerifier.create(streamingIngestClient.ingestFromResultSetAsync(resultSetSourceInfo, null)) - .expectError(IllegalArgumentException.class) - .verify(); + assertThrows(IllegalArgumentException.class, + () -> streamingIngestClient.ingestFromResultSet(resultSetSourceInfo, null), + "Expected IllegalArgumentException to be thrown, but it didn't"); } @ParameterizedTest @CsvSource(value = {"null,table", "'',table", "database,null", "database,''"}, nullValues = {"null"}) - void ingestFromResultSetAsync_IngestionPropertiesWithIllegalDatabaseOrTableNames_IllegalArgumentException(String databaseName, String tableName) { + void ingestFromResultSet_IngestionPropertiesWithIllegalDatabaseOrTableNames_IllegalArgumentException(String databaseName, String tableName) { ResultSet resultSet = mock(ResultSet.class); ResultSetSourceInfo resultSetSourceInfo = new ResultSetSourceInfo(resultSet); ingestionProperties = new IngestionProperties(databaseName, tableName); - StepVerifier.create(streamingIngestClient.ingestFromResultSetAsync(resultSetSourceInfo, ingestionProperties)) - .expectError(IllegalArgumentException.class) - .verify(); + assertThrows(IllegalArgumentException.class, + () -> streamingIngestClient.ingestFromResultSet(resultSetSourceInfo, ingestionProperties), + "Expected IllegalArgumentException to be thrown, but it didn't"); } @Test @@ -597,10 +583,10 @@ void ingestFromResultSet_EmptyResultSet_IngestionClientException() throws Except when(resultSetMetaData.getColumnCount()).thenReturn(0); ResultSetSourceInfo resultSetSourceInfo = new ResultSetSourceInfo(resultSet); - StepVerifier.create(streamingIngestClient.ingestFromResultSetAsync(resultSetSourceInfo, ingestionProperties)) - .expectErrorMatches(e -> e instanceof IngestionClientException - && e.getMessage().contains("Empty ResultSet.")) - .verify(); + IngestionClientException ingestionClientException = assertThrows(IngestionClientException.class, + () -> streamingIngestClient.ingestFromResultSet(resultSetSourceInfo, ingestionProperties), + "Expected IngestionClientException to be thrown, but it didn't"); + assertTrue(ingestionClientException.getMessage().contains("Empty ResultSet.")); } private static Stream provideStringsForAutoCorrectEndpointTruePass() { diff --git a/quickstart/src/main/java/com/microsoft/azure/kusto/quickstart/Utils.java b/quickstart/src/main/java/com/microsoft/azure/kusto/quickstart/Utils.java index 10c0713ab..1df45d419 100644 --- a/quickstart/src/main/java/com/microsoft/azure/kusto/quickstart/Utils.java +++ b/quickstart/src/main/java/com/microsoft/azure/kusto/quickstart/Utils.java @@ -1,16 +1,16 @@ package com.microsoft.azure.kusto.quickstart; -import com.microsoft.azure.kusto.data.Client; -import com.microsoft.azure.kusto.data.ClientRequestProperties; -import com.microsoft.azure.kusto.data.KustoOperationResult; -import com.microsoft.azure.kusto.data.KustoResultColumn; -import com.microsoft.azure.kusto.data.KustoResultSetTable; +import com.microsoft.azure.kusto.data.*; import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; +import com.microsoft.azure.kusto.data.exceptions.DataClientException; +import com.microsoft.azure.kusto.data.exceptions.DataServiceException; import com.microsoft.azure.kusto.ingest.IngestClient; import com.microsoft.azure.kusto.ingest.IngestionProperties; +import com.microsoft.azure.kusto.ingest.utils.SecurityUtils; +import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException; +import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException; import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo; import com.microsoft.azure.kusto.ingest.source.FileSourceInfo; -import com.microsoft.azure.kusto.ingest.utils.SecurityUtils; import org.apache.commons.lang3.StringUtils; import org.jetbrains.annotations.NotNull; @@ -174,6 +174,10 @@ protected static void executeCommand(Client kustoClient, String databaseName, St System.out.println(); } + } catch (DataServiceException e) { + errorHandler(String.format("Server error while trying to execute query '%s' on database '%s'%n%n", command, databaseName), e); + } catch (DataClientException e) { + errorHandler(String.format("Client error while trying to execute query '%s' on database '%s'%n%n", command, databaseName), e); } catch (Exception e) { errorHandler(String.format("Unexpected error while trying to execute query '%s' on database '%s'%n%n", command, databaseName), e); } @@ -235,7 +239,15 @@ protected static void ingestFromFile(IngestClient ingestClient, String databaseN // Tip 2: To correlate between ingestion operations in your applications and Kusto, set the source ID and log it somewhere. FileSourceInfo fileSourceInfo = new FileSourceInfo(String.format("quickstart/%s", filePath), 0, UUID.randomUUID()); - ingestClient.ingestFromFile(fileSourceInfo, ingestionProperties); + try { + ingestClient.ingestFromFile(fileSourceInfo, ingestionProperties); + } catch (IngestionClientException e) { + System.out.printf("Client exception while trying to ingest '%s' into '%s.%s'%n%n", filePath, databaseName, tableName); + e.printStackTrace(); + } catch (IngestionServiceException e) { + System.out.printf("Service exception while trying to ingest '%s' into '%s.%s'%n%n", filePath, databaseName, tableName); + e.printStackTrace(); + } } /** @@ -258,7 +270,15 @@ protected static void ingestFromBlob(IngestClient ingestClient, String databaseN // Tip 2: To correlate between ingestion operations in your applications and Kusto, set the source ID and log it somewhere. BlobSourceInfo blobSourceInfo = new BlobSourceInfo(String.format("quickstart/%s", blobUrl), 0, UUID.randomUUID()); - ingestClient.ingestFromBlob(blobSourceInfo, ingestionProperties); + try { + ingestClient.ingestFromBlob(blobSourceInfo, ingestionProperties); + } catch (IngestionClientException e) { + System.out.printf("Client exception while trying to ingest '%s' into '%s.%s'%n%n", blobUrl, databaseName, tableName); + e.printStackTrace(); + } catch (IngestionServiceException e) { + System.out.printf("Service exception while trying to ingest '%s' into '%s.%s'%n%n", blobUrl, databaseName, tableName); + e.printStackTrace(); + } } /** diff --git a/samples/src/main/java/FileIngestionCompletableFuture.java b/samples/src/main/java/FileIngestionCompletableFuture.java index 33cca8e26..8e20ca491 100644 --- a/samples/src/main/java/FileIngestionCompletableFuture.java +++ b/samples/src/main/java/FileIngestionCompletableFuture.java @@ -6,10 +6,13 @@ import com.microsoft.azure.kusto.ingest.IngestClientFactory; import com.microsoft.azure.kusto.ingest.IngestionMapping; import com.microsoft.azure.kusto.ingest.IngestionProperties; +import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException; +import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException; import com.microsoft.azure.kusto.ingest.result.IngestionResult; import com.microsoft.azure.kusto.ingest.source.FileSourceInfo; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; /** * This class includes a sample of how to use the ingestFromFile() method within a CompletableFuture @@ -73,11 +76,17 @@ public static void main(String[] args) { private static CompletableFuture ingestFromFileAsync( IngestClient client, FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) { return CompletableFuture.supplyAsync( - () -> client.ingestFromFile(fileSourceInfo, ingestionProperties)); + () -> { + try { + return client.ingestFromFile(fileSourceInfo, ingestionProperties); + } catch (IngestionClientException | IngestionServiceException e) { + throw new CompletionException(e); + } + }); } /** - * In this example we're just printing a message to the standard output, but the user can decide what to do here. + * In this example we just printing a message to the standard output, but the user can decide what to do here. */ private static void doSomethingWithIngestionResult(IngestionResult ingestionResult) { if (ingestionResult != null) {