pyre's Introduction

Pyre

This is a Python port of Zyre 1.0, implementing the same ZRE protocol.

Pyre - an open-source framework for proximity-based peer-to-peer applications

Description

Pyre does local area discovery and clustering. A Pyre node broadcasts UDP beacons, and connects to peers that it finds. This class wraps a Pyre node with a message-based API.

All incoming events are messages delivered via the recv call of a Pyre instance. The first frame defines the type of the message, and following frames provide further values:

ENTER fromnode headers
    a new peer has entered the network
EXIT fromnode
    a peer has left the network
JOIN fromnode groupname
    a peer has joined a specific group
LEAVE fromnode groupname
    a peer has left a specific group
WHISPER fromnode message
    a peer has sent this node a message
SHOUT fromnode groupname message
    a peer has sent one of our groups a message

In SHOUT and WHISPER the message is a single frame in this version of Pyre. In ENTER, the headers frame contains a packed dictionary, that can be unpacked using json.loads(msg) (see chat client).
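As a rough sketch of how such a frame list might be unpacked: the helper below assumes the frame order used by the chat client (type, peer UUID as 16 raw bytes, peer name, then the type-specific frames). `parse_event` and the `Event` tuple are hypothetical names for illustration and are not part of Pyre.

```python
import json
import uuid
from collections import namedtuple

# Hypothetical container for illustration; Pyre itself returns a plain list of frames.
Event = namedtuple("Event", "type peer name group headers payload")


def parse_event(frames):
    """Split a list of byte frames into named fields.

    Assumed layout (taken from the chat client example):
    [type, peer_uuid_bytes, peer_name, ...type-specific frames...]
    """
    frames = list(frames)  # work on a copy so the caller's list is left intact
    msg_type = frames.pop(0).decode("utf-8")
    peer = uuid.UUID(bytes=frames.pop(0))
    name = frames.pop(0).decode("utf-8")
    group = headers = None
    if msg_type in ("SHOUT", "JOIN", "LEAVE"):
        # Assumption: the group name is the next frame for these types
        group = frames.pop(0).decode("utf-8")
    elif msg_type == "ENTER":
        headers = json.loads(frames.pop(0).decode("utf-8"))
    # Whatever is left (for example the message frame) is returned untouched
    return Event(msg_type, peer, name, group, headers, frames)
```

With a node at hand this could be called as `parse_event(node.recv())`; the remaining frames stay as bytes so the caller decides how to decode them.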

To join or leave a group, use the join() and leave() methods. To set a header value, use the set_header() method. To send a message to a single peer, use whisper(). To send a message to a group, use shout().

Installation

For now use Pip:

pip install https://github.com/zeromq/pyre/archive/master.zip

API

import pyre
#  Constructor, creates a new Zyre node. Note that until you start the
#  node it is silent and invisible to other nodes on the network.
node = pyre.Pyre()

#  Set node header; these are provided to other nodes during discovery
#  and come in each ENTER message.
node.set_header(name, value)

#  Start node, after setting header values. When you start a node it
#  begins discovery and connection.
#  (TODO: currently a Pyre node starts immediately.)
node.start()

#  Stop node, this signals to other peers that this node will go away.
#  This is polite; however you can also just destroy the node without
#  stopping it.
node.stop()

#  Join a named group; after joining a group you can send messages to
#  the group and all Zyre nodes in that group will receive them.
node.join(group)

#  Leave a group
node.leave(group)

#  Receive next message from network; the message may be a control
#  message (ENTER, EXIT, JOIN, LEAVE) or data (WHISPER, SHOUT).
#  Returns a list of message frames
msgs = node.recv()

# Send message to single peer, specified as a UUID object (import uuid)
# Destroys message after sending
node.whisper(peer, msg)

# Send message to a named group
# Destroys message after sending
node.shout(group, msg)

#  Send string to single peer specified as a UUID string.
#  String is formatted using printf specifiers.
node.whispers(peer, msg_string)

#  Send string to a named group.
node.shouts(group, msg_string)
    
#  Return handle to the Zyre node, for polling.
#  Use node.get_socket().getsockopt(zmq.FD) to acquire the file descriptor.
#  Don't use this handle to read Pyre events; use node.inbox for those.
node.get_socket()

Example Chat Client

try:
    from zyre_pyzmq import Zyre as Pyre
except Exception as e:
    print("using Python native module", e)
    from pyre import Pyre 

from pyre import zhelper 
import zmq 
import uuid
import logging
import sys
import json

def chat_task(ctx, pipe):
    n = Pyre("CHAT")
    n.set_header("CHAT_Header1","example header1")
    n.set_header("CHAT_Header2","example header2")
    n.join("CHAT")
    n.start()

    poller = zmq.Poller()
    poller.register(pipe, zmq.POLLIN)
    print(n.socket())
    poller.register(n.socket(), zmq.POLLIN)
    print(n.socket())
    while(True):
        items = dict(poller.poll())
        print(n.socket(), items)
        if pipe in items and items[pipe] == zmq.POLLIN:
            message = pipe.recv()
            # message to quit
            if message.decode('utf-8') == "$$STOP":
                break
            print("CHAT_TASK: %s" % message)
            n.shouts("CHAT", message.decode('utf-8'))
        else:
        #if n.socket() in items and items[n.socket()] == zmq.POLLIN:
            cmds = n.recv()
            msg_type = cmds.pop(0)
            print("NODE_MSG TYPE: %s" % msg_type)
            print("NODE_MSG PEER: %s" % uuid.UUID(bytes=cmds.pop(0)))
            print("NODE_MSG NAME: %s" % cmds.pop(0))
            if msg_type.decode('utf-8') == "SHOUT":
                print("NODE_MSG GROUP: %s" % cmds.pop(0))
            elif msg_type.decode('utf-8') == "ENTER":
                headers = json.loads(cmds.pop(0).decode('utf-8'))
                print("NODE_MSG HEADERS: %s" % headers)
                for key in headers:
                    print("key = {0}, value = {1}".format(key, headers[key]))
            print("NODE_MSG CONT: %s" % cmds)
    n.stop()


if __name__ == '__main__':
    # Create a StreamHandler for debugging
    logger = logging.getLogger("pyre")
    logger.setLevel(logging.INFO)
    logger.addHandler(logging.StreamHandler())
    logger.propagate = False

    ctx = zmq.Context()
    chat_pipe = zhelper.zthread_fork(ctx, chat_task)
    # input in python 2 is different
    if sys.version_info.major < 3:
        input = raw_input

    while True:
        try:
            msg = input()
            chat_pipe.send(msg.encode('utf_8'))
        except (KeyboardInterrupt, SystemExit):
            break
    chat_pipe.send("$$STOP".encode('utf_8'))
    print("FINISHED")

Look at the ZOCP project for examples of how Pyre can be integrated into different environments and frameworks.

Pyre uses the Python Logging module. To change the debug level:

    # Create a StreamHandler for debugging
    logger = logging.getLogger("pyre")
    logger.setLevel(logging.INFO)
    # i.e. logging.DEBUG, logging.WARNING
    logger.addHandler(logging.StreamHandler())
    logger.propagate = False
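The same setup can be wrapped in a small function so that calling it more than once does not stack handlers. This is only a sketch using the standard logging module; `configure_pyre_logging` is a hypothetical helper name, not something Pyre provides.

```python
import logging


def configure_pyre_logging(level=logging.INFO):
    """Attach one StreamHandler to the "pyre" logger and set its level."""
    logger = logging.getLogger("pyre")
    logger.setLevel(level)
    # Avoid stacking duplicate handlers when called more than once
    if not any(isinstance(h, logging.StreamHandler) for h in logger.handlers):
        logger.addHandler(logging.StreamHandler())
    logger.propagate = False  # keep Pyre output out of the root logger
    return logger
```

Calling `configure_pyre_logging(logging.DEBUG)` raises the verbosity; calling it again with `logging.WARNING` lowers it without adding a second handler.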

Requirements

Pyre only needs PyZMQ. On some older versions of Python it also needs the ipaddress module.

The recommended Python version is 3.3+.

Project Organization

Pyre is owned by all its authors and contributors. This is an open source project licensed under the LGPLv3. To contribute to Zyre please read the C4.1 process that we use.

To report an issue, use the PYRE issue tracker at github.com.

For more information on this project's maintenance, see MAINTENANCE.md.

pyre's People

Contributors

axelvoitier, blink1073, bluca, c-rack, cgettys, fieldofview, geektrovert, gheber, gpotter2, hintjens, jfolz, keent, madarp, marccardinal, mikigrit, papr, prepultrue, rio, romanroibu, shader, sphaero, tangb, wesyoung

pyre's Issues

Multiple Pyre methods seem to be using wrong recv() method

All of the following use self.recv() to get the results rather than self.actor.recv_multipart() which seems more appropriate.

  • get_peers
  • get_peer_address
  • get_own_groups
  • get_peer_groups

get_peer_header_value should probably use self.actor.recv(), since it doesn't use a multipart message.

Pypi support

Please add this repo to PyPI.
BTW I found there is a project named "pyre" already.

get_peer_name

get_peer_name() doesn't seem to be currently implemented.

It might make sense to add a get_peer() method that returns the peer object, which can be directly queried and manipulated, instead of always operating indirectly using uuids.

Sending a unicode message via Pyre.whisper damages subsequent message

Executing the following code:

>>>n.whisper(id, 'first message')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib64/python3.3/site-packages/pyre/pyre.py", line 118, in whisper
    self.actor.send(msg)
  File "/usr/lib64/python3.3/site-packages/pyre/zactor.py", line 78, in send
    self.pipe.send(*args, **kwargs)
  File "zmq/backend/cython/socket.pyx", line 574, in zmq.backend.cython.socket.Socket.send (zmq/backend/cython/socket.c:5437)
  File "zmq/backend/cython/socket.pyx", line 614, in zmq.backend.cython.socket.Socket.send (zmq/backend/cython/socket.c:5150)
TypeError: unicode not allowed, use send_string
>>>n.whispers(id, 'second message')
>>>n.whispers(id, 'third message')

Results in the following output on the second node:

>>> n.recv()
[b'WHISPER', b"\xc8'C\xfbR\xfbL[\x8c\xed\xfa\xcb\xbe\\\xbfW", b'c82743', b'WHISPER']
>>> n.recv()
[b'WHISPER', b"\xc8'C\xfbR\xfbL[\x8c\xed\xfa\xcb\xbe\\\xbfW", b'c82743', b'third message']

This means that the first mistake wrote over the payload of the second message with the string 'WHISPER'.
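Until the library guards against this itself, a caller-side workaround might be to make sure only bytes reach whisper(). The sketch below assumes whisper() forwards its argument unchanged to a raw socket send, as the traceback suggests; `as_frame` is a hypothetical helper name.

```python
def as_frame(msg, encoding="utf-8"):
    """Return msg as bytes, encoding text so it is safe to pass to a raw send."""
    if isinstance(msg, bytes):
        return msg
    if isinstance(msg, str):
        return msg.encode(encoding)
    # Reject anything else early instead of failing halfway through a send
    raise TypeError("expected str or bytes, got %s" % type(msg).__name__)
```

Under that assumption, `n.whisper(id, as_frame('first message'))` would hand bytes to the socket and avoid the half-sent message.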

Trouble on OSX and Windows in zbeacon

On Windows we were experiencing 10022 and 10049 errors in ZBeacon. On OSX no errors but no broadcast beacons either. In Windows we only noticed the errors when the first beacon was about to be transmitted. It would complain about invalid context. This might be caused by us using the same socket for receiving beacons as well as for transmitting them.

see:
https://github.com/zeromq/pyre/blob/master/zbeacon.py#L231

We could try a workaround by creating a temporary udpsocket for sending a beacon.

I've checked the logic from zbeacon.c:
https://github.com/zeromq/czmq/blob/master/src/zbeacon.c

But in there also the receiving socket is used for sending the beacons. Apparently that does work on OSX?

Puzzling...

Gossip Discovery

I noticed that this is in the TODO list. Any progress on this? If there is a protocol spec for this I could do a PR and work on it.

Error - Resource temporarily unavailable

I was running 5 nodes on the same machine and after some time (~3 minutes), I got this error.

Traceback (most recent call last):
File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
self.run()
File "/usr/lib/python3.4/threading.py", line 868, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.4/dist-packages/pyre/zbeacon.py", line 204, in __init__
self.run()
File "/usr/local/lib/python3.4/dist-packages/pyre/zbeacon.py", line 395, in run
self.recv()
File "/usr/local/lib/python3.4/dist-packages/pyre/zbeacon.py", line 371, in recv
self._pipe.send_unicode(peername, zmq.SNDMORE)
File "/usr/local/lib/python3.4/dist-packages/zmq/sugar/socket.py", line 330, in send_string
return self.send(u.encode(encoding), flags=flags, copy=copy)
File "zmq/backend/cython/socket.pyx", line 574, in zmq.backend.cython.socket.Socket.send (zmq/backend/cython/socket.c:5434)
File "zmq/backend/cython/socket.pyx", line 621, in zmq.backend.cython.socket.Socket.send (zmq/backend/cython/socket.c:5196)
File "zmq/backend/cython/socket.pyx", line 181, in zmq.backend.cython.socket._send_copy (zmq/backend/cython/socket.c:2035)
File "zmq/backend/cython/checkrc.pxd", line 15, in zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/socket.c:6112)
zmq.error.Again: Resource temporarily unavailable

Followed by unending messages:
WARNING:pyre.pyre_node:Peer None isn't ready

No way to detect when a Pyre's ZBeacon has exited

To illustrate, run this:

import logging
from pyre import Pyre

logger = logging.getLogger('pyre')
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())

pyre = Pyre()
pyre.start()

As long as there's a network connection during startup, you will get output like this:

...
Set up a broadcast beacon to w.x.y.z:5670

When you disable your network connection, you will get the following output:

Network seems gone, exiting zbeacon

(in addition, if the network was not available during startup, there's an exception in prepare_udp)

The corresponding actor exits and sends shim_pipe.signal() (i.e. a single part message), but PyreNode ignores this. In particular, this happens here:

def recv_beacon(self):
    # Get IP address and beacon of peer
    try:
        ipaddress, frame = self.beacon_socket.recv_multipart()
    except ValueError:
        return 

I don't see a way to work around this while still using the Pyre class, and I'm not sure about the proper fix. Is a node without a beacon useful, or is that a reason for shutting the Node down?
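One possible direction, sketched under the assumption described above (beacons arrive as two frames, the exit signal as a single frame), is to classify the message by frame count instead of swallowing the ValueError. `split_beacon_message` is a hypothetical helper; what the node should do once it sees the signal is still the open question.

```python
def split_beacon_message(frames):
    """Classify a multipart message read from the beacon actor's pipe.

    Returns ("beacon", ipaddress, frame) for a two-frame beacon and
    ("signal", frame) for a single-frame message such as an exit signal.
    """
    if len(frames) == 2:
        ipaddress, frame = frames
        return ("beacon", ipaddress, frame)
    if len(frames) == 1:
        return ("signal", frames[0])
    raise ValueError("unexpected beacon message with %d frames" % len(frames))
```

The caller could then log the signal case or shut the node down, rather than returning silently.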

Error calling Pyre()

I tried the chat client example in pyre and it failed. Here is a simplified ipython calling sequence that gives the same error with calling ipaddress.

Thanks

In [5]: ctx=zmq.Context()

In [6]: ctx
Out[6]: <zmq.sugar.context.Context at 0x14342a8>

In [7]: n=Pyre(ctx)

In [8]: myID: 2bf5d55e-9f75-4e48-bf27-275af66ae4f2
Exception in thread Thread-3:
Traceback (most recent call last):
   File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
   self.run()
   File "/usr/lib/python2.7/threading.py", line 505, in run
   self.__target(*self.__args, **self.__kwargs)
   File "/usr/local/lib/python2.7/dist-packages/pyre/zbeacon.py", line 129, in __init__
   self._init_socket()
   File "/usr/local/lib/python2.7/dist-packages/pyre/zbeacon.py", line 149, in _init_socket
   if ipaddress.IPv4Address(self.announce_addr).is_multicast:
   File "/usr/local/lib/python2.7/dist-packages/ipaddress.py", line 1269, in __init__
   self._check_packed_address(address, 4)
   File "/usr/local/lib/python2.7/dist-packages/ipaddress.py", line 526, in _check_packed_address
   expected_len, self._version))
AddressValueError: '255.255.255.255' (len 15 != 4) is not permitted as an IPv4 address (did you pass in a bytes instead of a unicode object?)
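The error message hints that a byte string reached ipaddress.IPv4Address, which treats bytes as a packed 4-byte address. A minimal sketch of a guard, written for Python 3 (on Python 2 a plain string literal is already bytes), decodes first. `is_multicast_address` is a hypothetical helper name, not part of Pyre.

```python
import ipaddress


def is_multicast_address(addr):
    """Return True if addr (text or ASCII bytes) is an IPv4 multicast address."""
    if isinstance(addr, bytes):
        # ipaddress treats bytes as a packed 4-byte address, so decode first
        addr = addr.decode("ascii")
    return ipaddress.IPv4Address(addr).is_multicast
```

With this guard the broadcast address from the traceback is accepted in either form and reported as not multicast.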

prevent the sequence number from going beyond 65535

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/home/people/arnaud/src/pyre/pyre/pyre_peer.py", line 84, in send
    msg.send(self.mailbox)
  File "/home/people/arnaud/src/pyre/pyre/zre_msg.py", line 154, in send
    self._put_number2(self.sequence)
  File "/home/people/arnaud/src/pyre/pyre/zre_msg.py", line 379, in _put_number2
    d = struct.pack('>H', nr)
struct.error: 'H' format requires 0 <= number <= 65535

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.4/threading.py", line 868, in run
    self._target(*self._args, **self._kwargs)
  File "/home/people/arnaud/src/pyre/pyre/zactor.py", line 57, in run
    self.shim_handler(*self.shim_args, **self.shim_kwargs)
  File "/home/people/arnaud/src/pyre/pyre/pyre_node.py", line 52, in __init__
    self.run()
  File "/home/people/arnaud/src/pyre/pyre/pyre_node.py", line 501, in run
    self.recv_api()
  File "/home/people/arnaud/src/pyre/pyre/pyre_node.py", line 189, in recv_api
    self.peers[peer_id].send(msg)
  File "/home/people/arnaud/src/pyre/pyre/pyre_peer.py", line 85, in send
    except zmq.EAGAIN as e:
TypeError: catching classes that do not inherit from BaseException is not allowed

also fix the zmq.EAGAIN exception (it's not an exception)
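For the first traceback, one way to keep the counter inside the 2-byte field is to wrap it on increment. This is a sketch only: wrapping back to 0 is an assumption that mirrors unsigned 16-bit overflow, and `next_sequence` and `pack_sequence` are hypothetical names. The second traceback is a separate fix; the exception class pyzmq raises for this condition appears as zmq.error.Again in the "Resource temporarily unavailable" report.

```python
import struct

MAX_SEQUENCE = 0xFFFF  # largest value that fits the 2-byte '>H' field


def next_sequence(current):
    """Advance a message sequence number, wrapping so it always fits in 2 bytes."""
    return (current + 1) & MAX_SEQUENCE


def pack_sequence(sequence):
    """Pack the sequence as an unsigned 16-bit big-endian integer."""
    return struct.pack(">H", sequence)
```

Both peers would need to agree on the wrap-around behaviour for sequence checking to keep working.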

Pyre.set_port() doesn't seem to work

At first I thought it was just a typo in that the argument list of set_port (around line 95 in pyre.py) refers to port_nbt, but the call to self.actor.send uses port_nbr. But even when I switched port_nbt to port_nbr, I got an error:

from pyre import Pyre
import zmq
p = Pyre()
p.set_port(5671)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/scratch/tmp/janin/virtualenv/python33.pyre/lib/python3.3/site-packages/pyre/pyre.py", line 100, in set_port
self.actor.send(port_nbr)
File "/scratch/tmp/janin/virtualenv/python33.pyre/lib/python3.3/site-packages/pyre/zactor.py", line 78, in send
return self.pipe.send(*args, **kwargs)
File "zmq/backend/cython/socket.pyx", line 581, in zmq.backend.cython.socket.Socket.send (zmq/backend/cython/socket.c:5447)
File "zmq/backend/cython/socket.pyx", line 628, in zmq.backend.cython.socket.Socket.send (zmq/backend/cython/socket.c:5209)
File "zmq/backend/cython/socket.pyx", line 168, in zmq.backend.cython.socket._send_copy (zmq/backend/cython/socket.c:1959)
File "zmq/utils/buffers.pxd", line 200, in buffers.asbuffer_r (zmq/backend/cython/socket.c:7257)
File "zmq/utils/buffers.pxd", line 151, in buffers.asbuffer (zmq/backend/cython/socket.c:6686)
TypeError: 5671 does not provide a buffer interface.

This isn't urgent, but I thought I'd pass it on...

Adam Janin

ZActor has no send_multipart method

Attempting to send a list of strings via Pyre.whisper or Pyre.shout raises an exception because the ZActor.send_multipart method does not exist.

File "/usr/lib64/python2.7/site-packages/pyre/pyre.py", line 117, in whisper
  self.actor.send_multipart(msg)
AttributeError: 'ZActor' object has no attribute 'send_multipart'

package broken due to last change

from pyre.zactor import ZActor
File "/Users/.../development/work/.venv/lib/python2.7/site-packages/pyre/__init__.py", line 6, in <module>
with open(version_file_path) as version_file:
IOError: [Errno 2] No such file or directory: '/Users/.../development/work/.venv/lib/python2.7/site-packages/pyre/VERSION'

Poorly serialized peer info

After fixing #33, Pyre.get_peers() returns the following:

>>> n.get_peers()
[b'p', b"{UUID('84320b72-d244-460d-b416-b3f8e2d413ef'): <pyre.pyre_peer.PyrePeer object at 0x7ffff0d6e890>}"]

The uuid string can be extracted and used, but additional peer details are unavailable because the peer object cannot be deserialized. Better serialization or a different format should be used instead.
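As an illustration of "a different format", peer details could be reduced to plain data and sent as JSON. The sketch below is not how Pyre serializes peers; the function names and the `name` and `endpoint` fields are hypothetical.

```python
import json
import uuid


def serialize_peers(peers):
    """Encode {UUID: info_dict} as JSON bytes; only plain data is kept."""
    return json.dumps({str(pid): info for pid, info in peers.items()}).encode("utf-8")


def deserialize_peers(data):
    """Inverse of serialize_peers: returns {UUID: info_dict}."""
    return {uuid.UUID(k): v for k, v in json.loads(data.decode("utf-8")).items()}
```

Because the payload is a single bytes object, it fits in one message frame and can be decoded on the other side of the pipe without access to the peer objects.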

Obtaining Peers in Group

Hello

Thanks for the Python Port - I'm using it in a University project at the moment.
I would like to obtain a list of the current peers which are members of a specific group.

Pyre provides Pyre.peers() which seems to be an implementation of Zyre zyre_peers() (see below or https://github.com/zeromq/zyre#toc3-188) and according to my tests provides all peers.

Is there an implementation of Zyre zyre_peers_by_group() (see below or https://github.com/zeromq/zyre#toc3-188) which returns only peers which belong to this group?

If this is not implemented, could anyone recommend a way of achieving this?

Grateful for any assistance.

//  *** Draft method, for development use, may change without warning ***
    //  Return zlist of current peer ids.
    //  Caller owns return value and must destroy it when done.
    ZYRE_EXPORT zlist_t *
        zyre_peers (zyre_t *self);

    //  *** Draft method, for development use, may change without warning ***
    //  Return zlist of current peers of this group.
    //  Caller owns return value and must destroy it when done.
    ZYRE_EXPORT zlist_t *
        zyre_peers_by_group (zyre_t *self, const char *name);
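If no such method is available, one workaround is to track membership on the application side from the JOIN, LEAVE and EXIT events the node delivers. The class below is a sketch under the assumption that those events carry the frames [type, peer UUID bytes, peer name, group]; `GroupTracker` is a hypothetical name and not part of Pyre.

```python
import uuid
from collections import defaultdict


class GroupTracker:
    """Track group membership from JOIN, LEAVE and EXIT events."""

    def __init__(self):
        self._groups = defaultdict(set)

    def handle(self, frames):
        # Assumed frame layout: [type, peer_uuid_bytes, peer_name, group?]
        msg_type = frames[0].decode("utf-8")
        peer = uuid.UUID(bytes=frames[1])
        if msg_type == "JOIN":
            self._groups[frames[3].decode("utf-8")].add(peer)
        elif msg_type == "LEAVE":
            self._groups[frames[3].decode("utf-8")].discard(peer)
        elif msg_type == "EXIT":
            # A peer that left the network is removed from every group
            for members in self._groups.values():
                members.discard(peer)

    def peers_by_group(self, name):
        """Return a copy of the set of peer UUIDs currently in the group."""
        return set(self._groups.get(name, ()))
```

Feeding every received frame list to `handle()` keeps the tracker current. It only knows about peers that joined after the tracker started listening.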

actor can deadlock

Don't know how yet:

  File "/home/people/arnaud/src/pyre/pyre/pyre.py", line 48, in get_uuid
    self.uuid = uuid.UUID(bytes=self.actor.recv())
  File "/home/people/arnaud/src/pyre/pyre/zactor.py", line 87, in recv
    return self.pipe.recv(*args, **kwargs)

Windows: "ImportError: cannot import name inet_ntop" in zhelper.py, line 42

File "c:\python33\lib\site-packages\zocp\__init__.py", line 3, in <module>
from .zocp import ZOCP
File "c:\python33\lib\site-packages\zocp\zocp.py", line 18, in <module>
from pyre import Pyre
File "c:\python33\lib\site-packages\pyre\__init__.py", line 3, in <module>
from .pyre import Pyre
File "c:\python33\lib\site-packages\pyre\pyre.py", line 9, in <module>
from . import zbeacon
File "c:\python33\lib\site-packages\pyre\zbeacon.py", line 31, in <module>
from . import zhelper
File "c:\python33\lib\site-packages\pyre\zhelper.py", line 42, in <module>
from socket import AF_INET, AF_INET6, inet_ntop
ImportError: cannot import name inet_ntop
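The traceback suggests socket.inet_ntop was not available in that Windows build of Python 3.3. A possible fallback, sketched here with the standard ipaddress module, converts packed addresses without it. `portable_inet_ntop` is a hypothetical name; this is not the fix applied in Pyre.

```python
import ipaddress
import socket


def portable_inet_ntop(family, packed):
    """Convert a packed IPv4/IPv6 address to text without socket.inet_ntop."""
    expected = {socket.AF_INET: 4, socket.AF_INET6: 16}.get(family)
    if expected is None:
        raise ValueError("unsupported address family: %r" % (family,))
    if len(packed) != expected:
        raise ValueError("packed address must be %d bytes" % expected)
    # ipaddress accepts packed bytes of length 4 (IPv4) or 16 (IPv6)
    return str(ipaddress.ip_address(packed))
```

A module could try `from socket import inet_ntop` first and fall back to this function on ImportError.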

Missing statement on supported Python version(s)

When trying to run the "Example Chat Client" in the README.md file, I found out that it seems to work only with Python 3, while I'm getting a traceback for Python 2:

Traceback (most recent call last):
  File "pyre_chat.py", line 1, in <module>
    from pyre import Pyre 
  File "/opt/lib/python2.7/site-packages/pyre/__init__.py", line 3, in <module>
    from .pyre import Pyre
  File "/opt/lib/python2.7/site-packages/pyre/pyre.py", line 9, in <module>
    from . import zbeacon
  File "/opt/lib/python2.7/site-packages/pyre/zbeacon.py", line 28, in <module>
    import ipaddress
ImportError: No module named ipaddress

It would be helpful to state somewhere in the README.md which Python versions are supported, and even better, make it run on Python 2 as well.

beacon is dead after network error

I have a scenario in which there is a network problem during beacon setup, and I get the following exception inside ZBeacon:

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/pyre/zbeacon.py", line 118, in prepare_udp
    socket.IP_ADD_MEMBERSHIP, mreq)
  File "/usr/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 19] No such device

The problem is that the caller cannot know it happened, as the exception was triggered inside ZActor, which has its own thread (using the threading module).

Incompatibility between Pyre and Zyre

I am working on a project in which I need one Pyre node and many Zyre nodes in a group. I created the nodes and tested Pyre node to Pyre node and Zyre node to Zyre node, and both work fine, but there is an incompatibility between Pyre and Zyre. As soon as a Zyre node joins the group, all the Pyre nodes in the group hit an error and die. The error is:

Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
self.run()
File "/usr/lib/python3.5/threading.py", line 862, in run
self._target(*self._args, **self._kwargs)
File "/home/ali/.local/lib/python3.5/site-packages/pyre/zactor.py", line 57, in run
self.shim_handler(*self.shim_args, **self.shim_kwargs)
File "/home/ali/.local/lib/python3.5/site-packages/pyre/pyre_node.py", line 53, in __init__
self.run()
File "/home/ali/.local/lib/python3.5/site-packages/pyre/pyre_node.py", line 519, in run
self.recv_beacon()
File "/home/ali/.local/lib/python3.5/site-packages/pyre/pyre_node.py", line 454, in recv_beacon
beacon = struct.unpack('cccb16sH', frame)
struct.error: unpack requires a bytes object of length 22

I tried to find a solution for this but I couldn't find anything online. Does anybody have any idea how I can solve this?
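The error says the received frame was not 22 bytes long; the report does not show what the Zyre node actually sent. As a defensive sketch, the length and signature can be checked before unpacking so an unexpected datagram is skipped instead of killing the node thread. The field layout follows the format string in the traceback; network byte order for the port and the name `parse_beacon` are assumptions.

```python
import struct
import uuid

# "ZRE" signature, 1-byte version, 16-byte UUID, 2-byte port (network order)
BEACON = struct.Struct("!3sB16sH")


def parse_beacon(frame):
    """Return (version, peer_uuid, port), or None if frame is not a ZRE beacon."""
    if len(frame) != BEACON.size:
        return None  # some other datagram, or a different beacon layout; skip it
    signature, version, peer_id, port = BEACON.unpack(frame)
    if signature != b"ZRE":
        return None
    return version, uuid.UUID(bytes=peer_id), port
```

This does not explain the incompatibility, but logging the rejected frames would show what the Zyre nodes are broadcasting.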

Getting Pyre in sync with Zyre

As Pyre is a bit behind Zyre development, quite a few changes are needed to bring it up to date. I have a Pyre version which is up to date in my own repo. I would like to merge it, but before doing so I hope to get some feedback from running it on other platforms.

Please test pyre by:

pip install https://github.com/sphaero/pyre/archive/master.zip

Or cloning the repo

Then use an example like this:

from pyre import Pyre 
from pyre import zhelper 
import zmq 
import uuid
import logging

def chat_task(ctx, pipe):
    n = Pyre(ctx)
    n.join("CHAT")

    poller = zmq.Poller()
    poller.register(pipe, zmq.POLLIN)
    poller.register(n.get_socket(), zmq.POLLIN)
    while(True):
        items = dict(poller.poll())
        if pipe in items and items[pipe] == zmq.POLLIN:
            message = pipe.recv()
            # message to quit
            if message.decode('utf-8') == "$$STOP":
                break
            print("CHAT_TASK: %s" % message)
            n.shout("CHAT", message)
        if n.get_socket() in items and items[n.get_socket()] == zmq.POLLIN:
            cmds = n.get_socket().recv_multipart()
            type = cmds.pop(0)
            print("NODE_MSG TYPE: %s" % type)
            print("NODE_MSG PEER: %s" % uuid.UUID(bytes=cmds.pop(0)))
            print("NODE_MSG NAME: %s" % cmds.pop(0))
            if type.decode('utf-8') == "SHOUT":
                print("NODE_MSG GROUP: %s" % cmds.pop(0))
            print("NODE_MSG CONT: %s" % cmds)
    n.stop()


if __name__ == '__main__':
    # Create a StreamHandler for debugging
    logger = logging.getLogger("pyre")
    logger.setLevel(logging.DEBUG)
    logger.addHandler(logging.StreamHandler())
    logger.propagate = False

    ctx = zmq.Context()
    chat_pipe = zhelper.zthread_fork(ctx, chat_task)
    while True:
        try:
            # In python2 you'll want raw_input() instead of input()
            msg = input()
            chat_pipe.send(msg.encode('utf_8'))
        except (KeyboardInterrupt, SystemExit):
            break
    chat_pipe.send("$$STOP".encode('utf_8'))
    print("FINISHED")

This example also works with the chat example from zyre.

What is still missing is Gossip support and some methods for headers.
I've also implemented a ZActor class hoping to solve some issues with threading

I've tested on:

  • Linux x86_64, zmq 4.0.4, pyzmq 13.1.0, python 3.3
  • Linux x86_64, zmq 4.0.6, pyzmq 14.4.1, python 3.4
  • Linux i686, zmq 4.0.5, pyzmq 14.4.1, python 2.7
  • Linux i686, zmq 4.0.5, pyzmq 14.4.1, python 3.2
  • Win 7, zmq 4.0.4, pyzmq 14.2.0, python3.4

"TypeError: UUID does not provide buffer interface" when calling get_uuid()

When running:

import pyre
n = pyre.Pyre()
n.get_uuid()

I get the following error:

Traceback (most recent call last):
  File "/usr/lib64/python3.3/threading.py", line 901, in _bootstrap_inner
    self.run()
  File "/usr/lib64/python3.3/threading.py", line 858, in run
    self._target(*self._args, **self._kwargs)
  File "./pyre/zactor.py", line 57, in run
    self.shim_handler(*self.shim_args, **self.shim_kwargs)
  File "./pyre/pyre_node.py", line 50, in __init__
    self.run()
  File "./pyre/pyre_node.py", line 479, in run
    self.recv_api()
  File "./pyre/pyre_node.py", line 130, in recv_api
    self._pipe.send(self.identity)
  File "zmq/backend/cython/socket.pyx", line 574, in zmq.backend.cython.socket.Socket.send (zmq/backend/cython/socket.c:5437)
  File "zmq/backend/cython/socket.pyx", line 621, in zmq.backend.cython.socket.Socket.send (zmq/backend/cython/socket.c:5199)
  File "zmq/backend/cython/socket.pyx", line 168, in zmq.backend.cython.socket._send_copy (zmq/backend/cython/socket.c:1949)
  File "zmq/utils/buffers.pxd", line 200, in buffers.asbuffer_r (zmq/backend/cython/socket.c:7247)
  File "zmq/utils/buffers.pxd", line 151, in buffers.asbuffer (zmq/backend/cython/socket.c:6676)
TypeError: UUID('8cd6853b-bad0-4037-8601-64affca1a37c') does not provide a buffer interface.

zbeacon.py binds to link-local interface

Hi, I've been trying out Pyre for a work project. On my windows desktop dev environment, I notice that during the zbeacon.py/_prepare_socket method, it will bind to any link-local interface on my system that is not disabled, such as secondary NIC or VPN. When this happens, pyre cannot send or receive any messages. Is link-local binding useful in a way I'm not considering? If not, I have a patch to skip over link-local during interface selection.
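The kind of filter described could look roughly like the sketch below, which uses the standard ipaddress module to drop link-local candidates (169.254.0.0/16 for IPv4, fe80::/10 for IPv6). `usable_addresses` is a hypothetical name; this is not the patch mentioned above.

```python
import ipaddress


def usable_addresses(addresses):
    """Filter out link-local addresses, keeping the original order."""
    return [a for a in addresses if not ipaddress.ip_address(a).is_link_local]
```

Interface selection would then pick from the filtered list, falling back to the full list only if nothing else is left.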

Problem: Wrong UUID prefix

The ZRE Interconnection Model says that the DEALER and ROUTER sockets' identity should be a byte containing 1, followed by 16-octet UUID.

pyre_peer.py#L50:

self.mailbox.setsockopt(zmq.IDENTITY, b'1' + reply_to.bytes)

Problem: b'1' translates into 0x31 and not 0x1.
This is the ascii code of '1' not a decimal value of 1.

Solution: Correct the prefix.
Question: What would be the best way to do that?
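One straightforward option is a bytes literal with an escaped value, so the first octet is the number 1 rather than the character '1'. A minimal sketch, with `zre_identity` as a hypothetical helper name:

```python
import uuid


def zre_identity(node_uuid):
    """Build a socket identity: one byte with value 1, then the 16-byte UUID."""
    return b"\x01" + node_uuid.bytes
```

The result would be passed to setsockopt(zmq.IDENTITY, ...) in place of the current `b'1' + reply_to.bytes` expression.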

Thanks to @saki4510t for catching that.

Pyre with gevent

Hi

I'm trying to use Pyre with bottle and gevent but I got some exceptions:

>> python3 test.py
Init green socket Thread-1
----->SETUP EVENTS Thread-1
READ is not NONE
WRITE is not NONE
Init green socket Thread-1
----->SETUP EVENTS Thread-1
READ is not NONE
WRITE is not NONE
Init zsocket Thread-2
Init zsocket Thread-2
Init zsocket Thread-2
Init zsocket Thread-2
Init green socket Thread-3
----->SETUP EVENTS Thread-3
READ is not NONE
WRITE is not NONE
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.4/threading.py", line 868, in run
    self._target(*self._args, **self._kwargs)
  File "test.py", line 41, in __pyre_task
    node = Pyre('TEST')
  File "/usr/local/lib/python3.4/dist-packages/pyre/pyre.py", line 52, in __init__
    self.actor = ZActor(self._ctx, PyreNode, self._outbox)
  File "/usr/local/lib/python3.4/dist-packages/pyre/zactor.py", line 55, in __init__
    self.pipe.wait()
  File "/usr/local/lib/python3.4/dist-packages/pyre/zsocket.py", line 62, in wait
    msg = self.recv()
  File "/usr/local/lib/python3.4/dist-packages/zmq/green/core.py", line 261, in recv
    self._wait_read()
  File "/usr/local/lib/python3.4/dist-packages/zmq/green/core.py", line 170, in _wait_read
    assert self.__readable.ready(), "Only one greenlet can be waiting on this event"
AttributeError: 'NoneType' object has no attribute 'ready'
Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.4/threading.py", line 868, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.4/dist-packages/pyre/zactor.py", line 58, in run
    self.shim_handler(*self.shim_args, **self.shim_kwargs)
  File "/usr/local/lib/python3.4/dist-packages/pyre/pyre_node.py", line 54, in __init__
    self.run()
  File "/usr/local/lib/python3.4/dist-packages/pyre/pyre_node.py", line 507, in run
    self._pipe.signal()
  File "/usr/local/lib/python3.4/dist-packages/pyre/zsocket.py", line 54, in signal
    self.send(struct.pack("Q", signal_value))
  File "/usr/local/lib/python3.4/dist-packages/zmq/green/core.py", line 229, in send
    self.__state_changed()
  File "/usr/local/lib/python3.4/dist-packages/zmq/green/core.py", line 131, in __state_changed
    self.__writable.set()
AttributeError: 'NoneType' object has no attribute 'set'


----->CLEANUP EVENTS Thread-2
Exception ignored in: <bound method ZSocket.__del__ of <pyre.zsocket.ZSocket object at 0x7f3978b0e288>>
Traceback (most recent call last):
  File "/usr/local/lib/python3.4/dist-packages/zmq/green/core.py", line 78, in __del__
    self.close()
  File "/usr/local/lib/python3.4/dist-packages/zmq/green/core.py", line 82, in close
    self.__cleanup_events()
  File "/usr/local/lib/python3.4/dist-packages/zmq/green/core.py", line 91, in __cleanup_events
    self.__writable.set()
AttributeError: 'NoneType' object has no attribute 'set'
----->CLEANUP EVENTS Thread-2
Exception ignored in: <bound method ZSocket.__del__ of <pyre.zsocket.ZSocket object at 0x7f3978b0e228>>
Traceback (most recent call last):
  File "/usr/local/lib/python3.4/dist-packages/zmq/green/core.py", line 78, in __del__
    self.close()
  File "/usr/local/lib/python3.4/dist-packages/zmq/green/core.py", line 82, in close
    self.__cleanup_events()
  File "/usr/local/lib/python3.4/dist-packages/zmq/green/core.py", line 91, in __cleanup_events
    self.__writable.set()
AttributeError: 'NoneType' object has no attribute 'set'
----->CLEANUP EVENTS Thread-2
Exception ignored in: <bound method ZSocket.__del__ of <pyre.zsocket.ZSocket object at 0x7f3978ba5768>>
Traceback (most recent call last):
  File "/usr/local/lib/python3.4/dist-packages/zmq/green/core.py", line 78, in __del__
    self.close()
  File "/usr/local/lib/python3.4/dist-packages/zmq/green/core.py", line 82, in close
    self.__cleanup_events()
  File "/usr/local/lib/python3.4/dist-packages/zmq/green/core.py", line 91, in __cleanup_events
    self.__writable.set()
AttributeError: 'NoneType' object has no attribute 'set'
----->CLEANUP EVENTS Thread-2
Exception ignored in: <bound method ZSocket.__del__ of <pyre.zsocket.ZSocket object at 0x7f3978b0e1c8>>
Traceback (most recent call last):
  File "/usr/local/lib/python3.4/dist-packages/zmq/green/core.py", line 78, in __del__
    self.close()
  File "/usr/local/lib/python3.4/dist-packages/zmq/green/core.py", line 82, in close
    self.__cleanup_events()
  File "/usr/local/lib/python3.4/dist-packages/zmq/green/core.py", line 91, in __cleanup_events
    self.__writable.set()
AttributeError: 'NoneType' object has no attribute 'set'
^C2017-09-11 12:09:10,089 root.<module> +120: ERROR    [19136] Exception:
Traceback (most recent call last):
  File "test.py", line 118, in <module>
    time.sleep(1.0)
KeyboardInterrupt
END

As you can see, I added some logging to understand what is happening.
I replaced every "import zmq" I found in the Pyre sources, as explained here, but it seems Pyre does not correctly call the green socket version, and I don't understand why.

The second odd point is that even though Thread-3 appears to call the green socket, self.__writable is None when it tries to use it...

Maybe some expert eyes can help me fix it 😃

I attached my test source, but make sure to replace every "import zmq" with "import zmq.green as zmq" in all Pyre source files to reproduce the issue.
test.txt

It seems that checking whether self.__writable or self.__readable is None (and calling __setup_events if so) before using them in zmq/green/core.py fixes the problem, but there is surely a better fix directly in the Pyre module.

Thank you for your help

"Socket has no such option: SET" with python 3.2

While testing pyZOCP on Python 3.2, I got this error.

python3 simple_node.py
Traceback (most recent call last):
  File "socket.pyx", line 412, in zmq.core.socket.Socket.__getattr__ (zmq/core/socket.c:4018)
AttributeError: 'module' object has no attribute 'SET'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "simple_node.py", line 6, in <module>
    z = ZOCP()
  File "/home/cubie/z25/pyZOCP/src/zocp.py", line 79, in __init__
    super().__init__(*args, **kwargs)
  File "/home/cubie/z25/pyre/pyre.py", line 24, in __init__
    self._pipe = zhelper.zthread_fork(self._ctx, PyreNode)
  File "/home/cubie/z25/pyre/zhelper.py", line 13, in zthread_fork
    a.set(zmq.LINGER, 0)
  File "socket.pyx", line 414, in zmq.core.socket.Socket.__getattr__ (zmq/core/socket.c:4091)
AttributeError: Socket has no such option: SET

This happens with pyzmq version 2.2.0 (which I installed from the Debian package).
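One possible workaround, sketched under the assumption that the installed pyzmq simply lacks the set() alias (as the traceback suggests): setsockopt() is the long-standing pyzmq method for setting options, so a small helper can call it instead. The StubSocket class and the LINGER constant below are stand-ins for illustration so the sketch runs without pyzmq installed; with a real socket one would pass zmq.LINGER.

```python
LINGER = 17  # stand-in for zmq.LINGER so this sketch runs without pyzmq


class StubSocket:
    """Hypothetical stand-in for an old pyzmq socket: setsockopt() but no set()."""

    def __init__(self):
        self.options = {}

    def setsockopt(self, option, value):
        # Record the option so the example can show what was set.
        self.options[option] = value


def set_option(sock, option, value):
    """Set a socket option through setsockopt() rather than the set() alias."""
    sock.setsockopt(option, value)


sock = StubSocket()
set_option(sock, LINGER, 0)
```

In zhelper.py this would amount to writing a.setsockopt(zmq.LINGER, 0) where the traceback shows a.set(zmq.LINGER, 0).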

exception on leave

$ python3
Python 3.4.2 (default, Oct  8 2014, 13:08:17) 
[GCC 4.9.1] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyre
>>> node = pyre.Pyre()
>>> node.join("TEST")
>>> node.shout("TEST", "TEST".encode('utf-8'))
>>> Group TEST not found.
node.shoujoin("TEST1")
>>> node.leave("TEST1")
>>> Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.4/threading.py", line 868, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/src/zmq/pyre/pyre.py", line 124, in __init__
    self.run()
  File "/usr/src/zmq/pyre/pyre.py", line 383, in run
    self.recv_api()
  File "/usr/src/zmq/pyre/pyre.py", line 255, in recv_api
    peer.send(msg)
AttributeError: 'UUID' object has no attribute 'send'

Modifiable Reap interval?

Can the reap_interval be exposed through the API so that it can be changed according to network conditions?
If it's a fairly stable network, is pinging each peer every second required?

Is Pyre working on ARM architecture?

I'm trying to run a Pyre-based application on ARM (Jetson TX1) and I'm getting the following error. The same code runs fine on an x64 processor. Does anybody have an idea what is causing this?

Traceback (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "/home/ubuntu/.virtualenvs/sidas/lib/python3.5/site-packages/pyre/zactor.py", line 57, in run
    self.shim_handler(*self.shim_args, **self.shim_kwargs)
  File "/home/ubuntu/.virtualenvs/sidas/lib/python3.5/site-packages/pyre/zbeacon.py", line 64, in __init__
    self.run()
  File "/home/ubuntu/.virtualenvs/sidas/lib/python3.5/site-packages/pyre/zbeacon.py", line 287, in run
    self.handle_pipe()
  File "/home/ubuntu/.virtualenvs/sidas/lib/python3.5/site-packages/pyre/zbeacon.py", line 216, in handle_pipe
    self.configure(port)
  File "/home/ubuntu/.virtualenvs/sidas/lib/python3.5/site-packages/pyre/zbeacon.py", line 199, in configure
    self.prepare_udp()
  File "/home/ubuntu/.virtualenvs/sidas/lib/python3.5/site-packages/pyre/zbeacon.py", line 71, in prepare_udp
    self._prepare_socket()
  File "/home/ubuntu/.virtualenvs/sidas/lib/python3.5/site-packages/pyre/zbeacon.py", line 135, in _prepare_socket
    netinf = zhelper.get_ifaddrs()
  File "/home/ubuntu/.virtualenvs/sidas/lib/python3.5/site-packages/pyre/zhelper.py", line 291, in get_ifaddrs
    total += si.sll_addr[i]
IndexError: invalid index

struct.error: argument out of range

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.4/threading.py", line 868, in run
    self._target(*self._args, **self._kwargs)
  File "/home/arnaud/src/pyre/pyre/zactor.py", line 59, in run
    self.shim_pipe.signal()
  File "/home/arnaud/src/pyre/pyre/zsocket.py", line 51, in signal
    self.send(struct.pack("L", signal_value))
struct.error: argument out of range

This happens on a 32-bit x86 machine; "Q" might be needed instead of "L".
There are no problems on 64-bit, though.
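The size difference can be checked directly with the struct module. In the sketch below, the "=" prefix forces standard sizes (4 bytes for "L", 8 bytes for "Q") so the 32-bit behaviour shows up on any machine. SIGNAL_VALUE is a hypothetical 64-bit value chosen for illustration, not necessarily the one Pyre sends.

```python
import struct

# Hypothetical signal value that needs more than 32 bits.
SIGNAL_VALUE = 0x7766554433221100

# With a 4-byte unsigned long (as on 32-bit x86), packing fails.
try:
    struct.pack("=L", SIGNAL_VALUE)
    fits_in_32_bits = True
except struct.error:
    fits_in_32_bits = False

# An unsigned long long is 8 bytes, so the same value packs cleanly.
packed = struct.pack("=Q", SIGNAL_VALUE)
unpacked = struct.unpack("=Q", packed)[0]
```

Without the "=" prefix, "L" uses the platform's native size, which would explain why the error appears only on 32-bit machines.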

Networking error while running under Docker

I'm not sure what exactly the problem is, or if it is even Pyre's fault, but I'm getting the following exception when I run a beacon inside a Docker container. I'm guessing it has something to do with UDP broadcast support.

Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/site-packages/pyre/zactor.py", line 57, in run
    self.shim_handler(*self.shim_args, **self.shim_kwargs)
  File "/usr/lib/python2.7/site-packages/pyre/zbeacon.py", line 64, in __init__
    self.run()
  File "/usr/lib/python2.7/site-packages/pyre/zbeacon.py", line 283, in run
    self.handle_pipe()
  File "/usr/lib/python2.7/site-packages/pyre/zbeacon.py", line 212, in handle_pipe
    self.configure(port)
  File "/usr/lib/python2.7/site-packages/pyre/zbeacon.py", line 195, in configure
    self.prepare_udp()
  File "/usr/lib/python2.7/site-packages/pyre/zbeacon.py", line 71, in prepare_udp
    self._prepare_socket()
  File "/usr/lib/python2.7/site-packages/pyre/zbeacon.py", line 135, in _prepare_socket
    netinf = zhelper.get_ifaddrs()
  File "/usr/lib/python2.7/site-packages/pyre/zhelper.py", line 268, in get_ifaddrs
    si = sockaddr_in.from_address(ifa.ifa_ifu.ifu_broadaddr)
TypeError: integer expected

Import error from zbeacon.py when importing pyre

Hello,

When installing pyre with either "pip install https://github.com/zeromq/pyre/archive/master.zip" as per README.md or by git cloning the repo and doing "pip install -e ." I got import errors:

The first one was:

>>> import pyre
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "pyre/__init__.py", line 3, in <module>
    from .pyre import Pyre
  File "pyre/pyre.py", line 10, in <module>
    from . import zbeacon
  File "pyre/zbeacon.py", line 32, in <module>
    from pyre.zactor import ZActor
ImportError: No module named zactor

I fixed it by doing "sed -i "s/pyre.zactor/.zactor/" pyre/zbeacon.py".

Then I got that one:

>>> import pyre
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "pyre/__init__.py", line 3, in <module>
    from .pyre import Pyre
  File "pyre/pyre.py", line 10, in <module>
    from . import zbeacon
  File "pyre/zbeacon.py", line 33, in <module>
    from pyre import zhelper
ImportError: cannot import name zhelper

I fixed it by doing "sed -i "s/from pyre import zhelper/from . import zhelper/" pyre/zbeacon.py".

After that, import pyre did not raise any further import errors.

Cheers,
Axel
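For readers without sed, the two substitutions above can be sketched in Python as plain string replacements on the module source. The function name to_relative_imports is made up for this illustration, and the sketch only covers the two import lines quoted in the tracebacks.

```python
def to_relative_imports(source):
    """Rewrite the two absolute imports from the tracebacks as relative ones."""
    source = source.replace("from pyre.zactor import", "from .zactor import")
    source = source.replace("from pyre import zhelper", "from . import zhelper")
    return source


original = "from pyre.zactor import ZActor\nfrom pyre import zhelper\n"
fixed = to_relative_imports(original)
```

Applied to the two lines from zbeacon.py, this yields "from .zactor import ZActor" and "from . import zhelper".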

No headers in ENTER msg

I noticed this is a TODO with question marks in pyre_node.py. Is anybody working on it?

Since I need it for a project, I made a fix using JSON to serialise the header dict. I am happy to send a pull request if you are interested.
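A minimal sketch of what JSON serialisation of the header dict could look like, assuming string keys and values and a UTF-8 encoded frame. The helper names pack_headers and unpack_headers are hypothetical and are not taken from pyre_node.py.

```python
import json


def pack_headers(headers):
    """Serialise a header dict into a single bytes frame."""
    return json.dumps(headers).encode("utf-8")


def unpack_headers(frame):
    """Recover the header dict from a frame produced by pack_headers()."""
    return json.loads(frame.decode("utf-8"))


frame = pack_headers({"name": "node-1", "role": "sensor"})
headers = unpack_headers(frame)
```

The receiving side then gets back an ordinary dict, which matches the json.loads() unpacking that the README describes for ENTER messages.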

support for multicast

Hi!

Any plans to support multicast? The environment I'm working in requires discovery across subnets.

Thanks!

Error handling for Network Unreachable

If the network goes down for some reason, zbeacon produces the following error message:

Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/site-packages/pyre/zbeacon.py", line 204, in __init__
    self.run()
  File "/usr/lib64/python2.7/site-packages/pyre/zbeacon.py", line 398, in run
    self.send()
  File "/usr/lib64/python2.7/site-packages/pyre/zbeacon.py", line 338, in send
    self._udp_sock.sendto(self.transmit, (str(self.announce_address), self._port))
error: [Errno 101] Network is unreachable

What is the proper way to provide error handling for this situation? I'd like to implement something like sleep + retry, but I could imagine others wanting to provide a different handler.
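One way the sleep + retry idea could be sketched, with a pluggable handler for callers who want different behaviour. This is not part of zbeacon; send_with_retry, its parameters, and the on_error callback are all hypothetical names. It is written for Python 3, where socket errors are OSError.

```python
import errno
import time


def send_with_retry(send, retries=3, delay=1.0, on_error=None):
    """Call send(); on ENETUNREACH, wait and retry up to `retries` more times."""
    attempt = 0
    while True:
        try:
            return send()
        except OSError as exc:
            # Re-raise unrelated errors, or give up once retries are exhausted.
            if exc.errno != errno.ENETUNREACH or attempt >= retries:
                raise
            attempt += 1
            if on_error is not None:
                on_error(exc, attempt)  # let the caller log or react
            time.sleep(delay)
```

A beacon loop could then wrap its UDP send, for example send_with_retry(lambda: sock.sendto(data, address)), where sock, data and address stand for whatever the caller already has.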

Exception when network connection is gone

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python3.3/threading.py", line 637, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.3/threading.py", line 594, in run
    self._target(*self._args, **self._kwargs)
  File "/home/arnaud/Documents/sphaero/zmq-test/pyzyre/zbeacon.py", line 171, in __init__
    self.run()
  File "/home/arnaud/Documents/sphaero/zmq-test/pyzyre/zbeacon.py", line 262, in run
    self.send()
  File "/home/arnaud/Documents/sphaero/zmq-test/pyzyre/zbeacon.py", line 213, in send
    self._udp_sock.sendto(self.transmit, (self._dstAddr, self._port))
OSError: [Errno 101] Network is unreachable

Invalid endpoint

Hi

When an application using Pyre is started with no network connection, the beacon values are set to the loopback address 127.0.0.1 with no MAC address.
When the Ethernet cable is plugged in, the node connects properly to the Pyre network (I saw the ENTER action on other Pyre nodes), but the address of the freshly connected node returned by the peer_address() function is still 127.0.0.1.
It appears the hostname is sent only once at node startup and is not refreshed later.
Can you tell me the proper way to implement a hostname update (and consequently an endpoint update)?

Thank you very much 😄

Calling Pyre.stop() while Pyre.recv() is blocking?

Maybe my design choice was poor, but I have a thread that calls the recv() method and does stuff. When the program is done, I want to stop the nodes. But if I call Pyre.stop() from the main thread while the other thread is blocking on the recv() call, Python refuses to shut down the program.
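A common pattern for this situation is to avoid blocking forever: the receiving thread waits with a timeout and checks a stop flag between waits. The sketch below uses a queue.Queue as a stand-in for the node so that it runs on its own; with a real node, the timed wait would presumably be a zmq.Poller.poll(timeout) on the node's socket before calling recv(), which is an assumption about how Pyre would be wired in. The names receive_loop, inbox and handle are hypothetical.

```python
import queue
import threading


def receive_loop(inbox, stop_event, handle, timeout=0.1):
    """Handle messages until stop_event is set, never blocking longer than timeout."""
    while not stop_event.is_set():
        try:
            msg = inbox.get(timeout=timeout)  # timed wait instead of blocking recv()
        except queue.Empty:
            continue  # nothing arrived; loop around and re-check the stop flag
        handle(msg)


inbox = queue.Queue()
stop_event = threading.Event()
received = []
worker = threading.Thread(
    target=receive_loop, args=(inbox, stop_event, received.append, 0.01)
)
worker.start()
inbox.put("hello")
inbox.join() if False else None  # placeholder no-op; Queue.join() is not needed here
stop_event.set()   # ask the worker to finish
worker.join(2.0)   # returns promptly because the wait is timed
```

With this shape, the main thread sets the event, joins the worker, and only then stops the node, so no thread is left blocking when the program exits.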
