kstreamplify's Issues

⛔ DLQ topic definition in KafkaStreamsStarterTest

Problem

Today, when we want to define the DLQ topic for tests, we have to use the DLQ constant defined in KafkaStreamsStarterTest, which isn't documented anywhere and isn't very intuitive.

When using TopicWithSerde, it's even more confusing, as the syntax for defining the DLQ outputTestTopic is completely different:

    @BeforeEach
    void setUp() {
        infofacCanonicalInputTopic = createInputTestTopic(InfoFacTopicWithSerde.infoFacAllCanonical());
        infofacAvroOutputTopic = createOutputTestTopic(InfoFacTopicWithSerde.infoFacAllAvro());

        dlqTopic = testDriver.createOutputTopic(DLQ_TOPIC, new StringDeserializer(), SerdesUtils.<KafkaError>getSerdesForValue().deserializer());
    }

Also, the topology method feels wrong, as it's hard to understand what needs to be overridden.

    @Override
    protected void topology(StreamsBuilder streamsBuilder) {
        new InfoFacCanonicalToAvroStream().topology(streamsBuilder);
    }

Suggestion

The dlqTopic is a core feature of the Kstreamplify library.
Defining the DLQ in tests is a redundant operation that will always happen, and the correct DLQ name can be obtained from the dlqTopic method in KafkaStreamStarter.

So, in order to improve both aforementioned concepts, the required overridable topology() method can be deleted, and instead we should be able to write this in tests:

    @Override
    protected KafkaStreamsStarter getKafkaStreamsStarter() {
        return new InfoFacCanonicalToAvroStream();
    }

    @BeforeEach
    void setUp() {
        infofacCanonicalInputTopic = createInputTestTopic(InfoFacTopicWithSerde.infoFacAllCanonical());
        infofacAvroOutputTopic = createOutputTestTopic(InfoFacTopicWithSerde.infoFacAllAvro());
    }

The KafkaStreamsStarterTest generalSetUp method can then get the DLQ name from getKafkaStreamsStarter().dlq().

Please note that the generalSetUp method probably has to be reorganised, as calling the dlq() method before the StreamExecutionContext is fully set up will lead to issues.

Add retries on exception handler

Some production or deserialization errors can be retriable.

The production exception handler and the deserialization exception handler should handle these retriable errors accordingly.
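
A minimal sketch of what a retry-aware deserialization handler could look like, based on the standard Kafka Streams DeserializationExceptionHandler contract; the class name and retry policy are illustrative assumptions, not part of Kstreamplify:

    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.errors.RetriableException;
    import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
    import org.apache.kafka.streams.processor.ProcessorContext;

    // Hypothetical handler: fail on retriable (transient) errors so the record is not
    // skipped, and continue past non-retriable poison pills (which a Kstreamplify-style
    // handler would also forward to the DLQ).
    public class RetryAwareDeserializationExceptionHandler implements DeserializationExceptionHandler {

        @Override
        public DeserializationHandlerResponse handle(ProcessorContext context,
                                                     ConsumerRecord<byte[], byte[]> consumerRecord,
                                                     Exception exception) {
            if (exception instanceof RetriableException || exception.getCause() instanceof RetriableException) {
                return DeserializationHandlerResponse.FAIL;
            }
            return DeserializationHandlerResponse.CONTINUE;
        }

        @Override
        public void configure(Map<String, ?> configs) {
            // No configuration needed for this sketch
        }
    }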

Interactive queries documentation

Problem

The documentation on interactive queries does not showcase an actual implementation of an interactive query, leaving users scratching their heads.

Suggestion

A detailed explanation of how to interact with a store in the README, at least mentioning the autowiring of the KafkaStreamsInitializer class to retrieve the KafkaStreams instance (see the snippet below)

A link to a side project (?) showcasing the feature
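
For instance, the README could include a minimal snippet along these lines (store name, value type and service wiring are placeholders):

    @Autowired
    private KafkaStreamsInitializer kafkaStreamsInitializer;

    public String findByKey(String key) {
        // Retrieve the KafkaStreams instance managed by Kstreamplify and query a state store
        ReadOnlyKeyValueStore<String, String> store = kafkaStreamsInitializer
                .getKafkaStreams()
                .store(StoreQueryParameters.fromNameAndType("MY_STORE", QueryableStoreTypes.keyValueStore()));

        return store.get(key);
    }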

Additional Context

Integrating @loicgreffier's work on the subject for internal showcases into Kstreamplify is probably a very good addition to the library.

Improve code conventions

Code conventions are not always respected (e.g. use of this., static imports, ...). Overall, the code should be enhanced.

Cannot parse a date / time field in JsonToAvroConverter

Describe the bug

The JsonToAvroConverter Java utility class is currently unable to parse an AVRO field of type "int" and logical type "date" (corresponding to the LocalDate Java type).

Indeed, in the populateFieldInRecordWithCorrespondingType Java method at line 214, there is no check on the Avro field's logical type:

(screenshot of the populateFieldInRecordWithCorrespondingType method omitted)
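
A sketch of the kind of check that could be added, assuming the converter sees the Avro Schema of the field and the raw JSON value as a string; the method name and surrounding plumbing are illustrative, not the actual Kstreamplify code:

    import java.time.LocalDate;
    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;

    public class LogicalTypeCheckSketch {
        // Hypothetical helper: convert a JSON value to the right Java type for an Avro
        // "int" field carrying the "date" logical type (mapped to LocalDate / epoch days).
        static Object convertIntField(Schema fieldSchema, String jsonValue) {
            if (fieldSchema.getLogicalType() instanceof LogicalTypes.Date) {
                // e.g. "2023-07-12" -> LocalDate
                return LocalDate.parse(jsonValue);
            }
            return Integer.parseInt(jsonValue);
        }
    }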

Expected behavior

The corresponding Java field (of type LocalDate) should be set with the right value.

Environment

  • Java version: 17
  • Spring Boot version: 3.1.1
  • KStreamplify version: 1.0.1

Steps taken to try and solve the problem

  1. Run a unit test case and make it fail
  2. Add a check on the AVRO field logical type
  3. Run again the unit test case and make it succeed

Additional context

Some AVRO schemas used at Michelin contain fields of type "int" and logical type "date" (corresponding to the LocalDate Java type).

Not injected properties on Spring dependency

When running with the Spring Boot dependency, the Kafka Streams properties are not injected.

The bridge between the core properties and the Spring Boot properties seems to be missing:

    @Override
    protected void initProperties() {
        // Set properties here
    }
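
A minimal sketch of what that bridge could look like, assuming the kafka.properties block is bound to a Spring @ConfigurationProperties bean; the bean and its accessor are assumptions, only KafkaStreamsExecutionContext.registerProperties comes from the core module:

    // Hypothetical binding bean: collects everything under "kafka.properties"
    // from the Spring configuration (application.yml).
    @ConfigurationProperties(prefix = "kafka")
    public class KafkaConfigProperties {
        private Map<String, String> properties = new HashMap<>();

        public Map<String, String> getProperties() { return properties; }
        public void setProperties(Map<String, String> properties) { this.properties = properties; }
    }

    // Hypothetical bridge in the Spring Boot initializer
    @Autowired
    private KafkaConfigProperties kafkaConfigProperties;

    @Override
    protected void initProperties() {
        // Register the bound properties in the shared execution context so the core module sees them
        Properties kafkaProperties = new Properties();
        kafkaProperties.putAll(kafkaConfigProperties.getProperties());
        KafkaStreamsExecutionContext.registerProperties(kafkaProperties);
    }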

Documentation on lifecycle

Problem

The lifecycle of Kstreamplify is not explained in the documentation and should be.

-> Explain the available hooks, and how and what to use them for. A minimal example is sketched below.
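
For example, the documentation could show something like the following; topology() comes from the existing examples, while the dlqTopic() and onStart() signatures are assumptions about the KafkaStreamsStarter API, used purely for illustration:

    @Slf4j
    public class MyStream extends KafkaStreamsStarter {

        @Override
        protected void topology(StreamsBuilder streamsBuilder) {
            // Build the topology here
        }

        // Assumed: returns the DLQ topic name used by Kstreamplify error handling
        @Override
        public String dlqTopic() {
            return "MY_DLQ_TOPIC";
        }

        // Hypothetical lifecycle hook (assumed name): called once the KafkaStreams
        // instance has been created, e.g. to register listeners or warm up state.
        @Override
        public void onStart(KafkaStreams kafkaStreams) {
            log.info("Kafka Streams instance created, current state: {}", kafkaStreams.state());
        }
    }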

📝 TopicWithSerde improvements

Find a more accurate name for the variable "AppName" in TopicWithSerde

AppName suggests it could be linked to the applicationId, which is not the case.
Make it clearer that the property mentioned here is the property key under topic.prefix for a specific ns4kfk namespace.

Test the behavior with an empty "AppName" field

Potentially, we might also want to add another constructor without the AppName, so that it does not get confusing for people who are not using ns4kfk.

Update Spring Boot 2.7.x branch

Many improvements have been brought to the main branch since the previous beta version, spring-kafka-streams.

They should be merged into the 2.7.x branch.

🚂 Prefixing unification

The current prefixing strategy is as follows:
kafka.properties.prefix is used for application prefixing.

kafka.properties.topic.prefix is used for topic prefixing, as follows:

kafka:
  properties:
    prefix: "myNsPrefix."
    application.id: "my-app"
    topic:
      prefix:
        ns: "myNsPrefix."
        ns1: "otherNsPrefix1."
        ns2: "otherNsPrefix2."

At properties registration in KafkaStreamExecutionContext, the application.id property is prepended with the prefix value.

The kafka.properties.topic.prefix entries are used to determine the namespace prefix that needs to be applied to each TopicWithSerde.

💡 Both concepts are confusing and should be unified:

kafka:
  properties:
    application.id: "my-app"
    prefix:
      self: "myNsPrefix."
      ns1: "otherNsPrefix1."
      ns2: "otherNsPrefix2."

The prefix.self property becomes the default for application prefixing and topics that belong to the same namespace.

Implementations

  • As well as editing the way the KafkaStreamExecutionContext retrieves the prefix for prepending, a method should be added to get the self prefix, in case topic definition is not done through TopicWithSerde.

  • TopicWithSerde should be edited so that a default constructor with 3 parameters (topicName, keySerde, valueSerde) is created. It would call the 4-parameter constructor with the "self" property key (see the sketch after this list).
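
A minimal sketch of that convenience constructor; the parameter order of the existing 4-parameter constructor is an assumption:

    /**
     * Convenience constructor defaulting the prefix property key to "self".
     * Hypothetical sketch; the delegated constructor's parameter order is assumed.
     */
    public TopicWithSerde(String topicName, Serde<K> keySerde, Serde<V> valueSerde) {
        this(topicName, "self", keySerde, valueSerde);
    }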

Handle JSON serialization & deserialization and JSON formatted generic error objects

Problem

Many Kafka Streams users are more at ease using JSON than Avro. I would like to be able to use KStreamplify just as easily with JSON as with Avro.

Suggestion

I would like to rename (or deprecate) SerdesUtils in favor of AvroSerdesUtils (since the library has no major release yet!) and to create a JsonSerdesUtils to easily get a generic Serde for Avro or JSON in the same transparent and generic way.
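
A minimal sketch of what a Jackson-based JsonSerdesUtils could look like; the class and method names are illustrative, not an existing Kstreamplify API:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.serialization.Serializer;

    // Hypothetical utility: builds a generic JSON Serde with Jackson, mirroring the way
    // SerdesUtils provides Avro Serdes today.
    public class JsonSerdesUtils {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        public static <T> Serde<T> getSerdesForValue(Class<T> targetClass) {
            Serializer<T> serializer = (topic, data) -> {
                try {
                    return data == null ? null : MAPPER.writeValueAsBytes(data);
                } catch (Exception e) {
                    throw new RuntimeException("JSON serialization failed", e);
                }
            };
            Deserializer<T> deserializer = (topic, bytes) -> {
                try {
                    return bytes == null ? null : MAPPER.readValue(bytes, targetClass);
                } catch (Exception e) {
                    throw new RuntimeException("JSON deserialization failed", e);
                }
            };
            return Serdes.serdeFrom(serializer, deserializer);
        }
    }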

Additional Context

A pull request will follow.

📄 Update documentation GIF for tests

Describe the bug

In the testing section, the animation shows the old way of defining the topology inside the test class.

Expected behavior

We should update the animation to show the new getKafkaStreamsStarter method, and mention the dlqTopic default as well.

Exceptions are not reaching custom handlers

Can I use custom exception handler classes for default.production.exception.handler and default.deserialization.exception.handler, as well as an UncaughtExceptionHandler? Despite my attempts, the exceptions are not reaching my handlers; they are being handled by the internal Kafka Streams task instead. It's also worth noting that the exceptions reach my handlers successfully when not using kstreamplify.

Is this the expected behavior? If so, can it be enhanced to provide the flexibility of bringing your own exception handlers? The configuration I am using is shown below the environment details.

  • Java 17
  • Spring Boot 3.2.2
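
For reference, the handlers are declared under the kafka.properties block as shown below (handler class names are placeholders); the observed behavior is that Kstreamplify seems to replace these entries with its own DLQ handlers:

kafka:
  properties:
    default.deserialization.exception.handler: com.example.MyDeserializationExceptionHandler
    default.production.exception.handler: com.example.MyProductionExceptionHandler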

‼ DeduplicationUtils improvement with window store

DeduplicationUtils needs to be improved:

When doing some history replay, or when a huge amount of data is present, having a scheduled task with a fixed one-hour frequency does not cover all use cases.

It would be better to use a window store, like the dedupKeyValues method does, for all implementations.

KafkaStreamsExecutionContext could expose all properties instead of only the ones under kafka.properties

Problem

KStreamplify reads all configurations from the Spring Boot context and exposes those under the kafka.properties prefix in a static global object, KafkaStreamsExecutionContext.

    protected void initProperties() {
        properties = PropertiesUtils.loadProperties();
        serverPort = (Integer) properties.get(SERVER_PORT_PROPERTY);
        kafkaProperties = PropertiesUtils.loadKafkaProperties(properties);
        KafkaStreamsExecutionContext.registerProperties(kafkaProperties);
    }

Suggestion

I would like to be able to access all my properties through this object to manage some business or technical aspects that are not directly related to Kafka from my external configuration. Typically, a Kafka Streams application developed with KStreamplify uses Processor or Transformer classes, which are not directly managed by the Spring Application Context.

Alternatives Considered

Possible alternatives for implementation include:

  • Hard-coding the kafka.properties prefix that determines what is exposed in the KafkaStreamsExecutionContext.
  • Keeping kafka.properties isolated but exposing all properties visible in the Spring Boot configuration context.
  • Creating a new application.properties static property category that contains both business and technical parameters we want to expose in the KafkaStreamsExecutionContext static class.
  • Using a separate class like ApplicationConfigurationContext, which contains only the non-Kafka business and technical parameters of the Kafka Streams application.

WindowStoreUtils method with timestamp

Problem

The existing WindowStoreUtils.get method is not enough, because it always uses Instant.now(), which is bad practice.

Suggestion

We should have an additional method with a parameter for the instant from which we want to do the backwardFetch. A sketch is shown below.
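
A minimal sketch of what such an overload could look like, using the Kafka Streams WindowStore API directly; the method name, parameters and lookback handling are assumptions, not the current WindowStoreUtils signature:

    import java.time.Duration;
    import java.time.Instant;
    import org.apache.kafka.streams.state.WindowStore;
    import org.apache.kafka.streams.state.WindowStoreIterator;

    public class WindowStoreUtilsSketch {
        // Hypothetical overload: fetch the most recent value for a key at or before a
        // caller-provided instant, instead of always anchoring the search on Instant.now().
        public static <V> V get(WindowStore<String, V> store, String key, Instant to, Duration lookback) {
            try (WindowStoreIterator<V> iterator = store.backwardFetch(key, to.minus(lookback), to)) {
                if (iterator.hasNext()) {
                    return iterator.next().value; // newest entry first with backwardFetch
                }
            }
            return null;
        }
    }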

Remap topic name

Add the possibility to override the name of a topic from the application properties, for example:
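
A possible shape for such a configuration; the remap block below is only an illustration of the idea, not an existing Kstreamplify property:

kafka:
  properties:
    topic:
      remap:
        logicalTopicName: "actual-topic-name"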

Improve unit test coverage

Improve the overall unit test coverage.

Consider using JUnit/Mockito for unit tests.

A possible improvement would be running a Kafka Streams application against Testcontainers Kafka brokers, along the lines of the sketch below.
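
A minimal sketch of a Testcontainers-based setup, assuming the Testcontainers Kafka module and JUnit 5; the image tag and test body are illustrative:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;
    import org.junit.jupiter.api.Assertions;
    import org.junit.jupiter.api.Test;
    import org.testcontainers.containers.KafkaContainer;
    import org.testcontainers.junit.jupiter.Container;
    import org.testcontainers.junit.jupiter.Testcontainers;
    import org.testcontainers.utility.DockerImageName;

    @Testcontainers
    class KafkaStreamsIntegrationTest {

        @Container
        private static final KafkaContainer KAFKA =
                new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));

        @Test
        void shouldRunTopologyAgainstRealBroker() {
            Properties properties = new Properties();
            properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "integration-test");
            properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());

            // Build and start the topology with these properties, produce test records,
            // then consume the output topics and assert on the results.
            Assertions.assertTrue(KAFKA.isRunning());
        }
    }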

Handle multiple prefixes for topics

Currently, it is possible to prefix topics and the application.id with a single prefix.

It should be possible to handle multiple prefixes, in case the application consumes a topic with a different prefix.

👉👈 Stream-stream left and outer join utility method to avoid 3.X grace period behavior

Problem

The join behavior has changed since 3.X, as described in the following links:
https://issues.apache.org/jira/browse/KAFKA-13813
https://confluentinc.atlassian.net/browse/KSTREAMS-5249

Suggestion

Custom utility methods need to be implemented, with a logic similar to DedupUtils: the method needs to take a callback function describing what to do with the join result once a match is found.

For left joins, the implementation can involve a window store on the right side, or even a TimestampedKeyValueStore with a scheduled purge task.
We could use polymorphism to offer multiple levels of detail:

  • default implementation with a window store and a Pair as the output
  • level 1: advanced implementation with a TimestampedKeyValueStore, a default purge frequency, and the join behavior as a function parameter
  • level 2: custom purge frequency and purge behavior as function parameters

Run on Gitpod

A nice improvement would be running the library on Gitpod

Implement an interface for KafkaError 📛

Problem

Currently, KafkaError is an Avro object, meaning that anyone using this library needs to have a schema registry on their cluster and use it.

Suggestion

We can create a KafkaErrorInterface interface that requires getters and setters for topic, partition, offset, error stack and message, with the default implementation being the KafkaError Avro POJO.
An override could be declared in the properties, just like the exception handlers for Kafka Streams.
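
A minimal sketch of such an interface, derived from the fields listed above; the accessor names and types are assumptions:

    // Hypothetical interface: the Avro KafkaError POJO would be the default implementation.
    public interface KafkaErrorInterface {
        String getTopic();
        void setTopic(String topic);

        Integer getPartition();
        void setPartition(Integer partition);

        Long getOffset();
        void setOffset(Long offset);

        String getStack();
        void setStack(String stack);

        String getMessage();
        void setMessage(String message);
    }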

Alternatives Considered

We could also create a model and its Serdes for all three main ways of interacting with Kafka (Avro, JSON and Protobuf).

Interactive Queries service

Kstreamplify provides the necessary parameters to configure the application.server property required to implement interactive queries.

Unfortunately, Kstreamplify does not provide anything to simplify the implementation of interactive queries.

The purpose of this improvement proposal is to simplify the implementation of interactive queries, including the RPC (Remote Procedure Call).

Option #1

Provide an InteractiveQueriesService that can request a store and handle the RPC. Here is an implementation example using Spring Boot and KafkaPerson Avro:

@Slf4j
@Service
public class InteractiveQueriesService {
    @Autowired
    private KafkaStreamsInitializer kafkaStreamsInitializer;

    private final RestTemplate restTemplate = new RestTemplate();

    public KafkaPerson getByKey(String key) {
        final var host = getHostByStoreAndKey(PERSON_INTERACTIVE_QUERIES_RPC_STATE_STORE.toString(),
                key, new StringSerializer());

        if (isNotCurrentHost(host)) {
            log.info("The key {} has been located on another instance ({}:{})", key,
                    host.host(), host.port());

            return getRecordOnOtherInstance(host, "/api/v1/rpc/persons/" + key);
        }

        log.info("The key {} has been located on this instance ({}:{})", key,
                host.host(), host.port());

        final ReadOnlyKeyValueStore<String, KafkaPerson> store = kafkaStreamsInitializer.getKafkaStreams().store(
                StoreQueryParameters.fromNameAndType(PERSON_INTERACTIVE_QUERIES_RPC_STATE_STORE.toString(),
                        QueryableStoreTypes.keyValueStore()));

        KafkaPerson person = store.get(key);
        if (person == null) {
            log.info("No person found for the key {}", key);
            return null;
        }

        return person;
    }

    public List<KafkaPerson> getAll() {
        final List<HostInfo> hosts = getHostsByStore(PERSON_INTERACTIVE_QUERIES_RPC_STATE_STORE.toString());

        if (hosts.isEmpty()) {
            log.info("No host found for the given state store {}", PERSON_INTERACTIVE_QUERIES_RPC_STATE_STORE);
            return Collections.emptyList();
        }

        List<KafkaPerson> persons = new ArrayList<>();
        hosts.forEach(host -> {
            if (isNotCurrentHost(host)) {
                log.info("Fetching data on other instance ({}:{})", host.host(), host.port());
                persons.addAll(getAllOnOtherInstance(host, "api/v1/rpc/persons/instance"));
            } else {
                log.info("Fetching data on this instance ({}:{})", host.host(), host.port());
                persons.addAll(getAllOnCurrentInstance()
                        .stream()
                        .map(entry -> entry.value)
                        .toList());
            }
        });

        return persons;
    }

    public List<KeyValue<String, KafkaPerson>> getAllOnCurrentInstance() {
        final ReadOnlyKeyValueStore<String, KafkaPerson> store = kafkaStreamsInitializer.getKafkaStreams().store(
                StoreQueryParameters.fromNameAndType(PERSON_INTERACTIVE_QUERIES_RPC_STATE_STORE.toString(),
                        QueryableStoreTypes.keyValueStore()));

        List<KeyValue<String, KafkaPerson>> results = new ArrayList<>();
        try (KeyValueIterator<String, KafkaPerson> iterator = store.all()) {
            while (iterator.hasNext()) {
                results.add(iterator.next());
            }
        }

        return results;
    }

    private KafkaPerson getRecordOnOtherInstance(HostInfo host, String endpointPath) {
        try {
            return restTemplate
                    .getForEntity(String.format("http://%s:%d/%s", host.host(), host.port(), endpointPath), KafkaPerson.class)
                    .getBody();
        } catch (RestClientException e) {
            log.info("The other instance ({}:{}) threw an error. Cannot retrieve the entity", host.host(), host.port(), e);
            return null;
        }
    }

    private List<KafkaPerson> getAllOnOtherInstance(HostInfo host, String endpointPath) {
        KafkaPerson[] persons = restTemplate
                .getForEntity(String.format("http://%s:%d/%s", host.host(), host.port(), endpointPath), KafkaPerson[].class)
                .getBody();

        if (persons == null) {
            return Collections.emptyList();
        }

        return Arrays.asList(persons);
    }

    public List<HostInfo> getHostsByStore(final String store) {
        final Collection<StreamsMetadata> metadata = kafkaStreamsInitializer
                .getKafkaStreams()
                .streamsMetadataForStore(store);

        if (metadata == null || metadata.isEmpty()) {
            return Collections.emptyList();
        }

        return metadata
                .stream()
                .map(StreamsMetadata::hostInfo)
                .toList();
    }

    public <K> HostInfo getHostByStoreAndKey(final String store, final K key, final Serializer<K> serializer) {
        final KeyQueryMetadata metadata = kafkaStreamsInitializer
                .getKafkaStreams()
                .queryMetadataForKey(store, key, serializer);

        if (metadata == null) {
            return null;
        }

        return metadata.activeHost();
    }

    /**
     * Compare the given host to the host of the current KStream instance
     * @param compareHostInfo The host information to compare
     * @return True if the hosts are different, false if equals
     */
    private boolean isNotCurrentHost(HostInfo compareHostInfo) {
        return !kafkaStreamsInitializer.getHostInfo().host().equals(compareHostInfo.host())
                || kafkaStreamsInitializer.getHostInfo().port() != compareHostInfo.port();
    }
}

➡ The implementation needs to be split between kstreamplify-core and kstreamplify-spring-boot. The Spring Boot module should reuse the core implementation as much as possible.
➡ The KafkaPerson type is an example and needs to be templatized.
➡ The name of the store is hardcoded here and needs to be passed as a parameter, as well as the endpoint path to call on the other instance.
➡ The restTemplate comes from Spring Boot and probably needs to be changed, as the service will be implemented in the core module.

💡 Expected result

As a result, I want to be able to inject the service in my project:

@Autowired
private InteractiveQueryService interactiveQueryService;

...

interactiveQueryService.getByKey(store, key, path);
interactiveQueryService.getAll(store, path);
interactiveQueryService.getAllOnCurrentInstance(store);
