大道至简,简即是繁
- 个人网站: https://colobu.com
- 最好的Go微服务框架: https://rpcx.io
- rpcx官方博客: https://blog.rpcx.io
unbounded chan
License: MIT License
大道至简,简即是繁
ref: https://github.com/tikv/pd/actions/runs/3776529907/jobs/6419849603
2022-12-25T16:09:45.9555131Z Goroutine 179341 in state chan receive, with github.com/smallnest/chanx.process[...] on top of the stack:
2022-12-25T16:09:45.9555252Z goroutine 179341 [chan receive]:
2022-12-25T16:09:45.9555427Z github.com/smallnest/chanx.process[...](0xc0880dccc0, 0xc0880dcd80, 0xc08872fb40)
2022-12-25T16:09:45.9555654Z /home/runner/go/pkg/mod/github.com/smallnest/[email protected]/unbounded_chan.go:55 +0xd8
2022-12-25T16:09:45.9555842Z created by github.com/smallnest/chanx.NewUnboundedChanSize[...]
2022-12-25T16:09:45.9556065Z /home/runner/go/pkg/mod/github.com/smallnest/[email protected]/unbounded_chan.go:46 +0x41e
2022-12-25T16:09:45.9556071Z
2022-12-25T16:09:45.9556329Z Goroutine 179778 in state chan receive, with github.com/smallnest/chanx.process[...] on top of the stack:
2022-12-25T16:09:45.9556442Z goroutine 179778 [chan receive]:
2022-12-25T16:09:45.9556690Z github.com/smallnest/chanx.process[...](0xc08a5f0cc0, 0xc08a5f0d20, 0xc08c56a1e0)
2022-12-25T16:09:45.9556921Z /home/runner/go/pkg/mod/github.com/smallnest/[email protected]/unbounded_chan.go:55 +0xd8
2022-12-25T16:09:45.9557096Z created by github.com/smallnest/chanx.NewUnboundedChanSize[...]
2022-12-25T16:09:45.9557323Z /home/runner/go/pkg/mod/github.com/smallnest/[email protected]/unbounded_chan.go:46 +0x41e
2022-12-25T16:09:45.9557397Z ]
Consider the following test:
func TestDataRace(t *testing.T) {
ch := NewUnboundedChan(1)
stop := make(chan bool)
for i := 0; i < 100; i++ { // may tweak the number of iterations
go func() {
for {
select {
case <-stop:
return
default:
ch.In <- 42
<-ch.Out
}
}
}()
}
for i := 0; i < 10000; i++ { // may tweak the number of iterations
ch.Len()
}
close(stop)
}
The above test results in the following data race:
$ go test -run=DataRace -race
==================
WARNING: DATA RACE
Read at 0x00c0001309a8 by goroutine 7:
github.com/smallnest/chanx.(*RingBuffer).Len()
/Users/changkun/dev/changkun.de/chanx/ringbuffer.go:110 +0x118
github.com/smallnest/chanx.UnboundedChan.Len()
/Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:18 +0xf4
github.com/smallnest/chanx.TestChanDataRace()
/Users/changkun/dev/changkun.de/chanx/unbounded_chan_test.go:191 +0xf0
testing.tRunner()
/Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
Previous write at 0x00c0001309a8 by goroutine 8:
github.com/smallnest/chanx.(*RingBuffer).Read()
/Users/changkun/dev/changkun.de/chanx/ringbuffer.go:42 +0x12c
github.com/smallnest/chanx.(*RingBuffer).Pop()
/Users/changkun/dev/changkun.de/chanx/ringbuffer.go:51 +0x6c
github.com/smallnest/chanx.process()
/Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:72 +0x460
Goroutine 7 (running) created at:
testing.(*T).Run()
/Users/changkun/goes/go/src/testing/testing.go:1289 +0x5b8
testing.runTests.func1()
/Users/changkun/goes/go/src/testing/testing.go:1581 +0xac
testing.tRunner()
/Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
testing.runTests()
/Users/changkun/goes/go/src/testing/testing.go:1579 +0x780
testing.(*M).Run()
/Users/changkun/goes/go/src/testing/testing.go:1487 +0x928
main.main()
_testmain.go:55 +0x288
Goroutine 8 (running) created at:
github.com/smallnest/chanx.NewUnboundedChanSize()
/Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:40 +0x1b0
github.com/smallnest/chanx.NewUnboundedChan()
/Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:31 +0x3c
github.com/smallnest/chanx.TestChanDataRace()
/Users/changkun/dev/changkun.de/chanx/unbounded_chan_test.go:174 +0x28
testing.tRunner()
/Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
==================
==================
WARNING: DATA RACE
Read at 0x00c0001309b0 by goroutine 7:
github.com/smallnest/chanx.(*RingBuffer).Len()
/Users/changkun/dev/changkun.de/chanx/ringbuffer.go:110 +0x134
github.com/smallnest/chanx.UnboundedChan.Len()
/Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:18 +0xf4
github.com/smallnest/chanx.TestChanDataRace()
/Users/changkun/dev/changkun.de/chanx/unbounded_chan_test.go:191 +0xf0
testing.tRunner()
/Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
Previous write at 0x00c0001309b0 by goroutine 8:
github.com/smallnest/chanx.(*RingBuffer).Write()
/Users/changkun/dev/changkun.de/chanx/ringbuffer.go:70 +0xec
github.com/smallnest/chanx.process()
/Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:69 +0x44c
Goroutine 7 (running) created at:
testing.(*T).Run()
/Users/changkun/goes/go/src/testing/testing.go:1289 +0x5b8
testing.runTests.func1()
/Users/changkun/goes/go/src/testing/testing.go:1581 +0xac
testing.tRunner()
/Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
testing.runTests()
/Users/changkun/goes/go/src/testing/testing.go:1579 +0x780
testing.(*M).Run()
/Users/changkun/goes/go/src/testing/testing.go:1487 +0x928
main.main()
_testmain.go:55 +0x288
Goroutine 8 (running) created at:
github.com/smallnest/chanx.NewUnboundedChanSize()
/Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:40 +0x1b0
github.com/smallnest/chanx.NewUnboundedChan()
/Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:31 +0x3c
github.com/smallnest/chanx.TestChanDataRace()
/Users/changkun/dev/changkun.de/chanx/unbounded_chan_test.go:174 +0x28
testing.tRunner()
/Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
==================
==================
WARNING: DATA RACE
Read at 0x00c0001309a0 by goroutine 7:
github.com/smallnest/chanx.(*RingBuffer).Len()
/Users/changkun/dev/changkun.de/chanx/ringbuffer.go:118 +0x1ac
github.com/smallnest/chanx.UnboundedChan.Len()
/Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:18 +0xf4
github.com/smallnest/chanx.TestChanDataRace()
/Users/changkun/dev/changkun.de/chanx/unbounded_chan_test.go:191 +0xf0
testing.tRunner()
/Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
Previous write at 0x00c0001309a0 by goroutine 8:
github.com/smallnest/chanx.(*RingBuffer).grow()
/Users/changkun/dev/changkun.de/chanx/ringbuffer.go:96 +0x3bc
github.com/smallnest/chanx.(*RingBuffer).Write()
/Users/changkun/dev/changkun.de/chanx/ringbuffer.go:77 +0x18c
github.com/smallnest/chanx.process()
/Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:69 +0x44c
Goroutine 7 (running) created at:
testing.(*T).Run()
/Users/changkun/goes/go/src/testing/testing.go:1289 +0x5b8
testing.runTests.func1()
/Users/changkun/goes/go/src/testing/testing.go:1581 +0xac
testing.tRunner()
/Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
testing.runTests()
/Users/changkun/goes/go/src/testing/testing.go:1579 +0x780
testing.(*M).Run()
/Users/changkun/goes/go/src/testing/testing.go:1487 +0x928
main.main()
_testmain.go:55 +0x288
Goroutine 8 (running) created at:
github.com/smallnest/chanx.NewUnboundedChanSize()
/Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:40 +0x1b0
github.com/smallnest/chanx.NewUnboundedChan()
/Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:31 +0x3c
github.com/smallnest/chanx.TestChanDataRace()
/Users/changkun/dev/changkun.de/chanx/unbounded_chan_test.go:174 +0x28
testing.tRunner()
/Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
==================
--- FAIL: TestChanDataRace (0.01s)
testing.go:1135: race detected during execution of test
FAIL
exit status 1
FAIL github.com/smallnest/chanx 0.194s
I think providing a Len()
method in abstracting an unbounded channel needs more careful handling (Mutex or lock-free pattern via an external atomic counter similar to the runtime channel implementation).
The self-created internal buffer for an unbounded channel is maintained in a separate goroutine, a read via len()
creates a race condition with any writes with respect to that buffer.
Lines 54 to 66 in 0aaad01
Why not Peek
first and then Pop
?
There is a possibility that out
did not receive this Pop data
because ctx.Done
was triggered first after Pop from buf
.
package main
import (
"fmt"
"github.com/smallnest/chanx"
"time"
)
func main() {
ch := chanx.NewUnboundedChan(1)
go func() {
for i := 0; i < 100; i++ {
ch.In <- i
}
fmt.Println("push ok")
close(ch.In) // close In channel
}()
time.Sleep(1 * time.Second)
for v := range ch.Out { // read values
fmt.Println(v)
}
}
报错
panic: runtime error: index out of range [1] with length 1
goroutine 6 [running]:
github.com/smallnest/chanx.(*RingBuffer).Write(0xc000072040, 0x10b04e0, 0x1158910)
/Users/vearne/Documents/gopath/pkg/mod/github.com/smallnest/[email protected]/ringbuffer.go:61 +0x235
github.com/smallnest/chanx.NewUnboundedChan.func1(0xc00004c1e0, 0xc00004c180, 0xc00004c180, 0xc00004c1e0, 0xc000072040)
/Users/vearne/Documents/gopath/pkg/mod/github.com/smallnest/[email protected]/unbounded_chan.go:59 +0x233
created by github.com/smallnest/chanx.NewUnboundedChan
/Users/vearne/Documents/gopath/pkg/mod/github.com/smallnest/[email protected]/unbounded_chan.go:35 +0x112
exit status 2
// from
func (r *RingBuffer) Read() (T, error) {
if r.r == r.w {
return nil, ErrIsEmpty
}
v := r.buf[r.r]
r.r++
if r.r == r.size {
r.r = 0
}
return v, nil
}
func (r *RingBuffer) Pop() T {
if r.r == r.w { // Empty
panic(ErrIsEmpty.Error())
}
v := r.buf[r.r]
r.r++
if r.r == r.size {
r.r = 0
}
return v
}
// to
func (r *RingBuffer) Pop() T {
v, err := r.Read()
if err == ErrIsEmpty { // Empty
panic(ErrIsEmpty.Error())
}
return v
}
Hello @smallnest! Can you please create a tag with all latest changes. Current latest tag is too old. Many thanks!
Line 65 in 14a2dcc
Line 54 in 2c31397
执行cancel函数之后,没有执行<-cctx.Done()的可能。
一个场景,在业务高峰期造成ringbuffer grow太大,是否可以周期性缩容? 当没有数据时,索性直接实例化一个新的ringbuffer。
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.