Giter Site home page Giter Site logo

sparkinternals's Introduction

Spark Internals

Spark Version: 1.0.2 Doc Version: 1.0.2.0

Authors

Weibo/Twitter ID Name Contributions
@JerryLead Lijie Xu Author of the original Chinese version, and English version update
@juhanlol Han JU English version and update (Chapter 0, 1, 3, 4, and 7)
@invkrh Hao Ren English version and update (Chapter 2, 5, and 6)
@AorJoa Bhuridech Sudsee Thai version

Introduction

This series discuss the design and implementation of Apache Spark, with focuses on its design principles, execution mechanisms, system architecture and performance optimization. In addition, there's some comparisons with Hadoop MapReduce in terms of design and implementation. I'm reluctant to call this document a "code walkthrough", because the goal is not to analyze each piece of code in the project, but to understand the whole system in a systematic way (through analyzing the execution procedure of a Spark job, from its creation to completion).

There're many ways to discuss a computer system. Here, We've chosen a problem-driven approach. Firstly one concrete problem is introduced, then it gets analyzed step by step. We'll start from a typical Spark example job and then discuss all the related important system modules. I believe that this approach is better than diving into each module right from the beginning.

The target audiences of this series are geeks who want to have a deeper understanding of Apache Spark as well as other distributed computing frameworks.

I'll try my best to keep this documentation up to date with Spark since it's a fast evolving project with an active community. The documentation's main version is in sync with Spark's version. The additional number at the end represents the documentation's update version.

For more academic oriented discussion, please check out Matei's PHD thesis and other related papers. You can also have a look at my blog (in Chinese) blog.

I haven't been writing such complete documentation for a while. Last time it was about three years ago when I was studying Andrew Ng's ML course. I was really motivated at that time! This time I've spent 20+ days on this document, from the summer break till now (August 2014). Most of the time is spent on debugging, drawing diagrams and thinking how to put my ideas in the right way. I hope you find this series helpful.

Contents

We start from the creation of a Spark job, and then discuss its execution. Finally, we dive into some related system modules and features.

  1. Overview Overview of Apache Spark
  2. Job logical plan Logical plan of a job (data dependency graph)
  3. Job physical plan Physical plan
  4. Shuffle details Shuffle process
  5. Architecture Coordination of system modules in job execution
  6. Cache and Checkpoint Cache and Checkpoint
  7. Broadcast Broadcast feature
  8. Job Scheduling TODO
  9. Fault-tolerance TODO

Other languages

Chinese Version is at markdown/. Thai Version is at markdown/thai

How to read this document

The documentation is written in markdown. The pdf version is also available here.

If you're under Mac OS X, I recommand MacDown with a github theme for reading.

Gitbook (Chinese version)

Thanks @Yourtion for creating the gitbook version.

Online reading http://spark-internals.books.yourtion.com/

Downloads

Examples

I've created some examples to debug the system during the writing, they are avaible under SparkLearning/src/internals.

Book version (NEW)

We have written a book named "The design principles and implementation of Apache Spark", which talks about the system problems, design principles, and implementation strategies of Apache Spark, and also details the shuffle, fault-tolerant, and memory management mechanisms. Currently, it is written in Chinese.

Book link: https://item.jd.com/12924768.html

Book cover: book cover

Book preface: https://github.com/JerryLead/ApacheSparkBook/blob/master/Preface.pdf

Acknowledgement

I appreciate the help from the following in providing solutions and ideas for some detailed issues:

  • @Andrew-Xia Participated in the discussion of BlockManager's implemetation's impact on broadcast(rdd).

  • @CrazyJVM Participated in the discussion of BlockManager's implementation.

  • @王联辉 Participated in the discussion of BlockManager's implementation.

Thanks to the following for complementing the document:

Weibo ID Chapter Content Revision status
@OopsOutOfMemory Overview Relation between workers and executors and Summary on Spark Executor Driver's Resouce Management (in Chinese) There's not yet a conclusion on this subject since its implementation is still changing, a link to the blog is added

Thanks to the following for finding errors:

Weibo Id Chapter Error/Issue Revision status
@Joshuawangzj Overview When multiple applications are running, multiple Backend process will be created Corrected, but need to be confirmed. No idea on how to control the number of Backend processes
@_cs_cm Overview Latest groupByKey() has removed the mapValues() operation, there's no MapValuesRDD generated Fixed groupByKey() related diagrams and text
@染染生起 JobLogicalPlan N:N relation in FullDepedency N:N is a NarrowDependency Modified the description of NarrowDependency into 3 different cases with detaild explaination, clearer than the 2 cases explaination before
@zzl0 Fisrt four chapters Lots of typos,such as "groupByKey has generated the 3 following RDDs",should be 2. Check pull request All fixed
@左手牵右手TEL Cache and Broadcast chapter Lots of typos All fixed
@cloud-fan JobLogicalPlan Some arrows in the Cogroup() diagram should be colored red All fixed
@CrazyJvm Shuffle details Starting from Spark 1.1, the default value for spark.shuffle.file.buffer.kb is 32k, not 100k All fixed

Special thanks to @明风Andy for his great support.

Special thanks to the rockers (including researchers, developers and users) who participate in the design, implementation and discussion of big data systems.

sparkinternals's People

Contributors

aorjoa avatar cyfdecyf avatar darkjh avatar ihainan avatar imx7 avatar invkrh avatar jerrylead avatar jmuth avatar johnbradley avatar keypointt avatar krishnakalyan3 avatar lambdai avatar lukasnalezenec avatar min-chen avatar morganzhang100 avatar singhpk234 avatar weiygo avatar wllmtrng avatar wongxingjun avatar wypb avatar yourtion avatar zanmato1984 avatar zhaoyao avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

sparkinternals's Issues

Why the definition of dependencies is different from RDD paper?

From the paper Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing

  • narrow dependencies, where each partition of the parent RDD is used by at most one partition of the child RDD
  • wide dependencies, where multiple child partitions may depend on it

However, The definition of dependencies from the chapter JobLogicalPlan is different :

  • NarrowDependency, Each partition of the child RDD fully depends on a small number of partitions of its parent RDD. Fully depends (i.e., FullDependency) means that a child partition depends the entire parent partition.

  • ShuffleDependency, Multiple child partitions partially depends on a parent partition. Partially depends (i.e., PartialDependency) means that each child partition depends a part of the parent partition.

This makes me really confused. Are ShuffleDependency and wide dependency the same thing?

The quantitative relationship between workers and executors

@OopsOutOfMemory 指出关于OverView中如何配置多个Backend进程的问题:在Worker Actor中,每次LaunchExecutor会创建一个Backend进程,它们是1对1的关系。也就是说集群里启动多少Executor实例就有多少Backend进程。

Backend个数发生变化情况:1、启动一个新的Application(每个APP都会 LaunchExecutor,此时会生成此进程)2、还可以通过设置SPARK_WORKER_INSTANCES参数来增加 Backend个数。图可以依此稍做改动。

设置多个Worker只是增加了集群资源,可申请更多的executor起来计算,这样就可以增加CoarseGrainedExecutorBackend。不过这样做似乎没有什么意义。只是想表达“Executor和CoarseGrainedExecutorBackend”是一对一启动的,增加Executor的个数,就是增加CoarseGrainedExecutorBack.

如果想调大1个Worker里的CoarseGrainedExecutorBackend的个数,就要通过参数设置,调节Worker所生成的executor的个数,一般受cpu核数影响。

这里有一个算法,去根据当前ALIVE的Worker还剩余的core,并且当前Worker没有此APP的executor,并且。。。此处省去若干字,具体还是去Master.scala里的schedule方法看一下吧,这个评论字数有限制,没法描述清楚。

