Giter Site home page Giter Site logo

qubole / spark-state-store Goto Github PK

View Code? Open in Web Editor NEW
16.0 4.0 8.0 58 KB

Rocksdb state storage implementation for Structured Streaming.

License: Apache License 2.0

Shell 2.33% Java 7.59% Scala 79.34% Python 10.73%
streaming structured-streaming spark state-management rocksdb performance scalability qubole real-time-processing

spark-state-store's Introduction

Build Status

Rocksdb State Store for Structured Streaming

SPARK-13809 introduced a framework for state management for computing Streaming Aggregates. The default implementation was in-memory hashmap which was backed up in HDFS complaint file system at the end of every micro-batch.

Current implementation suffers from Performance and Latency Issues. It uses Executor JVM memory to store the states. State store size is limited by the size of the executor memory. Also Executor JVM memory is shared by state storage and other tasks operations. State storage size will impact the performance of task execution

Moreover, GC pauses, executor failures, OOM issues are common when the size of state storage increases which increases overall latency of a micro-batch

RocksDB is a storage engine with key/value interface based on levelDB. New writes are inserted into the memtable; when memtable fills up, it flushes the data on local storage. It supports both point lookups and range scans, and provides different types of ACID guarantees and is optimized for flash storage. Rocksdb based state storage for Structured streaming provides major performance improvements for stateful stream processing.

Discussion on the PR raised can be found here

Downloading and Using the Connector

The connector is available from the Maven Central repository. It can be used using the --packages option or the spark.jars.packages configuration property. Use the following connector artifact

com.qubole.spark/spark-rocksdb-state-store_2.11/1.0.0

Benchmark

Used following repo for the benchmark

Setup

  • Used Qubole's distribution of Apache Spark 2.4.0 for my tests.
  • Master Instance Type = i3.xlarge
  • Driver Memory = 2g
  • num-executors = 1
  • max-executors = 1
  • spark.sql.shuffle.partitions = 8
  • Run time = 30 mins
  • Source = Rate Source
  • executor Memory = 7g
  • spark.executor.memoryOverhead=3g
  • Processing Time = 30 sec

Executor Instance type = i3.xlarge cores per executor = 4 ratePerSec = 20k

State Storage Type Mode Total Trigger Execution Time Records Processed Total State Rows Comments
memory Append ~7 mins 8.6 million 2 million Application failed before 30 mins
RockSB Append ~30 minutes 34.6 million 7 million

Executor Instance type = C5d.2xlarge cores per executor = 8 ratePerSec = 30k

State Storage Type Mode Total Trigger Execution Time Records Processed Total State Rows Comments
memory Append 8 mins 12.6 million 3.1 million Application was stuck because of GC
RockSB Complete ~30 minutes 47.34 million 12.5 million

Executor info when memory based state storage is used Screenshot 2019-08-02 at 10 58 21 AM

Longevity run results

Executor Instance type = C5d.2xlarge cores per executor = 8 ratePerSec = 20k

State Storage Type Mode Total Trigger Execution Time Records Processed Total State Rows Number of Micro-batch Comments
RockSB Append ~1.5 hrs 104.3 million 10.5 million 114

Streaming Metrics Screenshot 2019-08-07 at 8 08 32 PM

Executor info Screenshot 2019-08-07 at 8 18 10 PM

spark-state-store's People

Contributors

dependabot[bot] avatar indit-qubole avatar itsvikramagr avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar

spark-state-store's Issues

Unable to perform stream-stream Join using Spark-shell

Hi, I am having java.lang.IllegalStateException: Error while creating OptimisticTransactionDb instance lock : file:/home/centos/rocksdb/rdb/db/state_-1619083917/0/0/LOCK: No locks available. Error while running stream-stream Join on spark-shell.
Step to reproduce.

  1. Spark version 2.4.0 and --packages com.qubole.spark/spark-rocksdb-state-store_2.11/1.0.0
  2. $SPARK_HOME/bin/spark-shell --master local[*] --packages com.qubole.spark/spark-rocksdb-state-store_2.11/1.0.0
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider")
spark.conf.set("spark.sql.streaming.stateStore.rocksDb.localDir", "file:///home/centos/rocksdb/rdb")
import org.apache.spark.sql.types.StructType
val schemaUntyped = new StructType() .add("a", "int") .add("b", "string")
val schemaUntyped1 = new StructType() .add("a1", "int") .add("b1", "string")

var stream1 = spark.readStream.schema(schemaUntyped).csv("file:///home/centos/rocksdb/s1")
var stream2 = spark.readStream.schema(schemaUntyped1).csv("file:///home/centos/rocksdb/s2")

