diff --git a/zyt-rpc-call/src/main/java/service/call/nio_call/NIOClientCall.java b/zyt-rpc-call/src/main/java/service/call/nio_call/NIOClientCall.java index ae59d22..f8e7943 100644 --- a/zyt-rpc-call/src/main/java/service/call/nio_call/NIOClientCall.java +++ b/zyt-rpc-call/src/main/java/service/call/nio_call/NIOClientCall.java @@ -6,7 +6,7 @@ //通用启动类 将启动的逻辑藏在ClientBootStrap中 public class NIOClientCall { - public static Customer main(String[] args){ + public static Customer main(String[] args) { return NIOClientBootStrap.start(); } } diff --git a/zyt-rpc-common/src/main/java/codec/AddCodec.java b/zyt-rpc-common/src/main/java/codec/AddCodec.java index 8177446..b10e57b 100644 --- a/zyt-rpc-common/src/main/java/codec/AddCodec.java +++ b/zyt-rpc-common/src/main/java/codec/AddCodec.java @@ -1,6 +1,5 @@ package codec; -import annotation.CodecSelector; import annotation.RpcSerializationSelector; import configuration.GlobalConfiguration; import entity.PersonPOJO; diff --git a/zyt-rpc-common/src/main/java/compress/diyzip/DiyZipUtils.java b/zyt-rpc-common/src/main/java/compress/diyzip/DiyZipUtils.java index 5d04368..da42239 100644 --- a/zyt-rpc-common/src/main/java/compress/diyzip/DiyZipUtils.java +++ b/zyt-rpc-common/src/main/java/compress/diyzip/DiyZipUtils.java @@ -15,13 +15,12 @@ public class DiyZipUtils implements CompressType { //线程独享的霍夫曼编码表 因为害怕线程不安全导致错误 所以设置线程独享 - private static final ThreadLocal> hashMapThreadLocal = ThreadLocal.withInitial(() -> new HashMap<>()); + private static final ThreadLocal> hashMapThreadLocal = ThreadLocal.withInitial(HashMap::new); @Override public byte[] compress(byte[] bytes) { - byte[] compressBytes = huffmanZip(bytes); - return compressBytes; + return huffmanZip(bytes); } @@ -63,7 +62,7 @@ private List getNodes(byte[] contentBytes) { return nodes; } - private class Node1 implements Comparable { + private static class Node1 implements Comparable { //数值 存放字符本身 Byte data; //权重 diff --git a/zyt-rpc-common/src/main/java/entity/Person.java b/zyt-rpc-common/src/main/java/entity/Person.java index 7cb5bea..ef1e7ec 100644 --- a/zyt-rpc-common/src/main/java/entity/Person.java +++ b/zyt-rpc-common/src/main/java/entity/Person.java @@ -7,6 +7,7 @@ //千万注意要实现序列化接口 //接口一定要实现正确 + /** * @author 祝英台炸油条 */ @@ -14,20 +15,18 @@ public class Person implements Serializable { private String name; //构造方法 - public Person(String name) - { + public Person(String name) { this.name = name; } public Person() { } - public String getName() - { + public String getName() { return name; } - public void setName(String name) - { + + public void setName(String name) { this.name = name; } diff --git a/zyt-rpc-common/src/main/java/loadbalance/AccessLoadBalance.java b/zyt-rpc-common/src/main/java/loadbalance/AccessLoadBalance.java index 042ed05..fdc66d1 100644 --- a/zyt-rpc-common/src/main/java/loadbalance/AccessLoadBalance.java +++ b/zyt-rpc-common/src/main/java/loadbalance/AccessLoadBalance.java @@ -6,7 +6,6 @@ import org.apache.zookeeper.ZooKeeper; import java.nio.charset.StandardCharsets; -import java.util.Collections; import java.util.List; /** @@ -27,7 +26,7 @@ public String loadBalance(ZooKeeper zooKeeper, String path) { } } //进行排序 根据每个节点的访问次数 从小到大进行排序 然后选用最小的 - Collections.sort(children, (o1, o2) -> { + children.sort((o1, o2) -> { try { return Integer.parseInt(new String(zooKeeper.getData(path + "/" + o1, false, null))) diff --git a/zyt-rpc-common/src/main/java/loadbalance/RandomLoadBalance.java b/zyt-rpc-common/src/main/java/loadbalance/RandomLoadBalance.java index 90b48c2..fc84cf3 100644 --- a/zyt-rpc-common/src/main/java/loadbalance/RandomLoadBalance.java +++ b/zyt-rpc-common/src/main/java/loadbalance/RandomLoadBalance.java @@ -20,7 +20,7 @@ public String loadBalance(ZooKeeper zooKeeper, String path) { try { children = zooKeeper.getChildren(path, null, null); } catch (KeeperException | InterruptedException e) { - log.error(e.getMessage(),e); + log.error(e.getMessage(), e); } assert children != null; if (children.isEmpty()) { @@ -42,7 +42,7 @@ public String loadBalance(ZooKeeper zooKeeper, String path) { ++visitedCount; zooKeeper.setData(path + "/" + chooseNode, String.valueOf(visitedCount).getBytes(StandardCharsets.UTF_8), -1); } catch (KeeperException | InterruptedException e) { - log.error(e.getMessage(),e); + log.error(e.getMessage(), e); } return chooseNode; } diff --git a/zyt-rpc-common/src/main/java/serialization/json/FastJsonUtils.java b/zyt-rpc-common/src/main/java/serialization/json/FastJsonUtils.java index a04b163..ba4d1d8 100644 --- a/zyt-rpc-common/src/main/java/serialization/json/FastJsonUtils.java +++ b/zyt-rpc-common/src/main/java/serialization/json/FastJsonUtils.java @@ -11,8 +11,6 @@ public class FastJsonUtils implements Serializer { @Override public byte[] serialize(Object obj) { - - // return JSON.toJSONString(obj,true).getBytes(StandardCharsets.UTF_8); return JSON.toJSONBytes(obj); } diff --git a/zyt-rpc-common/src/main/java/serialization/json/JacksonUtils.java b/zyt-rpc-common/src/main/java/serialization/json/JacksonUtils.java index 6bd3153..1c75ef1 100644 --- a/zyt-rpc-common/src/main/java/serialization/json/JacksonUtils.java +++ b/zyt-rpc-common/src/main/java/serialization/json/JacksonUtils.java @@ -17,23 +17,23 @@ public class JacksonUtils implements Serializer { static ObjectMapper mapper = new ObjectMapper(); @Override - public byte[] serialize(Object obj) { + public byte[] serialize(Object obj) { //将obj转换成json再将json转换成字节数组 try { return mapper.writeValueAsBytes(obj); } catch (JsonProcessingException e) { - log.error(e.getMessage(),e); + log.error(e.getMessage(), e); } return new byte[0]; } //将json字符串或者字节组转换为对应的类 @Override - public T deserialize(byte[] bytes, Class clazz){ + public T deserialize(byte[] bytes, Class clazz) { try { - return mapper.readValue(bytes,clazz); + return mapper.readValue(bytes, clazz); } catch (IOException e) { - log.error(e.getMessage(),e); + log.error(e.getMessage(), e); } return null; } diff --git a/zyt-rpc-consumer/src/main/java/consumer/bootstrap/nio/NIOConsumerBootStrap10.java b/zyt-rpc-consumer/src/main/java/consumer/bootstrap/nio/NIOConsumerBootStrap10.java index 7a96290..ab1dfd9 100644 --- a/zyt-rpc-consumer/src/main/java/consumer/bootstrap/nio/NIOConsumerBootStrap10.java +++ b/zyt-rpc-consumer/src/main/java/consumer/bootstrap/nio/NIOConsumerBootStrap10.java @@ -11,9 +11,7 @@ */ public class NIOConsumerBootStrap10 { public static void main(String[] args) { - //非阻塞启动 NIONonBlockingClient10.start("127.0.0.1", 6666); - } } diff --git a/zyt-rpc-consumer/src/main/java/consumer/bootstrap/nio/NIOConsumerBootStrap11.java b/zyt-rpc-consumer/src/main/java/consumer/bootstrap/nio/NIOConsumerBootStrap11.java index 4fe31fa..5a6ba46 100644 --- a/zyt-rpc-consumer/src/main/java/consumer/bootstrap/nio/NIOConsumerBootStrap11.java +++ b/zyt-rpc-consumer/src/main/java/consumer/bootstrap/nio/NIOConsumerBootStrap11.java @@ -11,9 +11,7 @@ */ public class NIOConsumerBootStrap11 { public static void main(String[] args) { - //阻塞启动 NIOBlockingClient11.start("127.0.0.1", 6666); - } } diff --git a/zyt-rpc-consumer/src/main/java/consumer/bootstrap/nio/NIOConsumerBootStrap12.java b/zyt-rpc-consumer/src/main/java/consumer/bootstrap/nio/NIOConsumerBootStrap12.java index 303548a..0e60252 100644 --- a/zyt-rpc-consumer/src/main/java/consumer/bootstrap/nio/NIOConsumerBootStrap12.java +++ b/zyt-rpc-consumer/src/main/java/consumer/bootstrap/nio/NIOConsumerBootStrap12.java @@ -13,7 +13,6 @@ */ public class NIOConsumerBootStrap12 { public static Customer main(String[] args) { - ClientProxyTool proxy = new ClientProxyTool(); return (Customer) proxy.getBean(Customer.class); } diff --git a/zyt-rpc-consumer/src/main/java/consumer/bootstrap/nio/NIOConsumerBootStrap14.java b/zyt-rpc-consumer/src/main/java/consumer/bootstrap/nio/NIOConsumerBootStrap14.java index f1a243c..2cfcec3 100644 --- a/zyt-rpc-consumer/src/main/java/consumer/bootstrap/nio/NIOConsumerBootStrap14.java +++ b/zyt-rpc-consumer/src/main/java/consumer/bootstrap/nio/NIOConsumerBootStrap14.java @@ -13,7 +13,6 @@ */ public class NIOConsumerBootStrap14 { public static Customer main(String[] args) { - ClientProxyTool proxy = new ClientProxyTool(); return (Customer) proxy.getBean(Customer.class); } diff --git a/zyt-rpc-consumer/src/main/java/consumer/bootstrap/nio/NIOConsumerBootStrap15.java b/zyt-rpc-consumer/src/main/java/consumer/bootstrap/nio/NIOConsumerBootStrap15.java index 2aa8d80..a2f22ec 100644 --- a/zyt-rpc-consumer/src/main/java/consumer/bootstrap/nio/NIOConsumerBootStrap15.java +++ b/zyt-rpc-consumer/src/main/java/consumer/bootstrap/nio/NIOConsumerBootStrap15.java @@ -13,7 +13,6 @@ */ public class NIOConsumerBootStrap15 { public static Customer main(String[] args) { - ClientProxyTool proxy = new ClientProxyTool(); return (Customer) proxy.getBean(Customer.class); } diff --git a/zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient.java b/zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient.java index 4277e3c..b0ebfd5 100644 --- a/zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient.java +++ b/zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient.java @@ -3,14 +3,15 @@ import java.lang.reflect.Method; //如果对应的是字符串类那就不用到下面的这个里面调用了 + /** * @author 祝英台炸油条 */ public class NettyClient { - public static Object callMethod(String hostName, int port, Object param, Method method){ + public static Object callMethod(String hostName, int port, Object param, Method method) { //用户根据自己想用的版本 打开对应的注解 // return NettyClient21.callMethod(hostName, port, param,method); // return NettyClient22.callMethod(hostName, port, param,method); - return NettyClient24.callMethod(hostName, port, param,method); + return NettyClient24.callMethod(hostName, port, param, method); } } diff --git a/zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient20.java b/zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient20.java index 6cdac50..48c34cb 100644 --- a/zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient20.java +++ b/zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient20.java @@ -25,7 +25,7 @@ public static void start(String hostName, int port) { .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override - protected void initChannel(SocketChannel socketChannel) throws Exception { + protected void initChannel(SocketChannel socketChannel) { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new NettyClientHandler20()); } diff --git a/zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient21.java b/zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient21.java index b0b9763..a00d171 100644 --- a/zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient21.java +++ b/zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient21.java @@ -43,7 +43,7 @@ public static void initClient(String hostName, int port) { .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override - protected void initChannel(SocketChannel socketChannel) throws Exception { + protected void initChannel(SocketChannel socketChannel) { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new StringEncoder());//传输必须加编解码器 不然不认识的类传不过去 pipeline.addLast(new StringDecoder()); diff --git a/zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient22.java b/zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient22.java index 4e200bc..9a490b2 100644 --- a/zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient22.java +++ b/zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient22.java @@ -19,6 +19,7 @@ //实际客户端启动类 进行操作 //不确定能返回什么 所以判断是对象 + /** * @author 祝英台炸油条 */ @@ -30,8 +31,7 @@ public class NettyClient22 { static NettyClientHandler22 clientHandler; - public static void initClient(String hostName,int port,Method method) - { + public static void initClient(String hostName, int port, Method method) { clientHandler = new NettyClientHandler22(); //建立客户端监听 @@ -43,11 +43,11 @@ public static void initClient(String hostName,int port,Method method) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override - protected void initChannel(SocketChannel socketChannel) throws Exception { + protected void initChannel(SocketChannel socketChannel) { ChannelPipeline pipeline = socketChannel.pipeline(); //加编解码器的逻辑,根据对应的注解进行编码器的添加 这里面有实现对应的逻辑 // - AddCodec.addCodec(pipeline,method,true); + AddCodec.addCodec(pipeline, method, true); pipeline.addLast(clientHandler); } }); @@ -56,20 +56,20 @@ protected void initChannel(SocketChannel socketChannel) throws Exception { bootstrap.connect(hostName, port).sync(); } catch (InterruptedException e) { - log.error(e.getMessage(),e); + log.error(e.getMessage(), e); } } public static Object callMethod(String hostName, int port, Object param, Method method) { //我是有多个地方进行调用的 不能只连接一个 - initClient(hostName,port,method); + initClient(hostName, port, method); clientHandler.setParam(param); //接下来这就有关系到调用 直接调用 try { return executor.submit(clientHandler).get(); } catch (InterruptedException | ExecutionException e) { - log.error(e.getMessage(),e); + log.error(e.getMessage(), e); } return null; } diff --git a/zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient24.java b/zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient24.java index 5e56ec7..0355ce1 100644 --- a/zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient24.java +++ b/zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient24.java @@ -5,7 +5,6 @@ import codec.AddCodec; import configuration.GlobalConfiguration; import consumer.netty_client_handler.NettyClientHandler24; -import heartbeat.HeartBeat; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; @@ -51,7 +50,7 @@ public static Object callMethod(String hostName, int port, Object param, Method .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override - protected void initChannel(SocketChannel socketChannel) throws Exception { + protected void initChannel(SocketChannel socketChannel) { ChannelPipeline pipeline = socketChannel.pipeline(); //加编解码器的逻辑,根据对应的注解进行编码器的添加 这里面有实现对应的逻辑 // diff --git a/zyt-rpc-consumer/src/main/java/consumer/netty_client_handler/NettyClientHandler20.java b/zyt-rpc-consumer/src/main/java/consumer/netty_client_handler/NettyClientHandler20.java index 0c332a1..f18e212 100644 --- a/zyt-rpc-consumer/src/main/java/consumer/netty_client_handler/NettyClientHandler20.java +++ b/zyt-rpc-consumer/src/main/java/consumer/netty_client_handler/NettyClientHandler20.java @@ -15,13 +15,13 @@ public class NettyClientHandler20 extends ChannelInboundHandlerAdapter { //通道就绪就会发的信息 @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { + public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.copiedBuffer("你好,服务端", CharsetUtil.UTF_8)); } //这个是收到信息 @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf buf = (ByteBuf) msg; log.info("收到来自" + ctx.channel().remoteAddress() + "的消息" + buf.toString(CharsetUtil.UTF_8)); } diff --git a/zyt-rpc-consumer/src/main/java/consumer/netty_client_handler/NettyClientHandler24.java b/zyt-rpc-consumer/src/main/java/consumer/netty_client_handler/NettyClientHandler24.java index 7995be4..7965004 100644 --- a/zyt-rpc-consumer/src/main/java/consumer/netty_client_handler/NettyClientHandler24.java +++ b/zyt-rpc-consumer/src/main/java/consumer/netty_client_handler/NettyClientHandler24.java @@ -2,7 +2,6 @@ import annotation.CompressFunction; -import compress.Compress; import compress.CompressTypeTool; import configuration.GlobalConfiguration; import io.netty.channel.ChannelHandlerContext; diff --git a/zyt-rpc-consumer/src/main/java/consumer/nio/NIOBlockingClient11.java b/zyt-rpc-consumer/src/main/java/consumer/nio/NIOBlockingClient11.java index a0a8981..8c5faba 100644 --- a/zyt-rpc-consumer/src/main/java/consumer/nio/NIOBlockingClient11.java +++ b/zyt-rpc-consumer/src/main/java/consumer/nio/NIOBlockingClient11.java @@ -43,6 +43,7 @@ private static void start0(String hostName, int port) { //输入输出通道都放在外面 try { + assert socketChannel != null; ObjectOutputStream outputStream = new ObjectOutputStream(socketChannel.socket().getOutputStream()); ObjectInputStream objectInputStream = new ObjectInputStream(socketChannel.socket().getInputStream()); //都是阻塞等待 发完了 接收完了 才能进行下一步 不然会报异常 @@ -65,6 +66,7 @@ private static void start0(String hostName, int port) { log.error(e.getMessage(), e); } finally { try { + assert socketChannel != null; socketChannel.close(); } catch (IOException ex) { log.error(ex.getMessage(), ex); diff --git a/zyt-rpc-consumer/src/main/java/consumer/nio/NIONonBlockingClient10.java b/zyt-rpc-consumer/src/main/java/consumer/nio/NIONonBlockingClient10.java index 160968c..f3ff3fe 100644 --- a/zyt-rpc-consumer/src/main/java/consumer/nio/NIONonBlockingClient10.java +++ b/zyt-rpc-consumer/src/main/java/consumer/nio/NIONonBlockingClient10.java @@ -60,13 +60,13 @@ private static void start0(String hostName, int port) { SocketChannel channel = (SocketChannel) key.channel(); int read = 1; //用这个的原因是怕 多线程出现影响 - StringBuilder stringBuffer = new StringBuilder(); + StringBuilder stringBuilder = new StringBuilder(); while (read != 0) { buffer.clear(); read = channel.read(buffer); - stringBuffer.append(new String(buffer.array(), 0, read)); + stringBuilder.append(new String(buffer.array(), 0, read)); } - log.info("收到服务端回信" + stringBuffer.toString()); + log.info("收到服务端回信" + stringBuilder.toString()); keyIterator.remove(); } } catch (IOException e) { @@ -80,7 +80,7 @@ private static void start0(String hostName, int port) { while (true) { int methodNum = scanner.nextInt(); String message = scanner.next(); - String msg = new String(methodNum + "#" + message); + String msg = methodNum + "#" + message; try { socketChannel.write(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8))); } catch (IOException e) { diff --git a/zyt-rpc-consumer/src/main/java/consumer/nio/NIONonBlockingClient12.java b/zyt-rpc-consumer/src/main/java/consumer/nio/NIONonBlockingClient12.java index 8586f7a..834d219 100644 --- a/zyt-rpc-consumer/src/main/java/consumer/nio/NIONonBlockingClient12.java +++ b/zyt-rpc-consumer/src/main/java/consumer/nio/NIONonBlockingClient12.java @@ -27,8 +27,9 @@ public static String start(String hostName, int port, String msg) { private static String start0(String hostName, int port, String msg) { //得到一个网络通道 Selector selector = null; + SocketChannel socketChannel = null; try { - SocketChannel socketChannel = SocketChannel.open(); + socketChannel = SocketChannel.open(); log.info("-----------服务消费方启动-------------"); socketChannel.configureBlocking(false); //建立链接 非阻塞连接 但我们是要等他连接上 @@ -61,18 +62,19 @@ private static String start0(String hostName, int port, String msg) { SocketChannel channel = (SocketChannel) key.channel(); int read = 1; //用这个的原因是怕 多线程出现影响 - StringBuilder stringBuffer = new StringBuilder(); + StringBuilder stringBuilder = new StringBuilder(); while (read != 0) { buffer.clear(); read = channel.read(buffer); - stringBuffer.append(new String(buffer.array(), 0, read)); + stringBuilder.append(new String(buffer.array(), 0, read)); } - return stringBuffer.toString(); + return stringBuilder.toString(); } } catch (IOException e) { log.error(e.getMessage(), e); } } + } } diff --git a/zyt-rpc-consumer/src/test/java/NacosTest.java b/zyt-rpc-consumer/src/test/java/NacosTest.java index 6ea0ff9..3230c2d 100644 --- a/zyt-rpc-consumer/src/test/java/NacosTest.java +++ b/zyt-rpc-consumer/src/test/java/NacosTest.java @@ -1,4 +1,5 @@ +import lombok.extern.slf4j.Slf4j; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.client.methods.RequestBuilder; @@ -12,13 +13,22 @@ import static constants.RpcConstants.NACOS_DISCOVERY_ADDRESS; //进行测试获取 + +/** + * @author 祝英台炸油条 + */ +@Slf4j public class NacosTest { @Test - public void nacosDiscovery() throws IOException { + public void nacosDiscovery() { String serviceName = "nacos.test.1"; CloseableHttpClient httpClient = HttpClients.createDefault(); - HttpUriRequest request = RequestBuilder.get(URI.create(NACOS_DISCOVERY_ADDRESS +"serviceName="+serviceName)).build(); - CloseableHttpResponse response = httpClient.execute(request); + HttpUriRequest request = RequestBuilder.get(URI.create(NACOS_DISCOVERY_ADDRESS + "serviceName=" + serviceName)).build(); + try { + CloseableHttpResponse response = httpClient.execute(request); + } catch (IOException e) { + log.error(e.getMessage(), e); + } } } diff --git a/zyt-rpc-consumer/src/test/java/ZKServiceTest.java b/zyt-rpc-consumer/src/test/java/ZKServiceTest.java index f5776e0..6f3ea4c 100644 --- a/zyt-rpc-consumer/src/test/java/ZKServiceTest.java +++ b/zyt-rpc-consumer/src/test/java/ZKServiceTest.java @@ -1,5 +1,4 @@ -import annotation.LoadBalanceMethodImpl; -import configuration.GlobalConfiguration; +import lombok.extern.slf4j.Slf4j; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; @@ -9,31 +8,34 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; - -public class ZKServiceTest { - - private String connectString = "zytCentos:2181"; - private int sessionTimeout = 2000; +/** + * @author 祝英台炸油条 + */ +@Slf4j +class ZKServiceTest { @Test - public void connect() throws IOException, InterruptedException, KeeperException { - ZooKeeper zooKeeper = new ZooKeeper( - connectString, //连接地址 如果写多个那会 - sessionTimeout,//即超过该时间后,客户端没有向服务器端发送任何请求(正常情况下客户端会每隔一段时间发送心跳请求,此时服务器端会从新计算客户端的超时时间点的), - // 则服务器端认为session超时,清理数据。此时客户端的ZooKeeper对象就不再起作用了,需要再重新new一个新的对象了。 - null //监听器 暂时不设置 - ); - if (zooKeeper.exists("/test", null) == null) - zooKeeper.create("/test", "".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zooKeeper.create("/test/hello", "12".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + void connect() { + try { + String connectString = "zytCentos:2181"; + int sessionTimeout = 2000; + ZooKeeper zooKeeper = new ZooKeeper( + connectString, //连接地址 如果写多个那会 + sessionTimeout,//即超过该时间后,客户端没有向服务器端发送任何请求(正常情况下客户端会每隔一段时间发送心跳请求,此时服务器端会从新计算客户端的超时时间点的), + // 则服务器端认为session超时,清理数据。此时客户端的ZooKeeper对象就不再起作用了,需要再重新new一个新的对象了。 + null //监听器 暂时不设置 + ); + if (zooKeeper.exists("/test", null) == null) + zooKeeper.create("/test", "".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zooKeeper.create("/test/hello", "12".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } catch (IOException | KeeperException | InterruptedException e) { + log.error(e.getMessage(), e); + } } @Test - public void testInterface() { - LoadBalanceMethodImpl annotation = GlobalConfiguration.class.getAnnotation(LoadBalanceMethodImpl.class); - Class method = annotation.chosenMethod(); - + void testInterface() { } } diff --git a/zyt-rpc-provider/src/main/java/provider/netty/NettyServer22.java b/zyt-rpc-provider/src/main/java/provider/netty/NettyServer22.java index b2d60e7..c51be74 100644 --- a/zyt-rpc-provider/src/main/java/provider/netty/NettyServer22.java +++ b/zyt-rpc-provider/src/main/java/provider/netty/NettyServer22.java @@ -44,11 +44,17 @@ private static void start0(String methodName, int port) { .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态 .childHandler(new ChannelInitializer() { @Override - protected void initChannel(SocketChannel socketChannel) throws Exception { + protected void initChannel(SocketChannel socketChannel) { log.info(socketChannel.remoteAddress() + "连接上了"); ChannelPipeline pipeline = socketChannel.pipeline(); //添加的处理器 根据相应的注解而定 - Method method = Class.forName("provider.api." + methodName + "ServiceImpl").getMethods()[0]; + Method method = null; + try { + method = Class.forName("provider.api." + methodName + "ServiceImpl").getMethods()[0]; + } catch (ClassNotFoundException e) { + log.error(e.getMessage(), e); + } + assert method != null; AddCodec.addCodec(pipeline, method, false); //传入的直接是方法本身了 而不是方法名字 pipeline.addLast(new NettyServerHandler22(methodName)); diff --git a/zyt-rpc-provider/src/main/java/provider/netty/NettyServer24.java b/zyt-rpc-provider/src/main/java/provider/netty/NettyServer24.java index b9fea28..944781f 100644 --- a/zyt-rpc-provider/src/main/java/provider/netty/NettyServer24.java +++ b/zyt-rpc-provider/src/main/java/provider/netty/NettyServer24.java @@ -44,11 +44,17 @@ private static void start0(String methodName, int port) { .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态 .childHandler(new ChannelInitializer() { @Override - protected void initChannel(SocketChannel socketChannel) throws Exception { + protected void initChannel(SocketChannel socketChannel) { log.info(socketChannel.remoteAddress() + "连接上了"); ChannelPipeline pipeline = socketChannel.pipeline(); //添加的处理器 根据相应的注解而定 - Method method = Class.forName("provider.api." + methodName + "ServiceImpl").getMethods()[0]; + Method method = null; + try { + method = Class.forName("provider.api." + methodName + "ServiceImpl").getMethods()[0]; + } catch (ClassNotFoundException e) { + log.error(e.getMessage(), e); + } + assert method != null; AddCodec.addCodec(pipeline, method, false); //传入的直接是方法本身了 而不是方法名字 diff --git a/zyt-rpc-provider/src/main/java/provider/netty_server_handler/NettyServerHandler24.java b/zyt-rpc-provider/src/main/java/provider/netty_server_handler/NettyServerHandler24.java index 23679cd..01c7f74 100644 --- a/zyt-rpc-provider/src/main/java/provider/netty_server_handler/NettyServerHandler24.java +++ b/zyt-rpc-provider/src/main/java/provider/netty_server_handler/NettyServerHandler24.java @@ -9,6 +9,7 @@ import lombok.extern.slf4j.Slf4j; import serialization.SerializationTool; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -41,10 +42,16 @@ public NettyServerHandler24(String methodName) { //实现对应的方法 @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + public void channelRead(ChannelHandlerContext ctx, Object msg) { log.info("收到来自" + ctx.channel().remoteAddress() + "的信息"); //使用反射的方法获取对应的类 通过反射再进行执行 - Class calledClass = Class.forName("provider.api." + methodName + "ServiceImpl"); + Class calledClass = null; + try { + calledClass = Class.forName("provider.api." + methodName + "ServiceImpl"); + } catch (ClassNotFoundException e) { + log.error(e.getMessage(), e); + } + assert calledClass != null; Method[] methods = calledClass.getMethods(); Method method = methods[0]; @@ -66,8 +73,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception // } - Object instance = calledClass.newInstance(); - Object response = method.invoke(instance, msg); + Object response = null; + try { + Object instance = calledClass.newInstance(); + response = method.invoke(instance, msg); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { + log.error(e.getMessage(), e); + } //获得对应信息并进行回传 //判断是否需要通过对应的方法进行序列化 序列化都集成了 @@ -85,7 +97,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception //出现异常的话 如何进行处理 @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.channel().close(); cause.printStackTrace(); } diff --git a/zyt-rpc-provider/src/main/java/provider/nio/NIONonBlockingServer10.java b/zyt-rpc-provider/src/main/java/provider/nio/NIONonBlockingServer10.java index 25d59e9..4ae0f13 100644 --- a/zyt-rpc-provider/src/main/java/provider/nio/NIONonBlockingServer10.java +++ b/zyt-rpc-provider/src/main/java/provider/nio/NIONonBlockingServer10.java @@ -94,17 +94,17 @@ private static void start0(int port) { ByteBuffer buffer = (ByteBuffer) key.attachment(); //进行调用方法并返回 //获得信息 - StringBuilder stringBuffer = new StringBuilder(); + StringBuilder stringBuilder = new StringBuilder(); int read = 1; while (read != 0) { //先清空 防止残留 buffer.clear(); read = socketChannel.read(buffer); //添加的时候 根据读入的数据进行 - stringBuffer.append(new String(buffer.array(), 0, read)); + stringBuilder.append(new String(buffer.array(), 0, read)); } //方法号和信息中间有个#进行分割 - String msg = stringBuffer.toString(); + String msg = stringBuilder.toString(); String[] strings = msg.split("#"); if (strings.length < 2) { //当出现传入错误的时候 报异常 diff --git a/zyt-rpc-provider/src/main/java/provider/nio/NIONonBlockingServer12bye.java b/zyt-rpc-provider/src/main/java/provider/nio/NIONonBlockingServer12bye.java index b482424..c7e4527 100644 --- a/zyt-rpc-provider/src/main/java/provider/nio/NIONonBlockingServer12bye.java +++ b/zyt-rpc-provider/src/main/java/provider/nio/NIONonBlockingServer12bye.java @@ -95,17 +95,17 @@ private static void start0(int port) { ByteBuffer buffer = (ByteBuffer) key.attachment(); //进行调用方法并返回 //获得信息 - StringBuilder stringBuffer = new StringBuilder(); + StringBuilder stringBuilder = new StringBuilder(); int read = 1; while (read != 0) { //先清空 防止残留 buffer.clear(); read = socketChannel.read(buffer); //添加的时候 根据读入的数据进行 - stringBuffer.append(new String(buffer.array(), 0, read)); + stringBuilder.append(new String(buffer.array(), 0, read)); } //方法号和信息中间有个#进行分割 - String msg = stringBuffer.toString(); + String msg = stringBuilder.toString(); String response; ByeService byeService = new ByeServiceImpl(); response = byeService.sayBye(msg); diff --git a/zyt-rpc-provider/src/main/java/provider/nio/NIONonBlockingServer12hello.java b/zyt-rpc-provider/src/main/java/provider/nio/NIONonBlockingServer12hello.java index e7d2566..749ff3a 100644 --- a/zyt-rpc-provider/src/main/java/provider/nio/NIONonBlockingServer12hello.java +++ b/zyt-rpc-provider/src/main/java/provider/nio/NIONonBlockingServer12hello.java @@ -95,17 +95,17 @@ private static void start0(int port) { ByteBuffer buffer = (ByteBuffer) key.attachment(); //进行调用方法并返回 //获得信息 - StringBuffer stringBuffer = new StringBuffer(); + StringBuilder stringBuilder = new StringBuilder(); int read = 1; while (read != 0) { //先清空 防止残留 buffer.clear(); read = socketChannel.read(buffer); //添加的时候 根据读入的数据进行 - stringBuffer.append(new String(buffer.array(), 0, read)); + stringBuilder.append(new String(buffer.array(), 0, read)); } - String msg = stringBuffer.toString(); + String msg = stringBuilder.toString(); HelloService helloService = new HelloServiceImpl(); String response = helloService.sayHello(msg); diff --git a/zyt-rpc-provider/src/main/java/provider/nio/NIONonBlockingServer14.java b/zyt-rpc-provider/src/main/java/provider/nio/NIONonBlockingServer14.java index 8571e1b..9d31e99 100644 --- a/zyt-rpc-provider/src/main/java/provider/nio/NIONonBlockingServer14.java +++ b/zyt-rpc-provider/src/main/java/provider/nio/NIONonBlockingServer14.java @@ -93,17 +93,17 @@ private static void start0(String method, int port) { ByteBuffer buffer = (ByteBuffer) key.attachment(); //进行调用方法并返回 //获得信息 - StringBuilder stringBuffer = new StringBuilder(); + StringBuilder stringBuilder = new StringBuilder(); int read = 1; while (read != 0) { //先清空 防止残留 buffer.clear(); read = socketChannel.read(buffer); //添加的时候 根据读入的数据进行 - stringBuffer.append(new String(buffer.array(), 0, read)); + stringBuilder.append(new String(buffer.array(), 0, read)); } - String msg = stringBuffer.toString(); + String msg = stringBuilder.toString(); //这里要有新逻辑了 根据获得的方法名 去找到相应的方法 //方法我们保存在固定位置 同时含有固定后缀 diff --git a/zyt-rpc-provider/src/main/java/provider/nio/NIONonBlockingServer15.java b/zyt-rpc-provider/src/main/java/provider/nio/NIONonBlockingServer15.java index f987980..0518564 100644 --- a/zyt-rpc-provider/src/main/java/provider/nio/NIONonBlockingServer15.java +++ b/zyt-rpc-provider/src/main/java/provider/nio/NIONonBlockingServer15.java @@ -63,6 +63,7 @@ private static void start0(String method, int port) { while (true) { //1秒钟无事发生的话 就继续 try { + assert selector != null; if (selector.select(1000) == 0) { continue; } @@ -94,17 +95,17 @@ private static void start0(String method, int port) { ByteBuffer buffer = (ByteBuffer) key.attachment(); //进行调用方法并返回 //获得信息 - StringBuffer stringBuffer = new StringBuffer(); + StringBuilder stringBuilder = new StringBuilder(); int read = 1; while (read != 0) { //先清空 防止残留 buffer.clear(); read = socketChannel.read(buffer); //添加的时候 根据读入的数据进行 - stringBuffer.append(new String(buffer.array(), 0, read)); + stringBuilder.append(new String(buffer.array(), 0, read)); } - String msg = stringBuffer.toString(); + String msg = stringBuilder.toString(); //这里要有新逻辑了 根据获得的方法名 去找到相应的方法 //方法我们保存在固定位置 同时含有固定后缀 diff --git a/zyt-rpc-provider/src/main/java/provider/test/ZKServiceTest.java b/zyt-rpc-provider/src/main/java/provider/test/ZKServiceTest.java index dde04ab..7727911 100644 --- a/zyt-rpc-provider/src/main/java/provider/test/ZKServiceTest.java +++ b/zyt-rpc-provider/src/main/java/provider/test/ZKServiceTest.java @@ -18,17 +18,21 @@ public class ZKServiceTest { @Test - public void connect() throws IOException, InterruptedException, KeeperException { + public void connect() { int sessionTimeout = 2000; String connectString = "zytCentos:2181"; - ZooKeeper zooKeeper = new ZooKeeper( - connectString, //连接地址 如果写多个那会 - sessionTimeout,//即超过该时间后,客户端没有向服务器端发送任何请求(正常情况下客户端会每隔一段时间发送心跳请求,此时服务器端会从新计算客户端的超时时间点的), - // 则服务器端认为session超时,清理数据。此时客户端的ZooKeeper对象就不再起作用了,需要再重新new一个新的对象了。 - null //监听器 暂时不设置 - ); - if (zooKeeper.exists("/test", null) == null) - zooKeeper.create("/test", "".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zooKeeper.create("/test/hello", "12".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + try { + ZooKeeper zooKeeper = new ZooKeeper( + connectString, //连接地址 如果写多个那会 + sessionTimeout,//即超过该时间后,客户端没有向服务器端发送任何请求(正常情况下客户端会每隔一段时间发送心跳请求,此时服务器端会从新计算客户端的超时时间点的), + // 则服务器端认为session超时,清理数据。此时客户端的ZooKeeper对象就不再起作用了,需要再重新new一个新的对象了。 + null //监听器 暂时不设置 + ); + if (zooKeeper.exists("/test", null) == null) + zooKeeper.create("/test", "".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zooKeeper.create("/test/hello", "12".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } catch (IOException | KeeperException | InterruptedException e) { + log.error(e.getMessage(), e); + } } } diff --git a/zyt-rpc-provider/src/main/java/provider/test/tes.java b/zyt-rpc-provider/src/main/java/provider/test/tes.java index a9fa50f..b63b059 100644 --- a/zyt-rpc-provider/src/main/java/provider/test/tes.java +++ b/zyt-rpc-provider/src/main/java/provider/test/tes.java @@ -8,9 +8,8 @@ **/ public class tes { @Test - public void test() - { - String s=null; - assert s!=null; + public void test() { + String s = null; + assert s != null; } }