cloudwego / netpoll

A high-performance non-blocking I/O networking framework focusing on RPC scenarios.

License: Apache License 2.0


netpoll's Introduction

CloudWeGo-Netpoll



Introduction

Netpoll is a high-performance non-blocking I/O networking framework developed by ByteDance, focused on RPC scenarios.

RPC is usually heavy on processing logic and therefore cannot handle I/O serially. But Go's standard library net is designed around blocking I/O APIs, so an RPC framework built on it can only follow the One Conn One Goroutine design, which wastes a lot of cost on context switching due to the large number of goroutines under high concurrency. Besides, net.Conn has no API to check whether a connection is alive, so it is difficult to build an efficient connection pool for an RPC framework: the pool may accumulate a large number of failed connections.

On the other hand, the open source community currently lacks Go network libraries focused on RPC scenarios. Similar projects such as evio and gnet all target scenarios like Redis and HAProxy.

Netpoll was born to solve these problems. It draws inspiration from the designs of evio and netty, has excellent performance, and is better suited to microservice architecture. Netpoll also provides a number of features, and it is recommended as a replacement for net in some RPC scenarios.

We developed the RPC framework Kitex and HTTP framework Hertz based on Netpoll, both with industry-leading performance.

The Examples show how to build an RPC client and server using Netpoll.

For more information, please refer to Document.

Features

  • Already

    • LinkBuffer provides nocopy API for streaming reading and writing
    • gopool provides high-performance goroutine pool
    • mcache provides efficient memory reuse
    • IsActive supports checking whether the connection is alive
    • Dialer supports building clients
    • EventLoop supports building a server
    • TCP, Unix Domain Socket
    • Linux, macOS (operating system)
  • Future

  • Unsupported

    • Windows (operating system)

Performance

A benchmark should reflect industrial use; in RPC scenarios, support for concurrency and timeouts is mandatory.

We provide the netpoll-benchmark project to track and compare the performance of Netpoll and other frameworks under different conditions for reference.

For more benchmarks, see kitex-benchmark and hertz-benchmark.

Reference

netpoll's People

Contributors

abioy, abner-chenc, alice-yyds, alpha-baby, ashleet, binaryoracle, cuishuang, davidzkeng, duslia, guangmingluo, hchenn, ixzk, jayantxie, jbossbc, jianxinhou, joway, l2nce, liu-song, ning2510, ppzqh, purewhitewu, rural-panda, shalk, sinnera, wuqinqiang, yangruiemma


netpoll's Issues

Problem running the basic server demo

(screenshot omitted)
As shown above: after I listen on port 12345 and then try to send a request to that port to trace how netpoll handles the I/O event flow, the listener never starts and no error is reported. It did briefly start successfully once before.
Here is the code:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/cloudwego/netpoll"
)

func main() {
	l, err := netpoll.CreateListener("tcp", ":12345")
	if err != nil {
		panic("listen error")
	}
	eventLoop, err := netpoll.NewEventLoop(
		// the handler invoked when an I/O event arrives
		func(ctx context.Context, connection netpoll.Connection) error {
			fmt.Printf("request received")
			return nil
		},
		netpoll.WithReadTimeout(time.Second))
	if err != nil {
		panic("create event loop error")
	}
	fmt.Printf("about to start listening")
	eventLoop.Serve(l)
}

Here is my machine configuration:

(screenshot omitted)

Got a Panic

I removed sensitive information, for example the names of functions.

Based on the log, atum.go line 187 is:

