strimzi / strimzi-kafka-bridge

An HTTP bridge for Apache Kafka®

License: Apache License 2.0


strimzi-kafka-bridge's Introduction

Strimzi

Run Apache Kafka on Kubernetes and OpenShift


Strimzi provides a way to run an Apache Kafka® cluster on Kubernetes or OpenShift in various deployment configurations. See our website for more details about the project.

Quick Starts

To get up and running quickly, check our Quick Starts for Minikube, OKD (OpenShift Origin) and Kubernetes Kind.

Documentation

Documentation for the current main branch as well as all releases can be found on our website.

Roadmap

The roadmap of the Strimzi Operator project is maintained as a GitHub Project.

Getting help

If you encounter any issues while using Strimzi, you can get help through our community channels.

Strimzi Community Meetings

You can join our regular community meetings; see our website for the schedule and related resources.

Contributing

You can contribute by:

  • Raising any issues you find using Strimzi
  • Fixing issues by opening Pull Requests
  • Improving documentation
  • Talking about Strimzi

All bugs, tasks and enhancements are tracked as GitHub issues. Issues which might be a good start for new contributors are marked with the "good-start" label.

The Dev guide describes how to build Strimzi. Before opening a PR, please make sure you understand how to test your changes by reading the Test guide.

The Documentation Contributor Guide describes how to contribute to Strimzi documentation.

If you want to get in touch with us before contributing, you can reach us through the same community channels.

License

Strimzi is licensed under the Apache License, Version 2.0.

Container signatures

From the 0.38.0 release, Strimzi containers are signed using the cosign tool. Strimzi currently does not use keyless signing or the transparency log. To verify a container, copy the following public key into a file (for example strimzi.pub, as used in the command below):

-----BEGIN PUBLIC KEY-----
MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAET3OleLR7h0JqatY2KkECXhA9ZAkC
TRnbE23Wb5AzJPnpevvQ1QUEQQ5h/I4GobB7/jkGfqYkt6Ct5WOU2cc6HQ==
-----END PUBLIC KEY-----

And use it to verify the signature:

cosign verify --key strimzi.pub quay.io/strimzi/operator:latest --insecure-ignore-tlog=true

Software Bill of Materials (SBOM)

From the 0.38.0 release, Strimzi publishes the software bill of materials (SBOM) of our containers. The SBOMs are published as an archive with SPDX-JSON and Syft-Table formats signed using cosign. For releases, they are also pushed into the container registry. To verify the SBOM signatures, please use the Strimzi public key:

-----BEGIN PUBLIC KEY-----
MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAET3OleLR7h0JqatY2KkECXhA9ZAkC
TRnbE23Wb5AzJPnpevvQ1QUEQQ5h/I4GobB7/jkGfqYkt6Ct5WOU2cc6HQ==
-----END PUBLIC KEY-----

You can use it to verify the signature of the SBOM files with the following command:

cosign verify-blob --key cosign.pub --bundle <SBOM-file>.bundle --insecure-ignore-tlog=true <SBOM-file>

Strimzi is a Cloud Native Computing Foundation incubating project.


strimzi-kafka-bridge's People

Contributors

abhijit-mane, antonio-pedro99, dependabot[bot], elakito, examin, frawless, granzoto, haijun2022, hifly81, im-konge, kamaci, krishvoor, kun-lu20, mandarjkulkarni, noamichael, paulrmellor, pinkyrathore, ppatierno, pushpithadilhan, scholzj, see-quick, serrss, shubhamrwt, shubhamvashisht, sknot-rh, srajmane, taikulawo, tombentley, vbusch, zoidyzoidzoid


strimzi-kafka-bridge's Issues

Discuss a JSON format for serializing/deserializing AMQP messages to/from Kafka

Discuss the possibility of using a JSON format to serialize/deserialize AMQP messages to/from Kafka. The current version takes only the AMQP message body and sends it to Kafka as the message payload, and does the same in the opposite direction; application and system properties are lost.
Should they be preserved inside the Kafka message using a specific serialization format (e.g. JSON)? A hypothetical envelope is sketched below.
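Purely as an illustration, here is a hypothetical envelope sketched with Jackson, showing how the body plus application and system properties could be preserved in a single JSON payload (the field names are invented for this sketch):

    import java.util.Map;
    import com.fasterxml.jackson.databind.ObjectMapper;

    class AmqpJsonEnvelope {
        // Serializes the body and properties into one JSON document;
        // Jackson base64-encodes the byte[] body by default.
        static byte[] serialize(byte[] body,
                                Map<String, Object> applicationProperties,
                                Map<String, Object> systemProperties) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            return mapper.writeValueAsBytes(Map.of(   // assumes non-null maps
                    "body", body,
                    "applicationProperties", applicationProperties,
                    "systemProperties", systemProperties));
        }
    }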

Flow control on sink side when sending messages to a receiver

Consider handling flow control on the AMQP receiver side.

The AMQP sender, which receives messages from the Kafka consumer (via the internal queue), needs to take into account flow control and the credit granted by the receiver.
Consider defining a maximum size for the internal queue (related to link credit?) and pausing/resuming the Kafka consumer (stopping polling and committing), as sketched below.
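A minimal sketch of what that pause/resume could look like with the plain Kafka consumer API (MAX_QUEUE_SIZE is a hypothetical bound, not something the bridge defines today):

    import org.apache.kafka.clients.consumer.KafkaConsumer;

    class FlowControl {
        private static final int MAX_QUEUE_SIZE = 1024; // hypothetical bound

        // Pause fetching when the internal queue outgrows the bound,
        // resume once the AMQP receiver has drained it.
        static void applyBackpressure(KafkaConsumer<String, byte[]> consumer, int queueSize) {
            if (queueSize > MAX_QUEUE_SIZE) {
                consumer.pause(consumer.assignment()); // poll() stops returning records
            } else {
                consumer.resume(consumer.paused());    // fetching starts again
            }
        }
    }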

Selectively disable the source and sink parts of the bridge

Could it be useful to configure the bridge so that the source and sink parts can be enabled independently?
A bridge instance could be configured only for receiving from AMQP senders and sending to Kafka, while a different instance (on a different port) only reads from Kafka and sends to AMQP receivers.

Sending messages to nonexistent topics

By default Kafka will create topics automatically when messages are sent to a topic which doesn't already exist. But the cluster can be configured not to do this.

Currently the SourceBridgeEndpoint will happily handle a connection for a topic that doesn't exist. If the message is remotely settled then we call write() (the no-callback version) and the message is lost. If the message is not remotely settled then we reject the delivery, though with a less-than-helpful error message ("Failed to update metadata after XXXms").

We could check for the existence of the topic when setting up the link (in handle()), but:

  • The KafkaProducer API doesn't have an ideal method for this, only partitionsFor(), which would also result in a poor error (see the sketch after this question).
  • Currently we don't even require the receiver's target to have an address set (i.e. we don't know the topic yet).

Are the current semantics acceptable?
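For reference, a sketch of the only pre-flight check the producer API offers, with the caveats discussed above (note that, with auto-creation enabled on the broker, the metadata request behind partitionsFor() may itself create the topic):

    import java.util.List;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.errors.TimeoutException;

    class TopicProbe {
        static boolean topicExists(KafkaProducer<String, byte[]> producer, String topic) {
            try {
                List<PartitionInfo> partitions = producer.partitionsFor(topic);
                return partitions != null && !partitions.isEmpty();
            } catch (TimeoutException e) {
                // partitionsFor() blocks up to max.block.ms and then fails with
                // the unhelpful "Failed to update metadata" error mentioned above.
                return false;
            }
        }
    }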

Proposal for specifying the message key with AMQP group-id

Currently the message key is specified through the custom annotation:

x-opt-bridge.key

We could use the system property group-id for that, leaving the x-opt-bridge.partition custom annotation for specifying the partition.

So:

  • for the key: group-id is used, or if it's not specified, x-opt-bridge.key is used
  • for the partition: it's possible only using x-opt-bridge.partition

A sketch of this resolution order follows.
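A minimal sketch, assuming the proton-j Message API (the annotation names are the ones from this issue):

    import org.apache.qpid.proton.amqp.Symbol;
    import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
    import org.apache.qpid.proton.message.Message;

    class KeyResolver {
        // The group-id system property wins; fall back to the custom annotation.
        static Object resolveKey(Message message) {
            if (message.getGroupId() != null) {
                return message.getGroupId();
            }
            return annotation(message, "x-opt-bridge.key");
        }

        // The partition is only ever taken from the custom annotation.
        static Object resolvePartition(Message message) {
            return annotation(message, "x-opt-bridge.partition");
        }

        private static Object annotation(Message message, String name) {
            MessageAnnotations annotations = message.getMessageAnnotations();
            return annotations == null ? null : annotations.getValue().get(Symbol.valueOf(name));
        }
    }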

Multiple Verticles

Currently the bridge consists of one Vert.x Verticle (the Bridge itself). This means that there's just one thread running all the code for both source and sink, across all the endpoints. At some scale that will become a bottleneck. Within OpenShift we can just scale up by using more pods, but it's wasteful to require more pods to tackle something which could be (easily?) tackled at the application layer.

To use more threads for the processing of messages we basically have to have more verticles in the application, and the obvious candidates for those are the endpoints (see the sketch below).

The prerequisite for that refactoring is confirming that the current scaling limit is indeed a problem.
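A minimal sketch of what that could look like, assuming the endpoints were moved into their own verticle class (EndpointVerticle here is hypothetical); DeploymentOptions.setInstances() lets Vert.x distribute the instances across its event-loop threads:

    import io.vertx.core.DeploymentOptions;
    import io.vertx.core.Vertx;

    public class BridgeMain {
        public static void main(String[] args) {
            Vertx vertx = Vertx.vertx();
            // One verticle instance per core; Vert.x spreads them over event loops.
            DeploymentOptions options = new DeploymentOptions()
                    .setInstances(Runtime.getRuntime().availableProcessors());
            vertx.deployVerticle("com.example.EndpointVerticle", options);
        }
    }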

Improving the root pom.xml file

Improve the root pom.xml file in order to handle:

  • common properties for all related modules
  • common plugins

Add logic to consume from a specific partition starting at a specific offset

Add the possibility on the AMQP receiver side to specify partition and offset as filters:

rhiot.io:partition-filter:ulong
rhiot.io:offset-filter:uint

in order to assign that partition to the related KafkaConsumer and start reading from the specified offset (see the sketch after this list).

The following use cases are possible with different combinations:

  • partition + offset: the consumer starts to read from that partition at the specified offset
  • partition: the consumer starts to read from that partition at the last committed offset
  • offset: error! set the error condition and detach the link
  • nothing: current behavior. The Kafka consumer receives partitions after a re-balancing by the Kafka server and starts to read from the last committed offset

Possible errors and behavior:

  • partition doesn't exist: set the error condition and detach the link
  • negative partition or offset: set the error condition and detach the link
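For the partition + offset and partition-only cases, the underlying consumer calls would look roughly like this (a sketch with the plain Kafka consumer API; assign() bypasses group rebalancing and seek() sets the starting offset):

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    class PartitionOffsetAssign {
        static void assignAndSeek(KafkaConsumer<String, byte[]> consumer,
                                  String topic, int partition, long offset) {
            TopicPartition tp = new TopicPartition(topic, partition);
            consumer.assign(Collections.singletonList(tp));
            // Partition-only case: skip seek() and resume from the
            // last committed offset instead.
            consumer.seek(tp, offset);
        }
    }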

[Question] Lag between current offset and log end offset

Is the lag between current offset and log end offset expected as a result of #23?

E.g. Before creating a bridge receiver, the offset state on a topic is -
current offset = 24
log end offset = 24

After connecting to bridge, when a new message is consumed from topic and successfully delivered, the offset state is -
current offset = 24
log end offset = 25

Discussion on closing the AMQP link when no partitions are available

Currently, if an AMQP receiver link is attached but no partitions are available on the topic because they are already assigned to other receivers, the attach request is refused (detached).
Is this correct? Or could the link remain attached? The receiver wouldn't receive any messages, of course, but when other receivers disconnect, the Kafka coordinator would reassign partitions and the new receiver would start to receive messages.

KafkaProducer first "send" blocks if the Kafka server isn't online

The KafkaProducer send() method should always be asynchronous, as stated in the official documentation. It seems that the first invocation blocks if the Kafka server isn't online, so it isn't asynchronous.
Raised on the Kafka users mailing list.
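The block comes from the initial metadata fetch inside send(), which is bounded by the producer's max.block.ms setting; here is a sketch of configuring a shorter bound so an offline broker fails fast instead of hanging (the broker address and timeout are illustrative):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    class BoundedProducer {
        static KafkaProducer<String, byte[]> create() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000"); // fail after 5s instead of blocking
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            return new KafkaProducer<>(props);
        }
    }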

Source Endpoint loses receiver reference on new attach

The Source Endpoint has only one field for a receiver link. Every time a new link is attached (from a remote AMQP sender), the receiver link instance is overwritten and the previous link reference is lost.
Today delivery continues to work because all the handling is based on per-delivery objects like ProtonDelivery (for sending the disposition) and Message.

The offset state is not updated correctly when using FullOffsetTracker

Scenario (occurs only when using FullOffsetTracker):

Topic name: "topic_bridge"
Number of partitions: 2

  1. In SinkBridgeEndpoint.java, change the offset tracker to FullOffsetTracker as a solution to out-of-order acknowledgement.

  2. Re-compile/install the project and start the bridge:
    java -jar target/amqp-kafka-bridge-1.0-SNAPSHOT.jar

  3. In enmasse.kafka.bridge.example.BridgeReceiver.java, update the topic and consumer group name:
    TOPIC = topic_bridge
    GROUP_ID_PREFIX = topic_bridge_consumer_group_1

  4. Run BridgeReceiver.java

  5. Check the offset state in the topic:
    ./bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group topic_bridge_consumer_group_1

output >
PARTITION = 0, CURRENT-OFFSET = 2, LOG-END-OFFSET = 2, LAG = 0
PARTITION = 1, CURRENT-OFFSET = 1, LOG-END-OFFSET = 1, LAG = 0

  6. Produce 5 messages in the topic and, let us say, all of them go to partition 0

  7. Then check the offset state again:
    ./bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group topic_bridge_consumer_group_1

output >
PARTITION = 0, CURRENT-OFFSET = 2, LOG-END-OFFSET = 7, LAG = 5
PARTITION = 1, CURRENT-OFFSET = 1, LOG-END-OFFSET = 1, LAG = 0

Link properties for setting Kafka consumer configuration

When an AMQP sender attaches a link in order to send messages to the bridge, it could be interesting to use the attach "properties" to carry some specific configuration for the related internal KafkaConsumer that will be used to read from the Kafka server.

What do you think @grs?

Mapping "/" into "." in the topic name

Apache Kafka doesn't support the "/" character in topic names. In order to support AMQP addresses containing "/", the bridge should map "/" to the "." character, as in the sketch below.
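A minimal sketch of that mapping:

    class AddressMapper {
        // Kafka topic names cannot contain '/', so map it to '.'.
        static String toTopic(String amqpAddress) {
            return amqpAddress.replace('/', '.'); // "sensors/room1" -> "sensors.room1"
        }
    }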

Remove the need for "groupid" if partition and offset are specified for the consumer

When a consumer wants to get messages from a specific partition of a topic (starting from an offset), it doesn't need to attach at [topic]/group.id/[groupid], because the group.id isn't needed in that case (the consumer doesn't belong to any consumer group). In this case it's enough to specify [topic] as the address to attach to and then use message annotations for partition and offset.

Sink endpoint should close the link if "group.id" isn't specified

On handling an incoming link attachment from a receiver, the sink endpoint MUST handle "group.id" resolution.
Currently it crashes if "group.id" isn't specified, but it should catch the error and close the link with an appropriate ErrorCondition (bridge:no-group-id), as sketched below.
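A sketch of that close, assuming the vertx-proton link object the sink endpoint holds (the error description text is illustrative):

    import io.vertx.proton.ProtonSender;
    import org.apache.qpid.proton.amqp.Symbol;
    import org.apache.qpid.proton.amqp.transport.ErrorCondition;

    class GroupIdGuard {
        // Close the link with the bridge:no-group-id error instead of crashing.
        static void rejectMissingGroupId(ProtonSender sender) {
            sender.setCondition(new ErrorCondition(
                    Symbol.getSymbol("bridge:no-group-id"),
                    "Mandatory group.id not specified in the address"));
            sender.close();
        }
    }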

Discussion on the AMQP partition annotation

Let's discuss whether the current implementation, which uses an AMQP message annotation for specifying the partition on the sender side, is good, or whether it would be better to have this info inside the address.

The current implementation uses "x-opt-bridge.partition" as a message annotation.
Could it be better to use something like "my_topic/partitions/[partition_id]"?

The first (current) approach seems simpler to handle on the bridge side (quick access to annotations inside the message), but aren't annotations rarely used on the client side? The second one is friendlier to the sender client (thinking in terms of URIs) but "more complex" for the bridge because of the address parsing needed.

If a detach is going to be sent, the attach has to have source "null"

Thanks to a JMS receiver example, where the error on message consumer creation wasn't caught when all partitions are assigned (so no other receiver can be attached), I found out that the AMQP spec wants an attach with source "null" followed by a detach. Currently the source isn't null before detaching. A sketch of the refusal follows.
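A minimal sketch of the spec-compliant refusal, assuming vertx-proton: answer the remote attach with a null source before detaching:

    import io.vertx.proton.ProtonSender;

    class LinkRefusal {
        static void refuse(ProtonSender sender) {
            sender.setSource(null); // the attach goes out with source null ...
            sender.open();
            sender.close();         // ... followed by the detach
        }
    }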
