手记

18、大数据之HBase开发

1. hbase开发

1.1. 配置

HBaseConfiguration

包:org.apache.hadoop.hbase.HBaseConfiguration

作用:通过此类可以对HBase进行配置

用法实例:

Configuration config = HBaseConfiguration.create();

说明: HBaseConfiguration.create() 默认会从classpath 中查找 hbase-site.xml 中的配置信息,初始化 Configuration。 

使用方法:

static Configuration config = null;
static {
     config = HBaseConfiguration.create();
     config.set("hbase.zookeeper.quorum", "slave1,slave2,slave3");
     config.set("hbase.zookeeper.property.clientPort", "2181");

}


1.2. 表管理类

HBaseAdmin

包:org.apache.hadoop.hbase.client.HBaseAdmin

作用:提供接口关系HBase 数据库中的表信息

 

用法:

HBaseAdmin admin = new HBaseAdmin(config);


1.3. 表描述类

HTableDescriptor

包:org.apache.hadoop.hbase.HTableDescriptor

作用:HTableDescriptor 类包含了表的名字以及表的列族信息

          表的schema(设计)

用法:

HTableDescriptor htd =new HTableDescriptor(tablename);

htd.addFamily(new HColumnDescriptor(“myFamily”));


1.4. 列族的描述类

HColumnDescriptor

包:org.apache.hadoop.hbase.HColumnDescriptor

作用:HColumnDescriptor 维护列族的信息

 

用法:

htd.addFamily(new HColumnDescriptor(“myFamily”));


1.5. 创建表的操作


CreateTable(一般我们用shell创建表)


static Configuration config = null;


static {


     config = HBaseConfiguration.create();


     config.set("hbase.zookeeper.quorum", "slave1,slave2,slave3");


     config.set("hbase.zookeeper.property.clientPort", "2181");


}


 


HBaseAdmin admin = new HBaseAdmin(config);


HTableDescriptor desc = new HTableDescriptor(tableName);


HColumnDescriptor family1 = new HColumnDescriptor(“f1”);


HColumnDescriptor family2 = new HColumnDescriptor(“f2”);


desc.addFamily(family1);


desc.addFamily(family2);


admin.createTable(desc);


1.6. 删除表

HBaseAdmin admin = new HBaseAdmin(config);

admin.disableTable(tableName);

admin.deleteTable(tableName);


1.7. 创建一个表的类

HTable

包:org.apache.hadoop.hbase.client.HTable

作用:HTable 和 HBase 的表通信

用法:

// 普通获取表

 HTable table = new HTable(config,Bytes.toBytes(tablename);

// 通过连接池获取表

Connection connection = ConnectionFactory.createConnection(config);

HTableInterface table = connection.getTable(TableName.valueOf("user"));


1.8. 单条插入数据

Put

包:org.apache.hadoop.hbase.client.Put

作用:插入数据

用法:

Put put = new Put(row);

p.add(family,qualifier,value);

说明:向表 tablename 添加 “family,qualifier,value”指定的值。

 

示例代码:

Connection connection = ConnectionFactory.createConnection(config);

HTableInterface table = connection.getTable(TableName.valueOf("user"));

Put put = new Put(Bytes.toBytes(rowKey));

put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier),Bytes.toBytes(value));

table.put(put);


1.9. 批量插入

批量插入

List<Put> list = new ArrayList<Put>();

Put put = new Put(Bytes.toBytes(rowKey));//获取put,用于插入

put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier),Bytes.toBytes(value));//封装信息

list.add(put);

table.put(list);//添加记录


1.10. 删除数据

Delete

包:org.apache.hadoop.hbase.client.Delete

作用:删除给定rowkey的数据

用法:

Delete del= new Delete(Bytes.toBytes(rowKey));

table.delete(del);

代码实例

Connection connection = ConnectionFactory.createConnection(config);

HTableInterface table = connection.getTable(TableName.valueOf("user"));

Delete del= new Delete(Bytes.toBytes(rowKey));

