hopac / hopac Goto Github PK
View Code? Open in Web Editor NEWhttp://hopac.github.io/Hopac/Hopac.html
License: MIT License
http://hopac.github.io/Hopac/Hopac.html
License: MIT License
I'm having some trouble working with the newly added polling channel communications. Here's the code:
let c = ch()
let receiver = job{
while true do
do! Ch.take c}
let poll = job{
let counter = ref 0
for i = 0 to 999 do
let! res = Ch.Try.give c ()
if not res then counter := !counter + 1
printfn "%A" !counter }
let run() =
start receiver
run poll
printfn "done"
The poll
job never completes and neither the counter
nor "done" are printed. Am I doing something silly?
Did a quick read of IVar.cs
. Other frameworks I read about recommend exponential backoff as a strategy for handling high contention. Can be quite simple - Thread.SpinWait every time we re-enter "Spin", with doubling the time, up to a threshold. Should be possible to construct a high-contention benchmark to experiment with this.
Does this sound useful, or are there other abstractions in the library that do this better than IVar's? Thanks.
Write examples and documentation to show how use of fresh reply ivars/chs in CML style avoids the need to filter responses to canceled operations in client-server protocols that seems to be common in Erlang as seen several times in the Erlang Programming book by Cesarini and Thompson. See e.g. page 105.
Somehow this infers Array for xs, not seq. Thoughts?
let push out xs =
job {
for x in xs do
do! Ch.give out x
}
Hi,
Is it possible to give a value to a channel if it has a listener, or to proceed without blocking or buffering the value when it has none? This is basically behaviour similar to an event.
(The question is a duplicate of http://stackoverflow.com/questions/24721444/hopac-can-i-give-value-to-a-channel-only-if-someone-is-listening-to-it but as the only two mentions of Hopac on stackoverflow seem to be mine, perhaps better to post it here).
Thanks for making Hopac publicly available. I can honestly say it's one of my favourite things about whole F#.
Ubuntu 14.04.1 LTS, 64 bit
Mono JIT compiler version 3.2.8 (Debian 3.2.8+dfsg-4ubuntu1)
Hopac 0.0.0.25
CPU load is close to zero.
(partial) output:
SerAsc: 55 - 0.025753s
ParTsk: 55 - 12.259953s (7.177841 tasks/s)
ParAsc: 55 - 0.026713s
SerFun: 6765 - 0.000148s (10945 recs)
ParOpt: 6765 - 0.003772s (2901643.690350 jobs/s)
ParJob: 6765 - 0.009304s (1176401.040435 jobs/s)
ParPro: 6765 - 0.104681s (104555.545790 jobs/s)
SerOpt: 6765 - 0.005330s
SerJob: 6765 - 0.006729s
FibNck: 6765 - 0.011253s
SerAsc: 6765 - 0.013057s
ParTsk: 6765 - 192.592561s (56.829817 tasks/s)
ParAsc: 6765 - 0.866197s
SerFun: 832040 - 0.014316s (1346268 recs)
ParOpt: 832040 - 0.244005s (5517381.003414 jobs/s)
ParJob: 832040 - 0.264780s (5084483.440385 jobs/s)
ParPro: 832040 - 0.323679s (4159274.045303 jobs/s)
SerOpt: Warning: Degraded allocation. Consider increasing nursery-size if the warning persists.
Warning: Degraded allocation. Consider increasing nursery-size if the warning persists.
832040 - 2.997870s
SerJob: Warning: Repeated degraded allocation. Consider increasing nursery-size.
832040 - 6.977952s
FibNck: 832040 - 1.010520s
SerAsc: 832040 - 0.819772s
ParTsk:
It hangs for several minutes on this state.
Core i5 (4 physical cores), windows 7 x64:
ChMsg: 4 * 300 msgs => 24811 msgs/s
ChMsg: 4 * 3000 msgs => 4049403 msgs/s
ChMsg: 4 * 30000 msgs => 2429420 msgs/s
ChMsg: 4 * 300000 msgs => 3767980 msgs/s
ChMsg: 4 * 3000000 msgs => 4213296 msgs/s
MbMsg: 4 * 300 msgs => 56186 msgs/s
MbMsg: 4 * 3000 msgs => 2688413 msgs/s
MbMsg: 4 * 30000 msgs => 2772842 msgs/s
MbMsg: 4 * 300000 msgs => 3313607 msgs/s
MbMsg: 4 * 3000000 msgs => 2959431 msgs/s
Xeon (24 physical cores), windows server 2012R2 x64:
ChMsg: 48 * 300 msgs => 73654 msgs/s
ChMsg: 48 * 3000 msgs => 515786 msgs/s
ChMsg: 48 * 30000 msgs => 600174 msgs/s
ChMsg: 48 * 300000 msgs => 610965 msgs/s
ChMsg: 48 * 3000000 msgs => 618576 msgs/s
MbMsg: 48 * 300 msgs => 135676 msgs/s
MbMsg: 48 * 3000 msgs => 558427 msgs/s
MbMsg: 48 * 30000 msgs => 994872 msgs/s
MbMsg: 48 * 300000 msgs => 872920 msgs/s
MbMsg: <cannot managed to wait this one to finish>
An older Xeon (8 cores), windows server 2012R2 x64:
ChMsg: 8 * 300 msgs => 39138 msgs/s
ChMsg: 8 * 3000 msgs => 1219054 msgs/s
ChMsg: 8 * 30000 msgs => 1225679 msgs/s
ChMsg: 8 * 300000 msgs => 1191693 msgs/s
ChMsg: 8 * 3000000 msgs => 1203870 msgs/s
MbMsg: 8 * 300 msgs => 119806 msgs/s
MbMsg: 8 * 3000 msgs => 596963 msgs/s
MbMsg: 8 * 30000 msgs => 784205 msgs/s
MbMsg: 8 * 300000 msgs => 626359 msgs/s
MbMsg: 8 * 3000000 msgs => 558681 msgs/s
Hopac currently does not support anything like the SynchronizationContext
mechanism that would allow the execution of Hopac jobs to be pinned to particular threads (or groups of threads) such as the main UI thread.
The overhead for supporting synchronization contexts is significant. Just adding the extra field corresponding to such a context, without making any use of the field, would seem to add at least about 10% execution time overhead in many benchmarks. When execution of a Hopac job needs to be started on a particular synchronization context that is not controlled by Hopac code (like the main UI thread), an operation needs to be posted to the context and the overhead is likely to be an order of magnitude higher than with Hopac's custom scheduler.
Despite the overhead cost, and contrary to my previous opinion, it would seem worth it to support synchronization contexts (and other similar mechanism). Doing so allows Hopac to be seamlessly used for GUI programming. Hopac performance will still likely be significantly better than with current F# Async. So, basically there will be one less reason to use F# Async at all.
I built Hopac from source in VS, referenced Hopac
, Hopac.Core
and Hopac.Platform
(...\Libs\Hopac.Platform.Net\bin\Release\Hopac.Platform.dll). Everything is ok when I run scripts in FSI, but it raises NullReferenceException
when running in a console app. References are looked like this:
The same exception occurred earlier in FSI when I didn't reference Hopac.Platform
.
MailboxProcessor in F# is specifically designed for F# and because of different implementation of Task and Async, MailboxProcessor isn't a pleasant experience in C#.
My question is Hopac the same thing as MailboxProcessor from C# point of view? or Can we modified it to use it & make it a pleasant for C# developer.
I haven't looked at the source code and asking if some one can intuitively think that it isn't much effort to use it from C#?
Public Hopac libs do not directly provide primitives for distributed computing. Implement a "remote" mailbox for communication between nodes on (potentially) separate physical nodes. Provide mechanisms to easily connect nodes, create remote mailboxes and observe node failures (Erlang monitor_node -> Node.Alt.down node).
@VesaKarvonen do you mind if I add a one-line readme.md with CI build status?
I'm going to reorganize the libraries somewhat. I plan to eliminate the Hopac.Extra
library by moving a number of modules from it to Hopac
and the remaining modules to Hopac.Experimental
. I plan to move the following modules and associated types to Hopac
:
Hopac.Extra.Alt
-> Hopac.Alt
Hopac.Extra.BoundedMb
-> Hopac.BoundedMb
Hopac.Extra.Streams
-> Hopac.Stream
(Note: name changed)Hopac.Extra.StreamVar
-> Hopac.Stream.Var
Hopac.Extra.StreamSrc
-> Hopac.Stream.Src
Hopac.Extra.StreamsBuilders
-> Hopac.TopLevel
The other modules will be moved to Hopac.Experimental
. Many of those modules are either rather incomplete (e.g. Hopac.Extra.Stream
) or unlikely to be really useful (e.g. DirCh
and SwapCh
).
This should also make the name of the Hopac.Extras
library more meaningful.
The current messaging primitives of Hopac are optimized for low-contention scenarios. This is what should be done to maximize end-to-end processing performance with lightweight threads. However there are cases where a server needs to perform well even in potentially high-contention scenarios and it would be useful to have a "fan-in" mailbox for those cases. The idea of a FanIn mailbox is that it allows simultaneous access by multiple producer threads without interference. To make it really work, message ordering guarantees need to be relaxed or performance doesn't really improve. BTW, it would seem that there is no need for a FanOut mailbox, because lightweight threads and work distributing scheduler already take care of that.
Hi, I noticed you removed all of the README files a short while ago. What bearing does this have on the state of of the project? Is it ok to start using right now, or is there some kind of overhaul in progress?
In #29 @vasily-kirichenko ran into a subtyping issue with the >>=?
combinator. I wonder if all the combinators in Hopac should be changed to use bounded quantification at covariant positions where a user given function is expected to return a Job<_>
or an Alt<_>
?
For example, the current signature of >>=?
is
val (>>=?): Alt<'x> -> ('x -> Job<'y>) -> Alt<'y>
and it would be changed to
val (>>=?): Alt<'x> -> ('x -> #Job<'y>) -> Alt<'y>
This is a breaking change in the sense that existing code will need to be modified (e.g. upcast
s removed sometimes other type ascriptions added) to compile. I would assume that the amount of client code using Hopac is still small enough that it would not be a major problem. Any thoughts on this? (If I get no replies, I assume nobody cares and I will just make the change.)
I actually did a test run of this modification locally about a week ago, but I reverted the changes. It seemed that it is an overall win: to my eyes client code got a little bit simpler with fewer upcasts and other type ascriptions. The problem in #29 would disappear, for example. However, it also does change the inferred types in other ways and you will need to add type ascriptions elsewhere. So, it is not just a clear 100% improvement.
Does someone have an idea for a better name for the hold combinator? See the doc here.
I'm trying to implement a higher-order function which should
Alt
which is returned by a function passed as an argument.My attempt is as following:
let ch = ch<int>()
let take() = Ch.take ch
let g (f: int -> Alt<_>) = Alt.guard << Job.delay <| fun _ ->
take() |>>? fun x -> f x
run (ch <-+ 0)
for
(g (fun _ -> Timer.Global.timeOutMillis 3000 |>>? fun _ -> printfn "g timeout")) <|>?
(Timer.Global.timeOutMillis 10000 |>>? fun _ -> printfn "outer timeout")
|> run
It results with g timeout
.
And for
(g (fun _ -> Timer.Global.timeOutMillis 10000 |>>? fun _ -> printfn "g timeout")) <|>?
(Timer.Global.timeOutMillis 3000 |>>? fun _ -> printfn "outer timeout")
|> run
it results with outer timeout
, which is the desired behavior.
But if I not give a message to ch
, then it hungs forever. As I understand, it waits on take()
and not reaches fun x -> f x
which is return the timeout.
How I can fix this?
Add some FAKE support.
I have started using Hopac as an alternative to Async/TPL and I love it. I understand basic usage, but some aspects are still not clear.
First, could we compare Alt
to F# lazy
, so that a job inside an Alt
is only evaluated on Alt.pick?
Second, is this implementation of AutoResetEvent correct and idiomatic for Hopac?
/// <summary>
/// MSDN: The AutoResetEvent class represents a local wait handle event that resets automatically
/// when signaled, after releasing a single waiting thread. An AutoResetEvent object is automatically
/// reset to non-signaled by the system after a single waiting thread has been released.
/// If no threads are waiting, the event object's state remains signaled.
///
/// Hopac's alternative to http://blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266923.aspx
/// </summary>
type HopacAutoResetEvent (initialState : bool) =
// We will wait on take, and set with send
let setChannel : Ch<unit> = ch()
do if initialState then start <| Ch.send setChannel ()
new() = HopacAutoResetEvent(false)
member this.Wait(timeout:int) : Job<bool> =
let timedOut : Alt<bool> =
((float timeout) |> TimeSpan.FromMilliseconds |> Timer.Global.timeOut)
>>=? fun () -> Job.result false
let signaled = Ch.Alt.take setChannel >>=? fun () -> Job.result true
signaled <|> timedOut
// From docs, important for <|>:
// The given alternatives are processed in a left-to-right order with short-cut evaluation.
// In other words, given an alternative of the form first <|> second, the first alternative
// is first instantiated and, if it is pickable, is committed to and the second alternative
// will not be instantiated at all.
member this.Set() : Job<unit> =
// from MSDN: Also, if Set is called when there are no threads waiting and the EventWaitHandle
// is already signaled, the call has no effect.
// try take and send covers all cases
// if there was no waiters and state was signalled -> will steal the state and send it back immediately
// if there were waiting thread or state was not signaled -> there was no signal and we steal nothing, just signal
(Ch.Try.take setChannel) >>. Ch.send setChannel ()
Third, is this implementation of ManualResetEvent correct and idiomatic for Hopac?
/// <summary>
/// Hopac's alternative to http://blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266920.aspx
/// </summary>
type HopacManualResetEvent (initialState : bool) =
[<VolatileFieldAttribute>]
let mutable state : bool = initialState
let setChannel : MChan<bool> = run <| Multicast.create ()
let lock = Lock.Now.create()
new() = HopacManualResetEvent(false)
member this.Wait() : Job<bool> =
let rec loop () =
job {
if state then return true
else
let! port = Multicast.port setChannel
let! res = (Multicast.recv port) // waiting here
if res then return true
else return! loop ()
}
loop ()
// From Multicast.fsi: **Sends** a message to all of the ports listening to the multicast channel.
// Send must mean the same as in Ch
member this.Set() : Job<unit> =
(Multicast.multicast setChannel true) // there could be no waiters
|>> (fun _ -> state <- true ) // in any case we set the state
>>% () // and return unit
|> (Lock.duringJob lock)
member this.Reset() : Job<unit> =
(Multicast.multicast setChannel false) // (redundant?) if there are takers, res in loop() will be false and loop will iterate
|>> (fun _ -> state <- false ) // in any case we set the state
>>% ()
|> (Lock.duringJob lock)
Thanks in advance!
Cross-post: http://stackoverflow.com/questions/26912904/better-understanding-of-f-hopac-library
fun accum sum = sun (
choose [
wrap (recvEvt addCh, fun x => accum(sum+x)),
wrap (recvEvt subCh, fun x => accum(sum-x)),
wrap (sendEvt (readCh, sum), fun () => accum sum)
])
let rec accum sum =
Alt.choose [ Ch.Alt.take addCh >>=? fun x -> accum (sum + x)
Ch.Alt.take subCh >>=? fun x -> accum (sum - x)
Ch.Alt.give readCh sum >>=? fun _ -> accum sum ]
But it does not compile since >>=?
right argument must be Job<_>
. So, is it not possible to create new Alt
s from existing ones combining them with combinators? I'm not sure why Job
must appear in such definitions at all.
Hypothesis: Erlang style pattern matching based selective communication (which also obviously works with CML) is good for implementing internal/closed protocols while CML style selective communication is good for external/open combination of such protocols. Write documentation to elaborate on this hypothesis. Show how issues, such as need to filter mailbox contents and race conditions that seem to be frequent in the Erlang Programming book, are due to attempts to do external combinations of otherwise perfectly working internal protocols and how CML style events help to avoid such problems with ease.
What's the best way to consume ordinary IEvent
(or IObservable
)? This is what I'm trying:
type RunningProcess =
{ LineOutput: Ch<Line>
ProcessExited: IVar<ExitResult> }
let execute (p: Process) =
let lineOutput = ch()
let processExited = ivar()
let server() =
Streams.subscribingTo p.OutputDataReceived <| Streams.iterJob (fun args ->
if args.Data <> null then
lineOutput <-+ args.Data
else Job.unit())
>>.
(Streams.subscribingTo p.Exited <| Streams.iterJob (fun _ ->
Job.start (processExited <-= p.ExitCode)))
start (server())
if not <| p.Start() then failwithf "Cannot start %s." p.StartInfo.FileName
p.BeginOutputReadLine()
{ LineOutput = lineOutput; ProcessExited = processExited }
Here I basically create two streams based on two events and route each event to a dedicated channel.
Client code is as following:
let pr = <create a RunningProcess instance here>
let rec loop() =
(pr.LineOutput >>=? fun line ->
Job.start (job { printfn "Line: %s" line }) >>. loop()) <|>?
(pr.ProcessExited |>>? fun res ->
printfn "Exited with code %A." res)
<|>? ...other Alts...
start (loop())
Besides it does not work, is this code a good use case for streams? Or it's possible to consume event more easily in this case?
Hi, thanks for the library, it's looking very useful - fills a big hole in concurrency abstractions in F# :)
Just looking for the first time over the code, why did you settle on using C# for the Core library? Is it really for performance or? I find C# a bit harder to read.
I'll need more time to read/play with this lib. Are there areas where you'd like contributions? Documentation comes to mind..
Thanks!
I'm working on tests for ParallelExecutor
and I cannot understand why https://github.com/Hopac/Hopac.Extras/blob/master/tests/Hopac.Extras.Tests/ParallelExecutorTests.fs#L42 hangs then failed with timeout exception. I suspect that something wrong with the test itself.
I'd like to be able to cancel or permanently block a job by using its handle only (without changing a given job's body). Is it possible? An example
let jb = job{
let sleep = Hopac.Timer.Global.sleep (TimeSpan.FromMilliseconds 1000.)
let inner = job{
do! sleep
printfn "slept"}
do! inner
do! inner
}
start jb
//Do something so that "slept" is not printed two times
A second related question: is there a more primitive way of converting a job to an Alt that is available for picking once the job completes than something like
let runRead jb =
let ivar = ivar()
start (jb >>= IVar.fill ivar)
IVar.Alt.read ivar
Thanks!
Hi,
May be resort to use full naming
Channel
Alternative
Actor
ActorThread
Continuation
Also where possible and appropriate have operator to back the functionality for conciseness.
Does this make sense?
I think asyncAsAlt
is very useful for interop with Async
s, so I suggest to include it into the lib (something like Alt.ofAsync
).
Thread spawning, suspend and resume are very fast with cooperative threads and should be used to advantage. Polling should not be used. Hostile long running stuff, if really needed, should be run with native threads/scheduler, which are designed for that (via preemption and fairness mechanisms).
fun mkUIdSrc () = let
val ch = channel()
fun loop i = (send(ch, i); loop(i+1))
in
spawn (fn () => loop 0);
fn () => recv ch
end
My attempt:
let uidServer = Job.delay <| fun _ ->
let channel = ch()
let rec loop value =
Ch.give channel value >>. loop (value + 1)
Job.start (loop 0) >>% channel
let server = run uidServer
run (Ch.take server)
It causes Stack Overflow on run uidServer
. I'm struggling to understand what's going wrong here.
May be you can provide the Actor implementation also as per the example or may be improved.
See page 257 in the book Erlang Programming by Cesarini and Thompson. The bottom example seems to have a race condition with the monitor_node pattern. Specifically, a node might send the ok response and then die before the client receives the response leaving a spurious nodedown message in the mailbox of the client. Assuming I'm correct, the authors fail to notice it despite mentioning it is important to demonitor, because it generates nodedown messages. Write documentation to explain how defining "nodedown" as a CML style event and using CML style selective communication avoids race conditions such as the one in the book, while also making code more modular. CML style selective communication isn't just for timeouts.
Sorry for long question, but I really stuck on it. I'm afraid I don't fully understand guards.
I'm trying to implement a resizable pool of workers. It has degreeOfParallelism
MVar
which determines how many parallel jobs are actually executing at any moment (maximum).
It works OK until I call SetDegreeOfParallelism
with a value which is smaller that current one. At this moment the pool stops working silently. The output is
Getting from source...
Alt.never
[worker] Received 1. Sleeping...
[worker] Received 1000. Sleeping...
Giving result to workDone: Choice1Of2 1
Giving result to workDone: Choice1Of2 1000
Work done: Choice1Of2 1 (worker count = 2)
Getting from source...
Alt.never
[worker] Received 999. Sleeping...
Work done: Choice1Of2 1000 (worker count = 2)
Getting from source...
Alt.never
[worker] Received 998. Sleeping...
Giving result to workDone: Choice1Of2 999
Giving result to workDone: Choice1Of2 998
Work done: Choice1Of2 999 (worker count = 2)
Getting from source...
Alt.never
[worker] Received 997. Sleeping...
Work done: Choice1Of2 998 (worker count = 2)
Getting from source...
Alt.never
[worker] Received 996. Sleeping...
Giving result to workDone: Choice1Of2 997
Giving result to workDone: Choice1Of2 996
Work done: Choice1Of2 997 (worker count = 2)
Getting from source...
Alt.never
[worker] Received 995. Sleeping...
Work done: Choice1Of2 996 (worker count = 2)
Getting from source...
Alt.never
[worker] Received 994. Sleeping...
// call SetDegreeOfParallelism here
val it : unit = ()
Alt.never
Giving result to workDone: Choice1Of2 995
Giving result to workDone: Choice1Of2 994
So it synchronizes on Alt.never()
or workDone
channel. The two workers call Ch.give workDone
but Alt.choose
does not synchronize on it. I cannot understand why.
open Hopac
open Hopac.Job.Infixes
open Hopac.Alt.Infixes
open Hopac.Infixes
type Pool<'msg, 'res, 'error>(degreeOfParallelism: int, source: Alt<'msg>, worker: 'msg -> Job<Choice<'res, 'msg * 'error>>) =
let degreeOfParallelism = MVar.Now.createFull degreeOfParallelism
let dopChanged = ch<unit>()
let workDone = ch<Choice<'res, 'msg * 'error>>()
let failedMessages = mb()
let getMessage workerCount =
let get() = Alt.guard (job {
let! dop = MVar.read degreeOfParallelism
//printfn "Dop read: %d. Worker count: %d" dop workerCount
return
if workerCount < dop then
printfn "Getting from source..."
source <|>? failedMessages
else
printfn "Alt.never"
Alt.never() })
get() <|>? (dopChanged >>.? get())
let pool = Job.iterateServer 0 <| fun workerCount ->
Alt.choose [ getMessage workerCount >>=? fun msg ->
Job.queue (worker msg >>= fun r -> printfn "Giving result to workDone: %A" r; Ch.give workDone r)
>>% workerCount + 1
workDone >>=? fun r ->
printfn "Work done: %A (worker count = %d)" r workerCount
match r with
| Choice1Of2 _ -> Job.result (workerCount - 1)
| Choice2Of2 (msg, _) -> failedMessages <<-+ msg >>% workerCount - 1
]
do start pool
member __.SetDegreeOfParallelism value =
degreeOfParallelism >>. MVar.fill degreeOfParallelism value >>. (dopChanged <-+ ()) |> run
module Test =
open System
open Hopac.Extensions
let mb = mb<int>()
let pool = Pool<int, int, exn>(2, mb, (fun msg -> job {
printfn "[worker] Received %A. Sleeping..." msg
do! Timer.Global.timeOut (TimeSpan.FromSeconds 1.)
return Choice1Of2 msg }))
[1..1000] |> Seq.Con.iterJob (Mailbox.send mb) |> run
pool.SetDegreeOfParallelism 1
Assuming you have a CML style server that takes request messages from a channel then you can make it hot-swappable to a degree quite easily. Specifically, make the channel be held in a mutable ref cell. That is, clients send requests to the server only by getting the channel from the ref cell. The server, on the other hand, has the channel held in a local variable and doesn't touch the ref cell. To hot-swap the server, simply create a new channel and start a new server that takes messages from the new channel. Then set the ref cell to point to the new channel. The old server will finish pending requests and will then be garbage collected as it is no longer accessible to new clients. Write an elaborated example that shows how this can be done in detail.
I'm struggling to understand why the first server works but the second does not:
let ch = ch<string>()
Job.foreverServer (job {
let! v = ch
printfn "from CE job: %s" v }) // only this output works
|> run
Job.foreverServer (Job.delay <| fun _ ->
ch >>% fun v -> printfn "from plane job: %s" v) // never prints anything
|> run
run (ch <-- sprintf "msg")
As far as I understand these two servers semantically equal. What am I doing wrong?
Hi Vesa,
Thanks for such a fantastic library!
The following example deadlocks sometimes, am I making a mistake somewhere? I have commented a line that may be suspicious.
The idea is to have a single server 'alphaServer' which is connected to by many 'betaServer's. All servers receive a request and then send a reply. The betaServers do this by forwarding the request on to the alphaServer. The alphaServer just sends back the same number.
// Server will listen on putCh and reply on getCh.
type Server = {
getCh : Ch<int>
putCh : Ch<int>
}
// Makes a new server that always repeats the same job
let newServer serverToJob =
let server = {
getCh = Ch.Now.create()
putCh = Ch.Now.create() }
server
|> serverToJob
|> Job.foreverServer
|> run
server
// Makes a job that sends x to a server and then receives the reply
let sendAndReceive server x =
server.putCh <-- x >>.
server.getCh
// alphaServer just sends back the same values that it receives
let alphaServer = newServer <| fun server ->
server.putCh >>= fun x ->
server.getCh <-- x
// each betaServer passes values on to the alpha server
// and then passes the replies back to the client.
let newBetaServer () = newServer <| fun server ->
server.putCh >>= fun x ->
server.getCh <-- (sendAndReceive alphaServer x |> run) // Is it ok to call run here?
// Makes a job that sends and receives x on a new betaserver
let newClientJob x = sendAndReceive (newBetaServer ()) x
// Create and run n jobs, each of which spawns a betaserver and talks to it
let test n =
[1..n]
|> Seq.map newClientJob
|> Job.conCollect
|> run
|> printfn "%A"
test 10 // lower numbers (2) seem to work. higher numbers (10) deadlock sometimes
Thanks,
Tim
Edit: turned on syntax highlighting
That might be interesting to benchmark/tune scaling to many cores.
let task: Task<unit> = Task.Factory.StartNew(fun _ -> failwith "error")
task |> Task.awaitJob |> run
this code hangs on the second line forever. Is this behavior intentional?
Joinads allow "waiting for several concrete values from several channels", like this (https://github.com/tpetricek/FSharp.Joinads/blob/master/src/Joins/Samples.fs#L70):
join {
match! put, empty, get, contains with
| (s, repl), (), ?, ? -> return react {
yield contains.Put(s)
yield repl.Reply() }
| ?, ?, repl, v -> return react {
yield repl.Reply(v)
yield empty.Put(()) }
}
I.e. it synchronizes on either
put
channel and the union value from empty
channelget
and contains
channelsI tried to port it to Hopac:
open Hopac
open Hopac.Infixes
open Hopac.Job.Infixes
open Hopac.Alt.Infixes
let put, get, empty, contains = ch(), ch(), ch(), ch()
// Initially, the buffer is empty
run (empty <-+ ())
Job.foreverServer (
// the match! ported
Alt.choose [ empty >>.? (put >>= Ch.send contains)
contains >>=? fun v -> Ch.send get v >>. Ch.send empty () ])
|> run
// the rest of the Joinads example ported:
// Repeatedly try to put value into the buffer
job { do! Async.Sleep 1000
for i in 0 .. 10 do
printfn "putting: %d" i
do! put <-- string i
do! Async.Sleep 500 }
|> start
// Repeatedly read values from the buffer and print them
job { while true do
do! Async.Sleep 250
let! v = get
printfn "got: %s" v }
|> start
Although it works as expected, I'm not sure Alt.choose
is capable to express match!
semantics in general. I mean, I've no idea how to rewrite things like this:
match! ch1, ch2, ch3, ch4, ch5 with
| ?, 0, "a", ?, "b" -> ...
| (), 1, ?, ?, ? -> ...
In this doc https://github.com/VesaKarvonen/Hopac/blob/master/Docs/Alternatives.md you introduce
let asyncAsAlt (xA: Async<'x>) : Alt<'x> = Alt.withNack <| fun nack ->
let rI = ivar ()
let tokenSource = new CancellationTokenSource ()
let op = async {
try
let! x = xA
do rI <-= x |> start
// do printfn "Success"
with e ->
do rI <-=! e |> start
// do printfn "Failure"
}
Async.Start (op, cancellationToken = tokenSource.Token)
nack
|>> fun () ->
tokenSource.Cancel ()
// printfn "Cancel"
|> Job.start >>%
upcast rI
where tokenSource
is not disposed. If I replace let
with use
the function stops working for obvious reasons (tokenSource
is disposed at the point where token.Cancel()
is called).
I cannot see any solution for this.
@theburningmonk voices a concern over cooperative scheduling in conjunction with Orleans in his blog post A look at Microsoft Orleans through Erlang-tinted glasses.
The scheduler of Hopac is also cooperative and optimized for parallel throughput. Runnings jobs are never implicitly switched out (would be an extra cost and, more importantly, would asymptically increase memory usage of highly parallel programs) and jobs are mostly scheduled in a stack/LIFO fashion (improves locality of reference). This also means that if CPU utilization is 100% over any given time period, then there may be ready jobs that don't get any CPU time during that period.
If someone encounters actual problems with cooperative scheduling it would be interesting to hear about them!
It would certainly be possible to offer a semi-preemptive scheduler that would use queues/FIFOs and would preempt execution of concurrent jobs based on time and/or number of monadic operations performed. However, I don't think that it would be really possible to preempt a running job that does not perform any monadic operations. (Perhaps the best that could be done would be to forcibly kill a worker thread that runs a job that has not performed any monadic operations in a given time period, but this is not a very satisfactory solution.) This means that it would still be possible to write non-cooperative jobs that would block other concurrent jobs from getting any CPU time.
The current Mailbox implementation, even though it seems to provide substantially better throughput than e.g. MailboxProcessor, still leaves plenty of room for improvements. It should be possible to implement it so that a producer and consumer can simultaneously access a Mailbox without interfering with each other in case there is a buffer of messages (although in case of an empty or near empty mailbox it becomes effectively impossible). It is also possible to inline the queue implementation into the Mailbox class to reduce space overhead.
I played with CounterActor benchmark a bit, I am getting about 10x better throughput on the one using native lock
vs the Hopac lock. I guess this is as expected - Hopac lock advantage is that for long critical sections it does not waste a whole thread, right? There is also perhaps some extra overhead in how parallel-for is done, to account for this.
A bit more surprising is that my first attempt at implementing lock API in F# is performing a bit better (1.5x). A bit strange - perhaps I'm not implementing it correctly, or the benchmark is not representative, or there's some other disadvantage.
If you have a spare moment could you give it a look?
module AltLock =
open System
open System.Threading
open Hopac
[<AbstractClass>]
type Section() =
abstract Do : byref<Core.Worker> -> unit
[<Sealed>]
type Section<'T>(f: unit -> 'T, cont: Cont<'T>) =
inherit Section()
override self.Do(worker) =
try
let res = f ()
cont.DoCont(&worker, res)
with e ->
cont.DoHandle(&worker, e)
type LockState =
| Free
| Locked of list<Section>
let ( == ) (a: obj) (b: obj) =
Object.ReferenceEquals(a, b)
let inline update (state: byref<'T>) (update: 'T -> 'T) =
let mutable loop = true
let mutable old = Unchecked.defaultof<_>
let mutable n = 0
while loop do
old <- state
if old == Interlocked.CompareExchange(&state, update old, old) then
loop <- false
else
if n > 5 then
Thread.SpinWait(1 <<< n)
if n < 23 then
n <- n + 1
old
[<Sealed>]
type AltLock internal () =
let mutable state = Free
static let locked0 = Locked []
let enterLock x =
let st =
update &state (function
| Free -> locked0
| Locked xs -> Locked (x :: xs))
match st with
| Free -> true
| _ -> false
let exitLock (worker: byref<Core.Worker>) =
let mutable loop = true
while loop do
let st =
update &state (function
| Locked [] -> Free
| Locked xs -> locked0
| _ -> failwith "Impossible")
match st with
| Locked [] -> loop <- false
| Locked work ->
for w in work do
w.Do(&worker)
| _ -> failwith "Impossible"
member self.Protect<'R>(f: unit -> 'R) =
{
new Job<'R>() with
member __.DoJob(worker, cont) =
let work = Section<_>(f, cont) :> Section
if enterLock work then
work.Do(&worker)
exitLock &worker
}
let create () = AltLock()
let protect (lock: AltLock) f = lock.Protect(f)
I'm still not quite getting the DoJob/Worker/Cont protocol. Is the above basically how you use it? Is there some more stuff one can do with the Worker struct?
About backoff / Thread.SpinWait - I think I got the idea from Aaron Turon's work like Reagents and his thesis - can't confirm any benefit, but does not seem to hurt either. I have a 6-core machine. It might make a difference for scaling to more cores. Not sure what the mechanism is exactly that makes it work, but something bad is going on when many cores are reading the same memory address, so "backing off" to spin-wait reportedly helps.
Is there anywhere else to backoff? Through worker? Like, switch to a different task if contention on current one is too high?
Is it possible to consider code targeting to other formats outside of the .net CLI like GPU, FPGA, Native/ASIC, JS etc. with the ability to specify which parts get targeted to which to get best performance and versatility.
I think WebShaper does this for JS.
Server should process requests withing given timeout. So, I create timedOut
ivar and start a job that fills the ivar after the timeout.
The problem is than it cases memory leak. Profiler shows constantly increasing number of object of the following types:
All of them are created by ProcessRunner.start
and File.startReading
.
If I comment the timeout logic, as in the snippet below, then the leak goes away:
Job.foreverServer (
reqCh >>= fun req ->
Job.usingAsync (ProcessRunner.start exePath req.Args) <| fun runner ->
Job.usingAsync (File.startReading req.LogPath) <| fun logFile ->
//let timedOut = ivar()
//start (timeOut req.Timeout >>=? fun _ -> timedOut <-= ())
let processExitedAlt() = runner.ProcessExited |>>? fun x ->
x |> Choice.mapError (fun e -> ProcessRunError (exePath, e))
let rec running() = Job.delay <| fun _ ->
processExitedAlt() .>>? timeOutMillis 500 >>=? exited <|>?
(logFile.NewLine >>=? fun line ->
match LogParser.parseLine line with
| None -> running()
| Some status ->
runner.Kill() >>. stopping status)// <|>?
//(timedOut |>>? fun _ -> Fail (TimeOut req.Timeout))
and stopping logStatus = Job.delay <| fun _ ->
(processExitedAlt() |>>? fun exitResult ->
combineResults logStatus exitResult) <|>?
(timeOutMillis 5000 >>%? (logStatus <!!> LogFileError))
and exited exitResult = Job.delay <| fun _ ->
(logFile.NewLine >>=? fun line ->
match LogParser.parseLine line with
| None -> exited exitResult
| Some status -> Job.result <| combineResults status exitResult) <|>?
(Alt.always() >>%? exitResult)
running()
>>= Ch.give req.ReplyCh)
|> start
I thought that it's safe to start long running or even never returning jobs because they are GC-ed when they are the only objects referencing the Alt
s they are synchronizing on. However, timeOut xx
is not such an Alt
, so, as I understand, timedOut
ivar is hold by long running start (timeOut req.Timeout >>=? fun _ -> timedOut <-= ())
and it prevents all the object to be GC-ed? If I'm right, what solution could you recommend?
In order to more easily support various platforms, such as Xamarin.iOS and Xamarin.Android, Hopac should be refactored to a PCL core plus (small) platform specific threading implementations.
Current master branch, Windows 7 x64, VS 2013, x64 FSI:
#r @"..\..\Hopac\Libs\Hopac\bin\Release\Hopac.Core.dll"
#r @"..\..\Hopac\Libs\Hopac\bin\Release\Hopac.dll"
#r "System.Runtime"
open Hopac
open Hopac.Job.Infixes
open Hopac.Alt.Infixes
let reqCh = ch()
let respCh = ch()
let server() =
Job.iterateServer () <| fun _ ->
Ch.take reqCh >>= fun x -> Ch.give respCh (x * x)
start (server())
The last line causes an exception:
System.NullReferenceException: Object reference not set to an instance of an object.
> at Hopac.Global.reallyInitGlobalScheduler() in P:\git\Hopac\Libs\Hopac\Hopac.fs:line 570
at Hopac.Job.Global.start[?](Job`1 xJ) in P:\git\Hopac\Libs\Hopac\Hopac.fs:line 645
at <StartupCode$FSI_0016>.$FSI_0016.main@()
Stopped due to error
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.