猿问

在 Pyspark 中使用 udf 时出现 __getnewargs__ 错误

有一个包含 2 列(db 和 tb)的数据框:db 代表数据库,tb 代表该数据库的 tableName。


   +--------------------+--------------------+

   |            database|           tableName|

   +--------------------+--------------------+

   |aaaaaaaaaaaaaaaaa...|    tttttttttttttttt|

   |bbbbbbbbbbbbbbbbb...|    rrrrrrrrrrrrrrrr|

   |aaaaaaaaaaaaaaaaa...|  ssssssssssssssssss|

我在python中有以下方法:


 def _get_tb_db(db, tb):

      df = spark.sql("select * from {}.{}".format(db, tb))

      return df.dtypes

这个udf:


 test = udf(lambda db, tb: _get_tb_db(db, tb), StringType())

运行时:


 df = df.withColumn("dtype", test(col("db"), col("tb")))

有以下错误:


 pickle.PicklingError: Could not serialize object: Py4JError: An 

 error occurred while calling o58.__getnewargs__. Trace:

 py4j.Py4JException: Method __getnewargs__([]) does not exist

我发现了一些关于 stackoverflow 的讨论:Spark __getnewargs__ 错误 ,但我不确定如何解决这个问题?错误是因为我在 UDF 中创建了另一个数据框吗?


类似于链接中的解决方案,我尝试了这个:


       cols = copy.deepcopy(df.columns)

       df = df.withColumn("dtype", scanning(cols[0], cols[1]))

但仍然出现错误


有什么解决办法吗?


慕虎7371278
浏览 440回答 1
1回答

萧十郎

该错误意味着您不能在 UDF 中使用Spark 数据帧。但是由于您包含数据库和表名称的数据框很可能很小,因此只需使用 Pythonfor循环就足够了,以下是一些可能有助于获取数据的方法:from pyspark.sql import Row# assume dfs is the df containing database names and table namesdfs.printSchema()root&nbsp;|-- database: string (nullable = true)&nbsp;|-- tableName: string (nullable = true)方法一:使用 df.dtypes运行 sqlselect * from database.tableName limit 1生成 df 并返回其 dtypes,将其转换为 StringType()。data = []DRow = Row('database', 'tableName', 'dtypes')for row in dfs.collect():&nbsp; try:&nbsp; &nbsp; dtypes = spark.sql('select * from `{}`.`{}` limit 1'.format(row.database, row.tableName)).dtypes&nbsp; &nbsp; data.append(DRow(row.database, row.tableName, str(dtypes)))&nbsp; except Exception, e:&nbsp; &nbsp; print("ERROR from {}.{}: [{}]".format(row.database, row.tableName, e))&nbsp; &nbsp; passdf_dtypes = spark.createDataFrame(data)# DataFrame[database: string, tableName: string, dtypes: string]笔记:使用dtypes而不是str(dtypes)将分别获得以下模式 where_1和_2arecol_name和col_dtype:root&nbsp;|-- database: string (nullable = true)&nbsp;|-- tableName: string (nullable = true)&nbsp;|-- dtypes: array (nullable = true)&nbsp;|&nbsp; &nbsp; |-- element: struct (containsNull = true)&nbsp;|&nbsp; &nbsp; |&nbsp; &nbsp; |-- _1: string (nullable = true)&nbsp;|&nbsp; &nbsp; |&nbsp; &nbsp; |-- _2: string (nullable = true)使用这种方法,每个表将只有一行。对于接下来的两种方法,表的每个 col_type 都会有自己的行。方法二:使用描述您还可以通过运行spark.sql("describe tableName")直接获取数据帧来检索此信息,然后使用 reduce 函数来合并所有表的结果。from functools import reducedef get_df_dtypes(db, tb):&nbsp; try:&nbsp; &nbsp; return spark.sql('desc `{}`.`{}`'.format(db, tb)) \&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .selectExpr(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; '"{}" as `database`'.format(db)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; , '"{}" as `tableName`'.format(tb)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; , 'col_name'&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; , 'data_type')&nbsp; except Exception, e:&nbsp; &nbsp; print("ERROR from {}.{}: [{}]".format(db, tb, e))&nbsp; &nbsp; pass# an example table:get_df_dtypes('default', 'tbl_df1').show()+--------+---------+--------+--------------------+|database|tableName|col_name|&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;data_type|+--------+---------+--------+--------------------+| default|&nbsp; tbl_df1| array_b|array<struct<a:st...|| default|&nbsp; tbl_df1| array_d|&nbsp; &nbsp; &nbsp; &nbsp;array<string>|| default|&nbsp; tbl_df1|struct_c|struct<a:double,b...|+--------+---------+--------+--------------------+# use reduce function to union all tables into one dfdf_dtypes = reduce(lambda d1, d2: d1.union(d2), [ get_df_dtypes(row.database, row.tableName) for row in dfs.collect() ])方法 3:使用 spark.catalog.listColumns()使用 spark.catalog.listColumns() 创建collections.Column对象列表,检索name和dataType合并数据。生成的数据框在它们自己的列上使用 col_name 和 col_dtype 进行标准化(与使用Method-2相同)。data = []DRow = Row('database', 'tableName', 'col_name', 'col_dtype')for row in dfs.select('database', 'tableName').collect():&nbsp; try:&nbsp; &nbsp; for col in spark.catalog.listColumns(row.tableName, row.database):&nbsp; &nbsp; &nbsp; data.append(DRow(row.database, row.tableName, col.name, col.dataType))&nbsp; except Exception, e:&nbsp; &nbsp; print("ERROR from {}.{}: [{}]".format(row.database, row.tableName, e))&nbsp; &nbsp; passdf_dtypes = spark.createDataFrame(data)# DataFrame[database: string, tableName: string, col_name: string, col_dtype: string]注意:在检索元数据时,不同的 Spark 发行版/版本可能会产生不同的结果describe tbl_name和其他命令,请确保在查询中使用正确的列名。
随时随地看视频慕课网APP

相关分类

Python
我要回答