szhnet / kcp-netty
Java implementation of KCP based on Netty.
License: MIT License
Currently kcp-netty drives the kcp_update calls for all clients from a single thread, and that same thread is also responsible for reading data, delivering it downstream, and passing it along the pipeline.
I worry that with a very large number of connected clients, one thread will not be enough.
As the referenced issue suggests, if Epoll were supported, EpollChannelOption.SO_REUSEPORT could be set to true so that the same port can be bound multiple times, allowing multiple threads to serve more connections.
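A sketch of that approach as a configuration fragment (assumes Linux with netty-transport-native-epoll on the classpath; kcp-netty itself does not ship an epoll-based UkcpServerChannel, which is exactly what the issue asks about, so a plain EpollDatagramChannel is used here purely to illustrate the binding pattern):

```java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;

/** Hypothetical sketch: bind the same UDP port once per event-loop thread
 *  via SO_REUSEPORT, so the kernel spreads datagrams across the sockets. */
public class ReusePortSketch {
    public static void main(String[] args) throws InterruptedException {
        int threads = Runtime.getRuntime().availableProcessors();
        EpollEventLoopGroup group = new EpollEventLoopGroup(threads);
        Bootstrap b = new Bootstrap()
                .group(group)
                .channel(EpollDatagramChannel.class)
                .option(EpollChannelOption.SO_REUSEPORT, true)
                .handler(new ChannelInboundHandlerAdapter()); // placeholder handler
        for (int i = 0; i < threads; i++) {
            b.bind(9999).sync(); // each bind is served by a different event-loop thread
        }
    }
}
```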
Segment fields and their meanings:
private int conv;      // conversation id, shared by both ends
private byte cmd;      // command: push, ack, window probe, or window answer
private short frg;     // fragment index, counting down to 0 for the last piece
private int wnd;       // advertised receive window of the sender
private long ts;       // timestamp when the segment was sent
private long sn;       // sequence number
private long una;      // lowest sequence number not yet received (cumulative ack)
private long resendts; // next retransmission timestamp
private int rto;       // current retransmission timeout of this segment
private int fastack;   // times this segment was skipped by later acks (fast-retransmit trigger)
private int xmit;      // number of times this segment has been transmitted
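For reference, the wire-level fields above travel in a fixed 24-byte little-endian header at the start of every KCP segment. A standalone decoder sketch (the class name is made up; the field layout follows the KCP wire format):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Minimal decoder for the fixed 24-byte KCP segment header (little-endian). */
public class KcpHeader {
    public final long conv;  // conversation id, unsigned 32-bit
    public final int cmd;    // command: 81=push, 82=ack, 83=wask, 84=wins
    public final int frg;    // fragment index
    public final int wnd;    // advertised receive window
    public final long ts;    // timestamp
    public final long sn;    // sequence number
    public final long una;   // lowest sn not yet received
    public final long len;   // payload length

    public KcpHeader(byte[] packet) {
        ByteBuffer b = ByteBuffer.wrap(packet).order(ByteOrder.LITTLE_ENDIAN);
        conv = b.getInt() & 0xFFFFFFFFL;
        cmd  = b.get() & 0xFF;
        frg  = b.get() & 0xFF;
        wnd  = b.getShort() & 0xFFFF;
        ts   = b.getInt() & 0xFFFFFFFFL;
        sn   = b.getInt() & 0xFFFFFFFFL;
        una  = b.getInt() & 0xFFFFFFFFL;
        len  = b.getInt() & 0xFFFFFFFFL;
    }
}
```

Feeding it the first 24 bytes of the hex dump further down this page yields conv=10, cmd=81 (push), wnd=32, len=256.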
I wanted to test large packets, so I sent a single 1024-byte message. But the decoder printed the first three received messages as follows:
server rec len :488
server rec len :488
server rec len :48
So no message reassembly is done here? How should I reconstruct the original data as it was sent?
Do I need to buffer the incoming pieces myself and reassemble the partial packets?
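The observed 488/488/48 split is consistent with KCP fragmenting at the MSS, i.e. the MTU minus the 24-byte segment header: with an MTU of 512 (which matches the numbers above), the MSS is 512 - 24 = 488, and 1024 bytes become three fragments. A standalone sketch of that arithmetic (class and method names are made up for illustration):

```java
/** Illustrates how a KCP message splits into fragments of at most MSS bytes,
 *  where MSS = MTU minus the 24-byte KCP header. */
public class FragmentMath {
    static final int KCP_OVERHEAD = 24;

    public static int[] fragmentSizes(int messageLen, int mtu) {
        int mss = mtu - KCP_OVERHEAD;             // e.g. 512 - 24 = 488
        int count = (messageLen + mss - 1) / mss; // ceiling division
        int[] sizes = new int[count];
        for (int i = 0; i < count; i++) {
            sizes[i] = Math.min(mss, messageLen - i * mss);
        }
        return sizes;
    }
}
```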
Question 1:
io.jpower.kcp.netty.KcpException: State=-1 after update()
at io.jpower.kcp.netty.UkcpClientChannel.run(UkcpClientChannel.java:298)
at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:125)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:745)
While testing with the demo, I found this error appears after it runs for a while. The setup is a local machine simulating both server and client, so the network itself shouldn't be a problem, yet the dead-link counter still exceeds 20. The C implementation of KCP never disconnects in this situation, so I'd like to ask what the reasoning was for disconnecting the client here.
Question 2:
During system integration we ran into a Netty memory leak; the reported errors are concentrated in:
io.netty.buffer.AdvancedLeakAwareByteBuf.readRetainedSlice(AdvancedLeakAwareByteBuf.java:106)
kcp.netty.Kcp.input(Kcp.java:999)
kcp.netty.Ukcp.input(Ukcp.java:51)
kcp.netty.UkcpServerChildChannel.kcpInput(UkcpServerChildChannel.java:192)
kcp.netty.UkcpServerChannel$UkcpServerUnsafe.read(UkcpServerChannel.java:566)
Could you take a look at these two issues and tell me whether there is something I should watch out for in my usage? Many thanks!
1. Could the KCP protocol layer pass through the client's IP address and port?
In my current environment, traffic from the public network to the KCP server passes through several hops, possibly including LVS, nginx, and F5. After those nodes, the client's original IP and port no longer reach the server. Could this be handled at the KCP protocol layer?
2. Would you consider developing a (reverse) proxy for KCP?
In our environment the backend needs a KCP proxy. Proxies built on nginx, F5, etc. can only proxy raw UDP, which is not ideal and always causes problems in practice, so would you consider building a dedicated one?
3. After going through a UDP proxy, we keep getting a Conv inconsistency exception.
conv is set automatically by default. We captured the UDP packets, and the conv decoded from every packet is identical; without the proxy there is no problem, so we cannot find the cause.
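To rule out proxy mangling, the conv the server will see can be checked directly from a captured datagram, since conv occupies the first four header bytes in little-endian order (a standalone sketch; the class name is made up):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Reads the conv field (first 4 header bytes, little-endian) of a raw KCP datagram. */
public class ConvPeek {
    public static long peekConv(byte[] udpPayload) {
        return ByteBuffer.wrap(udpPayload)
                .order(ByteOrder.LITTLE_ENDIAN)
                .getInt() & 0xFFFFFFFFL; // unsigned 32-bit
    }
}
```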
Could you write some documentation? The examples are hard to follow on their own.
Hello, I have a question. I'm using turbo mode, with both client and server configured as ikcp_nodelay(kcp, 1, 10, 2, 1), MTU 512, and RCV_WND and SND_WND both 64. When the client sends 100 packets in a loop:
for (int i = 0; i < 100; i++) {
ChannelFuture future = channel.writeAndFlush(request);
future.sync();
}
my server only receives the first 60 or so packets; everything after that is lost. If I sleep 5 ms after each send, the server receives all 100. This looks like the send window filling up and messages being dropped. Shouldn't future.sync() block until SND_WND has room again?
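If future.sync() indeed completes once the message is queued rather than once the window frees up (an assumption that would match the observed behavior), backpressure has to be added at the application layer. One generic pattern is to cap the number of outstanding sends with a semaphore released on completion. A standalone sketch, with CompletableFuture standing in for Netty's ChannelFuture and all names made up:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

/** Caps the number of in-flight sends so a fast producer cannot overrun the
 *  send window. The permit count and the release-on-completion idea are
 *  illustrative, not part of kcp-netty's API. */
public class BoundedSender {
    private final Semaphore permits;

    public BoundedSender(int maxInFlight) {
        permits = new Semaphore(maxInFlight);
    }

    /** Blocks while maxInFlight sends are outstanding, then performs one send. */
    public CompletableFuture<Void> send(SendOp op) throws InterruptedException {
        permits.acquire();                             // wait for a free slot
        CompletableFuture<Void> f = op.run();
        f.whenComplete((v, err) -> permits.release()); // free the slot when done
        return f;
    }

    public interface SendOp {
        CompletableFuture<Void> run();
    }
}
```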
I'm new to KCP and found kcp-netty through its home page. I have a few questions about usage:
After reading the KCP documentation, my understanding is that conv is an identifier that lets the server recognize a client, since UDP is connectionless; is that right? In the example code, both EchoClientHandler and EchoServerHandler hardcode the conv value in channelActive:
kcpCh.conv(EchoClient.CONV); // set conv
The client and server conv values must match, otherwise the server throws a conv-inconsistency exception during transfer. My question: if there are multiple clients, each with a different conv, how can the server know each client's conv inside channelActive? The example simply hardcodes it.
Reading the code, I saw the receive buffer size is fixed at 512 bytes:
public static final int FIXED_RECV_BYTEBUF_ALLOCATOR_SIZE = 512;
My tests show that flushing more than (512 minus the KCP header) bytes into the channel splits the data into multiple fragments on the wire. According to the official page and an article from Tencent, enabling stream mode:
option(UkcpChannelOption.UKCP_STREAM, true)
should let the receiver reassemble the fragments into one complete message for the application layer (if I understand correctly). However, my local tests show the peer still receives individual fragments. On the other hand, when I flush several very small payloads (e.g. 2 bytes) in a row, the peer sometimes reads several of them in a single channelRead; the sender's multiple small packets arrive as one larger packet (with stream mode off this never happens, and each channelRead yields exactly one send).
Is this behavior consistent with KCP's stream mode? And what is the best practice for transferring a large block of data (say 64 KB)?
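Whatever stream mode does with fragment boundaries, the reliable way to recover message boundaries on top of a byte stream is application-level framing, e.g. a length prefix (with Netty this is what LengthFieldPrepender and LengthFieldBasedFrameDecoder provide in the pipeline). A dependency-free sketch of the decode side, with all names made up:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Reassembles length-prefixed messages (4-byte big-endian length, then body)
 *  from an arbitrary byte stream, as application-level framing on top of
 *  KCP stream mode. Illustrative sketch only. */
public class LengthPrefixFramer {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /** Feed whatever bytes arrived; returns every complete message now available. */
    public List<byte[]> feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        List<byte[]> out = new ArrayList<>();
        byte[] buf = pending.toByteArray();
        int pos = 0;
        while (buf.length - pos >= 4) {
            int len = ByteBuffer.wrap(buf, pos, 4).getInt();
            if (buf.length - pos - 4 < len) break;   // message still incomplete
            byte[] msg = new byte[len];
            System.arraycopy(buf, pos + 4, msg, 0, len);
            out.add(msg);
            pos += 4 + len;
        }
        pending.reset();
        pending.write(buf, pos, buf.length - pos);   // keep the remainder buffered
        return out;
    }
}
```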
C# now has DotNetty, whose API is identical to the Java one. Would a kcp-dotnetty be possible as well?
When the server calls ctx.close(), the client has no way to know the server closed and cannot detect it.
Conversely, when the client calls ctx.close(), the server's channelInactive is invoked.
Client code:
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
UkcpChannel kcpCh = (UkcpChannel) ctx.channel();
kcpCh.conv(EchoClient.CONV); // set conv
//ctx.writeAndFlush(firstMessage);
ByteBuf heapBuffer = Unpooled.buffer(18);
heapBuffer.writeBytes("hello from client".getBytes());
ctx.writeAndFlush(heapBuffer);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {
ctx.close();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
System.out.println("client exceptionCaught invoked");
cause.printStackTrace();
ctx.close();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client channelInactive invoked");
super.channelInactive(ctx);
}
}
Server code:
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
private final ByteBuf firstMessage;
/**
* Creates a server-side handler.
* @throws InterruptedException
*/
public EchoServerHandler () throws InterruptedException {
firstMessage = Unpooled.buffer(EchoClient.SIZE);
for (int i = 0; i < firstMessage.capacity(); i++) {
firstMessage.writeByte((byte) i);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
UkcpChannel kcpCh = (UkcpChannel) ctx.channel();
kcpCh.conv(EchoServer.CONV);
}
int i = 0;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
ctx.close();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
System.out.println("server exceptionCaught invoked");
cause.printStackTrace();
ctx.close();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("server channelInactive invoked");
super.channelInactive(ctx);
}
}
I implemented a UDP server and added a LoggingHandler to inspect the KCP protocol header, using EchoClient as the client. The length doesn't look right:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 0a 00 00 00 51 00 20 00 79 75 be 5a 00 00 00 00 |....Q. .yu.Z....|
|00000010| 00 00 00 00 00 01 00 00 00 01 02 03 04 05 06 07 |................|
|00000020| 08 09 0a 0b 0c 0d 0e 0f 10 11 12 13 14 15 16 17 |................|
|00000030| 18 19 1a 1b 1c 1d 1e 1f 20 21 22 23 24 25 26 27 |........ !"#$%&'|
|00000040| 28 29 2a 2b 2c 2d 2e 2f 30 31 32 33 34 35 36 37 |()*+,-./01234567|
|00000050| 38 39 3a 3b 3c 3d 3e 3f 40 41 42 43 44 45 46 47 |89:;<=>?@ABCDEFG|
|00000060| 48 49 4a 4b 4c 4d 4e 4f 50 51 52 53 54 55 56 57 |HIJKLMNOPQRSTUVW|
|00000070| 58 59 5a 5b 5c 5d 5e 5f 60 61 62 63 64 65 66 67 |XYZ[\]^_`abcdefg|
|00000080| 68 69 6a 6b 6c 6d 6e 6f 70 71 72 73 74 75 76 77 |hijklmnopqrstuvw|
|00000090| 78 79 7a 7b 7c 7d 7e 7f 80 81 82 83 84 85 86 87 |xyz{|}~.........|
|000000a0| 88 89 8a 8b 8c 8d 8e 8f 90 91 92 93 94 95 96 97 |................|
|000000b0| 98 99 9a 9b 9c 9d 9e 9f a0 a1 a2 a3 a4 a5 a6 a7 |................|
|000000c0| a8 a9 aa ab ac ad ae af b0 b1 b2 b3 b4 b5 b6 b7 |................|
|000000d0| b8 b9 ba bb bc bd be bf c0 c1 c2 c3 c4 c5 c6 c7 |................|
|000000e0| c8 c9 ca cb cc cd ce cf d0 d1 d2 d3 d4 d5 d6 d7 |................|
|000000f0| d8 d9 da db dc dd de df e0 e1 e2 e3 e4 e5 e6 e7 |................|
|00000100| e8 e9 ea eb ec ed ee ef f0 f1 f2 f3 f4 f5 f6 f7 |................|
|00000110| f8 f9 fa fb fc fd fe ff |........ |
+--------+-------------------------------------------------+----------------+
Which example should I use as a reference for development?
Server disconnects the client with an exception.
pcapng file:
3.zip
Server: 172.17.0.8
Client: 172.17.0.7
When transferring files over HTTP with Netty, adjusting UKCP_SND_WND and UKCP_RCV_WND introduces a certain probability that the last few bytes of the file are never sent. Neither ctx.flush() nor channel.flush() helps.
A wireshark capture shows the final write apparently sitting in the KCP server's buffer, never sent out.
Server-side code:
UkcpServerBootstrap b = new UkcpServerBootstrap();
b.group(group)
.channel(UkcpServerChannel.class)
.childOption(UkcpChannelOption.UKCP_SND_WND, 1024)
.childOption(UkcpChannelOption.UKCP_RCV_WND, 1024)
.childHandler(new ChannelInitializer<UkcpChannel>() {
@Override
public void initChannel(UkcpChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(100* 65536));
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpStaticFileServerHandler());
}
});
ChannelOptionHelper.nodelay(b, true, 20, 2, true);
The server handler where the problem occurs is copied verbatim from the Netty example, unmodified:
https://github.com/netty/netty/blob/4.1/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
ChannelFuture sendFileFuture;
sendFileFuture =
ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)),
ctx.newProgressivePromise());
This exception does not occur consistently; it shows up occasionally during small-scale stress testing of the program. I can't tell what causes it, so please advise.
2021-09-16 12:03:09.093 [ROBOT_FIGHT_IO-2-14][ERROR] com.janlr.ag.td.base.net.FightClientHandler.exceptionCaught:70: IO exception
java.lang.NullPointerException: null
at io.jpower.kcp.netty.Kcp.peekSize(Kcp.java:471) ~[kcp-netty-1.4.11.jar:?]
at io.jpower.kcp.netty.Ukcp.peekSize(Ukcp.java:106) ~[kcp-netty-1.4.11.jar:?]
at io.jpower.kcp.netty.UkcpClientChannel.kcpPeekSize(UkcpClientChannel.java:255) ~[kcp-netty-1.4.11.jar:?]
at io.jpower.kcp.netty.UkcpClientUdpChannel$UkcpClientUdpUnsafe.read(UkcpClientUdpChannel.java:306) [kcp-netty-1.4.11.jar:?]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) [netty-transport-4.1.67.Final.jar:4.1.67.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) [netty-transport-4.1.67.Final.jar:4.1.67.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) [netty-transport-4.1.67.Final.jar:4.1.67.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [netty-transport-4.1.67.Final.jar:4.1.67.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) [netty-common-4.1.67.Final.jar:4.1.67.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.67.Final.jar:4.1.67.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.67.Final.jar:4.1.67.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_152]
Version used: implementation group: 'io.jpower.kcp', name: 'kcp-netty', version: '1.4.11'
Source class: Kcp.java
public int peekSize() {
if (rcvQueue.isEmpty()) {
return -1;
}
Segment seg = rcvQueue.peek();
if (seg.frg == 0) {
return seg.data.readableBytes();
}
if (rcvQueue.size() < seg.frg + 1) { // Some segments have not arrived yet
return -1;
}
int len = 0;
for (Iterator<Segment> itr = rcvQueueItr.rewind(); itr.hasNext(); ) {
Segment s = itr.next();
len += s.data.readableBytes();
if (s.frg == 0) {
break;
}
}
return len;
}
When testing with a large number of connections sending packets, many of these exceptions occur and cause connections to be dropped.
Why is the EventLoopGroup not split into a boss group and a worker group? With many concurrent connections the performance surely can't keep up, so why aren't they separated?
sun.misc.Hashing.stringHash32((String) k)
sun.misc.Hashing.randomHashSeed(this)
Neither of these can be found in JDK 1.8.
String altThreshold = java.security.AccessController.doPrivileged(
new sun.security.action.GetPropertyAction(
"jdk.map.althashing.threshold"));
And in JDK 10 the class sun.security.action.GetPropertyAction is gone as well. Could you fix this? Thanks!
The framework manages channels internally by remote address, so when a client's IP or port changes the server cannot detect it; the client can only discover the problem by keep-alive probing and reconnecting, which greatly increases latency in this situation. It happens frequently, for example whenever a 4G client switches base stations or changes wifi APs. Apart from modifying the framework to manage channels by conv, is there any other solution?
Hello!
I ran the KCP echo server and client from the examples. The server receives the client's messages fine (the client writes in channelActive when the channel becomes active), but when the server echoes the message back from channelRead with ctx.write, the client never receives it. Code below:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel active: " + ctx.toString());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String message="";
if (msg instanceof ByteBuf){
ByteBuf buf= (ByteBuf) msg;
byte [] byteArray=new byte[buf.capacity()];
buf.readBytes(byteArray);
message=new String(byteArray);
System.out.println("message from client: " + message);
}
ctx.writeAndFlush(msg);
}
Client code:
@Override
public void channelActive(ChannelHandlerContext ctx) {
UkcpChannel kcpCh = (UkcpChannel) ctx.channel();
kcpCh.conv(EchoClient.CONV); // set conv
// send a hello to the server once the connection is established
ByteBuf firstMessage=Unpooled.copiedBuffer("hello world".getBytes());
ctx.writeAndFlush(firstMessage);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof ByteBuf){
ByteBuf buf= (ByteBuf) msg;
byte[] byteArrays=new byte[buf.capacity()];
buf.readBytes(byteArrays);
System.out.println("message from server: " + new String(byteArrays));
}
ctx.writeAndFlush(msg);
}
The client never even enters channelRead.
java.io.IOException: No enough bytes of data
at io.jpower.kcp.netty.Ukcp.input(Ukcp.java:56)
at io.jpower.kcp.netty.UkcpServerChildChannel.kcpInput(UkcpServerChildChannel.java:192)
at io.jpower.kcp.netty.UkcpServerChannel$UkcpServerUnsafe.read(UkcpServerChannel.java:559)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:745)
io.jpower.kcp.netty.KcpException: State=-1 after update()
at io.jpower.kcp.netty.UkcpServerChannel.run(UkcpServerChannel.java:383)
at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:125)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:745)
In the example's KcpRttClient, changing
static final int SIZE = Integer.parseInt(System.getProperty("size", "500"));
from 200 to 500 makes these errors appear.
What do the two errors indicate?
Sending one very large packet immediately throws java.io.IOException: Too many fragments.
/**
* Sends a ByteBuf.
*
* @param buf
* @throws IOException
*/
public void send(ByteBuf buf) throws IOException {
int ret = kcp.send(buf);
switch (ret) {
case -2:
throw new IOException("Too many fragments");
default:
break;
}
}
if (count >= IKCP_WND_RCV) { // Maybe this condition isn't needed in stream mode
return -2;
}
The error comes from this check. Can the if condition simply be removed?
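Rather than removing the check, it may be safer to keep each send() below the limit it implies: the fragment count must stay strictly below the receive-window constant, so the largest single message is roughly (window - 1) fragments of MSS bytes each. A sketch of that arithmetic (the 24-byte overhead is the standard KCP header size; stream mode behaves differently, as the code comment above notes):

```java
/** Estimates the largest payload a single kcp.send() can accept without the
 *  "Too many fragments" error: the fragment count must stay strictly below
 *  the receive-window constant. MTU and window values are illustrative. */
public class MaxSendSize {
    static final int KCP_OVERHEAD = 24;

    public static int maxSingleSend(int mtu, int wndRcv) {
        int mss = mtu - KCP_OVERHEAD;
        return mss * (wndRcv - 1); // at most wndRcv - 1 fragments of mss bytes
    }
}
```

For example, with MTU 512 and the default IKCP_WND_RCV of 128, a single send is capped near 61 KB; anything larger should be split at the application layer.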
A problem that reproduces every time: ten-odd seconds after the KCP connection is established, this exception is thrown and the connection drops. Detailed log below. How should I handle this? Also, what did version 1.4.10 change relative to 1.4.9?
2021-04-28 18:25:46.532 12041-12676/com.vvv.testkcp W/System.err: io.jpower.kcp.netty.KcpException: State=-1 after update()
2021-04-28 18:25:46.533 12041-12676/com.vvv.testkcp W/System.err: at io.jpower.kcp.netty.UkcpClientChannel.run(UkcpClientChannel.java:307)
2021-04-28 18:25:46.533 12041-12676/com.vvv.testkcp W/System.err: at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
2021-04-28 18:25:46.533 12041-12676/com.vvv.testkcp W/System.err: at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
2021-04-28 18:25:46.533 12041-12676/com.vvv.testkcp W/System.err: at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
2021-04-28 18:25:46.533 12041-12676/com.vvv.testkcp W/System.err: at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
2021-04-28 18:25:46.534 12041-12676/com.vvv.testkcp W/System.err: at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
2021-04-28 18:25:46.534 12041-12676/com.vvv.testkcp W/System.err: at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
2021-04-28 18:25:46.534 12041-12676/com.vvv.testkcp W/System.err: at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
2021-04-28 18:25:46.534 12041-12676/com.vvv.testkcp W/System.err: at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
2021-04-28 18:25:46.534 12041-12676/com.vvv.testkcp W/System.err: at java.lang.Thread.run(Thread.java:923)