
mpmcqueue's Introduction

MPMCQueue.h


A bounded multi-producer multi-consumer concurrent queue written in C++11.

It's battle hardened and used daily in production:

It's been cited by the following papers:

  • Peizhao Ou and Brian Demsky. 2018. Towards understanding the costs of avoiding out-of-thin-air results. Proc. ACM Program. Lang. 2, OOPSLA, Article 136 (October 2018), 29 pages. DOI: https://doi.org/10.1145/3276506

Example

MPMCQueue<int> q(10);
auto t1 = std::thread([&] {
  int v;
  q.pop(v);
  std::cout << "t1 " << v << "\n";
});
auto t2 = std::thread([&] {
  int v;
  q.pop(v);
  std::cout << "t2 " << v << "\n";
});
q.push(1);
q.push(2);
t1.join();
t2.join();

Usage

  • MPMCQueue<T>(size_t capacity);

    Constructs a new MPMCQueue holding items of type T with the given capacity.

  • void emplace(Args &&... args);

    Enqueue an item using in-place construction. Blocks if queue is full.

  • bool try_emplace(Args &&... args);

    Try to enqueue an item using in-place construction. Returns true on success and false if queue is full.

  • void push(const T &v);

    Enqueue an item using copy construction. Blocks if queue is full.

  • template <typename P> void push(P &&v);

    Enqueue an item using move construction. Participates in overload resolution only if std::is_nothrow_constructible<T, P&&>::value == true. Blocks if queue is full.

  • bool try_push(const T &v);

    Try to enqueue an item using copy construction. Returns true on success and false if queue is full.

  • template <typename P> bool try_push(P &&v);

    Try to enqueue an item using move construction. Participates in overload resolution only if std::is_nothrow_constructible<T, P&&>::value == true. Returns true on success and false if queue is full.

  • void pop(T &v);

    Dequeue an item by copying or moving the item into v. Blocks if queue is empty.

  • bool try_pop(T &v);

    Try to dequeue an item by copying or moving the item into v. Returns true on success and false if the queue is empty.

  • ssize_t size();

    Returns the number of elements in the queue.

    The size can be negative when the queue is empty and at least one reader is waiting. Since this is a concurrent queue, the size is only a best-effort guess until all reader and writer threads have been joined.

  • bool empty();

    Returns true if the queue is empty.

    Since this is a concurrent queue, this is only a best-effort guess until all reader and writer threads have been joined.

All operations except construction and destruction are thread safe.
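A minimal sketch of how a best-effort size() and empty() could be derived, assuming (from the Implementation notes) that writers draw tickets from a counter head, readers draw tickets from a counter tail, and size() is simply head - tail. approx_size and approx_empty are hypothetical names, not the header's API. A reader that has taken a ticket but is still waiting moves tail past head, which is how the size goes negative:

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper: best-effort size from the two ticket counters.
// head = next write ticket, tail = next read ticket. The unsigned
// subtraction wraps, and the cast turns "tail ahead of head" into a
// negative value.
std::ptrdiff_t approx_size(std::size_t head, std::size_t tail) {
  return static_cast<std::ptrdiff_t>(head - tail);
}

// Hypothetical helper: empty when no write ticket is ahead of the read
// tickets, including the case where readers are already waiting.
bool approx_empty(std::size_t head, std::size_t tail) {
  return approx_size(head, tail) <= 0;
}

void demo_sizes() {
  assert(approx_size(3, 1) == 2);   // three write tickets, one read ticket
  assert(approx_size(0, 1) == -1);  // one reader holds a ticket and waits
  assert(approx_empty(0, 1));
  assert(!approx_empty(5, 4));
}
```

Both values are snapshots of two independently changing counters, which is why the README calls them best-effort guesses.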

Implementation

Memory layout

Enqueue:

  1. Acquire next write ticket from head.
  2. Wait for our turn (2 * (ticket / capacity)) to write slot (ticket % capacity).
  3. Set turn = turn + 1 to inform the readers we are done writing.

Dequeue:

  1. Acquire next read ticket from tail.
  2. Wait for our turn (2 * (ticket / capacity) + 1) to read slot (ticket % capacity).
  3. Set turn = turn + 1 to inform the writers we are done reading.
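The steps above can be sketched as a small, self-contained C++11 class. This is an illustration under stated assumptions, not the header's code: TicketQueue is a hypothetical name, it has no cache-line padding, allocator support or try_* variants, it requires T to be default-constructible and copy-assignable, and it yields while waiting:

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical sketch of the ticket/turn scheme (not the real header).
template <typename T>
class TicketQueue {
 public:
  explicit TicketQueue(std::size_t capacity)
      : capacity_(capacity), slots_(capacity) {}

  void push(const T& v) {
    const std::size_t ticket = head_.fetch_add(1);          // 1. write ticket
    Slot& slot = slots_[ticket % capacity_];
    const std::size_t turn = 2 * (ticket / capacity_);      // 2. our turn
    while (slot.turn.load(std::memory_order_acquire) != turn) {
      std::this_thread::yield();
    }
    slot.value = v;
    slot.turn.store(turn + 1, std::memory_order_release);   // 3. readers may go
  }

  void pop(T& v) {
    const std::size_t ticket = tail_.fetch_add(1);          // 1. read ticket
    Slot& slot = slots_[ticket % capacity_];
    const std::size_t turn = 2 * (ticket / capacity_) + 1;  // 2. our turn
    while (slot.turn.load(std::memory_order_acquire) != turn) {
      std::this_thread::yield();
    }
    v = slot.value;
    slot.turn.store(turn + 1, std::memory_order_release);   // 3. next lap's writer may go
  }

 private:
  struct Slot {
    std::atomic<std::size_t> turn{0};
    T value{};
  };
  const std::size_t capacity_;
  std::vector<Slot> slots_;
  std::atomic<std::size_t> head_{0};
  std::atomic<std::size_t> tail_{0};
};

// Two producers push 0..499 each through a 4-slot queue; one consumer sums.
long long demo_ticket_queue() {
  TicketQueue<int> q(4);
  long long sum = 0;
  std::thread consumer([&] {
    for (int i = 0; i < 1000; ++i) {
      int v = 0;
      q.pop(v);
      sum += v;
    }
  });
  std::vector<std::thread> producers;
  for (int p = 0; p < 2; ++p) {
    producers.emplace_back([&] {
      for (int i = 0; i < 500; ++i) q.push(i);
    });
  }
  for (auto& t : producers) t.join();
  consumer.join();
  return sum;  // 2 * (0 + 1 + ... + 499) = 249500
}
```

Each slot's turn counter alternates between even (free for the writer of lap ticket / capacity) and odd (full, waiting for the matching reader), so writers and readers from different laps that share a slot are served in ticket order.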

Testing

Testing concurrency algorithms is hard. I'm using two approaches to test the implementation:

  • A single-threaded test that the functionality works as intended, including that element constructors and destructors are invoked correctly.
  • A multithreaded fuzz test that all elements are enqueued and dequeued correctly under heavy contention.
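The constructor/destructor part of the single-threaded test can be approached with an instrumented element type that counts live instances. The sketch below is an assumption about how such a check might look, with std::queue standing in for the queue under test; Tracked and live_after_scope are hypothetical names:

```cpp
#include <cassert>
#include <queue>

// Hypothetical instrumented element type: counts live instances so a test
// can check that every construction is matched by exactly one destruction.
struct Tracked {
  static int live;
  int value;
  explicit Tracked(int v = 0) : value(v) { ++live; }
  Tracked(const Tracked& o) : value(o.value) { ++live; }
  Tracked(Tracked&& o) noexcept : value(o.value) { ++live; }
  Tracked& operator=(const Tracked&) = default;
  Tracked& operator=(Tracked&&) noexcept = default;
  ~Tracked() { --live; }
};
int Tracked::live = 0;

// std::queue stands in for the queue under test (assumption).
int live_after_scope() {
  {
    std::queue<Tracked> q;
    q.emplace(1);            // constructed in place
    q.push(Tracked(2));      // temporary moved in, then destroyed
    assert(Tracked::live == 2);
    q.pop();                 // popped element must be destroyed
    assert(Tracked::live == 1);
  }
  return Tracked::live;      // container gone: nothing may be left alive
}
```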

TODO

  • Add allocator support so that the queue could be used with huge pages and shared memory
  • Add benchmarks and compare to boost::lockfree::queue and others
  • Use C++20 concepts instead of static_assert if available
  • Use std::hardware_destructive_interference_size if available
  • Add API for zero-copy dequeue and batch dequeue operations
  • Add [[nodiscard]] attributes

About

This project was created by Erik Rigtorp.

mpmcqueue's People

Contributors

jonecg, rigtorp

mpmcqueue's Issues

Congestion issues with pop and push

Hello,

I was benchmarking your MPMCQueue with https://gist.github.com/TurpentineDistillery/cba204646e631a3eeda5b06cac595fde on my Ryzen 9 3900X (Windows 10).
I noticed that the MPMCQueue struggles with the last benchmark (128 producers and 128 consumers). I also tested with 24/24, and it is still slow.

So in this benchmark I tried:
template <typename T>
void push(rigtorp::MPMCQueue<T>& q, T value)
{
    while (!q.try_push(value)) {
        mt::min_sleep();
    }
}

template <typename T>
T pop(rigtorp::MPMCQueue<T>& q)
{
    T item{};
    while (!q.try_pop(item)) {
        mt::min_sleep();
    }
    return item;
}

With this, the performance at 24/24 and 96/96 was good.
I don't understand why the original (blocking) pop and push perform so badly when there are far more threads than my CPU can run at once.

Thank you.
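A plausible explanation, not confirmed in this thread: the blocking push()/pop() appear to take a ticket first and then busy-wait on their slot, so with far more threads than cores the spinning threads can occupy the CPUs needed by the very threads they are waiting on, while a try_* loop holds no ticket between attempts. The wrappers from the question can be written generically, here with std::this_thread::yield() in place of the benchmark's mt::min_sleep(). push_with_backoff, pop_with_backoff and the mutex-based LockedQueue test stub are hypothetical:

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>

// Retry the non-blocking call and give up the CPU between attempts.
template <typename Queue, typename T>
void push_with_backoff(Queue& q, const T& value) {
  while (!q.try_push(value)) {
    std::this_thread::yield();
  }
}

template <typename T, typename Queue>
T pop_with_backoff(Queue& q) {
  T item{};
  while (!q.try_pop(item)) {
    std::this_thread::yield();
  }
  return item;
}

// Hypothetical stand-in with the same try_push/try_pop shape, for testing.
template <typename T>
class LockedQueue {
 public:
  explicit LockedQueue(std::size_t capacity) : capacity_(capacity) {}
  bool try_push(const T& v) {
    std::lock_guard<std::mutex> lock(m_);
    if (items_.size() == capacity_) return false;
    items_.push_back(v);
    return true;
  }
  bool try_pop(T& v) {
    std::lock_guard<std::mutex> lock(m_);
    if (items_.empty()) return false;
    v = items_.front();
    items_.pop_front();
    return true;
  }

 private:
  std::mutex m_;
  std::deque<T> items_;
  std::size_t capacity_;
};

long long demo_backoff() {
  LockedQueue<int> q(2);
  long long sum = 0;
  std::thread consumer([&] {
    for (int i = 0; i < 100; ++i) sum += pop_with_backoff<int>(q);
  });
  for (int i = 1; i <= 100; ++i) push_with_backoff(q, i);
  consumer.join();
  return sum;  // 1 + 2 + ... + 100 = 5050
}
```

Whether yield(), a short sleep, or a bounded spin works best likely depends on the thread-to-core ratio; the benchmark in the question used a sleep.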

push_front?

Hi,

Is there a way to have a push_front (instead of a push_back)?

CPU usage in pop block

pop() uses about 30% CPU while waiting.
Compiled with VS2017.

sample code:

auto q = std::make_shared<rigtorp::MPMCQueue<int>>(10);
while(true) {
    int v;
    q->pop(v);
    switch(v) {
    case 4726:
        LOGV << "event=" << v << " user removed";
        break;
    case 4724:
        LOGV << "event=" << v << " user password manually reseted";
        break;
    default:
        LOGV << "event=" << v << " unknow";
        break;
    }
}


cannot convert ‘Message*’ to ‘rigtorp::mpmc::Slot<Message>*’

Hi, I am trying to use MPMCQueue with a custom allocator. But I am getting the two following errors:

/Path/To/Project/Debug/external/include/rigtorp/MPMCQueue.h:129:33: error: cannot convert ‘Message*’ to ‘rigtorp::mpmc::Slot<Message>*’ in assignment
  129 |     slots_ = allocator_.allocate(capacity_ + 1);
/Path/To/Project/Debug/external/include/rigtorp/MPMCQueue.h:134:29: error: cannot convert ‘rigtorp::mpmc::Slot<Message>*’ to ‘CommMemoryAllocator<Message>::pointer’ {aka ‘Message*’}
  134 |       allocator_.deallocate(slots_, capacity_ + 1);

This is how I define the allocator and pass it to MPMCQueue:

template <typename T>
class CommMemoryAllocator
{
    public:
        typedef size_t size_type;
        typedef ptrdiff_t difference_type;
        typedef T* pointer;
        typedef const T* const_pointer;
        typedef T& reference;
        typedef const T& const_reference;
        typedef T value_type;

        T* allocate(size_type num, const void* hint = nullptr);

        void deallocate(pointer ptr, size_type num);
};

template <typename T>
using comm_mqueue = rigtorp::MPMCQueue<T, CommMemoryAllocator<T>>;

Then inside one of my classes I actually use it like so:

comm_mqueue<Message> downward_queue{20};

I don't actually use the queues yet (so no pushing or popping), I am getting these errors directly from this simple use.

Is there something I am missing, or is this a bug? I am using Linux and gcc/g++ 11.1.0, and this is being compiled as C++17.

Beginner Question

I'm running into a compile issue using a struct containing a std::string as the type when constructing a new MPMCQueue.

For example, using a struct:

struct MyEvent {
	std::string MyString;
	long MyLong;
};

and trying to construct a MPMCQueue:

MPMCQueue<MyEvent> EventQueue(100);

I get a compiler message:

"T must be nothrow copy constructible".

If I remove the std::string from the struct and replace it with another long for example, it compiles and works perfectly. It also works perfectly if I use just a std::string in constructing the queue. But not when it's used inside the struct.

Any advice would be appreciated.
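One way to see which nothrow requirement a type misses is to query the standard type traits directly. Which traits the header actually checks depends on its version, so the selection below is an assumption; it only shows that MyEvent's implicitly generated copy constructor is not noexcept, because std::string's copy constructor may allocate, while moving and destroying MyEvent do not throw:

```cpp
#include <cassert>
#include <string>
#include <type_traits>

struct MyEvent {
  std::string MyString;
  long MyLong;
};

// std::string's copy constructor may allocate, so it is not noexcept, and
// that propagates to the implicit copy constructor of MyEvent.
static_assert(!std::is_nothrow_copy_constructible<std::string>::value,
              "copying a std::string may throw");
static_assert(!std::is_nothrow_copy_constructible<MyEvent>::value,
              "so copying MyEvent may throw too");

// Moving and destroying MyEvent are noexcept.
static_assert(std::is_nothrow_move_constructible<MyEvent>::value,
              "moving MyEvent does not throw");
static_assert(std::is_nothrow_destructible<MyEvent>::value,
              "destroying MyEvent does not throw");
```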

Why can't T be std::function<void()>?

My compiler is gcc 5.4.
The static_assert fires when compiling this code: MPMCQueue<std::function<void()>>
Why does this happen? To my knowledge, std::function's move assignment does not throw, so it should not trigger the static_assert in your code.

Compile errors

Using GNU g++ 4.8.5/Centos 7, I get the following compile errors when trying to compile a fresh clone of MPMCQueue. Can you offer assistance?

MPMCQueue.h:264:56: error: missing binary operator before token "("
#if defined(__has_cpp_attribute) && __has_cpp_attribute(no_unique_address)
MPMCQueue.h:58:13: error: ‘bad_array_new_length’ is not a member of ‘std’
throw std::bad_array_new_length();

Long latency when the number of pushes is much larger than the queue size

Hi

I ran a test that pushes 1,000,000 items into the queue. When the queue size is small, such as 64 or 1024, the latency is quite high; when the queue size is large, such as 65535, the latency is low.

queue size 1024, 10,000 pushes: 0.3 us per push
queue size 1024, 1,000,000 pushes: 13 us per push

    // test_size and Consumer_num are defined elsewhere in the test;
    // std names are used unqualified, so `using namespace std;` is assumed.
    using namespace chrono;

    vector<thread> threads;
    rigtorp::MPMCQueue<int *> q(1024);
    atomic<int> push_times{0};
    atomic<int> pop_times{0};

    {
        steady_clock::time_point t1 = steady_clock::now();
        for(int i=0; i<Consumer_num; ++i) {
            threads.emplace_back(
                [&] {
                    while(pop_times.load()<test_size) {
                        int* temp = nullptr;
                        q.pop(temp);
                        pop_times.fetch_add(1);
                    }
                }
            );
        }
        while(push_times<test_size) {
            int* temp = nullptr;
            q.push(temp);
            push_times.fetch_add(1);
        }
        steady_clock::time_point t2 = steady_clock::now();
        duration<double, micro> time_span = t2 - t1;

        this_thread::sleep_for(chrono::microseconds(500));
        cout << "rigtorp::MPMCQueue: push " << test_size << " data, per cost us:" << time_span.count()/test_size << endl;
        if(push_times.load() != pop_times.load()) {
            cout << "warning: push_times=" << push_times.load() << ", pop_times=" << pop_times.load() << endl;
        }
    }
    exit(0);

Memory corruption issue with slot storage

Hi,

I ported the code to MS Visual Studio 2015, and while storing large structures I found that the slot storage, as declared below, was buggy (or not portable):

std::aligned_storage<sizeof(T), alignof(T)> storage;

I fixed it by declaring as:

typename std::aligned_storage<sizeof(T), alignof(T)>::type storage;

sizeof(std::aligned_storage<sizeof(T), alignof(T)>) is "1"
sizeof(std::aligned_storage<sizeof(T), alignof(T)>::type) was the proper size of my type "T".

I suspect that the types you tested were small enough to fit in the padding that prevents false sharing.
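The size difference described above can be checked directly. A minimal sketch, with Big as a hypothetical large type; std::aligned_storage is deprecated in C++23 but still available, so this may produce deprecation warnings:

```cpp
#include <cassert>
#include <cstddef>
#include <type_traits>

struct Big {
  double data[32];  // hypothetical 256-byte element type
};

// The trait class itself carries no storage; its nested ::type does.
using Trait = std::aligned_storage<sizeof(Big), alignof(Big)>;
using Storage = Trait::type;

static_assert(sizeof(Storage) >= sizeof(Big), "::type is big enough for Big");
static_assert(alignof(Storage) >= alignof(Big), "::type is suitably aligned");

std::size_t trait_size() { return sizeof(Trait); }      // typically 1 (empty struct)
std::size_t storage_size() { return sizeof(Storage); }  // at least sizeof(Big)
```

Declaring a member of type Trait rather than Trait::type therefore reserves (typically) a single byte, and constructing a T there writes past it into whatever follows.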

Peeking

Would it be possible to add a peek method?

BUG: can't push if head_ overflows?

Reading the code, I found that once head_ overflows, push can no longer proceed.
To check this, I changed the type of head_ from size_t to uint16_t and ran:

MPMCQueue<size_t> mp(1024);
for (size_t i = 0; i < UINT64_MAX; ++i) {
    mp.push(i);
    size_t j = 0;
    if (!mp.try_pop(j)) {
        std::cout << "failed " << i << std::endl;
        break;
    }
    std::cout << "success " << i << std::endl;
}

The output stops at 65535, and then the program hangs:

success 65533
success 65534
success 65535
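The hang is consistent with the turn arithmetic from the Implementation section: a writer holding ticket t waits for turn 2 * (t / capacity) on slot t % capacity. The sketch below replays the experiment's numbers (a 16-bit head and capacity 1024; write_turn and wrapped_writer_gets_turn are hypothetical helpers). After the wrap, the writer expects turn 0 while slot 0 is already at turn 128, so it waits forever. With the library's size_t head on a 64-bit platform, the wrap needs 2^64 tickets, which at one billion operations per second takes about 585 years.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Turn a writer waits for, per the README: 2 * (ticket / capacity).
std::size_t write_turn(std::size_t ticket, std::size_t capacity) {
  return 2 * (ticket / capacity);
}

// Hypothetical replay: 16-bit head, capacity 1024. After tickets 0..65535,
// slot 0 has completed 64 write/read rounds, so its turn counter is
// 2 * 64 = 128, the value a writer holding ticket 65536 would expect.
bool wrapped_writer_gets_turn() {
  const std::size_t capacity = 1024;
  const std::size_t slot0_turn = write_turn(65536, capacity);  // 128
  std::uint16_t head = 65535;  // ticket used by the last successful push
  ++head;                      // wraps around to 0 instead of 65536
  return write_turn(head, capacity) == slot0_turn;  // expects 0, slot has 128
}
```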

Can the turn just be set to true or false to indicate that writing or reading is done?

Based on the implementation description, the scheme would become:

Enqueue:

  • Acquire next write ticket from head.
  • Wait for our turn (true) to write slot (ticket % capacity).
  • Set turn = false to inform the readers we are done writing.

Dequeue:

  • Acquire next read ticket from tail.
  • Wait for our turn (false) to read slot (ticket % capacity).
  • Set turn = true to inform the writers we are done reading.

Does this work?

Thank you.
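One difficulty with a single boolean, sketched under the assumption that tickets are still handed out by fetch_add as in the Implementation section: tickets t and t + capacity map to the same slot, and a flag cannot tell which lap may go next, while the counter 2 * (ticket / capacity) can. turn_allows_write and flag_allows_write are hypothetical helpers:

```cpp
#include <cassert>
#include <cstddef>

// Counter scheme from the README: the writer holding `ticket` may use its
// slot only when the slot's turn equals 2 * (ticket / capacity).
bool turn_allows_write(std::size_t slot_turn, std::size_t ticket,
                       std::size_t capacity) {
  return slot_turn == 2 * (ticket / capacity);
}

// Proposed boolean scheme: `writable` is true whenever the slot is free.
bool flag_allows_write(bool writable) { return writable; }

// Capacity 4, slot 0 fresh. Writers holding tickets 0 and 4 both map to
// slot 0 (0 % 4 == 4 % 4), but only ticket 0 should write first.
void demo_same_slot() {
  const std::size_t capacity = 4;
  const std::size_t tickets[] = {0, 4};
  int by_turn = 0;
  int by_flag = 0;
  for (std::size_t t : tickets) {
    if (turn_allows_write(0, t, capacity)) ++by_turn;
    if (flag_allows_write(true)) ++by_flag;
  }
  assert(by_turn == 1);  // only the first lap's writer proceeds
  assert(by_flag == 2);  // both writers would race on the same slot
}
```

The same ambiguity applies to readers from different laps waiting on one slot.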

`try_pop` returns false when the queue is not empty.

I use a semaphore (std::counting_semaphore from C++20, for convenience here) to make sure that the queue is not empty. Here is a simple example:

#include <cstdlib>
#include <iostream>
#include <semaphore>
#include <thread>
#include <vector>
#include "rigtorp/MPMCQueue.h"

int main() {
  rigtorp::MPMCQueue<int> queue(2333);
  std::counting_semaphore<> sem{0};

  std::thread reader([&]{
    int num = 100000;
    while (num--) {
      sem.acquire();
      int x;
      if (!queue.try_pop(x)) {
        std::cout << queue.size() << "\n";
        std::cout << "error\n";
        exit(-1);
      }
    }
  });

  std::vector<std::thread> writers;
  for (int i = 0; i < 10; i++) {
    writers.emplace_back([&] {
      int num = 10000;
      while (num--) {
        queue.push(num);
        sem.release();
      }
    });
  }

  reader.join();
  for (auto& writer : writers) writer.join();

  std::cout << "ok\n";
}

The code can print "error". This can be reproduced on godbolt.

I think it is caused by the release-acquire memory ordering of the atomic operations, but I am not sure how to solve it.
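A possible explanation, sketched as a single-threaded model rather than the real header: assume try_pop looks only at the slot for the current tail ticket (as the empty() snippet later on this page does) and fails unless that slot's turn is 2 * (tail / capacity) + 1. If writer A holds ticket 0 but is still writing while writer B has finished ticket 1 and released the semaphore, the semaphore count is 1 but the slot at the tail is not ready yet. try_pop_ready is a hypothetical helper:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Model (assumption): try_pop succeeds only if the slot of the current tail
// ticket has turn 2 * (tail / capacity) + 1, i.e. its writer has finished.
bool try_pop_ready(const std::vector<std::size_t>& turns, std::size_t tail) {
  const std::size_t capacity = turns.size();
  return turns[tail % capacity] == 2 * (tail / capacity) + 1;
}

void demo_out_of_order_writers() {
  std::vector<std::size_t> turns(4, 0);  // capacity 4, fresh queue
  const std::size_t tail = 0;
  // Writer A took ticket 0 and is still copying; writer B took ticket 1,
  // finished (turn 0 -> 1) and released the semaphore once.
  turns[1] = 1;
  assert(!try_pop_ready(turns, tail));  // slot 0 not ready: try_pop fails
  turns[0] = 1;                         // writer A finishes
  assert(try_pop_ready(turns, tail));
}
```

Under this model, calling the blocking pop() after sem.acquire() would wait for writer A instead of failing.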

static assertion failed

I created a queue of global variables, but an error occurred during compilation.
Slot must be aligned to cache line boundary to prevent false sharing.
141 | alignof(Slot) == hardwareInterferenceSize,
The way to create a queue is as follows: MPMCQueue<HqOpsStoreMessage> snap_mpmc_queue(MPMCBUF_SIZE);
How to solve this problem?

Update README with more precise description of the queue behavior

I am opening this issue because I found that the README does not precisely explain the behavior expected from the MPMC queue.

In the README, one can read the following:

bool try_push(const T &v);

Try to enqueue an item using copy construction. Returns true on success and false if queue is full.

However, if I run the following test, it fails:

TEST(mpmc_queue_test, push_after_pop_should_not_fail)
{
  rigtorp::MPMCQueue<int> q(256);
  for (int i = 0; i != 256; ++i) {
    ASSERT_TRUE(q.try_push(i));
  }
  std::atomic<bool> running{true};

  auto task = [&q, &running]() {
    while (running) {
      int v;
      if (q.try_pop(v)) {
        ASSERT_TRUE(q.try_push(v));
      }
    }
  };

  std::thread t1{task};
  std::thread t2{task};
  std::thread t3{task};
  std::thread t4{task};

  std::this_thread::sleep_for(std::chrono::seconds{5});
  running = false;
  t1.join();
  t2.join();
  t3.join();
  t4.join();
}

Notice that the try_push inside the lambda is always going to push to a queue that is never full. This is guaranteed via the try_pop just one line above. If there can be spurious failures when multiple writers call try_push concurrently, then this should be clear in the function description in the README.
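The same mechanism appears to apply on the write side. Under the README's ticket scheme, and assuming try_push inspects only the slot for the current head ticket, try_push can fail even though the counters say the queue is not full. In the model below (capacity 2, hypothetical helper try_push_ready), reader B has finished slot 1, but head now maps to slot 0, which reader A has claimed and not yet released:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Model (assumption): try_push succeeds only if the slot of the current head
// ticket has turn 2 * (head / capacity), i.e. its previous reader finished.
bool try_push_ready(const std::vector<std::size_t>& turns, std::size_t head) {
  const std::size_t capacity = turns.size();
  return turns[head % capacity] == 2 * (head / capacity);
}

void demo_slow_reader() {
  std::vector<std::size_t> turns(2, 1);  // capacity 2; both slots written once
  const std::size_t head = 2;            // two write tickets handed out
  const std::size_t tail = 2;            // readers A (ticket 0), B (ticket 1)
  turns[1] = 2;                          // B finished reading slot 1; A stalls
  assert(head - tail == 0);              // by the counters, nothing is queued
  assert(!try_push_ready(turns, head));  // yet head 2 maps to slot 0 (turn 1)
  turns[0] = 2;                          // reader A finally finishes
  assert(try_push_ready(turns, head));
}
```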

empty()

Hi,

I just wrote an empty() method and want to share it. I hope it is correct (it is based on try_pop):

bool empty() noexcept {
  auto tail = tail_.load(std::memory_order_acquire);
  for (;;) {
    auto &slot = slots_[idx(tail)];
    if (turn(tail) * 2 + 1 == slot.turn.load(std::memory_order_acquire)) {
      if (tail_.compare_exchange_strong(tail, tail + 1)) {
        return false;
      }
    } else {
      auto const prevTail = tail;
      tail = tail_.load(std::memory_order_acquire);
      if (tail == prevTail) {
        return true;
      }
    }
  }
}

How is this queue lock-free?

A thread that blocks after the call to fetch_add() will block later producers and consumers that use the same slot. As objects are enqueued (and dequeued), the ring wraps around and comes back to earlier slots. For enqueue, such an event can be pushed far into the future, as the queue capacity can be made arbitrarily large (as memory allows), but not for dequeue. Blocking could possibly be avoided if producers and consumers synchronise with each other (e.g. use CAS to update the slot, and restart the operation by allocating a new index if the slot doesn't have the expected value).

std::aligned_storage is deprecated

In C++23, std::aligned_storage is deprecated (the proposed alternative is to use std::array<std::byte>). This causes compiler errors with, for example, clang 17 or 18 with C++23 flags, because deprecation warnings are treated as errors here (see below).

...
[build] build/vcpkg_installed/x64-linux/include/rigtorp/MPMCQueue.h:108:17: error: 'aligned_storage<4, 4>' is deprecated [-Werror,-Wdeprecated-declarations]
[build]   108 |   typename std::aligned_storage<sizeof(T), alignof(T)>::type storage;
[build]       |                 ^
[build] /usr/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/allocator.h:193:38: note: in instantiation of template class 'rigtorp::mpmc::Slot<int>' requested here
[build]   193 |             if (__builtin_mul_overflow(__n, sizeof(_Tp), &__n))
[build]       |                                             ^
[build] build/vcpkg_installed/x64-linux/include/rigtorp/MPMCQueue.h:129:25: note: in instantiation of member function 'std::allocator<rigtorp::mpmc::Slot<int>>::allocate' requested here
[build]   129 |     slots_ = allocator_.allocate(capacity_ + 1);
[build]       |                         ^
[build] utilities/include/utilities/parallel_helpers.h:28:18: note: in instantiation of member function 'rigtorp::mpmc::Queue<int>::Queue' requested here
[build]    28 |     auto tasks = rigtorp::MPMCQueue<TaskArgT>(n_tasks);
[build]       |                  ^
...
[build] 1 error generated.

We get MPMCQueue from the newest release of vcpkg. To work around this, we add warning suppressions around it, but a real fix would be nice.
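A minimal sketch of slot storage without std::aligned_storage, assuming C++17 (for std::byte and std::launder): a suitably aligned byte array in which T is created with placement new. This is one common replacement, not necessarily the form the header would adopt; RawSlot and roundtrip are hypothetical names:

```cpp
#include <cassert>
#include <cstddef>
#include <new>
#include <string>
#include <utility>

// Hypothetical slot storage: an aligned byte array replaces
// std::aligned_storage<sizeof(T), alignof(T)>::type.
template <typename T>
struct RawSlot {
  alignas(T) std::byte storage[sizeof(T)];

  template <typename... Args>
  void construct(Args&&... args) {
    ::new (static_cast<void*>(storage)) T(std::forward<Args>(args)...);
  }
  // std::launder yields a usable pointer to the object created in the array.
  T& get() { return *std::launder(reinterpret_cast<T*>(storage)); }
  void destroy() { get().~T(); }
};

static_assert(sizeof(RawSlot<std::string>) >= sizeof(std::string), "fits T");
static_assert(alignof(RawSlot<std::string>) == alignof(std::string),
              "aligned as T");

std::string roundtrip(const std::string& s) {
  RawSlot<std::string> slot;
  slot.construct(s);
  std::string out = std::move(slot.get());
  slot.destroy();
  return out;
}
```

A header that must stay C++11-compatible could use unsigned char instead of std::byte, but std::launder itself is only available from C++17.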

Feature request: is LIFO possible?

This is a very nice MPMC queue! Would it be possible to add LIFO functionality? It looks as if pop() could be modified to use head_.fetch_sub(1), but I am not sure whether this breaks something.

CMake Deprecation Warning

CMake Deprecation Warning at output/_deps/mpmcqueue-src/CMakeLists.txt:1 (cmake_minimum_required):
  Compatibility with CMake < 3.5 will be removed from a future version of
  CMake.

  Update the VERSION argument <min> value or use a ...<max> suffix to tell
  CMake that the project does not need compatibility with older versions.

Please consider updating cmake_minimum_required(VERSION 3.2) to 3.5

A Wait-Free MPMCQueue

Hi, I've just created a wait-free MPMC queue based on an idea from this project. Could you help take a look? WFMPMC.

Thanks.

Does this work on shared memory?

For example, with Linux shm_open or MPI_Win_allocate_shared, combined with placement new:

const int N = 10; // capacity
std::byte* buffer;
/* allocate buffer as shm on main proc */
if (/* main proc */) {
    auto* q = new (buffer) MPMCQueue<int>(N);
    // consume
} else {
    auto* q = new (buffer) MPMCQueue<int>(N);
    // produce
}
