Giter Site home page Giter Site logo

jesque's Introduction

Jesque

Build Status Coverage Status License Apache 2.0 Maven Central

Jesque is an implementation of Resque in Java. It is fully-interoperable with the Ruby and Node.js (Coffee-Resque) implementations.

Jesque is a Maven project and depends on Jedis to connect to Redis, Jackson to map to/from JSON and SLF4J for logging.

The project contains a client implementation as well as a worker implementation that supports listeners.

NOTE: Jesque's delayed jobs implementation is not compatible with resque-scheduler


How do I use it?

Jesque requires Java 7+. Download the latest source at:

https://github.com/gresrun/jesque Or, to use it in your Maven project, add it as a dependency:

<dependency>
  <groupId>net.greghaines</groupId>
  <artifactId>jesque</artifactId>
  <version>2.2.0</version>
</dependency>

Quickstart:

// Configuration
final Config config = new ConfigBuilder().build();

// Add a job to the queue
final Job job = new Job("TestAction",
  new Object[]{ 1, 2.3, true, "test", Arrays.asList("inner", 4.5)});
final Client client = new ClientImpl(config);
client.enqueue("foo", job);
client.end();

// Start a worker to run jobs from the queue
final Worker worker = new WorkerImpl(config,
  Arrays.asList("foo"), new MapBasedJobFactory(map(entry("TestAction", TestAction.class))));
  
final Thread workerThread = new Thread(worker);
workerThread.start();

// Enqueue more jobs, etc.

// Shutdown the worker when finished
worker.end(true);
try { workerThread.join(); } catch (Exception e){ e.printStackTrace(); }

If enqueueing multiple jobs at the same time, there is client.batchEnqueue(String queue, List<Job> jobs) which does it in an optimized way.

Delayed jobs

Delayed jobs can be executed at sometime in the future.

final long delay = 10; // in seconds
final long future = System.currentTimeMillis() + (delay * 1000); // timestamp

client.delayedEnqueue("fooDelay", job, future);

Recurring Jobs

Recurring jobs can start at a specific time and execute at specified intervals.

final long delay = 10; // in seconds
final long future = System.currentTimeMillis() + (delay * 1000); // timestamp
final long frequency = 60; // in seconds

client.recurringEnqueue("fooRecur", job, future, (frequency * 1000));

Cancelling jobs

Delayed and recurring jobs can be cancelled.

client.removeDelayedEnqueue("fooDelay", job);
client.removeRecurringEnqueue("fooRecur", job);

Using a ClientPool

ClientPool is useful in multi threaded apps,

final Client jesqueClientPool = new ClientPoolImpl(config, PoolUtils.createJedisPool(config));
jesqueClientPool.enqueue("foo", job);

Listeners

You can execute custom callbacks during specific Worker events.

int myVar = 0;
worker.getWorkerEventEmitter().addListener(new WorkerListener(){
   public void onEvent(WorkerEvent event, Worker worker, String queue, Job job, 
					   Object runner, Object result, Throwable t) {
    if (runner instanceof TestAction) {
        ((TestAction) runner).setSomeVariable(myVar);
    }
  }
}, WorkerEvent.JOB_EXECUTE);

Available Worker Events

  • WORKER_START Finished starting up and is about to start running.
  • WORKER_POLL Polling the queue.
  • JOB_PROCESS Processing a Job.
  • JOB_EXECUTE About to execute a materialized Job.
  • JOB_SUCCESS Successfully executed a materialized Job.
  • JOB_FAILURE Caught an Exception during the execution of a materialized Job.
  • WORKER_ERROR Caught an Exception during normal operation.
  • WORKER_STOP Finished running and is about to shutdown.

For more usage examples check the tests. The tests require that Redis is running on localhost:6379.

Use the resque-web application to see the status of your jobs and workers or, if you prefer Java, try Jesque-Web.

Redis Configuration

As mentioned Jesque depends on Jedis to connect to Redis.

You can configure Jesque to connect to Redis given a URL in a system property (as used in Heroku + RedisToGo) with the following snippet:

final ConfigBuilder configBuilder = new ConfigBuilder();
try {
  URI redisUrl = new URI(System.getProperty("REDIS_PROVIDER", "127.0.0.1"));
  String redisHost = redisUrl.getHost();
  int redisPort = redisUrl.getPort();
  String redisUserInfo = redisUrl.getUserInfo();
  if (redisHost != null) {
    configBuilder.withHost(redisHost);
  }
  if (redisPort > -1) {
    configBuilder.withPort(redisPort);
  }
  if (redisUserInfo != null) {
    configBuilder.withPassword(redisUserInfo.split(":",2)[1]);
  }
} catch (URISyntaxException use) {
  // Handle error
}
final Config config = configBuilder.build();

Design Decisions

  • I chose to implement the jobs as classes that implement java.lang.Runnable or java.util.concurrent.Callable. If the job requires arguments (most do), there must be a constructor that matches the supplied arguments. I felt this was the most flexible option and didn't require the jobs to inherit or implement a special Jesque class. Because of this, the jobs don't even need to know about Jesque at all. Furthermore, the client need not have the job's Class in its VM, it only needs to know the classname and all the parameters' Classes on its classpath. Only the workers realize the job and then run them.
  • I chose to use Jedis because:
    1. It is simple to use
    2. Fully supports Redis 2.0 and uses the new unified protocol
    3. No dependencies
  • I chose to use Jackson because:
    1. I was already familiar with it
    2. It performs great and does what it says on the tin
    3. No dependencies
  • I chose to use SLF4J because:
    1. It lets the application choose how to log
    2. No dependencies

Misc.

If you are on Mac OS X, I highly recommend using the fantasic Homebrew package manager. It makes installing and maintaining libraries, tools and applications a cinch. E.g.:

brew install redis
brew install git
brew install maven

Boom! Ready to go!


