当前位置 : 主页 > 编程语言 > 其它开发 >

7、Spark Sql

来源:互联网 收集:自由互联 发布时间:2022-05-30
1.请分析sparkSQL出现的原因,并简述SparkSQL的起源和发展1.1 出现的原因 1.关系数据库已经很流行 2.关系数据库在大数据时代已经不能满足要求•首先,用户需要从不同数据源执行各种操作
1.请分析sparkSQL出现的原因,并简述SparkSQL的起源和发展 1.1 出现的原因

1.关系数据库已经很流行 2.关系数据库在大数据时代已经不能满足要求•首先,用户需要从不同数据源执行各种操作,包括结构化、半结构化和非结构化数据。其次,用户需要执行高级分析,比如机器学习和图像处理•在实际大数据应用中,经常需要融合关系查询和复杂分析算法(比如机器学习或图像处理),但是,缺少这样的系统。

并且Spark SQL填补了这个鸿沟:首先,可以提供DataFrame API,可以对内部和外部各种数据源执行各种关系型操作。其次,可以支持大数据中的大量数据源和数据分析算法Spark SQL可以融合:传统关系数据库的结构化数据管理能力和机器学习算法的数据处理能力

1.2 sparksql的起源和发展

Spark 1.0版本开始,推出了Spark SQL。其实最早使用的,都是Hadoop自己的Hive查询引擎;但是后来Spark提供了Shark;再后来Shark被淘汰,推出了Spark SQL。Shark的性能比Hive就要高出一个数量级,而Spark SQL的性能又比Shark高出一个数量级。

最早来说,Hive的诞生,主要是因为要让那些不熟悉Java工程师,无法深入进行MapReduce编程的数据分析师,能够使用他们熟悉的关系型数据库的SQL模型,来操作HDFS上的数据。因此推出了Hive。Hive底层基于MapReduce实现SQL功能,能够让数据分析人员,以及数据开发人员,方便的使用Hive进行数据仓库的建模和建设,然后使用SQL模型针对数据仓库中的数据进行统计和分析。但是Hive有个致命的缺陷,就是它的底层基于MapReduce,而MapReduce的shuffle又是基于磁盘的,因此导致Hive的性能异常低下。进而出现复杂的SQL ETL,要运行数个小时,甚至数十个小时的情况。

后来,Spark推出了Shark,Shark与Hive实际上还是紧密关联的,Shark底层很多东西还是依赖于Hive,但是修改了内存管理、物理计划、执行三个模块,底层使用Spark的基于内存的计算模型,从而让性能比Hive提升了数倍到上百倍。

然而,Shark还是它的问题所在,Shark底层依赖了Hive的语法解析器、查询优化器等组件,因此对于其性能的提升还是造成了制约。所以后来Spark团队决定,完全抛弃Shark,推出了全新的Spark SQL项目。Spark SQL就不只是针对Hive中的数据了,而且可以支持其他很多数据源的查询。

2.简述RDD和DataFrame的联系和区别 2.1 联系
  • 都是spark中得弹性分布式数据集,轻量级

  • 都是惰性机制,延迟计算

  • 根据内存情况,自动缓存,加快计算速度

  • 都有partition分区概念

  • 众多相同得算子:map flatmap 等等

2.2区别
  • RDD不支持SQL

  • DF每一行都是Row类型,不能直接访问字段,必须解析才行

  • DS每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获 得每一行的信息

  • DataFrame与Dataset均支持spark sql的操作,比如select,group by之类,还 能注册临时表/视窗,进行sql语句操作

  • 可以看出,Dataset在需要访问列中的某个字段时是非常方便的,然而,如果要 写一些适配性很强的函数时,如果使用Dataset,行的类型又不确定,可能是 各种case class,无法实现适配,这时候用DataFrame即Dataset[Row]就能比较 好的解决问题。

3.DateFrame的创建和保存 3.1Pyspark-DataFrame的创建
  • spark.read.text(url)

  • saprk.read.json(url)

  • spark.read.format("text").load("people.txt")

  • spark.read.format("json").load("people.json")

  • 描述从不同文件类型生成DataFrame的区别.

不同之处是:text生成的DataFrame只有value属性;但是json创建的DataFrame会识别到文件中的键值

  • 用相同的txt或json文件,同时创建RDD,比较RDD与DataFrame的区别。

3.2 DataFrame的保存

可以使用spark.write操作,把一个DataFram保存成不同格式的文件,最终会生成一个目录,但是注意不是文件,里面可能会有多个文件和一个SUCCESS,如下:

  • df.write.text(dir)


  • df.write.json(dir)


  • df.write.format("text").save(dir)

  • df.write.format("json").save(dir)

4.选择题

4.1单选(2分)‍关于Shark,下面描述正确的是:c

A.Shark提供了类似Pig的功能

B.Shark把SQL语句转换成MapReduce作业

C.Shark重用了Hive中的HiveQL解析、逻辑执行计划翻译、执行计划优化等逻辑

D.Shark的性能比Hive差很多

4.2单选(2分)‏下面关于Spark SQL架构的描述错误的是:B

A.在Shark原有的架构上重写了逻辑执行计划的优化部分,解决了Shark存在的问题

B.Spark SQL在Hive兼容层面仅依赖HiveQL解析和Hive元数据

C.Spark SQL执行计划生成和优化都由Catalyst(函数式关系查询优化框架)负责

