MapReduce is a distributed, parallel, offline computing framework: a programming framework for distributed computation and the core framework for developing Hadoop-based data analysis applications. Its core job is to combine the user's business-logic code with its own built-in default components into a complete distributed program that runs concurrently on a Hadoop cluster.
It is, in essence, a distributed computing model.
MapReduce abstracts this parallel computation into two functions: map and reduce.
A simple MapReduce program only needs to specify map(), reduce(), the input and the output; the framework takes care of everything else.
Job: each computation request submitted by a user is called a job.
Task: each job is split up and handed to multiple hosts to execute; each unit of execution produced by the split is called a task.
Tasks come in three types: map tasks, reduce tasks, and the MRAppMaster that coordinates a single job.
How a job runs, step by step:
When an MR program starts, the first process launched is the MRAppMaster. From the job description it calculates how many maptask instances are needed and asks the cluster for machines on which to start that many maptask processes.
Once a maptask process starts, it processes the data in the input split assigned to it: it reads the split record by record, calls the user-defined map() once per record (once per line by default), and collects the emitted KV pairs, partitioned for the reduce tasks.
When the MRAppMaster detects that all maptask processes have finished, it starts the number of reducetask processes specified by the client and tells each one which range of data (which partition) it is responsible for.
Once a reducetask process starts, it uses the locations reported by the MRAppMaster to fetch the relevant maptask output files from the machines where the map tasks ran, merge-sorts them locally, groups the KV pairs by key, calls the user-defined reduce() once per group, collects the output KV pairs, and finally writes the results to external storage through the OutputFormat specified by the client.
Requirement: given a batch of files (TB or even PB in total size), count how many times each word appears across all of them.
For example, take three files named qf_course.txt, qf_stu.txt and qf_teacher.txt.
Contents of qf_course.txt:
php java linux bigdata VR C C++ java web linux shell
Contents of qf_stu.txt:
tom jim lucy lily sally andy tom jim sally
Contents of qf_teacher.txt:
jerry Lucy tom jim
Approach
The map phase emits a <word, 1> pair for every word it reads; the framework groups the pairs by word, and the reduce phase sums the 1s of each group to obtain the count per word.
Implementation
Create a simple Java project.
Add the Hadoop client dependency JARs (the hadoop-client libraries matching the cluster's Hadoop version).
Write the code.
Define a mapper class
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * The four generic type parameters of Mapper are, from left to right:
 *
 * LongWritable KEYIN:   by default, the byte offset at which the current line of text starts.
 *                       Conceptually a Long (similar to a line number), but Hadoop has its own more
 *                       compact serialization interface, so LongWritable is used instead of Long.
 * Text VALUEIN:         by default, the content of the current line of text; a String, so Text is
 *                       used for the same reason.
 *
 * Text KEYOUT:          the key of the output produced by the user-defined logic; here it is a word,
 *                       so Text is used.
 * IntWritable VALUEOUT: the value of the output produced by the user-defined logic; here it is the
 *                       word count, so IntWritable is used.
 */
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * The business logic of the map phase goes into this map() method.
     * The map task calls our map() once for every line of input.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // convert the line of text handed to us by the map task into a String
        String line = value.toString();
        // split the line into words on spaces
        String[] words = line.split(" ");
        /**
         * emit each word as <word, 1>,
         * e.g. <lily,1> <lucy,1> <c,1> <c++,1> <tom,1>
         */
        for (String word : words) {
            // use the word as the key and 1 as the value, so the framework can partition the
            // output by word and all occurrences of the same word reach the same reduce task
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
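To make the map step concrete, here is a stand-alone sketch (plain Java, no Hadoop types; the line content is taken from qf_stu.txt above, and the class name MapEmitSketch is made up for illustration) that prints the key/value pairs map() would emit for one input line:

public class MapEmitSketch {
    public static void main(String[] args) {
        // one line of qf_stu.txt, exactly what the map task hands to map() as its value
        String line = "tom jim lucy lily sally andy tom jim sally";
        for (String word : line.split(" ")) {
            // WordcountMapper writes (new Text(word), new IntWritable(1)) at this point
            System.out.println("<" + word + ", 1>");
        }
    }
}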
Define a reducer class
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * The four generic type parameters of Reducer are, from left to right:
 * Text KEYIN:           matches the mapper's KEYOUT
 * IntWritable VALUEIN:  matches the mapper's VALUEOUT
 *
 * Text KEYOUT:          the word
 * IntWritable VALUEOUT: the output type of the user-defined reduce logic, here the total count
 */
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * The map output arrives as records such as
     *   <tom,1> <tom,1> <linux,1> <banana,1> <banana,1> <banana,1>
     * The key parameter is the key shared by one group of same-word KV pairs,
     * and values is the collection of values for that key,
     * e.g. <tom,[1,1]> <linux,[1]> <banana,[1,1,1]>
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0; // accumulate the number of occurrences of this word
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}
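Similarly, a stand-alone sketch (plain Java; the values are hard-coded to what the shuffle would deliver for the three sample files, in which "tom" appears three times, and ReduceSumSketch is a made-up name) of how one reduce() call collapses a key group:

import java.util.Arrays;
import java.util.List;

public class ReduceSumSketch {
    public static void main(String[] args) {
        // after the shuffle, "tom" arrives as <tom, [1, 1, 1]>
        List<Integer> values = Arrays.asList(1, 1, 1);
        int count = 0;
        for (int v : values) {
            count += v; // same accumulation as in WordcountReducer
        }
        System.out.println("<tom, " + count + ">"); // prints <tom, 3>
    }
}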
Write a Driver class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Acts as a client of the YARN cluster.
 * It packages the runtime parameters of our MR program (including the jar to use)
 * and submits everything to YARN.
 */
public class WordcountDriver {

    /**
     * This class runs on a Hadoop client machine. As soon as main() runs, the YARN client starts
     * up and talks to the YARN server side, which launches the MapReduce program that uses the
     * WordcountMapper and WordcountReducer classes.
     */
    public static void main(String[] args) throws Exception {
        // the program expects two arguments: the source path to process and the output path for the results
        if (args == null || args.length == 0) {
            args = new String[2];
            args[0] = "hdfs://192.168.18.64:9000/wordcount/input/"; // both paths are HDFS paths
            args[1] = "hdfs://192.168.18.64:9000/wordcount/output";
        }
        /**
         * If nothing is set explicitly and the program runs on a machine with Hadoop installed,
         * the configuration is loaded automatically from
         * /home/hadoop/app/hadoop-2.7.1/etc/hadoop/core-site.xml into the Configuration object.
         */
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // the local path of the jar that contains this program
        job.setJarByClass(WordcountDriver.class);
        // the mapper/reducer classes this job uses
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);
        // the KV types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // the KV types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // the directory containing the job's input files
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // the directory for the job's output
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // submit the job parameters and the jar containing the job's classes to YARN
        /*job.submit();*/
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
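For quick debugging it can help to run the same driver in Hadoop's local mode instead of submitting to YARN. This is only a sketch under the assumption of a Hadoop 2.x client on the classpath; the class name LocalRunHint is made up for illustration, while mapreduce.framework.name and fs.defaultFS are standard Hadoop configuration keys:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class LocalRunHint {
    public static Job localJob() throws Exception {
        Configuration conf = new Configuration();
        // run the MapReduce job in a single local JVM instead of on YARN
        conf.set("mapreduce.framework.name", "local");
        // read input and write output on the local file system instead of HDFS
        conf.set("fs.defaultFS", "file:///");
        return Job.getInstance(conf);
    }
}

The rest of the driver (mapper/reducer classes, input and output paths) stays the same.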
Steps to run the program
Package the program into a JAR named wordcount.jar, then:
Step 1: upload wordcount.jar to the /home/hadoop directory of the machine named min1.
Step 2: create the directory /wordcount/input on HDFS and upload the three files (qf_course.txt, qf_stu.txt and qf_teacher.txt) into it:
hadoop fs -mkdir -p /wordcount/input
hadoop fs -put qf_course.txt /wordcount/input
hadoop fs -put qf_stu.txt /wordcount/input
hadoop fs -put qf_teacher.txt /wordcount/input
Step 3: from /home/hadoop, run wordcount.jar:
hadoop jar wordcount.jar <package name>.WordcountDriver /wordcount/input /wordcount/output
Two files are generated under /wordcount/output on HDFS:
_SUCCESS      // marker indicating the job completed successfully
part-r-00000  // the result file
View the result
hadoop fs -cat /wordcount/output/part-r-00000   # the result is as follows
C	1
C++	1
Lucy	1
VR	1
andy	1
bigdata	1
java	2
jerry	1
jim	3
lily	1
linux	2
lucy	1
php	1
sally	2
shell	1
tom	3
web	1
Requirement: given a set of dated temperature records, find the highest temperature.
Input data:
201701082.6
201701066
2017020810
2017030816.33
2017060833.0
In each line, the first 8 characters are the date; everything from position 8 onward is the temperature.
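Before looking at the full job, here is a stand-alone sketch (plain Java; the record is taken from the sample data above and TempRecordSketch is a made-up name) of how that fixed-width layout is split into date and temperature:

public class TempRecordSketch {
    public static void main(String[] args) {
        String record = "201701082.6";
        String date = record.substring(0, 8); // "20170108"
        String temp = record.substring(8);    // "2.6" -- this is what the mapper emits as the value
        System.out.println(date + " -> " + temp);
    }
}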
Code
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Find the highest temperature.
 * @author lyd
 *
 * Data:
 *
 * 201701082.6
 * 201701066
 * 2017020810
 * 2017030816.33
 * 2017060833.0
 * 2017050126.6
 * 2017050320.9
 *
 */
public class HighTem {

    public static class MyMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // everything after the 8-character date is the temperature
            String tmp = line.substring(8, line.length());
            // emit every temperature under the same (empty) key,
            // so that all records end up in a single reduce group
            context.write(new Text(""), new Text(tmp));
            /**
             * map output (all under the empty key):
             * "" 2.6
             * "" 6
             * "" 10
             * "" 16.33
             * "" 33.0
             * "" 26.6
             * "" 20.9
             */
        }
    }

    /**
     * Custom reducer class.
     * @author lyd
     *
     */
    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
            /**
             * "" list(2.6, 6, 10, ...)
             */
            // start below any possible temperature; note that Double.MIN_VALUE would be wrong
            // here because it is the smallest positive double, not the most negative one
            double max = -Double.MAX_VALUE;
            // find the maximum value
            for (Text t : value) {
                if (max < Double.parseDouble(t.toString())) {
                    max = Double.parseDouble(t.toString());
                }
            }
            context.write(new Text(max + ""), new Text(""));
        }
    }

    public static void main(String[] args) {
        try {
            // get the configuration object
            Configuration conf = new Configuration();
            // create the job
            Job job = Job.getInstance(conf, "HighTemp");
            // set the main class of the job
            job.setJarByClass(HighTem.class);
            // map phase settings
            job.setMapperClass(MyMapper.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            // reduce phase settings
            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // submit the job and print progress information
            int isok = job.waitForCompletion(true) ? 0 : 1;
            // exit
            System.exit(isok);
        } catch (IOException | ClassNotFoundException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Input data (each line holds a student name followed by the Chinese, math and English scores, separated by tabs):
lh	92	68	70
zyt	94	88	75
ls	96	78	78
hgw	90	70	56
yxx	80	88	73
hz	90	98	70
xyd	60	88	73
hj	90	58	70
cs	50	58	11
Compute each student's average score. (For the first row, for example, lh's average is (92 + 68 + 70) / 3 ≈ 76.67.)
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AvgDemo {

    // custom mapper
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

        // runs exactly once, before the first call to map()
        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
        }

        Text k = new Text();
        Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] scores = line.split("\t");
            String name = scores[0];
            String chinese = scores[1];
            String math = scores[2];
            String english = scores[3];
            // average over the three subject columns (scores.length - 1 == 3)
            double avg = (Integer.parseInt(chinese) + Integer.parseInt(math)
                    + Integer.parseInt(english)) / ((scores.length - 1) * 1.0);
            k.set(name);
            v.set(avg + "");
            context.write(k, v);
        }

        // runs exactly once, after the last call to map()
        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
        }
    }

    /*
    // custom reducer
    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        // runs exactly once, before the first call to reduce()
        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
        }

        @Override
        protected void reduce(Text key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
        }

        // runs exactly once, after the last call to reduce()
        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
        }
    }
    */

    /**
     * Driver method for the job.
     * @param args input path and output path
     */
    public static void main(String[] args) {
        try {
            // 1. get the configuration
            Configuration conf = new Configuration();
            // 2. create the job
            Job job = Job.getInstance(conf, "model01");
            // 3. set the class to run
            job.setJarByClass(AvgDemo.class);
            // 4. map-related settings
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            // 5. reduce-related settings (unused here, see the commented-out reducer)
            /*job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);*/
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // 6. submit and run the job
            int isok = job.waitForCompletion(true) ? 0 : 1;
            // exit
            System.exit(isok);
        } catch (IOException | ClassNotFoundException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
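Because the reducer is commented out, the job above still runs the default identity reducer, so the mapper output passes through a shuffle before being written. If no reduce step is wanted at all, the shuffle can be skipped; a minimal sketch of that variant (MapOnlyHint and the job name are made up for illustration, setNumReduceTasks is the standard Job API):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class MapOnlyHint {
    public static Job mapOnlyJob(Configuration conf) throws Exception {
        Job job = Job.getInstance(conf, "avg-map-only");
        // zero reduce tasks: mapper output is written directly to the output directory,
        // skipping the shuffle and the (identity) reduce phase
        job.setNumReduceTasks(0);
        return job;
    }
}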
Compute the average score of each subject.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Average score of each subject, e.g.
 *   语文   数学   英语
 *   76     89     90
 * @author lyd
 *
 */
public class AvgDemo02 {

    // custom mapper
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

        // runs exactly once, before the first call to map()
        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
        }

        Text k = new Text();
        Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] scores = line.split("\t");
            String chinese = scores[1];
            String math = scores[2];
            String english = scores[3];
            // use one constant key so that every record reaches the same reduce group
            k.set("_");
            v.set(chinese + "_" + math + "_" + english);
            context.write(k, v);
        }

        // runs exactly once, after the last call to map()
        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
        }
    }

    // custom reducer
    public static class MyReducer extends Reducer<Text, Text, Text, Text> {

        // runs exactly once, before the first call to reduce(); writes the header row
        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            context.write(new Text("语文" + "\t" + "数学" + "\t" + "英语"), new Text(""));
        }

        @Override
        protected void reduce(Text key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
            int counter = 0;
            double c = 0;
            double m = 0;
            double e = 0;
            for (Text t : value) {
                String[] scores = t.toString().split("_");
                c += Double.parseDouble(scores[0]);
                m += Double.parseDouble(scores[1]);
                e += Double.parseDouble(scores[2]);
                counter++;
            }
            context.write(new Text(c / counter + "\t" + m / counter + "\t" + e / counter), new Text(""));
        }

        // runs exactly once, after the last call to reduce()
        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
        }
    }

    /**
     * Driver method for the job.
     * @param args input path and output path
     */
    public static void main(String[] args) {
        try {
            // 1. get the configuration
            Configuration conf = new Configuration();
            // 2. create the job
            Job job = Job.getInstance(conf, "model01");
            // 3. set the class to run
            job.setJarByClass(AvgDemo02.class);
            // 4. map-related settings
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            // 5. reduce-related settings
            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // 6. submit and run the job
            int isok = job.waitForCompletion(true) ? 0 : 1;
            // exit
            System.exit(isok);
        } catch (IOException | ClassNotFoundException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Count how many students fall into each average-score band, and each band's percentage of the total.
For example:
Score band	Count	Percentage of total
<60	1	8%
60-70	2	16%
70-80	5	33%
80-90	2	16%
90-100	3	28%
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Expected output shape:
 * <60     1   8%
 * 60-70   2   16%
 * 70-80   5   33%
 * 80-90   2   16%
 * 90-100  3   28%
 * @author lyd
 *
 */
public class AvgDemo03 {

    //static int counter = 0;

    // custom mapper
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

        // runs exactly once, before the first call to map()
        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
        }

        Text k = new Text();
        Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] scores = line.split("\t");
            String chinese = scores[1];
            String math = scores[2];
            String english = scores[3];
            double avg = (Double.parseDouble(chinese) + Double.parseDouble(math)
                    + Double.parseDouble(english)) / (scores.length - 1);
            // bucket the average into a score band; the band is the key, the value is a count of 1
            if (avg < 60) {
                k.set("<60");
                v.set("1");
            } else if (avg >= 60 && avg < 70) {
                k.set("60-70");
                v.set("1");
            } else if (avg >= 70 && avg < 80) {
                k.set("70-80");
                v.set("1");
            } else if (avg >= 80 && avg < 90) {
                k.set("80-90");
                v.set("1");
            } else if (avg >= 90 && avg <= 100) {
                k.set("90-100");
                v.set("1");
            }
            //context.getConfiguration().setInt("counter", counter);
            context.write(k, v);
        }

        // runs exactly once, after the last call to map()
        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
        }
    }

    // custom reducer
    public static class MyReducer extends Reducer<Text, Text, Text, Text> {

        // runs exactly once, before the first call to reduce(); writes the header row
        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            context.write(new Text("分数段"), new Text("人数" + "\t" + "百分比"));
        }

        int totalPerson = 0;
        /*int l6 = 0;
        int g6l7 = 0;
        int g7l8 = 0;
        int g8l9 = 0;
        int g9l10 = 0;*/
        List<String> li = new ArrayList<String>();

        @Override
        protected void reduce(Text key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
            /**
             * e.g. <60  list(1, 1)
             */
            int i = 0;
            for (Text t : value) {
                if (key.toString().equals("<60")) {
                    //l6 ++;
                    i++;
                } else if (key.toString().equals("60-70")) {
                    //g6l7 ++;
                    i++;
                } else if (key.toString().equals("70-80")) {
                    //g7l8 ++;
                    i++;
                } else if (key.toString().equals("80-90")) {
                    //g8l9 ++;
                    i++;
                } else if (key.toString().equals("90-100")) {
                    //g9l10 ++;
                    i++;
                }
                totalPerson++;
            }
            // remember the band and its count; results are written in cleanup(),
            // once the total number of students is known
            li.add(key.toString() + "_" + i);
            //context.getConfiguration().get("counter");
        }

        // runs exactly once, after the last call to reduce()
        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            for (String s : li) {
                String[] l = s.split("_");
                context.write(new Text(l[0]),
                        new Text(l[1] + "\t" + Double.parseDouble(l[1]) / totalPerson * 100 + "%"));
            }
        }
    }

    /**
     * Driver method for the job.
     * @param args input path and output path
     */
    public static void main(String[] args) {
        try {
            // 1. get the configuration
            Configuration conf = new Configuration();
            // 2. create the job
            Job job = Job.getInstance(conf, "model01");
            // 3. set the class to run
            job.setJarByClass(AvgDemo03.class);
            // 4. map-related settings
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            // 5. reduce-related settings
            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // 6. submit and run the job
            int isok = job.waitForCompletion(true) ? 0 : 1;
            // exit
            System.exit(isok);
        } catch (IOException | ClassNotFoundException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
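Note that AvgDemo03 accumulates totalPerson and li in reducer instance fields and only writes results in cleanup(), so the percentages are only correct when every score band is handled by the same reduce task. A single reduce task is the default, but a driver can make that assumption explicit; a hedged sketch (SingleReducerHint is a made-up name, setNumReduceTasks is the standard Job API):

import org.apache.hadoop.mapreduce.Job;

public class SingleReducerHint {
    public static void configure(Job job) {
        // keep all score bands in one reduce task; with several reduce tasks each
        // partition would compute percentages against its own partial total
        job.setNumReduceTasks(1);
    }
}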
Filter out the students who failed (scored below 60 in) any of the three subjects.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Filter out the students who failed any of the three subjects.
 * @author lyd
 *
 */
public class GrepDemo {

    //static int counter = 0;

    // custom mapper
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

        // runs exactly once, before the first call to map()
        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
        }

        Text k = new Text();
        Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] scores = line.split("\t");
            String chinese = scores[1];
            String math = scores[2];
            String english = scores[3];
            // keep the whole input line if any of the three scores is below 60
            if (Double.parseDouble(chinese) < 60 || Double.parseDouble(math) < 60
                    || Double.parseDouble(english) < 60) {
                context.write(value, new Text(""));
            }
        }

        // runs exactly once, after the last call to map()
        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
        }
    }

    /**
     * Driver method for the job.
     * @param args input path and output path
     */
    public static void main(String[] args) {
        try {
            // 1. get the configuration
            Configuration conf = new Configuration();
            // 2. create the job
            Job job = Job.getInstance(conf, "model01");
            // 3. set the class to run
            job.setJarByClass(GrepDemo.class);
            // 4. map-related settings
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            // 5. reduce-related settings (no reducer is set, so the identity reducer is used)
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // 6. submit and run the job
            int isok = job.waitForCompletion(true) ? 0 : 1;
            // exit
            System.exit(isok);
        } catch (IOException | ClassNotFoundException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Compute the "success rate":
the number of students who scored 60 or above in every subject, divided by the total number of students.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * "Success rate": the number of students with every score at 60 or above, divided by the total.
 * Expected output shape:
 *   成才率 88%
 *   留级率 12%
 * @author lyd
 *
 */
public class SuccessDemo {

    //static int counter = 0;

    // custom mapper
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

        // runs exactly once, before the first call to map()
        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] scores = line.split("\t");
            String chinese = scores[1];
            String math = scores[2];
            String english = scores[3];
            // "up" = passed every subject, "down" = failed at least one
            if (Double.parseDouble(chinese) >= 60 && Double.parseDouble(math) >= 60
                    && Double.parseDouble(english) >= 60) {
                context.write(new Text("up"), new Text("1"));
            } else {
                context.write(new Text("down"), new Text("1"));
            }
        }

        // runs exactly once, after the last call to map()
        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
        }
    }

    // custom reducer
    public static class MyReducer extends Reducer<Text, Text, Text, Text> {

        // runs exactly once, before the first call to reduce(); writes a header row
        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            context.write(new Text("分数段"), new Text("人数" + "\t" + "百分比"));
        }

        int totalPerson = 0;
        int u = 0;

        @Override
        protected void reduce(Text key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
            for (Text t : value) {
                if (key.toString().equals("up")) {
                    u++;
                }
                totalPerson++;
            }
        }

        // runs exactly once, after the last call to reduce(); writes the final ratios
        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            context.write(new Text("成才率"), new Text(u * 100.0 / totalPerson + "%"));
            context.write(new Text("留级率"), new Text((totalPerson - u) * 100.0 / totalPerson + "%"));
        }
    }

    /**
     * Driver method for the job.
     * @param args input path and output path
     */
    public static void main(String[] args) {
        try {
            // 1. get the configuration
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
            // 2. create the job
            Job job = Job.getInstance(conf, "model01");
            // 3. set the class to run
            job.setJarByClass(SuccessDemo.class);
            // 4. map-related settings
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            // 5. reduce-related settings
            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            // if the output directory already exists, delete it recursively first
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(new Path(args[1]))) {
                fs.delete(new Path(args[1]), true);
            }
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // 6. submit and run the job
            int isok = job.waitForCompletion(true) ? 0 : 1;
            // exit
            System.exit(isok);
        } catch (IOException | ClassNotFoundException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Time synchronization:
1. Set the time manually: date -s ""
2. NTP-based time synchronization (client/server)
3. Put the configured ntpdate call into a scheduled (cron) task
rpm -q ntp            # check whether the ntp package is installed
NTP configuration:
Pick the machine that will act as the NTP server and configure it:
vi /etc/ntp.conf
Start/stop commands:
service ntpd status/start/stop/restart
On each client, run:
ntpdate hadoop01
Notes:
The ntpd service must be running on the NTP server, but must not be running on the clients.
In /etc/ntp.conf on the server: server 127.127.1.0
Do not run ntpdate against the NTP server on the NTP server machine itself.
*/1 * * * * /usr/sbin/ntpdate hadoop01 >> /dev/null
YARN log aggregation keeps the detailed record of each job run; it requires the JobHistoryServer to be started and yarn-site.xml to be configured accordingly.