kotlinx.coroutines's Introduction

kotlinx.coroutines

Library support for Kotlin coroutines with multiplatform support. This is a companion version for the Kotlin 1.9.21 release.

suspend fun main() = coroutineScope {
    launch { 
       delay(1000)
       println("Kotlin Coroutines World!") 
    }
    println("Hello")
}

Play with coroutines online here

Modules

Documentation

Using in your projects

Maven

Add dependencies (you can also add other modules that you need):

<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-core</artifactId>
    <version>1.8.1-Beta</version>
</dependency>

And make sure that you use the latest Kotlin version:

<properties>
    <kotlin.version>1.9.21</kotlin.version>
</properties>

Gradle

Add dependencies (you can also add other modules that you need):

dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.1-Beta")
}

And make sure that you use the latest Kotlin version:

plugins {
    // For build.gradle.kts (Kotlin DSL)
    kotlin("jvm") version "1.9.21"
    
    // For build.gradle (Groovy DSL)
    id "org.jetbrains.kotlin.jvm" version "1.9.21"
}

Make sure that you have mavenCentral() in the list of repositories:

repositories {
    mavenCentral()
}

Android

Add kotlinx-coroutines-android module as a dependency when using kotlinx.coroutines on Android:

implementation("org.jetbrains.kotlinx:kotlinx-coroutines-android:1.8.1-Beta")

This gives you access to the Android Dispatchers.Main coroutine dispatcher. It also ensures that when a coroutine crashes with an unhandled exception, the exception is logged before the Android application crashes, similar to the way the Android runtime handles uncaught exceptions in threads.

R8 and ProGuard

R8 and ProGuard rules are bundled into the kotlinx-coroutines-android module. For more details see "Optimization" section for Android.

Avoiding including the debug infrastructure in the resulting APK

The kotlinx-coroutines-core artifact contains a resource file that is not required for the coroutines to operate normally and is only used by the debugger. To exclude it at no loss of functionality, add the following snippet to the android block in your Gradle file for the application subproject:

packagingOptions {
    resources.excludes += "DebugProbesKt.bin"
}

Multiplatform

Core modules of kotlinx.coroutines are also available for Kotlin/JS and Kotlin/Native.

In common code that should get compiled for different platforms, you can add a dependency to kotlinx-coroutines-core right to the commonMain source set:

commonMain {
    dependencies {
        implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.1-Beta")
    }
}

Platform-specific dependencies are recommended only for non-multiplatform projects that are compiled for a single target platform.

JS

Kotlin/JS version of kotlinx.coroutines is published as kotlinx-coroutines-core-js (follow the link to get the dependency declaration snippet).

Native

Kotlin/Native version of kotlinx.coroutines is published as kotlinx-coroutines-core-$platform where $platform is the target Kotlin/Native platform. Targets are provided in accordance with official K/N target support.

Building and Contributing

See Contributing Guidelines.

kotlinx.coroutines's People

Contributors

alexanderprendota, ansman, danil-pavlov, dkhalanskyjb, dmitry-borodin, dzharkov, elizarov, erokhins, fvasco, ilya-g, inego, konrad-kaminski, koshachy, louiscad, mareklangiewicz, masoodfallahpoor, merfemor, mvicsokolova, ndkoval, nikitabobko, objcode, p7nov, paolop, petrakovichvictoria, qwwdfsad, sebastianaigner, turansky, vadimsemenov, woainikk, wojtek-kalicinski

kotlinx.coroutines's Issues

ConflatedBroadcastChannel hangs in offer()

Hi there,

Maybe there is a better way to achieve what I want, but my current wrapper class around MutableMap, which lets coroutines wait for key additions, will at times get stuck and cause 100% CPU load. I'm using the recently added ConflatedBroadcastChannel class to inform all coroutines waiting for a key when a put operation occurs.

Actually, I only need coroutine-compatible wait/notify primitives... hints?

Anyway, when I pause the threads during the lockup, I will get a stacktrace that may look like this one:

<15> ForkJoinPool.commonPool-worker-3"@830,030,709,776 in group "main": RUNNING
getNext:121, LockFreeLinkedListNode {kotlinx.coroutines.experimental.internal}
takeFirstReceiveOrPeekClosed:920, AbstractSendChannel {kotlinx.coroutines.experimental.channels}
offerInternal:54, AbstractSendChannel {kotlinx.coroutines.experimental.channels}
offerInternal:39, ConflatedChannel {kotlinx.coroutines.experimental.channels}
offerInternal:268, ConflatedBroadcastChannel$Subscriber {kotlinx.coroutines.experimental.channels}
offerInternal:239, ConflatedBroadcastChannel {kotlinx.coroutines.experimental.channels}
offer:219, ConflatedBroadcastChannel {kotlinx.coroutines.experimental.channels}
put:11, AwaitableMap {com.example}

Any ideas what's going on here?

package com.example

import kotlinx.coroutines.experimental.channels.ConflatedBroadcastChannel

class AwaitableMap<K : Any, V : Any>(val delegateMap: MutableMap<K, V>) : MutableMap<K, V> by delegateMap {

    val channel = ConflatedBroadcastChannel<K>()

    override fun put(key: K, value: V): V? {
        val putVal = delegateMap.put(key, value)
        channel.offer(key)
        return putVal
    }

    suspend fun await(key: K): V {
        return delegateMap[key] ?: channel.open().use {
            do {
                val fetched = delegateMap[key]
                if (fetched != null) {
                    return@use fetched
                }
                it.receive()
            } while (true)
            return@use null
        }!!
    }
}
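
One possible direction for the wait/notify need described above is sketched here, assuming a current release of kotlinx.coroutines where CompletableDeferred is available. The class name AwaitableStore and the demo function are hypothetical, and unlike the MutableMap wrapper above this sketch only supports setting each key once.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking

// Hypothetical alternative to AwaitableMap: every key owns one CompletableDeferred.
class AwaitableStore<K : Any, V : Any> {
    private val slots = ConcurrentHashMap<K, CompletableDeferred<V>>()

    // Returns the slot for the key, creating it atomically if it is missing.
    private fun slot(key: K): CompletableDeferred<V> =
        slots.computeIfAbsent(key) { CompletableDeferred<V>() }

    // Completes the slot; returns false if the key already had a value.
    fun put(key: K, value: V): Boolean = slot(key).complete(value)

    // Suspends (without spinning) until a value for the key is available.
    suspend fun await(key: K): V = slot(key).await()
}

fun demoAwaitableStore(): String = runBlocking {
    val store = AwaitableStore<String, String>()
    val waiter = async { store.await("answer") } // completes once the key has a value
    store.put("answer", "42")
    waiter.await()
}
```

Because waiters suspend on the per-key deferred instead of re-checking the map in a loop, there is no polling involved.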

Execution of async code in a new thread

In the current implementation, execution of coroutine code starts in the same thread and continues until the first suspension point.
It is probably worth running it on a thread pool instead (the pool could be another parameter with a sensible default value, e.g. ForkJoinPool.commonPool()).

How to convert a callback to a Deferred on JDK6

I'm still a little new to Kotlin and especially its coroutines, so pardon me if I'm asking stupid questions.

Could you provide an example how to interface a third-party library utilizing callbacks with kotlinx-coroutines-core with JDK6?

If it's currently not possible, could the required functionality be considered to be added?

kotlinx-coroutines-jdk8 has CompletableFuture, but I'm not able to use JDK8. Another way would be to utilize something like streamsupport's streamsupport-cfuture, but I'd prefer not to add it as a dependency if possible.

Also, I could use channels, but if Deferred is the recommended way to transfer a single value between coroutines, I'd like to use that.

Love the work you are doing!
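
A minimal sketch of one way to bridge a callback API to a Deferred without any JDK 8 types, assuming a current kotlinx.coroutines release that provides CompletableDeferred. The Callback interface and fetchGreeting function stand in for the third-party library and are purely hypothetical.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.runBlocking

// Hypothetical third-party callback API, used only for illustration.
interface Callback<T> {
    fun onSuccess(value: T)
    fun onError(error: Throwable)
}

// Hypothetical library call that reports its result through the callback.
fun fetchGreeting(callback: Callback<String>) {
    callback.onSuccess("hello")
}

// Wraps the callback-based call in a Deferred that completes when the callback fires.
fun fetchGreetingAsync(): Deferred<String> {
    val result = CompletableDeferred<String>()
    fetchGreeting(object : Callback<String> {
        override fun onSuccess(value: String) { result.complete(value) }
        override fun onError(error: Throwable) { result.completeExceptionally(error) }
    })
    return result
}

fun demoCallbackBridge(): String = runBlocking { fetchGreetingAsync().await() }
```

The deferred is the "sending side" here: the callback completes it, and any number of coroutines can await the single value.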

some problem when i use it

I just copied some code into IDEA. My Kotlin version is 1.1.2 and my kotlinx.coroutines version is 0.16.0.
The code is:
fun main(args: Array<String>) = runBlocking {
    launch(CommonPool) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500)
        }
    }
    delay(1300) // just quit after delay
}

The error is:
Exception in thread "ForkJoinPool.commonPool-worker-1" java.lang.NoSuchMethodError: kotlinx.coroutines.experimental.DelayKt.delay$default(JLjava/util/concurrent/TimeUnit;ILjava/lang/Object;)

How can i wait for another thread operator complete? (coroutine+Thread)

I want to write some coroutine code with kotlinx.coroutines that gets rid of callback hell. The code looks like this:

import kotlinx.coroutines.experimental.*
import kotlin.concurrent.*
import kotlinx.coroutines.experimental.channels.*


fun main(args: Array<String>) = runBlocking<Unit> {

    val channel = Channel<Int>()

    launch(context) {

        thread(start = true) {
            println("running from thread(): ${Thread.currentThread()}")
            channel.send(1)
        }

        channel.receive()
    }

}

But it fails with a compile error:

Suspension functions can be called only within coroutine body

I know what the error message means, but I want to ask whether there is any object that can operate in another thread and whose result can be awaited in the coroutine. Then I could write sequential code instead of callbacks!
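
The compile error comes from calling the suspending channel.send from the plain lambda passed to thread. One way around it, sketched here with the standard suspendCoroutine function, is to let the thread resume a continuation instead of sending to a channel. The helper name awaitOnThread is hypothetical.

```kotlin
import kotlin.concurrent.thread
import kotlin.coroutines.suspendCoroutine
import kotlinx.coroutines.runBlocking

// Runs `block` on a new thread and suspends the caller until it returns or throws.
suspend fun <T> awaitOnThread(block: () -> T): T = suspendCoroutine { cont ->
    thread {
        // runCatching captures either the value or the exception as a Result,
        // and resumeWith hands that Result back to the suspended coroutine.
        cont.resumeWith(runCatching(block))
    }
}

fun demoAwaitOnThread(): Int = runBlocking {
    awaitOnThread { 1 } + 1
}
```

The calling code reads sequentially, while the work itself still happens on the other thread.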

0.2-alpha-1 doesn't work with Kotlin 1.1-M03

README.md was recently changed so that the Maven and Gradle snippets include 0.2-alpha-1 instead of 0.1-alpha-2. Most users of Kotlin 1.1 are likely to be using 1.1-M03 as that's what is available in IntelliJ and bintray.

README.md should be clear about which versions are compatible with which Kotlin versions. I only figured it out by looking at the commit history and going back to the previous version.

Writing complex actors

I need to create a complex actor using some private methods to update the internal state, but I cannot find any useful base class to implement ActorJob.

What is the best practice in such case?

[Request] Netty/Undertow async http example

I've been playing with async/await and studying the examples out of curiosity, but I'm having trouble making the leap to interop with existing frameworks.

I think a great addition to this repo would be a hello world demonstration of an async http handler that makes a fake database call val user = await(db.getUser()) to generate some toy html <!doctype html><div>hello ${user.name}</div> and then does the async write to the response.

A basic example with Netty or Undertow would help illuminate interop, the necessary boilerplate, and what the end result abstraction could look like.

CoroutineScope "context" shadows Android context

In Android Activities, Fragments, and Views there is a getContext() method that is used frequently. When used with Kotlin the IDE suggests using property access syntax with the synthetic property context. This is shadowed when using an async or launch coroutine block, though, by the CoroutineContext context of CoroutineScope.

We can use getContext() instead and it's not the end of the world, but I wanted to file the issue to see if there should instead be an alternate name for the CoroutineContext. I imagine many people will run into this "issue" whether writing code or reading other's code and getting confused by the two different "contexts".

Cheers.

wrapping callbacks that fire more than once

I am now trying to convert some Firebase code on Android to use coroutines. Some Firebase callbacks are a bit weird in that they sometimes trigger more than once. For example, for getting the download URL from Firebase Storage I have written this:

    private suspend fun getFileDownloadUrlAsync(file_sref: StorageReference): Uri = suspendCoroutine { c ->
        with(file_sref.downloadUrl) { //<---- call to Firebase getDownloadUrl()
            addOnCompleteListener { //<--- this callback can trigger more than once!
                if (it.isSuccessful) {
                    c.resume(it.result)
                } else
                    c.resumeWithException(Exception("some error"))
            }
            addOnFailureListener {
                c.resumeWithException(it)
            }
        }
    }

Since the completion listener is triggered more than once, I get an IllegalStateException("Already resumed").
As a workaround I have defined

    class WrappedContinuation<T>(val c: Continuation<T>) : Continuation<T> {
        var isResolved = false
        override val context: CoroutineContext
            get() = c.context

        override fun resume(value: T) {
            if (!isResolved) {
                isResolved = true
                c.resume(value)
            }
        }

        override fun resumeWithException(exception: Throwable) {
            if (!isResolved) {
                isResolved = true
                c.resumeWithException(exception)
            }
        }

    }

    public inline suspend fun <T> suspendCoroutineW(crossinline block: (WrappedContinuation<T>) -> Unit): T =
            suspendCoroutine { c ->
                val wd = WrappedContinuation(c)
                block(wd)
            }

and I am using suspendCoroutineW instead of suspendCoroutine. Would it be possible to modify the SafeContinuation class in the library to offer similar functionality and expose it to the user, since it already tracks whether the continuation has been resumed?
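
The isResolved flag in the workaround above is a plain var, so two callbacks arriving on different threads could both pass the check. A hedged variant of the same idea using an AtomicBoolean is sketched below; the helper name awaitFirstCallback and the single-value callback shape are assumptions made for illustration, not part of Firebase or the library.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
import kotlinx.coroutines.runBlocking

// Suspends until `register` invokes the callback; only the first invocation wins.
suspend fun <T> awaitFirstCallback(register: (callback: (T) -> Unit) -> Unit): T =
    suspendCoroutine { cont ->
        val resumed = AtomicBoolean(false)
        register { value ->
            // compareAndSet lets exactly one caller through, even across threads.
            if (resumed.compareAndSet(false, true)) cont.resume(value)
        }
    }

fun demoAwaitFirstCallback(): String = runBlocking {
    awaitFirstCallback<String> { callback ->
        callback("first")
        callback("second") // ignored instead of failing with "Already resumed"
    }
}
```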

Allow indirect calls to suspendable methods

The following code throws a compilation error because "suspendFunction()" is called from a non suspend method:

    suspend fun suspendFunction() {
        delay(1000)
    }

    fun notSuspendFunction() {
        suspendFunction()
    }

    @Test
    fun indirectSuspendCall() = runBlocking<Unit> {
        notSuspendFunction()
    }

However, notSuspendFunction() is indirectly called from inside a runBlocking block, so I would expect it to run without issues at runtime.
While I clearly understand why this check exists at compile time, it forces you to transitively modify all the functions in your code to add the "suspend" modifier. In some situations this is not possible.
For example, imagine that suspendFunction() is in a DAO library and notSuspendFunction() is in a project that uses it. In this case you could provide a coroutine-based alternative implementation for the DAO, but it would break all the library's consumers.
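
For the DAO scenario, one common way to avoid breaking existing consumers is to keep the old blocking signature as a thin runBlocking bridge over the new suspending function. The sketch below uses hypothetical names (loadUserSuspending, loadUser); note that the bridge blocks the calling thread for the duration of the call.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical DAO function with a new suspending entry point.
suspend fun loadUserSuspending(id: Int): String {
    delay(10) // stands in for non-blocking I/O
    return "user-$id"
}

// The old blocking signature, kept so that existing callers compile unchanged.
fun loadUser(id: Int): String = runBlocking { loadUserSuspending(id) }
```

Callers that are already coroutine-based can use loadUserSuspending directly and skip the blocking bridge.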

Mutex.unlock with Unconfined dispatcher causes StackOverflowError

Just run the following example and look at the result

fun main(args: Array<String>) {

    val waiters = 1000

    val mutex = Mutex()
    val start = CountDownLatch(1)
    val done  = CountDownLatch(waiters)

    launch(CommonPool) {
        mutex.lock()
        try {
            start.await()
        } finally {
            mutex.unlock() // StackOverflowError
        }
    }

    repeat(waiters) {
        launch(Unconfined) {
            mutex.withLock {
                done.countDown()
            }
        }
    }

    start.countDown()
    println(done.await(1, SECONDS))
}

Could you avoid using call stack when iterating lock waiters?

Broadcast channel

Has including a broadcaster been considered?

It would be the inverse of "select": each broadcast message is sent to all registered channels, like an asynchronous event bus.

Introduce a "sending side" (settable) for Deferred

As reported by @vaskir at public Slack:

What if I want to pass it as a channel somewhere, then wait until it's ready? I do it with a channel like this:

    val reply = Channel<Long>()
    getAndResetCh.send(reply)
    return reply.receive()
}

so I send a message to an actor "do the job, then send the result to this channel when ready", then I wait on receive asynchronously.

So, the use case is basically to send some object to a remote actor in order to receive a single answer from it. How do we name such an object, and what would its "sending" (setting) function be named?

Stack trace recovery

Look at the example below

fun main(args: Array<String>) = runBlocking {
    xxx()
}


suspend fun xxx() {
    delay(100)
    throw Exception()
}

Executing this code results in an exception with an incomplete stack trace.

Exception in thread "main" java.lang.Exception
	at TestKt$xxx$1.doResume(Test.kt:20) // throw Exception()
	at kotlin.coroutines.experimental.jvm.internal.CoroutineImpl.resume(CoroutineImpl.kt:54)
        ....
	at TestKt.main(Test.kt:13) //  runBlocking

The stack is starting from the bridge function and completes with the last continuation. All the intermediate calls are missing.

The problem makes it difficult to debug the app because it is not possible to determine the true continuation caller. The stack trace of an exception, thrown from a coroutine, becomes useless.

Help newbies to handle exceptions in coroutines

I can't figure out how to catch the exceptions thrown from a launch block. I don't understand how to use the CoroutineExceptionHandler. I think that the Try pattern is quite heavy (https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/Try.kt). The CompletableFuture wrapper works as expected (simply), but it requires the jdk8 lib.
Thanks for this masterpiece!

import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.future.await
import kotlinx.coroutines.experimental.future.future
import kotlinx.coroutines.experimental.launch
import kotlinx.coroutines.experimental.runBlocking
import org.junit.Ignore
import org.junit.Test
import java.util.concurrent.atomic.AtomicBoolean

class CoroutinesTest {

    class TestException: Exception("test exception")

    @Test
    @Ignore("not that way...")
    fun testExceptionOnLaunch_tryCatch() {
        runBlocking {
            val caught = AtomicBoolean(false)
            val job = launch(context) {
                delay(10)
                throw TestException()
            }
            try {
                job.join()
            } catch (e: TestException) {
                caught.set(true)
            } finally {
                assert(caught.get())
            }
        }
    }

    @Test
    @Ignore("not that way...")
    fun testExceptionOnLaunch_getCompletionException() {
        runBlocking {
            val job = launch(context) {
                delay(10)
                throw TestException()
            }
            try {
                job.join()
            } finally {
                val exc = job.getCompletionException()
                assert(exc is TestException)
            }
        }
    }

    @Test
    fun testExceptionOnFuture() {
        System.setProperty("kotlinx.coroutines.debug", "off")
        runBlocking {
            val caught = AtomicBoolean(false)
            val job = future(context) {
                delay(10)
                throw TestException()
            }
            try {
                job.await()
            } catch (e: TestException) {
                caught.set(true)
            } finally {
                assert(caught.get())
            }
        }
    }
}
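
For comparison, here is a minimal sketch of CoroutineExceptionHandler against the current (non-experimental) package names, which differ from the imports in the tests above. It assumes the handler is installed on a root coroutine through a separate CoroutineScope; the demo function name is hypothetical.

```kotlin
import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

class TestException : Exception("test exception")

fun demoExceptionHandler(): Throwable? = runBlocking {
    val caught = AtomicReference<Throwable?>(null)
    // The handler receives exceptions that a launch coroutine leaves unhandled.
    val handler = CoroutineExceptionHandler { _, e -> caught.set(e) }
    // Handlers take effect on root coroutines, hence the separate scope.
    val scope = CoroutineScope(Dispatchers.Default + handler)
    val job = scope.launch { throw TestException() }
    job.join() // join() waits for completion but does not rethrow the failure
    caught.get()
}
```

This matches the behavior seen in the ignored tests: wrapping join() in try/catch does not observe the exception, whereas the handler does.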

CoroutineDispatcher's names

The names of the CoroutineDispatcher implementations are too heterogeneous (CommonPool, Unconfined), and nothing helps the programmer see that they belong to the same hierarchy.
Other CoroutineDispatchers have names that are too generic (Swing, JavaFX) and are prone to name clashes: a look-and-feel engine could also expose "Swing" and "JavaFX" singletons.

My proposal is to define a common suffix such as "CoroutineDispatcher", "CDispatcher" or simply "Dispatcher" to group the names.

For example:

  1. CommonDispatcher
  2. UnconfinedDispatcher
  3. SwingDispatcher
  4. JavaFXDispatcher

launch(CommonDispatcher) {
    . . .
}

[Support] What happened to kotlinx.coroutines.generate?

Hello there,

a while back (with 0.2-beta) I wrote some tree-walking code using function kotlinx.coroutines.generate. Specifically, the generate-function would create Sequence objects given a coroutine body containing yield suspension points.

I cannot find that library function anymore. Indeed, the Maven artifact itself seems to have gone. Is there a replacement?

DeferredCompletionSource ?

In .NET we have the TaskCompletionSource<T> which contains a Task that can be awaited.
The TCS can then be used to trigger completion of this task, making it possible to let clients await the TCS task, and then on whatever signal/event, the task is set to completed.

Is there anything like this for Deferred<T> ?

I suppose I could use a channel, and let someone await a message on the channel and send to the channel when the event/signal occurs.

But IMO it would be cleaner to have this support in Deferred, since a Deferred communicates that there is a single value to be awaited, while a channel doesn't.

Lifecycle handling

I was trying to use coroutine Jobs for Android lifecycle handling together with the UI dispatcher, but I've stumbled upon an interesting issue where basically the following code can cause a crash:

var job: Job? = null

fun foo() {
  job = launch(UI) {
    val bar = download().await()
    // Cancel could have been called now, but the following code is posted to a Handler and executed anyway
    performSomeWorkOnTheUI(bar) // Causes a crash after onPause
  }
}

override fun onPause() {
  job?.cancel()
}

A more elaborate example that consistently crashes due to the IllegalStateException thrown in onResume can be found here. The repo contains a complete executable project.

Event conflation

I've read the "Event conflation" section of the Guide to UI programming with coroutines, but unfortunately there is one thing I didn't catch. For example, I have a switching button (with 3 or more states) that starts some "long" task depending on the button state. The button should change state quickly on every click, but the task (as it can't switch that fast) should switch only to the most recent state from eventActor. So how can this be implemented? Should I keep two actors in fun Node.onClick: one for the UI (with capacity = default) and another for processing (with capacity = Channel.CONFLATED)? Or can it be done some other way?

Go-like WaitGroup to wait multiple coroutines to finish

Hi,
This is more a question than an issue, however: are there any plans to have a WaitGroup-like mechanism? Go documentation for WaitGroup is here.

The problem it is supposed to solve is how to wait for a bunch of coroutines to complete. Here is a nice summary. Currently it is achievable via channels (a sort of "done" channel) or just by doing jobs.forEach { it.join() }. However, the latter requires an extra list / array allocation, whilst WaitGroup is effectively an atomic counter wrapper with some wait logic in it (see examples). Although it can be implemented quickly, it still might be useful as a part of kotlinx.coroutines.
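
In current releases of kotlinx.coroutines, a similar effect can be had without an explicit counter: coroutineScope returns only after every coroutine launched inside it has completed. A minimal sketch, with a hypothetical demo function:

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun demoWaitForAll(workers: Int): Int = runBlocking {
    val finished = AtomicInteger(0)
    // Acts like WaitGroup.Wait(): the block ends only when all children are done.
    coroutineScope {
        repeat(workers) {
            launch {
                delay(10) // stands in for real work
                finished.incrementAndGet()
            }
        }
    }
    finished.get()
}
```

No list of jobs has to be collected, because the scope itself tracks its children.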

IO thread pool for blocking calls

We need some kind of IO dispatcher in kotlinx.coroutines that will be optimized for offloading of blocking calls and will be backed by an unbounded thread-pool that is smart enough to create new threads only when needed and to shut them down on timeout. The goal is that you could always wrap blocking IO call in withContext(IO) { ... } (UPDATED: it was formerly named run(IO)) and be sure that you will not run into the problem of having not enough threads in your pool. However, it will be up to the user to control the maximal number of concurrent operations via some other means.

It will be somewhat conceptually similar to newThread() scheduler in Rx, but it will be also designed to be used where io() scheduler in Rx is used.
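
The wrapping described above can be sketched as follows, assuming the dispatcher is exposed as Dispatchers.IO as in current releases. The blockingRead function is a hypothetical stand-in for any blocking call. Note that, as far as current documentation goes, Dispatchers.IO limits its parallelism by default rather than being fully unbounded.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical blocking call, e.g. a file or socket read.
fun blockingRead(): String {
    Thread.sleep(10)
    return "payload"
}

// Offloads the blocking call so the caller's thread is not blocked while waiting.
suspend fun readOffCallerThread(): String =
    withContext(Dispatchers.IO) { blockingRead() }

fun demoIo(): String = runBlocking { readOffCallerThread() }
```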

JavaScript support

Is there any plan to include JavaScript support in the library?

It would be great to make this library a reference.

import org.w3c.fetch.Response
import kotlin.coroutines.experimental.*
import kotlin.js.Promise

external fun fetch(url: String): Promise<Response> = definedExternally

suspend fun <T> Promise<T>.await(): T = suspendCoroutine {
    then(it::resume).catch(it::resumeWithException)
}

fun <T> async(block: suspend () -> T) = Promise<T> { resolve, reject ->
    block.startCoroutine(completion = object : Continuation<T> {
        override val context: CoroutineContext = EmptyCoroutineContext

        override fun resume(value: T) {
            resolve(value)
        }

        override fun resumeWithException(exception: Throwable) {
            reject(exception)
        }
    })
}


fun asyncTest() = async {
    val value = fetch("./test.txt").then(Response::text).await()

    println(value)
}

IO suspend methods are mixed into FutureController

Suspension functions for async IO are placed in FutureController due to:

  • async IO must somehow interop with async futures.
  • lack of extension suspend functions
  • even if those extensions were possible, these methods are already member-extensions, so they can't be outside of the controller.

I suggest to wrap async IO operations into CompletableFuture's instead and perform await on these futures. This would require to write:

totalBytesRead += await(input.asyncRead(buf, totalBytesRead))

instead of

totalBytesRead += input.aRead(buf, totalBytesRead)

but IMO it's much clearer that a suspension point is introduced in the first example.

CountDownLatch implementation

In JDK we have CountDownLatch. It'd be useful to have a CountDownLatch which is aware of coroutines. It could have the following interface:

interface CountDownLatch {
    fun countDown()
    fun getCount(): Long
    suspend fun await()
    suspend fun await(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS): Boolean
}

I created two different implementations:

  • The first uses a list of Continuations which are resumed when the counter goes down to 0,
  • The second uses a ConflatedBroadcastChannel holding the current state of the latch. Note that the countDown function in this version is suspending.

I'm sure a better (faster!) implementation could be made.

Looking for an opinion on this.
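
As one more data point, here is a sketch of a third variant built on an atomic counter plus a CompletableDeferred, assuming a current kotlinx.coroutines release. The class name SuspendingLatch is hypothetical, and the timed await takes plain milliseconds instead of the (time, unit) pair from the interface above.

```kotlin
import java.util.concurrent.atomic.AtomicLong
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

class SuspendingLatch(count: Long) {
    private val remaining = AtomicLong(count)
    private val done = CompletableDeferred<Unit>()

    init {
        require(count >= 0) { "count must be non-negative" }
        if (count == 0L) done.complete(Unit)
    }

    fun getCount(): Long = remaining.get()

    // Non-suspending: decrements with a CAS loop and never goes below zero.
    fun countDown() {
        while (true) {
            val current = remaining.get()
            if (current == 0L) return
            if (remaining.compareAndSet(current, current - 1)) {
                if (current == 1L) done.complete(Unit) // last count releases waiters
                return
            }
        }
    }

    // Suspends until the count reaches zero.
    suspend fun await() = done.await()

    // Returns true if the count reached zero within the timeout, false otherwise.
    suspend fun await(timeMillis: Long): Boolean =
        withTimeoutOrNull(timeMillis) { done.await(); true } ?: false
}

fun demoLatch(): Boolean = runBlocking {
    val latch = SuspendingLatch(3)
    repeat(3) { launch { latch.countDown() } }
    latch.await(1000)
}
```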

Can't join a canceled task

Hi,

In the following code, I want to make sure my code blocks until the task is really finished:

        val job = async(CommonPool) {
            while (isActive);
            Thread.sleep(1000)
            System.out.println("end of async")
        }

        job.cancel()
        job.join()
        System.out.println("end of test")

The result is that I see "end of test" instantly, and "end of async" appears a second after, even though I asked to join() the task. Am I misusing the library or is this a bug?
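
In current releases of kotlinx.coroutines, join() on a cancelled job suspends until the job's body has actually finished, and cancelAndJoin() combines both calls. The sketch below illustrates that behavior; the started signal is an addition that makes sure the body is running before it is cancelled, and the demo function name is hypothetical.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun demoCancelAndJoin(): Boolean = runBlocking {
    val started = CompletableDeferred<Unit>()
    val finished = AtomicBoolean(false)
    val job = launch(Dispatchers.Default) {
        started.complete(Unit)
        while (isActive) { /* busy loop that polls for cancellation */ }
        Thread.sleep(100) // non-cancellable tail work, as in the report above
        finished.set(true)
    }
    started.await()     // make sure the body is running before cancelling
    job.cancelAndJoin() // returns only after the body above has completed
    finished.get()
}
```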

asyncGenerate + extensions

Is there an implementation of asyncGenerate from here? https://github.com/Kotlin/kotlin-coroutines/blob/master/examples/asyncGenerate.kt

Also I would propose adding some extension methods to functionally handle it:

inline suspend fun <T, T2> AsyncSequence<T>.map(crossinline transform: (T) -> T2) = asyncGenerate<T2> {
	for (e in this@map) {
		yield(transform(e))
	}
}

inline  suspend fun <T> AsyncSequence<T>.filter(crossinline filter: (T) -> Boolean) = asyncGenerate<T> {
	for (e in this@filter) {
		if (filter(e)) yield(e)
	}
}

suspend fun <T> AsyncSequence<T>.chunks(count: Int) = asyncGenerate<List<T>> {
	val chunk = arrayListOf<T>()

	for (e in this@chunks) {
		chunk += e
		if (chunk.size >= count) {
			yield(chunk.toList())
			chunk.clear()
		}
	}

	if (chunk.size > 0) {
		yield(chunk.toList())
	}
}

suspend fun <T> AsyncSequence<T>.toList(): List<T> = await(async {
	val out = arrayListOf<T>()
	for (e in this@toList) out += e
	out
})
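
The library's later Flow API covers much of this ground: map, filter and toList are provided as operators. The sketch below shows how the proposed chunks extension might look on top of Flow; the chunks function here is hand-written for illustration, not a library operator.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Groups upstream elements into lists of `count`; the last list may be shorter.
fun <T> Flow<T>.chunks(count: Int): Flow<List<T>> = flow {
    val chunk = ArrayList<T>(count)
    collect { e ->
        chunk += e
        if (chunk.size >= count) {
            emit(chunk.toList()) // emit a copy, then reuse the buffer
            chunk.clear()
        }
    }
    if (chunk.isNotEmpty()) emit(chunk.toList())
}

fun demoFlowPipeline(): List<List<Int>> = runBlocking {
    flow { for (i in 1..7) emit(i) }
        .map { it * 2 }
        .filter { it % 4 != 0 }
        .chunks(3)
        .toList()
}
```

With the inputs 1..7, doubling gives 2, 4, ..., 14, the filter keeps 2, 6, 10, 14, and chunking by three yields [[2, 6, 10], [14]].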

Add support for Completable

Sometimes you have a Completable that you want to use as a form of barrier that you need to await. Currently, the only way to await a Completable is

completable.toObservable<Unit>().materialize().awaitFirst()

which is very verbose and wasteful.

withTimeoutOrNull returns null even when it did not timeout itself

Copied from message by @gregschlom at public slack:

Hey guys. I have an issue with nested withTimeouts. Here’s some example code:

    val channel = Channel<Int>()

    // this blocks indefinitely if the channel is empty
    suspend fun nexValue(): Int {
        println("Waiting for next value ")
        return channel.receive()
    }

    // same as nextValue(), but returns null after the timeout expires
    suspend fun nextValueWithTimout(t: Long): Int? {
        return withTimeoutOrNull(t) { nexValue() }
    }

    suspend fun longOperation() {
        println("Starting long operation")
        while (true) {
            val v = nextValueWithTimout(1000)
            if (v == null) {
                println("timed out")
                // Nothing was received in our channel.
                // Do some other work while we wait for the value to arrive. Maybe we want to re-try sending
                // a message to our value producer for example
            }
            if (v == 42) return
        }
    }

    @Test
    fun main() = runBlocking {
        // This never returns. Instead we get stuck in an infinite loop in the while() above
        withTimeout(5000) {
            longOperation()
        }
    }

The problem here is that we have two nested withTimeout calls

The code is generally suspended inside nextValue(). When the outer timeout fires (the one in the main function), there’s no way for the code in nextValueWithTimout to know whether it was its own timeout that fired or some other timeout

Therefore it returns null either way
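
In current releases of kotlinx.coroutines, withTimeoutOrNull returns null only for its own timeout, so an outer timeout propagates as a TimeoutCancellationException. The sketch below is a reduced version of the scenario above with shorter delays; the demo function name is hypothetical.

```kotlin
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.withTimeoutOrNull

// Returns true if the outer timeout ended the loop.
fun demoNestedTimeouts(): Boolean = runBlocking {
    val channel = Channel<Int>() // nothing is ever sent in this demo
    try {
        withTimeout(250) {
            while (true) {
                // Each inner timeout yields null and the loop simply retries.
                withTimeoutOrNull(100) { channel.receive() }
            }
        }
        false
    } catch (e: TimeoutCancellationException) {
        true // the outer timeout was not swallowed by the inner call
    }
}
```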

Alternative naming for `generate-yield`

I don't think that the generate { yield } naming is clear enough. In particular, there is no indication that this is a way to construct a Sequence.

I propose renaming this construct to sequence { yield }. That way it becomes just another factory method for creating sequences.

In addition we can have list { yield }, which would be the eager version of it (can be done without coroutines). Also I thought about iterator { yield }, but I think it doesn't read well.
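
For reference, the Kotlin standard library (since 1.3) ships builders named sequence and iterator that work with yield. A minimal sketch of the lazy form, using a hypothetical fibonacci function:

```kotlin
// Lazy: each element is computed only when the consumer asks for it,
// so the infinite loop is safe as long as the caller limits consumption.
fun fibonacci(): Sequence<Long> = sequence {
    var a = 0L
    var b = 1L
    while (true) {
        yield(a)
        val next = a + b
        a = b
        b = next
    }
}
```

A caller would typically bound it, for example with fibonacci().take(8).toList().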

Unresolved reference: "launch"

I was using "org.jetbrains.kotlinx:kotlinx-coroutines-core:0.11-rc" and everything was okay, apart from a warning that coroutines are experimental.
Now I have updated the Kotlin plugin to 1.1.0 and changed the coroutines dependency to "compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:0.12'" from your README. Now Android Studio cannot see coroutines and I get an unresolved reference error.
What am I doing wrong?

0.14 not in jcenter

I'm sorry if this is a bintray issue and doesn't belong here, but the 0.14 jar (which is referenced in the README) doesn't exist on bintray.

It shows up in search, here: https://bintray.com/kotlin/kotlinx/kotlinx.coroutines

However, the files are missing: https://jcenter.bintray.com/org/jetbrains/kotlinx/kotlinx-coroutines-core/

If you attempt to add 0.14 (or 0.14.1, which also shows up in search) gradle produces:

* What went wrong:
Could not resolve all dependencies for configuration ':compileClasspath'.
> Could not find org.jetbrains.kotlinx:kotlinx-coroutines-core:0.14.1.
  Searched in the following locations:
      https://repo1.maven.org/maven2/org/jetbrains/kotlinx/kotlinx-coroutines-core/0.14.1/kotlinx-coroutines-core-0.14.1.pom
      https://repo1.maven.org/maven2/org/jetbrains/kotlinx/kotlinx-coroutines-core/0.14.1/kotlinx-coroutines-core-0.14.1.jar
      https://jcenter.bintray.com/org/jetbrains/kotlinx/kotlinx-coroutines-core/0.14.1/kotlinx-coroutines-core-0.14.1.pom
      https://jcenter.bintray.com/org/jetbrains/kotlinx/kotlinx-coroutines-core/0.14.1/kotlinx-coroutines-core-0.14.1.jar
  Required by:
      project :

I managed to receive a jar by changing my version to 0.13

It might be worth noting that kotlinx-coroutines has a pom file, but no jar:
https://jcenter.bintray.com/org/jetbrains/kotlinx/kotlinx-coroutines/0.14.1/

How can I restart canceled job?

After cancelling a job, how can I restart it?
This is my code (Kotlin):

send_udp_repeat = launch(CommonPool) {
    var nextPrintTime = System.currentTimeMillis()
    while (isActive) {
        val currentTime = System.currentTimeMillis()
        if (currentTime >= nextPrintTime) {
            sendUDPrequest(UDP_SERVER_IP, UDP_SERVER_PORT, UDP_SERVER_MSG)
            nextPrintTime = currentTime + 2000L
        }
    }
}
commu_status.setOnClickListener {
    if (send_udp_repeat.isActive == true) {
        send_udp_repeat.cancel()
    }
}
menu_button.setOnClickListener {
    // I want to restart send_udp_repeat here
}
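
A completed or cancelled Job cannot be started again, so the usual approach is to launch a fresh coroutine from the same code. The sketch below wraps that in a small helper, written against current kotlinx.coroutines names; RepeatingTask is a hypothetical class, it uses delay instead of a busy loop, and it is not thread-safe as written.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

class RepeatingTask(
    private val scope: CoroutineScope,
    private val periodMillis: Long,
    private val action: () -> Unit,
) {
    private var job: Job? = null

    // Launches a new coroutine each time; does nothing if one is still active.
    fun start() {
        if (job?.isActive == true) return
        job = scope.launch {
            while (isActive) {
                action()
                delay(periodMillis) // suspends instead of spinning on the clock
            }
        }
    }

    fun stop() {
        job?.cancel()
    }
}

fun demoRestart(): Int = runBlocking {
    val sent = AtomicInteger(0)
    val task = RepeatingTask(this, 1000) { sent.incrementAndGet() }
    task.start()
    delay(50)
    task.stop()
    task.start() // "restart" is simply a new job running the same loop
    delay(50)
    task.stop()
    sent.get()
}
```

In the click listeners above, commu_status would call stop() and menu_button would call start().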

Serializability of coroutine classes

Currently, if you attempt to serialize coroutine state via standard Java Serialization the tree of reference leads you to the objects defined in kotlinx.coroutines like StandaloneCoroutine (which serves as a completion for launched coroutines) and its contexts (like CommonPool) which are not currently serializable. They should be made properly serializable.

See also https://github.com/Kotlin/kotlin-coroutines/issues/28