table.delete(del);


1.11. 单条查询

Get

包:org.apache.hadoop.hbase.client.Get

作用:获取单个行的数据

用法:

HTable table = new HTable(config,Bytes.toBytes(tablename));

Get get = new Get(Bytes.toBytes(row));

Result result = table.get(get);

说明:获取 tablename 表中 row 行的对应数据

 

代码示例:

Connection connection = ConnectionFactory.createConnection(config);

HTableInterface table = connection.getTable(TableName.valueOf("user"));

Get get = new Get(rowKey.getBytes());

Result row = table.get(get);

for (KeyValue kv : row.raw()) {

System.out.print(new String(kv.getRow()) + " ");

System.out.print(new String(kv.getFamily()) + ":");

System.out.print(new String(kv.getQualifier()) + " = ");

System.out.print(new String(kv.getValue()));

System.out.print(" timestamp = " + kv.getTimestamp() + "\n");

}


1.12. 批量查询

ResultScanner

包:org.apache.hadoop.hbase.client.ResultScanner

作用:获取值的接口

用法:

ResultScanner scanner = table.getScanner(scan);

For(Result rowResult : scanner){

        Bytes[] str = rowResult.getValue(family,column);

}

说明:循环获取行中列值。

 

代码示例:

Connection connection = ConnectionFactory.createConnection(config);

HTableInterface table = connection.getTable(TableName.valueOf("user"));

Scan scan = new Scan();

scan.setStartRow("a1".getBytes());

scan.setStopRow("a20".getBytes());

ResultScanner scanner = table.getScanner(scan);

for (Result row : scanner) {

System.out.println("\nRowkey: " + new String(row.getRow()));

for (KeyValue kv : row.raw()) {

     System.out.print(new String(kv.getRow()) + " ");

     System.out.print(new String(kv.getFamily()) + ":");

     System.out.print(new String(kv.getQualifier()) + " = ");

     System.out.print(new String(kv.getValue()));

     System.out.print(" timestamp = " + kv.getTimestamp() + "\n");

}

}

1.13. hbase过滤器

1.13.1. FilterList

FilterList 代表一个过滤器列表,可以添加多个过滤器进行查询,多个过滤器之间的关系有:

与关系(符合所有):FilterList.Operator.MUST_PASS_ALL  

或关系(符合任一):FilterList.Operator.MUST_PASS_ONE

 

使用方法:

FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);   

Scan s1 = new Scan();  

 filterList.addFilter(new SingleColumnValueFilter(Bytes.toBytes(“f1”),  Bytes.toBytes(“c1”),  CompareOp.EQUAL,Bytes.toBytes(“v1”) )  );  

filterList.addFilter(new SingleColumnValueFilter(Bytes.toBytes(“f1”),  Bytes.toBytes(“c2”),  CompareOp.EQUAL,Bytes.toBytes(“v2”) )  );  

 // 添加下面这一行后,则只返回指定的cell,同一行中的其他cell不返回  

 s1.addColumn(Bytes.toBytes(“f1”), Bytes.toBytes(“c1”));  

 s1.setFilter(filterList);  //设置filter

 ResultScanner ResultScannerFilterList = table.getScanner(s1);  //返回结果列表


1.13.2. 过滤器的种类

过滤器的种类:

列植过滤器—SingleColumnValueFilter

      过滤列植的相等、不等、范围等

列名前缀过滤器—ColumnPrefixFilter

      过滤指定前缀的列名

多个列名前缀过滤器—MultipleColumnPrefixFilter

       过滤多个指定前缀的列名

rowKey过滤器—RowFilter

      通过正则,过滤rowKey值。


1.13.3. 列植过滤器—SingleColumnValueFilter

SingleColumnValueFilter 列值判断

相等 (CompareOp.EQUAL ),

不等(CompareOp.NOT_EQUAL),

范围 (e.g., CompareOp.GREATER)…………

下面示例检查列值和字符串'values' 相等...

