
hopac's Introduction

Reference · Guide · Docs

Hopac is a Concurrent ML style concurrent programming library for F#.


Development

Check out the repo and use your favorite IDE. The project builds fine in VS and using the dotnet CLI.

Usage

When you've followed the links at the top of this README and read the programming guide, you can use ./run repl together with the file Hopac.fsx to play around with the library.

Furthermore, you'll find a large number of examples in ./Examples.

Release / publish

Build the Hopac project and publish the nupkg file from /Libs/Hopac/bin/Release/*.nupkg. Your commits are tested on AppVeyor when you send PRs and push to master.

Update docs

You need the FsiRefGen git submodule for this. If it’s not already up to date, run:

git submodule update --init

TODO: Describe commands needed to update docs

hopac's People

Contributors

17cupsofcoffee, ajgajg1134, cloudroutine, cmeeren, colinbull, enricosada, forki, haf, hyprhare, mavnn, mrakgr, ncave, neoeinstein, polytypic, rahimovir, szer, t0yv0, tamizhvendan, vasily-kirichenko, wallymathieu, whyer


hopac's Issues

Add FAKE support

Add some FAKE support, e.g. along the lines sketched after this list:

  • build everything in release configuration
  • create and publish NuGet package(s)
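
A hedged sketch of what such a script might look like, using FAKE 4 era conventions (the target names and the use of Paket here are assumptions, not part of the repo):

#r "packages/FAKE/tools/FakeLib.dll"
open Fake

// build everything in release configuration
Target "Build" (fun _ ->
    !! "Libs/**/*.fsproj"
    |> MSBuildRelease "" "Build"
    |> ignore)

// create the NuGet package(s); publishing could then use Paket.Push
Target "Pack" (fun _ ->
    Paket.Pack (fun p -> { p with OutputPath = "bin" }))

"Build" ==> "Pack"

RunTargetOrDefault "Build"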

Problems with cooperative scheduling?

@theburningmonk voices a concern over cooperative scheduling in conjunction with Orleans in his blog post A look at Microsoft Orleans through Erlang-tinted glasses.

The scheduler of Hopac is also cooperative and optimized for parallel throughput. Running jobs are never implicitly switched out (that would add an extra cost and, more importantly, would asymptotically increase the memory usage of highly parallel programs), and jobs are mostly scheduled in a stack/LIFO fashion (which improves locality of reference). This also means that if CPU utilization is 100% over a given time period, there may be ready jobs that get no CPU time during that period.

If someone encounters actual problems with cooperative scheduling it would be interesting to hear about them!

It would certainly be possible to offer a semi-preemptive scheduler that would use queues/FIFOs and would preempt execution of concurrent jobs based on time and/or number of monadic operations performed. However, I don't think that it would be really possible to preempt a running job that does not perform any monadic operations. (Perhaps the best that could be done would be to forcibly kill a worker thread that runs a job that has not performed any monadic operations in a given time period, but this is not a very satisfactory solution.) This means that it would still be possible to write non-cooperative jobs that would block other concurrent jobs from getting any CPU time.
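
A hedged illustration of that last point (this snippet is not from the issue): a job like the one below performs no monadic operations once it starts running, so a cooperative scheduler has no point at which to switch it out, and it can starve other ready jobs on its worker thread.

let hostile : Job<unit> = Job.delay <| fun () ->
    // spins forever without performing a single monadic operation,
    // so the scheduler never gets a chance to run anything else here
    while true do ()
    Job.unit ()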

Consuming events

What's the best way to consume ordinary IEvent (or IObservable)? This is what I'm trying:

type RunningProcess = 
    { LineOutput: Ch<Line>
      ProcessExited: IVar<ExitResult> }

let execute (p: Process) =
    let lineOutput = ch()
    let processExited = ivar()

    let server() =
        Streams.subscribingTo p.OutputDataReceived <| Streams.iterJob (fun args -> 
            if args.Data <> null then 
                lineOutput <-+ args.Data
            else Job.unit())
        >>. 
        (Streams.subscribingTo p.Exited <| Streams.iterJob (fun _ ->
            Job.start (processExited <-= p.ExitCode)))

    start (server())
    if not <| p.Start() then failwithf "Cannot start %s." p.StartInfo.FileName
    p.BeginOutputReadLine()
    { LineOutput = lineOutput; ProcessExited = processExited }      

Here I basically create two streams based on the two events and route each event to a dedicated channel.
The client code is as follows:

let pr = <create a RunningProcess instance here>
let rec loop() =
    (pr.LineOutput >>=? fun line -> 
        Job.start (job { printfn "Line: %s" line }) >>. loop()) <|>?
    (pr.ProcessExited |>>? fun res ->
        printfn "Exited with code %A." res)
    <|>? ...other Alts...

start (loop())

Aside from the fact that it does not work, is this code a good use case for streams? Or is there an easier way to consume events in this case?

Implement FanIn mailbox for high-contention serialization

The current messaging primitives of Hopac are optimized for low-contention scenarios. This is what should be done to maximize end-to-end processing performance with lightweight threads. However, there are cases where a server needs to perform well even in potentially high-contention scenarios, and it would be useful to have a "fan-in" mailbox for those cases. The idea of a FanIn mailbox is that it allows simultaneous access by multiple producer threads without interference. To make it really work, message ordering guarantees need to be relaxed, or performance doesn't really improve. BTW, it would seem that there is no need for a FanOut mailbox, because lightweight threads and the work-distributing scheduler already take care of that.
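
A hedged sketch of the idea built from existing primitives (the proposed FanIn mailbox itself does not exist): stripe messages over several ordinary mailboxes so that producers rarely contend, and let the consumer take from whichever stripe has a message first. Ordering across stripes is deliberately relaxed.

let stripes : Mailbox<int> [] = Array.init 8 (fun _ -> mb ())

// the consumer takes from any stripe; cross-stripe ordering is not preserved
let fanInTake : Alt<int> =
    stripes |> Array.map (fun m -> m :> Alt<_>) |> Alt.choose

// a producer picks a stripe by thread id, so concurrent producers mostly stay apart
let fanInSend (x: int) : Job<unit> =
    let i = System.Threading.Thread.CurrentThread.ManagedThreadId % stripes.Length
    Mailbox.send stripes.[i] x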

NullReferenceException in windows console application

I built Hopac from source in VS and referenced Hopac, Hopac.Core and Hopac.Platform (...\Libs\Hopac.Platform.Net\bin\Release\Hopac.Platform.dll). Everything is OK when I run scripts in FSI, but a NullReferenceException is raised when running in a console app. The references look like this:

(screenshots of the project references omitted)

The same exception occurred earlier in FSI when I didn't reference Hopac.Platform.

Cannot port the Uid server from CML

fun mkUIdSrc () = let
    val ch = channel() 
    fun loop i = (send(ch, i); loop(i+1))
    in 
        spawn (fn () => loop 0);
        fn () => recv ch
    end

My attempt:

let uidServer = Job.delay <| fun _ ->
    let channel = ch()
    let rec loop value =
        Ch.give channel value >>. loop (value + 1)
    Job.start (loop 0) >>% channel

let server = run uidServer
run (Ch.take server)

It causes a stack overflow on run uidServer. I'm struggling to understand what's going wrong here.
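
The likely cause: with >>., the right-hand side is an already constructed job, so loop (value + 1) must be evaluated before >>. can even build the sequencing job, and the recursion therefore diverges at construction time. Delaying the recursion behind >>= makes it happen one step at a time at run time; a hedged sketch of the fix:

let uidServer = Job.delay <| fun _ ->
    let channel = ch ()
    let rec loop value =
        // the recursive call now happens inside the continuation,
        // only after the give has completed
        Ch.give channel value >>= fun () -> loop (value + 1)
    Job.start (loop 0) >>% channel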

Add asyncAsAlt into sources

I think asyncAsAlt is very useful for interop with Asyncs, so I suggest including it in the lib (as something like Alt.ofAsync).

Use of C#?

Hi, thanks for the library, it's looking very useful - fills a big hole in concurrency abstractions in F# :)

Just looking over the code for the first time: why did you settle on using C# for the Core library? Is it really for performance, or something else? I find C# a bit harder to read.

I'll need more time to read and play with this lib. Are there areas where you'd like contributions? Documentation comes to mind...

Thanks!

Write an example that shows how to replace (hot-swap) a running server

Assuming you have a CML style server that takes request messages from a channel, you can make it hot-swappable to a degree quite easily. Specifically, make the channel be held in a mutable ref cell. That is, clients send requests to the server only by getting the channel from the ref cell. The server, on the other hand, has the channel held in a local variable and doesn't touch the ref cell. To hot-swap the server, simply create a new channel and start a new server that takes messages from the new channel. Then set the ref cell to point to the new channel. The old server will finish pending requests and will then be garbage collected, as it is no longer accessible to new clients. Write an elaborated example that shows how this can be done in detail.
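
A minimal sketch of the scheme, using the same old-style API as the rest of this document (the names here are hypothetical):

// Clients always fetch the current request channel from the ref cell;
// the server loop holds its channel in a local and never reads the cell.
let currentCh : Ch<int> ref = ref (ch ())

let startServer (reqCh: Ch<int>) : Job<unit> =
    let rec loop () : Job<unit> =
        Ch.take reqCh >>= fun req ->
        // ... handle req here ...
        loop ()
    Job.start (loop ())

// To hot-swap: start a new server on a fresh channel, then repoint the cell.
// The old server finishes its pending requests and becomes garbage.
let hotSwap () : Job<unit> = Job.delay <| fun () ->
    let newCh = ch ()
    startServer newCh |>> fun () -> currentCh := newCh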

Use bounded quantification at covariant positions for `Job<_>` and `Alt<_>`?

In #29 @vasily-kirichenko ran into a subtyping issue with the >>=? combinator. I wonder if all the combinators in Hopac should be changed to use bounded quantification at covariant positions where a user given function is expected to return a Job<_> or an Alt<_>?

For example, the current signature of >>=? is

val (>>=?): Alt<'x> -> ('x -> Job<'y>) -> Alt<'y>

and it would be changed to

val (>>=?): Alt<'x> -> ('x -> #Job<'y>) -> Alt<'y>

This is a breaking change in the sense that existing code will need to be modified (e.g. upcasts removed, sometimes other type ascriptions added) to compile. I would assume that the amount of client code using Hopac is still small enough that this would not be a major problem. Any thoughts on this? (If I get no replies, I'll assume nobody cares and will just make the change.)

I actually did a test run of this modification locally about a week ago, but reverted the changes. It seemed to be an overall win: to my eyes, client code got a little bit simpler, with fewer upcasts and other type ascriptions. The problem in #29 would disappear, for example. However, it also changes the inferred types in other ways, and you will need to add type ascriptions elsewhere. So it is not a clear 100% improvement.
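
For a concrete picture, here is a hedged illustration of the kind of ascription the current signatures force when a user function returns an Alt:

// With ('x -> Job<'y>), a continuation returning an Alt needs an explicit upcast:
let current (xA: Alt<int>) (f: int -> Alt<string>) : Alt<string> =
    xA >>=? fun x -> f x :> Job<string>
// With ('x -> #Job<'y>) the upcast above could simply be dropped.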

How to combine Alts into new ones

fun accum sum = sync (
    choose [
        wrap (recvEvt addCh, fn x => accum (sum+x)),
        wrap (recvEvt subCh, fn x => accum (sum-x)),
        wrap (sendEvt (readCh, sum), fn () => accum sum)
    ])

My attempt in Hopac:

let rec accum sum =
    Alt.choose [ Ch.Alt.take addCh >>=? fun x -> accum (sum + x)
                 Ch.Alt.take subCh >>=? fun x -> accum (sum - x)
                 Ch.Alt.give readCh sum >>=? fun _ -> accum sum ]

But it does not compile, since the right argument of >>=? must be a Job<_>. So, is it not possible to create new Alts by combining existing ones with combinators? I'm not sure why Job must appear in such definitions at all.
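
A hedged workaround, related to the bounded quantification issue above: ascribe the recursive function's result type as Job<_> and upcast the choice, so the recursive calls satisfy the Job<_> requirement while the combination itself is still built from Alts:

let rec accum (sum: int) : Job<int> =
    upcast Alt.choose [ Ch.Alt.take addCh >>=? fun x -> accum (sum + x)
                        Ch.Alt.take subCh >>=? fun x -> accum (sum - x)
                        Ch.Alt.give readCh sum >>=? fun () -> accum sum ]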

Higher order Alt functions

I'm trying to implement a higher-order function which should

  1. Wait for a message from a channel.
  2. Then synchronize on a Alt which is returned by a function passed as an argument.

My attempt is as follows:

let ch = ch<int>()
let take() = Ch.take ch

let g (f: int -> Alt<_>) = Alt.guard << Job.delay <| fun _ ->
    take() |>>? fun x -> f x

run (ch <-+ 0)

for

(g (fun _ -> Timer.Global.timeOutMillis 3000 |>>? fun _ -> printfn "g timeout")) <|>?
(Timer.Global.timeOutMillis 10000 |>>? fun _ -> printfn "outer timeout")
|> run

It results in g timeout.
And for

(g (fun _ -> Timer.Global.timeOutMillis 10000 |>>? fun _ -> printfn "g timeout")) <|>?
(Timer.Global.timeOutMillis 3000 |>>? fun _ -> printfn "outer timeout")
|> run

it results in outer timeout, which is the desired behavior.
But if I do not give a message to ch, it hangs forever. As I understand it, it waits on take() and never reaches fun x -> f x, which returns the timeout.

How can I fix this?

FYI: Merging parts of Hopac.Extra to Hopac and the rest to Hopac.Experimental

I'm going to reorganize the libraries somewhat. I plan to eliminate the Hopac.Extra library by moving a number of modules from it to Hopac and the remaining modules to Hopac.Experimental. I plan to move the following modules and associated types to Hopac:

  • Hopac.Extra.Alt -> Hopac.Alt
  • Hopac.Extra.BoundedMb -> Hopac.BoundedMb
  • Hopac.Extra.Streams -> Hopac.Stream (Note: name changed)
  • Hopac.Extra.StreamVar -> Hopac.Stream.Var
  • Hopac.Extra.StreamSrc -> Hopac.Stream.Src
  • Hopac.Extra.StreamsBuilders -> Hopac.TopLevel

The other modules will be moved to Hopac.Experimental. Many of those modules are either rather incomplete (e.g. Hopac.Extra.Stream) or unlikely to be really useful (e.g. DirCh and SwapCh).

This should also make the name of the Hopac.Extras library more meaningful.

Elaborate on Erlang style selective communication vs CML style selective communication

Hypothesis: Erlang style pattern matching based selective communication (which obviously also works with CML) is good for implementing internal/closed protocols, while CML style selective communication is good for external/open combination of such protocols. Write documentation to elaborate on this hypothesis. Show how issues such as the need to filter mailbox contents, and the race conditions that seem frequent in the Erlang Programming book, are due to attempts to externally combine otherwise perfectly working internal protocols, and how CML style events help to avoid such problems with ease.

Implement remote mailbox

Public Hopac libs do not directly provide primitives for distributed computing. Implement a "remote" mailbox for communication between jobs running on (potentially) separate physical nodes. Provide mechanisms to easily connect nodes, create remote mailboxes, and observe node failures (Erlang monitor_node -> Node.Alt.down node).

Question for understanding

I have started using Hopac as an alternative to Async/TPL and I love it. I understand basic usage, but some aspects are still not clear.

First, could we compare Alt to F# lazy, so that a job inside an Alt is only evaluated on Alt.pick?

Second, is this implementation of AutoResetEvent correct and idiomatic for Hopac?

/// <summary>
/// MSDN: The AutoResetEvent class represents a local wait handle event that resets automatically 
/// when signaled, after releasing a single waiting thread. An AutoResetEvent object is automatically 
/// reset to non-signaled by the system after a single waiting thread has been released. 
/// If no threads are waiting, the event object's state remains signaled.
///
/// Hopac's alternative to http://blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266923.aspx
/// </summary>
type HopacAutoResetEvent (initialState : bool) =
    // We will wait on take, and set with send
    let setChannel : Ch<unit> = ch()
    do if initialState then start <| Ch.send setChannel ()
    new() = HopacAutoResetEvent(false)
    member this.Wait(timeout:int) : Job<bool> = 
            let timedOut : Alt<bool> = 
                ((float timeout) |> TimeSpan.FromMilliseconds |> Timer.Global.timeOut)
                >>=? fun () -> Job.result false
            let signaled = Ch.Alt.take setChannel >>=? fun () -> Job.result true
            signaled <|> timedOut

    // From docs, important for <|>:
    // The given alternatives are processed in a left-to-right order with short-cut evaluation. 
    // In other words, given an alternative of the form first <|> second, the first alternative 
    // is first instantiated and, if it is pickable, is committed to and the second alternative 
    // will not be instantiated at all.

    member this.Set() : Job<unit> = 
        // from MSDN: Also, if Set is called when there are no threads waiting and the EventWaitHandle 
        // is already signaled, the call has no effect.

        // try take and send covers all cases:
        // if there were no waiters and the state was signaled -> we steal the state and send it back immediately
        // if there was a waiting thread or the state was not signaled -> there is no signal to steal, we just signal
        (Ch.Try.take setChannel) >>. Ch.send setChannel ()

Third, is this implementation of ManualResetEvent correct and idiomatic for Hopac?

/// <summary>
/// Hopac's alternative to http://blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266920.aspx
/// </summary>
type HopacManualResetEvent (initialState : bool) =
    [<VolatileFieldAttribute>]
    let mutable state : bool = initialState
    let setChannel : MChan<bool> = run <| Multicast.create ()
    let lock = Lock.Now.create()
    new() = HopacManualResetEvent(false)

    member this.Wait() : Job<bool> =
        let rec loop () = 
            job {
                if state then return true 
                else 
                    let! port = Multicast.port setChannel
                    let! res = (Multicast.recv port) // waiting here
                    if res then return true
                    else return! loop ()
            }
        loop ()

    // From Multicast.fsi: **Sends** a message to all of the ports listening to the multicast channel.
    // Send must mean the same as in Ch
    member this.Set() : Job<unit> = 
        (Multicast.multicast setChannel true)   // there could be no waiters
        |>> (fun _ -> state <- true )   // in any case we set the state
        >>% ()                          // and return unit
        |> (Lock.duringJob lock)

    member this.Reset() : Job<unit> = 
        (Multicast.multicast setChannel false) // (redundant?) if there are takers, res in loop() will be false and loop will iterate
        |>> (fun _ -> state <- false )  // in any case we set the state
        >>% ()
        |> (Lock.duringJob lock)

Thanks in advance!

Cross-post: http://stackoverflow.com/questions/26912904/better-understanding-of-f-hopac-library

Contrast Erlang monitor_node with CML style events to avoid race condition

See page 257 in the book Erlang Programming by Cesarini and Thompson. The bottom example seems to have a race condition with the monitor_node pattern. Specifically, a node might send the ok response and then die before the client receives the response, leaving a spurious nodedown message in the mailbox of the client. Assuming I'm correct, the authors fail to notice this despite mentioning that it is important to demonitor because it generates nodedown messages. Write documentation to explain how defining "nodedown" as a CML style event and using CML style selective communication avoids race conditions such as the one in the book, while also making code more modular. CML style selective communication isn't just for timeouts.

Is Hopac "natural" to use from C#

MailboxProcessor is designed specifically for F#, and because of the differing implementations of Task and Async, MailboxProcessor isn't a pleasant experience from C#.

My question: is Hopac the same as MailboxProcessor from a C# point of view? Or can it be modified to make it pleasant for C# developers?

I haven't looked at the source code yet; can someone say, intuitively, whether it would take much effort to use it from C#?

Exponential backoff?

Did a quick read of IVar.cs. Other frameworks I've read about recommend exponential backoff as a strategy for handling high contention. It can be quite simple: Thread.SpinWait every time we re-enter "Spin", doubling the time, up to a threshold. It should be possible to construct a high-contention benchmark to experiment with this.

Does this sound useful, or are there other abstractions in the library that do this better than IVar? Thanks.
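
The suggestion amounts to something like the following sketch (an assumption about the shape, not how IVar.cs is written today); the update helper in the AltLock code further down this page uses the same pattern:

// double the spin count on each failed attempt, capped at a threshold
let spinBackoff (attempt: int) : unit =
    System.Threading.Thread.SpinWait (1 <<< min attempt 14)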

SynchronizationContext support

Hopac currently does not support anything like the SynchronizationContext mechanism that would allow the execution of Hopac jobs to be pinned to particular threads (or groups of threads) such as the main UI thread.

The overhead for supporting synchronization contexts is significant. Just adding the extra field corresponding to such a context, without making any use of the field, would seem to add at least about 10% execution time overhead in many benchmarks. When execution of a Hopac job needs to be started on a particular synchronization context that is not controlled by Hopac code (like the main UI thread), an operation needs to be posted to the context and the overhead is likely to be an order of magnitude higher than with Hopac's custom scheduler.

Despite the overhead cost, and contrary to my previous opinion, it would seem worth it to support synchronization contexts (and other similar mechanisms). Doing so allows Hopac to be used seamlessly for GUI programming. Hopac performance will still likely be significantly better than with the current F# Async. So, basically, there will be one less reason to use F# Async at all.

New Naming

Hi,

Maybe resort to using full naming:

  • Channel
  • Alternative
  • Actor
  • ActorThread
  • Continuation

Also, where possible and appropriate, have operators backing the functionality for conciseness.

Does this make sense?

Is it possible to cancel/perma-block a job using its handle?

I'd like to be able to cancel or permanently block a job using only its handle (without changing the given job's body). Is this possible? An example:

let jb = job {
    let sleep = Hopac.Timer.Global.sleep (TimeSpan.FromMilliseconds 1000.)
    let inner = job {
        do! sleep
        printfn "slept" }
    do! inner
    do! inner
}
start jb
//Do something so that "slept" is not printed two times

A second related question: is there a more primitive way of converting a job into an Alt that becomes available for picking once the job completes, than something like

let runRead jb = 
    let ivar = ivar()
    start (jb >>= IVar.fill ivar)
    IVar.Alt.read ivar

Thanks!

Contrast mailbox filtering and use of reply ivar/ch

Write examples and documentation to show how the use of fresh reply ivars/chs in CML style avoids the need to filter responses to canceled operations in client-server protocols, a need which seems to be common in Erlang, as seen several times in the Erlang Programming book by Cesarini and Thompson. See e.g. page 105.

Why were the README files removed?

Hi, I noticed you removed all of the README files a short while ago. What bearing does this have on the state of the project? Is it OK to start using it right now, or is there some kind of overhaul in progress?

Porting "match!" Joinads semantics

Joinads allow "waiting for several concrete values from several channels", like this (https://github.com/tpetricek/FSharp.Joinads/blob/master/src/Joins/Samples.fs#L70):

join {
    match! put, empty, get, contains with 
    | (s, repl), (), ?, ? -> return react {
      yield contains.Put(s)
      yield repl.Reply() }
    | ?, ?, repl, v -> return react {
      yield repl.Reply(v)
      yield empty.Put(()) } 
}

I.e. it synchronizes on either

  • any value in the put channel and the unit value from the empty channel
  • the get and contains channels

I tried to port it to Hopac:

open Hopac
open Hopac.Infixes
open Hopac.Job.Infixes
open Hopac.Alt.Infixes

let put, get, empty, contains = ch(), ch(), ch(), ch()

// Initially, the buffer is empty
run (empty <-+ ())

Job.foreverServer (
    // the match! ported
    Alt.choose [ empty >>.? (put >>= Ch.send contains)
                 contains >>=? fun v -> Ch.send get v >>. Ch.send empty () ])
|> run

// the rest of the Joinads example ported:

// Repeatedly try to put value into the buffer
job { do! Async.Sleep 1000
      for i in 0 .. 10 do
          printfn "putting: %d" i
          do! put <-- string i
          do! Async.Sleep 500 }
|> start

// Repeatedly read values from the buffer and print them
job { while true do 
          do! Async.Sleep 250
          let! v = get
          printfn "got: %s" v }
|> start

Although it works as expected, I'm not sure Alt.choose is capable of expressing match! semantics in general. I mean, I've no idea how to rewrite things like this:

match! ch1, ch2, ch3, ch4, ch5 with
| ?, 0, "a", ?, "b" -> ...
| (), 1, ?, ?, ? -> ...

HopacTest runs very very long on Mono/Linux

Ubuntu 14.04.1 LTS, 64 bit
Mono JIT compiler version 3.2.8 (Debian 3.2.8+dfsg-4ubuntu1)
Hopac 0.0.0.25

CPU load is close to zero.

(partial) output:

SerAsc: 55 - 0.025753s
ParTsk: 55 - 12.259953s (7.177841 tasks/s)
ParAsc: 55 - 0.026713s
SerFun: 6765 - 0.000148s (10945 recs)
ParOpt: 6765 - 0.003772s (2901643.690350 jobs/s)
ParJob: 6765 - 0.009304s (1176401.040435 jobs/s)
ParPro: 6765 - 0.104681s (104555.545790 jobs/s)
SerOpt: 6765 - 0.005330s
SerJob: 6765 - 0.006729s
FibNck: 6765 - 0.011253s
SerAsc: 6765 - 0.013057s
ParTsk: 6765 - 192.592561s (56.829817 tasks/s)
ParAsc: 6765 - 0.866197s
SerFun: 832040 - 0.014316s (1346268 recs)
ParOpt: 832040 - 0.244005s (5517381.003414 jobs/s)
ParJob: 832040 - 0.264780s (5084483.440385 jobs/s)
ParPro: 832040 - 0.323679s (4159274.045303 jobs/s)
SerOpt: Warning: Degraded allocation.  Consider increasing nursery-size if the warning persists.
Warning: Degraded allocation.  Consider increasing nursery-size if the warning persists.
832040 - 2.997870s
SerJob: Warning: Repeated degraded allocation.  Consider increasing nursery-size.
832040 - 6.977952s
FibNck: 832040 - 1.010520s
SerAsc: 832040 - 0.819772s
ParTsk: 

It hangs in this state for several minutes.

Unexpected deadlock

Hi Vesa,

Thanks for such a fantastic library!

The following example deadlocks sometimes; am I making a mistake somewhere? I have commented a line that may be suspicious.

The idea is to have a single server 'alphaServer' which is connected to by many 'betaServer's. All servers receive a request and then send a reply. The betaServers do this by forwarding the request on to the alphaServer. The alphaServer just sends back the same number.

// Server will listen on putCh and reply on getCh.
type Server = {
    getCh : Ch<int>
    putCh : Ch<int>
}

// Makes a new server that always repeats the same job
let newServer serverToJob =
    let server = {
        getCh = Ch.Now.create()
        putCh = Ch.Now.create() }

    server
    |> serverToJob
    |> Job.foreverServer
    |> run

    server

// Makes a job that sends x to a server and then receives the reply
let sendAndReceive server x =
    server.putCh <-- x >>.
    server.getCh

// alphaServer just sends back the same values that it receives
let alphaServer = newServer <| fun server ->
    server.putCh >>= fun x ->
    server.getCh <-- x

// each betaServer passes values on to the alpha server
// and then passes the replies back to the client.
let newBetaServer () = newServer <| fun server ->
    server.putCh >>= fun x ->
    server.getCh <-- (sendAndReceive alphaServer x |> run) // Is it ok to call run here?

// Makes a job that sends and receives x on a new betaserver
let newClientJob x = sendAndReceive (newBetaServer ()) x

// Create and run n jobs, each of which spawns a betaserver and talks to it
let test n =
    [1..n]
    |> Seq.map newClientJob
    |> Job.conCollect
    |> run
    |> printfn "%A"

test 10 // lower numbers (2) seem to work. higher numbers (10) deadlock sometimes

Thanks,
Tim

Edit: turned on syntax highlighting
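
A likely culprit is indeed the commented line: run blocks the calling thread until the job completes, and inside a job that thread is one of Hopac's scheduler workers, so once enough beta servers block in run there is no worker left to run the alpha server. A hedged rewrite that binds the reply as part of the job instead:

// each betaServer passes values on to the alpha server without blocking a worker
let newBetaServer () = newServer <| fun server ->
    server.putCh >>= fun x ->
    sendAndReceive alphaServer x >>= fun reply ->
    server.getCh <-- reply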

timeOut ... |> start causes memory leak

The server should process requests within a given timeout. So, I create a timedOut ivar and start a job that fills the ivar after the timeout.
The problem is that this causes a memory leak. The profiler shows a constantly increasing number of objects of the following types:

(profiler screenshot omitted)

All of them are created by ProcessRunner.start and File.startReading.

If I comment out the timeout logic, as in the snippet below, then the leak goes away:

Job.foreverServer (
    reqCh >>= fun req ->
        Job.usingAsync (ProcessRunner.start exePath req.Args) <| fun runner ->
        Job.usingAsync (File.startReading req.LogPath) <| fun logFile ->
            //let timedOut = ivar()
            //start (timeOut req.Timeout >>=? fun _ -> timedOut <-= ())

            let processExitedAlt() = runner.ProcessExited |>>? fun x ->
                x |> Choice.mapError (fun e -> ProcessRunError (exePath, e))

            let rec running() = Job.delay <| fun _ ->
                processExitedAlt() .>>? timeOutMillis 500 >>=? exited <|>? 
                (logFile.NewLine >>=? fun line -> 
                    match LogParser.parseLine line with
                    | None -> running()
                    | Some status -> 
                        runner.Kill() >>. stopping status)// <|>?
                //(timedOut |>>? fun _ -> Fail (TimeOut req.Timeout))

            and stopping logStatus = Job.delay <| fun _ ->
                (processExitedAlt() |>>? fun exitResult -> 
                    combineResults logStatus exitResult) <|>?
                (timeOutMillis 5000 >>%? (logStatus <!!> LogFileError)) 

            and exited exitResult = Job.delay <| fun _ ->
                (logFile.NewLine >>=? fun line -> 
                    match LogParser.parseLine line with
                    | None -> exited exitResult
                    | Some status -> Job.result <| combineResults status exitResult) <|>?
                (Alt.always() >>%? exitResult)

            running()
        >>= Ch.give req.ReplyCh) 
|> start

I thought it was safe to start long-running or even never-returning jobs, because they are GC'd when they are the only objects referencing the Alts they are synchronizing on. However, timeOut xx is not such an Alt, so, as I understand it, the timedOut ivar is held by the long-running start (timeOut req.Timeout >>=? fun _ -> timedOut <-= ()) and this prevents all the objects from being GC'd? If I'm right, what solution would you recommend?

Very bad CounterActor benchmark performance on a manycore machine

Core i5 (4 physical cores), windows 7 x64:

ChMsg: 4 *      300 msgs =>    24811 msgs/s
ChMsg: 4 *     3000 msgs =>  4049403 msgs/s
ChMsg: 4 *    30000 msgs =>  2429420 msgs/s
ChMsg: 4 *   300000 msgs =>  3767980 msgs/s
ChMsg: 4 *  3000000 msgs =>  4213296 msgs/s
MbMsg: 4 *      300 msgs =>    56186 msgs/s
MbMsg: 4 *     3000 msgs =>  2688413 msgs/s
MbMsg: 4 *    30000 msgs =>  2772842 msgs/s
MbMsg: 4 *   300000 msgs =>  3313607 msgs/s
MbMsg: 4 *  3000000 msgs =>  2959431 msgs/s

Xeon (24 physical cores), windows server 2012R2 x64:

ChMsg: 48 *      300 msgs =>    73654 msgs/s
ChMsg: 48 *     3000 msgs =>   515786 msgs/s
ChMsg: 48 *    30000 msgs =>   600174 msgs/s
ChMsg: 48 *   300000 msgs =>   610965 msgs/s
ChMsg: 48 *  3000000 msgs =>   618576 msgs/s
MbMsg: 48 *      300 msgs =>   135676 msgs/s
MbMsg: 48 *     3000 msgs =>   558427 msgs/s
MbMsg: 48 *    30000 msgs =>   994872 msgs/s
MbMsg: 48 *   300000 msgs =>   872920 msgs/s
MbMsg: <could not wait for this one to finish>

(CPU usage screenshots for Hopac and MailboxProcessor omitted)

An older Xeon (8 cores), windows server 2012R2 x64:

ChMsg: 8 *      300 msgs =>    39138 msgs/s
ChMsg: 8 *     3000 msgs =>  1219054 msgs/s
ChMsg: 8 *    30000 msgs =>  1225679 msgs/s
ChMsg: 8 *   300000 msgs =>  1191693 msgs/s
ChMsg: 8 *  3000000 msgs =>  1203870 msgs/s
MbMsg: 8 *      300 msgs =>   119806 msgs/s
MbMsg: 8 *     3000 msgs =>   596963 msgs/s
MbMsg: 8 *    30000 msgs =>   784205 msgs/s
MbMsg: 8 *   300000 msgs =>   626359 msgs/s
MbMsg: 8 *  3000000 msgs =>   558681 msgs/s

'start' fails with exception on windows

Current master branch, Windows 7 x64, VS 2013, x64 FSI:

#r @"..\..\Hopac\Libs\Hopac\bin\Release\Hopac.Core.dll"
#r @"..\..\Hopac\Libs\Hopac\bin\Release\Hopac.dll" 
#r "System.Runtime"

open Hopac
open Hopac.Job.Infixes
open Hopac.Alt.Infixes

let reqCh = ch()
let respCh = ch()

let server() = 
    Job.iterateServer () <| fun _ ->
        Ch.take reqCh >>= fun x -> Ch.give respCh (x * x)

start (server())

The last line causes an exception:

System.NullReferenceException: Object reference not set to an instance of an object.
   at Hopac.Global.reallyInitGlobalScheduler() in P:\git\Hopac\Libs\Hopac\Hopac.fs:line 570
   at Hopac.Job.Global.start[?](Job`1 xJ) in P:\git\Hopac\Libs\Hopac\Hopac.fs:line 645
   at <StartupCode$FSI_0016>.$FSI_0016.main@()
Stopped due to error

Understanding Alt.never()

Sorry for the long question, but I'm really stuck on it. I'm afraid I don't fully understand guards.

I'm trying to implement a resizable pool of workers. It has a degreeOfParallelism MVar which determines how many parallel jobs may be executing at any moment (at maximum).

It works OK until I call SetDegreeOfParallelism with a value smaller than the current one. At that moment the pool silently stops working. The output is:

Getting from source...
Alt.never
[worker] Received 1. Sleeping...
[worker] Received 1000. Sleeping...
Giving result to workDone: Choice1Of2 1
Giving result to workDone: Choice1Of2 1000
Work done: Choice1Of2 1 (worker count = 2)
Getting from source...
Alt.never
[worker] Received 999. Sleeping...
Work done: Choice1Of2 1000 (worker count = 2)
Getting from source...
Alt.never
[worker] Received 998. Sleeping...
Giving result to workDone: Choice1Of2 999
Giving result to workDone: Choice1Of2 998
Work done: Choice1Of2 999 (worker count = 2)
Getting from source...
Alt.never
[worker] Received 997. Sleeping...
Work done: Choice1Of2 998 (worker count = 2)
Getting from source...
Alt.never
[worker] Received 996. Sleeping...
Giving result to workDone: Choice1Of2 997
Giving result to workDone: Choice1Of2 996
Work done: Choice1Of2 997 (worker count = 2)
Getting from source...
Alt.never
[worker] Received 995. Sleeping...
Work done: Choice1Of2 996 (worker count = 2)
Getting from source...
Alt.never
[worker] Received 994. Sleeping...

// call SetDegreeOfParallelism here
val it : unit = ()

Alt.never
Giving result to workDone: Choice1Of2 995
Giving result to workDone: Choice1Of2 994

So it synchronizes on Alt.never() or on the workDone channel. The two workers call Ch.give workDone, but Alt.choose does not synchronize on it. I cannot understand why.

open Hopac 
open Hopac.Job.Infixes
open Hopac.Alt.Infixes
open Hopac.Infixes

type Pool<'msg, 'res, 'error>(degreeOfParallelism: int, source: Alt<'msg>, worker: 'msg -> Job<Choice<'res, 'msg * 'error>>) =
    let degreeOfParallelism = MVar.Now.createFull degreeOfParallelism
    let dopChanged = ch<unit>() 
    let workDone = ch<Choice<'res, 'msg * 'error>>()
    let failedMessages = mb()

    let getMessage workerCount = 
        let get() = Alt.guard (job {
            let! dop = MVar.read degreeOfParallelism
            //printfn "Dop read: %d. Worker count: %d" dop workerCount
            return 
                if workerCount < dop then
                    printfn "Getting from source..."
                    source <|>? failedMessages 
                else
                    printfn "Alt.never"
                    Alt.never() })

        get() <|>? (dopChanged >>.? get())

    let pool = Job.iterateServer 0 <| fun workerCount ->
        Alt.choose [ getMessage workerCount >>=? fun msg ->
                        Job.queue (worker msg >>= fun r -> printfn "Giving result to workDone: %A" r; Ch.give workDone r) 
                        >>% workerCount + 1 
                     workDone >>=? fun r ->
                        printfn "Work done: %A (worker count = %d)" r workerCount
                        match r with
                        | Choice1Of2 _ -> Job.result (workerCount - 1)
                        | Choice2Of2 (msg, _) -> failedMessages <<-+ msg >>% workerCount - 1 
                   ]
    do start pool

    member __.SetDegreeOfParallelism value =
        degreeOfParallelism >>. MVar.fill degreeOfParallelism value >>. (dopChanged <-+ ()) |> run

module Test =
    open System
    open Hopac.Extensions

    let mb = mb<int>()

    let pool = Pool<int, int, exn>(2, mb, (fun msg -> job {
                   printfn "[worker] Received %A. Sleeping..." msg
                   do! Timer.Global.timeOut (TimeSpan.FromSeconds 1.)
                   return Choice1Of2 msg }))

    [1..1000] |> Seq.Con.iterJob (Mailbox.send mb) |> run


    pool.SetDegreeOfParallelism 1

CancellationTokenSource leak in asyncAsAlt

In this doc https://github.com/VesaKarvonen/Hopac/blob/master/Docs/Alternatives.md you introduce

let asyncAsAlt (xA: Async<'x>) : Alt<'x> = Alt.withNack <| fun nack ->
  let rI = ivar ()
  let tokenSource = new CancellationTokenSource ()
  let op = async {
      try
        let! x = xA
        do rI <-= x |> start
        // do printfn "Success"
      with e ->
        do rI <-=! e |> start
        // do printfn "Failure"
    }
  Async.Start (op, cancellationToken = tokenSource.Token)
  nack
  |>> fun () ->
        tokenSource.Cancel ()
        // printfn "Cancel"
  |> Job.start >>%
  upcast rI

where tokenSource is never disposed. If I replace let with use, the function stops working, for obvious reasons (tokenSource is disposed by the point where tokenSource.Cancel() is called).

I cannot see any solution for this.

Task.awaitJob mutes exceptions

let task: Task<unit> = Task.Factory.StartNew(fun _ -> failwith "error")
task |> Task.awaitJob |> run

This code hangs forever on the second line. Is this behavior intentional?

Polling channel communication permanently blocks?

I'm having some trouble working with the newly added polling channel communications. Here's the code:

let c = ch()
let receiver = job {
    while true do
        do! Ch.take c }
let poll = job {
    let counter = ref 0
    for i = 0 to 999 do
        let! res = Ch.Try.give c ()
        if not res then counter := !counter + 1
    printfn "%A" !counter }
let run() =
    start receiver
    run poll
    printfn "done"

The poll job never completes, and neither the counter nor "done" is printed. Am I doing something silly?

Question about CounterActor benchmark

I played with the CounterActor benchmark a bit; I am getting about 10x better throughput with the native lock than with the Hopac lock. I guess this is as expected - the Hopac lock's advantage is that for long critical sections it does not waste a whole thread, right? There is perhaps also some extra overhead in how the parallel-for is done, to account for this.

A bit more surprising is that my first attempt at implementing a lock API in F# performs a bit better (1.5x). A bit strange - perhaps I'm not implementing it correctly, or the benchmark is not representative, or there's some other disadvantage.

If you have a spare moment could you give it a look?

module AltLock =
    open System
    open System.Threading
    open Hopac

    [<AbstractClass>]
    type Section() =
        abstract Do : byref<Core.Worker> -> unit

    [<Sealed>]
    type Section<'T>(f: unit -> 'T, cont: Cont<'T>) =
        inherit Section()

        override self.Do(worker) =
            try
                let res = f ()
                cont.DoCont(&worker, res)
            with e ->
                cont.DoHandle(&worker, e)

    type LockState =
        | Free
        | Locked of list<Section>

    let ( == ) (a: obj) (b: obj) =
        Object.ReferenceEquals(a, b)

    let inline update (state: byref<'T>) (update: 'T -> 'T) =
        let mutable loop = true
        let mutable old = Unchecked.defaultof<_>
        let mutable n = 0
        while loop do
            old <- state
            if old == Interlocked.CompareExchange(&state, update old, old) then
                loop <- false
            else
                if n > 5 then
                    Thread.SpinWait(1 <<< n)
                if n < 23 then
                    n <- n + 1
        old

    [<Sealed>]
    type AltLock internal () =
        let mutable state = Free
        static let locked0 = Locked []

        let enterLock x =
            let st =
                update &state (function
                    | Free -> locked0
                    | Locked xs -> Locked (x :: xs))
            match st with
            | Free -> true
            | _ -> false

        let exitLock (worker: byref<Core.Worker>) =
            let mutable loop = true
            while loop do
                let st =
                    update &state (function
                        | Locked [] -> Free
                        | Locked xs -> locked0
                        | _ -> failwith "Impossible")
                match st with
                | Locked [] -> loop <- false
                | Locked work ->
                    for w in work do
                        w.Do(&worker)
                | _ -> failwith "Impossible"

        member self.Protect<'R>(f: unit -> 'R) =
            {
                new Job<'R>() with
                    member __.DoJob(worker, cont) =
                        let work = Section<_>(f, cont) :> Section
                        if enterLock work then
                            work.Do(&worker)
                            exitLock &worker
            }

    let create () = AltLock()
    let protect (lock: AltLock) f = lock.Protect(f)

I'm still not quite getting the DoJob/Worker/Cont protocol. Is the above basically how you use it? Is there more one can do with the Worker struct?

About backoff / Thread.SpinWait - I think I got the idea from Aaron Turon's work, like Reagents and his thesis. I can't confirm any benefit, but it does not seem to hurt either. I have a 6-core machine; it might make a difference when scaling to more cores. I'm not sure exactly what mechanism makes it work, but something bad happens when many cores read the same memory address, so "backing off" to a spin-wait reportedly helps.

Is there anywhere else to back off? Through the worker? Like, switching to a different task if contention on the current one is too high?

Actor Implementation

Maybe you could also provide an Actor implementation, as per the example, or perhaps an improved one.

NullReferenceException

When I run my top-level Job in a console application, the following exception is raised:

(exception screenshot omitted)

What could cause it?

Revisit Mailbox implementation

The current Mailbox implementation, even though it seems to provide substantially better throughput than e.g. MailboxProcessor, still leaves plenty of room for improvement. It should be possible to implement it so that a producer and a consumer can simultaneously access a Mailbox without interfering with each other when there is a buffer of messages (although with an empty or nearly empty mailbox this becomes effectively impossible). It is also possible to inline the queue implementation into the Mailbox class to reduce space overhead.

Elaborate on consequences of using cooperative threads

Thread spawning, suspend, and resume are very fast with cooperative threads and should be used to advantage. Polling should not be used. Hostile long-running work, if really needed, should be run on native threads with the native scheduler, which is designed for that (via preemption and fairness mechanisms).

Can I give a value to a channel only if someone is listening?

Hi,

Is it possible to give a value to a channel if it has a listener, but to proceed without blocking or buffering the value when it has none? This is basically behaviour similar to an event.

(The question is a duplicate of http://stackoverflow.com/questions/24721444/hopac-can-i-give-value-to-a-channel-only-if-someone-is-listening-to-it but as the only two mentions of Hopac on Stack Overflow seem to be mine, perhaps it's better to post it here.)
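
For what it's worth, a hedged sketch built on Ch.Try.give (the polling primitive used in the polling issue further up this page): the give succeeds only when a taker is waiting at that very instant, and otherwise returns false immediately instead of blocking or buffering, much like raising an event with no subscribers.

// gives x only if someone is listening right now; reports whether it was taken
let giveIfListening (eventCh: Ch<'x>) (x: 'x) : Job<bool> =
    Ch.Try.give eventCh x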

Thanks for making Hopac publicly available. I can honestly say it's one of my favourite things about all of F#.

Code Generation Other Targets

Is it possible to consider targeting code to formats outside of the .NET CLI, like GPU, FPGA, native/ASIC, JS, etc., with the ability to specify which parts get targeted where, to get the best performance and versatility?

I think WebSharper does this for JS.

Differences between Job.delay and job {}

I'm struggling to understand why the first server works but the second does not:

let ch = ch<string>()

Job.foreverServer (job {
    let! v = ch
    printfn "from CE job: %s" v })  // only this output works
|> run

Job.foreverServer (Job.delay <| fun _ ->
    ch >>% fun v -> printfn "from plain job: %s" v)  // never prints anything
|> run

run (ch <-- sprintf "msg")

As far as I understand, these two servers are semantically equal. What am I doing wrong?
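
A likely explanation: >>% replaces the job's result with a plain value, so in the second server the lambda itself becomes the result and is never applied, whereas the job expression binds the value and runs printfn. Mapping the result with |>> instead gives the intended behavior; a hedged fix:

Job.foreverServer (Job.delay <| fun _ ->
    ch |>> fun v -> printfn "from plain job: %s" v)  // now prints like the CE version
|> run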

for..in type inference

Somehow this infers Array for xs, not seq. Thoughts?

    let push out xs =
        job {
            for x in xs do
                do! Ch.give out x
        }
