猿问

将Java中的地图列表转换为spark中的数据集

我有一个 Java 中的 Map 列表,基本上代表行。


List<Map<String, Object>> dataList = new ArrayList<>();

Map<String, Object> row1 = new HashMap<>();

row1.put("fund", "f1");

row1.put("broker", "b1");

row1.put("qty", 100);


Map<String, Object> row2 = new HashMap<>();

row2.put("fund", "f2");

row2.put("broker", "b2");

row2.put("qty", 200);


dataList.add(row1);

dataList.add(row2);

我正在尝试从中创建一个 Spark DataFrame。


我试图将其转换为JavaRDD<Map<String, Object>>使用


JavaRDD<Map<String,Object>> rows = sc.parallelize(dataList);

但我不确定如何从这里转到Dataset<Row>. 我看过 Scala 示例,但没有看过 Java 示例。


我还尝试将列表转换为 JSON 字符串,并读取 JSON 字符串。


String jsonStr = mapper.writeValueAsString(dataList);

但似乎我必须将它写入文件然后使用读取


Dataset<Row> df = spark.read().json(pathToFile);

如果可能的话,我宁愿在内存中进行,而不是写入文件并从那里读取。


SparkConf sparkConf = new SparkConf().setAppName("SparkTest").setMaster("local[*]")

            .set("spark.sql.shuffle.partitions", "1");

JavaSparkContext sc = new JavaSparkContext(sparkConf);

    SparkSession sparkSession = 

SparkSession.builder().config(sparkConf).getOrCreate();


List<Map<String, Object>> dataList = new ArrayList<>();

Map<String, Object> row1 = new HashMap<>();

row1.put("fund", "f1");

row1.put("broker", "b1");

row1.put("qty", 100);


Map<String, Object> row2 = new HashMap<>();

row2.put("fund", "f2");

row2.put("broker", "b2");

row2.put("qty", 200);


dataList.add(row1);

dataList.add(row2);


ObjectMapper mapper = new ObjectMapper();

    

String jsonStr = mapper.writeValueAsString(dataList);

JavaRDD<Map<String,Object>> rows = sc.parallelize(dataList);

Dataset<Row> data = sparkSession.createDataFrame(rows, Map.class);

data.show();


德玛西亚99
浏览 255回答 4
4回答

胡说叔叔

您根本不需要使用 RDD。您需要做的是从地图列表中提取所需的架构,将地图列表转换为行列表,然后使用spark.createDataFrame.在 Java 中,这有点痛苦,尤其是在创建Row对象时,但它是这样进行的:List<String> cols = new ArrayList(dataList.get(0).keySet());List<Row> rows = dataList&nbsp; &nbsp; .stream()&nbsp; &nbsp; .map(row -> cols.stream().map(c -> (Object) row.get(c).toString()))&nbsp; &nbsp; .map(row -> row.collect(Collectors.toList()))&nbsp; &nbsp; .map(row -> JavaConverters.asScalaBufferConverter(row).asScala().toSeq())&nbsp; &nbsp; .map(Row$.MODULE$::fromSeq)&nbsp; &nbsp; .collect(Collectors.toList());StructType schema = new StructType(&nbsp; &nbsp; cols.stream()&nbsp; &nbsp; &nbsp; &nbsp; .map(c -> new StructField(c, DataTypes.StringType, true, new Metadata()))&nbsp; &nbsp; &nbsp; &nbsp; .collect(Collectors.toList())&nbsp; &nbsp; &nbsp; &nbsp; .toArray(new StructField[0]));Dataset<Row> result = spark.createDataFrame(rows, schema);

慕桂英546537

spark 文档已经指出了如何加载内存中的 json 字符串。这是来自https://spark.apache.org/docs/latest/sql-data-sources-json.html的示例// Alternatively, a DataFrame can be created for a JSON dataset represented by// a Dataset<String> storing one JSON object per string.List<String> jsonData = Arrays.asList(&nbsp; &nbsp; &nbsp; &nbsp; "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);anotherPeople.show();// +---------------+----+// |&nbsp; &nbsp; &nbsp; &nbsp; address|name|// +---------------+----+// |[Columbus,Ohio]| Yin|// +---------------+----+

慕标5832272

public class MyRow implements Serializable {&nbsp; private String fund;&nbsp; private String broker;&nbsp; private int qty;&nbsp; public MyRow(String fund, String broker, int qty) {&nbsp; &nbsp; super();&nbsp; &nbsp; this.fund = fund;&nbsp; &nbsp; this.broker = broker;&nbsp; &nbsp; this.qty = qty;&nbsp; }&nbsp; public String getFund() {&nbsp; &nbsp; return fund;&nbsp; }&nbsp; public void setFund(String fund) {&nbsp; &nbsp; this.fund = fund;&nbsp; }&nbsp; public String getBroker() {&nbsp; &nbsp; return broker;&nbsp; }&nbsp; public void setBroker(String broker) {&nbsp; &nbsp; this.broker = broker;&nbsp; }&nbsp; public int getQty() {&nbsp; &nbsp; return qty;&nbsp; }&nbsp; public void setQty(int qty) {&nbsp; &nbsp; this.qty = qty;&nbsp; }}现在创建一个 ArrayList。此列表中的每个项目都将充当最终数据框中的行。MyRow r1 = new MyRow("f1", "b1", 100);MyRow r2 = new MyRow("f2", "b2", 200);List<MyRow> dataList = new ArrayList<>();dataList.add(r1);dataList.add(r2);现在我们必须将此列表转换为数据集 -Dataset<Row> ds = spark.createDataFrame(dataList, MyRow.class);ds.show()

慕姐4208626

import org.apache.spark.api.java.function.Function;private static JavaRDD<Map<String, Object>> rows;private static final Function f = (Function<Map<String, Object>, Row>) strObjMap -> RowFactory.create(new TreeMap<String, Object>(strObjMap).values().toArray(new Object[0]));public void test(){&nbsp; &nbsp; rows = sc.parallelize(list);&nbsp; &nbsp; JavaRDD<Row> rowRDD = rows.map(f);&nbsp; &nbsp; Map<String, Object> headMap = list.get(0);&nbsp; &nbsp; TreeMap<String, Object> headerMap = new TreeMap<>(headMap);&nbsp; &nbsp; List<StructField> fields = new ArrayList<>();&nbsp; &nbsp; StructField field;&nbsp; &nbsp; for (String key : headerMap.keySet()) {&nbsp; &nbsp; &nbsp; &nbsp; System.out.println("key:::"+key);&nbsp; &nbsp; &nbsp; &nbsp; Object value = list.get(0).get(key);&nbsp; &nbsp; &nbsp; &nbsp; if (value instanceof Integer) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; field = DataTypes.createStructField(key, DataTypes.IntegerType, true);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; else if (value instanceof Double) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; field = DataTypes.createStructField(key, DataTypes.DoubleType, true);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; else if (value instanceof Date || value instanceof java.util.Date) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; field = DataTypes.createStructField(key, DataTypes.DateType, true);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; else {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; field = DataTypes.createStructField(key, DataTypes.StringType, true);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fields.add(field);&nbsp; &nbsp; }&nbsp; &nbsp; StructType struct = DataTypes.createStructType(fields);&nbsp; &nbsp; Dataset<Row> data = this.spark.createDataFrame(rowRDD, struct);}
随时随地看视频慕课网APP

相关分类

Java
我要回答