diff --git a/src/main/java/org/jgroups/blocks/cs/netty/NettyConnection.java b/src/main/java/org/jgroups/blocks/cs/netty/NettyConnection.java index df2705f..8200a5d 100644 --- a/src/main/java/org/jgroups/blocks/cs/netty/NettyConnection.java +++ b/src/main/java/org/jgroups/blocks/cs/netty/NettyConnection.java @@ -37,11 +37,12 @@ public class NettyConnection { private final EventExecutorGroup separateWorkerGroup = new DefaultEventExecutorGroup(4); private final Bootstrap outgoingBootstrap = new Bootstrap(); + private final ServerBootstrap inboundBootstrap = new ServerBootstrap(); private final Map ipAddressChannelMap = new HashMap<>(); private byte[] replyAdder = null; private int port; private InetAddress bind_addr; - private EventLoopGroup boss_group; // Handles incoming connections + private EventLoopGroup boss_group; // Only handles incoming connections private EventLoopGroup worker_group; private boolean isNativeTransport; private NettyReceiverListener callback; @@ -66,33 +67,11 @@ public void channelRead(Channel channel, IpAddress sender) { updateMap(channel, sender); } }; - - outgoingBootstrap.group(worker_group) - .handler(new PipelineChannelInitializer(this.callback, lifecycleListener, separateWorkerGroup)) - .localAddress(bind_addr, 0) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000) - .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) - .option(ChannelOption.TCP_NODELAY, true); - if (isNativeTransport) - outgoingBootstrap.channel(EpollSocketChannel.class); - else - outgoingBootstrap.channel(NioSocketChannel.class); + configureClient(); + configureServer(); } public void run() throws InterruptedException, BindException, Errors.NativeIoException { - ServerBootstrap inboundBootstrap = new ServerBootstrap(); - inboundBootstrap.group(boss_group, worker_group) - .localAddress(bind_addr, port) - .childHandler(new PipelineChannelInitializer(this.callback, lifecycleListener, separateWorkerGroup)) - .option(ChannelOption.SO_REUSEADDR, true) - .option(ChannelOption.SO_BACKLOG, 128) - .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) - .childOption(ChannelOption.TCP_NODELAY, true); - if (isNativeTransport) { - inboundBootstrap.channel(EpollServerSocketChannel.class); - } else { - inboundBootstrap.channel(NioServerSocketChannel.class); - } inboundBootstrap.bind().sync(); try { @@ -170,6 +149,35 @@ private void updateMap(Channel connected, IpAddress destAddr) { ipAddressChannelMap.put(destAddr, connected); } + private void configureClient() { + outgoingBootstrap.group(worker_group) + .handler(new PipelineChannelInitializer(this.callback, lifecycleListener, separateWorkerGroup)) + .localAddress(bind_addr, 0) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.TCP_NODELAY, true); + if (isNativeTransport) + outgoingBootstrap.channel(EpollSocketChannel.class); + else + outgoingBootstrap.channel(NioSocketChannel.class); + + } + + private void configureServer() { + inboundBootstrap.group(boss_group, worker_group) + .localAddress(bind_addr, port) + .childHandler(new PipelineChannelInitializer(this.callback, lifecycleListener, separateWorkerGroup)) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.SO_BACKLOG, 128) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childOption(ChannelOption.TCP_NODELAY, true); + if (isNativeTransport) { + inboundBootstrap.channel(EpollServerSocketChannel.class); + } else { + inboundBootstrap.channel(NioServerSocketChannel.class); + } + } + private static ByteBuf pack(ByteBufAllocator allocator, byte[] data, int offset, int length, byte[] replyAdder) { int allocSize = Integer.BYTES + length + Integer.BYTES + replyAdder.length; ByteBuf buf = allocator.buffer(allocSize); diff --git a/src/main/resources/tcp-nio.xml b/src/main/resources/tcp-nio.xml index d914159..fc84b83 100644 --- a/src/main/resources/tcp-nio.xml +++ b/src/main/resources/tcp-nio.xml @@ -10,11 +10,11 @@ xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd">