Giter Site home page Giter Site logo

robocluster's People

Contributors

chameleon07 avatar climathosphere avatar jpas avatar liambindle avatar ottopasuuna avatar

Stargazers

 avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar

Forkers

ottopasuuna

robocluster's Issues

Connection reset exception thrown when peer disconnects

Everything continues to work, but if a peer is shut down, the one that keeps running prints this exception:

Traceback (most recent call last):
  File "/home/carl/Programming/usst/robocluster/robocluster/looper.py", line 86, in _daemon_wrapper                                    
    await coro(*args, **kwargs)
  File "/home/carl/Programming/usst/robocluster/robocluster/member.py", line 238, in _recv_loop                                        
    size = await self._socket.recv(4)
  File "/home/carl/Programming/usst/robocluster/robocluster/net.py", line 80, in wrapper                                               
    result = func(*args, **kwargs)
ConnectionResetError: [Errno 104] Connection reset by peer

AttributeError: 'NoneType' object has no attribute 'recv'

'NoneType' object has no attribute 'recv'
Traceback (most recent call last):
  File "/home/pi/robocluster/robocluster/member.py", line 228, in _recv                                                                
    data = await self._socket.recv(size)
AttributeError: 'NoneType' object has no attribute 'recv'
Got remote stop DriveControl
'NoneType' object has no attribute 'recv'
Traceback (most recent call last):
  File "/home/pi/robocluster/robocluster/member.py", line 228, in _recv                                                                
    data = await self._socket.recv(size)
AttributeError: 'NoneType' object has no attribute 'recv'

Windows 10. "The requested address is not valid in its context"

Tried running device.py from the robocluster/examples repo. Installed setup.py and have anaconda3, to no avail. Not too sure what the problem is but it's most certainly because of some compatibility issue between Windows 10 & Linux(the OS for which this example was originally coded).

C:\Users\Tyrel\robocluster\examples>python device.py
Traceback (most recent call last):
  File "device.py", line 10, in <module>
    device_a = Device('device-a', group)
  File "C:\Users\Tyrel\Anaconda3\lib\site-packages\robocluster-0.1-py3.6.egg\robocluster\device.py", line 58, in __init__
    self._receiver.bind()
  File "C:\Users\Tyrel\Anaconda3\lib\site-packages\robocluster-0.1-py3.6.egg\robocluster\net.py", line 64, in bind
    self._socket.bind(self._address)
OSError: [WinError 10049] The requested address is not valid in its context

Connection reset by peer when a process shuts down

When I shut down the software on the rover, the JoystickProcess that runs on my machine throws this error:

Traceback (most recent call last):
  File "/home/carl/Programming/usst/robocluster/robocluster/looper.py", line 86, in _daemon_wrapper                                    
    await coro(*args, **kwargs)
  File "/home/carl/Programming/usst/robocluster/robocluster/member.py", line 238, in _recv_loop                                        
    size = await self._socket.recv(4)
  File "/home/carl/miniconda3/envs/robocluster2/lib/python3.5/asyncio/futures.py", line 381, in __iter__                               
    yield self  # This tells Task to wait for completion.
  File "/home/carl/miniconda3/envs/robocluster2/lib/python3.5/asyncio/tasks.py", line 310, in _wakeup                                  
    future.result()
  File "/home/carl/miniconda3/envs/robocluster2/lib/python3.5/asyncio/futures.py", line 294, in result                                 
    raise self._exception
  File "/home/carl/Programming/usst/robocluster/robocluster/net.py", line 80, in wrapper                                               
    result = func(*args, **kwargs)
ConnectionResetError: [Errno 104] Connection reset by peer

Autodetection of serial device identifiers

Currently SerialDevices are opened with a path to the device such as /dev/ttyACM0 or COM1. This is undescriptive, so we need a way to poll serial devices to ask what they are (Accelerometer, motor controller for a specific wheel, etc).

UnkownPeer exception thrown before Devices have a chance to talk to eachother

This is more of a convenience request.
If we have two processes:
Replier:

from robocluster import Device
dev = Device('replier', 'test')
@dev.on_request('topic')
def give_thing():
  return 'Hi'
