欢迎回家
我们一直在改变

Spark RDD 函数方法

(1)map
map函数方法参数:

/**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U]

//使用示例

scala> val rdd1=sc.parallelize(Array(1,2,3,4)).map(x=>2*x).collect
rdd1: Array[Int] = Array(2, 4, 6, 8)

(2)filter
方法参数:

/**
   * Return a new RDD containing only the elements that satisfy a predicate.
   */
  def filter(f: T => Boolean): RDD[T]

使用示例

scala> val rdd1=sc.parallelize(Array(1,2,3,4)).filter(x=>x>1).collect
rdd1: Array[Int] = Array(2, 3, 4)

(3)flatMap
方法参数:

/**
   *  Return a new RDD by first applying a function to all elements of this
   *  RDD, and then flattening the results.
   */
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] 

使用示例:

scala>  val data =Array(Array(1, 2, 3, 4, 5),Array(4,5,6))
data: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5), Array(4, 5, 6))

scala> val rdd1=sc.parallelize(data)
rdd1: org.apache.spark.rdd.RDD[Array[Int]] = ParallelCollectionRDD[2] at parallelize at <console>:23

scala> val rdd2=rdd1.flatMap(x=>x.map(y=>y))
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at flatMap at <console>:25

scala> rdd2.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 4, 5, 6)

(4)mapPartitions(func)

mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。它的函数定义为:

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

f即为输入函数,它处理每个分区里面的内容。每个分区中的内容将以Iterator[T]传递给输入函数f,f的输出结果是Iterator[U]。最终的RDD由所有分区经过输入函数处理后的结果合并起来的。

scala> val a = sc.parallelize(1 to 9, 3)
scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
    var res = List[(T, T)]() 
    var pre = iter.next 
    while (iter.hasNext) {
        val cur = iter.next; 
        res .::= (pre, cur)
        pre = cur;
    } 
    res.iterator
}
scala> a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

上述例子中的函数myfunc是把分区中一个元素和它的下一个元素组成一个Tuple。因为分区中最后一个元素没有下一个元素了,所以(3,4)和(6,7)不在结果中。
mapPartitions还有些变种,比如mapPartitionsWithContext,它能把处理过程中的一些状态信息传递给用户指定的输入函数。还有mapPartitionsWithIndex,它能把分区的index传递给用户指定的输入函数。

(5)mapPartitionsWithIndex

mapPartitionsWithIndex函数是mapPartitions函数的一个变种,它的函数参数如下:

def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]

```
scala> val a = sc.parallelize(1 to 9, 3)
//函数带分区索引,返回的集合第一个元素为分区索引
scala> def myfunc[T](index:T,iter: Iterator[T]) : Iterator[(T,T,T)] = {
    var res = List[(T,T, T)]() 
    var pre = iter.next 
    while (iter.hasNext) {
        val cur = iter.next
        res .::= (index,pre, cur) 
        pre = cur
    } 
    res.iterator
}
scala> a.mapPartitionsWithIndex(myfunc).collect
res11: Array[(Int, Int, Int)] = Array((0,2,3), (0,1,2), (1,5,6), (1,4,5), (2,8,9), (2,7,8))

(6)sample
方法参数:

 /**
   * Return a sampled subset of this RDD.
   *
   * @param withReplacement can elements be sampled multiple times (replaced when sampled out)
   * @param fraction expected size of the sample as a fraction of this RDD's size
   *  without replacement: probability that each element is chosen; fraction must be [0, 1]
   *  with replacement: expected number of times each element is chosen; fraction must be >= 0
   * @param seed seed for the random number generator
   */
  def sample(
      withReplacement: Boolean,
      fraction: Double,
      seed: Long = Utils.random.nextLong): RDD[T] 

使用示例:

scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:21

scala> val smapledA=a.sample(true,0.5)
smapledA: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[13] at sample at <console>:23
scala> smapledA.collect
res12: Array[Int] = Array(3, 3, 3, 5, 6, 8, 8)

scala> val smapledA2=a.sample(false,0.5).collect
smapledA2: Array[Int] = Array(1, 4)

原文链接:https://blog.csdn.net/lovehuangjiaju/article/details/48580863

赞(0)
未经允许不得转载:91coding » Spark RDD 函数方法
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!

 

91CODING 小白轻松上手,大牛稳健进步

关于我们免责声明