Giter Site home page Giter Site logo

aronchung's People

Watchers

 avatar

aronchung's Issues

Kylin优化 --- Build Cube

image
从维度ABCD开始,依次基于上一层Cuboid的结果进行再聚合。每一层的计算都是一个单独的Map Reduce任务

cube构建常注意的几个数

可以看出,cuboid越多,build cube就越慢,可通过命令查看cuboid个数:

./kylin.sh org.apache.kylin.engine.mr.common.CubeStatsReader [cubeName]

image
在这里可以看出,该cube在构建之后得到的cuboids为95,大小约6032mb

Build区间的数据源大小,可以在下图查看:
image
用2.47266 * 1024 / 630% 大概为 400mb

在调整:Mandatory Dimensions、Hierarchy Dimensions、Joint Dimensions进行降维之后,通过在kylin web gui 的model界面选择一个READY状态的Cube,将光标移到Cube Size上面,会显示出Cube源数据的大小,以及当前Cube的大小除以源数据大小的比例
image
一般Cube的膨胀率应该在0%-1000%之间,如果Cube的膨胀率超过了1000%,那么就需要查询其中的原因了,导致膨胀率高的原因一般为以下几点:

  1. Cube的维度数量较多,没有进行很好的剪枝降维
  2. Cube中存在较高基数的维度,导致这类维度每个Cuboid占用的空间很大,从而造成Cube体积变大;
  3. 存在比较占用空间的度量。

Cube体积直接影响整个build性能,所以在创建时需再三注意有无可减少的度量和维度

如果有10个维度,那么就会生成2^10=1024个Cuboid,如果有20个维度那么将会生成2^20=1048576个Cuboid,是指数级增长,kylin.properties中参数_ kylin.cube.aggrgroup.max-combination=4096,也就是说当Cuboid数量大于4096时, Cube定义是无法保存的的,会报TooManyCuboidException异常。所以默认维度不能超过12个,若非得超过12个,那必须剪枝降维。

调优步骤(基于Advanced Setting降维处理后性能依旧差)

Step1:Create Intermediate Flat Hive Table

从hive表抽取数据插入到临时表中,因此小文件不宜过多,注意小文件问题,可以在kylin_hive_conf.properties中hive.merge.mapfiles设置为true

<name>hive.merge.mapfiles</name>
<value>false</value>

Step2:Redistribute Flat Hive Table

经过step1,hive会在hdfs中生成一些数据文件,但文件大小分布不一定均衡,会导致MR执行不均匀,从log中可以看到kylin会重新分配:

total input rows = 159869711
expected input rows per mapper = 1000000
num reducers for RedistributeFlatHiveTableStep = 160

由于我们每次build的数据量并不大,想要更多的并发数,应该将kylin.job.mapreduce.mapper.input.rows设小一些,增加reducers数量。
官网提到,若cube指定了高基数的列,如user_id、order_id,通过设置shard by为true,kylin会让hive根据该列的相同的值分发到同一个文件,比随机分发要更好,做了预先分类,能减少build时长,但我接触的测试用例没有类似字段,大家可以试一试。(低基数不要用这种方式,会造成数据倾斜)

Step3:Extract Fact Table Distinct Columns

从cube的设计上来优化,包括各种降维,在这里没有合适的可配置的高级参数

Step4:Build Dimension Dictionary

这一步很快,可忽略

Step5:Save Cuboid Statistics

这一步很快,可忽略

Step6:Create HTable

这一步很快,可忽略

Step7:Build Cube with Spark

kylin.engine.spark.rdd-partition-cut-mb = 200

1
直接影响spark的分区数,首先大概清楚cuboid数据总大小,例如6032mb,那么partitions = 6032/10 = 600,即会产生600个小文件,随后在step8跑mr时,就会拉起600个map,使得服务器负载骤增,发生报警。

调整参数后执行,如何跟踪服务器情况?只需跟踪step8的mr情况即可

  1. http://bgnode1:8088/cluster/scheduler 找到对应的application
  2. %of Cluster是否过高
  3. 进入详情查看map数量是否过多,若过多则很可能是rdd-partition-cut-mb设置过小导致,此时要调大参数

因此,合理的调整rdd-partition-cut-mb能防止机器报警

这一步是对每一层的cuboid依次进行计算并写入hdfs,耗时会比较长
2

kylin.engine.spark-conf.spark.executor.cores = 2
kylin.engine.spark-conf.spark.executor.memory = 4G

这两个参数自行根据数据大小来调整,cores和memory 都不是越大越好,需根据要build的数据量,再三调整测试最优值。

Step8:Convert Cuboid Data to HFile

kylin.storage.hbase.region-cut-gb = 1

可根据hbase的实际region情况,设置hbase每一个region的大小,默认是5G。
此参数越小,region数越多,reducer就越多
数据对象是所有cuboid的总内存:3

Step9:Load HFile to HBase Table

这一步很快,可忽略

Step10:Update Cube Info

这一步很快,可忽略

Step11:Hive Cleanup

这一步很快,可忽略

总结

罗列几个显著影响性能的参数:

kylin.engine.spark.rdd-partition-cut-mb = 200
kylin.engine.spark-conf.spark.executor.memory = 4G
kylin.storage.hbase.region-cut-gb = 5

Kylin官网提供的在前端可以设置的参数:
http://kylin.apache.org/docs/install/configuration.html

Kylin是基于mr与spark来做数据处理,所以kylin的调优从某种角度看来就是调用的组件或引擎能最好的适应当前进程的调优。

查kylin性能的基本思路:
通过./kylin.sh org.apache.kylin.engine.mr.common.CubeStatsReader [cubeName]
查看cuboid总大小,然后再根据以下步骤进行对参数的调整:

  1. 3种降维选项是否有根据业务实际情况调整
  2. 有没合理的使用分区列
  3. 小文件是否过多
  4. reducer是否过少
  5. spark partitions是否合适
  6. spark的excutor内存和cores数量分配是否合理
  7. cuboid文件转hfile时,map是否过多,reducer是否过少(region块的大小和数量是否合理)

Zookeeper---集群节点数量为什么要是奇数个?

Zookeeper集群节点数量为什么要是奇数个?
至今看到解释得比较好的文章 引用一下:
原文:https://blog.csdn.net/u010476994/article/details/79806041

无论是公司的生产环境,还是自己搭建的测试环境,Zookeeper集群的节点个数都是奇数个。至于为什么要是奇数个,以前只是模糊的知道是为了满足选举需要,并不知道详细的原因。最近重点学习zookeeper,了解到其中的原理,现将其整理记录下来。

首先需要明确zookeeper选举的规则:leader选举,要求 可用节点数量 > 总节点数量/2  。注意 是 > , 不是 ≥。

注:为什么规则要求 可用节点数量 > 集群总结点数量/2 ?  如果不这样限制,在集群出现脑裂的时候,可能会出现多个子集群同时服务的情况(即子集群各组选举出自己的leader), 这样对整个zookeeper集群来说是紊乱的。

换句话说,如果遵守上述规则进行选举,即使出现脑裂,集群最多也只能回出现一个子集群可以提供服务的情况(能满足节点数量> 总结点数量/2 的子集群最多只会有一个)。所以要限制 可用节点数量 > 集群总结点数量/2 。

采用奇数个的节点主要是出于两方面的考虑:

1、防止由脑裂造成的集群不可用。

首先,什么是脑裂?集群的脑裂通常是发生在节点之间通信不可达的情况下,集群会分裂成不同的小集群,小集群各自选出自己的master节点,导致原有的集群出现多个master节点的情况,这就是脑裂。

下面举例说一下为什么采用奇数台节点,就可以防止由于脑裂造成的服务不可用:

(1) 假如zookeeper集群有 5 个节点,发生了脑裂,脑裂成了A、B两个小集群: 

     (a) A : 1个节点 ,B :4个节点 , 或 A、B互换

     (b) A : 2个节点, B :3个节点  , 或 A、B互换

    可以看出,上面这两种情况下,A、B中总会有一个小集群满足 可用节点数量 > 总节点数量/2 。所以zookeeper集群仍然能够选举出leader , 仍然能对外提供服务,只不过是有一部分节点失效了而已。

(2) 假如zookeeper集群有4个节点,同样发生脑裂,脑裂成了A、B两个小集群:

    (a) A:1个节点 ,  B:3个节点,   或 A、B互换 

    (b) A:2个节点 , B:2个节点

    可以看出,情况(a) 是满足选举条件的,与(1)中的例子相同。 但是情况(b) 就不同了,因为A和B都是2个节点,都不满足 可用节点数量 > 总节点数量/2 的选举条件, 所以此时zookeeper就彻底不能提供服务了。

综合上面两个例子可以看出: 在节点数量是奇数个的情况下, zookeeper集群总能对外提供服务(即使损失了一部分节点);如果节点数量是偶数个,会存在zookeeper集群不能用的可能性(脑裂成两个均等的子集群的时候)。

在生产环境中,如果zookeeper集群不能提供服务,那将是致命的 , 所以zookeeper集群的节点数一般采用奇数个。

2、在容错能力相同的情况下,奇数台更节省资源。

leader选举,要求 可用节点数量 > 总节点数量/2  。注意 是 > , 不是 ≥。

举两个例子:

(1) 假如zookeeper集群1 ,有3个节点,3/2=1.5 ,  即zookeeper想要正常对外提供服务(即leader选举成功),至少需要2个节点是正常的。换句话说,3个节点的zookeeper集群,允许有一个节点宕机。

(2) 假如zookeeper集群2,有4个节点,4/2=2 , 即zookeeper想要正常对外提供服务(即leader选举成功),至少需要3个节点是正常的。换句话说,4个节点的zookeeper集群,也允许有一个节点宕机。

那么问题就来了, 集群1与集群2都有 允许1个节点宕机 的容错能力,但是集群2比集群1多了1个节点。在相同容错能力的情况下,本着节约资源的原则,zookeeper集群的节点数维持奇数个更好一些。

Spark---内存管理

内存管理

在执行 Spark 的应用程序时,Spark 集群会启动 Driver 和 Executor 两种 JVM 进程,前者为主控进程,负责创建 Spark 上下文,提交 Spark 作业(Job),并将作业转化为计算任务(Task),在各个 Executor 进程间协调任务的调度,后者负责在工作节点上执行具体的计算任务,并将结果返回给 Driver,同时为需要持久化的 RDD 提供存储功能

1. 堆内和堆外内存规划

作为一个 JVM 进程,Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对 JVM 的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。
clipboard

1.1 堆内内存

    堆内内存的大小,由 Spark 应用程序启动时的 –executor-memory 或 spark.executor.memory 参数配置。Executor 内运行的并发任务共享 JVM 堆内内存,这些任务在缓存 RDD 数据和广播(Broadcast)数据时占用的内存被规划为存储(Storage)内存,而这些任务在执行 Shuffle 时占用的内存被规划为执行(Execution)内存,剩余的部分不做特殊规划,那些 Spark 内部的对象实例,或者用户定义的 Spark 应用程序中的对象实例,均占用剩余的空间。不同的管理模式下,这三部分占用的空间大小各不相同
    我们知道,JVM 的对象可以以序列化的方式存储,序列化的过程是将对象转换为二进制字节流,本质上可以理解为将非连续空间的链式存储转化为连续空间或块存储,在访问时则需要进行序列化的逆过程——反序列化,将字节流转化为对象,序列化的方式可以节省存储空间,但增加了存储和读取时候的计算开销。
    对于 Spark 中序列化的对象,由于是字节流的形式,其占用的内存大小可直接计算,而对于非序列化的对象,其占用的内存是通过周期性地采样近似估算而得,即并不是每次新增的数据项都会计算一次占用的内存大小,这种方法降低了时间开销但是有可能误差较大,导致某一时刻的实际内存有可能远远超出预期[2]。此外,在被 Spark 标记为释放的对象实例,很有可能在实际上并没有被 JVM 回收,导致实际可用的内存小于 Spark 记录的可用内存。所以 Spark 并不能准确记录实际可用的堆内内存,从而也就无法完全避免内存溢出(OOM, Out of Memory)的异常。
    虽然不能精准控制堆内内存的申请和释放,但 Spark 通过对存储内存和执行内存各自独立的规划管理,可以决定是否要在存储内存里缓存新的 RDD,以及是否为新的任务分配执行内存,在一定程度上可以提升内存的利用率,减少异常的出现。

