Skip to content

Commit

Permalink
Code refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
baizel committed May 19, 2020
1 parent f9dbe5e commit 5b0a2e9
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 27 deletions.
58 changes: 33 additions & 25 deletions src/main/java/org/jgroups/blocks/cs/netty/NettyConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ public class NettyConnection {

private final EventExecutorGroup separateWorkerGroup = new DefaultEventExecutorGroup(4);
private final Bootstrap outgoingBootstrap = new Bootstrap();
private final ServerBootstrap inboundBootstrap = new ServerBootstrap();
private final Map<IpAddress, Channel> ipAddressChannelMap = new HashMap<>();
private byte[] replyAdder = null;
private int port;
private InetAddress bind_addr;
private EventLoopGroup boss_group; // Handles incoming connections
private EventLoopGroup boss_group; // Only handles incoming connections
private EventLoopGroup worker_group;
private boolean isNativeTransport;
private NettyReceiverListener callback;
Expand All @@ -66,33 +67,11 @@ public void channelRead(Channel channel, IpAddress sender) {
updateMap(channel, sender);
}
};

outgoingBootstrap.group(worker_group)
.handler(new PipelineChannelInitializer(this.callback, lifecycleListener, separateWorkerGroup))
.localAddress(bind_addr, 0)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.TCP_NODELAY, true);
if (isNativeTransport)
outgoingBootstrap.channel(EpollSocketChannel.class);
else
outgoingBootstrap.channel(NioSocketChannel.class);
configureClient();
configureServer();
}

public void run() throws InterruptedException, BindException, Errors.NativeIoException {
ServerBootstrap inboundBootstrap = new ServerBootstrap();
inboundBootstrap.group(boss_group, worker_group)
.localAddress(bind_addr, port)
.childHandler(new PipelineChannelInitializer(this.callback, lifecycleListener, separateWorkerGroup))
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.TCP_NODELAY, true);
if (isNativeTransport) {
inboundBootstrap.channel(EpollServerSocketChannel.class);
} else {
inboundBootstrap.channel(NioServerSocketChannel.class);
}
inboundBootstrap.bind().sync();

try {
Expand Down Expand Up @@ -170,6 +149,35 @@ private void updateMap(Channel connected, IpAddress destAddr) {
ipAddressChannelMap.put(destAddr, connected);
}

private void configureClient() {
outgoingBootstrap.group(worker_group)
.handler(new PipelineChannelInitializer(this.callback, lifecycleListener, separateWorkerGroup))
.localAddress(bind_addr, 0)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.TCP_NODELAY, true);
if (isNativeTransport)
outgoingBootstrap.channel(EpollSocketChannel.class);
else
outgoingBootstrap.channel(NioSocketChannel.class);

}

private void configureServer() {
inboundBootstrap.group(boss_group, worker_group)
.localAddress(bind_addr, port)
.childHandler(new PipelineChannelInitializer(this.callback, lifecycleListener, separateWorkerGroup))
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.TCP_NODELAY, true);
if (isNativeTransport) {
inboundBootstrap.channel(EpollServerSocketChannel.class);
} else {
inboundBootstrap.channel(NioServerSocketChannel.class);
}
}

private static ByteBuf pack(ByteBufAllocator allocator, byte[] data, int offset, int length, byte[] replyAdder) {
int allocSize = Integer.BYTES + length + Integer.BYTES + replyAdder.length;
ByteBuf buf = allocator.buffer(allocSize);
Expand Down
4 changes: 2 additions & 2 deletions src/main/resources/tcp-nio.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd">
<TCP_NIO2 bind_port="7800"
thread_pool.min_threads="0"
thread_pool.max_threads="20"
thread_pool.max_threads="200"
thread_pool.keep_alive_time="30000"/>

<TCPPING async_discovery="true"
initial_hosts="${jgroups.tcpping.initial_hosts:localhost[7800],localhost[7801]}"
initial_hosts="${jgroups.tcpping.initial_hosts:10.128.0.4[7800],10.128.0.3[7800]}"
port_range="3"/>
<MERGE3 min_interval="10000"
max_interval="30000"/>
Expand Down

0 comments on commit 5b0a2e9

Please sign in to comment.