Skip to content

Commit

Permalink
DBZ-6703 Run tests correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
jpechane committed Nov 27, 2023
1 parent d89974e commit 04f8923
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 19 deletions.
18 changes: 17 additions & 1 deletion debezium-server-rabbitmq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,26 @@
<artifactId>maven-failsafe-plugin</artifactId>
<executions>
<execution>
<id>integration-test</id>
<id>integration-test-redismq</id>
<goals>
<goal>integration-test</goal>
</goals>
<configuration>
<systemProperties>
<debezium.sink.type>rabbitmq</debezium.sink.type>
</systemProperties>
</configuration>
</execution>
<execution>
<id>integration-test-redismq-stream</id>
<goals>
<goal>integration-test</goal>
</goals>
<configuration>
<systemProperties>
<debezium.sink.type>rabbitmqstream</debezium.sink.type>
</systemProperties>
</configuration>
</execution>
<execution>
<id>verify</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import jakarta.annotation.PostConstruct;
Expand Down Expand Up @@ -48,17 +49,12 @@ public class RabbitMqStreamNativeChangeConsumer extends BaseChangeConsumer imple
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMqStreamNativeChangeConsumer.class);

private static final String PROP_PREFIX = "debezium.sink.rabbitmqstream.";
private static final String PROP_CONNECTION_PREFIX = PROP_PREFIX + "connection.";
private static final String STREAM_NAME = PROP_PREFIX + "stream";
private String stream;

/**
* When true, the routing key is calculated from topic name using stream name mapper.
* When false the routingKey value or empty string is used.
*/
private static final String PROP_STREAM = PROP_PREFIX + "stream";
private static final String PROP_CONNECTION_PREFIX = PROP_PREFIX + "connection.";

@ConfigProperty(name = PROP_PREFIX + "deliveryMode", defaultValue = "2")
int deliveryMode;
@ConfigProperty(name = PROP_STREAM)
Optional<String> stream;

@ConfigProperty(name = PROP_PREFIX + "ackTimeout", defaultValue = "30000")
int ackTimeout;
Expand Down Expand Up @@ -88,15 +84,17 @@ void connect() {
.host(factory.getHost())
.port(factory.getPort()).build();

stream = config.getValue(STREAM_NAME, String.class);
if (stream.isEmpty()) {
throw new DebeziumException("Mandatory configration option '" + PROP_STREAM + "' is not provided");
}

LOGGER.info("Creating stream '{}'", stream);
LOGGER.info("Creating stream '{}'", stream.get());

environment.streamCreator().stream(stream).create();
environment.streamCreator().stream(stream.get()).create();

producer = environment.producerBuilder()
.confirmTimeout(Duration.ofSeconds(ackTimeout))
.stream(stream)
.stream(stream.get())
.build();

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
Expand All @@ -43,6 +44,7 @@
@QuarkusTest
@QuarkusTestResource(PostgresTestResourceLifecycleManager.class)
@QuarkusTestResource(RabbitMqTestResourceLifecycleManager.class)
@EnabledIfSystemProperty(named = "debezium.sink.type", matches = "rabbitmq")
public class RabbitMqIT {

private static final int MESSAGE_COUNT = 4;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

@QuarkusTest
@QuarkusTestResource(PostgresTestResourceLifecycleManager.class)
@QuarkusTestResource(value = RabbitMqStreamTestResourceLifecycleManager.class, parallel = true)
@QuarkusTestResource(value = RabbitMqStreamTestResourceLifecycleManager.class)
@EnabledIfSystemProperty(named = "debezium.sink.type", matches = "rabbitmqstream")
public class RabbitMqStreamIT {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ public RabbitMqTestConfigSource() {
String sinkType = System.getProperty("debezium.sink.type");
if ("rabbitmqstream".equals(sinkType)) {
rabbitmqConfig.put("debezium.sink.type", "rabbitmqstream");
rabbitmqConfig.put("debezium.sink.rabbitmqstream.stream", TOPIC_NAME);
}
else {
rabbitmqConfig.put("debezium.sink.type", "rabbitmq");
}

rabbitmqConfig.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
rabbitmqConfig.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
rabbitmqConfig.put("debezium.source.offset.flush.interval.ms", "0");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@ public Map<String, String> start() {
try {
init();
}
catch (IOException e) {
throw new RuntimeException(e);
}
catch (InterruptedException e) {
catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
Map<String, String> params = new ConcurrentHashMap<>();
Expand Down

0 comments on commit 04f8923

Please sign in to comment.