stream1.join(stream2, stream2.col("b1").equalTo(stream1.col("b"))).writeStream.option("checkpointLocation", "file:///home/centos/rocksdb/checkpoint").format("console").start()```
3. sample data
```1,asd
2,dfsf
3,df
4,fdvfdv
5,fdv
6,dv```

Thanks, Please let me know if you need more Info. Thanks!

**full stack trace for Error::**
```2020-10-07 03:19:54 ERROR Utils:91 - Aborting task
java.lang.IllegalStateException: Error while creating OptimisticTransactionDb instance lock : file:/home/centos/rocksdb/rdb/db/state_-1619083917/0/0/LOCK: No locks available
	at org.apache.spark.sql.execution.streaming.state.OptimisticTransactionDbInstance.open(RocksDbInstance.scala:281)
	at org.apache.spark.sql.execution.streaming.state.OptimisticTransactionDbInstance.open(RocksDbInstance.scala:267)
	at org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider$RocksDbStateStore.initTransaction(RocksDbStateStoreProvider.scala:119)
	at org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider$RocksDbStateStore.get(RocksDbStateStoreProvider.scala:126)
	at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyToNumValuesStore.get(SymmetricHashJoinStateManager.scala:354)
	at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.append(SymmetricHashJoinStateManager.scala:84)
	at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$storeAndJoinWithOtherSide$1.apply(StreamingSymmetricHashJoinExec.scala:454)
	at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$storeAndJoinWithOtherSide$1.apply(StreamingSymmetricHashJoinExec.scala:440)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
	at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
	at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.rocksdb.RocksDBException: lock : file:/home/centos/rocksdb/rdb/db/state_-1619083917/0/0/LOCK: No locks available
	at org.rocksdb.OptimisticTransactionDB.open(Native Method)
	at org.rocksdb.OptimisticTransactionDB.open(OptimisticTransactionDB.java:40)
	at org.apache.spark.sql.execution.streaming.state.OptimisticTransactionDbInstance.open(RocksDbInstance.scala:276)
	... 27 more
2020-10-07 03:19:54 ERROR Utils:91 - Aborting task
java.lang.IllegalStateException: Error while creating OptimisticTransactionDb instance lock : file:/home/centos/rocksdb/rdb/db/state_-1619083917/0/1/LOCK: No locks available
	at org.apache.spark.sql.execution.streaming.state.OptimisticTransactionDbInstance.open(RocksDbInstance.scala:281)
	at org.apache.spark.sql.execution.streaming.state.OptimisticTransactionDbInstance.open(RocksDbInstance.scala:267)
	at org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider$RocksDbStateStore.initTransaction(RocksDbStateStoreProvider.scala:119)
	at org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider$RocksDbStateStore.get(RocksDbStateStoreProvider.scala:126)
	at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyToNumValuesStore.get(SymmetricHashJoinStateManager.scala:354)
	at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.append(SymmetricHashJoinStateManager.scala:84)
	at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$storeAndJoinWithOtherSide$1.apply(StreamingSymmetricHashJoinExec.scala:454)
	at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$storeAndJoinWithOtherSide$1.apply(StreamingSymmetricHashJoinExec.scala:440)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
	at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
	at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.rocksdb.RocksDBException: lock : file:/home/centos/rocksdb/rdb/db/state_-1619083917/0/1/LOCK: No locks available
	at org.rocksdb.OptimisticTransactionDB.open(Native Method)
	at org.rocksdb.OptimisticTransactionDB.open(OptimisticTransactionDB.java:40)
	at org.apache.spark.sql.execution.streaming.state.OptimisticTransactionDbInstance.open(RocksDbInstance.scala:276)
	... 27 more
2020-10-07 03:19:54 ERROR DataWritingSparkTask:70 - Aborting commit for partition 0 (task 2, attempt 0stage 2.0)
2020-10-07 03:19:54 ERROR DataWritingSparkTask:70 - Aborting commit for partition 1 (task 3, attempt 0stage 2.0)
2020-10-07 03:19:54 ERROR DataWritingSparkTask:70 - Aborted commit for partition 0 (task 2, attempt 0stage 2.0)
2020-10-07 03:19:54 ERROR DataWritingSparkTask:70 - Aborted commit for partition 1 (task 3, attempt 0stage 2.0)
2020-10-07 03:19:54 ERROR TaskContextImpl:91 - Error in TaskCompletionListener
java.lang.IllegalArgumentException: requirement failed: No DB to close
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.sql.execution.streaming.state.OptimisticTransactionDbInstance.close(RocksDbInstance.scala:346)
	at org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider$RocksDbStateStore.close(RocksDbStateStoreProvider.scala:205)
	at org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider$RocksDbStateStore.abort(RocksDbStateStoreProvider.scala:199)
	at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$StateStoreHandler.abortIfNeeded(SymmetricHashJoinStateManager.scala:314)
	at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.abortIfNeeded(SymmetricHashJoinStateManager.scala:258)
	at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anonfun$2$$anonfun$apply$1.apply(SymmetricHashJoinStateManager.scala:298)
	at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anonfun$2$$anonfun$apply$1.apply(SymmetricHashJoinStateManager.scala:298)
	at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:131)
	at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:117)
	at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:117)
	at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:130)
	at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:128)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:128)
	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2020-10-07 03:19:54 ERROR TaskContextImpl:91 - Error in TaskCompletionListener
