拉风的咖菲猫
Read the CSV file as text, split each value on the delimiter, and count the elements.

df = spark.read.text('test.csv')
df.show(10, False)

+-------------------------------+
|value                          |
+-------------------------------+
|Col1,Col2,Col3,Col4            |
|Value11,Value12,Value13,Value14|
|Value21,Value22,Value23,Value24|
+-------------------------------+

import pyspark.sql.functions as F

df2 = df.withColumn('count', F.size(F.split('value', ',')))
df2.show(10, False)

+-------------------------------+-----+
|value                          |count|
+-------------------------------+-----+
|Col1,Col2,Col3,Col4            |4    |
|Value11,Value12,Value13,Value14|4    |
|Value21,Value22,Value23,Value24|4    |
+-------------------------------+-----+

df2.groupBy().agg(F.min('count'), F.max('count')).show(10, False)

+----------+----------+
|min(count)|max(count)|
+----------+----------+
|4         |4         |
+----------+----------+
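If you also want to see which of the parsed lines are malformed, rather than only the minimum and maximum field counts, a minimal follow-up sketch (assuming the df2 built above and an expected width of 4 columns, which is not part of the original answer) would be:

# Hypothetical follow-up: keep only rows whose comma-separated field count
# differs from the expected 4 columns, so the offending lines can be inspected.
bad_rows = df2.filter(F.col('count') != 4)
bad_rows.show(10, False)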
炎炎设计
Since you want to know which lines are erroneous, the only way is to loop over them:

In [18]: erroneous_lines = []

In [19]: with open(r'C:\Users\abaskaran\Desktop\mycsv.txt') as fd:
    ...:     for line_num, line in enumerate(fd, 1):
    ...:         if len(line.split(',')) != 4:
    ...:             erroneous_lines.append((line_num, line))

In [20]: erroneous_lines
Out[20]:
[(5, 'Value21,Value22,Value23,Value24Value11,Value12,Value13,Value14\n'),
 (6, 'Value21,Value22,Value23\n')]

The erroneous_lines list will contain tuples with the line number and the actual content of every line that does not contain all the values. I modified the CSV content to the below just for testing:

Col1,Col2,Col3,Col4
Value11,Value12,Value13,Value14
Value21,Value22,Value23,Value24
Value11,Value12,Value13,Value14
Value21,Value22,Value23,Value24Value11,Value12,Value13,Value14
Value21,Value22,Value23
Value11,Value12,Value13,Value14
Value21,Value22,Value23,Value24
Value11,Value12,Value13,Value14
Value21,Value22,Value23,Value24
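If the data could ever contain quoted fields with embedded commas, splitting on ',' would over-count them; a minimal variant of the same loop using the standard csv module (the filename here is only a placeholder) might look like this:

import csv

erroneous_lines = []
with open('mycsv.txt', newline='') as fd:
    # csv.reader respects quoting, so a quoted value containing a comma
    # is still counted as a single field.
    for line_num, row in enumerate(csv.reader(fd), 1):
        if len(row) != 4:
            erroneous_lines.append((line_num, row))

print(erroneous_lines)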