Giter Site home page Giter Site logo

reactor-addons's Introduction

Reactor Project

Join the chat at https://gitter.im/reactor/reactor

Download

Starting from 3.0, Reactor is now organized into multiple projects:

A set of compatible versions for all these projects is curated under a BOM ("Bill of Materials") hosted under this very repository.

Using the BOM with Maven

In Maven, you need to import the bom first:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>2024.0.0-M1</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

Notice we use the <dependencyManagement> section and the import scope.

Next, add your dependencies to the relevant reactor projects as usual, except without a <version>:

<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

Using the BOM with Gradle

Gradle 5.0+

Use the platform keyword to import the Maven BOM within the dependencies block, then add dependencies to your project without a version number.

dependencies {
     // import BOM
     implementation platform('io.projectreactor:reactor-bom:2024.0.0-M1')

     // add dependencies without a version number
     implementation 'io.projectreactor:reactor-core'
}

Gradle 4.x and earlier

Gradle versions prior to 5.0 have no core support for Maven BOMs, but you can use Spring's gradle-dependency-management plugin.

First, apply the plugin from Gradle Plugin Portal (check and change the version if a new one has been released):

plugins {
    id "io.spring.dependency-management" version "1.0.11.RELEASE"
}

Then use it to import the BOM:

dependencyManagement {
     imports {
          mavenBom "io.projectreactor:reactor-bom:2024.0.0-M1"
     }
}

Then add a dependency to your project without a version number:

dependencies {
     compile 'io.projectreactor:reactor-core'
}

BOM Versioning Scheme

The BOM can be imported in Maven, which will provide a set of default artifact versions to use whenever the corresponding dependency is added to a pom without an explicitly provided version.

As the different artifacts versions are not necessarily aligned, the BOM represents a release train with an heterogeneous range of versions that are curated to work together. The artifact version follows the YYYY.MINOR.MICRO-QUALIFIER scheme since Europium, where:

  • YYYY is the year of the first GA release in a given release cycle (like 3.4.0 for 3.4.x)
  • .MINOR is a 0-based number incrementing with each new release cycle ** in the case of the BOM it allows discerning between release cycles in case two get first released the same year
  • .PATCH is a 0-based number incrementing with each service release
  • -QUALIFIER is a textual qualifier, which is omitted in the case of GA releases (see below)

On top of the artifact version, each release train has an associated codename, a chemical name from the Periodic Table of Elements in growing alphabetical order, for reference in discussions.

So far, the release trains code names are:

  • Aluminium for the 3.0.x generation of Reactor-Core (๐Ÿ’ก)
  • Bismuth for the 3.1.x generation (๐Ÿ’ก)
  • Californium for the 3.2.x generation (๐Ÿ’ก)
  • Dysprosium for the 3.3.x generation (๐Ÿ’ก)
  • Europium (2020.0) for the 3.4.x generation (๐Ÿ’ก)

