第四次
1. MapReduce 预备知识
1.1. 工作过程
MapReduce 程序的工作分两个阶段进行:Map 阶段、Reduce 阶段
- Map 阶段:将每个分割的数据传递给映射函数来产生输出值。
- Reduce 阶段:对重排阶段输出值进行汇总。这个阶段结合来自重排阶段值,并返回一个输出值。
2. 示例代码
2.1. 在 pom.xml
中添加依赖
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>${hadoop.version}</version></dependency>
2.2. 创建 Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
子类
template | 作用 |
---|---|
KEYIN | mapreduce 所读取到的一行文本的起始偏移量,Long 类型,在 hadoop 中有其自己的序列化类 LongWriteable |
VALUEIN | mapreduce 所读取到的一行文本的内容,hadoop 中的序列化类型为 Text |
KEYOUT | 是用户自定义逻辑处理完成后输出的 KEY ,在此处是单词,Text 类型 |
VALUEOUT | 是用户自定义逻辑输出的 value ,这里是单词出现的次数, LongWriteable 类型 |
2.3. 重写 map
方法
package cn.edu.nnu.mr;
import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> { @Override protected void map( LongWritable key, // 读到内容的位置 Text value, // 读到的内容 Mapper<LongWritable, Text, Text, LongWritable>.Context context // 输出 ) throws IOException, InterruptedException { String[] ss = value.toString().split("[^A-Za-z]"); // 按照非英文字符进行分割 LongWritable one = new LongWritable(1); for (String s : ss) { if (!s.trim().equals("")) { // 输出非空的字符串 context.write( new Text(s.trim()), // KEYOUT one // VALUEOUT ); } } }}
2.4. 创建 Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
子类
template | 作用 |
---|---|
KEYIN | mapper 传入的数据,本例中是单词,Text 类型 |
VALUEIN | mapper 传入的数据,本例中是单词出现的次数,LongWriteable 类型 |
KEYOUT | 输出结果,在此处是单词,Text 类型 |
VALUEOUT | 输出结果,这里是合并后单词出现的次数,LongWriteable 类型 |
2.5. 重写 reduce
方法
package cn.edu.nnu.mr;
import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> { @Override protected void reduce( Text key, // 参数 Iterable<LongWritable> values, // 迭代器 Reducer<Text, LongWritable, Text, LongWritable>.Context context // 输出 ) throws IOException, InterruptedException { long sum = 0L; for (LongWritable lw : values) { sum += lw.get(); } context.write( key, // KEYOUT new LongWritable(sum) // VALUEOUT ); }}
2.6. 主程序
package cn.edu.nnu.mr;
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountApp { public static void main(String[] args) throws IOException { Configuration conf = new Configuration(); // 创建任务 Job job = Job.getInstance(conf); // 设置任务的主程序、Mapper 类、Reducer 类 job.setJarByClass(WordCountApp.class); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 设置输出数据的键、值类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 设置任务的输入、输出目录 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 执行任务 try { job.waitForCompletion(true); } catch (Exception ex) { System.out.println(ex); } }}
2.7. 生成 jar 包
2.7.1. 如果项目中已经打过包,那么需要先清理项目:maven -> clean
2.7.2. 双击 package
生成 jar 包
2.7.3. 将 jar 包上传至虚拟机
2.7.4. 使用命令运行刚刚写的程序
命令中 /test1127
和 /output
分别对应代码中的 args[0]
和 args[1]
,运行后会将 /test1127
目录下的所有文件进行词频统计。
hadoop jar bd-1.0-SNAPSHOT.jar cn.edu.nnu.mr.WordCountApp /test1127 /output
2.7.5. 查看结果
成功后查看 /output
目录,其中 _SUCCESS
表示操作成功,part-r-00000
文件中存放的是最终结果,通过命令查看统计结果
hadoop fs -cat /output/part-r-00000
3. 示例:统计天气
3.1. 数据文件
DATE | HOUR | COND | PRES | HUM | TMP | AQI | PM2.5 | PM10 |
---|---|---|---|---|---|---|---|---|
20160101 | 00 | 霾 | 1024 | 67 | -2 | 212 | 168 | 158 |
20160101 | 01 | 霾 | 1024 | 70 | -3 | 209 | 164 | 132 |
数据文件以 ,
为分隔符,为 csv
格式文件,在读取后需要通过 .split(",")
来进行分割。同时,温度值有些可能会有无效数据 N/A
,因此需要将这些无效数据跳过。
3.2. 将数据文件上传到 hadoop 系统中
将本地文件上传到系统中。
// 该部分为封装过的方法mkdir("/test1127"); // 创建目录put("D:\\iso\\data\\weather.txt", "/test1127/weather.txt"); // 上传文件
3.3. 修改代码 并执行数据统计
3.3.1. 修改代码
/* File: WeatherMapper */package cn.edu.nnu.mr;
import ...
public class WeatherMapper extends Mapper<LongWritable, Text, Text, LongWritable> { @Override protected void map( LongWritable key, // 读到内容的位置 Text value, // 读到的内容 Mapper<LongWritable, Text, Text, LongWritable>.Context context // 输出 ) throws IOException, InterruptedException { String[] ss = value.toString().split(","); // 按照 "," 拆分 if (ss[0].equals("DATE")) { return; } // 日期作为 key,温度为 value try { context.write( new Text(ss[0]), new LongWritable(Long.parseLong(ss[5])) ); } catch (Exception ex) { } }}
/* File: WeatherReducer */package cn.edu.nnu.mr;
import ...
public class WeatherReducer extends Reducer<Text, LongWritable, Text, DoubleWritable> { @Override protected void reduce( Text key, // 参数 Iterable<LongWritable> values, // 迭代器 Reducer<Text, LongWritable, Text, DoubleWritable>.Context context // 输出 ) throws IOException, InterruptedException { long sum = 0L; long cnt = 0L; for (LongWritable lw : values) { cnt++; sum += lw.get(); } context.write( key, // KEYOUT new DoubleWritable(sum * 1.0 / cnt) // VALUEOUT ); }}
package cn.edu.nnu.mr;
import ...
public class WeatherApp { public static void main(String[] args) throws IOException { Configuration conf = new Configuration(); // 创建任务 Job job = Job.getInstance(conf); // 设置任务的主程序、Mapper 类、Reducer 类 job.setJarByClass(WeatherApp.class); job.setMapperClass(WeatherMapper.class); job.setReducerClass(WeatherReducer.class); // 设置输出数据的键、值类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DoubleWritable.class); // 设置任务的输入、输出目录 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 执行任务 try { job.waitForCompletion(true); } catch (Exception ex) { System.out.println(ex); } }}
3.3.2. 打包后上传到 master 主机运行
hadoop jar bd-1.0-SNAPSHOT.jar cn.edu.nnu.mr.WeatherApp /test1127/weather.txt /output/weatherhadoop fs -cat /output/weather/part-r-00000
4. 作业 1:每个月的平均温度
4.1. 代码
/* File: MonthWeatherMapper */package cn.edu.nnu.mr;
import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class MonthWeatherMapper extends Mapper<LongWritable, Text, Text, LongWritable> { @Override protected void map( LongWritable key, // 读到内容的位置 Text value, // 读到的内容 Mapper<LongWritable, Text, Text, LongWritable>.Context context // 输出 ) throws IOException, InterruptedException { String[] ss = value.toString().split(","); // 按照 "," 拆分 if (ss[0].equals("DATE")) { return; } // 月份作为 key,温度为 value String yymm = ss[0].substring(0, 6); // 取前 6 位字串 try { context.write( new Text(yymm), // 年月 201601 new LongWritable(Long.parseLong(ss[5])) // 温度 ); } catch (Exception ex) { } }}
/* File: MonthWeatherReducer */package cn.edu.nnu.mr;
import org.apache.hadoop.io.DoubleWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class MonthWeatherReducer extends Reducer<Text, LongWritable, Text, DoubleWritable> { @Override protected void reduce( Text key, // 参数 Iterable<LongWritable> values, // 迭代器 Reducer<Text, LongWritable, Text, DoubleWritable>.Context context // 输出 ) throws IOException, InterruptedException { long sum = 0L; long cnt = 0L; for (LongWritable lw : values) { cnt++; sum += lw.get(); } context.write( key, // KEYOUT new DoubleWritable(sum * 1.0 / cnt) // VALUEOUT ); }}
/* File: MonthWeatherApp */package cn.edu.nnu.mr;
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.DoubleWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class MonthWeatherApp { public static void main(String[] args) throws IOException { Configuration conf = new Configuration(); // 创建任务 Job job = Job.getInstance(conf); // 设置任务的主程序、Mapper 类、Reducer 类 job.setJarByClass(MonthWeatherApp.class); // 修改类名 job.setMapperClass(MonthWeatherMapper.class); job.setReducerClass(MonthWeatherReducer.class); // 设置输出数据的键、值类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DoubleWritable.class); // 设置任务的输入、输出目录 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 执行任务 try { job.waitForCompletion(true); } catch (Exception ex) { System.out.println(ex); } }}
4.2. 上传 jar 包
4.3. 运行
hadoop jar bd-1.0-SNAPSHOT.jar cn.edu.nnu.mr.MonthWeatherApp /test1127/weather.txt /output/monthweatherhadoop fs -cat /output/monthweather/part-r-00000
每个月的平均温度结果如下
年月 | 平均温度 |
---|---|
201601 | -3.808199121522694 |
201602 | 1.680161943319838 |
201603 | 9.741197183098592 |
201604 | 16.772661870503597 |
201605 | 21.534170153417016 |
201606 | 25.697014925373136 |
5. 中文统计
5.1. 安装 Jieba
依赖
5.1.1. 在 pom.xml
中添加依赖
<dependency> <groupId>com.huaban</groupId> <artifactId>jieba-analysis</artifactId> <version>1.0.2</version></dependency>
重新加载 Maven 项目,此时能够自动安装 Jieba
5.1.2. 尝试使用 Jieba 分词
package demo;
import com.huaban.analysis.jieba.JiebaSegmenter;import com.huaban.analysis.jieba.SegToken;
import java.util.List;
public class Test3 { public static void main(String[] args) { String str = "老师好我叫何同学"; JiebaSegmenter jieba = new JiebaSegmenter(); List<SegToken> tokens = jieba.process(str, JiebaSegmenter.SegMode.INDEX); for (SegToken st: tokens) { System.out.println(st.word); } }}
5.2. 尝试处理中文文件
5.2.1. 将 maven 自动下载的 Jieba 包上传到 master 主机上的以下目录
/home/lc/hadoop/share/hadoop/common/lib/
5.2.2. 将本地一个中文文本文件上传到 hdfs
put("D:\\iso\\data\\chinese.txt", "/test1127/chinese.txt");
5.2.3. 重写 CnWordCountMapper
CnWordCountReducer
CnWordCountApp
/* File: CnWordCountMapper */package cn.edu.nnu.mr;
import ...
public class CnWordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> { @Override protected void map( LongWritable key, // 读到内容的位置 Text value, // 读到的内容 Mapper<LongWritable, Text, Text, LongWritable>.Context context // 输出 ) throws IOException, InterruptedException { JiebaSegmenter jieba = new JiebaSegmenter(); List<SegToken> tokens = jieba.process(value.toString(), JiebaSegmenter.SegMode.INDEX); LongWritable one = new LongWritable(1); for (SegToken st: tokens) { if (st.word.matches("[\u4e00-\u9fa5]{2,}")) { // 匹配中文词语 context.write(new Text(st.word), one); } } }}
5.2.4. 打包 上传 运行
5.2.5. 下载到本地
get(false, "/output/cnwordcount/part-r-00000", "D:\\iso\\data\\cnword.txt", true);
词语 | 频数 |
---|---|
一个 | 12 |
一些 | 3 |
一切 | 3 |
一切都是 | 1 |
一块 | 2 |
… | … |
5.2.6. 对统计结果进行排序
/* File: Test4 */package demo;
import ...
public class Test4 { public static void main(String[] args) { InputStream is = null; InputStreamReader isr = null; BufferedReader br = null;
try { is = new FileInputStream("D:\\iso\\data\\cnword.txt"); isr = new InputStreamReader(is, StandardCharsets.UTF_8); br = new BufferedReader(isr); List<Word> words = new ArrayList<>(); String line = null; while ((line = br.readLine()) != null) { Word wd = new Word(); String[] ss = line.split("\\s+"); wd.key = ss[0]; wd.value = Integer.parseInt(ss[1]); words.add(wd); } words.sort(new Comparator<Word>() { @Override public int compare(Word o1, Word o2) { return o2.value - o1.value; // 从大到小,降序排序 } }); for (int i = 0; i < words.size() && i < 10; i++) { // 输出前 10 个词语 System.out.println(words.get(i).key + " " + words.get(i).value); } } catch (Exception ex) { } finally { // 关闭输入流 try { br.close(); } catch (Exception ex) { ex.printStackTrace(); } try { isr.close(); } catch (Exception ex) { ex.printStackTrace(); } try { is.close(); } catch (Exception ex) { ex.printStackTrace(); } } }}
词语 | 频数 |
---|---|
康德 | 40 |
哲学 | 24 |
一种 | 22 |
能动 | 22 |
科学 | 17 |
能动性 | 16 |
主义 | 15 |
可以 | 14 |
思维 | 14 |
我们 | 14 |
6. 作业 2:找出所有四大名著中的 20 个热词
数据清洗
如果某一行以中文字符结尾,而不是以标点为结尾,那么有可能这一行刚好被分开,此时词语可能会是断开的。因此,我们需要加上判断,先进行数据清洗,再将文件上传到 hdfs 中。
使用 concat 先将 4 个文件拼接起来,字节流 → 字符流 → 缓冲流
6.1. 编写方法处理字符文件
/* File: ConcateText */package cn.edu.nnu.homework;
import ...
public class ConcateText {
private static Configuration cong = null; private static FileSystem fs = null;
public static void main(String[] args) throws URISyntaxException, IOException { System.setProperty("HADOOP_USER_NAME", "lc"); cong = new Configuration(); fs = FileSystem.get(new URI("hdfs://master:8020"), cong); concat(); }
public static void concat() throws IOException { // 名著列表 String[] files = {"hongloumeng-utf8.txt", "sanguoyanyi-utf8.txt", "shuihu-utf8.txt", "xiyouji-utf8.txt"}; // 获取文件路径 File path = new File("D:\\iso\\data"); // 创建 hdfs 文件 FSDataOutputStream out = fs.create(new Path("/test1127/masterpieces.txt")); // 输入流 InputStream is = null; InputStreamReader isr = null; BufferedReader br = null; // 遍历四大名著 for (String file : files) { try { is = new FileInputStream(new File(path, file)); isr = new InputStreamReader(is, StandardCharsets.UTF_8); br = new BufferedReader(isr); String line = null; while ((line = br.readLine()) != null) { if (line.trim().equals("")) continue; while (true) { // 取出最后一个字符 char lastChar = line.charAt(line.length() - 1); // 如果是汉字,那么再读取一行,加到 line 上 if (lastChar >= 0x4e00 && lastChar <= 0x9fa5) { String newline = br.readLine(); // 如果新的一行存在并且不为空 if (newline != null) { if (!newline.trim().equals("")) { // 就拼上这一行 line += newline.trim(); } } else { break; } } else { break; // 空则退出 } } // 写入流 out.write(line.getBytes(StandardCharsets.UTF_8)); } } catch (Exception ex) { ex.printStackTrace(); } finally { try { br.close(); } catch (Exception ex) { ex.printStackTrace(); } try { isr.close(); } catch (Exception ex) { ex.printStackTrace(); } try { is.close(); } catch (Exception ex) { ex.printStackTrace(); } } } // 关闭输出流 out.close(); }}
运行后可以在 hdfs 看到处理后的文件,大致为 8.5 MB
6.2. 统计词频
使用 jar 包的方法,统计词频
hadoop jar bd-1.0-SNAPSHOT.jar cn.edu.nnu.mr.CnWordCountApp /test1127/masterpieces.txt/output/masterpieces
处理完成后可以看到处理后的文件大约为 1.3 MB
6.3. 下载文件 排序统计
文件下载
排序统计结果
package demo;
import java.io.BufferedReader;import java.io.FileInputStream;import java.io.InputStream;import java.io.InputStreamReader;import java.nio.charset.StandardCharsets;import java.util.ArrayList;import java.util.Comparator;import java.util.List;
public class Test4 { public static void main(String[] args) { InputStream is = null; InputStreamReader isr = null; BufferedReader br = null;
try { is = new FileInputStream("D:\\iso\\data\\mastercount.txt"); isr = new InputStreamReader(is, StandardCharsets.UTF_8); br = new BufferedReader(isr); List<Word> words = new ArrayList<>(); String line = null; while ((line = br.readLine()) != null) { Word wd = new Word(); String[] ss = line.split("\\s+"); wd.key = ss[0]; wd.value = Integer.parseInt(ss[1]); words.add(wd); } words.sort(new Comparator<Word>() { @Override public int compare(Word o1, Word o2) { return o2.value - o1.value; // 从大到小,降序排序 } }); for (int i = 0; i < words.size() && i < 20; i++) { System.out.println(words.get(i).key + "=" + words.get(i).value); } } catch (Exception ex) { } finally { try { br.close(); } catch (Exception ex) { ex.printStackTrace(); } try { isr.close(); } catch (Exception ex) { ex.printStackTrace(); } try { is.close(); } catch (Exception ex) { ex.printStackTrace(); } } }}
词语 | 频数 |
---|---|
行者 | 4468 |
一个 | 4268 |
宝玉 | 3755 |
宋江 | 3696 |
两个 | 3128 |
那里 | 2791 |
只见 | 2352 |
我们 | 2276 |
不知 | 2139 |
如何 | 1912 |
出来 | 1907 |
太太 | 1848 |
不得 | 1826 |
师父 | 1822 |
这里 | 1822 |
八戒 | 1808 |
怎么 | 1764 |
不是 | 1760 |
孔明 | 1748 |
夫人 | 1746 |