
Lab 6

1. Startup

1.1. Check the hosts file

172.22.0.3 master
172.22.0.4 slave1
172.22.0.5 slave2

1.2. Start the HBase services

1.2.1. Start HDFS

[lc@master ~] start-dfs.sh

Also make sure there is an hbase directory in HDFS; if it does not exist, create it with the command below:

[lc@master ~] hadoop fs -mkdir /hbase

1.2.2. Start HBase

[lc@master bin] ./start-hbase.sh

2. API Programming

2.1. Add the HBase dependency to the project with Maven

Add the following entry to the <dependencies> section of pom.xml and refresh Maven:

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.0</version>
</dependency>

2.2. Connect to HBase through the API

2.2.1. Create a table in the HBase shell

hbase(main):002:0> create 'tab1','f1','f2'

2.2.2. Write the Java code

package cn.edu.nnu;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
public class HbaseDemo {
public static void main(String[] args) {
Configuration conf = null;
Connection conn = null;
HBaseAdmin admin = null;
try {
conf = HBaseConfiguration.create();
// 1. For the second argument: if entries such as 172.22.0.3 master are configured in the
// local hosts file, the hostnames can be used directly; otherwise use the hosts' IPs,
// e.g. `172.22.0.3:2181`.
// 2. If clientPort is set to 2181 in `/hbase/conf/hbase-site.xml`, the value below is
// sufficient; otherwise append the port to each hostname, e.g. `master:2181`.
conf.set("hbase.zookeeper.quorum", "master,slave1,slave2");
conn = ConnectionFactory.createConnection(conf);
admin = (HBaseAdmin) conn.getAdmin();
if (admin.tableExists("tab1")) {
System.out.println("tab1 exists!");
} else {
System.out.println("tab1 noexists!");
}
System.out.println("ok");
} catch (Exception ex) {
System.out.println("fail");
} finally {
try {
assert conn != null;
conn.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}

2.2.3. Create tables and insert data through the API

Create a table

Move admin, conf, and conn into static member variables of the class and initialize them in main, as sketched below.
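
A minimal sketch of what this could look like; the init() helper is an assumption (it mirrors the connection code from the previous example, and the MovieRating program in section 4.4 uses the same pattern):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import java.io.IOException;

public class HbaseDemo {
    static Configuration conf = null;
    static Connection conn = null;
    static HBaseAdmin admin = null;

    // build the shared connection once; the static helpers below reuse it
    private static void init() throws IOException {
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master,slave1,slave2");
        conn = ConnectionFactory.createConnection(conf);
        admin = (HBaseAdmin) conn.getAdmin();
    }

    public static void main(String[] args) {
        try {
            init();
            // call existTable / createTable / putTest here (listed below)
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            try {
                if (conn != null) conn.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
}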

private static boolean existTable(String tableName) throws IOException {
if (admin.tableExists(tableName)) {
System.out.println(tableName + "已存在");
return true;
} else {
System.out.println(tableName + "不存在");
return false;
}
}
private static void createTable(String tabName, String... cfs) throws IOException {
if (existTable(tabName)) {
return;
}
HTableDescriptor tab = new HTableDescriptor(
TableName.valueOf(tabName));
for (String cf : cfs) {
HColumnDescriptor colDesc = new HColumnDescriptor(cf); // column family descriptor
tab.addFamily(colDesc); // add the column family to the table
}
admin.createTable(tab); // create the table
System.out.println("Create Success!");
}
// main
createTable("tab2", "cf1", "cf2");

Insert data
private static void putTest(String tabName, String rowKey, Map<String, List> data)
throws IOException {
if (!existTable(tabName)) { // make sure the table exists
return;
}
Table tab = conn.getTable(TableName.valueOf(tabName)); // get the Table object
Put put = new Put(rowKey.getBytes(StandardCharsets.UTF_8));
for (Map.Entry<String, List> entry : data.entrySet()) { // iterate over the entries of data
String cf = entry.getKey();
List val = entry.getValue();
for (int i = 0; i < val.size(); i += 2) {
put.addColumn( // add one cell
cf.getBytes(StandardCharsets.UTF_8), // column family
((String) val.get(i)).getBytes(StandardCharsets.UTF_8), // qualifier
((String) val.get(i + 1)).getBytes(StandardCharsets.UTF_8) // value
);
}
}
tab.put(put); // write the Put
}
public static void main(String[] args) {
try {
// ... initialization
Map<String, List> data = new HashMap<>(); // build the data map
data.put("cf1", Arrays.asList("name", "lc", "age", "20")); // entries for column family cf1
data.put("cf2", Arrays.asList("java", "100", "hadoop", "50")); // entries for column family cf2
putTest("tab2", "rk001", data); // insert via the static helper method
System.out.println("ok");
} catch (Exception ex) {
System.out.println("fail");
} finally {
// ... cleanup
}
}

Read data
// main
getTest("tab2", "rk001", "cf1:name");
// get: prints lc
private static void getTest(String tabName, String rowKey, String cfName)
throws IOException {
Table tab = conn.getTable(TableName.valueOf(tabName));
Get get = new Get(rowKey.getBytes(StandardCharsets.UTF_8));
Result result = tab.get(get);
Cell[] cells = result.rawCells();
for (int i = 0; i < cells.length; i++) {
Cell cell = cells[i];
String name = new String(CellUtil.cloneFamily(cell)) + ":"
+ new String(CellUtil.cloneQualifier(cell));
if (name.equals(cfName)) { // output only the requested cell
String val = new String(CellUtil.cloneValue(cell));
System.out.println(val);
break;
}
}
}
// main
scanTest("tab2");
// scan
private static void scanTest(String tabName) throws IOException {
Table tab = conn.getTable(TableName.valueOf(tabName));
Scan scan = new Scan();
ResultScanner scanner = tab.getScanner(scan); // scan all rows
while (true) {
Result result = scanner.next();
if (result == null) {
break;
}
Cell[] cells = result.rawCells();
for (int i = 0; i < cells.length; i++) {
Cell cell = cells[i];
String rk = new String(CellUtil.cloneRow(cell));
String name = new String(CellUtil.cloneFamily(cell)) + ":"
+ new String(CellUtil.cloneQualifier(cell));
String val = new String(CellUtil.cloneValue(cell));
System.out.println(rk + "\t" + name + "\t" + val);
}
}
}
// output
rk001 cf1:age 20
rk001 cf1:name lc
rk001 cf2:hadoop 50
rk001 cf2:java 100
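
getTest above fetches the entire row and filters the cells on the client side. Alternatively, the Get itself can be restricted to a single column; a minimal sketch (the getColumn helper name is hypothetical, imports as in the previous examples):

// fetch a single column directly instead of filtering rawCells() on the client
private static void getColumn(String tabName, String rowKey, String cf, String qualifier)
        throws IOException {
    Table tab = conn.getTable(TableName.valueOf(tabName));
    Get get = new Get(rowKey.getBytes(StandardCharsets.UTF_8));
    get.addColumn(cf.getBytes(StandardCharsets.UTF_8),
            qualifier.getBytes(StandardCharsets.UTF_8)); // only this column is returned
    byte[] value = tab.get(get).getValue(
            cf.getBytes(StandardCharsets.UTF_8),
            qualifier.getBytes(StandardCharsets.UTF_8));
    if (value != null) {
        System.out.println(new String(value, StandardCharsets.UTF_8));
    }
}
// main
// getColumn("tab2", "rk001", "cf1", "name"); // should print lc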

3. Integrating HBase with MapReduce

3.1. Preparation

3.1.1. Copy hbase-site.xml into the Hadoop configuration directory

cp hbase/conf/hbase-site.xml hadoop/etc/hadoop/

3.1.2. Edit hadoop/etc/hadoop/hadoop-env.sh

[lc@master hadoop]$ pwd
/home/lc/hadoop/etc/hadoop
[lc@master hadoop]$ vi hadoop-env.sh

Add the following line to the file:
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:~/hbase/lib/*

3.1.3. Restart HBase and HDFS

3.1.4. Test command

hadoop jar hbase/lib/hbase-server-1.2.0-cdh5.7.0.jar rowcounter tab2

3.2. Comprehensive exercise

3.2.1. Bulk-import data

Create the music table in HBase:

create 'music','info'

Prepare the data files and upload them to /musicdata in HDFS:

hadoop fs -put code/hadoop/music1.txt /musicdata/
hadoop fs -put code/hadoop/music2.txt /musicdata/
hadoop fs -put code/hadoop/music3.txt /musicdata/

Import the data with HBase's importtsv tool.

The following command is entered as a single line:

[lc@master ~] hadoop jar ~/hbase/lib/hbase-server-1.2.0-cdh5.7.0.jar importtsv -Dimporttsv.bulk.output=tmp -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:singer,info:gender,info:rhythm,info:terminal music /musicdata
Parameter meanings:

-Dimporttsv.bulk.output=tmp: the output directory
-Dimporttsv.columns: the meaning of each column in the input data
HBASE_ROW_KEY: the column used as the row key
music: the table to store into
/musicdata: the input directory or file
-Dcreate.table=yes: create the table if it does not exist

Load the data

Use HBase's completebulkload tool to load the output into the HRegions:

hadoop jar ~/hbase/lib/hbase-server-1.2.0-cdh5.7.0.jar completebulkload tmp music
Parameter meanings:

tmp: the output directory of the previous step
music: the target table

Inspect the table contents through the HBase shell.

3.2.2. HBase MapReduce API

Add the dependencies
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>
<version>1.2.0</version>
</dependency>

3.2.3. Single-Mapper example

Write the code, package it as a jar, upload it to the virtual machine, and run:

hadoop jar bd-1.0-SNAPSHOT.jar cn.edu.nnu.hbasemr.HbaseApp

No.	Data
1	RK[1] ,info:name song1
10	RK[10] ,info:name song1
11	RK[11] ,info:name song1
12	RK[12] ,info:name song2
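
A minimal sketch of a mapper-only job that scans the music table and writes one line per row in roughly this shape; the class names and the hard-coded output path are assumptions, not the original HbaseApp listing:

package cn.edu.nnu.hbasemr;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MusicScanApp {
    // emits one text line per row: the row key plus the info:name value
    static class MusicMapper extends TableMapper<Text, NullWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            String rowKey = Bytes.toString(key.get());
            String name = Bytes.toString(value.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
            context.write(new Text("RK[" + rowKey + "] ,info:name " + name), NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(HBaseConfiguration.create());
        job.setJarByClass(MusicScanApp.class);
        TableMapReduceUtil.initTableMapperJob("music", new Scan(),
                MusicMapper.class, Text.class, NullWritable.class, job);
        job.setNumReduceTasks(0); // mapper-only: map output goes straight to HDFS
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, new Path("/hbaseout")); // assumed output path
        job.waitForCompletion(true);
    }
}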

3.2.4. Mapper/Reducer example: count plays per client terminal
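
A hedged sketch of one way this could be done, assuming every row of the music table represents one play and info:terminal stores the client terminal; the class names and output path handling are assumptions:

package cn.edu.nnu.hbasemr;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TerminalCountApp {
    // map: one (terminal, 1) pair per row of the music table
    static class TerminalMapper extends TableMapper<Text, LongWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            byte[] terminal = value.getValue(Bytes.toBytes("info"), Bytes.toBytes("terminal"));
            if (terminal != null) {
                context.write(new Text(Bytes.toString(terminal)), new LongWritable(1));
            }
        }
    }

    // reduce: sum the play counts for each terminal
    static class TerminalReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable v : values) {
                sum += v.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(HBaseConfiguration.create());
        job.setJarByClass(TerminalCountApp.class);
        TableMapReduceUtil.initTableMapperJob("music", new Scan(),
                TerminalMapper.class, Text.class, LongWritable.class, job);
        job.setReducerClass(TerminalReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[0])); // output directory passed on the command line
        job.waitForCompletion(true);
    }
}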

3.2.5. Mapper/Reducer example: write a formatted file from HDFS into HBase

4. Assignment 3

Task

Among all movies with 20 or more ratings, find the 10 highest-rated ones and output each movie's name and average rating.

  1. Write MR jobs to import the datasets into HBase
  2. Write an MR job to aggregate per movie (output movie ID and average rating)
  3. Read the results and sort them
  4. Look up the movie names in HBase by movie ID

4.1. Project directory structure

D:\ISO\HADOOP-DEMO
+---.idea
+---src
| +---main
| | +---java
| | +---cn.edu.nnu
| | | +---hbasemr
| | | +---MovieNode.java // movie name, rating, rating count
| | | +---MovieRating.java // processes the data on the local machine
| | | +---MovieApp.java // stores movie names into HBase
| | | +---MovieMapper.java
| | | +---MovieReducer.java
| | | +---RatingApp.java // aggregates ratings into HBase
| | | +---RatingMapper.java
| | | +---RatingReducer.java
| | +---demo
| +---test
| +---java
| +---cn.edu.nnu
+---target

4.2. Load movie IDs and names into HBase

4.2.1. Program using the HBase API

MovieMapper.java
package cn.edu.nnu.hbasemr;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class MovieMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
@Override
protected void map(
LongWritable key, Text value,
Mapper<LongWritable, Text, Text, NullWritable>.Context context)
throws IOException, InterruptedException {
context.write(value, NullWritable.get());
}
}
MovieReducer.java
package cn.edu.nnu.hbasemr;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class MovieReducer extends TableReducer<Text, NullWritable, NullWritable> {
@Override
protected void reduce(
Text key, Iterable<NullWritable> values,
Reducer<Text, NullWritable, NullWritable, Mutation>.Context context)
throws IOException, InterruptedException {
String s = key.toString();
String[] data = s.split("::");
Put put = new Put(data[0].getBytes(StandardCharsets.UTF_8));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"),
Bytes.toBytes(data[1]));
context.write(NullWritable.get(), put);
}
}
MovieApp.java
package cn.edu.nnu.hbasemr;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class MovieApp {
public static void main(String[] args)
throws IOException, InterruptedException, ClassNotFoundException {
Job job = Job.getInstance();
job.setJarByClass(MovieApp.class);
job.setMapperClass(MovieMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
TableMapReduceUtil.initTableReducerJob(args[0], MovieReducer.class, job);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Put.class);
Path inPath = new Path(args[1]);
Path outPath = new Path(args[2]);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
job.waitForCompletion(true);
}
}

4.2.2. Export the jar and run

hadoop jar bd-1.0-SNAPSHOT.jar cn.edu.nnu.hbasemr.MovieApp movies /movies/movies.dat /movieout1

4.2.3. Result

4.3. Aggregate the ratings for each movie ID

4.3.1. Program using the HBase API

RatingMapper.java
package cn.edu.nnu.hbasemr;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class RatingMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
@Override
protected void map(
LongWritable key, Text value,
Mapper<LongWritable, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
String[] s = value.toString().split("::");
try {
context.write(
new Text(s[1]),
new LongWritable(Long.parseLong(s[2]))
);
} catch (Exception ex) {
// skip lines that cannot be parsed
}
}
}
RatingReducer.java
package cn.edu.nnu.hbasemr;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class RatingReducer extends TableReducer<Text, LongWritable, NullWritable> {
@Override
protected void reduce(
Text key,
Iterable<LongWritable> values,
Reducer<Text, LongWritable, NullWritable, Mutation>.Context context)
throws IOException, InterruptedException {
long sum = 0L;
long cnt = 0L;
for (LongWritable lw : values) {
sum += lw.get();
cnt++;
}
double avg = 0.0;
if (cnt != 0) {
avg = (double) sum / cnt;
}
Put put = new Put(Bytes.toBytes(key.toString()));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("avg"),
Bytes.toBytes(Double.toString(avg)));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("cnt"),
Bytes.toBytes(Long.toString(cnt)));
// Convert the double and long values to strings before writing;
// otherwise they are stored as raw byte encodings, which are harder to read back.
context.write(NullWritable.get(), put);
}
}
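
As the comment in RatingReducer notes, the averages and counts are written as strings. If binary storage were preferred instead, the Bytes helpers can round-trip numeric types; a small illustrative sketch (not part of the assignment code):

import org.apache.hadoop.hbase.util.Bytes;

public class BytesRoundTrip {
    public static void main(String[] args) {
        byte[] avgBytes = Bytes.toBytes(4.5);  // 8-byte encoding of a double
        byte[] cntBytes = Bytes.toBytes(20L);  // 8-byte encoding of a long
        System.out.println(Bytes.toDouble(avgBytes)); // 4.5
        System.out.println(Bytes.toLong(cntBytes));   // 20
    }
}

The trade-off is the one the comment points out: binary values show up as raw bytes in the HBase shell, while strings stay human-readable.
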
RatingApp.java
package cn.edu.nnu.hbasemr;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class RatingApp {
public static void main(String[] args)
throws IOException, InterruptedException, ClassNotFoundException {
Job job = Job.getInstance();
job.setJarByClass(RatingApp.class);
job.setMapperClass(RatingMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
TableMapReduceUtil.initTableReducerJob(args[0], RatingReducer.class, job);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Put.class);
Path inPath = new Path(args[1]);
Path outPath = new Path(args[2]);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
job.waitForCompletion(true);
}
}

4.3.2. Export the jar and run

hadoop jar bd-1.0-SNAPSHOT.jar cn.edu.nnu.hbasemr.RatingApp ratings /movies/ratings.dat /movieout2

4.3.3. Result

4.4. Read from HBase on the local machine and process the data

4.4.1. Program listing

Data structure
package cn.edu.nnu.hbasemr;
public class MovieNode {
private final String movieName; // movie name
private double rate; // average rating
private long cnt; // number of ratings
MovieNode(String movieName, Double rate, int cnt) {
this.movieName = movieName;
this.rate = rate;
this.cnt = cnt;
}
MovieNode(String movieName) { this(movieName, 0., 0); }
public void setCnt(long cnt) { this.cnt = cnt; }
public void setRate(double rate) { this.rate = rate; }
public double getRate() { return rate; }
public long getCnt() { return cnt; }
public String getMovieName() { return movieName; }
}
Data processing program
package cn.edu.nnu.hbasemr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.Comparator;
import java.util.PriorityQueue;
public class MovieRating {
static Configuration conf = null;
static Connection conn = null;
static HBaseAdmin admin = null;
static PriorityQueue<MovieNode> queue = null;
public static void main(String[] args) {
try {
init(); // initialize the HBase connection
queue = new PriorityQueue<>(new Comparator<MovieNode>() {
@Override
public int compare(MovieNode o1, MovieNode o2) {
return Double.compare(o1.getRate(), o2.getRate());
}
});
getValues("movies", "ratings"); // 将最终数据放到成员变量 queue 中
printQueue(); // 输出小根堆
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("fail");
} finally {
try {
assert conn != null;
conn.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
private static void getValues(String moviesTab, String ratingsTab) throws IOException {
if (!existTable(moviesTab) || !existTable(ratingsTab)) {
return;
}
Table mtab = conn.getTable(TableName.valueOf(moviesTab));
Table rtab = conn.getTable(TableName.valueOf(ratingsTab));
Scan mscan = new Scan();
Scan rscan = new Scan();
ResultScanner mscanner = mtab.getScanner(mscan);
while (true) {
Result mres = mscanner.next(); // next row from the movies table
if (mres == null) {
break;
}
Cell[] cells = mres.rawCells();
for (Cell cell : cells) {
String movieNo = new String(CellUtil.cloneRow(cell)); // movie ID
String movieName = new String(CellUtil.cloneValue(cell)); // movie name
RowFilter rowFilter = new RowFilter( // row filter matching the movie ID
CompareFilter.CompareOp.EQUAL,
new BinaryComparator(Bytes.toBytes(movieNo)));
rscan.setFilter(rowFilter);
ResultScanner rscanner = rtab.getScanner(rscan); // read from the ratings table
MovieNode movieNode = new MovieNode(movieName); // create a MovieNode for this movie
while (true) {
Result rres = rscanner.next(); // fetch one result
if (rres == null) {
break;
}
Cell[] cells2 = rres.rawCells();
for (Cell cell2 : cells2) {
String rk = new String(CellUtil.cloneRow(cell2));
String qualifier = new String(CellUtil.cloneQualifier(cell2));
String val = new String(CellUtil.cloneValue(cell2));
if (qualifier.equalsIgnoreCase("avg")) {
try { // parse the string as a double
movieNode.setRate(Double.parseDouble(val));
} catch (NumberFormatException ve) {
ve.printStackTrace();
}
} else if (qualifier.equalsIgnoreCase("cnt")) {
try { // parse the string as a long
movieNode.setCnt(Long.parseLong(val));
} catch (NumberFormatException ve) {
ve.printStackTrace();
}
}
}
}
if (movieNode.getCnt() >= 20) { // keep movies with at least 20 ratings
queue.add(movieNode); // add to the queue
}
if (queue.size() > 10) { // more than 10 entries in the queue
queue.poll(); // evict the lowest-rated movie
}
}
}
}
private static void printQueue() {
for (MovieNode mn : queue) {
System.out.println(mn.getMovieName() + "\t" + mn.getRate());
}
}
private static void init() throws IOException {
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "master,slave1,slave2");
conn = ConnectionFactory.createConnection(conf);
admin = (HBaseAdmin) conn.getAdmin();
}
private static boolean existTable(String tableName) throws IOException {
if (admin.tableExists(tableName)) {
System.out.println(tableName + " already exists");
return true;
} else {
System.out.println(tableName + " does not exist");
return false;
}
}
}
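
Since the row key of the ratings table is the movie ID itself, the inner RowFilter scan could also be replaced by a point Get per movie. A minimal sketch of that step (the lookupMovie helper is an assumption and would sit inside MovieRating, reusing its imports):

// alternative to the filtered scan: fetch the ratings row for one movie directly by row key
private static MovieNode lookupMovie(Table rtab, String movieNo, String movieName) throws IOException {
    MovieNode node = new MovieNode(movieName);
    Result res = rtab.get(new Get(Bytes.toBytes(movieNo)));
    byte[] avg = res.getValue(Bytes.toBytes("info"), Bytes.toBytes("avg"));
    byte[] cnt = res.getValue(Bytes.toBytes("info"), Bytes.toBytes("cnt"));
    if (avg != null) {
        node.setRate(Double.parseDouble(Bytes.toString(avg)));
    }
    if (cnt != null) {
        node.setCnt(Long.parseLong(Bytes.toString(cnt)));
    }
    return node;
}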

4.4.2. Result

Because a min-heap was used to select the top 10, the results below are not printed in sorted order.

Movie name	Average rating
Sanjuro (1962)	4.608695652173913
Seven Samurai (The Magnificent Seven) (Shichinin no samurai) (1954)	4.560509554140127
Shawshank Redemption, The (1994)	4.554557700942973
Godfather, The (1972)	4.524966261808367
Close Shave, A (1995)	4.52054794520548
Usual Suspects, The (1995)	4.517106001121705
Schindler’s List (1993)	4.510416666666667
Wrong Trousers, The (1993)	4.507936507936508
Sunset Blvd. (a.k.a. Sunset Boulevard) (1950)	4.491489361702127
Raiders of the Lost Ark (1981)	4.477724741447892