cda-group / arcon
State-first Streaming Applications in Rust
Home Page: https://cda-group.github.io/arcon/
License: Apache License 2.0
rustup component add rustfmt
and so on...
Reminder to go through the Rust codebase and refactor some "hacky" implementations.
The codegen tests pass on a clean run. However, on subsequent runs they fail with a "multiple matching crates" error.
Rather than forcing Hash upon all ArconTypes, look into having our own trait for extracting a key. That is, we don't only want to be able to hash keys; users may want their own custom logic for defining keys.
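One possible shape for such a trait, as a sketch: the name `GetKey`, the `Purchase` type, and the `partition` helper are all illustrative assumptions, not Arcon API.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical trait: lets users define their own key logic
/// instead of forcing `Hash` on every ArconType.
pub trait GetKey {
    type Key: Hash;
    fn get_key(&self) -> Self::Key;
}

pub struct Purchase {
    pub user_id: u64,
    pub amount: u32,
}

impl GetKey for Purchase {
    type Key = u64;
    // Custom logic: key only by user, ignoring the rest of the struct.
    fn get_key(&self) -> u64 {
        self.user_id
    }
}

/// The partitioner can then hash whatever key the user chose.
pub fn partition<T: GetKey>(item: &T, parallelism: u64) -> u64 {
    let mut h = DefaultHasher::new();
    item.get_key().hash(&mut h);
    h.finish() % parallelism
}
```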
Opening too many files for OSX to handle....
We are currently maintaining two JSON impls: one in Arc and one in arcon_spec. Look into creating a common Protobuf specification instead.
Add benches (InMemory/RocksDB) for the different state types.
Add local execution cluster so that one may experiment with arcon.
This relates to the main arcon crate.
Add to arcon/experiments.
Start off by adding the Nexmark queries that we support...
Currently, the KeyBy strategy partitions by the hash of the key. However, in order to support efficient range scans, we should probably move towards key-range partitioning.
Some relevant information regarding the two approaches: https://tikv.org/deep-dive/scalability/data-sharding/#partitioning
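A minimal sketch contrasting the two approaches (the boundary values here are arbitrary, for illustration only):

```rust
/// Hash partitioning: good spread, but keys that are adjacent in
/// key space end up on unrelated partitions, so range scans must
/// touch every partition. (The key stands in for its own hash here.)
fn hash_partition(key: u64, partitions: u64) -> u64 {
    key % partitions
}

/// Range partitioning: each partition owns a contiguous key range,
/// so a scan over [lo, hi] only touches the partitions whose ranges
/// overlap it. `boundaries` holds the upper bound of each range.
fn range_partition(key: u64, boundaries: &[u64]) -> usize {
    boundaries
        .iter()
        .position(|&b| key < b)
        .unwrap_or(boundaries.len())
}
```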
Preallocate message buffers through #57 at startup, and fall back to sending message by message if there is no available buffer in the ChannelStrategy and we are not able to allocate more through the allocator.
With us moving away from the Weld runtime (#45), we will no longer be required to use Rust structs (although we could). We currently rely on Serde and Bincode to serialise our Rust structs into bytes so that they can be sent over the wire or stored in a state backend. However, I do not see this as a good approach moving forward. Why? Mainly two reasons: (1) lack of schema evolution; (2) performance reasons.
https://github.com/erickt/rust-serialization-benchmarks
Note on complexity: this is just based on my own experiences using the frameworks (in Rust) and on how complicated it would be to integrate them with Arcon.
Framework | Schema Evolution | Performance | Complexity |
---|---|---|---|
Serde | No | Medium | Low |
Protobuf | Yes | Medium | Medium |
Flatbuffers | Yes | High | High |
Cap’n Proto | Yes | High | High |
Both Flatbuffers and Cap'n Proto achieve similar performance due to not having any encode/decode step.
The point of this issue is to start a discussion.
These are just some notes/thoughts. Yeah, some of them are quite obvious.
We are currently using Crossbeam's SegQueue, which performs heap allocations at runtime. This may be fine in general-purpose applications, but for arcon we need to avoid doing such allocations on the critical path.
A simple benchmark between 2 threads for 400M msgs.
Queue | Throughput | Type |
---|---|---|
Crossbeam SegQueue | 17M | Dynamic |
Crossbeam ArrayQueue | 35M | Pre-allocated |
The most common channel strategy is a forward one, that is, one-to-one. Therefore, it might make sense to default to SPSC communication between threads/components.
This would also make it easier to control the flow of watermarks/epochs. If a watermark is signaled on one channel, we simply switch to the other(s) where we expect watermarks.
This would most likely make downstream event-time triggers fire faster, and we also avoid having to buffer msgs internally in the component since we can select channels.
Queue | Throughput | Type |
---|---|---|
Crossbeam SegQueue (MPMC) | 17M | Dynamic |
Crossbeam ArrayQueue (MPMC) | 35M | Pre-allocated |
bounded-spsc-queue | 61M | Pre-allocated |
Placement matters! For all the results seen so far, the writer and reader have been pinned in such a way that maximised performance.
The following is the layout of my reliable laptop:
Queue | Throughput | Type | Writer | Reader |
---|---|---|---|---|
Crossbeam SegQueue | 17M | Dynamic | vcore-0 | vcore-1 |
Crossbeam SegQueue | ~17M | Dynamic | vcore-0 | vcore-2 |
Crossbeam ArrayQueue | 35M | Pre-allocated | vcore-0 | vcore-1 |
Crossbeam ArrayQueue | 20M | Pre-allocated | vcore-0 | vcore-2 |
bounded-spsc-queue | 61M | Pre-allocated | vcore-0 | vcore-1 |
bounded-spsc-queue | 17M | Pre-allocated | vcore-0 | vcore-2 |
Also, it might be worth looking into NUMA-aware placements.
Ideally, we would like to avoid popping element by element. We don't want to spend most of our time in the pop() function and possibly become CPU-bound; we should use some of that memory bandwidth as well.
The suggestion is to buffer elements, read them as batched slices, and iterate over them in a loop.
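A rough sketch of the batched-read idea, with std's VecDeque standing in for the actual SPSC queue (the function name and batch shape are assumptions):

```rust
use std::collections::VecDeque;

/// Drain up to `batch` elements in one go instead of popping one by
/// one, then hand the whole batch to the operator as a slice/Vec.
fn drain_batch<T>(queue: &mut VecDeque<T>, batch: usize) -> Vec<T> {
    let n = batch.min(queue.len());
    queue.drain(..n).collect()
}
```

The consumer then iterates the returned batch in a tight loop, amortising the per-pop synchronisation cost over `batch` elements.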
https://github.com/dtolnay/trybuild
Makes it easier to see compilation errors and warnings of the generated code
Create larger pipelines (e.g., source -> task -> window -> task -> sink) under tests/ and verify results.
arcon_compiler is currently using grpc-rs. Replace with a pure Rust implementation such as Tonic (which also utilises Prost).
This is not critical at this point, but leaving this here as a reminder.
Remove the Weld runtime and generate Rust code instead. Will require changes to arcon_codegen and arcon.
Add some form of ArconConfig to configure batch size, batch timeouts, watermark interval, etc.
impl<A: ArconType> UnsafeSerde<A> {
const SID: kompact::prelude::SerId = 26;
}
This is a really bad idea. Assigning the same serialisation id to multiple concrete serialisers (i.e. one for every generic type) can end up with someone trying to deserialise data of type B with a serialiser for type A and accidentally getting some weird value back without any error.
This may work ok for protobuf stuff assuming it maintains its own internal ids (which I'm not sure it does, tbh), but for abomonation this is definitely a no-no.
It would be better to generate a serialisation id for every type A and have ReliableSerde<A> use A::SER_ID instead, so that it can't be confused with B. Kompact has the following trait for this purpose, to be implemented on A, not ReliableSerde<A>:
pub trait SerialisationId {
const SER_ID: SerId;
}
Alternatively, you can add a SER_ID field to ArconType.
We will need a way to generate those ids reliably and consistently from the Arc compiler toolchain at some point.
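Implementing the trait per concrete type might look like the sketch below. The `SerId` alias is inlined so the snippet is self-contained, and the ids are placeholders; in practice they would be generated by the Arc compiler toolchain.

```rust
pub type SerId = u64;

pub trait SerialisationId {
    const SER_ID: SerId;
}

pub struct TypeA;
pub struct TypeB;

// Each concrete type gets its own unique id, so a serialiser for
// TypeA can never silently accept bytes that carry TypeB's id.
impl SerialisationId for TypeA {
    const SER_ID: SerId = 100;
}
impl SerialisationId for TypeB {
    const SER_ID: SerId = 101;
}

/// A deserialiser can now reject payloads tagged with the wrong id
/// instead of producing a garbage value by accident.
fn id_matches<T: SerialisationId>(incoming: SerId) -> bool {
    incoming == T::SER_ID
}
```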
Update to 0.6 release as it fixed a stack overflow issue.
If we are to use RocksDB, it would be good to check which crate to use. There are two available as I see it:
Without any deep investigation, it seems like the tikv fork is more feature complete. It supports the titan plugin as well. Also, we can see how it is used in a real system (i.e. tikv).
But yeah.. needs further investigation.
Every now and then, we get failures on the CI due to socket related test cases.
Investigate and fix!
Add a NetworkBuffer backed by the ArconAllocator. Should look something like this:
pub struct NetworkBuffer {
/// A raw pointer to our allocated memory block
ptr: *mut u8,
/// Reference to the allocator
///
/// Used to dealloc `ptr` when the NetworkBuffer is dropped
allocator: Arc<Mutex<ArconAllocator>>,
/// A unique identifier for the allocation
id: AllocId,
/// How many data elements there are in `ptr`
capacity: usize,
}
impl kompact::Chunk for NetworkBuffer {
fn as_mut_ptr(&mut self) -> *mut u8 {
self.ptr
}
fn len(&self) -> usize {
self.capacity
}
}
Support the Apache Arrow data format in Arcon. It will require #45 to be completed first.
Update the crates to "Arcon developers" or something like that.
Structs currently work only with primitive types. To be able to use Weld types, e.g., WeldVec, we need to implement custom serialisers using serde.
Got an email saying macOS-10.13 will be removed soon on azure pipelines.
Update to macOS-10.14 or macOS-10.15 when it comes (3rd feb).
Implement some form of macro or trait to reduce code duplication and make it easier to reason about.
Remove unsafe calls and fix Ports
Add metrics feature to be able to push pre-defined and custom metrics to some fancy dashboard. Examples of the former could be throughput/s of Arcon nodes.
Either use something like rust-prometheus or implement something from scratch.
Change Arcon Nodes' expected input type from ArconMessage<IN> to Vec<ArconMessage<IN>>. This means Arcon Nodes will operate on messages in a loop:
for msg in msgs.iter() {
match msg.event {
Element(e) => {},
Watermark(w) => {},
Epoch(e) => {},
}
}
For this, ChannelStrategy needs to be rewritten. Each implementation needs to be able to buffer msgs and also implement a flush function.
Expected outcome
Gotta benchmark it first, but it should realistically increase the throughput and improve resource utilisation.
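A minimal sketch of such a buffering strategy (the name and shape are assumptions, not Arcon's actual ChannelStrategy API): messages accumulate in a Vec and go out as one batch, either when the batch-size threshold is hit or on an explicit flush (e.g. on a watermark/epoch or a timeout).

```rust
/// Hypothetical buffering channel strategy.
pub struct BufferingStrategy<T> {
    buffer: Vec<T>,
    batch_size: usize,
}

impl<T> BufferingStrategy<T> {
    pub fn new(batch_size: usize) -> Self {
        Self {
            buffer: Vec::with_capacity(batch_size),
            batch_size,
        }
    }

    /// Buffer a message; return a full batch once the threshold is hit.
    pub fn add(&mut self, msg: T) -> Option<Vec<T>> {
        self.buffer.push(msg);
        if self.buffer.len() >= self.batch_size {
            Some(self.flush())
        } else {
            None
        }
    }

    /// Drain whatever is buffered, e.g. on watermark/epoch or timeout.
    pub fn flush(&mut self) -> Vec<T> {
        std::mem::take(&mut self.buffer)
    }
}
```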
Currently, all StreamTasks set up a single WeldContext to reuse for all executions. However, a WeldContext has a default memory limit, so some form of memory management is required.
Add support for Kubernetes in the Appmaster
drop 0.1x
I don't think I was aware of the required tag at the time. We currently have ArconElement as the following:
pub struct ArconElement<A: ArconType> {
#[prost(message, tag = "1")]
pub data: Option<A>,
#[prost(message, tag = "2")]
pub timestamp: Option<u64>,
}
When we can in fact just remove the Option(s):
pub struct ArconElement<A: ArconType> {
#[prost(message, required, tag = "1")]
pub data: A,
#[prost(message, tag = "2")]
pub timestamp: Option<u64>,
}
And then the question is whether we need timestamp to be an Option or if we simply remove it:
#[prost(uint64, tag = "2")]
pub timestamp: u64,
For actual pipelines, the SourceContext will either populate the timestamp using an extractor or fall back to system clock time, so in that sense an Option is not necessary.
Same goal as #33
tell_serialised demands a reference to a ComponentDefinition and not an ActorSource, so it will require some changes to the Channel Strategies.
For unit tests, we might have to do something like this.
src_comp.on_definition(|cd| {
channel_strategy.add(ArconEvent::Element(element), &cd);
// flush ...
});
Common methods/interface for Sources added to Arcon
From discussions in #101
Look to turn #[arcon] into a derive macro instead and add test cases for generating ArconType structs from a Protobuf file.
#[derive(ArconType)] or #[derive(Arcon)]
#[arcon(unsafe_ser_id = 104, reliable_ser_id = 105, version = 1)]
pub struct Data {
pub id: u32,
}
Where proto generation might look like this:
let mut config = prost_build::Config::new();
config.type_attribute(".", "#[derive(Clone, arcon::Arcon)]");
let arcon_cfg = format!("#[arcon(unsafe_ser_id = {}, reliable_ser_id = {}, version = {})]", id, id, version);
config.type_attribute(".", arcon_cfg);
config.compile_protos(&["file.proto"], &["proto_dir/"]).unwrap();
UDP for now...
Component or data structure to allocate memory for state backends or network buffers etc..