cda-group / arcon

State-first Streaming Applications in Rust

Home Page: https://cda-group.github.io/arcon/

License: Apache License 2.0

Languages: Rust 99.97%, Shell 0.03%
Topics: data-analytics, stream-processor, dataflow, distributed-computing, rust, arrow

arcon's Introduction

Arcon

Arcon is a library for building state-first streaming applications in Rust.


Project Status

Arcon is in development and should be considered experimental until further notice.

The APIs may break and you should not be running Arcon with important data!

Rust Version

Arcon builds against the latest stable release, and the current MSRV is 1.56.1.

Roadmap

See the roadmap here

Highlights

  • Out-of-order Processing
  • Event-time & Watermarks
  • Epoch Snapshotting for Exactly-once Processing
  • Hybrid Row(Protobuf) / Columnar (Arrow) System
  • Modular State Backend Abstraction

Example

#[arcon::app]
fn main() {
    (0..100u64)
        .to_stream(|conf| conf.set_arcon_time(ArconTime::Process))
        .filter(|x| *x > 50)
        .map(|x| x * 10)
        .print()
}

More examples can be found here.

Project Layout

Contributing

See Contributing

Community

Arcon is an ambitious project with many different development & research areas.

If you find Arcon interesting and want to learn more, then join the Zulip community!

Acknowledgements

Arcon is influenced by many great projects, whether in implementation, code practices, or project structure:

License

This project is licensed under the Apache-2.0 license.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in Arcon by you shall be licensed as Apache-2.0, without any additional terms or conditions.

arcon's People

Contributors: adamhass, bathtor, max-meldrum, mrobakowski, sanskar95, segeljakt, skyzh, soniahorchidan

arcon's Issues

Add batching at the channel level

Change Arcon Nodes expected input type from:

ArconMessage<IN>

to:

Vec<ArconMessage<IN>>

This means Arcon Nodes will operate on messages in a loop:

for msg in msgs.iter() {
    match msg.event {
        Element(e) => {},
        Watermark(w) => {},
        Epoch(e) => {},
    }
}

For this, ChannelStrategy needs to be rewritten. Each implementation needs to be able to buffer msgs and also implement a flush function.

Expected outcome

This needs benchmarking first, but it should realistically increase throughput and improve resource utilisation.

Arcon derive and Protobuf generation

From discussions in #101

Look to turn #[arcon] into a derive macro instead and add test cases for generating ArconType structs from a Protobuf file.

#[derive(ArconType)] or #[derive(Arcon)]
#[arcon(unsafe_ser_id = 104, reliable_ser_id = 105, version = 1)]
pub struct Data {
   pub id: u32,
}

Where proto generation might look like this:

let mut config = prost_build::Config::new();
config.type_attribute(".", "#[derive(Clone, arcon::Arcon)]");
let arcon_cfg = format!("#[arcon(unsafe_ser_id = {}, reliable_ser_id = {}, version = {})]",  id, id, version);
config.type_attribute(".",  arcon_cfg);
config.compile_protos(&["file.proto"], &["proto_dir/"]).unwrap();

Add Nexmark Benchmark

Add to arcon/experiments.

Start off by adding the Nexmark queries that we support...

UDF memory management

Currently all StreamTasks set up a single WeldContext to reuse across executions. However, a WeldContext has a default memory limit, so some form of memory management is required.

Fix flaky socket tests

Every now and then, we get failures on the CI due to socket related test cases.
Investigate and fix!

Update CI osx version

Got an email saying macOS-10.13 will be removed soon on azure pipelines.

Update to macOS-10.14 or macOS-10.15 when it comes (3rd feb).

Arcon Default Binary Format

With us moving away from the Weld runtime (#45), we will no longer be required to use Rust structs (although we could). We currently rely on Serde and Bincode to serialise our Rust structs into bytes so that they can be sent over the wire or stored in a state backend. However, I do not see this as a good approach moving forward. Why? Mainly two reasons: (1) lack of schema evolution; (2) performance.

https://github.com/erickt/rust-serialization-benchmarks

Note on complexity: this is just based on my own experience using the frameworks (in Rust) and how complicated it would be to integrate them with Arcon.

Framework    Schema Evolution  Performance  Complexity
Serde        No                Medium       Low
Protobuf     Yes               Medium       Medium
Flatbuffers  Yes               High         High
Cap’n Proto  Yes               High         High

Both Flatbuffers and Cap'n Proto achieve similar performance due to not having any encode/decode step.

The point of this issue is to start a discussion.

Add config

Add some form of ArconConfig to configure batch size, batch timeouts, watermark interval, etc.

Rework Arc batch backend

Remove the Weld runtime and generate Rust code instead. Will require changes to arcon_codegen and arcon.

Remove Option(s) in ArconElement

I don't think I was aware of the required tag at the time. We currently have ArconElement as the following:

pub struct ArconElement<A: ArconType> {
    #[prost(message, tag = "1")]
    pub data: Option<A>,
    #[prost(message, tag = "2")]
    pub timestamp: Option<u64>,
}

When we can in fact just remove the Option(s).

pub struct ArconElement<A: ArconType> {
    #[prost(message, required, tag = "1")]
    pub data: A,
    #[prost(message, tag = "2")]
    pub timestamp: Option<u64>,
}

And then the question is if we need timestamp to be an Option or if we simply remove it.

#[prost(uint64, tag = "2")]
pub timestamp: u64,

For actual pipelines, the SourceContext will either populate the timestamp using an extractor or fall back to system clock time, so in that sense, an Option is not necessary.

migrate to tell_serialised for remote channels

tell_serialised demands a reference to a ComponentDefinition and not an ActorSource, so it will require some changes to the Channel Strategies.

For unit tests, we might have to do something like this.

src_comp.on_definition(|cd| {
  channel_strategy.add(ArconEvent::Element(element), &cd);
  // flush ...
});

Change authors

Update the crates to "Arcon developers" or something like that.

Reusable message buffers for ChannelStrategies

Preallocate message buffers through #57 at startup, and fall back to sending message by message if there is no available buffer in the ChannelStrategy and we are not able to allocate more through the allocator.

Non-unique serialisation ids

impl<A: ArconType> UnsafeSerde<A> {
    const SID: kompact::prelude::SerId = 26;
}

This is a really bad idea. Assigning the same serialisation id to multiple concrete serialisers (i.e. one for every generic type) can end up with someone trying to deserialise data of type B with a serialiser for type A and getting a garbage value by accident, without any error.

This may work ok for protobuf stuff assuming it maintains its own internal ids (which I'm not sure it does, tbh), but for abomonation this is definitely a no-no.

It would be better to generate a serialisation id for every type A and have ReliableSerde<A> use A::SER_ID instead, so that it can't be confused with B.

Kompact has the following trait for this purpose to be implemented on A not ReliableSerde<A>:

pub trait SerialisationId {
    const SER_ID: SerId;
}

Alternatively, you can add a SER_ID field to ArconType.

We will need a way to generate those ids reliably and consistently from the Arc compiler toolchain at some point.
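The per-type-id approach can be sketched like this. `SerId` mirrors kompact's type alias, but the concrete types and id values below are made up for illustration:

```rust
// Each data type carries its own serialisation id constant, so a
// serialiser for `TypeA` can never be confused with one for `TypeB`.
type SerId = u64;

pub trait SerialisationId {
    const SER_ID: SerId;
}

struct TypeA;
struct TypeB;

// Ids would be generated by the Arc compiler toolchain; hardcoded here.
impl SerialisationId for TypeA {
    const SER_ID: SerId = 100;
}
impl SerialisationId for TypeB {
    const SER_ID: SerId = 101;
}

// A generic serialiser now picks up the id from the data type itself,
// instead of sharing a single constant across all instantiations.
fn ser_id_of<T: SerialisationId>() -> SerId {
    T::SER_ID
}

fn main() {
    assert_ne!(ser_id_of::<TypeA>(), ser_id_of::<TypeB>());
    println!("A = {}, B = {}", ser_id_of::<TypeA>(), ser_id_of::<TypeB>());
}
```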

Replace grpc-rs with tonic

arcon_compiler is currently using grpc-rs. Replace it with a pure Rust implementation such as Tonic (which also utilises Prost).

This is not critical at this point, but leaving this here as a reminder.

Spring cleaning

Reminder to go through the Rust codebase and refactor some "hacky" implementations.

Add serialisers for Weld types

Structs currently work only with primitive types. To be able to use Weld types, e.g., WeldVec, we need to implement custom serialisers using serde.

Improving the critical path

These are just some notes/thoughts. yeah, some of them are quite obvious.

Pre-allocate ringbuffer/queue rather than doing heap allocation during execution

We are currently using Crossbeam's SegQueue, which performs heap allocations at runtime. This may be fine in general-purpose applications, but for arcon we need to avoid such allocations on the critical path.

A simple benchmark between 2 threads for 400M msgs.

Queue                 Throughput  Type
Crossbeam SegQueue    17M         Dynamic
Crossbeam ArrayQueue  35M         Pre-allocated

Single-writer channels to avoid cache invalidations

The most common channel strategy is a forward one, that is, one-to-one. Therefore, it might make sense to default to SPSC communication between threads/components.
This would also make it easier to control the flow of watermarks/epochs. If a watermark is signaled on one channel, we simply switch to the other(s) where we expect watermarks.
This would most likely let downstream event-time triggers fire sooner, and it also avoids buffering messages internally in the component, since we can select between channels.

Queue                        Throughput  Type
Crossbeam SegQueue (MPMC)    17M         Dynamic
Crossbeam ArrayQueue (MPMC)  35M         Pre-allocated
bounded-spsc-queue           61M         Pre-allocated

Core pinning and NUMA aware placement

Placement matters! For all the results seen so far, the writer and reader have been pinned in such a way that maximised performance.

The following is the layout of my reliable laptop:

[Figure: CPU layout of the laptop]

Queue                 Throughput  Type           Writer   Reader
Crossbeam SegQueue    17M         Dynamic        vcore-0  vcore-1
Crossbeam SegQueue    ~17M        Dynamic        vcore-0  vcore-2
Crossbeam ArrayQueue  35M         Pre-allocated  vcore-0  vcore-1
Crossbeam ArrayQueue  20M         Pre-allocated  vcore-0  vcore-2
bounded-spsc-queue    61M         Pre-allocated  vcore-0  vcore-1
bounded-spsc-queue    17M         Pre-allocated  vcore-0  vcore-2

Also, it might be worth looking into NUMA-aware placements.

Adaptive buffering of stream elements for improved throughput

Ideally, we would like to avoid popping elements one by one. We don't want to spend most of our time in the pop() function and possibly become CPU-bound; we should use some of that memory bandwidth as well.

The suggestion is to buffer elements, read them as batched slices, and iterate over them in a loop.

RocksDB crate

If we are to use RocksDB, it would be good to check which crate to use. There are two available as I see it:

Without any deep investigation, it seems like the tikv fork is more feature-complete, and it supports the titan plugin as well. Also, we can see how it is used in a real system (i.e. tikv).

But yeah.. needs further investigation.

Local Deployment

Add local execution cluster so that one may experiment with arcon.

Key extraction

Rather than forcing Hash on every ArconType, look into having our own trait for extracting a key. That is, we don't only want to hash keys; users may want their own custom logic for defining keys.

Metrics Plugin

Add metrics feature to be able to push pre-defined and custom metrics to some fancy dashboard. Examples of the former could be throughput/s of Arcon nodes.

Either use something like rust-prometheus or implement something from scratch.

Add Network buffer

Add a NetworkBuffer backed by the ArconAllocator. Should look something like this:

pub struct NetworkBuffer {
    /// A raw pointer to our allocated memory block
    ptr: *mut u8,
    /// Reference to the allocator
    ///
    /// Used to dealloc `ptr` when the NetworkBuffer is dropped
    allocator: Arc<Mutex<ArconAllocator>>,
    /// A unique identifier for the allocation
    id: AllocId,
    /// How many data elements there are in `ptr`
    capacity: usize,
}

impl kompact::Chunk for NetworkBuffer {
    fn as_mut_ptr(&mut self) -> *mut u8 {
        self.ptr
    }

    fn len(&self) -> usize {
        self.capacity
    }
}
