Giter Site home page Giter Site logo

rsocket-java's Introduction

RSocket

Join the chat at https://gitter.im/RSocket/RSocket-Java

RSocket is a binary protocol for use on byte stream transports such as TCP, WebSockets, and Aeron.

It enables the following symmetric interaction models via async message passing over a single connection:

  • request/response (stream of 1)
  • request/stream (finite stream of many)
  • fire-and-forget (no response)
  • event subscription (infinite stream of many)

Learn more at http://rsocket.io

Build and Binaries

Build Status

⚠️ The master branch is now dedicated to development of the 1.2.x line.

Releases and milestones are available via Maven Central.

Example:

repositories {
    mavenCentral()
    maven { url 'https://repo.spring.io/milestone' }  // Reactor milestones (if needed)
}
dependencies {
    implementation 'io.rsocket:rsocket-core:1.2.0-SNAPSHOT'
    implementation 'io.rsocket:rsocket-transport-netty:1.2.0-SNAPSHOT'
}

Snapshots are available via oss.jfrog.org (OJO).

Example:

repositories {
    maven { url 'https://maven.pkg.github.com/rsocket/rsocket-java' }
    maven { url 'https://repo.spring.io/snapshot' }  // Reactor snapshots (if needed)
}
dependencies {
    implementation 'io.rsocket:rsocket-core:1.2.0-SNAPSHOT'
    implementation 'io.rsocket:rsocket-transport-netty:1.2.0-SNAPSHOT'
}

Development

Install the google-java-format in Intellij, from Plugins preferences. Enable under Preferences -> Other Settings -> google-java-format Settings

Format automatically with

$./gradlew goJF

Debugging

Frames can be printed out to help debugging. Set the logger io.rsocket.FrameLogger to debug to print the frames.

Requirements

Trivial Client

package io.rsocket.transport.netty;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import io.rsocket.util.DefaultPayload;
import reactor.core.publisher.Flux;

import java.net.URI;

public class ExampleClient {
    public static void main(String[] args) {
        WebsocketClientTransport ws = WebsocketClientTransport.create(URI.create("ws://rsocket-demo.herokuapp.com/ws"));
        RSocket clientRSocket = RSocketConnector.connectWith(ws).block();

        try {
            Flux<Payload> s = clientRSocket.requestStream(DefaultPayload.create("peace"));

            s.take(10).doOnNext(p -> System.out.println(p.getDataUtf8())).blockLast();
        } finally {
            clientRSocket.dispose();
        }
    }
}

Zero Copy

By default to make RSocket easier to use it copies the incoming Payload. Copying the payload comes at cost to performance and latency. If you want to use zero copy you must disable this. To disable copying you must include a payloadDecoder argument in your RSocketFactory. This will let you manage the Payload without copying the data from the underlying transport. You must free the Payload when you are done with them or you will get a memory leak. Used correctly this will reduce latency and increase performance.

Example Server setup

RSocketServer.create(new PingHandler())
        // Enable Zero Copy
        .payloadDecoder(PayloadDecoder.ZERO_COPY)
        .bind(TcpServerTransport.create(7878))
        .block()
        .onClose()
        .block();

Example Client setup

RSocket clientRSocket =
        RSocketConnector.create()
            // Enable Zero Copy
            .payloadDecoder(PayloadDecoder.ZERO_COPY)
            .connect(TcpClientTransport.create(7878))
            .block();

Bugs and Feedback

For bugs, questions and discussions please use the Github Issues.

LICENSE

Copyright 2015-2020 the original author or authors.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

rsocket-java's People

Contributors

aharonha avatar bclozel avatar benjchristensen avatar bsideup avatar junaidkhalid avatar kbahr avatar lehecka avatar lexs avatar linux-china avatar marcingrzejszczak avatar mostroverkhov avatar nebhale avatar niteshkant avatar olegdokuka avatar pmackowski avatar qweek avatar rdegnan avatar reactivesocketadmin avatar robertroeser avatar rstoyanchev avatar serceman avatar seregamorph avatar simonbasle avatar somasun avatar spencergibb avatar stevegury avatar tmontgomery avatar violetagg avatar xiazuojie avatar yschimke avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

