spring-pulsar's Introduction

Spring for Apache Pulsar

Spring for Apache Pulsar lets any Java or Kotlin application integrate with Apache Pulsar without boilerplate code. It supports both configuration-based and annotation-based creation of Pulsar components.

How to use?

Defining a producer or consumer is a step-by-step process, as described below.

  1. Add the spring-pulsar library as a dependency in your project.
  2. Define the client configuration in a property source file.
  3. Create a producer using a template.
  4. Create a consumer using an annotation.
  5. Define error handlers.
  6. Refer to the sample applications.

Add dependencies

For the Spring Pulsar client to work, add the spring-pulsar library as a dependency in your project, as shown below.

For Maven application

<dependency>
    <groupId>com.intuit.pulsar</groupId>
    <artifactId>spring-pulsar-core</artifactId>
    <version>${spring-pulsar-core.version}</version>
</dependency>

For Gradle application

dependencies {
    // Double quotes are required so that Groovy interpolates the version variable
    implementation "com.intuit.pulsar:spring-pulsar-core:$springPulsarCoreVersion"
}

Define Client Configuration

To create a Pulsar client, define the client properties in your application's properties file and add the base package of spring-pulsar-core to the packages scanned by your Spring Boot application. Note that setting scanBasePackages replaces Spring Boot's default scan of your application's own package, so list your application's base package there as well if its components are no longer detected.

The library automatically detects the client configuration present in your application and creates a client from the defined properties. The details are described below.

Add the spring-pulsar-core package to the scan base packages

For Kotlin application

`@SpringBootApplication(scanBasePackages = ["com.intuit.spring.pulsar.client"])`

For Java application

`@SpringBootApplication(scanBasePackages = {"com.intuit.spring.pulsar.client"})`

Define client config in the application's property source

In application.yml

    pulsar:
        client:
            serviceUrl: pulsar+ssl://your.service.url:6651
            tls:
                tlsAllowInsecureConnection: true
                tlsHostnameVerificationEnable: false
            auth:
                username: UserName
                password: Password

In application.properties

    pulsar.client.serviceUrl=pulsar+ssl://your.service.url:6651
    pulsar.client.tls.tlsAllowInsecureConnection=true
    pulsar.client.tls.tlsHostnameVerificationEnable=false
    pulsar.client.auth.username=UserName
    pulsar.client.auth.password=Password
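
The TLS settings above only make sense together with a TLS service URL. As a rough, library-independent sanity check, the sketch below parses a service URL with the JDK's `java.net.URI`. The class and method names are hypothetical and written for illustration; the sketch only looks at the binary-protocol schemes `pulsar://` and `pulsar+ssl://` and does not reflect how the library itself validates configuration.

```java
import java.net.URI;

class ServiceUrlCheckSketch {

    /** Returns true when the URL uses Pulsar's TLS scheme for the binary protocol. */
    static boolean usesTls(String serviceUrl) {
        return "pulsar+ssl".equals(URI.create(serviceUrl).getScheme());
    }

    /** Returns the port given in the URL, or -1 when none is specified. */
    static int port(String serviceUrl) {
        return URI.create(serviceUrl).getPort();
    }

    public static void main(String[] args) {
        String serviceUrl = "pulsar+ssl://your.service.url:6651";
        System.out.println("TLS scheme: " + usesTls(serviceUrl));
        System.out.println("Port: " + port(serviceUrl));
    }
}
```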

Define Producer

To create a producer, register a producer template bean configured with the producer's properties. Once the template is registered, you can autowire it anywhere in your application and use methods such as send() and sendAsync() to publish messages to the topic.

The code below shows an example of defining a producer template.

@Configuration
open class ProducerConfiguration(val applicationContext: ApplicationContext) {

    @Bean
    open fun producerTemplate(): PulsarProducerTemplate<ByteArray> {
        return PulsarProducerTemplateImpl<ByteArray>(
                    pulsarProducerConfig = PulsarProducerConfig(
                        schema = Schema.BYTES,
                        topicName = "persistent://tenant/namespace/topicName",
                        autoFlush = true),
                    applicationContext = applicationContext)
    }
}

Once you have created the producer template and registered it as a Spring bean, you can autowire it in your application and use it to publish messages as shown below.

@Component
class SomeClass(val producerTemplate: PulsarProducerTemplate<ByteArray>) {

    fun publishMessage(message: String): MessageId {
        val messageId: MessageId = producerTemplate.send(message.toByteArray())
        return messageId
    }
}

Below is the code snippet to define a producer template in a Java-based application.

@Configuration
class ProducerConfiguration {

    private ApplicationContext applicationContext;

    ProducerConfiguration(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    @Bean
    public PulsarProducerTemplate<byte[]> producerTemplate() {
        Map<String, String> config = new HashMap<>();
        config.put(TOPIC_NAME, "my-test-topic");
        config.put(AUTO_FLUSH, "true");

        // The diamond operator avoids a raw-type (unchecked) warning
        return new PulsarProducerTemplateImpl<>(
            Schema.BYTES,
            config,
            applicationContext);
    }
}

The code below uses the producer template defined above to produce messages in a Java-based application.

@Component
public class SomeClass {

    private final PulsarProducerTemplate<byte[]> producerTemplate;

    public SomeClass(PulsarProducerTemplate<byte[]> producerTemplate) {
        this.producerTemplate = producerTemplate;
    }

    public void sendMessage(String message) {
        this.producerTemplate.send(message.getBytes(StandardCharsets.UTF_8), null, new HashMap<String, String>(), null);
    }

}

Define Consumer

Defining a consumer is a two-step process as described below.

  • First, you define a consumer listener bean.
  • Second, you register your consumer listener bean as pulsar consumer by annotating it with @PulsarConsumer.

Define Listener

Create a consumer listener bean by implementing either of the below interfaces.

Implementing IPulsarListener<?>

This interface gives you control over the message and the acknowledgement process, and also exposes the internal Message and Consumer objects. Take a look at the example below.

  @Component
  class MyMessageListener : IPulsarListener<ByteArray> {

      override fun onException(
          exception: Exception,
          consumer: org.apache.pulsar.client.api.Consumer<ByteArray>,
          message: Message<ByteArray>
      ) {
          // Define here how to handle the exception and
          // send either a negative or a positive ack.
      }

      override fun onSuccess(
          consumer: org.apache.pulsar.client.api.Consumer<ByteArray>,
          message: Message<ByteArray>
      ) {
          // Define here how to handle successful processing
          // and send either a negative or a positive ack.
      }

      override fun processMessage(
          consumer: org.apache.pulsar.client.api.Consumer<ByteArray>,
          message: Message<ByteArray>
      ) {
          // Whenever a message is received by the Pulsar runtime,
          // it first lands in this method.
          //
          // Define the logic to process the message here.
          // If an exception is thrown from this method, onException()
          // is executed. If no exception is thrown, onSuccess() is
          // executed.
      }
  }

  • Control over negative and positive ack.
  • Access to the internal Message and Consumer objects.
  • Automatic delegation to onSuccess and onException to provide unified handling capabilities.
  • Preferred when the corrective action required after an exception is the same irrespective of the exception.
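
The delegation described above can be pictured with a small, library-independent sketch. The `SketchListener` interface and the `dispatch` helper below are hypothetical stand-ins written for illustration; they are not the library's `IPulsarListener` or its internals, the message is reduced to a plain `String`, and acknowledgements are replaced by log entries.

```java
import java.util.ArrayList;
import java.util.List;

class ListenerDelegationSketch {

    /** Hypothetical stand-in for a listener with success and failure callbacks. */
    interface SketchListener<T> {
        void processMessage(T message) throws Exception;
        void onSuccess(T message);
        void onException(Exception exception, T message);
    }

    /** Calls processMessage, then exactly one of onSuccess or onException. */
    static <T> void dispatch(SketchListener<T> listener, T message) {
        try {
            listener.processMessage(message);
        } catch (Exception e) {
            listener.onException(e, message);
            return;
        }
        listener.onSuccess(message);
    }

    /** Feeds messages through a sample listener and records which callback ran. */
    static List<String> run(String... messages) {
        final List<String> log = new ArrayList<>();
        SketchListener<String> listener = new SketchListener<String>() {
            @Override
            public void processMessage(String message) {
                if (message.startsWith("poison")) {
                    throw new IllegalArgumentException("cannot process " + message);
                }
            }

            @Override
            public void onSuccess(String message) {
                log.add("ack:" + message); // a real listener would acknowledge here
            }

            @Override
            public void onException(Exception exception, String message) {
                log.add("nack:" + message); // a real listener would negatively acknowledge here
            }
        };
        for (String message : messages) {
            dispatch(listener, message);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run("order-1", "poison-2")); // prints [ack:order-1, nack:poison-2]
    }
}
```

Keeping the success callback outside the `try` block means an exception thrown while acknowledging is not mistaken for a processing failure; whether the library makes the same choice is not stated in this document.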

Implementing MessageListener<?>

Standard MessageListener from pulsar gives you full control over what you want to do when your listener receives a message.

  @Component
  class MyMessageListener : MessageListener<ByteArray> {
      override fun received(
          consumer: org.apache.pulsar.client.api.Consumer<ByteArray>?,
          message: Message<ByteArray>?
      ) {
          // Whenever a message is received, it lands in this method.
          // Write code here to handle the received message.
          // Any exception thrown from here can be handled by using
          // the built-in exception handling aspect provided by this library.
      }
  }

  • received() is called for each message received by the consumer.
  • Gives you access to the Message and Consumer objects.
  • Control over how to process the message and when to send negative and positive acks.
  • Preferred if you have specific, definitive actions to take based on the exception thrown.

Register Listener as Consumer

Once you have created a consumer listener class and registered it as a Spring bean, you can mark the listener class as a Pulsar consumer by annotating it with the @PulsarConsumer annotation defined by pulsar-spring-client.

You can pass all the consumer-related configuration in the @PulsarConsumer annotation, as shown in the example below.

  @Component
  @PulsarConsumer(
      topic = Topic(
          topicNames = "Topic_names"
      ),
      subscription = Subscription(
          subscriptionName = "Subscription_Name",
          subscriptionType = "Subscription_Type"
      )
  )
  class MyMessageListener : MessageListener<MessageData> {
      override fun received(
          consumer: org.apache.pulsar.client.api.Consumer<MessageData>?,
          message: Message<MessageData>?
      ) {
          // Code to handle the message
      }
  }

Below is the code snippet to define a consumer in a Java-based application.

  @Component
  @PulsarConsumer(
      topic = @Topic(
          topicNames = "Topic_names"
      ),
      subscription = @Subscription(
          subscriptionName = "Subscription_Name",
          subscriptionType = "Subscription_Type"
      )
  )
  class MyMessageListener implements MessageListener<MessageData> {
      @Override
      public void received(
          org.apache.pulsar.client.api.Consumer<MessageData> consumer,
          Message<MessageData> message
      ) {
          // Code to handle the message
      }
  }

Property Resolution Feature

While creating the @PulsarConsumer annotation, you can utilize Spring's property resolution feature to directly incorporate values from your application.yml or application.properties file into the consumer properties.

For instance, if you have defined properties in your application file as follows:

    pulsar:
        sample01:
            topic:
                name: java-sample-topic01
            subscription:
                name: java-sample-sub01
                type: Key_Shared
            consumer:
                count: 1

You can use them in your consumer by providing the property path, and they will be resolved automatically:

@PulsarConsumer(
    topic = Topic(topicNames = "#{pulsar.sample01.topic.name}"),
    subscription = Subscription(
        subscriptionName = "#{pulsar.sample01.subscription.name}",
        subscriptionType = "#{pulsar.sample01.subscription.type}"
    ),
    count = "#{pulsar.sample01.consumer.count}"
)
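
To make the idea of placeholder resolution concrete, here is a minimal sketch that looks up a `#{...}` value in a flat property map. It is an illustration only: the class, the `resolve` method, and the choice to fail on a missing key are assumptions made for this example and do not describe how the library or Spring resolves properties internally.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PlaceholderResolutionSketch {

    // Matches a value that consists solely of a #{some.property.path} placeholder.
    private static final Pattern PLACEHOLDER = Pattern.compile("#\\{([^}]+)\\}");

    /** Returns the property value for a placeholder, or the input unchanged for literals. */
    static String resolve(String value, Map<String, String> properties) {
        Matcher matcher = PLACEHOLDER.matcher(value);
        if (!matcher.matches()) {
            return value; // plain literal such as "my-topic"
        }
        String key = matcher.group(1).trim();
        String resolved = properties.get(key);
        if (resolved == null) {
            throw new IllegalArgumentException("No property defined for " + key);
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("pulsar.sample01.topic.name", "java-sample-topic01");
        properties.put("pulsar.sample01.subscription.type", "Key_Shared");

        System.out.println(resolve("#{pulsar.sample01.topic.name}", properties));
        System.out.println(resolve("#{pulsar.sample01.subscription.type}", properties));
        System.out.println(resolve("literal-topic", properties));
    }
}
```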

Error Handling

This library provides exception handling capabilities, both while producing and consuming a message, internally using an aspect.

Steps to follow to integrate exception handling capabilities

  • Annotate your producer/consumer with PulsarAction annotations - @PulsarProducerAction, @PulsarConsumerAction
  • Define an exception handler class with the annotation @PulsarExceptionHandlerClass and add exception handler methods with annotations @PulsarProducerExceptionHandlerFunction and @PulsarConsumerExceptionHandlerFunction

Annotating the producer

Add the annotation @PulsarProducerAction to the method where you use the instance of PulsarProducerTemplateImpl to send a message. The annotation's action parameter takes a short description of what the message is and how it is generated before being sent to Pulsar.

Ex:

@PulsarProducerAction("description of the BL step")
fun produce(): String {
    // Business logic (BL) that generates the message and could result in an exception
    val messageId = producerTemplate.send(message.toByteArray())
    return messageId.toString()
}

Annotating the consumer

Add the annotation @PulsarConsumerAction to the received(consumer: Consumer<ByteArray>?, message: Message<ByteArray>) method inside your consumer listener bean class.

Ex:

@PulsarConsumerAction("description of the BL step")
override fun received(consumer: Consumer<ByteArray>?, message: Message<ByteArray>) {
    val messageString: String = String(message.value)
    // BL step to process the message that could result in an exception
    consumer?.acknowledge(message.messageId)
}

Note: This strategy of exception handling on the consumer side can only be used when the consumer listener bean is implementing the org.apache.pulsar.client.api.MessageListener<?> interface.

When a consumer listener bean implements the interface com.intuit.spring.pulsar.client.consumer.listener.IPulsarListener<?>, any exception thrown from its processMessage(consumer: Consumer<ByteArray>, message: Message<ByteArray>) method is delegated to onException(e: Exception, consumer: Consumer<ByteArray>, message: Message<ByteArray>) and therefore never reaches the exception handler methods, even if you have defined one.

Define the exception handler class and methods

Add an exception handler class and annotate it with @PulsarExceptionHandlerClass. Within the class, add exception handlers and use the annotations @PulsarProducerExceptionHandlerFunction and @PulsarConsumerExceptionHandlerFunction to declare which exceptions each one handles.

The exception handlers must implement the functional interface com.intuit.spring.pulsar.client.exceptions.PulsarExceptionHandler; only then do they qualify as handlers to be used when an exception occurs.

Ex:

/**
 * This class adds exception handlers to handle all exceptions
 * thrown by Pulsar producers and consumers.
 */
@PulsarExceptionHandlerClass
@Component
class PulsarExceptionHandlers {
    // This handler is invoked when a producer throws a BLServiceException or IOException
    @PulsarProducerExceptionHandlerFunction(BLServiceException::class, IOException::class)
    var pulsarProducerExceptionHandler = PulsarExceptionHandler { exceptionHandlerParams ->
        println("Exception occurred while performing ${exceptionHandlerParams.action}")
        println("Handling producer exception ${exceptionHandlerParams.exception}")
    }

    // This handler is invoked when a consumer throws a DownstreamServiceException
    @PulsarConsumerExceptionHandlerFunction(DownstreamServiceException::class)
    var pulsarConsumerExceptionHandler = PulsarExceptionHandler { exceptionHandlerParams ->
        println("Exception occurred while performing ${exceptionHandlerParams.action}")
        println("Handling consumer exception ${exceptionHandlerParams.exception}")
    }
}
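
The general pattern behind this kind of handling, wrapping an action and routing a thrown exception to a handler registered for its type, can be sketched without the library. Everything in the block below (`ExceptionHandlerDispatchSketch`, `ThrowingAction`, `register`, `runAction`) is hypothetical and written for illustration; the library's aspect may select and invoke handlers differently, and wrapping unhandled exceptions in `IllegalStateException` is a choice made only for this sketch.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

class ExceptionHandlerDispatchSketch {

    /** Hypothetical action type that is allowed to throw checked exceptions. */
    interface ThrowingAction {
        void run() throws Exception;
    }

    // Handlers keyed by the exception type they were registered for (insertion order kept).
    private final Map<Class<? extends Exception>, BiConsumer<String, Exception>> handlers =
            new LinkedHashMap<>();

    void register(Class<? extends Exception> type, BiConsumer<String, Exception> handler) {
        handlers.put(type, handler);
    }

    /**
     * Runs the body. Returns true when it completes normally and false when an
     * exception was passed to the first handler registered for a matching type.
     */
    boolean runAction(String action, ThrowingAction body) {
        try {
            body.run();
            return true;
        } catch (Exception e) {
            for (Map.Entry<Class<? extends Exception>, BiConsumer<String, Exception>> entry
                    : handlers.entrySet()) {
                if (entry.getKey().isInstance(e)) {
                    entry.getValue().accept(action, e);
                    return false;
                }
            }
            // No handler matched: surface the failure instead of swallowing it.
            throw new IllegalStateException("No handler for failure in: " + action, e);
        }
    }

    public static void main(String[] args) {
        ExceptionHandlerDispatchSketch sketch = new ExceptionHandlerDispatchSketch();
        sketch.register(IOException.class, (action, e) ->
                System.out.println("Handled " + e.getClass().getSimpleName()
                        + " while performing " + action));
        // FileNotFoundException is a subclass of IOException, so the handler matches.
        sketch.runAction("publish order event", () -> {
            throw new FileNotFoundException("payload template missing");
        });
    }
}
```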

Sample Applications

If the descriptions above are not enough and you want to see working code that uses the library, sample applications are available.

We have written sample Spring Boot applications in both Java and Kotlin for you to refer to. You can find the samples here. The samples project also has a README that describes the sample applications in detail and explains how to run them on your local machine. If you are not interested in running these samples and just want to look at the code directly, you can visit the links below.

Getting Started

Refer to the Getting Started section which contains instructions on setting up the library for development/debugging purposes.

Code of Conduct

Please see our Code of conduct.

Resources

For more information on how to use this library, see the reference manual: Reference Manual

Contributing to Spring Apache Pulsar

See Contributing for details on how to contribute.

License

This Spring Pulsar library is released under the terms of the MIT License (see LICENSE.md).

spring-pulsar's People

Contributors

zulfiquiar-khan, kritikajin, hpa16

spring-pulsar's Issues

Support for Apache Pulsar's Reader API

Is your feature request related to a problem? Please describe.
The Apache Pulsar client supports a Reader API, but the Reader API is not yet supported in Spring for Apache Pulsar.

Describe the solution you'd like
Add support for Apache Pulsar's Reader API.

Describe alternatives you've considered
No Alternatives.

Additional context
No Context

Add section in Readme explaining property resolution support in @PulsarConsumer

Is your feature request related to a problem? Please describe.
There are no docs on the property resolution support available in @PulsarConsumer. Because of this, users have no way of learning about this useful feature.

Describe the solution you'd like
Add section in Readme explaining property resolution support in @PulsarConsumer. Add example code in both Java and Kotlin.

Authentication support for TLS, Athenz, and Oauth2

Is your feature request related to a problem? Please describe.
The current implementation only supports basic authentication for the Pulsar client, while according to the Apache Pulsar docs, Pulsar also supports the three authentication mechanisms below.

  • TLS
  • Athenz
  • OAuth2

These authentication mechanisms are not supported by the Spring Pulsar library.

Describe the solution you'd like
Support for the Pulsar client authentication mechanisms below.

  • TLS
  • Athenz
  • OAuth2

Describe alternatives you've considered
There are no alternatives for it as of now.

Add support for client authentication using Athenz

Is your feature request related to a problem? Please describe.
We have not yet added support for Athenz. For some reason, the Athenz-related classes are not available in pulsar-client.

According to the docs here, the code below defines an Athenz auth config.

Map<String, String> authParams = new HashMap<>();
authParams.put("tenantDomain", "shopping"); // Tenant domain name
authParams.put("tenantService", "some_app"); // Tenant service name
authParams.put("providerDomain", "pulsar"); // Provider domain name
authParams.put("privateKey", "file:///path/to/private.pem"); // Tenant private key path
authParams.put("keyId", "v1"); // Key id for the tenant private key (optional, default: "0")

Authentication athenzAuth = AuthenticationFactory
        .create(AuthenticationAthenz.class.getName(), authParams);

However, AuthenticationAthenz is not available in version 2.7.5 of the pulsar-client library.

Describe the solution you'd like
Support for client authentication using Athenz

Add support for producing messages using @PulsarProducer

Is your feature request related to a problem? Please describe.
To publish messages, users have to create a producer template as shown in the code below, register it as a Spring bean, autowire it into a Spring component, and then use the send() or sendAsync() method to publish messages.

@Configuration
open class ProducerConfiguration(val applicationContext: ApplicationContext) {

    @Bean
    open fun producerTemplate(): PulsarProducerTemplate<ByteArray> {
        return PulsarProducerTemplateImpl<ByteArray>(
                    pulsarProducerConfig = PulsarProducerConfig(
                        schema = Schema.BYTES,
                        topicName = "persistent://tenant/namespace/topicName",
                        autoFlush = true),
                    applicationContext = applicationContext)
    }
}

This is slightly cumbersome for simple use cases where users just want to produce messages without worrying much about autowiring, injection, and the different properties to set for the producer template.

Describe the solution you'd like
Just as for the consumer, we should provide support for a @PulsarProducer annotation. This annotation can be placed on public methods, and the object returned by the method will be published to Pulsar topics.

The producer's configuration can be set in the @PulsarProducer annotation. We should also support property resolution in @PulsarProducer, as we do for @PulsarConsumer.

Support for Pulsar Function creation using Spring For Pulsar library

Is your feature request related to a problem? Please describe.
The Apache Pulsar docs describe support for Pulsar Functions, which are similar to AWS Lambda.

Pulsar Functions are lightweight compute processes that

  • consume messages from one or more Pulsar topics,
  • apply a user-supplied processing logic to each message,
  • publish the results of the computation to another topic.

You can find more details about Pulsar Functions here.
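
As a rough illustration of the "user-supplied processing logic" step, the sketch below writes that logic against the JDK's `java.util.function.Function` interface, which Pulsar's Java runtime is documented to accept as a language-native function. The class name is hypothetical, and consuming from and publishing to topics is left to the Pulsar runtime, so the sketch only shows the per-message transformation.

```java
import java.util.function.Function;

class ExclamationFunctionSketch implements Function<String, String> {

    /** Processing logic applied to each incoming message payload. */
    @Override
    public String apply(String input) {
        return input + "!";
    }

    public static void main(String[] args) {
        // Called directly here; a function runtime would call apply() once per message.
        System.out.println(new ExclamationFunctionSketch().apply("hello"));
    }
}
```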

As of now, Spring For Pulsar does not support creating Pulsar Functions with the spring-pulsar-core library.

Describe the solution you'd like
Support for Pulsar Function creation using Spring For Pulsar library.

Enforce coverage checks in maven builds

Describe the bug
Coverage checks are not enforced in Maven builds. As of now, when we run maven install, the build does not fail if the coverage is low. Either the coverage is not computed, or it is computed but not enforced.

To Reproduce
Steps to reproduce the behavior:
Disable all unit tests and run mvn clean install; the build will succeed.

Expected behavior
Coverage should be computed and enforced. If the project coverage is less than 95% or the diff coverage is less than 95%, the build should fail.

Enhance Getting Started section in Readme

Is your feature request related to a problem? Please describe.
The current details in Getting Started are not enough to set up a development workspace. It has just two steps.

  • Cloning the repo
  • Executing mvn clean install.

More steps and dependencies are required for a development workspace setup.
It also does not talk about the different lifecycle phases that run on clean install.

In short, the current description is not enough for creating a development workspace.

Describe the solution you'd like
Add more details to the Getting Started docs, so that they exhaustively describe every dependency and step needed to set up a development workspace.

A few details that have to be added are:

  • Which versions of Java and Maven should be present on the machine
  • How to set up GPG
  • Setup of Docker to run the Pulsar container
  • Starting the Pulsar container before running maven clean install

The above is not an exhaustive list.

Additional context
The list defined above is not an exhaustive list of the details that have to be added to Getting Started. We need to do a fresh development setup on a fresh machine and then figure out the steps and dependencies needed for setup.

After that, add all the required steps and dependency details to Getting Started.

Reformat Error Handling section in Readme

Is your feature request related to a problem? Please describe.
The Error Handling section is not formatted. For example:

  • Class and interface names are plain text with fully qualified names, which looks ugly.
  • The description is not properly paragraphed.
  • Some descriptions can be put in as notes.

Describe the solution you'd like
Reformat the Error Handling section.

  • Add class and interface names as links.
  • Don't use fully qualified names for classes and interfaces.
  • Add notes as blockquotes.
  • Properly paragraph the descriptions.
