当前位置 : 主页 > 网络编程 > net编程 >

spark与pyspark教程(一)

来源:互联网 收集:自由互联 发布时间:2023-09-07
大数据生态圈简介 大数据生态圈可以分为7层,总的可以归纳为数据采集层、数据计算层和数据应用层。 spark 1.简介 spark是一种计算引擎,类似于hadoop架构下mapreduce,与mapreduce不同的是

大数据生态圈简介

大数据生态圈可以分为7层,总的可以归纳为数据采集层、数据计算层和数据应用层。
spark与pyspark教程(一)_大数据

spark

1.简介

spark是一种计算引擎,类似于hadoop架构下mapreduce,与mapreduce不同的是将计算的结果存入hdfs分布式文件系统。spark则是写入内存中,像mysql一样可以实现实时的计算,包括SQL查询。
spark不单单支持传统批量处理应用,更支持交互式查询、流式计算、机器学习、图计算等各种应用,
spark是由scala语言开发,具备python的接口,pyspark。

2.spark组件

spark包含着多个紧密集成的组件,如图所示:

spark与pyspark教程(一)_spark_02

2.1 spark core

实现spark基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。
同时也包含对弹性分布式数据集(RDD),RDD表示分布在多个计算节点上可以并行操作的元素集合。

2.2 spark sql

spark sql用来操作结构化数据的程序包,我们可以使用sql或者hive语言来查询数据。

2.3 spark streaming

spark streaming上对实时数据进行流式计算的组件。例如:在网页服务日志,或者在网络服务中用户提交的状态更新组成的队列。

2.4 mlib

mlib提供机器学习功能程序库,提供多种机器学习算法

2.5 graphx

Graphx用来操作图,可以进行并行的图计算

2.6 集群管理器

Spark 设计为可以高效地在一个计算节点到数千个计算节点之间伸缩计
算。为了实现这样的要求,同时获得最大灵活性,Spark 支持在各种集群管理器。

搭建spark集群

  • 步骤1:搭建hadoop单机和伪分布式环境
  • 步骤2:构造分布式hadoop集群
  • 步骤3:构造分布式spark集群

3.RDD编程

3.1RDD基础

实例1:读取外部数据集,并调用转化操作filter提取包含“python”的字符串,并调用first()行动,返回第一个包含python的字符串。

#初始化SparkContext
import pyspark
from pyspark import SparkContext,SparkConf

#配置应用
conf=SparkConf().setMaster("local").setAppName("My App")
#基于sparkconf创建一个sparkcontext
sc=SparkContext(conf=conf)
#读取外部数据
lines=sc.textFile("README.md")
pythonlines=lines.filter(lambda line:"python" in line)
pythonlines.first()

out:u'## Interactive Python Shell'

实例2:spark的RDD会对每次行动进行重新计算,如果想复用同一个RDD,使用RDD.persist(),将RDD内容保存到内存中

pythonlines.persist
pythonlines.count()
pythonlines.first()

out:u'## Interactive Python Shell'

3.2创建RDD

实例1:将程序中一个已有集合传递给SparkContext的parallelize()

#内部创建数据
lines=sc.parallelize(["pandas","i like pandas"])
#外部读取数据
lines=sc.textFile("/path/to/README.md")

3.3RDD操作

3.3.1 转化操作

实例1:假定有一个日志文件log.txt,内部含若干信息,希望提取出其中的错误信息

inputRDD=sc.textFile("log.txt")
errorsRDD=inputRDD.filter(lambda x:"error" in x)

实例2:打印包含error或warning的行数

errorsRDD=inputRDD.filter(lambda x:"error" in x)
warningsRDD=inputRDD.filter(lamdba x:"warning" in x)
badlinesRDD=errorsRDD.union(warningsRDD)

spark与pyspark教程(一)_python_03

3.3.2 行动操作

实例1:输出badlinesRDD的一些信息,count()返回计数结果,take()收集RDD部分元素,collect()获取整个RDD数据

print("Input had"+badlinesRDD.count()+"concerning lines")
print("here are 10 examples:")
for line in badlinesRDD.take(10):
print line

3.4向spark传递函数

实例1:

#1
word=rdd.filter(lambda s:"error" in s)

#2
def containserrors(s):
return "error" in s
word=rdd.filter(containserror)

实例2:

class wordfunctions(object):
def getmatchesnoreference(self,rdd):
query=self.query
return rdd.filter(lambda x:query in x)

3.5常见转化操作和行动操作

3.5.1 基本RDD

