- R&D on software at Toyota.
- Security & Privacy
- Auth (DID & VC)
- Data Platforms
- δΏΊγ¨γεγ¨laysakura (Blog in Japanese)
- θ·εη΅ζ΄ζΈ (in Japanese; up-to-date)
- Resume (not up-to-date)
- Email: [email protected]
- X (Twitter): @laysakura
This project forked from nivaldoh/beam
Apache Beam is a unified programming model for Batch and Streaming data processing.
Home Page: https://beam.apache.org/
License: Apache License 2.0
During pipeline execution, the bundle processor retrieves a process bundle descriptor, which contains descriptors for transforms, collections, windowing strategies, coders and other relevant descriptors to execute a pipeline. Reconstructing the pipeline graph and runtime types on a worker might have unintended side effects due to variations in the execution environment (e.g. hostname, RNGs, user names, etc.). The graph and all its types as it existed at pipeline construction time must be exactly replicable at pipeline execution time. The FnAPI BeamFnControl service enables FnAPI SDKs to retrieve serialized representations of all relevant objects to make this work.
Rust does not support handy features such as wildcard generics in Java to treat all generic trait implementations as equal types. Rust also does not ship with a usable RTTI system in stable Rust, which could then be used to facilitate (de)serialization more easily. We thus require ways to specify typed and untyped views of all objects.
For example, take this oversimplified DoFn example:
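use std::any::Any;

// Typed view: the element types are known at compile time.
trait DoFn {
    type Input: Any;
    type Output: Any;
    fn process_element(&mut self, element: &Self::Input) -> Self::Output;
}
// Untyped view: elements are type-erased.
trait UntypedDoFn {
    fn process(&mut self, element: Box<dyn Any>) -> Box<dyn Any>;
}
struct MyDoFn;
impl DoFn for MyDoFn {
    type Input = String;
    type Output = String;
    fn process_element(&mut self, element: &Self::Input) -> Self::Output {
        element.to_uppercase()
    }
}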
We can provide a blanket UntypedDoFn implementation for all typed DoFns, like so:
impl<I, O, T: DoFn<Input = I, Output = O>> UntypedDoFn for T {
    fn process(&mut self, element: Box<dyn Any>) -> Box<dyn Any> {
        Box::new(self.process_element(
            element.downcast_ref::<<Self as DoFn>::Input>().unwrap(),
        ))
    }
}
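As a minimal, self-contained sketch of the typed/untyped split described above, the following puts the two traits, a blanket implementation, and a sample DoFn together. The helper `run_untyped` is hypothetical and exists only to show a DoFn being driven through its type-erased view; it is not part of the SDK.

```rust
use std::any::Any;

/// Typed view: input and output types are known at compile time.
trait DoFn {
    type Input: Any;
    type Output: Any;
    fn process_element(&mut self, element: &Self::Input) -> Self::Output;
}

/// Untyped view: elements are passed around as type-erased boxes.
trait UntypedDoFn {
    fn process(&mut self, element: Box<dyn Any>) -> Box<dyn Any>;
}

/// Blanket implementation: every typed DoFn is also usable as an untyped one.
impl<T: DoFn> UntypedDoFn for T {
    fn process(&mut self, element: Box<dyn Any>) -> Box<dyn Any> {
        // Recover the concrete input type; panics if the runtime type differs.
        let typed = element
            .downcast_ref::<T::Input>()
            .expect("element has an unexpected runtime type");
        Box::new(self.process_element(typed))
    }
}

struct MyDoFn;

impl DoFn for MyDoFn {
    type Input = String;
    type Output = String;
    fn process_element(&mut self, element: &Self::Input) -> Self::Output {
        element.to_uppercase()
    }
}

/// Hypothetical helper: drives a DoFn purely through its untyped view.
fn run_untyped(dofn: &mut dyn UntypedDoFn, input: &str) -> String {
    let out = dofn.process(Box::new(input.to_string()));
    *out.downcast::<String>().expect("output should be a String")
}
```

The caller of `run_untyped` never names `MyDoFn`'s associated types, which is what a worker needs once it only holds a `dyn UntypedDoFn`.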
No issues so far until we need to serialize the DoFn and deserialize it on another machine.
Setting aside that the serde crate is the most likely pick for serialization/deserialization and that an efficient binary format like bincode would be preferred, how do we know which struct type the serialized data represents and which UntypedDoFn vtable is required to operate on it? Unlike in Java, for example, type registries with registered deserializers do not ship as part of the runtime. A fair number of crates solve this, such as bevy_reflect, typetag and serde_traitobject. Both bevy_reflect and typetag (stable features) use type registries: Bevy requires users to create the registry and register types manually, whereas typetag relies on the inventory and ctor crates to build the registry automatically using life before main. Lastly, serde_traitobject (nightly features) encodes the struct and trait type id into the serialized data, which offers the most flexibility since explicit type registration (which has its own downsides) is not necessary.
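To make the "manual type registry" idea concrete, here is a rough sketch using only the standard library. Everything in it is an assumption for illustration: the `DoFnRegistry` type, the string tag, the hand-rolled byte payload, and the simplified string-based `UntypedDoFn` trait. It does not reflect how bevy_reflect, typetag, or serde_traitobject are implemented.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the untyped DoFn view (strings only, for brevity).
trait UntypedDoFn {
    fn process(&mut self, element: &str) -> String;
}

/// A DoFn carrying state that must survive the trip to another machine.
struct AppendSuffix {
    suffix: String,
}

impl UntypedDoFn for AppendSuffix {
    fn process(&mut self, element: &str) -> String {
        format!("{}{}", element, self.suffix)
    }
}

impl AppendSuffix {
    /// Hypothetical payload format: the suffix as raw UTF-8 bytes.
    fn to_bytes(&self) -> Vec<u8> {
        self.suffix.as_bytes().to_vec()
    }

    /// Rebuilds the struct and returns it behind the trait object (vtable).
    fn from_bytes(payload: &[u8]) -> Box<dyn UntypedDoFn> {
        Box::new(AppendSuffix {
            suffix: String::from_utf8_lossy(payload).into_owned(),
        })
    }
}

type Deserializer = fn(&[u8]) -> Box<dyn UntypedDoFn>;

/// Maps a type tag stored next to the payload to the matching deserializer.
#[derive(Default)]
struct DoFnRegistry {
    deserializers: HashMap<&'static str, Deserializer>,
}

impl DoFnRegistry {
    fn register(&mut self, tag: &'static str, de: Deserializer) {
        self.deserializers.insert(tag, de);
    }

    /// Returns None when the tag was never registered on this machine.
    fn deserialize(&self, tag: &str, payload: &[u8]) -> Option<Box<dyn UntypedDoFn>> {
        self.deserializers.get(tag).map(|de| de(payload))
    }
}
```

The `None` case is the main downside of explicit registration: a worker that forgot to register a type cannot reconstruct it.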
It's currently impossible (for good reason) to have typetag register generic types and traits (associated as well), but a workaround can be achieved by setting up a custom macro with which we would annotate a DoFn or Coder struct to expand the UntypedDoFn implementation as entirely non-generic, like:
impl UntypedDoFn for MyDoFn {
    fn process(&mut self, element: Box<dyn Any>) -> Box<dyn Any> {
        Box::new(<Self as DoFn>::process_element(
            self,
            element.downcast_ref::<<Self as DoFn>::Input>().unwrap(),
        ))
    }
}
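One way such an annotation could work is sketched below with a declarative macro rather than a proc macro, purely to keep the example self-contained. The macro name `impl_untyped_dofn` and the `WordLength` DoFn are hypothetical; the point is only that the expansion names a concrete type and contains no generics.

```rust
use std::any::Any;

trait DoFn {
    type Input: Any;
    type Output: Any;
    fn process_element(&mut self, element: &Self::Input) -> Self::Output;
}

trait UntypedDoFn {
    fn process(&mut self, element: Box<dyn Any>) -> Box<dyn Any>;
}

/// Hypothetical macro: expands to a non-generic UntypedDoFn impl for one type.
macro_rules! impl_untyped_dofn {
    ($ty:ty) => {
        impl UntypedDoFn for $ty {
            fn process(&mut self, element: Box<dyn Any>) -> Box<dyn Any> {
                // The concrete input type is resolved at expansion time.
                let typed = element
                    .downcast_ref::<<$ty as DoFn>::Input>()
                    .expect("element has an unexpected runtime type");
                Box::new(<$ty as DoFn>::process_element(self, typed))
            }
        }
    };
}

/// Sample DoFn (illustrative only): maps a string to its length in bytes.
struct WordLength;

impl DoFn for WordLength {
    type Input = String;
    type Output = usize;
    fn process_element(&mut self, element: &Self::Input) -> Self::Output {
        element.len()
    }
}

impl_untyped_dofn!(WordLength);
```

Because each expansion is an ordinary impl for a named struct, it is the kind of item a registry-based crate could register, unlike the blanket impl.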
One topic which requires some care and attention is utility DoFns and the desire to use modern patterns for simple Filter/FlatMap/Map transforms. Something like collection.map(|element: &String| -> String { element.to_uppercase() }) becomes impossible to write, because the closure type can't be represented without using generics. A clever way to hide this could be to write proc macros which unpack a closure expression's body and rewrite it as a struct with the necessary trait implementations, such that this can be expressed as collection.apply(Map!(|element: &String| -> String { element.to_uppercase() })). Utility functions like fn map<I, O, F: Fn(&I) -> O>(f: F) could potentially be shipped as part of a trait in a separate crate which adds those utility functions to PCollection with serialization handled by e.g. serde_traitobject. This would enable us to ship Beam's core as a crate which only uses stable features, but allows users to explicitly opt into developer ergonomics with unstable Rust features.
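The closure-to-struct rewrite can be approximated with a declarative macro, as in the sketch below. This is an assumption-laden illustration: `map_fn!` is a hypothetical name, it requires the caller to supply a struct name, and it only handles closures that capture nothing. A real proc macro (such as the `Map!` idea above) would have to generate the name and deal with captured state.

```rust
use std::any::Any;

trait DoFn {
    type Input: Any;
    type Output: Any;
    fn process_element(&mut self, element: &Self::Input) -> Self::Output;
}

/// Hypothetical macro: turns a fully annotated, capture-free closure
/// expression into a named unit struct with a DoFn implementation.
macro_rules! map_fn {
    ($name:ident, |$arg:ident : &$input:ty| -> $output:ty $body:block) => {
        struct $name;

        impl DoFn for $name {
            type Input = $input;
            type Output = $output;
            // The closure body becomes the method body verbatim.
            fn process_element(&mut self, $arg: &Self::Input) -> Self::Output $body
        }
    };
}

// Expands to `struct Upper;` plus `impl DoFn for Upper { ... }`.
map_fn!(Upper, |element: &String| -> String { element.to_uppercase() });
```

The resulting `Upper` is a plain named type, so it avoids the unnameable closure type that makes the generic `map` signature hard to serialize.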
To complete this issue we simply need to agree on whether we'd like to build a stable-only SDK with nightly features in separate crates, or whether we embrace nightly features as part of the core SDK to provide more ergonomic ways of expressing ourselves.
Priority: 3 (nice-to-have improvement)
beam/sdks/rust/src/coders/required_coders.rs
Lines 187 to 193 in 4ba3711
Rust has the IntoIterator trait, and we should use it instead.
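The referenced lines of required_coders.rs are not shown here, so the following is only a generic illustration of what adopting IntoIterator can look like. The `DecodedValues` wrapper and the `total_len` function are hypothetical names, not code from the SDK.

```rust
/// Hypothetical container for values decoded from an iterable coder.
struct DecodedValues<T> {
    values: Vec<T>,
}

/// Consuming iteration: `for v in decoded { ... }`.
impl<T> IntoIterator for DecodedValues<T> {
    type Item = T;
    type IntoIter = std::vec::IntoIter<T>;

    fn into_iter(self) -> Self::IntoIter {
        self.values.into_iter()
    }
}

/// Borrowing iteration: `for v in &decoded { ... }`.
impl<'a, T> IntoIterator for &'a DecodedValues<T> {
    type Item = &'a T;
    type IntoIter = std::slice::Iter<'a, T>;

    fn into_iter(self) -> Self::IntoIter {
        self.values.iter()
    }
}

/// Accepts any iterable of string-like items instead of one concrete type.
fn total_len<I>(items: I) -> usize
where
    I: IntoIterator,
    I::Item: AsRef<str>,
{
    items.into_iter().map(|s| s.as_ref().len()).sum()
}
```

With the standard trait in place, callers can pass a `Vec`, a slice, or the custom wrapper to the same function and use plain `for` loops.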
Priority: 3
Component: beam-community
We already have some transforms for making assertions, like:
beam/sdks/rust/tests/primitives_test.rs
Line 57 in 54b4d23
But we may lack some test transforms that other SDKs provide, and we might be able to develop a more sophisticated test framework for testing pipelines.
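As one example of the kind of test utility other SDKs offer, an order-insensitive comparison is sketched below. Both function names are hypothetical and the helpers operate on plain slices rather than PCollections; wiring them into a pipeline is left open.

```rust
use std::fmt::Debug;

/// Hypothetical helper: true when both slices hold the same elements with the
/// same multiplicities, regardless of order.
fn unordered_eq<T: Ord + Clone>(actual: &[T], expected: &[T]) -> bool {
    let mut actual_sorted = actual.to_vec();
    let mut expected_sorted = expected.to_vec();
    actual_sorted.sort();
    expected_sorted.sort();
    actual_sorted == expected_sorted
}

/// Hypothetical assertion built on top of `unordered_eq`; panics with both
/// sequences in the message when they differ.
fn assert_contains_in_any_order<T: Ord + Clone + Debug>(actual: &[T], expected: &[T]) {
    assert!(
        unordered_eq(actual, expected),
        "expected {:?} in any order, got {:?}",
        expected,
        actual
    );
}
```

Order-insensitive checks matter for pipeline tests because the element order within a collection is generally not guaranteed.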
The task should be like:
Priority: 3 (nice-to-have improvement)
cd sdks/rust
cargo test -- --skip target/debug
After applying the diff in #16, the test execution hangs at "Running tests/worker_test.rs".
Priority: 3 (minor)
RUST_BACKTRACE=1 cargo test -- --skip target/debug
running 10 tests
test tests::primitives_test::tests::ensure_assert_fails_on_empty ... ignored
test tests::coders_test::tests::test_general_object_coder ... ok
test tests::primitives_test::tests::run_impulse_expansion ... ok
test tests::worker_test::tests::test_operator_construction ... ok
test tests::primitives_test::tests::run_direct_runner ... ok
test tests::coders_test::tests::test_standard_coders ... ok
test tests::primitives_test::tests::run_gbk ... FAILED
test tests::primitives_test::tests::ensure_assert_fails - should panic ... ok
test tests::primitives_test::tests::run_map ... FAILED
test tests::primitives_test::tests::run_flatten ... FAILED
---- tests::primitives_test::tests::run_gbk stdout ----
thread 'tests::primitives_test::tests::run_gbk' panicked at 'called `Option::unwrap()` on a `None` value', src/internals/serialize.rs:108:57
stack backtrace:
0: rust_begin_unwind
at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/std/src/panicking.rs:575:5
1: core::panicking::panic_fmt
at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/core/src/panicking.rs:64:14
2: core::panicking::panic
at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/core/src/panicking.rs:114:5
3: core::option::Option<T>::unwrap
at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/core/src/option.rs:823:21
4: <apache_beam::internals::serialize::TypedKeyExtractor<V> as apache_beam::internals::serialize::KeyExtractor>::extract
at ./src/internals/serialize.rs:108:24
5: <apache_beam::worker::operators::GroupByKeyWithinBundleOperator as apache_beam::worker::operators::OperatorI>::process
at ./src/worker/operators.rs:503:28
6: <apache_beam::worker::operators::Operator as apache_beam::worker::operators::OperatorI>::process
at ./src/worker/operators.rs:134:17
7: apache_beam::worker::operators::Receiver::receive
at ./src/worker/operators.rs:237:13
8: <apache_beam::worker::operators::ParDoOperator as apache_beam::worker::operators::OperatorI>::process
at ./src/worker/operators.rs:581:17
9: <apache_beam::worker::operators::Operator as apache_beam::worker::operators::OperatorI>::process
at ./src/worker/operators.rs:137:17
10: apache_beam::worker::operators::Receiver::receive
at ./src/worker/operators.rs:237:13
11: <apache_beam::worker::operators::ParDoOperator as apache_beam::worker::operators::OperatorI>::process
at ./src/worker/operators.rs:581:17
12: <apache_beam::worker::operators::Operator as apache_beam::worker::operators::OperatorI>::process
at ./src/worker/operators.rs:137:17
13: apache_beam::worker::operators::Receiver::receive
at ./src/worker/operators.rs:237:13
14: <apache_beam::worker::operators::ImpulsePerBundleOperator as apache_beam::worker::operators::OperatorI>::start_bundle
at ./src/worker/operators.rs:450:13
15: <apache_beam::worker::operators::Operator as apache_beam::worker::operators::OperatorI>::start_bundle
at ./src/worker/operators.rs:114:46
16: apache_beam::worker::sdk_worker::BundleProcessor::process::{{closure}}
at ./src/worker/sdk_worker.rs:378:13
17: <apache_beam::runners::direct_runner::DirectRunner as apache_beam::runners::runner::RunnerI>::run_pipeline::{{closure}}
at ./src/runners/direct_runner.rs:84:51
18: <core::pin::Pin<P> as core::future::future::Future>::poll
at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/core/src/future/future.rs:125:9
19: apache_beam::runners::runner::RunnerI::run_async::{{closure}}
at ./src/runners/runner.rs:64:41
20: <core::pin::Pin<P> as core::future::future::Future>::poll
at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/core/src/future/future.rs:125:9
21: apache_beam::runners::runner::RunnerI::run::{{closure}}
at ./src/runners/runner.rs:48:33
22: <core::pin::Pin<P> as core::future::future::Future>::poll
at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/core/src/future/future.rs:125:9
23: apache_beam::tests::primitives_test::tests::run_gbk::{{closure}}
at ./src/tests/primitives_test.rs:94:13
24: <core::pin::Pin<P> as core::future::future::Future>::poll
at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/core/src/future/future.rs:125:9
25: <core::pin::Pin<P> as core::future::future::Future>::poll
at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/core/src/future/future.rs:125:9
26: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}::{{closure}}::{{closure}}
at /Users/sho.nakatani/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/runtime/scheduler/current_thread.rs:525:48
27: tokio::coop::with_budget::{{closure}}
at /Users/sho.nakatani/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/coop.rs:102:9
28: std::thread::local::LocalKey<T>::try_with
at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/std/src/thread/local.rs:446:16
29: std::thread::local::LocalKey<T>::with
at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/std/src/thread/local.rs:422:9
...
Coming from apache#21089 (comment), I noticed that you (@laysakura) welcome contributions! I'm interested in contributing, but wondering how you would prefer to handle contributions.
Do you already have plans regarding the direction/priorities, or should I just go hunting for todos in the code that seem like something I could work on? Would you like an issue created first, or rather not be bothered until (I think) I have something ready for review?
We are writing a pipeline like:
in main
#[tokio::main]
async fn main() {
    DirectRunner::new()
        .run(|root| {
            root.apply(...)
        })
        .await;
}
in test
beam/sdks/rust/tests/primitives_test.rs
Lines 91 to 103 in 54b4d23
As you can see, this involves a fair amount of boilerplate code.
We may want to write something like:
#[apache_beam::main]
fn main(root: PValue) {
root.apply(...);
}
The task should be like:
Priority: 3 (nice-to-have improvement)
Currently, we do not have any documentation on what the Beam Rust SDK should be.
In other words, I think we should have a design policy comparable to the Java, Python, Go, and TypeScript tabs in https://beam.apache.org/documentation/programming-guide/ .
The policy should be helpful for both SDK users and us, the SDK developers.
Constructing a concrete Coder from the CoderUrnTree may solve it.
Priority: 3 (nice-to-have improvement)
Currently, Any is used everywhere, e.g.:
beam/sdks/rust/src/internals/serialize.rs
Line 59 in 4bfdd26
IMO, one of the important features of the Beam Rust SDK should be a statically typed Pipeline (with generics support).
I already did some work on that in 87b9939, targeting nivaldoh's HEAD (almost the same diff as nivaldoh#26).
But we should continue that work on top of robertwb's work: nivaldoh#22 (I already hand-merged it in 9564b4e).
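To illustrate what a statically typed pipeline buys compared to passing `Any` around, here is a toy sketch. The `PCollection<T>` below is a hypothetical in-memory stand-in invented for this example, not the SDK's type, and it applies the function eagerly instead of building a pipeline graph.

```rust
/// Hypothetical, in-memory stand-in for a typed PCollection.
struct PCollection<T> {
    elements: Vec<T>,
}

impl<T> PCollection<T> {
    fn from_vec(elements: Vec<T>) -> Self {
        Self { elements }
    }

    /// The element type of the result is tracked by the compiler, so a
    /// mismatched downstream transform fails at compile time rather than
    /// panicking on a failed downcast at run time.
    fn map<O, F: Fn(&T) -> O>(self, f: F) -> PCollection<O> {
        PCollection {
            elements: self.elements.iter().map(f).collect(),
        }
    }
}

/// Usage sketch: String elements in, usize elements out.
fn word_lengths(words: Vec<String>) -> PCollection<usize> {
    PCollection::from_vec(words).map(|w| w.len())
    // Chaining `.map(|s: &String| s.to_uppercase())` here would be rejected
    // by the compiler, because the elements are now `usize`.
}
```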
IMO, mod internals is a bad one.
Priority: 3 (nice-to-have improvement)
Currently, we only have a limited number of test cases.
examples/
directly and show how to use the Beam Rust SDK in codeslib.rs
See the description in #43
Priority: 3 (nice-to-have improvement)
@laysakura (me) removed CoderRegistry in ad31a7b, but it is required by the Beam Programming Model (see: https://beam.apache.org/documentation/programming-guide/#default-coders-and-the-coderregistry)
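For orientation, a registry of default coders can be as small as a map from a Rust type to a coder identifier. The sketch below is an assumption, not the removed implementation: the struct layout and method names are hypothetical, and it stores only coder URNs (the standard Beam URNs for UTF-8 strings and varints are used as sample values) rather than coder objects.

```rust
use std::any::TypeId;
use std::collections::HashMap;

/// Hypothetical registry mapping a Rust type to its default coder URN.
#[derive(Default)]
struct CoderRegistry {
    default_coders: HashMap<TypeId, &'static str>,
}

impl CoderRegistry {
    /// Registers (or overrides) the default coder for type `T`.
    fn register<T: 'static>(&mut self, urn: &'static str) {
        self.default_coders.insert(TypeId::of::<T>(), urn);
    }

    /// Looks up the default coder for `T`; None means the user must set one.
    fn default_coder_urn<T: 'static>(&self) -> Option<&'static str> {
        self.default_coders.get(&TypeId::of::<T>()).copied()
    }
}

/// Builds a registry with a couple of sample defaults.
fn sample_registry() -> CoderRegistry {
    let mut registry = CoderRegistry::default();
    registry.register::<String>("beam:coder:string_utf8:v1");
    registry.register::<i64>("beam:coder:varint:v1");
    registry
}
```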
Priority: 3
Component: beam-community
See the description in #43
Priority: 3 (nice-to-have improvement)
We should test PRs automatically in CI.
The CI script should be similar to build.sh.