我正在编写一个库,以将Apache Spark与自定义环境集成。我正在实现自定义流源和流编写器。
我正在开发的某些资源至少在应用程序崩溃后是不可恢复的。如果应用程序重新启动,则需要重新加载所有数据。因此,我们希望避免用户不得不显式设置'checkpointLocation'选项。但是,如果未提供该选项,则会看到以下错误:
org.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...);
但是,如果我使用控制台流输出,则一切正常。
有没有办法获得相同的行为?
注意:我们正在将Spark v2接口用于流读取器/写入器。
火花日志:
18/06/29 16:36:48 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/C:/mydir/spark-warehouse/').
18/06/29 16:36:48 INFO SharedState: Warehouse path is 'file:/C:/mydir/spark-warehouse/'.
18/06/29 16:36:48 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
org.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...);
at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:213)
at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:208)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:207)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:299)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:296)
...
18/06/29 16:36:50 INFO SparkContext: Invoking stop() from shutdown hook
这就是我开始流媒体作业的方式:
spark.readStream().format("mysource").load()
.writeStream().format("mywriter").outputMode(OutputMode.Append()).start();
一切正常,相反,例如,如果我运行:
spark.readStream().format("mysource").load()
.writeStream().format("console").outputMode(OutputMode.Append()).start();
相关分类