欢迎回家
我们一直在改变

Spark DataFrame

(1)DataFrame简介

DataFrames在Spark-1.3.0中引入,主要解决使用Spark RDD API使用的门槛,使熟悉R语言等的数据分析师能够快速上手Spark下的数据分析工作,极大地扩大了Spark使用者的数量,由于DataFrames脱胎自SchemaRDD,因此它天然适用于分布式大数据场景。相信在不久的将来,Spark将是大数据分析的终极归宿。

在Spark中,DataFrame是一种以RDD为基础的分布式数据集,与传统RDBMS的表结构类似。与一般的RDD不同的是,DataFrame带有schema元信息,即DataFrame所表示的表数据集的每一列都带有名称和类型,它对于数据的内部结构具有很强的描述能力。因此Spark SQL可以对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率。

DataFrames具有如下特点:

(1)Ability to scale from kilobytes of data on a single laptop to petabytes on a large cluster(支持单机KB级到集群PB级的数据处理)
(2)Support for a wide array of data formats and storage systems(支持多种数据格式和存储系统,如图所示)

(3)State-of-the-art optimization and code generation through the Spark SQL Catalyst optimizer(通过Spark SQL Catalyst优化器可以进行高效的代码生成和优化)
(4)Seamless integration with all big data tooling and infrastructure via Spark(能够无缝集成所有的大数据处理工具)
(5)APIs for Python, Java, Scala, and R (in development via SparkR)(提供Python, Java, Scala, R语言API)

(2)DataFrame 实战

将people.json上传到HDFS上,放置在/data目录下,people.json文件内容如下:

root@sparkslave01:~# hdfs dfs -cat /data/people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

由于json文件中已经包括了列名称的信息,因此它可以直接创建DataFrame

scala> val df = sqlContext.read.json("/data/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

//显示DataFrame完整信息
scala> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
//查看DataFrame元数据信息
scala> df.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
//返回DataFrame某列所有数据
scala> df.select("name").show()
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+
//DataFrame数据过滤
scala> df.filter(df("age") > 21).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
//按年龄分组
scala> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
|null|    1|
|  19|    1|
|  30|    1|
+----+-----+

//注册成表
scala> df.registerTempTable(“people”)
//执行SparkSQL
scala> val teenagers = sqlContext.sql(“SELECT name, age FROM people WHERE age >= 13 AND age <= 19”) teenagers: org.apache.spark.sql.DataFrame = [name: string, age: bigint] //结果格式化输出 scala> teenagers.map(t => “Name: ” + t(0)).collect().foreach(println)
Name: Justin

原文链接:https://blog.csdn.net/lovehuangjiaju/article/details/48661847

赞(0)
未经允许不得转载:91coding » Spark DataFrame
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!

 

91CODING 小白轻松上手,大牛稳健进步

关于我们免责声明