1. The concept and attributes of RDD 2. Review of commonly used operators
1. The differences between the map, mapPartitions and mapPartitionsWithIndex operators
2. The aggregate operator
3. The aggregateByKey operator
4. checkpoint (setting a checkpoint)
5. The differences between the repartition, coalesce and partitionBy operators
6. The combineByKey operator
7. Other operators
8. Case study: inferring a user's home and work locations from base station data
1. Master functional-style programming with the operators 2. Become familiar with the checkpoint workflow 3. Implement the case requirements with Spark Core
The difference between map and mapPartitions:
scala> val rdd2 = rdd1.mapPartitions(_.map(_*10))
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] ...
scala> rdd2.collect
res1: Array[Int] = Array(10, 20, 30, 40, 50, 60, 70)
scala> rdd1.map(_ * 10).collect
res3: Array[Int] = Array(10, 20, 30, 40, 50, 60, 70)
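To make the difference concrete: map calls its function once per element, while mapPartitions calls it once per partition and receives the whole iterator, which is why it is preferred when each call needs expensive setup (such as opening a database connection). A minimal sketch, assuming the rdd1 from the shell session above:
rdd1.mapPartitions { iter =>
  println("processing one partition")   // runs once per partition, on an executor
  iter.map(_ * 10)                      // the per-element work still happens lazily on the iterator
}.collect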
Having introduced the difference between mapPartitions and map, we move on to the following:
mapPartitionsWithIndex
val func = (index: Int, iter: Iterator[(Int)]) => {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1.mapPartitionsWithIndex(func).collect
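With two partitions, parallelize splits the nine elements positionally, so the output should look roughly like this (shown as a comment; the formatting comes from func above):
// Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:0, val: 4],
//       [partID:1, val: 5], [partID:1, val: 6], [partID:1, val: 7], [partID:1, val: 8], [partID:1, val: 9])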
aggregate
The first function adds up the elements within each partition; the second function then adds up the per-partition results.
rdd1.aggregate(0)(_+_, _+_)
Requirement: take the maximum of each partition, then add the per-partition maximums together.
rdd1.aggregate(0)(math.max(_, _), _+_)
Now look at the result with the initial value set to 10:
rdd1.aggregate(10)(math.max(_, _), _+_)
And with the initial value set to 2:
rdd1.aggregate(2)(math.max(_, _), _+_)

def func1(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
  iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1.mapPartitionsWithIndex(func1).collect
rdd1.aggregate(0)(math.max(_, _), _ + _)
rdd1.aggregate(5)(math.max(_, _), _ + _)

val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
def func2(index: Int, iter: Iterator[(String)]) : Iterator[String] = {
  iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
rdd2.mapPartitionsWithIndex(func2).collect   -- inspect the elements of each partition
rdd2.aggregate("")(_ + _, _ + _)
How many times is the initial value applied?
rdd2.aggregate("=")(_ + _, _ + _)
If there were three partitions, how many times would the initial value be applied?

val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3.mapPartitionsWithIndex(func2).collect   -- inspect the elements of each partition
The result can differ from run to run, because sometimes one executor returns its partition result earlier and sometimes later.
rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)

val rdd4 = sc.parallelize(List("12","23","345",""),2)
rdd4.mapPartitionsWithIndex(func2).collect   -- inspect the elements of each partition
Why is the result "01" or "10"? Key point: "".length is 0, which becomes the string "0", so in the next comparison the minimum length is 1.
rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)

val rdd5 = sc.parallelize(List("12","23","","345"),2)
rdd5.mapPartitionsWithIndex(func2).collect
rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
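To make the role of the initial value concrete: aggregate applies it once inside every partition (in the seqOp) and once more when the driver combines the partition results (in the combOp). A small worked sketch, assuming the two-partition rdd2 above; the order of the two partition strings may vary between runs:
// rdd2 partitions: ("a","b","c") and ("d","e","f")
// seqOp per partition:  "=" + "a" + "b" + "c" = "=abc"   and   "=" + "d" + "e" + "f" = "=def"
// combOp on the driver: "=" + "=abc" + "=def" = "==abc=def"   (or "==def=abc")
rdd2.aggregate("=")(_ + _, _ + _)
// With three partitions the initial value would therefore appear four times: once per partition plus once in the final combine.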
aggregateByKey
val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
Requirement: count how many cats, dogs and mice there are. Compare the two approaches:
pairRDD.aggregateByKey(0)(_+_,_+_).collect
pairRDD.reduceByKey(_+_).collect   -- achieves the same result
Requirement: take the largest count of each animal within each partition, then add the corresponding per-partition results together.
def func2(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
  iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
pairRDD.mapPartitionsWithIndex(func2).collect
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
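A sketch of why the second call returns larger numbers: the initial value of aggregateByKey is applied once per key per partition (but, unlike aggregate, not in the final combine across partitions), so an initial value of 100 replaces every per-partition maximum. The ordering of the result arrays below may vary:
// partition 0: (cat,2) (cat,5) (mouse,4)    partition 1: (cat,12) (dog,12) (mouse,2)
// initial value 0:   per-partition max -> cat:5, mouse:4 | cat:12, dog:12, mouse:2 ; summed -> cat:17, dog:12, mouse:6
// initial value 100: every per-partition max becomes 100 ; summed -> cat:200, mouse:200, dog:100
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect    // Array((dog,12), (cat,17), (mouse,6))
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect  // Array((dog,100), (cat,200), (mouse,200))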
checkpoint (to be revisited later with a full example)
sc.setCheckpointDir("hdfs://node01:9000/ck")
val rdd = sc.textFile("hdfs://node01:9000/wc").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
rdd.checkpoint
rdd.isCheckpointed
rdd.count
rdd.isCheckpointed
rdd.getCheckpointFile
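Two points worth adding: checkpoint is lazy (nothing is written until an action runs a job), and the lineage is recomputed for the checkpoint job unless the RDD is also cached. A minimal sketch of the usual pattern, reusing the paths above and assuming the HDFS cluster is reachable:
sc.setCheckpointDir("hdfs://node01:9000/ck")
val rdd = sc.textFile("hdfs://node01:9000/wc").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
rdd.cache()        // avoids recomputing the whole lineage when the checkpoint job runs
rdd.checkpoint()   // only marks the RDD; nothing is written yet
rdd.count          // the action runs the job, after which the checkpoint data is written
rdd.isCheckpointed      // now true
rdd.getCheckpointFile   // the directory under hdfs://node01:9000/ck holding the data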
repartition (redistributes the data into a new number of partitions), coalesce (merges partitions; a flag controls whether a shuffle happens), partitionBy (re-partitions the original RDD into a new ShuffledRDD according to the given partitioner)
val rdd1 = sc.parallelize(1 to 10, 10)
rdd1.repartition(5).partitions.length   -- the repartitioned RDD has 5 partitions
coalesce: adjusts the number of partitions. First argument: the target number of partitions; second argument: whether to shuffle (false means no shuffle).
val rdd2 = rdd1.coalesce(2, false)
val rdd1 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)), 3)
var rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd2.partitions.length
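A sketch of how the three relate (reusing the 10-partition rdd1 above): repartition(n) is simply coalesce(n, shuffle = true), so it always shuffles, while coalesce with shuffle = false can only merge partitions, never increase their number:
val rdd1 = sc.parallelize(1 to 10, 10)
rdd1.repartition(5).partitions.length        // 5 (always shuffles)
rdd1.coalesce(2, false).partitions.length    // 2 (no shuffle, partitions are merged)
rdd1.coalesce(20, false).partitions.length   // still 10: without a shuffle the count cannot grow
rdd1.coalesce(20, true).partitions.length    // 20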
combineByKey
This one is a bit lower level: reduceByKey, aggregateByKey and combineByKey all ultimately call combineByKeyWithClassTag.
val rdd1 = sc.textFile("hdfs://node01:9000/wc").flatMap(_.split(" ")).map((_, 1))
Previously we would have written:
val rdd2 = rdd1.reduceByKey(_+_)
or:
val rdd2 = rdd1.aggregateByKey(0)(_+_,_+_)
Now with combineByKey: x => x takes the first value of each key as-is; (a: Int, b: Int) => a + b adds the values within a partition; (m: Int, n: Int) => m + n adds the per-partition results.
val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd2.collect
combineByKey is more verbose, but it can express many requirements the simpler operators cannot.
In the next result every value is larger by 30: the file has three partitions and each added 10. x represents the first value of a key within a partition, so the 10 is added only once per key per partition.
val rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd3.collect

val rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
Zip them together; rdd5 comes first, so its numbers become the keys.
val rdd6 = rdd5.zip(rdd4)
rdd6.collect
Requirement: group the animals tagged 1 into one list and the animals tagged 2 into another.
val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ++ n)
Diagram the process and implement it in IDEA.
In the source code you can verify that reduceByKey, aggregateByKey and combineByKey all call combineByKeyWithClassTag.
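As another illustration of what combineByKey can express that reduceByKey cannot, a common exercise is computing the average value per key, where the combiner has a different type from the values. A minimal sketch; the data and the names scores/sumCount/avg are made up for this example:
val scores = sc.parallelize(List(("tom", 80), ("jerry", 90), ("tom", 60), ("jerry", 70)), 2)
// createCombiner: the first value of a key in a partition becomes (sum, count)
// mergeValue: fold further values of that key within the partition
// mergeCombiners: combine the per-partition (sum, count) pairs
val sumCount = scores.combineByKey(
  (v: Int) => (v, 1),
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)
)
val avg = sumCount.mapValues { case (sum, count) => sum.toDouble / count }
avg.collect   // Array((tom,70.0), (jerry,80.0)) -- ordering may vary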
collectAsMap
Turns a pair collection into a Map.
val rdd = sc.parallelize(List(("a", 1), ("b", 2)))
rdd.collectAsMap
-------------------------------------------------------------------------------------------
countByKey
val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
Counts how many times each key appears (compare with reduceByKey, which sums the values).
rdd1.countByKey
Counts how many times each element (the whole key-value pair) appears.
rdd1.countByValue
-------------------------------------------------------------------------------------------
filterByRange
Keeps all elements whose key falls within a range.
val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1)))
val rdd2 = rdd1.filterByRange("c", "d")
rdd2.collect
-------------------------------------------------------------------------------------------
flatMapValues
val rdd3 = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))
Maps over the values and then flattens the result.
rdd3.flatMapValues(_.split(" "))
-------------------------------------------------------------------------------------------
foldByKey
val rdd1 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2)
val rdd2 = rdd1.map(x => (x.length, x))
Folds the values by key.
val rdd3 = rdd2.foldByKey("")(_+_)
val rdd = sc.textFile("hdfs://node01:9000/wc").flatMap(_.split(" ")).map((_, 1))
rdd.foldByKey(0)(_+_)
-------------------------------------------------------------------------------------------
foreachPartition
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
Processes the elements of each partition as a whole; commonly used to write each partition's data out to a database.
rdd1.foreachPartition(x => println(x.reduce(_ + _)))
The println output appears in the driver console only when running locally (e.g., from IDEA); on a cluster it is printed on the executors.
-------------------------------------------------------------------------------------------
keyBy
val rdd1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
Builds new tuples using the result of the given function as the key.
val rdd2 = rdd1.keyBy(_.length)
rdd2.collect
-------------------------------------------------------------------------------------------
keys, values
val rdd1 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val rdd2 = rdd1.map(x => (x.length, x))
rdd2.keys.collect
rdd2.values.collect
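Since foreachPartition is the operator typically used for writing each partition out to a database, here is a hedged sketch of that pattern with plain JDBC; the URL, table name and credentials are placeholders for illustration only, and the JDBC driver would need to be on the classpath:
import java.sql.DriverManager
val nums = sc.parallelize(1 to 9, 3)   // example data, not from the lesson
nums.foreachPartition { iter =>
  // one connection per partition, created on the executor
  val conn = DriverManager.getConnection("jdbc:mysql://node01:3306/test", "user", "password")
  val stmt = conn.prepareStatement("INSERT INTO numbers(value) VALUES (?)")
  iter.foreach { x =>
    stmt.setInt(1, x)
    stmt.executeUpdate()
  }
  stmt.close()
  conn.close()
}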
Requirement: infer a user's home address and work address from the base stations the user connects to most often.
User connection log:
Phone number, event time, base station ID, event type
18101056806,20160327075000,9F36407EAD0629FC166F14DDE7970F68,1
18101056806,20160327081000,9F36407EAD0629FC166F14DDE7970F68,0
18101056806,20160327081100,CC0710CC94ECC657A8561DE549D940E0,1
18101056806,20160327082000,CC0710CC94ECC657A8561DE549D940E0,0
18688888888,20160327082400,16030401EAFB68F1E3CDF819735E1C66,1
18101056806,20160327082500,16030401EAFB68F1E3CDF819735E1C66,1
18688888888,20160327170000,16030401EAFB68F1E3CDF819735E1C66,0
18101056806,20160327180000,16030401EAFB68F1E3CDF819735E1C66,0
18688888888,20160327171000,CC0710CC94ECC657A8561DE549D940E0,1
18688888888,20160327171600,CC0710CC94ECC657A8561DE549D940E0,1
18101056806,20160327180500,CC0710CC94ECC657A8561DE549D940E0,1
18101056806,20160327181500,CC0710CC94ECC657A8561DE549D940E0,0
18101056806,20160327182000,9F36407EAD0629FC166F14DDE7970F68,1
18101056806,20160327230000,9F36407EAD0629FC166F14DDE7970F68,0
Base station info:
Base station ID, longitude, latitude, signal type
9F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6
CC0710CC94ECC657A8561DE549D940E0,116.303955,40.041935,6
16030401EAFB68F1E3CDF819735E1C66,116.296302,40.032296,6
Case code:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object MobileLocation {
def main(args: Array[String]): Unit = {
// Boilerplate setup
val conf = new SparkConf()
.setAppName("MobileLocation")
.setMaster("local[2]")
val sc = new SparkContext(conf)
// Load the user base-station connection log
val file: RDD[String] = sc.textFile("mobilelocation/log")
// Split each record and key it by (phone, base station)
val phoneAndLacAndTime: RDD[((String, String), Long)] = file.map(line => {
val fields = line.split(",")
val phone = fields(0) // phone number
val time = fields(1).toLong // event time
val lac = fields(2) // base station ID
val eventType = fields(3).toInt // event type: 1 = connect, 0 = disconnect
// negate connect times so that summing connect/disconnect pairs yields the dwell time
val time_long = if (eventType == 1) -time else time
((phone, lac), time_long)
})
// Total time each user spent at each base station
val sumedPhoneAndLacAndTime: RDD[((String, String), Long)] = phoneAndLacAndTime.reduceByKey(_+_)
// Re-key by base station so the coordinates can be joined in
val lacAndPhoneAndTime: RDD[(String, (String, Long))] = sumedPhoneAndLacAndTime.map(x => {
val phone = x._1._1 // phone number
val lac = x._1._2 // base station ID
val time = x._2 // total time the user spent at this base station
(lac, (phone, time))
})
// Load the base stations' coordinate data
val lacInfo: RDD[String] =
sc.textFile("mobilelocation/lac_info.txt")
// Split the base-station records into (lac, (longitude, latitude))
val lacAndXY: RDD[(String, (String, String))] = lacInfo.map(line => {
val fields = line.split(",")
val lac = fields(0) // base station ID
val x = fields(1) // longitude
val y = fields(2) // latitude
(lac, (x, y))
})
// Join the dwell times with the coordinates
val joined: RDD[(String, ((String, Long), (String, String)))] = lacAndPhoneAndTime.join(lacAndXY)
// Reshape the data for the subsequent computation
val phoneAndTimeAndXY: RDD[(String, Long, (String, String))] = joined.map(x => {
val phone = x._2._1._1 // phone number
val lac = x._1 // base station ID
val time = x._2._1._2 // dwell time
val xy = x._2._2 // (longitude, latitude)
(phone, time, xy)
})
// Group by phone number, sort by dwell time in descending order and keep the top 2 locations per user
val sorted: RDD[(String, List[(String, Long, (String, String))])] =
phoneAndTimeAndXY.groupBy(_._1).mapValues(_.toList.sortBy(_._2).reverse.take(2))
val res = sorted.map(_._2)
println(res.collect.toBuffer)
sc.stop()
}
}
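As a variation that ties back to the operators covered above, the final groupBy + sortBy step could also be written with aggregateByKey, which keeps only a running top-2 list per phone number instead of materializing all of a user's records in one list. A sketch only, reusing phoneAndTimeAndXY from the code above; it is not part of the original solution:
// re-key by phone, then keep a running top-2 (by dwell time) per key
val top2 = phoneAndTimeAndXY
  .map { case (phone, time, xy) => (phone, (time, xy)) }
  .aggregateByKey(List.empty[(Long, (String, String))])(
    (acc, v) => (v :: acc).sortBy(t => -t._1).take(2),   // within a partition
    (a, b) => (a ++ b).sortBy(t => -t._1).take(2)        // across partitions
  )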
xuzhongcn.github.io/spark/02/mobilelocation/log/19735E1C66.log
xuzhongcn.github.io/spark/02/mobilelocation/log/DDE7970F68.log
xuzhongcn.github.io/spark/02/mobilelocation/log/E549D940E0.log
xuzhongcn.github.io/spark/02/mobilelocation/lac_info.txt
1. Familiarity with the Spark Core operators 2. The checkpoint workflow 3. Implement the case requirements with the operators covered so far
1. Why checkpoint is needed and how the checkpoint workflow runs 2. The difference between the aggregate and aggregateByKey operators