Giter Site home page Giter Site logo

rsocket / rsocket-kotlin Goto Github PK

View Code? Open in Web Editor NEW
525.0 19.0 38.0 1.72 MB

RSocket Kotlin multi-platform implementation

Home Page: http://rsocket.io

License: Apache License 2.0

Kotlin 99.95% JavaScript 0.05%
kotlin android rsocket ktor coroutines async tcp kotlin-multiplatform websockets ios kmp multiplatform

rsocket-kotlin's Introduction

rsocket-kotlin

RSocket Kotlin multi-platform implementation based on kotlinx.coroutines and ktor-io.

RSocket is a binary application protocol providing Reactive Streams semantics for use on byte stream transports such as TCP, WebSockets, QUIC and Aeron.

It enables the following symmetric interaction models via async message passing over a single connection:

Learn more at http://rsocket.io

Supported platforms and transports :

Local (in memory) transport is supported on all targets. Most of other transports are implemented using ktor to ensure Kotlin multiplatform. So it depends on ktor client/server engines for available transports and platforms.

Client transports:

TCP WebSocket
JVM ✅ via ktor ✅ via ktor
JS ✅ via nodeJS (not supported in browser) ✅ via ktor
Native
(except windows)
✅ via ktor ✅ via ktor

Server transports:

TCP WebSocket
JVM ✅ via ktor ✅ via ktor
JS ✅ via nodeJS (not supported in browser)
Native
(except windows)
✅ via ktor ✅ via ktor

Using in your projects

rsocket-kotlin is available on Maven Central

Make sure, that you use Kotlin 1.6.20+, ktor 2.0.0+ and have mavenCentral() in the list of repositories:

repositories {
    mavenCentral()
}

Ktor plugins

rsocket-kotlin provides client and server plugins for ktor

Dependencies:

dependencies {
    //for client
    implementation("io.rsocket.kotlin:rsocket-ktor-client:0.15.4")

    //for server
    implementation("io.rsocket.kotlin:rsocket-ktor-server:0.15.4")
}

Example of client plugin usage:

//create ktor client
val client = HttpClient {
    install(WebSockets) //rsocket requires websockets plugin installed
    install(RSocketSupport) {
        //configure rSocket connector (all values have defaults)
        connector {
            maxFragmentSize = 1024

            connectionConfig {
                keepAlive = KeepAlive(
                    interval = 30.seconds,
                    maxLifetime = 2.minutes
                )

                //payload for setup frame
                setupPayload {
                    buildPayload {
                        data("""{ "data": "setup" }""")
                    }
                }

                //mime types
                payloadMimeType = PayloadMimeType(
                    data = WellKnownMimeType.ApplicationJson,
                    metadata = WellKnownMimeType.MessageRSocketCompositeMetadata
                )
            }

            //optional acceptor for server requests
            acceptor {
                RSocketRequestHandler {
                    requestResponse { it } //echo request payload
                }
            }
        }
    }
}

//connect to some url
val rSocket: RSocket = client.rSocket("wss://demo.rsocket.io/rsocket")

//request stream
val stream: Flow<Payload> = rSocket.requestStream(
    buildPayload {
        data("""{ "data": "hello world" }""")
    }
)

//take 5 values and print response
stream.take(5).collect { payload: Payload ->
    println(payload.data.readText())
}

Example of server plugin usage:

//create ktor server
embeddedServer(CIO) {
    install(WebSockets) //rsocket requires websockets plugin installed
    install(RSocketSupport) {
        //configure rSocket server (all values have defaults)

        server {
            maxFragmentSize = 1024

            //install interceptors
            interceptors {
                forConnection(::SomeConnectionInterceptor)
            }
        }
    }
    //configure routing
    routing {
        //configure route `/rsocket`
        rSocket("rsocket") {
            println(config.setupPayload.data.readText()) //print setup payload data

            RSocketRequestHandler {
                //handler for request/response
                requestResponse { request: Payload ->
                    println(request.data.readText()) //print request payload data
                    delay(500) // work emulation
                    buildPayload {
                        data("""{ "data": "Server response" }""")
                    }
                }
                //handler for request/stream      
                requestStream { request: Payload ->
                    println(request.data.readText()) //print request payload data
                    flow {
                        repeat(10) { i ->
                            emit(
                                buildPayload {
                                    data("""{ "data": "Server stream response: $i" }""")
                                }
                            )
                        }
                    }
                }
            }
        }
    }
}.start(true)