刚才说的增加Worker的个数,其实就是增加可用cpu个数,让executor可以产生的更多,被分配到更多的Worker上。如果是yarn模式,可以去研究一下spark-submit 里指定的--num-executors这个参数。。

关于partitioner的疑问

我在 Learning Spark 中看到有一段话:

Finally, for binary operations, which partitioner is set on the output depends on the parent RDDs’ partitioners. By default, it is a hash partitioner, with the number of partitions set to the level of parallelism of the operation. However, if one of the parents has a partitioner set, it will be that partitioner; and if both parents have a partitioner set, it will be the partitioner of the first parent.

子RDD的partitioner应该由父RDD的partitioner决定。但在 SparkInternals 的第二章,父子RDD的partitioner都不相同,这是怎么回事?如果两个父RDD的其中一个是hash-partitioner,那么子RDD不应该也是hash-partitioner吗?

关于第二章JobLogicalPlan的一点小意见

“groupByKey() 没有在 map 端进行 combine,因为 map 端 combine 只会省掉 partition 里面重复 key 占用的空间,当重复 key 特别多时,可以考虑开启 combine。”这一句
根据我从stackoverflow上查到的 “groupByKey doesn't know whether map side aggregations would save anything and how to do them as it has no information on the aggregation you are planning to do (if any). reduceByKey or combineByKey are provided with an aggregate function which means spark can actually do this optimization.”,groupbykey这么设计是因为它不能自定义聚合函数,spark不知道groupbykey之后用户要干什么,所以不combine,感觉这个“只能节约空间”和没有combine没有直接关联

关于cache()的疑问

概览里的example,第五步,调用cache已经将数据缓存了,为何第六步,调用count还是为了缓存呢?

关于第三章的有些疑问

你好,在第三章中第三页的图中有一幅插图我不太理解。
qq 20150722212641
图中stage1和stage2的任务是并发执行的吗?之前参考了http://dongxicheng.org/framework-on-yarn/apache-spark-multi-threads-model/ 这篇博客,得出多作业在spark中是可以并发执行的,然而图中stage1与stage2似乎也是并发关系,此外stage2中的两个lineage也是并发执行的吗?如果是,是不是表示在一个stage中的不同task也可以并发的执行?
此外,如果多作业在spark中能够并发执行,那么前后的数据依赖关系怎么处理?(举个例子,现在的资源够图中3个stage同时开始运算,然而stage0的计算需要stage1和stage2的计算结果,那么stage0是否是先判断自己做计算所需的rdd不足,然后再等待stage1和stage2完成任务?而且stage0是怎么被激发的?是在stage1结束之后被激发的吗?)
初学spark,很多不理解,希望能解惑,非常感谢

字体问题

pdf版本的实际显示效果为黑体于楷体混合,影响观感。

2015-08-04-145043_984x337_scrot

是否可以更改下字体设置?

Shuffle details的一点建议

Hi,
最近发现您写的sparkinternals的技术笔记,读完之后觉得非常好,收益匪浅。
但是觉得稍有不足之处就是shuffle details这一章节只介绍了hash based shuffle,但是其实从1.2开始,sort based shuffle已经成为了spark的默认选择,从1.5开始又引入了tungsten-sort。所以个人觉得更新这部分内容可能会更好?

祝好!

Narrow dependencies-第二章第二节图FullDependency: N : N

Narrow dependencies: each partition of the parent RDD is used by at most one partition of the child RDD
第二章第二节中 FullDependency: N : N 那张图, 父RDD中的一个分区被子RDD的两个分区依赖, 不能被称为Narrow Dependency吧, 为啥说FullDenpency是NarrowDepency呢?

请教个关于spark具体应用设计问题

