Giter Site home page Giter Site logo

ethereum / lahja Goto Github PK

View Code? Open in Web Editor NEW
391.0 19.0 19.0 475 KB

Lahja is a generic multi process event bus implementation written in Python 3.6+ to enable lightweight inter-process communication, based on non-blocking asyncio

License: MIT License

Shell 1.09% Makefile 1.18% Python 97.74%
asyncio trio eventbus python python3 async

lahja's Introduction

Lahja

Documentation Status

Documentation hosted by ReadTheDocs

DISCLAIMER: This is alpha state software. Expect bugs.

Lahja is a generic event bus implementation written in Python 3.6+ that enables lightweight inter-process communication, based on non-blocking asynchronous IO

What is this for?

Lahja is tailored around one primary use case: enabling multi process Python applications to communicate via events between processes using non-blocking APIs based on asyncio or trio.

Key facts:

  • non-blocking APIs using asynchronous IO (asyncio / trio)
  • lightweight with zero dependencies
  • simple to use
  • easy multicasting of events (one event, many independent receivers)
  • easy event routing (e.g route event X only to certain receivers)
  • multiple consuming APIs to adapt to different use cases and styles

Developer Setup

If you would like to hack on lahja, please check out the Snake Charmers Tactical Manual for information on how we do:

  • Testing
  • Pull Requests
  • Code Style
  • Documentation

Development Environment Setup

You can set up your dev environment with:

git clone https://github.com/ethereum/lahja
cd lahja
virtualenv -p python3 venv
. venv/bin/activate
pip install -e .[dev]

Testing Setup

During development, you might like to have tests run on every file save.

Show flake8 errors on file change:

# Test flake8
when-changed -v -s -r -1 lahja/ tests/ -c "clear; flake8 lahja tests && echo 'flake8 success' || echo 'error'"

Run multi-process tests in one command, but without color:

# in the project root:
pytest --numprocesses=4 --looponfail --maxfail=1
# the same thing, succinctly:
pytest -n 4 -f --maxfail=1

Run in one thread, with color and desktop notifications:

cd venv
ptw --onfail "notify-send -t 5000 'Test failure ⚠⚠⚠⚠⚠' 'python 3 test on lahja failed'" ../tests ../lahja

Release setup

For Debian-like systems:

apt install pandoc

To release a new version:

make release bump=$$VERSION_PART_TO_BUMP$$

How to bumpversion

The version format for this repo is {major}.{minor}.{patch} for stable, and {major}.{minor}.{patch}-{stage}.{devnum} for unstable (stage can be alpha or beta).

To issue the next version in line, specify which part to bump, like make release bump=minor or make release bump=devnum.

If you are in a beta version, make release bump=stage will switch to a stable.

To issue an unstable version when the current version is stable, specify the new version explicitly, like make release bump="--new-version 4.0.0-alpha.1 devnum"

lahja's People

Contributors

carver avatar cburgdorf avatar davesque avatar defnull avatar gsalgado avatar lithp avatar pipermerriam avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

lahja's Issues

Remove type checking from tests

What was wrong?

Currently the ./tests files have type hints. This was done to catch a typing bug where Endpoint.stream was returning Any types.

However, the overhead of typing our tests isn't something we want to continue with.

How can it be fixed?

Remove all type hints from under ./tests and create a new file similar to this one in eth-utils and add a line in CI that checks the type hints for this file.

Async `EndpointAPI.subscribe` handlers are broken

What was wrong?

The inspection of handler types uses inspect.iscoroutine when it should be using inspect.iscoroutinefunction

How can it be fixed?

Write tests that excercise this and then switch to the right inspect API.

Trio vs Curio

This branch has an implementation of the Endpoint class in both trio and curio

pipermerriam#2

Benchmark results put them definitively in asyncio -> curio -> trio

  • Asyncio is fastest (3300 events-per-second)
  • Curio is between 2-3x slower (1970 events-per-second)
  • Trio is between 6-9x slower (695 events per second)

The reasons for switching are:

  1. No more CancelToken in favor of built-in reliable cancellations.
  2. Improved developer experience in writing async code.

Both curio and trio will have lock-in and switching again would not be expensive.

Both tools have expressive built-in primatives for async task management, primarily around groups of tasks that you want to run in parallel.

Both tools have good documentation.

curio evaluation

  • First release is in 2016, making it 3 years old and the more established framework of the two.
  • Has a smaller and less active developer community, both the core library and 3rd party tools.
  • Allows spawning of background tasks that are not monitored or managed though it does appear to manage cancellation of these correctly.
  • Allows for getting results from background tasks.
  • pytest-curio doesn't work (but hand-rolling was pretty easy)

Screenshot from 2019-05-08 20-47-17

