Skip to content

Commit

Permalink
Add async interface methods (#38)
Browse files Browse the repository at this point in the history
* Add Async interface methods
* Async interface methods throw CompletionExceptions
* Add sample of Async-file-ingestion
  • Loading branch information
rabee333 authored Nov 28, 2018
1 parent 0e01487 commit 2741219
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.microsoft.azure.storage.table.CloudTable;
import com.microsoft.azure.storage.table.TableOperation;
import com.microsoft.azure.storage.table.TableServiceEntity;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -29,29 +30,43 @@ class AzureStorageHelper {
private static final int GZIP_BUFFER_SIZE = 16384;
private static final int STREAM_BUFFER_SIZE = 16384;

public void postMessageToQueue(String queuePath, String content) throws StorageException, URISyntaxException {
void postMessageToQueue(String queuePath, String content) throws StorageException, URISyntaxException {
CloudQueue queue = new CloudQueue(new URI(queuePath));
CloudQueueMessage queueMessage = new CloudQueueMessage(content);
queue.addMessage(queueMessage);
}

public void azureTableInsertEntity(String tableUri, TableServiceEntity entity) throws StorageException, URISyntaxException {
void azureTableInsertEntity(String tableUri, TableServiceEntity entity) throws StorageException, URISyntaxException {
CloudTable table = new CloudTable(new URI(tableUri));
// Create an operation to add the new customer to the table basics table.
TableOperation insert = TableOperation.insert(entity);
// Submit the operation to the table service.
table.execute(insert);
}

public CloudBlockBlob uploadLocalFileToBlob(String filePath, String blobName, String storageUri) throws URISyntaxException, StorageException, IOException {
CloudBlockBlob uploadLocalFileToBlob(String filePath, String blobName, String storageUri) throws URISyntaxException, StorageException, IOException {
log.debug("uploadLocalFileToBlob: filePath: {}, blobName: {}, storageUri: {}", filePath, blobName, storageUri);

// Validation
if(StringUtils.isEmpty(filePath)){
throw new IllegalArgumentException("filePath is empty");
}
if(StringUtils.isEmpty(blobName)){
throw new IllegalArgumentException("blobName is empty");
}
if(StringUtils.isEmpty(storageUri)){
throw new IllegalArgumentException("storageUri is empty");
}

File sourceFile = new File(filePath);
if(!sourceFile.exists()){
throw new IllegalArgumentException("The file does not exist: " + filePath);
}

// Check if the file is already compressed:
boolean isCompressed = filePath.endsWith(".gz") || filePath.endsWith(".zip");

CloudBlobContainer container = new CloudBlobContainer(new URI(storageUri));
File sourceFile = new File(filePath);

CloudBlockBlob blob = container.getBlockBlobReference(blobName + (isCompressed?"":".gz"));

if(!isCompressed){
Expand All @@ -74,7 +89,7 @@ private void compressAndUploadFile(String filePath, CloudBlockBlob blob) throws
fin.close();
}

public CloudBlockBlob uploadStreamToBlob(InputStream inputStream, String blobName, String storageUri, boolean compress) throws IOException, URISyntaxException, StorageException {
CloudBlockBlob uploadStreamToBlob(InputStream inputStream, String blobName, String storageUri, boolean compress) throws IOException, URISyntaxException, StorageException {
log.debug("uploadLocalFileToBlob: blobName: {}, storageUri: {}", blobName, storageUri);
CloudBlobContainer container = new CloudBlobContainer(new URI(storageUri));
CloudBlockBlob blob = container.getBlockBlobReference(blobName+ (compress?".gz":""));
Expand All @@ -98,7 +113,7 @@ private void streamFile(InputStream inputStream, OutputStream outputStream, int
}
}

public String getBlobPathWithSas(CloudBlockBlob blob) {
String getBlobPathWithSas(CloudBlockBlob blob) {
StorageCredentialsSharedAccessSignature signature = (StorageCredentialsSharedAccessSignature)blob.getServiceClient().getCredentials();
return blob.getStorageUri().getPrimaryUri().toString() + "?" + signature.getToken();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;

import java.util.concurrent.CompletableFuture;

public interface IngestClient {

IngestionResult ingestFromFile (FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException;
Expand All @@ -18,4 +20,10 @@ public interface IngestClient {

IngestionResult ingestFromStream (StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException;

CompletableFuture<IngestionResult> ingestFromFileAsync (FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties);

CompletableFuture<IngestionResult> ingestFromStreamAsync (StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties);

CompletableFuture<IngestionResult> ingestFromBlobAsync (BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties);

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class IngestClientImpl implements IngestClient {

Expand All @@ -45,10 +47,37 @@ class IngestClientImpl implements IngestClient {
@Override
public IngestionResult ingestFromBlob(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties)
throws IngestionClientException, IngestionServiceException {
return ingestFromBlobImpl(blobSourceInfo, ingestionProperties);
}

@Override
public CompletableFuture<IngestionResult> ingestFromBlobAsync(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) {
return CompletableFuture.supplyAsync(
() -> {
try {
return ingestFromBlobImpl(blobSourceInfo, ingestionProperties);
} catch (IngestionClientException | IngestionServiceException e) {
log.error("Error when ingestFromBlobAsync()", e);
// Here we throw a CompletionException which extends the RuntimeException.
// the real exception itself would be in the <cause> of this CompletionException
throw new CompletionException(e);
/* We might decide what to return in case of error.
One suggestion is to return an object of ResultWrapper<IngestionResult, Exception>
instead of throwing an exception.
*/
}
});
}

private IngestionResult ingestFromBlobImpl(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties)
throws IngestionClientException, IngestionServiceException {

// Argument validation:
if (blobSourceInfo == null || ingestionProperties == null){
throw new IllegalArgumentException("blobSourceInfo or ingestionProperties is null");
if (blobSourceInfo == null){
throw new IllegalArgumentException("blobSourceInfo is null");
}
if (ingestionProperties == null){
throw new IllegalArgumentException("ingestionProperties is null");
}
blobSourceInfo.validate();
ingestionProperties.validate();
Expand Down Expand Up @@ -95,6 +124,7 @@ public IngestionResult ingestFromBlob(BlobSourceInfo blobSourceInfo, IngestionPr
azureStorageHelper.postMessageToQueue(
resourceManager.getIngestionResource(ResourceManager.ResourceType.SECURED_READY_FOR_AGGREGATION_QUEUE)
, serializedIngestionBlobInfo);

return new TableReportIngestionResult(tableStatuses);

} catch (StorageException e) {
Expand All @@ -105,10 +135,31 @@ public IngestionResult ingestFromBlob(BlobSourceInfo blobSourceInfo, IngestionPr
}

@Override
public IngestionResult ingestFromFile(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
public IngestionResult ingestFromFile(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties)
throws IngestionClientException, IngestionServiceException {
return ingestFromFileImpl(fileSourceInfo, ingestionProperties);
}

@Override
public CompletableFuture<IngestionResult> ingestFromFileAsync(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) {
return CompletableFuture.supplyAsync(
() -> {
try {
return ingestFromFileImpl(fileSourceInfo, ingestionProperties);
} catch (IngestionClientException | IngestionServiceException e) {
log.error("Error in ingestFromFileAsync()", e);
throw new CompletionException(e);
}
});
}

private IngestionResult ingestFromFileImpl(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
// Argument validation:
if (fileSourceInfo == null || ingestionProperties == null){
throw new IllegalArgumentException("fileSourceInfo or ingestionProperties is null");
if (fileSourceInfo == null){
throw new IllegalArgumentException("fileSourceInfo is null");
}
if (ingestionProperties == null){
throw new IllegalArgumentException("ingestionProperties is null");
}
fileSourceInfo.validate();
ingestionProperties.validate();
Expand All @@ -134,10 +185,31 @@ public IngestionResult ingestFromFile(FileSourceInfo fileSourceInfo, IngestionPr

@Override
public IngestionResult ingestFromStream(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
return ingestFromStreamImpl(streamSourceInfo, ingestionProperties);
}

@Override
public CompletableFuture<IngestionResult> ingestFromStreamAsync(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) {
return CompletableFuture.supplyAsync(
() -> {
try {
return ingestFromStreamImpl(streamSourceInfo, ingestionProperties);
} catch (IngestionClientException | IngestionServiceException e) {
log.error("Error when ingestFromStreamAsync()", e);
throw new CompletionException(e);
}
});
}

private IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
// Argument validation:
if (streamSourceInfo == null || ingestionProperties == null){
throw new IllegalArgumentException("streamSourceInfo or ingestionProperties is null");
if (streamSourceInfo == null){
throw new IllegalArgumentException("streamSourceInfo is null");
}
if (ingestionProperties == null){
throw new IllegalArgumentException("ingestionProperties is null");
}

streamSourceInfo.validate();
ingestionProperties.validate();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,13 @@
class IngestClientImplTest {

private ResourceManager resourceManagerMock = mock(ResourceManager.class);
private IngestClientImpl ingestClientImplMock;
private AzureStorageHelper azureStorageHelperMock;
private IngestClientImpl ingestClientImplMock = mock(IngestClientImpl.class);;
private AzureStorageHelper azureStorageHelperMock = mock(AzureStorageHelper.class);
private IngestionProperties props;

@BeforeEach
void setUp() {
try {
ingestClientImplMock = mock(IngestClientImpl.class);
azureStorageHelperMock = mock(AzureStorageHelper.class);

when(resourceManagerMock.getIngestionResource(ResourceManager.ResourceType.SECURED_READY_FOR_AGGREGATION_QUEUE))
.thenReturn("queue1")
.thenReturn("queue2");
Expand All @@ -43,6 +40,8 @@ void setUp() {
when(resourceManagerMock.getIdentityToken())
.thenReturn("identityToken");

doNothing().when(azureStorageHelperMock).postMessageToQueue(isA(String.class),isA(String.class));

props = new IngestionProperties("dbName", "tableName");
props.setJsonMappingName("mappingName");

Expand All @@ -59,16 +58,10 @@ void tearDown() {
@Test
void ingestFromBlob() {
try {
doReturn(null).when(ingestClientImplMock).ingestFromBlob(isA(BlobSourceInfo.class), isA(IngestionProperties.class));

String blobPath = "blobPath";
long size = 100;

BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath, size);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo("blobPath", 100);

ingestClientImplMock.ingestFromBlob(blobSourceInfo, props);

verify(ingestClientImplMock).ingestFromBlob(blobSourceInfo, props);
verify(ingestClientImplMock).ingestFromBlob(any(BlobSourceInfo.class), any(IngestionProperties.class));

} catch (Exception e) {
e.printStackTrace();
Expand All @@ -82,8 +75,6 @@ void ingestFromFile() {
when(azureStorageHelperMock.uploadLocalFileToBlob(isA(String.class), isA(String.class), isA(String.class)))
.thenReturn(new CloudBlockBlob(new URI("https://ms.com/storageUri")));

doNothing().when(azureStorageHelperMock).postMessageToQueue(isA(String.class), isA(String.class));

FileSourceInfo fileSourceInfo = new FileSourceInfo(testFilePath, 0);
int numOfFiles = 3;
for (int i = 0; i < numOfFiles; i++) {
Expand All @@ -103,7 +94,6 @@ void ingestFromStream() {
String testFilePath = Paths.get("src", "test", "resources", "testdata.json").toString();
when(azureStorageHelperMock.uploadStreamToBlob(isA(InputStream.class), isA(String.class), isA(String.class), isA(Boolean.class)))
.thenReturn(new CloudBlockBlob(new URI("https://ms.com/storageUri")));
doNothing().when(azureStorageHelperMock).postMessageToQueue(isA(String.class), isA(String.class));
int numOfFiles = 3;
for (int i = 0; i < numOfFiles; i++) {
InputStream stream = new FileInputStream(testFilePath);
Expand All @@ -115,4 +105,53 @@ void ingestFromStream() {
e.printStackTrace();
}
}

@Test
void ingestFromBlobAsync() {
try{
BlobSourceInfo blobSourceInfo = new BlobSourceInfo("blobPath", 100);
ingestClientImplMock.ingestFromBlobAsync(blobSourceInfo, props);
verify(ingestClientImplMock).ingestFromBlobAsync(any(BlobSourceInfo.class), any(IngestionProperties.class));

} catch (Exception e){
e.printStackTrace();
}
}

@Test
void ingestFromFileAsync() {
try {
String testFilePath = Paths.get("src", "test", "resources", "testdata.json").toString();
when(azureStorageHelperMock.uploadLocalFileToBlob(isA(String.class), isA(String.class), isA(String.class)))
.thenReturn(new CloudBlockBlob(new URI("https://ms.com/storageUri")));

FileSourceInfo fileSourceInfo = new FileSourceInfo(testFilePath, 0);
int numOfFiles = 3;
for (int i = 0; i < numOfFiles; i++) {
ingestClientImplMock.ingestFromFileAsync(fileSourceInfo, props);
}
verify(ingestClientImplMock, times(numOfFiles)).ingestFromFileAsync(any(FileSourceInfo.class), any(IngestionProperties.class));

} catch (Exception e){
e.printStackTrace();
}
}

@Test
void ingestFromStreamAsync() {
try {
String testFilePath = Paths.get("src", "test", "resources", "testdata.json").toString();
when(azureStorageHelperMock.uploadStreamToBlob(isA(InputStream.class), isA(String.class), isA(String.class), isA(Boolean.class)))
.thenReturn(new CloudBlockBlob(new URI("https://ms.com/storageUri")));
int numOfFiles = 3;
for (int i = 0; i < numOfFiles; i++) {
InputStream stream = new FileInputStream(testFilePath);
StreamSourceInfo streamSourceInfo = new StreamSourceInfo(stream,false);
ingestClientImplMock.ingestFromStreamAsync(streamSourceInfo, props);
}
verify(ingestClientImplMock, times(numOfFiles)).ingestFromStreamAsync(any(StreamSourceInfo.class), any(IngestionProperties.class));
} catch (Exception e) {
e.printStackTrace();
}
}
}
Loading

0 comments on commit 2741219

Please sign in to comment.