
nemo's Introduction

Nemo

Build Status

Nemo prerequisites and setup

Prerequisites

  • Java 8
  • Maven
  • YARN settings
  • Protobuf 2.5.0
    • On Ubuntu 14.04 LTS and its point releases:

      sudo apt-get install protobuf-compiler
    • On Ubuntu 16.04 LTS and its point releases:

      sudo add-apt-repository ppa:snuspl/protobuf-250
      sudo apt update
      sudo apt install protobuf-compiler=2.5.0-9xenial1
    • On macOS:

      brew tap homebrew/versions
      brew install protobuf@2.5
    • Or build from source: download and extract the protobuf 2.5.0 release tarball, then run the usual autotools sequence

      ./configure
      make
      sudo make install

    • To check for a successful installation of version 2.5.0, run protoc --version

Installing Nemo

  • Run all tests and install: mvn clean install -T 2C
  • Run only unit tests and install: mvn clean install -DskipITs -T 2C

Running Beam applications

Running an external Beam application

  • Use run_external_app.sh instead of run.sh
  • Set the first argument to the path of the external Beam application jar
./bin/run_external_app.sh \
    `pwd`/nemo_app/target/bd17f-1.0-SNAPSHOT.jar \
    -job_id mapreduce \
    -executor_json `pwd`/examples/resources/sample_executor_resources.json \
    -user_main MapReduce \
    -user_args "`pwd`/mr_input_data `pwd`/nemo_output/output_data"

Configurable options

  • -job_id: ID of the Beam job
  • -user_main: Canonical name of the Beam application
  • -user_args: Arguments that the Beam application accepts
  • -optimization_policy: Canonical name of the optimization policy to apply to a job DAG in Nemo Compiler
  • -deploy_mode: yarn is supported (the default value is local)

Examples

## MapReduce example
./bin/run.sh \
	-job_id mr_default \
	-executor_json `pwd`/examples/resources/sample_executor_resources.json \
	-optimization_policy edu.snu.nemo.compiler.optimizer.policy.DefaultPolicy \
	-user_main edu.snu.nemo.examples.beam.MapReduce \
	-user_args "`pwd`/examples/resources/sample_input_mr `pwd`/examples/resources/sample_output_mr"

## YARN cluster example
./bin/run.sh \
    -deploy_mode yarn \
    -job_id mr_pado \
    -executor_json `pwd`/examples/resources/sample_executor_resources.json \
    -user_main edu.snu.nemo.examples.beam.MapReduce \
    -optimization_policy edu.snu.nemo.compiler.optimizer.policy.PadoPolicy \
    -user_args "hdfs://v-m:9000/sample_input_mr hdfs://v-m:9000/sample_output_mr"

Resource Configuration

The -executor_json command-line option can be used to provide a path to a JSON file that describes the resource configuration for executors. Its default value is config/default.json, which initializes one executor of each type (Transient, Reserved, and Compute), each with one core and 1024MB of memory.

Configurable options

  • num (optional): Number of containers. Default value is 1
  • type: Three container types are supported:
    • Transient : Containers that store eviction-prone resources. When batch jobs use idle resources in Transient containers, they can be arbitrarily evicted when latency-critical jobs attempt to use the resources.
    • Reserved : Containers that store eviction-free resources. Reserved containers are used to reliably store intermediate data which have high eviction cost.
    • Compute : Containers that are mainly used for computation.
  • memory_mb: Memory size in MB
  • capacity: Number of TaskGroups that can be run in an executor. Set this value to be the same as the number of CPU cores of the container.

Examples

[
  {
    "num": 12,
    "type": "Transient",
    "memory_mb": 1024,
    "capacity": 4
  },
  {
    "type": "Reserved",
    "memory_mb": 1024,
    "capacity": 2
  }
]

This example configuration specifies

  • 12 transient containers with 4 cores and 1024MB memory each
  • 1 reserved container with 2 cores and 1024MB memory

Monitoring your job using web UI

Nemo Compiler and Engine can store JSON representations of intermediate DAGs.

  • The -dag_dir command-line option specifies the directory where the JSON files are stored. The default directory is ./dag. Using our online visualizer, you can easily visualize a DAG: just drop the DAG's JSON file into it as input.

Examples

./bin/run.sh \
    -job_id als \
    -executor_json `pwd`/examples/resources/sample_executor_resources.json \
    -user_main edu.snu.nemo.examples.beam.AlternatingLeastSquare \
    -optimization_policy edu.snu.nemo.compiler.optimizer.policy.PadoPolicy \
    -dag_dir "./dag/als" \
    -user_args "`pwd`/examples/resources/sample_input_als 10 3"

nemo's People

Contributors

differentsc, ejjeong, gwsshs22, jeongyooneo, johnyangk, jooykim, mhkwon924, sanha, seojangho, skystar-p, taegeonum, wonook, wynot12


nemo's Issues

Make ProcessContext Kinder

Currently, most of the interfaces throw UnsupportedOperationException without any further explanation. Let's leave comments or exception messages. For example, we can say that streaming is currently not supported.

Compiler Interfaces

Compiler-related code is currently intermixed with translator/engine code.
Let's move them into a separate package and introduce APIs around them.

Methods to register new attributes

In order to make Vortex as extensible as possible, the set of attributes used to decide how Runtime executes jobs must be made extensible as well.

Runtime currently has a fixed set of attributes. It should be possible to add new attributes flexibly.

Vortex Runtime

This issue is for keeping track of the subtasks

Given the labeled and split Vortex DAG, which is processed by the Vortex Compiler (#8), the Vortex Runtime will run the given DAG at the physical level. Its main components and contributions are as follows:

  • Execution
    • Given the information on each node and edge, stated in #8, the Runtime executes them appropriately.
  • Scheduler
    • Division of each operator into tasks
    • Pipelining
  • Fault-tolerance mechanisms
  • Optimizations

Details and the following sub-issues will be updated.

Sub-issues:

  • Add more sub-issues

Introduce simple backend

The Compiler's backend is responsible for converting the IR representation into an ExecutionPlan executable by the Vortex Runtime. A simple example was introduced in #43, and we can use it to introduce a simple version of the backend.

Different types of tasks

There are various types of tasks (e.g., Do, Merge, Partition).
However, tasks must be clearly defined, with a thorough review of how varying jobs can be run using these task definitions.

Vortex Compiler

This issue is for keeping track of the subtasks

The Vortex Compiler is composed of three components: the Frontend, the Optimizer, and the Backend. The structure is similar to the LLVM compiler structure.

We will create a Vortex IR from the given DAG of a BEAM program through the Frontend, process and optimize it, and pass it on to the Backend, which transforms the Vortex IR into an ExecutionPlan that will be received and processed by the Vortex Runtime (#9).

Vortex IR

The main job of the Vortex Compiler is to label each of the vertices and edges of the DAG with specific attributes including:

Vertex:

  • implemented as classes of:
    • SourceVertex
    • OperatorVertex
      • Transform
        • DoTransform
        • GroupByKeyTransform
        • WindowTransform
  • with attributes:
    • Placement
      • e.g. Reserved/Transient
      • e.g. Storage/Compute
    • Parallelism
      • e.g. # of partitions

Edges:

  • with attributes:
    • EdgeChannel (distinguishes intermediate data location)
      • Local Memory
      • TCP Pipe (push)
      • File (local disk)
      • Distributed Filesystem
    • Relation
      • ScatterGather (M2M)
      • OneToOne (O2O)
      • Broadcast (O2M)

Vortex Labeling & Correctness Check

The placement/labeling algorithm/policy will be pluggable and decided by the user, customized for each usage and environment. While labeling each node and edge, it will also check whether there are any anomalies in the DAG. Our previous implementation of transient-reserved-specified Vortex will use the algorithm shown in the paper. The specifications and details of the policy are shown under the compiler.optimizer.passes package. Each pass receives a DAG and outputs another DAG, tagged with attributes.

Vortex DAG Optimization

The Compiler can further optimize the DAG so that it runs efficiently on the Runtime layer, by adding/merging/removing operators, modifying edges, and tweaking system attributes (like FlumeJava, etc.).
Ideally, this would also be done at runtime.

Vortex Stage Generation

Then, the Compiler Backend splits the DAG into Vortex Stages (#73).

Details and the following sub-issues will be updated.

Sub-issues:

  • Add more sub-issues

  • #12 Sink Node

  • #25 Compiler interfaces

  • #76 new IR

  • #29 Configurable Optimizer

  • #21 Refactor Attributes class

  • #13 Join Node

  • #22 DAG Integrity check

  • #14 Multi-Output Do node

  • #28 VortexBackend

  • #30 Tang to parse user arguments

  • #31 Interfaces for Runtime Optimization

  • #56 More instantiation policies

  • #36 Stream support

Code cleanup and clarification

Currently, the code needs some code cleanup and restructuring, to reflect the changes discussed.
Also, edge attributes need to be added to clarify the DAG.

Multi-window Example

Create an example user code for multi-windowing in edu.snu.vortex.examples.beam.
Please use Java 8, as much as possible. :)

Task state management on a state machine

Task states are roughly defined (ex. READY, SCHEDULED, RUNNING, COMPLETE), but state transitions are rather vaguely implemented, without an explicit use of a state machine.

Use a state machine to formally manage task states in Runtime.
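A minimal sketch of what such an explicit state machine could look like (the state names and transition table here are illustrative, not Nemo's actual classes):

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Illustrative task state machine with an explicit transition table. */
public class TaskStateMachine {
  public enum State { READY, SCHEDULED, RUNNING, COMPLETE, FAILED }

  // Allowed transitions; anything absent from this table is invalid.
  private static final Map<State, Set<State>> VALID = new EnumMap<>(State.class);
  static {
    VALID.put(State.READY, EnumSet.of(State.SCHEDULED));
    VALID.put(State.SCHEDULED, EnumSet.of(State.RUNNING, State.FAILED));
    VALID.put(State.RUNNING, EnumSet.of(State.COMPLETE, State.FAILED));
    VALID.put(State.COMPLETE, EnumSet.noneOf(State.class));
    VALID.put(State.FAILED, EnumSet.noneOf(State.class));
  }

  private State current = State.READY;

  public State getState() {
    return current;
  }

  /** Rejects invalid transitions loudly instead of silently accepting them. */
  public void transitionTo(final State next) {
    if (!VALID.get(current).contains(next)) {
      throw new IllegalStateException(current + " -> " + next + " is not a valid transition");
    }
    current = next;
  }
}
```

Making invalid transitions throw immediately surfaces state-management bugs at the point where they happen, rather than later as vague inconsistencies.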

Introduce Master/Executor

Create simple Master/Executor abstractions that exchange messages with each other.
Please refer to the previous version of Vortex for their class structures and the types of messages exchanged.
Modify SimpleEngine to use the Master/Executor abstractions.

Implement Beam Result

Currently, all of Beam Result's APIs throw UnsupportedOperationException. Let's implement the APIs so that they correctly report the job status.

DAG Integrity Check

Let's perform integrity checks on graphs upon their initialization as well as manipulation.

The checks should include

  • No cycle
  • At least one source node, and one sink node
  • Completely connected
  • No null/incorrect pointers that connect the nodes

As a start, we can inject the checker in the beginning of DAGBuilder#build.
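As an illustration, the cycle and source/sink checks above could start from something like the following (a standalone sketch over a plain adjacency list, not the actual DAGBuilder API):

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustrative integrity checks over a DAG given as an adjacency list (vertex -> out-neighbors). */
public class DagChecker {

  /** True iff the directed graph contains no cycle (DFS with an on-path set). */
  public static boolean isAcyclic(final Map<String, List<String>> adj) {
    final Set<String> done = new HashSet<>();
    final Set<String> onPath = new HashSet<>();
    for (final String v : adj.keySet()) {
      if (!dfs(v, adj, onPath, done)) {
        return false;
      }
    }
    return true;
  }

  private static boolean dfs(final String v, final Map<String, List<String>> adj,
                             final Set<String> onPath, final Set<String> done) {
    if (done.contains(v)) {
      return true;
    }
    if (!onPath.add(v)) {
      return false; // revisiting a vertex on the current path: cycle
    }
    for (final String w : adj.getOrDefault(v, Collections.<String>emptyList())) {
      if (!dfs(w, adj, onPath, done)) {
        return false;
      }
    }
    onPath.remove(v);
    done.add(v);
    return true;
  }

  /** True iff there is at least one source (no in-edges) and one sink (no out-edges). */
  public static boolean hasSourceAndSink(final Map<String, List<String>> adj) {
    final Set<String> hasInEdge = new HashSet<>();
    boolean hasSink = false;
    for (final Map.Entry<String, List<String>> e : adj.entrySet()) {
      hasInEdge.addAll(e.getValue());
      if (e.getValue().isEmpty()) {
        hasSink = true;
      }
    }
    boolean hasSource = false;
    for (final String v : adj.keySet()) {
      if (!hasInEdge.contains(v)) {
        hasSource = true;
        break;
      }
    }
    return hasSource && hasSink;
  }
}
```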

Interfaces for Dynamic Optimization

Currently, we assume that the compiler optimization happens just once, before the job commences.

Let's allow the optimization to happen multiple times, at runtime. We need to carefully think about how the interfaces between different components in the system should change.

The execution flow might look like this: The engine feeds runtime metrics into the compiler optimizer, which outputs a new IR for the compiler backend. The compiler backend then manipulates the JobDAG, with which the engine resumes execution.
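A toy sketch of that feedback loop, with a deliberately simplified "IR" and metric (all names here are hypothetical, not Nemo interfaces):

```java
/** Toy sketch of the feedback loop: engine metrics -> optimizer -> new plan -> resume. */
public class DynamicOptimizer {

  /** A trivially simple stand-in for the IR: just a parallelism setting. */
  public static final class Plan {
    public final int parallelism;
    public Plan(final int parallelism) {
      this.parallelism = parallelism;
    }
  }

  /** A runtime metric that the engine would feed back to the optimizer. */
  public static final class Metrics {
    public final long bytesPerPartition;
    public Metrics(final long bytesPerPartition) {
      this.bytesPerPartition = bytesPerPartition;
    }
  }

  /** Re-optimize: increase parallelism when partitions are larger than a threshold. */
  public static Plan reoptimize(final Plan current, final Metrics metrics, final long maxBytes) {
    if (metrics.bytesPerPartition > maxBytes) {
      return new Plan(current.parallelism * 2);
    }
    return current; // metrics look fine; keep the current plan
  }
}
```

In the real system the "plan" would be the IR handed to the compiler backend, which rebuilds the JobDAG before the engine resumes.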

Implement Join Node

Support Join in Vortex DAG, and translate Beam's CoGroupByKey into it.
In the PR, please provide Beam program examples for testing the code.

Show build status

We can make Jenkins show our build status, so that we don't miss a bad pull request once it is merged into the master branch.

Make Optimizer Configurable

The current optimizer statically applies the placement optimization.
Let's make it configurable so that the compiler can apply arbitrary optimizations (i.e., DAG passes) specified by the user.
It might be a good idea to introduce a new package (e.g., edu.snu.vortex.compiler.optimizer.pass) and keep all the pass-related code in it.

Introduce Task

Introduce a simple Task class with the following specifications.

  • It should have states (e.g., running, queued, executing)
  • Its state transitions should be explicitly managed in the code (e.g., using a state machine library), or at least its invalid state transitions should be checked
  • It should execute one or more operators (pipelining operators with one-to-one dependencies would be a good start)

Please change SimpleEngine to use your Task class to execute Vortex DAGs. Make sure the engine correctly runs the Beam examples after the change.
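A minimal sketch of the third bullet, a task that pipelines a chain of one-to-one operators (the interface is illustrative, not the actual Task class):

```java
import java.util.List;
import java.util.function.Function;

/** Illustrative task that pipelines a chain of one-to-one operators. */
public class PipelinedTask {
  private final List<Function<Object, Object>> operators;

  public PipelinedTask(final List<Function<Object, Object>> operators) {
    this.operators = operators;
  }

  /** Pushes one element through the whole operator chain before taking the next (pipelining). */
  public Object executeOn(final Object element) {
    Object current = element;
    for (final Function<Object, Object> op : operators) {
      current = op.apply(current);
    }
    return current;
  }
}
```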

pom.xml per Sub-Package

Currently, we have one big pom.xml at the root directory.

Let's use one pom.xml per sub-package (beam/dag/engine) and set up dependencies as follows.

  • beam depends on dag
  • engine depends on dag
  • there should be no other dependencies (e.g., engine should not know about beam, and dag should not know about engine)
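This dependency layout could be expressed with a parent pom.xml that declares the sub-packages as Maven modules; a sketch (the groupId and version handling here are assumptions, not the project's actual coordinates):

```xml
<!-- Parent pom.xml (root): declares the sub-packages as modules -->
<modules>
  <module>dag</module>
  <module>beam</module>
  <module>engine</module>
</modules>

<!-- beam/pom.xml and engine/pom.xml each depend only on dag -->
<dependency>
  <groupId>edu.snu.vortex</groupId>
  <artifactId>dag</artifactId>
  <version>${project.version}</version>
</dependency>
```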

Vortex BEAM Translator

This issue is for keeping track of the subtasks

We will need a layer to translate BEAM programs into Vortex DAGs, which follow our style.
These DAGs will later be received and processed by the Vortex Compiler (#8).

Details and the following sub-issues will be updated.

Sub-issues:

  • Study BEAM
  • List out BEAM functionalities to translate for our usages (#)
  • Add more sub-issues

Import REEF

We should import REEF and run our code on top of it to make quite a few things easier :)

Introduce initial ShuffleManager

Currently, intermediate data and their shuffle/broadcast are managed inside SimpleEngine.
Let's extract the related code into a separate sub-package called shuffle and hide the details with APIs.
It'd be great if you could make the APIs flexible and pluggable so that we can use the same code in a distributed environment (REEF), only with different implementations.

Specify Types in Requesting Containers

Specify types (e.g., storage, compute, transient, reserved) when requesting containers from the resource manager (RM).

However, REEF, which we use to communicate with the RM, does not support this. One simple approach that requires minimum modifications to REEF is using node labelling features provided by RMs, simply assuming that datacenter operators statically pre-label each node with its type.

We can add a String field for node label in REEF's EvaluatorRequest, and modify REEF's YARN/Mesos runtimes to use the information appropriately. Then, in Vortex, we can simply set the node label field in EvaluatorRequest when requesting new Evaluators.

I wonder whether node labels can also be used to request dynamically labelled containers (not nodes), such as Mesos's revocable containers (http://mesos.apache.org/documentation/latest/oversubscription). This might make a good discussion topic in the REEF community.

Scheduling policies for practical use

Runtime has an interface class SchedulingPolicy defined.
A simple, naive round-robin scheduling policy is currently used in the simple Master's scheduler.

More practical scheduling policies must be implemented for general use.
Moreover, scheduling policies that are optimal for given job characteristics are preferable.
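The naive round-robin policy mentioned above can be sketched as follows (the SchedulingPolicy interface shape is assumed here, not Nemo's actual signature):

```java
import java.util.List;

/** Illustrative round-robin scheduling policy over a fixed list of executors. */
public class RoundRobinPolicy {
  private final List<String> executorIds;
  private int next = 0;

  public RoundRobinPolicy(final List<String> executorIds) {
    this.executorIds = executorIds;
  }

  /** Returns the executor for the next task group, cycling through executors in order. */
  public String selectExecutor() {
    final String chosen = executorIds.get(next);
    next = (next + 1) % executorIds.size();
    return chosen;
  }
}
```

A more practical policy would weigh executor load, data locality, and container type (Transient/Reserved/Compute) instead of cycling blindly.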

Discuss code structure

We've discussed the overall architecture of our new version of Vortex, consisting of the translator layer, the compilation layer, and lastly, the runtime layer.
Keeping this in mind, do you have any opinions about our code structure?
@johnyangk @gwsshs22

Implement Sink Node

Translate Beam's Sink Node into Vortex's Sink Node. Please provide Beam program examples for testing the translation.

Implement VortexBackend

Implement VortexBackend, a backend that converts the IR representation into an ExecutionPlan executable by the Vortex Runtime.
Implement stage partitioning.

Implement Multi-Output Do Node

Support Beam's ParDo.UnboundMulti.

My guess is that we will have multiple edges coming out of a single Vortex Do node. We will then somehow need to match each output to an edge using Beam's multi-output tags.

It might be helpful to take a look at how Beam's side input (which sort of is a multi-input thing) is translated into Vortex's Broadcast.

Web UI

A web ui for visualizing Vortex job executions.

It would be similar to the Spark UI in high-level features (e.g., visualizing DAGs, progress, faults, tasks, streaming), but different in the elements that construct them (e.g., Optimizer, Task/Channel, State Machines).

Runtime execution plan builder

The runtime execution plan must be generated in the compiler's backend.
Runtime must provide APIs to generate the execution plan. This includes the runtime's operators, edges, and attributes that correspond to those of the IR.

Support Beam Streaming in Compiler

  • Extract window-related information from Beam operators and save them in Vortex IR
  • Create a simple Beam Streaming example (e.g., WindowedMapReduce) to demonstrate the compiler's support

Use Tang to Parse User Arguments

In the client JobLauncher, we assume that the first argument is the user main class and the rest are the user main arguments.

Let's replace this assumption with Tang. Then, the user will be able to specify other types of configurations (e.g., compiler types, resource types, etc.).

VortexJobLauncher in the previous version of Vortex is a good reference for implementing it.

Optimization Pass for PAFAS

  • Traverse the IR DAG and detect multi-window optimization opportunity
  • Optimize the DAG by "compacting" operators and inserting PAFAS-specific operators (e.g., dependency tracking)

Fix Jenkins with BEAM

Currently, Jenkins seems to be having some trouble, as it doesn't have BEAM compiled on the machine. I'll try to fix this ASAP so we can use our CI functionality.

Refactor Attributes Class

It should be explicit that only certain attribute values can be mapped to attribute keys.

We can do it in two ways:

  1. Use strongly typed attribute hashmaps in node and edge (we need to restructure the Attributes class accordingly)
  2. Go with Attributes.Key->Object hashmaps and check whether the mapping is correct after a graph pass
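The strongly typed option can be sketched with a key type that carries its value type, so that lookups are checked at compile time (names are illustrative, not the actual Attributes class):

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of a type-safe attribute map: each key knows the type of its value. */
public class TypedAttributeMap {

  /** An attribute key parameterized by the type of its value. */
  public static final class Key<T> {
    private final String name;
    public Key(final String name) {
      this.name = name;
    }
    @Override
    public String toString() {
      return name;
    }
  }

  private final Map<Key<?>, Object> attributes = new HashMap<>();

  public <T> void put(final Key<T> key, final T value) {
    attributes.put(key, value);
  }

  @SuppressWarnings("unchecked") // safe: put() only ever pairs a Key<T> with a T
  public <T> T get(final Key<T> key) {
    return (T) attributes.get(key);
  }
}
```

With this shape, `put(PARALLELISM, "eight")` fails to compile rather than failing at graph-pass time.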

Support code/data serialization

The simple engine we have assumes everything runs on a single computer. Thus, it never serializes/deserializes anything.

But that's not the case in a distributed environment.

With #15, #16, #17 in place, let's ser/des code/data in message exchanges between master and executors. The translator, and then the compiler should pass down the required class/codec information to the runtime.
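For the data side, a minimal ser/des helper using plain Java serialization might look like this (a sketch only; the real implementation would likely use per-type codecs passed down from the translator and compiler):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Sketch of ser/des for data exchanged between master and executors. */
public class Serde {

  /** Serializes an object into a byte array suitable for a network message. */
  public static byte[] serialize(final Serializable obj) throws IOException {
    final ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(obj);
    }
    return bos.toByteArray();
  }

  /** Reconstructs the object on the receiving side. */
  public static Object deserialize(final byte[] bytes) throws IOException, ClassNotFoundException {
    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return ois.readObject();
    }
  }
}
```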

Add more instantiation policies

Be sure to:

  • Add examples that transform the DAG structure itself (see discussions in #53).
    • Operator fusion
  • Add examples for:
    • In-memory Query Execution
    • Dynamic Partitioning
