clojure-link / link
A clojure framework for nonblocking network programming
I'm trying to port link to nodejs backend, and it requires changes in:
Support http/2 protocol in link
Getting this leak warning while testing slacker.
2017-12-12 22:55:28,715 ERROR io.netty.util.ResourceLeakDetector: LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
WARNING: 36 leak records were discarded because the leak record count is limited to 4. Use system property io.netty.leakDetection.maxRecords to increase the limit.
111 leak records were not sampled because the leak record sample count is limited to 40. Use system property io.netty.leakDetection.maxSampledRecords to increase the limit.
Recent access records: 4
#4:
io.netty.buffer.AdvancedLeakAwareByteBuf.readByte(AdvancedLeakAwareByteBuf.java:396)
link.codec$byte$fn__3953.invoke(codec.clj:38)
clojure.core$partial$fn__4759.invoke(core.clj:2515)
link.codec$enum$fn__4031.invoke(codec.clj:141)
clojure.core$partial$fn__4759.invoke(core.clj:2515)
link.codec$frame$fn__4049.invoke(codec.clj:171)
clojure.core$partial$fn__4759.invoke(core.clj:2515)
link.codec$header$fn__4038.invoke(codec.clj:157)
clojure.core$partial$fn__4759.invoke(core.clj:2515)
link.codec$frame$fn__4049.invoke(codec.clj:171)
clojure.core$partial$fn__4759.invoke(core.clj:2515)
link.codec$header$fn__4038.invoke(codec.clj:157)
clojure.core$partial$fn__4759.invoke(core.clj:2515)
link.codec$decode_STAR_.invokeStatic(codec.clj:203)
link.codec$decode_STAR_.invoke(codec.clj:202)
link.codec$netty_decoder$fn__4080$fn__4081.invoke(codec.clj:220)
link.codec.proxy$io.netty.handler.codec.ByteToMessageDecoder$ff19274a.decode(Unknown Source)
io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
link.codec.proxy$io.netty.handler.codec.ByteToMessageDecoder$ff19274a.callDecode(Unknown Source)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
link.codec.proxy$io.netty.handler.codec.ByteToMessageDecoder$ff19274a.channelRead(Unknown Source)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:134)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
java.lang.Thread.run(Thread.java:748)
#3:
io.netty.buffer.AdvancedLeakAwareByteBuf.readByte(AdvancedLeakAwareByteBuf.java:396)
link.codec$byte$fn__3953.invoke(codec.clj:38)
clojure.core$partial$fn__4759.invoke(core.clj:2515)
link.codec$enum$fn__4031.invoke(codec.clj:141)
clojure.core$partial$fn__4759.invoke(core.clj:2515)
link.codec$header$fn__4038.invoke(codec.clj:155)
clojure.core$partial$fn__4759.invoke(core.clj:2515)
link.codec$frame$fn__4049.invoke(codec.clj:171)
clojure.core$partial$fn__4759.invoke(core.clj:2515)
link.codec$header$fn__4038.invoke(codec.clj:157)
clojure.core$partial$fn__4759.invoke(core.clj:2515)
link.codec$decode_STAR_.invokeStatic(codec.clj:203)
link.codec$decode_STAR_.invoke(codec.clj:202)
link.codec$netty_decoder$fn__4080$fn__4081.invoke(codec.clj:220)
link.codec.proxy$io.netty.handler.codec.ByteToMessageDecoder$ff19274a.decode(Unknown Source)
io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
link.codec.proxy$io.netty.handler.codec.ByteToMessageDecoder$ff19274a.callDecode(Unknown Source)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
link.codec.proxy$io.netty.handler.codec.ByteToMessageDecoder$ff19274a.channelRead(Unknown Source)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:134)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
java.lang.Thread.run(Thread.java:748)
#2:
io.netty.buffer.AdvancedLeakAwareByteBuf.readInt(AdvancedLeakAwareByteBuf.java:432)
link.codec$int32$fn__3978.invoke(codec.clj:43)
clojure.core$partial$fn__4759.invoke(core.clj:2515)
link.codec$frame$fn__4049.invoke(codec.clj:171)
clojure.core$partial$fn__4759.invoke(core.clj:2515)
link.codec$header$fn__4038.invoke(codec.clj:157)
clojure.core$partial$fn__4759.invoke(core.clj:2515)
link.codec$decode_STAR_.invokeStatic(codec.clj:203)
link.codec$decode_STAR_.invoke(codec.clj:202)
link.codec$netty_decoder$fn__4080$fn__4081.invoke(codec.clj:220)
link.codec.proxy$io.netty.handler.codec.ByteToMessageDecoder$ff19274a.decode(Unknown Source)
io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
link.codec.proxy$io.netty.handler.codec.ByteToMessageDecoder$ff19274a.callDecode(Unknown Source)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
link.codec.proxy$io.netty.handler.codec.ByteToMessageDecoder$ff19274a.channelRead(Unknown Source)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:134)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
java.lang.Thread.run(Thread.java:748)
#1:
io.netty.buffer.AdvancedLeakAwareByteBuf.readByte(AdvancedLeakAwareByteBuf.java:396)
link.codec$byte$fn__3953.invoke(codec.clj:38)
clojure.core$partial$fn__4759.invoke(core.clj:2515)
link.codec$enum$fn__4031.invoke(codec.clj:141)
clojure.core$partial$fn__4759.invoke(core.clj:2515)
link.codec$header$fn__4038.invoke(codec.clj:155)
clojure.core$partial$fn__4759.invoke(core.clj:2515)
link.codec$decode_STAR_.invokeStatic(codec.clj:203)
link.codec$decode_STAR_.invoke(codec.clj:202)
link.codec$netty_decoder$fn__4080$fn__4081.invoke(codec.clj:220)
link.codec.proxy$io.netty.handler.codec.ByteToMessageDecoder$ff19274a.decode(Unknown Source)
io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
link.codec.proxy$io.netty.handler.codec.ByteToMessageDecoder$ff19274a.callDecode(Unknown Source)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
link.codec.proxy$io.netty.handler.codec.ByteToMessageDecoder$ff19274a.channelRead(Unknown Source)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:134)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
java.lang.Thread.run(Thread.java:748)
Created at:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:331)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:181)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:172)
io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:109)
io.netty.handler.codec.ByteToMessageDecoder.expandCumulation(ByteToMessageDecoder.java:516)
io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:88)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:263)
link.codec.proxy$io.netty.handler.codec.ByteToMessageDecoder$ff19274a.channelRead(Unknown Source)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:134)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
java.lang.Thread.run(Thread.java:748)
Add async handler support for link.http
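In my tcp-server use case, my clients automatically and rapidly reconnect to the server. When I need to restart tcp-server, it shuts down the worker group first and only then the boss group. Because my clients reconnect so quickly, new connections keep arriving and the worker group never finishes shutting down.
I suggest changing the shutdown order of tcp-server.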
Some parts of the docs are outdated. For instance the decoder/encoder examples don't work as encode* now accepts 3 arguments. send in the example should be send!. There is also no such thing as link.core/channel-option anymore.
The readme also doesn't contain a complete working example. I cannot figure out how to make it work. For instance the following doesn't work, as the send! in the last line seems to have no effect:
(def custom-codec
  (codec/frame
    (codec/byte)
    (codec/string :encoding :utf8 :prefix (codec/uint16))))

(def echo-handler
  (link/create-handler
    (link/on-message [ch msg]
      (link/send! ch msg))))

(def print-handler
  (link/create-handler
    (link/on-message [ch msg]
      (println "msg:" msg))))

(def srv (tcp/tcp-server 8081 [(codec/netty-encoder custom-codec)
                               (codec/netty-decoder custom-codec)
                               {:handler echo-handler}]
                         :options {:so-reuseaddr true}))

(def client-factory
  (tcp/tcp-client-factory [(codec/netty-encoder custom-codec)
                           (codec/netty-decoder custom-codec)
                           {:handler print-handler}]))

(def cl (tcp/tcp-client client-factory "localhost" 8081))

(link/send! cl [1 "blabla"]) ;; <- nothing happens after this
We should add the ability to pass the event on, as currently a user-defined handler is only allowed as the last handler in the pipeline.
Shouldn't this be if-let instead of when-let? Looks like .fireChannelActive and .fireChannelInactive are fired twice if the handler returns false.
In the websocket section, the code provided doesn't work as expected. I corrected two bugs to get a websocket echo server running successfully:
(def ws-echo-handler
  (create-websocket-handler ;; 1. maybe a typo in the original demo
    (on-open [ch])
    (on-close [ch])
    (on-text [ch string]
      ;; you can use (text), (binary), (ping), (pong) to generate
      ;; different types of response
      (prn (str "get:" string))
      ;; 2. Unfortunately, we can't use (text) on the string directly
      (send! ch (text (Unpooled/wrappedBuffer (.getBytes string)))))
    (on-binary [ch ^ByteBuf bytes])
    (on-ping [ch ^ByteBuf bytes])
    (on-pong [ch ^ByteBuf bytes])))
Allow the user to provide a to-retry? function as the reconnect policy.
The to-retry? function takes information about the current state of the retry loop, such as time elapsed and retry count. The user could also call Thread.sleep inside this function to wait between attempts.
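A minimal sketch of what such a policy could look like. The state keys :retry-count and :elapsed-ms, the option names, and the helpers make-to-retry? and connect-with-retry are all hypothetical; none of them are part of link's API.

```clojure
(defn make-to-retry?
  "Builds a hypothetical reconnect policy: keep retrying while both limits
  hold, sleeping backoff-ms before allowing the next attempt."
  [{:keys [max-retries max-elapsed-ms backoff-ms]
    :or   {backoff-ms 0}}]
  (fn [{:keys [retry-count elapsed-ms]}]
    (let [retry? (and (< retry-count max-retries)
                      (< elapsed-ms max-elapsed-ms))]
      (when (and retry? (pos? backoff-ms))
        (Thread/sleep (long backoff-ms)))
      retry?)))

(defn connect-with-retry
  "Calls connect-fn (assumed to return a channel, or nil/throw on failure)
  until it succeeds or to-retry? returns false. Returns the channel or nil."
  [connect-fn to-retry?]
  (let [start (System/currentTimeMillis)]
    (loop [retry-count 0]
      (or (try (connect-fn) (catch Exception _ nil))
          (when (to-retry? {:retry-count retry-count
                            :elapsed-ms  (- (System/currentTimeMillis) start)})
            (recur (inc retry-count)))))))
```

Keeping the sleep inside the policy function, as the proposal suggests, means the retry loop itself needs no knowledge of backoff strategy.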
Add support for SSL connections.
Currently ServerBootstrap is not exposed. I think that, similarly to the client channel, users should be able to access the underlying server. Would it be possible to return it as part of that vector or, better, return a map like {:boss-group .., :worker-group ..., :bootstrap ...}?
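A summary of things to watch out for when developing with TCP and UDP.
TCP and UDP are sibling protocols that both sit on top of IP. They are practically the only two communication methods a server supports, and TCP is by far the more common.
TCP is a stream protocol: it splits and reassembles data as it sees fit. In practice this shows up as the half-packet and sticky-packet problems (one message arriving in pieces, or several messages arriving glued together). So when you send packet-style data over TCP, you must include a mechanism in the data that marks where each packet ends; otherwise, once packets are split or merged, they can no longer be processed.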
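One common way to mark packet boundaries is a length prefix. The following is a minimal sketch, assuming a 2-byte big-endian length in front of each payload and bytes represented as a vector of ints from 0 to 255; the names split-frames and feed are hypothetical and not taken from link.

```clojure
(defn split-frames
  "Splits accumulated bytes into complete frames. Each frame is a 2-byte
  big-endian length followed by that many payload bytes.
  Returns [frames remaining], where remaining holds any half packet."
  [buf]
  (loop [buf (vec buf) frames []]
    (if (< (count buf) 2)
      [frames buf]                       ; length prefix itself is incomplete
      (let [len (+ (* 256 (nth buf 0)) (nth buf 1))
            end (+ 2 len)]
        (if (< (count buf) end)
          [frames buf]                   ; half packet: wait for more bytes
          (recur (subvec buf end)        ; sticky packets: keep extracting
                 (conj frames (subvec buf 2 end))))))))

(defn feed
  "Appends a newly read chunk to the pending bytes and extracts every
  complete frame. state is a map with a :pending vector."
  [state chunk]
  (let [[frames remaining] (split-frames (into (:pending state) chunk))]
    {:pending remaining :frames frames}))
```

For example, feeding [0 2 104] yields no frames and leaves three bytes pending; feeding [105 0 3 97 98 99] next yields the two payloads [104 105] and [97 98 99].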
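UDP, by contrast, is a datagram protocol. When the data is larger than a single packet can carry, TCP splits it into several segments, whereas UDP does not split it at the application level: whatever does not fit into the datagram (or into the receiver's buffer) is simply dropped. UDP therefore has no half-packet or sticky-packet problem.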
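TCP has a sliding-window mechanism: if you accept a TCP socket and never read from it, the peer's writes will block after it has sent a certain amount of data.
UDP has no such mechanism. If you process data more slowly than the peer sends it, new datagrams are simply dropped once the receive buffer is full. So with UDP you must always configure the buffer size.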
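As a small illustration of configuring the buffer, the sketch below uses the JDK's java.net.DatagramSocket directly rather than link. The function name open-udp-socket is hypothetical, and the operating system is free to grant a different size than the one requested.

```clojure
(import '(java.net DatagramSocket))

(defn open-udp-socket
  "Opens a UDP socket on port (0 picks a free port) and asks the OS for a
  receive buffer of rcvbuf-bytes. The size actually granted may differ,
  so it is returned alongside the socket."
  [port rcvbuf-bytes]
  (let [sock (doto (DatagramSocket. (int port))
               (.setReceiveBufferSize (int rcvbuf-bytes)))]
    {:socket sock
     :rcvbuf (.getReceiveBufferSize sock)}))
```

Checking the returned :rcvbuf value is worthwhile, since a request larger than the system limit is typically capped rather than rejected.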
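TCP maintains a channel, while UDP does not. UDP is connectionless, and this shows up in many places:
1. With TCP, if the peer closes its socket, the other end is notified. With UDP, sending data has nothing to do with whether the other side receives it. Even if you send UDP data to a machine that is switched off, in most cases nothing stops you; everything looks fine.
2. In code, TCP has accept, which returns a new socket used for communication. With UDP, after listening you read and write directly on that same socket.
So with UDP we have to maintain the mapping between sockets and business entities ourselves. Each received UDP packet carries an IP and a port, and we can send data back to that (ip, port) pair.
However, this pair must not be used as a long-term mapping. Now that IPv4 addresses are almost exhausted, address-sharing techniques are used everywhere, and you cannot know whether that IP will still belong to the same peer a moment later.
So we need a timeout, chosen from experience. Once it has elapsed, the entity's address is treated as unknown. If data must be sent to the entity during that period, you can either report an error right away,
or buffer the data and, the next time the entity sends something, record its new IP and port and deliver the buffered data while that address is still fresh. (If the machines are directly connected on an internal network, none of this is necessary.)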
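The address-with-timeout bookkeeping described above can be sketched as pure functions over a map. All names here (touch, lookup, route, the :peers and :queued keys, and the ttl-ms parameter) are hypothetical, and the actual sending is left to the caller.

```clojure
(defn touch
  "Records that entity id was just seen at addr (e.g. [ip port]) at now-ms."
  [peers id addr now-ms]
  (assoc peers id {:addr addr :last-seen now-ms}))

(defn lookup
  "Returns the address for id, or nil if it was not seen within ttl-ms."
  [peers id now-ms ttl-ms]
  (when-let [{:keys [addr last-seen]} (get peers id)]
    (when (<= (- now-ms last-seen) ttl-ms)
      addr)))

(defn route
  "Decides what to do with an outgoing message. Returns [new-state action]:
  [:send addr msg] when the address is still fresh, otherwise [:queued]
  after buffering msg until the entity is heard from again."
  [state id msg now-ms ttl-ms]
  (if-let [addr (lookup (:peers state) id now-ms ttl-ms)]
    [state [:send addr msg]]
    [(update-in state [:queued id] (fnil conj []) msg) [:queued]]))
```

When a new packet arrives from the entity, the caller would call touch with the fresh address and then flush whatever is stored under :queued for that id.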