rsocket-java's Issues

NEXT+COMPLETE RequestResponse Efficiency

Currently request/response requires 2 messages to complete (NEXT + COMPLETE): ;https://github.com/ReactiveSocket/reactivesocket-java/blob/e38b757ca02e597712139dfb9d01ba5dc7079c40/src/main/java/io/reactivesocket/ReactiveSocketServerProtocol.java#L105

The protocol design will support multiple headers (types) being sent in a single frame.

  1. We need to at least get NEXT and COMPLETE together in a single frame.
  2. Is there a more efficient way to do this in the Java implementation? A stream of 2 can not be optimized like a scalar, synchronous Observable of 1 so will have a higher cost as currently implemented. Capturing NEXT then COMPLETE and knowing to batch them together is more difficult than a special type, such as NEXT_COMPLETE.

I'm not suggesting the network protocol needs a new type, only that we figure out the optimization in Java.

Tear Down and Resource Cleanup

I have little confidence in everything being properly connected for clean tear down when a connection closes, or when an error occurs. Need to unit test and prove resources are released or cleaned up.

REQUEST_N for Request Stream

Now that we have requestChannel we also need to support REQUEST_N going both directions.

This will require us differentiating which direction the frame is flowing, otherwise we won't know which stream it applies to since requests are bi-directional and both sides start at streamId 1.

Talked with @tmontgomery briefly about this and it seemed like a bit would need to exist on the REQUEST_N frame to state whether it originated from the requestor or responder.

Upgrade Framing to Spec 1.0

The 0.5 branch is currently compliant with spec 0.1. v0.2 of the spec has landed, and resumability is about to land (maybe making it 0.3?). This will make the spec ready to hit v1 within the next month hopefully.

