sonus21 / rqueue

Rqueue aka Redis Queue [Task Queue, Message Broker] for Spring framework

Home Page: https://sonus21.github.io/rqueue

License: Apache License 2.0

Java 96.03% Lua 0.68% Dockerfile 0.01% CSS 0.52% JavaScript 0.96% HTML 1.56% Shell 0.23%
spring-boot spring redis asynchronous-tasks delayed-jobs delayed-queue java task-executor task-scheduler task-manager

rqueue's Introduction

Rqueue: Redis Queue, Task Queue, Scheduled Queue for Spring and Spring Boot

Rqueue is an asynchronous task executor (worker) built for the Spring and Spring Boot frameworks. It is based on the Spring Framework's messaging library and is backed by Redis. It can also be used as a message broker when all of the services' code is written in Spring.


Message Flow

Features

  • Instant delivery : Execute a message immediately in the background
  • Message scheduling : A message can be scheduled for any arbitrary period
  • Unique message : Unique message processing for a queue based on the message id
  • Periodic message : Process the same message at a fixed interval
  • Priority tasks : Tasks can have a priority such as high, medium or low
  • Message delivery : A message is guaranteed to be consumed at least once. (A message may be consumed more than once if the underlying worker fails or the process restarts; otherwise it is delivered exactly once.)
  • Message retry : Messages are retried automatically after an application crash, failure or restart
  • Automatic message serialization and deserialization
  • Message multicasting : Call multiple message listeners on every message
  • Batch message polling : Fetch multiple messages from Redis at once
  • Metrics : In-flight messages, messages waiting for consumption and scheduled messages
  • Competing consumers : Multiple messages can be consumed in parallel by different workers/listeners
  • Concurrency : The concurrency of any listener can be configured
  • Queue priority :
    • Group level queue priority (weighted and strict)
    • Sub queue priority (weighted and strict)
  • Long execution job : Long-running jobs can check in periodically
  • Execution backoff : Exponential and fixed back-off (default is fixed back-off)
  • Middleware : Add one or more middlewares; they are called before the listener method
  • Callbacks : Callbacks for dead letter queue, discard, etc.
  • Events : 1. Bootstrap event 2. Task execution event
  • Redis connection : A separate Redis setup can be used for Rqueue
  • Redis cluster : Redis cluster can be used with the Lettuce client
  • Redis Sentinel : Redis Sentinel can be used with Rqueue
  • Reactive programming : Supports reactive Redis and Spring WebFlux
  • Web dashboard : Web dashboard to manage queues and view queue insights, including latency
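
The difference between the fixed and exponential back-off named in the feature list can be sketched in plain Java. This is only an illustration of the two strategies, not Rqueue's implementation: the class name BackoffSketch, the 5-second base delay, the doubling factor and the 1-minute cap are all assumptions chosen for the example.

```java
import java.time.Duration;

/** Illustrative only: two common retry back-off strategies (not Rqueue code). */
final class BackoffSketch {

  /** Fixed back-off: every retry waits the same amount of time. */
  static Duration fixedDelay(Duration base, int failureCount) {
    return base;
  }

  /** Exponential back-off: the wait doubles with each failure, up to a cap. */
  static Duration exponentialDelay(Duration base, int failureCount, Duration max) {
    // failureCount is 1 for the first retry, so the first wait equals base.
    int exponent = Math.min(Math.max(failureCount - 1, 0), 30);
    Duration delay = base.multipliedBy(1L << exponent);
    return delay.compareTo(max) > 0 ? max : delay;
  }

  public static void main(String[] args) {
    Duration base = Duration.ofSeconds(5); // assumed base delay
    Duration max = Duration.ofMinutes(1);  // assumed cap
    for (int failure = 1; failure <= 6; failure++) {
      System.out.println("failure " + failure
          + ": fixed=" + fixedDelay(base, failure).getSeconds() + "s"
          + ", exponential=" + exponentialDelay(base, failure, max).getSeconds() + "s");
    }
  }
}
```

With a fixed strategy a failing message is retried at a steady rate; with the exponential one, repeated failures quickly push retries further apart, which reduces load on a dependency that is down.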

Requirements

  • Spring 5+, 6+
  • Java 1.8+, 17
  • Spring Boot 2+, 3+
  • Lettuce client for Redis cluster
  • Read master preference for Redis cluster

Getting Started

Dependency

Snapshot Version: https://s01.oss.sonatype.org/content/repositories/snapshots/com/github/sonus21/
Release Version: Maven central

Spring Boot

NOTE:

  • For spring boot 2.x use Rqueue 2.x
  • For spring boot 3.x use Rqueue 3.x

Get the latest one from Maven central

  • Add dependency

    • Gradle
          implementation 'com.github.sonus21:rqueue-spring-boot-starter:2.13.1-RELEASE'
    • Maven
       <dependency>
          <groupId>com.github.sonus21</groupId>
          <artifactId>rqueue-spring-boot-starter</artifactId>
          <version>2.13.1-RELEASE</version>
      </dependency>

    No additional configuration is required; adding the dependency is enough.


Spring Framework

NOTE

  • For spring framework 5.x use rqueue-spring 2.x
  • For spring framework 6.x use rqueue-spring 3.x

Get the latest one from Maven central

  • Add Dependency
    • Gradle
          implementation 'com.github.sonus21:rqueue-spring:2.13.1-RELEASE'
    • Maven
       <dependency>
         <groupId>com.github.sonus21</groupId>
         <artifactId>rqueue-spring</artifactId>
         <version>2.13.1-RELEASE</version>
       </dependency>
  • Add annotation EnableRqueue on application config class
  • Provide a RedisConnectionFactory bean
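
Configuration

@EnableRqueue
public class Application {
  @Bean
  public RedisConnectionFactory redisConnectionFactory() {
    // Example only: assumes Redis on localhost:6379 via the Lettuce client.
    // Return whichever Redis connection factory matches your setup.
    return new LettuceConnectionFactory();
  }
}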

Message publishing/Task submission

All messages need to be sent using the RqueueMessageEnqueuer bean's enqueueXXX, enqueueInXXX and enqueueAtXXX methods. The bean offers a handful of enqueue, enqueueIn and enqueueAt variants; use whichever fits the use case.

public class MessageService {

  @Autowired
  private RqueueMessageEnqueuer rqueueMessageEnqueuer;

  public void doSomething() {
    rqueueMessageEnqueuer.enqueue("simple-queue", "Rqueue is configured");
  }

  public void createJOB(Job job) {
    rqueueMessageEnqueuer.enqueue("job-queue", job);
  }

  // send notification in 30 seconds
  public void sendNotification(Notification notification) {
    rqueueMessageEnqueuer.enqueueIn("notification-queue", notification, 30 * 1000L);
  }

  // enqueue At example
  public void createInvoice(Invoice invoice, Instant instant) {
    rqueueMessageEnqueuer.enqueueAt("invoice-queue", invoice, instant);
  }

  // enqueue with priority, when sub queues are used as explained in the queue priority section.
  enum SmsPriority {
    CRITICAL("critical"),
    HIGH("high"),
    MEDIUM("medium"),
    LOW("low");
    private final String value;
    SmsPriority(String value) { this.value = value; }
    public String value() { return value; }
  }

  public void sendSms(Sms sms, SmsPriority priority) {
    rqueueMessageEnqueuer.enqueueWithPriority("sms-queue", priority.value(), sms);
  }

  // Index chat every 1 minute
  public void indexChat(ChatIndexing chatIndexing) {
    rqueueMessageEnqueuer.enqueuePeriodic("chat-indexer", chatIndexing, 60_000);
  }

}
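
The relationship between the relative delay passed to enqueueIn and the absolute time passed to enqueueAt can be sketched in plain Java. This is an illustration only: ScheduleSketch and delayMillis are hypothetical names, and whether Rqueue converts an Instant this way internally is not stated in this document.

```java
import java.time.Duration;
import java.time.Instant;

/** Illustrative only: mapping an absolute schedule time to a relative delay. */
final class ScheduleSketch {

  /** Milliseconds to wait from now until target; 0 if target is already in the past. */
  static long delayMillis(Instant now, Instant target) {
    long millis = Duration.between(now, target).toMillis();
    return Math.max(millis, 0L);
  }

  public static void main(String[] args) {
    Instant now = Instant.now();
    Instant invoiceTime = now.plusSeconds(30);
    // An "enqueue at invoiceTime" request corresponds to an "enqueue in 30000 ms" request.
    System.out.println(delayMillis(now, invoiceTime));
  }
}
```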

Worker/Consumer/Task Executor/Listener

Any method of a Spring bean can be marked as a worker/message listener using the RqueueListener annotation.

@Component
@Slf4j
public class MessageListener {

  @RqueueListener(value = "simple-queue")
  public void simpleMessage(String message) {
    log.info("simple-queue: {}", message);
  }

  @RqueueListener(value = "job-queue", numRetries = "3",
      deadLetterQueue = "failed-job-queue", concurrency = "5-10")
  public void onMessage(Job job) {
    log.info("Job alert: {}", job);
  }

  @RqueueListener(value = "push-notification-queue", numRetries = "3",
      deadLetterQueue = "failed-notification-queue")
  public void onMessage(Notification notification) {
    log.info("Push notification: {}", notification);
  }

  @RqueueListener(value = "sms", priority = "critical=10,high=8,medium=4,low=1")
  public void onMessage(Sms sms) {
    log.info("Sms : {}", sms);
  }

  @RqueueListener(value = "chat-indexing", priority = "20", priorityGroup = "chat")
  public void onMessage(ChatIndexing chatIndexing) {
    log.info("ChatIndexing message: {}", chatIndexing);
  }

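  // Renamed so that it does not clash with the identical onMessage(ChatIndexing) signature above.
  @RqueueListener(value = "chat-indexing-daily", priority = "10", priorityGroup = "chat")
  public void onDailyMessage(ChatIndexing chatIndexing) {
    log.info("ChatIndexing message: {}", chatIndexing);
  }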

  // checkin job example
  @RqueueListener(value = "chat-indexing-weekly", priority = "5", priorityGroup = "chat")
  public void onMessage(ChatIndexing chatIndexing,
      @Header(RqueueMessageHeaders.JOB) com.github.sonus21.rqueue.core.Job job) {
    log.info("ChatIndexing message: {}", chatIndexing);
    job.checkIn("Chat indexing...");
  }
}
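
A priority string such as "critical=10,high=8,medium=4,low=1" assigns a weight to each sub queue. The sketch below shows one way such a string could be parsed and used for a weighted pick. It is not Rqueue's scheduling algorithm; WeightedPrioritySketch, parse and pick are hypothetical names, and the "ticket" approach is an assumption made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

/** Illustrative only: weighted selection among sub queues (not Rqueue code). */
final class WeightedPrioritySketch {

  /** Parses "critical=10,high=8" into an insertion-ordered name-to-weight map. */
  static Map<String, Integer> parse(String spec) {
    Map<String, Integer> weights = new LinkedHashMap<>();
    for (String entry : spec.split(",")) {
      String[] parts = entry.trim().split("=");
      if (parts.length != 2) {
        throw new IllegalArgumentException("Expected name=weight but got: " + entry);
      }
      weights.put(parts[0].trim(), Integer.parseInt(parts[1].trim()));
    }
    return weights;
  }

  /** Picks the sub queue whose weight range contains ticket, with ticket in [0, total weight). */
  static String pick(Map<String, Integer> weights, int ticket) {
    if (ticket < 0) {
      throw new IllegalArgumentException("ticket out of range: " + ticket);
    }
    int upperBound = 0;
    for (Map.Entry<String, Integer> entry : weights.entrySet()) {
      upperBound += entry.getValue();
      if (ticket < upperBound) {
        return entry.getKey();
      }
    }
    throw new IllegalArgumentException("ticket out of range: " + ticket);
  }

  public static void main(String[] args) {
    Map<String, Integer> weights = parse("critical=10,high=8,medium=4,low=1");
    int total = weights.values().stream().mapToInt(Integer::intValue).sum();
    int ticket = ThreadLocalRandom.current().nextInt(total);
    System.out.println("ticket " + ticket + " -> " + pick(weights, ticket));
  }
}
```

With these weights, 10 of every 23 random tickets land on the critical sub queue and only 1 on low, so lower priorities are served rarely but are not starved. A strict scheme would instead always drain the higher priority first.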

Dashboard

Link: http://localhost:8080/rqueue

Dashboard

Queue Statistics

Micrometer based dashboard for queue

Grafana Dashboard

Message Waiting For Execution

Explore Queue

Recent jobs details

Jobs


Status

Rqueue is stable and production ready; it processes 100K+ messages daily in a production environment. Some of the Rqueue users:

Airtel    Vonage    Vonage    Line Chat

We would love to add your organization's name here. If you're one of the Rqueue users, please raise a PR/issue.


Support

  • Please report bugs, questions and feature requests to the issue tracker.
  • Ask questions on StackOverflow using the #rqueue tag

Contribution

Pull requests for any feature, bug fix or enhancement are most welcome. You need Java 8 and Gradle to get started. In the root build.gradle file, comment out the Spring-related versions, or set environment variables for the Spring versions. You can use the module, class and other diagrams to familiarise yourself with the project.

Please format your code with Google Java formatter.

License

© Sonu Kumar 2019-Instant.now

Rqueue is released under version 2.0 of the Apache License.

rqueue's People

Contributors

alexcn, alexkarezin, dependabot[bot], malkovro, rdwallis, scotty6435, shuohao, sonus21

rqueue's Issues

argument type mismatch

Thank you for this library :)
I got a java.lang.IllegalStateException: argument type mismatch error while trying to send objects to the queue.

Stacktrace:

java.lang.IllegalStateException: argument type mismatch
Endpoint [com.octovan.service.redis.queue.RedisMessageListener]
Method [public void com.octovan.service.redis.queue.RedisMessageListener.onMessage(com.octovan.service.redis.queue.JobX) throws java.lang.Exception] with argument values:
 [0] [type=com.octovan.service.redis.queue.JobX] [value=JobX(message=Test2, id=id)] 
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:176)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
	at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMatch(AbstractMethodMessageHandler.java:565)
	at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessageInternal(AbstractMethodMessageHandler.java:520)
	at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:454)
	at com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer$MessageExecutor.run(RqueueMessageListenerContainer.java:506)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalArgumentException: argument type mismatch
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
	... 8 common frames omitted

My classes:

Object that I want to send over queue

public class JobX {
    private String message;
    private String id;

    public JobX() {
    }

    public JobX(String message, String id) {
        this.message = message;
        this.id = id;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }
}

My queue listener

@RqueueListener(value = "delayed-queue-job", numRetries = "1", deadLetterQueue = "failed-job-queue")
    public void onMessage(JobX job) throws Exception {
        logger.info("Job:" + job);
    }

My message send function

public void sendDelayedMessage(String message, String id){
        rqueueMessageSender.put("delayed-queue-job", new JobX(message, id), 1);
    }

Thanks.

Cannot dynamically set active (true/false) on @RqueueListener

Is your feature request related to a problem? Please describe.
I cannot flexibly set the active flag of @RqueueListener as an end user. I am only able to set active (true/false) from code.

Additional context
Suppose task 1 takes about 5 hours (the time it needs is dynamic, and I only know when it has completed). I would expect the @RqueueListener to not execute and to wait until task 1 is complete; after that the @RqueueListener would continue executing.

Too many queues and Rqueue stops working

Describe the bug
We currently have 72 queues in production. I added a 73rd queue today and Rqueue stopped working. If I remove the last one, Rqueue functions normally.

How to Reproduce
Add 73 queues and try it.

Library Dependencies

  • Spring Boot: 2.4.4
  • Spring Messaging: 5.3.5
  • Spring Data Redis: 2.4.6

Additional Details
I tried on Windows and Docker and both have the same problem.
I checked with the debugger and the function is not executed.
If I check /rqueue/running, the task is shown as running but the function is not triggered.

Eval is not supported in cluster environment

"return #redis.pcall('keys', 'rqueue-*')".getBytes(), ReturnType.INTEGER, 0);

spring data redis JedisClusterScriptingCommands not supported

Caused by: org.springframework.dao.InvalidDataAccessApiUsageException: Eval is not supported in cluster environment.
	at org.springframework.data.redis.connection.jedis.JedisClusterScriptingCommands.eval(JedisClusterScriptingCommands.java:74)
	at org.springframework.data.redis.connection.DefaultedRedisConnection.eval(DefaultedRedisConnection.java:1311)
	at com.github.sonus21.rqueue.utils.RedisUtils.updateAndGetVersion(RedisUtils.java:73)
	at com.github.sonus21.rqueue.config.RqueueListenerBaseConfig.rqueueConfig(RqueueListenerBaseConfig.java:87)

RedisCommandExecutionException : command arguments must be strings or integers

What's not working?
The connection keeps getting lost suddenly after a while when 10 threads are running concurrently (the failure happens sooner with more threads). The weird thing is that after the connection is established again, most of the jobs fail with a Redis error that says command arguments must be strings or integers.

Caused by: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
Caused by: io.netty.channel.AbstractChannel$AnnotatedSocketException: Address already in use: no further information: localhost/127.0.0.1:6379
Caused by: java.net.BindException: Address already in use: no further information
Caused by: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
Caused by: io.netty.channel.AbstractChannel$AnnotatedSocketException: Address already in use: no further information: localhost/127.0.0.1:6379
Caused by: java.net.BindException: Address already in use: no further information
Caused by: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
Caused by: io.netty.channel.AbstractChannel$AnnotatedSocketException: Address already in use: no further information: localhost/127.0.0.1:6379
Caused by: java.net.BindException: Address already in use: no further information
Caused by: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
Caused by: io.netty.channel.AbstractChannel$AnnotatedSocketException: Address already in use: no further information: localhost/127.0.0.1:6379
Caused by: java.net.BindException: Address already in use: no further information
Caused by: io.lettuce.core.RedisCommandExecutionException: ERR Error running script (call to f_ea57ebdb2b8145a8ee3c9c6113ab6fdea9cb0411): @user_script:17: @user_script: 17: Lua redis() command arguments must be strings or integers
Caused by: io.lettuce.core.RedisCommandExecutionException: ERR Error running script (call to f_ea57ebdb2b8145a8ee3c9c6113ab6fdea9cb0411): @user_script:17: @user_script: 17: Lua redis() command arguments must be strings or integers
Caused by: java.util.concurrent.RejectedExecutionException: Task com.github.sonus21.rqueue.listener.RqueueExecutor@33b94386 rejected from java.util.concurrent.ThreadPoolExecutor@23d6532c[Running, pool size = 10, active threads = 10, queued tasks = 0, completed tasks = 9391]
Caused by: io.lettuce.core.RedisCommandExecutionException: ERR Error running script (call to f_ea57ebdb2b8145a8ee3c9c6113ab6fdea9cb0411): @user_script:17: @user_script: 17: Lua redis() command arguments must be strings or integers
Caused by: io.lettuce.core.RedisCommandExecutionException: ERR Error running script (call to f_ea57ebdb2b8145a8ee3c9c6113ab6fdea9cb0411): @user_script:17: @user_script: 17: Lua redis() command arguments must be strings or integers
Caused by: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
Caused by: io.netty.channel.AbstractChannel$AnnotatedSocketException: Address already in use: no further information: localhost/127.0.0.1:6379
Caused by: java.net.BindException: Address already in use: no further information
Caused by: io.lettuce.core.RedisCommandExecutionException: ERR Error running script (call to f_ea57ebdb2b8145a8ee3c9c6113ab6fdea9cb0411): @user_script:17: @user_script: 17: Lua redis() command arguments must be strings or integers
Caused by: io.lettuce.core.RedisCommandExecutionException: ERR Error running script (call to f_ea57ebdb2b8145a8ee3c9c6113ab6fdea9cb0411): @user_script:17: @user_script: 17: Lua redis() command arguments must be strings or integers

for full log:
https://pastebin.com/WGp5H5Ed

What're application dependencies ?

  • Rqueue Version:
  • Spring Boot Version: 2.8

How to Reproduce (optional)?

@RqueueListener(value = "tags", deadLetterQueue = "tags-failed", concurrency = "10")
    public void sendTag(int value) {
        log.info("tags: {}", value);
        tagsRepo.save(new Tags(value));
        String url = "https://jsonplaceholder.typicode.com/todos/1";
        this.restTemplate.getForObject(url, String.class);

    }
  • After around 8000 or 9000 jobs the problem starts to happen.

note:
I don't think this is a bug. The problem might come from the local Docker repo and a memory problem; when I connect to a remote cluster it works perfectly fine. The question is why the command arguments must be strings or integers error starts appearing once the connection is restored or established.

Option to add dashboard path prefix using rqueue.web.url.prefix config

The rqueue.web.url.prefix config option looks to have been partially removed in v2.7.0
Commit: c96d816

It has been removed from:
rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueWebConfig.java
rqueue-spring-boot-example/src/main/resources/application.properties

but it is still present in the view controller and referenced in the dashboard HTML template and JS file

Was this an accidental removal?

Microservice running Rqueue is greedy in disk space?!

Describe the bug

Since integrating the rqueue library into a microservice running in a Docker container on a server, I noticed that the server keeps running out of storage. Investigating the issue, I found that Docker containers were consuming all of the server's disk storage. I thought the logs of all the microservices were the issue, but they weren't!

Then I decided to rebuild the microservices one by one to find out which one caused the problem, and found that the rqueue microservice is the issue! Once the container was brought up again, the used space dropped. The microservice took 20GB of storage while running for 30 hours and handling 200K messages.

I really want to ask: what is going on? Why is a microservice so greedy that it consumes all this storage? Where is it used?

How to Reproduce

Run a microservice handling 150K messages daily in a Docker container (in my case on a Linux server).

Screenshots

I'm attaching 2 screenshots of the server disk storage right before and after the rqueue microservice rebuild, showing the 20GB difference.

before:
Annotation 2020-08-28 020448

after
Annotation 2020-08-28 020620

Library Dependencies

Same JHipster project configuration as we used before in old tickets

Additional Details

OS: Linux Debian 9
Env: docker

Producer Consumer with 2 different projects

I am trying to create a producer and a consumer in 2 different projects. In my producer project I use RqueueEndpointManager's registerQueue method to register a queue named 'test', and then RqueueMessageEnqueuer's enqueue method to write a message to the same queue, but for some reason I get an error saying that the queue does not exist.

my code:

@Log4j2
@Service
@AllArgsConstructor
public class MessageService {
  private final RqueueMessageEnqueuer rqueueMessageEnqueuer;
  private final RqueueEndpointManager endpointManager;
  private final RqueueMessageManager rqueueMessageManager;

  @PostConstruct
  public void init() {
    List<String> queues = new ArrayList<>();
    queues.add("test");
    // queues.add("test2");
    for (String queue : queues) {
      System.out.println(endpointManager.isQueueRegistered(queue));
      endpointManager.registerQueue(queue);
      System.out.println(endpointManager.isQueueRegistered(queue));
    }
  }

  private String[] getPriority(String queue) {
    return new String[]{};
  }

  public void queueMessage(String queueName, Object message) {
    String result = rqueueMessageEnqueuer.enqueue(queueName, message);
    log.info("The result of the queue op: " + result);
  }
}

the error:
2021-03-21 09:48:21.692 [http-nio-8080-exec-1] ERROR - Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is com.github.sonus21.rqueue.exception.QueueDoesNotExist: test] with root cause com.github.sonus21.rqueue.exception.QueueDoesNotExist: test

At the same time I tried looking at the dashboard GUI (I have added a picture of the GUI) and there I can see that the queue exists. Another problem is that when I move from one tab to another in the GUI I get an error.

image

the error:
2021-03-21 09:56:02.754 [http-nio-8080-exec-7] ERROR - GET call failed for https://api.github.com/repos/sonus21/rqueue/releases/latest org.springframework.web.client.ResourceAccessException: I/O error on GET request for "https://api.github.com/repos/sonus21/rqueue/releases/latest": connect timed out; nested exception is java.net.SocketTimeoutException: connect timed out

Message Deduplication

Is there a way to see whether a message object with id: 124 (having the property id) is already queued or not? Ideally, a queued message should have a unique tag that helps filter queued messages for uniqueness and return a message status (I'm inspired by Android Work Manager). If this isn't implemented already in Rqueue, I'm thinking of supporting it in my internal source code (by checking the status of objects in @RqueueListener) and rescheduling the message from the listener if it needs to wait longer before executing. What do you think?

Originally posted by @chlegou in #25 (comment)
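
The producer-side check described in this request can be sketched without any queue library. The code below is a hypothetical illustration, not an Rqueue feature: PendingMessageRegistry, enqueueIfAbsent and markDone are invented names, the registry lives in a single JVM's memory only, and a deployment with several producers would need a shared store instead.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative only: in-memory guard against enqueueing the same message id twice. */
final class PendingMessageRegistry {
  private final Set<String> pendingIds = ConcurrentHashMap.newKeySet();

  /**
   * Runs enqueueAction only if no message with this id is pending.
   * Returns true if the action ran, false if the id was already pending.
   */
  boolean enqueueIfAbsent(String messageId, Runnable enqueueAction) {
    if (!pendingIds.add(messageId)) {
      return false; // duplicate: a message with this id is already queued
    }
    try {
      enqueueAction.run();
      return true;
    } catch (RuntimeException e) {
      pendingIds.remove(messageId); // enqueue failed, so the id is not pending
      throw e;
    }
  }

  /** To be called by the listener once the message has been processed. */
  void markDone(String messageId) {
    pendingIds.remove(messageId);
  }

  boolean isPending(String messageId) {
    return pendingIds.contains(messageId);
  }
}
```

In this sketch the enqueueAction would wrap the real enqueue call, and the listener would call markDone at the end of processing so that the same id can be queued again later.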

What's not working?

When adding actuator, the build fails.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

getting this error:

Parameter 0 of method rqueueCounter in com.github.sonus21.rqueue.spring.RqueueListenerConfig required a bean named 'meterRegistry' that could not be found.

The injection point has the following annotations:
- @org.springframework.beans.factory.annotation.Autowired(required=false)

Action:

Consider defining a bean named 'meterRegistry' in your configuration.

Please assist :)

QueueDoesNotExist exception. How to setup Rqueue?

Describe the bug

I just set up a simple demo with the following configuration for Rqueue, but I got the QueueDoesNotExist exception. How can I solve this problem? Thank you.

How to Reproduce

@SpringBootApplication
@EnableRedisRepositories
@EnableWebMvc
public class AuctionBotV2Application {

    public static void main(String[] args) {
        SpringApplication.run(AuctionBotV2Application.class, args);
    }

}
@Component
public class BotScheduler {

    @Autowired
    private RqueueMessageSender rqueueMessageSender;

    public void createJob(Product product) {
        rqueueMessageSender.enqueue(BOT_QUEUE, product);
    }

    @PostConstruct
    private void initJob(){
        Product product = new Product("fan", 3);
        createJob(product);
    }
}
@Component
@Slf4j
public class BotWorker {

    @RqueueListener(value = BOT_QUEUE)
    public void executeBidBot(Product product){
        log.warn("job: {}", product);
    }
}

And then I got an error:

Caused by: com.github.sonus21.rqueue.exception.QueueDoesNotExist: bot-queue
	at com.github.sonus21.rqueue.core.EndpointRegistry.get(EndpointRegistry.java:37) ~[rqueue-core-2.1.0-RELEASE.jar:na]
	at com.github.sonus21.rqueue.core.impl.BaseMessageSender.pushMessage(BaseMessageSender.java:111) ~[rqueue-core-2.1.0-RELEASE.jar:na]
	at com.github.sonus21.rqueue.core.impl.RqueueMessageSenderImpl.enqueue(RqueueMessageSenderImpl.java:58) ~[rqueue-core-2.1.0-RELEASE.jar:na]
	at com.chozoi.auction_bot_v2.bots.BotScheduler.createJob(BotScheduler.java:19) ~[classes/:na]
	at com.chozoi.auction_bot_v2.bots.BotScheduler.initJob(BotScheduler.java:25) ~[classes/:na]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_265]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_265]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_265]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_265]
	at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:389) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:333) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:157) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	... 19 common frames omitted

Library Dependencies

  <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
...
    <properties>
        <java.version>1.8</java.version>
    </properties>
...
       <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.github.sonus21</groupId>
            <artifactId>rqueue-spring-boot-starter</artifactId>
            <version>2.0.0-RELEASE</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <type>jar</type>
        </dependency>

Customizable redis key prefix

I would like to be able to customize the prefix inserted in the Redis keys. Are there any plans for that?

Thank you very much in advance!

rqueue Producer Consumer Deserialization problem

What's not working?
I am trying to create a producer and a consumer in 2 different projects, and in the consumer project I am getting a message deserialization error.

What're application dependencies ?

  • Rqueue Version: 2.8.0-RELEASE
  • Spring Boot Version: 2.3.4.RELEASE
  • Spring Messaging Version :5.2.9.RELEASE
  • Spring Data Redis Version :2.3.4.RELEASE

How to Reproduce (optional)?
I have created 2 simple projects: one writes an object to a queue and the second one consumes the object.

My object/class:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Event implements Serializable {
  private static final long serialVersionUID = -6897130225782938727L;
  private int id;
  private int packageId;
  private int policyId;
  private int EventId;
  private String population;
  private int clientId;
  private String agentEmail;
  private String clientPhone;
  private int appId;
}

create instance of MessageListener manually

I cannot figure out how to manually create an instance of the MessageListener.
If I remove the "@Component" annotation from the MessageListener class, the "@RqueueListener" annotation on the methods seems to have no effect (the method is not triggered by channel messages even if the class is instantiated).
I would like to manually create a MessageListener instance without Spring bean and autowiring features, or to have some RqueueListener methods in "non-component" classes. Is that possible in some way?

unicast message

I have an app with the RQ implementation. It is successfully able to post to and consume from Redis.
However, if I scale up the consumer instances, the same message is consumed by all consumers. I was reading through the wiki on how to implement unicast message handling. Are there any docs or samples on this?

@RqueueListener: generic types not supported

@Getter
@Setter
public class RqueueResponse<T> {
    private CreatorMerchant owner;
    private T data;
}

@Component
@Slf4j
public class CustomerChangeListener {
    @Autowired
    private CustomerScoreService customerScoreService;

    @RqueueListener(value = TopicConstant.CUSTOMER_INFO_CHANGE)
    public void infoChange(RqueueResponse<CustomerInfo> resp) {         // <---- ERROR RESULT
        customerScoreService.updateInfoScore(resp.getData(), resp.getOwner().getMerchantId());
    }
}
@Service
public class RqueueService {
    @Autowired
    private RqueueMessageEnqueuer rqueueMessageEnqueuer;
    @Autowired
    private SaasContext saasContext;

    public <T> void enqueue(String queueName, T message) {
        rqueueMessageEnqueuer.enqueue(queueName, new RqueueResponse<T>(saasContext.getCurrentCreator(), message));
    }
}

ERROR:
java.lang.ClassCastException: java.util.LinkedHashMap cannot be cast to com.wzx.server.entity.m.CustomerInfo

analysis:

It does not support generic parsing; the generic payload is converted to LinkedHashMap by default.
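
The LinkedHashMap in the error is what one would expect from Java type erasure: the raw parameter class carries no trace of the type argument, so a deserializer that only sees the raw class has nothing to build a CustomerInfo from. The sketch below demonstrates this with the JDK alone. Envelope, CustomerInfo and Listener are stand-in classes written for the example, not the reporter's or Rqueue's types, and the sketch makes no claim about how Rqueue resolves listener parameters.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/** Illustrative only: what type erasure hides and what reflection can still recover. */
final class ErasureSketch {
  static class Envelope<T> { T data; }   // stand-in for a generic wrapper
  static class CustomerInfo {}           // stand-in payload type

  static class Listener {
    public void infoChange(Envelope<CustomerInfo> resp) {}
  }

  /** Looks up the example listener method. */
  static Method listenerMethod() {
    try {
      return Listener.class.getMethod("infoChange", Envelope.class);
    } catch (NoSuchMethodException e) {
      throw new IllegalStateException(e);
    }
  }

  /** Returns the first type argument of the method's first parameter, or Object if none. */
  static Type payloadTypeArgument(Method method) {
    Type parameter = method.getGenericParameterTypes()[0];
    if (parameter instanceof ParameterizedType) {
      return ((ParameterizedType) parameter).getActualTypeArguments()[0];
    }
    return Object.class;
  }

  public static void main(String[] args) {
    Method method = listenerMethod();
    // Erasure: instances with different type arguments share one runtime class.
    System.out.println(new Envelope<CustomerInfo>().getClass() == new Envelope<String>().getClass());
    // The raw parameter class knows nothing about CustomerInfo ...
    System.out.println(method.getParameterTypes()[0]);
    // ... but the generic signature of the method still does.
    System.out.println(payloadTypeArgument(method));
  }
}
```

A converter that is handed only the raw class cannot know T, whereas one that is handed the full generic parameter type can. A common workaround on the application side is a concrete, non-generic message class per payload type.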

Not all dependency added to the starter

Describe the bug

When you add the starter, not all dependencies are downloaded.

How to Reproduce

  • Add dependency to pom.xml
<dependency>
	<groupId>com.github.sonus21</groupId>
	<artifactId>rqueue-spring-boot-starter</artifactId>
	<version>2.5.0-RELEASE</version>
</dependency>
  • Refresh the dependencies to load all needed libraries.

  • Add the configuration (EnableRqueue etc.).

Library Dependencies

  • Spring Boot: 2.4.3

Workaround
If we downgrade the starter and do the same thing, it works for me. Link to one of your older resources: https://medium.com/@sonus21/asynchronous-task-execution-using-redis-and-spring-framework-125386c33f9

Some queue jobs take more than 15 minutes to execute

Describe the bug

I have some queue jobs that take more than 15 minutes to finish. After 15 minutes the job is respawned even though the first run has not finished, and it ends with status success. After that the duplicate job is executed, and the same job is recreated again after another 15 minutes. The only way to stop the infinite respawn is to restart the application and clear the Redis queue.

How to Reproduce

  • Create a new queue whose listener calls Thread.sleep for more than 15 minutes
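
A respawn after a fixed 15 minutes is what a visibility timeout would produce: a message handed to a worker stays hidden for a fixed period and becomes visible again if it has not been acknowledged by then. The following is a minimal in-memory model of that idea, assuming this is the mechanism at work here. It is not Rqueue's implementation, and `VisibilityTimeoutModel` with its methods is a hypothetical name. Time is passed in explicitly so that the behavior is easy to follow.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class VisibilityTimeoutModel {
    private final long visibilityTimeoutMs;
    private final Deque<String> ready = new ArrayDeque<>();
    // message -> time at which it becomes visible again unless acknowledged
    private final Map<String, Long> inFlight = new HashMap<>();

    VisibilityTimeoutModel(long visibilityTimeoutMs) {
        this.visibilityTimeoutMs = visibilityTimeoutMs;
    }

    void enqueue(String message) {
        ready.addLast(message);
    }

    /** Hands out the next message, first re-queuing in-flight messages whose timeout expired. */
    String poll(long nowMs) {
        Iterator<Map.Entry<String, Long>> it = inFlight.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= nowMs) {
                ready.addLast(entry.getKey());
                it.remove();
            }
        }
        String message = ready.pollFirst();
        if (message != null) {
            inFlight.put(message, nowMs + visibilityTimeoutMs);
        }
        return message;
    }

    /** Acknowledges a finished message so that it is not handed out again. */
    void ack(String message) {
        inFlight.remove(message);
    }

    public static void main(String[] args) {
        long minute = 60 * 1000L;
        VisibilityTimeoutModel queue = new VisibilityTimeoutModel(15 * minute);
        queue.enqueue("long-job");
        System.out.println(queue.poll(0));           // first delivery
        System.out.println(queue.poll(10 * minute)); // still hidden from other workers
        System.out.println(queue.poll(16 * minute)); // handed out again while the first run is still busy
    }
}
```

In this model the duplicate run disappears once the timeout is longer than the slowest job. To my knowledge `@RqueueListener` has a `visibilityTimeout` attribute (in milliseconds) for this purpose; this should be verified against the Javadoc of the version in use.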

Version check not working behind a proxy

Hi,
Each time I go to "http://localhost:8080/rqueue/queues", Rqueue tries to connect to GitHub, but this does not work because my environment is behind a proxy. Would it be possible to support a proxy, or to have an option to disable the version check?

The error is: org.springframework.web.client.ResourceAccessException: I/O error on GET request for "https://api.github.com/repos/sonus21/rqueue/releases/latest": Connect timed out; nested exception is java.net.SocketTimeoutException: Connect timed out

Thanks

Possibility to retrieve a job position in queue

Is your feature request related to a problem? Please describe.
I need access to a job's position in the queue, looked up by its id, and to the current size of the queue.

Describe the solution you'd like
It would be great to have methods to provide an efficient way to retrieve the current position of a job in the queue and the current size of the queue

Describe alternatives you've considered
I've already considered calling Redis directly through the repository, but I haven't found an efficient way to retrieve the position of the job.

Thanks !

Example of RqueueMessageSender injection

Hi, could you provide an example of RqueueMessageSender configuration?
When I try to use it, Spring tells me that it cannot be injected.

Lower version support

Describe the bug

Hi, I'm using Spring Cloud with a lower version of Spring Boot.

<spring-cloud.version>Greenwich.SR3</spring-cloud.version>

but it seems that Spring Boot versions below 2.2.0.RELEASE are not supported?

How to Reproduce

Try to use a lower version.

Library Dependencies

  • Spring Boot: 2.1.5.RELEASE
  • Spring Messaging: 5.1.7.RELEASE
  • Spring Data Redis: 2.1.5.RELEASE

Additional Details

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'rqueueMessageHandler' defined in class path resource [com/github/sonus21/rqueue/spring/boot/RqueueMessageAutoConfig.class]: Invocation of init method failed; nested exception is java.lang.NoClassDefFoundError: org/springframework/messaging/handler/annotation/support/PayloadMethodArgumentResolver
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1778) ~[spring-beans-5.1.7.RELEASE.jar:5.1.7.RELEASE]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:593) ~[spring-beans-5.1.7.RELEASE.jar:5.1.7.RELEASE]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:515) ~[spring-beans-5.1.7.RELEASE.jar:5.1.7.RELEASE]
	at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320) ~[spring-beans-5.1.7.RELEASE.jar:5.1.7.RELEASE]
	at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222) ~[spring-beans-5.1.7.RELEASE.jar:5.1.7.RELEASE]
	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318) ~[spring-beans-5.1.7.RELEASE.jar:5.1.7.RELEASE]
	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199) ~[spring-beans-5.1.7.RELEASE.jar:5.1.7.RELEASE]
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:843) ~[spring-beans-5.1.7.RELEASE.jar:5.1.7.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:877) ~[spring-context-5.1.7.RELEASE.jar:5.1.7.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549) ~[spring-context-5.1.7.RELEASE.jar:5.1.7.RELEASE]
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:142) ~[spring-boot-2.1.5.RELEASE.jar:2.1.5.RELEASE]
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775) [spring-boot-2.1.5.RELEASE.jar:2.1.5.RELEASE]
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.1.5.RELEASE.jar:2.1.5.RELEASE]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:316) [spring-boot-2.1.5.RELEASE.jar:2.1.5.RELEASE]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) [spring-boot-2.1.5.RELEASE.jar:2.1.5.RELEASE]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) [spring-boot-2.1.5.RELEASE.jar:2.1.5.RELEASE]
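
The `NoClassDefFoundError` in the trace above names `PayloadMethodArgumentResolver`, which the Spring Messaging Javadoc marks as available since 5.2, so a 5.1.7 jar would not contain it. A quick way to confirm which classes a given classpath actually provides is a small probe like the one below. It is a diagnostic sketch only, and `ClasspathProbe` and `isPresent` are hypothetical names.

```java
public class ClasspathProbe {
    /** Returns true if the named class can be located, without initializing it. */
    static boolean isPresent(String className) {
        try {
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String resolver =
            "org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver";
        // The result depends on the spring-messaging version on the classpath.
        System.out.println(resolver + " present: " + isPresent(resolver));
    }
}
```

Running the probe with the application's own classpath shows whether the dependency set satisfies what the starter expects, before the application context is started.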

Listeners are running at lower concurrency than the configured concurrency

Describe the bug

I'm doing some tests with Rqueue and want to test how many concurrent jobs our server can handle.
Currently, I have two queues with these configs:

@RqueueListener(value = "tags", concurrency = "10-20", deadLetterQueue = "tags-failed")
public void sendTag(String message) {
    log.info("tags: {}", message);
    String url = "https://jsonplaceholder.typicode.com/todos/1";
    this.restTemplate.getForObject(url, String.class);
    rqueueMessageEnqueuer.enqueue("notifications", "notifications added");

}

@RqueueListener(value = "notifications", concurrency = "10-20", deadLetterQueue = "notifications-failed")
public void sendNotifications(String message) {
    log.info("notifications: {}", message);
    String url = "https://jsonplaceholder.typicode.com/todos/1";
    this.restTemplate.getForObject(url, String.class);
}

but the number of concurrent jobs always throttles at two, regardless of the concurrency value that I set.
I noticed that the server creates 10 different consumers that handle jobs successfully, but after that it always uses the last two consumers regardless of how many consumers we create (see the log for more details).
The machine I run the code on has good specs, and most of the CPU cores are idle while the queue is running (check the top command output picture).
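
Thread names in a log only hint at how many jobs overlap. A direct measurement is a counter that tracks how many handlers are inside the method at the same moment. The sketch below shows one way to do that with plain `java.util.concurrent`. `PeakConcurrencyProbe` and its methods are hypothetical names, and the thread pool in `measure` merely stands in for the listener container.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PeakConcurrencyProbe {
    private final AtomicInteger running = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    /** Call at the start of a job; records the highest number of jobs seen running together. */
    void enter() {
        int now = running.incrementAndGet();
        peak.accumulateAndGet(now, Math::max);
    }

    /** Call in a finally block at the end of a job. */
    void exit() {
        running.decrementAndGet();
    }

    int peak() {
        return peak.get();
    }

    /** Runs `jobs` tasks on a pool of `threads` workers and returns the observed peak. */
    static int measure(int threads, int jobs) {
        PeakConcurrencyProbe probe = new PeakConcurrencyProbe();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        // Early jobs wait until `threads` jobs have started, so the pool is fully occupied once.
        CountDownLatch allStarted = new CountDownLatch(threads);
        for (int i = 0; i < jobs; i++) {
            pool.submit(() -> {
                probe.enter();
                try {
                    allStarted.countDown();
                    allStarted.await(5, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    probe.exit();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return probe.peak();
    }

    public static void main(String[] args) {
        System.out.println("peak = " + measure(4, 20));
    }
}
```

In a listener, `enter()` would go at the top of the method and `exit()` in a `finally` block. Logging `peak()` periodically then shows the real overlap, which helps separate a concurrency limit from jobs that simply finish faster than they arrive.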

Screenshots & logs

Sample log: notice how only the last two workers keep working while the rest are idle

2021-05-04 09:44:16.260  INFO 28882 --- [ tagsConsumer-1] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
2021-05-04 09:44:16.346  INFO 28882 --- [ tagsConsumer-2] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
2021-05-04 09:44:16.521  INFO 28882 --- [ tagsConsumer-3] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
2021-05-04 09:44:16.655  INFO 28882 --- [tionsConsumer-1] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
2021-05-04 09:44:16.723  INFO 28882 --- [ tagsConsumer-4] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
2021-05-04 09:44:16.805  INFO 28882 --- [tionsConsumer-2] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
2021-05-04 09:44:16.843  INFO 28882 --- [ tagsConsumer-5] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
2021-05-04 09:44:16.926  INFO 28882 --- [tionsConsumer-3] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
2021-05-04 09:44:17.019  INFO 28882 --- [ tagsConsumer-6] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
2021-05-04 09:44:17.077  INFO 28882 --- [tionsConsumer-4] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
2021-05-04 09:44:17.137  INFO 28882 --- [ tagsConsumer-7] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
2021-05-04 09:44:17.195  INFO 28882 --- [tionsConsumer-5] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
2021-05-04 09:44:17.258  INFO 28882 --- [ tagsConsumer-8] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
2021-05-04 09:44:17.314  INFO 28882 --- [tionsConsumer-6] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
2021-05-04 09:44:17.383  INFO 28882 --- [ tagsConsumer-9] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
2021-05-04 09:44:17.427  INFO 28882 --- [tionsConsumer-7] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
2021-05-04 09:44:17.540  INFO 28882 --- [tagsConsumer-10] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
2021-05-04 09:44:17.566  INFO 28882 --- [tionsConsumer-8] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
2021-05-04 09:44:17.622  INFO 28882 --- [ tagsConsumer-9] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
2021-05-04 09:44:17.665  INFO 28882 --- [tionsConsumer-9] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
2021-05-04 09:44:17.702  INFO 28882 --- [ tagsConsumer-8] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
2021-05-04 09:44:17.773  INFO 28882 --- [ionsConsumer-10] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
2021-05-04 09:44:17.808  INFO 28882 --- [tagsConsumer-10] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
2021-05-04 09:44:17.890  INFO 28882 --- [tionsConsumer-9] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
2021-05-04 09:44:17.940  INFO 28882 --- [ tagsConsumer-8] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
2021-05-04 09:44:17.984  INFO 28882 --- [ionsConsumer-10] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
2021-05-04 09:44:18.071  INFO 28882 --- [tagsConsumer-10] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
2021-05-04 09:44:18.117  INFO 28882 --- [tionsConsumer-9] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
2021-05-04 09:44:18.158  INFO 28882 --- [ tagsConsumer-8] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
2021-05-04 09:44:18.186  INFO 28882 --- [ionsConsumer-10] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
2021-05-04 09:44:18.231  INFO 28882 --- [tagsConsumer-10] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
2021-05-04 09:44:18.257  INFO 28882 --- [tionsConsumer-9] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
2021-05-04 09:44:18.300  INFO 28882 --- [ tagsConsumer-8] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
2021-05-04 09:44:18.314  INFO 28882 --- [ionsConsumer-10] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
2021-05-04 09:44:18.371  INFO 28882 --- [tagsConsumer-10] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
2021-05-04 09:44:18.378  INFO 28882 --- [tionsConsumer-9] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
2021-05-04 09:44:18.431  INFO 28882 --- [ionsConsumer-10] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
2021-05-04 09:44:18.431  INFO 28882 --- [ tagsConsumer-8] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
2021-05-04 09:44:18.495  INFO 28882 --- [tionsConsumer-9] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
2021-05-04 09:44:18.495  INFO 28882 --- [tagsConsumer-10] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
2021-05-04 09:44:18.557  INFO 28882 --- [ tagsConsumer-8] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
2021-05-04 09:44:18.557  INFO 28882 --- [ionsConsumer-10] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
2021-05-04 09:44:18.605  INFO 28882 --- [ tagsConsumer-9] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
2021-05-04 09:44:18.619  INFO 28882 --- [tionsConsumer-9] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
2021-05-04 09:44:18.673  INFO 28882 --- [ tagsConsumer-8] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}

Number of pending and running jobs

top command output while the queue is running

Library Dependencies

  • Spring Boot:
  • Spring Messaging:
  • Spring Data Redis:

Extra notes

This is how I run the WAR file for the application:

java -Dgrails.env=prod -jar sample-app-0.0.1-test.war -Xms2g -Xmx2g -Xmn150m -XX:GCTimeRatio=2 -XX:ParallelGCThreads=20 -XX:+UseParNewGC -XX:MaxGCPauseMillis=50 -XX:+DisableExplicitGC

Can you add a sample for Dead Letter Queue Scenario

Failed messages are dead after retry.
I have retry set to 1-3 and throw a runtime exception for a few messages to check the failure scenario.
When a message reaches the retry limit, I can see a log line saying that it was moved to the configured dead letter queue of my delay queue.
But the messages are not moved.

To summarize, the dead messages are not reaching their listener method.
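
The scenario described above has two separate steps: a message is parked after its retries are exhausted, and a different handler later consumes the parked messages. The in-memory model below illustrates that flow in plain Java. It is not Rqueue code, and `DeadLetterModel`, `deliver` and `drainDeadLetters` are hypothetical names chosen for the illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

public class DeadLetterModel {
    final Queue<String> deadLetters = new ArrayDeque<>();
    private final int maxAttempts;

    DeadLetterModel(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /** Invokes the handler up to maxAttempts times; parks the message if every attempt throws. */
    boolean deliver(String message, Consumer<String> handler) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(message);
                return true;
            } catch (RuntimeException e) {
                // Failed attempt: fall through and retry until the limit is reached.
            }
        }
        deadLetters.add(message);
        return false;
    }

    /** Passes parked messages to a separate handler; nothing is consumed unless this runs. */
    int drainDeadLetters(Consumer<String> deadLetterHandler) {
        int handled = 0;
        String message;
        while ((message = deadLetters.poll()) != null) {
            deadLetterHandler.accept(message);
            handled++;
        }
        return handled;
    }

    public static void main(String[] args) {
        DeadLetterModel model = new DeadLetterModel(3);
        model.deliver("ok", m -> { });
        model.deliver("bad", m -> { throw new IllegalStateException("boom"); });
        System.out.println("parked: " + model.deadLetters.size());
        List<String> seen = new ArrayList<>();
        model.drainDeadLetters(seen::add);
        System.out.println("dead letter handler saw: " + seen);
    }
}
```

The model makes the distinction explicit: parking a message does not invoke any handler by itself. Other listener declarations on this page combine `deadLetterQueue` with `deadLetterQueueListenerEnabled = "true"`, which may be the setting that lets a listener consume from the dead letter queue; that reading should be checked against the Rqueue documentation.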

Manual start and stop of container

Can the queue be stopped or started according to the configuration? When we stop
the application, we should stop the queue listener first so that no data is lost.

Add periodic tasks

I think that in some cases support for periodic tasks is required, and periodic tasks should allow cancellation.
I hope this feature can be added.

Many thanks!!

How to guarantee that one message is consumed exactly once?

Describe the bug

I have 3 job queues. Jobs in the first queue create jobs in the second queue.
Even though I set numRetries to 0, the second RqueueListener seems to consume one message twice.
How can I configure it to guarantee that one message is consumed exactly once?
Thank you.

How to Reproduce

    @RqueueListener(value = FLASH_BID_BOT_QUEUE, numRetries = "0", deadLetterQueueListenerEnabled = "true", deadLetterQueue = "failed-bot-queue", concurrency = "3-5")
    public void executeFlashBidBot(Bot bot) {
    ...
    }

    @RqueueListener(value = FLASH_BID_AFTER_MIN_PRICE_BOT_QUEUE, numRetries = "0", deadLetterQueueListenerEnabled = "true", deadLetterQueue = "failed-after-min-price-bot-queue", concurrency = "1-3")
    public void makeInstantBidAfterMinPrice(LateBot lateBot) {
    ...
    }

    @RqueueListener(value = FLASH_BID_INSTANT_BID_PADDING_QUEUE, numRetries = "0", deadLetterQueueListenerEnabled = "true", deadLetterQueue = "failed-instant-bid-padding-queue", concurrency = "1-3")
    private void padInstantBid(PaddingInstantBid instantBid) {
    ...
    }
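
Since the project describes delivery as at-least-once, a common way to get exactly-once effects is to make the handler idempotent: remember which message ids were already processed and skip repeats. The sketch below shows the idea with an in-memory set. `IdempotentHandler` is a hypothetical helper, and it assumes that each message carries a stable id chosen by the producer.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class IdempotentHandler<T> {
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final Consumer<T> delegate;

    IdempotentHandler(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    /** Runs the delegate only the first time a given id is seen; returns whether it ran. */
    boolean handle(String messageId, T payload) {
        // add() is atomic, so two threads racing on the same id cannot both pass this check.
        if (!processedIds.add(messageId)) {
            return false;
        }
        // Note: the id stays marked even if the delegate throws.
        delegate.accept(payload);
        return true;
    }

    public static void main(String[] args) {
        AtomicInteger runs = new AtomicInteger();
        IdempotentHandler<String> handler = new IdempotentHandler<>(p -> runs.incrementAndGet());
        handler.handle("bot-42", "first delivery");
        handler.handle("bot-42", "duplicate delivery");
        System.out.println("handler ran " + runs.get() + " time(s)");
    }
}
```

An in-memory set only protects a single JVM. With several application instances the processed ids would have to live in a shared store, for example a Redis key written with `SET ... NX`.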

Library Dependencies

  • Spring Boot:
  • Spring Data Redis:

What's not working?

Describe the bug

My project is composed of a user-side module, a management-side module, and a public module. I created a mail entity class in the public module. The management side puts the mail entity into the queue and then consumes it. My client side and management side use the same Redis instance. Today a problem appeared: a mail entity message enqueued by the management side was consumed successfully on the management side, but an error was reported on the web side. The String could not be converted to a mail entity, so deserialization failed. This confuses me a lot.

exception:
ERROR rqueueMessageListenerContainer-6 com.github.sonus21.rqueue.listener.RqueueExecutor - [email-queue_low] Message consumer failed
org.springframework.messaging.MessagingException: An exception occurred while invoking the handler method; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.xxxx.queue.EmailEntity] for GenericMessage
at com.github.sonus21.rqueue.listener.RqueueMessageHandler.processHandlerMethodException(RqueueMessageHandler.java:290) ~[rqueue-2.0.0-RELEASE.jar!/:?]
at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMatch(AbstractMethodMessageHandler.java:581) ~[spring-messaging-5.2.6.RELEASE.jar!/:5.2.6.RELEASE]
at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessageInternal(AbstractMethodMessageHandler.java:520) ~[spring-messaging-5.2.6.RELEASE.jar!/:5.2.6.RELEASE]
at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:454) ~[spring-messaging-5.2.6.RELEASE.jar!/:5.2.6.RELEASE]
at com.github.sonus21.rqueue.listener.RqueueExecutor.start(RqueueExecutor.java:377) [rqueue-2.0.0-RELEASE.jar!/:?]
at com.github.sonus21.rqueue.listener.MessageContainerBase.run(MessageContainerBase.java:90) [rqueue-2.0.0-RELEASE.jar!/:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_212]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_212]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]

How to Reproduce

  • Steps to reproduce the behavior
    Start two services that share one Redis instance and listen to the same queue. When one service produces a message, the other service sometimes reports an error.

  • A sample reproducible code if possible.
    Send code (here the priorityCode I use is low):

    private void sendEmailQueue(EmailEntity emailEntity, Integer priorityCode) {
        String priority;
        try {
            priority = EmailPriorityEnum.getDescription(priorityCode);
            rqueueMessageSender.enqueueWithPriority("email-queue", priority, emailEntity);
        } catch (Exception e) {
            log.error("Failed to enqueue email", e);
        }
    }

    Consume code:

    @RqueueListener(value = "email-queue", numRetries = "3", priority = "critical=10,high=8,medium=4,low=1")
    public void onMessage(EmailEntity emailEntity) {
        log.info("Email consumption started {},{},{}", new Date(), emailEntity.getTitle(), emailEntity.getReceiver());
        try {
            if (!EmptyUtils.isEmpty(emailEntity.getReceiver())) {
                log.info("Sending HTML email to: {}", emailEntity.getReceiver());
                sendHtmlEmail(emailEntity.getTitle(), emailEntity.getText(), emailEntity.getReceiver());
            }
        } catch (EmailException e) {
            log.error("Failed to send email {} {} ", emailEntity.getReceiver(), emailEntity.getTitle());
            log.error("Email content: --------" + emailEntity.getText());
        } catch (Exception e) {
            log.error("Email consumption failed", e);
        }
    }

Library Dependencies

  • Spring Boot: 2.3.0.RELEASE
  • rqueue: 2.0.0-RELEASE
  • redis.clients.jedis: 2.8.2

Dashboard does not work in Spring Boot

I added the dependency to a Spring Boot 2.3.3.RELEASE project:

    <dependency>
      <groupId>com.github.sonus21</groupId>
      <artifactId>rqueue-spring-boot-starter</artifactId>
      <version>2.4.0-RELEASE</version>
    </dependency>

All configuration is set to default. Security is set to allow all paths without auth.

Visiting /rqueue gives me a 500 error and the following stack trace:

java.lang.IllegalStateException: null
	at org.objectweb.asm.tree.analysis.BasicInterpreter.<init>(BasicInterpreter.java:66) ~[asm-analysis-9.0.jar:9.0]
	at org.parboiled.transform.RuleMethodInterpreter.<init>(RuleMethodInterpreter.java:42) ~[parboiled-java-1.1.7.jar:1.1.7]
	at org.parboiled.transform.InstructionGraphCreator.process(InstructionGraphCreator.java:41) ~[parboiled-java-1.1.7.jar:1.1.7]
	at org.parboiled.transform.ParserTransformer.runMethodTransformers(ParserTransformer.java:62) ~[parboiled-java-1.1.7.jar:1.1.7]
	at org.parboiled.transform.ParserTransformer.extendParserClass(ParserTransformer.java:45) ~[parboiled-java-1.1.7.jar:1.1.7]
	at org.parboiled.transform.ParserTransformer.transformParser(ParserTransformer.java:38) ~[parboiled-java-1.1.7.jar:1.1.7]
	at org.parboiled.Parboiled.createParser(Parboiled.java:54) ~[parboiled-java-1.1.7.jar:1.1.7]
	at org.jtwig.parser.parboiled.ParserContext.instance(ParserContext.java:31) ~[jtwig-core-5.87.0.RELEASE.jar:na]
	at org.jtwig.parser.parboiled.ParboiledJtwigParser.parse(ParboiledJtwigParser.java:37) ~[jtwig-core-5.87.0.RELEASE.jar:na]
	at org.jtwig.parser.cache.InMemoryConcurrentPersistentTemplateCache.get(InMemoryConcurrentPersistentTemplateCache.java:39) ~[jtwig-core-5.87.0.RELEASE.jar:na]
	at org.jtwig.parser.CachedJtwigParser.parse(CachedJtwigParser.java:19) ~[jtwig-core-5.87.0.RELEASE.jar:na]
	at org.jtwig.JtwigTemplate.render(JtwigTemplate.java:98) ~[jtwig-core-5.87.0.RELEASE.jar:na]
	at org.jtwig.JtwigTemplate.render(JtwigTemplate.java:80) ~[jtwig-core-5.87.0.RELEASE.jar:na]
	at org.jtwig.web.servlet.JtwigDispatcher.render(JtwigDispatcher.java:62) ~[jtwig-web-5.87.0.RELEASE.jar:na]
	at org.jtwig.web.servlet.JtwigDispatcher.render(JtwigDispatcher.java:52) ~[jtwig-web-5.87.0.RELEASE.jar:na]
	at org.jtwig.spring.JtwigView.renderMergedTemplateModel(JtwigView.java:17) ~[jtwig-spring-5.87.0.RELEASE.jar:na]
	at org.springframework.web.servlet.view.AbstractTemplateView.renderMergedOutputModel(AbstractTemplateView.java:179) ~[spring-webmvc-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at org.springframework.web.servlet.view.AbstractView.render(AbstractView.java:316) ~[spring-webmvc-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at org.springframework.web.servlet.DispatcherServlet.render(DispatcherServlet.java:1373) ~[spring-webmvc-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at org.springframework.web.servlet.DispatcherServlet.processDispatchResult(DispatcherServlet.java:1118) ~[spring-webmvc-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1057) ~[spring-webmvc-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943) ~[spring-webmvc-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) ~[spring-webmvc-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:645) ~[javax.servlet-api-4.0.1.jar:4.0.1]
	at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:750) ~[javax.servlet-api-4.0.1.jar:4.0.1]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) ~[tomcat-embed-core-9.0.37.jar:9.0.37]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.37.jar:9.0.37]
	at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.37.jar:9.0.37]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.37.jar:9.0.37]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.37.jar:9.0.37]
	at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:320) ~[spring-security-web-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:126) ~[spring-security-web-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:90) ~[spring-security-web-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334) ~[spring-security-web-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:118) ~[spring-security-web-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334) ~[spring-security-web-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:137) ~[spring-security-web-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334) ~[spring-security-web-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111) ~[spring-security-web-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334) ~[spring-security-web-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:158) ~[spring-security-web-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334) ~[spring-security-web-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:63) ~[spring-security-web-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334) ~[spring-security-web-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.security.oauth2.server.resource.web.BearerTokenAuthenticationFilter.doFilterInternal(BearerTokenAuthenticationFilter.java:114) ~[spring-security-oauth2-resource-server-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334) ~[spring-security-web-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:116) ~[spring-security-web-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334) ~[spring-security-web-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.web.filter.CorsFilter.doFilterInternal(CorsFilter.java:92) ~[spring-web-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334) ~[spring-security-web-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.security.web.header.HeaderWriterFilter.doHeadersAfter(HeaderWriterFilter.java:92) ~[spring-security-web-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:77) ~[spring-security-web-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334) ~[spring-security-web-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:105) ~[spring-security-web-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334) ~[spring-security-web-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:56) ~[spring-security-web-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334) ~[spring-security-web-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:215) ~[spring-security-web-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:178) ~[spring-security-web-5.3.4.RELEASE.jar:5.3.4.RELEASE]
	at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:358) ~[spring-web-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:271) ~[spring-web-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.37.jar:9.0.37]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.37.jar:9.0.37]
	at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.37.jar:9.0.37]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.37.jar:9.0.37]
	at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.37.jar:9.0.37]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.37.jar:9.0.37]
	at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.37.jar:9.0.37]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.37.jar:9.0.37]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:103) ~[spring-web-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.37.jar:9.0.37]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.37.jar:9.0.37]
	at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) ~[tomcat-embed-core-9.0.37.jar:9.0.37]
	at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) ~[tomcat-embed-core-9.0.37.jar:9.0.37]
	at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:541) ~[tomcat-embed-core-9.0.37.jar:9.0.37]
	at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139) ~[tomcat-embed-core-9.0.37.jar:9.0.37]
	at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) ~[tomcat-embed-core-9.0.37.jar:9.0.37]
	at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) ~[tomcat-embed-core-9.0.37.jar:9.0.37]
	at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) ~[tomcat-embed-core-9.0.37.jar:9.0.37]
	at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:373) ~[tomcat-embed-core-9.0.37.jar:9.0.37]
	at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) ~[tomcat-embed-core-9.0.37.jar:9.0.37]
	at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868) ~[tomcat-embed-core-9.0.37.jar:9.0.37]
	at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1589) ~[tomcat-embed-core-9.0.37.jar:9.0.37]
	at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) ~[tomcat-embed-core-9.0.37.jar:9.0.37]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
	at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) ~[tomcat-embed-core-9.0.37.jar:9.0.37]
	at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

I already have a controller that handles the / path; could this be the issue? I also tried adding the snippet from the wiki regarding WebMvcConfigurer, but it had no effect.

Rqueue does not support LocalDate

Exception in thread "main" com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `java.time.LocalDateTime` (no Creators, like default construct, exist): cannot deserialize from Object value (no delegate- or
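
The truncated error above is raised by Jackson, which cannot build a `java.time.LocalDateTime` from a JSON object, typically because no java.time support (for example the jackson-datatype-jsr310 module) is registered on the mapper doing the conversion. How the converter is configured in this setup is not shown, so the following is only a library-independent workaround sketch: carry the timestamp as ISO-8601 text and parse it on the consumer side. `ReminderMessage` and `dueAt` are hypothetical names used for illustration.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical payload: stores the timestamp as ISO-8601 text instead of a LocalDateTime field,
// so a JSON mapper only ever sees a plain String property.
final class ReminderMessage {
    private String dueAt; // e.g. "2020-09-01T10:15:30"

    ReminderMessage() {
        // no-arg constructor so a JSON mapper can instantiate the class
    }

    static ReminderMessage of(LocalDateTime dueAt) {
        ReminderMessage message = new ReminderMessage();
        message.dueAt = DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(dueAt);
        return message;
    }

    public String getDueAt() {
        return dueAt;
    }

    public void setDueAt(String dueAt) {
        this.dueAt = dueAt;
    }

    // Deliberately not named getXxx so it is not picked up as a bean property.
    LocalDateTime dueAtAsDateTime() {
        return LocalDateTime.parse(dueAt, DateTimeFormatter.ISO_LOCAL_DATE_TIME);
    }

    public static void main(String[] args) {
        ReminderMessage message = ReminderMessage.of(LocalDateTime.of(2020, 9, 1, 10, 15, 30));
        System.out.println(message.getDueAt());        // prints "2020-09-01T10:15:30"
        System.out.println(message.dueAtAsDateTime()); // prints "2020-09-01T10:15:30"
    }
}
```

Registering java.time support on the mapper is the more direct fix where the mapper is under your control; the string field simply avoids depending on that configuration.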

Problems configuring Rqueue in Spring Boot

Hello. I'm not able to configure Rqueue in Spring Boot: Spring Boot is unable to inject the "RqueueMessageSender". To better demonstrate my case, I created an example that can be checked "here".

Basically, Spring is not injecting the RqueueMessageSender dependency.

Exception thrown:
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'com.github.sonus21.rqueue.producer.RqueueMessageSender' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}

Rqueue does not seem to dequeue messages when there are delays between enqueues

When messages are pushed to Rqueue with a gap of 2 minutes or more, they do not seem to be consumed by the listeners.

I am using Rqueue in a Spring Boot app (2.0) for its delayed queue functionality. I have developed an API that pushes messages to a test-queue with a constant delay, and I have added logs to both the producer and the listeners. If the API is invoked with time gaps (of 1 minute or more), I can see the messages being pushed to Rqueue successfully, but I cannot see listener logs for all of those messages.

Screenshots
message1
message2
message3
message4
message_dequeue

Rqueue version : v1.3
Redis mode : Single Node.

Unable to boot up the spring boot application due to UnsatisfiedDependencyException

While booting up the Spring Boot application, I get this error:

Caused by: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'rqueueDashboardChartServiceImpl' defined in URL [jar:file:/Users/harshavardhan/.gradle/caches/modules-2/files-2.1/com.github.sonus21/rqueue-core/2.6.1-RELEASE/ca2d527d162f20ad9f4692faf2369ba92d05d23b/rqueue-core-2.6.1-RELEASE.jar!/com/github/sonus21/rqueue/web/service/impl/RqueueDashboardChartServiceImpl.class]: Unsatisfied dependency expressed through constructor parameter 0; nested exception is org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'rqueueQStatsDaoImpl' defined in URL [jar:file:/Users/harshavardhan/.gradle/caches/modules-2/files-2.1/com.github.sonus21/rqueue-core/2.6.1-RELEASE/ca2d527d162f20ad9f4692faf2369ba92d05d23b/rqueue-core-2.6.1-RELEASE.jar!/com/github/sonus21/rqueue/dao/impl/RqueueQStatsDaoImpl.class]: Unsatisfied dependency expressed through constructor parameter 0; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'rqueueConfig' defined in class path resource [com/github/sonus21/rqueue/spring/boot/RqueueListenerAutoConfig.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [com.github.sonus21.rqueue.config.RqueueConfig]: Factory method 'rqueueConfig' threw exception; nested exception is org.springframework.data.redis.RedisConnectionFailureException: Cannot get Jedis connection; nested exception is redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool

Rqueue is not clearing out the messages even after they are subscribed successfully

When I publish messages from one application and try to consume them in another, they are not cleared from the queue; they stay in the running stage and are scheduled to run again after 15 minutes. The process keeps repeating.

The messages are cleared if I publish and consume in the same application.

In the producer application, I have added the properties below and I register the queues on startup.

rqueue.scheduler.enabled=false
rqueue.mode=PRODUCER 

Grouping message in the same queue

Is your feature request related to a problem? Please describe.
Messages in the queue don't have the option of a group id that marks them for FIFO execution.

Describe the solution you'd like.
When enqueuing a message, I should be able to provide a message group id, so that all messages in that queue with the same group id execute in sequence while different group ids can still be processed concurrently.

Describe alternatives you've considered
Using SQS FIFO, since it allows concurrent execution across different group ids and respects sequential execution for messages with the same group id.
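
The requested behaviour (sequential execution within a group id, concurrency across group ids) can be sketched in plain Java as a keyed serial executor. This is not an Rqueue API: `GroupedExecutor` and `submit` are hypothetical names, and the sketch only orders tasks inside a single JVM.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper: tasks with the same group id run one after another,
// tasks with different group ids may run in parallel on the shared pool.
final class GroupedExecutor {
    private final ExecutorService pool;
    // Last scheduled task per group; new tasks are chained behind it.
    // Entries are never evicted in this sketch.
    private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    GroupedExecutor(ExecutorService pool) {
        this.pool = pool;
    }

    CompletableFuture<Void> submit(String groupId, Runnable task) {
        return tails.compute(groupId, (id, tail) -> {
            CompletableFuture<Void> previous =
                (tail == null) ? CompletableFuture.<Void>completedFuture(null) : tail;
            // handle(...) swallows a failure of the previous task so the chain keeps going.
            return previous.handle((result, error) -> null).thenRunAsync(task, pool);
        });
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        GroupedExecutor executor = new GroupedExecutor(pool);
        CompletableFuture<Void> a1 = executor.submit("order-1", () -> System.out.println("order-1 step 1"));
        CompletableFuture<Void> a2 = executor.submit("order-1", () -> System.out.println("order-1 step 2"));
        CompletableFuture<Void> b1 = executor.submit("order-2", () -> System.out.println("order-2 step 1"));
        CompletableFuture.allOf(a1, a2, b1).join();
        pool.shutdown();
    }
}
```

Within "order-1" the second step always runs after the first; the position of the "order-2" line relative to them is not fixed.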

Enqueue a list of Objects in Rqueue

Describe the bug

I enqueued a List<MyObject> in Rqueue, which was successful, but I got a ClassCastException in the message executor (@RqueueListener) once the message was loaded from the database in the run method. Is it possible to enqueue a list of objects?

How to Reproduce

The enqueue method:

rqueueMessageSender.enqueueIn(myQueue, Lists.newArrayList(MyObjectInstance), 1, TimeUnit.SECONDS);

The listener:

@RqueueListener(value = myQueue)
public void informNextRoll(List<MyObject> MyObjectList) { ... }

Additional Details

Here is the stack trace:

    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:176)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMatch(AbstractMethodMessageHandler.java:565)
    at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessageInternal(AbstractMethodMessageHandler.java:520)
    at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:454)
    at com.github.sonus21.rqueue.listener.RqueueExecutor.start(RqueueExecutor.java:158)
    at com.github.sonus21.rqueue.listener.MessageContainerBase.run(MessageContainerBase.java:50)
    at io.github.jhipster.async.ExceptionHandlingAsyncTaskExecutor.lambda$createWrappedRunnable$1(ExceptionHandlingAsyncTaskExecutor.java:78)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalArgumentException: java.lang.ClassCastException@1a1632c2
    at jdk.internal.reflect.GeneratedMethodAccessor118.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
    ... 10 common frames omitted

Thanks for looking into this :)
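
The trace shows that the reflective call to the listener rejected its argument, meaning the deserialized payload was not assignable to the `List<MyObject>` parameter. The exact cause inside Rqueue is not visible from the trace. One general point that may be relevant: because of type erasure, a bare `List` carries no element type at run time, whereas a concrete wrapper class records it in its field signature. The sketch below illustrates that difference with plain reflection; `MyObjectBatch` is a hypothetical wrapper, not an Rqueue type, and whether it resolves this particular report is an assumption.

```java
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.util.ArrayList;
import java.util.List;

final class ErasureDemo {
    static final class MyObject {
        String name;
    }

    // Hypothetical wrapper: the element type is part of the field's generic signature.
    static final class MyObjectBatch {
        List<MyObject> items = new ArrayList<>();
    }

    // At run time a list only knows its own class, not its element type.
    static String runtimeClassOf(List<?> list) {
        return list.getClass().getName();
    }

    // The wrapper's field declaration still exposes the element type via reflection.
    static String declaredElementType() {
        try {
            Field field = MyObjectBatch.class.getDeclaredField("items");
            ParameterizedType type = (ParameterizedType) field.getGenericType();
            return type.getActualTypeArguments()[0].getTypeName();
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runtimeClassOf(new ArrayList<MyObject>())); // prints "java.util.ArrayList"
        System.out.println(declaredElementType()); // a type name ending in "MyObject"
    }
}
```

Under that assumption, enqueuing a `MyObjectBatch` and declaring it as the listener parameter gives a serializer a concrete class to work with instead of an erased list.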

Job not restarting after restart of a worker (in a single-worker scenario)

Describe the bug

If you are using a single worker (I haven't tested this with multiple workers) and you cut the worker during a job execution and then restart it, the worker will not retry the interrupted job; the job is simply forgotten completely.

How to Reproduce

  • Launch a worker
  • Trigger a long job
  • Cut the worker
  • Restart the worker
  • Nothing happens

In an ideal scenario, the job should be restarted when the worker restarts.
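
A common way to get this kind of redelivery is to keep a polled message in an "in flight" set with a deadline and to hand it out again if it is not acknowledged in time. The sketch below is an in-memory illustration of that idea only; it is not Rqueue's implementation, and `InFlightQueue`, `poll`, `ack` and the timeout value are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// In-memory sketch of at-least-once delivery with a visibility timeout.
// Messages are identified by their text here, which is a simplification.
final class InFlightQueue {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<String, Long> inFlight = new HashMap<>(); // message -> deadline in ms
    private final long visibilityTimeoutMs;

    InFlightQueue(long visibilityTimeoutMs) {
        this.visibilityTimeoutMs = visibilityTimeoutMs;
    }

    void enqueue(String message) {
        ready.addLast(message);
    }

    // Hands out a message but keeps it with a deadline instead of deleting it.
    String poll(long nowMs) {
        requeueExpired(nowMs);
        String message = ready.pollFirst();
        if (message != null) {
            inFlight.put(message, nowMs + visibilityTimeoutMs);
        }
        return message;
    }

    // Called by the worker only after the job has finished successfully.
    void ack(String message) {
        inFlight.remove(message);
    }

    // Messages whose deadline has passed become visible again.
    private void requeueExpired(long nowMs) {
        Iterator<Map.Entry<String, Long>> it = inFlight.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= nowMs) {
                ready.addLast(entry.getKey());
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        InFlightQueue queue = new InFlightQueue(1_000);
        queue.enqueue("long-job");
        System.out.println(queue.poll(0));     // prints "long-job"
        // The worker dies here without calling ack(...).
        System.out.println(queue.poll(500));   // prints "null" (still within the timeout)
        System.out.println(queue.poll(1_000)); // prints "long-job" (redelivered)
    }
}
```

With such a scheme the job is not retried the instant the worker comes back, only once the deadline has passed, so a long timeout can look like a lost job for a while.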

@EnableRqueue annotation isn't defined in 2.7.0

Bug Description

This import statement reports that the class isn't defined.

import com.github.sonus21.rqueue.spring.EnableRqueue;

How to Reproduce

  • Make sure that your pom uses version 2.7.0.
<dependency>
	<groupId>com.github.sonus21</groupId>
	<artifactId>rqueue-spring-boot-starter</artifactId>
	<version>2.7.0-RELEASE</version>
</dependency>
  • Try to use the @EnableRqueue annotation.

Library Dependencies

  • Spring Boot:
  • Spring Messaging:
  • Spring Data Redis:

Add my organisation in Rqueue Users

Multiple @RqueueListener for a queue

Describe the bug

Is it not possible to use more than one @RqueueListener with the same queue name?

 
@Component
@Slf4j
public class RQueueMessageListener {
    @RqueueListener(value = "simple-queue")
    public void simpleMessage(String message) {
        log.info("simple-queue: {}", message);
        // todo work

    }

    @RqueueListener(value = "simple-queue")
    public void simpleMessage2(String message) {
        log.info("simple-queue: {}", message);
        // todo study
    }
}
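
Where two listener methods on the same queue are not accepted, one workaround is a single listener that fans each message out to several handlers. The sketch below shows only that dispatch step in plain Java; `FanOutHandler` is a hypothetical helper, not part of Rqueue, and the single `@RqueueListener` method would simply call `handle(message)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: one entry point that forwards every message to all registered handlers.
final class FanOutHandler {
    private final List<Consumer<String>> delegates = new ArrayList<>();

    FanOutHandler register(Consumer<String> delegate) {
        delegates.add(delegate);
        return this;
    }

    // Intended to be called from the single listener method of the queue.
    // An exception in one delegate stops the remaining ones in this sketch.
    void handle(String message) {
        for (Consumer<String> delegate : delegates) {
            delegate.accept(message);
        }
    }

    public static void main(String[] args) {
        FanOutHandler handler = new FanOutHandler()
            .register(message -> System.out.println("work: " + message))
            .register(message -> System.out.println("study: " + message));
        handler.handle("hello"); // prints "work: hello" then "study: hello"
    }
}
```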

Creating separate applications for consumers and producers

Hi,

I am not sure whether it's a bug or not; however, here is my scenario:

  1. I have one Spring Boot web service application that produces the messages, using the Rqueue sender enqueue methods.
  2. I want to create separate consumers as Spring Boot apps, where the enqueued messages will get consumed.

At present, the messages are not even posted to the Redis queue unless I write the message consumers/listeners inside the web service app itself. If I comment out the listener methods inside the web service app, it gives a queue not found error. I can see that the queues exist in the Redis database; still, it throws errors if the listeners are not declared in all the apps: the web service and the two consumers.

This was not the case in version 1.3.0. There I could write only the enqueuing code in the web service and write the Rqueue listeners separately in other Spring Boot apps, running solely as consumers.

com.github.sonus21.rqueue.exception.QueueDoesNotExist: zbar-2
at com.github.sonus21.rqueue.core.QueueRegistry.get(QueueRegistry.java:37) ~[rqueue-2.0.1-RELEASE.jar:na]
at com.github.sonus21.rqueue.core.RqueueMessageSenderImpl.pushMessage(RqueueMessageSenderImpl.java:191) ~[rqueue-2.0.1-RELEASE.jar:na]
at com.github.sonus21.rqueue.core.RqueueMessageSenderImpl.enqueue(RqueueMessageSenderImpl.java:78) ~[rqueue-2.0.1-RELEASE.jar:na]
at com.sentinel.service.LabelValidationService.validateSubmission(LabelValidationService.java:161) ~[classes/:na]
at com.sentinel.controller.AsyncSubmissionController.submit(AsyncSubmissionController.java:75) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_172]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_172]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_172]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_172]

Dashboard resources aren't accessible from gateway

Hi again :)

I have noticed that the rqueue dashboard isn't accessible from a gateway of a microservice architecture server.

How to Reproduce

I have checked the source code of the dashboard template, and it looks like the href links (CSS + JS files) are static and point to the direct endpoint. See here.
Because of this, the resource files aren't accessible. Please change the href links to accept any entry path.

Additional Details

Here is an example of HTTP calling the dashboard from a direct microservice and from the gateway.

MS: http://localhost:8001/rqueue   
Gateway: http://localhost:8000/services/rqueue-service/rqueue

Please make resources accessible from any dynamic endpoint.

Thanks. :)
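
One way to make such links work behind a gateway is to prepend the path prefix under which the service is exposed, for example taken from an `X-Forwarded-Prefix` request header when the gateway sets one. The sketch below shows only the string handling; `DashboardLinks`, `resolve` and the resource path are hypothetical, and where the prefix comes from is an assumption about the deployment.

```java
// Hypothetical helper that builds resource links relative to an optional gateway prefix.
final class DashboardLinks {
    private DashboardLinks() {
    }

    static String resolve(String forwardedPrefix, String resourcePath) {
        String prefix = (forwardedPrefix == null) ? "" : forwardedPrefix.trim();
        // Drop trailing slashes so prefix and path join with exactly one slash.
        while (prefix.endsWith("/")) {
            prefix = prefix.substring(0, prefix.length() - 1);
        }
        if (!prefix.isEmpty() && !prefix.startsWith("/")) {
            prefix = "/" + prefix;
        }
        String path = resourcePath.startsWith("/") ? resourcePath : "/" + resourcePath;
        return prefix + path;
    }

    public static void main(String[] args) {
        // Direct access to the microservice: no prefix.
        System.out.println(resolve(null, "rqueue/css/app.css"));
        // prints "/rqueue/css/app.css"

        // Access through the gateway route from the example above.
        System.out.println(resolve("/services/rqueue-service/", "rqueue/css/app.css"));
        // prints "/services/rqueue-service/rqueue/css/app.css"
    }
}
```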

Unable to configure Rqueue in jhipster Spring boot microservice application

Describe the bug
I'm not able to configure Rqueue in a jHipster Spring Boot microservice application. I have followed all your resources over the internet and also your tutorial projects, but without luck.

This is my pom file:

<spring-boot.version>2.2.7.RELEASE</spring-boot.version>
...............
<dependency>
	<groupId>com.github.sonus21</groupId>
	<artifactId>rqueue-spring-boot-starter</artifactId>
	<version>2.0.1-RELEASE</version>
</dependency>
...............
<!-- adding/removing this dependency didn't help -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

Here is my configuration file:

import com.github.sonus21.rqueue.config.SimpleRqueueListenerContainerFactory;
import com.github.sonus21.rqueue.listener.RqueueMessageHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.repository.configuration.EnableRedisRepositories;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;


@EnableRedisRepositories
@Configuration
public class RQueueConfiguration {


    @Bean()
    public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory(RqueueMessageHandler rqueueMessageHandler) {
        SimpleRqueueListenerContainerFactory factory = new SimpleRqueueListenerContainerFactory();
        factory.setRqueueMessageHandler(rqueueMessageHandler);

        // adding this ThreadPoolTaskExecutor didn't help either.
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setThreadNamePrefix("taskExecutor");
        threadPoolTaskExecutor.setCorePoolSize(10);
        threadPoolTaskExecutor.setMaxPoolSize(50);
        threadPoolTaskExecutor.setQueueCapacity(0);
        threadPoolTaskExecutor.afterPropertiesSet();
        factory.setTaskExecutor(threadPoolTaskExecutor);

        return factory;
    }

}

How to Reproduce

Generate a jHipster microservice project and integrate Rqueue in it.
Here is the error I'm getting:

ERROR 2704 --- [  restartedMain] o.s.b.d.LoggingFailureAnalysisReporter   : 

***************************
APPLICATION FAILED TO START
***************************

Description:

The bean 'rqueueMetrics' could not be injected as a 'com.github.sonus21.rqueue.metrics.RqueueMetrics' because it is a JDK dynamic proxy that implements:
	org.springframework.context.ApplicationListener


Action:

Consider injecting the bean as one of its interfaces or forcing the use of CGLib-based proxies by setting proxyTargetClass=true on @EnableAsync and/or @EnableCaching.


Process finished with exit code 1
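
The failure analysis above comes down to how JDK dynamic proxies work: the proxy implements the target's interfaces but is not an instance of the target's class, so it cannot be injected where the concrete class is required. The following plain-Java sketch reproduces that effect without Spring; `Listener` and `Metrics` are hypothetical stand-ins for `ApplicationListener` and `RqueueMetrics`.

```java
import java.lang.reflect.Proxy;

final class ProxyDemo {
    // Stand-in for an interface such as ApplicationListener.
    public interface Listener {
        String onEvent(String event);
    }

    // Stand-in for a concrete bean class such as RqueueMetrics.
    public static class Metrics implements Listener {
        @Override
        public String onEvent(String event) {
            return "handled " + event;
        }
    }

    // Builds a JDK dynamic proxy that forwards every call to the target.
    static Listener jdkProxyFor(Listener target) {
        return (Listener) Proxy.newProxyInstance(
            Listener.class.getClassLoader(),
            new Class<?>[] {Listener.class},
            (proxy, method, methodArgs) -> method.invoke(target, methodArgs));
    }

    public static void main(String[] args) {
        Listener proxied = jdkProxyFor(new Metrics());
        System.out.println(proxied.onEvent("start"));    // prints "handled start"
        System.out.println(proxied instanceof Listener); // prints "true"
        System.out.println(proxied instanceof Metrics);  // prints "false"
    }
}
```

A class-based (CGLIB) proxy subclasses the target instead, which is why the suggested action mentions `proxyTargetClass=true`.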

Additional resources

Use https://start.jhipster.tech/ to generate a jHipster app in just 1 min. Here is my configuration: https://i.ibb.co/rZ7mrsS/image.png

Please help me with this.