NOTE: Up until Dysprosium, the BOM was versioned using a release train scheme with a codename followed by a qualifier, and the qualifiers were slightly different. For example: Aluminium-RELEASE (first GA release, would now be something like YYYY.0.0), Bismuth-M1, Californium-SR1 (service release would now be something like YYYY.0.1), Dysprosium-RC1, Dysprosium-BUILD-SNAPSHOT (after each patch, we'd go back to the same snapshot version. would now be something like YYYY.0.X-SNAPSHOT so we get 1 snapshot per PATCH).

Contributing, Community / Support

license

As hinted above, this repository is for hosting the BOM and for transverse issues only. Most of the time, if you're looking to open an issue or a PR, it should be done in a more specific repository corresponding to one of the actual artifacts.

All projects follow the same detailed contributing guidelines which you can find here.

This document also give some ways you can get answers to your questions.

Documentation

Detail of Projects

Reactor Core

Reactor Core

Reactive foundations for apps and frameworks and reactive extensions inspired API with Mono (1 element) and Flux (n elements) types

Reactor Netty

Reactor Netty

TCP and HTTP client and server.

Reactor Addons

Reactor Addons

Extra projects adding features to reactor:

Snapshot Artifacts

While Stable Releases are synchronized with Maven Central, fresh snapshot and milestone artifacts are provided in the repo.spring.io repositories.

To add this repo to your Maven build, add it to the <repositories> section like the following:

<repositories>
	<repository>
	    <id>spring-snapshot</id>
	    <name>Spring Snapshot Repository</name>
	    <url>https://repo.spring.io/snapshot</url>
	    <snapshots>
	        <enabled>true</enabled>
	    </snapshots>
	</repository>
</repositories>

To add it to your Gradle build, use the repositories configuration like this:

repositories {
	maven { url 'https://repo.spring.io/libs-snapshot' }
	mavenCentral()
}

You should then be able to import a -SNAPSHOT version of the BOM, like 2020.0.{NUMBER}-SNAPSHOT for the snapshot of the {NUMBER}th service release of 2020.0 (Europium).

Sponsored by VMware

reactor-addons's People

Contributors

akarnokd avatar aneveu avatar bclozel avatar berry120 avatar bsideup avatar chemicl avatar ctlove0523 avatar dannyjiang001 avatar dependabot[bot] avatar ericbottard avatar greg65236592 avatar gregturn avatar guillaumelamirand avatar kiwisincebirth avatar l2dy avatar lebannen avatar lucasdeabreu avatar olegdokuka avatar osi avatar pderop avatar poutsma avatar rajinisivaram avatar sdeleuze avatar simonbasle avatar smaldini avatar spencergibb avatar spring-builds avatar stylismo avatar violetagg avatar vy avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

reactor-addons's Issues

Introduce an `ValidatingPublisher`

The idea for an AssertPublisher would be to offer an alternative to using DirectProcessor in tests. It would expose:

  • methods to perform next, complete and error signals on demand
  • convenience methods, eg. to perform n onNext and terminate with onComplete in one call
  • assertion methods, eg. to check the requested amount

Optionally, it could authorize some "misbehaviors" wrt the RS spec. For instance, contrary to DirectProcessor it could authorize to perform next(T) despite the requested amount being 0.

Add latency expectations for signals

Some expectations could have a delay factor e.g.

ScriptedSubscriber.create()
                  .expectValues(Duration.ofSeconds(1), 1, 2, 3)
                  .expectComplete(Duration.ofSeconds(1))
                  .verify(flux);

How to test publisher derived from processors + buffer with StepVerifier?

Hello,

Recently, I start to use StepVerifier to test Flux and Monos in order to have less manual "waiting" during tests (also to speed up the tests). However, I am confused about how to make it work with processors + buffer. I always have the similar issue with most of the processors:

java.util.concurrent.RejectedExecutionException: Scheduler unavailable

@Test
public void testWithoutBuffer_whichPasses() {
    final UnicastProcessor<String> processor = UnicastProcessor.create();

    final Publisher<Integer> publisher = processor.map(String::length);

    processor.onNext("foo");
    processor.onNext("foobar");

    StepVerifier.create(publisher)
        .expectSubscription()
        .expectNext(3)
        .expectNext(6)
        .thenCancel()
        .log().verify();
}

@Test
public void testWithBuffer_whichDoesNotPass() {
    final UnicastProcessor<String> processor = UnicastProcessor.create();

    final Publisher<Integer> publisher = processor
        .bufferTimeout(5, Duration.ofSeconds(1))
        .map(words -> words.stream().map(String::length).collect(toList()))
        .flatMapIterable(Function.identity());

    processor.onNext("foo");
    processor.onNext("foobar");

    StepVerifier.withVirtualTime(() -> publisher)
        .expectSubscription()
        .thenAwait(Duration.ofSeconds(2))
        .expectNext(3)
        .expectNext(6)
        .thenCancel()
        .log().verify();
}

The first test pass, however, the second one with buffer doesn't.. I am wondering what's the correct usage of the StepVerifier withVirtualTime?

Full example here:
https://github.com/chengchen/rx-smoker/blob/master/src/test/java/com/edgelab/marketdata/consumer/UnicastProcessorTest.java

Thank you very much for the help!

ListenableFuture to Mono/Flux adapter

When trying to use reactor in our Spring based project, I had to transform from ListenableFuture to CompletableFuture to leverage async part of Reactor. It would be handy to have some sort of adapter. Any plans for this?

Tried to put together an example.

Request tracking is not complete

In StepVerifier, if a scenario wraps RequestEvent in SubscriptionTaskEvent, these request events are not taken into account for requested tracking. This results in errors about request overflow being thrown.
Eg. this test fails:

@Test
@SuppressWarnings("unchecked")
public void requestBufferDoesntOverflow() {
	LongAdder requestCallCount = new LongAdder();
	LongAdder totalRequest = new LongAdder();
	Flux<Integer> source = Flux.range(1, 10).hide()
	                           .doOnRequest(r -> requestCallCount.increment())
	                           .doOnRequest(totalRequest::add);

	StepVerifier.withVirtualTime(//start with a request for 1 buffer
			() -> source.bufferUntil(i -> i % 3 == 0), 1)
	            .expectSubscription()
	            .expectNext(Arrays.asList(1, 2, 3))
	            .expectNoEvent(Duration.ofSeconds(1))
	            .thenRequest(2)
	            .expectNext(Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9))
	            .expectNoEvent(Duration.ofSeconds(1))
	            .thenRequest(3)
	            .expectNext(Collections.singletonList(10))
	            .expectComplete()
	            .verify();

	assertThat(requestCallCount.intValue()).isEqualTo(11); //10 elements then the completion
	assertThat(totalRequest.longValue()).isEqualTo(11L); //ignores the main requests
}

(this test could serve as issue validation in StepVerifierTests)

Add ScriptedSubscriber#then

Consider adding arbitrary scripted tasks for use cases like :

DirectProcessor<Void> p = DirectProcessor.create();
ScriptedSubscriber.create(0)
          .doRequest(3)
          .expectValues(1, 2, 3)
          .then(p::onComplete)
          .expectComplete()
          .verify(flux.takeUntil(p))

prevent expectNextCount/expectNextSequence to consider values asserted by other expectXXX

right now, the following test succeeds:

@Test
public void expectNextCount2elements() {
	Flux<String> flux = Flux.just("foo", "bar");
	StepVerifier.create(flux)
	            .expectNext("foo", "bar")
	            .expectNextCount(2)
	            .verifyComplete();
}

It means that the expectNextCount counts the "foo" and "bar" values asserted by expectNext.

But the following test fails:

@Test
public void expectNextCount4elements() {
	Flux<String> flux = Flux.just("foo", "bar", "foz", "baz");
	StepVerifier.create(flux)
	            .expectNext("foo", "bar")
	            .expectNextCount(2)
	            .verifyComplete();
}

My gut feeling is that the inverse should be true: expectNextCount could then be used in conjunction with expectNext (or other onNext expectations) in order to "skip" over an amount of onNext notifications, eg. asserting the first 5 elements and last element in a sequence of 20 elements in details, but only care about the number of elements in the middle:

@Test
public void expectNextCountRange() {
    Flux<Integer> flux = Flux.range(1, 20);
    StepVerifier.create(flux)
                .expectNext(1, 2, 3, 4, 5) //<- assert first five
                .expectNextCount(14) //<- only care that elements 6-19 are there
                .expectNext(20) //<- assert last element
                .verifyComplete();
}

StepVerifier should explicitly reset VirtualTimeScheduler

When using multiple tests with VirtualTimeScheduler and StepVerifier, a problem with the way the resources are cleaned up has been discovered.

The first test would instantiate a dedicated VirtualTimeScheduler and immediately dispose() it.

The second test would not instantiate any specific vts, but would use withVirtualTime.

Despite that, the second test would see REJECTED executions, coming from the same vts as the previous test.

This is due to the fact that StepVerifier enables the first vts, but never cleans it up. It relies on the shutdown method doing it, but in the first test this is prevented due to the vts being shutdown a first time before the StepVerifier enables it:

  1. vts is created
  2. vts is shut down, setting an internal flag
  3. StepVerifier enables the vts (despite it being shut down), the factory using it is created and the CURRENT reference is set
  4. StepVerifier relies on vts.shutdown to clean up the CURRENT reference, but that path is shortcircuited by (2)
  5. Second test calls VirtualTimeScheduler, which in turn calls VirtualTimeScheduler.enable, which returns the CURRENT.
  6. The scheduler used is already shut down and keeps on REJECTing tasks...

Deprecate Pipes?

I'm afraid I can't maintain pipes anymore. Do you think it's worth to deprecate and remove? It never really took off, either, so it won't be such big of a loss...

AsynApender stop appending Logs when RingBuffer is full

Hi,

If the subscriber is unable to write logs because of any reason the logger doAppend stuck in an infinite waiting loop because the writer cannot surpass the reader. Below is the is the thread dump captured in that moment.

java.lang.Thread.State: WAITING
	  at sun.misc.Unsafe.park(Unsafe.java:-1)
	  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:338)
	  at reactor.core.publisher.MultiProducerRingBuffer.next(RingBuffer.java:1850)
	  at reactor.core.publisher.MultiProducerRingBuffer.next(RingBuffer.java:1816)
	  at reactor.core.publisher.UnsafeRingBuffer.next(RingBuffer.java:1576)
	  at reactor.core.publisher.EventLoopProcessor.onNext(EventLoopProcessor.java:445)
	  at reactor.logback.AsyncAppender.queueLoggingEvent(AsyncAppender.java:238)
	  at reactor.logback.AsyncAppender.doAppend(AsyncAppender.java:104)
	  at reactor.logback.AsyncAppender.doAppend(AsyncAppender.java:46)
	  at ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:48)
	  at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:273)
	  at ch.qos.logback.classic.Logger.callAppenders(Logger.java:260)
	  at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:442)
	  at ch.qos.logback.classic.Logger.filterAndLog_0_Or3Plus(Logger.java:396)
	  at ch.qos.logback.classic.Logger.debug(Logger.java:503)
	  at com.non.reactive.bank.Application.main(Application.java:18)

Also, below the code snippet where it stuck, taken from reactor RingBuffer:

@Override
	public long next(int n)
	{
		if (n < 1)
		{
			throw new IllegalArgumentException("n must be > 0");
		}

		long current;
		long next;

		do
		{
			current = cursor.getAsLong();
			next = current + n;

			long wrapPoint = next - bufferSize;
			long cachedGatingSequence = gatingSequenceCache.getAsLong();

			if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
			{
				long gatingSequence = RingBuffer.getMinimumSequence(gatingSequences, current);

				if (wrapPoint > gatingSequence)
				{
					if(spinObserver != null) {
						spinObserver.run();
					}
					LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
					continue;
				}

				gatingSequenceCache.set(gatingSequence);
			}
			else if (cursor.compareAndSet(current, next))
			{
				break;
			}
		}
		while (true);

		return next;
	}

I can see that that the wait strategy is not being used to handle this situation. Is there any way to get out of this situation either by throwing any timeout exception or anything else rational. I know we may lose logs for some time but at least the requesting threads will not hang because of this issue.

StepVerifier#withVirtualTime(Supplier) stucks

I started to learn reactor with this.
Here I stuck with StepVerifier tests. So in this example I modified the original test a bit just to make it quicker.

@Test
public void countWithVirtualTime() {
    expect3600Elements(() -> Flux.interval(Duration.ofMills(1)).take(5));
}

void expect3600Elements(Supplier<Flux<Long>> supplier) {
    StepVerifier.withVirtualTime(supplier)
                .expectNextCount(5)
                .expectComplete()
                .verify();
}

Maybe I'm doing something wrong here but it goes to infinite loop in DefaultVerifySubscriber#pollTaskEventOrComplete.

The analogous test just works fine:

@Test
public void count() {
    stepVerifierTesting.expect10Elements(Flux.interval(Duration.ofSeconds(1)).take(10));
}

void expect10Elements(Flux<Long> flux) {
    StepVerifier.create(flux)
                .expectNextCount(10)
                .expectComplete()
                .verify();
}

Maybe I misunderstood StepVerifier#withVirtualTime but it looks the issue for me.

Add StepVerifier expectations for Hooks

This could set up the relevant hooks at the beginning of verify, then correctly always reset them at the end.
MVP probably expectErrorDropped(Throwable...) and expectValueDropped(Object...), have the hook capture to a collection (as several drops can happen in a row, at least for values).

Provide more relevant stacktrace in StepVerifier

The stacktrace printed by default by DefaultStepVerifierBuilder#fail() seems not very relevant nor useful.

In my use case my test is (Kotlin code):

kotlin
Hooks.onOperator<Any> { h -> h.operatorStacktrace() }
StepVerifier.create(response.flatMap{ r -> r.bodyToFlux(User::class)})
                .consumeNextWith {
                    assert(it == User(1L, "Robert")) }
                .consumeNextWith { assert(it == User(2L, "Raide"))  }
                .consumeNextWith { assert(it == User(3L, "Ford")) }
                .expectComplete()
                .verify()

I get that output:

java.lang.AssertionError: expectation "consumeNextWith" failed (expected: onNext(); actual: onError(java.lang.ArrayIndexOutOfBoundsException: -1))

	at reactor.test.DefaultStepVerifierBuilder.failPrefix(DefaultStepVerifierBuilder.java:1653)
	at reactor.test.DefaultStepVerifierBuilder.fail(DefaultStepVerifierBuilder.java:1649)
	at reactor.test.DefaultStepVerifierBuilder.lambda$consumeNextWith$1(DefaultStepVerifierBuilder.java:139)
	at reactor.test.DefaultStepVerifierBuilder$SignalEvent.test(DefaultStepVerifierBuilder.java:1409)
	at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onSignal(DefaultStepVerifierBuilder.java:1050)
	at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onExpectation(DefaultStepVerifierBuilder.java:996)
	at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onError(DefaultStepVerifierBuilder.java:740)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:322)
	at reactor.core.publisher.MonoFlatMap$FlattenSubscriber$InnerSubscriber.onError(MonoFlatMap.java:196)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:322)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:140)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:322)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:794)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:558)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:538)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onError(FluxFlatMap.java:517)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:356)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:316)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:173)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:316)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:316)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
	at reactor.ipc.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:185)
	at reactor.ipc.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:291)
	at reactor.ipc.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:284)
	at reactor.ipc.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:493)
	at reactor.ipc.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:112)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:250)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:651)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:574)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:488)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:450)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
	at java.lang.Thread.run(Thread.java:745)

And to have the relevant stacktrace that show me the origin of the error, I have to call signal.getThrowable().printStackTrace() from consumeNextWith():