dev.start()
dev.wait()

Requester:

from robocluster improt Device
dev = Device('requester', 'test')
@dev.every('1s')
async def get_thing():
   data = await dev.request('replier', 'topic')
   print(data)
dev.start()
dev.wait()

Running this as is results in an UnknownPeer getting raised.
Catching the exception allows things to run properly.

It would be slick if it could somehow wait for a round of heartbeats (or however this works now) before throwing the exception, to give devices on the network a chance to anounce themselves.

Weird error when restarting a process

I'm not sure what exactly causes this, but I think it happens when the robocluster-manager tries to restart a stopped process, and the other processes get confused becuase it creates a new Device with the same name.

Traceback (most recent call last):
  File "/home/pi/robocluster/robocluster/looper.py", line 86, in _daemon_wrapper
    await coro(*args, **kwargs)
  File "/home/pi/robocluster/robocluster/device.py", line 208, in _wrapper
    await coro()
  File "./demo/random_stream.py", line 10, in generate
    await device.publish('random', random.random())
  File "/home/pi/robocluster/robocluster/device.py", line 107, in publish
    await self._member.publish(topic, data)
  File "/home/pi/robocluster/robocluster/member.py", line 67, in publish
    await peer.publish(endpoint, data)
  File "/home/pi/robocluster/robocluster/member.py", line 177, in publish
    await self._send(packet)
  File "/home/pi/robocluster/robocluster/member.py", line 213, in _send
    await self._socket.send(size + packet)
  File "/home/pi/robocluster/robocluster/net.py", line 76, in wrapper
    remover(fd)
  File "/usr/lib/python3.5/asyncio/selector_events.py", line 352, in remove_writer
    return self._remove_writer(fd)
  File "/usr/lib/python3.5/asyncio/selector_events.py", line 316, in _remove_writer
    key = self._selector.get_key(fd)
  File "/usr/lib/python3.5/selectors.py", line 189, in get_key
    return mapping[fileobj]
  File "/usr/lib/python3.5/selectors.py", line 70, in __getitem__
    fd = self._selector._fileobj_lookup(fileobj)
  File "/usr/lib/python3.5/selectors.py", line 224, in _fileobj_lookup
    return _fileobj_to_fd(fileobj)
  File "/usr/lib/python3.5/selectors.py", line 41, in _fileobj_to_fd
    raise ValueError("Invalid file descriptor: {}".format(fd))
ValueError: Invalid file descriptor: -1

Add method of storing state in devices.

Many processes need to keep track of some state (Arm position, driving mode, etc), so there needs to be a way to store and retrieve information in a device to be accessable accross callbacks.
Current suggestion by @jpas is to use the device[key] syntax (use devices like dictionaries). We could also add locks to prevent race conditions accross threads, although I believe asyncio only runs one coroutine at a time.

Device refactoring and definition

One of the major goals of developing the new framework was to make supporting multiple transport mediums seamless, so we can send messages over ethernet/wireless, USB/UART serial, SPI, I2C, CAN bus, etc using the same API. The current implementation of SerialDevice seems a bit tacked on with regards to its integration with Device, and feels inconsistant with how you interact IP multicast communication. The goal of this issue is to:

  • Define exactly what a Device is.
  • Propose a restructuring of the Device internals to better accomodate new transport mediums, protocols, and callbacks.
  • Do this all in a way that is applicable to other implementations such as C/C++, Arduino.

The following is my suggestion, which may or may not also represent the current implementation.

Device

A "Device" is an entity that allows you to connect to the robocluster "network", and performs basic task management and callbacks triggered by messages incomming on a "Port". The Device would contain a collection of "Ports", a "Router", a callback look-up table, a collection of "Tasks", and local storage for synchronizing data amongst "Tasks". This aligns with the current implementation of a "Device", with the addition of a "Router" and local storage (which is already being developed). My thinking is that a "Device" can be thought of as a "virtual microcontroller", or an RTOS running on a microcontroller, in that it as ports for i/o and tasks can be registerd like interupt subroutines or scheduled to be run periodically. A "Device" would provide an easy interface to manage tasks, and send and recieve data on the network, while all the grunt work would be handled by other classes instanciated within the device.
The advantage of having an analogy like this is it makes it easy to understand conceptually of how robocluster works, making it easier to document and organize development, and should make it easy to port over to a real microcontroller.

