
spring-cloud-stream-binder-kafka's Introduction


ANNOUNCEMENT

IMPORTANT: This repository has now been migrated into core Spring Cloud Stream - https://github.com/spring-cloud/spring-cloud-stream. Please create new issues over at the core repository.

Apache Kafka Binder

Usage

To use the Apache Kafka binder, you need to add spring-cloud-stream-binder-kafka as a dependency to your Spring Cloud Stream application, as shown in the following example for Maven:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

Alternatively, you can also use the Spring Cloud Stream Kafka Starter, as shown in the following example for Maven:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

Apache Kafka Streams Binder

Usage

To use the Apache Kafka Streams binder, you need to add spring-cloud-stream-binder-kafka-streams as a dependency to your Spring Cloud Stream application, as shown in the following example for Maven:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>

Appendices

Appendix A: Building

Basic Compile and Test

To build the source you will need to install JDK 1.7.

The build uses the Maven wrapper, so you don’t have to install a specific version of Maven. To enable the tests, you should have a Kafka server 0.9 or above running before building. See below for more information on running the servers.

The main build command is

$ ./mvnw clean install

You can also add '-DskipTests' if you like, to avoid running the tests.
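For example:

$ ./mvnw clean install -DskipTests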

Note
You can also install Maven (>=3.3.3) yourself and run the mvn command in place of ./mvnw in the examples below. If you do that you also might need to add -P spring if your local Maven settings do not contain repository declarations for spring pre-release artifacts.
Note
Be aware that you might need to increase the amount of memory available to Maven by setting a MAVEN_OPTS environment variable with a value like -Xmx512m -XX:MaxPermSize=128m. We try to cover this in the .mvn configuration, so if you find you have to do it to make a build succeed, please raise a ticket to get the settings added to source control.
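For example, on a Unix-like shell:

$ export MAVEN_OPTS="-Xmx512m -XX:MaxPermSize=128m"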

The projects that require middleware generally include a docker-compose.yml, so consider using Docker Compose to run the middleware servers in Docker containers.

Documentation

There is a "full" profile that will generate documentation.

Working with the code

If you don’t have an IDE preference, we recommend that you use Spring Tool Suite or Eclipse when working with the code. We use the m2eclipse Eclipse plugin for Maven support. Other IDEs and tools should also work without issue.

Importing into eclipse with m2eclipse

We recommend the m2eclipse Eclipse plugin when working with Eclipse. If you don’t already have m2eclipse installed, it is available from the Eclipse Marketplace.

Unfortunately, m2e does not yet support Maven 3.3, so once the projects are imported into Eclipse you will also need to tell m2eclipse to use the .settings.xml file for the projects. If you do not do this, you may see many different errors related to the POMs in the projects. Open your Eclipse preferences, expand the Maven preferences, and select User Settings. In the User Settings field, click Browse and navigate to the Spring Cloud project you imported, selecting the .settings.xml file in that project. Click Apply and then OK to save the preference changes.

Note
Alternatively you can copy the repository settings from .settings.xml into your own ~/.m2/settings.xml.

Importing into eclipse without m2eclipse

If you prefer not to use m2eclipse you can generate eclipse project metadata using the following command:

$ ./mvnw eclipse:eclipse

The generated Eclipse projects can be imported by selecting Import Existing Projects from the File menu.

Contributing

Spring Cloud is released under the non-restrictive Apache 2.0 license and follows a very standard GitHub development process, using the GitHub tracker for issues and merging pull requests into master. If you want to contribute even something trivial, please do not hesitate, but follow the guidelines below.

Sign the Contributor License Agreement

Before we accept a non-trivial patch or pull request we will need you to sign the contributor’s agreement. Signing the contributor’s agreement does not grant anyone commit rights to the main repository, but it does mean that we can accept your contributions, and you will get an author credit if we do. Active contributors might be asked to join the core team, and given the ability to merge pull requests.

Code Conventions and Housekeeping

None of these is essential for a pull request, but they will all help. They can also be added after the original pull request but before a merge.

  • Use the Spring Framework code format conventions. If you use Eclipse you can import formatter settings using the eclipse-code-formatter.xml file from the Spring Cloud Build project. If using IntelliJ, you can use the Eclipse Code Formatter Plugin to import the same file.

  • Make sure all new .java files have a simple Javadoc class comment with at least an @author tag identifying you, and preferably at least a paragraph on what the class is for.

  • Add the ASF license header comment to all new .java files (copy from existing files in the project)

  • Add yourself as an @author to the .java files that you modify substantially (more than cosmetic changes).

  • Add some Javadocs and, if you change the namespace, some XSD doc elements.

  • A few unit tests would help a lot as well — someone has to do it.

  • If no-one else is using your branch, please rebase it against the current master (or other target branch in the main project).

  • When writing a commit message, please follow these conventions. If you are fixing an existing issue, please add Fixes gh-XXXX at the end of the commit message (where XXXX is the issue number). See the example below.
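For illustration, a commit message following these conventions might look like this (the subject line and issue number are made up):

  Validate DLQ group names against Kafka topic name rules

  Fixes gh-1234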

spring-cloud-stream-binder-kafka's People

Contributors

aldex32, amanzag, arnaudjardine, artembilan, bewithvk, garyrussell, hekonsek, ilayaperumalg, imechemi, marcingrzejszczak, mbogoevici, ncheema, nico-javadev, olegz, onobc, porshkevich, robertomatas, sarathshyam, sflandergan, sobychacko, spring-builds, spring-operator, sserhii-95, tdanylchuk, tellisnz, top-cat, ultimaphoenix, viniciusccarvalho, walliee, yanayita


spring-cloud-stream-binder-kafka's Issues

Creating a DLQ fails if group name does not pass topic name validations

When the DLQ is created, the pattern to create the topic name is:

  • error.<destination>.<group>

The <group> part refers to the current group name, and this value is not validated with the same rules as the topic name (via the org.springframework.cloud.stream.binder.kafka.KafkaTopicUtils#validateTopicName method).

So, if the group name contains special characters such as ":", the DLQ topic creation fails.

I would like to contribute a fix, but I don't know which approach would be better:

  1. Validate the group name with the KafkaTopicUtils class if the enableDlq option is enabled
  2. Filter the group name on the fly to create a valid DLQ topic name.

Any suggestions?
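For illustration only, here is a minimal sketch of option 2, sanitizing the group name before building the DLQ topic name. The helper method and replacement rule are hypothetical, not part of the binder's actual API:

private String buildDlqTopicName(String destination, String group) {
    // Replace any character that is not legal in a Kafka topic name with '-'
    // (Kafka topic names may contain ASCII alphanumerics, '.', '_' and '-').
    String safeGroup = group.replaceAll("[^a-zA-Z0-9._-]", "-");
    return "error." + destination + "." + safeGroup;
}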

KafkaBinderHealthIndicator - KafkaConsumer properties problem - spring-cloud-stream-binder-kafka version 1.1.0.BUILD-SNAPSHOT

Originally reported by @justcoon in spring-cloud-stream.

spring-cloud-stream-binder-kafka version 1.1.0.BUILD-SNAPSHOT

The Kafka health check crashes with:

Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "value.deserializer" which has no default value.
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:148)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:49)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:56)
at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:336)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:512)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:494)
at org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator.health(KafkaBinderHealthIndicator.java:58)

In KafkaBinderHealthIndicator:

@Override
public Health health() {
    Map<String, String> properties = new HashMap<>();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties
            .getKafkaConnectionString());
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    KafkaConsumer metadataConsumer = new KafkaConsumer(properties);

The configuration for ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG is probably missing.
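A minimal sketch of a possible fix, assuming the ByteArrayDeserializer used for keys is also acceptable for values (this mirrors the snippet above and is not the final implementation):

Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
        this.configurationProperties.getKafkaConnectionString());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
// Supply the missing value deserializer so that ConsumerConfig validation passes.
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
KafkaConsumer<byte[], byte[]> metadataConsumer = new KafkaConsumer<>(properties);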

Missing Spring core version

I just cloned the project and tried to open it in IntelliJ IDEA. I get this error on each pom.xml in this project: For artifact {org.springframework:spring-core:null:jar}: The version cannot be empty.
Any solution?

Support late binding for Kafka

Moved from: spring-cloud/spring-cloud-stream#590

Originally created by @vsrahul82:

Our Spring Boot apps try to connect to the Kafka servers at application startup. If the Kafka server is down or its name is misspelled, the application does not start up. Is there a way to prevent the Spring Boot app from trying to connect to the Kafka server, so that the application starts up successfully?

Support for new Producer/Consumer without Zookeeper connection

Hi,

Is there any plan to support using the new Consumer / new Producer configuration without having to specify the connection to Zookeeper? http://kafka.apache.org/documentation.html#producerconfigs

It looks like it defaults to localhost in KafkaBinderConfigurationProperties and is treated as an expected setting.

In some Kafka configurations, particularly when using ACLs, the connection to Zookeeper will be firewalled and the producer/consumer will only be allowed to connect to Kafka. This is often the case when management of topics is restricted.

Add 0.9 Kafka binder

From @sabbyanandan on February 24, 2016 17:51

As a developer, I'd like to continue the work from #340, so I can adapt the existing SCSt-Kafka binder to the 0.9 release of Apache Kafka.

Acceptance:

  • SCSt-Kafka binder supports 0.9 release of Kafka
  • 0.9 is the default version in pom.xml
  • I can build SCSM applications with SCSt-kafka as the binder
  • I can run ticktock stream with Kafka 0.9 running as messaging middleware
  • Unit tests included
  • Docs updated

Copied from original issue: spring-cloud/spring-cloud-stream#373

Configuration of instanceIndex and instanceCount per topic

Currently it is possible to set up partitioned Kafka consumers using spring.cloud.stream.instanceIndex and spring.cloud.stream.instanceCount. However, the same application might have several consumer groups. I tried to specify these options per topic, but no luck. Could we add an option to configure it per topic, like this:
spring.cloud.stream.bindings.input.consumer.instanceCount=2

REST API for spring cloud stream consumer of Kafka Topic

From @vsrahul82 on July 14, 2016 19:44

The consumer example in the documentation polls the Kafka topic and returns instantly when a message arrives on the topic. If I wanted the message to be read from the queue only when I invoke a consume method, i.e. provide a REST API that returns a message from the Kafka topic when invoked, how can I do that? Please provide a code example.

Copied from original issue: spring-cloud/spring-cloud-stream#591

Auto partition assignment issue when there are more consumers than there are partitions

From @mnadelson on September 14, 2016 15:25

I have a consumer that subscribes to a topic with 3 partitions. I launch the same instance of that consumer multiple times (they will of course all have the same consumer group). When I launch one instance the consumer gets all three partitions (as expected). When I launch 3 instances each consumer gets one partition (as expected). When I launch 4 instances two of the consumers got the same partition and received the same messages. This was not expected. I would either expect the 4th consumer to get none of the messages or the 4th consumer to round-robin the messages with the other consumer on the same partition.

Copied from original issue: spring-cloud/spring-cloud-stream#649

java.io.IOException: Packet len352518912 is out of range!

I created a Kafka cluster on https://aiven.io/kafka.

My configuration is:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: test
      kafka:
        binder:
          zk-nodes: <kafka-service>.<project>.aivencloud.com:20786

With a demo app:

@SpringBootApplication
public class SinkApplication {
    public static void main(String[] args) {
        SpringApplication.run(SinkApplication.class, args);
    }
}

@EnableBinding(Sink.class)
class LogSink {
    private static Logger logger = LoggerFactory.getLogger(LogSink.class);

    @ServiceActivator(inputChannel= Sink.INPUT)
    public void loggerSink(Object payload) {
        logger.info("Received: " + payload);
    }
}

using Spring Boot 1.4.1.RELEASE with Spring Cloud Camden.SR1

fails with:

2016-10-31 18:30:12.179  INFO 72534 --- [           main] org.I0Itec.zkclient.ZkClient             : Waiting for keeper state SyncConnected
2016-10-31 18:30:12.183  INFO 72534 --- [6.32.115:20786)] org.apache.zookeeper.ClientCnxn          : Opening socket connection to server 188.166.32.115/188.166.32.115:20786. Will not attempt to authenticate using SASL (unknown error)
2016-10-31 18:30:12.214  INFO 72534 --- [6.32.115:20786)] org.apache.zookeeper.ClientCnxn          : Socket connection established to 188.166.32.115/188.166.32.115:20786, initiating session
2016-10-31 18:30:12.283  WARN 72534 --- [6.32.115:20786)] org.apache.zookeeper.ClientCnxn          : Session 0x0 for server 188.166.32.115/188.166.32.115:20786, unexpected error, closing socket connection and attempting reconnect

java.io.IOException: Packet len352518912 is out of range!
	at org.apache.zookeeper.ClientCnxnSocket.readLength(ClientCnxnSocket.java:112) ~[zookeeper-3.4.6.jar:3.4.6-1569965]
	at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:79) ~[zookeeper-3.4.6.jar:3.4.6-1569965]
	at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366) ~[zookeeper-3.4.6.jar:3.4.6-1569965]
	at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081) ~[zookeeper-3.4.6.jar:3.4.6-1569965]

Do you know what could be the reason?

Do not use Spring Cloud Stream Parent as parent

Since version 1.1, we depend on Spring Cloud Stream as the parent for the build. This creates a tight coupling as far as the build cycle is concerned. We should use Spring Cloud Build instead.

Test against Spring Kafka 1.1.0.M1

As a developer, I'd like to test the Kafka binder against the 1.1.0.M1 release of Spring Kafka, so I can evaluate binder functionality against the 0.10 release of Kafka.

Add batching support for consumer applications

As a user, I'd like to have an option to control the batch-size at the consumer level, so I can buffer until a certain limit and consume the messages in bulk.

Note:
spring-kafka and spring-integration-kafka support a BatchMessageListener, and the binder can adapt to it to expose an overridable property similar to the producer's.

Acceptance:

  • I can enable batching via a property at the consumer level (i.e., listenerMode)
  • I can use the listenerMode property to override the consumer behavior with two options: record and batch (If record, each Message will contain a byte[] from a single data record. If batch, each Message will contain a List<byte[]>). See the sketch after this list.
  • Document the feature with an example
  • Tests included
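As a sketch only, the proposed listenerMode property might be set per binding like this (the property does not exist yet; the binding name input is just an example):

spring.cloud.stream.kafka.bindings.input.consumer.listenerMode=batch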

Add support for programmatic JAAS configuration

As a developer, I'd like to explore options to determine the right approach for programmatic JAAS configuration settings, so I can connect to a kerberized Kafka cluster in cloud settings, where the applicability of a physical .jaas file could be limited.

Spring Core null version

I got this problem on each project from Spring Cloud... Is there any solution?
For artifact {org.springframework:spring-core:null:jar}: The version cannot be empty

Thanks

Lots of obsolete logging when performing health checks

We monitor our Spring Boot applications by accessing /health (every 30 sec). A lot of logging occurs because of this. It looks like the health check for Kafka recreates a consumer every time, causing it to produce lots of logging like:

2016-09-28 14:44:01,963+0200 INFO [DiscoveryClient-InstanceInfoReplicator-0] config.AbstractConfig (AbstractConfig.java:178) - ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [localhost:9092]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id = consumer-51
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id =
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = latest

Maybe the health check should not log this? I see there's a boolean doLog available in the constructor of org.apache.kafka.common.config.AbstractConfig.

Kind regards

Kafka consumer property 'resetOffsets' has no effect

While creating a consumer (and setting the group id), the property spring.cloud.stream.kafka.bindings.<channelName>.consumer.resetOffsets (as described here: http://docs.spring.io/spring-cloud-stream/docs/Brooklyn.RELEASE/reference/htmlsingle/#_kafka_consumer_properties) seems to have no effect, and the consumer consumes from the last offset.
Sample properties:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: topic
          group: sample_group
      kafka:
        bindings:
          input:
            consumer:
              resetOffsets: true
              startOffset: earliest

The KafkaConsumerProperties#resetOffsets extension is set to true, but it doesn't seem to trigger any specific resetting of offsets.

Add support for Avro messages encoded with Confluent's (de)serializer (and other alternative (de)serializers)

As described in http://stackoverflow.com/questions/39941497/spring-cloud-stream-kafka-consuming-avro-messages-from-confluent-rest-proxy, the Kafka binders fail to deserialize Avro messages which have been serialized via Confluent's serializer. The issue seems to be that the Confluent serializer adds 4 (or 5?) magic bytes at the beginning of each message, which contain the Avro schema ID that can be used to retrieve the schema from the schema registry.

There seems to be no easy way to configure the bindings to use alternative serializers. From what I understand, the bindings hard-code the use of the Kafka ByteArrayDeserializer (https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/master/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java#L254).

Kafka ProducerListener disabled in Camden.RELEASE

Reported by @wblancqu here: spring-cloud/spring-cloud-stream#667

Hi all,

We upgraded to the above release (including spring-cloud-stream-binder-kafka 1.1.0.RELEASE) to take advantage of the "autoRebalanceEnabled" property. However, we noticed that in that release our ProducerListener doesn't work anymore.

I noticed in the code of KafkaBinderConfiguration (line 90) that setting the producer listener has been commented out:

//kafkaMessageChannelBinder.setProducerListener(producerListener);

Any particular reason for this?

Thanks in advance

Revisit if KafkaTestBinder.TupleRegistrar is still needed

autoCommitOffset set to false does not allow manual acknowledgment

When I set spring.cloud.stream.kafka.bindings.<channelName>.consumer.autoCommitOffset = false, I get errors trying to manually acknowledge the message:

Caused by: java.lang.IllegalStateException: A manual ackmode is required for an acknowledging listener
at org.springframework.util.Assert.state(Assert.java:392)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ConsumerAcknowledgment.acknowledge(KafkaMessageListenerContainer.java:991)

It looks like in KafkaMessageChannelBinder, messageListenerContainer.getContainerProperties().setAckOnError is being set, but messageListenerContainer.getContainerProperties().setAckMode is not.
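A minimal sketch of the kind of change that might address this in KafkaMessageChannelBinder, assuming spring-kafka's ContainerProperties API; the variable names are illustrative:

if (!extendedConsumerProperties.getExtension().isAutoCommitOffset()) {
    // Switch the listener container to manual acknowledgments so that the
    // Acknowledgment header can be used by the application.
    messageListenerContainer.getContainerProperties()
            .setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
}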

Reinstate static partitioning in Kafka binder

By default, the Kafka binder currently provides dynamically allocated/rebalanced partitions. We need a way to tell the binder to use static partitioning. It is desirable to have two flavors of static partitioning.

  1. Use a modulo based approach (partition % instance count)
  2. Use fixed set of partitions.