License

Copyright 2021 Greg Haines

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

jesque's People

Contributors

argvk avatar chenzhang22 avatar danieldk avatar dependabot[bot] avatar dwilkie avatar evpaassen avatar gkorland avatar gregjan avatar gresrun avatar heartsavior avatar lpfeup avatar maciejp avatar mewesk avatar michaelcameron avatar pasjimmy avatar pierredavidbelanger avatar sullis avatar taher-ghaleb avatar tj--- avatar tjhruska avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

jesque's Issues

why don't you use blpop

I saw the source code in WorkerImpl class. In the pop() method: payload = this.jedis.lpop(key). it's not block method, and you poll the result every 500ms if payload is null. It wastes CPU time, and I think it is not the better solution. So, i'm puzzled, why don't you use the blpop method?

AUTH issue with Redis 2.4.0+: Jedis is sending unexpected AUTH

I am trying to upgrade to Redis 2.4.0 (saw same issue in 2.4.1) and am using the Jedis 2.0.0.

My redis config is NOT using authentication yet on startup I receive this exception:

redis.clients.jedis.exceptions.JedisDataException: ERR Client sent AUTH, but no password is set
    at redis.clients.jedis.Protocol.processError(Protocol.java:55)
    at redis.clients.jedis.Protocol.process(Protocol.java:62)
    at redis.clients.jedis.Protocol.read(Protocol.java:127)
    at redis.clients.jedis.Connection.getStatusCodeReply(Connection.java:162)
    at redis.clients.jedis.BinaryClient.connect(BinaryClient.java:58)
    at redis.clients.jedis.Connection.sendCommand(Connection.java:80)
    at redis.clients.jedis.Connection.sendCommand(Connection.java:76)
    at redis.clients.jedis.BinaryClient.auth(BinaryClient.java:468)
    at redis.clients.jedis.Jedis.auth(Jedis.java:1966)

If I build the latest Jedis from trunk (jedis-2.0.1-SNAPSHOT.jar) I get the same error.

Downgrading to Redis 2.2.14 solves the problem. Is there anything I can do on my end so we can continue to push out Redis 2.4?

Thanks.

jesque worker pruned by resque worker

We are using jesque workers in a RoR project with resque (client and workers).

When a jesque worker has already registered itself and a resque worker starts up, it
attempts to prune_dead_workers https://github.com/defunkt/resque/blob/master/lib/resque/worker.rb#L329.

It fails to recognize the jesque worker as being alive and so it is pruned. Is this a common issue? What
are best practices for interoperating jesque and resque?

We tried making the jesque worker more looking like a resque worker by giving it a procline ($0) matching /resque/.
But without patching resque this still fails, because jesque appends to the pid the thread id (?). Resque does not
seem to know about this, so the described problem occurs.

Selected database is lost when reviving the Jedis client connection

Reviving a lost connection via JedisUtils#ensureJedisConnection zeroes the selected database index, and so following a re-connection all tasks will be added to database '0'.

This sequence resets the index:

try { jedis.quit(); } catch (Exception e){} // Ignore
try { jedis.disconnect(); } catch (Exception e){} // Ignore

And it looks like the index should be selected once more upon re-connection.

Perhaps it would be best to encapsulate all the handling of the Jedis client within net.greghaines.jesque.client.ClientImpl

home page Example has error

source is

// Start a worker to run jobs from the queue
final Worker worker = new WorkerImpl(config,
Arrays.asList("foo"), map(entry("TestAction", TestAction.class)));
final Thread workerThread = new Thread(worker);

I try ๏ผŒit should be

final Worker worker = new WorkerImpl(config, Arrays.asList("foo"),
new MapBasedJobFactory(JesqueUtils.map(JesqueUtils.entry(
"TestAction", TestAction.class))));

I hope more doc about jesque for more people use it;

could you give a whole product use example , thanks

Delayed jobs with resque-web

The documentation says that it is possible to use the resque-web application to view the status of queues etc. I have tried this and it seems to work fine until I specify a delayed job, at which point resque-web starts failing with 500 errors. I have added resque-sheduler to resque-web but still have the same problem.

I wondered if the delayed job implementation use a different message format than resque-scheduler? Should I be using jesque-web instead of resque-web?

Thanks

pipelinedCommands are not reseted after JedisConnectionException

Hi gresrun,

