使用 Spark 将字段添加到 Csv

所以,我有一个包含空间 ( latitude, longitude) 和时间 ( timestamp) 数据的 CSV 。


为了对我们有用,我们将空间信息转换为“ geohash”,将时间信息转换为“ timehash”。


问题是,如何使用 spark 为 CSV 中的每一行添加geohash和timehash作为字段(因为数据大约为200 GB)?


我们尝试使用JavaPairRDD它的功能mapTopair,但问题仍然在于如何转换回 aJavaRdd然后转换为 CSV?所以我认为这是一个糟糕的解决方案,我要求一个简单的方法。


问题更新:

在@Alvaro 得到帮助后,我创建了这个 java 类:


public class Hash {

public static SparkConf Spark_Config;

public static JavaSparkContext Spark_Context;


UDF2 geohashConverter = new UDF2<Long, Long, String>() {

    

    public String call(Long latitude, Long longitude) throws Exception {

        // convert here

        return "calculate_hash";

    }

};


UDF1 timehashConverter = new UDF1<Long, String>() {

    

    public String call(Long timestamp) throws Exception {

        // convert here

        return "calculate_hash";

    }

};

public Hash(String path) {

    SparkSession spark = SparkSession

            .builder()

            .appName("Java Spark SQL Example")

            .config("spark.master", "local")

            .getOrCreate();

    

    spark.udf().register("geohashConverter", geohashConverter, DataTypes.StringType);

    spark.udf().register("timehashConverter", timehashConverter, DataTypes.StringType);

    

Dataset df=spark.read().csv(path)

    .withColumn("geohash", callUDF("geohashConverter", col("_c6"), col("_c7")))

    .withColumn("timehash", callUDF("timehashConverter", col("_c1")))

.write().csv("C:/Users/Ahmed/Desktop/preprocess2");


 }


public static void main(String[] args) {

    String path = "C:/Users/Ahmed/Desktop/cabs_trajectories/cabs_trajectories/green/2013";

    Hash h = new Hash(path);

}

}

然后我得到序列化问题,当我删除时消失 write().csv()


偶然的你
浏览 275回答 1
1回答
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java