java.lang.IllegalArgumentException: requirement failed: No DB to close
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.sql.execution.streaming.state.OptimisticTransactionDbInstance.close(RocksDbInstance.scala:346)
	at org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider$RocksDbStateStore.close(RocksDbStateStoreProvider.scala:205)
	at org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider$RocksDbStateStore.abort(RocksDbStateStoreProvider.scala:199)
	at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$StateStoreHandler.abortIfNeeded(SymmetricHashJoinStateManager.scala:314)
	at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.abortIfNeeded(SymmetricHashJoinStateManager.scala:258)
	at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anonfun$2$$anonfun$apply$1.apply(SymmetricHashJoinStateManager.scala:298)
	at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anonfun$2$$anonfun$apply$1.apply(SymmetricHashJoinStateManager.scala:298)
	at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:131)
	at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:117)
	at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:117)
	at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:130)
	at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:128)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:128)
	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2020-10-07 03:19:54 ERROR Executor:91 - Exception in task 0.0 in stage 2.0 (TID 2)
org.apache.spark.util.TaskCompletionListenerException: requirement failed: No DB to close

Previous exception in task: Error while creating OptimisticTransactionDb instance lock : file:/home/centos/rocksdb/rdb/db/state_-1619083917/0/0/LOCK: No locks available
	org.apache.spark.sql.execution.streaming.state.OptimisticTransactionDbInstance.open(RocksDbInstance.scala:281)
	org.apache.spark.sql.execution.streaming.state.OptimisticTransactionDbInstance.open(RocksDbInstance.scala:267)
	org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider$RocksDbStateStore.initTransaction(RocksDbStateStoreProvider.scala:119)
	org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider$RocksDbStateStore.get(RocksDbStateStoreProvider.scala:126)
	org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyToNumValuesStore.get(SymmetricHashJoinStateManager.scala:354)
	org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.append(SymmetricHashJoinStateManager.scala:84)
	org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$storeAndJoinWithOtherSide$1.apply(StreamingSymmetricHashJoinExec.scala:454)
	org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$storeAndJoinWithOtherSide$1.apply(StreamingSymmetricHashJoinExec.scala:440)
	scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
	org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
	scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
	org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
	org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
	org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
	org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
	org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
	org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
	org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	org.apache.spark.scheduler.Task.run(Task.scala:121)
	org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	java.lang.Thread.run(Thread.java:748)
	at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2020-10-07 03:19:54 ERROR Executor:91 - Exception in task 1.0 in stage 2.0 (TID 3)
org.apache.spark.util.TaskCompletionListenerException: requirement failed: No DB to close

Previous exception in task: Error while creating OptimisticTransactionDb instance lock : file:/home/centos/rocksdb/rdb/db/state_-1619083917/0/1/LOCK: No locks available
	org.apache.spark.sql.execution.streaming.state.OptimisticTransactionDbInstance.open(RocksDbInstance.scala:281)
	org.apache.spark.sql.execution.streaming.state.OptimisticTransactionDbInstance.open(RocksDbInstance.scala:267)
	org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider$RocksDbStateStore.initTransaction(RocksDbStateStoreProvider.scala:119)
	org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider$RocksDbStateStore.get(RocksDbStateStoreProvider.scala:126)
	org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyToNumValuesStore.get(SymmetricHashJoinStateManager.scala:354)
	org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.append(SymmetricHashJoinStateManager.scala:84)
	org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$storeAndJoinWithOtherSide$1.apply(StreamingSymmetricHashJoinExec.scala:454)
	org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$storeAndJoinWithOtherSide$1.apply(StreamingSymmetricHashJoinExec.scala:440)
	scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
	org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
	scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
	org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
	org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
	org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
	org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
	org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
	org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
	org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	org.apache.spark.scheduler.Task.run(Task.scala:121)
	org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	java.lang.Thread.run(Thread.java:748)
	at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2020-10-07 03:19:54 WARN  TaskSetManager:66 - Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): org.apache.spark.util.TaskCompletionListenerException: requirement failed: No DB to close

