Comments (9)

mu-arch commented on June 12, 2024

Ah, I see now. That example is beautiful. Thank you!

from moka.

tatsuya6502 commented on June 12, 2024

Hi. Thanks for your feedback. Yes; the document needs some improvements.

I am thinking of adding the following examples to the documentation of moka::future::Cache in Moka v0.5.2. Will they work for you? (I am assuming you are using the future cache; please let me know if you are using a sync cache.)

get_or_insert_with

// Cargo.toml
//
// [dependencies]
// moka = { version = "0.5", features = ["future"] }
// tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }

use moka::future::Cache;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    const TEN_MIB: usize = 10 * 1024 * 1024; // 10MiB
    let cache = Cache::new(1_000);

    // Get the value for key1. The async block should be evaluated because
    // the key1 does not exist yet.
    let value1 = cache
        .get_or_insert_with("key1", async { Arc::new(vec![0u8; TEN_MIB]) })
        .await;
    assert_eq!(value1.len(), TEN_MIB);

    // Get the value for key1. The async block should NOT be evaluated
    // because the key1 already exists. Note that the length of the vec is
    // different to the first call (10MiB vs zero).
    let value2 = cache
        .get_or_insert_with("key1", async { Arc::new(vec![0u8; 0]) })
        .await;

    // The length of the value2 should be equal to the one inserted by the
    // first call.
    assert_eq!(value2.len(), TEN_MIB);
}

get_or_try_insert_with

// Cargo.toml
//
// [dependencies]
// moka = { version = "0.5", features = ["future"] }
// reqwest = "0.11"
// tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
use moka::future::Cache;

/// This async function tries to get HTML from the given URI.
async fn get_html(uri: &str) -> 
    Result<String, Box<dyn std::error::Error + Send + Sync + 'static>>
{
    Ok(reqwest::get(uri).await?.text().await?)
}

#[tokio::main]
async fn main() {
    let cache = Cache::new(1_000);

    // Get the value for key1. The async fn should be evaluated because the
    // key does not exist yet. However, the async fn will return an error due
    // to the broken URL.
    let value1 = cache
        .get_or_try_insert_with("key1", get_html("tps://broken-url"))
        .await;

    // An error was returned, and key1 still does not exist.
    assert!(value1.is_err());
    assert!(cache.get(&"key1").is_none());

    // Get the value for key1. The async fn should be evaluated.
    let value2 = cache
        .get_or_try_insert_with("key1", get_html("https://www.rust-lang.org"))
        .await;

    // An OK was returned, and key1 now exists.
    assert!(value2.is_ok());
    assert!(cache.get(&"key1").is_some());
}

mu-arch commented on June 12, 2024

I appreciate your quick and very detailed response. I was mistaken about what this method did. This ticket probably isn't the place, but I would like to mention that a useful feature could be to allow locking a key.

My use case is a function that can run multiple times in circumstances where network conditions are bad, and I wanted to block at get() until the other functions that intend to insert() finish.

I imagine it could look something like this:

my_cache.acquire_lock(&"my_key");

let cached_item: String = match my_cache.get(&"my_key")
    {
        None => {
            ... lookup some data from a database and then insert it into the moka cache, once .get() returns unlock the mutex ...

            value_from_database
        }
        Some(v) => v,
    };

I just think it would be a nice feature to have since this is a frequent pattern for me with this library. I'm not sure if that is something you are interested in adding. Thanks so much for your work on this project!

tatsuya6502 commented on June 12, 2024

Ah, OK. You were not mistaken. This method does lock a key and you can use it in your code above.

It uses a RwLock and ensures that only one async task can acquire the exclusive writer lock. Other async tasks will wait until the writer lock is released, and then they will acquire shared reader locks. The async task that holds the writer lock will resolve the future; the other async tasks will not. They will be blocked until the value becomes available.

type Waiter<V> = Arc<RwLock<Option<Result<V, ErrorObject>>>>;

let waiter = Arc::new(RwLock::new(None));
let mut lock = waiter.write().await;
match self.try_insert_waiter(&key, type_id, &waiter) {
    None => {
        // Inserted. Resolve the init future.
        let value = init.await;
        *lock = Some(Ok(value.clone()));
        Initialized(value)
    }
    Some(res) => {
        // Value already exists. Drop our write lock and wait for a read lock
        // to become available.
        std::mem::drop(lock);
        match &*res.read().await {
            Some(Ok(value)) => ReadExisting(value.clone()),
            Some(Err(_)) | None => unreachable!(),
        }
    }
}

(If the async task has acquired the writer lock, the None clause will be executed and the task will resolve the init future. Otherwise, the Some clause will be executed and the task will be blocked until it gets a reader lock.)
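The waiter pattern described above can be sketched with only the standard library. This is a simplified analogue and not Moka's implementation: it uses OS threads and blocking locks instead of async tasks, and the names KeyedInit, get_or_init and run_demo are hypothetical. The first caller for a key publishes a waiter while holding its write lock and runs the initializer; later callers find that waiter and block on a read lock until the value is stored.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::thread;

type Waiter<V> = Arc<RwLock<Option<V>>>;

/// Hypothetical helper for illustration only; not part of Moka.
struct KeyedInit<V> {
    waiters: Mutex<HashMap<String, Waiter<V>>>,
}

impl<V: Clone> KeyedInit<V> {
    fn new() -> Self {
        Self { waiters: Mutex::new(HashMap::new()) }
    }

    fn get_or_init(&self, key: &str, init: impl FnOnce() -> V) -> V {
        let waiter: Waiter<V> = Arc::new(RwLock::new(None));
        // Take the write lock before publishing the waiter, so that readers
        // block until the value has been stored.
        let mut lock = waiter.write().unwrap();
        let existing = {
            let mut map = self.waiters.lock().unwrap();
            match map.get(key) {
                Some(w) => Some(Arc::clone(w)),
                None => {
                    map.insert(key.to_string(), Arc::clone(&waiter));
                    None
                }
            }
        }; // The map mutex is released here, before any slow work.
        match existing {
            None => {
                // We published our waiter first. Run the initializer once.
                let value = init();
                *lock = Some(value.clone());
                value
            }
            Some(other) => {
                // Another caller owns this key. Drop our unused write lock
                // and wait for a read lock on the published waiter.
                drop(lock);
                let guard = other.read().unwrap();
                let value = (*guard).clone().expect("value is stored before unlock");
                value
            }
        }
    }
}

/// Runs `n` threads that request the same key. Returns the values they
/// received and the number of times the initializer ran.
fn run_demo(n: usize) -> (Vec<String>, usize) {
    let cache = Arc::new(KeyedInit::<String>::new());
    let calls = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..n)
        .map(|id| {
            let cache = Arc::clone(&cache);
            let calls = Arc::clone(&calls);
            thread::spawn(move || {
                cache.get_or_init("key1", || {
                    calls.fetch_add(1, Ordering::SeqCst);
                    format!("set by thread {id}")
                })
            })
        })
        .collect();
    let values = handles.into_iter().map(|h| h.join().unwrap()).collect();
    (values, calls.load(Ordering::SeqCst))
}
```

Calling run_demo(8) should report a single initializer call, with all eight threads receiving the same string. The sketch ignores error handling, eviction, and lock poisoning if the initializer panics.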

Caches have unit test cases to verify this behavior, but they will not be suitable for usage examples (They will be too complicated).

moka/src/future/cache.rs

Lines 943 to 947 in 8607db7

// This test will run five async tasks:
//
// Task1 will be the first task to call `get_or_insert_with` for a key, so
// its async block will be evaluated and then a &str value "task1" will be
// inserted to the cache.

Let me try to come up with another usage example to demonstrate the locking. I will do it after work. (It is early morning now in my timezone UTC+0800, and I will do it tonight)

tatsuya6502 commented on June 12, 2024

This method does lock a key and you can use it in your code above.

So your code

my_cache.acquire_lock(&"my_key");

let cached_item: String = match my_cache.get(&"my_key")
    {
        None => {
            ... lookup some data from a database and then insert it into the moka cache, once .get() returns unlock the mutex ...

            value_from_database
        }
        Some(v) => v,
    };

will be translated into this when you use get_or_insert_with() method:

let cached_item: String = my_cache.get_or_insert_with("my_key",
    // This async block should be evaluated only once.
    // get_or_insert_with() uses an internal mutex (`RwLock`) on "my_key",
    // so you do not need something like `my_cache.acquire_lock(&"my_key")`.
    async {
        // Lookup some data from a database.
        let value_from_database = ...
        // Then return the data. It will be inserted into my_cache.
        value_from_database
        // Once it is inserted, the mutex is unlocked, and other async tasks,
        // which have called `get_or_insert_with()` and blocked, will be unblocked
        // and get the same data.
    }
).await;

tatsuya6502 commented on June 12, 2024

Let me try to come up with another usage example to demonstrate the locking.

OK. How about this? Will this answer your question?

// Cargo.toml
//
// [dependencies]
// moka = { version = "0.5", features = ["future"] }
// futures = "0.3"
// once_cell = "1"
// reqwest = "0.11"
// tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
use moka::future::Cache;
use once_cell::sync::Lazy;
use std::sync::atomic::{AtomicU8, Ordering};

// This counter will be incremented by 1 each time get_html() is called.
// (This counter exists only for demonstration purposes.)
static CALL_COUNTER: Lazy<AtomicU8> = Lazy::new(|| AtomicU8::default());

/// This async function tries to get HTML from the given URI.
async fn get_html(
    task_id: u8,
    uri: &str,
) -> Result<String, Box<dyn std::error::Error + Send + Sync + 'static>> {
    println!("get_html() called by task {}.", task_id);

    // Increment the call counter by 1.
    CALL_COUNTER.fetch_add(1, Ordering::AcqRel);

    // Get the HTML from the Internet.
    Ok(reqwest::get(uri).await?.text().await?)
}

#[tokio::main]
async fn main() {
    let cache = Cache::new(1_000);

    // Spawn eight async tasks. They will try to do the same thing at the same time
    // but get_html() should be called only once.
    let tasks: Vec<_> = (0..8_u8)
        .map(|task_id| {
            // To share the same cache across the async tasks, clone it.
            // This is a cheap operation.
            let my_cache = cache.clone();

            // Spawn an async task.
            tokio::spawn(async move {
                println!("Task {} started.", task_id);

                // Try to get the value for key1. If key1 does not exist, get_html()
                // will be called, and the returned value will be inserted into the
                // cache unless it is an Err(..).
                let value = my_cache
                    .get_or_try_insert_with("key1", get_html(task_id, "https://www.rust-lang.org"))
                    .await;

                // Ensure the value exists now.
                assert!(value.is_ok());
                assert!(my_cache.get(&"key1").is_some());

                println!(
                    "Task {} got the value. (len: {})",
                    task_id,
                    value.unwrap().len()
                );
            })
        })
        .collect();

    // Run all tasks concurrently and wait for them to complete.
    futures::future::join_all(tasks).await;

    // Verify that get_html() was called exactly once.
    assert_eq!(CALL_COUNTER.load(Ordering::Acquire), 1);
}

Here is the output from the above program:

  • get_html() was called exactly once, by task 5.
  • All other tasks were blocked until task 5 got the value from get_html() and inserted it into the cache.
Task 0 started.
Task 3 started.
Task 4 started.
Task 5 started.
Task 1 started.
Task 2 started.
get_html() called by task 5.
Task 6 started.
Task 7 started.
Task 5 got the value. (len: 19419)
Task 1 got the value. (len: 19419)
Task 7 got the value. (len: 19419)
Task 4 got the value. (len: 19419)
Task 2 got the value. (len: 19419)
Task 6 got the value. (len: 19419)
Task 3 got the value. (len: 19419)
Task 0 got the value. (len: 19419)

This example uses get_or_try_insert_with(), which expects an async block/async fn to return a Result<V, Box<dyn Error + ...>>, where V is the value type. You can replace get_or_try_insert_with() with get_or_insert_with() when async block/async fn always returns a V.
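As a small aside on the boxed error type mentioned above, here is a sketch of how ordinary errors convert into Box<dyn Error + Send + Sync + 'static> using only the standard library. The function parse_len and the alias BoxedError are hypothetical stand-ins and are not part of Moka.

```rust
use std::error::Error;

/// Alias for the boxed error shape described above (the alias name is an
/// assumption for this sketch).
type BoxedError = Box<dyn Error + Send + Sync + 'static>;

/// Hypothetical fallible loader standing in for something like get_html().
fn parse_len(text: &str) -> Result<usize, BoxedError> {
    // `?` converts the ParseIntError into BoxedError through the standard
    // `From` implementation for boxed errors.
    let n: usize = text.trim().parse()?;
    if n == 0 {
        // A plain &str message converts into a boxed error as well.
        return Err("length must be non-zero".into());
    }
    Ok(n)
}
```

Because both conversions go through From, an async fn with this return type can use `?` on most library errors without any explicit mapping.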

mu-arch commented on June 12, 2024

Sorry, but I'm still having trouble. I believe it's because I can't return the way I want to here with bail!() inside an async block. Perhaps you know of a solution?:

    let mut member_was_cached = true;

    let cached_member: GuildMember = MEMBERS_MEM_CACHE
        .get()
        .unwrap()
        .get_or_insert_with((member.guild_id.0, member.user.id.0), async {
            match query_db_for_member_or_return_current(&member).await {
                Ok(v) => {
                    if !v.1 { member_was_cached = false; };
                    v.0
                }
                Err(e) => bail!(e)
            }
        }
        )
        .await;

I return a Result<(GuildMember, bool), anyhow::Error> from query_db_for_member_or_return_current(). I use a tuple because there are two different states that GuildMember could be in, and I didn't want to use the error to express that because I want to use its value in a logger.

I did try the get_or_try_insert_with variant also but seem to be having the same problem.

tatsuya6502 commented on June 12, 2024

Yeah, I know these type puzzles will be difficult for anybody to solve until they get used to async and multi-threaded programming in Rust.

I see a couple of issues in your code. Added comments inline.

    let mut member_was_cached = true;

    let cached_member: GuildMember = MEMBERS_MEM_CACHE
        .get()
        .unwrap()
        .get_or_insert_with((member.guild_id.0, member.user.id.0), async {
            match query_db_for_member_or_return_current(&member).await {
                Ok(v) => {
                    // We cannot safely write to this variable here because 
                    // this async block can be executed by a different thread.
                    // We need to use AtomicBool instead.
                    if !v.1 { member_was_cached = false; };
                    v.0
                }
                // bail!() only does an early return from this async block, so
                // it will return the Err value to its caller, get_or_insert_with().
                // We need to use get_or_try_insert_with() instead.
                Err(e) => bail!(e)
            }
        }
        )
        .await;

To solve them, we need to do the following:

  1. For member_was_cached, use std::sync::atomic::AtomicBool instead of bool.
  2. For bail!(), use get_or_try_insert_with instead of get_or_insert_with.
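To illustrate the second point in isolation: anyhow's bail!() expands to an early `return Err(...)`, and a `return` inside an async block ends only that block, not the enclosing function. The sketch below shows this with the standard library alone. The block_on helper is a deliberately minimal executor written for this sketch (a real program would use tokio), and describe is a hypothetical function.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing. It is enough here because the futures in this
/// sketch never return Pending.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor for this sketch only.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn describe(fail: bool) -> String {
    let result: Result<u32, String> = block_on(async {
        if fail {
            // This leaves the async block only; `describe` keeps running.
            return Err("lookup failed".to_string());
        }
        Ok(7_u32)
    });
    // This point is reached in both cases, so the error has to be handled
    // (or propagated) by whoever awaits the block.
    match result {
        Ok(v) => format!("value: {v}"),
        Err(e) => format!("block returned an error: {e}"),
    }
}
```

This is why the async block's output type becomes a Result, and why the method that awaits it has to accept a Result-returning future.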

Here is example code with the above changes applied:

use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

use moka::future::Cache;
use once_cell::sync::Lazy;

#[derive(Clone)]
struct GuildMember {
    guild_id: (u32, String),
    user_id: (u32, String),
}

async fn query_db_for_member_or_return_current(
    _member: &GuildMember,
) -> Result<(GuildMember, bool), anyhow::Error> {
    todo!()
}

async fn get_user(member: GuildMember) -> anyhow::Result<GuildMember> {
    static MEMBERS_MEM_CACHE: Lazy<Cache<(u32, u32), GuildMember>> =
        Lazy::new(|| Cache::new(1024));

    // The async block passed to get_or_try_insert_with() can be executed by a
    // different thread. So we need to protect the bool value from multi-threaded
    // access using an AtomicBool.
    let member_was_cached = Arc::new(AtomicBool::new(true));
    // Create an Arc clone. We will move this into the async block.
    let mwc = Arc::clone(&member_was_cached);

    let cached_member = MEMBERS_MEM_CACHE
        // The return type of our async block is Result<GuildMember, anyhow::Error>, so we
        // need to use get_or_try_insert_with() instead of get_or_insert_with().
        .get_or_try_insert_with((member.guild_id.0, member.user_id.0), async move {
            match query_db_for_member_or_return_current(&member).await {
                Ok(v) => {
                    if !v.1 {
                        mwc.store(false, Ordering::Release)
                    };
                    Ok(v.0)
                }
                Err(e) => {
                    // Convert e (anyhow::Error) into a type that get_or_try_insert_with()
                    // can accept.
                    Err(e.into())
                }
            }
        })
        .await
        // We need to wrap the error with anyhow::Error. Upcoming Moka v0.6 will no longer
        // require this map_err() method.
        .map_err(|e| anyhow::anyhow!(e))?;

    // Read the bool value and do something with it.
    if !member_was_cached.load(Ordering::Acquire) {
        todo!();
    }

    Ok(cached_member)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let member = GuildMember {
        guild_id: (0, "guild0".into()),
        user_id: (1, "user1".into()),
    };
    get_user(member).await?;
    Ok(())
}

Actually, I realized that Moka v0.5.0 and v0.5.1 have a bug in type definitions so they will accept code without the change 1 (AtomicBool). That is wrong; you should not use a plain bool there. I will fix the bug and release Moka v0.5.2 soon.
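The reason a plain bool is not acceptable can be seen in a standalone sketch that uses std::thread::spawn, which requires a `Send + 'static` closure. The assumption here is that the corrected type definitions place a similar requirement on the future; the functions lookup and load_member are hypothetical. Such a closure cannot mutably borrow a local bool from the caller, whereas a cloned Arc<AtomicBool> can be moved in and read back afterwards.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for the database lookup: returns (value, was_cached).
fn lookup(found_in_cache: bool) -> (String, bool) {
    ("member".to_string(), found_in_cache)
}

fn load_member(found_in_cache: bool) -> (String, bool) {
    let was_cached = Arc::new(AtomicBool::new(true));
    // Clone the Arc so that one handle can be moved into the other thread.
    let flag = Arc::clone(&was_cached);

    // The closure must be `Send + 'static`, so it owns `flag` instead of
    // borrowing a local variable from this function.
    let value = thread::spawn(move || {
        let (value, cached) = lookup(found_in_cache);
        if !cached {
            flag.store(false, Ordering::Release);
        }
        value
    })
    .join()
    .unwrap();

    // Read the flag back on the original thread.
    (value, was_cached.load(Ordering::Acquire))
}
```

Replacing the AtomicBool with `let mut was_cached = true;` and writing to it inside the closure would be rejected by the compiler, because a move closure would only modify its own copy and a borrowing closure would not be 'static.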

mu-arch commented on June 12, 2024

Your helpfulness is legend! Thank you. Closing this issue now.