D.Spark SQL执行计划生成和优化需要依赖Hive来完成

4.3单选(2分)‌要把一个DataFrame保存到people.json文件中,下面语句哪个是正确的:A

A.df.write.json("people.json")

B.df.json("people.json")

C.df.write.format("csv").save("people.json")

D.df.write.csv("people.json")

4.4多选(3分)‎Shark的设计导致了两个问题:AC

A.执行计划优化完全依赖于Hive,不方便添加新的优化策略

B.执行计划优化不依赖于Hive,方便添加新的优化策略

C.Spark是线程级并行,而MapReduce是进程级并行,因此,Spark在兼容Hive的实现上存在线程安全问题,导致Shark不得不使用另外一套独立维护的、打了补丁的Hive源码分支

D.Spark是进程级并行,而MapReduce是线程级并行,因此,Spark在兼容Hive的实现上存在线程安全问题,导致Shark不得不使用另外一套独立维护的、打了补丁的Hive源码分支

4.5 多选(3分)‌下面关于为什么推出Spark SQL的原因的描述正确的是:AB

A.Spark SQL可以提供DataFrame API,可以对内部和外部各种数据源执行各种关系操作

B.可以支持大量的数据源和数据分析算法,组合使用Spark SQL和Spark MLlib,可以融合传统关系数据库的结构化数据管理能力和机器学习算法的数据处理能力

C.Spark SQL无法对各种不同的数据源进行整合

D.Spark SQL无法融合结构化数据管理能力和机器学习算法的数据处理能力

4.6多选(3分)‌下面关于DataFrame的描述正确的是:ABCD

A.DataFrame的推出,让Spark具备了处理大规模结构化数据的能力

B.DataFrame比原有的RDD转化方式更加简单易用,而且获得了更高的计算性能

C.Spark能够轻松实现从MySQL到DataFrame的转化,并且支持SQL查询

D.DataFrame是一种以RDD为基础的分布式数据集,提供了详细的结构信息

4.7多选(3分)‏要读取people.json文件生成DataFrame,可以使用下面哪些命令:AC

A.spark.read.json("people.json")

B.spark.read.parquet("people.json")

C.spark.read.format("json").load("people.json")

D.spark.read.format("csv").load("people.json")

5. PySpark-DataFrame各种常用操作 5.1 基于df的操作
  • 打印数据 df.show()默认打印前20条数据

*打印概要 df.printSchema()

  • 查询总行数 df.count()

  • df.head(3) #list类型,list中每个元素是Row类

  • 输出全部行 df.collect() #list类型,list中每个元素是Row类

  • 查询概况 df.describe().show()

  • 取列 df[‘name’], df.name, df[1]

  • 选择 df.select() 每个人的年龄+1

  • 筛选 df.filter() 20岁以上的人员信息

  • 筛选年龄为空的人员信息

  • 分组df.groupBy() 统计每个年龄的人数

  • 排序df.sortBy() 按年龄进行排序

5.2基于spark.sql的操作
  • 创建临时表 df.registerTempTable('people')

  • spark.sql执行SQL语句 spark.sql('select name from people').show()

5.3 pyspark中DataFrame与pandas中DataFrame
  • 分别从文件创建两种DataFrame

> 使用spark创建

> 使用pandas创建

  • 查看两种df的区别

(1) 地址写法不同

(2)pyspark的df要通过操作来查看结果

(3)pandas的df会自动添加索引

  • pandas中DataFrame转换为Pyspark中DataFrame

  • Pyspark中DataFrame转换为pandas中DataFrame

6.从RDD转换得到DataFrame 6.1 利用反射机制推断RDD模式
  • 创建RDD sc.textFile(url).map(),读文件,分割数据项

  • 每个RDD元素转换成 Row

  • 由Row-RDD转换到DataFrame

6.2 使用编程方式定义RDD模式
  • 下面生成“表头”
>>> from pyspark.sql import Row
>>> from pyspark.sql.types import *
>>> 
>>> schemaString = 'name age'
>>> fields = [StructField(field_name,StringType(),True) for field_name in schemaString.split(' ')]
>>> schema = StructType(fields)

  • 下面生成“表中的记录”

    • 创建RDD
    • 转化为Row元素,列名=值
>>> lines = spark.sparkContext.textFile('file:///root/tools/people.txt')
>>> part = lines.map(lambda w:w.split(","))
>>> peoples = part.map(lambda p:Row(p[0],p[1].strip()))
>>> peoples.collect()

  • 下面把“表头”和“表中的记录”拼装在一起
>>> schemaPeople = spark.createDataFrame(people,schema)
>>> schemaPeople.show()
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

>>> schemaPeople.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)

7.选择题

7.1单选(2分)以下操作中,哪个不是DataFrame的常用操作:D

A.printSchema()

B.select()

C.filter()

D.sendto()

7.2多选(3分)‏从RDD转换得到DataFrame包含两种典型方法,分别是:AB

A.利用反射机制推断RDD模式

B.使用编程方式定义RDD模式

C.利用投影机制推断RDD模式

D.利用互联机制推断RDD模式

7.3多选(3分)‍使用编程方式定义RDD模式时,主要包括哪三个步骤:ABC

A.制作“表头”

B.制作“表中的记录”

C.制作映射表

D.把“表头”和“表中的记录”拼装在一起

上一篇:golang 循环中的 switch 里的 break 与 continue
下一篇:没有了
网友评论