As mentioned in the previous post, we use a columnar store such as Kudu and write data to Kudu in update mode.
This post walks through demos of operating Kudu from Java. Java demos for the Kudu client exist on GitHub, but there are none for operating Kudu from Spark, and Cloudera's official documentation only covers the Scala version. This post lists complete Java examples for operating Kudu, intended as an introductory reference. (The painful part was the Java implementation of querying Kudu through Spark SQL: there is no official example, and Google was of little help.)
1) POM dependencies
<!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-client -->
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-client</artifactId>
    <version>1.5.0-cdh5.13.1</version>
    <scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-client-tools -->
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-client-tools</artifactId>
    <version>1.5.0-cdh5.13.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 -->
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-spark2_2.11</artifactId>
    <version>1.6.0</version>
</dependency>
This article uses the Cloudera build, so add the Cloudera repository:
<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>
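With the dependencies in place, it is worth verifying that the client can actually reach the cluster before running the demos below. Here is a minimal connectivity sketch; the class name KuduSmokeTest is mine, and the master address 10.1.0.20:7051 is the one used by the examples later in this post, so adjust it for your cluster:

import org.apache.kudu.client.KuduClient;

public class KuduSmokeTest {
    public static void main(String[] args) throws Exception {
        KuduClient client = new KuduClient.KuduClientBuilder("10.1.0.20:7051").build();
        try {
            // Listing tables is a cheap round trip to the Kudu master.
            System.out.println(client.getTablesList().getTablesList());
        } finally {
            client.shutdown();
        }
    }
}

If this prints a (possibly empty) list of table names, the client and cluster versions are compatible and the demos below should work.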
2) Feature list:
Create a table with KuduClient;
Insert data with KuduClient;
Update data with KuduClient;
Query data with KuduClient;
Delete a table with KuduClient;
Query data with Spark SQL;
Check whether a table exists with Spark's KuduContext.
PS: Cloudera's documentation only shows the Spark SQL query in Scala, and a concrete Java version is hard to find even on Google. Reading the source shows that format() actually takes a package path: as long as the package at that path contains a DefaultSource class implementing the Spark SQL data source interfaces, Spark SQL can load it. For example, the org.apache.kudu.spark.kudu package contains a DefaultSource class, so it is recognized by Spark SQL.
By the same logic, other stores can be accessed this way, and exposing your own store so that Spark SQL can query it works the same way; a sketch of that extension point follows the DefaultSource listing below.
package org.apache.kudu.spark.kudu

@org.apache.yetus.audience.InterfaceStability.Unstable
class DefaultSource() extends scala.AnyRef
    with org.apache.spark.sql.sources.RelationProvider
    with org.apache.spark.sql.sources.CreatableRelationProvider
    with org.apache.spark.sql.sources.SchemaRelationProvider {
  val TABLE_KEY : java.lang.String = { /* compiled code */ }
  val KUDU_MASTER : java.lang.String = { /* compiled code */ }
  val OPERATION : java.lang.String = { /* compiled code */ }
  val FAULT_TOLERANT_SCANNER : java.lang.String = { /* compiled code */ }
  val SCAN_LOCALITY : java.lang.String = { /* compiled code */ }
  def defaultMasterAddrs : scala.Predef.String = { /* compiled code */ }
  override def createRelation(
      sqlContext : org.apache.spark.sql.SQLContext,
      parameters : scala.Predef.Map[scala.Predef.String, scala.Predef.String]) : org.apache.spark.sql.sources.BaseRelation = { /* compiled code */ }
  override def createRelation(
      sqlContext : org.apache.spark.sql.SQLContext,
      mode : org.apache.spark.sql.SaveMode,
      parameters : scala.Predef.Map[scala.Predef.String, scala.Predef.String],
      data : org.apache.spark.sql.DataFrame) : org.apache.spark.sql.sources.BaseRelation = { /* compiled code */ }
  override def createRelation(
      sqlContext : org.apache.spark.sql.SQLContext,
      parameters : scala.Predef.Map[scala.Predef.String, scala.Predef.String],
      schema : org.apache.spark.sql.types.StructType) : org.apache.spark.sql.sources.BaseRelation = { /* compiled code */ }
}
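As a sketch of that extension point: any package whose DefaultSource implements Spark's RelationProvider can be passed to format(). The package name com.example.spark.mysource below is hypothetical and the body is a stub; it only illustrates the lookup convention, not a working data source.

package com.example.spark.mysource;  // hypothetical package name

import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.sources.BaseRelation;
import org.apache.spark.sql.sources.RelationProvider;

// Spark SQL resolves format("com.example.spark.mysource") to this class
// simply because it is named DefaultSource and lives in that package.
public class DefaultSource implements RelationProvider {
    @Override
    public BaseRelation createRelation(
            SQLContext sqlContext,
            scala.collection.immutable.Map<String, String> parameters) {
        // Build and return a BaseRelation that knows how to scan your store.
        throw new UnsupportedOperationException("stub for illustration only");
    }
}

Implementing SchemaRelationProvider and CreatableRelationProvider as well, as Kudu's DefaultSource does, additionally enables user-supplied schemas and DataFrame writes.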
3) Code example:
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.apache.kudu.spark.kudu.KuduContext;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * @ClassName: KuduUtil
 * @Description: Example code for operating Kudu
 * @author jason.li
 * @date 2018-01-11 15:45:06
 */
public class KuduUtil {
    private static final String KUDU_MASTER = "10.1.0.20:7051";
    private static String tableName = "TestKudu";

    // Create a table with a STRING key column and a STRING value column,
    // range-partitioned on the key.
    @Test
    public void kuduCreateTableTest() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            List<ColumnSchema> columns = new ArrayList<>(2);
            columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.STRING)
                    .key(true)
                    .build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING)
                    .build());
            List<String> rangeKeys = new ArrayList<>();
            rangeKeys.add("key");
            Schema schema = new Schema(columns);
            client.createTable(tableName, schema,
                    new CreateTableOptions().setRangePartitionColumns(rangeKeys));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // Insert 1000 rows, applying each operation synchronously.
    @Test
    public void kuduSaveTest() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            KuduTable table = client.openTable(tableName);
            KuduSession session = client.newSession();
            System.out.println("-------start--------" + System.currentTimeMillis());
            for (int i = 30000; i < 31000; i++) {
                Insert insert = table.newInsert();
                PartialRow row = insert.getRow();
                row.addString(0, i + "");
                row.addString(1, "aaa");
                OperationResponse operationResponse = session.apply(insert);
            }
            System.out.println("-------end--------" + System.currentTimeMillis());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // Update the value of the row whose key is "4".
    @Test
    public void kuduUpdateTest() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            KuduTable table = client.openTable(tableName);
            KuduSession session = client.newSession();
            Update update = table.newUpdate();
            PartialRow row = update.getRow();
            row.addString("key", 4 + "");
            row.addString("value", "value " + 10);
            OperationResponse operationResponse = session.apply(update);
            System.out.print(operationResponse.getRowError());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // Scan the table, projecting only the "value" column.
    @Test
    public void kuduSearchTest() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            KuduTable table = client.openTable(tableName);
            List<String> projectColumns = new ArrayList<>(1);
            projectColumns.add("value");
            KuduScanner scanner = client.newScannerBuilder(table)
                    .setProjectedColumnNames(projectColumns)
                    .build();
            while (scanner.hasMoreRows()) {
                RowResultIterator results = scanner.nextRows();
                while (results.hasNext()) {
                    RowResult result = results.next();
                    System.out.println(result.getString(0));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @Test
    public void kuduDelTabletest() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            client.deleteTable(tableName);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // Query Kudu through Spark SQL: format() points at the package that
    // contains Kudu's DefaultSource, as explained above.
    @Test
    public void searchBysparkSql() {
        SparkSession sparkSession = getSparkSession();
        List<StructField> fields = Arrays.asList(
                DataTypes.createStructField("key", DataTypes.StringType, true),
                DataTypes.createStructField("value", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(fields);
        Dataset<Row> ds = sparkSession.read().format("org.apache.kudu.spark.kudu")
                .schema(schema)
                .option("kudu.master", "10.1.0.20:7051")
                .option("kudu.table", "TestKudu")
                .load();
        // Register the Kudu DataFrame as a temp view so it can be queried with SQL.
        ds.createOrReplaceTempView("abc");
        sparkSession.sql("select * from abc").show();
    }

    @Test
    public void checkTableExistByKuduContext() {
        SparkSession sparkSession = getSparkSession();
        KuduContext context = new KuduContext("10.1.0.20:7051", sparkSession.sparkContext());
        System.out.println(tableName + " exists = " + context.tableExists(tableName));
    }

    public SparkSession getSparkSession() {
        SparkConf conf = new SparkConf().setAppName("test")
                .setMaster("local[*]")
                .set("spark.driver.userClassPathFirst", "true");
        conf.set("spark.sql.crossJoin.enabled", "true");
        SparkContext sparkContext = new SparkContext(conf);
        return SparkSession.builder().sparkContext(sparkContext).getOrCreate();
    }
}
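Two refinements worth knowing, shown as a hedged sketch rather than as part of the original demo: kuduSaveTest applies each insert synchronously (the session's default AUTO_FLUSH_SYNC mode), which is slow for bulk loads, and kuduSearchTest scans the whole table. Switching the session to MANUAL_FLUSH and pushing a predicate into the scanner addresses both. The class name KuduBatchAndScanSketch, the key format "k" + i, and the lookup value "k42" are illustrative; the master, table, and column names reuse the demo's values.

import org.apache.kudu.client.*;

public class KuduBatchAndScanSketch {
    public static void main(String[] args) throws Exception {
        KuduClient client = new KuduClient.KuduClientBuilder("10.1.0.20:7051").build();
        try {
            KuduTable table = client.openTable("TestKudu");

            // Batched inserts: buffer operations client-side, flush in chunks.
            KuduSession session = client.newSession();
            session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
            for (int i = 0; i < 1000; i++) {
                Insert insert = table.newInsert();
                PartialRow row = insert.getRow();
                row.addString("key", "k" + i);
                row.addString("value", "aaa");
                session.apply(insert);
                if ((i + 1) % 500 == 0) {
                    session.flush(); // stay under the default mutation buffer size
                }
            }
            session.flush();
            session.close();

            // Filtered scan: only rows whose key equals "k42" are returned,
            // and the predicate is evaluated server-side.
            KuduPredicate keyEquals = KuduPredicate.newComparisonPredicate(
                    table.getSchema().getColumn("key"),
                    KuduPredicate.ComparisonOp.EQUAL, "k42");
            KuduScanner scanner = client.newScannerBuilder(table)
                    .addPredicate(keyEquals)
                    .build();
            while (scanner.hasMoreRows()) {
                RowResultIterator results = scanner.nextRows();
                while (results.hasNext()) {
                    System.out.println(results.next().getString("value"));
                }
            }
        } finally {
            client.shutdown();
        }
    }
}

With MANUAL_FLUSH the server round trip happens once per batch instead of once per row, which is where the bulk-load speedup comes from.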
Author: 假文艺的真码农
Link: https://www.jianshu.com/p/ef1a621fc6ea