猿问

将 JavaRDD<Status> 转换为 JavaRDD<String> 的问题

我正在尝试将推文从 twitter 保存到 MongoDb 数据库。


我有RDD<Status>,我正在尝试借助 ObjectMapper 将其转换为 JSON 格式。但是这种转换存在一些问题(


public class Main {



    //set system credentials for access to twitter

    private static void setTwitterOAuth() {

        System.setProperty("twitter4j.oauth.consumerKey", TwitterCredentials.consumerKey);

        System.setProperty("twitter4j.oauth.consumerSecret", TwitterCredentials.consumerSecret);

        System.setProperty("twitter4j.oauth.accessToken", TwitterCredentials.accessToken);

        System.setProperty("twitter4j.oauth.accessTokenSecret", TwitterCredentials.accessTokenSecret);

    }



    public static void main(String [] args) {


        setTwitterOAuth();


        SparkConf conf = new SparkConf().setMaster("local[2]")

                                        .setAppName("SparkTwitter");

        JavaSparkContext sparkContext = new JavaSparkContext(conf);

        JavaStreamingContext jssc = new JavaStreamingContext(sparkContext, new Duration(1000));

        JavaReceiverInputDStream<Status> twitterStream = TwitterUtils.createStream(jssc);


        //Stream that contains just tweets in english

        JavaDStream<Status> enTweetsDStream=twitterStream.filter((status) -> "en".equalsIgnoreCase(status.getLang()));

        enTweetsDStream.persist(StorageLevel.MEMORY_AND_DISK());



        enTweetsDStream.print();

        jssc.start();

        jssc.awaitTermination();

    }


    static void saveRawTweetsToMondoDb(JavaRDD<Status> rdd,JavaSparkContext sparkContext) {

     try {

            ObjectMapper objectMapper = new ObjectMapper();

            SQLContext sqlContext = new SQLContext(sparkContext);

            JavaRDD<String> tweet =  rdd.map(status -> objectMapper.writeValueAsString(status));


            DataFrame dataFrame = sqlContext.read().json(tweet);


        } catch (Exception e) {

            System.out.println("Error saving to database");

        }

    }


JavaRDD<String> tweet =  rdd.map(status -> objectMapper.writeValueAsString(status));

这是一个问题。需要不兼容的类型JavaRDD<String>,但地图被推断为javaRDD<R>


人到中年有点甜
浏览 110回答 1
1回答

慕标琳琳

不幸的是,Java 类型推断并不总是非常聪明,所以我在这些情况下所做的是提取我的 lambda 的所有位作为变量,直到我找到一个 Java 无法为其提供准确类型的位。然后我给表达式我认为它应该具有的类型,看看为什么 Java 会抱怨它。有时它只是编译器的一个限制,您必须显式地将表达式“转换”为所需的类型,有时您会发现代码存在问题。在你的情况下,代码对我来说很好,所以一定有别的东西。然而,我有一个评论:在这里你支付一次 JSON 序列化(从StatusJSON 字符串)然后反序列化(从 JSON 字符串到Row)的成本。另外,您没有向您提供任何架构Dataset,因此它必须两次传递数据(或根据您的配置对其进行采样)以推断架构。如果数据很大,所有这些都可能非常昂贵。如果性能是一个问题并且相对简单,我建议您直接编写从Status到的转换。RowStatus另一个“顺便说一句”:您正在隐式序列化您的ObjectMapper,很可能您不想这样做。看起来该类确实支持 Java 序列化,但具有特殊的逻辑。由于 Spark 的默认配置是使用 Kryo(其性能比 Java 序列化好得多),我怀疑它在使用默认FieldSerializer. 您有以下三种选择:使对象映射器静态化以避免序列化它配置您的 Kryo 注册器以ObjectMapper使用 Java 序列化序列化/反序列化类型的对象。那会起作用,但不值得付出努力。到处使用 Java 序列化而不是 Kryo。馊主意!它很慢并且占用大量空间(内存和磁盘取决于序列化对象的写入位置)。
随时随地看视频慕课网APP

相关分类

Java
我要回答