Giter Site home page Giter Site logo

mode's Introduction

Mode: AsyncIO Services

Build status coverage BSD License Mode can be installed via wheel Supported Python versions. Supported Python implementations.

Version

4.3.2

Web

http://mode.readthedocs.org/

Download

http://pypi.org/project/mode

Source

http://github.com/ask/mode

Keywords

async, service, framework, actors, bootsteps, graph

What is Mode?

Mode is a very minimal Python library built-on top of AsyncIO that makes it much easier to use.

In Mode your program is built out of services that you can start, stop, restart and supervise.

A service is just a class:

class PageViewCache(Service):
    redis: Redis = None

    async def on_start(self) -> None:
        self.redis = connect_to_redis()

    async def update(self, url: str, n: int = 1) -> int:
        return await self.redis.incr(url, n)

    async def get(self, url: str) -> int:
        return await self.redis.get(url)

Services are started, stopped and restarted and have callbacks for those actions.

It can start another service:

class App(Service):
    page_view_cache: PageViewCache = None

    async def on_start(self) -> None:
        await self.add_runtime_dependency(self.page_view_cache)

    @cached_property
    def page_view_cache(self) -> PageViewCache:
        return PageViewCache()

It can include background tasks:

class PageViewCache(Service):

    @Service.timer(1.0)
    async def _update_cache(self) -> None:
        self.data = await cache.get('key')

Services that depends on other services actually form a graph that you can visualize.

Worker

Mode optionally provides a worker that you can use to start the program, with support for logging, blocking detection, remote debugging and more.

To start a worker add this to your program:

if __name__ == '__main__':
    from mode import Worker
    Worker(Service(), loglevel="info").execute_from_commandline()

Then execute your program to start the worker:

console

$ python examples/tutorial.py [2018-03-27 15:47:12,159: INFO]: [^Worker]: Starting... [2018-03-27 15:47:12,160: INFO]: [^-AppService]: Starting... [2018-03-27 15:47:12,160: INFO]: [^--Websockets]: Starting... STARTING WEBSOCKET SERVER [2018-03-27 15:47:12,161: INFO]: [^--UserCache]: Starting... [2018-03-27 15:47:12,161: INFO]: [^--Webserver]: Starting... [2018-03-27 15:47:12,164: INFO]: [^--Webserver]: Serving on port 8000 REMOVING EXPIRED USERS REMOVING EXPIRED USERS

To stop it hit Control-c:

console

[2018-03-27 15:55:08,084: INFO]: [^Worker]: Stopping on signal received... [2018-03-27 15:55:08,084: INFO]: [^Worker]: Stopping... [2018-03-27 15:55:08,084: INFO]: [^-AppService]: Stopping... [2018-03-27 15:55:08,084: INFO]: [^--UserCache]: Stopping... REMOVING EXPIRED USERS [2018-03-27 15:55:08,085: INFO]: [^Worker]: Gathering service tasks... [2018-03-27 15:55:08,085: INFO]: [^--UserCache]: -Stopped! [2018-03-27 15:55:08,085: INFO]: [^--Webserver]: Stopping... [2018-03-27 15:55:08,085: INFO]: [^Worker]: Gathering all futures... [2018-03-27 15:55:08,085: INFO]: [^--Webserver]: Closing server [2018-03-27 15:55:08,086: INFO]: [^--Webserver]: Waiting for server to close handle [2018-03-27 15:55:08,086: INFO]: [^--Webserver]: Shutting down web application [2018-03-27 15:55:08,086: INFO]: [^--Webserver]: Waiting for handler to shut down [2018-03-27 15:55:08,086: INFO]: [^--Webserver]: Cleanup [2018-03-27 15:55:08,086: INFO]: [^--Webserver]: -Stopped! [2018-03-27 15:55:08,086: INFO]: [^--Websockets]: Stopping... [2018-03-27 15:55:08,086: INFO]: [^--Websockets]: -Stopped! [2018-03-27 15:55:08,087: INFO]: [^-AppService]: -Stopped! [2018-03-27 15:55:08,087: INFO]: [^Worker]: -Stopped!

Beacons

The beacon object that we pass to services keeps track of the services in a graph.

They are not stricly required, but can be used to visualize a running system, for example we can render it as a pretty graph.

This requires you to have the pydot library and GraphViz installed:

console

$ pip install pydot

Let's change the app service class to dump the graph to an image at startup:

