我正在尝试使用 JdbcIO.Read 读取 Java Beam 中的云 SQL 表。我想使用 .withRowMapper(Resultset resultSet) 方法将 Resultset 中的每一行转换为 GenericData.Record。有没有办法可以将 JSON 架构字符串作为 .withRowMapper 方法中的输入传递,例如 ParDo 接受 sideInputs 作为 PCollectionView
我尝试过执行这两种读取操作(在同一 JdbcIO.Read 转换中从 information_schema.columns 和 My Table 读取)。但是,我想先生成 Schema PCollection,然后使用 JdbcIO.Read 读取表
我正在动态生成表的 Avro 模式,如下所示:
PCollection<String> avroSchema= pipeline.apply(JdbcIO.<String>read()
.withDataSourceConfiguration(config)
.withCoder(StringUtf8Coder.of())
.withQuery("SELECT DISTINCT column_name, data_type \n" +
"FROM information_schema.columns\n" +
"WHERE table_name = " + "'" + tableName + "'")
.withRowMapper((JdbcIO.RowMapper<String>) resultSet -> {
// code here to generate avro schema string
// this works fine for me
}))
创建 PCollectionView 它将保存每个表的 json 模式。
PCollectionView<String> s = avroSchema.apply(View.<String>asSingleton());
// I want to access this view as side input in next JdbcIO.Read operation
// something like this ;
pipeline.apply(JdbcIO.<String>read()
.withDataSourceConfiguration(config)
.withCoder(StringUtf8Coder.of())
.withQuery(queryString)
.withRowMapper(new JdbcIO.RowMapper<String>() {
@Override
public String mapRow(ResultSet resultSet) throws Exception {
// access schema here and use it to parse and create
//GenericData.Record from ResultSet fields as per schema
return null;
}
})).
withSideInputs(My PCollectionView here); // this option is not there right now.
有没有更好的方法来解决这个问题?
千巷猫影
相关分类