Skip to content

Commit

Permalink
Merge pull request #3 from LeonardoBonacci/expire
Browse files Browse the repository at this point in the history
Expire
  • Loading branch information
LeonardoBonacci committed Jun 12, 2020
2 parents fd6a831 + 748b869 commit d4fa374
Show file tree
Hide file tree
Showing 16 changed files with 216 additions and 22 deletions.
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ Specified event fields that do not match any topic value field name result in in
- Only value fields are permitted.
- Using anything other than a SET command is not supported at this stage.

### Expire

[Expire](https://tile38.com/commands/expire/) functionality is available. The unit is seconds. How to use it:
```
tile38.topic.foo=SET foo event.id FIELD route event.route POINT event.lat event.lon
tile38.topic.foo.expire=5
```

### Tombstone messages

Expand All @@ -82,8 +89,9 @@ topics | Kafka topics read by the connector | comma-separated string | | high |
flush.timeout.ms | Used for periodic flushing | int | 10000 | low | 1234
behavior.on.error | Error handling behavior | string | FAIL | medium | LOG or FAIL
tile38.password | Tile38's password | string | "" | low | foo123
tile38.topic.foo | Example command for 'foo' topic | string | | low | foo event.id FIELD route event.route POINT event.lat event.lon
tile38.topic.bar | Example command for 'bar' topic | string | | low | anything event.the_key POINT event.latitude event.longitude
tile38.topic.foo | Example command for 'foo' topic | string | | low | SET foo event.id FIELD route event.route POINT event.lat event.lon
tile38.topic.foo.expire | Expire time for 'foo' keyed id's | int | | low | 5
tile38.topic.bar | Example command for 'bar' topic | string | | low | SET anything event.the_key POINT event.latitude event.longitude
***and*** | ***a*** | ***few*** | ***boring*** | ***connection*** | ***settings***
socket.tcp.no.delay.enabled | Use TCP-no-delay | boolean | false | low |
socket.keep.alive.enabled | Enable keepalive | boolean | false | low |
Expand Down
1 change: 1 addition & 0 deletions config/connector-sink.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

"tile38.topic.stations": "SET station event.id POINT event.latitude event.longitude",
"tile38.topic.trains": "SET train event.id FIELD route event.route POINT event.lat event.lon",
"tile38.topic.trains.expire": 10,

"tile38.host": "tile38",
"tile38.port": 9851,
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ services:
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
volumes:
- ./target/kafka-connect-tile38-sink-1.0.0-package/share/java/kafka-connect-tile38-sink:/usr/share/confluent-hub-components/kafka-connect-tile38-sink
- ./target/kafka-connect-tile38-sink-1.0.1-SNAPSHOT-package/share/java/kafka-connect-tile38-sink:/usr/share/confluent-hub-components/kafka-connect-tile38-sink
- ./src/main/resources/datagen:/tmp

links:
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

<groupId>guru.bonacci.tile38.connect</groupId>
<artifactId>kafka-connect-tile38-sink</artifactId>
<version>1.0.0</version>
<version>1.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>kafka-connect-tile38-sink</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ public class Constants {
public static final String SET_TERM = "SET";

public static final String COMMAND_PREFIX = "tile38.topic.";
public static final String EXPIRE_SUFFIX = ".expire";
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ public void put(Collection<SinkRecord> records) {
.build();

final RedisFuture<?>[] futures = writer.write(recordStream);

wait(futures);

log.trace(futures.length + " commands executed");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.kafka.connect.errors.DataException;

import guru.bonacci.kafka.connect.tile38.writer.Tile38Record;
import io.lettuce.core.output.BooleanOutput;
import io.lettuce.core.output.CommandOutput;
import io.lettuce.core.output.IntegerOutput;
import io.lettuce.core.output.StatusOutput;
Expand All @@ -51,13 +52,23 @@ public class CommandGenerator {
private final CommandTemplate cmd;

/**
* Generates a Redis command based on a record's key and/or value.
* Combines a SET or DEL with a possible EXPIRE command
*/
public CommandResult compile(final Tile38Record record) {
return CommandResult.builder()
.setOrDel(setOrDelCmd(record))
.expire(expireCmd(record))
.build();
}

/**
* Generates a Redis command based on a record value.
* Sending a command with Lettuce requires three arguments:
* 1. CommandType: SET or DEL
* 2. CommandOutput: Lettuce forces one to anticipate on the result data type
* 3. CommandArgs: The actual command terms
*/
public Triple<CommandType, CommandOutput<String, String, ?>, CommandArgs<String, String>> compile(final Tile38Record record) {
private Triple<CommandType, CommandOutput<String, String, ?>, CommandArgs<String, String>> setOrDelCmd(final Tile38Record record) {
final Triple<CommandType, CommandOutput<String, String, ?>, CommandArgs<String, String>> generatedCmd;
final CommandArgs<String, String> cmdArgs = new CommandArgs<>(UTF8);

Expand All @@ -79,6 +90,29 @@ public class CommandGenerator {
return generatedCmd;
}

/**
* Generates a Redis command based on a record value.
* Sending a command with Lettuce requires three arguments:
* 1. CommandType: EXPIRE
* 2. CommandOutput: Lettuce forces one to anticipate on the result data type
* 3. CommandArgs: The actual command terms
*/
private Triple<CommandType, CommandOutput<String, String, ?>, CommandArgs<String, String>> expireCmd(final Tile38Record record) {
// no expiration or a tombstone record
if (cmd.getExpirationInSec() == null || record.getValue() == null) {
return null;
}

final Triple<CommandType, CommandOutput<String, String, ?>, CommandArgs<String, String>> generatedCmd;
final CommandArgs<String, String> cmdArgs = new CommandArgs<>(UTF8);
cmdArgs.add(cmd.getKey());
cmdArgs.add(record.getId());
cmdArgs.add(cmd.getExpirationInSec());
generatedCmd = Triple.of(CommandType.EXPIRE, new BooleanOutput<>(UTF8), cmdArgs);
log.trace("Compiled to: {} {}", generatedCmd.getLeft(), cmdArgs.toCommandString());
return generatedCmd;
}

/**
* A command statement is created by
* substituting the command terms with the records actual values.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package guru.bonacci.kafka.connect.tile38.commands;

import java.util.stream.Stream;

import org.apache.commons.lang3.tuple.Triple;

import io.lettuce.core.output.CommandOutput;
import io.lettuce.core.protocol.CommandArgs;
import io.lettuce.core.protocol.CommandType;
import lombok.Builder;
import lombok.Getter;

@Getter // for testing
@Builder
public class CommandResult {

private Triple<CommandType, CommandOutput<String, String, ?>, CommandArgs<String, String>> setOrDel;

private Triple<CommandType, CommandOutput<String, String, ?>, CommandArgs<String, String>> expire;

public Stream<Triple<CommandType, CommandOutput<String, String, ?>, CommandArgs<String, String>>> asStream() {
return Stream.of(setOrDel, expire);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;

/**
Expand All @@ -52,6 +53,11 @@ public class CommandTemplate {
// These terms are substituted in dynamic command generation
// example: {event.id, event.rou, event.lat, event.lon}
private final Set<String> terms;

// Expiration time in seconds.
// example: 5
@Setter private Integer expirationInSec = null;


/**
* Command format:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,24 @@
*/
package guru.bonacci.kafka.connect.tile38.commands;

import static com.google.common.collect.Maps.filterKeys;
import static com.google.common.collect.Maps.immutableEntry;
import static guru.bonacci.kafka.connect.tile38.Constants.EXPIRE_SUFFIX;
import static java.lang.Integer.valueOf;
import static java.lang.String.format;
import static java.util.stream.Collectors.toMap;
import static lombok.AccessLevel.PRIVATE;
import static org.apache.commons.lang3.StringUtils.removeEnd;

import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Stream;

import org.apache.kafka.common.config.ConfigException;

import com.google.common.base.Predicate;
import com.google.common.base.Predicates;

import guru.bonacci.kafka.connect.tile38.config.TopicsConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -45,17 +56,35 @@ public CommandTemplate templateForTopic(String topic) {
}

public static CommandTemplates from(TopicsConfig topics) {
Map<String, String> cmdsByTopic = topics.getCmdsByTopic();
final Map<String, String> cmdsByTopic = topics.getCmdsByTopic();
log.info("Creating command template data structure for {}", cmdsByTopic);

// split the map in commands and expiration configs
final Predicate<String> isExpire = key -> key.endsWith(EXPIRE_SUFFIX);
final Map<String, String> expires = filterKeys(cmdsByTopic, Predicates.and(isExpire));
final Map<String, String> cmds = filterKeys(cmdsByTopic, Predicates.not(isExpire));

// create templates from command strings
// in -> key: topic name - value: command string
Map<String, CommandTemplate> cmdTemplates =
cmdsByTopic.entrySet().stream().map(cmdForTopic -> {
final Map<String, CommandTemplate> cmdTemplates =
cmds.entrySet().stream().map(cmdForTopic -> {
CommandTemplate cmd = CommandTemplate.from(cmdForTopic.getValue());
return immutableEntry(cmdForTopic.getKey(), cmd);
})
.collect(toMap(Map.Entry::getKey, Map.Entry::getValue));

// add the expiration times to the templates
for (Entry<String, String> expire : expires.entrySet()) {
String expireTopicKey = removeEnd(expire.getKey(), EXPIRE_SUFFIX);
try {
cmdTemplates.computeIfAbsent(expireTopicKey, s -> { throw new IllegalArgumentException(); });
cmdTemplates.get(expireTopicKey).setExpirationInSec(valueOf(expire.getValue()));
} catch (IllegalArgumentException e) {
throw new ConfigException(
format("Error expire config for topic '%s'. Invalid value '%s'. Check the docs!", expireTopicKey, expire.getValue()));
}
}

// out -> key: topic name - value: command template
return new CommandTemplates(cmdTemplates);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package guru.bonacci.kafka.connect.tile38.config;

import static com.google.common.collect.Sets.symmetricDifference;
import static guru.bonacci.kafka.connect.tile38.Constants.EXPIRE_SUFFIX;
import static java.lang.String.format;
import static java.util.Arrays.stream;
import static java.util.Collections.emptySet;
Expand Down Expand Up @@ -98,7 +99,8 @@ private void validateConfiguredTopics(Map<String, String> props) {
: emptySet();

Set<String> configuredTopics = this.topicsConfig.configuredTopics();

configuredTopics.removeIf(topicName -> topicName.endsWith(EXPIRE_SUFFIX));

if (!symmetricDifference(topics, configuredTopics).isEmpty()) {
throw new ConfigException(format("There is a mismatch between topics defined into the property 'topics' %s and configured topics %s",
topics, configuredTopics));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;

import org.apache.kafka.connect.errors.ConnectException;

import guru.bonacci.kafka.connect.tile38.commands.CommandGenerators;
import guru.bonacci.kafka.connect.tile38.commands.CommandResult;
import guru.bonacci.kafka.connect.tile38.commands.CommandTemplates;
import guru.bonacci.kafka.connect.tile38.config.Tile38SinkConnectorConfig;
import io.lettuce.core.ClientOptions;
Expand Down Expand Up @@ -93,7 +95,9 @@ public Tile38Writer(Tile38SinkConnectorConfig config) {

public RedisFuture<?>[] write(Stream<Tile38Record> records) {
final RedisFuture<?>[] futures = records
.map(event -> cmds.generatorForTopic(event.getTopic()).compile(event)) // create command
.map(event -> cmds.generatorForTopic(event.getTopic()).compile(event)) // create command(s)
.flatMap(CommandResult::asStream)
.filter(Objects::nonNull) // non-expire commands are null
.map(cmd -> async.dispatch(cmd.getLeft(), cmd.getMiddle(), cmd.getRight())) // execute command
.toArray(RedisFuture[]::new); // collect futures

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.hamcrest.Matchers.not;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
Expand All @@ -34,6 +35,7 @@
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.junit.Assert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -196,6 +198,7 @@ public void validWrites(String route, String lat, String lon) {
assertThat(resp, is(equalTo(route)));
}

@Test
public void nestedWrite() {
final String lat = "4.56", lon = "7.89";
final String topic = "foo";
Expand Down Expand Up @@ -224,6 +227,34 @@ public void nestedWrite() {
assertThat(parser.parse(resp), is(equalTo(parser.parse(String.format(RESULT_STRING, lon, lat)))));
}

@Test
public void expireMany() throws InterruptedException {
final String topic = "foo";
Map<String, String> config = Maps.newHashMap(provideConfig(topic));
config.put("tile38.topic.foo.expire", "2");
this.task.start(config);

final String id = "fooid";
Schema schema = getRouteSchema();

final List<SinkRecord> records = new ArrayList<>();
for (int i=0; i<100; i++) {
Struct value = new Struct(schema).put("id", id+i).put("route", ""+i).put("lat", ""+1.01*i).put("lon", ""+-1.01*i);
records.add(write(topic, Schema.STRING_SCHEMA, id, schema, value));
}
this.task.put(records);

// sleep longer than x seconds
Thread.sleep(5000);

RedisCommands<String, String> sync = this.task.getWriter().getClient().connect().sync();
for (int i=0; i<100; i++) {
CommandArgs<String, String> get = getFooCommand(id);
String resp = sync.dispatch(CommandType.GET, new StatusOutput<>(StringCodec.UTF8), get);
Assert.assertNull(resp);
}
}

private static Stream<Arguments> provideForIgnoredFieldWrite() {
return Stream.of(
Arguments.of("0", "0.1", "1.0"),
Expand Down Expand Up @@ -320,4 +351,4 @@ private String executeWithFieldsCommand(RedisCommands<String, String> sync, Comm
cmd.add("WITHFIELDS");
return sync.dispatch(CommandType.GET, new StatusOutput<>(StringCodec.UTF8), cmd);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ void compileToSET() {
Tile38Record internalRecord = new RecordConverter().convert(rec);

Triple<CommandType, CommandOutput<String, String, ?>, CommandArgs<String, String>> result = CommandGenerator.from(
CommandTemplate.from(cmdString)).compile(internalRecord);
CommandTemplate.from(cmdString)).compile(internalRecord).getSetOrDel();

assertThat(result.getLeft(), is(equalTo(CommandType.SET)));
assertThat(result.getRight().toCommandString(), is(equalTo("bla some id is to be sub nest.event.foo and nest.event.bar more")));
Expand All @@ -123,7 +123,7 @@ void tombstoneToDELETE() {
Tile38Record internalRecord = new RecordConverter().convert(rec);

Triple<CommandType, CommandOutput<String, String, ?>, CommandArgs<String, String>> result = CommandGenerator.from(
CommandTemplate.from(cmdString)).compile(internalRecord);
CommandTemplate.from(cmdString)).compile(internalRecord).getSetOrDel();

assertThat(result.getLeft(), is(equalTo(CommandType.DEL)));
assertThat(result.getRight().toCommandString(), is(equalTo("bla thekey")));
Expand Down Expand Up @@ -178,7 +178,7 @@ void compileWithRepeatingTermNames() {
Tile38Record internalRecord = new RecordConverter().convert(rec);

Triple<CommandType, CommandOutput<String, String, ?>, CommandArgs<String, String>> result = CommandGenerator.from(
CommandTemplate.from(cmdString)).compile(internalRecord);
CommandTemplate.from(cmdString)).compile(internalRecord).getSetOrDel();

assertThat(result.getLeft(), is(equalTo(CommandType.SET)));
assertThat(result.getRight().toCommandString(), is(equalTo("foo one POINT 12.34 56.78")));
Expand Down
Loading

0 comments on commit d4fa374

Please sign in to comment.