
Comments (14)

Totodore avatar Totodore commented on June 10, 2024 3

Is it connected over polling or websocket? I suspect that an encoding process is blocking the thread and therefore blocking the event system.
I currently can't check anything but it will be on my to-do list when I get back to work :)

from socketioxide.

arthurguedes375 avatar arthurguedes375 commented on June 10, 2024 1

It's a websocket connection. I didn't hard-code it to use websockets, but I confirmed in the browser developer tools that the connection is a websocket.


HRKings avatar HRKings commented on June 10, 2024 1

@Totodore here we go: https://github.com/HRKings/socketoxide-tick-system/tree/nohack
It's on a separate branch, the master branch contains the code with the hack.
The project is using the defaults, and it blocks with only one socket connected. If I change to a path other than '/', the other path works.


Totodore avatar Totodore commented on June 10, 2024 1

@HRKings Found the bug: your "simulation::runner" never yields and therefore blocks the entire tokio scheduler. You either have to rewrite your "tick/wait" system to use tokio::time, which yields between task executions, or remove the async and use tokio::task::spawn_blocking:

let simulation_thread =
        tokio::task::spawn_blocking(move || simulation::runner::new(io, simulation_receiver));

This is probably the same issue you are having, @arthurguedes375. If you want to find which spawned task is blocking everything, you can use https://github.com/tokio-rs/console. It is very useful.
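
To make the "sleep between ticks, off the scheduler" idea concrete, here is a minimal std-only sketch of a fixed-rate tick loop that runs on its own OS thread and sleeps until the next deadline instead of spinning. It is not the code from the linked repository: `spawn_tick_loop`, `on_tick` and the stop channel are hypothetical names, and it assumes `rate_hz` is greater than zero. In a Tokio application the closure body is the kind of work you would hand to tokio::task::spawn_blocking.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

/// Runs `on_tick` at roughly `rate_hz` on a dedicated OS thread until a stop
/// message arrives or the stop sender is dropped.
/// Returns a handle that yields the number of ticks executed.
/// Hypothetical helper for illustration; assumes `rate_hz > 0`.
fn spawn_tick_loop(
    rate_hz: u32,
    stop: mpsc::Receiver<()>,
    mut on_tick: impl FnMut(u64) + Send + 'static,
) -> JoinHandle<u64> {
    let period = Duration::from_secs(1) / rate_hz;
    thread::spawn(move || {
        let mut tick = 0u64;
        let mut next = Instant::now() + period;
        loop {
            // Block until the next deadline (or a stop signal) instead of busy-waiting.
            let wait = next.saturating_duration_since(Instant::now());
            match stop.recv_timeout(wait) {
                // Deadline reached without a stop signal: run one tick.
                Err(RecvTimeoutError::Timeout) => {
                    on_tick(tick);
                    tick += 1;
                    next += period;
                }
                // Explicit stop, or the stop sender was dropped.
                Ok(()) | Err(RecvTimeoutError::Disconnected) => return tick,
            }
        }
    })
}
```

Because the loop blocks in `recv_timeout` between ticks, it never occupies an async worker thread, and the caller can stop it by sending `()` on the stop channel and joining the handle.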


HRKings avatar HRKings commented on June 10, 2024

Hey! I don't know if we are running into the same issue, but I have a tick system that runs 100 updates per second, and the socket gets stuck and the connection dies after a while. I've made a quick hack to get it working:

fn emit_async(
    socket_io: SocketIo,
    event: impl Into<Cow<'static, str>> + std::marker::Send + 'static,
    data: Value,
) -> JoinHandle<Result<(), BroadcastError>> {
    tokio::spawn(async move { socket_io.of("/").unwrap().emit(event, data) })
}

And then I emit like so:

_ = emit_async(socket_io.clone(),
    "event",
    json!("Test"),
)
.await
.unwrap();

This makes everything run smoothly and doesn't block anything (although I haven't tested the implications for resource usage).

Don't know if this helps with anything. But thanks for the amazing crate!


Totodore avatar Totodore commented on June 10, 2024

@arthurguedes375 @HRKings Could you give me a minimal reproducible example (full code)?
This kind of problem depends heavily on context, and I need some more info to find out what could be going wrong, in particular:

  • Socketioxide version
  • Max buffer size
  • Single or multi thread tokio runtime
  • How many sockets connected
  • Event frequency (incoming and outgoing)

I'm not sure you both have the same problem. @arthurguedes375 did you try to put your binary sending code in a tokio::task::spawn_blocking? If everything blocks because of heavy binary processing, this could help.


arthurguedes375 avatar arthurguedes375 commented on June 10, 2024

@arthurguedes375 did you try to put your binary sending code in a tokio::task::spawn_blocking

I can't use tokio to spawn threads because my environment is not async.

I have an "on_connect" function that is not async. Within "on_connect" I spawn two threads with "std::thread::spawn" and then call

worker1.join();
worker2.join();

When I use tokio's task API, I can't call ".join()". I need to call ".await", but I can't because I'm inside a sync function...

An example would be something like this:

fn on_connect(&self, socket: &SocketRef, __socket_data: &Value) -> Result<(), String> {

    socket.on("start_streaming", |socket| {
        // Consumer: emits every chunk received on the channel.
        let socket_emitting = thread::spawn(move || {
            // frame_chunk is a vector containing [u8; 500] (five hundred u8 numbers)
            for frame_chunk in rx {
                let emit_status = socket.bin(vec![frame_chunk]).emit("streaming_data", ());
            }
        });
        // Producer: pushes 1000 chunks of 500 bytes into the channel.
        let tx_emitting = thread::spawn(move || {
            for i in 0..1000 {
                let _ = tx.send(vec![0u8; 500]);
            }
        });
        // Joining here blocks the handler until both threads have finished.
        socket_emitting.join();
        tx_emitting.join();
    });
    Ok(())
}
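
As a point of comparison, the sketch below shows the same producer/consumer pair in std-only Rust where the starting function returns the join handles instead of joining them, so the caller decides when (and on which thread) to wait. This is an illustration under assumptions, not socketioxide code: `start_streaming` is a hypothetical free function, the channel is created inside it, and the emit call is replaced by counting bytes.

```rust
use std::sync::mpsc;
use std::thread::{self, JoinHandle};

/// Starts a producer and a consumer thread and returns immediately with their
/// handles. The consumer's handle yields the total number of bytes it received.
/// Hypothetical stand-in for the "start_streaming" handler body.
fn start_streaming(chunks: usize) -> (JoinHandle<()>, JoinHandle<usize>) {
    let (tx, rx) = mpsc::channel::<Vec<u8>>();

    // Producer: sends `chunks` frames of 500 bytes, then drops `tx`,
    // which ends the consumer's `for` loop.
    let producer = thread::spawn(move || {
        for _ in 0..chunks {
            if tx.send(vec![0u8; 500]).is_err() {
                break; // the consumer is gone
            }
        }
    });

    // Consumer: real code would emit each frame on the socket here.
    let consumer = thread::spawn(move || {
        let mut bytes = 0;
        for frame_chunk in rx {
            bytes += frame_chunk.len();
        }
        bytes
    });

    (producer, consumer)
}
```

The function itself never blocks; whoever owns the returned handles can join them later from a thread where blocking is acceptable, or simply drop them to let the threads run detached.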


Totodore avatar Totodore commented on June 10, 2024

It seems to come from a pipe task between socketioxide's internal channel and the websocket connection that is never woken (the task is stuck in the SCHEDULED status). However, this only happens when one or two sockets are connected. When there are more, all the tasks are processed normally (although the pipe tasks of the first two sockets are slower).

I verified this with @HRKings's example; see the specific socketioxide branch to reproduce.

Could you verify on your side that the bug doesn't occur when you have more than two sockets connected? That way we can be sure that you are both hitting the same issue.


arthurguedes375 avatar arthurguedes375 commented on June 10, 2024

Hey, I've tried spawning an entire tokio multi-threaded runtime and running the code in a "block_on". Inside the loop, I yield after sending the socket bin data. The problem is that none of the tasks that "never yield" are mine, they are tokio's...
The console says that 4 threads never yield. None of these 4 are actually part of my code.

I still have the same bug: the "keeping stream" only shows up after I stop sending bin data through the sockets.

I've also tried changing the position of the "yield_now", and none of the positions changes the result.
When I use "spawn_blocking", for some reason the block_on loses its waker.

    let socket_emitting = std::thread::spawn(move || {

        // Creates a Tokio runtime that is needed to use task::spawn_blocking
        let runtime = Arc::new(
            tokio::runtime::Builder::new_multi_thread()
                .enable_all()
                .build()
                .unwrap(),
        );

        async fn handle_channel(
            rx: mpsc::Receiver<Vec<u8>>,
            socket: SocketRef,
            keep_alive: Arc<Mutex<Instant>>,
        ) {
            for frame_chunck in rx {
                let keep = *(keep_alive.lock().unwrap());
                if keep.elapsed().as_millis() > 5000 {
                    break;
                }

                socket.bin(vec![frame_chunck]).emit("streaming_data", ());
                yield_now().await
            }
        }

        runtime.block_on(handle_channel(rx, socket, keep_alive));
        info!("Dropping transmission receiver");
    });

    /** Lots of other not related code */
    
    socket_emitting.join();
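
One detail of the snippet above is worth spelling out: iterating over a std `mpsc::Receiver` blocks the current thread until a frame arrives, so the keep-alive check only runs once per received frame. The std-only sketch below uses `recv_timeout` so the check also runs while the channel is idle. It is an illustration, not a fix confirmed in this thread: `drain_with_keep_alive`, `max_idle` and the `emit` callback (a stand-in for the socket emit) are hypothetical, and the 50 ms poll interval is an arbitrary choice.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Forwards frames from `rx` to `emit` until the sender hangs up or the
/// `keep_alive` timestamp is older than `max_idle`.
/// Returns the number of frames forwarded.
fn drain_with_keep_alive(
    rx: mpsc::Receiver<Vec<u8>>,
    keep_alive: Arc<Mutex<Instant>>,
    max_idle: Duration,
    mut emit: impl FnMut(Vec<u8>),
) -> usize {
    let mut frames = 0;
    loop {
        // Check the keep-alive on every iteration, even when no frame arrived.
        let last_seen = *keep_alive.lock().unwrap();
        if last_seen.elapsed() > max_idle {
            break;
        }
        // Wait a bounded amount of time for the next frame.
        match rx.recv_timeout(Duration::from_millis(50)) {
            Ok(frame_chunk) => {
                emit(frame_chunk);
                frames += 1;
            }
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    frames
}
```

This loop is still blocking code, so it belongs on a dedicated thread (as in the snippet above) rather than inside an async task.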


Totodore avatar Totodore commented on June 10, 2024

Could you post a screenshot of the Tokio console with the blocked task ?


arthurguedes375 avatar arthurguedes375 commented on June 10, 2024

Sure!
Sometimes it tells me that, besides those 4 threads that never yield, one of the socketioxide threads "has awakened itself more than 50% of the time".

image

Image of the entire line

image


arthurguedes375 avatar arthurguedes375 commented on June 10, 2024

Oh, and btw, I couldn't really understand where the bug actually happens. Is it in fact a socketioxide bug or not?
And, just out of curiosity, is the expected behavior I described in my issue correct (that events should get handled independently even when a thread may be blocking the entire runtime)?


Totodore avatar Totodore commented on June 10, 2024

Your bug is more about mixing blocking and non-blocking code by spawning multiple runtimes. I think you should not do that; instead, use a multi-threaded Tokio runtime, use spawn_blocking for any blocking task, and use socketioxide and Axum normally.


arthurguedes375 avatar arthurguedes375 commented on June 10, 2024

Your bug is more about mixing blocking and non-blocking code by spawning multiple runtimes. I think you should not do that; instead, use a multi-threaded Tokio runtime, use spawn_blocking for any blocking task, and use socketioxide and Axum normally.

I can't do that because I'm designing a modular system which receives a function inside a trait to handle events. But thank you for the insight

