softwaremill / kmq

Kafka-based message queue

Home Page: https://softwaremill.com/open-source/

License: Apache License 2.0

Languages: Java 43.22%, Scala 56.78%
Topics: kafka, message-queue, reactive-streams, java, scala, cats-effect

kmq's Introduction

Kafka Message Queue

Gitter chat: https://gitter.im/softwaremill/kmq | Maven Central

Using kmq you can acknowledge processing of individual messages in Kafka, and have unacknowledged messages re-delivered after a timeout.

This is in contrast to the usual Kafka offset-committing mechanism, which only lets you acknowledge all messages up to a given offset.

If you are familiar with Amazon SQS, kmq implements a similar message processing model.

How does this work?

For a more in-depth overview, see the blog post Using Kafka as a message queue; for performance benchmarks, see Kafka with selective acknowledgments (kmq) performance & latency benchmark.

The acknowledgment mechanism uses a marker topic, which should have the same number of partitions as the "main" data topic (called the queue topic). The marker topic is used to track which messages have been processed, by writing start/end markers for every message.

message flow diagram
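The start/end-marker bookkeeping can be sketched with a minimal in-memory tracker. This is illustrative only: the class and method names below are hypothetical and this is not kmq's actual implementation.

```java
import java.util.*;

// Minimal in-memory sketch of start/end-marker bookkeeping (hypothetical
// names; not kmq's actual classes). A start marker records when processing
// began; an end marker acknowledges the message; anything started but not
// ended within the timeout is due for redelivery.
class MarkerTracker {
    private final Map<Long, Long> startedAt = new HashMap<>(); // offset -> start-marker timestamp (ms)
    private final long timeoutMs;

    MarkerTracker(long timeoutMs) { this.timeoutMs = timeoutMs; }

    void startMarker(long offset, long timestampMs) { startedAt.put(offset, timestampMs); }

    void endMarker(long offset) { startedAt.remove(offset); } // message acknowledged

    // Offsets whose start marker is older than the timeout and has no end marker.
    List<Long> toRedeliver(long nowMs) {
        List<Long> due = new ArrayList<>();
        for (Map.Entry<Long, Long> e : startedAt.entrySet())
            if (nowMs - e.getValue() >= timeoutMs) due.add(e.getKey());
        Collections.sort(due);
        return due;
    }
}
```

In the real system this state is rebuilt by consuming the marker topic, one tracker state per partition, which is why the marker topic must have the same number of partitions as the queue topic.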

Using kmq

An application using kmq should consist of the following components:

  • a number of RedeliveryTrackers. This component consumes the marker topic and redelivers messages when appropriate. Multiple copies should be started in a cluster for fail-over. Partition assignment is automatic.
  • components which send data to the queue topic to be processed
  • queue clients, either custom or using the KmqClient

Maven/SBT dependency

SBT:

"com.softwaremill.kmq" %% "core" % "0.3.1"

Maven:

<dependency>
    <groupId>com.softwaremill.kmq</groupId>
    <artifactId>core_2.13</artifactId>
    <version>0.3.1</version>
</dependency>

Note: The supported Scala versions are: 2.12, 2.13.

Client flow

The flow of processing a message is as follows:

  1. read messages from the queue topic, in batches
  2. write a start marker to the markers topic for each message, wait until the markers are written
  3. commit the biggest message offset to the queue topic
  4. process messages
  5. for each message, write an end marker. No need to wait until the markers are written.

This ensures at-least-once processing of each message. Note that the acknowledgment of each message (writing the end marker) can be done for each message separately, out-of-order, from a different thread, server or application.
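The five steps above can be walked through with plain collections standing in for Kafka topics (an in-memory sketch, not the real client: actual code would use KafkaConsumer/KafkaProducer and wait for the start-marker sends to complete before committing):

```java
import java.util.*;

// In-memory walk-through of the five-step client flow. Plain lists stand in
// for Kafka topics; all names here are hypothetical.
class ClientFlow {
    record Msg(long offset, String payload) {}

    // Processes one batch; returns the offset committed for the queue topic.
    static long processBatch(List<Msg> batch, List<String> markersTopic, List<String> processed) {
        long maxOffset = -1;
        for (Msg m : batch) {                          // step 2: start marker per message
            markersTopic.add("start:" + m.offset());
            maxOffset = Math.max(maxOffset, m.offset());
        }
        long committed = maxOffset;                    // step 3: commit the biggest offset
        for (Msg m : batch) {
            processed.add(m.payload());                // step 4: process
            markersTopic.add("end:" + m.offset());     // step 5: end marker, no need to wait
        }
        return committed;
    }
}
```

If the process crashes between steps 3 and 5, the offsets are already committed, but the start markers without matching end markers ensure the messages are redelivered rather than lost.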

Example code

There are three example applications:

  • example-java/embedded: a single Java application that starts all three components (sender, client, redelivery tracker)
  • example-java/standalone: three separate runnable classes to start the different components
  • example-scala: an implementation of the client using reactive-kafka

Time & timestamps

How time is handled is crucial for message redelivery, as messages are redelivered after a given amount of time passes since the start marker was sent.

To track what was sent when, kmq uses Kafka's message timestamp. By default, this is the message's creation time (message.timestamp.type=CreateTime), but for the markers topic it is advisable to switch this to LogAppendTime. That way, the timestamps more closely reflect when the markers are actually written to the log, and are guaranteed to be monotonic within each partition (which is important for redelivery - see below).
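Assuming the markers topic already exists and is named markers (a hypothetical name), the timestamp type can be switched with Kafka's standard kafka-configs tool:

```shell
kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type topics --entity-name markers \
  --add-config message.timestamp.type=LogAppendTime
```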

To calculate which messages should be redelivered, we need to know the value of "now", to check which start markers have been sent later than the configured timeout. When a marker has been received from a partition recently, the maximum such timestamp is used as the value of "now" - as it indicates exactly how far we are in processing the partition. What "recently" means depends on the useNowForRedeliverDespiteNoMarkerSeenForMs config setting. Otherwise, the current system time is used, as we assume that all markers from the partition have been processed.
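The "now" selection described above amounts to a single conditional (hypothetical names; not kmq's actual code):

```java
// If a marker from the partition was seen within the
// useNowForRedeliverDespiteNoMarkerSeenForMs window, the newest marker
// timestamp serves as "now" (we are still catching up on the partition);
// otherwise wall-clock time is used (the partition is assumed drained).
class RedeliveryClock {
    static long now(long maxMarkerTs, long msSinceLastMarker,
                    long noMarkerSeenForMs, long systemNowMs) {
        return msSinceLastMarker < noMarkerSeenForMs ? maxMarkerTs : systemNowMs;
    }
}
```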

Dead letter queue (DLQ)

Redelivery of a message is attempted only a configured number of times (3 by default; change it by setting maxRedeliveryCount in KmqConfig). Once that number is exceeded, the message is forwarded to a topic acting as a dead letter queue. By default, the name of that topic is the name of the message topic with the suffix __undelivered; you can configure it by setting deadLetterTopic in KmqConfig. kmq tracks the number of redeliveries with a special header, named kmq-redelivery-count by default; you can change the name by setting redeliveryCountHeader in KmqConfig.
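The dead-letter decision can be sketched as follows. The defaults mirror the KmqConfig settings described above, but the routing helper itself is hypothetical:

```java
import java.util.*;

// Sketch of the dead-letter decision (defaults follow the KmqConfig settings
// described above; the routing helper itself is hypothetical).
class RedeliveryRouter {
    static final int MAX_REDELIVERY_COUNT = 3;                 // maxRedeliveryCount default
    static final String COUNT_HEADER = "kmq-redelivery-count"; // redeliveryCountHeader default

    // Returns the topic to send the message to, bumping the count header
    // when the message is redelivered to the queue topic.
    static String route(String queueTopic, Map<String, Integer> headers) {
        int count = headers.getOrDefault(COUNT_HEADER, 0);
        if (count >= MAX_REDELIVERY_COUNT)
            return queueTopic + "__undelivered";               // deadLetterTopic default
        headers.put(COUNT_HEADER, count + 1);
        return queueTopic;
    }
}
```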

kmq's People

Contributors

adamw, gitter-badger, jakubdziworski, katlasik, mergify[bot], mkrzemien, mohamedel59, softwaremill-ci


kmq's Issues

Explanation regarding re-delivery tracker

Hey,
I was using this project and have some doubts regarding the implementation of the re-delivery tracker and its limitations. My doubts are:

  1. As per the logic, if I spawn multiple redelivery trackers on one topic for one client, I should spawn them with the same group-id. Then each marker record would go to either of the trackers, which stores it in in-memory storage. My question is: do these multiple redelivery trackers share the same in-memory storage? If not, there would be a problem in case one tracker gets the start marker while the other gets the end marker.

  2. If it is in-memory storage, can it be accessed across multiple JVMs? That is, if multiple copies of kmq are running in different JVMs with the same group-id, all should be accessing the same storage.

update Kafka

  • Update Kafka dependencies from 2.x to 3.x
  • requires dropping support for Scala 2.12 due to version conflicts among dependencies

Kafka Exactly Once implications

Hi!

Awesome project! Can some information be added about how Kafka 0.11's exactly once functionality will or won't be incorporated into this work? Are they complementary or orthogonal?

Need explanation on redelivery

Hi,

Great project. I was looking for 'per message acknowledgement' for a project and this seems to be a brilliant approach. I wanted to know more about how the redelivery tracker works.

The following section from the docs was not very clear -

When a marker has been received from a partition recently, the maximum such timestamp is used as the value of "now" - as it indicates exactly how far we are in processing the partition. What "recently" means depends on the useNowForRedeliverDespiteNoMarkerSeenForMs config setting. Otherwise, the current system time is used, as we assume that all markers from the partition have been processed.

When a start marker for an offset has been added, how will the redelivery tracker know whether the end marker has arrived or not?
I am assuming that the redelivery tracker will scan the marker topic periodically from start to end.
It will check for start markers which are past their timeout; if there is no matching end marker, the message will be sent back to the main topic to be processed.
In the marker topic, how do we get rid of start markers which have been acknowledged? Do we commit them with the Kafka broker?

ConsumeMarkersActor is blocked when kafka becomes suddenly down

In my tests I stumbled upon a case where the actor system wouldn't shut down. It turned out that ConsumeMarkersActor was blocked by polling from Kafka (https://stackoverflow.com/questions/50268622/kafka-consumer-hangs-on-poll-when-kafka-is-down). This happens if you shut down Kafka before shutting down the actor.

Reactive kafka has already solved this issue (https://github.com/akka/alpakka-kafka/blob/master/core/src/main/resources/reference.conf#L57), so it would be a good idea to use the same approach.

EndOffset of a redelivery consumer

Just wanted to know the purpose of the disableRedeliveryBefore argument of the marker queue.

And as per the logic, I believe the following scenario would be a failure.
for code reference :

assignedPartitions += p -> AssignedPartition(new MarkersQueue(endOffset - 1), redeliverActor, None, None)

Case Scenario:
For a Topic(X) there is a marker topic(MT). One redelivery tracker is listening to the marker topic.
The order of markers on a partition is SM1, SM2, EM1, EM2, where SMx is the start marker for some key x and EMx is the end marker for that key. The redelivery tracker commits to Kafka up to EM2.
Then the redelivery tracker disconnects and a new redelivery consumer is created; in between, a new SM3 is published. But as the consumer connects after SM3, the endOffset/disableRedeliveryBefore for the partition's MarkersQueue is after the SM3 offset.

Problem:
SM3 would not be tracked by any redelivery tracker.

Possible Solution:
Rather than endOffset, the redelivery tracker can disable redelivery before the last committed offset.

simple http api

PoC of an HTTP API. Proposal: provide three operations:

  • SendMessage
  • ReceiveMessage
  • AcknowledgeMessage

Simplifications for the PoC:

  • Connect clients with an existing queue (topic)
  • No authorization

KMQ for production

Hello,

Are there any production environments in which this KMQ has been used? Please let me know.

Thanks,
Anil.

Make KafkaClients class more flexible

Hi!

I've started using your implementation and I really like it. The only question I have is why can't I add extra configuration settings when calling the createConsumer function, as opposed to createProducer which allows for such a thing (Map<String, Object> extraConfig argument).

TIA!

fix streams drain control

  • Use new streams in original IntegrationTest
  • Fix a bug in handling DrainControl in streams which results in IntegrationTest timeout.

Maximum tries for redelivery tracker

Hi,
Great work! Just wanted to enquire about any way to restrict the number of retries for redelivering a message to a queue, so that in case the client is down permanently, the topic would not grow beyond a certain fixed number of redelivered messages.

Possible correction in docs

To calculate which messages should be redelivered, we need to know the value of "now", to check which start markers have been sent later than the configured timeout.

Should 'start marker' be 'end marker' here?

update sbt

  • update sbt version
  • update list of plugins
  • scala versions matrix 2.12, 2.13
  • update lib versions (requires removal of scala 2.11 support)
  • expected output: project compiles, individual tests pass

add dead-letter queue

Follow up from #19

  • add dead-letter queue
  • update integration tests
  • parametrize dead-letter queue name

add retry count

Handle max retry count:

  • add retry count on redelivery
  • parametrize max retry count
  • update integration tests