Standalone transports

rsocket-kotlin also provides standalone transports which can be used to establish RSocket connection:

Dependencies:

dependencies {
    implementation("io.rsocket.kotlin:rsocket-core:0.15.4")

    // TCP ktor client/server transport
    implementation("io.rsocket.kotlin:rsocket-transport-ktor-tcp:0.15.4")

    // WS ktor client transport
    implementation("io.rsocket.kotlin:rsocket-transport-ktor-websocket-client:0.15.4")

    // WS ktor server transport
    implementation("io.rsocket.kotlin:rsocket-transport-ktor-websocket-server:0.15.4")

    // TCP nodeJS client/server transport
    implementation("io.rsocket.kotlin:rsocket-transport-nodejs-tcp:0.15.4")
}

Example of usage standalone client transport:

val transport = TcpClientTransport("0.0.0.0", 8080)
val connector = RSocketConnector {
    //configuration goes here
}
val rsocket: RSocket = connector.connect(transport)
//use rsocket to do request
val response = rsocket.requestResponse(buildPayload { data("""{ "data": "hello world" }""") })
println(response.data.readText())

Example of usage standalone server transport:

val transport = TcpServerTransport("0.0.0.0", 8080)
val connector = RSocketServer {
    //configuration goes here
}
val server: TcpServer = server.bind(transport) {
    RSocketRequestHandler {
        //handler for request/response
        requestResponse { request: Payload ->
            println(request.data.readText()) //print request payload data
            delay(500) // work emulation
            buildPayload {
                data("""{ "data": "Server response" }""")
            }
        }
    }
}
server.handlerJob.join() //wait for server to finish

More samples:

Reactive Streams Semantics

From RSocket protocol:

Reactive Streams semantics are used for flow control of Streams, Subscriptions, and Channels. 
This is a credit-based model where the Requester grants the Responder credit for the number of PAYLOADs it can send. 
It is sometimes referred to as "request-n" or "request(n)".

kotlinx.coroutines doesn't truly support request(n) semantic, but it has flexible CoroutineContext which can be used to achieve something similar. rsocket-kotlin contains RequestStrategy coroutine context element, which defines, strategy for sending of requestNframes.

Example:

//assume we have client
val client: RSocket = TODO()

//and stream
val stream: Flow<Payload> = client.requestStream(Payload("data"))

//now we can use `flowOn` to add request strategy to context of flow
//here we use prefetch strategy which will send requestN for 10 elements, when, there is 5 elements left to collect
//so on call `collect`, requestStream frame with requestN will be sent, and then, after 5 elements will be collected
//new requestN with 5 will be sent, so collect will be smooth 
stream.flowOn(PrefetchStrategy(requestSize = 10, requestOn = 5)).collect { payload: Payload ->
    println(payload.data.readText())
}

Bugs and Feedback

For bugs, questions and discussions please use the Github Issues.

LICENSE

Copyright 2015-2022 the original author or authors.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

rsocket-kotlin's People

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

rsocket-kotlin's Issues

GraalVM native-image support broken due to Payload

