RDD转换操作算子 — zip、join zip 用于将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常 @Test def test { // zip 将两个RDD组合成(
RDD转换操作算子 — zip、join
- zip 用于将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常
def test{
// zip 将两个RDD组合成(key,value)的形式
val a = sc.makeRDD(1 to 5,2)
val b = sc.makeRDD(Seq("a","b","c","d","e"),2)
val zip1: RDD[(Int, String)] = a.zip(b)
val zip2: RDD[(String, Int)] = b.zip(a)
zip1.collect().foreach(print(_))//(1,a)(2,b)(3,c)(4,d)(5,e)
zip2.collect().foreach(print(_))//(a,1)(b,2)(c,3)(d,4)(e,5)
sc.stop()
}
- join 将两个RDD按照相同的Key来进行连接
val b = sc.parallelize(Seq(("a",1),("d",4),("e",5)))
val join = a.join(b)
val r_join = a.rightOuterJoin(b)
val l_join = a.leftOuterJoin(b)
val f_join = a.fullOuterJoin(b)
join.collect().foreach(println(_))
/**
* (a,(1,1))
*/
r_join.collect().foreach(println(_))
/**
* (a,(Some(1),1))
* (d,(None,4))
* (e,(None,5))
*/
l_join.collect().foreach(println(_))
/**
* (a,(1,Some(1)))
* (b,(2,None))
* (c,(3,None))
*/
f_join.collect().foreach(println(_))
/**
* (a,(Some(1),Some(1)))
* (b,(Some(2),None))
* (c,(Some(3),None))
* (d,(None,Some(4)))
* (e,(None,Some(5)))
*/
- cogroup (otherDataset, [numTasks])是将输入数据集(K, V)和另外一个数据集(K, W)进行cogroup,得到一个格式为(K, Seq[V], Seq[W])的数据集。
val a = sc.parallelize(Seq(("aa",1),("bb",2),("cc",3)))
val b = sc.parallelize(Seq(("ee",10),("dd",20),("aa",30)))
val c = a.cogroup(b).collect()
c.foreach(println(_))
/**
* (cc,(CompactBuffer(3),CompactBuffer()))
* (aa,(CompactBuffer(1),CompactBuffer(30)))
* (dd,(CompactBuffer(),CompactBuffer(20)))
* (ee,(CompactBuffer(),CompactBuffer(10)))
* (bb,(CompactBuffer(2),CompactBuffer()))
*/
}
- lookup 用于(K,V)类型的RDD,指定K值,返回RDD中该K对应的所有V值
def test(): Unit ={
val m = sc.parallelize(Seq(("a",1),("b",2),("b",3),("a",11)))
m.lookup("a").foreach(println(_))
// 1
// 11
m.lookup("b").foreach(println(_))
// 2
// 3
}