func (l *Server) handler(ctx context.Context, conn netpoll.Connection) error {
  defer conn.Writer().Flush()
  defer conn.Reader().Release()
  data, err := conn.Reader().ReadBinary(conn.Reader().Len())
  conn.Reader().Release()
  if err != nil {
	  zap.L().Error("Unable to read data", zap.ByteString("Data", data), zap.Error(err))
	  return conn.Close()
  }
  // ... business logic handles the data
  // ... business logic loop
  return nil  <------ Line 187
}
go version
go version go1.17.3 linux/amd64
Nov  8 22:46:49 [2860435]: 2021/11/08 22:46:49.947222 logger.go:190: [Error] GOPOOL: panic in pool: gopool.DefaultPool: runtime error: invalid memory address or nil pointer dereference: goroutine 1478890 [running]:
Nov  8 22:46:49 [2860435]: runtime/debug.Stack()
Nov  8 22:46:49 [2860435]: #011/snap/go/8627/src/runtime/debug/stack.go:24 +0x65
Nov  8 22:46:49 [2860435]: github.com/bytedance/gopkg/util/gopool.(*worker).run.func1.1.1()
Nov  8 22:46:49 [2860435]: #011/root/go/pkg/mod/github.com/bytedance/[email protected]/util/gopool/worker.go:61 +0x5d
Nov  8 22:46:49 [2860435]: panic({0xa34520, 0x10bc020})
Nov  8 22:46:49 [2860435]: #011/snap/go/8627/src/runtime/panic.go:1038 +0x215
Nov  8 22:46:49 [2860435]: github.com/cloudwego/netpoll.(*linkBufferNode).Len(...)
Nov  8 22:46:49 [2860435]: #011/root/go/pkg/mod/github.com/cloudwego/[email protected]/nocopy_linkbuffer.go:631
Nov  8 22:46:49 [2860435]: github.com/cloudwego/netpoll.(*LinkBuffer).Release(0xdf)
Nov  8 22:46:49 [2860435]: #011/root/go/pkg/mod/github.com/cloudwego/[email protected]/nocopy_linkbuffer.go:171 +0x26
Nov  8 22:46:49 [2860435]: github.com/cloudwego/netpoll.(*connection).Release(0xc01a89c8c0)
Nov  8 22:46:49 [2860435]: #011/root/go/pkg/mod/github.com/cloudwego/[email protected]/connection_impl.go:109 +0x25
Nov  8 22:46:49 [2860435]: myfunction().handler(0xc0009806c0, {0x3, 0xc06577a820}, {0xd06648, 0xc00ef55500})

Nov  8 22:46:49 [2860435]: #011/root/goprogram/atum.go:187 +0x695
Nov  8 22:46:49 [2860435]: github.com/cloudwego/netpoll.(*connection).onRequest.func1()
Nov  8 22:46:49 [2860435]: #011/root/go/pkg/mod/github.com/cloudwego/[email protected]/connection_onevent.go:112 +0xe5
Nov  8 22:46:49 [2860435]: github.com/bytedance/gopkg/util/gopool.(*worker).run.func1.1(0xc01cf5ffd0, 0x98adae)
Nov  8 22:46:49 [2860435]: #011/root/go/pkg/mod/github.com/bytedance/[email protected]/util/gopool/worker.go:68 +0x66
Nov  8 22:46:49 [2860435]: github.com/bytedance/gopkg/util/gopool.(*worker).run.func1()
Nov  8 22:46:49 [2860435]: #011/root/go/pkg/mod/github.com/bytedance/[email protected]/util/gopool/worker.go:69 +0xe5
Nov  8 22:46:49 [2860435]: created by github.com/bytedance/gopkg/util/gopool.(*worker).run
Nov  8 22:46:49 [2860435]: #011/root/go/pkg/mod/github.com/bytedance/[email protected]/util/gopool/worker.go:41 +0x5b

TLS

Is TLS available with netpoll?

How to read up to \n

Is there any way to read a packet up to \n, similar to conn.Reader().Next()?

For example, using net package, I use

bufio.NewReaderSize(conn, 1024).ReadLine()

for

{"id":2,"method":"foobar","params":[]}\n {"id":3,"method":"foobar","params":["foobar.foo",""]}\n

Can the onRequest method in connection_reactor.go be optimized the way Netty does?