Exception in thread "main" java.lang.ExceptionInInitializerError
	at com.oracle.svm.core.hub.ClassInitializationInfo.initialize(ClassInitializationInfo.java:290)
	at java.lang.Class.ensureInitialized(DynamicHub.java:499)
	at com.oracle.svm.core.hub.ClassInitializationInfo.initialize(ClassInitializationInfo.java:235)
	at java.lang.Class.ensureInitialized(DynamicHub.java:499)
	at io.ktor.utils.io.core.IoBuffer.<clinit>(IoBufferJVM.kt:469)
	at com.oracle.svm.core.hub.ClassInitializationInfo.invokeClassInitializer(ClassInitializationInfo.java:350)
	at com.oracle.svm.core.hub.ClassInitializationInfo.initialize(ClassInitializationInfo.java:270)
	at java.lang.Class.ensureInitialized(DynamicHub.java:499)
	at io.ktor.utils.io.core.internal.ChunkBuffer$Companion.getEmpty(ChunkBuffer.kt:151)
	at io.ktor.utils.io.core.ByteReadPacket.<clinit>(ByteReadPacket.kt:46)
	at com.oracle.svm.core.hub.ClassInitializationInfo.invokeClassInitializer(ClassInitializationInfo.java:350)
	at com.oracle.svm.core.hub.ClassInitializationInfo.initialize(ClassInitializationInfo.java:270)
	at java.lang.Class.ensureInitialized(DynamicHub.java:499)
	at io.rsocket.kotlin.payload.Payload.<clinit>(Payload.kt:26)
	at com.oracle.svm.core.hub.ClassInitializationInfo.invokeClassInitializer(ClassInitializationInfo.java:350)
	at com.oracle.svm.core.hub.ClassInitializationInfo.initialize(ClassInitializationInfo.java:270)
	at java.lang.Class.ensureInitialized(DynamicHub.java:499)
	at io.rsocket.kotlin.core.RSocketClientSupport$Config.<init>(RSocketClientSupport.kt:40)
	at io.rsocket.kotlin.core.RSocketClientSupport$Feature.prepare(RSocketClientSupport.kt:61)
	at io.rsocket.kotlin.core.RSocketClientSupport$Feature.prepare(RSocketClientSupport.kt:58)
	at io.ktor.client.HttpClientConfig$install$3.invoke(HttpClientConfig.kt:70)
	at io.ktor.client.HttpClientConfig$install$3.invoke(HttpClientConfig.kt:16)
	at io.ktor.client.HttpClientConfig.install(HttpClientConfig.kt:90)
	at io.ktor.client.HttpClient.<init>(HttpClient.kt:146)
	at io.ktor.client.HttpClientKt.HttpClient(HttpClient.kt:40)
	at io.rsocket.cli.MainKt.buildClient(Main.kt:308)
	at io.rsocket.cli.Main.exec(Main.kt:161)
	at io.rsocket.cli.Main$run$1.invokeSuspend(Main.kt:130)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:56)
	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
	at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:84)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
	at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
	at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
	at io.rsocket.cli.Main.run(Main.kt:129)
	at io.rsocket.cli.Main$Companion.exec(Main.kt:285)
	at io.rsocket.cli.Main$Companion.main(Main.kt:269)
	at io.rsocket.cli.Main.main(Main.kt)
Caused by: java.lang.RuntimeException: java.lang.NoSuchFieldException: top
	at java.util.concurrent.atomic.AtomicLongFieldUpdater$CASUpdater.<init>(AtomicLongFieldUpdater.java:205)
	at java.util.concurrent.atomic.AtomicLongFieldUpdater.newUpdater(AtomicLongFieldUpdater.java:95)
	at io.ktor.utils.io.pool.DefaultPool.<clinit>(DefaultPool.kt:97)
	at com.oracle.svm.core.hub.ClassInitializationInfo.invokeClassInitializer(ClassInitializationInfo.java:350)
	at com.oracle.svm.core.hub.ClassInitializationInfo.initialize(ClassInitializationInfo.java:270)
	... 39 more
Caused by: java.lang.NoSuchFieldException: top
	at java.lang.Class.getDeclaredField(DynamicHub.java:2411)
	at java.util.concurrent.atomic.AtomicLongFieldUpdater$CASUpdater.<init>(AtomicLongFieldUpdater.java:202)
	... 43 more