1.2 堆外内存

    为了进一步优化内存的使用以及提高 Shuffle 时排序的效率,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。利用 JDK Unsafe API(从 Spark 2.0 开始,在管理堆外的存储内存时不再基于 Tachyon,而是与堆外的执行内存一样,基于 JDK Unsafe API 实现[3]),Spark 可以直接操作系统堆外内存,减少了不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能。堆外内存可以被精确地申请和释放,而且序列化的数据占用的空间可以被精确计算,所以相比堆内内存来说降低了管理的难度,也降低了误差。
    在默认情况下堆外内存并不启用,可通过配置 spark.memory.offHeap.enabled 参数启用,并由 spark.memory.offHeap.size 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存。

1.3 内存管理接口

Spark 为存储内存和执行内存的管理提供了统一的接口——MemoryManager,同一个 Executor 内的任务都调用这个接口的方法来申请或释放内存:
清单 1 . 内存管理接口的主要方法
//申请存储内存
def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
//申请展开内存
def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
//申请执行内存
def acquireExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Long
//释放存储内存
def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit
//释放执行内存
def releaseExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Unit
//释放展开内存
def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit

我们看到,在调用这些方法时都需要指定其内存模式(MemoryMode),这个参数决定了是在堆内还是堆外完成这次操作。
MemoryManager 的具体实现上,Spark 1.6 之后默认为统一管理(Unified Memory Manager)方式,1.6 之前采用的静态管理(Static Memory Manager)方式仍被保留,可通过配置 spark.memory.useLegacyMode 参数启用。两种方式的区别在于对空间分配的方式,下面的第 2 小节会分别对这两种方式进行介绍。

2 . 内存空间分配

2.1 静态内存管理

在 Spark 最初采用的静态内存管理机制下,存储内存、执行内存和其他内存的大小在 Spark 应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置,堆内内存的分配如图 2 所示:
图 2 . 静态内存管理图示——堆内
clipboard2
清单 2 . 可用堆内内存空间
可用的存储内存 = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction
可用的执行内存 = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction

    其中 systemMaxMemory 取决于当前 JVM 堆内内存的大小,最后可用的执行内存或者存储内存要在此基础上与各自的 memoryFraction 参数和 safetyFraction 参数相乘得出。上述计算公式中的两个 safetyFraction 参数,其意义在于在逻辑上预留出 1-safetyFraction 这么一块保险区域,降低因实际内存超出当前预设范围而导致 OOM 的风险(上文提到,对于非序列化对象的内存采样估算会产生误差)。值得注意的是,这个预留的保险区域仅仅是一种逻辑上的规划,在具体使用时 Spark 并没有区别对待,和"其它内存"一样交给了 JVM 去管理。

    堆外的空间分配较为简单,只有存储内存和执行内存,如图 3 所示。可用的执行内存和存储内存占用的空间大小直接由参数 spark.memory.storageFraction 决定,由于堆外内存占用的空间可以被精确计算,所以无需再设定保险区域。
图 3 . 静态内存管理图示——堆外
clipboard3
    静态内存管理机制实现起来较为简单,但如果用户不熟悉 Spark 的存储机制,或没有根据具体的数据规模和计算任务或做相应的配置,很容易造成"一半海水,一半火焰"的局面,即存储内存和执行内存中的一方剩余大量的空间,而另一方却早早被占满,不得不淘汰或移出旧的内容以存储新的内容。由于新的内存管理机制的出现,这种方式目前已经很少有开发者使用,出于兼容旧版本的应用程序的目的,Spark 仍然保留了它的实现。

2.2 统一内存管理

    Spark 1.6 之后引入的统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域,如图 4 和图 5 所示
图 4 . 统一内存管理图示——堆内
clipboard4
图 5 . 统一内存管理图示——堆外
clipboard5
其中最重要的优化在于动态占用机制,其规则如下:

  • 设定基本的存储内存和执行内存区域(spark.storage.storageFraction 参数),该设定确定了双方各自拥有的空间的范围
  • 双方的空间都不足时,则存储到硬盘;若己方空间不足而对方空余时,可借用对方的空间;(存储空间不足是指不足以放下一个完整的 Block)
  • 执行内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后"归还"借用的空间
  • 存储内存的空间被对方占用后,无法让对方"归还",因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂

图 6 . 动态占用机制图示
clipboard6
    凭借统一内存管理机制,Spark 在一定程度上提高了堆内和堆外内存资源的利用率,降低了开发者维护 Spark 内存的难度,但并不意味着开发者可以高枕无忧。譬如,所以如果存储内存的空间太大或者说缓存的数据过多,反而会导致频繁的全量垃圾回收,降低任务执行时的性能,因为缓存的 RDD 数据通常都是长期驻留内存的 。所以要想充分发挥 Spark 的性能,需要开发者进一步了解存储内存和执行内存各自的管理方式和实现原理。

3. 存储内存管理

3.1 RDD 的持久化机制

    弹性分布式数据集(RDD)作为 Spark 最根本的数据抽象,是只读的分区记录(Partition)的集合,只能基于在稳定物理存储中的数据集上创建,或者在其他已有的 RDD 上执行转换(Transformation)操作产生一个新的 RDD。转换后的 RDD 与原始的 RDD 之间产生的依赖关系,构成了血统(Lineage)。凭借血统,Spark 保证了每一个 RDD 都可以被重新恢复。但 RDD 的所有转换都是惰性的,即只有当一个返回结果给 Driver 的行动(Action)发生时,Spark 才会创建任务读取 RDD,然后真正触发转换的执行。
    Task 在启动之初读取一个分区时,会先判断这个分区是否已经被持久化,如果没有则需要检查 Checkpoint 或按照血统重新计算。所以如果一个 RDD 上要执行多次行动,可以在第一次行动中使用 persist 或 cache 方法,在内存或磁盘中持久化或缓存这个 RDD,从而在后面的行动时提升计算速度。事实上,cache 方法是使用默认的 MEMORY_ONLY 的存储级别将 RDD 持久化到内存,故缓存是一种特殊的持久化。 堆内和堆外存储内存的设计,便可以对缓存 RDD 时使用的内存做统一的规划和管 理 (存储内存的其他应用场景,如缓存 broadcast 数据,暂时不在本文的讨论范围之内)。
