From a8b1d4653fdba6da4d40f5cd8e72cd4061da9ae3 Mon Sep 17 00:00:00 2001 From: Ayush Khandelwal Date: Sun, 12 May 2024 16:51:45 +0530 Subject: [PATCH 1/7] New interface for AzureHTTPClient providers --- .../system/azureblob/AzureBlobClientBuilder.java | 2 +- .../azureblob/AzureBlobClientBuilderFactory.java | 13 +++++++++++++ .../samza/system/azureblob/AzureBlobConfig.java | 7 +++++++ .../system/azureblob/BlobClientBuilder.java | 15 +++++++++++++++ .../azureblob/BlobClientBuilderFactory.java | 16 ++++++++++++++++ .../producer/AzureBlobSystemProducer.java | 13 +++++++++++-- 6 files changed, 63 insertions(+), 3 deletions(-) create mode 100644 samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilderFactory.java create mode 100644 samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilder.java create mode 100644 samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilderFactory.java diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilder.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilder.java index a8a6eb2df1..1ee9620b4d 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilder.java +++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilder.java @@ -39,7 +39,7 @@ * configs given to the SystemProducer - such as which authentication method to use, whether to use proxy to authenticate, * and so on. */ -public final class AzureBlobClientBuilder { +public final class AzureBlobClientBuilder implements BlobClientBuilder{ private final String systemName; private final String azureUrlFormat; private final AzureBlobConfig azureBlobConfig; diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilderFactory.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilderFactory.java new file mode 100644 index 0000000000..60f4ce9f99 --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilderFactory.java @@ -0,0 +1,13 @@ +package org.apache.samza.system.azureblob; + +/** + * Default implementation of {@link BlobClientBuilderFactory} that constructs a + * new instance of {@link AzureBlobClientBuilder}. + */ +public class AzureBlobClientBuilderFactory implements BlobClientBuilderFactory { + @Override + public BlobClientBuilder getBlobClientBuilder(String systemName, String azureUrlFormat, + AzureBlobConfig azureBlobConfig) { + return new AzureBlobClientBuilder(systemName, azureUrlFormat, azureBlobConfig); + } +} diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java index 37c6ebcf3b..bedf433703 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java +++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java @@ -38,6 +38,8 @@ public class AzureBlobConfig extends MapConfig { // fully qualified class name of the AzureBlobWriter impl for the producer system public static final String SYSTEM_WRITER_FACTORY_CLASS_NAME = SYSTEM_AZUREBLOB_PREFIX + "writer.factory.class"; public static final String SYSTEM_WRITER_FACTORY_CLASS_NAME_DEFAULT = "org.apache.samza.system.azureblob.avro.AzureBlobAvroWriterFactory"; + public static final String SYSTEM_BLOB_CLIENT_BUILDER_FACTORY_CLASS_NAME = SYSTEM_AZUREBLOB_PREFIX + "azureclientbuilder.factory.class"; + public static final String SYSTEM_BLOB_CLIENT_BUILDER_FACTORY_CLASS_NAME_DEFAULT = "org.apache.samza.system.azureblob.AzureBlobClientBuilderFactory"; public static final String SYSTEM_USE_TOKEN_CREDENTIAL_AUTHENTICATION = Config.SENSITIVE_PREFIX + SYSTEM_AZUREBLOB_PREFIX + "useTokenCredentialAuthentication"; private static final boolean SYSTEM_USE_TOKEN_CREDENTIAL_AUTHENTICATION_DEFAULT = false; @@ -172,6 +174,11 @@ public String getAzureBlobWriterFactoryClassName(String systemName) { return get(String.format(SYSTEM_WRITER_FACTORY_CLASS_NAME, systemName), SYSTEM_WRITER_FACTORY_CLASS_NAME_DEFAULT); } + public String getAzureBlobClientBuilderFactoryClassName(String systemName) { + return get(String.format(SYSTEM_BLOB_CLIENT_BUILDER_FACTORY_CLASS_NAME, systemName), + SYSTEM_BLOB_CLIENT_BUILDER_FACTORY_CLASS_NAME_DEFAULT); + } + public int getMaxFlushThresholdSize(String systemName) { return getInt(String.format(SYSTEM_MAX_FLUSH_THRESHOLD_SIZE, systemName), SYSTEM_MAX_FLUSH_THRESHOLD_SIZE_DEFAULT); } diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilder.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilder.java new file mode 100644 index 0000000000..d55ac0b275 --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilder.java @@ -0,0 +1,15 @@ +package org.apache.samza.system.azureblob; + +import com.azure.storage.blob.BlobServiceAsyncClient; + + +/** + * Create a BlobServiceAsyncClient. Implementation controls construction of + * underlying client. + */ +public interface BlobClientBuilder { + /** + * create a client for ABS (uploads) + */ + BlobServiceAsyncClient getBlobServiceAsyncClient(); +} diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilderFactory.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilderFactory.java new file mode 100644 index 0000000000..1d2113ce02 --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilderFactory.java @@ -0,0 +1,16 @@ +package org.apache.samza.system.azureblob; + +/** + * Constructs a new instance of {@link AzureBlobClientBuilder} + * from factory provided by configuration. + */ +public interface BlobClientBuilderFactory { + /** + * Create a new instance of {@link AzureBlobClientBuilder} + * @param systemName Name of the system + * @param azureUrlFormat Azure URL format + * @param azureBlobConfig Azure Blob configuration + * @return New instance of {@link AzureBlobClientBuilder} + */ + BlobClientBuilder getBlobClientBuilder(String systemName, String azureUrlFormat, AzureBlobConfig azureBlobConfig); +} diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java index 2774dba41c..86bdc5f7cd 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java +++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java @@ -46,6 +46,7 @@ import org.apache.samza.system.SystemProducerException; import org.apache.samza.system.azureblob.AzureBlobClientBuilder; import org.apache.samza.system.azureblob.AzureBlobConfig; +import org.apache.samza.system.azureblob.BlobClientBuilderFactory; import org.apache.samza.system.azureblob.compression.CompressionFactory; import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory; import org.slf4j.Logger; @@ -116,6 +117,7 @@ public class AzureBlobSystemProducer implements SystemProducer { // Map of writers indexed first by sourceName and then by (streamName, partitionName) or just streamName if partition key does not exist. private final Map> writerMap; private final AzureBlobWriterFactory writerFactory; + private final BlobClientBuilderFactory clientFactory; private final int blockFlushThresholdSize; private final long flushTimeoutMs; @@ -143,6 +145,13 @@ public AzureBlobSystemProducer(String systemName, AzureBlobConfig config, Metric this.systemName = systemName; this.config = config; + String clientFactoryClassName = this.config.getAzureBlobClientBuilderFactoryClassName(this.systemName); + try { + this.clientFactory = (BlobClientBuilderFactory) Class.forName(clientFactoryClassName).newInstance(); + } catch (Exception e) { + throw new SystemProducerException("Could not create blob client factory with name " + clientFactoryClassName, e); + } + String writerFactoryClassName = this.config.getAzureBlobWriterFactoryClassName(this.systemName); try { this.writerFactory = (AzureBlobWriterFactory) Class.forName(writerFactoryClassName).newInstance(); @@ -344,8 +353,8 @@ public void flush(String source) { @VisibleForTesting void setupAzureContainer() { try { - BlobServiceAsyncClient storageClient = new AzureBlobClientBuilder(systemName, AZURE_URL, config) - .getBlobServiceAsyncClient(); + BlobServiceAsyncClient storageClient = + clientFactory.getBlobClientBuilder(systemName, AZURE_URL, config).getBlobServiceAsyncClient(); validateFlushThresholdSizeSupported(storageClient); containerAsyncClient = storageClient.getBlobContainerAsyncClient(systemName); From 40ef401cc9014e20837622eccef6a2035c7e9d69 Mon Sep 17 00:00:00 2001 From: Ayush Khandelwal Date: Wed, 12 Jun 2024 14:31:50 +0530 Subject: [PATCH 2/7] Added license headers --- .../AzureBlobClientBuilderFactory.java | 18 ++++++++++++++++++ .../system/azureblob/BlobClientBuilder.java | 18 ++++++++++++++++++ .../azureblob/BlobClientBuilderFactory.java | 18 ++++++++++++++++++ 3 files changed, 54 insertions(+) diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilderFactory.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilderFactory.java index 60f4ce9f99..7a5120c117 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilderFactory.java +++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilderFactory.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.samza.system.azureblob; /** diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilder.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilder.java index d55ac0b275..52eb090efc 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilder.java +++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilder.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.samza.system.azureblob; import com.azure.storage.blob.BlobServiceAsyncClient; diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilderFactory.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilderFactory.java index 1d2113ce02..8eda4cbef8 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilderFactory.java +++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilderFactory.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.samza.system.azureblob; /** From 3c2c7044a4c28cfd671e2b339203d579689a0f8b Mon Sep 17 00:00:00 2001 From: Ayush Khandelwal Date: Wed, 12 Jun 2024 14:44:16 +0530 Subject: [PATCH 3/7] Updated method spec definitions --- .../samza/system/azureblob/BlobClientBuilderFactory.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilderFactory.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilderFactory.java index 8eda4cbef8..438c307f9c 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilderFactory.java +++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilderFactory.java @@ -19,16 +19,16 @@ package org.apache.samza.system.azureblob; /** - * Constructs a new instance of {@link AzureBlobClientBuilder} - * from factory provided by configuration. + * Constructs a new instance of type {@link BlobClientBuilder}. + * Implementation controls construction of underlying instance. */ public interface BlobClientBuilderFactory { /** - * Create a new instance of {@link AzureBlobClientBuilder} + * Create a new instance of {@link BlobClientBuilder} * @param systemName Name of the system * @param azureUrlFormat Azure URL format * @param azureBlobConfig Azure Blob configuration - * @return New instance of {@link AzureBlobClientBuilder} + * @return New instance of {@link BlobClientBuilder} */ BlobClientBuilder getBlobClientBuilder(String systemName, String azureUrlFormat, AzureBlobConfig azureBlobConfig); } From db7c653fcac1119d49102203ca7e9d12e36ddd1b Mon Sep 17 00:00:00 2001 From: Ayush Khandelwal Date: Wed, 12 Jun 2024 21:28:54 +0530 Subject: [PATCH 4/7] Updated spec headers doc --- .../apache/samza/system/azureblob/BlobClientBuilder.java | 3 ++- .../samza/system/azureblob/BlobClientBuilderFactory.java | 7 ++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilder.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilder.java index 52eb090efc..9bc3a5b199 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilder.java +++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilder.java @@ -27,7 +27,8 @@ */ public interface BlobClientBuilder { /** - * create a client for ABS (uploads) + * Create a client for uploading to Azure Blob Storage + * @return New instance of {@link BlobServiceAsyncClient} */ BlobServiceAsyncClient getBlobServiceAsyncClient(); } diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilderFactory.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilderFactory.java index 438c307f9c..00d4cf94d1 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilderFactory.java +++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilderFactory.java @@ -25,9 +25,10 @@ public interface BlobClientBuilderFactory { /** * Create a new instance of {@link BlobClientBuilder} - * @param systemName Name of the system - * @param azureUrlFormat Azure URL format - * @param azureBlobConfig Azure Blob configuration + * @param systemName Name of the system for which the blob client builder is being created + * @param azureUrlFormat The format of the Azure URL, which should conform to Azure's URL formatting requirements. + * @param azureBlobConfig The configuration settings for Azure Blob, encapsulated in an {@link AzureBlobConfig} object. + * This includes metadata details for Azure Blob configs. * @return New instance of {@link BlobClientBuilder} */ BlobClientBuilder getBlobClientBuilder(String systemName, String azureUrlFormat, AzureBlobConfig azureBlobConfig); From 2a8360b3f5b90b925efcef84e8674129f158c0b3 Mon Sep 17 00:00:00 2001 From: Ayush Khandelwal Date: Thu, 13 Jun 2024 14:21:16 +0530 Subject: [PATCH 5/7] Updated config property in readme file --- docs/learn/documentation/versioned/jobs/samza-configurations.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md index 6188e90b47..9ccb4192e5 100644 --- a/docs/learn/documentation/versioned/jobs/samza-configurations.md +++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md @@ -264,6 +264,7 @@ Configs for producing to [Azure Blob Storage](https://azure.microsoft.com/en-us/ |systems.**_system-name_**.azureblob.proxy.hostname| |if proxy.use is true then host name of proxy.| |systems.**_system-name_**.azureblob.proxy.port| |if proxy.use is true then port of proxy.| |systems.**_system-name_**.azureblob.writer.factory.class|`org.apache.samza.system.`
`azureblob.avro.`
`AzureBlobAvroWriterFactory`|Fully qualified class name of the `org.apache.samza.system.azureblob.producer.AzureBlobWriter` impl for the system producer.

The default writer creates blobs that are of type AVRO and require the messages sent to a blob to be AVRO records. The blobs created by the default writer are of type [Block Blobs](https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-block-blobs--append-blobs--and-page-blobs#about-block-blobs).| +|systems.**_system-name_**.azureblob.azureclientbuilder.factory.class | `org.apache.samza.system.`
`azureblob.AzureBlobClientBuilderFactory`|Fully qualified class name of the factory of the `org.apache.samza.system.azureblob.BlobClientBuilder` implementation for the client builder. | |systems.**_system-name_**.azureblob.compression.type|"none"|type of compression to be used before uploading blocks. Can be "none" or "gzip".| |systems.**_system-name_**.azureblob.maxFlushThresholdSize|10485760 (10 MB)|max size of the uncompressed block to be uploaded in bytes. Maximum size allowed by Azure is 100MB.| |systems.**_system-name_**.azureblob.maxBlobSize|Long.MAX_VALUE (unlimited)|max size of the uncompressed blob in bytes.
If default value then size is unlimited capped only by Azure BlockBlob size of 4.75 TB (100 MB per block X 50,000 blocks).| From 472c41532c26748ad582aaf6a5aa1e46207d98ed Mon Sep 17 00:00:00 2001 From: Ayush Khandelwal Date: Tue, 20 Aug 2024 14:12:39 +0530 Subject: [PATCH 6/7] Modified doc of method --- .../apache/samza/system/azureblob/BlobClientBuilder.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilder.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilder.java index 9bc3a5b199..f76c3d9553 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilder.java +++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilder.java @@ -22,8 +22,12 @@ /** - * Create a BlobServiceAsyncClient. Implementation controls construction of - * underlying client. + * Create a BlobServiceAsyncClient. Implementation controls construction of underlying client. + * Customers implementing their own System Producer need to ensure thread safety of following impl for generation of client. + * If org.apache.samza.system.azureblob.producer.AzureBlobSystemProducer is used, it by defaults allows only one thread to create the client. + * Please ensure any client implementation of this interface to be thread safe as well. + * AzureBlobSystemProducer also ensures to safely close the client on call of stop(). Please ensure to close clients if using this interface + * to create your own client. */ public interface BlobClientBuilder { /** From ee7f60b1d5c1f143b1f7407f4ae5c221a826e6fb Mon Sep 17 00:00:00 2001 From: Ayush Khandelwal Date: Thu, 22 Aug 2024 14:04:21 +0530 Subject: [PATCH 7/7] Fixed checkstyle failures --- .../apache/samza/system/azureblob/AzureBlobClientBuilder.java | 2 +- .../system/azureblob/producer/AzureBlobSystemProducer.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilder.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilder.java index 1ee9620b4d..479733e315 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilder.java +++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilder.java @@ -39,7 +39,7 @@ * configs given to the SystemProducer - such as which authentication method to use, whether to use proxy to authenticate, * and so on. */ -public final class AzureBlobClientBuilder implements BlobClientBuilder{ +public final class AzureBlobClientBuilder implements BlobClientBuilder { private final String systemName; private final String azureUrlFormat; private final AzureBlobConfig azureBlobConfig; diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java index 86bdc5f7cd..9fd2b3b1e7 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java +++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java @@ -44,7 +44,6 @@ import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemProducer; import org.apache.samza.system.SystemProducerException; -import org.apache.samza.system.azureblob.AzureBlobClientBuilder; import org.apache.samza.system.azureblob.AzureBlobConfig; import org.apache.samza.system.azureblob.BlobClientBuilderFactory; import org.apache.samza.system.azureblob.compression.CompressionFactory;