regard to this reported bug in jedis library (redis/jedis#433), could you add jedis.getClient().resetPipelinedCount(), until jedis library repairs this bug?

Something like that

/**
     * Attempt to reconnect to Redis.
     * 
     * @param jedis
     *            the connection to Redis
     * @param reconAttempts
     *            number of times to attempt to reconnect before giving up
     * @param reconnectSleepTime
     *            time in milliseconds to wait between attempts
     * @return true if reconnection was successful
     */
    public static boolean reconnect(final Jedis jedis, final int reconAttempts, final long reconnectSleepTime) {
        int i = 1;
        do {
            try {
                jedis.disconnect();
                jedis.getClient().resetPipelinedCount();
                try {
                    Thread.sleep(reconnectSleepTime);
                } catch (Exception e2) {
                }
                jedis.connect();
            } catch (JedisConnectionException jce) {
            } // Ignore bad connection attempts
            catch (Exception e3) {
                LOG.error("Unknown Exception while trying to reconnect to Redis", e3);
            }
        } while (++i <= reconAttempts && !testJedisConnection(jedis));
        return testJedisConnection(jedis);
    }

Jesque not removing workers from Resque upon end()

I am spinning up Workers and they're processing jobs just fine. I use the existence of a control file to request workers to shut down.

Even though I am calling

worker.end(true);

The worker is still in Resque, waiting for a job. I am monitoring the redis stream using redis-cli monitor and dont see any commands to remove this worker.

          for(Worker w : avatarWorkers) {
            if(w != null) {
              log.info("Stopping avatar worker: " + w.toString());
              w.end(true);
            }
          }

Results log messages like:

2011-10-29 16:42:58,436 [main] INFO  com.batch.App - Stopping avatar worker: dev:worker:Codys-MacBook-Pro.local:10065-6:JAVA_DYNAMIC_QUEUES,avatars

But they're stil in Resque / Redis.

Am I doing something wrong?

Why does a worker have to be running?

Does a worker thread have to be running before you can write to a queue?

During development, in my web app, I write to a queue that has no worker running. When looking at jesque-web I can see the message in the queue.

If I then fire up a worker, what happens is that the queue is deleted and nothing is processed.

When I fired up a worker first, then wrote to the queue via my web app, the message was processed successfully.

Why is this the case?

Each job runs twice

I'm observing peculiar behavior with Jesque. Each job that's pushed onto the queue causes the worker's run() method to be called twice. I can't tell if I'm doing something wrong or if there is some bug in Jesque. I've double checked though using the resque-web tool that there is really only one job in the queue. At the end of the run, I see the output twice:

Done...
Done...

My worker invocation looks like this:

public class JesqueViz extends JesqueWorker {

    public static void main(String[] args) throws Exception {
        final Config config = configureRedisConnection();

        final Worker worker = new WorkerImpl(config,
                Arrays.asList("viz"), 
                map(entry("com.foobar.viz.VizWorker", VizApp.class)));

        final Thread t = new Thread(worker);
        t.start();
        t.join();
    }    
}

And the worker's (VizApp's) run() method:

public class VizApp implements Runnable {

    public void run() {
        // do stuff
        System.out.println("Done...");
    }
}

load_ptf cannot be used as a delayed queue

I'm getting this exception when trying to enqueue a delayed job in a queue that contains classig jobs, is this behaviour normal ?
After debugging I found out the type of the queues is list, in this method jesque waits for a zset or a none.

public static boolean canUseAsDelayedQueue(final Jedis jedis, final String key) {
    final String type = jedis.type(key);
    return (ZSET.equalsIgnoreCase(type) || NONE.equalsIgnoreCase(type));
}

java.lang.IllegalArgumentException
Error
load_ptf cannot be used as a delayed queue
at net.greghaines.jesque.client.AbstractClient.doDelayedEnqueue(AbstractClient.java:276)
at net.greghaines.jesque.client.ClientPoolImpl$4.doWork(ClientPoolImpl.java:119)
at net.greghaines.jesque.client.ClientPoolImpl$4.doWork(ClientPoolImpl.java:113)
at net.greghaines.jesque.utils.PoolUtils.doWorkInPool(PoolUtils.java:58)
at net.greghaines.jesque.client.ClientPoolImpl.doDelayedEnqueue(ClientPoolImpl.java:113)
at net.greghaines.jesque.client.AbstractClient.delayedEnqueue(AbstractClient.java:289)

Release 2.0.1

There is a critical bug in 2.0.0 which is fixed with the Jackson Annotation commit you just did.

The Bug:
when reenqueuing a failed job, exception information is not deserialized and therefor stacktraces are gone in the updated JobFailure in the failed list.

it would be super helpful if you could release 2.0.1

Is there any obstacles one could help you with that is blocking a release?

Job Factory suggestion

Hey there. I am running jesque now and have already forked the related project, called jesque-spring, in order to add more status tracking. The jesque-spring project does some heavy subclassing of WorkerImpl, but really only to materialize job beans from Spring, instead of reflection. The spring retrieval would be easy to do with a factory pattern for job finding, an easy and optional addition to the current reflection methods. Anyway, I can submit a patch in that direction if you are interested.
I would add a one job factory property to the Worker class and create a factory interface with a single method. Implementors could then inject a factory of their choosing.
Is there another approach to this problem that I am not seeing? Many thanks!
Greg

Recover failed job

There is an issue when a job fails.

  1. Job was submited
  2. the worker starts to process this job and crash before complete it. Eg: The worker machine shutdown.
  3. The job remains in the working queue forever.

Is it possibile?

net.greghaines.jesque.utils.NoSuchConstructorException

Can't find much information about this...it's being thrown in the ReflectionUtils.findConstructor() method...

My job has a proper constructor which takes one argument to fill its one attribute. I also added a blank, default constructor and that still didn't fix the problem. This Object's attribute is an object that also has properly defined constructors.

Example Failure; NoSuchConstructorException

Hi,

I am trying to make the example on README to work. Though the worker seems to be failing executing the job.

At the MONITOR command, I see an exception which is RPUSHed to "resque:failed"; net.greghaines.jesque.utils.NoSuchConstructorException

Monitor Log;

