reactor / reactor-netty

TCP/HTTP/UDP/QUIC client/server with Reactor over Netty

Home Page: https://projectreactor.io

License: Apache License 2.0

HTML 0.03% CSS 0.16% Java 99.81%
flux http ipc mono netty quic reactive reactive-streams reactor reactor3 tcp udp

reactor-netty's Introduction

Reactor Project

Join the chat at https://gitter.im/reactor/reactor

Download

Starting from 3.0, Reactor is now organized into multiple projects:

A set of compatible versions for all these projects is curated under a BOM ("Bill of Materials") hosted under this very repository.

Using the BOM with Maven

In Maven, you need to import the bom first:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>2024.0.0-M1</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

Notice we use the <dependencyManagement> section and the import scope.

Next, add your dependencies to the relevant reactor projects as usual, except without a <version>:

<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

Using the BOM with Gradle

Gradle 5.0+

Use the platform keyword to import the Maven BOM within the dependencies block, then add dependencies to your project without a version number.

dependencies {
     // import BOM
     implementation platform('io.projectreactor:reactor-bom:2024.0.0-M1')

     // add dependencies without a version number
     implementation 'io.projectreactor:reactor-core'
}

Gradle 4.x and earlier

Gradle versions prior to 5.0 have no core support for Maven BOMs, but you can use Spring's gradle-dependency-management plugin.

First, apply the plugin from Gradle Plugin Portal (check and change the version if a new one has been released):

plugins {
    id "io.spring.dependency-management" version "1.0.11.RELEASE"
}

Then use it to import the BOM:

dependencyManagement {
     imports {
          mavenBom "io.projectreactor:reactor-bom:2024.0.0-M1"
     }
}

Then add a dependency to your project without a version number:

dependencies {
     compile 'io.projectreactor:reactor-core'
}

BOM Versioning Scheme

The BOM can be imported in Maven, which will provide a set of default artifact versions to use whenever the corresponding dependency is added to a pom without an explicitly provided version.

As the versions of the different artifacts are not necessarily aligned, the BOM represents a release train with a heterogeneous range of versions that are curated to work together. The artifact version follows the YYYY.MINOR.PATCH-QUALIFIER scheme since Europium, where:

  • YYYY is the year of the first GA release in a given release cycle (like 3.4.0 for 3.4.x)
  • .MINOR is a 0-based number incrementing with each new release cycle (in the case of the BOM, it allows discerning between release cycles when two are first released in the same year)
  • .PATCH is a 0-based number incrementing with each service release
  • -QUALIFIER is a textual qualifier, which is omitted in the case of GA releases (see below)

On top of the artifact version, each release train has an associated codename, a chemical name from the Periodic Table of Elements in growing alphabetical order, for reference in discussions.

So far, the release train codenames are:

  • Aluminium for the 3.0.x generation of Reactor-Core
  • Bismuth for the 3.1.x generation
  • Californium for the 3.2.x generation
  • Dysprosium for the 3.3.x generation
  • Europium (2020.0) for the 3.4.x generation

NOTE: Up until Dysprosium, the BOM was versioned using a release train scheme with a codename followed by a qualifier, and the qualifiers were slightly different. For example: Aluminium-RELEASE (first GA release, would now be something like YYYY.0.0), Bismuth-M1, Californium-SR1 (service release, would now be something like YYYY.0.1), Dysprosium-RC1, Dysprosium-BUILD-SNAPSHOT (after each patch, we'd go back to the same snapshot version; this would now be something like YYYY.0.X-SNAPSHOT, so we get one snapshot per PATCH).
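To make the YYYY.MINOR.PATCH-QUALIFIER scheme concrete, here is a minimal sketch of a parser for such version strings. The `BomVersion` class and its regular expression are assumptions made for illustration; they are not part of any Reactor artifact, and the pattern only covers the scheme as described in this section (legacy codename versions such as Dysprosium-SR1 are rejected).

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration only; not shipped with Reactor. */
final class BomVersion {
    // YYYY.MINOR.PATCH followed by an optional -QUALIFIER (absent for GA releases)
    private static final Pattern SCHEME =
            Pattern.compile("(\\d{4})\\.(\\d+)\\.(\\d+)(?:-([A-Za-z0-9]+))?");

    final int year;
    final int minor;
    final int patch;
    final Optional<String> qualifier;

    private BomVersion(int year, int minor, int patch, Optional<String> qualifier) {
        this.year = year;
        this.minor = minor;
        this.patch = patch;
        this.qualifier = qualifier;
    }

    /** Parses a version string, rejecting anything that does not follow the scheme. */
    static BomVersion parse(String version) {
        Matcher m = SCHEME.matcher(version);
        if (!m.matches()) {
            throw new IllegalArgumentException(
                    "Not a YYYY.MINOR.PATCH[-QUALIFIER] version: " + version);
        }
        return new BomVersion(
                Integer.parseInt(m.group(1)),
                Integer.parseInt(m.group(2)),
                Integer.parseInt(m.group(3)),
                Optional.ofNullable(m.group(4)));
    }

    /** GA releases carry no qualifier. */
    boolean isGa() {
        return !qualifier.isPresent();
    }

    public static void main(String[] args) {
        BomVersion milestone = parse("2024.0.0-M1");
        System.out.println(milestone.year + " " + milestone.qualifier.orElse("GA")); // 2024 M1
        System.out.println(parse("2020.0.3").isGa());                               // true
    }
}
```

Keeping the qualifier as an `Optional` mirrors the rule that GA releases simply omit it, rather than inventing a placeholder value.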

Contributing, Community / Support

As hinted above, this repository is for hosting the BOM and for transverse issues only. Most of the time, if you're looking to open an issue or a PR, it should be done in a more specific repository corresponding to one of the actual artifacts.

All projects follow the same detailed contributing guidelines which you can find here.

This document also gives some ways you can get answers to your questions.

Documentation

Detail of Projects

Reactor Core

Reactive foundations for apps and frameworks and reactive extensions inspired API with Mono (1 element) and Flux (n elements) types

Reactor Netty

TCP and HTTP client and server.

Reactor Addons

Extra projects adding features to reactor:

Snapshot Artifacts

While Stable Releases are synchronized with Maven Central, fresh snapshot and milestone artifacts are provided in the repo.spring.io repositories.

To add this repo to your Maven build, add it to the <repositories> section like the following:

<repositories>
	<repository>
	    <id>spring-snapshot</id>
	    <name>Spring Snapshot Repository</name>
	    <url>https://repo.spring.io/snapshot</url>
	    <snapshots>
	        <enabled>true</enabled>
	    </snapshots>
	</repository>
</repositories>

To add it to your Gradle build, use the repositories configuration like this:

repositories {
	maven { url 'https://repo.spring.io/libs-snapshot' }
	mavenCentral()
}

You should then be able to import a -SNAPSHOT version of the BOM, like 2020.0.{NUMBER}-SNAPSHOT for the snapshot of the {NUMBER}th service release of 2020.0 (Europium).

Sponsored by VMware

reactor-netty's People

Contributors

akiraly, aneveu, anshlykov, bclozel, bsideup, chemicl, ctlove0523, dependabot-preview[bot], dependabot[bot], ericbottard, jameschenx, jchenga, jdaru, lhotari, liyixin95, olegdokuka, pderop, quanticc, raycoarana, roggenbrot, rstoyanchev, samueldlightfoot, simonbasle, smaldini, spring-builds, spring-operator, sullis, tamasperlaki, violetagg, yuzawa-san

reactor-netty's Issues

ByteBufFlux skipping bytes when reading from file channel

In ByteBufFlux.java, line 144, the code erroneously updates the file channel read position after reading a chunk. The position is already updated during the read (line 140), so there is no need to update it again.
As a result, the current implementation skips bytes.

You can have a look at my fork here, specifically ByteBufFluxTest.java, which reproduces the bug, and ByteBufFlux.java, where I added a comment above the buggy line.

I can fix the bug and create a pull request if you wish.
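The effect described in this report can be reproduced with plain java.nio, independently of ByteBufFlux. The sketch below is a stand-in under stated assumptions: the class and method names are hypothetical, and it does not reproduce the actual ByteBufFlux code. It only shows that `FileChannel.read(ByteBuffer)` already advances the channel position, so advancing it again by the number of bytes read skips every other chunk.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical demo class; it only illustrates the position double-advance. */
final class FileChannelPositionDemo {

    /** Reads the file chunk by chunk; doubleAdvance=true mimics the reported bug. */
    static String readAll(Path file, int chunkSize, boolean doubleAdvance) throws IOException {
        StringBuilder out = new StringBuilder();
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate(chunkSize);
            int read;
            // read() moves the channel position forward by the number of bytes read
            while ((read = channel.read(buffer)) > 0) {
                buffer.flip();
                out.append(StandardCharsets.US_ASCII.decode(buffer));
                buffer.clear();
                if (doubleAdvance) {
                    // Redundant update: advances the position a second time
                    channel.position(channel.position() + read);
                }
            }
        }
        return out.toString();
    }

    /** Returns {correct, buggy} readings of a 16-byte temp file read in 4-byte chunks. */
    static String[] demo() {
        try {
            Path file = Files.createTempFile("position-demo", ".txt");
            try {
                Files.write(file, "0123456789ABCDEF".getBytes(StandardCharsets.US_ASCII));
                return new String[] {readAll(file, 4, false), readAll(file, 4, true)};
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String[] result = demo();
        System.out.println(result[0]); // 0123456789ABCDEF
        System.out.println(result[1]); // 012389AB
    }
}
```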

Have `addEncoder`/`addDecoder` skip if the handler already exists, like legacy `addHandler`

addHandler used to check whether a handler with the provided name already existed, and skipped the addition of the new handler if that was the case.
Since the move to addEncoder/addDecoder, this is no longer the case.

This should be fixed, for minimal consistency with the previous API (even though the methods don't add at the same position as before).

Note that this is the inverse of #22 which would introduce a way of replacing instead of skipping.

Rework NettyContext user pipeline API

The current API has confusing semantics, misleading javadocs and a buggy implementation.

The ultimate goal of the addHandler/addDecoder methods is to let the users simply add handlers in the "user part" of the ChannelPipeline, which could be represented as the middle part of the pipeline:

 [ [reactor codecs], [<- user ENCODERS added here, user DECODERS added here ->], [reactor handlers] ]

The addHandler method should be removed, and an addEncoder method added.
Detection of where to place handlers can be achieved more easily by prefixing the NettyPipeline names with reactor.left or reactor.right:

  • addEncoder will put its handler just after the last reactor.left. handler in the pipeline
  • addDecoder will put its handler just before the first reactor.right. handler in the pipeline
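The placement rules above can be sketched with a toy model that treats the pipeline as an ordered list of handler names. This is not the Netty ChannelPipeline API: `UserPipelineModel`, its methods, and the example handler names are all hypothetical, and only the `reactor.left.` / `reactor.right.` prefixes come from the proposal.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the proposal; handler names stand in for real Netty handlers. */
final class UserPipelineModel {
    static final String LEFT = "reactor.left.";
    static final String RIGHT = "reactor.right.";

    private final List<String> names = new ArrayList<>();

    UserPipelineModel(List<String> initial) {
        names.addAll(initial);
    }

    /** Inserts just after the last reactor.left.* handler (at the head if there is none). */
    void addEncoder(String name) {
        int index = 0;
        for (int i = 0; i < names.size(); i++) {
            if (names.get(i).startsWith(LEFT)) {
                index = i + 1;
            }
        }
        names.add(index, name);
    }

    /** Inserts just before the first reactor.right.* handler (at the tail if there is none). */
    void addDecoder(String name) {
        int index = names.size();
        for (int i = 0; i < names.size(); i++) {
            if (names.get(i).startsWith(RIGHT)) {
                index = i;
                break;
            }
        }
        names.add(index, name);
    }

    /** Returns a copy of the current handler order. */
    List<String> names() {
        return new ArrayList<>(names);
    }

    public static void main(String[] args) {
        UserPipelineModel pipeline = new UserPipelineModel(
                Arrays.asList("reactor.left.codec", "reactor.right.bridge"));
        pipeline.addEncoder("enc1");
        pipeline.addEncoder("enc2");
        pipeline.addDecoder("dec1");
        pipeline.addDecoder("dec2");
        // [reactor.left.codec, enc2, enc1, dec1, dec2, reactor.right.bridge]
        System.out.println(pipeline.names());
    }
}
```

In this model, encoders accumulate towards the left edge of the user section and decoders towards the right edge, which matches the arrows in the diagram.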

Question: Half Closed Connections

I'm wondering if there is a way in reactor-netty to shut down the output (from the client to the server) once the data has been written to the channel. Shutting down the output means calling SocketChannel#shutdownOutput().

Essentially, I have a Publisher that produces the data to send. Shutting down the output when the Publisher completes might be too early because there might be pending writes. Is there a way to intercept when all the writes have been completed?
My TCP server (over which I have no control) waits for the shutdownOutput() signal before it starts sending the response.
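For readers unfamiliar with half-closed connections, the behaviour the server expects can be shown with blocking java.net sockets. This sketch does not use reactor-netty and does not answer the question of how to detect completed writes there; the `HalfCloseDemo` class and the "echo:" reply format are assumptions made purely for illustration. With blocking sockets, `write` returns before `shutdownOutput()` is called, so the pending-writes concern does not arise in this simplified setting.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Hypothetical demo of a client half-close over loopback. */
final class HalfCloseDemo {

    /** Reads until end-of-stream, i.e. until the peer has shut down its output. */
    static byte[] readToEnd(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[1024];
        int n;
        while ((n = in.read(chunk)) != -1) {
            out.write(chunk, 0, n);
        }
        return out.toByteArray();
    }

    /** Sends the request, half-closes, then reads the server's reply. */
    static String roundTrip(String request) {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Thread serverThread = new Thread(() -> {
                try (Socket peer = server.accept()) {
                    // The server answers only after it has seen the client's EOF
                    byte[] received = readToEnd(peer.getInputStream());
                    String reply = "echo:" + new String(received, StandardCharsets.UTF_8);
                    peer.getOutputStream().write(reply.getBytes(StandardCharsets.UTF_8));
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            serverThread.start();

            try (Socket client = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort())) {
                client.getOutputStream().write(request.getBytes(StandardCharsets.UTF_8));
                client.getOutputStream().flush();
                // Half-close: no further writes, but the read side stays open
                client.shutdownOutput();
                String response = new String(readToEnd(client.getInputStream()), StandardCharsets.UTF_8);
                serverThread.join();
                return response;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("ping")); // echo:ping
    }
}
```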

HTTP Client - EncoderException: unexpected message type: DefaultHttpRequest

After updating to reactor-netty 0.6.0.RELEASE, we started to get errors in the HTTP client; everything was working fine before. The error first occurs after the first request is made, and the same pattern then repeats: one request is processed successfully and the next one fails. The issue happens when setting the HTTP content-length header in POST requests, with the following exception:

io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: unexpected message type: DefaultHttpRequest
        at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:106)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:749)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:812)
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:825)
        at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:805)
        at reactor.ipc.netty.channel.ChannelOperationsHandler.doWrite(ChannelOperationsHandler.java:245)
        at reactor.ipc.netty.channel.ChannelOperationsHandler.drain(ChannelOperationsHandler.java:372)
        at reactor.ipc.netty.channel.ChannelOperationsHandler.flush(ChannelOperationsHandler.java:161)
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:787)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:813)
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:825)
        at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:805)
        at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:842)
        at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1032)
        at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:296)
        at reactor.ipc.netty.http.HttpOperations.lambda$sendHeaders$0(HttpOperations.java:96)
        at reactor.ipc.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:114)
        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:38)
        at reactor.core.publisher.MonoThenIgnore$MonoThenIgnoreMain.drain(MonoThenIgnore.java:166)
        at reactor.core.publisher.MonoThenIgnore.subscribe(MonoThenIgnore.java:54)
        at reactor.ipc.netty.NettyOutbound.subscribe(NettyOutbound.java:267)
        at reactor.ipc.netty.channel.ChannelOperations.applyHandler(ChannelOperations.java:465)
        at reactor.ipc.netty.http.client.HttpClientOperations.onHandlerStart(HttpClientOperations.java:415)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:312)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: unexpected message type: DefaultHttpRequest
        at io.netty.handler.codec.http.HttpObjectEncoder.encode(HttpObjectEncoder.java:69)
        at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:88)
        ... 27 common frames omitted

Please, we need a fix for this because we have to be able to use the content-length header in HTTP client requests.

Consider removing default transfer-encoding header?

On the NettyHttpChannel class, we by default set the transfer-encoding header to chunked. This means that consumers need to call responseTransfer() or removeTransferEncodingChunked() to disable this behavior. Should we instead let the client decide whether they want to set the transfer-encoding or not?

At the moment, the spring-web-reactive class ReactorServerHttpResponse never calls responseTransfer() or removeTransferEncodingChunked(), so all reactive-based responses produced by spring-web are forced to include the transfer-encoding header, which should be optional behavior.

Alternatively, we could consider updating spring-web's ReactorServerHttpResponse to call responseTransfer() or removeTransferEncodingChunked() to disable transfer-encoding in the presence of the content-length header.

https://github.com/reactor/reactor-netty/blob/master/src/main/java/reactor/ipc/netty/http/NettyHttpChannel.java#L91

https://github.com/spring-projects/spring-framework/blob/master/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java#L87

p.s. I've created https://jira.spring.io/browse/SPR-14643 to potentially address this issue upstream.
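The alternative raised in this issue (dropping the default chunked transfer-encoding when a content-length is present) can be sketched with a plain header map. HTTP/1.1 does not allow a sender to combine Content-Length with Transfer-Encoding, which is why the two are treated as mutually exclusive here. The `TransferEncodingPolicy` class is a hypothetical illustration and does not reflect how NettyHttpChannel or ReactorServerHttpResponse are implemented.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical header policy; a sketch of the proposal, not the library's behavior. */
final class TransferEncodingPolicy {

    /** Returns a case-insensitive copy of the headers with the framing headers reconciled. */
    static Map<String, String> normalize(Map<String, String> headers) {
        Map<String, String> result = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        result.putAll(headers);
        if (result.containsKey("content-length")) {
            // Body length is known up front, so chunked framing is dropped
            if ("chunked".equalsIgnoreCase(result.get("transfer-encoding"))) {
                result.remove("transfer-encoding");
            }
        } else if (!result.containsKey("transfer-encoding")) {
            // Unknown length: fall back to chunked, as the current default does
            result.put("transfer-encoding", "chunked");
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new TreeMap<>();
        headers.put("Content-Length", "12");
        headers.put("Transfer-Encoding", "chunked");
        System.out.println(normalize(headers)); // {Content-Length=12}
    }
}
```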

`httpServerResponse.sendString(Mono.error(...))` gets stuck

reactor-netty doesn't handle httpServerResponse.sendString(Mono.error(new IllegalArgumentException())) correctly.

Reproduction code is here:

package reactor.ipc.netty.http;

import org.junit.Test;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyState;
import reactor.ipc.netty.config.ClientOptions;

import java.nio.charset.StandardCharsets;

public class HttpErrorTests {
    @Test
    public void test() {
        NettyState server = HttpServer.create(0).newRouter(httpServerRoutes
                -> httpServerRoutes.get("/", (httpServerRequest, httpServerResponse) -> {
            return httpServerResponse.sendString(Mono.error(new IllegalArgumentException()));
        })).block();
        HttpClient client = HttpClient.create(ClientOptions.to("localhost", server.address().getPort()));
        client.get("/")
                .flatMap(httpClientResponse -> {
                    return httpClientResponse.receive()
                            .asString(StandardCharsets.UTF_8)
                            .collectList();
                })
                .doOnNext(it -> System.out.println("GOT!!!!!!" + it))
                .doOnError(Throwable::printStackTrace)
                .blockLast();
        server.dispose();
    }
}

Output log is:

11:13:52.025 [main] DEBUG r.ipc.netty.util.NettyNativeDetector - Default Netty Epoll support : false
11:13:52.028 [main] DEBUG reactor.ipc.netty.tcp.TcpServer - Server is not managed (Not directly introspectable)
11:13:52.488 [reactor-tcp-server-select-1] DEBUG reactor.ipc.netty.tcp.TcpServer - BIND OK /0:0:0:0:0:0:0:0:65069
11:13:52.800 [reactor-tcp-client-io-1] DEBUG reactor.ipc.netty.http.HttpClient - [id: 0x26b03022] REGISTERED
11:13:52.801 [reactor-tcp-client-io-1] DEBUG reactor.ipc.netty.http.HttpClient - [id: 0x26b03022] CONNECT: localhost/127.0.0.1:65069
11:13:52.807 [reactor-tcp-server-io-2] DEBUG reactor.ipc.netty.tcp.TcpServer - CONNECT [id: 0xc8e06f1d, L:/127.0.0.1:65069 - R:/127.0.0.1:65070]
11:13:52.809 [reactor-tcp-client-io-1] DEBUG reactor.ipc.netty.tcp.TcpClient - CONNECT OK localhost/127.0.0.1:65069
11:13:52.809 [reactor-tcp-client-io-1] DEBUG reactor.ipc.netty.http.HttpClient - [id: 0x26b03022, L:/127.0.0.1:65070 - R:localhost/127.0.0.1:65069] ACTIVE
11:13:52.817 [reactor-tcp-server-io-2] DEBUG reactor.ipc.netty.http.HttpServer - [id: 0xc8e06f1d, L:/127.0.0.1:65069 - R:/127.0.0.1:65070] REGISTERED
11:13:52.817 [reactor-tcp-server-io-2] DEBUG reactor.ipc.netty.http.HttpServer - [id: 0xc8e06f1d, L:/127.0.0.1:65069 - R:/127.0.0.1:65070] ACTIVE
11:13:52.838 [reactor-tcp-client-io-1] DEBUG reactor.ipc.netty.http.HttpClient - [id: 0x26b03022, L:/127.0.0.1:65070 - R:localhost/127.0.0.1:65069] WRITE: 48B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 45 54 20 2f 20 48 54 54 50 2f 31 2e 31 0d 0a |GET / HTTP/1.1..|
|00000010| 68 6f 73 74 3a 20 6c 6f 63 61 6c 68 6f 73 74 0d |host: localhost.|
|00000020| 0a 61 63 63 65 70 74 3a 20 2a 2f 2a 0d 0a 0d 0a |.accept: */*....|
+--------+-------------------------------------------------+----------------+
11:13:52.839 [reactor-tcp-client-io-1] DEBUG reactor.ipc.netty.http.HttpClient - [id: 0x26b03022, L:/127.0.0.1:65070 - R:localhost/127.0.0.1:65069] FLUSH
11:13:52.843 [reactor-tcp-server-io-2] DEBUG reactor.ipc.netty.http.HttpServer - [id: 0xc8e06f1d, L:/127.0.0.1:65069 - R:/127.0.0.1:65070] RECEIVED: 48B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 45 54 20 2f 20 48 54 54 50 2f 31 2e 31 0d 0a |GET / HTTP/1.1..|
|00000010| 68 6f 73 74 3a 20 6c 6f 63 61 6c 68 6f 73 74 0d |host: localhost.|
|00000020| 0a 61 63 63 65 70 74 3a 20 2a 2f 2a 0d 0a 0d 0a |.accept: */*....|
+--------+-------------------------------------------------+----------------+
11:13:52.861 [reactor-tcp-server-io-2] DEBUG reactor.ipc.netty.http.HttpServer - [id: 0xc8e06f1d, L:/127.0.0.1:65069 - R:/127.0.0.1:65070] WRITE: 66B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
|00000010| 0a 74 72 61 6e 73 66 65 72 2d 65 6e 63 6f 64 69 |.transfer-encodi|
|00000020| 6e 67 3a 20 63 68 75 6e 6b 65 64 0d 0a 63 6f 6e |ng: chunked..con|
|00000030| 6e 65 63 74 69 6f 6e 3a 20 63 6c 6f 73 65 0d 0a |nection: close..|
|00000040| 0d 0a                                           |..              |
+--------+-------------------------------------------------+----------------+
11:13:52.861 [reactor-tcp-server-io-2] DEBUG reactor.ipc.netty.http.HttpServer - [id: 0xc8e06f1d, L:/127.0.0.1:65069 - R:/127.0.0.1:65070] FLUSH
11:13:52.867 [reactor-tcp-client-io-1] DEBUG reactor.ipc.netty.http.HttpClient - [id: 0x26b03022, L:/127.0.0.1:65070 - R:localhost/127.0.0.1:65069] RECEIVED: 66B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
|00000010| 0a 74 72 61 6e 73 66 65 72 2d 65 6e 63 6f 64 69 |.transfer-encodi|
|00000020| 6e 67 3a 20 63 68 75 6e 6b 65 64 0d 0a 63 6f 6e |ng: chunked..con|
|00000030| 6e 65 63 74 69 6f 6e 3a 20 63 6c 6f 73 65 0d 0a |nection: close..|
|00000040| 0d 0a                                           |..              |
+--------+-------------------------------------------------+----------------+
11:13:52.869 [reactor-tcp-client-io-1] DEBUG r.i.netty.http.HttpClientOperations - Received response (auto-read:false) : io.netty.handler.codec.http.DefaultHttpHeaders@bfebfb1b
Got response
200 OK
11:13:52.881 [reactor-tcp-server-io-2] ERROR r.ipc.netty.channel.NettyOperations - Write error
java.lang.IllegalArgumentException: null
	at reactor.ipc.netty.http.HttpErrorTests.lambda$null$0(HttpErrorTests.java:16) ~[test/:na]
	at reactor.ipc.netty.http.DefaultHttpServerRoutes$HttpRouteHandler.apply(DefaultHttpServerRoutes.java:80) ~[main/:na]
	at reactor.ipc.netty.http.DefaultHttpServerRoutes$HttpRouteHandler.apply(DefaultHttpServerRoutes.java:60) ~[main/:na]
	at reactor.ipc.netty.http.HttpServer.routeRequestResponse(HttpServer.java:214) ~[main/:na]
	at reactor.ipc.netty.http.HttpServer.lambda$newRouter$0(HttpServer.java:139) ~[main/:na]
	at reactor.ipc.netty.http.HttpServerOperations.onNext(HttpServerOperations.java:199) ~[main/:na]
	at reactor.ipc.netty.channel.NettyChannelHandler.channelRead(NettyChannelHandler.java:69) [main/:na]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:250) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:233) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:571) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:512) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:426) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:398) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:877) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]
11:13:52.883 [reactor-tcp-server-io-2] DEBUG r.ipc.netty.channel.NettyOperations - Pausing read due to lack of request
11:13:52.883 [reactor-tcp-server-io-2] DEBUG reactor.ipc.netty.http.HttpServer - [id: 0xc8e06f1d, L:/127.0.0.1:65069 - R:/127.0.0.1:65070] WRITE: 0B
11:13:52.884 [reactor-tcp-server-io-2] DEBUG reactor.ipc.netty.http.HttpServer - [id: 0xc8e06f1d, L:/127.0.0.1:65069 - R:/127.0.0.1:65070] FLUSH
11:13:52.885 [reactor-tcp-server-io-2] ERROR r.i.netty.http.HttpServerOperations - Error processing connection. Closing the channel.
java.lang.IllegalArgumentException: null
	at reactor.ipc.netty.http.HttpErrorTests.lambda$null$0(HttpErrorTests.java:16) ~[test/:na]
	at reactor.ipc.netty.http.DefaultHttpServerRoutes$HttpRouteHandler.apply(DefaultHttpServerRoutes.java:80) ~[main/:na]
	at reactor.ipc.netty.http.DefaultHttpServerRoutes$HttpRouteHandler.apply(DefaultHttpServerRoutes.java:60) ~[main/:na]
	at reactor.ipc.netty.http.HttpServer.routeRequestResponse(HttpServer.java:214) ~[main/:na]
	at reactor.ipc.netty.http.HttpServer.lambda$newRouter$0(HttpServer.java:139) ~[main/:na]
	at reactor.ipc.netty.http.HttpServerOperations.onNext(HttpServerOperations.java:199) ~[main/:na]
	at reactor.ipc.netty.channel.NettyChannelHandler.channelRead(NettyChannelHandler.java:69) ~[main/:na]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:250) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:233) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:571) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:512) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:426) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:398) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:877) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_77]
11:13:52.886 [reactor-tcp-client-io-1] DEBUG r.ipc.netty.channel.NettyOperations - Pausing read due to lack of request
11:13:52.886 [reactor-tcp-client-io-1] DEBUG r.ipc.netty.channel.NettyOperations - Subscribing inbound receiver [pending: 0, done: false]

The log says "Error processing connection. Closing the channel." but it doesn't close the socket.

As a result, the client request hangs forever.

$ curl -vvv http://localhost:65069/
*   Trying ::1...
* Connected to localhost (::1) port 65069 (#0)
> GET / HTTP/1.1
> Host: localhost:65069
> User-Agent: curl/7.49.1
> Accept: */*
> 
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< connection: close
<

https://gyazo.com/d6b920d1126f423bb97d2c2a273aa290

receiveWebSocket results in "Failed to upgrade to websocket"

HttpClientResponse#receiveWebSocket delegates to withWebsocketSupport which checks if headers have been sent and if so returns an ISE. Of course at this stage when we already have a response the headers have been sent, so as far as I can see there is no way this could possibly work.

Let the `NettyInbound.onReadIdle` and `NettyOutbound.onWriteIdle` replace existing idle handlers

java.lang.IllegalArgumentException: Duplicate handler name: onChannelReadIdle
        at io.netty.channel.DefaultChannelPipeline.checkDuplicateName(DefaultChannelPipeline.java:1062)
        at io.netty.channel.DefaultChannelPipeline.filterName(DefaultChannelPipeline.java:293)
        at io.netty.channel.DefaultChannelPipeline.addBefore(DefaultChannelPipeline.java:250)
        at io.netty.channel.DefaultChannelPipeline.addBefore(DefaultChannelPipeline.java:240)
        at reactor.ipc.netty.channel.ChannelOperations.addHandler(ChannelOperations.java:141)
        at reactor.ipc.netty.channel.ChannelOperations.addHandler(ChannelOperations.java:64)
        at reactor.ipc.netty.NettyInbound.onReadIdle(NettyInbound.java:66)
        at org.springframework.messaging.tcp.reactor.ReactorNettyTcpConnection.onReadInactivity(ReactorNettyTcpConnection.java:67)
        at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler$StompConnectionHandler.initHeartbeats(StompBrokerRelayMessageHandler.java:706)
...

For context, on a TCP connection to a STOMP broker we use heartbeats to keep the connection from hanging. However, heartbeats have to be negotiated first through an exchange of CONNECT and CONNECTED frames, so the connection can hang before the heartbeats have started, leading to a resource leak. This is why we first register one task to check whether CONNECTED has been received, and then register a heartbeat task afterwards.

This should not cause onChannelReadIdle to fail. One option would be to replace the OnChannelReadIdle handler first (my current workaround) or the API could provide a clear choice whether the new task should be added or used as a replacement.
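The "add or replace" choice asked for above can be sketched in plain Java. This is only an illustration, not reactor-netty code: `NamedHandlers`, `add` and `addOrReplace` are hypothetical names, and a real implementation would work against Netty's channel pipeline instead of a map.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical registry of named handlers, standing in for a channel pipeline.
public class NamedHandlers {
    private final Map<String, Runnable> handlers = new LinkedHashMap<>();

    // Strict registration: rejects a name that is already taken,
    // which mirrors the "Duplicate handler name" failure in the trace.
    public void add(String name, Runnable handler) {
        if (handlers.containsKey(name)) {
            throw new IllegalArgumentException("Duplicate handler name: " + name);
        }
        handlers.put(name, handler);
    }

    // Lenient registration: swaps out any existing handler with the same name.
    // Returns true when an earlier handler was replaced.
    public boolean addOrReplace(String name, Runnable handler) {
        return handlers.put(name, handler) != null;
    }

    public Runnable get(String name) {
        return handlers.get(name);
    }

    public int size() {
        return handlers.size();
    }
}
```

With such an API, the CONNECTED check could be registered with `add` and the later heartbeat task with `addOrReplace` under the same name.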

Chained Mono.fromRunnable is not executed (+Workaround)

If you create a Mono with Mono.fromRunnable(...) and chain it using .then(...) it will be executed in 3.0.4.RELEASE but not in 3.0.5.RELEASE.
Workaround
Attach a .doOnSuccess(_void_ -> {}); to your runnable-Mono and it will be executed.

The code below produces the following output:
3.0.4.RELEASE
1: Mono1
2: Mono2
3: Mono1
4: Mono2

3.0.5.RELEASE
1: Mono1
2: Mono2
3: Mono1
4:

@Test
public void test_chained_runnable_monos() {

	Mono<Void> mono1 = Mono.fromRunnable(() -> {
		System.out.println("Mono1");
	}).doOnSuccess(_void_ -> {});
	Mono<Void> mono2 = Mono.fromRunnable(() -> {
		System.out.println("Mono2");
	});
	Mono<Void> monoEmpty = Mono.empty();
	
	System.out.print("1: ");
	mono1.subscribe();
	System.out.print("2: ");
	mono2.subscribe();
	System.out.print("3: ");
	monoEmpty.then(mono1).subscribe();
	System.out.print("4: ");
	monoEmpty.then(mono2).subscribe();
	
}

HTTP Server keep alive - Netty Server unable to process TCP incoming requests

We noticed that the Reactor HTTP server application we are developing, running on Netty at IP:PORT, is failing to process incoming requests.

We see that the TCP client repeatedly attempts to send an HTTP GET request to the application; however, the application does not process all the messages sent.

The first request results in a successful interaction, but the next one fails. The logs do not show any errors, yet the application refuses to process the next incoming request, and the cycle begins again.

After an in-depth investigation we noticed that when the failure happens the client is re-routed to a different peer at IP:PORT, which then results in a successful interaction. Clients do not receive errors; however, they experience a delay of more than 60 seconds for the synchronous HTTP invocation, and the connection is timed out or aborted.

If the failure is not forced by a subsequent incoming request, the server takes 60 seconds from the previous request to process successfully. Any request received in this time period will ultimately fail.

Path parameter parsing should allow dots in segments and ignore query parameters

There seems to be a problem with pattern matching for paths in HTTP requests in reactor-netty 0.6.0.RELEASE. Please consider the four examples below. They outline two different problems that are all related to (at least for me) unexpected behaviour of the pattern matcher. In summary they are

  1. Dots in parameterized segments break pattern matching (/{x} won't match /a.b)
  2. Query parameters break pattern matching, even without path params (/search won't match /search?q=reactor)

The last example with /test/{order} illustrates nicely the brokenness of the implementation here as soon as query parameters come into play (given a path /test/foo?q=bar, the value for order will be foo?q=bar).

package reactor.ipc.netty.http.server;
import org.junit.Test;
import reactor.ipc.netty.http.server.HttpPredicate.UriPathTemplate;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

public class UriPathTemplateTest {

    @Test
    public void patternShouldMatchPathWithOnlyLetters() {
        UriPathTemplate uriPathTemplate = new UriPathTemplate("/test/{order}");
        // works as expected
        assertThat(uriPathTemplate.match("/test/1").get("order"), is("1"));
    }

    @Test
    public void patternShouldMatchPathWithDots() {
        UriPathTemplate uriPathTemplate = new UriPathTemplate("/test/{order}");
        // does not match, the dot in the segment parameter breaks matching
        // expected: a map containing {"order": "2.0"}, found: empty map
        assertThat(uriPathTemplate.match("/test/2.0").get("order"), is("2.0"));
    }

    @Test
    public void staticPatternShouldMatchPathWithQueryParams() {
        UriPathTemplate uriPathTemplate = new UriPathTemplate("/test/3");
        // does not match, the query parameter breaks matching
        // expected: true, found: false
        assertThat(uriPathTemplate.matches("/test/3?q=reactor"), is(true));
    }

    @Test
    public void parameterizedPatternShouldMatchPathWithQueryParams() {
        UriPathTemplate uriPathTemplate = new UriPathTemplate("/test/{order}");
        // does not match, the query parameter breaks matching
        // expected: a map containing {"order": "3"}, found: a map containing {"order": "3?q=reactor"}
        assertThat(uriPathTemplate.match("/test/3?q=reactor").get("order"), is("3"));
    }

}
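For comparison, here is a minimal sketch of a matcher that behaves the way the tests above expect. It is not the reactor-netty implementation; `PathTemplateSketch` and `stripQuery` are made-up names. The two assumptions are that a `{name}` segment may contain any character except `/`, and that everything from the first `?` onwards is dropped before matching.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for UriPathTemplate, written only to illustrate the idea.
public class PathTemplateSketch {
    private static final Pattern VARIABLE = Pattern.compile("\\{([^/}]+)\\}");

    private final Pattern pattern;
    private final List<String> names = new ArrayList<>();

    public PathTemplateSketch(String template) {
        StringBuilder regex = new StringBuilder();
        Matcher m = VARIABLE.matcher(template);
        int last = 0;
        while (m.find()) {
            // Literal text between variables is quoted so dots stay literal.
            regex.append(Pattern.quote(template.substring(last, m.start())));
            // A variable matches one whole segment, dots included.
            regex.append("([^/]+)");
            names.add(m.group(1));
            last = m.end();
        }
        regex.append(Pattern.quote(template.substring(last)));
        this.pattern = Pattern.compile(regex.toString());
    }

    // Drops the query string so that only the path takes part in matching.
    public static String stripQuery(String uri) {
        int q = uri.indexOf('?');
        return q < 0 ? uri : uri.substring(0, q);
    }

    public boolean matches(String uri) {
        return pattern.matcher(stripQuery(uri)).matches();
    }

    // Returns the captured variables, or an empty map when the path does not match.
    public Map<String, String> match(String uri) {
        Map<String, String> result = new LinkedHashMap<>();
        Matcher m = pattern.matcher(stripQuery(uri));
        if (m.matches()) {
            for (int i = 0; i < names.size(); i++) {
                result.put(names.get(i), m.group(i + 1));
            }
        }
        return result;
    }
}
```

The sketch ignores URI fragments and percent-decoding, which a real fix would probably need to consider.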

No way to wait for shutdown of TcpClient resources

Following the model of TcpResources I'm trying to create my own resources:

this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
this.loopResources = LoopResources.create("reactor-netty-tcp-client");
this.poolResources = PoolResources.fixed("reactor-netty-tcp-pool");

TcpClient.create(opts -> opts
		.channelGroup(this.channelGroup)
		.loopResources(this.loopResources)
		.poolResources(this.poolResources)
		.preferNative(false));

So I can then shut them down:

ChannelGroupFuture future = this.channelGroup.close();
Mono<Void> completion = FutureMono.from(future)
		.doAfterTerminate((x, e) -> {
			this.loopResources.dispose();
			this.poolResources.dispose();
		});

Note, however, that unlike ChannelGroup, the loop and pool resources offer no way to wait for shutdown to complete: their dispose methods return void and do not wait.
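What is being asked for is a dispose variant that returns something the caller can wait on. The sketch below shows the shape of such a method using only JDK types. A plain `ExecutorService` stands in for the event loops, and `disposeLater` is a hypothetical name chosen for this example, not an existing reactor-netty method.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class DisposeLaterSketch {

    // Starts the shutdown and returns a future that completes once the
    // executor has terminated. The 5-second limit is an arbitrary choice
    // for this sketch.
    public static CompletableFuture<Void> disposeLater(ExecutorService loops) {
        loops.shutdown(); // stop accepting work; tasks already running may finish
        return CompletableFuture.runAsync(() -> {
            try {
                if (!loops.awaitTermination(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("loops did not terminate in time");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        });
    }
}
```

A caller could then chain further cleanup on the returned future, or block on it with `join()` during application shutdown.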

No way to set sub-protocols on WebSocket client

HttpClientOperations delegates to withWebSocketSupport which accepts sub-protocols but that's not exposed. Publicly HttpClientRequest only has sendWebSocket().

HttpClientResponse has receiveWebSocket which does accept sub-protocols but that makes no sense on a response since it is a request header.

Default 404 handler doesn't send content-length or transfer-encoding header.

The default implementation of HttpClient's 404 handler is as follows.

						request.delegate()
						       .writeAndFlush(new DefaultHttpResponse(HttpVersion.HTTP_1_1,
								       HttpResponseStatus.NOT_FOUND));

This response includes neither a Content-Length header nor a Transfer-Encoding header. As a result, the HTTP client can't determine the end of the response. An HTTP/1.1 server must return Content-Length or Transfer-Encoding (iirc).
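To make the missing framing concrete, the sketch below renders the bytes an empty 404 response could carry on the wire when the body length is stated explicitly. It is plain string building, not Netty code, and `EmptyNotFoundResponse` is a name invented for this example.

```java
// Illustrative only: shows the header that lets a client detect the end
// of a body-less response without waiting for the connection to close.
public class EmptyNotFoundResponse {

    public static String render() {
        return "HTTP/1.1 404 Not Found\r\n"
             + "Content-Length: 0\r\n" // explicit: there is no body to wait for
             + "\r\n";                 // blank line ends the header section
    }
}
```

With `Content-Length: 0` present, a client such as curl has no need to assume that connection close signals the end of the response.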

Curl's verbose output is here:

$ curl -vvv http://localhost:8011/jkjk
*   Trying ::1...
* connect to ::1 port 8011 failed: Connection refused
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8011 (#0)
> GET /jkjk HTTP/1.1
> Host: localhost:8011
> User-Agent: curl/7.49.1
> Accept: */*
> 
< HTTP/1.1 404 Not Found
* no chunk, no close, no size. Assume close to signal end
< 

Support client SNI

Currently, reactor uses the same code for creating an SslHandler regardless of whether the Netty channel is a client or server connection. (https://github.com/reactor/reactor-netty/blob/master/src/main/java/reactor/ipc/netty/options/NettyOptions.java#L219)

Clients should use the newHandler(ByteBufAllocator alloc, String peerHost, int peerPort) variant so that the server name is sent in the SSL handshake and the server knows which certificate to send to the client. See netty/netty#3801.
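The underlying JSSE mechanism can be sketched with JDK classes alone. The example creates a client-mode `SSLEngine` that knows the peer host and port and sets the SNI server name explicitly. `ClientSniSketch` and `clientEngine` are names made up for this illustration, and the sketch assumes the JVM's default `SSLContext` is available.

```java
import java.security.NoSuchAlgorithmException;
import java.util.Collections;
import javax.net.ssl.SNIHostName;
import javax.net.ssl.SNIServerName;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;

public class ClientSniSketch {

    // Builds a client-side engine that carries the peer host name.
    public static SSLEngine clientEngine(String peerHost, int peerPort) {
        SSLContext context;
        try {
            context = SSLContext.getDefault();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("no default SSLContext", e);
        }
        // Passing host and port is the JSSE counterpart of the
        // newHandler(alloc, peerHost, peerPort) variant mentioned above.
        SSLEngine engine = context.createSSLEngine(peerHost, peerPort);
        engine.setUseClientMode(true);

        // Set the server name explicitly to make the intent visible.
        SSLParameters params = engine.getSSLParameters();
        params.setServerNames(
                Collections.<SNIServerName>singletonList(new SNIHostName(peerHost)));
        engine.setSSLParameters(params);
        return engine;
    }
}
```

An engine created without a peer host has no name to put into the handshake, which is the situation the issue describes.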

Connections should be acquired on send, not on subscribe

Currently, connections are acquired from the pool (or opened in non-pooling scenarios) on subscription. This means that the connection is either unusable by other flows or consuming resources during a period of time that it is not in use. I'd like to see connections acquired on send instead of subscribe.

It may seem like this distinction is quite small in practical terms, but the Cloud Foundry Java Client has a number of extended flows, nested flows, etc. where a connection is acquired on subscription, but continuing up the flow, another connection is also acquired for a different request. This means that the first (lower) connection sits idle but unusable until it has been returned. Worse still, with small pool sizes (say one connection) there is a deadlock: the second (higher) flow attempts to acquire a connection currently reserved by the first (lower) and will never be able to do so.

I believe that it's a reasonable (if slow) requirement that any flow written should be able to use a one-connection pool successfully.
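The one-connection deadlock can be modelled with a semaphore holding a single permit. This is a simplified model, not reactor-netty's pool: the class and method names are invented, and a non-blocking `tryAcquire()` is used so the example reports the starvation instead of hanging.

```java
import java.util.concurrent.Semaphore;

// Models a connection pool as a semaphore; one permit is one connection.
public class AcquireTimingSketch {

    // Acquire on subscribe: the outer flow reserves its connection up front,
    // then needs the result of an inner request before it can send anything.
    public static boolean innerSucceedsWithAcquireOnSubscribe(Semaphore pool) {
        pool.acquireUninterruptibly(); // outer flow holds the connection while idle
        try {
            boolean innerGotConnection = pool.tryAcquire();
            if (innerGotConnection) {
                pool.release();
            }
            return innerGotConnection; // false with a single permit
        } finally {
            pool.release();
        }
    }

    // Acquire on send: the outer flow holds nothing until it actually sends,
    // so the inner request can borrow and return the connection first.
    public static boolean bothSucceedWithAcquireOnSend(Semaphore pool) {
        boolean inner = pool.tryAcquire();
        if (inner) {
            pool.release(); // inner request finished
        }
        boolean outer = pool.tryAcquire(); // outer flow sends only now
        if (outer) {
            pool.release();
        }
        return inner && outer;
    }
}
```

With `new Semaphore(1)` the first method reports that the inner request cannot obtain a connection, while the second completes both requests over the same single connection.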

No way to shut down TcpClient resources

Currently TcpClient is created as follows:

ClientOptions clientOptions = ClientOptions.create();
clientOptions.loopResources(TcpResources.get()).poolResources(TcpResources.get());
options.accept(clientOptions);
return new TcpClient(clientOptions.duplicate());

Even if I provide my own resource through ClientOptions the global resources are created anyway and after that there is no way to shut them down since dispose() is empty and _dispose() is protected.

Please support the use case to create a TcpClient and then shut down all associated resources. If I provide my own resources it shouldn't trigger creation of global ones. There should also be some way to shut down global resources through an explicit method designed for that. Consider for example that when running in a server like Tomcat on shutdown you get ugly messages about active threads.

It would also be very useful for ClientOptions to expose what an option is currently set to, so that I can also accept external options and add my own only if others are not provided -- the same thing that TcpClient#create would have to do.

ByteBuf is too unqualified for WebSocket messages

For WebSocket, the NettyInbound.receive() method, which provides ByteBufs, is too unqualified to be useful because there is no way of knowing the boundaries of a complete WebSocket message nor whether it is binary, text, or pong. It's possible to use NettyInbound.receiveObject().cast(WebSocketFrame.class) but it would be nicer if a handler were given something a little less generic and more tailored to a WebSocket interaction -- e.g. a receive method that provides WebSocketFrames.

A similar improvement on the outbound side would remove the need to specify text vs binary when calling one of the HttpOutbound#upgradeToWebSocketXxx methods since the input would already be WebSocketFrames.

ClosedChannelException on NettyOutbound.sendFile(…) with epoll

Using epoll channels with NettyOutbound.sendFile(…) to send files from a ZIP file system causes a ClosedChannelException.

Versions:

  • Reactor Core 3.0.5.RELEASE
  • Reactor Netty 0.6.2.RELEASE

Trace:

java.nio.channels.ClosedChannelException
	at io.netty.channel.epoll.Native.sendfile(...)(Unknown Source)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoError] :
	reactor.core.publisher.Mono.error(Mono.java:274)
	reactor.ipc.netty.FutureMono.from(FutureMono.java:48)
	reactor.ipc.netty.NettyOutbound.lambda$sendFile$1(NettyOutbound.java:184)
	reactor.core.publisher.MonoUsing.subscribe(MonoUsing.java:87)
	reactor.core.publisher.MonoSource.subscribe(MonoSource.java:65)
	reactor.core.publisher.MonoThenIgnore$MonoThenIgnoreMain.drain(MonoThenIgnore.java:151)
	reactor.core.publisher.MonoThenIgnore.subscribe(MonoThenIgnore.java:54)
	reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:78)
	reactor.ipc.netty.channel.ChannelOperations.applyHandler(ChannelOperations.java:378)
	reactor.ipc.netty.http.server.HttpServerOperations.onHandlerStart(HttpServerOperations.java:354)
	io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:304)
	io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
Error has been observed by the following operator(s):
	|_	Mono.error(FutureMono.java:48)
	|_	Mono.using(NettyOutbound.java:183)
	|_	Mono.then(ReactorNetty.java:246)
	|_	Mono.thenEmpty(ReactorNetty.java:246)

Code to reproduce: https://gist.github.com/mp911de/e0cf842ca27fb56a4e478d705dcfe34a
(Note: File must be served from the Jar-File to reproduce the error. Serving from the classes directory will not reveal the issue)

100-Continue bug ?

Following https://gitter.im/reactor/reactor?at=58c928a5872fc8ce62083bae

This doesn't work when a 100-Continue is issued by the client:

return HttpServer.create(port).newHandler((request, response) ->  request.receive()
	.doOnNext(ByteBuf::retain)
	.compose(bb -> response.send(bb))
);

Server responds with a 200 OK instead of a 100-Continue.

> POST / HTTP/1.1
> User-Agent: curl/7.26.0
> Host: localhost:9100
> Accept: */*
> Content-Length: 1828
> Content-Type: application/x-www-form-urlencoded
> Expect: 100-continue
>
* additional stuff not fine transfer.c:1037: 0 0
* HTTP 1.1 or later with persistent connection, pipelining supported
< HTTP/1.1 200 OK
< transfer-encoding: chunked
<
* additional stuff not fine transfer.c:1037: 0 0
* Done waiting for 100-continue
* additional stuff not fine transfer.c:1037: 0 0
* additional stuff not fine transfer.c:1037: 0 0
* additional stuff not fine transfer.c:1037: 0 0
* additional stuff not fine transfer.c:1037: 0 0
* additional stuff not fine transfer.c:1037: 0 0
* additional stuff not fine transfer.c:1037: 0 0
* Operation timed out after 5000 milliseconds with 0 bytes received
* Closing connection #0
curl: (28) Operation timed out after 5000 milliseconds with 0 bytes received

If I do the following it works though

return HttpServer.create(port).newHandler((request, response) ->  request.receive()
	.doOnNext(ByteBuf::retain)
	.flatMap(bb -> response.send(Mono.just(bb)))
);

I'm using version 0.6.2.RELEASE
Also please note that if the client doesn't expect a 100-Continue (for instance, the sent content is small enough to fit in one chunk) both solutions work.

NettyState dispose deadlock

The dispose() call never returns, as shown by this simple test:

	@Test
	public void testHang() throws Exception {
		NettyState httpServer = HttpServer
				.create(ServerOptions.on("0.0.0.0", 0)
						.eventLoopGroup(new NioEventLoopGroup(10)))
				.newRouter(r -> r.get("/data", (request, response) -> {
					return response.send(Mono.empty());
				})).block();
		httpServer.dispose();
	}

From stack dump:

"nioEventLoopGroup-2-1" #23 prio=10 os_prio=0 tid=0x00007fb8f0a36800 nid=0x7441 in Object.wait() [0x00007fb8cf2e1000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <0x0000000672ef4af8> (a io.netty.util.concurrent.DefaultPromise)
	at java.lang.Object.wait(Object.java:502)
	at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:239)
	- locked <0x0000000672ef4af8> (a io.netty.util.concurrent.DefaultPromise)
	at io.netty.util.concurrent.DefaultPromise.sync(DefaultPromise.java:340)
	at io.netty.util.concurrent.DefaultPromise.sync(DefaultPromise.java:34)
	at reactor.ipc.netty.tcp.TcpServer.lambda$channel$1(TcpServer.java:313)
	at reactor.ipc.netty.tcp.TcpServer$$Lambda$11/84739718.dispose(Unknown Source)
	at reactor.ipc.netty.channel.ChannelConnectHandler.lambda$operationComplete$0(ChannelConnectHandler.java:54)
	at reactor.ipc.netty.channel.ChannelConnectHandler$$Lambda$14/870848850.operationComplete(Unknown Source)
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:514)
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:488)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:427)
	at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:111)
	at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:82)
	at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1062)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:686)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:664)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:607)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.close(DefaultChannelPipeline.java:1276)
	at io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:634)
	at io.netty.channel.AbstractChannelHandlerContext.access$1100(AbstractChannelHandlerContext.java:38)
	at io.netty.channel.AbstractChannelHandlerContext$13.run(AbstractChannelHandlerContext.java:623)
	at io.netty.util.concurrent.SingleThreadEventExecutor.safeExecute(SingleThreadEventExecutor.java:451)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:401)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:877)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	at java.lang.Thread.run(Thread.java:745)


"main" #1 prio=5 os_prio=0 tid=0x00007fb8f000e800 nid=0x741f in Object.wait() [0x00007fb8f8d22000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <0x0000000677aae4e0> (a io.netty.channel.DefaultChannelPromise)
	at java.lang.Object.wait(Object.java:502)
	at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:239)
	- locked <0x0000000677aae4e0> (a io.netty.channel.DefaultChannelPromise)
	at io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:129)
	at io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:28)
	at io.netty.util.concurrent.DefaultPromise.sync(DefaultPromise.java:340)
	at io.netty.channel.DefaultChannelPromise.sync(DefaultChannelPromise.java:117)
	at io.netty.channel.DefaultChannelPromise.sync(DefaultChannelPromise.java:28)
	at reactor.ipc.netty.channel.ChannelConnectHandler.dispose(ChannelConnectHandler.java:77)
	at reactor.ipc.netty.channel.ChannelState.dispose(ChannelState.java:69)
	at org.springframework.cloud.stream.app.gpfdist.sink.FooTests.testHang(FooTests.java:40)

HttpClient blocked after a failed request, error "unexpected message type: DefaultHttpRequest"

Version:
spring-boot-starter-web-reactive:0.1.0.BUILD-SNAPSHOT => reactor-netty-0.6.0.BUILD-20161219.135933-150

We have used spring-web-reactive in our application for a while and it worked fine. Today, we got an exception in our test client, and the process is blocked due to this error:

2016-12-19 18:49:24,359 [reactor-http-nio-1] ERROR reactor.ipc.netty.channel.ChannelOperations - [HttpClient] Error processing connection. Requesting close the channel
io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: unexpected message type: DefaultHttpRequest
	at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:106)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:749)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:812)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:825)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:805)
	at reactor.ipc.netty.channel.ChannelOperationsHandler.doWrite(ChannelOperationsHandler.java:267)
	at reactor.ipc.netty.channel.ChannelOperationsHandler.drain(ChannelOperationsHandler.java:368)
	at reactor.ipc.netty.channel.ChannelOperationsHandler.flush(ChannelOperationsHandler.java:150)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:787)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:813)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:825)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:805)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:842)
	at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1032)
	at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:296)
	at reactor.ipc.netty.http.HttpOperations.lambda$sendHeaders$0(HttpOperations.java:98)
	at reactor.ipc.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:114)
	at reactor.core.publisher.FluxConcatIterable$ConcatIterableSubscriber.onComplete(FluxConcatIterable.java:152)
	at reactor.core.publisher.FluxConcatIterable.subscribe(FluxConcatIterable.java:69)
	at reactor.core.publisher.MonoNext.subscribe(MonoNext.java:41)
	at reactor.ipc.netty.channel.ChannelOperations.applyHandler(ChannelOperations.java:421)
	at reactor.ipc.netty.http.client.HttpClientOperations.onChannelActive(HttpClientOperations.java:452)
	at reactor.ipc.netty.channel.PooledClientContextHandler.connectOrAcquire(PooledClientContextHandler.java:171)
	at reactor.ipc.netty.channel.PooledClientContextHandler.lambda$operationComplete$0(PooledClientContextHandler.java:118)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:454)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: unexpected message type: DefaultHttpRequest
	at io.netty.handler.codec.http.HttpObjectEncoder.encode(HttpObjectEncoder.java:69)
	at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:88)

After this error, the process hangs forever. To reproduce this, set up a Spring Boot project using the version mentioned above with the following classes:

Application.java

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        new SpringApplication().run(args);
    }
}
AnyController.java

@Controller
@RequestMapping("/any")
public class AnyController {
    @RequestMapping(value = "/thing", method = RequestMethod.HEAD)
    public Mono<ResponseEntity<Void>> testAny() {
        return Mono.just(ResponseEntity.notFound().build());
    }
}
AnyControllerTest.java

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class AnyControllerTest {
    @LocalServerPort
    int port;
    String url = "http://localhost:{port}/any/thing";
    WebClient webClient = WebClient.create(new ReactorClientHttpConnector());
    @Test
    public void aTest() {
        Mono<ClientResponse> response = webClient.exchange(ClientRequest.HEAD(url, port).build());
        assert response.block().statusCode() == HttpStatus.NOT_FOUND;
    }
    @Test
    public void anotherTest() {
        Mono<ClientResponse> response = webClient.exchange(ClientRequest.HEAD(url, port).build());
        assert response.block().statusCode() == HttpStatus.NOT_FOUND;
    }
}

When the tests run, the first test case succeeds without any problem; however, the second one blocks and hangs forever.

HTTP service and TCP client bridge

One of my Spring WebFlux controllers needs to access a remote (legacy) TCP/IP server. How could I bridge the HTTP response with the TcpClient provided by reactor-netty?

Or would it be easier not to use Spring WebFlux at all? If so, is there some code I could use as an example?

Thanks for your help,
--nick

onComplete of All Data Received Signal Available Without Receive

Given the code

AtomicLong startTimeHolder = new AtomicLong();

httpClient.get(uri)
            .log("stream.beforeTiming")
            .doOnSubscribe(s -> startTimeHolder.set(System.currentTimeMillis()))
            .doFinally(signal -> System.out.println(System.currentTimeMillis() - startTimeHolder.get()))
            .log("stream.afterTiming")

notification of all "completion-style" events (doOnSuccess(), doOnTerminate(), doAfterTerminate(), and doFinally()) happens before the last of the data is passed to a subscriber to .receive(). You can see an example of this here where onComplete() is signaled followed by quite a lot more data.

Now, I'm not sure that this is strictly wrong. The Mono<HttpClient> is clearly done sending items, so it makes a certain kind of sense. However, this makes it exceedingly difficult for any party other than the final subscriber of the actual data to get this information.

There should be a way for the caller of the HttpClient operations (.get() et al) to be signaled when all data has been received.

IllegalReferenceCountException with reactor-netty 0.6.1

I use reactor's TcpClient to connect to a Server.
While exchanging messages I very often get an IllegalReferenceCountException (see below) from netty.
I didn't have this problem with [core: 3.0.1.RELEASE, netty: 0.5.2.RELEASE].
It started after upgrading to [core: 3.0.5.RELEASE, netty: 0.6.1.RELEASE].

Is it a bug or am I doing something wrong?
Maybe someone can give me a hint from the attached logs...

IllegalReferenceCountException.txt

Provide method to close WebSocket connection outside of a Flux Subscriber's Cancellation

A Subscriber to the Flux of input WebSocket messages can use the Cancellation it receives to close the WebSocket connection.

It would be very useful to expose a close() method that can be called by something other than the Flux Subscriber. For example a 3rd party WebSocketSession abstraction built around the Reactor Netty WebSocket support that itself does not subscribe to the input Flux.

Semantically this should be possible assuming there is only one Subscriber for a given WebSocket connection? Or potentially some WebSocket handlers may not even subscribe to the inbound stream, effectively ignoring it and only broadcasting messages -- even in those cases it should be possible to close the connection somehow.

There should also be a way to specify a close status, e.g. close(int status).

Document encoding/decoding API and Spring 5 codecs

<dependency>
        <groupId>io.projectreactor.ipc</groupId>
	<artifactId>reactor-netty</artifactId>
	<version>0.5.2.RELEASE</version>
</dependency>
WARNING: An exception 'java.lang.NoClassDefFoundError: reactor/ipc/codec/Codec' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:
java.lang.NoClassDefFoundError: reactor/ipc/codec/Codec
	at reactive.net.Netty.lambda$0(Netty.java:26)
	at reactor.ipc.netty.common.NettyChannelHandler.channelActive(NettyChannelHandler.java:104)

Provide a simpler embedded server API

The current Reactor Netty API is not very straightforward when it comes to just running it from a main method; see for example this code sample where AtomicReference<NettyContext> is needed.

Could we have something like a startAndAwait() method and a friendlier way to add handlers?

Add Mechanism to obtain port from HttpServer

It is very convenient that HttpServer.create allows passing in 0 for the port to dynamically allocate an available port. However, I do not see a way to obtain the dynamically allocated port which makes using it for writing tests difficult.

It would be nice if there were a way to access the port on HttpServer.
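The JDK's own `ServerSocket` already offers this pattern: bind to port 0 and ask the socket which port the operating system picked. The sketch below shows only that JDK behaviour, as a model for what an accessor on HttpServer could return. `EphemeralPortSketch` is a name chosen for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

public class EphemeralPortSketch {

    // Binds to port 0, lets the OS choose a free port, and reports it.
    public static int bindAndReportPort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A test could read the reported port and build its client URL from it, which is the workflow the request above has in mind.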
