feat: bigtable sink using depot (#198)
* feat: bigtable sink using depot

* docs: bigtable sink docs

* chore: update depot version and bump firehose version to 0.6.0

Co-authored-by: MayurGubrele <mayur.gubrele@gmail.com>
sumitaich1998 and MayurGubrele authored Nov 29, 2022
1 parent d9f014f commit 39dfa80
Showing 8 changed files with 38 additions and 19 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -25,6 +25,7 @@ Discover why users choose Firehose as their main Kafka Consumer
- Elasticsearch
- Redis
- Bigquery
- BigTable
- Blob Storage/Object Storage :
- Google Cloud Storage

4 changes: 2 additions & 2 deletions build.gradle
@@ -33,7 +33,7 @@ lombok {
}

group 'io.odpf'
version '0.5.0'
version '0.6.0'

def projName = "firehose"

@@ -101,7 +101,7 @@ dependencies {
implementation 'com.google.cloud:google-cloud-storage:1.114.0'
implementation 'com.google.cloud:google-cloud-bigquery:1.115.0'
implementation 'org.apache.logging.log4j:log4j-core:2.17.1'
implementation group: 'io.odpf', name: 'depot', version: '0.2.1'
implementation group: 'io.odpf', name: 'depot', version: '0.3.4'
implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59' exclude group: 'org.slf4j'

testImplementation group: 'junit', name: 'junit', version: '4.11'
4 changes: 4 additions & 0 deletions docs/docs/guides/create_firehose.md
@@ -136,4 +136,8 @@ _**Note:**_ [_**DATABASE**_](../sinks/influxdb-sink.md#sink_influx_db_name) _**a
- The timestamp column is needed in case of a partitioned table. It can be generated at the time of ingestion by setting the config; please refer to `SINK_BIGQUERY_ADD_EVENT_TIMESTAMP_ENABLE` in the [depot bigquery sink config section](https://github.com/odpf/depot/blob/main/docs/reference/configuration/bigquery-sink.md#sink_bigquery_add_event_timestamp_enable). A sketch of this setting is shown below.
- A Google Cloud credential with the required BigQuery permissions is needed to run this sink.

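A minimal sketch of how that timestamp setting could be supplied, assuming it is passed as an environment variable like the other Firehose configs; the linked depot documentation is the authoritative reference:

```properties
# Illustrative only: have the sink generate the event timestamp at ingestion
# time so it can be used as the partitioning column of the BigQuery table.
SINK_BIGQUERY_ADD_EVENT_TIMESTAMP_ENABLE=true
```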
## Create a Bigtable sink

- It requires the following environment [variables](https://github.com/odpf/depot/blob/main/docs/reference/configuration/bigtable.md), which are used by the ODPF Depot library, to be set along with the generic Firehose variables; an illustrative sketch is shown below.

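A hedged sketch of such a setup; the `SINK_BIGTABLE_*` names and values below are illustrative assumptions and should be checked against the depot Bigtable configuration page linked above:

```properties
# Select the Bigtable sink.
SINK_TYPE=bigtable

# Depot Bigtable settings (illustrative names and values; verify against
# the depot docs linked above).
SINK_BIGTABLE_GOOGLE_CLOUD_PROJECT_ID=my-gcp-project
SINK_BIGTABLE_INSTANCE_ID=my-bigtable-instance
SINK_BIGTABLE_TABLE_ID=my-table
SINK_BIGTABLE_CREDENTIAL_PATH=/path/to/service-account.json
```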
If you'd like to connect to a sink which is not yet supported, you can create a new sink by following the [contribution guidelines](../contribute/contribution.md)
1 change: 1 addition & 0 deletions docs/docs/introduction.md
@@ -38,6 +38,7 @@ Following sinks are supported in the Firehose
- [Prometheus](https://en.wikipedia.org/wiki/Prometheus_%28software%29) - A time-series database
- [MongoDB](https://en.wikipedia.org/wiki/MongoDB) - A NoSQL database
- [Bigquery](https://cloud.google.com/bigquery) - A data warehouse provided by Google Cloud
- [Bigtable](https://cloud.google.com/bigtable) - A fully managed, scalable NoSQL database service for large analytical and operational workloads.
- [Blob Storage](https://gocloud.dev/howto/blob/) - A data storage architecture for large stores of unstructured data like google cloud storage, amazon s3, apache hadoop distributed filesystem

## How can I get started?
7 changes: 7 additions & 0 deletions docs/docs/sinks/bigtable-sink.md
@@ -0,0 +1,7 @@
# Bigtable Sink

Bigtable Sink is implemented in Firehose using the Bigtable sink connector implementation in ODPF Depot. You can check out ODPF Depot Github repository [here](https://github.com/odpf/depot).

### Configuration

For the Bigtable sink in Firehose, first set `SINK_TYPE`=`bigtable`. The generic configs that are common across sink types also need to be set; these are listed in [generic.md](../advance/generic.md). Bigtable sink-specific configs are documented in the ODPF Depot repository; you can check out the Bigtable Sink configs [here](https://github.com/odpf/depot/blob/main/docs/reference/configuration/bigtable.md). An illustrative sample configuration follows.
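A hedged end-to-end sketch of a Firehose run configured for this sink; the generic variable names and values below are illustrative rather than exhaustive, and the Bigtable-specific variables come from the depot docs linked above:

```properties
# Select the sink implemented by the depot Bigtable connector.
SINK_TYPE=bigtable

# Generic Firehose consumer configs (illustrative values).
SOURCE_KAFKA_BROKERS=localhost:9092
SOURCE_KAFKA_TOPIC=bookings
SOURCE_KAFKA_CONSUMER_GROUP_ID=firehose-bigtable-demo
INPUT_SCHEMA_PROTO_CLASS=com.example.BookingLogMessage

# Add the SINK_BIGTABLE_* configs documented in the depot repository.
```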
1 change: 1 addition & 0 deletions src/main/java/io/odpf/firehose/config/enums/SinkType.java
@@ -12,5 +12,6 @@ public enum SinkType {
PROMETHEUS,
BLOB,
BIGQUERY,
BIGTABLE,
MONGODB
}
12 changes: 12 additions & 0 deletions src/main/java/io/odpf/firehose/sink/SinkFactory.java
@@ -3,6 +3,9 @@
import io.odpf.depot.bigquery.BigQuerySink;
import io.odpf.depot.bigquery.BigQuerySinkFactory;
import io.odpf.depot.config.BigQuerySinkConfig;
import io.odpf.depot.bigtable.BigTableSinkFactory;
import io.odpf.depot.bigtable.BigTableSink;
import io.odpf.depot.config.BigTableSinkConfig;
import io.odpf.depot.log.LogSink;
import io.odpf.depot.log.LogSinkFactory;
import io.odpf.depot.metrics.StatsDReporter;
@@ -33,6 +36,7 @@ public class SinkFactory {
private final StencilClient stencilClient;
private final OffsetManager offsetManager;
private BigQuerySinkFactory bigQuerySinkFactory;
private BigTableSinkFactory bigTableSinkFactory;
private LogSinkFactory logSinkFactory;
private final Map<String, String> config;

@@ -75,6 +79,12 @@ public void init() {
BigquerySinkUtils.getRowIDCreator());
bigQuerySinkFactory.init();
return;
case BIGTABLE:
bigTableSinkFactory = new BigTableSinkFactory(
ConfigFactory.create(BigTableSinkConfig.class, config),
statsDReporter);
bigTableSinkFactory.init();
return;
default:
throw new ConfigurationException("Invalid Firehose SINK_TYPE");
}
@@ -104,6 +114,8 @@ public Sink getSink() {
return BlobSinkFactory.create(config, offsetManager, statsDReporter, stencilClient);
case BIGQUERY:
return new GenericOdpfSink(new FirehoseInstrumentation(statsDReporter, BigQuerySink.class), sinkType.name(), bigQuerySinkFactory.create());
case BIGTABLE:
return new GenericOdpfSink(new FirehoseInstrumentation(statsDReporter, BigTableSink.class), sinkType.name(), bigTableSinkFactory.create());
case MONGODB:
return MongoSinkFactory.create(config, statsDReporter, stencilClient);
default:
BlobStorageDlqWriterTest.java
@@ -2,10 +2,10 @@

import io.odpf.depot.error.ErrorInfo;
import io.odpf.depot.error.ErrorType;
import io.odpf.firehose.message.Message;
import io.odpf.firehose.exception.DeserializerException;
import io.odpf.firehose.sink.common.blobstorage.BlobStorageException;
import io.odpf.firehose.message.Message;
import io.odpf.firehose.sink.common.blobstorage.BlobStorage;
import io.odpf.firehose.sink.common.blobstorage.BlobStorageException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -16,13 +16,10 @@
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Base64;
import java.util.Comparator;
import java.util.List;

import static org.mockito.Mockito.*;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.verify;

@RunWith(MockitoJUnitRunner.class)
public class BlobStorageDlqWriterTest {
@@ -50,14 +47,12 @@ public void shouldWriteMessagesWithoutErrorInfoToObjectStorage() throws IOExcept
List<Message> messages = Arrays.asList(message1, message2, message3, message4);
Assert.assertEquals(0, blobStorageDLQWriter.write(messages).size());

String key = Base64.getEncoder().encodeToString("123".getBytes());
String message = Base64.getEncoder().encodeToString("abc".getBytes());
verify(blobStorage).store(contains("booking/2020-01-02"),
eq(("{\"key\":\"" + key + "\",\"value\":\"" + message + "\",\"topic\":\"booking\",\"partition\":1,\"offset\":3,\"timestamp\":1577923200000,\"error\":\"DESERIALIZATION_ERROR\"}\n"
+ "{\"key\":\"" + key + "\",\"value\":\"" + message + "\",\"topic\":\"booking\",\"partition\":1,\"offset\":4,\"timestamp\":1577923200000,\"error\":\"DESERIALIZATION_ERROR\"}").getBytes()));
eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":3,\"timestamp\":1577923200000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\"}\n"
+ "{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":4,\"timestamp\":1577923200000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\"}").getBytes()));
verify(blobStorage).store(contains("booking/2020-01-01"),
eq(("{\"key\":\"" + key + "\",\"value\":\"" + message + "\",\"topic\":\"booking\",\"partition\":1,\"offset\":1,\"timestamp\":1577836800000,\"error\":\"DESERIALIZATION_ERROR\"}\n"
+ "{\"key\":\"" + key + "\",\"value\":\"" + message + "\",\"topic\":\"booking\",\"partition\":1,\"offset\":2,\"timestamp\":1577836800000,\"error\":\"DESERIALIZATION_ERROR\"}").getBytes()));
eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":1,\"timestamp\":1577836800000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\"}\n"
+ "{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":2,\"timestamp\":1577836800000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\"}").getBytes()));
}

@Test
@@ -73,14 +68,12 @@ public void shouldWriteMessageErrorTypesToObjectStorage() throws IOException, Bl
List<Message> messages = Arrays.asList(message1, message2, message3, message4);
Assert.assertEquals(0, blobStorageDLQWriter.write(messages).size());

String key = Base64.getEncoder().encodeToString("123".getBytes());
String message = Base64.getEncoder().encodeToString("abc".getBytes());
verify(blobStorage).store(contains("booking/2020-01-02"),
eq(("{\"key\":\"" + key + "\",\"value\":\"" + message + "\",\"topic\":\"booking\",\"partition\":1,\"offset\":3,\"timestamp\":1577923200000,\"error\":\"DESERIALIZATION_ERROR\"}\n"
+ "{\"key\":\"" + key + "\",\"value\":\"" + message + "\",\"topic\":\"booking\",\"partition\":1,\"offset\":4,\"timestamp\":1577923200000,\"error\":\"SINK_UNKNOWN_ERROR\"}").getBytes()));
eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":3,\"timestamp\":1577923200000,\"error\":\"Exception , ErrorType: DESERIALIZATION_ERROR\"}\n"
+ "{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":4,\"timestamp\":1577923200000,\"error\":\"Exception , ErrorType: SINK_UNKNOWN_ERROR\"}").getBytes()));
verify(blobStorage).store(contains("booking/2020-01-01"),
eq(("{\"key\":\"" + key + "\",\"value\":\"" + message + "\",\"topic\":\"booking\",\"partition\":1,\"offset\":1,\"timestamp\":1577836800000,\"error\":\"DESERIALIZATION_ERROR\"}\n"
+ "{\"key\":\"" + key + "\",\"value\":\"" + message + "\",\"topic\":\"booking\",\"partition\":1,\"offset\":2,\"timestamp\":1577836800000,\"error\":\"SINK_UNKNOWN_ERROR\"}").getBytes()));
eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":1,\"timestamp\":1577836800000,\"error\":\"Exception , ErrorType: DESERIALIZATION_ERROR\"}\n"
+ "{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":2,\"timestamp\":1577836800000,\"error\":\"Exception null, ErrorType: SINK_UNKNOWN_ERROR\"}").getBytes()));
}

@Test