1401107990.266951 [0 127.0.0.1:61234] "SELECT" "0"
1401107990.644568 [0 127.0.0.1:61234] "SADD" "resque:queues" "foo"
1401107990.646014 [0 127.0.0.1:61234] "RPUSH" "resque:queue:foo" "{\"class\":\"TestAction\",\"args\":[1,2.3,true,\"test\",[\"inner\",4.5]]}"
1401107990.646970 [0 127.0.0.1:61235] "SELECT" "0"
1401107990.648241 [0 127.0.0.1:61235] "TYPE" "resque:queue:fooDelay"
1401107990.648449 [0 127.0.0.1:61235] "ZADD" "resque:queue:fooDelay" "1.401108000646E12" "{\"class\":\"TestAction\",\"args\":[1,2.3,true,\"test\",[\"inner\",4.5]]}"
1401107990.648895 [0 127.0.0.1:61235] "SADD" "resque:queues" "fooDelay"
1401107990.663061 [0 127.0.0.1:61236] "SELECT" "0"
1401107990.666277 [0 127.0.0.1:61236] "SADD" "resque:workers" "epappas-mac.local:30947-0:JAVA_DYNAMIC_QUEUES,foo"
1401107990.667268 [0 127.0.0.1:61236] "SET" "resque:worker:epappas-mac.local:30947-0:JAVA_DYNAMIC_QUEUES,foo:started" "2014-05-26T13:39:50.666+0100"
1401107990.668316 [0 127.0.0.1:61236] "TYPE" "resque:queue:foo"
1401107990.668579 [0 127.0.0.1:61236] "WATCH" "resque:queue:foo"
1401107990.668742 [0 127.0.0.1:61236] "LINDEX" "resque:queue:foo" "0"
1401107990.682321 [0 127.0.0.1:61236] "MULTI"
1401107990.682353 [0 127.0.0.1:61236] "LPOP" "resque:queue:foo"
1401107990.682367 [0 127.0.0.1:61236] "LPUSH" "resque:inflight:epappas-mac.local:30947-0:JAVA_DYNAMIC_QUEUES,foo:foo" "{\"class\":\"TestAction\",\"args\":[1,2.3,true,\"test\",[\"inner\",4.5]]}"
1401107990.682412 [0 127.0.0.1:61236] "EXEC"
1401107990.732301 [0 127.0.0.1:61236] "SET" "resque:worker:epappas-mac.local:30947-0:JAVA_DYNAMIC_QUEUES,foo" "{\"run_at\":\"2014-05-26T12:39:50.716+0000\",\"queue\":\"foo\",\"payload\":{\"class\":\"TestAction\",\"args\":[1,2.3,true,\"test\",[\"inner\",4.5]]},\"paused\":false}"
1401107990.735992 [0 127.0.0.1:61236] "PING"
1401107990.736167 [0 127.0.0.1:61236] "INCR" "resque:stat:failed"
1401107990.736430 [0 127.0.0.1:61236] "INCR" "resque:stat:failed:epappas-mac.local:30947-0:JAVA_DYNAMIC_QUEUES,foo"
1401107990.741058 [0 127.0.0.1:61236] "RPUSH" "resque:failed" "{\"worker\":\"epappas-mac.local:30947-0:JAVA_DYNAMIC_QUEUES,foo\",\"queue\":\"foo\",\"payload\":{\"class\":\"TestAction\",\"args\":[1,2.3,true,\"test\",[\"inner\",4.5]]},\"exception\":\"net.greghaines.jesque.utils.NoSuchConstructorException\",\"error\":\"class=Main$TestAction args=[1, 2.3, true, test, [inner, 4.5]]\",\"backtrace\":[\"\\tat net.greghaines.jesque.utils.ReflectionUtils.findConstructor(ReflectionUtils.java:205)\",\"\\tat net.greghaines.jesque.utils.ReflectionUtils.createObject(ReflectionUtils.java:147)\",\"\\tat net.greghaines.jesque.utils.JesqueUtils.materializeJob(JesqueUtils.java:400)\",\"\\tat net.greghaines.jesque.worker.MapBasedJobFactory.materializeJob(MapBasedJobFactory.java:49)\",\"\\tat net.greghaines.jesque.worker.WorkerImpl.process(WorkerImpl.java:523)\",\"\\tat net.greghaines.jesque.worker.WorkerImpl.poll(WorkerImpl.java:399)\",\"\\tat net.greghaines.jesque.worker.WorkerImpl.run(WorkerImpl.java:183)\",\"\\tat java.lang.Thread.run(Thread.java:745)\"],\"failed_at\":\"2014-05-26T12:39:50.736+0000\",\"retried_at\":null}"
1401107990.743568 [0 127.0.0.1:61236] "LPOP" "resque:inflight:epappas-mac.local:30947-0:JAVA_DYNAMIC_QUEUES,foo:foo"
1401107990.744015 [0 127.0.0.1:61236] "DEL" "resque:worker:epappas-mac.local:30947-0:JAVA_DYNAMIC_QUEUES,foo"
1401107990.744314 [0 127.0.0.1:61236] "TYPE" "resque:queue:foo"
1401107990.744603 [0 127.0.0.1:61236] "WATCH" "resque:queue:foo"
1401107990.744812 [0 127.0.0.1:61236] "LINDEX" "resque:queue:foo" "0"
1401107990.745252 [0 127.0.0.1:61236] "UNWATCH"
1401107991.246603 [0 127.0.0.1:61236] "TYPE" "resque:queue:foo"
1401107991.246809 [0 127.0.0.1:61236] "WATCH" "resque:queue:foo"
1401107991.246932 [0 127.0.0.1:61236] "LINDEX" "resque:queue:foo" "0"
1401107991.247247 [0 127.0.0.1:61236] "UNWATCH"
1401107995.667507 [0 127.0.0.1:61236] "SREM" "resque:workers" "epappas-mac.local:30947-0:JAVA_DYNAMIC_QUEUES,foo"
1401107995.667696 [0 127.0.0.1:61236] "DEL" "resque:worker:epappas-mac.local:30947-0:JAVA_DYNAMIC_QUEUES,foo" "resque:worker:epappas-mac.local:30947-0:JAVA_DYNAMIC_QUEUES,foo:started" "resque:stat:failed:epappas-mac.local:30947-0:JAVA_DYNAMIC_QUEUES,foo" "resque:stat:processed:epappas-mac.local:30947-0:JAVA_DYNAMIC_QUEUES,foo"

My Java code;

import static net.greghaines.jesque.utils.JesqueUtils.entry;
import static net.greghaines.jesque.utils.JesqueUtils.map;

public class Main {
    private static final Logger log = LoggerFactory.getLogger(Main.class);

    class TestAction implements Runnable {

        private final Logger log = LoggerFactory.getLogger(TestAction.class);

        private final Integer i;
        private final Double d;
        private final Boolean b;
        private final String s;
        private final List<Object> l;