trio evaluation

  • First release December 2017 making it less than 2 years old
  • Active core and 3rd party developer community with multiple people having made meaningful contributions to the core library in the last month.
  • Extremely strict about background tasks. Requires learning the various patterns for running things that need to be daemonized. (this is a good thing)
  • Does not appear to allow you to retrieve results from a task run in the background. If correct that for any task that runs in the background for which you need the result you must use a communication mechanism to send the result back to the parent task. There does not appear to be any built-in Future concept.
  • trio-typing exists for some semblance of good type hinting.
  • The trio maintainers are working on exposing type hints properly from the core library
  • pytest-trio works.
  • Suite of nice built-in testing tools (haven't actually used them). The one that auto-advances the clock to make code with sleeps and timeouts run "instantly" sounds cool.
  • Does not have a high level tool for running a server on a unix socket which required me to actually open the socket myself, bind it to the file, and then wrap it up in trio primitives, but this was relatively painless.
  • Does not have Queue objects, but instead uses a channel pattern. I think it might be better.

Screenshot from 2019-05-08 20-46-41

Conclusion

I think trio is the clear winner. The performance issue is something we'll have to work on over time. Given the momentum the project has, and the likelyhood that people will want trio to get fast, I wouldn't be surprised if a uvloop equivalent showed up for trio.

Multi-response

What was wrong?

Prior to the PeerBackend system that was recently introduced to trinity there was a single request/response pair that served getting new candidates for peer connections. This proved problematic because we actually wanted/needed multiple sources of new peers.

The backend system works around this by no longer having a generic event for requesting new peers, but rather having each backend source new peers via it's own mechanism.

At the time, lahja operated under a "send everything to everyone" model.

I think there is a valid case to support requests that have many responses.

How can it be fixed?

If we implement #91 then this should be possible. This would either be a new request type, or a flag on the BroadcastConfig which would result in request returning an iterable of responses. Internally, we would tack how many endpoints the request was broadcast to, require them each to ack for which we allow a response event to serve as an ack. The simple ack responses would be discarded, and the actual responses returned.

This also probably suggests that the ack API may need to allow for any response event with a mtaching filter_event_id to serve as an ack rather than restricting it to a single event type.

AttributeError: 'Endpoint' object has no attribute '_name'

What was wrong?

Code that produced the error

I guess a broadcast call from an endpoint created with Endpoint().

Full error output

future: <Task finished coro=<Endpoint.broadcast() done, defined at .../lahja/endpoint.py:405> exception=AttributeError("'Endpoint' object has no attribute '_name'",)>
Traceback (most recent call last):
  File "..../lahja/endpoint.py", line 412, in broadcast
    item._origin = self.name
  File ".../lahja/endpoint.py", line 196, in name
    return self._name
AttributeError: 'Endpoint' object has no attribute '_name'

Environment

v0.12.0

How can it be fixed?

Either the name should be required on init, or _name shouldn't be required at all (maybe an internally generated one, if necessary).

I think I prefer not needing a name

Add an explicit Request/Response mechanism

What was wrong?

Trinity has a lot of code which looks like this:

    async def handle_should_connect_to_requests(self) -> None:
        async for req in self.wait_iter(self.event_bus.stream(ShouldConnectToPeerRequest)):
            self.logger.debug2('Received should connect to request: %s', req.remote)
            should_connect = self.tracker.should_connect_to(req.remote)
            self.event_bus.broadcast(
                ShouldConnectToPeerResponse(should_connect),
                req.broadcast_config()
            )

That's a lot of overhead if all you want to do is expose a method for other processes!

It's also a little dangerous. If multiple coroutines are subscribed like this, then every request will trigger all of them. The caller will accept the first response and drop the rest. You might actually want this in some situations, but I think in most situations it's unexpected and unwanted behavior.

How can it be fixed?

Maybe something like this?

# callee
self.event_bus.register_method(
    'should-connect-to-peer',
    self.tracker.should_connect_to
)

# caller
should_connect = await self.event_bus.call_method('should-connect-to-peer', remote)

call_method triggers a special event, endpoints which are listening for the given event call the registered method with the given args (in this case, remote). Once something like #42 lands Endpoints will inform each other what they're listening for, if you call register_method for an event which another Endpoint is already listening for it could throw an exception.

mypy would be very unhappy if we used the above interface, though I'm not sure what would be better. With this interface you could:

connect_to_peer = cast(
    Callable[[XXX], XXX],
    functools.partial(self.event_bus.call_method, 'should-connect-to')
)

but I'm sure there's a better way, possibly involving a mypy plugin?

Context manager API for EndpointAPI.subscribe

What was wrong?

The EndpointAPI.subscribe API returns a handle on the subscription that the user can call to remove this subscription. This is easy to forget or in some contexts, not always easy have a clear place where this would be called.

How can it be fixed?

We can fix this by making lahja.common.Subscription a context manager and having it automatically call Subscription.unsubscribe in the __exit__. This allows for the subscription API to be used like this:

with EndpointAPI.subscribe(MyEvent, handler_fn):
    # subscription active here:
# automatically unsubscribed here

Better request/response benchmark

What was wrong?

the request/response benchmarks would be better if the reporting occurred on the Driver side and it reported round trip, or if it reported on both sides of the request/response side so we knew both the 1/2 and full round trip times (and ideally the computed metric of the response time).

How can it be fixed?

Adjust how the driver and workers operate.

EventBus.reply API

What is wrong

The act or responding to an event is a little awkward and could maybe be more concise.

How it could be fixed

Maybe we can do something like this.

async for event in event_bus.stream(TheRequestEvent):
    event_bus.reply(event, *response_args, **response_kwargs)

The implementation would be something like

class Endpoint:
    def reply(event, *args, **kwargs):
        self.broadcast(event.expected_response_type(*args, **kwargs), event.broadcast_config())

Seems like a nice shortcut approach which cuts out boilerplate.

Make `broadcast_nowait` a core API

What is wrong

Endpoint.broadcast_nowait() is likely a necessary API. After spending some time with it I'm convinced that broadcast_nowait should be a core event API.

How can it be fixed.

Add it to the core EndpointAPI definition.

Supporting it in trio should be fine, it will just involve having an extra background process that receive events that have been placed on the a SendChannel using the SendChannel.send_nowait API.

Ensure that `broadcast/request/stream/wait_for` cannot be called prior to running the endpoint.

What was wrong?

It should result in an error to call any of the following APIs if the endpoint is not yet running.

  • EndpointAPI.broadcast
  • EndpointAPI.request
  • EndpointAPI.stream
  • EndpointAPI.wait_for

How can it be fixed?

Add tests to ensure these all throw friendly useful errors when called prior to the endpoint being running or after the endpoint has stopped. Ideally we figure a way to do this with as little runtime overhead as possible.

Make `Response` a property of `Request`

What was wrong?

The Request/Response pattern is cumbersome to define.

@dataclass
class GetThingResponse(BaseEvent):
    thing_id: int

@dataclass
class GetThingRequest(BaseRequestResponseEvent[GetThingResponse]):
    thing: Thing

    @staticmethod
    def expected_response_type() -> Type[GetNodeDataResponse]:
        return GetThingResponse

Note that we always have to define things out of order, first defining the response, then the event, and we always have to define the staticmethod.

How can it be fixed?

I think I like this better:

# in lahja codebase
class BaseRequest(BaseEvent):
    Response: Type[BaseEvent]

# use code
class GetThing(BaseRequest):
    thing_id: int

    class Response(BaseEvent):
        thing: Thing

# server code
for req in endpoint.stream(GetThing):
    thing = ...
    await endpoint.broadcast(req.Response(thing)

Things I like about this.

  • concise definition
  • still retains ability to do same type checking (I think)

Only downside is that it doesn't enforce that the Response property is defined on subclasses but I don't see that as a big deal.

More efficient subscription updates.

What was wrong?

Right now, the way most of the APIs that do things like update subscriptions operate, we incur some inefficiencies when multiple updates occur in quick succession. Today, if we were to call Endpoint.subscribe for multiple different events in quick succession, we would issue the same number of subscription updates to the remote.

In reality, we should only ever issue the last one.

How can it be fixed?

We should be able to roll something simple with an Event and maybe a Lock to make it so the call sites that notify of the change call Event.set, which is a noop anytime the event has already been set. Then, once all of the waits have been processed, we do something like replace the Event with a new fresh one.

The end result goal here is that in the event that multiple calls have been made to notify that something has happened, that we only ever issue a single update for all of the stacked up notifications.

Error prone pattern with background tasks that we wait till they are started.

What was wrong?

We have a growing amount of code like this:

self._receiving_loop_running = asyncio.Event()
self._receiving_queue = asyncio.Queue()
self._running = True
self._endpoint_tasks.add(asyncio.ensure_future(self._connect_receiving_queue()))
await self._receiving_loop_running.wait()

  • an asyncio.Event to signal something that starts asynchronously in-fact running.
  • a coroutine that needs to be run in the background which will set the event once it has started.
  • a statement that waits for the event to be set..

This code is error prone.

  • easy to forget to wait for the event.
  • easy to forget to remove the event if the function gets refactored such that it isn't needed anymore.

How can it be fixed?

Not sure yet.

Cover examples in CI

What was wrong?

Examples often break after CI changes

How can it be fixed?

Run examples in CI and do assertions on the output

Potential performance gain on receiving events.

What was wrong?

Currently we read objects passed across process boundaries using this algorithm

  • read until we have exactly 4 bytes
  • decode length from 4 bytes
  • read until we have exactly the number of bytes for the incoming object.

According the the python docs sockets perform best when you read from them in sizes that are powers of two.

https://docs.python.org/3/library/socket.html#socket.socket.recv

Also, sockets allow for incomplete reads, thus the docs for StreamReader.read are clear that it returns up to n bytes of data from the socket.

How can it be fixed?

We should experiment with switching over to the StreamReader.read API and treating the incoming bytes like they are an infinite stream, feeding them into some local buffer as they are read, allowing for much larger reads that are naive of the message format. Then, internally, we handle do the logic for "wait for at least 4 bytes to be on the stream > read them > decode the size > read the size > unpickle > repeat"

Rename `EndpointAPI.subscribe` to be `EndpointAPI.add_event_handler`

What is wrong

We have two concepts that both use the term subscription.

  • The set of endpoint types that a given endpoint cares about
  • The EndpointAPI.subscribe concept

How can it be fixed

I think that we might benefit from changing EndpointAPI.subscribe to be EndpointAPI.add_event_handler

This makes it alight with the internal nomenclature we use and separates it from the other subscriptions concept.

Remove half/full distinction for remote connections.

What was wrong?

#85 condenses the inbound/outbound connection distinction to a single RemoteConnection concept, though it still maintains the distinction at the Endpoint level via half/full connections.

It appears this distinction is no longer strictly necessary and may be introducing un-necessary complexity.

How can it be fixed?

Not entirely clear, but here's some information.

With the recent introduction of endpoints advertising which event types they subscribe to, we should be fine removing this distinction and storing all connections alongside each other. The only thing the name is currently used for is for quick O(1) checks for whether we are connected to a named endpoint.

We can replace the is_connected_to check with either a cache of the names we are connected to or simply iterating since the set of endpoints we need to iterate over is small.

The Endpoint class can no longer maintain the collection of names, relying on the RemoteEndpoint.name as the canonical/sole place this data is stored, and removing the duplication.

This does have the unfortunate effect of having RemoteEndpoint.name be Optional, however, it appears that we must at some level allow for un-named RemoteEndpoint objects, so even if we remove the name from the object, we will still have some area of code that has to allow for un-named objects.

For this reason, I think it's best to keep the RemoteEndpoint.name and remove the name awareness from the Endpoint object which simplifies multiple APIs that currently have to pass around Tuple[Optional[str], RemoteEndpoint] pairs, which can all then be simplified to just the RemoteEndpoint object.

Allow requesitng *ack* when event is received

from #89

What was wrong?

It's easy to accidentally broadcast into the void with nobody subscribing to your broadcast and thus nobody listening.

How can it be fixed?

Extend BroadcastConfig to have a mechanism for requesting the receiver provide and ack.

Care should be taken to correctly handle or restrict this API for broadcasts that are sent to multiple recipients. The initial implementation should probably error if the broadcast is not limited to a single endpoint (such as requiring that filter_endpoint be set)

However, it should be reasonable to do something like count how many endpoints the event was broadcast to, and then to wait until we receive the same number of ack responses before returning to the user.

I am inclined to make this the default behavior.

endpoint.request() should raise when a result is no longer possible

What was wrong?

Code that produced the error

I have some code that hangs when the other endpoint disconnects:

await event_bus.request(...)

Note: this happens inside of a:

future = asyncio.run_coroutine_threadsafe(...)
future.result(300)

But I don't think that makes a difference. Obviously it will eventually give up at the end of the timeout, but due to the nature of the call, it's unreasonable to tighten the timeout and give up early.

Actual Result

This code hangs until the future times out.

Expected Result

An exception to be raised, something like a ConnectionLost, if it becomes impossible to receive a response. (Maybe the right terminology is: if there are no more subscribers?)

Environment

lahja v0.14.0

How can it be fixed?

🤷‍♂️

Allow coroutines in `EndpointAPI.subscribe`

What was wrong?

The EndpointAPI.subscribe API does not allow use of coroutines as handlers. This restricts the api from being used as a building block for server-style request/response handlers since these will need to use the EndpointAPI.broadcast API.

While use of something like EndpointAPI.broadcast_nowait could be suitable, it seems more correct to allow coroutines.

How can it be fixed?

Use inspect.iscoroutine to determine how the handler is called.

Ensure internal events aren't pushed into central event bus

What was wrong?

Currently, even if we would broadcast an event only to our very own Endpoint it will still go on a roundtrip to the EventBus (which usually should sit in another process) which is

a.) a waste of resources

b.) restricts use case to pickable events