Ports and Routers

A "Port" is like a socket in network programming, but also applies to interfaces such as serial, I2C, etc. We also don't have to call them "Ports" if that's confusing with the ports in TCP/UDP. When you want to publish a message to the network, data is writen to the UDP or Multicast port. If you send a message directly to another Device, it is send over the TCP port. If you send a message to a microcontroller connected with serial, it is sent over the serial port. You could also explicitly state which ports you want a message to be sent over, or all of them if you wish. A "Router" is just the thing that handles the distribution of messages to the right place. Though now that I think of it, the Router may not be necessary as its own entity at all. When message is read from a "Port", the event goes through the callback look up table to trigger the correct task. A task can be registered to trigger on an event from any "Port", or a specific set of "Ports", and multiple tasks can be triggered from a single event.
In the case of TCP and serial ports, a Device has one port for each serial device or other Device it wants to talk to, and are referenced by the name of the downstream device. So sending a message to deviceB could look like: deviceA.send(msg, port='deviceA')

Tasks

These are just your standard coroutines/functions. An additional feature that would be useful, is the ability to add or remove tasks easily while the Device is running. Tasks can keep track of persistant state within the Device via a storage mechanism, probably just a AttributeDictionary where you can put what ever you need inside.

Here is an image for an overview of a Device

Message format

Messages look something like this:

msg = {
  'event': 'event_name',
  'data'  : <whatever data>,
  'ports' : [ list of ports or port to send over],
  'sender' : 'device that sent the data'
}

We can just add more things to the message as we need.
When you call publish or send, the Device will automatically add the sender information and could assist in the ports part.
When a port sends the message, it could remove the ports field, as the receiving end doesn't care. In which case the ports field may be completely unneccessary if the Router is not its own entity inside the Device.

Device methods

List of some of the methods Device will implement:

  • publish(event, data, ports=ALL) Broadcast an event
  • send(device, data) Sends data directly to a device, connecting via TCP if not done already
  • on(event, callback, ports=ALL) Register a callback for an event
  • read(port=Serial) -> data Reads data from a port. Reallly only usefull for Serial device that can't use JSON for what ever reason.
  • request(device, event) -> data Requests data from another Device, identified with event.
  • reply(device, data) Sends the data to the device that requested it.

Probably a bunch more asyncio wrappers like sleep and what not, but these should be the main communication ones.

I might be forgetting something, but this is probably enough for discussion.

This is mostly a refactoring of what we already have, just need to solidify the "Ports" concept and figure out what we want to do for the "Router".

PUB/SUB Interface

To allow for easier subscription to 0MQ sockets.

The publish/subscribe model should be used for metrics and sensor data.

I propose using a Uniform Resource Identifier (URI) scheme for our published messages which will allow for easy identification of where information is coming from. In addition to this, all messages should be able to be serialized to JSON.

I will be using the following hypothetical publishers as an example:

  • A temperature sensor

    • Bound to the socket tcp://10.0.0.1:5000
    • Publishes the current temperature in Celsius and Fahrenheit.
      • tcp://10.0.0.1:5000/temperature/celsius
      • tcp://10.0.0.1:5000/temperature/fahrenheit
  • A GPS receiver

    • Bound to the socket tcp://10.0.0.2:6000
    • Publishes the current GPS position
      • tcp://10.0.0.2:6000/gps/location

I propose an API that could conform to the following code. The code in this issue is pseudo-python.

# temperature publisher
pub = Publisher('tcp://10.0.0.1:5000/temperature')
while True:
    c = read_temperature()
    pub.publish('celcius', {'unit': 'celcius', 'value': c})
    f = c * (9/5) + 32
    pub.publish('fahrenheit', {'unit': 'fahrenheit', 'value': f})
# gps publisher
pub = Publisher('tcp://10.0.0.2:6000/gps')
while True:
    lat, lon = get_gps()
    pub.publish('location', {'latitude': lat, 'longitude': lon})