As shown in the screenshot, a loop keeps checking whether the current connection has been fully processed. If the connection receives very large data, say 2 GB, the loop may have to finish reading all 2 GB before the next I/O event is handled, blocking other requests.
Netty, by contrast, loops only up to a fixed number of iterations (16 by default); if data remains unprocessed after that, it sets a flag and moves on to the next I/O event, leaving the remaining data for the next polling cycle.

(screenshot omitted)

It seems it doesn't work on macOS

I tried to write the simple test below on macOS, but it doesn't work. I telnet 127.0.0.1 8080 and type hello world, but I don't get the response.

package main

import (
	"context"
	"flag"
	"fmt"
	"time"

	"github.com/cloudwego/netpoll"
)

var (
	addr = flag.String("addr", "127.0.0.1:8080", "server address")
)

func onRequest(_ context.Context, conn netpoll.Connection) error {
	println("hello world")

	var reader, writer = conn.Reader(), conn.Writer()

	defer reader.Release()
	// client send "Hello World", size is 11
	buf, err := reader.Next(11)
	if err != nil {
		fmt.Printf("error %v\n", err)
		return err
	}

	alloc, _ := writer.Malloc(len(buf))
	copy(alloc, buf)
	err = writer.Flush()
	if err != nil {
		fmt.Printf("flush error %v\n", err)
		return err
	}

	return nil
}

func onPrepare(conn netpoll.Connection) context.Context {
	println("hello prepare")
	return context.TODO()
}

func main() {
	flag.Parse()
	l, err := netpoll.CreateListener("tcp", *addr)
	if err != nil {
		panic("create netpoll listener failed")
	}
	defer l.Close()

	println("hello event loop")

	loop, err1 := netpoll.NewEventLoop(
		onRequest,
		netpoll.WithReadTimeout(time.Second),
		netpoll.WithOnPrepare(onPrepare),
	)
	if err1 != nil {
		panic("create event loop failed")
	}

	println("begin to serve")
	loop.Serve(l)
}

Echo test

I wanted to run an echo test, but your complete example code can no longer be found, so I wrote an echo server myself based on the documentation. Please check whether it is written correctly:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/cloudwego/netpoll"
)

func main() {
	network, address := "tcp", "127.0.0.1:8888"

	// create the listener
	listener, err := netpoll.CreateListener(network, address)
	if err != nil {
		panic("create netpoll listener fail")
	}

	// handler: reads data from the connection and processes it
	var onRequest netpoll.OnRequest = handler

	// options: custom configuration for EventLoop initialization
	var opts = []netpoll.Option{
		netpoll.WithReadTimeout(1 * time.Second),
		netpoll.WithIdleTimeout(10 * time.Minute),
		netpoll.WithOnPrepare(nil),
	}

	// create the EventLoop
	eventLoop, err := netpoll.NewEventLoop(onRequest, opts...)
	if err != nil {
		panic("create netpoll event-loop fail")
	}

	// run the server
	err = eventLoop.Serve(listener)
	if err != nil {
		panic("netpoll server exit")
	}
}

// read-event handler
func handler(ctx context.Context, connection netpoll.Connection) error {
	reader := connection.Reader()
	buf, err := reader.Peek(reader.Len())
	if err != nil {
		return err
	}

	// n, err := connection.Write(append([]byte{}, buf...))
	// or
	n, err := connection.Write(buf)

	if err != nil {
		return err
	}
	if n != len(buf) {
		return fmt.Errorf("write failed: %v < %v", n, len(buf))
	}

	return nil
	// return connection.Writer().Flush()
}

About the benchmark code, more parameters, and more metrics

The current link to the benchmark code is dead; please provide the complete test code as soon as possible.

The benchmark charts in the current documentation only cover 100 connections (payload unspecified) and a 1 KB payload (connection count unspecified). That is too little data and only covers a narrow range of situations.

Please provide comparisons against other network libraries and the standard library at different concurrency levels, e.g. 1k, 10k, 100k, and 1000k connections, with various payloads.

