
A very simple thread pool for parallel task execution

Home Page: https://crates.io/crates/threadpool

License: Apache License 2.0


rust-threadpool's Introduction

threadpool

A thread pool for running a number of jobs on a fixed set of worker threads.


Usage

Add this to your Cargo.toml:

[dependencies]
threadpool = "1.0"

and this to your crate root:

extern crate threadpool;
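To get oriented, here is a minimal usage sketch based on the doc example quoted in the issues below: spawn a fixed pool, submit jobs, and collect results over a channel.

use std::sync::mpsc::channel;
use threadpool::ThreadPool;

fn main() {
    let pool = ThreadPool::new(4); // four worker threads

    let (tx, rx) = channel();
    for i in 0..8 {
        let tx = tx.clone();
        pool.execute(move || {
            tx.send(i).unwrap();
        });
    }

    // Collect the eight results: 0 + 1 + ... + 7 = 28.
    assert_eq!(rx.iter().take(8).fold(0, |a, b| a + b), 28);
}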

Minimal requirements

This crate requires Rust >= 1.13.0

Memory performance

Rust 1.32.0 switched the default from jemalloc to the operating system's allocator. While this enables more platforms, it means a performance loss for some workloads.

To regain the performance, consider enabling the jemallocator crate.
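A minimal sketch of what that looks like, assuming a recent jemallocator release is added under [dependencies]; note that #[global_allocator] itself needs Rust 1.28+, above this crate's 1.13 minimum:

extern crate jemallocator;

// Route every heap allocation through jemalloc instead of the system allocator.
#[global_allocator]
static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;

fn main() {
    let v: Vec<u8> = Vec::with_capacity(1024); // served by jemalloc
    println!("capacity: {}", v.capacity());
}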

Similar libraries

License

Licensed under either of

  • Apache License, Version 2.0
  • MIT license

at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.

Development

To install Rust 1.13.0 with rustup, execute this command:

rustup install 1.13.0

To run the tests with 1.13.0, use this command:

cargo +1.13.0 test

If your build fails with this error:

warning: unused manifest key: package.categories
error: failed to parse lock file at: /home/vp/rust/threadpool/Cargo.lock

You can fix it by removing the lock file:

rm Cargo.lock

rust-threadpool's People

Contributors

aidanhs, alexcrichton, antonhagser, aochagavia, bblancha, bors[bot], busyjay, dns2utf8, efyang, frewsxcv, jahkeup, killercup, kolloch, mjkillough, naufraghi, nooberfsh, notriddle, rsolomo, steveklabnik, tamird, willem66745


rust-threadpool's Issues

Feature: split thread pool from work queue

This library (and the others like it) combines the notion of a "replenishing pool of N threads" with a "work queue". It would be useful to expose these building blocks separately.

The raw thread pool would allow you to run any Send + Sync + Fn type across N threads, and would automatically restart a thread if it panicked. The work queue would be an implementation of that function which pulled jobs from a queue and executed them.

How can I know if a job panicked?

If I execute a job, how can I know it has panicked?

The example uses a channel to collect results, but I don't think it's possible to distinguish between a response that won't be sent, and a response that hasn't been sent yet.

use threadpool::ThreadPool;
use std::sync::mpsc::channel;

let pool = ThreadPool::new(4);

let (tx, rx) = channel();
for i in 0..8 {
    let tx = tx.clone();
    pool.execute(move || {
        if i == 4 {panic!();} // -- unexpected failure added here --
        tx.send(i).unwrap();
    });
}

// And now this code waits forever:
assert_eq!(rx.iter().take(8).fold(0, |a, b| a + b), 28);
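One workaround, not part of the crate's API: wrap the job body in std::panic::catch_unwind and send the Result over the channel, so a panic arrives as an Err value instead of a missing message. A minimal sketch:

use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::mpsc::channel;
use threadpool::ThreadPool;

fn main() {
    let pool = ThreadPool::new(4);
    let (tx, rx) = channel();
    for i in 0..8 {
        let tx = tx.clone();
        pool.execute(move || {
            // Convert a panic inside the job into an Err value.
            let result = catch_unwind(AssertUnwindSafe(|| {
                if i == 4 { panic!(); }
                i
            }));
            tx.send(result).unwrap();
        });
    }
    drop(tx); // close the channel once all jobs hold their own clones

    for result in rx {
        match result {
            Ok(i) => println!("job {} succeeded", i),
            Err(_) => println!("a job panicked"),
        }
    }
}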

Possible memory leak

Hi, I think this crate has a memory leak. When building and then running under valgrind, I get the following output (screenshot omitted). If I make the loop run longer, more memory leaks (second screenshot omitted). Hopefully this can be fixed.

Helgrind issues

Hi guys,

I've run an example from the docs page (see below) under the Helgrind tool (it checks for multi-threading related errors) and it gave me plenty of synchronization errors, although an equivalent implementation on Rust's plain threads doesn't have them.

Generally speaking, the tool is known to produce false positives, but as mentioned, nothing is shown for the plain-threads Rust implementation. I've also generated suppression rules from a single run, but they didn't fix the errors: different ones were reported on the next launch.

Could you please have a look? You can see it for yourself just by running valgrind --tool=helgrind on a compiled app. Here's my output: https://gist.github.com/mexus/08e2694758e4b1a2833dcbb415971c0b

I'm using rustc 1.17.0 and threadpool 1.3.2.

extern crate threadpool;

use threadpool::ThreadPool;
use std::sync::mpsc::channel;

fn main() {
    let n_workers = 4;
    let n_jobs = 8;
    let pool = ThreadPool::new(n_workers);

    let (tx, rx) = channel();
    for _ in 0..n_jobs {
        let tx = tx.clone();
        pool.execute(move || { tx.send(1).unwrap(); });
    }

    let res = rx.iter().take(n_jobs).fold(0, |a, b| a + b);
    assert_eq!(res, 8);
}

Question on panicking threads

Hi!

I had a question about what happens when a job panics: is that thread essentially "dead" for the rest of the runtime of the pool? Apologies if this is documented or implied somewhere.
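For reference, the pool replaces a worker whose job panicked, so it keeps its full size. A minimal sketch, assuming a recent 1.x release with join() and panic_count():

use threadpool::ThreadPool;

fn main() {
    let pool = ThreadPool::new(1);

    pool.execute(|| panic!("boom"));
    pool.join();

    // The panicked worker has been replaced, so the pool still runs jobs.
    pool.execute(|| println!("still alive"));
    pool.join();

    assert_eq!(pool.panic_count(), 1);
}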

`ScopedPool` is unsafe

This compiles:

extern crate threadpool;

use threadpool::ScopedPool;
use std::{mem, thread};

fn main() {
    let mut xs = vec![1, 2, 3, 4, 5];
    {
        let pool = ScopedPool::new(4);
        for x in &mut xs {
            pool.execute(move || {
                thread::sleep_ms(2000);
                *x += 10;
            });
        }
        mem::forget(pool);
    }
    // The threads are still running and we can access the shared mutable data!
    xs.push(5);
}

Threadpool leaks memory

Hello,

I have a large number of tasks and my application is leaking memory. I tracked this issue down to the threadpool:

let n_workers = 8;
let n_jobs = 100000000;
let pool = ThreadPool::new(n_workers);

let (tx, rx) = channel();
for _ in 0..n_jobs {
    let tx = tx.clone();
    pool.execute(move || {
        let foo = (0..1000000).map(|_| "X").collect::<String>();
        tx.send(1).unwrap();
    });
}

assert_eq!(rx.iter().take(n_jobs).fold(0, |a, b| a + b), n_jobs);

The problem is the allocation in the thread (let foo = ...). By increasing the size of foo, you can determine how fast the pool eats your memory.

I pushed the code to a demo repository:

git clone https://github.com/timglabisch/rust_leaking_threadpool.git
cd rust_leaking_threadpool
cargo run --release

Any ideas how to solve this?
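One hedged workaround, outside the crate's API: since execute() queues jobs on an unbounded channel, every pending closure sits in memory at once; capping the number of in-flight jobs by waiting for completions before submitting more keeps the queue small. A minimal sketch:

use std::sync::mpsc::channel;
use threadpool::ThreadPool;

fn main() {
    let n_workers = 8;
    let n_jobs = 1_000_000;
    let max_in_flight = 2 * n_workers; // bound on queued + running jobs
    let pool = ThreadPool::new(n_workers);

    let (done_tx, done_rx) = channel();
    let mut in_flight = 0;
    for _ in 0..n_jobs {
        // Before submitting past the cap, wait for one job to complete.
        if in_flight == max_in_flight {
            done_rx.recv().unwrap();
            in_flight -= 1;
        }
        let done_tx = done_tx.clone();
        pool.execute(move || {
            let _foo = (0..1000).map(|_| "X").collect::<String>();
            done_tx.send(()).unwrap();
        });
        in_flight += 1;
    }
    // Drain the remaining completions.
    for _ in 0..in_flight {
        done_rx.recv().unwrap();
    }
}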

Use sync_channel to limit the size of job queue?

rust-threadpool uses a channel for receiving jobs, which means that if the number of jobs is very large, it will consume lots of memory.

So I'd like to know whether or not it is a good idea to use sync_channel instead of channel.

I googled and found:

Not sure why the author hadn't submitted a PR.

Separating thread creation from job execution.

Right now, the ScopedPool API works in such a way that the handle that represents the spawned threads and the lifetime-bound RAII guard for the actual execution of jobs are one and the same type.

This means it's not possible to re-use the same pool of threads across different batches of jobs closing over disjoint lifetimes, e.g.:

let pool = ScopedPool::new(4);

for _ in 0..10 {
    let mut a = [1, 2, 3];
    for e in &mut a {
        pool.execute(move || *e += 1);
    }
    // <join the existing threads here without ending them>
}
src/pool_cache.rs:19:27: 19:28 error: `a` does not live long enough
src/pool_cache.rs:19             for e in &mut a {
                                               ^
src/pool_cache.rs:14:35: 28:6 note: reference must be valid for the block at 14:34...
src/pool_cache.rs:14     fn scoped(&mut self) -> Guard {
src/pool_cache.rs:15         let pool = ScopedPool::new(4);
src/pool_cache.rs:16 
src/pool_cache.rs:17         for _ in 0..10 {
src/pool_cache.rs:18             let mut a = [1, 2, 3];
src/pool_cache.rs:19             for e in &mut a {
                     ...
src/pool_cache.rs:18:35: 22:10 note: ...but borrowed value is only valid for the block suffix following statement 0 at 18:34
src/pool_cache.rs:18             let mut a = [1, 2, 3];
src/pool_cache.rs:19             for e in &mut a {
src/pool_cache.rs:20                 pool.execute(move || *e += 1);
src/pool_cache.rs:21             }
src/pool_cache.rs:22         }

I propose the introduction of a new type PoolCache whose sole purpose is to spawn N threads, and which has constructor methods for getting a RAII handle that allows temporary usage as a scoped pool:

let pool_cache = PoolCache::new(4); // Allocate 4 Threads and keep them ready

for _ in 0..10 {
    let mut a = [1, 2, 3];
    let pool = pool_cache.scoped(); // Create a scoped pool with a lifetime bound local to this scope
    for e in &mut a {
        pool.execute(move || *e += 1);
    }
    // pool drops here, which causes the threads to wait ("join") for completion and get returned to the cache.
}

`Sync` is not implemented for `ThreadPool`

I am trying to create a threadpool as a static, so that I don't have to pass this variable across multiple functions.

lazy_static! {
    pub static ref TABLES_LOAD_POOL: ThreadPool = setup_thread_pool();
}

fn setup_thread_pool() -> ThreadPool {
    let thread_pool: ThreadPool = threadpool::Builder::new()
        .num_threads(10)
        .thread_name("Table_Load".into())
        .build();
    thread_pool
}

I get the following error

`std::sync::mpsc::Sender<std::boxed::Box<(dyn threadpool::FnBox + std::marker::Send + 'static)>>` cannot be shared between threads safely
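One workaround, assuming the lazy_static crate: wrap the pool in a Mutex, which is Sync, and clone the pool out of the lock when you need it (ThreadPool implements Clone). A minimal sketch:

#[macro_use]
extern crate lazy_static;
extern crate threadpool;

use std::sync::Mutex;
use threadpool::ThreadPool;

lazy_static! {
    // Mutex<ThreadPool> is Sync even though ThreadPool itself is not
    // (its internal mpsc::Sender is not Sync).
    pub static ref TABLES_LOAD_POOL: Mutex<ThreadPool> = Mutex::new(
        threadpool::Builder::new()
            .num_threads(10)
            .thread_name("Table_Load".into())
            .build(),
    );
}

fn main() {
    // Lock, clone the pool, and release the lock before executing,
    // so the Mutex is held only briefly.
    let pool = TABLES_LOAD_POOL.lock().unwrap().clone();
    pool.execute(|| println!("running in the static pool"));
    pool.join();
}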

Intermittent failure in `test_multi_join()`

The test test_multi_join() will intermittently fail, although it can take a large number of runs to see this. Running the following script:

#!/bin/bash
set -e

for i in `seq 5000`; do
    echo ==================== $i ====================
    cargo test -- multi_join;
done

will eventually produce the following error:

running 1 test
test test::test_multi_join ... FAILED

failures:

---- test::test_multi_join stdout ----
thread 'test::test_multi_join' panicked at src/lib.rs:1111:9:
assertion `left == right` failed
  left: Ok(0)
 right: Err(Empty)
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace


failures:
    test::test_multi_join

test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 20 filtered out; finished in 0.00s

error: test failed, to rerun pass `--lib`

API breaking changes proposal

  • add struct ThreadPoolBuilder

  • add ThreadPoolBuilder fn new()
    • uses the num_cpus crate to default the number of threads to the number of CPUs

  • remove: ThreadPool fn new(num_threads: usize) -> ThreadPool
  • add: ThreadPool fn new() -> ThreadPool
    • uses the num_cpus crate to default the number of threads to the number of CPUs
  • add: ThreadPoolBuilder fn num_threads(self, num: usize) -> ThreadPoolBuilder
    • set the number of threads for the pool that the builder will construct

  • remove: ThreadPool fn with_name(name: String, num_threads: usize) -> ThreadPool
  • add: ThreadPoolBuilder fn name(self, name: String) -> ThreadPoolBuilder


In summary: remove all constructors from ThreadPool except a new `new` constructor. If you want to fine-tune your threadpool, you'd use a ThreadPoolBuilder.
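For reference, a builder along these lines did ship, named threadpool::Builder rather than ThreadPoolBuilder (it already appears in another issue above). A minimal sketch against the 1.x API:

extern crate threadpool;

use threadpool::{Builder, ThreadPool};

fn main() {
    let pool: ThreadPool = Builder::new()
        .num_threads(8)
        .thread_name("worker".into())
        .build();

    pool.execute(|| println!("hello from the pool"));
    pool.join();
}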

Build error in extern lib nn = "0.1.5"

Hey out there!

I am very new to Rust and still don't know how to tackle build issues. Can someone help me out with this one?

RustNN$ cargo build
   Compiling libc v0.1.6
   Compiling rustc-serialize v0.3.14
   Compiling gcc v0.3.5
   Compiling threadpool v0.1.4
   Compiling rand v0.3.8
   Compiling time v0.1.25
   Compiling nn v0.1.5 (file:///home/swiesend/workspace/rust/RustNN)
src/lib.rs:70:18: 70:28 error: unresolved import `threadpool::ScopedPool`. There is no `ScopedPool` in `threadpool`
src/lib.rs:70 use threadpool::{ScopedPool};
                               ^~~~~~~~~~
error: aborting due to previous error
Could not compile `nn`.

The build fails with both beta.4 and the nightly build of Rust:

$ rustc --version
rustc 1.0.0-beta.4 (850151a75 2015-04-30) (built 2015-04-30)

$ rustc --version
rustc 1.1.0-nightly (c42c1e7a6 2015-05-02) (built 2015-05-02)

Why not remove the trait `FnBox`?

FnOnce already replaces FnBox, so why implement it yourself?

  1. Delete the following code:
trait FnBox {
    fn call_box(self: Box<Self>);
}

impl<F: FnOnce()> FnBox for F {
    fn call_box(self: Box<F>) {
        (*self)()
    }
}
  2. Modify this code:
type Thunk<'a> = Box<FnBox + Send + 'a>;

to

type Thunk<'a> = Box<dyn FnOnce() -> () + Send + 'a>;

and change

job.call_box();

to

job();

Feature: provide a way to shutdown the thread pool

It would be great to provide a mechanism to terminate all pending tasks and wait for the current one to complete.

Essentially, complete the currently executing jobs (since I'm not sure it's possible to terminate spawned threads) and then don't proceed with any more work.

At the moment this is not possible.
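A possible stopgap while no such mechanism exists in the crate: have every queued job check a shared shutdown flag and return immediately once it is set, then join() the pool. A minimal sketch:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use threadpool::ThreadPool;

fn main() {
    let pool = ThreadPool::new(4);
    let shutdown = Arc::new(AtomicBool::new(false));

    for i in 0..100 {
        let shutdown = shutdown.clone();
        pool.execute(move || {
            // Queued jobs bail out early once shutdown is requested;
            // already-running jobs finish normally.
            if shutdown.load(Ordering::Relaxed) {
                return;
            }
            println!("working on job {}", i);
        });
    }

    shutdown.store(true, Ordering::Relaxed);
    pool.join(); // wait for in-flight jobs; the rest return immediately
}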

Limit job-pool size

It would be great if we could set a maximum number of queued jobs for the pool. In this case, adding a new job to a full pool would return an error.

The unit test `test_shrink` makes no sense

In test_shrink, if you comment out the call pool.set_num_threads(TEST_TASKS);, the test still passes, because after_shrink_size == TEST_TASKS and the number of active threads == TEST_TASKS.

I then tried to write a test case that really exercises the shrink, but it sometimes fails (because of the implementation in lib.rs).

The code is as follows: test_no_shrink is the original test case I mention above, and test_shrink_in_sleep is my custom test case using sleep.

#[cfg(test)]
mod test {
    use std::{
        sync::{Arc, Barrier},
        thread::sleep,
        time::Duration,
    };
    use threadpool::ThreadPool;

    const TEST_TASKS: usize = 4;
    #[test]
    fn test_no_shrink() {
        let test_tasks_begin = TEST_TASKS + 2;

        let pool = ThreadPool::new(test_tasks_begin);
        let b0 = Arc::new(Barrier::new(test_tasks_begin + 1));
        let b1 = Arc::new(Barrier::new(test_tasks_begin + 1));

        for _ in 0..test_tasks_begin {
            let (b0, b1) = (b0.clone(), b1.clone());
            pool.execute(move || {
                b0.wait();
                b1.wait();
            });
        }

        let b2 = Arc::new(Barrier::new(TEST_TASKS + 1));
        let b3 = Arc::new(Barrier::new(TEST_TASKS + 1));

        for _ in 0..TEST_TASKS {
            let (b2, b3) = (b2.clone(), b3.clone());
            pool.execute(move || {
                b2.wait();
                b3.wait();
            });
        }

        b0.wait();

        // no shrink call, it also can test successfully
        // pool.set_num_threads(TEST_TASKS);

        assert_eq!(pool.active_count(), test_tasks_begin);
        b1.wait();

        b2.wait();
        assert_eq!(pool.active_count(), TEST_TASKS);
        b3.wait();
    }

    #[test]
    fn test_shrink_in_sleep() {
        let test_tasks_begin = TEST_TASKS / 2;
        let shrink_tasks = 1;

        let mut pool = ThreadPool::new(test_tasks_begin);

        for _ in 0..TEST_TASKS {
            pool.execute(move || {
                sleep(Duration::from_secs(1));
            })
        }

        sleep(Duration::from_millis(100));
        pool.set_num_threads(shrink_tasks);

        /*
         *  if TEST_TASKS = 4:
         *
         *  time        tasks
         *  [0-1]       task0, task1
         *  [1-2]       task2
         *  [2-3]       task3
         */

        sleep(Duration::from_secs(1));
        assert_eq!(pool.active_count(), 1);
    }
}

This is the test result:

running 2 tests
test test::test_no_shrink ... ok
test test::test_shrink_in_sleep ... FAILED

failures:

---- test::test_shrink_in_sleep stdout ----
thread 'test::test_shrink_in_sleep' panicked at 'assertion failed: `(left == right)`
  left: `2`,
 right: `1`', src/main.rs:77:9
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace


failures:
    test::test_shrink_in_sleep

test result: FAILED. 1 passed; 1 failed; 0 ignored; 0 measured; 0 filtered out; finished in 1.10s

Although I know the thread pool does not need such precise shrinking, I want to ask how to write a test case that checks whether the shrink really works and that passes every time.

Default number of workers: Look at the affinity mask instead of the number of CPUs?

Hi,

I was looking for a thread pool, and found this repo. Thanks for working on this.

I was browsing the dependencies and saw that you use num_cpus to decide how many worker threads to use by default:

let num_threads = self.num_threads.unwrap_or_else(num_cpus::get);

Would it not be better to look into the affinity mask to get this default? The system may have 64 cores, but have the process pinned to, e.g. 4 cores. This would cause contention.

I found this crate, which might help (disclaimer, I've not used it and I'm not sure how portable it is):
https://crates.io/crates/core_affinity

Thanks
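A later alternative worth noting: std::thread::available_parallelism (stable since Rust 1.59, so above this crate's 1.13 minimum) takes the process affinity mask into account on Linux. A minimal sketch of sizing the pool with it:

use std::thread;
use threadpool::ThreadPool;

fn main() {
    // available_parallelism() honors the process affinity mask on Linux,
    // unlike a raw CPU count, so a pinned process gets a smaller pool.
    let n = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
    let pool = ThreadPool::new(n);
    pool.execute(move || println!("pool sized to {} threads", n));
    pool.join();
}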

Adding a `scoped` function

I wonder if it would be useful to have a scoped function similar to the scoped_threadpool crate.

Advantages could come from the design, like:

  • dedicate only a certain part of the pool to the scoped work
  • add the scoped jobs in random order but join the whole pool at the end of the scoped function
  • add the scoped jobs in random order but join only the scoped jobs

One could also argue that the effort would be wasted, since such a crate already exists.

Clarify documentation about deadlocking

The way this sentence is currently phrased, it sounds like it will deadlock as soon as you submit a job when no worker is available:

Keep in mind, if you put more jobs in the pool than you have workers, you will end up with a deadlock

But that doesn't seem to be the case, this works nicely:

extern crate threadpool;

use threadpool::ThreadPool;
use std::time::Duration;
use std::thread::sleep;

fn main() {
    let pool = ThreadPool::new(2);
    for _ in 0..5 {
        pool.execute(move || {
            println!("Executing job");
            sleep(Duration::from_secs(1));
        });
        println!("Submitted job");
    }
    sleep(Duration::from_secs(5));
    println!("Active: {}", pool.active_count());
}

Will print:

Submitted job
Executing job
Submitted job
Submitted job
Submitted job
Submitted job
Executing job
Executing job
Executing job
Executing job
Active: 0

Can you clarify what is meant in that section? As far as I can tell, the danger is when a job blocks on something outside the pool that itself depends on the job queue making progress.
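To illustrate that reading, here is a minimal sketch of an actual deadlock: with a single worker, a running job blocks on a value that only a still-queued job can produce.

use std::sync::mpsc::channel;
use threadpool::ThreadPool;

fn main() {
    // One worker: the first job waits for a value that only the second,
    // still-queued job can send. The queue never drains: deadlock.
    let pool = ThreadPool::new(1);
    let (tx, rx) = channel();

    pool.execute(move || {
        let _ = rx.recv(); // blocks forever; its sender job never runs
    });
    pool.execute(move || {
        tx.send(1).unwrap(); // never executed: the only worker is stuck
    });

    pool.join(); // never returns
}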

Correct way to wait until all threads are finished

If you have a fixed-size queue of work, what's the correct way to block until your queue is finished? I am currently using a channel, but I'm not sure that's the correct way.

For instance, from the dining philosophers problem:

let handles: Vec<_> = philosophers.into_iter().map(|p| {
    thread::spawn(move || {
        p.eat();
    })
}).collect();

//blocks until all work is done   
for h in handles {
    h.join().unwrap();
}

The way I have currently accomplished this with threadpools is using mpsc::channel:

let pool = ThreadPool::new(4);

let num_philosophers = philosophers.len();

let (tx, rx) = channel();

for p in philosophers {
   let tx = tx.clone();
   pool.execute(move || {
        p.eat();
        tx.send(()).unwrap();
   })
}

//blocks until all work is done   
for _ in 0..num_philosophers {
  rx.recv().unwrap();
}

Is there a cleaner way to wait?
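For later readers: the crate gained a join() method (it appears in other issues above) that blocks until the queue is empty and all running jobs have finished, which answers this without a bookkeeping channel. A minimal sketch:

use threadpool::ThreadPool;

fn main() {
    let pool = ThreadPool::new(4);
    for _ in 0..8 {
        pool.execute(|| {
            // p.eat() would go here
        });
    }
    // join() blocks until the queue is empty and all running jobs finish.
    pool.join();
}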

Provide a real world example

Maybe something that searches for all *.jpeg in a directory, starts a thread pool, and in each thread, converts them to .png
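A minimal sketch of such an example, with the actual image conversion left as a comment since it would pull in an external crate (the image crate's open/save would be one option):

use std::fs;
use threadpool::ThreadPool;

fn main() {
    let pool = ThreadPool::new(4);

    for entry in fs::read_dir(".").expect("readable directory") {
        let path = entry.expect("valid entry").path();
        if path.extension().map_or(false, |ext| ext == "jpeg") {
            pool.execute(move || {
                let target = path.with_extension("png");
                // The actual conversion could use the `image` crate, e.g.:
                // image::open(&path).unwrap().save(&target).unwrap();
                println!("would convert {:?} -> {:?}", path, target);
            });
        }
    }

    pool.join(); // wait until every conversion job has finished
}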