Previous exception in task: Error while creating OptimisticTransactionDb instance lock : file:/home/centos/rocksdb/rdb/db/state_-1619083917/0/0/LOCK: No locks available
	org.apache.spark.sql.execution.streaming.state.OptimisticTransactionDbInstance.open(RocksDbInstance.scala:281)
	org.apache.spark.sql.execution.streaming.state.OptimisticTransactionDbInstance.open(RocksDbInstance.scala:267)
	org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider$RocksDbStateStore.initTransaction(RocksDbStateStoreProvider.scala:119)
	org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider$RocksDbStateStore.get(RocksDbStateStoreProvider.scala:126)
	org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyToNumValuesStore.get(SymmetricHashJoinStateManager.scala:354)
	org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.append(SymmetricHashJoinStateManager.scala:84)
	org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$storeAndJoinWithOtherSide$1.apply(StreamingSymmetricHashJoinExec.scala:454)
	org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$storeAndJoinWithOtherSide$1.apply(StreamingSymmetricHashJoinExec.scala:440)
	scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
	org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
	scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
	org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
	org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
	org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
	org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
	org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
	org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
	org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	org.apache.spark.scheduler.Task.run(Task.scala:121)
	org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	java.lang.Thread.run(Thread.java:748)
	at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

2020-10-07 03:19:54 ERROR TaskSetManager:70 - Task 0 in stage 2.0 failed 1 times; aborting job
2020-10-07 03:19:54 WARN  TaskSetManager:66 - Lost task 1.0 in stage 2.0 (TID 3, localhost, executor driver): org.apache.spark.util.TaskCompletionListenerException: requirement failed: No DB to close

Previous exception in task: Error while creating OptimisticTransactionDb instance lock : file:/home/centos/rocksdb/rdb/db/state_-1619083917/0/1/LOCK: No locks available
	org.apache.spark.sql.execution.streaming.state.OptimisticTransactionDbInstance.open(RocksDbInstance.scala:281)
	org.apache.spark.sql.execution.streaming.state.OptimisticTransactionDbInstance.open(RocksDbInstance.scala:267)
	org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider$RocksDbStateStore.initTransaction(RocksDbStateStoreProvider.scala:119)
	org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider$RocksDbStateStore.get(RocksDbStateStoreProvider.scala:126)
	org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyToNumValuesStore.get(SymmetricHashJoinStateManager.scala:354)
	org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.append(SymmetricHashJoinStateManager.scala:84)
	org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$storeAndJoinWithOtherSide$1.apply(StreamingSymmetricHashJoinExec.scala:454)
	org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$storeAndJoinWithOtherSide$1.apply(StreamingSymmetricHashJoinExec.scala:440)
	scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
	org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
	scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
	org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
	org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
	org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
	org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
	org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
	org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
	org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	org.apache.spark.scheduler.Task.run(Task.scala:121)
	org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	java.lang.Thread.run(Thread.java:748)
	at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

2020-10-07 03:19:54 ERROR WriteToDataSourceV2Exec:70 - Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@2c643740 is aborting.
2020-10-07 03:19:54 ERROR WriteToDataSourceV2Exec:70 - Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@2c643740 aborted.
2020-10-07 03:19:54 ERROR MicroBatchExecution:91 - Query [id = 0ae82f91-8457-4efd-983e-93abe15c5caf, runId = aedfa63f-e4fc-4dd1-9ec9-1584394f0d3a] terminated with error
org.apache.spark.SparkException: Writing job aborted.
	at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3384)
	at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2783)
	at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2783)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
	at org.apache.spark.sql.Dataset.collect(Dataset.scala:2783)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:532)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:531)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): org.apache.spark.util.TaskCompletionListenerException: requirement failed: No DB to close

