
Lightweight real-time big data streaming engine over Akka

Home Page: https://gearpump.github.io/gearpump/

License: Apache License 2.0


Gearpump


Gearpump is a lightweight real-time big data streaming engine. It is inspired by recent advances in the Akka framework and a desire to improve on existing streaming frameworks.

The name Gearpump is a reference to the engineering term "gear pump", which is a super simple pump that consists of only two gears, but is very powerful at streaming water.

We model streaming within the Akka actor hierarchy.
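As a loose illustration of that idea, the sketch below models the supervision hierarchy as plain data: a Master supervises Workers, Workers host Executors, and Executors run Tasks. The names are illustrative only, not Gearpump's actual classes.

```scala
// Toy model of the actor supervision hierarchy; illustrative only.
case class Task(name: String)
case class Executor(id: Int, tasks: List[Task])
case class Worker(id: Int, executors: List[Executor])
case class Master(workers: List[Worker])

// Count all tasks supervised, transitively, by the master.
def taskCount(m: Master): Int =
  m.workers.flatMap(_.executors).map(_.tasks.size).sum
```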

In initial benchmarks, Gearpump processes nearly 18 million messages/second (100 bytes per message) with an 8 ms latency on a 4-node cluster.

For steps to reproduce the performance test, please check Performance benchmark.

Useful Resources

How to Build

1). Clone the Gearpump repository

  git clone https://github.com/gearpump/gearpump.git
  cd gearpump

2). Build package

  ## Please use Scala 2.12
  ## The target package path: output/target/gearpump-${version}.zip
  sbt clean +assembly +packArchiveZip

After the build, a package file gearpump-${version}.zip will be generated under the output/target/ folder.

To build the Scala documentation, use

   ## Will generate the scala doc under target/scala_2.xx/unidoc/
   sbt unidoc

NOTE: The build requires a network connection. If you are behind an enterprise proxy, make sure you have set the proxy in your environment before running the build commands. On Windows:

set HTTP_PROXY=http://host:port
set HTTPS_PROXY=http://host:port

On Linux:

export HTTP_PROXY=http://host:port
export HTTPS_PROXY=http://host:port

How to do style check before submitting a pull request?

Before submitting a PR, you should always run style check first:

  ## Run style check for compile, test, and integration test.
  sbt scalastyle test:scalastyle it:scalastyle

How to generate a license report listing all dependencies

  sbt dumpLicenseReport

How to generate dependencies by declared license

  sbt dependencyLicenseInfo

Contributors (time order)

License

Gearpump itself is licensed under the Apache License (2.0). For the libraries it uses, please see LICENSE.

Acknowledgement

The Netty transport code is based on Apache Storm. Thanks to the Apache Storm contributors.

The cgroup code is based on JStorm. Thanks to the JStorm contributors.

Thanks to JetBrains for providing an IntelliJ IDEA Free Open Source License.

Contributors

bitdeli-chef, christophse, clockfly, darionyaphet, domsj, fabiofumarola, felixcheung, gy910210, haibaoy, haoch, he-pin, huafengw, karol-brejna-i, kimutansk, kkasravi, manuzhang, pipsqueakh, scala-steward, skw1992, smarthi, stanleyxu2005, stevepeak, whjiang, willemjiang, yanghua, zhaosoap
gearpump's Issues

trade throughput with latency

This system should be able to trade throughput for latency.

For a messaging system whose messages are very small, say 10 bytes, it is very hard to reach high throughput. In this case, it would be very good if we could batch smartly.
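The smart batching described above could be sketched like this (a minimal illustration, not Gearpump's actual code): flush when the buffer reaches a size limit or when a latency budget has elapsed since the first buffered message, trading a little latency for throughput.

```scala
// Minimal size/latency-bounded batcher; illustrative only.
class Batcher(maxBatch: Int, maxDelayMs: Long) {
  private val buf = scala.collection.mutable.ArrayBuffer.empty[Array[Byte]]
  private var firstAt = 0L

  /** Returns a full batch to send, or None if we should keep buffering. */
  def offer(msg: Array[Byte], nowMs: Long): Option[Seq[Array[Byte]]] = {
    if (buf.isEmpty) firstAt = nowMs
    buf += msg
    if (buf.size >= maxBatch || nowMs - firstAt >= maxDelayMs) {
      val batch = buf.toSeq
      buf.clear()
      Some(batch)
    } else None
  }
}
```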

Connect Kafka with Gearpump

Connect Kafka queue with Gearpump so that Gearpump can consume data from outside.

The key points are:

  1. functionality.
  2. performance.

build error: Undefined resolver 'local'

I'm hitting this issue when building gearpump on Windows but cannot reproduce it on Linux.

java.lang.RuntimeException: Undefined resolver 'local'
        at scala.sys.package$.error(package.scala:27)
        at sbt.IvyActions$$anonfun$publish$1.apply(IvyActions.scala:93)
        at sbt.IvyActions$$anonfun$publish$1.apply(IvyActions.scala:91)
        at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:115)
        at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:115)
        at sbt.IvySbt$$anonfun$withIvy$1.apply(Ivy.scala:103)
        at sbt.IvySbt.sbt$IvySbt$$action$1(Ivy.scala:48)
        at sbt.IvySbt$$anon$3.call(Ivy.scala:57)
        at xsbt.boot.Locks$GlobalLock.withChannel$1(Locks.scala:98)
        at xsbt.boot.Locks$GlobalLock.withChannelRetries$1(Locks.scala:81)
        at xsbt.boot.Locks$GlobalLock$$anonfun$withFileLock$1.apply(Locks.scala:102)
        at xsbt.boot.Using$.withResource(Using.scala:11)
        at xsbt.boot.Using$.apply(Using.scala:10)
        at xsbt.boot.Locks$GlobalLock.ignoringDeadlockAvoided(Locks.scala:62)
        at xsbt.boot.Locks$GlobalLock.liftedTree1$1(Locks.scala:52)
        at xsbt.boot.Locks$GlobalLock.withLock(Locks.scala:52)
        at xsbt.boot.Locks$.apply0(Locks.scala:31)
        at xsbt.boot.Locks$.apply(Locks.scala:28)
        at sbt.IvySbt.withDefaultLogger(Ivy.scala:57)
        at sbt.IvySbt.withIvy(Ivy.scala:98)
        at sbt.IvySbt.withIvy(Ivy.scala:94)
        at sbt.IvySbt$Module.withModule(Ivy.scala:115)
        at sbt.IvyActions$.publish(IvyActions.scala:91)
        at sbt.Classpaths$$anonfun$publishTask$1.apply(Defaults.scala:1173)
        at sbt.Classpaths$$anonfun$publishTask$1.apply(Defaults.scala:1172)
        at scala.Function3$$anonfun$tupled$1.apply(Function3.scala:35)
        at scala.Function3$$anonfun$tupled$1.apply(Function3.scala:34)
        at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
        at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
        at sbt.std.Transform$$anon$4.work(System.scala:64)
        at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
        at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
        at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
        at sbt.Execute.work(Execute.scala:244)
        at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
        at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
        at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
        at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
[error] (core/*:publishLocal) Undefined resolver 'local'

ActorInitializationException while running wordCount in local mode

I was trying to run the wordcount example in local mode, following the instructions in README.md, and saw the exception below. This happens because JAVA_HOME is not set; ideally, either the actors should not start up, or there should be a more graceful shutdown when JAVA_HOME is not set.

[INFO] [11/05/2014 00:00:50.923] [Worker-1] Executor is down app1-executor-1
[INFO] [11/05/2014 00:00:50.924] [Scheduler] Resource update id: 349103899, slots: 100....
[ERROR] [11/05/2014 00:00:50.923] [master-akka.actor.default-dispatcher-2] [akka://master/user/Worker1/app1-executor-1] null
akka.actor.ActorInitializationException: exception during creation
    at akka.actor.ActorInitializationException$.apply(Actor.scala:164)
    at akka.actor.ActorCell.create(ActorCell.scala:596)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at akka.util.Reflect$.instantiate(Reflect.scala:66)
    at akka.actor.ArgsReflectConstructor.produce(Props.scala:352)
    at akka.actor.Props.newActor(Props.scala:252)
    at akka.actor.ActorCell.newActor(ActorCell.scala:552)
    at akka.actor.ActorCell.create(ActorCell.scala:578)
    ... 9 more
Caused by: java.io.IOException: Cannot run program "null/bin/java": error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1041)
    at scala.sys.process.ProcessBuilderImpl$Simple.run(ProcessBuilderImpl.scala:68)
    at scala.sys.process.ProcessBuilderImpl$AbstractBuilder.run(ProcessBuilderImpl.scala:99)
    at scala.sys.process.ProcessBuilderImpl$AbstractBuilder.run(ProcessBuilderImpl.scala:98)
    at org.apache.gearpump.cluster.Worker$ExecutorWatcher.<init>(Worker.scala:181)
    ... 18 more
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method)
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:135)
    at java.lang.ProcessImpl.start(ProcessImpl.java:130)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1022)
    ... 22 more

Add more metrics, Graph visualization, and UI dashboard

sub issues:

  • #269 Define UI Dashboard structure
  • #270 Create a force directed widget
  • #280 Add websockets to REST api
  • #285 Integrate frontend dashboard into sbt build
  • #287 Enable a topic based event bus on master
  • #289 Ability to Configure Flows / DAG using NoFlo based JSON graph definition
  • #422 create a ui for master informations
  • #433 Remove npm, bower dependencies. Generate bower dependencies dynamically.
  • #434 Add api/v1.0/ prefix for all REST related calls
  • #449 Remove external resource
  • #453 Add rest service for master information
  • #456 Create a widget to show visjs graph

CLI options do not override values in config file application.conf

  1. start local
    >target/pack/bin/local -port 8091
  2. start wordcount
    >target/pack/bin/wordcount -master 127.0.0.1:8091
  3. note errors
    [INFO] [09/17/2014 08:55:49.394] [ActorSystemBooter] ActorSystem app0-executor-1 Binding life cycle with actor: Actor[akka.tcp://[email protected]:8091/user/Worker1#-680917607]
    [INFO] [09/17/2014 08:55:49.439] [ActorSystemBooter] creating actor masterproxy
    [INFO] [09/17/2014 08:55:49.443] [MasterProxy] Master Proxy is started...
    [INFO] [09/17/2014 08:55:49.444] [AppManager] Master proxy is created, create appmaster...
    [WARN] [09/17/2014 08:55:49.456] [app0-executor-1-akka.actor.default-dispatcher-10] [akka.tcp://[email protected]:52822/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fmaster%40127.0.0.1%3A3002-3/endpointWriter] AssociationError [akka.tcp://[email protected]:52822] -> [akka.tcp://[email protected]:3002]: Error [Invalid address: akka.tcp://[email protected]:3002] [ akka.remote.InvalidAssociation: Invalid address: akka.tcp://[email protected]:3002 Caused by: akka.remote.transport.Transport$InvalidAssociationException: Connection refused: /127.0.0.1:3002 ]
    [WARN] [09/17/2014 08:55:49.456] [app0-executor-1-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:52822/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fmaster%40127.0.0.1%3A3001-2/endpointWriter] AssociationError [akka.tcp://[email protected]:52822] -> [akka.tcp://[email protected]:3001]: Error [Invalid address: akka.tcp://[email protected]:3001] [ akka.remote.InvalidAssociation: Invalid address: akka.tcp://[email protected]:3001 Caused by: akka.remote.transport.Transport$InvalidAssociationException: Connection refused: /127.0.0.1:3001 ]
    [WARN] [09/17/2014 08:55:49.456] [app0-executor-1-akka.actor.default-dispatcher-3] [akka.tcp://[email protected]:52822/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fmaster%40127.0.0.1%3A3000-1/endpointWriter] AssociationError [akka.tcp://[email protected]:52822] -> [akka.tcp://[email protected]:3000]: Error [Invalid address: akka.tcp://[email protected]:3000] [ akka.remote.InvalidAssociation: Invalid address: akka.tcp://[email protected]:3000 Caused by: akka.remote.transport.Transport$InvalidAssociationException: Connection refused: /127.0.0.1:3000 ]
    [INFO] [09/17/2014 08:55:49.463] [ActorSystemBooter] creating actor appmaster
    [WARN] [09/17/2014 08:55:49.463] [app0-executor-1-akka.actor.default-dispatcher-10] [Remoting] Tried to associate with unreachable remote address [akka.tcp://[email protected]:3000]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: /127.0.0.1:3000
    [INFO] [09/17/2014 08:55:49.465] [AppMaster] AppMaster[0] is launched AppDescription(wordCount,org.apache.gearpump.util.Configs@1c4ef130,([TaskDescription(class org.apache.gearpump.streaming.examples.wordcount.Split,4), TaskDescription(class org.apache.gearpump.streaming.examples.wordcount.Sum,4)], [Edge(org.apache.gearpump.partitioner.HashPartitioner@16cda83c)=(TaskDescription(class org.apache.gearpump.streaming.examples.wordcount.Split,4),TaskDescription(class org.apache.gearpump.streaming.examples.wordcount.Sum,4))]))
    [WARN] [09/17/2014 08:55:49.466] [app0-executor-1-akka.actor.default-dispatcher-10] [Remoting] Tried to associate with unreachable remote address [akka.tcp://[email protected]:3001]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: /127.0.0.1:3001
    [WARN] [09/17/2014 08:55:49.466] [app0-executor-1-akka.actor.default-dispatcher-10] [Remoting] Tried to associate with unreachable remote address [akka.tcp://[email protected]:3002]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: /127.0.0.1:3002
    [INFO] [09/17/2014 08:55:49.476] [AppMaster] AppMaster is launched xxxxxxxxxxxxxxxxx
    [INFO] [09/17/2014 08:55:49.477] [MasterProxy] get unknown message , stashing RegisterAppMaster
    [INFO] [09/17/2014 08:55:51.485] [MasterProxy] get unknown message , stashing RegisterAppMaster
    [INFO] [09/17/2014 08:55:53.495] [MasterProxy] get unknown message , stashing RegisterAppMaster

Provide application argument defaults as prior version did

Currently invoking
target/pack/bin/local -port 8092 -workernum 4
Results in a usage error
Usage:
java class org.apache.gearpump.cluster.main.Local$ -port -sameprocess -workernum
fail to parse arguments...

because default arguments are no longer provided when defining options. Options for local, master, wordcount, sol, etc. should allow additional attributes like default and required, and should probably also accept a config file in case the user wishes to provide a .conf file rather than command-line args. These additional attributes should also be reflected in the usage message.

Add schema to message

The message should have:

  1. a timestamp bound to the message; the timestamp can be system-generated or user-provided.
  2. a JSON-like format to pass between tasks.
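A rough sketch of such a message envelope (illustrative names, with a Map standing in for the JSON-like body; not the actual Gearpump API):

```scala
// Illustrative message envelope: a payload plus a timestamp.
case class StreamMessage(payload: Map[String, String], timestamp: Long)

object StreamMessage {
  // A user-provided timestamp wins; otherwise the system assigns one.
  def apply(payload: Map[String, String],
            userTs: Option[Long],
            systemTs: () => Long): StreamMessage =
    StreamMessage(payload, userTs.getOrElse(systemTs()))
}
```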

Followup on issue #37 rest service support

  1. Maybe move the rest service to a standalone Scala file.
I feel this should not be a part of Local.

Maybe we should move it to a standalone command, like Rest.scala, and then we can create a shell command like rest.sh.

Usage like this:
rest.sh -master "host1:port1,host2:port2,host3:port3"

Then we will create a rest service. It will route the command to the real master.

We can get a master ref with the method we used in ClientContext.scala

class ClientContext(masters: Iterable[HostPort]) {

  private implicit val timeout = Timeout(5, TimeUnit.SECONDS)
  val system = ActorSystem("client", Configs.SYSTEM_DEFAULT_CONFIG)

  val master = system.actorOf(Props(classOf[MasterProxy], masters), MASTER)
}

Maybe add an interface in the master to query a list of application ids, so that the rest service can query it to get a list of app ids?

Would AppMasterRequestData(appId) be better? When the master receives the call, it may return:
a) appId, attemptId; b) the appMaster address (worker address and appmaster actorRef)
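The query flow being discussed might look like the sketch below, with a toy in-memory registry standing in for the master actor (all names here are illustrative, not a real Gearpump API):

```scala
// Hypothetical query protocol for listing apps and resolving appmasters.
case class ListAppIds()
case class AppIds(ids: List[Int])
case class AppMasterRequestData(appId: Int)
case class AppMasterData(appId: Int, attemptId: Int, workerAddress: String)

// Toy in-memory registry standing in for the real master actor.
class MasterRegistry {
  private var apps = Map.empty[Int, AppMasterData]
  def register(d: AppMasterData): Unit = apps += (d.appId -> d)
  def handle(q: ListAppIds): AppIds = AppIds(apps.keys.toList.sorted)
  def handle(q: AppMasterRequestData): Option[AppMasterData] = apps.get(q.appId)
}
```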

REST interface to submit a dag by JSON representation

From kam (kkasravi):

The design document is here; it discusses how a gearpump grammar can be developed, with examples. It is not yet complete, but it will be the basis for the design when complete.

Allow adaptive cluster control

It would be better if the number of executors (the level of parallelism) were dynamically adjustable. That is, the application/framework could automatically adjust its parallelism based on input characteristics.

Support pluggable application scheduling

Task scheduling should be as flexible as possible.

Some users may want to schedule based on resource usage, such as memory, CPU, or disk.
Other use cases may require scheduling only one task per machine;
others may want to schedule tasks to the same nodes they ran on before the topology was restarted;
still others may want all tasks on the same machine, to maximize network bandwidth...

There are many different requirements; the best we can do is provide a general representation of tasks and resources, and pass them to a user-provided scheduler.
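That idea could be sketched as a small interface with one example strategy plugged in. The names below are illustrative, not the Gearpump API:

```scala
// General representations of tasks and resources; illustrative only.
case class TaskSpec(name: String, memoryMb: Int)
case class NodeSpec(host: String, freeMemoryMb: Int)

// The pluggable contract: a user-provided scheduler maps tasks to hosts.
trait TaskScheduler {
  def schedule(tasks: Seq[TaskSpec], nodes: Seq[NodeSpec]): Map[String, String]
}

// Example strategy: first-fit by free memory.
object FirstFitScheduler extends TaskScheduler {
  def schedule(tasks: Seq[TaskSpec], nodes: Seq[NodeSpec]): Map[String, String] = {
    // LinkedHashMap preserves node order so "first fit" is deterministic.
    val free = scala.collection.mutable.LinkedHashMap(
      nodes.map(n => n.host -> n.freeMemoryMb): _*)
    tasks.flatMap { t =>
      free.find { case (_, mem) => mem >= t.memoryMb }.map { case (host, mem) =>
        free(host) = mem - t.memoryMb
        t.name -> host
      }
    }.toMap
  }
}
```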

Investigate and support uploading custom jars and distribute it across the clusters

We need a safe and efficient way to distribute a jar to the cluster.

Please investigate how Spark and Storm distribute their jars.

Some requirements:

  1. The jar storage interface must be pluggable. Users can choose to store jars in HDFS, in Kafka, or to share them via the BitTorrent protocol.
  2. We don't want to introduce a heavy binary dependency on another storage system, such as HDFS.

As the default implementation, I would prefer BitTorrent; it is very flexible.
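A sketch of the pluggable interface described above (illustrative names; HDFS, Kafka, or BitTorrent backends would each implement it):

```scala
// Hypothetical pluggable jar-storage contract.
trait JarStore {
  def put(jarName: String, bytes: Array[Byte]): String // returns a locator URI
  def get(locator: String): Array[Byte]
}

// Trivial in-memory backend, useful for local mode and tests.
class InMemoryJarStore extends JarStore {
  private var store = Map.empty[String, Array[Byte]]
  def put(jarName: String, bytes: Array[Byte]): String = {
    val locator = s"mem://$jarName"
    store += (locator -> bytes)
    locator
  }
  def get(locator: String): Array[Byte] = store(locator)
}
```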

add multi-topic and grouping support to Kafka spout

Currently, all Kafka spouts consume from the same topic, configured by the user. Our system, however, may read from multiple topics; we need to support that.

Partitions are currently distributed evenly among spouts. For 2 spouts and 4 partitions, inputs from partition0 and partition2 go to spout0, and those from partition1 and partition3 go to spout1. When there are multiple topics, how should we group the partitions of those topics among spouts? We could add an interface and provide default grouping strategies.
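Such an interface might be sketched as follows, with round-robin as the default strategy reproducing the even distribution described above (illustrative names only, not the actual spout API):

```scala
// A (topic, partition) pair across possibly many topics.
case class TopicPartition(topic: String, partition: Int)

// Pluggable grouping contract: assign partitions to spout indices.
trait PartitionGrouper {
  def group(partitions: Seq[TopicPartition], numSpouts: Int): Map[Int, Seq[TopicPartition]]
}

// Default: round-robin, matching the even single-topic distribution.
object RoundRobinGrouper extends PartitionGrouper {
  def group(partitions: Seq[TopicPartition], numSpouts: Int): Map[Int, Seq[TopicPartition]] =
    partitions.zipWithIndex
      .groupBy { case (_, i) => i % numSpouts }
      .map { case (spout, ps) => spout -> ps.map(_._1) }
}
```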

Integrate Gear scheduler with YARN

The YARN ResourceManager and the Gearpump Master need to be integrated. It may be that the Gearpump Master provides a virtual resource allocation that is kept in sync with the actual resources allocated by the YARN RM. This would allow us to include attributes not yet in YARN, such as SLAs.

Requirements/Features for YARN Client and Appmaster (WIP)

  • Support hadoop secure mode
  • Use HDFS to package and distribute the binaries
  • Allow logging to occur within YARN working directory
  • Configuration file needs to indicate how many workers to allocate with possible locations
  • Configuration needs to be able to handle Master HA creation and deployment.
  • Configuration file needs to specify minimum master RAM and vcore requirements
  • Configuration file needs to specify minimum worker RAM and vcore requirements
  • Configuration file should specify where in HDFS to place jars for appmaster and workers
  • Client needs to use YARN cluster API to find best nodes to run Master(s)
  • AppMaster needs to use YARN cluster API to find best nodes to run Workers
  • Need to be able to save configuration state so RM killing and restarting AppMaster works.

Error recovery when the target worker doesn't have enough resources to launch an appmaster or executor

In AppManager, we first ask for a worker and then try to launch the executor on that worker. The worker may reject the launch due to a resource limit or other reasons. The AppManager needs to handle this situation and ask for a new worker on which to launch the appmaster.

Please check the following TODO in AppManager.scala

AppManager.scala

def waitForActorSystemToStart(worker: ActorRef, masterConfig: Configs): Receive = {
  case ExecutorLaunchRejected(reason, ex) =>
    LOG.error(s"Executor Launch failed reason: $reason", ex)
    // TODO: ask master to allocate new resources and start appmaster on new node.
    context.stop(self)
}

TaskActor loading should allow a TaskConfiguration step for those jars which cannot be transmitted inline as part of a TaskDescription

As part of #89, a TaskDescription allowed an inline jar to be transmitted as part of the AppDescription message to the worker. However, we also need to consider tasks whose TaskActors and related classes cannot be transmitted inline due to size restrictions. In this case, a pre-step of 'TaskConfiguration' needs to be completed by the worker, where the TaskActor jar is retrieved from stable storage somewhere. Spark, Samza, and Hadoop YARN all provide solutions in this area and should be examined for the best approach. This task can be considered part 2 of allowing a remote worker to dynamically load a TaskActor prior to running the task.

JSON serialization needs to handle AppMasterInfo

Currently this is just converted to a string, but it needs to be deserialized using annotations. Currently the information returned for /appmaster is:
{"appId":0,"executorId":-1,"appData":"AppMasterInfo(Actor[akka://master/user/Worker0#1386979283])"}

Dynamic DAG

After a client submits an AppDescription, the client should be able to modify the DAG by adding or removing nodes, which then results in the AppMaster updating the workers and executors with this information. Details such as what happens to currently running tasks and how new tasks are added should be covered in a design doc. Additional details to cover:

  1. Whether tasks can be removed, and the resulting behaviors.
  2. Whether the partitioner can be replaced or modified.
  3. Whether existing task parallelism hints can be updated.
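A hypothetical sketch of the modification protocol (illustrative names; the design doc would define the real messages): each change bumps the DAG version, which the AppMaster would then push to executors.

```scala
// Versioned DAG: processor names plus directed edges; illustrative only.
case class Dag(version: Int, processors: Set[String], edges: Set[(String, String)])

sealed trait DagChange
case class AddProcessor(name: String) extends DagChange
case class RemoveProcessor(name: String) extends DagChange
case class AddEdge(from: String, to: String) extends DagChange

// Applying a change yields a new DAG version.
def applyChange(dag: Dag, c: DagChange): Dag = c match {
  case AddProcessor(n) =>
    dag.copy(version = dag.version + 1, processors = dag.processors + n)
  case RemoveProcessor(n) =>
    dag.copy(version = dag.version + 1,
             processors = dag.processors - n,
             edges = dag.edges.filterNot { case (a, b) => a == n || b == n })
  case AddEdge(a, b) =>
    dag.copy(version = dag.version + 1, edges = dag.edges + ((a, b)))
}
```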

Task list:

  • pre-calculate the dependencies information in AppMaster, and push the subscription info to tasks.
  • ClockService allow to add or remove a processor.
  • Add dynamic subscribe/unsubscribe interface to TaskActor
  • Handle message loss
  • UI support dynamic dag update
  • Allow DAG modification by config file.
  • multiple version DAG
  • akka user interface for dag update
  • rest api for dag update.
  • multiple version tasks
  • task manager state transition during dynamic dag
  • unique representation of dag
  • refactor task scheduler to remove dependencies on DAG.
  • refactor task scheduler to handle dynamic DAG change.
  • Add DAGManager to manage the DAG change
  • Allow dynamically change resource used by an executor.

Add storage layer

We should be able to read history from storage to perform joins.
We should be able to checkpoint state to storage.
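The storage contract implied here could be sketched as follows (illustrative names; real backends such as HDFS would implement it):

```scala
// Hypothetical storage-layer contract: append checkpoints keyed by
// timestamp, and read history back for joins.
trait CheckpointStore {
  def persist(timestamp: Long, state: Array[Byte]): Unit
  def readHistory(fromTs: Long, toTs: Long): Seq[(Long, Array[Byte])]
}

// In-memory backend keeping checkpoints sorted by timestamp.
class InMemoryCheckpointStore extends CheckpointStore {
  private val entries = scala.collection.mutable.TreeMap.empty[Long, Array[Byte]]
  def persist(timestamp: Long, state: Array[Byte]): Unit =
    entries += (timestamp -> state)
  // Half-open range: includes fromTs, excludes toTs.
  def readHistory(fromTs: Long, toTs: Long): Seq[(Long, Array[Byte])] =
    entries.range(fromTs, toTs).toSeq
}
```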

Build error, unable to resolve akka-data-replication

I'm seeing the error below when running 'sbt clean pack'. I tried moving 'patrik' to the top of the resolvers in Build.scala, to no avail. This happens with all versions of akka-data-replication.

[info] Resolving com.github.patriknw#akka-data-replication_2.10;0.7 ...
[warn]  module not found: com.github.patriknw#akka-data-replication_2.10;0.7
[warn] ==== local: tried
[warn]   /Users/smarthi/.ivy2/local/com.github.patriknw/akka-data-replication_2.10/0.7/ivys/ivy.xml
[warn] ==== public: tried
[warn]   http://repo1.maven.org/maven2/com/github/patriknw/akka-data-replication_2.10/0.7/akka-data-replication_2.10-0.7.pom
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[warn]  ::::::::::::::::::::::::::::::::::::::::::::::
[warn]  ::          UNRESOLVED DEPENDENCIES         ::
[warn]  ::::::::::::::::::::::::::::::::::::::::::::::
[warn]  :: com.github.patriknw#akka-data-replication_2.10;0.7: not found
[warn]  ::::::::::::::::::::::::::::::::::::::::::::::
sbt.ResolveException: unresolved dependency: com.github.patriknw#akka-data-replication_2.10;0.7: not found
    at sbt.IvyActions$.sbt$IvyActions$$resolve(IvyActions.scala:217)
    at sbt.IvyActions$$anonfun$update$1.apply(IvyActions.scala:126)
    at sbt.IvyActions$$anonfun$update$1.apply(IvyActions.scala:125)
    at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:115)
    at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:115)
    at sbt.IvySbt$$anonfun$withIvy$1.apply(Ivy.scala:103)
    at sbt.IvySbt.sbt$IvySbt$$action$1(Ivy.scala:48)
    at sbt.IvySbt$$anon$3.call(Ivy.scala:57)
    at xsbt.boot.Locks$GlobalLock.withChannel$1(Locks.scala:98)
    at xsbt.boot.Locks$GlobalLock.xsbt$boot$Locks$GlobalLock$$withChannelRetries$1(Locks.scala:81)
    at xsbt.boot.Locks$GlobalLock$$anonfun$withFileLock$1.apply(Locks.scala:102)
    at xsbt.boot.Using$.withResource(Using.scala:11)
    at xsbt.boot.Using$.apply(Using.scala:10)
    at xsbt.boot.Locks$GlobalLock.ignoringDeadlockAvoided(Locks.scala:62)
    at xsbt.boot.Locks$GlobalLock.withLock(Locks.scala:52)
    at xsbt.boot.Locks$.apply0(Locks.scala:31)
    at xsbt.boot.Locks$.apply(Locks.scala:28)
    at sbt.IvySbt.withDefaultLogger(Ivy.scala:57)
    at sbt.IvySbt.withIvy(Ivy.scala:98)
    at sbt.IvySbt.withIvy(Ivy.scala:94)
    at sbt.IvySbt$Module.withModule(Ivy.scala:115)
    at sbt.IvyActions$.update(IvyActions.scala:125)
    at sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala:1223)
    at sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala:1221)
    at sbt.Classpaths$$anonfun$doWork$1$1$$anonfun$74.apply(Defaults.scala:1244)
    at sbt.Classpaths$$anonfun$doWork$1$1$$anonfun$74.apply(Defaults.scala:1242)
    at sbt.Tracked$$anonfun$lastOutput$1.apply(Tracked.scala:35)
    at sbt.Classpaths$$anonfun$doWork$1$1.apply(Defaults.scala:1246)
    at sbt.Classpaths$$anonfun$doWork$1$1.apply(Defaults.scala:1241)
    at sbt.Tracked$$anonfun$inputChanged$1.apply(Tracked.scala:45)
    at sbt.Classpaths$.cachedUpdate(Defaults.scala:1249)
    at sbt.Classpaths$$anonfun$updateTask$1.apply(Defaults.scala:1214)
    at sbt.Classpaths$$anonfun$updateTask$1.apply(Defaults.scala:1192)
    at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
    at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
    at sbt.std.Transform$$anon$4.work(System.scala:64)
    at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
    at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
    at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
    at sbt.Execute.work(Execute.scala:244)
    at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
    at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
    at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
    at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
[error] (gearpump-core/*:update) sbt.ResolveException: unresolved dependency: com.github.patriknw#akka-data-replication_2.10;0.7: not found

TaskActors need to be dynamically loaded via transmitted jar file prior or as part of submitting AppDescription

Currently TaskDescription assumes actors have already been loaded into the receiving executor. In order to support remote DAG deployment, we need to transmit a JAR prior to sending a TaskDescription and ensure the class loader performs the actor creation when the TaskDescription is received. This will require new message types and a protocol between Master, Client, and AppMaster so the JAR is routed correctly as a byte buffer and then loaded. A design detailing how this works should accompany this issue.

exception when running local.

  1. target/pack/bin/local -port 8092 -workernum 4
  2. target/pack/bin/wordcount -ip 192.168.1.14 -port 8092 -split 4 -sum 4 -runseconds 20

The exception is:
[ERROR] [09/03/2014 14:20:06.120] [Worker] Executor exit with errors
java.lang.InstantiationException: org.apache.gearpump.util.ActorSystemBooter
    at java.lang.Class.newInstance(Class.java:359)
    at org.apache.gearpump.cluster.Worker$ExecutorWatcher$$anon$1$$anonfun$exitValue$1.apply(Worker.scala:127)
    at org.apache.gearpump.cluster.Worker$ExecutorWatcher$$anon$1$$anonfun$exitValue$1.apply(Worker.scala:125)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[INFO] [09/03/2014 14:20:06.125] [Worker1] [1] An executor dies akka://Worker3/remote/akka.tcp/[email protected]:8092/user/master/Worker1/app0-executor-1....
[INFO] [09/03/2014 14:20:06.125] [Worker1] [1] Reclaiming resource 1....

Latest merge (commit acfc9a23c99d5c8d643c868c6fc7e382943253b9) fails to run either so-l or word-count

Output from so-l is below

[INFO] [10/07/2014 10:25:26.923] [Client] netty client connect to netty-client-HostPort(192.168.1.14,64252), tries: 0, hostPort: HostPort(192.168.1.14,64252)
[ERROR] [10/07/2014 10:25:26.943] [app0-executor2-akka.actor.default-dispatcher-4] [akka://app0-executor2/user/netty-server] Encountered unregistered class ID: 2116730745
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 2116730745
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at com.romix.akka.serialization.kryo.KryoBasedSerializer.fromBinary(KryoSerializer.scala:394)
at akka.serialization.Serializer$class.fromBinary(Serializer.scala:60)
at com.romix.akka.serialization.kryo.KryoBasedSerializer.fromBinary(KryoSerializer.scala:368)
at org.apache.gearpump.serializer.FastKryoSerializer.deserialize(FastKryoSerializer.scala:33)
at org.apache.gearpump.transport.netty.Server$$anonfun$msgHandler$1$$anonfun$applyOrElse$3$$anonfun$apply$1.apply(Server.scala:66)
at org.apache.gearpump.transport.netty.Server$$anonfun$msgHandler$1$$anonfun$applyOrElse$3$$anonfun$apply$1.apply(Server.scala:65)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.gearpump.transport.netty.Server$$anonfun$msgHandler$1$$anonfun$applyOrElse$3.apply(Server.scala:65)
at org.apache.gearpump.transport.netty.Server$$anonfun$msgHandler$1$$anonfun$applyOrElse$3.apply(Server.scala:59)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.gearpump.transport.netty.Server$$anonfun$msgHandler$1.applyOrElse(Server.scala:59)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.gearpump.transport.netty.Server.aroundReceive(Server.scala:33)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Add REST layer

The REST layer should initially expose aspects of workers and executors that can also be modified (e.g. via GET, POST, PUT). This will be the basis for determining how to automate the REST/spray layer for other applications. This initial REST layer is largely intended to enable an admin API.
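To make the shape of the admin API concrete, here is a minimal sketch using only the JDK's built-in HTTP server (the real implementation would presumably use spray). The /workers route and its hard-coded payload are illustrative assumptions:

```java
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

// Sketch of an admin REST endpoint: GET /workers returns a JSON list of
// worker ids; other methods get 405. The worker list is hard-coded here
// purely for illustration.
public class AdminRestSketch {

    public static HttpServer start(int port) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/workers", AdminRestSketch::handleWorkers);
        server.start();
        return server;
    }

    private static void handleWorkers(HttpExchange exchange) throws IOException {
        if (!"GET".equals(exchange.getRequestMethod())) {
            exchange.sendResponseHeaders(405, -1); // only GET is sketched here
            return;
        }
        byte[] body = "{\"workers\":[0,1,2,3]}".getBytes(StandardCharsets.UTF_8);
        exchange.getResponseHeaders().set("Content-Type", "application/json");
        exchange.sendResponseHeaders(200, body.length);
        try (OutputStream os = exchange.getResponseBody()) {
            os.write(body);
        }
    }

    public static void main(String[] args) throws IOException {
        HttpServer server = start(0); // 0 = pick a free ephemeral port
        System.out.println("Admin API on port " + server.getAddress().getPort());
        server.stop(0);
    }
}
```

PUT/POST handlers for mutating worker or executor state would follow the same pattern, dispatching on the request method inside the handler.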

Support a MillWheel-like user API

MillWheel supports the following user API:

class Computation {
   void ProcessRecord(Record data);
   void ProcessTimer(Timer timer);

   void setTimer(String tag, int64 time);
   void ProduceRecord(Record data, String stream);

   StateType MutablePersistentState();
}

I think it is worthwhile to implement a similar interface for users; it would simplify usage.
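A Java sketch of what such an interface could look like in Gearpump. All of these types (Record, Computation, the helper methods) are illustrative assumptions showing the shape of the contract, not existing Gearpump classes:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of a MillWheel-style computation API adapted for
// Gearpump. The framework would call processRecord/processTimer; user code
// emits output and registers timers via the protected helpers.
public class ComputationSketch {

    static final class Record {
        final String key;
        final String value;
        Record(String key, String value) { this.key = key; this.value = value; }
    }

    static abstract class Computation {
        final List<Record> produced = new ArrayList<>(); // captured for the sketch
        final List<Long> timers = new ArrayList<>();

        abstract void processRecord(Record data);
        void processTimer(long time) {}

        protected void setTimer(String tag, long time) { timers.add(time); }
        protected void produceRecord(Record data, String stream) { produced.add(data); }
    }

    // Example user code: forward each record with an upper-cased value.
    static final class UpperCase extends Computation {
        @Override
        void processRecord(Record data) {
            produceRecord(new Record(data.key, data.value.toUpperCase()), "out");
        }
    }

    public static void main(String[] args) {
        UpperCase c = new UpperCase();
        c.processRecord(new Record("k", "hello"));
        System.out.println(c.produced.get(0).value); // prints HELLO
    }
}
```

The open design questions are the same ones MillWheel answers with its runtime: how timers are persisted and how MutablePersistentState maps onto Gearpump's state handling.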

Examples need to be refactored to use a general command 'gear' that can run any of the examples and include the example jar

Currently we have four examples, each with its own command, and all of their classes are loaded into the classpath of every Gearpump process. In addition, local, master and worker all run with examples/target/gearpump-examples-0.1.jar on their classpath. This was due to the worker not dynamically loading a TaskActor's jar, which required a global classpath. We should now refactor the examples so they can be run like:
gear app -jar example.jar example.main
so, for example, wordcount would be run as
gear app -jar examples/target/gearpump-examples-0.1.jar org.apache.gearpump.streaming.examples.wordcount.WordCount

If possible, each example could have its own jar; that way we could add the main class to the jar manifest and would only need to run something like
gear app -jar examples/target/wordcount-0.1.jar
This is similar to how one can do java -jar somejar.jar: java looks for the main class in the jar's MANIFEST.MF.
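The manifest mechanism described above can be demonstrated with the JDK's standard jar APIs: write a Main-Class attribute into a jar and read it back, the same way a launcher such as the proposed gear command could resolve the entry point. The example class name below is just for illustration:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.jar.Attributes;
import java.util.jar.JarInputStream;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

// Builds a manifest-only jar in memory with a Main-Class attribute, then
// reads the attribute back the way a launcher could resolve the entry point.
public class ManifestMainClass {

    static byte[] buildJar(String mainClass) throws IOException {
        Manifest manifest = new Manifest();
        Attributes attrs = manifest.getMainAttributes();
        attrs.put(Attributes.Name.MANIFEST_VERSION, "1.0"); // required or nothing is written
        attrs.put(Attributes.Name.MAIN_CLASS, mainClass);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        new JarOutputStream(bytes, manifest).close(); // jar containing only the manifest
        return bytes.toByteArray();
    }

    static String readMainClass(byte[] jar) throws IOException {
        try (JarInputStream in = new JarInputStream(new ByteArrayInputStream(jar))) {
            return in.getManifest().getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] jar = buildJar("org.apache.gearpump.streaming.examples.wordcount.WordCount");
        System.out.println(readMainClass(jar));
    }
}
```

At build time the same effect is achieved by setting the Main-Class attribute in each example jar's manifest, so gear app only needs the -jar argument.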
