
bbejeck / kafka-streams-in-action

Stars: 260 | Watchers: 16 | Forks: 179 | Size: 180.69 MB

Source code for the Kafka Streams in Action Book

Home Page: https://www.manning.com/bejeck

License: Apache License 2.0

Languages: Java 93.61%, JavaScript 5.17%, HTML 0.99%, Shell 0.23%
Topics: kafka, kafkastreams, streaming, stream-processing, streaming-data

kafka-streams-in-action's People

Contributors

bbejeck, chang-chao


kafka-streams-in-action's Issues

Why is KTable emitting on each update?

I have tried to reduce the KStreamVsKTable example down to my problem: the KTable is emitting on every update, instead of only the latest update per key.

Please see code below (in Scala):

// Imports assume Kafka's Scala DSL (2.2 or later).
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.streams.kstream.Printed
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.{Consumed, KTable}
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

object SimpleTable extends App {
  val topic = "simple-table"
  
  val prodProps = new Properties()
  prodProps.put("bootstrap.servers", "localhost:9092")
  prodProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  prodProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  prodProps.put("acks", "1")
  prodProps.put("retries", "3")

  val producer = new KafkaProducer[String, String](prodProps)

  producer.send(new ProducerRecord[String, String](topic, "key1", "value1"))
  producer.send(new ProducerRecord[String, String](topic, "key2", "value2"))
  producer.send(new ProducerRecord[String, String](topic, "key3", "value3"))
  producer.send(new ProducerRecord[String, String](topic, "key1", "value11"))
  producer.send(new ProducerRecord[String, String](topic, "key2", "value22"))
  producer.send(new ProducerRecord[String, String](topic, "key3", "value33"))

  producer.close()


  val streamProps = new Properties()
  streamProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-table-app1")
  streamProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  //streamProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group11")
  //streamProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "client11")
  //streamProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
  //streamProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "18000")
  //streamProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "18000")
  //streamProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "10485760")
  //streamProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1")
  //streamProps.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "10000")
  //streamProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1)
  //streamProps.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, classOf[WallclockTimestampExtractor])

  import org.apache.kafka.streams.scala.Serdes._
  implicit val consumeSerdes: Consumed[String, String] = Consumed.`with`[String, String]
  val builder = new StreamsBuilder()

  val simpleTable: KTable[String, String] = builder.table[String, String](topic)
  simpleTable.toStream.print(Printed.toSysOut[String, String].withLabel("simple-table"))


  val streams = new KafkaStreams(builder.build(), streamProps)
  streams.start()
  Thread.sleep(10000)
  streams.close()
}

The app prints the following:

[simple-table]: key1, value1
[simple-table]: key2, value2
[simple-table]: key3, value3
[simple-table]: key1, value11
[simple-table]: key2, value22
[simple-table]: key3, value33

I have been stuck on the book for almost a week now because of this. I am supposed to see only the last three lines. The same thing happens when I implement the KStreamVsKTable example from the book.

Please help.
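For reference, the two commented-out caching lines in the snippet above are exactly the knobs that govern this behavior: with a non-zero record cache, a KTable coalesces updates per key in memory and forwards only the latest value when the cache flushes (on commit or eviction). A hedged sketch, using the Kafka Streams defaults as explicit values:

// A non-zero cache suppresses intermediate KTable updates between flushes;
// a cache of 0 forwards every single update downstream.
streamProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "10485760") // 10 MB
streamProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "30000")           // 30 s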

Examples not working when running with a non-English locale

I'm running the examples on a machine with an Italian locale, and they don't work. In particular, MockDataProducer is not able to generate data, because numbers are formatted with a different decimal separator. This is the stack trace:

java.lang.NumberFormatException: For input string: "262,00"
	at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2043)
	at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
	at java.lang.Double.parseDouble(Double.java:538)
	at bbejeck.util.datagen.DataGenerator.generatePurchases(DataGenerator.java:96)
	at bbejeck.clients.producer.MockDataProducer.lambda$producePurchaseData$0(MockDataProducer.java:68)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

It has also been very hard to debug, since that code is wrapped in a task run by an ExecutorService, so the exception is swallowed rather than rethrown.
The workaround is to add the following JVM parameters: -Duser.country=US -Duser.language=en
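A hedged sketch of a code-level alternative, assuming the generator formats doubles with a default-locale format call and then re-parses them: pin the locale at the formatting site instead of for the whole JVM.

import java.util.Locale

// Format with an explicit locale so the decimal separator is always '.',
// regardless of the JVM default (the value 262.0 is illustrative):
val price: String = String.format(Locale.US, "%.2f", Double.box(262.0)) // "262.00", not "262,00"
val parsed: Double = java.lang.Double.parseDouble(price)                // now safe to re-parse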

Ch. 4. Generating a key twice

In 4.4.2, "Generating keys containing customer IDs to perform joins," you state that we need to generate keys for the values. As far as I can see from the code, you do this twice: the first time when we mask the credit-card number, and the second when we do the key generation itself. This is a bit confusing. Have I missed something?
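For context, a hedged sketch (with a stand-in Purchase model rather than the book's class) of the re-keying the section describes; it shows why masking and key generation are two separate steps: mapValues rewrites the value, while selectKey rewrites only the key.

import org.apache.kafka.streams.scala.kstream.KStream

// Stand-in model, for illustration only.
case class Purchase(customerId: String, maskedCardNumber: String)

// selectKey re-keys the stream without touching the (already masked) value.
def rekeyByCustomer(masked: KStream[String, Purchase]): KStream[String, Purchase] =
  masked.selectKey((_, purchase) => purchase.customerId)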

Shares Aggregation problem (Chapter 5)

First of all - thank you for this amazing book!

I've run into a problem when running the example that transforms "Stock Transactions" into "Share Volume". With the example code from GitHub, it compiles but fails with the following exception:

Exception in thread "Kafka-Streams-Tables-client-SHARES-StreamThread-1" java.lang.ClassCastException: com.google.gson.internal.LinkedTreeMap cannot be cast to java.lang.Comparable
	at java.util.TreeMap.compare(TreeMap.java:1294)
	at java.util.TreeMap.put(TreeMap.java:538)
	at java.util.TreeSet.add(TreeSet.java:255)
	at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:83)
	at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:61)
	at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$1.read(ReflectiveTypeAdapterFactory.java:131)
	at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:222)
	at com.google.gson.Gson.fromJson(Gson.java:927)
	at com.google.gson.Gson.fromJson(Gson.java:892)
	at com.google.gson.Gson.fromJson(Gson.java:841)
	at com.google.gson.Gson.fromJson(Gson.java:813)
	at com.example.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:33)
	at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:159)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.outerValue(MeteredKeyValueStore.java:245)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:137)
	at org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:78)
	at org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:56)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:351)
	at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:104)
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:862)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)

My application is configured with the following properties:

Properties props = new Properties();
props.put(StreamsConfig.CLIENT_ID_CONFIG, "Kafka-Streams-Tables-client-SHARES");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-streams-tables-group-SHARES");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "Kafka-Streams-Tables-App-SHARES");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

When I ran the debugger around the problem, it looks like Gson struggles to decode the FixedSizePriorityQueue (or rather, its internal TreeSet).

Is there anything you could recommend for me to try to solve the issue?

Thank you for your help and great contributions!
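For reference, one hedged direction for this symptom: without full type information, Gson decodes the TreeSet's elements as LinkedTreeMap, which is not Comparable, so TreeSet.add throws. Supplying the complete generic type (via a TypeToken, or a registered type adapter for the queue class) is the usual cure. A self-contained sketch with stand-in types:

import java.lang.reflect.Type

import com.google.gson.Gson
import com.google.gson.reflect.TypeToken

// Stand-in element type; Comparable so a TreeSet can order it.
case class ShareVolume(symbol: String, shares: Int) extends Comparable[ShareVolume] {
  override def compareTo(that: ShareVolume): Int = Integer.compare(this.shares, that.shares)
}

// With the full generic type, elements decode as ShareVolume instead of
// LinkedTreeMap, and TreeSet.add no longer fails.
val setType: Type = new TypeToken[java.util.TreeSet[ShareVolume]]() {}.getType
val decoded: java.util.TreeSet[ShareVolume] =
  new Gson().fromJson("""[{"symbol":"ABC","shares":10}]""", setType)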

Scala question, and where do I get the JAR for EmbeddedKafkaCluster

Firstly, great book; thanks a million for it.

I hope you don't mind, but I've got a few questions. Happy to do this some other way; if you want, just let me know.

If you are OK to help, here are two small questions:

  1. You use EmbeddedKafkaCluster, but what JAR would I need to get this into an IntelliJ IDEA SBT project? Have you got a link to the artifact on Maven Central? I could not tell from your Maven file which one it was. (See the sketch after these questions.)

  2. I can see that there was a whole bunch of stuff brought into 2.0.0 (which I am using) to make working with Serdes easier from Scala: https://github.com/apache/kafka/tree/2.0/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala

This is based on the https://github.com/lightbend/kafka-streams-scala repo, which has now been folded into the official apache/kafka codebase.

My question is: why don't I see these Scala types if I am using the 2.0.0 artifact?
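A hedged sbt sketch covering both questions, with versions and coordinates that may need adjusting per release: EmbeddedKafkaCluster ships in Kafka's test-classifier jars rather than the main artifacts, and the Scala DSL is published as its own kafka-streams-scala artifact.

// Question 1: EmbeddedKafkaCluster lives in the test jars (classifier "test").
libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-streams" % "2.0.0" % Test classifier "test",
  "org.apache.kafka" % "kafka-clients" % "2.0.0" % Test classifier "test",
  "org.apache.kafka" %% "kafka" % "2.0.0" % Test classifier "test"
)

// Question 2: the org.apache.kafka.streams.scala types are not inside the
// plain kafka-streams jar; they come from the separate Scala artifact.
libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.0.0"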

I raised a question here too : https://groups.google.com/forum/?utm_medium=email&utm_source=footer#!topic/confluent-platform/f_umV-B6Spw

It's just that you are the author of the ONLY book on this subject, and if I am not mistaken you work for Confluent too, right? So I thought if anyone knows, it will be you.

I hope you don't mind the direct question here; I'm really struggling to get this working.

ZMartKafkaStreamsAddStateApp.java does not process Purchase records

The ZMartKafkaStreamsAddStateApp app (in the Chapter 4 directory) does not work with the latest version of Kafka Streams. I pretty much copied the code from GitHub: https://github.com/bbejeck/kafka-streams-in-action/blob/master/src/main/java/bbejeck/chapter_4/ZMartKafkaStreamsAddStateApp.java. When I run the code as is, no data flows through the topology. I verified that mock data is being generated, as I can see the offsets in the input topic, transactions, go up. However, nothing reaches the output topics, and nothing is printed to the console.

However, when I comment out lines 81-88 (https://github.com/bbejeck/kafka-streams-in-action/blob/master/src/main/java/bbejeck/chapter_4/ZMartKafkaStreamsAddStateApp.java#L81-L88), essentially avoiding creation of the "through()" processor node, the code works. After commenting them out, I see data being generated to the patterns topics, and output appears in the console.

If there are no plans to address/fix the issue, I would appreciate any pointers so I can get the code working on my setup. Thank you.
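A hedged possibility worth ruling out: Kafka Streams auto-creates its internal repartition and changelog topics, but through() writes to a user topic, which generally must exist before the application starts (unless the broker auto-creates topics). A sketch that pre-creates the intermediate topic; "through-topic" is a placeholder for whatever name the chapter's through() call actually uses:

import java.util.{Collections, Properties}

import org.apache.kafka.clients.admin.{AdminClient, NewTopic}

val adminProps = new Properties()
adminProps.put("bootstrap.servers", "localhost:9092")
val admin = AdminClient.create(adminProps)
// One partition, replication factor 1, suitable for a single local broker.
admin.createTopics(Collections.singletonList(new NewTopic("through-topic", 1, 1.toShort))).all().get()
admin.close()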

java.lang.ClassCastException

Hello

I tried to run SimpleProducer.java from Chapter 2 and got the exception below:

Caused by: java.lang.ClassCastException: class bbejeck.model.PurchaseKey cannot be cast to class java.lang.String (bbejeck.model.PurchaseKey is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')

This needs to be fixed.

Java version: 11
Kafka version: kafka_2.13-2.7.0
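For context, a hedged reading of the exception: the producer's key.serializer is the String serializer while the record key is a PurchaseKey, and the two must agree. A sketch of the alignment; the JsonSerializer class name is assumed for illustration, not confirmed from the repo:

import java.util.Properties

// The ClassCastException means key.serializer and the record key type
// disagree; point key.serializer at something that can handle PurchaseKey.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "bbejeck.util.serializer.JsonSerializer") // assumed class name
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")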

Another question for you

I noticed you happen to be working on a PR for this issue on the Kafka main branch:

apache/kafka#4702

If I call topologyTestDriver.close(), I get all sorts of NIO exceptions about a directory not being empty.

If I just protect the call to topologyTestDriver.close() with a try/catch and also delete the state directory myself, all seems OK.

Any idea if this is what I should be doing, or should that just work in 2.1.0?

I'm using Windows as my dev machine.
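A hedged sketch of that workaround, with the state directory passed in as a parameter since it depends on the app's state.dir setting:

import java.nio.file.{Files, Path, Paths}
import java.util.Comparator

import org.apache.kafka.streams.TopologyTestDriver

def closeQuietly(testDriver: TopologyTestDriver, stateDir: String): Unit = {
  // Swallow the Windows-only cleanup failure from close() ...
  try testDriver.close()
  catch { case e: Exception => println(s"Ignoring close() failure: ${e.getMessage}") }
  // ... then delete the state directory manually, deepest paths first.
  Files.walk(Paths.get(stateDir))
    .sorted(Comparator.reverseOrder[Path]())
    .forEach((p: Path) => Files.deleteIfExists(p))
}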

Sorry to ask another question of you.

ReadMe organization

The README.md would be much more informative and easier to follow if it were reorganized a bit. Even just moving everything from "Requirements" through the end of the file so that it all sits above "Instructions for Chapter 9 Examples" would make things a lot nicer. I'd be happy to throw together a quick PR for you to review, but I don't have the rights to create a branch.
