猿问

PySpark:如何将 Python UDF 应用于 PySpark DataFrame 列?

我有一个带有两组纬度、经度坐标的 PySpark DataFrame。我正在尝试计算给定行的每组坐标之间的 Haversine 距离。我正在使用haversine()我在网上找到的以下内容。问题是它不能应用于列,或者至少我不知道这样做的语法。有人可以分享语法或指出更好的解决方案吗?


from math import radians, cos, sin, asin, sqrt


def haversine(lat1, lon1, lat2, lon2):

    """

    Calculate the great circle distance between two points 

    on the earth (specified in decimal degrees)

    """

    # convert decimal degrees to radians 

    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])

    # haversine formula 

    dlon = lon2 - lon1 

    dlat = lat2 - lat1 

    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2

    c = 2 * asin(sqrt(a)) 

    # Radius of earth in miles is 3,963; 5280 ft in 1 mile

    ft = 3963 * 5280 * c

    return ft

我知道haversine()上面的函数有效,因为我使用数据框中的一些纬度/经度坐标对其进行了测试,并得到了合理的结果:


haversine(-85.8059, 38.250134, 

          -85.805122, 38.250098)

284.1302325439314

当我在 PySpark 数据框中将示例坐标替换为对应于纬度/经度的列名时,我收到错误消息。我尝试了以下代码,试图创建一个新列,其中包含计算的Haversine 距离(以英尺为单位):


df.select('id', 'p1_longitude', 'p1_latitude', 'p2_lon', 'p2_lat').withColumn('haversine_dist', 

                           haversine(df['p1_latitude'],

                                    df['p1_longitude'],

                                    df['p2_lat'],

                                    df['p2_lon']))

.show()

但我得到了错误:


必须是实数,而不是 Column Traceback(最近调用最后一次):

文件“”,第 8 行,haversine TypeError:必须是实数,而不是 Column


这向我表明我必须以某种方式迭代地将我的 hasrsine 函数应用于我的 PySpark DataFrame 的每一行,但我不确定这个猜测是否正确,即使是这样,我也不知道该怎么做。顺便说一句,我的纬度/经度是浮点类型。


红糖糍粑
浏览 128回答 1
1回答

qq_遁去的一_1

当您可以使用 Spark 内置函数时,不要使用 UDF,因为它们通常性能较差。这是一个仅使用与您的函数相同的 Spark SQL 函数的解决方案:from pyspark.sql.functions import col, radians, asin, sin, sqrt, cosdf.withColumn("dlon", radians(col("p2_lon")) - radians(col("p1_longitude"))) \  .withColumn("dlat", radians(col("p2_lat")) - radians(col("p1_latitude"))) \  .withColumn("haversine_dist", asin(sqrt(                                         sin(col("dlat") / 2) ** 2 + cos(radians(col("p1_latitude")))                                         * cos(radians(col("p2_lat"))) * sin(col("dlon") / 2) ** 2                                         )                                    ) * 2 * 3963 * 5280) \  .drop("dlon", "dlat")\  .show(truncate=False)给出:+-----------+------------+----------+---------+------------------+|p1_latitude|p1_longitude|p2_lat    |p2_lon   |haversine_dist    |+-----------+------------+----------+---------+------------------+|-85.8059   |38.250134   |-85.805122|38.250098|284.13023254857814|+-----------+------------+----------+---------+------------------+您可以在此处找到可用的 Spark 内置函数。
随时随地看视频慕课网APP

相关分类

Python
我要回答