If we would short circuit internal events to stay within the Endpoint we could get rid of the special legacy PluginManager events in Trinity and handle those via the event bus as well.

How can it be fixed?

Detect internal events and ensure they stay within the Endpoint.

[WIP] APIs I expected/wanted on "first contact"

Since this is roughly my first real contact with lahja, I wanted to write down my first impressions. I'll leave this open and [WIP] for a bit, as I collect my thoughts.

Custom APIs

An acknowledgment response

I have a request/response pair, but the response is super simple. Basically, I just want to know when it succeeds. So I don't want to have to define a custom response event type. I just want the caller to hang until the server responds.

This could look almost identical to the current Endpoint.subscribe() API (except I also want an async version, mentioned below). As soon as the handler exits, the vanilla response event is sent to acknowledge completion.

Response with a primitive value

I have a request, but just need a simple value back. Do I really need to define the response event?

Something like, client-side:

result = await endpoint.request(NeedsDoubling(3))
assert result == 6

# also, a blocking version:
endpoint.request_blocking(NeedsDoubling(3))

Server-side

async def doubler(event: NeedsDoubling):
  return event.operand * 2
endpoint.add_async_handler(NeedsDoubling, doubler)

# also
endpoint.add_handler(NeedsDoubling, lambda event: event.operand * 2)

