
kanal's People

Contributors

adrian-budau, fereidani, hauvgaard, quarticcat, sabify, shinmao, szepeviktor, terrarier2111, zzau13

kanal's Issues

Oneshot: Data race detected

Problem

Continuing from #34, I audited oneshot and found a data race. The quickest way to reproduce it is with Miri.

$ cargo +nightly miri --version                                 
miri 0.1.0 (a6f8aa5 2023-08-11)

$ cargo +nightly miri test --test=oneshot_test -- send_win_panic
Preparing a sysroot for Miri (target: x86_64-unknown-linux-gnu)... done
    Finished test [optimized + debuginfo] target(s) in 0.02s
     Running tests/oneshot_test.rs (/home/qc/.cache/cargo-build/miri/x86_64-unknown-linux-gnu/debug/deps/oneshot_test-c2884fce5be3f64a)
warning: Miri does not support optimizations. If you have enabled optimizations by selecting a Cargo profile (such as --release) which changes other profile settings such as whether debug assertions and overflow checks are enabled, those settings are still applied.


running 1 test
test asyncs::send_win_panic ... error: Undefined Behavior: Data race detected between (1) Read on thread `asyncs::send_wi` and (2) Write on thread `<unnamed>` at alloc33459. (2) just happened here
   --> /home/qc/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:497:1
    |
497 | pub unsafe fn drop_in_place<T: ?Sized>(to_drop: *mut T) {
    | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Data race detected between (1) Read on thread `asyncs::send_wi` and (2) Write on thread `<unnamed>` at alloc33459. (2) just happened here
    |
help: and (1) occurred earlier here
   --> /home/qc/Workspace/NotMe/kanal/src/signal.rs:230:15
    |
230 |         match &(*this).waker {
    |               ^^^^^^^^^^^^^^
    = help: this indicates a bug in the program: it performed an invalid operation, and caused Undefined Behavior
    = help: see https://doc.rust-lang.org/nightly/reference/behavior-considered-undefined.html for further information
    = note: BACKTRACE (of the first span):
    = note: inside `std::ptr::drop_in_place::<kanal::OneshotSendFuture<std::boxed::Box<usize>>> - shim(Some(kanal::OneshotSendFuture<std::boxed::Box<usize>>))` at /home/qc/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:497:1: 497:56
    = note: inside `std::ptr::drop_in_place::<std::boxed::Box<kanal::OneshotSendFuture<std::boxed::Box<usize>>>> - shim(Some(std::boxed::Box<kanal::OneshotSendFuture<std::boxed::Box<usize>>>))` at /home/qc/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:497:1: 497:56
    = note: inside `std::ptr::drop_in_place::<std::pin::Pin<std::boxed::Box<kanal::OneshotSendFuture<std::boxed::Box<usize>>>>> - shim(Some(std::pin::Pin<std::boxed::Box<kanal::OneshotSendFuture<std::boxed::Box<usize>>>>))` at /home/qc/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:497:1: 497:56
note: inside closure
   --> tests/oneshot_test.rs:298:13
    |
298 |             });
    |             ^

note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace

error: aborting due to previous error; 1 warning emitted

error: test failed, to rerun pass `--test oneshot_test`

Caused by:
  process didn't exit successfully: `/home/qc/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/bin/cargo-miri runner /home/qc/.cache/cargo-build/miri/x86_64-unknown-linux-gnu/debug/deps/oneshot_test-c2884fce5be3f64a send_win_panic` (exit status: 1)

How to trigger it

  1. The sender wins the race and goes waiting.
  2. The receiver loses the race and tries to finish it. Now it has already set the ptr to FINISHED but it hasn't actually finished receiving yet.
  3. The sender drops. It finds that the ptr has already been set to FINISHED, so it drops the OneshotInternal, which contains the waker.
  4. The receiver then finishes receiving and invokes wake. BANG!

Possible solution

You can add a state before FINISHED indicating that the data is transferring. An existing example is AtomicWaker, which has a REGISTERING state indicating that the waker is being replaced.
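The intermediate state suggested above could look roughly like the following. This is a minimal sketch, not kanal's code: the `Slot` type, the `WAITING` and `TRANSFERRING` names, and the helper methods are all hypothetical, and only `FINISHED` comes from the report. The point is that the dropping side treats `TRANSFERRING` as "not yet safe to free" and waits it out.

```rust
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical state values; only the FINISHED name appears in the report.
const WAITING: u8 = 0;
const TRANSFERRING: u8 = 1;
const FINISHED: u8 = 2;

struct Slot {
    state: AtomicU8,
}

impl Slot {
    /// Receiver side: claim the slot before touching the shared data or waker.
    fn begin_transfer(&self) -> bool {
        self.state
            .compare_exchange(WAITING, TRANSFERRING, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
    }

    /// Receiver side: publish completion only after the last shared access.
    fn finish_transfer(&self) {
        self.state.store(FINISHED, Ordering::Release);
    }

    /// Sender side (for example in Drop): spin while a transfer is in flight.
    fn wait_until_settled(&self) -> u8 {
        loop {
            let s = self.state.load(Ordering::Acquire);
            if s != TRANSFERRING {
                return s;
            }
            std::hint::spin_loop();
        }
    }
}

fn demo_transfer_state() -> u8 {
    let slot = Arc::new(Slot { state: AtomicU8::new(WAITING) });
    let receiver = {
        let slot = Arc::clone(&slot);
        thread::spawn(move || {
            assert!(slot.begin_transfer());
            // ... move the value out and wake the sender here ...
            slot.finish_transfer();
        })
    };
    // The sender never accepts TRANSFERRING as a final answer.
    let mut seen = slot.wait_until_settled();
    while seen != FINISHED {
        thread::yield_now();
        seen = slot.wait_until_settled();
    }
    receiver.join().unwrap();
    seen
}
```

With this shape, the sender would free the shared allocation only after `wait_until_settled` returns `FINISHED`, so the receiver's final wake cannot touch freed memory.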

Do you have plans to implement a broadcast channel?

Nice work!!

I think it would be nice to have a clear separation of the different kinds of channels. I assume the one currently implemented is mpmc because both the sender and the receiver implement Clone. (Would there be a speed improvement from also implementing mpsc?)

I would be really interested to see an implementation of a broadcast type of channel. You send the message once, the message needs to implement Clone, and all the receivers receive a copy of it, as the async-broadcast crate does.
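To make the request concrete, here is a minimal sketch of the broadcast semantics described above, built on the standard library's mpsc channels rather than on kanal. The `Broadcaster` type and its methods are hypothetical and exist only for illustration; a real implementation would likely share one buffer instead of cloning into per-receiver queues.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical fan-out broadcaster; not part of kanal or async-broadcast.
struct Broadcaster<T: Clone> {
    subscribers: Vec<Sender<T>>,
}

impl<T: Clone> Broadcaster<T> {
    fn new() -> Self {
        Self { subscribers: Vec::new() }
    }

    /// Registers a new receiver that will see every later message.
    fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Sends a clone of `msg` to every live subscriber, forgets subscribers
    /// whose receiver was dropped, and returns how many subscribers remain.
    fn broadcast(&mut self, msg: T) -> usize {
        self.subscribers.retain(|tx| tx.send(msg.clone()).is_ok());
        self.subscribers.len()
    }
}

fn demo_broadcast() -> (String, String, usize) {
    let mut hub = Broadcaster::new();
    let rx1 = hub.subscribe();
    let rx2 = hub.subscribe();
    let delivered = hub.broadcast(String::from("hello"));
    (rx1.recv().unwrap(), rx2.recv().unwrap(), delivered)
}
```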

Incorrect Send and Sync bounds

Currently both senders and receivers implement Send and Sync regardless of whether T implements Send, which is incorrect.
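A minimal sketch of what bounded impls look like, using a hypothetical `Handle<T>` that owns a raw pointer (and therefore gets no automatic Send or Sync). It mirrors the bounds that the standard library's mpsc `Sender<T>` uses; it is not kanal's actual type.

```rust
use std::marker::PhantomData;

/// Hypothetical channel-like handle; the raw pointer disables auto Send/Sync.
struct Handle<T> {
    ptr: *mut T,
    _marker: PhantomData<T>,
}

impl<T> Handle<T> {
    fn new(value: T) -> Self {
        Handle { ptr: Box::into_raw(Box::new(value)), _marker: PhantomData }
    }
}

impl<T> Drop for Handle<T> {
    fn drop(&mut self) {
        // SAFETY: `ptr` came from Box::into_raw in `new` and is freed only here.
        unsafe { drop(Box::from_raw(self.ptr)) }
    }
}

// SAFETY (assumption of this sketch): values of T only cross threads by being
// moved through the handle, so requiring `T: Send` is sufficient.
unsafe impl<T: Send> Send for Handle<T> {}
unsafe impl<T: Send> Sync for Handle<T> {}

fn assert_send_sync<X: Send + Sync>(_: &X) {}

fn demo_bounds() -> bool {
    let ok = Handle::new(5u32);
    assert_send_sync(&ok); // compiles because u32: Send
    // let bad = Handle::new(std::rc::Rc::new(5u32));
    // assert_send_sync(&bad); // rejected at compile time: Rc<u32> is not Send
    true
}
```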

Unsound implementation of `as_sync`

Hi, I am a researcher from sunlab. I am running our in-house tools on open-source projects and found that these functions might be unsound.

The source of unsoundness

kanal/src/lib.rs

Lines 994 to 997 in 2dc6a1e

pub fn as_sync(&self) -> &Sender<T> {
    // Safety: structure of Sender<T> and AsyncSender<T> is same
    unsafe { transmute(self) }
}

These functions are trying to transmute AsyncSender to Sender.

#[cfg_attr(
    feature = "async",
    doc = r##"
)]
pub struct Sender<T> {
    internal: Internal<T>,
}

#[cfg(feature = "async")]
pub struct AsyncSender<T> {
    internal: Internal<T>,
}

Yes, as the comment says, the two structures have identical definitions. However, the layout of repr(Rust) types is not stable: the compiler reserves the right to reorder fields and insert padding for optimization, and it does not guarantee that two distinct types with the same fields share a layout. Only the very same type is guaranteed to have the same layout (e.g., Sender to Sender). The best fix here is to annotate these structs as repr(C) to avoid UB.

All of the as_sync and as_async functions might be affected: AsyncReceiver::<T>::as_sync, Sender::<T>::as_async, AsyncSender::<T>::as_sync, and Receiver::<T>::as_async.
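The repr(C) fix proposed above can be sketched as follows. The `Internal<T>` here is a hypothetical stand-in (a plain `Vec`), not kanal's real internal type, and the conversion uses a pointer cast instead of `transmute`, which is an assumption of this sketch rather than the project's chosen fix.

```rust
/// Hypothetical stand-in for kanal's shared channel state.
struct Internal<T> {
    queue: Vec<T>,
}

#[repr(C)]
pub struct Sender<T> {
    internal: Internal<T>,
}

#[repr(C)]
pub struct AsyncSender<T> {
    internal: Internal<T>,
}

impl<T> AsyncSender<T> {
    pub fn as_sync(&self) -> &Sender<T> {
        // SAFETY: both structs are #[repr(C)] and consist of exactly one field
        // of the same type, so their size, alignment, and field offsets match.
        unsafe { &*(self as *const AsyncSender<T> as *const Sender<T>) }
    }
}

fn demo_as_sync() -> usize {
    let sender = AsyncSender { internal: Internal { queue: vec![1, 2, 3] } };
    // Read the shared state through the reinterpreted reference.
    sender.as_sync().internal.queue.len()
}
```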

Usage example

Hi, I think this lib is quite interesting!
However, I would like to see some examples for copy-pasta thingy.
Can you help provide that?

Help needed improving Kanal

I am opening this issue for those kind people who are interested in helping to develop Kanal.

We need help in these areas:

  1. Auditing Kanal for UB and security-related issues.
  2. Writing tests for all library functions, in all cases, including cases where the function returns an error.
  3. Improving documentation

`Stream` that take ownership of the `Receiver`

Hello, thank you very much for this crate. Exactly what I need for my application. I would like to ask:

How expensive is it to create a ReceiveStream from a Receiver?
If it is expensive, could you provide something like tokio's ReceiverStream, which takes ownership of the Receiver? axum::body::Body::from_stream requires an owned stream.

Kanal is slower than crossbeam

I was experimenting with multithreaded algorithms in Rust and, to improve performance, tried replacing crossbeam channels with kanal in one that communicates through a channel.

Here is the code

pub fn multithread_map_kanal_channel<T, R, F>(
    input: Vec<T>,
    f: F
) -> anyhow::Result<Vec<R>>
    where
        T: Send + Sync + Clone,
        R: Send + Sync + Default + Clone,
        F: Fn(T) -> R + Send + Sync + Copy,
{
    let size = input.len();

    if size == 0 {
        return Ok(vec![]);
    }

    let num_threads = num_cpus::get() - 1; // one thread to collect results

    let mut results = vec![Default::default(); size];
    let chunk_size = size / num_threads;
    let (snd, rcv) = kanal::bounded(size);
    let res = crossbeam::thread::scope(|s| {
        for (i, chunk) in input.chunks(chunk_size).enumerate() {
            let snd_cloned = snd.clone();
            s.spawn(move |_| {
                for (j, item) in chunk.iter().enumerate() {
                    snd_cloned
                        .send((i * chunk_size + j, f(item.clone())))
                        .unwrap();
                }
            });
        }
        s.spawn(|_| {
            let mut counter = size;
            while counter > 0 {
                let (index, value) = rcv.recv().unwrap();
                results[index] = value;
                counter -= 1;
            }
        });
    });
    res.map(|_| results)
        .map_err(|_| anyhow::anyhow!("One of the threads failed"))
}

crossbeam implementation is the same, only another channel is used.

I used the following code for benchmark.

fn gen_data(n: usize) -> Vec<u32> {
    let mut rng = rand::thread_rng();
    (0..n).map(|_| rng.gen_range(0..1000)).collect()
}

fn dup(x: u32) -> u32 {
    x * 2
}

fn fib(n: u32) -> u32 {
    let mut a = 0;
    let mut b = 1;

    match n {
        0 => b,
        _ => {
            for _ in 0..n {
                let c = a + b;
                a = b;
                b = c;
            }
            b
        }
    }
}

// Define the benchmark function
fn bench_parallel_map(c: &mut Criterion) {

    // Benchmark the parallel_map function with different input sizes
    let mut group = c.benchmark_group("parallel_map");
    for n in [150, 1000, 5_000, 20_000].iter() {
        group.bench_with_input(format!("crossbeam_chan_dup_{}", n), n, |b, &n| {
            b.iter(|| multithread_map_crossbeam_channel(gen_data(n), dup))
        });
        group.bench_with_input(format!("kanal_chan_dup_{}", n), n, |b, &n| {
            b.iter(|| multithread_map_kanal_channel(gen_data(n), dup))
        });

        group.bench_with_input(format!("crossbeam_chan_fib_{}", n), n, |b, &n| {
            b.iter(|| multithread_map_crossbeam_channel(gen_data(n), fib))
        });
        group.bench_with_input(format!("kanal_chan_fib_{}", n), n, |b, &n| {
            b.iter(|| multithread_map_kanal_channel(gen_data(n), fib))
        });
    }
    group.finish();
}


criterion_group!(benches, bench_parallel_map);
criterion_main!(benches);

The benchmark output for Apple M1 is in the attachment. I also ran a similar experiment on an old 4-core i5 on Linux, and crossbeam beats kanal there too.

bench.txt

Access to `AsyncSignal` is not sound

I see that others have made related PRs and suggestions, but in my opinion these came short of fully solving the problem.

The issue is that the AsyncSignal::poll method takes &mut self, and at the same time AsyncSignal::send can be accessed with &self. In theory, this is in itself UB, though the compiler has escape hatches for such cases (see below). I see that it has been suggested to use UnsafeCells to store members, but this is IMO not a solution: an UnsafeCell is only correct if you have several coexisting &self and one of them wants to access a member by mutable reference; it is still incorrect if one of them is an &mut self.

For instance, say that you store the waker in an UnsafeCell<Waker>. In the poll method, the compiler is now free to assume that you have mutable access to this UnsafeCell, but unfortunately this will also make it assume that it automatically has mutable access to the Waker. In fact, you will see that the method UnsafeCell::get_mut(&mut self) is a safe method for this very reason. So now the compiler is free to make optimizations based on the assumption that nobody reads or writes the Waker at the same time, which of course is incorrect since send does access the waker concurrently.

One solution (or rather a temporary work-around until Rust has a real solution for that) is to wrap the state in an Aliasable and the waker in an Aliasable<UnsafeCell>. Unfortunately Aliasable does not exist yet in std, but I would trust the pinned-aliasable crate due to the credentials of its author. Her repo is also a very good read for understanding these kinds of issues.

Miri error when forgetting Box<T>

Miri reports an error in the library when sending Box<T> in a sync context.
In the one-shot tests:

std::mem::forget(data);
 ^^^^ constructing invalid value: encountered a dangling box (use-after-free)

In the sync tests:

626 | forget(data);
    |  ^^^^ Data race detected between (1) Write on thread `<unnamed>` and (2) Write on thread `<unnamed>` at alloc591188. (2) just happened here

Miri reports that forgetting the successfully moved Box<T> is a use-after-free.
We have drop tests in our test suite; if there were a drop error, it should show up there too.
I'm unsure about this report as forgetting a Box does nothing to it, but I'll keep this issue open until we find out whether it's a real problem or not.

`AsyncReceiver::try_recv` never returns `Err` with multiple senders

Thanks for developing such an excellent project!

I want to use kanal by sending via async with multiple senders and a single sync receiver.

But on 256a0e0 this code never ends.
Complete project here

Environment: wsl2

#[tokio::main]
async fn main() {
    const FUTURES: usize = 50;
    const N_SEND_PER_FUTURE: usize = 1000;

    let (tx, rx) = kanal::unbounded_async();

    let _futures = (0..FUTURES)
        .map(move |_| {
            let tx = tx.clone();
            tokio::spawn(async move {
                for _ in 0..N_SEND_PER_FUTURE {
                    tx.send(()).await.unwrap();
                }
                /*
                // Works for FUTURES = 1
                tx.close();
                */
            })
        })
        .collect::<Vec<_>>();

    let mut counter = 0;
    loop {
        match rx.try_recv() {
            Ok(Some(_)) => {
                counter += 1;
            }
            Ok(None) => {/* always reach here after complete receive */}
            Err(_) => {
                break;
            }
        }
    }
    /*
    // This is OK
    while let Ok(_) = rx.recv().await {
        counter += 1;
    }
    */

    assert_eq!(counter, FUTURES * N_SEND_PER_FUTURE);
    dbg!(counter);
}

Bounded Async Channel is 100x worse than kanal sync channel when system is loaded

I ran the benchmarks on a system where another process was using nearly 100% CPU on all cores while the benchmarks were running.

For the bounded channel with size 0 or 1, the performance of kanal-async (regardless of which lock implementation I used), ranged from ~15x worse to over 200x worse than the sync kanal implementation.

I'm not sure of the cause, and I'm filing this as a separate bug from #11 because it's not a complete hang (the benchmarks still completed), but it demonstrates there are substantial performance issues for kanal-async when on a loaded system.

image

Add Oneshot implementation

Given what you already have, an async oneshot without Clone, which I could use to synchronize pairs of threads, would not be complicated to add.
Cargo features to reduce compilation time would also be welcome.
And I can help!

Async can't utilize `KanalPtr` as effectively as Sync

With the addition of KanalPtr in 92c8f58 and 65e9102, we got these performance gains in the SPSC benchmark for a usize bounded(0) channel:

  sync_kanal      1333923 messages per second
  sync_pre7        920740 messages per second
  sync_gain         30.98 percent
  async_current   9843154 messages per second
  async_pre7      9723400 messages per second
  async_gain         1.22 percent

As you can see, we got a 31 percent performance boost in sync but only about 1 percent in async.
I need some help reviewing the async part to find out why we didn't receive a similar gain as sync.
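For readers comparing these figures: the reported gain values match the throughput difference divided by the new throughput. Measured against the old (pre7) throughput instead, the same numbers give roughly 44.9 percent for sync and 1.23 percent for async. The helper names below are hypothetical and only reproduce that arithmetic.

```rust
/// Gain as a percentage of the new throughput (this reproduces the reported figures).
fn gain_vs_new(new: f64, old: f64) -> f64 {
    (new - old) / new * 100.0
}

/// Gain as a percentage of the old throughput (the more common "speedup" convention).
fn gain_vs_old(new: f64, old: f64) -> f64 {
    (new - old) / old * 100.0
}

/// Returns (sync vs new, sync vs old, async vs new, async vs old) for the
/// message rates quoted in the issue.
fn reported_gains() -> (f64, f64, f64, f64) {
    let (sync_new, sync_old) = (1_333_923.0, 920_740.0);
    let (async_new, async_old) = (9_843_154.0, 9_723_400.0);
    (
        gain_vs_new(sync_new, sync_old),
        gain_vs_old(sync_new, sync_old),
        gain_vs_new(async_new, async_old),
        gain_vs_old(async_new, async_old),
    )
}
```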

A few notes

Just saw this on Reddit, and I think I have to leave a few notes on the code.

I don't want to demotivate you and can offer to send in a few PRs or help abstract unsafe stuff into concise modules. But since this is on crates.io, these issues should definitely be publicly addressed.

I didn't get through the whole code within the few minutes I had, and I also cannot even remotely prove that the concept is sound or unsound yet, but these are a few of my first findings (not ordered in any particular order):

  • That looks like it could reactivate terminated Signals and also result in two instances with an unlocked state. Also, a SAFETY comment would be nice in places like that, since the reader has no clue why this is supposed to be sound.

    kanal/src/signal.rs

    Lines 193 to 194 in d8f816a

    if !self.state.unlock() {
        self.state.force_unlock();

  • In a lot of places, there are separate branches for ZSTs, that make the code harder to read, but end up being ignored by the compiler anyways (godbolt).

    kanal/src/signal.rs

    Lines 76 to 83 in d8f816a

    unsafe fn read_ptr<T>(ptr: *const T) -> T {
        if std::mem::size_of::<T>() > 0 {
            std::ptr::read(ptr)
        } else {
            // for zero types
            std::mem::zeroed()
        }
    }

  • Also, in some cases, these branches also change the code's behaviour, since ZSTs are dropped at a different point in time. Since the Drop implementation can have side effects (i.e. modify thread locals) that's not quite right (playground).

    kanal/src/signal.rs

    Lines 116 to 122 in d8f816a

    pub unsafe fn send(&self, d: T) {
        if std::mem::size_of::<T>() > 0 {
            *self.data = ManuallyDrop::new(d);
        }
        self.state.store(UNLOCKED);
        self.wake();
    }

  • Mutex<Cell<T>> seems odd. Not sure what the intention of that is.

    pub struct WakerStore(Mutex<Cell<Option<Waker>>>);

  • SyncSignal has unconstrained Send + Sync impls. Using this (not used currently, as far as I can see), it is possible to send !Send values to other threads. That can lead to data races, double frees, and use-after-free.

    kanal/src/signal.rs

    Lines 165 to 166 in d8f816a

    unsafe impl<T> Send for SyncSignal<T> {}
    unsafe impl<T> Sync for SyncSignal<T> {}

  • There are a lot of pub items that are not publicly reachable. This makes the code a bit harder to read, since it's never clear whether something is a public-facing API. And with some of the functions in the crate, you really don't want them to be pub. pub(crate) could help here.

  • acquire_internal(&self.internal) vs. self.internal.lock() seems to decrease readability while not really saving characters.

    kanal/src/internal.rs

    Lines 12 to 14 in d8f816a

    pub fn acquire_internal<T>(internal: &'_ Internal<T>) -> MutexGuard<'_, ChannelInternal<T>> {
        internal.lock()
    }

  • forget is free, so there's no need to 'optimize' it away (godbolt)

    kanal/src/lib.rs

    Lines 198 to 200 in d8f816a

    if std::mem::needs_drop::<T>() {
        std::mem::forget(data);
    }
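The points about zero-sized types can be checked with a small standalone sketch. `Token` and `DROPS` are hypothetical names; the sketch only shows that a ZST can carry an observable Drop side effect and that `std::ptr::read` handles ZSTs without a special branch. It is not taken from kanal.

```rust
use std::mem::ManuallyDrop;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Counts how many `Token` values have been dropped.
static DROPS: AtomicUsize = AtomicUsize::new(0);

/// Zero-sized type whose Drop has an observable side effect.
struct Token;

impl Drop for Token {
    fn drop(&mut self) {
        DROPS.fetch_add(1, Ordering::SeqCst);
    }
}

/// Returns (size of Token, drops seen before the explicit drop, drops seen after it).
fn demo_zst_drop() -> (usize, usize, usize) {
    let size = std::mem::size_of::<Token>();
    let baseline = DROPS.load(Ordering::SeqCst);

    // Keep the original from being dropped so that only the read-out copy counts.
    let original = ManuallyDrop::new(Token);
    // ptr::read is valid for ZSTs as long as the pointer is non-null and aligned.
    let copy: Token = unsafe { std::ptr::read(&*original as *const Token) };

    let before = DROPS.load(Ordering::SeqCst) - baseline;
    drop(copy); // the side effect happens exactly here
    let after = DROPS.load(Ordering::SeqCst) - baseline;
    (size, before, after)
}
```

Skipping the write or read for ZSTs therefore changes when (or whether) such a side effect runs, which is the behavioural difference the third bullet describes.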

Any chance to update the benchmarks for `tokio`?

I tried to find where you're generating/running these benchmarks, but failed to do so.

Could you add cases for Tokio's MPSC channel? It supports both bounded and unbounded modes. It's generally very competitive with all other asynchronous MPSC channel implementations, so not seeing it here feels a bit weird.

Undefined behaviour inside library

Miri reports a lot of undefined behaviour, and some of it is easy to confirm.

For example

running 13 tests
test kanal_tests::tests::mpmc_0 ... error: Undefined Behavior: trying to retag from <272286> for SharedReadWrite permission at alloc112011[0x10], but that tag does not exist in the borrow stack for this location
   --> src/signal.rs:294:34
    |
294 |             Signal::Sync(sig) => (**sig).recv(),
    |                                  ^^^^^^^^^^^^^^
    |                                  |
    |                                  trying to retag from <272286> for SharedReadWrite permission at alloc112011[0x10], but that tag does not exist in the borrow stack for this location
    |                                  this error occurs as part of retag at alloc112011[0x0..0x18]
    |
    = help: this indicates a potential bug in the program: it performed an invalid operation, but the Stacked Borrows rules it violated are still experimental
    = help: see https://github.com/rust-lang/unsafe-code-guidelines/blob/master/wip/stacked-borrows.md for further information
help: <272286> was created by a SharedReadWrite retag at offsets [0x0..0x18]
   --> src/signal.rs:184:22
    |
184 |         Signal::Sync(self as *mut Self)
    |                      ^^^^
help: <272286> was later invalidated at offsets [0x10..0x11] by a write access
   --> src/state.rs:119:9
    |
119 | /         self.v
120 | |             .compare_exchange(
121 | |                 LOCKED,
122 | |                 LOCKED_STARVATION,
123 | |                 Ordering::SeqCst,
124 | |                 Ordering::Relaxed,
125 | |             )
    | |_____________^
    = note: BACKTRACE:
    = note: inside `signal::Signal::<i32>::recv` at src/signal.rs:294:34
note: inside `Receiver::<i32>::recv` at src/lib.rs:430:25
   --> src/lib.rs:430:25
    |
430 |             unsafe { Ok(p.recv()) }
    |                         ^^^^^^^^
note: inside closure at src/kanal_tests.rs:41:36
   --> src/kanal_tests.rs:41:36
    |
41  |                         assert_eq!(rx.recv().unwrap(), 1);
    |                                    ^^^^^^^^^

This is easy to confirm:
One thread calls send, which locks an Internal and then (sometimes) registers a SyncSignal by pushing a *mut SyncSignal onto a list.

Thread 1 still has live mutable access to this SyncSignal (let's name it sig). From its point of view, no one else can modify it or its inner fields; even creating a second &mut to the same object is undefined behaviour in Rust. This also holds for its State field, which contains an atomic. Having a &mut reference (even to an atomic) guarantees to the compiler that the memory pointed to cannot be changed by anyone else.

Thread 2 (the receiver) pops this *mut SyncSignal from the list and dereferences the pointer to call recv on it (that dereference already introduces undefined behaviour, since two &mut references to sig are now live on two threads at the same time).

You might argue that Rust's rules are too restrictive, but you also end up doing a write while these two mutable references are live. The first thread (the sender), while waiting, writes its state with self.state.upgrade_lock(), effectively introducing a race condition.
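For contrast, here is a minimal sketch of the pattern that does not violate the aliasing rules: both threads hold only shared references, and all mutation goes through the atomic. The struct below is a hypothetical reduction of SyncSignal to a single state field, and the numeric values of `LOCKED` and `UNLOCKED` are assumptions made for this sketch.

```rust
use std::sync::atomic::{AtomicU8, Ordering};
use std::thread;

// Hypothetical values; the real constants live in kanal's state module.
const UNLOCKED: u8 = 0;
const LOCKED: u8 = 1;

/// Reduced stand-in for SyncSignal: only the atomic state is kept.
struct SyncSignal {
    state: AtomicU8,
}

fn demo_shared_signal() -> u8 {
    let sig = SyncSignal { state: AtomicU8::new(LOCKED) };
    thread::scope(|s| {
        // Both sides use `&SyncSignal`; no `&mut` is ever created while the
        // other thread can still reach the signal.
        let shared: &SyncSignal = &sig;
        s.spawn(move || shared.state.store(UNLOCKED, Ordering::Release));
        while shared.state.load(Ordering::Acquire) == LOCKED {
            thread::yield_now();
        }
    });
    sig.state.load(Ordering::Acquire)
}
```

Atomics permit mutation through `&self`, so this waits and signals without either thread claiming exclusive access.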

mixing sync and async context

I saw the following comment in the AsyncSender impl

// JUST FOR EXAMPLE IT IS WRONG TO USE SYNC INSTANCE IN ASYNC CONTEXT

why is this the case?

unsafe use of pointer to object on stack

let mut sig = SyncSignal::new(&mut data as *mut T, std::thread::current());

Here a pointer to data is taken, and data is then forgotten. However, data lives on the stack, so it may be corrupted by the time it is read again. Do you have a justification for this?

In Go this is safe, because Go performs escape analysis: any object whose reference escapes is allocated on the garbage-collected heap. Rust does no such thing.

Data race with waker access

Hi Khashayar,

As promised, I had a look at kanal to see which ideas I could steal ;-)

I do like very much the idea of writing directly to receivers, but realized that this is already basically what I do in my queue since I have a unique receiver (it's only an MPSC) and I only use atomic stores and loads on a "stamp" to signal the presence of an item in the queue, much like the LOCK and UNLOCK in your *Signal primitives.

I was therefore surprised that kanal could be as fast as tachyonix in many cases (or even faster at N=1), so I wanted to see how you achieved faster waker synchronization. But as I discovered it looks like waker access is not synchronized at all in kanal and is subject to data races.

For instance here:

kanal/src/signal.rs

Lines 22 to 32 in 79162a6

fn poll(
    mut self: std::pin::Pin<&mut Self>,
    cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
    let v = self.state.value();
    if v < LOCKED {
        return std::task::Poll::Ready(v);
    }
    {
        self.waker = Some(cx.waker().clone())
    }

the waker is accessed mutably, but there is nothing that prevents a sender from accessing it concurrently here:

kanal/src/signal.rs

Lines 83 to 87 in 79162a6

pub unsafe fn send(this: *const Self, d: T) {
    if std::mem::size_of::<T>() > 0 {
        *(*this).data = d;
    }
    let waker = (*this).waker.as_ref().unwrap().clone();

2 options that would solve the problem:

  1. you could make sure that you acquire and keep the global lock with acquire_internal when you enter poll. However, you would also need to keep the lock a little longer in try_send so that the lock is still held when calling AsyncSignal::send (right now the lock guard is dropped just before). This should work, but you should then be extremely careful to document that all methods which access the waker (like send) can only be called when the caller holds the lock.

  2. you could use an eventcount system, like the event-listener crate, or adapt the one from tachyonix which I think is slightly faster (but similar).

Whichever way you go, you will probably find that performance will suffer a lot, because waker synchronization is one of the most expensive operations for an async channel.
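A minimal sketch in the spirit of option 1, assuming a per-signal mutex instead of kanal's global lock (that substitution is mine, not the issue author's). The `Signal`, `Inner`, and `CountingWaker` types are hypothetical. The readiness flag and the waker slot are read and written only under the same lock, so the poll side and the send side can never touch the waker concurrently.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

/// Hypothetical signal state; both fields are guarded by one mutex.
struct Inner {
    ready: bool,
    waker: Option<Waker>,
}

struct Signal {
    inner: Mutex<Inner>,
}

impl Signal {
    /// Poll side: returns true if ready, otherwise registers the waker under the lock.
    fn poll_ready(&self, waker: &Waker) -> bool {
        let mut guard = self.inner.lock().unwrap();
        if guard.ready {
            return true;
        }
        guard.waker = Some(waker.clone());
        false
    }

    /// Send side: marks ready and takes the waker under the same lock,
    /// then wakes after releasing it.
    fn notify(&self) {
        let waker = {
            let mut guard = self.inner.lock().unwrap();
            guard.ready = true;
            guard.waker.take()
        };
        if let Some(w) = waker {
            w.wake();
        }
    }
}

/// Test waker that only counts how often it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

fn demo_signal() -> (bool, usize, bool) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(Arc::clone(&counter));
    let signal = Signal { inner: Mutex::new(Inner { ready: false, waker: None }) };

    let first = signal.poll_ready(&waker); // not ready yet, waker registered
    signal.notify();
    let wakes = counter.0.load(Ordering::SeqCst);
    let second = signal.poll_ready(&waker); // ready now
    (first, wakes, second)
}
```

As the issue warns, taking a lock on every poll has a cost; the sketch illustrates correctness, not performance.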

As a general note, I think this issue and others could be caught by using Loom pervasively. I have personally spent much more time trying to "break" my implementation with Loom tests than I have spent writing the code itself, and I feel that it was worth every minute I invested in it.

pointer bugs

clone: https://github.com/beckend/kanal-bug
sometimes it hangs forever, sometimes you get that below:

❯ cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.02s
     Running `target/debug/p-73e9ac9c-8c15-4a85-aa6a-679d60b19f96`
free(): invalid pointer
fish: Job 1, 'cargo run' terminated by signal SIGABRT (Abort)
❯ cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.02s
     Running `target/debug/p-73e9ac9c-8c15-4a85-aa6a-679d60b19f96`
free(): double free detected in tcache 2
fish: Job 1, 'cargo run' terminated by signal SIGABRT (Abort)
❯ cargo run --release
    Finished release [optimized] target(s) in 0.02s
     Running `target/release/p-73e9ac9c-8c15-4a85-aa6a-679d60b19f96`
fish: Job 1, 'cargo run --release' terminated by signal SIGSEGV (Address boundary error)

Feature: implement recv_timeout for AsyncReceiver

As the method recv_timeout is already implemented for Receiver, will it be implemented for AsyncReceiver in the future? Receiver::recv_timeout blocks the thread. AsyncReceiver::recv_timeout should yield the thread and return a failure if it times out, which results in better concurrent performance. In many scenarios, it is important to return a failure if a timeout occurs.

Hangs on async benchmarks

kanal hangs for me 99% of the time in all async benchmarks, both with the official benchmarks and with Tachyobench.

I initially suspected a problem with async notifications, but since it uses 100% CPU on all threads when hanging, I am starting to think it might actually be due to the use of spin locks, though I don't know for sure.

Slow usize send when using `MiMalloc`

I'm not sure of the details here, but switching to MiMalloc dramatically increases the time it takes to send a usize-sized value (tested with both usize and Box) between two threads.

Before (standard system allocator, M1 Max on macOS Sonoma)
Screenshot 2024-02-06 at 13 04 08

After

use mimalloc::MiMalloc;

#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
Screenshot 2024-02-06 at 13 05 21

This is consistent over multiple runs. I'm running in release mode and using quanta to measure timings.

Intrusive variants

Thanks for making this library, the design is very cool.

I have a high performance scenario where I want to avoid heap allocations, and am happy to manage memory more manually.

Would it be possible to provide "intrusive" variants of the oneshot channels where the user provides heap memory to initialise, and have a callback for drop?

In my case, I have a mbuf-style implementation with a custom allocator, where the mbufs are reference counted, and it would be nice to be able to embed the kanal values into the mbuf header. I can treat kanal's OneshotInternal as a live reference to the mbuf itself, which would work nicely. It would enable me to acquire an IO request buffer with metadata + data + completion notification mechanism in one alloc.

Feature: Provide a select! macro

Select would be very helpful especially when one channel is used to communicate cancellation while another is communicating elements.

However, I did not test closing a channel, but coming from a Go background, it would be very nice to have a select statement.

Lost wake-ups due to race

This is a second issue which is related but not the same as #14. Unlike #14, it is a regular race and not a data race, so AFAICT it does not expose any UB, but it will lead to lost wake-ups (a receiver could never wake up even though data is available). As I write below, I suspect that a variation of this issue exists as well in the SyncSignal, but I haven't reviewed the latter.

The problem can be seen for instance in the interaction between these 2 methods:

kanal/src/signal.rs

Lines 22 to 39 in 79162a6

fn poll(
    mut self: std::pin::Pin<&mut Self>,
    cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
    let v = self.state.value();
    if v < LOCKED {
        return std::task::Poll::Ready(v);
    }
    {
        self.waker = Some(cx.waker().clone())
    }
    let v = self.state.value();
    if v >= LOCKED {
        std::task::Poll::Pending
    } else {
        std::task::Poll::Ready(v)
    }
}

kanal/src/signal.rs

Lines 83 to 90 in 79162a6

pub unsafe fn send(this: *const Self, d: T) {
    if std::mem::size_of::<T>() > 0 {
        *(*this).data = d;
    }
    let waker = (*this).waker.as_ref().unwrap().clone();
    (*this).state.store(UNLOCKED);
    waker.wake();
}

A race would appear in this scenario:

  1. poll reads LOCKED at line 26
  2. send reads an old (no longer actual) waker at line 87
  3. poll sets a new waker at line 31 and reads the state again at line 33, but since the read is just a SeqCst read, with respect to the waker store (a relaxed operation) it just acts as an Acquire operation, meaning it may be reordered to
    a) first: read LOCKED again
    b) then: set new waker
  4. send stores UNLOCKED and wakes the old waker: the receive task will never wake up.

As suggested in issue #14, I would strongly urge you to instrument all code with Loom (use loom::cell::UnsafeCell etc) as this would definitely have caught these types of races.

On the bright side, any of the 2 solutions I suggested in #14 should also solve this race.

I suspect that a similar issue exists in SyncSignal, but this is only based on a gut feeling, because eventcount-type primitives (which were originally invented for classical blocking code such as SyncSignal) will nearly always need to use full fences or Read-Modify-Write operations to avoid such issues, and I don't see any in your code.
