Skip to content

Commit

Permalink
Enable consumer recovery on temporary unavailable queues (#257)
Browse files Browse the repository at this point in the history
* reconnecting consumer implementation

* reconnecting consumer tests

* converted the recovery test to Spock and review changes
  • Loading branch information
mscheibler authored Aug 4, 2021
1 parent 459dfb0 commit e3d941e
Show file tree
Hide file tree
Showing 7 changed files with 819 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
*/
public interface ChannelPool extends Named {

/**
* The default delay to apply for recovery channel getter.
*/
int DEFAULT_RECOVERY_DELAY = 5000;

/**
* Retrieves a channel from the pool. The channel must be returned to the
* pool after it is no longer being used.
Expand All @@ -40,6 +45,31 @@ public interface ChannelPool extends Named {
*/
Channel getChannel() throws IOException;

/**
* Retrieves a channel from the pool after blocking the thread for a delay period defined by the
* {@link com.rabbitmq.client.ConnectionFactory#getRecoveryDelayHandler() RecoveryDelayHandler}
* of the connection for this pool.
*
* @param recoveryAttempts the number of recovery attempts so far
* @return a channel from the pool
* @throws IOException if a channel needed to be created and encountered an error
* @throws InterruptedException if the thread was interrupted during the delay period
*/
default Channel getChannelWithRecoveringDelay(int recoveryAttempts) throws IOException, InterruptedException {
Thread.sleep(DEFAULT_RECOVERY_DELAY);
return getChannel();
}

/**
* Returns whether {@link com.rabbitmq.client.ConnectionFactory#isTopologyRecoveryEnabled() topology recovery}
* is enabled for the connection of this pool.
*
* @return true by default
*/
default boolean isTopologyRecoveryEnabled() {
return true;
}

/**
* Returns a channel to the pool. No further use of the channel
* is allowed by the returner.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.RecoveryDelayHandler;
import io.micronaut.context.annotation.EachBean;
import io.micronaut.context.annotation.Parameter;
import org.slf4j.Logger;
Expand Down Expand Up @@ -48,6 +49,8 @@ public class DefaultChannelPool implements AutoCloseable, ChannelPool {
private final Connection connection;
private final AtomicLong totalChannels = new AtomicLong(0);
private final String name;
private final RecoveryDelayHandler recoveryDelayHandler;
private final boolean topologyRecoveryEnabled;

/**
* Default constructor.
Expand All @@ -62,6 +65,8 @@ public DefaultChannelPool(@Parameter String name,
this.name = name;
this.connection = connection;
Integer maxIdleChannels = config.getChannelPool().getMaxIdleChannels().orElse(null);
this.recoveryDelayHandler = config.params(null).getRecoveryDelayHandler();
topologyRecoveryEnabled = config.isTopologyRecoveryEnabled();
this.channels = new LinkedBlockingQueue<>(maxIdleChannels == null ? Integer.MAX_VALUE : maxIdleChannels);
}

Expand All @@ -88,6 +93,17 @@ public Channel getChannel() throws IOException {
return channel;
}

@Override
public Channel getChannelWithRecoveringDelay(int recoveryAttempts) throws IOException, InterruptedException {
Thread.sleep(recoveryDelayHandler.getDelay(recoveryAttempts));
return getChannel();
}

@Override
public boolean isTopologyRecoveryEnabled() {
return topologyRecoveryEnabled;
}

@Override
public void returnChannel(Channel channel) {
if (channel.isOpen()) {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package io.micronaut.rabbitmq

import com.github.dockerjava.api.model.HealthCheck
import io.micronaut.context.ApplicationContext
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.testcontainers.containers.BindMode
import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.InternetProtocol
import org.testcontainers.containers.Network
import org.testcontainers.containers.output.Slf4jLogConsumer
import org.testcontainers.containers.wait.strategy.AbstractWaitStrategy
import org.testcontainers.containers.wait.strategy.Wait
import org.testcontainers.utility.DockerImageName
import spock.lang.Specification
import spock.util.concurrent.PollingConditions

import java.time.Duration

class AbstractRabbitMQClusterTest extends Specification {
private static final int AMQP_PORT = 5672
private static final DockerImageName RABBIT_IMAGE = DockerImageName.parse("library/rabbitmq:3.8-management")
private static final Logger log = LoggerFactory.getLogger(AbstractRabbitMQClusterTest.class)
private static final String CLUSTER_COOKIE = "test-cluster"
private static final String RABBIT_CONFIG_PATH = ClassLoader.getSystemResource("rabbit/rabbitmq.conf").getPath()
private static final String RABBIT_DEFINITIONS_PATH = ClassLoader.getSystemResource("rabbit/definitions.json").getPath()
private static final Network mqClusterNet = Network.newNetwork()

public static final String EXCHANGE = "test-exchange"
public static final String QUEUE = "test-durable-queue"
public static final GenericContainer NODE1_CONT = new GenericContainer<>(RABBIT_IMAGE)
public static final GenericContainer NODE2_CONT = new GenericContainer<>(RABBIT_IMAGE)
public static final GenericContainer NODE3_CONT = new GenericContainer<>(RABBIT_IMAGE)
public static int node1Port
public static int node2Port
public static int node3Port


static {
PollingConditions until = new PollingConditions(timeout: 60)
getNodePorts()
log.info("rabbit.conf path: {}", RABBIT_CONFIG_PATH)
log.info("rabbit definitions path: {}", RABBIT_DEFINITIONS_PATH)
log.info("rabbit node ports: {}, {}, {}", node1Port, node2Port, node3Port)

configureContainer(NODE1_CONT, "rabbitmq1", node1Port)
// first node must boot up completely so that the other nodes can join the new cluster
NODE1_CONT.waitingFor(Wait.forHealthcheck().withStartupTimeout(Duration.ofMinutes(1)))
NODE1_CONT.start()
log.info("first node startup complete")

configureContainer(NODE2_CONT, "rabbitmq2", node2Port)
configureContainer(NODE3_CONT, "rabbitmq3", node3Port)
// node 2 and 3 may start up in parallel as they can join the already existing cluster
NODE2_CONT.waitingFor(new DoNotWaitStrategy())
NODE3_CONT.waitingFor(new DoNotWaitStrategy())
NODE2_CONT.start()
NODE3_CONT.start()
until.eventually {
assert NODE2_CONT.isHealthy()
assert NODE3_CONT.isHealthy()
}
log.info("cluster startup complete")
}

protected ApplicationContext startContext(Map additionalConfig = [:]) {
Map<String, Object> properties = ["spec.name" : getClass().simpleName]
properties.put("rabbitmq.servers.node1.port", node1Port)
properties.put("rabbitmq.servers.node2.port", node2Port)
properties.put("rabbitmq.servers.node3.port", node3Port)
properties << additionalConfig

log.info("context properties: {}", properties)
ApplicationContext.run(properties, "test")
}

private static getNodePorts() {
try (ServerSocket s1 = new ServerSocket(0)
ServerSocket s2 = new ServerSocket(0)
ServerSocket s3 = new ServerSocket(0)) {
node1Port = s1.getLocalPort()
node2Port = s2.getLocalPort()
node3Port = s3.getLocalPort()
}
}

private static configureContainer(GenericContainer mqContainer, String hostname, int nodePort) {
mqContainer
.withEnv("RABBITMQ_ERLANG_COOKIE", CLUSTER_COOKIE)
.withFileSystemBind(RABBIT_CONFIG_PATH, "/etc/rabbitmq/rabbitmq.conf", BindMode.READ_ONLY)
.withFileSystemBind(RABBIT_DEFINITIONS_PATH, "/etc/rabbitmq/definitions.json", BindMode.READ_ONLY)
.withNetwork(mqClusterNet)
.withLogConsumer(new Slf4jLogConsumer(log).withPrefix(hostname))
.withCreateContainerCmdModifier(cmd -> cmd
.withHostName(hostname)
.withHealthcheck(new HealthCheck()
.withTest(Arrays.asList("CMD-SHELL", "rabbitmqctl status"))
.withStartPeriod(Duration.ofMinutes(4).toNanos())
.withInterval(Duration.ofSeconds(5).toNanos())
.withRetries(10)
.withTimeout(Duration.ofSeconds(5).toNanos())))
// Use fixed port binding, because the dynamic port binding would use different port on each container start.
// These changing ports would make any reconnect attempt impossible, as the client assumes that the broker
// address does not change.
addPortBinding(mqContainer, nodePort, AMQP_PORT)
}

private static addPortBinding(GenericContainer cont, int hostPort, int contPort) {
cont.getPortBindings().add(String.format("%d:%d/%s",
hostPort, contPort, InternetProtocol.TCP.toDockerNotation()))
}

private static class DoNotWaitStrategy extends AbstractWaitStrategy {
@Override
protected void waitUntilReady() {
// NOOP - do not wait
}
}
}
Loading

0 comments on commit e3d941e

Please sign in to comment.