Giter Site home page Giter Site logo

vsilaev / tascalate-concurrent Goto Github PK

View Code? Open in Web Editor NEW
197.0 14.0 29.0 903 KB

Implementation of blocking (IO-Bound) cancellable java.util.concurrent.CompletionStage and related extensions to java.util.concurrent.ExecutorService-s

License: Apache License 2.0

Java 100.00%
java java8 java-library completablefuture completionstage concurrency concurrent concurrent-programming executor executorservice

tascalate-concurrent's Introduction

Maven Central GitHub release license

tascalate-concurrent

The library provides an implementation of the CompletionStage interface and related classes these are designed to support long-running blocking tasks (typically, I/O bound). This functionality augments the sole Java 8 built-in implementation, CompletableFuture, that is primarily supports computational tasks. Also, the library helps with numerous asynchronous programing challenges like handling timeouts, retry/poll functionality, orchestrating results of multiple concurrent computations and similar.

Since the version 0.7.0 the library is shipped as a multi-release JAR and may be used both with Java 8 as a classpath library or with Java 9+ as a module.

IMPORTANT!

In the version 0.8.0 the artifact was renamed: New name:

<dependency>
    <groupId>net.tascalate</groupId>
    <artifactId>net.tascalate.concurrent</artifactId>
    <version>0.9.8</version> <!-- Any version above 0.8.0, the latest one is recommended -->
</dependency>

Old Name

<dependency>
    <groupId>net.tascalate.concurrent</groupId>
    <artifactId>net.tascalate.concurrent.lib</artifactId>
    <version>0.7.1</version>
</dependency>

Why a CompletableFuture is not enough?

There are several shortcomings associated with CompletableFuture implementation that complicate its usage for real-life asynchronous programming, especially when you have to work with I/O-bound interruptible tasks:

  1. CompletableFuture.cancel() method does not interrupt underlying thread; it merely puts future to exceptionally completed state. So even if you use any blocking calls inside functions passed to thenApplyAsync / thenAcceptAsync / etc - these functions will run till the end and never will be interrupted. Please see CompletableFuture can't be interrupted by Tomasz Nurkiewicz.
  2. By default, all *Async composition methods of CompletableFutrure use ForkJoinPool.commonPool() (see here) unless explicit Executor is specified. This thread pool is shared between all CompletableFuture-s and all parallel streams across all applications deployed on the same JVM. This hard-coded, unconfigurable thread pool is completely outside of application developers' control, hard to monitor and scale. Therefore, in robust real-life applications you should always specify your own Executor. With API enhancements in Java 9+, you can fix this drawback, but it will require some custom coding.
  3. Additionally, built-in Java 8 concurrency classes provides pretty inconvenient API to combine several CompletionStage-s. CompletableFuture.allOf / CompletableFuture.anyOf methods accept only CompletableFuture as arguments; you have no mechanism to combine arbitrary CompletionStage-s without converting them to CompletableFuture first. Also, the return type of the aforementioned CompletableFuture.allOf is declared as CompletableFuture<Void> - hence you are unable to extract conveniently individual results of the each future supplied. CompletableFuture.anyOf is even worse in this regard; for more details please read on here: CompletableFuture in Action (see Shortcomings) by Tomasz Nurkiewicz.
  4. Support for timeouts/delays was introduced to CompletableFuture only in Java 9, so still widely supported applications running on Java 8 are left out without this important functionality. Plus, some design decisions like using delayed executors instead of 'delay' operator, are somewhat questionable.

There are numerous free open-source libraries that address some of the aforementioned shortcomings. However, none of them provides implementation of interruptible CompletionStage and no one solves all of the issues coherently.

How to use?

To use a library you have to add a single Maven dependency

<dependency>
    <groupId>net.tascalate</groupId>
    <artifactId>net.tascalate.concurrent</artifactId>
    <version>0.9.8</version>
</dependency>

What is inside?

1. Promise interface

This is the core interface of the Tascalate Concurrent library. It may be best described by the formula:

Promise == CompletionStage + Future

I.e., it combines both blocking Future’s API, including cancel(boolean mayInterruptIfRunning) method, AND composition capabilities of CompletionStage’s API. Importantly, all composition methods of CompletionStage API (thenAccept, thenCombine, whenComplete etc.) are re-declared to return Promise as well.

The decision to introduce an interface that merges CompletionStage and Future is aligned with the design of CompletableFuture API. In addition, several useful methods of CompletableFuture API are added as well:

T getNow(T valueIfAbsent) throws CancellationException, CompletionException;
T getNow(Supplier<? extends T> valueIfAbsent) throws CancellationException, CompletionException;
T join() throws CancellationException, CompletionException;

So it should be pretty straightforward to use the Promise as a drop-in replacement for the CompletableFuture in many cases.

Although the library is compatible with JDK 1.8 there are methods in Promise interface that makes it "upfront compatible" with the higher JDK versions (available since version 0.8.2):

Promise<T> exceptionallyAsync(Function<Throwable, ? extends T> fn);
Promise<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor);
Promise<T> exceptionallyCompose(Function<Throwable, ? extends CompletionStage<T>> fn);
Promise<T> exceptionallyComposeAsync(Function<Throwable, ? extends CompletionStage<T>> fn);
Promise<T> exceptionallyComposeAsync(Function<Throwable, ? extends CompletionStage<T>> fn, 
                                     Executor executor);

Plus, there are several overloads of the filtering operator that complements canonical map/flatMap (or thenApply/thenCompose in terms of the CompletionStage API), available since version 0.9.0:

Promise<T> thenFilter(Predicate<? super T> predicate);
Promise<T> thenFilter(Predicate<? super T> predicate, Function<? super T, Throwable> errorSupplier);
Promise<T> thenFilterAsync(Predicate<? super T> predicate);
Promise<T> thenFilterAsync(Predicate<? super T> predicate, Function<? super T, Throwable> errorSupplier);
Promise<T> thenFilterAsync(Predicate<? super T> predicate, Executor executor);
Promise<T> thenFilterAsync(Predicate<? super T> predicate, Function<? super T, Throwable> errorSupplier,
                           Executor executor);

Besides this, there are numerous operators in the Promise API to work with timeouts and delays, to override default asynchronous executor and similar. All of them will be discussed later.

When discussing Promise interface, it's mandatory to mention the accompanying class Promises that provides several useful methods to adapt third-party CompletionStage (including the standard CompletableFuture) to the Promise API. First, there are two unit operations to create successfully/faulty settled Promise-es:

public final class Promises {
  ...
  static <T> Promise<T> success(T value)
  static <T> Promise<T> failure(Throwable exception)
  ...
}

Actually, release 0.9.0 adds the third unit operation:

public final class Promises {
  ...
  static <T> Promise<T> maybe(Optional<T> maybeValue)
  ....
}

It converts the Optional supplied either to the successfully settled Promise or to the faulty settled one with NoSuchElementException.

Second, there is an adapter method from:

public final class Promises {
  ...
  static <T> Promise<T> from(CompletionStage<T> stage)
  ...
}

It behaves as the following:

  1. If the supplied stage is already a Promise then it is returned unchanged
  2. If stage is a CompletableFuture then a specially-tailored wrapper is returned.
  3. If stage additionally implements Future then specialized wrapper is returned that delegates all the blocking methods defined in Future API
  4. Otherwise generic wrapper is created with good-enough implementation of blocking Future API atop of asynchronous CompletionStage API.

To summarize, the returned wrapper delegates as much as possible functionality to the supplied stage and never resorts to CompletionStage.toCompletableFuture because in Java 8 API it's an optional method. From documentation: "A CompletionStage implementation that does not choose to interoperate with others may throw UnsupportedOperationException." (this text was dropped in Java 9+). In general, Tascalate Concurrent library does not depend on this method and should be interoperable with any minimal (but valid) CompletionStage implementation.

It's important to emphasize, that Promise-s returned from Promises.success, Promises.failure and Promises.from methods are cancellable in the same way as CompletableFuture, but are not interruptible in general, while interruption depends on a concrete implementation. Next we discuss the concrete implementation of an interruptible Promise provided by the Tascalate Concurrent library -- the CompletableTask class.

2. CompletableTask

This is why this project was ever started. CompletableTask is the implementation of the Promise API for long-running blocking tasks. Typically, to create a CompletableTask, you should submit Supplier / Runnable to the Executor right away, in a similar way as with CompletableFuture:

Promise<SomeValue> p1 = CompletableTask.supplyAsync(() -> {
  return blockingCalculationOfSomeValue();
}, myExecutor);

Promise<Void> p2 = CompletableTask.runAsync(this::someIoBoundMethod, myExecutor);

blockingCalculationOfSomeValue and someIoBoundMethod in the example above can have I/O code, work with blocking queues, do blocking get on regular Java-s Future-s and alike. If at later time you decide to cancel either of the returned promises then corresponding blockingCalculationOfSomeValue and someIoBoundMethod will be interrupted (if not completed yet).

In the realm of I/O-related functionality, failures like connection time-outs, missing or locked files are pretty common, and checked exceptions mechanism is used frequently to signal failures. Therefore the library provides an entry point to the API that accepts Callable instead of Supplier:

// Notice the checked exception in the method signature
byte[] loadFile(File file) throws IOException {
    byte[] result = ... //load file content;
    return result;
}
...
ExecutorService executorService = Executors.newFixedThreadPool(6);
Promise<byte[]> contentPromise = CompletableTask.submit(
    () -> loadFile(new File("./myfile.dat")), 
    executorService
); 

Additionally, there are 2 unit operations to create a CompletableTask:

a. CompletableTask.asyncOn(Executor executor) Returns an already-completed null-valued Promise that is "bound" to the specified executor. I.e. any function passed to asynchronous composition methods of Promise (like thenApplyAsync / thenAcceptAsync / whenCompleteAsync etc.) will be executed using this executor unless executor is overridden via explicit composition method parameter. Moreover, any nested composition calls will use same executor, if it’s not redefined via explicit composition method parameter:

