猿问

如何在 Java 中并行运行 Spark 程序

所以我有一个 java 应用程序,它具有 spark maven 依赖项,并且在运行它时,它会在运行它的主机上启动 spark 服务器。服务器实例有 36 个内核。我正在指定 SparkSession 实例,其中我同时提到了内核数和其他配置属性,但是当我看到使用 的统计信息时htop,它似乎并没有使用所有内核,而只是使用了 1 个。


   SparkSession spark  = SparkSession

                .builder()

                .master("local")

                .appName("my-spark")

                .config("spark.driver.memory","50g")

                .config("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")

                .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")

                .config("spark.sql.shuffle.partitions", "400")

                .config("spark.eventLog.enabled", "true")

                .config("spark.eventLog.dir", "/dir1/dir2/logs")

                .config("spark.history.fs.logDirectory", "/dir1/dir2/logs")

                .config("spark.executor.cores", "36")

我还在 JavaSparkContext 中添加了:


JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

sc.hadoopConfiguration().set("fs.s3a.access.key", AWS_KEY);

sc.hadoopConfiguration().set("fs.s3a.secret.key", AWS_SECRET_KEY);

sc.hadoopConfiguration().set("spark.driver.memory","50g");

sc.hadoopConfiguration().set("spark.eventLog.enabled", "true");

sc.hadoopConfiguration().set("spark.eventLog.dir", "/dir1/dir2/logs");

sc.hadoopConfiguration().set("spark.executor.cores", "36");

我的任务是将 aws s3 中的数据读入 df 并将数据写入另一个存储桶中。


Dataset<Row> df = spark.read().format("csv").option("header", "true").load("s3a://bucket/file.csv.gz");

        //df = df.repartition(200);


        df.withColumn("col_name", df.col("col_name")).sort("col_name", "_id").write().format("iceberg").mode("append").save(location);



婷婷同学_
浏览 161回答 2
2回答

红颜莎娜

.gz 文件是“不可吐槽的”:要解压缩它们,您必须从字节 0 开始并向前阅读。结果,spark、hive、MapReduce 等将整个文件交给了一个工人。如果您想要并行处理,请使用不同的压缩格式(例如 snappy)

互换的青春

你是在本地模式运行的火花,spark.executor.cores将不会生效,考虑改变.master("local")对.master("local[*]")
随时随地看视频慕课网APP

相关分类

Java
我要回答