11 HBase和MpaReduce整合

2017-03-12 09:47

环境搭建

搭建步骤:

  1. 在etc/hadoop目录中创建hbase-site.xml的软连接。在真正的集群环境中的时候,hadoop运行mapreduce会通过该文件查找具体的hbase环境信息。
  2. 将hbase需要的jar包添加到hadoop运行环境中,其中hbase需要的jar就是lib文件夹下面的所有*.jar文件。
  3. 使用hbase自带的server jar测试是否安装成功。

环境搭建-软连接创建

命令:
ln -s /home/hadoop/bigdater/hbase-0.98.6-cdh5.3.6/conf/hbase-site.xml /home/hadoop/bigdater/hadoop-2.5.0-cdh5.3.6/etc/hadoop/hbase-site.xml

环境搭建-hbase jar添加

在hadoop中添加其他第三方的jar有多种方式,比如添加hbase的jar到hadoop环境中。这里介绍三种:
第一种:在hadoop-env.sh中添加HADOOP_CLASSPATH环境变量,value为hbase的lib文件夹下的所有jar文件。
第二种:在系统级或者用户级修改*profile文件内容,在文件中添加HADOOP_CLASSPATH。
第三种:直接将hbase的lib文件夹中所有jar文件复制到hadoop的share/hadoop/common/lib 或者share/hadoop/mapreduce等文件夹中。

将hbase的jar信息引入到hadoop执行环节中去。【第一种】
  vim hadoop-env.sh
  添加内容