RDD 的持久化由 Spark 的 Storage 模块 [7] 负责,实现了 RDD 与物理存储的解耦合。Storage 模块负责管理 Spark 在计算过程中产生的数据,将那些在内存或磁盘、在本地或远程存取数据的功能封装了起来。在具体实现时 Driver 端和 Executor 端的 Storage 模块构成了主从式的架构,即 Driver 端的 BlockManager 为 Master,Executor 端的 BlockManager 为 Slave。Storage 模块在逻辑上以 Block 为基本存储单位,RDD 的每个 Partition 经过处理后唯一对应一个 Block(BlockId 的格式为 rdd_RDD-ID_PARTITION-ID )。Master 负责整个 Spark 应用程序的 Block 的元数据信息的管理和维护,而 Slave 需要将 Block 的更新等状态上报到 Master,同时接收 Master 的命令,例如新增或删除一个 RDD。
图 7 . Storage 模块示意图
clipboard7
在对 RDD 持久化时,Spark 规定了 MEMORY_ONLY、MEMORY_AND_DISK 等 7 种不同的 存储级别 :
8
而存储级别是以下 5 个变量的组合:
9
通过对数据结构的分析,可以看出存储级别从三个维度定义了 RDD 的 Partition(同时也就是 Block)的存储方式:

  • 存储位置:磁盘/堆内内存/堆外内存。如 MEMORY_AND_DISK 是同时在磁盘和堆内内存上存储,实现了冗余备份。OFF_HEAP 则是只在堆外内存存储,目前选择堆外内存时不能同时存储到其他位置。
  • 存储形式:Block 缓存到存储内存后,是否为非序列化的形式。如 MEMORY_ONLY 是非序列化方式存储,OFF_HEAP 是序列化方式存储。
  • 副本数量:大于 1 时需要远程冗余备份到其他节点。如 DISK_ONLY_2 需要远程备份 1 个副本

3.2 RDD 缓存的过程

    RDD 在缓存到存储内存之前,Partition 中的数据一般以迭代器(Iterator)的数据结构来访问,这是 Scala 语言中一种遍历数据集合的方法。通过 Iterator 可以获取分区中每一条序列化或者非序列化的数据项(Record),这些 Record 的对象实例在逻辑上占用了 JVM 堆内内存的 other 部分的空间,同一 Partition 的不同 Record 的空间并不连续。
    RDD 在缓存到存储内存之后,Partition 被转换成 Block,Record 在堆内或堆外存储内存中占用一块连续的空间。将Partition由不连续的存储空间转换为连续存储空间的过程,Spark称之为"展开"(Unroll)。Block 有序列化和非序列化两种存储格式,具体以哪种方式取决于该 RDD 的存储级别。非序列化的 Block 以一种 DeserializedMemoryEntry 的数据结构定义,用一个数组存储所有的对象实例,序列化的 Block 则以 SerializedMemoryEntry的数据结构定义,用字节缓冲区(ByteBuffer)来存储二进制数据。每个 Executor 的 Storage 模块用一个链式 Map 结构(LinkedHashMap)来管理堆内和堆外存储内存中所有的 Block 对象的实例[6],对这个 LinkedHashMap 新增和删除间接记录了内存的申请和释放。
    因为不能保证存储空间可以一次容纳 Iterator 中的所有数据,当前的计算任务在 Unroll 时要向 MemoryManager 申请足够的 Unroll 空间来临时占位,空间不足则 Unroll 失败,空间足够时可以继续进行。对于序列化的 Partition,其所需的 Unroll 空间可以直接累加计算,一次申请。而非序列化的 Partition 则要在遍历 Record 的过程中依次申请,即每读取一条 Record,采样估算其所需的 Unroll 空间并进行申请,空间不足时可以中断,释放已占用的 Unroll 空间。如果最终 Unroll 成功,当前 Partition 所占用的 Unroll 空间被转换为正常的缓存 RDD 的存储空间,如下图 8 所示。
图 8. Spark Unroll 示意图
10
在图 3 和图 5 中可以看到,在静态内存管理时,Spark 在存储内存中专门划分了一块 Unroll 空间,其大小是固定的,统一内存管理时则没有对 Unroll 空间进行特别区分,当存储空间不足时会根据动态占用机制进行处理。

3.3 淘汰和落盘

由于同一个 Executor 的所有的计算任务共享有限的存储内存空间,当有新的 Block 需要缓存但是剩余空间不足且无法动态占用时,就要对 LinkedHashMap 中的旧 Block 进行淘汰(Eviction),而被淘汰的 Block 如果其存储级别中同时包含存储到磁盘的要求,则要对其进行落盘(Drop),否则直接删除该 Block。

存储内存的淘汰规则为:

  • 被淘汰的旧 Block 要与新 Block 的 MemoryMode 相同,即同属于堆外或堆内内存
  • 新旧 Block 不能属于同一个 RDD,避免循环淘汰
  • 旧 Block 所属 RDD 不能处于被读状态,避免引发一致性问题
  • 遍历 LinkedHashMap 中 Block,按照最近最少使用(LRU)的顺序淘汰,直到满足新 Block所需的空间。其中 LRU 是 LinkedHashMap 的特性。

落盘的流程则比较简单,如果其存储级别符合_useDisk 为 true 的条件,再根据其_deserialized 判断是否是非序列化的形式,若是则对其进行序列化,最后将数据存储到磁盘,在 Storage 模块中更新其信息。

4. 执行内存管理

4.1 多任务间内存分配

Executor 内运行的任务同样共享执行内存,Spark 用一个 HashMap 结构保存了任务到内存耗费的映射。每个任务可占用的执行内存大小的范围为 1/2N ~ 1/N,其中 N 为当前 Executor 内正在运行的任务的个数。每个任务在启动之时,要向 MemoryManager 请求申请最少为 1/2N 的执行内存,如果不能被满足要求则该任务被阻塞,直到有其他任务释放了足够的执行内存,该任务才可以被唤醒。(Q:当任务数量增加,是否所有的任务执行内存都会受影响,这里是动态分配的?)

  • Shuffle Write
  1. 若在 map 端选择普通的排序方式,会采用 ExternalSorter 进行外排,在内存中存储数据时主要占用堆内执行空间。
  2. 若在 map 端选择 Tungsten 的排序方式,则采用 ShuffleExternalSorter 直接对以序列化形式存储的数据排序,在内存中存储数据时可以占用堆外或堆内执行空间,取决于用户是否开启了堆外内存以及堆外执行内存是否足够。
  • Shuffle Read
  1. 在对 reduce 端的数据进行聚合时,要将数据交给 Aggregator 处理,在内存中存储数据时占用堆内执行空间。
  2. 如果需要进行最终结果排序,则要将再次将数据交给 ExternalSorter 处理,占用堆内执行空间。

