所以,我有一个包含空间 ( latitude, longitude) 和时间 ( timestamp) 数据的 CSV 。
为了对我们有用,我们将空间信息转换为“ geohash”,将时间信息转换为“ timehash”。
问题是,如何使用 spark 为 CSV 中的每一行添加geohash和timehash作为字段(因为数据大约为200 GB)?
我们尝试使用JavaPairRDD它的功能mapTopair,但问题仍然在于如何转换回 aJavaRdd然后转换为 CSV?所以我认为这是一个糟糕的解决方案,我要求一个简单的方法。
问题更新:
在@Alvaro 得到帮助后,我创建了这个 java 类:
public class Hash {
public static SparkConf Spark_Config;
public static JavaSparkContext Spark_Context;
UDF2 geohashConverter = new UDF2<Long, Long, String>() {
public String call(Long latitude, Long longitude) throws Exception {
// convert here
return "calculate_hash";
}
};
UDF1 timehashConverter = new UDF1<Long, String>() {
public String call(Long timestamp) throws Exception {
// convert here
return "calculate_hash";
}
};
public Hash(String path) {
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL Example")
.config("spark.master", "local")
.getOrCreate();
spark.udf().register("geohashConverter", geohashConverter, DataTypes.StringType);
spark.udf().register("timehashConverter", timehashConverter, DataTypes.StringType);
Dataset df=spark.read().csv(path)
.withColumn("geohash", callUDF("geohashConverter", col("_c6"), col("_c7")))
.withColumn("timehash", callUDF("timehashConverter", col("_c1")))
.write().csv("C:/Users/Ahmed/Desktop/preprocess2");
}
public static void main(String[] args) {
String path = "C:/Users/Ahmed/Desktop/cabs_trajectories/cabs_trajectories/green/2013";
Hash h = new Hash(path);
}
}
然后我得到序列化问题,当我删除时消失 write().csv()
相关分类