SingleColumnValueFilter f = new  SingleColumnValueFilter(

Bytes.toBytes("cFamily")              Bytes.toBytes("column"), CompareFilter.CompareOp.EQUAL,

        Bytes.toBytes("values"));

s1.setFilter(f);

注意:如果过滤器过滤的列在数据表中有的行中不存在,那么这个过滤器对此行无法过滤。


1.13.4. 列名前缀过滤器—ColumnPrefixFilter

过滤器—ColumnPrefixFilter

ColumnPrefixFilter 用于指定列名前缀值相等

ColumnPrefixFilter f = new ColumnPrefixFilter(Bytes.toBytes("values"));

s1.setFilter(f);


1.13.5. 多个列值前缀过滤器—MultipleColumnPrefixFilter

MultipleColumnPrefixFilter 和 ColumnPrefixFilter 行为差不多,但可以指定多个前缀

byte[][] prefixes = new byte[][] {Bytes.toBytes("value1"),Bytes.toBytes("value2")};

Filter f = new MultipleColumnPrefixFilter(prefixes);

s1.setFilter(f);


1.13.6. rowKey过滤器—RowFilter

RowFilter 是rowkey过滤器

通常根据rowkey来指定范围时,使用scan扫描器的StartRow和StopRow方法比较好。

Filter f = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator("^1234")); //匹配以1234开头的rowkey

s1.setFilter(f);

 

2. MapReduce操作Hbase

2.1. 实现方法

Hbase对MapReduce提供支持,它实现了TableMapper类和TableReducer类,我们只需要继承这两个类即可。


1、写个mapper继承TableMapper<Text, IntWritable>

参数:Text:mapper的输出key类型; IntWritable:mapper的输出value类型。

      其中的map方法如下:

map(ImmutableBytesWritable key, Result value,Context context)

 参数:key:rowKey;value: Result ,一行数据; context上下文


2、写个reduce继承TableReducer<Text, IntWritable, ImmutableBytesWritable>

参数:Text:reducer的输入key; IntWritable:reduce的输入value;

 ImmutableBytesWritable:reduce输出到hbase中的rowKey类型。

      其中的reduce方法如下:

reduce(Text key, Iterable<IntWritable> values,Context context)

参数: key:reduce的输入key;values:reduce的输入value;

 

2.2. 准备表

1、建立数据来源表‘word’,包含一个列族‘content’

向表中添加数据,在列族中放入列‘info’,并将短文数据放入该列中,如此插入多行,行键为不同的数据即可

2、建立输出表‘stat’,包含一个列族‘content’

3、通过Mr操作Hbase的‘word’表,对‘content:info’中的短文做词频统计,并将统计结果写入‘stat’表的‘content:info中’,行键为单词


2.3. 实现