####hbase lib jars
if [ "$HADOOP_CLASSPATH" ]; then
  export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/home/hadoop/bigdater/hbase-0.98.6-cdh5.3.6/lib/*
else
  export HADOOP_CLASSPATH=/home/hadoop/bigdater/hbase-0.98.6-cdh5.3.6/lib/*
fi
  保存退出

环境搭建-测试

直接使用hbase自带的命名进行环境的测试,命令为:hadoop jar $HBASE_HOME/lib/hbase-server-0.98.6-cdh5.3.6.jar rowcounter hbasetablename。 运行该命名可以查看到我们指定table的行数。

hadoop jar /home/hadoop/bigdater/hbase-0.98.6-cdh5.3.6/lib/hbase-server-0.98.6-cdh5.3.6.jar rowcounter users

案例–统计产品信息

功能介绍:我们用爬虫从网络上爬取数据,然后现在就需要充这些爬取的数据中提取产品信息。我们爬取的数据是放到hbase中的,然后最终我们的提取信息也要保存到hbase中。
注意:在window上运行操作hbase的mapreduce程序,如果指定fs.defaultFS为集群地址信息,那么使用TableMapReducerUtil的时候必须将addDependency设置为false,如果没有指定,那么必须为true。如果在集群中运行mapreduce程序,那么addDependency必须为true。
架构:nutch/solr+hbase+hdfs+mapreduce

数据准备(rowkey使用Java中的时间)

源表
import java.util.Date
create 'data','f'
put 'data',Date.new().getTime(),'f:content','{"p_id":"100001","p_name":"<张家界-凤凰-天门山-玻璃栈道飞机5日游>","price":"2141"}'
put 'data',Date.new().getTime(),'f:content','{"p_id":"100002","p_name":"<丽江-香格里拉-泸沽湖双飞7日游>","price":"4429"}'
put 'data',Date.new().getTime(),'f:content','{"p_id":"100003","p_name":"<香格里拉-昆大丽3飞6日游>","price":"2479"}'
put 'data',Date.new().getTime(),'f:content','{"p_id":"100004","p_name":"<桂林-阳朔-古东瀑布-世外桃源双飞5日游>","price":"2389"}'
put 'data',Date.new().getTime(),'f:content','{"p_id":"100005","p_name":"<海南三亚-无自费5日游>","price":"2389"}'
put 'data',Date.new().getTime(),'f:content','{"p_id":"100006","p_name":"<成都-九寨沟-黄龙-花湖-红原-九曲双飞7日游>","price":"3729"}'
put 'data',Date.new().getTime(),'f:content','{"p_id":"100007","p_name":"<海南三亚5日游>","price":"2168"}'
put 'data',Date.new().getTime(),'f:content','{"p_id":"100008","p_name":"<海南三亚五星0购物6日游>","price":"2916"}'
put 'data',Date.new().getTime(),'f:content','{"p_id":"100009","p_name":"<厦门双飞4日游>","price":"1388"}'
put 'data',Date.new().getTime(),'f:content','{"p_id":"100010","p_name":"<绵阳-九寨-黄龙-都江堰-成都双飞5日游>","price":"2899"}'
put 'data',Date.new().getTime(),'f:content','{"p_id":"100011","p_name":"<桂林-阳朔-古东-世外桃源双飞4日游>","price":"2249"}'
put 'data',Date.new().getTime(),'f:content','{"p_id":"100012","p_name":"<成都-九寨沟-黄龙双飞6日游>","price":"2611"}'
put 'data',Date.new().getTime(),'f:content','{"p_id":"100013","p_name":"<版纳-香格里拉-昆大丽4飞一卧8日游>","price":"3399"}'
put 'data',Date.new().getTime(),'f:content','{"p_id":"100014","p_name":"<成都-都江堰-黄龙九寨沟双飞6日游>","price":"2989"}'
put 'data',Date.new().getTime(),'f:content','{"p_id":"100015","p_name":"<桂林-大漓江-阳朔-龙脊梯田双飞4日游>","price":"2048"}'
put 'data',Date.new().getTime(),'f:content','{"p_id":"100016","p_name":"<长沙-张家界-黄龙洞-天门山-凤凰双飞7日游>","price":"3141"}'

目标表
create 'online_product', 'f'

分析

"p_id":"100001","p_name":"<张家界-凤凰-天门山-玻璃栈道飞机5日游>","price":"2141"

pid多个的情况,

pid:xxx
pname:xxx
price:xxx

案例编写

编写自定义的MR程序读取hbase的数据或者往hbase中写数据注意点:
1. 如果是从hbase中读取数据,那么要求mapper实现TableMapper抽象类。如果是往hbase中写数据,而且是有reducer的情况下,要求reducer实现TableReducer抽象类。
2. 使用TableMapperReducerUtil类来进行job创建初始化操作。如果是往hbase中写数据,而且reducer是可以省略的话,那么我们也可以不指定reducer的具体类,直接使用指定null,比设置job的reducer的task个数为0.
3. 如果是在window环境中运行job(任务代码的执行时在window上),那么需要将参数addDependency设置为false,或者将fs.defaultFS设置为file:///。如果是在集群中运行job,那么必须将addDependency设置为true,并且fs.defaultFS设置为hdfs://xxx
4. 具体代码参考../代码/beifeng10文件夹

本地运行:在Windows eclipse上运行
集群运行:在linux集群中
hadoop jar beifeng10-0.0.1.jar com.beifeng.hbase.HBaseTableDemo

实现:

ProductModel

——首先根据源数据进行分析,自定义输入数据格式(域、构造、get/set、读、写、比较等)

package com.beifeng.hbase;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * 自定义数据格式 partitioner 分区
 *
 * @author gerry
 *
 */
public class ProductModel implements WritableComparable<ProductModel> {
     private String id;
     private String name;
     private String price;

     public ProductModel() {
           super();
     }

     public ProductModel(String id, String name, String price) {
           super();
           this.id = id;
           this.name = name;
           this.price = price;
     }

     public String getId() {
           return id;
     }

     public void setId(String id) {
           this.id = id;
     }

     public String getName() {
           return name;
     }

     public void setName(String name) {
           this.name = name;
     }

     public String getPrice() {
           return price;
     }

