
hortonworks-spark / shc


The Apache Spark - Apache HBase Connector is a library to support Spark accessing HBase tables as an external data source or sink.

License: Apache License 2.0

Scala 95.67% Java 2.82% Shell 1.50%

shc's People

Contributors

btomala, chetkhatri, davidov541, dongjoon-hyun, hkothari, ivanmalamen, jerryshao, khampson, lins05, ludochane, merlintang, mridulm, nikolaytsvetkov, rayokota, shanecurcuru, shubhamchopra, weiqingy, zhzhan

shc's Issues

java.lang.IllegalArgumentException: Can not create a Path from a null string

Hi

I'm unable to insert data into HBase because my job is failing with this exception:

App > 16/07/06 12:16:58 task-result-getter-1 WARN TaskSetManager: Lost task 3.0 in stage 0.0 (TID 3, ip-172-31-19-56.eu-west-1.compute.internal): java.lang.IllegalArgumentException: Can not create a Path from a null string
App > at org.apache.hadoop.fs.Path.checkPathArg(Path.java:125)
App > at org.apache.hadoop.fs.Path.<init>(Path.java:137)
App > at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1197)
App > at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1190)
App > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
App > at org.apache.spark.scheduler.Task.run(Task.scala:89)
App > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
App > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
App > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
App > at java.lang.Thread.run(Thread.java:745)

my hbase-site.xml is:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
</property>
<property>
    <name>zookeeper.recovery.retry</name>
    <value>3</value>
</property>
<property>
    <name>hbase.regionserver.info.port</name>
    <value>16030</value>
</property>
<property>
    <name>hbase.zookeeper.quorum</name>
    <value>remote_IP</value>
</property>
<property>
    <name>hbase.rootdir</name>
    <value>hdfs://remote_IP:9000/hbase</value>
</property>
<property>
    <name>hbase.fs.tmp.dir</name>
    <value>/user/${user.name}/hbase-staging</value>
</property>
<property>
    <name>hadoop.tmp.dir</name>
    <value>/tmp/hadoop-${user.name}</value>
</property>
    <!-- Put any other property here, it will be used -->
</configuration>

My HBase version:
HBase Version 1.0.0, revision=984db9a1cae088b996e997db9ce83f6d4bd565ad

Any suggestions?

BulkLoad support

No idea how difficult it would be, but having BulkLoad support would be great.

Support of Array data type

Hello,
We store arrays in HBase serialized with an Avro schema, and we need the connector to deserialize the data. We have tried the Avro feature, which worked perfectly for other complex types, but in this case we serialize the data using only a simple array schema {"type": "array", "items": ["long","null"]} and not a complete record schema.
The connector fails with a ClassCastException while trying to deserialize and cast to GenericRecord.
Do you have any plans to support such schemas or array types out of the box?
Thank you in advance!

Cheers,
Nikolay
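
For reference, a minimal sketch of how a long array could be serialized with exactly the schema quoted above, using plain Avro (standard Avro classes only; this is not shc code and the helper name is illustrative):

object ArraySerDe {
  import java.io.ByteArrayOutputStream
  import org.apache.avro.Schema
  import org.apache.avro.generic.{GenericData, GenericDatumWriter}
  import org.apache.avro.io.EncoderFactory

  // the simple array schema from the report: {"type": "array", "items": ["long","null"]}
  val arraySchema: Schema =
    new Schema.Parser().parse("""{"type":"array","items":["long","null"]}""")

  // serialize a sequence of longs into the Avro binary form that would be stored in the cell
  def serialize(values: Seq[java.lang.Long]): Array[Byte] = {
    val arr = new GenericData.Array[java.lang.Long](values.size, arraySchema)
    values.foreach(arr.add)
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericData.Array[java.lang.Long]](arraySchema).write(arr, encoder)
    encoder.flush()
    out.toByteArray
  }
}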

Multiple columns in a column family support works on read but not on write/update

Summary: The catalog definition that maps a DataFrame to an HBase table does not appear to consistently support having multiple columns associated with one column family when loading/saving data.

1. Write mode error when more than one column comes from the same column family
In the spark shell, define the catalog:
def empcatalog = s"""{
|"table":{"namespace":"default", "name":"emp"},
|"rowkey":"key",
|"columns":{
|"empNumber":{"cf":"rowkey", "col":"key", "type":"string"},
|"city":{"cf":"personal data", "col":"city", "type":"string"},
|"empName":{"cf":"personal data", "col":"name", "type":"string"},
|"jobDesignation":{"cf":"professional data", "col":"designation", "type":"string"},
|"salary":{"cf":"professional data", "col":"salary", "type":"string"}
|}
|}""".stripMargin

Define the case class:
case class HBaseRecordEmp(
empNumber:String,
city:String,
empName:String,
jobDesignation:String,
salary:String)

Create some dummy data with Spark and try to write; it reports that the column family has already been created:
val data = (4 to 10).map { i => {
val name = s"""Bobby${"%03d".format(i)}"""
HBaseRecordEmp(i.toString,
s"MyCity",
name,
"worker",
"5000")
}
}

sc.parallelize(data).toDF.write.options(
Map(HBaseTableCatalog.tableCatalog -> empcatalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()

ERROR:
java.lang.IllegalArgumentException: Family 'professional' already exists so cannot be added
at org.apache.hadoop.hbase.HTableDescriptor.addFamily(HTableDescriptor.java:829)

2. Go to the HBase shell, manually create the table with 2 column families (each with two columns as above) and add some data; this works on READ:
def withCatalog(cat: String): DataFrame = {sqlContext.read.options(Map(HBaseTableCatalog.tableCatalog->cat)).format("org.apache.spark.sql.execution.datasources.hbase").load()}

val df = withCatalog(empcatalog)
df.show

3. Now that the table exists in HBase with the 2 column families as expected, add some dummy data in the spark shell and attempt the write again. This will work if you change the SaveMode to "append", as sketched below.
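
For reference, a minimal sketch of step 3 (assuming the empcatalog and data values defined above, and that the table already exists in HBase):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

// With the table pre-created, writing in append mode succeeds, as the reporter describes.
sc.parallelize(data).toDF.write
  .options(Map(HBaseTableCatalog.tableCatalog -> empcatalog))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .mode(SaveMode.Append)
  .save()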

The HBase connector should support multiple columns in one column family consistently; at the moment the behavior differs between read and write.

org.apache.hadoop.hbase.client.RetriesExhaustedException

I am trying to use shc in HDP. The Spark version in the cluster is 1.5.2, and the command I run is:
spark-submit --class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource --master yarn-client --jars /usr/hdp/current/hbase-client/lib/htrace-core-3.1.0-incubating.jar,/usr/hdp/current/hbase-client/lib/hbase-client.jar,/usr/hdp/current/hbase-client/lib/hbase-common.jar,/usr/hdp/current/hbase-client/lib/hbase-server.jar,/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar,/usr/hdp/current/hbase-client/lib/hbase-protocol.jar --files /usr/hdp/current/hbase-client/conf/hbase-site.xml, /usr/hdp/current/hbase-client/conf/hdfs-site.xml /path/to/hbase-spark-connector-1.0.0.jar

Exception:

Exception in thread "main" org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=36, exceptions:
Wed Aug 10 14:55:21 EDT 2016, null, java.net.SocketTimeoutException: callTimeout=60000, callDuration=68118: row 'table2,,00000000000000' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname={hostName},16020,1470239773946, seqNum=0
        at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.throwEnrichedException(RpcRetryingCallerWithReadReplicas.java:271)
        at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:195)
        at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:59)
        at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
        at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320)
        at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:295)
        at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:160)
        at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:155)
        at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:821)
        at org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:193)
        at org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:89)
        at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.isTableAvailable(ConnectionManager.java:991)
        at org.apache.hadoop.hbase.client.HBaseAdmin.isTableAvailable(HBaseAdmin.java:1400)
        at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.createTable(HBaseRelation.scala:95)
        at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:66)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:170)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
        at org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource$.main(HBaseSource.scala:92)
        at org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource.main(HBaseSource.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:685)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.net.SocketTimeoutException: callTimeout=60000, callDuration=68118: row 'table2,,00000000000000' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname={hostName},16020,1470239773946, seqNum=0
        at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:159)
        at org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture.run(ResultBoundedCompletionService.java:64)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.hbase.exceptions.ConnectionClosingException: Call to {hostname}/{ip}:16020 failed on local exception: org.apache.hadoop.hbase.exceptions.ConnectionClosingException: Connection to {hostname}/{ip}:16020 is closing. Call id=9, waitTime=1
        at org.apache.hadoop.hbase.ipc.RpcClientImpl.wrapException(RpcClientImpl.java:1259)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1230)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:213)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:287)
        at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.scan(ClientProtos.java:32651)
        at org.apache.hadoop.hbase.client.ScannerCallable.openScanner(ScannerCallable.java:372)
        at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:199)
        at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:62)
        at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
        at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:346)
        at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:320)
        at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:126)
        ... 4 more
Caused by: org.apache.hadoop.hbase.exceptions.ConnectionClosingException: Connection to {hostName}/{ip}:16020 is closing. Call id=9, waitTime=1
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.cleanupCalls(RpcClientImpl.java:1047)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.close(RpcClientImpl.java:846)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.run(RpcClientImpl.java:574)

The cluster is using Kerberos authentication. The RegionServers are running well in the cluster and I can create or drop tables through the HBase shell.
I guess it is a permission or configuration problem, but I cannot figure it out.

Connection refused while running the provided examples using Spark 1.6.0 & HBase 1.2.0

I get a Connection refused error while running the examples provided here using Spark 1.6.0 & HBase 1.2.0.

Error:
16/08/27 12:35:01 WARN zookeeper.ClientCnxn: Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:350)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)

Starting spark-shell
export HADOOP_HOME=/opt/cloudera/parcels/CDH
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/etc/hadoop/conf}
HADOOP_CLASSPATH=/opt/cloudera/parcels/CDH/lib/hbase/lib/*
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/etc/hbase/conf
export SPARK_HOME=/opt/cloudera/parcels/CDH/lib/spark
export SPARK_JAR_HDFS_PATH=/opt/cloudera/parcels/CDH/lib/spark/lib/spark-assembly.jar
export SPARK_CLASSPATH=/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core.jar:/opt/cloudera/parcels/CDH/lib/hbase/lib/hbase-client-1.2.0-cdh5.7.1.jar:/opt/cloudera/parcels/CDH/lib/hbase/lib/hbase-common-1.2.0-cdh5.7.1.jar:/opt/cloudera/parcels/CDH/lib/hbase/lib/hbase-server-1.2.0-cdh5.7.1.jar:/opt/cloudera/parcels/CDH/lib/hbase/lib/guava-12.0.1.jar:/opt/cloudera/parcels/CDH/lib/hbase/lib/hbase-protocol-1.2.0-cdh5.7.1.jar:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.2.0-incubating.jar

spark-shell --master yarn-client --num-executors 2 --driver-memory 512m --executor-memory 512m --executor-cores 1 --jars jars/shc-0.0.11-1.6.1-s_2.10.jar --files /etc/hbase/conf/hbase-site.xml,/etc/hbase/conf/hdfs-site.xml,/etc/hbase/conf/core-site.xml

Multiple copies of binary data during Result -> Row conversion

Please look at:
https://github.com/hortonworks-spark/shc/blob/master/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseTableScan.scala#L115
and
https://github.com/hortonworks-spark/shc/blob/master/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/Utils.scala#L58

CellUtil.clone already creates a copy of the data, and another copy is then made within Utils.scala. Binary data (blobs) can be fairly large, so the extra copy may be an expensive operation.
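
A simplified illustration of the concern (not shc's actual code; CellUtil.cloneValue is the standard HBase helper that already copies the cell value):

import org.apache.hadoop.hbase.Cell
import org.apache.hadoop.hbase.CellUtil

def valueOf(cell: Cell): Array[Byte] = {
  val firstCopy  = CellUtil.cloneValue(cell)                            // copy #1, made by HBase
  val secondCopy = java.util.Arrays.copyOf(firstCopy, firstCopy.length) // copy #2, redundant for large blobs
  secondCopy
}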

shc 2.11 - Catalogue loading fails

I am using shc 2.11 with Spark 2.0.1. In the following catalogue-loading example I am using sparkSession instead of sqlContext. It looks like it tries to create a directory mirroring my cwd on HDFS! Is there a way I can configure a different temp directory for it? Also, is this the proper way to load a catalogue with Spark 2?

def withCatalog(cat: String) = {
  sparkSession
    .read
    .options(Map(HBaseTableCatalog.tableCatalog -> cat))
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .load
}

Caused by: org.apache.spark.SparkException: Unable to create database default as failed to create its directory hdfs:///home/centos/myapp/spark-warehouse
    at org.apache.spark.sql.catalyst.catalog.InMemoryCatalog.liftedTree1$1(InMemoryCatalog.scala:114)
    at org.apache.spark.sql.catalyst.catalog.InMemoryCatalog.createDatabase(InMemoryCatalog.scala:108)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createDatabase(SessionCatalog.scala:147)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.<init>(SessionCatalog.scala:89)
    at org.apache.spark.sql.internal.SessionState.catalog$lzycompute(SessionState.scala:95)
    at org.apache.spark.sql.internal.SessionState.catalog(SessionState.scala:95)
    at org.apache.spark.sql.internal.SessionState$$anon$1.<init>(SessionState.scala:112)
    at org.apache.spark.sql.internal.SessionState.analyzer$lzycompute(SessionState.scala:112)
    at org.apache.spark.sql.internal.SessionState.analyzer(SessionState.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
    at org.apache.spark.sql.SparkSession.baseRelationToDataFrame(SparkSession.scala:382)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:143)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:122)
    at com.mypckg.DFInitializer.withCatalog(DFInitializer.scala:78)
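
For reference, a minimal sketch (assuming Spark 2.x; the HDFS path is illustrative) of steering that directory elsewhere by setting spark.sql.warehouse.dir when building the session:

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder()
  .appName("shc-catalog-load")
  .config("spark.sql.warehouse.dir", "hdfs:///tmp/spark-warehouse") // explicit, writable location
  .getOrCreate()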

Reason for numRegion < 3 condition in HBaseRelation

I can see the following code in HBaseRelation.scala

if (catalog.numReg > 3) {
      val tName = TableName.valueOf(catalog.name)
      val cfs = catalog.getColumnFamilies

      val connection = HBaseConnectionCache.getConnection(hbaseConf)
      // Initialize hBase table if necessary
      val admin = connection.getAdmin

      // The names of tables which are created by the Examples has prefix "shcExample"
      if (admin.isTableAvailable(tName) && tName.toString.startsWith("shcExample")){
        admin.disableTable(tName)
        admin.deleteTable(tName)
      }

      if (!admin.isTableAvailable(tName)) {
        val tableDesc = new HTableDescriptor(tName)
        cfs.foreach { x =>
         val cf = new HColumnDescriptor(x.getBytes())
          logDebug(s"add family $x to ${catalog.name}")
          tableDesc.addFamily(cf)
        }
        val startKey = Bytes.toBytes("aaaaaaa")
        val endKey = Bytes.toBytes("zzzzzzz")
        val splitKeys = Bytes.split(startKey, endKey, catalog.numReg - 3)

I am curious to know the reason for this if condition. I also checked this in the HBase shell: by default HBase creates 3 extra regions. Why so?

How to load a csv into HBaseRecord

Please help with an example of loading a CSV into HBaseRecord.
The example provided uses dummy data, but I am looking for something that can help bulk-load a CSV; a sketch follows the snippet below.
object HBaseRecord {
  def apply(i: Int): HBaseRecord = {
    val s = s"""row${"%03d".format(i)}"""
    HBaseRecord(s, i % 2 == 0, i.toDouble, i.toFloat, i, i.toLong, i.toShort,
      s"String$i extra", i.toByte)
  }
}

SHC is not working on Spark 1.6.2 and later

While trying to save a DataFrame to HBase I'm getting an error:

Caused by: java.lang.IncompatibleClassChangeError: Found class org.apache.spark.sql.catalyst.expressions.MutableRow, but interface was expected
    at org.apache.spark.sql.execution.datasources.hbase.Utils$.setRowCol(Utils.scala:61)
    at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anonfun$buildRow$1.apply(HBaseTableScan.scala:120)
    at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anonfun$buildRow$1.apply(HBaseTableScan.scala:101)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD.buildRow(HBaseTableScan.scala:101)
    at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anon$3.next(HBaseTableScan.scala:190)
    at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anon$3.next(HBaseTableScan.scala:180)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:140)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:130)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

It seems like SHC does not support newer versions of the Catalyst API.

Is there any quick fix or workaround for this issue?
It is especially relevant for the current Spark 1.6.2 version shipped in HDP 2.5.

Modify HBaseTableScan to log amount of time to return rows at level info instead of debug

At line 194 of HBaseTableScan (https://github.com/hortonworks-spark/shc/blob/master/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseTableScan.scala#L194) the number of rows is logged at the debug level. By changing the log level to info, users can collect metrics about what percentage of their Spark job is spent taking data out of HBase without having to sort through many other debug log messages.

Add some logging to see the desired effect with fewer connections after connection sharing

Constant logging impacts performance. Instead of logging, what about adding a new API that users can call at any time for statistics information: total connection creation requests, total connection close requests, current alive connections, number of connections that have actually been created, etc.? Users can then do whatever they want with it: print it, log it, or just add some assertions. A rough sketch follows.
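
For example (names are illustrative, not shc's actual API):

import java.util.concurrent.atomic.AtomicLong

object HBaseConnectionCacheStat {
  private val createRequests  = new AtomicLong(0) // total connection creation requests
  private val closeRequests   = new AtomicLong(0) // total connection close requests
  private val actualCreations = new AtomicLong(0) // connections actually created (cache misses)
  private val alive           = new AtomicLong(0) // connections currently alive

  // the connection cache would call these at the relevant points instead of logging
  def onCreateRequest(): Unit = createRequests.incrementAndGet()
  def onCloseRequest(): Unit  = closeRequests.incrementAndGet()
  def onActualCreate(): Unit  = { actualCreations.incrementAndGet(); alive.incrementAndGet() }
  def onActualClose(): Unit   = alive.decrementAndGet()

  // users poll this whenever they want and print, log, or assert on the numbers
  def snapshot(): (Long, Long, Long, Long) =
    (createRequests.get, closeRequests.get, actualCreations.get, alive.get)
}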

Support custom sedes for composite row key

The current composite row key support assumes the row key looks like "part1:part2:...:partn", where each part is either a well-defined data type like int or long, or a custom serdes with a fixed byte length.

But sometimes the row key is composite and at the same time each part can be variable length. For example, the byte array generated by Bytes.toBytes(Long) has a variable length based on the value/scale of the long.

I'd like to propose supporting custom row key serdes for the situation described above. For example, we can add a trait like this:

// Sedes for composite row key
trait RowSedes {
  def components: Int
  def serialize(value: Any*): Array[Byte]
  def deserialize(bytes: Array[Byte], start: Int, end: Int): Seq[Any]
}

And pass it with rowSedes field in the catalog:

  "table": {"namespace": "default", "name": "tbl"},
  "rowkey": "part1:part2:part3",
  "rowSedes": "com.example.spark.CustomRowSedes",
   ...
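
For illustration, a toy implementation of the proposed trait could look like this (hypothetical, not shc code): two variable-length string parts joined by a single 0x00 delimiter byte.

class DelimitedRowSedes extends RowSedes {
  override def components: Int = 2

  // join the parts with a 0x00 delimiter; each part may have a variable length
  override def serialize(value: Any*): Array[Byte] = {
    require(value.length == components)
    value.map(_.toString.getBytes("UTF-8")).reduce(_ ++ Array(0.toByte) ++ _)
  }

  // split the row key slice back into its parts
  override def deserialize(bytes: Array[Byte], start: Int, end: Int): Seq[Any] =
    new String(bytes.slice(start, end), "UTF-8").split('\u0000').toSeq
}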

@weiqingy @dongjoon-hyun What do you think?

NullPointerException during connection creation.

I am hitting an issue while submitting an example with yarn-cluster deploy mode.

16/07/21 11:08:55 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, cdh52.vm.com): java.lang.NullPointerException
    at org.apache.hadoop.hbase.security.UserProvider.instantiate(UserProvider.java:43)
    at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:214)
    at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)
    at org.apache.spark.sql.execution.datasources.hbase.TableResource.init(HBaseResources.scala:126)
    at org.apache.spark.sql.execution.datasources.hbase.ReferencedResource$class.liftedTree1$1(HBaseResources.scala:57)
    at org.apache.spark.sql.execution.datasources.hbase.ReferencedResource$class.acquire(HBaseResources.scala:54)
    at org.apache.spark.sql.execution.datasources.hbase.TableResource.acquire(HBaseResources.scala:121)
    at org.apache.spark.sql.execution.datasources.hbase.ReferencedResource$class.releaseOnException(HBaseResources.scala:74)
    at org.apache.spark.sql.execution.datasources.hbase.TableResource.releaseOnException(HBaseResources.scala:121)
    at org.apache.spark.sql.execution.datasources.hbase.TableResource.getScanner(HBaseResources.scala:145)
    at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anonfun$9.apply(HBaseTableScan.scala:277)
    at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anonfun$9.apply(HBaseTableScan.scala:276)
    at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I have hbase-site.xml on the classpath, and it is present in the Spark conf dir too.

Fail to insert a basic DataFrame + the shc-core-1.0.1-1.6-s_2.10.jar on the Hortonworks public repo is a doc jar instead of a jar of classes!

Hello,

The command lines below are from my spark-shell invocation:
spark-shell --master yarn \
--deploy-mode client \
--name "hive2hbase" \
--repositories "http://repo.hortonworks.com/content/groups/public/" \
--packages "com.hortonworks:shc:1.0.1-1.6-s_2.10" \
--jars "shc-core-1.0.1-1.6-s_2.10.jar"
--files "/usr/hdp/current/hive-client/conf/hive-site.xml" \
--driver-memory 1G \
--executor-memory 1500m \
--num-executors 6 2> ./spark-shell.log

I have a simple Dataframe of Row of count 5:

scala> newDf
res5: org.apache.spark.sql.DataFrame = [offer_id: int, offer_label: string, universe: string, category: string, sub_category: string, sub_label: string]

That is made of type Row

scala> newDf.take(1)
res6: Array[org.apache.spark.sql.Row] = Array([28896458,Etui de protection bleu pour li...liseuse Cybook Muse Light liseuse Cybook Muse Light liseuse Cybook Muse HD Etui de protection bleu pour lis... Etui de protection noir pour lis... Etui de protection rose pour lis... Etui de protection orange liseus...,null,null,null,null])

I try to insert this with the following catalog:

scala> cat
res0: String =
{
"table":{"namespace":"default", "name":"offDen3m"},
"rowkey":"key",
"columns":{
"offer_id":{"cf":"rowkey", "col":"key", "type":"int"},
"offer_label":{"cf":"cf1", "col":"col1", "type":"string"},
"universe":{"cf":"cf2", "col":"col2", "type":"string"},
"category":{"cf":"cf3", "col":"col3", "type":"string"},
"sub_category":{"cf":"cf4", "col":"col4", "type":"string"},
"sub_label":{"cf":"cf5", "col":"col5", "type":"string"}
}
}

When I try to insert with the following code:

newDf.write.options( Map(HBaseTableCatalog.tableCatalog -> cat, HBaseTableCatalog.newTable -> "5")) .format("org.apache.spark.sql.execution.datasources.hbase") .save()

And I obtain the following stack:

17/01/03 10:36:42 INFO BlockManagerInfo: Removed broadcast_2_piece0 on 149.202.161.158:37691 in memory (size: 6.4 KB, free: 511.1 MB)
java.lang.NoSuchMethodError: scala.runtime.IntRef.create(I)Lscala/runtime/IntRef;

at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog.initRowKey(HBaseTableCatalog.scala:142)
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog.<init>(HBaseTableCatalog.scala:152)
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:209)
at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:163)
at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:58)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)

My question is twofold:

  1. Is it possible to insert an org.apache.spark.sql.DataFrame[org.apache.spark.sql.Row] using shc and a catalog?
  2. Given my current catalog, is it supposed to work?

Thank you very much for helping

Namespace in the catalog is not handled correctly by SHC

Context

I am writing a little tool to move some data from Hive to HBase, and I used SHC in this context.
I finished coding the tool (it works nicely!) and am now taking care of the details.

Problem

SHC does not seem to take the namespace I give it into account.

Step to reproduce

I created a small Hive table:
case class AgeAndName(age:Int, name:String)
val myseq = 1 to 30000 map(x => AgeAndName(x, s"Name$x is a good name "))
val myDf = sc.parallelize(myseq).toDF
myDf.write.saveAsTable("person.ageAndName")
(this works provided the DB person exists in Hive).

While processing, I am turning the case class into an Avro record and taking the age as the id for my row key in HBase (the consistency of the example is not relevant here).

When inserting the data, I am providing the options as a Map[String,String] containing the catalog. Here are the YARN logs that show their content:

17/02/21 14:00:46 INFO SHCHelper$: Generated catalog: {
"table":{"namespace":"person", "name":"ageAndName", "tableCoder":"PrimitiveType"},
"rowkey":"age",
"columns":{
"age":
{"cf":"rowkey", "col":"age", "type":"int"} ,
"record":{"cf":"record", "col":"record", "type":"binary"}
}
}

And these are the logged options passed to the writer:
17/02/21 14:00:48 INFO Hive2Hbase$: This options are passed to the writer: namespace -> person,catalog -> {
"table":{"namespace":"person", "name":"ageAndName", "tableCoder":"PrimitiveType"},
"rowkey":"age",
"columns":{
"age":
{"cf":"rowkey", "col":"age", "type":"int"} ,
"record":{"cf":"record", "col":"record", "type":"binary"}
}
},newtable -> 5

The writing works nicely with SHC, but when using the HBase shell to check the result, here is what I can see:
hbase(main):072:0* list_namespace
NAMESPACE
default
hbase
person
3 row(s) in 0.0180 seconds

hbase(main):073:0> list_namespace_tables 'person'
TABLE
0 row(s) in 0.0110 seconds

hbase(main):074:0> list_namespace_tables 'default'
TABLE
ageAndName
1 row(s) in 0.0110 seconds

  • I tried with an already existing namespace in HBase.
  • I tried letting shc handle the creation of the namespace (is it supposed to be possible?).
  • I tried without giving it the namespace in the Map[String,String] options and letting it find it in the catalog.
    ... but it still is not working.

Expectation

I expected the namespace person to hold the table ageAndName, not the default namespace.
So how come that is not the case?
I am working with this dependency:
libraryDependencies += "com.hortonworks" % "shc-core" % "1.0.1-1.6-s_2.10"

HBase connection problem ?

Hi,

I'm testing your connector on an HDP cluster and I get these errors:
16/06/19 18:26:40 WARN ClientCnxn: Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1125)
16/06/19 18:26:40 INFO ClientCnxn: Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error)
16/06/19 18:26:40 WARN ClientCnxn: Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1125)

I submit the Spark program like this:

spark-submit --master yarn-client --class test.TestSHC --jars /usr/hdp/current/hbase-client/lib/htrace-core-3.1.0-incubating.jar,/usr/hdp/current/hbase-client/lib/hbase-client.jar,/usr/hdp/current/hbase-client/lib/hbase-common.jar,/usr/hdp/current/hbase-client/lib/hbase-server.jar,/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar,/usr/hdp/current/hbase-client/lib/hbase-protocol.jar,/usr/hdp/current/hbase-client/lib/htrace-core-3.1.0-incubating.jar,/home/cyrille/lib/shc-0.0.11-1.6.1-s_2.10.jar --files /usr/hdp/current/hbase-client/conf/core-site.xml,/usr/hdp/current/hbase-client/conf/hbase-site.xml --num-executors 3 spark-1.0.0-SNAPSHOT.jar

Could you help me solve this problem?

Thanks

remote HBase cluster

Hi

How can I use shc to connect to a remote HBase cluster? Where can I specify the ZooKeeper quorum and the master?

Best

Mikolaj Habdank
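
For reference, a minimal sketch of the standard HBase client settings involved (host names are placeholders). These values normally live in the hbase-site.xml that is passed to spark-submit with --files so the executors can reach the remote cluster; the HBase master is discovered through ZooKeeper, so the quorum is what matters:

import org.apache.hadoop.hbase.HBaseConfiguration

val hbaseConf = HBaseConfiguration.create() // picks up hbase-site.xml from the classpath
hbaseConf.set("hbase.zookeeper.quorum", "zk1.example.com,zk2.example.com,zk3.example.com")
hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")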

NullPointerException when running the most basic example (HBaseSource) on the HDP-2.5 Sandbox

Hello,
I am experiencing a NullPointerException when running the basic HBaseSource example on the HDP-2.5 Sandbox.

I built an assembly and here is my submit command:

/usr/hdp/current/spark-client/bin/spark-submit --driver-memory 1024m --class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource --master yarn --deploy-mode client --executor-memory 512m --num-executors 4 --files /usr/hdp/current/hbase-master/conf/hbase-site.xml /root/affinytix/tunnel/affinytix-test-tunnel-assembly-1.0.0-SNAPSHOT.jar |& tee /tmp/test-kafka-sparkstreaming.log

And here is the stack (it seems to occur while saving, i.e. the first phase of the demo that populates the table):

16/10/02 08:39:37 INFO ZooKeeperRegistry: ClusterId read in ZooKeeper is null
Exception in thread "main" java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:208)
    at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320)
    at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:295)
    at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:160)
    at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:155)
    at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:821)
    at org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:193)
    at org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:89)
    at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.isTableAvailable(ConnectionManager.java:985)
    at org.apache.hadoop.hbase.client.HBaseAdmin.isTableAvailable(HBaseAdmin.java:1399)
    at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.createTable(HBaseRelation.scala:87)
    at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:58)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
    at org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource$.main(HBaseSource.scala:90)
    at org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource.main(HBaseSource.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
    at org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher.getMetaReplicaNodes(ZooKeeperWatcher.java:395)
    at org.apache.hadoop.hbase.zookeeper.MetaTableLocator.blockUntilAvailable(MetaTableLocator.java:553)
    at org.apache.hadoop.hbase.client.ZooKeeperRegistry.getMetaRegionLocation(ZooKeeperRegistry.java:61)
    at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateMeta(ConnectionManager.java:1185)
    at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1152)
    at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:300)
    at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:151)
    at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:59)
    at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
    ... 24 more
Any idea about the cause?
I have consulted the NullPointerException reports from earlier issues, but both hbase-site.xml and yarn client mode are already used to execute the example, so I don't really understand.

Thanks for helping

"or" function performance

Niket's 48cffbe reduces the time taken by the 'In' filter, which can be measured using the "IN filter stack overflow" test case.

We need to figure out why the "or" function is taking so long, and then update the implementation of the 'In' filter.

support for spark 1.5

Can you add support for Spark 1.5, or is the 1.6 build backwards compatible with Spark 1.5? A package doesn't seem to be available for 1.5.

unsupported data type FloatType

I'm trying to load data from one DataFrame and write it to HBase. Whenever it tries to write, it chokes on converting the types. Do I have to extract it all into case classes? I would much rather just use the Row types that come from Spark SQL.

This is using Spark 1.6 and the latest tagged release of shc.

HBase version requirement

I was trying out the connector to work with HBase 1.0.0 and it fails with

ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/util/DataTypeParser$
java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/util/DataTypeParser$

Could you please specify the minimum HBase requirement for the project?

Update Readme for Kerberized HBase Cluster

Hi,
I tried to run a Spark job to read from/write to HBase in a Hortonworks cluster secured by Kerberos, and passing the hbase-site.xml with --files never worked for me.
As described in https://community.hortonworks.com/content/supportkb/48988/how-to-run-spark-job-to-interact-with-secured-hbas.html (point 2), the only solution which worked was to copy hbase-site.xml directly into the Spark conf directory of our edge node (/etc/spark/conf).
Maybe I'm wrong and it is cluster-dependent, but it might be good to suggest this solution in the Readme. I could do a PR if needed.

Regards,

Getting java.lang.IllegalArgumentException: offset (0) + length (8) exceed the capacity of the array: 4

I am getting the following error when using the bigint, long, or double data types. It runs if I use string. Also, the documentation says it supports Java primitive types, but the examples use bigint, tinyint, and smallint, which are not Java types.

Caused by: java.lang.IllegalArgumentException: offset (0) + length (8) exceed the capacity of the array: 4
at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:631)
at org.apache.hadoop.hbase.util.Bytes.toLong(Bytes.java:605)
at org.apache.hadoop.hbase.util.Bytes.toDouble(Bytes.java:729)
at org.apache.spark.sql.execution.datasources.hbase.Utils$.hbaseFieldToScalaType(Utils.scala:51)
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anonfun$4.apply(HBaseTableScan.scala:123)
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anonfun$4.apply(HBaseTableScan.scala:114)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:74)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD.buildRow(HBaseTableScan.scala:114)
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anon$3.next(HBaseTableScan.scala:205)
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anon$3.next(HBaseTableScan.scala:186)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

Unable to save data to HBase

Hello, thank you for the nice HBase-on-Spark-SQL package.

I am currently facing certain challenges when writing to / reading from HBase with Spark.

Hadoop 2.7.3
Spark 2.0.1
Hbase 1.2.4
Hive 2.0.1 with MySql as a Metastore

Code:

$SPARK_HOME/bin/spark-shell --packages zhzhan:shc:0.0.11-1.6.1-s_2.10 --files /usr/local/spark/conf/hbase-site.xml

Where hbase-site.xml contains:

<property>
    <name>hbase.rootdir</name>
    <value>file:///home/hduser/hbase</value>
</property>

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.execution.datasources.hbase._

def empcatalog = s"""{
|"table":{"namespace":"empschema", "name":"emp"},
|"rowkey":"key",
|"columns":{
|"empNumber":{"cf":"rowkey", "col":"key", "type":"string"},
|"city":{"cf":"personal data", "col":"city", "type":"string"},
|"empName":{"cf":"personal data", "col":"name", "type":"string"},
|"jobDesignation":{"cf":"professional data", "col":"designation", "type":"string"},
|"salary":{"cf":"professional data", "col":"salary", "type":"string"}
|}
|}""".stripMargin

case class HBaseRecordEmp(
empNumber:String,
city:String,
empName:String,
jobDesignation:String,
salary:String)

val data = (4 to 10).map { i => {
val name = s"""Bobby${"%03d".format(i)}"""
HBaseRecordEmp(i.toString,
s"MyCity",
name,
"worker",
"5000")
}
}

sc.parallelize(data).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> empcatalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()

ERROR:

  1. 16/12/24 12:57:51 WARN metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
    16/12/24 12:57:52 WARN metastore.ObjectStore: Failed to get database default, returning NoSuchObjectException
    16/12/24 12:57:58 ERROR metastore.RetryingHMSHandler: AlreadyExistsException(message:Database default already exists)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_database(HiveMetaStore.java:891)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:107)
    at com.sun.proxy.$Proxy19.create_database(Unknown Source)

  2. java.lang.NoClassDefFoundError: org/apache/spark/Logging
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.getDeclaredConstructors0(Native Method)
    at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
    at java.lang.Class.getConstructor0(Class.java:3075)
    at java.lang.Class.newInstance(Class.java:412)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:455)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
    ... 52 elided
    Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 69 more

Connection Leaks in TableOutputFormat

TableOutputFormat creates a Connection instance and does not close it. Here is the HBase bug for it: https://issues.apache.org/jira/browse/HBASE-16017.

With Spark 1.6.1, a DataFrame is by default divided into 200 partitions, and saveAsHadoopDataset internally creates a connection for each partition, so with every write there are 200 unclosed connections in memory. Over time, the number of open connections reaches the limit ZooKeeper can handle and it starts tripping. Here is the code evidence:

public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Put> {
  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";

  public TableOutputFormat() {
  }

  public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException {
    TableName tableName = TableName.valueOf(job.get("hbase.mapred.outputtable"));
    BufferedMutator mutator = null;
    Connection connection = ConnectionFactory.createConnection(job);
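    // As reported above: this Connection is never closed; TableRecordWriter.close() only
    // closes the BufferedMutator, so each partition leaks a connection (see HBASE-16017).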
    mutator = connection.getBufferedMutator(tableName);
    return new TableOutputFormat.TableRecordWriter(mutator);
  }

  public void checkOutputSpecs(FileSystem ignored, JobConf job) throws FileAlreadyExistsException, InvalidJobConfException, IOException {
    String tableName = job.get("hbase.mapred.outputtable");
    if(tableName == null) {
      throw new IOException("Must specify table name");
    }
  }

  protected static class TableRecordWriter implements RecordWriter<ImmutableBytesWritable, Put> {
    private BufferedMutator m_mutator;

    public TableRecordWriter(BufferedMutator mutator) throws IOException {
      this.m_mutator = mutator;
    }

    public void close(Reporter reporter) throws IOException {
      this.m_mutator.close();
    }

    public void write(ImmutableBytesWritable key, Put value) throws IOException {
      this.m_mutator.mutate(new Put(value));
    }
  }
}

The Connection instance is not closed, which is causing this issue.

Phoenix coder improvements

  1. Make Phoenix 'see' the tables created by SHC without recreating Phoenix tables or views.
    To support this, SHC needs to create the metadata table SYSTEM.CATALOG or insert the metadata into it.
  2. Create an empty cell for each row, just like Phoenix does.

Not able to create a table in hbase

Hi,

I am not able to create a table, as the connector cannot connect to my cluster via ZooKeeper.

16/08/08 12:30:13 INFO ClientCnxn: Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error)
16/08/08 12:30:13 WARN RecoverableZooKeeper: Possibly transient ZooKeeper, quorum=localhost:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase/hbaseid
16/08/08 12:30:13 WARN ClientCnxn: Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)

Could you please tell me where I can put/configure my hbase-site.xml and core-site.xml? Currently I have placed them in the resource folder of my project.

Thanks

Pyspark Filter support

I have been following this SO post to send data to HBase using shc in PySpark.

However, now I need to read the data back and would like to filter by time range.

I was wondering, is it possible to use filters in shc with PySpark?
