使用 Beam SQL 查询 Avro 架构

我正在尝试使用 Apache Beam 读取 avro 文件并使用 Beam SQL 来转换数据。


我对 Beam 和 Java 还是新手。这是我的简单代码:


public class BeamSQLReadAvro {

    @SuppressWarnings("serial")

    public static void main(String[] args) throws IOException {

        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

        Pipeline p = Pipeline.create(options);


        /* Schema definition */

        Schema schema = new Schema.Parser().parse(new File("data/RATE_CODE/RATE_CODE.avsc"));


        /* Create record/row */

        PCollection<GenericRecord> records = p.apply(AvroIO.readGenericRecords(schema).from("data/RATE_CODE/*.avro"));


        /* SQL Transform */

        records.apply("SQL Transform 01",SqlTransform.query("SELECT RCODE,RNAME,RDESC FROM PCOLLECTION LIMIT 10"))


        /* Print output */

               .apply("Output",

                      MapElements.via(

                        new SimpleFunction<Row, Row>() {

                          @Override

                          public Row apply(Row input) {

                            System.out.println("PCOLLECTION: " + input.getValues());

                            return input;

                          }

                        }

                      )

               );

        p.run().waitUntilFinish();

    }

}

它给了我错误


Exception in thread "main" java.lang.IllegalStateException: Cannot call getSchema when there is no schema

我不明白,我定义了一个名为 schema 的变量。这里有什么指点吗?


HUH函数
浏览 47回答 1
1回答

慕姐4208626

实际上,您的管道中有两种类型的模式 - Avro 和 Beam 模式。Avro 模式用于解析 Avro 输入记录,但对于 SQL 转换,您应该使用具有 Beam 模式的行。为此,AvroIO提供一个选项withBeamSchemas(boolean),应true根据您的情况设置为,例如:AvroIO.readGenericRecords(schema).withBeamSchemas(true).from("data/RATE_CODE/*.avro")
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java