猿问

pyspark 使用 saveAsNewAPIHadoopFile 将 dstream

这是我的代码:


  es_write_conf = {

     ¦   "es.nodes" : ES_IP,

     ¦   "es.port" : ES_PORT,

     ¦   "es.resource" : "%s/%s" % (index, doc_type),

     ¦   "es.input.json": "true",

     ¦   # "es.mapping.rich.date": "true"

     ¦   # "es.mapping.id": "guid"

     }


     dstream.foreachRDD(lambda es_rdd: es_rdd.saveAsNewAPIHadoopFile(

     ¦   path="-",

     ¦   outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",

     ¦   keyClass="org.apache.hadoop.io.NullWritable",

     ¦   valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",

     ¦   conf=es_write_conf))

我收到了这个警告


WARN EsOutputFormat: Speculative execution enabled for reducer - consider disabling it to prevent data corruption

我该如何解决这个警告?


慕标5832272
浏览 338回答 1
1回答

慕丝7291255

我通过这个解决问题     es_write_conf = {     ¦   "es.nodes" : ES_IP,     ¦   "es.port" : ES_PORT,     ¦   "es.resource" : "%s/%s" % (index, doc_type),     ¦   "es.input.json": "true",     ¦   "mapred.reduce.tasks.speculative.execution": "false",     ¦   "mapred.map.tasks.speculative.execution": "false",     ¦   # "es.mapping.rich.date": "true"     ¦   # "es.mapping.id": "guid"     }     dstream.foreachRDD(lambda es_rdd: es_rdd.saveAsNewAPIHadoopFile(     ¦   path="-",     ¦   outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",     ¦   keyClass="org.apache.hadoop.io.NullWritable",     ¦   valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",     ¦   conf=es_write_conf))
随时随地看视频慕课网APP

相关分类

Python
我要回答