make RSocketError.Custom.errorCode usable

The errorCode field of RSocketError made internal, it is not accessible from application code.

Motivation

An RSocketError.Custom has an error code associated with it. Allowed errorCode value should be in range [0x00000301-0xFFFFFFFE]. The only sane reason to set the code for custom exception is an ability to inspect it later within application.

  1. the code value is not usable within application when errorCode is internal;
  2. it is not clear what minimal allowed value is until IAE is thrown. ErrorCode.CustomMin constant -- the lower bond -- is also internal.

Desired solution

Make both errorCode and ErrorCode.CustomMin public.

Bug with flow.buffer(2).take(2)

From https://github.com/yschimke/rsocket-cli/pull/1

Total request is 4

Send:
RequestStream frame -> Stream Id: 1 Length: 13
Flags: 0b100000000 (M1F0C0N0)
Initial request: 2
Metadata: Empty
Data: Empty


Receive:
Payload frame -> Stream Id: 1 Length: 19
Flags: 0b000100000 (M0F0C0N1)
Data(length=13):
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 31 35 39 39 38 35 30 30 39 31 34 31 34          |1599850091414   |
+--------+-------------------------------------------------+----------------+

1599850091414

Receive:
Payload frame -> Stream Id: 1 Length: 19
Flags: 0b000100000 (M0F0C0N1)
Data(length=13):
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 31 35 39 39 38 35 30 30 39 31 34 36 34          |1599850091464   |
+--------+-------------------------------------------------+----------------+


Send:
RequestN frame -> Stream Id: 1 Length: 10
Flags: 0b000000000 ()
RequestN: 2

1599850091464

Send:
Cancel frame -> Stream Id: 1 Length: 6
Flags: 0b000000000 ()

java.lang.IllegalArgumentException: End gap 8 is too big: capacity is 7

Actual Behavior

Exception in thread "DefaultDispatcher-worker-3" java.lang.IllegalArgumentException: End gap 8 is too big: capacity is 7
	at io.ktor.utils.io.core.BufferKt.endGapReservationFailedDueToCapacity(Buffer.kt:463)
	at io.ktor.utils.io.core.Buffer.reserveEndGap(Buffer.kt:220)
	at io.ktor.utils.io.core.AbstractOutput.appendNewChunk(AbstractOutput.kt:195)
	at io.ktor.utils.io.core.AbstractOutput.prepareWriteHead(AbstractOutput.kt:497)
	at io.ktor.utils.io.core.OutputPrimitivesKt.writeIntFallback(OutputPrimitives.kt:133)
	at io.ktor.utils.io.core.OutputPrimitivesKt.writeInt(OutputPrimitives.kt:22)
	at io.rsocket.kotlin.frame.Frame.toPacket(Frame.kt:41)
	at io.rsocket.kotlin.ConnectionKt.sendFrame(Connection.kt:42)
	at io.rsocket.kotlin.internal.RSocketState$start$3.invokeSuspend(RSocketState.kt:197)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.selects.SelectBuilderImpl.resumeWith(Select.kt:302)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:571)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:678)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:665)
