How to optimize partitioning when migrating data from a JDBC source?

I am trying to move data from a table in PostgreSQL to a Hive table on HDFS. To do that, I came up with the following code:


  val conf = new SparkConf().setAppName("Spark-JDBC")
    .set("spark.executor.heartbeatInterval", "120s")
    .set("spark.network.timeout", "12000s")
    .set("spark.sql.inMemoryColumnarStorage.compressed", "true")
    .set("spark.sql.orc.filterPushdown", "true")
    // spark.serializer was set twice (string literal and classOf); one setting is enough
    .set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName)
    .set("spark.kryoserializer.buffer.max", "512m")
    .set("spark.streaming.stopGracefullyOnShutdown", "true")
    .set("spark.yarn.driver.memoryOverhead", "7168")
    .set("spark.yarn.executor.memoryOverhead", "7168")
    .set("spark.sql.shuffle.partitions", "61")
    .set("spark.default.parallelism", "60")
    .set("spark.memory.storageFraction", "0.5")
    .set("spark.memory.fraction", "0.6")
    .set("spark.memory.offHeap.enabled", "true")
    .set("spark.memory.offHeap.size", "16g")
    // spark.dynamicAllocation.enabled was set to "false" and then "true"; the last value wins
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.shuffle.service.enabled", "true")

  val spark = SparkSession.builder()
    .config(conf)
    .master("yarn")
    .enableHiveSupport()
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    .getOrCreate()

  def prepareFinalDF(splitColumns:List[String], textList: ListBuffer[String], allColumns:String, dataMapper:Map[String, String], partition_columns:Array[String], spark:SparkSession): DataFrame = {

        val colList                = allColumns.split(",").toList

        val (partCols, npartCols)  = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))

        val queryCols              = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")

        val execQuery              = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year='2017' and period_num='12'"
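Since the question is about parallelizing the JDBC read itself, the usual lever is the `partitionColumn` / `lowerBound` / `upperBound` / `numPartitions` options, which make Spark open several concurrent connections, one per range. A minimal sketch, assuming a numeric column is available to split on; the URL, credentials, column name, and bounds below are illustrative placeholders, not taken from the question:

```scala
// Sketch only: connection details and the split column are hypothetical.
// Spark divides [lowerBound, upperBound] into numPartitions ranges on
// partitionColumn and issues one JDBC query per range in parallel.
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://dbhost:5432/dbname")
  .option("dbtable",
    "(select * from schema.tablename where period_year = '2017' and period_num = '12') as t")
  .option("user", "username")
  .option("password", "password")
  .option("partitionColumn", "some_numeric_id") // hypothetical numeric column
  .option("lowerBound", "1")
  .option("upperBound", "1000000")
  .option("numPartitions", "60") // matches spark.default.parallelism above
  .load()
```

Note that the bounds only control how the ranges are cut; rows outside them are still read, just all by the first or last partition, so skewed bounds lead to skewed tasks.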

 

The data is inserted into a Hive table that is dynamically partitioned based on prtn_String_columns: source_system_name, period_year, period_num
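With `hive.exec.dynamic.partition.mode` set to `nonstrict` as in the session config above, the write side can let Hive derive the partitions from the data. A minimal sketch, where `finalDF` stands for the DataFrame returned by `prepareFinalDF` and the target table name is a placeholder:

```scala
// Sketch: insertInto matches columns by position, so the partition columns
// (source_system_name, period_year, period_num) must be the LAST columns of
// finalDF, in the same order as the Hive table's PARTITIONED BY clause.
finalDF.write
  .mode("append")
  .insertInto("target_schema.target_table") // hypothetical target table
```

This is why the question's code moves `partCols` to the end of the select list: dynamic partition inserts are resolved positionally, not by column name.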




aluckdog
3 Answers

哈士奇WWW

In my experience, there are 4 different kinds of memory settings:

A) [1] memory for storing the data being processed vs. [2] heap space for holding the program stack
B) [1] driver vs. [2] executor memory

So far I have always been able to get my Spark jobs running successfully by increasing the appropriate kind of memory. A2-B1 would therefore be the memory available on the driver to hold the program stack, and so on. The property names are as follows:

A1-B1) executor-memory
A1-B2) driver-memory
A2-B1) spark.yarn.executor.memoryOverhead
A2-B2) spark.yarn.driver.memoryOverhead

Keep in mind that the sum of all *-B1 settings must be less than the memory available on your workers, and the sum of all *-B2 settings must be less than the memory on your driver node. My bet is that the culprit is one of the heap settings listed above.
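Mapped onto a SparkConf like the one in the question, the four knobs look like this; the sizes below are placeholders to show where each setting goes, not tuning recommendations:

```scala
// Sketch: values are illustrative only. On YARN, each executor container must
// fit executor memory + its overhead, and likewise for the driver.
val tunedConf = new org.apache.spark.SparkConf()
  .set("spark.executor.memory", "8g")                // A1-B1: executor heap (--executor-memory)
  .set("spark.driver.memory", "4g")                  // A1-B2: driver heap (--driver-memory)
  .set("spark.yarn.executor.memoryOverhead", "2048") // A2-B1: per-executor off-heap overhead, in MB
  .set("spark.yarn.driver.memoryOverhead", "1024")   // A2-B2: driver off-heap overhead, in MB
```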