Skip to content

Commit

Permalink
Added more test cases, refactor, replaced postgres with mysql
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinMedek committed Aug 15, 2023
1 parent 15faee4 commit e454acb
Show file tree
Hide file tree
Showing 11 changed files with 262 additions and 206 deletions.
4 changes: 2 additions & 2 deletions debezium-server-redis/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
<artifactId>debezium-server-redis</artifactId>
<name>Debezium Server Redis Sink Adapter</name>
<packaging>jar</packaging>
<!-- TODO is writing custom dockerfile better? -->

<properties>
<quarkus.jib.jvm-additional-arguments>-Djavax.net.ssl.keyStore=/ssl/client-keystore.p12,-Djavax.net.ssl.trustStore=/ssl/client-truststore.p12,-Djavax.net.ssl.keyStorePassword=secret,-Djavax.net.ssl.trustStorePassword=secret</quarkus.jib.jvm-additional-arguments>
<quarkus.jib.jvm-entrypoint>java</quarkus.jib.jvm-entrypoint>
</properties>

<dependencies>
Expand Down
3 changes: 0 additions & 3 deletions debezium-server-redis/src/main/jib/ssl/README

This file was deleted.

Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
*/
package io.debezium.server.redis.wip;

import static io.debezium.server.redis.wip.TestConstants.POSTGRES_DATABASE;
import static io.debezium.server.redis.wip.TestConstants.POSTGRES_PASSWORD;
import static io.debezium.server.redis.wip.TestConstants.POSTGRES_PORT;
import static io.debezium.server.redis.wip.TestConstants.POSTGRES_USER;
import static io.debezium.server.redis.wip.TestConstants.MYSQL_DATABASE;
import static io.debezium.server.redis.wip.TestConstants.MYSQL_PASSWORD;
import static io.debezium.server.redis.wip.TestConstants.MYSQL_PORT;
import static io.debezium.server.redis.wip.TestConstants.MYSQL_USER;
import static io.debezium.server.redis.wip.TestConstants.REDIS_PORT;

import java.util.HashMap;
Expand All @@ -35,25 +35,27 @@ public List<String> build() {
public Map<String, String> baseRedisConfig(DebeziumTestContainerWrapper redis) {
return Map.of(
"debezium.sink.type", "redis",
"debezium.sink.redis.address", redis.getContainerIp() + ":" + REDIS_PORT);
"debezium.sink.redis.address", redis.getContainerAddress() + ":" + REDIS_PORT);
}