I have engineering time on my team to tackle upgrading the framing of the Java implementation, and ensuring Java/C++ interop over the coming weeks (and need this working in that time frame for something we're doing).

Any issues with us taking on this work? Do you want us to target the 0.5 branch, or something else? At what point do we call it "master" or "1.x"?

Byte Abstraction

I've used String as the data type for input/output just to get functional quickly, rather than choosing a byte abstraction. This now needs to be replaced with whatever we choose to use.

See https://github.com/ReactiveSocket/reactivesocket-java/blob/e38b757ca02e597712139dfb9d01ba5dc7079c40/src/main/java/io/reactivesocket/RequestHandler.java#L24

Rather than byte[] or nio.ByteBuffer, we probably want something that can work across transport implementations, including Netty's ByteBuf.

Lost Perf with Setup Frame

We somehow lost perf when adding the Setup Frame. I would expect some loss per connection, but the perf test is a single connection, so I wouldn't expect to see the cost of a single extra frame. Thus, it seems something about the setup processing has affected it for every message.

We went from ~3.5m to ~3.1m request/response per second.

`RequestHandler` returning `Publisher<T>`

I have not thought about it much but on the face of it, looks like it will be better to move to RxNetty's established model of returning a Publisher<Void>. The drawback of returning Publisher<T> is that we loose feedback from the write to the stream.

Netty/HTTP (Server) Implementation

I'm filing this since the reactivesocket-http-netty repository isn't present. I'm working on an HTTP (1.1 will do; 2 will be more efficient) implementation described below, wanted to get feedback on the approach.

  • Browser JS clients are to be supported in addition to ones built with other stacks.
  • Reactive Socket protocol bytes are sent/received as-is over HTTP.
  • Client-to-server messages are sent using POST. (Batching will be allowed, but streaming is not possible since browser support does not yet exist.)
  • Server to client messages are streamed as-is in the response.
  • In both cases, the encoding is application/octet-stream.

[0.5.x] synthetic empty payload is delivered to clients

I know this relates to rsocket/rsocket#126 but it deserves a issue here for tracking since reactivesocket-java is magicking up an extra payload which is internally inconsistent. e.g. if the synthetic empty payload is going to be created, it should be dropped before it is seen by clients.

        return source.map(Payload::getData)
                .map(ByteBufferUtil::toUtf8String)
                .doOnNext(outputHandler::showOutput)
                .doOnError(e -> outputHandler.error("error from server", e))
                .onExceptionResumeNext(Observable.empty())
                .toCompletable();

Move request start/accept to DuplexConnection?

Today the interaction with the library in a client/server is as follows:

DuplexConnection c = createConnection(); // Create from some factory
ReactiveSocketClientProtocol p = ReactiveSocketClientProtocol.create(c);
p.requestResponse("Hello).subscribe();

I see two issues with this approach:

  • (Just a matter of taste) It looks to me that the request initiation belongs to the connection and the protocol abstraction seems unnecessary. So, it would look something like:
DuplexConnection c = createConnection(); // Create from some factory
c.requestResponse("Hello).subscribe();
  • The current model does not handle broken connections and re-connect. The reason being that the protocol class does not have the knowledge to re-create a connection. So, in order to solve this issue, we have to go either the way of providing the protocol class with a Single<DuplexConnection> or if we move these methods to the connection itself, we just flatmap the Single and do our stuff. The re-connect would just be a retry in either cases.

I have recently added a construct in RxNetty to cache a connection till it closes, which would be useful in this case as we would have multiple subscriptions to the Single<DuplexConnection> which would need to get the same DuplexConnection till it is valid. The relevant class is here.

Remove branch `master`?

Code from current master is in branch 0.2.x and the next version code is in branch 0.5.x.

IMO, the existence of a branch named master is sort of confusing when we are working on two different versions. So, should we delete master branch and just have two 0.2.x and 0.5.x branches?

After deletion of master, 0.2.x will be the default branch till 0.5.x is stable.

Shutdown Implementation

The shutdown methods are not implemented. They are all just printing out the following right now:

**** Requester.shutdown => this should actually do something

DuplexConnection

I have modeled this after our learning from RxNetty, particularly the Publisher<Void> write(Publisher<Message> m) API which supports backpressure as we hook into async IO, such as Netty and Aeron.

  1. @NiteshKant can you please confirm this is all done correctly as per your learnings with RxNetty?
  2. What should we call this thing?

https://github.com/ReactiveSocket/reactivesocket-java/blob/e38b757ca02e597712139dfb9d01ba5dc7079c40/src/main/java/io/reactivesocket/DuplexConnection.java#L24

Connection? SocketConnection? ReactiveSocketConnection? Leave it as DuplexConnection?

Inactivity in the last few months

Netflix has not been contributing to this repository in the last few months which naturally causes people to ask the question of "Is the project still active?"

We believe, it is a good time now to explain the reason for this lull and what can be expected henceforth.

Reason

ReactiveSocket is a relatively new technology which is constantly changing while getting used in production inside Netflix. Nature of these changes are such that touches almost all part of the reactivesocket-java implementation. We realised that this model was slowing us down in our effort to move fast with our experiments in our production systems. So, in an effort to make non-trivial changes to the implementation (few of which were hinted in old and new issues like #67, #129, #130, #131, etc) and still move fast with our experiments, we decided to make these changes in private. The decision was also keeping in mind that not anyone apart from Netflix was using the java implementation and hence in our opinion, it did not impact anyone.

Future

At this point in time, we are close to what we believe are the correct abstractions and we are also happy with the quality of the implementation. This combined with a few offline conversations with folks who are interested in using the java implementation, we have decided to push the private changes to github and from now we will "work in the open".

As part of pushing these changes, we will do the following:

  • Move the current code to a different branch: 0.2.x
  • Create a new branch 0.5.x with the new code.
  • Release a new version of the artifacts from this new code.

[0.5.x] stream and subscription not responding

Run against TcpPongServer

package io.reactivesocket.transport.tcp;

import io.reactivesocket.Payload;
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.client.ReactiveSocketClient;
import io.reactivesocket.frame.ByteBufferUtil;
import io.reactivesocket.transport.tcp.client.TcpTransportClient;
import io.reactivesocket.util.PayloadImpl;
import io.reactivex.Flowable;
import org.reactivestreams.Publisher;
import org.slf4j.event.Level;

import java.net.InetSocketAddress;

import static io.reactivesocket.client.KeepAliveProvider.never;
import static io.reactivesocket.client.SetupProvider.keepAlive;

public class TcpPongClient {
    public static void main(String[] args) {
        Publisher<? extends ReactiveSocket> c = ReactiveSocketClient.create(TcpTransportClient.create(new InetSocketAddress(7878)).logReactiveSocketFrames("rs", Level.WARN),
                keepAlive(never()).disableLease()).connect();

        ReactiveSocket client = Flowable.fromPublisher(c).blockingFirst();

        Publisher<Payload> x = client.requestStream(new PayloadImpl("Hello"));

        Payload response = Flowable.fromPublisher(x).blockingFirst();

        System.out.println(response);
    }
}

Output

28 Dec 2016 09:40:09,986  INFO [main] (Frame.java:61) - Creating thread pooled named io.reactivesocket.frame.UnpooledFrame
28 Dec 2016 09:40:10,778  WARN [rxnetty-nio-eventloop-1-1] (ReactiveSocketFrameLogger.java:57) - [id: 0x56236355, L:/192.168.0.21:52919 - R:0.0.0.0/0.0.0.0:7878] Writing frame: Frame[0] => Stream ID: 0 Type: SETUP Version: 0 keep-alive interval: 2147483647 max lifetime: 2147483645 metadata mime type: application/x.reactivesocket.meta+cbor data mime type: application/binary Payload: 
28 Dec 2016 09:40:10,952  WARN [rxnetty-nio-eventloop-1-1] (ReactiveSocketFrameLogger.java:57) - [id: 0x56236355, L:/192.168.0.21:52919 - R:0.0.0.0/0.0.0.0:7878] Writing frame: Frame[0] => Stream ID: 3 Type: REQUEST_N RequestN: 2147483647 Payload: 

Error Handling

Error handling, and the many possible edge cases that can occur, needs thorough testing and review. Only superficial testing has been done on this. I'm pretty certain we have bugs in this area.

Close `ReactiveSocket` on `DuplexConnection` close.

Problem

Today, ReactiveSocket does not know when the underlying DuplexConnection is closed. This has two problems:

  • The underlying connection close is only discovered on writing a KeepAlive frame or writing/reading other frames. Although this "works" but is not ideal as that means we have to wait till the KeepAlive duration or next write to know about something that is easily available from the transport layer.
  • There is currently no way of notifying a user when a ReactiveSocket closes. onShutdown() only handles explicit shutdown() calls.

Proposal

We can add two methods to ReactiveSocket, viz.,

    /**
     * Close this {@code ReactiveSocket} upon subscribing to the returned {@code Publisher}
     *
     * <em>This method is idempotent and hence can be called as many times at any point with same outcome.</em>
     *
     * @return A {@code Publisher} that completes when this {@code ReactiveSocket} close is complete.
     */
    Publisher<Void> close();

and

    /**
     * Returns a {@code Publisher} that completes when this {@code ReactiveSocket} is closed. A {@code ReactiveSocket}
     * can be closed by explicitly calling {@link #close()} or when the underlying transport connection is closed.
     *
     * @return A {@code Publisher} that completes when this {@code ReactiveSocket} close is complete.
     */
    Publisher<Void> closeNotifier();

The above can replace the following methods that currently exist:

void onShutdown(Completable c);
void shutdown();
void close() throws Exception;

Same methods will be added to DuplexConnection thus wiring close() together and providing correct notifications to users when the ReactiveSocket is closed.

We can modify onShutdown() and shutdown() methods to the signature of my proposal but I think close() is more inline with other implementations of connections found elsewhere.

Current close() method is inherited from AutoCloseable which signifies synchronous execution as it return void.

Improving `ReactiveSocket` abstraction.

Today we have the following primary interfaces that application writers interact with:

ReactiveSocket

Primary abstraction for the multiplexed, full duplex connection following the protocol.

ConnectionSetupHandler

Used only at the server when a new connection is accepted.

RequestHandler

Used only at the server to handle various interaction models requested by the client.

Problem

There are a few issues/lack of feature with the above abstraction and current implementations:

  • Client has no way of accepting requests from the server (this isn't currently implemented and I can imagine using ConnectionSetupHandler and RequestHandler for this purpose)
  • RequestHandler and ReactiveSocket have very similar methods (primarily around various request interaction models) and hence is confusing as to why they are two different contracts.
  • Client does not have a way to accept leases from the server. (this is also a lack of feature today but not impossible with the current abstractions)

Proposal

We can replace the 3 interfaces with two interfaces

ReactiveSocket

Keep the current interface but use it both as a way to request and respond to requests on a ReactiveSocket. This means for every connection there will be two ReactiveSocket instances on each end.

ReactiveSocketFactory (May be a better name)

On either end (client/server) there will be a factory which may look something like the below.

public interface ReactiveSocketFactory {

    /**
     * {@code ReactiveSocket} is a symmetric protocol where everything that a client can do can be done on the server
     * and vice-versa. So, every transport connection is associated with two {@code ReactiveSocket} instances, one for
     * responding to requests from the peer and another for sending requests to the peer.
     * This method is the converter that accepts a remote {@code ReactiveSocket} which is used to send requests to the
     * peer and returns a local {@code ReactiveSocket} that responds to requests from the peer.
     * 
     * @param requestingSocket Socket used to send request to the peer.
     * 
     * @return {@code ReactiveSocket} that is used to accept and respond to request from the peer.
     */
    ReactiveSocket accept(ReactiveSocket requestingSocket);
}

I am not yet sure, but this may look different at client than at the server due to the nature of replying to or sending a ConnectionSetupFrame. Anyways, this will be a replacement for ConnectionSetupHandler and either generify or provide for client side usecases.

This change will simplify the API a bit and also provide symmetric APIs for both client and server.

Using milestones for releases?

Problem (kind of)

Currently, we have an ad-hoc process of release i.e. we release when we feel like 😄

Solution

Have github milestones to indicate when master is ready for release.
This will give better insight to people as to when can they expect a release.

I have created two milestones for now, hoping everyone agrees :)

NPE on UnicastSubject's onNext() method

I found this bug while writing tests with my TCK.
If we request Stream or Subscription, and then call request(n) and then immediately cancel() without awaiting for the value to come, and we do this multiple times, we will get a NPE in Unicast Subject's onNext() function, because for some reason the subscriber inside is null.
An repro of this bug can be found in my fork here: https://github.com/xytosis/reactivesocket-java/tree/failing_tests/reactivesocket-tck-drivers
The failing client/server is inside a package called fail. Simply open in Intellij, run the server, and then run the client.

io.reactivesocket.rx.* package

I'm not thrilled about this package existing: https://github.com/ReactiveSocket/reactivesocket-java/tree/master/src/main/java/io/reactivesocket/rx

The Completable is useful, but I don't yet know if it will make it into RxJava v2. Where should that type go?

As for the Observable types, it would be better to use RxJava v2 at some point if these exist. Or can we somehow use just the java.util.function.Consumer class for these use cases? I haven't convinced myself yet whether we actually need the onError/onComplete terminal events on this, as the only reason we use it is to expose DuplexConnection.getInput, and that should never invoke onComplete. A connection can fail, but perhaps we should have a different way of signaling that, since an error should affect the entire connection, not just input or output.

ReactiveSocket server handler evaluates RequestHandler code lazily

While trying to implement concurrent tests for the TCK, I found that the lazily evaluation of the RequestHandler (the code gets evaluated during runtime upon the first request to the server) could cause the server to throw a NPE during many concurrent initial requests, as some of the data structures that some requests require are not initialized yet. While it probably is possible to write code in a way that this doesn't happen, it still seems rather dangerous that this can happen at all.

Remove `ReactiveSocket.onRequestReady`?

ReactiveSocket today has two methods:

void onRequestReady(Consumer<Throwable> c);
void onRequestReady(Completable c);

At the least converting them to

Publisher<Void> onRequestReady();

may prove better in reducing API surface area and usability.

I would propose that we remove these methods

In reference to the proposal in #129, this functionality IMO can be achieved in the proposed ReactiveSocketFactory abstraction there.

DuplexConnection.getInput() should return Publisher<Frame>?

Problem

This is the signature of DuplexConnection.getInput()

Observable<Frame> getInput();

where Observable is defined locally within ReactiveSocket. This seems unnecessary as it introduces one more abstraction where Publisher can be used.

Proposal

Make the method return Publisher<Frame>

Result

Consistency of API and also reduced number of contracts we have to maintain.

HeapByteBufferR not Usable

The underlying Agrona code has a hard time with HeapByteBufferR (read-only view) that I tried to use:

java.lang.ClassCastException: java.nio.HeapByteBufferR cannot be cast to sun.nio.ch.DirectBuffer
    at uk.co.real_logic.agrona.concurrent.UnsafeBuffer.putBytes(UnsafeBuffer.java:732)
    at uk.co.real_logic.agrona.concurrent.UnsafeBuffer.putBytes(UnsafeBuffer.java:713)
    at io.reactivesocket.internal.FrameFlyweight.encode(FrameFlyweight.java:141)

Depend on `1.0.0` of ReactiveStreams

I noticed the project is depending on compile 'org.reactivestreams:reactive-streams:1.0.0.final' which "works".

However has some weird corner cases (esp. around OSGi, not that I personally care, but downstream consumers of such libs may care). It is the exact same bytes as 1.0.0, however ordering of -RC vs. .final behaves weirdly in some contexts, 1.0.0 is safe everywhere, so that version should be prefered.

Pick a single ByteBuffer implementation for all public APIs

Currently java.nio.ByteBuffer is predominantly used for public APIs. Have a few issues, e.g. you really need to defensively shallow copy with bb.duplicate(). Also not always clear whether the intention is to use position/limit that caller provided, or treat it as a byte[] holder.

Some public APIs e.g. Frame expose Agrona and its used in metadata package. The netty packages use Netty's buffer, but mostly this is internal.

A single buffer API should be used which would make it easier to be sure that code is correct.

[0.5.x] Questions about StreamIdSupplier

  1. StreamIdSupplier.clientSupplier().nextStreamId() -> 3

Why not start with 1?

  1. isValid seems flipped

  2. Should isValid consider odds/even? clientSupplier and serverSupplier may be vastly out of sync.

    public static void main(String[] args) {
        StreamIdSupplier s = StreamIdSupplier.clientSupplier();

        System.out.println("1 " + s.isValid(1));

        System.out.println("next " + s.nextStreamId());
        System.out.println("next " + s.nextStreamId());

        System.out.println("3 " + s.isValid(3));
        System.out.println("4 " + s.isValid(4));
        System.out.println("5 " + s.isValid(5));
        System.out.println("6 " + s.isValid(6));
        System.out.println("7 " + s.isValid(7));
    }

Output

1 false
next 3
next 5
3 false
4 false
5 false
6 true
7 true

Payload reclaim code assumes synchronous onNext() dispatch which is not guaranteed by RS specification

I have been playing around with the latest code from the 0.5.x branch. I started with the following code from the examples subproject:

Flowable.fromPublisher(c.underlyingSocket.requestResponse(new PayloadImpl("Hello")))
  .doOnNext(toConsumer(x => println(s"### client ### ${ByteBufferUtil.toUtf8String(x.getData)}")))
  .blockingFirst()

which prints out the response as expected. However when trying to connect the Publisher that is returned from the requestResponse to Akka Streams with the following code:

Source.fromPublisher(c.underlyingSocket.requestResponse(new PayloadImpl("Hello")))
  .map { x => println(s"### client ### ${ByteBufferUtil.toUtf8String(x.getData)}"); x }
  .runWith(Sink.head)

I get an exception:

io.netty.util.IllegalReferenceCountException: refCnt: 0
        at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1407)
        at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1353)
        at io.netty.buffer.AbstractByteBuf.getInt(AbstractByteBuf.java:415)
        at io.netty.buffer.PooledSlicedByteBuf.getInt(PooledSlicedByteBuf.java:188)
        at io.reactivesocket.transport.tcp.MutableDirectByteBuf.getInt(MutableDirectByteBuf.java:284)
        at io.reactivesocket.frame.FrameHeaderFlyweight.frameLength(FrameHeaderFlyweight.java:229)
        at io.reactivesocket.frame.FrameHeaderFlyweight.dataLength(FrameHeaderFlyweight.java:251)
        at io.reactivesocket.frame.FrameHeaderFlyweight.sliceFrameData(FrameHeaderFlyweight.java:202)
        at io.reactivesocket.Frame.getData(Frame.java:98)
        at akka.stream.alpakka.reactivesocket.scaladsl.HelloWorld$$anonfun$start$1$$anonfun$2.apply(HelloWorld.scala:113)
        at akka.stream.alpakka.reactivesocket.scaladsl.HelloWorld$$anonfun$start$1$$anonfun$2.apply(HelloWorld.scala:113)
        at akka.stream.impl.fusing.Map$$anon$8.onPush(Ops.scala:43)
        at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:747)
        at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:710)
        at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:616)
        at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:471)
        at akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:410)
        at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:603)
        at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:618)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
        at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:529)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I there a race condition between something a Publisher from reactivesocket expects and my call to getData?