     public void setPrice(String price) {
           this.price = price;
     }

     @Override
     public void write(DataOutput out) throws IOException {
           out.writeUTF(this.id);
           out.writeUTF(this.name);
           out.writeUTF(this.price);
     }

     @Override
     public void readFields(DataInput in) throws IOException {
           this.id = in.readUTF();
           this.name = in.readUTF();
           this.price = in.readUTF();
     }

     @Override
     public int compareTo(ProductModel o) {
           if (this == o) {
                return 0;
           }

           int tmp = this.id.compareTo(o.id);
           if (tmp != 0) {
                return tmp;
           }
           tmp = this.name.compareTo(o.name);
           if (tmp != 0) {
                return tmp;
           }
           tmp = this.price.compareTo(o.price);
           return tmp;
     }

     @Override
     public int hashCode() {
           final int prime = 31;
           int result = 1;
           result = prime * result + ((id == null) ? 0 : id.hashCode());
           result = prime * result + ((name == null) ? 0 : name.hashCode());
           result = prime * result + ((price == null) ? 0 : price.hashCode());
           return result;
     }

     @Override
     public boolean equals(Object obj) {
           if (this == obj)
                return true;
           if (obj == null)
                return false;
           if (getClass() != obj.getClass())
                return false;
           ProductModel other = (ProductModel) obj;
           if (id == null) {
                if (other.id != null)
                     return false;
           } else if (!id.equals(other.id))
                return false;
           if (name == null) {
                if (other.name != null)
                     return false;
           } else if (!name.equals(other.name))
                return false;
           if (price == null) {
                if (other.price != null)
                     return false;
           } else if (!price.equals(other.price))
                return false;
           return true;
     }
}

 HBaseTableDemo

——编写Mapper、Reducer、入口等,以及一些异常分析

package com.beifeng.hbase;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

/**
 * mapreduce操作hbase
 *
 * @author gerry
 *
 */
public class HBaseTableDemo {
     /**
 * 转换字符串为map对象
 *
 * @param content
 * @return
 */
     static Map<String, String> transfoerContent2Map(String content) {
           Map<String, String> map = new HashMap<String, String>();
           int i = 0;
           String key = "";
           // 删掉不需要的字符
           StringTokenizer tokenizer = new StringTokenizer(content, "({|}|\"|:|,)");
           while (tokenizer.hasMoreTokens()) {
                if (++i % 2 == 0) {
                     // 当前的值是value
                     map.put(key, tokenizer.nextToken());
                } else {
                     // 当前的值是key
                     key = tokenizer.nextToken();
                }
           }
           return map;
     }

     /**
 * mapper类,从hbase输入数据
 *
 * @author gerry
 *
 */
     static class DemoMapper extends TableMapper<Text, ProductModel> {
           private Text outputKey = new Text();
           private ProductModel outputValue = new ProductModel();

           @Override
           protected void map(ImmutableBytesWritable key, Result value, Context context)
                     throws IOException, InterruptedException {
                String content = Bytes.toString(value.getValue(Bytes.toBytes("f"), Bytes.toBytes("content")));
                if (content == null) {
                     System.err.println("数据格式错误" + content);
                     return;
                }
                Map<String, String> map = HBaseTableDemo.transfoerContent2Map(content);
                if (map.containsKey("p_id")) {
                     // 产品id存在
                     outputKey.set(map.get("p_id"));
                } else {
                     System.err.println("数据格式错误" + content);
                     return;
                }
                if (map.containsKey("p_name") && map.containsKey("price")) {
                     // 数据正常,进行赋值
                     outputValue.setId(outputKey.toString());
                     outputValue.setName(map.get("p_name"));
                     outputValue.setPrice(map.get("price"));
                } else {
                     System.err.println("数据格式错误" + content);
                     return;
                }
                context.write(outputKey, outputValue);
           }
     }