CompletableTask
  .asyncOn(myExecutor)
  .thenApplyAsync(myValueGenerator)
  .thenAcceptAsync(myConsumer)
  .thenRunAsync(myAction);

All of myValueGenerator, myConsumer, myAction will be executed using myExecutor.

b. CompletableTask.completed(T value, Executor executor) Same as above, but the starting point is a Promise completed with the specified value:

CompletableTask
   .completed("Hello!", myExecutor)
   .thenApplyAsync(myMapper)
   .thenApplyAsync(myTransformer)   
   .thenAcceptAsync(myConsumer)
   .thenRunAsync(myAction);

All of myMapper, myTransformer, myConsumer, myAction will be executed using myExecutor. WARNING Before release 0.9.0 this method was named CompletableTask.complete(...)

Crucially, all composed promises support true cancellation (incl. interrupting thread) for the functions supplied as arguments:

Promise<?> p1 = CompletableTask.asyncOn(myExecutor)
Promise<?> p2 = p1.thenApplyAsync(myValueGenerator)
Promise<?> p3 = p2.thenRunAsync(myAction);
  
...
p2.cancel(true);

In the example above myValueGenerator will be interrupted if already in progress. Both p2 and p3 will be settled with failure: p2 with a CancellationException and p3 with a CompletionException.

