
License: Apache License 2.0


distributed-graph-analytics

Distributed Graph Analytics (DGA) is a compendium of graph analytics written for Bulk-Synchronous-Parallel (BSP) processing frameworks such as Giraph and GraphX.

Currently, DGA supports the following analytics:

Giraph
  • Weakly Connected Components
  • Leaf Compression
  • Page Rank
  • High Betweenness Set Extraction
  • Louvain
GraphX
  • Louvain Modularity (initial stage)
  • Weakly Connected Components
  • High Betweenness Set Extraction
  • Leaf Compression
  • Page Rank
  • Neighboring Communities
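As a concrete (non-distributed) illustration of what the simplest of these analytics computes: Weakly Connected Components labels every vertex with the smallest vertex id reachable from it when edge direction is ignored. A minimal single-machine sketch, with names of our own choosing rather than the DGA API:

```scala
import scala.collection.mutable

// Single-machine sketch of Weakly Connected Components via union-find.
// The BSP versions reach the same labeling by having each vertex repeatedly
// broadcast and adopt the smallest component id it has seen so far.
def connectedComponents(vertices: Seq[Long], edges: Seq[(Long, Long)]): Map[Long, Long] = {
  val parent = mutable.Map.from(vertices.map(v => (v, v)))
  def find(v: Long): Long = {
    if (parent(v) != v) parent(v) = find(parent(v)) // path compression
    parent(v)
  }
  for ((a, b) <- edges) {
    val (ra, rb) = (find(a), find(b))
    if (ra != rb) parent(math.max(ra, rb)) = math.min(ra, rb) // smallest id becomes the label
  }
  vertices.map(v => (v, find(v))).toMap
}
```

For example, on vertices 1..5 with edges (1,2), (2,3), (4,5) this yields two components labeled 1 and 4.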
dga-giraph

dga-giraph is the project that contains our Giraph implementation of DGA. For more information, see the dga-giraph README.md.

documentation

http://sotera.github.io/distributed-graph-analytics

Steps to run Louvain GraphX

Download The CentOS VM

https://github.com/Sotera/seam-team-6-vm

Required Scala Version

Scala version: 2.11.1
Scala installation location: /opt/scala

wget http://downloads.typesafe.com/scala/2.11.1/scala-2.11.1.tgz
tar xvf scala-2.11.1.tgz
sudo mv scala-2.11.1 /opt/scala

Required Spark Core and GraphX

Spark Core and GraphX version: 1.3.0
Spark installation location: /opt/spark

Start Spark with one master and four worker instances

cd /usr/lib/spark/sbin
./start-master.sh
echo "export SPARK_WORKER_INSTANCES=4" >> spark-env.sh
./start-slaves.sh

Spark Web UI: browse to the master web UI to make sure the master and all the workers started correctly

http://localhost:8080/

Copy the example data to HDFS:

wget http://sotera.github.io/distributed-graph-analytics/data/example.csv
hdfs dfs -put example.csv hdfs://localhost:8020/tmp/dga/louvain/input/

Clone the repository and build the code

cd /vagrant
git clone https://github.com/Sotera/distributed-graph-analytics
cd distributed-graph-analytics
gradle clean dist
cd /vagrant/distributed-graph-analytics/dga-graphx/build/dist
./run.sh

You can find the output in:

hdfs://localhost:8020/tmp/dga/louvain/output

distributed-graph-analytics's People

Contributors

dgilliland, scotthaleen


distributed-graph-analytics's Issues

The communitySigmaTot of the communityRDD is initialized to zero in the second iteration.

Thanks for making your implementation public.
I compared the results of the Louvain method in the Giraph version against the GraphX version using the following graph:
https://gist.github.com/ken57/6a090706a17735e8a333b931af451a49

The final Q value from the Giraph version was 0.429, while the GraphX version produced 0.402.
Even after parameter tuning, the final Q value of the GraphX version remained lower than that of the Giraph version.

I checked the values of communityRDD in the second iteration by adding debug logging as follows.

println("totalEdgeWeight: " + totalGraphWeight.value)

// gather community information from each vertex's local neighborhood
// var communityRDD = louvainGraph.mapReduceTriplets(sendCommunityData, mergeCommunityMessages)
communityRDD.collect().foreach(println) // add to debug communityRDD

var activeMessages = communityRDD.count()

https://github.com/Sotera/distributed-graph-analytics/blob/master/dga-graphx/src/main/scala/com/soteradefense/dga/graphx/louvain/LouvainCore.scala#L852

The debug output below shows that the communitySigmaTot of the communityRDD is initialized to zero in the second iteration.

(4,Map((2,0) -> 9, (23,0) -> 4, (25,0) -> 2))
(25,Map((27,0) -> 1, (2,0) -> 1, (23,0) -> 6, (4,0) -> 2))
(27,Map((23,0) -> 3, (25,0) -> 1))
(23,Map((27,0) -> 3, (2,0) -> 3, (25,0) -> 6, (4,0) -> 4))
(7,Map((2,0) -> 2, (5,0) -> 2))
(5,Map((7,0) -> 2, (2,0) -> 2))
(2,Map((5,0) -> 2, (4,0) -> 9, (25,0) -> 1, (23,0) -> 3, (7,0) -> 2))

So I added partitionBy(PartitionStrategy.EdgePartition2D).groupEdges(_ + _) to compressGraph as follows.

    val louvainGraph = compressedGraph.outerJoinVertices(nodeWeights)((vid, data, weightOption) => {
      val weight = weightOption.getOrElse(0L)
      data.communitySigmaTot = weight + data.internalWeight
      data.nodeWeight = weight
      data
    }).partitionBy(PartitionStrategy.EdgePartition2D).groupEdges(_ + _).cache()

https://github.com/Sotera/distributed-graph-analytics/blob/master/dga-graphx/src/main/scala/com/soteradefense/dga/graphx/louvain/LouvainCore.scala#L334

With this change, communityRDD is initialized with the correct values, and the GraphX version produced a final Q value of 0.417.
I think a modification along these lines is necessary.

I have two more minor points. First, very large logs are printed by the println at the line below; I would be grateful if it were removed.
https://github.com/Sotera/distributed-graph-analytics/blob/master/dga-graphx/src/main/scala/com/soteradefense/dga/graphx/louvain/LouvainCore.scala#L102

Second, I tried PartitionStrategy.CanonicalRandomVertexCut instead of PartitionStrategy.EdgePartition2D on a large graph (approximately 100,000,000 edges) and obtained a result approximately 10 times faster.
Evidently the optimal PartitionStrategy depends on the structure of the input graph.
I would be grateful if it were made selectable.

thanks.

louvain invalid input exception

I've installed hadoop 2.5.2, giraph 1.1.0, and sotera dga locally on my machine without cloudera. I've successfully run the hbse example. I have been unable to get the louvain example to complete successfully. It seems that it completes the calculations that go into $OUTPUTDIR/giraph_0, but then throws an invalid input exception when it cannot find $OUTPUTDIR/giraph_1:

Processing /home/justinmaojones/bigdataproject/louvain_example/output/giraph_0 and /home/justinmaojones/bigdataproject/louvain_example/output/giraph_1
15/04/18 17:29:34 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
15/04/18 17:29:34 INFO mapreduce.JobSubmitter: Cleaning up the staging area file:/tmp/hadoop-justinmaojones/mapred/staging/nobody717318776/.staging/job_local717318776_0003
org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: file:/home/justinmaojones/bigdataproject/louvain_example/output/giraph_1

To execute the louvain job I use the following command:

 ./bin/dga-yarn-giraph louvain /tmp/dga/louvain/input/ ~/bigdataproject/louvain_example/output/ -ca io.edge.reverse.duplicator=true -ca giraph.SplitMasterWorker=false

I can't seem to figure out why this isn't working. Is this a bug or an installation/configuration issue?

re-computation of totalGraphWeight LouvainCore.louvain

In LouvainCore.louvain, we compute totalGraphWeight as the very first step, which is simply 2 * (sum of all edge weights). Why does this need to be recomputed every time LouvainCore.louvain is called from the outer do-while loop? The graph weight should be fixed: we never add or delete edges, we only compress nodes (converting intra-community edges into self-edges), which should not change totalGraphWeight.
Please let me know if I am thinking in the wrong direction.
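The invariance suggested above can be checked on a toy graph. This is a sketch under an assumption: it uses the common Louvain convention that an ordinary edge contributes twice its weight to the total (once per endpoint) and internal/self-loop weight also counts twice; the actual DGA accounting may differ in detail.

```scala
// Toy check: compressing a community into a single node should leave the
// Louvain graph weight unchanged, because intra-community edge weight simply
// moves into the merged node's internal weight.
def graphWeight(edgeWeights: Seq[Long], internalWeights: Seq[Long]): Long =
  2 * edgeWeights.sum + 2 * internalWeights.sum

// Before: community {a, b} joined by an edge of weight 3, plus one external edge of weight 1.
val before = graphWeight(Seq(3L, 1L), Seq(0L, 0L, 0L))
// After compressing {a, b}: the weight-3 edge becomes internal weight on the merged node.
val after = graphWeight(Seq(1L), Seq(3L, 0L))
```

Both evaluate to 8, consistent with the reasoning that compression alone cannot change totalGraphWeight.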

HBSE Overflow Error

There is overflow when using longs, so they need to be converted to BigIntegers.
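A minimal illustration of the failure mode; the numbers below are invented, but betweenness computations accumulate shortest-path counts multiplicatively, which is exactly the kind of quantity that silently wraps a Long:

```scala
// Path counts multiply along layers and can silently wrap around Long.MaxValue;
// BigInt (backed by java.math.BigInteger) stays exact at arbitrary precision.
val counts = Seq.fill(11)(100000000L)      // eleven hypothetical factors of 1e8
val wrapped = counts.product               // Long arithmetic overflows silently
val exact = counts.map(BigInt(_)).product  // the true product, 10^88
```

The silent part is what makes this dangerous: no exception is thrown, the counter is simply wrong.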

Expand edge list format for DGA's graph visualization package

Currently the edge list we provide to the dga visualization package requires the format:

type_1:source_id,type_2:destination_id,weight,...\n

Enhance this format to accommodate:
  • An edge list without types (if type_1 and type_2 exist, use them; otherwise, default to some type)
  • A configurable field delimiter (other than the default of a comma)

Perhaps we can make the default type configurable as well. We could add a properties or configuration file into the mix: if it exists, we'll use whatever values it specifies for the delimiter and/or the default type. If no default type is configured and no type is present in the source or destination id field, we'll default to Node as the type.
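The behavior described above could be sketched as follows; the names (Edge, parseEdge) and defaults are illustrative, not the actual DGA visualization API:

```scala
// Sketch of the proposed format handling: parse "type:id" endpoints with a
// configurable delimiter, falling back to a default type when no prefix is given.
case class Edge(srcType: String, srcId: String, dstType: String, dstId: String, weight: Double)

def parseEdge(line: String, delimiter: String = ",", defaultType: String = "Node"): Edge = {
  def endpoint(field: String): (String, String) = field.split(":", 2) match {
    case Array(t, id) => (t, id)
    case Array(id)    => (defaultType, id) // no type given: use the default
  }
  // Pattern.quote so delimiters like "|" are treated literally, not as regex.
  val fields = line.split(java.util.regex.Pattern.quote(delimiter))
  val (st, si) = endpoint(fields(0))
  val (dt, di) = endpoint(fields(1))
  Edge(st, si, dt, di, fields(2).toDouble)
}
```

For example, parseEdge("user:1,page:2,0.5") yields typed endpoints, while parseEdge("1|2|1.0", "|") falls back to the Node type on both ends.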

Why download scala-compiler?

I already have scala:

$ scalac -version
Scala compiler version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

Why are you downloading it anew?

dga-graphx$ gradle check dist
...
:dga-graphx:compileScala
Download https://repo1.maven.org/maven2/org/scala-lang/scala-compiler/2.10.4/scala-compiler-2.10.4.pom
Download https://repo1.maven.org/maven2/org/scala-lang/scala-reflect/2.10.4/scala-reflect-2.10.4.pom
Download https://repo1.maven.org/maven2/org/scala-lang/scala-compiler/2.10.4/scala-compiler-2.10.4.jar
Download https://repo1.maven.org/maven2/org/scala-lang/scala-reflect/2.10.4/scala-reflect-2.10.4.jar
...
BUILD SUCCESSFUL

Expand Unit Tests

Our current unit tests are adequate, but we should spend more time on them.

Passing Application.conf doesn't work.

When using spark-submit to run an analytic with YARN, the application.conf in the conf/ directory is not the one that gets used. Currently application.conf is bundled into the jar, so any change to it requires rebuilding the jars. There doesn't seem to be a way to pass it in via spark-submit.

Write a runner application front end (for each platform) that will call all of the analytics DGA supports

DGA will support five analytics on two platforms; we need a single point of entry for running all of them. I'm fine with having one point of entry per platform (e.g. the GraphX application that runs our jobs will differ from the Giraph application).

Each analytic should have a stub GiraphConfiguration (and whatever GraphX might need) that is prepopulated with sensible defaults, but allow them to be overwritten via the command line if necessary (excepting the InputFormats and OutputFormats).

We also have to come up with a completely different way to chain Louvain's jobs together; prior to DGA we got by with a python script, but we want this application to create the chained jobs and determine when we've run through to completion.

Investigate Integration Tests for DGA-Giraph

Our unit tests (at least for the computations) already take a half-integration-test approach, but we should exercise the entire flow, including our input formats, output formats, and the computation and master-compute steps.

Couldn't build giraph

I'm using Oracle JDK 1.7 on Ubuntu 12.04 LTS.
I can't build Giraph on my machine, although I could build and run the GraphX community detection.

This is the error message I'm getting:

:dga-giraph:compileJava

FAILURE: Build failed with an exception.

BUILD FAILED

Unable to load realm info from SCDynamicStore

distributed-graph-analytics$ gradle check dist
...
:dga-giraph:test
2015-03-31 13:19:20.694 java[61248:378342] Unable to load realm info from SCDynamicStore
:dga-giraph:test FAILED
...
BUILD FAILED
dga-graphx$ gradle check dist
...
2015-03-31 13:21:48.448 java[61380:379398] Unable to load realm info from SCDynamicStore
...
BUILD SUCCESSFUL

graphX runner does not make proper use of spark-submit

The correct way to submit Spark applications to a cluster is to use spark-submit to set the cluster configuration options, jars, files, etc. However, the DGA runner attempts to set jars and other information itself based on the command-line options; this can cause conflicts and result in odd errors (such as class-not-found errors when jars aren't distributed to the workers).

Whenever a Spark program is launched, the spark-submit options should be used when available and not overridden in the code.

Keep getting java heap error

My current Spark is a standalone version based on CDH4 (http://www.apache.org/dyn/closer.cgi/spark/spark-1.2.0/spark-1.2.0-bin-cdh4.tgz).

So far, however, I haven't been able to run it successfully; I keep getting a Java heap error. The input file itself is only 136 KB, so I don't think memory is the issue.

Below are the command I ran and the error message I get.

[hafidz@localhost dga]$ /opt/dga/dga-mr1-graphx pr -i sna_exp_comma.csv -o pr_sna.txt -s /home/hafidz/Playground/spark-1.2.0-bin-cdh4 -n testPageRank -m spark://localhost.localdomain:7077 --S spark.executor.memory=1g --ca parallelism=10 --S spark.worker.timeout=400 --S spark.cores.max=2
Analytic: pr
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hafidz/Playground/spark-1.2.0-bin-cdh4/lib/spark-examples-1.2.0-hadoop2.0.0-mr1-cdh4.2.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hafidz/Playground/spark-1.2.0-bin-cdh4/lib/spark-assembly-1.2.0-hadoop2.0.0-mr1-cdh4.2.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
[Stage 0:> (0 + 2) / 11]
[ERROR] sparkDriver-akka.actor.default-dispatcher-2 03:50:16 Lost executor 0 on 192.168.126.129: remote Akka client disassociated
[ERROR] sparkDriver-akka.actor.default-dispatcher-2 03:50:16 Asked to remove non-existent executor 0
[ERROR] sparkDriver-akka.actor.default-dispatcher-3 03:50:16 Asked to remove non-existent executor 0
[Stage 0:> (0 + 2) / 11]
[ERROR] sparkDriver-akka.actor.default-dispatcher-5 03:50:21 Lost executor 1 on 192.168.126.129: remote Akka client disassociated
[ERROR] sparkDriver-akka.actor.default-dispatcher-5 03:50:21 Asked to remove non-existent executor 1
[ERROR] sparkDriver-akka.actor.default-dispatcher-2 03:50:21 Asked to remove non-existent executor 1
[Stage 0:> (0 + 2) / 11]
[ERROR] sparkDriver-akka.actor.default-dispatcher-2 03:50:25 Lost executor 2 on 192.168.126.129: remote Akka client disassociated
[ERROR] sparkDriver-akka.actor.default-dispatcher-2 03:50:25 Asked to remove non-existent executor 2
[ERROR] sparkDriver-akka.actor.default-dispatcher-16 03:50:25 Asked to remove non-existent executor 2
[Stage 0:> (0 + 2) / 11]
[ERROR] task-result-getter-3 03:50:28 Task 0 in stage 0.0 failed 4 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 10, 192.168.126.129): java.lang.OutOfMemoryError: Java heap space
    at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)
    at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2244)
    at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:280)
    at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:75)
    at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
    at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1000)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:138)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:214)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:210)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:99)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Time in seconds: 26

dga-graphx LouvainCore.louvain do-while loop

Thanks a lot for making your implementation public. I understand this do-while loop in bits and pieces but not fully; I cannot work out when the loop breaks:
{
(stop <= progressCounter && (even || (updated > 0 && count < maxIter)))
}
As I read it, the loop should break only when even = false (i.e. on odd cycles). Would you kindly explain this do-while loop in detail?

Also, why do you compress the graph into a new graph only when numberOfPasses (= 2 * count) > 2?
Thanks in advance.

Informational Site

We need an information site put together for DGA to include the following information;

What is it?
Why would someone want it?
How would someone get and install it?
How would someone test that it's working right with sample data?
How would someone use it with their own data?

louvain modularity in Graphx seems to have a problem

When computing deltaQ in the q method, deltaQ = k_i_in - (k_i * sigma_tot / M) seems wrong. Here M is the total degree of the graph (2 * number of edges, or here probably 2 * total edge weight). According to the paper, we can use the relative value of Q to choose a neighbor, i.e. k_i_in - sigma_tot * k_i / m. So M here should be replaced by M / 2 (just m in the paper). Is that right, or am I wrong?
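The concern can be made concrete with a small numeric check; all values below are invented for illustration. Changing the denominator from m to 2m rescales only the penalty term of the gain, so it can change which candidate community maximizes deltaQ, meaning the two normalizations are not interchangeable:

```scala
// deltaQ gain of moving a vertex of degree kI into a candidate community,
// written with the denominator left as a parameter so both readings compare.
def deltaQ(kIIn: Double, sigmaTot: Double, kI: Double, norm: Double): Double =
  kIIn - sigmaTot * kI / norm

val m  = 10.0 // total edge weight (illustrative)
val kI = 1.0  // degree of the moving vertex (illustrative)
// Candidate community A: kIIn = 3, sigmaTot = 16; candidate B: kIIn = 2, sigmaTot = 1.
val aBeatsBWithM  = deltaQ(3, 16, kI, m)     > deltaQ(2, 1, kI, m)     // denominator m
val aBeatsBWith2M = deltaQ(3, 16, kI, 2 * m) > deltaQ(2, 1, kI, 2 * m) // denominator 2m
```

With denominator m, community B wins (1.4 vs 1.9); with 2m, community A wins (2.2 vs 1.95). So the choice of normalization can change the resulting clustering, not just the absolute Q values, which makes the question worth checking against the paper.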