您好:
看了您写的文章非常好,首先表示感谢,在工作中有个设计问题想请教下。情景是这样的,我们需要从HDFS读文件,然后再和HBASE表中读到的数据进行匹配,最终结果输出到HDFS上。
目前设计思路是从HDFS读文件变成RDD形式、然后根据HDFS文件名,文件名中含有需要在HBASE表中数据的返回,我们去查找到HBASE数据(表数据不是很大,可能也就1G左右),广播出去,然后在对从HDFS读的RDD进行mapParttion操作,将匹配到的保存的HDFS中去,然后用总的RDD-匹配的RDD,得到未匹配的RDD也同样保存到HDFS中。。结果发现速度执行的比较慢,3.2G的HDFS文件在12个节点,每个节点8G运行内存的条件下耗时2.5分钟左右,发现任务的反序列化时间非常长,而且发现封装的函数越多耗时越长,请问这种有没有好的思路那。 烦请百忙之中帮忙看看,万分感谢!

关于第三章第二幅图的理解

这里划分task的时候我的理解是从最后的task往前倒退,如果依赖什么就计算什么
那么以FlatMappedValuesRDD的第一个Partition为例,回退到左上角的时候ShuffleRDD的第二个和第三个Partition不应该被计算吧。这两个Partition的线不应该是粗线吧。

sparktask

Spark 2.0 Content

Thanks so much for your great effort, highly appreciated !

I guess the latest content is on Spark 1.0, any plans for Spark 2.X ?

第二章 JobLogicaPlan.pdf 存在笔误

在第一小节中(1. 如何产生RDD,应该产生哪些RDD) 第二段 应该是:
负责接收来自上一个 RDD 或者数据源的 input records,perform transformation() 的计算逻辑,然后输出 records。

一点建议

您好,我看完了您的这个 project,受益匪浅,感谢!有一点建议希望其能对继续改善这个 project 有帮助。

如您能在每个 chapter 的最后,列出相关的源代码文件的名称或链接,或是简单讲讲要看那些个源代码文件,那对一些想继续深入了解 spark 的人一定很有帮助。

还想请问一下,对于 checkpoint 这块,如果我想深入了解其实现机制,需要看哪些源代码文件?

初级问题,就是文章里是有很多RDD的依赖关系图,但是我找不到这些plan怎么在worker执行的相关代码?多谢!

比如这个testcase

test("sortByKey") {

val pairs = sc.parallelize(Array((1, 0), (2, 0), (0, 0), (3, 0)), 2)

assert(pairs.sortByKey().collect() === Array((0, 0), (1, 0), (2, 0), (3, 0)))

}

collect()之前都是一个一个new新的RDD,好像没有实际计算,之后runJob里面又debug不到,

全部的归并结果的代码应该在RDD.scala的Array.concat(results: _*)

def collect(): Array[T] = withScope {

val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)

Array.concat(results: _*)

}

您的shuffle那章有sortByKey的转化图,可是这些计算过程在哪呢?

关于章节 “1.总体介绍” 中 “Job 物理执行图” 中的问题

这一段提到 “比如 partition 99 里面只含有 9 个 records”。请问为什么 partition 99 里面有 9 个 records?

看前面提供的代码,在 flatMap 运行中进行初始化的时候,每个 partition 里应该是有 numKVPairs 个,也就是 10000 个 records.

这里说 9 个 records 是因为经过了 flatMap 操作后,可能只有 9 个 records 被分配到这个 partition 吗?

readme.md的content目录,英文版的link不统一

readme.md的content目录

前三章是:master/EnglishVersion/xxx.md

第四章开始是:master/markdown/english/xxx.md

似乎应该删掉其中一个,全部放到一个地方,没有重复的文件

如果能确定删掉哪一个文件夹,我可以提交一个PR 😄

hadoop distributedcache 应该是共享的

hadoop用distributedcache发布数据应该是共享的。
每次task开始前会初始化,这里面会包括distributedcache下载数据。
这个操作在每个node上只能顺序进行。多个task的话只能一个一个等。
如果已经下载过了就不会重复下载。

