
Comments (31)

cpq commented on May 29, 2024

@jvo203 I've pushed mg_wakeup() and mg_wakeup_init() - so the socket pair approach is back.
Please check out the updated multi-threaded example.
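
For reference, a minimal sketch of how the new API is meant to be used (modeled on the multi-threaded example; start_thread() and worker() are assumptions, and recent Mongoose versions pass three arguments to the event handler):

// Event handler: spawn a worker per request, reply when the worker wakes us up
static void fn(struct mg_connection *c, int ev, void *ev_data) {
  if (ev == MG_EV_HTTP_MSG) {
    start_thread(worker, (void *) c->id);  // pass the connection ID, not the pointer
  } else if (ev == MG_EV_WAKEUP) {
    struct mg_str *data = (struct mg_str *) ev_data;  // payload sent by the worker
    mg_http_reply(c, 200, "", "%.*s\n", (int) data->len, data->ptr);
  }
}

// Setup:  mg_mgr_init(&mgr); mg_wakeup_init(&mgr);  // creates the socket pair
// Worker: mg_wakeup(&mgr, conn_id, "hi", 2);        // wakes the manager thread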

scaprile commented on May 29, 2024

Please run the example as is, and if you experience any problems with that, please describe what you see there. I don't see anything wrong with your code, but let's go step by step.
Then, please help us by doing the following:

  • Run make test in the repo root directory. That (among a lot of other stuff) tests the queue. You could also pinpoint that part and extend that test alone.
  • Run every piece of code that shows this problem without the mg_send() call.

Thanks

jvo203 commented on May 29, 2024

OK, let's take it step by step.

jvo203 commented on May 29, 2024

Unfortunately running the unit_test program always results in a segmentation fault (each and every time):

86d7e87f 3 sock.c:274:read_conn         2 4 snd 0/0 rcv 0/2048 n=45 err=0
86d7e87f 3 dns.c:165:dns_cb             1 cesanta.com is 148.251.54.236
86d7e87f 3 sock.c:363:mg_connect_resolv 1 5 -> 148.251.54.236:80 pend
86d7e9b9 3 sock.c:285:write_conn        1 5 snd 48/2048 rcv 0/0 n=48 err=0
86d7ea67 2 unit_test.c:1279:test_http_c 0
FAILURE test/unit_test.c:1280: ok == 301

Thread 1 "unit_test" received signal SIGABRT, Aborted.
0x00007ffff6c91dec in __pthread_kill_implementation () from /lib64/libc.so.6
Missing separate debuginfos, use: zypper install libasan8-debuginfo-13.2.1+git8109-1.1.x86_64 libgcc_s1-debuginfo-13.2.1+git8109-1.1.x86_64 libstdc++6-debuginfo-13.2.1+git8109-1.1.x86_64 libubsan1-debuginfo-13.2.1+git8109-1.1.x86_64
(gdb) bt
#0  0x00007ffff6c91dec in __pthread_kill_implementation () from /lib64/libc.so.6
#1  0x00007ffff6c3f0c6 in raise () from /lib64/libc.so.6
#2  0x00007ffff6c268d7 in abort () from /lib64/libc.so.6
#3  0x00000000004098c0 in test_http_client () at test/unit_test.c:1280
#4  main () at test/unit_test.c:3286
(gdb)

jvo203 commented on May 29, 2024

Please run the example as is, and if you experience any problems with that, please describe what you see there. I don't see anything wrong with your code, but let's go step by step.

I've also run examples/multi-threaded; it executes OK, but it can hardly be called a stress test. The mg_queue in this example is only used to send exactly one message before being discarded.

In the stress test of my huge application using mongoose, the test churned through about 64 TB of RAM in cumulative allocations on a machine equipped with 32 GB of RAM, constantly computing and replying to incoming WebSocket requests, all handled by mongoose. The test ran for 12 hours. Each separate WebSocket connection had its own dedicated mg_queue, constantly adding, sending and removing WebSocket messages. The CPU cores, the RAM and the network were all under constant stress.

scaprile commented on May 29, 2024

Well...
Regarding the unit test, it is indicating a failure: FAILURE test/unit_test.c:1280: ok == 301. The segfault happens because we don't bother cleaning up after a failure; the tests run in virtual environments that are destroyed afterwards.
That failure occurs well before the queue test and may be caused by some comms issue; it doesn't happen on our CI/CD test workflow.

That (among a lot of other stuff) tests the queue. You could also pinpoint that part and extend that test alone.

mongoose/test/unit_test.c, lines 3130 to 3204 at commit 6192573:

#define NMESSAGES 99999
static uint32_t s_qcrc = 0;
static int s_out, s_in;

static void producer(void *param) {
  struct mg_queue *q = (struct mg_queue *) param;
  char tmp[64 * 1024], *buf;
  size_t len, ofs = sizeof(tmp);
  for (s_out = 0; s_out < NMESSAGES; s_out++) {
    if (ofs >= sizeof(tmp)) mg_random(tmp, sizeof(tmp)), ofs = 0;
    len = ((uint8_t *) tmp)[ofs] % 55U + 1U;
    if (ofs + len > sizeof(tmp)) len = sizeof(tmp) - ofs;
    while ((mg_queue_book(q, &buf, len)) < len) (void) 0;
    memcpy(buf, &tmp[ofs], len);
    s_qcrc = mg_crc32(s_qcrc, buf, len);
    ofs += len;
#if 0
    fprintf(stderr, "-->prod %3d %8x %-3lu %zu/%zu/%lu\n", s_out, s_qcrc, len,
            q->tail, q->head, buf - q->buf);
#endif
    mg_queue_add(q, len);
  }
}

static uint32_t consumer(struct mg_queue *q) {
  uint32_t crc = 0;
  for (s_in = 0; s_in < NMESSAGES; s_in++) {
    char *buf;
    size_t len;
    while ((len = mg_queue_next(q, &buf)) == 0) (void) 0;
    crc = mg_crc32(crc, buf, len);
#if 0
    fprintf(stderr, "-->cons %3u %8x %-3lu %zu/%zu/%lu\n", s_in, crc, len,
            q->tail, q->head, buf - q->buf);
#endif
    mg_queue_del(q, len);
  }
  return crc;
}

#if MG_ARCH == MG_ARCH_WIN32
static void start_thread(void (*f)(void *), void *p) {
  _beginthread((void(__cdecl *)(void *)) f, 0, p);
}
#elif MG_ARCH == MG_ARCH_UNIX
#include <pthread.h>
static void start_thread(void (*f)(void *), void *p) {
  union {
    void (*f1)(void *);
    void *(*f2)(void *);
  } u = {f};
  pthread_t thread_id = (pthread_t) 0;
  pthread_attr_t attr;
  (void) pthread_attr_init(&attr);
  (void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
  pthread_create(&thread_id, &attr, u.f2, p);
  pthread_attr_destroy(&attr);
}
#else
static void start_thread(void (*f)(void *), void *p) {
  (void) f, (void) p;
}
#endif

static void test_queue(void) {
  char buf[512];
  struct mg_queue queue;
  uint32_t crc;
  memset(buf, 0x55, sizeof(buf));
  mg_queue_init(&queue, buf, sizeof(buf));
  start_thread(producer, &queue);  // Start producer in a separate thread
  crc = consumer(&queue);          // Consumer eats data in this thread
  MG_INFO(("CRC1 %8x", s_qcrc));   // Show CRCs
  MG_INFO(("CRC2 %8x", crc));
  ASSERT(s_qcrc == crc);
}


I understand you are keeping it single-producer / single-consumer, as that is what this is.
We stress-tested that queue at its very beginning; now we only test briefly, since GitHub tests are already slow by themselves.

I suggest you also trim down your scenario to the size of an example reproducing the problem. Then

run every piece of code that shows this problem without the mg_send() call

We recently added that call in order to wake up the manager thread; our initial approach was to reduce its sleep time to 10 ms or so and not use it. We are still analyzing what side effects this call might have (this is not our priority) and brainstorming other options, so a good stress test and/or a clear issue report would help.

Please post the trimmed-down code so we can try to reproduce it in our test environments and determine the cause.

Thanks

jvo203 commented on May 29, 2024

I suggest you also trim down your scenario to the size of an example reproducing the problem. Then

run every piece of code that shows this problem without the mg_send() call

We recently added that call in order to wake up the manager thread; our initial approach was to reduce its sleep time to 10 ms or so and not use it. We are still analyzing what side effects this call might have

A quick response: the first time I got a seg fault related to mg_queue, my code was not using the recently added UDP wake-up functionality, and the event loop was polling every 10 ms to ensure prompt WebSocket responses.

Trimming down my application is not so simple: it is HUGE, and it also runs libmicrohttpd as its main multithreaded HTTP server. Given some time I might come up with different code that isolates the mg_queue part. This is the application: https://github.com/jvo203/FITSWEBQLSE

I'm afraid I have been operating it as a multiple producer / single consumer. The consumer is a mongoose event loop. The producers are various POSIX threads launched in response to incoming WebSocket messages. My understanding is that mg_queue - being a drop-in replacement for mg_mkpipe() - is a lockless thread-safe queue. Prior to using mg_queue I had been using the TCP/UDP mg_mkpipe(), which was fully thread-safe and able to operate with multiple producers. So naturally I assumed that the replacement for the excellent mg_mkpipe() would be equivalent or better (certainly not a regression).

cpq commented on May 29, 2024

Trimming down my application is not so simple: it is HUGE, and it also runs libmicrohttpd as its main multithreaded HTTP

What?
Are you running libmicrohttpd AND mongoose in the same app?
Can I ask why you don't stick to just one? If you use libmicrohttpd as the main engine, why do you need mongoose?

jvo203 commented on May 29, 2024

Because libmicrohttpd does not have WebSockets! I use mongoose for the WebSocket stuff as well as for intra-node communication via HTTP POST. Handling HTTP POST in libmicrohttpd is inconvenient. libmicrohttpd handles the incoming traffic from clients worldwide. Naturally, mongoose operates on a different port.

Intra-node: this is both a single-server and a distributed cluster application, with custom HTTP messages flying between cluster nodes.

jvo203 commented on May 29, 2024

Single producer / single consumer: so I guess protecting writes to the mg_queue - mg_queue_add(&session->queue, _len); - with a mutex is necessary?
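
If that serialization is the route, a minimal sketch might look like this (queue_mtx is a hypothetical mutex field added to the session; the whole book/copy/add sequence is locked so the queue sees a single effective producer):

#include <pthread.h>
#include <string.h>

static void session_queue_send(websocket_session *session, const void *data, size_t len)
{
    char *buf;

    pthread_mutex_lock(&session->queue_mtx);  // queue_mtx is an assumed field
    // busy-wait until the queue has room, as the mongoose unit test does
    while (mg_queue_book(&session->queue, &buf, len) < len) (void) 0;
    memcpy(buf, data, len);
    mg_queue_add(&session->queue, len);       // publish the message
    pthread_mutex_unlock(&session->queue_mtx);
}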

cpq commented on May 29, 2024

I'm afraid I have been operating it as a multiple producer / single consumer. The consumer is a mongoose event loop. The producers are various POSIX threads launched in response to incoming WebSocket messages.

That's not an intended usage: you can't use multiple producers.

Try to pull the "pair" branch: https://github.com/cesanta/mongoose/tree/pair
And use the multi-threaded approach from there: https://github.com/cesanta/mongoose/blob/pair/examples/multi-threaded/main.c

Let us know if that works for you. With that, every notification from the worker thread automatically wakes up Mongoose, so no need for a tight loop.
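
For reference, the shape of that approach (a sketch assuming the mg_mkpipe() API from that branch; pipe_handler and msg are placeholders):

// Manager thread: create a socket pair; pipe_handler runs on the manager
// thread whenever a worker writes to the other end
int fd = mg_mkpipe(&mgr, pipe_handler, NULL, true);  // true = UDP socket pair

// Worker thread: any send() on the returned descriptor wakes the event loop
send(fd, &msg, sizeof(msg), 0);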

And of course, we suggest you do yourself a favor and drop libmicrohttpd; use Mongoose all the way through.

jvo203 commented on May 29, 2024

And of course, we suggest you do yourself a favor and drop libmicrohttpd; use Mongoose all the way through.

There is one big advantage to libmicrohttpd: it can build HTTP responses from Unix pipes, which makes HTTP streaming extremely convenient. You create a Unix pipe, pass the write end to a producer thread, and then make a single call:

// create a response from a pipe by passing the read end of the pipe
struct MHD_Response *response = MHD_create_response_from_pipe(pipefd[0]);

That's all that's needed for streaming; no messing around with an event loop. So, design-wise, each library has its own strengths and weaknesses.
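
To illustrate the pattern, a sketch of the producer side (next_chunk() is a placeholder for whatever generates the binary data):

#include <unistd.h>

// Hypothetical producer thread: write binary chunks into the pipe;
// libmicrohttpd streams whatever arrives on the read end to the client.
static void *producer_thread(void *arg)
{
    int fd = *(int *) arg;  // the write end of the pipe, i.e. pipefd[1]
    char chunk[4096];
    ssize_t n;

    while ((n = next_chunk(chunk, sizeof(chunk))) > 0)  // placeholder data source
        write(fd, chunk, (size_t) n);

    close(fd);  // closing the write end signals EOF and ends the HTTP response
    return NULL;
}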

jvo203 commented on May 29, 2024

Try to pull the "pair" branch: https://github.com/cesanta/mongoose/tree/pair And use the multi-threaded approach from there: https://github.com/cesanta/mongoose/blob/pair/examples/multi-threaded/main.c

Let us know if that works for you. With that, every notification from the worker thread automatically wakes up Mongoose, so no need for a tight loop.

Well, I had been using mongoose's mg_mkpipe() without any problems for a long time, before mongoose dropped it! I liked it; it was very reliable and did not need an extra UDP wake-up helper at all.

scaprile commented on May 29, 2024

We needed a way to serve embedded scenarios with an RTOS and no socket support.
We try to avoid having different usage templates on different architectures as much as possible.

jvo203 commented on May 29, 2024

I see.

jvo203 commented on May 29, 2024

There is one disadvantage of the mg_mkpipe() approach. Given a large number of WebSocket connections, one needs to loop through most of them in order to identify the right one to send the message to:

if (ev == MG_EV_READ) {
  // Got data! Iterate over all connections, and find our parent
  for (struct mg_connection *t = c->mgr->conns; t != NULL; t = t->next) {
    if (t->id != id) continue;
    // ... (send the data over the parent connection t)
  }
}

It has always had me worried performance-wise, this for-loop bottleneck...

scaprile commented on May 29, 2024

Well, Mongoose's target is embedded; it usually runs on embedded hardware with just a bunch of connections. All our connection poll loops are for loops; we don't run hash-table-driven searches, as we don't expect thousands of connections.
IO buffer resizing could be another issue for you; make sure you set the buffers large enough beforehand: https://mongoose.ws/documentation/#io-buffers

jvo203 commented on May 29, 2024

The alternative is to stick with the current mg_queue but rewrite my application so that there is only one POSIX thread issuing mg_queue_add(&session->queue, _len);. As a matter of fact, I have been rewriting the WebSocket handler threads in recent days, adding a mutex-protected ring buffer and a separate POSIX event-loop thread (woken up by a POSIX condition variable):

/* wait on a condition variable */
pthread_cond_wait(&session->ws_cond, &session->ws_cond_mtx);

The aim is to reduce the stress on the server and prevent the application from launching more computation threads than the server can handle. A ring buffer naturally limits the load on the server: any old messages still waiting in the ring buffer get overwritten by newer ones, which is perfectly OK in my case. There is no need to handle old (stale) messages under a heavy load. A minimal sketch of the idea follows.
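
A minimal sketch of that ring buffer (names and capacity are illustrative, and it reuses the thread's struct websocket_message; the real code differs):

#include <pthread.h>

#define RING_CAPACITY 4  // illustrative; newer messages displace the oldest

struct ring
{
    struct websocket_message slots[RING_CAPACITY];
    size_t head, count;
    pthread_mutex_t mtx;
    pthread_cond_t cond;
};

// producer side: overwrite the oldest entry when full, then wake the consumer
static void ring_push(struct ring *r, struct websocket_message msg)
{
    pthread_mutex_lock(&r->mtx);

    if (r->count == RING_CAPACITY)
    {
        // full: drop the stale message (a real implementation
        // would free slots[head].buf here)
        r->head = (r->head + 1) % RING_CAPACITY;
        r->count--;
    }

    r->slots[(r->head + r->count) % RING_CAPACITY] = msg;
    r->count++;

    pthread_cond_signal(&r->cond);  // wake the event-loop thread
    pthread_mutex_unlock(&r->mtx);
}

// consumer side (the POSIX event-loop thread): block until a message arrives
static struct websocket_message ring_pop(struct ring *r)
{
    pthread_mutex_lock(&r->mtx);

    while (r->count == 0)
        pthread_cond_wait(&r->cond, &r->mtx);

    struct websocket_message msg = r->slots[r->head];
    r->head = (r->head + 1) % RING_CAPACITY;
    r->count--;

    pthread_mutex_unlock(&r->mtx);
    return msg;
}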

jvo203 commented on May 29, 2024

The above ring-buffer approach would turn the mg_queue usage into a correct single-producer / single-consumer one.

scaprile commented on May 29, 2024

Yeah, I imagined that.

We are still analyzing what side effects this call might have (this is not our priority) and brainstorming other options

Stay tuned.

cpq commented on May 29, 2024

You create a Unix pipe, pass the write end to a producer thread, and then make a single call

So, the thing that generates the HTTP response is another process that opens a pipe and printf()s the response there? Do you find that useful? If you can elaborate on it, we might consider adding that to Mongoose.

jvo203 commented on May 29, 2024

Not another process, but another POSIX thread. A FORTRAN thread, to be precise, launched from within the C code. The data is binary; printf() is not used. The FORTRAN subroutine thread does its work, calling a C write(int fd, ...) function with the write end of the Unix pipe in order to write the binary data to the pipe.

FORTRAN

      ! stream the FITS file from C, using a special FITSIO filter to select a sub-region
      call download_response(req%fd, item%uri//trim(filter)//c_null_char)

      ! close the connection, release pointers
      ! the pipe (or a FILE stream associated with it) will be closed in C
      nullify (item)
      nullify (req) ! disassociate the FORTRAN pointer from the C memory region
      call free(user) ! release C memory
      return

libmicrohttpd interaction (HTTP streaming): streaming is used extensively, not just with FORTRAN but with C response threads too, for example to stream data coming out of a gzip encoder; that binary data comes out in small chunks and is streamed over HTTP. I mostly stream BINARY data, not text: I don't use printf(), I use the binary write(int fd, ...) extensively. The binary data also comes from the ZFP and LZ4 compressors (ZFP is used to compress scientific floating-point arrays). Binary streaming is done either directly from RAM or from disk files (FITS files, selecting sub-regions from files using NASA's CFITSIO library).

        // open a pipe
        status = pipe(pipefd);

        if (0 != status)
        {
            // deallocate datasetId
            for (int i = 0; i < va_count; i++)
                free(datasetId[i]);
            free(datasetId);

            return http_internal_server_error(connection);
        }

        // (...)

// create a response from a pipe by passing the read end of the pipe
        struct MHD_Response *response = MHD_create_response_from_pipe(pipefd[0]);

        // add headers
        MHD_add_response_header(response, "Cache-Control", "no-cache");
        MHD_add_response_header(response, "Cache-Control", "no-store");
        MHD_add_response_header(response, "Pragma", "no-cache");

        MHD_add_response_header(response, "Content-Type", "application/force-download");
        MHD_add_response_header(response, "Content-Disposition", filename);
        MHD_add_response_header(response, "Content-Transfer-Encoding", "binary");
        MHD_add_response_header(response, "Accept-Ranges", "bytes");

        // queue the response
        enum MHD_Result ret = MHD_queue_response(connection, MHD_HTTP_OK, response);

        MHD_destroy_response(response);

        // the code below should be run in a separate thread
        // otherwise libmicrohttpd will not have a chance to read from the pipe

        // pass the write end of the pipe to Fortran
        // the binary response data will be generated in Fortran
        // printf("[C] calling viewport_request with the pipe file descriptor %d\n", pipefd[1]);

        // got all the data, prepare a request structure and pass it to FORTRAN
        struct download_request *req = (struct download_request *)malloc(sizeof(struct download_request));

        if (req != NULL)
        {
            req->x1 = x1;
            req->x2 = x2;
            req->y1 = y1;
            req->y2 = y2;
            req->frame_start = frame_start;
            req->frame_end = frame_end;
            req->ref_freq = ref_freq;

            req->fd = pipefd[1];
            req->ptr = item;

            // create and detach the FORTRAN thread
            int stat = -1;

            if (va_count == 1)
                stat = pthread_create(&tid, NULL, &download_request, req);
            else
            {
                // create a new composite_download_request
                struct composite_download_request *creq = (struct composite_download_request *)malloc(sizeof(struct composite_download_request));

                if (creq == NULL)
                {
                    close(pipefd[1]);
                    free(req);

                    // deallocate datasetId
                    for (int i = 0; i < va_count; i++)
                        free(datasetId[i]);
                    free(datasetId);

                    return http_internal_server_error(connection);
                }

                // duplicate the datasetId
                creq->datasetId = (char **)malloc(va_count * sizeof(char *));
                for (int i = 0; i < va_count; i++)
                    creq->datasetId[i] = strdup(datasetId[i]);

                creq->va_count = va_count;
                creq->req = req;

                // launch a C thread calling handle_composite_download_request
                stat = pthread_create(&tid, NULL, &handle_composite_download_request, creq);
            }

            if (stat == 0)
                pthread_detach(tid);
            else
            {
                close(pipefd[1]);
                free(req);
            }
        }
        else
            close(pipefd[1]);

        // deallocate datasetId
        for (int i = 0; i < va_count; i++)
            free(datasetId[i]);
        free(datasetId);

        return ret;

To be honest, ripping out libmicrohttpd is not that simple. I have been using libmicrohttpd since version 2 of this software (about 9 years ago). Versions 2 and 3 used the C libmicrohttpd coupled with the C++ uWebSockets library. Version 4 switched to Rust (https://github.com/jvo203/fits_web_ql). The current version 5 (a cluster edition) uses a combination of FORTRAN and C (NO C++), reusing the libmicrohttpd code extensively, with mongoose handling WebSockets plus internal cluster communication via HTTP. To be precise, some HTTP responses go via the libmicrohttpd port and others (mostly HTTP POST) go via the mongoose port, so it's a mixture of both. It's complicated, don't ask!

jvo203 commented on May 29, 2024

OK, could you please NOT delete the newly created pair mongoose branch? I've just created a pair branch of FITSWEBQLSE and am testing the resurrected mg_mkpipe() functionality. Here is the pipe event handler. You might notice there is no for-loop bottleneck in there anymore. Instead, the session structure (retrieved from a hash table using a <session_id> string) stores the relevant WebSocket connection pointer (against your advice!), but there are protections against use after deletion/connection closure: a mutex for the hash table and GLib reference counting for the user session structure.

This code still needs to be tested with valgrind, as well as undergo a grueling 12-hour stress test. Fingers crossed!

// Pipe event handler.
static void mg_pipe_callback(struct mg_connection *c, int ev, void *ev_data, void *fn_data)
{
    if (c->fn_data == NULL)
    {
        c->recv.len = 0;   // Consume received data
        c->is_closing = 1; // And we're done, close this pipe
        return;
    }

    if (ev == MG_EV_CLOSE)
    {
        printf("[C] mg_pipe_callback: MG_EV_CLOSE (%s)\n", (char *)c->fn_data);

        // release the memory
        free(c->fn_data);
        c->fn_data = NULL;
        c->recv.len = 0; // Consume received data
        return;
    }

    if (ev == MG_EV_READ)
    {
        // get a session id string
        char *session_id = (char *)c->fn_data;

        websocket_session *session = NULL;

        // get a session pointer from the hash table
        if (pthread_mutex_lock(&sessions_mtx) == 0)
        {
            session = g_hash_table_lookup(sessions, (gconstpointer)session_id);
            if (session != NULL)
                g_atomic_rc_box_acquire(session);

            pthread_mutex_unlock(&sessions_mtx);
        }

        if (session == NULL)
        {
            printf("[C] mg_pipe_callback session %s not found.\n", session_id);
            c->recv.len = 0;   // Consume received data
            c->is_closing = 1; // And we're done, close this pipe
            return;
        }

        if (session->conn == NULL)
        {
            printf("[C] mg_pipe_callback session->conn is NULL.\n");
            c->recv.len = 0;   // Consume received data
            c->is_closing = 1; // And we're done, close this pipe

            g_atomic_rc_box_release_full(session, (GDestroyNotify)delete_session);
            session = NULL;

            return;
        }

        int i, n;
        size_t offset;

        n = c->recv.len / sizeof(struct websocket_message);

#ifdef DEBUG
        printf("[C] mg_pipe_callback: received %d binary message(s).\n", n);
#endif

        for (offset = 0, i = 0; i < n; i++)
        {
            struct websocket_message *msg = (struct websocket_message *)(c->recv.buf + offset);
            offset += sizeof(struct websocket_message);

#ifdef DEBUG
            printf("[C] found a WebSocket connection, sending %zu bytes.\n", msg->len);
#endif
            if (msg->len > 0 && msg->buf != NULL && session->conn->is_websocket)
                mg_ws_send(session->conn, msg->buf, msg->len, WEBSOCKET_OP_BINARY);

            // release memory
            if (msg->buf != NULL)
            {
                free(msg->buf);
                msg->buf = NULL;
                msg->len = 0;
            }
        }

        c->recv.len = 0; // Consume received data

        g_atomic_rc_box_release_full(session, (GDestroyNotify)delete_session);
        session = NULL;
    }

    (void)ev_data;
    (void)fn_data;
}

jvo203 commented on May 29, 2024

An alternative pipe event handler with the said bottleneck for-loop, but without storing the WebSocket connection pointer. One could argue it is preferable, since there are no mutexes to wait on or GLib reference counts to acquire:

// Pipe event handler.
static void mg_pipe_callback_loop(struct mg_connection *c, int ev, void *ev_data, void *fn_data)
{
    if (c->fn_data == NULL)
    {
        c->recv.len = 0;   // Consume received data
        c->is_closing = 1; // And we're done, close this pipe
        return;
    }

    if (ev == MG_EV_CLOSE)
    {
        printf("[C] mg_pipe_callback: MG_EV_CLOSE (%s)\n", (char *)c->fn_data);

        // release the memory
        free(c->fn_data);
        c->fn_data = NULL;
        c->recv.len = 0; // Consume received data
        return;
    }

    if (ev == MG_EV_READ)
    {
        // get a session id string
        char *session_id = (char *)c->fn_data;

        struct mg_connection *conn = NULL;
        for (struct mg_connection *t = c->mgr->conns; t != NULL; t = t->next)
        {
            // do not bother comparing strings for non-WebSocket connections
            if (!t->is_websocket)
                continue;

            websocket_session *session = (websocket_session *)t->fn_data;

            if (session == NULL)
                continue;

            if (strcmp(session->id, session_id) == 0)
            {
#ifdef DEBUG
                printf("[C] found a WebSocket connection for %s.\n", session_id);
#endif
                conn = t;
                break;
            }
        }

        int i, n;
        size_t offset;

        n = c->recv.len / sizeof(struct websocket_message);

#ifdef DEBUG
        printf("[C] mg_pipe_callback: received %d binary message(s).\n", n);
#endif

        for (offset = 0, i = 0; i < n; i++)
        {
            struct websocket_message *msg = (struct websocket_message *)(c->recv.buf + offset);
            offset += sizeof(struct websocket_message);

#ifdef DEBUG
            printf("[C] found a WebSocket connection, sending %zu bytes.\n", msg->len);
#endif
            if (conn != NULL && msg->len > 0 && msg->buf != NULL)
                mg_ws_send(conn, msg->buf, msg->len, WEBSOCKET_OP_BINARY);

            // release memory
            if (msg->buf != NULL)
            {
                free(msg->buf);
                msg->buf = NULL;
                msg->len = 0;
            }
        }

        c->recv.len = 0; // Consume received data
    }

    (void)ev_data;
    (void)fn_data;
}

jvo203 commented on May 29, 2024

FYI, the intensive 12-hour stress tests of the revised code (three ring buffers with three event-loop POSIX threads for the real-time spectrum/video/P-V diagram WebSocket requests), coupled with your resurrected thread-safe mg_mkpipe() functionality, have not brought up any problems. I am using a TCP socket, not UDP, in mg_mkpipe().

It passes with flying colours each night.

scaprile commented on May 29, 2024

UDP guarantees message boundaries; TCP does not.
We are still brainstorming on this.

jvo203 commented on May 29, 2024

Oh bugger! The current code calculates the number of messages by dividing the length of the received TCP data by the size of the transmitted structure. The transmitted binary structure size is always the same.

There may be two or more messages concatenated within a single transmission, but I've been relying on the received size being an exact multiple of the message structure size...

Perhaps it's time to switch to UDP then. UDP is not 100% reliable, but it's a local connection on the same machine, so (hopefully) even with UDP all the messages should get through. Or so one would hope...
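
On the TCP side, a defensive fix would be to consume only whole structures and keep any partial tail for the next MG_EV_READ; a sketch, reusing the fields from the handler above:

// inside the MG_EV_READ branch: process complete messages only
size_t msize = sizeof(struct websocket_message);
size_t count = c->recv.len / msize;  // whole messages received so far
size_t consumed = count * msize;

for (size_t off = 0; off < consumed; off += msize)
{
    struct websocket_message *msg = (struct websocket_message *)(c->recv.buf + off);
    // ... send and free msg as before ...
}

// keep the partial tail instead of discarding it with c->recv.len = 0
memmove(c->recv.buf, c->recv.buf + consumed, c->recv.len - consumed);
c->recv.len -= consumed;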

scaprile commented on May 29, 2024

Well, we chose UDP to keep the notion of a "message". It is as reliable as the underlying network; it won't retry, as TCP does. A back-to-back socket connection should not lose messages, at least not on anything that claims to be an OS. UDP drops usually happen under network hogging, mostly in routers: several interfaces compete to fill one or two, priority queues and regular queues start to fill up, and then you have to choose what to discard before you start discarding everything. Nothing like that should happen here.

jvo203 commented on May 29, 2024

That's alright then. I'll switch the pipe over to UDP and re-test everything thoroughly.

jvo203 commented on May 29, 2024

During recent tests I've found a macOS-specific bug in mongoose. When a client-side WebSocket connection is closed, the following user session structure gets released:

typedef struct
{
    char *datasetid; // a single id
    char *multi;     // multiple ids are separated by a semicolon
    char *id;        // sessionId

    // the WebSocket communications channel (mg_pipe)
    int channel;

    // etc...
} websocket_session;

Upon its release

// close the socket pipe
close(session->channel);

is called in order to close the mongoose socket pipe and remove it from the list of mongoose connections.

Under Linux, MG_EV_CLOSE is promptly triggered (as it should be) and the pipe is closed cleanly. However, on macOS the MG_EV_CLOSE event is not triggered "here and now": it only gets triggered upon a Ctrl-C shutdown of the application. In other words, on macOS defunct zombie pipe connections keep polluting the mongoose connection list; they never get removed when they should be. There is a serious consequence: the mongoose connection list keeps growing (indefinitely?) until the program shuts down, possibly months later.

Here is an example of the macOS behaviour; as you can see, Ctrl-C has been pressed and only then do the pipes get closed properly:

^C[C] Interrupt signal [2] received.
[C] mg_pipe_callback: MG_EV_CLOSE (66e3fc66-073f-4fe9-b2ad-ec8c47694baf)
[C] mg_pipe_callback: MG_EV_CLOSE (fc6d2ec6-b6b5-45a8-85da-dd1364588170)
[C] mg_pipe_callback: MG_EV_CLOSE (f4c172db-d42c-481e-96a5-afbb2d0f3a72)
[C] mg_pipe_callback: MG_EV_CLOSE (c53e3061-dbb9-4343-8d6f-8edbe2822d8d)
[C] shutting down the µHTTP daemon... done
garbage collection thread terminated.

I guess that under the hood mongoose uses different event-polling mechanisms on Linux and macOS, and somehow the macOS event loop "misbehaves".

Edit: this happens on both Intel and Apple Silicon macOS.

jvo203 commented on May 29, 2024

Thank you.
