michelin / kstreamplify

Kstreamplify is a Java library that brings new features on top of Kafka Streams.

License: Apache License 2.0
Today, when defining the DLQ for tests, we have to use the DLQ constant defined in KafkaStreamsStarterTest, which isn't documented anywhere and doesn't make much sense.
When using TopicWithSerde, it's even less understandable, as the syntax for defining the DLQ output test topic is completely different:
@BeforeEach
void setUp() {
    infofacCanonicalInputTopic = createInputTestTopic(InfoFacTopicWithSerde.infoFacAllCanonical());
    infofacAvroOutputTopic = createOutputTestTopic(InfoFacTopicWithSerde.infoFacAllAvro());
    dlqTopic = testDriver.createOutputTopic(DLQ_TOPIC, new StringDeserializer(),
        SerdesUtils.<KafkaError>getSerdesForValue().deserializer());
}
Also, the topology method feels wrong, as it's hard to understand what is required to override.
@Override
protected void topology(StreamsBuilder streamsBuilder) {
    new InfoFacCanonicalToAvroStream().topology(streamsBuilder);
}
The dlqTopic is a core feature of the Kstreamplify library.
Defining the DLQ in tests is a redundant operation that will always happen, and the correct DLQ name can be obtained from the dlqTopic method in KafkaStreamStarter.
So, in order to improve both aforementioned concepts, the required overridable topology() method can be deleted, and instead we should be able to write this in tests:
@Override
protected KafkaStreamsStarter getKafkaStreamsStarter() {
    return new InfoFacCanonicalToAvroStream();
}
@BeforeEach
void setUp() {
    infofacCanonicalInputTopic = createInputTestTopic(InfoFacTopicWithSerde.infoFacAllCanonical());
    infofacAvroOutputTopic = createOutputTestTopic(InfoFacTopicWithSerde.infoFacAllAvro());
}
The KafkaStreamsStarterTest generalSetUp method can then get the DLQ name from getKafkaStreamsStarter().dlq().
Please note that the generalSetUp method probably has to be reorganised, as calling the dlq() method before the StreamExecutionContext is fully set will lead to issues.
Some production or deserialization errors can be retriable.
The production error handler and the deserialization error handler should handle these errors accordingly.
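A minimal sketch of what "retriable" handling could look like. The names and the classification rule here are assumptions, not the library's API; in a real handler, Kafka's org.apache.kafka.common.errors.RetriableException hierarchy would be the natural marker, with TimeoutException standing in for a transient error below:

```java
import java.util.concurrent.TimeoutException;

// Hedged sketch: decide whether an error should be retried or sent to the DLQ.
// TimeoutException stands in here for a transient, retriable failure.
public class RetriableErrorSketch {
    public enum Outcome { RETRY, SEND_TO_DLQ }

    public static Outcome classify(Exception exception) {
        // Transient errors (e.g. broker timeouts) are worth retrying;
        // everything else goes to the dead letter queue.
        if (exception instanceof TimeoutException) {
            return Outcome.RETRY;
        }
        return Outcome.SEND_TO_DLQ;
    }
}
```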
Documentation on interactive queries does not showcase an actual implementation of an interactive query, leaving users scratching their heads.
A detailed explanation of how to interact with a store should be added to the README, at least mentioning the autowiring of the KafkaStreamsInitializer class to retrieve the kafkaStreams instance.
A link to a side project (?) showcasing the feature.
Integrating @loicgreffier's work on the subject for internal showcases into Kstreamplify is probably a very good addition to the library.
Code conventions are not always respected (e.g. this., static imports, ...). Overall, code quality should be enhanced.
The JsonToAvroConverter Java utility class is currently unable to parse an AVRO field of type "int" and logical type "date" (corresponding to the LocalDate Java type).
Indeed, in the populateFieldInRecordWithCorrespondingType Java method at line 214, there is no check on the AVRO field logical type:
The corresponding Java field (of type LocalDate) should be set with the right value.
Some AVRO schemas used at Michelin contain fields of type "int" and logical type "date" (corresponding to the LocalDate Java type).
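For context, the AVRO "date" logical type stores a date as the number of days since the Unix epoch in an int field, so the missing branch essentially amounts to the conversion below (the helper names are illustrative, not the converter's actual API):

```java
import java.time.LocalDate;

// Illustrative conversion for the AVRO "date" logical type:
// the int field holds days since 1970-01-01 (the Unix epoch).
public class AvroDateSketch {
    public static LocalDate fromAvroDate(int daysSinceEpoch) {
        return LocalDate.ofEpochDay(daysSinceEpoch);
    }

    public static int toAvroDate(LocalDate date) {
        return (int) date.toEpochDay();
    }
}
```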
When running with the Spring Boot dependency, the Kafka Streams properties are not injected.
The bridge between the core properties and the Spring Boot properties seems to be missing:
@Override
protected void initProperties() {
    // Set properties here
}
The lifecycle of Kstreamplify is not explained in the documentation and should be.
-> Explain the hooks, and how / what to use them for.
AppName suggests it could be linked to the applicationId, which is not the case.
Make it clearer that the property mentioned here is the property key under topic.prefix for a specific ns4kfk namespace.
Potentially, we might also want to add another constructor without the AppName so that it does not get confusing for people who are not using ns4kfk.
A lot of improvements have been brought to the main branch since the previous beta version spring-kafka-streams.
It should be merged into the 2.7.x
The current prefixing strategy is as such:
kafka.properties.prefix is used for application prefixing.
kafka.properties.topic.prefix is used for topic prefixing, as follows:
kafka:
  properties:
    prefix: "myNsPrefix."
    application.id: "my-app"
    topic:
      prefix:
        ns: "myNsPrefix."
        ns1: "otherNsPrefix1."
        ns2: "otherNsPrefix2."
At properties registration in KafkaStreamExecutionContext, the application.id property is prepended with the prefix value.
kafka.properties.topic.prefix are used to determine the namespace prefixing that needs to be applied to each TopicWithSerde.
kafka:
  properties:
    application.id: "my-app"
    prefix:
      self: "myNsPrefix."
      ns1: "otherNsPrefix1."
      ns2: "otherNsPrefix2."
The prefix.self property becomes the default for application prefixing and topics that belong to the same namespace.
As well as editing the way that the KafkaStreamExecutionContext retrieves the prefix for prepending, a method should be added to get the self prefix, in case topic definition is not done through TopicWithSerde.
TopicWithSerde should be edited so that a default constructor with 3 parameters (topicName, keySerde, valueSerde) is created. It would call the 4-parameter constructor with the "self" property key.
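The constructor chaining described above could look like this stand-in sketch. The serde types are elided as Object to keep the example self-contained, and the real TopicWithSerde signature may differ:

```java
// Stand-in sketch of the proposed constructor chaining on TopicWithSerde.
// Serde types are elided as Object; all names here mirror the proposal, not
// necessarily the library's actual API.
public class TopicWithSerdeSketch {
    final String prefixPropertyKey;
    final String topicName;
    final Object keySerde;
    final Object valueSerde;

    // Existing 4-parameter constructor: the prefix property key is explicit.
    public TopicWithSerdeSketch(String prefixPropertyKey, String topicName,
                                Object keySerde, Object valueSerde) {
        this.prefixPropertyKey = prefixPropertyKey;
        this.topicName = topicName;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    // Proposed 3-parameter constructor: defaults to the "self" prefix key.
    public TopicWithSerdeSketch(String topicName, Object keySerde, Object valueSerde) {
        this("self", topicName, keySerde, valueSerde);
    }
}
```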
Many Kafka Streams users are more at ease using JSON instead of AVRO. I would like to be able to use KStreamplify as easily with JSON as with AVRO.
I would like to rename (or deprecate) SerdesUtils in favor of AvroSerdesUtils (since the library has no major release yet!) and to create a JsonSerdesUtils, to easily get a generic Serdes for Avro or JSON in the same transparent and generic way.
A pull request will follow.
This Maven module should be merged, somehow, with the other modules.
In the testing section, the animation shows the old way of defining the topology inside the test class.
We should update the animation with the new getKafkaStreamsStarter method and mention the dlqTopic default as well.
Javadoc conventions are not always respected, and Javadoc is sometimes missing entirely. It should be improved.
The Javadoc of the first beta version of the library is currently automatically deployed here: https://javadoc.io/doc/io.github.michelin/spring-kafka-streams/latest/index.html
A badge to the Javadoc should be added to the README.md. Javadoc.io provides this possibility: https://javadoc.io/
Can I utilize custom exception handler classes for default.production.exception.handler and default.deserialization.exception.handler, as well as an UncaughtExceptionHandler? Despite my attempts, the exceptions are not reaching my handlers; rather, they are being handled by the internal Kafka Streams task. Additionally, it's worth noting that the exceptions reach my handlers successfully when not using kstreamplify.
Is this the expected behavior? If so, can it be enhanced to provide this flexibility of bringing your own exception handlers?
Deduplication utils need to be improved:
When doing some history replay, or when a huge amount of data is present, having a scheduled task with the frequency set at one hour does not cover all use cases.
It would be better to use a window store, like the dedupKeyValues method does, for all implementations.
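A toy in-memory sketch of the window-based idea; a real implementation would back this with a Kafka Streams WindowStore, as dedupKeyValues does, and the names below are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

// Toy sketch of time-window deduplication: a record is a duplicate only if the
// same key was seen within the window, instead of relying on a fixed-frequency
// purge task.
public class WindowedDedupSketch {
    private final Map<String, Long> lastSeen = new HashMap<>();
    private final long windowMs;

    public WindowedDedupSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    // Records the key's timestamp and reports whether it was already seen
    // inside the deduplication window.
    public boolean isDuplicate(String key, long timestampMs) {
        Long previous = lastSeen.put(key, timestampMs);
        return previous != null && timestampMs - previous < windowMs;
    }
}
```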
KStreamplify reads all configurations from the Spring Boot context and exposes those under the kafka.properties prefix in a static global object, KafkaStreamsExecutionContext.
protected void initProperties() {
    properties = PropertiesUtils.loadProperties();
    serverPort = (Integer) properties.get(SERVER_PORT_PROPERTY);
    kafkaProperties = PropertiesUtils.loadKafkaProperties(properties);
    KafkaStreamsExecutionContext.registerProperties(kafkaProperties);
}
I would like to be able to access all my properties through this object to manage some business or technical aspects that are not directly related to Kafka from my external configuration. Typically, a Kafka Streams application developed with KStreamplify uses Processor or Transformer classes, which are not directly managed by the Spring Application Context.
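A minimal sketch of the static access pattern being asked for, mirroring KafkaStreamsExecutionContext.registerProperties; the getter name is an assumption:

```java
import java.util.Properties;

// Sketch: a static context holding all loaded properties, so Processor and
// Transformer classes outside the Spring application context can still read
// business configuration.
public class ExecutionContextSketch {
    private static Properties properties = new Properties();

    public static void registerProperties(Properties props) {
        properties = props;
    }

    // Hypothetical accessor for a single property.
    public static String getProperty(String key) {
        return properties.getProperty(key);
    }
}
```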
Possible alternatives for implementation include:
The existing method WindowStoreUtils.get is not enough, because it only takes Instant.now(), which is bad practice.
We should have an additional method with a parameter for the initial instant from which we want to do the backwardFetch.
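The proposed overload essentially replaces Instant.now() with a caller-supplied instant; the bounds computation could look like this sketch (the method name and the retention handling are assumptions):

```java
import java.time.Duration;
import java.time.Instant;

// Sketch: compute the [from - retention, from] time range for a backwardFetch
// starting at a caller-supplied instant instead of Instant.now().
public class WindowStoreFetchSketch {
    public static Instant[] fetchBounds(Instant from, Duration retention) {
        return new Instant[] { from.minus(retention), from };
    }
}
```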
Add the possibility to override the name of a topic from the application properties
In the case of the Spring Boot implementation, the Kafka properties are empty.
Improve the overall unit test coverage.
Consider using JUnit/Mockito for unit tests.
A possible improvement would be running Kafka Streams on Testcontainers Kafka brokers (see).
Currently, it is possible to prefix topics and application.id with a single prefix.
It should be possible to handle multiple prefixes, in case of consuming a topic with a different prefix.
Join behavior has changed since 3.X, as described in the following links:
https://issues.apache.org/jira/browse/KAFKA-13813
https://confluentinc.atlassian.net/browse/KSTREAMS-5249
Custom utility methods need to be implemented, following a logic similar to the one for DedupUtils: the method needs to take a callback function describing what to do with the join once a match is found.
For left joins, the implementation can involve a window store on the right side, or even a timestampedKeyValueStore with a scheduled task for purging.
We could use polymorphism to make multiple levels of details:
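The callback shape described above could be as simple as this sketch: generic and in-memory, with store handling and purging left out, and all names illustrative rather than the library's API:

```java
import java.util.Optional;
import java.util.function.BiFunction;

// Sketch of a callback-style join helper: the caller supplies a function
// describing what to do once both sides of the join are present.
public class JoinSketch {
    public static <V1, V2, R> Optional<R> joinOnMatch(V1 left, V2 right,
                                                      BiFunction<V1, V2, R> joiner) {
        if (left == null || right == null) {
            // No match yet; a real implementation would buffer the present side
            // in a window store until the other side arrives or the window closes.
            return Optional.empty();
        }
        return Optional.of(joiner.apply(left, right));
    }
}
```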
A nice improvement would be running the library on Gitpod.
Add a getting started section to explain the goal of this library, and add a GIF to easily visualize how to use the library.
Currently, the KafkaError is an Avro object, meaning that anyone using this library needs to have a schema registry on their cluster and use it.
We can create a KafkaErrorInterface interface that requires getters and setters for topic, partition, offset, error stack, and message, with the default implementation being the KafkaError Avro POJO.
The override could be declared in the properties, just like the exception handlers for Kafka Streams.
We could also create a model and its serdes for all 3 main ways of interacting with Kafka (Avro, JSON & Protobuf).
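The interface could be sketched as follows, with the field set taken from the list above; the method names are assumptions, and the plain POJO shown is only one possible non-Avro implementation:

```java
// Sketch of the proposed KafkaErrorInterface: the Avro KafkaError would be the
// default implementation, but a plain POJO (below) would work without a
// schema registry.
public interface KafkaErrorSketchInterface {
    String getTopic();
    void setTopic(String topic);
    int getPartition();
    void setPartition(int partition);
    long getOffset();
    void setOffset(long offset);
    String getStack();
    void setStack(String stack);
    String getMessage();
    void setMessage(String message);
}

// Hypothetical schema-registry-free implementation.
class PlainKafkaError implements KafkaErrorSketchInterface {
    private String topic;
    private int partition;
    private long offset;
    private String stack;
    private String message;

    public String getTopic() { return topic; }
    public void setTopic(String topic) { this.topic = topic; }
    public int getPartition() { return partition; }
    public void setPartition(int partition) { this.partition = partition; }
    public long getOffset() { return offset; }
    public void setOffset(long offset) { this.offset = offset; }
    public String getStack() { return stack; }
    public void setStack(String stack) { this.stack = stack; }
    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }
}
```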
Improve consumption and production when custom serdes are used
Kstreamplify provides the necessary parameters to configure the application.server property required to implement interactive queries.
Unfortunately, Kstreamplify does not provide anything to simplify the implementation of the interactive queries themselves.
The purpose of this improvement proposal is to simplify the implementation of interactive queries, including the RPC (Remote Procedure Call).
Option #1
Provide an InteractiveQueriesService that can request a store and handle the RPC. Here is an implementation example using Spring Boot and the KafkaPerson Avro:
@Slf4j
@Service
public class InteractiveQueriesService {
    @Autowired
    private KafkaStreamsInitializer kafkaStreamsInitializer;

    private final RestTemplate restTemplate = new RestTemplate();

    public KafkaPerson getByKey(String key) {
        final var host = getHostByStoreAndKey(PERSON_INTERACTIVE_QUERIES_RPC_STATE_STORE.toString(),
            key, new StringSerializer());

        if (isNotCurrentHost(host)) {
            log.info("The key {} has been located on another instance ({}:{})", key,
                host.host(), host.port());
            return getRecordOnOtherInstance(host, "/api/v1/rpc/persons/" + key);
        }

        log.info("The key {} has been located on this instance ({}:{})", key,
            host.host(), host.port());

        final ReadOnlyKeyValueStore<String, KafkaPerson> store = kafkaStreamsInitializer.getKafkaStreams().store(
            StoreQueryParameters.fromNameAndType(PERSON_INTERACTIVE_QUERIES_RPC_STATE_STORE.toString(),
                QueryableStoreTypes.keyValueStore()));

        KafkaPerson person = store.get(key);
        if (person == null) {
            log.info("No person found for the key {}", key);
            return null;
        }

        return person;
    }

    public List<KafkaPerson> getAll() {
        final List<HostInfo> hosts = getHostsByStore(PERSON_INTERACTIVE_QUERIES_RPC_STATE_STORE.toString());
        if (hosts.isEmpty()) {
            log.info("No host found for the given state store {}", PERSON_INTERACTIVE_QUERIES_RPC_STATE_STORE);
            return Collections.emptyList();
        }

        List<KafkaPerson> persons = new ArrayList<>();
        hosts.forEach(host -> {
            if (isNotCurrentHost(host)) {
                log.info("Fetching data on other instance ({}:{})", host.host(), host.port());
                persons.addAll(getAllOnOtherInstance(host, "api/v1/rpc/persons/instance"));
            } else {
                log.info("Fetching data on this instance ({}:{})", host.host(), host.port());
                persons.addAll(getAllOnCurrentInstance()
                    .stream()
                    .map(entry -> entry.value)
                    .toList());
            }
        });

        return persons;
    }

    public List<KeyValue<String, KafkaPerson>> getAllOnCurrentInstance() {
        final ReadOnlyKeyValueStore<String, KafkaPerson> store = kafkaStreamsInitializer.getKafkaStreams().store(
            StoreQueryParameters.fromNameAndType(PERSON_INTERACTIVE_QUERIES_RPC_STATE_STORE.toString(),
                QueryableStoreTypes.keyValueStore()));

        List<KeyValue<String, KafkaPerson>> results = new ArrayList<>();
        try (KeyValueIterator<String, KafkaPerson> iterator = store.all()) {
            while (iterator.hasNext()) {
                results.add(iterator.next());
            }
        }

        return results;
    }

    private KafkaPerson getRecordOnOtherInstance(HostInfo host, String endpointPath) {
        try {
            return restTemplate
                .getForEntity(String.format("http://%s:%d/%s", host.host(), host.port(), endpointPath), KafkaPerson.class)
                .getBody();
        } catch (RestClientException e) {
            log.info("The other instance ({}:{}) threw an error. Cannot retrieve the entity", host.host(), host.port(), e);
            return null;
        }
    }

    private List<KafkaPerson> getAllOnOtherInstance(HostInfo host, String endpointPath) {
        KafkaPerson[] persons = restTemplate
            .getForEntity(String.format("http://%s:%d/%s", host.host(), host.port(), endpointPath), KafkaPerson[].class)
            .getBody();

        if (persons == null) {
            return Collections.emptyList();
        }

        return Arrays.asList(persons);
    }

    public List<HostInfo> getHostsByStore(final String store) {
        final Collection<StreamsMetadata> metadata = kafkaStreamsInitializer
            .getKafkaStreams()
            .streamsMetadataForStore(store);

        if (metadata == null || metadata.isEmpty()) {
            return Collections.emptyList();
        }

        return metadata
            .stream()
            .map(StreamsMetadata::hostInfo)
            .toList();
    }

    public <K> HostInfo getHostByStoreAndKey(final String store, final K key, final Serializer<K> serializer) {
        final KeyQueryMetadata metadata = kafkaStreamsInitializer
            .getKafkaStreams()
            .queryMetadataForKey(store, key, serializer);

        if (metadata == null) {
            return null;
        }

        return metadata.activeHost();
    }

    /**
     * Compare the given host to the host of the current KStream instance
     * @param compareHostInfo The host information to compare
     * @return True if the hosts are different, false if equals
     */
    private boolean isNotCurrentHost(HostInfo compareHostInfo) {
        return !kafkaStreamsInitializer.getHostInfo().host().equals(compareHostInfo.host())
            || kafkaStreamsInitializer.getHostInfo().port() != compareHostInfo.port();
    }
}
⚡ The implementation needs to be split between kstreamplify-core and kstreamplify-spring-boot. The Spring Boot module should reuse the core implementation as much as possible.
⚡ The KafkaPerson is an example and needs to be templatized.
⚡ The name of the store is hardcoded here and needs to be passed as a parameter, as well as the endpoint path to call on the other instance.
⚡ restTemplate comes from Spring Boot and probably needs to be changed, as the service will be implemented in the core module.
💡 Expected result
As a result, I want to be able to inject the service in my project:
@Autowired
private InteractiveQueryService interactiveQueryService;
...
interactiveQueryService.getByKey(store, key, path);
interactiveQueryService.getAll(store, path);
interactiveQueryService.getAllOnCurrentInstance(store);