java.lang.ArrayIndexOutOfBoundsException: -1
	at io.netty.buffer.HeapByteBufUtil.getByte(HeapByteBufUtil.java:24)
	at io.netty.buffer.UnpooledHeapByteBuf._getByte(UnpooledHeapByteBuf.java:323)
	at io.netty.buffer.UnpooledHeapByteBuf.getByte(UnpooledHeapByteBuf.java:318)
	at org.springframework.http.codec.json.JsonObjectDecoder$1.apply(JsonObjectDecoder.java:140)
	at org.springframework.http.codec.json.JsonObjectDecoder$1.apply(JsonObjectDecoder.java:103)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:353)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:316)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:173)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:316)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:316)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
	at reactor.ipc.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:185)
	at reactor.ipc.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:291)
	at reactor.ipc.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:284)
	at reactor.ipc.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:493)
	at reactor.ipc.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:112)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:250)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:651)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:574)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:488)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:450)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
	at java.lang.Thread.run(Thread.java:745)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxFlatMap] :
	reactor.core.publisher.Flux.flatMap(Flux.java:3401)
	reactor.core.publisher.Flux.flatMap(Flux.java:3378)
	reactor.core.publisher.Flux.flatMap(Flux.java:3335)
	org.springframework.http.codec.json.JsonObjectDecoder.decode(JsonObjectDecoder.java:103)
	org.springframework.http.codec.json.Jackson2JsonDecoder.decodeInternal(Jackson2JsonDecoder.java:114)
	org.springframework.http.codec.json.Jackson2JsonDecoder.decode(Jackson2JsonDecoder.java:82)
	org.springframework.http.codec.DecoderHttpMessageReader.read(DecoderHttpMessageReader.java:75)
	org.springframework.http.codec.BodyExtractors.lambda$null$2(BodyExtractors.java:91)
	java.util.Optional.map(Optional.java:215)
	org.springframework.http.codec.BodyExtractors.readWithMessageReaders(BodyExtractors.java:120)
	org.springframework.http.codec.BodyExtractors.lambda$toFlux$3(BodyExtractors.java:89)
	org.springframework.web.client.reactive.DefaultClientResponse.body(DefaultClientResponse.java:72)
	org.springframework.web.client.reactive.DefaultClientResponse.bodyToPublisher(DefaultClientResponse.java:102)
	org.springframework.web.client.reactive.DefaultClientResponse.bodyToFlux(DefaultClientResponse.java:87)
	mixit.support.ExtensionsKt.bodyToFlux(Extensions.kt:53)
	mixit.integration.UserIntegrationTests$findAll$1.apply(UserIntegrationTests.kt:31)
	mixit.integration.UserIntegrationTests$findAll$1.apply(UserIntegrationTests.kt:16)
	reactor.core.publisher.MonoFlatMap$FlattenSubscriber.onNext(MonoFlatMap.java:121)
	reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:124)
	reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:173)
	reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:124)
	reactor.core.publisher.FluxResume$ResumeSubscriber.onNext(FluxResume.java:71)
	reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:124)
	reactor.core.publisher.FluxRetryPredicate$RetryPredicateSubscriber.onNext(FluxRetryPredicate.java:78)
	reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:95)
	reactor.ipc.netty.channel.PooledClientContextHandler.fireContextActive(PooledClientContextHandler.java:73)
	reactor.ipc.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:474)
	reactor.ipc.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:112)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
	io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
	io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:250)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
	io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
	io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
	io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:651)
	io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:574)
	io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:488)
	io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:450)
	io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
Composition chain until failing Operator :
	|_	Flux.flatMap(JsonObjectDecoder.java:103)
	|_	Flux.map(Jackson2JsonDecoder.java:115)

It should be nice to have that information by default in the console after a failed test.

Provide a convenience method for doing both a final `expectXXX` and a `verify`

Having the StepVerifier layer, as currently returned by methods from FinalStep (expectComplete, expectError, thenCancel), is still necessary:

  • there is an overload with a timeout Duration
  • there is the possibility to trigger log()ging
  • in the future it could be possible to get a Subscriber rather than triggering subscription and verification...

That said, concern has been raised that the verify() step is easy to forget. Maybe adding a convenience sibling method to all these FinalStep methods that also triggers a verification would help. It would only cover the case where no log() is triggered and no timeout is used.

For example, verifyComplete() would call .expectComplete().verify()...

Non relevant error message with consumeNextWith()

With the following test:

StepVerifier.create(response.flatMap{ r -> r.bodyToFlux(Event::class)})
                    .consumeNextWith { assertEquals(2012, it.year) }
                    .consumeNextWith { assertEquals(2013, it.year) }
                    .consumeNextWith { assertEquals(2014, it.year) }
                    .consumeNextWith { assertEquals(2015, it.year) }
                    .consumeNextWith { assertEquals(2016, it.year) }
                    .consumeNextWith { assertEquals(2017, it.year) }
                    .expectComplete()
                    .verify()

I get the following not very useful error message:

Expectation failed (failed running expectation on signal [onNext(FluxError)] with [java.lang.ClassCastException]:
reactor.core.publisher.FluxError cannot be cast to mixit.model.sponsor.Event)
java.lang.AssertionError: expectation failed (failed running expectation on signal [onNext(FluxError)] with [java.lang.ClassCastException]:
reactor.core.publisher.FluxError cannot be cast to mixit.model.sponsor.Event)
	at reactor.test.DefaultStepVerifierBuilder.failPrefix(DefaultStepVerifierBuilder.java:1679)
	at reactor.test.DefaultStepVerifierBuilder.fail(DefaultStepVerifierBuilder.java:1675)
	at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onExpectation(DefaultStepVerifierBuilder.java:1017)
	at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onNext(DefaultStepVerifierBuilder.java:764)
	at reactor.core.publisher.MonoFlatMap$FlattenSubscriber$InnerSubscriber.onNext(MonoFlatMap.java:191)

