猿问

需要在 Spark 中优化对休息服务的调用

我需要为 spark 中数据集的每一行调用一个休息服务。我生成了以下代码:


import requests


df= spark.read.parquet("file.parquet")


for row in df.rdd.collect():

  requests.post('rest.api/endpoint')

我不确定这是否是最好的方法,性能方面。有没有更好的方法来实现它?


小怪兽爱吃肉
浏览 75回答 1
1回答

慕丝7291255

通过在结果上运行它,.collect您将失去任何并行化,并且所有请求都将由驱动程序完成。您可以创建一个为每一行调用 API 的 UDF:from pyspark.sql.functions import udfimport requestsapi = "https://swapi.co/api/people/"@udf("string")def swapiGetPersonName(id):    response = requests.get(api + str(id))    return response.json()["name"]df = spark.range(1,10)df.select("id", swapiGetPersonName("id").alias("name")).show()但是,如果您有大量数据,这很容易使您的休息服务或执行程序超载。(您几乎会对您的服务进行拒绝服务攻击或用完套接字)。如果这是一个问题,您可以通过在时间加载数据的子集来批处理数据通过拆分它foreachPartition并在每个行中一个接一个地处理它来批处理它使用结构化流传输数据并限制您一次处理的行数使用支持批处理操作的API(或修改你的)(而不是每一行,上传整个分区/重要部分数据)这些都是我的想法,但是除了对服务进行太多调用之外,不要忘记添加适当的异常处理:)
随时随地看视频慕课网APP

相关分类

Python
我要回答