Also, when I stress-tested an echo server written from the documentation, netpoll's memory usage was very high, and memory usage is an important metric in massive-concurrency scenarios. So please also include memory usage and other metrics in the comparison.

Performance and memory usage issues

The echo server code used for the stress test is here: server

The stress client uses 100 connections with a 64-byte payload; the code is here: client

Compared with the other libraries: all of them append into a new buffer for writing. The other frameworks could write the read buffer back directly, but they also write back via a newly appended buffer, so this should not affect the cross-framework comparison.

I am not sure whether this echo server code is correct yet. Testing with it at 100 connections and 1 KB payload, throughput is only about half of gnet's, and even a bit slower than easygo.

If convenient, please share your benchmark code so I can rerun the test.

SOCKS5 protocol support

Currently only TCP seems to be implemented; there is no dialer support for the SOCKS5 protocol yet. Is there a plan to implement it?

Need Optimization in CPU Intensive Scenarios

Is your feature request related to a problem? Please describe.

In CPU intensive scenarios, Netpoll performs a little worse than go native net:

CPU usage of RPC servers (charts omitted): Netpoll vs. Go native net.

The netpoll-benchmark sources were used for benchmarking, with a commit that makes the workload CPU-intensive.

Describe the solution you'd like

Maybe we need some optimizations for NetPoll when it is running in CPU-intensive scenarios?

Describe alternatives you've considered

No idea.

QPS chart is misleading

https://github.com/cloudwego/netpoll#performance

"Concurrent Performance (Echo 1KB): QPS" is misleading. It starts at Y=210,000, so the gap looks much larger than it is; the library is only about 30,000/250,000 ≈ 12% faster.

Please use same Y offset 0 as rest of the graphs.

I do admit that you say it is 110% of the net package's QPS. Thanks for being accurate there!

Thanks for open sourcing this too!

Some questions about the Slice method of the Reader interface

type Reader interface {
	// ...
	// Slice returns a new Reader containing the next n bytes from this reader,
	// the operation is zero-copy, similar to b = p[:n].
	Slice(n int) (r Reader, err error)
	// ...
}

The Reader documentation only says the behavior is similar to b = p[:n],
but LinkBuffer.Slice also performs a Release on the LinkBuffer, which semantically differs from b = p[:n]: it additionally behaves like p = p[n:].

// Slice returns a new LinkBuffer, which is a zero-copy slice of this LinkBuffer,
// and only holds the ability of Reader.
//
// Slice will automatically execute a Release.
func (b *LinkBuffer) Slice(n int) (r Reader, err error) {
	//...
	return p, b.Release()
}

Is this behavior expected of all Readers? If so, wouldn't it be better to mention it in the Reader interface documentation?

InputAck: what happens when receiving outpaces consuming?

Receiving data: (screenshot omitted)
Consuming data: (screenshot omitted)
When data is received much faster than it is consumed, can the triggerRead() call block? And if it blocks, would that affect the other connections on the same poll?

LinkBuffer's readBinary method

The comment says readBinary cannot use mcache because the memory it allocates cannot be recycled. I don't quite understand this: how does this method differ from the other read methods (e.g. Next) in terms of memory reclamation? Thanks.
nocopy_linkbuffer.go#206

Why does waitRead set waitReadSize to n?

// waitRead will wait full n bytes.
func (c *connection) waitRead(n int) (err error) {
	if n <= c.inputBuffer.Len() {
		return nil
	}
	atomic.StoreInt32(&c.waitReadSize, int32(n)) // why not int32(n-c.inputBuffer.Len())?
	defer atomic.StoreInt32(&c.waitReadSize, 0)
	if c.readTimeout > 0 {
		return c.waitReadWithTimeout(n)
	}
	// wait full n
	for c.inputBuffer.Len() < n {
		if c.IsActive() {
			<-c.readTrigger
			continue
		}
		// confirm that fd is still valid.
		if atomic.LoadUint32(&c.netFD.closed) == 0 {
			return c.fill(n)
		}
		return Exception(ErrConnClosed, "wait read")
	}
	return nil
}

