pandas_udf gives a pyarrow-related error

I have a dataframe, and I want to get the lat/long for the given geolocations using the polyline library in PySpark:


+-----------------+--------------------+----------+
|              vid|        geolocations| trip_date|
+-----------------+--------------------+----------+
|58AC21B17LU006754|eurnE||yqU???????...|2020-02-22|
|2T3EWRFV0LW060632|uocbGfjniOK[Fs@rC...|2020-02-25|
|JTDP4RCE0LJ014008|w}wtFpdxtM????Q_@...|2020-02-25|
|4T1BZ1HK8KU029845|}rz_Dp~hhN?@?@???...|2020-03-03|
+-----------------+--------------------+----------+

I am using pandas_udf with Apache Arrow enabled:


import polyline
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import ArrayType, StringType

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "true")

lat_long_udf = pandas_udf(lambda geoloc: polyline.decode(geoloc)[0], ArrayType(StringType()))
df1 = df.withColumn('lat_long', lat_long_udf(df.geolocations))

Calling df.count() gives a result, but when I run df.show() I get an error like the following:


 248, in init_stream_yield_batches
    for series in iterator:
  File "/Users/prantik.pariksha/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/Users/prantik.pariksha/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/Users/prantik.pariksha/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 110, in <lambda>
    verify_result_type(f(*a)), len(a[0])), arrow_return_type)
  File "/Users/prantik.pariksha/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
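The failure mode can be reproduced without Spark: a pandas_udf hands the whole pandas Series to your function, while polyline.decode expects a single encoded string. The sketch below uses a hypothetical stand-in for polyline.decode (fake_decode is invented here purely for illustration, it is not part of any library) to contrast calling a per-element function on the Series directly versus mapping it over the Series with Series.apply.

```python
import pandas as pd

# Hypothetical stand-in for polyline.decode: expects ONE encoded string,
# returns a list of (lat, lon)-like pairs. Not the real polyline library.
def fake_decode(s: str):
    if not isinstance(s, str):
        raise TypeError("expected a single encoded string, got %s" % type(s).__name__)
    return [(len(s), -len(s))]  # dummy coordinates, for illustration only

series = pd.Series(["abc", "defgh"])

# What the question's lambda effectively does: the whole Series is passed in.
try:
    fake_decode(series)[0]
except TypeError as e:
    print("direct call fails:", e)

# Mapping the decoder over each value in the Series works.
decoded = series.apply(lambda x: fake_decode(x)[0])
print(list(decoded))  # [(3, -3), (5, -5)]
```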


潇湘沐

1 Answer

梵蒂冈之花

You are most likely getting this error because a pandas_udf takes a pandas Series as input, and you are applying the decode function directly to the Series rather than to the values inside the Series. In the example below I have expanded your lambda slightly so that you can see this: I take the pandas Series, apply the polyline.decode function to each value in it, and return the resulting Series. Note that I also changed the return type to ArrayType(DoubleType()) instead of ArrayType(StringType()).

import pandas as pd
from pyspark.sql.types import ArrayType, DoubleType
...
df = spark.createDataFrame([["~sqU__pR_jpv@_pR"], ["_~t[__pR~qy@_pR"]], ["geolocations"])

@pandas_udf(ArrayType(DoubleType()))
def lat_long_udf(s: pd.Series) -> pd.Series:
    return s.apply(lambda x: polyline.decode(x)[0])

df1 = df.withColumn('decoded', lat_long_udf(df.geolocations))
df1.collect()
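For readers without the polyline package installed, the format involved here (Google's encoded polyline algorithm at precision 5) can be decoded in a few lines of plain Python. This is a sketch re-implementation for illustration only; in real code, use polyline.decode, which handles the same format.

```python
def decode_polyline(encoded: str, precision: int = 5):
    """Decode a Google encoded polyline string into a list of (lat, lon) tuples."""
    coords = []
    index, lat, lng = 0, 0, 0
    factor = 10 ** precision
    while index < len(encoded):
        for which in ("lat", "lng"):
            shift, result = 0, 0
            while True:  # read 5-bit chunks until the continuation bit clears
                b = ord(encoded[index]) - 63
                index += 1
                result |= (b & 0x1F) << shift
                shift += 5
                if b < 0x20:
                    break
            # zig-zag decode the signed delta and accumulate
            delta = ~(result >> 1) if result & 1 else (result >> 1)
            if which == "lat":
                lat += delta
            else:
                lng += delta
        coords.append((lat / factor, lng / factor))
    return coords

# Canonical example from the encoded polyline algorithm documentation:
print(decode_polyline("_p~iF~ps|U_ulLnnqC_mqNvxq`@"))
# [(38.5, -120.2), (40.7, -120.95), (43.252, -126.453)]
```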