flambo's Introduction

Flambo

Flambo is a Clojure DSL for Apache Spark

Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala and Python, and an optimized engine that supports general execution graphs.

Flambo is a Clojure DSL for Spark. It allows you to create and manipulate Spark data structures using idiomatic Clojure.

"So that's where I came from." --Flambo

  • flambo 0.8.2 targets Spark 2.x
  • flambo 0.7.2 targets Spark 1.x

Flambo is available from Clojars. Depending on the version of Spark you're using, add one of the following to the dependencies in your project.clj file:

With Leiningen

[yieldbot/flambo "0.8.2"] for Spark 2.x
[yieldbot/flambo "0.7.2"] for Spark 1.x

Don't forget to add spark (and possibly your hadoop distribution's hadoop-client library) to the :provided profile in your project.clj file:

{:profiles {:provided
             {:dependencies
              [[org.apache.spark/spark-core_2.11 "2.2.0"]]}}}

It is necessary to AOT compile any namespaces which require flambo.api. You can AOT compile your application uberjar before running it in your Spark cluster. This can easily be accomplished by adding an :uberjar profile with {:aot :all} in it.
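
As a rough sketch, the :uberjar profile described above might look like the following project.clj fragment. Only the {:aot :all} setting comes from the text; merge it with whatever other profiles your project already defines.

```clojure
;; project.clj (fragment): AOT compile every namespace when building the uberjar
:profiles {:uberjar {:aot :all}}
```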

When working locally in a REPL, you'll want to AOT compile those namespaces as well. An easy way to do that is to add an :aot key to your :dev profile in your leiningen project.clj

:profiles {:dev
    {:aot [my.namespace my.other.namespace]}}

Flambo makes developing Spark applications quick and painless by utilizing the powerful abstractions available in Clojure. For instance, you can use the Clojure threading macro -> to chain sequences of operations and transformations.

The first step is to create a Spark configuration object, SparkConf, which contains information about your application. This is used to construct a SparkContext object which tells Spark how to access a cluster.

Here we create a SparkConf object with the string local to run in local mode:

(ns com.fire.kingdom.flambit
  (:require [flambo.conf :as conf]
            [flambo.api :as f]))

(def c (-> (conf/spark-conf)
           (conf/master "local")
           (conf/app-name "flame_princess")))

(def sc (f/spark-context c))

The master URL string parameter can be in one of the following formats:

Master URL          Meaning
spark://HOST:PORT   Connect to a standalone Spark cluster master.
mesos://HOST:PORT   Connect to a Mesos cluster.
local               Use one worker thread to run Spark locally (no parallelism).
local[N]            Use N worker threads to run Spark locally.
local[*]            Use the same number of threads as cores to run Spark locally. Only available for Spark 1.0.0+.

For running on YARN, see Spark's Running on YARN documentation for details.

Hard-coding the value of master and other configuration parameters can be avoided by passing the values to Spark when running spark-submit (Spark 1.0.0) or by allowing spark-submit to read these properties from a configuration file. See Standalone Applications for information on running flambo applications and see Spark's documentation for more details about configuring Spark properties.

The main abstraction Spark provides is a resilient distributed dataset, RDD, which is a fault-tolerant collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

Parallelized Collections

Parallelized collections (RDDs) in flambo are created by calling the parallelize function on your Clojure data structure:

(ns com.fire.kingdom.flambit
  (:require [flambo.api :as f]))

(def data (f/parallelize sc [["a" 1] ["b" 2] ["c" 3] ["d" 4] ["e" 5]]))

Once initialized, the distributed dataset or RDD can be operated on in parallel.

An important parameter for parallel collections is the number of slices to cut the dataset into. Spark runs one task for each slice of the dataset. Normally, Spark tries to set the number of slices automatically based on your cluster. However, you can also set it manually in flambo by passing it as a third parameter to parallelize:

(def data (f/parallelize sc [1 2 3 4 5] 4))

External Datasets

Spark can create RDDs from any storage source supported by Hadoop, including the local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

Text file RDDs can be created in flambo using the text-file function under the flambo.api namespace. This function takes a URI for the file (either a local path on the machine, or an hdfs://..., s3n://..., etc. URI) and reads it as a collection of lines. Note that text-file also supports S3 and HDFS globs.

(ns com.fire.kingdom.flambit
  (:require [flambo.api :as f]))

(def data (f/text-file sc "hdfs://hostname:<port>/home/user/data_archive/2013/12/23/*/*.bz2"))

RDDs support two types of operations:

  • transformations, which create a new dataset from an existing one
  • actions, which return a value to the driver program after running a computation on the dataset

To illustrate RDD basics in flambo, consider the following simple application using the sample data.txt file located at the root of the flambo repo.

(ns com.fire.kingdom.flambit
  (:require [flambo.api :as f]))

;; NOTE: we are using flambo.api/fn, not clojure.core/fn
(-> (f/text-file sc "data.txt")   ;; returns an unrealized lazy dataset
    (f/map (f/fn [s] (count s)))  ;; returns an RDD of the line lengths
    (f/reduce (f/fn [x y] (+ x y)))) ;; returns a value, should be 1406

The first line defines a base RDD from an external file. The dataset is not loaded into memory; it is merely a pointer to the file. The second line defines an RDD of the lengths of the lines as a result of the map transformation. Note, the lengths are not immediately computed due to laziness. Finally, we run reduce on the transformed RDD, which is an action, returning only a value to the driver program.

If we also wanted to reuse the resulting RDD of line lengths in later steps, we could insert:

(f/cache)

before the reduce action, which would cause the line-lengths RDD to be saved to memory after the first time it is realized. See RDD Persistence for more on persisting and caching RDDs in flambo.

Spark’s API relies heavily on passing functions in the driver program to run on the cluster. Flambo makes it easy and natural to define serializable Spark functions/operations and provides two ways to do this:

  • flambo.api/defsparkfn: defines named functions:

(ns com.fire.kingdom.flambit
  (:require [flambo.api :as f]))

(f/defsparkfn square [x] (* x x))

  • flambo.api/fn: defines inline anonymous functions:

(ns com.fire.kingdom.flambit
  (:require [flambo.api :as f]))

(-> (f/parallelize sc [1 2 3 4 5])
    (f/map (f/fn [x] (* x x))))

When we evaluate this map transformation on the initial RDD, the result is another RDD. The result of this transformation can be seen using the f/collect action to return all of the elements of the RDD.

(-> (f/parallelize sc [1 2 3 4 5])
    (f/map (f/fn [x] (* x x)))
    f/collect)
;; => [1 4 9 16 25]

We can also use f/first or f/take to return just a subset of the data.

(-> (f/parallelize sc [1 2 3 4 5])
    (f/map square)
    (f/take 2))
;; => [1 4]

While most Spark operations work on RDDs containing any type of objects, a few special operations are only available on RDDs of key-value pairs. The most common ones are distributed "shuffle" operations, such as grouping or aggregating the elements by a key.

In flambo, these operations are available on RDDs of (key, value) tuples. Flambo handles all of the transformations/serializations to/from Tuple, Tuple2, JavaRDD, JavaPairRDD, etc., so you only need to define the sequence of operations you'd like to perform on your data.

The following code generates pairs of word and count using ft/tuple. We can then use the reduce-by-key operation on the pairs to count how many times each word occurs in a file:

(ns com.fire.kingdom.flambit
  (:require [flambo.api :as f]
            [flambo.tuple :as ft]
            [clojure.string :as s]))

(-> (f/text-file sc "data.txt")
    (f/flat-map (f/iterator-fn [l] (s/split l #" ")))
    (f/map-to-pair (f/fn [w] (ft/tuple w 1)))
    (f/reduce-by-key (f/fn [x y] (+ x y))))

After the reduce-by-key operation, we can sort the pairs alphabetically using f/sort-by-key. To collect the word counts as an array of objects in the REPL or to write them to a filesystem, we can use the f/collect action:

(ns com.fire.kingdom.flambit
  (:require [flambo.api :as f]
            [flambo.tuple :as ft]
            [clojure.pprint]   ;; needed for clojure.pprint/pprint below
            [clojure.string :as s]))

(-> (f/text-file sc "data.txt")
    (f/flat-map (f/iterator-fn [l] (s/split l #" ")))
    (f/map-to-pair (f/fn [w] (ft/tuple w 1)))
    (f/reduce-by-key (f/fn [x y] (+ x y)))
    f/sort-by-key
    f/collect
    clojure.pprint/pprint)

Flambo supports the following RDD transformations:

  • map: returns a new RDD formed by passing each element of the source through a function.
  • map-to-pair: returns a new JavaPairRDD of (K, V) pairs by applying a function to all elements of an RDD.
  • reduce-by-key: when called on an RDD of (K, V) pairs, returns an RDD of (K, V) pairs where the values for each key are aggregated using a reduce function.
  • flat-map: similar to map, but each input item can be mapped to 0 or more output items (so the function should return a collection rather than a single item). NB: as of Spark 2.x, flat-map functions are expected to return a java.util.Iterator object. flambo.api/iterator-fn is provided so that you can continue to return a collection if desired.
  • filter: returns a new RDD containing only the elements of the source RDD that satisfy a predicate function.
  • join: when called on an RDD of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
  • left-outer-join: performs a left outer join of a pair of RDDs. For each element (K, V) in the first RDD, the resulting RDD will contain (as a Guava Optional) either all pairs (K, (V, W)) for W in the second RDD, or the pair (K, (V, nil)) if no elements in the second RDD have key K.
  • sample: returns a 'fraction' sample of an RDD, with or without replacement, using a random number generator 'seed'.
  • combine-by-key: combines the elements for each key using a custom set of aggregation functions. Turns an RDD of (K, V) pairs into a result of type (K, C), for a 'combined type' C. Note that V and C can be different -- for example, one might group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]). Users must provide three functions:
    • createCombiner, which turns a V into a C (e.g., creates a one-element list)
    • mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
    • mergeCombiners, to combine two C's into a single one.
  • sort-by-key: when called on an RDD of (K, V) pairs where K implements ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified by the optional boolean ascending argument.
  • coalesce: decreases the number of partitions in an RDD to 'n'. Useful for running operations more efficiently after filtering down a large dataset.
  • group-by: returns an RDD of items grouped by the return value of a function.
  • group-by-key: groups the values for each key in an RDD into a single sequence.
  • flat-map-to-pair: returns a new JavaPairRDD by first applying a function to all elements of the RDD, and then flattening the results. NB: as of Spark 2.x, flat-map-to-pair functions are expected to return a java.util.Iterator object. flambo.api/iterator-fn is provided so that you can continue to return a collection if desired.
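
To make the three combine-by-key functions more concrete, here is a plain-Clojure sketch that mimics the semantics on a single machine. It does not use Spark or flambo at all. The name combine-by-key-local and the representation of partitions as nested vectors are assumptions made purely for illustration.

```clojure
;; Hypothetical local model of combine-by-key (not part of flambo).
;; `partitions` is a seq of partitions, each a seq of [k v] pairs.
(defn combine-by-key-local
  [create-combiner merge-value merge-combiners partitions]
  (let [combine-partition
        (fn [pairs]
          (reduce (fn [acc [k v]]
                    (if (contains? acc k)
                      ;; key already seen in this partition: merge V into C
                      (update acc k merge-value v)
                      ;; first value for this key: turn V into a C
                      (assoc acc k (create-combiner v))))
                  {}
                  pairs))]
    ;; combine the per-partition results, merging two C's per shared key
    (reduce (partial merge-with merge-combiners)
            {}
            (map combine-partition partitions))))

;; Group values into vectors, as in the (Int, Int) -> (Int, List[Int]) example
(combine-by-key-local
 (fn [v] [v])   ; createCombiner: one-element vector
 conj           ; mergeValue: append a value to the vector
 into           ; mergeCombiners: concatenate two vectors
 [[["a" 1] ["b" 2]] [["a" 3]]])
;; => {"a" [1 3], "b" [2]}
```

In Spark the per-partition step and the merge step run on different executors, which is why the three functions are supplied separately.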

Flambo supports the following RDD actions:

  • reduce: aggregates the elements of an RDD using a function which takes two arguments and returns one. The function should be commutative and associative so that it can be computed correctly in parallel.
  • count-by-key: only available on RDDs of type (K, V). Returns a map of (K, Int) pairs with the count of each key.
  • foreach: applies a function to all elements of an RDD.
  • fold: aggregates the elements of each partition, and then the results for all the partitions using an associative function and a neutral 'zero value'.
  • first: returns the first element of an RDD.
  • count: returns the number of elements in an RDD.
  • collect: returns all the elements of an RDD as an array at the driver process.
  • distinct: returns a new RDD that contains the distinct elements of the source RDD.
  • take: returns an array with the first n elements of the RDD.
  • glom: returns an RDD created by coalescing all elements of the source RDD within each partition into a list.
  • cache: persists an RDD with the default storage level ('MEMORY_ONLY').
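
The requirement that fold's zero value be neutral can be seen in a small plain-Clojure model of the computation. This is a sketch only: fold-local is a hypothetical helper, not a flambo function, and partitions are modelled as nested vectors.

```clojure
;; Hypothetical local model of fold (not part of flambo).
;; Each partition is folded starting from `zero`, then the partition
;; results are folded together, again starting from `zero`.
(defn fold-local
  [zero f partitions]
  (reduce f zero (map #(reduce f zero %) partitions)))

(fold-local 0 + [[1 2] [3] [4 5]])
;; => 15

;; A non-neutral zero value is applied once per partition and once more
;; when merging, so the result depends on how the data is partitioned.
(fold-local 1 + [[1 2] [3] [4 5]])
;; => 19
```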

Flambo supports the following tuple functions:

  • key-val-fn: When dealing with functions that produce tuples, key-val-fn will destructure tuples into (K, V) and call the supplied function with those arguments.
  • key-val-val-fn: When dealing with tuples of the structure (K, Tuple2(V1, V2)), calls the supplied function with K, V1 and V2.

To see an example of these functions in use, check out the tf-idf example.

Spark provides the ability to persist (or cache) a dataset in memory across operations. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it. Caching is a key tool for iterative algorithms and fast interactive use. Like Spark, flambo provides the functions f/persist and f/cache to persist RDDs. f/persist sets the storage level of an RDD to persist its values across operations after the first time it is computed. Storage levels are available in the flambo.api/STORAGE-LEVELS map. This can only be used to assign a new storage level if the RDD does not have a storage level set already. cache is a convenience function for using the default storage level, 'MEMORY_ONLY'.

(ns com.fire.kingdom.flambit
  (:require [flambo.api :as f]))

(let [line-lengths (-> (f/text-file sc "data.txt")
                       (f/map (f/fn [s] (count s)))
                       f/cache)]
  (-> line-lengths
      (f/reduce (f/fn [x y] (+ x y)))))

To run your flambo application as a standalone application using the Spark API, you'll need to package your application in an uberjar using lein and execute it with:

  • SPARK_CLASSPATH, if running Spark 0.9.1
  • ./bin/spark-submit, if running Spark 1.0.0 or greater

$ lein uberjar
...

$ SPARK_CLASSPATH=uberjar.jar spark-class com.some.class.with.main --flag1 arg1 --flag2 arg2
...
<output>

$ spark-submit --class com.some.class.with.main uberjar.jar --flag1 arg1 --flag2 arg2
...
<output>

Flambo requires that Spark is configured to use Kryo for serialization. This is configured by default using system properties.

If you need to register custom serializers, extend flambo.kryo.BaseFlamboRegistrator and override its register method. Finally, configure your SparkContext to use your custom registrator by setting spark.kryo.registrator to your custom class.

There is a convenience macro for creating registrators, flambo.kryo.defregistrator. The namespace where a registrator is defined should be AOT compiled.

Here is an example (this won't work in your REPL):

(ns com.fire.kingdom.flambit
  (:require [flambo.conf :as conf]   ;; needed for conf/spark-conf and conf/set below
            [flambo.kryo :as kryo])
  (:import [flameprincess FlamePrincessHeat FlamePrincessHeatSerializer]))

(kryo/defregistrator flameprincess [this kryo]
  (.register kryo FlamePrincessHeat (FlamePrincessHeatSerializer.)))

(def c (-> (conf/spark-conf)
       (conf/set "spark.kryo.registrator" flameprincess)))

Thanks to The Climate Corporation and their open source project clj-spark which served as the starting point for this project.

Thanks to Ben Black for doing the work on the streaming api.

There is a #flambo channel available for support on the Clojurians Slack, as well as a flambo-users Google group.

YourKit

YourKit has generously supplied an open source license for their profiler to improve the performance of Flambo.

YourKit supports open source projects with its full-featured Java Profiler. YourKit, LLC is the creator of YourKit Java Profiler and YourKit .NET Profiler, innovative and intelligent tools for profiling Java and .NET applications.

Copyright © 2014,2015 Yieldbot, Inc.

Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.

flambo's People

Contributors

alexrobbins, anujsrc, ararog, arnaudsj, arrdem, b, chetmancini, davidwclin, dougselph, johnchapin, kul, lambda-knight, leon-barrett, mbaig, morganp4, mthomure, oubiwann, plandes, severeoverfl0w, slipset, sorenmacbeth, strongh, viksit, whoahbot, zero323

flambo's Issues

Test not passing

Hi,

I am starting to discover flambo, as I am interested in using it to run some NLP processing with Spark, using RDDs provided by Elasticsearch via elasticsearch-hadoop. However, I noticed that out of the box I am having issues running the basic midje tests. I get the following:

$ lein midje
2014-08-23 20:36:10.295 java[23244:1903] Unable to load realm info from SCDynamicStore

FAIL "about transformations - join returns an RDD of (K, (V, W)) pairs with all pairs of elements of each key when called on RDDs of type (K, V) and (K, W)" at (api_test.clj:103)
    Expected: [["key3" [[5] [33]]] ["key4" [[1] [44]]] ["key1" [[2] [22]]]]
      Actual: [["key1" [[2] [22]]] ["key3" [[5] [33]]] ["key4" [[1] [44]]]]

FAIL "about transformations - left-outer-join returns an RDD of (K, (V, W)) when called on RDDs of type (K, V) and (K, W)" at (api_test.clj:120)
    Expected: [["key3" [[5] [33]]] ["key4" [[1] [44]]] ["key5" [[2] nil]] ["key1" [[2] [22]]] ["key2" [[3] nil]]]
      Actual: [["key1" [[2] [22]]] ["key2" [[3] nil]] ["key3" [[5] [33]]] ["key4" [[1] [44]]] ["key5" [[2] nil]]]

FAIL "about transformations - sample returns a fraction of the RDD, with/without replacement,\n        using a given random number generator seed" at (api_test.clj:139)
    Expected: [1 4 7 11 14]
      Actual: [1 2 13]

FAIL "about transformations - group-by-key" at (api_test.clj:175)
    Expected: [["key3" [5]] ["key1" [1 2]] ["key2" [3 4]]]
      Actual: [["key1" [1 2]] ["key2" [3 4]] ["key3" [5]]]

FAIL "about actions - glom returns an RDD created by coalescing all elements within each partition into a list" at (api_test.clj:255)
    Expected: [[[1]] [[2]] [[3]] [[4] [5]]]
      Actual: [[] [[1]] [] [[2]] [[3]] [] [[4]] [[5]]]
FAILURE: 5 checks failed.  (But 31 succeeded.)
Subprocess failed

Any ideas? Is it happening for anybody else? I am using the latest Spark stable (1.0.2)

Thank you in advance!

Flambo on Mesos

Hi there,

Enjoying Flambo quite a bit.
I am trying to move things to Mesos and have the following code:

    (def conf-messos (->
                        (conf/spark-conf)
                        (conf/set "spark.mesos.executor.home" "../../Downloads/spark-1.4.0-bin-hadoop2.4")
                        (conf/set "spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                        (conf/master "mesos://172.16.2.8:5050")
                        (conf/app-name "mesos-test")))

      (f/with-context c conf-messos
       (-> (f/parallelize sc [1 2 3 4 5 6])
                (f/map (f/fn [x] (* 2 x)))
                f/collect
                vec))

And I keep running into the following:

15/07/03 19:09:19 WARN fn: caught exception: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (clojure.lang.DynamicClassLoader)
classLoader (akka.actor.ReflectiveDynamicAccess)
_pm (akka.actor.ActorSystemImpl)
actorSystem (org.apache.spark.SparkEnv)
env (org.apache.spark.api.java.JavaSparkContext)
15/07/03 19:09:19 WARN fn: unable to call serializable.fn$serialize@31f35c90 on org.apache.spark.api.java.JavaSparkContext@6adfdaa3 in {"c" #object[org.apache.spark.api.java.JavaSparkContext 0x6adfdaa3 "org.apache.spark.api.java.JavaSparkContext@6adfdaa3"]}

and then a bit later:

15/07/03 19:09:19 WARN TaskSetManager: Lost task 1.0 in stage 2.0 (TID 17, slave1): java.lang.IllegalStateException: unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2424)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Any idea where to start looking or debugging?

Exception: Unable to resolve symbol: `model` in this context

Hi guys,
we are having a strange problem here.

We are running spark-streaming to do real-time recommendations. So we train a model based on ALS with batch processes, then we export the model and during the streaming we just 'score' the items based on the trained model.
In order to do so we need to Broadcast the model to the workers, and in the mappers execute the recommendProducts.
However when we try to build a mapper function which closes over the broadcast model it seems to have some issues during the 'deserialization' of the locals.

We tried to reduce the code to the shortest possible example, so we built a similar streaming job which, rather than consuming from Kafka, just receives the stream from a socket port (similar to the hello-world Spark Streaming example). Here is the code sample:

(ns flambo-test.core
  (:require [flambo.conf :as conf])
  (:require [flambo.api :as f])
  (:require [flambo.broadcast :as b]
            [flambo.streaming :as fstream])
  (:import  [org.apache.spark.mllib.recommendation MatrixFactorizationModel]
            [org.apache.spark.api.java JavaSparkContext]
            [org.apache.spark.streaming.dstream ConstantInputDStream]
            [scala.reflect ClassTag$]
            )
  (:gen-class))


(defn spark-config []
  (-> (conf/spark-conf)
      (conf/master "local[*]")
      (conf/set "spark.broadcast.factory" "org.apache.spark.broadcast.HttpBroadcastFactory")
      (conf/app-name "Flambo Test")))


(defn context [config]
  (fstream/streaming-context config 2000))

;;
;; HERE loading the trained model
;; and broadcast it
;;
(defn load-model [context model-path]
  (let [sc (.sparkContext context)]
    (b/broadcast sc
                 (MatrixFactorizationModel/load
                  (JavaSparkContext/toSparkContext sc)
                  model-path))))


(defn to-long [^String s] (Long/parseLong s))

;;
;; HERE SEEMS TO BE THE PROBLEM
;; We are creating a function to map over the stream
;; which closes over the *model* for every 
;; incoming value we run `recommendProducts`
;; and then we will use the output to extract
;; recommended items. However when running this
;; we are getting the following exception
;; Exception: Unable to resolve symbol: `model` in this context
;;
(defn do-model-scoring
  [^org.apache.spark.broadcast.Broadcast model]
  (f/fn [x]
    (let [m (b/value model)]
      (.recommendProducts m (to-long x) 10))))


(defn input-stream [context host port]
  (.socketTextStream context host port))


(defn do-score-stream! [stream b-model]
  (-> stream
      ;; scoring the items 
      (fstream/map (do-model-scoring b-model) )
      ;; just printing the output for testing purposes
      (fstream/print)))

Now here is the interesting part:

  • When we run the example as follows, it seems to work:
  ;; using global vars works every time

  (def ssc (context (spark-config)))

  ;; `model` is defined globally
  (def model (load-model ssc "/tmp/model"))

  (do-score-stream!
   (input-stream ssc "localhost" 9999) model)

  (.start ssc)

  ;; stop it with:
  ;; (.stop ssc)
  • When the model is in a closure (wraps over a local value) it works on the first item but not on any subsequent item, and the Exception: Unable to resolve symbol: model in this context is thrown.
  ;; using a local `model` works only the first time

  (def ssc (context (spark-config)))

  ;; here `model` is a local binding.
  (let [model (load-model ssc "/tmp/model")]
    (do-score-stream!
     (input-stream ssc "localhost" 9999) model))

  (.start ssc)

  ;; stop it with:
  ;; (.stop ssc)

This is the exception we get from the second item onwards:

java.lang.RuntimeException: Unable to resolve symbol: model in this context, compiling:(NO_SOURCE_PATH:38:13)
    at clojure.lang.Compiler.analyze(Compiler.java:6543)
    at clojure.lang.Compiler.analyze(Compiler.java:6485)
    at clojure.lang.Compiler$InvokeExpr.parse(Compiler.java:3791)
    at clojure.lang.Compiler.analyzeSeq(Compiler.java:6725)
    at clojure.lang.Compiler.analyze(Compiler.java:6524)
    at clojure.lang.Compiler.access$300(Compiler.java:38)
    at clojure.lang.Compiler$LetExpr$Parser.parse(Compiler.java:6129)
    at clojure.lang.Compiler.analyzeSeq(Compiler.java:6723)
    at clojure.lang.Compiler.analyze(Compiler.java:6524)
    at clojure.lang.Compiler.analyzeSeq(Compiler.java:6711)
    at clojure.lang.Compiler.analyze(Compiler.java:6524)
    at clojure.lang.Compiler.analyze(Compiler.java:6485)
    at clojure.lang.Compiler$BodyExpr$Parser.parse(Compiler.java:5861)
    at clojure.lang.Compiler$FnMethod.parse(Compiler.java:5296)
    at clojure.lang.Compiler$FnExpr.parse(Compiler.java:3925)
    at clojure.lang.Compiler.analyzeSeq(Compiler.java:6721)
    at clojure.lang.Compiler.analyze(Compiler.java:6524)
    at clojure.lang.Compiler.analyzeSeq(Compiler.java:6711)
    at clojure.lang.Compiler.analyze(Compiler.java:6524)
    at clojure.lang.Compiler.analyze(Compiler.java:6485)
    at clojure.lang.Compiler$InvokeExpr.parse(Compiler.java:3791)
    at clojure.lang.Compiler.analyzeSeq(Compiler.java:6725)
    at clojure.lang.Compiler.analyze(Compiler.java:6524)
    at clojure.lang.Compiler.analyzeSeq(Compiler.java:6711)
    at clojure.lang.Compiler.analyze(Compiler.java:6524)
    at clojure.lang.Compiler.analyze(Compiler.java:6485)
    at clojure.lang.Compiler$BodyExpr$Parser.parse(Compiler.java:5861)
    at clojure.lang.Compiler$LetExpr$Parser.parse(Compiler.java:6179)
    at clojure.lang.Compiler.analyzeSeq(Compiler.java:6723)
    at clojure.lang.Compiler.analyze(Compiler.java:6524)
    at clojure.lang.Compiler.analyze(Compiler.java:6485)
    at clojure.lang.Compiler$BodyExpr$Parser.parse(Compiler.java:5861)
    at clojure.lang.Compiler$FnMethod.parse(Compiler.java:5296)
    at clojure.lang.Compiler$FnExpr.parse(Compiler.java:3925)
    at clojure.lang.Compiler.analyzeSeq(Compiler.java:6721)
    at clojure.lang.Compiler.analyze(Compiler.java:6524)
    at clojure.lang.Compiler.eval(Compiler.java:6779)
    at clojure.lang.Compiler.eval(Compiler.java:6745)
    at clojure.core$eval.invoke(core.clj:3081)
    at serializable.fn$fn__362$fn__370.invoke(fn.clj:204)
    at serializable.fn$fn__362.invoke(fn.clj:203)
    at clojure.lang.MultiFn.invoke(MultiFn.java:233)
    at serializable.fn$deserialize.invoke(fn.clj:165)
    at clojure.lang.AFn.applyToHelper(AFn.java:154)
    at clojure.lang.AFn.applyTo(AFn.java:144)
    at clojure.core$apply.invoke(core.clj:630)
    at clojure.core.memoize$through_STAR_$fn__813.invoke(memoize.clj:66)
    at clojure.core.cache$through$fn__619.invoke(cache.clj:55)
    at clojure.core.memoize$through_STAR_$fn__809$fn__810.invoke(memoize.clj:65)
    at clojure.core.memoize$d_lay$reify__804.deref(memoize.clj:54)
    at clojure.core$deref.invoke(core.clj:2206)
    at clojure.core.memoize$build_memoizer$fn__864.doInvoke(memoize.clj:152)
    at clojure.lang.RestFn.applyTo(RestFn.java:137)
    at clojure.lang.AFunction$1.doInvoke(AFunction.java:29)
    at clojure.lang.RestFn.invoke(RestFn.java:408)
    at flambo.function$Function_call$fn__985.invoke(function.clj:83)
    at flambo.function$Function_call.doInvoke(function.clj:82)
    at clojure.lang.RestFn.invoke(RestFn.java:423)
    at flambo.function.Function.call(Unknown Source)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1027)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1298)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1298)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Unable to resolve symbol: model in this context
    at clojure.lang.Util.runtimeException(Util.java:221)
    at clojure.lang.Compiler.resolveIn(Compiler.java:7019)
    at clojure.lang.Compiler.resolve(Compiler.java:6963)
    at clojure.lang.Compiler.analyzeSymbol(Compiler.java:6924)
    at clojure.lang.Compiler.analyze(Compiler.java:6506)
    ... 82 more
  • Finally, if I declare model as a global var, it works again:
  ;; hack to make it work

  ;; define a global `model'
  (def model nil)
  (def ssc (context (spark-config)))

  (let [m (load-model ssc "/tmp/model")]
    ;; update the global var `model'
    (alter-var-root #'model (constantly m))
    (do-score-stream!
     ;; use the global `model' rather than the local
     (input-stream ssc "localhost" 9999) model))

  (.start ssc)

  ;; stop it with:
  ;; (.stop ssc)

We are using Flambo 0.6.0 with Spark 1.5.0. We tried Spark 1.4.1 with the same result.
Can anyone help us to fix this issue?

Using sparksql rows in flambo

Is a Spark SQL row in flambo a map? I'm writing a function that parses a log line using a regex and creates a row from the match groups. These rows will be used in a set of RDDs for further processing.

Any hints?

Add support for RDD/min and max

It would be nice to have support for RDD min and max. I've been converting a PySpark script to Clojure/flambo code, but I got stuck on these missing actions.
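
Until dedicated actions exist, a minimum or maximum can be phrased as a reduction with a two-argument function. The sketch below is plain Clojure: it uses `clojure.core/reduce` on a vector as a stand-in for an RDD, and the helper names `min-of` and `max-of` are made up for illustration. Whether the same functions can be handed to a flambo reduce unchanged is an assumption that has not been checked here.

```clojure
;; Hypothetical helpers: pick the smaller / larger of two comparable values.
(defn min-of [a b] (if (pos? (compare a b)) b a))
(defn max-of [a b] (if (neg? (compare a b)) b a))

;; A vector stands in for the RDD; clojure.core/reduce stands in for the action.
(def sample [3 9 -2 7])

(def smallest (reduce min-of sample))
(def largest  (reduce max-of sample))
```

Because both helpers are associative and commutative, they are the kind of function a distributed reduce generally expects.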

No matching method found for join

Walking through the tutorial, I get:

user=> (def tfidf-by-term (-> (f/join tf-by-doc idf-by-term)
  #_=>                        (f/map (f/fn [[term [[doc-id tf] idf]]]
  #_=>                                 [doc-id term (* tf idf)]))
  #_=>                        f/cache))

CompilerException java.lang.IllegalArgumentException: No matching method found: join for class org.apache.spark.api.java.JavaRDD, compiling:(form-init8987457259874014740.clj:1:24)

at the join step, the one with the heading TF-IDF.

I would love to see a version of the tutorial that users could walk through step by step. If it is any help, I've started modification of the existing tutorial (https://github.com/senor-hadoop/flambo/blob/develop/Tutorial.md) to pass along to others on my team.

Using Flambo as a checkout dependency

TL;DR I'm working on a project that uses Flambo. As I work, I would like to be able to add new features to Flambo as I need them. They could then eventually be submitted back to Flambo.

I am trying to use checkout dependencies in Leiningen but have so far been foiled because of the slash (/) in the project's name (checkout dependencies require you to make a symlink to a checkout of the repo)

More info here: http://stackoverflow.com/questions/26707540/leiningen-checkouts-when-library-has-a-slash-in-its-name

It's possible I'm doing it wrong, in which case I would really appreciate it if you could show me the right way.

How to define custom keys and Partitioners?

I have a scala program in which I define custom keys and create custom partitioners. How can I do this using flambo?

Specifically I want the following code to be transformed into clojure using flambo:

case class RFMCKey(cId: String, R: Double, F: Double, M: Double, C: Double)
class RFMCPartitioner(partitions: Int) extends Partitioner {
 require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
 override def numPartitions: Int = partitions
 override def getPartition(key: Any): Int = {
 val k = key.asInstanceOf[RFMCKey]
 k.cId.hashCode() % numPartitions
 }
}
object RFMCKey {
 implicit def orderingByIdAirportIdDelay[A <: RFMCKey] : Ordering[A] = {
    Ordering.by(k => (k.R, k.F * -1, k.M * -1, k.C * -1))
 }
}

How can I do this?

Unable to execute Spark jobs on a cluster

Executing a Spark job on a cluster throws errors like this.

java.io.IOException: unexpected exception type
    at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)
Caused by: org.apache.spark.SparkException: Failed to invoke flambo.kryo.BaseFlamboRegistrator
    at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:95)
    at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:84)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:84)
    at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:148)
    at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:110)
    at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:234)
    at org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:169)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    ... 14 more
Caused by: java.lang.ClassNotFoundException: flambo.kryo.BaseFlamboRegistrator
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:270)
    at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:87)
    ... 26 more

The same job executes successfully when only one node is used, pointing strongly at the use of Kryo serialization for internode communication.

This behavior seems to hold on Spark 1.0.0, 1.0.2, and 1.1.0 (RC not final).

From my research, this appears to be related to these discussions:
http://apache-spark-developers-list.1001551.n3.nabble.com/bug-using-kryo-as-closure-serializer-td6473.html
https://issues.apache.org/jira/browse/SPARK-2878
https://github.com/GrahamDennis/spark-kryo-serialisation

These links imply that this should be fixed in 1.1.0, but my testing does not agree with that (using the RC, not the final 1.1.0).

Running on mesos with lein run

I've been trying to run the tfidf example on mesos and have been running into issues when doing so through lein, throwing this error:

Exception in thread "Thread-29" java.lang.NoSuchMethodError: org.apache.mesos.Protos$ExecutorInfo$Builder.setData(Lorg/apache/mesos/protobuf/ByteString;)Lorg/apache/mesos/Protos$ExecutorInfo$Builder;
    at org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackend.createExecutorInfo(MesosSchedulerBackend.scala:130)
    at org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackend.createMesosTask(MesosSchedulerBackend.scala:270)
    at org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackend$$anonfun$resourceOffers$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$1.apply(MesosSchedulerBackend.scala:232)
    at org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackend$$anonfun$resourceOffers$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$1.apply(MesosSchedulerBackend.scala:226)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackend$$anonfun$resourceOffers$1$$anonfun$apply$mcV$sp$2.apply(MesosSchedulerBackend.scala:226)
    at org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackend$$anonfun$resourceOffers$1$$anonfun$apply$mcV$sp$2.apply(MesosSchedulerBackend.scala:225)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackend$$anonfun$resourceOffers$1.apply$mcV$sp(MesosSchedulerBackend.scala:225)
    at org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackend.inClassLoader(MesosSchedulerBackend.scala:172)
    at org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackend.resourceOffers(MesosSchedulerBackend.scala:188)

For context, it works when running the example exactly as is with master set to local[*], and also when I send the uberjar to mesos using spark-submit. The only change from the example code is changing master to point to the mesos master, and adding a spark.mesos.executor.home to the conf.

Any ideas what might be going wrong?

No matching method found: reduceByKey

Code that was working previously has stopped working with 0.5.0-SNAPSHOT.

(f/reduce-by-key
  (f/map collection (f/fn [[k _]] [k 1]))
  (f/fn [a b] (+ a b)))

Exception in thread "main" java.lang.IllegalArgumentException: No matching method found: reduceByKey for class org.apache.spark.api.java.JavaRDD
  at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:53)
  at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28)
  at flambo.api$reduce_by_key.invoke(api.clj:249)

Versions:

[yieldbot/flambo "0.5.0-SNAPSHOT"]
[org.apache.spark/spark-core_2.10 "1.2.1"]

Any ideas?

Problem running on spark 1.1.0 with 0.4.0-SNAPSHOT

First of all, great job so far on Flambo. It's been a pleasure learning and working with it. I'm trying to run a job on a fresh Spark 1.1.0 cluster. I'm using the spark-ec2 script that is bundled with spark-1.1.0-bin-hadoop, but I am getting an exception:

Exception in thread "main" java.lang.IncompatibleClassChangeError: Found class org.apache.spark.serializer.Serializer, but interface was expected

my project.clj and the code I'm trying to run is posted here:

https://www.refheap.com/e9d5f99a8848f1969bbc1bab6

the error/stack trace I'm getting is posted here with most of the noise cut out:

https://www.refheap.com/d6d9b9ee6d55aabbb6ffe4bbe

If I take out the f/map, f/flat-map, etc. and just run text-file-with-splits and f/save-as-text-file, everything is fine.

Basic Installation Broken

Gents,

Created a new project with lein, updated project.clj to include:

(defproject learn-clj-spark "0.1.0-SNAPSHOT"
:description "Learn Spark with Clojure"
:url "http://example.com/FIXME"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.6.0"]
[yieldbot/flambo "0.3.2"]]
:main ^:skip-aot learn-clj-spark.core
:target-path "target/%s"
:profiles {:uberjar {:aot :all}})

Started a REPL (lein repl):

learn-clj-spark.core=> (ns com.fire.kingdom.flambit
                  #_=>   (:require [flambo.conf :as conf])
                  #_=>   (:require [flambo.api :as f]))

ClassNotFoundException org.apache.spark.SparkConf java.net.URLClassLoader$1.run (URLClassLoader.java:372)

Tutorial also broken:

user=> (require '[flambo.api :as f])

FileNotFoundException Could not locate flambo/api__init.class or flambo/api.clj on classpath: clojure.lang.RT.load (RT.java:443)
user=> Bye for now!

It would be great if we could get a step-by-step working version of flambo for those that want to learn clojure and spark together. The combination seems a natural fit.

Help to adding missing APIs

Hi there - great lib thanks. (Would it be better to do it with introspection like amazonica?)

Anyway, I need to add some missing APIs (hadoopFile) but I'm having trouble understanding how it works (yep, I'm new to clojure).

When I do:

(-> sc
    (.hadoopFile "/tmp/mmem/*.csv" org.apache.hadoop.mapred.TextInputFormat org.apache.hadoop.io.LongWritable org.apache.hadoop.io.Text)
    (f/collect))

the result is a vector of Tuple2s rather than something I can de-structure.

As a reference point, the definition of flambo.api/text-file doesn't really give me any clues about why this won't work.

Can I trade some guidance for a pull request?

Many thanks!
Alister.

Topic map in flambo.streaming/kafka-stream

I did not quite understand the way topics are passed to the function flambo.streaming/kafka-stream.
If I pass a topic map like {"test" 1}, what does the 1 stand for?

Flambo Stand-Alone Applications Broken

Flambo applications run as stand-alone applications, whether on YARN (--master yarn-cluster, --master yarn-client), on a stand-alone cluster (--master spark://...) or locally (--master local[*]), all fail with nearly the same stack trace, found below. Essentially, nothing with 'spark-submit' works. For those wanting to try YARN mode, an out-of-the-box YARN cluster can be found within the Hortonworks sandbox (a virtual machine): http://hortonworks.com/products/hortonworks-sandbox/

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/hadoop/yarn/local/usercache/snunez/filecache/10/spark-assembly-1.2.1.2.2.4.2-2-hadoop2.6.0.2.2.4.2-2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/hdp/2.2.4.2-2/hadoop/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/07/25 03:05:38 INFO yarn.ApplicationMaster: Registered signal handlers for [TERM, HUP, INT]
15/07/25 03:05:39 INFO yarn.ApplicationMaster: ApplicationAttemptId: appattempt_1437781501203_0001_000001
15/07/25 03:05:40 INFO spark.SecurityManager: Changing view acls to: yarn,snunez
15/07/25 03:05:40 INFO spark.SecurityManager: Changing modify acls to: yarn,snunez
15/07/25 03:05:40 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, snunez); users with modify permissions: Set(yarn, snunez)
15/07/25 03:05:40 INFO yarn.ApplicationMaster: Starting the user JAR in a separate Thread
15/07/25 03:05:40 INFO yarn.ApplicationMaster: Waiting for spark context initialization
15/07/25 03:05:40 INFO yarn.ApplicationMaster: Waiting for spark context initialization ... 0
Exception in thread "Driver" java.lang.ExceptionInInitializerError
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:278)
at clojure.lang.RT.loadClassForName(RT.java:2093)
at clojure.lang.RT.load(RT.java:430)
at clojure.lang.RT.load(RT.java:411)
at clojure.core$load$fn__5066.invoke(core.clj:5641)
at clojure.core$load.doInvoke(core.clj:5640)
at clojure.lang.RestFn.invoke(RestFn.java:408)
at clojure.core$load_one.invoke(core.clj:5446)
at clojure.core$load_lib$fn__5015.invoke(core.clj:5486)
at clojure.core$load_lib.doInvoke(core.clj:5485)
at clojure.lang.RestFn.applyTo(RestFn.java:142)
at clojure.core$apply.invoke(core.clj:626)
at clojure.core$load_libs.doInvoke(core.clj:5524)
at clojure.lang.RestFn.applyTo(RestFn.java:137)
at clojure.core$apply.invoke(core.clj:626)
at clojure.core$require.doInvoke(core.clj:5607)
at clojure.lang.RestFn.invoke(RestFn.java:551)
at flambo.api$loading__4958__auto__.invoke(api.clj:12)
at flambo.api__init.load(Unknown Source)
at flambo.api__init.<clinit>(Unknown Source)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:278)
at clojure.lang.RT.loadClassForName(RT.java:2093)
at clojure.lang.RT.load(RT.java:430)
at clojure.lang.RT.load(RT.java:411)
at clojure.core$load$fn__5066.invoke(core.clj:5641)
at clojure.core$load.doInvoke(core.clj:5640)
at clojure.lang.RestFn.invoke(RestFn.java:408)
at clojure.core$load_one.invoke(core.clj:5446)
at clojure.core$load_lib$fn__5015.invoke(core.clj:5486)
at clojure.core$load_lib.doInvoke(core.clj:5485)
at clojure.lang.RestFn.applyTo(RestFn.java:142)
at clojure.core$apply.invoke(core.clj:626)
at clojure.core$load_libs.doInvoke(core.clj:5524)
at clojure.lang.RestFn.applyTo(RestFn.java:137)
at clojure.core$apply.invoke(core.clj:626)
at clojure.core$require.doInvoke(core.clj:5607)
at clojure.lang.RestFn.invoke(RestFn.java:512)
at ict.core$loading__4958__auto__.invoke(core.clj:1)
at ict.core__init.load(Unknown Source)
at ict.core__init.<clinit>(Unknown Source)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:278)
at clojure.lang.RT.loadClassForName(RT.java:2093)
at clojure.lang.RT.load(RT.java:430)
at clojure.lang.RT.load(RT.java:411)
at clojure.core$load$fn__5066.invoke(core.clj:5641)
at clojure.core$load.doInvoke(core.clj:5640)
at clojure.lang.RestFn.invoke(RestFn.java:408)
at clojure.lang.Var.invoke(Var.java:379)
at ict.core.<clinit>(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:445)
Caused by: java.lang.ClassNotFoundException: flambo.WriterOutputStream
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:195)
at flambo.utils$loading__4958__auto__.invoke(utils.clj:1)
at flambo.utils__init.load(Unknown Source)
at flambo.utils__init.<clinit>(Unknown Source)
... 57 more

Clojure 1.5.1 Compatible

I am not sure if this is due to my config, but I get the following error when forms containing the flambo API (mainly f/fn) are AOT compiled with Clojure 1.5.1. Running lein check is enough to reproduce this in my case.

Caused by: java.lang.NoSuchMethodError: clojure.lang.Reflector.invokeNoArgInstanceMember(Ljava/lang/Object;Ljava/lang/String;Z)Ljava/lang/Object;
        at serializable.fn$save_env$iter__205__209$fn__210.invoke(fn.clj:14)
        at clojure.lang.LazySeq.sval(LazySeq.java:42)
        at clojure.lang.LazySeq.seq(LazySeq.java:60)
        at clojure.lang.RT.seq(RT.java:484)
        at clojure.lang.LazilyPersistentVector.create(LazilyPersistentVector.java:31)
        at clojure.core$vec.invoke(core.clj:354)
        at serializable.fn$save_env.invoke(fn.clj:15)
        at serializable.fn$fn.doInvoke(fn.clj:22)

I think it has something to do with http://dev.clojure.org/jira/browse/CLJ-1363. Could you please check this?

Clojure 1.5.1 runtime error

This is related to #9. The error does not show at compile time, but the same error pops up at runtime when an f/fn is involved, e.g.:

  (def sc (f/spark-context
            (-> (conf/spark-conf)
                (conf/master "local")
                (conf/app-name "foo"))))

  (-> (f/parallelize sc [1 2 3 4 5])
      (f/map (f/fn [x] (inc x)))
      (f/first))

Examples using Spark Streaming?

Hi, I am looking for an example project using Flambo that demonstrates how to create both a streaming producer and a consumer for demo purposes.

Ultimately I would like to write a Twitter stream producer, and a NLP consumer.

Any pointers?

Thank you in advance!

Streaming reduce by key doesn't work

In the reduce by key function for dstreams,

(defn reduce-by-key [dstream f]
    (-> dstream
      (.map (pair-function identity))
      (.reduceByKey (function2 f))
      (.map (function f/untuple))))

I get an error saying

Exception in thread "main" java.lang.ClassCastException: Cannot cast flambo.function.PairFunction to org.apache.spark.api.java.function.Function

I'm not entirely sure what's going on here, but perhaps someone can shed some light?

@b - I presume this is something you wrote, so mentioning you here.

Thanks!

JavaDoubleRDD Strangeness

Hi All,

I've run across a very odd situation where I encounter an arity exception when calling a method on an RDD created by flambo in certain circumstances. Here's a REPL extract:

project.core> (.mean d)
SparkException Job aborted due to stage failure: Task 7 in stage 733.0 failed 1 times, most recent failure: Lost task 7.0 in stage 733.0 (TID 3642, localhost): clojure.lang.ArityException: Wrong number of args (1) passed to: core/eval7755/fn--7756

project.core> (class d)
org.apache.spark.api.java.JavaDoubleRD

project.core> (.mean u)
-0.0013009480872174827

project.core> (class u)
org.apache.spark.api.java.JavaDoubleRDD

Any ideas on why one instance of the type would throw an arity exception but not another?

I've created a small example that demonstrates this problem at the REPL.

[Query] Buffer Underflow in Kryo

Hello @sorenmacbeth ,

I recently saw some Kryo buffer underflow errors in some of my jobs on Spark 1.2.0, using a slightly older build of my fork that did not include your recent chill upgrade commits. They disappeared with 0.5.0-SNAPSHOT. Did you also get this error? I was wondering what could cause it.

Please close this anytime, as it's not really an issue, just a question.

Thanks

Accessing yarn-site.xml

Hi.

I have my yarn-site.xml set via an environment variable, but flambo doesn't seem to pick it up, so it's not connecting properly to my ResourceManager:

14/10/15 09:46:56 INFO Remoting: Starting remoting
14/10/15 09:46:56 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:52256]
14/10/15 09:46:56 INFO Remoting: Remoting now listens on addresses: [akka.tcp://[email protected]:52256]
14/10/15 09:46:56 INFO Utils: Successfully started service 'sparkDriver' on port 52256.
14/10/15 09:46:56 INFO SparkEnv: Registering MapOutputTracker
14/10/15 09:46:56 INFO SparkEnv: Registering BlockManagerMaster
14/10/15 09:46:56 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20141015094656-3d3e
14/10/15 09:46:57 INFO Utils: Successfully started service 'Connection manager for block manager' on port 50860.
14/10/15 09:46:57 INFO ConnectionManager: Bound socket to port 50860 with id = ConnectionManagerId(master.tyme-data.com,50860)
14/10/15 09:46:57 INFO MemoryStore: MemoryStore started with capacity 943.6 MB
14/10/15 09:46:57 INFO BlockManagerMaster: Trying to register BlockManager
14/10/15 09:46:57 INFO BlockManagerMasterActor: Registering block manager master.tyme-data.com:50860 with 943.6 MB RAM
14/10/15 09:46:57 INFO BlockManagerMaster: Registered BlockManager
14/10/15 09:46:57 INFO HttpFileServer: HTTP File server directory is /tmp/spark-ab516f61-8eb7-4ab2-a523-a767f4571583
14/10/15 09:46:57 INFO HttpServer: Starting HTTP Server
14/10/15 09:46:57 INFO Utils: Successfully started service 'HTTP file server' on port 39485.
14/10/15 09:46:57 INFO Utils: Successfully started service 'SparkUI' on port 4040.
14/10/15 09:46:57 INFO SparkUI: Started SparkUI at http://master.tyme-data.com:4040
--args is deprecated. Use --arg instead.
14/10/15 09:46:58 INFO RMProxy: Connecting to ResourceManager at /0.0.0.0:8032

How can I get flambo to access this correctly? Running a regular spark-shell session works perfectly.

Update examples

I wanted to try out flambo but it appears that some of the examples in the README need to be updated. For example,

(:require [flambo.api :as f])

gives an error since f is no longer defined in version 0.4.0.

Transducers

Would be really awesome if the project was transducer compatible.

I know that it would be some extra work, but I think it might be possible.

Any thoughts?

cache question

Hello Soren/Yieldbot/Committers,

I have a question about how I'm defining and using an RDD that is cached, and if I'm doing it correctly.... I basically have a large data file stored in S3 and I'm defining it in a let binding as such:

(let [infile (->
(util/infile-data sc (options :infile))
(f/persist (f/STORAGE-LEVELS :memory-only)))

...... more code.....

])

then I have a doseq (that is within the let scope) where I iterate over a number of files stored in S3. Essentially I join the segment data against the data defined in the infile which has been cached... something like this:

(doseq [segment segments-to-process]
  (-> infile
      (f/join (util/get-segment-data sc (segment :s3-key)))
      (do more processing)))

I've cut out a lot of the code, but what I'm wondering is whether, on subsequent executions of the threading macro within the doseq, I am using the cached version of the infile. The first iteration takes a while to read the initial infile data from S3. On subsequent iterations, Spark seems to fly through what I believe is the cached version of the infile. It seems like a cached version is being used, but I don't know how to tell for certain.

.....
.....
14/10/12 22:22:54 INFO scheduler.TaskSetManager: Finished task 431.0 in stage 37.0 (TID 11530) in 1195 ms on ip-10-244-84-47.us-west-2.compute.internal (447/450)
14/10/12 22:22:54 INFO scheduler.TaskSetManager: Finished task 358.0 in stage 37.0 (TID 11524) in 1230 ms on ip-10-244-84-47.us-west-2.compute.internal (448/450)
14/10/12 22:22:55 INFO scheduler.TaskSetManager: Finished task 356.0 in stage 37.0 (TID 11522) in 1366 ms on ip-10-244-84-47.us-west-2.compute.internal (449/450)
14/10/12 22:22:55 INFO scheduler.TaskSetManager: Finished task 432.0 in stage 37.0 (TID 11531) in 1324 ms on ip-10-244-84-47.us-west-2.compute.internal (450/450)

Any guidance/pointers would be appreciated. BTW, this has been a really fun learning opportunity, nice work!

Regards,

Dano

Exception: namespace 'flambo.function' not found

Getting this error message requiring flambo.api:

clojure.lang.Compiler$CompilerException: java.lang.Exception: namespace 'flambo.function' not found

It seems to be resolved if I aot it:

:aot [flambo.function]

in the project.clj

Any thoughts?

defsparkfn does not support docstrings

I would have expected this to work:

(defsparkfn my-fn
  "Eats data and does cool stuff"
  [input]
  (...))

But I received an error that the first argument should be a vector.
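
For illustration, a definition macro can accept an optional docstring by checking whether the first form after the name is a string. The sketch below is not flambo's `defsparkfn`: the macro name `defn-with-doc` is hypothetical, and a plain `fn` stands in for whatever serializable function the real macro produces.

```clojure
;; Hypothetical macro, not flambo's defsparkfn: accepts an optional docstring
;; before the argument vector. Plain `fn` stands in for a serializable fn.
(defmacro defn-with-doc
  [fn-name & more]
  (let [doc?          (string? (first more))
        docstring     (when doc? (first more))
        [args & body] (if doc? (rest more) more)
        ;; attach the docstring as :doc metadata so `def` puts it on the var
        fn-name       (if doc? (vary-meta fn-name assoc :doc docstring) fn-name)]
    `(def ~fn-name (fn ~args ~@body))))

(defn-with-doc my-fn
  "Eats data and does cool stuff"
  [input]
  (inc input))

;; The docstring stays optional.
(defn-with-doc no-doc-fn [x] (* 2 x))
```

With this shape, `(doc my-fn)` at the REPL would show the docstring, while definitions without one behave as before.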

How to create a comparator from api/fn macro?

I've been trying to create a comparator using the f/fn macro and pass it to take-ordered to help me define a custom sorting approach.

(defn take-ordered
  "Return an array with the first n elements of `rdd`.
  (Note: this is currently not executed in parallel. Instead, the driver
  program computes all the elements)."
  [rdd cnt compare-fn]
  (.takeOrdered rdd cnt 
    (if (instance? Comparator compare-fn)
      compare-fn
      (comparator compare-fn)))) 

Here's what I have so far for take-ordered. I can pass compare to it, but I can't do something like
(f/fn [v] (* -1 (get v 1))), where v is a two-element vector such as [1 2] that is itself an element of another vector like [[1 2] [3 4] [5 6]].
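
One way to read the problem: the function above takes one argument and returns a sort key, whereas a comparator takes two elements. A minimal local sketch of bridging the two follows. The helper `key-comparator` is a made-up name, `clojure.core/sort` on a vector stands in for the RDD action, and whether such a comparator can be shipped to Spark workers is not addressed here.

```clojure
(import 'java.util.Comparator)

;; Hypothetical helper: wrap a one-argument key function in a Comparator
;; that orders elements by the value the key function returns.
(defn key-comparator [key-fn]
  (reify Comparator
    (compare [_ a b]
      ;; inside the method body, `compare` is clojure.core/compare
      (compare (key-fn a) (key-fn b)))))

;; Order pairs by their second element, largest first.
(def by-second-desc (key-comparator (fn [v] (* -1 (get v 1)))))

;; clojure.core/sort on a vector stands in for takeOrdered on an RDD.
(def top-two (take 2 (sort by-second-desc [[1 2] [3 4] [5 6]])))
```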

Can't map in saveAsTextFile

Previously this worked:

(.saveAsTextFile (f/map my-rdd pr-str) "filename")

I just upgraded to the latest and now I get the error:

WARN fn: unable to call serializable.fn$serialize@6fc0bbc6
  on org.apache.spark.api.java.JavaSparkContext@29fe4840
  in {"output-location" "/Users/joe/sc/laskuri/example/hundred/output", "tasks" #{:doi-domain-periods-count :top-domains}, "doi-period-date" #<JavaPairRDD org.apache.spark.api.java.JavaPairRDD@25a1a012>, "doi-domain-period-date" #<JavaPairRDD org.apache.spark.api.java.JavaPairRDD@3c19592c>, "input-location" "/Users/joe/sc/laskuri/example/hundred/input/hundred.txt", "parsed-lines" #<JavaRDD MapPartitionsRDD[4]
  at filter at NativeMethodAccessorImpl.java:-2>, "period" :month, "ctx" #<JavaSparkContext org.apache.spark.api.java.JavaSparkContext@29fe4840>, "redact" false, "parsed-lines-period" #<JavaRDD MapPartitionsRDD[9]
  at map at NativeMethodAccessorImpl.java:-2>}

Should I expect this to continue to work? Is there another way to save lines of EDN serialised data in HDFS?

(Thanks for all your hard work!)

Attempting to call unbound fn: #'flambo.function/pair-flat-map-function

I'm running through the tutorial.

I am receiving this exception:

 java.lang.IllegalStateException: Attempting to call unbound fn: #'flambo.function/pair-flat-map-function
clojure.lang.Compiler$CompilerException: java.lang.IllegalStateException: Attempting to call unbound fn: #'flambo.function/pair-flat-map-function, compiling:(form-init4518676517918793357.clj:2:30)

when executing the flat-map-to-pair step

user=> (def doc-term-seq (-> doc-data
                             (f/flat-map-to-pair (ft/key-val-fn gen-docid-term-tuples))
                             f/cache))

Please help ASAP. I'm stuck :(

Dataframes lacks CSV Reader

I've got a very large, compressed CSV file that I'd like to read into a data frame for manipulation before feeding it into MLlib. Databricks has a Spark CSV reader here: https://github.com/databricks/spark-csv

Anyone want to see this added to sql.clj? Any better suggestions? Any objections to using the Databricks library in flambo? Happy to add this and give sql.clj a thorough thrashing before submitting a PR.

MLlib API

Flambo lacks an API for MLlib.

I recall bringing this up before (I think) and was told that the API exists, but was not released. I would rather avoid reinventing the wheel, so if it does exist would very much appreciate its addition to Flambo.

Parameter order in flambo functions

Hey Soren,

I was wondering what the reason is for having the rdd/collection as the first parameter of many flambo functions. It differs from typical Clojure functions that operate on collections (compare map vs f/map). This sometimes causes problems when using the functions with threading macros, like this:

(-> (f/parallelize sc [1 2 34 5])
    (f/map (f/fn [x] {:key x}))
    f/collect
    (map :key))

Is there something behind this decision I missed?

Regards. Great job on Flambo.
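
For reference, two common ways to mix collection-first and collection-last steps in one pipeline are nesting `->>` inside `->` and using `as->`. The sketch below is plain Clojure; `coll-first-map` is a hypothetical stand-in for a collection-first function such as f/map, and a vector stands in for the RDD.

```clojure
;; Hypothetical stand-in for a collection-first function such as f/map.
(defn coll-first-map [coll f] (map f coll))

;; Option 1: nest ->> inside -> for the collection-last step.
(def keys-nested
  (-> [1 2 34 5]
      (coll-first-map (fn [x] {:key x}))
      (->> (map :key))))

;; Option 2: as-> names the threaded value, so its position can vary per step.
(def keys-named
  (as-> [1 2 34 5] $
    (coll-first-map $ (fn [x] {:key x}))
    (map :key $)))
```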

Tutorial

Hey Soren

Not really a bug, but I am having issues getting a simple app running on the cluster. Would you mind doing a short write-up on how to run it locally?

I am specifically struggling with submitting code to the spark cluster. I am using this command

./spark-submit --class flambo-example.core  /Users/tomassvarovsky/sandbox/flambo_example/target/flambo_example-0.1.0-SNAPSHOT-standalone.jar 

But it is failing with

Exception in thread "main" java.lang.ClassNotFoundException: flambo-example.core

I have the -main method in ns flambo-example.core and the :gen-class directive. Probably missing something simple.
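
One thing worth checking, offered as an assumption rather than a confirmed diagnosis: Clojure replaces hyphens with underscores when it turns a namespace name into a class name, so the class generated for this namespace would normally be named with an underscore. The snippet below only shows that name conversion using `clojure.core/namespace-munge`; the var names are illustrative.

```clojure
;; Hyphens in a namespace name become underscores in the generated class name.
(def ns-name-str "flambo-example.core")

;; namespace-munge performs the hyphen-to-underscore conversion.
(def class-name (namespace-munge ns-name-str))
```

If that is the cause here, the value of `class-name` is what `--class` would need to be given.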

Upgrade to spark 1.2.0

I tried to upgrade to the Spark 1.2.0 release, but somehow the checksum fails for the 1.2.0 deps. This may be a lein issue or my local environment's fault. Either way, flambo's Spark deps need to be changed for the upgrade.

install no clojars

Hi,

Sorry for the stupid question, but my only Clojure experience is at the REPL. Ubuntu Clojars is broken, so I'm trying to run flambo with Leiningen, without success.

After adding this to the dependencies:
[yieldbot/flambo "0.2.0"]
I got:
No :main namespace specified in project.clj.
I then added this to project.clj:
:main tfidf
along with the example path, and got:
Exception in thread "main" java.lang.ClassNotFoundException: tfidf
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at cloj
I have: Leiningen 1.7.1 on Java 1.7.0_25 Java HotSpot(TM) 64-Bit Server VM

Thank you for your time
German

Cannot cast flambo.function.Function2 to scala.Function2 (Streaming)

I am trying to do some stream processing using Flambo. My code:

(ns spark-logstreaming.core
    (:require [flambo.conf :as conf])
    (:require [flambo.api :as f])
    (:require [flambo.streaming :as f-streaming])
    (:gen-class))


(def c (-> (conf/spark-conf)
    (conf/master "local")
    (conf/app-name "app_name")))

(def streaming-ctx (f-streaming/streaming-context c 5000))

(.set (.-hadoopConfiguration (.-sparkContext streaming-ctx)) "fs.s3n.awsAccessKeyId" "access_key")
(.set (.-hadoopConfiguration (.-sparkContext streaming-ctx)) "fs.s3n.awsSecretAccessKey" "secret_access_key")

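;; The namespace above aliases flambo.streaming as f-streaming, so that alias is used here.
(let [log-file-stream (.fileStream streaming-ctx "s3n://my-bucket-containing-logs")]
    (f-streaming/reduce-by-window
        (f-streaming/map log-file-stream (f/fn [s] (count s)))
        (f/fn [x y] (+ x y))
        30000
        30000))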

This gives me the error:

Cannot cast flambo.function.Function2 to scala.Function2

The weird thing is that this works with a regular JavaSparkContext (flambo.api/spark-context).

I dug into the flambo.function and flambo.streaming code and understand that Flambo is somehow converting flambo.api/fn types to Scala's Function2 types. But something else goes wrong, and I don't understand what.

If you can provide your thoughts on what could be some possible causes of this problem, I would be happy to try and fix it. Right now, I'm stumped, having spent most of the weekend and today trying to understand. Thanks.

ClassNotFoundException: flambo.function.Function

HW10964:spark-tutorial snunez$ lein deps
(Retrieving yieldbot/flambo/0.6.0-SNAPSHOT/flambo-0.6.0-20150408.182739-1.pom from clojars)
(Retrieving yieldbot/flambo/0.6.0-SNAPSHOT/flambo-0.6.0-20150408.182739-1.jar from clojars)

HW10964:spark-tutorial snunez$ lein repl
nREPL server started on port 59510 on host 127.0.0.1 - nrepl://127.0.0.1:59510
REPL-y 0.3.1
Clojure 1.6.0
Docs: (doc function-name-here)
(find-doc "part-of-name-here")
Source: (source function-name-here)
Javadoc: (javadoc java-object-or-class-here)
Exit: Control+D or (exit) or (quit)
Results: Stored in vars *1, *2, *3, an exception in *e

user=> (require '[flambo.api :as f])

CompilerException java.lang.ClassNotFoundException: flambo.function.Function, compiling:(flambo/function.clj:58:1)

How to destructure dataframe/rdd?

Hey,

I have a bit more complex data structure which I'm loading from parquet:

(def data (sql/parquet-file c (into-array ["/some-path/**"])))
(.printSchema data)
root
 |-- id: string (nullable = true)
 |-- modelVersion: string (nullable = true)
 |-- name: string (nullable = true)
...

Is it possible to destructure it somehow?
Something like:

(def result (f/map data (f/fn [[ id _ name ]] (str id name))))

but I'm getting:

java.lang.UnsupportedOperationException: nth not supported on this type: GenericRowWithSchema

Is there any way to implement nth for GenericRowWithSchema?
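
Sequential destructuring calls nth, which does not know about Spark's Row type, so one option is to copy the row's fields into a Clojure vector before destructuring. The helper below is a sketch, not flambo's implementation: row->vector and id+name are hypothetical names, and the code assumes only that the row object exposes .length and .get(i) methods, as org.apache.spark.sql.Row does. It uses reflection rather than a type hint so that it loads without Spark on the classpath.

```clojure
;; Hypothetical helper: copy the fields of a Row-like object into a vector.
;; Assumes the object has .length and .get(i) methods.
(defn row->vector [row]
  (mapv (fn [i] (.get row (int i)))
        (range (.length row))))

;; Hypothetical example: destructure the vector, not the row itself.
(defn id+name [row]
  (let [[id _ name] (row->vector row)]
    (str id name)))
```

Another report in this thread uses a row->vec function from flambo.sql, which may already cover the conversion step.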

f/map doesn't work on a sql DataFrame

I ran the following in the REPL (more or less; I removed the JDBC URL and table name).

(require '[flambo.conf :as conf])
(require '[flambo.api :as f])
(require '[flambo.sql :as fsql])
(def c (-> (conf/spark-conf) (conf/master "local[*]") (conf/app-name "cljtest")))
(def sc (f/spark-context c))
(def sqlctx (fsql/sql-context sc))

(def jdbc-df (.load (.options (.format (.read sqlctx) "jdbc") {"url" "jdbc url here" "dbtable" "my.table"})))

Running the following on that jdbc-df produces an error:

=> (f/map jdbc-df fsql/row->vec)

IllegalArgumentException No matching method found: map for class org.apache.spark.sql.DataFrame  clojure.lang.Reflector.invokeMatchingMethod (Reflector.java:53)

Am I doing something wrong or is this broken?

reading avro files?

Do you have any examples of how one might go about reading Avro files in flambo? In the spark-shell, I can do something like:

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable

val records = sc.newAPIHadoopFile("file:///tmp/Segment_2552.avro",classOf[AvroKeyInputFormat[GenericRecord]], classOf[AvroKey[GenericRecord]], classOf[org.apache.hadoop.io.NullWritable]).map(x => x._1.datum)

records.foreach(x => println(x))

Any general guidance on where to start would be great!

WARN fn: caught exception: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException

I have a simple Flambo app that takes a text file of JSON objects (one per line), does some simple snake-casing of the keys, and then saves the result as a Parquet file. When I add an anonymous fn, via (f/map (f/fn [x] ....)), I get a Kryo warning. The program runs and appears to output the correct results, but each time I add another anonymous fn to the threading operation I get the same warning again, so there is one warning for each fn I add.

See:
https://www.refheap.com/3bf5a8f0cca762ca06010a78f

Is there something I should be doing differently, and why am I getting these warnings? If I remove all the (f/map ....) calls, I get no warnings. I feel like I'm doing something knuckleheaded here.

Issues with Spark (1.0.2, 1.1.0) with Flambo 0.3.3 & 0.4-SNAPSHOT?

Hi,

I am working on putting together an example project for a new project I am launching (core.nlp). However, I am running into some weird errors when using anything but Spark 1.0.1 with Flambo 0.3.2.

Feel free to check out the sample project here: https://github.com/arnaudsj/core-nlp-flambo-101

When using 1.0.1 with 0.3.2 I get as expected:

$ lein repl
nREPL server started on port 61708 on host 127.0.0.1 - nrepl://127.0.0.1:61708
REPL-y 0.3.2, nREPL 0.2.3
Clojure 1.6.0
Java HotSpot(TM) 64-Bit Server VM 1.8.0-b132
    Docs: (doc function-name-here)
          (find-doc "part-of-name-here")
  Source: (source function-name-here)
 Javadoc: (javadoc java-object-or-class-here)
    Exit: Control+D or (exit) or (quit)
 Results: Stored in vars *1, *2, *3, an exception in *e

core-nlp-flambo-101.core=> (hello-world)
Hello, world!nil

However, if I use 0.3.3 with 1.0.2, or 0.4.0-SNAPSHOT with 1.1.0 I get the following:

#<CompilerException java.lang.ClassNotFoundException: flambo.function.Function, compiling:(flambo/function.clj:56:1)>
nREPL server started on port 61721 on host 127.0.0.1 - nrepl://127.0.0.1:61721
REPL-y 0.3.2, nREPL 0.2.3
Clojure 1.6.0
Java HotSpot(TM) 64-Bit Server VM 1.8.0-b132
    Docs: (doc function-name-here)
          (find-doc "part-of-name-here")
  Source: (source function-name-here)
 Javadoc: (javadoc java-object-or-class-here)
    Exit: Control+D or (exit) or (quit)
 Results: Stored in vars *1, *2, *3, an exception in *e

core-nlp-flambo-101.core=> (hello-world)

CompilerException java.lang.RuntimeException: Unable to resolve symbol: hello-world in this context, compiling:(/private/var/folders/qv/sy476py136z_j0vz824d9bgw0000gn/T/form-init7196712776664162946.clj:1:1)

Any ideas?

0.6.0 not available yet on clojars

The README indicates that version "0.6.0" is available on clojars. However, clojars only has "0.6.0-SNAPSHOT" and "0.5.0". Please push "0.6.0" to clojars or update the documentation to list a version that is actually available.