        public TestAction(final Integer i, final Double d, final Boolean b, final String s, final List<Object> l) {
            this.i = i;
            this.d = d;
            this.b = b;
            this.s = s;
            this.l = l;
        }

        public void run() {
            log.info("TestAction.run() {} {} {} {} {}", new Object[]{this.i, this.d, this.b, this.s, this.l});
            try {
                Thread.sleep(100);
            } catch (Exception e) {
            }
        }
    }

    public static void main(String[] args) {
        // Configuration
        final Config config = new ConfigBuilder().build();
        {
            // Add a job to the queue
            final Job job = new Job("TestAction",
                    new Object[]{1, 2.3, true, "test", Arrays.asList("inner", 4.5)});
            final Client client = new ClientImpl(config);
            client.enqueue("foo", job);
            client.end();
        }
        {
            // Add a job to the delayed queue
            final Job job = new Job("TestAction",
                    new Object[]{1, 2.3, true, "test", Arrays.asList("inner", 4.5)});

            final long delay = 10; // in seconds
            final long future = System.currentTimeMillis() + (delay * 1000); // timestamp

            final Client client = new ClientImpl(config);
            client.delayedEnqueue("fooDelay", job, future);
            client.end();
        }

        // Start a worker to run jobs from the queue
        final Worker worker = new WorkerImpl(config,
                Arrays.asList("foo"), new MapBasedJobFactory(map(entry("TestAction", TestAction.class))));
        final Thread workerThread = new Thread(worker);
        workerThread.start();
    }
}

Thanks!

Redis username configuration

I'm using RedisToGo and the Redis URL that I need to connect to is of the form:

redis://user:[email protected]:9259/

I can see that the ConfigBuilder accepts a withPort, withHost and withPassword but it does not provide a withUser.

FYI have the following code to extract the details from the REDIS_PROVIDER_URL (compatible with Sidekiq)

    final ConfigBuilder configBuilder = new ConfigBuilder();

    try {
      URI redisUrl = new URI(System.getProperty("REDIS_PROVIDER", "127.0.0.1"));

      String redisHost = redisUrl.getHost();
      int redisPort = redisUrl.getPort();
      String redisUserInfo = redisUrl.getUserInfo();

      if (redisHost != null) {
        configBuilder.withHost(redisHost);
      }

      if (redisPort > -1) {
        configBuilder.withPort(redisPort);
      }

      if (redisUserInfo != null) {
        configBuilder.withPassword(redisUserInfo);
      }

      // need to split the UserInfo into username and password here and set it with:
      // configBuilder.withUser(); and  configBuilder.withUser();
    }
    catch (URISyntaxException e) {
      System.err.println(e.toString());
      System.exit(1);
    }

    final Config config = configBuilder.build();

net.greghaines.jesque.utils.NoSuchConstructorException for Job with class and named arguments

After reading #38 I found that it is not possible to write

    final Job job = new Job("FooAction", new Foo("a", "b", "c"));

So, I tried using

    Map<String, Object> templateValues = new HashMap<String, Object>();
    Map<String,Object> constructorArgs = new HashMap<String, Object>();
    constructorArgs.put("template", "someTemplate");    
    constructorArgs.put("values", templateValues);
    final Job job = new Job("FooAction", constructorArgs);

I am trying to use:

    public Job(final String className, final Map<String,? extends Object> vars)

I am getting net.greghaines.jesque.utils.NoSuchConstructorException.

1.) Is that a bug or am I using the API in wrong way (like using maps)?
2.) If I have to use ObjectMapper as specified in #38 , can you give a complete example of how to use it and where to call the ObjectMapper?

Implement failover queues/worker

Queues are polled with an atomic 'lpopFailover' method to store a job on the
failover data-set(failoverQueue, failoverData). If a worker is terminated immediately while processing a job,
that job will be requeued eventually by FailoverWorker. Otherwise it will be just removed from the failover data-set.

Wheather worker terminated nomally or not(SIGTERM, SIGKILL, machine-failure, etc), failback will be run automatically.

Selected database is lost when recovering from an exception

Somewhat related to issues #21 and #22, another place that's susceptible to the same error is JedisUtils#reconnect:

jedis.disconnect();
try { Thread.sleep(reconnectSleepTime); } catch (Exception e2){}
jedis.connect();

Jedis' disconnect function zeroes the select DB index, and so the selected DB is lost for the rest of the operation.
This function's called from WorkerImpl#recoverFromException but the DB index isn't reselected

client.doDelayedEnqueue() occasionally throws exception

After running for a few hours, I eventually see the following exception when enqueuing a delayed job:

Caused By java.lang.IllegalArgumentException: default is not a delayed queue
    at net.greghaines.jesque.client.AbstractClient.doDelayedEnqueueโ€‹(AbstractClient.java:273)
    at net.greghaines.jesque.client.ClientPoolImpl$4.doWorkโ€‹(ClientPoolImpl.java:119)
    at net.greghaines.jesque.client.ClientPoolImpl$4.doWorkโ€‹(ClientPoolImpl.java:113)
    at net.greghaines.jesque.utils.PoolUtils.doWorkInPoolโ€‹(PoolUtils.java:58)
    at net.greghaines.jesque.client.ClientPoolImpl.doDelayedEnqueueโ€‹(ClientPoolImpl.java:113)
    at net.greghaines.jesque.client.AbstractClient.delayedEnqueueโ€‹(AbstractClient.java:286)

I don't have a repro for this because it doesn't happen often, but I think it's the same root cause as #56. IsDelayedQueue() is returning false because the queue is empty so redis is reporting the key's type as none. If you let the test code I attached in #56 run for a few hours you may see this reproduced. Also, testing with a high worker count or even multiple hosts may help in producing a local repro.

In my case, I only have one queue and it is always delayed (zset), so I am thinking of just changing IsDelayedQueue() to return true but that's pretty hacky. Any ideas on a better fix for this?

