dibbhatt / kafka-spark-consumer

High Performance Kafka Connector for Spark Streaming. Supports Multi Topic Fetch, Kafka Security, reliable offset management in Zookeeper, no data loss, no dependency on HDFS and WAL, in-built PID rate controller, Message Handler support, and an Offset Lag checker.

License: Apache License 2.0

Java 100.00%

kafka-spark-consumer's Introduction

README file for Kafka-Spark-Consumer

NOTE : This Kafka Spark Consumer code is taken from Kafka spout of the Apache Storm project (https://github.com/apache/storm/tree/master/external/storm-kafka), which was originally created by wurstmeister (https://github.com/wurstmeister/storm-kafka-0.8-plus). Original Storm Kafka Spout Code has been modified to work with Spark Streaming.

This utility helps pull messages from Kafka using Spark Streaming, with better handling of Kafka offsets and failures.

This Consumer implements a custom Reliable Receiver that uses the Kafka Consumer API to fetch messages from Kafka and stores every received block in the Spark BlockManager. The logic automatically detects the number of partitions for a topic and distributes them across the configured number of Receivers; each Receiver can fetch messages from one or more Kafka partitions.
e.g. if a Kafka topic has 100 partitions and the Consumer is configured with 20 Receivers, each Receiver will handle 5 partitions.

This Consumer can commit the offsets of a processed batch once the Spark Streaming batch completes.

In the Spark driver code, Receivers are launched by calling ReceiverLauncher.launch.

Please see the Java or Scala code examples on how to use this Low Level Consumer.

Kafka-Spark-Consumer Version Compatibility

Version 2.1.0 : Spark version 2.x and above. Kafka version 0.10 and above. Support for handling Multi Topic.

Version 2.0.0 : Spark version 2.x and above. Kafka version 0.10 and above. Support for Kafka Security. Uses the new Kafka Consumer API.

Version 1.0.9 : Spark versions earlier than 2.0 (i.e. 1.6 and prior). All Kafka versions (0.8.x, 0.9.x, 0.10.x, 0.11.x). No support for Kafka Security. Uses the low-level SimpleConsumer API.

Salient Features of Kafka-Spark-Consumer

  • Uses the latest Kafka Consumer API. Support for Kafka Security
  • Support for consuming from multiple topics
  • Zookeeper is used for storing the offset of each Kafka partition, which helps recovery in case of failure
  • Spark Streaming jobs using this Consumer do not require WAL for recovery from Driver or Executor failures. Because this Consumer stores the processed offset after every batch interval, in case of any failure it can restart from the offset right after the last processed offset.
  • This Consumer implements a PID (Proportional, Integral, Derivative) based Rate Controller for controlling back-pressure.
  • This Consumer can use a Message Interceptor to preprocess Kafka messages before writing them to the Spark BlockManager
  • Support for a Consumer Lag Checker tool (like ConsumerOffsetChecker) to find the Consumer Lag

What is Different from Spark's Out-of-the-Box Kafka Consumers

  • This Consumer is a Receiver-based, fault-tolerant, reliable consumer. The Receiver is designed to recover from any underlying failure and does not require the WAL feature in case of Driver failure. Please refer to the Consumer Recovery from Driver/Executor Crash section below for more details.

  • This Consumer has a mechanism to create Blocks from the Kafka stream and write them to the Spark BlockManager (see more details in the Consumer Tuning Options section below).

  • This Consumer has an in-built PID (Proportional, Integral, Derivative) Controller to control Spark Back Pressure. The PID Controller's rate feedback loop is built using Zookeeper. Back pressure is applied by altering the number of messages consumed during every poll. Please refer to the Spark Consumer Properties section on how to enable back pressure, and to the Consumer Tuning Options section on how to tune the PID Controller.

  • The number of partitions in the RDDs generated by this Consumer is decoupled from the number of Kafka partitions. One can control the RDD partitions by controlling the Block creation interval and Block size. Let's assume you have a Kafka topic with 10 partitions, a Block interval of 200 ms and a Batch interval of 5 seconds. This Consumer will generate 5 Blocks every second (1 second / Block interval) for each partition, i.e. 5 x 10 x 5 = 250 Blocks per Batch. As every Block written to the Spark BlockManager within the Batch interval creates one partition of the underlying RDD, every RDD created per Batch will have 250 partitions, which increases processing parallelism; the arithmetic is written out in the short sketch after this list. If the RDD partitioning were the same as the Kafka partitioning, every RDD would have only 10 partitions (same as the Kafka topic partitions) and limit your processing parallelism.

  • The Consumer enables an end-to-end no-data-loss guarantee without the Spark WAL feature. Refer to the Consumer Recovery from Driver/Executor Crash section for more details.
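The RDD-partition arithmetic from the bullet above, written out as a throwaway sketch (plain arithmetic using the same example numbers, not library code):

// Example from above: 10 Kafka partitions, 200 ms Block interval, 5 s Batch interval.
long blockIntervalMs = 200;
long batchIntervalMs = 5_000;
int  kafkaPartitions = 10;

long blocksPerSecondPerPartition = 1_000 / blockIntervalMs;      // = 5
long rddPartitionsPerBatch = blocksPerSecondPerPartition
    * kafkaPartitions
    * (batchIntervalMs / 1_000);                                 // = 5 * 10 * 5 = 250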

Instructions for Manual build

git clone https://github.com/dibbhatt/kafka-spark-consumer

cd kafka-spark-consumer

mvn install

Then use the below dependency in your Maven pom.xml:

    <dependency>
            <groupId>dibbhatt</groupId>
            <artifactId>kafka-spark-consumer</artifactId>
            <version>2.1.0</version>
    </dependency>

Accessing from Spark Packages

This Consumer is now part of Spark Packages : http://spark-packages.org/package/dibbhatt/kafka-spark-consumer

Include this package in your Spark Applications using:

  • spark-shell, pyspark, or spark-submit $SPARK_HOME/bin/spark-shell --packages dibbhatt:kafka-spark-consumer:2.1.0
  • sbt

If you use the sbt-spark-package plugin, in your sbt build file, add:

spDependencies += "dibbhatt/kafka-spark-consumer:2.1.0"

Otherwise,

resolvers += "Spark Packages Repo" at "http://dl.bintray.com/spark-packages/maven"
          
libraryDependencies += "dibbhatt" % "kafka-spark-consumer" % "2.1.0"
  • Maven

In your pom.xml, add:

<dependencies>
  <!-- list of dependencies -->
  <dependency>
    <groupId>dibbhatt</groupId>
    <artifactId>kafka-spark-consumer</artifactId>
    <version>2.1.0</version>
  </dependency>
</dependencies>
<repositories>
  <!-- list of other repositories -->
  <repository>
    <id>SparkPackagesRepo</id>
    <url>http://dl.bintray.com/spark-packages/maven</url>
  </repository>
</repositories>

Running with different Spark and Kafka

This consumer supports all Kafka versions (0.8, 0.9, 0.10 and 0.11) and works with all Spark versions. One needs to include the respective Spark and Kafka versions in the application pom.xml. Please refer to the version compatibility section above.

e.g. the example below includes dependencies for Spark 2.2.0 and Kafka 1.1.0:

<properties>
  <spark.version>2.2.0</spark.version>
  <kafka.version>1.1.0</kafka.version>
</properties>

<dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${kafka.version}</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.11.0</version>
    </dependency>
</dependencies>

Spark Consumer Properties

These are the Consumer Properties that need to be set in your Driver code. (See the Java and Scala code examples on how to use these properties.)

  • ZK quorum details of Kafka Cluster.
    • zookeeper.hosts=host1,host2
  • Kafka ZK Port
    • zookeeper.port=2181
  • Kafka Topic to consume (comma separated list of Kafka Topics for Multi topic fetch)
    • kafka.topic=topicA
  • Kafka Consumer ID. Identifier of the Consumer
    • kafka.consumer.id=consumer-id
  • Kafka Bootstrap Servers.
    • bootstrap.servers=x.x.x.x:9092
  • OPTIONAL - Force From Start. By default the Consumer starts from the latest offset.
    • consumer.forcefromstart=true
  • OPTIONAL - Maximum messages fetched in one Poll. Default 500
    • max.poll.records=100
  • OPTIONAL - Fill Frequency in ms. Default 1 second
    • consumer.fillfreqms=500
  • OPTIONAL - Consumer Back Pressure Support. Default is true
    • consumer.backpressure.enabled=false
  • OPTIONAL - This can further control RDD Partitions. Number of Blocks fetched from Kafka to merge before writing to Spark Block Manager. Default is 1
    • consumer.num_fetch_to_buffer=10
  • OPTIONAL - ZK used for consumer offset commits. It can be a different ZK cluster than the one used by Kafka.
    • zookeeper.consumer.connection=host1:2181,host2:2181

Java Example

Properties props = new Properties();
props.put("zookeeper.hosts", "x.x.x.x");
props.put("zookeeper.port", "2181");
props.put("kafka.topic", "topicA");
props.put("kafka.consumer.id", "kafka-consumer");
props.put("bootstrap.servers", "x.x.x.x:9092");
// Optional Properties
props.put("max.poll.records", "250");
props.put("consumer.fillfreqms", "1000");

SparkConf _sparkConf = new SparkConf();
JavaStreamingContext jsc = new JavaStreamingContext(_sparkConf, Durations.seconds(30));
// Specify number of Receivers you need.
int numberOfReceivers = 3;

JavaDStream<MessageAndMetadata<byte[]>> unionStreams = ReceiverLauncher.launch(
    jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY());

unionStreams.foreachRDD(new VoidFunction<JavaRDD<MessageAndMetadata<byte[]>>>() {
  @Override
  public void call(JavaRDD<MessageAndMetadata<byte[]>> rdd) throws Exception {
    //Start Application Logic
    rdd.foreachPartition(new VoidFunction<Iterator<MessageAndMetadata<byte[]>>>() {
        @Override
        public void call(Iterator<MessageAndMetadata<byte[]>> mmItr) throws Exception {
            while(mmItr.hasNext()) {
                MessageAndMetadata<byte[]> mm = mmItr.next();
                byte[] key = mm.getKey();
                byte[] value = mm.getPayload();
                if(key != null)
                    System.out.println(" key :" + new String(key));
                if(value != null)
                    System.out.println(" Value :" + new String(value));
            }
        }
    });
    //End Application Logic
    //commit offset
    ProcessedOffsetManager.persistsPartition(rdd, props);
  }
});

try {
  jsc.start();
  jsc.awaitTermination();
}catch (Exception ex ) {
  jsc.ssc().sc().cancelAllJobs();
  jsc.stop(true, false);
  System.exit(-1);
}

A complete example is available in src/main/java/consumer/kafka/client/SampleConsumer.java, which uses this ReceiverLauncher to generate DStreams from Kafka and applies an output operation to every message of the RDD.

Scala Example

val conf = new SparkConf()
  .setMaster("spark://x.x.x.x:7077")
  .setAppName("LowLevelKafkaConsumer")
  .set("spark.executor.memory", "1g")
  .set("spark.rdd.compress","true")
  .set("spark.storage.memoryFraction", "1")
  .set("spark.streaming.unpersist", "true")

val sc = new SparkContext(conf)

//You might want to add this jar explicitly if you are running in standalone mode.
sc.addJar("/home/kafka-spark-consumer/target/kafka-spark-consumer-2.1.0-jar-with-dependencies.jar")
val ssc = new StreamingContext(sc, Seconds(10))

val topic = "topicA"
val zkhosts = "x.x.x.x"
val zkports = "2181"

//Specify number of Receivers you need. 
val numberOfReceivers = 1
val kafkaProperties: Map[String, String] =
  Map("zookeeper.hosts" -> zkhosts,
      "zookeeper.port" -> zkports,
      "kafka.topic" -> topic,
      "kafka.consumer.id" -> "kafka-consumer",
      "bootstrap.servers" -> "x.x.x.x:9092",
      //optional properties
      "max.poll.records" -> "250",
      "consumer.fillfreqms" -> "1000")

val props = new java.util.Properties()
kafkaProperties foreach { case (key,value) => props.put(key, value)}

val unionStreams = ReceiverLauncher.launch(ssc, props, numberOfReceivers,StorageLevel.MEMORY_ONLY)

unionStreams.foreachRDD(rdd => {
    //Start Application Logic e.g. rdd.foreachPartition
    println("\n\nNumber of records in this batch : " + rdd.count())
    //End Application Logic

    //Persists the Max Offset of given Kafka Partition to ZK
    ProcessedOffsetManager.persistsPartition(rdd, props)
} )

try{
  ssc.start()
  ssc.awaitTermination()
} catch {
  case e: Exception => {
    sc.cancelAllJobs()
    ssc.stop(stopSparkContext = true, stopGracefully = false)
    System.exit(-1)
  }
}

A complete example is available in examples/scala/LowLevelKafkaConsumer.scala, which shows how to use this utility from Scala.

Handling Multi Topic

Add multiple topics to the kafka.topic property (comma separated):

props.put("kafka.topic", "topicA,topicB,topicC");

Sample Multi Topic Fetch example. The code is exactly the same as the single-topic fetch.

Properties props = new Properties();
props.put("zookeeper.hosts", "zkhost");
props.put("zookeeper.port", "2181");
props.put("kafka.topic", "topicA,topicB,topicC");
props.put("kafka.consumer.id", "kafka-consumer");
// Optional Properties
props.put("zookeeper.broker.path", "/brokers");
props.put("zookeeper.consumer.path", "/consumers");
props.put("consumer.forcefromstart", "false");
props.put("max.poll.records", "10");
props.put("consumer.fillfreqms", "500");
props.put("consumer.backpressure.enabled", "true");
//Kafka properties
props.put("bootstrap.servers", "x.x.x.x:9092");

SparkConf _sparkConf = new SparkConf();
JavaStreamingContext jsc = new JavaStreamingContext(_sparkConf, Durations.seconds(30));
//Specify number of Receivers you need.
int numberOfReceivers = 6;

JavaDStream<MessageAndMetadata<byte[]>> unionStreams = ReceiverLauncher.launch(
    jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY());

unionStreams.foreachRDD(new VoidFunction<JavaRDD<MessageAndMetadata<byte[]>>>() {
  @Override
  public void call(JavaRDD<MessageAndMetadata<byte[]>> rdd) throws Exception {
    //Start Application Logic
    rdd.foreachPartition(new VoidFunction<Iterator<MessageAndMetadata<byte[]>>>() {
        @Override
        public void call(Iterator<MessageAndMetadata<byte[]>> mmItr) throws Exception {
            int countTopicA = 0;
            int countTopicB = 0;
            int countTopicC = 0;
            while(mmItr.hasNext()) {
                MessageAndMetadata<byte[]> mm = mmItr.next();
                if(mm.getTopic().equals("topicA")) {
                    countTopicA++;
                }
                else if (mm.getTopic().equals("topicB")) {
                    countTopicB++;
                }
                else if (mm.getTopic().equals("topicC")) {
                    countTopicC++;
                }
            }
            System.out.println("topicA count " + countTopicA);
            System.out.println("topicB count " + countTopicB);
            System.out.println("topicC count " + countTopicC);
        }
    });
    System.out.println("RDD count " + rdd.count());
    //End Application Logic
    //commit offset
    System.out.println("Commiting Offset");
    ProcessedOffsetManager.persistsPartition(rdd, props);
  }
});

Offset Commit Mechanism

If you look at client/SampleConsumer.java or examples/scala/LowLevelKafkaConsumer.scala, you will see that you need to add a couple of lines (marked as 1) in your Driver code.

For Java

//Get the stream
JavaDStream<MessageAndMetadata<byte[]>> unionStreams = ReceiverLauncher.launch(
    jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY());

unionStreams.foreachRDD(new VoidFunction<JavaRDD<MessageAndMetadata<byte[]>>>() {
  @Override
  public void call(JavaRDD<MessageAndMetadata<byte[]>> rdd) throws Exception {
      //Start Application Logic
      System.out.println("Number of records in this batch : " + rdd.count());
      //rdd.foreachPartition { ..}
      //End Application Logic

      //**1** commit offset
      ProcessedOffsetManager.persistsPartition(rdd, props);
  }
});

For Scala

//Get the Stream 
val unionStreams = ReceiverLauncher.launch(ssc, props, numberOfReceivers,StorageLevel.MEMORY_ONLY)

unionStreams.foreachRDD(rdd => {
    //Start Application Logic e.g. rdd.foreachPartition {..}
     println("\n\nNumber of records in this batch : " + rdd.count())
    //End Application Logic

    //**1** Commit offset
    ProcessedOffsetManager.persistsPartition(rdd, props)
} )

Kafka Security

This consumer supports Kafka Security. One just needs to add the necessary Kafka security properties to pull messages from a secured Kafka cluster, e.g.

props.put("bootstrap.servers", "x.x.x.x:9093");
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location","kafka.server.truststore.jks");
props.put("ssl.truststore.password", "test1234");

This consumer uses the properties below by default; they cannot be changed. If you want a different key/value deserializer, you can apply it either in the Spark job or using the Custom Message Handler concept mentioned below.

props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "none");

Consumer Recovery from Driver/Executor Crash without WAL

Please refer to this blog which explains why WAL is needed for Zero Data Loss.

https://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html

The primary reason for WAL is that the Receiver commits the offset of a consumed block to ZK after the block is written to the Spark BlockManager (and possibly replicated). If blocks that are already consumed and committed are not yet processed, the Receiver can lose data. This is because of how Spark applications operate in a distributed manner. When the driver process fails, all the executors running in a standalone/yarn/mesos cluster are killed as well, along with any data in their memory. In the case of Spark Streaming, all the data received from sources like Kafka is buffered in the memory of the executors until its processing has completed. This buffered data cannot be recovered even if the driver is restarted.

Hence the WAL feature is needed to recover already received (but not processed) blocks from a WAL written to a persistent store like HDFS.

But this Consumer has a different mechanism for Driver failure. The Consumer maintains the offsets of the processed blocks, which means it can commit offsets of already processed blocks to ZK, and in case of Driver failure it can start from the offset next to the last processed offset (instead of the last consumed offset) for every Kafka partition. Thus the Consumer restarts from exactly where the last successful batch left off, and hence there is no data loss.

How This Works

The Receiver receives a Block of data of up to the configurable fetch size from Kafka every fill frequency. A given Block fetched from Kafka may contain many messages. Every Receiver spawns a dedicated thread for every Kafka partition it owns, e.g. if there are 5 Receivers for 20 Kafka partitions, each Receiver will spawn 4 threads. Each thread fetches messages from one and only one Kafka partition and writes Blocks of data to the Spark BlockManager.

Every write to the Spark BlockManager creates one partition of the underlying RDD. So for a given Batch interval, if there are N Blocks written by all receiving threads, the RDD for that batch will have N partitions.

The Receiver can write one Block of data pulled from Kafka during every fetch, or can merge multiple fetches together. This can be used to further control the RDD partitions and is governed by the consumer.num_fetch_to_buffer property (default is 1). The Receiver wraps every message of a given Block with some additional metadata, like the message offset and Kafka partition ID.
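For example, merging 10 fetches into one Block write is just a property on the same Properties object used in the examples above (the value 10 is illustrative):

// Buffer 10 Kafka fetches before each write to the BlockManager,
// giving fewer (and larger) RDD partitions per batch.
props.put("consumer.num_fetch_to_buffer", "10");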

As one Receiver thread can write multiple Blocks to the BlockManager, it needs to find the highest offset among all RDD partitions that belong to the same Kafka partition, and repeat this for every Kafka partition. e.g., if RDD partitions 4, 8 and 12 are generated by Receiver thread X for Kafka partition Y, and the highest offset for 4 is 100, for 8 is 400 and for 12 is 800, then the highest offset for Kafka partition Y for this batch is 800.

This Consumer performs a very simple map-reduce step to get the highest offset for every Kafka partition that contributes to a given batch's RDD. These <Partition, Offset> tuples are written back to ZK as the already-processed offsets after the batch completes.
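To make that map-reduce step concrete, here is a minimal, hypothetical sketch in the Spark Java API, assuming the rdd from the Java example above. The getPartition()/getOffset() accessor names are assumptions used only for illustration; the library already does this internally via ProcessedOffsetManager, so none of this needs to appear in application code:

// Hypothetical sketch: highest processed offset per Kafka partition for one batch.
JavaPairRDD<Integer, Long> maxOffsetPerPartition = rdd
    .mapToPair(mm -> new Tuple2<>(mm.getPartition(), mm.getOffset())) // (partitionId, offset)
    .reduceByKey(Math::max);                                          // keep the highest offset seen
// These <Partition, Offset> pairs are what end up in ZooKeeper once the batch completes.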

This requires a few lines to be added to the Spark Driver code to avail this feature. Please refer to the Offset Commit Mechanism section.

Consumer Tuning Options

Batch Size Tuning :

This Consumer polls Kafka every consumer.fillfreqms, and during every poll it tries to fetch max.poll.records messages.

Hence, let's assume your Batch duration is 30 seconds, you have 10 Kafka partitions, and max.poll.records is 50. The total number of messages the Consumer can fetch is Total = (BatchDuration / FillFrequency) * MaxPollRecords * Number of Kafka Partitions

e.g. for the above example (with the default 1-second fill frequency) it can fetch 30 * 50 * 10 = 15000 messages every batch.

If you need a higher rate, you can increase the max.poll.records property, or increase the poll frequency by lowering the consumer.fillfreqms property.

These two parameters need to be tuned carefully, keeping in mind your downstream processing rate and your memory settings.
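The ceiling from the formula above, written out with the example figures:

long batchDurationMs = 30_000;  // Spark batch duration
long fillFreqMs      = 1_000;   // consumer.fillfreqms (default 1 second)
long maxPollRecords  = 50;      // max.poll.records
long kafkaPartitions = 10;

long maxMessagesPerBatch =
    (batchDurationMs / fillFreqMs) * maxPollRecords * kafkaPartitions; // 30 * 50 * 10 = 15000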

Back-Pressure Rate Tuning

You can enable the back-pressure mechanism by setting consumer.backpressure.enabled to "true" in the Properties used for ReceiverLauncher.

The default PID settings are as below.

  • Proportional = 1.0
  • Integral = 0.0
  • Derivative = 0.0

If you increase any or all of these, the damping factor will be higher. So if you want to lower the Consumer rate below what is calculated with the default PID settings, you can increase these values.

You can control the PID values by setting the properties below (see the sketch after this list).

  • consumer.backpressure.proportional
  • consumer.backpressure.integral
  • consumer.backpressure.derivative
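For example, to damp the rate more aggressively than the defaults, set the PID terms on the same Properties object passed to ReceiverLauncher; the values here are purely illustrative:

props.put("consumer.backpressure.enabled", "true");
// Illustrative values only; tune them for your workload.
props.put("consumer.backpressure.proportional", "1.0");
props.put("consumer.backpressure.integral", "0.3");
props.put("consumer.backpressure.derivative", "0.1");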

Custom Message Handler

This Consumer supports writing a custom message handler, which can pre-process every message consumed from Kafka before it is written to the Spark BlockManager. This can be used for filter/map type logic applied to every message.

To implement a custom MessageHandler, one needs to extend

public abstract class KafkaMessageHandler implements Cloneable, Serializable

and provide an implementation of protected abstract E process(byte[] payload)

The default MessageHandler is IdentityMessageHandler, which simply passes through the exact byte[] fetched from Kafka.

public class IdentityMessageHandler extends KafkaMessageHandler<byte[]> {
    @Override
    protected byte[] process(byte[] payload) {
        return payload;
    }
}

E.g. one can implement a MyMessageHandler which transforms the consumed byte[] from Kafka into some type E.

Java example

public class MyMessageHandler extends KafkaMessageHandler<E> {
    @Override
    protected E process(byte[] payload) {
        //do something
        //return object of type E
    }
}

MyMessageHandler myHandler = new MyMessageHandler();

JavaDStream<MessageAndMetadata<E>> unionStreams = ReceiverLauncher.launch(
    jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY(), myHandler);

Scala example

import consumer.kafka.KafkaMessageHandler

object MyMessageHandler extends KafkaMessageHandler[Array[Byte]] {

    override def process(payload: Array[Byte]): Array[Byte] = {
        val s = new String(payload)
        if (!s.contains("test")) s.getBytes else "NA".getBytes
    }
}
val unionStreams = ReceiverLauncher.launch(ssc, props, numberOfReceivers,StorageLevel.MEMORY_ONLY, MyMessageHandler)
val stream = unionStreams.map(x => { val s = new String(x.getPayload); s })

Consumer Offset Checker

You can use the standard OffsetChecker utility from Kafka.

For this to work, the Zookeeper consumer path property should be zookeeper.consumer.path=/consumers

This is set by default in this version, so even if you do not specify this property, the offset checker will still be enabled.

One can run the offset checker like this.

kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --broker-info x.x.x.x:9092 --group <kafka.consumer.id> --topic <kafka.topic> --zookeeper y.y.y.y:2181

With the new Kafka API, the command is

bin/kafka-consumer-groups --describe --group <kafka.consumer.id> --zookeeper y.y.y.y:2181

Running Spark Kafka Consumer

Let's assume your Driver code is in xyz.jar, which is built using kafka-spark-consumer as a dependency.

Launch this using spark-submit

./bin/spark-submit --class x.y.z.YourDriver --master spark://x.x.x.x:7077 --executor-memory 1G /<Path_To>/xyz.jar

This will start the Spark Receivers, fetch Kafka messages for every partition of the given topic, and generate the DStream.

e.g. to test the Consumer provided in the package with your Kafka settings, please modify it to point to your Kafka cluster and use the command below for spark-submit. You may need to change the Spark and Kafka versions in pom.xml.

./bin/spark-submit --class consumer.kafka.client.SampleConsumer --master spark://x.x.x.x:7077 --executor-memory 1G /<Path_To>/kafka-spark-consumer-2.1.0-jar-with-dependencies.jar

kafka-spark-consumer's People

Contributors: dependabot[bot], dibbhatt, jedisct1, spring1843

kafka-spark-consumer's Issues

Not working with Spark 2.2.0

Since Spark has been updated to version 2.2.0, the following exception is thrown:

Exception in thread "SparkListenerBus" java.lang.AbstractMethodError
ERROR: org.apache.spark.util.Utils - uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.AbstractMethodError
at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:69)
at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
at org.apache.spark.streaming.scheduler.StreamingListenerBus.postToAll(StreamingListenerBus.scala:29)
at org.apache.spark.streaming.scheduler.StreamingListenerBus.onOtherEvent(StreamingListenerBus.scala:43)
at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:75)
at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:36)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:94)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1279)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77)

Doesn't seems to be working

I gave this project a try, i cloned the repo, build the project and used that jar inside my SparkStreaming project and used your KafkaReceiver to create dstream. Whenever i run it, it just crashes with my SparkContext being shutdown.

You can look at the code over here https://gist.github.com/akhld/a59a2369f0f1f5509af4

This is what happens when i run it in Standalone mode https://gist.github.com/akhld/b5627bf866721df7321b

This is what happens when i run it in local mode https://gist.github.com/akhld/36baccb2a866106315f6

org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/load/partitions

Hi,
I am trying to run the consumer , but getting following error. Any idea why i am getting this?

INFO : org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
Exception in thread "main" java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/load/partitions
at consumer.kafka.ReceiverLauncher.getNumPartitions(ReceiverLauncher.java:109)
at consumer.kafka.ReceiverLauncher.createStream(ReceiverLauncher.java:53)
at consumer.kafka.ReceiverLauncher.launch(ReceiverLauncher.java:36)
at consumer.kafka.client.Consumer.run(Consumer.java:72)
at consumer.kafka.client.Consumer.start(Consumer.java:43)
at consumer.kafka.client.Consumer.main(Consumer.java:95)
Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/load/partitions
at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1590)
at org.apache.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:214)
at org.apache.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:203)
at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
at org.apache.curator.framework.imps.GetChildrenBuilderImpl.pathInForeground(GetChildrenBuilderImpl.java:199)
at org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:191)
at org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:38)
at consumer.kafka.ReceiverLauncher.getNumPartitions(ReceiverLauncher.java:105)
... 5 more

How to recover the failed receiver on a partition which has exception of " Offsets out of range with no configured reset policy for partitions:"

Hello Dibyendu,
When a partition met a exception of "Offsets out of range with no configured reset policy for partitions: ", then this partition no longer works, how to recover the receiver for the failed partition?
following is the details log message:

18/09/20 00:32:40 WARN PartitionManager: Got fetch request with offset out of range: 313303788 for Topic tr69data24 partition 19
18/09/20 00:32:40 WARN PartitionManager: Offset reset to LatestTime 313303788 for Topic tr69data24 partition 19
18/09/20 00:32:40 ERROR KafkaSparkConsumer: Partition 19 encountered error during createStream : Offsets out of range with no configured reset policy for partitions: {tr69data24-19=313303788}
consumer.kafka.OutOfRangeException: Offsets out of range with no configured reset policy for partitions: {tr69data24-19=313303788}
at consumer.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:75)
at consumer.kafka.PartitionManager.fetchMessages(PartitionManager.java:278)
at consumer.kafka.PartitionManager.fill(PartitionManager.java:195)
at consumer.kafka.PartitionManager.next(PartitionManager.java:134)
at consumer.kafka.KafkaSparkConsumer.createStream(KafkaSparkConsumer.java:96)
at consumer.kafka.KafkaSparkConsumer.run(KafkaSparkConsumer.java:118)
at java.lang.Thread.run(Thread.java:745)
18/09/20 00:32:40 INFO ZooKeeper: Session: 0x265abf0f5a496a0 closed
18/09/20 00:32:40 INFO ClientCnxn: EventThread shut down
consumer.kafka.OutOfRangeException: Offsets out of range with no configured reset policy for partitions: {tr69data24-19=313303788}
at consumer.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:75)
at consumer.kafka.PartitionManager.fetchMessages(PartitionManager.java:278)
at consumer.kafka.PartitionManager.fill(PartitionManager.java:195)
at consumer.kafka.PartitionManager.next(PartitionManager.java:134)
at consumer.kafka.KafkaSparkConsumer.createStream(KafkaSparkConsumer.java:96)
at consumer.kafka.KafkaSparkConsumer.run(KafkaSparkConsumer.java:118)
at java.lang.Thread.run(Thread.java:745)

18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 443585258 for Parittion 0
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 311429390 for Parittion 2
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 315225480 for Parittion 5
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 385262949 for Parittion 6
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 166297698 for Parittion 7
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 490824808 for Parittion 8
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 589901040 for Parittion 10
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 320172181 for Parittion 11
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 388380731 for Parittion 12
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 404124739 for Parittion 14
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 666967289 for Parittion 15
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 434333538 for Parittion 16
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 368144106 for Parittion 17
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 244265022 for Parittion 18
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 445694201 for Parittion 20

18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 205794190 for Parittion 21
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 420965741 for Parittion 22
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 338009268 for Parittion 23
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 389383488 for Parittion 24
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 444850876 for Parittion 26
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 254682257 for Parittion 28
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 296762823 for Parittion 29
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 280982673 for Parittion 30
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 148408705 for Parittion 31
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 345764987 for Parittion 32
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 198189174 for Parittion 34
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 153297639 for Parittion 35
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 345925489 for Parittion 36
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 353553758 for Parittion 37
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 293651243 for Parittion 38
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 457799405 for Parittion 40
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 352548943 for Parittion 42
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 373666232 for Parittion 43
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 450144063 for Parittion 44
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 200918074 for Parittion 45
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 55549751 for Parittion 46
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 266091161 for Parittion 47

Thanks,
Fred

Missing explanation why it has better kafka offsets/error handling

Hello,

The readme of this project mentions "This utility will help to pull messages from Kafka Cluster using Spark Streaming. The Kafka Consumer is Low Level Kafka Consumer ( SimpleConsumer) and have better handling of the Kafka Offsets and handle failures.", but I can not find any explanation what it does differently to provide this features.

I looked through the code and I can see that kafka offsets are committed to zookeeper after a Recevier.store() (instead of auto commit). This is the main feature or are there other reasons why it has better offset handling and failure handling?

Thanks

Not able to get streaming data

hi Dibbhatt,

I'm not able to figure out why am I not getting the content in my batch.
I have added an SOP message in foreachRDD().call method but it seems as if this method is not at all getting executed.
Following is my piece of code:

props.put("consumer.forcefromstart", "false");
props.put("consumer.fetchsizebytes", "524288");
props.put("consumer.fillfreqms", "2000");

    SparkConf _sparkConf = new SparkConf();
    _sparkConf.setAppName("KafkaReceiver");
    _sparkConf.setMaster("local[4]");
    _sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "false");


            JavaStreamingContext jsc = new JavaStreamingContext(_sparkConf,
                    new Duration(100000));
            //Specify number of Receivers you need.
            int numberOfReceivers = 4;

            JavaDStream<MessageAndMetadata> unionStreams = ReceiverLauncher.launch(jsc, props, numberOfReceivers, StorageLevel.DISK_ONLY());

            unionStreams.foreachRDD(new Function2<JavaRDD<MessageAndMetadata>, Time, Void>() {
                /**
                 * 
                 */
                private static final long serialVersionUID = -5999013346771943994L;

                public Void call(JavaRDD<MessageAndMetadata> rdd,
                        Time time) throws Exception {
                    System.out.println(" methiod call rdd.collect()");
                    rdd.collect();
                    return null;
                }
            });

            jsc.start();
            jsc.awaitTermination();

I am new to Spark so not very comfortable with it,
please let me know when this call() method gets executed and do I need a spark cluster to execute this.

Great Thanks,

PartitionManager not writing Offset to Zookeeper if a record fetched from Kafka only contain one line.

PartitionManager does not write Offset to zookeeper in some condition
duplicated data will be fetched if program restart without writing offset to zookeeper

How to reproduce:
Fetch one line at a time from Kafka

version
kafka-spark-consumer : 1.0.6
spark-streaming_2.10 : 1.6.0

here is the log immediately after deploying the Spark program to YARN
notice that offset 6 is already duplicated
and PartitonManager cannot write offset 6/7 to zookeeper due to

if (_lastEnquedOffset > _lastComittedOffset) {

17/01/19 10:22:30 INFO kafka.PartitionManager: Read last commit offset from zookeeper: 6; old topology_id: test_test_kpi_reg_user - new consumer_id: test_test_kpi_reg_user
17/01/19 10:22:30 INFO kafka.PartitionManager: Starting Kafka cdh005.testdev.local:0 from offset 6
17/01/19 10:22:30 INFO kafka.PartitionManager: Total 1 messages from Kafka: cdh005.testdev.local:0 there in internal buffers
17/01/19 10:22:30 INFO kafka.PartitionManager: Store for topic topic_tid_9999_dev for partition 0 is : 6
17/01/19 10:22:31 INFO kafka.PartitionManager: LastComitted Offset : 6
17/01/19 10:22:31 INFO kafka.PartitionManager: New Emitted Offset : 7
17/01/19 10:22:31 INFO kafka.PartitionManager: Enqueued Offset :6
17/01/19 10:22:31 INFO kafka.PartitionManager: Last Enqueued offset 6 not incremented since previous Comitted Offset 6 for partition  Partition{host=cdh005.testdev.local:9092, partition=0} for Consumer test_test_kpi_reg_user. Some issue in Process!!

produce another record -> Write succeed

17/01/19 10:57:30 INFO kafka.PartitionManager: Total 1 messages from Kafka: cdh005.testdev.local:0 there in internal buffers
17/01/19 10:57:30 INFO kafka.PartitionManager: Store for topic topic_tid_9999_dev for partition 0 is : 7
17/01/19 10:57:33 INFO kafka.PartitionManager: LastComitted Offset : 6
17/01/19 10:57:33 INFO kafka.PartitionManager: New Emitted Offset : 8
17/01/19 10:57:33 INFO kafka.PartitionManager: Enqueued Offset :7
17/01/19 10:57:33 INFO kafka.PartitionManager: Committing offset for Partition{host=cdh005.testdev.local:9092, partition=0}
17/01/19 10:57:33 INFO kafka.PartitionManager: Wrote committed offset to ZK: 8
17/01/19 10:57:33 INFO kafka.PartitionManager: Committed offset 8 for Partition{host=cdh005.testdev.local:9092, partition=0} for consumer: test_test_kpi_reg_user

produce another record -> write failed again

17/01/19 10:58:40 INFO kafka.PartitionManager: Total 1 messages from Kafka: cdh005.testdev.local:0 there in internal buffers
17/01/19 10:58:40 INFO kafka.PartitionManager: Store for topic topic_tid_9999_dev for partition 0 is : 8
17/01/19 10:58:42 INFO kafka.PartitionManager: LastComitted Offset : 8
17/01/19 10:58:42 INFO kafka.PartitionManager: New Emitted Offset : 9
17/01/19 10:58:42 INFO kafka.PartitionManager: Enqueued Offset :8
17/01/19 10:58:42 INFO kafka.PartitionManager: Last Enqueued offset 8 not incremented since previous Comitted Offset 8 for partition  Partition{host=cdh005.testdev.local:9092, partition=0} for Consumer test_test_kpi_reg_user. Some issue in Process!!

it seems that this issue can be fix by saving processed offset(_lastEnquedOffset) to zookeeper and +1 to _emittedToOffset from the constructer
having README.md states that "the last processed offset is write in Zookeeper."
dont know if these change will affect other class or not

only tested for 1.0.6
but base on the logic this issue should reproduce in 1.0.8 as well

Consumer not able to read kafka topic and write to HDFS

The kafka-spark-consumer (v.1.0.6) job was reading kafka topic and writing to couchbase successfully on CDH 5.5.1 Cluster. But it is not reading or writing the data after we upgraded to CDH 5.9.0. Additionally we are also trying to write to HDFS along with Couchbase. The job just creates specified directories in HDFS and keeps running forever but does not read or write anything.
Specs:
kafka version: 0.8.2.0
Zookeeper version: zookeeper-3.4.5+cdh5.9.0+98
spark version: spark-1.6.0+cdh5.9.0+229

pom.xml :

dibbhatt
kafka-spark-consumer
1.0.6

We also tried to use latest version v1.0.7 but it didnt help.

Thanks
Srikanth

Consumer commit not proceeding the offset anymore

We are using kafka-spark-consumer verison 1.0.6, kafka 0.8.2 and here are our configurations-

zookeeper.consumer.path=/listingmetrics-impressions-kafka
kafka.consumer.id=listingmetricsimpressionssparkconsumer-prod-2
group.id=ListingMetricsImpressionsSparkConsumer-prod
consumer.forcefromstart=false
consumer.fetchsizebytes=102400
consumer.fillfreqm=250
consumer.receivers=1

The issue we are facing is that

Consumer commit not proceeding the offset anymore

Heres what we found upon debugging the code-
PartitionManager -> next() method and then
-> fill() method
However the KafkaUtils.fetchMessages call is not retuning any validBytes as well as no errors
The following call

ByteBufferMessageSet msgs =
          fetchResponse.messageSet(topic, _partition.partition);

has no messages and hence _emittedToOffset is not getting incremented

When the commit is called next in the flow, we are getting the following message-

Last Enqueued offset "
          + _lastEnquedOffset
            + " not incremented since previous Comitted Offset "
            + _lastComittedOffset
            + " for partition  "
            + _partition
            + " for Consumer "
            + _ConsumerId
            + ". Some issue in Process!!"

Can you help us with the resolution. Let us know if you need any other info.
Thanks.

Licensing concerns with current kafka-spark-consumer source code

Dibyendu,

first, thanks for your work on providing an improved Kafka consumer for Spark Streaming. Much appreciated!

I have been playing around with Kafka and Spark Streaming myself, and stumbled upon your project in the spark-user thread where you announced it last month. Since there are apparently still a couple of issues (including Spark issues) to be ironed out, I began reading your source code for further details on the current status of Kafka support in Spark Streaming -- actually because I thought that "Hey, the Apache Storm project has a reasonable Kafka spout/connector, maybe that code would help the Spark project to improve their own variant."

While reading your source code that I noticed that apparently most of the code is a verbatim copy of the Kafka spout of the Apache Storm project, which was originally created by wurstmeister. In both cases the code is licensed under the Apache License v2.0, which means you can't just copy the code -- there are some rules you must follow. (And both Apache Spark and Apache Storm, as ASF projects, are using the very same license, which also means it's easy to share code amongst the projects.) Notably, "you must give any other recipients of derivative work a copy of that license, you must cause any modified files to carry prominent notices stating that you changed the files, and you must retain, in the source form of any derivative works that you distribute, all copyright, patent, trademark, and attribution notices from the source form of the work, excluding those notices that do not pertain to any part of the derivative works". See Apache License v2.0 for details of what you would have to do/change/add/etc. to be license compliant.

I am sure you have done this in good faith, and I am making you aware of this issue primarily to help you.

Best wishes,
Michael

failure occurred when set spark.streaming.concurrentJobs from 1 to 2

Exception like this:

02 DEBUG zookeeper.ClientCnxn: Reading reply sessionid:0x3557739d57e0613, packet:: clientPath:null serverPath:null finished:false header:: 305,4 replyHeader:: 305,21475155982,-101 request:: '/brokers/ids/-1,F response::
16/06/22 19:03:02 ERROR kafka.DynamicBrokersReader: Node /brokers/ids/-1 does not exist

mvn repository you provided is not working

I can't download the jar file through http://dl.bintray.com/spark-packages/maven

  <dependency>
            <groupId>kafka.spark.consumer</groupId>
            <artifactId>kafka-spark-consumer</artifactId>
            <version>1.0.8</version>
        </dependency>
   <repositories>
        <!-- list of other repositories -->
        <repository>
            <id>SparkPackagesRepo</id>
            <url>http://dl.bintray.com/spark-packages/maven</url>
        </repository>
    </repositories>

is not working for me.

thanks.

Consumer how to use Kafka utilities like ConsumerOffsetChecker and get to show on Kafka Tool

We are using kafka-spark-consumer verison 1.0.6, kafka 0.8.2 and here are our configurations-

zookeeper.consumer.path=/listingmetrics-impressions-kafka
kafka.consumer.id=listingmetricsimpressionssparkconsumer-prod-2
group.id=ListingMetricsImpressionsSparkConsumer-prod
consumer.forcefromstart=false
consumer.fetchsizebytes=102400
consumer.fillfreqm=250
consumer.receivers=1

Out issue is that this consumer does not seem to be listed as a consumer group
when using the following kafka utility
kafka-run-class kafka.tools.ConsumerOffsetChecker --zookeeper my-host:my-port --topic impression --group listingmetricsimpressionssparkconsumer-prod-2

group.id=ListingMetricsImpressionsSparkConsumer-prod), group.id (ListingMetricsImpressionsSparkConsumer-prod)from the config

we get the following error Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/ListingMetricsImpressionsSparkConsumer-prod/offsets/impression/6.

The consumer shows for other flume related groups

kafka-run-class kafka.tools.ConsumerOffsetChecker --zookeeper my-host:my-port --topic impression --group flume
Group Topic Pid Offset logSize Lag Owner
flume impression 0 2195314576 2707268737 511954161 none
flume impression 1 2200432543 2712319730 511887187 none
flume impression 2 2188578827 2700580138 512001311 none
flume impression 3 2192263104 2704228698 511965594 none

Even on the Kafka Tool on the consumers list I dont see the spark-kafka-consumer.
Can you please let us know how we can get these utilities working for the spark kafka consumer

How many KafkaReceiver do I need?

I am trying to read from a single topic, which has 100 partitions.

The description of Kafka-Spark-Consumer mentions:

The logic will detect number of partitions for a topic and spawn that many Kafka Receivers.

But the example seems to suggest that the number of partitions has to be manually given, and that for each of them, a KafkaReceiver instance is required:

val kafkaStreams = (1 to partitions).map { i=>
    ssc.receiverStream(new KafkaReceiver(props, i))
}

Do I actually need to manually spawn 100 KafkaReceiver instances? Does it mean that 100 CPU cores have to be reserved to this?

It there a way to have receivers manage multiple partitions instead of having to spawn that many receivers?

Thanks in advance for your help.

Python api?

Just wanted to check if there is any way to use this package in pyspark :)

AbstractMethodError with Spark 1.6.0 and Kafka 0.10.2

I'm trying to use this library with older versions of Spark (1.6.0-cdh5.11.1) and Kafka (0.10.2-kafka-2.2.0), but while trying to persist the offsets after the application logic happened I get the mentioned error.

It seems to me that it is a version miss match between Scala versions. For me its not easy to switch to 2.11 scala so I guess my question would be: Is there a way to make your library work with my versions?

Below is the observed exception and the important bits of my pom file:

java.lang.AbstractMethodError: consumer.kafka.PartitionOffsetPair.call(Ljava/lang/Object;)Ljava/lang/Iterable;
	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$4$1.apply(JavaDStreamLike.scala:205)
	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$4$1.apply(JavaDStreamLike.scala:205)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/11/17 12:02:52 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.AbstractMethodError: consumer.kafka.PartitionOffsetPair.call(Ljava/lang/Object;)Ljava/lang/Iterable;
	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$4$1.apply(JavaDStreamLike.scala:205)
	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$4$1.apply(JavaDStreamLike.scala:205)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/11/17 12:02:52 INFO storage.DiskBlockManager: Shutdown hook called
               <dependency>
			<groupId>org.scala-lang</groupId>
			<artifactId>scala-library</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_2.10</artifactId>
			<exclusions>
				<exclusion>
					<groupId>org.scala-lang</groupId>
					<artifactId>scala-library</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming_2.10</artifactId>
			<exclusion>
				<artifactId>org.apache.kafka</artifactId>
				<groupId>kafka_2.10</groupId>
			</exclusion>
			<exclusion>
				<groupId>org.scala-lang</groupId>
				<artifactId>scala-library</artifactId>
			</exclusion>
		</dependency>
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase-spark</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.10</artifactId>
			<exclusions>
				<exclusion>
					<groupId>org.apache.zookeeper</groupId>
					<artifactId>zookeeper</artifactId>
				</exclusion>
				<exclusion>
					<groupId>log4j</groupId>
					<artifactId>log4j</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>dibbhatt</groupId>
			<artifactId>kafka-spark-consumer</artifactId>
			<version>1.0.12</version>
		</dependency>

Build Failure

I cloned the latest version and ran mvn install, but the build fails; a few test cases seem to be failing.
Here's the mvn stack:

[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 14.397s
[INFO] Finished at: Wed Feb 11 11:39:19 UTC 2015
[INFO] Final Memory: 27M/290M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.10:test (default-test) on project kafka-spark-consumer: There are test failures.
[ERROR] 
[ERROR] Please refer to /root/akhld/kafka-spark-consumer/target/surefire-reports for the individual test results.
[ERROR] -> [Help 1]
[ERROR] 
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR] 
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException

And part of the /root/akhld/kafka-spark-consumer/target/surefire-reports/TEST-consumer.kafka.KafkaServerTest.xml and /root/akhld/kafka-spark-consumer/target/surefire-reports/TEST-consumer.kafka.ZKServerTest.xml files say:

<testcase time="1.112" classname="consumer.kafka.KafkaServerTest" name="sendMessageAndAssertValueForOffset">
    <error message="org.apache.zookeeper.ZooKeeper.&lt;init&gt;(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V" type="java.lang.NoSuchMethodError">java.lang.NoSuchMethodError: org.apache.zookeeper.ZooKeeper.&lt;init&gt;(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V
    at org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29)
    at org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:169)
    at org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94)
    at org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55)
    at org.apache.curator.ConnectionState.reset(ConnectionState.java:219)
    at org.apache.curator.ConnectionState.start(ConnectionState.java:103)
    at org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:188)
    at org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:234)
    at consumer.kafka.ZkState.&lt;init&gt;(ZkState.java:67)
    at consumer.kafka.KafkaServerTest.setUp(KafkaServerTest.java:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.apache.maven.surefire.junit4.JUnit4TestSet.execute(JUnit4TestSet.java:53)
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:123)
    at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:104)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:164)
    at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:110)
    at org.apache.maven.surefire.booter.SurefireStarter.invokeProvider(SurefireStarter.java:175)
    at org.apache.maven.surefire.booter.SurefireStarter.runSuitesInProcessWhenForked(SurefireStarter.java:107)
    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:68)
</error>

This is my mvn version:

Apache Maven 3.0.4
Maven home: /usr/share/maven
Java version: 1.7.0_75, vendor: Oracle Corporation
Java home: /usr/lib/jvm/java-7-openjdk-amd64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.16.0-0.bpo.4-amd64", arch: "amd64", family: "unix"

MEMORY_ONLY, MEMORY_ONLY_* storage levels

Hi,

Since the very first version of kafka-spark-consumer, the only storage levels I have been able to use are the disk-based ones such as MEMORY_AND_DISK_SER.

Unfortunately, this causes a lot of disk operations on the hosts running the Kafka receivers, and the disks then become a bottleneck. Disk I/O only happens on these hosts (the ones running the receivers); I don't know whether this is the expected behavior.

When using MEMORY_ONLY or MEMORY_ONLY_SER, I quickly get a slew of exceptions:

15/05/06 16:25:11 WARN TaskSetManager: Lost task 69.0 in stage 3.0 (TID 141, node-00263.hadoop): java.lang.Exception:
Could not compute split, block input-1-1430922252737 not found
        at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
        at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
        at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

MEMORY_ONLY_2 and MEMORY_ONLY_SER_2 work amazingly well for a few minutes and eventually fail the same way.

Your expertise would be more than welcome.
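
A hedged note on what this usually means (not a confirmed diagnosis for this setup): with MEMORY_ONLY, input blocks that no longer fit in executor memory are dropped rather than spilled, so a batch scheduled slightly later can no longer find them, which surfaces exactly as "Could not compute split, block input-... not found"; the _2 variants only delay the problem because the replica is dropped under the same pressure. A serialized and/or disk-backed level passed to ReceiverLauncher.launch avoids the drop. The sketch below only illustrates the storage-level argument (the launch signature follows the Scala usage shown later in this thread); the empty Properties object is a placeholder for your real ZooKeeper/topic settings.

import java.util.Properties
import consumer.kafka.ReceiverLauncher
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("KafkaReceiverStorageLevel"), Seconds(30))
val props = new Properties() // placeholder: fill in the consumer's ZooKeeper/topic properties

// MEMORY_AND_DISK_SER spills blocks that no longer fit in memory instead of dropping them;
// MEMORY_ONLY_SER at least shrinks the heap footprint if disk really must be avoided.
val stream = ReceiverLauncher.launch(ssc, props, 1, StorageLevel.MEMORY_AND_DISK_SER)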

Kafka Headers Support

I'm aware that this consumer supports 0.11 brokers. Does it provide access to the Kafka message headers API that was added in the 0.11 release?

Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

Dear all ,

I have created a JIRA to track the progress of contributing back this project to Apache Spark.

https://issues.apache.org/jira/browse/SPARK-11045

This project is presently in spark-packages, and I believe this is the right time to contribute it to the Apache Spark project and give the larger community better options around Kafka connectivity for Spark Streaming.

Those who are using this package, kindly vote for this JIRA.

Release on Spark Packages Repo

Hi,
Would you like to make a release of this on the Spark Packages Repository? This will allow users to easily include this package in their Spark Applications simply by adding the flag:
--packages dibbhatt/kafka-spark-consumer:0.1
to spark-shell, spark-submit, or even pyspark.

For this, you need to upload a "Release Artifact". You can make the release directly from the command line using the spark-package command tool, with the command spark-package publish. Please refer to the README. Or you can go through the release process on the webpage. Since your project contains Java code, you will need to build your jar beforehand using Maven.

Let me know if you have any questions or issues!

Best,
Burak

Kafka Apache Spark Streaming receives 0 data

I have launched my code below:

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

import com.google.common.collect.Lists;

public final class JavaKafkaWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    private JavaKafkaWordCount() {
    }

    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount")
                .set("spark.master", "local[4]");
        // Create the context with a 1 second batch size
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
                new Duration(2000));

        // int numThreads = Integer.parseInt(args[3]);
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        // String[] topics = args[2].split(",");
        // for (String topic : topics) {
        topicMap.put("page_visits", 1);
        // }

        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils
                .createStream(jssc, "127.0.0.1", "0", topicMap);

        JavaDStream<String> lines = messages
                .map(new Function<Tuple2<String, String>, String>() {
                    @Override
                    public String call(Tuple2<String, String> tuple2) {
                        return tuple2._2();
                    }
                });

        JavaDStream<String> words = lines
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public Iterable<String> call(String x) {
                        return Lists.newArrayList(SPACE.split(x));
                    }
                });

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        wordCounts.print();
        jssc.start();
        jssc.awaitTermination();
    }
}

But no data is received; the output is only:

-------------------------------------------
Time: 1434192136000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192138000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192140000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192142000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192144000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192146000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192148000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192150000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192152000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192154000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192156000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192158000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192160000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192162000 ms
-------------------------------------------

Any solution please!
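
One thing worth double-checking here (a sketch under assumptions, not a confirmed diagnosis): KafkaUtils.createStream expects the ZooKeeper quorum as host:port (2181 by default) and a real consumer group id, whereas the snippet above passes "127.0.0.1" without a port and "0" as the group. A minimal Scala equivalent with those two arguments adjusted (the group name "page-visits-group" is just an example):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(
  new SparkConf().setAppName("KafkaWordCountCheck").setMaster("local[4]"), Seconds(2))

// zkQuorum must include the ZooKeeper port; the group id should be a stable, non-trivial name.
val lines = KafkaUtils.createStream(ssc, "127.0.0.1:2181", "page-visits-group", Map("page_visits" -> 1))
  .map(_._2)

lines.print()
ssc.start()
ssc.awaitTermination()

If this still prints empty batches, verify with the kafka-console-consumer tool that new messages are actually arriving on the topic while the job runs.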

Released latest version 1.0.8 of the Receiver

Released latest version 1.0.8 of Receiver based Kafka Consumer for Spark Streaming.

The Receiver is compatible with Kafka versions 0.8.x, 0.9.x and 0.10.x, and all Spark versions

Available at Spark Packages : https://spark-packages.org/package/dibbhatt/kafka-spark-consumer

Salient Features :

  • End to End No Data Loss without Write Ahead Log
  • ZK Based offset management for both consumed and processed offset
  • No dependency on WAL and Checkpoint for recovery on Driver failure
  • In-built PID Controller for Rate Limiting and Backpressure management
  • Custom Message Interceptor

Please refer to https://github.com/dibbhatt/kafka-spark-consumer/blob/master/README.md for more details
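
For anyone landing on this announcement without reading the README first, here is a minimal Scala driver sketch assembled from the usages that appear elsewhere in this thread (ReceiverLauncher.launch and getPayload); the property keys and values below are assumptions based on the project README of that era, so verify them against the README for the version you actually use.

import java.util.Properties
import consumer.kafka.ReceiverLauncher
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("LowLevelKafkaConsumer"), Seconds(30))

val props = new Properties()
props.put("zookeeper.hosts", "zkhost")                     // assumed key: ZooKeeper host(s) of the Kafka cluster
props.put("zookeeper.port", "2181")
props.put("kafka.topic", "mytopic")
props.put("kafka.consumer.id", "my-consumer-id")
props.put("zookeeper.consumer.connection", "zkhost:2181")  // assumed key: where consumed/processed offsets are stored

val numberOfReceivers = 1
val stream = ReceiverLauncher.launch(ssc, props, numberOfReceivers, StorageLevel.MEMORY_AND_DISK_SER)

// Each record exposes the raw Kafka payload via getPayload, as in the examples further down this page.
stream.foreachRDD(rdd => rdd.map(m => new String(m.getPayload, "UTF-8")).take(5).foreach(println))

ssc.start()
ssc.awaitTermination()

Committing the processed offsets back to ZooKeeper after each batch goes through ProcessedOffsetManager, whose source is pasted later in this thread.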

Exception: Could not compute split, block not found

I currently experience the above exception. It seems to happen when receiving data from multiple receivers and unioning the resulting streams.

The exception always looks like this:
17/11/20 18:52:59 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 19341
17/11/20 18:52:59 INFO executor.Executor: Running task 37.0 in stage 131.0 (TID 19341)
17/11/20 18:52:59 ERROR executor.Executor: Exception in task 37.0 in stage 131.0 (TID 19341)
java.lang.Exception: Could not compute split, block input-0-1511200296800 not found
  at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
  at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
17/11/20 18:53:00 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 19343
17/11/20 18:53:00 INFO executor.Executor: Running task 38.0 in stage 131.0 (TID 19343)
17/11/20 18:53:00 ERROR executor.Executor: Exception in task 38.0 in stage 131.0 (TID 19343)
java.lang.Exception: Could not compute split, block input-0-1511200297800 not found
  at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
  at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

The first time it happened after 45 minutes. The second time it happened after 2 minutes. First run used 4 receivers and 12 executors. Second run used 2 receivers and 8 executors.

Have you seen this problem before?

exception on simple example

 WARN [run-main-0] NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[error] (run-main-0) java.lang.NoClassDefFoundError: kafka/api/OffsetRequest
java.lang.NoClassDefFoundError: kafka/api/OffsetRequest
    at consumer.kafka.KafkaConfig.<init>(KafkaConfig.java:38)
    at consumer.kafka.ReceiverLauncher.createStream(ReceiverLauncher.java:88)
    at consumer.kafka.ReceiverLauncher.launch(ReceiverLauncher.java:66)
    at it.dtk.KafkaConsumerTest$.main(KafkaConsumerTest.scala:48)
    at it.dtk.KafkaConsumerTest.main(KafkaConsumerTest.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
Caused by: java.lang.ClassNotFoundException: kafka.api.OffsetRequest
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at consumer.kafka.KafkaConfig.<init>(KafkaConfig.java:38)
    at consumer.kafka.ReceiverLauncher.createStream(ReceiverLauncher.java:88)
    at consumer.kafka.ReceiverLauncher.launch(ReceiverLauncher.java:66)
    at it.dtk.KafkaConsumerTest$.main(KafkaConsumerTest.scala:48)
    at it.dtk.KafkaConsumerTest.main(KafkaConsumerTest.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)

Unable to pull Batch info metrics using StreamingListener Interface

I have a job listener extending the StreamingListener interface to perform some tasks onBatchCompleted. It works pretty well with native Spark Streaming and KafkaUtils, but it gives wrong values when I use it with this low-level consumer.

Here's the Listener class:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

private class JobListener(ssc: StreamingContext) extends StreamingListener {

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {

    var totalProcessedRecords = 0L
    println("====> Total delay:  " + batchCompleted.batchInfo.totalDelay.getOrElse(-1) + " ms")

    batchCompleted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
      totalProcessedRecords += infos.map(_.numRecords).sum
    }
    println("\n=====> Received Events: " + totalProcessedRecords)

  }
}

You can attach it to your ssc as:

val listen = new JobListener(ssc)
ssc.addStreamingListener(listen)

ssc.start()
ssc.awaitTermination()

Let me know if there's some other way to pull batch info
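
One alternative worth trying (a sketch, assuming a Spark version whose BatchInfo exposes streamIdToInputInfo; newer releases no longer expose receivedBlockInfo): derive the count from the per-stream input info instead.

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class BatchInfoListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    // Sum the record counts reported for this batch, one entry per input stream / receiver.
    val received = info.streamIdToInputInfo.values.map(_.numRecords).sum
    println(s"====> Total delay: ${info.totalDelay.getOrElse(-1L)} ms, received events: $received")
  }
}

Attach it with ssc.addStreamingListener(new BatchInfoListener) exactly as above. If the numbers still look wrong, comparing them against the "Consumed offset ... written to ZK" lines logged by the receiver is a reasonable cross-check.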

ReceiverLauncher.launch cannot get data

Below is the application log. In the log I can see that the Spark DStream keeps consuming from Kafka, but nothing else happens.

My code:
val tmp_stream = ReceiverLauncher.launch(ssc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY)
tmp_stream.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    val today = DateTime.now
    val nowDay = today.toString("yyyy-MM-dd")
    val lastDay = today.minusDays(1).toString("yyyy-MM-dd")
    val nowDayType = today.toString("yyyy.MM.dd")
    val lastDayType = today.minusDays(1).toString("yyyy.MM.dd")
    println(nowDay)
    sqlContext.createDataFrame(rdd.map(x => new String(x.getPayload, "UTF-8"))
      .flatMap(checkClick), affiliate_data_schema).where(s"dt = '${nowDay}'")
      .saveToEs(s"spark-aflt-all-record-${nowDayType}/record", esconf)
    sqlContext.createDataFrame(rdd.map(x => new String(x.getPayload, "UTF-8"))
      .flatMap(checkClick), affiliate_data_schema).where(s"dt = '${lastDay}'")
      .saveToEs(s"spark-aflt-all-record-${lastDayType}/record", esconf)
  } else {
    println("rdd is empty")
  }
})

  • 17/02/28 21:37:06 INFO Version: Elasticsearch Hadoop v5.0.0-alpha5 [c513369b06]
  • 17/02/28 21:37:07 INFO EsDataFrameWriter: Writing to [spark-aflt-all-record-2017.02.28/record]
  • 17/02/28 21:37:07 INFO EsDataFrameWriter: Writing to [spark-aflt-all-record-2017.02.28/record]

17/02/28 21:37:02 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 40.3 KB, free 1618.0 MB)
17/02/28 21:37:02 INFO PartitionManager: Consumed offset 2438034718 for Partition 1 written to ZK
17/02/28 21:37:03 INFO MemoryStore: Block input-1-1488346622800 stored as values in memory (estimated size 1055.5 KB, free 1617.0 MB)
17/02/28 21:37:03 INFO BlockGenerator: Pushed block input-1-1488346622800
17/02/28 21:37:03 INFO BlockManager: Found block input-1-1488346618800 locally
17/02/28 21:37:03 INFO BlockManager: Found block input-1-1488346619000 locally
17/02/28 21:37:03 INFO PartitionManager: Consumed offset 2438036464 for Partition 1 written to ZK
17/02/28 21:37:04 INFO MemoryStore: Block input-1-1488346623800 stored as values in memory (estimated size 1035.7 KB, free 1616.0 MB)
17/02/28 21:37:04 INFO BlockGenerator: Pushed block input-1-1488346623800
17/02/28 21:37:04 INFO PartitionManager: Consumed offset 2438036893 for Partition 1 written to ZK
17/02/28 21:37:05 INFO MemoryStore: Block input-1-1488346624800 stored as values in memory (estimated size 248.7 KB, free 1615.7 MB)
17/02/28 21:37:05 INFO BlockGenerator: Pushed block input-1-1488346624800
17/02/28 21:37:05 INFO CodeGenerator: Code generated in 1382.889398 ms
17/02/28 21:37:05 INFO CodeGenerator: Code generated in 289.775591 ms
17/02/28 21:37:05 INFO PartitionManager: Consumed offset 2438037051 for Partition 1 written to ZK
17/02/28 21:37:06 INFO MemoryStore: Block input-1-1488346625800 stored as values in memory (estimated size 99.6 KB, free 1615.6 MB)
17/02/28 21:37:06 INFO BlockGenerator: Pushed block input-1-1488346625800
17/02/28 21:37:06 INFO CodeGenerator: Code generated in 341.839913 ms
17/02/28 21:37:06 INFO Version: Elasticsearch Hadoop v5.0.0-alpha5 [c513369b06]
17/02/28 21:37:07 INFO EsDataFrameWriter: Writing to [spark-aflt-all-record-2017.02.28/record]
17/02/28 21:37:07 INFO EsDataFrameWriter: Writing to [spark-aflt-all-record-2017.02.28/record]or: Pushed block input-1-1488346628800
17/02/28 21:37:10 INFO PartitionManager: Consumed offset 2438037474 for Partition 1 written to ZK
17/02/28 21:37:11 INFO MemoryStore: Block input-1-1488346630800 stored as values in memory (estimated size 88.7 KB, free 1615.4 MB)
17/02/28 21:37:11 INFO BlockGenerator: Pushed block input-1-1488346630800
17/02/28 21:37:11 INFO PartitionManager: Consumed offset 2438037671 for Partition 1 written to ZK
17/02/28 21:37:12 INFO MemoryStore: Block input-1-1488346631800 stored as values in memory (estimated size 121.1 KB, free 1615.3 MB)
17/02/28 21:37:12 INFO BlockGenerator: Pushed block input-1-1488346631800
17/02/28 21:37:12 INFO PartitionManager: Consumed offset 2438037859 for Partition 1 written to ZK
17/02/28 21:37:13 INFO MemoryStore: Block input-1-1488346632800 stored as values in memory (estimated size 115.8 KB, free 1615.1 MB)
17/02/28 21:37:13 INFO BlockGenerator: Pushed block input-1-1488346632800
17/02/28 21:37:14 INFO CodeGenerator: Code generated in 909.674488 ms
17/02/28 21:37:15 INFO PartitionManager: Consumed offset 2438038010 for Partition 1 written to ZK
17/02/28 21:37:16 INFO MemoryStore: Block input-1-1488346635800 stored as values in memory (estimated size 95.0 KB, free 1615.0 MB)
17/02/28 21:37:16 INFO BlockGenerator: Pushed block input-1-1488346635800
17/02/28 21:37:16 INFO Executor: Finished task 3.0 in stage 6.0 (TID 79). 1366 bytes result sent to driver
17/02/28 21:37:16 INFO PartitionManager: Consumed offset 2438038227 for Partition 1 written to ZK
17/02/28 21:37:17 INFO MemoryStore: Block input-1-1488346636800 stored as values in memory (estimated size 136.0 KB, free 1614.9 MB)
17/02/28 21:37:17 INFO BlockGenerator: Pushed block input-1-1488346636800
17/02/28 21:37:17 INFO Executor: Finished task 2.0 in stage 6.0 (TID 77). 2165 bytes result sent to driver
17/02/28 21:37:18 INFO PartitionManager: Consumed offset 2438038614 for Partition 1 written to ZK
17/02/28 21:37:19 INFO MemoryStore: Block input-1-1488346638800 stored as values in memory (estimated size 240.8 KB, free 1614.7 MB)
17/02/28 21:37:19 INFO BlockGenerator: Pushed block input-1-1488346638800
17/02/28 21:37:21 INFO PartitionManager: Consumed offset 2438038949 for Partition 1 written to ZK
17/02/28 21:37:21 INFO MemoryStore: Block input-1-1488346641000 stored as values in memory (estimated size 209.7 KB, free 1614.5 MB)
17/02/28 21:37:21 INFO BlockGenerator: Pushed block input-1-1488346641000
17/02/28 21:37:22 INFO PartitionManager: Consumed offset 2438039279 for Partition 1 written to ZK
17/02/28 21:37:22 INFO MemoryStore: Block input-1-1488346642000 stored as values in memory (estimated size 207.3 KB, free 1614.3 MB)
17/02/28 21:37:22 INFO BlockGenerator: Pushed block input-1-1488346642000
17/02/28 21:37:24 INFO PartitionManager: Consumed offset 2438039451 for Partition 1 written to ZK
17/02/28 21:37:24 INFO MemoryStore: Block input-1-1488346644000 stored as values in memory (estimated size 106.9 KB, free 1614.2 MB)
17/02/28 21:37:24 INFO BlockGenerator: Pushed block input-1-1488346644000
17/02/28 21:37:26 INFO PartitionManager: Consumed offset 2438039633 for Partition 1 written to ZK
17/02/28 21:37:26 INFO MemoryStore: Block input-1-1488346646000 stored as values in memory (estimated size 113.0 KB, free 1614.1 MB)
17/02/28 21:37:26 INFO BlockGenerator: Pushed block input-1-1488346646000
17/02/28 21:37:27 INFO PartitionManager: Consumed offset 2438039793 for Partition 1 written to ZK
17/02/28 21:37:27 INFO MemoryStore: Block input-1-1488346647000 stored as values in memory (estimated size 98.4 KB, free 1614.0 MB)
17/02/28 21:37:27 INFO BlockGenerator: Pushed block input-1-1488346647000
17/02/28 21:37:31 INFO PartitionManager: Consumed offset 2438040131 for Partition 1 written to ZK
17/02/28 21:37:31 INFO MemoryStore: Block input-1-1488346651000 stored as values in memory (estimated size 210.8 KB, free 1613.8 MB)
17/02/28 21:37:31 INFO BlockGenerator: Pushed block input-1-1488346651000
17/02/28 21:37:36 INFO PartitionManager: Consumed offset 2438040293 for Partition 1 written to ZK
17/02/28 21:37:36 INFO MemoryStore: Block input-1-1488346656000 stored as values in memory (estimated size 101.0 KB, free 1613.7 MB)
17/02/28 21:37:36 INFO BlockGenerator: Pushed block input-1-1488346656000
17/02/28 21:37:39 INFO PartitionManager: Consumed offset 2438040484 for Partition 1 written to ZK
17/02/28 21:37:39 INFO MemoryStore: Block input-1-1488346659000 stored as values in memory (estimated size 121.8 KB, free 1613.5 MB)
17/02/28 21:37:39 INFO BlockGenerator: Pushed block input-1-1488346659000
17/02/28 21:37:41 INFO PartitionManager: Consumed offset 2438040614 for Partition 1 written to ZK
17/02/28 21:37:41 INFO MemoryStore: Block input-1-1488346661000 stored as values in memory (estimated size 81.1 KB, free 1613.5 MB)
17/02/28 21:37:41 INFO BlockGenerator: Pushed block input-1-1488346661000
17/02/28 21:37:42 INFO PartitionManager: Consumed offset 2438040749 for Partition 1 written to ZK
17/02/28 21:37:42 INFO MemoryStore: Block input-1-1488346662000 stored as values in memory (estimated size 83.3 KB, free 1613.4 MB)
17/02/28 21:37:42 INFO BlockGenerator: Pushed block input-1-1488346662000
17/02/28 21:37:43 INFO PartitionManager: Consumed offset 2438041118 for Partition 1 written to ZK
17/02/28 21:37:43 INFO MemoryStore: Block input-1-1488346663000 stored as values in memory (estimated size 228.7 KB, free 1613.2 MB)
17/02/28 21:37:43 INFO BlockGenerator: Pushed block input-1-1488346663000
17/02/28 21:37:47 INFO PartitionManager: Consumed offset 2438041299 for Partition 1 written to ZK
17/02/28 21:37:47 INFO MemoryStore: Block input-1-1488346667000 stored as values in memory (estimated size 114.0 KB, free 1613.0 MB)
17/02/28 21:37:47 INFO BlockGenerator: Pushed block input-1-1488346667000
17/02/28 21:37:49 INFO PartitionManager: Consumed offset 2438041670 for Partition 1 written to ZK
17/02/28 21:37:49 INFO MemoryStore: Block input-1-1488346669000 stored as values in memory (estimated size 231.0 KB, free 1612.8 MB)
17/02/28 21:37:49 INFO BlockGenerator: Pushed block input-1-1488346669000
17/02/28 21:37:51 INFO PartitionManager: Consumed offset 2438041997 for Partition 1 written to ZK
17/02/28 21:37:51 INFO MemoryStore: Block input-1-1488346671000 stored as values in memory (estimated size 206.4 KB, free 1612.6 MB)
17/02/28 21:37:51 INFO BlockGenerator: Pushed block input-1-1488346671000
17/02/28 21:37:53 INFO PartitionManager: Consumed offset 2438042157 for Partition 1 written to ZK
17/02/28 21:37:53 INFO MemoryStore: Block input-1-1488346673000 stored as values in memory (estimated size 102.0 KB, free 1612.5 MB)
17/02/28 21:37:53 INFO BlockGenerator: Pushed block input-1-1488346673000
17/02/28 21:37:56 INFO PartitionManager: Consumed offset 2438042349 for Partition 1 written to ZK
17/02/28 21:37:56 INFO MemoryStore: Block input-1-1488346676000 stored as values in memory (estimated size 116.8 KB, free 1612.4 MB)
17/02/28 21:37:56 INFO BlockGenerator: Pushed block input-1-1488346676000
17/02/28 21:37:57 INFO PartitionManager: Consumed offset 2438042826 for Partition 1 written to ZK
17/02/28 21:37:57 INFO MemoryStore: Block input-1-1488346677000 stored as values in memory (estimated size 251.6 KB, free 1612.2 MB)
17/02/28 21:37:57 INFO BlockGenerator: Pushed block input-1-1488346677000
17/02/28 21:37:58 INFO PartitionManager: Consumed offset 2438043001 for Partition 1 written to ZK
17/02/28 21:37:58 INFO MemoryStore: Block input-1-1488346678000 stored as values in memory (estimated size 109.5 KB, free 1612.0 MB)
17/02/28 21:37:58 INFO BlockGenerator: Pushed block input-1-1488346678000
17/02/28 21:38:01 INFO PartitionManager: Consumed offset 2438043002 for Partition 1 written to ZK
17/02/28 21:38:01 INFO MemoryStore: Block input-1-1488346681000 stored as values in memory (estimated size 968.0 B, free 1612.0 MB)
17/02/28 21:38:01 INFO BlockGenerator: Pushed block input-1-1488346681000
17/02/28 21:38:02 INFO PartitionManager: Consumed offset 2438043003 for Partition 1 written to ZK
17/02/28 21:38:02 INFO MemoryStore: Block input-1-1488346682000 stored as values in memory (estimated size 1232.0 B, free 1612.0 MB)
17/02/28 21:38:02 INFO BlockGenerator: Pushed block input-1-1488346682000
17/02/28 21:38:03 INFO PartitionManager: Consumed offset 2438043004 for Partition 1 written to ZK
17/02/28 21:38:03 INFO MemoryStore: Block input-1-1488346683000 stored as values in memory (estimated size 888.0 B, free 1612.0 MB)
17/02/28 21:38:03 INFO BlockGenerator: Pushed block input-1-1488346683000
17/02/28 21:38:04 INFO PartitionManager: Consumed offset 2438043005 for Partition 1 written to ZK
17/02/28 21:38:04 INFO MemoryStore: Block input-1-1488346684000 stored as values in memory (estimated size 1072.0 B, free 1612.0 MB)
17/02/28 21:38:04 INFO BlockGenerator: Pushed block input-1-1488346684000
17/02/28 21:38:05 INFO PartitionManager: Consumed offset 2438043006 for Partition 1 written to ZK
17/02/28 21:38:05 INFO MemoryStore: Block input-1-1488346685000 stored as values in memory (estimated size 1232.0 B, free 1612.0 MB)
17/02/28 21:38:05 INFO BlockGenerator: Pushed block input-1-1488346685000
17/02/28 21:38:06 INFO PartitionManager: Consumed offset 2438043007 for Partition 1 written to ZK
17/02/28 21:38:06 INFO MemoryStore: Block input-1-1488346686000 stored as values in memory (estimated size 1016.0 B, free 1612.0 MB)
17/02/28 21:38:06 INFO BlockGenerator: Pushed block input-1-1488346686000
17/02/28 21:38:07 INFO PartitionManager: Consumed offset 2438043008 for Partition 1 written to ZK
17/02/28 21:38:07 INFO MemoryStore: Block input-1-1488346687000 stored as values in memory (estimated size 1032.0 B, free 1612.0 MB)
17/02/28 21:38:07 INFO BlockGenerator: Pushed block input-1-1488346687000
17/02/28 21:38:08 INFO PartitionManager: Consumed offset 2438043009 for Partition 1 written to ZK
17/02/28 21:38:08 INFO MemoryStore: Block input-1-1488346688000 stored as values in memory (estimated size 1016.0 B, free 1612.0 MB)
17/02/28 21:38:08 INFO BlockGenerator: Pushed block input-1-1488346688000
17/02/28 21:38:09 INFO PartitionManager: Consumed offset 2438043011 for Partition 1 written to ZK
17/02/28 21:38:09 INFO MemoryStore: Block input-1-1488346689000 stored as values in memory (estimated size 1384.0 B, free 1612.0 MB)
17/02/28 21:38:09 INFO BlockGenerator: Pushed block input-1-1488346689000
17/02/28 21:38:10 INFO PartitionManager: Consumed offset 2438043012 for Partition 1 written to ZK
17/02/28 21:38:10 INFO MemoryStore: Block input-1-1488346690000 stored as values in memory (estimated size 912.0 B, free 1612.0 MB)
17/02/28 21:38:10 INFO BlockGenerator: Pushed block input-1-1488346690000
17/02/28 21:38:11 INFO PartitionManager: Consumed offset 2438043013 for Partition 1 written to ZK

Single message to kafka not processed in some cases

I sent a single message to a Kafka topic with 3 partitions. It landed in partition 0. It looks like there is a case in PartitionManager where _lastEnquedOffset == _lastComittedOffset while a single message sits in _dataBuffer, so this check:

        if ((_lastEnquedOffset > _lastComittedOffset)
                && (_waitingToEmit.isEmpty())) {

basically waits for another message to arrive on this partition before flushing out the last message. I had to change the condition to

        if ((_lastEnquedOffset >= _lastComittedOffset)
                && (_waitingToEmit.isEmpty())) {

to avoid this issue.
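
A toy illustration of the boundary condition described above (hypothetical names, not the library's code): with exactly one buffered message, the enqueued and committed offsets are equal, so a strict '>' never fires and the message sits until the next one arrives.

// Toy model of the flush check; shouldFlush and its parameters are hypothetical.
def shouldFlush(lastEnqueuedOffset: Long, lastCommittedOffset: Long, waitingToEmitEmpty: Boolean): Boolean =
  lastEnqueuedOffset >= lastCommittedOffset && waitingToEmitEmpty // '>=' also covers the single-message case

// Single message: enqueued == committed, nothing left waiting to emit.
assert(shouldFlush(lastEnqueuedOffset = 100L, lastCommittedOffset = 100L, waitingToEmitEmpty = true))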

Running application on a 3-node cluster, 3 instances of the same application are shown with different users

I'm running this application on a 3-node cluster: 1 node as root and the other 2 as ec2-user. When I submit the application, the web UI shows 3 instances with the following users: 1 root and 2 ec2-user.
But the 2 instances running as ec2-user are in the waiting state.

I'm submitting the application from root machine.

Not able to figure out the reason for this.

One more issue that I'm facing:
Since the default job scheduling in standalone mode is FIFO, I've reduced the cores for each application. This works fine, but if I try to run more than that, let's say 5 applications, the others go into the waiting state.

Is there any other type of job scheduling I can use, so that all the applications can run in parallel with none of them waiting?
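
A hedged note on the scheduling question: the standalone master assigns resources to applications in submission order and has no fair scheduler across applications, so the usual approach for running, say, 5 applications concurrently is to cap each application's footprint (cores and executor memory) so that the cluster can hold all of them at once; YARN or Mesos offer queue-based fair/capacity scheduling if that is not enough. A sketch with placeholder numbers:

import org.apache.spark.SparkConf

// Size these from total cluster cores / memory divided by the number of concurrent applications.
val conf = new SparkConf()
  .setAppName("kafka-consumer-app")
  .set("spark.cores.max", "4")         // hard cap on the cores this application takes from the standalone cluster
  .set("spark.executor.memory", "2g")  // keep per-executor memory small enough for all applications to fit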

After some hours, processing stops because of executor failure errors

Hi Dibbhatt,

I am facing an issue when using this project for a longer duration.

ERROR TaskSchedulerImpl: Lost executor 0 on : Executor heartbeat timed out after 161334 ms
[Stage 2:> (0 + 4) / 4][Stage 1019:> (0 + 0) / 2]15/08/14 20:58:10 ERROR TaskSchedulerImpl: Lost executor 1 on : remote Rpc client disassociated
15/08/14 20:58:10 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 1
[Stage 2:> (0 + 4) / 4][Stage 1019:> (0 + 0) / 2]15/08/14 20:59:47 ERROR TaskSchedulerImpl: Lost an executor 1 (already removed): Executor heartbeat timed out after 124846 ms
[Stage 2:> (0 + 4) / 4][Stage 1019:> (0 + 0) / 2]15/08/14 21:08:47 ERROR TaskSchedulerImpl: Lost executor 0 on : Executor heartbeat timed out after 146607 ms
[Stage 2:> (0 + 4) / 4][Stage 1019:> (0 + 0) / 2]15/08/14 21:24:20 ERROR TaskSchedulerImpl: Lost executor 5 on : remote Rpc client disassociated
15/08/14 21:24:20 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 5
[Stage 2:> (0 + 4) / 4][Stage 1019:> (0 + 0) / 2]15/08/14 21:26:47 ERROR TaskSchedulerImpl: Lost an executor 5 (already removed): Executor heartbeat timed out after 165821 ms

I am using a 2-node cluster with 4 executors on each node, and 5 GB and 2 cores for each executor.

One more question that I have:

This consumer always resumes from the last saved offset. Is there a way to ignore that offset and receive only the real-time data?

Regards,
Sorabh

App client consumer doesn't consume offsets from Kafka topic

I'm submitting Java jobs to YARN using yarn-client mode on a single node.

The job has simple logic:

  • read a stream from a Kafka topic with Spark Streaming
  • apply some logic
  • persist the data to another Kafka topic

I send the jar file from the console to jobserver (which works as a proxy to YARN), and jobserver sends the job to YARN in yarn-client mode.

Everything is on the same host, but running as Docker containers:
YARN is inside one Docker container (the Cloudera image)
Jobserver is inside another Docker container

But I see weird behaviour with the Kafka consumer, because messages are sent and committed but not processed.

I tried KafkaUtils createStream and got the same issue, but not with KafkaUtils createDirectStream, which runs fine.

I need to control the offsets that are read, to know which one to start reading from, so I have to use this Kafka consumer...

I found the issue when I tried to check the Spark jobserver context... Inside the job logic, if I stop the current context and create a new one in Spark local mode, it works fine, but that's not the desired behaviour, because two contexts are created and the second one doesn't run on YARN...

I know it's a specific case, but perhaps you have seen a similar issue and could help me with it. I checked the log trace and there's no error consuming offsets; it shows that messages are committed but never read...

The current framework versions are:
kafka spark consumer 1.0.8
spark streaming 1.6.0
kafka 0.10.0 (tried also 0.8.2.1)
jobserver 0.6.1
jdk 1.8

Let me know if you need any other info.

Thanks

Unable to receive data

Hi Dibbhatt,

I am facing an issue after changing one of the ZooKeeper nodes. I've changed all the settings related to the ZooKeeper IPs, but I'm getting the following:

ERROR ReceiverTracker: Deregistered receiver for stream 1: Error starting receiver 1 - java.lang.RuntimeException: java.lang.RuntimeException: java.lang.IllegalStateException: instance must be started before calling this method
at consumer.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:73)
at consumer.kafka.ZkCoordinator.(ZkCoordinator.java:64)
at consumer.kafka.KafkaConsumer.open(KafkaConsumer.java:61)
at consumer.kafka.client.KafkaRangeReceiver.start(KafkaRangeReceiver.java:73)
at consumer.kafka.client.KafkaRangeReceiver.onStart(KafkaRangeReceiver.java:58)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:125)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:109)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:308)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:300)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.lang.IllegalStateException: instance must be started before calling this method
at consumer.kafka.DynamicBrokersReader.getLeaderFor(DynamicBrokersReader.java:117)
at consumer.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:62)
... 16 more
Caused by: java.lang.IllegalStateException: instance must be started before calling this method
at org.spark-project.guava.base.Preconditions.checkState(Preconditions.java:149)
at org.apache.curator.framework.imps.CuratorFrameworkImpl.getData(CuratorFrameworkImpl.java:360)
at consumer.kafka.DynamicBrokersReader.getLeaderFor(DynamicBrokersReader.java:110)
... 17 more

15/09/15 12:56:45 ERROR ReceiverTracker: Deregistered receiver for stream 1: Error starting receiver 1 - java.lang.RuntimeException: java.lang.RuntimeException: java.lang.IllegalStateException: instance must be started before calling this method
at consumer.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:73)
at consumer.kafka.ZkCoordinator.(ZkCoordinator.java:64)
at consumer.kafka.KafkaConsumer.open(KafkaConsumer.java:61)
at consumer.kafka.client.KafkaRangeReceiver.start(KafkaRangeReceiver.java:73)
at consumer.kafka.client.KafkaRangeReceiver.onStart(KafkaRangeReceiver.java:58)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:125)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:109)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:308)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:300)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.lang.IllegalStateException: instance must be started before calling this method
at consumer.kafka.DynamicBrokersReader.getLeaderFor(DynamicBrokersReader.java:117)
at consumer.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:62)
... 16 more
Caused by: java.lang.IllegalStateException: instance must be started before calling this method
at org.spark-project.guava.base.Preconditions.checkState(Preconditions.java:149)
at org.apache.curator.framework.imps.CuratorFrameworkImpl.getData(CuratorFrameworkImpl.java:360)
at consumer.kafka.DynamicBrokersReader.getLeaderFor(DynamicBrokersReader.java:110)
... 17 more

entered forEachRDD
[Stage 2:> (0 + 4) / 4][Stage 3:> (0 + 0) / 14]
15/09/15 12:57:24 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.RuntimeException: java.lang.RuntimeException: java.lang.IllegalStateException: instance must be started before calling this method
at consumer.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:73)
at consumer.kafka.ZkCoordinator.(ZkCoordinator.java:64)
at consumer.kafka.KafkaConsumer.open(KafkaConsumer.java:61)
at consumer.kafka.client.KafkaRangeReceiver.start(KafkaRangeReceiver.java:73)
at consumer.kafka.client.KafkaRangeReceiver.onStart(KafkaRangeReceiver.java:58)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:125)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:109)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:308)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:300)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.lang.IllegalStateException: instance must be started before calling this method
at consumer.kafka.DynamicBrokersReader.getLeaderFor(DynamicBrokersReader.java:117)
at consumer.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:62)
... 16 more
Caused by: java.lang.IllegalStateException: instance must be started before calling this method
at org.spark-project.guava.base.Preconditions.checkState(Preconditions.java:149)
at org.apache.curator.framework.imps.CuratorFrameworkImpl.getData(CuratorFrameworkImpl.java:360)
at consumer.kafka.DynamicBrokersReader.getLeaderFor(DynamicBrokersReader.java:110)
... 17 more

Regards,
Sorabh

No data received with Apache Kafka and Apache Spark

Hi, I'm using Apache Kafka and Apache Spark 1.3, and I am testing this code:
def main(args: Array[String]) {
  val ctx = new SparkContext("local[4]", "Spark Streamin")
  // Note: ssc was not defined in the original snippet; assuming a StreamingContext built from ctx.
  val ssc = new StreamingContext(ctx, Seconds(2))
  val kafkaParams = Map("metadata.broker.list" -> "127.0.1:2182")
  val topics = Set("page_visits")
  val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
  kafkaStream.foreachRDD(rdd => print())
  ssc.start()
  ssc.awaitTermination()
}
My topic works when I start a Java program to produce and consume data, but in streaming no data is received.
Help please!
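
A hedged observation on the snippet above (not a confirmed diagnosis): createDirectStream talks to the Kafka brokers directly, so metadata.broker.list must point at a broker host:port (9092 by default), not at a ZooKeeper-style port such as 2182. A corrected sketch, assuming a local broker on the default port:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setMaster("local[4]").setAppName("Spark Streaming")
val ssc = new StreamingContext(conf, Seconds(2))

// Broker list, not ZooKeeper: host:9092 unless the broker was configured differently.
val kafkaParams = Map("metadata.broker.list" -> "127.0.0.1:9092")
val topics = Set("page_visits")

val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
kafkaStream.foreachRDD(rdd => println(s"Got ${rdd.count()} records"))

ssc.start()
ssc.awaitTermination()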

Getting GC: OutOfMemory exception

Hi,

I am using this consumer with the following settings:

.set("spark.cleaner.ttl", "800")
.set("spark.executor.memory", "8g")
.set("spark.driver.memory", "8g")
.set("spark.driver.maxResultSize", "10g")

I need to do my aggregations on 10 minutes of data, so I am creating the JavaStreamingContext with a 10-minute batch interval (600000 milliseconds).

The data is around 5-7 lakh (500,000-700,000) records every 10 minutes.

The program is throwing a GC OutOfMemory exception, so I increased the JVM heap size to 16 GB.
But I still get the same error after 20-30 minutes.

And when I checked the memory consumption, this process is consuming around 57-58 percent of my memory. My machine's total memory is 30 GB.

Please let me know what could be the reason behind this.

Thanks,
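
A hedged tuning suggestion rather than a root-cause diagnosis: with a 10-minute batch interval, all of a batch's records sit in the executors as deserialized Java objects if the receiver stores blocks with a MEMORY_ONLY-style level, which is heavy on the heap and on GC. Storing received blocks serialized (and letting them spill to disk) plus Kryo usually reduces the pressure; the Properties object and receiver count below are placeholders.

import java.util.Properties
import consumer.kafka.ReceiverLauncher
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Minutes, StreamingContext}

val conf = new SparkConf()
  .setAppName("kafka-10min-aggregation")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // cheaper (de)serialization than Java

val ssc = new StreamingContext(conf, Minutes(10))
val props = new Properties() // placeholder: the consumer's ZooKeeper/topic properties

// Serialized, disk-backed input blocks keep a 10-minute window from living on the heap as plain objects.
val stream = ReceiverLauncher.launch(ssc, props, 1, StorageLevel.MEMORY_AND_DISK_SER)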

java.lang.ClassNotFoundException: consumer.kafka.client.KafkaReceiver

Hello~ @dibbhatt
I used this framework to integrate Kafka and Spark Streaming. But I have some problems.
My cluster is based on Spark 2.0.0, Kafka 0.10.1.0, and ZooKeeper 3.4.9.
I fixed some compile errors in "ProcessedOffsetManager.java" because of API differences between Spark 1.6.0 and Spark 2.0.0.

package consumer.kafka;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.dstream.DStream;

import scala.Tuple2;
import scala.reflect.ClassTag;

import com.google.common.collect.ImmutableMap;

public class ProcessedOffsetManager {

private static void persistProcessedOffsets(Properties props, Map<Integer, Long> partitionOffsetMap) {
ZkState state = new ZkState(props.getProperty(Config.ZOOKEEPER_CONSUMER_CONNECTION));
for(Map.Entry<Integer, Long> po : partitionOffsetMap.entrySet()) {
Map<Object, Object> data = (Map<Object, Object>) ImmutableMap
.builder()
.put("consumer",ImmutableMap.of("id",props.getProperty(Config.KAFKA_CONSUMER_ID)))
.put("offset", po.getValue())
.put("partition",po.getKey())
.put("broker",ImmutableMap.of("host", "", "port", ""))
.put("topic", props.getProperty(Config.KAFKA_TOPIC)).build();
String path = processedPath(po.getKey(), props);
try{
state.writeJSON(path, data);
}catch (Exception ex) {
state.close();
throw ex;
}
}
state.close();
}

private static String processedPath(int partition, Properties props) {
return props.getProperty(Config.ZOOKEEPER_CONSUMER_PATH)
+ "/" + props.getProperty(Config.KAFKA_CONSUMER_ID) + "/"
+ props.getProperty(Config.KAFKA_TOPIC)
+ "/processed/" + "partition_"+ partition;
}

public static JavaPairDStream<Integer, Iterable> getPartitionOffset(JavaDStream unionStreams) {

// JavaPairDStream<Integer, Long> partitonOffsetStream = unionStreams.mapPartitionsToPair
// (new PairFlatMapFunction<Iterator, Integer, Long>() {
// @OverRide
// public Iterable<Tuple2<Integer, Long>> call(Iterator entry) throws Exception {
// MessageAndMetadata mmeta = null;
// List<Tuple2<Integer, Long>> l = new ArrayList<Tuple2<Integer, Long>>();
// while(entry.hasNext()) {
// mmeta = entry.next();
// }
// if(mmeta != null) {
// l.add(new Tuple2<Integer, Long>(mmeta.getPartition().partition,mmeta.getOffset()));
// }
// return l;
// }
// });
JavaPairDStream<Integer, Long> partitonOffsetStream = unionStreams.mapPartitionsToPair
(new PairFlatMapFunction<Iterator, Integer, Long>() {

      @Override
      public Iterator<Tuple2<Integer, Long>> call(Iterator<MessageAndMetadata> entry) throws Exception {
        MessageAndMetadata mmeta = null;
        List<Tuple2<Integer, Long>> l = new ArrayList<Tuple2<Integer, Long>>();
        while(entry.hasNext()) {
          mmeta = entry.next();
        }
        if(mmeta != null) {
          l.add(new Tuple2<Integer, Long>(mmeta.getPartition().partition,mmeta.getOffset()));
        }
        return l.iterator();
      }
    });
JavaPairDStream<Integer, Iterable<Long>> partitonOffset = partitonOffsetStream.groupByKey(1);
return partitonOffset;

}

@SuppressWarnings("deprecation")
public static void persists(JavaPairDStream<Integer, Iterable> partitonOffset, Properties props) {
//spark 2.0.0
partitonOffset.foreachRDD(new VoidFunction<JavaPairRDD<Integer, Iterable>>() {
@Override
public void call(JavaPairRDD<Integer, Iterable> po) throws Exception {
List<Tuple2<Integer, Iterable>> poList = po.collect();
System.out.println("ProcessedOffsetManager persist list size = " + poList.size());
Map<Integer, Long> partitionOffsetMap = new HashMap<Integer, Long>();
for(Tuple2<Integer, Iterable> tuple : poList) {
int partition = tuple._1();
Long offset = getMaximum(tuple._2());
partitionOffsetMap.put(partition, offset);
}
persistProcessedOffsets(props, partitionOffsetMap);
}

  public <T extends Comparable<T>> T getMaximum(Iterable<T> values) {
    T max = null;
    for (T value : values) {
      if (max == null || max.compareTo(value) < 0) {
        max = value;
      }
    }
    return max;
  }
});

//spark 1.6.0

// partitonOffset.foreachRDD(new Function<JavaPairRDD<Integer,Iterable>, Void>() {
// @OverRide
// public Void call(JavaPairRDD<Integer, Iterable> po) throws Exception {
// List<Tuple2<Integer, Iterable>> poList = po.collect();
// Map<Integer, Long> partitionOffsetMap = new HashMap<Integer, Long>();
// for(Tuple2<Integer, Iterable> tuple : poList) {
// int partition = tuple._1();
// Long offset = getMaximum(tuple._2());
// partitionOffsetMap.put(partition, offset);
// }
// persistProcessedOffsets(props, partitionOffsetMap);
// return null;
// }
// public <T extends Comparable> T getMaximum(Iterable values) {
// T max = null;
// for (T value : values) {
// if (max == null || max.compareTo(value) < 0) {
// max = value;
// }
// }
// return max;
// }
// });
}

public static DStream<Tuple2<Integer, Iterable>> getPartitionOffset(DStream unionStreams) {
ClassTag messageMetaClassTag =
ScalaUtil.getClassTag(MessageAndMetadata.class);
JavaDStream javaDStream =
new JavaDStream(unionStreams, messageMetaClassTag);
// JavaPairDStream<Integer, Long> partitonOffsetStream = javaDStream.mapPartitionsToPair
// (new PairFlatMapFunction<Iterator, Integer, Long>() {
// @OverRide
// public Iterable<Tuple2<Integer, Long>> call(Iterator entry)
// throws Exception {
// MessageAndMetadata mmeta = null;
// List<Tuple2<Integer, Long>> l = new ArrayList<Tuple2<Integer, Long>>();
// while(entry.hasNext()) {
// mmeta = entry.next();
// }
// if(mmeta != null) {
// l.add(new Tuple2<Integer, Long>(mmeta.getPartition().partition,mmeta.getOffset()));
// }
// return l;
// }
// });
JavaPairDStream<Integer, Long> partitonOffsetStream = javaDStream.mapPartitionsToPair
(new PairFlatMapFunction<Iterator, Integer, Long>() {
@Override
public Iterator<Tuple2<Integer, Long>> call(Iterator entry) throws Exception {
MessageAndMetadata mmeta = null;
List<Tuple2<Integer, Long>> l = new ArrayList<Tuple2<Integer, Long>>();
while(entry.hasNext()) {
mmeta = entry.next();
}
if(mmeta != null) {
l.add(new Tuple2<Integer, Long>(mmeta.getPartition().partition,mmeta.getOffset()));
}
return l.iterator();
}
});
JavaPairDStream<Integer, Iterable> partitonOffset = partitonOffsetStream.groupByKey(1);
return partitonOffset.dstream();
}

@SuppressWarnings("deprecation")
public static void persists(DStream<Tuple2<Integer, Iterable>> partitonOffset, Properties props) {
ClassTag<Tuple2<Integer, Iterable>> tuple2ClassTag =
ScalaUtil.<Integer, Iterable>getTuple2ClassTag();
JavaDStream<Tuple2<Integer, Iterable>> jpartitonOffset =
new JavaDStream<Tuple2<Integer, Iterable>>(partitonOffset, tuple2ClassTag);

//spark 2.0.0
jpartitonOffset.foreachRDD(new VoidFunction<JavaRDD<Tuple2<Integer, Iterable<Long>>>>() {
  @Override
  public void call(JavaRDD<Tuple2<Integer, Iterable<Long>>> po) throws Exception {
    List<Tuple2<Integer, Iterable<Long>>> poList = po.collect();
    Map<Integer, Long> partitionOffsetMap = new HashMap<Integer, Long>();
    for(Tuple2<Integer, Iterable<Long>> tuple : poList) {
      int partition = tuple._1();
      Long offset = getMaximum(tuple._2());
      partitionOffsetMap.put(partition, offset);
    }
    persistProcessedOffsets(props, partitionOffsetMap);
  }
  public <T extends Comparable<T>> T getMaximum(Iterable<T> values) {
    T max = null;
    for (T value : values) {
      if (max == null || max.compareTo(value) < 0) {
        max = value;
      }
    }
    return max;
  }
});

//spark 1.6.0

// jpartitonOffset.foreachRDD(new Function<JavaRDD<Tuple2<Integer, Iterable>>, Void>() {
// @OverRide
// public Void call(JavaRDD<Tuple2<Integer, Iterable>> po) throws Exception {
// List<Tuple2<Integer, Iterable>> poList = po.collect();
// Map<Integer, Long> partitionOffsetMap = new HashMap<Integer, Long>();
// for(Tuple2<Integer, Iterable> tuple : poList) {
// int partition = tuple._1();
// Long offset = getMaximum(tuple._2());
// partitionOffsetMap.put(partition, offset);
// }
// persistProcessedOffsets(props, partitionOffsetMap);
// return null;
// }
// public <T extends Comparable> T getMaximum(Iterable values) {
// T max = null;
// for (T value : values) {
// if (max == null || max.compareTo(value) < 0) {
// max = value;
// }
// }
// return max;
// }
// });
}
}
`

I then build the project as a new jar. After that I run "SampleConsumer.java" with my settings. It works! But Spark has many failed jobs caused by "java.lang.ClassNotFoundException: consumer.kafka.client.KafkaReceiver", so it receives 0 data.

Error Logs:

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by: java.io.IOException: java.lang.ClassNotFoundException: consumer.kafka.client.KafkaReceiver at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1260) at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70) at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: consumer.kafka.client.KafkaReceiver at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1714) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:503) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:74) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253) ... 19 more
ERROR: org.apache.spark.scheduler.TaskSetManager - Task 0 in stage 19.0 failed 4 times; aborting job
ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Receiver has been stopped. Try to restart it.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 4 times, most recent failure: Lost task 0.3 in stage 19.0 (TID 141, 10.100.3.90): java.io.IOException: java.lang.ClassNotFoundException: consumer.kafka.client.KafkaReceiver at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1260) at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70) at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
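
A ClassNotFoundException thrown while deserializing task data on executors usually means the rebuilt consumer jar is only on the driver's classpath and never reaches the executors. Below is a minimal sketch of one way to ship it from the driver; the jar path is a placeholder, and passing the jar with spark-submit --jars would have the same effect.

`
import org.apache.spark.SparkConf;

public class ConfWithConsumerJar {
  public static SparkConf build() {
    return new SparkConf()
        .setAppName("kafka-spark-consumer-sample")
        // Distribute the custom-built consumer jar to every executor so
        // consumer.kafka.client.KafkaReceiver can be deserialized there.
        .setJars(new String[] { "/path/to/kafka-spark-consumer-custom.jar" });
  }
}
`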

Some jobs remain in processing state forever

Hi Dibbhatt,

My application is running normally, except that the UI shows some jobs remaining in the processing state forever.

Following is a screenshot of the Spark UI:

[screenshot: Spark UI jobs page, 2015-09-22 17:55]

Regards,
Sorabh

How to use in a kerberized context?

In a context with YARN and Kerberos, where should options like the following be passed?
`
kafkaParams.put("security.protocol", "SASL_SSL");
kafkaParams.put("ssl.truststore.location","./truststore");

kafkaParams.put("ssl.truststore.password", "pass");
`
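
One approach (a sketch, not documented behavior of this project) is to put the Kafka client security settings into the same Properties object that the driver hands to ReceiverLauncher.launch. The property names other than security.* / ssl.* follow the project's Java sample and may differ between versions; host names and paths are placeholders. On YARN in cluster mode the truststore file itself typically also has to be shipped to the containers (for example with spark-submit --files) so that a relative path like ./truststore resolves on the executors.

`
import java.util.Properties;

public class SecureConsumerProps {
  // A sketch of building the consumer Properties in the driver: the Kafka security
  // settings sit next to the usual zookeeper/kafka settings. Names besides the
  // security.*/ssl.* ones follow the project's sample and may differ by version.
  public static Properties build() {
    Properties props = new Properties();
    props.put("zookeeper.hosts", "zkhost");
    props.put("zookeeper.port", "2181");
    props.put("kafka.topic", "mytopic");
    props.put("kafka.consumer.id", "my-consumer-id");
    props.put("bootstrap.servers", "broker1:9093");
    // Standard Kafka client security properties
    props.put("security.protocol", "SASL_SSL");
    props.put("ssl.truststore.location", "./truststore");
    props.put("ssl.truststore.password", "pass");
    return props;
  }
  // These props are then passed to ReceiverLauncher.launch(...) in the driver,
  // as in the project's SampleConsumer; the exact signature depends on the version.
}
`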

Works fine for a few days, then can't commit offsets; no error log and no PartitionManager info logs

The job works fine for a few days, then stops committing offsets to ZooKeeper.

The last successful commit log:

16/10/18 17:18:46 INFO PartitionManager: Committed offset 4951367 for Partition{host=x.x.x.x:9092, partition=0} for consumer: 123
16/10/18 17:18:47 INFO KafkaUtils: Fetching from Kafka for partition 0 for fetchSize 1024 and bufferSize 1048576
16/10/18 17:18:47 INFO PartitionManager: Total 1 messages from Kafka: x.x.x.x:0 there in internal buffers
16/10/18 17:18:47 INFO PartitionManager: Store for topic stream.affiliate_converted_click_record_log for partition 0 is : 4951367
16/10/18 17:18:47 INFO MemoryStore: Block input-0-1476427412421 stored as values in memory (estimated size 1392.0 B, free 2.8 GB)
16/10/18 17:18:47 INFO PartitionManager: LastComitted Offset : 4951367
16/10/18 17:18:47 INFO PartitionManager: New Emitted Offset : 4951368
16/10/18 17:18:47 INFO PartitionManager: Enqueued Offset :4951367
16/10/18 17:18:47 INFO PartitionManager: Last Enqueued offset 4951367 not incremented since previous Comitted Offset 4951367 for partition  Partition{host=x.x.x.x:9092, partition=0} for Consumer 123. Some issue in Process!!
16/10/18 17:18:48 INFO KafkaUtils: Fetching from Kafka for partition 0 for fetchSize 1024 and bufferSize 1048576
...
16/10/18 17:19:00 INFO CoarseGrainedExecutorBackend: Got assigned task 302048
16/10/18 17:19:00 INFO Executor: Running task 0.0 in stage 20386.0 (TID 302048)
16/10/18 17:19:00 INFO TorrentBroadcast: Started reading broadcast variable 20386
16/10/18 17:19:00 INFO MemoryStore: Block broadcast_20386_piece0 stored as bytes in memory (estimated size 1399.0 B, free 2.8 GB)
16/10/18 17:19:00 INFO TorrentBroadcast: Reading broadcast variable 20386 took 2 ms
16/10/18 17:19:00 INFO MemoryStore: Block broadcast_20386 stored as values in memory (estimated size 2.0 KB, free 2.8 GB)
16/10/18 17:19:00 INFO BlockManager: Found block input-0-1476427412419 locally
16/10/18 17:19:00 WARN Executor: 1 block locks were not released by TID = 302048:
[input-0-1476427412419]
16/10/18 17:19:00 INFO Executor: Finished task 0.0 in stage 20386.0 (TID 302048). 2322 bytes result sent to driver
16/10/18 17:19:00 INFO CoarseGrainedExecutorBackend: Got assigned task 302049
16/10/18 17:19:00 INFO Executor: Running task 0.0 in stage 20387.0 (TID 302049)
16/10/18 17:19:00 INFO CoarseGrainedExecutorBackend: Got assigned task 302051
16/10/18 17:19:00 INFO Executor: Running task 1.0 in stage 20387.0 (TID 302051)
16/10/18 17:19:00 INFO TorrentBroadcast: Started reading broadcast variable 20387
16/10/18 17:19:00 INFO MemoryStore: Block broadcast_20387_piece0 stored as bytes in memory (estimated size 12.5 KB, free 2.8 GB)
16/10/18 17:19:00 INFO TorrentBroadcast: Reading broadcast variable 20387 took 3 ms
16/10/18 17:19:00 INFO MemoryStore: Block broadcast_20387 stored as values in memory (estimated size 35.1 KB, free 2.8 GB)
16/10/18 17:19:00 INFO BlockManager: Found block input-0-1476427412419 locally
16/10/18 17:19:00 INFO BlockManager: Found block input-0-1476427412420 locally
16/10/18 17:19:00 INFO EsDataFrameWriter: Writing to [spark-aflt-data-test-2016-10-18/sparktest]
16/10/18 17:19:00 INFO EsDataFrameWriter: Writing to [spark-aflt-data-test-2016-10-18/sparktest]

Then the log always shows:

16/10/18 17:19:32 INFO KafkaUtils: Fetching from Kafka for partition 0 for fetchSize 1024 and bufferSize 1048576
16/10/18 17:19:33 INFO ZkCoordinator: Refreshing partition manager connections
16/10/18 17:19:33 INFO DynamicBrokersReader: Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=x.x.x.x:9092, 1=x.x.x.x:9092}}
16/10/18 17:19:33 INFO ZkCoordinator: Added partition index 0 for coordinator
16/10/18 17:19:33 INFO ZkCoordinator: Deleted partition managers: []
16/10/18 17:19:33 INFO ZkCoordinator: New partition managers: []
16/10/18 17:19:33 INFO ZkState: Starting curator service
16/10/18 17:19:33 INFO CuratorFrameworkImpl: Starting
16/10/18 17:19:33 INFO ZooKeeper: Initiating client connection, connectString=x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181 sessionTimeout=120000 watcher=org.apache.curator.ConnectionState@44bae644
16/10/18 17:19:33 INFO ClientCnxn: Opening socket connection to server x.x.x.x/x.x.x.x:2181. Will not attempt to authenticate using SASL (unknown error)
16/10/18 17:19:33 INFO ClientCnxn: Socket connection established to x.x.x.x/x.x.x.x:2181, initiating session
16/10/18 17:19:33 INFO ClientCnxn: Session establishment complete on server x.x.x.x/x.x.x.x:2181, sessionid = 0x2535a6cba5348e6, negotiated timeout = 120000
16/10/18 17:19:33 INFO ConnectionStateManager: State change: CONNECTED
16/10/18 17:19:33 INFO ZkCoordinator: Modified Fetch Rate for topic stream.affiliate_converted_click_record_log to : 1024
16/10/18 17:19:33 INFO ZooKeeper: Session: 0x2535a6cba5348e6 closed
16/10/18 17:19:33 INFO ClientCnxn: EventThread shut down
16/10/18 17:19:33 INFO ZkCoordinator: Finished refreshing
16/10/18 17:19:33 INFO KafkaUtils: Fetching from Kafka for partition 0 for fetchSize 1024 and bufferSize 1048576

spark: 2.0.0 (Scala 2.10)

kafka-spark-consumer:1.0.6

kafka:0.8.2.2

Why does this exception appear?

kafka-spark-consumer v1.0.10

INFO ZooKeeper: Session: 0x35d4c588af300df closed
INFO ClientCnxn: EventThread shut down
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at consumer.kafka.KafkaConsumer.run(KafkaConsumer.java:123)
at java.lang.Thread.run(Thread.java:745)
WARN NettyRpcEndpointRef: Error sending message [message = DeregisterReceiver(2,Stopped by driver,)] in 1 attempts
org.apache.spark.SparkException: Exception thrown in awaitResult
at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)

Static zookeeper hosts configuration

The current zookeeper.hosts configuration doesn't allow a URL path (chroot) in the connection string, so it can't reach ZooKeeper instances that use one.

My zookeeper instance is running under myhost:2181/kafka

Map(
  "zookeeper.hosts" -> "myhost",
  "zookeeper.port" -> "2181"
)

will connect to the wrong ZooKeeper. The easiest way around this would be a zookeeper.connections setting:

Map(
  "zookeeper.connections" -> "myhost:2181/kafka"
)

which we could prefer over the host/port combination. WDYT?
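
For illustration, a hedged sketch of the proposed fallback; the zookeeper.connections property is hypothetical and does not exist in the consumer today.

`
import java.util.Properties;

public class ZkConnectResolver {
  // Hypothetical resolution order for the proposed setting: prefer a full
  // connection string (which may carry a chroot such as /kafka), otherwise
  // fall back to the existing zookeeper.hosts / zookeeper.port pair.
  public static String resolve(Properties props) {
    String connections = props.getProperty("zookeeper.connections");
    if (connections != null && !connections.trim().isEmpty()) {
      return connections.trim();                        // e.g. "myhost:2181/kafka"
    }
    String port = props.getProperty("zookeeper.port", "2181");
    StringBuilder sb = new StringBuilder();
    for (String host : props.getProperty("zookeeper.hosts").split(",")) {
      if (sb.length() > 0) sb.append(",");
      sb.append(host.trim()).append(":").append(port);  // "host1:2181,host2:2181"
    }
    return sb.toString();
  }
}
`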

High CPU usage

Hi Dibyendu,

While the low-level Kafka consumer works fine, it also seems to require a suspiciously high amount of CPU cycles, even on a beefy, bare-metal machine.

With a single consumer reading from a single partition, CPU usage on the machine running the consumer goes up to 25% just to read 4 MB/s.

After bumping kafka.partitions.number to 50, still with a single consumer and with the same data pushed to a single partition at 4 MB/s, CPU usage jumps to 100% and never goes down.

Tweaking consumer.fetchsizebytes and consumer.fillfreqms didn't make a difference.

Is the Kafka consumer supposed to be that slow? If this is not the case, how may I help diagnose what is going on?
