Giter Site home page Giter Site logo

ringbuffer's Introduction

ringbuffer

License GoDoc Go Report Card coveralls

A circular buffer (ring buffer) in Go, implemented io.ReaderWriter interface

wikipedia

Usage

package main

import (
	"fmt"

	"github.com/smallnest/ringbuffer"
)

func main() {
	rb := ringbuffer.New(1024)

	// write
	rb.Write([]byte("abcd"))
	fmt.Println(rb.Length())
	fmt.Println(rb.Free())

	// read
	buf := make([]byte, 4)
	rb.Read(buf)
	fmt.Println(string(buf))
}

It is possible to use an existing buffer with by replacing New with NewBuffer.

Blocking vs Non-blocking

The default behavior of the ring buffer is non-blocking, meaning that reads and writes will return immediately with an error if the operation cannot be completed. If you want to block when reading or writing, you must enable it:

	rb := ringbuffer.New(1024).SetBlocking(true)

Enabling blocking will cause the ring buffer to behave like a buffered io.Pipe.

Regular Reads will block until data is available, but not wait for a full buffer. Writes will block until there is space available and writes bigger than the buffer will wait for reads to make space.

TryRead and TryWrite are still available for non-blocking reads and writes.

To signify the end of the stream, close the ring buffer from the writer side with rb.CloseWriter()

Either side can use rb.CloseWithError(err error) to signal an error and close the ring buffer. Any reads or writes will return the error on next call.

In blocking mode errors are stateful and the same error will be returned until rb.Reset() is called.

io.Copy replacement

The ring buffer can replace io.Copy and io.CopyBuffer to do async copying through the ring buffer.

The copy operation will happen directly on the buffer, so between reads and writes there is no memory copy.

Here is a simple example where the copy operation is replaced by a ring buffer:

func saveWebsite(url, file string) {
    in, _ := http.Get(url)
    out, _ := os.Create(file)

    // Copy with regular buffered copy
    // n, err := io.Copy(out, in.Body)

    // Copy with ring buffer
    n, err := ringbuffer.New(1024).Copy(out, in.Body)
    fmt.Println(n, err)
}

The ring buffer implements io.ReaderFrom and io.WriterTo interfaces, which allows to fill either or both the write and read side respectively.

This will provide an async method for writing or reading directly into the ring buffer. These functions require that "blocking" is set on the pipe.

Example:

func readWebsite(url string) io.ReadCloser {
	in, _ := http.Get(url)

	// Create blocking ring buffer
	ring := ringbuffer.New(1024).SetBlocking(true)

	// Read from the input in a goroutine into the ring buffer
	go func() {
		ring.ReadFrom(in.Body)
		ring.CloseWriter()
	}()
	return ring.ReadCloser()
}

io.Pipe replacement

The ring buffer can be used as a compatible, but asynchronous replacement of io.Pipe.

That means that Reads and Writes will go to the ring buffer. Writes will complete as long as the data fits within the ring buffer.

Reads will attempt to satisfy reads with data from the ring buffer. The read will only block if the ring buffer is empty.

In the common case, where the Read and Write side can run concurrently, it is safe to replace io.Pipe() with (*Ringbuffer).Pipe().

Compare the following to the io.Pipe example:

func main() {
	// Create pipe from a 4KB ring buffer.
	r, w := ringbuffer.New(4 << 10).Pipe()

	go func() {
		fmt.Fprint(w, "some io.Reader stream to be read\n")
		w.Close()
	}()

	if _, err := io.Copy(os.Stdout, r); err != nil {
		log.Fatal(err)
	}
}

When creating the pipe, the ring buffer is internally switched to blocking mode.

Error reporting on Close and CloseWithError functions is similar to io.Pipe.

It is possible to use the original ring buffer alongside the pipe functions. So for example it is possible to "seed" the ring buffer with data, so reads can complete at once.

ringbuffer's People

Contributors

alessiodallapiazza avatar klauspost avatar lnnujxxy avatar panjf2000 avatar simonwu-os avatar smallnest avatar testwill avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

ringbuffer's Issues

Add Peek Method for Non-Destructive Buffer Reads

