猿问

如何使用 avro 在镶木地板文件架构中创建重复类型?

我们正在创建一个数据流管道,我们将从 postgres 读取数据并将其写入镶木地板文件。ParquetIO.Sink 允许您将 GenericRecord 的 PCollection 写入 Parquet 文件(来自此处https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/io/parquet/ParquetIO。网页)。但是镶木地板文件架构并不像我预期的那样


这是我的模式:


schema = new org.apache.avro.Schema.Parser().parse("{\n" +

         "     \"type\": \"record\",\n" +

         "     \"namespace\": \"com.example\",\n" +

         "     \"name\": \"Patterns\",\n" +

         "     \"fields\": [\n" +

         "       { \"name\": \"id\", \"type\": \"string\" },\n" +

         "       { \"name\": \"name\", \"type\": \"string\" },\n" +

         "       { \"name\": \"createdAt\", \"type\": {\"type\":\"string\",\"logicalType\":\"timestamps-millis\"} },\n" +

         "       { \"name\": \"updatedAt\", \"type\": {\"type\":\"string\",\"logicalType\":\"timestamps-millis\"} },\n" +

         "       { \"name\": \"steps\", \"type\": [\"null\",{\"type\":\"array\",\"items\":{\"type\":\"string\",\"name\":\"json\"}}] },\n" +

         "     ]\n" +

         "}");

到目前为止,这是我的代码:


Pipeline p = Pipeline.create(

        PipelineOptionsFactory.fromArgs(args).withValidation().create());


p.apply(JdbcIO.<GenericRecord> read()

       .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(

             "org.postgresql.Driver", "jdbc:postgresql://localhost:port/database")

             .withUsername("username")

             .withPassword("password"))

       .withQuery("select * from table limit(10)")

       .withCoder(AvroCoder.of(schema))

       .withRowMapper((JdbcIO.RowMapper<GenericRecord>) resultSet -> {

            GenericRecord record = new GenericData.Record(schema);

            ResultSetMetaData metadata = resultSet.getMetaData();

            int columnsNumber = metadata.getColumnCount();

素胚勾勒不出你
浏览 93回答 2
2回答

皈依舞

我没有找到从 Avro 创建不在 GroupType 中的重复元素的方法。Beam 中的 ParquetIO 使用项目中定义的“标准”avro 转换,在这里parquet-mr实现。似乎有两种方法可以将 Avro ARRAY 字段转换为 Parquet 消息——但它们都没有创建您正在寻找的内容。目前,avro 转换是目前与 ParquetIO 交互的唯一方式。我在 ParquetIO 中看到了这个 JIRA&nbsp;Use Beam 模式,将其扩展到 Beam Rows,这可能允许不同的 parquet 消息策略。或者,您可以为 ParquetIO 创建 JIRA 功能请求以支持 thrift 结构,这应该允许更好地控制 parquet 结构。

FFIVE

它是您用来描述预期模式的 protobuf 消息吗?我认为您得到的是从指定的 JSON 模式正确生成的。optional repeated在 protobuf 语言规范中没有意义:https://developers.google.com/protocol-buffers/docs/reference/proto2-spec您可以删除null方括号以生成简单的repeated字段,它在语义上等同于optional repeated(因为repeated意味着零次或多次)。
随时随地看视频慕课网APP

相关分类

Java
我要回答