我有一个 Java 中的 Map 列表,基本上代表行。
List<Map<String, Object>> dataList = new ArrayList<>();
Map<String, Object> row1 = new HashMap<>();
row1.put("fund", "f1");
row1.put("broker", "b1");
row1.put("qty", 100);
Map<String, Object> row2 = new HashMap<>();
row2.put("fund", "f2");
row2.put("broker", "b2");
row2.put("qty", 200);
dataList.add(row1);
dataList.add(row2);
我正在尝试从中创建一个 Spark DataFrame。
我试图将其转换为JavaRDD<Map<String, Object>>使用
JavaRDD<Map<String,Object>> rows = sc.parallelize(dataList);
但我不确定如何从这里转到Dataset<Row>. 我看过 Scala 示例,但没有看过 Java 示例。
我还尝试将列表转换为 JSON 字符串,并读取 JSON 字符串。
String jsonStr = mapper.writeValueAsString(dataList);
但似乎我必须将它写入文件然后使用读取
Dataset<Row> df = spark.read().json(pathToFile);
如果可能的话,我宁愿在内存中进行,而不是写入文件并从那里读取。
SparkConf sparkConf = new SparkConf().setAppName("SparkTest").setMaster("local[*]")
.set("spark.sql.shuffle.partitions", "1");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SparkSession sparkSession =
SparkSession.builder().config(sparkConf).getOrCreate();
List<Map<String, Object>> dataList = new ArrayList<>();
Map<String, Object> row1 = new HashMap<>();
row1.put("fund", "f1");
row1.put("broker", "b1");
row1.put("qty", 100);
Map<String, Object> row2 = new HashMap<>();
row2.put("fund", "f2");
row2.put("broker", "b2");
row2.put("qty", 200);
dataList.add(row1);
dataList.add(row2);
ObjectMapper mapper = new ObjectMapper();
String jsonStr = mapper.writeValueAsString(dataList);
JavaRDD<Map<String,Object>> rows = sc.parallelize(dataList);
Dataset<Row> data = sparkSession.createDataFrame(rows, Map.class);
data.show();
胡说叔叔
慕桂英546537
慕标5832272
慕姐4208626
相关分类