From 85279336e6ffff6c9bf508d7312853c7cd6997e0 Mon Sep 17 00:00:00 2001 From: jeffreyvanhelden Date: Wed, 10 Jun 2020 15:39:28 +1200 Subject: [PATCH 1/8] first hardcoded version --- docker-compose.yml | 4 +-- pom.xml | 2 +- .../kafka/connect/tile38/Tile38SinkTask.java | 5 +++- .../connect/tile38/writer/Tile38Writer.java | 27 ++++++++++++++++--- 4 files changed, 30 insertions(+), 8 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 6f29d88..8e72efa 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -7,7 +7,7 @@ services: - "9851:9851" volumes: # for password config - - ./tile38/data:/data + - ./tile38/data:/data zookeeper: image: confluentinc/cp-zookeeper:5.5.0 @@ -89,7 +89,7 @@ services: CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components" CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR volumes: - - ./target/kafka-connect-tile38-sink-1.0-SNAPSHOT-package/share/java/kafka-connect-tile38-sink:/usr/share/confluent-hub-components/kafka-connect-tile38-sink + - ./target/kafka-connect-tile38-sink-1.0.1-SNAPSHOT-package/share/java/kafka-connect-tile38-sink:/usr/share/confluent-hub-components/kafka-connect-tile38-sink - ./src/main/resources/datagen:/tmp links: diff --git a/pom.xml b/pom.xml index 22b8ed8..c79e5cc 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,7 @@ guru.bonacci.tile38.connect kafka-connect-tile38-sink - 1.0.0 + 1.0.1-SNAPSHOT jar kafka-connect-tile38-sink diff --git a/src/main/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTask.java b/src/main/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTask.java index 1e4c951..a3aa0c6 100644 --- a/src/main/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTask.java +++ b/src/main/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTask.java @@ -20,7 +20,9 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import java.util.Collection; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -76,7 +78,8 @@ public void put(Collection records) { .withSinkRecords(records) .build(); - final RedisFuture[] futures = writer.write(recordStream); + List rec = recordStream.collect(Collectors.toList()); + final RedisFuture[] futures = writer.write(rec); wait(futures); log.trace(futures.length + " commands executed"); diff --git a/src/main/java/guru/bonacci/kafka/connect/tile38/writer/Tile38Writer.java b/src/main/java/guru/bonacci/kafka/connect/tile38/writer/Tile38Writer.java index 2562d39..e7a41fc 100644 --- a/src/main/java/guru/bonacci/kafka/connect/tile38/writer/Tile38Writer.java +++ b/src/main/java/guru/bonacci/kafka/connect/tile38/writer/Tile38Writer.java @@ -22,9 +22,11 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import java.time.Duration; +import java.util.List; import java.util.concurrent.ExecutionException; -import java.util.stream.Stream; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.tuple.Triple; import org.apache.kafka.connect.errors.ConnectException; import guru.bonacci.kafka.connect.tile38.commands.CommandGenerators; @@ -35,6 +37,7 @@ import io.lettuce.core.RedisFuture; import io.lettuce.core.SocketOptions; import io.lettuce.core.api.async.RedisAsyncCommands; +import io.lettuce.core.output.CommandOutput; import io.lettuce.core.output.StatusOutput; import io.lettuce.core.protocol.CommandArgs; import io.lettuce.core.protocol.CommandType; @@ -91,17 +94,33 @@ public Tile38Writer(Tile38SinkConnectorConfig config) { } - public RedisFuture[] write(Stream records) { - final RedisFuture[] futures = records + public RedisFuture[] write(List records) { + final RedisFuture[] futures = records.stream() .map(event -> cmds.generatorForTopic(event.getTopic()).compile(event)) // create command .map(cmd -> async.dispatch(cmd.getLeft(), cmd.getMiddle(), cmd.getRight())) // execute command .toArray(RedisFuture[]::new); // collect futures + final RedisFuture[] moreFutures = records.stream() + .map(event -> expire(event)) + .map(cmd -> async.dispatch(cmd.getLeft(), cmd.getMiddle(), cmd.getRight())) + .toArray(RedisFuture[]::new); + // async batch insert async.flushCommands(); - return futures; + return ArrayUtils.addAll(futures, moreFutures); } + Triple, CommandArgs> expire(final Tile38Record record) { + final Triple, CommandArgs> generatedCmd; + final CommandArgs cmdArgs = new CommandArgs<>(UTF8); + cmdArgs.add("train"); //key + cmdArgs.add(record.getId()); //id + cmdArgs.add(5); //seconds + generatedCmd = Triple.of(CommandType.EXPIRE, new StatusOutput<>(UTF8), cmdArgs); + log.error("Compiled to: {} {}", generatedCmd.getLeft(), cmdArgs.toCommandString()); + return generatedCmd; + } + public void close() { client.shutdown(); } From 8652b75556004be56511e5f510dc60d0c505bc6a Mon Sep 17 00:00:00 2001 From: jeffreyvanhelden Date: Thu, 11 Jun 2020 08:34:38 +1200 Subject: [PATCH 2/8] integration test --- .../connect/tile38/writer/Tile38Writer.java | 7 +- .../connect/tile38/Tile38SinkTaskIT.java | 206 ++---------------- 2 files changed, 21 insertions(+), 192 deletions(-) diff --git a/src/main/java/guru/bonacci/kafka/connect/tile38/writer/Tile38Writer.java b/src/main/java/guru/bonacci/kafka/connect/tile38/writer/Tile38Writer.java index e7a41fc..8997d8b 100644 --- a/src/main/java/guru/bonacci/kafka/connect/tile38/writer/Tile38Writer.java +++ b/src/main/java/guru/bonacci/kafka/connect/tile38/writer/Tile38Writer.java @@ -37,6 +37,7 @@ import io.lettuce.core.RedisFuture; import io.lettuce.core.SocketOptions; import io.lettuce.core.api.async.RedisAsyncCommands; +import io.lettuce.core.output.BooleanOutput; import io.lettuce.core.output.CommandOutput; import io.lettuce.core.output.StatusOutput; import io.lettuce.core.protocol.CommandArgs; @@ -113,10 +114,10 @@ public RedisFuture[] write(List records) { Triple, CommandArgs> expire(final Tile38Record record) { final Triple, CommandArgs> generatedCmd; final CommandArgs cmdArgs = new CommandArgs<>(UTF8); - cmdArgs.add("train"); //key + cmdArgs.add("foo"); //key cmdArgs.add(record.getId()); //id - cmdArgs.add(5); //seconds - generatedCmd = Triple.of(CommandType.EXPIRE, new StatusOutput<>(UTF8), cmdArgs); + cmdArgs.add(3); //seconds + generatedCmd = Triple.of(CommandType.EXPIRE, new BooleanOutput<>(UTF8), cmdArgs); log.error("Compiled to: {} {}", generatedCmd.getLeft(), cmdArgs.toCommandString()); return generatedCmd; } diff --git a/src/test/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTaskIT.java b/src/test/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTaskIT.java index b4eb135..fe0114c 100644 --- a/src/test/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTaskIT.java +++ b/src/test/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTaskIT.java @@ -15,32 +15,29 @@ */ package guru.bonacci.kafka.connect.tile38; -import static com.github.jcustenborder.kafka.connect.utils.SinkRecordHelper.write; import static io.lettuce.core.codec.StringCodec.UTF8; +import static org.junit.Assert.assertNull; +import static com.github.jcustenborder.kafka.connect.utils.SinkRecordHelper.write; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import java.io.File; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.stream.Stream; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; import org.testcontainers.containers.DockerComposeContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -53,6 +50,7 @@ import guru.bonacci.kafka.connect.tile38.config.Tile38SinkConnectorConfig; import io.lettuce.core.api.sync.RedisCommands; import io.lettuce.core.codec.StringCodec; +import io.lettuce.core.output.BooleanOutput; import io.lettuce.core.output.StatusOutput; import io.lettuce.core.protocol.CommandArgs; import io.lettuce.core.protocol.CommandType; @@ -95,203 +93,33 @@ public void after() { } @Test - public void emptyAssignment() { - final String topic = "foo"; - Map config = Maps.newHashMap(provideConfig(topic)); - config.remove("tile38.topic.foo"); - - Assertions.assertThrows(ConfigException.class, () -> { - this.task.start(config); - }); - } - - @Test - public void invalidTopics() { - final String topic = "*#^$(^#$(&(#*$&($&(Q#"; - - Assertions.assertThrows(ConfigException.class, () -> { - this.task.start(provideConfig(topic)); - }); - } - - @Test - public void topicRegex() { - final String topic = "foo"; - Map config = Maps.newHashMap(provideConfig(topic)); - config.remove("topics"); - config.put(SinkTask.TOPICS_REGEX_CONFIG, topic); - - Assertions.assertThrows(ConfigException.class, () -> { - this.task.start(config); - }); - } - - @Test - public void unconfiguredTopic() { - final String topic = "foo,bar"; - - Assertions.assertThrows(ConfigException.class, () -> { - this.task.start(provideConfig(topic)); - }); - } - - @Test - public void emptyPut() { + public void expireMany() throws InterruptedException { final String topic = "foo"; - - this.task.start(provideConfig(topic)); - this.task.put(ImmutableList.of()); - } - - static Stream provideForInvalidCommandTemplateStartup() { - return Stream.of( - Arguments.of("event.one event.two event.three"), - Arguments.of(" set "), - Arguments.of("set nokey") - ); - } - - @ParameterizedTest - @MethodSource("provideForInvalidCommandTemplateStartup") - public void invalidCommandTemplateStartup(String cmdString) { - final String topic = "foo"; - Map config = Maps.newHashMap(provideConfig(topic)); - config.put("tile38.topic.foo", cmdString); - - Assertions.assertThrows(ConfigException.class, () -> { - this.task.start(config); - }); - } - - private static Stream provideForValidWrite() { - return Stream.of( - Arguments.of("12.3", "4.56", "7.89"), - Arguments.of("0.3", "1111.2", "6.9"), - Arguments.of("15", "56", "89"), - Arguments.of("0.3", "0", "0"), - Arguments.of("1", "-0.0001", "-123.89"), - Arguments.of("-12.3", "-4.56", "7.89") - ); - } - - @ParameterizedTest - @MethodSource("provideForValidWrite") - public void validWrites(String route, String lat, String lon) { - final String topic = "foo"; this.task.start(provideConfig(topic)); final String id = "fooid"; Schema schema = getRouteSchema(); - Struct value = new Struct(schema).put("id", id).put("route", route).put("lat", lat).put("lon", lon); - final List records = ImmutableList.of(write(topic, Schema.STRING_SCHEMA, id, schema, value)); + final List records = new ArrayList<>(); + for (int i=0; i<100; i++) { + Struct value = new Struct(schema).put("id", id+i).put("route", ""+i).put("lat", ""+1.01*i).put("lon", ""+-1.01*i); + records.add(write(topic, Schema.STRING_SCHEMA, id, schema, value)); + } this.task.put(records); - RedisCommands sync = this.task.getWriter().getClient().connect().sync(); - CommandArgs get = getFooCommand(id); - String resp = sync.dispatch(CommandType.GET, new StatusOutput<>(StringCodec.UTF8), get); - assertThat(parser.parse(resp), is(equalTo(parser.parse(String.format(RESULT_STRING, lon, lat))))); - - resp = executeWithFieldsCommand(sync, get); - assertThat(resp, is(equalTo(route))); - } - - public void nestedWrite() { - final String lat = "4.56", lon = "7.89"; - final String topic = "foo"; - - Map config = Maps.newHashMap(provideConfig(topic)); - config.put("tile38.topic.foo", "SET foo event.id POINT event.nested.lat event.nested.lon"); - this.task.start(config); - - final String id = "fooid"; - - Schema nestedSchema = SchemaBuilder.struct() - .field("lat", Schema.STRING_SCHEMA) - .field("lon", Schema.STRING_SCHEMA).build(); - Schema schema = SchemaBuilder.struct().field("id", Schema.STRING_SCHEMA) - .field("nested", nestedSchema); - - Struct nestedValue = new Struct(nestedSchema).put("lat", lat).put("lon", lon); - Struct value = new Struct(schema).put("id", id).put("nested", nestedValue); - - final List records = ImmutableList.of(write(topic, Schema.STRING_SCHEMA, id, schema, value)); - this.task.put(records); + // sleep longer than x seconds + Thread.sleep(5000); RedisCommands sync = this.task.getWriter().getClient().connect().sync(); - CommandArgs get = getFooCommand(id); - String resp = sync.dispatch(CommandType.GET, new StatusOutput<>(StringCodec.UTF8), get); - assertThat(parser.parse(resp), is(equalTo(parser.parse(String.format(RESULT_STRING, lon, lat))))); + for (int i=0; i<100; i++) { + CommandArgs get = getFooCommand(id); + String resp = sync.dispatch(CommandType.GET, new StatusOutput<>(StringCodec.UTF8), get); + assertNull(resp); + } } - private static Stream provideForIgnoredFieldWrite() { - return Stream.of( - Arguments.of("0", "0.1", "1.0"), - Arguments.of("0.0", "0.1", "1.0") - ); - } - - @ParameterizedTest - @MethodSource("provideForIgnoredFieldWrite") - public void IgnoredFieldWrite(String route, String lat, String lon) { - final String topic = "foo"; - this.task.start(provideConfig(topic)); - - final String id = "fooid"; - Schema schema = getRouteSchema(); - Struct value = new Struct(schema).put("id", id).put("route", route).put("lat", lat).put("lon", lon); - - final List records = ImmutableList.of(write(topic, Schema.STRING_SCHEMA, id, schema, value)); - this.task.put(records); - - RedisCommands sync = this.task.getWriter().getClient().connect().sync(); - CommandArgs getCmd = getFooCommand(id); - String resp = executeWithFieldsCommand(sync, getCmd); - - assertThat(resp, is(not(equalTo(route)))); - } - private static Stream provideForInvalidWriteCommands() { - return Stream.of( - Arguments.of("SET foo event.one FIELD route event.two POINT event.three event.four", "fooid", "12.3", "string", "no float"), - Arguments.of("SET foo event.one FIELD route event.two POINT event.three event.four", "fooid", "null", "0.1", "1.0"), - Arguments.of("SET foo event.one FIELD route event.two POINT event.three event.four", "fooid", "not a float", "3.1", "4.1"), - Arguments.of("SET foo event.one FIELD route event.two POINT event.three event.four", "fooid", "nothing", "0.1", "1.0"), - Arguments.of("SET foo event.one FIELD route event.two POINT event.three event.four", "fooid", "1f", "3.1", "4.1"), - Arguments.of("SET foo event.one FIELD route event.two POINT event.three event.four", "fooid", "1", "3.1f", "4.1"), - Arguments.of("SET foo event.one FIELD route event.two POINT event.three event.four", "fooid", "1F", "3.1", "4.1"), - Arguments.of("SET foo event.one FIELD route event.two POINT event.three event.four", "fooid", "1", "3.1", "4.1F"), - Arguments.of("SET foo event.one FIELD route event.two POINT event.three event.four", "fooid", "event.route", "event.lat", "event.lon"), - Arguments.of("SET bar event.one FIELD POINT event.two event.three", "123", "1.2", "2.3", null), - Arguments.of("SET bar event.one FIELD POINT event.two event.three", "null", "%%", "@@", null), - Arguments.of("SET bar event.one FIELD POINT event.two event.three", "$$", "1.2", "2.3", null), - Arguments.of("SET bar event.one FIELD event.two event.three", "100", "1.2", "2.3", null) - ); - } - @ParameterizedTest - @MethodSource("provideForInvalidWriteCommands") - public void invalidWriteCommands(String cmdString, String one, String two, String three, String four) { - final String topic = "foo"; - Map config = Maps.newHashMap(provideConfig(topic)); - config.put("tile38.topic.foo", cmdString); - this.task.start(config); - - SchemaBuilder schemaBuilder = SchemaBuilder.struct().field("one", Schema.STRING_SCHEMA).field("two", Schema.STRING_SCHEMA) - .field("three", Schema.STRING_SCHEMA); - if (four != null) - schemaBuilder.field("four", Schema.STRING_SCHEMA); - Schema schema = schemaBuilder.build(); - - Struct valueBuilder = new Struct(schema).put("one", one).put("two", two).put("three", three); - Struct value = four != null ? valueBuilder.put("four", four) : valueBuilder; - - final List records = ImmutableList.of(write(topic, Schema.STRING_SCHEMA, one, schema, value)); - Assertions.assertThrows(ConnectException.class, () -> { - this.task.put(records); - }); - } - // below some helper methods private Map provideConfig(String topic) { return ImmutableMap.of(SinkTask.TOPICS_CONFIG, topic, From 59d4301c7ca2cc7b0f504220b35e0b802dbf7e86 Mon Sep 17 00:00:00 2001 From: jeffreyvanhelden Date: Thu, 11 Jun 2020 08:40:07 +1200 Subject: [PATCH 3/8] IT test fixed --- .../connect/tile38/Tile38SinkTaskIT.java | 211 +++++++++++++++++- 1 file changed, 206 insertions(+), 5 deletions(-) diff --git a/src/test/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTaskIT.java b/src/test/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTaskIT.java index fe0114c..8dc1b53 100644 --- a/src/test/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTaskIT.java +++ b/src/test/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTaskIT.java @@ -15,9 +15,8 @@ */ package guru.bonacci.kafka.connect.tile38; -import static io.lettuce.core.codec.StringCodec.UTF8; -import static org.junit.Assert.assertNull; import static com.github.jcustenborder.kafka.connect.utils.SinkRecordHelper.write; +import static io.lettuce.core.codec.StringCodec.UTF8; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -29,15 +28,21 @@ import java.util.Map; import java.util.stream.Stream; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; +import org.junit.Assert; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.testcontainers.containers.DockerComposeContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -50,7 +55,6 @@ import guru.bonacci.kafka.connect.tile38.config.Tile38SinkConnectorConfig; import io.lettuce.core.api.sync.RedisCommands; import io.lettuce.core.codec.StringCodec; -import io.lettuce.core.output.BooleanOutput; import io.lettuce.core.output.StatusOutput; import io.lettuce.core.protocol.CommandArgs; import io.lettuce.core.protocol.CommandType; @@ -92,6 +96,137 @@ public void after() { this.task.stop(); } + @Test + public void emptyAssignment() { + final String topic = "foo"; + Map config = Maps.newHashMap(provideConfig(topic)); + config.remove("tile38.topic.foo"); + + Assertions.assertThrows(ConfigException.class, () -> { + this.task.start(config); + }); + } + + @Test + public void invalidTopics() { + final String topic = "*#^$(^#$(&(#*$&($&(Q#"; + + Assertions.assertThrows(ConfigException.class, () -> { + this.task.start(provideConfig(topic)); + }); + } + + @Test + public void topicRegex() { + final String topic = "foo"; + Map config = Maps.newHashMap(provideConfig(topic)); + config.remove("topics"); + config.put(SinkTask.TOPICS_REGEX_CONFIG, topic); + + Assertions.assertThrows(ConfigException.class, () -> { + this.task.start(config); + }); + } + + @Test + public void unconfiguredTopic() { + final String topic = "foo,bar"; + + Assertions.assertThrows(ConfigException.class, () -> { + this.task.start(provideConfig(topic)); + }); + } + + @Test + public void emptyPut() { + final String topic = "foo"; + + this.task.start(provideConfig(topic)); + this.task.put(ImmutableList.of()); + } + + static Stream provideForInvalidCommandTemplateStartup() { + return Stream.of( + Arguments.of("event.one event.two event.three"), + Arguments.of(" set "), + Arguments.of("set nokey") + ); + } + + @ParameterizedTest + @MethodSource("provideForInvalidCommandTemplateStartup") + public void invalidCommandTemplateStartup(String cmdString) { + final String topic = "foo"; + Map config = Maps.newHashMap(provideConfig(topic)); + config.put("tile38.topic.foo", cmdString); + + Assertions.assertThrows(ConfigException.class, () -> { + this.task.start(config); + }); + } + + private static Stream provideForValidWrite() { + return Stream.of( + Arguments.of("12.3", "4.56", "7.89"), + Arguments.of("0.3", "1111.2", "6.9"), + Arguments.of("15", "56", "89"), + Arguments.of("0.3", "0", "0"), + Arguments.of("1", "-0.0001", "-123.89"), + Arguments.of("-12.3", "-4.56", "7.89") + ); + } + + @ParameterizedTest + @MethodSource("provideForValidWrite") + public void validWrites(String route, String lat, String lon) { + final String topic = "foo"; + this.task.start(provideConfig(topic)); + + final String id = "fooid"; + Schema schema = getRouteSchema(); + Struct value = new Struct(schema).put("id", id).put("route", route).put("lat", lat).put("lon", lon); + + final List records = ImmutableList.of(write(topic, Schema.STRING_SCHEMA, id, schema, value)); + this.task.put(records); + + RedisCommands sync = this.task.getWriter().getClient().connect().sync(); + CommandArgs get = getFooCommand(id); + String resp = sync.dispatch(CommandType.GET, new StatusOutput<>(StringCodec.UTF8), get); + assertThat(parser.parse(resp), is(equalTo(parser.parse(String.format(RESULT_STRING, lon, lat))))); + + resp = executeWithFieldsCommand(sync, get); + assertThat(resp, is(equalTo(route))); + } + + @Test + public void nestedWrite() { + final String lat = "4.56", lon = "7.89"; + final String topic = "foo"; + + Map config = Maps.newHashMap(provideConfig(topic)); + config.put("tile38.topic.foo", "SET foo event.id POINT event.nested.lat event.nested.lon"); + this.task.start(config); + + final String id = "fooid"; + + Schema nestedSchema = SchemaBuilder.struct() + .field("lat", Schema.STRING_SCHEMA) + .field("lon", Schema.STRING_SCHEMA).build(); + Schema schema = SchemaBuilder.struct().field("id", Schema.STRING_SCHEMA) + .field("nested", nestedSchema); + + Struct nestedValue = new Struct(nestedSchema).put("lat", lat).put("lon", lon); + Struct value = new Struct(schema).put("id", id).put("nested", nestedValue); + + final List records = ImmutableList.of(write(topic, Schema.STRING_SCHEMA, id, schema, value)); + this.task.put(records); + + RedisCommands sync = this.task.getWriter().getClient().connect().sync(); + CommandArgs get = getFooCommand(id); + String resp = sync.dispatch(CommandType.GET, new StatusOutput<>(StringCodec.UTF8), get); + assertThat(parser.parse(resp), is(equalTo(parser.parse(String.format(RESULT_STRING, lon, lat))))); + } + @Test public void expireMany() throws InterruptedException { final String topic = "foo"; @@ -114,12 +249,78 @@ public void expireMany() throws InterruptedException { for (int i=0; i<100; i++) { CommandArgs get = getFooCommand(id); String resp = sync.dispatch(CommandType.GET, new StatusOutput<>(StringCodec.UTF8), get); - assertNull(resp); + Assert.assertNull(resp); } } + + private static Stream provideForIgnoredFieldWrite() { + return Stream.of( + Arguments.of("0", "0.1", "1.0"), + Arguments.of("0.0", "0.1", "1.0") + ); + } + + @ParameterizedTest + @MethodSource("provideForIgnoredFieldWrite") + public void IgnoredFieldWrite(String route, String lat, String lon) { + final String topic = "foo"; + this.task.start(provideConfig(topic)); + final String id = "fooid"; + Schema schema = getRouteSchema(); + Struct value = new Struct(schema).put("id", id).put("route", route).put("lat", lat).put("lon", lon); + + final List records = ImmutableList.of(write(topic, Schema.STRING_SCHEMA, id, schema, value)); + this.task.put(records); + + RedisCommands sync = this.task.getWriter().getClient().connect().sync(); + CommandArgs getCmd = getFooCommand(id); + String resp = executeWithFieldsCommand(sync, getCmd); + assertThat(resp, is(not(equalTo(route)))); + } + + private static Stream provideForInvalidWriteCommands() { + return Stream.of( + Arguments.of("SET foo event.one FIELD route event.two POINT event.three event.four", "fooid", "12.3", "string", "no float"), + Arguments.of("SET foo event.one FIELD route event.two POINT event.three event.four", "fooid", "null", "0.1", "1.0"), + Arguments.of("SET foo event.one FIELD route event.two POINT event.three event.four", "fooid", "not a float", "3.1", "4.1"), + Arguments.of("SET foo event.one FIELD route event.two POINT event.three event.four", "fooid", "nothing", "0.1", "1.0"), + Arguments.of("SET foo event.one FIELD route event.two POINT event.three event.four", "fooid", "1f", "3.1", "4.1"), + Arguments.of("SET foo event.one FIELD route event.two POINT event.three event.four", "fooid", "1", "3.1f", "4.1"), + Arguments.of("SET foo event.one FIELD route event.two POINT event.three event.four", "fooid", "1F", "3.1", "4.1"), + Arguments.of("SET foo event.one FIELD route event.two POINT event.three event.four", "fooid", "1", "3.1", "4.1F"), + Arguments.of("SET foo event.one FIELD route event.two POINT event.three event.four", "fooid", "event.route", "event.lat", "event.lon"), + Arguments.of("SET bar event.one FIELD POINT event.two event.three", "123", "1.2", "2.3", null), + Arguments.of("SET bar event.one FIELD POINT event.two event.three", "null", "%%", "@@", null), + Arguments.of("SET bar event.one FIELD POINT event.two event.three", "$$", "1.2", "2.3", null), + Arguments.of("SET bar event.one FIELD event.two event.three", "100", "1.2", "2.3", null) + ); + } + @ParameterizedTest + @MethodSource("provideForInvalidWriteCommands") + public void invalidWriteCommands(String cmdString, String one, String two, String three, String four) { + final String topic = "foo"; + Map config = Maps.newHashMap(provideConfig(topic)); + config.put("tile38.topic.foo", cmdString); + this.task.start(config); + + SchemaBuilder schemaBuilder = SchemaBuilder.struct().field("one", Schema.STRING_SCHEMA).field("two", Schema.STRING_SCHEMA) + .field("three", Schema.STRING_SCHEMA); + if (four != null) + schemaBuilder.field("four", Schema.STRING_SCHEMA); + Schema schema = schemaBuilder.build(); + + Struct valueBuilder = new Struct(schema).put("one", one).put("two", two).put("three", three); + Struct value = four != null ? valueBuilder.put("four", four) : valueBuilder; + + final List records = ImmutableList.of(write(topic, Schema.STRING_SCHEMA, one, schema, value)); + Assertions.assertThrows(ConnectException.class, () -> { + this.task.put(records); + }); + } + // below some helper methods private Map provideConfig(String topic) { return ImmutableMap.of(SinkTask.TOPICS_CONFIG, topic, @@ -148,4 +349,4 @@ private String executeWithFieldsCommand(RedisCommands sync, Comm cmd.add("WITHFIELDS"); return sync.dispatch(CommandType.GET, new StatusOutput<>(StringCodec.UTF8), cmd); } -} +} \ No newline at end of file From 93564096d4887f0b5deb1bf37defc5385f65eb74 Mon Sep 17 00:00:00 2001 From: jeffreyvanhelden Date: Thu, 11 Jun 2020 11:29:02 +1200 Subject: [PATCH 4/8] command generation complete --- .../kafka/connect/tile38/Tile38SinkTask.java | 7 +--- .../tile38/commands/CommandGenerator.java | 38 ++++++++++++++++++- .../tile38/commands/CommandResult.java | 24 ++++++++++++ .../tile38/commands/CommandTemplate.java | 5 ++- .../connect/tile38/writer/Tile38Writer.java | 34 +++++------------ .../commands/CommandGeneratorTests.java | 6 +-- 6 files changed, 78 insertions(+), 36 deletions(-) create mode 100644 src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandResult.java diff --git a/src/main/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTask.java b/src/main/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTask.java index a3aa0c6..3040555 100644 --- a/src/main/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTask.java +++ b/src/main/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTask.java @@ -20,9 +20,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import java.util.Collection; -import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -78,10 +76,9 @@ public void put(Collection records) { .withSinkRecords(records) .build(); - List rec = recordStream.collect(Collectors.toList()); - final RedisFuture[] futures = writer.write(rec); - + final RedisFuture[] futures = writer.write(recordStream); wait(futures); + log.trace(futures.length + " commands executed"); } diff --git a/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandGenerator.java b/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandGenerator.java index 700c032..d03effb 100644 --- a/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandGenerator.java +++ b/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandGenerator.java @@ -33,6 +33,7 @@ import org.apache.kafka.connect.errors.DataException; import guru.bonacci.kafka.connect.tile38.writer.Tile38Record; +import io.lettuce.core.output.BooleanOutput; import io.lettuce.core.output.CommandOutput; import io.lettuce.core.output.IntegerOutput; import io.lettuce.core.output.StatusOutput; @@ -51,13 +52,23 @@ public class CommandGenerator { private final CommandTemplate cmd; /** - * Generates a Redis command based on a record's key and/or value. + * Combines a SET or DEL with a possible EXPIRE command + */ + public CommandResult compile(final Tile38Record record) { + return CommandResult.builder() + .setOrDel(setOrDelCmd(record)) + .expire(expireCmd(record)) + .build(); + } + + /** + * Generates a Redis command based on a record value. * Sending a command with Lettuce requires three arguments: * 1. CommandType: SET or DEL * 2. CommandOutput: Lettuce forces one to anticipate on the result data type * 3. CommandArgs: The actual command terms */ - public Triple, CommandArgs> compile(final Tile38Record record) { + private Triple, CommandArgs> setOrDelCmd(final Tile38Record record) { final Triple, CommandArgs> generatedCmd; final CommandArgs cmdArgs = new CommandArgs<>(UTF8); @@ -79,6 +90,29 @@ public class CommandGenerator { return generatedCmd; } + /** + * Generates a Redis command based on a record value. + * Sending a command with Lettuce requires three arguments: + * 1. CommandType: EXPIRE + * 2. CommandOutput: Lettuce forces one to anticipate on the result data type + * 3. CommandArgs: The actual command terms + */ + private Triple, CommandArgs> expireCmd(final Tile38Record record) { + // no expiration or a tombstone record + if (cmd.getExpirationInSec() == null || record.getValue() == null) { + return null; + } + + final Triple, CommandArgs> generatedCmd; + final CommandArgs cmdArgs = new CommandArgs<>(UTF8); + cmdArgs.add(cmd.getKey()); + cmdArgs.add(record.getId()); + cmdArgs.add(cmd.getExpirationInSec()); + generatedCmd = Triple.of(CommandType.EXPIRE, new BooleanOutput<>(UTF8), cmdArgs); + log.trace("Compiled to: {} {}", generatedCmd.getLeft(), cmdArgs.toCommandString()); + return generatedCmd; + } + /** * A command statement is created by * substituting the command terms with the records actual values. diff --git a/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandResult.java b/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandResult.java new file mode 100644 index 0000000..258fcb2 --- /dev/null +++ b/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandResult.java @@ -0,0 +1,24 @@ +package guru.bonacci.kafka.connect.tile38.commands; + +import java.util.stream.Stream; + +import org.apache.commons.lang3.tuple.Triple; + +import io.lettuce.core.output.CommandOutput; +import io.lettuce.core.protocol.CommandArgs; +import io.lettuce.core.protocol.CommandType; +import lombok.Builder; +import lombok.Getter; + +@Getter // for testing +@Builder +public class CommandResult { + + private Triple, CommandArgs> setOrDel; + + private Triple, CommandArgs> expire; + + public Stream, CommandArgs>> asStream() { + return Stream.of(setOrDel, expire); + } +} diff --git a/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplate.java b/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplate.java index c47fe01..100fe2b 100644 --- a/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplate.java +++ b/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplate.java @@ -52,6 +52,9 @@ public class CommandTemplate { // These terms are substituted in dynamic command generation // example: {event.id, event.rou, event.lat, event.lon} private final Set terms; + + // TODO comment + private final Integer expirationInSec; /** * Command format: @@ -84,6 +87,6 @@ public static CommandTemplate from(String cmdString) { // remove all command terms that do not start with 'event.' terms.removeIf(s -> !s.startsWith(TOKERATOR)); - return new CommandTemplate(cmdStringWithoutSet, keyAndCmdString[0], terms); + return new CommandTemplate(cmdStringWithoutSet, keyAndCmdString[0], terms, 3); } } diff --git a/src/main/java/guru/bonacci/kafka/connect/tile38/writer/Tile38Writer.java b/src/main/java/guru/bonacci/kafka/connect/tile38/writer/Tile38Writer.java index 8997d8b..a4247ab 100644 --- a/src/main/java/guru/bonacci/kafka/connect/tile38/writer/Tile38Writer.java +++ b/src/main/java/guru/bonacci/kafka/connect/tile38/writer/Tile38Writer.java @@ -22,14 +22,14 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import java.time.Duration; -import java.util.List; +import java.util.Objects; import java.util.concurrent.ExecutionException; +import java.util.stream.Stream; -import org.apache.commons.lang3.ArrayUtils; -import org.apache.commons.lang3.tuple.Triple; import org.apache.kafka.connect.errors.ConnectException; import guru.bonacci.kafka.connect.tile38.commands.CommandGenerators; +import guru.bonacci.kafka.connect.tile38.commands.CommandResult; import guru.bonacci.kafka.connect.tile38.commands.CommandTemplates; import guru.bonacci.kafka.connect.tile38.config.Tile38SinkConnectorConfig; import io.lettuce.core.ClientOptions; @@ -37,8 +37,6 @@ import io.lettuce.core.RedisFuture; import io.lettuce.core.SocketOptions; import io.lettuce.core.api.async.RedisAsyncCommands; -import io.lettuce.core.output.BooleanOutput; -import io.lettuce.core.output.CommandOutput; import io.lettuce.core.output.StatusOutput; import io.lettuce.core.protocol.CommandArgs; import io.lettuce.core.protocol.CommandType; @@ -95,33 +93,19 @@ public Tile38Writer(Tile38SinkConnectorConfig config) { } - public RedisFuture[] write(List records) { - final RedisFuture[] futures = records.stream() - .map(event -> cmds.generatorForTopic(event.getTopic()).compile(event)) // create command + public RedisFuture[] write(Stream records) { + final RedisFuture[] futures = records + .map(event -> cmds.generatorForTopic(event.getTopic()).compile(event)) // create command(s) + .flatMap(CommandResult::asStream) + .filter(Objects::nonNull) // non-expire commands are null .map(cmd -> async.dispatch(cmd.getLeft(), cmd.getMiddle(), cmd.getRight())) // execute command .toArray(RedisFuture[]::new); // collect futures - final RedisFuture[] moreFutures = records.stream() - .map(event -> expire(event)) - .map(cmd -> async.dispatch(cmd.getLeft(), cmd.getMiddle(), cmd.getRight())) - .toArray(RedisFuture[]::new); - // async batch insert async.flushCommands(); - return ArrayUtils.addAll(futures, moreFutures); + return futures; } - Triple, CommandArgs> expire(final Tile38Record record) { - final Triple, CommandArgs> generatedCmd; - final CommandArgs cmdArgs = new CommandArgs<>(UTF8); - cmdArgs.add("foo"); //key - cmdArgs.add(record.getId()); //id - cmdArgs.add(3); //seconds - generatedCmd = Triple.of(CommandType.EXPIRE, new BooleanOutput<>(UTF8), cmdArgs); - log.error("Compiled to: {} {}", generatedCmd.getLeft(), cmdArgs.toCommandString()); - return generatedCmd; - } - public void close() { client.shutdown(); } diff --git a/src/test/java/guru/bonacci/kafka/connect/tile38/commands/CommandGeneratorTests.java b/src/test/java/guru/bonacci/kafka/connect/tile38/commands/CommandGeneratorTests.java index 0956d68..b7fb365 100644 --- a/src/test/java/guru/bonacci/kafka/connect/tile38/commands/CommandGeneratorTests.java +++ b/src/test/java/guru/bonacci/kafka/connect/tile38/commands/CommandGeneratorTests.java @@ -97,7 +97,7 @@ void compileToSET() { Tile38Record internalRecord = new RecordConverter().convert(rec); Triple, CommandArgs> result = CommandGenerator.from( - CommandTemplate.from(cmdString)).compile(internalRecord); + CommandTemplate.from(cmdString)).compile(internalRecord).getSetOrDel(); assertThat(result.getLeft(), is(equalTo(CommandType.SET))); assertThat(result.getRight().toCommandString(), is(equalTo("bla some id is to be sub nest.event.foo and nest.event.bar more"))); @@ -123,7 +123,7 @@ void tombstoneToDELETE() { Tile38Record internalRecord = new RecordConverter().convert(rec); Triple, CommandArgs> result = CommandGenerator.from( - CommandTemplate.from(cmdString)).compile(internalRecord); + CommandTemplate.from(cmdString)).compile(internalRecord).getSetOrDel(); assertThat(result.getLeft(), is(equalTo(CommandType.DEL))); assertThat(result.getRight().toCommandString(), is(equalTo("bla thekey"))); @@ -178,7 +178,7 @@ void compileWithRepeatingTermNames() { Tile38Record internalRecord = new RecordConverter().convert(rec); Triple, CommandArgs> result = CommandGenerator.from( - CommandTemplate.from(cmdString)).compile(internalRecord); + CommandTemplate.from(cmdString)).compile(internalRecord).getSetOrDel(); assertThat(result.getLeft(), is(equalTo(CommandType.SET))); assertThat(result.getRight().toCommandString(), is(equalTo("foo one POINT 12.34 56.78"))); From 794fc329bcbca700e7624b8402f12ce3e61922e3 Mon Sep 17 00:00:00 2001 From: jeffreyvanhelden Date: Fri, 12 Jun 2020 09:11:50 +1200 Subject: [PATCH 5/8] command template creation for expire --- .../kafka/connect/tile38/Constants.java | 1 + .../tile38/commands/CommandTemplate.java | 9 ++-- .../tile38/commands/CommandTemplates.java | 35 ++++++++++-- .../commands/CommandTemplatesTests.java | 54 ++++++++++++++++++- .../tile38/config/TopicsConfigTests.java | 12 +++-- 5 files changed, 99 insertions(+), 12 deletions(-) diff --git a/src/main/java/guru/bonacci/kafka/connect/tile38/Constants.java b/src/main/java/guru/bonacci/kafka/connect/tile38/Constants.java index 0576852..547ef5f 100644 --- a/src/main/java/guru/bonacci/kafka/connect/tile38/Constants.java +++ b/src/main/java/guru/bonacci/kafka/connect/tile38/Constants.java @@ -24,4 +24,5 @@ public class Constants { public static final String SET_TERM = "SET"; public static final String COMMAND_PREFIX = "tile38.topic."; + public static final String EXPIRE_SUFFIX = ".expire"; } diff --git a/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplate.java b/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplate.java index 100fe2b..5ef1fe2 100644 --- a/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplate.java +++ b/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplate.java @@ -29,6 +29,7 @@ import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.RequiredArgsConstructor; +import lombok.Setter; import lombok.ToString; /** @@ -53,8 +54,10 @@ public class CommandTemplate { // example: {event.id, event.rou, event.lat, event.lon} private final Set terms; - // TODO comment - private final Integer expirationInSec; + // Expiration time in seconds. + // example: 5 + @Setter private Integer expirationInSec = null; + /** * Command format: @@ -87,6 +90,6 @@ public static CommandTemplate from(String cmdString) { // remove all command terms that do not start with 'event.' terms.removeIf(s -> !s.startsWith(TOKERATOR)); - return new CommandTemplate(cmdStringWithoutSet, keyAndCmdString[0], terms, 3); + return new CommandTemplate(cmdStringWithoutSet, keyAndCmdString[0], terms); } } diff --git a/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplates.java b/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplates.java index 83ff1f7..54a9e16 100644 --- a/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplates.java +++ b/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplates.java @@ -15,13 +15,24 @@ */ package guru.bonacci.kafka.connect.tile38.commands; +import static com.google.common.collect.Maps.filterKeys; import static com.google.common.collect.Maps.immutableEntry; +import static guru.bonacci.kafka.connect.tile38.Constants.EXPIRE_SUFFIX; +import static java.lang.Integer.valueOf; +import static java.lang.String.format; import static java.util.stream.Collectors.toMap; import static lombok.AccessLevel.PRIVATE; +import static org.apache.commons.lang3.StringUtils.removeEnd; import java.util.Map; +import java.util.Map.Entry; import java.util.stream.Stream; +import org.apache.kafka.common.config.ConfigException; + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; + import guru.bonacci.kafka.connect.tile38.config.TopicsConfig; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -45,17 +56,35 @@ public CommandTemplate templateForTopic(String topic) { } public static CommandTemplates from(TopicsConfig topics) { - Map cmdsByTopic = topics.getCmdsByTopic(); + final Map cmdsByTopic = topics.getCmdsByTopic(); log.info("Creating command template data structure for {}", cmdsByTopic); + // split the map in commands and expiration configs + final Predicate isExpire = key -> key.endsWith(EXPIRE_SUFFIX); + final Map expires = filterKeys(cmdsByTopic, Predicates.and(isExpire)); + final Map cmds = filterKeys(cmdsByTopic, Predicates.not(isExpire)); + + // create templates from command strings // in -> key: topic name - value: command string - Map cmdTemplates = - cmdsByTopic.entrySet().stream().map(cmdForTopic -> { + final Map cmdTemplates = + cmds.entrySet().stream().map(cmdForTopic -> { CommandTemplate cmd = CommandTemplate.from(cmdForTopic.getValue()); return immutableEntry(cmdForTopic.getKey(), cmd); }) .collect(toMap(Map.Entry::getKey, Map.Entry::getValue)); + // add the expiration times to the templates + for (Entry expire : expires.entrySet()) { + String expireTopicKey = removeEnd(expire.getKey(), EXPIRE_SUFFIX); + try { + cmdTemplates.computeIfAbsent(expireTopicKey, s -> { throw new IllegalArgumentException(); }); + cmdTemplates.get(expireTopicKey).setExpirationInSec(valueOf(expire.getValue())); + } catch (IllegalArgumentException e) { + throw new ConfigException( + format("Error expire config for topic '%s'. Invalid value '%s'. Check the docs!", expireTopicKey, expire.getValue())); + } + } + // out -> key: topic name - value: command template return new CommandTemplates(cmdTemplates); } diff --git a/src/test/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplatesTests.java b/src/test/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplatesTests.java index 418ccdc..fb83f73 100644 --- a/src/test/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplatesTests.java +++ b/src/test/java/guru/bonacci/kafka/connect/tile38/commands/CommandTemplatesTests.java @@ -19,16 +19,24 @@ import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertNull; +import java.util.Map; + +import org.apache.kafka.common.config.ConfigException; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import com.google.common.collect.Maps; + +import guru.bonacci.kafka.connect.tile38.config.TopicsConfig; import guru.bonacci.kafka.connect.tile38.config.TopicsConfigTests; public class CommandTemplatesTests { @Test - void commandArgs() { + void onlyCommandArgs() { CommandTemplates cmds = CommandTemplates.from(TopicsConfigTests.provideTopics()); assertThat(cmds.configuredTopics().count(), is(2l)); @@ -45,4 +53,48 @@ void commandArgs() { assertThat(barCmd.getTerms(), hasItem("event.bar")); assertThat(barCmd.getTerms(), hasItem("event.there")); } + + @Test + void commandArgsWithExpire() { + Map config = Maps.newHashMap(TopicsConfigTests.provideConfig()); + config.put("tile38.topic.foo.expire", "5"); + + CommandTemplates cmds = CommandTemplates.from(TopicsConfig.from(config)); + + assertThat(cmds.configuredTopics().count(), is(2l)); + + CommandTemplate fooCmd = cmds.templateForTopic("foo"); + assertThat(fooCmd.getCmdString(), is("foo event.query event.here")); + assertThat(fooCmd.getTerms(), hasSize(2)); + assertThat(fooCmd.getTerms(), hasItem("event.here")); + assertThat(fooCmd.getTerms(), hasItem("event.query")); + assertThat(fooCmd.getExpirationInSec(), is(5)); + + CommandTemplate barCmd = cmds.templateForTopic("bar"); + assertThat(barCmd.getCmdString(), is("bar event.bar query here event.there")); + assertThat(barCmd.getTerms(), hasSize(2)); + assertThat(barCmd.getTerms(), hasItem("event.bar")); + assertThat(barCmd.getTerms(), hasItem("event.there")); + assertNull(barCmd.getExpirationInSec()); + } + + @Test + void commandArgsInvalidExpire() { + Map config = Maps.newHashMap(TopicsConfigTests.provideConfig()); + config.put("tile38.topic.foooooo.expire", "5"); + + Assertions.assertThrows(ConfigException.class, () -> { + CommandTemplates.from(TopicsConfig.from(config)); + }); + } + + @Test + void commandArgsInvalidExpireNumber() { + Map config = Maps.newHashMap(TopicsConfigTests.provideConfig()); + config.put("tile38.topic.foo.expire", "abc"); + + Assertions.assertThrows(ConfigException.class, () -> { + CommandTemplates.from(TopicsConfig.from(config)); + }); + } } diff --git a/src/test/java/guru/bonacci/kafka/connect/tile38/config/TopicsConfigTests.java b/src/test/java/guru/bonacci/kafka/connect/tile38/config/TopicsConfigTests.java index 8b640a6..deeefbe 100644 --- a/src/test/java/guru/bonacci/kafka/connect/tile38/config/TopicsConfigTests.java +++ b/src/test/java/guru/bonacci/kafka/connect/tile38/config/TopicsConfigTests.java @@ -27,17 +27,19 @@ import com.google.common.collect.ImmutableMap; -import guru.bonacci.kafka.connect.tile38.config.TopicsConfig; - public class TopicsConfigTests { - public static TopicsConfig provideTopics() { - return TopicsConfig.from(ImmutableMap.of( + public static Map provideConfig() { + return ImmutableMap.of( "key.converter", "org.apache.kafka.connect.storage.StringConverter", "value.converter", "org.apache.kafka.connect.storage.StringConverter", "topics", "foo,bar", "tile38.topic.foo", "SET foo event.query event.here", - "tile38.topic.bar", "set bar event.bar query here event.there")); + "tile38.topic.bar", "set bar event.bar query here event.there"); + } + + public static TopicsConfig provideTopics() { + return TopicsConfig.from(provideConfig()); } @Test From 96a82ca836ef0f1aed38f2cf7d203855e40f3981 Mon Sep 17 00:00:00 2001 From: jeffreyvanhelden Date: Fri, 12 Jun 2020 11:13:37 +1200 Subject: [PATCH 6/8] expire functionality working --- config/connector-sink.json | 1 + .../connect/tile38/config/Tile38SinkConnectorConfig.java | 4 +++- .../guru/bonacci/kafka/connect/tile38/Tile38SinkTaskIT.java | 4 +++- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/config/connector-sink.json b/config/connector-sink.json index ba565ba..e9382cb 100644 --- a/config/connector-sink.json +++ b/config/connector-sink.json @@ -11,6 +11,7 @@ "tile38.topic.stations": "SET station event.id POINT event.latitude event.longitude", "tile38.topic.trains": "SET train event.id FIELD route event.route POINT event.lat event.lon", + "tile38.topic.trains.expire": 10, "tile38.host": "tile38", "tile38.port": 9851, diff --git a/src/main/java/guru/bonacci/kafka/connect/tile38/config/Tile38SinkConnectorConfig.java b/src/main/java/guru/bonacci/kafka/connect/tile38/config/Tile38SinkConnectorConfig.java index c3e3e43..3fec2d6 100644 --- a/src/main/java/guru/bonacci/kafka/connect/tile38/config/Tile38SinkConnectorConfig.java +++ b/src/main/java/guru/bonacci/kafka/connect/tile38/config/Tile38SinkConnectorConfig.java @@ -16,6 +16,7 @@ package guru.bonacci.kafka.connect.tile38.config; import static com.google.common.collect.Sets.symmetricDifference; +import static guru.bonacci.kafka.connect.tile38.Constants.EXPIRE_SUFFIX; import static java.lang.String.format; import static java.util.Arrays.stream; import static java.util.Collections.emptySet; @@ -98,7 +99,8 @@ private void validateConfiguredTopics(Map props) { : emptySet(); Set configuredTopics = this.topicsConfig.configuredTopics(); - + configuredTopics.removeIf(topicName -> topicName.endsWith(EXPIRE_SUFFIX)); + if (!symmetricDifference(topics, configuredTopics).isEmpty()) { throw new ConfigException(format("There is a mismatch between topics defined into the property 'topics' %s and configured topics %s", topics, configuredTopics)); diff --git a/src/test/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTaskIT.java b/src/test/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTaskIT.java index 8dc1b53..3ae7a1e 100644 --- a/src/test/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTaskIT.java +++ b/src/test/java/guru/bonacci/kafka/connect/tile38/Tile38SinkTaskIT.java @@ -230,7 +230,9 @@ public void nestedWrite() { @Test public void expireMany() throws InterruptedException { final String topic = "foo"; - this.task.start(provideConfig(topic)); + Map config = Maps.newHashMap(provideConfig(topic)); + config.put("tile38.topic.foo.expire", "2"); + this.task.start(config); final String id = "fooid"; Schema schema = getRouteSchema(); From ae474cfd2c2bd08139834fe18ad6181e0eb183c7 Mon Sep 17 00:00:00 2001 From: LeonardoBonacci Date: Fri, 12 Jun 2020 12:04:57 +1200 Subject: [PATCH 7/8] Update README.md --- README.md | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 5cbc5d6..2761082 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,14 @@ Specified event fields that do not match any topic value field name result in in - Only value fields are permitted. - Using anything other than a SET command is not supported at this stage. +### Expire + +[Expire](https://tile38.com/commands/expire/) functionality is available. The unit is seconds. How to use it: +``` +tile38.topic.foo=SET foo event.id FIELD route event.route POINT event.lat event.lon +tile38.topic.foo.expire=5 +``` +**Warning! This functionality only works when the Kafka topic name is equal to the Tile38 key.** ### Tombstone messages @@ -82,8 +90,9 @@ topics | Kafka topics read by the connector | comma-separated string | | high | flush.timeout.ms | Used for periodic flushing | int | 10000 | low | 1234 behavior.on.error | Error handling behavior | string | FAIL | medium | LOG or FAIL tile38.password | Tile38's password | string | "" | low | foo123 -tile38.topic.foo | Example command for 'foo' topic | string | | low | foo event.id FIELD route event.route POINT event.lat event.lon -tile38.topic.bar | Example command for 'bar' topic | string | | low | anything event.the_key POINT event.latitude event.longitude +tile38.topic.foo | Example command for 'foo' topic | string | | low | SET foo event.id FIELD route event.route POINT event.lat event.lon +tile38.topic.foo.expire | Expire time for 'foo' keyed id's | int | | low | 5 +tile38.topic.bar | Example command for 'bar' topic | string | | low | SET anything event.the_key POINT event.latitude event.longitude ***and*** | ***a*** | ***few*** | ***boring*** | ***connection*** | ***settings*** socket.tcp.no.delay.enabled | Use TCP-no-delay | boolean | false | low | socket.keep.alive.enabled | Enable keepalive | boolean | false | low | From eb246a990d3961b9601d4aa1a136464b20ff32c6 Mon Sep 17 00:00:00 2001 From: LeonardoBonacci Date: Sat, 13 Jun 2020 11:01:31 +1200 Subject: [PATCH 8/8] Update README.md --- README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/README.md b/README.md index 2761082..d865541 100644 --- a/README.md +++ b/README.md @@ -63,7 +63,6 @@ Specified event fields that do not match any topic value field name result in in tile38.topic.foo=SET foo event.id FIELD route event.route POINT event.lat event.lon tile38.topic.foo.expire=5 ``` -**Warning! This functionality only works when the Kafka topic name is equal to the Tile38 key.** ### Tombstone messages