Maybe the acknowledgement becomes redundant with this API, because you could just return None.

Connection Retries

If I start both the server and client and the client runs slightly before, I get a connection refused. I'd prefer to have it retry, at least for a while. Probably something like:

  • connect_to_endpoints(ConnectionConfig, timeout=seconds_im_willing_to_wait)

There are likely more APIs that would benefit. Unsure what the default timeout should be, if any.

Synchronous versions

Sync versions of most APIs. I expect the call to block until success (probably by launching a new thread under the hood):

  • Endpoint.connect_to_endpoints()
  • Endpoint.broadcast()

Async versions

like Endpoint.subscribe(event, some_awaitable_handler)

Cleanup duplicate large comment

What was wrong?

def _remove_async_subscription(
self, event_type: Type[BaseEvent], handler_fn: SubscriptionAsyncHandler
) -> None:
self._async_handler[event_type].remove(handler_fn)
if not self._async_handler[event_type]:
self._async_handler.pop(event_type)
# this is asynchronous because that's a better user experience than making
# the user `await subscription.remove()`. This means this Endpoint will keep
# getting events for a little while after it stops listening for them but
# that's a performance problem, not a correctness problem.
asyncio.ensure_future(self._notify_subscriptions_changed())
def _remove_sync_subscription(
self, event_type: Type[BaseEvent], handler_fn: SubscriptionSyncHandler
) -> None:
self._sync_handler[event_type].remove(handler_fn)
if not self._sync_handler[event_type]:
self._sync_handler.pop(event_type)
# this is asynchronous because that's a better user experience than making
# the user `await subscription.remove()`. This means this Endpoint will keep
# getting events for a little while after it stops listening for them but
# that's a performance problem, not a correctness problem.
asyncio.ensure_future(self._notify_subscriptions_changed())

