猿问

Spark Streaming: avoiding the checkpoint-location check

I am writing a library that integrates Apache Spark with a custom environment, and I am implementing a custom streaming source and streaming writer.


Some of the sources I am developing are not recoverable after an application crash: if the application restarts, all data has to be reloaded anyway. We would therefore like to spare users from having to set the 'checkpointLocation' option explicitly. However, if that option is not provided, the following error is raised:


org.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...);

However, if I use the console streaming sink instead, everything works fine.


Is there a way to get the same behavior for my custom writer?


Note: we are using the Spark v2 interfaces for the streaming reader/writer.
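From the stack trace below, `StreamingQueryManager.createQuery` resolves the checkpoint location from the per-query option, falling back to the session-wide conf (the `Option.getOrElse` frame), and raises the `AnalysisException` only when both are absent. A minimal pure-Java sketch of that fallback logic (the method and map names here are illustrative, not Spark's actual code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class CheckpointResolution {
    // Mirrors the fallback visible in the stack trace: the per-query
    // "checkpointLocation" option wins, then the session-wide
    // "spark.sql.streaming.checkpointLocation" conf; if neither is
    // set, Spark raises the AnalysisException from the question.
    static Optional<String> resolve(Map<String, String> queryOptions,
                                    Map<String, String> sessionConf) {
        return Optional.ofNullable(queryOptions.get("checkpointLocation"))
                .or(() -> Optional.ofNullable(
                        sessionConf.get("spark.sql.streaming.checkpointLocation")));
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        Map<String, String> conf = new HashMap<>();
        // Neither option nor conf is set -> no location, Spark would fail here.
        System.out.println(resolve(options, conf).isPresent());

        // Setting the session-wide conf is enough to satisfy the check.
        conf.put("spark.sql.streaming.checkpointLocation", "/tmp/ckpt");
        System.out.println(resolve(options, conf).orElse("none"));
    }
}
```

This is why setting either the writer option or the session conf makes the error go away, while the console sink apparently passes the check through a different path.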


Spark log:


18/06/29 16:36:48 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/C:/mydir/spark-warehouse/').

18/06/29 16:36:48 INFO SharedState: Warehouse path is 'file:/C:/mydir/spark-warehouse/'.

18/06/29 16:36:48 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint

org.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...);

    at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:213)

    at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:208)

    at scala.Option.getOrElse(Option.scala:121)

    at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:207)

    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:299)

    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:296)

    ...

18/06/29 16:36:50 INFO SparkContext: Invoking stop() from shutdown hook

This is how I start the streaming job:


spark.readStream().format("mysource").load()

  .writeStream().format("mywriter").outputMode(OutputMode.Append()).start();

Everything works fine if, instead, I run for example:


spark.readStream().format("mysource").load()

  .writeStream().format("console").outputMode(OutputMode.Append()).start();
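Until the check can be avoided outright, one workaround consistent with "the data is reloaded on restart anyway" is to point the session-wide conf at a throwaway temporary directory before starting the query, so users never set the option themselves. A sketch, assuming a live `SparkSession` named `spark` and the custom formats from the question (the Spark calls are shown as comments since they need a running session):

```java
import java.nio.file.Files;
import java.nio.file.Path;

public class TempCheckpointDir {
    public static void main(String[] args) throws Exception {
        // Create a throwaway checkpoint directory; its contents do not
        // matter because the source is not recoverable after a crash.
        Path checkpointDir = Files.createTempDirectory("spark-checkpoint-");
        System.out.println(checkpointDir.toFile().isDirectory());

        // Hypothetical usage with the custom writer from the question:
        // spark.conf().set("spark.sql.streaming.checkpointLocation",
        //                  checkpointDir.toString());
        // spark.readStream().format("mysource").load()
        //      .writeStream().format("mywriter")
        //      .outputMode(OutputMode.Append()).start();
    }
}
```

Note this only hides the option from users; Spark still writes checkpoint state into the temp directory, which is simply discarded on restart.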


Asked by 慕桂英546537 · 510 views · 1 answer