
yahoo / tensorflowonspark


TensorFlowOnSpark brings TensorFlow programs to Apache Spark clusters.

License: Apache License 2.0

Python 80.52% Shell 1.70% Scala 17.78%
tensorflow spark yahoo machine-learning cluster featured python scala

tensorflowonspark's Introduction

TensorFlowOnSpark

TensorFlowOnSpark brings scalable deep learning to Apache Hadoop and Apache Spark clusters.


By combining salient features from the TensorFlow deep learning framework with Apache Spark and Apache Hadoop, TensorFlowOnSpark enables distributed deep learning on a cluster of GPU and CPU servers.

It enables both distributed TensorFlow training and inferencing on Spark clusters, with the goal of minimizing the code changes required to run existing TensorFlow programs on a shared grid. Its Spark-compatible API helps manage the TensorFlow cluster through the following steps (a minimal driver-side sketch follows the list below):

  1. Startup - launches the TensorFlow main function on the executors, along with listeners for data/control messages.
  2. Data ingestion
    • InputMode.TENSORFLOW - leverages TensorFlow's built-in APIs to read data files directly from HDFS.
    • InputMode.SPARK - sends Spark RDD data to the TensorFlow nodes via the TFNode.DataFeed class. Note that we leverage the Hadoop Input/Output Format to access TFRecords on HDFS.
  3. Shutdown - shuts down the TensorFlow workers and PS nodes on the executors.
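
As an illustration of these three steps, here is a minimal, hypothetical driver-side sketch. The training function, its arguments, and the input RDD are placeholders, and exact keyword arguments may differ slightly between releases; see the examples in this repository for authoritative usage.

from pyspark import SparkConf, SparkContext
from tensorflowonspark import TFCluster

def main_fun(args, ctx):
    # Placeholder for your TensorFlow code; ctx carries the cluster spec,
    # job name, and task index assigned by TensorFlowOnSpark.
    pass

sc = SparkContext(conf=SparkConf().setAppName("tfos_sketch"))
data_rdd = sc.parallelize([])          # placeholder input RDD
num_executors, num_ps = 4, 1

# 1. Startup: launch main_fun on every executor.
cluster = TFCluster.run(sc, main_fun, {}, num_executors, num_ps,
                        tensorboard=False, input_mode=TFCluster.InputMode.SPARK)

# 2. Data ingestion: push the Spark RDD to the TensorFlow nodes (InputMode.SPARK).
cluster.train(data_rdd, num_epochs=1)

# 3. Shutdown: stop the TensorFlow workers and PS nodes.
cluster.shutdown()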


Background

TensorFlowOnSpark was developed by Yahoo for large-scale distributed deep learning on our Hadoop clusters in Yahoo's private cloud.

TensorFlowOnSpark provides some important benefits (see our blog) over alternative deep learning solutions.

  • Easily migrate existing TensorFlow programs with <10 lines of code change.
  • Support all TensorFlow functionalities: synchronous/asynchronous training, model/data parallelism, inferencing and TensorBoard.
  • Server-to-server direct communication achieves faster learning when available.
  • Allow datasets on HDFS and other sources to be either pushed by Spark or pulled by TensorFlow.
  • Easily integrate with your existing Spark data processing pipelines.
  • Easily deployed on cloud or on-premise and on CPUs or GPUs.

Install

TensorFlowOnSpark is provided as a pip package, which can be installed on single machines via:

# for tensorflow>=2.0.0
pip install tensorflowonspark

# for tensorflow<2.0.0
pip install tensorflowonspark==1.4.4

For distributed clusters, please see our wiki site for detailed documentation for specific environments, such as our getting started guides for single-node Spark Standalone, YARN clusters and AWS EC2. Note: the Windows operating system is not currently supported due to this issue.

Usage

To use TensorFlowOnSpark with an existing TensorFlow application, follow our Conversion Guide, which describes the required changes. Additionally, our wiki site has pointers to presentations that provide an overview of the platform.

Note: since TensorFlow 2.x breaks API compatibility with TensorFlow 1.x, the examples have been updated accordingly. If you are using TensorFlow 1.x, you will need to check out the v1.4.4 tag for compatible examples and instructions.
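
For reference, below is a hedged sketch of what a converted training function can look like with TensorFlow 2.x. The helpers build_model and load_dataset are hypothetical stand-ins for your existing code; the Conversion Guide and the bundled examples remain the authoritative reference.

def main_fun(args, ctx):
    # TensorFlowOnSpark invokes this function on each executor; ctx exposes
    # the node's role (ctx.job_name, ctx.task_index) and cluster information.
    import tensorflow as tf

    strategy = tf.distribute.MultiWorkerMirroredStrategy()
    with strategy.scope():
        model = build_model()              # hypothetical: your existing model code
    model.fit(load_dataset(args), epochs=args.epochs)

    # Typically only one node (e.g. task 0) exports the final model.
    if ctx.task_index == 0:
        model.save(args.model_dir)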

API

API Documentation is automatically generated from the code.
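
For example, here is a small, hypothetical sketch of the TFNode.DataFeed API used inside a training function with InputMode.SPARK; the batch size and the way each record is consumed are placeholders.

from tensorflowonspark import TFNode

def main_fun(args, ctx):
    # Pull batches of records that Spark pushes into this executor's queue.
    tf_feed = TFNode.DataFeed(ctx.mgr, train_mode=True)
    while not tf_feed.should_stop():
        batch = tf_feed.next_batch(64)     # list of records from the input RDD
        if len(batch) == 0:
            break
        # ... run one training step on `batch` with your TensorFlow model ...
    tf_feed.terminate()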

Contribute

Please join the TensorFlowOnSpark user group for discussions and questions. If you have a question, please review our FAQ before posting.

Contributions are always welcome. For more information, please see our guide for getting involved.

License

The use and distribution terms for this software are covered by the Apache 2.0 license. See LICENSE file for terms.

tensorflowonspark's People

Contributors

anfeng, anttisaukko, bbshetty, dependabot[bot], dratini6, elyast, eordentlich, junshi15, leewyang, qsbao, ssheikholeslami, tmielika, tobiajo, varunkmohan, viirya, viplav, wangyum, winston-zillow, yileic, zhe-thoughts

tensorflowonspark's Issues

ssl problem when installing pip

I am installing TensorFlowOnSpark on a Google Dataproc cluster which already has Python and OpenSSL installed.
I followed the guidance for YARN clusters and ran into the following error when running get-pip.py:

pip is configured with locations that require TLS/SSL, however the ssl module in Python is not available.
Collecting pip
  Could not fetch URL https://pypi.python.org/simple/pip/: There was a problem confirming the ssl certificate: Can't connect to HTTPS URL because the SSL module is not available. - skipping
  Could not find a version that satisfies the requirement pip (from versions: )
No matching distribution found for pip

CIFAR job with 4 Spark Executors and above crashes

I am able to run the CIFAR test successfully, with and without RDMA, from a single node containing 2 GPUs up to 2 nodes containing a total of 4 GPUs. However, when I scale it out further, I get an unknown error pertaining to HDFS. Kindly check the log below and see if it makes sense:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/filecache/12/__spark_libs__6493683702299857357.zip/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/javed.19/git-pull/finished/HiBench/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
17/05/18 03:13:51 INFO util.SignalUtils: Registered signal handler for TERM
17/05/18 03:13:51 INFO util.SignalUtils: Registered signal handler for HUP
17/05/18 03:13:51 INFO util.SignalUtils: Registered signal handler for INT
17/05/18 03:13:52 INFO yarn.ApplicationMaster: Preparing Local resources
17/05/18 03:13:52 INFO yarn.ApplicationMaster: Prepared Local resources Map(pyspark.zip -> resource { scheme: "hdfs" host: "gpu09" port: 9000 file: "/user/javed.19/.sparkStaging/application_1495091612438_0001/pyspark.zip" } size: 440385 timestamp: 1495091625707 type: FILE visibility: PRIVATE, __spark_libs__ -> resource { scheme: "hdfs" host: "gpu09" port: 9000 file: "/user/javed.19/.sparkStaging/application_1495091612438_0001/__spark_libs__6493683702299857357.zip" } size: 192670368 timestamp: 1495091625513 type: ARCHIVE visibility: PRIVATE, __spark_conf__ -> resource { scheme: "hdfs" host: "gpu09" port: 9000 file: "/user/javed.19/.sparkStaging/application_1495091612438_0001/__spark_conf__.zip" } size: 85884 timestamp: 1495091626034 type: ARCHIVE visibility: PRIVATE, py4j-0.10.3-src.zip -> resource { scheme: "hdfs" host: "gpu09" port: 9000 file: "/user/javed.19/.sparkStaging/application_1495091612438_0001/py4j-0.10.3-src.zip" } size: 91275 timestamp: 1495091625776 type: FILE visibility: PRIVATE, tfspark.zip -> resource { scheme: "hdfs" host: "gpu09" port: 9000 file: "/user/javed.19/.sparkStaging/application_1495091612438_0001/tfspark.zip" } size: 19385 timestamp: 1495091625849 type: FILE visibility: PRIVATE, cifar10.zip -> resource { scheme: "hdfs" host: "gpu09" port: 9000 file: "/user/javed.19/.sparkStaging/application_1495091612438_0001/cifar10.zip" } size: 21343 timestamp: 1495091625938 type: FILE visibility: PRIVATE, Python -> resource { scheme: "hdfs" host: "gpu09" port: 9000 file: "/Python.zip" } size: 148009343 timestamp: 1495091620084 type: ARCHIVE visibility: PUBLIC)
17/05/18 03:13:52 INFO yarn.ApplicationMaster: ApplicationAttemptId: appattempt_1495091612438_0001_000001
17/05/18 03:13:52 INFO spark.SecurityManager: Changing view acls to: javed.19
17/05/18 03:13:52 INFO spark.SecurityManager: Changing modify acls to: javed.19
17/05/18 03:13:52 INFO spark.SecurityManager: Changing view acls groups to: 
17/05/18 03:13:52 INFO spark.SecurityManager: Changing modify acls groups to: 
17/05/18 03:13:52 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(javed.19); groups with view permissions: Set(); users  with modify permissions: Set(javed.19); groups with modify permissions: Set()
17/05/18 03:13:52 INFO yarn.ApplicationMaster: Starting the user application in a separate Thread
17/05/18 03:13:52 INFO yarn.ApplicationMaster: Waiting for spark context initialization
17/05/18 03:13:52 INFO yarn.ApplicationMaster: Waiting for spark context initialization ... 
17/05/18 03:13:53 INFO spark.SparkContext: Running Spark version 2.0.2
17/05/18 03:13:53 INFO spark.SecurityManager: Changing view acls to: javed.19
17/05/18 03:13:53 INFO spark.SecurityManager: Changing modify acls to: javed.19
17/05/18 03:13:53 INFO spark.SecurityManager: Changing view acls groups to: 
17/05/18 03:13:53 INFO spark.SecurityManager: Changing modify acls groups to: 
17/05/18 03:13:53 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(javed.19); groups with view permissions: Set(); users  with modify permissions: Set(javed.19); groups with modify permissions: Set()
17/05/18 03:13:53 INFO util.Utils: Successfully started service 'sparkDriver' on port 36823.
17/05/18 03:13:53 INFO spark.SparkEnv: Registering MapOutputTracker
17/05/18 03:13:53 INFO spark.SparkEnv: Registering BlockManagerMaster
17/05/18 03:13:53 INFO storage.DiskBlockManager: Created local directory at /tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/blockmgr-d5c580a9-2b04-4b3c-b4df-4a06e7372741
17/05/18 03:13:53 INFO memory.MemoryStore: MemoryStore started with capacity 366.3 MB
17/05/18 03:13:53 INFO spark.SparkEnv: Registering OutputCommitCoordinator
17/05/18 03:13:53 INFO util.log: Logging initialized @2194ms
17/05/18 03:13:53 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
17/05/18 03:13:53 INFO server.Server: jetty-9.2.z-SNAPSHOT
17/05/18 03:13:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6321bc55{/jobs,null,AVAILABLE}
17/05/18 03:13:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3d283b81{/jobs/json,null,AVAILABLE}
17/05/18 03:13:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@122dc555{/jobs/job,null,AVAILABLE}
17/05/18 03:13:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@701070ff{/jobs/job/json,null,AVAILABLE}
17/05/18 03:13:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1e3db9bc{/stages,null,AVAILABLE}
17/05/18 03:13:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@62357dc9{/stages/json,null,AVAILABLE}
17/05/18 03:13:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1ece0bb7{/stages/stage,null,AVAILABLE}
17/05/18 03:13:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6d2a1719{/stages/stage/json,null,AVAILABLE}
17/05/18 03:13:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1e29b359{/stages/pool,null,AVAILABLE}
17/05/18 03:13:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@57bb4e60{/stages/pool/json,null,AVAILABLE}
17/05/18 03:13:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@39467493{/storage,null,AVAILABLE}
17/05/18 03:13:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4e92e7d{/storage/json,null,AVAILABLE}
17/05/18 03:13:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5789f6c2{/storage/rdd,null,AVAILABLE}
17/05/18 03:13:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5a4c7a1d{/storage/rdd/json,null,AVAILABLE}
17/05/18 03:13:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@50247f2b{/environment,null,AVAILABLE}
17/05/18 03:13:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1d733994{/environment/json,null,AVAILABLE}
17/05/18 03:13:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@554e31e{/executors,null,AVAILABLE}
17/05/18 03:13:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3cf1ed3b{/executors/json,null,AVAILABLE}
17/05/18 03:13:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4fdf10a9{/executors/threadDump,null,AVAILABLE}
17/05/18 03:13:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4be42f5f{/executors/threadDump/json,null,AVAILABLE}
17/05/18 03:13:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@690a792e{/static,null,AVAILABLE}
17/05/18 03:13:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5a21c901{/,null,AVAILABLE}
17/05/18 03:13:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6d4cc0b4{/api,null,AVAILABLE}
17/05/18 03:13:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7b88495{/stages/stage/kill,null,AVAILABLE}
17/05/18 03:13:53 INFO server.ServerConnector: Started ServerConnector@7995f9ac{HTTP/1.1}{0.0.0.0:43181}
17/05/18 03:13:53 INFO server.Server: Started @2290ms
17/05/18 03:13:53 INFO util.Utils: Successfully started service 'SparkUI' on port 43181.
17/05/18 03:13:53 INFO ui.SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.3.1.12:43181
17/05/18 03:13:53 INFO cluster.YarnClusterScheduler: Created YarnClusterScheduler
17/05/18 03:13:53 INFO cluster.SchedulerExtensionServices: Starting Yarn extension services with app application_1495091612438_0001 and attemptId Some(appattempt_1495091612438_0001_000001)
17/05/18 03:13:53 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 49987.
17/05/18 03:13:53 INFO netty.NettyBlockTransferService: Server created on 10.3.1.12:49987
17/05/18 03:13:53 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.3.1.12, 49987)
17/05/18 03:13:53 INFO storage.BlockManagerMasterEndpoint: Registering block manager 10.3.1.12:49987 with 366.3 MB RAM, BlockManagerId(driver, 10.3.1.12, 49987)
17/05/18 03:13:53 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.3.1.12, 49987)
17/05/18 03:13:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@43822b7b{/metrics/json,null,AVAILABLE}
17/05/18 03:13:53 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(spark://[email protected]:36823)
17/05/18 03:13:53 INFO client.RMProxy: Connecting to ResourceManager at gpu09/10.3.1.9:8030
17/05/18 03:13:53 INFO yarn.YarnRMClient: Registering the ApplicationMaster
17/05/18 03:13:53 INFO yarn.YarnAllocator: Will request 4 executor containers, each with 1 cores and 12390 MB memory including 1126 MB overhead
17/05/18 03:13:53 INFO yarn.YarnAllocator: Canceled 0 container requests (locality no longer needed)
17/05/18 03:13:53 INFO yarn.YarnAllocator: Submitted container request (host: Any, capability: <memory:12390, vCores:1>)
17/05/18 03:13:53 INFO yarn.YarnAllocator: Submitted container request (host: Any, capability: <memory:12390, vCores:1>)
17/05/18 03:13:53 INFO yarn.YarnAllocator: Submitted container request (host: Any, capability: <memory:12390, vCores:1>)
17/05/18 03:13:53 INFO yarn.YarnAllocator: Submitted container request (host: Any, capability: <memory:12390, vCores:1>)
17/05/18 03:13:54 INFO yarn.ApplicationMaster: Started progress reporter thread with (heartbeat : 3000, initial allocation : 200) intervals
17/05/18 03:13:55 INFO impl.AMRMClientImpl: Received new token for : gpu10-ib.cluster:54754
17/05/18 03:13:55 INFO impl.AMRMClientImpl: Received new token for : gpu11-ib.cluster:38611
17/05/18 03:13:55 INFO impl.AMRMClientImpl: Received new token for : gpu13-ib.cluster:44034
17/05/18 03:13:55 INFO impl.AMRMClientImpl: Received new token for : gpu12-ib.cluster:40470
17/05/18 03:13:55 INFO yarn.YarnAllocator: Launching container container_1495091612438_0001_01_000002 for on host gpu10-ib.cluster
17/05/18 03:13:55 INFO yarn.YarnAllocator: Launching ExecutorRunnable. driverUrl: spark://[email protected]:36823,  executorHostname: gpu10-ib.cluster
17/05/18 03:13:55 INFO yarn.YarnAllocator: Launching container container_1495091612438_0001_01_000003 for on host gpu11-ib.cluster
17/05/18 03:13:55 INFO yarn.YarnAllocator: Launching ExecutorRunnable. driverUrl: spark://[email protected]:36823,  executorHostname: gpu11-ib.cluster
17/05/18 03:13:55 INFO yarn.YarnAllocator: Launching container container_1495091612438_0001_01_000004 for on host gpu13-ib.cluster
17/05/18 03:13:55 INFO yarn.YarnAllocator: Launching ExecutorRunnable. driverUrl: spark://[email protected]:36823,  executorHostname: gpu13-ib.cluster
17/05/18 03:13:55 INFO yarn.YarnAllocator: Launching container container_1495091612438_0001_01_000005 for on host gpu12-ib.cluster
17/05/18 03:13:55 INFO yarn.YarnAllocator: Launching ExecutorRunnable. driverUrl: spark://[email protected]:36823,  executorHostname: gpu12-ib.cluster
17/05/18 03:13:55 INFO yarn.YarnAllocator: Received 4 containers from YARN, launching executors on 4 of them.
17/05/18 03:13:55 INFO yarn.ExecutorRunnable: Starting Executor Container
17/05/18 03:13:55 INFO yarn.ExecutorRunnable: Starting Executor Container
17/05/18 03:13:55 INFO yarn.ExecutorRunnable: Starting Executor Container
17/05/18 03:13:55 INFO yarn.ExecutorRunnable: Starting Executor Container
17/05/18 03:13:55 INFO impl.ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
17/05/18 03:13:55 INFO impl.ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
17/05/18 03:13:55 INFO impl.ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
17/05/18 03:13:55 INFO impl.ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
17/05/18 03:13:55 INFO yarn.ExecutorRunnable: Setting up ContainerLaunchContext
17/05/18 03:13:55 INFO yarn.ExecutorRunnable: Setting up ContainerLaunchContext
17/05/18 03:13:55 INFO yarn.ExecutorRunnable: Setting up ContainerLaunchContext
17/05/18 03:13:55 INFO yarn.ExecutorRunnable: Setting up ContainerLaunchContext
17/05/18 03:13:55 INFO yarn.ExecutorRunnable: 
===============================================================================
YARN executor launch context:
  env:
    SPARK_YARN_USER_ENV -> PYSPARK_PYTHON=Python/bin/python
    CLASSPATH -> {{PWD}}<CPS>{{PWD}}/__spark_conf__<CPS>{{PWD}}/__spark_libs__/*<CPS>$HADOOP_CONF_DIR<CPS>$HADOOP_COMMON_HOME/share/hadoop/common/*<CPS>$HADOOP_COMMON_HOME/share/hadoop/common/lib/*<CPS>$HADOOP_HDFS_HOME/share/hadoop/hdfs/*<CPS>$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*<CPS>$HADOOP_YARN_HOME/share/hadoop/yarn/*<CPS>$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*
    SPARK_LOG_URL_STDERR -> http://gpu11-ib.cluster:8042/node/containerlogs/container_1495091612438_0001_01_000003/javed.19/stderr?start=-4096
    SPARK_YARN_STAGING_DIR -> hdfs://gpu09:9000/user/javed.19/.sparkStaging/application_1495091612438_0001
    SPARK_USER -> javed.19
    SPARK_YARN_MODE -> true
    PYTHONPATH -> /home/javed.19/bin/pydoop-install/lib/python<CPS>{{PWD}}/pyspark.zip<CPS>{{PWD}}/py4j-0.10.3-src.zip<CPS>{{PWD}}/tfspark.zip<CPS>{{PWD}}/cifar10.zip
    LD_LIBRARY_PATH -> /usr/local/cuda/lib64:/usr/lib/jvm/java-1.8.0/jre/lib/amd64/server
    SPARK_LOG_URL_STDOUT -> http://gpu11-ib.cluster:8042/node/containerlogs/container_1495091612438_0001_01_000003/javed.19/stdout?start=-4096
    PYSPARK_PYTHON -> Python/bin/python

  command:
    {{JAVA_HOME}}/bin/java -server -Xmx11264m -Djava.io.tmpdir={{PWD}}/tmp '-Dspark.ui.port=0' '-Dspark.driver.port=36823' -Dspark.yarn.app.container.log.dir=<LOG_DIR> -XX:OnOutOfMemoryError='kill %p' org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://[email protected]:36823 --executor-id 2 --hostname gpu11-ib.cluster --cores 1 --app-id application_1495091612438_0001 --user-class-path file:$PWD/__app__.jar 1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr
===============================================================================
      
17/05/18 03:13:55 INFO yarn.ExecutorRunnable: 
===============================================================================
YARN executor launch context:
  env:
    SPARK_YARN_USER_ENV -> PYSPARK_PYTHON=Python/bin/python
    CLASSPATH -> {{PWD}}<CPS>{{PWD}}/__spark_conf__<CPS>{{PWD}}/__spark_libs__/*<CPS>$HADOOP_CONF_DIR<CPS>$HADOOP_COMMON_HOME/share/hadoop/common/*<CPS>$HADOOP_COMMON_HOME/share/hadoop/common/lib/*<CPS>$HADOOP_HDFS_HOME/share/hadoop/hdfs/*<CPS>$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*<CPS>$HADOOP_YARN_HOME/share/hadoop/yarn/*<CPS>$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*
    SPARK_LOG_URL_STDERR -> http://gpu10-ib.cluster:8042/node/containerlogs/container_1495091612438_0001_01_000002/javed.19/stderr?start=-4096
    SPARK_YARN_STAGING_DIR -> hdfs://gpu09:9000/user/javed.19/.sparkStaging/application_1495091612438_0001
    SPARK_USER -> javed.19
    SPARK_YARN_MODE -> true
    PYTHONPATH -> /home/javed.19/bin/pydoop-install/lib/python<CPS>{{PWD}}/pyspark.zip<CPS>{{PWD}}/py4j-0.10.3-src.zip<CPS>{{PWD}}/tfspark.zip<CPS>{{PWD}}/cifar10.zip
    LD_LIBRARY_PATH -> /usr/local/cuda/lib64:/usr/lib/jvm/java-1.8.0/jre/lib/amd64/server
    SPARK_LOG_URL_STDOUT -> http://gpu10-ib.cluster:8042/node/containerlogs/container_1495091612438_0001_01_000002/javed.19/stdout?start=-4096
    PYSPARK_PYTHON -> Python/bin/python

  command:
    {{JAVA_HOME}}/bin/java -server -Xmx11264m -Djava.io.tmpdir={{PWD}}/tmp '-Dspark.ui.port=0' '-Dspark.driver.port=36823' -Dspark.yarn.app.container.log.dir=<LOG_DIR> -XX:OnOutOfMemoryError='kill %p' org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://[email protected]:36823 --executor-id 1 --hostname gpu10-ib.cluster --cores 1 --app-id application_1495091612438_0001 --user-class-path file:$PWD/__app__.jar 1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr
===============================================================================
      
17/05/18 03:13:55 INFO yarn.ExecutorRunnable: 
===============================================================================
YARN executor launch context:
  env:
    SPARK_YARN_USER_ENV -> PYSPARK_PYTHON=Python/bin/python
    CLASSPATH -> {{PWD}}<CPS>{{PWD}}/__spark_conf__<CPS>{{PWD}}/__spark_libs__/*<CPS>$HADOOP_CONF_DIR<CPS>$HADOOP_COMMON_HOME/share/hadoop/common/*<CPS>$HADOOP_COMMON_HOME/share/hadoop/common/lib/*<CPS>$HADOOP_HDFS_HOME/share/hadoop/hdfs/*<CPS>$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*<CPS>$HADOOP_YARN_HOME/share/hadoop/yarn/*<CPS>$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*
    SPARK_LOG_URL_STDERR -> http://gpu12-ib.cluster:8042/node/containerlogs/container_1495091612438_0001_01_000005/javed.19/stderr?start=-4096
    SPARK_YARN_STAGING_DIR -> hdfs://gpu09:9000/user/javed.19/.sparkStaging/application_1495091612438_0001
    SPARK_USER -> javed.19
    SPARK_YARN_MODE -> true
    PYTHONPATH -> /home/javed.19/bin/pydoop-install/lib/python<CPS>{{PWD}}/pyspark.zip<CPS>{{PWD}}/py4j-0.10.3-src.zip<CPS>{{PWD}}/tfspark.zip<CPS>{{PWD}}/cifar10.zip
    LD_LIBRARY_PATH -> /usr/local/cuda/lib64:/usr/lib/jvm/java-1.8.0/jre/lib/amd64/server
    SPARK_LOG_URL_STDOUT -> http://gpu12-ib.cluster:8042/node/containerlogs/container_1495091612438_0001_01_000005/javed.19/stdout?start=-4096
    PYSPARK_PYTHON -> Python/bin/python

  command:
    {{JAVA_HOME}}/bin/java -server -Xmx11264m -Djava.io.tmpdir={{PWD}}/tmp '-Dspark.ui.port=0' '-Dspark.driver.port=36823' -Dspark.yarn.app.container.log.dir=<LOG_DIR> -XX:OnOutOfMemoryError='kill %p' org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://[email protected]:36823 --executor-id 4 --hostname gpu12-ib.cluster --cores 1 --app-id application_1495091612438_0001 --user-class-path file:$PWD/__app__.jar 1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr
===============================================================================
      
17/05/18 03:13:55 INFO yarn.ExecutorRunnable: 
===============================================================================
YARN executor launch context:
  env:
    SPARK_YARN_USER_ENV -> PYSPARK_PYTHON=Python/bin/python
    CLASSPATH -> {{PWD}}<CPS>{{PWD}}/__spark_conf__<CPS>{{PWD}}/__spark_libs__/*<CPS>$HADOOP_CONF_DIR<CPS>$HADOOP_COMMON_HOME/share/hadoop/common/*<CPS>$HADOOP_COMMON_HOME/share/hadoop/common/lib/*<CPS>$HADOOP_HDFS_HOME/share/hadoop/hdfs/*<CPS>$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*<CPS>$HADOOP_YARN_HOME/share/hadoop/yarn/*<CPS>$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*
    SPARK_LOG_URL_STDERR -> http://gpu13-ib.cluster:8042/node/containerlogs/container_1495091612438_0001_01_000004/javed.19/stderr?start=-4096
    SPARK_YARN_STAGING_DIR -> hdfs://gpu09:9000/user/javed.19/.sparkStaging/application_1495091612438_0001
    SPARK_USER -> javed.19
    SPARK_YARN_MODE -> true
    PYTHONPATH -> /home/javed.19/bin/pydoop-install/lib/python<CPS>{{PWD}}/pyspark.zip<CPS>{{PWD}}/py4j-0.10.3-src.zip<CPS>{{PWD}}/tfspark.zip<CPS>{{PWD}}/cifar10.zip
    LD_LIBRARY_PATH -> /usr/local/cuda/lib64:/usr/lib/jvm/java-1.8.0/jre/lib/amd64/server
    SPARK_LOG_URL_STDOUT -> http://gpu13-ib.cluster:8042/node/containerlogs/container_1495091612438_0001_01_000004/javed.19/stdout?start=-4096
    PYSPARK_PYTHON -> Python/bin/python

  command:
    {{JAVA_HOME}}/bin/java -server -Xmx11264m -Djava.io.tmpdir={{PWD}}/tmp '-Dspark.ui.port=0' '-Dspark.driver.port=36823' -Dspark.yarn.app.container.log.dir=<LOG_DIR> -XX:OnOutOfMemoryError='kill %p' org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://[email protected]:36823 --executor-id 3 --hostname gpu13-ib.cluster --cores 1 --app-id application_1495091612438_0001 --user-class-path file:$PWD/__app__.jar 1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr
===============================================================================
      
17/05/18 03:13:55 INFO impl.ContainerManagementProtocolProxy: Opening proxy : gpu12-ib.cluster:40470
17/05/18 03:13:55 INFO impl.ContainerManagementProtocolProxy: Opening proxy : gpu11-ib.cluster:38611
17/05/18 03:13:55 INFO impl.ContainerManagementProtocolProxy: Opening proxy : gpu10-ib.cluster:54754
17/05/18 03:13:55 INFO impl.ContainerManagementProtocolProxy: Opening proxy : gpu13-ib.cluster:44034
17/05/18 03:13:57 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(null) (10.3.1.12:44066) with ID 4
17/05/18 03:13:57 INFO storage.BlockManagerMasterEndpoint: Registering block manager gpu12-ib.cluster:52469 with 5.7 GB RAM, BlockManagerId(4, gpu12-ib.cluster, 52469)
17/05/18 03:14:01 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(null) (10.3.1.11:42183) with ID 2
17/05/18 03:14:01 INFO storage.BlockManagerMasterEndpoint: Registering block manager gpu11-ib.cluster:34736 with 5.7 GB RAM, BlockManagerId(2, gpu11-ib.cluster, 34736)
17/05/18 03:14:01 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(null) (10.3.1.13:39548) with ID 3
17/05/18 03:14:01 INFO storage.BlockManagerMasterEndpoint: Registering block manager gpu13-ib.cluster:39205 with 5.7 GB RAM, BlockManagerId(3, gpu13-ib.cluster, 39205)
17/05/18 03:14:01 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(null) (10.3.1.10:34494) with ID 1
17/05/18 03:14:01 INFO storage.BlockManagerMasterEndpoint: Registering block manager gpu10-ib.cluster:43050 with 5.7 GB RAM, BlockManagerId(1, gpu10-ib.cluster, 43050)
17/05/18 03:14:01 INFO cluster.YarnClusterSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
17/05/18 03:14:01 INFO cluster.YarnClusterScheduler: YarnClusterScheduler.postStartHook done
17/05/18 03:14:01 INFO spark.SparkContext: Starting job: foreachPartition at /tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/tfspark.zip/com/yahoo/ml/tf/TFCluster.py:279
17/05/18 03:14:01 INFO scheduler.DAGScheduler: Got job 0 (foreachPartition at /tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/tfspark.zip/com/yahoo/ml/tf/TFCluster.py:279) with 4 output partitions
17/05/18 03:14:01 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (foreachPartition at /tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/tfspark.zip/com/yahoo/ml/tf/TFCluster.py:279)
17/05/18 03:14:01 INFO scheduler.DAGScheduler: Parents of final stage: List()
17/05/18 03:14:01 INFO scheduler.DAGScheduler: Missing parents: List()
17/05/18 03:14:01 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (PythonRDD[1] at foreachPartition at /tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/tfspark.zip/com/yahoo/ml/tf/TFCluster.py:279), which has no missing parents
17/05/18 03:14:01 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 18.5 KB, free 366.3 MB)
17/05/18 03:14:01 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 12.5 KB, free 366.3 MB)
17/05/18 03:14:01 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.3.1.12:49987 (size: 12.5 KB, free: 366.3 MB)
17/05/18 03:14:01 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1012
17/05/18 03:14:01 INFO scheduler.DAGScheduler: Submitting 4 missing tasks from ResultStage 0 (PythonRDD[1] at foreachPartition at /tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/tfspark.zip/com/yahoo/ml/tf/TFCluster.py:279)
17/05/18 03:14:01 INFO cluster.YarnClusterScheduler: Adding task set 0.0 with 4 tasks
17/05/18 03:14:02 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, gpu11-ib.cluster, partition 0, PROCESS_LOCAL, 5594 bytes)
17/05/18 03:14:02 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, gpu12-ib.cluster, partition 1, PROCESS_LOCAL, 5594 bytes)
17/05/18 03:14:02 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, gpu13-ib.cluster, partition 2, PROCESS_LOCAL, 5594 bytes)
17/05/18 03:14:02 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, gpu10-ib.cluster, partition 3, PROCESS_LOCAL, 5594 bytes)
17/05/18 03:14:02 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Launching task 0 on executor id: 2 hostname: gpu11-ib.cluster.
17/05/18 03:14:02 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Launching task 1 on executor id: 4 hostname: gpu12-ib.cluster.
17/05/18 03:14:02 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Launching task 2 on executor id: 3 hostname: gpu13-ib.cluster.
17/05/18 03:14:02 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Launching task 3 on executor id: 1 hostname: gpu10-ib.cluster.
17/05/18 03:14:02 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on gpu10-ib.cluster:43050 (size: 12.5 KB, free: 5.7 GB)
17/05/18 03:14:02 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on gpu11-ib.cluster:34736 (size: 12.5 KB, free: 5.7 GB)
17/05/18 03:14:02 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on gpu13-ib.cluster:39205 (size: 12.5 KB, free: 5.7 GB)
17/05/18 03:14:02 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on gpu12-ib.cluster:52469 (size: 12.5 KB, free: 5.7 GB)
17/05/18 03:14:07 WARN scheduler.TaskSetManager: Lost task 3.0 in stage 0.0 (TID 3, gpu10-ib.cluster): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000002/pyspark.zip/pyspark/worker.py", line 172, in main
    process()
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000002/pyspark.zip/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 317, in func
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 762, in func
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/tfspark.zip/com/yahoo/ml/tf/TFSparkNode.py", line 421, in _mapfn
  File "cifar10_multi_gpu_train.py", line 271, in main_fun
  File "/home/javed.19/Python/lib/python2.7/site-packages/tensorflow/python/lib/io/file_io.py", line 432, in delete_recursively
    pywrap_tensorflow.DeleteRecursively(compat.as_bytes(dirname), status)
  File "/home/javed.19/Python/lib/python2.7/contextlib.py", line 24, in __exit__
    self.gen.next()
  File "/home/javed.19/Python/lib/python2.7/site-packages/tensorflow/python/framework/errors_impl.py", line 466, in raise_exception_on_not_ok_status
    pywrap_tensorflow.TF_GetCode(status))
UnknownError: hdfs://default/user/javed.19/cifar10_train/events.out.tfevents.1495091646.gpu13.cluster; Input/output error

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

17/05/18 03:14:07 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, gpu12-ib.cluster): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000005/pyspark.zip/pyspark/worker.py", line 172, in main
    process()
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000005/pyspark.zip/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 317, in func
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 762, in func
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/tfspark.zip/com/yahoo/ml/tf/TFSparkNode.py", line 421, in _mapfn
  File "cifar10_multi_gpu_train.py", line 271, in main_fun
  File "/home/javed.19/Python/lib/python2.7/site-packages/tensorflow/python/lib/io/file_io.py", line 432, in delete_recursively
    pywrap_tensorflow.DeleteRecursively(compat.as_bytes(dirname), status)
  File "/home/javed.19/Python/lib/python2.7/contextlib.py", line 24, in __exit__
    self.gen.next()
  File "/home/javed.19/Python/lib/python2.7/site-packages/tensorflow/python/framework/errors_impl.py", line 466, in raise_exception_on_not_ok_status
    pywrap_tensorflow.TF_GetCode(status))
UnknownError: hdfs://default/user/javed.19/cifar10_train/events.out.tfevents.1495091646.gpu13.cluster; Input/output error

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

17/05/18 03:14:07 INFO scheduler.TaskSetManager: Starting task 1.1 in stage 0.0 (TID 4, gpu12-ib.cluster, partition 1, PROCESS_LOCAL, 5594 bytes)
17/05/18 03:14:07 INFO scheduler.TaskSetManager: Starting task 3.1 in stage 0.0 (TID 5, gpu10-ib.cluster, partition 3, PROCESS_LOCAL, 5594 bytes)
17/05/18 03:14:07 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Launching task 4 on executor id: 4 hostname: gpu12-ib.cluster.
17/05/18 03:14:07 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Launching task 5 on executor id: 1 hostname: gpu10-ib.cluster.
17/05/18 03:14:09 WARN scheduler.TaskSetManager: Lost task 1.1 in stage 0.0 (TID 4, gpu12-ib.cluster): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000005/pyspark.zip/pyspark/worker.py", line 172, in main
    process()
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000005/pyspark.zip/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 317, in func
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 762, in func
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/tfspark.zip/com/yahoo/ml/tf/TFSparkNode.py", line 421, in _mapfn
  File "cifar10_multi_gpu_train.py", line 271, in main_fun
  File "/home/javed.19/Python/lib/python2.7/site-packages/tensorflow/python/lib/io/file_io.py", line 432, in delete_recursively
    pywrap_tensorflow.DeleteRecursively(compat.as_bytes(dirname), status)
  File "/home/javed.19/Python/lib/python2.7/contextlib.py", line 24, in __exit__
    self.gen.next()
  File "/home/javed.19/Python/lib/python2.7/site-packages/tensorflow/python/framework/errors_impl.py", line 466, in raise_exception_on_not_ok_status
    pywrap_tensorflow.TF_GetCode(status))
UnknownError: hdfs://default/user/javed.19/cifar10_train; Input/output error

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

17/05/18 03:14:09 INFO scheduler.TaskSetManager: Starting task 1.2 in stage 0.0 (TID 6, gpu12-ib.cluster, partition 1, PROCESS_LOCAL, 5594 bytes)
17/05/18 03:14:09 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Launching task 6 on executor id: 4 hostname: gpu12-ib.cluster.
17/05/18 03:14:09 WARN scheduler.TaskSetManager: Lost task 3.1 in stage 0.0 (TID 5, gpu10-ib.cluster): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000002/pyspark.zip/pyspark/worker.py", line 172, in main
    process()
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000002/pyspark.zip/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 317, in func
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 762, in func
  File "/tmp/hadoop-javed.19/nm-local-dir/usercache/javed.19/appcache/application_1495091612438_0001/container_1495091612438_0001_01_000001/tfspark.zip/com/yahoo/ml/tf/TFSparkNode.py", line 421, in _mapfn
  File "cifar10_multi_gpu_train.py", line 271, in main_fun
  File "/home/javed.19/Python/lib/python2.7/site-packages/tensorflow/python/lib/io/file_io.py", line 432, in delete_recursively
    pywrap_tensorflow.DeleteRecursively(compat.as_bytes(dirname), status)
  File "/home/javed.19/Python/lib/python2.7/contextlib.py", line 24, in __exit__
    self.gen.next()
  File "/home/javed.19/Python/lib/python2.7/site-packages/tensorflow/python/framework/errors_impl.py", line 466, in raise_exception_on_not_ok_status
    pywrap_tensorflow.TF_GetCode(status))
UnknownError: hdfs://default/user/javed.19/cifar10_train/events.out.tfevents.1495091647.gpu11.cluster; Input/output error

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

17/05/18 03:14:09 INFO scheduler.TaskSetManager: Starting task 3.2 in stage 0.0 (TID 7, gpu10-ib.cluster, partition 3, PROCESS_LOCAL, 5594 bytes)
17/05/18 03:14:09 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Launching task 7 on executor id: 1 hostname: gpu10-ib.cluster.

TensorFlow 1.0

Thanks for this great project; it really meets a lot of otherwise unmet needs.
Is there a roadmap for when TFoS will be compatible with TensorFlow 1.0? There is now support for a new ClusterManager for CloudML and Mesos. Could this project attempt to get into tensorflow/contrib as a YARN alternative?

Use multiple GPUs per node in YARN mode

The documentation for the MNIST EC2 example states that we can set the NUM_GPU environment variable to specify how many GPUs we want to use per node. Is the same true for Hadoop-based YARN clusters as well?

running TensorFlowOnSpark demo: cluster_size problem

This is my submit script:
./spark-submit \
--master=spark://master:7077 \
--conf spark.executorEnv.LD_LIBRARY_PATH="${JAVA_HOME}/jre/lib/amd64/server" \
--conf spark.executorEnv.CLASSPATH="$($HADOOP_HOME/bin/hadoop classpath --glob):${CLASSPATH}" \
--py-files /app/run/tfspark.zip,/app/TensorFlowOnSpark/examples/mnist/spark/mnist_dist.py \
--queue default \
/app/TensorFlowOnSpark/examples/mnist/spark/mnist_spark.py \
--cluster_size 2 \
--images examples/mnist/csv/train/images \
--labels examples/mnist/csv/train/labels \
--format csv \
--mode train \
--model mnist_model

There is a problem: if I don't configure --cluster_size 2, there will be an error:

org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/app/spark-2.0.0/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
process()
File "/app/spark-2.0.0/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/app/spark-2.0.0/python/lib/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
File "/app/spark-2.0.0/python/lib/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
File "/app/spark-2.0.0/python/lib/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
File "/app/spark-2.0.0/python/lib/pyspark.zip/pyspark/rdd.py", line 317, in func
File "/app/spark-2.0.0/python/lib/pyspark.zip/pyspark/rdd.py", line 762, in func
File "/app/run/tfspark.zip/tensorflowonspark/TFSparkNode.py", line 433, in _train
AttributeError: 'NoneType' object has no attribute 'get_queue'

at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

I read the source code but couldn't understand the cause, so I'm raising this issue. If anyone knows, please tell me. Thanks very much!

YARN unknown queue: gpu when running MNIST example

I am trying to run the TensorFlowOnSpark example on a Hadoop cluster with GPUs. I was able to configure the environment correctly, but when I reach the step to convert the MNIST zip files into HDFS files and run the spark-submit job given in the example, I get the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.hadoop.yarn.exceptions.YarnException: Failed to submit application_1494481097930_0005 to YARN : Application application_1494481097930_0005 submitted by user javed.19 to unknown queue: gpu
	at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.submitApplication(YarnClientImpl.java:271)
	at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:172)
	at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:56)
	at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:149)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:497)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:240)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:236)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)

I am assuming I have to set up a YARN queue for it to be able to utilize GPUs. If that is the case, how do I do it?

Any help would be really appreciated.

Environment:

CUDA: 8
CUDNN: 5.0
Python: 2.7.3
Tensorflow: 1.0

Key sm_w/Adagrad not found in checkpoint

Hi, I encountered an error while running the MNIST sample on a YARN cluster. Several days ago the sample still worked.

2017-04-12 12:41:53,091 INFO (MainThread-32503) 1: ======== worker:0 ========
2017-04-12 12:41:53,091 INFO (MainThread-32503) 1: Cluster spec: {'ps': ['worker3:45207'], 'worker': ['worker1:38243', 'worker2:37
559', 'worker3:44837', 'worker1:35344']}
2017-04-12 12:41:53,091 INFO (MainThread-32503) 1: Using CPU
I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:197] Initialize GrpcChannelCache for job ps -> {0 -> worker3:45207}
I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:197] Initialize GrpcChannelCache for job worker -> {0 -> localhost:38243
, 1 -> worker2:37559, 2 -> worker3:44837, 3 -> worker1:35344}
I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:211] Started server with target: grpc://localhost:38243
17/04/12 12:41:53 INFO HadoopRDD: Input split: hdfs://ns/user/spark/mnist/csv/train/labels/part-00002:0+245760
17/04/12 12:41:53 INFO TorrentBroadcast: Started reading broadcast variable 1
17/04/12 12:41:53 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 21.0 KB, free 139.7 MB)
17/04/12 12:41:53 INFO TorrentBroadcast: Reading broadcast variable 1 took 200 ms
17/04/12 12:41:53 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 289.2 KB, free 139.5 MB)
tensorflow model path: hdfs://ns/user/spark/mnist_model
2017-04-12 12:41:53,969 INFO (MainThread-32560) Connected to TFSparkNode.mgr on worker1, ppid=32428, state='running'
2017-04-12 12:41:54,003 INFO (MainThread-32560) mgr.state='running'
2017-04-12 12:41:54,003 INFO (MainThread-32560) Feeding partition <generator object load_stream at 0x7f157fa00320> into input queu
e <multiprocessing.queues.JoinableQueue object at 0x7f157fa571d0>
WARNING:tensorflow:From /opt/anaconda2/envs/tensorflow/lib/python2.7/site-packages/tensorflow/python/training/supervisor.py:344 in
init.: init (from tensorflow.python.training.summary_io) is deprecated and will be removed after 2016-11-30.
Instructions for updating:
Please switch to tf.summary.FileWriter. The interface and behavior is the same; this is just a rename.
2017-04-12 12:41:54,031 WARNING (MainThread-32503) From /opt/anaconda2/envs/tensorflow/lib/python2.7/site-packages/tensorflow/pyth
on/training/supervisor.py:344 in init.: init (from tensorflow.python.training.summary_io) is deprecated and will be remove
d after 2016-11-30.
Instructions for updating:
Please switch to tf.summary.FileWriter. The interface and behavior is the same; this is just a rename.
17/04/12 12:41:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes wher
e applicable
I tensorflow/core/distributed_runtime/master_session.cc:993] Start master session 210898f5114242e8 with config:

17/04/12 12:42:00 INFO PythonRunner: Times: total = 7664, boot = -3202, init = 3439, finish = 7427
17/04/12 12:42:00 INFO PythonRunner: Times: total = 338, boot = 31, init = 78, finish = 229
INFO:tensorflow:Error reported to Coordinator: <class 'tensorflow.python.framework.errors_impl.NotFoundError'>, Key sm_w/Adagrad n
ot found in checkpoint
[[Node: save/RestoreV2_8 = RestoreV2[dtypes=[DT_FLOAT], _device="/job:ps/replica:0/task:0/cpu:0"](_recv_save/Const_0_S1,
save/RestoreV2_8/tensor_names, save/RestoreV2_8/shape_and_slices)]]

Caused by op u'save/RestoreV2_8', defined at:
  File "/opt/anaconda2/envs/tensorflow/lib/python2.7/runpy.py", line 174, in _run_module_as_main
    "__main__", fname, loader, pkg_name)
  File "/opt/anaconda2/envs/tensorflow/lib/python2.7/runpy.py", line 72, in _run_code
    exec code in run_globals
  File "/data/hadoop/tmp/nm-local-dir/usercache/spark/appcache/application_1491967800399_0004/container_1491967800399_0004_01_000002/pyspark.zip/pyspark/daemon.py", line 180, in <module>
  File "/data/hadoop/tmp/nm-local-dir/usercache/spark/appcache/application_1491967800399_0004/container_1491967800399_0004_01_000002/pyspark.zip/pyspark/daemon.py", line 157, in manager
  File "/data/hadoop/tmp/nm-local-dir/usercache/spark/appcache/application_1491967800399_0004/container_1491967800399_0004_01_000002/pyspark.zip/pyspark/daemon.py", line 61, in worker
  File "/data/hadoop/tmp/nm-local-dir/usercache/spark/appcache/application_1491967800399_0004/container_1491967800399_0004_01_000002/pyspark.zip/pyspark/worker.py", line 172, in main
    process()
  File "/data/hadoop/tmp/nm-local-dir/usercache/spark/appcache/application_1491967800399_0004/container_1491967800399_0004_01_000002/pyspark.zip/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 317, in func
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 762, in func
  File "/home/spark/TensorFlowOnSpark2/TensorFlowOnSpark-master/tfspark.zip/com/yahoo/ml/tf/TFSparkNode.py", line 399, in _mapfn
  File "/opt/anaconda2/envs/tensorflow/lib/python2.7/multiprocessing/process.py", line 130, in start
    self._popen = Popen(self)
  File "/opt/anaconda2/envs/tensorflow/lib/python2.7/multiprocessing/forking.py", line 126, in __init__
    code = process_obj._bootstrap()
  File "/opt/anaconda2/envs/tensorflow/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/opt/anaconda2/envs/tensorflow/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/data/hadoop/tmp/nm-local-dir/usercache/spark/appcache/application_1491967800399_0004/container_1491967800399_0004_01_000002/pyfiles/mnist_dist.py", line 104, in map_fun
    saver = tf.train.Saver()
  File "/opt/anaconda2/envs/tensorflow/lib/python2.7/site-packages/tensorflow/python/training/saver.py", line 1000, in __init__
    self.build()
  File "/opt/anaconda2/envs/tensorflow/lib/python2.7/site-packages/tensorflow/python/training/saver.py", line 1030, in build
    restore_sequentially=self._restore_sequentially)
  File "/opt/anaconda2/envs/tensorflow/lib/python2.7/site-packages/tensorflow/python/training/saver.py", line 624, in build
    restore_sequentially, reshape)
  File "/opt/anaconda2/envs/tensorflow/lib/python2.7/site-packages/tensorflow/python/training/saver.py", line 361, in _AddRestoreOps
    tensors = self.restore_op(filename_tensor, saveable, preferred_shard)
  File "/opt/anaconda2/envs/tensorflow/lib/python2.7/site-packages/tensorflow/python/training/saver.py", line 200, in restore_op
    [spec.tensor.dtype])[0])
  File "/opt/anaconda2/envs/tensorflow/lib/python2.7/site-packages/tensorflow/python/ops/gen_io_ops.py", line 441, in restore_v2
    dtypes=dtypes, name=name)
  File "/opt/anaconda2/envs/tensorflow/lib/python2.7/site-packages/tensorflow/python/framework/op_def_library.py", line 759, in apply_op
    op_def=op_def)
  File "/opt/anaconda2/envs/tensorflow/lib/python2.7/site-packages/tensorflow/python/framework/ops.py", line 2240, in create_op
    original_op=self._default_original_op, op_def=op_def)
  File "/opt/anaconda2/envs/tensorflow/lib/python2.7/site-packages/tensorflow/python/framework/ops.py", line 1128, in __init__
    self._traceback = _extract_stack()

NotFoundError (see above for traceback): Key sm_w/Adagrad not found in checkpoint
[[Node: save/RestoreV2_8 = RestoreV2[dtypes=[DT_FLOAT], _device="/job:ps/replica:0/task:0/cpu:0"](_recv_save/Const_0_S1, save/RestoreV2_8/tensor_names, save/RestoreV2_8/shape_and_slices)]]
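In case it helps: this error usually means the Saver is restoring a checkpoint that was written by a different graph (for example, one saved before the Adagrad slot variables existed), typically because an old checkpoint is still present in the model directory. A minimal sketch, assuming the model path from the log above (the directory contents are an assumption), to list what the existing checkpoint actually contains before retraining:

import tensorflow as tf

# List the variables stored in the latest checkpoint under the model dir; if
# "sm_w/Adagrad" is missing here, the checkpoint predates the current graph and the
# model dir should be cleared (or a new path used) before training again.
ckpt = tf.train.latest_checkpoint("hdfs://ns/user/spark/mnist_model")
reader = tf.train.NewCheckpointReader(ckpt)
for name, shape in sorted(reader.get_variable_to_shape_map().items()):
    print(name, shape)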

Hello! I am new to TensorFlowOnSpark and I have a problem. Please help me!

command:
${SPARK_HOME}/bin/spark-submit \
--master spark://master:7077 \
--conf spark.executorEnv.LD_LIBRARY_PATH="${JAVA_HOME}/jre/lib/amd64/server" \
--py-files ${TFoS_HOME}/tfspark.zip,${TFoS_HOME}/examples/mnist/spark/mnist_dist.py \
--conf spark.cores.max=3 \
--conf spark.task.cpus=1 \
--conf spark.executorEnv.JAVA_HOME="$JAVA_HOME" \
${TFoS_HOME}/examples/mnist/spark/mnist_spark.py \
--cluster_size 3 \
--images examples/mnist/csv/train/images \
--labels examples/mnist/csv/train/labels \
--format csv \
--mode train \
--model mnist_model

The job never finishes, so I have to force-stop it. I do not know what the problem is; please help me fix it.
This is my log when I run it:
17/07/12 10:57:05 INFO spark.SparkContext: Running Spark version 1.6.0
17/07/12 10:57:05 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/07/12 10:57:06 INFO spark.SecurityManager: Changing view acls to: root
17/07/12 10:57:06 INFO spark.SecurityManager: Changing modify acls to: root
17/07/12 10:57:06 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
17/07/12 10:57:07 INFO util.Utils: Successfully started service 'sparkDriver' on port 46153.
17/07/12 10:57:08 INFO slf4j.Slf4jLogger: Slf4jLogger started
17/07/12 10:57:08 INFO Remoting: Starting remoting
17/07/12 10:57:08 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:43606]
17/07/12 10:57:08 INFO util.Utils: Successfully started service 'sparkDriverActorSystem' on port 43606.
17/07/12 10:57:09 INFO spark.SparkEnv: Registering MapOutputTracker
17/07/12 10:57:09 INFO spark.SparkEnv: Registering BlockManagerMaster
17/07/12 10:57:09 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-ae55c324-7728-4b32-875f-acf5a567a780
17/07/12 10:57:09 INFO storage.MemoryStore: MemoryStore started with capacity 511.1 MB
17/07/12 10:57:09 INFO spark.SparkEnv: Registering OutputCommitCoordinator
17/07/12 10:57:10 INFO server.Server: jetty-8.y.z-SNAPSHOT
17/07/12 10:57:11 INFO server.AbstractConnector: Started [email protected]:4040
17/07/12 10:57:11 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
17/07/12 10:57:11 INFO ui.SparkUI: Started SparkUI at http://192.168.147.134:4040
17/07/12 10:57:11 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-ab5e0a6c-fc99-4b8d-9aa3-eea24ddda6d1/httpd-5807f3e1-680e-43bb-a5a1-1e9077bf63a7
17/07/12 10:57:11 INFO spark.HttpServer: Starting HTTP Server
17/07/12 10:57:11 INFO server.Server: jetty-8.y.z-SNAPSHOT
17/07/12 10:57:11 INFO server.AbstractConnector: Started [email protected]:46656
17/07/12 10:57:11 INFO util.Utils: Successfully started service 'HTTP file server' on port 46656.
17/07/12 10:57:11 INFO util.Utils: Copying /usr/local/share/TensorFlowOnSpark/examples/mnist/spark/mnist_spark.py to /tmp/spark-ab5e0a6c-fc99-4b8d-9aa3-eea24ddda6d1/userFiles-fd88da1c-becf-47db-90ab-09454a5cc2a9/mnist_spark.py
17/07/12 10:57:11 INFO spark.SparkContext: Added file file:/usr/local/share/TensorFlowOnSpark/examples/mnist/spark/mnist_spark.py at http://192.168.147.134:46656/files/mnist_spark.py with timestamp 1499882231460
17/07/12 10:57:11 INFO util.Utils: Copying /usr/local/share/TensorFlowOnSpark/tfspark.zip to /tmp/spark-ab5e0a6c-fc99-4b8d-9aa3-eea24ddda6d1/userFiles-fd88da1c-becf-47db-90ab-09454a5cc2a9/tfspark.zip
17/07/12 10:57:11 INFO spark.SparkContext: Added file file:/usr/local/share/TensorFlowOnSpark/tfspark.zip at http://192.168.147.134:46656/files/tfspark.zip with timestamp 1499882231499
17/07/12 10:57:11 INFO util.Utils: Copying /usr/local/share/TensorFlowOnSpark/examples/mnist/spark/mnist_dist.py to /tmp/spark-ab5e0a6c-fc99-4b8d-9aa3-eea24ddda6d1/userFiles-fd88da1c-becf-47db-90ab-09454a5cc2a9/mnist_dist.py
17/07/12 10:57:11 INFO spark.SparkContext: Added file file:/usr/local/share/TensorFlowOnSpark/examples/mnist/spark/mnist_dist.py at http://192.168.147.134:46656/files/mnist_dist.py with timestamp 1499882231514
17/07/12 10:57:11 INFO client.AppClient$ClientEndpoint: Connecting to master spark://master:7077...
17/07/12 10:57:12 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20170712105712-0000
17/07/12 10:57:12 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 36618.
17/07/12 10:57:12 INFO netty.NettyBlockTransferService: Server created on 36618
17/07/12 10:57:12 INFO storage.BlockManagerMaster: Trying to register BlockManager
17/07/12 10:57:12 INFO storage.BlockManagerMasterEndpoint: Registering block manager 192.168.147.134:36618 with 511.1 MB RAM, BlockManagerId(driver, 192.168.147.134, 36618)
17/07/12 10:57:12 INFO storage.BlockManagerMaster: Registered BlockManager
17/07/12 10:57:12 INFO client.AppClient$ClientEndpoint: Executor added: app-20170712105712-0000/0 on worker-20170712105334-192.168.147.134-33141 (192.168.147.134:33141) with 1 cores
17/07/12 10:57:12 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20170712105712-0000/0 on hostPort 192.168.147.134:33141 with 1 cores, 1024.0 MB RAM
17/07/12 10:57:12 INFO client.AppClient$ClientEndpoint: Executor added: app-20170712105712-0000/1 on worker-20170712105342-192.168.147.136-32959 (192.168.147.136:32959) with 1 cores
17/07/12 10:57:12 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20170712105712-0000/1 on hostPort 192.168.147.136:32959 with 1 cores, 1024.0 MB RAM
17/07/12 10:57:12 INFO client.AppClient$ClientEndpoint: Executor added: app-20170712105712-0000/2 on worker-20170712105341-192.168.147.135-45599 (192.168.147.135:45599) with 1 cores
17/07/12 10:57:12 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20170712105712-0000/2 on hostPort 192.168.147.135:45599 with 1 cores, 1024.0 MB RAM
17/07/12 10:57:12 INFO client.AppClient$ClientEndpoint: Executor updated: app-20170712105712-0000/0 is now RUNNING
17/07/12 10:57:13 INFO client.AppClient$ClientEndpoint: Executor updated: app-20170712105712-0000/2 is now RUNNING
17/07/12 10:57:13 INFO client.AppClient$ClientEndpoint: Executor updated: app-20170712105712-0000/1 is now RUNNING
17/07/12 10:57:13 INFO cluster.SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
args: Namespace(batch_size=100, cluster_size=3, epochs=1, format='csv', images='examples/mnist/csv/train/images', labels='examples/mnist/csv/train/labels', mode='train', model='mnist_model', output='predictions', rdma=False, readers=1, steps=1000, tensorboard=False)
2017-07-12T10:57:14.140143 ===== Start
17/07/12 10:57:18 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 212.8 KB, free 212.8 KB)
17/07/12 10:57:19 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 19.5 KB, free 232.4 KB)
17/07/12 10:57:19 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.147.134:36618 (size: 19.5 KB, free: 511.1 MB)
17/07/12 10:57:19 INFO spark.SparkContext: Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:-2
17/07/12 10:57:20 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 212.8 KB, free 445.2 KB)
17/07/12 10:57:20 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 19.5 KB, free 464.7 KB)
17/07/12 10:57:20 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.147.134:36618 (size: 19.5 KB, free: 511.1 MB)
17/07/12 10:57:20 INFO spark.SparkContext: Created broadcast 1 from textFile at NativeMethodAccessorImpl.java:-2
zipping images and labels
17/07/12 10:57:23 INFO mapred.FileInputFormat: Total input paths to process : 10
17/07/12 10:57:24 INFO mapred.FileInputFormat: Total input paths to process : 10
2017-07-12 10:57:24,472 INFO (MainThread-10899) Reserving TFSparkNodes
2017-07-12 10:57:24,477 INFO (MainThread-10899) listening for reservations at ('master', 35508)
2017-07-12 10:57:24,484 INFO (MainThread-10899) Starting TensorFlow on executors
2017-07-12 10:57:24,646 INFO (MainThread-10899) Waiting for TFSparkNodes to start
2017-07-12 10:57:24,646 INFO (MainThread-10899) waiting for 3 reservations
17/07/12 10:57:24 INFO spark.SparkContext: Starting job: foreachPartition at tensorflowonspark/TFCluster.py:279
17/07/12 10:57:25 INFO scheduler.DAGScheduler: Got job 0 (foreachPartition at tensorflowonspark/TFCluster.py:279) with 3 output partitions
17/07/12 10:57:25 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (foreachPartition at tensorflowonspark/TFCluster.py:279)
17/07/12 10:57:25 INFO scheduler.DAGScheduler: Parents of final stage: List()
17/07/12 10:57:25 INFO scheduler.DAGScheduler: Missing parents: List()
17/07/12 10:57:25 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (PythonRDD[8] at foreachPartition at tensorflowonspark/TFCluster.py:279), which has no missing parents
17/07/12 10:57:25 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 10.5 KB, free 475.2 KB)
17/07/12 10:57:25 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 7.3 KB, free 482.6 KB)
17/07/12 10:57:25 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.147.134:36618 (size: 7.3 KB, free: 511.1 MB)
17/07/12 10:57:25 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
17/07/12 10:57:25 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from ResultStage 0 (PythonRDD[8] at foreachPartition at tensorflowonspark/TFCluster.py:279)
17/07/12 10:57:25 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
2017-07-12 10:57:25,647 INFO (MainThread-10899) waiting for 3 reservations
17/07/12 10:57:25 INFO cluster.SparkDeploySchedulerBackend: Registered executor NettyRpcEndpointRef(null) (master:36452) with ID 0
17/07/12 10:57:26 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, master, partition 0,PROCESS_LOCAL, 2256 bytes)
2017-07-12 10:57:26,649 INFO (MainThread-10899) waiting for 3 reservations
17/07/12 10:57:26 INFO storage.BlockManagerMasterEndpoint: Registering block manager master:34551 with 511.1 MB RAM, BlockManagerId(0, master, 34551)
2017-07-12 10:57:27,652 INFO (MainThread-10899) waiting for 3 reservations
2017-07-12 10:57:28,662 INFO (MainThread-10899) waiting for 3 reservations
2017-07-12 10:57:29,663 INFO (MainThread-10899) waiting for 3 reservations
17/07/12 10:57:29 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on master:34551 (size: 7.3 KB, free: 511.1 MB)
2017-07-12 10:57:30,665 INFO (MainThread-10899) waiting for 3 reservations
17/07/12 10:57:30 INFO cluster.SparkDeploySchedulerBackend: Registered executor NettyRpcEndpointRef(null) (slave02:42844) with ID 1
17/07/12 10:57:30 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, slave02, partition 1,PROCESS_LOCAL, 2256 bytes)
2017-07-12 10:57:31,666 INFO (MainThread-10899) waiting for 3 reservations
17/07/12 10:57:32 INFO storage.BlockManagerMasterEndpoint: Registering block manager slave02:32867 with 517.4 MB RAM, BlockManagerId(1, slave02, 32867)
2017-07-12 10:57:32,667 INFO (MainThread-10899) waiting for 3 reservations
2017-07-12 10:57:33,669 INFO (MainThread-10899) waiting for 3 reservations
2017-07-12 10:57:34,670 INFO (MainThread-10899) waiting for 3 reservations
2017-07-12 10:57:35,680 INFO (MainThread-10899) waiting for 3 reservations
2017-07-12 10:57:36,683 INFO (MainThread-10899) waiting for 3 reservations
2017-07-12 10:57:37,716 INFO (MainThread-10899) waiting for 3 reservations
2017-07-12 10:57:38,730 INFO (MainThread-10899) waiting for 3 reservations
17/07/12 10:57:38 INFO cluster.SparkDeploySchedulerBackend: Registered executor NettyRpcEndpointRef(null) (slave01:59680) with ID 2
17/07/12 10:57:38 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, slave01, partition 2,PROCESS_LOCAL, 2256 bytes)
17/07/12 10:57:39 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on slave02:32867 (size: 7.3 KB, free: 517.4 MB)
2017-07-12 10:57:39,732 INFO (MainThread-10899) waiting for 3 reservations
17/07/12 10:57:40 INFO storage.BlockManagerMasterEndpoint: Registering block manager slave01:44388 with 517.4 MB RAM, BlockManagerId(2, slave01, 44388)
2017-07-12 10:57:40,734 INFO (MainThread-10899) waiting for 3 reservations
2017-07-12 10:57:41,737 INFO (MainThread-10899) waiting for 3 reservations
2017-07-12 10:57:42,739 INFO (MainThread-10899) waiting for 3 reservations
2017-07-12 10:57:43,741 INFO (MainThread-10899) waiting for 2 reservations
2017-07-12 10:57:44,743 INFO (MainThread-10899) waiting for 2 reservations
2017-07-12 10:57:45,745 INFO (MainThread-10899) waiting for 2 reservations
2017-07-12 10:57:46,747 INFO (MainThread-10899) waiting for 2 reservations
2017-07-12 10:57:47,749 INFO (MainThread-10899) waiting for 2 reservations
2017-07-12 10:57:48,750 INFO (MainThread-10899) waiting for 2 reservations
2017-07-12 10:57:49,752 INFO (MainThread-10899) waiting for 2 reservations
2017-07-12 10:57:50,754 INFO (MainThread-10899) waiting for 2 reservations
2017-07-12 10:57:51,755 INFO (MainThread-10899) waiting for 2 reservations
2017-07-12 10:57:52,757 INFO (MainThread-10899) waiting for 2 reservations
2017-07-12 10:57:53,760 INFO (MainThread-10899) waiting for 2 reservations
2017-07-12 10:57:54,761 INFO (MainThread-10899) waiting for 2 reservations
17/07/12 10:57:54 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on slave01:44388 (size: 7.3 KB, free: 517.4 MB)
2017-07-12 10:57:55,763 INFO (MainThread-10899) waiting for 2 reservations
2017-07-12 10:57:56,765 INFO (MainThread-10899) waiting for 2 reservations
2017-07-12 10:57:57,767 INFO (MainThread-10899) waiting for 2 reservations
2017-07-12 10:57:58,769 INFO (MainThread-10899) waiting for 1 reservations
2017-07-12 10:57:59,771 INFO (MainThread-10899) waiting for 1 reservations
2017-07-12 10:58:00,773 INFO (MainThread-10899) waiting for 1 reservations
2017-07-12 10:58:01,775 INFO (MainThread-10899) waiting for 1 reservations
2017-07-12 10:58:02,776 INFO (MainThread-10899) waiting for 1 reservations
2017-07-12 10:58:03,778 INFO (MainThread-10899) waiting for 1 reservations
2017-07-12 10:58:04,780 INFO (MainThread-10899) waiting for 1 reservations
2017-07-12 10:58:05,782 INFO (MainThread-10899) waiting for 1 reservations
2017-07-12 10:58:06,784 INFO (MainThread-10899) waiting for 1 reservations
2017-07-12 10:58:07,785 INFO (MainThread-10899) waiting for 1 reservations
2017-07-12 10:58:08,787 INFO (MainThread-10899) waiting for 1 reservations
2017-07-12 10:58:09,788 INFO (MainThread-10899) waiting for 1 reservations
2017-07-12 10:58:10,788 INFO (MainThread-10899) waiting for 1 reservations
2017-07-12 10:58:11,790 INFO (MainThread-10899) waiting for 1 reservations
2017-07-12 10:58:12,792 INFO (MainThread-10899) waiting for 1 reservations
2017-07-12 10:58:13,794 INFO (MainThread-10899) waiting for 1 reservations
2017-07-12 10:58:14,796 INFO (MainThread-10899) waiting for 1 reservations
2017-07-12 10:58:15,797 INFO (MainThread-10899) waiting for 1 reservations
2017-07-12 10:58:16,799 INFO (MainThread-10899) waiting for 1 reservations
2017-07-12 10:58:17,800 INFO (MainThread-10899) waiting for 1 reservations
2017-07-12 10:58:18,802 INFO (MainThread-10899) waiting for 1 reservations
2017-07-12 10:58:19,805 INFO (MainThread-10899) waiting for 1 reservations
2017-07-12 10:58:20,806 INFO (MainThread-10899) waiting for 1 reservations
2017-07-12 10:58:21,808 INFO (MainThread-10899) waiting for 1 reservations
2017-07-12 10:58:22,810 INFO (MainThread-10899) waiting for 1 reservations
2017-07-12 10:58:23,812 INFO (MainThread-10899) waiting for 1 reservations
2017-07-12 10:58:24,814 INFO (MainThread-10899) all reservations completed
2017-07-12 10:58:24,814 INFO (MainThread-10899) All TFSparkNodes started
2017-07-12 10:58:24,814 INFO (MainThread-10899) {'addr': ('master', 34991), 'task_index': 0, 'port': 40163, 'authkey': '\xd6\x89\xa7\xb1\xd6\x87D\xc0\x92\xc0\xb2EN>da', 'worker_num': 0, 'host': 'master', 'ppid': 11038, 'job_name': 'ps', 'tb_pid': 0, 'tb_port': 0}
2017-07-12 10:58:24,814 INFO (MainThread-10899) {'addr': '/tmp/pymp-7ldRzD/listener-AiMAbL', 'task_index': 0, 'port': 46745, 'authkey': '\x9e\xe6o5\x13\xe3L\x11\xb4\xb6\xb5_S\x98BD', 'worker_num': 1, 'host': 'slave02', 'ppid': 9134, 'job_name': 'worker', 'tb_pid': 0, 'tb_port': 0}
2017-07-12 10:58:24,814 INFO (MainThread-10899) {'addr': '/tmp/pymp-r3AWxR/listener-mceQPy', 'task_index': 1, 'port': 45309, 'authkey': '\xe0\x1dc}\xa5\xd9F\xc4\x892mX\x8d\xa5d\xc2', 'worker_num': 2, 'host': 'slave01', 'ppid': 10749, 'job_name': 'worker', 'tb_pid': 0, 'tb_port': 0}
2017-07-12 10:58:24,814 INFO (MainThread-10899) Feeding training data
17/07/12 10:58:26 INFO spark.SparkContext: Starting job: collect at PythonRDD.scala:405
17/07/12 10:58:27 INFO scheduler.DAGScheduler: Got job 1 (collect at PythonRDD.scala:405) with 10 output partitions
17/07/12 10:58:27 INFO scheduler.DAGScheduler: Final stage: ResultStage 1 (collect at PythonRDD.scala:405)
17/07/12 10:58:27 INFO scheduler.DAGScheduler: Parents of final stage: List()
17/07/12 10:58:27 INFO scheduler.DAGScheduler: Missing parents: List()
17/07/12 10:58:27 INFO scheduler.DAGScheduler: Submitting ResultStage 1 (PythonRDD[10] at RDD at PythonRDD.scala:43), which has no missing parents
17/07/12 10:58:27 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 56764 ms on slave02 (1/3)
17/07/12 10:58:27 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 48675 ms on slave01 (2/3)
17/07/12 10:58:27 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 12.3 KB, free 494.9 KB)
17/07/12 10:58:27 INFO storage.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 6.7 KB, free 501.6 KB)
17/07/12 10:58:27 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.147.134:36618 (size: 6.7 KB, free: 511.1 MB)
17/07/12 10:58:27 INFO spark.SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1006
17/07/12 10:58:27 INFO scheduler.DAGScheduler: Submitting 10 missing tasks from ResultStage 1 (PythonRDD[10] at RDD at PythonRDD.scala:43)
17/07/12 10:58:27 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 10 tasks
17/07/12 10:58:29 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 3, slave01, partition 0,NODE_LOCAL, 2811 bytes)
17/07/12 10:58:29 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 4, slave02, partition 1,NODE_LOCAL, 2811 bytes)
17/07/12 10:58:33 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on slave02:32867 (size: 6.7 KB, free: 517.4 MB)
17/07/12 10:58:37 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on slave02:32867 (size: 19.5 KB, free: 517.4 MB)
17/07/12 10:58:40 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on slave01:44388 (size: 6.7 KB, free: 517.4 MB)
17/07/12 10:58:45 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on slave01:44388 (size: 19.5 KB, free: 517.4 MB)
17/07/12 11:00:34 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on slave01:44388 (size: 19.5 KB, free: 517.4 MB)
17/07/12 11:00:38 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on slave02:32867 (size: 19.5 KB, free: 517.4 MB)

TensorFlowOnSpark with YARN & Python 3

Good morning,

I am following the guide to install TFoS on a Hadoop cluster. The first step says it is necessary to install Python 2.7. Is it possible to do this with Python 3?

Thank you in advance.

Run MNIST example on Hadoop Cluster

Hi, I got an error when I ran the MNIST example on a Hadoop cluster:

export PYTHON_ROOT=/home/hduser/workspace/spark/python
export LD_LIBRARY_PATH=${PATH}
export PYSPARK_PYTHON=${PYTHON_ROOT}/bin/python
export SPARK_YARN_USER_ENV="PYSPARK_PYTHON=Python/bin/python"
export PATH=${PYTHON_ROOT}/bin/:$PATH
export QUEUE=default

export HADOOP_HDFS_HOME=/opt/hadoop-2.6.0/share/hadoop/hdfs

/opt/spark-2.1.0-bin-hadoop2.6/bin/spark-submit \
--master yarn \
--queue ${QUEUE} \
--archives hdfs:///user/${USER}/Python.zip#Python,/home/hduser/workspace/spark/mnist/mnist.zip#mnist \
/home/hduser/workspace/TensorFlowOnSpark/examples/mnist/mnist_data_setup.py \
--output /home/hduser/workspace/spark/mnist/csv \
--format csv

error:

17/04/10 13:16:38 INFO client.RMProxy: Connecting to ResourceManager at student27-x1/10.42.0.100:8032
Exception in thread "main" java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.Hdfs not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074)
	at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:151)
	at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:242)
	at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:334)
	at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:331)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
	at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:331)
	at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:448)
	at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:470)
	at org.apache.hadoop.yarn.logaggregation.LogCLIHelpers.dumpAllContainersLogs(LogCLIHelpers.java:146)
	at org.apache.hadoop.yarn.client.cli.LogsCLI.run(LogsCLI.java:133)
	at org.apache.hadoop.yarn.client.cli.LogsCLI.main(LogsCLI.java:186)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.Hdfs not found
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1980)
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2072)
	... 13 more

When I add:

export HADOOP_COMMON_HOME=/opt/hadoop-2.6.0/
export HADOOP_YARN_HOME=/opt/hadoop-2.6.0/share/hadoop/yarn

I got an error:
Error: Could not find or load main class org.apache.hadoop.yarn.client.cli.LogsCLI

I would like to know how to solve this problem.

Thanks

setting up TensorFlowOnSpark on AWS EMR

Hello,
I'm trying to follow the steps on "GetStarted_YARN" to set up TensorFlowOnSpark on AWS EMR (master:c3.2xlarge, core:m3.xlarge). My small cluster is EMR-5.5.0.

I was not able to get through "pip install pydoop". I've tried manually setting "JAVA_HOME" and "HADOOP_HOME" and troubleshooting pydoop without success. It seemed that pydoop was missing expected environment variables like "HADOOP_HOME", "HADOOP_VERSION", etc., and solving one issue just led to another. For the very first step, it is discouraging. Am I on the wrong path? What am I missing? Thanks very much in advance!

Cifar10: ArgumentError: argument --batch_size: conflicting option string(s): --batch_size

Hi, I can run the MNIST example properly on a Hadoop cluster (without GPUs). However, when I run the Cifar10 example, I get an error. Is the shell script below correct? Does this example support CPU mode? Should I modify the batch size?

Thanks and regards

export PYTHON_ROOT=/home/hduser/Python
export LD_LIBRARY_PATH=${PATH}
export PYSPARK_PYTHON=${PYTHON_ROOT}/bin/python
export SPARK_YARN_USER_ENV="PYSPARK_PYTHON=Python/bin/python"
export PATH=${PYTHON_ROOT}/bin/:$PATH
export QUEUE=default

/opt/spark-2.1.0-bin-hadoop2.6/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${QUEUE} \
--py-files TensorFlowOnSpark/tfspark.zip,cifar10.zip \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
--archives hdfs:///user/${USER}/Python.zip#Python \
--conf spark.executorEnv.LD_LIBRARY_PATH="$JAVA_HOME/jre/lib/amd64/server" \
TensorFlowOnSpark/examples/cifar10/cifar10_multi_gpu_train.py \
--data_dir hdfs:///user/${USER}/cifar10_data \
--train_dir hdfs:///user/${USER}/cifar10_train \
--max_steps 1000

Job aborted due to stage failure: Task 4 in stage 0.0 failed 4 times, most recent failure: Lost task 4.3 in stage 0.0 (TID 20, student28-x2, executor 7): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/var/hadoop/hadoop-hduser/nm-local-dir/usercache/hduser/appcache/application_1491809547772_0061/container_1491809547772_0061_01_000008/pyspark.zip/pyspark/worker.py", line 174, in main
process()
File "/var/hadoop/hadoop-hduser/nm-local-dir/usercache/hduser/appcache/application_1491809547772_0061/container_1491809547772_0061_01_000008/pyspark.zip/pyspark/worker.py", line 169, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/var/hadoop/hadoop-hduser/nm-local-dir/usercache/hduser/appcache/application_1491809547772_0061/container_1491809547772_0061_01_000001/pyspark.zip/pyspark/rdd.py", line 2407, in pipeline_func
File "/var/hadoop/hadoop-hduser/nm-local-dir/usercache/hduser/appcache/application_1491809547772_0061/container_1491809547772_0061_01_000001/pyspark.zip/pyspark/rdd.py", line 2407, in pipeline_func
File "/var/hadoop/hadoop-hduser/nm-local-dir/usercache/hduser/appcache/application_1491809547772_0061/container_1491809547772_0061_01_000001/pyspark.zip/pyspark/rdd.py", line 2407, in pipeline_func
File "/var/hadoop/hadoop-hduser/nm-local-dir/usercache/hduser/appcache/application_1491809547772_0061/container_1491809547772_0061_01_000001/pyspark.zip/pyspark/rdd.py", line 346, in func
File "/var/hadoop/hadoop-hduser/nm-local-dir/usercache/hduser/appcache/application_1491809547772_0061/container_1491809547772_0061_01_000001/pyspark.zip/pyspark/rdd.py", line 794, in func
File "/var/hadoop/hadoop-hduser/nm-local-dir/usercache/hduser/appcache/application_1491809547772_0061/container_1491809547772_0061_01_000001/tfspark.zip/com/yahoo/ml/tf/TFSparkNode.py", line 411, in _mapfn
File "cifar10_multi_gpu_train.py", line 57, in main_fun
File "./cifar10.zip/cifar10.py", line 53, in
File "/var/hadoop/hadoop-hduser/nm-local-dir/usercache/hduser/appcache/application_1491809547772_0061/container_1491809547772_0061_01_000008/Python/local/lib/python2.7/site-packages/tensorflow/python/platform/flags.py", line 91, in DEFINE_integer
_define_helper(flag_name, default_value, docstring, int)
File "/var/hadoop/hadoop-hduser/nm-local-dir/usercache/hduser/appcache/application_1491809547772_0061/container_1491809547772_0061_01_000008/Python/local/lib/python2.7/site-packages/tensorflow/python/platform/flags.py", line 65, in _define_helper
type=flagtype)
File "/usr/lib/python2.7/argparse.py", line 1297, in add_argument
return self._add_action(action)
File "/usr/lib/python2.7/argparse.py", line 1671, in _add_action
self._optionals._add_action(action)
File "/usr/lib/python2.7/argparse.py", line 1498, in _add_action
action = super(_ArgumentGroup, self)._add_action(action)
File "/usr/lib/python2.7/argparse.py", line 1311, in _add_action
self._check_conflict(action)
File "/usr/lib/python2.7/argparse.py", line 1449, in _check_conflict
conflict_handler(action, confl_optionals)
File "/usr/lib/python2.7/argparse.py", line 1456, in _handle_conflict_error
raise ArgumentError(action, message % conflict_string)
ArgumentError: argument --batch_size: conflicting option string(s): --batch_size
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
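For reference, the ArgumentError at the bottom of this trace is plain argparse behaviour when the same flag is registered twice in one process, which is presumably what happens when the module-level tf.app.flags definitions in cifar10.py get executed more than once on an executor. A minimal standalone reproduction (not taken from the example code):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--batch_size', type=int, default=128)

# Registering the same option string again in the same parser raises the same
# "conflicting option string(s): --batch_size" error seen in the trace above.
try:
    parser.add_argument('--batch_size', type=int, default=128)
except argparse.ArgumentError as e:
    print(e)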

OnYarn Cannot run program "/etc/spark/conf.cloudera.spark_on_yarn/yarn-conf/topology.py" error=2, No such file or directory

Issue log:
17/03/18 09:19:45 WARN net.ScriptBasedMapping: Exception running /etc/spark/conf.cloudera.spark_on_yarn/yarn-conf/topology.py 192.168.116.11
java.io.IOException: Cannot run program "/etc/spark/conf.cloudera.spark_on_yarn/yarn-conf/topology.py" (in directory "/yarn/nm/usercache/hdfs/appcache/application_1489725206685_0079/container_1489725206685_0079_01_000001"): error=2, No such file or directory
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1047)
at org.apache.hadoop.util.Shell.runCommand(Shell.java:548)
at org.apache.hadoop.util.Shell.run(Shell.java:504)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:786)
at org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.runResolveCommand(ScriptBasedMapping.java:251)
at org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.resolve(ScriptBasedMapping.java:188)
at org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(CachedDNSToSwitchMapping.java:119)
at org.apache.hadoop.yarn.util.RackResolver.coreResolve(RackResolver.java:101)
at org.apache.hadoop.yarn.util.RackResolver.resolve(RackResolver.java:81)
at org.apache.spark.scheduler.cluster.YarnScheduler.getRackForHost(YarnScheduler.scala:38)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$1.apply(TaskSchedulerImpl.scala:310)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$1.apply(TaskSchedulerImpl.scala:299)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:299)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:207)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receiveAndReply$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:172)
at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:104)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:217)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: error=2, No such file or directory

KeyError: 'input' error

When I run TensorFlow on Spark 2.1, I see the following error in the logs:

Traceback (most recent call last):
  File "/usr/lib64/python2.7/multiprocessing/managers.py", line 207, in handle_request
    result = func(c, *args, **kwds)
  File "/usr/lib64/python2.7/multiprocessing/managers.py", line 386, in create
    obj = callable(*args, **kwds)
  File "./tfspark.zip/com/yahoo/ml/tf/TFManager.py", line 29, in <lambda>
    TFManager.register('get_queue', callable=lambda qname: qdict[qname])
KeyError: 'input'
---------------------------------------------------------------------------

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
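For what it's worth, the KeyError itself is just a dictionary lookup failing inside the registered callable shown in the Python traceback: the executor that received this task has no queue named 'input'. A tiny standalone illustration of that mechanism (the dict contents are illustrative, not the TFoS internals):

# The manager hands out queues from a plain dict keyed by name; asking for a name
# that was never created on this node fails exactly like the log above.
qdict = {'output': []}

get_queue = lambda qname: qdict[qname]

try:
    get_queue('input')
except KeyError as err:
    print('KeyError:', err)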

Slim throws "all Variables must have their device set: name: global_step" after upgrading TensorFlow to v1.0.0

When running the Slim example, TensorFlow v0.12.0 works fine, but TensorFlow v1.0.0 throws an exception:

org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/YSPARK-2.2.0-2.6.0.d20170305-11.04.04-success_flag-6b0cfd9fa51aca4536d7c3f2a4bbceae11a50339/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 180, in main
    process()
  File "/opt/cloudera/parcels/YSPARK-2.2.0-2.6.0.d20170305-11.04.04-success_flag-6b0cfd9fa51aca4536d7c3f2a4bbceae11a50339/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 175, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/YSPARK-2.2.0-2.6.0.d20170305-11.04.04-success_flag-6b0cfd9fa51aca4536d7c3f2a4bbceae11a50339/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2407, in pipeline_func
  File "/opt/cloudera/parcels/YSPARK-2.2.0-2.6.0.d20170305-11.04.04-success_flag-6b0cfd9fa51aca4536d7c3f2a4bbceae11a50339/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2407, in pipeline_func
  File "/opt/cloudera/parcels/YSPARK-2.2.0-2.6.0.d20170305-11.04.04-success_flag-6b0cfd9fa51aca4536d7c3f2a4bbceae11a50339/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2407, in pipeline_func
  File "/opt/cloudera/parcels/YSPARK-2.2.0-2.6.0.d20170305-11.04.04-success_flag-6b0cfd9fa51aca4536d7c3f2a4bbceae11a50339/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 346, in func
  File "/opt/cloudera/parcels/YSPARK-2.2.0-2.6.0.d20170305-11.04.04-success_flag-6b0cfd9fa51aca4536d7c3f2a4bbceae11a50339/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 794, in func
  File "/data/M00/tandem/wangyuming/TensorFlowOnSpark/tfspark.zip/com/yahoo/ml/tf/TFSparkNode.py", line 235, in _mapfn
  File "/data/M00/tandem/wangyuming/TensorFlowOnSpark/examples/slim/train_image_classifier.py", line 603, in main_fun
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/tensorflow/contrib/slim/python/slim/learning.py", line 715, in train
    init_fn=init_fn)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/tensorflow/python/training/supervisor.py", line 336, in __init__
    self._verify_setup()
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/tensorflow/python/training/supervisor.py", line 881, in _verify_setup
    "their device set: %s" % op)
ValueError: When using replicas, all Variables must have their device set: name: "global_step"
op: "VariableV2"
attr {
  key: "_class"
  value {
    list {
      s: "loc:@global_step"
    }
  }
}
attr {
  key: "container"
  value {
    s: ""
  }
}
attr {
  key: "dtype"
  value {
    type: DT_INT64
  }
}
attr {
  key: "shape"
  value {
    shape {
    }
  }
}
attr {
  key: "shared_name"
  value {
    s: ""
  }
}


	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:317)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

To run Slim on TensorFlow v1.0.0, I used tf_upgrade.py to update some code:
https://github.com/wangyum/TensorFlowOnSpark/tree/slim-tensorflow-1.0

Nvidia library/driver mismatch

Hello,

Using ami-f6d25596, the NVIDIA library is version 367 and "nvidia-smi" works properly. After a while the NVIDIA library becomes nvidia-375 and I receive the message "Failed to initialize NVML: Driver/library version mismatch".
There is some process updating the library that I couldn't find.

After restoring the 367 version and rebooting the instance, it works again.

Synchronous RNN using SyncReplicasOptimizer

I'm trying to implement a synchronous distributed Recurrent Neural Network using TensorFlow on multiple servers. Here's the link to my code: https://github.com/tushar00jain/spark-ml/blob/master/rnn-sync.ipynb. I've also provided the relevant part below.

I want the computations within the same batch to happen in parallel but I think it's still computing separate RNNs on each worker server and updating the parameters on the parameter server separately. I know this because I am printing the _current_state variable after I run the graph for each batch. Also, the _total_loss for the same global step is different on each worker server.

I'm following the instructions provided at the following links: https://www.tensorflow.org/deploy/distributed#replicated_training https://www.tensorflow.org/api_docs/python/tf/train/SyncReplicasOptimizer

Is this a bug or is there something wrong with my code?

sess = sv.prepare_or_wait_for_session(server.target)
queue_runners = tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS)
sv.start_queue_runners(sess, queue_runners)

tf.logging.info('Started %d queues for processing input data.',
                len(queue_runners))

if is_chief:
        sv.start_queue_runners(sess, chief_queue_runners)
        sess.run(init_tokens_op)

print("{0} session ready".format(datetime.now().isoformat()))
#####################################################################

########################### training loop ###########################
_current_state = np.zeros((batch_size, state_size))
for batch_idx in range(args.steps):
    if sv.should_stop() or tf_feed.should_stop():
        break

    batchX, batchY = feed_dict(tf_feed.next_batch(batch_size))

    print('==========================================================')
    print(_current_state)

    if args.mode == "train":
        _total_loss, _train_step, _current_state, _predictions_series, _global_step = sess.run(
        [total_loss, train_step, current_state, predictions_series, global_step],
        feed_dict={
            batchX_placeholder:batchX,
            batchY_placeholder:batchY,
            init_state:_current_state
        })

        print(_global_step, batch_idx)
        print(_current_state)
        print('==========================================================')

        if _global_step % 5 == 0:
            print("Step", _global_step, "Loss", _total_loss)  

Help please: I get error trying to feed data via spark RDD - Jupyter notebook

I am running TFOS_spark_demo.ipynb on a Standalone 6-node Spark cluster.
#Feed data via Spark RDD
images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])
labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
dataRDD = images.zip(labels)
cluster.train(dataRDD, args.epochs)

Error Below:

Py4JJavaError Traceback (most recent call last)
in ()
3 labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
4 dataRDD = images.zip(labels)
----> 5 cluster.train(dataRDD, args.epochs)

/usr/lib/python2.7/site-packages/tensorflowonspark/TFCluster.pyc in train(self, dataRDD, num_epochs, qname)
83 rdds.append(dataRDD)
84 unionRDD = self.sc.union(rdds)
---> 85 unionRDD.foreachPartition(TFSparkNode.train(self.cluster_info, self.cluster_meta, qname))
86
87 def inference(self, dataRDD, qname='input'):

/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.pyc in foreachPartition(self, f)
765 except TypeError:
766 return iter([])
--> 767 self.mapPartitions(func).count() # Force evaluation
768
769 def collect(self):

/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.pyc in count(self)
1006 3
1007 """
-> 1008 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
1009
1010 def stats(self):

/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.pyc in sum(self)
997 6.0
998 """
--> 999 return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
1000
1001 def count(self):

/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.pyc in fold(self, zeroValue, op)
871 # zeroValue provided to each partition is unique from the one provided
872 # to the final reduce call
--> 873 vals = self.mapPartitions(func).collect()
874 return reduce(op, vals, zeroValue)
875

/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.pyc in collect(self)
774 """
775 with SCCallSiteSync(self.context) as css:
--> 776 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
777 return list(_load_from_socket(port, self._jrdd_deserializer))
778

/opt/mapr/spark/spark-2.0.1/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py in call(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:

/opt/mapr/spark/spark-2.0.1/python/pyspark/sql/utils.pyc in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()

/opt/mapr/spark/spark-2.0.1/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 44 in stage 1.0 failed 4 times, most recent failure: Lost task 44.3 in stage 1.0 (TID 96, 172.16.10.54): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/mapr/spark/spark-2.0.1/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
process()
File "/opt/mapr/spark/spark-2.0.1/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.py", line 2371, in pipeline_func
return func(split, prev_func(split, iterator))
File "/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.py", line 2371, in pipeline_func
return func(split, prev_func(split, iterator))
File "/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.py", line 2371, in pipeline_func
return func(split, prev_func(split, iterator))
File "/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.py", line 317, in func
return f(iterator)
File "/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.py", line 762, in func
r = f(it)
File "/usr/lib/python2.7/site-packages/tensorflowonspark/TFSparkNode.py", line 432, in _train
mgr = _get_manager(cluster_info, socket.gethostname(), os.getppid())
File "/usr/lib/python2.7/site-packages/tensorflowonspark/TFSparkNode.py", line 65, in _get_manager
logging.info("Connected to TFSparkNode.mgr on {0}, ppid={1}, state={2}".format(host, ppid, str(TFSparkNode.mgr.get('state'))))
AttributeError: 'NoneType' object has no attribute 'get'

at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1893)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1906)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1933)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.collect(RDD.scala:911)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/mapr/spark/spark-2.0.1/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
process()
File "/opt/mapr/spark/spark-2.0.1/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.py", line 2371, in pipeline_func
return func(split, prev_func(split, iterator))
File "/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.py", line 2371, in pipeline_func
return func(split, prev_func(split, iterator))
File "/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.py", line 2371, in pipeline_func
return func(split, prev_func(split, iterator))
File "/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.py", line 317, in func
return f(iterator)
File "/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.py", line 762, in func
r = f(it)
File "/usr/lib/python2.7/site-packages/tensorflowonspark/TFSparkNode.py", line 432, in _train
mgr = _get_manager(cluster_info, socket.gethostname(), os.getppid())
File "/usr/lib/python2.7/site-packages/tensorflowonspark/TFSparkNode.py", line 65, in _get_manager
logging.info("Connected to TFSparkNode.mgr on {0}, ppid={1}, state={2}".format(host, ppid, str(TFSparkNode.mgr.get('state'))))
AttributeError: 'NoneType' object has no attribute 'get'

at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
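For context, cluster.train() only reaches executors that previously started a TFSparkNode via TFCluster.run, so the feed step depends on the earlier setup cell. A rough sketch of the flow the demo notebook is expected to follow (sc, args and dataRDD are the notebook's existing objects; cluster_size, num_ps and the mnist_dist module are assumptions):

from tensorflowonspark import TFCluster
import mnist_dist  # the MNIST example's map_fun lives here

cluster = TFCluster.run(sc, mnist_dist.map_fun, args, args.cluster_size,
                        num_ps=1, tensorboard=False,
                        input_mode=TFCluster.InputMode.SPARK)
cluster.train(dataRDD, args.epochs)  # feeds the RDD into the running TF nodes
cluster.shutdown()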

CIFAR multi-gpu example not actually utilizing GPUs

I am trying to run the multi-GPU CIFAR example on a YARN cluster with InfiniBand. I have followed the instructions as provided. I execute the CNN on Spark using the following command:

export NUM_GPU=2
export MEMORY=$((NUM_GPU * 11))
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 3 \
--executor-memory ${MEMORY}G \
--py-files ${TFoS_HOME}/tfspark.zip,cifar10.zip \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
--archives hdfs:///Python.zip#Python \
--conf spark.executorEnv.LD_LIBRARY_PATH="/usr/local/cuda/lib64:$JAVA_HOME/jre/lib/amd64/server" \
--driver-library-path="/usr/local/cuda/lib64" \
${TFoS_HOME}/examples/cifar10/cifar10_multi_gpu_train.py \
--data_dir ${CIFAR10_DATA} \
--train_dir hdfs:///cifar10_train \
--max_steps 1000 \
--num_gpus ${NUM_GPU} \
--rdma \
--tensorboard

I have NVIDIA Tesla K80 GPUs installed on each node. Each node has two GPUs, and each GPU has approximately 11 GB of memory, which is how I am calculating the memory proxy for GPUs for YARN.

In order to check how much (if at all) the GPUs are utilized on each node, I run the nvidia-smi command, but surprisingly none of the nodes show any GPU process running. The output of this command on one of the NodeManagers is:


> +-----------------------------------------------------------------------------+
> | NVIDIA-SMI 367.48                 Driver Version: 367.48                    |
> |-------------------------------+----------------------+----------------------+
> | GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
> | Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
> |===============================+======================+======================|
> |   0  Tesla K80           Off  | 0000:05:00.0     Off |                    0 |
> | N/A   48C    P0    60W / 149W |      0MiB / 11439MiB |      0%      Default |
> +-------------------------------+----------------------+----------------------+
> |   1  Tesla K80           Off  | 0000:06:00.0     Off |                    0 |
> | N/A   36C    P0    76W / 149W |      0MiB / 11439MiB |    100%      Default |
> +-------------------------------+----------------------+----------------------+
>                                                                                
> +-----------------------------------------------------------------------------+
> | Processes:                                                       GPU Memory |
> |  GPU       PID  Type  Process name                               Usage      |
> |=============================================================================|
> |  No running processes found                                                 |
> +-----------------------------------------------------------------------------+

When I run ps aux, I see the relevant Python daemons running on each of the NodeManager nodes. The top command also shows Python consuming quite a lot of CPU cycles.

top - 01:19:04 up 89 days, 10:21,  1 user,  load average: 3.07, 3.29, 2.56
Tasks: 555 total,   2 running, 553 sleeping,   0 stopped,   0 zombie
%Cpu(s):  1.1 us,  0.1 sy,  0.0 ni, 98.9 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem : 13173779+total, 10681584+free,  3698244 used, 21223708 buff/cache
KiB Swap:  3898308 total,  3835176 free,    63132 used. 12656761+avail Mem 

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND                                                                 
31404 javed.19  20   0 1425744  37624   6628 R  93.3  0.0  21:41.69 python                                                                  
31362 javed.19  20   0 2184100 685188  32024 S  26.7  0.5   6:25.82 java                                                                    
    1 root      20   0   41368   3672   2260 S   0.0  0.0   2:42.49 systemd                                                                 
    2 root      20   0       0      0      0 S   0.0  0.0   0:02.13 kthreadd                                                                
    3 root      20   0       0      0      0 S   0.0  0.0 146:11.03 ksoftirqd/0                                                             
    5 root       0 -20       0      0      0 S   0.0  0.0   0:00.00 kworker/0:0H                                                            
    8 root      rt   0       0      0      0 S   0.0  0.0   3:02.66 migration/0  

What do you guys think is happening here? I would really appreciate any help.

Problem when determining `job_name` and `task_index` for each Spark worker

A problem occurs when running the following lines in TFSparkNode.py:

        # assign TF job/task based on provided cluster_spec template (or use default/null values)
        job_name = 'default'
        task_index = -1
        for jobtype in cluster_spec:
            nodes = cluster_spec[jobtype]
            if worker_num in nodes:
                job_name = jobtype
                task_index = nodes.index(worker_num)
                break

Assume that I have a cluster with 5 Spark workers: 1 non-GPU worker and 4 GPU workers. I need the non-GPU worker to run the ps TensorFlow job, but Spark may allocate that role to a GPU worker, which in turn means a worker TensorFlow job ends up running on the non-GPU machine.
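
For illustration, here is a minimal standalone sketch (not TFoS code; the cluster_spec values are made up) of how the quoted loop maps a worker_num to a (job_name, task_index) pair. It shows that the role an executor receives depends only on the worker number it happens to get, not on the hardware of the machine:

# Minimal standalone sketch (not TFoS code) of the assignment logic quoted above.
cluster_spec = {'ps': [0], 'worker': [1, 2, 3, 4]}   # made-up template

def assign(worker_num, cluster_spec):
    job_name = 'default'
    task_index = -1
    for jobtype in cluster_spec:
        nodes = cluster_spec[jobtype]
        if worker_num in nodes:
            job_name = jobtype
            task_index = nodes.index(worker_num)
            break
    return job_name, task_index

for n in range(5):
    print(n, assign(n, cluster_spec))
# 0 ('ps', 0)
# 1 ('worker', 0)
# 2 ('worker', 1)
# 3 ('worker', 2)
# 4 ('worker', 3)

Whichever executor ends up with worker_num 0 becomes the ps node, so, as described above, Spark's scheduling decides which physical machine gets the ps role.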

MNIST training on TFoS takes much longer than on native TensorFlow

I have a Spark cluster where each node has 64 cores and 256 GB of RAM. I tried running MNIST training with TFoS and with TF only, and the results were not what I expected.

The training on TFoS took 25m53s with 100 epochs; here is the script:

spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 3 \
--executor-memory 32G \
--py-files TensorFlowOnSpark/tfspark.zip,TensorFlowOnSpark/examples/mnist/spark/mnist_dist.py \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
--archives hdfs:///user/spark/Python.zip#Python \
TensorFlowOnSpark/examples/mnist/spark/mnist_spark.py \
--images mnist/csv/train/images \
--labels mnist/csv/train/labels \
--mode train \
--model mnist_model \
--epochs 10


However, training with TF only took just 834s for 1000 epochs.

I haven't had a chance to investigate TFoS in depth. Is the performance related to the training data size or some other factors?

TensorFlowOnSpark example hangs in standalone cluster mode

Hello, thanks for your work.

I try to execute "Run distributed MNIST training (using feed_dict)" but the execution hangs. I don't know why.

  • The TensorFlow example (python ${TFoS_HOME}/tensorflow/tensorflow/examples/tutorials/mnist/mnist_with_summaries.py --data_dir ${TFoS_HOME}/mnist) is OK, with roughly 97% accuracy.
  • Example "Convert the MNIST zip files" also ok

Finally, the MNIST training hangs. Could you help me please? I am trying this on a single computer.
Xubuntu 16.04.2.LTS
CUDA 8.0 with CUDADNN 5.1.5
Spark 1.6.0
Python 2.7
Tensorflow-GPU 0.12

My variables:
export TFoS_HOME=/home/roberto/TensorFlowOnSpark
export SPARK_HOME=/home/roberto/TensorFlowOnSpark/scripts/spark-1.6.0-bin-hadoop2.6
export PATH=${SPARK_HOME}/bin:${PATH}
export MASTER=spark://$(hostname):7077
export SPARK_WORKER_INSTANCES=2
export CORES_PER_WORKER=1
export TOTAL_CORES=$((${CORES_PER_WORKER}*${SPARK_WORKER_INSTANCES}))

I attach an image of the Spark master and the execution; I also attach the Spark execution output file.
photo_2017-05-07_13-40-48
photo_2017-05-07_13-40-54

ouput.txt
Thank you very much.

cifar10: Failed to find file

Traceback (most recent call last):
File "/opt/modules/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
process()
File "/opt/modules/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/modules/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 2407, in pipeline_func
File "/opt/modules/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 2407, in pipeline_func
File "/opt/modules/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 2407, in pipeline_func
File "/opt/modules/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 2407, in pipeline_func
File "/opt/modules/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 346, in func
File "/opt/modules/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 794, in func
File "/opt/modules/TensorFlowOnSpark/tfspark.zip/tensorflowonspark/TFSparkNode.py", line 421, in _mapfn
File "/opt/modules/TensorFlowOnSpark/examples/cifar10/cifar10_multi_gpu_train.py", line 273, in main_fun
train()
File "/opt/modules/TensorFlowOnSpark/examples/cifar10/cifar10_multi_gpu_train.py", line 180, in train
loss = tower_loss(scope)
File "/opt/modules/TensorFlowOnSpark/examples/cifar10/cifar10_multi_gpu_train.py", line 83, in tower_loss
images, labels = cifar10.distorted_inputs()
File "./cifar10.zip/cifar10.py", line 156, in distorted_inputs
batch_size=FLAGS.batch_size)
File "./cifar10.zip/cifar10_input.py", line 155, in distorted_inputs
raise ValueError('Failed to find file: ' + f)
ValueError: Failed to find file: hdfs://XXXX/examples/cifar-10/cifar-10-batches-bin/data_batch_1.bin

However, HDFS does have this file.

TFoS can't export model

I appended some code to the mnist_dist.py file, like this:

while not sv.should_stop() and step < args.steps:
   # Run a training step asynchronously.
   # See `tf.train.SyncReplicasOptimizer` for additional details on how to
   # perform *synchronous* training.

   # using feed_dict
   batch_xs, batch_ys = feed_dict()
   feed = {x: batch_xs, y_: batch_ys}

   if len(batch_xs) != batch_size:
       print("done feeding")
       break
   else:
       if args.mode == "train":
           _, step = sess.run([train_op, global_step], feed_dict=feed)
           # print accuracy and save model checkpoint to HDFS every 100 steps
           if (step % 100 == 0):
               print("{0} step: {1} accuracy: {2}".format(datetime.now().isoformat(), step,
                                                          sess.run(accuracy,
                                                                   {x: batch_xs, y_: batch_ys})))
       else:  # args.mode == "inference"
           labels, preds, acc = sess.run([label, prediction, accuracy], feed_dict=feed)

           results = ["{0} Label: {1}, Prediction: {2}".format(datetime.now().isoformat(), l, p) for
                      l, p in zip(labels, preds)]
           TFNode.batch_results(ctx.mgr, results)
           print("acc: {0}".format(acc))

 if sv.is_chief:
   print("save model to:{}=======1".format(local_model_dir))
   sess.graph._unsafe_unfinalize()
   classification_inputs = utils.build_tensor_info(x)
   classification_outputs_classes = utils.build_tensor_info(y)
   print ('begin exporting!======11111')
   classification_signature = signature_def_utils.build_signature_def(
       inputs={signature_constants.CLASSIFY_INPUTS: classification_inputs},
       outputs={
           signature_constants.CLASSIFY_OUTPUT_CLASSES:
               classification_outputs_classes
       },
       method_name=signature_constants.CLASSIFY_METHOD_NAME)
   print ('begin exporting!======22222')
   tensor_info_x = utils.build_tensor_info(x)
   tensor_info_y = utils.build_tensor_info(y)
   print ('begin exporting!======33333')
   prediction_signature = signature_def_utils.build_signature_def(
       inputs={'images': tensor_info_x},
       outputs={'scores': tensor_info_y},
       method_name=signature_constants.PREDICT_METHOD_NAME)
   print ('begin exporting!======44444')
   legacy_init_op = tf.group(tf.tables_initializer(), name='legacy_init_op')
   print ('begin exporting!======55555')
   builder = saved_model_builder.SavedModelBuilder(local_model_dir)
   print('begin exporting!======66666')
   builder.add_meta_graph_and_variables(
       sess, [tag_constants.SERVING],
       signature_def_map={
           'predict_images':
               prediction_signature,
           signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY:
               classification_signature,
       },clear_devices=True,legacy_init_op=legacy_init_op)
   print ('begin exporting!======77777')
   builder.save()

but the chief worker only executes up to the print('begin exporting!======66666') line; the task ends without executing the rest of the code.

The chief worker log is as below:

  save model to:/data/mnist/1/0=======1
  begin exporting!======11111
  begin exporting!======22222
  begin exporting!======33333
  begin exporting!======44444
  INFO:tensorflow:No assets to save.
  2017-04-07 16:31:23,479 INFO (Thread-1-30287) No assets to save.
  INFO:tensorflow:No assets to write.
  2017-04-07 16:31:23,479 INFO (Thread-1-30287) No assets to write.
  begin exporting!======55555
  begin exporting!======66666
  2017-04-07 16:31:23,520 INFO (MainThread-30287) Feeding None into output queue
  2017-04-07 16:31:23,528 INFO (MainThread-30287) Setting mgr.state to 'stopped'
  17/04/07 16:31:23 INFO python.PythonRunner: Times: total = 123, boot = -28156, init = 28200, finish = 79
  17/04/07 16:31:23 INFO executor.Executor: Finished task 4.0 in stage 3.0 (TID 36). 2102 bytes result sent to driver
  17/04/07 16:31:29 INFO executor.CoarseGrainedExecutorBackend: Driver commanded a shutdown
  17/04/07 16:31:29 INFO memory.MemoryStore: MemoryStore cleared
  17/04/07 16:31:29 INFO storage.BlockManager: BlockManager stopped
  17/04/07 16:31:29 INFO util.ShutdownHookManager: Shutdown hook called
  17/04/07 16:31:29 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-a281a003-75aa-4d20-8f9d-93e3dbe8874a/executor-35ec5b73-87a6-4fa2-b895-b5783cace0d8/spark-957ff0d0-334b-42cc-98fb-4995ab0f8715
  17/04/07 16:31:29 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
  17/04/07 16:31:29 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-a281a003-75aa-4d20-8f9d-93e3dbe8874a/executor-35ec5b73-87a6-4fa2-b895-b5783cace0d8/spark-957ff0d0-334b-42cc-98fb-4995ab0f8715

How can I make the task export the model properly? By the way, this model export code works fine on distributed TensorFlow. Thanks a lot.

TensorFlowOnSpark consumes too much executor memory

Hi,

I've tried the solution on a Spark-enabled YARN cluster (CPU only). I noticed that it requires very high executor memory. For the MNIST example the actual model is very small, but I need to specify around 27GB of executor memory to get it running with 10 executors. The amount of required memory increases when I try a larger dataset.

Could someone explain a bit why it requires so much memory to run on Spark? Is it related to the message exchange between the ps and workers, as in tensorflow/tensorflow#6508?

Thank you very much for the help!
Yiming.

Installation and operation

Thanks for your efforts in developing TensorFlowOnSpark and making it open source. I have two questions regarding this new confluence of two of the most popular paradigms:

  1. Would my cluster work if I have only Spark and TensorFlow, without Hadoop?

  2. How does the cluster behave if one of the nodes fails? Would the whole operation fail, or is the cluster fault-tolerant?

No speedup when training CIFAR-10 on 2 vs 1 node on stand-alone Spark

Hi,

I ran the CIFAR-10 example on a 2-node stand-alone Spark cluster; both nodes have a single GPU. What I expected to see was that running with both nodes would be faster than running on a single node. What I'm seeing instead is no scaling at all. The node with the faster GPU finishes first and sits idle until the other node also finishes (I'm looking at: watch -n1 nvidia-smi).
Also, when training on a single node, the faster GPU barely performs better (bottleneck?).

Here are some more details:

Command to run with both nodes:
$ export SPARK_WORKER_INSTANCES=2
$ time /home/cuno/opt/spark/current/bin/spark-submit \
--master spark://spark-master:7077 \
--py-files /tmp/cifar10.zip \
--conf spark.task.cpus=1 \
--conf spark.cores.max=2 \
--conf spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-8-oracle \
--num-executors 2 \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-8-oracle \
--conf spark.executorEnv.LD_LIBRARY_PATH=/usr/local/cuda/lib64:/usr/lib/jvm/java-8-oracle/jre/lib/amd64/server \
/home/cuno/nfs/TensorFlowOnSpark/examples/cifar10/cifar10_multi_gpu_train.py \
--cluster_size 2 \
--data_dir file:///home/cuno/nfs/data \
--train_dir file:///home/cuno/nfs/TensorFlowOnSpark/cifar10_model \
--max_steps 1000 \
--num_gpus 1

Command to run with a single node:
$ export SPARK_WORKER_INSTANCES=1
$ time /home/cuno/opt/spark/current/bin/spark-submit \
--master spark://spark-master:7077 \
--py-files /tmp/cifar10.zip \
--conf spark.task.cpus=1 \
--conf spark.cores.max=1 \
--conf spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-8-oracle \
--num-executors 1 \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-8-oracle \
--conf spark.executorEnv.LD_LIBRARY_PATH=/usr/local/cuda/lib64:/usr/lib/jvm/java-8-oracle/jre/lib/amd64/server \
/home/cuno/nfs/TensorFlowOnSpark/examples/cifar10/cifar10_multi_gpu_train.py \
--cluster_size 1 \
--data_dir file:///home/cuno/nfs/data \
--train_dir file:///home/cuno/nfs/TensorFlowOnSpark/cifar10_model \
--max_steps 1000 \
--num_gpus 1

*All data is on NFS. I also tried duplicating all the data onto each node's local disk; it didn't change performance.
*There is always around 40% to 50% CPU usage on all 12 threads (Xeon E5-2620 v2 @ 2.10GHz)

2 NODES -- GTX 1080Ti + GTX 960

run#1
real 3m15.706s
user 2m22.196s
sys 1m28.836s
run#2
real 3m6.436s
user 2m16.408s
sys 1m19.272s
run#3
real 2m52.505s
user 2m1.828s
sys 1m20.064s

*1 NODE -- GTX 960 About 50% GPU usage

run#1
real 3m7.890s
user 2m19.144s
sys 1m17.528s
run#2
real 2m58.398s
user 2m2.948s
sys 1m16.152s
run#3
real 3m4.228s
user 2m10.668s
sys 1m26.384s

*1 NODE -- GTX 1080Ti About 30% GPU usage

run#1
real 2m42.492s
user 1m57.824s
sys 1m7.904s
run#2
real 2m26.508s
user 1m43.516s
sys 0m58.988s
run#3
real 2m38.843s
user 1m54.968s
sys 1m5.608s

Am I running the example wrong? What I want is just to see any example of training a model where training time decreases when running on more nodes.

MNIST Standalone hangs when using many epochs

I am using TF 1.2 and Spark 2.0.2 with the pip-installed version of TFoS.
The command I used is this one:

 PYSPARK_PYTHON=${OTHER_PYTHON} ${SPARK_HOME}/bin/spark-submit  \
--num-executors 12 --executor-memory 55G   \
--master ${MASTER}  \
--py-files ${TFoS_HOME}/examples/mnist/spark/mnist_dist.py  \
--conf spark.executor.cores=1 \ 
--conf spark.executorEnv.JAVA_HOME="$JAVA_HOME"  \
${TFoS_HOME}/examples/mnist/spark/mnist_spark.py  \
--cluster_size 12  \
--images ${TFoS_HOME}/examples/mnist/csv/train/images  \
--labels ${TFoS_HOME}/examples/mnist/csv/train/labels  \
--format csv --mode train --model mnist_model --epochs 10

I see from some workers that the supervisor gets shut down, probably because the number of epochs has been reached.

spark-2.0.2-bin-hadoop2.7/work/cluster-2017-07-19_14-56-26/s091/app-20170719182907-0032/10/stderr:2017-07-19T18:29:32.677071 stopping supervisor
spark-2.0.2-bin-hadoop2.7/work/cluster-2017-07-19_14-56-26/s091/app-20170719182907-0032/11/stderr:2017-07-19T18:29:32.439126 stopping supervisor
spark-2.0.2-bin-hadoop2.7/work/cluster-2017-07-19_14-56-26/s091/app-20170719182907-0032/8/stderr:2017-07-19T18:29:33.170899 stopping supervisor
spark-2.0.2-bin-hadoop2.7/work/cluster-2017-07-19_14-56-26/s091/app-20170719182907-0032/9/stderr:2017-07-19T18:29:32.908222 stopping supervisor
spark-2.0.2-bin-hadoop2.7/work/cluster-2017-07-19_14-56-26/s092/app-20170719182907-0032/4/stderr:2017-07-19T18:29:32.562425 stopping supervisor
spark-2.0.2-bin-hadoop2.7/work/cluster-2017-07-19_14-56-26/s092/app-20170719182907-0032/5/stderr:2017-07-19T18:29:33.010174 stopping supervisor
spark-2.0.2-bin-hadoop2.7/work/cluster-2017-07-19_14-56-26/s092/app-20170719182907-0032/7/stderr:2017-07-19T18:29:32.517486 stopping supervisor
spark-2.0.2-bin-hadoop2.7/work/cluster-2017-07-19_14-56-26/s093/app-20170719182907-0032/0/stderr:2017-07-19T18:29:32.637912 stopping supervisor
spark-2.0.2-bin-hadoop2.7/work/cluster-2017-07-19_14-56-26/s093/app-20170719182907-0032/2/stderr:2017-07-19T18:29:32.485645 stopping supervisor
spark-2.0.2-bin-hadoop2.7/work/cluster-2017-07-19_14-56-26/s093/app-20170719182907-0032/3/stderr:2017-07-19T18:29:32.460439 stopping supervisor

And from one of the workers I see that it keeps waiting for the tasks that were shut down.

2017-07-19 18:39:02.906223: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0
2017-07-19 18:39:02.906259: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:1
2017-07-19 18:39:02.906268: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:2
2017-07-19 18:39:02.906275: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:3
2017-07-19 18:39:02.906283: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:4
2017-07-19 18:39:02.906290: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:5
2017-07-19 18:39:02.906297: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:6
2017-07-19 18:39:02.906304: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:8
2017-07-19 18:39:02.906311: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:9
2017-07-19 18:39:02.906318: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:10

Spark output stops here:

17/07/19 18:29:31 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 251 on executor id: 2 hostname: 192.168.14.93.
17/07/19 18:29:31 INFO TaskSetManager: Finished task 190.0 in stage 4.0 (TID 242) in 211 ms on 192.168.14.93 (189/200)
17/07/19 18:29:31 INFO TaskSetManager: Finished task 189.0 in stage 4.0 (TID 241) in 272 ms on 192.168.14.91 (190/200)
17/07/19 18:29:31 INFO TaskSetManager: Finished task 191.0 in stage 4.0 (TID 243) in 270 ms on 192.168.14.91 (191/200)
17/07/19 18:29:31 INFO TaskSetManager: Finished task 195.0 in stage 4.0 (TID 247) in 217 ms on 192.168.14.92 (192/200)
17/07/19 18:29:31 INFO TaskSetManager: Finished task 194.0 in stage 4.0 (TID 246) in 239 ms on 192.168.14.91 (193/200)
17/07/19 18:29:31 INFO TaskSetManager: Finished task 193.0 in stage 4.0 (TID 245) in 249 ms on 192.168.14.93 (194/200)
17/07/19 18:29:31 INFO TaskSetManager: Finished task 192.0 in stage 4.0 (TID 244) in 259 ms on 192.168.14.93 (195/200)
17/07/19 18:29:31 INFO TaskSetManager: Finished task 196.0 in stage 4.0 (TID 248) in 226 ms on 192.168.14.91 (196/200)
17/07/19 18:29:31 INFO TaskSetManager: Finished task 198.0 in stage 4.0 (TID 250) in 254 ms on 192.168.14.92 (197/200)
17/07/19 18:29:31 INFO TaskSetManager: Finished task 199.0 in stage 4.0 (TID 251) in 248 ms on 192.168.14.93 (198/200)
17/07/19 18:29:31 INFO TaskSetManager: Finished task 197.0 in stage 4.0 (TID 249) in 288 ms on 192.168.14.92 (199/200)

This does seem like a race condition somewhere, however, because it works sometimes and other times it just hangs. @leewyang any idea on where to start looking, please?

KeyError: 'input' error

Hello,

I am trying TFoS and successfully converted the MNIST zip files into HDFS files following the link here. However, when I try to run the sample named "Run distributed MNIST training (using feed_dict)", I encounter the following error saying "KeyError: 'input'" and I have no clue what's going on.

My environment is:
Spark 2.1.1 + YARN/Hadoop 2.6 + latest TFoS (master branch) + latest TF (1.2.0). I am using YARN-client mode. Below is the command line I am using. I have a small cluster of 2 nodes (I remember some issue mentioned that the executor number should preferably be set to the cluster node count, though I am not quite sure), and I am sure that libhdfs.so and libjvm.so are in the LD_LIBRARY_PATH.

Also, I did a bit of research and found a similar issue #32, along with a couple of other issues. However, since that issue was from February, I thought all the fixes would have been merged into the latest branches. That's why I am raising this issue again.

I am also attaching the full logs Untitled-1.txt in case you need more information.

Thanks for looking at it!


spark-submit \
--master yarn \
--deploy-mode client \
--num-executors 2 \
--queue ${QUEUE} \
--executor-memory 4G \
--py-files TensorFlowOnSpark/tfspark.zip,TensorFlowOnSpark/examples/mnist/spark/mnist_dist.py \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
--archives wasb:///Python.zip#Python \
--conf spark.executorEnv.LD_LIBRARY_PATH="$JAVA_HOME/jre/lib/amd64/server:/usr/hdp/2.6.0.10-29/usr/lib/" \
--conf spark.executorEnv.PYSPARK_PYTHON="Python/bin/python" \
--jars ecosystem/hadoop/target/tensorflow-hadoop-1.0-SNAPSHOT.jar \
TensorFlowOnSpark/examples/mnist/spark/mnist_spark.py \
--images wasb:///user/sshuser/mnist/csv/train/images \
--labels wasb:///user/sshuser/mnist/csv/train/labels \
--mode train \
--model mnist_model




17/06/17 05:20:09 INFO TaskSetManager: Starting task 6.3 in stage 1.0 (TID 17, 10.0.0.11, executor 1, partition 6, PROCESS_LOCAL, 6768 bytes)
17/06/17 05:20:09 INFO TaskSetManager: Lost task 5.3 in stage 1.0 (TID 16) on 10.0.0.11, executor 1: org.apache.spark.api.python.PythonException (Traceback (most recent call last):
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/rdd.py", line 2408, in pipeline_func
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/rdd.py", line 2408, in pipeline_func
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/rdd.py", line 2408, in pipeline_func
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/rdd.py", line 345, in func
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/rdd.py", line 793, in func
  File "/home/sshuser/TensorFlowOnSpark/tfspark.zip/tensorflowonspark/TFSparkNode.py", line 433, in _train
  File "/home/sshuser/Python/lib/python2.7/multiprocessing/managers.py", line 667, in temp
    token, exp = self._create(typeid, *args, **kwds)
  File "/home/sshuser/Python/lib/python2.7/multiprocessing/managers.py", line 567, in _create
    id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
  File "/home/sshuser/Python/lib/python2.7/multiprocessing/managers.py", line 105, in dispatch
    raise convert_to_error(kind, result)
RemoteError:
---------------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/sshuser/Python/lib/python2.7/multiprocessing/managers.py", line 207, in handle_request
    result = func(c, *args, **kwds)
  File "/home/sshuser/Python/lib/python2.7/multiprocessing/managers.py", line 386, in create
    obj = callable(*args, **kwds)
  File "./tfspark.zip/tensorflowonspark/TFManager.py", line 34, in <lambda>
    TFManager.register('get_queue', callable=lambda qname: qdict[qname])
KeyError: 'input'
---------------------------------------------------------------------------
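
For context, here is a minimal standalone sketch (not TFoS code) of the registration pattern visible at the bottom of the traceback: get_queue is registered on a multiprocessing manager as a callable that simply indexes a dict of queues, so requesting a name that was never created on that node surfaces as a RemoteError wrapping KeyError:

from multiprocessing.managers import BaseManager

# Only an 'output' entry exists in this sketch; asking for 'input' reproduces
# the same KeyError seen in the traceback above.
qdict = {'output': ['placeholder']}

class Manager(BaseManager):
    pass

Manager.register('get_queue', callable=lambda qname: qdict[qname])

if __name__ == '__main__':
    mgr = Manager()
    mgr.start()
    print(mgr.get_queue('output'))   # proxy to the existing entry
    mgr.get_queue('input')           # raises RemoteError wrapping KeyError: 'input'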

Very low GPU utilization on Inception V3 example on stand-alone Spark

Hi,

I ran the ImageNet example on a 2-node stand-alone Spark cluster; both nodes have a single GPU.
GPU utilization is not what I was hoping for. On one node it is constantly between 4% and 8%; this node also has suspiciously little information in its log file, which ends with "Started server with target: grpc://localhost:42158".
On the other node, utilization sometimes jumps to higher values for a fraction of a second, up to about 85% (I'm looking at: watch -n1 nvidia-smi).
Also, it seems only one node is writing to /tmp/imagenet_distributed_train_model, the one with the higher utilization.

$ export SPARK_WORKER_INSTANCES=2
$ time /home/braincreator/opt/spark/current/bin/spark-submit \
--master spark://spark-master:7077 \
--py-files /tmp/inception.zip \
--conf spark.task.cpus=6 \
--conf spark.cores.max=12 \
--conf spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-8-oracle \
--num-executors 2 \
--executor-memory 1G \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-8-oracle \
--conf spark.executorEnv.LD_LIBRARY_PATH=/usr/local/cuda/lib64:/usr/lib/jvm/java-8-oracle/jre/lib/amd64/server \
--driver-library-path=/usr/local/cuda/lib64 \
/home/braincreator/nfs/TensorFlowOnSpark/examples/imagenet/inception/imagenet_distributed_train.py \
--cluster_size 2 \
--data_dir file:///data/braincreator/ImageNet/tfrecords \
--train_dir file:///tmp/imagenet_distributed_train_model \
--max_steps 1000 \
--batch_size=32 \
--subset train

data_dir is local to each node; I simply copied the data onto each filesystem.

logs:
brain002.log.txt
brain003.log.txt

Any clues on how to fix this?

Can't find the tensorboard executable file when enabling tensorboard

Run distributed MNIST training and enable tensorboard:

${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${QUEUE} \
--num-executors 4 \
--executor-memory 27G \
--py-files TensorFlowOnSpark/tfspark.zip,TensorFlowOnSpark/examples/mnist/spark/mnist_dist.py \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
--archives hdfs:///user/${USER}/Python.zip#Python \
--conf spark.executorEnv.LD_LIBRARY_PATH="/usr/local/cuda-7.5/lib64:$JAVA_HOME/jre/lib/amd64/server" \
--driver-library-path="/usr/local/cuda-7.5/lib64" \
TensorFlowOnSpark/examples/mnist/spark/mnist_spark.py \
--images mnist/csv/train/images \
--labels mnist/csv/train/labels \
--tensorboard \
--mode train \
--model mnist_model

Exception:

17/04/13 18:07:03 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/YSPARK-2.1.0-2.6.0.d20170113-15.36.26/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 180, in main
    process()
  File "/opt/cloudera/parcels/YSPARK-2.1.0-2.6.0.d20170113-15.36.26/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 175, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/YSPARK-2.1.0-2.6.0.d20170113-15.36.26/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2409, in pipeline_func
  File "/opt/cloudera/parcels/YSPARK-2.1.0-2.6.0.d20170113-15.36.26/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2409, in pipeline_func
  File "/opt/cloudera/parcels/YSPARK-2.1.0-2.6.0.d20170113-15.36.26/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2409, in pipeline_func
  File "/opt/cloudera/parcels/YSPARK-2.1.0-2.6.0.d20170113-15.36.26/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 346, in func
  File "/opt/cloudera/parcels/YSPARK-2.1.0-2.6.0.d20170113-15.36.26/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 794, in func
  File "/data/M00/tandem/wangyuming/TensorFlowOnSpark/tfspark.zip/com/yahoo/ml/tf/TFSparkNode.py", line 319, in _mapfn
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/subprocess.py", line 711, in __init__
    errread, errwrite)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/subprocess.py", line 1343, in _execute_child
    raise child_exception
OSError: [Errno 2] No such file or directory

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:320)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
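
As a quick diagnostic (a minimal sketch, not TFoS code; 'Python/bin' is just the hypothetical unpack location of the Python.zip archive shipped via --archives above), you can check from a task whether a tensorboard executable is actually reachable before the subprocess call tries to launch it:

import os

# Scan PATH plus the hypothetical archive location for an executable named
# 'tensorboard'; prints None if nothing is found, which would be consistent
# with the OSError: [Errno 2] above.
def find_executable(name, extra_dirs=('Python/bin',)):
    for d in os.environ.get('PATH', '').split(os.pathsep) + list(extra_dirs):
        candidate = os.path.join(d, name)
        if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
            return candidate
    return None

print(find_executable('tensorboard'))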

MNIST example cannot run because of RDD.zip()

I am using Spark 2.0.2 in standalone mode, TensorFlow 1.2, and the pip-provided TensorFlowOnSpark, but the same problem happens with Spark 1.6.0. Although I followed the instructions from the standalone wiki page (i.e. I generated the training data for the specific cluster size), I would always encounter:

org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition

The problem is that the zip operation assumes that the number of partitions AND the number of elements within each partition will be the same. Spark guarantees the number of partitions but not that all partitions have exactly the same number of elements. Thus, the operation of zipping the images with the labels fails.
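
As a possible workaround (a minimal sketch, not the project's recommended fix; the CSV paths simply mirror the MNIST example), the two RDDs can be paired by record index with zipWithIndex plus a join, which only requires the two RDDs to have the same total number of records rather than identically sized partitions:

from pyspark import SparkContext

sc = SparkContext(appName="zip_by_index_sketch")

images = sc.textFile("mnist/csv/train/images").map(lambda ln: [int(x) for x in ln.split(',')])
labels = sc.textFile("mnist/csv/train/labels").map(lambda ln: [float(x) for x in ln.split(',')])

# Key every record by its global index, then join on that index.
images_by_idx = images.zipWithIndex().map(lambda x: (x[1], x[0]))
labels_by_idx = labels.zipWithIndex().map(lambda x: (x[1], x[0]))

dataRDD = images_by_idx.join(labels_by_idx).values()   # RDD of (image, label) pairs
print(dataRDD.count())

The extra shuffle from the join costs more than zip(), but it avoids the "same number of elements in each partition" requirement.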

Running error when using Jupyter: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.

I put the spark/mnist_spark.py file into a Jupyter notebook to run it, but I get a weird error: "An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe." What is the meaning of this error message?

The Jupyter notebook starts with an IPython shell. I import pyspark and supply the configuration using pyspark.SparkConf(). There is no problem creating the TFCluster, but when it comes to cluster.train, it crashes and pops up the error message. The following is my code and the result. Thank you for helping!

# %load mnist_spark.py
# Copyright 2017 Yahoo Inc.
# Licensed under the terms of the Apache 2.0 license.
# Please see LICENSE file in the project root for terms.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import pyspark
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

import argparse
import os
import numpy
import sys
import tensorflow as tf
import threading
import time
from datetime import datetime

from tensorflowonspark import TFCluster
import mnist_dist
sys.argv = sys.argv[:1]

conftfos = pyspark.SparkConf().setAll([('spark.yarn.queue','gpu'),('spark.executor.instances','2'),('spark.executor.cores','2'),('spark.executor.memory','10G'),('spark.dynamicAllocation.enabled','False'),('spark.yarn.maxAppAttempts','1'),('spark.executorEnv.LD_LIBRARY_PATH','/opt/cloudera/parcels/CDH/lib64:$JAVA_HOME/jre/lib/amd64/server')])

sc = pyspark.SparkContext(master="yarn",conf=conftfos)
sc.addFile('hdfs:///user/jupyter/Python.zip')
sc.addPyFile('/home/jupyter/TensorflowOnSpark/TensorFlowOnSpark/examples/mnist/spark/mnist_dist.py')
sc.addPyFile('/home/jupyter/TensorflowOnSpark/TensorFlowOnSpark/tfspark.zip')
executors = sc._conf.get("spark.executor.instances")
num_executors = int(executors) if executors is not None else 1
num_ps = 1
parser = argparse.ArgumentParser()
parser.add_argument("-b", "--batch_size", help="number of records per batch", type=int, default=100)
parser.add_argument("-e", "--epochs", help="number of epochs", type=int, default=1)
parser.add_argument("-f", "--format", help="example format: (csv|pickle|tfr)", choices=["csv","pickle","tfr"], default="csv")
parser.add_argument("-i", "--images", help="HDFS path to MNIST images in parallelized format",default="mnist/csv/train/images")
parser.add_argument("-l", "--labels", help="HDFS path to MNIST labels in parallelized format",default="mnist/csv/train/labels")
parser.add_argument("-m", "--model", help="HDFS path to save/load model during train/inference", default="mnist_model_csv")
parser.add_argument("-n", "--cluster_size", help="number of nodes in the cluster", type=int, default=2)
parser.add_argument("-o", "--output", help="HDFS path to save test/inference output", default="predictions")
parser.add_argument("-r", "--readers", help="number of reader/enqueue threads", type=int, default=1)
parser.add_argument("-s", "--steps", help="maximum number of steps", type=int, default=1000)
parser.add_argument("-tb", "--tensorboard", help="launch tensorboard process", action="store_true",default=False)
parser.add_argument("-X", "--mode", help="train|inference", default="train")
parser.add_argument("-c", "--rdma", help="use rdma connection", default=False)
args = parser.parse_args()
print("args:",args)
args: Namespace(batch_size=100, cluster_size=2, epochs=1, format='csv', images='mnist/csv/train/images', labels='mnist/csv/train/labels', mode='train', model='mnist_model_csv', output='predictions', rdma=False, readers=1, steps=1000, tensorboard=False)
print("{0} ===== Start".format(datetime.now().isoformat()))

if args.format == "tfr":
  images = sc.newAPIHadoopFile(args.images, "org.tensorflow.hadoop.io.TFRecordFileInputFormat",
                              keyClass="org.apache.hadoop.io.BytesWritable",
                              valueClass="org.apache.hadoop.io.NullWritable")
  def toNumpy(bytestr):
    example = tf.train.Example()
    example.ParseFromString(bytestr)
    features = example.features.feature
    image = numpy.array(features['image'].int64_list.value)
    label = numpy.array(features['label'].int64_list.value)
    return (image, label)
  dataRDD = images.map(lambda x: toNumpy(str(x[0])))
else:
  if args.format == "csv":
    images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])
    labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
  else: # args.format == "pickle":
    images = sc.pickleFile(args.images)
    labels = sc.pickleFile(args.labels)
  print("zipping images and labels")
  dataRDD = images.zip(labels)
2017-06-07T16:30:36.362144 ===== Start
zipping images and labels
dataRDD.count()
60000
cluster = TFCluster.run(sc, mnist_dist.map_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.SPARK)
#cluster = TFCluster.run(sc, mnist_dist.map_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.SPARK)
2017-06-07 16:30:47,037 INFO (MainThread-59203) Reserving TFSparkNodes 
2017-06-07 16:30:47,039 INFO (MainThread-59203) listening for reservations at ('gpu3', 27664)
2017-06-07 16:30:47,040 INFO (MainThread-59203) Starting TensorFlow on executors
2017-06-07 16:30:47,043 INFO (MainThread-59203) Waiting for TFSparkNodes to start
2017-06-07 16:30:47,044 INFO (MainThread-59203) waiting for 2 reservations
2017-06-07 16:30:48,046 INFO (MainThread-59203) all reservations completed
2017-06-07 16:30:48,047 INFO (MainThread-59203) All TFSparkNodes started
2017-06-07 16:30:48,048 INFO (MainThread-59203) {'addr': '/tmp/pymp-g8Kby_/listener-_9aUh3', 'task_index': 0, 'port': 22397, 'authkey': '\xf3Z\x01\x18mLF\xe6\x99\xb6_\x84\xfb\xa4\xf7\xa8', 'worker_num': 1, 'host': 'GPU1', 'ppid': 127619, 'job_name': 'worker', 'tb_pid': 0, 'tb_port': 0}
2017-06-07 16:30:48,048 INFO (MainThread-59203) {'addr': ('GPU1', 24716), 'task_index': 0, 'port': 24236, 'authkey': '\r\xdcR\xd3\x17 L5\x84R\x1a\\\x87\xd7\xa4\xc2', 'worker_num': 0, 'host': 'GPU1', 'ppid': 127621, 'job_name': 'ps', 'tb_pid': 0, 'tb_port': 0}
if args.mode == "train":
  cluster.train(dataRDD, args.epochs)
else:
  labelRDD = cluster.inference(dataRDD)
  labelRDD.saveAsTextFile(args.output)
cluster.shutdown()

print("{0} ===== Stop".format(datetime.now().isoformat()))
2017-06-07 16:30:50,717 INFO (MainThread-59203) Feeding training data



---------------------------------------------------------------------------

Py4JJavaError                             Traceback (most recent call last)

<ipython-input-10-857570569c77> in <module>()
      1 if args.mode == "train":
----> 2   cluster.train(dataRDD, args.epochs)
      3 else:
      4   labelRDD = cluster.inference(dataRDD)
      5   labelRDD.saveAsTextFile(args.output)


/usr/lib/python2.7/site-packages/tensorflowonspark/TFCluster.pyc in train(self, dataRDD, num_epochs, qname)
     83                 rdds.append(dataRDD)
     84             unionRDD = self.sc.union(rdds)
---> 85             unionRDD.foreachPartition(TFSparkNode.train(self.cluster_info, self.cluster_meta, qname))
     86 
     87     def inference(self, dataRDD, qname='input'):


/opt/cloudera/parcels/SPARK2/lib/spark2/python/pyspark/rdd.py in foreachPartition(self, f)
    796             except TypeError:
    797                 return iter([])
--> 798         self.mapPartitions(func).count()  # Force evaluation
    799 
    800     def collect(self):


/opt/cloudera/parcels/SPARK2/lib/spark2/python/pyspark/rdd.py in count(self)
   1038         3
   1039         """
-> 1040         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
   1041 
   1042     def stats(self):


/opt/cloudera/parcels/SPARK2/lib/spark2/python/pyspark/rdd.py in sum(self)
   1029         6.0
   1030         """
-> 1031         return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
   1032 
   1033     def count(self):


/opt/cloudera/parcels/SPARK2/lib/spark2/python/pyspark/rdd.py in fold(self, zeroValue, op)
    903         # zeroValue provided to each partition is unique from the one provided
    904         # to the final reduce call
--> 905         vals = self.mapPartitions(func).collect()
    906         return reduce(op, vals, zeroValue)
    907 


/opt/cloudera/parcels/SPARK2/lib/spark2/python/pyspark/rdd.py in collect(self)
    806         """
    807         with SCCallSiteSync(self.context) as css:
--> 808             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    809         return list(_load_from_socket(port, self._jrdd_deserializer))
    810 


/usr/lib/python2.7/site-packages/py4j/java_gateway.pyc in __call__(self, *args)
   1152         answer = self.gateway_client.send_command(command)
   1153         return_value = get_return_value(
-> 1154             answer, self.gateway_client, self.target_id, self.name)
   1155 
   1156         for temp_arg in temp_args:


/usr/lib/python2.7/site-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
    318                 raise Py4JJavaError(
    319                     "An error occurred while calling {0}{1}{2}.\n".
--> 320                     format(target_id, ".", name), value)
    321             else:
    322                 raise Py4JError(


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 20, GPU1, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/pyspark/worker.py", line 174, in main
    process()
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/pyspark/rdd.py", line 2406, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/pyspark/rdd.py", line 2406, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/pyspark/rdd.py", line 2406, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/pyspark/rdd.py", line 345, in func
    return f(iterator)
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/pyspark/rdd.py", line 793, in func
    r = f(it)
  File "/usr/lib/python2.7/site-packages/tensorflowonspark/TFSparkNode.py", line 433, in _train
    queue = mgr.get_queue(qname)
  File "/usr/lib64/python2.7/multiprocessing/managers.py", line 667, in temp
    token, exp = self._create(typeid, *args, **kwds)
  File "/usr/lib64/python2.7/multiprocessing/managers.py", line 567, in _create
    id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
  File "/usr/lib64/python2.7/multiprocessing/managers.py", line 105, in dispatch
    raise convert_to_error(kind, result)
RemoteError: 
---------------------------------------------------------------------------
Traceback (most recent call last):
  File "/usr/lib64/python2.7/multiprocessing/managers.py", line 207, in handle_request
    result = func(c, *args, **kwds)
  File "/usr/lib64/python2.7/multiprocessing/managers.py", line 386, in create
    obj = callable(*args, **kwds)
  File "./tfspark.zip/tensorflowonspark/TFManager.py", line 34, in <lambda>
    TFManager.register('get_queue', callable=lambda qname: qdict[qname])
KeyError: 'input'
---------------------------------------------------------------------------

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:935)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:934)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/pyspark/worker.py", line 174, in main
    process()
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/pyspark/rdd.py", line 2406, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/pyspark/rdd.py", line 2406, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/pyspark/rdd.py", line 2406, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/pyspark/rdd.py", line 345, in func
    return f(iterator)
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/pyspark/rdd.py", line 793, in func
    r = f(it)
  File "/usr/lib/python2.7/site-packages/tensorflowonspark/TFSparkNode.py", line 433, in _train
    queue = mgr.get_queue(qname)
  File "/usr/lib64/python2.7/multiprocessing/managers.py", line 667, in temp
    token, exp = self._create(typeid, *args, **kwds)
  File "/usr/lib64/python2.7/multiprocessing/managers.py", line 567, in _create
    id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
  File "/usr/lib64/python2.7/multiprocessing/managers.py", line 105, in dispatch
    raise convert_to_error(kind, result)
RemoteError: 
---------------------------------------------------------------------------
Traceback (most recent call last):
  File "/usr/lib64/python2.7/multiprocessing/managers.py", line 207, in handle_request
    result = func(c, *args, **kwds)
  File "/usr/lib64/python2.7/multiprocessing/managers.py", line 386, in create
    obj = callable(*args, **kwds)
  File "./tfspark.zip/tensorflowonspark/TFManager.py", line 34, in <lambda>
    TFManager.register('get_queue', callable=lambda qname: qdict[qname])
KeyError: 'input'
---------------------------------------------------------------------------

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more

SparkOnYarn: TensorFlowOnSpark MNIST demo blocks

When I run the TensorFlowOnSpark MNIST demo on YARN, it blocks. There is no error message.
How can I fix it?
CentOS 7.0,
Spark 2.1.0,
Tensorflow 1.0.0
Python 2.7.5
JDK 1.7

submit script:

${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 4 \
--queue default \
--executor-memory 20G \
--py-files tfspark.zip,TensorFlowOnSpark/examples/mnist/spark/mnist_dist.py \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
TensorFlowOnSpark/examples/mnist/spark/mnist_spark.py \
--images mnist/csv/train/images \
--labels mnist/csv/train/labels \
--mode train \
--model mnist_model

blocked task message:

logfile container_1486634114148_0042_01_000004/hadp/stderr/?start=0
-----------------------------------------
17/02/21 03:25:15 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 9
17/02/21 03:25:15 INFO executor.Executor: Running task 1.0 in stage 2.0 (TID 9)
17/02/21 03:25:15 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 4
17/02/21 03:25:15 INFO memory.MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 6.8 KB, free 10.5 GB)
17/02/21 03:25:15 INFO broadcast.TorrentBroadcast: Reading broadcast variable 4 took 16 ms
17/02/21 03:25:15 INFO memory.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 13.8 KB, free 10.5 GB)
17/02/21 03:25:16 INFO rdd.HadoopRDD: Input split: hdfs://172.16.141.90:8020/user/hadp/mnist/csv/train/images/part-00001:0+11231804
17/02/21 03:25:16 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 0
17/02/21 03:25:16 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 23.9 KB, free 10.5 GB)
17/02/21 03:25:16 INFO broadcast.TorrentBroadcast: Reading broadcast variable 0 took 16 ms
17/02/21 03:25:16 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 297.3 KB, free 10.5 GB)
17/02/21 03:25:16 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
17/02/21 03:25:16 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 40b332285be4d4a9ec95a02694b20211810cf4ec]
17/02/21 03:25:16 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
17/02/21 03:25:16 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
17/02/21 03:25:16 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
17/02/21 03:25:16 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
17/02/21 03:25:16 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
17/02/21 03:25:17 INFO rdd.HadoopRDD: Input split: hdfs://172.16.141.90:8020/user/hadp/mnist/csv/train/labels/part-00001:0+245760
17/02/21 03:25:17 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 1
17/02/21 03:25:17 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 23.9 KB, free 10.5 GB)
17/02/21 03:25:17 INFO broadcast.TorrentBroadcast: Reading broadcast variable 1 took 17 ms
17/02/21 03:25:17 INFO memory.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 297.3 KB, free 10.5 GB)
2017-02-21 03:25:17,378 INFO (MainThread-13113) Connected to TFSparkNode.mgr on slave2, ppid=12956, state='running'
2017-02-21 03:25:17,384 INFO (MainThread-13113) mgr.state='running'
2017-02-21 03:25:17,384 INFO (MainThread-13113) Feeding partition <itertools.chain object at 0x2ae71d0> into input queue <multiprocessing.queues.JoinableQueue object at 0x2824b90>
17/02/21 03:25:21 INFO python.PythonRunner: Times: total = 4363, boot = -9675, init = 9762, finish = 4276
17/02/21 03:25:21 INFO python.PythonRunner: Times: total = 109, boot = 4, init = 28, finish = 77
logfile:container_1486634114148_0042_01_000002/hadp/stderr/?start=0
-----------------------------------------
17/02/21 03:18:43 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
17/02/21 03:18:43 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(1, slave1, 33036, None)
17/02/21 03:18:43 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(1, slave1, 33036, None)
17/02/21 03:18:43 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(1, slave1, 33036, None)
17/02/21 03:18:44 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 0
17/02/21 03:18:44 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)slave1
17/02/21 03:18:44 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 2
17/02/21 03:18:44 INFO client.TransportClientFactory: Successfully created connection to /172.16.165.28:45922 after 2 ms (0 ms spent in bootstraps)
17/02/21 03:18:44 INFO memory.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 4.3 KB, free 10.5 GB)
17/02/21 03:18:44 INFO broadcast.TorrentBroadcast: Reading broadcast variable 2 took 161 ms
17/02/21 03:18:44 INFO memory.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 6.0 KB, free 10.5 GB)
2017-02-21 03:18:55,776 INFO (MainThread-11159) TFSparkNode.reserve: {'authkey': UUID('28cb10c4-7498-4ca8-89ac-b8a604f6048c'), 'worker_num': 0, 'host': 'slave1', 'tb_port': 0, 'addr': ('slave1', 41678), 'ppid': 11123, 'task_index': 0, 'job_name': 'ps', 'tb_pid': 0, 'port': 44860}
17/02/21 03:18:55 INFO python.PythonRunner: Times: total = 11122, boot = 689, init = 76, finish = 10357
17/02/21 03:18:55 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 2473 bytes result sent to driver
17/02/21 03:18:55 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 7
17/02/21 03:18:55 INFO executor.Executor: Running task 3.0 in stage 1.0 (TID 7)
17/02/21 03:18:55 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 3
17/02/21 03:18:56 INFO memory.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 6.0 KB, free 10.5 GB)
17/02/21 03:18:56 INFO broadcast.TorrentBroadcast: Reading broadcast variable 3 took 17 ms
17/02/21 03:18:56 INFO memory.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 9.4 KB, free 10.5 GB)
2017-02-21 03:18:56,066 INFO (MainThread-11159) node: {'addr': ('slave1', 41678), 'task_index': 0, 'job_name': 'ps', 'authkey': UUID('28cb10c4-7498-4ca8-89ac-b8a604f6048c'), 'worker_num': 0, 'host': 'slave1', 'ppid': 11123, 'port': 44860, 'tb_pid': 0, 'tb_port': 0}
2017-02-21 03:18:56,066 INFO (MainThread-11159) node: {'addr': '/tmp/pymp-uwrScZ/listener-T6UoZM', 'task_index': 0, 'job_name': 'worker', 'authkey': UUID('e0d78818-5d92-491a-ab25-22c90992b165'), 'worker_num': 1, 'host': 'slave2', 'ppid': 12956, 'port': 48957, 'tb_pid': 0, 'tb_port': 0}
2017-02-21 03:18:56,067 INFO (MainThread-11159) node: {'addr': '/tmp/pymp-IKhTmR/listener-oOgQs9', 'task_index': 1, 'job_name': 'worker', 'authkey': UUID('c8c444ab-5c1d-420d-aea6-f12a7addeafa'), 'worker_num': 2, 'host': 'slave3', 'ppid': 7641, 'port': 43524, 'tb_pid': 0, 'tb_port': 0}
2017-02-21 03:18:56,067 INFO (MainThread-11159) node: {'addr': '/tmp/pymp-yW3koh/listener-gbHUUU', 'task_index': 2, 'job_name': 'worker', 'authkey': UUID('27b2c855-fe34-47b2-9638-84d5844cb47d'), 'worker_num': 3, 'host': 'slave4', 'ppid': 141447, 'port': 41709, 'tb_pid': 0, 'tb_port': 0}
2017-02-21 03:18:56,317 INFO (MainThread-11159) Connected to TFSparkNode.mgr on slave1, ppid=11123, state='running'
2017-02-21 03:18:56,325 INFO (MainThread-11159) Starting TensorFlow ps:0 on cluster node 0 on background thread
2017-02-21 03:19:02,712 INFO (Thread-1-11159) 0: ======== ps:0 ========
2017-02-21 03:19:02,712 INFO (Thread-1-11159) 0: Cluster spec: {'ps': ['slave1:44860'], 'worker': ['slave2:48957', 'slave3:43524', 'slave4:41709']}
2017-02-21 03:19:02,712 INFO (Thread-1-11159) 0: Using CPU
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE3 instructions, but these are available on your machine and could speed up CPU computations.
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.1 instructions, but these are available on your machine and could speed up CPU computations.
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.2 instructions, but these are available on your machine and could speed up CPU computations.
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX instructions, but these are available on your machine and could speed up CPU computations.
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX2 instructions, but these are available on your machine and could speed up CPU computations.
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use FMA instructions, but these are available on your machine and could speed up CPU computations.
I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:200] Initialize GrpcChannelCache for job ps -> {0 -> localhost:44860}
I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:200] Initialize GrpcChannelCache for job worker -> {0 -> slave2:48957, 1 -> slave3:43524, 2 -> slave4:41709}
I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:221] Started server with target: grpc://localhost:44860
logfile: container_1486634114148_0042_01_000005/hadp/stderr/?start=0
-----------------------------------------
17/02/21 11:31:43 INFO executor.Executor: Finished task 2.0 in stage 1.0 (TID 6). 2189 bytes result sent to driver
2017-02-21 11:31:44,057 INFO (Thread-1-141481) 3: ======== worker:2 ========
2017-02-21 11:31:44,057 INFO (Thread-1-141481) 3: Cluster spec: {'ps': ['slave1:44860'], 'worker': ['slave2:48957', 'slave3:43524', 'slave4:41709']}
2017-02-21 11:31:44,057 INFO (Thread-1-141481) 3: Using CPU
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE3 instructions, but these are available on your machine and could speed up CPU computations.
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.1 instructions, but these are available on your machine and could speed up CPU computations.
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.2 instructions, but these are available on your machine and could speed up CPU computations.
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX instructions, but these are available on your machine and could speed up CPU computations.
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX2 instructions, but these are available on your machine and could speed up CPU computations.
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use FMA instructions, but these are available on your machine and could speed up CPU computations.
I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:200] Initialize GrpcChannelCache for job ps -> {0 -> slave1:44860}
I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:200] Initialize GrpcChannelCache for job worker -> {0 -> slave2:48957, 1 -> slave3:43524, 2 -> localhost:41709}
I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:221] Started server with target: grpc://localhost:41709
tensorflow model path: hdfs://172.16.141.90:8020/user/hadp/mnist_model
17/02/21 11:31:48 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 8
17/02/21 11:31:48 INFO executor.Executor: Running task 0.0 in stage 2.0 (TID 8)
17/02/21 11:31:48 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 4
17/02/21 11:31:48 INFO memory.MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 6.8 KB, free 10.5 GB)
17/02/21 11:31:48 INFO broadcast.TorrentBroadcast: Reading broadcast variable 4 took 16 ms
17/02/21 11:31:48 INFO memory.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 13.8 KB, free 10.5 GB)
17/02/21 11:31:48 INFO rdd.HadoopRDD: Input split: hdfs://172.16.141.90:8020/user/hadp/mnist/csv/train/images/part-00000:0+9338236
17/02/21 11:31:48 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 0
17/02/21 11:31:48 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 23.9 KB, free 10.5 GB)
17/02/21 11:31:48 INFO broadcast.TorrentBroadcast: Reading broadcast variable 0 took 16 ms
17/02/21 11:31:48 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 297.3 KB, free 10.5 GB)
17/02/21 11:31:48 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
17/02/21 11:31:48 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 40b332285be4d4a9ec95a02694b20211810cf4ec]
17/02/21 11:31:48 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
17/02/21 11:31:48 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
17/02/21 11:31:48 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
17/02/21 11:31:48 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
17/02/21 11:31:48 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
17/02/21 11:31:49 INFO rdd.HadoopRDD: Input split: hdfs://172.16.141.90:8020/user/hadp/mnist/csv/train/labels/part-00000:0+204800
17/02/21 11:31:49 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 1
17/02/21 11:31:49 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 23.9 KB, free 10.5 GB)
17/02/21 11:31:49 INFO broadcast.TorrentBroadcast: Reading broadcast variable 1 took 18 ms
17/02/21 11:31:49 INFO memory.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 297.3 KB, free 10.5 GB)
2017-02-21 11:31:49,476 INFO (MainThread-141591) Connected to TFSparkNode.mgr on slave4, ppid=141447, state='running'
2017-02-21 11:31:49,481 INFO (MainThread-141591) mgr.state='running'
2017-02-21 11:31:49,482 INFO (MainThread-141591) Feeding partition <itertools.chain object at 0x35c8250> into input queue <multiprocessing.queues.JoinableQueue object at 0x3343c50>
I tensorflow/core/distributed_runtime/master_session.cc:1012] Start master session 2752662a91d29de2 with config: 

INFO:tensorflow:Waiting for model to be ready.  Ready_for_local_init_op:  None, ready: Variables not initialized: hid_w, hid_b, sm_w, sm_b, Variable, hid_w/Adagrad, hid_b/Adagrad, sm_w/Adagrad, sm_b/Adagrad
2017-02-21 11:31:50,141 INFO (Thread-1-141481) Waiting for model to be ready.  Ready_for_local_init_op:  None, ready: Variables not initialized: hid_w, hid_b, sm_w, sm_b, Variable, hid_w/Adagrad, hid_b/Adagrad, sm_w/Adagrad, sm_b/Adagrad
17/02/21 11:31:51 INFO python.PythonRunner: Times: total = 1967, boot = -5694, init = 5778, finish = 1883
17/02/21 11:31:51 INFO python.PythonRunner: Times: total = 115, boot = 3, init = 40, finish = 72
I tensorflow/core/distributed_runtime/master_session.cc:1012] Start master session 09ff1aeacc016eab with config: 

2017-02-21T11:32:20.294690 session ready
2017-02-21T11:32:20.364734 step: 0 accuracy: 0.589999973774
2017-02-21 11:32:22,888 INFO (MainThread-141591) Processed 5120 items in partition
17/02/21 11:32:22 INFO python.PythonRunner: Times: total = 33461, boot = 3, init = 38, finish = 33420
17/02/21 11:32:22 INFO executor.Executor: Finished task 0.0 in stage 2.0 (TID 8). 2397 bytes result sent to driver
17/02/21 11:32:22 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 11
17/02/21 11:32:22 INFO executor.Executor: Running task 3.0 in stage 2.0 (TID 11)
17/02/21 11:32:22 INFO rdd.HadoopRDD: Input split: hdfs://172.16.141.90:8020/user/hadp/mnist/csv/train/images/part-00003:0+11226100
17/02/21 11:32:23 INFO rdd.HadoopRDD: Input split: hdfs://172.16.141.90:8020/user/hadp/mnist/csv/train/labels/part-00003:0+245760
2017-02-21 11:32:23,155 INFO (MainThread-141591) Connected to TFSparkNode.mgr on slave4, ppid=141447, state='running'
2017-02-21 11:32:23,161 INFO (MainThread-141591) mgr.state='running'
2017-02-21 11:32:23,162 INFO (MainThread-141591) Feeding partition <itertools.chain object at 0x35d1450> into input queue <multiprocessing.queues.JoinableQueue object at 0x3343c50>
17/02/21 11:32:25 INFO python.PythonRunner: Times: total = 2574, boot = -31926, init = 31933, finish = 2567
17/02/21 11:32:25 INFO python.PythonRunner: Times: total = 85, boot = -33619, init = 33625, finish = 79
2017-02-21 11:32:28,572 INFO (MainThread-141591) Processed 6144 items in partition
17/02/21 11:32:28 INFO python.PythonRunner: Times: total = 5443, boot = -227, init = 245, finish = 5425
17/02/21 11:32:28 INFO executor.Executor: Finished task 3.0 in stage 2.0 (TID 11). 1652 bytes result sent to driver
17/02/21 11:32:28 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 13
17/02/21 11:32:28 INFO executor.Executor: Running task 5.0 in stage 2.0 (TID 13)
17/02/21 11:32:28 INFO rdd.HadoopRDD: Input split: hdfs://172.16.141.90:8020/user/hadp/mnist/csv/train/images/part-00005:0+11173834
17/02/21 11:32:28 INFO rdd.HadoopRDD: Input split: hdfs://172.16.141.90:8020/user/hadp/mnist/csv/train/labels/part-00005:0+245760
2017-02-21 11:32:28,803 INFO (MainThread-141591) Connected to TFSparkNode.mgr on slave4, ppid=141447, state='running'
2017-02-21 11:32:28,808 INFO (MainThread-141591) mgr.state='running'
2017-02-21 11:32:28,808 INFO (MainThread-141591) Feeding partition <itertools.chain object at 0x35d1250> into input queue <multiprocessing.queues.JoinableQueue object at 0x3343c50>
17/02/21 11:32:31 INFO python.PythonRunner: Times: total = 2647, boot = -3039, init = 3045, finish = 2641
17/02/21 11:32:31 INFO python.PythonRunner: Times: total = 88, boot = -5557, init = 5563, finish = 82
2017-02-21 11:32:34,287 INFO (MainThread-141591) Processed 6144 items in partition
17/02/21 11:32:34 INFO python.PythonRunner: Times: total = 5508, boot = -193, init = 210, finish = 5491
17/02/21 11:32:34 INFO executor.Executor: Finished task 5.0 in stage 2.0 (TID 13). 1725 bytes result sent to driver
17/02/21 11:32:34 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 15
17/02/21 11:32:34 INFO executor.Executor: Running task 7.0 in stage 2.0 (TID 15)
17/02/21 11:32:34 INFO rdd.HadoopRDD: Input split: hdfs://172.16.141.90:8020/user/hadp/mnist/csv/train/images/part-00007:0+11201024
17/02/21 11:32:34 INFO rdd.HadoopRDD: Input split: hdfs://172.16.141.90:8020/user/hadp/mnist/csv/train/labels/part-00007:0+245760
2017-02-21 11:32:34,506 INFO (MainThread-141591) Connected to TFSparkNode.mgr on slave4, ppid=141447, state='running'
2017-02-21 11:32:34,512 INFO (MainThread-141591) mgr.state='running'
2017-02-21 11:32:34,513 INFO (MainThread-141591) Feeding partition <itertools.chain object at 0x35d1250> into input queue <multiprocessing.queues.JoinableQueue object at 0x3343c50>
17/02/21 11:32:36 INFO python.PythonRunner: Times: total = 2587, boot = -3012, init = 3017, finish = 2582
17/02/21 11:32:36 INFO python.PythonRunner: Times: total = 87, boot = -5609, init = 5614, finish = 82
2017-02-21T11:32:38.988017 step: 400 accuracy: 0.930000007153
2017-02-21 11:32:39,896 INFO (MainThread-141591) Processed 6144 items in partition
17/02/21 11:32:39 INFO python.PythonRunner: Times: total = 5417, boot = -176, init = 193, finish = 5400
17/02/21 11:32:39 INFO executor.Executor: Finished task 7.0 in stage 2.0 (TID 15). 1652 bytes result sent to driver
17/02/21 11:32:39 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 17
17/02/21 11:32:39 INFO executor.Executor: Running task 9.0 in stage 2.0 (TID 17)
17/02/21 11:32:39 INFO rdd.HadoopRDD: Input split: hdfs://172.16.141.90:8020/user/hadp/mnist/csv/train/images/part-00009:0+10449019
17/02/21 11:32:40 INFO rdd.HadoopRDD: Input split: hdfs://172.16.141.90:8020/user/hadp/mnist/csv/train/labels/part-00009:0+229120
2017-02-21 11:32:40,126 INFO (MainThread-141591) Connected to TFSparkNode.mgr on slave4, ppid=141447, state='running'
2017-02-21 11:32:40,132 INFO (MainThread-141591) mgr.state='running'
2017-02-21 11:32:40,132 INFO (MainThread-141591) Feeding partition <itertools.chain object at 0x35d1250> into input queue <multiprocessing.queues.JoinableQueue object at 0x3343c50>
17/02/21 11:32:42 INFO python.PythonRunner: Times: total = 2501, boot = -2971, init = 2977, finish = 2495
17/02/21 11:32:42 INFO python.PythonRunner: Times: total = 85, boot = -5524, init = 5530, finish = 79
2017-02-21T11:32:43.592701 step: 500 accuracy: 0.939999997616
2017-02-21 11:32:45,197 INFO (MainThread-141591) Processed 5728 items in partition
17/02/21 11:32:45 INFO python.PythonRunner: Times: total = 5097, boot = -189, init = 209, finish = 5077
17/02/21 11:32:45 INFO executor.Executor: Finished task 9.0 in stage 2.0 (TID 17). 1739 bytes result sent to driver
logfile: container_1486634114148_0042_01_000003/hadp/stderr/?start=0
-----------------------------------------
2017-02-21 03:17:50,090 INFO (Thread-1-7675) 2: ======== worker:1 ========
2017-02-21 03:17:50,090 INFO (Thread-1-7675) 2: Cluster spec: {'ps': ['slave1:44860'], 'worker': ['slave2:48957', 'slave3:43524', 'slave4:41709']}
2017-02-21 03:17:50,090 INFO (Thread-1-7675) 2: Using CPU
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE3 instructions, but these are available on your machine and could speed up CPU computations.
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.1 instructions, but these are available on your machine and could speed up CPU computations.
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.2 instructions, but these are available on your machine and could speed up CPU computations.
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX instructions, but these are available on your machine and could speed up CPU computations.
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX2 instructions, but these are available on your machine and could speed up CPU computations.
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use FMA instructions, but these are available on your machine and could speed up CPU computations.
I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:200] Initialize GrpcChannelCache for job ps -> {0 -> slave1:44860}
I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:200] Initialize GrpcChannelCache for job worker -> {0 -> slave2:48957, 1 -> localhost:43524, 2 -> slave4:41709}
I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:221] Started server with target: grpc://localhost:43524
tensorflow model path: hdfs://172.16.141.90:8020/user/hadp/mnist_model
I tensorflow/core/distributed_runtime/master_session.cc:1012] Start master session 3f363c8a0a437ec4 with config: 

INFO:tensorflow:Waiting for model to be ready.  Ready_for_local_init_op:  None, ready: Variables not initialized: hid_w, hid_b, sm_w, sm_b, Variable, hid_w/Adagrad, hid_b/Adagrad, sm_w/Adagrad, sm_b/Adagrad
2017-02-21 03:17:56,292 INFO (Thread-1-7675) Waiting for model to be ready.  Ready_for_local_init_op:  None, ready: Variables not initialized: hid_w, hid_b, sm_w, sm_b, Variable, hid_w/Adagrad, hid_b/Adagrad, sm_w/Adagrad, sm_b/Adagrad
17/02/21 03:17:57 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 10
17/02/21 03:17:57 INFO executor.Executor: Running task 2.0 in stage 2.0 (TID 10)
17/02/21 03:17:57 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 4
17/02/21 03:17:57 INFO memory.MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 6.8 KB, free 10.5 GB)
17/02/21 03:17:57 INFO broadcast.TorrentBroadcast: Reading broadcast variable 4 took 17 ms
17/02/21 03:17:57 INFO memory.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 13.8 KB, free 10.5 GB)
17/02/21 03:17:57 INFO rdd.HadoopRDD: Input split: hdfs://172.16.141.90:8020/user/hadp/mnist/csv/train/images/part-00002:0+11214784
17/02/21 03:17:57 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 0
17/02/21 03:17:57 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 23.9 KB, free 10.5 GB)
17/02/21 03:17:57 INFO broadcast.TorrentBroadcast: Reading broadcast variable 0 took 16 ms
17/02/21 03:17:57 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 297.3 KB, free 10.5 GB)
17/02/21 03:17:58 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
17/02/21 03:17:58 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 40b332285be4d4a9ec95a02694b20211810cf4ec]
17/02/21 03:17:58 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
17/02/21 03:17:58 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
17/02/21 03:17:58 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
17/02/21 03:17:58 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
17/02/21 03:17:58 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
17/02/21 03:17:58 INFO rdd.HadoopRDD: Input split: hdfs://172.16.141.90:8020/user/hadp/mnist/csv/train/labels/part-00002:0+245760
17/02/21 03:17:58 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 1
17/02/21 03:17:58 INFO client.TransportClientFactory: Successfully created connection to slave4/172.16.141.91:7812 after 2 ms (0 ms spent in bootstraps)
17/02/21 03:17:58 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 23.9 KB, free 10.5 GB)
17/02/21 03:17:58 INFO broadcast.TorrentBroadcast: Reading broadcast variable 1 took 50 ms
17/02/21 03:17:58 INFO memory.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 297.3 KB, free 10.5 GB)
2017-02-21 03:17:59,006 INFO (MainThread-7786) Connected to TFSparkNode.mgr on slave3, ppid=7641, state='running'
2017-02-21 03:17:59,010 INFO (MainThread-7786) mgr.state='running'
2017-02-21 03:17:59,010 INFO (MainThread-7786) Feeding partition <itertools.chain object at 0x178f390> into input queue <multiprocessing.queues.JoinableQueue object at 0x1e8ed50>
17/02/21 03:18:00 INFO python.PythonRunner: Times: total = 2443, boot = -9670, init = 9736, finish = 2377
17/02/21 03:18:00 INFO python.PythonRunner: Times: total = 143, boot = 3, init = 48, finish = 92
I tensorflow/core/distributed_runtime/master_session.cc:1012] Start master session 6fe3cf85083c4718 with config: 

2017-02-21T03:18:26.451959 session ready
2017-02-21T03:18:29.097871 step: 100 accuracy: 0.949999988079
2017-02-21 03:18:29,681 INFO (MainThread-7786) Processed 6144 items in partition
17/02/21 03:18:29 INFO python.PythonRunner: Times: total = 30754, boot = 4, init = 67, finish = 30683
17/02/21 03:18:29 INFO executor.Executor: Finished task 2.0 in stage 2.0 (TID 10). 2397 bytes result sent to driver
17/02/21 03:18:31 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 12
17/02/21 03:18:31 INFO executor.Executor: Running task 4.0 in stage 2.0 (TID 12)
17/02/21 03:18:31 INFO rdd.HadoopRDD: Input split: hdfs://172.16.141.90:8020/user/hadp/mnist/csv/train/images/part-00004:0+11212767
17/02/21 03:18:31 INFO rdd.HadoopRDD: Input split: hdfs://172.16.141.90:8020/user/hadp/mnist/csv/train/labels/part-00004:0+245760
2017-02-21 03:18:31,804 INFO (MainThread-7786) Connected to TFSparkNode.mgr on slave3, ppid=7641, state='running'
2017-02-21 03:18:31,809 INFO (MainThread-7786) mgr.state='running'
2017-02-21 03:18:31,809 INFO (MainThread-7786) Feeding partition <itertools.chain object at 0x1796590> into input queue <multiprocessing.queues.JoinableQueue object at 0x1e8ed50>
17/02/21 03:18:34 INFO python.PythonRunner: Times: total = 2601, boot = -30602, init = 30610, finish = 2593
17/02/21 03:18:34 INFO python.PythonRunner: Times: total = 88, boot = -32745, init = 32752, finish = 81
2017-02-21T03:18:35.628594 step: 200 accuracy: 0.949999988079
2017-02-21 03:18:37,445 INFO (MainThread-7786) Processed 6144 items in partition
17/02/21 03:18:37 INFO python.PythonRunner: Times: total = 5666, boot = -2082, init = 2100, finish = 5648
17/02/21 03:18:37 INFO executor.Executor: Finished task 4.0 in stage 2.0 (TID 12). 1652 bytes result sent to driver
17/02/21 03:18:37 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 14
17/02/21 03:18:37 INFO executor.Executor: Running task 6.0 in stage 2.0 (TID 14)
17/02/21 03:18:37 INFO rdd.HadoopRDD: Input split: hdfs://172.16.141.90:8020/user/hadp/mnist/csv/train/images/part-00006:0+11214285
17/02/21 03:18:37 INFO rdd.HadoopRDD: Input split: hdfs://172.16.141.90:8020/user/hadp/mnist/csv/train/labels/part-00006:0+245760
2017-02-21 03:18:37,686 INFO (MainThread-7786) Connected to TFSparkNode.mgr on slave3, ppid=7641, state='running'
2017-02-21 03:18:37,691 INFO (MainThread-7786) mgr.state='running'
2017-02-21 03:18:37,692 INFO (MainThread-7786) Feeding partition <itertools.chain object at 0x1796390> into input queue <multiprocessing.queues.JoinableQueue object at 0x1e8ed50>
17/02/21 03:18:40 INFO python.PythonRunner: Times: total = 2705, boot = -3210, init = 3218, finish = 2697
17/02/21 03:18:40 INFO python.PythonRunner: Times: total = 87, boot = -5775, init = 5783, finish = 79
2017-02-21T03:18:40.367697 step: 300 accuracy: 0.949999988079
2017-02-21 03:18:43,355 INFO (MainThread-7786) Processed 6144 items in partition
17/02/21 03:18:43 INFO python.PythonRunner: Times: total = 5685, boot = -212, init = 220, finish = 5677
17/02/21 03:18:43 INFO executor.Executor: Finished task 6.0 in stage 2.0 (TID 14). 1725 bytes result sent to driver
17/02/21 03:18:43 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 16
17/02/21 03:18:43 INFO executor.Executor: Running task 8.0 in stage 2.0 (TID 16)
17/02/21 03:18:43 INFO rdd.HadoopRDD: Input split: hdfs://172.16.141.90:8020/user/hadp/mnist/csv/train/images/part-00008:0+11194141
17/02/21 03:18:43 INFO rdd.HadoopRDD: Input split: hdfs://172.16.141.90:8020/user/hadp/mnist/csv/train/labels/part-00008:0+245760
2017-02-21 03:18:43,580 INFO (MainThread-7786) Connected to TFSparkNode.mgr on slave3, ppid=7641, state='running'
2017-02-21 03:18:43,584 INFO (MainThread-7786) mgr.state='running'
2017-02-21 03:18:43,585 INFO (MainThread-7786) Feeding partition <itertools.chain object at 0x1796390> into input queue <multiprocessing.queues.JoinableQueue object at 0x1e8ed50>
17/02/21 03:18:45 INFO python.PythonRunner: Times: total = 2595, boot = -3124, init = 3131, finish = 2588
17/02/21 03:18:45 INFO python.PythonRunner: Times: total = 88, boot = -5811, init = 5819, finish = 80
2017-02-21T03:18:48.998325 step: 500 accuracy: 0.939999997616
2017-02-21 03:18:49,194 INFO (MainThread-7786) Processed 6144 items in partition
17/02/21 03:18:49 INFO python.PythonRunner: Times: total = 5639, boot = -186, init = 204, finish = 5621
17/02/21 03:18:49 INFO executor.Executor: Finished task 8.0 in stage 2.0 (TID 16). 1652 bytes result sent to driver


ClassNotFoundException: org.tensorflow.hadoop.io.TFRecordFileOutputFormat while Running MNIST example over YARN

I am trying to run the MNIST example on a YARN cluster. I am using the following command to generate and save the images and labels as TFRecords, but I get the following error.

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: java.lang.ClassNotFoundException: org.tensorflow.hadoop.io.TFRecordFileOutputFormat
	at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:278)
	at org.apache.spark.util.Utils$.classForName(Utils.scala:228)
	at org.apache.spark.api.python.PythonRDD$.saveAsNewAPIHadoopFile(PythonRDD.scala:833)
	at org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)

The output of fs -ls shows that the jar file exists at the path specified.

bin/hadoop fs -ls /user/${USER}/tensorflow-hadoop-1.0-SNAPSHOT.jar
-rw-r--r--   3 javed.19 supergroup     118714 2017-05-11 16:00 /user/javed.19/tensorflow-hadoop-1.0-SNAPSHOT.jar

Moreover, I also extracted the jar to confirm that it contains the class in question, and it does. What could be the problem?

Environment:
CUDA: 8
CUDNN: 5.0
Python: 2.7.3
Tensorflow: 1.0
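
One common cause (an assumption here, since the submit command itself is not shown above) is that the jar sits on HDFS but is never added to the Spark driver/executor classpath; passing it explicitly via --jars usually resolves the ClassNotFoundException. A hedged sketch of such a submit command, where all flags other than --jars are illustrative placeholders that should match your own setup:

# Hedged sketch: make the tensorflow-hadoop jar visible to the driver and executors.
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 4 \
--executor-memory 4G \
--archives hdfs:///user/${USER}/Python.zip#Python \
--jars hdfs:///user/${USER}/tensorflow-hadoop-1.0-SNAPSHOT.jar \
TensorFlowOnSpark/examples/mnist/mnist_data_setup.py \
--output mnist/tfr \
--format tfr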

MNIST example cannot run on Spark 2.2.0

After I deployed Spark 2.2.0, I failed to run the MNIST example script.
The script is the one from GetStarted_YARN; see the script details below.
It works on Spark 2.1.1, but fails on Spark 2.2.0.

Thanks for your help!

The error is: /home/xxx/Python/bin/python not found.
Script:
export PYTHON_ROOT=~/Python
export LD_LIBRARY_PATH=${PATH}
export PYSPARK_PYTHON=${PYTHON_ROOT}/bin/python
export SPARK_YARN_USER_ENV="PYSPARK_PYTHON=Python/bin/python"
export PATH=${PYTHON_ROOT}/bin/:$PATH

export QUEUE=default

${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${QUEUE} \
--num-executors 4 \
--executor-memory 4G \
--archives hdfs:///user/${USER}/Python.zip#Python,mnist/mnist.zip#mnist \
TensorFlowOnSpark/examples/mnist/mnist_data_setup.py \
--output mnist/csv \
--format csv
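
One thing that commonly matters in cluster deploy mode (an assumption, not verified against this report): the driver runs inside a YARN container, so PYSPARK_PYTHON must point at the Python archive unpacked in that container, not at a path on the submitting machine. A hedged variant of the script above that sets this explicitly via the documented spark.yarn.appMasterEnv.* and spark.executorEnv.* settings:

# Hedged sketch: point both the AM/driver and the executors at the archived Python.
export PYSPARK_PYTHON=./Python/bin/python

${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${QUEUE} \
--num-executors 4 \
--executor-memory 4G \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./Python/bin/python \
--conf spark.executorEnv.PYSPARK_PYTHON=./Python/bin/python \
--archives hdfs:///user/${USER}/Python.zip#Python,mnist/mnist.zip#mnist \
TensorFlowOnSpark/examples/mnist/mnist_data_setup.py \
--output mnist/csv \
--format csv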

Exception: TFManager already started on worker1, ppid=25378, state='running'

Hello, and thank you for your work on open-sourcing TensorFlow on Spark.
I can run the example with num-executors=2, but with num-executors=4 I get these errors:

  1. Exception: TFManager already started on worker2, ppid=7751, state='running'
  2. Exception: TFManager already started on worker1, ppid=25378, state='running'
  3. Exception: Duplicate cluster node id detected (host=worker1, ppid=25378). Please ensure that the number of executors >= number of TensorFlow nodes, and TFCluster.shutdown() is successfully invoked when done.

spark-submit \
--master yarn \
--deploy-mode client \
--queue default \
--num-executors 4 \
--executor-cores 1 \
--executor-memory 600M \
--py-files tfspark.zip,examples/mnist/spark/mnist_dist.py \
--conf spark.executorEnv.LD_LIBRARY_PATH="$JAVA_HOME/jre/lib/amd64/server" \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
examples/mnist/spark/mnist_spark.py \
--images mnist/csv/train/images \
--labels mnist/csv/train/labels \
--mode train \
--model mnist_model

17/03/03 10:48:17 INFO YarnSchedulerBackend$YarnDriverEndpoint: Launching task 2 on executor id: 1 hostname: worker1.
17/03/03 10:48:17 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 11836 ms on worker1 (1/4)
17/03/03 10:48:17 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, worker2, partition 3, PROCESS_LOCAL, 5391 bytes)
17/03/03 10:48:17 INFO YarnSchedulerBackend$YarnDriverEndpoint: Launching task 3 on executor id: 2 hostname: worker2.
17/03/03 10:48:17 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 11927 ms on worker2 (2/4)
17/03/03 10:48:18 WARN TaskSetManager: Lost task 3.0 in stage 0.0 (TID 3, worker2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/data/hadoop/tmp/nm-local-dir/usercache/spark/appcache/application_1488455522150_0012/container_1488455522150_0012_01_000003/pyspark.zip/pyspark/worker.py", line 172, in main
process()
File "/data/hadoop/tmp/nm-local-dir/usercache/spark/appcache/application_1488455522150_0012/container_1488455522150_0012_01_000003/pyspark.zip/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 317, in func
File "/home/spark/TensorFlowOnSpark/tfspark.zip/com/yahoo/ml/tf/TFSparkNode.py", line 91, in _reserve
Exception: TFManager already started on worker2, ppid=7751, state='running'

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

17/03/03 10:48:18 INFO TaskSetManager: Starting task 3.1 in stage 0.0 (TID 4, worker2, partition 3, PROCESS_LOCAL, 5391 bytes)
17/03/03 10:48:18 INFO YarnSchedulerBackend$YarnDriverEndpoint: Launching task 4 on executor id: 2 hostname: worker2.
17/03/03 10:48:18 WARN TaskSetManager: Lost task 2.0 in stage 0.0 (TID 2, worker1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/data/hadoop/tmp/nm-local-dir/usercache/spark/appcache/application_1488455522150_0012/container_1488455522150_0012_01_000002/pyspark.zip/pyspark/worker.py", line 172, in main
process()
File "/data/hadoop/tmp/nm-local-dir/usercache/spark/appcache/application_1488455522150_0012/container_1488455522150_0012_01_000002/pyspark.zip/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 317, in func
File "/home/spark/TensorFlowOnSpark/tfspark.zip/com/yahoo/ml/tf/TFSparkNode.py", line 91, in _reserve
Exception: TFManager already started on worker1, ppid=25378, state='running'

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

17/03/03 10:48:18 INFO TaskSetManager: Starting task 2.1 in stage 0.0 (TID 5, worker1, partition 2, PROCESS_LOCAL, 5391 bytes)
17/03/03 10:48:18 INFO YarnSchedulerBackend$YarnDriverEndpoint: Launching task 5 on executor id: 1 hostname: worker1.
17/03/03 10:48:28 INFO TaskSetManager: Finished task 3.1 in stage 0.0 (TID 4) in 10163 ms on worker2 (3/4)
17/03/03 10:48:28 INFO TaskSetManager: Finished task 2.1 in stage 0.0 (TID 5) in 10111 ms on worker1 (4/4)
17/03/03 10:48:28 INFO YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
17/03/03 10:48:28 INFO DAGScheduler: ResultStage 0 (collect at /home/spark/TensorFlowOnSpark/tfspark.zip/com/yahoo/ml/tf/TFCluster.py:189) finished in 22.397 s
17/03/03 10:48:28 INFO DAGScheduler: Job 0 finished: collect at /home/spark/TensorFlowOnSpark/tfspark.zip/com/yahoo/ml/tf/TFCluster.py:189, took 22.526369 s
{'addr': ('worker1', 33132), 'task_index': 0, 'port': 50483, 'authkey': UUID('16300861-aa35-4fe3-b775-a90cfa9e5883'), 'worker_num': 0, 'host': 'worker1', 'ppid': 25378, 'job_name': 'ps', 'tb_pid': 0, 'tb_port': 0}
{'addr': '/tmp/pymp-8w7uN9/listener-O49cY4', 'task_index': 0, 'port': 50548, 'authkey': UUID('c3d5da9a-b35f-4d0b-9d0f-e8986bacb7da'), 'worker_num': 1, 'host': 'worker2', 'ppid': 7751, 'job_name': 'worker', 'tb_pid': 0, 'tb_port': 0}
{'addr': '/tmp/pymp-NBT2JR/listener-DzP5fR', 'task_index': 1, 'port': 49292, 'authkey': UUID('a988596d-16a8-44f8-b173-e9b8e0f3e4c5'), 'worker_num': 2, 'host': 'worker1', 'ppid': 25378, 'job_name': 'worker', 'tb_pid': 0, 'tb_port': 0}
{'addr': '/tmp/pymp-V2AMeX/listener-rtKT5z', 'task_index': 2, 'port': 58283, 'authkey': UUID('fb48c221-c612-4f43-b5e1-c6a8be1a5eb6'), 'worker_num': 3, 'host': 'worker2', 'ppid': 7751, 'job_name': 'worker', 'tb_pid': 0, 'tb_port': 0}
Traceback (most recent call last):
File "/home/spark/TensorFlowOnSpark/examples/mnist/spark/mnist_spark.py", line 70, in
cluster = TFCluster.reserve(sc, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.SPARK)
File "/home/spark/TensorFlowOnSpark/tfspark.zip/com/yahoo/ml/tf/TFCluster.py", line 207, in reserve
Exception: Duplicate cluster node id detected (host=worker1, ppid=25378). Please ensure that the number of executors >= number of TensorFlow nodes, and TFCluster.shutdown() is successfully invoked when done.
17/03/03 10:48:28 INFO SparkContext: Invoking stop() from shutdown hook
17/03/03 10:48:28 INFO ServerConnector: Stopped ServerConnector@49c82a94{HTTP/1.1}{0.0.0.0:4040}
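
For what it is worth, the cluster_info printed above contains only two distinct (host, ppid) pairs (worker1/25378 and worker2/7751) for four requested nodes, which suggests that only two executors were actually running and the retried reservation tasks landed on executors that had already started a TFManager. A minimal sketch of the duplicate check that raises this exception, using made-up data rather than the actual TFSparkNode code:

# Illustrative Python sketch, not TFoS source: the reserve step records one
# (host, ppid) pair per executor process and rejects repeats.
cluster_info = [
    {'host': 'worker1', 'ppid': 25378, 'job_name': 'ps'},
    {'host': 'worker2', 'ppid': 7751, 'job_name': 'worker'},
    {'host': 'worker1', 'ppid': 25378, 'job_name': 'worker'},  # same executor reused
    {'host': 'worker2', 'ppid': 7751, 'job_name': 'worker'},   # same executor reused
]

seen = set()
for node in cluster_info:
    key = (node['host'], node['ppid'])
    if key in seen:
        raise Exception("Duplicate cluster node id detected (host={0}, ppid={1}). "
                        "Please ensure that the number of executors >= number of "
                        "TensorFlow nodes.".format(*key))
    seen.add(key)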

Keras Support

Since Keras can use TensorFlow as its backend, how can one use TensorFlowOnSpark with Keras, if that is possible at all? Thank you.

No idea about "Duplicate cluster node id"

{'addr': ('huangkendebignippleboy.local', 63721), 'task_index': 0, 'port': 63732, 'authkey': UUID('41aabf90-0859-4a7f-b502-c8b7ff4f077e'), 'worker_num': 0, 'host': 'huangkendebignippleboy.local', 'ppid': 6110, 'job_name': 'ps', 'tb_pid': 0, 'tb_port': 0}
{'addr': '/var/folders/f2/rpxyvx551hs4hd_vz55k8vbm0000gn/T/pymp-58olDP/listener-7fWyiQ', 'task_index': 0, 'port': 63729, 'authkey': UUID('fc676ab5-c51d-42d0-8b26-09afca7adc4f'), 'worker_num': 1, 'host': 'huangkendebignippleboy.local', 'ppid': 6110, 'job_name': 'worker', 'tb_pid': 0, 'tb_port': 0}
{'addr': '/var/folders/f2/rpxyvx551hs4hd_vz55k8vbm0000gn/T/pymp-MO_r4I/listener-_7Zod5', 'task_index': 1, 'port': 63727, 'authkey': UUID('0b85368b-ab78-4c6d-a0ba-613a20a21b8c'), 'worker_num': 2, 'host': 'huangkendebignippleboy.local', 'ppid': 6110, 'job_name': 'worker', 'tb_pid': 0, 'tb_port': 0}
Traceback (most recent call last):
File "/Users/huangken/Documents/Source/git_src/TensorFlowOnSpark-master/examples/mnist/spark/mnist_spark.py", line 70, in
cluster = TFCluster.reserve(sc, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.SPARK)
File "/Users/huangken/Documents/Source/git_src/TensorFlowOnSpark-master/tfspark.zip/com/yahoo/ml/tf/TFCluster.py", line 207, in reserve
Exception: Duplicate cluster node id detected (host=huangkendebignippleboy.local, ppid=6110). Please ensure that the number of executors >= number of TensorFlow nodes, and TFCluster.shutdown() is successfully invoked when done.

AttributeError: 'NoneType' object has no attribute 'get': one possible cause

Hello:
From the issues, I see that several people have hit the same problem, i.e.
AttributeError: 'NoneType' object has no attribute 'get'.
In my case (not on YARN), I think the cause may be the following:
when a container dies (killed by the NodeManager), its ppid no longer matches the entry in the cluster_info returned by the reserve method.
So, when going through cluster_info, some containers no longer exist and their state cannot be retrieved.
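
A minimal illustration of that failure mode, assuming the node lookup is keyed by (host, ppid); this is hypothetical code, not the TFSparkNode source:

# Hypothetical Python sketch: a relaunched container has a new ppid, so the
# lookup against the cluster_info recorded at reserve() time returns None,
# and the next attribute access raises the AttributeError.
cluster_info = [{'host': 'worker1', 'ppid': 25378, 'state': 'running'}]

def find_node(cluster_info, host, ppid):
    for node in cluster_info:
        if node['host'] == host and node['ppid'] == ppid:
            return node
    return None  # container died and was relaunched, so no entry matches

node = find_node(cluster_info, 'worker1', 31001)  # 31001 is a made-up new ppid
state = node.get('state')  # AttributeError: 'NoneType' object has no attribute 'get'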

Python 3 support on Windows? Running into a pickling problem

I'm running the MNIST example on a standalone cluster in Windows. I had to make a few changes to enable Python 3 support (I'm using 3.5):

In TFCluster.py (due to changes in relative imports):

from . import TFSparkNode

In TFSparkNode.py (Queue > queue and relative imports again, also a problem with Python 3 not handling UUID objects the same as 2):

from . import TFManager
...
authkey = uuid.uuid4()
authkey = authkey.bytes

... and then I get stuck with a pickling issue:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "F:\TensorFlowOnSpark\spark-1.6.0-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\worker.py", line 111, in main
  File "F:\TensorFlowOnSpark\spark-1.6.0-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\worker.py", line 106, in process
  File "F:\TensorFlowOnSpark\spark-1.6.0-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\rdd.py", line 2346, in pipeline_func
  File "F:\TensorFlowOnSpark\spark-1.6.0-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\rdd.py", line 317, in func
  File "F:\TensorFlowOnSpark\tfspark.zip\com\yahoo\ml\tf\TFSparkNode.py", line 97, in _reserve
  File ".\tfspark.zip\com\yahoo\ml\tf\TFManager.py", line 36, in start
    mgr.start()
  File "C:\Python35\lib\multiprocessing\managers.py", line 479, in start
    self._process.start()
  File "C:\Python35\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\Python35\lib\multiprocessing\context.py", line 313, in _Popen
    return Popen(process_obj)
  File "C:\Python35\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Python35\lib\multiprocessing\reduction.py", line 59, in dump
    ForkingPickler(file, protocol).dump(obj)
AttributeError: Can't pickle local object 'start.<locals>.<lambda>'

Not really sure how to proceed from here. I tried using another library (dill) to do the pickling, but that isn't working. Has anybody gotten this to work in Python 3?
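
For reference, the error can be reproduced outside Spark: Windows uses the 'spawn' start method, so multiprocessing has to pickle whatever is registered on the manager before the server process starts, and a lambda defined inside a function cannot be pickled. A minimal, illustrative sketch (not the TFManager code):

# Illustrative Python sketch of the Windows/Python 3 pickling failure.
import multiprocessing
from multiprocessing.managers import BaseManager

class _Manager(BaseManager):
    pass

def start():
    state = {'status': 'running'}
    # Registering a locally defined lambda works under 'fork' (the Linux default),
    # but under 'spawn' the manager's registry must be pickled into the child process.
    _Manager.register('get_state', callable=lambda: state)
    mgr = _Manager(address=('127.0.0.1', 0), authkey=b'secret')
    mgr.start()
    return mgr

if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')  # what Windows does by default
    start()  # AttributeError: Can't pickle local object 'start.<locals>.<lambda>'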

Questions about YROOT

I think I am missing some knowledge about configuring environment variables in the following steps of "Convert the MNIST zip files into HDFS files."

export YROOT=~/y
export LD_LIBRARY_PATH=${YROOT}/lib64:${PATH}

Specifically, how do I generate ~/y/lib64?
Any suggestions will be appreciated!

MNIST example hangs

I have set up the MNIST example on a Hadoop 2.6 server with Spark 1.6.1. I have been trying to train it using the following command:

${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue default \
--num-executors 4 \
--executor-memory 20G \
--py-files TensorFlowOnSpark/tfspark.zip,TensorFlowOnSpark/examples/mnist/spark/mnist_dist.py \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
--conf spark.executor.cores=1 \
--archives hdfs:///user/${USER}/Python.zip#Python \
--conf spark.executorEnv.LD_LIBRARY_PATH="$JAVA_HOME/jre/lib/amd64/server:/usr/lib/hadoop2.6/lib/native/" \
TensorFlowOnSpark/examples/mnist/spark/mnist_spark.py \
--images mnist/csv/train/images \
--labels mnist/csv/train/labels \
--mode train \
--model mnist_model

But the execution hangs.


A few things I found out while trying to debug it,

  1. All the workers seem to be waiting for the model to be written. Modifying the code to use logdir = None works, as suggested in #33. The solutions suggested in the same issue, namely changing the model directory location, do not work. Moreover, the logs suggest that the correct URL is picked up by the executors anyway.
  2. If we change the number of executor cores to 2, the program gives the error from #94, but the mnist_model directory is written to HDFS.

Logs can be viewed at:
Driver
Executor 1
Executor 2
Executor 3
Executor 4