Previous exception in task: Error while creating OptimisticTransactionDb instance lock : file:/home/centos/rocksdb/rdb/db/state_-1619083917/0/0/LOCK: No locks available
	org.apache.spark.sql.execution.streaming.state.OptimisticTransactionDbInstance.open(RocksDbInstance.scala:281)
	org.apache.spark.sql.execution.streaming.state.OptimisticTransactionDbInstance.open(RocksDbInstance.scala:267)
	org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider$RocksDbStateStore.initTransaction(RocksDbStateStoreProvider.scala:119)
	org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider$RocksDbStateStore.get(RocksDbStateStoreProvider.scala:126)
	org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyToNumValuesStore.get(SymmetricHashJoinStateManager.scala:354)
	org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.append(SymmetricHashJoinStateManager.scala:84)
	org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$storeAndJoinWithOtherSide$1.apply(StreamingSymmetricHashJoinExec.scala:454)
	org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$storeAndJoinWithOtherSide$1.apply(StreamingSymmetricHashJoinExec.scala:440)
	scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
	org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
	scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
	org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
	org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
	org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
	org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
	org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
	org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
	org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	org.apache.spark.scheduler.Task.run(Task.scala:121)
	org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	java.lang.Thread.run(Thread.java:748)
	at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:64)
	... 35 more```

No SST files found

I'm trying to use this with structure streaming jobs and all my queries terminate with this exception

Error reading snapshot file $fileToRead of $this: No SST files found.

Do you mind explaining why this lib is expecting sst files and possible causes for the error?

It runs fine for some hour but after that it starts giving memoy issue.

I would like to thanks for all work done on this rock db based state lookup. Its greate saviour. In our system we use state store for deduplication work and unique grows to more than 100 millions. I see it runs smoothly for some good no of batches but after that lot of gc starts happening and then container gets killed.

Any suggestion to come out of this problem

Below is some lines from log

2020-09-01 06:08:05,418 INFO org.apache.spark.executor.Executor:57 - Running task 7.1 in stage 280.0 (TID 452466)
2020-09-01 06:08:05,446 INFO org.apache.spark.executor.Executor:57 - Fetching spark://gpm302.sjc2.asn.net:42759/jars/flowlog-experiment.jar with timestamp 1598925065034
2020-09-01 06:08:05,471 INFO org.apache.spark.network.client.TransportClientFactory:272 - Successfully created connection to gpm302.sjc2.asn.net/10.0.1.171:42759 after 1 ms (0 ms spent in bootstraps)
2020-09-01 06:08:05,475 INFO org.apache.spark.util.Utils:57 - Fetching spark://gpm302.sjc2.asn.net:42759/jars/flowlog-experiment.jar to /data1/yarn/nm/usercache/root/appcache/application_1597935678144_0210/spark-44780dbc-a224-4c07-bd5c-537db7f7df45/fetchFileTemp490451002513480794.tmp
2020-09-01 06:08:05,580 INFO org.apache.spark.util.Utils:57 - Copying /data1/yarn/nm/usercache/root/appcache/application_1597935678144_0210/spark-44780dbc-a224-4c07-bd5c-537db7f7df45/420339351598925065034_cache to /data2/yarn/nm/usercache/root/appcache/application_1597935678144_0210/container_1597935678144_0210_01_004469/./flowlog-experiment.jar
2020-09-01 06:08:05,590 INFO org.apache.spark.executor.Executor:57 - Adding file:/data2/yarn/nm/usercache/root/appcache/application_1597935678144_0210/container_1597935678144_0210_01_004469/./flowlog-experiment.jar to class loader
2020-09-01 06:08:05,614 INFO org.apache.spark.MapOutputTrackerWorker:57 - Updating epoch to 182 and clearing cache
2020-09-01 06:08:05,668 INFO org.apache.spark.broadcast.TorrentBroadcast:57 - Started reading broadcast variable 455
2020-09-01 06:08:05,746 INFO org.apache.spark.network.client.TransportClientFactory:272 - Successfully created connection to gpm403.sjc2.asn.net/10.0.1.215:34457 after 2 ms (0 ms spent in bootstraps)
2020-09-01 06:08:05,776 INFO org.apache.spark.storage.memory.MemoryStore:57 - Block broadcast_455_piece0 stored as bytes in memory (estimated size 59.8 KB, free 486.1 MB)
2020-09-01 06:08:05,816 INFO org.apache.spark.broadcast.TorrentBroadcast:57 - Reading broadcast variable 455 took 148 ms
2020-09-01 06:08:06,150 INFO org.apache.spark.storage.memory.MemoryStore:57 - Block broadcast_455 stored as values in memory (estimated size 195.8 KB, free 486.0 MB)
3.780: [GC (Metadata GC Threshold) 3.839: [SoftReference, 0 refs, 0.0000340 secs]3.840: [WeakReference, 424 refs, 0.0001105 secs]3.840: [FinalReference, 3790 refs, 0.0073865 secs]3.847: [PhantomReference, 0 refs, 7 refs, 0.0000188 secs]3.847: [JNI Weak Reference, 0.0000132 secs]AdaptiveSizePolicy::update_averages: survived: 64982432 promoted: 427728 overflow: true
AdaptiveSizeStart: 3.848 collection: 4
PSAdaptiveSizePolicy::compute_eden_space_size limits: desired_eden_size: 1316299653 old_eden_size: 943718400 eden_limit: 943718400 cur_eden: 780140544 max_eden_size: 943718400 avg_young_live: 38897008
PSAdaptiveSizePolicy::compute_eden_space_size: costs minor_time: 0.078534 major_cost: 0.035737 mutator_cost: 0.885729 throughput_goal: 0.990000 live_space: 346373984 free_space: 1438646272 old_eden_size: 943718400 desired_eden_size: 943718400
AdaptiveSizePolicy::survivor space sizes: collection: 4 (65011712, 65011712) -> (65011712, 65011712)
AdaptiveSizeStop: collection: 4
[PSYoungGen: 397975K->63459K(825344K)] 436102K->102003K(1347072K), 0.0689854 secs] [Times: user=0.26 sys=0.02, real=0.07 secs]
3.849: [Full GC (Metadata GC Threshold) 3.855: [SoftReference, 0 refs, 0.0000431 secs]3.855: [WeakReference, 207 refs, 0.0000364 secs]3.855: [FinalReference, 120 refs, 0.0000233 secs]3.855: [PhantomReference, 0 refs, 8 refs, 0.0000136 secs]3.855: [JNI Weak Reference, 0.0000149 secs]AdaptiveSizeStart: 3.917 collection: 5
PSAdaptiveSizePolicy::compute_eden_space_size limits: desired_eden_size: 1306004136 old_eden_size: 943718400 eden_limit: 943718400 cur_eden: 780140544 max_eden_size: 943718400 avg_young_live: 31117606
PSAdaptiveSizePolicy::compute_eden_space_size: costs minor_time: 0.078534 major_cost: 0.037974 mutator_cost: 0.883492 throughput_goal: 0.990000 live_space: 338594560 free_space: 1438646272 old_eden_size: 943718400 desired_eden_size: 943718400
PSAdaptiveSizePolicy::compute_old_gen_free_space: costs minor_time: 0.078534 major_cost: 0.037974 mutator_cost: 0.883492 throughput_goal: 0.990000 live_space: 370662912 free_space: 1438646272 old_promo_size: 494927872 desired_promo_size: 656408576
AdaptiveSizePolicy::old generation size: collection: 5 (534249472) -> (760217600)
AdaptiveSizeStop: collection: 5
[PSYoungGen: 63459K->0K(825344K)] [ParOldGen: 38544K->100759K(742400K)] 102003K->100759K(1567744K), [Metaspace: 34681K->34681K(1079296K)], 0.0684949 secs] [Times: user=0.32 sys=0.03, real=0.07 secs]
2020-09-01 06:08:06,710 INFO org.apache.spark.broadcast.TorrentBroadcast:57 - Started reading broadcast variable 453
2020-09-01 06:08:06,727 INFO org.apache.spark.network.client.TransportClientFactory:272 - Successfully created connection to gpm403.sjc2.asn.net/10.0.1.215:36054 after 0 ms (0 ms spent in bootstraps)
2020-09-01 06:08:06,735 INFO org.apache.spark.storage.memory.MemoryStore:57 - Block broadcast_453_piece0 stored as bytes in memory (estimated size 32.8 KB, free 485.9 MB)
2020-09-01 06:08:06,763 INFO org.apache.spark.broadcast.TorrentBroadcast:57 - Reading broadcast variable 453 took 52 ms
2020-09-01 06:08:06,842 INFO org.apache.spark.storage.memory.MemoryStore:57 - Block broadcast_453 stored as values in memory (estimated size 441.5 KB, free 485.5 MB)
2020-09-01 06:08:06,847 INFO org.apache.spark.sql.execution.streaming.state.StateStore:57 - State Store maintenance task started
2020-09-01 06:08:07,603 INFO org.apache.spark.sql.execution.streaming.state.StateStore:57 - Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@1788d316
2020-09-01 06:08:07,609 INFO org.apache.spark.sql.execution.streaming.state.StateStore:57 - Reported that the loaded instance StateStoreProviderId(StateStoreId(hdfs://nos-test5.sjc2.asn.net:8020/user/spark/flowlog/store/chkPoint/delta5_nx/state,0,7,default),bb567619-96dd-4f9d-bfb4-1ee530bc0772) is active
2020-09-01 06:08:07,610 INFO org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider:57 - get Store for version 44
2020-09-01 06:08:07,616 INFO org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider:57 - Loading state into the db for 44 and partition 7
2020-09-01 06:08:07,752 INFO org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider:57 - Will download hdfs://nos-test5.sjc2.asn.net:8020/user/spark/flowlog/store/chkPoint/delta5_nx/state/0/7/33.snapshot at location file:/data1/rock/tmp/state_-191076869/0/7/33.tar
2020-09-01 06:08:10,111 INFO org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider:57 - Read delta file for version 44 of RocksDbStateStoreProvider[id = (op=0,part=7),dir = hdfs://nos-test5.sjc2.asn.net:8020/user/spark/flowlog/store/chkPoint/delta5_nx/state/0/7] from hdfs://nos-test5.sjc2.asn.net:8020/user/spark/flowlog/store/chkPoint/delta5_nx/state/0/7/34.delta
2020-09-01 06:08:10,912 INFO org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider:57 - Read delta file for version 44 of RocksDbStateStoreProvider[id = (op=0,part=7),dir = hdfs://nos-test5.sjc2.asn.net:8020/user/spark/flowlog/store/chkPoint/delta5_nx/state/0/7] from hdfs://nos-test5.sjc2.asn.net:8020/user/spark/flowlog/store/chkPoint/delta5_nx/state/0/7/35.delta
2020-09-01 06:08:11,696 INFO org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider:57 - Read delta file for version 44 of RocksDbStateStoreProvider[id = (op=0,part=7),dir = hdfs://nos-test5.sjc2.asn.net:8020/user/spark/flowlog/store/chkPoint/delta5_nx/state/0/7] from hdfs://nos-test5.sjc2.asn.net:8020/user/spark/flowlog/store/chkPoint/delta5_nx/state/0/7/36.delta
2020-09-01 06:08:12,247 INFO org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider:57 - Read delta file for version 44 of RocksDbStateStoreProvider[id = (op=0,part=7),dir = hdfs://nos-test5.sjc2.asn.net:8020/user/spark/flowlog/store/chkPoint/delta5_nx/state/0/7] from hdfs://nos-test5.sjc2.asn.net:8020/user/spark/flowlog/store/chkPoint/delta5_nx/state/0/7/37.delta
9.746: [GC (Allocation Failure) 9.752: [SoftReference, 0 refs, 0.0000507 secs]9.752: [WeakReference, 120 refs, 0.0000483 secs]9.753: [FinalReference, 4223 refs, 0.0066288 secs]9.759: [PhantomReference, 0 refs, 5 refs, 0.0000135 secs]9.759: [JNI Weak Reference, 0.0000095 secs]AdaptiveSizePolicy::update_averages: survived: 14827544 promoted: 8192 overflow: false
AdaptiveSizeStart: 9.760 collection: 6
PSAdaptiveSizePolicy::compute_eden_space_size limits: desired_eden_size: 1256291548 old_eden_size: 943718400 eden_limit: 943718400 cur_eden: 780140544 max_eden_size: 943718400 avg_young_live: 28511196
PSAdaptiveSizePolicy::compute_eden_space_size: costs minor_time: 0.059481 major_cost: 0.037974 mutator_cost: 0.902545 throughput_goal: 0.990000 live_space: 368056512 free_space: 1600126976 old_eden_size: 943718400 desired_eden_size: 943718400
AdaptiveSizePolicy::survivor space sizes: collection: 6 (65011712, 65011712) -> (65011712, 65011712)
AdaptiveSizeStop: collection: 6
[PSYoungGen: 761856K->14480K(985088K)] 862615K->115247K(1727488K), 0.0140488 secs] [Times: user=0.02 sys=0.01, real=0.01 secs]
2020-09-01 06:08:13,177 INFO org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider:57 - Read delta file for version 44 of RocksDbStateStoreProvider[id = (op=0,part=7),dir = hdfs://nos-test5.sjc2.asn.net:8020/user/spark/flowlog/store/chkPoint/delta5_nx/state/0/7] from hdfs://nos-test5.sjc2.asn.net:8020/user/spark/flowlog/store/chkPoint/delta5_nx/state/0/7/38.delta
2020-09-01 06:08:13,812 INFO org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider:57 - Read delta file for version 44 of RocksDbStateStoreProvider[id = (op=0,part=7),dir = hdfs://nos-test5.sjc2.asn.net:8020/user/spark/flowlog/store/chkPoint/delta5_nx/state/0/7] from hdfs://nos-test5.sjc2.asn.net:8020/user/spark/flowlog/store/chkPoint/delta5_nx/state/0/7/39.delta
2020-09-01 06:08:15,667 INFO org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider:57 - Read delta file for version 44 of RocksDbStateStoreProvider[id = (op=0,part=7),dir = hdfs://nos-test5.sjc2.asn.net:8020/user/spark/flowlog/store/chkPoint/delta5_nx/state/0/7] from hdfs://nos-test5.sjc2.asn.net:8020/user/spark/flowlog/store/chkPoint/delta5_nx/state/0/7/40.delta
2020-09-01 06:08:16,221 INFO org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider:57 - Read delta file for version 44 of RocksDbStateStoreProvider[id = (op=0,part=7),dir = hdfs://nos-test5.sjc2.asn.net:8020/user/spark/flowlog/store/chkPoint/delta5_nx/state/0/7] from hdfs://nos-test5.sjc2.asn.net:8020/user/spark/flowlog/store/chkPoint/delta5_nx/state/0/7/41.delta
2020-09-01 06:08:17,563 INFO org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider:57 - Read delta file for version 44 of RocksDbStateStoreProvider[id = (op=0,part=7),dir = hdfs://nos-test5.sjc2.asn.net:8020/user/spark/flowlog/store/chkPoint/delta5_nx/state/0/7] from hdfs://nos-test5.sjc2.asn.net:8020/user/spark/flowlog/store/chkPoint/delta5_nx/state/0/7/42.delta
2020-09-01 06:08:18,363 INFO org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider:57 - Read delta file for version 44 of RocksDbStateStoreProvider[id = (op=0,part=7),dir = hdfs://nos-test5.sjc2.asn.net:8020/user/spark/flowlog/store/chkPoint/delta5_nx/state/0/7] from hdfs://nos-test5.sjc2.asn.net:8020/user/spark/flowlog/store/chkPoint/delta5_nx/state/0/7/43.delta
19.688: [GC (Allocation Failure) 19.692: [SoftReference, 0 refs, 0.0000358 secs]19.692: [WeakReference, 129 refs, 0.0000321 secs]19.692: [FinalReference, 193 refs, 0.0002764 secs]19.692: [PhantomReference, 0 refs, 7 refs, 0.0000168 secs]19.692: [JNI Weak Reference, 0.0000128 secs]AdaptiveSizePolicy::update_averages: survived: 4684216 promoted: 8192 overflow: false
AdaptiveSizeStart: 19.693 collection: 7
avg_survived_padded_avg: 81432944.000000 avg_promoted_padded_avg: 375078.312500 avg_pretenured_padded_avg: 0.000000 tenuring_thresh: 6 target_size: 81788928
PSAdaptiveSizePolicy::compute_eden_space_size limits: desired_eden_size: 1454189753 old_eden_size: 943718400 eden_limit: 943718400 cur_eden: 943718400 max_eden_size: 943718400 avg_young_live: 25175418
PSAdaptiveSizePolicy::compute_eden_space_size: costs minor_time: 0.044743 major_cost: 0.037974 mutator_cost: 0.917283 throughput_goal: 0.990000 live_space: 364720704 free_space: 1600126976 old_eden_size: 943718400 desired_eden_size: 943718400
AdaptiveSizePolicy::survivor space sizes: collection: 7 (65011712, 65011712) -> (4718592, 81788928)
AdaptiveSizeStop: collection: 7
[PSYoungGen: 936080K->4574K(926208K)] 1036847K->105350K(1668608K), 0.0055100 secs] [Times: user=0.02 sys=0.00, real=0.01 secs]
2020-09-01 06:08:24,290 INFO org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider:57 - Read delta file for version 44 of RocksDbStateStoreProvider[id = (op=0,part=7),dir = hdfs://nos-test5.sjc2.asn.net:8020/user/spark/flowlog/store/chkPoint/delta5_nx/state/0/7] from hdfs://nos-test5.sjc2.asn.net:8020/user/spark/flowlog/store/chkPoint/delta5_nx/state/0/7/44.delta
2020-09-01 06:08:26,519 ERROR org.apache.spark.executor.CoarseGrainedExecutorBackend:43 - RECEIVED SIGNAL TERM
2020-09-01 06:08:26,523 INFO org.apache.spark.storage.DiskBlockManager:57 - Shutdown hook called
2020-09-01 06:08:26,524 INFO org.apache.spark.util.ShutdownHookManager:57 - Shutdown hook called
2020-09-01 06:08:26,525 INFO org.apache.spark.util.ShutdownHookManager:57 - Deleting directory /data2/yarn/nm/usercache/root/appcache/application_1597935678144_0210/spark-931b8dc9-0ebb-418e-865a-96383367cc33
2020-09-01 06:08:26,528 INFO org.apache.spark.util.ShutdownHookManager:57 - Deleting directory /data3/yarn/nm/usercache/root/appcache/application_1597935678144_0210/spark-ad2cdb9e-7ada-438b-adb6-4897a7866b26
2020-09-01 06:08:26,531 INFO org.apache.spark.util.ShutdownHookManager:57 - Deleting directory /data1/yarn/nm/usercache/root/appcache/application_1597935678144_0210/spark-44780dbc-a224-4c07-bd5c-537db7f7df45
Heap
PSYoungGen total 926208K, used 137479K [0x0000000780000000, 0x00000007c0000000, 0x00000007c0000000)
eden space 921600K, 14% used [0x0000000780000000,0x00000007881ca258,0x00000007b8400000)
from space 4608K, 99% used [0x00000007b8400000,0x00000007b88779b8,0x00000007b8880000)
to space 79872K, 0% used [0x00000007bb200000,0x00000007bb200000,0x00000007c0000000)
ParOldGen total 742400K, used 100775K [0x0000000700000000, 0x000000072d500000, 0x0000000780000000)
object space 742400K, 13% used [0x0000000700000000,0x0000000706269fb8,0x000000072d500000)
Metaspace used 47294K, capacity 48058K, committed 48256K, reserved 1091584K
class space used 6272K, capacity 6446K, committed 6528K, reserved 1048576K

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.