
ClassCastException in pipeline: Kafka Streams Function -> Kafka Function -> Kafka Streams Consumer (spring-cloud-stream-binder-kafka, closed)

aliaksandr-pleski commented on August 28, 2024
ClassCastException in pipeline: Kafka Streams Function -> Kafka Function -> Kafka Streams Consumer

from spring-cloud-stream-binder-kafka.

Comments (5)

sobychacko commented on August 28, 2024

@aliaksandr-pleski Thanks for this report. After looking at this issue a bit further, I think we can resolve it in a couple of ways.

You are getting this error because the __typeid__ header is not set properly by your function bean. This header is normally set by the JsonSerializer in Spring Kafka. Your streamFunction is a Kafka Streams function, so by default it uses the JsonSerializer and adds the __typeid__ header with the value ClassB on the outbound. The record is then passed to the input of function, but function is based on the regular Kafka binder, which uses message conversion behind the scenes (on both input and output) rather than the JsonSerializer (or native Kafka serialization). When function publishes its data (as type ClassC), it does not update the type id header, since no JsonSerializer is involved. It simply passes along the same __typeid__ value it received on the input (ClassB), and when the record reaches the next function (consumer1), you run into the ClassCastException.
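To make the mechanism concrete, here is a minimal, framework-free sketch of the failure. It is a toy model, not Spring's actual code: the Envelope class, the method names, and the way the "deserializer" picks a class are all hypothetical stand-ins, and the header name is only a placeholder for the type id header. It shows a stage that forwards a stale type id, and the same stage after it overwrites the header with the type it really emits.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of type resolution driven by a type id header (hypothetical, not Spring code). */
class TypeIdHeaderDemo {

    // Placeholder name for the type id header.
    static final String TYPE_ID = "__TypeId__";

    static final class ClassB {
        final String b;
        ClassB(String b) { this.b = b; }
    }

    static final class ClassC {
        final int c;
        ClassC(int c) { this.c = c; }
    }

    /** A record travelling between stages: headers plus a payload kept as raw text. */
    static final class Envelope {
        final Map<String, String> headers;
        final String rawPayload;
        Envelope(Map<String, String> headers, String rawPayload) {
            this.headers = headers;
            this.rawPayload = rawPayload;
        }
    }

    /** Models a stage that copies inbound headers to the outbound record unchanged. */
    static Envelope forwardWithStaleHeader(Envelope in) {
        return new Envelope(new HashMap<>(in.headers), in.rawPayload);
    }

    /** Models the fix: the stage overwrites the type id with the type it emits. */
    static Envelope forwardWithUpdatedHeader(Envelope in) {
        Map<String, String> headers = new HashMap<>(in.headers);
        headers.put(TYPE_ID, ClassC.class.getName());
        return new Envelope(headers, in.rawPayload);
    }

    /** Models a deserializer that picks the target class from the header alone. */
    static Object deserialize(Envelope in) {
        String typeId = in.headers.get(TYPE_ID);
        if (ClassB.class.getName().equals(typeId)) {
            return new ClassB(in.rawPayload);
        }
        return new ClassC(Integer.parseInt(in.rawPayload));
    }

    /** Models the downstream consumer, which expects ClassC. */
    static boolean consumerAccepts(Envelope in) {
        try {
            ClassC value = (ClassC) deserialize(in);
            return value != null;
        } catch (ClassCastException e) {
            // The header said ClassB, so the consumer received the wrong type.
            return false;
        }
    }
}
```

In this model, a record whose header still names ClassB is rejected by the consumer, while the same record with the header rewritten to ClassC is accepted. Both workarounds discussed in this thread amount to making sure the header is rewritten before the record is published.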

You can work around this problem in two ways. The first is fairly trivial, while the second needs some configuration changes.

  1. You can add the __typeid__ header programmatically when sending messages from the function method. Below is an example.
@Bean
public Function<ClassB, Message<ClassC>> function() {
	return classB -> {
		final Map<String, Object> headers = new HashMap<>();
		// Use the incoming ClassB value as the record key.
		headers.put(KafkaHeaders.MESSAGE_KEY, classB.b);
		// Set the type id explicitly so that downstream consumers see ClassC, not ClassB.
		headers.put(DefaultJackson2JavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME, ClassC.class.getCanonicalName());
		return MessageBuilder.createMessage(
			new ClassC(Integer.parseInt(classB.b)),
			new MessageHeaders(headers)
		);
	};
}

Note the explicit addition of the DEFAULT_CLASSID_FIELD_NAME header.

  2. The other option is to force the function method to use native encoding on the outbound, so that it goes through Kafka's serialization with the JsonSerializer from Spring Kafka. This way, the proper type id header is added when the record is published. Here are the relevant configuration changes.
spring:
  cloud:
    function:
      definition: supplier;streamFunction;function;consumer1
    stream:
      kafka:
        bindings:
          function-out-0:
            producer:
              configuration:
                value.serializer: org.springframework.kafka.support.serializer.JsonSerializer
        ....
      bindings:
        ...
        function-in-0:
          binder: kafka
          destination: scs-cc-streamFunction
        function-out-0:
          binder: kafka
          destination: scs-cc-consumer
          producer:
            useNativeEncoding: true
        ....
        ....

I verified that both solutions work. Please let us know whether they work on your end.


sobychacko commented on August 28, 2024

Since the regular binder uses Spring's message conversion by default, it doesn't go through the native Kafka serializer, and the code that adds that header is part of the JsonSerializer class. Therefore, this is the expected behavior.


aliaksandr-pleski commented on August 28, 2024

@sobychacko thanks for the quick response!

Both options work for me. I personally prefer option 2.

The regular Kafka function doesn't update the __typeid__ header and just proxies the existing one. Is that the expected behavior?


sobychacko commented on August 28, 2024

Closing the issue now. Feel free to reopen it if you find something else related to this.


aliaksandr-pleski commented on August 28, 2024

Got it, thanks!

