mapreduce快速入门

第一节:mapreduce概述

mapreduce:分布式并行离线计算框架,是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架;Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上;

1.1 mapreduce产生的背景
1.2 mapreduce编程模型
1.3 Mapreduce的几个关键名词
1.4 mapreduce程序运行流程

mapreduce-flow

具体流程说明:

  1. 一个mr程序启动的时候,最先启动的是MRAppMaster,MRAppMaster启动后根据本次job的描述信息,计算出需要的maptask实例数量,然后向集群申请机器启动相应数量的maptask进程

  2. maptask进程启动之后,根据给定的数据切片范围进行数据处理,主体流程为:

    • 利用客户指定的inputformat来获取RecordReader读取数据,形成输入KV对。
    • 将输入KV(k是文件的行号,v是文件一行的数据)对传递给客户定义的map()方法,做逻辑运算,并将map()方法输出的KV对收集到缓存。
    • 将缓存中的KV对按照K分区排序后不断溢写到磁盘文件
  3. MRAppMaster监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据范围(数据分区)

  4. Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储

第二节 maprecue实例开发

2.1 编程步骤
  1. 用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端)
  2. Mapper的输入数据是KV对的形式(KV的类型可自定义)
  3. Mapper的输出数据是KV对的形式(KV的类型可自定义)
  4. Mapper中的业务逻辑写在map()方法中
  5. map()方法(maptask进程)对每一个调用一次
  6. Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
  7. Reducer的业务逻辑写在reduce()方法中
  8. Reducetask进程对每一组相同k的组调用一次reduce()方法
  9. 用户自定义的Mapper和Reducer都要继承各自的父类
  10. 整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必要信息的job对象
2.2 经典的wordcount程序编写
  1. 需求:有一批文件(规模为TB级或者PB级),如何统计这些文件中所有单词出现次数

    如有三个文件,文件名是qf_course.txt、qf_stu.txt 和 qf_teacher

    qf_course.txt内容:

    php java linux
    bigdata VR
    C C++ java web
    linux shell

    qf_stu.txt内容:

    tom jim lucy
    lily sally
    andy
    tom jim sally

    qf_teacher内容:

    jerry Lucy tom
    jim
  2. 方案

    • 分别统计每个文件中单词出现次数 - map()
    • 累加不同文件中同一个单词出现次数 - reduce()
  3. 实现代码

    • 创建一个简单的java项目

    • 添加hadoop client依赖的jar

    • 编写代码

      • 自定义一个mapper类

         
      • 自定义一个reduce类

         
      • 编写一个Driver类

         
    • 运行此程序的步骤

      1. 将此程序打包 名为wordcount.jar

        第一步

        wordcount-step1

        第二步

        wordcount-step2

        第三步

        wordcount-step3

      2. 上传wordcount.jar到名为min1机器的/home/hadoop目录下

      3. 在hdfs上创建文件夹“/wordcount/input”,并将三个文件(qf_course.txt、qf_stu.txt 和 qf_teacher)上传到hdfs的“/wordcount/input”目录下

         
      4. 在/home/hadoop下启动wordcount.jar运行

         
      5. 在hadoop的/wordcount/output下生成两个文件 如下:

        ​ _SUCCESS //表示计算成功

        ​ part-r-00000 //处理结果文件

      6. 查看结果

         
2.3 案例-统计最高温度
  1. 需求:求给定日期的最高温度

    待处理数据内容:

    ​ 201701082.6

    ​ 201701066

    ​ 2017020810

    ​ 2017030816.33

    ​ 2017060833.0

    每一行的前8位是日期,从第8位往后是温度

  2. 代码

     
2.4 案例-求平均成绩

待处理数据内容:

​ 名字 语文 数学 英语 ​ lh 92 68 70 ​ zyt 94 88 75 ​ ls 96 78 78 ​ hgw 90 70 56 ​ yxx 80 88 73 ​ hz 90 98 70 ​ xyd 60 88 73 ​ hj 90 58 70 ​ cs 50 58 11

  1. 算每个人的平均成绩?

     
  2. 求每个学科的平均成绩

     
  3. 总平均分每个分数段的人数以及百分比

    如:

    ​ 分数段 人数 占总数的百分比

    ​ <60 1 8% ​ 60-70 2 16% ​ 70-80 5 33% ​ 80-90 2 16% ​ 90-100 3 28%

     
  4. 将三门课程中任意一门不及格的学生过滤出来

     
  5. 统计成材率?

    每一门成绩都大于60分的人数/总人数

     

第三节:一些必要知识

3.1 hadoop集群各主机时间要一致
 

3.2 yarn的聚合日志

​ yarn的聚合日志:job运行的详细记录。 ​ 需要启动jobhistoryserver ​ 需要配置yarn-site.xml

3.3 awk(sed) 和 mr