     /**
 * mapper类,从hbase输入数据,不使用reducer,直接进行输出到habse
 *
 * @author gerry
 *
 */
     static class DemoMapper2 extends TableMapper<ImmutableBytesWritable, Put> {

           @Override
           protected void map(ImmutableBytesWritable key, Result value, Context context)
                     throws IOException, InterruptedException {
                String content = Bytes.toString(value.getValue(Bytes.toBytes("f"), Bytes.toBytes("content")));
                if (content == null) {
                     System.err.println("数据格式错误" + content);
                     return;
                }
                Map<String, String> map = HBaseTableDemo.transfoerContent2Map(content);
                ImmutableBytesWritable outputkey = new ImmutableBytesWritable();
                if (map.containsKey("p_id")) {
                     // 产品id存在
                     outputkey = new ImmutableBytesWritable(Bytes.toBytes(map.get("p_id")));
                } else {
                     System.err.println("数据格式错误" + content);
                     return;
                }
                Put put = new Put(Bytes.toBytes(map.get("p_id")));
                if (map.containsKey("p_name") && map.containsKey("price")) {
                     // 数据正常,进行赋值
                     put.add(Bytes.toBytes("f"), Bytes.toBytes("id"), Bytes.toBytes(map.get("p_id")));
                     put.add(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes(map.get("p_name")));
                     put.add(Bytes.toBytes("f"), Bytes.toBytes("price"), Bytes.toBytes(map.get("price")));
                } else {
                     System.err.println("数据格式错误" + content);
                     return;
                }
                context.write(outputkey, put);
           }
     }

     /**
 * reducer类,往hbase输出
 *
 * @author gerry
 *
 */
     static class DemoReducer extends TableReducer<Text, ProductModel, ImmutableBytesWritable> {
           @Override
           protected void reduce(Text key, Iterable<ProductModel> values, Context context)
                     throws IOException, InterruptedException {
                for (ProductModel value : values) {
                     // 我只拿一个,如果有多个产品id的话
                     ImmutableBytesWritable outputKey = new ImmutableBytesWritable(Bytes.toBytes(key.toString()));
                     Put put = new Put(Bytes.toBytes(key.toString()));
                     put.add(Bytes.toBytes("f"), Bytes.toBytes("id"), Bytes.toBytes(value.getId()));
                     put.add(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes(value.getName()));
                     put.add(Bytes.toBytes("f"), Bytes.toBytes("price"), Bytes.toBytes(value.getPrice()));
                     context.write(outputKey, put);
                }
           }
     }

     /**
 * 执行入口
 *
 * @param args
 * @throws Exception
 */
     public static void main(String[] args) throws Exception {
           // 本地选择:
           // initLocalHbaseMapReducerJobConfig2&initLocalHbaseMapReducerJobConfig
           // 集群选择:
           // initLocalHbaseMapReducerJobConfig2&initFailureLocalHbaseMapReducerJobConfig
           Job job = // initLocalHbaseMapReducerJobConfig3();
                     // initLocalHbaseMapReducerJobConfig2();
                     // initFailureLocalHbaseMapReducerJobConfig();
                     initLocalHbaseMapReducerJobConfig();
           int l = job.waitForCompletion(true) ? 0 : 1;
           System.out.println("执行:" + l);
     }

     /**
 * 本地正常 运行1
 *
 * @return
 * @throws Exception
 */
     static Job initLocalHbaseMapReducerJobConfig() throws Exception {
           Configuration conf = HBaseConfiguration.create();
           conf.set("fs.defaultFS", "hdfs://192.168.175.110"); // hadoop的环境
           conf.set("hbase.zookeeper.quorum", "192.168.175.110"); // hbase zk环境信息

           Job job = Job.getInstance(conf, "demo");
           job.setJarByClass(HBaseTableDemo.class);

           // 设置mapper相关,mapper从hbase输入
           // 本地环境,而且fs.defaultFS为集群模式的时候,需呀设置addDependencyJars参数为false。
           // addDependencyJars集群中,参数必须为true。
           TableMapReduceUtil.initTableMapperJob("data", new Scan(), DemoMapper.class, Text.class, ProductModel.class, job,
                     false);

           // 设置reducer相关,reducer往hbase输出
           // 本地环境,而且fs.defaultFS为集群模式的时候,需呀设置addDependencyJars参数为false。
          TableMapReduceUtil.initTableReducerJob("online_product", DemoReducer.class, job, null, null, null, null, false);

           return job;
     }

