
swap-buffer-queue


A buffering MPSC queue.

This library is intended to be a (better, I hope) alternative to traditional MPSC queues in the context of a buffering consumer, by moving the buffering part directly into the queue.

It is especially well suited to I/O writing workflows; see the buffer implementations below.

The crate is no_std; some buffer implementations may require the alloc crate.

In addition to the low-level Queue implementation, a higher-level SynchronizedQueue is provided, with both blocking and asynchronous methods. The synchronization feature requires std.

Example

use std::ops::Deref;
use swap_buffer_queue::{buffer::{IntoValueIter, VecBuffer}, Queue};

// Initialize the queue with a capacity
let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
// Enqueue some value
queue.try_enqueue([0]).unwrap();
// Multiple values can be enqueued at once
// (optimized compared to enqueuing them one by one)
queue.try_enqueue([1, 2]).unwrap();
let mut values = vec![3, 4];
queue
    .try_enqueue(values.drain(..).into_value_iter())
    .unwrap();
// Dequeue a slice of the enqueued values
let slice = queue.try_dequeue().unwrap();
assert_eq!(slice.deref(), &[0, 1, 2, 3, 4]);
// The dequeued values can also be retrieved by value
assert_eq!(slice.into_iter().collect::<Vec<_>>(), vec![0, 1, 2, 3, 4]);

Buffer implementations

In addition to simple ArrayBuffer and VecBuffer, this crate provides useful write-oriented implementations.

WriteArrayBuffer and WriteVecBuffer are well suited when the objects to be serialized have a known serialization size: they can then be serialized directly into the queue's buffer, avoiding an intermediate allocation.

use std::io::Write;
use swap_buffer_queue::{Queue, write::{WriteBytesSlice, WriteVecBuffer}};

// Creates a WriteVecBuffer queue with a 2-byte header
let queue: Queue<WriteVecBuffer<2>> = Queue::with_capacity((1 << 16) - 1);
queue
    .try_enqueue((256, |slice: &mut [u8]| { /* write the slice */ }))
    .unwrap();
queue
    .try_enqueue((42, |slice: &mut [u8]| { /* write the slice */ }))
    .unwrap();
let mut slice = queue.try_dequeue().unwrap();
// Write the buffer length into the 2-byte header
let len = (slice.len() as u16).to_be_bytes();
slice.header().copy_from_slice(&len);
// Let's pretend we have a writer
let mut writer: Vec<u8> = Default::default();
assert_eq!(writer.write(slice.frame()).unwrap(), 300);
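The framing above boils down to a 2-byte big-endian length prefix followed by the payload. Here is a standalone sketch of that layout using only std (`frame_with_len_header` is a made-up helper for illustration, not part of the crate):

```rust
// Standalone sketch of the 2-byte length-prefix framing used above.
// `frame_with_len_header` is a hypothetical helper, not a crate API.
fn frame_with_len_header(payload: &[u8]) -> Vec<u8> {
    let mut frame = Vec::with_capacity(2 + payload.len());
    // Big-endian u16 header holding the payload length
    frame.extend_from_slice(&(payload.len() as u16).to_be_bytes());
    frame.extend_from_slice(payload);
    frame
}

fn main() {
    // 256 + 42 bytes of payload, as in the example above
    let payload = [0u8; 298];
    let frame = frame_with_len_header(&payload);
    assert_eq!(frame.len(), 300); // 2-byte header + 298-byte payload
    assert_eq!(&frame[..2], &298u16.to_be_bytes());
    println!("ok");
}
```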

WriteVectoredArrayBuffer and WriteVectoredVecBuffer allow buffering a slice of IoSlice, saving the cost of dequeuing io-slices one by one only to collect them afterwards. (Internally, two buffers are used: one for the values and one for the io-slices.)

As a convenience, the total size of the buffered io-slices can be retrieved.

use std::io::Write;
use swap_buffer_queue::{Queue, write_vectored::WriteVectoredVecBuffer};

// Creates a WriteVectoredVecBuffer queue
let queue: Queue<WriteVectoredVecBuffer<Vec<u8>>, Vec<u8>> = Queue::with_capacity(100);
queue.try_enqueue([vec![0; 256]]).unwrap();
queue.try_enqueue([vec![42; 42]]).unwrap();
let mut total_size = 0u16.to_be_bytes();
let mut slice = queue.try_dequeue().unwrap();
// Adds a header with the total size of the slices
total_size.copy_from_slice(&(slice.total_size() as u16).to_be_bytes());
let mut frame = slice.frame(.., Some(&total_size), None);
// Let's pretend we have a writer
let mut writer: Vec<u8> = Default::default();
assert_eq!(writer.write_vectored(&mut frame).unwrap(), 300);
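To see what the vectored frame buys, here is a std-only sketch of the same framed write using IoSlice directly; the buffer sizes mirror the example above, but nothing here depends on the crate:

```rust
use std::io::{IoSlice, Write};

fn main() {
    // Header + two value buffers, written in a single vectored call,
    // mirroring the frame produced in the example above.
    let header = 298u16.to_be_bytes();
    let first = vec![0u8; 256];
    let second = vec![42u8; 42];
    let bufs = [
        IoSlice::new(&header),
        IoSlice::new(&first),
        IoSlice::new(&second),
    ];
    // Vec<u8> implements Write and consumes all the slices at once
    let mut writer: Vec<u8> = Vec::new();
    let written = writer.write_vectored(&bufs).unwrap();
    assert_eq!(written, 300);
    println!("ok");
}
```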

How it works

Internally, this queue uses two buffers: one is used for enqueuing while the other is being dequeued.

When Queue::try_enqueue is called, it atomically reserves a slot in the current enqueuing buffer; the value is then written into that slot.

When Queue::try_dequeue is called, both buffers are swapped atomically, so the dequeued buffer contains the previously enqueued values, while newly enqueued values go to the other (empty) buffer.

Because the two-phase enqueuing (reserve, then write) cannot be atomic as a whole, the queue can be in a transitory state where slots have been reserved but not yet written. In this rare case, dequeuing fails and has to be retried.
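The swap mechanism can be pictured with a deliberately simplified, lock-based toy. ToySwapQueue below is invented purely for illustration; the real queue is lock-free, reserves slots with atomics, and swaps buffers without taking a lock:

```rust
use std::sync::Mutex;

// Toy, lock-based sketch of the double-buffering idea (NOT the crate's
// lock-free algorithm): values accumulate in one buffer, and dequeuing
// swaps it for an empty one, handing back everything at once.
struct ToySwapQueue<T> {
    enqueue_buf: Mutex<Vec<T>>,
}

impl<T> ToySwapQueue<T> {
    fn new() -> Self {
        Self { enqueue_buf: Mutex::new(Vec::new()) }
    }

    fn enqueue(&self, value: T) {
        self.enqueue_buf.lock().unwrap().push(value);
    }

    fn dequeue(&self) -> Vec<T> {
        // Swap the filled buffer for an empty one; the real queue does
        // this with a single atomic operation instead of a lock.
        std::mem::take(&mut *self.enqueue_buf.lock().unwrap())
    }
}

fn main() {
    let q = ToySwapQueue::new();
    q.enqueue(1);
    q.enqueue(2);
    assert_eq!(q.dequeue(), vec![1, 2]); // whole batch at once
    q.enqueue(3);
    assert_eq!(q.dequeue(), vec![3]);
    println!("ok");
}
```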

Fairness

The SynchronizedQueue implementation is not fair, i.e. it does not ensure that the oldest blocked enqueuer succeeds first when capacity becomes available.

However, this issue is largely mitigated by the fact that all the capacity becomes available at once, so all blocked enqueuers may succeed together (especially with values of size one).

For the particular case of potentially large variable-sized values, the queue can still be combined with a semaphore, e.g. tokio::sync::Semaphore. Performance will be impacted, but the algorithm is fast enough to afford it.
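The semaphore idea can be sketched without tokio. Semaphore below is a hand-rolled, std-only stand-in (not the crate's or tokio's type): a producer acquires as many permits as its value's size before enqueuing, and the permits are released once the value has been dequeued, so a big value eventually gets its reservation:

```rust
use std::sync::{Condvar, Mutex};

// Toy counting semaphore, a stand-in for tokio::sync::Semaphore,
// used to reserve queue capacity for variable-sized values.
struct Semaphore {
    permits: Mutex<usize>,
    cv: Condvar,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Self { permits: Mutex::new(permits), cv: Condvar::new() }
    }

    // Block until `n` permits are available, then take them.
    fn acquire(&self, n: usize) {
        let mut permits = self.permits.lock().unwrap();
        while *permits < n {
            permits = self.cv.wait(permits).unwrap();
        }
        *permits -= n;
    }

    // Give back `n` permits and wake blocked producers.
    fn release(&self, n: usize) {
        *self.permits.lock().unwrap() += n;
        self.cv.notify_all();
    }
}

fn main() {
    let sem = Semaphore::new(100); // mirrors the queue capacity
    sem.acquire(42);               // reserve before enqueuing a 42-sized value
    // ... try_enqueue / try_dequeue would go here ...
    sem.release(42);               // release once the value is dequeued
    assert_eq!(*sem.permits.lock().unwrap(), 100);
    println!("ok");
}
```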

I'm still thinking about a way to include fairness directly in the algorithm, but it's not an easy thing to do.

Unsafe

This library uses unsafe code, for three reasons:

  • buffers are wrapped in UnsafeCell to allow mutable access to the dequeued buffer;
  • buffer implementations may use unsafe to allow insertion through a shared reference;
  • the Buffer trait requires an unsafe interface for its invariants, because it is public.
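The first point can be illustrated with a minimal UnsafeCell pattern. SharedBuffer is a simplified sketch, not the crate's actual code; in the real queue the exclusivity invariant is upheld by the atomic slot reservation rather than by the caller:

```rust
use std::cell::UnsafeCell;

// Simplified sketch (not the crate's code): a buffer behind a shared
// reference that one side may mutate, with exclusivity upheld externally.
struct SharedBuffer {
    buf: UnsafeCell<Vec<u8>>,
}

// SAFETY: in this sketch, callers of `push` guarantee exclusive access,
// so sharing the type across threads cannot cause a data race.
unsafe impl Sync for SharedBuffer {}

impl SharedBuffer {
    fn new() -> Self {
        Self { buf: UnsafeCell::new(Vec::new()) }
    }

    /// # Safety
    /// The caller must guarantee no concurrent access to the buffer.
    unsafe fn push(&self, byte: u8) {
        // SAFETY: exclusive access is guaranteed by this function's contract.
        unsafe { (*self.buf.get()).push(byte) }
    }

    fn into_inner(self) -> Vec<u8> {
        self.buf.into_inner()
    }
}

fn main() {
    let buffer = SharedBuffer::new();
    // SAFETY: single-threaded here, so access is trivially exclusive.
    unsafe {
        buffer.push(1);
        buffer.push(2);
    }
    assert_eq!(buffer.into_inner(), vec![1, 2]);
    println!("ok");
}
```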

To ensure the safety of the algorithm, it uses:

  • tests (mostly doctests for now; coverage still needs to be extended)
  • benchmarks
  • MIRI (with tests)

Loom is partially integrated for now, but loom tests are on the TODO list.

Performance

swap-buffer-queue is very performant – it's actually the fastest MPSC queue I know.

Here are the results of a fork of the crossbeam benchmark:

benchmark      crossbeam  swap-buffer-queue
bounded1_mpsc  1.545s     1.763s
bounded1_spsc  1.652s     1.000s
bounded_mpsc   0.362s     0.137s
bounded_seq    0.190s     0.114s
bounded_spsc   0.115s     0.092s

However, a large enough capacity is required to reach maximum performance; otherwise, high-contention scenarios may be penalized. This is because the algorithm puts all the contention on a single atomic integer (instead of two for crossbeam).


Issues

Fairness

Currently, synchronized enqueuing isn't fair. Buffer swapping somewhat mitigates this issue, because all the capacity becomes available at once, but with variable-sized values, one big value could have to wait several swaps before being enqueued.

Removing ending dequeuing spinloop?

From reddit:

I see you use a spin loop in dequeuing. This reminds me of this conversation about Flume using a spinlock, and how that fights with the system scheduler...

A quick benchmark shows no performance difference without this spinloop, so it may be removed. Incidentally, there is additional backoff in the synchronized implementation, which should explain why there is no difference with or without this particular spinloop.

Document `unsafe` with `SAFETY` comments and warn about usage of `unsafe` in README

From reddit:

amarao_san
I checked few places, and I see a lot of unsafe code which I have trouble to understand. May be I'm not that bright, but you don't have any SAFETY comments there proving to readers that it's safe to have unsafe in safe code. If you want your library to be really reviewed, you need to write proofs for each unsafe part. "SAFETY: We can do it because of this, and this, and it will uphold invariant which compiler can't check but we know because ..."

kinoshitajona

  1. unsafe blocks should have a Safety comment explaining how you uphold the invariants required for this unsafe code usage.
  2. unsafe fn or traits should have a Safety comment explaining what invariants the user must uphold.

https://std-dev-guide.rust-lang.org/documentation/safety-comments.html

https://rust-lang.github.io/api-guidelines/documentation.html#c-failure

Furthermore, I think it's good to warn users in the README about the usage of unsafe in the library, so they can decide whether they want to pull unsafe code as a dependency.
