
shmipc-go's Issues

latency seems worse than uds in low qps and batch IO scenario

It seems that shmipc latency is worse than with plain UDS sockets. Below is the difference I'm measuring between the two:

shm ipc:

server

package main

import (
	"fmt"
	"github.com/cloudwego/shmipc-go"
	"github.com/cloudwego/shmipc-go/example/best_practice/idl"
	"net"
	"os"
	"path/filepath"
	"syscall"
	"time"
)

func handleStream(s *shmipc.Stream) {
	req := &idl.Request{}
	resp := &idl.Response{}
	for {
		// 1. deserialize Request
		if err := req.ReadFromShm(s.BufferReader()); err != nil {
			fmt.Println("stream read request, err=" + err.Error())
			return
		}

		// 2. serialize Response
		resp.ID = req.ID

		// Measure latency and send a response
		latency := time.Since(time.Unix(0, int64(req.ID)))
		fmt.Printf("Received latency request: %v\n", latency)

		if err := resp.WriteToShm(s.BufferWriter()); err != nil {
			fmt.Println("stream write response failed, err=" + err.Error())
			return
		}
		if err := s.Flush(false); err != nil {
			fmt.Println("stream flush response failed, err=" + err.Error())
			return
		}
		req.Reset()
		resp.Reset()
	}
}

func init() {
	// runtime.GOMAXPROCS(1)
}

func main() {
	dir, err := os.Getwd()
	if err != nil {
		panic(err)
	}
	udsPath := filepath.Join(dir, "../ipc_test.sock")

	// 1. listen unix domain socket
	_ = syscall.Unlink(udsPath)
	ln, err := net.ListenUnix("unix", &net.UnixAddr{Name: udsPath, Net: "unix"})
	if err != nil {
		panic(err)
	}
	defer ln.Close()

	// 2. accept a unix domain socket
	for {
		conn, err := ln.Accept()
		if err != nil {
			fmt.Printf("accept error:%s now exit", err.Error())
			return
		}
		go func() {
			defer conn.Close()

			// 3. create server session
			conf := shmipc.DefaultConfig()
			server, err := shmipc.Server(conn, conf)
			if err != nil {
				panic("new ipc server failed " + err.Error())
			}
			defer server.Close()

			// 4. accept stream and handle
			for {
				stream, err := server.AcceptStream()
				if err != nil {
					fmt.Println("shmipc server accept stream failed, err=" + err.Error())
					break
				}
				go handleStream(stream)
			}
		}()
	}
}

client:

package main

import (
	"flag"
	"fmt"
	"os"
	"path/filepath"
	"time"

	"github.com/cloudwego/shmipc-go"
	"github.com/cloudwego/shmipc-go/example/best_practice/idl"
)

func init() {
	// runtime.GOMAXPROCS(1)
}

func main() {
	// packageSize := flag.Int("p", 1024, "-p 1024 mean that request and response's size are both near 1KB")
	flag.Parse()

	// 1. get current directory
	dir, err := os.Getwd()
	if err != nil {
		panic(err)
	}

	// 2. init session manager
	conf := shmipc.DefaultSessionManagerConfig()
	conf.Address = filepath.Join(dir, "../ipc_test.sock")
	conf.Network = "unix"
	conf.MemMapType = shmipc.MemMapTypeMemFd
	conf.SessionNum = 1
	conf.InitializeTimeout = 100 * time.Second
	smgr, err := shmipc.NewSessionManager(conf)
	if err != nil {
		panic(err)
	}

	go func() {
		req := &idl.Request{}
		resp := &idl.Response{}

		for range time.Tick(time.Second) {
			// 3. get stream from SessionManager
			stream, err := smgr.GetStream()
			if err != nil {
				fmt.Println("get stream error:" + err.Error())
				continue
			}

			// 4. set request object
			req.Reset()
			req.ID = uint64(time.Now().UnixNano())

			// 5. write req to buffer of stream
			if err := req.WriteToShm(stream.BufferWriter()); err != nil {
				fmt.Println("write request to share memory failed, err=" + err.Error())
				return
			}

			// 6. flush the buffered data of stream to peer
			if err := stream.Flush(false); err != nil {
				fmt.Println(" flush request to peer failed, err=" + err.Error())
				return
			}

			// 7. wait and read response
			resp.Reset()
			if err := resp.ReadFromShm(stream.BufferReader()); err != nil {
				fmt.Println("write request to share memory failed, err=" + err.Error())
				continue
			}
		}
	}()
	time.Sleep(1200 * time.Second)
}

output

Received latency request: 741.428µs
Received latency request: 252.1µs
Received latency request: 178.321µs
Received latency request: 287.057µs
Received latency request: 225.539µs
Received latency request: 104.035µs
Received latency request: 189.923µs
Received latency request: 266.65µs
Received latency request: 195.734µs
Received latency request: 198.661µs

uds:

server:

package main

import (
	"encoding/binary"
	"fmt"
	"net"
	"os"
	"time"
)

func handleConn(conn net.Conn) {
	defer conn.Close()

	for {
		// Read data from the client
		data := make([]byte, 1024)
		_, err := conn.Read(data)
		if err != nil {
			fmt.Printf("Error reading data: %v\n", err)
			return
		}

		// Measure and print the latency
		latency := time.Since(time.Unix(0, int64(binary.BigEndian.Uint64(data))))
		fmt.Printf("Received latency request: %v\n", latency)
	}
}

func main() {
	// Remove the socket file if it already exists
	socketFile := "/tmp/socket"
	if err := os.Remove(socketFile); err != nil && !os.IsNotExist(err) {
		fmt.Printf("Error removing socket file: %v\n", err)
		return
	}

	// Create a UNIX domain socket
	listener, err := net.Listen("unix", socketFile)
	if err != nil {
		fmt.Printf("Error creating socket: %v\n", err)
		return
	}
	defer listener.Close()

	fmt.Println("Server listening on", socketFile)

	for {
		// Accept incoming connections
		conn, err := listener.Accept()
		if err != nil {
			fmt.Printf("Error accepting connection: %v\n", err)
			return
		}

		go handleConn(conn)
	}
}

client:

package main

import (
	"encoding/binary"
	"fmt"
	"net"
	"time"
)

func main() {
	// Connect to the server's UNIX domain socket
	conn, err := net.Dial("unix", "/tmp/socket")
	if err != nil {
		fmt.Printf("Error connecting to server: %v\n", err)
		return
	}
	defer conn.Close()

	for range time.Tick(time.Second) {
		// Send the current time to the server for measuring latency
		currentTime := time.Now().UnixNano()
		currentTimeBytes := make([]byte, 8)
		binary.BigEndian.PutUint64(currentTimeBytes, uint64(currentTime))
		_, err = conn.Write(currentTimeBytes)
		if err != nil {
			fmt.Printf("Error sending data: %v\n", err)
			return
		}
	}
}

latency:

Received latency request: 209.581µs
Received latency request: 111.524µs
Received latency request: 119.614µs
Received latency request: 80.55µs
Received latency request: 133.225µs
Received latency request: 92.258µs
Received latency request: 97.206µs
Received latency request: 89.792µs
Received latency request: 106.665µs
Received latency request: 83.404µs

batching / pipelining requests for small byte size data

At smaller payload sizes, bandwidth/throughput is much lower. I was wondering whether you could provide batching / pipelining of requests, gated by a time window (e.g. 10 ms) or a batch size (e.g. 100 items), with a size limit for the pipeline (e.g. < 1 KB), for shmipc requests only, not the fallback UDS ones.

Certain operations would benefit from these smaller-sized transactions. A rough client-side sketch of the idea follows.
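
Something like the following is what I have in mind: a minimal client-side sketch, reusing the idl helpers and SessionManager API from the examples above, that queues a batch of requests in the stream's write buffer and pushes them to the peer with a single Flush. Response handling is omitted, and whether the peer drains every queued request per wakeup depends on its handler loop (the handleStream loop above reads in a loop, so it should). The 10 ms window and batch size of 100 are illustrative.

package main

import (
	"fmt"
	"time"

	"github.com/cloudwego/shmipc-go"
	"github.com/cloudwego/shmipc-go/example/best_practice/idl"
)

// flushBatch queues up to batchSize requests in the stream's write buffer and
// sends them to the peer with a single Flush call (one wakeup for the batch).
func flushBatch(smgr *shmipc.SessionManager, batchSize int) error {
	stream, err := smgr.GetStream()
	if err != nil {
		return err
	}
	defer smgr.PutBack(stream)

	req := &idl.Request{}
	for i := 0; i < batchSize; i++ {
		req.Reset()
		req.ID = uint64(time.Now().UnixNano())
		// each request is appended to the stream's in-flight write buffer
		if err := req.WriteToShm(stream.BufferWriter()); err != nil {
			return err
		}
	}
	// one flush for the whole batch
	return stream.Flush(false)
}

func main() {
	conf := shmipc.DefaultSessionManagerConfig()
	conf.Network = "unix"
	conf.Address = "../ipc_test.sock" // same socket path as the server example above
	smgr, err := shmipc.NewSessionManager(conf)
	if err != nil {
		panic(err)
	}
	for range time.Tick(10 * time.Millisecond) { // time-based batching window
		if err := flushBatch(smgr, 100); err != nil {
			fmt.Println("flush batch failed:", err)
		}
	}
}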

not sure why but client and server both remove /dev/shm/client.ipc.shm_502044_buffer together?

Either one of them will show an error/warning because both tried to remove /dev/shm/client.ipc.shm_... at the same time. How do I resolve this? I'm using a modified helloworld example.

client

Warn 2023-04-27 13:16:45.629954 buffer_manager.go:252  clean buffer manager:/dev/shm/client.ipc.shm_502044_buffer

server

Warn 2023-04-27 13:16:45.630167 buffer_manager.go:252  clean buffer manager:/dev/shm/client.ipc.shm_502044_buffer
Warn 2023-04-27 13:16:45.632198 buffer_manager.go:593  bufferManager remove file:/dev/shm/client.ipc.shm_502044_buffer failed, error=remove /dev/shm/client.ipc.shm_502044_buffer: no such file or directory
Warn 2023-04-27 13:16:45.632384 queue.go:204  queueManager remove file:/dev/shm/client.ipc.shm_502044_queue_0 failed, error=remove /dev/shm/client.ipc.shm_502044_queue_0: no such file or directory
session.go:369 client session /dev/shm/client.ipc.shm_502044_queue_0 exitErr:connection was reset by peer. localAddress:@ remoteAddress...

is this production ready?

It seems good, but I'm working out some edge cases on my side and still doing some testing.

Is this production ready? What hiccups do I have to look out for?

I'm transferring only 1 byte to pass a flag in my IDL. How do I optimize the transfer rate?

I saw maxSliceSize, capPerBuffer, etc., and was wondering how to set them inside shmipc-go to influence the transfer size for this 1-byte IDL payload.

Can you provide some guidance on how to set this for optimal transfer? I think the current default is 4096 bytes, but I'm not sure of the "correct" way to change it.

There is a lot of code and I'm not sure whether hardcoding one value in one place will affect something elsewhere, so I'm asking here. Thanks in advance. A hedged configuration sketch follows.
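
For reference, these are the knobs I would try, assuming the public Config exposes BufferSliceSizes as a list of SizePercentPair{Size, Percent} entries alongside ShareMemoryBufferCap; those field names are my assumption from the pkg.go.dev docs and may differ in your version, so treat this as a sketch to verify rather than a confirmed API.

package main

import "github.com/cloudwego/shmipc-go"

// newSmallPayloadConfig biases the shared-memory buffer pool toward small
// slices so that 1-byte requests don't each pin a 4 KB slice.
// BufferSliceSizes/SizePercentPair are assumed field names; verify against
// your shmipc-go version.
func newSmallPayloadConfig() *shmipc.SessionManagerConfig {
	conf := shmipc.DefaultSessionManagerConfig()
	// total shared-memory buffer capacity per session, in bytes
	conf.ShareMemoryBufferCap = 8 << 20
	conf.BufferSliceSizes = []*shmipc.SizePercentPair{
		{Size: 256, Percent: 70},
		{Size: 4 << 10, Percent: 20},
		{Size: 64 << 10, Percent: 10},
	}
	return conf
}

func main() {
	_ = newSmallPayloadConfig()
}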

Are you using Shmipc ?

The purpose of this issue

We are always interested in finding out who is using Shmipc, what attracted you to it, how we can listen to your needs and, if you are interested, how we can help promote your organization.

  • People reach out to us asking who uses Shmipc in production.
  • We'd like to hear what you would like to see in Shmipc and learn about your scenarios.
  • We'd like to help promote your organization and work with you.

What we would like from you

Submit a comment in this issue with the following information:

  • Your organization or company
  • Link to your website
  • Your country
  • Your contact info: blog, email or Twitter (at least one).
  • What is your scenario for using Shmipc?
  • Are you running your application in testing or production?

Organization/Company: ByteDance
Website: https://bytedance.com
Country: China
Contact: [email protected]
Usage scenario: Using Shmipc to build large-scale Service Mesh applications
Status: Production

buffer slice

Is a lock used for the shmipc buffer slice allocation, or is it lock-free?

how to limit memory use of shmipc?

What is a good value for MaxStreamNum? The default is 4096. The same question applies to defaultQueueCap etc.

I have a "memory leak" (not sure if this is the right place to report it). What do you suggest for MaxStreamNum, and how do I calculate how much memory MaxStreamNum etc. will use?

How do I limit the memory use of shmipc? (A hedged sketch follows the SessionManagerConfig excerpt below.)

type SessionManagerConfig (https://pkg.go.dev/github.com/cloudwego/shmipc-go#SessionManagerConfig)

type SessionManagerConfig struct {
	*Config
	UnixPath string // Deprecated: please use Network and Address.
	Network  string // tcp or unix
	Address  string // tcp address or unix domain socket path

	// SessionNum is similar to concurrency.
	// Each session has its own IO queue and its own tcp/unix connection.
	// We recommend a value equal to the peer process' thread count, with every thread keeping a session.
	// If the peer process is written in Go, we recommend SessionNum = CPU cores / 4.
	SessionNum int
	// Max number of streams in each session's stream pool
	MaxStreamNum int
	// The idle time after which a stream is closed
	StreamMaxIdleTime time.Duration
}
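
For what it's worth, the fields above plus ShareMemoryBufferCap on the embedded Config are the ones I would expect to bound memory: roughly, total shared memory should scale with SessionNum times the per-session buffer capacity, plus the IO queues, though the exact accounting should be verified against your shmipc-go version. A hedged sketch:

package main

import (
	"time"

	"github.com/cloudwego/shmipc-go"
)

// newBoundedConfig sketches how to cap shmipc memory use; the comments on the
// memory accounting are assumptions, not confirmed behaviour.
func newBoundedConfig() *shmipc.SessionManagerConfig {
	conf := shmipc.DefaultSessionManagerConfig()
	conf.SessionNum = 2                       // fewer sessions -> fewer buffer pools and IO queues
	conf.MaxStreamNum = 512                   // cap the per-session stream pool (default is 4096)
	conf.StreamMaxIdleTime = 30 * time.Second // reclaim idle streams sooner
	conf.ShareMemoryBufferCap = 16 << 20      // shared-memory buffer capacity in bytes (embedded Config field)
	return conf
}

func main() {
	_ = newBoundedConfig()
}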

helloworld keeps doing os.Remove per connection?

Why does helloworld do os.Remove on every connection? Sorry, what I'm really asking is how to make helloworld a persistent server (the client can be persistent too).

Currently the server is persistent and the client is too; I'm just not sure how to make it work more like the net examples.

Please help turn helloworld into a persistent example that serializes and deserializes a [3]byte length header plus a []byte payload.

Thanks.

please provide an example that doesnt really use idl

Please provide an example that doesn't use an IDL, i.e. one that just passes a []byte (already-encoded data) and decodes the []byte on the other side.

In other words, the user implements their own encode and decode, and shmipc just provides the mechanism to transfer it over shared memory. Is this possible? (A rough sketch of the idea follows the log excerpt below.)

P.S.: I tried modifying the idl code to remove .Name, but it shows the errors below, so I just want something simple without Name or ID where possible, just passing a []byte value.

Warn 2023-04-12 23:35:26.65807 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:486 status:0
Warn 2023-04-12 23:35:26.658124 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:104 status:0
Warn 2023-04-12 23:35:26.658136 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:45 status:0
Warn 2023-04-12 23:35:26.658148 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:169 status:0
Warn 2023-04-12 23:35:26.658157 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:495 status:0
Warn 2023-04-12 23:35:26.65817 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:470 status:0
Warn 2023-04-12 23:35:26.658181 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:373 status:0
Warn 2023-04-12 23:35:26.65819 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:377 status:0
Warn 2023-04-12 23:35:26.658199 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:154 status:0
Warn 2023-04-12 23:35:26.658209 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:254 status:0
Warn 2023-04-12 23:35:26.658218 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:39 status:0
Warn 2023-04-12 23:35:26.658228 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:320 status:0
Warn 2023-04-12 23:35:26.658239 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:456 status:0
Warn 2023-04-12 23:35:26.658249 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:214 status:0
Warn 2023-04-12 23:35:26.658258 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:185 status:0
Warn 2023-04-12 23:35:26.658267 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:200 status:0
Warn 2023-04-12 23:35:26.658275 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:19 status:0
Warn 2023-04-12 23:35:26.658285 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:473 status:0
Warn 2023-04-12 23:35:26.658294 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:195 status:0
Warn 2023-04-12 23:35:26.658304 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:64 status:0
Warn 2023-04-12 23:35:26.658313 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:82 status:0
Warn 2023-04-12 23:35:26.658323 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:448 status:0
Warn 2023-04-12 23:35:26.658332 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:215 status:0
Warn 2023-04-12 23:35:26.658342 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:363 status:0
Warn 2023-04-12 23:35:26.658351 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:479 status:0
Warn 2023-04-12 23:35:26.65836 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:282 status:0
Warn 2023-04-12 23:35:26.658371 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:273 status:0
Warn 2023-04-12 23:35:26.658381 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:192 status:0
Warn 2023-04-12 23:35:26.65839 protocol_manager.go:166 server session receive fallback data , length:1036 seqID:318 status:0
Warn 2023-04-12 23:35:26.65841 stream.go:257 server session stream fallback seqID:318 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658449 stream.go:257 server session stream fallback seqID:486 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658468 stream.go:257 server session stream fallback seqID:104 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658488 stream.go:257 server session stream fallback seqID:45 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658506 stream.go:257 server session stream fallback seqID:169 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658526 stream.go:257 server session stream fallback seqID:495 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658543 stream.go:257 server session stream fallback seqID:470 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658561 stream.go:257 server session stream fallback seqID:373 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658579 stream.go:257 server session stream fallback seqID:377 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658596 stream.go:257 server session stream fallback seqID:154 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658614 stream.go:257 server session stream fallback seqID:254 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658645 stream.go:257 server session stream fallback seqID:39 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658667 stream.go:257 server session stream fallback seqID:320 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658726 stream.go:257 server session stream fallback seqID:200 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658739 stream.go:257 server session stream fallback seqID:456 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658746 stream.go:257 server session stream fallback seqID:214 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658753 stream.go:257 server session stream fallback seqID:185 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.65876 stream.go:257 server session stream fallback seqID:473 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658769 stream.go:257 server session stream fallback seqID:19 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.65878 stream.go:257 server session stream fallback seqID:363 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
Warn 2023-04-12 23:35:26.658787 stream.go:257 server session stream fallback seqID:82 len:1036 reason:share memory not more buffer, sendBuf.isFromShareMemory: true
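
Not an official example, but a minimal sketch of the no-IDL idea: frame each message yourself with a small length header and move only raw bytes over the stream. WriteBytes/ReadBytes on BufferWriter/BufferReader are my reading of the interfaces and may be named differently in your version, and writeFrame/readFrame are made-up helper names, so treat this as a sketch to check against the docs.

package main

import (
	"encoding/binary"

	"github.com/cloudwego/shmipc-go"
)

// writeFrame appends a 4-byte big-endian length header plus the payload to the
// stream's write buffer, then flushes it to the peer.
// Assumes BufferWriter has WriteBytes(data []byte) (int, error).
func writeFrame(stream *shmipc.Stream, payload []byte) error {
	var hdr [4]byte
	binary.BigEndian.PutUint32(hdr[:], uint32(len(payload)))
	if _, err := stream.BufferWriter().WriteBytes(hdr[:]); err != nil {
		return err
	}
	if _, err := stream.BufferWriter().WriteBytes(payload); err != nil {
		return err
	}
	return stream.Flush(false)
}

// readFrame reads the length header and then the payload from the stream.
// Assumes BufferReader has ReadBytes(size int) ([]byte, error).
func readFrame(stream *shmipc.Stream) ([]byte, error) {
	hdr, err := stream.BufferReader().ReadBytes(4)
	if err != nil {
		return nil, err
	}
	size := int(binary.BigEndian.Uint32(hdr))
	body, err := stream.BufferReader().ReadBytes(size)
	if err != nil {
		return nil, err
	}
	// copy out: the returned slice may reference shared memory that gets reused
	out := make([]byte, size)
	copy(out, body)
	return out, nil
}

func main() {
	// writeFrame/readFrame are meant to be called on an established *shmipc.Stream.
}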


unable to fully utilize multiple cores in async mode

Describe the bug

  1. In async mode (not technically a bug), I max out at around 1,500,000 req/s on my 12-core AMD laptop running Ubuntu 22.04. If I run 2 instances, throughput roughly halves to 750,000 req/s each, and with 3 instances it drops to about 500,000 req/s each. How do I push for maximum req/s using multiple instances?

I also get the error below after running multiple instances for a long time, while RAM shows 0.0% usage in top -c. Please suggest ways to work around it.

Warn 2024-04-04 20:02:46.910525 stream.go:257 server session stream fallback seqID:24 len:1043 reason:share memory not more buffer, sendBuf.isFromShareMemory: true

  2. Also, for sync mode, I'm trying to use multiple Go cores. How can I do this? I tried running multiple server instances inside one Go program, with one goroutine listening on one UDS socket per CPU core, but it doesn't seem to work. Could you show code for how to do so? (A rough sketch follows below, after the environment details.)

  • 1 goroutine per CPU core listening as a server
  • many clients, each sending to a different per-core UDS socket on the server

To Reproduce
Comment out runtime.GOMAXPROCS(1) in the shmipc_server example.

Expected behavior
Able to use multiple cores.

Environment:
Ubuntu 22.04, Go 1.22
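
A rough sketch of the layout described above, reusing the accept/serve pattern from the shmipc server example earlier in this thread: one UDS listener per core, with each accepted connection getting its own shmipc server session. I can't confirm this actually scales past the single-instance ceiling; the socket paths and core count are illustrative, and distinct shared-memory path prefixes per listener may be needed (assumption) if you see collisions.

package main

import (
	"fmt"
	"net"
	"runtime"
	"syscall"

	"github.com/cloudwego/shmipc-go"
)

// serve runs one UDS listener; each accepted connection gets its own shmipc
// server session, mirroring the server example above.
func serve(udsPath string) {
	_ = syscall.Unlink(udsPath)
	ln, err := net.ListenUnix("unix", &net.UnixAddr{Name: udsPath, Net: "unix"})
	if err != nil {
		panic(err)
	}
	defer ln.Close()
	for {
		conn, err := ln.Accept()
		if err != nil {
			fmt.Println("accept error:", err)
			return
		}
		go func() {
			defer conn.Close()
			server, err := shmipc.Server(conn, shmipc.DefaultConfig())
			if err != nil {
				fmt.Println("new ipc server failed:", err)
				return
			}
			defer server.Close()
			for {
				stream, err := server.AcceptStream()
				if err != nil {
					return
				}
				_ = stream // hand off to the same handleStream logic as above
			}
		}()
	}
}

func main() {
	// one listener goroutine per CPU core; clients pick a per-core socket
	for i := 0; i < runtime.NumCPU(); i++ {
		go serve(fmt.Sprintf("/tmp/ipc_core_%d.sock", i)) // illustrative paths
	}
	select {} // block forever
}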

i cant confirm this yet but running multiple different instances of shmipc listening to two different things on 1 process will OOM

[Screenshot attached: 2023-08-12 18-18-34]

Sorry, I can't confirm this yet, but can you give an idea of how to work around it? Is this a known issue?

I've run multiple shmipc-go instances in one process and got an OOM because of a "memory leak"; it seems to use a lot of memory without releasing it.

better documentation on usage etc.

How do I silence the internal logger and other printed output?

How do I configure it to be quiet for Warnf etc. in the examples? (A small hedged snippet follows.)
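
For what it's worth, the only knob I can see used elsewhere in this thread is shmipc.SetLogLevel (other snippets call shmipc.SetLogLevel(0)). I haven't verified the numeric level mapping, so the value below is an assumption; check the library's level constants before relying on it.

package main

import "github.com/cloudwego/shmipc-go"

func main() {
	// assumption: a higher level suppresses Warn/Info output; 0 appears to be
	// the most verbose setting in the other snippets in this thread
	shmipc.SetLogLevel(4)
}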

hello world example bug

Running it as the client shows the error below:

./main

client stream send request:client say hello world!!!
server receive request message:client say hello world!!!
server reply response: server hello world!!!
client stream receive response server hello world!!!
Error 2023-04-12 23:37:01.27175 session.go:369 server session /dev/shm/client.ipc.shm_2133964_queue_0 exitErr:connection was reset by peer. localAddress:/home/ubuntu/Documents/Programs/go/shmipc-go/example/helloworld/ipc_test.sock remoteAddress:@
Warn 2023-04-12 23:37:01.271935 buffer_manager.go:252  clean buffer manager:/dev/shm/client.ipc.shm_2133964_buffer

Support for sending metrics to Prometheus

Is your feature request related to a problem? Please describe.

The Monitor interface currently has no concrete implementation, so runtime monitoring metrics cannot be obtained.

Describe the solution you'd like

type Monitor interface {
	OnEmitSessionMetrics(PerformanceMetrics, StabilityMetrics, ShareMemoryMetrics, *Session)
	Flush() error
}

Implement the Monitor interface, where OnEmitSessionMetrics sends metrics to Prometheus and Flush() writes metrics to a local log file.

Describe alternatives you've considered

Not sure which implementation approach to take:

1. Implement the Monitor interface in application code and provide a usage example under the example directory.
2. Implement it directly inside the shmipc-go library, expose the implementation, and provide a usage example.

Additional context
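
A minimal sketch of option 1, assuming the Monitor interface exactly as quoted above and the standard prometheus/client_golang API. Only an emit counter is shown; mapping the fields of PerformanceMetrics / StabilityMetrics / ShareMemoryMetrics to gauges, and registering the monitor with a session (e.g. via a Config field), depend on the shmipc-go version and are left as comments.

package main

import (
	"github.com/cloudwego/shmipc-go"
	"github.com/prometheus/client_golang/prometheus"
)

// promMonitor forwards shmipc session metrics to Prometheus.
type promMonitor struct {
	emits prometheus.Counter
}

func newPromMonitor() *promMonitor {
	m := &promMonitor{
		emits: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "shmipc_session_metric_emits_total",
			Help: "Number of times shmipc emitted session metrics.",
		}),
	}
	prometheus.MustRegister(m.emits)
	return m
}

func (m *promMonitor) OnEmitSessionMetrics(p shmipc.PerformanceMetrics, s shmipc.StabilityMetrics, mem shmipc.ShareMemoryMetrics, sess *shmipc.Session) {
	m.emits.Inc()
	// TODO: export the fields of p, s and mem as prometheus gauges here;
	// the field names depend on the shmipc-go version.
}

func (m *promMonitor) Flush() error {
	// Optionally append a metrics snapshot to a local log file here, per the proposal.
	return nil
}

func main() {
	_ = newPromMonitor()
	// How the monitor is attached to a session (e.g. a Monitor field on the
	// session Config) is version-dependent and intentionally omitted here.
}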

latency seem so high

shm server

package server

import (
	"fmt"
	"net"
	"os"
	"path/filepath"
	"strconv"
	"sync"
	"syscall"
	"time"

	"github.com/cloudwego/shmipc-go"
	// note: log, idl and market_gateway referenced below are the poster's
	// internal packages and are not shown in this snippet
)

var (
	_ shmipc.StreamCallbacks = &streamCbImpl{}
	_ shmipc.ListenCallback  = &listenCbImpl{}
)

type Server struct {
	udsPath string

	netUnixListener *net.UnixListener
	conn            net.Conn
	shmipcServer    *shmipc.Session
	shmipcStream    *shmipc.Stream

	shutdownErrStr string
	mu             sync.Mutex
	callback       shmipc.ListenCallback
	unlinkOnClose  bool
	isClose        bool
}

func New() *Server {
	log.Init("server_shmipc.log", log.DebugLevel,
		log.SetStdout(true),
		log.SetCaller(true),
		log.SetMaxBackups(1),
	)
	return &Server{}
}

func (s *Server) Init() {
	dir, err := os.Getwd()
	if err != nil {
		log.Errorf("getwd err: %s", err.Error())
		return
	}

	s.udsPath = filepath.Join(dir, fmt.Sprintf("%s/%s", market_gateway.UnixDomainSocketFolder, market_gateway.UnixDomainSocketFile))

	_ = syscall.Unlink(s.udsPath)

	shmipc.SetLogLevel(0)

	config := shmipc.NewDefaultListenerConfig(s.udsPath, "unix")
	config.ShareMemoryBufferCap = 32 << 20
	config.ShareMemoryPathPrefix = fmt.Sprintf("%s/server.ipc.shm", market_gateway.IPCShmPathFolder)
	config.QueuePath = fmt.Sprintf("%s/server.ipc.shm_queue", market_gateway.IPCShmPathFolder)
	ln, err := shmipc.NewListener(&listenCbImpl{}, config)
	if err != nil {
		log.Error(err)
		return
	}

	log.Infof("unix listener:%s", s.udsPath)
	ln.Run()
}


type listenCbImpl struct{}

// called when a client opens a new stream
func (l *listenCbImpl) OnNewStream(s *shmipc.Stream) {
	s.SetCallbacks(&streamCbImpl{stream: s})
}

//
func (l *listenCbImpl) OnShutdown(reason string) {
	log.Error("OnShutdown reason:" + reason)
}

type streamCbImpl struct {
	req    idl.Request
	resp   idl.Response
	stream *shmipc.Stream
}

// called when data is received from the client
func (s *streamCbImpl) OnData(reader shmipc.BufferReader) {
	//1.deserialize Request
	if err := s.req.ReadFromShm(reader); err != nil {
		fmt.Println("stream read request, err=" + err.Error())
		return
	}

	// Parse what data the client is requesting (ID, Name, Key),
	// register it on the in-memory market-data channel,
	// and start a forwarding goroutine that reads from the channel.

	now := time.Now()
	//3.serialize Response
	s.resp.ID = uint64(now.UnixNano()) // s.req.ID
	s.resp.Name = s.req.Name
	s.resp.Image = []byte(strconv.Itoa(int(now.UnixNano()-int64(s.req.ID))))
	if err := s.resp.WriteToShm(s.stream.BufferWriter()); err != nil {
		fmt.Println("stream write response failed, err=" + err.Error())
		return
	}
	if err := s.stream.Flush(false); err != nil {
		fmt.Println("stream write response failed, err=" + err.Error())
		return
	}
	s.stream.ReleaseReadAndReuse()
	s.req.Reset()
	s.resp.Reset()
}

func (s *streamCbImpl) OnLocalClose() {
	log.Error("stream OnLocalClose")
}

func (s *streamCbImpl) OnRemoteClose() {
	log.Error("stream OnRemoteClose")
}


...
then
s := server.New()
s.Init()

shm client

package client

import (
	"fmt"
	"math/rand"
	"os"
	"path/filepath"
	"time"

	"github.com/cloudwego/shmipc-go"
	// note: log and idl referenced below are the poster's internal packages
)

var (
	_ shmipc.StreamCallbacks = &StreamHander{}
)

type Client struct {
	prefix  string
	udsPath string

	shmipcCfg    *shmipc.SessionManagerConfig
	shmipcMgr    *shmipc.SessionManager
	shmipcStream *shmipc.Stream

	hander *StreamHander
}

func New(prefix string) *Client {
	log.Init(fmt.Sprintf("client_%s_shmipc.log", prefix), log.DebugLevel,
		log.SetStdout(true),
		log.SetCaller(true),
		log.SetMaxBackups(1),
	)
	return &Client{
		prefix:  prefix,
		udsPath: "",
	}
}

func (c *Client) Init() {
	dir, err := os.Getwd()
	if err != nil {
		log.Errorf("getwd err: %s", err.Error())
		return
	}
	//c.udsPath = filepath.Join(dir, fmt.Sprintf("%s/ipc_%s.sock", market_gateway.UnixDomainSocketFolder,c.prefix))
	c.udsPath = filepath.Join(dir, fmt.Sprintf("%s/%s", market_gateway.UnixDomainSocketFolder, market_gateway.UnixDomainSocketFile))
	shmipc.SetLogLevel(0)

	// 1.create client session manager
	c.shmipcCfg = shmipc.DefaultSessionManagerConfig()
	c.shmipcCfg.ShareMemoryPathPrefix = fmt.Sprintf("%s/client_%s.ipc.shm", market_gateway.IPCShmPathFolder, c.prefix)
	c.shmipcCfg.QueuePath = fmt.Sprintf("%s/client_%s.ipc.shm_queue", market_gateway.IPCShmPathFolder, c.prefix)

	c.shmipcCfg.MemMapType = shmipc.MemMapTypeMemFd
	c.shmipcCfg.Network = "unix"
	c.shmipcCfg.Address = c.udsPath
	c.shmipcCfg.ShareMemoryBufferCap = 32 << 20

	//if runtime.GOOS == "darwin" {
	//	conf.ShareMemoryPathPrefix = "/tmp/client.ipc.shm"
	//	conf.QueuePath = "/tmp/client.ipc.shm_queue"
	//}

	c.shmipcMgr, err = shmipc.NewSessionManager(c.shmipcCfg)
	if err != nil {
		panic("create client session failed, " + err.Error())
	}
	//defer s.Close()

	// 2.create stream
	c.shmipcStream, err = c.shmipcMgr.GetStream()
	if err != nil {
		panic("client open stream failed, " + err.Error())
	}
	//defer s.PutBack(stream)

	key := make([]byte, 1024)
	rand.Read(key)

	c.hander = NewStreamHander(key, c.shmipcStream, c.shmipcMgr)
	c.shmipcStream.SetCallbacks(c.hander)
}

func (c *Client) Send() {
	c.hander.Send()
}

func (c *Client) End() {
	if c.shmipcStream != nil {
		if c.shmipcMgr != nil {
			c.shmipcMgr.PutBack(c.shmipcStream)
			c.shmipcMgr.Close()
		}
	}
}


type StreamHander struct {
	req    idl.Request
	resp   idl.Response
	stream *shmipc.Stream
	smgr   *shmipc.SessionManager
	key    []byte
}

func NewStreamHander(key []byte, stream *shmipc.Stream, smgr *shmipc.SessionManager) *StreamHander {
	return &StreamHander{key: key, stream: stream, smgr: smgr}
}

func (s *StreamHander) OnData(reader shmipc.BufferReader) {
	//wait and read response
	s.resp.Reset()
	if err := s.resp.ReadFromShm(reader); err != nil {
		log.Error("write request to share memory failed, err=" + err.Error())
		return
	}
	diff := time.Now().UnixNano() - int64(s.resp.ID)
	log.Infof("server=>client %d ns, client=>server %s ns", diff, string(s.resp.Image))

	s.stream.ReleaseReadAndReuse()

}

func (s *StreamHander) Send() {

	now := time.Now()
	//serialize request
	s.req.Reset()
	s.req.ID = uint64(now.UnixNano())
	s.req.Name = "xxx"
	s.req.Key = s.key
	if err := s.req.WriteToShm(s.stream.BufferWriter()); err != nil {
		log.Error("write request to share memory failed, err=" + err.Error())
		return
	}
	if err := s.stream.Flush(false); err != nil {
		log.Error(" flush request to peer failed, err=" + err.Error())
		return
	}
}

func (s *StreamHander) OnLocalClose() {
	log.Info("stream OnLocalClose")
}

func (s *StreamHander) OnRemoteClose() {
	log.Info("stream OnRemoteClose")
}

then

c := client.New("0001")
c.Init()
for i := 0; i < 100; i++ {
	c.Send()
	<-time.After(time.Second)
}
<-time.After(time.Second)

result


server=>client 302563 ns, client=>server 149902 ns
server=>client 78659 ns, client=>server 311423 ns
server=>client 99438 ns, client=>server 201126 ns
server=>client 101603 ns, client=>server 254039 ns
server=>client 101192 ns, client=>server 229704 ns
server=>client 110421 ns, client=>server 214390 ns
server=>client 77879 ns, client=>server 258622 ns
server=>client 89274 ns, client=>server 195844 ns
server=>client 113707 ns, client=>server 241404 ns
server=>client 128838 ns, client=>server 202124 ns
server=>client 181284 ns, client=>server 257551 ns
server=>client 68382 ns, client=>server 213854 ns
server=>client 162655 ns, client=>server 240469 ns
server=>client 89941 ns, client=>server 274032 ns
server=>client 79828 ns, client=>server 170969 ns
server=>client 107270 ns, client=>server 106275 ns
server=>client 110980 ns, client=>server 196599 ns
server=>client 100005 ns, client=>server 315514 ns
server=>client 184491 ns, client=>server 295354 ns
server=>client 131333 ns, client=>server 240325 ns
server=>client 118292 ns, client=>server 254118 ns
server=>client 114306 ns, client=>server 276610 ns
server=>client 87094 ns, client=>server 235931 ns
server=>client 95147 ns, client=>server 126418 ns
server=>client 83500 ns, client=>server 262050 ns
server=>client 155115 ns, client=>server 139429 ns
server=>client 79613 ns, client=>server 214479 ns
server=>client 84174 ns, client=>server 127120 ns
server=>client 128731 ns, client=>server 251253 ns
server=>client 90009 ns, client=>server 213201 ns
server=>client 64554 ns, client=>server 187908 ns
server=>client 113250 ns, client=>server 346987 ns
server=>client 154018 ns, client=>server 180147 ns
server=>client 109506 ns, client=>server 276111 ns
server=>client 115490 ns, client=>server 184541 ns
server=>client 79395 ns, client=>server 196219 ns
server=>client 100075 ns, client=>server 191885 ns
server=>client 176090 ns, client=>server 212988 ns
server=>client 113911 ns, client=>server 140827 ns
server=>client 56777 ns, client=>server 113488 ns
server=>client 109094 ns, client=>server 138789 ns
server=>client 94613 ns, client=>server 146264 ns
server=>client 79668 ns, client=>server 200440 ns
server=>client 82570 ns, client=>server 235427 ns
server=>client 113162 ns, client=>server 264805 ns
server=>client 81321 ns, client=>server 190720 ns
server=>client 114324 ns, client=>server 192740 ns
server=>client 117009 ns, client=>server 181410 ns
server=>client 150933 ns, client=>server 201962 ns
server=>client 95808 ns, client=>server 247240 ns
server=>client 81515 ns, client=>server 162001 ns
server=>client 100433 ns, client=>server 187268 ns
server=>client 94080 ns, client=>server 183065 ns
server=>client 68143 ns, client=>server 182330 ns
server=>client 96020 ns, client=>server 171040 ns
server=>client 89387 ns, client=>server 195151 ns
server=>client 192450 ns, client=>server 177611 ns
server=>client 113053 ns, client=>server 150515 ns
server=>client 104233 ns, client=>server 193330 ns
server=>client 69314 ns, client=>server 194273 ns
server=>client 183809 ns, client=>server 318489 ns
server=>client 133550 ns, client=>server 311941 ns
server=>client 72557 ns, client=>server 190717 ns
server=>client 83506 ns, client=>server 214645 ns
server=>client 261849 ns, client=>server 189151 ns
server=>client 101423 ns, client=>server 300826 ns
server=>client 127147 ns, client=>server 155703 ns
server=>client 82517 ns, client=>server 209542 ns
server=>client 100622 ns, client=>server 181754 ns
server=>client 144928 ns, client=>server 217414 ns
server=>client 78824 ns, client=>server 188329 ns
server=>client 212554 ns, client=>server 244976 ns
server=>client 98345 ns, client=>server 160499 ns
server=>client 92774 ns, client=>server 133956 ns
server=>client 97873 ns, client=>server 214409 ns
server=>client 90826 ns, client=>server 204376 ns
server=>client 100275 ns, client=>server 168810 ns
server=>client 88030 ns, client=>server 239757 ns
server=>client 71838 ns, client=>server 114075 ns
server=>client 83410 ns, client=>server 184958 ns
server=>client 140440 ns, client=>server 154009 ns
server=>client 142377 ns, client=>server 230575 ns
server=>client 91591 ns, client=>server 226697 ns
server=>client 77744 ns, client=>server 124703 ns
server=>client 145887 ns, client=>server 196064 ns
server=>client 86124 ns, client=>server 156900 ns
server=>client 76476 ns, client=>server 188798 ns
server=>client 127490 ns, client=>server 126566 ns
server=>client 107608 ns, client=>server 208988 ns
server=>client 92043 ns, client=>server 176982 ns
server=>client 147985 ns, client=>server 263178 ns
server=>client 123474 ns, client=>server 151468 ns
server=>client 91433 ns, client=>server 382726 ns
server=>client 155142 ns, client=>server 1198213 ns
server=>client 119104 ns, client=>server 348680 ns
server=>client 89418 ns, client=>server 225937 ns
server=>client 66334 ns, client=>server 142855 ns
server=>client 85201 ns, client=>server 118376 ns
server=>client 83037 ns, client=>server 212686 ns
server=>client 79710 ns, client=>server 137516 ns