It would expect the message + stacktrace of the error instead of this class cast error.

let Verifier work with a simple VirtualTimeScheduler instance

As of now the Verifier can be passed a Supplier<VirtualTimeScheduler> but it doesn't entirely manage it, meaning that it is still the responsibility of the developer to correctly call VirtualTimeScheduler#enable rather than the simpler factory method VirtualTimeScheduler#create.

enable could be reworked to accept an existing VirtualTimeScheduler instance though, and it could be made idempotent if called twice with the same instance.

It would allow in turn to let the default verifier implementation capture a VirtualTimeScheduler in its verify method and enable it (if the scheduler was supplied through enable already, idempotency kicks in, otherwise the scheduler is correctly enabled and used by the factories).

Rework ScriptedSubscriber virtual time API

  • Add ScriptedSubscriber#withVirtualTime instead of ScriptedSubscriber#enableVirtualTime.
  • Add ValueBuilderWithTime or similar to expose advanceTimeXxx.
  • Eventually AutoReset TestScheduler after verify.
  • Apply tweaks to TestScheduler (name, factory behavior).

reactor-logback does not stopping properly when servlet container shutdown

After upgrade reactor-logback to latest version in last week, i find a strange behaviour on reactor-logback's appender: the reactor.logback.AsyncAppender create some non-daemon thread for log consuming but didn't close it properly.
After call tomcat's shutdown script, using jstack to print thread-stacks get this:

"logger-2" #19 prio=5 os_prio=0 tid=0x00007f2602874800 nid=0x20c9 waiting on condition [0x00007f268248f000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000000827799a0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at reactor.jarjar.com.lmax.disruptor.LiteBlockingWaitStrategy.waitFor(LiteBlockingWaitStrategy.java:56)
    at reactor.jarjar.com.lmax.disruptor.ProcessingSequenceBarrier.waitFor(ProcessingSequenceBarrier.java:55)
    at reactor.core.processor.RingBufferWorkProcessor$WorkSignalProcessor.run(RingBufferWorkProcessor.java:765)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

"logger-1" #18 prio=5 os_prio=0 tid=0x00007f2602874000 nid=0x20c8 waiting on condition [0x00007f2682590000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000008277a008> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at reactor.jarjar.com.lmax.disruptor.LiteBlockingWaitStrategy.waitFor(LiteBlockingWaitStrategy.java:56)
    at reactor.jarjar.com.lmax.disruptor.ProcessingSequenceBarrier.waitFor(ProcessingSequenceBarrier.java:55)
    at reactor.core.processor.RingBufferWorkProcessor$WorkSignalProcessor.run(RingBufferWorkProcessor.java:765)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

"Attach Listener" #24910 daemon prio=9 os_prio=0 tid=0x00007f2640001000 nid=0x3975 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"DestroyJavaVM" #24909 prio=5 os_prio=0 tid=0x00007f26a4178000 nid=0x2086 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Abandoned connection cleanup thread" #20 daemon prio=5 os_prio=0 tid=0x00007f2617320800 nid=0x20ce in Object.wait() [0x00007f268175c000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
    - locked <0x0000000087abb8f8> (a java.lang.ref.ReferenceQueue$Lock)
    at com.mysql.jdbc.AbandonedConnectionCleanupThread.run(AbandonedConnectionCleanupThread.java:43)

"GC Daemon" #13 daemon prio=2 os_prio=0 tid=0x00007f26a43f5000 nid=0x209c in Object.wait() [0x00007f26901cd000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x00000000825760a0> (a sun.misc.GC$LatencyLock)
    at sun.misc.GC$Daemon.run(GC.java:117)
    - locked <0x00000000825760a0> (a sun.misc.GC$LatencyLock)

"AsyncFileHandlerWriter-1304836502" #12 daemon prio=5 os_prio=0 tid=0x00007f26a4156000 nid=0x2099 waiting on condition [0x00007f2690a45000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000008209fbf8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at java.util.concurrent.LinkedBlockingDeque.pollFirst(LinkedBlockingDeque.java:522)
    at java.util.concurrent.LinkedBlockingDeque.poll(LinkedBlockingDeque.java:684)
    at org.apache.juli.AsyncFileHandler$LoggerThread.run(AsyncFileHandler.java:145)

The version of reactor: 2.0.8.RELEASE

Consistent argument order between StepVerifier create() and withVirtualTime()