java.util.concurrent.CancellationException: Connection closed
	at kotlinx.coroutines.ExceptionsKt.CancellationException(Exceptions.kt:22)
	at io.rsocket.kotlin.internal.RSocketState$start$2.invoke(RSocketState.kt:183)
	at io.rsocket.kotlin.internal.RSocketState$start$2.invoke(RSocketState.kt:33)
	at kotlinx.coroutines.InvokeOnCompletion.invoke(JobSupport.kt:1412)
	at kotlinx.coroutines.JobSupport.notifyCompletion(JobSupport.kt:1544)
	at kotlinx.coroutines.JobSupport.completeStateFinalization(JobSupport.kt:326)
	at kotlinx.coroutines.JobSupport.finalizeFinishingState(JobSupport.kt:241)
	at kotlinx.coroutines.JobSupport.continueCompleting(JobSupport.kt:951)
	at kotlinx.coroutines.JobSupport.access$continueCompleting(JobSupport.kt:29)
	at kotlinx.coroutines.JobSupport$ChildCompletion.invoke(JobSupport.kt:1175)
	at kotlinx.coroutines.JobSupport.notifyCompletion(JobSupport.kt:1544)
	at kotlinx.coroutines.JobSupport.completeStateFinalization(JobSupport.kt:326)
	at kotlinx.coroutines.JobSupport.finalizeFinishingState(JobSupport.kt:241)
	at kotlinx.coroutines.JobSupport.tryMakeCompletingSlowPath(JobSupport.kt:922)
	at kotlinx.coroutines.JobSupport.tryMakeCompleting(JobSupport.kt:875)
	at kotlinx.coroutines.JobSupport.makeCompletingOnce$kotlinx_coroutines_core(JobSupport.kt:840)
	at kotlinx.coroutines.AbstractCoroutine.resumeWith(AbstractCoroutine.kt:111)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46)
	at kotlinx.coroutines.selects.SelectBuilderImpl.resumeWith(Select.kt:302)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:571)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:678)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:665)
Caused by: java.lang.IllegalArgumentException: End gap 8 is too big: capacity is 7
	at io.ktor.utils.io.core.BufferKt.endGapReservationFailedDueToCapacity(Buffer.kt:463)
	at io.ktor.utils.io.core.Buffer.reserveEndGap(Buffer.kt:220)
	at io.ktor.utils.io.core.AbstractOutput.appendNewChunk(AbstractOutput.kt:195)
	at io.ktor.utils.io.core.AbstractOutput.prepareWriteHead(AbstractOutput.kt:497)
	at io.ktor.utils.io.core.OutputPrimitivesKt.writeIntFallback(OutputPrimitives.kt:133)
	at io.ktor.utils.io.core.OutputPrimitivesKt.writeInt(OutputPrimitives.kt:22)
	at io.rsocket.kotlin.frame.Frame.toPacket(Frame.kt:41)
	at io.rsocket.kotlin.ConnectionKt.sendFrame(Connection.kt:42)
	at io.rsocket.kotlin.internal.RSocketState$start$3.invokeSuspend(RSocketState.kt:197)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	... 7 more

Steps to Reproduce

With

./rsocket-cli ws://rsocket-demo-1.ew.r.appspot.com/rsocket
./rsocket-cli wss://rsocket-demo-1.ew.r.appspot.com/rsocket

Your Environment

GraalVM (11) or JDK11

OkHttp based transport is limited

Connection will be force closed once total queued data size exceeds 16 MiB. Probably makes sense to also add http2 one - this will move minimal supported Android version to 5.0 (to get ALPN) and cost ~14% of dropped device support

Kotlin/Native TCP support

Since Ktor 1.4.0, CIO engine support TCP sockets (without TLS) on native. So it would be good to support it.
Firstly, we need to support Kotlin/Native, which will cause refactorings of some internals because of different memory model.
This will also allow to use Kotlin/Native with other transports later, when ktor will support them (f.e. websockets)

incompatibilty with newer coroutines

when running with coroutines 1.3.9 or newer rsockets wil lcrash with a NoSuchMethodError

void kotlinx.coroutines.flow.internal.ChannelFlow.<init>(kotlin.coroutines.CoroutineContext, int)'
java.lang.NoSuchMethodError: 'void kotlinx.coroutines.flow.internal.ChannelFlow.<init>(kotlin.coroutines.CoroutineContext, int)'

Steps to Reproduce

build.gradle.kts

dependencies {
    api("org.jetbrains.kotlinx:kotlinx-serialization-protobuf:1.0.1")
    api("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2")

    api("io.ktor:ktor-client-cio:1.4.2")

    api("io.rsocket.kotlin:rsocket-core:0.11.5")
    api("io.rsocket.kotlin:rsocket-transport-ktor-client::0.11.5")
}
fun RSocket(protobuf: ProtoBuf, events: Flow<Event>) {
    requestChannel(
        payloads = events.map { event ->
            protobuf.encodeToPayload("track", Event.serializer(), event)
        }
    )
}

