本文主要介绍Spark的核心知识点,
2. Spark 核心概念
主要介绍Spark核心概念RDD以及相应的API。
2.1 RDD介绍
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象。代码中是一个抽象类,它代表一个弹性的、只读的、可分区的、支持并行计算的分布式数据集合,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用。
数据集:RDD 是数据集合的抽象,是复杂物理介质上存在数据的一种逻辑视图。从外 部来看,RDD 的确可以被看待成经过封装,带扩展特性(如容错性)的数据集合。
分布式:RDD的数据可能在物理上存储在多个节点的磁盘或内存中,也就是所谓的多级存储。
弹性:如果数据集的一部分数据丢失,则可以对它进行重建;具有自动容错,位置感知调度和可伸缩性,而容错性是最难实现的。
大部分分布式数据集的容错性有两种:数据检查点(成本高)和记录数据的更新(依赖关系)。
2.2 RDD特点
RDD 表示只读的分区的数据集,对RDD进行改动,只能通过RDD的转换操作,由一个RDD得到一个新的RDD,新的RDD包含了从其他RDD衍生所必需的信息。RDDs之间存在依赖,RDD的执行是按照血缘关系延时计算的。如果血缘关系较长,可以通过持久化RDD来切断血缘关系。
2.2.1 弹性
- 存储的弹性:内存与磁盘的自动切换;
- 容错的弹性:数据丢失可以自动恢复;
- 计算的弹性:计算出错重试机制;
- 分片的弹性:可根据需要重新分片。
2.2.2 分区
RDD对象实质上是一个元数据结构,存储着Block、Node等映射关系,以及其他元数据信息。一个RDD就是一组分区(Partition),RDD的每个分区Partition对应一个Block, Block可以存储在内存,当内存不够时可以存储到磁盘上。
RDD的数据源也可以存储在HDFS上,数据按 照HDFS分布策略进行分区,HDFS中的一个 Block对应Spark RDD的一个Partition。
2.2.3 只读
RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD。由一个RDD转换到另一个RDD,可以通过丰富的操作算子实现,不再像MapReduce那样只能写map和reduce了。
RDD的操作算子包括两类,一类叫做transformations,它是用来将RDD进行转化,构建RDD的血缘关系;另一类叫做actions,它是用来触发RDD的计算,得到RDD的相关计算结果或者将RDD保存到文件系统中。
2.2.4 依赖
RDDs 通过操作算子进行转换,转换得到的新 RDD 包含了从其他 RDDs 衍生所必需的 信息,RDDs 之间维护着这种血缘关系,也称之为依赖。如下图所示,依赖包括两种,一种 是窄依赖,RDDs 之间分区是一一对应的,另一种是宽依赖,下游 RDD 的每个分区与上游 RDD(也称之为父 RDD)的每个分区都有关,是多对多的关系。
2.2.5 缓存
如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。如下图所示,RDD1经过一系列的转换后得到RDD-n并保存到 HDFS,RDD-1 在这一过程中会有个中间结果, 如果将其缓存到内存,那么在随后的 RDD-1 转换到 RDD-m 这一过程中,就不会计算其之 前的 RDD-0 了。
2.3 RDD编程及其基本操作
在Spark中,RDD被表示为对象,通过对象上的方法调用来对 RDD 进行转换。经过 一系列的 transformations 定义 RDD 之后,就可以调用 actions 触发 RDD 的计算,action 可以是向应用程序返回结果(count, collect 等),或者是向存储系统保存数据(saveAsTextFile 等)。在 Spark 中,只有遇到 action,才会执行 RDD 的计算(即延迟计算),这样在运行时可 以通过管道的方式传输多个转换。
2.3.1 RDD创建
在Spark中创建 RDD 的创建方式可以分为三种:从集合中创建 RDD;从外部存储创建RDD;从其他RDD创建。
- 从集合中创建
从集合中创建 RDD,Spark主要提供了两种函数:parallelize 和 makeRDD。
1 | val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8)) |
从外部存储系统的数据集创建:包括本地的文件系统,还有所有 Hadoop 支持的数据集,比如 HDFS、Cassandra、HBase 等。
从其他RDD创建:一般通过各种transformation操作,来实现不同RDD之间的转换。
2.3.2 RDD的转换(transformations)
RDD整体上分为Value类型和Key-Value类型。
首先介绍Value类型的操作:
- map(func):返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成。
- mapPartitions(func):类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T]=>Iterator[U]。假设有N个元素,有M个分区,那么map函数将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。
- mapPartitionsWithIndex(func):类似于 mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U];
1
2
3
4val rdd = sc.parallelize(Array(1,2,3,4))
val indexRdd = rdd.mapPartitionsWithIndex((index,items)=>(items.map((index,_))))
indexRdd.collect
# res2: Array[(Int, Int)] = Array((0,1), (0,2), (1,3), (1,4))
map()和mapPartition()的区别:
map():每次处理一条数据。
mapPartition():每次处理一个分区的数据,这个分区的数据处理完后,原 RDD 中分区的 数据才能释放,可能导致 OOM。
- flatMap(func):类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func 应该返回一个序列,而不是单一元素)。多用来将嵌套的数据『打平』。
1
2
3
4
5
6
7
8
9
10比如一个包含三行内容的数据文件“README.md”。
a b c
d
经过以下转换过程:
val textFile = sc.textFile("README.md")
textFile.flatMap(_.split(" "))
其实就是经历了以下转换:
["a b c", "", "d"] => [["a","b","c"],[],["d"]] => ["a","b","c","d"]
# 在这个示例中,flatMap就把包含多行数据的RDD,即[“a b c”, “”, “d”] ,转换为了一个包含多个单词的集合。实际上,flatMap相对于map多了的是[[“a”,”b”,”c”],[],[“d”]] => [“a”,”b”,”c”,”d”]这一步。
map(func)函数会对每一条输入进行指定的func操作,然后为每一条输入返回一个对象;而flatMap(func)也会对每一条输入进行执行的func操作,然后每一条输入返回一个相对,但是最后会将所有的对象再合成为一个对象;从返回的结果的数量上来讲,map返回的数据对象的个数和原来的输入数据是相同的,而flatMap返回的个数则是不同的。
glom:将每一个分区形成一个数组,形成新的 RDD 类型时 RDD[Array[T]]。
groupBy(func):分组,按照传入函数的返回值进行分组。将相同的 key 对应的值放入一个迭代 器。
filter(func):过滤。返回一个新的 RDD,该 RDD 由经过 func 函数计算后返回值为 true 的输入元素组成。
sample(withReplacement, fraction, seed):以指定的随机种子随机抽样出数量为 fraction 的数据,withReplacement 表示是抽 出的数据是否放回,true 为有放回的抽样,false 为无放回的抽样,seed 用于指定随机数生成器种子。
distinct([numTasks])):对源RDD进行去重后返回一个新的RDD。默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数改变它。
coalesce(numPartitions):缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。
repartition(numPartitions):根据分区数,重新通过网络随机洗牌所有数据。
coalesce和repartition的区别
- coalesce 重新分区,可以选择是否进行 shuffle 过程。由参数 shuffle: Boolean = false/true决定。
- repartition 实际上是调用的 coalesce,进行 shuffle。源码如下:
1
2
3>def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
>}
sortBy(func,[ascending], [numTasks]):使用 func 先对数据进行处理,按照处理后的数据比较结果排序,默认为正序。
pipe(command, [envVars]):管道,针对每个分区,都执行一个 shell 脚本,返回输出的 RDD。(注意:脚本需要放在 Worker 节点可以访问到的位置)
除了单Value的类型,还有很多情况下需要进行双Value的操作:
- union(otherDataset):对源RDD和参数RDD求并集后返回一个新的RDD。
- subtract (otherDataset):计算差的一种函数,去除两个RDD中相同的元素,不同的 RDD将保留下来。
- intersection(otherDataset):对源RDD和参数RDD求交集后返回一个新的RDD。
- cartesian(otherDataset):笛卡尔积(尽量避免使用),两个RDD中每个元素间相互组合,产生新的RDD。
- zip(otherDataset):将两个 RDD 组合成 Key/Value 形式的 RDD,这里默认两个 RDD 的 partition 数量 以及元素数量都相同,否则会抛出异常。
接下来介绍Key-Value类型的操作:
- partitionBy:对pairRDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致 的话就不进行分区,否则会生成ShuffleRDD,即会产生shuffle过程。
- reduceByKey(func, [numTasks]):在一个(K,V)的 RDD 上调用,返回一个(K,V)的 RDD,使用指定的 reduce 函数,将相同 key 的值聚合到一起,reduce 任务的个数可以通过第二个可选的参数来设置。
- groupByKey:groupByKey也是对每个key进行操作,但只生成一个seq。
reduceByKey、groupByKey区别:
reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是 RDD[k,v]。下图可以看到,在数据对被搬移前,同一机器上同样的key先进行预聚合,然后在每个分区上被再次调用来将所有值reduce成最终结果。因此reduceByKey函数更适合使用在大数据集上。
groupByKey:按照key进行分组,直接进行shuffle。当调用 groupByKey时,所有的键值对(key-value pair) 都会被移动,在网络上传输这些数据非常没必要,因此避免使用 GroupByKey。
- aggregateByKey:(zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U):在kv对的RDD中,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。
(1)zeroValue:给每一个分区中的每一个 key 一个初始值;
(2)seqOp:函数用于在每一个分区中用初始值逐步迭代 value;
(3)combOp:函数用于合并每个分区中的结果。
下面是一个具体例子:
- foldByKey,参数(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
aggregateByKey的简化操作,seqop和combop相同。
- combineByKey,参数(createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C),作用是针对相同K,将V合并成一个集合。
(1)createCombiner: combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过, 要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫作 createCombiner()的函数来创建那个键对应的累加器的初始值。
(2)mergeValue: 如果这是一个在处理当前分区之前已经遇到的键,它会使用 mergeValue()方法将该键的 累加器对应的当前值与这个新的值进行合并。
(3)mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两 个或者更多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners() 方法将各个分 区的结果进行合并。
具体例子如下:
sortByKey([ascending], [numTasks]):在一个(K,V)的 RDD 上调用,K 必须实现 Ordered接口,返回一个按照key进行排序的(K,V)的RDD。
mapValues:针对于(K,V)形式的类型只对V进行操作。
join(otherDataset, [numTasks]):在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素对在 一起的(K,(V,W))的 RDD。
cogroup(otherDataset, [numTasks]):在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable
,Iterable ))类 型的 RDD。和join的不同点在于返回的是迭代器。
2.3.3 RDD的Action操作
reduce(func):通过 func 函数聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据。
collect():在驱动程序中,以数组的形式返回数据集的所有元素。
count():返回RDD中元素的个数。
first()返回RDD中的第一个元素。
take(n):返回一个由 RDD 的前 n 个元素组成的数组。
takeOrdered(n):返回该 RDD 排序后的前 n 个元素组成的数组。
aggregate,参数:(zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U),aggregate 函数将每个分区里面的元素通过 seqOp 和初始值进行聚合,然后用 combine 函数将每个分区的结果和初始值(zeroValue)进行 combine 操作。这个函数最终返回 的类型不需要和 RDD 中元素类型一致。
fold(num)(func):折叠操作,aggregate的简化操作,seqop和combop一样。
saveAsTextFile(path):将数据集的元素以 textfile 的形式保存到 HDFS 文件系统或者其他支持的文件系统, 对于每个元素,Spark 将会调用 toString 方法,将它装换为文件中的文本。
saveAsSequenceFile(path):将数据集中的元素以 Hadoop sequencefile 的格式保存到指定的目录下,可以使 HDFS 或者其他 Hadoop 支持的文件系统。
saveAsObjectFile(path):用于将 RDD 中的元素序列化成对象,存储到文件中。
countByKey():针对(K,V)类型的 RDD,返回一个(K,Int)的 map,表示每一个 key 对应的元素个数。
foreach(func):在数据集的每一个元素上,运行函数 func 进行更新。
2.4 RDD依赖关系
2.4.1 宽窄依赖
RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage(血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据 信息和转换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和 恢复丢失的数据分区。
RDD 和它依赖的父 RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。
- 窄依赖指的是每一个父 RDD 的 Partition 最多被子 RDD 的一个 Partition 使用,窄依赖 我们形象的比喻为独生子女:
- 宽依赖指的是多个子 RDD 的 Partition 会依赖同一个父 RDD 的 Partition,会引起 shuffle,总结:宽依赖我们形象的比喻为超生:
2.4.2 DAG以及任务划分
DAG(Directed Acyclic Graph)叫做有向无环图,原始的 RDD 通过一系列的转换就就 形成了 DAG,根据 RDD 之间的依赖关系的不同将 DAG 划分成不同的 Stage,对于窄依 赖,partition 的转换处理在 Stage 中完成计算。对于宽依赖,由于有 Shuffle 的存在,只能 在 parent RDD 处理完成后,才能开始接下来的计算,因此宽依赖是划分 Stage 的依据。
任务划分
RDD 任务切分中间分为:Application、Job、Stage 和 Task
1)Application:初始化一个 SparkContext 即生成一个 Application
2)Job:一个 Action 算子就会生成一个 Job
3)Stage:根据 RDD 之间的依赖关系的不同将 Job 划分成不同的 Stage,遇到一个宽依赖 则划分一个 Stage。
4)Task:Stage 是一个 TaskSet,将 Stage 划分的结果发送到不同的 Executor 执行即为一个 Task。
注意:Application->Job->Stage->Task 每一层都是1对n的关系。
2.4.3 缓存
RDD 通过 persist 方法或 cache 方法可以将前面的计算结果缓存,默认情况下 persist() 会把数据以序列化的形式缓存在 JVM 的堆空间中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 时,该 RDD 将会 被缓存在计算节点的内存中,并供后面重用。
通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存 存储一份,Spark 的存储级别还有好多种,存储级别在object StorageLevel中定义的。
在存储级别的末尾加上“_2”来把持久化数据存为两份
缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD 的缓存容 错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于 RDD 的一系列转换,丢 失的数据会被重算,由于 RDD 的各个 Partition 是相对独立的,因此只需要计算丢失的部 分即可,并不需要重算全部 Partition。
2.4.4 RDD CheckPoint
Spark 中对于数据的保存除了持久化操作之外,还提供了一种检查点的机制,检查点 (本质是通过将 RDD 写入 Disk 做检查点)是为了通过 lineage 做容错的辅助,lineage 过长 会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而 丢失分区,从做检查点的 RDD 开始重做 Lineage,就会减少开销。检查点通过将数据写入 到 HDFS 文件系统实现了 RDD 的检查点功能。
为当前RDD设置检查点。该函数将会创建一个二进制的文件,并存储到checkpoint目录中,该目录是用Context.setCheckpointDir()设置的。在 checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移除。对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。
2.5 键值对的RDD分区
Spark 目前支持 Hash 分区和 Range 分区,用户也可以自定义分区,Hash 分区为当前 的默认分区,Spark 中分区器直接决定了 RDD 中分区的个数、RDD 中每条数据经过 Shuffle 过程属于哪个分区和 Reduce 的个数
注意:
(1)只有 Key-Value 类型的 RDD 才有分区的,非 Key-Value 类型的 RDD 分区的值是 None
(2)每个 RDD 的分区 ID 范围:0~numPartitions-1,决定这个值是属于那个分区的。
可以通过使用 RDD 的 partitioner 属性来获取 RDD 的分区方式。它会返回一个 scala.Option 对象, 通过 get 方法获取其中的值。
2.5.1 Hash分区
HashPartitioner 分区的原理:对于给定的key,计算其hashCode,并除以分区的个数取余,如果余数小于0,则用余数+分区的个数(否则加 0),最后返回的值就是这个key所属的分区ID。
2.5.2 Ranger分区
HashPartitioner 分区弊端:可能导致每个分区中数据量的不均匀,极端情况下会导致 某些分区拥有 RDD 的全部数据。
RangePartitioner 作用:将一定范围内的数映射到某一个分区内,尽量保证每个分区 中数据量的均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分 区内的元素小或者大,但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内 的数映射到某一个分区内。实现过程为:
第一步:先重整个 RDD 中抽取出样本数据,将样本数据排序,计算出每个分区的最 大 key 值,形成一个 Array[KEY]类型的数组变量 rangeBounds;
第二步:判断 key 在 rangeBounds 中所处的范围,给出该 key 值在下一个 RDD 中的 分区 id 下标;该分区器要求 RDD 中的 KEY 类型必须是可以排序的。
2.5.3 自定义分区
要实现自定义的分区器,你需要继承 org.apache.spark.Partitioner 类并实现下面三个方法。
(1)numPartitions: Int:返回创建出来的分区数。
(2)getPartition(key: Any): Int:返回给定键的分区编号(0 到 numPartitions-1)。
(3)equals():Java 判断相等性的标准方法。这个方法的实现非常重要,Spark 需要用这个 方法来检查你的分区器对象是否和其他分区器实例相同,这样 Spark 才可以判断两个 RDD 的分区方式是否相同。
2.6 文件数据读取与保存
2.6.1 Text文件
数据读取: textFile(String)
数据保存:saveAsTextFile(String)
2.6.2 Json文件
如果 JSON 文件中每一行就是一个 JSON 记录,那么可以通过将 JSON 文件当做文本 文件来读取,然后利用相关的 JSON 库对每一条数据进行 JSON 解析。
注意:使用 RDD 读取 JSON 文件处理很复杂,同时 SparkSQL 集成了很好的处理 JSON 文件的方式,所以应用中多是采用 SparkSQL 处理 JSON 文件。
2.6.3 Sequence文件
SequenceFile 文件是 用来存储二进制形式的 key-value 对而设计的一种平面 文件(Flat File)。Spark 有专门用来读取 SequenceFile 的接口。在 SparkContext 中,可以 调用 sequenceFile [ keyClass, valueClass] (path)。
注意:SequenceFile 文件只针对 PairRDD
2.6.4 对象文件
对象文件是将对象序列化后保存的文件,采用 Java 的序列化机制。可以通过 objectFilek,v 函数接收一个路径,读取对象文件,返回对应的 RDD,也可以通过调 用 saveAsObjectFile() 实现对对象文件的输出。因为是序列化所以要指定类型。