我有一个Python类,用于在Spark中加载和处理一些数据。在需要做的各种事情中,我正在生成一个从Spark数据帧中各个列派生的伪变量列表。我的问题是我不确定如何正确定义用户定义函数来完成我所需要的。
我目前确实有一种方法,当将其映射到基础数据帧RDD上时,可以解决一半的问题(请记住,这是较大data_processor类中的方法):
def build_feature_arr(self,table):
# this dict has keys for all the columns for which I need dummy coding
categories = {'gender':['1','2'], ..}
# there are actually two differnt dataframes that I need to do this for, this just specifies which I'm looking at, and grabs the relevant features from a config file
if table == 'users':
iter_over = self.config.dyadic_features_to_include
elif table == 'activty':
iter_over = self.config.user_features_to_include
def _build_feature_arr(row):
result = []
row = row.asDict()
for col in iter_over:
column_value = str(row[col]).lower()
cats = categories[col]
result += [1 if column_value and cat==column_value else 0 for cat in cats]
return result
return _build_feature_arr
从本质上讲,对于指定的数据帧,此操作将获取指定列的分类变量值,并返回这些新虚拟变量的值的列表。这意味着以下代码:
data = data_processor(init_args)
result = data.user_data.rdd.map(self.build_feature_arr('users'))
返回类似:
In [39]: result.take(10)
Out[39]:
[[1, 0, 0, 0, 1, 0],
[1, 0, 0, 1, 0, 0],
[1, 0, 0, 0, 0, 0],
[1, 0, 1, 0, 0, 0],
[1, 0, 0, 1, 0, 0],
[1, 0, 0, 1, 0, 0],
[0, 1, 1, 0, 0, 0],
[1, 0, 1, 1, 0, 0],
[1, 0, 0, 1, 0, 0],
[1, 0, 0, 0, 0, 1]]
就生成所需的虚拟变量列表而言,这正是我想要的,但这是我的问题:我如何(a)制作具有可以在Spark SQL查询中使用的类似功能的UDF(或其他方法) ,我想),或(b)提取上述映射得出的RDD并将其作为新列添加到user_data数据帧?
无论哪种方式,我需要做的是生成一个新的数据框,其中包含来自user_data的列,以及一个feature_array包含上述函数的输出(或功能等效的东西)的新列(我们称之为)。
手掌心
相关分类