
altwistendpy's People

Contributors

altendky, dependabot[bot]


altwistendpy's Issues

Helpers to get into async, maybe a decorator?

https://gist.github.com/altendky/a1423f8519c4785283ee723106ef5e8b

"""
Run such as:
    venv/bin/python rundevice.py --device-path "interface/devices/DG 247 Factory.epz"
"""

import click
import epyqlib.hildevice
import epyqlib.utils.twisted
import twisted.internet.defer
import twisted.internet.reactor
import twisted.internet.task


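def react_async_helper(reactor, f, args, kwargs):
    # Adapt the coroutine returned by f() into the Deferred that react() expects.
    return twisted.internet.defer.ensureDeferred(
        f(reactor, *args, **kwargs),
    )


def react_async(f, *args, **kwargs):
    # Start the reactor, run the async function f(reactor, ...), then exit.
    twisted.internet.task.react(react_async_helper, (f, args, kwargs))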


@click.command()
@click.option(
    '--device-path',
    type=click.Path(exists=True, dir_okay=False, resolve_path=True),
    required=True,
)
def command(device_path):
    react_async(async_main, device_path=device_path)


async def async_main(reactor, device_path):
    device = epyqlib.hildevice.Device(definition_path=device_path)

    print('about to sleep')
    await epyqlib.utils.twisted.sleep(1)
    print('slept 1')
    await epyqlib.utils.twisted.sleep(1)
    print('slept 1')


if __name__ == '__main__':
    command()

inlineCallbacks entry/react etc stuff

https://github.com/altendky/txcan/blob/9576bb8425b807d86bd9a817847a0a60c02e3cf4/src/txcan/cli.py#L30-L71

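import twisted.internet.defer
import twisted.internet.task


def react_inline_callbacks_helper(reactor, f, args, kwargs):
    deferred = f(reactor, *args, **kwargs)

    def cancel():
        # Cancel the main Deferred and return it so shutdown waits for cleanup.
        deferred.cancel()
        return deferred

    reactor.addSystemEventTrigger('before', 'shutdown', cancel)

    return deferred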


def react_inline_callbacks(f, *args, **kwargs):
    twisted.internet.task.react(
        react_inline_callbacks_helper,
        (f, args, kwargs),
    )


def sleep(clock, duration):
    return twisted.internet.task.deferLater(clock, duration, lambda: None)


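@twisted.internet.defer.inlineCallbacks
def react_for(duration, clock, deferred):
    # Wait on `deferred` for at most `duration` seconds. A cancellation caused
    # by the timeout is swallowed and yields None; any other one propagates.
    cancelled = [False]

    def cancel():
        cancelled[0] = True
        deferred.cancel()

    delayed_call = clock.callLater(duration, cancel)

    result = None

    try:
        result = yield deferred
    except twisted.internet.defer.CancelledError:
        if not cancelled[0]:
            raise
    finally:
        # Do not leave the timeout pending once the wait is over.
        if delayed_call.active():
            delayed_call.cancel()

    return result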
