
How do I read HBase data and pass it to reduce?



RISEBY

陪伴而非守候

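The WordCountHbaseReaderMapper class extends the abstract class TableMapper<Text, Text>. TableMapper is the base class HBase provides for the Map phase of a MapReduce job that reads from an HBase table. In map(ImmutableBytesWritable key, Result value, Context context), the first parameter key is the row key of the HBase table, and the second parameter value holds all the cells stored under that row key. The core of this map implementation iterates over the cells in value, combines them into a single record, and emits it as a <key, value> pair through context.write(key, value). For the full source, see: WordCountHbaseReader\src\com\zonesion\hbase\WordCountHbaseReader.java

```java
// Imports needed at the top of WordCountHbaseReader.java for the mapper
import java.io.IOException;
import java.util.Map.Entry;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public static class WordCountHbaseReaderMapper extends TableMapper<Text, Text> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        // Walk every qualifier/value pair in the "content" column family of this row
        for (Entry<byte[], byte[]> entry : value.getFamilyMap(Bytes.toBytes("content")).entrySet()) {
            sb.append(Bytes.toString(entry.getKey()));   // column qualifier
            sb.append(":");
            sb.append(Bytes.toString(entry.getValue())); // cell value, byte[] -> String
        }
        // Write once per row, after the loop, so each row becomes a single record
        context.write(new Text(key.copyBytes()), new Text(sb.toString()));
    }
}
```

3. Reducer implementation

WordCountHbaseReaderReduce simply outputs the <key, value> pairs produced by the Map phase without any further processing. For the full source, see: WordCountHbaseReader\src\com\zonesion\hbase\WordCountHbaseReader.java

```java
import org.apache.hadoop.mapreduce.Reducer;

public static class WordCountHbaseReaderReduce extends Reducer<Text, Text, Text, Text> {
    private Text result = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Pass every map output value through unchanged
        for (Text val : values) {
            result.set(val);
            context.write(key, result);
        }
    }
}
```

4. Driver implementation

Unlike the WordCount driver, this job does not call job.setMapperClass() when it is configured. Instead, the Mapper is set up with:

TableMapReduceUtil.initTableMapperJob(tablename, scan, WordCountHbaseReaderMapper.class, Text.class, Text.class, job);

This call tells the job that the Map phase reads its input from the HBase table tablename, with the Scan object scan performing a full-table scan to feed records to the Map phase; WordCountHbaseReaderMapper.class runs the Map phase; Text.class and Text.class are the Map output key and value types; and the last argument is the Job object.

Note: scan here is the simplest possible Scan, reading the whole table. A Scan can be configured with further parameters; they are left out to keep the example simple, and you can experiment with them on your own. For the full source, see: WordCountHbaseReader\src\com\zonesion\hbase\WordCountHbaseReader.java

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public static void main(String[] args) throws Exception {
    String tablename = "wordcount";
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "Master");
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 1) {
        System.err.println("Usage: WordCountHbaseReader <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "WordCountHbaseReader");
    job.setJarByClass(WordCountHbaseReader.class);
    // Set the output path for the job's results
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[0]));
    job.setReducerClass(WordCountHbaseReaderReduce.class);
    // Declare the final output types to match the Reducer's Text/Text output
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    Scan scan = new Scan();
    TableMapReduceUtil.initTableMapperJob(tablename, scan,
            WordCountHbaseReaderMapper.class, Text.class, Text.class, job);
    // Run the job with job.waitForCompletion(true); exit with 0 on success
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
```

5. Deployment and running

1) Start the Hadoop cluster and the HBase service

```shell
[hadoop@K-Master ~]$ start-dfs.sh     # start the Hadoop HDFS file system
[hadoop@K-Master ~]$ start-mapred.sh  # start the Hadoop MapReduce distributed computing service
[hadoop@K-Master ~]$ start-hbase.sh   # start HBase
[hadoop@K-Master ~]$ jps              # list the running processes
22003 HMaster
10611 SecondaryNameNode
22226 Jps
21938 HQuorumPeer
10709 JobTracker
22154 HRegionServer
20277 Main
10432 NameNode
```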
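The record-building step in WordCountHbaseReaderMapper can be checked without a running cluster. The sketch below is plain Java (9 or later), not HBase code: a TreeMap ordered by unsigned byte comparison stands in for the NavigableMap<byte[], byte[]> that Result.getFamilyMap() returns. The class name FamilyMapFormatter, its helper methods, the sample qualifiers, and the ";" separator between pairs are hypothetical additions for illustration only.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-alone helper: no HBase classes are used here.
public class FamilyMapFormatter {

    // Orders byte[] keys as unsigned bytes, the order in which HBase sorts qualifiers.
    private static final Comparator<byte[]> UNSIGNED_ORDER = Arrays::compareUnsigned;

    // Creates an empty map shaped like the one Result.getFamilyMap() returns.
    static NavigableMap<byte[], byte[]> newFamilyMap() {
        return new TreeMap<>(UNSIGNED_ORDER);
    }

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Joins all qualifier:value pairs of one row into a single record,
    // separated by ";" (the separator is an assumption, not from the original).
    static String formatFamily(NavigableMap<byte[], byte[]> familyMap) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<byte[], byte[]> entry : familyMap.entrySet()) {
            if (sb.length() > 0) {
                sb.append(';');
            }
            sb.append(new String(entry.getKey(), StandardCharsets.UTF_8))
              .append(':')
              .append(new String(entry.getValue(), StandardCharsets.UTF_8));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        NavigableMap<byte[], byte[]> family = newFamilyMap();
        family.put(utf8("hbase"), utf8("3"));
        family.put(utf8("count"), utf8("1"));
        family.put(utf8("hadoop"), utf8("2"));
        // One output string per row, mirroring a single context.write() per map() call
        System.out.println(formatFamily(family)); // prints count:1;hadoop:2;hbase:3
    }
}
```

Building the whole string first and emitting it once keeps exactly one output record per HBase row, which is what the Reducer above then passes through unchanged.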