I am trying to move data from a table in PostgreSQL to a Hive table on HDFS. To do that, I came up with the following code:
val conf = new SparkConf().setAppName("Spark-JDBC")
  .set("spark.executor.heartbeatInterval", "120s")
  .set("spark.network.timeout", "12000s")
  .set("spark.sql.inMemoryColumnarStorage.compressed", "true")
  .set("spark.sql.orc.filterPushdown", "true")
  // Note: the serializer is set twice; both values resolve to the same Kryo class.
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.max", "512m")
  .set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName)
  .set("spark.streaming.stopGracefullyOnShutdown", "true")
  .set("spark.yarn.driver.memoryOverhead", "7168")
  .set("spark.yarn.executor.memoryOverhead", "7168")
  .set("spark.sql.shuffle.partitions", "61")
  .set("spark.default.parallelism", "60")
  .set("spark.memory.storageFraction", "0.5")
  .set("spark.memory.fraction", "0.6")
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "16g")
  // Note: spark.dynamicAllocation.enabled is set to "false" and then to "true";
  // the last value wins, so dynamic allocation ends up enabled.
  .set("spark.dynamicAllocation.enabled", "false")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
val spark = SparkSession.builder()
  .config(conf)
  .master("yarn")
  .enableHiveSupport()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .getOrCreate()
// Split the "name type" column definitions into partition columns and non-partition columns
val colList = allColumns.split(",").toList
val (partCols, npartCols) = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
}
finalDF
}
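The part of the code that actually reads the source table is not shown above; for context, a partitioned JDBC read from PostgreSQL in Spark generally takes the following shape (a minimal sketch only; the connection URL, table name, credentials, split column, and bounds are placeholders):

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://dbhost:5432/sourcedb")  // placeholder connection string
  .option("dbtable", "schema.sourcetable")                  // placeholder source table
  .option("user", "dbuser")                                 // placeholder credentials
  .option("password", "dbpass")
  // Without partitionColumn/lowerBound/upperBound/numPartitions the whole table is read
  // through a single JDBC connection on one executor.
  .option("partitionColumn", "period_year")                 // placeholder numeric split column
  .option("lowerBound", "2010")
  .option("upperBound", "2020")
  .option("numPartitions", "60")
  .load()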
The data is inserted into a Hive table that is dynamically partitioned by prtn_String_columns: source_system_name, period_year, period_num.
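For reference, this is roughly how the dynamic-partition insert is wired up (a minimal sketch, not the exact code): preparedDF is the view name used in the failing statement quoted below, finalDF is the DataFrame returned by the snippet above, and the reordering only illustrates that Hive dynamic partitioning expects the partition columns to be the last columns of the projection, in the same order as in the PARTITION(...) clause.

import org.apache.spark.sql.functions.col

val prtn_String_columns = "source_system_name, period_year, period_num"
val partColNames = prtn_String_columns.split(",").map(_.trim)

// Put the non-partition columns first and the partition columns last, then expose the
// result as the temporary view that the INSERT OVERWRITE statement selects from.
finalDF
  .select(finalDF.columns.filterNot(c => partColNames.contains(c)).map(col) ++ partColNames.map(col): _*)
  .createOrReplaceTempView("preparedDF")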
The data is not partitioned properly: one partition stays small while another becomes very large, so there is a skew problem. When inserting the data into the Hive table, the job fails at this line: spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF"), and as far as I can tell this happens because of the data skew.
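To see how badly the data is skewed across the target partitions, the row counts per partition key can be checked before the insert (a minimal sketch using the preparedDF view from the failing statement):

import org.apache.spark.sql.functions.desc

// Row counts per target Hive partition; one very large count would confirm that a single
// dynamic partition receives most of the rows and overloads a handful of tasks.
spark.table("preparedDF")
  .groupBy("source_system_name", "period_year", "period_num")
  .count()
  .orderBy(desc("count"))
  .show(50, false)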
I have tried increasing the number of executors, increasing executor memory and driver memory, and saving the data frame as a CSV file instead of writing it to the Hive table, but none of this helps; the execution still fails with the exception:
java.lang.OutOfMemoryError: GC overhead limit exceeded
Is there anything in the code that I need to correct? Could anyone let me know how to fix this problem?