
Session 6

1. Startup

1.1. Check hosts

172.22.0.3 master
172.22.0.4 slave1
172.22.0.5 slave2

1.2. Start the HBase service

1.2.1. Start HDFS

Terminal window
[lc@master ~] start-dfs.sh

Also make sure HDFS contains an /hbase directory entry; if it does not, create it with the following command:

Terminal window
[lc@master ~] hadoop fs -mkdir /hbase
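You can verify the directory with a quick listing of the HDFS root (standard HDFS shell usage):

Terminal window
[lc@master ~] hadoop fs -ls /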

public/shixun/Pasted_image_20211211092127.png

1.2.2. Start HBase

Terminal window
[lc@master bin] ./start-hbase.sh

public/shixun/Pasted_image_20211211092148.png
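As an optional sanity check, jps (a standard JDK tool) should list an HMaster process on the master node and HRegionServer processes on the region-server nodes once HBase is up; the exact output depends on which daemons run where.

Terminal window
[lc@master ~] jps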

2. API programming

2.1. Add the HBase dependency to the project with Maven

In the <dependencies> section of pom.xml, add the entry below and reload Maven:

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.0</version>
</dependency>

2.2. Connect to HBase through the API

2.2.1. Create a test table in the HBase shell

hbase(main):002:0> create 'tab1','f1','f2'
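To confirm the table from the shell, the standard list and describe commands can be used (shell prompt omitted here):

list
describe 'tab1'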

public/shixun/Pasted_image_20211211093734.png

2.2.2. Write the Java code

package cn.edu.nnu;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
public class HbaseDemo {
public static void main(String[] args) {
Configuration conf = null;
Connection conn = null;
HBaseAdmin admin = null;
try {
conf = HBaseConfiguration.create();
// 1. For the second argument: if the local hosts file already maps 172.22.0.3 to master (and so on),
// the host names can be used directly; otherwise use the hosts' IPs, e.g. `172.22.0.3:2181`.
// 2. If clientPort is set to 2181 in `/hbase/conf/hbase-site.xml`, the form below is sufficient;
// otherwise append the port to each host name, e.g. `master:2181`.
conf.set("hbase.zookeeper.quorum", "master,slave1,slave2");
conn = ConnectionFactory.createConnection(conf);
admin = (HBaseAdmin) conn.getAdmin();
if (admin.tableExists("tab1")) {
System.out.println("tab1 exists!");
} else {
System.out.println("tab1 noexists!");
}
System.out.println("ok");
} catch (Exception ex) {
System.out.println("fail");
} finally {
try {
assert conn != null;
conn.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
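If the ZooKeeper client port is not the default 2181, or you prefer not to rely on hbase-site.xml being on the classpath, the port can also be set explicitly on the client configuration. A minimal sketch using the standard hbase.zookeeper.property.clientPort property (adjust the value to your cluster):

conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "master,slave1,slave2");
conf.set("hbase.zookeeper.property.clientPort", "2181"); // explicit ZooKeeper client port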

2.2.3. Create tables and insert data through the API

Create a table

Move admin, conf, and conn into static fields of the class and initialize them in main.

private static boolean existTable(String tableName) throws IOException {
if (admin.tableExists(tableName)) {
System.out.println(tableName + " exists");
return true;
} else {
System.out.println(tableName + " does not exist");
return false;
}
}
private static void createTable(String tabName, String... cfs) throws IOException {
if (existTable(tabName)) {
return;
}
HTableDescriptor tab = new HTableDescriptor(
TableName.valueOf(tabName));
for (String cf : cfs) {
HColumnDescriptor colDesc = new HColumnDescriptor(cf); // create the column family descriptor
tab.addFamily(colDesc); // add the column family
}
admin.createTable(tab); // create the table
System.out.println("Create Success!");
}
// main
createTable("tab2", "cf1", "cf2");

public/shixun/Pasted_image_20211211102710.png

Insert data
private static void putTest(String tabName, String rowKey, Map<String, List> data)
throws IOException {
if (!existTable(tabName)) { // make sure the table exists
return;
}
Table tab = conn.getTable(TableName.valueOf(tabName)); // get the Table object
Put put = new Put(rowKey.getBytes(StandardCharsets.UTF_8));
for (Map.Entry<String, List> entry : data.entrySet()) { // walk through the entries in data
String cf = entry.getKey();
List val = entry.getValue();
for (int i = 0; i < val.size(); i += 2) {
put.addColumn( // add one cell
cf.getBytes(StandardCharsets.UTF_8), // column family
((String) val.get(i)).getBytes(StandardCharsets.UTF_8), // qualifier
((String) val.get(i + 1)).getBytes(StandardCharsets.UTF_8) // value
);
}
}
tab.put(put); // write the data
}
public static void main(String[] args) {
try {
// ... initialization
Map<String, List> data = new HashMap<>(); // create the Map
data.put("cf1", Arrays.asList("name", "lc", "age", "20")); // entries for column family cf1
data.put("cf2", Arrays.asList("java", "100", "hadoop", "50")); // entries for column family cf2
putTest("tab2", "rk001", data); // insert the data via the static helper
System.out.println("ok");
} catch (Exception ex) {
System.out.println("fail");
} finally {
// ... cleanup
}
}
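To verify the write from the shell, a standard get on the row key should show the four cells that were inserted (prompt omitted):

get 'tab2', 'rk001'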

public/shixun/Pasted_image_20211211104231.png

Read data
// main
getTest("tab2", "rk001", "cf1:name");
// get: prints lc
private static void getTest(String tabName, String rowKey, String cfName)
throws IOException {
Table tab = conn.getTable(TableName.valueOf(tabName));
Get get = new Get(rowKey.getBytes(StandardCharsets.UTF_8));
Result result = tab.get(get);
Cell[] cells = result.rawCells();
for (int i = 0; i < cells.length; i++) {
Cell cell = cells[i];
String name = new String(CellUtil.cloneFamily(cell)) + ":"
+ new String(CellUtil.cloneQualifier(cell));
if (name.equals(cfName)) { // read only the matching cell
String val = new String(CellUtil.cloneValue(cell));
System.out.println(val);
break;
}
}
}
// main
scanTest("tab2");
// scan
private static void scanTest(String tabName) throws IOException {
Table tab = conn.getTable(TableName.valueOf(tabName));
Scan scan = new Scan();
ResultScanner scanner = tab.getScanner(scan); // scan the whole table
while (true) {
Result result = scanner.next();
if (result == null) {
break;
}
Cell[] cells = result.rawCells();
for (int i = 0; i < cells.length; i++) {
Cell cell = cells[i];
String rk = new String(CellUtil.cloneRow(cell));
String name = new String(CellUtil.cloneFamily(cell)) + ":"
+ new String(CellUtil.cloneQualifier(cell));
String val = new String(CellUtil.cloneValue(cell));
System.out.println(rk + "\t" + name + "\t" + val);
}
}
}
// output
rk001 cf1:age 20
rk001 cf1:name lc
rk001 cf2:hadoop 50
rk001 cf2:java 100

3. Integrating MapReduce with HBase

3.1. Preparation

3.1.1. Copy hbase-site.xml into Hadoop's configuration directory

Terminal window
cp hbase/conf/hbase-site.xml hadoop/etc/hadoop/

3.1.2. Edit hadoop/etc/hadoop/hadoop-env.sh

Terminal window
[lc@master hadoop]$ pwd
/home/lc/hadoop/etc/hadoop
[lc@master hadoop]$ vi hadoop-env.sh
Terminal window
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:~/hbase/lib/*
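To confirm that the HBase jars now appear on Hadoop's classpath (optional), the standard hadoop classpath command can be filtered:

Terminal window
hadoop classpath | tr ':' '\n' | grep hbase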

public/shixun/Pasted_image_20211211134025.png

3.1.3. Restart HBase and HDFS

3.1.4. Test command

Terminal window
hadoop jar hbase/lib/hbase-server-1.2.0-cdh5.7.0.jar rowcounter tab2

public/shixun/Pasted_image_20211211141432.png

3.2. Putting it all together

3.2.1. Bulk-load data

Create the music table in HBase
create 'music','info'
Prepare the data files and upload them to /musicdata on HDFS
Terminal window
hadoop fs -put code/hadoop/music1.txt /musicdata/
hadoop fs -put code/hadoop/music2.txt /musicdata/
hadoop fs -put code/hadoop/music3.txt /musicdata/
Use HBase's importtsv tool to import the data

The following command is entered on a single line

Terminal window
[lc@master ~] hadoop jar ~/hbase/lib/hbase-server-1.2.0-cdh5.7.0.jar importtsv -Dimporttsv.
bulk.output=tmp -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:singer,info:gender,
info:rhythm,info:terminal music /musicdata
Parameter meanings:
-Dimporttsv.bulk.output=tmp: the output directory
-Dimporttsv.columns: specifies what each input field maps to
HBASE_ROW_KEY: the field used as the row key
music: the name of the target table
/musicdata: the input directory or file
-Dcreate.table=yes: create the table if it does not already exist
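For reference, importtsv expects tab-separated input whose fields follow the -Dimporttsv.columns order above (row key, info:name, info:singer, info:gender, info:rhythm, info:terminal). A purely hypothetical record, not taken from the actual dataset, might look like this (fields separated by tabs):

RK[1]	song1	singer1	female	pop	MP3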

public/shixun/Pasted_image_20211211143432.png

public/shixun/Pasted_image_20211211143530.png

Load the data

Use HBase's completebulkload tool to load the generated HFiles into the HRegions

Terminal window
hadoop jar ~/hbase/lib/hbase-server-1.2.0-cdh5.7.0.jar completebulkload tmp music
Parameter meanings:
tmp: the output directory from the previous step
music: the target table

Inspect the table contents through the HBase shell
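For example, the standard count and scan shell commands give a quick look at the loaded rows (row counts and values depend on your data):

count 'music'
scan 'music', {LIMIT => 5}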

public/shixun/Pasted_image_20211211151106.png

3.2.2. HBase MapReduce API

Add the dependencies
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>
<version>1.2.0</version>
</dependency>

3.2.3. Mapper-only example

Write the code, package it as a jar, upload it to the VM, and run it:

Terminal window
hadoop jar bd-1.0-SNAPSHOT.jar cn.edu.nnu.hbasemr.HbaseApp

public/shixun/Pasted_image_20211211155545.png

No.   Data
1     RK[1] ,info:name song1
10    RK[10] ,info:name song1
11    RK[11] ,info:name song1
12    RK[12] ,info:name song2
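The notes do not include the source of HbaseApp itself. Below is a minimal sketch of what such a mapper-only job over the music table might look like, matching the row-key / info:name output above. The inner class name MusicMapper, the exact output formatting, and taking the output directory from args[0] (the run command shown above passes no arguments) are assumptions, not the original code.

package cn.edu.nnu.hbasemr;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

// Mapper-only job: scan the music table and emit "<rowkey> ,info:name <value>" lines.
public class HbaseApp {

    static class MusicMapper extends TableMapper<Text, NullWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            byte[] name = value.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"));
            if (name != null) {
                String line = Bytes.toString(key.copyBytes()) + " ,info:name " + Bytes.toString(name);
                context.write(new Text(line), NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJarByClass(HbaseApp.class);
        Scan scan = new Scan();
        scan.setCaching(500);        // larger scanner caching is usual for MR scans
        scan.setCacheBlocks(false);  // recommended when scanning from MapReduce
        TableMapReduceUtil.initTableMapperJob(
                "music", scan, MusicMapper.class,
                Text.class, NullWritable.class, job);
        job.setNumReduceTasks(0);    // mapper-only
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        job.waitForCompletion(true);
    }
}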

3.2.4. Mapper + Reducer example: count plays per client

public/shixun/Pasted_image_20211211163213.png

public/shixun/Pasted_image_20211211163407.png
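This example is only shown as screenshots in the notes. The sketch below illustrates one way such a job could be written, assuming the play records are in the music table with the client type stored in info:terminal; the class names (TerminalCountApp, TerminalMapper, SumReducer) and the output path argument are assumptions. It would be run like the other jobs, with an HDFS output directory as the single argument.

package cn.edu.nnu.hbasemr;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

// Count how many records were played on each client type (info:terminal).
public class TerminalCountApp {

    static class TerminalMapper extends TableMapper<Text, LongWritable> {
        private static final LongWritable ONE = new LongWritable(1);

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            byte[] terminal = value.getValue(Bytes.toBytes("info"), Bytes.toBytes("terminal"));
            if (terminal != null) {
                context.write(new Text(Bytes.toString(terminal)), ONE); // one play per row
            }
        }
    }

    static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable v : values) {
                sum += v.get();
            }
            context.write(key, new LongWritable(sum)); // total plays for this client type
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJarByClass(TerminalCountApp.class);
        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);
        TableMapReduceUtil.initTableMapperJob(
                "music", scan, TerminalMapper.class,
                Text.class, LongWritable.class, job);
        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        job.waitForCompletion(true);
    }
}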

3.2.5. Mapper + Reducer example: write a formatted file from HDFS into HBase

public/shixun/Pasted_image_20211214172040.png

4. Assignment 3

Task

Among all movies with at least 20 ratings, find the 10 with the highest average rating and output their names and average scores.

  1. Write MapReduce jobs to import the data into HBase
  2. Write a MapReduce job to aggregate ratings per movie (output movie ID and average score)
  3. Read the results and sort them
  4. Look up the movie names in HBase by movie ID

4.1. Project directory structure

D:\ISO\HADOOP-DEMO
+---.idea
+---src
| +---main
| | +---java
| | +---cn.edu.nnu
| | | +---hbasemr
| | | +---MovieNode.java // movie name, rating, rating count
| | | +---MovieRating.java // local processing of the results
| | | +---MovieApp.java // loads movie names into HBase
| | | +---MovieMapper.java
| | | +---MovieReducer.java
| | | +---RatingApp.java // aggregates ratings into HBase
| | | +---RatingMapper.java
| | | +---RatingReducer.java
| | +---demo
| +---test
| +---java
| +---cn.edu.nnu
+---target

4.2. Load movie IDs and names into HBase

4.2.1. Write the program using the HBase API

MovieMapper.java
package cn.edu.nnu.hbasemr;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class MovieMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
@Override
protected void map(
LongWritable key, Text value,
Mapper<LongWritable, Text, Text, NullWritable>.Context context)
throws IOException, InterruptedException {
context.write(value, NullWritable.get());
}
}
MovieReducer.java
package cn.edu.nnu.hbasemr;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class MovieReducer extends TableReducer<Text, NullWritable, NullWritable> {
@Override
protected void reduce(
Text key, Iterable<NullWritable> values,
Reducer<Text, NullWritable, NullWritable, Mutation>.Context context)
throws IOException, InterruptedException {
String s = key.toString();
String[] data = s.split("::");
Put put = new Put(data[0].getBytes(StandardCharsets.UTF_8));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"),
Bytes.toBytes(data[1]));
context.write(NullWritable.get(), put);
}
}
MovieApp.java
package cn.edu.nnu.hbasemr;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class MovieApp {
public static void main(String[] args)
throws IOException, InterruptedException, ClassNotFoundException {
Job job = Job.getInstance();
job.setJarByClass(MovieApp.class);
job.setMapperClass(MovieMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
TableMapReduceUtil.initTableReducerJob(args[0], MovieReducer.class, job);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Put.class);
Path inPath = new Path(args[1]);
Path outPath = new Path(args[2]);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
job.waitForCompletion(true);
}
}

4.2.2. Export the jar and run

Terminal window
hadoop jar bd-1.0-SNAPSHOT.jar cn.edu.nnu.hbasemr.MovieApp movies /movies/movies.dat /movieout1

4.2.3. Result

public/shixun/Pasted_image_20211216165557.png

4.3. Aggregate ratings per movie ID

4.3.1. Write the program using the HBase API

RatingMapper.java
package cn.edu.nnu.hbasemr;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class RatingMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
@Override
protected void map(
LongWritable key, Text value,
Mapper<LongWritable, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
String[] s = value.toString().split("::");
try {
context.write(
new Text(s[1]),
new LongWritable(Long.parseLong(s[2]))
);
} catch (Exception ex) {
// skip malformed or header lines
}
}
}
RatingReducer.java
package cn.edu.nnu.hbasemr;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class RatingReducer extends TableReducer<Text, LongWritable, NullWritable> {
@Override
protected void reduce(
Text key,
Iterable<LongWritable> values,
Reducer<Text, LongWritable, NullWritable, Mutation>.Context context)
throws IOException, InterruptedException {
long sum = 0L;
long cnt = 0L;
for (LongWritable lw : values) {
sum += lw.get();
cnt++;
}
double avg = 0L;
if (cnt != 0) {
avg = (double) sum / cnt;
}
Put put = new Put(Bytes.toBytes(key.toString()));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("avg"),
Bytes.toBytes(Double.toString(avg)));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("cnt"),
Bytes.toBytes(Long.toString(cnt)));
// The double and long are converted to strings before being written;
// otherwise the raw byte encodings are stored, which are harder to read back as text.
context.write(NullWritable.get(), put);
}
}
RatingApp.java
package cn.edu.nnu.hbasemr;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class RatingApp {
public static void main(String[] args)
throws IOException, InterruptedException, ClassNotFoundException {
Job job = Job.getInstance();
job.setJarByClass(RatingApp.class);
job.setMapperClass(RatingMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
TableMapReduceUtil.initTableReducerJob(args[0], RatingReducer.class, job);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Put.class);
Path inPath = new Path(args[1]);
Path outPath = new Path(args[2]);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
job.waitForCompletion(true);
}
}

4.3.2. Export the jar and run

Terminal window
hadoop jar bd-1.0-SNAPSHOT.jar cn.edu.nnu.hbasemr.RatingApp ratings /movies/ratings.dat /movieout2

4.3.3. Result

public/shixun/Pasted_image_20211216165735.png

4.4. Read the HBase tables locally and process the results

4.4.1. Program listing

Data structure
package cn.edu.nnu.hbasemr;
public class MovieNode {
private final String movieName; // movie name
private double rate; // average rating
private long cnt; // number of ratings
MovieNode(String movieName, Double rate, int cnt) {
this.movieName = movieName;
this.rate = rate;
this.cnt = cnt;
}
MovieNode(String movieName) { this(movieName, 0., 0); }
public void setCnt(long cnt) { this.cnt = cnt; }
public void setRate(double rate) { this.rate = rate; }
public double getRate() { return rate; }
public long getCnt() { return cnt; }
public String getMovieName() { return movieName; }
}
Data processing program
package cn.edu.nnu.hbasemr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.Comparator;
import java.util.PriorityQueue;
public class MovieRating {
static Configuration conf = null;
static Connection conn = null;
static HBaseAdmin admin = null;
static PriorityQueue<MovieNode> queue = null;
public static void main(String[] args) {
try {
init(); // initialize the HBase connection
queue = new PriorityQueue<>(new Comparator<MovieNode>() {
@Override
public int compare(MovieNode o1, MovieNode o2) {
return Double.compare(o1.getRate(), o2.getRate());
}
});
getValues("movies", "ratings"); // 将最终数据放到成员变量 queue 中
printQueue(); // 输出小根堆
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("fail");
} finally {
try {
assert conn != null;
conn.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
private static void getValues(String moviesTab, String ratingsTab) throws IOException {
if (!existTable(moviesTab) || !existTable(ratingsTab)) {
return;
}
Table mtab = conn.getTable(TableName.valueOf(moviesTab));
Table rtab = conn.getTable(TableName.valueOf(ratingsTab));
Scan mscan = new Scan();
Scan rscan = new Scan();
ResultScanner mscanner = mtab.getScanner(mscan);
while (true) {
Result mres = mscanner.next(); // next row of the movies table
if (mres == null) {
break;
}
Cell[] cells = mres.rawCells();
for (Cell cell : cells) {
String movieNo = new String(CellUtil.cloneRow(cell)); // movie ID
String movieName = new String(CellUtil.cloneValue(cell)); // movie name
RowFilter rowFilter = new RowFilter( // row filter (match by movie ID)
CompareFilter.CompareOp.EQUAL,
new BinaryComparator(Bytes.toBytes(movieNo)));
rscan.setFilter(rowFilter);
ResultScanner rscanner = rtab.getScanner(rscan); // read from the ratings table
MovieNode movieNode = new MovieNode(movieName); // create the MovieNode
while (true) {
Result rres = rscanner.next(); // read one result
if (rres == null) {
break;
}
Cell[] cells2 = rres.rawCells();
for (Cell cell2 : cells2) {
String rk = new String(CellUtil.cloneRow(cell2));
String qualifier = new String(CellUtil.cloneQualifier(cell2));
String val = new String(CellUtil.cloneValue(cell2));
if (qualifier.equalsIgnoreCase("avg")) {
try { // parse the string as a double
movieNode.setRate(Double.parseDouble(val));
} catch (NumberFormatException ve) {
ve.printStackTrace();
}
} else if (qualifier.equalsIgnoreCase("cnt")) {
try { // parse the string as a long
movieNode.setCnt(Long.parseLong(val));
} catch (NumberFormatException ve) {
ve.printStackTrace();
}
}
}
}
if (movieNode.getCnt() >= 20) { // at least 20 ratings
queue.add(movieNode); // add to the heap
}
if (queue.size() > 10) { // more than 10 entries in the heap
queue.poll(); // evict the lowest-rated entry
}
}
}
}
private static void printQueue() {
for (MovieNode mn : queue) {
System.out.println(mn.getMovieName() + "\t" + mn.getRate());
}
}
private static void init() throws IOException {
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "master,slave1,slave2");
conn = ConnectionFactory.createConnection(conf);
admin = (HBaseAdmin) conn.getAdmin();
}
private static boolean existTable(String tableName) throws IOException {
if (admin.tableExists(tableName)) {
System.out.println(tableName + " exists");
return true;
} else {
System.out.println(tableName + " does not exist");
return false;
}
}
}

4.4.2. Result

A min-heap is used here to keep the top 10 entries, so the printed output is not sorted.
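If a ranked listing is preferred, the heap can be drained and reversed before printing. A small sketch of how printQueue above could be adapted (requires java.util.ArrayList, java.util.Collections and java.util.List imports):

private static void printQueue() {
    // drain the min-heap (ascending by rating), then reverse to get a ranked list
    List<MovieNode> ranked = new ArrayList<>();
    while (!queue.isEmpty()) {
        ranked.add(queue.poll());
    }
    Collections.reverse(ranked);
    for (MovieNode mn : ranked) {
        System.out.println(mn.getMovieName() + "\t" + mn.getRate());
    }
}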

public/shixun/Pasted_image_20211216173108.png

Movie name                                                              Average rating
Sanjuro (1962)                                                          4.608695652173913
Seven Samurai (The Magnificent Seven) (Shichinin no samurai) (1954)     4.560509554140127
Shawshank Redemption, The (1994)                                        4.554557700942973
Godfather, The (1972)                                                   4.524966261808367
Close Shave, A (1995)                                                   4.52054794520548
Usual Suspects, The (1995)                                              4.517106001121705
Schindler’s List (1993)                                                 4.510416666666667
Wrong Trousers, The (1993)                                              4.507936507936508
Sunset Blvd. (a.k.a. Sunset Boulevard) (1950)                           4.491489361702127
Raiders of the Lost Ark (1981)                                          4.477724741447892