P.S. currently I am publishing locally latest code of the 0.5.x branch. Would it be possible to get a milestone release, so it is easier to share and run these experiments?

Snapshots and Releases

I got releases flowing to Maven Central today: http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22io.reactivesocket%22

My changes also affected snapshots, which are now publishing based on the next tagged version. For example: https://oss.jfrog.org/libs-snapshot/io/reactivesocket/reactivesocket-core/0.5.3-SNAPSHOT/

Do you want it working like this (the default behavior of the plugin) where it uploads snapshots with the next version number and the SNAPSHOT suffix?

Or do you want me to put it back to always releasing a 0.5.0 snapshot, regardless of the released version?

Now that we are releasing to maven central, I think it's better to have it showing the snapshot for the next release.

So right now, 0.5.2 is on Maven Central, and it's publishing 0.5.3-SNAPSHOT as the snapshot.

Calling cancel() before request(n) causes null pointer exception

While writing the TCK for reactive sockets, my TCK code found the bug mentioned above.

val s4 = requestResponse("g", "h")
    s4 cancel()
    s4 assertCanceled()
    s4 assertNoErrors()
    s4 assertNotCompleted()
    s4 assertReceivedCount 0

The line s4 cancel() causes the NPE on the client side, as it seems like the current implementation always expects request(n) to be called before cancel.