public Map<String, String> basePostgresConfig(DebeziumTestContainerWrapper postgres) {
return Map.of("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector",
public Map<String, String> baseMySqlConfig(DebeziumTestContainerWrapper mysql) {
Map<String, String> result = new HashMap<>(Map.of("debezium.source.connector.class", "io.debezium.connector.mysql.MySqlConnector",
"debezium.source.offset.flush.interval.ms", "0",
"debezium.source.topic.prefix", "testc",
"debezium.source.schema.include.list", "inventory",
"debezium.source.database.hostname", String.valueOf(postgres.getContainerIp()),
"debezium.source.database.port", String.valueOf(POSTGRES_PORT),
"debezium.source.database.user", POSTGRES_USER,
"debezium.source.database.password", POSTGRES_PASSWORD,
"debezium.source.database.dbname", POSTGRES_DATABASE,
"debezium.source.offset.storage.file.filename", "offset.dat");
"debezium.source.database.dbname", MYSQL_DATABASE,
"debezium.source.database.hostname", String.valueOf(mysql.getContainerAddress()),
"debezium.source.database.port", String.valueOf(MYSQL_PORT),
"debezium.source.database.user", MYSQL_USER,
"debezium.source.database.password", MYSQL_PASSWORD,
"debezium.source.database.server.id", "1",
"debezium.source.schema.history.internal", "io.debezium.server.redis.RedisSchemaHistory"));
result.put("debezium.source.offset.storage.file.filename", "offset.dat");
return result;
}

public DebeziumServerConfigBuilder withBaseConfig(DebeziumTestContainerWrapper redis, DebeziumTestContainerWrapper postgres) {
public DebeziumServerConfigBuilder withBaseMySqlConfig(DebeziumTestContainerWrapper redis, DebeziumTestContainerWrapper mysql) {
config.putAll(baseRedisConfig(redis));
config.putAll(basePostgresConfig(postgres));
config.putAll(baseMySqlConfig(mysql));
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,24 @@
*/
package io.debezium.server.redis.wip;

import static org.awaitility.Awaitility.await;
import static org.testcontainers.containers.output.OutputFrame.OutputType.STDOUT;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.OutputFrame;

import lombok.NonNull;

public class DebeziumTestContainerWrapper extends GenericContainer<DebeziumTestContainerWrapper> {

private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumTestContainerWrapper.class);

private String networkAlias;

public DebeziumTestContainerWrapper(@NonNull String dockerImageName) {
super(dockerImageName);
}
Expand All @@ -31,19 +37,28 @@ public void unpause() {
getDockerClient().unpauseContainerCmd(getContainerId()).exec();
}

public String getContainerIp() {
return getContainerInfo()
.getNetworkSettings()
.getNetworks()
.entrySet()
.stream()
.findFirst()
.get()
.getValue()
.getIpAddress();
public DebeziumTestContainerWrapper withNetworkAlias(String alias) {
this.networkAlias = alias;
return super.withNetworkAliases(alias);
}

public String getContainerAddress() {
return networkAlias;
}

public String getStandardOutput() {
return getLogs(STDOUT);
}

public void waitForContainerLog(String log) {
await()
.atMost(60, TimeUnit.SECONDS)
.until(() -> getLogs(OutputFrame.OutputType.STDOUT).contains(log));
}

public void waitForStop() {
await()
.atMost(60, TimeUnit.SECONDS)
.until(() -> !isRunning());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,24 @@
package io.debezium.server.redis.wip;

public class TestConstants {
// POSTGRESQL
public static final String POSTGRES_USER = "debezium";
public static final String POSTGRES_PASSWORD = "dbz";
public static final String POSTGRES_DATABASE = "debezium";
public static final String POSTGRES_IMAGE = "quay.io/debezium/example-postgres";
public static final int POSTGRES_PORT = 5432;
private TestConstants() {
// intentionally private
}

public static final int INITIAL_CUSTOMER_COUNT = 4;
public static final int INITIAL_SCHEMA_HISTORY_SIZE = 16;
public static final String LOCALHOST = "localhost";

// REDIS
public static final String REDIS_IMAGE = "redis";
public static final int REDIS_PORT = 6379;

// MYSQL
public static final String MYSQL_USER = "debezium";
public static final String MYSQL_PASSWORD = "dbz";
public static final String MYSQL_ROOT_PASSWORD = "debezium";
public static final String MYSQL_DATABASE = "inventory";
public static final String MYSQL_PRIVILEGED_USER = "mysqluser";
public static final String MYSQL_PRIVILEGED_PASSWORD = "mysqlpassword";
public static final int MYSQL_PORT = 3306;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,22 @@
*/
package io.debezium.server.redis.wip;

import static io.debezium.server.redis.wip.TestConstants.POSTGRES_DATABASE;
import static io.debezium.server.redis.wip.TestConstants.POSTGRES_IMAGE;
import static io.debezium.server.redis.wip.TestConstants.POSTGRES_PASSWORD;
import static io.debezium.server.redis.wip.TestConstants.POSTGRES_PORT;
import static io.debezium.server.redis.wip.TestConstants.POSTGRES_USER;
import static io.debezium.server.redis.wip.TestConstants.LOCALHOST;
import static io.debezium.server.redis.wip.TestConstants.MYSQL_PORT;
import static io.debezium.server.redis.wip.TestConstants.MYSQL_PRIVILEGED_PASSWORD;
import static io.debezium.server.redis.wip.TestConstants.MYSQL_PRIVILEGED_USER;
import static io.debezium.server.redis.wip.TestConstants.MYSQL_ROOT_PASSWORD;
import static io.debezium.server.redis.wip.TestConstants.REDIS_IMAGE;
import static io.debezium.server.redis.wip.TestConstants.REDIS_PORT;
import static io.debezium.server.redis.wip.TestProperties.DEBEZIUM_SERVER_IMAGE;
import static io.debezium.server.redis.wip.TestUtils.getRedisContainerAddress;

import java.util.Map;
import java.time.Duration;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
Expand All @@ -27,34 +29,43 @@ public class TestContainersRedisTestBase {
protected DebeziumTestContainerWrapper postgres;
protected DebeziumTestContainerWrapper redis;
protected DebeziumTestContainerWrapper server;
protected DebeziumTestContainerWrapper mysql;
protected Jedis jedis;

public TestContainersRedisTestBase() {
// provide base configuration for all components
postgres = new DebeziumTestContainerWrapper(POSTGRES_IMAGE)
.withExposedPorts(POSTGRES_PORT)
.withEnv(Map.of("POSTGRES_USER", POSTGRES_USER,
"POSTGRES_PASSWORD", POSTGRES_PASSWORD,
"POSTGRES_DB", POSTGRES_DATABASE,
"POSTGRES_INITDB_ARGS", "\"-E UTF8\"",
"LANG", "en_US.utf8"));
Network network = Network.newNetwork();

mysql = new DebeziumTestContainerWrapper("quay.io/debezium/example-mysql")
.withNetwork(network)
.withNetworkAlias("mysql")
.waitingFor(Wait.forLogMessage(".*mysqld: ready for connections.*", 2))
.withEnv("MYSQL_ROOT_PASSWORD", MYSQL_ROOT_PASSWORD)
.withEnv("MYSQL_USER", MYSQL_PRIVILEGED_USER)
.withEnv("MYSQL_PASSWORD", MYSQL_PRIVILEGED_PASSWORD)
.withExposedPorts(MYSQL_PORT)
.withStartupTimeout(Duration.ofSeconds(180));
redis = new DebeziumTestContainerWrapper(REDIS_IMAGE)
.withClasspathResourceMapping("redis", "/etc/redis", BindMode.READ_ONLY)
.withNetwork(network)
.withNetworkAlias("redis")
.withExposedPorts(REDIS_PORT);
server = new DebeziumTestContainerWrapper(DEBEZIUM_SERVER_IMAGE);
server = new DebeziumTestContainerWrapper(DEBEZIUM_SERVER_IMAGE)
.withNetwork(network)
.withCommand("-jar", "quarkus-run.jar");
}

@BeforeEach
public void setUp() {
postgres.start();
mysql.start();
redis.start();
jedis = new Jedis(HostAndPort.from(getRedisContainerAddress(redis)));

jedis = new Jedis(HostAndPort.from(LOCALHOST + ":" + redis.getMappedPort(REDIS_PORT)));
}

@AfterEach
public void tearDown() {
server.stop();
postgres.stop();
mysql.stop();
redis.stop();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
*/
package io.debezium.server.redis.wip;

import static io.debezium.server.redis.wip.TestUtils.awaitStreamLength;
import static io.debezium.server.redis.wip.TestUtils.getContainerIp;
import static org.assertj.core.api.Assertions.assertThat;
import static io.debezium.server.redis.wip.TestConstants.INITIAL_CUSTOMER_COUNT;
import static io.debezium.server.redis.wip.TestConstants.INITIAL_SCHEMA_HISTORY_SIZE;
import static io.debezium.server.redis.wip.TestConstants.LOCALHOST;
import static io.debezium.server.redis.wip.TestConstants.REDIS_PORT;
import static io.debezium.server.redis.wip.TestUtils.insertCustomerToMySql;
import static io.debezium.server.redis.wip.TestUtils.waitForStreamLength;

import java.util.Map;
import java.io.IOException;

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
Expand All @@ -24,61 +27,46 @@
public class TestContainersSslStreamIT extends TestContainersRedisTestBase {

private static final Logger LOGGER = LoggerFactory.getLogger(TestContainersSslStreamIT.class);
private static final int REDIS_NON_SSL_PORT = 6378;

public TestContainersSslStreamIT() {
super();
redis
.withCommand(
"redis-server --tls-port 6379 " +
"--port 6378 " +
"redis-server --tls-port " + REDIS_PORT + " " +
"--port " + REDIS_NON_SSL_PORT + " " +
"--tls-cert-file /etc/certificates/redis.crt " +
"--tls-key-file /etc/certificates/redis.key " +
"--tls-ca-cert-file /etc/certificates/ca.crt")
.withExposedPorts(REDIS_NON_SSL_PORT, REDIS_PORT)
.withClasspathResourceMapping("ssl", "/etc/certificates", BindMode.READ_ONLY);
// server
// TODO why is withCommand not working
// .withCommand("-Djavax.net.ssl.keyStore=/ssl/client-keystore.p12",
// "-Djavax.net.ssl.trustStore=/ssl/client-truststore.p12",
// "-Djavax.net.ssl.keyStorePassword=secret",
// "-Djavax.net.ssl.trustStorePassword=secret")
// .withClasspathResourceMapping("ssl", "/etc/certificates", BindMode.READ_WRITE);
server
.withCommand("-Djavax.net.ssl.keyStore=/ssl/client-keystore.p12",
"-Djavax.net.ssl.trustStore=/ssl/client-truststore.p12",
"-Djavax.net.ssl.keyStorePassword=secret",
"-Djavax.net.ssl.trustStorePassword=secret",
"-jar", "quarkus-run.jar")
.withClasspathResourceMapping("ssl", "/ssl", BindMode.READ_WRITE);
}

@Test
public void testRedisSslStream() {
prepareServerEnv();
public void shouldStreamWithSslEnabled() throws IOException, InterruptedException {
server.setEnv(new DebeziumServerConfigBuilder().withBaseMySqlConfig(redis, mysql)
.withValue("debezium.sink.redis.ssl.enabled", "true")
.withValue("debezium.source.database.ssl.mode", "disabled")
.withValue("debezium.source.offset.storage", "io.debezium.server.redis.RedisOffsetBackingStore")
.build());
server.start();

Jedis jedis = new Jedis(new HostAndPort(getContainerIp(redis), 6378));
final int MESSAGE_COUNT = 4;
jedis = new Jedis(new HostAndPort(LOCALHOST, redis.getMappedPort(REDIS_NON_SSL_PORT)));
final String STREAM_NAME = "testc.inventory.customers";
final String HASH_NAME = "metadata:debezium:offsets";

awaitStreamLength(jedis, STREAM_NAME, MESSAGE_COUNT);
waitForStreamLength(jedis, STREAM_NAME, INITIAL_CUSTOMER_COUNT);

long streamLength = jedis.xlen(STREAM_NAME);
assertThat(streamLength).isEqualTo(MESSAGE_COUNT);
insertCustomerToMySql(mysql, "Sergei", "Savage", "sesa@email.com");
waitForStreamLength(jedis, STREAM_NAME, INITIAL_CUSTOMER_COUNT + 1);

// wait until the offsets are re-written
TestUtils.awaitHashSizeGte(jedis, HASH_NAME, 1);

Map<String, String> redisOffsets = jedis.hgetAll(HASH_NAME);
assertThat(redisOffsets.size()).isPositive();

jedis.close();
}

private void prepareServerEnv() {
if (!redis.isRunning() || !postgres.isRunning()) {
throw new IllegalStateException("Cannot prepare server environment without redis and postgres running");
}

server.setEnv(new DebeziumServerConfigBuilder()
.withBaseConfig(redis, postgres)
.withValue("debezium.source.offset.storage", "io.debezium.server.redis.RedisOffsetBackingStore")
.withValue("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector")
.withValue("debezium.source.offset.storage.redis.ssl.enabled", "true")
.withValue("debezium.sink.redis.ssl.enabled", "true")
.build());
TestUtils.awaitHashSizeGte(jedis, "metadata:debezium:offsets", 1);
waitForStreamLength(jedis, "metadata:debezium:schema_history", INITIAL_SCHEMA_HISTORY_SIZE);
}
}
Loading

0 comments on commit e454acb

Please sign in to comment.