猿问

Apache Beam 管道从 csv 文件读取、拆分、groupbyKey 并写入文本文件时出现

我的输入数据如下所示:


id,vin,url,exteriorColor,interiorColor,design,transmission,lastcrawled,mileage,price,certified,dealerId,historyType,MSRP

114722309,19XVC2F35PR012846,http://www.pohankaacura.com/auto/used-2017-acura-ilx-chantilly-va-near-buckeystown-md/24742881/,Modern Steel,graystone,0,8-Speed Dual-Clutch,2018-02-05 01:49:47 UTC,1646,22550,0,28453

我想构建一个 Beam 管道,它将从 csv 文件中读取这些数据,获取 vin 并计算 vin 在文件中出现的次数。所以我想按 vin 分组并计算计数。我希望我的最终输出在一个平面文件中。我错过了注释,所以我现在已经添加了它,但是我得到了一个不同的错误,我在这里也找不到解决方案。下面是我的代码。


import org.apache.beam.sdk.Pipeline;

import org.apache.beam.sdk.io.TextIO;

import org.apache.beam.sdk.options.PipelineOptions;

import org.apache.beam.sdk.options.PipelineOptionsFactory;

import org.apache.beam.sdk.transforms.*;

import org.apache.beam.sdk.values.KV;


public class p1 {

    public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.create();


        Pipeline p = Pipeline.create(options);

        p.apply(TextIO.read().from("~/slow_storage_drive/beam_test_files/one_vin.csv"))


                .apply("Parse&ConvertToKV", MapElements.via(

                        new SimpleFunction<String, KV<String, Integer>>() {

                            public KV<String, Integer> apply(String input){

                                String[] split = input.split(",");

                                String key = split[1];

                                Integer value = 1;

                                return KV.of(key, value);

                            }

                        }

                ))


我尝试使用以下命令运行程序:


mvn compile -X exec:java -Dexec.mainClass=p1 -Pdirect-runner

我收到以下错误:


[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project emr_beam_test: An exception occured while executing the Java class. java.lang.IllegalStateException: Invisible parameter type of p1$2 arg0 for public p1$2$DoFnInvoker(p1$2) -> [Help 1]

我无法理解我做错了什么。谁能帮帮我吗?


胡说叔叔
浏览 161回答 2
2回答

呼唤远方

您必须使用 @ProcessElement 批注来批注匿名类方法 processElement。更多注解信息请参考https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/transforms/DoFn.ProcessElement.html

慕少森

似乎我收到了不可见参数类型异常,因为 Apache Beam 尚不支持 Java 10。我将我的 JAVA_HOME 改为指向 Java 8,程序运行了。我从这个线程中得到了这个想法:Apache Beam: Invisible parameter type exception
随时随地看视频慕课网APP

相关分类

Java
我要回答