10 hbase Java API

2017-03-12 09:47

Java客户端

Java客户端其实就是shell客户端的一种实现,操作命令基本上就是shell客户端命令的一个映射。Java客户端使用的配置信息是被映射到一个HBaseConfiguration的实例对象中的,当使用该类的create方法创建实例对象的时候,会从classpath路径下获取hbase-site.xml文件并进行配置文件内容的读取,同时会读取hadoop的配置文件信息。也可以通过java代码指定命令信息,只需要给定zk的相关环境变量信息即可。代码如下:
Configuration config = HBaseConfiguration.create();
config.set(“hbase.zookeeper.quorum”, “hh”);
如果是分布式
config.set(“hbase.zookeeper.quorum”, “hh1,hh2,hh3….”);

HBaseAdmin

HBaseAdmin类是主要进行DDL操作相关的一个接口类,主要包括命名空间管理,用户表管理。通过该接口我们可以创建、删除、获取用户表,也可以进行用户表的分割、紧缩等操作。

HTable,HTableDescriptor

HTable是hbase中的用户表的一个映射的java实例,我们可以通过该类进行表数据的操作,包括数据的增删查改,也就是在这里我们可以类似shell中put,get和scan进行数据的操作。
HTableDescriptor是hbase用户表的具体描述信息类,一般我们创建表获取获取表信息,就是通过该类进行的。

Put,Get,Scan,Delete

Put类是专门提供插入数据的类。
Get类是专门提供根据rowkey获取数据的类。
Scan是专门进行范围查找的类。
Delete是专门进行删除的类。

HBase连接池

在web应用中,如果我们之间使用HTable来操作hbase,那么在创建连接和关闭连接的时候,一定会浪费资源。那么HBase提供了一个连接池的基础,主要涉及到的类和接口包括:HConnection,HConnectionManager,HTableInterface,ExecutorService四个。其中HConnection就是hbase封装好的hbase连接池,HConnectionManager是管理连接池的一个类,HTableInterface是在类HTable的基础上进行的一个接口抽象。ExecutorService是jdk的线程池对象。

HBaseUtil——-工具类

package com.beifeng.hbase.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class HBaseUtil {
    /**
 * 获取hbase的配置文件信息
 * @return
 */
    public static Configuration getHBaseConfiguration(){
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.175.110");
        return conf;
    }
}

TestHBaseAdmin——-HBaseAdmin(DDL操作)

package com.beifeng.hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import com.beifeng.hbase.util.HBaseUtil;

public class TestHBaseAdmin {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseUtil.getHBaseConfiguration();
        HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);
        try {
            testCreateTable(hBaseAdmin); // 创建表
            // testGetTableDescribe(hBaseAdmin);//打印表描述信息
            // testDeleteTable(hBaseAdmin);
        } finally {
            hBaseAdmin.close();
        }
    }

    /**
 * 测试创建table
 *
 * @throws IOException
 */
    static void testCreateTable(HBaseAdmin hbAdmin) throws IOException {
        TableName name = TableName.valueOf("users");
        if (hbAdmin.tableExists(name)) {
            System.out.println("表已经存在");
        } else {
            HTableDescriptor htd = new HTableDescriptor(name);
            htd.addFamily(new HColumnDescriptor("f"));
            // 设置文件大小
            htd.setMaxFileSize(10000L);
            hbAdmin.createTable(htd);
            System.out.println("创建表成功");
        }

    }

    /**
 * 获取表信息
 *
 * @param hbAdmin
 * @throws IOException
 */
    static void testGetTableDescribe(HBaseAdmin hbAdmin) throws IOException {
        // 创建表命名空间
        // hbAdmin.createNamespace(NamespaceDescriptor.create("dd").build());
        TableName name = TableName.valueOf("users");
        if (hbAdmin.tableExists(name)) {
            HTableDescriptor htd = hbAdmin.getTableDescriptor(name);
            System.out.println(htd);
        } else {
            System.out.println("表不存在");
        }

    }

    /**
 * 测试删除
 *
 * @param hbAdmin
 * @throws IOException
 */
    static void testDeleteTable(HBaseAdmin hbAdmin) throws IOException {
        TableName name = TableName.valueOf("users");
        // 表是否存在判断
        if (hbAdmin.tableExists(name)) {
            // 判断表是enabled还是disabled状态
            if (hbAdmin.isTableEnabled(name)) {
                hbAdmin.disableTable(name);
                hbAdmin.deleteTable(name);
            }
            hbAdmin.deleteTable(name);
            System.out.println("删除成功");
        } else {
            System.out.println("表不存在");
        }

    }
}

TestHTable——HTable(表数据DML操作)

package com.beifeng.hbase;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;

import com.beifeng.hbase.util.HBaseUtil;

public class TestHTable {
     static byte[] family = Bytes.toBytes("f");

     public static void main(String[] args) throws Exception {
           Configuration conf = HBaseUtil.getHBaseConfiguration();
           testUseHbaseConnectionPool(conf);
           // testUseHTable(conf);
     }

     static void testUseHTable(Configuration conf) throws IOException {
           HTable hTable = new HTable(conf, "users");
           try {
                // testPut(hTable);
                // testGet(hTable);
                // testDelete(hTable);
                testScan(hTable);
           } finally {
                hTable.close();
           }
     }