You may notice, that above the term "asynchronous composition methods" is used, as well as *Async calls in examples (like thenApplyAsync, thenRunAsync. This is not accidental: non-asynchronous methods of CompletionStage API are not interruptible. The grounding beneath the design decision is that invoking asynchronous methods involves inevitable overhead of putting command to the queue of the executor, starting new threads implicitly, etc. And for simple, non-blocking methods, like small calculations, trivial transformations and alike this overhead might outweigh method's execution time itself. So the guideline is: use asynchronous composition methods for heavy I/O-bound blocking tasks, and use non-asynchronous composition methods for (typically lightweight) calculations.

Worth to mention, that CompletableTask-s and Promise-s composed out of it may be ever interruptible only if the Executor used is interruptible by nature. For example, ThreadPoolExecutor supports interruptible tasks, but ForkJoinPool does not!

3. DependentPromise

As it mentioned above, once you cancel Promise, all Promise-s that depends on this promise are completed with CompletionException wrapping CancellationException. This is a standard behavior, and CompletableFuture works just like this.

However, when you cancel derived Promise, the original Promise is not cancelled:

Promise<?> original = CompletableTask.supplyAsync(() -> someIoBoundMethod(), myExecutor);
Promise<?> derivedA = original.thenRunAsync(() -> someMethodA() );
Promise<?> derivedB = original.thenRunAsync(() -> someMethodB() );
...
derivedB.cancel(true);

So if you cancel derivedB above it's Runnable method, wrapping someMethod, is interrupted. However the original promise is not cancelled and someIoBoundMethod keeps running. Also, derivedA is not cancelled, and such behavior is expected. However, sometimes we have a linear chain of the promises and have a requirement to cancel the whole chain from a tail to the head. Consider the following method:

public Promise<DataStructure> loadData(String url) {
   return CompletableTask.supplyAsync( () -> loadXml(url) ).thenApplyAsync( xml -> parseXml(xml) ); 
}

...
Promise<DataStructure> p = loadData("http://someserver.com/rest/ds");
...
if (someCondition()) {
  // Only second promise is canceled, parseXml.
  p.cancel(true);
}

Clients of this method see only derived promise, and once they decide to cancel it, it is expected that any of loadXml and parseXml will be interrupted if not completed yet. To address this issue the library provides DependentPromise class:

public Promise<DataStructure> loadData(String url) {
   return DependentPromise
          .from(CompletableTask.supplyAsync( () -> loadXml(url) ))
          .thenApplyAsync( xml -> parseXml(xml), true ); 
}

...
Promise<DataStructure> p = loadData("http://someserver.com/rest/ds");
...
if (someCondition()) {
  // Now the whole chain is canceled.
  p.cancel(true);
}

DependentPromise overloads methods like thenApply / thenRun / thenAccept / thenCombine etc with additional argument:

  • if method accepts no other CompletionStage, like thenApply / thenRun / thenAccept etc, then it's a boolean flag enlistOrigin to specify whether or not the original Promise should be enlisted for the cancellation.
  • if method accepts other CompletionStage, like thenCombine / applyToEither / thenAcceptBoth etc, then it's a set of PromiseOrigin enum values, that specifies whether or not the original Promise and/or a CompletionStage supplied as argument should be enlisted for the cancellation along with the resulting promise, for example:
public Promise<DataStructure> loadData(String url) {
   return DependentPromise
          .from(CompletableTask.supplyAsync( () -> loadXml(url + "/source1") ))
          .thenCombine( 
              CompletableTask.supplyAsync( () -> loadXml(url + "/source2") ), 
              (xml1, xml2) -> Arrays.asList(xml1, xml2),
              PromiseOrigin.ALL
          )          .
          .thenApplyAsync( xmls -> parseXmlsList(xmls), true ); 
}

Please note, then in release 0.5.4 there is a new default method dependent in interface Promise that serves the same purpose and allows to write chained calls:

public Promise<DataStructure> loadData(String url) {
   return CompletableTask
          .supplyAsync( () -> loadXml(url) )
          .dependent()
          .thenApplyAsync( xml -> parseXml(xml), true ); 
}

Attention: just calling dependent() on the Promise is not enough to change the behavior of the "default" thenApply / thenRun / thenAccept / thenCombine etc methods defined in CompletionStage - you have to use overloaded form with either boolean or Set<PromiseOrigin> last parameter explicitly. This is the intentional design decision: just recall, that the Promise returned in the example above can be used by the client code to create independent promises (as per CompletionStage API), so the library has to follow the rule of the least surprise.

Below are 2 examples (wrong and right) to re-iterate the statement above in Java code.

Wrong code to cancel the whole chain:

Promise<Xml> xml = CompletableTask.supplyAsync( () -> loadXml(url) )
Promise<Data> data = xml.dependent()
                        .thenApplyAsync( xml -> parseXml(xml) );  // Not overloaded method - 
                                                                  // no "true" as the last parameter
...
// This cancels only "data" promise, but will not cancel "xml" promise
data.cancel(true);

Right code to cancel the whole chain:

Promise<Xml> xml = CompletableTask.supplyAsync( () -> loadXml(url) )
Promise<Data> data = xml.dependent()
                        .thenApplyAsync( xml -> parseXml(xml), true );  // Overloaded method - 
                                                                        // "true" as last parameter means 
                                                                        // enlist "origin" promise 
                                                                        // for cancellation
...
// This cancels both "data" promise AND the "xml" promise (if it's not completed yet)
data.cancel(true);

So far so good. And following CompletionStage API contract is good, no doubt. But what if you have to create locally a chain of 10 stages? 30 stages? It's both pretty boring and error-prone to put additional parameters to enforce overloaded method(s) usage. And error-prone means just this -- it's pretty easy to forget to type "true" as the latest parameter and break the cancellation chain. Tascalate Concurrent provides an option to handle this edge case: use overloaded form of dependent() method - dependent(Set<PromiseOrigin>). Here is an example from the real-life issue raised in the real-life project:

Promise<?> myPromise = 
CompletableTask.supplyAsync(() -> candidate, pool)
               .dependent(PromiseOrigin.ALL) //Key point - alter the behavior of CompletionStage API methds
               .thenApplyAsync(CheckIp::doWork)
               .thenApplyAsync(CheckType::doWork)
               .thenApplyAsync(CheckExclusion::doWork)
               .thenApplyAsync(AssignEntity::doWork)
               .thenCombineAsync(collectedStatsPromise, this:combineWithStats) // <-- pay attention here
               .thenApplyAsync(DecideWmi::doWork)
               .thenApplyAsync(ObtainWmiConnection::doWork)
               .thenApplyAsync(ObtainRegistryWmiConnection::doWork)
               .thenApplyAsync(RunCompliance::doWork)
               .thenApplyAsync(DecideComplianceResult::doWork)
               .thenApplyAsync(CreateAlert::doWork)
               .thenApplyAsync(ApplyFailureManager::doWork)
               .thenApplyAsync(ApplyDisconnectManager::doWork)
               .thenApplyAsync(EnforceCompliance::doWork)
               .exceptionally(ExceptionHandlerService::handle);

Unlike the conservative form of the dependent() decorator, the overloaded dependent(PromiseOrigin.ALL) will enlist ALL the promises mentioned by composition methods as dependent and will cancel them when myPromise is cancelled. Even the collectedStatsPromise that is passed from outside! Everything enlisted will form a single block of cancellation. By the way, if you don't need to cancel collectedStatsPromise above use dependent(PromiseOrigin.THIS_ONLY) decorator - it doesn't enlist "side-passed" promises.

Well, but what about obeying CompletionStage contract? Is it still safe to return myPromise to clients that are unaware of its overwritten behavior? It's not. You should revert the effect of the dependent(PromiseOrigin.ALL) decorator before passing myPromise outside:

public Promise<?> forkWmiProcessingChain(Promise<Stats collectedStatsPromise) {
  Promise<?> myPromise = 
  CompletableTask.supplyAsync(() -> candidate, pool)
                 .dependent(PromiseOrigin.ALL) 
                 .thenApplyAsync(CheckIp::doWork)
                 .thenApplyAsync(CheckType::doWork)
                 ...
                 .exceptionally(ExceptionHandlerService::handle);
  return myPromise.unwrap(); // or myPromise.raw() in this case               
}

The final operator in the function above - unwrap() removes the latest decorator applied, the dependent(...). Looking ahead, I should mention that raw() will remove all the decorators applied. Yes, there are several possible - and the dependent(...) is just one of them. In the next chapters we discuss defaultAsyncOn decorator and later you will see custom decorators (the ones not directly bound to the Promise API as methods).

4. Wait! Wait! It's not cancellable!!!

One of the most common complains in the issue tracker of Tascalate Concurrent project is a variation of the report "code is not cancelled when requested". The symptoms are typical: Promise is cancelled, any dependent Promise-s are completed exceptionally (as expected) but threads keep running till the end of the code submitted to an executor. So the primary promise of the library is broken: execution is not interrupted.

Is library broken? No. Is user code correct? Well, yes. But library code and user code do not cooperate well to really cancel asynchronous execution.

The promise returned from CompletableTask methods like supplyAsync / runAsync as well as any derived Promise (via thenApply or thenRun) does the following two things:

  1. Set own state to cancelled and let transitions to any dependent promises run (same as CompletableFuture)
  2. Interrupt the current Thread via Thread.currentThread().interrupt() (unlike CompletableFuture)

And it's responsibility of the user code - all of these Runnable, Consumer<T> and Function<T,R> passed - to react on the Thread.currentThread().isInterrupted() flag. Let us consider typical scenarios.

  1. User code is a computational code and do not use I/O.

In this case you have to put explicit checks for Thread.currentThread().isInterrupted() to break execution via either breaking loops, early returns or throwing exception:

outer:
for (int i = 0; i < SOME_LARGE_SIZE; i++) {
  for (int j = 0; j < SOME_EVEN_LARGER_SIZE; j++) {
    if (Thread.currentThread().isInterrupted()) {
      break outer;
    }
    // some calcs here
  }
}

It's up to library user to select how often to check the flag: there are always trade-offs between overhead for regular execution time (when execution is not cancelled) and the delay before thread react on cancellation. In the example above the check may be moved from the inner loop to the outer loop - this will speed-up normal execution path but the code will react on the cancellation with delay.

Some API provides good built-in points to check for early exit, the good example is java.nio.file.FileVisitor + java.nio.file.Files walkFileTree. Here you can just put checks in previsitDirectory / visitFile:

@Test
@SuppressWarnings("unchecked")
public void testReallife() throws Exception {
  ElapsedTime time = new ElapsedTime();
  Executor executor = Executors.getDefaultExecutor();
  Promise<Void> promise = CompletableTask.runAsync(() -> {
    try {
      Files.walkFileTree(Paths.get("d:/"), new FileVisitor<>() {
        public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
          System.out.printf("[%d ms] [%s] Task#%d: visiting dir %s%n", 
                            time.current(), Thread.currentThread(), id, dir);
          if (Thread.currentThread().isInterrupted()) {
            // STOP when interrupted
            return FileVisitResult.TERMINATE;
          } else {
            return FileVisitResult.CONTINUE;
          }
        }

      public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {...}
      public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {...}
      public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {...}
  }, executor);
  Thread.sleep(3000);
  System.out.printf("[%d ms] canceling%n", time.current());
  promise.cancel(true);
  assertCanceled(promise);
}
  1. You are using old blocking API of java.io.InputStream or java.net.URLConnection Probably, your code uses old blocking streams from java.io like java.io.FileInputStream or SocketInputStream opened from java.net.Socket. Or you are using java.net.URLConnection. Or outdated blocking Apache HTTP Client. These all prevents running thread from termination once interrupted -- all of these API-s are blocking and do not react on interruptions. The options are:
  • If you are using some third-party synchronous networking library then try to replace it with the asynchronous version. For HTTP protocol there is a plenty of options like AsyncHttpClient and OkHttp. Both returns CompletableFuture as operation results and therefore could be easily plugged with the rest of Tascalate Concurrent code.
  • If you are using blocking file API try to replace it with NIO version, it's the best option:
try (InputStream in = new FileInputStream("E:/Downloads/LargeFile.rar")) {
... // Work with input stream
}

==>
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
...
try (InputStream in = 
  Channels.newInputStream(FileChannel.open(Paths.get("E:/Downloads/LargeFile.rar"), 
                                           StandardOpenOption.READ)))) {
... // Work with input stream
} 
// Same could be done for sockets
  • If don't like to use new NIO API, but sources with blocking IO may be altered, then BlockingIO class provided by Tascalate Concurrent can help:
Promise<?> promise = CompletableTask.supplyAsync(
  BlockingIO.interruptible( // Wrap Supplier into interruptible Supplier
    () -> {
      // Register InputStream after creation as interruption target
      try (InputStream in = BlockingIO.register(new FileInputStream("E:/Downloads/LargeFile.rar"))) {
        ... // Work with input stream
      } catch (IOException ex) {
        throw new RuntimeException(ex);
      } 
    }
  )
);
...
promise.cancel(true);

BlockingIO.interruptible(...) can wrap as interruptible all the functional interfaces used in CompletionStage / Promise API. The parameter to the BlockingIO.register should be any implementation of java.lang.Closeable that will be closed on thread interruption. For InputStream or OutputStream it is just the stream itself. For java.net.HttpUrlConnection you can use

HttpURLConnection conn = (HttpURLConnection)(new URL(url).openConnection());
BlockingIO.register(conn::disconnect);

BlockingIO is a late addition to the Tascalate Concurrent, it's available only since version 0.9.5.

Unfortunately, it's necessary to put this statement straight at the end: if the blocking API is hidden behind third-party library code (like Spring RestTemplate) then you are out of luck - Tascalate Concurrent will not support true interruptions in this scenario.

5. Overriding default asynchronous executor

One of the pitfalls of the CompletableFuture implementation is how it works with default asynchronous executor. Consider the following example:

CompletionStage<String> p1 = CompletableFuture.supplyAsync(this::produceValue, executorInitial);
CompletionStage<String> p2 = p1.thenApplyAsync(this::transformValueA);
CompletionStage<String> p3 = p2.thenApplyAsync(this::transformValueB, executorNext);
CompletionStage<String> p4 = p3.thenApplyAsync(this::transformValueC);

The call to produceValue will be executed on the executorInitial - it is passed explicitly. However, the call to transformValueA will be excuted on... ForkJoinPool.commonPool()! Hmmmm... Probably this makes sense, but how to force using alternative executor by default? No way! Probably this is possible with deeper calls? The answer is "NO" again! The invocation to transformValueB ran on explicitly supplied executorNext. But next call, transformValueC will be executed on... you guess it... ForkJoinPool.commonPool()!

So, once you use CompletableFuture with JEE environment you must pass explicit instance of ManagedExecutorService to each and every method call. Not very convenient! To be fair, with Java 9+ API you can redefine this behavior via sub-classing CompletableFuture and overriding two methods: defaultExecutor and newIncompleteFuture. Plus, you will have to define your own "entry points" instead of the standard CompletableFuture.runAsync and CompletableFuture.supplyAsync.

With CompletableTask the situation is just the opposite. Let us rewrite the example above:

CompletionStage<String> p1 = CompletableTask.supplyAsync(this::produceValue, executorInitial);
CompletionStage<String> p2 = p1.thenApplyAsync(this::transformValueA);
CompletionStage<String> p3 = p2.thenApplyAsync(this::transformValueB, executorNext);
CompletionStage<String> p4 = p3.thenApplyAsync(this::transformValueC);

The call to produceValue will be executed on the executorInitial, obviously. But now, the call to transformValueA will be executed also on executorInitial! What's about deeper calls? The invocation to transformValueB ran on explicitly supplied executorNext. And next call, transformValueC will be executed on... check your intuition... executorNext. The logic behinds this is the following: the latest explicitly specified Executor is what will be used for all nested asynchronous composition methods without an explicit Executor parameter.

Obviously, it's rarely the case when one size fits all. Since release 0.6.6 there are two new options to specify default asynchronous executor:

  1. CompletableTask has an overloaded method:
public static Promise<Void> asyncOn(Executor executor, boolean enforceDefaultAsync)

When enforceDefaultAsync is true then all nested asynchronous composition methods without explicit Executor parameter will use the provided executor, even if previous composition methods use alternative Executor. This is somewhat similar to CompletableFuture but with the ability to explicitly set the default asynchronous executor initially.

  1. Promise interface has the following operation:
Promise<T> defaultAsyncOn(Executor executor)

The returned decorator will use the specified executor for all nested asynchronous composition methods without explicit Executor parameter. So, at any point, you are able to switch to the desired default asynchronous executor and keep using it for all nested composition call.

To summarize, with Tascalate Concurrent you have the following options to control what is the default asynchronous executor:

  1. The latest explicit Executor passed to *Async method is used for derived Promise-s - the default option.
  2. Single default Executor passed to the root CompletableTask.asyncOn(Executor executor, true) call is propagated through the whole chain. This is the only variant supported with CompletableFuture in Java 9+, though, with custom coding.
  3. Redefine Executor with defaultAsyncOn(Executor executor) for all derived Promise-s. Having the best of three(!) worlds, the only responsibility of the library user is to use these options consistently!

The last thing that should be mentioned is a typical task when you would like to start interruptible blocking method after completion of the standard CompletableFuture. The following utility method is defined in the CompletableTask:

public static <T> Promise<T> waitFor(CompletionStage<T> stage, Executor executor)

Roughly, this is a shortcut for the following:

CompletableTask.asyncOn(executor).thenCombine(stage, (u, v) -> v);

A typical usage of this method is:

TaskExecutorService executorService = TaskExecutors.newFixedThreadPool(3);
CompletableFuture<String> replyUrlPromise = sendRequestAsync();
Promise<byte[]> dataPromise = CompletableTask.waitFor(replyUrlPromise, executorService)
    .thenApplyAsync(url -> loadDataInterruptibly(url));

The dataPromise returned may be cancelled later and loadDataInterruptibly will be interrupted if not completed by that time.

6. Timeouts

Any robust application requires certain level of functionality, that handles situations when things go wrong. An ability to cancel a hanged operation existed in the library from the day one, but, obviously, it is not enough. Cancellation per se defines "what" to do in face of the problem, but the responsibility "when" to do was left to an application code. Starting from release 0.5.4 the library fills the gap in this functionality with timeout-related stuff.

An application developer now has the following options to control execution time of the Promise (declared in Promise interface itself):

<T> Promise<T> orTimeout(long timeout, TimeUnit unit[, boolean cancelOnTimeout = true])
<T> Promise<T> orTimeout(Duration duration[, boolean cancelOnTimeout = true])

These methods creates a new Promise that is either settled successfully/exceptionally when original promise is completed within a timeout given; or it is settled exceptionally with a TimeoutException when time expired. In any case, handling code is executed on the default asynchronous Executor of the original Promise.

Executor myExecutor = ...; // Get an executor
Promise<String> callPromise = CompletableTask
    .supplyAsync( () -> someLongRunningIoBoundMehtod(), myExecutor )
    .orTimeout( Duration.ofSeconds(3) );
    
Promise<?> nextPromiseSync = callPromise.whenComplete((v, e) -> processResultSync(v, e));
Promise<?> nextPromiseAsync = callPromise.whenCompleteAsync((v,e) -> processResultAsync(v, e));

In the example above callPromise will be settled within 3 seconds either successfully/exceptionally as a result of the someLongRunningIoBoundMehtod execution, or exceptionally with a TimeoutException.

It's worth to mention, that both processResultSync and processResultAsync will be executed with myExecutor, if timeout is triggered - this rule is true for all timeout-related methods.

The optional cancelOnTimeout parameter defines whether or not to cancel the original Promise when time is expired; it is implicitly true when omitted. So in example above the someLongRunningIoBoundMehtod will be interrupted if it takes more than 3 seconds to complete. Pay attention: any Promise is cancellable on timeout, even wrappers created via Promises.from(stage), but only CompletableTask is interruptible!

Cancelling original promise on timeout is a desired behavior in most cases but not always. In reality, "Warn-first-Cancel-next" scenarios are not rare, where "warn" may be logging, sending notification emails, showing messages to user on UI etc. The library provides an option to set several non-cancelling timeouts like in the example below:

Executor myExecutor = ...; // Get an executor
Promise<String> resultPromise = CompletableTask
    .supplyAsync( () -> someLongRunningIoBoundMehtod(), executor );

// Show UI message to user to let him/her know that everything is under control
Promise<?> t1 = resultPromise
    .orTimeout( Duration.ofSeconds(2), false )
    .exceptionally( e -> {
      if (e instanceof TimeoutException) {
        UI.showMessage("The operation takes longer than expected, please wait...");
      }
      return null;
    }, false); 

// Show UI confirmation to user to let him/her cancel operation explicitly
Promise<?> t2 = resultPromise
    .orTimeout( Duration.ofSeconds(5), false )
    .exceptionally( e -> {
      if (e instanceof TimeoutException) {
        UI.clearMessages();
        UI.showConfirmation("Service does not respond. Do you whant to cancel (Y/N)?");
      }
      return null;
    }, false); 

// Cancel in 10 seconds
resultPromise.orTimeout( Duration.ofSeconds(10), true );

Please note that the timeout is started from the call to the orTimeout method. Hence, if you have a chain of unresolved promises ending with the orTimeout call then the whole chain should be completed within the time given:

Executor myExecutor = ...; // Get an executor
Promise<String> parallelPromise = CompletableTask
    .supplyAsync( () -> someLongRunningDbCall(), executor );
Promise<List<String>> resultPromise = CompletableTask
    .supplyAsync( () -> someLongRunningIoBoundMehtod(), executor )
    .thenApplyAsync( v -> converterMethod(v) )
    .thenCombineAsync(parallelPromise, (u, v) -> Arrays.asList(u, v))
    .orTimeout( Duration.ofSeconds(5) );

In the latest example resultPromise will be resolved successfully if and only if all of someLongRunningIoBoundMehtod, converterMethod and even someLongRunningDbCall are completed within 5 seconds. If it's necessary to restrict execution time of the single step, please use standard CompletionStage.thenCompose method. Say, that in the previous example we have to restrict execution time of the converterMethod only. Then the modified chain will look like:

Promise<List<String>> resultPromise = CompletableTask
    .supplyAsync( () -> someLongRunningIoBoundMehtod(), executor )
    // Restict only execution time of converterMethod
    // -- start of changes
    .thenCompose( v -> 
        CompletableTask.completed(v, executor)
                       .thenApplyAsync(vv -> converterMethod(vv))
                       .orTimeout( Duration.ofSeconds(5) )
    )
    // -- end of changes
    .thenCombineAsync(parallelPromise, (u, v) -> Arrays.asList(u, v))
    ;

Moreover, in the original example only the call to the thenCombineAsync will be cancelled on timeout (the last in the chain), to cancel the whole chain please use the functionality of the DependentPromise interface (will be discussed later):

Executor myExecutor = ...; // Get an executor
Promise<String> parallelPromise = CompletableTask
    .supplyAsync( () -> someLongRunningDbCall(), executor );
Promise<List<String>> resultPromise = CompletableTask
    .supplyAsync( () -> someLongRunningIoBoundMehtod(), executor )
    .dependent()
    // enlist promise of someLongRunningIoBoundMehtod for cancellation
    .thenApplyAsync( v -> converterMethod(), true )  
    // enlist result of thenApplyAsync and parallelPromise for cancellation
    .thenCombineAsync(parallelPromise, (u, v) -> Arrays.asList(u, v), PromiseOrigin.ALL)
    .orTimeout( Duration.ofSeconds(5) ); // now timeout will cancel the whole chain

Another useful timeout-related methods declared in Promise interface are:

<T> Promise<T> onTimeout(T value, long timeout, TimeUnit unit[, boolean cancelOnTimeout = true])
<T> Promise<T> onTimeout(T value, Duration duration[, boolean cancelOnTimeout = true])
<T> Promise<T> onTimeout(Supplier<? extends T>, long timeout, TimeUnit unit[, boolean cancelOnTimeout = true])
<T> Promise<T> onTimeout(Supplier<? extends T>, Duration duration[, boolean cancelOnTimeout = true])

The onTimeout family of methods are similar in all regards to the orTimeout methods with the single obvious difference - instead of completing resulting Promise exceptionally with the TimeoutException when time is expired, they are settled successfully with the alternative value supplied (either directly or via Supplier):

Executor myExecutor = ...; // Get an executor
Promise<String> callPromise = CompletableTask
    .supplyAsync( () -> someLongRunningIoBoundMehtod(), executor )
    .onTimeout( "Timed-out!", Duration.ofSeconds(3) );

The example shows, that callPromise will be settled within 3 seconds either successfully/exceptionally as a result of the someLongRunningIoBoundMehtod execution, or with a default value "Timed-out!" when time exceeded.

It's important to mention the crucial difference between Promise.orTimeot / onTimeout and CompletableFuture.orTimeout / completeOnTimeout in Java 9+. In Tascalate Concurrent both operations return a new Promise, that is may be canceled individually, without cancelling the original Promise. Moreover, the original Promise will not be completed with TimeoutException when time expired but rather with the CancellationException (in the case of orTimeout([duration], true) or orTimeout([duration])). The behavior of CompletableFuture in Java 9+ is radically different: timeout-related operations are just "side-effects", and the returned value is the original CompletableFuture itself. So the call to completableFuture.orTimeout(100, TimeUnit.MILLIS).cancel() will cancel the completableFuture itself, and there is no way to revert the timeout once it's set. Correspondingly, when time expired the original completableFuture will be completed exceptionally with TimeoutException.

Finally, the Promise interface provides an option to insert delays into the call chain:

<T> Promise<T> delay(long timeout, TimeUnit unit[, boolean delayOnError = true])
<T> Promise<T> delay(Duration duration[, boolean delayOnError = true])

The delay is started only after the original Promise is completed either successfully or exceptionally (unlike orTimeout / onTimeout methods where timeout is started immediately). The resulting delay Promise is resolved after the timeout specified with the same result as the original Promise. The latest methods' argument - delayOnError - specifies whether or not we should delay if original Promise is resolved exceptionally, by default this argument is true. If false, then delay Promise is completed immediately after the failed original Promise.

Executor myExecutor = ...; // Get an executor
Promise<String> callPromise1 = CompletableTask
    .supplyAsync( () -> someLongRunningIoBoundMehtod(), executor )
    .delay( Duration.ofSeconds(1) ) // Give a second for CPU to calm down :)
    .thenApply(v -> convertValue(v));
    
Promise<String> callPromise2 = CompletableTask
    .supplyAsync( () -> aletrnativeLongRunningIoBoundMehtod(), executor )
    .delay( Duration.ofSeconds(1), false ) // Give a second for CPU to calm down ONLY on success :)
    .thenApply(v -> convertValue(v));

Like with other timeout-related methods, convertValue is invoked on the default asynchronous Executor of the original Promise.

You may notice, that delay may be introduced only in the middle of the chain, but what to do if you'd like to back-off the whole chain execution? Just start with a resolved promise!

// Option 1
// Interruptible tasks chain on the executor supplied
CompletableTask.asyncOn(executor)
    .delay( Duration.ofSeconds(5) )
    .thenApplyAsync(ignore -> produceValue());

// Option2
// Computational tasks on ForkJoinPool.commonPool()
Promises.from(CompletableFuture.completedFuture(""))
    .delay( Duration.ofSeconds(5) )
    .thenApplyAsync(ignore -> produceValue());

As long as back-off execution is not a very rare case, the library provides the following convenient shortcuts in the CompletableTask class:

static Promise<Duration> delay(long timeout, TimeUnit unit, Executor executor);
static Promise<Duration> delay(Duration duration, Executor executor);

Notice, that in Java 9+ a different approach is chosen to implement delays - there is no corresponding operation defined for the CompletableFuture object and you should use delayed Executor. Please read documentation on the CompletableFuture.delayedExecutor method for details.

7. Combining several CompletionStage-s.

The utility class Promises provides a rich set of methods to combine several CompletionStage-s, that lefts limited functionality of CompletableFuter.allOf / anyOf far behind:

  1. The library works with any CompletionStage implementation without resorting to converting arguments to CompletableFuture first (and CompletionStage.toCompletableFuture is an optional operation, at least it's documented so in Java 8).
  2. It's possible to pass either an array or a List of CompletionStage-s as arguments.
  3. The resulting Promise let access individual results of the settled CompletionStage-s passed.
  4. There is an option to cancel non-settled CompletionStage-s passed once the result of the operation is known.
  5. Optionally you can specify whether or not to tolerate individual failures as long as they don't affect the final result.
  6. General M completed successfully out of N passed promises scenario is possible.

Let us review the relevant methods, from the simplest ones to the most advance.

static <T> Promise<List<T>> all([boolean cancelRemaining=true,] CompletionStage<? extends T>... promises)
static <T> Promise<List<T>> all([boolean cancelRemaining=true,] 
                                List<? extends CompletionStage<? extends T>> promises)

Returns a promise that is completed normally when all CompletionStage-s passed as parameters are completed normally; if any promise completed exceptionally, then resulting promise is completed exceptionally as well.

static <T> Promise<T> any([boolean cancelRemaining=true,] CompletionStage<? extends T>... promises)
static <T> Promise<T> any([boolean cancelRemaining=true,] 
                          List<? extends CompletionStage<? extends T>> promises)

Returns a promise that is completed normally when any CompletionStage passed as parameters is completed normally (race is possible); if all promises completed exceptionally, then resulting promise is completed exceptionally as well.

static <T> Promise<T> anyStrict([boolean cancelRemaining=true,] CompletionStage<? extends T>... promises)
static <T> Promise<T> anyStrict([boolean cancelRemaining=true,] 
                                List<? extends CompletionStage<? extends T>> promises)

Returns a promise that is completed normally when any CompletionStage passed as parameters is completed normally (race is possible); if any promise completed exceptionally before the first result is available, then resulting promise is completed exceptionally as well (unlike non-Strict variant, where exceptions are ignored if any result is available at all).

static <T> Promise<List<T>> atLeast(int minResultsCount, [boolean cancelRemaining=true,] 
                                    CompletionStage<? extends T>... promises)
static <T> Promise<List<T>> atLeast(int minResultsCount, [boolean cancelRemaining=true,] 
                                    List<? extends CompletionStage<? extends T>> promises)

Generalization of the any method. Returns a promise that is completed normally when at least minResultCount of CompletionStage-s passed as parameters are completed normally (race is possible); if less than minResultCount of promises completed normally, then resulting promise is completed exceptionally.

static <T> Promise<List<T>> atLeastStrict(int minResultsCount, [boolean cancelRemaining=true,] 
                                          CompletionStage<? extends T>... promises)
static <T> Promise<List<T>> atLeastStrict(int minResultsCount, [boolean cancelRemaining=true,] 
                                          List<? extends CompletionStage<? extends T>> promises)

Generalization of the anyStrict method. Returns a promise that is completed normally when at least minResultCount of CompletionStage-s passed as parameters are completed normally (race is possible); if any promise completed exceptionally before minResultCount of results are available, then resulting promise is completed exceptionally as well (unlike non-Strict variant, where exceptions are ignored if minResultsCount of successful results are available).

All methods above have an optional parameter cancelRemaining (since library's version 0.5.3). When omitted, it means implicitly cancelRemaining = true. The cancelRemaining parameter defines whether or not to eagerly cancel remaining promises once the result of the operation is known, i.e. enough promises passed are settled successfully or some CompletionStage completed exceptionally in strict version.

Each operation to combine CompletionStage-s has overloaded versions that accept either a List of CompletionStage-s (since version 0.5.4 of the library) or varagr array of CompletionStage-s.

Besides any/anyStrict methods that return single-valued promise, all other combinator methods return a list of values per every successfully completed promise, at the same indexed position. If the promise at the given position was not settled at all, or failed (in non-strict version), then corresponding item in the result list is null. If necessary number or promises was not completed successfully, or any one completed exceptionally in strict version, then resulting Promise is settled with a failure of the type MultitargetException. Application developer may examine MultitargetException.getExceptions() to check what is the exact failure per concrete CompletionStage passed.

The Promise returned has the following characteristics:

  1. Cancelling resulting Promise will cancel all the CompletionStage-s passed as arguments.
  2. Default asynchronous executor of the resulting Promise is undefined, i.e. it could be either ForkJoin.commonPool or whatever Executor is used by any of the CompletionStage passed as argument. To ensure that necessary default Executor is used for subsequent asynchronous operations, please apply defaultAsyncOn(myExecutor) on the result.

8. Polling and asynchronous retry functionality

Once you departure from the pure algebraic calculations to the unreliable terrain of the I/O-related functionality you have to deal with failures. Network outage, insuffcient disk space, overloaded third-party servers, exhausted database connection pools - these and many similar infrastructure failures is what application have to cope with flawlessly. And many of the aforementioned issues are temporal by the nature, so it makes sense to re-try after small delay and keep fingers crossed that this time everything will run smoothly. So this is the primary use-case for the retry functionality, or better yet -- asynchronous retry functionality, while all we want our applications be as scalable as possible.

Another related area is polling functionality - unlike infrastructure failures these are sporadic, polling is built-in in certain asynchronous protocol communications. Say, an application sends an HTTP request to generate a report and waits for the known file on FTP server. There is no "asynchronous reply" expected from the third-party server, and the application has to poll periodically till the file will be available.

Both use-case are fully supported by the Tascalate Concurrent library. The library provides an API that is both unobtrusive and rich for a wide range of tasks. The following retry* methods are available in the Promises class:

Provided by utility class Promises but stands on its own

static Promise<Void> retry(Runnable codeBlock, Executor executor, 
                           RetryPolicy<? super Void> retryPolicy);
static Promise<Void> retry(RetryRunnable codeBlock, Executor executor, 
                           RetryPolicy<? super Void> retryPolicy);

static <T> Promise<T> retry(Callable<T> codeBlock, Executor executor, 
                            RetryPolicy<? super T> retryPolicy);
static <T> Promise<T> retry(RetryCallable<T, T> codeBlock, Executor executor, 
                            RetryPolicy<? super T> retryPolicy);
    
static <T> Promise<T> retryOptional(Callable<Optional<T>> codeBlock, Executor executor, 
                                    RetryPolicy<? super T> retryPolicy);
static <T> Promise<T> retryOptional(RetryCallable<Optional<T>, T> codeBlock, Executor executor, 
                                    RetryPolicy<? super T> retryPolicy);
    
static <T> Promise<T> retryFuture(Callable<? extends CompletionStage<T>> invoker, 
                                  RetryPolicy<? super T> retryPolicy);
static <T> Promise<T> retryFuture(RetryCallable<? extends CompletionStage<T>, T> invoker, 
                                  RetryPolicy<? super T> retryPolicy);

All the methods from retry family share the same pattern. First, there is a block of code that is executed per every attempt. It could be either a full block of the asynchronous code (retry and retryOptional) or a method that returns a CompletionStage from third-party API like Async Http library (retryFuture). Next, if we retry custom code block, then it's necessary to provide an Executor it should be run on. For retryFuture there is no explicit Executor, and it's up to the third-party library to provide scalable and robust Executor as a default asynchronous executor of the returned CompletionStage. Finally, RetryPolicy should be specified that provides a lot of customization options:

  1. How much attempts should be made?
  2. What is a time interval between attempts? Should it be fixed or dynamic?
  3. What is a timeout before a single attempted is considered "hanged"? Should it be dynamic?
  4. What exceptions are re-trieable and what are not?
  5. What result is expected to be valid? Is a null result valid? Is any non-null result valid or some returned object properties should be examined?

All in all, RetryPolicy is provides an API with endless customizations per every imaginable use-case. But before discussing it, it's necessary to explain a difference in each pair of methods. Why there are overloads with Runnable vs RetryRunnable and Callable vs RetryCallable? The reason is the following:

  1. Contextless retriable operations are captured as Runnable or Callable lambdas - they behaves the same for every iteration, and hence do not need a context.
  2. Methods with RetryRunnable and RetryCallable are contextual and may dynamically alter their behavior for the given iteration depending on the context passed. The RetryContext provides provides all necessary iteration-specific information.

9. Partitioned processing of promises

TBD - discuss Promises.partitioned

10. Stackless sequential processing of promises

TBD - discuss Promises.loop(...)

11. Context variables & contextual Promises

WARNING This part of API will be refactored in the upcoming release 0.9.0.

Ah, those dreaded TreadLocal-s we all hate, love to hate, but, neveretheless, use everywhere. It's quite common to store some contextual data (like authenticated user and current locale) inside ThreadLocal variables. Sometimes it's a custom code, sometimes the code from third-party library we can't alter.

Typically, we spawn asynchronous code from some thread with well-known characteristics, like HTTP request thread. Here we can easly access contextual information from thread-local variables. However, using thread-local variables from asynchronous code block is hard while it's impossible to predict what thread from the pool will execute the code. It's necessary to capture the context of the one thread and propagate it to threads executing asynchronous code.

To solve this issue, there Tascalate Concurrent provides ContextVar class (since version 0.8.1), that serves as a proxy over thread-local variable for multi-threaded code. Typical usage scenario is the following:

  1. Define ContextualPromiseFactory holding ContextVar-s from existing thread-local variables.
  2. Capture a context of the thread that spawns asynchronous operations.
  3. Convert a Promise to context-aware promise.
class MyService {
  // Tread-local variables; they are configured by some custom code (irrelevant for Tascalate Concurrent)
  static final ThreadLocal<Principal> CURRENT_PRINCIPAL = new ThreadLocal<Principal>();
  static final ThreadLocal<Locale> CURRENT_LOCALE = new ThreadLocal<Locale>();
}
...
// [1] -- Define `ContextualPromiseFactory`, it's immutable, thread-safe and may be shared in any manner
ContextualPromiseFactory cpf = ContextVar.relay(MyService.CURRENT_PRINCIPAL, MyService.CURRENT_LOCALE);
...
// [2] -- In the invoker thread (like HTTP request thread), 
// where MyService.CURRENT_PRINCIPAL, MyService.CURRENT_LOCALE are available:
Function<Promise<String>, Promise<String>> newContextualPromise = cpf.captureContext();
...
// [3] -- Convert a Promise to context-aware promise with 'newContextualPromise' factory:
Promise<Void> httpRequest = 
CompletableTask.supplyAsync(() -> getDownloadUrlTemplate(), myExecutor)
               .as(newContextualPromise) // <--- HERE the conversion is set 
               .thenApply(url -> 
                   url + 
                   "?user=" + MyService.CURRENT_PRINCIPAL.getName() + 
                   "&locale=" + MyService.CURRENT_LOCALE.toString()   
               )
               .thenAccept(url -> {
                 log.info("Get data for user " + MyService.CURRENT_PRINCIPAL.getName());
                 executeHttpRequest(url);
               });

In the example above, after the call to Promise.as(newContextualPromise) all of the derrived promises (i.e. code blocks in thenApply / thenAccept) may access contextual information captured in the originated thread.

Worth to mention, that copying contextual variables has certain performance penalty. To stop it at the certain level, just use Promise.raw() to undecorate derrived promises (as with any other decorator):

Promise<Void> afterHttpRequest =
httpRequest.raw()
           .thenRun(() -> {
             // MyService.CURRENT_PRINCIPAL, MyService.CURRENT_LOCALE are not available here
           });

12. Asynchronous locks

TBD -- net.tascalate.concurrent.locks

13. Asynchronous try-with-resources

TBD -- Promises.tryApply and Promises.tryCompose

14. Asynchronous I/O extensions

15. Decorators for CompletionStage / CompletableFuture / Promise

16. Extensions to ExecutorService API

It’s not mandatory to use any specific subclasses of Executor with CompletableTask – you may use any implementation. However, someone may find beneficial to have a Promise-aware ExecutorService API. Below is a list of related classes/interfaces:

a. Interface TaskExecutorService - the specialization of ExecutorService that uses Promise as a result of submit(...) methods.

b. Class ThreadPoolTaskExecutor - the subclass of the standard ThreadPoolExecutor that implements TaskExecutorService interface.

c. Class TaskExecutors - a drop-in replacement for Executors utility class that returns various useful implementations of TaskExecutorService instead of the standard ExecutorService.

Acknowledgements

Internal implementation details are greatly inspired by the work done by Lukáš Křečan. The part of the polling / asynchronous retry functionality is adopted from the async-retry library by Tomasz Nurkiewicz

tascalate-concurrent's People

Contributors

ajurcik avatar vsilaev avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

tascalate-concurrent's Issues

A FutureLocal.java would be nice(like ThreadLocal.java but more)

Twitter Futures allow swapping the slf4j MDC to use Local.scala which is a special context using a combination of ThreadLocal and swapping in/out context for thenCompose and thenApply. In this way, when a request comes in, I can MDC.put("userName", userName); and it will be available to all logging across all thenApply and thenCompose methods. Without this, logging in slf4j with MDC generally breaks down. With this fix, one can swap the MDC with a FutureLocal.java usage. Of course, the Futures will have to know about FutureLocal.java to make this happen.

.exceptionally not getting called if a state that is canceled throws a RuntimeException.

In this example:

   State s1 = new State("s1");
   State s2 = new State("s2");

    DependentPromise<String> promise = DependentPromise.from(
            CompletableTask.asyncOn(executorService)
    ).thenApplyAsync(
            (aVoid) -> longTask(...), true
    ).thenApplyAsync(
            (s) -> longTask(...), true
    ).exceptionally(
            (e) -> {
                System.out.println(e.getMessage());
                return null;
            }, true
    );

If I call promise.cancel() and lets say that longTask will throw a RuntimeException when it gets interrupted. Should exceptionally() get called since the task did not complete successfully?

OnTimeout not working when computing intensive task

I have a computing intensive task that should stop when a certain time has expired. I'm using the onTimeout method for this but the task is not interrupted. When i add a thread sleep in the computation task the task is properly interrupted.

I'm using a ThreadPoolExecutor as executer.

Any idea what could be the cause of this?

Include class names in the documentation when highlighting a method

A minor issue but... The docs are quite good but there are several places in the documentation where a method is highlighted. For example:

Second, there is an adapter method from:
static Promise from(CompletionStage stage)

It is not immediately clear in which class this method is located. The reader has to scan the surrounding text and spot the sentence:

When discussing Promise interface, it's mandatory to mention the accompanying class Promises

to realize this method is located in Promises.

An easy way to fix this might be to simply include a class name when highlighting a method in the documentation. The best way to fix this would be for method highlights to actually link to javadoc. This way when the user remembers that a method exists but is unsure which class the method is located in she can quickly search the docs and then click through to the javadoc.

MultitargetException swallows Exceptions

MultitargetException swallows underlying Exceptions making it absolutely impossible to determine what the original, underlying exception was. The MultitargetException constructor takes a list of Exceptions and makes no attempt to initCause() and set any of the exceptions in the list as a cause of the MultitargetException. When a MultitargetException occurs it is impossible to what caused the MultitargetException.

The MultitargetException constructor should change. Specifically:

  1. If the list is an empty an IllegalArgumentException should be thrown.
  2. The constructor should take the first element of the list and invoke initCause() with the first element as a parameter.

This should make it possible to always be able to determine what caused a MultitargetException.

Additionally the MultitargetException should override Throwable.getMessage(). When an exception occurs we really need as much information as possible as to why the exception occurred. Ideally MultitargetException.getMessage() would print out the number of exceptions and then print out detailed information about each exception occurred. Minimally we need to see the getMessage() for each of the Exceptions in the list.

MultitargetException maintains its own list of throwables. Instead MultitargetException should use the standard SuppressedException mechanism. If there is more than one exception in the list than all remaining exceptions (except the first which is passed to initCause()) should be passed to addSupressed.

The current version of MultitargetException which doesn't use initCause and doesn't use addSupressed is a big problem. Exceptions occur in our background threadpools and in the curent design it is impossible to determine what those exceptions were without writing custom code to "unwrap" the exception. This makes the library very difficult to use.

Wait for Interrupted of CompletableTask.cancel

Hi,

is there a way to wait for the CompletableTask (the Promise) to be Interrupted?

For example:

Promise<Void> task = CompletableTask.runAsync(() -> {
	try{
		//...
	} catch (InterruptedException e) {
		Thread.currentThread().interrupt();
		throw new CompletionException(e);
	}
}, EXECUTOR);
task.cancel(true);
//wait for interruption

immediately after task.cancel() (eiter trueor false) task.isDone() will be true and task.join() will throw CancellationException
how can I wait for the Runnable to actually finish or interrupt?
If i call task.cancel(true); I'd like to wait untill the Runnable get interrupted (and throws new CompletionException(e); in the example).
If i call task.cancel(false); I'd like to wait for the runnable to finish if already running (or get CancellationException if not started yet)
It might be also usefull to be able to get the result or the exception after it finish or interrupt.

The ratio behind this is that if i schedule several IO bound tasks (in a FixedThreadPool), at some point i might want to cancel them, but wait for the running ones to finish or gracefully interrupt.

Exceptions in thenCompose() will not propagate

thenCompose() and derivates are broken when it comes to Exception handling.

This code will never terminate:

    @Test
    public void test_exception_handling_with_thenCompose() throws ExecutionException, InterruptedException {
        State s1 = new State();
        Promise<Void> p = CompletableTask.runAsync(() -> longTask(1, s1), executor)
                .thenCompose(it -> {
                    throw new IllegalStateException("oh no!");
                });
        p.get();
    }

This code with the CompleteableFuture does:

    @Test
    public void test_exception_handling_with_thenCompose2() throws ExecutionException, InterruptedException {
        State s1 = new State();
        Future<Void> p = CompletableFuture.runAsync(() -> longTask(1, s1), executor)
                .thenCompose(it -> {
                    throw new IllegalStateException("oh no!");
                });
        p.get();
    }

Normal execution after cancellation due to onSuccess(...) in StageTransition.set(...)

Hi,

First of all, thank you for your code, it helped me really a lot as I needed in my GUI the ability to cancel the running underlying task. Moreover, your code enables me to extend this cancellation ability such that I could cancel the whole tree of underlying tasks which the "root" task depends on. This extension is currently in progress, and when finished I would like to share it with you. Would you be interested?

Anyway, I think I found an issue in the original code today. I wrote a simple test (OnSuccessTest) which runs two async tasks, the first one and then the second one. In the test I cancel the first task but the second one still can be executed which it should not as the first stage should complete exceptionally with CancellationException. For me the issue occurs when the first task is cancelled (from main thread) using AbstractCompletableTask.cancel(true) but the onError(...) in this method is DELAYed (I added some code before). In this case the StageTransition.set(...) method can be executed from FutureTask.run() during the DELAY on a thread pool thread and the first stage is completed successfully despite of its task was cancelled.
My solution is to test the completion status of the task in StageTransition.set(...) and call either onError(...) when the task was cancelled or onSuccess(...) otherwise.

test.zip

orTime and submit combination

When I execute this code:

CompletableTask
  .submit(this::doTask, executor)
  .orTimeout(Duration.ofSeconds(5L))
  .whenComplete((res, err) -> {
    if (res != null)
	System.out.println(res);
    else
        System.out.println(err.getMessage());
});

private String doTask () throws Exception {
  TimeUnit.SECONDS.sleep(3);
  if (true)
    throw new Exception("my error");
  return "executed ok";
}

I never get "my error" exception after 3 secs, only TimeoutException after 5 secs with "Timeout after PT5S" message.
If I remove orTimeout(...) line, I get "my error" exception.
Is there any way to catch both exceptions?

Thanks

Typo in DelayPolicy#withMinDelay

There's a called method name typo in the winMinDelay(mindelay, timeUnit) method of the DelayPolicy interface:

    default DelayPolicy<T> withMinDelay(long minDelay, TimeUnit timeUnit) {
        return withMaxDelay(Timeouts.toDuration(minDelay, timeUnit));
    }

Apparently it should call withMinDelay(duration).

Promise.onTimeout / completeOnTimeout -- should it cancel original Promise on timeout?

I'd like to open a discussing regarding design choice I made for Promise.orTimeout and Promise.completeOnTimeout.

Promise.orTimeout returns a new Promise that is either completed normally when original Promise completes normally within timeout given, or completes exceptionally, when original Promise completes exceptionally within timeout given, or completes exceptionally with TimeoutException if timeout was exceeded.

When the returned Promise is cancelled, timeout is cancelled as well. However, original Promise is not cancelled and its completion code keeps running. Moreover, if timeout happens then original Promise is not cancelled either.

Why? To support different execution paths depending on timeout duration. For example:

Promise<String> serviceCall = someSlowServiceCall();

serviceCall.orTimeout(Duration.of(5, ChronoUnit.SECONDS)).exceptionally(e -> {
  if (e instanceof TimeoutException) {
    UI.showMessage("Sorry, it takes a bit longer than expected. Please wait...");
  }
});

serviceCall.onTimeout(Duration.of(20, ChronoUnit.SECONDS)).exceptionally(e -> {
  if (e instanceof TimeoutException) {
    if ("Y".equals(UI.showConfirmation("Server does not respond, do you want to cancel operation (Y/N)?"))) {
      serviceCall.cancel();
    }
  }
});

Obviously, there is a way to cancel original promise as well with minimal coding:

Promise<String> serviceCall = someSlowServiceCall();
Promise<String> withCancelOnTimeout =
  serviceCall
  .orTimeout(Duration.of(5, ChronoUnit.SECONDS), true)
  .exceptionally(e -> serviceCall.cancel(true));

Promise.completeOnTimeout is almost identical in behavior, the only difference is that in case of timeout the returned promise is completed with alternative value (supplied as argument).

Both orTimeout and completeOnTimeout are executed on the default executor of the original Promise.

What do you think?

Feature enhancement on cancel

I like twitter Futures model where I can cancel the future I receive AND it cancels ALL futures in the chain UNLESS someone broke the chain by nulling out the cancel handler. This is really nice as it cancels the future you are waiting on AND any other futures that may be outstanding so those lambas will not run since I cancelled the entire request. This would reclaim all the processing on the cancel unlike java futures which only cancel the first future.

SO post was closed but is here
https://stackoverflow.com/questions/62106428/is-there-a-better-way-for-cancelling-a-chain-of-futures-in-java?noredirect=1#comment109864875_62106428

closed because the above I guess is an opinion.

Dean

whenCompleteAsync adds the failure exception as a suppressed exception to itself

The problem is in the following code:

// exceptions are handled in regular way
failure -> {
    try {
        action.accept(null, failure);
        return forwardException(failure);
    } catch (Throwable e) {
        // CompletableFuture does not override exception here
        // unlike as in handle[Async](BiFunction)
        // Preserve this behavior, but let us add at least 
        // suppressed exception
        failure.addSuppressed(e);
        return forwardException(failure);
    }
}

The forwardException throws a CompletionException which is immediatelly caught by the catch clause. Then, it is added to the failure as a suppressed exception. However, it can happen that failure == e (is the same object) since failure is retrown when its type already is the CompletionException.

I added a test for this in #8.

What about calling the forwardException after the try-catch block as in the following code? This also make the code a little bit cleaner since there is only one call to forward... now :)

// exceptions are handled in regular way
failure -> {
   try {
       action.accept(null, failure);
   } catch (Throwable e) {
       // CompletableFuture does not override exception here
       // unlike as in handle[Async](BiFunction)
       // Preserve this behavior, but let us add at least 
       // suppressed exception
       failure.addSuppressed(e);
   }
   return forwardException(failure);
}

Mapped promise can't be interrupted

First off, great library. It's very useful for briding the gap between traditional future interrupts and the completionstage api.

However, the following caught us offguard:

  • cancelling a completable task interrupts a thread
  • cancelling a completable task mapped with "whenComplete" or "thenApply" does not interrupt a thread

Is this by design?

Here's a quick program to illustrate the problem:

package test;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

import net.tascalate.concurrent.CompletableTask;

public class PromiseCancelTest {

	private static final Executor EXECUTOR = Executors.newCachedThreadPool();

	public static void main(String[] args) throws InterruptedException {
		for (var assignWhenComplete : List.of(false, true))
			run(assignWhenComplete);
	}

	private static void run(Boolean assignWhenComplete) throws InterruptedException {
		var latch = new CountDownLatch(1);
		var promise = CompletableTask.submit(() -> {
			try {
				long counter = 0;
				while (!Thread.currentThread().isInterrupted()) {
					Thread.sleep(Duration.ofSeconds(1).toMillis());
					counter++;
					log(assignWhenComplete, counter);
				}
				return counter;
			} finally {
				latch.countDown();
			}
		}, EXECUTOR);
		BiConsumer<? super Object, ? super Throwable> whenCompleteAction = (v, t) -> log(assignWhenComplete,
				"whenComplete action");
		if (assignWhenComplete)
			promise = promise.whenComplete(whenCompleteAction);
		else
			promise.whenComplete(whenCompleteAction);
		Thread.sleep(Duration.ofSeconds(4).toMillis());
		promise.cancel(true);
		log(assignWhenComplete, "cancel requested");
		latch.await();
		log(assignWhenComplete, "done");

	}

	private static void log(Boolean assignWhenComplete, Object value) {
		System.out.println(String.format("assignWhenComplete:%s %s", assignWhenComplete, value));
	}
}

When 'assignWhenComplete' is false, the thread interrupts just fine. When assignWhenComplete is false, the program runs forever:

assignWhenComplete:false 1
assignWhenComplete:false 2
assignWhenComplete:false 3
assignWhenComplete:false whenComplete action
assignWhenComplete:false cancel requested
assignWhenComplete:false done
assignWhenComplete:true 1
assignWhenComplete:true 2
assignWhenComplete:true 3
assignWhenComplete:true cancel requested
assignWhenComplete:true 4
assignWhenComplete:true 5
assignWhenComplete:true 6
assignWhenComplete:true 7
assignWhenComplete:true 8
assignWhenComplete:true 9
assignWhenComplete:true 10
assignWhenComplete:true 11
[...]

Promises.all - List type argument

Hi Valery,

I hope you are doing well.
I just tried to create an all promise from a list of promises. See the follwing code:

List<Promise<T>> promises = new ArrayList<>();
Promise<?> all = Promises.all(promises);

And this does not compile since the generic type of the promises argument is too narrow. Specifically, we would need it to be Promise<T> all(List<? extends CompletionStage<? extends T>> promises). Maybe Promise<T> all(Collection<? extends CompletionStage<? extends T>> promises) could also work.

Best, Adam

Promises#all(CompletionStage<? extends T>...) cancellation does not seem to work correctly

The documentation about Promises#all(CompletionStage<? extends T>...) suggests that canceling a Promise that has been created using Promises#all cancels all Promisses that are wrapped inside. That does not seem to work. Please have a look into
CompletableTaskCancellationTest.java.txt methods testWrappedCancel*. (Executors#getDefaultExecutor() returns
new ThreadPoolTaskExecutor(10, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(30))).

Issues with Callbacks

Hi there,

me again. I have some issues with the callback mechanism used internally in tascalate-concurrent to add the successCallback etc.
The problems are due to the callbacks being added as another task execution via execute() on the Executor used.

I have identified two issues:

  • If I shutdown() the executor, the remaining callback hooks will cause rejections on CallbackRegistry:291 executor.execute() and throw these exceptions all the way up, once a (previously executing) Promise completes.
  • The extra task will cause starvation on SingleThreadExecutors. E.g. if I add several (hundreds) promises to the executor at once, the first promise will not terminate becauses the callback hook is only executed AFTER all other queued promises have been ALSO executed, due to in-order-execution.

Do you have any suggestions on this? Otherwise I probably have to revert to mutex/semaphores outside the executor to guarantee a single-threaded funnel, but that would defeat the purpose of the executor and might unnecessarily queue up threads waiting for the mutex to become available.

Thanks and keep up the good work
Chris

The cancel is not working on the async chain

Im trying to use the library due to the fact that i must cancel stucked threads (due to stuck WMI usually)

im noticing however that despite calling the cancel or specifing short timeout with the parameter set to true, the chain continues

to make it easier to see what im doing wrong ill attach some code and logs

public Promise<ComplianceCandidate> process(ComplianceCandidate candidate, ExecutorService pool) {

        return CompletableTask.supplyAsync(() -> candidate, pool).thenApplyAsync(CheckIp::doWork)
                .thenApplyAsync(CheckType::doWork).thenApplyAsync(CheckExclusion::doWork)
                .thenApplyAsync(AssignEntity::doWork).thenApplyAsync(DecideWmi::doWork)
                .thenApplyAsync(ObtainWmiConnection::doWork).thenApplyAsync(ObtainRegistryWmiConnection::doWork)
                .thenApplyAsync(RunCompliance::doWork).thenApplyAsync(DecideComplianceResult::doWork)
                .thenApplyAsync(CreateAlert::doWork).thenApplyAsync(ApplyFailureManager::doWork)
                .thenApplyAsync(ApplyDisconnectManager::doWork).thenApplyAsync(EnforceCompliance::doWork)
                .exceptionally(ExceptionHandlerService::handle).orTimeout(Duration.ofSeconds(3), true);
    }

the timeout should affect in the task obtainWmiConnection yet im seeing the task downstream being executed (runCompliance)
im also attaching the relevant logs

2021-02-19 09:09:18 [pool-2-thread-6] - [ipAddress=192.168.99.190, macAddress=0x18A905C1AAA9, type=Server] has entered Decide WMI task
2021-02-19 09:09:18 [pool-2-thread-6] - [ipAddress=192.168.99.190, macAddress=0x18A905C1AAA9, type=Server] needs WMI connection due to condition ComplianceCondition [id=6, type=FileWMI, uniqueName=AVDatFile]
2021-02-19 09:09:18 [pool-2-thread-6] - [ipAddress=192.168.99.190, macAddress=0x18A905C1AAA9, type=Server] has passed Decide WMI task
2021-02-19 09:09:18 [pool-2-thread-7] - [ipAddress=192.168.99.190, macAddress=0x18A905C1AAA9, type=Server] has entered Obtain WMI task
2021-02-19 09:09:19 [pool-2-thread-7] - Proceeding to establish wmi connection for device: [ipAddress=192.168.99.190, macAddress=0x18A905C1AAA9, type=Server]  wmiuser: localhost\administrator total users: 1
2021-02-19 09:09:23 [pool-3-thread-1] - Error TimeoutException: Timeout after PT3S on demand execution for device [ipAddress=192.168.99.190, macAddress=0x18A905C1AAA9, type=Server]
2021-02-19 09:09:34 [pool-2-thread-7] - Successfully established WMI for device: [ipAddress=192.168.99.190, macAddress=0x18A905C1AAA9, type=Server] with WMI user: Credential [id=12, wmiDomain=localhost, wmiUser=administrator], total time to obtain connection is 15 seconds
2021-02-19 09:09:34 [pool-2-thread-10] - [ipAddress=192.168.99.190, macAddress=0x18A905C1AAA9, type=Server] has entered run compliance task
2021-02-19 09:09:35 [pool-2-thread-10] - [ipAddress=192.168.99.190, macAddress=0x18A905C1AAA9, type=Server] has has the system name WISE-DEV190 and users WISE-DEV190\Administrator,WISE-DEV190\DWM-2,WISE-DEV190\DWM-1

The pool by the way is fixedThreadPool

onTimeout behaviour confusing after 0.8.4

Hi, I've been using this library for a while and recently dug out some project from over a year ago and upgraded from 0.8.4 to 0.9.6. The behaviour for onTimeout is no longer doing what it used to and I'm confused if I'm doing it wrong. The simplest test case I can see is this:

    @Test
    public void test()
    throws ExecutionException, InterruptedException
    {
        String message = "test";
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(500);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return message;
        });

        Promise<String> promise1 = Promises.from(future).orTimeout(Duration.ofSeconds(1));

        assertEquals(message, promise1.get());
    }

The promise1.get() reports a failure due to a CancellationException immediately. The code works and succeeds in 0.8.4. Any suggestions?

orTimeout() breaks cancellation abilities

Hi Valery,

this simple test case breaks here with the latest release:


    @Test
    public void test_orTimeout_does_not_break_cancellation_ability() {
        State s1 = new State();
        Promise<Void> p = CompletableTask.runAsync(() -> longTask(5, s1), executor)
                .orTimeout(2, TimeUnit.SECONDS);
        trySleep(2);
        p.cancel(true);
        trySleep(1);
        assertCancelled("s1", s1);
    }

Took me a couple of hours to figure out that orTimeout() causes it. Other than that, I think that your library is great to overcome the cumbersomeness of the original implementation. Thanks!

PS: I guess increasing the overall unit test coverage would also help your development as the stuff is pretty complex and multithreading done right is very hard.

Promise.onTimeout does not cancel the async call in original CompletableTask.supplyAsync?

I am using Java 8 and this library looks perfect for my case; it offers timeout feature and cancel the long running thread without upgrading to Java 9 or 11.

I am trying the onTimeout method but it seems not working as the README stated. The original Promise is not cancelled and the logic continues to run and return even after the timeout. My code is as follows.

List<CompletableFuture<ResponseEntity<String>>> futures = getFutures();
System.out.println("futures size:" + futures.size());
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();
System.out.println("all futures completed");

private getFutures() {
    List<CompletableFuture<ResponseEntity<String>>> futures = new ArrayList();
    futures.add(get(proxy, "https://normal1.com"));        // returns within 5s
    futures.add(get(proxy, "https://normal2.com"));        // returns within 5s
    futures.add(get(proxy, "https://longrunning.com"));    // returns after a min
    return futures;
}

private CompletableFuture<ResponseEntity<String>> get(ProxyExchange<String> proxy, String url) throws InterruptedException {
    return CompletableTask
            .supplyAsync(() -> {
                System.out.println("inside supplyAsync(), calling foo() with url " + url);
                return foo(proxy, url);
            }, Executors.newFixedThreadPool(10))
            .onTimeout(() -> {
                System.out.println("timeout with url " + url);
                return defaultResponse();
            }, Duration.ofSeconds(30), true)
            .toCompletableFuture();
}

private ResponseEntity<String> foo(ProxyExchange<String> proxy, String url) {
    System.out.println("inside foo");
    System.out.println("calling " + url);
    ResponseEntity<String> result = proxy.uri(url).get();
    System.out.println("called " + url);
    if (CommonUtils.isNotEmpty(result.getBody())) {
        System.out.println("result not empty, with length " + result.getBody().length() );
    } else {
        System.out.println("result is empty");
    }
    return result;
}

private ResponseEntity<String> defaultResponse() {
    return ResponseEntity.ok("Timeout!!!");
}

I convert Promise into CompletableFuture after onTimeout to minimize code change as I used CompletableFuture before. It can be changed this caused the issue.

However when I check the logs,

inside supplyAsync(), calling foo() with url https://normal1.com
inside foo
calling https://normal1.com
inside supplyAsync(), calling foo() with url https://normal2.com
inside foo
calling https://normal2.com
inside supplyAsync(), calling foo() with url https://longrunning.com
inside foo
calling https://longrunning.com
futures size:3
called https://normal1.com
result not empty, with length 4301
called https://normal2.com
result not empty, with length 43695
timeout with url https://longrunning.com
all futures completed

Looking good. The page loads after 30s and it timeouts longrunning.com which I can get the defaultResponse() back. But after another 30s, following logs got printed.

called https://longrunning.com
result not empty, with length 188

I thought the onTimeout should interrupt and cancel the timeout thread. Can you point me out if I did anything wrong?

Optimized Same Executor Runs

Is there a way to make it so that if you have a Promise, you have a method like

Promise.thenRunSwitch(Runnable run, Executor toDo)

where the following holds true:

  1. If parent promise executed on executor toDo, delegate to method thenRun
  2. If parent promise executed on another executor, delegate to method thenRunAsync with executor toDo

That way, there is minimized context switching and if you are already running under a thread you want to be in, you can stay on that thread.

Unexpected cancellation behavior

Hi!

I stumbled upon your library and it looks very promising! Thanks for the hard work you put into it.
While testing timeouts and cancellations, the behavior I am seeing is not what I would expect though. Here's an example:

public void test() {
     return CompletableTask
        .supplyAsync(() -> {
            try {
                Thread.sleep(10_000); // sleep 10s
            } catch (InterruptedException e) {
                System.out.println("Interrupted");
                throw new CompletionException(e);
            }
            System.out.println("Interrupted? " + (Thread.currentThread().isInterrupted() ? "yes" : "no"));
            return "foo";
        }, Executors.newCachedThreadPool())
        .orTimeout(Duration.ofMillis(200), true)
        .join();
}

With such a code, I would expect the Thread.sleep() to be interrupted after 200ms but it does not seem to be the case ("Interrupted: no" is printed after 10s). Is that the expected behavior? Am I doing something wrong?

Also, I noticed that different executors will provide different results. Using newCachedThreadPool() will return a result after 200ms while using newSingleThreadExecutor() or Spring's ThreadPoolTaskExecutor will return but only after 10s. Again, is that expected?

Thanks!

How to wrap a Promise into Promise.retry

I am using the jersey rx() extension that wraps the requests in CompletionStages. I would like to be able to take this CompletionStage, convert it to a Promise, and use it within a Promises.retry without having to call get().

This means that Promises.retry would accept a Promise, rather than a Callable or Runnable, and then be able to resubmit the original code with binding context for re-execution.

Unconveniences of Promises.failure and Promises.all

Hi Valery,

I would like to have the method Promises.failure typed using a type parameter. This would make returning a failed promise easier in cases when the async task was not even started.

The method would look like:

public static <T> Promise<T> failure(Throwable t) {
    @SuppressWarnings("unchecked")
    CompletablePromise<T> result = (CompletablePromise<T>) new CompletablePromise<Object>();
    result.onFailure(exception);
    return result;
}

This is inspired by the implementation of Collections.emptyList().

Currently, I am using a workaround. I create a failure and I link another stage after it using thenApply which allows me to return the desired type.

Regarding Promises.all there is a problem with its parameter type. If you would like to specify the aggregated Promises differently then by listing them, you need to supply an array of a generic type (a Promise). In Java you cannot create such arrays directly. First you need to create a collection and then convert it to the array while you need to do an unchecked cast. It would be more convenient to provide also a method which works with collections.

Anyway, thank you for your great library.
Best, Adam

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.