Thanks!

Method success() in WorkerImpl sometimes might not fire JOB_SUCCESS event.

Method in question:

 protected void success(final Job job, final Object runner, final Object result, final String curQueue) {
        this.jedis.incr(key(STAT, PROCESSED));
        this.jedis.incr(key(STAT, PROCESSED, this.name));

        this.listenerDelegate.fireEvent(JOB_SUCCESS, this, curQueue, job, runner, result, null);
    }

Lets say we have a long running job (in my case it was ~10 minutes).
Job completes, and method success() is invoked.
But this.jedis.incr(key(STAT, PROCESSED)); throws an exception:

redis.clients.jedis.exceptions.JedisConnectionException: It seems like server has closed the connection.
    at redis.clients.util.RedisInputStream.readLine(RedisInputStream.java:90)
    at redis.clients.jedis.Protocol.processStatusCodeReply(Protocol.java:85)
    at redis.clients.jedis.Protocol.process(Protocol.java:74)
    at redis.clients.jedis.Protocol.read(Protocol.java:131)
    at redis.clients.jedis.Connection.getIntegerReply(Connection.java:188)
    at redis.clients.jedis.Jedis.incr(Jedis.java:595)

And this causes this.listenerDelegate.fireEvent(JOB_SUCCESS, this, curQueue, job, runner, result, null); to be not executed. And listeners are not being notified.
Probably there is a similar issue with failure() method.

check if a job exists

We probably need a way to check if a job (delayed and recurring as well) exists in the queue.
It is something that I find very useful while writing test cases.
Something like an assertJob would be nice.

Thoughts ?

How to run workers / best practice for cleaning up

Is there a best practice to running worker(s) and cleaning up?

I currently run a worker like so:

      final Worker worker = new WorkerImpl(jesqueConfig, Arrays.asList(queue), Arrays.asList(PhotoUploadActivity.class));
      Thread t = new Thread(worker);
      t.start();
      t.join();

And this seems to work just fine. However the only way for me to cancel the process is to send it a signal, either a TERM or USR{1,2}.

If I look in resque (via resque-web) I can see that there are multiple of these Java workers that just accrue up - they are not getting pruned.

Thus - whats the best practice to gracefully shutting down the runtime container for workers?. I've added a JVM shutdown hook which attempts to call worker.end(true) but this never seems to get called.

Pause all workers in cluster?

I would love to be able to pause/resume all workers that are using queues on a given Redis instance using a single API method.

For instance, the Client interface could have pauseAllWorkers() and resumeAllWorkers() methods. The pauseAllWorkers method would block until all of the workers complete their current task. Then the workers would remain in a paused state until they were resumed.

Without a feature like this, workers must be individually paused and un-paused via the Worker.togglePause() method. This isn't so hard to do on a single machine, but if workers are running on multiple machines, this task is much more difficult. A new control bus that must be implemented by the application developer to deal with this. It seems to me that this control layer would be a very nice feature for the task queue implementation (in this case Jesque) itself.

There are some thorny problems to implemented the control layer - sending the pause messages in a robust way, acking the messages, etc. That makes it seem to me all the better to do right inside the library.

Thoughts?

Usage of WorkerPool - threads lack a join() ?

Please excuse my Java noobness.

I have the first pattern of instantiating individual WorkerImpl classes and then putting those instances in threads and calling start() and join() on each thread.

Like so:

      final Thread[] threads = new Thread[2];
      final Worker[] workers = new Worker[2];

      workers[0] = new WorkerImpl(jesqueConfig, Arrays.asList("avatars"), Arrays.asList(FetchUserAvatar.class));
      workers[1] = new WorkerImpl(jesqueConfig, Arrays.asList("resize"), Arrays.asList(PhotoUploadActivity.class));
      threads[0] = new Thread(workers[0]);
      threads[1] = new Thread(workers[1]);

      for(int i = 0; i < threads.length; i++) {
        threads[i].start();
        threads[i].join();
      }

This works perfectly. It appears that the WorkerPool class encapsulates all of this for me, so my first stab is:

      final WorkerPool avatarsPool = new WorkerPool(jesqueConfig, Arrays.asList("avatars"), Arrays.asList(FetchUserAvatar.class), 10);
      final WorkerPool photosPool = new WorkerPool(jesqueConfig, Arrays.asList("resize"), Arrays.asList(PhotoUploadActivity.class), 10);

      avatarsPool.run();
      photosPool.run();

Which executes and then immediately quits. Looking at the source of WorkerPool I never see join() called on its threads, thus the main thread never waits and immediately ceases execution.

What is the proper way to use the WorkerPool?

Thanks.

Implementing a recurring job

hey there,

I'm currently using jesque now for creating and deleting scheduled jobs, additionally to create recurring jobs now I schedule a new job before the worker returns.

I was looking for a way to create it from within jesque and figured it could probably be achieved with a zadd (zadd job with new millis i.e) in the pop method (with the job meta & frequency stored in a redis hashmap).

I haven't seen how resque-scheduler does this, would be nice if someone could shed some light on it.

Thoughts ?

Error queues per job type

I'm using jesque and it's very good but i am not happy to have all failed jobs in the same queue. Would it be possible to configure a failed queue per job?

Server timeout

When no operation takes place for some time the server closes the connection, resulting in a stacktrace like this:

It seems like server has closed the connection.
at redis.clients.util.RedisInputStream.readLine(RedisInputStream.java:91)
at redis.clients.jedis.Protocol.processInteger(Protocol.java:106)
at redis.clients.jedis.Protocol.process(Protocol.java:66)
at redis.clients.jedis.Protocol.read(Protocol.java:123)
at redis.clients.jedis.Connection.getIntegerReply(Connection.java:161)
at redis.clients.jedis.Jedis.sadd(Jedis.java:1169)
at net.greghaines.jesque.client.ClientImpl.enqueue(ClientImpl.java:84)