// inputAck implements FDOperator.
func (c *connection) inputAck(n int) (err error) {
	if n < 0 {
		n = 0
	}
	// Auto size bookSize.
	if n == c.bookSize && c.bookSize < mallocMax {
		c.bookSize <<= 1
	}

	length, _ := c.inputBuffer.bookAck(n)
	if c.maxSize < length {
		c.maxSize = length
	}
	if c.maxSize > mallocMax {
		c.maxSize = mallocMax
	}

	var needTrigger = true
	if length == n { // first start onRequest
		needTrigger = c.onRequest()
	}
	if needTrigger && length >= int(atomic.LoadInt32(&c.waitReadSize)) {
		c.triggerRead()
	}
	return nil
}

atomic.StoreInt32(&c.waitReadSize, int32(n)) // why not int32(n-c.inputBuffer.Len())?
If a large packet is needed, or only a few bytes are delayed in arriving, wouldn't this block?

Can the nocopy LinkBuffer support concurrent Write()?

When sending, Write() is split into two steps: Malloc and Flush. If two goroutines write concurrently, there seems to be a problem: there is no guarantee that both goroutines have finished writing their data by the time Flush is called.

What does the "No Copy" in No Copy Buffer refer to?

// readBinary cannot use mcache, because the memory allocated by readBinary will not be recycled.
func (b *LinkBuffer) readBinary(n int) (p []byte) {
	b.recalLen(-n) // re-cal length

	// single node
	p = make([]byte, n)
	if b.isSingleNode(n) {
		copy(p, b.read.Next(n))
		return p
	}
	// multiple nodes
	var pIdx int
	var l int
	for ack := n; ack > 0; ack = ack - l {
		l = b.read.Len()
		if l >= ack {
			pIdx += copy(p[pIdx:], b.read.Next(ack))
			break
		} else if l > 0 {
			pIdx += copy(p[pIdx:], b.read.Next(l))
		}
		b.read = b.read.next
	}
	_ = pIdx
	return p
}

The Reader interface methods all seem to return copies of the underlying array. Does "No Copy" refer to no-copy growing and shrinking?

Circular dependency problem

connection.go and netpoll.go cannot resolve their imports in my IDE. My guess is that these two files have a circular import; can you confirm whether that is the cause?

Reactor scheduling question

MainReactor and SubReactor both run as goroutines, so there is no scheduling priority between them. Have you tried pinning the goroutine to a thread with runtime.LockOSThread(), called inside pollWait(), similar to the mechanism in the figure below?
(screenshot omitted)

Why does SetIdleTimeout call SetKeepAlive?

The implementation of SetIdleTimeout in the Connection interface calls SetKeepAlive. My understanding is that a connection should be closed once it has been idle longer than the idle timeout, whereas KeepAlive only sends periodic probes to keep the connection alive. How can that close the connection after the idle timeout?

Timeout issue

I am trying the below code but when testing with telnet connection, the connection stays online forever even if no data is sent to the server via telnet

var opts = []netpoll.Option{
	netpoll.WithReadTimeout(10 * time.Second),
	netpoll.WithIdleTimeout(2 * time.Minute),
	netpoll.WithOnPrepare(onPrepare),
}

When I run the example code, netpoll reports the identifiers below as undefined. I upgraded my Go version but still see the problem. What should I do? Thanks.

