Skip to content

Commit

Permalink
5.20 格式的修订 异常类的处理 crazy 遇到个就是之前不会注意的 就是日志显示级别
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzzzzzzyt committed May 20, 2022
1 parent 2c4f01d commit 9bf3b9b
Show file tree
Hide file tree
Showing 35 changed files with 146 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

//通用启动类 将启动的逻辑藏在ClientBootStrap中
public class NIOClientCall {
public static Customer main(String[] args){
public static Customer main(String[] args) {
return NIOClientBootStrap.start();
}
}
1 change: 0 additions & 1 deletion zyt-rpc-common/src/main/java/codec/AddCodec.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package codec;

import annotation.CodecSelector;
import annotation.RpcSerializationSelector;
import configuration.GlobalConfiguration;
import entity.PersonPOJO;
Expand Down
7 changes: 3 additions & 4 deletions zyt-rpc-common/src/main/java/compress/diyzip/DiyZipUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@
public class DiyZipUtils implements CompressType {

//线程独享的霍夫曼编码表 因为害怕线程不安全导致错误 所以设置线程独享
private static final ThreadLocal<HashMap<Byte, String>> hashMapThreadLocal = ThreadLocal.withInitial(() -> new HashMap<>());
private static final ThreadLocal<HashMap<Byte, String>> hashMapThreadLocal = ThreadLocal.withInitial(HashMap::new);


@Override
public byte[] compress(byte[] bytes) {
byte[] compressBytes = huffmanZip(bytes);
return compressBytes;
return huffmanZip(bytes);
}


Expand Down Expand Up @@ -63,7 +62,7 @@ private List<Node1> getNodes(byte[] contentBytes) {
return nodes;
}

private class Node1 implements Comparable<Node1> {
private static class Node1 implements Comparable<Node1> {
//数值 存放字符本身
Byte data;
//权重
Expand Down
11 changes: 5 additions & 6 deletions zyt-rpc-common/src/main/java/entity/Person.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,26 @@
//千万注意要实现序列化接口

//接口一定要实现正确

/**
* @author 祝英台炸油条
*/
public class Person implements Serializable {
private String name;

//构造方法
public Person(String name)
{
public Person(String name) {
this.name = name;
}

public Person() {
}

public String getName()
{
public String getName() {
return name;
}
public void setName(String name)
{

public void setName(String name) {
this.name = name;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import org.apache.zookeeper.ZooKeeper;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

/**
Expand All @@ -27,7 +26,7 @@ public String loadBalance(ZooKeeper zooKeeper, String path) {
}
}
//进行排序 根据每个节点的访问次数 从小到大进行排序 然后选用最小的
Collections.sort(children, (o1, o2) -> {
children.sort((o1, o2) -> {

try {
return Integer.parseInt(new String(zooKeeper.getData(path + "/" + o1, false, null)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public String loadBalance(ZooKeeper zooKeeper, String path) {
try {
children = zooKeeper.getChildren(path, null, null);
} catch (KeeperException | InterruptedException e) {
log.error(e.getMessage(),e);
log.error(e.getMessage(), e);
}
assert children != null;
if (children.isEmpty()) {
Expand All @@ -42,7 +42,7 @@ public String loadBalance(ZooKeeper zooKeeper, String path) {
++visitedCount;
zooKeeper.setData(path + "/" + chooseNode, String.valueOf(visitedCount).getBytes(StandardCharsets.UTF_8), -1);
} catch (KeeperException | InterruptedException e) {
log.error(e.getMessage(),e);
log.error(e.getMessage(), e);
}
return chooseNode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
public class FastJsonUtils implements Serializer {
@Override
public byte[] serialize(Object obj) {

// return JSON.toJSONString(obj,true).getBytes(StandardCharsets.UTF_8);
return JSON.toJSONBytes(obj);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,23 @@ public class JacksonUtils implements Serializer {
static ObjectMapper mapper = new ObjectMapper();

@Override
public byte[] serialize(Object obj) {
public byte[] serialize(Object obj) {
//将obj转换成json再将json转换成字节数组
try {
return mapper.writeValueAsBytes(obj);
} catch (JsonProcessingException e) {
log.error(e.getMessage(),e);
log.error(e.getMessage(), e);
}
return new byte[0];
}

//将json字符串或者字节组转换为对应的类
@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz){
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
try {
return mapper.readValue(bytes,clazz);
return mapper.readValue(bytes, clazz);
} catch (IOException e) {
log.error(e.getMessage(),e);
log.error(e.getMessage(), e);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@
*/
public class NIOConsumerBootStrap10 {
public static void main(String[] args) {

//非阻塞启动
NIONonBlockingClient10.start("127.0.0.1", 6666);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@
*/
public class NIOConsumerBootStrap11 {
public static void main(String[] args) {

//阻塞启动
NIOBlockingClient11.start("127.0.0.1", 6666);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
public class NIOConsumerBootStrap12 {
public static Customer main(String[] args) {

ClientProxyTool proxy = new ClientProxyTool();
return (Customer) proxy.getBean(Customer.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
public class NIOConsumerBootStrap14 {
public static Customer main(String[] args) {

ClientProxyTool proxy = new ClientProxyTool();
return (Customer) proxy.getBean(Customer.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
public class NIOConsumerBootStrap15 {
public static Customer main(String[] args) {

ClientProxyTool proxy = new ClientProxyTool();
return (Customer) proxy.getBean(Customer.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
import java.lang.reflect.Method;

//如果对应的是字符串类那就不用到下面的这个里面调用了

/**
* @author 祝英台炸油条
*/
public class NettyClient {
public static Object callMethod(String hostName, int port, Object param, Method method){
public static Object callMethod(String hostName, int port, Object param, Method method) {
//用户根据自己想用的版本 打开对应的注解
// return NettyClient21.callMethod(hostName, port, param,method);
// return NettyClient22.callMethod(hostName, port, param,method);
return NettyClient24.callMethod(hostName, port, param,method);
return NettyClient24.callMethod(hostName, port, param, method);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public static void start(String hostName, int port) {
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new NettyClientHandler20());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static void initClient(String hostName, int port) {
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringEncoder());//传输必须加编解码器 不然不认识的类传不过去
pipeline.addLast(new StringDecoder());
Expand Down
14 changes: 7 additions & 7 deletions zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient22.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

//实际客户端启动类 进行操作
//不确定能返回什么 所以判断是对象

/**
* @author 祝英台炸油条
*/
Expand All @@ -30,8 +31,7 @@ public class NettyClient22 {

static NettyClientHandler22 clientHandler;

public static void initClient(String hostName,int port,Method method)
{
public static void initClient(String hostName, int port, Method method) {

clientHandler = new NettyClientHandler22();
//建立客户端监听
Expand All @@ -43,11 +43,11 @@ public static void initClient(String hostName,int port,Method method)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline();

//加编解码器的逻辑,根据对应的注解进行编码器的添加 这里面有实现对应的逻辑 //
AddCodec.addCodec(pipeline,method,true);
AddCodec.addCodec(pipeline, method, true);
pipeline.addLast(clientHandler);
}
});
Expand All @@ -56,20 +56,20 @@ protected void initChannel(SocketChannel socketChannel) throws Exception {
bootstrap.connect(hostName, port).sync();

} catch (InterruptedException e) {
log.error(e.getMessage(),e);
log.error(e.getMessage(), e);
}
}

public static Object callMethod(String hostName, int port, Object param, Method method) {

//我是有多个地方进行调用的 不能只连接一个
initClient(hostName,port,method);
initClient(hostName, port, method);
clientHandler.setParam(param);
//接下来这就有关系到调用 直接调用
try {
return executor.submit(clientHandler).get();
} catch (InterruptedException | ExecutionException e) {
log.error(e.getMessage(),e);
log.error(e.getMessage(), e);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import codec.AddCodec;
import configuration.GlobalConfiguration;
import consumer.netty_client_handler.NettyClientHandler24;
import heartbeat.HeartBeat;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
Expand Down Expand Up @@ -51,7 +50,7 @@ public static Object callMethod(String hostName, int port, Object param, Method
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline();

//加编解码器的逻辑,根据对应的注解进行编码器的添加 这里面有实现对应的逻辑 //
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ public class NettyClientHandler20 extends ChannelInboundHandlerAdapter {

//通道就绪就会发的信息
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.copiedBuffer("你好,服务端", CharsetUtil.UTF_8));
}

//这个是收到信息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
log.info("收到来自" + ctx.channel().remoteAddress() + "的消息" + buf.toString(CharsetUtil.UTF_8));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@


import annotation.CompressFunction;
import compress.Compress;
import compress.CompressTypeTool;
import configuration.GlobalConfiguration;
import io.netty.channel.ChannelHandlerContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ private static void start0(String hostName, int port) {
//输入输出通道都放在外面

try {
assert socketChannel != null;
ObjectOutputStream outputStream = new ObjectOutputStream(socketChannel.socket().getOutputStream());
ObjectInputStream objectInputStream = new ObjectInputStream(socketChannel.socket().getInputStream());
//都是阻塞等待 发完了 接收完了 才能进行下一步 不然会报异常
Expand All @@ -65,6 +66,7 @@ private static void start0(String hostName, int port) {
log.error(e.getMessage(), e);
} finally {
try {
assert socketChannel != null;
socketChannel.close();
} catch (IOException ex) {
log.error(ex.getMessage(), ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ private static void start0(String hostName, int port) {
SocketChannel channel = (SocketChannel) key.channel();
int read = 1;
//用这个的原因是怕 多线程出现影响
StringBuilder stringBuffer = new StringBuilder();
StringBuilder stringBuilder = new StringBuilder();
while (read != 0) {
buffer.clear();
read = channel.read(buffer);
stringBuffer.append(new String(buffer.array(), 0, read));
stringBuilder.append(new String(buffer.array(), 0, read));
}
log.info("收到服务端回信" + stringBuffer.toString());
log.info("收到服务端回信" + stringBuilder.toString());
keyIterator.remove();
}
} catch (IOException e) {
Expand All @@ -80,7 +80,7 @@ private static void start0(String hostName, int port) {
while (true) {
int methodNum = scanner.nextInt();
String message = scanner.next();
String msg = new String(methodNum + "#" + message);
String msg = methodNum + "#" + message;
try {
socketChannel.write(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ public static String start(String hostName, int port, String msg) {
private static String start0(String hostName, int port, String msg) {
//得到一个网络通道
Selector selector = null;
SocketChannel socketChannel = null;
try {
SocketChannel socketChannel = SocketChannel.open();
socketChannel = SocketChannel.open();
log.info("-----------服务消费方启动-------------");
socketChannel.configureBlocking(false);
//建立链接 非阻塞连接 但我们是要等他连接上
Expand Down Expand Up @@ -61,18 +62,19 @@ private static String start0(String hostName, int port, String msg) {
SocketChannel channel = (SocketChannel) key.channel();
int read = 1;
//用这个的原因是怕 多线程出现影响
StringBuilder stringBuffer = new StringBuilder();
StringBuilder stringBuilder = new StringBuilder();
while (read != 0) {
buffer.clear();
read = channel.read(buffer);
stringBuffer.append(new String(buffer.array(), 0, read));
stringBuilder.append(new String(buffer.array(), 0, read));
}
return stringBuffer.toString();
return stringBuilder.toString();
}
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}

}
}

Expand Down
Loading

0 comments on commit 9bf3b9b

Please sign in to comment.