Without resorting to connection pooling one should probably ping the server in a dedicated ScheduledExecutorService thread. I modified the default ClientImpl to that effect - AFAIK the problem is gone now. I'll report back here if it persists despite the pinging.

What happen when a job got failed?

What happen when a job got failed?Will jesque re-enqueue it and retry to re-do it?

We need to retry the failled job again and again and also make sure that every job will be excute at least one time.Is jesque suitable for this siuation?

Thanks & Best Regards!

Worker to use Pool<Jedis>

Hey,

What's the easiest way to make Worker classes use Pool instead of Jedis instance directly?

  • Animesh

what are limitations of jesque

Please clarify on following things.

  1. I see resque has resque scheduler , can i know whether jesque had
    implementation for scheduler or we have to use cron.
  2. What are the limitations for the jesque.
  3. Is jesque scalable

Handle extra arguments in Job (de)serialization

I am using Jesque to (succesfully) implement Java based workers for Sidekiq (Ruby).
I needed some hacking, but most of it was possible by extending existing classes and thus not messing with the original source code. However, Sidekiq uses extra variables in the json that defines a job. Jesque currently just raises a exception when it encounters variables that are not hardcoded in the JobJsonDeserializer. It would be nice if Jesque could store these values in a extra Map<String, Object> in the Job class, so anyone who has alternate implementations of Redis could handle them.
For now I had to alter the Job class to do exactly this.

Maybe there are better ways in Jesque to create custom Job classes, but it was not very obvious to me.

I would be happy to implement this and submit a pull request, if it has a change of getting included.

Delayed queues throw exceptions when using multiple workers

Hi,

I'm having a problem with delayed queues. After running Jesque for a while under light load, I noticed exceptions were being thrown at what looked like random times. Debugging this locally, I found that JesqueUtils.isDelayedQueue() is returning false when the delayed queue is empty. This causes a bunch of problems, because delayed queues may randomly get processed as regular queues.

It's a timing problem / race condition, but here's a repro that should work if you run it a few times:

import net.greghaines.jesque.*;
import net.greghaines.jesque.client.*;
import net.greghaines.jesque.worker.*;
import redis.clients.jedis.JedisPool;
import java.util.Arrays;

import static net.greghaines.jesque.utils.JesqueUtils.entry;
import static net.greghaines.jesque.utils.JesqueUtils.map;

public class TestHarness {
    public static final Config config = new ConfigBuilder().withHost("localhost").withPort(6379).withDatabase(0).build();
    public static final Client client = new ClientPoolImpl(config, new JedisPool("localhost"));
    public static final String queue = "default";
    public static long runCount = 0;

    public static void main(String[] args) {
        test();
    }

    public static void test() {
        // Start workers
        JobFactory jobFactory = new MapBasedJobFactory(map(entry(TestAction.class.getSimpleName(), TestAction.class)));
        WorkerImplFactory workerFactory = new WorkerImplFactory(config, Arrays.asList(queue), jobFactory);
        WorkerPool pool = new WorkerPool(workerFactory, 10);
        pool.run();

        // Start jobs
        enqueue();

        // Wait a few secs then shutdown
        try { Thread.sleep(5000); } catch (Exception e){} // Give ourselves time to process
        client.end();
        try { pool.endAndJoin(true, 100); } catch (Exception e){ e.printStackTrace(); }
    }

    public static void enqueue() {
        long future = System.currentTimeMillis() + 10;
        Job job = new Job(TestAction.class.getSimpleName());
        client.delayedEnqueue(queue, job, future);
    }

    public class TestAction implements Runnable {
        public void run() {
            System.out.println("TestAction.run(): " + runCount++);
            enqueue();
        }
    }
}

The code above throws an exception for me about every time I run it. Here's an example of the output:

<snip/>
TestAction.run(): 7
redis.clients.jedis.exceptions.JedisDataException: WRONGTYPE Operation against a key holding the wrong kind of value
  at redis.clients.jedis.Protocol.processError(Protocol.java:96)
  at redis.clients.jedis.Protocol.process(Protocol.java:114)
  at redis.clients.jedis.Protocol.read(Protocol.java:183)
  at redis.clients.jedis.Connection.getBinaryBulkReply(Connection.java:189)
  at redis.clients.jedis.Connection.getBulkReply(Connection.java:178)
  at redis.clients.jedis.Jedis.lindex(Jedis.java:994)
  at net.greghaines.jesque.worker.WorkerImpl.lpoplpush(WorkerImpl.java:727)
  at net.greghaines.jesque.worker.WorkerImpl.pop(WorkerImpl.java:434)
  at net.greghaines.jesque.worker.WorkerImpl.poll(WorkerImpl.java:396)
  at net.greghaines.jesque.worker.WorkerImpl.run(WorkerImpl.java:183)
  at java.lang.Thread.run(Thread.java:745)
TestAction.run(): 8
<snip/>

Inspecting the keys in redis-cli reveals that when the last member is popped off the queue, the data type changes from zset to none, which is causing our problem in JedisUtils.isDelayedQueue():

$ redis-cli
127.0.0.1:6379> type resque:queue:default
zset
127.0.0.1:6379> zrange resque:queue:default 0 -1
1) "{\"class\":\"TestAction\",\"args\":[]}"
127.0.0.1:6379> zrem resque:queue:default "{\"class\":\"TestAction\",\"args\":[]}"
(integer) 1
127.0.0.1:6379> type resque:queue:default
none

I am new to Redis so I'm not sure if that's the expected behavior or not, but it does seem a little weird that the data type changes ... and it's breaking delayed queues.

What's the right way to fix this in Jesque? Thanks!

BTW, this test was run on jesque-2.0.0 from Mavencentral under Redis 2.8.13, Java 1.7.0_67, and OSX 10.9.4. It's also reproducible on AWS under Ubuntu, Java 1.7, and hosted ElastiCache Redis.