[java] view plain copy print?

  1. <span style="font-size:14px;">package com.itcast.hbase;  

  2.   

  3. import java.io.IOException;  

  4. import java.util.ArrayList;  

  5. import java.util.List;  

  6.   

  7. import org.apache.hadoop.conf.Configuration;  

  8. import org.apache.hadoop.hbase.HBaseConfiguration;  

  9. import org.apache.hadoop.hbase.HColumnDescriptor;  

  10. import org.apache.hadoop.hbase.HTableDescriptor;  

  11. import org.apache.hadoop.hbase.client.HBaseAdmin;  

  12. import org.apache.hadoop.hbase.client.HTable;  

  13. import org.apache.hadoop.hbase.client.Put;  

  14. import org.apache.hadoop.hbase.client.Result;  

  15. import org.apache.hadoop.hbase.client.Scan;  

  16. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  

  17. import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;  

  18. import org.apache.hadoop.hbase.mapreduce.TableMapper;  

  19. import org.apache.hadoop.hbase.mapreduce.TableReducer;  

  20. import org.apache.hadoop.hbase.util.Bytes;  

  21. import org.apache.hadoop.io.IntWritable;  

  22. import org.apache.hadoop.io.Text;  

  23. import org.apache.hadoop.mapreduce.Job;  

  24. /** 

  25.  * mapreduce操作hbase 

  26.  * @author wilson 

  27.  * 

  28.  */  

  29. public class HBaseMr {  

  30.     /** 

  31.      * 创建hbase配置 

  32.      */  

  33.     static Configuration config = null;  

  34.     static {  

  35.         config = HBaseConfiguration.create();  

  36.         config.set("hbase.zookeeper.quorum", "slave1,slave2,slave3");  

  37.         config.set("hbase.zookeeper.property.clientPort", "2181");  

  38.     }  

  39.     /** 

  40.      * 表信息 

  41.      */  

  42.     public static final String tableName = "word";//表名1  

  43.     public static final String colf = "content";//列族  

  44.     public static final String col = "info";//列  

  45.     public static final String tableName2 = "stat";//表名2  

  46.     /** 

  47.      * 初始化表结构,及其数据 

  48.      */  

  49.     public static void initTB() {  

  50.         HTable table=null;  

  51.         HBaseAdmin admin=null;  

  52.         try {  

  53.             admin = new HBaseAdmin(config);//创建表管理  

  54.             /*删除表*/  

  55.             if (admin.tableExists(tableName)||admin.tableExists(tableName2)) {  

  56.                 System.out.println("table is already exists!");  

  57.                 admin.disableTable(tableName);  

  58.                 admin.deleteTable(tableName);  

  59.                 admin.disableTable(tableName2);  

  60.                 admin.deleteTable(tableName2);  

  61.             }  

  62.             /*创建表*/  

  63.                 HTableDescriptor desc = new HTableDescriptor(tableName);  

  64.                 HColumnDescriptor family = new HColumnDescriptor(colf);  

  65.                 desc.addFamily(family);  

  66.                 admin.createTable(desc);  

  67.                 HTableDescriptor desc2 = new HTableDescriptor(tableName2);  

  68.                 HColumnDescriptor family2 = new HColumnDescriptor(colf);  

  69.                 desc2.addFamily(family2);  

  70.                 admin.createTable(desc2);  

  71.             /*插入数据*/  

  72.                 table = new HTable(config,tableName);  

  73.                 table.setAutoFlush(false);  

  74.                 table.setWriteBufferSize(5);  

  75.                 List<Put> lp = new ArrayList<Put>();  

  76.                 Put p1 = new Put(Bytes.toBytes("1"));  

  77.                 p1.add(colf.getBytes(), col.getBytes(), ("The Apache Hadoop software library is a framework").getBytes());  

  78.                 lp.add(p1);  

  79.                 Put p2 = new Put(Bytes.toBytes("2"));p2.add(colf.getBytes(),col.getBytes(),("The common utilities that support the other Hadoop modules").getBytes());  

  80.                 lp.add(p2);  

  81.                 Put p3 = new Put(Bytes.toBytes("3"));  

  82.                 p3.add(colf.getBytes(), col.getBytes(),("Hadoop by reading the documentation").getBytes());  

  83.                 lp.add(p3);  

  84.                 Put p4 = new Put(Bytes.toBytes("4"));  

  85.                 p4.add(colf.getBytes(), col.getBytes(),("Hadoop from the release page").getBytes());  

  86.                 lp.add(p4);  

  87.                 Put p5 = new Put(Bytes.toBytes("5"));  

  88.                 p5.add(colf.getBytes(), col.getBytes(),("Hadoop on the mailing list").getBytes());  

  89.                 lp.add(p5);  

  90.                 table.put(lp);  

  91.                 table.flushCommits();  

  92.                 lp.clear();  

  93.         } catch (Exception e) {  

  94.             e.printStackTrace();  

  95.         } finally {  

  96.             try {  

  97.                 if(table!=null){  

  98.                     table.close();  

  99.                 }  

  100.             } catch (IOException e) {  

  101.                 e.printStackTrace();  

  102.             }  

  103.         }  

  104.     }  

  105.     /** 

  106.      * MyMapper 继承 TableMapper 

  107.      * TableMapper<Text,IntWritable>  

  108.      * Text:输出的key类型, 

  109.      * IntWritable:输出的value类型 

  110.      */  

  111.     public static class MyMapper extends TableMapper<Text, IntWritable> {  

  112.         private static IntWritable one = new IntWritable(1);  

  113.         private static Text word = new Text();  

  114.         @Override  

  115.         //输入的类型为:key:rowKey; value:一行数据的结果集Result  

  116.         protected void map(ImmutableBytesWritable key, Result value,  

  117.                 Context context) throws IOException, InterruptedException {  

  118.             //获取一行数据中的colf:col  

  119.             String words = Bytes.toString(value.getValue(Bytes.toBytes(colf), Bytes.toBytes(col)));// 表里面只有一个列族,所以我就直接获取每一行的值  

  120.             //按空格分割  

  121.             String itr[] = words.toString().split(" ");  

  122.             //循环输出word和1  

  123.             for (int i = 0; i < itr.length; i++) {  

  124.                 word.set(itr[i]);  

  125.                 context.write(word, one);  

  126.             }  

  127.         }  

  128.     }  

  129.     /** 

  130.      * MyReducer 继承 TableReducer 

  131.      * TableReducer<Text,IntWritable>  

  132.      * Text:输入的key类型, 

  133.      * IntWritable:输入的value类型, 

  134.      * ImmutableBytesWritable:输出类型,表示rowkey的类型 

  135.      */  

  136.     public static class MyReducer extends  

  137.             TableReducer<Text, IntWritable, ImmutableBytesWritable> {  

  138.         @Override  

  139.         protected void reduce(Text key, Iterable<IntWritable> values,  

  140.                 Context context) throws IOException, InterruptedException {  

  141.             //对mapper的数据求和  

  142.             int sum = 0;  

  143.             for (IntWritable val : values) {//叠加  

  144.                 sum += val.get();  

  145.             }  

  146.             // 创建put,设置rowkey为单词  

  147.             Put put = new Put(Bytes.toBytes(key.toString()));  

  148.             // 封装数据  

  149.             put.add(Bytes.toBytes(colf), Bytes.toBytes(col),Bytes.toBytes(String.valueOf(sum)));  

  150.             //写到hbase,需要指定rowkey、put  

  151.             context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())),put);  

  152.         }  

  153.     }  

  154.       

  155.     public static void main(String[] args) throws IOException,  

  156.             ClassNotFoundException, InterruptedException {  

  157.         config.set("df.default.name", "hdfs://master:9000/");//设置hdfs的默认路径  

  158.         config.set("hadoop.job.ugi", "hadoop,hadoop");//用户名,组  

  159.         config.set("mapred.job.tracker", "master:9001");//设置jobtracker在哪  

  160.         //初始化表  

  161.         initTB();//初始化表  

  162.         //创建job  

  163.         Job job = new Job(config, "HBaseMr");//job  

  164.         job.setJarByClass(HBaseMr.class);//主类  

  165.         //创建scan  

  166.         Scan scan = new Scan();  

  167.         //可以指定查询某一列  

  168.         scan.addColumn(Bytes.toBytes(colf), Bytes.toBytes(col));  

  169.         //创建查询hbase的mapper,设置表名、scan、mapper类、mapper的输出key、mapper的输出value  

  170.         TableMapReduceUtil.initTableMapperJob(tableName, scan, MyMapper.class,Text.class, IntWritable.class, job);  

  171.         //创建写入hbase的reducer,指定表名、reducer类、job  

  172.         TableMapReduceUtil.initTableReducerJob(tableName2, MyReducer.class, job);  

  173.         System.exit(job.waitForCompletion(true) ? 0 : 1);  

  174.     }  

  175. }</span>  

原文出处

0人推荐
随时随地看视频
慕课网APP