diff --git a/rabbitmq/src/main/java/io/micronaut/rabbitmq/connect/ChannelPool.java b/rabbitmq/src/main/java/io/micronaut/rabbitmq/connect/ChannelPool.java index 9ab76bf1e..c75c2874e 100644 --- a/rabbitmq/src/main/java/io/micronaut/rabbitmq/connect/ChannelPool.java +++ b/rabbitmq/src/main/java/io/micronaut/rabbitmq/connect/ChannelPool.java @@ -31,6 +31,11 @@ */ public interface ChannelPool extends Named { + /** + * The default delay to apply for recovery channel getter. + */ + int DEFAULT_RECOVERY_DELAY = 5000; + /** * Retrieves a channel from the pool. The channel must be returned to the * pool after it is no longer being used. @@ -40,6 +45,31 @@ public interface ChannelPool extends Named { */ Channel getChannel() throws IOException; + /** + * Retrieves a channel from the pool after blocking the thread for a delay period defined by the + * {@link com.rabbitmq.client.ConnectionFactory#getRecoveryDelayHandler() RecoveryDelayHandler} + * of the connection for this pool. + * + * @param recoveryAttempts the number of recovery attempts so far + * @return a channel from the pool + * @throws IOException if a channel needed to be created and encountered an error + * @throws InterruptedException if the thread was interrupted during the delay period + */ + default Channel getChannelWithRecoveringDelay(int recoveryAttempts) throws IOException, InterruptedException { + Thread.sleep(DEFAULT_RECOVERY_DELAY); + return getChannel(); + } + + /** + * Returns whether {@link com.rabbitmq.client.ConnectionFactory#isTopologyRecoveryEnabled() topology recovery} + * is enabled for the connection of this pool. + * + * @return true by default + */ + default boolean isTopologyRecoveryEnabled() { + return true; + } + /** * Returns a channel to the pool. No further use of the channel * is allowed by the returner. diff --git a/rabbitmq/src/main/java/io/micronaut/rabbitmq/connect/DefaultChannelPool.java b/rabbitmq/src/main/java/io/micronaut/rabbitmq/connect/DefaultChannelPool.java index dac9d4ccf..976dfb935 100644 --- a/rabbitmq/src/main/java/io/micronaut/rabbitmq/connect/DefaultChannelPool.java +++ b/rabbitmq/src/main/java/io/micronaut/rabbitmq/connect/DefaultChannelPool.java @@ -18,6 +18,7 @@ import com.rabbitmq.client.AlreadyClosedException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; +import com.rabbitmq.client.RecoveryDelayHandler; import io.micronaut.context.annotation.EachBean; import io.micronaut.context.annotation.Parameter; import org.slf4j.Logger; @@ -48,6 +49,8 @@ public class DefaultChannelPool implements AutoCloseable, ChannelPool { private final Connection connection; private final AtomicLong totalChannels = new AtomicLong(0); private final String name; + private final RecoveryDelayHandler recoveryDelayHandler; + private final boolean topologyRecoveryEnabled; /** * Default constructor. @@ -62,6 +65,8 @@ public DefaultChannelPool(@Parameter String name, this.name = name; this.connection = connection; Integer maxIdleChannels = config.getChannelPool().getMaxIdleChannels().orElse(null); + this.recoveryDelayHandler = config.params(null).getRecoveryDelayHandler(); + topologyRecoveryEnabled = config.isTopologyRecoveryEnabled(); this.channels = new LinkedBlockingQueue<>(maxIdleChannels == null ? Integer.MAX_VALUE : maxIdleChannels); } @@ -88,6 +93,17 @@ public Channel getChannel() throws IOException { return channel; } + @Override + public Channel getChannelWithRecoveringDelay(int recoveryAttempts) throws IOException, InterruptedException { + Thread.sleep(recoveryDelayHandler.getDelay(recoveryAttempts)); + return getChannel(); + } + + @Override + public boolean isTopologyRecoveryEnabled() { + return topologyRecoveryEnabled; + } + @Override public void returnChannel(Channel channel) { if (channel.isOpen()) { diff --git a/rabbitmq/src/main/java/io/micronaut/rabbitmq/intercept/RabbitMQConsumerAdvice.java b/rabbitmq/src/main/java/io/micronaut/rabbitmq/intercept/RabbitMQConsumerAdvice.java index 8434034eb..14c91f599 100644 --- a/rabbitmq/src/main/java/io/micronaut/rabbitmq/intercept/RabbitMQConsumerAdvice.java +++ b/rabbitmq/src/main/java/io/micronaut/rabbitmq/intercept/RabbitMQConsumerAdvice.java @@ -18,7 +18,9 @@ import com.rabbitmq.client.AMQP; import com.rabbitmq.client.AlreadyClosedException; import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Delivery; import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.ShutdownSignalException; import com.rabbitmq.client.RecoverableChannel; import io.micronaut.context.BeanContext; import io.micronaut.context.processor.ExecutableMethodProcessor; @@ -50,16 +52,15 @@ import javax.annotation.PreDestroy; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; /** * An {@link ExecutableMethodProcessor} that will process all beans annotated @@ -80,7 +81,7 @@ public class RabbitMQConsumerAdvice implements ExecutableMethodProcessor, private final RabbitMessageSerDesRegistry serDesRegistry; private final ConversionService conversionService; private final Map channelPools; - private final Map consumerChannels = new ConcurrentHashMap<>(); + private final List consumers = new CopyOnWriteArrayList<>(); /** * Default constructor. @@ -125,17 +126,11 @@ public void process(BeanDefinition beanDefinition, ExecutableMethod met boolean hasAckArg = Arrays.stream(method.getArguments()) .anyMatch(arg -> Acknowledgement.class.isAssignableFrom(arg.getType())); - String connection = method.stringValue(RabbitConnection.class, "connection") - .orElse(RabbitConnection.DEFAULT_CONNECTION); - - ChannelPool channelPool = channelPools.get(connection); - if (channelPool == null) { - throw new MessageListenerException(String.format("Failed to find a channel pool named [%s] to register a listener", connection)); - } - + Integer prefetch = queueAnn.get("prefetch", Integer.class).orElse(null); int numberOfConsumers = queueAnn.intValue("numberOfConsumers").orElse(1); - Integer prefetch = queueAnn.get("prefetch", Integer.class).orElse(null); + ChannelPool channelPool = getChannelPool(method); + ExecutorService executorService = getExecutorService(method); Map arguments = retrieveArguments(method); Object bean = getExecutableMethodBean(beanDefinition, method); @@ -143,143 +138,95 @@ public void process(BeanDefinition beanDefinition, ExecutableMethod met DefaultExecutableBinder binder = new DefaultExecutableBinder<>(); - List channelList = new ArrayList<>(numberOfConsumers); - for (int i = 0; i < numberOfConsumers; i++) { - channelList.add(getChannel(channelPool)); - } + DeliverCallback deliverCallback = (channel, message) -> { + final RabbitConsumerState state = new RabbitConsumerState(message.getEnvelope(), + message.getProperties(), message.getBody(), channel); - ExecutorService executorService = getExecutorService(method); - - int idx = 0; - for (Channel channel : channelList) { - String clientTag = methodTag + "[" + idx + "]"; - idx ++; + BoundExecutable boundExecutable = null; try { - if (prefetch != null) { - channel.basicQos(prefetch); - } - } catch (IOException e) { - throw new MessageListenerException(String.format("Failed to set a prefetch count of [%s] on the channel", prefetch), e); + boundExecutable = binder.bind(method, binderRegistry, state); + } catch (Throwable e) { + handleException(new RabbitListenerException("An error occurred binding the message to the method", + e, bean, state)); } - - ConsumerState state = new ConsumerState(); - state.channelPool = channelPool; - state.consumerTag = clientTag; - consumerChannels.put(channel, state); - try { - - if (LOG.isDebugEnabled()) { - LOG.debug("Registering a consumer to queue [{}] with client tag [{}]", queue, clientTag); - } - - channel.basicConsume(queue, false, clientTag, false, exclusive, arguments, new DefaultConsumer() { - - @Override - public void handleTerminate(String consumerTag) { - if (channel instanceof RecoverableChannel) { - if (LOG.isDebugEnabled()) { - LOG.debug("The channel was been terminated. Automatic recovery attempt is underway for consumer [{}]", clientTag); - } - } else { - ConsumerState state = consumerChannels.remove(channel); - if (state != null) { - state.channelPool.returnChannel(channel); - if (LOG.isDebugEnabled()) { - LOG.debug("The channel was terminated. The consumer [{}] will no longer receive messages", clientTag); - } + if (boundExecutable != null) { + try (RabbitMessageCloseable closeable = new RabbitMessageCloseable(state, false, reQueue) + .withAcknowledge(hasAckArg ? null : false)) { + Object returnedValue = boundExecutable.invoke(bean); + + String replyTo = message.getProperties().getReplyTo(); + if (!isVoid && StringUtils.isNotEmpty(replyTo)) { + MutableBasicProperties replyProps = new MutableBasicProperties(); + replyProps.setCorrelationId(message.getProperties().getCorrelationId()); + + byte[] converted = null; + if (returnedValue != null) { + RabbitMessageSerDes serDes = serDesRegistry + .findSerdes(method.getReturnType().asArgument()) + .map(RabbitMessageSerDes.class::cast) + .orElseThrow(() -> new RabbitListenerException(String.format( + "Could not find a serializer for the body argument of type [%s]", + returnedValue.getClass().getName()), bean, state)); + + converted = serDes.serialize(returnedValue, replyProps); } - } - } - - public void doHandleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { - final RabbitConsumerState state = new RabbitConsumerState(envelope, properties, body, channel); - BoundExecutable boundExecutable = null; - try { - boundExecutable = binder.bind(method, binderRegistry, state); - } catch (Throwable e) { - handleException(new RabbitListenerException("An error occurred binding the message to the method", e, bean, state)); + channel.basicPublish("", replyTo, replyProps.toBasicProperties(), converted); } - try { - if (boundExecutable != null) { - try (RabbitMessageCloseable closeable = new RabbitMessageCloseable(state, false, reQueue).withAcknowledge(hasAckArg ? null : false)) { - Object returnedValue = boundExecutable.invoke(bean); - - String replyTo = properties.getReplyTo(); - if (!isVoid && StringUtils.isNotEmpty(replyTo)) { - MutableBasicProperties replyProps = new MutableBasicProperties(); - replyProps.setCorrelationId(properties.getCorrelationId()); - - byte[] converted = null; - if (returnedValue != null) { - RabbitMessageSerDes serDes = serDesRegistry.findSerdes(method.getReturnType().asArgument()) - .map(RabbitMessageSerDes.class::cast) - .orElseThrow(() -> new RabbitListenerException(String.format("Could not find a serializer for the body argument of type [%s]", returnedValue.getClass().getName()), bean, state)); - - converted = serDes.serialize(returnedValue, replyProps); - } - - channel.basicPublish("", replyTo, replyProps.toBasicProperties(), converted); - } - - if (!hasAckArg) { - closeable.withAcknowledge(true); - } - } catch (MessageAcknowledgementException e) { - throw e; - } catch (Throwable e) { - if (e instanceof RabbitListenerException) { - handleException((RabbitListenerException) e); - } else { - handleException(new RabbitListenerException("An error occurred executing the listener", e, bean, state)); - } - } - } else { - new RabbitMessageCloseable(state, false, reQueue).withAcknowledge(false).close(); - } - } catch (MessageAcknowledgementException e) { - if (!channel.isOpen()) { - ConsumerState consumerState = consumerChannels.remove(channel); - if (consumerState != null) { - consumerState.channelPool.returnChannel(channel); - } - if (LOG.isErrorEnabled()) { - LOG.error("The channel was closed due to an exception. The consumer [{}] will no longer receive messages", clientTag); - } - } - handleException(new RabbitListenerException(e.getMessage(), e, bean, null)); - } finally { - consumerChannels.get(channel).inProgress = false; + if (!hasAckArg) { + closeable.withAcknowledge(true); } + } catch (MessageAcknowledgementException e) { + throw e; + } catch (RabbitListenerException e) { + handleException(e); + } catch (Throwable e) { + handleException(new RabbitListenerException("An error occurred executing the listener", + e, bean, state)); } - - @Override - public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { - consumerChannels.get(channel).inProgress = true; - - if (executorService != null) { - executorService.submit(() -> doHandleDelivery(consumerTag, envelope, properties, body)); - } else { - doHandleDelivery(consumerTag, envelope, properties, body); - } - } - }); - } catch (Throwable e) { - if (!channel.isOpen()) { - channelPool.returnChannel(channel); - consumerChannels.remove(channel); - if (LOG.isErrorEnabled()) { - LOG.error("The channel was closed due to an exception. The consumer [{}] will no longer receive messages", clientTag); - } + } else { + new RabbitMessageCloseable(state, false, reQueue).withAcknowledge(false).close(); } - handleException(new RabbitListenerException("An error occurred subscribing to a queue", e, bean, null)); + } catch (MessageAcknowledgementException e) { + handleException(new RabbitListenerException(e.getMessage(), e, bean, state)); + } + }; + + try { + for (int idx = 0; idx < numberOfConsumers; idx++) { + String consumerTag = methodTag + "[" + idx + "]"; + LOG.debug("Registering a consumer to queue [{}] with client tag [{}]", queue, consumerTag); + consumers.add(new RecoverableConsumerWrapper(queue, consumerTag, executorService, + exclusive, arguments, channelPool, prefetch, deliverCallback)); } + } catch (Throwable e) { + handleException(new RabbitListenerException("An error occurred subscribing to a queue", e, bean, null)); } } } + private ChannelPool getChannelPool(ExecutableMethod method) { + String connection = method.stringValue(RabbitConnection.class, "connection") + .orElse(RabbitConnection.DEFAULT_CONNECTION); + + return Optional.ofNullable(channelPools.get(connection)) + .orElseThrow(() -> new MessageListenerException( + String.format("Failed to find a channel pool named [%s] to register a listener", connection))); + } + + private static void setChannelPrefetch(Integer prefetch, Channel channel) { + try { + if (prefetch != null) { + channel.basicQos(prefetch); + } + } catch (IOException e) { + throw new MessageListenerException(String.format("Failed to set a prefetch count of [%s] on the channel", + prefetch), e); + } + } + private Map retrieveArguments(ExecutableMethod method) { Map arguments = new HashMap<>(); @@ -288,11 +235,11 @@ private Map retrieveArguments(ExecutableMethod method) { propertyAnnotations.forEach((prop) -> { String name = prop.getRequiredValue("name", String.class); String value = prop.getValue(String.class).orElse(null); - Class type = prop.get("type", Class.class).orElse(null); + Class type = prop.get("type", Class.class).orElse(null); if (StringUtils.isNotEmpty(name) && StringUtils.isNotEmpty(value)) { if (type != null && type != Void.class) { - Optional converted = conversionService.convert(value, type); + Optional converted = conversionService.convert(value, type); if (converted.isPresent()) { arguments.put(name, converted.get()); } else { @@ -321,7 +268,7 @@ private ExecutorService getExecutorService(ExecutableMethod method) { String executor = method.stringValue(RabbitConnection.class, "executor").orElse(null); if (executor != null) { return beanContext.findBean(ExecutorService.class, Qualifiers.byName(executor)) - .orElseThrow(() -> new MessageListenerException(String.format("Could not find the executor service [%s] specified for the method [%s]", executor, method))); + .orElseThrow(() -> new MessageListenerException(String.format("Could not find the executor service [%s] specified for the method [%s]", executor, method))); } return null; } @@ -329,37 +276,7 @@ private ExecutorService getExecutorService(ExecutableMethod method) { @PreDestroy @Override public void close() throws Exception { - while (!consumerChannels.entrySet().isEmpty()) { - Iterator> it = consumerChannels.entrySet().iterator(); - while (it.hasNext()) { - Map.Entry entry = it.next(); - Channel channel = entry.getKey(); - ConsumerState state = entry.getValue(); - try { - channel.basicCancel(state.consumerTag); - } catch (IOException | AlreadyClosedException e) { - //ignore - } - if (!state.inProgress) { - state.channelPool.returnChannel(channel); - it.remove(); - } - } - } - } - - /** - * Retrieves a channel to use for consuming messages. - * - * @param channelPool The channel pool to retrieve the channel from - * @return A channel to publish with - */ - protected Channel getChannel(ChannelPool channelPool) { - try { - return channelPool.getChannel(); - } catch (IOException e) { - throw new MessageListenerException("Could not retrieve a channel", e); - } + consumers.forEach(RecoverableConsumerWrapper::cancel); } private void handleException(RabbitListenerException exception) { @@ -372,11 +289,225 @@ private void handleException(RabbitListenerException exception) { } /** - * Consumer state. + * Callback interface to be notified when a message is delivered. */ - private static class ConsumerState { - private String consumerTag; - private ChannelPool channelPool; - private volatile boolean inProgress; + @FunctionalInterface + private interface DeliverCallback { + /** + * Inspired by {@link com.rabbitmq.client.DeliverCallback#handle(String, Delivery)}. + * + * @param channel that is used to register the consumer for this callback + * @param message the delivered message + */ + void handle(Channel channel, Delivery message); + } + + /** + * This wrapper around a {@link com.rabbitmq.client.DefaultConsumer} allows to handle the different signals from + * the underlying channel and to react accordingly. + *

+ * If the consumer is canceled due to an external event (like an unavailable queue) we will try to recover from it. + * Exceptions that are caused by the consumer itself will not trigger the recovery process. In such a case the + * consumer will no longer receive any messages. + * + * @see com.rabbitmq.client.Consumer#handleShutdownSignal(String, ShutdownSignalException) + * @see com.rabbitmq.client.Consumer#handleCancel(String) + */ + private class RecoverableConsumerWrapper { + final String consumerTag; + private final ExecutorService executorService; + private final String queue; + private final boolean exclusive; + private final Map arguments; + private final ChannelPool channelPool; + private final Integer prefetch; + private final DeliverCallback deliverCallback; + private com.rabbitmq.client.DefaultConsumer consumer; + private boolean canceled = false; + private final AtomicInteger handlingDeliveryCount = new AtomicInteger(); + + /** + * Create the consumer and register ({@code Channel.basicConsume}) it with a dedicated channel from the + * provided pool. + * + * @throws IOException in case no channel is available or the registration of the consumer fails + */ + RecoverableConsumerWrapper(String queue, String consumerTag, ExecutorService executorService, boolean exclusive, + Map arguments, ChannelPool channelPool, Integer prefetch, DeliverCallback deliverCallback) + throws IOException { + this.queue = queue; + this.consumerTag = consumerTag; + this.executorService = executorService; + this.exclusive = exclusive; + this.arguments = arguments; + this.channelPool = channelPool; + this.prefetch = prefetch; + this.deliverCallback = deliverCallback; + + Channel channel = null; + try { + channel = channelPool.getChannel(); + consumer = createConsumer(channel); + } catch (IOException e) { + if (channel != null) { + channelPool.returnChannel(channel); + } + throw e; + } + } + + /** + * Cancel the consumer and return the associated channel to the pool. + */ + public synchronized void cancel() { + canceled = true; + if (consumer == null) { + return; + } + + Channel channel = consumer.getChannel(); + try { + channel.basicCancel(consumerTag); + } catch (IOException | AlreadyClosedException e) { + // ignore + } + try { + while (handlingDeliveryCount.get() > 0) { + this.wait(500); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + consumer = null; + consumers.remove(this); + channelPool.returnChannel(channel); + } + } + + private void performConsumerRecovery() { + com.rabbitmq.client.DefaultConsumer recoveredConsumer = null; + int recoveryAttempts = 0; + while (recoveredConsumer == null) { + Channel channel = null; + try { + synchronized (this) { + if (canceled) { + return; + } + LOG.debug("consumer [{}] recovery attempt: {}", consumerTag, recoveryAttempts + 1); + channel = channelPool.getChannelWithRecoveringDelay(recoveryAttempts++); + recoveredConsumer = createConsumer(channel); + consumer = recoveredConsumer; + } + } catch (IOException e) { + if (channel != null) { + channelPool.returnChannel(channel); + } + LOG.warn("Recovery attempt {} for consumer [{}] failed, will retry.", + recoveryAttempts, consumerTag, e); + } catch (InterruptedException e) { + LOG.warn("The consumer [{}] recovery was interrupted. The consumer will not recover.", + consumerTag, e); + Thread.currentThread().interrupt(); + return; + } + } + LOG.info("consumer [{}] recovered", consumerTag); + } + + private com.rabbitmq.client.DefaultConsumer createConsumer(Channel channel) throws IOException { + setChannelPrefetch(prefetch, channel); + + com.rabbitmq.client.DefaultConsumer consumer = new com.rabbitmq.client.DefaultConsumer(channel) { + /** + * The consumer was irregular terminated. This may be caused by a deleted or temporary unavailable + * queue. + *

+ * This kind of infrastructure failure may happen due to RabbitMQ cluster node restarts or other + * external actions. The client application will most likely be unable to restore the infrastructure, + * but it should return to normal operation as soon as the external infrastructure problem is solved. + * e.g. the RabbitMQ node restart is complete and the queue is available again. + */ + @Override + public void handleCancel(String consumerTag) throws IOException { + synchronized (RecoverableConsumerWrapper.this) { + RecoverableConsumerWrapper.this.consumer = null; + channelPool.returnChannel(getChannel()); + } + + if (channelPool.isTopologyRecoveryEnabled() && getChannel() instanceof RecoverableChannel) { + LOG.warn("The consumer [{}] subscription was canceled, a recovery will be tried.", + consumerTag); + performConsumerRecovery(); + } else { + LOG.warn("The RabbitMQ consumer [{}] was canceled. Recovery is not enabled. It will no longer receive messages", + consumerTag); + cancel(); + } + } + + /** + * A shutdown signal from the channel or the underlying connection does not always imply that the + * consumer is no longer usable. If the automatic topology recovery is active and the shutdown + * was not initiated by the application it will be recovered by the RabbitMQ client. + *

+ * If the topology recovery is enabled we will also try to recover the consumer if (only) its channel + * fails. These events are not handled by the RabbitMQ client itself as they are most likely application + * specific. Also some edge cases like a delivery acknowledgement timeout may cause a channel shutdown. + * The registered exception handler of the consumer may handle these cases and it is possible to + * resume message handling by re-registering the consumer on a new channel. + */ + @Override + public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { + if (getChannel() instanceof RecoverableChannel && sig.isHardError()) { + // The RabbitMQ client automatic recovery is only triggered by connection errors. + // The consumer will be recovered by the client, so no additional handling here. + LOG.info("The underlying connection was terminated. Automatic recovery attempt is underway for consumer [{}]", + consumerTag); + } else if (channelPool.isTopologyRecoveryEnabled() && getChannel() instanceof RecoverableChannel) { + LOG.info("The channel of this consumer was terminated. Automatic recovery attempt is underway for consumer [{}]", + consumerTag, sig); + synchronized (RecoverableConsumerWrapper.this) { + RecoverableConsumerWrapper.this.consumer = null; + channelPool.returnChannel(getChannel()); + } + performConsumerRecovery(); + } else { + LOG.error("The channel was closed. Recovery is not enabled. The consumer [{}] will no longer receive messages", + consumerTag, sig); + cancel(); + } + } + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) + throws IOException { + // If the broker forces the channel to close, the client may already have prefetched messages in + // memory and will call handleDelivery for these messages, even if they are re-queued by the broker. + // The client will be unable to acknowledge these messages. So it is safe to silently discard + // them, without bothering the callback handler. + if (!getChannel().isOpen()) { + return; + } + handlingDeliveryCount.incrementAndGet(); + if (executorService != null) { + executorService.submit(() -> callbackHandle(envelope, properties, body)); + } else { + callbackHandle(envelope, properties, body); + } + } + + private void callbackHandle(Envelope envelope, AMQP.BasicProperties properties, byte[] body) { + try { + deliverCallback.handle(getChannel(), new Delivery(envelope, properties, body)); + } finally { + handlingDeliveryCount.decrementAndGet(); + } + } + }; + + channel.basicConsume(queue, false, consumerTag, false, exclusive, arguments, consumer); + return consumer; + } } } diff --git a/rabbitmq/src/test/groovy/io/micronaut/rabbitmq/AbstractRabbitMQClusterTest.groovy b/rabbitmq/src/test/groovy/io/micronaut/rabbitmq/AbstractRabbitMQClusterTest.groovy new file mode 100644 index 000000000..db6d0ebd1 --- /dev/null +++ b/rabbitmq/src/test/groovy/io/micronaut/rabbitmq/AbstractRabbitMQClusterTest.groovy @@ -0,0 +1,119 @@ +package io.micronaut.rabbitmq + +import com.github.dockerjava.api.model.HealthCheck +import io.micronaut.context.ApplicationContext +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import org.testcontainers.containers.BindMode +import org.testcontainers.containers.GenericContainer +import org.testcontainers.containers.InternetProtocol +import org.testcontainers.containers.Network +import org.testcontainers.containers.output.Slf4jLogConsumer +import org.testcontainers.containers.wait.strategy.AbstractWaitStrategy +import org.testcontainers.containers.wait.strategy.Wait +import org.testcontainers.utility.DockerImageName +import spock.lang.Specification +import spock.util.concurrent.PollingConditions + +import java.time.Duration + +class AbstractRabbitMQClusterTest extends Specification { + private static final int AMQP_PORT = 5672 + private static final DockerImageName RABBIT_IMAGE = DockerImageName.parse("library/rabbitmq:3.8-management") + private static final Logger log = LoggerFactory.getLogger(AbstractRabbitMQClusterTest.class) + private static final String CLUSTER_COOKIE = "test-cluster" + private static final String RABBIT_CONFIG_PATH = ClassLoader.getSystemResource("rabbit/rabbitmq.conf").getPath() + private static final String RABBIT_DEFINITIONS_PATH = ClassLoader.getSystemResource("rabbit/definitions.json").getPath() + private static final Network mqClusterNet = Network.newNetwork() + + public static final String EXCHANGE = "test-exchange" + public static final String QUEUE = "test-durable-queue" + public static final GenericContainer NODE1_CONT = new GenericContainer<>(RABBIT_IMAGE) + public static final GenericContainer NODE2_CONT = new GenericContainer<>(RABBIT_IMAGE) + public static final GenericContainer NODE3_CONT = new GenericContainer<>(RABBIT_IMAGE) + public static int node1Port + public static int node2Port + public static int node3Port + + + static { + PollingConditions until = new PollingConditions(timeout: 60) + getNodePorts() + log.info("rabbit.conf path: {}", RABBIT_CONFIG_PATH) + log.info("rabbit definitions path: {}", RABBIT_DEFINITIONS_PATH) + log.info("rabbit node ports: {}, {}, {}", node1Port, node2Port, node3Port) + + configureContainer(NODE1_CONT, "rabbitmq1", node1Port) + // first node must boot up completely so that the other nodes can join the new cluster + NODE1_CONT.waitingFor(Wait.forHealthcheck().withStartupTimeout(Duration.ofMinutes(1))) + NODE1_CONT.start() + log.info("first node startup complete") + + configureContainer(NODE2_CONT, "rabbitmq2", node2Port) + configureContainer(NODE3_CONT, "rabbitmq3", node3Port) + // node 2 and 3 may start up in parallel as they can join the already existing cluster + NODE2_CONT.waitingFor(new DoNotWaitStrategy()) + NODE3_CONT.waitingFor(new DoNotWaitStrategy()) + NODE2_CONT.start() + NODE3_CONT.start() + until.eventually { + assert NODE2_CONT.isHealthy() + assert NODE3_CONT.isHealthy() + } + log.info("cluster startup complete") + } + + protected ApplicationContext startContext(Map additionalConfig = [:]) { + Map properties = ["spec.name" : getClass().simpleName] + properties.put("rabbitmq.servers.node1.port", node1Port) + properties.put("rabbitmq.servers.node2.port", node2Port) + properties.put("rabbitmq.servers.node3.port", node3Port) + properties << additionalConfig + + log.info("context properties: {}", properties) + ApplicationContext.run(properties, "test") + } + + private static getNodePorts() { + try (ServerSocket s1 = new ServerSocket(0) + ServerSocket s2 = new ServerSocket(0) + ServerSocket s3 = new ServerSocket(0)) { + node1Port = s1.getLocalPort() + node2Port = s2.getLocalPort() + node3Port = s3.getLocalPort() + } + } + + private static configureContainer(GenericContainer mqContainer, String hostname, int nodePort) { + mqContainer + .withEnv("RABBITMQ_ERLANG_COOKIE", CLUSTER_COOKIE) + .withFileSystemBind(RABBIT_CONFIG_PATH, "/etc/rabbitmq/rabbitmq.conf", BindMode.READ_ONLY) + .withFileSystemBind(RABBIT_DEFINITIONS_PATH, "/etc/rabbitmq/definitions.json", BindMode.READ_ONLY) + .withNetwork(mqClusterNet) + .withLogConsumer(new Slf4jLogConsumer(log).withPrefix(hostname)) + .withCreateContainerCmdModifier(cmd -> cmd + .withHostName(hostname) + .withHealthcheck(new HealthCheck() + .withTest(Arrays.asList("CMD-SHELL", "rabbitmqctl status")) + .withStartPeriod(Duration.ofMinutes(4).toNanos()) + .withInterval(Duration.ofSeconds(5).toNanos()) + .withRetries(10) + .withTimeout(Duration.ofSeconds(5).toNanos()))) + // Use fixed port binding, because the dynamic port binding would use different port on each container start. + // These changing ports would make any reconnect attempt impossible, as the client assumes that the broker + // address does not change. + addPortBinding(mqContainer, nodePort, AMQP_PORT) + } + + private static addPortBinding(GenericContainer cont, int hostPort, int contPort) { + cont.getPortBindings().add(String.format("%d:%d/%s", + hostPort, contPort, InternetProtocol.TCP.toDockerNotation())) + } + + private static class DoNotWaitStrategy extends AbstractWaitStrategy { + @Override + protected void waitUntilReady() { + // NOOP - do not wait + } + } +} diff --git a/rabbitmq/src/test/groovy/io/micronaut/rabbitmq/listener/ConsumerRecoverySpec.groovy b/rabbitmq/src/test/groovy/io/micronaut/rabbitmq/listener/ConsumerRecoverySpec.groovy new file mode 100644 index 000000000..31ce979c2 --- /dev/null +++ b/rabbitmq/src/test/groovy/io/micronaut/rabbitmq/listener/ConsumerRecoverySpec.groovy @@ -0,0 +1,271 @@ +package io.micronaut.rabbitmq.listener + +import com.github.dockerjava.api.DockerClient +import com.rabbitmq.client.* +import io.micronaut.context.ApplicationContext +import io.micronaut.context.annotation.Requires +import io.micronaut.messaging.annotation.MessageBody +import io.micronaut.rabbitmq.AbstractRabbitMQClusterTest +import io.micronaut.rabbitmq.annotation.Queue +import io.micronaut.rabbitmq.annotation.RabbitListener +import io.micronaut.rabbitmq.bind.RabbitConsumerState +import io.micronaut.rabbitmq.exception.RabbitListenerException +import io.micronaut.rabbitmq.exception.RabbitListenerExceptionHandler +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import org.testcontainers.DockerClientFactory +import org.testcontainers.containers.GenericContainer +import spock.lang.Shared +import spock.util.concurrent.PollingConditions + +import java.util.concurrent.Executors +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException + +import static org.hamcrest.MatcherAssert.assertThat +import static org.hamcrest.Matchers.equalTo + +class ConsumerRecoverySpec extends AbstractRabbitMQClusterTest { + private static final Logger log = LoggerFactory.getLogger(ConsumerRecoverySpec.class) + private static final DockerClient DOCKER_CLIENT = DockerClientFactory.lazyClient() + + @Shared + private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor() + @Shared + private Set publishedMessages = new LinkedHashSet<>() + @Shared + private boolean enablePublisher = false + + def setupSpec() { + /* + * The current Micronaut publisher implementation has a flaw in detecting unroutable drop/return messages + * in a Rabbit cluster setup. It considers the messages as published even if the broker did not enqueue it. + * So for this test a simple custom publisher is used that detects unpublished messages. + */ + ConnectionFactory connectionFactory = new ConnectionFactory() + connectionFactory.setPort(node3Port) + connectionFactory.newConnection().openChannel() + .map(ch -> { + try { + ch.confirmSelect() + // returned messages must not count as published + ch.addReturnListener(new ReturnCallback() { + @Override + void handle(Return r) { + String returned = new String(r.getBody()) + publishedMessages.remove(returned) + log.warn("{} publish message returned: {}", publishedMessages.size(), returned) + } + }) + } catch (IOException e) { + log.error("failed to set confirmSelect", e) + } + return ch + }) + .ifPresent( + ch -> executorService.scheduleWithFixedDelay(() -> { + if (!enablePublisher) { + return + } + String msg = UUID.randomUUID().toString() + try { + publishedMessages.add(msg) + ch.basicPublish(EXCHANGE, "", true, + new AMQP.BasicProperties.Builder().deliveryMode(2).build(), + msg.getBytes()) + if (ch.waitForConfirms(1000)) { + log.info("publish ack") + } + } catch (IOException | RuntimeException | InterruptedException | TimeoutException e) { + publishedMessages.remove(msg) + log.error("failed to publish: {}", e.getMessage()) + } + }, 500, 500, TimeUnit.MILLISECONDS)) + } + + def cleanupSpec() { + executorService.shutdown() + } + + def setup() { + publishedMessages.clear() + enablePublisher = true + } + + def "test restart of Node1 with consumer connected to Node1"() { + given: + ApplicationContext ctx = startContext(["connectToNode": "node1"]) + TestConsumer consumer = ctx.getBean(TestConsumer) + awaitPublishConsumeOfMessages(consumer) + + when: + restartContainer(NODE1_CONT) + + then: + awaitPublishConsumeOfMessages(consumer) + stopPublishAndAssertAllConsumed(consumer) + + cleanup: + ctx.close() + } + + def "test restart of Node1 with consumer connected to Node2"() { + given: + ApplicationContext ctx = startContext(["connectToNode": "node2"]) + TestConsumer consumer = ctx.getBean(TestConsumer) + awaitPublishConsumeOfMessages(consumer) + + when: + restartContainer(NODE1_CONT) + + then: + awaitPublishConsumeOfMessages(consumer) + stopPublishAndAssertAllConsumed(consumer) + + cleanup: + ctx.close() + } + + def "test restart of Node2 with consumer connected to Node1"() { + given: + ApplicationContext ctx = startContext(["connectToNode": "node1"]) + TestConsumer consumer = ctx.getBean(TestConsumer) + awaitPublishConsumeOfMessages(consumer) + + when: + restartContainer(NODE2_CONT) + + then: + awaitPublishConsumeOfMessages(consumer) + stopPublishAndAssertAllConsumed(consumer) + + cleanup: + ctx.close() + } + + def "test restart of Node2 with consumer connected to Node2"() { + given: + ApplicationContext ctx = startContext(["connectToNode": "node2"]) + TestConsumer consumer = ctx.getBean(TestConsumer) + awaitPublishConsumeOfMessages(consumer) + + when: + restartContainer(NODE2_CONT) + + then: + awaitPublishConsumeOfMessages(consumer) + stopPublishAndAssertAllConsumed(consumer) + + cleanup: + ctx.close() + } + + def "test consumer recovery after delivery acknowledgement timeout"() { + given: + ApplicationContext ctx = startContext(["connectToNode": "node3"]) + SlowTestConsumer slowConsumer = ctx.getBean(SlowTestConsumer) + PollingConditions until = new PollingConditions(timeout: 120) + + when: + until.eventually { + RabbitListenerException e = slowConsumer.lastException + assert e != null && e.getCause() instanceof AlreadyClosedException + } + slowConsumer.doSlowdown = false; + + then: + stopPublishAndAssertAllConsumed(slowConsumer) + + cleanup: + ctx.close() + } + + + @Requires(property = "spec.name", value = "ConsumerRecoverySpec") + @Requires(property = "connectToNode", value = "node1") + @RabbitListener(connection = "node1") + static class Node1Consumer extends TestConsumer { + } + + @Requires(property = "spec.name", value = "ConsumerRecoverySpec") + @Requires(property = "connectToNode", value = "node2") + @RabbitListener(connection = "node2") + static class Node2Consumer extends TestConsumer { + } + + @Requires(property = "spec.name", value = "ConsumerRecoverySpec") + @Requires(property = "connectToNode", value = "node3") + @RabbitListener(connection = "node3") + static class SlowTestConsumer extends TestConsumer { + public boolean doSlowdown = true + + @Override + @Queue(value = QUEUE, prefetch = 5) + void handleMessage(@MessageBody String body) { + super.handleMessage(body) + + if (doSlowdown && consumedMessages.size() % 10 == 0) { + Thread.sleep(20000) // simulate slow processing + log.info("slow message processing complete") + } + } + } + + private void awaitPublishConsumeOfMessages(TestConsumer consumer) { + PollingConditions until = new PollingConditions(timeout: 60) + int targetPubCount = publishedMessages.size() + 10 + int targetConCount = consumer.consumedMessages.size() + 10 + + until.eventually { + assert publishedMessages.size() > targetPubCount + assert consumer.consumedMessages.size() > targetConCount + } + } + + private void stopPublishAndAssertAllConsumed(TestConsumer consumer) { + PollingConditions until = new PollingConditions(timeout: 60) + enablePublisher = false + + until.eventually { + assertThat "all published messages must be consumed", + publishedMessages, equalTo(consumer.consumedMessages) + } + } + + private static restartContainer(GenericContainer container) throws InterruptedException { + PollingConditions until = new PollingConditions(timeout: 60) + + log.info("stopping container: {}", container.getContainerId()) + DOCKER_CLIENT.stopContainerCmd(container.getContainerId()).exec() + log.info("re-starting container: {}", container.getContainerId()) + DOCKER_CLIENT.startContainerCmd(container.getContainerId()).exec() + until.eventually { + assert container.isHealthy() + } + log.info("started container: {}", container.getContainerId()) + } +} + +abstract class TestConsumer implements RabbitListenerExceptionHandler { + static final Logger log = LoggerFactory.getLogger(TestConsumer.class) + public final Set consumedMessages = new LinkedHashSet<>() + public RabbitListenerException lastException + + @Queue(AbstractRabbitMQClusterTest.QUEUE) + void handleMessage(@MessageBody String body) { + consumedMessages.add(body) + log.info("{} received: {}", consumedMessages.size(), body) + } + + @Override + void handle(RabbitListenerException e) { + lastException = e + String msg = e.getMessageState() + .map(RabbitConsumerState::getBody) + .map(String::new) + .orElse("<>") + consumedMessages.remove(msg) + log.warn("{} failed to consume: {}", consumedMessages.size(), msg, e) + } +} diff --git a/rabbitmq/src/test/resources/rabbit/definitions.json b/rabbitmq/src/test/resources/rabbit/definitions.json new file mode 100644 index 000000000..5a2d9bc38 --- /dev/null +++ b/rabbitmq/src/test/resources/rabbit/definitions.json @@ -0,0 +1,67 @@ +{ + "rabbit_version": "3.8.2", + "rabbitmq_version": "3.8.2", + "users": [ + { + "name": "guest", + "password_hash": "G0NloEh/Nnuyyqh0X3GZG2o3yaiokMXzK7PMaZYSSv4NZesQ", + "hashing_algorithm": "rabbit_password_hashing_sha256", + "tags": "administrator" + } + ], + "vhosts": [ + { + "name": "/" + } + ], + "permissions": [ + { + "user": "guest", + "vhost": "/", + "configure": ".*", + "write": ".*", + "read": ".*" + } + ], + "topic_permissions": [], + "parameters": [], + "global_parameters": [ + { + "name": "cluster_name", + "value": "rabbit@e0d6c0a4fb65" + } + ], + "policies": [], + "queues": [ + { + "name": "test-durable-queue", + "vhost": "/", + "durable": true, + "auto_delete": false, + "arguments": { + "x-queue-type": "classic" + } + } + ], + "exchanges": [ + { + "name": "test-exchange", + "vhost": "/", + "type": "fanout", + "durable": true, + "auto_delete": false, + "internal": false, + "arguments": {} + } + ], + "bindings": [ + { + "source": "test-exchange", + "vhost": "/", + "destination": "test-durable-queue", + "destination_type": "queue", + "routing_key": "", + "arguments": {} + } + ] +} diff --git a/rabbitmq/src/test/resources/rabbit/rabbitmq.conf b/rabbitmq/src/test/resources/rabbit/rabbitmq.conf new file mode 100644 index 000000000..7ff6cf3a2 --- /dev/null +++ b/rabbitmq/src/test/resources/rabbit/rabbitmq.conf @@ -0,0 +1,12 @@ +loopback_users.guest = false +listeners.tcp.default = 5672 +management.tcp.port = 15672 + +consumer_timeout = 5000 + +management.load_definitions = /etc/rabbitmq/definitions.json + +cluster_formation.peer_discovery_backend = classic_config +cluster_formation.classic_config.nodes.1 = rabbit@rabbitmq1 +cluster_formation.classic_config.nodes.2 = rabbit@rabbitmq2 +cluster_formation.classic_config.nodes.3 = rabbit@rabbitmq3