tokahuke / yaque
Yaque is yet another disk-backed persistent queue for Rust.
License: Other
I am running a yaque channel with a gRPC server as the sender and a worker on the other end processing messages. If I start the application, send a few messages, hit Ctrl+C, and then restart the application, I receive a "sender side already in use" error when creating the channel on the same directory. The only way to fix this is to delete the directory and create a fresh one. This makes the queue non-durable and prone to large amounts of data loss on crashes.
Any suggestions?
I feel like this should be documented somewhere: if you happen to send some bytes that match the yaque header, the queue may cease to function. This happened when I tried to send some MessagePack through the channel.
The Receiver<T> consumes the items from the queue (like .into_iter()). It Would Be Nice© to have an equivalent to .iter().
In this example, we just send and receive to a yaque. After receiving, we commit on the RecvGuard.
Is it intentional that on the next start of the program, the yaque still contains all the previously recv'd data?
Running this example twice prints the bytes from the first run during the second run, even though they were committed in the first run.
use std::time::Duration;
use yaque::recovery::recover;
#[tokio::main]
async fn main() {
    recover("persistent-data").ok();
    let (mut sender, mut receiver) = yaque::channel("persistent-data").unwrap();
    let mut interval = tokio::time::interval(Duration::from_secs(1));
    let mut counter = 0u64;
    loop {
        tokio::select! {
            _tick = interval.tick() => {
                println!("Enqueuing {counter}");
                let Ok(_) = sender.send(format!("Message {counter}")).await else {
                    break;
                };
                counter += 1;
            }
            guard = receiver.recv() => {
                println!("Got batch");
                let Ok(contents) = guard else {
                    println!("Breaking because guard not OK");
                    break;
                };
                // println!("{}", String::from_utf8((&mut *contents).drain(..).flatten().collect::<Vec<_>>()).unwrap());
                println!("{:?}", String::from_utf8(contents.to_vec()));
                contents.commit().unwrap();
                println!("committed");
            }
        }
    }
}
xor in header.rs does not actually compute the exclusive or of a and b. Instead it checks that a and b are either both true or both false, which is equivalent to a == b. I don't understand what decode does well enough to know if this is the intended behavior or not, but either the name or the implementation is currently wrong. If you do actually need xor, there's the ^ operator and the BitXor trait.
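To make the difference concrete, here is a minimal sketch comparing the two behaviours. The function names `both_or_neither` and `exclusive_or` are made up for illustration and are not taken from header.rs; the first one only mirrors the behaviour described in the report.

```rust
/// Mirrors the behaviour described in the report: true when both inputs agree.
/// This is logical equivalence (XNOR), not exclusive or.
/// Hypothetical name; not the function from header.rs.
fn both_or_neither(a: bool, b: bool) -> bool {
    (a && b) || (!a && !b)
}

/// Exclusive or on booleans, using the built-in `^` operator.
fn exclusive_or(a: bool, b: bool) -> bool {
    a ^ b
}

/// Builds the full truth table as rows of
/// (a, b, both_or_neither(a, b), exclusive_or(a, b)).
fn truth_table() -> Vec<(bool, bool, bool, bool)> {
    let mut rows = Vec::new();
    for a in [false, true] {
        for b in [false, true] {
            rows.push((a, b, both_or_neither(a, b), exclusive_or(a, b)));
        }
    }
    rows
}
```

On every row the two columns are negations of each other, so a caller that relies on one meaning will get the opposite result from the other.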
Something like bounded spsc? send(data) -> Result should fail when the queue reaches the limit.
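The requested semantics can be illustrated with the standard library's bounded in-memory channel. This is only a sketch of the behaviour being asked for, not a disk-backed implementation and not part of yaque; `bounded` and `send_or_reject` are hypothetical helper names.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Creates an in-memory channel that buffers at most `capacity` pending items.
fn bounded(capacity: usize) -> (SyncSender<Vec<u8>>, Receiver<Vec<u8>>) {
    sync_channel(capacity)
}

/// Attempts to enqueue without blocking.
/// If the queue is full (or the receiver is gone) the item is handed back
/// to the caller instead of being stored.
fn send_or_reject(tx: &SyncSender<Vec<u8>>, item: Vec<u8>) -> Result<(), Vec<u8>> {
    match tx.try_send(item) {
        Ok(()) => Ok(()),
        Err(TrySendError::Full(item)) => Err(item),
        Err(TrySendError::Disconnected(item)) => Err(item),
    }
}
```

Returning the rejected item lets the caller decide whether to retry, drop it, or apply back-pressure upstream.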
heya ! love the work being done on yaque
it is an insanely useful crate :)
I have a long running future as such:
tokio::spawn(async move {
    let mut recv: Receiver = match Receiver::open(qpath) {
        Ok(recv) => recv,
        Err(e) => {
            log::error!("Error opening receiver: {}", e);
            return;
        }
    };
    while (*status).load(Ordering::Relaxed) {
        loop {
            match recv.recv().await {
                Ok(bytes) => {
                    let bytes_inner = bytes.deref().clone();
                    Self::execute_job_from_bytes(bytes_inner, store.clone()).await;
                    match bytes.commit() {
                        Ok(_) => {}
                        Err(e) => {
                            log::error!("Error committing to queue: {}", e);
                        }
                    }
                }
                Err(e) => log::error!("Error receiving from queue: {:?}", e),
            }
        }
    }
})
With a sender that is triggered by API calls in a separate future.
sender
    .send(cbor_bytes)
    .await
    .map_err(|e| warp::reject::custom(Failure::Execute(e.to_string())))?;
However, the receiver loop gets stuck at recv.recv().await, never resolving, even when the sender successfully sends!
I've dug a bit into yaque to see where it hangs, and it seems to never resolve on the following line (line 272) of queue/receiver.rs:
// Read header:
let mut header = [0; 4];
self.tail_follower.read_exact(&mut header).await?;
Digging further into the generated ReadExact future: read_until_you_drain gets called twice when the queue is empty, as dictated by the poll function:
impl<'a> Future for ReadExact<'a> {
    type Output = io::Result<()>;
    fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        self.was_polled = true;
        // See what happens when we read.
        let outcome = self.read_until_you_drain();
        if outcome.is_pending() {
            // Set the waker in the file watcher:
            let mut lock = self.waker.lock().expect("waker mutex poisoned");
            *lock = Some(context.waker().clone());
            // Now, you will have to recheck (TOCTOU!)
            self.read_until_you_drain()
        } else {
            outcome
        }
    }
}
self.read_until_you_drain() returns a Poll::Pending state as we'd expect, but then it never gets triggered again, so the future hangs indefinitely. Possibly the waker is not operating as expected?
If I Ctrl+C to kill the process and then restart it, the queue gets read correctly and the logic proceeds correctly... until the queue is empty once more and things hang.
As a workaround I have currently replaced recv() with try_recv(), which isn't ideal as the loop now spins and consumes the CPU entirely.
Any help on the above would be much appreciated :)
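Until the underlying hang is understood, the CPU cost of the try_recv() workaround can be reduced by sleeping between empty polls. The following is a minimal sketch of polling with exponential backoff; `poll_with_backoff` is a hypothetical helper, and the `poll` closure stands in for whatever non-blocking receive call is used. It relies on the blocking `std::thread::sleep`, so inside an async task an async sleep such as `tokio::time::sleep` would be the appropriate substitute.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Calls `poll` until it yields an item, sleeping between empty polls.
/// The delay doubles after each empty poll, capped at `max_delay`.
/// Returns the item together with the number of empty polls that preceded it.
fn poll_with_backoff<T>(
    mut poll: impl FnMut() -> Option<T>,
    initial_delay: Duration,
    max_delay: Duration,
) -> (T, u32) {
    let mut delay = initial_delay;
    let mut empty_polls = 0;
    loop {
        if let Some(item) = poll() {
            return (item, empty_polls);
        }
        empty_polls += 1;
        // Back off instead of spinning, so an idle queue costs little CPU.
        sleep(delay);
        delay = (delay * 2).min(max_delay);
    }
}
```

The trade-off is latency: a message that arrives just after an empty poll waits for up to `max_delay` before it is picked up.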
Hi! Cool library.
My use case is an IoT system where data is buffered for transmission. In the event of flaky comms, I need to buffer data to disk instead of memory, so this library looks great for this use case. I'm implementing a hybrid memory/disk queue using Yaque and a VecDeque. The idea is that a fixed amount of memory will be dedicated to buffering, and if the memory limit is exceeded, we'll start buffering to Yaque. Popping elements for transmission will entail loading from Yaque if disk buffering has occurred.
What's the best way to determine the queue length during program initialization?
Is the answer here to just create QueueIter and count them?
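For reference, the hybrid design described above can be sketched as follows. Everything here is hypothetical: the `Overflow` trait, `FakeDisk`, and `HybridBuffer` are illustration-only names, and `FakeDisk` is an in-memory stand-in for the disk-backed queue so that the sketch runs on its own. The `spilled` counter only tracks items written during the current process, so it does not answer the question of how to learn the length of a queue that already exists on disk at startup.

```rust
use std::collections::VecDeque;

/// Minimal interface for the overflow store; a disk-backed queue would sit behind it.
trait Overflow {
    fn push(&mut self, item: Vec<u8>);
    fn pop(&mut self) -> Option<Vec<u8>>;
}

/// In-memory stand-in so the sketch runs without touching the disk.
#[derive(Default)]
struct FakeDisk(VecDeque<Vec<u8>>);

impl Overflow for FakeDisk {
    fn push(&mut self, item: Vec<u8>) {
        self.0.push_back(item);
    }
    fn pop(&mut self) -> Option<Vec<u8>> {
        self.0.pop_front()
    }
}

/// FIFO buffer that keeps up to `memory_limit` items in memory and spills the rest.
struct HybridBuffer<O: Overflow> {
    memory: VecDeque<Vec<u8>>,
    memory_limit: usize,
    overflow: O,
    spilled: usize,
}

impl<O: Overflow> HybridBuffer<O> {
    fn new(memory_limit: usize, overflow: O) -> Self {
        HybridBuffer { memory: VecDeque::new(), memory_limit, overflow, spilled: 0 }
    }

    fn push(&mut self, item: Vec<u8>) {
        // Once anything has spilled, keep spilling so FIFO order is preserved.
        if self.spilled == 0 && self.memory.len() < self.memory_limit {
            self.memory.push_back(item);
        } else {
            self.overflow.push(item);
            self.spilled += 1;
        }
    }

    fn pop(&mut self) -> Option<Vec<u8>> {
        // Memory always holds the oldest items, so drain it first.
        if let Some(item) = self.memory.pop_front() {
            return Some(item);
        }
        let item = self.overflow.pop()?;
        self.spilled -= 1;
        Some(item)
    }

    fn len(&self) -> usize {
        self.memory.len() + self.spilled
    }
}
```

Routing every push to the overflow store while `spilled` is non-zero is what keeps ordering intact; writing to memory again before the overflow is drained would let newer items overtake older ones.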
Hi,
At first sight, I am wondering if it would make sense to have an iterator/stream to receive the data. Currently I need to call it in a loop, but a stream might be more optimized. I don't have any metrics to justify this assumption, but my gut feeling is that it would be a nice addition.
Here's an example:
use std::path::Path;
use tokio::{
    sync::mpsc::{channel, Receiver, Sender},
};
use yaque::{self, TryRecvError, recovery};
const CHAN_BUF_SIZE: usize = 32;
const BATCH_SIZE: usize = 50;
const QUEUE_PATH: &str = "state/queue";
struct WorkContext {
    dispatch_sender: Sender<DispatchRequest>,
}
async fn work(ctx: WorkContext) {
    loop {
        let (send, mut recv) = channel(CHAN_BUF_SIZE);
        println!("here!");
        ctx.dispatch_sender
            .send(DispatchRequest::GetChunk(send))
            .await
            .unwrap();
        let ids = recv.recv().await.unwrap();
        // do stuff w/ ids
    }
}
#[derive(Debug)]
enum DispatchRequest {
    GetChunk(Sender<Vec<String>>),
}
struct DispatchThread {
    url_receiver: yaque::Receiver,
    url_sender: yaque::Sender,
    receiver: Receiver<DispatchRequest>,
    sender: Sender<DispatchRequest>,
}
impl DispatchThread {
    pub fn new(n_workers: usize) -> DispatchThread {
        if Path::new(QUEUE_PATH).exists() {
            println!("Recovering queue...");
            recovery::recover_with_loss(QUEUE_PATH).unwrap();
        }
        let (url_send, url_recv) = yaque::channel(QUEUE_PATH).unwrap();
        let (send, recv) = channel(CHAN_BUF_SIZE);
        for _ in 0..n_workers {
            let send = send.clone();
            tokio::spawn(async move {
                work(WorkContext {
                    dispatch_sender: send,
                })
                .await;
            });
        }
        DispatchThread {
            url_receiver: url_recv,
            url_sender: url_send,
            receiver: recv,
            sender: send,
        }
    }
    pub async fn add_vid_ids(&mut self, ids: Vec<String>) {
        self.url_sender.send_batch(ids).await.unwrap();
    }
    pub async fn run(&mut self) {
        loop {
            match self.receiver.recv().await.unwrap() {
                DispatchRequest::GetChunk(sender) => {
                    let batch = match self
                        .url_receiver
                        .try_recv_batch(BATCH_SIZE)
                    {
                        Ok(b) => Some(b),
                        Err(e) => match e {
                            TryRecvError::Io(e) => {
                                panic!("io error while receiving: {:?}", e);
                            }
                            TryRecvError::QeueuEmpty => None,
                        },
                    };
                    if let Some(batch) = batch {
                        assert!(batch.len() > 0);
                        let mut urls = vec![];
                        for url_bytes in batch.iter() {
                            urls.push(String::from(std::str::from_utf8(url_bytes).unwrap()));
                        }
                        batch.commit();
                        sender.send(urls).await.unwrap();
                    } else {
                        // recycle the request until we have more work
                        println!("Recycling request...");
                        self.sender
                            .send(DispatchRequest::GetChunk(sender))
                            .await
                            .unwrap();
                    }
                }
            }
        }
    }
}
#[tokio::main]
async fn main() {
    let mut dispatch_thread = DispatchThread::new(1);
    dispatch_thread
        .add_vid_ids(vec![String::from("Wko7I9QcwUQ")])
        .await;
    tokio::spawn(async move {
        dispatch_thread.run().await;
    })
    .await
    .unwrap();
}
Cargo.toml:
[dependencies]
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
yaque = { git = "https://github.com/tokahuke/yaque.git", branch = "try-recv" }
Most of the time, this outputs:
Recovering queue...
here!
Recycling request...
Recycling request...
Recycling request...
However, I would not have expected this to be the case, because I call add_vid_ids with a value, which should be added to the queue.
An old version of ntapi (required by sysinfo) is causing build failures on some systems (see MSxDOS/ntapi#19). 88bbf18 already fixes this, so for the time being I can just use the master branch.
Hi, maybe I am just doing something wrong, but I tried to handle ctrl_c and ran into this:
error[E0499]: cannot borrow `receiver` as mutable more than once at a time
--> crates/erooster_smtp/src/servers/mod.rs:86:25
|
84 | / tokio::select! {
85 | | _ = signal::ctrl_c() => {
86 | | receiver.save().expect("Unable to save queue");
| | ^^^^^^^^ second mutable borrow occurs here
87 | | },
88 | | data = receiver.recv() => {
| | -------- first mutable borrow occurs here
... |
112 | | }
113 | | }
| |_________________- first borrow might be used here, when `output` is dropped and runs the destructor for type `servers::start::{closure#0}::{closure#0}::__tokio_select_util::Out<std::result::Result<(), std::io::Error>, std::result::Result<erooster_deps::yaque::queue::RecvGuard<'_, std::vec::Vec<u8>>, std::io::Error>>`
with this code:
// Start listening for tasks
let mut receiver = ReceiverBuilder::default()
    .save_every_nth(None)
    .open(config.task_folder.clone());
if let Err(e) = receiver {
    warn!("Unable to open receiver: {:?}. Trying to recover.", e);
    recover(&config.task_folder)?;
    receiver = ReceiverBuilder::default()
        .save_every_nth(None)
        .open(config.task_folder.clone());
    info!("Recovered queue successfully");
}
match receiver {
    Ok(mut receiver) => {
        loop {
            tokio::select! {
                _ = signal::ctrl_c() => {
                    receiver.save().expect("Unable to save queue");
                },
                data = receiver.recv() => {
                    match data {
                        Ok(data) => {
                            let email_bytes = &*data;
                            let email_json = serde_json::from_slice::<EmailPayload>(email_bytes).expect("Unable to parse email payload json");
                            if let Err(e) = send_email_job(&email_json).await {
                                tracing::error!(
                                    "Error while sending email: {:?}. Adding it to the queue again",
                                    e
                                );
                                // FIXME: This can race the lock leading to an error. We should
                                // probably handle this better.
                                let mut sender = Sender::open(config.task_folder.clone()).expect("Unable to open queue sender");
                                let json_bytes = serde_json::to_vec(&email_json).expect("Unable to convert email to bytes");
                                sender.send(json_bytes).await.expect("Unable to add email to queue");
                            }
                            // Mark the job as complete
                            data.commit().expect("Unable to commit data");
                        }
                        Err(e) => {
                            tracing::error!("Error while receiving data from receiver: {:?}", e);
                        }
                    }
                }
            }
        }
    }
    Err(e) => {
        error!("Unable to open receiver: {:?}. Giving up.", e);
    }
}
The error is obvious and makes sense, but I wonder what is a better way of handling this in a way where I save on crash :)
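One pattern that may help is to have the shutdown branch only break out of the loop, and to call save() after the loop, once nothing borrows the receiver any more. The sketch below shows the borrow structure with stand-in types only: `MockReceiver`, `MockGuard`, `Event`, and `next_event` are hypothetical and are not the yaque or tokio APIs, and "nothing pending" is used here as a stand-in for the Ctrl+C signal.

```rust
/// Stand-in for a queue receiver; NOT the yaque type.
struct MockReceiver {
    pending: Vec<Vec<u8>>,
    committed: usize,
    saved: bool,
}

/// Stand-in for a receive guard: it borrows the receiver mutably while it lives.
struct MockGuard<'a> {
    receiver: &'a mut MockReceiver,
    item: Vec<u8>,
}

impl<'a> MockGuard<'a> {
    /// Consumes the guard, which also ends its borrow of the receiver.
    fn commit(self) {
        self.receiver.committed += 1;
    }
}

/// Plays the role of the value produced by a select: shutdown, or received data.
enum Event<'a> {
    Shutdown,
    Data(MockGuard<'a>),
}

impl MockReceiver {
    /// Yields the next item, or `Shutdown` once nothing is pending.
    fn next_event<'a>(&'a mut self) -> Event<'a> {
        if self.pending.is_empty() {
            return Event::Shutdown;
        }
        let item = self.pending.remove(0);
        Event::Data(MockGuard { receiver: self, item })
    }

    fn save(&mut self) {
        self.saved = true;
    }
}

fn run(receiver: &mut MockReceiver) -> Vec<Vec<u8>> {
    let mut processed = Vec::new();
    loop {
        match receiver.next_event() {
            // Do not touch `receiver` in this arm: the event type may still borrow it.
            Event::Shutdown => break,
            Event::Data(guard) => {
                processed.push(guard.item.clone());
                guard.commit();
            }
        }
    }
    // The loop has ended, so no guard is alive and `save` can borrow mutably.
    receiver.save();
    processed
}
```

Applied to the code in the report, this would mean replacing the body of the ctrl_c branch with a plain `break` and moving the save call to just after the loop.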
Need to make sure the lock files are deleted on interrupt.
I've tried running the tests and they give the following output:
otaviopace@mettaton ~/yaque (master) [SIGINT]> cargo test
Finished test [unoptimized + debuginfo] target(s) in 0.05s
Running target/debug/deps/yaque-009dcd917f9d7263
running 11 tests
test recovery::tests::test_unlock_inexistent ... ok
test tests::create_and_clear_fails ... ok
test tests::test_dequeue_is_atomic ... FAILED
test tests::create_and_clear ... ok
test tests::create_and_clear_async ... ok
test recovery::tests::test_unlock ... ok
test tests::test_enqueue_and_dequeue ... FAILED
test tests::test_enqueue ... ok
2020-08-22 16:51:25,942 TRACE [yaque] created queue directory
2020-08-22 16:51:25,942 TRACE [yaque] sender lock acquired. Sender state now is QueueState { segment_size: 4194304, segment: 32, position: 2786945 }
2020-08-22 16:51:25,942 TRACE [yaque] last segment opened for appending
2020-08-22 16:51:25,954 TRACE [yaque] created queue directory
thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: Custom { kind: Other, error: "queue `data/enqueue-dequeue-parallel` receiver side already in use" }', src/lib.rs:858:36
test tests::test_enqueue_then_dequeue ... ok
2020-08-22 16:51:32,500 TRACE [yaque::sync] file guard on `"data/enqueue-dequeue-parallel/send.lock"` dropped
test tests::test_enqueue_dequeue_parallel ... FAILED
Then, after hanging for a while, they output this:
2020-08-22 16:51:07,493 TRACE [yaque::sync] enough! Done reading
2020-08-22 16:51:07,493 TRACE [yaque::sync] reading until drained
2020-08-22 16:51:07,493 TRACE [yaque::sync] read 6 bytes
2020-08-22 16:51:07,493 TRACE [yaque::sync] enough! Done reading
2020-08-22 16:51:07,493 TRACE [yaque::sync] reading until drained
2020-08-22 16:51:07,493 TRACE [yaque::sync] read 4 bytes
2020-08-22 16:51:07,493 TRACE [yaque::sync] enough! Done reading
2020-08-22 16:51:07,493 TRACE [yaque::sync] reading until drained
2020-08-22 16:51:07,493 TRACE [yaque::sync] read 52 bytes
2020-08-22 16:51:07,493 TRACE [yaque::sync] enough! Done reading
2020-08-22 16:51:07,493 TRACE [yaque::sync] reading until drained
2020-08-22 16:51:07,493 TRACE [yaque::sync] read 4 bytes
2020-08-22 16:51:07,493 TRACE [yaque::sync] enough! Done reading
2020-08-22 16:51:07,494 TRACE [yaque::sync] reading until drained
2020-08-22 16:51:07,494 TRACE [yaque::sync] read 4 bytes
2020-08-22 16:51:07,494 TRACE [yaque::sync] enough! Done reading
2020-08-22 16:51:07,494 TRACE [yaque::sync] reading until drained
2020-08-22 16:51:07,494 TRACE [yaque::sync] read 4 bytes
After a couple of tries running them, I got this:
otaviopace@mettaton ~/yaque (master) [SIGINT]> cargo test
Finished test [unoptimized + debuginfo] target(s) in 0.05s
Running target/debug/deps/yaque-009dcd917f9d7263
running 11 tests
test recovery::tests::test_unlock_inexistent ... ok
test tests::create_and_clear_fails ... ok
test tests::test_dequeue_is_atomic ... FAILED
test tests::create_and_clear ... ok
test tests::create_and_clear_async ... ok
test recovery::tests::test_unlock ... ok
test tests::test_enqueue_and_dequeue ... FAILED
test tests::test_enqueue ... ok
2020-08-22 16:51:25,942 TRACE [yaque] created queue directory
2020-08-22 16:51:25,942 TRACE [yaque] sender lock acquired. Sender state now is QueueState { segment_size: 4194304, segment: 32, position: 2786945 }
2020-08-22 16:51:25,942 TRACE [yaque] last segment opened for appending
2020-08-22 16:51:25,954 TRACE [yaque] created queue directory
thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: Custom { kind: Other, error: "queue `data/enqueue-dequeue-parallel` receiver side already in use" }', src/lib.rs:858:36
test tests::test_enqueue_then_dequeue ... ok
2020-08-22 16:51:32,500 TRACE [yaque::sync] file guard on `"data/enqueue-dequeue-parallel/send.lock"` dropped
test tests::test_enqueue_dequeue_parallel ... FAILED
2020-08-22 16:52:15,239 TRACE [yaque] created queue directory
thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: Custom { kind: Other, error: "queue `data/enqueue-dequeue-parallel-with-batches` sender side already in use" }', src/lib.rs:898:30
test tests::test_enqueue_dequeue_parallel_with_batches ... FAILED
failures:
---- tests::test_dequeue_is_atomic stdout ----
2020-08-22 16:51:17,832 TRACE [yaque] created queue directory
thread 'tests::test_dequeue_is_atomic' panicked at 'called `Result::unwrap()` on an `Err` value: Custom { kind: Other, error: "queue `data/dequeue-is-atomic` sender side already in use" }', src/lib.rs:928:26
---- tests::test_enqueue_and_dequeue stdout ----
2020-08-22 16:51:18,651 TRACE [yaque] created queue directory
thread 'tests::test_enqueue_and_dequeue' panicked at 'called `Result::unwrap()` on an `Err` value: Custom { kind: Other, error: "queue `data/enqueue-and-dequeue` sender side already in use" }', src/lib.rs:820:26
---- tests::test_enqueue_dequeue_parallel stdout ----
thread 'tests::test_enqueue_dequeue_parallel' panicked at 'dequeue thread panicked: Any', src/lib.rs:872:9
---- tests::test_enqueue_dequeue_parallel_with_batches stdout ----
thread 'tests::test_enqueue_dequeue_parallel_with_batches' panicked at 'enqueue thread panicked: Any', src/lib.rs:921:9
failures:
tests::test_dequeue_is_atomic
tests::test_enqueue_and_dequeue
tests::test_enqueue_dequeue_parallel
tests::test_enqueue_dequeue_parallel_with_batches
test result: FAILED. 7 passed; 4 failed; 0 ignored; 0 measured; 0 filtered out
2020-08-22 16:52:15,240 TRACE [yaque] created queue directory
thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: Custom { kind: Other, error: "queue `data/enqueue-dequeue-parallel-with-batches` receiver side already in use" }', src/lib.rs:908:21
error: test failed, to rerun pass '--lib'
thread '<unnamed>' panicked at /tmp/tmp.yvD7plPUDn/yaque/src/queue/receiver.rs:253:9:
There were read and unused items at the end of transaction. Read and unused queue: [[85, 85]]
Output of `std::fmt::Debug`:
Scenario {
commands: [
SendBatch(
[
[
85,
85,
],
],
),
Send(
[],
),
RecvBatch(
14189153571838739689,
),
RecvBatch(
0,
),
],
}
Here's a failing test case:
#[test]
fn test_try_recv_empty_msg() {
    // Populate a queue:
    let mut sender = SenderBuilder::new()
        .segment_size(512)
        .open("data/try-recv-empty-msg")
        .unwrap();
    sender.try_send(&[]).unwrap();
    let mut receiver = Receiver::open("data/try-recv-empty-msg").unwrap();
    let item = receiver.try_recv().unwrap();
    assert_eq!(&*item, &[]);
    item.commit().unwrap();
}
The actual behavior is that try_recv returns Err(TryRecvError::QueueEmpty).
Typos, typos everywhere!
I wrote some simple fuzz tests for this library (https://github.com/mullr/yaque/tree/fuzz-tests). The good news is that it works very well for something that's never been fuzzed! I did identify a few issues:
It's pretty easy to break when try_recv_batch is used. It panics with "There were read and unused items at the end of transaction. Read and unused queue: ...", even though the test is attempting to read everything that's there, using a large count parameter.
After running the fuzzer for a little while, we run out of file handles. It seems like something in this library is leaking, but it's unclear where or why.
Is there a function that will try receiving and, if the queue has no data, return None?
It seems like I can manually replicate this now by using recv_timeout with a very small timeout, but I'm wondering if there is a more idiomatic way.
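Other reports on this page use try_recv, which appears to return Err(TryRecvError::QueueEmpty) when nothing is available. Assuming that shape, a small adapter can turn the result into an Option. The enum below is a local stand-in carrying the two variants mentioned in those reports, so that the sketch compiles on its own; it is not imported from yaque, and `into_option` is a hypothetical helper name.

```rust
use std::io;

/// Local stand-in with the two variants that appear in the reports above.
/// It is NOT imported from yaque.
#[derive(Debug)]
enum TryRecvError {
    Io(io::Error),
    QueueEmpty,
}

/// Maps "queue empty" to `Ok(None)` and keeps genuine I/O failures as errors.
fn into_option<T>(outcome: Result<T, TryRecvError>) -> io::Result<Option<T>> {
    match outcome {
        Ok(item) => Ok(Some(item)),
        Err(TryRecvError::QueueEmpty) => Ok(None),
        Err(TryRecvError::Io(err)) => Err(err),
    }
}
```

Keeping the I/O error separate from the empty case avoids silently treating a failing disk as an empty queue.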