diff --git a/muon-core/src/main/java/io/muoncore/channel/Channels.java b/muon-core/src/main/java/io/muoncore/channel/Channels.java index f9ef54a..09a620d 100644 --- a/muon-core/src/main/java/io/muoncore/channel/Channels.java +++ b/muon-core/src/main/java/io/muoncore/channel/Channels.java @@ -9,15 +9,15 @@ import io.muoncore.transport.client.RingBufferLocalDispatcher; import io.muoncore.transport.client.TransportMessageDispatcher; import reactor.Environment; -import reactor.core.Dispatcher; + import reactor.core.config.DispatcherType; import java.util.function.Function; public class Channels { - static Dispatcher WORK_DISPATCHER = Environment.newDispatcher(32768, 200, DispatcherType.THREAD_POOL_EXECUTOR); - public static Dispatcher EVENT_DISPATCHER = new RingBufferLocalDispatcher("channel", 32768); + static Dispatcher WORK_DISPATCHER = new Reactor2Dispatcher(Environment.newDispatcher(32768, 200, DispatcherType.THREAD_POOL_EXECUTOR)); + public static Dispatcher EVENT_DISPATCHER = new Reactor2Dispatcher(new RingBufferLocalDispatcher("channel", 32768)); public static void shutdown() { // EVENT_DISPATCHER.shutdown(); diff --git a/muon-core/src/main/java/io/muoncore/channel/Dispatcher.java b/muon-core/src/main/java/io/muoncore/channel/Dispatcher.java new file mode 100644 index 0000000..3b8c602 --- /dev/null +++ b/muon-core/src/main/java/io/muoncore/channel/Dispatcher.java @@ -0,0 +1,14 @@ +package io.muoncore.channel; + +import java.util.function.Consumer; + +public interface Dispatcher { + + void dispatch(E data, + Consumer eventConsumer, + Consumer errorConsumer); + + void tryDispatch(E data, + Consumer eventConsumer, + Consumer errorConsumer); +} diff --git a/muon-core/src/main/java/io/muoncore/channel/Reactor2Dispatcher.java b/muon-core/src/main/java/io/muoncore/channel/Reactor2Dispatcher.java new file mode 100644 index 0000000..c26a968 --- /dev/null +++ b/muon-core/src/main/java/io/muoncore/channel/Reactor2Dispatcher.java @@ -0,0 +1,21 @@ +package io.muoncore.channel; + +import lombok.AllArgsConstructor; + +import java.util.function.Consumer; + +@AllArgsConstructor +public class Reactor2Dispatcher implements Dispatcher { + + private reactor.core.Dispatcher internal; + + @Override + public void dispatch(E data, Consumer eventConsumer, Consumer errorConsumer) { + internal.dispatch(data, eventConsumer::accept, errorConsumer::accept); + } + + @Override + public void tryDispatch(E data, Consumer eventConsumer, Consumer errorConsumer) { + internal.tryDispatch(data, eventConsumer::accept, errorConsumer::accept); + } +} diff --git a/muon-core/src/main/java/io/muoncore/channel/impl/KeepAliveChannel.java b/muon-core/src/main/java/io/muoncore/channel/impl/KeepAliveChannel.java index 3459d14..b0dfc29 100644 --- a/muon-core/src/main/java/io/muoncore/channel/impl/KeepAliveChannel.java +++ b/muon-core/src/main/java/io/muoncore/channel/impl/KeepAliveChannel.java @@ -2,6 +2,7 @@ import io.muoncore.channel.Channel; import io.muoncore.channel.ChannelConnection; +import io.muoncore.channel.Dispatcher; import io.muoncore.channel.support.Scheduler; import io.muoncore.exception.MuonException; import io.muoncore.message.MuonInboundMessage; @@ -10,7 +11,6 @@ import io.muoncore.message.MuonOutboundMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.Dispatcher; import java.util.concurrent.TimeUnit; diff --git a/muon-core/src/main/java/io/muoncore/channel/impl/StandardAsyncChannel.java b/muon-core/src/main/java/io/muoncore/channel/impl/StandardAsyncChannel.java index 28b2a71..f9660a1 100644 --- a/muon-core/src/main/java/io/muoncore/channel/impl/StandardAsyncChannel.java +++ b/muon-core/src/main/java/io/muoncore/channel/impl/StandardAsyncChannel.java @@ -2,8 +2,8 @@ import io.muoncore.channel.Channel; import io.muoncore.channel.ChannelConnection; +import io.muoncore.channel.Dispatcher; import io.muoncore.exception.MuonException; -import reactor.core.Dispatcher; import java.util.Date; diff --git a/muon-core/src/main/java/io/muoncore/channel/impl/TimeoutChannel.java b/muon-core/src/main/java/io/muoncore/channel/impl/TimeoutChannel.java index 78a641b..390d954 100644 --- a/muon-core/src/main/java/io/muoncore/channel/impl/TimeoutChannel.java +++ b/muon-core/src/main/java/io/muoncore/channel/impl/TimeoutChannel.java @@ -2,12 +2,12 @@ import io.muoncore.channel.Channel; import io.muoncore.channel.ChannelConnection; +import io.muoncore.channel.Dispatcher; import io.muoncore.channel.support.Scheduler; import io.muoncore.exception.MuonException; import io.muoncore.message.MuonInboundMessage; import io.muoncore.message.MuonMessageBuilder; import io.muoncore.message.MuonOutboundMessage; -import reactor.core.Dispatcher; import java.util.concurrent.TimeUnit; diff --git a/muon-core/src/main/java/io/muoncore/channel/impl/WiretapChannel.java b/muon-core/src/main/java/io/muoncore/channel/impl/WiretapChannel.java index a1507aa..47083fc 100644 --- a/muon-core/src/main/java/io/muoncore/channel/impl/WiretapChannel.java +++ b/muon-core/src/main/java/io/muoncore/channel/impl/WiretapChannel.java @@ -2,10 +2,10 @@ import io.muoncore.channel.Channel; import io.muoncore.channel.ChannelConnection; +import io.muoncore.channel.Dispatcher; import io.muoncore.exception.MuonException; import io.muoncore.message.MuonMessage; import io.muoncore.transport.client.TransportMessageDispatcher; -import reactor.core.Dispatcher; public class WiretapChannel implements Channel { diff --git a/muon-core/src/main/java/io/muoncore/channel/impl/ZipChannel.java b/muon-core/src/main/java/io/muoncore/channel/impl/ZipChannel.java index 9d787da..d2f97d6 100644 --- a/muon-core/src/main/java/io/muoncore/channel/impl/ZipChannel.java +++ b/muon-core/src/main/java/io/muoncore/channel/impl/ZipChannel.java @@ -2,13 +2,13 @@ import io.muoncore.channel.Channel; import io.muoncore.channel.ChannelConnection; +import io.muoncore.channel.Dispatcher; import io.muoncore.exception.MuonException; import io.muoncore.message.MuonInboundMessage; import io.muoncore.message.MuonMessageBuilder; import io.muoncore.message.MuonOutboundMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.Dispatcher; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; diff --git a/muon-core/src/main/java/io/muoncore/transport/client/MultiTransportClient.java b/muon-core/src/main/java/io/muoncore/transport/client/MultiTransportClient.java index 9d15172..80ba6ef 100644 --- a/muon-core/src/main/java/io/muoncore/transport/client/MultiTransportClient.java +++ b/muon-core/src/main/java/io/muoncore/transport/client/MultiTransportClient.java @@ -1,9 +1,7 @@ package io.muoncore.transport.client; import io.muoncore.Discovery; -import io.muoncore.channel.Channel; -import io.muoncore.channel.ChannelConnection; -import io.muoncore.channel.Channels; +import io.muoncore.channel.*; import io.muoncore.codec.Codecs; import io.muoncore.config.AutoConfiguration; import io.muoncore.message.MuonInboundMessage; @@ -13,7 +11,6 @@ import io.muoncore.transport.sharedsocket.client.SharedSocketRoute; import io.muoncore.transport.sharedsocket.client.SharedSocketRouter; import org.reactivestreams.Publisher; -import reactor.core.Dispatcher; import java.util.List; import java.util.function.Predicate; @@ -25,7 +22,7 @@ public class MultiTransportClient implements TransportClient, TransportControl { private List transports; private TransportMessageDispatcher taps; - private Dispatcher dispatcher = new RingBufferLocalDispatcher("transportDispatch", 8192); + private Dispatcher dispatcher = new Reactor2Dispatcher(new RingBufferLocalDispatcher("transportDispatch", 8192)); private AutoConfiguration configuration; private SharedSocketRouter sharedSocketRouter; private Discovery discovery; diff --git a/muon-core/src/main/java/io/muoncore/transport/client/MultiTransportClientChannelConnection.java b/muon-core/src/main/java/io/muoncore/transport/client/MultiTransportClientChannelConnection.java index d3b0450..d0f0aa5 100644 --- a/muon-core/src/main/java/io/muoncore/transport/client/MultiTransportClientChannelConnection.java +++ b/muon-core/src/main/java/io/muoncore/transport/client/MultiTransportClientChannelConnection.java @@ -3,6 +3,7 @@ import io.muoncore.Discovery; import io.muoncore.ServiceDescriptor; import io.muoncore.channel.ChannelConnection; +import io.muoncore.channel.Dispatcher; import io.muoncore.exception.NoSuchServiceException; import io.muoncore.message.MuonInboundMessage; import io.muoncore.message.MuonMessage; @@ -10,7 +11,6 @@ import io.muoncore.transport.sharedsocket.client.SharedSocketRouter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.Dispatcher; import java.util.HashMap; import java.util.Map; diff --git a/muon-core/src/main/java/io/muoncore/transport/client/SimpleTransportMessageDispatcher.java b/muon-core/src/main/java/io/muoncore/transport/client/SimpleTransportMessageDispatcher.java index 5840b9a..f1446fa 100644 --- a/muon-core/src/main/java/io/muoncore/transport/client/SimpleTransportMessageDispatcher.java +++ b/muon-core/src/main/java/io/muoncore/transport/client/SimpleTransportMessageDispatcher.java @@ -1,10 +1,11 @@ package io.muoncore.transport.client; +import io.muoncore.channel.Dispatcher; +import io.muoncore.channel.Reactor2Dispatcher; import io.muoncore.message.MuonMessage; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; import reactor.Environment; -import reactor.core.Dispatcher; import java.util.ArrayList; import java.util.List; @@ -18,7 +19,7 @@ public class SimpleTransportMessageDispatcher implements TransportMessageDispatc private List queues = new ArrayList<>(); private ExecutorService exec = Executors.newFixedThreadPool(20); - private Dispatcher dispatcher = Environment.newDispatcher(); + private Dispatcher dispatcher = new Reactor2Dispatcher(Environment.newDispatcher()); private static final MuonMessage POISON = new MuonMessage(null, 0, null, null, null, null, null, null,null,null, null); diff --git a/muon-core/src/main/java/io/muoncore/transport/sharedsocket/client/SharedSocketRoute.java b/muon-core/src/main/java/io/muoncore/transport/sharedsocket/client/SharedSocketRoute.java index 0898230..6615026 100644 --- a/muon-core/src/main/java/io/muoncore/transport/sharedsocket/client/SharedSocketRoute.java +++ b/muon-core/src/main/java/io/muoncore/transport/sharedsocket/client/SharedSocketRoute.java @@ -59,7 +59,7 @@ public SharedSocketRoute(String serviceName, TransportConnectionProvider transpo private void shutdownRoute(MuonInboundMessage msg) { if (running) { - log.info("Shutting down shared-route due to channel failure"); + log.info("Shutting down shared-route due to channel failure {}", msg); routes.values().forEach(sharedSocketChannelConnection -> sharedSocketChannelConnection.sendInbound(msg)); onShutdown.run(); transportChannel.shutdown(); diff --git a/muon-core/src/test/groovy/io/muoncore/inmem/IntrospectionSimulationSpec.groovy b/muon-core/src/test/groovy/io/muoncore/inmem/IntrospectionSimulationSpec.groovy new file mode 100644 index 0000000..b7fa693 --- /dev/null +++ b/muon-core/src/test/groovy/io/muoncore/inmem/IntrospectionSimulationSpec.groovy @@ -0,0 +1,45 @@ +package io.muoncore.inmem + +import com.google.common.eventbus.EventBus +import io.muoncore.MultiTransportMuon +import io.muoncore.Muon +import io.muoncore.codec.json.JsonOnlyCodecs +import io.muoncore.config.AutoConfiguration +import io.muoncore.memory.discovery.InMemDiscovery +import io.muoncore.memory.seda.InMemSeda +import io.muoncore.memory.transport.InMemTransport +import spock.lang.Ignore +import spock.lang.Specification +import spock.lang.Timeout + +class IntrospectionSimulationSpec extends Specification { + + def eventbus = new EventBus() + + @Timeout(100) + "many services can run and be introspected"() { + + given: "some services" + + def discovery = new InMemDiscovery() + + //TODO, extract this out into the SEDA stack and have a general way of using it that isn't here. + def seda = new InMemSeda(discovery, eventbus); + + def svc1 = createService("mine", discovery) + + when: + def names = svc1.discovery.serviceNames + + then: + names == ["hello"] + + } + + Muon createService(ident, discovery) { + def config = new AutoConfiguration(serviceName: "${ident}") + def transport = new InMemTransport(config, eventbus) + + new MultiTransportMuon(config, discovery, [transport], new JsonOnlyCodecs()) + } +}