A message-passing component model for building distributed systems.
See the documentation for more information.
A Component System in Rust
License: MIT License
Not much point in having protocol selection in the ActorPath if there is only one protocol to choose from.
Right now when a component panics we always get
<timestamp> ERRO Component panicked with: Any, cid: <cid>, system: <system name>, location: <kompact dir>/core/src/component.rs:448
It always says Any, never anything more useful. It would be nice to at least try to downcast to &str and String, as that covers most cases as far as I know.
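For illustration, a downcast along those lines could look like this (panic_message is a hypothetical helper, not existing Kompact code; the payload is the Box<dyn Any + Send> that catch_unwind returns):

use std::any::Any;

// Try to turn a panic payload into something human-readable;
// fall back to "Any" only when both downcasts fail.
fn panic_message(payload: &(dyn Any + Send)) -> &str {
    if let Some(s) = payload.downcast_ref::<&str>() {
        s
    } else if let Some(s) = payload.downcast_ref::<String>() {
        s.as_str()
    } else {
        "Any"
    }
}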
Expose a new port from the Network Dispatcher which broadcasts information about channel session loss (or other status events perhaps), can respond with network metrics on demand, and allows actors to indicate that they would like to see a particular channel closed (for P2P with random overlay maintenance).
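A rough sketch of what such a port could look like, assuming Kompact's Port abstraction with its Indication/Request associated types (all concrete names below are invented for illustration):

use kompact::prelude::*;
use std::net::SocketAddr;

#[derive(Debug, Clone)]
enum NetworkStatusIndication {
    ChannelLost(SocketAddr),          // a session to this peer was lost
    Metrics { open_channels: usize }, // answered on demand
}

#[derive(Debug, Clone)]
enum NetworkStatusRequest {
    RequestMetrics,
    CloseChannel(SocketAddr), // e.g. for P2P random overlay maintenance
}

struct NetworkStatusPort;
impl Port for NetworkStatusPort {
    type Indication = NetworkStatusIndication;
    type Request = NetworkStatusRequest;
}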
Harald helped me figure out a critical issue in the BufferPool system.
Components which kill themselves or are killed don't wait for their BufferChunks to become Unlocked but simply drop the references. This can cause critical issues if there are outstanding ChunkLeases waiting to be sent over the network.
Will fix this issue ASAP.
Problem:
Frame sending on individual TCP connections takes place over bounded futures-aware mpsc::Sender channels. If the channel responds with NotReady due to being at capacity, the dispatcher queues the frame to be sent on the channel's VecDeque. When this happens, the dispatcher is never re-scheduled when the underlying channel becomes available again. We need to ensure that it is.
The problem is essentially: "How do we cross the barrier between two separate asynchronous systems?"
Potential Solutions:
One way to address this is to make the mpsc::Sender an UnboundedSender and let the system build up frames when the network is unavailable. This might be a good solution for now, considering that the dispatcher already does unbounded queuing when the "network bridge" is unable to send frames.
Another way to address this, given the current system architecture, is to have a listening task in tokio-land for each (mpsc::Sender, mpsc::Receiver) pair. This task would be spawned in tandem with the connection-driving task, and would boil down to the following logic (see the sketch after this list):
1. Poll the mpsc::Sender for readiness, delegating NotReady responsibility to the sender task.
2. When the sender becomes ready, signal a ConnectionState::Ready event to the dispatcher.
3. The dispatcher, upon observing the Ready state change, attempts to drain the connection's built-up queue until it's empty.
The above logic would still require the readiness poller to be parked when not needed and re-scheduled when the dispatcher has queued frames, which adds scheduling overhead.
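Purely for illustration, here is roughly what such a listening task could look like using today's tokio primitives (the original code predates these APIs, and names like notify_dispatcher are made up):

use tokio::sync::mpsc;

// One such task would run per connection, alongside the task that
// drives the connection itself.
async fn readiness_poller(
    frames: mpsc::Sender<Vec<u8>>,    // the bounded per-connection channel
    mut wakeups: mpsc::Receiver<()>,  // dispatcher pings us when it queues frames
    notify_dispatcher: impl Fn(),     // signals ConnectionState::Ready upstream
) {
    while let Some(()) = wakeups.recv().await {
        match frames.reserve().await {
            Ok(permit) => {
                drop(permit);        // we only wanted to know capacity exists
                notify_dispatcher(); // dispatcher now drains its VecDeque
            }
            Err(_) => return, // receiver dropped: connection is gone
        }
    }
}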
When dispatching things to local ActorPaths, the message trickles down to the Dispatcher. When this happens, the serializable content should really not be serialized/deserialized, but abstracted away by the serializer/deserializer to completely skip writing into a BufMut.
@Bathtor feel free to add any comments here to clarify technical details.
Here: kompact/core/src/serialisation/helpers.rs, line 95, at commit a403f77.
Currently the Component method initialize_pool(&mut self) needs to be called before tell_serialised() can be used. This should be changed such that tell_serialised() automatically initialises the pool when it is called for the first time instead.
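A minimal sketch of the proposed behaviour, using a hypothetical pool field rather than the actual Kompact internals:

struct BufferPool;

impl BufferPool {
    fn new() -> Self {
        BufferPool
    }
}

struct ComponentCore {
    pool: Option<BufferPool>,
}

impl ComponentCore {
    fn tell_serialised(&mut self /*, msg: ..., dst: ... */) {
        // Initialise the pool on first use instead of requiring an
        // explicit initialize_pool() call beforehand.
        let _pool = self.pool.get_or_insert_with(BufferPool::new);
        // ... serialise into the pool's buffers and dispatch ...
    }
}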
Currently the NetworkDispatcher will queue up new messages to an unreachable destination in the QueueManager indefinitely while it waits for the NetworkThread to establish the connection.
Need to add a maximum number of retries for the NetworkThread and a "Host Unreachable" status update from the NetworkThread to the NetworkDispatcher, such that it can clean up the QueueManager.
If this isn't fixed, we can expect failure detectors monitoring failed systems to eventually take up all local memory with failed ping requests.
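A sketch of the proposed signalling, with illustrative names only (none of these types exist in Kompact today):

use std::net::SocketAddr;

const MAX_CONNECTION_RETRIES: u32 = 5; // illustrative limit

// Status updates the NetworkThread could send to the NetworkDispatcher.
enum NetworkStatus {
    Connected(SocketAddr),
    HostUnreachable(SocketAddr), // dispatcher drops the destination's queue
}

// Called by the NetworkThread each time a connection attempt fails.
fn on_connect_failed(addr: SocketAddr, retries: &mut u32) -> Option<NetworkStatus> {
    *retries += 1;
    if *retries >= MAX_CONNECTION_RETRIES {
        Some(NetworkStatus::HostUnreachable(addr)) // triggers QueueManager clean-up
    } else {
        None // schedule another attempt
    }
}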
In particular for the system notifications and for request-response, if that makes for a nice API.
Using the custom parser approach like in crossbeam, I could do better syntax (like allowing matching messages both with and without a Deserialiser, and maybe getting rid of the !Err).
Also consider prefixing all match arms with a unique identifier like, for example:
match_deser! { msg {
    msg(res: MsgA) => EitherAOrB::A(res),
    msg(res: MsgB [BSer]) => EitherAOrB::B(res),
    err(_e) => panic!("test panic please ignore"),
    default(_) => unimplemented!("Should be either MsgA or MsgB!"),
}}
Have a configuration limit on concurrently open network channels, and when reaching that limit start closing off old channels in an LRU manner.
In order to support applications with very strict FIFO requirements, we might need to annotate network messages with a running session id for the TCP channel they were sent on. This way applications can identify the occurrence of a session-loss and re-establishment in-order and act according to their required protocol.
Would be nice to derive protocol, address, port, etc. from an ActorPath.
The current actor registration API only registers UUIDs. We want to be able to designate human-readable aliases for actor references.
Internally, the dispatcher (who receives actor registration notifications) should deduplicate identical aliases. This can be done by simply appending an integer to the alias, e.g. my-alias, my-alias-1, and so on.
As chunk is a fixed-size array, increasing BUFFER_SIZE will cause a stack overflow.
https://github.com/kompics/kompact/blob/master/core/src/net/buffer.rs#L22
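To illustrate the failure mode (the BUFFER_SIZE value below is arbitrary, not the constant from buffer.rs): a fixed-size array lives on the stack, while a heap allocation of the same size is fine.

const BUFFER_SIZE: usize = 64 * 1024 * 1024; // 64 MiB, larger than typical stacks

fn main() {
    // let chunk = [0u8; BUFFER_SIZE]; // stack-allocated: overflows at runtime
    let chunk = vec![0u8; BUFFER_SIZE]; // heap-allocated: works
    println!("allocated {} bytes", chunk.len());
}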
Add lifecycle support for components.
Right now KompicsSystem::create doesn't actually start a component, but since there is no lifecycle support, the component will run events sent to it anyway; just the "Start Handler" never gets called, which is very confusing behaviour.
Allow specifying a virtual channel identifier (i.e. a "name") in an actor path, in order to separate logical flows of data in the system (à la Partisan).
Also allow configuration of multiple open channels between two hosts and load balancing (or channel assigning) between them to increase throughput on high-BDP links while maintaining FIFO guarantees.
The existing code needs some adjustments in order to integrate better with arcon.
Currently, most network variables are hardcoded (e.g., BUFFER_SIZE, INITIAL_BUFFER_LEN, MAX_POOL_SIZE). There are also assertions on the static BUFFER_SIZE variable. While this works fine for the hardcoded DefaultChunk, it will not work with a custom ArconChunk.
Instead, swallow them up or return them.
Currently only basic networked ping-pong functionality is verified. We should implement more thorough tests and harnessing methods to verify the system's correctness and behavior in more complex scenarios.
When the dispatcher receives RegistrationEnvelope::Deregister(actor_ref) events, it needs to remove the actor reference from the lookup table.
The ActorStore currently only has one-way mappings from Vec<String> and UUID to ActorRefs, meaning deletions are O(n), where n is the number of ActorRefs currently stored. If we want to speed this up, we can use bidirectional maps.
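A sketch of the bidirectional-map idea, using stand-in types (u128 for the UUID, String for the ActorRef) rather than the real ones:

use std::collections::HashMap;

struct ActorStore {
    refs_by_id: HashMap<u128, String>, // forward lookup, as today
    ids_by_ref: HashMap<String, u128>, // reverse index enabling O(1) removal
}

impl ActorStore {
    fn deregister(&mut self, actor_ref: &str) {
        // Instead of scanning all entries, consult the reverse index.
        if let Some(id) = self.ids_by_ref.remove(actor_ref) {
            self.refs_by_id.remove(&id);
        }
    }
}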
It ran away, but it should probably be there. Perhaps behind a config option, in case someone does want to optimise for throughput over latency?
You can assign this to me.
Allow chaining of buffer chunks so messages of arbitrary size can be serialised, if necessary.
Necessary for the CDA project, but also useful in general for kompact.
Currently (as of #75) UDP uses the same framing header as TCP packets, even though much of it is unused on the UDP path.
While it is easier for maintenance to keep the two the same, it does waste between 4 and 9 bytes (depending on what we want to keep) of precious space in a UDP packet.
This issue should be considered a reminder to re-evaluate UDP framing in the future.
Many of the networking test cases use specific port numbers rather than letting the OS assign them, which leads to flaky tests.
The worst offenders are remote_lost_and_continued_connection and remote_lost_and_dropped_connection in NetworkDispatcher, in which a system needs to be started with an identical network config. The two test cases currently use a static port number to achieve this, rather than booting the second system, capturing its network config, and then restarting the system with the captured config.
There are likely a few more test cases with static port numbers that I naively assigned so that they would not collide with OS-assigned ports in test cases executing in parallel. I will go over all network-related test cases and make sure they let the OS assign ports, to make development and testing easier and more portable between hosts that may already be using the specified ports (or, more likely, where lingering KompactSystems or slow port release cause collisions).
It's lazy coding on my behalf but it should be easy to fix. Since it's not critical I'll leave it as a self-assigned issue while I run my experiments.
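For reference, the usual trick looks like the following (plain std, not Kompact's network config API):

use std::net::TcpListener;

fn main() -> std::io::Result<()> {
    // Binding to port 0 asks the OS for any free port.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    // Capture the actual assigned address to reuse in the test config.
    let addr = listener.local_addr()?;
    println!("bound to {}", addr);
    Ok(())
}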
Basic lifecycle management (#1) is complete, but fault handling is still missing for full lifecycle management.
CustomComponents: self.actor_path() or self.actor_ref()
The Fringe Case:
1. A channel is up and running.
2. NetworkDispatch is sending messages to the NetworkThread.
3. The Channel dies.
3a. A Disconnect Message is sent to NetworkDispatch
3b. A batch of messages is rejected.
4. A new batch of messages from the NetworkDispatch are sent to the NetworkThread
5. The Channel reconnects by remote system request
5a. A Connected message is sent to the NetworkDispatch
6. The new batch of messages from the NetworkDispatch from step 4 are sent over the channel
7. The Disconnect Message from step 3a is received by the NetworkDispatch
8. All Rejected Messages are enqueued in the priority queue
9. The connect message from 5a is received by the NetworkDispatch
9a. The priority queue and the outgoing queue are drained to the NetworkThread
-> Messages sent in step 4 will be delivered before messages sent in step 2.
This case is fringe because it's entirely dependent on very unfortunate and rare timings, in which the channel reconnects and its handshake completes before the NetworkDispatcher has received the disconnect message. Broken channels definitely print error messages, and this fringe case can only occur after that has happened.
A Fix which will arrive with the next big version of the NetworkThread, which I'm planning to do after the thesis:
Each NetworkChannel will have a dedicated queue, with the receiver held by the NetworkThread and the sender held by the NetworkDispatcher. On connection loss the NetworkThread can simply send the receiver to the NetworkDispatcher, so that it can drain all remaining messages into the new queue if/when the new queue's sender arrives from the NetworkThread. This trivially solves the timing problem: any new messages sent over the reconnected channel have to go through a sender of the newly created queue, which can only be received after the old receiver has already been received by the NetworkDispatcher. Thus the old queue will always be available to be completely drained into the new queue before new messages are enqueued.
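A toy illustration of the handover, using std mpsc channels as stand-ins for the per-channel queues:

use std::sync::mpsc;

fn main() {
    let (old_tx, old_rx) = mpsc::channel::<String>();
    old_tx.send("queued before the disconnect".into()).unwrap();

    // On connection loss the NetworkThread hands old_rx to the
    // dispatcher; on reconnect it hands over a fresh sender.
    let (new_tx, new_rx) = mpsc::channel::<String>();

    // Dispatcher: drain the old queue into the new one *before*
    // enqueueing anything new, preserving FIFO across the reconnect.
    for msg in old_rx.try_iter() {
        new_tx.send(msg).unwrap();
    }
    new_tx.send("sent after the reconnect".into()).unwrap();

    for msg in new_rx.try_iter() {
        println!("{}", msg);
    }
}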
We want to enable UDT in the transport layer. UDT comes as a C++ library with a small API surface, and gluing it to be supported in Rust is surprisingly simple (see this repo, for example).
The Evented interface
Mio allows two ways to register a pollable resource through its Evented interface: (1) user handles, driven in user-space by hand-written "readiness" updates; and (2) system file handles, such as sockets, that can be monitored by the system selector, thus delegating readiness updates to the operating system.
The epoll-like API exposed by UDT operates on logical UDT sockets rather than system-specific file descriptors. This means that there is no system file handle providing epoll-like capabilities that Mio's Evented can use. For Mio integration, we're left with option (1): writing a dedicated user-space poller which drives the readiness state of UDT sockets (presumably from a separate thread). In other words, re-writing the Poll functionality found in Mio to work on "something that looks like epoll but isn't necessarily the system selector."
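For concreteness, here is a small sketch of mio 0.6's user-space readiness mechanism, which a dedicated UDT poller thread could drive (illustrative only; mio 0.7+ removed this API):

use mio::{Events, Poll, PollOpt, Ready, Registration, Token};

fn main() -> std::io::Result<()> {
    // A user handle: readiness is driven by hand, not by the OS.
    let (registration, set_readiness) = Registration::new2();
    let poll = Poll::new()?;
    poll.register(&registration, Token(0), Ready::readable(), PollOpt::edge())?;

    // A separate thread (e.g. one wrapping UDT's epoll-like API)
    // pushes readiness updates manually:
    std::thread::spawn(move || {
        set_readiness.set_readiness(Ready::readable()).unwrap();
    });

    let mut events = Events::with_capacity(8);
    poll.poll(&mut events, None)?;
    for event in events.iter() {
        println!("user-space readiness on {:?}", event.token());
    }
    Ok(())
}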
There is a problem with the user-space Evented types, though: they come with very few guarantees. See the Mio quotes below:
There is no guarantee that readiness establishes any sort of memory ordering. Any concurrent data access must be synchronized using another strategy.
There is also no guarantee as to when the readiness event will be delivered to poll. A best attempt will be made to make the delivery in a "timely" fashion.
Netty uses a very complex tree of interfaces and abstract classes, and it also supports Java's NIO. Within NIO, we find selectors which multiplex channels. The API exposed by the UDT library fits quite nicely into NIO's selectors, and thus Netty's single-threaded NioEventLoop can drive the polling of the UDT resources and pipeline them through the rest of Netty's features.
We need to [de]serialize system and actor paths when shipping MsgEnvelopes around between Kompics systems.
+-------------------+-------------------+-----------------------+
| Path type (1 bit) | Protocol (5 bits) | Address Type (2 bits) |
+-------------------+-------------------+-----------------------+----------------+
| Address (4/16/* bytes) ...                                    | Port (2 bytes) |
+---------------------------------------------------------------+----------------+
+---------------------------+
| System path (*) ...
+---------------+-----------+-------------------------------+
| Named path (2 bytes prefix + variable length) ...
+-----------------------------------------------------------+
+---------------------------+
| System path (*) ...
+---------------+-----------+
| UUID (16 bytes)           |
+---------------------------+
+---------------------+
| Actor path (*) ...
+---------------------+
NOTE The 1-bit path type will be packed into the byte containing the system path's protocol and address type.
Field        | Description
-------------|-------------------------------------------------------------
Path Type    | 1-bit identifier for Named or Unique actor paths
Protocol     | TCP, UDP, UDT, and any other transport protocol we may support, up to 2^5 = 32 different ones
Address Type | IPv4, IPv6, or variable-length domain name
Address      | 4-byte IPv4 address, 16-byte IPv6 address, or variable-length (1-byte length-prefixed) domain name
Port         | Reachable port at the address, values 0-65535
Named path   | Extended path after address and port; 2-byte length prefix
UUID         | 128-bit Universally Unique Identifier
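To make the bit layout concrete, here is a small sketch of packing and unpacking the first header byte (1-bit path type, 5-bit protocol, 2-bit address type); this is illustrative, not the actual serialiser:

// Pack the path type, protocol, and address type into a single byte.
fn pack_header(is_named: bool, protocol: u8, addr_type: u8) -> u8 {
    debug_assert!(protocol < 32); // 5 bits
    debug_assert!(addr_type < 4); // 2 bits
    ((is_named as u8) << 7) | ((protocol & 0x1f) << 2) | (addr_type & 0x03)
}

// Recover (path type, protocol, address type) from the header byte.
fn unpack_header(b: u8) -> (bool, u8, u8) {
    (b >> 7 == 1, (b >> 2) & 0x1f, b & 0x03)
}

fn main() {
    let b = pack_header(true, 1, 0); // named path, protocol 1, IPv4
    assert_eq!(unpack_header(b), (true, 1, 0));
}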
SystemHandle doesn't have a register_by_alias method.
I am unhappy with the current API for NetMessage, because it basically forces a very common case to have an unnecessary clone(), e.g.:
fn receive_network(&mut self, msg: NetMessage) -> () {
    let sender = msg.sender().clone();
    match msg.try_deserialise::<MsgType, DeserType>() {
        Ok(message) => { /* do stuff with sender and message */ }
        Err(e) => { /* log error, whatever */ }
    }
}
The clone occurs because try_deserialise needs to consume the NetMessage.
I see a bunch of ways around this, but not sure what is the best way to go, so I'm looking for feedback or alternative suggestions.
Here's what I got so far:
Move the serialisation id and the data into their own struct (let's call it NetData, perhaps) and have NetMessage be a struct with public fields sender, receiver, and data. This way anyone can deconstruct it using normal pattern matching, e.g.:
let NetMessage { sender, receiver, data } = msg;
match data.try_deserialise::<MsgType, DeserType>() {
    Ok(message) => { /* do stuff with sender and message */ }
    Err(e) => { /* log error, whatever */ }
}
It's quite a bunch of boilerplate, though, and most people don't care about receiver, which adds even more boilerplate. I could of course do this destructuring eagerly and change the method signature of receive_network to something like:
fn receive_network(&mut self, sender: ActorPath, _receiver: ActorPath, msg: NetData) -> () {
    match msg.try_deserialise::<MsgType, DeserType>() {
        Ok(message) => { /* do stuff with sender and message */ }
        Err(e) => { /* log error, whatever */ }
    }
}
But again, that's a lot of fields to ignore when we don't need receiver or sender, which bloats the code somewhat.
Could also add a few convenience methods to NetMessage that do the destructuring and take closures over the values, e.g.:
msg.use_sender(|sender, data| {
    match data.try_deserialise::<MsgType, DeserType>() {
        Ok(message) => { /* do stuff with sender and message */ }
        Err(e) => { /* log error, whatever */ }
    }
});
I could make the sender and receiver fields inside NetMessage an Option<ActorPath>. This would allow people to take() them when needed, e.g.:
fn receive_network(&mut self, msg: NetMessage) -> () {
    let sender = msg.sender().take().unwrap(); // must exist the first time
    match msg.try_deserialise::<MsgType, DeserType>() {
        Ok(message) => { /* do stuff with sender and message */ }
        Err(e) => { /* log error, whatever */ }
    }
}
It's not as verbose as destructuring, and it's more similar to code you might write in Scala, for example. But on the other hand, adding an Option for something that is guaranteed to be present is kind of misleading, and the required unwrap() adds panicking logic to the code, which we know the Rust compiler isn't too happy with performance-wise.
Rather than publicly exposing wildcarded components in the Kompics library file, I suggest making an explicit kompics::prelude submodule containing everything that a Kompics user might use. This is becoming standard practice and comes with some benefits.
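A minimal sketch of the pattern (the re-exported items below are placeholders, not Kompics' actual module layout):

// In lib.rs: gather the commonly used items into one module.
mod component {
    pub trait ComponentDefinition {}
}
mod actors {
    pub trait Actor {}
}

pub mod prelude {
    pub use crate::actors::Actor;
    pub use crate::component::ComponentDefinition;
}

// Downstream users then import everything in one line:
// use kompics::prelude::*;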
No point in having the network thread just collect them with no way to access them.
When using #[derive(Actor)] or #[derive(ComponentDefinition)], the user has to import a handful of things into scope which the macros use. In the case of Actor, for example, RecvMessage must be in scope for the code to compile. The current solution is to publicly expose RecvMessage in the root library module (lib.rs) so that users can import it using use kompics::*. We should verify that the imports required by the macros should indeed be publicly exposed to the user like this.
The automated test dispatch::dispatch_tests::network_cleanup is flaky right now due to the NetworkThread not doing tear-down properly.
Need to add better test cases and improve the shutdown procedure of the network layer, ensuring that all ports are freed when the NetworkThread is shut down.
Flaky failure log here: https://travis-ci.org/github/kompics/kompact/jobs/664909674
Just as a convenience over the normal manual cancellation.
Alternatively, change the lower-level timer API to let timer functions prevent rescheduling.
After the demo, maybe^^