Dropwizard ActiveMQ Bundle

Since Dropwizard ActiveMQ Bundle is written in Java 8, your app must also be compiled with Java 8.

Use it when you need to send and receive JSON (via Jackson) over ActiveMQ in your Dropwizard application.

Please have a look at the Example application.

Change History

Version 0.5.2

  • Added support for object-messages

Version 0.5.1

  • Fixed a sporadic "session is closed" error when ack'ing a received message

Version 0.5.0

  • Added support for connecting to multiple ActiveMQ brokers
  • Upgraded to Dropwizard 1.0.2 and ActiveMQ 5.14.1

Version 0.4.0

  • First version released to Maven Central
  • Code is identical to version 0.3.13

Version 0.3.13

  • Upgraded to Dropwizard 0.9.1 and ActiveMQ 5.13.0

Version 0.3.12

  • Added support for receiving ActiveMQMapMessage as java.util.Map

Version 0.3.11

  • Added support for connecting to secure brokers
  • Upgraded to ActiveMQ client version 5.11.1

Version 0.3.10

  • Set correlationID on outgoing messages if sent in same thread as incoming message

Version 0.3.9

  • Now using ActiveMQ 5.10 and Dropwizard 0.7.1
  • Reduced info-logging

Version 0.3.8.1

  • Fixed issue #6 - Improve handling of failing exceptionHandler

Version 0.3.7

Version 0.3.6

  • Fixed issue #5 - Use less verbose errors when 'The Consumer is closed'

Version 0.3.5

  • Improved error handling for receiver + added healthCheck for each receiver
  • idleTimeout is now specified in milliseconds
  • Added time to live for JMS message sending
  • Added more flexible option for creating messages by passing a function

Version 0.3.4 - 20140428

  • Removed resource-leakage when sending messages using multiple senders

Version 0.3.3 - 20140428

  • Added more pool-config options
  • HealthCheck is not using pooled factory anymore to prevent hang

Version 0.3.2 - 20140428

  • Added activeMQ-HealthCheck
  • Added max-give-up-time for graceful shutdown
  • Added more debug logging

Version 0.3.1 - 20140411

  • Added ActiveMQSenderFactory and JsonError

Version 0.3 - 20140411

  • It is now possible to change between queues and topics by prefixing the destination-name

Version 0.2 - 20140410

  • Added custom exception-handler

Version 0.1 - 20140410

  • Initial version

Maven

Add it as a dependency:

    <dependency>
        <groupId>com.kjetland.dropwizard</groupId>
        <artifactId>dropwizard-activemq</artifactId>
        <version> INSERT LATEST VERSION HERE </version>
    </dependency>

Configuration

Your config-class must implement ActiveMQConfigHolder like this:

public class Config extends Configuration implements ActiveMQConfigHolder {

    @JsonProperty
    @NotNull
    @Valid
    private ActiveMQConfig activeMQ;

    public ActiveMQConfig getActiveMQ() {
        return activeMQ;
    }
}

And add the following to your config.yml:

activeMQ:
  brokerUrl: tcp://localhost:61616

All (or almost all) config options:

activeMQ:
  brokerUrl: failover:(tcp://broker1.com:61616,tcp://broker2.com:61616)?randomize=false
  # brokerUsername: username
  # brokerPassword: password
  # shutdownWaitInSeconds: 20
  # healthCheckMillisecondsToWait: 2000
  # timeToLiveInSeconds: -1     (Default message time-to-live is off. Specify a maximum lifespan here in seconds for all messages.)
  # trustedPackages: (To prevent malicious code from being deserialized. Needed if you want to receive plain object messages, see http://activemq.apache.org/objectmessage.html)
  #   - com.some.package
  #   - java.util

  pool:
    maxConnections: 1
    maximumActiveSessionPerConnection: 3
    blockIfSessionPoolIsFull: false
    idleTimeoutMills: 30000
    # expiryTimeoutMills:
    createConnectionOnStartup: false
    timeBetweenExpirationCheckMillis: 20000
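
The time-to-live setting above is given in seconds, while the JMS API (MessageProducer.setTimeToLive) takes milliseconds and treats 0 as "never expires". The following is a minimal sketch of that conversion, assuming that a missing or non-positive value means "off". The TimeToLive class and its method are hypothetical names for this example and are not part of the bundle.

```java
import java.util.Optional;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not part of the bundle. */
final class TimeToLive {

    /** JMS treats a time-to-live of 0 ms as "never expires". */
    static final long NO_EXPIRY_MILLIS = 0L;

    private TimeToLive() {}

    /**
     * Converts an optional time-to-live in seconds to JMS milliseconds.
     * Assumption: an absent or non-positive value disables expiry.
     */
    static long toJmsMillis(Optional<Integer> seconds) {
        return seconds
                .filter(s -> s > 0)
                .map(s -> TimeUnit.SECONDS.toMillis(s))
                .orElse(NO_EXPIRY_MILLIS);
    }

    public static void main(String[] args) {
        System.out.println(toJmsMillis(Optional.of(60)));
        System.out.println(toJmsMillis(Optional.of(-1)));
    }
}
```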

Use it like this

(Please have a look at the Example application)

public class ActiveMQApp extends Application<Config> {

    public static void main(String[] args) throws Exception {
        new ActiveMQApp().run(args);
    }

    private ActiveMQBundle activeMQBundle;

    @Override
    public void initialize(Bootstrap<Config> configBootstrap) {

        // Create the bundle and store reference to it
        this.activeMQBundle = new ActiveMQBundle();
        // Add the bundle
        configBootstrap.addBundle(activeMQBundle);
    }

    @Override
    public void run(Config config, Environment environment) throws Exception {


        // Create a queue sender
        ActiveMQSender sender = activeMQBundle.createSender("test-queue", false);

        // or like this:
        ActiveMQSender sender2 = activeMQBundle.createSender("queue:test-queue", false);

        // where messages have a 60 second time-to-live:
        ActiveMQSender sender3 = activeMQBundle.createSender("queue:test-queue", false, Optional.of(60));

        // Create a topic-sender
        ActiveMQSender sender4 = activeMQBundle.createSender("topic:test-topic", false);

        // use it
        sender.send( someObject );
        sender.sendJson("{\"a\":2, \"b\":3}");

        // If you require full control of message creation, pass a Java 8 function that takes a javax.jms.Session parameter:
        sender.send((Session session) -> {
            TextMessage message = session.createTextMessage();
            message.setText("{\"a\":2, \"b\":3}");
            message.setJMSCorrelationID(myCorrelationId);
            return message;
        });


        // Create a receiver that consumes json-strings using Java 8
        activeMQBundle.registerReceiver(
                "test-queue", // default is queue. Prefix with 'topic:' or 'queue:' to choose
                (json) -> System.out.println("json: " + json),
                String.class,
                true);


        // Create a receiver that consumes SomeObject via Json using Java 8
        activeMQBundle.registerReceiver(
                            "test-queue-2",
                            (o) -> System.out.println("Value from o: " + o.getValue()),
                            SomeObject.class,
                            true);

        // Create a receiver that consumes SomeObject via Json using Java 7
        activeMQBundle.registerReceiver(
                            "test-queue-3",
                            new ActiveMQReceiver<SomeObject>() {
                                @Override
                                public void receive(SomeObject o) {
                                    System.out.println("Value from o: " + o.getValue());
                                }
                            },
                            SomeObject.class,
                            true);
    }
}

Topics and queues

By default, all destination names refer to queues. To choose a topic (or to name a queue explicitly), prefix the destination name with one of:

  • topic:
  • queue:
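
To make the naming convention concrete, here is a minimal sketch of how a destination string could be split into its type and its bare name. DestinationNames and its methods are hypothetical helpers written for this example. They are not the bundle's internal API.

```java
import java.util.Objects;

/** Hypothetical helper illustrating the "topic:" / "queue:" prefix convention. */
final class DestinationNames {

    private static final String TOPIC_PREFIX = "topic:";
    private static final String QUEUE_PREFIX = "queue:";

    private DestinationNames() {}

    /** Returns true only when the destination explicitly uses the "topic:" prefix. */
    static boolean isTopic(String destination) {
        return Objects.requireNonNull(destination).startsWith(TOPIC_PREFIX);
    }

    /** Strips a leading "topic:" or "queue:" prefix, if present. */
    static String stripPrefix(String destination) {
        Objects.requireNonNull(destination);
        if (destination.startsWith(TOPIC_PREFIX)) {
            return destination.substring(TOPIC_PREFIX.length());
        }
        if (destination.startsWith(QUEUE_PREFIX)) {
            return destination.substring(QUEUE_PREFIX.length());
        }
        return destination; // no prefix: treated as a queue name
    }

    public static void main(String[] args) {
        for (String d : new String[] {"test-queue", "queue:test-queue", "topic:test-topic"}) {
            String type = isTopic(d) ? "topic" : "queue";
            System.out.println(d + " -> " + type + " " + stripPrefix(d));
        }
    }
}
```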

Custom exception-handling

activeMQBundle.registerReceiver(
    config.getInboundJmsQueue(),
    (ManualRequest m) -> myService.processMessage(m),
    ManualRequest.class,
    // Add your custom exception-handler here
    (message, exception) -> {
        System.out.println("Error with this message: " + message);
        exception.printStackTrace();
        return true;
    });

Connecting to secure brokers

Connecting to a secure broker is possible by setting both the username (brokerUsername) and password (brokerPassword) in an application's config file.

Connecting to multiple brokers

The library has a second bundle that enables a Dropwizard application to connect to multiple brokers. This bundle registers a separate healthcheck for each queue created, differentiating them by the brokerName given in the config.

Usage is the same with the following changes:

(Please have a look at the Multi Broker Example application)

// Create the multi bundle and store reference to it
this.activeMQBundle = new ActiveMQMultiBundle();
// Add the bundle
configBootstrap.addBundle(activeMQBundle);


public void run(MultiQConfig config, Environment environment) throws Exception {
    MultiQResource multiQResource = new MultiQResource();
    environment.jersey().register(multiQResource);
    activeMQBundle.getActiveMQBundleMap().entrySet()
        .stream()
        .forEach(activeMQEntry -> activateQueue(multiQResource, activeMQEntry));
}

private void activateQueue(MultiQResource multiQResource, Map.Entry<String, ActiveMQBundle> activeMQEntry) {
    String queueName = activeMQEntry.getKey();
    ActiveMQBundle activeMQBundle = activeMQEntry.getValue();

    // Set up the sender for our queue
    multiQResource.addSender(queueName, activeMQBundle.createSender( queueName, false));

    // Set up a receiver that just logs the messages we receive on our queue
    activeMQBundle.registerReceiver(
        queueName,
        (message) -> log.info("\n*****\nWe received a message on: {} from activeMq: \n{}\n*****", queueName, message),
        Message.class,
        true);
}

In a real application you will have to decide which messages are sent to and received from which broker, based on your requirements. The broker name is the identification key for these mappings.

You should use the MultiQConfig, which is a map of ActiveMQConfig configs:

activeMQConnections:
  ieQueue:
      brokerUrl: tcp://localhost:61616
  gbQueue:
      brokerUrl: tcp://localhost:61626

Receiving object messages

The bundle also supports receiving plain object messages. The code required for receiving object messages is the same as for JSON-deserialized objects: the library handles the conversion internally before the message is dispatched to the receiver.

To prevent malicious code from being deserialized, you must configure trusted packages in the bundle configuration, i.e. the packages containing the classes you want to receive. More information can be found at http://activemq.apache.org/objectmessage.html. The default behaviour is to reject all packages.

activeMQ:
  ...
  trustedPackages:
    - com.yourpackages.*
    - java.util
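
As a rough illustration of what a package allow-list does, the sketch below accepts a class only if its package equals a trusted entry or is a sub-package of one. TrustedPackages is a hypothetical class written for this example. It does not reproduce the actual check performed by ActiveMQ or by the bundle, and it does not interpret wildcard entries.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical allow-list check, not ActiveMQ's implementation. */
final class TrustedPackages {

    private final List<String> trusted;

    TrustedPackages(List<String> trusted) {
        this.trusted = new ArrayList<>(trusted); // defensive copy
    }

    /** Returns true if the class's package is a trusted package or a sub-package of one. */
    boolean isTrusted(String className) {
        int lastDot = className.lastIndexOf('.');
        String pkg = lastDot < 0 ? "" : className.substring(0, lastDot);
        for (String entry : trusted) {
            if (pkg.equals(entry) || pkg.startsWith(entry + ".")) {
                return true;
            }
        }
        return false; // an empty list therefore rejects everything
    }

    public static void main(String[] args) {
        TrustedPackages packages = new TrustedPackages(Arrays.asList("com.some.package", "java.util"));
        System.out.println(packages.isTrusted("java.util.HashMap"));
        System.out.println(packages.isTrusted("org.example.Unknown"));
    }
}
```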

dropwizard-activemq-bundle's People

Contributors

kjeivers, mbknor, oddmar, petternordholm

dropwizard-activemq-bundle's Issues

Improve handling of failing exceptionHandler

If the exceptionHandler itself fails, we neither ack the message nor wait before retrying. This results in a loop at 100% CPU that reprocesses the same message and fails every time.
This situation needs special handling, possibly with an added slowdown.
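
One possible shape for such a slowdown is a capped exponential backoff between retries. The sketch below only computes the delays. RetryBackoff, its parameters, and its method name are assumptions made for illustration, and it is not the change that was made in the bundle.

```java
/** Hypothetical helper: computes a capped, doubling delay between retries. */
final class RetryBackoff {

    private final long initialDelayMillis;
    private final long maxDelayMillis;

    RetryBackoff(long initialDelayMillis, long maxDelayMillis) {
        if (initialDelayMillis <= 0 || maxDelayMillis < initialDelayMillis) {
            throw new IllegalArgumentException("delays must satisfy 0 < initial <= max");
        }
        this.initialDelayMillis = initialDelayMillis;
        this.maxDelayMillis = maxDelayMillis;
    }

    /** Delay before retry number {@code attempt} (1-based), doubling each time up to the cap. */
    long delayMillis(int attempt) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        long delay = initialDelayMillis;
        for (int i = 1; i < attempt && delay < maxDelayMillis; i++) {
            // Compare against max/2 so that doubling cannot overflow.
            delay = (delay > maxDelayMillis / 2) ? maxDelayMillis : delay * 2;
        }
        return delay;
    }

    public static void main(String[] args) {
        RetryBackoff backoff = new RetryBackoff(100, 1000);
        for (int attempt = 1; attempt <= 6; attempt++) {
            System.out.println("attempt " + attempt + ": wait " + backoff.delayMillis(attempt) + " ms");
        }
    }
}
```

A receive loop could sleep for delayMillis(n) after the n-th consecutive failure and reset the counter after the first success.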

ActiveMQHealthCheck and failover transport

Hi,

In my very simple test scenario I am using a failover transport layer for the client URI. This URI does not have maxReconnectAttempts set, so it will block until it can get a connection.

When there are no brokers up this means the healthcheck API blocks indefinitely.

The URI could be changed to include a maxreconnect (i.e., maxReconnectAttempts=X), but that will affect actual production clients where blocking until we have a connection is a desired behaviour.

Is there any way to prevent the healthcheck from blocking?
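
A generic way to keep a blocking call from stalling its caller is to run it on a separate thread and wait for the result with a timeout. The sketch below uses only java.util.concurrent. TimedCheck and isHealthy are hypothetical names, and the sketch makes no claim about how the bundle's ActiveMQHealthCheck is implemented.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: bounds the time a potentially blocking check may take. */
final class TimedCheck {

    private TimedCheck() {}

    /**
     * Runs the check on a daemon thread and reports unhealthy if it throws
     * or does not complete within timeoutMillis.
     */
    static boolean isHealthy(Callable<Boolean> check, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "timed-check");
            t.setDaemon(true); // a stuck check must not keep the JVM alive
            return t;
        });
        Future<Boolean> result = executor.submit(check);
        try {
            return Boolean.TRUE.equals(result.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (TimeoutException | ExecutionException e) {
            return false; // timed out, or the check itself threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        } finally {
            result.cancel(true); // interrupt the check if it is still running
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(isHealthy(() -> true, 1000));
        System.out.println(isHealthy(() -> { Thread.sleep(5000); return true; }, 100));
    }
}
```

Cancelling only interrupts the worker thread. Whether a blocked client call reacts to that interrupt depends on the client.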

Setting redelivery policy for client

Do you think a new PR adding support for configuring a map of redelivery policies would be worth including?
If yes, I'll try to cut a polished PR from my current work in progress.

ActiveMQConfigRedeliveryPolicy - has reasonable properties (taken from: http://activemq.apache.org/redelivery-policy.html)

Example of configuration taken from my real use case:

...
  activeMQConfig:
    ...
    redeliveryPolicies:
      mainQueue:
        initialRedeliveryDelay: 5000
        redeliveryDelay: 10000
        maximumRedeliveries: 2
      tryQueue:
        initialRedeliveryDelay: 60000
        redeliveryDelay: 60000
        maximumRedeliveries: 5

ActiveMQBundle core modification:

for (String name : activeMQConfig.redeliveryPolicies.keySet()) {
    String destinationName = DestinationCreatorImpl.provideDestinationName(name);
    ActiveMQDestination destination = DestinationCreatorImpl.isQueue(name) ?
            new ActiveMQQueue(destinationName) : new ActiveMQTopic(destinationName);
    final RedeliveryPolicy redeliveryPolicy = activeMQConfig.redeliveryPolicies.get(name).create();
    realConnectionFactory.getRedeliveryPolicyMap().put(destination, redeliveryPolicy);
}

ActiveMQConfig change:

   @JsonProperty
   @Valid
   public Map<String, ActiveMQConfigRedeliveryPolicy> redeliveryPolicies = Maps.newHashMap();

The rest of the small changes are in my fork: https://github.com/mlotysz/dropwizard-activemq-bundle/commit/533623a48e7abdb317bf708660cd0ecdf13236b5

Consuming Message Properties

Hi there,

I am trying to see if there is a way to access the message properties when consuming a message from the queue. I've dug into the source code a bit and it looks like only the message body is accessible from the receiver.

Is there something I am missing? Let me know and thank you so much!

Cheers,
Teresa

Licensing

Morten,

Can you please specify some licensing details for this project. I would like to use it in my project.

Thanks,
Michał

maven repo unavailable

The maven repo mentioned in the readme:

https://raw.githubusercontent.com/mbknor/mbknor.github.com/master/m2repo/releases

Is now throwing a 404.

Has it been moved elsewhere? Any plans to host on central?

does it support clustering too?

I am quite a newbie in this space, so excuse me if my question sounds vague or naive. I just wanted to understand whether we need to make any config or code changes to support an MQ in clustered mode. Currently we are using ZeroMQ in our project. It is written to support only a single-instance peer and would require a good amount of code changes to support clustering. Please let me know your thoughts on this.

Let me know if we need to discuss this over Skype.
Thanks,
Vikesh

Use less verbose errors when 'The Consumer is closed'

Since 'Uncaught exception - will try to recover' - 'javax.jms.IllegalStateException: The Consumer is closed' appears to happen from time to time, we should not log it as such a verbose ERROR:

ERROR [2014-05-06 12:17:59,657] com.kjetland.dropwizard.activemq.ActiveMQReceiverHandler: Uncaught exception - will try to recover
! javax.jms.IllegalStateException: The Consumer is closed
! at org.apache.activemq.ActiveMQMessageConsumer.checkClosed(ActiveMQMessageConsumer.java:836) ~[activemq-client-5.9.0.jar:5.9.0]
! at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:593) ~[activemq-client-5.9.0.jar:5.9.0]
! at org.apache.activemq.jms.pool.PooledMessageConsumer.receive(PooledMessageConsumer.java:67) ~[activemq-jms-pool-5.9.0.jar:5.9.0]
! at com.kjetland.dropwizard.activemq.ActiveMQReceiverHandler.runReceiveLoop(ActiveMQReceiverHandler.java:169) ~[dropwizard-activemq-0.3.5.jar:na]
! at com.kjetland.dropwizard.activemq.ActiveMQReceiverHandler.run(ActiveMQReceiverHandler.java:137) ~[dropwizard-activemq-0.3.5.jar:na]
! at java.lang.Thread.run(Thread.java:744) [na:1.8.0]
INFO  [2014-05-06 12:17:59,657] com.kjetland.dropwizard.activemq.ActiveMQReceiverHandler: Setting up receiver for Consumer.x.VirtualTopic.request.v1
INFO  [2014-05-06 12:17:59,681] com.kjetland.dropwizard.activemq.ActiveMQReceiverHandler: Started listening for messages on Consumer.x.VirtualTopic.request.v1

trying with eclipse

Hey, Thanks for this project.

I am a newbie trying ActiveMQ with Dropwizard.
I am using Eclipse.
I have downloaded your zip file and am using that,
but I do not know how to run it.
I do not see any file with a main function.
Do I need to configure anything extra? Am I missing something?

Thanks!
Alok

Way to configure multiple activemq endpoints

I see that ActiveMQBundle is a ConfiguredBundle. This means that we cannot use multiple ActiveMQBundle instances in the same application.
I need to connect to multiple different ActiveMQ endpoints in my application. Can you suggest a way to do that?
