将 Spark 与 Flask 与 JDBC 结合使用

我在做什么?


我想使用 Flask 构建一个 API 服务,从一个数据库中提取数据,进行一些数据分析,然后将新数据加载到一个单独的数据库中。


怎么了?


如果我自己运行 Spark,我可以访问数据库,执行分析并加载到数据库。但是在 Flask 应用程序(api 路由)中使用它们时,相同的功能将不起作用。


我是怎么做的?


首先我启动 Spark master 和 worker。我可以看到我localhost:8080在主人下面有一个工人。


export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)


../sbin/start-master.sh

../sbin/start-slave.sh spark://xxx.local:7077

对于 Flask 应用程序:


app = Flask(__name__)


spark = SparkSession\

    .builder\

    .appName("Flark - Flask on Spark")\

    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")



@app.route("/")

def hello():

    dataframe = spark.read.format("jdbc").options(

        url="jdbc:postgresql://localhost/foodnome_dev?user=postgres&password=''",

        database="foodnome_test",

        dbtable='"Dishes"'

    ).load()


    print([row["description"]

           for row in dataframe.select('description').collect()])


    return "hello"

为了运行这个应用程序,我使用 JDBC 驱动程序spark-submit:


../bin/spark-submit --master spark://Leos-MacBook-Pro.local:7077 --driver-class-path postgresql-42.2.5.jar server.py

我得到什么错误?


在 Flask 方面,错误是内部服务器错误。在 Spark 方面,


File "/Users/leoqiu/Desktop/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value

    format(target_id, ".", name), value)

py4j.protocol.Py4JJavaError: An error occurred while calling o36.collectToPython.

: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.0.0.67, executor 0): java.lang.ClassNotFoundException: org.postgresql.Driver


温温酱
浏览 297回答 2
2回答

慕雪6442864

--driver-class-path在这里是不够的。驱动程序 jar 也应该添加到执行程序类路径中。这通常使用以下方法一起处理:spark.jars.packages / --packagesspark.jars / --jars虽然你仍然可以使用spark.executor.extraClassPath.说明:带有 JDBC 源驱动程序负责读取元数据(schema)和用于实际数据检索过程的执行程序。这种行为对于不同的外部数据源是通用的,因此无论何时使用非内置格式,都应该在整个集群中分发相应的 jar。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python