     /**
 * 本地运行错误
 *
 * @return
 * @throws Exception
 */
     static Job initFailureLocalHbaseMapReducerJobConfig() throws Exception {
           Configuration conf = HBaseConfiguration.create();
           conf.set("fs.defaultFS", "hdfs://192.168.175.110"); // hadoop的环境
           conf.set("hbase.zookeeper.quorum", "192.168.175.110"); // hbase zk环境信息

           Job job = Job.getInstance(conf, "demo");
           job.setJarByClass(HBaseTableDemo.class);

           // 设置mapper相关,mapper从hbase输入【方法的参数少】false
           TableMapReduceUtil.initTableMapperJob("data", new Scan(), DemoMapper.class, Text.class, ProductModel.class,
                     job);

           // 设置reducer相关,reducer往hbase输出【方法的参数少】
          TableMapReduceUtil.initTableReducerJob("online_product", DemoReducer.class, job);

           return job;
     }

     /**
 * 本地正常 运行2
 *
 * @return
 * @throws Exception
 */
     static Job initLocalHbaseMapReducerJobConfig2() throws Exception {
           Configuration conf = HBaseConfiguration.create();
           // 不要hadoop的配置信息,也可以解决initFailureLocalHbaseMapReducerJobConfig的这个问题。
           // conf.set("fs.defaultFS", "hdfs://192.168.175.110"); // hadoop的环境
           conf.set("hbase.zookeeper.quorum", "192.168.175.110"); // hbase zk环境信息

           Job job = Job.getInstance(conf, "demo");
           job.setJarByClass(HBaseTableDemo.class);

           // 设置mapper相关,mapper从hbase输入
           TableMapReduceUtil.initTableMapperJob("data", new Scan(), DemoMapper.class, Text.class, ProductModel.class,
                     job);

           // 设置reducer相关,reducer往hbase输出
          TableMapReduceUtil.initTableReducerJob("online_product", DemoReducer.class, job);

           return job;
     }

     /**
 * 本地正常 运行3, 直接使用mapper进行hbase输出,不使用reducer进行输出
 *
 * @return
 * @throws Exception
 */
     static Job initLocalHbaseMapReducerJobConfig3() throws Exception {
           Configuration conf = HBaseConfiguration.create();
           conf.set("fs.defaultFS", "hdfs://192.168.175.110"); // hadoop的环境
           conf.set("hbase.zookeeper.quorum", "192.168.175.110"); // hbase zk环境信息

           Job job = Job.getInstance(conf, "demo");
           job.setJarByClass(HBaseTableDemo.class);

           // 设置mapper相关,mapper从hbase输入
           // 本地环境,而且fs.defaultFS为集群模式的时候,需呀设置addDependencyJars参数为false。
           // addDependencyJars集群中,参数必须为true。
           TableMapReduceUtil.initTableMapperJob("data", new Scan(), DemoMapper2.class, ImmutableBytesWritable.class,
                     Put.class, job, false);

           // 设置reducer相关,reducer往hbase输出
           // 本地环境,而且fs.defaultFS为集群模式的时候,需呀设置addDependencyJars参数为false。
          TableMapReduceUtil.initTableReducerJob("online_product", null, job, null, null, null, null, false);
           job.setNumReduceTasks(0);

           return job;
     }
}