class AppService(Service):

    async def on_start(self) -> None:
        print('APP STARTING')
        import pydot
        import io
        o = io.StringIO()
        beacon = self.app.beacon.root or self.app.beacon
        beacon.as_graph().to_dot(o)
        graph, = pydot.graph_from_dot_data(o.getvalue())
        print('WRITING GRAPH TO image.png')
        with open('image.png', 'wb') as fh:
            fh.write(graph.create_png())

Creating a Service

To define a service, simply subclass and fill in the methods to do stuff as the service is started/stopped etc.:

class MyService(Service):

    async def on_start(self) -> None:
        print('Im starting now')

    async def on_started(self) -> None:
        print('Im ready')

    async def on_stop(self) -> None:
        print('Im stopping now')

To start the service, call await service.start():

await service.start()

Or you can use mode.Worker (or a subclass of this) to start your services-based asyncio program from the console:

if __name__ == '__main__':
    import mode
    worker = mode.Worker(
        MyService(),
        loglevel='INFO',
        logfile=None,
        daemon=False,
    )
    worker.execute_from_commandline()

It's a Graph!

Services can start other services, coroutines, and background tasks.

  1. Starting other services using add_depenency:

    class MyService(Service):
    
        def __post_init__(self) -> None:
           self.add_dependency(OtherService(loop=self.loop))
  2. Start a list of services using on_init_dependencies:

    class MyService(Service):
    
        def on_init_dependencies(self) -> None:
            return [
                ServiceA(loop=self.loop),
                ServiceB(loop=self.loop),
                ServiceC(loop=self.loop),
            ]
  3. Start a future/coroutine (that will be waited on to complete on stop):

    class MyService(Service):
    
        async def on_start(self) -> None:
            self.add_future(self.my_coro())
    
        async def my_coro(self) -> None:
            print('Executing coroutine')
  4. Start a background task:

    class MyService(Service):
    
        @Service.task
        async def _my_coro(self) -> None:
            print('Executing coroutine')
  5. Start a background task that keeps running:

    class MyService(Service):
    
        @Service.task
        async def _my_coro(self) -> None:
            while not self.should_stop:
                # NOTE: self.sleep will wait for one second, or
                #       until service stopped/crashed.
                await self.sleep(1.0)
                print('Background thread waking up')

Installation

You can install Mode either via the Python Package Index (PyPI) or from source.

To install using `pip`:

$ pip install -U mode

Downloading and installing from source

Download the latest version of Mode from http://pypi.org/project/mode

You can install it by doing the following:

$ tar xvfz mode-0.0.0.tar.gz
$ cd mode-0.0.0
$ python setup.py build
# python setup.py install

The last command must be executed as a privileged user if you are not currently using a virtualenv.

Using the development version

With pip

You can install the latest snapshot of Mode using the following pip command:

$ pip install https://github.com/ask/mode/zipball/master#egg=mode

FAQ

Can I use Mode with Django/Flask/etc.?

Yes! Use gevent/eventlet as a bridge to integrate with asyncio.

Using gevent

This works with any blocking Python library that can work with gevent.

Using gevent requires you to install the aiogevent module, and you can install this as a bundle with Mode:

console

$ pip install -U mode[gevent]

Then to actually use gevent as the event loop you have to execute the following in your entrypoint module (usually where you start the worker), before any other third party libraries are imported:

#!/usr/bin/env python3
import mode.loop
mode.loop.use('gevent')
# execute program

REMEMBER: This must be located at the very top of the module, in such a way that it executes before you import other libraries.

Using eventlet

This works with any blocking Python library that can work with eventlet.

Using eventlet requires you to install the aioeventlet module, and you can install this as a bundle with Mode:

console

$ pip install -U mode[eventlet]

Then to actually use eventlet as the event loop you have to execute the following in your entrypoint module (usually where you start the worker), before any other third party libraries are imported:

#!/usr/bin/env python3
import mode.loop
mode.loop.use('eventlet')
# execute program

REMEMBER: It's very important this is at the very top of the module, and that it executes before you import libraries.

Can I use Mode with Tornado?

Yes! Use the tornado.platform.asyncio bridge: http://www.tornadoweb.org/en/stable/asyncio.html

Can I use Mode with Twisted?

Yes! Use the asyncio reactor implementation: https://twistedmatrix.com/documents/17.1.0/api/twisted.internet.asyncioreactor.html

Will you support Python 3.5 or earlier?

There are no immediate plans to support Python 3.5, but you are welcome to contribute to the project.