     /**
 * hbase 连接池【更好】
 *
 * @param conf
 * @throws IOException
 */
     static void testUseHbaseConnectionPool(Configuration conf) throws IOException {
           ExecutorService threads = Executors.newFixedThreadPool(2); // 两个线程
           HConnection pool = HConnectionManager.createConnection(conf);
           HTableInterface hTable = pool.getTable("users");
           try {
                // testPut(hTable);
                // testGet(hTable);
                // testDelete(hTable);
                testScan(hTable);
           } finally {
                hTable.close(); // 每次hTable操作完关闭,其实是放到pool中
                pool.close(); // 最终的时候关闭
           }
     }

     /**
 * 测试scan
 *
 * @param hTable
 * @throws IOException
 */
     static void testScan(HTableInterface hTable) throws IOException {
           Scan scan = new Scan();
           // 增加起始row key
           scan.setStartRow(Bytes.toBytes("row1"));
           scan.setStopRow(Bytes.toBytes("row4"));
           // 增加过滤filter
           FilterList list = new FilterList(Operator.MUST_PASS_ALL); // 要求所有条件满足
           byte[][] prefixes = new byte[2][];
           prefixes[0] = Bytes.toBytes("id");
           prefixes[1] = Bytes.toBytes("name");
           MultipleColumnPrefixFilter mcpf = new MultipleColumnPrefixFilter(prefixes);
           list.addFilter(mcpf);
           scan.setFilter(list);

           ResultScanner rs = hTable.getScanner(scan);
           Iterator<Result> iter = rs.iterator();
           while (iter.hasNext()) {
                Result result = iter.next();
                printResult(result);
           }
     }

     /**
 * 打印Result对象
 *
 * @param result
 */
     static void printResult(Result result) {
           System.out.println("********" + Bytes.toString(result.getRow()) + "*********");
           NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map = result.getMap();
           for (Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entry : map.entrySet()) {
                String family = Bytes.toString(entry.getKey());
                for (Map.Entry<byte[], NavigableMap<Long, byte[]>> columnEntry : entry.getValue().entrySet()) {
                     String column = Bytes.toString(columnEntry.getKey());
                     String value = "";
                     if ("age".equals(column)) {
                           value = "" + Bytes.toString(columnEntry.getValue().firstEntry().getValue());
                     } else {
                           value = Bytes.toString(columnEntry.getValue().firstEntry().getValue());
                     }
                     System.out.println(family + ":" + column + ":" + value);
                }
           }
     }

     /**
 * 测试put操作 3种方式
 *
 * @param table
 * @throws InterruptedIOException
 * @throws RetriesExhaustedWithDetailsException
 */
     static void testPut(HTableInterface hTable) throws Exception {
           // 单个put
           Put put = new Put(Bytes.toBytes("row1"));
           put.add(Bytes.toBytes("f"), Bytes.toBytes("id"), Bytes.toBytes("1"));
           put.add(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("zhangsan"));
           put.add(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes("27"));
           put.add(Bytes.toBytes("f"), Bytes.toBytes("phone"), Bytes.toBytes("11111111"));
           put.add(Bytes.toBytes("f"), Bytes.toBytes("email"), Bytes.toBytes("zhangsan@qq.com"));
           hTable.put(put);

           // 同时put多个
           Put put1 = new Put(Bytes.toBytes("row2"));
           put1.add(Bytes.toBytes("f"), Bytes.toBytes("id"), Bytes.toBytes("2"));
           put1.add(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("user2"));

           Put put2 = new Put(Bytes.toBytes("row3"));
           put2.add(Bytes.toBytes("f"), Bytes.toBytes("id"), Bytes.toBytes("3"));
           put2.add(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("user3"));

           Put put3 = new Put(Bytes.toBytes("row4"));
           put3.add(Bytes.toBytes("f"), Bytes.toBytes("id"), Bytes.toBytes("4"));
           put3.add(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("user4"));

           List<Put> list = new ArrayList<>();
           list.add(put1);
           list.add(put2);
           list.add(put3);
           hTable.put(list);

           // 检测put,条件成功就插入,要求rowkey是相同的
           // 数据只插入一次,不再修改
           Put put4 = new Put(Bytes.toBytes("row5"));
           put4.add(Bytes.toBytes("f"), Bytes.toBytes("id"), Bytes.toBytes("6"));
           hTable.checkAndPut(Bytes.toBytes("row5"), Bytes.toBytes("f"), Bytes.toBytes("id"), null, put4);
           System.out.println("插入成功");
     }

     /**
 * 测试get命令
 *
 * @param hTable
 * @throws IOException
 */
     static void testGet(HTableInterface hTable) throws IOException {
           Get get = new Get(Bytes.toBytes("row1"));
           Result result = hTable.get(get);
           byte[] buf = result.getValue(family, Bytes.toBytes("id"));
           System.out.println("id: " + Bytes.toString(buf));
           buf = result.getValue(family, Bytes.toBytes("age"));
           System.out.println("age: " + Bytes.toString(buf));
           buf = result.getValue(family, Bytes.toBytes("name"));
           System.out.println("name: " + Bytes.toString(buf));

           buf = result.getRow(); // 获取rowkey
           System.out.println("row: " + Bytes.toString(buf));
     }

     /**
 * 测试delete命令
 *
 * @param hTable
 * @throws IOException
 */
     static void testDelete(HTableInterface hTable) throws IOException {
           Delete delete = new Delete(Bytes.toBytes("row3"));
           delete.deleteColumn(family, Bytes.toBytes("name"));
           hTable.delete(delete);
           System.out.println("删除成功");
           // 直接删除family
           // delete.deleteFamily(family);
     }
}