Revert "feat: redis sink using depot (#193)" (#204)
This reverts commit 01de086.
sumitaich1998 authored Nov 23, 2022
1 parent 4237ead commit d9f014f
Showing 48 changed files with 2,760 additions and 35 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -101,7 +101,7 @@ dependencies {
implementation 'com.google.cloud:google-cloud-storage:1.114.0'
implementation 'com.google.cloud:google-cloud-bigquery:1.115.0'
implementation 'org.apache.logging.log4j:log4j-core:2.17.1'
implementation group: 'io.odpf', name: 'depot', version: '0.3.3'
implementation group: 'io.odpf', name: 'depot', version: '0.2.1'
implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59' exclude group: 'org.slf4j'

testImplementation group: 'junit', name: 'junit', version: '4.11'
85 changes: 72 additions & 13 deletions docs/docs/sinks/redis-sink.md
@@ -1,21 +1,80 @@
# Redis Sink
# Redis

Redis Sink is implemented in Firehose using the Redis sink connector implementation in ODPF Depot. You can check out the ODPF Depot GitHub repository [here](https://github.com/odpf/depot).
A Redis sink Firehose \(`SINK_TYPE`=`redis`\) requires the following variables to be set along with the Generic ones.

### Data Types
Redis sink can be created in 3 different modes based on the value of [`SINK_REDIS_DATA_TYPE`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_data_type): HashSet, KeyValue or List.
- `Hashset`: For each message, an entry of the format `key : field : value` is generated and pushed to Redis. Field and value are generated on the basis of the config [`SINK_REDIS_HASHSET_FIELD_TO_COLUMN_MAPPING`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_hashset_field_to_column_mapping)
- `List`: For each message, an entry of the format `key : value` is generated and pushed to Redis. The value is fetched from the proto field name provided in the config [`SINK_REDIS_LIST_DATA_FIELD_NAME`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_list_data_field_name)
- `KeyValue`: For each message, an entry of the format `key : value` is generated and pushed to Redis. The value is fetched from the proto field name provided in the config [`SINK_REDIS_KEY_VALUE_DATA_FIELD_NAME`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_key_value_data_field_name)
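
To make the three modes concrete, here is a minimal sketch of the Redis command each mode maps to, assuming the Jedis client and hypothetical key, field, and value strings (none of these names come from the sink itself):

```java
import redis.clients.jedis.Jedis;

public class RedisDataTypeSketch {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            // Hashset: key : field : value, one HSET per mapped field.
            jedis.hset("customer_1234", "order_num", "OR-567");

            // List: key : value, appended to a Redis list.
            jedis.rpush("pending_orders", "OR-567");

            // KeyValue: key : value, a plain SET.
            jedis.set("order_OR-567", "customer_1234");
        }
    }
}
```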
### `SINK_REDIS_URLS`

The `key` is picked up from a field in the message itself.
Redis instance hostname/IP address followed by its port. Multiple `host:port` pairs can be given as a comma-separated list.

Limitation: Depot Redis sink only supports Key-Value, HashSet and List entries as of now.
- Example value: `localhost:6379,localhost:6380`
- Type: `required`

### Configuration
### `SINK_REDIS_DATA_TYPE`

For the Redis sink in Firehose we first need to set `SINK_TYPE`=`redis`. Some generic configs, common across different sink types, also need to be set; these are mentioned in [generic.md](../advance/generic.md). Redis sink specific configs are mentioned in the ODPF Depot repository. You can check out the Redis Sink configs [here](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md)
Selects whether the data is pushed to Redis as a HashSet, a KeyValue pair, or a List entry.

- Example value: `Hashset`
- Type: `required`
- Default value: `List`

### Deployment Types
Redis sink, as of now, supports two deployment types: `Standalone` and `Cluster`. This can be configured via the Depot environment variable `SINK_REDIS_DEPLOYMENT_TYPE`.
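
A minimal sketch of what the two deployment types imply for the client, assuming Jedis and hypothetical hosts; the actual selection in Firehose happens inside `RedisClientFactory`, shown later in this diff:

```java
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class RedisDeploymentSketch {
    public static void main(String[] args) {
        // Standalone: a single host:port taken from SINK_REDIS_URLS.
        Jedis standalone = new Jedis("localhost", 6379);
        standalone.close();

        // Cluster: every host:port in SINK_REDIS_URLS becomes a seed node.
        Set<HostAndPort> nodes = new HashSet<>(Arrays.asList(
                new HostAndPort("localhost", 6379),
                new HostAndPort("localhost", 6380)));
        JedisCluster cluster = new JedisCluster(nodes);
        cluster.close();
    }
}
```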
### `SINK_REDIS_KEY_TEMPLATE`

The string that will act as the key for each Redis entry. The key can be a constant, or it can be a template that extracts a value from each message and uses it as the Redis key.

- Example value: `Service\_%%s,1`

This will take the value of the proto field with index 1 and create the Redis keys as per the template\

- Type: `required`
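
A sketch of how such a template could be resolved, assuming the rendered template form `Service_%s,1` and a hypothetical `resolveKey` helper that splits the template on the comma and formats the referenced proto field values into the key (the real parsing lives inside the sink):

```java
import java.util.Map;

public class KeyTemplateSketch {
    // Hypothetical resolver: "Service_%s,1" -> format "Service_%s" applied to proto field 1.
    static String resolveKey(String template, Map<Integer, String> protoFields) {
        String[] parts = template.split(",");
        Object[] args = new Object[parts.length - 1];
        for (int i = 1; i < parts.length; i++) {
            args[i - 1] = protoFields.get(Integer.parseInt(parts[i].trim()));
        }
        return String.format(parts[0], args); // a template with no comma is a constant key
    }

    public static void main(String[] args) {
        Map<Integer, String> fields = Map.of(1, "order-service", 6, "customer_1234");
        System.out.println(resolveKey("Service_%s,1", fields)); // Service_order-service
    }
}
```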

### `INPUT_SCHEMA_PROTO_TO_COLUMN_MAPPING`

This field decides which message data will be stored in the HashSet, mapping proto field indexes to Redis hash field names.

- Example value: `{"6":"customer_id", "2":"order_num"}`
- Type: `required (For Hashset)`
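
A sketch of what this mapping produces at the Redis level, assuming Jedis and hypothetical field values; each mapped proto field becomes one HSET under the templated key:

```java
import java.util.Map;
import redis.clients.jedis.Jedis;

public class HashsetMappingSketch {
    public static void main(String[] args) {
        // Parsed form of {"6":"customer_id", "2":"order_num"}: proto index -> Redis hash field.
        Map<Integer, String> mapping = Map.of(6, "customer_id", 2, "order_num");
        // Hypothetical values extracted from one message's proto fields.
        Map<Integer, String> message = Map.of(6, "customer_1234", 2, "OR-567");

        try (Jedis jedis = new Jedis("localhost", 6379)) {
            // One HSET per mapped field, all under the same templated key.
            mapping.forEach((protoIndex, field) ->
                    jedis.hset("Service_order-service", field, message.get(protoIndex)));
        }
    }
}
```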

### `SINK_REDIS_LIST_DATA_PROTO_INDEX`

This field decides which proto field will be stored in the List for each message.

- Example value: `6`

This will get the value of the field with index 6 in your proto and push it to the Redis list under the corresponding key template\

- Type: `required (For List)`

### `SINK_REDIS_KEY_VALUE_DATA_PROTO_INDEX`

This field decides what data will be stored in the value part of the key-value pair.

- Example value: `6`

This will get the value of the field with index 6 in your proto and push it to Redis as the value under the corresponding key template\

- Type: `required (For KeyValue)`

### `SINK_REDIS_TTL_TYPE`

- Example value: `DURATION`
- Type: `optional`
- Default value: `DISABLE`
- Choice of Redis TTL type. It can be:\
- `DURATION`: the key will expire and be removed from Redis after this duration \(unit: seconds\)\
- `EXACT_TIME`: precise UNIX timestamp after which the key will expire

### `SINK_REDIS_TTL_VALUE`

Redis TTL value: a UNIX timestamp for the `EXACT_TIME` TTL type, or seconds for the `DURATION` TTL type.

- Example value: `100000`
- Type: `optional`
- Default value: `0`
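
The two TTL types correspond to the standard EXPIRE and EXPIREAT Redis commands; a minimal sketch, assuming Jedis and a hypothetical key:

```java
import redis.clients.jedis.Jedis;

public class RedisTtlSketch {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            jedis.set("order_OR-567", "customer_1234");

            // DURATION: SINK_REDIS_TTL_VALUE is a relative lifetime in seconds.
            jedis.expire("order_OR-567", 100000);

            // EXACT_TIME: SINK_REDIS_TTL_VALUE is an absolute UNIX timestamp.
            jedis.expireAt("order_OR-567", 1700000000L);

            // DISABLE: no expiry command is issued; keys live until deleted.
        }
    }
}
```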

### `SINK_REDIS_DEPLOYMENT_TYPE`

The Redis deployment you are using. At present, we support `Standalone` and `Cluster` types.

- Example value: `Standalone`
- Type: `required`
- Default value: `Standalone`
43 changes: 43 additions & 0 deletions src/main/java/io/odpf/firehose/config/RedisSinkConfig.java
@@ -0,0 +1,43 @@
package io.odpf.firehose.config;

import io.odpf.firehose.config.converter.RedisSinkDataTypeConverter;
import io.odpf.firehose.config.converter.RedisSinkTtlTypeConverter;
import io.odpf.firehose.config.converter.RedisSinkDeploymentTypeConverter;
import io.odpf.firehose.config.enums.RedisSinkDataType;
import io.odpf.firehose.config.enums.RedisSinkTtlType;
import io.odpf.firehose.config.enums.RedisSinkDeploymentType;

public interface RedisSinkConfig extends AppConfig {
@Key("SINK_REDIS_URLS")
String getSinkRedisUrls();

@Key("SINK_REDIS_KEY_TEMPLATE")
String getSinkRedisKeyTemplate();

@Key("SINK_REDIS_DATA_TYPE")
@DefaultValue("HASHSET")
@ConverterClass(RedisSinkDataTypeConverter.class)
RedisSinkDataType getSinkRedisDataType();

@Key("SINK_REDIS_LIST_DATA_PROTO_INDEX")
String getSinkRedisListDataProtoIndex();

@Key("SINK_REDIS_KEY_VALUE_DATA_PROTO_INDEX")
String getSinkRedisKeyValuetDataProtoIndex();

@Key("SINK_REDIS_TTL_TYPE")
@DefaultValue("DISABLE")
@ConverterClass(RedisSinkTtlTypeConverter.class)
RedisSinkTtlType getSinkRedisTtlType();

@Key("SINK_REDIS_TTL_VALUE")
@DefaultValue("0")
long getSinkRedisTtlValue();

@Key("SINK_REDIS_DEPLOYMENT_TYPE")
@DefaultValue("Standalone")
@ConverterClass(RedisSinkDeploymentTypeConverter.class)
RedisSinkDeploymentType getSinkRedisDeploymentType();


}
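
Because `RedisSinkConfig` is an owner `Config` interface, the defaults and converters above can be exercised directly; a minimal sketch, assuming only the two required keys are supplied:

```java
import io.odpf.firehose.config.RedisSinkConfig;
import org.aeonbits.owner.ConfigFactory;

import java.util.HashMap;
import java.util.Map;

public class RedisSinkConfigSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("SINK_REDIS_URLS", "localhost:6379");
        props.put("SINK_REDIS_KEY_TEMPLATE", "Service_%s,1");

        RedisSinkConfig config = ConfigFactory.create(RedisSinkConfig.class, props);
        System.out.println(config.getSinkRedisDataType());       // HASHSET (default)
        System.out.println(config.getSinkRedisTtlType());        // DISABLE (default)
        System.out.println(config.getSinkRedisTtlValue());       // 0 (default)
        System.out.println(config.getSinkRedisDeploymentType()); // STANDALONE ("Standalone" uppercased)
    }
}
```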
13 changes: 13 additions & 0 deletions src/main/java/io/odpf/firehose/config/converter/RedisSinkDataTypeConverter.java
@@ -0,0 +1,13 @@
package io.odpf.firehose.config.converter;

import io.odpf.firehose.config.enums.RedisSinkDataType;
import org.aeonbits.owner.Converter;

import java.lang.reflect.Method;

public class RedisSinkDataTypeConverter implements Converter<RedisSinkDataType> {
@Override
public RedisSinkDataType convert(Method method, String input) {
return RedisSinkDataType.valueOf(input.toUpperCase());
}
}
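
All three converters follow this same pattern: a thin wrapper over `Enum.valueOf` with case-insensitive input. A quick sketch (passing `null` for the unused `Method` parameter, which this implementation never reads):

```java
import io.odpf.firehose.config.converter.RedisSinkDataTypeConverter;
import io.odpf.firehose.config.enums.RedisSinkDataType;

public class ConverterSketch {
    public static void main(String[] args) {
        RedisSinkDataTypeConverter converter = new RedisSinkDataTypeConverter();
        RedisSinkDataType type = converter.convert(null, "keyvalue");
        System.out.println(type); // KEYVALUE

        // Anything outside LIST/HASHSET/KEYVALUE fails fast:
        // converter.convert(null, "sortedset") -> IllegalArgumentException
    }
}
```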
13 changes: 13 additions & 0 deletions src/main/java/io/odpf/firehose/config/converter/RedisSinkDeploymentTypeConverter.java
@@ -0,0 +1,13 @@
package io.odpf.firehose.config.converter;

import io.odpf.firehose.config.enums.RedisSinkDeploymentType;
import org.aeonbits.owner.Converter;

import java.lang.reflect.Method;

public class RedisSinkDeploymentTypeConverter implements Converter<RedisSinkDeploymentType> {
@Override
public RedisSinkDeploymentType convert(Method method, String input) {
return RedisSinkDeploymentType.valueOf(input.toUpperCase());
}
}
13 changes: 13 additions & 0 deletions src/main/java/io/odpf/firehose/config/converter/RedisSinkTtlTypeConverter.java
@@ -0,0 +1,13 @@
package io.odpf.firehose.config.converter;

import io.odpf.firehose.config.enums.RedisSinkTtlType;
import org.aeonbits.owner.Converter;

import java.lang.reflect.Method;

public class RedisSinkTtlTypeConverter implements Converter<RedisSinkTtlType> {
@Override
public RedisSinkTtlType convert(Method method, String input) {
return RedisSinkTtlType.valueOf(input.toUpperCase());
}
}
7 changes: 7 additions & 0 deletions src/main/java/io/odpf/firehose/config/enums/RedisSinkDataType.java
@@ -0,0 +1,7 @@
package io.odpf.firehose.config.enums;

public enum RedisSinkDataType {
LIST,
HASHSET,
KEYVALUE,
}
6 changes: 6 additions & 0 deletions src/main/java/io/odpf/firehose/config/enums/RedisSinkDeploymentType.java
@@ -0,0 +1,6 @@
package io.odpf.firehose.config.enums;

public enum RedisSinkDeploymentType {
STANDALONE,
CLUSTER
}
7 changes: 7 additions & 0 deletions src/main/java/io/odpf/firehose/config/enums/RedisSinkTtlType.java
@@ -0,0 +1,7 @@
package io.odpf.firehose.config.enums;

public enum RedisSinkTtlType {
EXACT_TIME,
DURATION,
DISABLE
}
14 changes: 3 additions & 11 deletions src/main/java/io/odpf/firehose/sink/SinkFactory.java
@@ -3,12 +3,9 @@
import io.odpf.depot.bigquery.BigQuerySink;
import io.odpf.depot.bigquery.BigQuerySinkFactory;
import io.odpf.depot.config.BigQuerySinkConfig;
import io.odpf.depot.config.RedisSinkConfig;
import io.odpf.depot.log.LogSink;
import io.odpf.depot.log.LogSinkFactory;
import io.odpf.depot.metrics.StatsDReporter;
import io.odpf.depot.redis.RedisSink;
import io.odpf.depot.redis.RedisSinkFactory;
import io.odpf.firehose.config.KafkaConsumerConfig;
import io.odpf.firehose.config.enums.SinkType;
import io.odpf.firehose.consumer.kafka.OffsetManager;
@@ -23,6 +20,7 @@
import io.odpf.firehose.sink.jdbc.JdbcSinkFactory;
import io.odpf.firehose.sink.mongodb.MongoSinkFactory;
import io.odpf.firehose.sink.prometheus.PromSinkFactory;
import io.odpf.firehose.sink.redis.RedisSinkFactory;
import io.odpf.stencil.client.StencilClient;
import org.aeonbits.owner.ConfigFactory;

@@ -36,7 +34,6 @@ public class SinkFactory {
private final OffsetManager offsetManager;
private BigQuerySinkFactory bigQuerySinkFactory;
private LogSinkFactory logSinkFactory;
private RedisSinkFactory redisSinkFactory;
private final Map<String, String> config;

public SinkFactory(KafkaConsumerConfig kafkaConsumerConfig,
@@ -60,6 +57,7 @@ public void init() {
case HTTP:
case INFLUXDB:
case ELASTICSEARCH:
case REDIS:
case GRPC:
case PROMETHEUS:
case BLOB:
@@ -69,12 +67,6 @@ public void init() {
logSinkFactory = new LogSinkFactory(config, statsDReporter);
logSinkFactory.init();
return;
case REDIS:
redisSinkFactory = new RedisSinkFactory(
ConfigFactory.create(RedisSinkConfig.class, config),
statsDReporter);
redisSinkFactory.init();
return;
case BIGQUERY:
BigquerySinkUtils.addMetadataColumns(config);
bigQuerySinkFactory = new BigQuerySinkFactory(
@@ -103,7 +95,7 @@ public Sink getSink() {
case ELASTICSEARCH:
return EsSinkFactory.create(config, statsDReporter, stencilClient);
case REDIS:
return new GenericOdpfSink(new FirehoseInstrumentation(statsDReporter, RedisSink.class), sinkType.name(), redisSinkFactory.create());
return RedisSinkFactory.create(config, statsDReporter, stencilClient);
case GRPC:
return GrpcSinkFactory.create(config, statsDReporter, stencilClient);
case PROMETHEUS:
57 changes: 57 additions & 0 deletions src/main/java/io/odpf/firehose/sink/redis/RedisSink.java
@@ -0,0 +1,57 @@
package io.odpf.firehose.sink.redis;

import io.odpf.firehose.message.Message;
import io.odpf.firehose.metrics.FirehoseInstrumentation;
import io.odpf.firehose.sink.AbstractSink;
import io.odpf.firehose.sink.redis.client.RedisClient;
import io.odpf.firehose.sink.redis.exception.NoResponseException;

import java.util.List;

/**
* RedisSink allows messages consumed from Kafka to be persisted to Redis.
* The related configurations for RedisSink can be found here: {@link io.odpf.firehose.config.RedisSinkConfig}
*/
public class RedisSink extends AbstractSink {

private RedisClient redisClient;

/**
* Instantiates a new Redis sink.
*
* @param firehoseInstrumentation the instrumentation
* @param sinkType the sink type
* @param redisClient the redis client
*/
public RedisSink(FirehoseInstrumentation firehoseInstrumentation, String sinkType, RedisClient redisClient) {
super(firehoseInstrumentation, sinkType);
this.redisClient = redisClient;
}

/**
* Processes messages before sending them to Redis.
*
* @param messages the messages
*/
@Override
protected void prepare(List<Message> messages) {
redisClient.prepare(messages);
}

/**
* Sends data to Redis.
*
* @return the list of messages that failed to be pushed
* @throws NoResponseException if no response is received from Redis
*/
@Override
protected List<Message> execute() throws NoResponseException {
return redisClient.execute();
}

@Override
public void close() {
getFirehoseInstrumentation().logInfo("Redis connection closing");
redisClient.close();
}
}
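
`AbstractSink` drives the client through a prepare/execute/close sequence. A purely illustrative stub, assuming `RedisClient` exposes exactly the three methods `RedisSink` calls above:

```java
import io.odpf.firehose.message.Message;
import io.odpf.firehose.sink.redis.client.RedisClient;

import java.util.ArrayList;
import java.util.List;

// Illustrative stub: records the batch instead of talking to Redis.
public class RecordingRedisClient implements RedisClient {
    private final List<Message> buffered = new ArrayList<>();

    @Override
    public void prepare(List<Message> messages) {
        // A real client would translate each message into staged Redis commands here.
        buffered.clear();
        buffered.addAll(messages);
    }

    @Override
    public List<Message> execute() {
        // An empty list signals that every message in the batch succeeded.
        return new ArrayList<>();
    }

    @Override
    public void close() {
        buffered.clear();
    }
}
```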
51 changes: 51 additions & 0 deletions src/main/java/io/odpf/firehose/sink/redis/RedisSinkFactory.java
@@ -0,0 +1,51 @@
package io.odpf.firehose.sink.redis;


import io.odpf.depot.metrics.StatsDReporter;
import io.odpf.firehose.config.RedisSinkConfig;
import io.odpf.firehose.metrics.FirehoseInstrumentation;
import io.odpf.firehose.sink.AbstractSink;
import io.odpf.firehose.sink.redis.client.RedisClient;
import io.odpf.firehose.sink.redis.client.RedisClientFactory;
import io.odpf.stencil.client.StencilClient;
import org.aeonbits.owner.ConfigFactory;

import java.util.Map;

/**
* Factory class to create the RedisSink.
* <p>
* Firehose reflectively instantiates this factory
* using the configurations supplied and invokes {@link #create(Map, StatsDReporter, StencilClient)}
* to obtain the RedisSink implementation.
*/
public class RedisSinkFactory {

/**
* Creates Redis sink.
*
* @param configuration the configuration
* @param statsDReporter the stats d reporter
* @param stencilClient the stencil client
* @return the abstract sink
*/
public static AbstractSink create(Map<String, String> configuration, StatsDReporter statsDReporter, StencilClient stencilClient) {
RedisSinkConfig redisSinkConfig = ConfigFactory.create(RedisSinkConfig.class, configuration);
FirehoseInstrumentation firehoseInstrumentation = new FirehoseInstrumentation(statsDReporter, RedisSinkFactory.class);
String redisConfig = String.format("\n\tredis.urls = %s\n\tredis.key.template = %s\n\tredis.sink.type = %s"
+ "\n\tredis.list.data.proto.index = %s\n\tredis.ttl.type = %s\n\tredis.ttl.value = %d",
redisSinkConfig.getSinkRedisUrls(),
redisSinkConfig.getSinkRedisKeyTemplate(),
redisSinkConfig.getSinkRedisDataType().toString(),
redisSinkConfig.getSinkRedisListDataProtoIndex(),
redisSinkConfig.getSinkRedisTtlType().toString(),
redisSinkConfig.getSinkRedisTtlValue());
firehoseInstrumentation.logDebug(redisConfig);
firehoseInstrumentation.logInfo("Redis server type = {}", redisSinkConfig.getSinkRedisDeploymentType());

RedisClientFactory redisClientFactory = new RedisClientFactory(statsDReporter, redisSinkConfig, stencilClient);
RedisClient client = redisClientFactory.getClient();
firehoseInstrumentation.logInfo("Connection to redis established successfully");
return new RedisSink(new FirehoseInstrumentation(statsDReporter, RedisSink.class), "redis", client);
}
}
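
Putting it together, a sketch of how a caller might build and use the sink; `reporter` and `stencil` are assumed to be wired up elsewhere (as `SinkFactory` does in the consumer path), and `pushMessage` returning the failed messages is the assumed `AbstractSink` contract:

```java
import io.odpf.depot.metrics.StatsDReporter;
import io.odpf.firehose.message.Message;
import io.odpf.firehose.sink.AbstractSink;
import io.odpf.firehose.sink.redis.RedisSinkFactory;
import io.odpf.stencil.client.StencilClient;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RedisSinkUsageSketch {
    static List<Message> push(List<Message> batch, StatsDReporter reporter, StencilClient stencil) throws Exception {
        Map<String, String> config = new HashMap<>();
        config.put("SINK_REDIS_URLS", "localhost:6379");
        config.put("SINK_REDIS_KEY_TEMPLATE", "Service_%s,1");
        config.put("SINK_REDIS_DATA_TYPE", "list");
        config.put("SINK_REDIS_LIST_DATA_PROTO_INDEX", "6");

        AbstractSink sink = RedisSinkFactory.create(config, reporter, stencil);
        try {
            return sink.pushMessage(batch); // assumed contract: returns the messages that failed
        } finally {
            sink.close();
        }
    }
}
```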