cloudwego / shmipc-go Goto Github PK
View Code? Open in Web Editor NEW
A high performance inter-process communication golang library developed by CloudWeGo
License: Apache License 2.0
A high performance inter-process communication golang library developed by CloudWeGo
License: Apache License 2.0
Please provide the server specifications used for the shmipc-go benchmark shown on the front page. Thanks.
It seems like shm ipc latency is worse than when using plain uds sockets, below is the difference i'm measuring between the two:
package main
import (
"fmt"
"github.com/cloudwego/shmipc-go"
"github.com/cloudwego/shmipc-go/example/best_practice/idl"
"net"
"os"
"path/filepath"
"syscall"
"time"
)
// handleStream serves a single shmipc stream until a read, write or flush
// fails. For every request it prints the time elapsed since the timestamp
// carried in the request ID and answers with a response echoing that ID.
func handleStream(s *shmipc.Stream) {
	req := &idl.Request{}
	resp := &idl.Response{}
	for {
		// 1. Deserialize the request from the stream's reader.
		if err := req.ReadFromShm(s.BufferReader()); err != nil {
			fmt.Println("stream read request, err=" + err.Error())
			return
		}

		// 2. Measure latency: the request ID is interpreted as the sender's
		// UnixNano timestamp.
		resp.ID = req.ID
		latency := time.Since(time.Unix(0, int64(req.ID)))
		fmt.Printf("Received latency request: %v\n", latency)

		// 3. Serialize the response and push it to the peer.
		if err := resp.WriteToShm(s.BufferWriter()); err != nil {
			fmt.Println("stream write response failed, err=" + err.Error())
			return
		}
		if err := s.Flush(false); err != nil {
			// Distinct message so a flush failure is not mistaken for a
			// serialization failure.
			fmt.Println("stream flush response failed, err=" + err.Error())
			return
		}

		// Both objects are reused on the next iteration.
		req.Reset()
		resp.Reset()
	}
}
// init is a deliberate no-op: the GOMAXPROCS override inside it is commented out.
func init() {
// runtime.GOMAXPROCS(1)
}
// main listens on the unix domain socket "../ipc_test.sock" (relative to the
// working directory), and for every accepted connection creates a shmipc
// server session whose streams are each served by handleStream in their own
// goroutine.
func main() {
	dir, err := os.Getwd()
	if err != nil {
		panic(err)
	}
	udsPath := filepath.Join(dir, "../ipc_test.sock")

	// 1. Listen on the unix domain socket; a stale socket file is removed
	// first and the result of that removal is deliberately ignored.
	_ = syscall.Unlink(udsPath)
	ln, err := net.ListenUnix("unix", &net.UnixAddr{Name: udsPath, Net: "unix"})
	if err != nil {
		panic(err)
	}
	defer ln.Close()

	// 2. Accept unix domain socket connections.
	for {
		conn, err := ln.Accept()
		if err != nil {
			fmt.Printf("accept error:%s now exit", err.Error())
			return
		}
		go func(conn net.Conn) {
			defer conn.Close()

			// 3. Create the server session. A failure here only concerns
			// this connection, so it is reported and the goroutine exits
			// instead of panicking and taking the whole server down.
			conf := shmipc.DefaultConfig()
			server, err := shmipc.Server(conn, conf)
			if err != nil {
				fmt.Println("new ipc server failed " + err.Error())
				return
			}
			defer server.Close()

			// 4. Accept streams and handle each one concurrently.
			for {
				stream, err := server.AcceptStream()
				if err != nil {
					fmt.Println("shmipc server accept stream failed, err=" + err.Error())
					break
				}
				go handleStream(stream)
			}
		}(conn)
	}
}
package main
import (
"flag"
"fmt"
"os"
"path/filepath"
"time"
"github.com/cloudwego/shmipc-go"
"github.com/cloudwego/shmipc-go/example/best_practice/idl"
)
// init is a deliberate no-op: the GOMAXPROCS override inside it is commented out.
func init() {
// runtime.GOMAXPROCS(1)
}
// main creates a shmipc session manager connected to "../ipc_test.sock"
// (relative to the working directory) and, once per second, sends a request
// whose ID is the current UnixNano timestamp, then waits for the response.
// The process exits after 1200 seconds.
func main() {
	// packageSize := flag.Int("p", 1024, "-p 1024 mean that request and response's size are both near 1KB")
	flag.Parse()

	// 1. Get the current directory.
	dir, err := os.Getwd()
	if err != nil {
		panic(err)
	}

	// 2. Initialise the session manager.
	conf := shmipc.DefaultSessionManagerConfig()
	conf.Address = filepath.Join(dir, "../ipc_test.sock")
	conf.Network = "unix"
	conf.MemMapType = shmipc.MemMapTypeMemFd
	conf.SessionNum = 1
	conf.InitializeTimeout = 100 * time.Second
	smgr, err := shmipc.NewSessionManager(conf)
	if err != nil {
		panic(err)
	}
	defer smgr.Close()

	go func() {
		req := &idl.Request{}
		resp := &idl.Response{}
		for range time.Tick(time.Second) {
			// 3. Get a stream from the session manager.
			stream, err := smgr.GetStream()
			if err != nil {
				fmt.Println("get stream error:" + err.Error())
				continue
			}

			// 4. Fill in the request; the ID doubles as the send timestamp.
			req.Reset()
			req.ID = uint64(time.Now().UnixNano())

			// 5. Write the request to the stream's buffer.
			if err := req.WriteToShm(stream.BufferWriter()); err != nil {
				fmt.Println("write request to share memory failed, err=" + err.Error())
				return
			}

			// 6. Flush the buffered data of the stream to the peer.
			if err := stream.Flush(false); err != nil {
				fmt.Println(" flush request to peer failed, err=" + err.Error())
				return
			}

			// 7. Wait for and read the response.
			resp.Reset()
			if err := resp.ReadFromShm(stream.BufferReader()); err != nil {
				fmt.Println("read response from share memory failed, err=" + err.Error())
				continue
			}

			// 8. Hand the stream back to the manager. Without this every
			// tick takes another stream that is never returned.
			smgr.PutBack(stream)
		}
	}()
	time.Sleep(1200 * time.Second)
}
Received latency request: 741.428µs
Received latency request: 252.1µs
Received latency request: 178.321µs
Received latency request: 287.057µs
Received latency request: 225.539µs
Received latency request: 104.035µs
Received latency request: 189.923µs
Received latency request: 266.65µs
Received latency request: 195.734µs
Received latency request: 198.661µs
package main
import (
	"encoding/binary"
	"fmt"
	"io"
	"net"
	"os"
	"time"
)
// handleConn reads a sequence of 8-byte big-endian UnixNano timestamps from
// conn and, for each one, prints the time elapsed since that timestamp. It
// returns, closing conn, on the first read error (including EOF).
//
// Each message is read with io.ReadFull so that a timestamp split across
// several reads is reassembled before decoding, and so that back-to-back
// timestamps are decoded one at a time instead of being swallowed by one
// large read.
func handleConn(conn net.Conn) {
	defer conn.Close()

	// One timestamp per message; the buffer is reused across iterations.
	var stamp [8]byte
	for {
		if _, err := io.ReadFull(conn, stamp[:]); err != nil {
			fmt.Printf("Error reading data: %v\n", err)
			return
		}

		// Measure latency relative to the sender's clock reading.
		sent := time.Unix(0, int64(binary.BigEndian.Uint64(stamp[:])))
		fmt.Printf("Received latency request: %v\n", time.Since(sent))
	}
}
// main serves the plain unix-domain-socket latency benchmark: it binds
// /tmp/socket (after clearing any previous socket file) and hands every
// accepted connection to handleConn in a new goroutine.
func main() {
	const socketFile = "/tmp/socket"

	// A missing file is fine; any other removal failure aborts start-up.
	rmErr := os.Remove(socketFile)
	if !(rmErr == nil || os.IsNotExist(rmErr)) {
		fmt.Printf("Error removing socket file: %v\n", rmErr)
		return
	}

	// Bind the UNIX domain socket.
	listener, listenErr := net.Listen("unix", socketFile)
	if listenErr != nil {
		fmt.Printf("Error creating socket: %v\n", listenErr)
		return
	}
	defer listener.Close()

	fmt.Println("Server listening on", socketFile)

	// Serve until Accept fails.
	for {
		c, acceptErr := listener.Accept()
		if acceptErr != nil {
			fmt.Printf("Error accepting connection: %v\n", acceptErr)
			return
		}
		go handleConn(c)
	}
}
package main
import (
"encoding/binary"
"fmt"
"net"
"time"
)
// main is the plain unix-domain-socket benchmark client: it dials
// /tmp/socket and, once per second, writes the current UnixNano time as an
// 8-byte big-endian value. It stops at the first write error.
func main() {
	conn, err := net.Dial("unix", "/tmp/socket")
	if err != nil {
		fmt.Printf("Error connecting to server: %v\n", err)
		return
	}
	defer conn.Close()

	ticks := time.Tick(time.Second)
	for {
		<-ticks

		// Encode the send time so the server can compute the latency.
		payload := make([]byte, 8)
		binary.BigEndian.PutUint64(payload, uint64(time.Now().UnixNano()))

		if _, err = conn.Write(payload); err != nil {
			fmt.Printf("Error sending data: %v\n", err)
			return
		}
	}
}
Received latency request: 209.581µs
Received latency request: 111.524µs
Received latency request: 119.614µs
Received latency request: 80.55µs
Received latency request: 133.225µs
Received latency request: 92.258µs
Received latency request: 97.206µs
Received latency request: 89.792µs
Received latency request: 106.665µs
Received latency request: 83.404µs
how to control the timer before it is put to sleep?
Throughput for small payloads is much lower. I was wondering whether you could provide "batching" / pipelining of requests,
with an option to flush either after a time window (e.g. 10 ms) or once a number of items (e.g. 100) has accumulated, plus a size limit for pipelined items (e.g. < 1 KB) or something similar in nature — for shmipc requests only, not the fallback UDS ones.
Certain operations would benefit from batching these smaller-sized transactions.
either one of them will show an error / warning because both tried to remove /dev/shm/client.ipc.shm_.... together.
how to resolve this?
using and modified the helloworld example
client
Warn 2023-04-27 13:16:45.629954 buffer_manager.go:252 clean buffer manager:/dev/shm/client.ipc.shm_502044_buffer
server
Warn 2023-04-27 13:16:45.630167 buffer_manager.go:252 clean buffer manager:/dev/shm/client.ipc.shm_502044_buffer
Warn 2023-04-27 13:16:45.632198 buffer_manager.go:593 bufferManager remove file:/dev/shm/client.ipc.shm_502044_buffer failed, error=remove /dev/shm/client.ipc.shm_502044_buffer: no such file or directory
Warn 2023-04-27 13:16:45.632384 queue.go:204 queueManager remove file:/dev/shm/client.ipc.shm_502044_queue_0 failed, error=remove /dev/shm/client.ipc.shm_502044_queue_0: no such file or directory
session.go:369 client session /dev/shm/client.ipc.shm_502044_queue_0 exitErr:connection was reset by peer. localAddress:@ remoteAddress...
can you update the release version to 0.1.1?
seems good but working out some edge case on my side and still doing some testing work.
is this production ready? what hiccups do i have to look out for?
I run helloworld on Linux with ARM architecture. When the client sends a request, a panic occurs at position queue.go #249. I want to ask if this is a known issue? Will it be supported in the future?
i saw some maxSliceSize and capPerBuffer etc
i was wondering how can i set this inside the shmipc-go to influence the transfer size of the 1 byte idl?
can you provide some guidance on how to set it for optimum transfer? i think the current one has 4096 bytes but not sure how to set it "correct" way.
there are a lot of code and i'm not sure if hardcoding 1 value here will affect there so i'm asking here. thx in advance
why does net_server and net_client with lesser code has lower QPS than shmipc_server and shmipc_client?
possible to provide example to better understand the example folder?
getting 400000 QPS with shmipc_ example and 140000 QPS with net_ example
We are always interested in finding out who is using Shmipc, what attracted you to using it, how we can listen to your needs and if you are interested, help promote your organization.
Submit a comment in this issue to include the following information
Organization/Company: ByteDance
Website: https://bytedance.com
Country: China
Contact: [email protected]
Usage scenario: Using Shmipc to build large scale Service Mesh applications
Status: Production
lock used for shmipc buffer slice? lock free??
MaxStreamNum what is a good value for this? default is 4096.
also defaultqueuecap etc
i have a "memory leak" not sure if this is the place. what do u suggest for maxstreamnum and how to calculate how much memory will be used by maxstreamnum etc?
how to limit memory use of shmipc?
type [SessionManagerConfig](https://github.com/cloudwego/shmipc-go/blob/v0.1.0/session_manager.go#L67) [¶](https://pkg.go.dev/github.com/cloudwego/shmipc-go#SessionManagerConfig)
type SessionManagerConfig struct {
*[Config](https://pkg.go.dev/github.com/cloudwego/shmipc-go#Config)
UnixPath [string](https://pkg.go.dev/builtin#string) //Deprecated , please use Network and Address.
Network [string](https://pkg.go.dev/builtin#string) //tcp or unix
Address [string](https://pkg.go.dev/builtin#string) //tcp address or unix domain socket path
//SessionNum is similar to concurrency.
//A session have dependent io queue, dependent tcp/unix connection.
//we recommend the value equal peer process' thread count, and every thread keep a session.
//if the peer process written by golang, recommend SessionNum = cpu cores / 4
SessionNum [int](https://pkg.go.dev/builtin#int)
//Max number of stream per session's stream pool
MaxStreamNum [int](https://pkg.go.dev/builtin#int)
//The idle time to close a stream
StreamMaxIdleTime [time](https://pkg.go.dev/time).[Duration](https://pkg.go.dev/time#Duration)
}
helloworld keeps doing os.Remove per connection?
sorry i was thinking how to make helloworld a persistent server though client can also be persistent too.
currently server is persistent but client is also persistent just not sure how to make it more like the net examples.
pls help make helloworld [3]byte (data length info) + []byte serialize and deserialize will do and persistent examples.
thx
please provide an example that doesnt really use idl, meaning just pass []byte (as encoded information) and decode []byte
meaning to say user can implement their own encode and decode, shmipc just provide the mechanism to do transfer over shm.
is this possible?
p.s. : i tried modifying the idl code etc to remove .Name but it shows error below, so i just want something simple without the Name or ID where possible, just pass []byte as value
Warn 2023-04-12 23:35:26.65807 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:486 status:0
Warn 2023-04-12 23:35:26.658124 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:104 status:0
Warn 2023-04-12 23:35:26.658136 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:45 status:0
Warn 2023-04-12 23:35:26.658148 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:169 status:0
Warn 2023-04-12 23:35:26.658157 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:495 status:0
Warn 2023-04-12 23:35:26.65817 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:470 status:0
Warn 2023-04-12 23:35:26.658181 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:373 status:0
Warn 2023-04-12 23:35:26.65819 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:377 status:0
Warn 2023-04-12 23:35:26.658199 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:154 status:0
Warn 2023-04-12 23:35:26.658209 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:254 status:0
Warn 2023-04-12 23:35:26.658218 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:39 status:0
Warn 2023-04-12 23:35:26.658228 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:320 status:0
Warn 2023-04-12 23:35:26.658239 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:456 status:0
Warn 2023-04-12 23:35:26.658249 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:214 status:0
Warn 2023-04-12 23:35:26.658258 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:185 status:0
Warn 2023-04-12 23:35:26.658267 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:200 status:0
Warn 2023-04-12 23:35:26.658275 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:19 status:0
Warn 2023-04-12 23:35:26.658285 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:473 status:0
Warn 2023-04-12 23:35:26.658294 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:195 status:0
Warn 2023-04-12 23:35:26.658304 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:64 status:0
Warn 2023-04-12 23:35:26.658313 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:82 status:0
Warn 2023-04-12 23:35:26.658323 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:448 status:0
Warn 2023-04-12 23:35:26.658332 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:215 status:0
Warn 2023-04-12 23:35:26.658342 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:363 status:0
Warn 2023-04-12 23:35:26.658351 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:479 status:0
Warn 2023-04-12 23:35:26.65836 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:282 status:0
Warn 2023-04-12 23:35:26.658371 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:273 status:0
Warn 2023-04-12 23:35:26.658381 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:192 status:0
Warn 2023-04-12 23:35:26.65839 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:318 status:0
Warn 2023-04-12 23:35:26.65841 stream.go:257 server session stream fallback seqID:318 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658449 stream.go:257 server session stream fallback seqID:486 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658468 stream.go:257 server session stream fallback seqID:104 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658488 stream.go:257 server session stream fallback seqID:45 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658506 stream.go:257 server session stream fallback seqID:169 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658526 stream.go:257 server session stream fallback seqID:495 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658543 stream.go:257 server session stream fallback seqID:470 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658561 stream.go:257 server session stream fallback seqID:373 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658579 stream.go:257 server session stream fallback seqID:377 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658596 stream.go:257 server session stream fallback seqID:154 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658614 stream.go:257 server session stream fallback seqID:254 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658645 stream.go:257 server session stream fallback seqID:39 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658667 stream.go:257 server session stream fallback seqID:320 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658726 stream.go:257 server session stream fallback seqID:200 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658739 stream.go:257 server session stream fallback seqID:456 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658746 stream.go:257 server session stream fallback seqID:214 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658753 stream.go:257 server session stream fallback seqID:185 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.65876 stream.go:257 server session stream fallback seqID:473 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658769 stream.go:257 server session stream fallback seqID:19 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.65878 stream.go:257 server session stream fallback seqID:363 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658787 stream.go:257 server session stream fallback seqID:82 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
helloworld keeps doing os.Remove per connection?
sorry i was thinking how to make helloworld a persistent server though client can also be persistent too.
currently server is persistent but client is also persistent just not sure how to make it more like the net examples.
pls help make helloworld [3]byte (data length info) + []byte serialize and deserialize will do and persistent examples.
thx
Describe the bug
if i put 2 instances, it will basically halved at 750000 req/s each and 3 instances uses 500,000 req/s etc.
how to push for max req/s using multiple instances?
also getting this error after running for a long time with multiple instances where by ram is showing 0.0% usage using top -c
pls suggest ways to workaround the issues.
Warn 2024-04-04 20:02:46.910525 stream.go:257 server session stream fallback seqID:24 len:1043 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
1 goroutine per cpu core running listening in as server
many clients sending to different uds socket per core of the server
To Reproduce
commenting
//runtime.GOMAXPROCS(1)
in
shmipc_server example
Expected behavior
able to use multiple core
Environment:
ubuntu 22.04 go 1.22
i cant confirm this yet but running multiple different instances of shmipc listening to two different things on 1 process will OOM
sorry i cant confirm this yet but can you give an idea how to "work around"? is this an issue?
i've ran multiple shmipc go on 1 process and got oom doing so because of "memory leak". it seems to be using a lot of memory without releasing
今天看了Shmipc在serviceMesh场景下,对于大包场景下做到了较好的收益。请问 Shmipc的方案在serviceMesh场景下,有和ebpf的socket redirect做过对比吗?谁更好点
Getting shared memory exhausted, saw shmipc specification but can't find examples on how to use fallback to use uds / tcp
possible to give more details on how to use uds (and also tcp) for the fallback when shmipc is exhausted?
how to limit "exhaustion" of memory of shm ipc?
how to silence internal logger or any other displayed information?
how to set the setting for it to be quiet for warnf etc in the examples?
running as client shows the error below:
./main
client stream send request:client say hello world!!!
server receive request message:client say hello world!!!
server reply response: server hello world!!!
client stream receive response server hello world!!!
Error 2023-04-12 23:37:01.27175 session.go:369 server session /dev/shm/client.ipc.shm_2133964_queue_0 exitErr:connection was reset by peer. localAddress:/home/ubuntu/Documents/Programs/go/shmipc-go/example/helloworld/ipc_test.sock remoteAddress:@
Warn 2023-04-12 23:37:01.271935 buffer_manager.go:252 clean buffer manager:/dev/shm/client.ipc.shm_2133964_buffer
for async client and server, possible to give an explanation on how does it work?
data received or transferred will be out of sync?
possible to provide a solution whereby we dont need to putback? i dont mind a bit of allocation and let GC do the work
Is your feature request related to a problem? Please describe.
目前Monitor 接口没有具体实现,无法获得程序运行时的监控指标
Describe the solution you'd like
// Monitor is the metrics hook this feature request asks to have implemented.
type Monitor interface {
// OnEmitSessionMetrics is handed performance, stability and share-memory
// metrics together with a *Session.
// NOTE(review): the request proposes forwarding these to Prometheus; no
// implementation is shown here.
OnEmitSessionMetrics(PerformanceMetrics, StabilityMetrics, ShareMemoryMetrics, *Session)
// Flush reports failure through its error result.
// NOTE(review): the request proposes that it write metrics to a local log
// file; confirm against the eventual implementation.
Flush() error
}
实现 Monitor 接口,其中OnEmitSessionMetrics 方法发送metrics 到prometheus , Flush() 方法将metrics写到本地日志文件
Describe alternatives you've considered
不太确定用那种实现方式
1 在业务代码里面实现Monitor 接口,然后提供一个使用的example,放在example路径中
2 直接在shmipc-go 库里实现,对外提供接口的实现,写一个使用的example
Additional context
shm server
package server
import (
"fmt"
"github.com/cloudwego/shmipc-go"
"net"
"os"
"path/filepath"
"sync"
"syscall"
)
// Compile-time checks that the callback types satisfy the shmipc interfaces.
var (
	_ shmipc.StreamCallbacks = (*streamCbImpl)(nil)
	_ shmipc.ListenCallback  = (*listenCbImpl)(nil)
)
// Server holds the state of the shmipc server wrapper.
// NOTE(review): in the code shown only udsPath is ever assigned (by Init);
// the remaining fields are not referenced here.
type Server struct {
// udsPath is the unix domain socket path computed in Init.
udsPath string
netUnixListener *net.UnixListener
conn net.Conn
shmipcServer *shmipc.Session
shmipcStream *shmipc.Stream
shutdownErrStr string
mu sync.Mutex
callback shmipc.ListenCallback
unlinkOnClose bool
isClose bool
}
// New initialises logging to "server_shmipc.log" at debug level (stdout
// mirroring, caller info and one backup enabled) and returns an empty Server.
func New() *Server {
	log.Init(
		"server_shmipc.log",
		log.DebugLevel,
		log.SetStdout(true),
		log.SetCaller(true),
		log.SetMaxBackups(1),
	)
	return new(Server)
}
// Init resolves the unix domain socket path under the working directory,
// removes any stale socket file, builds a shmipc listener with a 32 MiB
// share-memory buffer and runs it. Failures are logged and Init returns.
func (s *Server) Init() {
	wd, err := os.Getwd()
	if err != nil {
		log.Errorf("getwd err: %s", err.Error())
		return
	}

	sockRel := fmt.Sprintf("%s/%s", market_gateway.UnixDomainSocketFolder, market_gateway.UnixDomainSocketFile)
	s.udsPath = filepath.Join(wd, sockRel)
	// Best-effort removal of a socket file left over from a previous run.
	_ = syscall.Unlink(s.udsPath)

	shmipc.SetLogLevel(0)

	cfg := shmipc.NewDefaultListenerConfig(s.udsPath, "unix")
	cfg.ShareMemoryBufferCap = 32 << 20
	cfg.ShareMemoryPathPrefix = fmt.Sprintf("%s/server.ipc.shm", market_gateway.IPCShmPathFolder)
	cfg.QueuePath = fmt.Sprintf("%s/server.ipc.shm_queue", market_gateway.IPCShmPathFolder)

	listener, err := shmipc.NewListener(&listenCbImpl{}, cfg)
	if err != nil {
		log.Error(err)
		return
	}

	log.Infof("unix listener:%s", s.udsPath)
	listener.Run()
}
// listenCbImpl is the callback object handed to shmipc.NewListener.
type listenCbImpl struct{}
// OnNewStream is invoked for a stream opened by a connecting client; it
// installs a fresh streamCbImpl bound to that stream as the stream's callbacks.
func (l *listenCbImpl) OnNewStream(s *shmipc.Stream) {
s.SetCallbacks(&streamCbImpl{stream: s})
}
// OnShutdown logs the shutdown reason at error level.
func (l *listenCbImpl) OnShutdown(reason string) {
log.Error("OnShutdown reason:" + reason)
}
// streamCbImpl carries the per-stream state used by the server callbacks.
type streamCbImpl struct {
// req and resp are reused for every message and reset at the end of OnData.
req idl.Request
resp idl.Response
// stream is the stream these callbacks were installed on.
stream *shmipc.Stream
}
// OnData is called when data from the client arrives on the stream. It
// decodes one request, then replies with a response whose ID is the server's
// current UnixNano time and whose Image holds, as decimal text, the number of
// nanoseconds between the request ID (treated as the client's send time) and
// that server time.
func (s *streamCbImpl) OnData(reader shmipc.BufferReader) {
	// 1. Deserialize the request.
	if err := s.req.ReadFromShm(reader); err != nil {
		fmt.Println("stream read request, err=" + err.Error())
		return
	}

	// Original author notes (translated): work out which data the client is
	// requesting (ID, Name, Key), register it with the in-memory market-data
	// channel, and start a forwarding goroutine that reads from that channel.
	now := time.Now()

	// 3. Serialize the response.
	s.resp.ID = uint64(now.UnixNano()) // s.req.ID
	s.resp.Name = s.req.Name
	// FormatInt keeps the full int64 delta; converting through int would
	// truncate it where int is 32 bits wide.
	s.resp.Image = []byte(strconv.FormatInt(now.UnixNano()-int64(s.req.ID), 10))
	if err := s.resp.WriteToShm(s.stream.BufferWriter()); err != nil {
		fmt.Println("stream write response failed, err=" + err.Error())
		return
	}
	if err := s.stream.Flush(false); err != nil {
		// Distinct message so a flush failure is not mistaken for a
		// serialization failure.
		fmt.Println("stream flush response failed, err=" + err.Error())
		return
	}

	s.stream.ReleaseReadAndReuse()
	s.req.Reset()
	s.resp.Reset()
}
// OnLocalClose logs, at error level, that the stream's local-close callback fired.
func (s *streamCbImpl) OnLocalClose() {
log.Error("stream OnLocalClose")
}
// OnRemoteClose logs, at error level, that the stream's remote-close callback fired.
func (s *streamCbImpl) OnRemoteClose() {
log.Error("stream OnRemoteClose")
}
...
then
// Usage: construct the server, then call Init, which builds the listener and calls its Run method.
s := server.New()
s.Init()
shm client
package client
import (
"fmt"
"github.com/cloudwego/shmipc-go"
"math/rand"
"os"
"path/filepath"
)
// Compile-time check that StreamHander satisfies shmipc.StreamCallbacks.
var (
	_ shmipc.StreamCallbacks = (*StreamHander)(nil)
)
// Client bundles the shmipc session manager, one stream taken from it and the
// handler installed on that stream.
type Client struct {
// prefix is used in the log file name and in the share-memory/queue paths.
prefix string
// udsPath is the unix domain socket address computed in Init.
udsPath string
shmipcCfg *shmipc.SessionManagerConfig
shmipcMgr *shmipc.SessionManager
shmipcStream *shmipc.Stream
// hander is created in Init and receives the stream callbacks.
hander *StreamHander
}
// New initialises logging to "client_<prefix>_shmipc.log" at debug level
// (stdout mirroring, caller info and one backup enabled) and returns a Client
// that remembers prefix; the socket path is filled in later by Init.
func New(prefix string) *Client {
	logFile := fmt.Sprintf("client_%s_shmipc.log", prefix)
	log.Init(
		logFile,
		log.DebugLevel,
		log.SetStdout(true),
		log.SetCaller(true),
		log.SetMaxBackups(1),
	)

	c := new(Client)
	c.prefix = prefix
	c.udsPath = ""
	return c
}
// Init resolves the unix domain socket path, configures and creates the
// session manager (memfd mapping, 32 MiB share-memory buffer), takes one
// stream from it and installs a StreamHander carrying a 1024-byte random key
// as that stream's callbacks. A Getwd failure is logged and Init returns;
// failing to create the manager or obtain the stream panics.
func (c *Client) Init() {
	wd, err := os.Getwd()
	if err != nil {
		log.Errorf("getwd err: %s", err.Error())
		return
	}

	sockRel := fmt.Sprintf("%s/%s", market_gateway.UnixDomainSocketFolder, market_gateway.UnixDomainSocketFile)
	c.udsPath = filepath.Join(wd, sockRel)

	shmipc.SetLogLevel(0)

	// 1. Create the client session manager.
	c.shmipcCfg = shmipc.DefaultSessionManagerConfig()
	cfg := c.shmipcCfg
	cfg.ShareMemoryPathPrefix = fmt.Sprintf("%s/client_%s.ipc.shm", market_gateway.IPCShmPathFolder, c.prefix)
	cfg.QueuePath = fmt.Sprintf("%s/client_%s.ipc.shm_queue", market_gateway.IPCShmPathFolder, c.prefix)
	cfg.MemMapType = shmipc.MemMapTypeMemFd
	cfg.Network = "unix"
	cfg.Address = c.udsPath
	cfg.ShareMemoryBufferCap = 32 << 20

	c.shmipcMgr, err = shmipc.NewSessionManager(cfg)
	if err != nil {
		panic("create client session failed, " + err.Error())
	}

	// 2. Take a stream; it is handed back in End rather than deferred here.
	c.shmipcStream, err = c.shmipcMgr.GetStream()
	if err != nil {
		panic("client open stream failed, " + err.Error())
	}

	// 3. Install the handler with a random request key.
	key := make([]byte, 1024)
	rand.Read(key)
	c.hander = NewStreamHander(key, c.shmipcStream, c.shmipcMgr)
	c.shmipcStream.SetCallbacks(c.hander)
}
// Send forwards to the handler's Send. c.hander is only assigned by Init, so
// Init must have completed successfully first.
func (c *Client) Send() {
c.hander.Send()
}
// End hands the stream (if any) back to the session manager and closes the
// manager. It is a no-op when no manager was created. The manager is closed
// even when no stream was obtained, so it is not left open in that case.
func (c *Client) End() {
	if c.shmipcMgr == nil {
		return
	}
	if c.shmipcStream != nil {
		c.shmipcMgr.PutBack(c.shmipcStream)
	}
	c.shmipcMgr.Close()
}
// StreamHander sends requests on a stream and handles the responses delivered
// through the stream callbacks.
type StreamHander struct {
// req and resp are reused across calls; each is reset before use.
req idl.Request
resp idl.Response
stream *shmipc.Stream
// smgr is stored by NewStreamHander; the methods shown here do not use it.
smgr *shmipc.SessionManager
// key is copied into every request's Key field by Send.
key []byte
}
// NewStreamHander returns a StreamHander holding the given key, stream and
// session manager; the key slice is stored as-is, not copied.
func NewStreamHander(key []byte, stream *shmipc.Stream, smgr *shmipc.SessionManager) *StreamHander {
return &StreamHander{key: key, stream: stream, smgr: smgr}
}
// OnData decodes one response and logs two durations: the time since the
// response ID (which the server sets to its UnixNano send time) and the
// client-to-server figure the server placed in the response's Image field.
func (s *StreamHander) OnData(reader shmipc.BufferReader) {
	// Wait for and read the response.
	s.resp.Reset()
	if err := s.resp.ReadFromShm(reader); err != nil {
		// This is a read of the response, so say so; the previous text
		// reported a request write failure.
		log.Error("read response from share memory failed, err=" + err.Error())
		return
	}

	diff := time.Now().UnixNano() - int64(s.resp.ID)
	log.Infof("server=>client %d ns, client=>server %s ns", diff, string(s.resp.Image))
	s.stream.ReleaseReadAndReuse()
}
// Send builds a request stamped with the current UnixNano time as its ID,
// the fixed name "xxx" and the handler's key, writes it to the stream and
// flushes it to the peer. Failures are logged and Send returns.
func (s *StreamHander) Send() {
	sentAt := time.Now()

	// Serialize the request.
	s.req.Reset()
	s.req.ID = uint64(sentAt.UnixNano())
	s.req.Name = "xxx"
	s.req.Key = s.key

	w := s.stream.BufferWriter()
	if err := s.req.WriteToShm(w); err != nil {
		log.Error("write request to share memory failed, err=" + err.Error())
		return
	}

	if err := s.stream.Flush(false); err != nil {
		log.Error(" flush request to peer failed, err=" + err.Error())
		return
	}
}
// OnLocalClose logs, at info level, that the stream's local-close callback fired.
func (s *StreamHander) OnLocalClose() {
log.Info("stream OnLocalClose")
}
// OnRemoteClose logs, at info level, that the stream's remote-close callback fired.
func (s *StreamHander) OnRemoteClose() {
log.Info("stream OnRemoteClose")
}
then
// Usage: create and initialise the client, send 100 requests one second
// apart, then wait one more second.
c := client.New("0001")
c.Init()
for sent := 0; sent < 100; sent++ {
	c.Send()
	time.Sleep(time.Second)
}
time.Sleep(time.Second)
result
server=>client 302563 ns, client=>server 149902 ns
server=>client 78659 ns, client=>server 311423 ns
server=>client 99438 ns, client=>server 201126 ns
server=>client 101603 ns, client=>server 254039 ns
server=>client 101192 ns, client=>server 229704 ns
server=>client 110421 ns, client=>server 214390 ns
server=>client 77879 ns, client=>server 258622 ns
server=>client 89274 ns, client=>server 195844 ns
server=>client 113707 ns, client=>server 241404 ns
server=>client 128838 ns, client=>server 202124 ns
server=>client 181284 ns, client=>server 257551 ns
server=>client 68382 ns, client=>server 213854 ns
server=>client 162655 ns, client=>server 240469 ns
server=>client 89941 ns, client=>server 274032 ns
server=>client 79828 ns, client=>server 170969 ns
server=>client 107270 ns, client=>server 106275 ns
server=>client 110980 ns, client=>server 196599 ns
server=>client 100005 ns, client=>server 315514 ns
server=>client 184491 ns, client=>server 295354 ns
server=>client 131333 ns, client=>server 240325 ns
server=>client 118292 ns, client=>server 254118 ns
server=>client 114306 ns, client=>server 276610 ns
server=>client 87094 ns, client=>server 235931 ns
server=>client 95147 ns, client=>server 126418 ns
server=>client 83500 ns, client=>server 262050 ns
server=>client 155115 ns, client=>server 139429 ns
server=>client 79613 ns, client=>server 214479 ns
server=>client 84174 ns, client=>server 127120 ns
server=>client 128731 ns, client=>server 251253 ns
server=>client 90009 ns, client=>server 213201 ns
server=>client 64554 ns, client=>server 187908 ns
server=>client 113250 ns, client=>server 346987 ns
server=>client 154018 ns, client=>server 180147 ns
server=>client 109506 ns, client=>server 276111 ns
server=>client 115490 ns, client=>server 184541 ns
server=>client 79395 ns, client=>server 196219 ns
server=>client 100075 ns, client=>server 191885 ns
server=>client 176090 ns, client=>server 212988 ns
server=>client 113911 ns, client=>server 140827 ns
server=>client 56777 ns, client=>server 113488 ns
server=>client 109094 ns, client=>server 138789 ns
server=>client 94613 ns, client=>server 146264 ns
server=>client 79668 ns, client=>server 200440 ns
server=>client 82570 ns, client=>server 235427 ns
server=>client 113162 ns, client=>server 264805 ns
server=>client 81321 ns, client=>server 190720 ns
server=>client 114324 ns, client=>server 192740 ns
server=>client 117009 ns, client=>server 181410 ns
server=>client 150933 ns, client=>server 201962 ns
server=>client 95808 ns, client=>server 247240 ns
server=>client 81515 ns, client=>server 162001 ns
server=>client 100433 ns, client=>server 187268 ns
server=>client 94080 ns, client=>server 183065 ns
server=>client 68143 ns, client=>server 182330 ns
server=>client 96020 ns, client=>server 171040 ns
server=>client 89387 ns, client=>server 195151 ns
server=>client 192450 ns, client=>server 177611 ns
server=>client 113053 ns, client=>server 150515 ns
server=>client 104233 ns, client=>server 193330 ns
server=>client 69314 ns, client=>server 194273 ns
server=>client 183809 ns, client=>server 318489 ns
server=>client 133550 ns, client=>server 311941 ns
server=>client 72557 ns, client=>server 190717 ns
server=>client 83506 ns, client=>server 214645 ns
server=>client 261849 ns, client=>server 189151 ns
server=>client 101423 ns, client=>server 300826 ns
server=>client 127147 ns, client=>server 155703 ns
server=>client 82517 ns, client=>server 209542 ns
server=>client 100622 ns, client=>server 181754 ns
server=>client 144928 ns, client=>server 217414 ns
server=>client 78824 ns, client=>server 188329 ns
server=>client 212554 ns, client=>server 244976 ns
server=>client 98345 ns, client=>server 160499 ns
server=>client 92774 ns, client=>server 133956 ns
server=>client 97873 ns, client=>server 214409 ns
server=>client 90826 ns, client=>server 204376 ns
server=>client 100275 ns, client=>server 168810 ns
server=>client 88030 ns, client=>server 239757 ns
server=>client 71838 ns, client=>server 114075 ns
server=>client 83410 ns, client=>server 184958 ns
server=>client 140440 ns, client=>server 154009 ns
server=>client 142377 ns, client=>server 230575 ns
server=>client 91591 ns, client=>server 226697 ns
server=>client 77744 ns, client=>server 124703 ns
server=>client 145887 ns, client=>server 196064 ns
server=>client 86124 ns, client=>server 156900 ns
server=>client 76476 ns, client=>server 188798 ns
server=>client 127490 ns, client=>server 126566 ns
server=>client 107608 ns, client=>server 208988 ns
server=>client 92043 ns, client=>server 176982 ns
server=>client 147985 ns, client=>server 263178 ns
server=>client 123474 ns, client=>server 151468 ns
server=>client 91433 ns, client=>server 382726 ns
server=>client 155142 ns, client=>server 1198213 ns
server=>client 119104 ns, client=>server 348680 ns
server=>client 89418 ns, client=>server 225937 ns
server=>client 66334 ns, client=>server 142855 ns
server=>client 85201 ns, client=>server 118376 ns
server=>client 83037 ns, client=>server 212686 ns
server=>client 79710 ns, client=>server 137516 ns
possible to support streaming?
why NewListener
only support linux?
osx may be ok?
Line 81 in 9f40696
seems good but working out some edge case on my side and still doing some testing work.
is this production ready? what hiccups do i have to look out for?
@goodbye-babyer using cloudwego hertz web server and shmipc-go but seems memory is increasing. not sure how to limit this heap.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.