REQUEST_STREAM payload lost when subscribing via client a second time

I'm prototyping a client library that uses a REQUEST_STREAM payload. ReactiveSocket.requestStream(Payload payload) returns a Publisher that works for the first Subscription. Upon the second subscription to the same Publisher, the client sends another REQUEST_STREAM to the server that contains an empty payload. I'm guessing this is an accidental bug and not expected behavior.

The root cause appears to be here:

The Payload instance contains mutable ByteBuffers whose positions get advanced and fully consumed during the first encoding.

I'm not yet familiar enough with the codebase to claim an ideal solution (assuming this is a bug), but would it work to reset the ByteBuffer positions in the handleStreamResponse() lambda before passing it to Frame.Request.from?

I've worked around this for the time being by creating a Payload implementation that re-creates a new ByteBuffer for each call to the accessor method.

ClientReactiveSocket sends NEXT_COMPLETE after REQUEST_SUBSCRIPTION/REQUEST_RESPONSE/etc.

I'm observing that after REQUEST_SUBSCRIPTION it also sends a RESPONSE frame with the COMPLETE flag set. This is out of spec obviously but might break a less lenient implementation.

This is caused by ClientReactiveSocket passing a Px.just<Payload>() which emits a onComplete() that is handled by RemoteSender (created in handleStreamResponse()) here: https://github.com/ReactiveSocket/reactivesocket-java/blob/b8fd58aaafec022bacc7a9c5f214d720f3535dd1/reactivesocket-core/src/main/java/io/reactivesocket/internal/RemoteSender.java#L162