In StepVerifier#create() and StepVerifier#withVirtualTime(), the position of the long n parameter should be consistent (currently it is in last position in create(), and in first position in withVirtualTime().

I suggest to deprecate in 3.0 StepVerifier#withVirtualTime(long n, Supplier scenarioSupplier) and StepVerifier#withVirtualTime(long n, Supplier scenarioSupplier, Supplier vtsLookup) and add new StepVerifier#withVirtualTime(Supplier scenarioSupplier, long n) and StepVerifier#withVirtualTime(Supplier scenarioSupplier, long n, Supplier vtsLookup) static methods.

The deprecate methods should be removed in Reactor 3.1.

EventBus ThreadPool sometimes executes in producer thread

Hi again,

Another v1 to v2 migration issue here.

I'm using eventBus.notify(event) where my eventBus is using a ThreadPoolExecutorDispatcher (Spring Configuration at the end). I see that my events are being consumed and processed by the 5 threads in my thread pool. However, every so often the consumer is being executed in the event origin thread.

This is not the expected behavior and seems to be no to v2. What's going on here?

I think this might be related to this unresolved conversation from last year: "THREAD POOL is not working with different threads"

I can't figure out the take away from that discussion though.

To be honest we can't figure out what's going on with Reactor. This v1 -> v2 rewrite seems to have made a mess of thing. Changing the APIs without providing the documentation isn't really how things should be done. One of your main portals is that Spring promoted this project as an asynchronous event bus.. but that example code no longer works, eek!

I hate to say it but our company will surely be moving away from Reactor going forward. Still, perhaps this is a bug that can be fixed, or a "feature" that should be documented. Hopefully this issue report helps someone.

Good luck.

private static final int THREAD_POOL_SIZE = 5;
private static final int BACKLOG_SIZE = 1024;
private static final String THREAD_NAME = "my-reactor";

@Bean
public EventBus reactor(Environment env) {
    LOG.debug("Creating reactor..");
    return new EventBusSpec()
            .env(env)
                .dispatcher(reactorDispatcher())
                .consumerNotFoundHandler(notificationKey -> {
                    if(notificationKey != null) {
                        String key = notificationKey.toString();
                        if(key.startsWith(ExternalDataEvent.SELECTOR)) {
                            LOG.error("No consumer found for {}", key);
                        }
                    }
                })
                .broadcastEventRouting()
                .uncaughtErrorHandler(error -> LOG.error(
                        "An error occurred while processing event: {}", error.getMessage(), error))
                .get();
}

@Bean
public Dispatcher reactorDispatcher() {
    LOG.debug("Creating thread pool dispatcher with thread prefix '{}'", THREAD_NAME);
    return new ThreadPoolExecutorDispatcher(THREAD_POOL_SIZE, BACKLOG_SIZE, THREAD_NAME);
}

[Open Discussion] general feedback

Encouraged by https://twitter.com/projectreactor/status/788678127606173696 I'd like to start an open discussion as an outsider to the project that's interested in using the proposed API.

  1. Like @smaldini my background is rotted in the Groovy community so I was a bit confused when I sa the name for ScriptedSubscriber. I though it was related to scripting capabilities (Groovy, BSF, JSR-223, etc).

  2. Per https://github.com/reactor/reactor-addons/blob/master/reactor-test/src/test/java/reactor/test/subscriber/ScriptedSubscriberIntegrationTests.java#L134-L142 the usage of @Test(expected = AssertionError.class) is to coarse IMHO. We're expecting a test to fail, so it can fail for any reason, not necessarily because of the intended reason. The linked testcase may fail because the developer added more expectedValue() calls, changed expectedValue('foo') to expectedValue('bar'), or simply by adding assertTrue(false) at the top of the test method.

  3. I'd like to see additional expect methods that take Hamcrest Matcher as argument.

  4. Would it make sense to rename verify to verifyOn ?

  5. Would it make sense to turn around the start/termination methods (create and verify) such that the script reads as follows:

    ScriptedSubscriber.subscribeOn(flux)
    .expectValue("foo")
    .expecteComplete()
    .verify();

Or is it the intention that a "script" may be applied to multiple observable sources? If so then verify(src) makes more sense.

CircuitBreaker

Hello,

I've developed a CircuitBreaker alternative to Hystrix which is designed for functional programming.
https://github.com/RobWin/javaslang-circuitbreaker
I'm currently developing a custom RxJava operator for this library, so that I can attach a CircuitBreaker to any Single/Mono or Flowable/Flux.

CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("backendName");
Flowable.fromCallable(backendService::doSomething)
    .lift(CircuitBreakerOperator.of(circuitBreaker))

I would like to develop a custom operator for Spring Reactor, but I need your advice. I've spoke about my CircuitBreaker operator implementation with @smaldini and I think it still has some issues.

https://github.com/RobWin/javaslang-circuitbreaker/blob/master/src/main/java/io/github/robwin/circuitbreaker/operator/CircuitBreakerOperator.java

