欢迎回家
我们一直在改变

Spark 弹性分布式数据集(RDD)

弹性分布式数据集(RDD,Resilient Distributed Datasets),由Berkeley实验室于2011年提出 。

(1)RDD设计目标

RDD用于支持在并行计算时能够高效地利用中间结果,支持更简单的编程模型,同时也具有像MapReduce等并行计算框架的高容错性、能够高效地进行调度及可扩展性。RDD的容错通过记录RDD转换操作的lineage关系来进行,lineage记录了RDD的家族关系,当出现错误的时候,直接通过lineage进行恢复。RDD最合数据挖掘, 机器学习及图计算,因此这些应用涉及到大家的迭代计算,基于内存能够极大地提升其在分布式环境下的执行效率;RDD不适用于诸如分布式爬虫等需要频繁更新共享状态的任务。

下面给出的是在spark-shell中如何查看RDD的Lineage

//textFile读取hdfs根目录下的README.md文件,然后筛选出所有包括Spark的行
scala> val rdd2=sc.textFile("/README.md").filter(line => line.contains("Spark"))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:21
//toDebugString方法会打印出RDD的家族关系
//可以看到textFile方法会生成两个RDD,分别是HadoopRDD
//MapPartitionsRDD,而filter同时也会生成新的MapPartitionsRDD
scala> rdd2.toDebugString
15/09/20 01:35:27 INFO mapred.FileInputFormat: Total input paths to process : 1
res0: String = 
(2) MapPartitionsRDD[2] at filter at <console>:21 []
 |  MapPartitionsRDD[1] at textFile at <console>:21 []
 |  /README.md HadoopRDD[0] at textFile at <console>:21 []

(2)RDD抽象

RDD在Spark中是一个只读的(val类型)、经过分区的记录集合。RDD在Spark中只有两种创建方式:(1)从存储系统中创建;(2)从其它RDD中创建。从存储中创建有多种方式,可以是本地文件系统,也可以是分布式文件系统,还可以是内存中的数据。
下面的代码演示的是从HDFS中创建RDD

scala> sc.textFile("/README.md")
res1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at textFile at <console>:22

下面的代码演示的是从内存中创建RDD

//内存中定义了一个数组
scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)
//通过parallelize方法创建ParallelCollectionRDD
scala> val distData = sc.parallelize(data)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at :23

下面的代码演示的是从其它RDD创建新的RDD

//filter函数将distData RDD转换成新的RDD
scala> val distDataFiletered=distData.filter(e=>e>2)
distDataFiletered: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at filter at <console>:25
//触发action操作(后面我们会讲),查看过滤后的内容
//注意collect只适合数据量较少时使用
scala> distDataFiltered.collect
res3: Array[Int] = Array(3, 4, 5)

(3)RDD编程模型

在前面的例子中,我们已经接触过到如何利用RDD进行编程,前面我们提到的

//filter函数将distData RDD转换成新的RDD
scala> val distDataFiletered=distData.filter(e=>e>2)
//触发action操作(后面我们会讲),查看过滤后的内容
//注意collect只适合数据量较少时使用
scala> distDataFiltered.collect

这段代码它已经给我们解释了RDD编程模型的核心思想:“filter函数将distData RDD转换成新的RDD”,“触发action操作”。也就是说RDD的操作包括Transformations(转换)、Actions两种。

transformations操作会将一个RDD转换成一个新的RDD,需要特别注意的是所有的transformation都是lazy的,如果对scala中的lazy了解的人都知道,transformation之后它不会立马执行,而只是会记住对相应数据集的transformation,而到真正被使用的时候才会执行,例如distData.filter(e=>e>2) transformation后,它不会立即执行,而是等到distDataFiltered.collect方法执行时才被执行,如下图所示

原文链接:https://blog.csdn.net/lovehuangjiaju/article/details/48580863

赞(0)
未经允许不得转载:91coding » Spark 弹性分布式数据集(RDD)
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!

 

91CODING 小白轻松上手,大牛稳健进步

关于我们免责声明