解析 Pyspark 数据框中的 XML 列

我对 PySpark 比较陌生,正在尝试解决数据问题。我有一个 pyspark DF,它是用从 MS SQL Server 中提取的数据创建的,有 2 列:ID(整数)和 XMLMsg(字符串)。第 2 列 XMLMsg 包含 XML 格式的数据。目标是解析 XMLMsg 列并在同一个 DF 中使用从 XML 中提取的列创建其他列。


以下是 pyspark DF 的示例结构:


ID  XMLMsg

101 ...<a><b>name1</b><c>loc1</c></a>...<d>dept1</d>...

102 ...<a><b>name2</b><c>loc2</c></a>...<d>dept2</d>...

103 ...<a><b>name3</b><c>loc3</c></a>...<d>dept3</d>...

预期输出是:


ID  XMLMsg                                              b       c       d

101 ...<a><b>name1</b><c>loc1</c></a>...<d>dept1</d>... name1   loc1    dept1

102 ...<a><b>name2</b><c>loc2</c></a>...<d>dept2</d>... name2   loc2    dept2

103 ...<a><b>name3</b><c>loc3</c></a>...<d>dept3</d>... name3   loc3    dept3

根据我在 SO 中的搜索,我尝试了一些建议;然而,未能达到预期的效果。因此,寻求一些帮助和指导。谢谢你的时间。


qq_笑_17
浏览 156回答 1
1回答

largeQ

考虑到我必须从一个巨大的 XML 文件中获取来自 4 个节点的文本,我最终使用 Lambda 和 UDF 解决了这个问题。由于 XML 文件已经在列中并且是 pyspark Dataframe 的一部分,我不想写成文件并再次解析整个 XML。我还想避免使用 XSD 架构。实际的 xml 有多个命名空间和一些具有特定条件的节点。例子:<ap:applicationproduct xmlns:xsi="http://www.example.com/2005/XMLSchema-instance" xmlns:ap="http://example.com/productinfo/1_6" xmlns:ct="http://example.com/commontypes/1_0" xmlns:dc="http://example.com/datacontent/1_0" xmlns:tp="http://aexample.com/prmvalue/1_0" ....." schemaVersion="..">&nbsp; <ap:ParameterInfo>&nbsp; &nbsp; <ap:Header>&nbsp; &nbsp; &nbsp; <ct:Version>1.0</ct:Version>&nbsp; &nbsp; &nbsp; <ct:Sender>ABC</ct:Sender>&nbsp; &nbsp; &nbsp; <ct:SenderVersion />&nbsp; &nbsp; &nbsp; <ct:SendTime>...</ct:SendTime>&nbsp; &nbsp; </ap:Header>&nbsp; &nbsp; <ap:ProductID>&nbsp; &nbsp; &nbsp; <ct:Model>&nbsp; &nbsp; &nbsp; &nbsp; <ct:Series>34AP</ct:Series>&nbsp; &nbsp; &nbsp; &nbsp; <ct:ModelNo>013780</ct:ModelNo>&nbsp; &nbsp; &nbsp;..............&nbsp; &nbsp; &nbsp; ..............&nbsp; &nbsp;<ap:Object>&nbsp; &nbsp; &nbsp; &nbsp; <ap:Parameter schemaVersion="2.5" Code="DDA">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <dc:Value>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <tp:Blob>mbQAEAgBTgKQEBAX4KJJYABAIASL0AA==</tp:Blob>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dc:Value>&nbsp; &nbsp; &nbsp; &nbsp; </ap:Parameter>.................在这里我需要从 ct:ModelNo 和 tp:Blob 中提取值from pyspark.sql.types import *from pyspark.sql.functions import udfimport xml.etree.ElementTree as ET# List of namespaces to be used:ns = {'ap' : 'http://example.com/productinfo/1_6',&nbsp;'ct':'http://example.com/commontypes/1_0',&nbsp;'dc':'http://example.com/datacontent/1_0',&nbsp;'tp':'http://aexample.com/prmvalue/1_0'&nbsp;&nbsp; &nbsp; }parsed_model = (lambda x:&nbsp; &nbsp; ET.fromstring(x).find('ap:ParameterInfo/ap:ProductID/ct:Model/ct:ModelNo', ns).text)udf_model = udf(parsed_model)parsed_model_df = df.withColumn('ModelNo', udf_Model('XMLMsg'))同样对于具有 blob 值的节点,可以编写类似的函数,但节点的路径将是:'ap:ParameterInfo/ap:Object/ap:Parameter[@Code="DDA"]/dc:Value/tp:Blob'这对我有用,我能够在 pyspark DF 中添加所需的值。欢迎提出任何建议以使其变得更好。谢谢你!
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python