tokahuke / yaque

Yaque is yet another disk-backed persistent queue for Rust.

License: Other

Rust 100.00%
async asynchronous asyncio disk filesystem persistence persistent-storage queue resilience rust rust-lang

yaque's Introduction

Hello and welcome to tokahuke!

yaque's People

Contributors

evaporei, grant0417, mullr, netguy204, tokahuke

yaque's Issues

Sender side already in use after Ctrl+C and restart of application

I am running a yaque channel with a gRPC server as the sender and a worker on the other end processing messages. If I start the application, send a few messages, hit Ctrl+C, and then try to restart the application, I receive a "sender side already in use" error when creating the channel on the same directory. The only way to fix this is to delete the directory and create a fresh one. This makes the queue non-durable and prone to large amounts of data loss on crashes.

Any suggestions?
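
One plausible reading, suggested by the `send.lock` file named in the test logs elsewhere on this page, is that each side of the queue is guarded by a lock file that is released when a guard value is dropped. The sketch below is not yaque's code: `LockGuard` and the temp-file path are hypothetical, and the real locking scheme may differ. It only illustrates why a process that dies without running destructors can leave a stale lock behind, and why removing that lock (which is presumably what the `recover` call used in other issues here does) lets the next start succeed.

```rust
use std::fs::{self, OpenOptions};
use std::io;
use std::path::{Path, PathBuf};

/// Hypothetical guard: owns an exclusive lock file and removes it on drop.
struct LockGuard {
    path: PathBuf,
}

impl LockGuard {
    fn acquire(path: &Path) -> io::Result<LockGuard> {
        // `create_new` fails with `AlreadyExists` if the file is already there.
        OpenOptions::new().write(true).create_new(true).open(path)?;
        Ok(LockGuard { path: path.to_path_buf() })
    }
}

impl Drop for LockGuard {
    fn drop(&mut self) {
        // Best effort: release the lock when the guard goes out of scope.
        let _ = fs::remove_file(&self.path);
    }
}

fn main() -> io::Result<()> {
    // Assumed scratch location, unique per process.
    let path = std::env::temp_dir().join(format!("lock-sketch-{}.lock", std::process::id()));
    let _ = fs::remove_file(&path);

    // Clean shutdown: the guard is dropped, so the lock can be taken again.
    drop(LockGuard::acquire(&path)?);
    assert!(LockGuard::acquire(&path).is_ok());

    // Simulated kill: the destructor never runs and the lock file stays behind.
    std::mem::forget(LockGuard::acquire(&path)?);
    let err = LockGuard::acquire(&path).err().expect("stale lock should block us");
    assert_eq!(err.kind(), io::ErrorKind::AlreadyExists);

    // Manual recovery: delete the stale lock, after which acquiring works again.
    fs::remove_file(&path)?;
    assert!(LockGuard::acquire(&path).is_ok());
    Ok(())
}
```

Under this assumption the queue data itself is untouched by the crash; only the lock needs clearing, so deleting the whole directory should not be necessary.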

Certain sequence of bytes can screw up the channel

I feel like this should be documented somewhere: if you happen to send some bytes that match yaque's header, the queue may cease to function. This happened when I tried to send some MessagePack through the channel.

Behaviour of `commit` not clear - does committing also clear the data in the queue?

In this example, we just send and receive to a yaque. After receiving, we commit on the RecvGuard.

Is it intentional that on the next start of the program, the yaque still contains all the previously received data?

Running this example twice prints the bytes from the first run during the second run, even though they were committed in the first run.

use std::time::Duration;
use yaque::recovery::recover;

#[tokio::main]
async fn main() {
    recover("persistent-data").ok();

    let (mut sender, mut receiver) = yaque::channel("persistent-data").unwrap();

    let mut interval = tokio::time::interval(Duration::from_secs(1));
    let mut counter = 0u64;
    loop {
        tokio::select! {
            _tick = interval.tick() => {
                println!("Enqueuing {counter}");
                let Ok(_) = sender.send(format!("Message {counter}")).await else {
                    break;
                };
                counter += 1;
            }
            guard = receiver.recv() => {
                println!("Got batch");
                let Ok(contents) = guard else {
                    println!("Breaking because guard not OK");
                    break;
                };
                // println!("{}", String::from_utf8((&mut *contents).drain(..).flatten().collect::<Vec<_>>()).unwrap());
                println!("{:?}", String::from_utf8(contents.to_vec()));
                contents.commit().unwrap();
                println!("committed");
            }
        }
    }
}

`xor` is not xor

xor in header.rs does not actually compute the exclusive or of a and b. Instead it checks that a and b are either both true or both false, which is equivalent to a == b. I don't understand what decode does well enough to know if this is the intended behavior or not, but either the name or the implementation is currently wrong. If you do actually need xor there's the ^ operator and the BitXor trait.
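
To make the distinction concrete, here is a small self-contained check. The function `same` is a hypothetical stand-in written from the description above (true when both inputs agree); it is not copied from header.rs. The loop walks the full truth table and confirms that this behavior equals `a == b`, which is the negation of what `^` computes.

```rust
/// Hypothetical stand-in for the behavior described in the issue:
/// returns true when `a` and `b` are both true or both false.
fn same(a: bool, b: bool) -> bool {
    (a && b) || (!a && !b)
}

fn main() {
    for a in [false, true] {
        for b in [false, true] {
            // The described behavior is plain equality ...
            assert_eq!(same(a, b), a == b);
            // ... whereas exclusive or is inequality ...
            assert_eq!(a ^ b, a != b);
            // ... so the two are exact opposites on every input.
            assert_eq!(same(a, b), !(a ^ b));
            println!("a={a} b={b} same={} xor={}", same(a, b), a ^ b);
        }
    }
}
```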

`recv()` stuck awaiting

heya ! love the work being done on yaque it is an insanely useful crate :)

Description of issue

I have a long running future as such:

tokio::spawn(async move {
    let mut recv: Receiver = match Receiver::open(qpath) {
        Ok(recv) => recv,
        Err(e) => {
            log::error!("Error opening receiver: {}", e);
            return;
        }
    };

    while (*status).load(Ordering::Relaxed) {
        loop {
            match recv.recv().await {
                Ok(bytes) => {
                    let bytes_inner = bytes.deref().clone();
                    Self::execute_job_from_bytes(bytes_inner, store.clone()).await;
                    match bytes.commit() {
                        Ok(_) => {}
                        Err(e) => {
                            log::error!("Error committing to queue: {}", e);
                        }
                    }
                }
                Err(e) => log::error!("Error receiving from queue: {:?}", e),
            }
        }
    }
})

With a sender that is triggered by API calls in a separate future.

sender
    .send(cbor_bytes)
    .await
    .map_err(|e| warp::reject::custom(Failure::Execute(e.to_string())))?;

However, the receiver loop gets stuck at recv.recv().await, never resolving -- even when the sender successfully sends!

I've dug a bit into yaque to see where it hangs -- and it seems to never resolve on the following line (line 272) of queue/receiver.rs

// Read header:
let mut header = [0; 4];
self.tail_follower.read_exact(&mut header).await?;

Digging further into the generated ReadExact future: read_until_you_drain gets called twice when the queue is empty, as dictated by the poll function:

impl<'a> Future for ReadExact<'a> {
    type Output = io::Result<()>;
    fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        self.was_polled = true;
        // See what happens when we read.
        let outcome = self.read_until_you_drain();

        if outcome.is_pending() {
            // Set the waker in the file watcher:
            let mut lock = self.waker.lock().expect("waker mutex poisoned");
            *lock = Some(context.waker().clone());

            // Now, you will have to recheck (TOCTOU!)
            self.read_until_you_drain()
        } else {
            outcome
        }
    }
}

self.read_until_you_drain() returns a Poll::Pending state as we'd expect -- but then ... never gets triggered again, so the future hangs indefinitely -- possibly the waker is not operating as expected?

If I hit Ctrl+C to kill the process and then restart it, the queue gets read correctly and the logic ensues correctly ... until the queue is empty once more and things hang.

As a workaround I have currently replaced recv() with try_recv(), which isn't ideal as the loop now spins and consumes the CPU entirely.

Any help on the above would be much appreciated :)
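
The register-then-recheck step in the quoted `poll` can be illustrated in isolation. The following is a minimal std-only sketch, not yaque's code: `Shared`, `WaitReady`, `notify`, `ThreadWaker`, and `block_on` are hypothetical names. It shows that the pattern only completes if whoever produces the data takes the stored waker and calls `wake()`. If that notification never arrives, the future stays pending forever, which matches the symptom described above but does not by itself identify the cause.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

/// State shared between the waiting future and the producer.
struct Shared {
    ready: AtomicBool,
    waker: Mutex<Option<Waker>>,
}

/// Future that resolves once `ready` has been set.
struct WaitReady(Arc<Shared>);

impl Future for WaitReady {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0.ready.load(Ordering::Acquire) {
            return Poll::Ready(());
        }
        // Register the waker first ...
        *self.0.waker.lock().unwrap() = Some(cx.waker().clone());
        // ... then recheck, in case the producer ran in between.
        if self.0.ready.load(Ordering::Acquire) {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

/// Producer side: publish the data, then wake whoever is waiting.
fn notify(shared: &Shared) {
    shared.ready.store(true, Ordering::Release);
    if let Some(waker) = shared.waker.lock().unwrap().take() {
        waker.wake();
    }
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Tiny executor: poll, and park the thread while the future is pending.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    let shared = Arc::new(Shared {
        ready: AtomicBool::new(false),
        waker: Mutex::new(None),
    });
    let producer = Arc::clone(&shared);
    let handle = thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        notify(&producer);
    });
    block_on(WaitReady(Arc::clone(&shared)));
    handle.join().unwrap();
    println!("future resolved after notification");
}
```

When debugging a hang like this, it may therefore be worth checking whether the component responsible for calling `wake()` (the file watcher, going by the comment in the quoted code) actually fires when the sender writes.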

Determining length of queue at startup

Hi! Cool library.

My use case is an IoT system where data is buffered for transmission. In the event of flaky comms, I need to buffer data to disk instead of memory, so this library looks great for this use case. I'm implementing a hybrid memory/disk queue using Yaque and a VecDeque. The idea is that a fixed amount of memory will be dedicated to buffering, and if the memory limit is exceeded, we'll start buffering to Yaque. Popping elements for transmission will entail loading from Yaque if disk buffering has occurred.

What's the best way to determine the queue length during program initialization?

Is the answer here to just create QueueIter and count them?
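
The hybrid design described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions: `Overflow`, `FakeDisk`, and `HybridQueue` are hypothetical names, the disk side is faked with an in-memory `VecDeque` instead of yaque, and the memory limit counts items rather than bytes. It does not answer how to obtain the length of an existing yaque queue at startup; it only shows the spill and refill logic, plus a `len` that depends on the disk side being able to report its own length.

```rust
use std::collections::VecDeque;

/// Hypothetical interface for the disk-backed side (a yaque queue in the issue).
trait Overflow {
    fn push(&mut self, item: Vec<u8>);
    fn pop(&mut self) -> Option<Vec<u8>>;
    fn len(&self) -> usize;
}

/// In-memory fake, used only so the sketch runs without touching disk.
#[derive(Default)]
struct FakeDisk(VecDeque<Vec<u8>>);

impl Overflow for FakeDisk {
    fn push(&mut self, item: Vec<u8>) {
        self.0.push_back(item);
    }
    fn pop(&mut self) -> Option<Vec<u8>> {
        self.0.pop_front()
    }
    fn len(&self) -> usize {
        self.0.len()
    }
}

struct HybridQueue<D: Overflow> {
    memory: VecDeque<Vec<u8>>,
    memory_limit: usize, // maximum number of items held in memory
    disk: D,
}

impl<D: Overflow> HybridQueue<D> {
    fn new(memory_limit: usize, disk: D) -> Self {
        HybridQueue { memory: VecDeque::new(), memory_limit, disk }
    }

    fn push(&mut self, item: Vec<u8>) {
        // Once anything has spilled, keep appending to disk so FIFO order holds.
        if self.disk.len() == 0 && self.memory.len() < self.memory_limit {
            self.memory.push_back(item);
        } else {
            self.disk.push(item);
        }
    }

    fn pop(&mut self) -> Option<Vec<u8>> {
        let item = match self.memory.pop_front() {
            Some(item) => item,
            // Memory is empty (e.g. a limit of zero): read straight from disk.
            None => return self.disk.pop(),
        };
        // Refill the freed memory slot with the oldest spilled item, if any.
        if self.memory.len() < self.memory_limit {
            if let Some(spilled) = self.disk.pop() {
                self.memory.push_back(spilled);
            }
        }
        Some(item)
    }

    fn len(&self) -> usize {
        self.memory.len() + self.disk.len()
    }
}

fn main() {
    let mut queue = HybridQueue::new(2, FakeDisk::default());
    for i in 0u8..5 {
        queue.push(vec![i]);
    }
    println!("total = {}, on disk = {}", queue.len(), queue.disk.len());
    while let Some(item) = queue.pop() {
        println!("popped {:?}", item);
    }
}
```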

Stream or Iterator?

Hi,

At first sight I am wondering whether it would make sense to have an iterator/stream to receive the data. Currently I would need to call recv in a loop, but a stream might be more optimized. I don't have any metrics to justify this assumption, but my gut feeling is that it would be a nice addition.

yaque drops items?

Here's an example:

use std::path::Path;
use tokio::{
    sync::mpsc::{channel, Receiver, Sender},
};
use yaque::{self, TryRecvError, recovery};

const CHAN_BUF_SIZE: usize = 32;
const BATCH_SIZE: usize = 50;
const QUEUE_PATH: &str = "state/queue";

struct WorkContext {
    dispatch_sender: Sender<DispatchRequest>,
}

async fn work(ctx: WorkContext) {
    loop {
        let (send, mut recv) = channel(CHAN_BUF_SIZE);
        println!("here!");
        ctx.dispatch_sender
            .send(DispatchRequest::GetChunk(send))
            .await
            .unwrap();
        let ids = recv.recv().await.unwrap();
        // do stuff w/ ids
    }
}

#[derive(Debug)]
enum DispatchRequest {
    GetChunk(Sender<Vec<String>>),
}

struct DispatchThread {
    url_receiver: yaque::Receiver,
    url_sender: yaque::Sender,
    receiver: Receiver<DispatchRequest>,
    sender: Sender<DispatchRequest>,
}

impl DispatchThread {
    pub fn new(n_workers: usize) -> DispatchThread {
        if Path::new(QUEUE_PATH).exists() {
            println!("Recovering queue...");
            recovery::recover_with_loss(QUEUE_PATH).unwrap();
        }
        let (url_send, url_recv) = yaque::channel(QUEUE_PATH).unwrap();
        let (send, recv) = channel(CHAN_BUF_SIZE);

        for _ in 0..n_workers {
            let send = send.clone();
            tokio::spawn(async move {
                work(WorkContext {
                    dispatch_sender: send,
                })
                .await;
            });
        }

        DispatchThread {
            url_receiver: url_recv,
            url_sender: url_send,
            receiver: recv,
            sender: send,
        }
    }

    pub async fn add_vid_ids(&mut self, ids: Vec<String>) {
        self.url_sender.send_batch(ids).await.unwrap();
    }

    pub async fn run(&mut self) {
        loop {
            match self.receiver.recv().await.unwrap() {
                DispatchRequest::GetChunk(sender) => {
                    let batch = match self
                        .url_receiver
                        .try_recv_batch(BATCH_SIZE)
                    {
                        Ok(b) => Some(b),
                        Err(e) => match e {
                            TryRecvError::Io(e) => {
                                panic!("io error while receiving: {:?}", e);
                            }
                            TryRecvError::QeueuEmpty => None,
                        },
                    };
                    if let Some(batch) = batch {
                        assert!(batch.len() > 0);
                        let mut urls = vec![];
                        for url_bytes in batch.iter() {
                            urls.push(String::from(std::str::from_utf8(url_bytes).unwrap()));
                        }
                        batch.commit();
                        sender.send(urls).await.unwrap();
                    } else {
                        // recycle the request until we have more work
                        println!("Recycling request...");
                        self.sender
                            .send(DispatchRequest::GetChunk(sender))
                            .await
                            .unwrap();
                    }
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let mut dispatch_thread = DispatchThread::new(1);
    dispatch_thread
        .add_vid_ids(vec![String::from("Wko7I9QcwUQ")])
        .await;
    tokio::spawn(async move {
        dispatch_thread.run().await;
    })
    .await
    .unwrap();
}

Cargo.toml:

[dependencies]
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
yaque = { git = "https://github.com/tokahuke/yaque.git", branch = "try-recv" }

Most of the time, this outputs:

Recovering queue...
here!
Recycling request...
Recycling request...
Recycling request...

However, I would not expect this to be the case, because I call add_vid_ids with a value, which should be added to the queue.

Handling of ctrl+c with save is unclear

Hi, maybe I am just doing something wrong, but I tried to handle ctrl_c and ran into this:

error[E0499]: cannot borrow `receiver` as mutable more than once at a time
   --> crates/erooster_smtp/src/servers/mod.rs:86:25
    |
84  | /                 tokio::select! {
85  | |                     _ = signal::ctrl_c() => {
86  | |                         receiver.save().expect("Unable to save queue");
    | |                         ^^^^^^^^ second mutable borrow occurs here
87  | |                     },
88  | |                     data = receiver.recv() => {
    | |                            -------- first mutable borrow occurs here
...   |
112 | |                     }
113 | |                 }
    | |_________________- first borrow might be used here, when `output` is dropped and runs the destructor for type `servers::start::{closure#0}::{closure#0}::__tokio_select_util::Out<std::result::Result<(), std::io::Error>, std::result::Result<erooster_deps::yaque::queue::RecvGuard<'_, std::vec::Vec<u8>>, std::io::Error>>`

with this code:

// Start listening for tasks
    let mut receiver = ReceiverBuilder::default()
        .save_every_nth(None)
        .open(config.task_folder.clone());
    if let Err(e) = receiver {
        warn!("Unable to open receiver: {:?}. Trying to recover.", e);
        recover(&config.task_folder)?;
        receiver = ReceiverBuilder::default()
            .save_every_nth(None)
            .open(config.task_folder.clone());
        info!("Recovered queue successfully");
    }

    match receiver {
        Ok(mut receiver) => {
            loop {
                tokio::select! {
                    _ = signal::ctrl_c() => {
                        receiver.save().expect("Unable to save queue");
                    },
                    data = receiver.recv() => {
                        match data {
                            Ok(data) => {
                                let email_bytes = &*data;
                                let email_json = serde_json::from_slice::<EmailPayload>(email_bytes).expect("Unable to parse email payload json");

                                if let Err(e) = send_email_job(&email_json).await {
                                    tracing::error!(
                                        "Error while sending email: {:?}. Adding it to the queue again",
                                        e
                                    );
                                    // FIXME: This can race the lock leading to an error. We should
                                    //        probably handle this better.
                                    let mut sender = Sender::open(config.task_folder.clone()).expect("Unable to open queue sender");
                                    let json_bytes = serde_json::to_vec(&email_json).expect("Unable to convert email to bytes");
                                    sender.send(json_bytes).await.expect("Unable to add email to queue");
                                }
                                // Mark the job as complete
                                data.commit().expect("Unable to commit data");
                            }
                            Err(e) => {
                                tracing::error!("Error while receiving data from receiver: {:?}", e);
                            }
                        }
                    }
                }
            }
        }
        Err(e) => {
            error!("Unable to open receiver: {:?}. Giving up.", e);
        }
    }

The error is obvious and makes sense, but I wonder what is a better way of handling this in a way where I save on crash :)

Tests fail

I've tried running the tests and they give the following output:

otaviopace@mettaton ~/yaque (master) [SIGINT]> cargo test
    Finished test [unoptimized + debuginfo] target(s) in 0.05s
     Running target/debug/deps/yaque-009dcd917f9d7263

running 11 tests
test recovery::tests::test_unlock_inexistent ... ok
test tests::create_and_clear_fails ... ok
test tests::test_dequeue_is_atomic ... FAILED
test tests::create_and_clear ... ok
test tests::create_and_clear_async ... ok
test recovery::tests::test_unlock ... ok
test tests::test_enqueue_and_dequeue ... FAILED
test tests::test_enqueue ... ok
2020-08-22 16:51:25,942 TRACE [yaque] created queue directory
2020-08-22 16:51:25,942 TRACE [yaque] sender lock acquired. Sender state now is QueueState { segment_size: 4194304, segment: 32, position: 2786945 }
2020-08-22 16:51:25,942 TRACE [yaque] last segment opened for appending
2020-08-22 16:51:25,954 TRACE [yaque] created queue directory
thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: Custom { kind: Other, error: "queue `data/enqueue-dequeue-parallel` receiver side already in use" }', src/lib.rs:858:36
test tests::test_enqueue_then_dequeue ... ok
2020-08-22 16:51:32,500 TRACE [yaque::sync] file guard on `"data/enqueue-dequeue-parallel/send.lock"` dropped
test tests::test_enqueue_dequeue_parallel ... FAILED

Then, after hanging for a while, they output this:

2020-08-22 16:51:07,493 TRACE [yaque::sync] enough! Done reading
2020-08-22 16:51:07,493 TRACE [yaque::sync] reading until drained
2020-08-22 16:51:07,493 TRACE [yaque::sync] read 6 bytes
2020-08-22 16:51:07,493 TRACE [yaque::sync] enough! Done reading
2020-08-22 16:51:07,493 TRACE [yaque::sync] reading until drained
2020-08-22 16:51:07,493 TRACE [yaque::sync] read 4 bytes
2020-08-22 16:51:07,493 TRACE [yaque::sync] enough! Done reading
2020-08-22 16:51:07,493 TRACE [yaque::sync] reading until drained
2020-08-22 16:51:07,493 TRACE [yaque::sync] read 52 bytes
2020-08-22 16:51:07,493 TRACE [yaque::sync] enough! Done reading
2020-08-22 16:51:07,493 TRACE [yaque::sync] reading until drained
2020-08-22 16:51:07,493 TRACE [yaque::sync] read 4 bytes
2020-08-22 16:51:07,493 TRACE [yaque::sync] enough! Done reading
2020-08-22 16:51:07,494 TRACE [yaque::sync] reading until drained
2020-08-22 16:51:07,494 TRACE [yaque::sync] read 4 bytes
2020-08-22 16:51:07,494 TRACE [yaque::sync] enough! Done reading
2020-08-22 16:51:07,494 TRACE [yaque::sync] reading until drained
2020-08-22 16:51:07,494 TRACE [yaque::sync] read 4 bytes

After a couple of tries running them, I got this:

otaviopace@mettaton ~/yaque (master) [SIGINT]> cargo test
    Finished test [unoptimized + debuginfo] target(s) in 0.05s
     Running target/debug/deps/yaque-009dcd917f9d7263

running 11 tests
test recovery::tests::test_unlock_inexistent ... ok
test tests::create_and_clear_fails ... ok
test tests::test_dequeue_is_atomic ... FAILED
test tests::create_and_clear ... ok
test tests::create_and_clear_async ... ok
test recovery::tests::test_unlock ... ok
test tests::test_enqueue_and_dequeue ... FAILED
test tests::test_enqueue ... ok
2020-08-22 16:51:25,942 TRACE [yaque] created queue directory
2020-08-22 16:51:25,942 TRACE [yaque] sender lock acquired. Sender state now is QueueState { segment_size: 4194304, segment: 32, position: 2786945 }
2020-08-22 16:51:25,942 TRACE [yaque] last segment opened for appending
2020-08-22 16:51:25,954 TRACE [yaque] created queue directory
thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: Custom { kind: Other, error: "queue `data/enqueue-dequeue-parallel` receiver side already in use" }', src/lib.rs:858:36
test tests::test_enqueue_then_dequeue ... ok
2020-08-22 16:51:32,500 TRACE [yaque::sync] file guard on `"data/enqueue-dequeue-parallel/send.lock"` dropped
test tests::test_enqueue_dequeue_parallel ... FAILED
2020-08-22 16:52:15,239 TRACE [yaque] created queue directory
thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: Custom { kind: Other, error: "queue `data/enqueue-dequeue-parallel-with-batches` sender side already in use" }', src/lib.rs:898:30
test tests::test_enqueue_dequeue_parallel_with_batches ... FAILED

failures:

---- tests::test_dequeue_is_atomic stdout ----
2020-08-22 16:51:17,832 TRACE [yaque] created queue directory
thread 'tests::test_dequeue_is_atomic' panicked at 'called `Result::unwrap()` on an `Err` value: Custom { kind: Other, error: "queue `data/dequeue-is-atomic` sender side already in use" }', src/lib.rs:928:26

---- tests::test_enqueue_and_dequeue stdout ----
2020-08-22 16:51:18,651 TRACE [yaque] created queue directory
thread 'tests::test_enqueue_and_dequeue' panicked at 'called `Result::unwrap()` on an `Err` value: Custom { kind: Other, error: "queue `data/enqueue-and-dequeue` sender side already in use" }', src/lib.rs:820:26

---- tests::test_enqueue_dequeue_parallel stdout ----
thread 'tests::test_enqueue_dequeue_parallel' panicked at 'dequeue thread panicked: Any', src/lib.rs:872:9

---- tests::test_enqueue_dequeue_parallel_with_batches stdout ----
thread 'tests::test_enqueue_dequeue_parallel_with_batches' panicked at 'enqueue thread panicked: Any', src/lib.rs:921:9


failures:
    tests::test_dequeue_is_atomic
    tests::test_enqueue_and_dequeue
    tests::test_enqueue_dequeue_parallel
    tests::test_enqueue_dequeue_parallel_with_batches

test result: FAILED. 7 passed; 4 failed; 0 ignored; 0 measured; 0 filtered out

2020-08-22 16:52:15,240 TRACE [yaque] created queue directory
thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: Custom { kind: Other, error: "queue `data/enqueue-dequeue-parallel-with-batches` receiver side already in use" }', src/lib.rs:908:21
error: test failed, to rerun pass '--lib'

fuzz failed

thread '<unnamed>' panicked at /tmp/tmp.yvD7plPUDn/yaque/src/queue/receiver.rs:253:9:
There were read and unused items at the end of transaction. Read and unused queue: [[85, 85]]

Output of `std::fmt::Debug`:

        Scenario {
            commands: [
                SendBatch(
                    [
                        [
                            85,
                            85,
                        ],
                    ],
                ),
                Send(
                    [],
                ),
                RecvBatch(
                    14189153571838739689,
                ),
                RecvBatch(
                    0,
                ),
            ],
        }

Can't recv an empty message

Here's a failing test case:

    #[test]
    fn test_try_recv_empty_msg() {
        // Populate a queue:
        let mut sender = SenderBuilder::new()
            .segment_size(512)
            .open("data/try-recv-empty-msg")
            .unwrap();

        sender.try_send(&[]).unwrap();

        let mut receiver = Receiver::open("data/try-recv-empty-msg").unwrap();

        let item = receiver.try_recv().unwrap();
        assert_eq!(&*item, &[]);
        item.commit().unwrap();
    }

The actual behavior is that try_recv returns Err(TryRecvError::QueueEmpty).
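
For comparison, here is a minimal sketch of a length-prefixed framing in which an empty message and "no data yet" stay distinguishable. It is not yaque's on-disk format: the 4-byte big-endian length header and the `encode`/`decode` helpers are assumptions made for illustration only. The point is that a zero-length payload still produces a header on the wire, so a decoder can return it as a real (empty) item rather than reporting an empty queue.

```rust
/// Hypothetical framing: a 4-byte big-endian length followed by the payload.
fn encode(payload: &[u8], out: &mut Vec<u8>) {
    out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    out.extend_from_slice(payload);
}

/// Returns `None` when no complete frame is available, otherwise the payload
/// and the remaining bytes. An empty payload is a valid frame.
fn decode(buf: &[u8]) -> Option<(&[u8], &[u8])> {
    if buf.len() < 4 {
        return None;
    }
    let (header, rest) = buf.split_at(4);
    let len = u32::from_be_bytes([header[0], header[1], header[2], header[3]]) as usize;
    if rest.len() < len {
        return None;
    }
    Some(rest.split_at(len))
}

fn main() {
    let mut wire = Vec::new();
    encode(&[], &mut wire); // an empty message still writes a header
    encode(b"hi", &mut wire);

    let (first, rest) = decode(&wire).expect("empty message should decode");
    assert!(first.is_empty());

    let (second, rest) = decode(rest).expect("second message should decode");
    assert_eq!(second, b"hi");

    // Only now is there genuinely nothing left to read.
    assert!(decode(rest).is_none());
}
```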

Typos

Typos, typos everywhere!

Fuzz testing: RecvBatch strangeness, file handle leaks

I wrote some simple fuzz tests for this library (https://github.com/mullr/yaque/tree/fuzz-tests). The good news is that it works very well for something that's never been fuzzed! I did identify a few issues:

  • It's pretty easy to break when try_recv_batch is used. It panics with "There were read and unused items at the end of transaction. Read and unused queue: ...", even though the test is attempting to read everything that's there, using a large count parameter.

  • After running the fuzzer for a little while, we run out of file handles. It seems like something in this library is leaking, but it's unclear where or why.

Is there a try_recv like function?

Is there a function that will try receiving and, if the queue has no data, return None?

It seems like I can manually replicate this now by using recv_timeout with a very small timeout, but I was wondering whether there is a more idiomatic way.
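
Other issues on this page use a `try_recv` that reports an empty queue as `Err(TryRecvError::QueueEmpty)`. Assuming that shape, the `None`-on-empty behavior asked for here could be layered on top with a small adapter. In the sketch below, `TryRecvError` is a local stand-in that mirrors the two variants named in those issues (it is not imported from yaque), and `empty_as_none` is a hypothetical helper.

```rust
use std::io;

/// Local stand-in mirroring the variants named in the issues above.
#[derive(Debug)]
enum TryRecvError {
    Io(io::Error),
    QueueEmpty,
}

/// Hypothetical adapter: an empty queue becomes `Ok(None)`,
/// while real I/O failures are still surfaced as errors.
fn empty_as_none<T>(outcome: Result<T, TryRecvError>) -> io::Result<Option<T>> {
    match outcome {
        Ok(item) => Ok(Some(item)),
        Err(TryRecvError::QueueEmpty) => Ok(None),
        Err(TryRecvError::Io(err)) => Err(err),
    }
}

fn main() {
    let got: Result<Vec<u8>, TryRecvError> = Ok(vec![1, 2, 3]);
    println!("{:?}", empty_as_none(got));

    let empty: Result<Vec<u8>, TryRecvError> = Err(TryRecvError::QueueEmpty);
    println!("{:?}", empty_as_none(empty));
}
```

Keeping the I/O error separate from the empty case means a caller can match on `Ok(None)` to back off and retry without accidentally swallowing a failing disk.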
