I have a DataFrame, and I want to use the polyline library in PySpark to get the lat_long for a given geolocation:
+-----------------+--------------------+----------+
| vid| geolocations| trip_date|
+-----------------+--------------------+----------+
|58AC21B17LU006754|eurnE||yqU???????...|2020-02-22|
|2T3EWRFV0LW060632|uocbGfjniOK[Fs@rC...|2020-02-25|
|JTDP4RCE0LJ014008|w}wtFpdxtM????Q_@...|2020-02-25|
|4T1BZ1HK8KU029845|}rz_Dp~hhN?@?@???...|2020-03-03|
I am using pandas_udf with Apache Arrow enabled:
import polyline
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import ArrayType, StringType

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "true")

lat_long_udf = pandas_udf(lambda geoloc: polyline.decode(geoloc)[0], ArrayType(StringType()))
df1 = df.withColumn('lat_long', lat_long_udf(df.geolocations))
Calling df.count() gives a result, but when executing df.show() I get an error like the following:
248, in init_stream_yield_batches
for series in iterator:
File "/Users/prantik.pariksha/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
File "/Users/prantik.pariksha/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
File "/Users/prantik.pariksha/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 110, in <lambda>
verify_result_type(f(*a)), len(a[0])), arrow_return_type)
File "/Users/prantik.pariksha/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
return f(*args, **kwargs)
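The traceback seems consistent with `polyline.decode` being handed a whole pandas Series rather than one string: a `pandas_udf` is called once per Arrow batch with a `pandas.Series` of values, so the decode has to be applied element-wise. A minimal pandas-only sketch of that pattern, using a hypothetical stand-in `decode` in place of the real `polyline.decode` (which returns a list of `(lat, lon)` tuples):

```python
import pandas as pd

# Stand-in for polyline.decode: the real function (from the `polyline`
# package) decodes an encoded string into a list of (lat, lon) tuples.
def decode(s):
    return [(40.0, -120.0), (40.7, -120.95)]

# A pandas_udf receives one Series per batch, not a single string,
# so the decoder must be applied to each element of the Series:
geoloc = pd.Series(["_p~iF~ps|U_ulLnnqC"])
first_points = geoloc.apply(lambda s: list(decode(s)[0]))
print(first_points[0])  # [40.0, -120.0]
```

Inside the UDF this would be `geoloc.apply(lambda s: polyline.decode(s)[0])`, and since the decoded coordinates are floats, a return type of `ArrayType(DoubleType())` may fit better than `ArrayType(StringType())`.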