....\go1.17.1\pkg\mod\github.com\cloudwego\[email protected]\connection.go:59:18: undefined: OnRequest
....\go1.17.1\pkg\mod\github.com\cloudwego\[email protected]\connection_impl.go:30:2: undefined: netFD
....\go1.17.1\pkg\mod\github.com\cloudwego\[email protected]\connection_impl.go:40:19: undefined: barrier
....\go1.17.1\pkg\mod\github.com\cloudwego\[email protected]\connection_impl.go:41:19: undefined: barrier
....\go1.17.1\pkg\mod\github.com\cloudwego\[email protected]\connection_impl.go:252:32: undefined: Conn
....\go1.17.1\pkg\mod\github.com\cloudwego\[email protected]\connection_impl.go:252:46: undefined: OnPrepare
....\go1.17.1\pkg\mod\github.com\cloudwego\[email protected]\connection_impl.go:272:38: undefined: Conn
....\go1.17.1\pkg\mod\github.com\cloudwego\[email protected]\connection_onevent.go:48:39: undefined: OnRequest
....\go1.17.1\pkg\mod\github.com\cloudwego\[email protected]\connection_onevent.go:71:40: undefined: OnPrepare
....\go1.17.1\pkg\mod\github.com\cloudwego\[email protected]\net_sock.go:41:116: undefined: netFD
....\go1.17.1\pkg\mod\github.com\cloudwego\[email protected]\net_sock.go:41:116: too many errors

graceful shutdown failed

Inside the EventLoop's OnRequest, if the connection is actively closed, the connection deadlocks (stuck in processing) and can never be shut down.

OnRequest

func (s *server) onHandle(_ context.Context, conn netpoll.Connection) error {
	time.Sleep(time.Second * 3)
	conn.Close()
	return nil
}

connection_onevent.go:115 (screenshots omitted)

Your internal version modified the kernel; how is the open-source version supposed to be used?

https://www.infoq.cn/article/fea7chf9moohbxbtyres

"So the ByteDance framework team and kernel team cooperated, with the kernel team providing a synchronous interface: when sendmsg is called, the kernel listens for and intercepts the callback it would originally have delivered to the application, and only lets sendmsg return after the callback completes. This let us adopt ZeroCopy send conveniently without changing the original model. The kernel team also implemented ZeroCopy over unix domain sockets, so communication between the business process and the Mesh sidecar is zero-copy as well."

Surely you don't expect everyone to run your internally maintained kernel?

Is it necessary to maintain a workerPool?

If the goal is only to cap the number of goroutines, wouldn't it suffice to keep a logical workerCnt and spawn closures with go func() directly? That would also save the overhead of a workerPool Get/Put per task.

Memory Issue when testing splice

Testing Splice with over 1000 concurrent connections shows high memory usage. Even after all 1000 clients are closed, memory usage stays high. Here is a pprof heap profile covering over 300 seconds.
(pprof screenshot omitted)

Connection setup is very slow under stress; could the listener fd and accepted fds be managed by the same epoll fd?

Symptom: a stress client opens 10,000 connections, each sending data immediately after connecting. It takes a very long time for all 10,000 connections to be established, or they never all succeed, and netstat shows many connections stuck in SYN_SENT.

The netpoll code has quite a few wrapper layers and I haven't reviewed much of it yet.

A blind guess: could the listener fd and the client fds end up in the same epoll loop? If so, when the client fds are busy, the listener's accept responsiveness degrades. My test client starts sending data as soon as each connection is established, so if the listener shares an epoll loop with the accepted client fds, the symptom would be explained.

If that is the problem, it would be better for the listener to use a dedicated epoll loop. accept is not a high-frequency operation, so several listeners sharing one epoll loop is fine, but preferably not the same loop as the client fds.

connection.Reader().Next(n) blocks when n exceeds the buffered data; after ReadTimeout the framework does not close the connection; and when connection.Reader() holds unread data the application has not consumed, OnRequest keeps firing

server

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/cloudwego/netpoll"
)

func main() {
	network, address := "tcp", "127.0.0.1:8888"

	// create the listener
	listener, err := netpoll.CreateListener(network, address)
	if err != nil {
		panic("create netpoll listener fail")
	}

	// handler: reads data from the connection and processes it
	var onRequest netpoll.OnRequest = handler

	// options: custom configuration for EventLoop initialization
	var opts = []netpoll.Option{
		netpoll.WithReadTimeout(5 * time.Second),
		netpoll.WithIdleTimeout(10 * time.Minute),
		netpoll.WithOnPrepare(nil),
	}

	// create the EventLoop
	eventLoop, err := netpoll.NewEventLoop(onRequest, opts...)
	if err != nil {
		panic("create netpoll event-loop fail")
	}

	// run the server
	err = eventLoop.Serve(listener)
	if err != nil {
		panic("netpoll server exit")
	}
}