I would like to propose the addition of a Peek method to the RingBuffer implementation. This method allows reading from the buffer without consuming the data, which is useful for scenarios where the buffer needs to be read multiple times without data loss.

Here is the proposed code for the Peek method:

// Peek reads up to len(p) bytes into p without moving the read pointer.
func (r *RingBuffer) Peek(p []byte) (n int, err error) {
    if len(p) == 0 {
        return 0, r.readErr(false)
    }

    r.mu.Lock()
    defer r.mu.Unlock()
    if err := r.readErr(true); err != nil {
        return 0, err
    }

    return r.peek(p)
}

func (r *RingBuffer) peek(p []byte) (n int, err error) {
    if r.w == r.r && !r.isFull {
        return 0, ErrIsEmpty
    }

    if r.w > r.r {
        n = r.w - r.r
        if n > len(p) {
            n = len(p)
        }
        copy(p, r.buf[r.r:r.r+n])
        return
    }

    n = r.size - r.r + r.w
    if n > len(p) {
        n = len(p)
    }

    if r.r+n <= r.size {
        copy(p, r.buf[r.r:r.r+n])
    } else {
        c1 := r.size - r.r
        copy(p, r.buf[r.r:r.size])
        c2 := n - c1
        copy(p[c1:], r.buf[0:c2])
    }

    return n, r.readErr(true)
}

Test Case:
Here is a test case for the new Peek method:

func TestRingBuffer_Peek(t *testing.T) {
    rb := New(10)
    data := []byte("hello")
    rb.Write(data)

    buf := make([]byte, len(data))
    n, err := rb.Peek(buf)
    if err != nil {
        t.Fatalf("unexpected error: %v", err)
    }
    if n != len(data) {
        t.Fatalf("expected %d bytes, got %d", len(data), n)
    }
    if string(buf) != string(data) {
        t.Fatalf("expected %s, got %s", string(data), string(buf))
    }
}

If you agree I can open a PR.

异步读写Benchmark例子不合理

例如 BenchmarkRingBuffer_AsyncWrite 这个方法

...
for i := 0; i < b.N; i++ {
    rb.Read(buf)
}
...

大量的Read返回的是 0, ErrIsEmpty,因此不能很好反应该Buffer的实际性能。

rewrite it by go1.18 generic

泛型替换掉[]byte。而且新版本支持TryLock了,代码可以大幅度简化。
mu可以替换成读写锁,性能应该可以优化一些。

Write 方法存在bug

Write 方法在 ringbuffer.buf 尾部空间不足需要从头继续写的时候存在bug

在单元测试 TestRingBuffer_Write 函数中增加如下代码可以测出

func TestRingBuffer_Write(t *testing.T) {
       ......

	rb.Reset()
	// write 4 * 2 = 8 bytes
	n, err = rb.Write([]byte(strings.Repeat("abcd", 2)))
	if err != nil {
		t.Fatalf("write failed: %v", err)
	}
	if n != 8 {
		t.Fatalf("expect write 16 bytes but got %d", n)
	}
	if rb.Length() != 8 {
		t.Fatalf("expect len 16 bytes but got %d. r.w=%d, r.r=%d", rb.Length(), rb.w, rb.r)
	}
	if rb.Free() != 56 {
		t.Fatalf("expect free 48 bytes but got %d. r.w=%d, r.r=%d", rb.Free(), rb.w, rb.r)
	}
	buf := make([]byte, 5)
	rb.Read(buf)
	if rb.Length() != 3 {
		t.Fatalf("expect len 3 bytes but got %d. r.w=%d, r.r=%d", rb.Length(), rb.w, rb.r)
	}
	_, err = rb.Write([]byte(strings.Repeat("abcd", 15)))

	if bytes.Compare(rb.Bytes(), []byte("bcd" + strings.Repeat("abcd", 15))) != 0 {
		t.Fatalf("expect 63 ... but got %s. r.w=%d, r.r=%d", rb.Bytes(), rb.w, rb.r)
	}
}

在 ring_buffer.go 的 135 行, 如下修改可以修复

- copy(r.buf[0:], p[c1+1:])
+ copy(r.buf[0:], p[c1:])

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.