diff --git a/README.md b/README.md index 5cbc5d6..d865541 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,13 @@ Specified event fields that do not match any topic value field name result in in - Only value fields are permitted. - Using anything other than a SET command is not supported at this stage. +### Expire + +[Expire](https://tile38.com/commands/expire/) functionality is available per topic. The expiration time is specified in seconds. Example: +``` +tile38.topic.foo=SET foo event.id FIELD route event.route POINT event.lat event.lon +tile38.topic.foo.expire=5 +``` ### Tombstone messages @@ -82,8 +89,9 @@ topics | Kafka topics read by the connector | comma-separated string | | high | flush.timeout.ms | Used for periodic flushing | int | 10000 | low | 1234 behavior.on.error | Error handling behavior | string | FAIL | medium | LOG or FAIL tile38.password | Tile38's password | string | "" | low | foo123 -tile38.topic.foo | Example command for 'foo' topic | string | | low | foo event.id FIELD route event.route POINT event.lat event.lon -tile38.topic.bar | Example command for 'bar' topic | string | | low | anything event.the_key POINT event.latitude event.longitude +tile38.topic.foo | Example command for 'foo' topic | string | | low | SET foo event.id FIELD route event.route POINT event.lat event.lon +tile38.topic.foo.expire | Expire time (in seconds) for 'foo' keyed ids | int | | low | 5 +tile38.topic.bar | Example command for 'bar' topic | string | | low | SET anything event.the_key POINT event.latitude event.longitude ***and*** | ***a*** | ***few*** | ***boring*** | ***connection*** | ***settings*** socket.tcp.no.delay.enabled | Use TCP-no-delay | boolean | false | low | socket.keep.alive.enabled | Enable keepalive | boolean | false | low | diff --git a/config/connector-sink.json b/config/connector-sink.json index ba565ba..e9382cb 100644 --- a/config/connector-sink.json +++ b/config/connector-sink.json @@ -11,6 +11,7 @@ "tile38.topic.stations": "SET station event.id POINT event.latitude 
event.longitude", "tile38.topic.trains": "SET train event.id FIELD route event.route POINT event.lat event.lon", + "tile38.topic.trains.expire": 10, "tile38.host": "tile38", "tile38.port": 9851, diff --git a/docker-compose.yml b/docker-compose.yml index 2eef224..8e72efa 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -89,7 +89,7 @@ services: CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components" CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR volumes: - - ./target/kafka-connect-tile38-sink-1.0.0-package/share/java/kafka-connect-tile38-sink:/usr/share/confluent-hub-components/kafka-connect-tile38-sink + - ./target/kafka-connect-tile38-sink-1.0.1-SNAPSHOT-package/share/java/kafka-connect-tile38-sink:/usr/share/confluent-hub-components/kafka-connect-tile38-sink - ./src/main/resources/datagen:/tmp links: diff --git a/pom.xml b/pom.xml index 22b8ed8..c79e5cc 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,7 @@ guru.bonacci.tile38.connect kafka-connect-tile38-sink - 1.0.0 + 1.0.1-SNAPSHOT jar kafka-connect-tile38-sink diff --git a/src/main/java/guru/bonacci/kafka/connect/tile38/Constants.java b/src/main/java/guru/bonacci/kafka/connect/tile38/Constants.java index 0576852..547ef5f 100644 --- a/src/main/java/guru/bonacci/kafka/connect/tile38/Constants.java +++ b/src/main/java/guru/bonacci/kafka/connect/tile38/Constants.java @@ -24,4 +24,5 @@ public class Constants { public static final String SET_TERM = "SET"; public static final String COMMAND_PREFIX = "tile38.topic."; + public static final String EXPIRE_SUFFIX = ".expire"; } diff --git a/src/main/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTask.java b/src/main/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTask.java index 1e4c951..3040555 100644 --- a/src/main/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTask.java +++ b/src/main/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTask.java @@ -77,8 +77,8 @@ public void put(Collection 
records) { .build(); final RedisFuture[] futures = writer.write(recordStream); - wait(futures); + log.trace(futures.length + " commands executed"); } diff --git a/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandGenerator.java b/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandGenerator.java index 700c032..d03effb 100644 --- a/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandGenerator.java +++ b/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandGenerator.java @@ -33,6 +33,7 @@ import org.apache.kafka.connect.errors.DataException; import guru.bonacci.kafka.connect.tile38.writer.Tile38Record; +import io.lettuce.core.output.BooleanOutput; import io.lettuce.core.output.CommandOutput; import io.lettuce.core.output.IntegerOutput; import io.lettuce.core.output.StatusOutput; @@ -51,13 +52,23 @@ public class CommandGenerator { private final CommandTemplate cmd; /** - * Generates a Redis command based on a record's key and/or value. + * Combines a SET or DEL with a possible EXPIRE command + */ + public CommandResult compile(final Tile38Record record) { + return CommandResult.builder() + .setOrDel(setOrDelCmd(record)) + .expire(expireCmd(record)) + .build(); + } + + /** + * Generates a Redis command based on a record value. * Sending a command with Lettuce requires three arguments: * 1. CommandType: SET or DEL * 2. CommandOutput: Lettuce forces one to anticipate on the result data type * 3. CommandArgs: The actual command terms */ - public Triple, CommandArgs> compile(final Tile38Record record) { + private Triple, CommandArgs> setOrDelCmd(final Tile38Record record) { final Triple, CommandArgs> generatedCmd; final CommandArgs cmdArgs = new CommandArgs<>(UTF8); @@ -79,6 +90,29 @@ public class CommandGenerator { return generatedCmd; } + /** + * Generates a Redis command based on a record value. + * Sending a command with Lettuce requires three arguments: + * 1. CommandType: EXPIRE + * 2. 
CommandOutput: Lettuce forces one to anticipate on the result data type + * 3. CommandArgs: The actual command terms + */ + private Triple, CommandArgs> expireCmd(final Tile38Record record) { + // no expiration or a tombstone record + if (cmd.getExpirationInSec() == null || record.getValue() == null) { + return null; + } + + final Triple, CommandArgs> generatedCmd; + final CommandArgs cmdArgs = new CommandArgs<>(UTF8); + cmdArgs.add(cmd.getKey()); + cmdArgs.add(record.getId()); + cmdArgs.add(cmd.getExpirationInSec()); + generatedCmd = Triple.of(CommandType.EXPIRE, new BooleanOutput<>(UTF8), cmdArgs); + log.trace("Compiled to: {} {}", generatedCmd.getLeft(), cmdArgs.toCommandString()); + return generatedCmd; + } + /** * A command statement is created by * substituting the command terms with the records actual values. diff --git a/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandResult.java b/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandResult.java new file mode 100644 index 0000000..258fcb2 --- /dev/null +++ b/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandResult.java @@ -0,0 +1,24 @@ +package guru.bonacci.kafka.connect.tile38.commands; + +import java.util.stream.Stream; + +import org.apache.commons.lang3.tuple.Triple; + +import io.lettuce.core.output.CommandOutput; +import io.lettuce.core.protocol.CommandArgs; +import io.lettuce.core.protocol.CommandType; +import lombok.Builder; +import lombok.Getter; + +@Getter // for testing +@Builder +public class CommandResult { + + private Triple, CommandArgs> setOrDel; + + private Triple, CommandArgs> expire; + + public Stream, CommandArgs>> asStream() { + return Stream.of(setOrDel, expire); + } +} diff --git a/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplate.java b/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplate.java index c47fe01..5ef1fe2 100644 --- 
a/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplate.java +++ b/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplate.java @@ -29,6 +29,7 @@ import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.RequiredArgsConstructor; +import lombok.Setter; import lombok.ToString; /** @@ -52,6 +53,11 @@ public class CommandTemplate { // These terms are substituted in dynamic command generation // example: {event.id, event.rou, event.lat, event.lon} private final Set terms; + + // Expiration time in seconds. + // example: 5 + @Setter private Integer expirationInSec = null; + /** * Command format: diff --git a/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplates.java b/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplates.java index 83ff1f7..54a9e16 100644 --- a/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplates.java +++ b/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplates.java @@ -15,13 +15,24 @@ */ package guru.bonacci.kafka.connect.tile38.commands; +import static com.google.common.collect.Maps.filterKeys; import static com.google.common.collect.Maps.immutableEntry; +import static guru.bonacci.kafka.connect.tile38.Constants.EXPIRE_SUFFIX; +import static java.lang.Integer.valueOf; +import static java.lang.String.format; import static java.util.stream.Collectors.toMap; import static lombok.AccessLevel.PRIVATE; +import static org.apache.commons.lang3.StringUtils.removeEnd; import java.util.Map; +import java.util.Map.Entry; import java.util.stream.Stream; +import org.apache.kafka.common.config.ConfigException; + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; + import guru.bonacci.kafka.connect.tile38.config.TopicsConfig; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -45,17 +56,35 @@ public CommandTemplate templateForTopic(String topic) { } public static CommandTemplates 
from(TopicsConfig topics) { - Map cmdsByTopic = topics.getCmdsByTopic(); + final Map cmdsByTopic = topics.getCmdsByTopic(); log.info("Creating command template data structure for {}", cmdsByTopic); + // split the map in commands and expiration configs + final Predicate isExpire = key -> key.endsWith(EXPIRE_SUFFIX); + final Map expires = filterKeys(cmdsByTopic, Predicates.and(isExpire)); + final Map cmds = filterKeys(cmdsByTopic, Predicates.not(isExpire)); + + // create templates from command strings // in -> key: topic name - value: command string - Map cmdTemplates = - cmdsByTopic.entrySet().stream().map(cmdForTopic -> { + final Map cmdTemplates = + cmds.entrySet().stream().map(cmdForTopic -> { CommandTemplate cmd = CommandTemplate.from(cmdForTopic.getValue()); return immutableEntry(cmdForTopic.getKey(), cmd); }) .collect(toMap(Map.Entry::getKey, Map.Entry::getValue)); + // add the expiration times to the templates + for (Entry expire : expires.entrySet()) { + String expireTopicKey = removeEnd(expire.getKey(), EXPIRE_SUFFIX); + try { + cmdTemplates.computeIfAbsent(expireTopicKey, s -> { throw new IllegalArgumentException(); }); + cmdTemplates.get(expireTopicKey).setExpirationInSec(valueOf(expire.getValue())); + } catch (IllegalArgumentException e) { + throw new ConfigException( + format("Error expire config for topic '%s'. Invalid value '%s'. 
Check the docs!", expireTopicKey, expire.getValue())); + } + } + // out -> key: topic name - value: command template return new CommandTemplates(cmdTemplates); } diff --git a/src/main/java/guru/bonacci/kafka/connect/tile38/config/Tile38SinkConnectorConfig.java b/src/main/java/guru/bonacci/kafka/connect/tile38/config/Tile38SinkConnectorConfig.java index c3e3e43..3fec2d6 100644 --- a/src/main/java/guru/bonacci/kafka/connect/tile38/config/Tile38SinkConnectorConfig.java +++ b/src/main/java/guru/bonacci/kafka/connect/tile38/config/Tile38SinkConnectorConfig.java @@ -16,6 +16,7 @@ package guru.bonacci.kafka.connect.tile38.config; import static com.google.common.collect.Sets.symmetricDifference; +import static guru.bonacci.kafka.connect.tile38.Constants.EXPIRE_SUFFIX; import static java.lang.String.format; import static java.util.Arrays.stream; import static java.util.Collections.emptySet; @@ -98,7 +99,8 @@ private void validateConfiguredTopics(Map props) { : emptySet(); Set configuredTopics = this.topicsConfig.configuredTopics(); - + configuredTopics.removeIf(topicName -> topicName.endsWith(EXPIRE_SUFFIX)); + if (!symmetricDifference(topics, configuredTopics).isEmpty()) { throw new ConfigException(format("There is a mismatch between topics defined into the property 'topics' %s and configured topics %s", topics, configuredTopics)); diff --git a/src/main/java/guru/bonacci/kafka/connect/tile38/writer/Tile38Writer.java b/src/main/java/guru/bonacci/kafka/connect/tile38/writer/Tile38Writer.java index 2562d39..a4247ab 100644 --- a/src/main/java/guru/bonacci/kafka/connect/tile38/writer/Tile38Writer.java +++ b/src/main/java/guru/bonacci/kafka/connect/tile38/writer/Tile38Writer.java @@ -22,12 +22,14 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import java.time.Duration; +import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.stream.Stream; import org.apache.kafka.connect.errors.ConnectException; import 
guru.bonacci.kafka.connect.tile38.commands.CommandGenerators; +import guru.bonacci.kafka.connect.tile38.commands.CommandResult; import guru.bonacci.kafka.connect.tile38.commands.CommandTemplates; import guru.bonacci.kafka.connect.tile38.config.Tile38SinkConnectorConfig; import io.lettuce.core.ClientOptions; @@ -93,7 +95,9 @@ public Tile38Writer(Tile38SinkConnectorConfig config) { public RedisFuture[] write(Stream records) { final RedisFuture[] futures = records - .map(event -> cmds.generatorForTopic(event.getTopic()).compile(event)) // create command + .map(event -> cmds.generatorForTopic(event.getTopic()).compile(event)) // create command(s) + .flatMap(CommandResult::asStream) + .filter(Objects::nonNull) // non-expire commands are null .map(cmd -> async.dispatch(cmd.getLeft(), cmd.getMiddle(), cmd.getRight())) // execute command .toArray(RedisFuture[]::new); // collect futures diff --git a/src/test/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTaskIT.java b/src/test/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTaskIT.java index b4eb135..3ae7a1e 100644 --- a/src/test/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTaskIT.java +++ b/src/test/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTaskIT.java @@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.not; import java.io.File; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.stream.Stream; @@ -34,6 +35,7 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; +import org.junit.Assert; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -196,6 +198,7 @@ public void validWrites(String route, String lat, String lon) { assertThat(resp, is(equalTo(route))); } + @Test public void nestedWrite() { final String lat = "4.56", lon = "7.89"; final String topic = "foo"; @@ -224,6 +227,34 @@ public void 
nestedWrite() { assertThat(parser.parse(resp), is(equalTo(parser.parse(String.format(RESULT_STRING, lon, lat))))); } + @Test + public void expireMany() throws InterruptedException { + final String topic = "foo"; + Map config = Maps.newHashMap(provideConfig(topic)); + config.put("tile38.topic.foo.expire", "2"); + this.task.start(config); + + final String id = "fooid"; + Schema schema = getRouteSchema(); + + final List records = new ArrayList<>(); + for (int i=0; i<100; i++) { + Struct value = new Struct(schema).put("id", id+i).put("route", ""+i).put("lat", ""+1.01*i).put("lon", ""+-1.01*i); + records.add(write(topic, Schema.STRING_SCHEMA, id, schema, value)); + } + this.task.put(records); + + // sleep longer than x seconds + Thread.sleep(5000); + + RedisCommands sync = this.task.getWriter().getClient().connect().sync(); + for (int i=0; i<100; i++) { + CommandArgs get = getFooCommand(id); + String resp = sync.dispatch(CommandType.GET, new StatusOutput<>(StringCodec.UTF8), get); + Assert.assertNull(resp); + } + } + private static Stream provideForIgnoredFieldWrite() { return Stream.of( Arguments.of("0", "0.1", "1.0"), @@ -320,4 +351,4 @@ private String executeWithFieldsCommand(RedisCommands sync, Comm cmd.add("WITHFIELDS"); return sync.dispatch(CommandType.GET, new StatusOutput<>(StringCodec.UTF8), cmd); } -} +} \ No newline at end of file diff --git a/src/test/java/guru/bonacci/kafka/connect/tile38/commands/CommandGeneratorTests.java b/src/test/java/guru/bonacci/kafka/connect/tile38/commands/CommandGeneratorTests.java index 0956d68..b7fb365 100644 --- a/src/test/java/guru/bonacci/kafka/connect/tile38/commands/CommandGeneratorTests.java +++ b/src/test/java/guru/bonacci/kafka/connect/tile38/commands/CommandGeneratorTests.java @@ -97,7 +97,7 @@ void compileToSET() { Tile38Record internalRecord = new RecordConverter().convert(rec); Triple, CommandArgs> result = CommandGenerator.from( - CommandTemplate.from(cmdString)).compile(internalRecord); + 
CommandTemplate.from(cmdString)).compile(internalRecord).getSetOrDel(); assertThat(result.getLeft(), is(equalTo(CommandType.SET))); assertThat(result.getRight().toCommandString(), is(equalTo("bla some id is to be sub nest.event.foo and nest.event.bar more"))); @@ -123,7 +123,7 @@ void tombstoneToDELETE() { Tile38Record internalRecord = new RecordConverter().convert(rec); Triple, CommandArgs> result = CommandGenerator.from( - CommandTemplate.from(cmdString)).compile(internalRecord); + CommandTemplate.from(cmdString)).compile(internalRecord).getSetOrDel(); assertThat(result.getLeft(), is(equalTo(CommandType.DEL))); assertThat(result.getRight().toCommandString(), is(equalTo("bla thekey"))); @@ -178,7 +178,7 @@ void compileWithRepeatingTermNames() { Tile38Record internalRecord = new RecordConverter().convert(rec); Triple, CommandArgs> result = CommandGenerator.from( - CommandTemplate.from(cmdString)).compile(internalRecord); + CommandTemplate.from(cmdString)).compile(internalRecord).getSetOrDel(); assertThat(result.getLeft(), is(equalTo(CommandType.SET))); assertThat(result.getRight().toCommandString(), is(equalTo("foo one POINT 12.34 56.78"))); diff --git a/src/test/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplatesTests.java b/src/test/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplatesTests.java index 418ccdc..fb83f73 100644 --- a/src/test/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplatesTests.java +++ b/src/test/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplatesTests.java @@ -19,16 +19,24 @@ import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertNull; +import java.util.Map; + +import org.apache.kafka.common.config.ConfigException; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import com.google.common.collect.Maps; + +import 
guru.bonacci.kafka.connect.tile38.config.TopicsConfig; import guru.bonacci.kafka.connect.tile38.config.TopicsConfigTests; public class CommandTemplatesTests { @Test - void commandArgs() { + void onlyCommandArgs() { CommandTemplates cmds = CommandTemplates.from(TopicsConfigTests.provideTopics()); assertThat(cmds.configuredTopics().count(), is(2l)); @@ -45,4 +53,48 @@ void commandArgs() { assertThat(barCmd.getTerms(), hasItem("event.bar")); assertThat(barCmd.getTerms(), hasItem("event.there")); } + + @Test + void commandArgsWithExpire() { + Map config = Maps.newHashMap(TopicsConfigTests.provideConfig()); + config.put("tile38.topic.foo.expire", "5"); + + CommandTemplates cmds = CommandTemplates.from(TopicsConfig.from(config)); + + assertThat(cmds.configuredTopics().count(), is(2l)); + + CommandTemplate fooCmd = cmds.templateForTopic("foo"); + assertThat(fooCmd.getCmdString(), is("foo event.query event.here")); + assertThat(fooCmd.getTerms(), hasSize(2)); + assertThat(fooCmd.getTerms(), hasItem("event.here")); + assertThat(fooCmd.getTerms(), hasItem("event.query")); + assertThat(fooCmd.getExpirationInSec(), is(5)); + + CommandTemplate barCmd = cmds.templateForTopic("bar"); + assertThat(barCmd.getCmdString(), is("bar event.bar query here event.there")); + assertThat(barCmd.getTerms(), hasSize(2)); + assertThat(barCmd.getTerms(), hasItem("event.bar")); + assertThat(barCmd.getTerms(), hasItem("event.there")); + assertNull(barCmd.getExpirationInSec()); + } + + @Test + void commandArgsInvalidExpire() { + Map config = Maps.newHashMap(TopicsConfigTests.provideConfig()); + config.put("tile38.topic.foooooo.expire", "5"); + + Assertions.assertThrows(ConfigException.class, () -> { + CommandTemplates.from(TopicsConfig.from(config)); + }); + } + + @Test + void commandArgsInvalidExpireNumber() { + Map config = Maps.newHashMap(TopicsConfigTests.provideConfig()); + config.put("tile38.topic.foo.expire", "abc"); + + Assertions.assertThrows(ConfigException.class, () -> { + 
CommandTemplates.from(TopicsConfig.from(config)); + }); + } } diff --git a/src/test/java/guru/bonacci/kafka/connect/tile38/config/TopicsConfigTests.java b/src/test/java/guru/bonacci/kafka/connect/tile38/config/TopicsConfigTests.java index 8b640a6..deeefbe 100644 --- a/src/test/java/guru/bonacci/kafka/connect/tile38/config/TopicsConfigTests.java +++ b/src/test/java/guru/bonacci/kafka/connect/tile38/config/TopicsConfigTests.java @@ -27,17 +27,19 @@ import com.google.common.collect.ImmutableMap; -import guru.bonacci.kafka.connect.tile38.config.TopicsConfig; - public class TopicsConfigTests { - public static TopicsConfig provideTopics() { - return TopicsConfig.from(ImmutableMap.of( + public static Map provideConfig() { + return ImmutableMap.of( "key.converter", "org.apache.kafka.connect.storage.StringConverter", "value.converter", "org.apache.kafka.connect.storage.StringConverter", "topics", "foo,bar", "tile38.topic.foo", "SET foo event.query event.here", - "tile38.topic.bar", "set bar event.bar query here event.there")); + "tile38.topic.bar", "set bar event.bar query here event.there"); + } + + public static TopicsConfig provideTopics() { + return TopicsConfig.from(provideConfig()); } @Test