Comments (12)
Interestingly, for me, this test crashes on the JVM as well.
from kotlinx.coroutines.
It also crashes for me on the JVM. On Android, it's either timing (so Ignored exception
is printed) or slightly different executor/dispatcher setup.
The cause is straightforward -- there are active coroutines running on the executor that is concurrently closed.
So, any attempt to (re)dispatch any coroutine on that dispatcher fails. Moreover, often there is no place in the code (or this place might be unrelated) to rethrow such an exception, thus an error. It cannot be reasonably ignored -- it indicates something went really wrong (e.g. it might imply that your finally
blocks in suspend
functions didn't execute).
So I won't triage it as a bug. We can document it better and maybe (but it's really debatable topic) treat it not as an internal error, but instead raise global exception handler immediately (which, on Android, will crash the app)
from kotlinx.coroutines.
So I won't triage it as a bug.
The error message clearly states that any such error is worth reporting to us:
kotlinx.coroutines.CoroutinesInternalError: Fatal exception in coroutines machinery for CancellableContinuation(DispatchedContinuation[DarwinGlobalQueueDispatcher@53c0308, Continuation @ 6]){Completed}@79400a0. Please read KDoc to 'handleFatalException' method and report this incident to maintainers
So, I don't think we can ignore it when people do just that.
Also, I think this actually is an issue on our side: since there is no guarantee that dispatch
doesn't throw, the places where it throws shouldn't be treated as surprises (that is, internal errors).
from kotlinx.coroutines.
True, we have to acknowledge that.
since there is no guarantee that dispatch doesn't throw
The documentation (somewhat vaguely, though) states the following:
This method should generally be exception-safe. An exception thrown from this method may leave the coroutines that use this dispatcher in an inconsistent and hard-to-debug state.
We can re-visit the corresponding places again, though there are not that many alternatives -- internal error or immediate crash (handleCoroutineException
, which is also invoked by the internal error path).
We have another place like this -- ThreadContextElement
(the documentation to handleFatalException
mentions it)
from kotlinx.coroutines.
It cannot be reasonably ignored -- it indicates something went really wrong (e.g. it might imply that your finally blocks in suspend functions didn't execute).
So what's correct way to close the dispatcher?
My real use case is simple: I'm implementing a client which setup websocket connection to backend and creates singleThreadDispatcher
.
When an event is received from the websocket - the client parses it on singleThreadDispatcher
then updates database withContext(Dispatchers.IO)
and emitting an onUpdated
event to outside.
Also the client has shutdown()
method where it closes the websocket connection and the singleThreadDispatcher
.
Obviously shutdown
could be called at any moment of time.
So my code looks like:
class Client {
private val dispatcher = newSingleThreadContext("MyDispatcher")
private val scope = CoroutineScope(dispatcher)
private var websocket: WebSocket? = null
val onUpdated = MutableSharedFlow<Event>()
private fun onWebsocketMessage(message: String) {
scope.launch {
val event = parseMessage(message)
withContext(Dispatchers.IO) {
updateDB(event)
}
processEvent(event)
onUpdated.emit(event)
}
}
fun shutdown() {
websocket?.close()
scope.cancel()
dispatcher.close()
}
}
Is there a better approach than just maintain collection of ioDispatcherJobs
and joinAll
them in the shutdown()
method?
Like:
class Client {
private val dispatcher = newSingleThreadContext("MyDispatcher")
private val scope = CoroutineScope(dispatcher + SupervisorJob())
private var websocket: WebSocket? = null
val onUpdated = MutableSharedFlow<Event>()
private val ioDispatcherJobs = mutableListOf<Job>()
private fun onWebsocketMessage(message: String) {
scope.launch {
val event = parseMessage(message)
val job = launch(Dispatchers.IO, CoroutineStart.LAZY) {
updateDB(event)
}
ioDispatcherJobs += job
job.join()
ioDispatcherJobs -= job
processEvent(event)
onUpdated.emit(event)
}
}
suspend fun shutdown() {
websocket?.close()
withContext(dispatcher) {
ioDispatcherJobs.joinAll()
}
scope.cancel()
dispatcher.close()
}
}
from kotlinx.coroutines.
May I suggest not using newSingleThreadContext
at all, instead doing Dispatchers.IO.limitedParallelism(1)
? This way, you won't need to close the dispatcher at all.
from kotlinx.coroutines.
Correct me if I'm wrong, but I don't see in documentation thatlimitedParallelism(1)
guarantees execution on single thread.
It means that I have to synchronize access to data everywhere, for example in my processEvent
method:
val allEvents = mutableListOf<Event>()
val mutex = Mutex()
fun processEvent(event: Event) {
mutex.withLock {
allEvents += event
}
}
Correct?
Which is exactly what I'm trying to avoid by using newSingleThreadContext
from kotlinx.coroutines.
I don't see in documentation that
limitedParallelism(1)
guarantees execution on single thread.
It doesn't guarantee that this will always run on the same thread, so if you have things like thread local variables, yes, limitedParallelism
on its own won't help you; but it does guarantee that the parallelism will be at most 1
, or, in other words, at most one thread at a time will execute the code scheduled on that dispatcher. So no, you don't need mutexes: only one thread at a time (though possibly a different one between calls) can call processEvent
.
from kotlinx.coroutines.
Thank you, that make sense. Two more questions regarding limitedParallelism(1)
then:
- Does it guarantees FIFO order of operations?
- As it could be executed on different threads I still have to to use atomic vars, otherwise it's not guaranteed that I read last value set to a var by a different thread. Correct?
var eventCounter by atomic(0)
fun processEvent(event: Event) {
eventCounter++
}
from kotlinx.coroutines.
- Yes, it stores a queue internally.
- No, the happens-before relationship is guaranteed by the coroutines machinery.
from kotlinx.coroutines.
Thank you so much! I'll try to go with limitedParallelism(1)
Regarding initial issue - I would expect the same behaviour on all platforms. So one of possible solution is to make the test crash on Android the same way how it crashes on iOS and JVM.
In this case common code debugged once on android - will work on other platforms without changes.
from kotlinx.coroutines.
@rusmonster, could you please explain why you thought that limitedParellelism
was unsuitable? @qwwdfsad
found this misleading piece of information: https://github.com/KStateMachine/kstatemachine/blob/master/docs/index.md#use-single-threaded-coroutinescope Are there any other ones?
from kotlinx.coroutines.
Related Issues (20)
- Provide an API to invoke a callback on job cancellation
- Even After bumping kotlinx coroutine test dependency to 1.8.0 giving an UncaughtExceptionsBeforeTest. HOT 4
- ThreadLocal.asContextElement may not be cleaned up when used with Dispatchers.Main.immediate HOT 8
- `kotlinx.coroutines.debug`'s `module-info.java` is incorrect preventing any project using JPMS to use debug probes
- BlockHound false positive in kotlin.jvm.internal.Reflection.renderLambdaToString HOT 3
- java.lang.NullPointerException: Cannot invoke "kotlinx.coroutines.flow.Flow.collect when bumping up coroutines 1.6.4 to 1.8.1 HOT 1
- Improve the API reference HOT 3
- Coroutine on EventLoop dispatcher fails to yield to a task which is resuming after a delay HOT 5
- TestScope swallows an exception thrown from `launch` outside `runTest` HOT 4
- Exceptions being swallowed during tests / UncaughtExceptionsBeforeTest HOT 5
- Flow collection silently hangs when dispatcher throws an exception
- Suggestion for a potential new Flow's `timeout` extension HOT 2
- Mutex is unlocked on cancellation
- Crash on GraalVM at `1.9.0-RC` HOT 7
- Non-linearizable behavior in `cancel` + `awaitClose` inside of `produce` HOT 1
- Inherit from `kotlinx.coroutines` warning even with interface delegation ? HOT 1
- Consider stabilizing `CoroutineStart.ATOMIC`
- 1.9.0-RC: `kotlinx-coroutines-core/jvm/src/internal/ProbesSupport.kt` calls ` kotlinx.coroutines.debug.internal.probeCoroutineResumed` HOT 1
- 1.9.0-RC: DispatchersToStringTest.testLimitedParallelism fails when CORE_POOL_SIZE == 2 HOT 3
- CancellableContinuation.invokeOnCancellation should accept a suspend callback HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from kotlinx.coroutines.