map()和filter()
实例1:计算RDD中各值的平方

nums=sc.parallelize([1,2,3,4])
squared=nums.map(lambda x:x*x).collect()
for num in squared:
print "%i "(num)

实例2:使用flatMap()将行数据划分为单词

lines=sc.parallelize(["hello world","hi"])
words=lines.flatMap(lambda line:line.split(" "))
words.first()

其他转化操作:
集合操作

spark与pyspark教程(一)_python_04
RDD笛卡儿积
spark与pyspark教程(一)_python_05
转化操作列表

函数名

目的

示例

结果

map()

将函数应用于RDD 中的每个元素,将返回值构成新的RDD

rdd.map(x => x + 1)

{2, 3, 4, 4}

flatMap()

将函数应用于RDD 中的每个元素,将返回的迭代器的所有内容构成新的RDD。通常用来切分单词

rdd.flatMap(x => x.to(3))

{1, 2, 3, 2, 3, 3, 3}

filter()

返回一个由通过传给filter()的函数的元素组成的RDD

rdd.filter(x => x != 1)

{2, 3, 3}

distinct()

去重

rdd.distinct()

{1, 2, 3}

sample(withReplacement,fraction,[seed])

对RDD采样,以及是否替换

rdd.sample(false, 0.5)

非确定的

union()

生成一个包含两个RDD 中所有元素的RDD

rdd.union(other)

{1, 2, 3, 3, 4, 5}

intersection()

求两个RDD 共同的元素的RDD

rdd.intersection(other)

{3}

subtract()

移除一个RDD 中的内容(例如移除训练数据)

rdd.subtract(other)

{1, 2}

cartesian()

与另一个RDD 的笛卡儿积

rdd.cartesian(other)

{(1, 3), (1, 4), (3, 5)}

行动操作列表

函数名

目的

示例

结果

collect()

返回RDD中的所有元素

rdd.collect()

{1, 2, 3, 3}

count()

RDD中的元素个数

rdd.count()

4

countByValue()

各元素在RDD中出现的次数

rdd.countByValue()

{(1, 1),(2, 1),(3, 2)}

take(num)

从RDD中返回num个元素

rdd.take(2)

{1, 2}

top(num)

从RDD中返回最前面的num个元素

rdd.top(2)

{3, 3}

takeOrdered(num)(ordering)

从RDD中按照提供的顺序返回最前面的num 个元素

rdd.takeOrdered(2)(myOrdering)

{3, 3}

takeSample(withReplacement,num,[seed])

从RDD中返回任意一些元素

rdd.takeSample(false, 1)

非确定的

reduce(func)

并行整合RDD中所有数据(例sum)

rdd.reduce((x, y) => x + y)

9

fold(zero)(func)

和reduce() 一样,但是需要提供初始值

rdd.fold(0)((x, y) => x + y)

9

aggregate(zeroValue)(seqOp,combOp)

和reduce() 相似,但是通常返回不同类型的函数

rdd.aggregate((0, 0))((x, y) =>(x._1 + y, x._2 + 1),(x, y) =>(x._1 + y._1, x._2 + y._2))

(9,4)

foreach(func)

对RDD中的每个元素使用给定的函数

rdd.foreach(func)


4.键值对操作

4.1 创建Pair RDD

集合:(key,value)

pairs = lines.map(lambda x: (x.split(" ")[0], x))

对键值对集合{(1, 2), (3, 4), (3, 6)}为例
转化操作:

函数名

目的

示例

结果

reduceByKey(func)

合并具有相同键的值

rdd.reduceByKey((x, y) => x + y)

{(1,2), (3,10)}

groupByKey()

对具有相同键的值进行分组

rdd.groupByKey()

{(1,[2]),(3, [4,6])}

mapValues(func)

对pairRDD中的每个值应用一个函数而不改变键

rdd.mapValues(x => x+1)

{(1,3), (3,5), (3,7)}

keys()

返回一个仅包含键的RDD

rdd.keys()

{1,3,3}

values()

返回一个仅包含值的RDD

rdd.values()

{2,4,6}

sortByKey()

返回一个根据键排序的RDD

rdd.sortByKey()

{(1,2), (3,4), (3,6)}

针对两个pair RDD的转化操作(rdd = {(1, 2), (3, 4), (3, 6)}other = {(3, 9)})

函数名

目的

示例

结果

subtractByKey

删掉RDD中键与otherRDD中的键相同的元素

rdd.subtractByKey(other)

{(1, 2)}

join

对两个RDD进行内连接

rdd.join(other)