// read-event handler
func handler(ctx context.Context, connection netpoll.Connection) error {
	total := 2
	t := time.Now()
	reader := connection.Reader()
	data, err := reader.Peek(reader.Len())
	log.Printf("before Next, len: %v, data: %v", reader.Len(), string(data))
	data, err = reader.Next(total)
	if err != nil {
		log.Printf("Next failed, total: %v, reader.Len: %v, block time: %v, error: %v", total, reader.Len(), int(time.Since(t)), err)
		return err
	}

	log.Printf("after  Next, len: %v, data: %v, timeused: %v", len(data), string(data), int(time.Since(t).Seconds()))

	n, err := connection.Write(data)
	if err != nil {
		return err
	}
	if n != len(data) {
		return fmt.Errorf("write failed: %v < %v", n, len(data))
	}

	return nil
}

client

package main

import (
	"log"
	"net"
	"time"
)

func main() {
	conn, err := net.Dial("tcp", "127.0.0.1:8888")
	if err != nil {
		log.Fatal("dial failed:", err)
	}

	// For this test, a complete protocol packet is 2 bytes; the server reads it with connection.Reader.Next(2)
	// The server-side read timeout is set to 5s

	// Group 1: sent within the timeout; the server reads a complete packet, but connection.Reader.Next(2) blocks for 2s
	conn.Write([]byte("a"))
	time.Sleep(time.Second * 2)
	conn.Write([]byte("a"))

	time.Sleep(time.Second * 1)

	// Group 2: the complete packet is split across sends spanning longer than the timeout; the server's connection.Reader.Next(2) blocks for 5s (the timeout), then errors, but the connection is not closed
	// For 30s, until the client sends the remaining data, the server triggers OnRequest repeatedly; each time, connection.Reader.Next(2) blocks for 5s (the timeout), then errors, but the connection is not closed
	// After 30s the client sends the rest of the packet and the server's connection.Reader.Next(2) reads a complete packet
	conn.Write([]byte("b"))
	time.Sleep(time.Second * 30)
	conn.Write([]byte("b"))

	time.Sleep(time.Second * 1)

	// Group 3: only half a packet is sent, then the client goes silent
	// The server triggers OnRequest repeatedly; each time, connection.Reader.Next(2) blocks for 5s (the timeout), then errors, but the connection is not closed
	// In practice the server may never receive a TCP FIN (e.g. the client device loses power), so it cannot release the connection promptly; a flood of such connections is a denial-of-service risk
	conn.Write([]byte("c"))

	<-make(chan int)
}

Logs

