
chanx's Issues

Why not Peek first and then Pop in drain?

chanx/unbounded_chan.go

Lines 54 to 66 in 0aaad01

drain := func() {
	for !ch.buffer.IsEmpty() {
		select {
		case out <- ch.buffer.Pop():
			atomic.AddInt64(&ch.bufCount, -1)
		case <-ctx.Done():
			return
		}
	}
	ch.buffer.Reset()
	atomic.StoreInt64(&ch.bufCount, 0)
}

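Why not Peek first and Pop only after the send has succeeded?
As written, a value can be lost: the send expression ch.buffer.Pop() is evaluated when the select is entered, so the value has already left the buffer, and if the ctx.Done() case is chosen instead, out never receives it.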

Panic when the initial size is 1

package main

import (
	"fmt"
	"github.com/smallnest/chanx"
	"time"
)

func main() {
	ch := chanx.NewUnboundedChan(1)

	go func() {
		for i := 0; i < 100; i++ {
			ch.In <- i
		}
		fmt.Println("push ok")
		close(ch.In) // close In channel
	}()

	time.Sleep(1 * time.Second)
	for v := range ch.Out { // read values
		fmt.Println(v)
	}
}

Error output:

panic: runtime error: index out of range [1] with length 1

goroutine 6 [running]:
github.com/smallnest/chanx.(*RingBuffer).Write(0xc000072040, 0x10b04e0, 0x1158910)
	/Users/vearne/Documents/gopath/pkg/mod/github.com/smallnest/[email protected]/ringbuffer.go:61 +0x235
github.com/smallnest/chanx.NewUnboundedChan.func1(0xc00004c1e0, 0xc00004c180, 0xc00004c180, 0xc00004c1e0, 0xc000072040)
	/Users/vearne/Documents/gopath/pkg/mod/github.com/smallnest/[email protected]/unbounded_chan.go:59 +0x233
created by github.com/smallnest/chanx.NewUnboundedChan
	/Users/vearne/Documents/gopath/pkg/mod/github.com/smallnest/[email protected]/unbounded_chan.go:35 +0x112
exit status 2

data race in Len() and BufLen()

Consider the following test:

func TestDataRace(t *testing.T) {
	ch := NewUnboundedChan(1)
	stop := make(chan bool)
	for i := 0; i < 100; i++ { // may tweak the number of iterations
		go func() {
			for {
				select {
				case <-stop:
					return
				default:
					ch.In <- 42
					<-ch.Out
				}
			}
		}()
	}

	for i := 0; i < 10000; i++ { // may tweak the number of iterations
		ch.Len()
	}
	close(stop)
}

The above test results in the following data race:

$ go test -run=DataRace -race
==================
WARNING: DATA RACE
Read at 0x00c0001309a8 by goroutine 7:
  github.com/smallnest/chanx.(*RingBuffer).Len()
      /Users/changkun/dev/changkun.de/chanx/ringbuffer.go:110 +0x118
  github.com/smallnest/chanx.UnboundedChan.Len()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:18 +0xf4
  github.com/smallnest/chanx.TestChanDataRace()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan_test.go:191 +0xf0
  testing.tRunner()
      /Users/changkun/goes/go/src/testing/testing.go:1242 +0x198

Previous write at 0x00c0001309a8 by goroutine 8:
  github.com/smallnest/chanx.(*RingBuffer).Read()
      /Users/changkun/dev/changkun.de/chanx/ringbuffer.go:42 +0x12c
  github.com/smallnest/chanx.(*RingBuffer).Pop()
      /Users/changkun/dev/changkun.de/chanx/ringbuffer.go:51 +0x6c
  github.com/smallnest/chanx.process()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:72 +0x460

Goroutine 7 (running) created at:
  testing.(*T).Run()
      /Users/changkun/goes/go/src/testing/testing.go:1289 +0x5b8
  testing.runTests.func1()
      /Users/changkun/goes/go/src/testing/testing.go:1581 +0xac
  testing.tRunner()
      /Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
  testing.runTests()
      /Users/changkun/goes/go/src/testing/testing.go:1579 +0x780
  testing.(*M).Run()
      /Users/changkun/goes/go/src/testing/testing.go:1487 +0x928
  main.main()
      _testmain.go:55 +0x288

Goroutine 8 (running) created at:
  github.com/smallnest/chanx.NewUnboundedChanSize()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:40 +0x1b0
  github.com/smallnest/chanx.NewUnboundedChan()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:31 +0x3c
  github.com/smallnest/chanx.TestChanDataRace()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan_test.go:174 +0x28
  testing.tRunner()
      /Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
==================
==================
WARNING: DATA RACE
Read at 0x00c0001309b0 by goroutine 7:
  github.com/smallnest/chanx.(*RingBuffer).Len()
      /Users/changkun/dev/changkun.de/chanx/ringbuffer.go:110 +0x134
  github.com/smallnest/chanx.UnboundedChan.Len()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:18 +0xf4
  github.com/smallnest/chanx.TestChanDataRace()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan_test.go:191 +0xf0
  testing.tRunner()
      /Users/changkun/goes/go/src/testing/testing.go:1242 +0x198

Previous write at 0x00c0001309b0 by goroutine 8:
  github.com/smallnest/chanx.(*RingBuffer).Write()
      /Users/changkun/dev/changkun.de/chanx/ringbuffer.go:70 +0xec
  github.com/smallnest/chanx.process()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:69 +0x44c

Goroutine 7 (running) created at:
  testing.(*T).Run()
      /Users/changkun/goes/go/src/testing/testing.go:1289 +0x5b8
  testing.runTests.func1()
      /Users/changkun/goes/go/src/testing/testing.go:1581 +0xac
  testing.tRunner()
      /Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
  testing.runTests()
      /Users/changkun/goes/go/src/testing/testing.go:1579 +0x780
  testing.(*M).Run()
      /Users/changkun/goes/go/src/testing/testing.go:1487 +0x928
  main.main()
      _testmain.go:55 +0x288

Goroutine 8 (running) created at:
  github.com/smallnest/chanx.NewUnboundedChanSize()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:40 +0x1b0
  github.com/smallnest/chanx.NewUnboundedChan()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:31 +0x3c
  github.com/smallnest/chanx.TestChanDataRace()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan_test.go:174 +0x28
  testing.tRunner()
      /Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
==================
==================
WARNING: DATA RACE
Read at 0x00c0001309a0 by goroutine 7:
  github.com/smallnest/chanx.(*RingBuffer).Len()
      /Users/changkun/dev/changkun.de/chanx/ringbuffer.go:118 +0x1ac
  github.com/smallnest/chanx.UnboundedChan.Len()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:18 +0xf4
  github.com/smallnest/chanx.TestChanDataRace()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan_test.go:191 +0xf0
  testing.tRunner()
      /Users/changkun/goes/go/src/testing/testing.go:1242 +0x198

Previous write at 0x00c0001309a0 by goroutine 8:
  github.com/smallnest/chanx.(*RingBuffer).grow()
      /Users/changkun/dev/changkun.de/chanx/ringbuffer.go:96 +0x3bc
  github.com/smallnest/chanx.(*RingBuffer).Write()
      /Users/changkun/dev/changkun.de/chanx/ringbuffer.go:77 +0x18c
  github.com/smallnest/chanx.process()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:69 +0x44c

Goroutine 7 (running) created at:
  testing.(*T).Run()
      /Users/changkun/goes/go/src/testing/testing.go:1289 +0x5b8
  testing.runTests.func1()
      /Users/changkun/goes/go/src/testing/testing.go:1581 +0xac
  testing.tRunner()
      /Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
  testing.runTests()
      /Users/changkun/goes/go/src/testing/testing.go:1579 +0x780
  testing.(*M).Run()
      /Users/changkun/goes/go/src/testing/testing.go:1487 +0x928
  main.main()
      _testmain.go:55 +0x288

Goroutine 8 (running) created at:
  github.com/smallnest/chanx.NewUnboundedChanSize()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:40 +0x1b0
  github.com/smallnest/chanx.NewUnboundedChan()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:31 +0x3c
  github.com/smallnest/chanx.TestChanDataRace()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan_test.go:174 +0x28
  testing.tRunner()
      /Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
==================
--- FAIL: TestChanDataRace (0.01s)
    testing.go:1135: race detected during execution of test
FAIL
exit status 1
FAIL    github.com/smallnest/chanx      0.194s

I think exposing a Len() method on an unbounded-channel abstraction needs more careful handling: either a mutex, or a lock-free pattern built on an external atomic counter, similar to the runtime channel implementation.
The internal buffer of the unbounded channel is maintained by a separate goroutine, so reading it through Len() races with every write that goroutine makes to the buffer.
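
The atomic-counter option could look roughly like the sketch below. All names here (countedBuffer, push, run) are hypothetical and not part of chanx; the sketch only assumes that a single owner goroutine touches the buffer, while every other goroutine reads the length through the counter and never through the buffer itself.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countedBuffer is a hypothetical example type, not the chanx implementation.
type countedBuffer struct {
	n   int64 // accessed only via sync/atomic; kept first for 64-bit alignment
	buf []int // touched only by the owner goroutine
}

// push must be called from the owner goroutine only.
func (c *countedBuffer) push(v int) {
	c.buf = append(c.buf, v)
	atomic.AddInt64(&c.n, 1)
}

// Len is safe to call from any goroutine: it loads the counter and never
// looks at buf.
func (c *countedBuffer) Len() int {
	return int(atomic.LoadInt64(&c.n))
}

// run feeds the given number of values to an owner goroutine while calling
// Len concurrently, and returns the final length.
func run(values int) int {
	c := &countedBuffer{}
	in := make(chan int)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // owner goroutine: the only one that touches c.buf
		defer wg.Done()
		for v := range in {
			c.push(v)
		}
	}()

	for i := 0; i < values; i++ {
		in <- i
		_ = c.Len() // concurrent read that only loads the counter
	}
	close(in)
	wg.Wait()
	return c.Len()
}

func main() {
	fmt.Println(run(1000))
}
```

The value returned by Len is only a snapshot and may be stale by the time the caller uses it; the counter removes the data race, not that inherent imprecision.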

feature: add a shrink mechanism to the ringbuffer

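One scenario: during a traffic peak the ringbuffer grows very large. Could it be shrunk periodically? When it holds no data, it could simply be replaced with a freshly instantiated ringbuffer.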
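
A rough sketch of the "replace it when empty" idea. The growBuf type and its shrinkIfIdle method are hypothetical and deliberately simplified (a slice with a head index, not a real ring); the sketch only assumes that the owner of the buffer calls shrinkIfIdle at a convenient moment, for example from a periodic tick.

```go
package main

import "fmt"

// growBuf is a hypothetical, simplified growable FIFO buffer.
type growBuf struct {
	buf         []int
	head        int // index of the oldest element still in buf
	initialSize int
}

func newGrowBuf(initialSize int) *growBuf {
	return &growBuf{buf: make([]int, 0, initialSize), initialSize: initialSize}
}

func (b *growBuf) Len() int { return len(b.buf) - b.head }

func (b *growBuf) Push(v int) { b.buf = append(b.buf, v) }

// Pop removes the oldest element; the caller must check Len first.
func (b *growBuf) Pop() int {
	v := b.buf[b.head]
	b.head++
	return v
}

// shrinkIfIdle swaps in a fresh backing array of the initial size when the
// buffer is empty but has grown beyond that size. It reports whether a
// shrink happened.
func (b *growBuf) shrinkIfIdle() bool {
	if b.Len() != 0 || cap(b.buf) <= b.initialSize {
		return false
	}
	b.buf = make([]int, 0, b.initialSize)
	b.head = 0
	return true
}

func main() {
	b := newGrowBuf(4)
	for i := 0; i < 1000; i++ { // simulated traffic peak
		b.Push(i)
	}
	for b.Len() > 0 {
		b.Pop()
	}
	grown := cap(b.buf)
	shrunk := b.shrinkIfIdle()
	fmt.Println("capacity before:", grown, "shrunk:", shrunk, "capacity after:", cap(b.buf))
}
```

Shrinking only when the buffer is empty avoids copying live elements; the old backing array is left for the garbage collector.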

Could RingBuffer's Pop method be implemented by calling Read?

// from
func (r *RingBuffer) Read() (T, error) {
	if r.r == r.w {
		return nil, ErrIsEmpty
	}

	v := r.buf[r.r]
	r.r++
	if r.r == r.size {
		r.r = 0
	}

	return v, nil
}

func (r *RingBuffer) Pop() T {
	if r.r == r.w { // Empty
		panic(ErrIsEmpty.Error())
	}

	v := r.buf[r.r]
	r.r++
	if r.r == r.size {
		r.r = 0
	}

	return v
}

// to
func (r *RingBuffer) Pop() T {
	v, err := r.Read()
	if err == ErrIsEmpty { // Empty
		panic(ErrIsEmpty.Error())
	}

	return v
}

potential goroutine leaks

ref: https://github.com/tikv/pd/actions/runs/3776529907/jobs/6419849603

2022-12-25T16:09:45.9555131Z  Goroutine 179341 in state chan receive, with github.com/smallnest/chanx.process[...] on top of the stack:
2022-12-25T16:09:45.9555252Z goroutine 179341 [chan receive]:
2022-12-25T16:09:45.9555427Z github.com/smallnest/chanx.process[...](0xc0880dccc0, 0xc0880dcd80, 0xc08872fb40)
2022-12-25T16:09:45.9555654Z 	/home/runner/go/pkg/mod/github.com/smallnest/[email protected]/unbounded_chan.go:55 +0xd8
2022-12-25T16:09:45.9555842Z created by github.com/smallnest/chanx.NewUnboundedChanSize[...]
2022-12-25T16:09:45.9556065Z 	/home/runner/go/pkg/mod/github.com/smallnest/[email protected]/unbounded_chan.go:46 +0x41e
2022-12-25T16:09:45.9556071Z 
2022-12-25T16:09:45.9556329Z  Goroutine 179778 in state chan receive, with github.com/smallnest/chanx.process[...] on top of the stack:
2022-12-25T16:09:45.9556442Z goroutine 179778 [chan receive]:
2022-12-25T16:09:45.9556690Z github.com/smallnest/chanx.process[...](0xc08a5f0cc0, 0xc08a5f0d20, 0xc08c56a1e0)
2022-12-25T16:09:45.9556921Z 	/home/runner/go/pkg/mod/github.com/smallnest/[email protected]/unbounded_chan.go:55 +0xd8
2022-12-25T16:09:45.9557096Z created by github.com/smallnest/chanx.NewUnboundedChanSize[...]
2022-12-25T16:09:45.9557323Z 	/home/runner/go/pkg/mod/github.com/smallnest/[email protected]/unbounded_chan.go:46 +0x41e
2022-12-25T16:09:45.9557397Z ]
