mbrace.core's Issues

Unexpected crash handling medium CloudFile

The datafile contains 42,000 images, and can be found here: http://1drv.ms/1GnHRRA
The cloudValidation process faults every time, with the following:
Nessos.FsPickler.FsPicklerException: Error serializing object of type 'MBrace.Azure.Runtime.PickledJob'. ---> System.IO.IOException: There is not enough space on the disk. (details below)

#load "credentials.fsx"

open MBrace.Core
open MBrace.Azure
open MBrace.Azure.Client
open MBrace.Store
open MBrace.Workflows
open MBrace.Flow

let dataPath = __SOURCE_DIRECTORY__ + @"../../../data/"
let fullDataPath = dataPath + "large.csv"
let cluster = Runtime.GetHandle(config)

CloudFile.Delete("data/large.csv") |> cluster.RunLocally
//let large = CloudFile.Upload(fullDataPath,"data/large.csv") |> cluster.RunLocally
let large = CloudFile("data/large.csv")

// works
cloud { 
    let! data = CloudFile.ReadAllLines(large.Path) 
    return data.Length }
|> cluster.Run

type Image = int[]
type Example = { Label:int; Image:Image }

let parseLine (line:string) =
    let columns = line.Split ','
    let label = columns.[0] |> int
    let pixels = columns.[1..] |> Array.map int
    { Label = label; Image = pixels }

// works
System.IO.File.ReadAllLines fullDataPath
|> Array.map parseLine

// works
cloud { 
    let! data = CloudFile.ReadAllLines(large.Path) 
    let parsed = data |> Array.map parseLine
    return parsed.Length }
|> cluster.Run

cluster.AttachClientLogger(ConsoleLogger())

#time

let distance (img1:Image) (img2:Image) =
    let mutable total = 0
    let size = 28 * 28
    for i in 0 .. (size - 1) do
        let diff = img1.[i] - img2.[i]
        total <- total + abs diff //diff * diff
    total

let classifier (sample:Example[]) (image:Image) =
    sample
    |> Array.minBy(fun ex -> distance ex.Image image)
    |> fun x -> x.Label

let cloudValidation = 
    cloud {
        let! data = CloudFile.ReadAllLines(large.Path)
        let train = data.[..40000] |> Array.map parseLine
        let test = data.[40001..] |> Array.map parseLine
        let model = classifier train
        let! correct =
            test
            |> CloudFlow.OfArray
            |> CloudFlow.withDegreeOfParallelism 16
            |> CloudFlow.map (fun ex ->
                    if model ex.Image = ex.Label then 1.0 else 0.0)
            |> CloudFlow.average
        return correct}

// faults
let job = cloudValidation |> cluster.CreateProcess
job.Completed
job.AwaitResult ()

cluster.ShowProcesses ()

Added the stacktrace:

MBrace.Core.FaultException: Failed to execute job 'fe2eef6cfdf64eb28e375cd4e7a224f4' ---> Nessos.FsPickler.FsPicklerException: Error serializing object of type 'MBrace.Azure.Runtime.PickledJob'. ---> System.IO.IOException: There is not enough space on the disk.

   at System.IO.__Error.WinIOError(Int32 errorCode, String maybeFullPath)
   at System.IO.FileStream.WriteCore(Byte[] buffer, Int32 offset, Int32 count)
   at <StartupCode$FsPickler>[email protected](WriteState w, String tag, T t) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\PicklerGeneration\FieldPicklers.fs:line 125
   at Nessos.FsPickler.CompositePickler`1.Write(WriteState state, String tag, T value) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\Pickler\CompositePickler.fs:line 202
   at recordSerializer(Pickler[] , WriteState , PickledJob )
   at <StartupCode$FsPickler>[email protected](WriteState w, String tag, Record t) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\PicklerGeneration\FSharpTypeGen.fs:line 191
   at Nessos.FsPickler.CompositePickler`1.Write(WriteState state, String tag, T value) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\Pickler\CompositePickler.fs:line 202
   at Nessos.FsPickler.RootSerialization.writeRootObject[T](IPicklerResolver resolver, ReflectionCache reflectionCache, IPickleFormatWriter formatter, FSharpOption`1 streamingContext, Pickler`1 pickler, T value) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\FsPickler\RootSerialization.fs:line 38
   --- End of inner exception stack trace ---
   at Nessos.FsPickler.RootSerialization.writeRootObject[T](IPicklerResolver resolver, ReflectionCache reflectionCache, IPickleFormatWriter formatter, FSharpOption`1 streamingContext, Pickler`1 pickler, T value) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\FsPickler\RootSerialization.fs:line 41
   at Nessos.FsPickler.FsPicklerSerializer.Serialize[T](Stream stream, T value, FSharpOption`1 streamingContext, FSharpOption`1 encoding, FSharpOption`1 leaveOpen) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\FsPickler\Serializer.fs:line 47
   at <StartupCode$MBrace-Azure-Runtime>[email protected](Tuple`2 _arg1) in C:\workspace\krontogiannis\MBrace.Azure\src\MBrace.Azure.Runtime\Primitives\Queues.fs:line 151
   at [email protected](AsyncParams`1 args)
   --- End of inner exception stack trace ---

   at <StartupCode$MBrace-Azure-Client>[email protected](Unit _arg4) in C:\workspace\krontogiannis\MBrace.Azure\src\MBrace.Azure.Client\Process.fs:line 133
   at [email protected](a a)
   at MBrace.Core.Internals.ExceptionDispatchInfoUtils.Async.RunSync[T](FSharpAsync`1 workflow, FSharpOption`1 cancellationToken) in c:\Users\eirik\Development\mbrace\MBrace.Core\src\MBrace.Core\Utils\ExceptionDispatchInfo.fs:line 139
   at <StartupCode$FSI_0023>.$FSI_0023.main@()
Stopped due to error

CloudKeyValueStore as a primitive

Some programmatic distributed compute frameworks I've seen make distributed, partitioned key-value stores the primary data structure in the framework (in some cases it is more or less the only data primitive). This obviously also maps well onto table storage. CloudVector can be seen as a special case of a distributed KVS.

In any case, I'm wondering whether this direction has been considered.

Thanks
Don

Error when using CloudFlow.toArray

I get a strange F# compiler error when using MBrace Cloud Flow today - has anyone ever seen this? It seems to be failing to resolve a reference to a method in Streams.Core.dll, as if there is a version mismatch between the things being referenced.

4-cloud-parallel-data-flow.fsx(36,8): error FS0971: Undefined value 'AddRange : !<'T> ArrayCollector <'T> -> ArrayCollector <'T> -> unit
         ! compiled_name! =  AddRange
           membInfo-slotsig! =  []
         # arity<1>[1, 1]'

It is when running this code from the starter pack:

let streamComputationJob = 
    [| 1..100 |]
    |> CloudFlow.ofArray
    |> CloudFlow.toArray
    |> cluster.CreateProcess

Against these dependencies:

    FsPickler (1.0.17)
    MBrace.Azure (0.6.5-alpha)
    MBrace.Azure.Standalone (0.6.5-alpha)
    MBrace.Core (0.9.8-alpha)
    MBrace.Flow (0.9.8-alpha)
    MBrace.Runtime.Core (0.9.8-alpha)
    Microsoft.Data.Edm (5.6.4)
    Microsoft.Data.OData (5.6.4)
    Microsoft.Data.Services.Client (5.6.4) - framework: >= net40
    Microsoft.WindowsAzure.ConfigurationManager (3.1.0) - framework: >= net40
    Mono.Cecil (0.9.6.0)
    Newtonsoft.Json (6.0.8) - framework: wpv8.0, >= net40
    Streams (0.3.0)
    System.Spatial (5.6.4)
    Unquote (2.2.2)
    Vagabond (0.6.3)
    WindowsAzure.ServiceBus (2.6.5)
    WindowsAzure.Storage (4.3.0)

CloudCell isn't obvious how to use.

I have a text file in blob storage. I can get to it via CloudFile easily enough, but I want to cache it. So I can't use CloudFile for this, I need CloudCell. But the API doesn't have any easy way for me to either go from a CloudFile to a CloudCell, nor can I easily do e.g. CloudCell.Load "myfile".

Do we even need two abstractions for a reference to a remote bit of data?

Cache --> PopulateCache

I'd recommend "Cache()" on "CloudSequence" be called "PopulateCache()"

arrayOfDataInCloud.GetPartition(0).Cache()

Improve downloading to CloudFile

Getting data from HTTP sources into MBrace is a common use case; currently the standard cloud { } approach involves the usual steps of creating a web client, downloading the data and pushing it into a CloudFile (see the sketch below). This could be incorporated into the library easily enough (either directly onto CloudFile or perhaps better as an extension method on it), but more than that, for large files it could be written in parallel - Azure blobs certainly allow writing to multiple parts of a blob using the API. This would be very useful where you have one extremely large file and want to upload it quickly into Azure.
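
For reference, here is a minimal sketch of the current hand-rolled pattern, assuming a CloudFile.WriteAllText-style write primitive exists in this API version (treat the exact call as an assumption, not the actual API):

open MBrace.Core
open MBrace.Store

// Download over HTTP on whichever worker runs the job, then push the contents
// into a CloudFile. CloudFile.WriteAllText is assumed here for illustration.
let downloadToCloudFile (url : string) (targetPath : string) =
    cloud {
        use client = new System.Net.WebClient()
        let contents = client.DownloadString url
        return! CloudFile.WriteAllText(targetPath, contents)
    }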

Discussion: cloud workflows and the distribution effect

Drawing on the discussion started in this issue, I would like to share a few thoughts on the programming model.

As you may know, cloud workflows are used in every aspect of the MBrace API, from parallel combinators to store operations. For instance, the ICloudDisposable interface has the following signature:

type ICloudDisposable =
    abstract Dispose : unit -> Cloud<unit>

An interesting question that arises here is: how can one know whether a dispose implementation introduces distribution? While it makes sense that all primitive store operations should not introduce distribution, this cannot be guaranteed by their type signature. A workflow of type Cloud<unit> could either signify an asynchronous store operation or contain a massively distributed computation. In other words, there is no way to statically detect whether a workflow carries the distribution effect.

Currently, this is somewhat mitigated using the following primitive:

Cloud.ToLocal : Cloud<'T> -> Cloud<'T>

This has the effect of evaluating the input workflow with thread pool parallelism semantics, thus giving a dynamic guarantee that the nested computation will never exit the current worker machine. It offers relative sanity, but is hard to reason about and does not work correctly in conjunction with forking operations, like Cloud.StartChild.

My proposal for amending this issue is to introduce two brands of computation expressions for MBrace workflows, one for local and one for distributed computations. A distributed workflow can compose local workflows, but not the other way around. Store operations would be local workflows and the parallelism primitives would necessarily return distributed workflows. This would allow the distribution effect to be reasoned about statically, while potentially complicating the programming model.
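
To make the distinction concrete, here is a rough sketch under the proposed model (hypothetical shapes only; the actual definitions live in the experimental branch below):

open MBrace.Core
open MBrace.Store

// Store primitives would be local workflows: guaranteed never to leave the
// current worker.
let readData (path : string) =
    local { return! CloudFile.ReadAllLines path }

// Parallel combinators would necessarily yield distributed (cloud) workflows;
// local workflows may be composed inside them, but not the other way around.
let distributedLineLengths (path : string) =
    cloud {
        let! lines = readData path
        let! lengths =
            lines
            |> Array.map (fun line -> cloud { return line.Length })
            |> Cloud.Parallel   // introduces the distribution effect
        return Array.sum lengths
    }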

I have created an experimental branch that attempts to develop these ideas:
  • Workflow definitions
  • Builder declarations
  • Store operations using local workflows
  • Cloud.Parallel primitive

Thoughts?

CloudFile.Upload: why sourcePath: seq<string> ?

I was a bit surprised to see that CloudFile.Upload takes in a sequence of paths, instead of the path to a single file. I would have expected a single-filepath signature, but I am probably missing a use case / scenario. Can you clarify why you went this route?

Add CloudFlow.OfHttpFileByLine

It would be really useful to have CloudFlow.OfHttpFileByLine/OfHttpFilesByLine with semantics similar to OfCloudFileByLine/OfCloudFilesByLine

CloudFlow.OfHttpFileByLine "http://www.url.com/big.csv"
|> CloudFlow.filter (fun line -> ...)
|> CloudFlow.take 10
|> CloudFlow.toArray

thanks to @tpetricek for the feature request.

FsPickler deserialization failure for FSharpFunc`2

The following sample crashes, and I can't figure out how to interpret the exception. The data files can be found in http://1drv.ms/1QV0MWw , the code fails somewhere in the cloudValidation part with
MBrace.Core.FaultException: Failed to unpickle Job '6a051dbdebba43c5affd8e2e47fb1178' ---> Nessos.FsPickler.FsPicklerException: Error deserializing object of type 'Microsoft.FSharp.Core.FSharpFunc`2[MBrace.Core.Internals.ExecutionContext,Microsoft.FSharp.Core.Unit]'. ---> System.IO.FileNotFoundException: Could not load file or assembly 'FSI-ASSEMBLY, Version=0.0.0.0, Culture=neutral, PublicKeyToken=null' or one of its dependencies. The system cannot find the file specified.

Failing sample:

open System
open System.IO

type Image = int[]
type Example = { Label:int; Image:Image }

let readFrom (path:string) = 
    path
    |> File.ReadAllLines
    |> fun lines -> lines.[1..]
    |> Array.map (fun line -> line.Split ',')
    |> Array.map (fun line -> 
        let label = line.[0] |> int
        let image = line.[1..] |> Array.map int
        { Label = label; Image = image })

let dataPath = __SOURCE_DIRECTORY__ + @"../../../data/"
let trainingPath = dataPath + @"trainingsample.csv"
let training = readFrom trainingPath
let validationPath = dataPath + @"validationsample.csv"
let validation = readFrom validationPath

type Distance = Image -> Image -> int
let distance (img1:Image) (img2:Image) =
    let mutable total = 0
    let size = 28 * 28
    for i in 0 .. (size - 1) do
        let diff = img1.[i] - img2.[i]
        total <- total + diff * diff
    total

let classifier (sample:Example[]) (d:Distance) (img:Image) =
    sample
    |> Array.minBy (fun ex -> d ex.Image img)
    |> fun ex -> ex.Label

#load "credentials.fsx"

open MBrace.Core
open MBrace.Azure
open MBrace.Azure.Client
open MBrace.Flow

let cluster = Runtime.GetHandle(config)

open MBrace.Store
let cloudTraining = 
    cloud { return! CloudValue.New(training,enableCache=true) }
    |> cluster.RunLocally

open MBrace.Workflows

let cloudValidation = 
    validation 
    |> Cloud.Balanced.mapLocal(fun example -> 
        local { 
            let! localTraining = cloudTraining.Value 
            let prediction = classifier localTraining distance example.Image 
            let correct = 
                if prediction = example.Label 
                then 1.0 
                else 0.0 
            return correct}) 
    |> cluster.Run 
    |> Array.average 

Full stack trace:

MBrace.Core.FaultException: Failed to unpickle Job '6a051dbdebba43c5affd8e2e47fb1178' ---> Nessos.FsPickler.FsPicklerException: Error deserializing object of type 'Microsoft.FSharp.Core.FSharpFunc`2[MBrace.Core.Internals.ExecutionContext,Microsoft.FSharp.Core.Unit]'. ---> System.IO.FileNotFoundException: Could not load file or assembly 'FSI-ASSEMBLY, Version=0.0.0.0, Culture=neutral, PublicKeyToken=null' or one of its dependencies. The system cannot find the file specified.
   at System.Reflection.RuntimeAssembly._nLoad(AssemblyName fileName, String codeBase, Evidence assemblySecurity, RuntimeAssembly locationHint, StackCrawlMark& stackMark, IntPtr pPrivHostBinder, Boolean throwOnFileNotFound, Boolean forIntrospection, Boolean suppressSecurityChecks)
   at System.Reflection.RuntimeAssembly.InternalLoadAssemblyName(AssemblyName assemblyRef, Evidence assemblySecurity, RuntimeAssembly reqAssembly, StackCrawlMark& stackMark, IntPtr pPrivHostBinder, Boolean throwOnFileNotFound, Boolean forIntrospection, Boolean suppressSecurityChecks)
   at System.Reflection.Assembly.Load(AssemblyName assemblyRef)
   at Nessos.FsPickler.ReflectionCache.loadAssembly(AssemblyInfo aI) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\Pickler\ReflectionCache.fs:line 55
   at System.Collections.Concurrent.ConcurrentDictionary`2.GetOrAdd(TKey key, Func`2 valueFactory)
   at Nessos.FsPickler.ReflectionCache.loadMemberInfo(FSharpOption`1 tyConv, FSharpFunc`2 loadAssembly, FSharpFunc`2 getMethodSignature, CompositeMemberInfo mI) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\Pickler\ReflectionCache.fs:line 169
   at Nessos.FsPickler.Utils.BiMemoizer`2.G(S s) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\Utils\Utils.fs:line 135
   at [email protected](ReadState r, String tag) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\Pickler\ReflectionPicklers.fs:line 253
   at <StartupCode$FsPickler>[email protected](ReadState r, String t) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\Pickler\CompositePickler.fs:line 112
   at Nessos.FsPickler.CompositePickler`1.Read(ReadState state, String tag) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\Pickler\CompositePickler.fs:line 265
   at <StartupCode$FsPickler>[email protected](ReadState r, String tag) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\Combinators\Array.fs:line 91
   at Nessos.FsPickler.CompositePickler`1.Read(ReadState state, String tag) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\Pickler\CompositePickler.fs:line 265
   at [email protected](ReadState r, String tag) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\Pickler\ReflectionPicklers.fs:line 190
   at <StartupCode$FsPickler>[email protected](ReadState r, String t) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\Pickler\CompositePickler.fs:line 112
   at Nessos.FsPickler.CompositePickler`1.Read(ReadState state, String tag) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\Pickler\CompositePickler.fs:line 265
   at Nessos.FsPickler.CompositePickler`1.Read(ReadState state, String tag) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\Pickler\CompositePickler.fs:line 255
   at classDeserializer(Pickler[] , ReadState )
   at Nessos.FsPickler.CompositePickler`1.Read(ReadState state, String tag) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\Pickler\CompositePickler.fs:line 265
   at Nessos.FsPickler.Pickler`1.UntypedRead(ReadState state, String tag) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\Pickler\Pickler.fs:line 59
   at Nessos.FsPickler.CompositePickler`1.Read(ReadState state, String tag) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\Pickler\CompositePickler.fs:line 242
   at dataContractDeserializer(Pickler[] , ReadState )
   at Nessos.FsPickler.CompositePickler`1.Read(ReadState state, String tag) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\Pickler\CompositePickler.fs:line 265
   at classDeserializer(Pickler[] , ReadState )
   at Nessos.FsPickler.CompositePickler`1.Read(ReadState state, String tag) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\Pickler\CompositePickler.fs:line 265
   at Nessos.FsPickler.Pickler`1.UntypedRead(ReadState state, String tag) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\Pickler\Pickler.fs:line 59
   at Nessos.FsPickler.CompositePickler`1.Read(ReadState state, String tag) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\Pickler\CompositePickler.fs:line 242
   at Nessos.FsPickler.RootSerialization.readRootObject[T](IPicklerResolver resolver, ReflectionCache reflectionCache, IPickleFormatReader formatter, FSharpOption`1 streamingContext, Pickler`1 pickler) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\FsPickler\RootSerialization.fs:line 49
   --- End of inner exception stack trace ---
   at Nessos.FsPickler.RootSerialization.readRootObject[T](IPicklerResolver resolver, ReflectionCache reflectionCache, IPickleFormatReader formatter, FSharpOption`1 streamingContext, Pickler`1 pickler) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\FsPickler\RootSerialization.fs:line 54
   at Nessos.FsPickler.FsPicklerSerializer.Deserialize[T](Stream stream, FSharpOption`1 streamingContext, FSharpOption`1 encoding, FSharpOption`1 leaveOpen) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\FsPickler\Serializer.fs:line 98
   at <StartupCode$FsPickler>.$Serializer.f@156-7[T](FsPicklerSerializer bp, FSharpOption`1 streamingContext, FSharpOption`1 encoding, Stream m) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\FsPickler\Serializer.fs:line 245
   at Nessos.FsPickler.FsPicklerSerializer.UnPickle[T](Byte[] pickle, FSharpOption`1 streamingContext, FSharpOption`1 encoding) in c:\Users\eirik\Development\nessos\FsPickler\src\FsPickler\FsPickler\Serializer.fs:line 245
   at MBrace.Azure.Runtime.PickledJob.ToJob() in C:\workspace\krontogiannis\MBrace.Azure\src\MBrace.Azure.Runtime\RuntimeProvider\Job.fs:line 202
   at <StartupCode$MBrace-Azure-Runtime>[email protected](Unit unitVar) in C:\workspace\krontogiannis\MBrace.Azure\src\MBrace.Azure.Runtime\Execution\JobEvaluator.fs:line 95
   at [email protected](AsyncParams`1 args)
   --- End of inner exception stack trace ---

   at <StartupCode$MBrace-Azure-Client>[email protected](Unit _arg4) in C:\workspace\krontogiannis\MBrace.Azure\src\MBrace.Azure.Client\Process.fs:line 133
   at [email protected](a a)
   at MBrace.Core.Internals.ExceptionDispatchInfoUtils.Async.RunSync[T](FSharpAsync`1 workflow, FSharpOption`1 cancellationToken) in c:\Users\eirik\Development\mbrace\MBrace.Core\src\MBrace.Core\Utils\ExceptionDispatchInfo.fs:line 139
   at <StartupCode$MBrace-Azure-Client>[email protected](Unit unitVar) in C:\workspace\krontogiannis\MBrace.Azure\src\MBrace.Azure.Client\Client.fs:line 281
   at [email protected](AsyncParams`1 args)
   at MBrace.Core.Internals.ExceptionDispatchInfoUtils.Async.RunSync[T](FSharpAsync`1 workflow, FSharpOption`1 cancellationToken) in c:\Users\eirik\Development\mbrace\MBrace.Core\src\MBrace.Core\Utils\ExceptionDispatchInfo.fs:line 139
   at <StartupCode$FSI_0006>.$FSI_0006.main@() in C:\Users\Mathias\Documents\Visual Studio 2013\Projects\mbrace\MBrace.StarterKit-master\azure\HandsOnTutorial\digits.fsx:line 73
Stopped due to error

Further simplify the CloudFlow producer API methods

The ofCloudFileByLine and ofTextFileByLine functions are extremely useful (particularly the latter). However, they could be made even better by allowing either a couple of overloads and / or optional parameters: -

  1. Ability to specify multiple files which are implicitly amalgamated into a single CloudFlow. e.g. ofTextFiles(string paths []). If you use this as a params argument you can simply merge this with the existing ofTextFiles producer.
  2. Ability to specify a directory, in which all files will be consumed (as in 1). You could reuse the above function but if you identify a path that ends in a / or * then assume a wildcard search (or similar). Alternative would be a secondary function but I can see use cases where you might want to combine e.g. 2 specific files plus 1 directory.
  3. Ability to simply specify 'T as a type argument without the need to specify a deserialization routine (useful for CloudFiles that were stored using the default MBrace serializer) or a built-in deserializer for JSON .NET (e.g. where each line in a text file is a JSON record).

I also think that this function (or variations of it) should be promoted as extension methods onto e.g. StoreClient on the runtime handle (or even higher), and possibly onto CloudFile as well to aid discoverability - currently people need to know about the module and its functions; putting them up the intellisense stack will help in this regard.

FSPickler issue on MBrace

This is a repro I can make on 0.6.10, taken from the MBrace Starter Kit:

#load "credentials.fsx"

open System
open System.IO
open MBrace.Core
open MBrace.Azure
open MBrace.Azure.Client
open MBrace.Flow
open MBrace.Store
open MBrace.Workflows.Cloud

let cluster = Runtime.GetHandle(config)

// this all works
let job = 
    cloud {
        let name = "isaac"
        let date = System.DateTime.Now
        return (sprintf "Hello world %s %A!" name date)
    } 
    |> cluster.CreateProcess

type Person = { Name : string; Age : int }
let isaac = { Name = "Isaac"; Age = 35 }

// this goes pop
let v = CloudValue.New(isaac, "data/isaac") |> cluster.RunLocally

With the following error

Nessos.FsPickler.FsPicklerException: Error serializing object of type 'FSI_0025+Person'. ---> Nessos.Vagabond.VagabondException: type 'FSI_0025+Person' in dynamic assembly 'FSI-ASSEMBLY, Version=0.0.0.0, Culture=neutral, PublicKeyToken=null' does not correspond to slice

If you don't run the first cloud { } and just go straight to the CloudValue it works fine.

Mono compatibility

Running MBrace on Linux / Mono for heterogeneous clusters.
Getting around the issue that the Microsoft.ServiceBus library doesn't work on Mono, as figured out by Konstantinos:

#r "Microsoft.ServiceBus"
open Microsoft.ServiceBus
open Microsoft.ServiceBus.Messaging
let ns = NamespaceManager.CreateFromConnectionString("[conn]")

In the interactive:

System.TypeInitializationException: An exception was thrown by the type initializer for TimerManager ---> System.EntryPointNotFoundException: CreateWaitableTimer

at (wrapper managed-to-native)  Microsoft.ServiceBus.Common.Interop.UnsafeNativeMethods:CreateWaitableTimer (intptr,bool,string)

In a compiled binary the Mono runtime just hangs. It looks like the ServiceBus library uses some native libraries that are not available on Mono.

Suggestion: rename "CloudChannel" to "CloudQueue"?

On the whole I really like where the renaming and API reorganization has got us.

One thing I noticed today when coming back to 0.6.8 is about the CloudChannel name. As I mentioned in comments like this, I don't really love the "channel" terminology. A channel in MBrace is ultimately a queue - why not call it a queue? Won't people find that more intuitive?

It seems to me that having a CloudQueue abstraction (e.g. one which implements both ISendPort and IReceivePort and which has a name in the same way as CloudFile and CloudDictionary) would make sense and be more regular. The current use of send/receive pairs feels irregular. And I get the feeling that performance considerations will ultimately require adding further operations to CloudQueue, e.g. supporting bulk operations.

CloudSequence.Parse

I think that CloudSequence.Parse (or any .Parse) should check that the given path exists.

let x = runtime.StoreClient.CloudSequence.Parse<int>(System.Guid.NewGuid().ToString())
val x : CloudSequence<int> = CloudSequence[System.Int32] at e13f61da-05dc-4028-b892-bb625a74313e

I think that delaying the error might be confusing.

Food for thought: Can CloudSequence and CloudVector be one?

From what I can tell a CloudSequence is just a CloudVector with a single partition.

Have you thought of simplifying these and just having the one main type CloudVector, with CloudSequence renamed to CloudVectorPartition?

I suppose it depends if an un-partitioned CloudSequence has substantial independent utility either as a primitive or as something commonly used.

Even if it exists as a question, I find myself wondering if CloudSequence should be taught in the primary learning sequence, or if people should just go straight to CloudVector. See https://github.com/dsyme/mbrace-docs/blob/fix-2/docs/content/programming-model.fsx#L377 for example.

This issue is just food for thought about how to keep simplifying the API and the primary teaching sequences. Close it once it is digested :)

cheers
don

Request: more visibility of what workers are doing

I'm finding situations where I don't know what the active workers are doing and a bit more visibility of whether they are actively working on jobs would be appreciated.

For example, I tried to create some 5-minute CPU-intensive computations like this:

let ps =
    [ for i in 0 .. 20 ->
        cloud {
            let t = System.DateTime.Now
            let x = ref 0
            while System.DateTime.Now - t < System.TimeSpan.FromMinutes(1.0) do
                x := !x + 1
        }
        |> cluster.CreateProcess ]

This created the expected processes:

 Name                        Process Id   State  Completed  Execution Time            Jobs           Result Type  Start Time                  Completion Time 
 ----                        ----------   -----  ---------  --------------            ----           -----------  ----------                  --------------- 
       d5130f87184a4329a1d751ee1fe47175  Posted  False      00:08:27.8911021    0 /   0 /   0 /   1  unit         02/03/2015 09:35:23 +00:00  N/A             
       3957470ca1d641238d2c5b65d16c7b0e  Posted  False      00:08:25.4043661    0 /   0 /   0 /   1  unit         02/03/2015 09:35:25 +00:00  N/A             
       bb473a0a9d4242a2bda52fc6687f33c0  Posted  False      00:08:23.3617708    0 /   0 /   0 /   1  unit         02/03/2015 09:35:27 +00:00  N/A             
       c24cdde5f1974b7bbba8d9e244437b34  Posted  False      00:08:21.3528384    0 /   0 /   0 /   1  unit         02/03/2015 09:35:29 +00:00  N/A             
       96a736a0095a40388459a5b5e3fc3f13  Posted  False      00:08:19.3260546    0 /   0 /   0 /   1  unit         02/03/2015 09:35:31 +00:00  N/A             
       16d5e4ad5e94469e85e0b9bfef24141e  Posted  False      00:08:17.3328754    0 /   0 /   0 /   1  unit         02/03/2015 09:35:33 +00:00  N/A             
       7a8b5597e0504a6d8c518eeb7d54e350  Posted  False      00:08:15.2460662    0 /   0 /   0 /   1  unit         02/03/2015 09:35:35 +00:00  N/A             
       cbc02732dfc545a0b61dff4358467015  Posted  False      00:08:13.0899509    0 /   0 /   0 /   1  unit         02/03/2015 09:35:37 +00:00  N/A             
       5bd377501ac04e77bab7fed1211a2198  Posted  False      00:08:10.9151137    0 /   0 /   0 /   1  unit         02/03/2015 09:35:40 +00:00  N/A             
       ab8829c3df39446ea84078abc05567bf  Posted  False      00:08:08.7859904    0 /   0 /   0 /   1  unit         02/03/2015 09:35:42 +00:00  N/A             
       bb7c594d8e9540b08012ed1d09c90dd8  Posted  False      00:08:06.5305152    0 /   0 /   0 /   1  unit         02/03/2015 09:35:44 +00:00  N/A             
       74996cf74d48445f98cef0fbd5c37842  Posted  False      00:08:04.0222305    0 /   0 /   0 /   1  unit         02/03/2015 09:35:46 +00:00  N/A             
       7505ebcc477942daa95a1112d25d625c  Posted  False      00:08:01.9067604    0 /   0 /   0 /   1  unit         02/03/2015 09:35:49 +00:00  N/A             
       f8fcaf12293b449ea60dcbe7d48b3ca7  Posted  False      00:07:59.7708827    0 /   0 /   0 /   1  unit         02/03/2015 09:35:51 +00:00  N/A             
       9dfdbf94e1174ab9a73fc15b23a57070  Posted  False      00:07:57.6437405    0 /   0 /   0 /   1  unit         02/03/2015 09:35:53 +00:00  N/A             
       8410086495a04d13a502db18efc5fd4c  Posted  False      00:07:55.4526887    0 /   0 /   0 /   1  unit         02/03/2015 09:35:55 +00:00  N/A             
       241e143a811046d4b8446a73bbe902a0  Posted  False      00:07:53.1919711    0 /   0 /   0 /   1  unit         02/03/2015 09:35:57 +00:00  N/A             
       618f0edcb0314478bbd9dcb753397895  Posted  False      00:07:51.0978760    0 /   0 /   0 /   1  unit         02/03/2015 09:35:59 +00:00  N/A             
       140a7915ca994a4d9a138adec1ea3b6f  Posted  False      00:07:49.0119524    0 /   0 /   0 /   1  unit         02/03/2015 09:36:01 +00:00  N/A             
       d7104d4c3d59491fb6eeb7870e82815d  Posted  False      00:07:46.8516552    0 /   0 /   0 /   1  unit         02/03/2015 09:36:04 +00:00  N/A             
       e6be0fad3b8a4e9ea66f9d52ceebb13f  Posted  False      00:07:44.7516656    0 /   0 /   0 /   1  unit         02/03/2015 09:36:06 +00:00  N/A             

However the workers do not appear to be processing work. I assume something is horked with my cluster, but it would help if I could see if any work is actively being performed by any worker. The CPU utilization may be one indicator but that's not enough: I suppose I'd like to know which processes or cloud tasks are actively assigned to which worker.

 Id                     Hostname        % CPU / Cores  % Memory / Total(MB)  Network(ul/dl : kbps)  Jobs   Process Id  Initialization Time         Heartbeat                   Is active 
 --                     --------        -------------  --------------------  ---------------------  ----   ----------  -------------------         ---------                   --------- 
 MBraceWorkerRole_IN_0  RD0003FF4BD77D    0.90 / 2       20.29 / 3583.00         39.02 / 21.79      0 / 2        3032  02/03/2015 09:28:41 +00:00  02/03/2015 09:44:29 +00:00  True      
 MBraceWorkerRole_IN_1  RD0003FF4B9D5F    2.36 / 2       23.22 / 3583.00         39.86 / 22.92      0 / 2        1460  02/03/2015 09:29:41 +00:00  02/03/2015 09:44:29 +00:00  True      

CloudFlow.skip

If possible, a skip() function should be added to the CloudFlow module.

MBrace.CSharp rewrite

The inclusion of the Local API (see #21) in MBrace.Core has rendered the MBrace.CSharp project obsolete. We need to consider a rewrite of this project.

CloudFlow to CloudFile

Having a combinator to go from CloudFile to CloudFlow is very useful, but the logical end of this would be to also have a toCloudFile combinator - this would truly allow us to perform data workloads that revolve around reading a file (or set of files), performing some arbitrary operations on it, and outputting the result back to a file. Of course, I would imagine that the implementation of this would also run in parallel rather than fork/join for performance reasons (as has been done, AFAIK, for the fromCloudFileAsLines implementation).

Promote common types up from .Core namespace

Things like the cloud { } could (should?) be moved up to the MBrace namespace. Only types that are either advanced or e.g. interfaces that are not often used directly etc. could be left in MBrace.Core. The same could be done with some of the MBrace.Azure core types e.g. Configuration; in this way you can share a common logical namespace from several different physical assemblies.

This way someone can simply type MBrace. and see a simple list of the common types that are going to be needed quickly and easily.

Cannot add individual NuGet packages

Trying to create an MBrace project by hand, adding the required NuGet packages - and can't add the dependencies I need.

  1. Create an F# library (VS2013)
  2. Add MBrace.Core 0.9.14-alpha via "manage NuGet packages" in visual studio / Solution Explorer
  3. Add MBrace.Flow 0.9.14-alpha
  4. Add MBrace.Stream 0.9.7-alpha -> fails (Updating MBrace.Core 0.9.14-alpha to MBrace.Core 0.9.7-alpha failed)
    Additionally, when I try to add MBrace.Azure I also get a failure, and when adding MBrace.Azure.Client nothing seems to happen.

CloudStream.groupBy, distinct etc.?

Are there plans to add groupBy, sortByDescending, distinct and other such operators to CloudStream? Perhaps the F# community can contribute if you make available a roadmap based on this table?

Allow use of native 64-bit DLLs with Mbrace/Vagabond/Azure

(I'm not sure if this is an issue for Core, Vagabond or Azure)

This sample shows how to use the Math.NET NuGet package as part of MBrace on Azure:

However, the Math.NET native Intel MKL implementation is about 20x faster than the managed version, for large matrix operations. To be competitive, and to be cool, we really need to allow the use of Intel MKL native DLLs as part of MBrace jobs on Azure. I presume workers can in principle run native code.

On the local machine, this is what's needed to activate Intel MKL native:

  1. Add this to the paket dependencies:

    nuget MathNet.Numerics.MKL.Win-x64

  2. Add this to the script:

    let dllPath = __SOURCE_DIRECTORY__ + @"\packages\MathNet.Numerics.MKL.Win-x64\content"
    System.Environment.SetEnvironmentVariable("Path",System.Environment.GetEnvironmentVariable("Path") + ";" + dllPath)
    Control.UseNativeMKL();

Then matrix multiplications are much, much faster

On the cluster, it is reasonable to expect to have to manually set Control.UseNativeMKL() at the start of each CreateProcess, and even to adjust the Path explicitly (or call SetDllDirectory). The problem is that we don't know what SomeDirectory is - where is the native DLL located on the cluster?

let invertRandomMatricesJob = 
    cloud { 
        Control.UseNativeMKL();
        Environment.SetEnvironmentVariable("Path",Environment.GetEnvironmentVariable("Path") + ";" + SomeDirectory)
        let m = Matrix<double>.Build.Random(100,100) 
        let x = (m * m.Inverse()).L1Norm()
        return x } 
    |> cluster.CreateProcess

It would also be reasonable to have some explicit Vagabond registration of native DLLs.

Thanks!

Ability to construct closure-less cloud computations?

Whilst a large amount of flexibility and power comes from working with closures, we've seen how accidentally creating large bindings that get implicitly captured by cloud { } can have a detrimental effect on performance. It would be nice to also have the option of sending some computation to the cluster that only uses an explicitly supplied value, e.g.

let data = "hello"
data |> Cloud.Execute(fun x -> ...) // x is the same as data

This is just off the top of my head - I think this proposal would need runtime checks that fail if anything was accidentally closed over. But you get the gist of what I'm trying to get at?
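
For illustration, a minimal sketch of the problem motivating this (assuming the usual cloud { } capture semantics):

open MBrace.Core

// The large local binding below is silently captured by the closure and
// serialized along with the job, even though only its length is needed.
let hugeLookup = Array.init 10000000 id

let lengthJob =
    cloud { return hugeLookup.Length }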

API consistency on flows and streams

If you have an array of items and want to map across them via MBrace, there are (as far as I have found) several different ways: -

  1. CloudFlow.map. This would seem to be the default mechanism for doing such things - use the CloudFlow.ofArray combinator and off you go. But you can't easily deal with cloud { } or local { } blocks inside these - or if you can, how?
  2. Cloud.Parallel. A little more low-level than CloudFlow, and more flexible, but there's a finite limit on the number of items you can chuck at this before it throws an ArgumentException because of too many processes.
  3. Workflow.Cloud.Balanced has two methods: map and mapLocal. The former does what you would expect i.e. T -> U and to all intents and purposes behaves like CloudFlow.map. The latter goes from T -> U local i.e. it expects to receive a local { } back. This is useful if you want to do something like access a CloudValue inside - map doesn't let you.
  4. Workflow.Cloud.Sequential has a map method. Except this one goes from T -> U cloud! Why is the signature on this map different from the other ones?

I can see there being two types of uses for this sort of thing: -

  1. Standard simple maps i.e. T -> U.
  2. Maps that need local { }. This is very useful if you need to go back to the cluster for anything inside a map.

What's the recommended way of doing them given the options identified above (and are there any others that I've missed)?
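
For what it's worth, here is a hedged sketch contrasting options 1 and 3, using the names as they appear elsewhere in these issues (exact signatures are assumptions for this API version):

open MBrace.Core
open MBrace.Store
open MBrace.Flow
open MBrace.Workflows

// 1. CloudFlow.map: a plain 'T -> 'U function; no cloud { } or local { } inside the lambda.
let viaFlow =
    [| 1 .. 100 |]
    |> CloudFlow.OfArray
    |> CloudFlow.map (fun x -> x * x)
    |> CloudFlow.toArray

// 3. Cloud.Balanced.mapLocal: the lambda returns a local { }, so it can, for
//    example, dereference a cached CloudValue on the worker where it runs.
let viaBalanced (cached : CloudValue<int>) =
    [| 1 .. 100 |]
    |> Cloud.Balanced.mapLocal (fun x -> local {
        let! offset = cached.Value
        return x + offset })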

CloudStream.iter?

Is there any reason why there isn't a CloudStream.iter that accepts a T --> Cloud<unit>?

CloudVector performance / usage question

I have a CloudFlow that is operating over a 100 MB file split into around 2 million rows. I've done a relatively simple set of maps and filters on the flow, and wanted to persist / cache the results in memory across the cluster so that I can then repeatedly perform further CloudFlow operations starting from there.

I started with a flow as follows: -

let results =
    [ fileRef ]
    |> CloudFlow.ofCloudFilesByLine
    |> CloudFlow.filter(fun row -> not (row.StartsWith "Acc_Index")) // skip header
    |> CloudFlow.map(fun row -> Casualties.ParseRows row |> Seq.head) // raw text to provided type
    |> CloudFlow.countBy(fun row -> row.Sex_of_Casualty)
    |> CloudFlow.toArray
    |> cluster.CreateProcess

This took around 60 seconds to run. I then modified the above code as follows: -

let results =
    [ fileRef ]
    |> CloudFlow.ofCloudFilesByLine
    |> CloudFlow.filter(fun row -> not (row.StartsWith "Acc_Index")) // skip header
    |> CloudFlow.map(fun row -> Casualties.ParseRows row |> Seq.head) // raw text to provided type
    |> CloudFlow.toCachedCloudVector
    |> cluster.CreateProcess

This process took over 2.5 minutes to run.

 Name                        Process Id     Status  Completed  Execution Time            Jobs           Result Type                                                                                          Start Time                  Completion Time            
 ----                        ----------     ------  ---------  --------------            ----           -----------                                                                                          ----------                  ---------------            
4c3352570aff44e5bf12af4b26d2ae06  Completed  True       00:01:10.7470280    0 /   0 /  13 /  13  (int * int64) []                                                                                     20/04/2015 17:34:15 +00:00  20/04/2015 17:35:26 +00:00 
3a57b4cb90da442d97baac5327c3283e  Completed  True       00:02:33.5285346    0 /   0 /   5 /   5  CloudVector<(string * int * int * int * int * int * int * int * int * int * int * int * int * int)>  20/04/2015 17:39:46 +00:00  20/04/2015 17:42:20 +00:00 

When I observed the workers, only one of them seemed to be doing much - it had high network bandwidth for a while, and ended up with higher memory utilization than the others: -

Id                     Hostname        % CPU / Cores  % Memory / Total(MB)  Network(ul/dl : kbps)   Jobs   Process Id  Initialization Time         Heartbeat                   Is active 
 --                     --------        -------------  --------------------  ---------------------   ----   ----------  -------------------         ---------                   --------- 
 MBraceWorkerRole_IN_3  RD0003FF44A489    2.96 / 4       21.75 / 7167.00        174.19 / 276.71     0 / 32        1256  17/04/2015 10:19:42 +00:00  20/04/2015 17:50:40 +00:00  True      
 MBraceWorkerRole_IN_1  RD0003FF44F5E6    2.52 / 4       16.92 / 7167.00        137.08 / 215.45     0 / 32        1012  17/04/2015 10:19:45 +00:00  20/04/2015 17:50:40 +00:00  True      
 MBraceWorkerRole_IN_2  RD0003FF44EC26    4.09 / 4       16.24 / 7167.00        165.88 / 180.45     0 / 32        1152  17/04/2015 10:19:52 +00:00  20/04/2015 17:50:39 +00:00  True      
 MBraceWorkerRole_IN_0  RD0003FF44F1F5    3.14 / 4       16.20 / 7167.00        184.52 / 273.63     0 / 32        2252  17/04/2015 10:19:59 +00:00  20/04/2015 17:50:39 +00:00  True      

What I don't get is - what exactly have I done by calling toCachedCloudVector? What has it done? Why was there high network upload?

Am I using the correct abstraction to get the desired behaviour, i.e. store the results of that flow in memory so that I can use them repeatedly afterwards?

CloudFlow.collect requires an explicit seq

The signature of CloudFlow.Collect looks like: -

val inline collect : f:('T -> seq<'R>) -> flow:CloudFlow<'T> -> CloudFlow<'R>

Even if you return e.g. an array<'R>, the compiler will complain and you need to add e.g. Seq.ofArray at the end of the lambda. Could the signature be changed to something like this?

val inline collect : f:('T -> #seq<'R>) -> flow:CloudFlow<'T> -> CloudFlow<'R>
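
For illustration, a small sketch of the friction with the current signature (names as used elsewhere in these issues): an array-returning lambda needs a trailing Seq.ofArray, which the proposed #seq<'R> (flexible type) signature would make unnecessary.

open MBrace.Flow

// collect expects seq<'R>, so the array produced by Split must be converted explicitly.
let wordsFlow =
    [| "one two"; "three four" |]
    |> CloudFlow.OfArray
    |> CloudFlow.collect (fun line -> line.Split ' ' |> Seq.ofArray)
    |> CloudFlow.toArray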

display/ToString for CloudDirectory

Given this

let dp = cluster.DefaultStoreClient.FileStore.Directory.Create()

this should display the directory name:

val dp : CloudDirectory

Discussion thread: naming for CloudSequence and CloudStream

Hi all,

I'd like to start a discussion thread about the names "CloudSequence" and "CloudStream". Caveat: I haven't delved into CloudSequence much, and have only gone through basic combinator programming with CloudStream.

OK, so I guess I'm concerned by the naming here, from the perspective of a newcomer:

  • CloudSequence sounds totally like it is "an IEnumerable in the cloud". It's not.
  • CloudStream sounds like it is, well, for streaming data. It's not really, at least not yet.

So both newcomer intuitions are incorrect (I believe). As I understand it,

  • CloudSequence<T> is an immutable array of data items of type T stored in a CloudFile in cloud storage. It is definitely not an abstract interface-based notion like seq<T>
  • CloudStream<T> is for composing a data-parallel workflow, normally starting with fixed partitioned data and files, streamed through a set of combinators in a DryadLinq-style.

For CloudSequence, why not call it what it is - either CloudFile<T> (using generic overloading), or CloudCollectionFile<T> or the like? Why use the word "sequence" when it has so many other connotations in F#? I think I'd really recommend a change here.

For CloudStream.... OK, so the horse may have bolted here - since this naming is related to the Stream library from Nessos :)

As background, I've always been loath to "use up" the "Stream" name in F# nomenclature - for starters, the name is used in System.IO; secondly, it was used by an obscure (and little used) OCaml feature; and thirdly, it has the streaming-data connotation above. But I still wonder if a word like "Flow" might serve MBrace users better, and leave open the possibility of using the "stream" word for later, e.g.

arrayOfDataInCloud |> CloudFlow.ofCloudVector |> CloudFlow.map (Array.sumBy (fun (i,j) -> i+j)) |> CloudFlow.sortBy (fun n -> n) 10 |> CloudFlow.toArray |> cluster.CreateProcess
or
arrayOfDataInCloud |> CloudDataFlow.ofCloudVector |> CloudDataFlow.map (Array.sumBy (fun (i,j) -> i+j)) |> CloudDataFlow.sortBy (fun n -> n) 10 |> CloudDataFlow.toArray |> cluster.CreateProcess
Equally, I can see the Nessos Streams library has filled the space I originally left, and with something very good too! :) So I'd understand if you kept this naming!

Suggestion: clarify Jobs columns for GetProcesses()

When I call cluster.GetProcesses(), I see a summary which includes a header for Jobs, with 4 columns underneath.
I suspect these 4 columns must be total jobs, faulted, running and done; however, as it is, it's difficult to know which is which.
I suggest clarifying what each column is, perhaps with RUN/ ERR/ TOT/ DONE, or something similar.

Need an easy list of all the abstractions

I'm seeing more and more people having difficulties getting up to speed with the abstractions of MBrace. Things like CloudVector have little documentation - the intellisense comments on each static method just refer to CloudVector but don't actually explain what it is. Is there a list of all the main abstractions, e.g. on the website?

Going through the starter pack, I think there need to be better explanations of what things are, what their use cases are, and how they fit in with the rest of the MBrace API overall. Perhaps there's a need for some easier exercises in the pack that work with a real dataset rather than something abstract like prime numbers?

CloudCell.Delete?

Given we can delete immutable cloud files, I believe it should be possible to delete immutable CloudCell values.

I realise this breaks the caching model, and is thus logically part of the storage-management API (like CloudFile.Delete and so on). But it seems necessary to allow this programmatically somehow.

CloudVector on a single large element uses two partitions, one very small

When you create a cloud vector for one item which logically fits in one partition, you get two partitions, the second of which is very small.

For example:

let cv2 = CloudVector.New ([| [| 0 .. 100000 |] |], 100000L) |> cluster.Run

cv2.PartitionCount
[ for p  in 0 .. cv2.PartitionCount - 1 -> cv2.GetPartition(p).Size |> cluster.Run ]

gives:

val it : int64 list = [400048L; 35L]

CloudAtomUtils shows empty in intellisense

CloudAtomUtils is showing as empty in intellisense, because it has an extension member. It would be better if it didn't show at all. I actually didn't think such modules showed in F# intellisense but apparently they do (WebExtensions from FSharp.Core also shows). Anyway, it would be nice if there were some way that it didn't show: clean intellisense menus are a blessing for users.

module CloudAtomUtils =

Could not load Deedle when running code for second time

I wrote an MBrace demo that uses Deedle - quite curiously, it works fine the first time I run it, but breaks down when I run the code again. The script is here - I can try to get a smaller repro, but I thought I'd share the whole thing first.

The first part of the code creates a CloudValue<string * Series<string, float>>. This bit works fine (I was thinking serializing Deedle series might go wrong, but it seems to be working well!)

The next part of the code reads the cloud value and runs K-means clustering on countries (using just a single cloud job). This works fine when I run it once, but when I run the lines 139-141 for the second time, I get the following (full error message is here):

System.IO.FileNotFoundException: Could not load file or assembly 'Deedle, Version=1.0.7.0, Culture=neutral, PublicKeyToken=null' or one of its dependencies. The system cannot find the file specified.
File name: 'Deedle, Version=1.0.7.0, Culture=neutral, PublicKeyToken=null' ---> System.IO.FileNotFoundException: Could not load file or assembly 'Deedle, Version=1.0.7.0, Culture=neutral, PublicKeyToken=null' or one of its dependencies. The system cannot find the file specified.
File name: 'Deedle, Version=1.0.7.0, Culture=neutral, PublicKeyToken=null'

This also happens when I try to run multiple things in parallel, for example using:

let p3 = 
  [ for i in 0 .. 10 -> kmeans distance aggregator 3 cloudWorld ]
  |> Cloud.Parallel
  |> cluster.CreateProcess

Some things I noticed:

  • I wrote this without using Deedle and that worked fine
  • I changed the code so that it does not store Deedle values in CloudValue, but that did not help
  • After it starts failing, even simple things like cloud { return 1 + 1 } |> cluster.Run stop working (throw the same error). cluster.Reset() does not help - and I basically had to recreate the cluster.

Is there something obvious I'm missing? What should I do to get a more useful log for you to look at?

FsPickler fails to serialize job

Trying the following simple example (you can get the file here: http://1drv.ms/1Bdiv9L) fails on serialization:

open System
open System.IO

type Image = int[]
type Example = { Label:int; Image:Image }

let readFrom (path:string) = 
    path
    |> File.ReadAllLines
    |> fun lines -> lines.[1..]
    |> Array.map (fun line -> line.Split ',')
    |> Array.map (fun line -> 
        let label = line.[0] |> int
        let image = line.[1..] |> Array.map int
        { Label = label; Image = image })

let dataPath = __SOURCE_DIRECTORY__ + @"../../../data/"
let trainingPath = dataPath + @"trainingsample.csv"
let training = readFrom trainingPath

#load "credentials.fsx"

open MBrace.Core
open MBrace.Azure
open MBrace.Azure.Client
open MBrace.Flow

let cluster = Runtime.GetHandle(config)

let test =
    training 
    |> CloudFlow.OfArray
    |> CloudFlow.countBy (fun ex -> ex.Label)
    |> CloudFlow.toArray
    |> cluster.CreateProcess 

This fails with the following error:

MBrace.Core.FaultException: Failed to execute job '5084a1b388a143ff9c2883a36a29f083' ---> Nessos.FsPickler.FsPicklerException: Error serializing object of type 'MBrace.Azure.Runtime.PickledJob'. ---> System.IO.IOException: There is not enough space on the disk.

Note that the local equivalent runs without a problem:

let localTest = training |> Seq.countBy (fun ex -> ex.Label) |> Seq.toArray

Functions that take in CloudFiles are not obvious

There are many functions on e.g. CloudCell that take in CloudFiles but the method naming often simply says "File" or "Path"; I actually thought that these were local file system paths! Either this should be renamed + commented appropriately in the XML comments, or these methods could be typed to take in a CloudFile instead of just a string.

Question: Async --> Local

I've been looking at running a web server on an MBrace worker, just out of curiosity. That leads to a question about what we do if we lose the cloud { ... } or local { .. } monadic context and whether that context can be locally captured and restored.

In particular, the handlers for all web server frameworks are in terms of Task<T>, Async<T> or synchronous handlers.

I'd like these handlers to be able to "do things" on the cluster. Of course they could rebind to the cluster itself as if they were a client. But I'm wondering if there is any way that handlers (which are logically running as part of a cloud { ... }) can do Local or Cloud actions even though we're no longer in a cloud monadic context but rather in an async handler that was registered by the monadic context.

That is, can we allow handlers to do an invoke as if they were a local { ... }. That may require the equivalent of an unsafe-perform-IO or a Local.FromContinuations backdoor.

It's not necessarily something we want to encourage, of course; I'm just wondering.

Here's the rough structure of the web server process:

let handler() = async { ... }  // some async handlers to handle requests

let server app = 
  cloud { 
    do! Cloud.Logf "starting web server job..." 

    // Get the endpoint we have to bind to 
    let httpEP = RoleEnvironment.CurrentRoleInstance.InstanceEndpoints.["Endpoint2"].IPEndpoint

   // Start the Suave server, binding to the endpoint
    do Async.Start (async { startWebServer config handler  })

    // Wait for cancellation
    ...
    do! Cloud.Logf "exiting..." 
    return logLines()
    }

Programming model documentation issues

Here are some issues with the programming model doc. (I couldn't find the source to contribute fixes directly.)

Section: Parallelism Combinators

Second example ends with return raise e. This has two problems:

  • raise doesn't return
  • To re-raise an exception, you should use reraise, which better preserves the stack for debugging (see the small sketch after this list).
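
For illustration, a small plain-F# sketch (not MBrace-specific) of the difference:

// reraise () rethrows the original exception and preserves its stack trace,
// whereas `raise e` rethrows it with a reset trace. Note that reraise is only
// valid lexically inside a `with` handler.
let parseOrFail (s : string) =
    try
        int s
    with e ->
        printfn "failed to parse '%s': %s" s e.Message
        reraise ()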

Section: Example: Defining a MapReduce workflow

it does enable data parallelism

missing the word not

Best way to create a CloudVector where each item is very large (CloudVector.ofCloudCells or ...?)

I'm creating a CloudVector of type CloudVector<(DateTimeOffset * float) []> of overall size 100 GB, where each entry is about 1 GB of time series data. Note I'm expecting to process each particular time series in-memory on a single worker.

I first created data as 100 cloud cells, each containing a (DateTimeOffset * float) [] (1 GB each). I had somehow expected to find something like this:

CloudVector.ofCloudCells : seq<CloudCell<T>> -> Local<CloudVector<T>>

I didn't immediately see how to create the CloudVector without having all the data in the memory of a single worker at any point in time. I thought of using CloudVector.ofCloudFiles, but this felt like it would be artificial.

In the end I used 100 instances of CloudVector.New [| oneArrayOfData |] followed by a CloudVector.Merge. But it feels like there should be another way to do this. And CloudVector.OfPartitions felt strangely misleading - from its name I thought that's what I needed, but it wasn't.

Nuget packages fail to install

VS2013, trying to add MBrace.Azure 0.6.10-alpha via the NuGet extension (right-click add reference) to a fresh F# library project, getting this:

Attempting to resolve dependency 'MBrace.Core (= 0.9.12-alpha)'.
Attempting to resolve dependency 'MBrace.Runtime.Core (= 0.9.12-alpha)'.
Attempting to resolve dependency 'Vagabond (= 0.6.9)'.
Attempting to resolve dependency 'Mono.Cecil (= 0.9.6.1)'.
Attempting to resolve dependency 'FsPickler (≥ 1.2.2 && < 1.3.0)'.
Attempting to resolve dependency 'FsPickler.Json'.
Attempting to resolve dependency 'Newtonsoft.Json (≥ 6.0.5 && < 6.1.0)'.
Attempting to resolve dependency 'FsPickler (= 1.2.9)'.
Attempting to resolve dependency 'FsPickler (= 1.2.7)'.
Updating 'FsPickler 1.2.9' to 'FsPickler 1.2.7' failed. Unable to find a version of 'FsPickler.Json' that is compatible with 'FsPickler 1.2.7'.

Explicitly remove a worker from the pool

If you attach a local worker (for example), how can you remove it from the pool? If you just close the console window process, it still remains for a while as a worker in the pool. Work gets allocated to it, and jobs don't complete until the pool realises that the worker has died.