# subscriber
sub = Subscriber()

# callback to print function
sub.subscribe('tcp://10.0.0.1:5000/temperature', print)

last_gps = {'latitude': 0, 'longitude': 0}
def on_gps(from, message):
    distance = calculate_distance(message, last_gps)
    print(distance)

# callback to on_gps
sub.subscribe('tcp://10.0.0.2:6000/gps/location', on_gps)

# sleep indefinitely
while True:
    sleep(1)

The output of the subscriber will look something like this:

tcp://10.0.0.1:5000/temperature/celcius {'unit': 'celcius', 'value': 20}
tcp://10.0.0.1:5000/temperature/fahrenheit {'unit': 'fahrenheit', 'value': 68}
100
0
tcp://10.0.0.1:5000/temperature/celcius {'unit': 'celcius', 'value': 20}
tcp://10.0.0.1:5000/temperature/fahrenheit {'unit': 'fahrenheit', 'value': 68}
0
2

Packaging format for pip

If this library is to be easy to integrate into other projects, it should be structured properly as a python package so that it can be installed via tools like pip.

Request/Reply interface

Similar to #3, here is a proposal for the REQ/REP communication.
Resources will be accessable through Uniform Resource Identifiers (URI), and requests will be synchronous by default in that a request will block until a reply is recieved. Replies should have the option to be asynchronous via callbacks, or synchronous with

Here is an example in pseudo python to show how this looks:
There are two processes, a drive process and a navigation process. The navigation process collects statistics about the speed of the wheels and uses that to determine the orientation and seed of the rover.

Drive process:

def on_reqWheelSpeed(req):
    # req can contain additional information 
    return wheel_speed #This is assumed to be global here, but could/should be in a class of some sort.
    # when this callback returns, the return value is what is sent as the reply

register_reply("wheelspeed", on_reqWheelSpeed)
#Now whenever we get a request for "wheelspeed", on_reqWheelSpeed is called.
#This will likely be achieved via a background thread.

while True:
    wheel_speed = random.random()
    # do other things
    ...

Or alternatively:

while True:
    req = get_request()
    # req could be an object with meta information about the request
    if req.key == 'wheel_speed':
         req.reply(random.random())

Navigation process:

while True:
    speed = request('DriveProcess/wheelspeed')
    display(speed) # do something with the result

This is just a starting point, there are some details that I'm just making up on the spot. Feel free to suggest better ways of dealing with this syntactically.

Add serial support to Device class

This would allow us to put all serial communication inside the event loop, which would allow us to have non-blocking serial read/write.

Port Roveberrypy processes to robocluster

As a sanity test of how easy robocluster is to use, someone should port some of the rover processes from Roveberrypy to the new robocluster Pub/Sub framework. Most interesting is something that reads data from hardware devices, and something that maintains some sort of state, like navigation code.

  • GPSProcess
  • DriveProcess
  • WebServer Webserver is getting redone
  • Navigation Process

Predictive 0 Round Trip Time Replies

I had an optimization idea regarding the request/reply model (Which we should also figure out how to standardize/wrap):
If a module requests a certain piece of data at a regular interval, the module doing the response could make note of that interval and schedule that data to be sent right before the requester sends the next request. For example;
A requests x from B. B logs the time the request was made, and sends the reply. After a time interval t, A requests x from B. B now knows (or at least thinks that it knows) how often A wants x, sends the reply, but also schedules a timer to send reply for x to A after time (t-2*k) has passed, where k is a tolerance constant common to both A and B. The tolerance value would be needed because the modules could be on different computers, each with different clock skews, so time measurements will always be off by some factor (idealy k), and I multiply by 2 because A could send the request at (t-k) while B sends at (t+k) in which case A would miss the predictive reply.
Once this system is working, it has the effect of A making a call for x and imediately getting a response, eliminating the performance impact that round trip time causes in synchronous request/response communication.
When B sends its reply, it also sends a timestamp with the time that the reply was sent. When A requests x, if a reply is already in its receive queue, A checks the timestamp to make sure it is current enough. If it is, A sends a dummy request that functions as an acknowledge to B to say "Yes, I got your predictive reply, thanks", after which B schedules another predictive reply. If the reply in A's receive queue is too old, A discards it and sends a regular request.
If B schedules a reply but A sends a request before the predictive reply is sent, B cancels the timer and sends the reply imediately, and also updates the predicted interval.
This whole system could be disabled at the reuest of A using a predictive=false parameter to the request call.
Again, the whole point of all this is to address the issue of latency in synchronous request/reply communication. The alternative is to fire off non blocking requests and receive the replies asynchronously, but this is obviously no longer synchronous communication. Some problems are just better suited to synchronous comunication.
This is probably more work than it's worth, so its not important but would be pretty cool.

