Map<File, Dataset<Row> allWords = ...
StructField[] structFields = new StructField[] {
new StructField("word", DataTypes.StringType, false, Metadata.empty()),
new StructField("count", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("files", ???, false, Metadata.empty())
};
StructType structType = new StructType(structFields);
Dataset<Row> allFilesWords = spark.createDataFrame(new ArrayList<>(), structType);
for (Map.Entry<File, Dataset<Row>> entry : allWords.entrySet()) {
Integer fileIndex = files.indexOf(entry.getKey());
allFilesWords.unionAll(
allWords.get(entry.getKey()).withColumn("files", ???)
);
}
在上面给定的代码中,allWords表示从文件到其字数 ( Row: (string, integer))的映射。现在,我想将所有文件的结果聚合到一个 DataFrame 中,同时保留提到该单词的原始文件。由于最后,每个单词可能在多个文件中都被提到过,因此该files列设计为整数类型集(假设文件被映射为整数)。现在,我正在尝试向allWordsDataFrame添加一个新列,然后使用unionAll将它们全部合并在一起。
但我不知道如何files使用仅包含一个 item 的 set来定义和初始化新列(此处命名)fileIndex。
感谢评论中提供的链接,我知道我应该使用functions.typedLit但此函数要求提供第二个参数,我不知道该提供什么。另外,我不知道如何定义列。最后一件事,提供的链接是 Python 中的,我正在寻找 Java API。
慕侠2389804
相关分类