protoactor-kotlin's Introduction

Proto.Actor Kotlin

Ultra-fast, distributed, cross-platform actors. This is the Kotlin repository for Proto.Actor.

Stability

It's used in production but doesn't have the same adoption and stability as the C# and Go implementations.

How to build

./gradlew build

Design principles

Minimalistic API - The API should be small and easy to use. Avoid enterprisey containers and configurations.

Build on existing technologies - There are already a lot of great technologies for e.g. networking and clustering. Build on those instead of reinventing them. E.g. gRPC streams for networking, Consul for clustering.

Pass data, not objects - Serialization is an explicit concern - don't try to hide it. Protobuf all the way.

Be fast - Do not trade performance for magic API trickery.

Inprocess Ping-Pong results:

Dispatcher		Elapsed		Msg/sec
300			273		116885925
400			217		147426522
500			150		213037390
600			85		375979638
700			87		364621820
800			83		381552772 <-- 380+ mil msg/sec

Getting started

The best place currently for learning how to use Proto.Actor is the examples.

Hello world

build.gradle.kts

repositories {
    jcenter()
}

dependencies {
	implementation("actor.proto:proto-actor:latest.release")
}

App.kt

import actor.proto.*

fun main() {
	val prop = fromFunc { msg ->
		when (msg) {
			is Started -> println("Started")
			is String -> {
				println("Hello $msg")
				stop(self)
			}
			is Stopping -> println("Stopping")
			is Stopped -> println("Stopped")
			else -> println("Unknown message $msg")
		}
	}

	val pid = spawn(prop)
	send(pid, "Proto.Actor")
	readLine()
}

Release management

Stable releases are published to https://bintray.com/asynkronit/protoactor-kotlin and linked to jcenter. Either of the repositories below will do.

repositories {
   	maven("https://dl.bintray.com/asynkronit/protoactor-kotlin")
}
repositories {
   	jcenter()
}

Snapshot

Commits on the master branch are deployed as snapshots to https://oss.jfrog.org/artifactory/oss-snapshot-local/actor/proto/ and can be consumed by adding the following configuration to your gradle file:

repositories {
    maven { url 'http://oss.jfrog.org/artifactory/oss-snapshot-local' }
}

dependencies {
    compile 'actor.proto:proto-actor:0.1.0-SNAPSHOT'
}

Publishing a new version

When a tag is created, e.g. v0.1.0, Travis will build and publish the packages to Bintray.

Support

Many thanks to JetBrains for support!

Also thanks to ej-technologies.com for their Java profiler - JProfiler

protoactor-kotlin's People

Contributors

asomov, guenhter, james-cobb, keitase, orjan, rogeralsing, zsmb13


protoactor-kotlin's Issues

UninitializedPropertyAccessException can occur in EndpointWriter restarting() method causing infinite loop

In tests deliberately causing gRPC endpoints to fail, we found the EndpointWriter on the surviving node can get into an infinite loop.

The EndpointWriter is sent a Restarting message. In the restarting() method channel.shutdownNow() is called, but the lateinit channel is not yet initialized. This causes an exception in the restarting() method, which then causes the supervisor to force another restart, causing a loop using 100% CPU.

https://github.com/AsynkronIT/protoactor-kotlin/blob/2234df6fdc5cf4175624ec3f1632de72f718bcc0/proto-remote/src/main/kotlin/actor/proto/remote/EndpointWriter.kt#L67
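One possible guard, sketched here under assumptions, is to check whether the lateinit property has been initialized before shutting it down, so that a restart before the first successful connection cannot throw. FakeChannel and WriterSketch are hypothetical stand-ins for illustration only; they are not the real gRPC channel or the actual EndpointWriter.

```kotlin
// Hypothetical stand-in for a gRPC channel; not io.grpc.ManagedChannel.
class FakeChannel {
    var isShutdown = false
        private set

    fun shutdownNow() {
        isShutdown = true
    }
}

// Hypothetical writer, loosely modelled on the behaviour described in this issue.
class WriterSketch {
    lateinit var channel: FakeChannel

    // Simulates a successful connection that initializes the channel.
    fun started() {
        channel = FakeChannel()
    }

    // Returns true if a channel existed and was shut down,
    // false if there was nothing to close yet.
    fun restarting(): Boolean {
        if (!::channel.isInitialized) return false
        channel.shutdownNow()
        return true
    }
}

fun main() {
    val neverConnected = WriterSketch()
    println(neverConnected.restarting()) // false, and no exception is thrown

    val connected = WriterSketch()
    connected.started()
    println(connected.restarting()) // true
}
```

With a check like this, the Restarting handler no longer throws, so the supervisor is not asked to restart again in a tight loop. Whether this is the right fix for the real class is an open question.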

Failing Unit Test

actor.proto.tests.ActorTests > actorStartedException STANDARD_ERROR
    [DefaultDispatcher-worker-4 @coroutine#29] WARN actor.proto.ActorContext - Handling root failure for nonhost/$9
    [DefaultDispatcher-worker-4 @coroutine#31] WARN actor.proto.ActorContext - Handling root failure for nonhost/$9

actor.proto.tests.ActorTests > actorStartedException STANDARD_OUT
        actor.proto.Started@406769a0
        actor.proto.Restarting@1d184d59
        actor.proto.Started@406769a0
        actor.proto.Stopping@59d6b7fc
        actor.proto.Stopped@523aa1c
        actor.proto.Restarting@1d184d59
        actor.proto.Started@406769a0
        hello

actor.proto.tests.ActorTests > actorStartedException FAILED
    java.lang.AssertionError: expected same:<actor.proto.Restarting@1d184d59> was not:<actor.proto.Stopping@59d6b7fc>
        at org.junit.Assert.fail(Assert.java:88)
        at org.junit.Assert.failNotSame(Assert.java:828)
        at org.junit.Assert.assertSame(Assert.java:771)
        at kotlin.test.junit.JUnitAsserter.assertSame(JUnitSupport.kt:40)
        at kotlin.test.AssertionsKt__AssertionsKt.assertSame(Assertions.kt:58)
        at kotlin.test.AssertionsKt.assertSame(Unknown Source)
        at kotlin.test.AssertionsKt__AssertionsKt.assertSame$default(Assertions.kt:57)
        at kotlin.test.AssertionsKt.assertSame$default(Unknown Source)
        at actor.proto.tests.ActorTests.actorStartedException(ActorTests.kt:107)

I'm looking at the failing unit test and I dumped the messages as well (above). It looks like a legit issue to me due to the ordering of the messages.

Wrong action from supervisor strategy when running multiple tests

The actorStartedException succeeds when running in isolation.
But when running together with the full testsuite, it fails.

I can see that we are getting the wrong system message in the ActorContext at a point.
The handleFailure of the OneForOneStrategy is the caller, and it seems to be passing the message.

Not clear what causes this.
Concurrency issues in a concurrency lib are the best :-/

DeferredProcess causes a memory leak

DeferredProcess adds itself to the process registry on creation, but this entry is never removed. The Process Registry uses a map with strong references, so completed deferred processes are never garbage collected.
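As a rough illustration of one way to avoid the leak, the sketch below removes the registry entry as soon as the deferred result completes, whether normally or exceptionally. RegistrySketch and DeferredSketch are hypothetical names invented for this example; they do not mirror the library's actual ProcessRegistry or DeferredProcess API.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap

// Hypothetical registry that holds strong references, as described above.
object RegistrySketch {
    private val processes = ConcurrentHashMap<String, Any>()

    fun add(id: String, process: Any) {
        processes[id] = process
    }

    fun remove(id: String) {
        processes.remove(id)
    }

    fun size(): Int = processes.size
}

// Hypothetical deferred process: registers itself on creation and
// unregisters when its result completes, successfully or not.
class DeferredSketch<T>(val id: String) {
    val result = CompletableFuture<T>()

    init {
        RegistrySketch.add(id, this)
        result.whenComplete { _, _ -> RegistrySketch.remove(id) }
    }
}

fun main() {
    val deferred = DeferredSketch<String>("deferred-1")
    println(RegistrySketch.size()) // 1
    deferred.result.complete("done")
    println(RegistrySketch.size()) // 0
}
```

Tying removal to completion means a timed-out request also has to complete the future exceptionally, otherwise its entry would still leak.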

Remote terminates when connection is lost

I've been fiddling around a little bit on how to create a set of cross language acceptance tests.
The first take was trying to port https://github.com/AsynkronIT/protoactor-go/blob/dev/examples/remoteactivate/node2/main.go to kotlin and call it from https://github.com/AsynkronIT/protoactor-go/blob/dev/examples/remoteactivate/node1/main.go

Node 1 receives the message, but when it exits, the Kotlin remote fails hard.

fun main(args: Array<String>) {
    Serialization.registerFileDescriptor(Messages.getDescriptor())
    Remote.start("127.0.0.1", 8080)
    val props: Props = fromProducer { HelloActor() }
    Remote.registerKnownKind("hello", props)
    readLine()
}

class HelloActor : Actor {
    suspend override fun Context.receive(msg: Any) {
        when (msg) {
            is actor.proto.acceptance.Messages.HelloRequest -> {
                respond(Messages.HelloResponse.newBuilder()
                        .setMessage("Hello from kotlin")
                        .build())
            }
            else -> {
                println("Another message: " + msg)
            }
        }
    }
}

Connecting to address 127.0.0.1:8081
Connected to address 127.0.0.1:8081
io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
Dec 30, 2017 11:22:58 PM io.grpc.internal.SerializingExecutor run
SEVERE: Exception while executing runnable io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed@79157d48
java.lang.NullPointerException
	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:418)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:41)
	at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:663)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:41)
	at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:392)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:443)
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:63)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:525)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$600(ClientCallImpl.java:446)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:557)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Mailbox schedule() called in endless loop without any messages

Intermittently (hard to reproduce reliably) we see CPU spinning at 50% or 100% (one or two cores), with profiling showing that schedule() is being called repeatedly, even though no messages are being processed by actors.

I had thought the problem only related to bounded queues, but I now think I've seen it with unbounded queues too. I would have to rerun a lot of tests to check this for sure.

Looking at DefaultMailbox - the system message queue has an atomic counter tracking whether there are messages to be polled.

On a hunch I added a similar atomic counter to check if there are messages in the user queue as well, and used the two atomic counters as the check if schedule() should be called. So replacing the isNotEmpty checks here:

https://github.com/AsynkronIT/protoactor-kotlin/blob/ca94fb9a2f18e5738c0d01d8f9749caee29293b1/proto-mailbox/src/main/kotlin/actor/proto/mailbox/DefaultMailbox.kt#L71

with this check:

if (sysCount.get() > 0 || (!suspended && userCount.get() > 0)) {

This seems to have done the trick - we no longer see CPU spinning.

If the above count check fails, I'm then doing the original (isNotEmpty) check, and if that check is true I'm logging. This shows there may be async access issues. Having checked that at least one of the queues isNotEmpty, the logging then shows that both of the queues are indeed empty. Can there be multiple threads removing from the queues? I thought consumer was only single threaded?

@rogeralsing What was the reason for adding the atomic sysCount in DefaultMailbox in the 'queues' commit? Was it to solve a similar issue that I'm seeing here? The need for the atomic counter might suggest maybe there are multiple threads consuming the queues?
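To make the counter-based check concrete, here is a minimal sketch assuming a mailbox with two plain JDK queues. CountedMailboxSketch and its method names are hypothetical; only sysCount, userCount, suspended and the shape of the condition come from the description above, and the real DefaultMailbox uses different queues.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical mailbox that tracks pending messages with explicit counters
// instead of asking the queues whether they are empty.
class CountedMailboxSketch {
    private val userQueue = ConcurrentLinkedQueue<Any>()
    private val sysQueue = ConcurrentLinkedQueue<Any>()
    private val userCount = AtomicInteger(0)
    private val sysCount = AtomicInteger(0)

    @Volatile
    var suspended = false

    // The counter is bumped after the enqueue, so it can lag briefly
    // behind the queue but never stays above the real number of messages.
    fun postUser(msg: Any) {
        userQueue.add(msg)
        userCount.incrementAndGet()
    }

    fun postSystem(msg: Any) {
        sysQueue.add(msg)
        sysCount.incrementAndGet()
    }

    fun pollUser(): Any? = userQueue.poll()?.also { userCount.decrementAndGet() }

    fun pollSystem(): Any? = sysQueue.poll()?.also { sysCount.decrementAndGet() }

    // Same shape as the check quoted in this issue.
    fun shouldReschedule(): Boolean =
        sysCount.get() > 0 || (!suspended && userCount.get() > 0)
}

fun main() {
    val mailbox = CountedMailboxSketch()
    println(mailbox.shouldReschedule()) // false: nothing queued
    mailbox.postUser("hi")
    println(mailbox.shouldReschedule()) // true: one user message pending
    mailbox.suspended = true
    println(mailbox.shouldReschedule()) // false: user messages are held back
    mailbox.postSystem("resume")
    println(mailbox.shouldReschedule()) // true: system messages always count
}
```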

Actor system can deadlock

I think I have a handle on why my attempt to measure the messaging throughput with a single contended echo (rather than one per client) results in deadlock. I suspect the problem is actually very serious.

The changes I made:

  • use a different mailBoxSpec for 'Echo', set to clientCount producers
  • create just one echo actor instance
  • to make testing easier I just use 900 batch size and 1 iteration

On my system, I have 8 cores, so I have 160 clients and they try to send 64000 messages in 'repeat' loops. This results in the JVM CPU usage dropping to 0.

If I increase the echo mailbox capacity to 30000 then it still fails. With 40000 it is working, but I think I'm just lucky and the echo actor got some CPU.

I believe the problem is that the CPU threads are all waiting on the actor's mailbox because send is effectively a blocking operation.

I think that's rather bad in an actor runtime.

Throughput is fine when it's working, but it seems dangerous to use fixed size queues that can block a sender.
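For contrast, the sketch below shows the non-blocking alternative on a bounded queue: offer returns false when the queue is full instead of parking the calling thread, so the sender can drop, retry or apply back-pressure. It uses the JDK's ArrayBlockingQueue purely for illustration, which is an assumption; the mailboxes in this issue are built on JCTools queues, and offerAll is a hypothetical helper.

```kotlin
import java.util.concurrent.ArrayBlockingQueue

// Returns how many of `attempts` messages a bounded queue accepted
// without ever blocking the caller. A blocking put() would instead
// wait on the full queue until a consumer made room.
fun offerAll(capacity: Int, attempts: Int): Int {
    val queue = ArrayBlockingQueue<Int>(capacity)
    var accepted = 0
    repeat(attempts) { i ->
        if (queue.offer(i)) accepted++
    }
    return accepted
}

fun main() {
    println(offerAll(capacity = 2, attempts = 5)) // 2
}
```

If every dispatcher thread is busy in a blocking send to the same full mailbox, no thread is left to run the receiving actor, which matches the stall described above.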


package actor.proto.examples.inprocessbenchmark

import actor.proto.*
import actor.proto.mailbox.DefaultDispatcher
import actor.proto.mailbox.newMpscArrayMailbox
import actor.proto.mailbox.newSpecifiedMailbox
import org.jctools.queues.spec.ConcurrentQueueSpec
import org.jctools.queues.spec.Ordering
import org.jctools.queues.spec.Preference
import java.lang.Runtime.getRuntime
import java.lang.System.nanoTime
import java.util.concurrent.CountDownLatch

fun main(args: Array<String>) {
    repeat(1 /*0*/) {
        run()
        readLine()
    }
}

fun run() {
    val clientCount = getRuntime().availableProcessors() * 20
    val mailboxSpecClient = ConcurrentQueueSpec(1, 1, 5000, Ordering.PRODUCER_FIFO, Preference.NONE)
    // The single echo actor is shared, so its mailbox has clientCount producers.
    val mailboxSpecEcho = ConcurrentQueueSpec(clientCount, 1, 40000, Ordering.PRODUCER_FIFO, Preference.NONE)
    val messageCount = 1_000_000
    val batchSize = 400
    println("Dispatcher\t\tElapsed\t\tMsg/sec")
    val tps = arrayOf(/*1,2,5,10,20,50,100,150,200,300, 400, 500, 600, 700, 800,*/ 900)
    for (t in tps) {
        val d = DefaultDispatcher(throughput = t)

        val echoProps =
                fromProducer { EchoActor() }
                .withDispatcher(d)
                .withMailbox { newSpecifiedMailbox(mailboxSpecEcho) }

        val latch = CountDownLatch(clientCount)
        val clientProps =
                fromProducer { PingActor(latch, messageCount, batchSize) }
                .withDispatcher(d)
                .withMailbox { newSpecifiedMailbox(mailboxSpecClient) }

        // One contended echo actor instead of one per client.
        val echoActor = spawn(echoProps)

        val pairs = (0 until clientCount)
                .map { Pair(spawn(clientProps), echoActor) }
                .toTypedArray()

        val sw = nanoTime()
        for ((ping, pong) in pairs) {
            send(ping, Start(Msg(ping, pong)))
        }
        latch.await()
        val elapsedNanos = (nanoTime() - sw).toDouble()
        val elapsedMillis = (elapsedNanos / 1_000_000).toInt()
        val totalMessages = messageCount * 2 * clientCount
        val x = ((totalMessages.toDouble() / elapsedNanos * 1_000_000_000.0).toInt())
        println("$t\t\t\t\t$elapsedMillis\t\t\t$x")
        for ((client, echo) in pairs) {
            stop(client)
            stop(echo)
        }
        Thread.sleep(500)
    }
}

data class Msg(val ping: PID, val pong: PID)
data class Start(val msg: Msg)

class EchoActor : Actor {
    override suspend fun Context.receive(msg: Any) {
        //print('.')
        when (msg) {
            is Msg -> send(msg.ping, msg)
        }
    }
}

class PingActor(private val latch: CountDownLatch, private var messageCount: Int, private val batchSize: Int, private var batch: Int = 0) : Actor {
    override suspend fun Context.receive(msg: Any) {
        when (msg) {
            is Start -> sendBatch(msg.msg)
            is Msg -> {
                batch--
                if (batch > 0) return
                if (!sendBatch(msg)) {
                    latch.countDown()
                }
            }
        }
    }

    // Sends the next batch and returns false once all messages have been sent.
    private fun Context.sendBatch(msg: Msg): Boolean = when (messageCount) {
        0 -> false
        else -> {
            val n = minOf(batchSize, messageCount)
            repeat(n) { send(msg.pong, msg) }
            messageCount -= n
            batch = n
            true
        }
    }
}

jcenter integration for proto-mailbox

At the moment we cannot resolve the proto-mailbox from jcenter, even if it should be there.

A workaround is to add a direct dependency on bintray:

    repositories {
        jcenter()
        maven {
            url "https://dl.bintray.com/asynkronit/protoactor-kotlin"
        }
    }

Caused by: org.gradle.internal.resolve.ModuleVersionNotFoundException: Could not find actor.proto:proto-mailbox:0.0.2.
Searched in the following locations:
    https://jcenter.bintray.com/actor/proto/proto-mailbox/0.0.2/proto-mailbox-0.0.2.pom
    https://jcenter.bintray.com/actor/proto/proto-mailbox/0.0.2/proto-mailbox-0.0.2.jar

Remove old branches to avoid confusion

Can you please remove branches which were merged:
remotes/origin/update-to-jdk
remotes/origin/router
remotes/origin/implicit-context
remotes/origin/300mil
remotes/origin/reformat
remotes/origin/racy-routers

Can these be removed, since Kotlin 1.3 is already in use?
remotes/origin/kotlin-1.1.3-2
remotes/origin/kotlin-1.1.4-eap-33
remotes/origin/kotlin-1.2-M1

These look old, since Gradle is already in use:
remotes/origin/revert-3-gradle
remotes/origin/gradle

Release newer version

A newer version of protoactor should be released (maybe once the two open pull requests #47 and #50 are merged), because the current latest version, 0.0.2, relies on fairly outdated dependencies (such as coroutines). This unfortunately forces users to stay on those old versions, since upgrading coroutines would break protoactor.

Actor can receive message before mailbox dispatcher is initialized

Still need to investigate the precise circumstances that cause this, but we have an actor that sends a message to a newly instantiated actor using send(pid, msg). Sending throws an exception because the recipient mailbox lateinit dispatcher has not yet been initialized.

For a local actor it's easy to catch the exception and retry the message later. But if a message is sent to a remote actor with an uninitialized dispatcher, the exception occurs at the recipient end within EndpointReader's StreamObserver receive method, which then causes a gRPC UNKNOWN exception on the sending end, which causes the connection to die.

From the DefaultMailbox it looks as though a spawn call can only return a pid once handlers are registered, so it's hard to see how this is possible.

For the moment I've checked initialization here: https://github.com/AsynkronIT/protoactor-kotlin/blob/ca94fb9a2f18e5738c0d01d8f9749caee29293b1/proto-mailbox/src/main/kotlin/actor/proto/mailbox/DefaultMailbox.kt#L81

using

 if (wasIdle && ::dispatcher.isInitialized) {

(and actually also at this line, checking twice just to introduce some more logging)
https://github.com/AsynkronIT/protoactor-kotlin/blob/ca94fb9a2f18e5738c0d01d8f9749caee29293b1/proto-mailbox/src/main/kotlin/actor/proto/mailbox/DefaultMailbox.kt#L18

And that has at least stopped the exceptions. But there will be a better solution if I can find exactly what case allows a message to be sent to an actor that hasn't yet had handlers registered.

Java Duration @RequiresApi(Build.VERSION_CODES.O)

There are some limitations when using this on Android, because Java's Duration is annotated with @RequiresApi(Build.VERSION_CODES.O).
This affects calls such as val res = requestAwait(pid, "Proto.Actor", Duration.ofMillis(200)) when supporting Android versions below API 26.

Actor can receive a user message before Started message is successfully processed

Not sure what the intended behaviour is.

We have an actor that takes a long time to initialise, and this is carried out on receiving the Started message. If initialisation takes too long, a timeout exception can be thrown. We would like to see restart attempts, but until the actor has returned from processing a Started message without an exception, we don't want it to process any user messages.

However the actor does receive other user messages in between Started and Restarting messages. I wrote a quick test to demonstrate this:

    fun test() {
        val prop = fromFunc { msg ->
            println("Mssg Received " + msg.toString())
            when (msg) {
                is Started -> {
                    Thread.sleep(1000)
                    throw Exception()
                }
                else -> {

                }
            }
        }
        val pid = spawn(prop)
        for (i in 1..2) {
            send(pid, "Proto.Actor")
        }
        Thread.sleep(5000)
    }

Gives this:

Mssg Received actor.proto.Started@2f386338
Handling root failure for nonhost/$1
Restarting nonhost/$1 Reason java.lang.Exception
Mssg Received actor.proto.Restarting@379abc62
Mssg Received actor.proto.Started@2f386338
Handling root failure for nonhost/$1
Restarting nonhost/$1 Reason java.lang.Exception
Mssg Received Proto.Actor
Mssg Received actor.proto.Restarting@379abc62
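The behaviour we would like can be sketched independently of the library as a gate that stashes user messages until initialisation has returned without an exception. GatedActorSketch and its methods are hypothetical and only illustrate the ordering; this is not how Proto.Actor delivers messages, and in the real library such stashing would need to live in the mailbox or the actor itself.

```kotlin
// Hypothetical gate: user messages are stashed until initialisation
// has completed without throwing, then replayed in arrival order.
class GatedActorSketch(private val initialise: () -> Unit) {
    private var ready = false
    private val stash = mutableListOf<Any>()
    val handled = mutableListOf<Any>()

    // Stands in for processing a Started message.
    fun onStarted() {
        ready = false
        initialise() // may throw; in that case ready stays false
        ready = true
        handled.addAll(stash)
        stash.clear()
    }

    fun onUserMessage(msg: Any) {
        if (ready) handled.add(msg) else stash.add(msg)
    }
}

fun main() {
    var attempts = 0
    val actor = GatedActorSketch {
        if (++attempts < 2) throw IllegalStateException("init failed")
    }

    // First start fails, as in the test above.
    try {
        actor.onStarted()
    } catch (e: IllegalStateException) {
        // A supervisor would restart the actor here.
    }
    actor.onUserMessage("Proto.Actor")
    println(actor.handled) // []

    // Second start succeeds and the stashed message is delivered.
    actor.onStarted()
    println(actor.handled) // [Proto.Actor]
}
```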
