我想在 Spring 中mysql使用Flux<JSONObject>流公开来自数据库的聚合结果。
@RestController
public class FluxController {
@GetMapping("/", produces = TEXT_EVENT_STREAM_VALUE)
public Flux<JSONObject> stream() {
return service.getJson();
}
}
@Service
public class DatabaseService {
public List<JSONObject> getJson() {
List<Long> refs = jdbc.queryForList(...);
MapSqlParameterSource params = new MapSqlParameterSource();
params.addValue("refs", refs);
//of course real world sql is much more complex
List<Long, Product> products = jdbc.query(SELECT * from products where ref IN (:refs), params);
List<Long, Item> items = jdbc.query(SELECT * from items where ref IN (:refs), params);
List<Long, Warehouse> warehouses = jdbc.query(SELECT * from warehouses where ref IN (:refs), params);
List<JSONObject> results = new ArrayList<>();
for (Long ref : refs) {
JSONObject json = new JSONObject();
json.put("ref", ref);
json.put("product", products.get(ref));
json.put("item", items.get(ref));
json.put("warehouse", warehouses.get(ref));
results.add(json);
}
return results;
}
现在我想将其转换为通量,将其作为事件流公开。但是我怎样才能并行化数据库查找并将它链接在一起成为一个通量呢?
public Flux<JSONObject> getJsonFlux() {
//I need this as source
List<Long> refs = jdbc.queryForList(...);
return Flux.fromIterable(refs).map(refs -> {
//TODO how to aggregate the different database calls concurrently?
//and then expose each JSONObject one by one into the stream as soon as it is build?
};
}
旁注:我知道这仍然会阻塞。但在我的实际应用程序中,我正在应用分页和分块,所以每个块都会在准备好时暴露给流。
然后主要问题是我不知道如何并行化,然后聚合/合并结果,例如在最后一个通量步骤中。
慕工程0101907
白猪掌柜的
相关分类