Advanced Spark Operators and Case Study

Review

1. The concept and properties of RDDs
2. Review of commonly used operators

Today's Topics

1. Differences between the map, mapPartitions and mapPartitionsWithIndex operators
2. The aggregate operator
3. The aggregateByKey operator
4. checkpoint (setting checkpoints)
5. Differences between the repartition, coalesce and partitionBy operators
6. The combineByKey operator
7. Other operators
8. Case study: inferring users' home and work locations from base-station data

Teaching Objectives

1. Master implementing functional-style logic with operators
2. Become familiar with the checkpoint workflow
3. Implement the case-study requirements with Spark Core

Section 1: map, mapPartitions, mapPartitionsWithIndex

 
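A minimal sketch of the difference between the three (the RDD name nums is illustrative, and the per-partition results assume the default even split): map calls the function once per element, mapPartitions calls it once per partition and hands you an Iterator over that partition (handy when an expensive resource should be set up once per partition), and mapPartitionsWithIndex additionally passes in the partition index:
val nums = sc.parallelize(1 to 6, 2)
nums.map(_ * 10).collect                                 -- once per element: Array(10, 20, 30, 40, 50, 60)
nums.mapPartitions(iter => Iterator(iter.sum)).collect   -- once per partition: Array(6, 15)
nums.mapPartitionsWithIndex((idx, iter) => iter.map(x => "[partID: " + idx + ", val: " + x + "]")).collect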

Section 2: aggregate

The first function adds up the elements within each partition; the second function then adds up the per-partition results:
rdd1.aggregate(0)(_+_, _+_)
Requirement: take the maximum value of each partition, then add the per-partition maximums together:
rdd1.aggregate(0)(math.max(_, _), _+_)
Now look at the result when the initial value is set to 10:
rdd1.aggregate(10)(math.max(_, _), _+_)
And with the initial value set to 2:
rdd1.aggregate(2)(math.max(_, _), _+_)

def func1(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
  iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
}
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1.mapPartitionsWithIndex(func1).collect
rdd1.aggregate(0)(math.max(_, _), _ + _)
rdd1.aggregate(5)(math.max(_, _), _ + _)
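For reference, with parallelize's default even split (partition 0: 1,2,3,4; partition 1: 5,6,7,8,9) the results work out as follows; note that the initial value takes part in every partition and once more in the final combine:
rdd1.aggregate(0)(math.max(_, _), _ + _)   -- 0 + 4 + 9 = 13
rdd1.aggregate(5)(math.max(_, _), _ + _)   -- 5 + 5 + 9 = 19
rdd1.aggregate(10)(math.max(_, _), _ + _)  -- 10 + 10 + 10 = 30
rdd1.aggregate(2)(math.max(_, _), _ + _)   -- 2 + 4 + 9 = 15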


val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
def func2(index: Int, iter: Iterator[(String)]) : Iterator[String] = {
  iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
}
rdd2.mapPartitionsWithIndex(func2).collect  -- inspect the elements of each partition
rdd2.aggregate("")(_ + _, _ + _)
See how many times the initial value is applied:
rdd2.aggregate("=")(_ + _, _ + _)
If the RDD had been created with three partitions, how many times would the initial value be applied? (See the check below.)
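A quick check of that question (the rdd name here is just illustrative): the initial value is applied once per partition plus once more in the final combine, so with three partitions it shows up four times.
val rdd2b = sc.parallelize(List("a","b","c","d","e","f"), 3)
rdd2b.aggregate("=")(_ + _, _ + _)  -- e.g. "==ab=cd=ef": four "=" in total; the order of the partition results may vary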

val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3.mapPartitionsWithIndex(func2).collect  -- inspect the elements of each partition
The returned value can differ from run to run, because sometimes one executor returns its partition result earlier and sometimes later:
rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
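With the default split (partition 0: "12","23"; partition 1: "345","4567") each partition reduces to a single character, "2" and "4" respectively, so the result is "24" or "42" depending on which partition's result is combined first.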

val rdd4 = sc.parallelize(List("12","23","345",""),2)
rdd4.mapPartitionsWithIndex(func2).collect  -- inspect the elements of each partition
Why is the result "01" or "10"? Key point: "".length is 0, which becomes the string "0", so in the next comparison the minimum length is 1.
rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
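Step by step, with the default split (partition 0: "12","23"; partition 1: "345",""): partition 0 gives min("".length, "12".length) = 0 -> "0", then min("0".length, "23".length) = 1 -> "1"; partition 1 gives min("".length, "345".length) = 0 -> "0", then min("0".length, "".length) = 0 -> "0". Combining "1" and "0" in either order yields "10" or "01".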

val rdd5 = sc.parallelize(List("12","23","","345"),2)
rdd5.mapPartitionsWithIndex(func2).collect
rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
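Here the empty string sits in partition 1 ("", "345"), so both partitions reduce to "1" (partition 0: "0" then "1"; partition 1: "0" then "1"), and the result is "11" regardless of the combine order (again assuming the default split).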

Section 3: aggregateByKey

val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
Requirement: count how many cats, dogs and mice there are in total. Compare the two approaches:
pairRDD.aggregateByKey(0)(_+_,_+_).collect
pairRDD.reduceByKey(_+_).collect  -- this achieves the same thing
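Both give the same totals, cat 2 + 5 + 12 = 19, mouse 4 + 2 = 6, dog 12; only the order of the tuples in the collected array may differ.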

Requirement: take the largest value for each animal within each partition, then add the per-partition results together key by key:
def func2(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
  iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
}
pairRDD.mapPartitionsWithIndex(func2).collect
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
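With the default split, partition 0 holds (cat,2), (cat,5), (mouse,4) and partition 1 holds (cat,12), (dog,12), (mouse,2). With zero value 0 the per-partition maximums are cat 5 and 12, mouse 4 and 2, dog 12, which sum to (cat,17), (mouse,6), (dog,12). With zero value 100 every per-partition maximum becomes 100, giving (cat,200), (mouse,200), (dog,100). Note that, unlike aggregate, the zero value of aggregateByKey is only used inside the partitions, not again in the final combine.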

Section 4: checkpoint

checkpoint (we will come back to this later together with a full example)
sc.setCheckpointDir("hdfs://node01:9000/ck")
val rdd = sc.textFile("hdfs://node01:9000/wc").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
rdd.checkpoint        -- marks the RDD for checkpointing; nothing is written yet
rdd.isCheckpointed    -- false: no action has run so far
rdd.count             -- the first action runs the job, and a follow-up job writes the RDD to the checkpoint directory
rdd.isCheckpointed    -- true: the data is now in HDFS and the lineage is truncated
rdd.getCheckpointFile -- returns the HDFS path the checkpoint was written to

Section 5: repartition, coalesce, partitionBy

repartition (redistributes the data across a new number of partitions), coalesce (merges/repartitions, with a flag controlling whether to shuffle),
	partitionBy (uses the given partitioner to build a new ShuffledRDD, i.e. repartitions the original RDD by key)
val rdd1 = sc.parallelize(1 to 10, 10)
val rdd2 = rdd1.repartition(5)  -- returns a new RDD with 5 partitions (rdd1 itself still has 10)
rdd2.partitions.length          -- 5
coalesce: adjusts the number of partitions; first argument: the target number of partitions, second argument: whether to shuffle (false means no shuffle)
val rdd2 = rdd1.coalesce(2, false)
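The relationship between the two is worth checking in the shell: repartition(n) simply calls coalesce(n, shuffle = true), and without a shuffle coalesce can only reduce the number of partitions, never increase it:
rdd2.partitions.length                       -- 2
rdd1.coalesce(20, false).partitions.length   -- still 10: without a shuffle the partition count cannot grow
rdd1.coalesce(20, true).partitions.length    -- 20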
val rdd1 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)), 3)
var rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd2.partitions.length

Section 6: combineByKey

This one is more low-level: reduceByKey, aggregateByKey and combineByKey all ultimately call combineByKeyWithClassTag.
combineByKey
val rdd1 = sc.textFile("hdfs://node01:9000/wc").flatMap(_.split(" ")).map((_, 1))
Previously we would write it like this:
val rdd2 = rdd1.reduceByKey(_+_)
or like this:
val rdd2 = rdd1.aggregateByKey(0)(_+_,_+_)
Now with combineByKey: x => x takes the first value of each key as it is, (a: Int, b: Int) => a + b adds up the values within a partition, and (m: Int, n: Int) => m + n adds up the per-partition results:
val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd2.collect
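For reference, the three parameters of combineByKey are createCombiner (applied to the first value of a key within a partition), mergeValue (merges each further value of that key into the combiner, still within the partition) and mergeCombiners (merges the per-partition combiners across partitions).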
The combineByKey call above is more verbose, but it can express many different requirements.
In the result below each count comes out 30 higher, because there are three partitions and each adds an extra 10: x stands for the first value of a key within a partition, so the 10 is added only once per partition (a word that appears in all three partitions therefore gains 30):
val rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd3.collect

val rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
Combine them with zip, with rdd5 in front so that the numbers become the keys; which RDD "zips" which?
val rdd6 = rdd5.zip(rdd4)
rdd6.collect
Requirement: put the "single" animals (value 1) into one group and the "paired" animals (value 2) into another:
val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y,
 (m: List[String], n: List[String]) => m ++ n)
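rdd7.collect  -- something like Array((1,List(dog, cat, turkey)), (2,List(gnu, salmon, rabbit, wolf, bear, bee))); the order of names inside each list depends on which partition's combiner is merged first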
Walk through it with a diagram.
Implement it in IDEA (see the sketch below).
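A minimal standalone version of the same grouping for IDEA; the object name, the local master and the thread count are illustrative choices of this sketch:

import org.apache.spark.{SparkConf, SparkContext}

object CombineByKeyDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CombineByKeyDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)

    val names = sc.parallelize(List("dog", "cat", "gnu", "salmon", "rabbit", "turkey", "wolf", "bear", "bee"), 3)
    val nums  = sc.parallelize(List(1, 1, 2, 2, 2, 1, 2, 2, 2), 3)

    // zip into (number, name) pairs, then group the names by number with combineByKey
    val grouped = nums.zip(names).combineByKey(
      (name: String) => List(name),                      // createCombiner: first value of a key within a partition
      (acc: List[String], name: String) => acc :+ name,  // mergeValue: fold further values in within the partition
      (a: List[String], b: List[String]) => a ++ b       // mergeCombiners: merge the per-partition lists
    )

    grouped.collect.foreach(println)
    sc.stop()
  }
}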

Checking the source code confirms that reduceByKey, aggregateByKey and combineByKey all call combineByKeyWithClassTag.

Section 7: Other operators

Turns the pairs of an RDD into a Map (collected to the driver):
collectAsMap
val rdd = sc.parallelize(List(("a", 1), ("b", 2)))
rdd.collectAsMap
-------------------------------------------------------------------------------------------
countByKey 
val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
Counts how many elements there are for each key (compare with reduceByKey, which would sum the values):
rdd1.countByKey
Counts how many times each element (the whole key-value pair) appears:
rdd1.countByValue
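Expected results here: rdd1.countByKey returns Map(a -> 1, b -> 2, c -> 2) (the number of tuples per key) and rdd1.countByValue returns Map((a,1) -> 1, (b,2) -> 2, (c,2) -> 1, (c,1) -> 1), both collected to the driver as a Map, whereas rdd1.reduceByKey(_+_) would instead give (a,1), (b,4), (c,3).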
-------------------------------------------------------------------------------------------
filterByRange
Filters by key, keeping all elements whose key falls within the given range (inclusive at both ends):
val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1)))
val rdd2 = rdd1.filterByRange("c", "d")
rdd2.collect
-------------------------------------------------------------------------------------------
flatMapValues
val a = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))
Maps over the value of each pair and then flattens the result:
a.flatMapValues(_.split(" "))
-------------------------------------------------------------------------------------------
foldByKey
val rdd1 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2)
val rdd2 = rdd1.map(x => (x.length, x))
Folds the values together by key, starting from the given zero value:
val rdd3 = rdd2.foldByKey("")(_+_)
val rdd = sc.textFile("hdfs://node01:9000/wc").flatMap(_.split(" ")).map((_, 1))
rdd.foldByKey(0)(_+_)
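Here rdd2 contains (3,dog), (4,wolf), (3,cat), (4,bear), so rdd3 collects to (3,dogcat) and (4,wolfbear) (the concatenation order may differ with the partitioning), and the second example, foldByKey(0)(_+_), is simply word count again.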
-------------------------------------------------------------------------------------------
foreachPartition
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
Hands you each partition's elements as an iterator; commonly used to process a partition's data and then write it out to a database:
rdd1.foreachPartition(x => println(x.reduce(_ + _)))  -- the println runs on the executors, so against a cluster you will not see it in the shell; run locally (for example in IDEA with a local master) to see the sums in the console
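Because writing to a database is the typical use, here is a minimal sketch of that pattern with plain JDBC; the URL, table name and credentials are placeholders, not part of the course environment:

rdd1.foreachPartition { part =>
  // open one connection per partition, not per element (connection details are hypothetical)
  val conn = java.sql.DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "password")
  val ps = conn.prepareStatement("INSERT INTO nums(value) VALUES (?)")
  part.foreach { x =>
    ps.setInt(1, x)
    ps.executeUpdate()
  }
  ps.close()
  conn.close()
}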
-------------------------------------------------------------------------------------------
keyBy
val rdd1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
Uses the result of the supplied function as the key to build new tuples:
val rdd2 = rdd1.keyBy(_.length)
rdd2.collect
-------------------------------------------------------------------------------------------
keys values
val rdd1 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val rdd2 = rdd1.map(x => (x.length, x))
rdd2.keys.collect
rdd2.values.collect

Section 8: Inferring users' home and work locations from base-station data

Use the base stations that a user connects to most often to infer the user's home address and work address.

User connection records:

phone number, event time, base station ID, event type
18101056806,20160327075000,9F36407EAD0629FC166F14DDE7970F68,1
18101056806,20160327081000,9F36407EAD0629FC166F14DDE7970F68,0
18101056806,20160327081100,CC0710CC94ECC657A8561DE549D940E0,1
18101056806,20160327082000,CC0710CC94ECC657A8561DE549D940E0,0
18688888888,20160327082400,16030401EAFB68F1E3CDF819735E1C66,1
18101056806,20160327082500,16030401EAFB68F1E3CDF819735E1C66,1
18688888888,20160327170000,16030401EAFB68F1E3CDF819735E1C66,0
18101056806,20160327180000,16030401EAFB68F1E3CDF819735E1C66,0
18688888888,20160327171000,CC0710CC94ECC657A8561DE549D940E0,1
18688888888,20160327171600,CC0710CC94ECC657A8561DE549D940E0,1
18101056806,20160327180500,CC0710CC94ECC657A8561DE549D940E0,1
18101056806,20160327181500,CC0710CC94ECC657A8561DE549D940E0,0
18101056806,20160327182000,9F36407EAD0629FC166F14DDE7970F68,1
18101056806,20160327230000,9F36407EAD0629FC166F14DDE7970F68,0

Base station information:

base station ID, longitude, latitude, signal type
9F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6
CC0710CC94ECC657A8561DE549D940E0,116.303955,40.041935,6
16030401EAFB68F1E3CDF819735E1C66,116.296302,40.032296,6

Case code:

 
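The original case code is not reproduced in these notes, so the following is only a sketch of one way to implement the requirement with the operators covered above. The object name, input paths, local master, the choice to keep the top two stations per phone, and the assumption that event type 1 means connecting to a station and 0 disconnecting are all assumptions of this sketch; distinguishing home from work would additionally require splitting the records by time of day, which is left as part of the homework.

import java.text.SimpleDateFormat
import org.apache.spark.{SparkConf, SparkContext}

object MobileLocation {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MobileLocation").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // connection log: phone,time,stationId,eventType (assumed: 1 = connect, 0 = disconnect)
    val logs = sc.textFile("hdfs://node01:9000/mobilelocation/log")              // path is an assumption
    val dwellPerPhoneAndStation = logs.map { line =>
      val fields  = line.split(",")
      val phone   = fields(0)
      val millis  = new SimpleDateFormat("yyyyMMddHHmmss").parse(fields(1)).getTime
      val station = fields(2)
      // connect events contribute -time and disconnect events +time,
      // so summing per (phone, station) gives the total dwell time
      val signed  = if (fields(3) == "1") -millis else millis
      ((phone, station), signed)
    }.reduceByKey(_ + _)

    // for each phone, keep the two stations with the longest total dwell time
    val topStations = dwellPerPhoneAndStation
      .map { case ((phone, station), dwell) => (phone, (station, dwell)) }
      .groupByKey()
      .mapValues(_.toList.sortBy(-_._2).take(2))
      .flatMapValues(x => x)
      .map { case (phone, (station, dwell)) => (station, (phone, dwell)) }

    // base station info: stationId,longitude,latitude,signalType
    val stations = sc.textFile("hdfs://node01:9000/mobilelocation/lac_info.txt") // path is an assumption
      .map { line =>
        val fields = line.split(",")
        (fields(0), (fields(1), fields(2)))
      }

    // join on the station id to attach coordinates: (phone, longitude, latitude, dwell time)
    topStations.join(stations)
      .map { case (station, ((phone, dwell), (lon, lat))) => (phone, lon, lat, dwell) }
      .collect()
      .foreach(println)

    sc.stop()
  }
}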

Attachments

xuzhongcn.github.io/spark/02/mobilelocation/log/19735E1C66.log

xuzhongcn.github.io/spark/02/mobilelocation/log/DDE7970F68.log

xuzhongcn.github.io/spark/02/mobilelocation/log/E549D940E0.log

xuzhongcn.github.io/spark/02/mobilelocation/lac_info.txt

Homework

1. Get familiar with the Spark Core operators
2. The checkpoint implementation workflow
3. Implement the case-study requirements with the operators covered so far

Interview Questions

1. Why checkpoint is needed, and how the checkpoint workflow works
2. The differences between the aggregate and aggregateByKey operators