
Commit 01de086

feat: redis sink using depot (#193)
* feat: redis sink using depot repo
* docs: update documentation for redis sink using depot
* fix: delete redis sink data type, deployment type, ttl type enums and enum converters
* refactor: import RedisSinkConfig from depot
* fix: handle IOException for redis sink in SinkFactory
* chore: depot version upgrade to 0.3.0
* test: fix tests in BlobStorageDlqWriterTest
* checkstyle: fix checkstyle
* fix: runtimeexception in SinkFactory
* fix: remove unused import
* chore: upgrade depot version
* chore: version update

Co-authored-by: lavkesh <[email protected]>
1 parent 3124615 commit 01de086


48 files changed: +35 −2760 lines

build.gradle

Lines changed: 1 addition & 1 deletion
```diff
@@ -101,7 +101,7 @@ dependencies {
     implementation 'com.google.cloud:google-cloud-storage:1.114.0'
     implementation 'com.google.cloud:google-cloud-bigquery:1.115.0'
     implementation 'org.apache.logging.log4j:log4j-core:2.17.1'
-    implementation group: 'io.odpf', name: 'depot', version: '0.2.1'
+    implementation group: 'io.odpf', name: 'depot', version: '0.3.3'
     implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59' exclude group: 'org.slf4j'
 
     testImplementation group: 'junit', name: 'junit', version: '4.11'
```

docs/docs/sinks/redis-sink.md

Lines changed: 13 additions & 72 deletions
```diff
@@ -1,80 +1,21 @@
-# Redis
+# Redis Sink
 
-A Redis sink Firehose \(`SINK_TYPE`=`redis`\) requires the following variables to be set along with Generic ones
+Redis Sink is implemented in Firehose using the Redis sink connector implementation in ODPF Depot. You can check out the ODPF Depot GitHub repository [here](https://github.com/odpf/depot).
 
-### `SINK_REDIS_URLS`
+### Data Types
+Redis sink can be created in 3 different modes based on the value of [`SINK_REDIS_DATA_TYPE`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_data_type): HashSet, KeyValue or List
+- `Hashset`: For each message, an entry of the format `key : field : value` is generated and pushed to Redis. Field and value are generated on the basis of the config [`SINK_REDIS_HASHSET_FIELD_TO_COLUMN_MAPPING`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_hashset_field_to_column_mapping)
+- `List`: For each message, an entry of the format `key : value` is generated and pushed to Redis. Value is fetched from the Proto field name provided in the config [`SINK_REDIS_LIST_DATA_FIELD_NAME`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_list_data_field_name)
+- `KeyValue`: For each message, an entry of the format `key : value` is generated and pushed to Redis. Value is fetched from the Proto field name provided in the config [`SINK_REDIS_KEY_VALUE_DATA_FIELD_NAME`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_key_value_data_field_name)
 
-REDIS instance hostname/IP address followed by its port.
+The `key` is picked up from a field in the message itself.
 
-- Example value: `localhos:6379,localhost:6380`
-- Type: `required`
+Limitation: Depot Redis sink only supports Key-Value, HashSet and List entries as of now.
 
-### `SINK_REDIS_DATA_TYPE`
+### Configuration
 
-To select whether you want to push your data as a HashSet or as a List.
+For the Redis sink in Firehose, first set `SINK_TYPE`=`redis`. Some generic configs, common across different sink types, also need to be set; these are described in [generic.md](../advance/generic.md). Redis-sink-specific configs are documented in the ODPF Depot repository. You can check out the Redis sink configs [here](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md)
 
-- Example value: `Hashset`
-- Type: `required`
-- Default value: `List`
 
-### `SINK_REDIS_KEY_TEMPLATE`
-
-The string that will act as the key for each Redis entry. This key can be configured as per the requirement, a constant or can extract value from each message and use that as the Redis key.
-
-- Example value: `Service\_%%s,1`
-
-This will take the value with index 1 from proto and create the Redis keys as per the template\
-
-- Type: `required`
-
-### `INPUT_SCHEMA_PROTO_TO_COLUMN_MAPPING`
-
-This is the field that decides what all data will be stored in the HashSet for each message.
-
-- Example value: `{"6":"customer_id", "2":"order_num"}`
-- Type: `required (For Hashset)`
-
-### `SINK_REDIS_LIST_DATA_PROTO_INDEX`
-
-This field decides what all data will be stored in the List for each message.
-
-- Example value: `6`
-
-This will get the value of the field with index 6 in your proto and push that to the Redis list with the corresponding keyTemplate\
-
-- Type: `required (For List)`
-
-### `SINK_REDIS_KEY_VALUE_DATA_PROTO_INDEX`
-
-This field decides what data will be stored in the value part of key-value pair
-
-- Example value: `6`
-
-This will get the value of the field with index 6 in your proto and push that to the Redis as value with the corresponding keyTemplate\
-
-- Type: `required (For KeyValue)`
-
-### `SINK_REDIS_TTL_TYPE`
-
-- Example value: `DURATION`
-- Type: `optional`
-- Default value: `DISABLE`
-- Choice of Redis TTL type.It can be:\
-- `DURATION`: After which the Key will be expired and removed from Redis \(UNIT- seconds\)\
-- `EXACT_TIME`: Precise UNIX timestamp after which the Key will be expired
-
-### `SINK_REDIS_TTL_VALUE`
-
-Redis TTL value in Unix Timestamp for `EXACT_TIME` TTL type, In Seconds for `DURATION` TTL type.
-
-- Example value: `100000`
-- Type: `optional`
-- Default value: `0`
-
-### `SINK_REDIS_DEPLOYMENT_TYPE`
-
-The Redis deployment you are using. At present, we support `Standalone` and `Cluster` types.
-
-- Example value: `Standalone`
-- Type: `required`
-- Default value: `Standalone`
+### Deployment Types
+Redis sink, as of now, supports two different deployment types: `Standalone` and `Cluster`. This can be configured in the Depot environment variable `SINK_REDIS_DEPLOYMENT_TYPE`.
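```

To make the new documentation concrete, here is a minimal sketch of the flat config map a Firehose deployment might pass for a HashSet-mode Redis sink. This is illustrative only: `SINK_TYPE`, `SINK_REDIS_DATA_TYPE`, `SINK_REDIS_DEPLOYMENT_TYPE` and `SINK_REDIS_HASHSET_FIELD_TO_COLUMN_MAPPING` appear in this commit, while `SINK_REDIS_URLS` is carried over from the deleted docs, and all values (including the value casing and the field mapping) are assumptions to be checked against the Depot configuration reference linked above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical config for a HashSet-mode Redis sink. The values and the
// mapping below are made up; consult the Depot Redis sink reference for
// the exact accepted values.
public class RedisSinkConfigExample {
    public static Map<String, String> redisHashsetConfig() {
        Map<String, String> config = new HashMap<>();
        config.put("SINK_TYPE", "redis");                        // selects the Redis sink path in SinkFactory
        config.put("SINK_REDIS_URLS", "localhost:6379");         // Redis host:port (key assumed from the old docs)
        config.put("SINK_REDIS_DEPLOYMENT_TYPE", "Standalone");  // or "Cluster"
        config.put("SINK_REDIS_DATA_TYPE", "HASHSET");           // HashSet, KeyValue or List mode (casing assumed)
        // Maps message fields to Redis hash fields; mapping value is illustrative.
        config.put("SINK_REDIS_HASHSET_FIELD_TO_COLUMN_MAPPING",
                "{\"customer_id\":\"customer\"}");
        return config;
    }
}
```

A map like this is what `ConfigFactory.create(RedisSinkConfig.class, config)` binds in the SinkFactory hunk below; after this commit, that binding is all the Redis-specific wiring Firehose still owns.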

src/main/java/io/odpf/firehose/config/RedisSinkConfig.java

Lines changed: 0 additions & 43 deletions
This file was deleted.

src/main/java/io/odpf/firehose/config/converter/RedisSinkDataTypeConverter.java

Lines changed: 0 additions & 13 deletions
This file was deleted.

src/main/java/io/odpf/firehose/config/converter/RedisSinkDeploymentTypeConverter.java

Lines changed: 0 additions & 13 deletions
This file was deleted.

src/main/java/io/odpf/firehose/config/converter/RedisSinkTtlTypeConverter.java

Lines changed: 0 additions & 13 deletions
This file was deleted.

src/main/java/io/odpf/firehose/config/enums/RedisSinkDataType.java

Lines changed: 0 additions & 7 deletions
This file was deleted.

src/main/java/io/odpf/firehose/config/enums/RedisSinkDeploymentType.java

Lines changed: 0 additions & 6 deletions
This file was deleted.

src/main/java/io/odpf/firehose/config/enums/RedisSinkTtlType.java

Lines changed: 0 additions & 7 deletions
This file was deleted.

src/main/java/io/odpf/firehose/sink/SinkFactory.java

Lines changed: 11 additions & 3 deletions
```diff
@@ -3,9 +3,12 @@
 import io.odpf.depot.bigquery.BigQuerySink;
 import io.odpf.depot.bigquery.BigQuerySinkFactory;
 import io.odpf.depot.config.BigQuerySinkConfig;
+import io.odpf.depot.config.RedisSinkConfig;
 import io.odpf.depot.log.LogSink;
 import io.odpf.depot.log.LogSinkFactory;
 import io.odpf.depot.metrics.StatsDReporter;
+import io.odpf.depot.redis.RedisSink;
+import io.odpf.depot.redis.RedisSinkFactory;
 import io.odpf.firehose.config.KafkaConsumerConfig;
 import io.odpf.firehose.config.enums.SinkType;
 import io.odpf.firehose.consumer.kafka.OffsetManager;
@@ -20,7 +23,6 @@
 import io.odpf.firehose.sink.jdbc.JdbcSinkFactory;
 import io.odpf.firehose.sink.mongodb.MongoSinkFactory;
 import io.odpf.firehose.sink.prometheus.PromSinkFactory;
-import io.odpf.firehose.sink.redis.RedisSinkFactory;
 import io.odpf.stencil.client.StencilClient;
 import org.aeonbits.owner.ConfigFactory;
 
@@ -34,6 +36,7 @@ public class SinkFactory {
     private final OffsetManager offsetManager;
     private BigQuerySinkFactory bigQuerySinkFactory;
     private LogSinkFactory logSinkFactory;
+    private RedisSinkFactory redisSinkFactory;
     private final Map<String, String> config;
 
     public SinkFactory(KafkaConsumerConfig kafkaConsumerConfig,
@@ -57,7 +60,6 @@ public void init() {
             case HTTP:
             case INFLUXDB:
             case ELASTICSEARCH:
-            case REDIS:
             case GRPC:
             case PROMETHEUS:
             case BLOB:
@@ -67,6 +69,12 @@ public void init() {
                 logSinkFactory = new LogSinkFactory(config, statsDReporter);
                 logSinkFactory.init();
                 return;
+            case REDIS:
+                redisSinkFactory = new RedisSinkFactory(
+                        ConfigFactory.create(RedisSinkConfig.class, config),
+                        statsDReporter);
+                redisSinkFactory.init();
+                return;
             case BIGQUERY:
                 BigquerySinkUtils.addMetadataColumns(config);
                 bigQuerySinkFactory = new BigQuerySinkFactory(
@@ -95,7 +103,7 @@ public Sink getSink() {
             case ELASTICSEARCH:
                 return EsSinkFactory.create(config, statsDReporter, stencilClient);
             case REDIS:
-                return RedisSinkFactory.create(config, statsDReporter, stencilClient);
+                return new GenericOdpfSink(new FirehoseInstrumentation(statsDReporter, RedisSink.class), sinkType.name(), redisSinkFactory.create());
             case GRPC:
                 return GrpcSinkFactory.create(config, statsDReporter, stencilClient);
             case PROMETHEUS:
```
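
Read end to end, the diff swaps Firehose's in-tree Redis implementation for Depot's factory plus a generic adapter. The sketch below condenses the `init()` and `getSink()` steps for the `REDIS` case into a single method as a reading aid, using only the calls visible in the hunks above. The import paths for `Sink`, `GenericOdpfSink` and `FirehoseInstrumentation` are not shown in this diff and are assumed; this is not a drop-in replacement for `SinkFactory`.

```java
import io.odpf.depot.config.RedisSinkConfig;
import io.odpf.depot.metrics.StatsDReporter;
import io.odpf.depot.redis.RedisSink;
import io.odpf.depot.redis.RedisSinkFactory;
// Assumed import paths: the diff references these Firehose types without
// showing where they live.
import io.odpf.firehose.metrics.FirehoseInstrumentation;
import io.odpf.firehose.sink.GenericOdpfSink;
import io.odpf.firehose.sink.Sink;
import org.aeonbits.owner.ConfigFactory;

import java.util.Map;

public class RedisSinkWiringSketch {
    public static Sink buildRedisSink(Map<String, String> config, StatsDReporter statsDReporter) {
        // init(): bind the flat config map to Depot's RedisSinkConfig and
        // prepare the factory once, before any sink instance is handed out.
        RedisSinkFactory redisSinkFactory = new RedisSinkFactory(
                ConfigFactory.create(RedisSinkConfig.class, config),
                statsDReporter);
        redisSinkFactory.init();

        // getSink(): wrap the Depot-created sink in GenericOdpfSink so it
        // satisfies Firehose's Sink interface, instrumented against RedisSink.
        return new GenericOdpfSink(
                new FirehoseInstrumentation(statsDReporter, RedisSink.class),
                "REDIS", // what sinkType.name() yields in the REDIS case
                redisSinkFactory.create());
    }
}
```

The design point the adapter makes explicit: Firehose no longer knows how Redis entries are built; it only owns the lifecycle (create, init, wrap) and delegates everything Redis-specific to Depot.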
