

Key Sequential Executor

This small library addresses a common problem: tasks for a particular key must be processed sequentially, in the order they arrive. A single-thread executor (for example, one created by Executors.newSingleThreadExecutor()) solves this, but inefficiently: tasks for unrelated keys are not processed in parallel. Instead, they all wait in one queue shared by every key until the single thread gets to them. This library lets tasks for different keys run concurrently while keeping tasks for the same key in order. It also works well when the set of possible keys, and how many there are, is not known upfront.

Usage

A typical scenario in order management or booking systems is that messages for a particular trade A must be processed sequentially in the same order as they are received (otherwise the state of the trade will be incorrect). The same is true for any other trade - for example messages for the trade B must be processed sequentially as well. However, it is desirable that a message for the trade A does not block processing of a message for the trade B (and vice versa) if they happen to arrive at the same time.

ExecutorService underlyingExecutor = Executors.newFixedThreadPool(10);
KeySequentialRunner<String> runner = new KeySequentialRunner<>(underlyingExecutor);

String tradeIdA = "327";
String tradeIdB = "831";
// more Trade IDs can arrive in a real scenario, but it is usually not known how many upfront

Runnable task = new Runnable() {
    @Override
    public void run() {
        // process a message for the trade
    }
};

runner.run(tradeIdA, task); // execute the task by the underlying executor

runner.run(tradeIdB, task); // execution is not blocked by the task for tradeIdA

runner.run(tradeIdA, task); // execution starts when the previous task for tradeIdA completes

In the example above the key is a Trade ID. Tasks for a particular Trade ID are executed sequentially, but they do not block tasks for other Trade IDs (unless the tasks are blocked by the underlying executor).
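To make the ordering guarantee concrete, here is a minimal sketch of one way per-key sequencing can be built on top of a shared thread pool: remember the last task submitted for each key and chain the next one behind it. This is not the library's implementation. The class PerKeyChainSketch and its methods are hypothetical names used only for illustration, and the sketch never removes finished keys from its map, which a real implementation would have to do.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration only; not the library's KeySequentialRunner.
class PerKeyChainSketch {

    // Tail of the task chain for each key (never cleaned up in this sketch).
    private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();
    private final ExecutorService pool;

    PerKeyChainSketch(ExecutorService pool) {
        this.pool = pool;
    }

    CompletableFuture<Void> run(String key, Runnable task) {
        // compute() is atomic per key, so concurrent callers cannot break the chain.
        return tails.compute(key, (k, tail) -> {
            if (tail == null) {
                return CompletableFuture.runAsync(task, pool);
            }
            // handleAsync starts the next task even if the previous one failed.
            return tail.<Void>handleAsync((result, error) -> {
                task.run();
                return null;
            }, pool);
        });
    }

    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Submits two tasks for key "327" and one for key "831"; returns the completion log.
    static List<String> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        PerKeyChainSketch runner = new PerKeyChainSketch(pool);
        List<String> log = Collections.synchronizedList(new ArrayList<>());

        CompletableFuture<Void> a1 = runner.run("327", () -> { pause(50); log.add("327#1"); });
        CompletableFuture<Void> b1 = runner.run("831", () -> log.add("831#1"));
        CompletableFuture<Void> a2 = runner.run("327", () -> log.add("327#2"));

        CompletableFuture.allOf(a1, b1, a2).join();
        pool.shutdown();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the log, "327#1" always appears before "327#2", because the second task for that key is chained behind the first. The position of "831#1" depends on thread scheduling, since nothing ties it to the tasks for key "327".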

Please note that the key must correctly implement the hashCode and equals methods, as the implementation stores the tasks in a HashMap.
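A String key, as in the example above, already satisfies this requirement. For a custom key type, a minimal sketch could look as follows. The class TradeKey and its fields book and tradeNumber are hypothetical and serve only to show the equals and hashCode contract.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical composite key; the field names are assumptions for illustration.
final class TradeKey {

    private final String book;
    private final long tradeNumber;

    TradeKey(String book, long tradeNumber) {
        this.book = Objects.requireNonNull(book);
        this.tradeNumber = tradeNumber;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TradeKey)) {
            return false;
        }
        TradeKey other = (TradeKey) o;
        return tradeNumber == other.tradeNumber && book.equals(other.book);
    }

    @Override
    public int hashCode() {
        // Must be computed from the same fields that equals() compares.
        return Objects.hash(book, tradeNumber);
    }

    // Two equal keys must land in the same HashMap entry.
    static int distinctEntries() {
        Map<TradeKey, String> map = new HashMap<>();
        map.put(new TradeKey("FX", 327L), "first");
        map.put(new TradeKey("FX", 327L), "second"); // replaces the entry above
        map.put(new TradeKey("FX", 831L), "other");
        return map.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctEntries());
    }
}
```

If equals or hashCode were left at their Object defaults, two TradeKey instances describing the same trade would be treated as different keys, and their tasks could run concurrently.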

If you need an Executor, use KeySequentialExecutor instead of KeySequentialRunner. It accepts a Runnable that delegates its hashCode and equals methods to the key.

Executor executor = new KeySequentialExecutor(underlyingExecutor);

// KeyRunnable is a helper class delegating 'hashCode' and 'equals' to the key
Runnable runnable = new KeyRunnable<>(tradeIdA, task);

executor.execute(runnable);

underlyingExecutor.shutdown();

// at this point, tasks for new keys will be rejected
// however, tasks for keys being currently executed may still be accepted (and executed)

underlyingExecutor.awaitTermination(timeout, TimeUnit.SECONDS);

// if the executor terminates before a timeout, then it is guaranteed that all accepted
// tasks have been executed
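The shutdown sequence above relies on the standard ExecutorService methods shutdown and awaitTermination. As a minimal sketch using only the JDK, the same pattern can be wrapped in a helper that also handles the timeout and interruption cases. The class GracefulShutdownSketch and the method shutdownAndWait are hypothetical names, and the demo uses a plain thread pool rather than the library's executor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper around the standard ExecutorService shutdown calls.
class GracefulShutdownSketch {

    // Returns true if all accepted tasks finished before the timeout elapsed.
    static boolean shutdownAndWait(ExecutorService executor, long timeoutSeconds) {
        executor.shutdown(); // stop accepting new tasks, keep running queued ones
        try {
            if (executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                return true;
            }
            executor.shutdownNow(); // timed out: interrupt whatever is still running
            return false;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    // Runs five trivial tasks; returns how many completed, or -1 on timeout.
    static int demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < 5; i++) {
            pool.execute(done::incrementAndGet);
        }
        boolean terminated = shutdownAndWait(pool, 5);
        return terminated ? done.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A false return value means the timeout elapsed or the waiting thread was interrupted, in which case some accepted tasks may not have run.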

KeySequentialExecutor and KeySequentialRunner do not support back-pressure: the execute and run methods never block. Instead, submitted tasks are put into a queue, where they wait until the underlying executor runs them. In many cases this is not a problem, but if tasks arrive faster than they are processed, the number of waiting tasks keeps growing and the application may run out of memory. To restrict the number of queued tasks, consider using KeySequentialBoundedExecutor, which can be configured to block task submission when the number of tasks that have not yet been executed reaches the limit.

ExecutorService underlyingExecutor = Executors.newCachedThreadPool();
int maxTasks = 10;
KeySequentialBoundedExecutor boundedExecutor =
        new KeySequentialBoundedExecutor(maxTasks, BoundedStrategy.BLOCK, underlyingExecutor);

KeyRunnable<String> task = new KeyRunnable<>("my key", () -> {
    // do something
});

boundedExecutor.execute(task);
// execute more tasks ... at most 10 will be scheduled

// before shutting down you can call the 'drain' method,
// which blocks until all submitted tasks have been executed

// returns true if drained; false if the timeout elapses
boundedExecutor.drain(timeout, TimeUnit.SECONDS);

// newly submitted tasks will be rejected after calling 'drain'

// safe to call 'shutdownNow' if drained as there should be no active tasks
underlyingExecutor.shutdownNow();
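To illustrate the idea behind blocking submission, here is a minimal sketch of back-pressure built from a java.util.concurrent.Semaphore: a permit is taken when a task is submitted and returned when it finishes, so the caller blocks once the limit is reached. This is an assumption about how such a bound can be implemented in general, not a description of KeySequentialBoundedExecutor internals. The class BoundedSubmitSketch is hypothetical and does no per-key sequencing.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of blocking back-pressure; not the library's class.
class BoundedSubmitSketch implements Executor {

    private final Semaphore permits;
    private final Executor delegate;

    BoundedSubmitSketch(int maxTasks, Executor delegate) {
        this.permits = new Semaphore(maxTasks);
        this.delegate = delegate;
    }

    @Override
    public void execute(Runnable task) {
        permits.acquireUninterruptibly(); // blocks the caller while the limit is reached
        try {
            delegate.execute(() -> {
                try {
                    task.run();
                } finally {
                    permits.release(); // free the slot once the task has finished
                }
            });
        } catch (RuntimeException e) {
            permits.release(); // the delegate rejected the task, so give the slot back
            throw e;
        }
    }

    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Submits 10 tasks with a limit of 2; returns {max tasks in flight, tasks completed}.
    static int[] demo() {
        ExecutorService pool = Executors.newCachedThreadPool();
        BoundedSubmitSketch bounded = new BoundedSubmitSketch(2, pool);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < 10; i++) {
            bounded.execute(() -> {
                maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                pause(10);
                inFlight.decrementAndGet();
                completed.incrementAndGet();
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { maxInFlight.get(), completed.get() };
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("max in flight: " + result[0] + ", completed: " + result[1]);
    }
}
```

Even though the cached thread pool would happily start a thread per task, the semaphore keeps the number of unfinished tasks at or below the limit, at the cost of making the submitting thread wait.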

The source code of the examples can be found here.

A note on thread-safety: The library is thread-safe; i.e. methods run, execute or drain can be safely invoked from multiple threads without synchronization.

Maven Dependency

<dependency>
  <groupId>com.jano7</groupId>
  <artifactId>executor</artifactId>
  <version>2.0.2</version>
</dependency>


