crossbeam-channel's Issues

different behavior between while and for loops

I'm finding that while loops seem to process the data; for loops, however, appear to deadlock. An example would be the following:

extern crate crossbeam_channel;

use std::thread;
use crossbeam_channel as channel;

fn run(urls: Vec<String>, paths: &Vec<String>) {
    let (s, r) = channel::bounded(0);
    for (thread_n, url) in urls.iter().enumerate() {
        let thread_r = r.clone();
        let url = url.clone();
        thread::spawn(move || {
            //while let Some(path) = thread_r.recv() {
            for path in thread_r.recv() {
                println!("INFO[{}]: {}{}", thread_n, url, path);
            }
        });
    }

    for path in paths.iter() {
        println!("DEBUG: path: {}", path);
        s.send(path.clone());
    }
}

fn main() {
    let urls = vec![
        "url1".to_string(),
        "url2".to_string(),
    ];
    let paths = vec![
        "/path1".to_string(),
        "/path2".to_string(),
        "/path3".to_string(),
        "/path4".to_string(),
        "/path5".to_string(),
        "/path6".to_string(),
    ];
    run(urls, &paths);
    println!("Done");
}

The for loop produces the following output, at which point I need to terminate the program because it seems to have stalled:

DEBUG: path: /path1
DEBUG: path: /path2
INFO[0]: url1/path1
DEBUG: path: /path3
INFO[1]: url2/path2
^C

If we instead uncomment the while loop and comment out the for loop, the code produces the following:

DEBUG: path: /path1
INFO[0]: url1/path1
DEBUG: path: /path2
DEBUG: path: /path3
INFO[0]: url1/path2
DEBUG: path: /path4
DEBUG: path: /path5
INFO[1]: url2/path3
INFO[0]: url1/path4
INFO[1]: url2/path5
DEBUG: path: /path6
INFO[0]: url1/path6
Done

I'm not sure if this is a bug. The following is from rustup show:

rustup show
Default host: x86_64-unknown-linux-gnu

installed toolchains
--------------------

stable-x86_64-unknown-linux-gnu
nightly-2017-12-21-x86_64-unknown-linux-gnu
nightly-x86_64-unknown-linux-gnu (default)

installed targets for active toolchain
--------------------------------------

armv7-unknown-linux-gnueabihf
x86_64-unknown-linux-gnu

active toolchain
----------------

nightly-x86_64-unknown-linux-gnu (default)
rustc 1.28.0-nightly (1d4dbf488 2018-06-11)

Unwind safety?

I wonder whether the crossbeam_channel sender is unwind-safe?

recv_timeout()

Is there a way to use a crossbeam_channel::Receiver in a manner equivalent to the recv_timeout() method of the mpsc::Receiver of the standard library? As far as I can see, the closest equivalent would be polling in a loop but that would consume extra CPU and introduce variable latency dependent on the delay of the polling loop. If there isn't a way to do this, are there plans to add a recv_timeout() method?
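
For reference, the polling workaround mentioned above would look roughly like the sketch below (illustrative only; it assumes the 0.2-era try_recv() that returns Option<T>, and the poll interval is an arbitrary choice):

extern crate crossbeam_channel;

use std::thread;
use std::time::{Duration, Instant};

// Emulate recv_timeout() by repeatedly calling try_recv() until a message
// arrives or the deadline passes. The sleep bounds both the wasted CPU and
// the extra latency.
fn recv_timeout<T>(r: &crossbeam_channel::Receiver<T>, timeout: Duration) -> Option<T> {
    let deadline = Instant::now() + timeout;
    loop {
        if let Some(msg) = r.try_recv() {
            return Some(msg);
        }
        if Instant::now() >= deadline {
            return None;
        }
        thread::sleep(Duration::from_millis(1));
    }
}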

Complex wording

I know that this might be a weird issue, but I decided to create it anyway.

A channel can be created by calling bounded or unbounded. The former creates a channel of bounded capacity (i.e. there is a limit to how many messages it can hold), while the latter creates a channel of unbounded capacity (i.e. it can contain an arbitrary number of messages).

While reading the documentation, this sentence was pretty complicated to understand, and I needed to look up the meaning of "the former" and "the latter". I think that a lot of non-native English speakers might have problems with that construct.

If you think this is not a problem, feel free to close this issue.

Simplified send in selection

The trickiest thing about send operations in selection is that we have to give ownership of the message to each call to send, and then regain ownership if sending fails. This also means that the message has to be bound to a mutable variable and in select_loop! we have to use something like send(tx, mut msg).

I've just realized that in all three underlying channel implementations (the list-based, array-based, and exchanger-based flavors) we send messages in two steps: first we acquire a slot in the queue, and on success we finally supply the message.

Here's an example for illustration. Currently, we send messages in select like this:

let mut msg = "foo".to_string();
let mut sel = Select::new();
loop {
    if let Err(err) = sel.send(&tx, msg) {
        msg = err.0;
    } else {
        break;
    }
}

But with an explicit two-step approach instead of ownership juggling we could have:

let msg = "foo".to_string();
let mut sel = Select::new();
loop {
    if let Ok(s) = sel.send(&tx) {
        s.finish(msg);
        break;
    }
}

This is very similar to the upcoming placement-new syntax.

Note that this would remove one more road bump from select_loop! - if you write send(tx, msg), then msg will be evaluated at most once rather than on every iteration of the loop.

cc @TimNN

Syntax for timeouts in select

Now that #41 is almost finished, I'm thinking about the syntax for timeouts in the new select! macro. There are two options I'm exploring:

First option

let timeout = Duration::from_secs(1);
select! {
    recv(r, msg) => println!("got {:?}", msg),
    default(timeout) => println!("timed out"),
}

The default case optionally accepts a timeout (of type Duration) or a deadline (of type Instant). It can also accept Option<Duration> and Option<Instant>, where None simply means the default case will never get fired.

Second option

We define these two helper functions, like in Go and the chan crate:

fn after(dur: Duration) -> Receiver<()>;
fn tick(dur: Duration) -> Receiver<()>;

The default case now doesn't accept any arguments, but one can still handle timeouts by creating a channel using one of the two helper functions:

let timeout = Duration::from_secs(1);
select! {
    recv(r, msg) => println!("got {:?}", msg),
    recv(channel::after(timeout)) => println!("timed out"),
}

Functions after and tick don't spawn threads - they're very cheap and don't allocate anything (the logic is handled in a special receiver variant), so there would be none of the performance concerns that exist in the chan crate.


So the questions are:

  1. Should default be able to accept a timeout/deadline as an argument?
  2. Do we need functions after and tick?

cc @BurntSushi Do you have an opinion on this?
cc @tekjar You asked for a ticker in #36. Any thoughts about this idea?

add into() implementation for std::sync::mpsc::TrySendError

Use Case

I would like for the ch! macro to be able to move to its own crate and be usable for any current or future non-blocking channel. I think this would even allow it to be usable by a futures-channel crate (maybe with a bit of added syntax for the Context argument), though that isn't really known yet.

Feature request

Add into() to convert to the stdlib type, or use the stdlib type directly.

Reasoning

As far as I can tell, these types are basically identical. std::sync::mpsc::TrySendError:

pub enum TrySendError<T> {
    Full(T),
    Disconnected(T),
}

crossbeam_channel::TrySendError:

pub enum TrySendError<T> {
    Full(T),
    Disconnected(T),
}

The same is true for TryRecvError (from std):

pub enum TryRecvError {
    Empty,
    Disconnected,
}
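
For illustration, the requested conversion might look something like the sketch below. It is not existing API in either crate, and coherence rules mean it would have to live inside crossbeam-channel itself rather than in downstream code:

// Hypothetical: maps crossbeam_channel::TrySendError onto the std variant with the same name.
impl<T> From<TrySendError<T>> for std::sync::mpsc::TrySendError<T> {
    fn from(err: TrySendError<T>) -> Self {
        match err {
            TrySendError::Full(msg) => std::sync::mpsc::TrySendError::Full(msg),
            TrySendError::Disconnected(msg) => std::sync::mpsc::TrySendError::Disconnected(msg),
        }
    }
}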

Select should use runtime checks to make sure its rules are not broken

The selection mechanism is brittle - it requires you to follow a list of rules in order for selection to work properly. If one of those rules is broken, channels might run into deadlocks or livelocks. Instead of running into such weird behavior, we should fail as early as possible by panicking.

One idea would be to compute some sort of hash of all selection cases probed in each iteration of the loop. If we notice that two iterations have different hashes, that must mean cases were not probed in the same order, or some cases are missing, or we have extra cases, or maybe even duplicate cases. And then we should panic.

The select_loop! macro has opportunities for automatically inserting additional checks. For example, the macro could desugar to:

let mut sel = Select::new();
crossbeam_channel::_select_loop_start();
loop {
    // Probe each case...

    crossbeam_channel::_select_loop_step();
}
crossbeam_channel::_select_loop_end();

We should probably also check:

  1. That there are no duplicate timeout, would block, or disconnected cases.
  2. That selection finished (check inside the destructor of Select).
  3. That the same instance of Select isn't used for two different selections.

Broadcasting

I fairly often see people on IRC asking for broadcast channels. Sending a message into a broadcast channel means each receiver will receive a clone of the message. This is in contrast to normal channels, where only one receiver will receive it.

If we're going to add support for broadcasting, we must take into consideration integration with selection and how to work out blocking operations. I'd prefer not to include a broadcasting channel into this crate if it doesn't work with the selection mechanism.

I think the easiest solution would be to introduce a new type Broadcast<T>, which is just a thin wrapper around RwLock<Vec<Sender<T>>> (of course, we can always do something to avoid the cost of locking/unlocking). There could be a method Broadcast::register that creates a new unbounded channel, adds the new sender to the list, and returns the new receiver. Broadcasting a message would iterate through the list and send a clone of the message to each sender. If sending the message fails due to disconnection, the sender is removed from the list.
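
A minimal sketch of that wrapper, with illustrative names; it assumes a send() that reports disconnection through a Result, so that dead senders can be pruned from the list:

use std::sync::RwLock;

use crossbeam_channel::{unbounded, Receiver, Sender};

pub struct Broadcast<T> {
    senders: RwLock<Vec<Sender<T>>>,
}

impl<T: Clone> Broadcast<T> {
    pub fn new() -> Self {
        Broadcast { senders: RwLock::new(Vec::new()) }
    }

    // Creates a new unbounded channel, remembers its sender, and hands back the receiver.
    pub fn register(&self) -> Receiver<T> {
        let (s, r) = unbounded();
        self.senders.write().unwrap().push(s);
        r
    }

    // Sends a clone of `msg` to every registered receiver, dropping senders
    // whose receivers have disconnected.
    pub fn broadcast(&self, msg: T) {
        let mut senders = self.senders.write().unwrap();
        senders.retain(|s| s.send(msg.clone()).is_ok());
    }
}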

Supporting bounded broadcast channels is kinda tricky, and I suppose probably not worth the effort. The unbounded version is conceptually simpler because broadcasting in that case never blocks.

Also, should a broadcast channel internally have one message queue and clone messages when receiving, or have a message queue for each receiver and clone messages when sending? The latter approach is much simpler, but the former could be faster and use less memory. The former also requires T: Sync due to concurrent calls to T::clone. I'm leaning towards the latter because it's so much easier.

Another idea is to avoid introducing the new type Broadcast<T> and simply reuse the Sender/Receiver API by introducing a new channel flavor. There'd be a constructor broadcast() that returns (Sender<T>, Receiver<T>) (just like the other two constructors). The constructor would create a sender and receiver of a special broadcast flavor and save a function pointer to T::clone so that it can be used later without requiring a T: Clone bound in the Sender/Receiver API. The main issue with this idea is that it's hard to fit a register method into the Sender API - the only way to register more receivers would be by cloning them (Receiver::clone would be special-cased for the broadcast flavor).

cc @dbrgn Relevant to our IRC discussion.
cc @jonhoo As the author of bus, you might be interested. If you've got the time, your opinion would be very appreciated!

Lessons to be taken from channels in Go?

I'd like to take a step back and challenge some of the core design decisions in crossbeam-channel, std::sync::mpsc and futures::sync::mpsc.

Motivation: select! for crossbeam-channel is hard to get right

The current select_loop! macro is kinda silly (it's a loop, you can't use break/continue inside it, it causes potentially subtle side effects on every iteration). I'm trying to come up with a new, nicer macro, with fewer surprises, and without the implicit loop.

This is what seems like the best solution so far:

select! {
    recv(receiver, msg)     => {} // `msg` is of type `T`
    send(sender, foo + bar) => {} // sends message `foo + bar`
    closed(receiver)        => {} // fires when `receiver` is closed
    closed(sender)          => {} // fires when `sender` is closed
    default(timeout)        => {} // fires when `timeout` expires
}

And this is how it works:

  1. If any of the recv or send cases can complete without blocking, a random one is chosen. The chosen operation is executed and its block (in this example {} for simplicity) is evaluated.
  2. If any of the closed cases are ready (because a channel is closed), it is evaluated.
  3. If the timeout has expired, the default case is evaluated.
  4. Otherwise, we wait until something changes.

Pros:

  • No surprises (apart from the complexity) - it behaves just as one would expect.
  • Very flexible - allows a lot of freedom in choosing how to react to channel events.

Cons:

  • Using a recv or a send case doesn't nudge you into handling the closed case. For example, this is in contrast to a bare Receiver::recv operation, which returns a Result so the compiler advises you to do something with it (like call .unwrap()).
  • The macro looks kind of complicated.
  • Internal implementation is a little challenging (although not too bad).

There were some other different ideas for the macro but I won't go there.

Folks from Servo (more concretely, @SimonSapin and @nox) were cool with this macro idea, although it wasn't the absolute best option for their use case.

@matthieu-m had a good point that the select! macro should ideally be exhaustive in the sense that it makes sure you don't forget edge cases like a channel being unexpectedly closed.

Now let's see if we can somehow force the user to handle the closed case for every recv and send. One idea is to change recv so that it returns a Result<T, RecvError>:

select! {
    recv(receiver, msg) => {
        match msg {
            Ok(m) => println!("got {:?}", m),
            Err(RecvError) => println!("the channel is closed"),
        }
    }
}

But what about the send case? Here's a try:

let mut greeting = String::from("Howdy!");
select! {
    send(sender, greeting, result) => {
        match result {
            Ok(()) => println!("successfully sent"),
            Err(SendError(m)) => greeting = m,
        }
    }
}

That'd work and eliminate the need for the closed case, but it doesn't look very nice. To be fair, users of this select! syntax would typically just panic on 'closed' events:

let mut greeting = String::from("Howdy!");
select! {
    recv(receiver, msg) => {
        let m = msg.expect("the channel is closed");
        println!("got {:?}", m),
    }
    send(sender, greeting, result) => {
        result.expect("the channel is closed");
    }
}

Looks a bit better, but still kind of clumsy.

When does a channel become closed/disconnected?

Depends on the implementation.

  • In std::sync::mpsc: when either the Receiver or all Senders get dropped.
  • In crossbeam-channel: when either all Receivers or all Senders get dropped.
  • In futures::sync::mpsc: when either the Receiver or all Senders get dropped, or when you call Receiver::close.
  • In chan: when all Senders get dropped.
  • In Go: when you call close(sender). It is not possible to close from the receiver side.

These differences are important. The chan crate follows Go's model very closely, so they have very similar behavior. The reason Go allows closing from the sender side is that it follows the principle of unidirectionality.

Unidirectionality means that information flows in one direction only. Closing a channel signals to the receivers that no more messages will arrive, ever. Note that even if all receivers get dropped, sending into the channel works as usual. The whole idea is that the receiver side cannot signal anything to the sender side!

Why do we want unidirectionality? See this and this comment by Russ Cox for an explanation.

Channels in Go

Channels in Go are quite peculiar. They come with a bunch of rules that seem arbitrary at first sight, but are actually well-thought-out. See Channel Axioms and Curious Channels by Dave Cheney. The controversial blog post titled Go channels are bad and you should feel bad is worth a read, too, despite the title.

Here are a few interesting rules:

  1. Sending into a closed channel panics. The idea is that senders have to coordinate among themselves so that the last sender closes the channel.

  2. Receiving from a closed channel returns a 'zero value'. This is like returning None, but Go doesn't have sum types.

  3. Sending into a nil channel blocks forever. This is handy when you want to disable a send operation inside a select - just set the sender to nil!

  4. Receiving from a nil channel blocks forever. This is useful for the same reason the previous rule is. See this StackOverflow answer for an example.

How would we port Go's channels to Rust

Let's solve some of the quirks in Go's channels by using two useful language features of Rust: destructors and sum types.

Keeping the idea of unidirectionality, we change the disconnection behavior: the channel gets closed only when all Senders get dropped. That's it. This means sending into a channel cannot panic because having a sender implies the channel is not closed. Next, receiving from a closed channel returns an Option<T> rather than a 'zero value'. The chan crate follows the same design - see here and here.

I'm feeling very tempted to redesign crossbeam-channel around the philosophy of Go channels:

impl<T> Sender<T> {
    fn send(&self, msg: T);
}

impl<T> Receiver<T> {
    fn recv(&self) -> Option<T>; // `None` if closed.
}

let greeting = String::from("Howdy!");
select! {
    recv(receiver, msg) => {
        // `msg` is of type `Option<T>`
        let m = msg.expect("the channel is closed");
        println!("got {}", m);
    }
    send(sender, greeting) => {
        println!("successfully sent");
    }
    default(timeout) => println!("timed out!"),
}

This is beautifully simple:

  • Easy to understand, especially for programmers coming from the world of Go.
  • No need for those annoying unwraps in sender.send(foo).unwrap(). @BurntSushi is going to like this. :)
  • select! macro doesn't need the closed case.
  • You're advised to handle the possibility of the channel being closed in the recv case. Perhaps we might want to change the Option type to Result.

Some drawbacks:

  • Dropping all receivers doesn't prevent senders from sending. This might or might not be a drawback - we could argue both ways.
  • select! is not as powerful as before. But it probably doesn't matter since all real-world cases should be covered by this simpler version.

Note that we could also accept Option<Sender<T>> and Option<Receiver<T>> in the recv and send cases. That would be equivalent to using nil channels in Go.

We can get all the benefits of Go channels without its weak spots like accidental panics (e.g. sending into a closed channel), accidental deadlocks (e.g. receiving from a nil channel), and incorrect closing (we close automatically when the last Sender gets dropped).

Final words

The simple Go-like channel interface currently seems to me to be sitting in some kind of sweet spot of the design space. I've been thinking about this problem for way too long now, constantly switching from one idea to another. In the end, I'm not sure whether this one is the way to go and need your opinion.

Any thoughts?

cc @arcnmx - you might be interested in this comment, too.

Docs: cloning receivers does *not* clone messages

Some people have assumed that each message sent into the channel will be delivered to all Receivers associated with the channel. It's a perfectly reasonable assumption.

We should clear up the confusion by documenting that cloning a Receiver merely creates another handle to the same channel, and that each message sent into the channel can be received only once.
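
A tiny example that such documentation could include (a sketch assuming the 0.2-era signatures, where send returns () and recv returns Option<T>):

extern crate crossbeam_channel;

use crossbeam_channel::unbounded;

fn main() {
    let (s, r1) = unbounded();
    let r2 = r1.clone(); // another handle to the *same* channel, not a copy of its messages

    s.send("hello");
    s.send("world");

    // Each message can be received only once, by whichever handle takes it first.
    assert_eq!(r1.recv(), Some("hello"));
    assert_eq!(r2.recv(), Some("world"));
}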

Stop selection if *any* channel becomes disconnected.

I've noticed that ipc-channel relies on this behavior in std::sync::mpsc::Select: link to code.

The call to select.wait() will wake up even if just one channel becomes disconnected. This is something crossbeam-channel's selection cannot do right now. We can only wake up when all channels become disconnected.

Perhaps we should have two kinds of disconnection cases, all_disconnected() and any_disconnected()? (these names are not great so I need better ideas)

cc @TimNN

SIGILL in `<crossbeam_channel::flavors::zero::Channel<T>>::recv`

Hi! I am seeing this very weird SIGILL. Unfortunately, I can't give nice steps to reproduce because there are a lot of moving parts here. I am not even sure that it is a problem with crossbeam-channel. However, I think it makes sense to report the issue nonetheless :)

I've recently added crossbeam channel to RLS: rust-lang/rls#923.

I don't actually send any messages and simply wait for channels to close, taking advantage of the select! macro. Specifically, I see SIGILL in this bit of code:

https://github.com/rust-lang-nursery/rls/blob/0b9254b7dcdf52bba50a6e477d7891fb23adf62b/src/concurrency.rs#L44-L57

The most interesting part is that I get SIGILL only when testing RLS inside rust-lang/rust repository. That is, cargo test inside RLS itself works.

I have rust-lang/rust checked out locally at commit 29ee65411c46b8f701bd1f241725092cb1b347e6, and the src/tools/rls submodule checked out at matklad/rls@746b0f4.

With this setup, running ./x.py test src/tools/rls SIGILLs. See the RLS commit above for the precise point where it happens.

So, this is definitely not the most self-contained and easy to reproduce bug report, but this is the best I have at this time :-)

Oneshot channel

We might want to introduce a new type of channel in addition to bounded and unbounded channels:

fn oneshot<T>() -> (Sender<T>, Receiver<T>);

This type of channel internally has only one slot and gets automatically disconnected as soon as one message is sent into it. That means it uses less memory and is faster than unbounded and bounded channels. Another advantage might be that it makes code more robust: if you really need to use the channel only once, then automatic disconnection might prevent bugs or help in debugging.
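
A hypothetical usage sketch (oneshot does not exist in the crate; this assumes use crossbeam_channel as channel; and an Option-returning recv):

// Hypothetical: the channel disconnects itself after the first message is sent.
let (s, r) = channel::oneshot::<String>();
s.send("done".to_string());

assert_eq!(r.recv(), Some("done".to_string()));
assert_eq!(r.recv(), None); // already disconnected after the single message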

The main drawback is that automatic disconnection makes oneshot kind of odd in comparison to other channel types. However, we already have zero-capacity channels, which I find to be even more odd, so perhaps introducing oneshot is not a big deal.

cc @dbrgn

Document how to specify timeouts

The following pattern is used for timeouts:

select! {
    recv(r) => {}
    recv(channel::after(timeout)) => {}
}

We should document it more clearly - perhaps by adding a section to lib.rs.

Specify used go/rust version in benchmarks

It's not clear what versions of Go and Rust were used to produce the sample benchmarks :)

Also it seems that Go (go1.10.3 linux/amd64) is slightly ahead of crossbeam-channel (bounded channel) ;)
plot

System: Linux 4.17.5-1-ARCH #1 SMP PREEMPT
CPU: Intel(R) Core(TM) i7-6700K CPU (4 physical cores/8 logical cores)
Go: go1.10.3 linux/amd64
Rust: rustc 1.29.0-nightly (960f6046c 2018-07-08)

Futures support

Hi. Thanks a lot for this crate. Are there any plans of adding futures support?

Allocate smaller nodes in unbounded channels

If an unbounded channel is created for sending only one message, 31 of the initially allocated 32 slots won't be used at all.

The situation can be improved by starting with a single-slot node in the beginning and allocating exponentially larger nodes for further messages.

Perhaps we should also avoid allocating anything at all in the channel constructor. This might give us some performance wins in cases where no messages are sent through the channel.

Regarding interaction with #80, only nodes of size 32 should be recycled.

Duplicate crossbeam-utils dependency

I noticed that crossbeam-channel 0.1.1 seems to pull in crossbeam-utils twice: v0.1.0 (directly) and v0.2.1 (via crossbeam-epoch).

I'm kind of new to the Rust/Cargo ecosystem, so I'm not sure if this is a practical issue in terms of linking ambiguity, but coming from the Maven ecosystem, it always makes me nervous to see duplicate dependencies with different versions.

Terminating / breaking select_loop

I may have missed a part of the docs, but is it possible to break/terminate a select_loop?

As an example, I have a thread that reads from stdin and pushes the line through a channel. On the other side, I read from that channel:

select_loop! {
  recv(stdin_rx, input) => {
    match &*input.trim() {
      ...
      "q" | "quit" => {
        println!("Goodbye");
        // break
      },
      ...
    }
  },
  ...
}

Is this conditional "break" possible somehow?

Add peek method on Receiver

I would be interested in having a "peek" method for Receiver which would, if a message is available, allow a function to be called with a shared reference to that message. This would allow a thread to do some work or thinking regarding a message before definitively removing it from the channel.

For instance, with a zero sized bounded channel, it could allow the sender to block until the message has been acted upon.
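
A hypothetical signature for such a method, in the same sketch style as the other proposals on this page (peek does not exist in the crate):

impl<T> Receiver<T> {
    // If a message is available, calls `f` with a shared reference to it,
    // leaving the message in the channel, and returns the closure's result.
    // Returns `None` if the channel is currently empty.
    fn peek<R, F: FnOnce(&T) -> R>(&self, f: F) -> Option<R>;
}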

Importing `select` with 2018 edition

(Caution: I'm not sure if this may be a language issue still in the works.)

When I try to do use crossbeam_channel::select to import the macro on Rust 2018, since select! uses two helper macros, I have to import those as well, ending up with:

use crossbeam_channel::{select, __crossbeam_channel_parse, __crossbeam_channel_codegen};
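
One possible workaround, assuming the 2018 edition still accepts the old-style macro import, is to fall back to #[macro_use]:

// Brings select! (and the private helper macros it relies on) into scope the pre-2018 way.
#[macro_use]
extern crate crossbeam_channel;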

Microbenchmarks

We have a few fun benchmarks against other channels, but for finer performance tuning we'll need those typical #[bench] microbenchmarks.

Mac OS support

Does this crate support Mac OS? Several tests fail on the Mac OS platform. You can see details here.

Stopping a receiver when other receiver is active

Hi @stjepang. I'm planning to write an mpsc abstraction over crossbeam-channel that serializes data in the channel to disk when the channel is full. So my sender looks like this:

struct Sender<T> {
    tx: crossbeam_channel::Sender<T>,
    rx: crossbeam_channel::Receiver<T>,
}

struct Receiver<T> {
    rx: crossbeam_channel::Receiver<T>,
}

When the sender's try_send returns a channel-full error, I want to use the sender's rx to serialize data to disk. But to maintain ordering, I want the other receiver to block while the backup is going on. Is there a better way than using an atomic pointer to check whether the queue backup is in progress?

Sorry if this is a wrong place for these questions

Interval support in select_loop

Hi, it would be helpful if select_loop! had support for periodically doing some work.

ticker := time.NewTicker(5 * time.Second)
quit := make(chan struct{})
go func() {
    for {
        select {
        case <-ticker.C:
            // do stuff
        case <-quit:
            ticker.Stop()
            return
        }
    }
}()

I know I can do this with a separate thread that pushes interval events into the channel, but a built-in interval would be much more ergonomic.
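
For comparison, with the after/tick helpers proposed in the timeout-syntax issue above, the Go snippet could map to something like this Rust sketch (tick is hypothetical here, the exact select! case syntax has varied between versions, and use crossbeam_channel as channel; is assumed):

use std::thread;
use std::time::Duration;

let ticker = channel::tick(Duration::from_secs(5));
let (quit_s, quit_r) = channel::bounded::<()>(0);
// Dropping quit_s (or sending into it) stops the ticker thread.

thread::spawn(move || loop {
    select! {
        recv(ticker) => { /* do stuff */ }
        recv(quit_r) => return,
    }
});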

The select macro generates clippy errors

When using the select! macro, clippy is angry at me; the error probably comes from within the macro:

error: casting from `*const u8` to a more-strictly-aligned pointer (`*const crossbeam_channel::Receiver<i32>`)
 --> src/main.rs:6:5
  |
6 | /     select! {
7 | |         recv(r) => (),
8 | |     }
  | |_____^
  |
  = note: #[deny(cast_ptr_alignment)] on by default
  = help: for further information visit https://rust-lang-nursery.github.io/rust-clippy/v0.0.207/index.html#cast_ptr_alignment
  = note: this error originates in a macro outside of the current crate (in Nightly builds, run with -Z external-macro-backtrace for more info)

I don't know exactly what happens there, but my guess is that the *const u8 is some "generic" pointer. I've seen these commonly represented as/cast to usize instead of a pointer to bytes.

Rename disconnect to close?

Nomenclature in other channel implementations:

I think mixing closing and disconnection is confusing and we should stick to one term or the other, and be consistent about it. My preference goes toward closing since it is more prevalent, shorter to type, and more intuitive (IMO, closing implies nothing can be sent into the channel anymore, while disconnection sounds more hazardous, as if the messages currently buffered in the channel might get dropped).

The main drawback of switching to closing consistently is that {TryRecvError,TrySendError}::Disconnected would have to be renamed to {TryRecvError,TrySendError}::Closed, thus departing from the interface in std::sync::mpsc. This could be an annoyance when switching from std::sync::mpsc to crossbeam-channel.

Since I'm preparing a set of breaking changes for the 0.2 version of crossbeam-channel, perhaps we should pull the trigger and switch the nomenclature to closing?

Select over multiple receivers with different types

I'm writing a small library to aggregate and report different types of events to a collection server.

In crossbeam-channel 0.2.1 I could store receivers of different message types in one collection:

trait ErasedChannel: Send {
    fn recv(&mut self, select: &mut Select) -> Result<(), SelectRecvError>;
}

struct Channel<V> {
    rx: Receiver<V>,
    reporter: Report<V>,
}

impl<V> ErasedChannel for Channel<V>
where
    V: Send,
{
    fn recv(&mut self, select: &mut Select) -> Result<(), SelectRecvError> {
        let msg = select.recv(&self.rx)?;
        self.reporter.report(msg);
        Ok(())
    }
}

pub struct Collector {
    channels: Vec<Box<ErasedChannel>>,
}

impl Collector {
    fn aggregate(&mut self, deadline: Instant) {
        'aggregating: loop {
            // Convert deadline into duration for select's timeout
            let timeout = match Instant::now() {
                x if x < deadline => deadline - x,
                _ => break 'aggregating,
            };

            let mut select = Select::with_timeout(timeout);

            'selecting: loop {
                for mut channel in &mut self.channels {
                    if let Ok(_) = channel.recv(&mut select) {
                        break 'selecting;
                    }
                }

                if select.timed_out() {
                    break 'selecting;
                }
            }
        }
    }
}

Is it possible to achieve something similar with crossbeam-channel 0.3?

Couldn't compile

error[E0277]: the trait bound `[u8; 64]: core::clone::Clone` is not satisfied
  --> /home/.cargo/registry/src/github.com-1ecc6299db9ec823/crossbeam-utils-0.2.1/src/cache_padded.rs:32:13
   |
32 |             bytes: [u8; 64],
   |             ^^^^^^^^^^^^^^^ the trait `core::clone::Clone` is not implemented for `[u8; 64]`
   |
   = help: the following implementations were found:
             <[T; 28] as core::clone::Clone>
             <[T; 14] as core::clone::Clone>
             <[T; 7] as core::clone::Clone>
             <[T; 8] as core::clone::Clone>
           and 29 others
   = note: required by `core::clone::Clone::clone`

error[E0277]: the trait bound `T: core::marker::Copy` is not satisfied
  --> /home/.cargo/registry/src/github.com-1ecc6299db9ec823/crossbeam-utils-0.2.1/src/cache_padded.rs:36:13
   |
36 |             _marker: ([T; 0], PhantomData<T>),
   |             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `core::marker::Copy` is not implemented for `T`
   |
   = help: consider adding a `where T: core::marker::Copy` bound
   = note: required because of the requirements on the impl of `core::clone::Clone` for `[T; 0]`
   = note: required because of the requirements on the impl of `core::clone::Clone` for `([T; 0], core::marker::PhantomData<T>)`
   = note: required by `core::clone::Clone::clone`

How to find out whether a channel is closed?

Because of the deprecation of burntsushi's chan crate, I tried to migrate my code to crossbeam-channel, as recommended by burntsushi. However, in some cases I need to distinguish (on the receiver side) between an empty channel and a closed one, which does not seem possible with crossbeam-channel.

Luckily for my personal use case, for any channel I either need a receiver that can be cloned (which crossbeam-channel provides) OR I need to be able to know whether a channel is closed, but not both at the same time. So I can use a mixture of crossbeam-channel and std::sync::mpsc channels in my code and achieve the desired effect. However it would be really nice if all my channel needs were met by a single implementation.

Is this something that can be emulated by crossbeam-channel, or will this be provided in the future?

Other than that, thank you for making this crate! 😄

Legible error messages for select_loop! macro

We should add additional rules to the macro to pattern-match some common mistakes and then use compile_error! to print human-readable messages instead of weird macro errors. Examples of mistakes we should handle:

  1. Forgetting parens, e.g. writing would_block => instead of would_block() =>.
  2. Wrong number of arguments, e.g. writing recv(rx) =>.
  3. Misspelling a case, e.g. receive(rx, msg) =>.

Additionally, it'd also be nice if we could catch duplicate cases. I think that should be doable.

cc @TimNN

Recycle nodes in unbounded channels

In order to reduce the rate of allocation/deallocation in unbounded channels, we should keep some number of removed nodes for quick recycling.

Here's an idea for how we might implement this:

  • Each unbounded channel keeps a lock-free stack of removed nodes.

  • If a receive operation removes a node, we should push it onto the stack. However, if the stack is larger than the live queue, that means we're wasting a lot of memory. In such cases we should deallocate the removed node as well as deallocate one node from the stack.

  • If a send operation needs a new node, it should first try popping one from the stack. And if the stack is empty, just allocate a fresh node.

Introduce polling and remove `Select`

Here's an idea.

Right now, we have a quite complex Select mechanism, but it will barely be used at all. The reason is that select_loop! is already powerful enough for the vast majority of use cases. That means Select will be used in very niche situations only, like the one in ipc-channel (see issue #6).

Also, ipc-channel needs a feature we currently don't have: stopping selection as soon as any channel becomes disconnected (issue #6). This is again a very niche feature and shoehorning it into Select feels like a hack, which got me thinking about all this.

Moreover, we're going to add support for futures (#22). Selection and blocking is already very reminiscent of futures, but I believe we could take a step further and make it even more so.

I propose introducing two new methods: Sender::poll_send and Receiver::poll_recv. These methods should be identical to Sender::try_send and Receiver::try_recv, with one important difference: if they fail (due to the channel being full/empty), the current thread (std::thread::Thread) is added to an internal wait list and unparked by another operation as soon as the send/recv operation can proceed or the channel becomes disconnected.

This is the same idea as the one behind Future::poll. Put differently, poll_send and poll_recv are similar to Sink::start_send and Stream::poll, except they work with threads rather than tasks. And there is something about channels working with both threads and tasks more-or-less the same way that feels right.

Note that these polling methods are very powerful - in fact, they allow us to roll our own Select. Select is essentially just a series of try_send/try_recv calls followed by another series of poll_send/poll_recv calls, followed by thread::park().
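
To make that concrete, here is a sketch of rolling your own select over two receivers with the proposed (hypothetical) poll_recv; the try_recv/poll_recv signatures are illustrative:

let msg = loop {
    // First pass: plain non-blocking attempts.
    if let Some(m) = r1.try_recv() { break m; }
    if let Some(m) = r2.try_recv() { break m; }

    // Second pass: the same attempts, but each failure registers the current
    // thread for a wakeup; then park until one of the channels unparks us.
    if let Some(m) = r1.poll_recv() { break m; }
    if let Some(m) = r2.poll_recv() { break m; }
    std::thread::park();
};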

If we introduce poll_send/poll_recv, that'll cover the requirements of ipc-channel - it won't have to use Select at all! And the whole problem with any_selected becomes irrelevant so we can forget about it. Since polling is even more powerful than the present Select, I suggest we just get rid of it forever. It's a very complex piece of code, rarely used, and apparently not good enough even in those rare cases. So why even bother with it?

One potential problem with polling is that, if we call poll_send/poll_recv too often, threads might get spuriously unparked very often. For example, if we roll our own selection and call poll_recv on a lot of channels, then the current thread might get spuriously unparked a lot of times even after selection ends. That is a potential waste of CPU time, but maybe not a big concern - I'm not sure. It's possible to remedy the problem by introducing a function cancel_polls() that cancels all currently registered wakeups associated with the current thread.

cc @SimonSapin @TimNN @alexcrichton - you might be interested.

Possible deadlock footgun with lost consumer

I noticed the send on the sender no longer returns a result. I find this a little bit troubling. Consider the producer-consumer design pattern. Originally, the producer would be doing something like sender.send(stuff).unwrap().

Previously, if the consumer terminated due to an error (for example by panicking), the panic would propagate through the unwrap(), or would at least require the producer to handle it somehow; the application would probably terminate with an error. In the current implementation, however, the producer will deadlock (with a bounded channel; with an unbounded one it will just keep eating memory forever), without automatically bringing attention to the problem by e.g. crashing.

The previous version was a little bit more typing, but seemed to prevent these kinds of subtle bugs.

`libc::abort` prevents `wasm` build

Building for wasm currently fails because libc::abort is not available on this target.

$ rustup target add wasm32-unknown-unknown
$ cargo build --target wasm32-unknown-unknown
error[E0425]: cannot find function `abort` in module `libc`
   --> src/internal/channel.rs:268:28
    |
268 |             unsafe { libc::abort() }
    |                            ^^^^^ not found in `libc`
help: possible candidates are found in other modules, you can import them into scope
    |
3   | use std::intrinsics::abort;
    |
3   | use std::process::abort;
    |

error[E0425]: cannot find function `abort` in module `libc`
  --> src/internal/utils.rs:61:24
   |
61 |         unsafe { libc::abort() }
   |                        ^^^^^ not found in `libc`
help: possible candidates are found in other modules, you can import them into scope
   |
3  | use std::intrinsics::abort;
   |
3  | use std::process::abort;
   |

error: aborting due to 2 previous errors

We're pretty close to at least compiling! Do you think it would be possible to use an alternative to libc::abort in these cases?

Handling lists of senders/receivers in select_loop!

Here's a trick that might make the Select struct almost completely unnecessary in practice.

In addition to accepting a single receiver rx in recv(rx, msg), let's also accept any list of receivers, i.e. anything that implements IntoIterator<Item = &Receiver<_>>. The same applies to send(tx, msg). This way the select_loop! macro could easily handle dynamic cases as well.

An alternative syntax might be recv(rxs, rx, msg), meaning we iterate over the list rxs, and when a case fires, we get a reference rx and a message msg, meaning msg came from rx.

For example, I'd like to do the following in the ipc-channel crate. This is currently not possible with select_loop!, so I have to resort to Select.

select_loop! {
    recv(
        self.receivers
            .iter_mut()
            .map(|r| r.receiver.get_mut().as_ref().unwrap())
            .enumerate(),
        (index, rx) => rx, // This is how we get the receiver from the tuple.
        msg
    ) => {
        println!("Got message from {}-th receiver: {}", index, msg); // We can use the index here.
    }
}

By turning the iterator into an enumerating one, we can also get an index of the receiver from which the message was received. I'm not completely sure about the syntax - this is just an idea.

@TimNN, any thoughts on all this?

Hash and Eq for Senders?

I was thinking of adding crossbeam-channel support to chan-signal. A naive port would require Senders to be put into a HashMap, so they'd need to be Hash+Eq. AFAICS, each chan channel has a unique ID shared by all of its senders and receivers.

Is that feasible with crossbeam-channel?

Conditional send/recv in select_loop!

I found the following snippet in Servo's codebase, which uses dynamic selection over mpsc channels:

let sel = Select::new();
let mut worker_handle = sel.handle(worker_port);
let mut timer_event_handle = sel.handle(timer_event_port);
let mut devtools_handle = sel.handle(devtools_port);
unsafe {
    worker_handle.add();
    timer_event_handle.add();
    if scope.from_devtools_sender().is_some() {
        devtools_handle.add();
    }
}
let ret = sel.wait();
if ret == worker_handle.id() {
    Ok(MixedMessage::FromWorker(worker_port.recv()?))
} else if ret == timer_event_handle.id() {
    Ok(MixedMessage::FromScheduler(timer_event_port.recv()?))
} else if ret == devtools_handle.id() {
    Ok(MixedMessage::FromDevtools(devtools_port.recv()?))
} else {
    panic!("unexpected select result!")
}

This code can be ported to crossbeam-channel like this:

let mut sel = Select::new();
loop {
    if let Ok(msg) = sel.recv(worker_port) {
        break Ok(MixedMessage::FromWorker(msg));
    }
    if let Ok(msg) = sel.recv(timer_event_port) {
        break Ok(MixedMessage::FromScheduler(msg));
    }
    if scope.from_devtools_sender().is_some() {
        if let Ok(msg) = sel.recv(devtools_port) {
            break Ok(MixedMessage::FromDevtools(msg));
        }
    }
}

This is much nicer, but wouldn't it be even better if we could do the following?

select_loop! {
    recv(worker_port, msg) => Ok(MixedMessage::FromWorker(msg)),
    recv(timer_event_port, msg) => Ok(MixedMessage::FromScheduler(msg)),
    recv(devtools_port, msg) if scope.from_devtools_sender().is_some() => {
        Ok(MixedMessage::FromDevtools(msg))
    }
}

@TimNN, what do you think? :)

Build Error: transmute called with types of different sizes

I am getting this error when building crossbeam-channel
Building using rustc 1.22.1 (05e2e1c41 2017-11-22)

  --> /home/user/.cargo/registry/src/github.com-1ecc6299db9ec823/crossbeam-0.1.6/src/sync/seg_queue.rs:34:29
   |
34 |             ready: unsafe { mem::transmute([0usize; SEG_SIZE]) },
   |                             ^^^^^^^^^^^^^^
   |
   = note: source type: [usize; 32] (2048 bits)
   = note: target type: [std::sync::atomic::AtomicBool; 32] (256 bits)

More flexible iterator recv() form in select?

My situation is that I have a vector of "client" structures, each of which has a channel.

In version 0.1, I had a manual Select loop with the following:

for mut client in &mut clients {
    if let Ok(msg) = select.recv(&client.chan) {
        /* process msg here, which uses `client` mutably */
    }
}

That is, I need a mutable reference to the containing struct inside the handling code.

With the new select, there is a nice way to use an iterator over receivers, but I can't build that in a way that would allow me mutable access to client.

I see two ways to make this possible:

  • Add a form that, in addition to the iterator, also takes a closure that retrieves the actual channel from each iterator item, but passes the iterator item, not the channel, as the from binding.

  • Make the trait for the first argument to recv public, so that I could implement it for Client, or add a trait for the items of the iterator.

Is there anything else?

Should we simply allow duplicate cases?

The main reason why we disallow duplicate cases and have a rule that says no two cases may operate on the same end of the same channel is that otherwise Select couldn't detect where a new iteration of the loop starts.

For example, the following code doesn't work because Select will assume that there is only one case in the loop and then go off the rails:

let mut sel = Select::new();
loop {
    if let Ok(_) = sel.recv(&rx) { println!("First!"); break; }
    if let Ok(_) = sel.recv(&rx) { println!("First!"); break; }
    if let Ok(_) = sel.recv(&rx2) { println!("Second!"); break; }
}

However, if we require that each iteration of the loop has to start with sel.step(), the problem goes away and Select can understand the same arrangement just fine:

let mut sel = Select::new();
loop {
    sel.step();
    if let Ok(_) = sel.recv(&rx) { println!("First!"); break; }
    if let Ok(_) = sel.recv(&rx) { println!("First!"); break; }
    if let Ok(_) = sel.recv(&rx2) { println!("Second!"); break; }
}

Requiring sel.step() at the beginning of the loop is surely a small unergonomic bump, but at the same time one of the subtler rules around selection simply goes away. Perhaps this is a net win?

cc @TimNN
