From f6be7e18cb5b8e0d3ef2d21a1310ce425a5748a3 Mon Sep 17 00:00:00 2001 From: McMichailidis Date: Tue, 13 Aug 2019 15:24:11 +0300 Subject: [PATCH] Reworked initialize function of broker tester --- README.md | 4 +- .../mmichaildis/amqprunner/BrokerManager.java | 46 +++--- .../amqprunner/broker/AssertionTime.java | 6 +- .../broker/AssertionVerification.java | 20 +-- .../amqprunner/broker/BrokerTester.java | 147 +++++++++--------- .../amqprunner/broker/ReferenceHolder.java | 10 +- .../util/PortExtractingLauncherListener.java | 10 +- .../amqprunner/util/StreamHelpers.java | 6 +- .../BrokerManagerBasicReflectionTest.java | 4 +- .../broker/AssertionTimeTester.java | 18 +-- .../amqprunner/broker/BrokerTesterTest.java | 70 +++------ 11 files changed, 165 insertions(+), 176 deletions(-) diff --git a/README.md b/README.md index b08c91d..4535af0 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,9 @@ # AmqpRunner [![Build Status](https://travis-ci.org/mmichailidis/AmqpRunner.svg?branch=master)](https://travis-ci.org/mmichailidis/AmqpRunner) - [![codecov](https://codecov.io/gh/mmichailidis/AmqpRunner/branch/master/graph/badge.svg)](https://codecov.io/gh/mmichailidis/AmqpRunner) - [![Maven Central](https://img.shields.io/maven-central/v/gr.mmichailidis/amqprunner.svg?label=Maven%20Central)](https://search.maven.org/search?q=g:%22gr.mmichailidis%22%20AND%20a:%22amqprunner%22) +[![Codacy Badge](https://api.codacy.com/project/badge/Grade/a26f2d10a3384dec9832bec64fd082fa)](https://www.codacy.com/app/mmichailidis/AmqpRunner?utm_source=github.com&utm_medium=referral&utm_content=mmichailidis/AmqpRunner&utm_campaign=Badge_Grade) +[![License](https://img.shields.io/github/license/mmichailidis/amqprunner.svg)](https://opensource.org/licenses/Apache-2.0) AmqpRunner is a JUnit runner that provides a fluent api for creating in memory amqp and validating the packets that reached it. It extends diff --git a/src/main/java/gr/mmichaildis/amqprunner/BrokerManager.java b/src/main/java/gr/mmichaildis/amqprunner/BrokerManager.java index c0be4cd..a1a414e 100644 --- a/src/main/java/gr/mmichaildis/amqprunner/BrokerManager.java +++ b/src/main/java/gr/mmichaildis/amqprunner/BrokerManager.java @@ -21,14 +21,19 @@ import gr.mmichaildis.amqprunner.util.PortExtractingLauncherListener; import io.vavr.collection.Stream; import io.vavr.control.Option; +import io.vavr.control.Try; import lombok.extern.slf4j.Slf4j; import org.apache.qpid.server.SystemLauncher; import org.apache.qpid.server.SystemLauncherListener.DefaultSystemLauncherListener; import java.io.File; import java.net.URL; +import java.util.Map; import java.util.*; +import static gr.mmichaildis.amqprunner.util.StreamHelpers.not; +import static gr.mmichaildis.amqprunner.util.StreamHelpers.replaceWith; +import static io.vavr.API.*; import static java.lang.Thread.sleep; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; @@ -47,7 +52,11 @@ public class BrokerManager { private static final Long SLEEP_STEP = 100L; private final String introduction; - private static ReferenceHolder refHolder; + /** + * The path that contains the configuration file for qpid initialization. + */ + private static final String INITIAL_CONFIG_PATH = "amqp.json"; + private static final ReferenceHolder refHolder = new ReferenceHolder(); private final String username; private final String password; @@ -82,8 +91,9 @@ public BrokerManager(final String username, this.requestedAmqpPort = requestedAmqpPort; this.requestedWorkPath = requestedWorkPath; this.requestedLogPath = requestedLogPath; - refHolder = new ReferenceHolder(); - refHolder.setCleanUpList(Collections.synchronizedList(new LinkedList<>())); + + refHolder.setQueueCleanUpList(Collections.synchronizedList(new LinkedList<>())); + refHolder.setExchangeCleanUpList(Collections.synchronizedList(new LinkedList<>())); introduction = "[BrokerManager" + (name.isEmpty() ? "" : "-" + name) + "] "; // this.systemLauncher = new SystemLauncher(); @@ -97,11 +107,6 @@ public BrokerManager(final String username, this.uuid = UUID.randomUUID(); } - /** - * The path that contains the configuration file for qpid initialization. - */ - private static final String INITIAL_CONFIG_PATH = "amqp.json"; - private static final String INITIAL_CONFIG_PATH_NETWORK = "amqpNetwork.json"; /** * Start the broker with the properties that was initialized with. @@ -160,15 +165,15 @@ public void stopBroker() { systemLauncher.shutdown(); log.info("SystemLauncher shutdown complete. Cleaning up."); - File db = new File(requestedWorkPath + uuid); - File log = new File(requestedLogPath + uuid); + final File db = new File(requestedWorkPath + uuid); + final File log = new File(requestedLogPath + uuid); Stream.of(db, log) .filter(File::exists) .forEach(BrokerManager::deleteFolder); try { - Thread.sleep(5000); + sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } @@ -200,22 +205,27 @@ private static void deleteFolder(File folder) { private static void deleteFile(File file, Integer retryStep) { try { - Thread.sleep(2500 * retryStep); + sleep(2500 * retryStep); } catch (InterruptedException e) { e.printStackTrace(); } - if (!file.delete() && retryStep < 3) { - deleteFile(file, retryStep + 1); - } else { - log.error("File {} failed to be deleted after {} retries", file, retryStep); - } + Try.run(() -> sleep(2500 * retryStep)) + .filter(ignore -> retryStep < 3) + .map(ignore -> file.delete()) + .filter(not(Boolean::booleanValue)) + .map(replaceWith(retryStep < 3)) + .forEach(hasMoreSteps -> Match(hasMoreSteps).of( + Case($(true), run(() -> deleteFile(file, retryStep + 1))), + Case($(false), run(() -> log.error("File {} failed to be deleted after {} retries", file, retryStep))) + )); } /** * Cleans up all the queues and exchanges in the broker. */ public void cleanUp() { - refHolder.getCleanUpList().forEach(r -> r.apply(null)); + refHolder.getQueueCleanUpList().forEach(r -> r.apply(null)); + refHolder.getExchangeCleanUpList().forEach(r -> r.apply(null)); } private Map createSystemConfig() { diff --git a/src/main/java/gr/mmichaildis/amqprunner/broker/AssertionTime.java b/src/main/java/gr/mmichaildis/amqprunner/broker/AssertionTime.java index ecf2f25..bde8066 100644 --- a/src/main/java/gr/mmichaildis/amqprunner/broker/AssertionTime.java +++ b/src/main/java/gr/mmichaildis/amqprunner/broker/AssertionTime.java @@ -42,9 +42,9 @@ final class AssertionTime { * set the target to {@code totalMillis} and reset the counter. * * @param totalMillis The millis that will be either setup as the new target or will put the current target ahead. - * @param override {@code true} to override the target, {@code false} to push ahead. + * @param override {@code true} to override the target, {@code false} to push ahead. */ - void updateTotalSeconds(Long totalMillis, Boolean override) { + protected void updateTotalSeconds(Long totalMillis, Boolean override) { if (override) { this.totalSeconds.set(totalMillis); currentMillis.set(0L); @@ -60,7 +60,7 @@ void updateTotalSeconds(Long totalMillis, Boolean override) { * @param sleepStep the amount of time to update the counter. * @return The difference between the counter and the target. */ - Long updateCurrentMillis(Long sleepStep) { + protected Long updateCurrentMillis(Long sleepStep) { return totalSeconds.get() - currentMillis.updateAndGet(curr -> curr + sleepStep); } } diff --git a/src/main/java/gr/mmichaildis/amqprunner/broker/AssertionVerification.java b/src/main/java/gr/mmichaildis/amqprunner/broker/AssertionVerification.java index 7530873..1a2468a 100644 --- a/src/main/java/gr/mmichaildis/amqprunner/broker/AssertionVerification.java +++ b/src/main/java/gr/mmichaildis/amqprunner/broker/AssertionVerification.java @@ -69,8 +69,8 @@ public boolean isValid() { * the {@link AssertionError} found in all the registered assertions. * * @return the {@link List} which contains all the {@link AssertionError} found. - * If the {@link AssertionVerification#isValid()} was not called before - * this then the {@link List} is empty. + * If the {@link AssertionVerification#isValid()} was not called before + * this then the {@link List} is empty. */ public List getErrors() { return assertionErrors; @@ -82,7 +82,7 @@ public List getErrors() { * @param identifier The assertions identifier. * @param valid The valid status of the identified assertion. */ - void setValid(String identifier, boolean valid) { + protected void setValid(String identifier, boolean valid) { final AssertionPair pair = this.response.get(identifier); pair.setValid(valid); this.response.put(identifier, pair); @@ -94,7 +94,7 @@ void setValid(String identifier, boolean valid) { * @param identifier The assertions identifier. * @param e The {@link AssertionError} that will be registered for the identified assertion. */ - void setAssertionError(String identifier, AssertionError e) { + protected void setAssertionError(String identifier, AssertionError e) { final AssertionPair pair = this.response.get(identifier); pair.setAssertionError(e); this.response.put(identifier, pair); @@ -106,7 +106,7 @@ void setAssertionError(String identifier, AssertionError e) { * * @param identifier The identifier that will identify each assertion. */ - void register(String identifier, String userProvidedIdentifier) { + protected void register(String identifier, String userProvidedIdentifier) { response.put(identifier, AssertionPair.initial(userProvidedIdentifier)); block.put(identifier, new AssertionTime(0)); } @@ -117,9 +117,9 @@ void register(String identifier, String userProvidedIdentifier) { * * @param nextName The assertion name which will be used to create the unique identifier. * @return The unique identifier which should be used for registration and identification on - * the main {@link Thread}. + * the main {@link Thread}. */ - String getNextName(String nextName) { + protected String getNextName(String nextName) { Integer identifier = nameProvider.getOrDefault(nextName, 0); identifier += 1; nameProvider.put(nextName, identifier); @@ -131,7 +131,7 @@ String getNextName(String nextName) { * * @param identifier The assertions identifier. */ - void clearAssertionError(String identifier) { + protected void clearAssertionError(String identifier) { final AssertionPair pair = response.get(identifier); pair.setAssertionError(null); response.put(identifier, pair); @@ -143,7 +143,7 @@ void clearAssertionError(String identifier) { * @param identifier The identifier that will have it sleep increased. * @param sleepTime The N seconds that will be increase. */ - void updateBlock(String identifier, Integer sleepTime) { + protected void updateBlock(String identifier, Integer sleepTime) { this.block.get(identifier) .updateTotalSeconds(sleepTime * SECOND_TO_MILLIS, false); } @@ -166,7 +166,7 @@ void setBlock(String identifier, Integer sleepTime) { * * @param sleepStep the step in time * @return {@code true} if at least one assertion requires more time. {@code false} if and only if all - * the assertions are completed and responds they require no more time. + * the assertions are completed and responds they require no more time. */ public boolean shouldBlock(Long sleepStep) { return block.values().stream() diff --git a/src/main/java/gr/mmichaildis/amqprunner/broker/BrokerTester.java b/src/main/java/gr/mmichaildis/amqprunner/broker/BrokerTester.java index 49163ac..4aa57b4 100644 --- a/src/main/java/gr/mmichaildis/amqprunner/broker/BrokerTester.java +++ b/src/main/java/gr/mmichaildis/amqprunner/broker/BrokerTester.java @@ -23,6 +23,7 @@ import com.rabbitmq.client.Delivery; import gr.mmichaildis.amqprunner.BrokerManager; import gr.mmichaildis.amqprunner.TestFunction; +import io.vavr.Tuple; import io.vavr.collection.List; import io.vavr.collection.Stream; import io.vavr.control.Try; @@ -44,6 +45,7 @@ import java.util.stream.IntStream; import static gr.mmichaildis.amqprunner.util.StreamHelpers.not; +import static gr.mmichaildis.amqprunner.util.StreamHelpers.replaceWith; import static java.lang.Thread.sleep; /** @@ -518,88 +520,25 @@ public BrokerTester addObjectMapper(final ObjectMapper objectMapper) { * @throws TimeoutException in-case the connection has timeout. */ public Sender initialize(final SenderOptions senderOptions) throws IOException, TimeoutException { - Connection connection = receiverOptions.getConnectionFactory().newConnection(); + final Connection connection = receiverOptions.getConnectionFactory().newConnection(); channel = connection.createChannel(); - List exchangeNames = List.empty(); - - for (AMQPBinding binding : bindings.values()) { - channel.queueDeclare( - binding.getQueueProperties().getName(), - binding.getQueueProperties().isDurable(), - binding.getQueueProperties().isExclusive(), - binding.getQueueProperties().isAutoDelete(), - binding.getQueueProperties().getArguments() - ); - - channel.exchangeDeclare( - binding.getExchangeProperties().getName(), - binding.getExchangeProperties().getType().getType(), - binding.getExchangeProperties().isDurable(), - binding.getExchangeProperties().isAutoDelete(), - binding.getExchangeProperties().getArguments() - ); - - channel.queueBind(binding.getQueueProperties().getName(), binding.getExchangeProperties().getName(), - binding.getRoutingKey()); - - exchangeNames = exchangeNames.append(binding.getExchangeProperties().getName()); - - referenceHolder.getCleanUpList().add(ignore -> { - try { - channel.queueDeleteNoWait(binding.getQueueProperties().getName(), false, false); - } catch (IOException e) { - e.printStackTrace(); - } - return null; - }); - } - - exchangeNames.forEach(exchangeName -> referenceHolder.getCleanUpList().add(ignore -> { - try { - channel.exchangeDeleteNoWait(exchangeName, false); - } catch (IOException e) { - e.printStackTrace(); - } - return null; - })); + List.ofAll(bindings.values()) + .map(binding -> Tuple.of( + declareQueue(channel, binding.getQueueProperties()).toOption(), + declareExchange(channel, binding.getExchangeProperties()).toOption(), + binding.getRoutingKey() + )) + .filter(tuple -> tuple._1().isDefined()) + .filter(tuple -> tuple._2().isDefined()) + .forEach(tuple -> Try.of(() -> channel.queueBind(tuple._1.get(), tuple._2.get(), tuple._3)) + .onFailure(Throwable::printStackTrace)); Stream.ofAll(singleQueues.values()) - .forEach(queue -> Try.run( - () -> channel.queueDeclare( - queue.getName(), - queue.isDurable(), - queue.isExclusive(), - queue.isAutoDelete(), - queue.getArguments())) - .onFailure(Throwable::printStackTrace) - .forEach(ignore -> referenceHolder.getCleanUpList().add(ignore2 -> { - try { - channel.queueDeleteNoWait(queue.getName(), false, false); - } catch (IOException e) { - e.printStackTrace(); - } - return null; - }))); + .forEach(queue -> declareQueue(channel, queue)); Stream.ofAll(singleExchanges.values()) - .forEach(exchange -> Try.run( - () -> channel.exchangeDeclare( - exchange.getName(), - exchange.getType().getType(), - exchange.isDurable(), - exchange.isAutoDelete(), - exchange.getArguments() - )) - .onFailure(Throwable::printStackTrace) - .forEach(ignore -> referenceHolder.getCleanUpList().add(ignore2 -> { - try { - channel.exchangeDeleteNoWait(exchange.getName(), false); - } catch (IOException e) { - e.printStackTrace(); - } - return null; - }))); + .forEach(exchange -> declareExchange(channel, exchange)); latch.countDown(); @@ -618,6 +557,62 @@ public Sender initialize(final SenderOptions senderOptions) throws IOException, ); } + /** + * Declares a queue in the given {@link Channel} with the given {@link QueueProperties} defining a + * cleanUp function in the process. + * + * @param channel The {@link Channel} in which the queue will be declared. + * @param queueProperties The queue definition. + * @return A {@link Try} containing the queue name. + */ + private Try declareQueue(final Channel channel, + final QueueProperties queueProperties) { + return Try.run(() -> channel + .queueDeclare( + queueProperties.getName(), + queueProperties.isDurable(), + queueProperties.isExclusive(), + queueProperties.isAutoDelete(), + queueProperties.getArguments())) + .onFailure(Throwable::printStackTrace) + .map(ignore -> referenceHolder.getQueueCleanUpList() + .add(ignore2 -> + Try.run(() -> channel.queueDeleteNoWait(queueProperties.getName(), + false, false)) + .onFailure(Throwable::printStackTrace) + .getOrElse(() -> null) + )) + .map(replaceWith(queueProperties.getName())); + } + + /** + * Declares an exchange in the given {@link Channel} with the given {@link ExchangeProperties} defining a + * cleanUp function in the process. + * + * @param channel The {@link Channel} in which the exchange will be declared. + * @param exchangeProperties The exchange definition. + * @return A {@link Try} containing the queue name. + */ + private Try declareExchange(final Channel channel, + final ExchangeProperties exchangeProperties) { + return Try.run(() -> channel + .exchangeDeclare( + exchangeProperties.getName(), + exchangeProperties.getType().getType(), + exchangeProperties.isDurable(), + exchangeProperties.isAutoDelete(), + exchangeProperties.getArguments() + )) + .onFailure(Throwable::printStackTrace) + .map(ignore -> referenceHolder.getExchangeCleanUpList() + .add(ignore2 -> + Try.run(() -> channel.exchangeDeleteNoWait(exchangeProperties.getName(), false)) + .onFailure(Throwable::printStackTrace) + .getOrElse(() -> null) + )) + .map(replaceWith(exchangeProperties.getName())); + } + /** * Initializes the connection to {@link BrokerManager} and setup the {@link AMQPBinding}. The {@link Sender} has * the default {@link SenderOptions} which contain the given {@link ConnectionFactory}. diff --git a/src/main/java/gr/mmichaildis/amqprunner/broker/ReferenceHolder.java b/src/main/java/gr/mmichaildis/amqprunner/broker/ReferenceHolder.java index 0089e07..ee79577 100644 --- a/src/main/java/gr/mmichaildis/amqprunner/broker/ReferenceHolder.java +++ b/src/main/java/gr/mmichaildis/amqprunner/broker/ReferenceHolder.java @@ -16,11 +16,12 @@ package gr.mmichaildis.amqprunner.broker; import gr.mmichaildis.amqprunner.BrokerManager; -import java.util.List; -import java.util.function.Function; import lombok.Getter; import lombok.Setter; +import java.util.List; +import java.util.function.Function; + /** * A helper class containing references on {@link AssertionVerification} and a {@link List} which are meant * to be used by {@link BrokerManager} but updated by {@link BrokerTester}. @@ -30,6 +31,7 @@ @Getter @Setter public final class ReferenceHolder { - AssertionVerification assertionVerification; - List> cleanUpList; + protected AssertionVerification assertionVerification; + protected List> queueCleanUpList; + protected List> exchangeCleanUpList; } diff --git a/src/main/java/gr/mmichaildis/amqprunner/util/PortExtractingLauncherListener.java b/src/main/java/gr/mmichaildis/amqprunner/util/PortExtractingLauncherListener.java index 5b93e4f..0bf2897 100644 --- a/src/main/java/gr/mmichaildis/amqprunner/util/PortExtractingLauncherListener.java +++ b/src/main/java/gr/mmichaildis/amqprunner/util/PortExtractingLauncherListener.java @@ -33,12 +33,12 @@ public class PortExtractingLauncherListener implements SystemLauncherListener { @Override public void beforeStartup() { - + //empty } @Override public void errorOnStartup(final RuntimeException e) { - + //empty } @Override @@ -61,17 +61,17 @@ public void onContainerResolve(final SystemConfig systemConfig) { @Override public void onContainerClose(final SystemConfig systemConfig) { - + //empty } @Override public void onShutdown(final int exitCode) { - + //empty } @Override public void exceptionOnShutdown(final Exception e) { - + //empty } public Map getPorts() { diff --git a/src/main/java/gr/mmichaildis/amqprunner/util/StreamHelpers.java b/src/main/java/gr/mmichaildis/amqprunner/util/StreamHelpers.java index eb1867d..8c45317 100644 --- a/src/main/java/gr/mmichaildis/amqprunner/util/StreamHelpers.java +++ b/src/main/java/gr/mmichaildis/amqprunner/util/StreamHelpers.java @@ -15,15 +15,19 @@ */ package gr.mmichaildis.amqprunner.util; +import java.util.function.Function; import java.util.function.Predicate; /** * @author MMichailidis */ -public class StreamHelpers { +public abstract class StreamHelpers { public static Predicate not(final Predicate with) { return (final T t) -> with.negate().test(t); } + public static Function replaceWith(U to) { + return ignore -> to; + } } diff --git a/src/test/java/gr/mmichaildis/amqprunner/BrokerManagerBasicReflectionTest.java b/src/test/java/gr/mmichaildis/amqprunner/BrokerManagerBasicReflectionTest.java index 82f5720..e70ecac 100644 --- a/src/test/java/gr/mmichaildis/amqprunner/BrokerManagerBasicReflectionTest.java +++ b/src/test/java/gr/mmichaildis/amqprunner/BrokerManagerBasicReflectionTest.java @@ -23,8 +23,7 @@ import java.util.HashMap; import java.util.Map; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.*; import static org.mockito.Mockito.mock; /** @@ -52,6 +51,7 @@ public static class SomeTest { @Test public void demoTest() { + assertTrue(true); } } diff --git a/src/test/java/gr/mmichaildis/amqprunner/broker/AssertionTimeTester.java b/src/test/java/gr/mmichaildis/amqprunner/broker/AssertionTimeTester.java index c927e68..2c6b8d9 100644 --- a/src/test/java/gr/mmichaildis/amqprunner/broker/AssertionTimeTester.java +++ b/src/test/java/gr/mmichaildis/amqprunner/broker/AssertionTimeTester.java @@ -15,41 +15,41 @@ */ package gr.mmichaildis.amqprunner.broker; -import static org.junit.Assert.assertEquals; - import org.junit.Test; +import static org.junit.Assert.assertEquals; + /** * @author MMichailidis */ public class AssertionTimeTester { @Test - public void test_updateTotalSeconds_noOverride() { + public void testUpdateTotalSecondsNoOverride() { AssertionTime at = new AssertionTime(1); - assertEquals( 1000, (long) at.updateCurrentMillis(0L)); + assertEquals(1000, (long) at.updateCurrentMillis(0L)); at.updateTotalSeconds(100L, false); - assertEquals( 1100, (long) at.updateCurrentMillis(0L)); + assertEquals(1100, (long) at.updateCurrentMillis(0L)); } @Test - public void test_updateTotalSeconds_override() { + public void testUpdateTotalSecondsOverride() { AssertionTime at = new AssertionTime(1); - assertEquals( 1000, (long) at.updateCurrentMillis(0L)); + assertEquals(1000, (long) at.updateCurrentMillis(0L)); at.updateTotalSeconds(5000L, true); - assertEquals( 5000, (long) at.updateCurrentMillis(0L)); + assertEquals(5000, (long) at.updateCurrentMillis(0L)); } @Test public void updateCurrentMillis() { AssertionTime at = new AssertionTime(1); - assertEquals( 950, (long) at.updateCurrentMillis(50L)); + assertEquals(950, (long) at.updateCurrentMillis(50L)); } } \ No newline at end of file diff --git a/src/test/java/gr/mmichaildis/amqprunner/broker/BrokerTesterTest.java b/src/test/java/gr/mmichaildis/amqprunner/broker/BrokerTesterTest.java index d0b4e3c..84f1281 100644 --- a/src/test/java/gr/mmichaildis/amqprunner/broker/BrokerTesterTest.java +++ b/src/test/java/gr/mmichaildis/amqprunner/broker/BrokerTesterTest.java @@ -23,8 +23,6 @@ import io.vavr.Tuple; import io.vavr.Tuple2; import lombok.*; -import org.junit.After; -import org.junit.Before; import org.junit.Test; import reactor.core.publisher.Flux; import reactor.rabbitmq.Receiver; @@ -50,14 +48,6 @@ */ public class BrokerTesterTest { - @Before - public void setUp() throws Exception { - } - - @After - public void tearDown() throws Exception { - } - @Test public void defaultSleep() throws Exception { final ReferenceHolder referenceHolder = new ReferenceHolder(); @@ -326,9 +316,8 @@ public void declareBindingQueueReuse() throws Exception { @Test public void expectNextCount() throws Exception { - final ReferenceHolder referenceHolder = new ReferenceHolder(); - referenceHolder.setCleanUpList(Collections.synchronizedList(new LinkedList<>())); - final BrokerTester brokerTester = new BrokerTester(mock(ConnectionFactory.class), "TestingTest", referenceHolder); + final BrokerTester brokerTester = new BrokerTester(mock(ConnectionFactory.class), + "TestingTest", getReferenceHolder()); final TestPublisher testPublisher = TestPublisher.create(); @@ -354,9 +343,8 @@ public void expectNextCount() throws Exception { @Test public void expectNextCountIgnoreMoreEmissions() throws Exception { - final ReferenceHolder referenceHolder = new ReferenceHolder(); - referenceHolder.setCleanUpList(Collections.synchronizedList(new LinkedList<>())); - final BrokerTester brokerTester = new BrokerTester(mock(ConnectionFactory.class), "TestingTest", referenceHolder); + final BrokerTester brokerTester = new BrokerTester(mock(ConnectionFactory.class), + "TestingTest", getReferenceHolder()); final TestPublisher testPublisher = TestPublisher.create(); @@ -383,12 +371,10 @@ public void expectNextCountIgnoreMoreEmissions() throws Exception { @Test public void expectNextCountLessEmissions() throws Exception { - final ReferenceHolder referenceHolder = new ReferenceHolder(); - referenceHolder.setCleanUpList(Collections.synchronizedList(new LinkedList<>())); final String brokerName = "TestingTest"; final String queueName = "someQueue"; final BrokerTester brokerTester = new BrokerTester(mock(ConnectionFactory.class), - brokerName, referenceHolder); + brokerName, getReferenceHolder()); final TestPublisher testPublisher = TestPublisher.create(); @@ -423,13 +409,11 @@ public void expectNextCountLessEmissions() throws Exception { @Test public void expectNextCountUnexpectedEmissions() throws Exception { - final ReferenceHolder referenceHolder = new ReferenceHolder(); - referenceHolder.setCleanUpList(Collections.synchronizedList(new LinkedList<>())); final String brokerName = "TestingTest"; final String queueName = "someQueue"; final BrokerTester brokerTester = new BrokerTester(mock(ConnectionFactory.class), - brokerName, referenceHolder); + brokerName, getReferenceHolder()); final TestPublisher testPublisher = TestPublisher.create(); @@ -467,13 +451,11 @@ public void expectNextCountUnexpectedEmissions() throws Exception { @Test public void expectNextCountMultipleUnexpectedEmissions() throws Exception { - final ReferenceHolder referenceHolder = new ReferenceHolder(); - referenceHolder.setCleanUpList(Collections.synchronizedList(new LinkedList<>())); final String brokerName = "TestingTest"; final String queueName = "someQueue"; final BrokerTester brokerTester = new BrokerTester(mock(ConnectionFactory.class), - brokerName, referenceHolder); + brokerName, getReferenceHolder()); final TestPublisher testPublisher = TestPublisher.create(); @@ -515,13 +497,11 @@ public void expectNextCountMultipleUnexpectedEmissions() throws Exception { @Test public void expectNoEmissions() throws Exception { - final ReferenceHolder referenceHolder = new ReferenceHolder(); - referenceHolder.setCleanUpList(Collections.synchronizedList(new LinkedList<>())); final String brokerName = "TestingTest"; final String queueName = "someQueue"; final BrokerTester brokerTester = new BrokerTester(mock(ConnectionFactory.class), - brokerName, referenceHolder); + brokerName, getReferenceHolder()); final TestPublisher testPublisher = TestPublisher.create(); @@ -543,13 +523,11 @@ public void expectNoEmissions() throws Exception { @Test public void expectNoEmissionsHadEmissions() throws Exception { - final ReferenceHolder referenceHolder = new ReferenceHolder(); - referenceHolder.setCleanUpList(Collections.synchronizedList(new LinkedList<>())); final String brokerName = "TestingTest"; final String queueName = "someQueue"; final BrokerTester brokerTester = new BrokerTester(mock(ConnectionFactory.class), - brokerName, referenceHolder); + brokerName, getReferenceHolder()); final TestPublisher testPublisher = TestPublisher.create(); @@ -584,9 +562,8 @@ public void expectNoEmissionsHadEmissions() throws Exception { @Test public void assertNextWith() throws Exception { - final ReferenceHolder referenceHolder = new ReferenceHolder(); - referenceHolder.setCleanUpList(Collections.synchronizedList(new LinkedList<>())); - final BrokerTester brokerTester = new BrokerTester(mock(ConnectionFactory.class), "TestingTest", referenceHolder); + final BrokerTester brokerTester = new BrokerTester(mock(ConnectionFactory.class), + "TestingTest", getReferenceHolder()); final TestPublisher testPublisher = TestPublisher.create(); final Delivery mock1 = mock(Delivery.class); @@ -621,12 +598,10 @@ public void assertNextWith() throws Exception { @Test public void assertNextWithUnexpectedEmission() throws Exception { - final ReferenceHolder referenceHolder = new ReferenceHolder(); - referenceHolder.setCleanUpList(Collections.synchronizedList(new LinkedList<>())); final String brokerName = "TestingTest"; final String queueName = "someQueue"; final BrokerTester brokerTester = new BrokerTester(mock(ConnectionFactory.class), - brokerName, referenceHolder); + brokerName, getReferenceHolder()); final TestPublisher testPublisher = TestPublisher.create(); final Delivery mock1 = mock(Delivery.class); @@ -673,12 +648,10 @@ public void assertNextWithUnexpectedEmission() throws Exception { @Test public void assertNextWithTooLittleEmissions() throws Exception { - final ReferenceHolder referenceHolder = new ReferenceHolder(); - referenceHolder.setCleanUpList(Collections.synchronizedList(new LinkedList<>())); final String brokerName = "TestingTest"; final String queueName = "someQueue"; final BrokerTester brokerTester = new BrokerTester(mock(ConnectionFactory.class), - brokerName, referenceHolder); + brokerName, getReferenceHolder()); final TestPublisher testPublisher = TestPublisher.create(); final Delivery mock1 = mock(Delivery.class); @@ -722,12 +695,10 @@ public void assertNextWithTooLittleEmissions() throws Exception { @Test public void assertNextWithAssertionFailure() throws Exception { - final ReferenceHolder referenceHolder = new ReferenceHolder(); - referenceHolder.setCleanUpList(Collections.synchronizedList(new LinkedList<>())); final String brokerName = "TestingTest"; final String queueName = "someQueue"; final BrokerTester brokerTester = new BrokerTester(mock(ConnectionFactory.class), - brokerName, referenceHolder); + brokerName, getReferenceHolder()); final TestPublisher testPublisher = TestPublisher.create(); final Delivery mock1 = mock(Delivery.class); @@ -770,12 +741,10 @@ public void assertNextWithAssertionFailure() throws Exception { @Test public void assertNextWithObjectMapperIOException() throws Exception { - final ReferenceHolder referenceHolder = new ReferenceHolder(); - referenceHolder.setCleanUpList(Collections.synchronizedList(new LinkedList<>())); final String brokerName = "TestingTest"; final String queueName = "someQueue"; final BrokerTester brokerTester = new BrokerTester(mock(ConnectionFactory.class), - brokerName, referenceHolder); + brokerName, getReferenceHolder()); final TestPublisher testPublisher = TestPublisher.create(); final Delivery mock1 = mock(Delivery.class); @@ -866,6 +835,15 @@ public Tuple2 weaveBrokerTesterForVerific return Tuple.of(assertionVerification, o); } + private ReferenceHolder getReferenceHolder() { + final ReferenceHolder referenceHolder = new ReferenceHolder(); + + referenceHolder.setQueueCleanUpList(Collections.synchronizedList(new LinkedList<>())); + referenceHolder.setExchangeCleanUpList(Collections.synchronizedList(new LinkedList<>())); + + return referenceHolder; + } + @Getter @ToString @EqualsAndHashCode