Here are some of the steps required to accomplish this:

  • Source code transformation to rewrite variable annotations to comments

    for example, the code:

    class Point:
        x: int = 0
        y: int = 0

    must be rewritten into:

    class Point:
        x = 0  # type: int
        y = 0  # type: int
  • Source code transformation to rewrite async functions

    for example, the code:

    async def foo():
        await asyncio.sleep(1.0)

    must be rewritten into:

    @coroutine
    def foo():
        yield from asyncio.sleep(1.0)

Will you support Python 2?

There are no plans to support Python 2, but you are welcome to contribute to the project (details in question above is relevant also for Python 2).

At Shutdown I get lots of warnings, what is this about?

If you get warnings such as this at shutdown:

text

Task was destroyed but it is pending! task: <Task pending coro=<Service._execute_task() running at /opt/devel/mode/mode/services.py:643> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1100a7468>()]>> Task was destroyed but it is pending! task: <Task pending coro=<Service._execute_task() running at /opt/devel/mode/mode/services.py:643> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1100a72e8>()]>> Task was destroyed but it is pending! task: <Task pending coro=<Service._execute_task() running at /opt/devel/mode/mode/services.py:643> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1100a7678>()]>> Task was destroyed but it is pending! task: <Task pending coro=<Event.wait() running at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/locks.py:269> cb=[_release_waiter(<Future pendi...1100a7468>()]>)() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:316]> Task was destroyed but it is pending! task: <Task pending coro=<Event.wait() running at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/locks.py:269> cb=[_release_waiter(<Future pendi...1100a7678>()]>)() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:316]>

It usually means you forgot to stop a service before the process exited.

Code of Conduct

Everyone interacting in the project's codebases, issue trackers, chat rooms, and mailing lists is expected to follow the Mode Code of Conduct.

As contributors and maintainers of these projects, and in the interest of fostering an open and welcoming community, we pledge to respect all people who contribute through reporting issues, posting feature requests, updating documentation, submitting pull requests or patches, and other activities.

We are committed to making participation in these projects a harassment-free experience for everyone, regardless of level of experience, gender, gender identity and expression, sexual orientation, disability, personal appearance, body size, race, ethnicity, age, religion, or nationality.

Examples of unacceptable behavior by participants include:

  • The use of sexualized language or imagery
  • Personal attacks
  • Trolling or insulting/derogatory comments
  • Public or private harassment
  • Publishing other's private information, such as physical or electronic addresses, without explicit permission
  • Other unethical or unprofessional conduct.

Project maintainers have the right and responsibility to remove, edit, or reject comments, commits, code, wiki edits, issues, and other contributions that are not aligned to this Code of Conduct. By adopting this Code of Conduct, project maintainers commit themselves to fairly and consistently applying these principles to every aspect of managing this project. Project maintainers who do not follow or enforce the Code of Conduct may be permanently removed from the project team.

This code of conduct applies both within project spaces and in public spaces when an individual is representing the project or its community.

Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by opening an issue or contacting one or more of the project maintainers.

This Code of Conduct is adapted from the Contributor Covenant, version 1.2.0 available at http://contributor-covenant.org/version/1/2/0/.

mode's People

Contributors

ask avatar casio avatar jinxjinx avatar martinmaillard avatar nemosupremo avatar prithvin avatar r313pp avatar seifertm avatar tojkamaster avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

mode's Issues

CancelledError does not cancel 'mode.Worker'

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Mode.

Steps to reproduce

Have a Service with a canceled background task wrapped into Worker:

class Api(mode.Service):
    @mode.Service.task
    def main(self) -> None:
        raise asyncio.CancelledError()

if __name__ == '__main__':
    mode.Worker(Api()).execute_from_command_line()

Expected behavior

A task's CancelledError can be propagated to the worker.

Actual behavior

An application seems stuck and unresponsive.

Full traceback

Paste the full traceback (if there is any)

Versions

  • Python version: 3.7.5
  • Mode version: 4.1.2
  • Operating system: Debian-based official Python 3.7 image

ANN: Please migrate away from pytest-openfiles

Hello. The Astropy Project has decided to retire pytest-openfiles, a plugin to detect open files in pytest runs. We see that you are using it in this repo. While you may continue to use it, please be advised that it will no longer be maintained going forward. See https://github.com/astropy/pytest-openfiles#important-retirement-roadmap for more information. We apologize for any inconvenienced cause.