在 ExternalSorter 和 Aggregator 中,Spark 会使用一种叫 AppendOnlyMap 的哈希表在堆内执行内存中存储数据,但在 Shuffle 过程中所有数据并不能都保存到该哈希表中,当这个哈希表占用的内存会进行周期性地采样估算,当其大到一定程度,无法再从 MemoryManager 申请到新的执行内存时,Spark 就会将其全部内容存储到磁盘文件中,这个过程被称为溢存(Spill),溢存到磁盘的文件最后会被归并(Merge)。

    Shuffle Write 阶段中用到的 Tungsten 是 Databricks 公司提出的对 Spark 优化内存和 CPU 使用的计划[9],解决了一些 JVM 在性能上的限制和弊端。Spark 会根据 Shuffle 的情况来自动选择是否采用 Tungsten 排序。Tungsten 采用的页式内存管理机制建立在 MemoryManager 之上,即 Tungsten 对执行内存的使用进行了一步的抽象,这样在 Shuffle 过程中无需关心数据具体存储在堆内还是堆外。每个内存页用一个 MemoryBlock 来定义,并用 Object obj 和 long offset 这两个变量统一标识一个内存页在系统内存中的地址。堆内的 MemoryBlock 是以 long 型数组的形式分配的内存,其 obj 的值为是这个数组的对象引用,offset 是 long 型数组的在 JVM 中的初始偏移地址,两者配合使用可以定位这个数组在堆内的绝对地址;堆外的 MemoryBlock 是直接申请到的内存块,其 obj 为 null,offset 是这个内存块在系统内存中的 64 位绝对地址。Spark 用 MemoryBlock 巧妙地将堆内和堆外内存页统一抽象封装,并用页表(pageTable)管理每个 Task 申请到的内存页。

Tungsten 页式管理下的所有内存用 64 位的逻辑地址表示,由页号和页内偏移量组成:

  • 页号:占 13 位,唯一标识一个内存页,Spark 在申请内存页之前要先申请空闲页号。
  • 页内偏移量:占 51 位,是在使用内存页存储数据时,数据在页内的偏移地址。

    有了统一的寻址方式,Spark 可以用 64 位逻辑地址的指针定位到堆内或堆外的内存,整个 Shuffle Write 排序的过程只需要对指针进行排序,并且无需反序列化,整个过程非常高效,对于内存访问效率和 CPU 使用效率带来了明显的提升[10]。
    Spark 的存储内存和执行内存有着截然不同的管理方式:对于存储内存来说,Spark 用一个 LinkedHashMap 来集中管理所有的 Block,Block 由需要缓存的 RDD 的 Partition 转化而成;而对于执行内存,Spark 用 AppendOnlyMap 来存储 Shuffle 过程中的数据,在 Tungsten 排序中甚至抽象成为页式内存管理,开辟了全新的 JVM 内存管理机制。

azkaban---Web Server 高可用方案

azkaban webServer做ha需考虑以下几个问题:

  1. 二次开发的代码封装,避免更新版本带来冲突
  2. 采用zookeeper来做ha, 用Ephemeral短暂节点类型来进行抢占
import java.io.*;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.zookeeper.*;

public class ZkServer implements Watcher {
	private ZooKeeper zk;
	private static String serviceId = getHostName();
	private boolean isLeader = false;

	public String getServiceId() {
		return serviceId;
	}

	public void setServiceId(String serviceId) {
		this.serviceId = serviceId;
	}

	public static String getHostName() {
		if (System.getenv("COMPUTERNAME") != null) {
			return System.getenv("COMPUTERNAME");
		} else {
			return getHostNameForLiunx();
		}
	}

	public static String getHostNameForLiunx() {
		try {
			return (InetAddress.getLocalHost()).getHostName();
		} catch (UnknownHostException uhe) {
			String host = uhe.getMessage(); // host = "hostname: hostname"
			if (host != null) {
				int colon = host.indexOf(':');
				if (colon > 0) {
					return host.substring(0, colon);
				}
			}
			return "UnknownHost";
		}
	}

	public void startZk() throws IOException {
		zk = new ZooKeeper("bgtest3:2181,bgtest4:2181,bgtest5:2181", 5000, this);
	}

	@Override
	public void process(WatchedEvent event) {
		System.out.println("process");

	}

	public void stopZk() throws InterruptedException {
		zk.close();
	}

	public boolean checkMaster() throws InterruptedException {
		while (true) {
			try {
				Stat stat = new Stat();
				byte data[] = zk.getData("/azkaban", false, stat);
				isLeader = new String(data, "utf-8").equals(getServiceId());
				return isLeader;
			} catch (KeeperException.NoNodeException e) {
				return false;
			} catch (KeeperException e) {
				e.printStackTrace();
			} catch (UnsupportedEncodingException e) {
				e.printStackTrace();
			}
		}
	}

