Skip to content

Commit

Permalink
related to #58
Browse files Browse the repository at this point in the history
Start isolating reactor behind internal interfaces.
  • Loading branch information
daviddawson committed Aug 27, 2017
1 parent b322040 commit 14aa8c2
Show file tree
Hide file tree
Showing 13 changed files with 95 additions and 17 deletions.
6 changes: 3 additions & 3 deletions muon-core/src/main/java/io/muoncore/channel/Channels.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@
import io.muoncore.transport.client.RingBufferLocalDispatcher;
import io.muoncore.transport.client.TransportMessageDispatcher;
import reactor.Environment;
import reactor.core.Dispatcher;

import reactor.core.config.DispatcherType;

import java.util.function.Function;

public class Channels {

static Dispatcher WORK_DISPATCHER = Environment.newDispatcher(32768, 200, DispatcherType.THREAD_POOL_EXECUTOR);
public static Dispatcher EVENT_DISPATCHER = new RingBufferLocalDispatcher("channel", 32768);
static Dispatcher WORK_DISPATCHER = new Reactor2Dispatcher(Environment.newDispatcher(32768, 200, DispatcherType.THREAD_POOL_EXECUTOR));
public static Dispatcher EVENT_DISPATCHER = new Reactor2Dispatcher(new RingBufferLocalDispatcher("channel", 32768));

public static void shutdown() {
// EVENT_DISPATCHER.shutdown();
Expand Down
14 changes: 14 additions & 0 deletions muon-core/src/main/java/io/muoncore/channel/Dispatcher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.muoncore.channel;

import java.util.function.Consumer;

public interface Dispatcher {

<E> void dispatch(E data,
Consumer<E> eventConsumer,
Consumer<Throwable> errorConsumer);

<E> void tryDispatch(E data,
Consumer<E> eventConsumer,
Consumer<Throwable> errorConsumer);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.muoncore.channel;

import lombok.AllArgsConstructor;

import java.util.function.Consumer;

@AllArgsConstructor
public class Reactor2Dispatcher implements Dispatcher {

private reactor.core.Dispatcher internal;

@Override
public <E> void dispatch(E data, Consumer<E> eventConsumer, Consumer<Throwable> errorConsumer) {
internal.dispatch(data, eventConsumer::accept, errorConsumer::accept);
}

@Override
public <E> void tryDispatch(E data, Consumer<E> eventConsumer, Consumer<Throwable> errorConsumer) {
internal.tryDispatch(data, eventConsumer::accept, errorConsumer::accept);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.muoncore.channel.Channel;
import io.muoncore.channel.ChannelConnection;
import io.muoncore.channel.Dispatcher;
import io.muoncore.channel.support.Scheduler;
import io.muoncore.exception.MuonException;
import io.muoncore.message.MuonInboundMessage;
Expand All @@ -10,7 +11,6 @@
import io.muoncore.message.MuonOutboundMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Dispatcher;

import java.util.concurrent.TimeUnit;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import io.muoncore.channel.Channel;
import io.muoncore.channel.ChannelConnection;
import io.muoncore.channel.Dispatcher;
import io.muoncore.exception.MuonException;
import reactor.core.Dispatcher;

import java.util.Date;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

import io.muoncore.channel.Channel;
import io.muoncore.channel.ChannelConnection;
import io.muoncore.channel.Dispatcher;
import io.muoncore.channel.support.Scheduler;
import io.muoncore.exception.MuonException;
import io.muoncore.message.MuonInboundMessage;
import io.muoncore.message.MuonMessageBuilder;
import io.muoncore.message.MuonOutboundMessage;
import reactor.core.Dispatcher;

import java.util.concurrent.TimeUnit;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

import io.muoncore.channel.Channel;
import io.muoncore.channel.ChannelConnection;
import io.muoncore.channel.Dispatcher;
import io.muoncore.exception.MuonException;
import io.muoncore.message.MuonMessage;
import io.muoncore.transport.client.TransportMessageDispatcher;
import reactor.core.Dispatcher;

public class WiretapChannel<GoingLeft extends MuonMessage, GoingRight extends MuonMessage> implements Channel<GoingLeft, GoingRight> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

import io.muoncore.channel.Channel;
import io.muoncore.channel.ChannelConnection;
import io.muoncore.channel.Dispatcher;
import io.muoncore.exception.MuonException;
import io.muoncore.message.MuonInboundMessage;
import io.muoncore.message.MuonMessageBuilder;
import io.muoncore.message.MuonOutboundMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Dispatcher;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package io.muoncore.transport.client;

import io.muoncore.Discovery;
import io.muoncore.channel.Channel;
import io.muoncore.channel.ChannelConnection;
import io.muoncore.channel.Channels;
import io.muoncore.channel.*;
import io.muoncore.codec.Codecs;
import io.muoncore.config.AutoConfiguration;
import io.muoncore.message.MuonInboundMessage;
Expand All @@ -13,7 +11,6 @@
import io.muoncore.transport.sharedsocket.client.SharedSocketRoute;
import io.muoncore.transport.sharedsocket.client.SharedSocketRouter;
import org.reactivestreams.Publisher;
import reactor.core.Dispatcher;

import java.util.List;
import java.util.function.Predicate;
Expand All @@ -25,7 +22,7 @@ public class MultiTransportClient implements TransportClient, TransportControl {

private List<MuonTransport> transports;
private TransportMessageDispatcher taps;
private Dispatcher dispatcher = new RingBufferLocalDispatcher("transportDispatch", 8192);
private Dispatcher dispatcher = new Reactor2Dispatcher(new RingBufferLocalDispatcher("transportDispatch", 8192));
private AutoConfiguration configuration;
private SharedSocketRouter sharedSocketRouter;
private Discovery discovery;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
import io.muoncore.Discovery;
import io.muoncore.ServiceDescriptor;
import io.muoncore.channel.ChannelConnection;
import io.muoncore.channel.Dispatcher;
import io.muoncore.exception.NoSuchServiceException;
import io.muoncore.message.MuonInboundMessage;
import io.muoncore.message.MuonMessage;
import io.muoncore.message.MuonOutboundMessage;
import io.muoncore.transport.sharedsocket.client.SharedSocketRouter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Dispatcher;

import java.util.HashMap;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package io.muoncore.transport.client;

import io.muoncore.channel.Dispatcher;
import io.muoncore.channel.Reactor2Dispatcher;
import io.muoncore.message.MuonMessage;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.Environment;
import reactor.core.Dispatcher;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -18,7 +19,7 @@ public class SimpleTransportMessageDispatcher implements TransportMessageDispatc

private List<QueuePredicate> queues = new ArrayList<>();
private ExecutorService exec = Executors.newFixedThreadPool(20);
private Dispatcher dispatcher = Environment.newDispatcher();
private Dispatcher dispatcher = new Reactor2Dispatcher(Environment.newDispatcher());

private static final MuonMessage POISON = new MuonMessage(null, 0, null, null, null, null, null, null,null,null, null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public SharedSocketRoute(String serviceName, TransportConnectionProvider transpo

private void shutdownRoute(MuonInboundMessage msg) {
if (running) {
log.info("Shutting down shared-route due to channel failure");
log.info("Shutting down shared-route due to channel failure {}", msg);
routes.values().forEach(sharedSocketChannelConnection -> sharedSocketChannelConnection.sendInbound(msg));
onShutdown.run();
transportChannel.shutdown();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package io.muoncore.inmem

import com.google.common.eventbus.EventBus
import io.muoncore.MultiTransportMuon
import io.muoncore.Muon
import io.muoncore.codec.json.JsonOnlyCodecs
import io.muoncore.config.AutoConfiguration
import io.muoncore.memory.discovery.InMemDiscovery
import io.muoncore.memory.seda.InMemSeda
import io.muoncore.memory.transport.InMemTransport
import spock.lang.Ignore
import spock.lang.Specification
import spock.lang.Timeout

class IntrospectionSimulationSpec extends Specification {

def eventbus = new EventBus()

@Timeout(100)
"many services can run and be introspected"() {

given: "some services"

def discovery = new InMemDiscovery()

//TODO, extract this out into the SEDA stack and have a general way of using it that isn't here.
def seda = new InMemSeda(discovery, eventbus);

def svc1 = createService("mine", discovery)

when:
def names = svc1.discovery.serviceNames

then:
names == ["hello"]

}

Muon createService(ident, discovery) {
def config = new AutoConfiguration(serviceName: "${ident}")
def transport = new InMemTransport(config, eventbus)

new MultiTransportMuon(config, discovery, [transport], new JsonOnlyCodecs())
}
}

0 comments on commit 14aa8c2

Please sign in to comment.