{(3, (4, 9)), (3,(6, 9))}

rightOuterJoin

对两个RDD进行连接操作,确保第一个RDD 的键必须存在(右外连接)

rdd.rightOuterJoin(other)

{(3,(Some(4),9)),(3,(Some(6),9))}

leftOuterJoin

对两个RDD进行连接操作,确保第二个RDD 的键必须存在(左外连接)

rdd.leftOuterJoin(other)

{(1,(2,None)), (3,(4,Some(9))), (3,(6,Some(9)))}

cogroup

将两个RDD 中拥有相同键的数据分组到一起

rdd.cogroup(other)

{(1,([2],[])), (3,([4, 6],[9]))}

spark与pyspark教程(一)_大数据_06

import os
import pyspark
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("test_SamShare").setMaster("local[4]")
sc = SparkContext(conf=conf)

# 使用 parallelize方法直接实例化一个RDD
rdd = sc.parallelize(range(1,11),4) # 这里的 4 指的是分区数量
rdd.take(100)
# [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


"""
----------------------------------------------
Transform算子解析
----------------------------------------------
"""
# 以下的操作由于是Transform操作,因为我们需要在最后加上一个collect算子用来触发计算。
# 1. map: 和python差不多,map转换就是对每一个元素进行一个映射
rdd = sc.parallelize(range(1, 11), 4)
rdd_map = rdd.map(lambda x: x*2)
print("原始数据:", rdd.collect())
print("扩大2倍:", rdd_map.collect())
# 原始数据: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# 扩大2倍: [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

# 2. flatMap: 这个相比于map多一个flat(压平)操作,顾名思义就是要把高维的数组变成一维
rdd2 = sc.parallelize(["hello SamShare", "hello PySpark"])
print("原始数据:", rdd2.collect())
print("直接split之后的map结果:", rdd2.map(lambda x: x.split(" ")).collect())
print("直接split之后的flatMap结果:", rdd2.flatMap(lambda x: x.split(" ")).collect())
# 直接split之后的map结果: [['hello', 'SamShare'], ['hello', 'PySpark']]
# 直接split之后的flatMap结果: ['hello', 'SamShare', 'hello', 'PySpark']

# 3. filter: 过滤数据
rdd = sc.parallelize(range(1, 11), 4)
print("原始数据:", rdd.collect())
print("过滤奇数:", rdd.filter(lambda x: x % 2 == 0).collect())
# 原始数据: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# 过滤奇数: [2, 4, 6, 8, 10]

# 4. distinct: 去重元素
rdd = sc.parallelize([2, 2, 4, 8, 8, 8, 8, 16, 32, 32])
print("原始数据:", rdd.collect())
print("去重数据:", rdd.distinct().collect())
# 原始数据: [2, 2, 4, 8, 8, 8, 8, 16, 32, 32]
# 去重数据: [4, 8, 16, 32, 2]

# 5. reduceByKey: 根据key来映射数据
from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print("原始数据:", rdd.collect())
print("原始数据:", rdd.reduceByKey(add).collect())
# 原始数据: [('a', 1), ('b', 1), ('a', 1)]
# 原始数据: [('b', 1), ('a', 2)]

# 6. mapPartitions: 根据分区内的数据进行映射操作
rdd = sc.parallelize([1, 2, 3, 4], 2)
def f(iterator):
yield sum(iterator)
print(rdd.collect())
print(rdd.mapPartitions(f).collect())
# [1, 2, 3, 4]
# [3, 7]

# 7. sortBy: 根据规则进行排序
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
print(sc.parallelize(tmp).sortBy(lambda x: x[0]).collect())
print(sc.parallelize(tmp).sortBy(lambda x: x[1]).collect())
# [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
# [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]

# 8. subtract: 数据集相减, Return each value in self that is not contained in other.
x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
y = sc.parallelize([("a", 3), ("c", None)])
print(sorted(x.subtract(y).collect()))
# [('a', 1), ('b', 4), ('b', 5)]

# 9. union: 合并两个RDD
rdd = sc.parallelize([1, 1, 2, 3])
print(rdd.union(rdd).collect())
# [1, 1, 2, 3, 1, 1, 2, 3]

# 10. interp: 取两个RDD的交集,同时有去重的功效
rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5, 2, 3])
rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
print(rdd1.interp(rdd2).collect())
# [1, 2, 3]

# 11. cartesian: 生成笛卡尔积
rdd = sc.parallelize([1, 2])
print(sorted(rdd.cartesian(rdd).collect()))
# [(1, 1), (1, 2), (2, 1), (2, 2)]

