chan's Introduction

chan

Pure C implementation of Go channels.

Unbuffered Channels

Unbuffered channels provide a mechanism for both communication and synchronization. When data is sent into the channel, the sender blocks until a receiver is ready. Likewise, a receiver blocks until a sender is ready.

#include <pthread.h>
#include <stdio.h>
#include "chan.h"

chan_t* chan;

void* ping(void* arg)
{
    (void) arg; // Unused thread argument.
    // Send blocks until receiver is ready.
    chan_send(chan, "ping");
    return NULL;
}

int main()
{
    // Initialize unbuffered channel.
    chan = chan_init(0);

    pthread_t th;
    pthread_create(&th, NULL, ping, NULL);

    // Receive blocks until sender is ready.
    void* msg;
    chan_recv(chan, &msg);
    printf("%s\n", (char*) msg);

    // Wait for the sender thread, then clean up the channel.
    pthread_join(th, NULL);
    chan_dispose(chan);
}

With an unbuffered channel, the sender and receiver are synchronized, so the above program will print ping.

Buffered Channels

Buffered channels accept a limited number of values without a corresponding receiver for those values. Sending data will not block unless the channel is full. Receiving data will block only if the channel is empty.

#include <stdio.h>
#include "chan.h"

int main()
{
    // Initialize buffered channel with a capacity of 2.
    chan_t* chan = chan_init(2);

    // Send up to 2 values without receiver.
    chan_send(chan, "buffered");
    chan_send(chan, "channel");

    // Later receive the values.
    void* msg;
    chan_recv(chan, &msg);
    printf("%s\n", (char*) msg);

    chan_recv(chan, &msg);
    printf("%s\n", (char*) msg);

    // Clean up channel.
    chan_dispose(chan);
}

The above program will print buffered and then channel. The sends do not block because the channel has a capacity of 2. Sending more after that would block until values were received.

Closing Channels

When a channel is closed, no more values can be sent on it. Receiving on a closed channel returns a code indicating that the channel has been closed. This can be useful to communicate completion to the channel’s receivers. If the closed channel is buffered, values can still be received from it until it is empty.

#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include "chan.h"

chan_t* jobs;
chan_t* done;

void* worker(void* arg)
{
    (void) arg; // Unused thread argument.

    // Process jobs until channel is closed.
    void* job;
    while (chan_recv(jobs, &job) == 0)
    {
        printf("received job %d\n", (int) (uintptr_t) job);
    }

    // Notify that all jobs were received.
    printf("received all jobs\n");
    chan_send(done, "1");
    return NULL;
}

int main()
{
    // Initialize channels.
    jobs = chan_init(5);
    done = chan_init(0);

    pthread_t th;
    pthread_create(&th, NULL, worker, NULL);

    // Send 3 jobs over the jobs channel then close it.
    int i;
    for (i = 1; i <= 3; i++)
    {
        chan_send(jobs, (void*) (uintptr_t) i);
        printf("sent job %d\n", i);
    }
    chan_close(jobs);
    printf("sent all jobs\n");

    // Wait for all jobs to be received.
    chan_recv(done, NULL);

    // Clean up channels.
    chan_dispose(jobs);
    chan_dispose(done);
}

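This program will print output like the following (the jobs channel is buffered, so the interleaving of sent and received lines may vary between runs):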

sent job 1
received job 1
sent job 2
received job 2
sent job 3
received job 3
sent all jobs
received all jobs

Select Statements

Select statements choose which of a set of possible send or receive operations will proceed. They also provide a way to perform non-blocking sends and receives. Selects are particularly useful for multiplexing communication over several channels.

#include <stdio.h>
#include "chan.h"

chan_t* messages;
chan_t* signals;

int main()
{
    // Initialize channels.
    messages = chan_init(0);
    signals = chan_init(0);
    void *msg;

    // This is a non-blocking receive. If a value is available on messages,
    // select will take the messages (0) case with that value. If not, it will
    // immediately take the default case.
    switch(chan_select(&messages, 1, &msg, NULL, 0, NULL))
    {
        case 0:
            printf("received message %s\n", (char*) msg);
            break;
        default:
            printf("no message received\n");
    }

    // A non-blocking send works similarly.
    msg = "foo";
    switch(chan_select(NULL, 0, NULL, &messages, 1, &msg))
    {
        case 0:
            printf("sent message %s\n", (char*) msg);
            break;
        default:
            printf("no message sent\n");
    }

    // We can use multiple cases above the default clause to implement a
    // multi-way non-blocking select. Here we attempt non-blocking receives on
    // both messages and signals.
    chan_t* chans[2] = {messages, signals};
    switch(chan_select(chans, 2, &msg, NULL, 0, NULL))
    {
        case 0:
            printf("received message %s\n", (char*) msg);
            break;
        case 1:
            printf("received signal %s\n", (char*) msg);
            break;
        default:
            printf("no activity\n");
    }

    // Clean up channels.
    chan_dispose(messages);
    chan_dispose(signals);
}

This program will print:

no message received
no message sent
no activity

chan's People

Contributors

dmac, goodwinos, longlene, mattn, stefantalpalaru, tylertreat, tylertreat-wf

chan's Issues

deadlock2

The following test program deadlocks if the condition variable is
implemented as FIFO (waiting readers are signalled first):

void test_chan_multi2()
{
    chan_t* chan = chan_init(5);
    pthread_t th[100];
    for (int i = 0; i < 100; ++i) {
        pthread_create(&th[i], NULL, receiver, chan);
    }
    sleep(1);
    assert_true(chan->r_waiting >= 50, chan, "At least half of the receiver threads are waiting");

    // simulate 5 high priority writing threads
    pthread_mutex_lock(&chan->m_mu);
    for (int i = 0; i < 5; ++i) {
        assert_true(0 == queue_add(&chan->queue, "foo"), chan, "Simulate writer thread");
        pthread_cond_signal(&chan->m_cond); // wakeup reader
    }

    // simulate 6th high priority waiting writer
    assert_true(chan->queue->size == chan->queue->capacity, chan, "6th writer has to wait");
    chan->w_waiting++;
    // !!! simulated writer must be woken up by reader !!!
    pthread_cond_wait(&chan->m_cond, &chan->m_mu);
    chan->w_waiting--;
    pthread_mutex_unlock(&chan->m_mu);

    // wake-up other waiting reader    
    for (int i = 5; i < 100; ) {
        if (chan_size(chan) < 5) {
            ++i; // one more woken up reader
            chan_send(chan, "foo");
        } else {
            // pass cpu to reader
            pthread_yield(); 
        }
    }

    for (int i = 0; i < 100; ++i) {
        pthread_join(th[i], NULL);
    }
    chan_dispose(chan);
    pass();
}

The gist of the problem is:
5 waiting readers are woken up but do not run because of the (simulated) higher priority
of the writing threads. The 6th writer waits until the queue is no longer full.
Now the 5 readers run and signal the waiting writer, but all 5
pthread_cond_signal(&chan->m_cond) calls are received by other waiting readers.
The condition variable (at least on Linux) is implemented to serve waiters in FIFO order.
Therefore the signal is received by a waiting reader first.

Proposal:
Add two condition variables. One for readers and one for writers (not_empty vs. not_full).
Readers signal the writer condition and writers signal the reader condition.
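
The proposal above can be sketched as a small bounded queue. This is only an illustration under stated assumptions: bqueue_t, bq_init, bq_put, bq_get, bq_destroy and the fixed BQ_CAPACITY are hypothetical names, not the library's actual chan_t or queue code. Because writers wait only on not_full and readers only on not_empty, a signal meant for a writer cannot be consumed by another waiting reader.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define BQ_CAPACITY 5

/* Hypothetical bounded queue; not the library's actual chan_t layout. */
typedef struct {
    void*           items[BQ_CAPACITY];
    size_t          head;      /* index of the oldest item */
    size_t          size;      /* number of stored items */
    pthread_mutex_t mu;
    pthread_cond_t  not_empty; /* readers wait here, writers signal it */
    pthread_cond_t  not_full;  /* writers wait here, readers signal it */
} bqueue_t;

void bq_init(bqueue_t* q)
{
    assert(q != NULL);
    q->head = 0;
    q->size = 0;
    pthread_mutex_init(&q->mu, NULL);
    pthread_cond_init(&q->not_empty, NULL);
    pthread_cond_init(&q->not_full, NULL);
}

void bq_destroy(bqueue_t* q)
{
    pthread_cond_destroy(&q->not_full);
    pthread_cond_destroy(&q->not_empty);
    pthread_mutex_destroy(&q->mu);
}

/* Blocks while the queue is full, then appends the item. */
void bq_put(bqueue_t* q, void* item)
{
    pthread_mutex_lock(&q->mu);
    while (q->size == BQ_CAPACITY)
    {
        pthread_cond_wait(&q->not_full, &q->mu);
    }
    q->items[(q->head + q->size) % BQ_CAPACITY] = item;
    q->size++;
    /* Only readers wait on not_empty, so this wakes a reader. */
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->mu);
}

/* Blocks while the queue is empty, then removes the oldest item. */
void* bq_get(bqueue_t* q)
{
    pthread_mutex_lock(&q->mu);
    while (q->size == 0)
    {
        pthread_cond_wait(&q->not_empty, &q->mu);
    }
    void* item = q->items[q->head];
    q->head = (q->head + 1) % BQ_CAPACITY;
    q->size--;
    /* Only writers wait on not_full, so this wakes a writer. */
    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->mu);
    return item;
}
```

The wait calls sit inside while loops so that a woken thread rechecks its condition before proceeding, which also covers spurious wakeups.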

primitive values

Currently, chan provides chan_send to send data as a void* argument. But someone may want to send int/long/double/char etc.
What do you think? I also worry that someone may misuse chan with a pointer argument. For example:

char buf[256];
while (!chan_is_closed(c)) {
  if (condition) {
    strcpy(buf, "doA");
  } else {
    strcpy(buf, "doB");
  }
  chan_send(c, buf);
}

If the chan is buffered, buf will be overwritten before the receiver reads it. New memory needs to be allocated for each send. My suggestion is to add new functions like the ones below.

chan_send_string(c, buf);
chan_recv_string(c);
chan_send_int(c, 3);
chan_recv_int(c);
chan_send_double(c, 4.5);
chan_recv_double(c);

These functions allocate new memory for the values. chan_recv_int and chan_recv_double can free the memory automatically. With chan_recv_string, the developer is responsible for freeing the memory.
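
The allocation half of this suggestion might look like the sketch below. The helper names box_int, unbox_int and box_string are hypothetical and are not part of chan; they only show the copy-on-send, free-on-receive idea independently of the channel itself.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: copy an int to the heap so the pointer stays
 * valid after the sender moves on. Returns NULL if malloc fails. */
void* box_int(int value)
{
    int* p = malloc(sizeof *p);
    if (p != NULL)
    {
        *p = value;
    }
    return p;
}

/* Hypothetical helper for the receiving side: read the int and free
 * the box, so the caller never has to free anything. */
int unbox_int(void* box)
{
    assert(box != NULL);
    int value = *(int*) box;
    free(box);
    return value;
}

/* Hypothetical helper: copy a string to the heap. The receiver owns
 * the copy and must free it. Returns NULL if malloc fails. */
char* box_string(const char* s)
{
    size_t n = strlen(s) + 1;
    char* copy = malloc(n);
    if (copy != NULL)
    {
        memcpy(copy, s, n);
    }
    return copy;
}
```

Under these assumptions a sender would pass box_int(3) or box_string(buf) to chan_send, and the receiver would call unbox_int on the received pointer or free the received string itself.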

Does this facilitate C - Go communication?

Hi,

Looks like a great package! I imagine that the primary use case is Go-like channels within a C program, but I'm wondering: have you actually mirrored the Go data structure exactly? Is it possible to use CGO to embed a C program using this library inside of Go and actually send and receive on the same channel from both languages?

Thanks!
Oliver

Add support for blocking selects

chan_select currently only supports non-blocking selects. It should also support blocking selects, maybe by passing in a flag.
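
Until such a flag exists, one crude way to get blocking behaviour is to retry a non-blocking attempt in a loop. The sketch below is a polling workaround, not how the library would implement blocking selects internally. The names try_select_fn, select_blocking and countdown_try are hypothetical, and the convention that a negative return value means "no case was ready" is an assumption.

```c
#include <assert.h>
#include <sched.h>
#include <stddef.h>

/* Hypothetical callback type: performs one non-blocking select attempt
 * and returns the index of the chosen case, or a negative value if no
 * case was ready (assumed convention). */
typedef int (*try_select_fn)(void* ctx);

/* Retries the non-blocking attempt until a case succeeds, yielding the
 * CPU between attempts. Returns the index of the chosen case. */
int select_blocking(try_select_fn try_once, void* ctx)
{
    assert(try_once != NULL);
    int index;
    while ((index = try_once(ctx)) < 0)
    {
        sched_yield();
    }
    return index;
}

/* Stand-in for a real non-blocking select: reports "nothing ready"
 * until the countdown in *ctx reaches zero, then reports case 1. */
int countdown_try(void* ctx)
{
    int* remaining = ctx;
    if (*remaining > 0)
    {
        (*remaining)--;
        return -1;
    }
    return 1;
}
```

Polling burns CPU while waiting, which is why real support inside the library, where a thread can sleep on a condition variable, would be preferable.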

deadlock

With some probability, the following test program deadlocks:

void test_chan_multi()
{
    chan_t* chan = chan_init(5);
    pthread_t th[100];
    for (int i = 0; i < 50; ++i) {
       pthread_create(&th[i], NULL, sender, chan);
    }
    sleep(1);
    for (int i = 50; i < 100; ++i) {
       pthread_create(&th[i], NULL, receiver, chan);
    }
    sleep(1);
    for (int i = 0; i < 100; ++i) {
       pthread_join(th[i], NULL);
    }
    chan_dispose(chan);
    pass();
}

The gist of the problem is:
A waiting sender is woken up by a reader, but before it can run,
other readers run that do not wake up waiting senders because
queue->size does not equal (chan->queue->capacity - 1).

Proposal:
Every time a thread enters the critical section,
let it check for waiting readers/writers (add a waiting counter)
and let it send a signal if any threads are waiting.
This solution could send more signals than necessary
but is faster than using pthread_cond_broadcast.

Clibs package

Hi there,

This looks like a really cool library 👍. I was wondering whether you could add it to the clib package manager? It'd make it easier for some of us to use it on our projects (like me).

If you aren't up for it I can fork it and put my local repo on the wiki, but you are the project maintainer.

Thanks! :)

problem of threads?

Is a channel with a capacity of 10,000,000 possible, or might it hit the Linux thread creation limit?

chan_t* chan = chan_init(10000000);
  1. I've checked the C code and it uses pthreads. Am I wrong in thinking this will create 10 million threads (at around 1 MB per thread)?
  2. How much RAM overhead is there per channel?

optional size param for send/recv

It would be great if there were a size param, so this library could serve as a drop-in replacement with a compatible API for passing messages between processes.

memory not freed

  1. blocking_pipe_read { ... void* msg_ptr = malloc(sizeof(void*)) ... }
    Where is the corresponding free?
        void* msg;
        chan_recv(chan, &msg);
        printf("%s\n", msg);
        // missing free(msg)
        chan_recv(chan, &msg);
        printf("%s\n", msg);
        // missing free(msg)
  2. Why do you write a void* into a pipe and also synchronize with pthread_cond_wait?
    It is much simpler to synchronize with pthread_cond_wait and write a pointer into a data field of blocking_pipe_t*. You do not need any pipe in this case.
  3. How do you make sure that the content the pointer points to is still valid?
    What about the following?
        void* ping() {
            char localstr[] = { "ping" };
            chan_send(chan, localstr);
            return NULL;
        }
    This causes undefined behaviour. Your synchronization in blocking_pipe_t synchronizes
    only the pointer transfer but does not make sure that the receiving end has processed
    the data the pointer is pointing to!
  4. Error: not unlocking mutex
        unbuffered_chan_recv(chan_t* chan, void** data)
        {
            pthread_mutex_lock(&chan->r_mu);
            if (chan_is_closed(chan))
            {
                // missing _unlock !!
                errno = EPIPE;
                return -1;
            }

(No more time to check for more errors :-))
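
For point 4, the fix is to release the mutex on every exit path. The sketch below shows that pattern on a stand-in type. demo_chan_t and demo_recv are hypothetical and only mimic the shape of the snippet above; they are not the library's real structures or its receive logic.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical stand-in for the channel; only the fields needed here. */
typedef struct {
    pthread_mutex_t r_mu;
    int             closed;
    void*           slot;   /* the value a receive would hand out */
} demo_chan_t;

/* Returns 0 and stores the value on success. Returns -1 with errno set
 * to EPIPE if the channel is closed. The mutex is released on both
 * paths. */
int demo_recv(demo_chan_t* chan, void** data)
{
    assert(chan != NULL);
    pthread_mutex_lock(&chan->r_mu);
    if (chan->closed)
    {
        /* Release the lock before the early return. */
        pthread_mutex_unlock(&chan->r_mu);
        errno = EPIPE;
        return -1;
    }
    if (data != NULL)
    {
        *data = chan->slot;
    }
    pthread_mutex_unlock(&chan->r_mu);
    return 0;
}
```

Setting errno after the unlock keeps the error code from being overwritten by the unlock call itself.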

chan_recv can receive duplicate sent values

I've encountered an issue where chan_recv appears to be receiving sent values that were already consumed by a previous chan_recv. This program exhibits the issue for me:

#include <stdint.h>
#include <stdio.h>
#include <pthread.h>
#include "chan/chan.h"

void* produce(void *_c) {
    chan_t *c = (chan_t*)_c;
    int code;

    code = chan_send(c, (void*)(uintptr_t)1);
    printf("send 1 code %d\n", code);

    code = chan_send(c, (void*)(uintptr_t)2);
    printf("send 2 code %d\n", code);

    chan_close(c);
    return NULL;
}

int main(void) {
    chan_t *c = chan_init(0);

    pthread_t pt;
    pthread_create(&pt, NULL, produce, c);

    void *v;
    int code;

    code = chan_recv(c, &v);
    printf("recv %d code %d\n", (int)(uintptr_t)v, code);

    code = chan_recv(c, &v);
    printf("recv %d code %d\n", (int)(uintptr_t)v, code);

    code = chan_recv(c, &v);
    printf("recv %d code %d\n", (int)(uintptr_t)v, code);

    pthread_join(pt, NULL);
    chan_dispose(c);

    return 0;
}

I would expect the output of this program to be (in some order):

recv 1 code 0
send 1 code 0
recv 2 code 0
send 2 code 0
recv 2 code -1

However, I often get the output:

recv 1 code 0
send 1 code 0
recv 2 code 0
send 2 code 0
recv 2 code 0

The third chan_recv should never return 0.

C2x feature proposal?

This seems to be a great feature C2x could use. Have you thought about putting together a proposal with this code?