exception throws on call of requestChannel

Possible Solution

ChannelFlow::init has 3 arguments now, currently onBufferOverflow: BufferOverflow is not passed

updating to a slightly newer version of coroutines would probably also help

Your Environment

  • RSocket version(s) used: 0.11.5
  • Other relevant libraries versions: kotlinx.coroutines 1.4.2
  • Platform (eg. JVM version (javar -version) or Node version (node --version)): java 14

Frame logging

rsocket-cli tracking issue rsocket/rsocket-cli#100

Ideally with a simple flag, add ability to see either of

RSocket Frames (requestn, setup, payload) or
Transport Frames (Websocket frame, potentially HTTP2)

The former is way more important to me.

Remove slf4j dependency

To use rsocket-kotlin on Android you also need to pull slf4j-api and slf4j-android. As far as I can tell the debug messages, which are few, are of rather limited use for end users.

Thoughts?

I can PR this you you want..

OutOfDirectMemoryError when trying to test back pressure

Hi,

While creating a demo with RSockets that does backpressure, I noticed that netty chokes if there's too much data to deliver and the client doesn't answer. The setup is Spring Boot with rsocket-kotlin on the back end that pumps messages as fast as it can through a websocket to a angular front via rsocket-js. I can observe that there's backpressure caused drops, but once the browser slows down enough and the error below happens. With Firefox rather quickly.

The demo can be found at https://github.com/ration/ticker-demo
I just started to smoke out RSockets, so feel free to say if the example is just dumb.

io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 3456106503, max: 3464495104) at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367) at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:69) at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57) at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) at java.util.concurrent.FutureTask.run(FutureTask.java) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 3456106503, max: 3464495104) at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:640) at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:594) at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:764) at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:740) at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244) at io.netty.buffer.PoolArena.allocate(PoolArena.java:226) at io.netty.buffer.PoolArena.allocate(PoolArena.java:146) at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324) at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185) at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176) at io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:113) at io.rsocket.kotlin.Frame$PayloadFrame.from(Frame.kt:548) at io.rsocket.kotlin.Frame$PayloadFrame.from(Frame.kt:537) at io.rsocket.kotlin.Frame$PayloadFrame.from$default(Frame.kt:534) at io.rsocket.kotlin.internal.RSocketResponder$handleStream$1.apply(RSocketResponder.kt:201) at io.rsocket.kotlin.internal.RSocketResponder$handleStream$1.apply(RSocketResponder.kt:40) at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:63) at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:407) at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176) at io.reactivex.internal.schedulers.ImmediateThinScheduler$ImmediateThinWorker.schedule(ImmediateThinScheduler.java:89) at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.trySchedule(FlowableObserveOn.java:166) at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.onNext(FlowableObserveOn.java:117) at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:68) at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:407) at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176) at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66) ... 8 more Exception in thread "RxCachedThreadScheduler-1" io.reactivex.exceptions.UndeliverableException: The exception coul

Reboot rsocket-kotlin as a multiplatform + Coroutines based project

With the announcement of Coroutines Flow (see also preview status information and related open issue on their bugtracker), I think we have now all the required pieces to make rsocket-kotlin more than the kind of less maintained duplicate of the Java one that may be removed at some point.

Kotlin is becoming more and more a multiplatform language and I think after leveraging rsocket-transport-netty and rsocket-transport-okhttp (worth to notice that OkHttp plans to migrate to a Kotlin codebase) to provide the JVM implementation, it would be interesting to see if it is doable to provide a Kotlin/JS one. It is too early for a Kotlin/Native implementation but we could target that middle/long term.

Here is an overview of what could look like this multiplatform API.