Example of passing variables to workers

Just wondering if it's possible to pass a variable to a worker in a constructor. For example, I have the following code:

 class Producer implements Runnable {
   private final BlockingQueue queue;
   private final String arg1;
   private final String arg2;

   public Producer(BlockingQueue q) { queue = q; }
   public Producer(String arg1, String arg2) {
      this.arg1 = arg1;
      this.arg2 = arg2;
  }

   public void run() {
     queue.put(produce()); }
   }
   Object produce() { ... }
 }

 class Consumer implements Runnable {
   private final BlockingQueue queue;
   Consumer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while (true) { consume(queue.take()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   void consume(Object x) { ... }
 }

 class Setup {
   void main() {
     BlockingQueue q = new SomeQueueImplementation();
     Consumer c1 = new Consumer(q);
     Consumer c2 = new Consumer(q);

      final Config config = new ConfigBuilder().build();

      // how can I send the BlockingQueue to the Producer in the constructor here?
      final Worker worker = new WorkerImpl(config,
        Arrays.asList("default"), new MapBasedJobFactory(map(entry(Producer.class.getSimpleName(), Producer.class))));

    final Thread workerThread = new Thread(worker);
    workerThread.start();

     new Thread(c1).start();
     new Thread(c2).start();
   }
 }

Here I want to be have jesque to pass the BlockingQueue to the constructor when it creates the Worker so that it has access to the queue.

For now I'm just using a global variable to achieve this.

If there's a better way please let me know and I'll contribute to the README to illustrate the example. Cheers

Keep alive connection

I'm using the ClientImpl like show below but I'm still getting timeouts:

    final net.greghaines.jesque.client.Client jesqueMtClient = new net.greghaines.jesque.client.ClientImpl(
      jesqueConfig, 15, 15,
      java.util.concurrent.TimeUnit.SECONDS
    );

In ClientImpl on https://github.com/gresrun/jesque/blob/master/src/main/java/net/greghaines/jesque/client/ClientImpl.java#L92 checkConnectionBeforeUse is set to false. However on https://github.com/gresrun/jesque/blob/master/src/main/java/net/greghaines/jesque/client/ClientImpl.java#L155 the method that ensures the jedisConnection is open only runs if checkConnectionBeforeUse is set to true

    private void ensureJedisConnection() {
        if (this.checkConnectionBeforeUse && !JedisUtils.ensureJedisConnection(this.jedis)) {
            authenticateAndSelectDB();
        }
    }

I'm wondering if this is a bug or whether it's intentional. If it's intentional is there another way in which I should be keeping the connection open?

Thanks again for your help.

Using ensureJedisConnection(); in the Pool

Hi,

Just wondering if it's possible to use ensureJedisConnection() to ensure the pool connection is open before enqueuing a job in ClientPoolImpl as it's currently used in ClientImpl

If it's not currently possible I will submit a PR to add this functionality. Anything special I need to know?

Example is incorrect

In the example on the main page, the delayed job is scheduled to be executed in 10 seconds, but the worker seems to end after 5 second.

Change DefaultWorkerExceptionHandler?

I'm working on getting the Grails Jesque plugin to shutdown workers gracefully and have run into what I think might be the wrong condition in DefaultWorkerExceptionHandler.

When calling worker.end(true), the state of the worker is set to SHUTDOWN, then interrupts the worker thread. An exception is raised in the worker if it's sleeping, and the exception handler is called. In the DefaultWorkerExceptionHandler if the exception is InterruptedException and the worker is NOT SHUTDOWN it is told to terminate, but if it is interrupted and is shutdown then it's told to proceed. Now, in case case of a normal worker.end(true), this will lead to a log.warn with a stack trace in the exception handler, and it will actually call worker.end() again (but with now==false) . It seems like a normal worker.end(true) call should not by default log an exception, in which case the conditional in DefaultWorkerExceptionHandler would always return PROCEED for an interrupted exception regardless of worker state; if the worker is not shutdown, it will proceed like it does today, if it is shutdown PROCEED will just get the worker back in the pooling loop and shutdown gracefully because of the change in worker state.

I know I could just create my own worker exception handler, but it seems like the default should change in this case. If you agree, I can create a pull request.

Duplicate worker threads with same queues created

I am trying to create worker threads on start of server and stop the worker threads when stopping the server. Since the worker threads are created with process ids appended to keys, I can see duplicates.

1.) Is there a way to create non-duplicate worker threads.

2.) If I want to stop all worker threads with no jobs, how to do it? I tried this, but think this is not correct

List<WorkerInfo> workerInfos = workerInfoDAO.getAllWorkers();
    for(WorkerInfo info : workerInfos){
    if(WorkerInfo.State.IDLE.equals(info.getState())){
           LOGGER.info("Removing workers in IDLE state : " + info.getName());
       workerInfoDAO.removeWorker(info.getName());
        }
}

P.S: I couldn't mark it as discussion topic, so adding it as issue.

ReflectionUtils findConstructor fails to recognize scala constructors that accept variable count of parameters

Hi,

Thanks for releasing 2.0, the various improvements are great!

We've encountered a problem when defining class constructors with variable params count in scala for jobs.
For:

package com.test
class A(args: String*) {}

Running:

Class.forName("com.test.A").getConstructors()(0).getParameterTypes

Returns:

Array[Class[_]] = Array(interface scala.collection.Seq)

The signature of the class using reflections shows a single argument with the type scala.collection.Seq (I believe it matches java's java.util.List) without the type of the actual parameters passed ("String") and current implementation of findConstructor fails to match it to actual parameters array read from redis (["string1", "string2", "string3"]) and throws an exception.

Workaround - we avoid using variable parameters length constructors in our jobs, not a big deal.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.