If this issue was opened in error and is irrelevant, feel free to close.

Thank you for your patience.

xref astropy/pytest-openfiles#47

Faust build fails due to import error in utils/logging.py

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Mode.

Steps to reproduce

I'm using Faust in one of my projects which is using the mode module. The new version for mode (4.1.5) seems to have an import issue in utils/logging.py. This is causing build issues in faust.

Expected behavior

Build to not fail due to import error.

Actual behavior

Build fails due to import error.

Full traceback

Traceback (most recent call last):
server_1          |   File "/usr/local/bin/faust", line 5, in <module>
server_1          |     from faust.cli.faust import cli
server_1          |   File "/usr/local/lib/python3.8/site-packages/faust/cli/__init__.py", line 2, in <module>
server_1          |     from .base import AppCommand, Command, argument, call_command, option
server_1          |   File "/usr/local/lib/python3.8/site-packages/faust/cli/base.py", line 34, in <module>
server_1          |     from mode import Service, ServiceT, Worker
server_1          |   File "/usr/local/lib/python3.8/site-packages/mode/__init__.py", line 126, in __getattr__
server_1          |     module = __import__(
server_1          |   File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 31, in <module>
server_1          |     from .timers import timer_intervals
server_1          |   File "/usr/local/lib/python3.8/site-packages/mode/timers.py", line 5, in <module>
server_1          |     from .utils.logging import get_logger
server_1          |   File "/usr/local/lib/python3.8/site-packages/mode/utils/logging.py", line 47, in <module>
server_1          |     from typing_extensions import Protocol

Versions

  • Python version: 3.8
  • Mode version: 4.1.5
  • Operating system: macOS Catalina 10.15.1

Suggestions to fix this

Add typing_extensions in requirements.txt file and rebuild.

Failed to start example service.py

Starting example/service.py with debug=True

import mode

class MyService(mode.Service):

    async def on_started(self) -> None:
        self.log.info('Service started (hit ctrl+C to exit).')

    @mode.Service.task
    async def _background_task(self) -> None:
        print('BACKGROUND TASK STARTING')
        while not self.should_stop:
            await self.sleep(1.0)
            print('BACKGROUND SERVICE WAKING UP')


if __name__ == '__main__':
    mode.Worker(
        MyService(),
        debug=True,
        loglevel='INFO',
        logfile=None,  # stderr
).execute_from_commandline()
python app.py
[2018-08-08 16:21:05,376: ERROR]: [^Worker]: Error: TypeError('Use `self.add_context(ctx)` for non-async context')
Traceback (most recent call last):
  File "/path/python3.7/site-packages/mode/worker.py", line 189, in execute_from_commandline
    self.loop.run_until_complete(self.start())
  File "/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 568, in run_until_complete
    return future.result()
  File "/path/python3.7/site-packages/mode/worker.py", line 241, in start
    await super().start()
  File "/path/python3.7/site-packages/mode/services.py", line 657, in start
    await self._default_start()
  File "/path/python3.7/site-packages/mode/services.py", line 662, in _default_start
    await self._actually_start()
  File "/path/python3.7/site-packages/mode/services.py", line 668, in _actually_start
    await self.on_first_start()
  File "/path/python3.7/site-packages/mode/worker.py", line 118, in on_first_start
    await self.default_on_first_start()
  File "/path/python3.7/site-packages/mode/worker.py", line 124, in default_on_first_start
    await self._add_monitor()
  File "/path/python3.7/site-packages/mode/worker.py", line 254, in _add_monitor
    await self.add_async_context(monitor)
  File "/path/python3.7/site-packages/mode/services.py", line 525, in add_async_context
    'Use `self.add_context(ctx)` for non-async context')
TypeError: Use `self.add_context(ctx)` for non-async context

Versions

  • Python version 3.7.0
  • Mode version: 1.15.0
  • aiomonitor: aiomonitor 0.3.1
  • Operating system: Lastest Macos

asyncio.get_event_loop is not supported by python 3.10

Checklist

  • [ Y ] I have included information about relevant versions
  • [ Y ] I have verified that the issue persists when using the master branch of Mode.

Steps to reproduce

Tell us what you did to cause something to happen.

Expected behavior

asyncio.get_event_loop is not supported by python 3.10. We should use get_running_loop & asyncio.run() instead.

