
multitasking's Introduction

MultiTasking: Non-blocking Python methods using decorators

MultiTasking is a tiny Python library that lets you convert your Python methods into asynchronous, non-blocking methods simply by using a decorator.

Example

# example.py
import multitasking
import time
import random
import signal

# kill all tasks on ctrl-c
signal.signal(signal.SIGINT, multitasking.killall)

# or, wait for task to finish on ctrl-c:
# signal.signal(signal.SIGINT, multitasking.wait_for_tasks)

@multitasking.task # <== this is all it takes :-)
def hello(count):
    sleep = random.randint(1,10)/2
    print("Hello %s (sleeping for %ss)" % (count, sleep))
    time.sleep(sleep)
    print("Goodbye %s (after for %ss)" % (count, sleep))

if __name__ == "__main__":
    for i in range(0, 10):
        hello(i+1)

The output would look something like this:

$ python example.py

Hello 1 (sleeping for 0.5s)
Hello 2 (sleeping for 1.0s)
Hello 3 (sleeping for 5.0s)
Hello 4 (sleeping for 0.5s)
Hello 5 (sleeping for 2.5s)
Hello 6 (sleeping for 3.0s)
Hello 7 (sleeping for 0.5s)
Hello 8 (sleeping for 4.0s)
Hello 9 (sleeping for 3.0s)
Hello 10 (sleeping for 1.0s)
Goodbye 1 (after 0.5s)
Goodbye 4 (after 0.5s)
Goodbye 7 (after 0.5s)
Goodbye 2 (after 1.0s)
Goodbye 10 (after 1.0s)
Goodbye 5 (after 2.5s)
Goodbye 6 (after 3.0s)
Goodbye 9 (after 3.0s)
Goodbye 8 (after 4.0s)
Goodbye 3 (after 5.0s)

Settings

The default maximum number of threads is equal to the number of CPU cores. This is just a rule of thumb! Because of the GIL, Python threads don't actually run on more than one core at a time.

You can change the default maximum number of threads using:

import multitasking
multitasking.set_max_threads(10)

...or, if you want to set the maximum number of threads based on the number of CPU cores, you can:

import multitasking
multitasking.set_max_threads(multitasking.config["CPU_CORES"] * 5)

For applications that don't require access to shared resources, you can set MultiTasking to use multiprocessing.Process() instead of threading.Thread(), thus avoiding some of the GIL constraints.

import multitasking
multitasking.set_engine("process") # "process" or "thread"
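
Note that process mode comes with the usual multiprocessing caveats (picklable arguments, a __main__ guard), and, as the pickling issue reported below suggests, it may not work at all on spawn-based platforms such as Windows. A minimal sketch, assuming a fork-based platform (e.g., Linux):

import multitasking

multitasking.set_engine("process")

@multitasking.task
def work(n):
    print(n)

if __name__ == "__main__":
    for i in range(4):
        work(i)
    multitasking.wait_for_tasks()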

Installation

Install multitasking using pip:

$ pip install multitasking --upgrade --no-cache-dir

Install multitasking using conda:

$ conda install -c ranaroussi multitasking

Legal Stuff

MultiTasking is distributed under the Apache Software License. See the LICENSE.txt file in the release for details.

P.S.

Please drop me a note with any feedback you have.

Ran Aroussi

multitasking's People

Contributors

andersk, jsibbiso, ketothxupack, ranaroussi

multitasking's Issues

Unable to pickle when using multiprocessing

I'm using your example from the readme, and when setting the engine to multiprocessing I receive the following error. I'm using Python 3.8, 64-bit.


Traceback (most recent call last):
  File "mp_test.py", line 23, in <module>
    hello(i+1)
  File "C:\Users\nbrei\AppData\Local\Programs\Python\Python38\lib\site-packages\multitasking\__init__.py", line 119, in async_method
    single.start()
  File "C:\Users\nbrei\AppData\Local\Programs\Python\Python38\lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
  File "C:\Users\nbrei\AppData\Local\Programs\Python\Python38\lib\multiprocessing\context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Users\nbrei\AppData\Local\Programs\Python\Python38\lib\multiprocessing\context.py", line 326, in _Popen
    return Popen(process_obj)
  File "C:\Users\nbrei\AppData\Local\Programs\Python\Python38\lib\multiprocessing\popen_spawn_win32.py", line 93, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Users\nbrei\AppData\Local\Programs\Python\Python38\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
AttributeError: Can't pickle local object 'task.<locals>._run_via_pool'

C:\Users\nbrei\Documents\GitHub\pead_ml2>Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\nbrei\AppData\Local\Programs\Python\Python38\lib\multiprocessing\spawn.py", line 102, in spawn_main
    source_process = _winapi.OpenProcess(
OSError: [WinError 87] The parameter is incorrect
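
For context (this explanation is not from the original report): on Windows, multiprocessing uses the spawn start method, which pickles the process target, and a locally defined function such as task.<locals>._run_via_pool cannot be pickled. A minimal reproduction of that underlying limitation, independent of multitasking:

import multiprocessing

def outer():
    # 'inner' is a local object, so the spawn start method cannot pickle it
    def inner():
        pass
    p = multiprocessing.Process(target=inner)
    p.start()  # on Windows: AttributeError: Can't pickle local object 'outer.<locals>.inner'

if __name__ == "__main__":
    outer()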

The library takes longer than expected when using the `time.sleep` function.

I ran the code below on Ubuntu Linux (5.10.0-8-amd64) with your multitasking library. Normally it should take about 10 seconds, but it takes almost 30 seconds to finish.

import time
import multitasking
from datetime import datetime


@multitasking.task
def sleep(s: float, idx: int) -> None:
    print(f'Thread {idx} started at', datetime.today())
    time.sleep(s)


multitasking.set_max_threads(100)
st = time.time()
for i in range(10):
    sleep(10, i+1)
multitasking.wait_for_tasks()
et = time.time()
print('Cost:', et-st, 's')

Output

Thread 1 started at 2021-10-05 10:10:40.820264
Thread 2 started at 2021-10-05 10:10:40.820646
Thread 3 started at 2021-10-05 10:10:40.822117
Thread 4 started at 2021-10-05 10:10:40.823953
Thread 5 started at 2021-10-05 10:10:50.830119
Thread 6 started at 2021-10-05 10:10:50.830896
Thread 7 started at 2021-10-05 10:10:50.831372
Thread 8 started at 2021-10-05 10:10:50.833745
Thread 9 started at 2021-10-05 10:11:00.831055
Thread 10 started at 2021-10-05 10:11:00.841716
Cost: 30.023271560668945 s

But done another way, with plain threads, it takes only about 10 seconds.

import time
from typing import List
import multitasking
from datetime import datetime
from threading import Thread


def sleep(s: float, idx: int) -> None:
    print(f'Thread {idx} started at', datetime.today())
    time.sleep(s)


multitasking.set_max_threads(100)
st = time.time()
tasks: List[Thread] = []
for i in range(10):
    t = Thread(target=sleep, args=(10, i+1))
    tasks.append(t)
for t in tasks:
    t.start()
for t in tasks:
    t.join()
et = time.time()
print('Cost:', et-st, 's')

Output

Thread 1 started at 2021-10-05 10:21:38.460654
Thread 2 started at 2021-10-05 10:21:38.460975
Thread 3 started at 2021-10-05 10:21:38.461291
Thread 4 started at 2021-10-05 10:21:38.463486
Thread 5 started at 2021-10-05 10:21:38.463916
Thread 6 started at 2021-10-05 10:21:38.464316
Thread 7 started at 2021-10-05 10:21:38.465013
Thread 8 started at 2021-10-05 10:21:38.465399
Thread 9 started at 2021-10-05 10:21:38.465883
Thread 10 started at 2021-10-05 10:21:38.466408
Cost: 10.016554594039917 s

Could you please tell me the reason? This library is so cool!

TypeError: __init__() got an unexpected keyword argument 'daemon' in Python 2.7.13

Hi,

Getting this exception when multitasking.task is run

Traceback (most recent call last):
  File "C:/Users/mdivana/PythonProjects/Get Platform Details FlaskApp/wonder.py", line 16, in <module>
    run()
  File "C:\Users\mdivana\AppData\Local\Continuum\Anaconda3\envs\py27\lib\site-packages\multitasking\__init__.py", line 107, in async_method
    target=_run_via_pool, args=args, kwargs=kwargs, daemon=False)
TypeError: __init__() got an unexpected keyword argument 'daemon'

This is the code I ran:

import multitasking

@multitasking.task
def run():
    print("Yes!")

if __name__ == '__main__':
    for i in range(5):
        run()
    print("Hello")

Python: 2.7.13
multitasking: 0.0.4
OS: Windows 10

Thanks
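
For context: the daemon keyword argument of threading.Thread was only added in Python 3.3, which is why this call fails on Python 2.7. The Python 2 spelling sets the flag as an attribute instead, as in this minimal sketch:

import threading

def work():
    print("running")

t = threading.Thread(target=work)
t.daemon = False  # Python 2 spelling; Thread(..., daemon=False) requires Python 3.3+
t.start()
t.join()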

Returning values

Hi, is it possible to return a value from a method decorated with @multitasking.task? I always get the task reference, no matter what I try to return.
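
The decorator starts the work in the background and returns the task object, so the function's return value is not propagated. A common workaround (a minimal sketch, assuming the default thread engine) is to collect results in a shared queue instead:

import queue
import multitasking

results = queue.Queue()  # thread-safe; the process engine would need multiprocessing.Queue

@multitasking.task
def compute(x):
    results.put(x * x)  # push the result instead of returning it

if __name__ == "__main__":
    for i in range(5):
        compute(i)
    multitasking.wait_for_tasks()
    print(sorted(results.get() for _ in range(5)))  # [0, 1, 4, 9, 16]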

Detailed Documentation

I want to use the multitasking library, but I can't find proper documentation. I saw the examples, but they were not clear. Can anyone please point me to detailed documentation?

Killing a Process

Hi.
I know this isn't an issue as such, but how am I supposed to kill a process, or add something like the daemon flag that is used in threads?

And here is the more detailed reason. I am making a mobile app, and there will be a process that works in the background and communicates with a server. It cannot be a thread, because then the GUI could easily break. A lot of the time the process will be stuck on .recv() from the socket library, and it has no way to stop until:

  • The server goes down (which obviously won't happen), or
  • Something kills it (and this is what I want).
Thanks!

A bug when I want to use multitasking twice

Thanks for your great work.
However, when I use multitasking twice, it does not work as expected the second time. To reproduce the bug, run the following code.

import multitasking
import signal
multitasking.set_max_threads()
multitasking.set_engine("process")
signal.signal(signal.SIGINT, multitasking.killall)


@multitasking.task
def task1():
    print(1)


@multitasking.task
def task2():
    print(2)


if __name__ == '__main__':
    for i in range(3):
        task1()

    multitasking.wait_for_tasks()
    # multitasking.config["KILL_RECEIVED"] = False

    for i in range(3):
        task2()
    multitasking.wait_for_tasks()

After reading your code, it seems that config["KILL_RECEIVED"] prevents adding new tasks to the pool, because its value is True.

So the problem is that after using wait_for_tasks, you cannot add new tasks because of config["KILL_RECEIVED"]. For now, I just set config["KILL_RECEIVED"] = False after wait_for_tasks to avoid this bug (see the sketch below).
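
A minimal sketch of that workaround applied to the example above (it resets an internal config key, so it may change between versions):

import multitasking

@multitasking.task
def work(n):
    print(n)

if __name__ == "__main__":
    for i in range(3):
        work(i)
    multitasking.wait_for_tasks()

    # wait_for_tasks() leaves KILL_RECEIVED set, which makes task() a no-op;
    # resetting it re-enables scheduling for the next batch
    multitasking.config["KILL_RECEIVED"] = False

    for i in range(3):
        work(i)
    multitasking.wait_for_tasks()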

Ctrl+C

Hey,
I used the sample code from the README and edited it a bit. Here's the result:

import multitasking
import time
import random
import signal

@multitasking.task
def hello(count):
    sleep = random.randint(1,10)/2
    print("Hello %s (sleeping for %ss)" % (count, sleep))
    time.sleep(sleep)
    print("Goodbye %s (after for %ss)" % (count, sleep))

for i in range(0, 10):
    hello(i+1)
input()

And here's the problem: I cannot use Ctrl+C to immediately stop every single task.

Can you help me, please?

P.S.: Please forgive me if I have done some stupid mistake.
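
The edited sample dropped the SIGINT handler that the README example registers; restoring it is the usual fix (whether Ctrl+C interrupts the blocking input() promptly can still depend on the platform):

import signal
import multitasking

# kill all running tasks on Ctrl+C, as in the README example
signal.signal(signal.SIGINT, multitasking.killall)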

Using Processes Rather than Threads Doesn't Work as Documented + Workaround

Hi Ran,

Thank you for publishing this package.

The issue is that the documented method to use processes rather than threads:

import multitasking
multitasking.set_engine("process") # "process" or "thread"

does not work. I don't believe the set_engine call has any effect on the behavior of the package, because createPool always overwrites "ENGINE" from its parameter (which defaults to 'thread').

For others having this issue, a simple workaround is to create a default pool explicitly:

import multitasking
multitasking.createPool(engine='process')

Thanks,

Bishop Brock

Process mode does not respect the number of threads

Hi Ran,

Thank you for fixing the issues I mentioned a few days ago. I noticed another issue recently: when using processes, you cannot limit the number of running processes, because _run_via_pool() uses the threading semaphore, and by the time it acquires the semaphore the task has already started in a new process. (This is important to me because I'll run out of memory if all of the processes run simultaneously.)

If you'd like, I could suggest a fix via a pull request, but I think all that's needed is to import both types of Semaphore and make sure the correct one is used in createPool() (see the sketch after this report).

Thanks,

Bishop Brock
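
A minimal sketch of what that suggested fix might look like (the helper name is hypothetical and the real createPool internals may differ):

import threading
import multiprocessing

def make_semaphore(engine, limit):
    # a threading.Semaphore is not shared across processes, so it cannot
    # throttle process mode; pick the semaphore type that matches the engine
    if engine == "process":
        return multiprocessing.Semaphore(limit)
    return threading.Semaphore(limit)

For the limit to actually hold, the semaphore would also have to be acquired in the parent before Process.start(), rather than inside the child, so that no new process is spawned until a slot frees up.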

A failure before any task is processed

Traceback (most recent call last):
  File "fixed/insert_journal.py", line 132, in <module>
    get_content(key, _ind)
  File "/usr/local/lib/python3.7/site-packages/multitasking/__init__.py", line 118, in async_method
    single.start()
  File "/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 847, in start
    _start_new_thread(self._bootstrap, ())
RuntimeError: can't start new thread

But after a pause of a second or a few seconds, the processing goes on well.

It seems the init stage of multitasking leaves some errors unhandled.
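
For context (general Python behavior, not specific to this library): RuntimeError: can't start new thread means the OS refused to create another thread, typically because a per-process thread or memory limit was hit. Capping the pool size is one possible mitigation (the value here is an arbitrary assumption):

import multitasking

# keep concurrency well below the OS per-process thread limit
multitasking.set_max_threads(50)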

Import of pystore/yfinance breaks set_max_threads

Hi Ran,

I have a problem running your example.py on a single-core machine after importing one of your other libraries, e.g. pystore or yfinance. I can see you are using multitasking in these libraries. I even tried to import multitasking as _test instead, but this does not seem to help. With pystore and yfinance disabled, it works like a charm.

Any idea how to come around this issue?

Best regards,

Rutger

Used code

import pystore 
import yfinance

import multitasking as _test

import time
import random
import signal

# kill all tasks on ctrl-c
signal.signal(signal.SIGINT, _test.killall)
_test.set_max_threads(10)

# or, wait for task to finish on ctrl-c:
# signal.signal(signal.SIGINT, multitasking.wait_for_tasks)

@_test.task # <== this is all it takes :-)
def hello(count):
    sleep = random.randint(1,10)/2
    print("Hello %s (sleeping for %ss)" % (count, sleep))
    time.sleep(sleep)
    print("Goodbye %s (after for %ss)" % (count, sleep))

if __name__ == "__main__":
    for i in range(0, 10):
        hello(i+1)

Unwanted result

Hello 1 (sleeping for 5.0s)
Goodbye 1 (after for 5.0s)
Hello 2 (sleeping for 0.5s)
Goodbye 2 (after for 0.5s)
Hello 3 (sleeping for 4.5s)
Goodbye 3 (after for 4.5s)
Hello 4 (sleeping for 1.5s)
Goodbye 4 (after for 1.5s)
Hello 5 (sleeping for 3.5s)
Goodbye 5 (after for 3.5s)
Hello 6 (sleeping for 4.5s)
Goodbye 6 (after for 4.5s)
Hello 7 (sleeping for 2.0s)
Goodbye 7 (after for 2.0s)
Hello 8 (sleeping for 3.0s)
Goodbye 8 (after for 3.0s)
Hello 9 (sleeping for 1.5s)
Goodbye 9 (after for 1.5s)
Hello 10 (sleeping for 3.5s)
Goodbye 10 (after for 3.5s)

Pystore and yfinance both removed

#import pystore 
#import yfinance

import multitasking as _test

import time
import random
import signal

# kill all tasks on ctrl-c
signal.signal(signal.SIGINT, _test.killall)
_test.set_max_threads(10)

# or, wait for task to finish on ctrl-c:
# signal.signal(signal.SIGINT, multitasking.wait_for_tasks)

@_test.task # <== this is all it takes :-)
def hello(count):
    sleep = random.randint(1,10)/2
    print("Hello %s (sleeping for %ss)" % (count, sleep))
    time.sleep(sleep)
    print("Goodbye %s (after for %ss)" % (count, sleep))

if __name__ == "__main__":
    for i in range(0, 10):
        hello(i+1)

Wanted result

Hello 1 (sleeping for 5.0s)
Hello 2 (sleeping for 4.5s)
Hello 3 (sleeping for 4.0s)
Hello 4 (sleeping for 4.5s)
Hello 5 (sleeping for 3.0s)
Hello 6 (sleeping for 3.0s)
Hello 7 (sleeping for 3.0s)
Hello 8 (sleeping for 1.5s)
Hello 9 (sleeping for 0.5s)
Hello 10 (sleeping for 3.5s)
Goodbye 9 (after for 0.5s)
Goodbye 8 (after for 1.5s)
Goodbye 5 (after for 3.0s)
Goodbye 6 (after for 3.0s)
Goodbye 7 (after for 3.0s)
Goodbye 10 (after for 3.5s)
Goodbye 3 (after for 4.0s)
Goodbye 2 (after for 4.5s)
Goodbye 4 (after for 4.5s)
Goodbye 1 (after for 5.0s)

Kill processes when finished.

Dear Ran,

How do I kill a process once it has run? I am running into the OSError "too many files" issue, as I have an IO-based application which constantly updates the positions of 18 servo motors (Raspberry Pi) and starts a new thread each time.

macaquedev
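
One possible mitigation (a sketch only, assuming it is acceptable to drain between updates; per the "using multitasking twice" report above, config["KILL_RECEIVED"] may also need resetting between batches on affected versions) is to join each batch with wait_for_tasks() instead of spawning threads unboundedly:

import multitasking

multitasking.set_max_threads(18)  # at most one thread per servo

@multitasking.task
def update_servo(channel, position):
    print(f"servo {channel} -> {position}")  # stand-in for the real IO write

def update_all(positions):
    for channel, position in enumerate(positions):
        update_servo(channel, position)
    # join this batch so finished thread handles are released
    multitasking.wait_for_tasks()
    multitasking.config["KILL_RECEIVED"] = False  # re-enable task() for the next batch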

Feature Request: Enable multiple joins

Hi Ran,

I am using this package in scripts where I need to spawn some work, wait for it to complete, then spawn more work, wait, and so on. The issue is that the join method wait_for_tasks() is one-shot, because it keeps config["KILL_RECEIVED"] = True after the join completes, and when config["KILL_RECEIVED"] is True, task() is a no-op. Would you consider changing wait_for_tasks() to enable multiple joins? Also, it would be good for task() to throw an exception if it is called while a join is pending, rather than silently doing nothing.

Thanks,

Bishop Brock

Feature Request - List of tasks

It would be nice if there were a method that returned the list inside config["TASKS"], so we could see how many tasks are alive vs. not alive. I know of wait_for_tasks() and is_alive(), but if we could just get the list itself, we could easily see how many remain alive and print that out. That way, we wouldn't need to keep our own list of tasks and iterate with is_alive() ourselves.

Thank you,
Nick
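
In the meantime, a minimal sketch that reads the internal list directly (config["TASKS"] is an internal detail; it is assumed here to hold the spawned Thread/Process objects, and its exact contents may vary between versions):

import multitasking

def alive_count():
    # count tasks whose underlying thread/process is still running
    return sum(1 for t in multitasking.config["TASKS"] if t.is_alive())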