Performance

As of 3e52b25 the performance looks as follows:

./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 6 .*ReactiveSocketPerf*'

Benchmark                                   Mode  Cnt        Score        Error  Units
ReactiveSocketPerf.requestResponseHello    thrpt    5  3466701.646 ± 232969.245  ops/s
ReactiveSocketPerf.requestStreamHello1000  thrpt    5    13441.690 ±    977.459  ops/s

(On my 2.3 GHz Intel Core i7 running OSX)

This involves no IO, just an in-memory dual-channel for simulating transport and exercising the protocol handlers.

This is 3.5M "hello" request/response per second (with a static payload each time) and 16M messages/second with streams of 1000 "hello" messages (again, all static, so no compute time to encoding).

Here is a Flight Recorder screenshot of the requestResponse test:

screen shot 2015-08-20 at 7 49 41 pm

screen shot 2015-08-20 at 7 51 19 pm

Project status?

I just found this project and it looks fantastic. What is the status? Is it ready for production use? Particularly aeron and tcp.
Thanks

Parsing Strictness

Right now it fails if it sees a message it doesn't understand: https://github.com/ReactiveSocket/reactivesocket-java/blob/e38b757ca02e597712139dfb9d01ba5dc7079c40/src/main/java/io/reactivesocket/ReactiveSocketServerProtocol.java#L76

Should this continue to behave this way? Are there message types it should ignore?

The protocol will support headers with an "ignore" bit: https://github.com/ReactiveSocket/reactivesocket/blob/79b046b3617a28ee5809f988967acc2b4ea3541c/Protocol.md#header-chains This suggests the server should at least ignore those if it doesn't understand them.

REQUEST_N Reactive Pull Support

The handler exists in the server implementation to receive the REQUEST_N messages, but it is not functioning: https://github.com/ReactiveSocket/reactivesocket-java/blob/e38b757ca02e597712139dfb9d01ba5dc7079c40/src/main/java/io/reactivesocket/ReactiveSocketServerProtocol.java#L192

Client side is sending the REQUEST_N messages: https://github.com/ReactiveSocket/reactivesocket-java/blob/e38b757ca02e597712139dfb9d01ba5dc7079c40/src/main/java/io/reactivesocket/ReactiveSocketClientProtocol.java#L132 However, the protocol design now specifies the initial request should be combined with the initial payload, and the client implementation does not do that right now. It won't be hard though once the binary encoding supports it.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.