reduceByKey 函数 map 端 combine 的实现变化

非常感谢你的文章!我在阅读学习的时候也做了一些demo,想请教一个问题:

在第二章 Job 逻辑执行图 —— 逻辑图的生成 这个部分中,您有提到 实际 RDD 个数比我们想象的多一些 我参照 groupByKeyreduceByKey 分别做了两个实验,发现结果和预期不一致(请见下图和我的实验)。结果都只是产生了 “ParallelCollectionRDD” 和 "ShuffledRDD" 两种,并没有看到中间过程的RDD。我比较了源码中PariRDDFunctions.scala里的实现,发现果然已经有变化了。

请问现在的 map 端的 combine 工作是怎么实现的?

Job 逻辑执行图:
groupbykey
reducebykey

我的实验代码:(spark 2.1.0)

object Test {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "Test")

    val data = Array[(Char, Int)](('A', 1), ('B', 1), ('C', 1), ('B', 1), ('C', 1), ('D', 1), ('C', 1), ('A', 1))
    val a = sc.parallelize(data, 3)

    val groupByKeyRDD = a.groupByKey()
    val reduceByKeyRDD = a.reduceByKey(_ + _)

    reduceByKeyRDD.foreach(println)
    groupByKeyRDD.foreach(println)

    println(groupByKeyRDD.toDebugString)
    println(reduceByKeyRDD.toDebugString)

    sc.stop()
  }
}

输出结果:

output

Please add README.md to markdown/thai

This would be a great addition to the tahi books list at Free-Programming-Books. But the proposed link there makes it difficult to find the thai resource, and a link directly to markdown/thai doesn't help the user navigate the book. A thai language README page in that directory would solve the problem.

Would it be possible to have this in English?

I have checked your documentation (images, examples and with help of a translator) and it looks pretty awesome, but I wonder if possible we could have it in English, because I thinks it worths.

Thankyou.

关于CogroupRDD的一点疑问以及依赖的一点问题

我看CogroupRDD的实现,没看懂narrowdependency或shuffledependency对cogrouprdd中partition的影响... 不知道如果a.cogroup(b) , a分别是rangepartitioner和hashpartitioner的话,中间生成的cogrouprdd的分区数莫非和rdd a的一样多?因为cogroup这个算子不能指定numPartitons呀
我看您在JobLogicalPlan章节中对dependency分了4类(或者说两打类), 而且看cogroupRDD的对于依赖的处理,似乎并没有这么复杂,完全无视了所谓的N:1 NarrowDependency。

override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
val sparkConf = SparkEnv.get.conf
val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
val split = s.asInstanceOf[CoGroupPartition]
val numRdds = dependencies.length

// A list of (rdd iterator, dependency number) pairs
val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
  case oneToOneDependency: OneToOneDependency[Product2[K, Any]] @unchecked =>
    val dependencyPartition = split.narrowDeps(depNum).get.split
    // Read them from the parent
    val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
    rddIterators += ((it, depNum))

  case shuffleDependency: ShuffleDependency[_, _, _] =>
    // Read map outputs of shuffle
    val it = SparkEnv.get.shuffleManager
      .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context)
      .read()
    rddIterators += ((it, depNum))
}

关于PairRDD的key类型问题

在Job 逻辑执行图中提到的

对 RDD 进行一系列的 transformation() 操作,每一个 transformation() 会产生一个或多个包含不同类型 T 的 RDD[T]。T 可以是 Scala 里面的基本类型或数据结构,不限于 (K, V)。但如果是 (K, V),K 不能是 Array 等复杂类型(因为难以在复杂类型上定义 partition 函数)。

我看1.1.0的HashPartitioner中的注释有提到,

Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
produce an unexpected or incorrect result.

Array仅是个特例,并不是所有复杂类型都不能使用吧

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.