kompact's Introduction

Kompics

A message-passing component model for building distributed systems.

See the documentation for more information.

kompact's People

Contributors

adamhass, bathtor, dependabot[bot], haraldng, huitseeker, johanmickos, max-meldrum, mrobakowski, segeljakt

kompact's Issues

Support UDP

Not much point in having protocol selection in the ActorPath if there is only one protocol to choose from.

Network Meta Port

Expose a new port on the NetworkDispatcher which broadcasts information about channel session loss (and perhaps other status events), can respond with network metrics on demand, and allows actors to indicate that they would like a particular channel closed (useful for P2P systems with random overlay maintenance). A sketch of such a port follows the task list below.

Tasks

  • Implement
  • Write tutorial(s)
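
A rough sketch of what such a meta port could look like, assuming Kompact's Port trait with its Indication/Request associated types; all event and type names here are hypothetical, not a settled API:

use kompact::prelude::*;
use std::net::SocketAddr;

#[derive(Clone, Debug)]
struct NetworkMetrics {
    open_channels: usize, // hypothetical metrics payload
}

// Broadcast by the dispatcher to subscribed components.
#[derive(Clone, Debug)]
enum NetworkStatus {
    SessionLost(SocketAddr),
    Metrics(NetworkMetrics),
}

// Sent by components to the dispatcher.
#[derive(Clone, Debug)]
enum NetworkStatusRequest {
    RequestMetrics,
    CloseChannel(SocketAddr),
}

struct NetworkStatusPort;
impl Port for NetworkStatusPort {
    type Indication = NetworkStatus;
    type Request = NetworkStatusRequest;
}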

Components using BufferPool are killed incorrectly

Harald helped me figure out a critical issue in the BufferPool system.

Components which kill themselves or are killed don't wait for their BufferChunks to become Unlocked, but simply drop the references. This can cause critical issues if there are outstanding ChunkLeases waiting to be sent over the network.

Will fix this issue ASAP.

Re-schedule dispatcher when underlying network channel becomes ready

Problem:
Frame sending on individual TCP connections takes place over bounded, futures-aware mpsc::Sender channels. If the channel responds with NotReady due to being at capacity, the dispatcher queues the frame in the channel's VecDeque for later sending. When this happens, the dispatcher is never re-scheduled once the underlying channel becomes available again. We need to ensure that it is.

The problem is essentially: "How do we cross the barrier between two separate asynchronous systems?"

Potential Solutions:
One way to address this is to make the mpsc::Sender an UnboundedSender and let the system build up frames when the network is unavailable. This might be a good solution for now, considering that the dispatcher already does unbounded queuing when the "network bridge" is unable to send frames.

Another way to address this, given the current system architecture, is to have a listening task in tokio-land for each (mpsc::Sender, mpsc::Receiver) pair. This task would be spawned in tandem with the connection-driving task, and boil down to the following logic:

  • Poll the underlying mpsc::Sender for readiness, delegating NotReady responsibility to the sender
  • When ready, send a ConnectionState::Ready event to the dispatcher
  • The dispatcher, upon receiving the Ready state change, attempts to drain the connection's built-up queue until it's empty.

The above logic would still require the readiness poller to be parked when not needed and re-scheduled when the dispatcher has queued frames, which adds scheduling overhead.
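
A rough sketch of that per-connection readiness poller, assuming the futures 0.1 API; how the dispatcher is notified is abstracted as a closure here, so the surrounding names are illustrative:

use futures::sync::mpsc;
use futures::{Async, Future, Poll};

// Spawned in tandem with the connection-driving task.
struct ReadinessPoller<T, F: FnMut()> {
    sender: mpsc::Sender<T>,
    notify_dispatcher: F, // e.g. sends a ConnectionState::Ready event
}

impl<T, F: FnMut()> Future for ReadinessPoller<T, F> {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<(), ()> {
        match self.sender.poll_ready() {
            // Capacity is available again: tell the dispatcher to drain
            // this connection's built-up queue.
            Ok(Async::Ready(())) => {
                (self.notify_dispatcher)();
                Ok(Async::Ready(()))
            }
            // Still at capacity: poll_ready has registered the current
            // task to be woken when a slot frees up.
            Ok(Async::NotReady) => Ok(Async::NotReady),
            // The receiver was dropped; the connection is gone.
            Err(_) => Err(()),
        }
    }
}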

Update Futures and Tokio

With v0.5.0 I updated all dependencies except for the futures crate, since v0.2.x has been yanked, and tokio, which I'm slightly afraid to touch for now. But eventually it must get done.

This issue is a reminder to do so, eventually.

Implement lazy ser/deser for local messages

When dispatching things to local ActorPaths, the message trickles down to the Dispatcher. When this happens, the serialisable content should not actually be serialised and deserialised; instead, the serialiser/deserialiser should be abstracted over so that writing into a BufMut can be skipped entirely.
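
One conceivable shape for this, sketched under the assumption that dispatch data is wrapped in an enum; the names are hypothetical:

use bytes::Bytes;
use std::any::Any;

// Defers serialisation until the message actually has to cross the network.
enum DispatchData {
    // Local delivery: hand over the boxed value directly, skipping BufMut.
    Local(Box<dyn Any + Send>),
    // Remote delivery: serialised bytes, as today.
    Serialised(Bytes),
}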

@Bathtor feel free to add any comments here to clarify technical details.

Detect and clean-up buffers for Unreachable hosts

Currently the NetworkDispatcher will queue up new messages to an unreachable destination in the QueueManager indefinitely while it waits for the NetworkThread to establish the connection.

We need to add a maximum number of connection retries to the NetworkThread, and a "Host Unreachable" status update from the NetworkThread to the NetworkDispatcher, such that the latter can clean up the QueueManager.

If this isn't fixed, we can expect failure detectors monitoring failed systems to eventually consume all local memory with failed ping requests.
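
A hypothetical shape for the status update described above (none of these names are settled):

use std::net::SocketAddr;

// Status events from the NetworkThread to the NetworkDispatcher.
enum NetworkEvent {
    Connected(SocketAddr),
    ConnectionLost(SocketAddr),
    // Emitted after the configured maximum number of retries, so the
    // dispatcher can drop the destination's queue in the QueueManager.
    HostUnreachable(SocketAddr),
}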

Async/Await support

In particular for the system notifications and for request-response, if that makes for a nice API.

Make nicer `match_deser` syntax

Using a custom parser approach like the one in crossbeam, I could support better syntax (like allowing matching messages both with and without a Deserialiser, and maybe getting rid of the !Err).

Also consider prefixing all match arms with a unique identifier, for example:

match_deser! { msg {
    msg(res: MsgA) => EitherAOrB::A(res),
    msg(res: MsgB [BSer]) => EitherAOrB::B(res),
    err(_e) => panic!("test panic please ignore"),
    default(_) => unimplemented!("Should be either MsgA or MsgB!"),
}}

Open Channel Limit

Add a configuration limit on the number of concurrently open network channels; when the limit is reached, start closing old channels in LRU order.
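
A minimal sketch of the LRU bookkeeping, with hypothetical types:

use std::collections::HashMap;
use std::net::SocketAddr;

struct Channel; // placeholder for an open network channel

impl Channel {
    fn close(self) { /* graceful close, hypothetical */ }
}

struct ChannelPool {
    max_channels: usize, // the proposed configuration limit
    channels: HashMap<SocketAddr, Channel>,
    usage: Vec<SocketAddr>, // least recently used at the front
}

impl ChannelPool {
    // Call on every use of the channel to `addr`.
    fn touch(&mut self, addr: SocketAddr) {
        self.usage.retain(|a| *a != addr);
        self.usage.push(addr);
        // Over the limit? Close off the oldest channels first.
        while self.channels.len() > self.max_channels && !self.usage.is_empty() {
            let oldest = self.usage.remove(0);
            if let Some(ch) = self.channels.remove(&oldest) {
                ch.close();
            }
        }
    }
}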

Session Sequence Number

In order to support applications with very strict FIFO requirements, we might need to annotate network messages with a running session id for the TCP channel they were sent on. This way applications can identify the occurrence of a session loss and re-establishment in order, and act according to their required protocol.
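
As a minimal illustration (hypothetical, not a settled wire format):

// `session` increments on every (re-)established TCP session, so receivers
// can detect a loss-and-reestablishment boundary in FIFO order.
struct SessionTagged<M> {
    session: u64,
    msg: M,
}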

Named actor registration

The current actor registration API only registers UUIDs. We want to be able to designate human-readable aliases for actor references.

Internally, the dispatcher (which receives actor registration notifications) should deduplicate identical aliases. This can be done by simply appending an integer to the alias, e.g. my-alias, my-alias-1, and so on.
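
A small sketch of that deduplication scheme, assuming the dispatcher keeps the registered aliases in a set:

use std::collections::HashSet;

// Append a counter until the alias is free, e.g. my-alias, my-alias-1, ...
fn deduplicate_alias(registered: &HashSet<String>, alias: &str) -> String {
    if !registered.contains(alias) {
        return alias.to_string();
    }
    let mut i = 1;
    loop {
        let candidate = format!("{}-{}", alias, i);
        if !registered.contains(&candidate) {
            return candidate;
        }
        i += 1;
    }
}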

Lifecycle

Add lifecycle support for components.

Right now KompicsSystem::create doesn't actually start a component, but since there is no lifecycle support, the component will execute events sent to it anyway; only the start handler never gets called, which is very confusing behaviour.

Named and Parallel Channels

Allow specifying a virtual channel identifier (i.e. a "name") on an actor path, in order to separate logical flows of data in the system (à la Partisan).

Also allow configuration of multiple open channels between two hosts and load balancing (or channel assigning) between them to increase throughput on high-BDP links while maintaining FIFO guarantees.

Tasks

  • Implement
  • Write tutorial(s)

arcon networking

The existing code needs some adjustments in order to integrate better with arcon.

Currently, most network variables are hardcoded (e.g., BUFFER_SIZE, INITIAL_BUFFER_LEN, MAX_POOL_SIZE). Also, there are assertions on the static BUFFER_SIZE variable. While this works fine for the hardcoded DefaultChunk, it will not work with a custom ArconChunk.

Implement more thorough network tests

Currently only basic networked ping-pong functionality is verified. We should implement more thorough tests and harnessing methods to verify the system's correctness and behavior in more complex scenarios.

Actor reference cleanup

When the dispatcher receives RegistrationEnvelope::Deregister(actor_ref) events, it needs to remove the actor reference from the lookup table.

The ActorStore currently only has one-way mappings from Vec<String> and UUID to ActorRefs, meaning deletions are O(n), where n is the number of ActorRefs currently stored. If we want to speed this up, we can use bidirectional maps.
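
A sketch of what the bidirectional bookkeeping could look like, kept generic over the actor-reference type R since none of these names are settled:

use std::collections::HashMap;
use uuid::Uuid;

struct ActorStore<R> {
    by_alias: HashMap<Vec<String>, R>,
    by_uuid: HashMap<Uuid, R>,
    // Reverse index: which aliases point at a given actor.
    aliases_of: HashMap<Uuid, Vec<Vec<String>>>,
}

impl<R> ActorStore<R> {
    // Deregistration no longer scans the whole alias map.
    fn deregister(&mut self, id: &Uuid) {
        self.by_uuid.remove(id);
        if let Some(aliases) = self.aliases_of.remove(id) {
            for alias in aliases {
                self.by_alias.remove(&alias);
            }
        }
    }
}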

Add TCP no delay again

It ran away, but it should probably be there. Perhaps behind a config option, in case someone does want to optimise for throughput over latency?
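
The mechanism itself is a one-liner on the stream; the config flag here is hypothetical:

use std::io;
use std::net::TcpStream;

// Disabling Nagle's algorithm trades throughput batching for latency;
// `tcp_nodelay` would presumably default to true.
fn configure_stream(stream: &TcpStream, tcp_nodelay: bool) -> io::Result<()> {
    stream.set_nodelay(tcp_nodelay)
}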

Buffer Chaining

Allow chaining of buffer chunks so messages of arbitrary size can be serialised, if necessary.

Custom framing for UDP

Currently (as of #75) UDP uses the same framing header as TCP packets, even though much of it is unused on the UDP path.

While keeping the two the same is easier for maintenance, it does waste between 4 and 9 bytes (depending on what we want to keep) of precious space in a UDP packet.

This issue should be considered a reminder to re-evaluate UDP framing in the future.

Port numbers in test-cases

Many of the networking test cases use specific port numbers rather than letting the OS assign them, which leads to flaky tests.

The worst offenders are remote_lost_and_continued_connection and remote_lost_and_dropped_connection in NetworkDispatcher, in which a system needs to be restarted with an identical network config. The two test cases currently use a static port number to achieve that, rather than booting the second system, capturing its network config, and then restarting the system with the captured config.

There are likely a few more test cases with static port numbers that I naively assigned so they wouldn't collide with OS-assigned ports in test cases executing in parallel. I will go over all network-related test cases and make sure they let the OS assign ports, to make development and testing easier and more portable between hosts where the specified ports may already be in use (or, more likely, where lingering KompactSystems/slow port release cause collisions).

It's lazy coding on my behalf but it should be easy to fix. Since it's not critical I'll leave it as a self-assigned issue while I run my experiments.
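
For reference, the usual pattern for letting the OS assign a port:

use std::net::TcpListener;

// Bind to port 0 so the OS picks a free port, then capture it for the
// system's network config.
fn os_assigned_port() -> u16 {
    let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
    listener.local_addr().expect("local_addr").port()
}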

Fault Handling

Basic lifecycle management (#1) is complete, but fault handling is still missing for full lifecycle management.

Fringe case in Network connection loss-and-reestablishement may break FIFO

The Fringe Case:

1. A channel is up and running.
2. The NetworkDispatcher is sending messages to the NetworkThread.
3. The channel dies.
	3a. A Disconnect message is sent to the NetworkDispatcher.
	3b. A batch of messages is rejected.
4. A new batch of messages from the NetworkDispatcher is sent to the NetworkThread.
5. The channel reconnects by remote system request.
	5a. A Connected message is sent to the NetworkDispatcher.
6. The new batch of messages from step 4 is sent over the channel.
7. The Disconnect message from step 3a is received by the NetworkDispatcher.
8. All rejected messages are enqueued in the priority queue.
9. The Connected message from step 5a is received by the NetworkDispatcher.
	9a. The priority queue and the outgoing queue are drained to the NetworkThread.
-> Messages sent in step 4 will be delivered before messages sent in step 2.

This case is fringe because it depends entirely on very unfortunate and rare timing, in which the channel reconnects and completes its handshake before the NetworkDispatcher has received the Disconnect message. Broken channels do print error messages, and this fringe case can only occur after that has happened.

A fix will arrive with the next big version of the NetworkThread, which I'm planning to do after the thesis:
Each NetworkChannel will have a dedicated queue, with the receiver held by the NetworkThread and the sender held by the NetworkDispatcher. On connection loss, the NetworkThread simply sends the receiver back to the NetworkDispatcher, which can then drain all pending messages into the new queue if/when it arrives from the NetworkThread. This trivially resolves the timing issue: any new messages sent over the reconnected channel must go through the sender of the newly created queue, which the NetworkDispatcher can only receive after the old receiver. The old queue can therefore always be drained completely into the new queue before any new messages are enqueued.
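
A sketch of the intended handoff, with hypothetical names:

use std::sync::mpsc::{Receiver, Sender};

struct Frame(Vec<u8>); // placeholder for whatever the queues carry

// Events from the NetworkThread to the NetworkDispatcher. Because both
// travel the same FIFO path, ChannelLost (carrying the old receiver) is
// always received before the matching ChannelUp (carrying the new sender),
// so the old queue can be drained into the new one before any new
// messages are enqueued.
enum DispatcherEvent {
    ChannelLost(Receiver<Frame>),
    ChannelUp(Sender<Frame>),
}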

Support UDT

We want to enable UDT in the transport layer. UDT comes as a C++ library with a small API surface, and gluing it into Rust is surprisingly simple (see this repo, for example).

Theoretical Approach

  1. Write or find C bindings to glue Rust to the C++ API
  2. Wrap the epoll-based API calls with Mio support (through the Evented interface)
  3. Implement read/write over the provided UDT sockets/streams
  4. Let the implementations from Tokio, Futures, and Mio do their magic and have full asynchronous support for UDT in Tokio land

Problems

Mio allows two ways to register a pollable resource through its Evented interface: (1) user handles, driven in user-space by hand-written "readiness" updates; and (2) system file handles such as sockets that can be monitored by the system selector, thus delegating readiness updates to the operating system.

The epoll-like API exposed by UDT operates on logical UDT sockets rather than system-specific file descriptors. This means that there's no system file handle providing epoll-like capabilities that Mio's Evented can use. For Mio integration, we're left with option (1): writing a dedicated user-space poller which drives the readiness state of UDT sockets (presumably from a separate thread). In other words, re-writing the Poll functionality found in Mio to work on "something that looks like epoll but isn't necessarily the system selector." A sketch of this follows below.
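
A rough sketch of such a user-space handle, using Mio 0.6's Registration; the UDT call is a hypothetical placeholder:

use mio::{Ready, Registration};
use std::thread;

// Hypothetical stand-in for a blocking call into UDT's epoll-like API.
fn udt_epoll_wait() -> bool {
    unimplemented!("would block on UDT's epoll and report readable sockets")
}

// Returns a Registration that can be registered with a mio Poll like any
// other Evented type; a dedicated thread forwards UDT readiness into it.
fn udt_evented() -> Registration {
    let (registration, set_readiness) = Registration::new2();
    thread::spawn(move || loop {
        if udt_epoll_wait() {
            set_readiness
                .set_readiness(Ready::readable())
                .expect("set_readiness");
        }
    });
    registration
}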

There is a problem with the user-space Evented types, though: they come with very few guarantees. See the Mio quote below:

There is no guarantee that readiness establishes any sort of memory ordering. Any concurrent data access must be synchronized using another strategy.
There is also no guarantee as to when the readiness event will be delivered to poll. A best attempt will be made to make the delivery in a "timely" fashion.

What about Netty?

Netty uses a very complex tree of interfaces and abstract classes, and it also supports Java's NIO. Within NIO, we find selectors, which multiplex channels. The API exposed by the UDT library fits quite nicely into NIO's selectors, and thus Netty's single-threaded NioEventLoop can drive the polling of the UDT resources and pipeline them through the rest of Netty's features.

Potential Workarounds

  1. Stick to Mio and Tokio for UDT and write a custom poller to drive the UDT library and the connections registered in it, forwarding the epoll-like events from UDT to Mio's readiness events. This approach can then plug into the futures-oriented pipelines of Tokio, which is a big plus, but it has the drawbacks of being time-consuming to write and of the loose guarantees Mio provides for user-space readiness.
  2. Drive UDT connections in a separate UDT-dedicated thread, funneling recv/send messages between the dedicated thread and the dispatcher using channels.
  3. Rewrite the UDT library in Rust with Mio support from the core, allowing Tokio and Mio to drive the underlying UDP sockets, and implementing the protocol logic using futures.
  4. Other ideas?

Actor path serialization

We need to [de]serialize system and actor paths when shipping MsgEnvelopes around between Kompics systems.

Suggested Structure

System Path

 +-------------------+-------------------+-----------------------+
 | Path type (1 bit) | Protocol (5 bits) | Address Type (2 bits) |
 +-------------------+-------------------+-----------------------+----------------+
 |                   Address (4/16/* bytes)                   ...| Port (2 bytes) |
 +---------------------------------------------------------------+----------------+

Named Path

 +---------------------------+
 | System path (*)         ...
 +---------------+-----------+-------------------------------+
 |      Named path (2 bytes prefix + variable length)      ...
 +-----------------------------------------------------------+

Unique Path

 +---------------------------+
 | System path (*)         ...
 +---------------+-----------+
 |      UUID (16 bytes)      |
 +---------------------------+

Actor Path

 +---------------------+
 | Actor path (*)    ...
 +---------------------+

NOTE The 1-bit path type will be packed into the byte containing the system path's protocol and address type.
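
A minimal sketch of that first-byte packing, with field widths as in the diagram above:

// Pack [path type (1 bit) | protocol (5 bits) | address type (2 bits)]
// into the single leading header byte.
fn pack_header(path_type: u8, protocol: u8, address_type: u8) -> u8 {
    debug_assert!(path_type <= 0b1);
    debug_assert!(protocol <= 0b11111);
    debug_assert!(address_type <= 0b11);
    (path_type << 7) | (protocol << 2) | address_type
}

fn unpack_header(b: u8) -> (u8, u8, u8) {
    (b >> 7, (b >> 2) & 0b11111, b & 0b11)
}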

Descriptions

  • Path Type: 1-bit identifier for Named or Unique actor paths
  • Protocol: TCP, UDP, UDT, and any other transport protocol we may support, up to 2^5 = 32 different ones
  • Address Type: IPv4, IPv6, or variable-length domain name
  • Address: 4-byte IPv4 address, 16-byte IPv6 address, or variable-length (1-byte length-prefixed) domain name
  • Port: reachable port at the address, values 0-65535
  • Named path: extended path after address and port; 2-byte length prefix
  • UUID: 128-bit Universally Unique Identifier

Better NetMessage API

I am unhappy with the current API for NetMessage, because it basically forces a very common case to have an unnecessary clone(), e.g.:

fn receive_network(&mut self, msg: NetMessage) -> () {
    let sender = msg.sender().clone();
    match msg.try_deserialise::<MsgType, DeserType>() {
        Ok(message) => { /* do stuff with sender and message */ }
        Err(e) => { /* log error, whatever */ }
    }
}

The clone occurs because try_deserialise needs to consume the NetMessage.

I see a bunch of ways around this, but not sure what is the best way to go, so I'm looking for feedback or alternative suggestions.

Here's what I got so far:

Option 1

Move the serialisation id and the data into their own struct (let's call it NetData, perhaps) and have NetMessage be a struct with the public fields sender, receiver, and data. This way anyone can deconstruct it using normal pattern matching, e.g.:

  let NetMessage { sender, receiver, data } = msg;
  match data.try_deserialise::<MsgType, DeserType>() {
      Ok(message) => { /* do stuff with sender and message */ }
      Err(e) => { /* log error, whatever */ }
  }

It's quite a bunch of boilerplate, though, and most people don't care about receiver, which adds even more boilerplate.

Option 1B

I could of course do this destructuring eagerly and change the method signature of receive_network to something like:

fn receive_network(&mut self, sender: ActorPath, _receiver: ActorPath, msg: NetData) -> () {
    match msg.try_deserialise::<MsgType, DeserType>() {
        Ok(message) => { /* do stuff with sender and message */ }
        Err(e) => { /* log error, whatever */ }
    }
}

But again, that's a lot of fields to ignore when we don't need receiver or sender, which bloats the code somewhat.

Option 1C

Could also add a few convenience methods to NetMessage that do the destructuring and take closures with the values, e.g.:

  msg.use_sender(|sender, data| {
      match data.try_deserialise::<MsgType, DeserType>() {
          Ok(message) => { /* do stuff with sender and message */ }
          Err(e) => { /* log error, whatever */ }
      }
  });

Option 2

I could make the sender and receiver fields inside NetMessage an Option<ActorPath>. This would allow people to take() them when needed, e.g.:

fn receive_network(&mut self, msg: NetMessage) -> () {
    let sender = msg.sender().take().unwrap(); // must exist the first time
    match msg.try_deserialise::<MsgType, DeserType>() {
        Ok(message) => { /* do stuff with sender and message */ }
        Err(e) => { /* log error, whatever */ }
    }
}

It's not as verbose as destructuring, and it's more similar to code you might write in Scala, for example. On the other hand, adding an Option for something that is guaranteed to be present is kind of misleading, and the required unwrap() adds panicking logic to the code, which we know the Rust compiler isn't too happy with performance-wise.

Verify macro imports in `Actor` and `ComponentDefinition`

When using #[derive(Actor)] or #[derive(ComponentDefinition)], the user has to import a handful of things into scope which the macros use. We should verify that these should indeed be publicly exposed to the user.

Details

In the case of Actor, RecvMessage must be in scope for the code to compile. The current solution is to publicly expose RecvMessage in the root library module (lib.rs) so that users can import it using use kompics::*. We should verify whether the imports required by the macro should indeed be publicly exposed like this.