interface RSocket : Availability, Closeable {
    suspend fun fireAndForget(payload: Payload)
    suspend fun requestResponse(payload: Payload): Payload
    fun requestStream(payload: Payload): Flow<Payload>
    fun requestChannel(payloads: Flow<Payload>): Flow<Payload>
    suspend fun metadataPush(payload: Payload)
}

I guess Payload could leverage Kotlin multiplatform I/O library.

I plan to work on that myself (with the help of others if some are interested to join) since that will allow me to validate various points of Flow API design including its Reactive-Streams interop. Also I do think this kind of approach would provide an interesting way to explore for multiplatform support that could ultimately allow RSocket to gain more traction.

Kotlinx.serialization integration for payload

It would be good to provide out of the box integration with kotlinx.serialization for serializing models into payload data (is metadata serialization needed?). kotlinx.serialization supports both binary formats such as CBOR and ProtoBuf, and string JSON format (and other custom community driven formats).
Possible implementations:

  1. reading value from payload: format.decodeFromPayload<ModelType>(payload) where: ModelType - resulting type (should be annotated with Serializable annotation), format - JSON/ProtoBuf/CBOR format (or any other custom format)
  2. writing value to payload: format.encodeToPayload(model, [optional metadata]) where model - entity to serialize, format - supported format
  3. typed interface for interactions (for requester): fun <T, R> requestResponse(data: R, metadata: ???): TypedPayload<R> where: T - type of request data, R - type of respose data, TypedPayload - payload, which contains data as type R and metadata
  4. something else ???

Research usage of big Payloads

On current moment Payload contains 2 properties data and metadata which can store bytes. The problem is, that those properties store ByteReadPacket which works like in-memory buffer. So f.e. to send big file through in one Payload(+fragmentation), it will be needed to firstly read everything from file to ByteReadPacket, so in memory.

Need to research, if and how it's possible to send such big payloads without reading everything to memory

Document that rsocket-transport-netty not Android compatible

Hit this issue

> Task :app:transformClassesWithStackFramesFixerForDebug
Exception in thread "main" java.lang.IllegalStateException: Couldn't load java/util/concurrent/Flow$Subscription, is the classpath complete?
	at com.google.devtools.build.android.desugar.DefaultMethodClassFixer.loadFromInternal(DefaultMethodClassFixer.java:223)
	at com.google.devtools.build.android.desugar.DefaultMethodClassFixer.stubMissingDefaultAndBridgeMethods(DefaultMethodClassFixer.java:201)
	at com.google.devtools.build.android.desugar.DefaultMethodClassFixer.visitEnd(DefaultMethodClassFixer.java:95)
	at org.objectweb.asm.ClassVisitor.visitEnd(ClassVisitor.java:339)
	at com.google.devtools.build.android.desugar.InterfaceDesugaring.visitEnd(InterfaceDesugaring.java:112)
	at org.objectweb.asm.ClassReader.accept(ClassReader.java:702)
	at org.objectweb.asm.ClassReader.accept(ClassReader.java:500)
	at com.google.devtools.build.android.desugar.Desugar.desugarClassesInInput(Desugar.java:477)
	at com.google.devtools.build.android.desugar.Desugar.desugarOneInput(Desugar.java:361)
	at com.google.devtools.build.android.desugar.Desugar.desugar(Desugar.java:314)
	at com.google.devtools.build.android.desugar.Desugar.main(Desugar.java:711)
Caused by: java.lang.ClassNotFoundException: Class java.util.concurrent.Flow$Subscription not found
	at com.google.devtools.build.android.desugar.HeaderClassLoader.findClass(HeaderClassLoader.java:53)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at com.google.devtools.build.android.desugar.DefaultMethodClassFixer.loadFromInternal(DefaultMethodClassFixer.java:221)
	... 10 more

Seems to be because of io.projectreactor.ipc:reactor-netty:0.7.8.RELEASE obviously depends on reactor-core which has reactor.adapter.JdkFlowAdapter. Not sure if this is intentional, but seems unfortunate given the name used to be rsocket-android, and kotlin gives you good support back to 1.6 and early android.