go run ./netpoll.go 
2021/07/18 07:25:22 before Next, len: 1, data: a
2021/07/18 07:25:24 after  Next, len: 2, data: aa, timeused: 2
2021/07/18 07:25:25 before Next, len: 1, data: b
2021/07/18 07:25:30 Next failed, total: 2, reader.Len: 1, block time: 5005315692, error: connection read timeout 5s
2021/07/18 07:25:30 before Next, len: 1, data: b
2021/07/18 07:25:35 Next failed, total: 2, reader.Len: 1, block time: 5017124559, error: connection read timeout 5s
2021/07/18 07:25:35 before Next, len: 1, data: b
2021/07/18 07:25:40 Next failed, total: 2, reader.Len: 1, block time: 5009562038, error: connection read timeout 5s
2021/07/18 07:25:40 before Next, len: 1, data: b
2021/07/18 07:25:45 Next failed, total: 2, reader.Len: 1, block time: 5008370180, error: connection read timeout 5s
2021/07/18 07:25:45 before Next, len: 1, data: b
2021/07/18 07:25:50 Next failed, total: 2, reader.Len: 1, block time: 5011104792, error: connection read timeout 5s
2021/07/18 07:25:50 before Next, len: 1, data: b
2021/07/18 07:25:55 after  Next, len: 2, data: bb, timeused: 4
2021/07/18 07:25:56 before Next, len: 1, data: c
2021/07/18 07:26:01 Next failed, total: 2, reader.Len: 1, block time: 5009599769, error: connection read timeout 5s
2021/07/18 07:26:01 before Next, len: 1, data: c
2021/07/18 07:26:06 Next failed, total: 2, reader.Len: 1, block time: 5017649436, error: connection read timeout 5s
2021/07/18 07:26:06 before Next, len: 1, data: c
2021/07/18 07:26:11 Next failed, total: 2, reader.Len: 1, block time: 5015780369, error: connection read timeout 5s
2021/07/18 07:26:11 before Next, len: 1, data: c
2021/07/18 07:26:16 Next failed, total: 2, reader.Len: 1, block time: 5013565228, error: connection read timeout 5s
2021/07/18 07:26:16 before Next, len: 1, data: c
2021/07/18 07:26:21 Next failed, total: 2, reader.Len: 1, block time: 5004234323, error: connection read timeout 5s
2021/07/18 07:26:21 before Next, len: 1, data: c
2021/07/18 07:26:26 Next failed, total: 2, reader.Len: 1, block time: 5014860948, error: connection read timeout 5s
2021/07/18 07:26:26 before Next, len: 1, data: c
2021/07/18 07:26:31 Next failed, total: 2, reader.Len: 1, block time: 5009890510, error: connection read timeout 5s
2021/07/18 07:26:31 before Next, len: 1, data: c
2021/07/18 07:26:36 Next failed, total: 2, reader.Len: 1, block time: 5009386524, error: connection read timeout 5s
2021/07/18 07:26:36 before Next, len: 1, data: c
2021/07/18 07:26:41 Next failed, total: 2, reader.Len: 1, block time: 5009694923, error: connection read timeout 5s
2021/07/18 07:26:41 before Next, len: 1, data: c
2021/07/18 07:26:46 Next failed, total: 2, reader.Len: 1, block time: 5006999390, error: connection read timeout 5s
2021/07/18 07:26:46 before Next, len: 1, data: c
2021/07/18 07:26:51 Next failed, total: 2, reader.Len: 1, block time: 5016639111, error: connection read timeout 5s
2021/07/18 07:26:51 before Next, len: 1, data: c
2021/07/18 07:26:56 Next failed, total: 2, reader.Len: 1, block time: 5004699154, error: connection read timeout 5s
2021/07/18 07:26:56 before Next, len: 1, data: c
2021/07/18 07:27:01 Next failed, total: 2, reader.Len: 1, block time: 5003720648, error: connection read timeout 5s
2021/07/18 07:27:01 before Next, len: 1, data: c
2021/07/18 07:27:06 Next failed, total: 2, reader.Len: 1, block time: 5013684114, error: connection read timeout 5s
2021/07/18 07:27:06 before Next, len: 1, data: c
2021/07/18 07:27:11 Next failed, total: 2, reader.Len: 1, block time: 5008594864, error: connection read timeout 5s
2021/07/18 07:27:11 before Next, len: 1, data: c
2021/07/18 07:27:16 Next failed, total: 2, reader.Len: 1, block time: 5016949058, error: connection read timeout 5s
2021/07/18 07:27:16 before Next, len: 1, data: c
### the same message keeps repeating

How does PollManager shut down gracefully?

Server implements graceful shutdown, but doesn't PollManager also need to be closed?

// this Close is never referenced anywhere in the code
func (m *manager) Close() error {
	for _, poll := range m.polls {
		poll.Close()
	}
	m.NumLoops = 0
	m.balance = nil
	m.polls = nil
	return nil
}