Stress test Robocluster

How many devices can it support?
How frequently can we send messages?
How long can it run under high load without running into issues?

ProcessManager doesn't work on Windows.

Here is the Appveyor output:

________________________________ test_RunOnce _________________________________
    def test_RunOnce():
        proc = RunOnce('echo-test', 'echo "Hello world"')
        async def run_proc():
            await proc.run()
        loop = asyncio.get_event_loop()
>       loop.run_until_complete(run_proc())
tests\test_manager.py:10: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Python35-x64\lib\asyncio\base_events.py:466: in run_until_complete
    return future.result()
C:\Python35-x64\lib\asyncio\futures.py:293: in result
    raise self._exception
C:\Python35-x64\lib\asyncio\tasks.py:239: in _step
    result = coro.send(None)
tests\test_manager.py:8: in run_proc
    await proc.run()
robocluster\manager\ProcessManager.py:60: in run
    self.process = await self.restart()
robocluster\manager\ProcessManager.py:44: in restart
    self.process = await asyncio.create_subprocess_shell(self.cmd)
C:\Python35-x64\lib\asyncio\subprocess.py:197: in create_subprocess_shell
    stderr=stderr, **kwds)
C:\Python35-x64\lib\asyncio\base_events.py:1159: in subprocess_shell
    protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
C:\Python35-x64\lib\asyncio\coroutines.py:210: in coro
    res = func(*args, **kw)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_WindowsSelectorEventLoop running=False closed=False debug=False>
protocol = <SubprocessStreamProtocol>, args = 'echo "Hello world"', shell = True
stdin = None, stdout = None, stderr = None, bufsize = 0, extra = None
kwargs = {}
    @coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Create subprocess transport."""
>       raise NotImplementedError
E       NotImplementedError
C:\Python35-x64\lib\asyncio\base_events.py:340: NotImplementedError

Allow hotpluggable USB devices

If a USB device becomes disconnected, we can't have the whole program crash, and if it gets plugged back in, the process should resume normal operation.

IPv6 bug on Windows 10

  File "USBmanager.py", line 9, in <module>
    ports[port.device] = SerialDriver(port.device,'rover')
  File "c:\users\tyrel\robocluster\robocluster\serialdriver.py", line 137, in __init__
    super().__init__(name, group, loop=loop)
  File "c:\users\tyrel\robocluster\robocluster\device.py", line 52, in __init__
    self._router = Router(self.name, group, loop=self._loop)
  File "c:\users\tyrel\robocluster\robocluster\router.py", line 279, in __init__
    self._caster = Multicaster(group, port, loop=loop)
  File "c:\users\tyrel\robocluster\robocluster\router.py", line 114, in __init__
    sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
AttributeError: module 'socket' has no attribute 'IPPROTO_IPV6```

RuntimeError: dictionary changed size during iteration

Traceback (most recent call last):
  File "/home/carl/Programming/usst/robocluster/robocluster/looper.py", line 86, in _daemon_wrapper
    await coro(*args, **kwargs)
  File "/home/carl/Programming/usst/robocluster/robocluster/device.py", line 208, in _wrapper
    await coro()
  File "Simulator.py", line 85, in publish_state
    await simDevice.publish("GPSPosition", pos_list)
  File "/home/carl/Programming/usst/robocluster/robocluster/device.py", line 107, in publish
    await self._member.publish(topic, data)
  File "/home/carl/Programming/usst/robocluster/robocluster/member.py", line 70, in publish
    for peer in self._peers.values():
RuntimeError: dictionary changed size during iteration

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.