iMessageExecutorPool: about the threading model #14

Open
testwen00 opened this issue Dec 18, 2020 · 33 comments

@testwen00 commented Dec 18, 2020

Testing on one PC: I run two KcpRttExampleClient instances connecting to a KcpRttExampleServer. ScheduleTask, ReadTask, and WriteTask all end up running on a single thread, strictly serially. Why is it designed this way?

@l42111996 (Owner)

Are the two KcpRttExampleClient instances two separate processes? For a single Ukcp connection, ScheduleTask, ReadTask, and WriteTask do all run on one thread, serially. Across multiple Ukcp connections they run in parallel, so the business logic stays lock-free: serial within a connection, parallel across connections. That makes full use of multiple cores and is friendly to the CPU cache.
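
A minimal sketch of that serial-per-connection scheme (illustrative only, not the library's actual iMessageExecutorPool code): each connection is pinned to one single-threaded executor from a fixed pool, so its ScheduleTask/ReadTask/WriteTask never race each other, while different connections land on different threads.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Illustrative sketch of a serial-per-connection executor pool.
    public class MessageExecutorPoolSketch {
        private final ExecutorService[] executors;

        public MessageExecutorPoolSketch(int threads) {
            executors = new ExecutorService[threads];
            for (int i = 0; i < threads; i++) {
                executors[i] = Executors.newSingleThreadExecutor();
            }
        }

        // Every task for a given connection hashes to the same executor, so
        // per-connection work runs serially (lock-free) while different
        // connections run in parallel across cores.
        public ExecutorService executorFor(int connectionId) {
            return executors[Math.floorMod(connectionId, executors.length)];
        }
    }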

@l42111996 (Owner)

If they are in the same process, the best practice is to new a single KcpRttExampleClient instance and then call connect multiple times, once per server.
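
A sketch of that pattern, reusing the KcpClient API from the test code later in this thread (the addresses are placeholders):

    ChannelConfig channelConfig = new ChannelConfig();
    KcpClient kcpClient = new KcpClient();
    kcpClient.init(channelConfig);

    KcpRttExampleClient listener = new KcpRttExampleClient();
    // One client instance, several connections: each connect() yields an
    // independent Ukcp that the pool schedules on its own thread.
    kcpClient.connect(new InetSocketAddress("192.168.1.10", 20003), channelConfig, listener);
    kcpClient.connect(new InetSocketAddress("192.168.1.11", 20003), channelConfig, listener);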

@testwen00 (Author)

I'd like reads and writes split onto separate threads; with everything on one thread I worry performance will suffer.

@l42111996 (Owner)

The thread here only runs KCP-layer logic; it does no network I/O. Profiling (VisualVM) shows this logic is not the bottleneck. In my tests, a single thread handling a single connection with FEC can process about 60,000 packets per second; at 1 KB per packet that is roughly 60 MB/s, which should cover 99% of real workloads. Run in parallel it reaches at least 70% CPU utilization, so overall throughput is roughly cores × 60,000 × 1 KB × 70% (e.g. about 330 MB/s on 8 cores).
Splitting reads and writes would require locking: single-connection throughput might go up 20%, but whole-system throughput would drop a lot.

@testwen00 (Author)

In actual testing performance does suffer: with messages above 200,000 bytes on the sender, once the receive queue or write queue fills up, all processing is delayed, sends go out late, and the server eventually hits a read timeout.

@testwen00 (Author)

For example, with the test case from #15, receiving data is extremely slow and takes a very long time.

@l42111996 (Owner)

> For example, with the test case from #15, receiving data is extremely slow and takes a very long time.

I can't reproduce this locally. From the logs above it looks like either the bandwidth is saturated or your LAN router is at its limit; the LAN is hitting a 50% packet loss rate. Try spinning up a few virtual machines locally instead.

@testwen00 (Author)

1. Mine is a PC, running everything locally, on a wireless NIC.
2. The messages are large: 200,000 bytes.

My guess is that such a large message gets split into many fragments, each needing its own ACK round trip, which stretches out the total time.
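
Rough arithmetic behind that guess (assuming KCP's 24-byte segment header and the MTU of 1400 from the config above; FEC and UDP/IP headers shave the per-packet payload down a bit further):

    int mss = 1400 - 24;                        // ≈ 1376 bytes of payload per KCP segment
    int segments = (200_000 + mss - 1) / mss;   // ≈ 146 segments per message
    int wirePackets = segments * 13 / 10;       // ≈ 189 wire packets with FEC 10+3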

@l42111996 (Owner)

> 1. Mine is a PC, running everything locally, on a wireless NIC.
> 2. The messages are large: 200,000 bytes.
> My guess is that such a large message gets split into many fragments, each needing its own ACK round trip, which stretches out the total time.

I ran your example locally as two processes connected over 127.0.0.1: bandwidth peaked at 38 MB/s and the printed round-trip times stayed under 10 ms. So I suspect your NIC or your router. Kuaishou ran similar tests with this library, and their bottleneck also turned out to be the in-house router.

@testwen00 (Author)

Measured it: the time goes into FEC encoding and decoding, averaging 1 ms per call. It accumulates, and the latency grows.

@l42111996 (Owner)

> Measured it: the time goes into FEC encoding and decoding, averaging 1 ms per call. It accumulates, and the latency grows.

What CPU is that? It shouldn't be that slow. FEC encoding could be moved out to its own thread; decoding is hard to move out to a separate thread right now.

@testwen00 (Author) commented Dec 23, 2020

i7-7700 3.6 GHz, 16 GB RAM.
Below 10,000 bytes per message, performance is fine.
At 100,000 bytes it falls apart.

Timing code added to ReedSolomon:

    long start = System.currentTimeMillis();
    // Do the coding.
    LOOP.codeSomeShards(
            parityRows,
            shards, dataShardCount,
            outputs, parityShardCount,
            offset, byteCount);
    long end = System.currentTimeMillis();
    if ((end - start) > 0)
        System.out.println("codeSomeShards use time = " + (end - start));
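
One caveat with this measurement: System.currentTimeMillis() has roughly millisecond resolution, so sub-millisecond encodes read as 0 and only the slow calls get printed, which skews the picture. A sketch of the same timing with System.nanoTime() (same LOOP.codeSomeShards call as above):

    long start = System.nanoTime();
    // Do the coding.
    LOOP.codeSomeShards(
            parityRows,
            shards, dataShardCount,
            outputs, parityShardCount,
            offset, byteCount);
    long elapsedMicros = (System.nanoTime() - start) / 1_000;
    if (elapsedMicros > 100) {   // only log calls slower than 0.1 ms
        System.out.println("codeSomeShards use time = " + elapsedMicros + "us");
    }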

@l42111996 (Owner)

> i7-7700 3.6 GHz, 16 GB RAM. Below 10,000 bytes per message performance is fine; at 100,000 bytes it falls apart.

Is that on Windows? I'll test it tomorrow.

@testwen00 (Author) commented Dec 23, 2020

Windows 10
JDK 1.8

@l42111996 (Owner)

> Windows 10
> JDK 1.8

Do you have the complete test code?

@testwen00 (Author) commented Dec 24, 2020

KcpRttExampleServer:

    ChannelConfig channelConfig = new ChannelConfig();
    channelConfig.nodelay(true, 40, 2, true);
    channelConfig.setSndwnd(1024);
    channelConfig.setRcvwnd(1024);
    channelConfig.setMtu(1400);
    channelConfig.setFecDataShardCount(10);
    channelConfig.setFecParityShardCount(3);
    channelConfig.setAckNoDelay(true);
    channelConfig.setTimeoutMillis(10000);
    channelConfig.setUseConvChannel(true);
    channelConfig.setCrc32Check(false);

KcpRttExampleClient:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.net.InetSocketAddress;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    // Library classes (ChannelConfig, KcpClient, KcpListener, Ukcp, Snmp);
    // package assumed from the kcp.Ukcp stack traces elsewhere in this thread.
    import kcp.*;

    public class KcpRttExampleClient implements KcpListener {

        private static final Logger log = LoggerFactory.getLogger(KcpRttExampleClient.class);

        private final ByteBuf data;
        private int[] rtts;
        private volatile int count;
        private ScheduledExecutorService scheduleSrv;
        private ScheduledFuture<?> future = null;
        private final long startTime;

        public KcpRttExampleClient() {
            data = Unpooled.buffer(200000);
            for (int i = 0; i < data.capacity(); i++) {
                data.writeByte((byte) i);
            }
            rtts = new int[30000];
            for (int i = 0; i < rtts.length; i++) {
                rtts[i] = -1;
            }
            startTime = System.currentTimeMillis();
            scheduleSrv = new ScheduledThreadPoolExecutor(1);
        }

        public static void main(String[] args) {
            ChannelConfig channelConfig = new ChannelConfig();
            channelConfig.nodelay(true, 10, 2, true);
            channelConfig.setSndwnd(1024);
            channelConfig.setRcvwnd(1024);
            channelConfig.setMtu(1400);
            channelConfig.setAckNoDelay(true);
            channelConfig.setConv(55);
            channelConfig.setFecDataShardCount(10);
            channelConfig.setFecParityShardCount(3);
            channelConfig.setCrc32Check(false);
            KcpClient kcpClient = new KcpClient();
            kcpClient.init(channelConfig);
            KcpRttExampleClient kcpClientRttExample = new KcpRttExampleClient();
            kcpClient.connect(new InetSocketAddress("127.0.0.1", 20003), channelConfig, kcpClientRttExample);
        }

        @Override
        public void onConnected(Ukcp ukcp) {
            // Send a 200,000-byte rtt message every 5 ms.
            future = scheduleSrv.scheduleWithFixedDelay(() -> {
                ByteBuf byteBuf = rttMsg(++count);
                ukcp.write(byteBuf);
                byteBuf.release();
                if (count >= rtts.length) {
                    // finish
                    future.cancel(true);
                    byteBuf = rttMsg(-1);
                    ukcp.write(byteBuf);
                    byteBuf.release();
                }
            }, 5, 5, TimeUnit.MILLISECONDS);
        }

        @Override
        public void handleReceive(ByteBuf byteBuf, Ukcp ukcp) {
            int curCount = byteBuf.readShort();
            if (curCount == -1) {
                scheduleSrv.schedule(new Runnable() {
                    @Override
                    public void run() {
                        int sum = 0;
                        for (int rtt : rtts) {
                            sum += rtt;
                        }
                        System.out.println("average: " + (sum / rtts.length));
                        System.out.println(Snmp.snmp.toString());
                        ukcp.close();
                        //ukcp.setTimeoutMillis(System.currentTimeMillis());
                        System.exit(0);
                    }
                }, 3, TimeUnit.SECONDS);
            } else {
                int idx = curCount - 1;
                long time = byteBuf.readInt();
                if (rtts[idx] != -1) {
                    System.out.println("???");
                }
                //log.info("rcv count {} {}", curCount, System.currentTimeMillis());
                rtts[idx] = (int) (System.currentTimeMillis() - startTime - time);
                System.out.println("rtt : " + curCount + "  " + rtts[idx]);
            }
        }

        @Override
        public void handleException(Throwable ex, Ukcp kcp) {
            ex.printStackTrace();
        }

        @Override
        public void handleClose(Ukcp kcp) {
            scheduleSrv.shutdown();
            try {
                scheduleSrv.awaitTermination(3, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            int sum = 0;
            int max = 0;
            for (int rtt : rtts) {
                if (rtt > max) {
                    max = rtt;
                }
                sum += rtt;
            }
            System.out.println("average: " + (sum / rtts.length) + " max:" + max);
            System.out.println(Snmp.snmp.toString());
            System.out.println("lost percent: " + (Snmp.snmp.RetransSegs.doubleValue() / Snmp.snmp.OutPkts.doubleValue()));
        }

        /**
         * count+timestamp+dataLen+data
         *
         * @param count message sequence number, or -1 to finish
         * @return the message buffer
         */
        public ByteBuf rttMsg(int count) {
            ByteBuf buf = Unpooled.buffer(200000);
            buf.writeShort(count);
            buf.writeInt((int) (System.currentTimeMillis() - startTime));

            //int dataLen = new Random().nextInt(200);
            //buf.writeBytes(new byte[dataLen]);

            int dataLen = data.readableBytes();
            buf.writeShort(dataLen);
            buf.writeBytes(data, data.readerIndex(), dataLen);
            return buf;
        }
    }

@l42111996
Copy link
Owner

系统:windows10 cpu: i7-3770 16G内存跑这个程序分析瓶颈在网络io,带宽跑到了600Mb
image
线程分析
image
kcp所有业务(fec编码解码,kcp组包合包等)占用了60%还没到瓶颈
因为windows下面是select模型性能会差一些 ,如果是linux或者mac下面会好很多

@testwen00 (Author)

Why is CPU usage so high? With 2 connections it jumps straight to 90%. That makes it unusable; can it be optimized?
My wireless NIC is 300 Mb/s; under the same conditions TCP does much better, so KCP's advantage never shows.
Would you consider implementing the FEC part in C++ and calling the DLL from Java?

@l42111996
Copy link
Owner

cpu为何如此之高,如果开2个连接,直接占用到90%了。这点无法使用,能否优化?
单个连接只会有2个线程处理,一个处理网络收发,一个处理kcp业务和fec编解码,最多占用两核,2个连接最多占用4核,你那边i7-7700是8线程的应该最多占用50%,得分析一下别的什么业务占用了cpu,比如windows下360会洗流量占用cpu

我的无线网卡是300m的,同样条件下,tcp的情况要好很多,kcp的优势就没体现出来。
tcp有控流,kcp没有控流,如果流量打满了tcp确实会好很多

fec算法部分是否考虑用c++,java调用dll方式呢?
java版本的fec确实跟c++和go版本有点差距,用dll的话需要点时间,这个版本的所有内存都是堆外的不受jvm的gc管制,得把堆外内存拷贝到堆内再传到c++层去处理这样子gc会加大具体性能不好评估。

@l42111996 (Owner)

Try version 1.4; the FEC is a bit better there.

@l42111996 (Owner)

After optimization the FEC does roughly 400 MB/s per core, encode plus decode, which should be enough. When I have time I'll take it further with a C++ version.

@testwen00 (Author)

Just bought a 650 Mb/s wireless NIC :)
The optimization does help, but latency still gets progressively worse, and a single connection sits at 40~50% CPU.
rtt : 3733 6415
rtt : 3734 6399
rtt : 3735 6407
rtt : 3736 6400
rtt : 3737 6382
rtt : 3738 6418
rtt : 3739 6411
rtt : 3740 6405
rtt : 3741 6398
rtt : 3742 6393
rtt : 3743 6416
rtt : 3744 6418
rtt : 3745 6419
rtt : 3746 6419
rtt : 3747 6412
rtt : 3748 6421
rtt : 3749 6414
The times keep growing; it seems this only suits small-packet traffic.

@l42111996 (Owner)

A 650 Mb/s NIC maxes out at 81.25 MB/s. The example above saturates it easily, so some delay is expected; the bottleneck is still the wireless NIC.
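
For scale, a rough back-of-the-envelope (assuming the 200,000-byte message and the 5 ms send interval from the client code posted earlier):

    double payloadMBps = 200_000 / 0.005 / 1e6;  // = 40 MB/s of raw payload
    double wireMBps = payloadMBps * 13 / 10;     // = 52 MB/s with FEC 10+3, before headers/ACKs/retransmits
    double nicMBps = 650 / 8.0;                  // = 81.25 MB/s theoretical ceiling of a 650 Mb/s NIC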

@l42111996 (Owner)

In a loss-free environment KCP can't really compete with TCP, because its window is fixed. Under packet loss KCP does considerably better; its main use case is networks with some loss where low latency is required.

@testwen00 (Author)

I found a C++ Reed-Solomon implementation whose code matches the current Java version: https://github.com/DrPizza/reed-solomon
It builds and runs in C++, but I couldn't get the Java-to-DLL wrapper working (I don't know C++ well). Please help out when you have time.

@l42111996 (Owner)

> I found a C++ Reed-Solomon implementation whose code matches the current Java version: https://github.com/DrPizza/reed-solomon
> It builds and runs in C++, but I couldn't get the Java-to-DLL wrapper working (I don't know C++ well). Please help out when you have time.

In a while; I've been pretty busy lately. Once this crunch is over.

@l42111996 (Owner)

Have a look at the nativeFec branch: it calls a C version of the FEC via JNI. For now I've only built the macOS lib; I'll commit the C code later so you can build it for your own platform.
Java vs JNI C FEC performance:
java: 420 MB/s
jni c: 750 MB/s
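
For reference, the usual JNI shape for a binding like this (a hypothetical sketch; the class, method, and library names are illustrative, not necessarily what the nativeFec branch actually uses):

    // Hypothetical JNI binding sketch -- names are illustrative only.
    public final class ReedSolomonNativeSketch {

        static {
            // Load the platform-specific library sitting next to the app,
            // e.g. native/libjni.dylib on macOS or native/jni.dll on Windows.
            String os = System.getProperty("os.name").toLowerCase();
            String file = os.contains("win") ? "native/jni.dll" : "native/libjni.dylib";
            System.load(new java.io.File(file).getAbsolutePath());
        }

        // Native encode: fills parityShards from dataShards in C.
        public static native void encode(byte[][] dataShards,
                                         byte[][] parityShards,
                                         int shardLength);
    }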

@l42111996 (Owner)

> I found a C++ Reed-Solomon implementation whose code matches the current Java version: https://github.com/DrPizza/reed-solomon
> It builds and runs in C++, but I couldn't get the Java-to-DLL wrapper working (I don't know C++ well). Please help out when you have time.

https://github.com/l42111996/reedsolomon_jni

@testwen00 (Author)

I had been trying JNA all along; I'll give this a try over the New Year holiday. Many thanks.

@testwen00 (Author)

Windows can't load the dylib; that one is for macOS, right?

    native\libjni.dylib: Can't load this .dll (machine code=0x7) on a AMD 64-bit platform
    at java.lang.ClassLoader$NativeLibrary.load(Native Method)
    at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
    at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
    at java.lang.Runtime.load0(Runtime.java:809)
    at java.lang.System.load(System.java:1086)
    at com.backblaze.erasure.fecNative.ReedSolomonC.<clinit>(ReedSolomonC.java:13)
    at com.backblaze.erasure.fecNative.ReedSolomonNative.<clinit>(ReedSolomonNative.java:12)
    at com.backblaze.erasure.FecAdapt.<init>(FecAdapt.java:20)
    at test.KcpRttExampleServer.main(KcpRttExampleServer.java:27)

@l42111996 (Owner)

> Windows can't load the dylib; that one is for macOS, right?
> native\libjni.dylib: Can't load this .dll (machine code=0x7) on a AMD 64-bit platform

https://github.com/l42111996/reedsolomon_jni

You'll need to look up how to build it for your platform yourself; I don't have a Windows machine. Just put the resulting library in the same directory as the dylib.

@testwen00 (Author) commented Feb 23, 2021

CLion 2020.3.2 (Windows) + MinGW: it compiles, produces a DLL, and the DLL loads.

However, running the test code above now fails with:

    java.io.IOException: No enough bytes of head
    at kcp.Ukcp.input(Ukcp.java:170)
    at kcp.Ukcp.input(Ukcp.java:156)
    at kcp.ReadTask.execute(ReadTask.java:58)
    at threadPool.netty.NettyMessageExecutor.lambda$execute$0(NettyMessageExecutor.java:32)
    at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.lang.Thread.run(Thread.java:748)

KcpRttExampleClient:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;

    import java.net.InetSocketAddress;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    import com.backblaze.erasure.FecAdapt;
    // Library classes (ChannelConfig, KcpClient, KcpListener, Ukcp, Snmp);
    // package assumed from the stack traces above.
    import kcp.*;

    public class KcpRttExampleClient implements KcpListener {

        private final ByteBuf data;
        private int[] rtts;
        private volatile int count;
        private ScheduledExecutorService scheduleSrv;
        private ScheduledFuture<?> future = null;
        private final long startTime;

        public KcpRttExampleClient() {
            data = Unpooled.buffer(200000);
            for (int i = 0; i < data.capacity(); i++) {
                data.writeByte((byte) i);
            }

            rtts = new int[30000];
            for (int i = 0; i < rtts.length; i++) {
                rtts[i] = -1;
            }
            startTime = System.currentTimeMillis();
            scheduleSrv = new ScheduledThreadPoolExecutor(1);
        }

        public static void main(String[] args) {
            ChannelConfig channelConfig = new ChannelConfig();
            channelConfig.nodelay(true, 40, 2, true);
            channelConfig.setSndwnd(1024);
            channelConfig.setRcvwnd(1024);
            channelConfig.setMtu(1400);
            channelConfig.setAckNoDelay(true);
            channelConfig.setConv(55);

            channelConfig.setFecAdapt(new FecAdapt(10, 3));
            channelConfig.setCrc32Check(true);
            //channelConfig.setTimeoutMillis(10000);
            //channelConfig.setAckMaskSize(32);
            KcpClient kcpClient = new KcpClient();
            kcpClient.init(channelConfig);

            KcpRttExampleClient kcpClientRttExample = new KcpRttExampleClient();
            kcpClient.connect(new InetSocketAddress("127.0.0.1", 20003), channelConfig, kcpClientRttExample);

            //kcpClient.connect(new InetSocketAddress("10.60.100.191", 20003), channelConfig, kcpClientRttExample);
        }

        @Override
        public void onConnected(Ukcp ukcp) {
            // Send a 200,000-byte rtt message every 20 ms.
            future = scheduleSrv.scheduleWithFixedDelay(() -> {
                ByteBuf byteBuf = rttMsg(++count);
                ukcp.write(byteBuf);
                byteBuf.release();
                if (count >= rtts.length) {
                    // finish
                    future.cancel(true);
                    byteBuf = rttMsg(-1);
                    ukcp.write(byteBuf);
                    byteBuf.release();
                }
            }, 20, 20, TimeUnit.MILLISECONDS);
        }

        @Override
        public void handleReceive(ByteBuf byteBuf, Ukcp ukcp) {
            int curCount = byteBuf.readShort();

            if (curCount == -1) {
                scheduleSrv.schedule(new Runnable() {
                    @Override
                    public void run() {
                        int sum = 0;
                        for (int rtt : rtts) {
                            sum += rtt;
                        }
                        System.out.println("average: " + (sum / rtts.length));
                        System.out.println(Snmp.snmp.toString());
                        ukcp.close();
                        //ukcp.setTimeoutMillis(System.currentTimeMillis());
                        System.exit(0);
                    }
                }, 3, TimeUnit.SECONDS);
            } else {
                int idx = curCount - 1;
                long time = byteBuf.readInt();
                if (rtts[idx] != -1) {
                    System.out.println("???");
                }
                //log.info("rcv count {} {}", curCount, System.currentTimeMillis());
                rtts[idx] = (int) (System.currentTimeMillis() - startTime - time);
                System.out.println("rtt : " + curCount + "  " + rtts[idx]);
            }
        }

        @Override
        public void handleException(Throwable ex, Ukcp kcp) {
            ex.printStackTrace();
        }

        @Override
        public void handleClose(Ukcp kcp) {
            scheduleSrv.shutdown();
            try {
                scheduleSrv.awaitTermination(3, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            int sum = 0;
            int max = 0;
            for (int rtt : rtts) {
                if (rtt > max) {
                    max = rtt;
                }
                sum += rtt;
            }
            System.out.println("average: " + (sum / rtts.length) + " max:" + max);
            System.out.println(Snmp.snmp.toString());
            System.out.println("lost percent: " + (Snmp.snmp.RetransSegs.doubleValue() / Snmp.snmp.OutPkts.doubleValue()));
        }

        /**
         * count+timestamp+dataLen+data
         *
         * @param count message sequence number, or -1 to finish
         * @return the message buffer
         */
        public ByteBuf rttMsg(int count) {
            ByteBuf buf = Unpooled.buffer(200000);
            buf.writeShort(count);
            buf.writeInt((int) (System.currentTimeMillis() - startTime));

            //int dataLen = new Random().nextInt(200);
            //buf.writeBytes(new byte[dataLen]);

            int dataLen = data.readableBytes();
            buf.writeShort(dataLen);
            buf.writeBytes(data, data.readerIndex(), dataLen);

            return buf;
        }
    }

@CodeJiang

@testwen00 Mate, have you used this in a production project? I see some TODOs in the author's source saying it might deadlock, so I'm a bit wary of using it. I asked the author on QQ but haven't heard back yet.
