reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM

Home Page: http://projectreactor.io

License: Apache License 2.0

Java 100.00%
asynchronous flow flux jvm mono reactive reactive-extensions reactive-streams

reactor-core's Introduction

Reactor Project

Join the chat at https://gitter.im/reactor/reactor

Download

Starting from 3.0, Reactor is now organized into multiple projects:

A set of compatible versions for all these projects is curated under a BOM ("Bill of Materials") hosted under this very repository.

Using the BOM with Maven

In Maven, you need to import the BOM first:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>2024.0.0-M1</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

Notice we use the <dependencyManagement> section and the import scope.

Next, add your dependencies to the relevant reactor projects as usual, except without a <version>:

<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

Using the BOM with Gradle

Gradle 5.0+

Use the platform keyword to import the Maven BOM within the dependencies block, then add dependencies to your project without a version number.

dependencies {
     // import BOM
     implementation platform('io.projectreactor:reactor-bom:2024.0.0-M1')

     // add dependencies without a version number
     implementation 'io.projectreactor:reactor-core'
}

Gradle 4.x and earlier

Gradle versions prior to 5.0 have no core support for Maven BOMs, but you can use Spring's gradle-dependency-management plugin.

First, apply the plugin from Gradle Plugin Portal (check and change the version if a new one has been released):

plugins {
    id "io.spring.dependency-management" version "1.0.11.RELEASE"
}

Then use it to import the BOM:

dependencyManagement {
     imports {
          mavenBom "io.projectreactor:reactor-bom:2024.0.0-M1"
     }
}

Then add a dependency to your project without a version number:

dependencies {
     compile 'io.projectreactor:reactor-core'
}

BOM Versioning Scheme

The BOM can be imported in Maven, which will provide a set of default artifact versions to use whenever the corresponding dependency is added to a pom without an explicitly provided version.

As the different artifact versions are not necessarily aligned, the BOM represents a release train with a heterogeneous range of versions that are curated to work together. The artifact version follows the YYYY.MINOR.MICRO-QUALIFIER scheme since Europium, where:

  • YYYY is the year of the first GA release in a given release cycle (like 3.4.0 for 3.4.x)
  • .MINOR is a 0-based number incrementing with each new release cycle; in the case of the BOM, it allows discerning between release cycles in case two are first released in the same year
  • .MICRO (also referred to as PATCH) is a 0-based number incrementing with each service release
  • -QUALIFIER is a textual qualifier, which is omitted in the case of GA releases (see below)
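
As an illustration of the scheme, here is a minimal sketch of a parser that splits a version string into its parts. BomVersion is a hypothetical helper written for this example, not something Reactor ships, and it only understands the post-Europium scheme (codename versions such as Aluminium-RELEASE are rejected).

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Reactor: splits a BOM version into its parts.
final class BomVersion {
    // YYYY.MINOR.MICRO followed by an optional -QUALIFIER suffix
    private static final Pattern SCHEME =
            Pattern.compile("(\\d{4})\\.(\\d+)\\.(\\d+)(?:-(.+))?");

    final int year;
    final int minor;
    final int micro;
    final String qualifier; // null when the qualifier is omitted (GA release)

    private BomVersion(int year, int minor, int micro, String qualifier) {
        this.year = year;
        this.minor = minor;
        this.micro = micro;
        this.qualifier = qualifier;
    }

    static BomVersion parse(String version) {
        Matcher m = SCHEME.matcher(version);
        if (!m.matches()) {
            throw new IllegalArgumentException(
                    "Not a YYYY.MINOR.MICRO-QUALIFIER version: " + version);
        }
        return new BomVersion(
                Integer.parseInt(m.group(1)),
                Integer.parseInt(m.group(2)),
                Integer.parseInt(m.group(3)),
                m.group(4)); // null if the optional group did not match
    }

    boolean isGa() {
        return qualifier == null;
    }
}
```

With this sketch, BomVersion.parse("2024.0.0-M1") yields year 2024, minor 0, micro 0 and qualifier M1, while a version without a qualifier is reported as GA.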

On top of the artifact version, each release train has an associated codename, the name of a chemical element from the periodic table, chosen in ascending alphabetical order, for reference in discussions.

So far, the release train codenames are:

  • Aluminium for the 3.0.x generation of Reactor-Core
  • Bismuth for the 3.1.x generation
  • Californium for the 3.2.x generation
  • Dysprosium for the 3.3.x generation
  • Europium (2020.0) for the 3.4.x generation

NOTE: Up until Dysprosium, the BOM was versioned using a release train scheme with a codename followed by a qualifier, and the qualifiers were slightly different. For example: Aluminium-RELEASE (first GA release, which would now be something like YYYY.0.0), Bismuth-M1, Californium-SR1 (a service release, which would now be something like YYYY.0.1), Dysprosium-RC1, Dysprosium-BUILD-SNAPSHOT (after each patch, we'd go back to the same snapshot version; this would now be something like YYYY.0.X-SNAPSHOT, so we get one snapshot per PATCH).

Contributing, Community / Support

As hinted above, this repository is for hosting the BOM and for transverse issues only. Most of the time, if you're looking to open an issue or a PR, it should be done in a more specific repository corresponding to one of the actual artifacts.

All projects follow the same detailed contributing guidelines which you can find here.

This document also gives some ways you can get answers to your questions.

Documentation

Detail of Projects

Reactor Core

Reactive foundations for apps and frameworks, and a Reactive Extensions-inspired API with Mono (1 element) and Flux (n elements) types.

Reactor Netty

TCP and HTTP client and server.

Reactor Addons

Extra projects adding features to reactor:

Snapshot Artifacts

While Stable Releases are synchronized with Maven Central, fresh snapshot and milestone artifacts are provided in the repo.spring.io repositories.

To add this repo to your Maven build, add it to the <repositories> section like the following:

<repositories>
	<repository>
	    <id>spring-snapshot</id>
	    <name>Spring Snapshot Repository</name>
	    <url>https://repo.spring.io/snapshot</url>
	    <snapshots>
	        <enabled>true</enabled>
	    </snapshots>
	</repository>
</repositories>

To add it to your Gradle build, use the repositories configuration like this:

repositories {
	maven { url 'https://repo.spring.io/libs-snapshot' }
	mavenCentral()
}

You should then be able to import a -SNAPSHOT version of the BOM, like 2020.0.{NUMBER}-SNAPSHOT for the snapshot of the {NUMBER}th service release of 2020.0 (Europium).

Sponsored by VMware

reactor-core's People

Contributors

akarnokd, aneveu, berry120, bsideup, buzzardo, chemicl, dependabot[bot], dfeist, ericbottard, gindex, izeye, jnizet, kadyana, madgnome, nurkiewicz, olegdokuka, osi, pderop, rajinisivaram, renovate-bot, renovate[bot], sdeleuze, seants, simonbasle, smaldini, spring-builds, tejavenkatlanka, theevangelista, ugir, violetagg

reactor-core's Issues

Consider introducing utility method for checked functions

When adopting Reactor while needing to wrap and use existing code, you find yourself repeatedly using try/catch blocks within map and other operators, which severely impacts the readability of the code. The introduction of a CheckedFunction could help with this significantly.

Exceptions.checked(CheckedFunction) : Function
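
A minimal sketch of what such a utility could look like follows. CheckedFunction and Checked.checked are hypothetical names used for illustration; they are not an existing Reactor API, and wrapping in a plain RuntimeException is an assumption about how checked exceptions should surface.

```java
import java.util.function.Function;

// Hypothetical interface: like java.util.function.Function, but allowed to throw.
@FunctionalInterface
interface CheckedFunction<T, R> {
    R apply(T input) throws Exception;
}

// Hypothetical utility class, named for this example only.
final class Checked {
    private Checked() {
    }

    // Adapts a throwing function to java.util.function.Function.
    static <T, R> Function<T, R> checked(CheckedFunction<T, R> fn) {
        return input -> {
            try {
                return fn.apply(input);
            } catch (RuntimeException e) {
                throw e; // unchecked exceptions pass through untouched
            } catch (Exception e) {
                throw new RuntimeException(e); // checked exceptions are wrapped
            }
        };
    }
}
```

The resulting Function can be passed anywhere a regular Function is expected, so the try/catch lives in one place instead of in every lambda.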

Support creating `Mono` from `java.util.Optional` and "nullable" values

Reactor Core is not Java 8+ yet, however it should be able to detect and do the right thing when Optional is passed into Mono.just(T) much like Spring Framework is Java 6+ but supports Optional in a number of places.

Related to that is the question how to deal with creating a Mono from a potentially "nullable" value which may be the case when the value is obtained from a source that could return null (e.g. resolving a query string parameter for an HTTP request from a Map). One then has to do something like return (value != null ? Mono.just(value) : Mono.empty()).

If Mono.just(T) supports Optional then one could do return Mono.just(Optional.ofNullable(value)) and that might just be good enough vs an explicit additional operator such as justOrEmpty(T). In any case it makes sense to consider any solution in the context of both Optional and "nullable" values.

Missing API calls in documentation

From the docs:

If you passed the integers 1, 2 and 3 into the Processor, you would see output in the console similar to this

But it doesn't show how to "pass" integers (or anything) into a processor. Only the output.

Interrupt issue with PublishOn

Original from reactive-streams/reactive-streams-jvm#310 by @sskrla

When using .dispatchOn Flux calls .interrupt() on the current thread (via scheduler.accept(null)) before it calls the onError or onComplete.

Specifically, this caused issues using the ElasticSearch client, as it throws an exception if the current thread is in an interrupted state when you make a blocking API call.

For example:

Flux.from(documents)
    .publishOn(processorsExecutorService)
    .buffer(batchSize)
    .onNext( requests -> {
        BulkRequestBuilder builder = client.prepareBulk();
        requests.forEach(builder::add);
        builder.get(); // Throws interrupted exception on last batch
    })
    .subscribe();

Flux & Mono Repeat(Predicate)

Flux#repeat(Predicate) and Mono#repeat(Predicate) are not operating as expected. Please refer to #repeatWhen operator in the meantime.

zipWith requests data too eagerly

Currently when using zipWith (and perhaps others), data is always requested at PlatformDependent.XS_BUFFER_SIZE. In some cases, this is too much data. For example, when implementing an exponential backoff, you might want to have a Stream.range() concatenated with a Mono.error in order to flag when too many retries have happened. If the number of elements in the range is smaller than XS_BUFFER_SIZE, the error is generated immediately, not when the end of the range is reached. An example of this behavior would look like:

import reactor.core.publisher.Mono;
import reactor.rx.Stream;

import java.util.concurrent.CountDownLatch;

import static java.util.concurrent.TimeUnit.SECONDS;

public final class Test {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Stream
            .interval(0, 1, SECONDS)
            .zipWith(Stream
                .range(0, 5)
                .concatWith(Mono.error(new IllegalStateException("A request too far"))))
            .consume(
                System.out::println,

                (throwable) -> {
                    throwable.printStackTrace();
                    latch.countDown();
                },

                latch::countDown);

        latch.await(); // Should print 0,0, 1,1, 2,2, and 3,3 before printing an IllegalStateException
    }

}

There should be a way to ensure that a subscriber can request as much data as it wants, but only in batches of a certain size, with producers able to specify that size.

Timeout on .get(Duration) gives obscure exception

Currently we see something like: reactor.core.util.Exceptions$CancelException: The subscriber has denied dispatching, which is obscure. A TimeoutException would at least indicate what is happening.

Flux.map() and null-handling when wrapping existing blocking API's

The only way to do this currently is via the use of flatMap and Mono.justOrEmpty() which isn't as efficient and causes more allocations/stack.

One option is to introduce something in Flux, e.g. Flux.mapOrEmpty(), that behaves the same as map but drops the message and does request(1) when null is returned, but I understand this clutters the primary API, where nulls simply are not allowed. So maybe an alternative is to implement something outside of Flux, in something like LegacyUtils, that can then somehow be pulled into and used in a Flux stream just in the cases where you need to deal with legacy APIs that throw exceptions or return null.

This is somewhat related to: #80
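
The semantics asked for above can be illustrated on plain java.util collections rather than on Flux. This is only a sketch: NullDropping and mapOrEmpty are hypothetical names, not Reactor APIs, and the request(1) replenishment that a real operator would need is not modeled here.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper showing "map, but skip elements that map to null".
final class NullDropping {
    private NullDropping() {
    }

    // Applies mapper to each element and drops results that are null,
    // instead of failing on them.
    static <T, R> List<R> mapOrEmpty(List<T> source,
                                     Function<? super T, ? extends R> mapper) {
        return source.stream()
                .<R>map(mapper)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }
}
```

For example, mapping a list of keys through Map::get would keep only the values of keys that are actually present.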

Flux.buffer() pushing to downstream when there are no more pending elements?

The use case is to implement adaptive batching: drain what's available on RingBuffer up to some maximum and push to downstream (DB in our case). That way batching automatically adapts to load, having minimum transaction lengths under normal load and fully utilize batching for better throughput under peak loads. This seems to be quite a common pattern. Is there a ready solution or could it be added to Reactor?

From implementation perspective, it seems quite straightforward, replacing the following line in FluxBuffer.onNext():

if (b.size() == size)

with something like

if (b.size() == size || (source instanceof Backpressurable && ((Backpressurable) source).getPending() - 1 /* to account for the current element being processed */ <= 0))

Please correct me if I'm wrong.

One other thing is the maximum buffer size: ideally there should be a variant taking an IntSupplier so that the maximum number of elements drained to the buffer could be changed dynamically. I'm thinking of a strategy that measures transaction length and adjusts the max batch size in order to meet a goal (similar to G1 goals on GC pauses).
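
To make the adaptive batching idea concrete outside of Reactor, here is a minimal sketch built on java.util.concurrent.BlockingQueue instead of a RingBuffer or FluxBuffer. AdaptiveBatcher and drainAvailable are hypothetical names, and the IntSupplier stands in for the dynamically adjustable maximum described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.function.IntSupplier;

// Hypothetical helper: drains whatever is currently queued, up to a dynamic maximum.
final class AdaptiveBatcher {
    private AdaptiveBatcher() {
    }

    // Never blocks: under low load the batch is small (possibly empty),
    // under peak load it fills up to the current maximum.
    static <T> List<T> drainAvailable(BlockingQueue<T> queue, IntSupplier maxBatchSize) {
        int max = Math.max(1, maxBatchSize.getAsInt()); // re-read the limit for every batch
        List<T> batch = new ArrayList<>();
        queue.drainTo(batch, max);
        return batch;
    }
}
```

Because the limit is read on every call, a strategy that measures transaction length can raise or lower it between batches without touching the draining code.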

Add code style configuration

A code style configuration file would be nice to have, so contributors don't mess up the already formatted code.

An Eclipse formatter file could be enough, since IntelliJ can convert that file.

Flux.after(Supplier<? extends Provider<T>>) throws CancelException

The following fails with a CancelException:

class AfterTest extends Specification {
    def "after executes correctly"() {
        when: 
        String result = Flux.fromIterable([1])
            .doOnNext {  }
            .after {
                Mono.just "success"
            }
            .last()
            .get()

        then:
        result == "success"
    }
}

However the following works as expected:

class AfterTest extends Specification {
    def "after executes correctly"() {
        when: 
        String result = Flux.fromIterable([1])
            .doOnNext {  }
            .after()
            .after {
                Mono.just "success"
            }
            .get()

        then:
        result == "success"
    }
}

Exception thrown from Stream.yield causes DuplicateOnSubscribeException

Currently, allowing an exception to escape from Stream.yield (i.e. not catching Throwable) causes a DuplicateOnSubscribeException to be thrown, masking the original exception. The solution to this is to properly implement Stream.yield so that it doesn't allow exceptions to escape, but Reactor should behave slightly better in the face of a poor implementation.

cloud-foundry-3 cloudfoundry-client.request     POST   /v2/apps
cloud-foundry-3 stream.upload                    onSubscribe(reactor.core.publisher.MonoNext$NextSubscriber@1fd2007b)
cloud-foundry-3 stream.upload                    request(unbounded)
cloud-foundry-2 reactor.core.publisher.SchedulerGroup Unrouted exception
reactor.core.util.Exceptions$DuplicateOnSubscribeException: Spec. Rule 2.12 - Subscriber.onSubscribe MUST NOT be called more than once (based on object equality)
    at reactor.core.util.Exceptions.duplicateOnSubscribeException(Exceptions.java:133) ~[reactor-core-2.5.0.BUILD-20160209.140333-334.jar:na]
    at reactor.core.util.BackpressureUtils.reportSubscriptionSet(BackpressureUtils.java:440) ~[reactor-core-2.5.0.BUILD-20160209.140333-334.jar:na]
    at reactor.core.util.DeferredSubscription.set(DeferredSubscription.java:62) ~[reactor-core-2.5.0.BUILD-20160209.140333-334.jar:na]
    at reactor.core.publisher.FluxPublishOn$PublishOnPipeline.onSubscribe(FluxPublishOn.java:131) ~[reactor-core-2.5.0.BUILD-20160209.140333-334.jar:na]
    at reactor.core.util.EmptySubscription.error(EmptySubscription.java:34) ~[reactor-core-2.5.0.BUILD-20160209.140333-334.jar:na]
    at reactor.core.publisher.FluxYieldingEmitter.subscribe(FluxYieldingEmitter.java:51) ~[reactor-core-2.5.0.BUILD-20160209.140333-334.jar:na]
    at reactor.rx.StreamSource.subscribe(StreamSource.java:69) ~[reactor-stream-2.5.0.BUILD-20160209.172643-328.jar:na]
    at reactor.core.util.ScalarSubscription.trySubscribeScalarMap(ScalarSubscription.java:93) ~[reactor-core-2.5.0.BUILD-20160209.140333-334.jar:na]
    at reactor.core.publisher.FluxFlatMap.subscribe(FluxFlatMap.java:93) ~[reactor-core-2.5.0.BUILD-20160209.140333-334.jar:na]
    at reactor.rx.StreamSource.subscribe(StreamSource.java:69) ~[reactor-stream-2.5.0.BUILD-20160209.172643-328.jar:na]
    at reactor.core.publisher.FluxPublishOn$SourceSubscribeTask.run(FluxPublishOn.java:318) ~[reactor-core-2.5.0.BUILD-20160209.140333-334.jar:na]
    at reactor.core.publisher.SchedulerGroup$TaskSubscriber.onNext(SchedulerGroup.java:1072) [reactor-core-2.5.0.BUILD-20160209.140333-334.jar:na]
    at reactor.core.publisher.SchedulerGroup$TaskSubscriber.onNext(SchedulerGroup.java:1054) [reactor-core-2.5.0.BUILD-20160209.140333-334.jar:na]
    at reactor.core.publisher.WorkQueueProcessor$QueueSubscriberLoop.run(WorkQueueProcessor.java:849) [reactor-core-2.5.0.BUILD-20160209.140333-334.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_73]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_73]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_73]

dispatchOn raises exception when prefetch=1

Problem not occurring in RxJava (see also https://gist.github.com/evacchi/ae362310dec8aacfdef4)

package rxplayground;

import reactor.core.publisher.Flux;
import reactor.core.publisher.SchedulerGroup;

import java.util.Iterator;
import java.util.function.Function;

public class ReactorPublish {
    static class Iter implements Iterator<Long> {
        long count = 0;
        @Override public boolean hasNext() { return true; }
        @Override public Long next() { return count++; }
    };
    final Flux<Long> doubleFlux = Flux.fromIterable(Iter::new).as(Shared());

    void stream() {

        doubleFlux.as(DropOld()).map(this::fastComputation).consume(i -> System.out.println("1) " + i));
        doubleFlux.as(DropOld()).map(this::slowComputation).consume(i -> System.err.println("2) " + i));
    }

    public static <T> Function<Flux<T>, Flux<T>> DropOld() {
        return (flux) -> flux.onBackpressureLatest().dispatchOn(SchedulerGroup.single(), 1);
    }

    public static <T> Function<Flux<T>, Flux<T>> Shared() {
        return (flux) -> flux.publishOn(SchedulerGroup.single()).publish().refCount();
    }


    <T> T fastComputation(T in) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return in;
    }

    <T> T slowComputation(T in) {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return in;
    }

    public static void main(String[] args) throws InterruptedException {
        new ReactorPublish().stream();
        Thread.currentThread().join();
    }
}

2016-03-16 12:01:30 ERROR r.c.p.SchedulerGroup:477 - Unrouted exception
reactor.core.util.Exceptions$UpstreamException: java.lang.IllegalStateException: Queue is full?!
    at reactor.core.util.Exceptions.failUpstream(Exceptions.java:106)
    at reactor.core.util.Exceptions.onErrorDropped(Exceptions.java:153)
    at reactor.core.subscriber.ConsumerSubscriber.doError(ConsumerSubscriber.java:141)
    at reactor.core.subscriber.ConsumerSubscriber.onError(ConsumerSubscriber.java:128)
    at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:137)
    at reactor.core.publisher.FluxDispatchOn$DispatchOnSubscriber.checkTerminated(FluxDispatchOn.java:493)
    at reactor.core.publisher.FluxDispatchOn$DispatchOnSubscriber.runAsync(FluxDispatchOn.java:417)
    at reactor.core.publisher.FluxDispatchOn$DispatchOnSubscriber.run(FluxDispatchOn.java:476)
    at reactor.core.publisher.SchedulerGroup$TaskSubscriber.onNext(SchedulerGroup.java:1081)
    at reactor.core.publisher.SchedulerGroup$TaskSubscriber.onNext(SchedulerGroup.java:1063)
    at reactor.core.publisher.TopicProcessor$TopicSubscriberLoop.run(TopicProcessor.java:877)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Queue is full?!
    at reactor.core.publisher.FluxDispatchOn$DispatchOnSubscriber.onNext(FluxDispatchOn.java:259)
    at reactor.core.publisher.FluxLatest$LatestSubscriber.drain(FluxLatest.java:177)
    at reactor.core.publisher.FluxLatest$LatestSubscriber.request(FluxLatest.java:97)
    at reactor.core.publisher.FluxDispatchOn$DispatchOnSubscriber.runAsync(FluxDispatchOn.java:432)
    ... 7 more

Need an error handler method on Flux when processing a stream of elements

Hi, perhaps this is already available in the Flux API, but I couldn't find it and I think it could be interesting. I'd like to have a way to handle discrete errors while processing a stream of elements.

For instance, see this code:

Flux<StockQuotation> processingFlux = Flux.fromIterable(tickers)
// Get each set of quotes in a separate thread
.flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s)))
// Convert each list of raw quotes string in a new Flux<String>
.flatMap(list -> Flux.fromIterable(list))
// Convert the string to POJOs
.flatMap(x -> {
        try {
            return Flux.just(converter.convertHistoricalCSVToStockQuotation(x));    
        }
        catch (IllegalArgumentException ex){
            System.out.println("Error decoding stock quotation: " + x);
            return Flux.empty();
        }
});

I need to write this bulky try {} catch{} block to handle single errors and continue processing the rest of the flux. The methods I have seen so far allow me to return a specific value or to restart processing the flux from the beginning.

See this Stackoverflow question:
http://stackoverflow.com/questions/36237230/how-to-handle-error-while-executing-flux-map

As "akarnokd" comments, errors are terminal signals but perhaps this new method would help people developing with Reactor.

Thanks!
Victor

Rename TestSubscriber#awaitAndAssertValues(With)

After more thought and feedback following my hands-on session, I think we should rename TestSubscriber#awaitAndAssertValues to something like TestSubscriber#awaitAndAssertNextValues or TestSubscriber#awaitAndAssertNext, because the current name is misleading. The same goes for TestSubscriber#awaitAndAssertValuesWith.

Flux & Mono Retry(Predicate)

Flux#retry(Predicate) and Mono#retry(Predicate) are not operating as expected. Please refer to #retryWhen operator in the meantime.

Consider addition of Flux.onErrorDrop() operator

Sometimes when an error occurs you simply want to drop the message being processed, rather than terminate or fall back to a fixed value and continue, especially if you are using another mechanism to nack the message. See [https://github.com//issues/73]

Doing this via simple out-of-the-box operators is not possible today from what I can see, yet if an onErrorDrop() operator existed this would be trivial.

In the following example, the Flux.error() would be terminal if it weren't for onErrorDrop(); with onErrorDrop(), a single failing item in the sequence will not terminate the stream.

Flux.range(0,10).map(i -> {
    try {
         return process(i);
    } catch (Throwable ex) {
        return Flux.error(ex);
    }
}).onErrorDrop().consume(System.out::println);
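
The intended drop-and-continue behavior can be sketched on a plain list, without any Reactor types. ErrorDropping and mapDroppingErrors are hypothetical names for this illustration, and limiting the catch to RuntimeException is an assumption made to keep the example small.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical helper: a failing element is reported and skipped,
// and processing continues with the next element.
final class ErrorDropping {
    private ErrorDropping() {
    }

    static <T, R> List<R> mapDroppingErrors(List<T> source,
                                            Function<? super T, ? extends R> mapper,
                                            Consumer<? super Throwable> onDropped) {
        List<R> results = new ArrayList<>();
        for (T item : source) {
            try {
                results.add(mapper.apply(item));
            } catch (RuntimeException e) {
                onDropped.accept(e); // e.g. log or nack, then keep going
            }
        }
        return results;
    }
}
```

The onDropped callback is where a separate nack mechanism, as mentioned above, could be hooked in.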

throw Exceptions.fail(e) instead of Exceptions.fail(e)

Exceptions.fail(e), available in Reactor 2.5, is not very easy to use because the compiler doesn't know it throws an exception.

For example, this code does not compile because the compiler reports a missing return statement:

Flux<User> capitalizeFlux(Flux<User> flux) {
        return flux.map(user -> {
            try {
                return capitalizeUserWithUncheckedException(user);
            }
            catch(Exception e) {
                Exceptions.fail(e);
            }
        });
    }

It would be better to make Exceptions.fail(e) return the wrapped exception instead of throwing it, which would allow writing:

Flux<User> capitalizeFlux(Flux<User> flux) {
        return flux.map(user -> {
            try {
                return capitalizeUserWithUncheckedException(user);
            }
            catch(Exception e) {
                throw Exceptions.fail(e);
            }
        });
    }
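
The compiler behavior behind this request can be reproduced with the standard library alone. In the sketch below, Failures.propagate is a hypothetical stand-in for the proposed Exceptions.fail(e): it returns the wrapped exception, so the call site can use a throw statement that the compiler recognizes as ending the branch.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in for the proposed API, not the actual Reactor class.
final class Failures {
    private Failures() {
    }

    // Returns (rather than throws) the exception so callers can write `throw propagate(e)`.
    static RuntimeException propagate(Throwable t) {
        if (t instanceof RuntimeException) {
            return (RuntimeException) t;
        }
        return new RuntimeException(t);
    }

    static String readOrFail(Callable<String> source) {
        try {
            return source.call();
        } catch (Exception e) {
            // The explicit throw tells the compiler this branch never completes normally,
            // so no dummy return statement is needed after it.
            throw propagate(e);
        }
    }
}
```

If propagate threw internally and were called without the throw keyword, readOrFail would fail to compile with a missing return statement, which is the problem described above.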

Round of Operator promotion from Stream to Flux/Mono

Initial feedback and new capabilities in Stream are encouraging some selected promotions/additions from the Stream API to Mono/Flux.
Likely candidates for a vote:

  • Mono#justOrEmpty
  • Mono#concatWith
  • Mono#timeout
  • Flux#timeout
  • Mono#cache
  • Flux#cache
  • Flux#single
  • Flux#take
  • Flux#filter
  • Mono#repeat
  • Mono#repeatWhen
  • Flux#repeat
  • Mono#afterComplete(Supplier)

Provide a Method for Thread Affinity Within a Subscription

Occasionally there will be work that is aggressively single threaded and portions of the consumer chain will need to happen on the same thread. Take for example:

Flux.from(request)
    .dispatchOn(sqliteExecutor)
    .map(sqliteWork)
    .dispatchOn(serviceSubmissionExecutor)
    .batch(500)
    .flatMap(submitToService)
    .dispatchOn(sqliteExecutor)
    .doOnNext(recordSubmissions)
    .doOnTerminate(closeDatabase)
    .subscribe()

It would be nice to have a SchedulerGroup or some other standard option that assured that work would be done on the same thread whenever a dispatchOn was encountered for a given subscription.
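
For comparison, the same affinity requirement can be expressed with plain java.util.concurrent by pinning the single-threaded stages to one single-thread executor. This is a sketch under stated assumptions: AffinityDemo and the thread name "sqlite" are made up for the example, and CompletableFuture stands in for the Flux pipeline.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo: two stages must share a thread, with unrelated work in between.
final class AffinityDemo {
    private AffinityDemo() {
    }

    // Returns the names of the threads that ran the first and the last stage.
    static String[] run() {
        // A single-thread executor guarantees every task it runs uses its one worker thread.
        ExecutorService pinned = Executors.newSingleThreadExecutor(r -> new Thread(r, "sqlite"));
        ExecutorService other = Executors.newCachedThreadPool();
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), pinned)
                    .thenApplyAsync(first -> first, other) // stage on another pool
                    .thenApplyAsync(
                            first -> new String[] {first, Thread.currentThread().getName()},
                            pinned) // back on the pinned thread
                    .join();
        } finally {
            pinned.shutdown();
            other.shutdown();
        }
    }
}
```

Both names returned by run() refer to the pinned worker, which is the guarantee the issue asks a scheduler to provide per subscription.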

isTerminated and isStarted always return false

Hi all,

Calling isTerminated() or isStarted() on a Mono always returns false, as shown in this test:

    @Test
    public void testIsTerminated() throws InterruptedException {
        Mono<String> result = Mono.just("helloWorld");

        result.consume(
                text -> System.out.println("next"),
                e -> System.out.println("error"),
                () -> System.out.println("complete"));

        assertEquals("helloWorld", result.get());

        Thread.sleep(100);

        assertTrue(result.isTerminated()); // <- This assertion fails
        assertTrue(result.isStarted());   // <- This assertion also fails
    }

"next" and "complete" are printed in the standard output but the assertions fail.

doOnError/onErrorReturn Improvements

  1. doOnError(Class<E> exceptionType, final Consumer<E> onError) is missing from Mono.
  2. Consider whether it makes sense to add doOnError(Predicate<Throwable> predicate, final Consumer<E> onError). This is very useful if you need to handle exceptions that don't match an exceptionType, or if some logic is involved in conditional error handling.
  3. Other error handling methods could also benefit from accepting a predicate, e.g. onErrorReturn, to allow handling of certain exceptions but not others.
  4. The only way to do the equivalent of multiple catch blocks with different types is to use mapError() and if/else statements. Consider whether there is an easier way to do this.
  5. There seems to be no way to do onErrorReturn() with access to the exception, which requires the use of onErrorResumeWith; this adds complexity/stack and, I assume, does a flatMap behind the scenes.
  6. Add onErrorResumeWith(Class exceptionType..)/onErrorReturn(Class exceptionType.) for handling only specific error types.

Can a parent interface with generic type that extends Publisher be used to ensure consistency of basic set of operators on Flux and Mono and avoid inconsistencies or missing operators?

Flux#combineLatest with 2 sources is using withLatestFrom

As the obvious title suggests it needs to be replaced before the next milestone/candidate.
The workaround in Reactor Core M2 involves using the combineLatest generic signature (Iterable or Varargs), which forces use of the good CombineLatest operator class. E.g.

Flux.combineLatest(Tuple::fn2, source1, source2); //good, note that Tuple::fn2 will create Tuple2<Object, Object>, an explicit lambda casting away the Object[] values might be necessary, which is what we do usually in the T2-T6 versions.

Flux.combineLatest(source1, source2, (a, b) -> Tuple.of(a,b)); //not good for now

/cc @evacchi

Ability to signal error upstream (nack) without cancelling/completing

Often when processing streams of messages from an external source there is a need to ack/nack the message. With a blocking API, a normal return from the execution stack implies you can ack, while a thrown exception (potentially depending on its type) typically implies you need to nack.

When using a reactive API it is much harder to determine when to ack/nack, and I don't see any way in which the framework (Reactor) helps with this, nor how I'd go about doing it myself.

Take this example:
JMS.queue("in").map(msg -> processMsg(msg)).consume(System.out::println)

If processMsg fails with an exception, processing should not continue and the message should not be printed to the console; at the same time, this message should be nacked by the JMS.queue publisher.

Discussed a couple of options with @smaldini, but none of them is ideal or even easily workable:

  1. Use a value holder for the success/error value. This approach isn't suitable because the next subscriber will be notified, and this conflicts with the requirement above.
  2. Use Exceptions.bubble(e). This may work, but has a number of significant concerns:
    i) Throwing exceptions upstream like this is really reserved for fatal exceptions only.
    ii) This approach requires the use of try/catch blocks and is not a programming model that is really compatible with everything else.
    iii) What happens when there are thread boundaries?
    iv) #74
  3. Use an ErrorHandler within the message.
    A more manual approach is to pass an ErrorHandler within the message being processed, which is invoked on error. But this has its own issues:
    i) If errors aren't thrown as exceptions, then onNext() returning cannot be used to ack, so a CompletionHandler is needed rather than an ErrorHandler.
    ii) How/when is onSuccess() invoked? We would need to add something non-transparent to the end of the stream to do this.
    iii) How/when is onError() invoked? We can't just do it at the end of the stream.

Common interface for Flux/Mono that defines common operators

This would help to ensure/enforce consistency between common operators on Flux/Mono, but before considering this need to:

  1. Determine if there is a performance impact in doing this over final methods in Flux/Mono as used currently.
  2. Decide what the best/preferred approach for treating streams (Mono/Flux) generically is. Is a common interface that extends Publisher preferred, or is the best way to treat streams generically in fact to wrap them in a Flux using Flux.from()? Flux is, after all, a less specialized version of Mono anyway, given it's a 0..n stream.

Point 1 will determine whether this issue is even considered. Point 2 will help determine whether a common interface should or shouldn't extend Publisher.

repeatWhenEmpty() versions show different behaviour

We tried switching cf-java-client from using .repeatWhenEmpty() with a maxRepeat value, to using the version that just takes a repeatFactory. This leads to a number of test failures caused by 'insufficient' mocking in the tests - a test that successfully provided two responses to a repeated request (example) before our change fails with a null pointer exception because it's being asked to supply a third response with the new approach.

For now we're using the two parameter version with MAXINT-1 as a workaround (example). Should the two versions of .repeatWhenEmpty() differ in this way?

Add `cache` operator

In spring-reactive, WebSessionManager returns a WebSession. In turn, that is used from ServerWebExchange.getSession(), such that multiple calls to getSession() should always return the same Mono, and that Mono should provide the same WebSession instance to all subscribers.

I think it is a common pattern to lazily initialize a class field such as a Mono and/or Flux where all subscribers should get the same values.
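
As a rough illustration of the requested semantics, here is a plain-JDK analogy built on CompletableFuture rather than Mono. CachedSource is a hypothetical name, not a Reactor type. The sketch assumes the underlying source should run at most once and that every caller should observe the same outcome, including a failure.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical helper; an analogy for a "cache" operator, not Reactor's API.
final class CachedSource<T> {
    private final Supplier<CompletableFuture<T>> source;
    private final AtomicReference<CompletableFuture<T>> cached = new AtomicReference<>();

    CachedSource(Supplier<CompletableFuture<T>> source) {
        this.source = source;
    }

    /** Returns the same future to every caller; the source is invoked lazily, at most once. */
    CompletableFuture<T> get() {
        CompletableFuture<T> existing = cached.get();
        if (existing != null) {
            return existing;
        }
        CompletableFuture<T> shared = new CompletableFuture<>();
        if (!cached.compareAndSet(null, shared)) {
            return cached.get();          // another caller won the race
        }
        try {
            // We won the race: run the source once and forward its outcome.
            source.get().whenComplete((value, error) -> {
                if (error != null) {
                    shared.completeExceptionally(error);
                } else {
                    shared.complete(value);
                }
            });
        } catch (RuntimeException e) {
            shared.completeExceptionally(e);  // the source failed before returning a future
        }
        return shared;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        CachedSource<String> session = new CachedSource<>(
                () -> CompletableFuture.supplyAsync(() -> "session-" + calls.incrementAndGet()));

        CompletableFuture<String> first = session.get();
        CompletableFuture<String> second = session.get();

        System.out.println(first == second);                     // true
        System.out.println(first.join().equals(second.join()));  // true
        System.out.println(calls.get());                         // 1
    }
}
```

Caching the failure as well as the value is a design choice made for brevity here; a real operator might want to retry or expire the cached result.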

Improve handling of specific Throwable types

Right now it is a bit cumbersome to handle an error of a specific type.

For example, if I need Mono.otherwise to handle only a specific error, I have to write:

.otherwise(t -> {
    if (t instanceof SpecificException) {
        return ...
    }
    return Mono.error(t);
})

It would be nice if we had something similar to Flux.doOnError(Class<E> exceptionType, Consumer<E> onError).

There are likely other places where that sort of handling would be nice.
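
The type check could be factored out along these lines. This is a plain-JDK sketch, assuming a handler shaped like the Consumer in the doOnError signature mentioned above; TypedErrorHandlers and ifInstance are hypothetical names, not existing Reactor methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper; it only illustrates the instanceof filtering.
final class TypedErrorHandlers {
    private TypedErrorHandlers() {}

    /**
     * Adapts a handler for one exception type into a handler for any Throwable.
     * Errors of other types are ignored by the returned consumer.
     */
    static <E extends Throwable> Consumer<Throwable> ifInstance(
            Class<E> type, Consumer<? super E> onError) {
        return error -> {
            if (type.isInstance(error)) {
                onError.accept(type.cast(error));
            }
        };
    }

    public static void main(String[] args) {
        List<String> handled = new ArrayList<>();
        Consumer<Throwable> onIllegalState =
                ifInstance(IllegalStateException.class, e -> handled.add(e.getMessage()));

        onIllegalState.accept(new IllegalStateException("bad state"));       // matches, handled
        onIllegalState.accept(new IllegalArgumentException("bad argument")); // other type, skipped

        System.out.println(handled); // [bad state]
    }
}
```

An operator overload taking (Class<E>, Consumer<E>) could apply the same check internally, so callers would no longer write instanceof by hand.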

Fluxes combined with Flux#combineLatest do not publish at the same pace

Example snippet:

    static class Iter implements Iterator<Long> {
        long count = 0;

        @Override
        public boolean hasNext() {
            return true;
        }

        @Override
        public Long next() {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return count++;
        }
    }


        final Flux<Long> s1 = Flux.fromIterable(Iter::new).publishOn(SchedulerGroup.single());
        final Flux<Long> s2 = Flux.fromIterable(Iter::new).publishOn(SchedulerGroup.single());


        Flux.combineLatest(
                v-> Tuple.of((long)v[0],(long)v[1]), 1,
                s1, s2 //, (l,r) -> Tuple.of(l,r)
        ).consume(t -> System.out.printf("(%d,%d)\n", t.getT1(),t.getT2()));

Using e5a23a2, I would expect to see pairs of approximately equal values on the two sides. Instead there are sometimes large gaps between the values, e.g.:


(1438,5)
(1439,5)
(1440,5)
(1441,5)
(1442,5)
(1443,5)
(1444,5)
(1445,5)
(1446,5)
(1447,5)
(1448,5)
(1449,5)
(1450,5)
(1451,5)
(1452,5)
(1453,5)
(1454,5)
(1455,5)

I'm not sure if I'm doing something wrong here.

RxJava's PublishSubject equivalent in reactor-core

As seen in a Stack Overflow question, there is apparently no equivalent Publisher in reactor-core.

So far, I haven't been able to migrate this behaviour from RxJava:

  • PublishSubject that observes a real time sensor (using subscribeOn).
  • Several Subscribers that subscribe to that PublishSubject. Since they get real time data they don't want any prefetched values.

This is one of the ways I tried (among others):

        Flux<Float> randomNumberGenerator = Flux.<Float>yield( consumer -> {
            SecureRandom sr = new SecureRandom();
            int i = 1;
            while(true){
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                Emission emission = consumer.emit(sr.nextFloat());
            }
        });
        randomNumberGenerator.log().subscribeOn(Computations.concurrent()).publishOn(Computations.concurrent()).subscribe();    

        Thread.sleep(6000);
        System.out.println("WAKE UP");
        RnApp app = new RnApp("APP");
        RnApp xxx = new RnApp("XXX");
        randomNumberGenerator.subscribe(app);
        randomNumberGenerator.subscribe(xxx);
        Thread.sleep(6000);
        System.out.println("WAKE UP 2"); 
        app.request(5);
        xxx.request(5);

In this case, the subscriber xxx never gets notified.

Map operator does not bubble up UpstreamException

It seems that FluxMap is missing Exceptions.throwIfFatal(e) in its inner subscriber catch block, which means that UpstreamExceptions aren't bubbled upstream and are instead propagated downstream.

Is there a way to refactor these inner subscribers so that each and every operator doesn't have to do these checks, as seems to be the case now?

Rename Exceptions.fail*(e) methods

Exceptions.fail(e) and Exceptions.failUpstream(e) were relevant names for the old behavior (throwing an exception) but not for the current one (returning a wrapped exception). My proposal is to use Exceptions.wrap(e) and Exceptions.wrapUpstream(e), but feel free to propose a better name.

Remove DependencyUtils#reactorVersion()

Since Reactor is now modular, not all modules will have the same version, so DependencyUtils#reactorVersion() no longer makes sense. Removing it is, from my point of view, also a best practice, since the version should not be hardcoded like that in the code (less error-prone during releases).
