tonarino / actor
A minimalist actor framework aiming for high performance and simplicity.
License: MIT License
Both https://github.com/livioribeiro/cargo-readme and https://github.com/phaazon/cargo-sync-readme are designed to ease the job.
Our README.md is currently rather terse, whereas our crate-level doc comment is better and comes with an example (though perhaps it should brag a bit less ;-)). So: a few small tweaks, then sync it to the README?
GitHub Actions would probably do for keeping them in sync. Suggestions welcome.
Actors currently have a single ingress channel, and all messages are treated with the same priority. Sometimes we want to send a mix of important and unimportant data to an actor, and we have seen important data being dropped because unimportant data fills up the channel.
We have send_if_not_full() to work around the issue, but it's a bit hacky. I want to introduce a proper priority concept to the actor channels.
enum Priority {
    High,
    Low,
}
impl<M> Recipient<M> {
    fn send_with_priority<N: Into<M>>(&self, message: N, priority: Priority) { ... }
}
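One way to picture the proposal is one bounded channel per priority, with the receiving side always draining the high-priority channel first, so a flood of low-priority data can no longer crowd out important messages. The sketch below is only an illustration built on std::sync::mpsc; Inbox, priority_inbox() and the try_send-based semantics are hypothetical and not the crate's API.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Message priority, as proposed above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Priority {
    High,
    Low,
}

/// Hypothetical sending side: one bounded channel per priority.
pub struct Recipient<M> {
    high_tx: SyncSender<M>,
    low_tx: SyncSender<M>,
}

/// Hypothetical receiving side used by the actor loop.
pub struct Inbox<M> {
    high_rx: Receiver<M>,
    low_rx: Receiver<M>,
}

pub fn priority_inbox<M>(high_capacity: usize, low_capacity: usize) -> (Recipient<M>, Inbox<M>) {
    let (high_tx, high_rx) = sync_channel(high_capacity);
    let (low_tx, low_rx) = sync_channel(low_capacity);
    (Recipient { high_tx, low_tx }, Inbox { high_rx, low_rx })
}

impl<M> Recipient<M> {
    /// A full low-priority channel no longer blocks high-priority messages.
    pub fn send_with_priority<N: Into<M>>(&self, message: N, priority: Priority) -> Result<(), TrySendError<M>> {
        let message = message.into();
        match priority {
            Priority::High => self.high_tx.try_send(message),
            Priority::Low => self.low_tx.try_send(message),
        }
    }
}

impl<M> Inbox<M> {
    /// Non-blocking receive that always prefers pending high-priority messages.
    pub fn try_recv(&self) -> Option<M> {
        self.high_rx.try_recv().or_else(|_| self.low_rx.try_recv()).ok()
    }
}
```

A real actor loop would block on both channels (e.g. with a select) rather than poll; try_recv just keeps the sketch short.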
If I read the code correctly, metrics are currently collected only in the recv(addr.message_rx) -> msg => arm of System::run_actor_select_loop(). That makes it possible to collect the metrics solely inside Actor::handle(), without any explicit involvement of System. For code reuse, this could be implemented through a MeteredActor wrapper around the actual Actor. To keep the same ergonomics as today, a MeteredSystem (containing a System) could be introduced that wraps each passed actor during spawn() (which would de facto bring us close to where we started).
Potential advantages:
The distinction between System and SystemHandle would cease to exist, and the two could be merged into a single type. This distinction was a source of slight confusion when I initially read the API docs and code.
Disadvantages:
This is spiritually similar to #26 - I have somehow fallen for Actor wrappers. ;-)
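A minimal sketch of such a wrapper, assuming a heavily simplified Actor trait (no context, just handle() returning a Result); the Metrics fields are hypothetical. The wrapper forwards every message to the inner actor and records a count and the time spent, without System being involved.

```rust
use std::time::{Duration, Instant};

/// Simplified stand-in for the crate's Actor trait (an assumption, not the real signature).
pub trait Actor {
    type Message;
    type Error;
    fn handle(&mut self, message: Self::Message) -> Result<(), Self::Error>;
}

/// Hypothetical metrics collected by the wrapper.
#[derive(Debug, Default)]
pub struct Metrics {
    pub handled: u64,
    pub total_handle_time: Duration,
}

/// Wraps any actor and measures each handle() call.
pub struct MeteredActor<A> {
    pub inner: A,
    pub metrics: Metrics,
}

impl<A: Actor> MeteredActor<A> {
    pub fn new(inner: A) -> Self {
        Self { inner, metrics: Metrics::default() }
    }
}

impl<A: Actor> Actor for MeteredActor<A> {
    type Message = A::Message;
    type Error = A::Error;

    fn handle(&mut self, message: Self::Message) -> Result<(), Self::Error> {
        let start = Instant::now();
        let result = self.inner.handle(message);
        // Recorded whether or not the inner actor returned an error.
        self.metrics.handled += 1;
        self.metrics.total_handle_time += start.elapsed();
        result
    }
}

/// Example inner actor that sums incoming numbers.
pub struct Summer {
    pub sum: u64,
}

impl Actor for Summer {
    type Message = u64;
    type Error = ();

    fn handle(&mut self, message: u64) -> Result<(), ()> {
        self.sum += message;
        Ok(())
    }
}
```

Since MeteredActor is itself an Actor, a MeteredSystem would only need to wrap each actor in MeteredActor::new() inside spawn().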
Per @PabloMansanet's #67 (comment)
I wonder if we can have a default implementation of this method using type_name, since it's mostly what we're manually doing anyway.
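A sketch of such a default, using std::any::type_name on a hypothetical name() method (the actual method discussed in #67 may differ in signature). Note that the standard library documents the exact output of type_name as unspecified, so it suits logging and metrics labels rather than identifiers that must stay stable.

```rust
use std::any::type_name;

/// Hypothetical trait; only the default name() matters here.
pub trait Actor {
    /// Defaults to the type's name as reported by the compiler, e.g. "my_crate::MyActor".
    fn name(&self) -> &'static str {
        type_name::<Self>()
    }
}

/// Uses the default implementation.
pub struct MyActor;
impl Actor for MyActor {}

/// Actors can still override the default.
pub struct Custom;
impl Actor for Custom {
    fn name(&self) -> &'static str {
        "custom"
    }
}
```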
For now - multiple ideas to re-add.
Flume has a TrySendError which returns the message that we attempted to send.
We should surface this to our API so actors can decide what to do with the message if it failed to send.
This might create an explosion of generics so let's approach this carefully.
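To illustrate what surfacing the message enables, here is a sketch using std::sync::mpsc instead of flume (std's TrySendError likewise carries the message in both its Full and Disconnected variants). RetryingSender is a hypothetical helper, not a proposed API: instead of dropping a message when the channel is full, it keeps it in a local backlog and retries later.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{SyncSender, TrySendError};

/// Hypothetical helper that retries messages which hit a full channel.
pub struct RetryingSender<M> {
    tx: SyncSender<M>,
    backlog: VecDeque<M>,
}

impl<M> RetryingSender<M> {
    pub fn new(tx: SyncSender<M>) -> Self {
        Self { tx, backlog: VecDeque::new() }
    }

    /// Queues the message locally and tries to flush.
    /// Returns Err with the first undeliverable message if the receiver is gone.
    pub fn send(&mut self, message: M) -> Result<(), M> {
        self.backlog.push_back(message);
        self.flush()
    }

    /// Pushes backlogged messages in FIFO order until the channel is full.
    pub fn flush(&mut self) -> Result<(), M> {
        while let Some(message) = self.backlog.pop_front() {
            match self.tx.try_send(message) {
                Ok(()) => {}
                Err(TrySendError::Full(message)) => {
                    // The error hands the message back, so nothing is lost.
                    self.backlog.push_front(message);
                    return Ok(());
                }
                Err(TrySendError::Disconnected(message)) => return Err(message),
            }
        }
        Ok(())
    }

    pub fn backlog_len(&self) -> usize {
        self.backlog.len()
    }
}
```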
Do we want this? Do we not want it?
We could unconditionally convert actor panics into an actor system shutdown, or perhaps allow users to supply a callback that decides what happens.
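A sketch of the callback variant, assuming the actor loop wraps each handle() call in std::panic::catch_unwind. PanicPolicy and handle_guarded() are hypothetical names. The default panic hook still prints the panic message to stderr, and AssertUnwindSafe means the actor's state may be left inconsistent if the policy chooses to continue.

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};

/// Hypothetical decision returned by a user-supplied callback.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PanicPolicy {
    /// Stop the whole actor system.
    ShutdownSystem,
    /// Keep running the actor (its state may be inconsistent).
    Continue,
}

/// Runs one handle() call and, if it panics, asks `on_panic` what to do.
/// Returns None when no panic happened.
pub fn handle_guarded<F, P>(handle: F, on_panic: P) -> Option<PanicPolicy>
where
    F: FnOnce(),
    P: FnOnce(&str) -> PanicPolicy,
{
    match catch_unwind(AssertUnwindSafe(handle)) {
        Ok(()) => None,
        Err(payload) => {
            // Panic payloads are usually a &'static str or a String.
            let reason: &str = if let Some(s) = payload.downcast_ref::<&str>() {
                *s
            } else if let Some(s) = payload.downcast_ref::<String>() {
                s.as_str()
            } else {
                "<non-string panic payload>"
            };
            Some(on_panic(reason))
        }
    }
}
```

"Unconditionally shut down" is then just the special case of a callback that always returns PanicPolicy::ShutdownSystem.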
Caused by this code
/// Process any pending messages in the internal queue, calling wrapped actor's `handle()`.
fn process_queue(&mut self, context: &mut <Self as Actor>::Context) -> Result<(), A::Error> {
    // Handle all messages that should have been handled by now.
    let now = Instant::now();
    while self.queue.peek().map(|m| m.fire_at <= now).unwrap_or(false) {
        let item = self.queue.pop().expect("heap is non-empty, we have just peeked");
        let message = match item.payload {
            Payload::Delayed { message } => message,
            Payload::Recurring { mut factory, interval } => {
                let message = factory();
                self.queue.push(QueueItem {
                    fire_at: item.fire_at + interval,
                    payload: Payload::Recurring { factory, interval },
                });
                message
            },
        };
        // Let inner actor do its job.
        //
        // Alternatively, we could send an `Instant` message to ourselves.
        // - The advantage would be that it would go into the queue with proper priority. But it
        //   is unclear what should be handled first: normal-priority message that should have
        //   been processed a while ago, or a high-priority message that was delivered now.
        // - Disadvantage is we could easily overflow the queue if many messages fire at once.
        self.inner.handle(&mut TimedContext::from_context(context), message)?;
    }
    Ok(())
}
Combined with the fact that we call this from handle().
I think that we should:
- send the message to ourselves instead of calling inner.handle() directly, so that it goes through the actor loop and gets proper priority;
- change while let to if let.
CC @goodhoko.
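The effect of the while-to-if change can be sketched with a plain BinaryHeap: each call pops at most one due message, so the actor loop gets a chance to handle other (e.g. high-priority) messages in between. DelayQueue, QueueItem and pop_due() are simplified, hypothetical stand-ins for the Timed internals above (no recurring payloads).

```rust
use std::cmp::{Ordering, Reverse};
use std::collections::BinaryHeap;
use std::time::Instant;

/// Simplified queue item: only fire_at participates in ordering.
struct QueueItem<M> {
    fire_at: Instant,
    message: M,
}

impl<M> PartialEq for QueueItem<M> {
    fn eq(&self, other: &Self) -> bool {
        self.fire_at == other.fire_at
    }
}
impl<M> Eq for QueueItem<M> {}
impl<M> PartialOrd for QueueItem<M> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl<M> Ord for QueueItem<M> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.fire_at.cmp(&other.fire_at)
    }
}

/// Min-heap of delayed messages (Reverse turns BinaryHeap's max-heap into a min-heap).
pub struct DelayQueue<M> {
    heap: BinaryHeap<Reverse<QueueItem<M>>>,
}

impl<M> DelayQueue<M> {
    pub fn new() -> Self {
        Self { heap: BinaryHeap::new() }
    }

    pub fn push(&mut self, fire_at: Instant, message: M) {
        self.heap.push(Reverse(QueueItem { fire_at, message }));
    }

    /// `if` instead of `while`: returns at most one due message per call.
    pub fn pop_due(&mut self, now: Instant) -> Option<M> {
        let is_due = self.heap.peek().map_or(false, |entry| entry.0.fire_at <= now);
        if is_due {
            self.heap.pop().map(|entry| entry.0.message)
        } else {
            None
        }
    }
}
```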
Currently, if a user wants to create an actor with non-default (normal and/or high-priority) inbox capacities, they need to either create its addr first using Addr::with_capacity() or spawn it using system.prepare[_fn]().with_capacity().
However, the actors themselves often have enough information to determine their best (or at least a good default) inbox capacities, so let's allow them to do so! This has the potential to noticeably simplify the "actor system setup" sections of codebases.
Addr already has the actor as a generic type parameter, so its constructor can access static methods and constants defined on it.
Implementation-wise, this would mean adding to the Actor trait either an associated constant that determines the default capacity (Rust does allow default values for associated constants), or a static method (without &self) with a default implementation. In both cases, we likely don't want to add any more mandatory items that Actors need to implement. Change impl Default for Addr to use these method(s)/constant(s).
Another thing left to bikeshed is whether this should be a single constant/method returning Capacity, or 2 separate methods/constants for normal and high-priority capacities returning usize. Note that the DEFAULT_CHANNEL_CAPACITY constant is not currently public.
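A sketch of the associated-constant option, choosing the single-constant-returning-Capacity variant purely for illustration. Capacity, FALLBACK_CAPACITY (its value is arbitrary, standing in for the private DEFAULT_CHANNEL_CAPACITY) and the simplified Addr are all hypothetical. Actors that don't care add nothing; Default for Addr reads the constant through the generic parameter.

```rust
use std::marker::PhantomData;

/// Hypothetical pair of inbox capacities.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Capacity {
    pub normal: usize,
    pub high: usize,
}

/// Hypothetical fallback; the value is arbitrary for this sketch.
pub const FALLBACK_CAPACITY: usize = 5;

pub trait Actor {
    /// Optional override with a default, so no new mandatory item is added.
    const DEFAULT_CAPACITY: Capacity = Capacity { normal: FALLBACK_CAPACITY, high: FALLBACK_CAPACITY };
}

/// Simplified Addr that only remembers its capacity.
pub struct Addr<A: Actor> {
    pub capacity: Capacity,
    actor: PhantomData<A>,
}

impl<A: Actor> Addr<A> {
    pub fn with_capacity(capacity: Capacity) -> Self {
        Self { capacity, actor: PhantomData }
    }
}

impl<A: Actor> Default for Addr<A> {
    fn default() -> Self {
        // The actor type decides its own default capacities.
        Self::with_capacity(A::DEFAULT_CAPACITY)
    }
}

/// Uses the fallback.
pub struct Plain;
impl Actor for Plain {}

/// Expects bursts of normal-priority traffic.
pub struct Bursty;
impl Actor for Bursty {
    const DEFAULT_CAPACITY: Capacity = Capacity { normal: 1000, high: 10 };
}
```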
The code already seems to be rustfmt-clean with use_small_heuristics = "Max", apart from a few small changes.
Add rustfmt.toml and check that in CI?
Plain actor seems to be already taken on crates.io: https://crates.io/crates/actor
OTOH, the repository name does not necessarily have to be the same as the name on crates.io.
This issue seeks a decision on both the repository name and the name of the crate on crates.io (crate name).
There are currently 4 related methods:
/// ... Use this if you need to react when the channel is full.
pub fn try_send<N: Into<M>>(&self, message: N) -> Result<(), TrySendError<M>>;
/// ... Use this if there is nothing you can do when the channel is full.
/// The method still logs a warning for you in that case.
pub fn send<N: Into<M>>(&self, message: N) -> Result<(), TrySendError<M>>;
/// ... Use this if you do not care if messages are being dropped.
pub fn send_quiet<N: Into<M>>(&self, message: N);
/// ... Use if you expect the channel to be
/// frequently full (slow consumer), but would still like to be notified if a
/// different error occurs (e.g. disconnection).
pub fn send_if_not_full<N: Into<M>>(&self, message: N) -> Result<(), TrySendError<M>>;
From an API user's PoV, I see some possible shortcomings:
The send() method "swallows" (just logs) a certain class of errors (channel full). That seems good for real-time processing, but for a generic actor crate I would expect the "default" send method to be the most defensive one, i.e. the current try_send() that returns all errors.
Brainstorm of an alternative API (usage standpoint):
// current API -> equivalent in the new API
recipient.try_send() -> recipient.send()
recipient.send() -> recipient.send().warn_when_full()
recipient.send_quiet() -> recipient.send().ok() // generic way to ignore errors and silence the unused_must_use warning
recipient.send_if_not_full() -> recipient.send().ignore_full()
// The last case is currently implemented a bit differently internally - through
// self.remaining_capacity(). That would need to be considered.
That should be possible by introducing custom error types plus an extension trait for Result<(), ThatErrorType>. Downsides are the added boilerplate, more verbose call sites, and the need for users to import that extension trait.
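A minimal sketch of the error type plus extension trait. SendError and SendResultExt are hypothetical names, and eprintln! stands in for whatever logging the crate would actually use. Both combinators turn Full into Ok(()) and pass every other error through.

```rust
/// Hypothetical crate-specific send error; both variants hand the message back.
#[derive(Debug, PartialEq, Eq)]
pub enum SendError<M> {
    Full(M),
    Disconnected(M),
}

/// Extension trait that users would import to get the combinators.
pub trait SendResultExt<M> {
    /// Logs and swallows Full; other errors are passed through.
    fn warn_when_full(self) -> Result<(), SendError<M>>;
    /// Silently swallows Full; other errors are passed through.
    fn ignore_full(self) -> Result<(), SendError<M>>;
}

impl<M> SendResultExt<M> for Result<(), SendError<M>> {
    fn warn_when_full(self) -> Result<(), SendError<M>> {
        match self {
            Err(SendError::Full(_)) => {
                // Stand-in for a log::warn!() call.
                eprintln!("warning: channel full, message dropped");
                Ok(())
            }
            other => other,
        }
    }

    fn ignore_full(self) -> Result<(), SendError<M>> {
        match self {
            Err(SendError::Full(_)) => Ok(()),
            other => other,
        }
    }
}
```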
Currently, crossbeam could be updated:
strohel@mat480s ~/projekty/actor $ cargo outdated -R
Name       Project  Compat  Latest  Kind    Platform
----       -------  ------  ------  ----    --------
crossbeam  0.7.3    ---     0.8.0   Normal  ---
...but I'm deliberately not updating it, so that the automated way can be tested right away.
It seems that the functionality of the Actor::errored() callback could be achieved equivalently inside each actor, by wrapping its real handle() in an error-handling shim handle(). System::run_actor_select_loop() could then unconditionally abort the loop when an error occurs.
The only functional difference I could find is that metrics are currently not collected when Actor::handle() returns an error and Actor::errored() returns Recoverable. But I'm not sure whether that's intentional.
In order not to regress in ergonomics, the crate (or users themselves) can provide wrapper (or adapter) actors that supply the boilerplate. Imagine struct LogAndIgnoreErrorsAdapter<A: Actor>(A); impl<A: Actor> Actor for LogAndIgnoreErrorsAdapter<A> {...} or something along those lines. A drop-in replacement adapter with errored() is also possible.
In other words: we might replace one method on the central trait by composing the existing concepts instead.
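A sketch of the adapter idea, assuming a simplified Actor trait with no context and no errored(). The adapter logs and discards every error of the wrapped actor and reports Infallible, so a loop that aborts on any error would never abort for it. EvenOnly is only an example actor.

```rust
use std::convert::Infallible;
use std::fmt::Debug;

/// Simplified stand-in for the crate's Actor trait (assumption).
pub trait Actor {
    type Message;
    type Error: Debug;
    fn handle(&mut self, message: Self::Message) -> Result<(), Self::Error>;
}

/// Logs and ignores every error of the wrapped actor.
pub struct LogAndIgnoreErrorsAdapter<A: Actor>(pub A);

impl<A: Actor> Actor for LogAndIgnoreErrorsAdapter<A> {
    type Message = A::Message;
    type Error = Infallible;

    fn handle(&mut self, message: Self::Message) -> Result<(), Self::Error> {
        if let Err(error) = self.0.handle(message) {
            eprintln!("actor error ignored: {:?}", error);
        }
        Ok(())
    }
}

/// Example actor that rejects odd numbers.
pub struct EvenOnly {
    pub accepted: Vec<u32>,
}

impl Actor for EvenOnly {
    type Message = u32;
    type Error = String;

    fn handle(&mut self, message: u32) -> Result<(), String> {
        if message % 2 == 1 {
            return Err(format!("{} is odd", message));
        }
        self.accepted.push(message);
        Ok(())
    }
}
```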
The original actor system has an API in which an actor accepts a single message type, which often forces us to use enums to allow different kinds of messages to be sent, à la:
enum MyMessage {
    Hello,
    Goodbye,
}
impl Actor for MyActor {
    type Message = MyMessage;
}
Actix and other systems use something like:
impl Handler<Hello> for MyActor { ... }
impl Handler<Goodbye> for MyActor { ... }
Would be good to work out the pros/cons.
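For comparison, a minimal, dependency-free sketch of the per-message-type style (not Actix's actual trait, which also involves a context and a result type). Each message is its own type and the compiler picks the impl from the argument type, so no wrapper enum is needed. A likely con is that a single channel of one message type no longer suffices, so something like boxing or type erasure would be needed to queue different messages together.

```rust
/// Hypothetical per-message handler trait.
pub trait Handler<M> {
    fn handle(&mut self, message: M);
}

pub struct Hello;

pub struct Goodbye {
    pub name: String,
}

#[derive(Default)]
pub struct MyActor {
    pub log: Vec<String>,
}

impl Handler<Hello> for MyActor {
    fn handle(&mut self, _message: Hello) {
        self.log.push("hello".to_string());
    }
}

impl Handler<Goodbye> for MyActor {
    fn handle(&mut self, message: Goodbye) {
        self.log.push(format!("goodbye {}", message.name));
    }
}
```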
Mentioned by @skywhale in #53 (comment). CC @bschwind.
It would certainly be more locally consistent, as the receive_timeout field is already private on purpose. It would cause some churn for crate users, but better sooner than later?
Note that Context is no longer user-constructible after merging #53.
When a Timed<> actor does something like
fn handle(&mut self, context, message) {
    sleep(some time);
    context.myself.send_now(some message);
}
then delayed or recurring messages are never delivered to it (no matter what priority they have), because the Timed wrapper only processes them in deadline_exceeded(), which is never called (because the queue is never empty in the actor's event loop).
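One possible remedy, sketched under the assumption of a hand-written loop over std::sync::mpsc rather than the crate's actual event loop: check for due timers on every iteration, not only when the inbox is idle, and bound each wait by the nearest deadline. run_loop() and the Vec-based timer list are hypothetical simplifications.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Instant;

/// Hypothetical event loop for a timed actor. Due timers are fired on every
/// iteration, so a constantly busy inbox cannot starve them.
pub fn run_loop<M>(inbox: Receiver<M>, mut timers: Vec<(Instant, M)>, mut handle: impl FnMut(M)) {
    loop {
        // 1. Fire everything that is due, even if messages are waiting.
        let now = Instant::now();
        let mut i = 0;
        while i < timers.len() {
            if timers[i].0 <= now {
                let (_, message) = timers.remove(i);
                handle(message);
            } else {
                i += 1;
            }
        }

        // 2. Wait for the next message, but no longer than the next deadline.
        let next_deadline = timers.iter().map(|(at, _)| *at).min();
        let received = match next_deadline {
            Some(deadline) => inbox.recv_timeout(deadline.saturating_duration_since(Instant::now())),
            None => inbox.recv().map_err(|_| RecvTimeoutError::Disconnected),
        };

        match received {
            Ok(message) => handle(message),
            Err(RecvTimeoutError::Timeout) => continue,
            // For brevity, the loop exits once all senders are gone, even with timers pending.
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
}
```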
I suggest we eventually provide our own error type(s) (and let thiserror fill in the boilerplate).
Advantages:
Disadvantages:
This is not necessarily Before Publish stuff: the need to programmatically distinguish individual error variants may not be high. The Recipient::send() family of functions already uses a specific error, discussed in #20.
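A sketch of what such a type could look like, written by hand to stay dependency-free; the Display and Error impls are the boilerplate that thiserror's derive would generate. ActorError and its variants are hypothetical examples, not errors the crate defines. Callers can then match on variants instead of parsing messages.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical crate-level error type.
#[derive(Debug)]
pub enum ActorError {
    /// The system is shutting down; no new actors can be spawned.
    SystemStopped { actor_name: String },
    /// Spawning the actor's thread failed.
    SpawnFailed { actor_name: String, source: std::io::Error },
}

impl fmt::Display for ActorError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ActorError::SystemStopped { actor_name } => write!(f, "system stopped, cannot spawn {}", actor_name),
            ActorError::SpawnFailed { actor_name, .. } => write!(f, "failed to spawn {}", actor_name),
        }
    }
}

impl Error for ActorError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            ActorError::SpawnFailed { source, .. } => Some(source),
            ActorError::SystemStopped { .. } => None,
        }
    }
}
```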
If I understand the overall architecture right, "flow control" with actor feels most natural when there are slow producers and fast consumers, or when dropping messages is fine.
Otherwise, the channels would quickly become full. Senders receive a specific error in that case, but it is not clear how they should react. Simply sleeping is likely to reduce throughput, and may require tuning.
A blocking variant of send() could help here, but it could open a Pandora's box at the same time (e.g. how do we shut down a blocked actor?). In other words, this may be a complex topic.
This is probably not Before Release material. The ticket mainly serves to gather people's feedback.
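To illustrate the trade-off, the sketch below uses std's bounded channel, whose SyncSender::send() blocks while the channel is full. The fast producer is throttled to the slow consumer's pace and nothing is dropped. It also shows the shutdown problem: the blocked send only returns (with an error) once the receiver is dropped, so a consumer that stalls without exiting would leave the producer stuck. blocking_backpressure_demo() is a hypothetical demo, not a proposed API.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::Duration;

/// Fast producer, slow consumer, bounded channel with blocking send().
/// Returns the messages in the order the consumer received them.
pub fn blocking_backpressure_demo(messages: u32, capacity: usize) -> Vec<u32> {
    let (tx, rx) = sync_channel(capacity);
    let producer = thread::spawn(move || {
        for i in 0..messages {
            // Blocks instead of failing when full; errs only if the receiver is gone.
            tx.send(i).expect("receiver alive");
        }
        // `tx` is dropped here, which ends the consumer's loop below.
    });

    let mut received = Vec::new();
    for message in rx {
        thread::sleep(Duration::from_millis(1)); // simulate a slow consumer
        received.push(message);
    }
    producer.join().expect("producer thread panicked");
    received
}
```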
Sometimes we want to outright reject messages without processing them. It would be nice to be able to block them from even entering a queue. For example:
pub enum Priority {
    Normal,
    High,
    PolitelyRefused,
}
This could allow:
enum Treat {
    Apple,
    Banana,
    Cookie,
}
struct Child;
impl Actor for Child {
    type Message = Treat;
    fn priority(message: &Self::Message) -> Priority {
        match message {
            Treat::Apple => Priority::High,
            Treat::Banana => Priority::Normal,
            Treat::Cookie => Priority::PolitelyRefused,
        }
    }
}
where Treat::Cookie no longer counts against the size of either of Child's queues. Because priority selection is already a filter, allowing a 'skip' filter here feels like the best fit, rather than filtering within handle. This is especially useful when a particular kind of message is noisy but unwanted: in the example, if there are 20x more Treat::Cookie messages, our queue size requirements would be affected, even though those messages would ultimately be skipped.
Naming is up to you; I'm not married to PolitelyRefused. Alternatives: Skip, Reject, Pass, None.
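A sketch of where the filter could sit: the send path asks the actor type for the message's priority before touching any queue, so refused messages never take a slot. It reuses the Priority, Treat and Child shapes from the proposal; SendOutcome, channels() and the simplified Actor trait are hypothetical, built on std::sync::mpsc.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Priority {
    Normal,
    High,
    PolitelyRefused,
}

/// Simplified stand-in for the crate's Actor trait (assumption).
pub trait Actor {
    type Message;
    fn priority(message: &Self::Message) -> Priority;
}

/// Hypothetical result of a send.
#[derive(Debug, PartialEq, Eq)]
pub enum SendOutcome {
    Queued,
    Refused,
    Full,
    Disconnected,
}

/// Sending side with one bounded queue per accepted priority.
pub struct Recipient<A: Actor> {
    normal_tx: SyncSender<A::Message>,
    high_tx: SyncSender<A::Message>,
}

/// Returns the recipient plus the (normal, high) receivers an actor loop would read.
pub fn channels<A: Actor>(capacity: usize) -> (Recipient<A>, Receiver<A::Message>, Receiver<A::Message>) {
    let (normal_tx, normal_rx) = sync_channel(capacity);
    let (high_tx, high_rx) = sync_channel(capacity);
    (Recipient { normal_tx, high_tx }, normal_rx, high_rx)
}

impl<A: Actor> Recipient<A> {
    pub fn send(&self, message: A::Message) -> SendOutcome {
        let tx = match A::priority(&message) {
            // Refused messages are dropped here and never occupy a queue slot.
            Priority::PolitelyRefused => return SendOutcome::Refused,
            Priority::Normal => &self.normal_tx,
            Priority::High => &self.high_tx,
        };
        match tx.try_send(message) {
            Ok(()) => SendOutcome::Queued,
            Err(TrySendError::Full(_)) => SendOutcome::Full,
            Err(TrySendError::Disconnected(_)) => SendOutcome::Disconnected,
        }
    }
}

#[derive(Debug, PartialEq, Eq)]
pub enum Treat {
    Apple,
    Banana,
    Cookie,
}

pub struct Child;

impl Actor for Child {
    type Message = Treat;
    fn priority(message: &Treat) -> Priority {
        match message {
            Treat::Apple => Priority::High,
            Treat::Banana => Priority::Normal,
            Treat::Cookie => Priority::PolitelyRefused,
        }
    }
}
```

With a capacity of 1, any number of cookies still leaves room for one banana and one apple.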
In the same vein, changing the trait requirement from Into<E> to TryInto<E> would go a long way to help publish, too.
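A sketch of why a TryInto bound helps, assuming "publish" refers to broadcasting an event to subscribed actors whose message types must currently be convertible from the event. With Into, every subscriber must accept every event; with TryInto, events a subscriber cannot represent are skipped before reaching its inbox. Event, TemperatureReading and publish() are hypothetical, and a Vec stands in for the inbox.

```rust
use std::convert::{TryFrom, TryInto};

/// Hypothetical broadcast event type.
#[derive(Debug)]
pub enum Event {
    Temperature(f32),
    Log(String),
}

/// A subscriber's own message type that accepts only some events.
#[derive(Debug, PartialEq)]
pub struct TemperatureReading(pub f32);

impl TryFrom<Event> for TemperatureReading {
    type Error = Event;

    fn try_from(event: Event) -> Result<Self, Event> {
        match event {
            Event::Temperature(celsius) => Ok(TemperatureReading(celsius)),
            other => Err(other),
        }
    }
}

/// Delivers the event only if the subscriber's message type can represent it.
/// Returns whether it was delivered.
pub fn publish<M, E>(event: E, inbox: &mut Vec<M>) -> bool
where
    E: TryInto<M>,
{
    match event.try_into() {
        Ok(message) => {
            inbox.push(message);
            true
        }
        // Rejected events are dropped without touching the subscriber's inbox.
        Err(_) => false,
    }
}
```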
Do we want this?
Some pre-publish chores. "MIT or MPL, at your option"?