配置
所有运行节点安装 pyarrow
,需要 >= 0.8
为什么会有 pandas UDF
在过去的几年中,python 正在成为数据分析师的默认语言。一些类似 pandas
,numpy
,statsmodel
,scikit-learn
被大量使用,逐渐成为主流的工具包。同时,spark 也成为了大数据处理的标准,为了让数据分析师能够使用 spark ,Spark在 0.7 版本增加了 python api,也支持了 udf (user-defined functions)。
这些 udf 对每条记录都会操作一次,同时数据需要在 JVM 和 Python 中传输,因此有了额外的序列化和调用开销。因此可以用 Java 和 Scala 中定义 UDF,然后在 python 中调用它们。
image
pandas UDFs 为什么快?
Pandas Udf 构建在 Apache Arrow
之上,带来了低开销,高性能的UDF。
每个系统都有自己的存储格式,70%-80%的时间花费在序列化和反序列化上
Apache Arrow:一个跨平台的在内存中以列式存储的数据层,用来加速大数据分析速度。
循环执行 转化为 pandas 向量化计算。
python 和 JVM 使用同一种数据结构,避免了序列化的开销
每批进行向量化计算的数据量由 spark.sql.execution.arrow.maxRecordsPerBatch
参数控制,默认为10000条。如果一次计算的 columns 特别多,可以适当的减小该值。
一些限制
不支持所有的 sparkSQL 数据类型,包括 BinaryType,MapType, ArrayType,TimestampType 和嵌套的 StructType。
pandas udf 和 udf 不能混用。
使用方式
1. spark df & pandas df
spark df 与 pandas df 相互转化性能优化,需要开启配置,默认为关闭。
配置项:
spark.sql.execution.arrow.enabled true
相互转化
import numpy as npimport pandas as pd//初始化 pandas DFpdf = pd.DataFrame(np.random.rand(100000, 3))// pdf -> sdf%time df = spark.createDataFrame(pdf)// sdf -> pdf%time result_pdf = df.select("*").toPandas()
性能对比:
execution.arrow.enabled | pdf -> sdf | sdf -> pdf |
---|---|---|
false | 4980ms | 722ms |
true | 72ms | 79ms |
tips: 即便是提高了转化的速度,pandas df 依旧是单机在 driver 中执行的,不应该返回大量的数据。
2. pandas UDFs(Vectorized UDFs)
pandas udf 的入参和返回值类型都为 pandas.Series
注册 udf
方法1:
from pyspark.sql.functions import pandas_udfdef plus_one(a): return a + 1//df_udf plus_one_pd_udf = pandas_udf(plus_one, returnType=LongType()) //sql udf spark.udf.register('plus_one',plus_one_pd_udf)
方法2:
from pyspark.sql.functions import pandas_udf //默认为 PandasUDFType.SCALAR 类型@pandas_udf('long')def plus_one(a): return a + 1spark.udf.register('plus_one',plus_one)
spark.udf.register可以接受一个 SQL_BATCHED_UDF 或 SQL_SCALAR_PANDAS_UDF 方法。
使用 pandas udf 后,物理执行计划会从 BatchEvalPython
变为 ArrowEvalPython
,可以使用 explain()
检查 pandas udf 是否生效。
Scalar Pandas UDFs
import pandas as pdfrom pyspark.sql.functions import col, pandas_udf,udffrom pyspark.sql.types import LongTypedef multiply_func(a, b): return a * b multiply_pd = pandas_udf(multiply_func, returnType=LongType()) multiply = udf(multiply_func, returnType=LongType()) x = pd.Series([1, 2, 3] * 10000) df = spark.createDataFrame(pd.DataFrame(x, columns=["x"])) %timeit df.select(multiply_pd(col("x"), col("x"))).count() %timeit df.select(multiply(col("x"), col("x"))).count()
Grouped Map Pandas UDFs
计算均方差
from pyspark.sql.functions import pandas_udf, PandasUDFTypedf = spark.createDataFrame( [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)def substract_mean(pdf): # pdf is a pandas.DataFrame v = pdf.v return pdf.assign(v=v - v.mean()) df.groupby("id").apply(substract_mean).show() +---+----+ | id| v| +---+----+ | 1|-0.5| | 1| 0.5| | 2|-3.0| | 2|-1.0| | 2| 4.0| +---+----+
测试用例
数据准备: 10M-row DataFrame , 2列,一列Int类型,一列Double类型
df = spark.range(0, 10 * 1000 * 1000).withColumn('id', (col('id') / 10000).cast('integer')).withColumn('v', rand()) df.cache() df.count()
Plus one
from pyspark.sql.functions import pandas_udf, PandasUDFType# 输入和输出都是 doubles 类型的 pandas.Series@pandas_udf('double', PandasUDFType.SCALAR)def pandas_plus_one(v): return v + 1df.withColumn('v2', pandas_plus_one(df.v))
Cumulative Probability
import pandas as pdfrom scipy import stats@pandas_udf('double')def cdf(v): return pd.Series(stats.norm.cdf(v)) df.withColumn('cumulative_probability', cdf(df.v))
Subtract Mean
# 输入和输出类型都是 pandas.DataFrame@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)def subtract_mean(pdf): return pdf.assign(v=pdf.v - pdf.v.mean()) df.groupby('id').apply(subtract_mean)
Scalar 和 Grouped map 的一些区别
... | Scalar | Grouped map |
---|---|---|
udf 入参类型 | pandas.Series | pandas.DataFrame |
udf 返回类型 | pandas.Series | pandas.DataFrame |
聚合语义 | 无 | groupby 的子句 |
返回大小 | 与输入一致 | rows 和 columns 都可以和入参不同 |
返回类型声明 | pandas.Series 的 DataType | pandas.DataFrame 的 StructType |
性能对比
类型 | udf | pandas udf |
---|---|---|
plus_one | 2.54s | 1.28s |
cdf | 2min 2s | 1.52s |
Subtract Mean | 1min 8s | 4.4s |
配置和测试方法
环境
Spark 2.3
Anaconda 4.4.0 (python 2.7.13)
运行模式 local[10]
作者:breeze_lsw
链接:https://www.jianshu.com/p/17117574a86b