Skip to content

Commit

Permalink
fixes on resource actions
Browse files Browse the repository at this point in the history
  • Loading branch information
georgebanasios committed Dec 3, 2024
1 parent 9dd4624 commit d9fabc9
Showing 1 changed file with 29 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import com.microsoft.azure.kusto.data.instrumentation.FunctionOneException;
import com.microsoft.azure.kusto.data.instrumentation.MonitoredActivity;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.resources.QueueWithSas;
import com.microsoft.azure.kusto.ingest.resources.RankedStorageAccount;
import com.microsoft.azure.kusto.ingest.resources.ResourceWithSas;
import com.microsoft.azure.kusto.ingest.utils.SecurityUtils;
Expand All @@ -18,6 +18,7 @@
import java.io.File;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -95,60 +96,52 @@ private static <TInner, TWrapper extends ResourceWithSas<TInner>, TOut> Mono<TOu
}

public static Mono<Void> postToQueueWithRetriesAsync(ResourceManager resourceManager, AzureStorageClient azureStorageClient, IngestionBlobInfo blob) {
return Mono.defer(() -> {
try {
ObjectMapper objectMapper = Utils.getObjectMapper();
String message = objectMapper.writeValueAsString(blob);
return resourceActionWithRetriesAsync(resourceManager,
resourceManager.getShuffledQueues(),
queue -> Mono.fromCallable(() -> {
azureStorageClient.postMessageToQueue(queue.getQueue(), message);
return null;
}),
"ResourceAlgorithms.postToQueueWithRetriesAsync",
Collections.singletonMap("blob", SecurityUtils.removeSecretsFromUrl(blob.getBlobPath()))
);
} catch (Exception e) {
return Mono.error(e);
}
});
return Mono.fromCallable(() -> {
ObjectMapper objectMapper = Utils.getObjectMapper();
String message = objectMapper.writeValueAsString(blob);
List<QueueWithSas> shuffledQueues = resourceManager.getShuffledQueues();
return new AbstractMap.SimpleImmutableEntry<>(message, shuffledQueues);
})
.flatMap(entry -> {
String message = entry.getKey();
List<QueueWithSas> shuffledQueues = entry.getValue();
return resourceActionWithRetriesAsync(resourceManager,
shuffledQueues,
queue -> Mono.fromCallable(() -> {
azureStorageClient.postMessageToQueue(queue.getQueue(), message); //TODO: offload all sync calls to bounded elastic?
return null;
}),
"ResourceAlgorithms.postToQueueWithRetriesAsync",
Collections.singletonMap("blob", SecurityUtils.removeSecretsFromUrl(blob.getBlobPath()))
);
});
}

public static Mono<String> uploadStreamToBlobWithRetriesAsync(ResourceManager resourceManager, AzureStorageClient azureStorageClient, InputStream stream,
String blobName, boolean shouldCompress) {
return Mono.defer(() -> {
try {
return resourceActionWithRetriesAsync(resourceManager,
resourceManager.getShuffledContainers(),
return Mono.fromCallable(resourceManager::getShuffledContainers)
.flatMap(shuffledContainers -> resourceActionWithRetriesAsync(resourceManager,
shuffledContainers,
container -> Mono.fromCallable(() -> {
azureStorageClient.uploadStreamToBlob(stream, blobName, container.getContainer(), shouldCompress);
return (container.getContainer().getBlobContainerUrl() + "/" + blobName + container.getSas());
}),
"ResourceAlgorithms.uploadStreamToBlobWithRetriesAsync",
Collections.emptyMap()
);
} catch (Exception e) {
return Mono.error(e);
}
});
));
}

public static Mono<String> uploadLocalFileWithRetriesAsync(ResourceManager resourceManager, AzureStorageClient azureStorageClient, File file, String blobName,
boolean shouldCompress) {
return Mono.defer(() -> {
try {
return resourceActionWithRetriesAsync(resourceManager,
resourceManager.getShuffledContainers(),
return Mono.fromCallable(resourceManager::getShuffledContainers)
.flatMap(shuffledContainers -> resourceActionWithRetriesAsync(resourceManager,
shuffledContainers,
container -> Mono.fromCallable(() -> {
azureStorageClient.uploadLocalFileToBlob(file, blobName, container.getContainer(), shouldCompress);
return container.getContainer().getBlobContainerUrl() + "/" + blobName + container.getSas();
}), "ResourceAlgorithms.uploadLocalFileWithRetriesAsync",
Collections.emptyMap()
);
} catch (IngestionServiceException e) {
return Mono.error(e);
}
});
));
}

@NotNull
Expand Down

0 comments on commit d9fabc9

Please sign in to comment.