Kind regards,

Robert

What happened to reactor-alloc?

I came here looking for reactor-alloc as the main reactor project lists reactor-alloc under reactor-addons. I was going to ask about a RingBuffer based lock-free implementation of an allocator (for object pooling) but was surprised to see that reactor-alloc doesn't exist.

My interest in a lock-free allocator is related to my discussion here
https://groups.google.com/d/msg/reactor-framework/h2Dsm0USZjw/bGNotRyNCQAJ

If its not possible to avoid object creation when using reactor based messaging, then a lock free allocator would be useful to mitigate GC issues.

Allow to relax StepVerifier's check of under-requests, StepVerifierOptions

The StepVerifier is getting more and more complex, and assumes that it should perform a few sanity checks internally. In some scenarios, these assumptions don't apply and the checks should be skipped.

The first of these checks is the "under-request" one (that will prevent the verification if it thinks the request amount is too low compared to the expectations, but can sometimes be wrong about it.

A good way to do that is to introduce a StepVerifierOptions that would replace the current long initialRequest parameter, superseding it and additionally allowing to disable such checks.

AsyncAppender queueLoggingEvent method stuck when Ringbuffer is full with unread events

Hi,

If the subscriber is unable to write logs because of any reason the logger doAppend stuck in an infinite waiting loop because the writer cannot surpass the reader. Below is the is the thread dump captured in that moment.

java.lang.Thread.State: WAITING
	  at sun.misc.Unsafe.park(Unsafe.java:-1)
	  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:338)
	  at reactor.core.publisher.MultiProducerRingBuffer.next(RingBuffer.java:1850)
	  at reactor.core.publisher.MultiProducerRingBuffer.next(RingBuffer.java:1816)
	  at reactor.core.publisher.UnsafeRingBuffer.next(RingBuffer.java:1576)
	  at reactor.core.publisher.EventLoopProcessor.onNext(EventLoopProcessor.java:445)
	  at reactor.logback.AsyncAppender.queueLoggingEvent(AsyncAppender.java:238)
	  at reactor.logback.AsyncAppender.doAppend(AsyncAppender.java:104)
	  at reactor.logback.AsyncAppender.doAppend(AsyncAppender.java:46)
	  at ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:48)
	  at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:273)
	  at ch.qos.logback.classic.Logger.callAppenders(Logger.java:260)
	  at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:442)
	  at ch.qos.logback.classic.Logger.filterAndLog_0_Or3Plus(Logger.java:396)
	  at ch.qos.logback.classic.Logger.debug(Logger.java:503)
	  at com.non.reactive.bank.Application.main(Application.java:18)

Also, below is the code snippet where the requesting thread stuck. The code snippet is taken from reactor RingBuffer:

@Override
	public long next(int n)
	{
		if (n < 1)
		{
			throw new IllegalArgumentException("n must be > 0");
		}

		long current;
		long next;

		do
		{
			current = cursor.getAsLong();
			next = current + n;

			long wrapPoint = next - bufferSize;
			long cachedGatingSequence = gatingSequenceCache.getAsLong();

			if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
			{
				long gatingSequence = RingBuffer.getMinimumSequence(gatingSequences, current);

				if (wrapPoint > gatingSequence)
				{
					if(spinObserver != null) {
						spinObserver.run();
					}
					LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
					continue;
				}

				gatingSequenceCache.set(gatingSequence);
			}
			else if (cursor.compareAndSet(current, next))
			{
				break;
			}
		}
		while (true);

		return next;
	}

I can see that that the wait strategy is not being used to handle this situation. Is there any way to get out of this situation either by throwing any timeout exception or anything else rational. I know we may loose logs for some time but at least the requesting threads will not hang because of this issue.

improve detection of hung expectations

Either check the request amount >= expect amount during conflation or add a scheduled task that logs a particular expectation is hung for n seconds (cancelled in finally of onExpectations)

Example of hanging: request(2).expectNextCount(3)

Consumer Callbacks (v1 to v2 differences)

In Reactor v1.x we had a nice notify with a callback function which has been removed in v2.x. I'm sure there's a reason for this, however, as someone migrating to v2 I found myself lost.

Here's a unit test Gist to show what I tried, what works, and what doesn't:
https://gist.github.com/connollyst/da63998f65c309a442d8228ef61f7453

The gist of the gist..

// In reactor v1.x, we used to have..
reactor.notify("say.hi", Event.wrap("Hello World!"), call::back);

// In reactor v2.x it's gone, we have something similar in..
bus.sendAndReceive("say.hi", Event.wrap("Hello World!"), call::back);
// .. but this only works if my callback is set up with bus.receive, mine is bus.on

// events have a 'replyTo', lets try that..
bus.on($("callback"), call::back);
bus.notify("say.hi", Event.wrap("Hello World!", "callback"));
// .. nope, doesn't get called

Any thoughts?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.