Skip to content

Commit

Permalink
Queued async ingest from blob
Browse files Browse the repository at this point in the history
  • Loading branch information
georgebanasios committed Dec 2, 2024
1 parent 156cc39 commit 9dd4624
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 184 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import com.microsoft.azure.kusto.data.ExponentialRetry;
import com.microsoft.azure.kusto.data.UriUtils;
import com.microsoft.azure.kusto.data.Utils;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import com.microsoft.azure.kusto.data.exceptions.ExceptionUtils;
import com.microsoft.azure.kusto.data.http.HttpClientFactory;
Expand Down Expand Up @@ -63,8 +62,7 @@ public class CloudInfo implements TraceableAttributes, Serializable {
private final String kustoServiceResourceId;
private final String firstPartyAuthorityUrl;
private static final int ATTEMPT_COUNT = 3;
private static final ExponentialRetry<DataClientException, DataServiceException> exponentialRetryTemplate = new ExponentialRetry<>(
ATTEMPT_COUNT);
private static final ExponentialRetry exponentialRetryTemplate = new ExponentialRetry(ATTEMPT_COUNT);

public CloudInfo(boolean loginMfaRequired, String loginEndpoint, String kustoClientAppId, String kustoClientRedirectUri, String kustoServiceResourceId,
String firstPartyAuthorityUrl) {
Expand Down Expand Up @@ -99,7 +97,7 @@ public static Mono<CloudInfo> retrieveCloudInfoForClusterAsync(String clusterUrl

return fetchCloudInfoAsync(clusterUrl, givenHttpClient)
.doOnNext(cloudInfo -> cache.put(clusterUrl, cloudInfo))
.retryWhen(new ExponentialRetry<>(exponentialRetryTemplate).retry())
.retryWhen(new ExponentialRetry(exponentialRetryTemplate).retry())
.onErrorMap(e -> ExceptionUtils.unwrapCloudInfoException(clusterUrl, e));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected abstract IngestionResult ingestFromFileImpl(FileSourceInfo fileSourceI

/**
 * Synchronously ingests data from the given file.
 *
 * @param fileSourceInfo      the file to ingest
 * @param ingestionProperties ingestion options (target database/table, format, mapping, ...)
 * @return the result of the ingestion operation
 * @throws IngestionClientException on client-side failures (validation, local I/O)
 * @throws IngestionServiceException on service-side failures
 */
public IngestionResult ingestFromFile(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties)
        throws IngestionClientException, IngestionServiceException {
    // Delegate directly to the synchronous implementation rather than blocking on the
    // async variant; the reactive path remains available via ingestFromFileAsync.
    return ingestFromFileImpl(fileSourceInfo, ingestionProperties);
}

protected abstract Mono<IngestionResult> ingestFromFileAsyncImpl(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class ManagedStreamingIngestClient extends IngestClientBase implements Qu
private final ExponentialRetry exponentialRetryTemplate;
private HttpClient httpClient = null;
private ManagedStreamingQueuingPolicy queuingPolicy = ManagedStreamingQueuingPolicy.Default;
private static final String fallbackLogString = "Data size is greater than max streaming size according to the policy. Falling back to queued.";
private static final String FALLBACK_LOG_STRING = "Data size is greater than max streaming size according to the policy. Falling back to queued.";

/**
* @param dmConnectionString dm connection string
Expand Down Expand Up @@ -295,7 +295,6 @@ protected IngestionResult ingestFromFileImpl(FileSourceInfo fileSourceInfo, Inge

@Override
protected Mono<IngestionResult> ingestFromFileAsyncImpl(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) {

return Mono.fromCallable(() -> {
Ensure.argIsNotNull(fileSourceInfo, "fileSourceInfo");
Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
Expand Down Expand Up @@ -344,7 +343,7 @@ protected IngestionResult ingestFromBlobImpl(BlobSourceInfo blobSourceInfo, Inge

if (queuingPolicy.shouldUseQueuedIngestion(blobSize, blobSourceInfo.getRawSizeInBytes(),
blobSourceInfo.getCompressionType() != null, ingestionProperties.getDataFormat())) {
log.info(fallbackLogString);
log.info(FALLBACK_LOG_STRING);
return queuedIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties);
}

Expand All @@ -355,9 +354,49 @@ protected IngestionResult ingestFromBlobImpl(BlobSourceInfo blobSourceInfo, Inge
return queuedIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties);
}

/**
 * Attempts streaming ingestion with exponential-backoff retries.
 *
 * @param sourceInfo          the source to ingest: a {@link BlobSourceInfo} when
 *                            {@code blobClient} is non-null, otherwise a {@link StreamSourceInfo}
 * @param ingestionProperties ingestion options forwarded to the streaming client
 * @param blobClient          client for an already-uploaded blob, or null to ingest the raw stream
 * @return the streaming ingestion result, or null if every attempt failed with a transient error
 * @throws IngestionClientException  if a consumed stream cannot be reset between attempts
 * @throws IngestionServiceException if the service reports a permanent (non-retryable) error
 */
private IngestionResult streamWithRetries(SourceInfo sourceInfo, IngestionProperties ingestionProperties, @Nullable BlobClient blobClient)
        throws IngestionClientException, IngestionServiceException {
    ExponentialRetry<IngestionClientException, IngestionServiceException> retry = new ExponentialRetry<>(
            exponentialRetryTemplate);
    return retry.execute(currentAttempt -> {
        try {
            // Tag each attempt with a distinct client request id so service-side logs
            // can correlate retries of the same source.
            if (blobClient != null) {
                String clientRequestId = String.format("KJC.executeManagedStreamingIngest.ingestFromBlob;%s;%d", sourceInfo.getSourceId(), currentAttempt);
                return streamingIngestClient.ingestFromBlob((BlobSourceInfo) sourceInfo, ingestionProperties, blobClient, clientRequestId);
            } else {
                String clientRequestId = String.format("KJC.executeManagedStreamingIngest.ingestFromStream;%s;%d", sourceInfo.getSourceId(),
                        currentAttempt);
                return streamingIngestClient.ingestFromStream((StreamSourceInfo) sourceInfo, ingestionProperties, clientRequestId);
            }
        } catch (Exception e) {
            // Permanent service errors must not be retried: rethrow immediately.
            // instanceof is null-safe, so the former explicit null checks on the
            // cause chain were redundant and have been dropped.
            if (e instanceof IngestionServiceException
                    && e.getCause() instanceof DataServiceException
                    && e.getCause().getCause() instanceof DataWebException) {
                DataWebException webException = (DataWebException) e.getCause().getCause();
                OneApiError oneApiError = webException.getApiError();
                if (oneApiError.isPermanent()) {
                    throw e;
                }
            }
            // Parameterized logging instead of eager String.format; the trailing
            // throwable still gets its stack trace logged by SLF4J.
            log.info("Streaming ingestion failed attempt {}", currentAttempt, e);

            // A partially consumed stream must be rewound before the next attempt re-reads it.
            if (sourceInfo instanceof StreamSourceInfo) {
                try {
                    ((StreamSourceInfo) sourceInfo).getStream().reset();
                } catch (IOException ioException) {
                    throw new IngestionClientException("Failed to reset stream", ioException);
                }
            }

        }
        // Returning null signals ExponentialRetry to back off and run the next attempt.
        return null;
    });
}

@Override
protected Mono<IngestionResult> ingestFromBlobAsyncImpl(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) {

return Mono.fromCallable(() -> {
Ensure.argIsNotNull(blobSourceInfo, "blobSourceInfo");
Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
Expand Down Expand Up @@ -393,15 +432,14 @@ private Mono<IngestionResult> handleIngestion(BlobSourceInfo blobSourceInfo,
IngestionProperties ingestionProperties,
BlobClient blobClient,
long blobSize) {

if (queuingPolicy.shouldUseQueuedIngestion(blobSize, blobSourceInfo.getRawSizeInBytes(),
blobSourceInfo.getCompressionType() != null, ingestionProperties.getDataFormat())) {
log.info(fallbackLogString);
log.info(FALLBACK_LOG_STRING);
return queuedIngestClient.ingestFromBlobAsync(blobSourceInfo, ingestionProperties);
}

return executeStream(blobSourceInfo, ingestionProperties, blobClient)
.retryWhen(new ExponentialRetry<>(exponentialRetryTemplate).retry())
.retryWhen(new ExponentialRetry(exponentialRetryTemplate).retry())
.onErrorResume(ignored -> queuedIngestClient.ingestFromBlobAsync(blobSourceInfo, ingestionProperties)); // Fall back to queued ingestion
}

Expand Down Expand Up @@ -442,47 +480,6 @@ private Mono<IngestionResult> handleStreamingError(SourceInfo sourceInfo, Throwa
return Mono.empty();
}

/**
 * Attempts streaming ingestion with exponential-backoff retries.
 *
 * @param sourceInfo          the source to ingest: a {@link BlobSourceInfo} when
 *                            {@code blobClient} is non-null, otherwise a {@link StreamSourceInfo}
 * @param ingestionProperties ingestion options forwarded to the streaming client
 * @param blobClient          client for an already-uploaded blob, or null to ingest the raw stream
 * @return the streaming ingestion result, or null if every attempt failed with a transient error
 * @throws IngestionClientException  if a consumed stream cannot be reset between attempts
 * @throws IngestionServiceException if the service reports a permanent (non-retryable) error
 */
private IngestionResult streamWithRetries(SourceInfo sourceInfo, IngestionProperties ingestionProperties, @Nullable BlobClient blobClient)
        throws IngestionClientException, IngestionServiceException {
    ExponentialRetry<IngestionClientException, IngestionServiceException> retry = new ExponentialRetry<>(
            exponentialRetryTemplate);
    return retry.execute(currentAttempt -> {
        try {
            // Each attempt carries a distinct client request id so service-side logs
            // can correlate retries of the same source.
            if (blobClient != null) {
                String clientRequestId = String.format("KJC.executeManagedStreamingIngest.ingestFromBlob;%s;%d", sourceInfo.getSourceId(), currentAttempt);
                return streamingIngestClient.ingestFromBlob((BlobSourceInfo) sourceInfo, ingestionProperties, blobClient, clientRequestId);
            } else {
                String clientRequestId = String.format("KJC.executeManagedStreamingIngest.ingestFromStream;%s;%d", sourceInfo.getSourceId(),
                        currentAttempt);
                return streamingIngestClient.ingestFromStream((StreamSourceInfo) sourceInfo, ingestionProperties, clientRequestId);
            }
        } catch (Exception e) {
            // Walk the cause chain: a permanent OneApi error must not be retried.
            if (e instanceof IngestionServiceException
                    && e.getCause() != null
                    && e.getCause() instanceof DataServiceException
                    && e.getCause().getCause() != null
                    && e.getCause().getCause() instanceof DataWebException) {
                DataWebException webException = (DataWebException) e.getCause().getCause();
                OneApiError oneApiError = webException.getApiError();
                if (oneApiError.isPermanent()) {
                    throw e;
                }
            }
            log.info(String.format("Streaming ingestion failed attempt %d", currentAttempt), e);

            // A partially consumed stream must be rewound before the next attempt re-reads it.
            if (sourceInfo instanceof StreamSourceInfo) {
                try {
                    ((StreamSourceInfo) sourceInfo).getStream().reset();
                } catch (IOException ioException) {
                    throw new IngestionClientException("Failed to reset stream", ioException);
                }
            }

        }
        // Returning null signals ExponentialRetry to back off and run the next attempt.
        return null;
    });
}

@Override
protected IngestionResult ingestFromResultSetImpl(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties)
throws IngestionClientException, IngestionServiceException {
Expand All @@ -503,7 +500,6 @@ protected IngestionResult ingestFromResultSetImpl(ResultSetSourceInfo resultSetS

@Override
protected Mono<IngestionResult> ingestFromResultSetAsyncImpl(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) {

return Mono.fromCallable(() -> {
Ensure.argIsNotNull(resultSetSourceInfo, "resultSetSourceInfo");
Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
Expand Down Expand Up @@ -539,7 +535,7 @@ protected IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo

if (queuingPolicy.shouldUseQueuedIngestion(streamSourceInfo.getStream().available(), streamSourceInfo.getRawSizeInBytes(),
streamSourceInfo.getCompressionType() != null, ingestionProperties.getDataFormat())) {
log.info(fallbackLogString);
log.info(FALLBACK_LOG_STRING);
return queuedIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties);
}

Expand All @@ -555,7 +551,7 @@ protected IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo
int size = streamingBytes.length;
if (queuingPolicy.shouldUseQueuedIngestion(size, streamSourceInfo.getRawSizeInBytes(),
streamSourceInfo.getCompressionType() != null, ingestionProperties.getDataFormat())) {
log.info(fallbackLogString);
log.info(FALLBACK_LOG_STRING);
StreamSourceInfo managedSourceInfo = new StreamSourceInfo(new SequenceInputStream(byteArrayStream, streamSourceInfo.getStream()),
streamSourceInfo.isLeaveOpen(), sourceId, streamSourceInfo.getCompressionType());

Expand Down Expand Up @@ -615,7 +611,7 @@ protected Mono<IngestionResult> ingestFromStreamAsyncImpl(StreamSourceInfo strea
ingestionProperties.getDataFormat());
}).subscribeOn(Schedulers.boundedElastic()) //TODO: same
.flatMap(useQueued -> {
if (useQueued) {
if (Boolean.TRUE.equals(useQueued)) {
return queuedIngestClient.ingestFromStreamAsync(streamSourceInfo, ingestionProperties);
}
return Mono.empty();
Expand All @@ -628,20 +624,14 @@ private Mono<IngestionResult> processStream(StreamSourceInfo streamSourceInfo, I
return Mono.fromCallable(() -> streamSourceInfo.getStream() instanceof ByteArrayInputStream
|| streamSourceInfo.getStream() instanceof ResettableFileInputStream)
.flatMap(isKnownStreamType -> {
if (isKnownStreamType) {
if (Boolean.TRUE.equals(isKnownStreamType)) {
StreamSourceInfo managedSourceInfo = new StreamSourceInfo(streamSourceInfo.getStream(),
true, streamSourceInfo.getSourceId(), streamSourceInfo.getCompressionType(),
streamSourceInfo.getRawSizeInBytes());
return executeStream(managedSourceInfo, ingestionProperties, null)
.retryWhen(new ExponentialRetry<>(exponentialRetryTemplate).retry()) //TODO: check after all retries have failed to fallback to queued
.retryWhen(new ExponentialRetry(exponentialRetryTemplate).retry()) //TODO: check after all retries have failed to fallback to queued
.onErrorResume(ignored -> queuedIngestClient.ingestFromStreamAsync(managedSourceInfo, ingestionProperties))
.doFinally(signal -> {
try {
managedSourceInfo.getStream().close();
} catch (IOException e) {
log.warn("Failed to close byte stream", e);
}
});
.doFinally(signal -> closeStreamSafely(managedSourceInfo));
} else {
return Mono.fromCallable(() -> IngestionUtils.readBytesFromInputStream(streamSourceInfo.getStream(),
ManagedStreamingQueuingPolicy.MAX_STREAMING_STREAM_SIZE_BYTES + 1))
Expand All @@ -656,14 +646,15 @@ private Mono<IngestionResult> processStream(StreamSourceInfo streamSourceInfo, I
streamSourceInfo.getCompressionType() != null,
ingestionProperties.getDataFormat());
if (shouldUseQueuedIngestion) {
log.info(fallbackLogString);
log.info(FALLBACK_LOG_STRING);
StreamSourceInfo managedSourceInfo = new StreamSourceInfo(new SequenceInputStream(byteArrayStream, streamSourceInfo.getStream()),
streamSourceInfo.isLeaveOpen(), streamSourceInfo.getSourceId(), streamSourceInfo.getCompressionType());

return queuedIngestClient.ingestFromStreamAsync(managedSourceInfo, ingestionProperties);
}

if (!streamSourceInfo.isLeaveOpen()) {

// From this point we don't need the original stream anymore, we cached it
try {
streamSourceInfo.getStream().close();
Expand All @@ -676,20 +667,22 @@ private Mono<IngestionResult> processStream(StreamSourceInfo streamSourceInfo, I
true, streamSourceInfo.getSourceId(), streamSourceInfo.getCompressionType(),
streamSourceInfo.getRawSizeInBytes());
return executeStream(managedSourceInfo, ingestionProperties, null)
.retryWhen(new ExponentialRetry<>(exponentialRetryTemplate).retry())
.retryWhen(new ExponentialRetry(exponentialRetryTemplate).retry())
.onErrorResume(ignored -> queuedIngestClient.ingestFromStreamAsync(managedSourceInfo, ingestionProperties))
.doFinally(signal -> {
try {
managedSourceInfo.getStream().close();
} catch (IOException e) {
log.warn("Failed to close byte stream", e);
}
});
.doFinally(signal -> closeStreamSafely(managedSourceInfo));
});
}
});
}

/**
 * Closes the stream wrapped by the given source info, logging (rather than
 * propagating) any I/O failure raised on close.
 *
 * @param streamSourceInfo source whose underlying stream should be released
 */
private void closeStreamSafely(StreamSourceInfo streamSourceInfo) {
    try {
        streamSourceInfo.getStream().close();
    } catch (IOException closeFailure) {
        log.warn("Failed to close byte stream", closeFailure);
    }
}

/*
* Set the policy that handles the logic over which data size would the client choose to directly use queued ingestion instead of trying streaming ingestion
* first.
Expand Down
Loading

0 comments on commit 9dd4624

Please sign in to comment.