This section of code contains a duplication of a large comment.

How can it be fixed?

These two functions can very likely be collapsed into something that removes the duplication.

Proxy object API

What was wrong?

Here's a fictional API I've been thinking about.

from lahja import proxy
from my_code import Animal

@dataclass
class Animal:
    name: str
    sound: str

class ProxyAnimal(proxy.Proxy[Animal]):
    name = proxy.Text()
    get_sound = proxy.Method()
    wait_something = proxy.AsyncMethod()

# to serve it:
cat = Animal(name='cat', sound='meow')
with proxy.serve_object(endpoint, ProxyAnimal, cat) as obj_id:
    ...

# to use it from across process boundaries
cat = ProxyAnimal(endpoint, obj_id)

I think this achieves the following goals.

  • low boilerplate
  • allows for ABC ObjectAPI equivalent implementations via proxy objects which should mean they are functionally interchangeable.

How can it be fixed?

Not sure. One major problem is the need to interact with the event bus from both async and synchronous call sites. Things like object properties and non-async methods will mean that we'll be entering the endpoint APIs from a synchronous context. We can likely pull something off using threads and broadcast_nowait. It might be nice to have a way to interact with EndpointAPI.request as well since everything from the client side will be a request/response.

The Text() and Method() APIs would likely be descriptors which know how to access the endpoint through their parent object.

A meta-class is probably going to be needed to programmatically generate event types but it may be possible to do get by with something simpler like a set of builtin events for the request/response or an internal event.

The Trio endpoint serializes all messages through a single coro

In the trio endpoint all broadcast calls add to a queue, _outbound_send_channel, which is then processed by _process_outbound_messages. When _outbound_send_channel blocks it blocks all message sends. This could happen if message handler takes a long time to run, or if a remote process's queue fills up. The queue also makes our back-pressure situation more complicated. If a coro is sending messages which are expensive to handle it will slow down all message sends and the offending coro will be given no feedback. I think both situations could be solved by having the trio endpoint work in the same way as the asyncio endpoint works: immediately attempting to send messages when broadcast is called.

Make `start/stop/start_server/stop_server` private APIs

What was wrong?

Some of this is based on the code found in #75 rather than master.

The AsyncioEndpoint.start/stop/start_server/stop_server APIs encourage use the EndpointAPI outside of the standard API.

How can it be fixed?

Make them private.

  • _start
  • _stop
  • _start_server
  • _stop_server

Metaclass API for defining events

What is wrong

Defining new events requires un-necessary boilerplate. Events are typically purely for data transfer and rarely have extra methods or properties.

How can it be fixed?

It'd be really nice to be able to do this:

class NewUser(DataEvent):
    username: str
    created_at: datetime.datetime
    age: int

event = NewUser('piper', datatime.datetime.now(), 99)

Doing this with metaclasses is quite easy and it would reduce the boilerplate in defining new events significantly.

Transfer down `esophagus` consistently rejected

What happened

Tried to send nutritional matter down esophagus

What did you expect to happen

Esophagus transfers food to internal factory to be converted to energy

What actually happend

factory returned food in a partially processed state

Steps to reproduce

Give lahja food

How can it be fixed

I suspect spending a day at home watching cartoons but test is ongoing.

Internal events don't work with non-pickable payload

What was wrong?

One of the main motivations for the internal events is to be able to broadcast events that stay within one Endpoint (and hence Process) and allow those to use non-pickable events as well.

Unfortunately, the simple implementation doesn't work in practice. Trying to use this in practice leaves me with:

Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "stringsource", line 2, in uvloop.loop.Loop.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__

I believe what happens is that even if the event does not intent to move across process boundaries, simply pushing it into a multiprocessing.Queue enforces a pickability check.

