diff --git a/debezium-server-redis/pom.xml b/debezium-server-redis/pom.xml
index fee95c6a..73b86b02 100644
--- a/debezium-server-redis/pom.xml
+++ b/debezium-server-redis/pom.xml
@@ -10,9 +10,9 @@
debezium-server-redis
Debezium Server Redis Sink Adapter
jar
-
+
- java,-Djavax.net.ssl.keyStore=/etc/certificates/client-keystore.p12,-Djavax.net.ssl.trustStore=/etc/certificates/client-truststore.p12,-Djavax.net.ssl.keyStorePassword=secret,-Djavax.net.ssl.trustStorePassword=secret,-jar,quarkus-run.jar
+ -Djavax.net.ssl.keyStore=/ssl/client-keystore.p12,-Djavax.net.ssl.trustStore=/ssl/client-truststore.p12,-Djavax.net.ssl.keyStorePassword=secret,-Djavax.net.ssl.trustStorePassword=secret
diff --git a/debezium-server-redis/src/main/jib/ssl/README b/debezium-server-redis/src/main/jib/ssl/README
new file mode 100644
index 00000000..d0d4d968
--- /dev/null
+++ b/debezium-server-redis/src/main/jib/ssl/README
@@ -0,0 +1,3 @@
+keystore and truststore copied from resources.
+TODO reconsider if there is more elegant solution that copying truststore here
+
diff --git a/debezium-server-redis/src/main/jib/ssl/client-keystore.p12 b/debezium-server-redis/src/main/jib/ssl/client-keystore.p12
new file mode 100644
index 00000000..97377267
Binary files /dev/null and b/debezium-server-redis/src/main/jib/ssl/client-keystore.p12 differ
diff --git a/debezium-server-redis/src/main/jib/ssl/client-truststore.p12 b/debezium-server-redis/src/main/jib/ssl/client-truststore.p12
new file mode 100644
index 00000000..36ee4827
Binary files /dev/null and b/debezium-server-redis/src/main/jib/ssl/client-truststore.p12 differ
diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersSslStreamIT.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersSslStreamIT.java
index 675180b9..ee1f0052 100644
--- a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersSslStreamIT.java
+++ b/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersSslStreamIT.java
@@ -35,8 +35,13 @@ public TestContainersSslStreamIT() {
"--tls-key-file /etc/certificates/redis.key " +
"--tls-ca-cert-file /etc/certificates/ca.crt")
.withClasspathResourceMapping("ssl", "/etc/certificates", BindMode.READ_ONLY);
- server
- .withClasspathResourceMapping("ssl", "/etc/certificates", BindMode.READ_WRITE);
+ // server
+ // TODO why is withCommand not working
+ // .withCommand("-Djavax.net.ssl.keyStore=/ssl/client-keystore.p12",
+ // "-Djavax.net.ssl.trustStore=/ssl/client-truststore.p12",
+ // "-Djavax.net.ssl.keyStorePassword=secret",
+ // "-Djavax.net.ssl.trustStorePassword=secret")
+ // .withClasspathResourceMapping("ssl", "/etc/certificates", BindMode.READ_WRITE);
}
@Test
diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersPrototypeIT.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersStreamIT.java
similarity index 60%
rename from debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersPrototypeIT.java
rename to debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersStreamIT.java
index 6a7c472f..ed5e549c 100644
--- a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersPrototypeIT.java
+++ b/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersStreamIT.java
@@ -13,6 +13,8 @@
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
+import java.util.List;
+import java.util.Map;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
@@ -21,8 +23,11 @@
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.doc.FixFor;
-public class TestContainersPrototypeIT extends TestContainersRedisTestBase {
- private static final Logger LOGGER = LoggerFactory.getLogger(TestContainersPrototypeIT.class);
+import redis.clients.jedis.StreamEntryID;
+import redis.clients.jedis.resps.StreamEntry;
+
+public class TestContainersStreamIT extends TestContainersRedisTestBase {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TestContainersStreamIT.class);
@Test
public void shouldStreamChanges() throws InterruptedException, IOException {
@@ -54,37 +59,52 @@ public void shouldFailWithIncorrectRedisAddress() {
@Test
@FixFor("DBZ-4510")
- public void testRedisConnectionRetry() throws Exception {
+ public void shouldRetryAfterRedisCrash() throws Exception {
final int SOCKET_TIMEOUT = 4000;
server.setEnv(new DebeziumServerConfigBuilder().withBaseConfig(redis, postgres)
.withValue("debezium.sink.redis.socket.timeout.ms", String.valueOf(SOCKET_TIMEOUT))
.build());
server.start();
- final int MESSAGE_COUNT = 5;
- final String STREAM_NAME = "testc.inventory.redis_test";
+ final int MESSAGE_COUNT = 4;
+ final String STREAM_NAME = "testc.inventory.customers";
+ awaitStreamLength(jedis, STREAM_NAME, MESSAGE_COUNT);
redis.pause();
- final PostgresConnection connection = getPostgresConnection(postgres);
- LOGGER.info("Creating new redis_test table and inserting 5 records to it");
- connection.execute(
- "CREATE TABLE inventory.redis_test (id INT PRIMARY KEY)",
- "INSERT INTO inventory.redis_test VALUES (1)",
- "INSERT INTO inventory.redis_test VALUES (2)",
- "INSERT INTO inventory.redis_test VALUES (3)",
- "INSERT INTO inventory.redis_test VALUES (4)",
- "INSERT INTO inventory.redis_test VALUES (5)");
- connection.close();
+ insertCustomerToPostgres(postgres, "Sergei", "Savage", "sesa@email.com");
LOGGER.info("Sleeping for " + SOCKET_TIMEOUT / 2 + " milis to simulate no connection errors");
Thread.sleep(SOCKET_TIMEOUT / 2);
redis.unpause();
+ awaitStreamLength(jedis, STREAM_NAME, MESSAGE_COUNT + 1);
+ }
+
+ @Test
+ public void shouldTimeoutAfterRedisCrash() throws Exception {
+ final int SOCKET_TIMEOUT = 2000;
+ server.setEnv(new DebeziumServerConfigBuilder().withBaseConfig(redis, postgres)
+ .withValue("debezium.sink.redis.socket.timeout.ms", String.valueOf(SOCKET_TIMEOUT))
+ .build());
+ server.start();
+
+ final int MESSAGE_COUNT = 4;
+ final String STREAM_NAME = "testc.inventory.customers";
awaitStreamLength(jedis, STREAM_NAME, MESSAGE_COUNT);
+
+ redis.pause();
+ insertCustomerToPostgres(postgres, "Sergei", "Savage", "sesa@email.com");
+
+ LOGGER.info("Sleeping for " + SOCKET_TIMEOUT / 2 + " milis to simulate no connection errors");
+ Thread.sleep(SOCKET_TIMEOUT / 2);
+ assertThat(server.isRunning()).isTrue();
+
+ waitForContainerLog(server, "Read timed out", 2);
+ waitForContainerStop(server);
}
@Test
@FixFor("DBZ-4510")
- public void testRedisOOMRetry() throws Exception {
+ public void shouldRetryAfterRedisOOM() throws Exception {
server.setEnv(new DebeziumServerConfigBuilder().withBaseConfig(redis, postgres).build());
server.start();
@@ -110,4 +130,25 @@ public void testRedisOOMRetry() throws Exception {
jedis.configSet("maxmemory", "30M");
awaitStreamLength(jedis, STREAM_NAME, TOTAL_RECORDS);
}
+
+ @Test
+ public void shouldStreamExtendedMessageFormat() {
+ server.setEnv(new DebeziumServerConfigBuilder().withBaseConfig(redis, postgres)
+ .withValue("debezium.sink.redis.message.format", "extended")
+ .build());
+ server.start();
+ final int MESSAGE_COUNT = 4;
+ final String STREAM_NAME = "testc.inventory.customers";
+
+ awaitStreamLength(jedis, STREAM_NAME, MESSAGE_COUNT);
+
+ final List entries = jedis.xrange(STREAM_NAME, null, (StreamEntryID) null);
+ for (StreamEntry entry : entries) {
+ Map map = entry.getFields();
+ // TODO verify, that there should really be 2 fields
+ // assertEquals(3, map.size(), "Expected map of size 3");
+ assertThat(map.get("key")).startsWith("{\"schema\":");
+ assertThat(map.get("value")).startsWith("{\"schema\":");
+ }
+ }
}
diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestUtils.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestUtils.java
index 483af63b..a4ff8e28 100644
--- a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestUtils.java
+++ b/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestUtils.java
@@ -25,8 +25,12 @@
public class TestUtils {
public static void waitForContainerLog(GenericContainer> container, String expectedLog) {
+ waitForContainerLog(container, expectedLog, TestConfigSource.waitForSeconds());
+ }
+
+ public static void waitForContainerLog(GenericContainer> container, String expectedLog, int seconds) {
await()
- .atMost(20, TimeUnit.SECONDS)
+ .atMost(seconds, TimeUnit.SECONDS)
.until(() -> container.getLogs(OutputFrame.OutputType.STDOUT).contains(expectedLog));
}
@@ -73,14 +77,14 @@ public static PostgresConnection getPostgresConnection(DebeziumTestContainerWrap
public static void awaitStreamLengthGte(Jedis jedis, String streamName, int expectedLength) {
await()
- .atMost(TestConfigSource.waitForSeconds(), TimeUnit.SECONDS)
+ .atMost(10, TimeUnit.SECONDS)
.until(() -> jedis.xlen(streamName) >= expectedLength);
}
public static void awaitStreamLength(Jedis jedis, String streamName, int expectedLength) {
await()
- .atMost(TestConfigSource.waitForSeconds(), TimeUnit.SECONDS)
+ .atMost(10, TimeUnit.SECONDS)
.until(() -> jedis.xlen(streamName) == expectedLength);
}