small_925_ant
2020-01-29 23:12:17
Integrating Spark with HBase
I recently used Spark to process data and write the results to HBase. This post briefly walks through the implementation, covering both single-table and multi-table writes.
saveAsNewAPIHadoopFile
We could write to HBase directly with its Java API, but that is not covered here. Instead, let's look at how to write data into HBase with the new Hadoop API, using saveAsNewAPIHadoopFile.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes

val hbaseInfoRDD = logDF.rdd.map(x => {
  val col_01 = x.getAs[String]("col_01")
  val col_02 = x.getAs[String]("col_02")
  val columns = scala.collection.mutable.HashMap[String, String]()
  columns.put("col_01", col_01)
  columns.put("col_02", col_02)
  // getRowKey: rowkey generation is left to the reader
  val rowkey = getRowKey("implement yourself")
  val put = new Put(Bytes.toBytes(rowkey))
  for ((k, v) <- columns) {
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(k), Bytes.toBytes(v))
  }
  (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put)
})
val conf = new Configuration()
conf.set("hbase.rootdir", "hdfs://hadoop001:8020/hbase")
conf.set("hbase.zookeeper.quorum", "hadoop001:2181")
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

hbaseInfoRDD.saveAsNewAPIHadoopFile(
  "null", // the output path is not used by TableOutputFormat
  classOf[ImmutableBytesWritable],
  classOf[Put],
  classOf[TableOutputFormat[ImmutableBytesWritable]],
  conf
)
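To sanity-check what was written, the table can be read back with TableInputFormat. Below is a minimal sketch, assuming a SparkContext sc is in scope and reusing the imports from the snippet above; tableName is the same table that was just written.

import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val readConf = new Configuration()
readConf.set("hbase.zookeeper.quorum", "hadoop001:2181")
readConf.set(TableInputFormat.INPUT_TABLE, tableName)

val resultRDD = sc.newAPIHadoopRDD(
  readConf,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result]
)

// Print a few rowkeys to confirm the data landed in HBase.
resultRDD.map { case (_, result) => Bytes.toString(result.getRow) }
  .take(10)
  .foreach(println)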
That covers writing to a single table. But what should we do when records need to be routed to different HBase tables based on some condition?
val hbaseInfoRDD = logDF.rdd.map(x => {
  val col_01 = x.getAs[String]("col_01")
  val day = x.getAs[String]("day")
  val columns = scala.collection.mutable.HashMap[String, String]()
  columns.put("col_01", col_01)
  columns.put("day", day)
  // getRowKey: rowkey generation is left to the reader
  val rowkey = getRowKey("implement yourself")
  val put = new Put(Bytes.toBytes(rowkey))
  for ((k, v) <- columns) {
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(k), Bytes.toBytes(v))
  }
  // The key of each pair is now the target table name (one table per day);
  // MultiTableOutputFormat routes every Put to the table named by its key.
  val tableName = day
  (new ImmutableBytesWritable(Bytes.toBytes(tableName)), put)
})
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat

val conf = new Configuration()
conf.set("hbase.rootdir", "hdfs://hadoop001:8020/hbase")
conf.set("hbase.zookeeper.quorum", "hadoop001:2181")

hbaseInfoRDD.saveAsNewAPIHadoopFile(
  "null", // the output path is not used by MultiTableOutputFormat
  classOf[ImmutableBytesWritable],
  classOf[Put],
  classOf[MultiTableOutputFormat],
  conf
)
Note that this approach only works when the target tables already exist ahead of time.
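One straightforward workaround is to pre-create the per-day tables with the HBase Admin API before the Spark job runs. Below is a minimal sketch under a couple of assumptions: dayList is a hypothetical collection of the distinct day values, and the column family is "info", matching the Puts above.

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

val adminConf = HBaseConfiguration.create()
adminConf.set("hbase.zookeeper.quorum", "hadoop001:2181")
val connection = ConnectionFactory.createConnection(adminConf)
val admin = connection.getAdmin
try {
  dayList.foreach { day =>
    val table = TableName.valueOf(day)
    if (!admin.tableExists(table)) {
      // One table per day, with the same family the Puts write into.
      val desc = new HTableDescriptor(table)
      desc.addFamily(new HColumnDescriptor("info"))
      admin.createTable(desc)
    }
  }
} finally {
  admin.close()
  connection.close()
}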
But what if we want missing tables to be created automatically during the write itself?
Here is just one idea: override the getRecordWriter method of MultiTableOutputFormat so that the record writer creates a table before the first write to it. For the details you will need to study the MultiTableOutputFormat source yourself.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableOutputCommitter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class MyMultiTableOutputFormat extends MultiTableOutputFormat {
    @Override
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
        // Nothing to check: output goes straight to HBase, not to a path.
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        return new TableOutputCommitter();
    }

    @Override
    public RecordWriter<ImmutableBytesWritable, Mutation> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        // Hand out our own RecordWriter so tables can be created on demand.
        Configuration conf = context.getConfiguration();
        return new MyMultiTableRecordWriter(HBaseConfiguration.create(conf),
                conf.getBoolean(WAL_PROPERTY, WAL_ON));
    }
}
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class MyMultiTableRecordWriter extends RecordWriter<ImmutableBytesWritable, Mutation> {

    private static final Log LOG = LogFactory.getLog(MyMultiTableRecordWriter.class);

    // Config key under which the job passes the column family used when auto-creating tables.
    public static final String FAMILY_NAME = "family";

    Connection connection;
    Map<ImmutableBytesWritable, BufferedMutator> mutatorMap = new HashMap<>();
    Configuration conf;
    boolean useWriteAheadLogging;

    public MyMultiTableRecordWriter(Configuration conf, boolean useWriteAheadLogging) {
        LOG.debug("Created new MyMultiTableRecordWriter with WAL "
                + (useWriteAheadLogging ? "on" : "off"));
        this.conf = conf;
        this.useWriteAheadLogging = useWriteAheadLogging;
    }

    BufferedMutator getBufferedMutator(ImmutableBytesWritable tableName) throws IOException {
        if (this.connection == null) {
            this.connection = ConnectionFactory.createConnection(conf);
        }
        if (!mutatorMap.containsKey(tableName)) {
            LOG.debug("Opening HTable \"" + Bytes.toString(tableName.get()) + "\" for writing");
            TableName tableNameObj = TableName.valueOf(tableName.get());
            // Create the table first if it does not exist yet, then open a mutator for it.
            createIfNotExist(connection, tableNameObj);
            BufferedMutator mutator = connection.getBufferedMutator(tableNameObj);
            mutatorMap.put(tableName, mutator);
        }
        return mutatorMap.get(tableName);
    }

    private void createIfNotExist(Connection connection, TableName tableName) {
        try (Admin admin = connection.getAdmin()) {
            if (!admin.tableExists(tableName)) {
                HTableDescriptor desc = new HTableDescriptor(tableName);
                String familyName = this.conf.get(FAMILY_NAME);
                HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(familyName);
                desc.addFamily(hColumnDescriptor);
                admin.createTable(desc);
            }
        } catch (Exception e) {
            LOG.error("failed to create table " + tableName, e);
        }
    }
    // ...... the remaining methods were omitted in the original post ......
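    // A minimal sketch of the two abstract RecordWriter methods, modeled on the
    // upstream MultiTableOutputFormat record writer; this is an assumption, not the
    // original author's code.
    @Override
    public void write(ImmutableBytesWritable tableName, Mutation action) throws IOException {
        // Route each mutation to the BufferedMutator of the table named by the key;
        // the table is created on demand inside getBufferedMutator.
        getBufferedMutator(tableName).mutate(action);
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException {
        // Flush all buffered mutations and release the shared connection.
        for (BufferedMutator mutator : mutatorMap.values()) {
            mutator.close();
        }
        if (connection != null) {
            connection.close();
        }
    }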
}
// The family used when auto-creating tables must match the family the Puts write into ("info" above).
conf.set(MyMultiTableRecordWriter.FAMILY_NAME, "info")
hbaseInfoRDD.saveAsNewAPIHadoopFile(
  "null", // the output path is not used
  classOf[ImmutableBytesWritable],
  classOf[Put],
  classOf[MyMultiTableOutputFormat],
  conf
)