How can it be fixed?

Probably route these events into a special purpose local queue.

Endpoint or Event Bus?

What is wrong

We have a terminology problem....

  • Sometimes it's an endpoint.
  • Sometimes it's an event_bus

Here's my take on these two terms.

When we say "endpoint" we are referring to a component that is part of a larger set of other connected "endpoints". Code that works with an endpoint will sometimes broadcast and receive events, but primarily deals with establishing connections to other endpoints and managing starting/stopping of the endpoint.

In Trinity, "plugin" code is typically working with an "endpoint".

When we say "event bus" we are referring to the sum of all connected endpoints. Code that works with an "event bus" never deals with managing connections and only deals with the primary event APIs (broadcast/wait_for/subscribe/stream).

The RequestServer from ethereum/trinity#617 works with the "event bus".

How can we fix this

I'm thinking that we might want to try and isolate these two concepts in our API for better code clarity. This could be as simple as separating our ABC base class into two levels.

class EventBusAPI:
    def broadcast
    def stream
    def wait_for
    def subscribe

class EndpointAPI(EventBusAPI):
    def connect_to_endpoint
    ...

This would allow us to distinguish between the two at the level of the type system.

Taken a step further we could make a thin wrapper object EventBus that only exposed the event APIs to hide the other APIs from code that has no business using them.

Allow primative values in responses only

from #89

What was wrong?

It is cumbersome to define a response event for things that have a primative singular value.

How can it be fixed?

We can allow the following primative types to be sent across the wire but only as a response to a request.

  • int
  • float
  • bytes
  • str

We can still package them up in an internal event like this:

class _PrimativeValue(BaseEvent):
    value: Union[int, float, bytes, str]

On the sending side, we only allow sending if the event is accompanied by a BroadcastConfig that defines filter_event_id and filter_endpoint.

On the receiving side, we unwrap the value if these conditions are met. This looks like this will require unwrapping in _process_item which means our various queues and things will have to have flexible type hints for the mechanisms where we're sending things to subscription handlers, futures, etc.

Should connections be bi-directional

What was wrong?

  • Endpoint A is "server"
  • Endpoint B is "client"

B connects to A using the ipc socket A is serving from.

  • B.broadcast(...) will be received by A
  • A.broadcast(...) will not be received by B

This behavior is surprising to me.

How can it be fixed?

I fixed it in my curio/trio branches by sending an internal _Hello event with the endpoint's name so that the "server" endpoint can correctly add the client to it's connected endpoints under the correct name. This makes connection bi-directional which seems good.

Maybe make `EndpointAPI.connect_to` APIs more forgiving.

What was wrong?

Right now, if you try to connect to an endpoint you are already connected to lahja throws an error:

https://github.com/ethereum/lahja/blob/master/lahja/asyncio/endpoint.py#L484

This is likely good behavior since a name match does not definitively identify an endpoint since two endpoints could have the same name but be fundamentally different. Situations I could see this legitimately occuring would be.

  1. Two servers accidentally named the same but with different IPC paths
  2. Two clients accidentally named the same (both establishing a connection to the same server)

How can it be fixed?

Ideally, it would be nice for connect_to_endpoint to not error in the case where you are already connected to the requested endpoint.

In the client -> server case this is relatively trivial. We just need to store the whole ConnectionConfig or its contents (name, ipc_path) and return as if the connection were successful if they match, try to connect if there is no name conflict, and throw an error if the name matches but the IPC path does not.

In the server -> client case it's trickier and not clear how we would correctly identify if the client is indeed the same one. We can potentially divide this up into two cases for clarity but I believe they both pose the same challenge.

  1. two same named clients connect to a server
  2. a server has an existing client with the name and then tries to establish a connection to another serving endpoint with that name.

One potential way to reconcile this would be to define client identity as something like (process-id, name).

However, we probably still must keep the invariant that an endpoint may only ever have a unique set of endpoint names that it is connected to, since allowing multiple same named clients would change the assumptions about broadcasts which are specifically routed to an endpoint having a single recipient, and introduce things like race conditions in a request/response scenario.

Declarative cross implementation test suite tools

What was wrong?

  • lots of duplication between the trio and asyncio based test suites.
  • lots of things that behave differently when they occur across different processes.

How can it be fixed?

I've been imagining this concept that I'll call an "Endpoint Driver". It would be a declarative, implementation agnostic mechanism for defining a sequential set of actions that should be taken on an endpoint, with one of those actions being something like just running python code.

This tool should allow us to write implementation agnostic tests allowing us to define our test suite once and run it across any number of different combinations like:

  • both endpoints of the same implementation in the same process
  • different implementations in the same process
  • same/different implementations in different processes.

I suspect the tests will be much more readable too.

An exception in a handler can halt message processing

Currently the code for _connect_receiving_queue looks like this (and _connect_internal_queue has the same problem):

    async def _connect_receiving_queue(self) -> None:
        self._receiving_loop_running.set()
        while self._running:
            (item, config) = await self._receiving_queue.get()
            event = self._decompress_event(item)
            self._process_item(event, config)