Limit rate operator for streams

Need to implement some limit rate operator for streams and channels, to improve flexibility of streams API.
F.e to support similar operator from reactor like Flux.limitRate or reactor-like prefetch logic: request new elements, when 75% of elements collected

Connection error closes Android application process

Rxjava is pedantic regarding not handled exceptions in streams, and rethrows them on main Application thread with less-than-helpful stacktrace, closes Application. Can be worked around with RxJavaPlugins.setErrorHandler. Should be solved with explicit error handlers on streams, and avoiding stream subscriptions inside on- handlers (exceptions I saw probably caused by ConnectionMultiplexer).

RSocket infos

hi

  1. what does mean payload ZERO_COPY ?
  2. when server is close (dispose() invoke) the acceptor keep listen for incoming message
  3. server can send request like client ?
  4. handler for : client connect & disconnect, server start & dispose
  5. what effect to use onClose() ?

Intellij project build fails because of missing assemble target

Expected Behavior

Clicking the project build icon in Intellij does a project build.

Actual Behavior

Task 'assemble' not found in root project 'rsocket-kotlin'.

* Try:
Run gradle tasks to get a list of available tasks. Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. Run with --scan to get full insights.

Steps to Reproduce

Possible Solution

Your Environment

OSX (MacBook Air M1)

rsocket-kotlin-scripting target

What could we do to optimise the kotlin scripting experience?

https://github.com/yschimke/okurl-scripts/blob/master/commands/rsocketTcpProxy.main.kts

  1. single maven dependency to import to bring in JVM scripting dependencies

Collapse the DependsOn to a single line

  1. Utility functions for use in ascripting environments based around common operations, with lower barrier than normal public API. Make this a one liner.
val httpClient = HttpClient(ClientCIO) {
        install(ClientWebSockets)
        install(ClientRSocketSupport) {
          connector = RSocketConnector {
            loggerFactory = PrintLogger.withLevel(LoggingLevel.DEBUG)
            connectionConfig {
              payloadMimeType = config.payloadMimeType
            }
          }
        }
      }

Error with elapsedNow in snapshot

Repro

./rsocket-cli --route searchTweets -i Trump wss://rsocket-demo.herokuapp.com/rsocket

After some time

Exception in thread "main" java.lang.NoSuchMethodError: 'double kotlin.time.TimeMark.elapsedNow()'
	at io.rsocket.kotlin.keepalive.KeepAliveHandler$startIn$1.invokeSuspend(KeepAliveHandler.kt:46)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:56)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:571)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:738)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:678)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:665)

Similar to https://github.com/InsertKoinIO/koin/pull/873/files

Stream ID overflow and reuse

Now no overflow protection and reuse in Kotlin version.

internal sealed class StreamIds(private var streamId: Int) {

    @Synchronized
    fun nextStreamId(): Int {
        streamId += 2
        return streamId
    }

    @Synchronized
    fun isBeforeOrCurrent(streamId: Int): Boolean =
            this.streamId >= streamId && streamId > 0
}

internal class ClientStreamIds : StreamIds(-1)

internal class ServerStreamIds : StreamIds(0)

Please refer Please refer https://github.com/rsocket/rsocket-java/blob/develop/rsocket-core/src/main/java/io/rsocket/StreamIdSupplier.java

Ktor 1.5.0

Actual Behavior

Ktor serverside does not run any code in ConnectionAcceptor

although keepalives seem to get sent regularly for a bit

Steps to Reproduce

use implementation("io.rsocket.kotlin:rsocket-transport-ktor-server:1.5.0")

Your Environment

  • RSocket version(s) used: 0.12.0
  • Other relevant libraries versions (eg. netty, ...): ktor-cio and ktor-netty: 1.5.0
  • Platform (eg. JVM version (javar -version) or Node version (node --version)): JVM(server), client (JS)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.