猿问

PySpark Dataframe 根据函数返回值创建新列

我有一个数据框,我想根据函数返回的值添加一个新列。此函数的参数是来自同一数据帧的四列。这个和这个有点类似于我想要的,但没有回答我的问题。


这是我的数据框(有更多的列然后这四个)


 + ------ + ------ + ------ + ------ +

 | lat1   | lng1   | lat2   | lng2   |

 + ------ + ------ + ------ + ------ +

 | -32.92 | 151.80 | -32.89 | 151.71 |

 | -32.92 | 151.80 | -32.89 | 151.71 |

 | -32.92 | 151.80 | -32.89 | 151.71 |

 | -32.92 | 151.80 | -32.89 | 151.71 |

 | -32.92 | 151.80 | -32.89 | 151.71 |

 + ------ + ------ + ------ + ------ +

我想添加另一列“距离”,它是两个位置点(纬度/经度)之间的总距离。我有一个函数,它将四个位置点作为参数,并将差值作为浮点数返回。


def get_distance(lat_1, lng_1, lat_2, lng_2): 

  d_lat = lat_2 - lat_1

  d_lng = lng_2 - lng_1 


  temp = (  

  math.sin(d_lat / 2) ** 2 

    + math.cos(lat_1) 

    * math.cos(lat_2) 

    * math.sin(d_lng / 2) ** 2

  )


  return 6367.0 * (2 * math.asin(math.sqrt(temp))) 

这是我的尝试导致错误,我也不确定这种方法,它基于我已经提到的其他问题。


udf_func = udf(lambda lat_1, lng_1, lat_2, lng_2: get_distance(lat_1, lng_1, lat_2, lng_2), returnType=FloatType())


df1 = df.withColumn('difference', udf_func(df.lat1, df_lng1, df.lat2, df.lng2))

df_subset1.show()


慕哥6287543
浏览 281回答 2
2回答

猛跑小猪

关于 unicode 的堆栈跟踪部分表明该列的类型是 StringType,因为您不能减去两个字符串。您可以使用df.printSchema().如果您float(lat1)在任何计算之前将所有 lats 和 longs 转换为浮点数(例如),您的 udf 应该可以正常执行。
随时随地看视频慕课网APP

相关分类

Python
我要回答