The problem is twofold.

  • If _process_item encounters an exception (which is particularly likely in one of the handlers since those are provided by the user) this function will stop processing events.
  • Because asyncio.gather(asyncio.ensure_future(self._connect_receiving_queue(), ...)) is used, that exception will not be logged until the program exits, messages will be silently dropped.

Here's a failing test:

class RemoveItem(BaseEvent):
    def __init__(self, item):
        super().__init__()
        self.item = item

@pytest.mark.asyncio
async def test_exceptions_suppressed(endpoint) -> None:

    the_set = {1, 3}
    def handle(message: RemoveItem):
        the_set.remove(message.item)

    endpoint.subscribe(RemoveItem, handle)

    endpoint.broadcast(RemoveItem(1))
    await asyncio.sleep(0.05)
    assert the_set == {3}

    endpoint.broadcast(RemoveItem(2))
    await asyncio.sleep(0.05)
    assert the_set == {3}

    endpoint.broadcast(RemoveItem(3))
    await asyncio.sleep(0.05)
    assert the_set == {}   # it fails here

A few ideas for the event bus

Inter-process communication in Trinity

We are currently experimenting with different architectures to figure out the best way forward to prepare for a multi process architecture in which having multiple processes doesn't feel fragile and cumbersome but rather robust, manageable and lightweight.

It currently seems that we are leaning towards an architecture that uses a single event bus to allow loose, decoupled, async communication between processes as explained in this comment.

That idea is currently in a PoC phase with a pre-mature event bus spike as well as two PoC PRs that demo it's usage (ethereum/py-evm#1202, ethereum/py-evm#1172).

There are a number of issues today that I believe are crucial to move this forward hence I'm collecting and throwing them out here.

Random ideas for the event bus

1. Get objective performance metrics

I believe the number one reason why we want this architecture is simplicity and robustness. However, we also need to take performance into account. There are several things that may make this approach less performant out of the box (though, with much much room for improvements) and it would be nice to measure and compare rather than just guess.

2. Allow routed events

To begin with, all events that are broadcasted are delivered to all endpoints of the event bus (where each endpoint is usually sitting in a different process). Then, inside these endpoints, the events are either processed because someone is listening to them via subscribe / stream APIs, or they are lost if no one is interested in them. Nevertheless, they are delivered to each endpoint no matter if they are actually consumed or not.

We can do better than that and allow events to be routed (related: routed events in Chromium multi process architecture).

Routing can happen implicitly (more on that later) or explicitly.

3. Explicit routing via broadcast API

The broadcast API could accept an optional configuration object that could allow things like:

Excluding specific endpoints for delivery

