uofsspaceteam / robocluster
Communication wrapper library for serialization and transport of data through a distributed robotics system.
License: Educational Community License v2.0
Everything continues to work, but if a peer is shut down, the one that keeps running prints this exception:
Traceback (most recent call last):
File "/home/carl/Programming/usst/robocluster/robocluster/looper.py", line 86, in _daemon_wrapper
await coro(*args, **kwargs)
File "/home/carl/Programming/usst/robocluster/robocluster/member.py", line 238, in _recv_loop
size = await self._socket.recv(4)
File "/home/carl/Programming/usst/robocluster/robocluster/net.py", line 80, in wrapper
result = func(*args, **kwargs)
ConnectionResetError: [Errno 104] Connection reset by peer
'NoneType' object has no attribute 'recv'
Traceback (most recent call last):
File "/home/pi/robocluster/robocluster/member.py", line 228, in _recv
data = await self._socket.recv(size)
AttributeError: 'NoneType' object has no attribute 'recv'
Got remote stop DriveControl
'NoneType' object has no attribute 'recv'
Traceback (most recent call last):
File "/home/pi/robocluster/robocluster/member.py", line 228, in _recv
data = await self._socket.recv(size)
AttributeError: 'NoneType' object has no attribute 'recv'
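The AttributeError suggests `_socket` is set to None when the connection drops while the receive loop is still scheduled. A minimal guard, sketched from the names in the traceback (the class here is a hypothetical stand-in, not robocluster's actual member.py), would check for a torn-down connection before reading and mark the peer as gone on a reset:

```python
import asyncio

class MemberSketch:
    """Hypothetical stand-in for the member's receive path; only the guard matters."""
    def __init__(self, socket):
        self._socket = socket

    async def _recv(self, size):
        # Bail out cleanly if the peer connection was torn down,
        # instead of calling .recv on None.
        if self._socket is None:
            return None
        try:
            return await self._socket.recv(size)
        except ConnectionResetError:
            self._socket = None  # mark the peer as gone for later calls
            return None
```

The receive loop would then treat a None result as "peer disconnected" and exit quietly rather than printing a traceback.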
Tried running device.py from the robocluster/examples repo. Installed it with setup.py under Anaconda3, to no avail. Not too sure what the problem is, but it's most certainly some compatibility issue between Windows 10 and Linux (the OS this example was originally coded for).
C:\Users\Tyrel\robocluster\examples>python device.py
Traceback (most recent call last):
File "device.py", line 10, in <module>
device_a = Device('device-a', group)
File "C:\Users\Tyrel\Anaconda3\lib\site-packages\robocluster-0.1-py3.6.egg\robocluster\device.py", line 58, in __init__
self._receiver.bind()
File "C:\Users\Tyrel\Anaconda3\lib\site-packages\robocluster-0.1-py3.6.egg\robocluster\net.py", line 64, in bind
self._socket.bind(self._address)
OSError: [WinError 10049] The requested address is not valid in its context
When I shut down the software on the rover, the JoystickProcess that runs on my machine throws this error:
Traceback (most recent call last):
File "/home/carl/Programming/usst/robocluster/robocluster/looper.py", line 86, in _daemon_wrapper
await coro(*args, **kwargs)
File "/home/carl/Programming/usst/robocluster/robocluster/member.py", line 238, in _recv_loop
size = await self._socket.recv(4)
File "/home/carl/miniconda3/envs/robocluster2/lib/python3.5/asyncio/futures.py", line 381, in __iter__
yield self # This tells Task to wait for completion.
File "/home/carl/miniconda3/envs/robocluster2/lib/python3.5/asyncio/tasks.py", line 310, in _wakeup
future.result()
File "/home/carl/miniconda3/envs/robocluster2/lib/python3.5/asyncio/futures.py", line 294, in result
raise self._exception
File "/home/carl/Programming/usst/robocluster/robocluster/net.py", line 80, in wrapper
result = func(*args, **kwargs)
ConnectionResetError: [Errno 104] Connection reset by peer
Currently UDP is used to publish data. Is there a way to have more reliable transmission for important messages? And how would a message be designated as "important"?
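One possible answer, purely a sketch and not anything robocluster implements today, is to route messages flagged as important over the reliable TCP-style direct-send path and everything else over the fire-and-forget UDP publish. The two injected send callables and the `important` flag are assumptions for illustration:

```python
class Transport:
    """Hypothetical dual-path transport: 'important' messages go over a
    reliable (TCP-like) channel, everything else over a fire-and-forget
    (UDP-like) one. Both send callables are injected for the example."""
    def __init__(self, reliable_send, fast_send):
        self._reliable = reliable_send
        self._fast = fast_send

    def publish(self, topic, data, important=False):
        # An explicit per-message flag marks it as important; this could
        # equally be driven by a per-topic policy table.
        channel = self._reliable if important else self._fast
        channel(topic, data)
```

The flag-per-call approach keeps the common publish path cheap while letting callers opt individual messages (e.g. an emergency stop) into reliable delivery.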
Currently SerialDevices are opened with a path to the device such as /dev/ttyACM0 or COM1. This is undescriptive, so we need a way to poll serial devices to ask what they are (accelerometer, motor controller for a specific wheel, etc.).
This is more of a convenience request.
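As a sketch of what such polling could look like, each device could answer an identification query over its serial port. The `?ID` query protocol and both function names here are invented for illustration, not an existing robocluster convention:

```python
def identify_device(ser):
    """Send a hypothetical '?ID' query over an opened serial port and
    return the device's self-reported role (e.g. 'accelerometer')."""
    ser.write(b'?ID\n')
    line = ser.readline().strip()
    return line.decode('ascii')

def find_device(ports, wanted):
    """Scan a list of opened serial ports for the one reporting the wanted role."""
    for ser in ports:
        if identify_device(ser) == wanted:
            return ser
    return None  # no device with that role is attached
```

On the firmware side this only needs each microcontroller to answer the query with a short role string, so paths like /dev/ttyACM0 never have to appear in application code.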
If we have two processes:
Replier:
from robocluster import Device

dev = Device('replier', 'test')

@dev.on_request('topic')
def give_thing():
    return 'Hi'

dev.start()
dev.wait()
Requester:
from robocluster import Device

dev = Device('requester', 'test')

@dev.every('1s')
async def get_thing():
    data = await dev.request('replier', 'topic')
    print(data)

dev.start()
dev.wait()
Running this as-is results in an UnknownPeer exception being raised.
Catching the exception allows things to run properly.
It would be slick if it could somehow wait for a round of heartbeats (or however this works now) before throwing the exception, to give devices on the network a chance to announce themselves.
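A sketch of that grace period (the `UnknownPeer` name matches the report; the lookup callable, retry count, and delay are assumptions, not robocluster's API): retry the peer lookup for roughly a heartbeat round before giving up:

```python
import asyncio

class UnknownPeer(Exception):
    pass

async def request_with_grace(lookup, name, attempts=3, delay=0.01):
    """Retry a peer lookup a few times, spaced roughly a heartbeat apart
    (shortened here for the example), before raising UnknownPeer.
    `lookup` is any callable returning the peer object or None."""
    for _ in range(attempts):
        peer = lookup(name)
        if peer is not None:
            return peer
        await asyncio.sleep(delay)  # give the peer a chance to announce itself
    raise UnknownPeer(name)
```

With something like this wired into `Device.request`, the requester in the example above would simply wait out the replier's first announcement instead of crashing on startup ordering.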
We no longer use loops; see the relevant lines of the context implementation.
I'm not sure what exactly causes this, but I think it happens when the robocluster-manager tries to restart a stopped process, and the other processes get confused because it creates a new Device with the same name.
Traceback (most recent call last):
File "/home/pi/robocluster/robocluster/looper.py", line 86, in _daemon_wrapper
await coro(*args, **kwargs)
File "/home/pi/robocluster/robocluster/device.py", line 208, in _wrapper
await coro()
File "./demo/random_stream.py", line 10, in generate
await device.publish('random', random.random())
File "/home/pi/robocluster/robocluster/device.py", line 107, in publish
await self._member.publish(topic, data)
File "/home/pi/robocluster/robocluster/member.py", line 67, in publish
await peer.publish(endpoint, data)
File "/home/pi/robocluster/robocluster/member.py", line 177, in publish
await self._send(packet)
File "/home/pi/robocluster/robocluster/member.py", line 213, in _send
await self._socket.send(size + packet)
File "/home/pi/robocluster/robocluster/net.py", line 76, in wrapper
remover(fd)
File "/usr/lib/python3.5/asyncio/selector_events.py", line 352, in remove_writer
return self._remove_writer(fd)
File "/usr/lib/python3.5/asyncio/selector_events.py", line 316, in _remove_writer
key = self._selector.get_key(fd)
File "/usr/lib/python3.5/selectors.py", line 189, in get_key
return mapping[fileobj]
File "/usr/lib/python3.5/selectors.py", line 70, in __getitem__
fd = self._selector._fileobj_lookup(fileobj)
File "/usr/lib/python3.5/selectors.py", line 224, in _fileobj_lookup
return _fileobj_to_fd(fileobj)
File "/usr/lib/python3.5/selectors.py", line 41, in _fileobj_to_fd
raise ValueError("Invalid file descriptor: {}".format(fd))
ValueError: Invalid file descriptor: -1
Right now you have to close your terminal window to stop your program, which is goofy.
Many processes need to keep track of some state (arm position, driving mode, etc.), so there needs to be a way to store and retrieve information in a device that is accessible across callbacks.
The current suggestion by @jpas is to use device[key] syntax (use devices like dictionaries). We could also add locks to prevent race conditions across threads, although I believe asyncio only runs one coroutine at a time.
One of the major goals of developing the new framework was to make supporting multiple transport mediums seamless, so we can send messages over ethernet/wireless, USB/UART serial, SPI, I2C, CAN bus, etc. using the same API. The current implementation of SerialDevice seems a bit tacked on with regards to its integration with Device, and feels inconsistent with how you interact with IP multicast communication. The goal of this issue is to:
The following is my suggestion, which may or may not also represent the current implementation.
A "Device" is an entity that allows you to connect to the robocluster "network", and performs basic task management and callbacks triggered by messages incoming on a "Port". The Device would contain a collection of "Ports", a "Router", a callback look-up table, a collection of "Tasks", and local storage for synchronizing data amongst "Tasks". This aligns with the current implementation of a "Device", with the addition of a "Router" and local storage (which is already being developed). My thinking is that a "Device" can be thought of as a "virtual microcontroller", or an RTOS running on a microcontroller, in that it has ports for I/O, and tasks can be registered like interrupt subroutines or scheduled to run periodically. A "Device" would provide an easy interface to manage tasks and to send and receive data on the network, while all the grunt work is handled by other classes instantiated within the device.
The advantage of having an analogy like this is that it makes it easy to understand conceptually how robocluster works, making it easier to document and organize development, and it should make robocluster easy to port over to a real microcontroller.
A "Port" is like a socket in network programming, but also applies to interfaces such as serial, I2C, etc. (We don't have to call them "Ports" if that's confusing with the ports in TCP/UDP.) When you want to publish a message to the network, data is written to the UDP or multicast port. If you send a message directly to another Device, it is sent over the TCP port. If you send a message to a microcontroller connected over serial, it is sent over the serial port. You could also explicitly state which ports you want a message to be sent over, or all of them if you wish. A "Router" is just the thing that handles the distribution of messages to the right place. Though now that I think of it, the Router may not be necessary as its own entity at all. When a message is read from a "Port", the event goes through the callback look-up table to trigger the correct task. A task can be registered to trigger on an event from any "Port", or a specific set of "Ports", and multiple tasks can be triggered from a single event.
In the case of TCP and serial ports, a Device has one port for each serial device or other Device it wants to talk to, referenced by the name of the downstream device. So sending a message to deviceB could look like: deviceA.send(msg, port='deviceB')
These are just your standard coroutines/functions. An additional feature that would be useful is the ability to add or remove tasks easily while the Device is running. Tasks can keep track of persistent state within the Device via a storage mechanism, probably just an AttributeDictionary where you can put whatever you need inside.
Here is an image giving an overview of a Device.
Messages look something like this:
msg = {
    'event': 'event_name',
    'data': <whatever data>,
    'ports': [list of ports, or a single port, to send over],
    'sender': 'device that sent the data',
}
We can just add more things to the message as we need.
When you call publish or send, the Device will automatically add the sender information and could assist with the ports part.
When a port sends the message, it could remove the ports field, as the receiving end doesn't care. In that case the ports field may be completely unnecessary if the Router is not its own entity inside the Device.
List of some of the methods Device will implement:
publish(event, data, ports=ALL): Broadcast an event.
send(device, data): Sends data directly to a device, connecting via TCP if not done already.
on(event, callback, ports=ALL): Register a callback for an event.
read(port=Serial) -> data: Reads data from a port. Really only useful for serial devices that can't use JSON for whatever reason.
request(device, event) -> data: Requests data from another Device, identified with event.
reply(device, data): Sends the data to the device that requested it.
Probably a bunch more asyncio wrappers like sleep and whatnot, but these should be the main communication ones.
I might be forgetting something, but this is probably enough for discussion.
This is mostly a refactoring of what we already have, just need to solidify the "Ports" concept and figure out what we want to do for the "Router".
To allow for easier subscription to 0MQ sockets.
The publish/subscribe model should be used for metrics and sensor data.
I propose using a Uniform Resource Identifier (URI) scheme for our published messages which will allow for easy identification of where information is coming from. In addition to this, all messages should be able to be serialized to JSON.
I will be using the following hypothetical publishers as an example:
A temperature sensor
tcp://10.0.0.1:5000
tcp://10.0.0.1:5000/temperature/celsius
tcp://10.0.0.1:5000/temperature/fahrenheit
A GPS receiver
tcp://10.0.0.2:6000
tcp://10.0.0.2:6000/gps/location
I propose an API that could conform to the following code. The code in this issue is pseudo-python.
# temperature publisher
pub = Publisher('tcp://10.0.0.1:5000/temperature')
while True:
    c = read_temperature()
    pub.publish('celsius', {'unit': 'celsius', 'value': c})
    f = c * (9/5) + 32
    pub.publish('fahrenheit', {'unit': 'fahrenheit', 'value': f})

# gps publisher
pub = Publisher('tcp://10.0.0.2:6000/gps')
while True:
    lat, lon = get_gps()
    pub.publish('location', {'latitude': lat, 'longitude': lon})

# subscriber
sub = Subscriber()
# callback to print function
sub.subscribe('tcp://10.0.0.1:5000/temperature', print)

last_gps = {'latitude': 0, 'longitude': 0}
def on_gps(sender, message):  # 'from' is a reserved word, so use 'sender'
    distance = calculate_distance(message, last_gps)
    print(distance)
    last_gps.update(message)  # remember the last fix for the next distance

# callback to on_gps
sub.subscribe('tcp://10.0.0.2:6000/gps/location', on_gps)

# sleep indefinitely
while True:
    sleep(1)
The output of the subscriber will look something like this:
tcp://10.0.0.1:5000/temperature/celsius {'unit': 'celsius', 'value': 20}
tcp://10.0.0.1:5000/temperature/fahrenheit {'unit': 'fahrenheit', 'value': 68}
100
0
tcp://10.0.0.1:5000/temperature/celsius {'unit': 'celsius', 'value': 20}
tcp://10.0.0.1:5000/temperature/fahrenheit {'unit': 'fahrenheit', 'value': 68}
0
2
Need more information from the affected people.
If this library is to be easy to integrate into other projects, it should be structured properly as a python package so that it can be installed via tools like pip.
Similar to #3, here is a proposal for the REQ/REP communication.
Resources will be accessible through Uniform Resource Identifiers (URI), and requests will be synchronous by default, in that a request will block until a reply is received. Replies should have the option to be asynchronous via callbacks, or synchronous with
Here is an example in pseudo python to show how this looks:
There are two processes, a drive process and a navigation process. The navigation process collects statistics about the speed of the wheels and uses that to determine the orientation and speed of the rover.
Drive process:
def on_reqWheelSpeed(req):
    # req can contain additional information
    return wheel_speed  # Assumed to be global here, but could/should be in a class of some sort.
    # when this callback returns, the return value is what is sent as the reply

register_reply("wheelspeed", on_reqWheelSpeed)
# Now whenever we get a request for "wheelspeed", on_reqWheelSpeed is called.
# This will likely be achieved via a background thread.

while True:
    wheel_speed = random.random()
    # do other things
    ...

Or alternatively:
while True:
    req = get_request()
    # req could be an object with meta information about the request
    if req.key == 'wheelspeed':
        req.reply(random.random())
Navigation process:
while True:
    speed = request('DriveProcess/wheelspeed')
    display(speed)  # do something with the result
This is just a starting point, there are some details that I'm just making up on the spot. Feel free to suggest better ways of dealing with this syntactically.
This would allow us to put all serial communication inside the event loop, which would allow us to have non-blocking serial read/write.
Also get it on readthedocs.io
As a sanity test of how easy robocluster is to use, someone should port some of the rover processes from Roveberrypy to the new robocluster Pub/Sub framework. Most interesting is something that reads data from hardware devices, and something that maintains some sort of state, like navigation code.
Automated tests are good. We should have them in robocluster.
I had an optimization idea regarding the request/reply model (which we should also figure out how to standardize/wrap): if a module requests a certain piece of data at a regular interval, the module doing the responding could make note of that interval and schedule that data to be sent right before the requester sends the next request. For example:
A requests x from B. B logs the time the request was made, and sends the reply. After a time interval t, A requests x from B again. B now knows (or at least thinks it knows) how often A wants x, sends the reply, but also schedules a timer to send a reply for x to A after time (t - 2*k) has passed, where k is a tolerance constant common to both A and B. The tolerance value is needed because the modules could be on different computers, each with different clock skew, so time measurements will always be off by some factor (ideally k), and I multiply by 2 because A could send the request at (t - k) while B sends at (t + k), in which case A would miss the predictive reply.
Once this system is working, it has the effect of A making a call for x and immediately getting a response, eliminating the performance impact that round-trip time causes in synchronous request/response communication.
When B sends its reply, it also sends a timestamp with the time the reply was sent. When A requests x, if a reply is already in its receive queue, A checks the timestamp to make sure it is current enough. If it is, A sends a dummy request that functions as an acknowledgement to B, saying "Yes, I got your predictive reply, thanks", after which B schedules another predictive reply. If the reply in A's receive queue is too old, A discards it and sends a regular request.
If B schedules a reply but A sends a request before the predictive reply is sent, B cancels the timer and sends the reply immediately, and also updates the predicted interval.
This whole system could be disabled at the request of A using a predictive=false parameter to the request call.
Again, the whole point of all this is to address the issue of latency in synchronous request/reply communication. The alternative is to fire off non-blocking requests and receive the replies asynchronously, but this is obviously no longer synchronous communication. Some problems are just better suited to synchronous communication.
This is probably more work than it's worth, so it's not important, but it would be pretty cool.
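The timing rule above (schedule the predictive reply after t - 2*k) can be sketched as a small scheduler. Timestamps are passed in explicitly so the logic stays testable, and all names here are hypothetical:

```python
class PredictiveSchedule:
    """Sketch of the predictive-reply timing: track when each key was last
    requested and compute when the next unsolicited reply should go out."""
    def __init__(self, tolerance):
        self.tolerance = tolerance   # k: clock-skew allowance shared by A and B
        self.last_request = {}       # key -> timestamp of the previous request

    def on_request(self, key, now):
        """Record a request at time `now` and return the delay until the
        predictive reply should be sent (t - 2*k, clamped at 0), or None
        if no interval estimate exists yet."""
        last = self.last_request.get(key)
        self.last_request[key] = now
        if last is None:
            return None              # first request: no interval to predict from
        interval = now - last        # observed t
        return max(interval - 2 * self.tolerance, 0.0)
```

A real implementation would also cancel the timer when a request arrives early and refresh the interval estimate, as described above.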
How many devices can it support?
How frequently can we send messages?
How long can it run under high load without running into issues?
Here is the Appveyor output:
________________________________ test_RunOnce _________________________________
def test_RunOnce():
proc = RunOnce('echo-test', 'echo "Hello world"')
async def run_proc():
await proc.run()
loop = asyncio.get_event_loop()
> loop.run_until_complete(run_proc())
tests\test_manager.py:10:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Python35-x64\lib\asyncio\base_events.py:466: in run_until_complete
return future.result()
C:\Python35-x64\lib\asyncio\futures.py:293: in result
raise self._exception
C:\Python35-x64\lib\asyncio\tasks.py:239: in _step
result = coro.send(None)
tests\test_manager.py:8: in run_proc
await proc.run()
robocluster\manager\ProcessManager.py:60: in run
self.process = await self.restart()
robocluster\manager\ProcessManager.py:44: in restart
self.process = await asyncio.create_subprocess_shell(self.cmd)
C:\Python35-x64\lib\asyncio\subprocess.py:197: in create_subprocess_shell
stderr=stderr, **kwds)
C:\Python35-x64\lib\asyncio\base_events.py:1159: in subprocess_shell
protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
C:\Python35-x64\lib\asyncio\coroutines.py:210: in coro
res = func(*args, **kw)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_WindowsSelectorEventLoop running=False closed=False debug=False>
protocol = <SubprocessStreamProtocol>, args = 'echo "Hello world"', shell = True
stdin = None, stdout = None, stderr = None, bufsize = 0, extra = None
kwargs = {}
@coroutine
def _make_subprocess_transport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
"""Create subprocess transport."""
> raise NotImplementedError
E NotImplementedError
C:\Python35-x64\lib\asyncio\base_events.py:340: NotImplementedError
If a USB device becomes disconnected, we can't have the whole program crash, and if it gets plugged back in, the process should resume normal operation.
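A sketch of that tolerance (the `open_port` callable and the retry policy are assumptions for illustration): treat an OSError from the serial layer as "device unplugged" and retry rather than crash:

```python
import time

def read_with_reconnect(open_port, max_retries=5, backoff=0.01):
    """Try to open the device and read a line, retrying after a short wait
    if the device is unplugged. `open_port` is any callable returning an
    object with .readline(), raising OSError while the device is absent."""
    for _ in range(max_retries):
        try:
            port = open_port()
            return port.readline()
        except OSError:
            time.sleep(backoff)  # device unplugged; wait for it to come back
    return None  # still gone after all retries; caller decides what to do
```

In robocluster this loop would live inside the serial device's read task, so an unplug shows up as missing data rather than a crashed process, and a re-plug resumes reads automatically.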
Previous behavior of Device.task made the decorated coroutine run once; however, it now appears to run in a while-True loop.
File "USBmanager.py", line 9, in <module>
ports[port.device] = SerialDriver(port.device,'rover')
File "c:\users\tyrel\robocluster\robocluster\serialdriver.py", line 137, in __init__
super().__init__(name, group, loop=loop)
File "c:\users\tyrel\robocluster\robocluster\device.py", line 52, in __init__
self._router = Router(self.name, group, loop=self._loop)
File "c:\users\tyrel\robocluster\robocluster\router.py", line 279, in __init__
self._caster = Multicaster(group, port, loop=loop)
File "c:\users\tyrel\robocluster\robocluster\router.py", line 114, in __init__
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
AttributeError: module 'socket' has no attribute 'IPPROTO_IPV6'
@device.on('*/hello')
def print_it(event, data):
    print(event, data)
Should print something like:
otherDevice/hello <some data>
However, this is what is actually printed:
otherDevice <some data>
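The bug is that the callback receives only the device part of the topic instead of the full event name. A sketch of a dispatch step that preserves the full name, using Python's fnmatch for the glob pattern (the function shape is an assumption, not robocluster's actual router API):

```python
import fnmatch

def dispatch(pattern, event, data, callback):
    """Invoke `callback` with the FULL event name when the event matches
    the glob-style subscription pattern (e.g. '*/hello'). Returns whether
    the callback fired."""
    if fnmatch.fnmatch(event, pattern):
        callback(event, data)  # pass 'otherDevice/hello', not just 'otherDevice'
        return True
    return False
```

Whatever the router does internally, the key point is that the matched string handed to the callback should be the event as published, not a fragment left over from pattern splitting.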
It would be nice to allow for subscribing to events from arbitrary locations:
@device.on('*/event')
async def callback(event, data):
    ...
We have automated testing set up for a Linux environment on travis-ci.org, but nothing to ensure that it works on Windows platforms. Appveyor is a service that does continuous integration on Windows machines.
Traceback (most recent call last):
File "/home/carl/Programming/usst/robocluster/robocluster/looper.py", line 86, in _daemon_wrapper
await coro(*args, **kwargs)
File "/home/carl/Programming/usst/robocluster/robocluster/device.py", line 208, in _wrapper
await coro()
File "Simulator.py", line 85, in publish_state
await simDevice.publish("GPSPosition", pos_list)
File "/home/carl/Programming/usst/robocluster/robocluster/device.py", line 107, in publish
await self._member.publish(topic, data)
File "/home/carl/Programming/usst/robocluster/robocluster/member.py", line 70, in publish
for peer in self._peers.values():
RuntimeError: dictionary changed size during iteration
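The usual fix for this RuntimeError is to iterate over a snapshot of the peer dictionary, so peers can join or leave while a publish is in flight. A sketch modelled on the names in the traceback (the class itself is hypothetical):

```python
import asyncio

class PeerSet:
    """Sketch of publishing to peers without 'dictionary changed size
    during iteration': iterate a snapshot of the dict, so heartbeat
    handlers may add or remove peers mid-publish."""
    def __init__(self):
        self._peers = {}

    async def publish(self, topic, data):
        # list(...) takes a snapshot; mutations to self._peers during the
        # awaits below no longer invalidate the iteration.
        for peer in list(self._peers.values()):
            await peer.publish(topic, data)
```

Peers added during the loop simply get the next message instead of this one, which is usually the right semantics for pub/sub.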