nc-kafka-fundamentals's Issues
Handling poison pills
Hello,

I followed this workshop at Devfest Nantes and I am coming back to it for a new project.
The goal is to consume a simple topic with:
- the @KafkaListener annotation
- an Avro schema, with POJOs generated by Maven
- a dead-letter topic for all errors, including poison pills

It seems that, as written in this workshop, poison pills are not handled and cause an infinite retry loop.
Should they be handled? If so, how?
I started working on the subject from this article: https://www.confluent.io/fr-fr/blog/spring-kafka-can-your-kafka-consumers-handle-a-poison-pill/

The problem is that on a deserialization failure the ErrorHandlingDeserializer path needs to publish a raw byte[] to the DLT, while a functional (business) exception needs to publish the original Avro record to the DLT.
So what should spring.kafka.producer.value-serializer be: ByteArraySerializer or KafkaAvroSerializer?
I managed to get something working by creating two ProducerFactory instances and two KafkaTemplate instances.
Still, that feels a bit complicated for a fairly basic need.
You may have already thought about this question, so I am submitting my solution for review/comments.
```yaml
application:
  subscribers:
    topic: abc

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      auto-offset-reset: earliest
      group-id: abc
    producer:
      client-id: abc
    properties:
      spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
      specific:
        avro:
          # By default, each record is deserialized into an Avro GenericRecord, but in this
          # tutorial the record should be deserialized using the application's code-generated
          # PositionValue class. Therefore, configure the deserializer to use Avro
          # SpecificRecord, i.e., SPECIFIC_AVRO_READER_CONFIG should be set to true.
          reader: true
      schema:
        registry:
          url: http://localhost:9081
    avro-producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    byte-producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
```
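For context on what the ErrorHandlingDeserializer config above buys us: it wraps the configured delegate and catches anything the delegate throws, so the poll itself never fails and the raw bytes stay available for the DLT. A rough, simplified sketch of that pattern in plain Java (my illustration of the idea, not the actual Spring class):

```java
import java.util.function.Function;

// Simplified illustration of the ErrorHandlingDeserializer idea: wrap the real
// deserializer and, on failure, return a "failed" result carrying the raw bytes
// and the cause instead of throwing. The consumer loop keeps advancing, and the
// error handler can route the raw byte[] payload to the DLT.
public class FailSafeDeserializer<T> {

    // Outcome of one deserialization attempt: a value, or the raw payload plus cause.
    public record Result<V>(V value, byte[] rawPayload, Exception failure) {
        public boolean failed() { return failure != null; }
    }

    private final Function<byte[], T> delegate;

    public FailSafeDeserializer(Function<byte[], T> delegate) {
        this.delegate = delegate;
    }

    public Result<T> deserialize(byte[] payload) {
        try {
            return new Result<>(delegate.apply(payload), payload, null);
        } catch (Exception e) {
            // Poison pill: keep the raw bytes so they can be published to the DLT.
            return new Result<>(null, payload, e);
        }
    }
}
```

With a strict delegate (say, `Integer::parseInt` over the payload string), a valid payload yields a value while a poison pill yields a failed result instead of an exception that would trigger an infinite redelivery loop.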
```java
@Configuration
@ConfigurationProperties(prefix = "spring.kafka.byte-producer")
@RequiredArgsConstructor
public class ByteProducerConfig extends KafkaProperties.Producer {

    private final KafkaProperties common;

    @Bean("byteProducerFactory")
    public ProducerFactory<?, ?> producerFactory() {
        // Start from the shared producer properties, then override with the
        // byte-producer specific ones (ByteArraySerializer).
        final var conf = new HashMap<>(this.common.buildProducerProperties());
        conf.putAll(this.buildProperties());
        return new DefaultKafkaProducerFactory<>(conf);
    }

    @Bean("byteKafkaTemplate")
    public KafkaTemplate<?, ?> kafkaTemplate() {
        return new KafkaTemplate<>(this.producerFactory());
    }
}
```
```java
@Configuration
@ConfigurationProperties(prefix = "spring.kafka.avro-producer")
@RequiredArgsConstructor
public class AvroProducerConfig extends KafkaProperties.Producer {

    private final KafkaProperties common;

    @Bean("avroProducerFactory")
    public ProducerFactory<?, ?> producerFactory() {
        // Same pattern as the byte producer, but with the KafkaAvroSerializer.
        final var conf = new HashMap<>(this.common.buildProducerProperties());
        conf.putAll(this.buildProperties());
        return new DefaultKafkaProducerFactory<>(conf);
    }

    @Bean("avroKafkaTemplate")
    public KafkaTemplate<?, ?> kafkaTemplate() {
        return new KafkaTemplate<>(this.producerFactory());
    }
}
```
```java
@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${application.subscribers.topic}")
    private String topic;

    @Bean
    NewTopic schemaAvroTopic() {
        return TopicBuilder.name(topic).partitions(1).replicas(1).build();
    }

    // Create the DLT topic.
    // Default behavior is to send the record to a topic named <originalTopic>.DLT
    // (the original topic name suffixed with .DLT).
    @Bean
    public NewTopic dlt() {
        return TopicBuilder.name(topic + ".DLT").partitions(1).replicas(1).build();
    }

    // Boot will auto-wire this into the container factory.
    // An error handler that seeks to the current offset for each topic in the
    // remaining records; used to rewind partitions after a message failure so
    // that the record can be replayed (here: 2 retries, 5 s apart, then recover).
    @Bean
    public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
        return new DefaultErrorHandler(deadLetterPublishingRecoverer, new FixedBackOff(5000L, 2));
    }

    // Route failed records to the DLT: raw byte[] payloads (deserialization
    // failures) go through the byte-array template, everything else through
    // the Avro template.
    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
            @Qualifier("byteKafkaTemplate") KafkaTemplate<?, ?> kafkaTemplateByte,
            @Qualifier("avroKafkaTemplate") KafkaTemplate<?, ?> kafkaTemplateObject) {
        Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
        templates.put(byte[].class, kafkaTemplateByte);
        templates.put(Object.class, kafkaTemplateObject);
        return new DeadLetterPublishingRecoverer(templates);
    }
}
```
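As I understand it, the multi-template DeadLetterPublishingRecoverer picks the first map entry whose key class matches the failed record's value, which is why insertion order in the LinkedHashMap matters: byte[] must come before the Object catch-all. A plain-Java sketch of that selection logic (my reading of the behavior, not Spring's actual code):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified version of the class-based template lookup: iterate the map in
// insertion order and return the first entry whose key is assignable from the
// value's class. Object.class matches everything, so it must come last.
public class TemplateRouter {

    private final Map<Class<?>, String> templates = new LinkedHashMap<>();

    public TemplateRouter() {
        templates.put(byte[].class, "byteKafkaTemplate");  // raw payloads (poison pills)
        templates.put(Object.class, "avroKafkaTemplate");  // everything else
    }

    public String templateFor(Object value) {
        for (Map.Entry<Class<?>, String> e : templates.entrySet()) {
            if (e.getKey().isAssignableFrom(value.getClass())) {
                return e.getValue();
            }
        }
        throw new IllegalStateException("no template for " + value.getClass());
    }
}
```

If the entries were inserted in the opposite order, Object.class would match the byte[] payload first and the raw bytes would be pushed through the Avro serializer, which is exactly the mismatch described above.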