event_bus.broadcast(SomeEvent(), Config(exclude_endpoints = ["proc1", "proc2"])

Only delivering to explicitly named endpoints

event_bus.broadcast(SomeEvent(), Config(filter_endpoints = ["proc1", "proc3"])

Excluding specific endpoints groups for delivery

event_bus.broadcast(SomeEvent(), Config(exclude_groups = ["plugins"])

Only delivering to explicitly named endpoint groups

event_bus.broadcast(SomeEvent(), Config(filter_groups = ["plugins"])

Groups of endpoints can be a powerful concept for when the exact ids/names of endpoints are unknown at design time but a statement about them belonging to a specific group can well be codified at development time (think: plugins!)

4. implicit routing via request API

While one of the primary use cases for the event bus is loosly coupled, async communication (think: PeerJoined / PeerLeft), there does exist a valid use case for communication pattern that fall more into the traditional request / response category. While one can do this with the existing APIs, the current event bus PoC does not have great support for this to make this more ergonomic and efficient.

It would be great to have an API roughly as follows.

peer_info = await event_bus.request(PeerInfoRequest(), Config(filter_endpoint="networking"))

This API would allow to:

  • request something and directly wait on the response in one API
  • ensure that the response is only delivered to this single callsite that was requesting it (implicit routing)

5. Allow all APIs to have timeouts

E.g.

result = await event_bus.request(PeerInfoRequest(), Config(timeout=1.5))

6. Make domestic events super efficient

Events that are scoped to only be broadcasted inside the endpoint that raises them do not need to go through the central coordinator at all and hence can be much much more efficient.

7. Check if we can move the event bus into it's own process and check if that would actually be beneficial

Currently, the event bus itself is sitting in the main process with endpoints being passed into other processes. That also means that all messaging is always going through this process. In other words an event raised in proc1, to be consumed in plugin-proc-n is always going through another hop proc1 -> main -> plugin-proc-n. That also means that there's a decent amount of processing going on in the process that hosts the event bus. But since the main process is at the very top of the hierarchy, it's kind of the natural / ideal place for this to happen. I believe our main process isn't performing much work anyway today but it would still be interesting to see if this work could move into another process.

8. Check if we can have an event bus without a central coordinator

More radical thought. Maybe we can achieve this kind of API even without going through a central coordinator hop

9. Check and compare to existing well established messaging solutions such as ZeroMQ

Something like zmq + aiozmq may actually provide better performance since the bulk of the work happens in native code. Yet it seems to have strong Python bindings. We might be even able to implement our event bus API on top of it or make the backend swappable.

Pure asyncio?

I've been somewhat passively paying attention to the work that both @cburgdorf and @lithp are doing. I don't claim to know it in detail, but my impression is that there has been a reasonable amount of difficulty that is in some way related to multiprocessing

I want to pose something that's been talked about with a small-ish working code example.

Below is a simplistic concept for an event bus that does direct connection and allows transport of any pickle-able object. Does it make sense for us to go this route at this point? Is the somewhat heavy non-asyncio friendly machinery of the multiprocessing library getting in the way?

import asyncio
import logging
from pathlib import Path
import pickle


class Endpoint:
    def __init__(self, path: Path) -> None:
        self.path = path
        self.running = asyncio.Event()
        self.finished = asyncio.Event()

    async def start_server(self) -> None:
        await asyncio.start_unix_server(
            self.accept_connection,
            path=str(self.path),
        )
        # set the running event
        self.running.set()

    async def accept_connection(self, reader, writer) -> None:
        self.reader, self.writer = reader, writer
        logging.info('INCOMING CONNECTION: %s', self.path)
        self.server = asyncio.ensure_future(self.listen())

    @classmethod
    async def connect(cls, path: Path) -> None:
        reader, writer = await asyncio.open_unix_connection(
            str(path),
        )
        client = cls(path)
        client.reader = reader
        client.writer = writer
        client.server = asyncio.ensure_future(client.listen())
        logging.info('OUTBOUND CONNECTION: %s', path)
        return client

    async def listen(self) -> None:
        try:
            while not self.reader.at_eof():
                raw_size = await self.reader.readexactly(4)
                size = int.from_bytes(raw_size, 'little')
                message = await self.reader.readexactly(size)
                obj = pickle.loads(message)

                logging.info('RECEIVED OBJECT: %s', obj)
        finally:
            self.finished.set()

    async def broadcast(self, obj):
        message = pickle.dumps(obj)
        size = len(message)
        self.writer.write(size.to_bytes(4, 'little'))
        self.writer.write(message)
        await self.writer.drain()
        logging.info('SENT OBJECT: %s', obj)

    async def close(self):
        if self.writer.can_write_eof():
            self.writer.write_eof()
            await self.writer.drain()
        self.server.cancel()
        self.writer.close()
        # set the finished event
        self.finished.set()


# tests.py
from pathlib import Path
from typing import NamedTuple

import pytest

from purebus import Endpoint


IPC_PATH = Path('./test.ipc')


class User(NamedTuple):
    name: str
    age: int


@pytest.fixture
async def server():
    endpoint = Endpoint(IPC_PATH)
    await endpoint.start_server()
    yield endpoint
    await endpoint.close()


@pytest.mark.asyncio
async def test_connection(server):
    await server.running.wait()

    client = await Endpoint.connect(server.path)
    await client.broadcast(b'ping')
    await client.broadcast(1)
    await client.broadcast(User('piper', 35))
    await client.close()

Here's the logging output from running this.

$ pytest tests.py --log-cli-level=INFO
============================================================ test session starts =============================================================
platform linux -- Python 3.6.5, pytest-3.6.2, py-1.7.0, pluggy-0.6.0 -- /home/piper/python-environments/evm/bin/python
cachedir: .pytest_cache
rootdir: /home/piper/projects/pure-bus, inifile:
plugins: xdist-1.18.1, cov-2.5.1, asyncio-0.9.0, asyncio-network-simulator-0.1.0a2, hypothesis-3.69.5
collected 1 item

tests.py::test_connection
--------------------------------------------------------------- live log call ----------------------------------------------------------------
purebus.py                  69 INFO     OUTBOUND CONNECTION: test.ipc
purebus.py                  90 INFO     SENT OBJECT: b'ping'
purebus.py                  90 INFO     SENT OBJECT: 1
purebus.py                  90 INFO     SENT OBJECT: User(name='piper', age=35)
PASSED                                                                                                                                 [100%]
------------------------------------------------------------- live log teardown --------------------------------------------------------------
purebus.py                  57 INFO     INCOMING CONNECTION: test.ipc

Thread based endpoint implementation

What was wrong?

One thing that we (@carver ) need on the trinity side of things is the ability to write a fully synchronous plugin that talks over the endpoint API. Such a plugin would be specifically for things like running Chain.import_block and not having to jump through any hoops to avoid blocking the event loop.

How can it be fixed?

This is totally up for debate, but probably something dumb using the standard library threading library.

Still to be figured out:

  • How do we deal with the aynsc def based EndpointAPIs
    • maybe just raise NotImplemented. It's possible this won't be enough.
    • maybe we can define the API using the typing.overload to have the type system allow for both synchronous and asynchronous implementations of the APIs that use async def

Only relay messages to processes that hold subscriptions to the event

What was wrong?

Currently, if one broadcasts a message and not exclusively limit the scope of recipients (BroadcastConfig) it will be relayed from the coordinator process to all other endpoints.

How can it be fixed?

I guess it should be doable to make the coordinator process (in other words the EventBus aware of which endpoint is subscribed to which event and hence only relay messages to interested parties.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.