Does this project be archived? If that, we have to recreate mode2 repo. :-(

Actual behavior

Not support python 3.10

Full traceback

https://docs.python.org/3/library/asyncio-eventloop.html

Get the current event loop. If there is no current event loop set in the current OS thread, the OS thread is main, and set_event_loop() has not yet been called, asyncio will create a new event loop and set it as the current one.Because this function has rather complex behavior (especially when custom event loop policies are in use), using the get_running_loop() function is preferred to get_event_loop() in coroutines and callbacks. Consider also using the asyncio.run() function instead of using lower level functions to manually create and close an event loop. Deprecated since version 3.10: Deprecation warning is emitted if there is no running event loop. In future Python releases, this function will be an alias of get_running_loop().

get_running_loop is new in version 3.7

python 3.6 is out of support at the end of 2021.

Versions

  • Python 3.10
  • Mode version: Master branch
  • Operating system: Linux

'mode.Worker' overrides logging configuration

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Mode.

Steps to reproduce

  1. Setup logging by yourself.
  2. Call mode.Worker.execute_from_commandline.

Relevant code snippet:

import mode
import logging.config


if __name__ == '__main__':
    logging.config.dictConfig(
        ...  # Some kind of setup.
    )

    service = mode.Service()
    worker = mode.Worker(service)
    worker.execute_from_commandline()

Expected behavior

Logging behaves as it was configured (e.g. log records are formatted based on logging.config).

Actual behavior

Logging configuration is overridden by mode.Worker#_setup_logging.

Full traceback

โ€”

Versions

  • Python version: 3.7.3
  • Mode version: 3.0.7
  • Operating system: macOS

Potential MethodQueue bug?

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Mode.

Steps to reproduce

  1. Define any simple async function
  2. Create a mode.threads.QueueServiceThread and call its start() method.
  3. Call the async function with service.
  4. Prepare a debugger with breakpoints at two locations:
    -- The beginning of MethodQueue, currently line 331 of mode/threads.py
    -- The if self._loop is None block of ServiceBase's loop @property, currently line 146 of mode/services.py
  5. Run with the debugger
  6. Observe first expected Breakpoint stop
  7. Resume execution

Expected behavior

The program should run to completion without stopping at the second breakpoint at all

Before proceeding to (7), notice that we have passed an explicit event loop argument. I do not expect to need to hit the second breakpoint before the program completes because the event loop is known at this point and I have no further Service initialization to do.

Actual behavior

Given that we have passed a loop argument explicitly to MethodQueue, I would not expect to land in the second breakpoint, especially while the first breakpoint is still on the call stack. But on resuming after the first, expected breakpoint, we land in a second, unexpected stop, with MethodQueue requesting the default event loop to satisfy its own None reference.

Reading the code carefully we can see that MethodQueue does not ever do anything with its EventLoop parameter--neither pass it to its own super.init() method, more to any of its child objects.

I imagine that this code should either not receive an EventLoop argument, or it should pass that argument to its own call to ServiceBase.init() before initializing its Queue and Event objects. I am not sure which solution would be preferred. Although the former seems more consistent with Python 3.8's deprecation of explicit EventLoop arguments, perhaps the latter would be more sensitive to compatibility with Python 3.6 and 3.7 for the time being.

Full traceback

Traceback (most recent call last):
  __init__, threads.py:332
  method_queue, threads.py:411
  on_thread_started, threads.py:418
  _serve, threads.py:252
  _run, events.py:81
  _run_once, base_events.py:1859
  run_forever, base_events.py:570
  run_until_complete, base_events.py:603
  _start_thread, threads.py:211
  run, threads.py:66
    _bootstrap_inner, threading.py:932
    _bootstrap, threading.py:890

Versions

  • Python version 3.8.5
  • Mode version 4.3.2
  • Operating system MacOSX 10.13.6

Services should be singletons

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Mode.

Steps to reproduce

Create a service with two child dependencies, each of which depends on the same root dependency:

from typing import List

from mode import Service
from mode.utils.objects import cached_property

class RootService(Service):
    async def on_start(self) -> None:
        print('Starting root!')

class Child1(Service):
    def on_init_dependencies(self) -> List:
        return [self.root]
    
    @cached_property
    def root(self) -> RootService:
        return RootService()
    
    async def on_start(self) -> None:
        print('Starting child 1')
    
class Child2(Service):
    def on_init_dependencies(self) -> List:
        return [self.root]
    
    @cached_property
    def root(self) -> RootService:
        return RootService()
    
    async def on_start(self) -> None:
        print('Starting child 2')

class App(Service):
    def on_init_dependencies(self) -> List:
        return [self.child1, self.child2]
    
    @cached_property
    def child1(self) -> Child1:
        return Child1()
    
    @cached_property
    def child2(self) -> Child2:
        return Child2()
    
    async def on_start(self) -> None:
        print('Starting app')

if __name__ == '__main__':
    from mode import Worker
    Worker(App(), loglevel='info').execute_from_commandline()

Expected behavior

Expected the root service to only be started once.

Actual behavior

The root service was started twice.

Full traceback

[2020-10-15 15:25:05,021] [20096] [INFO] [^Worker]: Starting...
[2020-10-15 15:25:05,023] [20096] [INFO] [^-App]: Starting...
[2020-10-15 15:25:05,024] [20096] [WARNING] Starting app
[2020-10-15 15:25:05,025] [20096] [INFO] [^--Child1]: Starting...
[2020-10-15 15:25:05,026] [20096] [WARNING] Starting child 1
[2020-10-15 15:25:05,027] [20096] [INFO] [^---RootService]: Starting...
[2020-10-15 15:25:05,028] [20096] [WARNING] Starting root!
[2020-10-15 15:25:05,029] [20096] [INFO] [^--Child2]: Starting...
[2020-10-15 15:25:05,030] [20096] [WARNING] Starting child 2
[2020-10-15 15:25:05,031] [20096] [INFO] [^---RootService]: Starting...
[2020-10-15 15:25:05,032] [20096] [WARNING] Starting root!

Versions

  • Python version: 3.8.3
  • Mode version: 4.3.2 (master branch)
  • Operating system: Windows 10

It's absolutely critical sometimes for a service to only be started once, for example if a service needs to lock a resource to operate.

Is there any way to ensure a service is started exactly once in mode?

Thanks!

Aram

Debugging with pdb looks weird

This isn't a bug, just something I observed, and I'm wondering if somebody has a way around it.
I'm using mode through Faust.

I want to debug my worker code so I put import pdb; pdb.set_trace() in. The breakpoint gets hit and I can use pdb, but all the output is being filtered through the logging mechanism, so it looks like this:

[2020-05-03 12:46:59,075] [1] [WARNING] (Pdb)
[2020-05-03 12:47:00,147] [1] [WARNING] 59          async for log in logs:
[2020-05-03 12:47:00,147] [1] [WARNING] 60              pdb.set_trace()
[2020-05-03 12:47:00,148] [1] [WARNING] 63              cid = log.MESSAGE.get('CID')
[2020-05-03 12:47:00,148] [1] [WARNING] 64  ->          loop = asyncio.get_running_loop()

Is there a way to disable this logging feature? Or is there a different recommended way of debugging?

Steps to reproduce

  • Put import pdb; pdb.set_trace() or breakpoint() into a worker code.
  • trigger that code

Versions

  • Python version: 3.8.1
  • Mode version: 4.3.2
  • Operating system: python:3.8.1 Docker image on Linux

instagram

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Mode.

Steps to reproduce

Tell us what you did to cause something to happen.

Expected behavior

Tell us what you expected to happen.

Actual behavior

Tell us what happened instead.

Full traceback

Paste the full traceback (if there is any)

Versions

cd instagram
sudo chmod +x *

  • Python version
  • python3 instagram.py lan_namayandeh /home/kaliboys/dekstop/pass.txt -m password list -m
  • Mode version
  • (0) : 32 bots 512 passwords at a time
  • (1) : 16 bots 256 passwords at a time
  • (2) : 8 bots 128 passwords at a time
  • (3) : 4 bots 64 passwords at a time
  • Operating system

threads.ServiceThread hangs on exception

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Mode.

Steps to reproduce

import asyncio
import mode.threads

class CrashingServiceThread(mode.threads.ServiceThread):
    async def on_start(self):
        raise RuntimeError('I am here to crash')

async def main():
    await CrashingServiceThread().start()

if __name__ == "__main__":
    asyncio.run(main())

Expected behavior

Process exits.

Actual behavior

Process hangs.

Full traceback

'CrashingServiceThread' crashed: RuntimeError('I am here to crash')
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/mode/threads.py", line 218, in _serve
    await self._default_start()
  File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 739, in _default_start
    await self._actually_start()
  File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 756, in _actually_start
    await self.on_start()
  File "app2.py", line 6, in on_start
    raise RuntimeError('I am here to crash')
RuntimeError: I am here to crash
--> Hangs here. ^C is pressed.
Traceback (most recent call last):
  File "app2.py", line 12, in <module>
    asyncio.run(main())
  File "/usr/local/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 570, in run_until_complete
    self.run_forever()
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 538, in run_forever
    self._run_once()
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 1746, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/local/lib/python3.7/selectors.py", line 468, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt
--> Another ^C.
Exception ignored in: <module 'threading' from '/usr/local/lib/python3.7/threading.py'>
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/threading.py", line 1307, in _shutdown
    lock.acquire()
KeyboardInterrupt

Versions

  • Python 3.7.4
  • Mode 4.1.6
  • Debian GNU/Linux 10 (buster), Windows 8.1

Additional Info

mode.threads.ServiceThread and mode.threads.WorkerThread locks each other when an exception is raised.
I've dug up the problem, but I don't have sufficient knowledge of this lib to fix this properly.
Hire is what I've found (my comments are denoted by ###):

class WorkerThread(threading.Thread):
    def run(self) -> None:
        try:
            self.service._start_thread()
        finally:
            ### self._is_stopped.set() done hire
            ### but WorkerThread.stop() is called inside self.service._start_thread()
            ### and infinitely waits, so we never reach this
            self._set_stopped()

    def stop(self) -> None:
        self._is_stopped.wait()  ### we infinitely wait here because run() is still running up the stack
        if self.is_alive():
            self.join(threading.TIMEOUT_MAX)

class ServiceThread(Service):
    def _start_thread(self) -> None:
        # set the default event loop for this thread
        asyncio.set_event_loop(self.thread_loop)
        try:
            self.thread_loop.run_until_complete(self._serve())
        except Exception:
            # if self._serve raises an exception we need to set
            # shutdown here, since _shutdown_thread will not execute.
            ### I think this is not true, _shutdown_thread is executed
            ### in case of the exception in _serve.
            self.set_shutdown()
            raise

    async def _shutdown_thread(self) -> None:
        await self._default_stop_children()
        await self.on_thread_stop()
        self.set_shutdown()
        await self._default_stop_futures()
        if self._thread is not None:
            ### problem is here
            ### May be we shouldn't call this from inside the thread
            self._thread.stop()
        await self._default_stop_exit_stacks()

    async def _serve(self) -> None:
        try:
            # start the service
            await self._default_start()
            # allow ServiceThread.start() to return
            # when wait_for_thread is enabled.
            await self.on_thread_started()
            notify(self._thread_running)
            await self.wait_until_stopped()
        except asyncio.CancelledError:
            raise
        except BaseException as exc:  # pylint: disable=broad-except
            self.on_crash('{0!r} crashed: {1!r}', self.label, exc)
            await self.crash(exc)
            if self.beacon.root is not None:
                await self.beacon.root.data.crash(exc)
            raise
        finally:
            await self._shutdown_thread()  ### this is called in case of the exception

It also causes faust to hang as well:

import faust
import mode

app = faust.App(
    "some_app",
    broker="kafka://localhost:9092",
    # agent_supervisor=mode.CrashingSupervisor,  # This won't help
)

@app.task
async def some_task():
    raise RuntimeError("Some Error")

if __name__ == "__main__":
    app.main()

Process will hang in crashed state.

Also, it seems to be causing this faust issue.

aioeventlet and aiogevent are unmaintained

What is the plan with aioeventlet and aiogevent . They are extras, but they definitely do not work on Python 3.7+ , and I wouldnt be surprised if they were currently incompatible with eventlet and gevent respectively, as they appear to be unmaintained for quite a while.

I submitted PRs to each to get them sort-of working, but some of the test cases hang

Q: root/package logger configuration

I hope asking questions via github issues is considered acceptable - not sure how else to do so other than tweeting.

Project package layout:

proj
proj/__init__.py
proj/__main__.py
proj/app.py
proj/services
proj/services/__init__.py
proj/services/discovery.py

I've got a subclass of Worker called Application in proj/app.py. I've got a service DiscoveryService located in proj/services/discovery.py. if I configure DEBUG mode in Application, it doesn't apply to the logger in DiscoveryService.

I believe this is simply because of Python's cascading logging - meaning, I need to set the logger in proj/__init__.py to address this. A single localized spot for logger configuration is ideal, so with that in mind, should I override on_setup_root_logger(self, _logger, _loglevel) in my Worker-based Application subclass to 'change' the root module logger? Like this maybe:

# proj/__init__.py
import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# proj/app.py
import mode

from . import logger as module_logger

class Application(mode.Worker):
    ...

    def on_setup_root_logger(self, _logger, _loglevel):
        # what to do here to 'copy' the `_logger` into `module_logger`?

Assuming I'm on the right path, filling in that "what to do here" bit is where I'm confused. I'm sure it's simple though. ๐Ÿ™„

add `add_timer` functionality

For fast prototyping Service.timer decorator is enough, but for flexible solution this need to be improved because this do not allows to:

  • Set timer interval at runtime (from config, cli, etc.)
  • Stop timer
  • Add timer at runtime

I think best approach here is pretty similar to others add_* methods. Something like add_timer will create new timer and return handle to manipulate (stop) it.

ServiceProxy and Supervisor doesn't handle _crash_reason

Firstly, thanks for writing that library, i get more and more fond of it!

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Mode.

Steps to reproduce

Using OneForAllSupervisor with a ServiceProxy, raising an exception in a supervised service.

Expected behavior

The proxy class should go to the _crash_reason attribute of the Service.

Actual behavior

It doesn't and raise AttributeError.

Full traceback

[^--OneForAllSupervisor]: Crashed reason=AttributeError("'SomeServiceProxy' object has no attribute '_crash_reason'") 
Traceback (most recent call last):
  File "/venvpath/lib/python3.7/site-packages/mode/services.py", line 762, in _execute_task
    await task
  File "/venvpath/lib/python3.7/site-packages/mode/supervisors.py", line 140, in _supervisor
    await self.restart_services(to_restart)
  File "/venvpath/lib/python3.7/site-packages/mode/supervisors.py", line 206, in restart_services
    await self.restart_service(service)
  File "/venvpath/lib/python3.7/site-packages/mode/supervisors.py", line 176, in restart_service
    service, cast(Service, service)._crash_reason,
AttributeError: 'SomeServiceProxy' object has no attribute '_crash_reason'

Versions

  • Python version 3.7
  • Mode version 4.0.1
  • Operating system Linux

Q: multiprocessing/fork

Hi @ask, hope you don't mind another question. I'm hoping to fork the initial process running my Services, to take advantage of multiple cores. What is the best approach given mode's structure?

I've looked at the source of gunicorn for hints, but I'd like to get some feedback from you as well - specifically in regards to a mode compatible approach.

For context, I've got an Application class derived from mode.Worker, and a number of mode.Service subclasses as well that are initialized on Application instantiation.

I appreciate any feedback you might have, thank you.


Edit: I see use of concurrent.futures.ProcessPoolExecutor (https://pymotw.com/3/asyncio/executors.html#processes). For more detail, I actually have two reasons for wanting to run asyncio/mode process-based work:

  • gunicorn-like handling of incoming requests so they're spread across cores
  • running computationally intensive tasks in parallel across cores; I believe ProcessPoolExecutor makes sense here

Those two scenarios would be happening on separate systems (one is a frontend mode.Service providing request validation, the other is a backend mode.Service performing things like image renders)

Missing import: typing_extensions

Versions 4.1.4 through 4.1.6 are missing an import statement for typing_extensions. The problem was introduced with commit 3d15d1e.

Here is an example of a Faust 1.8.1 application running under python 3.7.3 with mode 4.1.6 demonstrating the error:

Start the faust worker...
Traceback (most recent call last):
  File "/usr/local/bin/faust", line 5, in <module>
    from faust.cli.faust import cli
  File "/usr/local/lib/python3.7/dist-packages/faust/cli/__init__.py", line 2, in <module>
    from .base import AppCommand, Command, argument, call_command, option
  File "/usr/local/lib/python3.7/dist-packages/faust/cli/base.py", line 34, in <module>
    from mode import Service, ServiceT, Worker
  File "/usr/local/lib/python3.7/dist-packages/mode/__init__.py", line 127, in __getattr__
    object_origins[name], None, None, [name])
  File "/usr/local/lib/python3.7/dist-packages/mode/services.py", line 31, in <module>
    from .timers import timer_intervals
  File "/usr/local/lib/python3.7/dist-packages/mode/timers.py", line 5, in <module>
    from .utils.logging import get_logger
  File "/usr/local/lib/python3.7/dist-packages/mode/utils/logging.py", line 48, in <module>
    from typing_extensions import Protocol
ModuleNotFoundError: No module named 'typing_extensions'

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.