
seznam / euphoria


Euphoria is an open source Java API for creating unified big-data processing flows. It provides an engine independent programming model which can express both batch and stream transformations.

License: Apache License 2.0

Java 99.54% Shell 0.46%
big-data apache-flink apache-spark java-api hadoop kafka hdfs unified-bigdata-processing streaming-data batch-processing

euphoria's People

Contributors

cervebar, dependabot[bot], dmvk, dusan-rychnovsky, horkyada, je-ik, kination, mareksimunek, o2-be-build, svxaverius, t-novak, vaclavplajt, vanekjar, xitep

euphoria's Issues

euphoria-flink: Allowed lateness

In a streaming environment, elements may arrive out of order. Our FlinkExecutor allows configuring a time period called allowed lateness via the setAllowedLateness method. Allowed lateness specifies by how much time elements can be late before they are dropped.

Currently, allowed lateness is applied during event time assignment in ReduceStateByKeyTranslator.EventTimeAssigner. If no event time extraction is used (i.e. the flow is run using the ingestion time characteristic), the allowed lateness is ignored. It would be better to move allowed lateness to AbstractWindowOperator and remove it from the assigner.
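
A minimal sketch of the idea, assuming a heavily simplified operator that only tracks a watermark. LatenessAwareOperator and its methods are hypothetical names, not Euphoria or Flink API. The point is that the lateness check lives in the operator, so it applies no matter how element timestamps were assigned:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a window operator; not Euphoria's AbstractWindowOperator.
class LatenessAwareOperator {
  private final long allowedLatenessMillis;
  private long watermark = Long.MIN_VALUE;
  private final List<Long> accepted = new ArrayList<>();
  private int dropped = 0;

  LatenessAwareOperator(long allowedLatenessMillis) {
    this.allowedLatenessMillis = allowedLatenessMillis;
  }

  // Advances the operator's notion of time; the watermark never moves backwards.
  void onWatermark(long timestamp) {
    watermark = Math.max(watermark, timestamp);
  }

  // Keeps the element unless it is behind the watermark by more than the allowed lateness.
  void onElement(long elementTimestamp) {
    boolean tooLate = watermark != Long.MIN_VALUE
        && elementTimestamp + allowedLatenessMillis < watermark;
    if (tooLate) {
      dropped++;
    } else {
      accepted.add(elementTimestamp);
    }
  }

  List<Long> accepted() {
    return accepted;
  }

  int dropped() {
    return dropped;
  }
}
```

With an allowed lateness of one second and a watermark at 10 000 ms, an element stamped 9 500 ms is kept while one stamped 8 000 ms is dropped.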

Redesign Dataset#getPartitioning()

The method Dataset#getPartitioning() in the client API doesn't make sense in its current form (spotted as part of #15). Partitioners themselves are a property of operators, not datasets. However, we still need to preserve the "numPartitions" property available through #getPartitioning(), which is used to relay a dataset's physical split further down a flow.

I suggest changing the method to a plain int Dataset#getNumPartitions(). A negative value denotes "undefined", just as it is handled so far.
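
A rough sketch of what the suggested accessor could look like. Only getNumPartitions() comes from the proposal; hasDefinedPartitions() and the Datasets helper are hypothetical additions for illustration:

```java
// Hypothetical slice of the client API; only the partition-count accessor is shown.
interface Dataset<T> {
  // Number of physical partitions, or a negative value if undefined.
  int getNumPartitions();

  // Convenience check (an assumption, not part of the proposal).
  default boolean hasDefinedPartitions() {
    return getNumPartitions() >= 0;
  }
}

// Hypothetical factory used only to make the sketch executable.
final class Datasets {
  static <T> Dataset<T> withPartitions(int numPartitions) {
    return () -> numPartitions;
  }
}
```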

euphoria-flink: optimize rocks db access for combining rbk

A combining RBK operator currently does state.get(); state.clear(); state.add(); to store its running state value. The call to state.clear() is unnecessary. Ideally, we'd replace state.clear(); state.add() with state.set(), which should be more efficient.

  • This change can and should be achieved in the default implementation of RBK; see RBK#getBasicOps. accumulate(); state.set(..); much better expresses the intent and allows for an optimal implementation.
  • It is applicable only for the combining scenario.
  • We shall measure and document the impact of the change.
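
To illustrate the difference in backend calls, here is a small sketch against a hypothetical in-memory state that merely counts how often it is touched. CountingValueState is not Flink's or Euphoria's state API, and its add() simply stores the value, which is a simplification:

```java
import java.util.function.BinaryOperator;

// Hypothetical in-memory value state that counts backend calls.
class CountingValueState<V> {
  private V value;
  int calls = 0;

  V get() {
    calls++;
    return value;
  }

  void clear() {
    calls++;
    value = null;
  }

  void add(V v) {
    calls++;
    value = v;
  }

  void set(V v) {
    calls++;
    value = v;
  }
}

class CombiningUpdate {
  // Current pattern: get, clear, add (three backend calls per element).
  static <V> void getClearAdd(CountingValueState<V> state, V input, BinaryOperator<V> combiner) {
    V current = state.get();
    V combined = current == null ? input : combiner.apply(current, input);
    state.clear();
    state.add(combined);
  }

  // Proposed pattern: get, set (two backend calls per element).
  static <V> void getSet(CountingValueState<V> state, V input, BinaryOperator<V> combiner) {
    V current = state.get();
    state.set(current == null ? input : combiner.apply(current, input));
  }
}
```

Both variants end up with the same stored value; the second one saves one call per element. Whether that translates into a measurable gain on RocksDB is exactly what the ticket asks to measure.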

Consider documenting nullability

So far we've followed the convention that unless otherwise stated null values are not allowed. Right now, we document allowed nullability in comments. Consider switching to a stronger and more visible form of documentation by using @javax.annotation.Nullable and friends from the jsr305.jar. (We'll add it as an explicit compile dependency even though we do inherit it transitively through the dependency on guava.)

euphoria-core: generic FlowBuilder

Right now, running a (single) flow requires the programmer to write the usual class with a main method, setting up the executor, submitting her flows to it and executing it. This is a lot of boilerplate in the way of getting a flow to execute. Ideally, we'd be able to say:

spark-submit [parameters] --class cz.seznam.euphoria.spark.runner.Runner --jars usercode.jar euphoria-assembly.jar [name of class providing the flow to execute; located in the provided jars]

The rough idea is to have each executor providing a convenience Runner being able to take a class (by name), instantiate it, and then derive a flow from it to execute. Actually, a lot of the "runner" code is expected to be the same across different executors, thus, we want to abstract the process into euphoria-core. The named, user provided class will need to implement a common interface, through which the abstraction will create the flow to execute; FlowBuilder seems like a good name.
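
The executor-independent part could look roughly like the sketch below. Flow is a stand-in class, and the shape of FlowBuilder (a single buildFlow method) as well as the FlowLoader helper are assumptions for illustration, not an agreed API:

```java
// Hypothetical stand-in; the real Flow class is not reproduced here.
class Flow {
  final String name;

  Flow(String name) {
    this.name = name;
  }
}

// Assumed shape of the common interface user classes would implement.
interface FlowBuilder {
  Flow buildFlow(String[] args);
}

// Executor-independent part of a runner: load the named class and derive its flow.
class FlowLoader {
  static Flow load(String className, String[] args) {
    try {
      Class<?> clazz = Class.forName(className);
      if (!FlowBuilder.class.isAssignableFrom(clazz)) {
        throw new IllegalArgumentException(className + " does not implement FlowBuilder");
      }
      FlowBuilder builder = (FlowBuilder) clazz.getDeclaredConstructor().newInstance();
      return builder.buildFlow(args);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate " + className, e);
    }
  }
}

// Example user code that would live in usercode.jar.
class WordCountFlowBuilder implements FlowBuilder {
  @Override
  public Flow buildFlow(String[] args) {
    return new Flow("word-count");
  }
}
```

An executor-specific Runner would then only need to call FlowLoader.load with the class name from the command line and submit the resulting flow to its executor.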

euphoria-core: sort operator

We'd like to:

Sort.named("SORT")
    .of(input)
    .by(e -> Comparable)
    .setPartitioning(new RangePartitioning())
    .output();

Range partitioning by the given .by function and sorting by the same attribute within each partition.
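
The combination of range partitioning and per-partition sorting can be sketched on plain lists as follows. RangeSort and its fixed boundaries are hypothetical; a real RangePartitioning would have to derive boundaries from the data:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of range partitioning followed by a sort within each partition.
class RangeSort {
  // boundaries must be ascending; partition i holds keys <= boundaries[i],
  // the last partition holds everything greater than the last boundary.
  static List<List<Integer>> sort(List<Integer> input, int[] boundaries) {
    List<List<Integer>> partitions = new ArrayList<>();
    for (int i = 0; i <= boundaries.length; i++) {
      partitions.add(new ArrayList<>());
    }
    for (Integer element : input) {
      int p = 0;
      while (p < boundaries.length && element > boundaries[p]) {
        p++;
      }
      partitions.get(p).add(element);
    }
    for (List<Integer> partition : partitions) {
      Collections.sort(partition);
    }
    return partitions;
  }
}
```

Reading the partitions in order yields a globally sorted sequence, because every key in partition i is no greater than any key in partition i + 1.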

euphoria-core: unbounded source without explicit windowing defined

Executing a stateful operator which itself does not define a windowing strategy and which consumes - directly or indirectly - a non-windowed, unbounded data set is basically undefined. That is, such an operator, e.g. ReduceByKey, consumes an infinite stream of data and never reaches a point at which results can be emitted. Note:

  • stateful: consuming non-windowed, unbounded data sources through stateless operators, e.g. FlatMap, Union, Repartition is defined. There's no problem for them. These emit results immediately upon receiving input.
  • non-windowed and unbounded: that is, all along the way from the unbounded data source down to the operator in question, there is no explicit windowing involved. Since euphoria has the notion of "attached windowing", a single windowing on the way to the operator in question makes the processing defined again - even if that operator itself has no explicit windowing defined.

It'd be good to immediately fail the attempt to translate a flow with the described situation. The situation is mostly unintentional, but the mistake is hard to spot at runtime - basically leaving the programmer wondering why no output is produced.

One might argue, though, that the situation is practically valid (e.g. it actually works nicely on flink): the semantics of such a situation might well be that the results are produced when the unbounded data source is closed/cancelled. However, results computed this way are non-deterministic and theoretically unsound.
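
A fail-fast check along these lines could be sketched as below. Node and FlowValidator are hypothetical and model a flow far more simply than Euphoria's operator graph does; in particular, the treatment of multi-input operators is an assumption:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified flow node; not Euphoria's operator model.
class Node {
  final String name;
  final boolean unboundedSource; // only meaningful for nodes without parents
  final boolean stateful;
  final boolean windowed;        // true if the node defines a windowing itself
  final List<Node> parents;

  Node(String name, boolean unboundedSource, boolean stateful, boolean windowed, Node... parents) {
    this.name = name;
    this.unboundedSource = unboundedSource;
    this.stateful = stateful;
    this.windowed = windowed;
    this.parents = Arrays.asList(parents);
  }
}

class FlowValidator {
  // True if the node's output is unbounded and carries no attached windowing.
  static boolean emitsNonWindowedUnbounded(Node node) {
    if (node.parents.isEmpty()) {
      return node.unboundedSource;
    }
    if (node.windowed) {
      return false;
    }
    for (Node parent : node.parents) {
      if (emitsNonWindowedUnbounded(parent)) {
        return true;
      }
    }
    return false;
  }

  // Fails if any stateful operator consumes non-windowed, unbounded input without own windowing.
  static void validate(Node node) {
    for (Node parent : node.parents) {
      validate(parent);
    }
    if (node.stateful && !node.windowed && emitsNonWindowedUnbounded(node)) {
      throw new IllegalStateException(
          "Stateful operator '" + node.name + "' consumes non-windowed, unbounded input");
    }
  }
}
```

A ReduceByKey placed directly behind an unbounded source and a FlatMap is rejected, while the same operator placed behind a windowed operator passes thanks to attached windowing.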

euphoria-flink: data type transparency

Execution engines can do much better at optimizations if they transparently know what types they are working with. Efforts in the Spark as well as the Flink community prove the optimization potential in this regard. As a layer above such execution engines, Euphoria must provide type specific information to executors in order to stay relevant (in terms of performance). In this ticket we'll focus on Flink.

Motivation

The primary background for this ticket was triggered by an attempt to shave off some of the overhead mentioned in #13 and #14.

Experiment

Using opaque data types with general purpose serialization has considerable, negative effects on optimizations that Flink tries to apply by default. I was able to see the effect in an experiment, where the "core" operation of the flow is the following (basically just a windowed word-count):

      ReduceByKey
            .of(input)
            .keyBy(Pair::getSecond)
            .valueBy(e -> 1L)
            .combineBy(Sums.ofLongs())
            .windowBy(Time.of(shortInterval), Pair::getFirst)
            .output();

Changing Euphoria's flink batch executor such that it uses Flink's native and Flink's pojo based serializers for the types involved during the reduce operation, squeezed out about half of the original execution time of the program. The amount of data shuffled was approximately the same. Unfortunately, such an approach required me to explicitly provide the return type of the .keyBy function to Euphoria's FlinkExecutor's internals. I was not able to derive the return type of a lambda in an automatic manner without the user having to explicitly state it.

What to do next

  • Find a non-verbose way for clients to provide required type information to the executor translators, i.e. return types of UDFs (we might not really succeed here and might end up with some verbose construct; maybe we can make the verbose constructs and the implied optimizations merely an opt-in?)
  • Leverage the types in Euphoria's Flink batch and stream executors.
    • Leverage type information for objects used internally in both flink executors to avoid general purpose serialization by kryo when possible.
    • Using TupleX instead of Pair and Triple is a good start.
    • For internal types, utilizing POJOs instead of opaque data types is another easy gain.
    • Note: it will not help much if we switch over to Tuple but fail to provide type information of the actually used fields.
    • Attention: proper delegation of the window types from one operator to another due to attached windowing may require substantial changes to the current executor code.

Feature: Add abstraction for exactly-once reading of sources

In order to be able to read DataSource in an exactly-once manner, we need an abstraction of source commit process. One possible solution would be to get rid of the Reader and instead of that to change the Partition interface to something like this:

public interface Partition<T> extends Serializable {

  Set<String> getLocations();

  // this should be non-blocking - i.e. it should run the observer in different thread
  void observe(PartitionObserver<T> observer);

}

where the PartitionObserver would be an analogy of RxJava observers, but with a CommitCallback - i.e.

public interface PartitionObserver<T> {
  void onNext(T element, CommitCallback callback);
  void onError(Throwable err);
  void onCompleted();
}

and

@FunctionalInterface
public interface CommitCallback {
  void commit();
}

These API changes should be then reflected inside all DataSources.

The API changes described above are just a proposition, the final solution should be agreed upon in comments of this issue.
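
To make the proposal more tangible, here is a sketch of how a source could implement it. The three interfaces are repeated from above so that the example compiles on its own; ListPartition, its offset bookkeeping and awaitCompletion() are hypothetical and only serve the illustration:

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

interface CommitCallback {
  void commit();
}

interface PartitionObserver<T> {
  void onNext(T element, CommitCallback callback);
  void onError(Throwable err);
  void onCompleted();
}

interface Partition<T> extends Serializable {
  Set<String> getLocations();
  void observe(PartitionObserver<T> observer);
}

// Hypothetical in-memory partition that records the highest committed offset.
class ListPartition<T> implements Partition<T> {
  private final List<T> elements;
  private final AtomicInteger committedOffset = new AtomicInteger(-1);
  private transient Thread reader;

  ListPartition(List<T> elements) {
    this.elements = elements;
  }

  @Override
  public Set<String> getLocations() {
    return Collections.singleton("localhost");
  }

  // Non-blocking: the observer is driven from a separate reader thread.
  @Override
  public void observe(PartitionObserver<T> observer) {
    reader = new Thread(() -> {
      try {
        for (int i = 0; i < elements.size(); i++) {
          final int offset = i;
          observer.onNext(elements.get(i),
              () -> committedOffset.accumulateAndGet(offset, Math::max));
        }
        observer.onCompleted();
      } catch (RuntimeException e) {
        observer.onError(e);
      }
    });
    reader.start();
  }

  // Helper for the example: waits until the reader thread has finished.
  void awaitCompletion() {
    try {
      if (reader != null) {
        reader.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  int committedOffset() {
    return committedOffset.get();
  }
}
```

A consumer calls callback.commit() only once an element has been fully processed; after a restart, such a source could resume from committedOffset() + 1.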

Flink executor: Make use of ProcessFunction

Recently released Flink 1.2.0 introduced a low-level API for streaming that gives access to all internal parts of the engine (data, state, timers).

The ProcessFunction can be thought of as a FlatMapFunction with access to keyed state and timers. It handles events by being invoked for each event received in the input stream(s).

More details described at https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html

This seems to be a good opportunity to optimize the current state of windowing in the Flink stream executor such that all wrapping classes from the package cz.seznam.euphoria.flink.streaming.windowing can be discarded and replaced by a custom WindowOperator implementation relying only on Euphoria-core windowing and triggers.

euphoria-flink: state storage consistency

As part of #67 we observed an uncertainty about where trigger states get stored for merging windows. After windows get merged, one of the already existing, merged windows becomes the carrier of the target window's state (to avoid shuffling state from one window into a new one). However, it appears that the onEventTime method uses partly the target window's namespace and partly the state carrier window's namespace to handle triggers and their states.

  • Please clarify
  • Add a test to validate that trigger state for merging windows is at exactly one place

ReduceStateByKey state combiner function shouldn't use CombinableReduceFunction

Currently, the state combiner is of type CombinableReduceFunction:

public State apply(Iterable<State> reducedStates)

It turned out runtime needs to choose which of given states will be the final result merged into. That means the function can't be commutative.

Proposed solution is to change the signature of the state combiner to something like

public void merge(State targetState, Iterable<State> mergedStates)

This task involves change of client API as well as modification of all executors relying on this API.
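
A minimal sketch of the proposed shape, assuming a trivial summing state. SumState, StateMerger and SumStateMerger are hypothetical names; Euphoria's State interface is not reproduced here:

```java
// Hypothetical state type used only for this illustration.
class SumState {
  long sum;

  SumState(long sum) {
    this.sum = sum;
  }
}

// Assumed shape of the new combiner: the runtime picks the target, the function folds into it.
interface StateMerger<S> {
  void merge(S targetState, Iterable<S> mergedStates);
}

class SumStateMerger implements StateMerger<SumState> {
  @Override
  public void merge(SumState targetState, Iterable<SumState> mergedStates) {
    for (SumState other : mergedStates) {
      targetState.sum += other.sum;
    }
  }
}
```

In contrast to apply(Iterable<State>), the function no longer returns a state of its own choosing; the target instance stays the one selected by the runtime.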

STORY: Performance tuning

Subtickets

  • Performance tuning of Spark executor #12
  • Performance tuning of Flink batch executor #13
  • Performance tuning of Flink stream executor #14

Goal

According to our observations and measurements it seems Euphoria API layer has significant performance overhead compared to jobs written natively in Apache Flink or Apache Spark.

It can be assumed there will always be some amount of overhead, because Euphoria API adds another layer of abstraction with its additional data structures. Goal of this issue is to lower this overhead as much as possible.

Approximate measurements show the overhead may be as high as tens of percent. More details about the performance comparison are in the following charts:

batch-chart

stream-chart

euphoria-core: multi-executor execution

It would be desirable to run a program consisting of multiple flows on different executors. Example:

PExecutor sparkExec = new SparkExecutor(mem, cores);
PExecutor inmemExec = new InMemExecutor(mem, cores);

Flow flow1 = Flow.create("spark-flow");
Flow flow2 = Flow.create("inmem-flow");

sparkExec.registerFlow(flow1);
inmemExec.registerFlow(flow2);

// allocate YARN containers for Spark runtime and execute "flow1"
sparkExec.execute();

// allocate another YARN container for in-mem runtime
inmemExec.execute();

// kill all YARN containers
sparkExec.shutdown();
inmemExec.shutdown();

The idea is that euphoria executors be independent of the execution engine launchers and be able to allocate their own YARN containers from within which each would operate their own execution engine.

Consider aligning naming of "batch windowing"

Apache Beam as well as Apache Flink refer to what we named Batch and Batch.BatchWindow as GlobalWindows and GlobalWindow respectively. I think the term "global" makes more sense. 1) Windowing is a separate concept/value to "batch/stream processing". 2) By aligning the names we create less confusion for new users.

In particular, I'd like to propose renaming:

  • Batch => GlobalWindowing
  • Batch.BatchWindow => GlobalWindowing.GlobalWindow or just GlobalWindowing.Window

Please comment if renaming doesn't make sense to you.

euphoria-flink: caching states

AbstractWindowOperator, which is part of Euphoria's RSBK implementation for the streaming flink executor, caches retrieved states per key/window in memory. While this seemed a good idea initially, it turns out to be problematic when a lot of different keys with a lot of different windows are open - this is easily reached with time-sliding, for example. Further, it turns out that caching the state objects doesn't bring much!

  • We shall rework the window operator such that it avoids maintaining the map, thus preventing out-of-memory errors.
  • Further we shall ensure that the state objects are not unnecessarily created multiple times when processing a given input element.

euphoria-core: consider adding support for metrics

Client programs often have the need to gather and report metrics, e.g. counters, timers, gauges. The most obvious use-cases are debugging and monitoring. Currently, euphoria itself lacks any client side support in this regard. Actually, it's a question whether it makes sense to have native support through euphoria for metrics/aggregators or whether to leave it to 3rd-party APIs to take over here.

A typical example is to measure the execution time of a lengthy operation, as in the following example:

Dataset<String> words =
    FlatMap.of(lines)
    .using((String line, Context<String> context) -> {

      // 1) time an expensive operation
      long start = System.nanoTime();
      // do some expensive work here
      context.timer("foobar-duration").record(System.nanoTime() - start, TimeUnit.NANOSECONDS);

      // 2) count how often a condition holds
      if (someCondition) {
        context.counter("someCondition-true").increment();
      }

      // 3) time another expensive operation
      start = System.nanoTime();
      // do some more expensive work
      context.timer("quux-duration").record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
    })
    .output();

This example suggests that euphoria's Context object provides a metrics API. However, to make this globally practical, it presupposes that such a context object is available to most user defined functions, which is not the case at the moment.

If we end up supporting a client side API for metrics, we'll need to think about an SPI at the same time, such that different metric reporting backends/implementations can be hooked in independently of the actual flow execution engine.

euphoria-flink: streaming word-count with time-sliding very slow

Executing a word-count like program - this is the simplest scenario to trace down the problem - against Euphoria's Flink Streaming executor using TimeSliding windowing reveals a huge shuffle overhead compared to an implementation of the same program using Flink's native Java API.

Experiment characteristics

  • Apart from the key, the actual "values" being shuffled (as intended by the programmer) are actually small (in both cases):
    • On native flink: it's a word (string)
    • On euphoria: it's just a long (the value to be reduced)
  • TimeSliding; the more windows the bigger Euphoria's overhead

Problem

Euphoria tries to implement a strategy such that the size of the actual values being shuffled gets reduced. That is, instead of shuffling RBK's (and RSBK's) original input elements, it extracts the "values" (to be later reduced) before the shuffle phase and sends those for reduction. The reasoning behind this choice is to send the typically smaller values instead of the bigger input elements. However, due to windowing being applied to the operator's original input elements, it also needs to extract the original input elements' windows before sending them together with the value across the wire. The more windows there are for a single value, the bigger the overhead of data being shuffled.

The more data is being shuffled, the quicker network buffers get filled, which can lead to back pressure, thereby making the consumer side go slower. (To mitigate against back pressure, we can resize the network buffers at the cost of memory consumption.)

Our implementation of the idea to shuffle only values instead of the original input elements suffers from the following problems:

  • If the "value" being shuffled is larger than the original input element, we're immediately introducing unnecessary overhead.
  • If the "value" being shuffled plus the necessary window information for the element is larger than the original input element, we're introducing unnecessary overhead. (Note: this problem is not relevant when considering a partitioning schema by "window and key"; this is a potential but not-yet-implemented/explored reduce-by-key strategy.)
  • Users of the Euphoria API have no chance to express a preference for a particular strategy in regards to the shuffle.

What's to do next

  • Attempt to reduce the serialized size of window types natively provided by Euphoria
    • @vanekjar had a very nice idea about optimizing the TimeSliding windowing use-case: instead of creating many physical objects for each TimeInterval of the time slides, i.e. a HashSet<TimeInterval>, create one object which is able to lazily expand itself into these many window objects only when they are iterated - which for the RBK happens after the shuffle. The serialized form of such a "set of windows" can be of constant size, independent of the number of windows represented.
  • Declare what is good enough for us, i.e. it's fine to say "we don't care about word-count like reductions" and not further bother with the problem
  • Depending on the previous item, find a way to hint the executor for an alternative shuffle strategy (this doesn't necessarily have to be an easy-to-use feature, but it shall be possible.)
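
The lazily expanding "set of windows" could be sketched as follows. Interval and SlidingWindowSet are hypothetical types (Euphoria's TimeInterval is not reproduced), and window starts are assumed to be aligned to multiples of the slide:

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical window type covering [start, end).
final class Interval {
  final long start;
  final long end;

  Interval(long start, long end) {
    this.start = start;
    this.end = end;
  }
}

// All sliding windows containing one timestamp, stored as three longs and expanded lazily.
final class SlidingWindowSet implements Iterable<Interval> {
  private final long timestamp;
  private final long duration;
  private final long slide;

  SlidingWindowSet(long timestamp, long duration, long slide) {
    if (slide <= 0 || duration < slide) {
      throw new IllegalArgumentException("require 0 < slide <= duration");
    }
    this.timestamp = timestamp;
    this.duration = duration;
    this.slide = slide;
  }

  @Override
  public Iterator<Interval> iterator() {
    // Start of the latest window that still contains the timestamp.
    final long lastStart = timestamp - Math.floorMod(timestamp, slide);
    return new Iterator<Interval>() {
      private long start = lastStart;

      @Override
      public boolean hasNext() {
        return start > timestamp - duration;
      }

      @Override
      public Interval next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        Interval window = new Interval(start, start + duration);
        start -= slide;
        return window;
      }
    };
  }
}
```

Only the three long fields would need to cross the wire; the individual windows are materialized on the receiving side when the set is iterated.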

Conclusion

It is unclear whether the strategy currently implemented by Euphoria's streaming Flink executor is sufficient for most common use-cases. At best, we can say the strategy is not flexible enough to support use-cases where the original input element's size is not significantly different than the "value" being reduced. Further, from an API perspective users are not able to optimize for such use-cases (whereas users can optimize the opposite use-case in the Flink API - basically users of the Flink API can implement what Euphoria's streaming Flink executor does.)

Last but not least, the problem of this "reduce-by-key" strategy as detailed above, explains a significant part of the overhead measured under #14, which uses a benchmark with a "word-count" like reduction using TimeSliding.

documentation

This is merely a parent ticket to track efforts on improving documentation. In particular, we'd like to:

  • Provide human friendly documentation, tutorials, conceptual explanations in the form of the wiki
  • Greatly improve javadocs

All commits dedicated to improving any of this shall reference this ticket.

euphoria-flink: disable object-reuse mode

In object-reuse enabled mode, Flink’s runtime minimizes the number of object instantiations. This can improve the performance and can reduce the Java garbage collection pressure.

More about this feature can be found in Flink documentation.

We tried to make use of this feature in Euphoria executor, but it turned out it may (and it will) cause problems in case of iteration over whole partition while remembering input objects. This is typical behavior for ReduceByKey or ReduceStateByKey where input objects are stored as a state.

Referring to Flink documentation:

Input objects received from an Iterable are only valid until the next() method is called. An Iterable or Iterator may serve the same object instance multiple times. It is not safe to remember input objects received from an Iterable, e.g., by putting them in a List or Map.

Proposed solution is to give up this performance benefit and disable object reuse completely.
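
The hazard can be reproduced without Flink. In the sketch below, ReusingIterator is a hypothetical iterator that hands out the same mutable instance on every call, which is what object-reuse mode permits:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Mutable record, similar in spirit to objects an engine may reuse between calls.
class MutableWord {
  String text;
}

// Hypothetical iterator that serves the same instance on every next().
class ReusingIterator implements Iterator<MutableWord> {
  private final String[] words;
  private final MutableWord reused = new MutableWord();
  private int index = 0;

  ReusingIterator(String... words) {
    this.words = words;
  }

  @Override
  public boolean hasNext() {
    return index < words.length;
  }

  @Override
  public MutableWord next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    reused.text = words[index++];
    return reused;
  }
}

class ObjectReuseDemo {
  // Unsafe: remembers the references handed out by the iterator.
  static List<String> rememberReferences(Iterator<MutableWord> it) {
    List<MutableWord> kept = new ArrayList<>();
    while (it.hasNext()) {
      kept.add(it.next());
    }
    List<String> result = new ArrayList<>();
    for (MutableWord word : kept) {
      result.add(word.text);
    }
    return result;
  }

  // Safe: copies the data out before the next call to next().
  static List<String> copyValues(Iterator<MutableWord> it) {
    List<String> result = new ArrayList<>();
    while (it.hasNext()) {
      result.add(it.next().text);
    }
    return result;
  }
}
```

For the input "a", "b", "c", remembering references yields three times "c", whereas copying the values yields the expected "a", "b", "c". A state that stores input objects behaves like the unsafe variant.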

euphoria-core: generalize flow translation

The spark and flink flow translator use similar structures when translating a user defined flow into an execution graph. The code is more or less duplicated. We shall extract this into an executor independent layer to be used for translation purposes by all executors. See cz.seznam.euphoria.flink.FlowTranslator (and cz.seznam.euphoria.flink.streaming.StreamingFlowTranslator) and cz.seznam.euphoria.spark.SparkFlowTranslator.

euphoria-core: multiple flows, single execution

Sometimes reading data is more expensive than processing it. In scenarios, where multiple programs process the same or nearly the same data, it is beneficial to execute them in parallel over data which is read only once instead of running each program over the source data individually.

Euphoria emphasizes the creation of simple programs/flows which can be developed and run independently as opposed to writing (complicated) plugin architectures. It would be desirable to be (easily) able to combine multiple independent flows, which do want to consume the same source data, into a single execution unit, such that the source data is fetched only once. The canonical example would be the execution of independent flows - each calculating some statistics on their own - over a large stable data set without the data set being read multiple times.

make builds faster

Would it be possible to speed up mvn install for local development? It'd be great if I was able to turn off javadoc, findbugs, and possibly other build steps when developing locally. Maybe we can provide different profiles such that we can opt out of certain build steps using mvn install -P-xyz when building interactively on the command line.

euphoria-core: earlyTriggering of merging windows

Early triggering of merging windows is unsound! A merging window, e.g. session window, may change its identity over time. If such a window is fired multiple times while its identity changed, receiving downstream operators have no chance associating such multiple distinct windows as originating from one.

This typically causes problems for downstream operators which do access the window for business logic reasons. We may want to perform a validation as part of translating the flow DAG and either emit a warning or have the validation fail, if the user can access a merging window in a downstream operator.

Create POC of Kafka based executor

Create a simple executor that uses Apache Kafka as its messaging subsystem. The executor should be an analogy of what Kafka Streams does, but for the Euphoria API. The executor should send data via a Kafka topic only during repartitioning, where the data actually needs to be distributed; otherwise, it should keep the data inside in-process pipelines (i.e. no storing in Kafka after a flatmap).

Compare performance with Apache Beam

It'd be good to evaluate how Euphoria compares with Apache Beam in terms of performance on Apache Spark as well as on Apache Flink. Similarly to #11, we'd like to measure the overhead of the Beam layer of the benchmark program compared to a raw flink and spark impl and see how that overhead differs from Euphoria's overhead.

  1. This can reveal either problems on our side or problems on the side of Apache Beam, which we can report back.
  2. This will provide a good base for deciding whether it's worth starting with #32 or rather postponing it.
  3. We'll have some concrete numbers for upcoming presentations about Euphoria. (Questions regarding "Euphoria vs. Beam" are expected and the overhead comparison is at least one point of interest.)

A very premature observation on a word-count like program suggests that Apache Beam 0.6.0 on Flink 1.2 might well be suffering from expanding windows before the shuffle (see #47.)

publish benchmarks

Over the last few weeks we have worked on improving the performance of the euphoria executors, mainly by investigating the differences between programs written natively in either Spark or Flink and their equivalents in the Euphoria API, and we have made public statements about the progress. For transparency (with the broader community) and fairness, for further reference, and so that everybody can easily run/modify/improve/discuss/measure the performance of Euphoria, we want these benchmark programs open sourced as well.

I'd suggest ...

  • ... we bring our yet internal repository hosting the benchmarks into a presentable shape
    • we can discuss the progress here internally
    • do not forget: think about changing the required input data format to make it independent of our internally used datasets
  • ... we push then the contents of that repo into the benchmarks sub-folder (alternatively we can think about a separate repository for these benchmarks on github).

Output partitioning of Union operator is not well defined

Number of output partitions (as reflected by Dataset#getNumPartitions()) of Union operator should probably be one of the following:
a) sum of partitions of input Datasets
b) maximum of number of input partitions over input Dataset

Currently, it seems that the partitioning is taken from the first input Dataset, which seems to be wrong. I'm not 100% sure about the consequences of changing the semantics, so I'm leaving this for a discussion under this issue.
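
The two candidate semantics can be written down in a few lines. UnionPartitions is a hypothetical helper; propagating "undefined" (a negative count) whenever any input is undefined is an assumption, not something decided in this issue:

```java
// Hypothetical helper contrasting the two candidate semantics for Union's output partitions.
class UnionPartitions {
  // Option a): sum of the inputs' partition counts.
  static int sum(int... inputPartitions) {
    int total = 0;
    for (int n : inputPartitions) {
      if (n < 0) {
        return -1; // assumption: undefined input makes the output undefined
      }
      total += n;
    }
    return total;
  }

  // Option b): maximum of the inputs' partition counts.
  static int max(int... inputPartitions) {
    int result = 0;
    for (int n : inputPartitions) {
      if (n < 0) {
        return -1; // assumption: undefined input makes the output undefined
      }
      result = Math.max(result, n);
    }
    return result;
  }
}
```

For inputs with 2 and 3 partitions, option a) gives 5 and option b) gives 3, while the current behavior would report 2.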

euphoria-flink: support InputFormat#getStatistics

During its optimization phase, Flink leverages statistics returned by InputFormat#getStatistics. Currently, Euphoria's DataSource implementations have no way of providing such statistics to Flink. Therefore:

  • Euphoria shall provide an additional, separate IO interface (e.g. StatisticsAware) to allow data-source implementations to return the corresponding information (implementing the interface shall be optional).
  • This newly introduced interface shall be implemented by all DataSources in the euphoria-hadoop module.
  • Within euphoria-flink we'll leverage the new interface where possible.

euphoria-beam: consider creating an executor based on apache beam

It'd be interesting to see an executor which merely translates a euphoria flow into an Apache Beam program. Euphoria programs could thus gain the potential to execute on engines supported by Apache Beam. Further, it could show the main differences between the two APIs and potentially uncover conceptual unsoundness in one or the other API.

euphoria-flink: detached mode execution

Euphoria's flink executor relies on the environment.execute() to be a blocking call. However, if we submit a program in flink's detached mode on yarn, e.g. flink -d ..., the method call does not block, and the euphoria executor continues to immediately commit/close all data sinks making the submitted flows fail at some point in time.

We should be able to detect being run in detached mode using something along the lines of environment.getBatchEnv/getStreamEnv() instanceof DetachedEnvironment. However, we don't have any possibility to hook in code after the execution of the job has finished (in order to commit the sinks.)

It seems like Apache Beam fought a similar problem. Though, they were lucky not to have any code to be executed strictly after the job finished.

euphoria-flink: optimize sort

Sort consists of sorting elements by a composite key <Window, ExtractedObject>.
SortTranslator for flink executor relies on GenericTypeComparator for Window as well as for user defined extracted object which causes a huge performance overhead because of deserializing both objects on every compare.

  • we can define our custom TypeComparator for known windows, or just emit NormalizableKey instead of the window
  • however without knowing the extracted objects' type we are not able to tune the comparing of those objects

euphoria-flink: reduce number of required shuffles

In particular, we'd like to resolve this FIXME in the batch and streaming ReduceStateByKeyTranslator in euphoria-flink to reduce the number of shuffle operations of the operator implementation:

    // FIXME parallelism should be set to the same level as parent until we reach "shuffling"

euphoria-core: reduce-state-by-key: merging windowing and emitting state earlier than flush

StateFactory is invoked with an (output) context to allow the corresponding states to emit results as early as possible. However, together with a merging windowing strategy, e.g. Session, this can lead to wrong results, i.e. results ending up in wrong windows. The JoinState implementation is currently known to suffer from this issue. This issue is similar to #43.

euphoria-core: native operator support

At some point in time, users will want to use an executor specific operation which is not available through the euphoria API. We'd like to support such scenarios by providing a dedicated operator Native which will allow users to embed engine-native operations directly into a euphoria flow. The rough idea is:

Dataset<...> euphoriaDataset =
    Native.of(input-euphoria-dataset)
    .using((input-native-execution-dataset-eg-flink) -> {
        /* allow users to use the native input dataset and require them to return another */
    })
    .output();

One motivation is to allow experimentation with new operations of the native execution engine, another is to avoid the need for euphoria specific wrappers around existing engine specific libraries, e.g. MLlib.

The above suggested example has clearly a lot of problems which we'll need to work through and either accept the implications or work out corresponding solutions. One natural implication of the Native operator will be the non-portability to different execution engines.

euphoria-core: map-side join support

Map-side join is a special derivation of the Join operator, which can be turned into a plain MapElements operation, looking up the other side of the data to join in through a random-access API.

We'd like to write down such join operations through the existing Join operator. The special derivation mentioned above is merely a runtime characteristic which does not alter the semantics of the operation. In other words, a map-side join might be more efficient in certain situations, but the operator would still produce the same results if carried out through a conventional (hash or sort based) join implementation. Prerequisites:

  • We'd need to provide support for data sets that can be accessed randomly.
  • We'd need to provide some way of hinting an executor to either do or not to do a map-side join "optimization" when it's possible.
  • The join operation would be effectively only a left or a right join (with zero or exactly one joined-value to be practical) - inner join semantics can be implemented through both of these.
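The random-access variant described above can be sketched as follows. This is only an illustration under stated assumptions: `MapSideJoinSketch` and `leftJoin` are hypothetical names, not part of the euphoria API, and a plain `java.util.Map` stands in for whatever random-access data set abstraction would eventually be provided.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Illustrative only: none of these names are part of the euphoria API. */
public class MapSideJoinSketch {

    /**
     * Left join expressed as a plain per-element map: each left element looks
     * up its zero-or-one right-side partner through a random-access view.
     */
    public static <L, K, R, O> List<O> leftJoin(
            List<L> left,
            Function<L, K> keyOf,
            Map<K, R> rightLookup,        // stands in for a random-access data set
            BiFunction<L, R, O> joiner) { // receives null when there is no match
        List<O> out = new ArrayList<>(left.size());
        for (L l : left) {
            R r = rightLookup.get(keyOf.apply(l));
            out.add(joiner.apply(l, r));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, String> users = new HashMap<>();
        users.put(1, "alice");
        users.put(2, "bob");
        List<Integer> clicks = Arrays.asList(1, 3, 2, 1);
        List<String> joined =
                leftJoin(clicks, id -> id, users, (id, name) -> id + ":" + name);
        System.out.println(joined); // [1:alice, 3:null, 2:bob, 1:alice]
    }
}
```

No shuffle of the left side is needed here, since every element is joined independently of the others. Inner-join semantics would follow by dropping the outputs whose right side is null.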

On the other hand, if we are about to join two data sets which are already ordered by the "join key", and one or the other provides the possibility to seek, we'd be able to optimize such a map-side join even further by turning the look-up into a re-seek. For a left join, this would naturally support N join-values, which the random-access approach doesn't. As we can see, this approach is a super-set of the random-access approach. The canonical use-case would be to join two distinct databases with the same "primary key" (a typical key/value store delivers items ordered by the "key").
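A minimal sketch of the seek-based variant, assuming both inputs are in-memory lists sorted ascending by an integer key. `SortedSeekJoinSketch` and its helper `entry` are hypothetical names, and a forward-only list cursor stands in for a real seekable source.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Illustrative only: a forward-moving cursor stands in for a seekable source. */
public class SortedSeekJoinSketch {

    /** Convenience factory for (key, value) pairs. */
    public static Map.Entry<Integer, String> entry(int key, String value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    /**
     * Left join of two inputs that are both sorted ascending by key.
     * The right side may hold any number of values per key.
     */
    public static List<String> leftJoin(
            List<Map.Entry<Integer, String>> left,
            List<Map.Entry<Integer, String>> right) {
        List<String> out = new ArrayList<>();
        int cursor = 0; // position in the right side; only ever moves forward
        for (Map.Entry<Integer, String> l : left) {
            // "Re-seek": skip right entries whose key is smaller than the left key.
            while (cursor < right.size() && right.get(cursor).getKey() < l.getKey()) {
                cursor++;
            }
            // Scan the run of equal keys without moving the cursor, so that a
            // repeated left key joins against the same run again.
            int scan = cursor;
            boolean matched = false;
            while (scan < right.size() && right.get(scan).getKey().equals(l.getKey())) {
                out.add(l.getValue() + "+" + right.get(scan).getValue());
                matched = true;
                scan++;
            }
            if (!matched) {
                out.add(l.getValue() + "+null"); // left-join semantics
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> left =
                Arrays.asList(entry(1, "a"), entry(2, "b"), entry(4, "d"));
        List<Map.Entry<Integer, String>> right =
                Arrays.asList(entry(1, "x"), entry(1, "y"), entry(3, "z"), entry(4, "w"));
        System.out.println(leftJoin(left, right)); // [a+x, a+y, b+null, d+w]
    }
}
```

Because both sides are ordered, the right-side cursor never moves backwards, which is what makes a seek cheaper than an independent look-up per element.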

We clearly need more elaboration here, before starting with it. TBD

euphoria-flink: do not send periodic watermarks when not necessary

By default, the streaming flink executor periodically forwards ingestion time watermarks from the sources. If no windowing is part of a flow, this generates unnecessary traffic; we can see this traffic in flink's task tracker UI.

Before translating a euphoria flow into a flink DAG, we can analyze the flow and determine whether sending periodic watermarks from the sources (i.e. from InputTranslator) is necessary. If not, setting the flow's streamTimeCharacteristic to "processing time" might do the trick.
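The proposed analysis could look roughly like the sketch below. Everything in it is hypothetical: `WatermarkAnalysisSketch`, `Op`, and the local `TimeCharacteristic` enum are stand-ins for illustration and do not correspond to euphoria or flink classes.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative only: stand-in types, not euphoria or flink classes. */
public class WatermarkAnalysisSketch {

    /** Hypothetical mirror of the time characteristic chosen for a flow. */
    public enum TimeCharacteristic { PROCESSING_TIME, INGESTION_TIME }

    /** Hypothetical operator descriptor: a name plus whether it applies windowing. */
    public static final class Op {
        final String name;
        final boolean windowed;

        public Op(String name, boolean windowed) {
            this.name = name;
            this.windowed = windowed;
        }
    }

    /**
     * Periodic watermarks are only needed when at least one operator in the
     * flow applies windowing; otherwise processing time is sufficient.
     */
    public static TimeCharacteristic choose(List<Op> flow) {
        for (Op op : flow) {
            if (op.windowed) {
                return TimeCharacteristic.INGESTION_TIME;
            }
        }
        return TimeCharacteristic.PROCESSING_TIME;
    }

    public static void main(String[] args) {
        List<Op> stateless = Arrays.asList(new Op("MapElements", false), new Op("Filter", false));
        List<Op> windowed = Arrays.asList(new Op("MapElements", false), new Op("ReduceByKey", true));
        System.out.println(choose(stateless)); // PROCESSING_TIME
        System.out.println(choose(windowed));  // INGESTION_TIME
    }
}
```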

Findbugs plugin is bound to incorrect phase

Currently the findbugs plugin runs in the compile phase, which is not correct, because by that time no source files are ready for analysis yet. We should probably bind it to the test phase.

Bugs in Merging Windowing process

When I was going through the code handling window merging, I realized there must clearly be a few bugs, although this is not covered by unit tests. I added a more complex unit test in branch 79/MergingWindowingBug that is currently failing.

The test uses merging windowing that is uncommon in two attributes:

  • Windows are always triggered only by the onMerge trigger.
  • A newly created window is never the result of a merge; it just triggers the merge on other windows.

Running this test will fail on both executors:

Please have a look and let me know if my observation is right.
