我正在尝试将推文从 twitter 保存到 MongoDb 数据库。
我有RDD<Status>,我正在尝试借助 ObjectMapper 将其转换为 JSON 格式。但是这种转换存在一些问题(
public class Main {
//set system credentials for access to twitter
private static void setTwitterOAuth() {
System.setProperty("twitter4j.oauth.consumerKey", TwitterCredentials.consumerKey);
System.setProperty("twitter4j.oauth.consumerSecret", TwitterCredentials.consumerSecret);
System.setProperty("twitter4j.oauth.accessToken", TwitterCredentials.accessToken);
System.setProperty("twitter4j.oauth.accessTokenSecret", TwitterCredentials.accessTokenSecret);
}
public static void main(String [] args) {
setTwitterOAuth();
SparkConf conf = new SparkConf().setMaster("local[2]")
.setAppName("SparkTwitter");
JavaSparkContext sparkContext = new JavaSparkContext(conf);
JavaStreamingContext jssc = new JavaStreamingContext(sparkContext, new Duration(1000));
JavaReceiverInputDStream<Status> twitterStream = TwitterUtils.createStream(jssc);
//Stream that contains just tweets in english
JavaDStream<Status> enTweetsDStream=twitterStream.filter((status) -> "en".equalsIgnoreCase(status.getLang()));
enTweetsDStream.persist(StorageLevel.MEMORY_AND_DISK());
enTweetsDStream.print();
jssc.start();
jssc.awaitTermination();
}
static void saveRawTweetsToMondoDb(JavaRDD<Status> rdd,JavaSparkContext sparkContext) {
try {
ObjectMapper objectMapper = new ObjectMapper();
SQLContext sqlContext = new SQLContext(sparkContext);
JavaRDD<String> tweet = rdd.map(status -> objectMapper.writeValueAsString(status));
DataFrame dataFrame = sqlContext.read().json(tweet);
} catch (Exception e) {
System.out.println("Error saving to database");
}
}
JavaRDD<String> tweet = rdd.map(status -> objectMapper.writeValueAsString(status));
这是一个问题。需要不兼容的类型JavaRDD<String>,但地图被推断为javaRDD<R>
慕标琳琳
相关分类