
nc-kafka-fundamentals's Introduction

TZ Kafka Fundamentals

The Technozaure (TZ) is an internal conference event format at Zenika. The talks are organized by Zénikéens, for Zénikéens.

Each office organizes its own Technozaure.

The idea behind the Technozaure is to bring us together around a variety of topics: technical presentations, client project feedback (REX), live coding sessions, and workshops.

The NightClazz is a meetup organized by Zenika.

Build the doc

Based on https://github.com/vgallet/workshop-monitoring-kafka
Thanks to Victor Gallet, Ilyès Semlali, Raphaël Ruelle

NVM Install - https://github.com/nvm-sh/nvm

curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.38.0/install.sh | bash
# Install node lts
nvm install --lts

Run locally

npm install -d
npm run serve

nc-kafka-fundamentals's People

Contributors

awattez, malo-t, mboix

nc-kafka-fundamentals's Issues

Handling poison pills

Hello,

I attended this workshop at Devfest Nantes and I am revisiting it as part of a new project.
The goal is to consume a simple topic with:

  • the @KafkaListener annotation
  • an Avro schema with POJOs generated by Maven
  • a dead letter topic for all errors, including poison pills

It seems that in this workshop poison pills are not handled and cause an infinite retry loop.
Should they be handled? And if so, how?

I started working on the subject from this article: https://www.confluent.io/fr-fr/blog/spring-kafka-can-your-kafka-consumers-handle-a-poison-pill/

The problem is that the ErrorHandlingDeserializer tries to publish a byte[] to the DLT, while a functional exception tries to publish Avro to the DLT.
What should spring.kafka.producer.value-serializer be set to: ByteArraySerializer or KafkaAvroSerializer?

I managed to get something working by creating two ProducerFactory and two KafkaTemplate beans.
That is still somewhat involved for a fairly basic need.
You may already have thought about this question, so I am submitting this solution for review and comments.

application:
  subscribers:
    topic: abc

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      auto-offset-reset: earliest
      group-id: abc
    producer:
      client-id: abc
    properties:
      spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
      specific:
        avro:
          # By default, each record is deserialized into an Avro GenericRecord, but here the record should be deserialized using the application's code-generated PositionValue class.
          # Therefore, configure the deserializer to use Avro SpecificRecord, i.e., SPECIFIC_AVRO_READER_CONFIG should be set to true.
          reader: true
      schema:
        registry:
          url: http://localhost:9081
    # Custom keys (not standard Spring Boot properties), bound through the
    # @ConfigurationProperties classes below.
    avro-producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    byte-producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer

import java.util.HashMap;

import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

// Producer used for poison pills: the raw byte[] payload that could not be
// deserialized is republished to the DLT as-is.
@Configuration
@ConfigurationProperties(prefix = "spring.kafka.byte-producer")
@RequiredArgsConstructor
public class ByteProducerConfig extends KafkaProperties.Producer {

    private final KafkaProperties common;

    @Bean("byteProducerFactory")
    public ProducerFactory<?, ?> producerFactory() {
        // Merge the shared spring.kafka producer properties with the
        // byte-producer overrides bound to this class.
        final var conf = new HashMap<>(this.common.buildProducerProperties());
        conf.putAll(this.buildProperties());
        return new DefaultKafkaProducerFactory<>(conf);
    }

    @Bean("byteKafkaTemplate")
    public KafkaTemplate<?, ?> kafkaTemplate() {
        return new KafkaTemplate<>(this.producerFactory());
    }
}

import java.util.HashMap;

import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

// Producer used for functional failures: records that were deserialized
// successfully are republished to the DLT as Avro.
@Configuration
@ConfigurationProperties(prefix = "spring.kafka.avro-producer")
@RequiredArgsConstructor
public class AvroProducerConfig extends KafkaProperties.Producer {

    private final KafkaProperties common;

    @Bean("avroProducerFactory")
    public ProducerFactory<?, ?> producerFactory() {
        final var conf = new HashMap<>(this.common.buildProducerProperties());
        conf.putAll(this.buildProperties());
        return new DefaultKafkaProducerFactory<>(conf);
    }

    @Bean("avroKafkaTemplate")
    public KafkaTemplate<?, ?> kafkaTemplate() {
        return new KafkaTemplate<>(this.producerFactory());
    }
}

import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${application.subscribers.topic}")
    private String topic;

    @Bean
    NewTopic schemaAvroTopic() {
        return TopicBuilder.name(topic).partitions(1).replicas(1).build();
    }

    // Create the DLT topic.
    // The default behavior is to send failed records to a topic named
    // <originalTopic>.DLT (the original topic name suffixed with .DLT).
    @Bean
    public NewTopic dlt() {
        return TopicBuilder.name(topic + ".DLT").partitions(1).replicas(1).build();
    }

    // Boot will autowire this into the container factory.
    // An error handler that seeks to the current offset for each topic in the
    // remaining records, used to rewind partitions after a message failure so
    // that the message can be replayed.
    @Bean
    public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
        return new DefaultErrorHandler(deadLetterPublishingRecoverer, new FixedBackOff(5000L, 2));
    }

    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
            @Qualifier("byteKafkaTemplate") KafkaTemplate<?, ?> kafkaTemplateByte,
            @Qualifier("avroKafkaTemplate") KafkaTemplate<?, ?> kafkaTemplateAvro) {
        // The recoverer picks the first template whose key class matches the
        // class of the failed record's value, hence the insertion-ordered map.
        Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates = new LinkedHashMap<>();
        templates.put(byte[].class, kafkaTemplateByte);  // poison pills
        templates.put(Object.class, kafkaTemplateAvro);  // everything else (Avro)
        return new DeadLetterPublishingRecoverer(templates);
    }
}
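
For reference, a minimal listener that exercises both DLT paths might look like the sketch below. This is an illustration, not part of the workshop: PositionValue is the code-generated Avro class mentioned in the configuration comments, and the getPosition() check is a hypothetical stand-in for any functional validation.

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class PositionListener {

    // A poison pill never reaches this method: the ErrorHandlingDeserializer
    // converts the failure into a DeserializationException, and the
    // DefaultErrorHandler publishes the raw byte[] payload to the DLT
    // through byteKafkaTemplate.
    @KafkaListener(topics = "${application.subscribers.topic}")
    public void onMessage(PositionValue value) {
        // A functional exception thrown here is retried per the FixedBackOff
        // (two retries, 5 s apart) and then published to the DLT as Avro
        // through avroKafkaTemplate.
        if (value.getPosition() == null) { // hypothetical validation
            throw new IllegalStateException("invalid record: " + value);
        }
    }
}

With this in place, byte[] payloads from deserialization failures go through the ByteArraySerializer producer, while everything else falls through to the Object entry of the template map and is re-serialized with KafkaAvroSerializer, which is why the order of the LinkedHashMap matters.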