	public void runForMaster() throws InterruptedException {
		while (true) {
			try {
				zk.create("/azkaban", getServiceId().getBytes("utf-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE,
						CreateMode.EPHEMERAL);
				isLeader = true;
				break;
			} catch (KeeperException.NodeExistsException e) {
				isLeader = false;
				break;
			} catch (KeeperException e) {
				e.printStackTrace();
			} catch (UnsupportedEncodingException e) {
				e.printStackTrace();
			}
			if (checkMaster()) {
				break;
			}
		}
	}
}
  1. 任务定时1分钟轮询调度,起多个webServer需避免重复调度问题
    在定时执行的run方法中,设置一个开关,若当前节点是leader,则开放轮询调度
@Override
public void run() {
	ZkServer master = new ZkServer();
	try {
		master.startZk();
	} catch (IOException e1) {
		try {
			master.stopZk();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		e1.printStackTrace();
	}
	while (!this.shutdown) {
		boolean isLeader = false;
		try {
			master.runForMaster();
			isLeader = master.checkMaster();
		} catch (InterruptedException e2) {
			e2.printStackTrace();
		}				
	
                //为保证多个webServer的数据一致,在此刷新triggers以及对应的map
		reloadTriggersAndMap();
		if(!isLeader) {                        
			logger.info("----this is not a leader----");
		}
		synchronized (TriggerManager.this.syncObj) {
			try {
				TriggerManager.this.lastRunnerThreadCheckTime = System.currentTimeMillis();

				TriggerManager.this.scannerStage = "Ready to start a new scan cycle at "
						+ TriggerManager.this.lastRunnerThreadCheckTime;

				try {
					if(isLeader) {
						logger.info("----this is a leader----");
						checkAllTriggers();	
					}
				} catch (final Exception e) {
					e.printStackTrace();
					logger.error(e.getMessage());
				} catch (final Throwable t) {
					t.printStackTrace();
					logger.error(t.getMessage());
				}

				TriggerManager.this.scannerStage = "Done flipping all triggers.";

				TriggerManager.this.runnerThreadIdleTime = this.scannerInterval
						- (System.currentTimeMillis() - TriggerManager.this.lastRunnerThreadCheckTime);

				if (TriggerManager.this.runnerThreadIdleTime < 0) {
					logger.error("Trigger manager thread " + this.getName() + " is too busy!");
				} else {
					TriggerManager.this.syncObj.wait(TriggerManager.this.runnerThreadIdleTime);
				}
			} catch (final InterruptedException e) {
				try {
					master.stopZk();
				} catch (InterruptedException e1) {
					e1.printStackTrace();
				}
				logger.info("Interrupted. Probably to shut down.");
			}
		}
	}
}

private void reloadTriggersAndMap() {
	  try {
		  Map<Integer, Trigger> result = new ConcurrentHashMap<>();
			List<Trigger> triggers = triggerLoader.loadTriggers();
			for (final Trigger t : triggers) {
				if(triggerIdMap.get(t.getTriggerId()) != null) {
					result.put(t.getTriggerId(), triggerIdMap.get(t.getTriggerId()));
				}else {
					//all append on triggers
					runnerThread.addTrigger(t);
					result.put(t.getTriggerId(), t);
				}
			}
			Set<Integer> triggerIdMapKeys = triggerIdMap.keySet();
			Set<Integer> resultKeys = result.keySet();
			Set<Integer> differenceSet = Sets.difference(triggerIdMapKeys, resultKeys);
			differenceSet.forEach(o -> {
				runnerThread.deleteTrigger(triggerIdMap.get(o));
			});
			triggerIdMap.clear();
			triggerIdMap.putAll(result);
			
		} catch (final Exception e) {
			logger.error(e);
		}
  }
  1. 如何更新缓存队列
    在WebServer启动时,大部分数据都已事先加载到内存中,所以会出现master在操作UI后,slave没有对应的变更,若master宕机,那么用户操作slave将会看到数据不一致的情况。

根据公司使用的高频模块,只对两个地方做了处理:Projects、Scheduling

  • Projects:
    将原有的loadAllProjects()私有方法改为公有,即public void loadAllProjects()
    随后在每次点击首页的任意三个模块时,都重加载一遍projects:
    manager.loadAllProjects();
    if (hasParam(req, "all")) {
      final List<Project> projects = manager.getProjects();
      page.add("viewProjects", "all");
      page.add("projects", projects);      
      
    } else if (hasParam(req, "group")) {
      final List<Project> projects = manager.getGroupProjects(user);
      page.add("viewProjects", "group");
      page.add("projects", projects);
    } else {
      final List<Project> projects = manager.getUserProjects(user);
      page.add("viewProjects", "personal");
      page.add("projects", projects);    
    }
  • Scheduling:
    这里有几个地方需要注意:
    对于这里的缓存队列,采取判断删除或新增的做法,全量重载的话万一数据多大,会影响性能,上面Projects也一样。
  1. flow的创建
public synchronized void insertSchedule(final Schedule s) {
	Schedule schedule = null;
	try {
		schedule = this.getScheduleById(s.getScheduleId());
	} catch (TriggerLoaderException | ScheduleManagerException e1) {
		e1.printStackTrace();
	}
	
    Schedule exist = this.scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
    if(schedule == null && exist != null) {
    	this.scheduleIdentityPairMap.remove(s.getScheduleIdentityPair());
    	exist = null;
	}
    
    if (s.updateTime()) {
      try {
        if (exist == null) {
          this.loader.insertSchedule(s);
          internalSchedule(s);
        } else {
          s.setScheduleId(exist.getScheduleId());
          this.loader.updateSchedule(s);
          internalSchedule(s);
        }
      } catch (final ScheduleManagerException e) {
        logger.error(e);
      }
    } else {
      logger
          .error("The provided schedule is non-recurring and the scheduled time already passed. "
              + s.getScheduleName());
    }
  }
  1. flow的删除
    在remove时,增加一个schedule的校验,若db存在而scheduleIDMap不存在,则将该schedule加入scheduleIDMap、scheduleIdentityPairMap、runnerThread.addTrigger、triggerIdMap,从而不影响接下去的代码
这是ScheduleServlet.java中的ajaxRemoveSched方法的其中一段:

    final int scheduleId = getIntParam(req, "scheduleId");
    Schedule sched;
    try {
      sched = this.scheduleManager.getSchedule(scheduleId);
    } catch (final ScheduleManagerException e) {
      throw new ServletException(e);
    }
	if (sched == null) {
		try {
			sched = this.scheduleManager.getScheduleById(scheduleId);
			if(sched == null) {
				ret.put("message", "Schedule with ID " + scheduleId + " does not exist");
				ret.put("status", "error");
				return;
			}
			this.scheduleManager.supplementSchedule(sched);
		} catch (TriggerLoaderException | ScheduleManagerException e) {
			e.printStackTrace();
		}
	}



  /**
   * 在ScheduleManager.java中添加该方法
   * supplement schedule which is not in maps
   */
  public void supplementSchedule(Schedule schedule) {
	  //scheduleIDMap scheduleIdentityPairMap
	  this.internalSchedule(schedule);
	  //triggerIdMap triggers
	  this.loader.supplementTriggerById(schedule.getScheduleId());
  }

  /**
   * 在TriggerBasedScheduleLoader.java中添加该方法
   */
public void supplementTriggerById(int id) {
	Trigger t;
	try {
		t = this.triggerManager.getTriggerById(id);
		this.triggerManager.supplementTrigger(t);
	} catch (TriggerLoaderException e) {
		e.printStackTrace();
	}
}

  /**
   * 在TriggerManager.java中添加该方法
   */
public void supplementTrigger(Trigger t) {
	this.runnerThread.addTrigger(t);
	triggerIdMap.put(t.getTriggerId(), t);
}

kylin优化 --- Rowkey设计

为什么要优化Rowkey?

Apache Kylin使用HBase做为Cube的存储引擎。而HBase是Key-Value数据库,这个Key在HBase中称为Rowkey。为了能够支持按多个维度进行查询,Kylin需要将多个维度值以某种次序组成Rowkey。HBase Scan Range,排在Rowkey靠前部分的维度,将比排在靠后部分的维度更易于做筛选,查询效率更高。

那些维度适合排在前部分?

  1. 基数高的排前面
    在Load Hive Table时,勾选 Calculate column cardinality,即可在load时计算各字段的基数,并在Data Source -> Tables -> 点击相应的table查看
  2. 在查询中被用作过滤条件的维度放在非过滤条件维度的前面

除了各维度在Rowkey上的次序外,维度的编码方法对于空间占用及查询性能也有着显著的影响
合适的编码能减少维度对空间的占用,同时编码值也会加速查询过滤。

Kylin支持的编码类型如下:

Dict:使用字典将长的值映射成短的ID,适合中低基数的维度,默认推荐编码。但由于字典要被加载到Kylin内存中,在超高基情况下,可能引起内存不足的问题。
Fixed_Length:适用于超高基场景,将选取字段的前N个字节作为编码值,当N小于字段长度,会造成字段截断,当N较大时,造成RowKey过长,查询性能下降。只适用于varchar或nvarchar类型。
Fixed_Length_Hex:适用于字段值为十六进制字符,比如1A2BFF或者FF00FF,每两个字符需要一个字节。只适用于varchar或nvarchar类型。
Integer:将数值类型字段直接用数字表示,不做编码转换。Integer编码需要提供一个额外的参数"Length"来代表需要多少个字节。Length的长度为1到8,支持的整数区间为[ -2^(8*N-1), 2^(8*N-1)]
Date:将日期类型的数据使用三个字节进行编码,支持的格式包括yyyyMMdd、yyyy-MM-dd、yyyy-MM-dd HH:mm:ss、yyyy-MM-dd HH:mm:ss.SSS,其中如果包含时间戳部分会被截断。
Time:对时间戳字段进行编码,支持范围为[ 1970-01-01 00:00:00, 2038/01/19 03:14:07],毫秒部分会被忽略。time编码适用于time, datetime, timestamp等类型。
Boolean:用一个byte表示布尔值,适用于字段值为:  true, false, TRUE, FALSE, True, False, t, f, T, F, yes, no, YES, NO, Yes, No, y, n, Y, N, 1, 0

参考:https://mp.weixin.qq.com/s/tnYRNkSAFwoyJUknZnzp8w

HBase---Hbase split的三种方式和split的过程

引用:https://www.cnblogs.com/niurougan/p/3976519.html

Hbase是通过把数据分配到一定数量的region来达到负载均衡的。一个table会被分配到一个或多个region中,这些region会被分配到一个或者多个regionServer中。在自动split策略中,当一个region达到一定的大小就会自动split成两个region。table在region中是按照row key来排序的,并且一个row key所对应的行只会存储在一个region中,这一点保证了Hbase的强一致性 。

Pre-splitting

当一个table刚被创建的时候,Hbase默认的分配一个region给table。也就是说这个时候,所有的读写请求都会访问到同一个regionServer的同一个region中,这个时候就达不到负载均衡的效果了,集群中的其他regionServer就可能会处于比较空闲的状态。解决这个问题可以用pre-splitting,在创建table的时候就配置好,生成多个region。

在table初始化的时候如果不配置的话,Hbase是不知道如何去split region的,因为Hbase不知道应该那个row key可以作为split的开始点。如果我们可以大概预测到row key的分布,我们可以使用pre-spliting来帮助我们提前split region。不过如果我们预测得不准确的话,还是可能导致某个region过热,被集中访问,不过还好我们还有auto-split。最好的办法就是首先预测split的切分点,做pre-splitting,然后后面让auto-split来处理后面的负载均衡。

Hbase自带了两种pre-split的算法,分别是 HexStringSplit 和 UniformSplit 。如果我们的row key是十六进制的字符串作为前缀的,就比较适合用HexStringSplit,作为pre-split的算法。例如,我们使用HexHash(prefix)作为row key的前缀,其中Hexhash为最终得到十六进制字符串的hash算法。我们也可以用我们自己的split算法。

在hbase shell 下:

hbase org.apache.hadoop.hbase.util.RegionSplitter pre_split_table HexStringSplit -c 10 -f f1

-c 10 的意思为,最终的region数目为10个;-f f1为创建一个那么为f1的 column family.

执行scan 'hbase:meta' 可以看到meta表中:
161750312375059

只截取了meta表中的2个region的记录(一共10个region),分别是rowkey范围是 '' ''19999999 和1999999933333332的region。

我们也可以自定义切分点,例如在hbase shell下使用如下命令:

create 't1', 'f1', {SPLITS => ['10', '20', '30', '40']}

自动splitting

当一个reion达到一定的大小,他会自动split称两个region。如果我们的Hbase版本是0.94 ,那么默认的有三种自动split的策略,ConstantSizeRegionSplitPolicy,IncreasingToUpperBoundRegionSplitPolicy还有 KeyPrefixRegionSplitPolicy.

在0.94版本之前ConstantSizeRegionSplitPolicy 是默认和唯一的split策略。当某个store(对应一个column family)的大小大于配置值 ‘hbase.hregion.max.filesize’的时候(默认10G)region就会自动分裂。

而0.94版本中,IncreasingToUpperBoundRegionSplitPolicy 是默认的split策略。

这个策略中,最小的分裂大小和table的某个region server的region 个数有关,当store file的大小大于如下公式得出的值的时候就会split,公式如下

Min (R^2 * “hbase.hregion.memstore.flush.size”, “hbase.hregion.max.filesize”) R为同一个table中在同一个region server中region的个数。

例如:

hbase.hregion.memstore.flush.size 默认值 128MB。

hbase.hregion.max.filesize默认值为10GB 。

如果初始时R=1,那么Min(128MB,10GB)=128MB,也就是说在第一个flush的时候就会触发分裂操作。
当R=2的时候Min(22128MB,10GB)=512MB ,当某个store file大小达到512MB的时候,就会触发分裂。
如此类推,当R=9的时候,store file 达到10GB的时候就会分裂,也就是说当R>=9的时候,store file 达到10GB的时候就会分裂。

split 点都位于region中row key的中间点。

KeyPrefixRegionSplitPolicy可以保证相同的前缀的row保存在同一个region中。

指定rowkey前缀位数划分region,通过读取 KeyPrefixRegionSplitPolicy.prefix_length 属性,该属性为数字类型,表示前缀长度,在进行split时,按此长度对splitPoint进行截取。此种策略比较适合固定前缀的rowkey。当table中没有设置该属性,指定此策略效果等同与使用IncreasingToUpperBoundRegionSplitPolicy。

我们可以通过配置 hbase.regionserver.region.split.policy 来指定split策略,我们也可以写我们自己的split策略。

强制split

Hbase 允许客户端强制执行split,在hbase shell中执行以下命令:

split 'forced_table', 'b' //其中forced_table 为要split的table , ‘b’ 为split 点

region splits 执行过程:

region server处理写请求的时候,会先写入memstore,当memstore 达到一定大小的时候,会写入磁盘成为一个store file。这个过程叫做 memstore flush。当store files 堆积到一定大小的时候,region server 会 执行‘compact’操作,把他们合成一个大的文件。 当每次执行完flush 或者compact操作,都会判断是否需要split。当发生split的时候,会生成两个region A 和 region B但是parent region数据file并不会发生复制等操作,而是region A 和region B 会有这些file的引用。这些引用文件会在下次发生compact操作的时候清理掉,并且当region中有引用文件的时候是不会再进行split操作的。这个地方需要注意一下,如果当region中存在引用文件的时候,而且写操作很频繁和集中,可能会出现region变得很大,但是却不split。因为写操作比较频繁和集中,但是没有均匀到每个引用文件上去,所以region一直存在引用文件,不能进行分裂,这篇文章讲到了这个情况,总结得挺好的。http://koven2049.iteye.com/blog/1199519
0dcc57a4-d0b8-31b3-b99d-e1017dff0031

  1. 创建splitDir(region目录下的splits目录)
  2. 状态机添加CREATE_SPLIT_DIR
  3. 执行internalFlushcache把内存刷到磁盘
  4. close parent并且返回所有storefile
  5. 状态机添加CLOSED_PARENT_REGION
  6. 把region从rs的online列表中删除
  7. 状态机添加OFFLINED_PARENT
  8. 多线程进行split storefiles,创建子目录并把文件写进去,(原文件不删除,该过程默认超过30s会强行中.止并抛出IOE)
  9. 状态机添加STARTED_REGION_A_CREATION
  10. 创建第一个daughter region
  11. 状态机添加STARTED_REGION_B_CREATION
  12. 创建第二个daughter region
  13. 在meta表中下线parent
  14. 原子性往meta表中写以下信息:parent置为offline以及split状态,parent添加两列:splitA和splitB
  15. 并发open DaughterA和DaughterB(如果线程中断,则通知rs退出进程)
  16. 在open期间,如果server中止,则先把A和B的信息写入到meta表中再跳过以下过程
  17. 创建两个新的HRegion,通知rs把子region添加到online列表中
  18. 把Daughter信息写入meta表

当以上过程中任何一步抛出异常时,regionserver会进入回滚逻辑(rollback):
对状态机中存储己经进行的状态进行检查,并从后往前开始遍历己进行的状态:
CREATE_SPLIT_DIR:删除子目录
CLOSED_PARENT_REGION:重新初始化parent
STARTED_REGION_A_CREATION:删除A对应的目录
STARTED_REGION_B_CREATION:删除B对应的目录
OFFLINED_PARENT:把parent添加到online队列中

虽然split region操作是region server单独确定的,但是split过程必须和很多其他部件合作。region server 在split开始前和结束前通知master,并且需要更新.META.表,这样,客户端就能知道有新的region。在hdfs中重新排列目录结构和数据文件。split是一个复杂的操作。在split region的时候会记录当前执行的状态,当出错的时候,会根据状态进行回滚。下图表示split中,执行的过程。(红色线表示region server 或者master的操作,绿色线表示client的操作。)

  1. region server 决定split region,第一步,region server在zookeeper中创建在/hbase/region-in-transition/region-name 目录下,创建一个znode,状态为SPLITTING.

  2. 因为master有对 region-in-transition 的znode做监听,所以,mater的得知parent region需要split

  3. region server 在hdfs的parent region的目录下创建一个名为“.splits”的子目录

  4. region server 关闭parent region。强制flush缓存,并且在本地数据结构中标记region为下线状态。如果这个时候客户端刚好请求到parent region,会抛出NotServingRegionException。这时客户端会进行补偿性重试。

  5. region server在.split 目录下分别为两个daughter region创建目录和必要的数据结构。然后创建两个引用文件指向parent regions的文件。

  6. region server 在HDFS中,创建真正的region目录,并且把引用文件移到对应的目录下。

  7. region server 发送一个put的请求到.META.表中,并且在.META.表中设置parent region为下线状态,并且在parent region对应的row中两个daughter region的信息。但是这个时候在.META.表中daughter region 还不是独立的row。这个时候如果client scan .META.表,会发现parent region正在split,但是client还看不到daughter region的信息。当这个put 成功之后,parent region split会被正在的执行。如果在 RPC 成功之前 region server 就失败了,master和下次打开parent region的region server 会清除关于这次split的脏状态。但是当RPC返回结果给到parent region ,即.META.成功更新之后,,region split的流程还会继续进行下去。相当于是个补偿机制,下次在打开这个parent region的时候会进行相应的清理操作。

  8. region server 打开两个daughter region接受写操作。

  9. region server 在.META.表中增加daughters A 和 B region的相关信息,在这以后,client就能发现这两个新的regions并且能发送请求到这两个新的region了。client本地具体有.META.表的缓存,当他们访问到parent region的时候,发现parent region下线了,就会重新访问.META.表获取最新的信息,并且更新本地缓存。

  10. region server 更新 znode 的状态为SPLIT。master就能知道状态更新了,master的平衡机制会判断是否需要把daughter regions 分配到其他region server 中。

  11. 在split之后,meta和HDFS依然会有引用指向parent region. 当compact 操作发生在daughter regions中,会重写数据file,这个时候引用就会被逐渐的去掉。垃圾回收任务会定时检测daughter regions是否还有引用指向parent files,如果没有引用指向parent files的话,parent region 就会被删除。

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.