如何将侧面输入/额外输入传递给 JdbcIO RowMapper Java

我正在尝试使用 JdbcIO.Read 读取 Java Beam 中的云 SQL 表。我想使用 .withRowMapper(Resultset resultSet) 方法将 Resultset 中的每一行转换为 GenericData.Record。有没有办法可以将 JSON 架构字符串作为 .withRowMapper 方法中的输入传递,例如 ParDo 接受 sideInputs 作为 PCollectionView


我尝试过执行这两种读取操作(在同一 JdbcIO.Read 转换中从 information_schema.columns 和 My Table 读取)。但是,我想先生成 Schema PCollection,然后使用 JdbcIO.Read 读取表


我正在动态生成表的 Avro 模式,如下所示:


PCollection<String> avroSchema= pipeline.apply(JdbcIO.<String>read()

                .withDataSourceConfiguration(config)

                .withCoder(StringUtf8Coder.of())

                .withQuery("SELECT DISTINCT column_name, data_type \n" +

                        "FROM information_schema.columns\n" +

                        "WHERE table_name = " + "'" + tableName + "'")

                .withRowMapper((JdbcIO.RowMapper<String>) resultSet -> {

            // code here to generate avro schema string

           // this works fine for me


}))

创建 PCollectionView 它将保存每个表的 json 模式。


 PCollectionView<String> s = avroSchema.apply(View.<String>asSingleton());


// I want to access this view as side input in next JdbcIO.Read operation

// something like this ;


pipeline.apply(JdbcIO.<String>read()

        .withDataSourceConfiguration(config)

        .withCoder(StringUtf8Coder.of())

        .withQuery(queryString)

        .withRowMapper(new JdbcIO.RowMapper<String>() {


            @Override

            public String mapRow(ResultSet resultSet) throws Exception {

                // access schema here and use it to parse and create 

               //GenericData.Record from ResultSet fields as per schema


                return null;

            }

        })).


    withSideInputs(My PCollectionView here); // this option is not there right now.

有没有更好的方法来解决这个问题?


犯罪嫌疑人X
浏览 100回答 1
1回答

千巷猫影

此时 IO API 不接受 SideInputs。在读取后立即添加 ParDo 并在那里进行映射应该是可行的。ParDo 可以接受侧面输入。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java