Comments (12)
I haven't seen this one before. Were you able to solve it, or are you still having the same issue?
To fix the ProcessedOffsetManager issue with Spark 2.0, there is a pull request you can refer to:
https://github.com/dibbhatt/kafka-spark-consumer/pull/32/files
I haven't merged it yet, as it would break Spark 1.6, but you can try those changes and let me know whether they work for you.
I need to find a way to handle both Spark 2.0 and earlier versions of Spark.
from kafka-spark-consumer.
Hi~
I made the same changes, following that pull request, but I still have the same problem: the program can't deserialize the data model.
In detail, my producer program is:
```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

String topicName = "TEST-TOPIC";
Properties props = new Properties();
props.put("bootstrap.servers", "x.x.x.88:9092, x.x.x.89:9092, x.x.x.90:9092");
props.put("producer.type", "sync"); // legacy property; ignored by the new KafkaProducer
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 10; i++) {
    producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i)));
}
System.out.println("Message sent successfully");
producer.close();
```
I have tested by running ./kafka-console-consumer.sh, and it can consume the messages from the producer.
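For reference, that console check can be done with the console consumer script that ships with Kafka; the exact flags depend on the Kafka version, and the broker address below is just one of the masked addresses from the producer config above:

```shell
# Consume TEST-TOPIC from the beginning via the console consumer.
# (Kafka 0.9+ syntax; older releases use --zookeeper instead of --bootstrap-server.)
bin/kafka-console-consumer.sh \
  --bootstrap-server x.x.x.89:9092 \
  --topic TEST-TOPIC \
  --from-beginning
```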
But when I run SampleConsumer to test, it can't receive the messages.
In SampleConsumer, I used the following code to test:
```java
unionStreams.foreachRDD(new VoidFunction<JavaRDD<MessageAndMetadata>>() {
    @Override
    public void call(JavaRDD<MessageAndMetadata> rdd) throws Exception {
        List<MessageAndMetadata> rddList = rdd.collect();
        System.out.println("Number of records in this batch " + rddList.size());
        for (MessageAndMetadata model : rddList) {
            System.out.println("model key = " + new String(model.getKey(), "UTF-8")
                    + ", value = " + new String(model.getPayload(), "UTF-8"));
        }
    }
});
```
The result is always "Number of records in this batch 0", i.e. rddList.size() == 0.
I wonder whether a compatibility problem is making the data impossible to deserialize.
Looking forward to your answer, thanks~
By default, this consumer starts from the latest Kafka offset the first time it runs; on any subsequent run, it resumes from the last processed offset. So if you publish, say, 100 messages to Kafka, the log-end (latest) offset is 100, and a consumer starting for the first time begins at offset 100, past all the existing messages. If your publisher is not running and Kafka is not receiving any messages, the consumer won't pull anything. I guess that is what is happening here.
You can start the consumer first and then start the producer. Since the consumer is waiting for new messages from the latest offset, any newly produced messages will be consumed.
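That start-from-latest behaviour can be illustrated with a small toy model (plain Python, purely illustrative; this is not the kafka-spark-consumer API): messages already in the log when the consumer starts are never returned, while anything produced afterwards is.

```python
# Toy model of "start from latest offset" consumer semantics.
# TopicLog and LatestConsumer are made-up names for illustration only.

class TopicLog:
    """An append-only message log, like a single Kafka partition."""
    def __init__(self):
        self.messages = []

    def produce(self, msg):
        self.messages.append(msg)

    def log_end_offset(self):
        # Offset of the *next* message to be written.
        return len(self.messages)

class LatestConsumer:
    """A consumer that starts at the current log-end offset."""
    def __init__(self, log):
        self.log = log
        # Start position = log-end offset at startup, so nothing
        # already in the log is ever returned.
        self.position = log.log_end_offset()

    def poll(self):
        batch = self.log.messages[self.position:]
        self.position = self.log.log_end_offset()
        return batch

log = TopicLog()
for i in range(100):            # 100 messages published BEFORE the consumer starts
    log.produce(f"old-{i}")

consumer = LatestConsumer(log)  # starts at offset 100
print(len(consumer.poll()))     # 0 -- none of the pre-existing messages are seen

log.produce("new-1")            # produced AFTER the consumer started
print(len(consumer.poll()))     # 1 -- new messages are consumed
```

This is exactly the symptom above: a producer that finished before the consumer started leaves nothing for a latest-offset consumer to pull.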
Yeah, I'm sure I start the consumer first. Once the consumer is running, I start the producer and publish ten messages at a time. I have tried several times, but rddList's size is always zero.
When I run SampleConsumer from IntelliJ IDEA on my Spark cluster's master node, the logs show the error "java.lang.ClassNotFoundException: consumer.kafka.client.KafkaReceiver", which causes the Spark job to fail. I guess that may be the reason the DStream can't receive the messages.
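For what it's worth, a ClassNotFoundException for consumer.kafka.client.KafkaReceiver on the executors usually means the consumer jar never reached them. A hedged sketch of two common ways to ship it with spark-submit; the main class name and application jar below are placeholders, and the package coordinate is the one quoted later in this thread:

```shell
# Option 1: let spark-submit resolve the published artifact
# (coordinate taken from this thread; SampleConsumer and
#  my-streaming-app.jar are placeholders).
spark-submit \
  --packages dibbhatt:kafka-spark-consumer:1.0.6 \
  --class SampleConsumer \
  my-streaming-app.jar

# Option 2: ship a locally built consumer jar explicitly so the
# executors have it on their classpath.
spark-submit \
  --jars kafka-spark-consumer-1.0.6.jar \
  --class SampleConsumer \
  my-streaming-app.jar
```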
How many partitions does your topic have? Can you share the receiver properties you used? You can also share the executor log from when you do the spark-submit.
The topic has 3 partitions.
The consumer properties are the defaults.
I will share the broker properties next Monday.
The full executor logs from my spark-submit are in my first comment on this issue; if they are not clear, I will share all the logs again.
Thanks.
It works!!! Thank you so much.
Good. What was the issue?
Hi,
I started Spark with the package dibbhatt:kafka-spark-consumer:1.0.6 and tried to execute:
scala> import consumer.kafka.ProcessedOffsetManager
It throws: error: object ProcessedOffsetManager is not a member of package consumer.kafka
According to the discussion above, it seems the same issue was encountered and solved for Spark 2.0; I would just like to know whether the same fix has been applied to earlier versions as well.
You can git clone the repo and build it against Spark 1.6. You may need to make some minor changes to get it working with Spark 1.6; it's very easy to do. Do let me know if you need any help.
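A rough sketch of that build, assuming a standard Maven layout; whether the pom exposes a spark.version property (or the Spark dependency version has to be edited in pom.xml directly) is an assumption worth checking:

```shell
git clone https://github.com/dibbhatt/kafka-spark-consumer.git
cd kafka-spark-consumer
# Build against Spark 1.6. If the pom does not expose a spark.version
# property, edit the Spark dependency version in pom.xml instead.
mvn clean package -Dspark.version=1.6.0
```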
Hi @vroy007, if you don't mind, could you tell me the name of your organization, and whether this consumer is running in production there? I need it just for my reference.