# 12. zip: 拉链合并,需要两个RDD具有相同的长度以及分区数量
x = sc.parallelize(range(0, 5))
y = sc.parallelize(range(1000, 1005))
print(x.collect())
print(y.collect())
print(x.zip(y).collect())
# [0, 1, 2, 3, 4]
# [1000, 1001, 1002, 1003, 1004]
# [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]

# 13. zipWithIndex: 将RDD和一个从0开始的递增序列按照拉链方式连接。
rdd_name = sc.parallelize(["LiLei", "Hanmeimei", "Lily", "Lucy", "Ann", "Dachui", "RuHua"])
rdd_index = rdd_name.zipWithIndex()
print(rdd_index.collect())
# [('LiLei', 0), ('Hanmeimei', 1), ('Lily', 2), ('Lucy', 3), ('Ann', 4), ('Dachui', 5), ('RuHua', 6)]

# 14. groupByKey: 按照key来聚合数据
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print(rdd.collect())
print(sorted(rdd.groupByKey().mapValues(len).collect()))
print(sorted(rdd.groupByKey().mapValues(list).collect()))
# [('a', 1), ('b', 1), ('a', 1)]
# [('a', 2), ('b', 1)]
# [('a', [1, 1]), ('b', [1])]

# 15. sortByKey:
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
print(sc.parallelize(tmp).sortByKey(True, 1).collect())
# [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]

# 16. join:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
print(sorted(x.join(y).collect()))
# [('a', (1, 2)), ('a', (1, 3))]

# 17. leftOuterJoin/rightOuterJoin
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
print(sorted(x.leftOuterJoin(y).collect()))
# [('a', (1, 2)), ('b', (4, None))]

"""
----------------------------------------------
Action算子解析
----------------------------------------------
"""
# 1. collect: 指的是把数据都汇集到driver端,便于后续的操作
rdd = sc.parallelize(range(0, 5))
rdd_collect = rdd.collect()
print(rdd_collect)
# [0, 1, 2, 3, 4]

# 2. first: 取第一个元素
sc.parallelize([2, 3, 4]).first()
# 2

# 3. collectAsMap: 转换为dict,使用这个要注意了,不要对大数据用,不然全部载入到driver端会爆内存
m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
m
# {1: 2, 3: 4}

# 4. reduce: 逐步对两个元素进行操作
rdd = sc.parallelize(range(10),5)
print(rdd.reduce(lambda x,y:x+y))
# 45

# 5. countByKey/countByValue:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print(sorted(rdd.countByKey().items()))
print(sorted(rdd.countByValue().items()))
# [('a', 2), ('b', 1)]
# [(('a', 1), 2), (('b', 1), 1)]

# 6. take: 相当于取几个数据到driver端
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print(rdd.take(5))
# [('a', 1), ('b', 1), ('a', 1)]

# 7. saveAsTextFile: 保存rdd成text文件到本地
text_file = "./data/rdd.txt"
rdd = sc.parallelize(range(5))
rdd.saveAsTextFile(text_file)

# 8. takeSample: 随机取数
rdd = sc.textFile("./test/data/hello_samshare.txt", 4) # 这里的 4 指的是分区数量
rdd_sample = rdd.takeSample(True, 2, 0) # withReplacement 参数1:代表是否是有放回抽样
rdd_sample

# 9. foreach: 对每一个元素执行某种操作,不生成新的RDD
rdd = sc.parallelize(range(10), 5)
accum = sc.accumulator(0)
rdd.foreach(lambda x: accum.add(x))
print(accum.value)
# 45

5.数据读取与保存

spark支持很多种输入输出源,一部分原因spark本身基于hadoop生态圈而构建,特别说spark可以通过HadoopMapReduce所使用的InputFormat和OutputFormat接口访问。

5.1 文本文件

读取文本文件,保存文件

data=sc.textFile("file://home/README.md")
data.saveAsTextFile(outputFile)

5.2 JSON文件

import json
data=input.map(lambdax:json.loads(x))
data.filter(lambda x:x["lovesPandas"]).map(lambda x:json.dumps(x)).saveAsTextFile(outputFile)

5.3 逗号分隔值与制表符分隔值

import csv
import StringIO
def loadRecord(line):
input=StringIO.stringIO(line)
reader=csv.DictReader(input,fieldnames=["name","favouriteAnimal"])
return reader.next()
input=sc.textFile(inputFile).map(loadRecord)


上一篇:C#/VB.NET Word转Text
下一篇:没有了
网友评论