From 39dfa808fe23fd4c1c4e6774703295f38dd617e9 Mon Sep 17 00:00:00 2001
From: Sumit Aich
Date: Tue, 29 Nov 2022 14:15:06 +0530
Subject: [PATCH] feat: bigtable sink using depot (#198)

* feat: bigtable sink using depot

* docs: bigtable sink docs

* chore: update depot version and bump firehose version to 0.6.0

Co-authored-by: MayurGubrele
---
 README.md                                     |  1 +
 build.gradle                                  |  4 +--
 docs/docs/guides/create_firehose.md           |  4 +++
 docs/docs/introduction.md                     |  1 +
 docs/docs/sinks/bigtable-sink.md              |  7 +++++
 .../odpf/firehose/config/enums/SinkType.java  |  1 +
 .../io/odpf/firehose/sink/SinkFactory.java    | 12 +++++++++
 .../blobstorage/BlobStorageDlqWriterTest.java | 27 +++++++------------
 8 files changed, 38 insertions(+), 19 deletions(-)
 create mode 100644 docs/docs/sinks/bigtable-sink.md

diff --git a/README.md b/README.md
index e6ee49ea3..80e90f4c1 100644
--- a/README.md
+++ b/README.md
@@ -25,6 +25,7 @@ Discover why users choose Firehose as their main Kafka Consumer
   - Elasticsearch
   - Redis
   - Bigquery
+  - BigTable
   - Blob Storage/Object Storage :
     - Google Cloud Storage
diff --git a/build.gradle b/build.gradle
index 55717b00e..dedc9c25d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -33,7 +33,7 @@ lombok {
 }
 
 group 'io.odpf'
-version '0.5.0'
+version '0.6.0'
 
 def projName = "firehose"
@@ -101,7 +101,7 @@ dependencies {
     implementation 'com.google.cloud:google-cloud-storage:1.114.0'
     implementation 'com.google.cloud:google-cloud-bigquery:1.115.0'
     implementation 'org.apache.logging.log4j:log4j-core:2.17.1'
-    implementation group: 'io.odpf', name: 'depot', version: '0.2.1'
+    implementation group: 'io.odpf', name: 'depot', version: '0.3.4'
     implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59'
         exclude group: 'org.slf4j'
     testImplementation group: 'junit', name: 'junit', version: '4.11'
diff --git a/docs/docs/guides/create_firehose.md b/docs/docs/guides/create_firehose.md
index d2855d9d3..37cec7e7b 100644
--- a/docs/docs/guides/create_firehose.md
+++ b/docs/docs/guides/create_firehose.md
@@ -136,4 +136,8 @@ _**Note:**_ [_**DATABASE**_](../sinks/influxdb-sink.md#sink_influx_db_name) _**a
 - The timestamp column is needed incase of partition table. It can be generated at the time of ingestion by setting the config. Please refer to config `SINK_BIGQUERY_ADD_EVENT_TIMESTAMP_ENABLE` in [depot bigquery sink config section](https://github.com/odpf/depot/blob/main/docs/reference/configuration/bigquery-sink.md#sink_bigquery_add_event_timestamp_enable)
 - Google cloud credential with some bigquery permission is required to run this sink.
 
+## Create a Bigtable sink
+
+- It requires the environment [variables](https://github.com/odpf/depot/blob/main/docs/reference/configuration/bigtable.md) needed by the ODPF Depot library to be set, along with the generic Firehose variables. An illustrative configuration sketch is shown below.
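+
+For illustration, a minimal Bigtable Firehose environment could look like the sketch below. The generic variables (`SOURCE_KAFKA_*`, `INPUT_SCHEMA_PROTO_CLASS`, `SINK_TYPE`) are standard Firehose configs; the `SINK_BIGTABLE_*` names and all values here are placeholders, so verify the exact variable names against the linked Depot Bigtable configuration doc.
+
+```properties
+# Generic Firehose consumer settings
+SOURCE_KAFKA_BROKERS=localhost:9092
+SOURCE_KAFKA_TOPIC=test-topic
+SOURCE_KAFKA_CONSUMER_GROUP_ID=sample-group-id
+INPUT_SCHEMA_PROTO_CLASS=com.tests.TestMessage
+SINK_TYPE=bigtable
+
+# Depot Bigtable sink settings (illustrative names -- check the Depot docs)
+SINK_BIGTABLE_GOOGLE_CLOUD_PROJECT_ID=my-gcp-project
+SINK_BIGTABLE_INSTANCE_ID=my-bigtable-instance
+SINK_BIGTABLE_TABLE_ID=my-table
+SINK_BIGTABLE_CREDENTIAL_PATH=/path/to/service-account.json
+```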
+
 If you'd like to connect to a sink which is not yet supported, you can create a new sink by following the [contribution guidelines](../contribute/contribution.md)
diff --git a/docs/docs/introduction.md b/docs/docs/introduction.md
index ee3de4026..2ad38eb1d 100644
--- a/docs/docs/introduction.md
+++ b/docs/docs/introduction.md
@@ -38,6 +38,7 @@ Following sinks are supported in the Firehose
 - [Prometheus](https://en.wikipedia.org/wiki/Prometheus_%28software) - A time-series database
 - [MongoDB](https://en.wikipedia.org/wiki/MongoDB) - A NoSQL database
 - [Bigquery](https://cloud.google.com/bigquery) - A data warehouse provided by Google Cloud
+- [Bigtable](https://cloud.google.com/bigtable) - A fully managed, scalable NoSQL database service for large analytical and operational workloads.
 - [Blob Storage](https://gocloud.dev/howto/blob/) - A data storage architecture for large stores of unstructured data like google cloud storage, amazon s3, apache hadoop distributed filesystem
 
 ## How can I get started?
diff --git a/docs/docs/sinks/bigtable-sink.md b/docs/docs/sinks/bigtable-sink.md
new file mode 100644
index 000000000..258789f09
--- /dev/null
+++ b/docs/docs/sinks/bigtable-sink.md
@@ -0,0 +1,7 @@
+# Bigtable Sink
+
+Bigtable Sink is implemented in Firehose using the Bigtable sink connector implementation in ODPF Depot. You can check out the ODPF Depot GitHub repository [here](https://github.com/odpf/depot).
+
+### Configuration
+
+To run the Bigtable sink in Firehose, first set `SINK_TYPE`=`bigtable`. The generic configs that are common across all sink types, listed in [generic.md](../advance/generic.md), also need to be set. The Bigtable sink specific configs are documented in the ODPF Depot repository; you can check them out [here](https://github.com/odpf/depot/blob/main/docs/reference/configuration/bigtable.md)
\ No newline at end of file
diff --git a/src/main/java/io/odpf/firehose/config/enums/SinkType.java b/src/main/java/io/odpf/firehose/config/enums/SinkType.java
index b7a677edc..eaa73a157 100644
--- a/src/main/java/io/odpf/firehose/config/enums/SinkType.java
+++ b/src/main/java/io/odpf/firehose/config/enums/SinkType.java
@@ -12,5 +12,6 @@ public enum SinkType {
     PROMETHEUS,
     BLOB,
     BIGQUERY,
+    BIGTABLE,
     MONGODB
 }
diff --git a/src/main/java/io/odpf/firehose/sink/SinkFactory.java b/src/main/java/io/odpf/firehose/sink/SinkFactory.java
index 00ee24e26..7029e9c19 100644
--- a/src/main/java/io/odpf/firehose/sink/SinkFactory.java
+++ b/src/main/java/io/odpf/firehose/sink/SinkFactory.java
@@ -3,6 +3,9 @@
 import io.odpf.depot.bigquery.BigQuerySink;
 import io.odpf.depot.bigquery.BigQuerySinkFactory;
 import io.odpf.depot.config.BigQuerySinkConfig;
+import io.odpf.depot.bigtable.BigTableSinkFactory;
+import io.odpf.depot.bigtable.BigTableSink;
+import io.odpf.depot.config.BigTableSinkConfig;
 import io.odpf.depot.log.LogSink;
 import io.odpf.depot.log.LogSinkFactory;
 import io.odpf.depot.metrics.StatsDReporter;
@@ -33,6 +36,7 @@ public class SinkFactory {
     private final StencilClient stencilClient;
     private final OffsetManager offsetManager;
     private BigQuerySinkFactory bigQuerySinkFactory;
+    private BigTableSinkFactory bigTableSinkFactory;
     private LogSinkFactory logSinkFactory;
     private final Map<String, String> config;
 
@@ -75,6 +79,12 @@ public void init() {
                         BigquerySinkUtils.getRowIDCreator());
                 bigQuerySinkFactory.init();
                 return;
+            case BIGTABLE:
+                bigTableSinkFactory = new BigTableSinkFactory(
+                        ConfigFactory.create(BigTableSinkConfig.class, config),
+                        statsDReporter);
+                bigTableSinkFactory.init();
+                return;
             default:
                 throw new ConfigurationException("Invalid Firehose SINK_TYPE");
         }
     }
@@ -104,6 +114,8 @@ public Sink getSink() {
                 return BlobSinkFactory.create(config, offsetManager, statsDReporter, stencilClient);
             case BIGQUERY:
                 return new GenericOdpfSink(new FirehoseInstrumentation(statsDReporter, BigQuerySink.class), sinkType.name(), bigQuerySinkFactory.create());
+            case BIGTABLE:
+                return new GenericOdpfSink(new FirehoseInstrumentation(statsDReporter, BigTableSink.class), sinkType.name(), bigTableSinkFactory.create());
             case MONGODB:
                 return MongoSinkFactory.create(config, statsDReporter, stencilClient);
             default:
diff --git a/src/test/java/io/odpf/firehose/sink/dlq/blobstorage/BlobStorageDlqWriterTest.java b/src/test/java/io/odpf/firehose/sink/dlq/blobstorage/BlobStorageDlqWriterTest.java
index a9e0637d7..21c7c0303 100644
--- a/src/test/java/io/odpf/firehose/sink/dlq/blobstorage/BlobStorageDlqWriterTest.java
+++ b/src/test/java/io/odpf/firehose/sink/dlq/blobstorage/BlobStorageDlqWriterTest.java
@@ -2,10 +2,10 @@
 
 import io.odpf.depot.error.ErrorInfo;
 import io.odpf.depot.error.ErrorType;
-import io.odpf.firehose.message.Message;
 import io.odpf.firehose.exception.DeserializerException;
-import io.odpf.firehose.sink.common.blobstorage.BlobStorageException;
+import io.odpf.firehose.message.Message;
 import io.odpf.firehose.sink.common.blobstorage.BlobStorage;
+import io.odpf.firehose.sink.common.blobstorage.BlobStorageException;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -16,13 +16,10 @@
 import java.io.IOException;
 import java.time.Instant;
 import java.util.Arrays;
-import java.util.Base64;
 import java.util.Comparator;
 import java.util.List;
 
 import static org.mockito.Mockito.*;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.verify;
 
 @RunWith(MockitoJUnitRunner.class)
 public class BlobStorageDlqWriterTest {
@@ -50,14 +47,12 @@ public void shouldWriteMessagesWithoutErrorInfoToObjectStorage() throws IOExcept
         List<Message> messages = Arrays.asList(message1, message2, message3, message4);
         Assert.assertEquals(0, blobStorageDLQWriter.write(messages).size());
 
-        String key = Base64.getEncoder().encodeToString("123".getBytes());
-        String message = Base64.getEncoder().encodeToString("abc".getBytes());
         verify(blobStorage).store(contains("booking/2020-01-02"),
-                eq(("{\"key\":\"" + key + "\",\"value\":\"" + message + "\",\"topic\":\"booking\",\"partition\":1,\"offset\":3,\"timestamp\":1577923200000,\"error\":\"DESERIALIZATION_ERROR\"}\n"
-                        + "{\"key\":\"" + key + "\",\"value\":\"" + message + "\",\"topic\":\"booking\",\"partition\":1,\"offset\":4,\"timestamp\":1577923200000,\"error\":\"DESERIALIZATION_ERROR\"}").getBytes()));
+                eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":3,\"timestamp\":1577923200000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\"}\n"
+                        + "{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":4,\"timestamp\":1577923200000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\"}").getBytes()));
         verify(blobStorage).store(contains("booking/2020-01-01"),
-                eq(("{\"key\":\"" + key + "\",\"value\":\"" + message + "\",\"topic\":\"booking\",\"partition\":1,\"offset\":1,\"timestamp\":1577836800000,\"error\":\"DESERIALIZATION_ERROR\"}\n"
-                        + "{\"key\":\"" + key + "\",\"value\":\"" + message + "\",\"topic\":\"booking\",\"partition\":1,\"offset\":2,\"timestamp\":1577836800000,\"error\":\"DESERIALIZATION_ERROR\"}").getBytes()));
+                eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":1,\"timestamp\":1577836800000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\"}\n"
+                        + "{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":2,\"timestamp\":1577836800000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\"}").getBytes()));
     }
 
     @Test
@@ -73,14 +68,12 @@ public void shouldWriteMessageErrorTypesToObjectStorage() throws IOException, Bl
         List<Message> messages = Arrays.asList(message1, message2, message3, message4);
         Assert.assertEquals(0, blobStorageDLQWriter.write(messages).size());
 
-        String key = Base64.getEncoder().encodeToString("123".getBytes());
-        String message = Base64.getEncoder().encodeToString("abc".getBytes());
         verify(blobStorage).store(contains("booking/2020-01-02"),
-                eq(("{\"key\":\"" + key + "\",\"value\":\"" + message + "\",\"topic\":\"booking\",\"partition\":1,\"offset\":3,\"timestamp\":1577923200000,\"error\":\"DESERIALIZATION_ERROR\"}\n"
-                        + "{\"key\":\"" + key + "\",\"value\":\"" + message + "\",\"topic\":\"booking\",\"partition\":1,\"offset\":4,\"timestamp\":1577923200000,\"error\":\"SINK_UNKNOWN_ERROR\"}").getBytes()));
+                eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":3,\"timestamp\":1577923200000,\"error\":\"Exception , ErrorType: DESERIALIZATION_ERROR\"}\n"
+                        + "{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":4,\"timestamp\":1577923200000,\"error\":\"Exception , ErrorType: SINK_UNKNOWN_ERROR\"}").getBytes()));
         verify(blobStorage).store(contains("booking/2020-01-01"),
-                eq(("{\"key\":\"" + key + "\",\"value\":\"" + message + "\",\"topic\":\"booking\",\"partition\":1,\"offset\":1,\"timestamp\":1577836800000,\"error\":\"DESERIALIZATION_ERROR\"}\n"
-                        + "{\"key\":\"" + key + "\",\"value\":\"" + message + "\",\"topic\":\"booking\",\"partition\":1,\"offset\":2,\"timestamp\":1577836800000,\"error\":\"SINK_UNKNOWN_ERROR\"}").getBytes()));
+                eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":1,\"timestamp\":1577836800000,\"error\":\"Exception , ErrorType: DESERIALIZATION_ERROR\"}\n"
+                        + "{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":2,\"timestamp\":1577836800000,\"error\":\"Exception null, ErrorType: SINK_UNKNOWN_ERROR\"}").getBytes()));
     }
 
     @Test