How to parallelize database queries in Spring WebFlux?

I want to expose aggregated results from a MySQL database as a Flux<JSONObject> stream in Spring.


@RestController
public class FluxController {

    @Autowired
    private DatabaseService service;

    @GetMapping(value = "/", produces = TEXT_EVENT_STREAM_VALUE)
    public Flux<JSONObject> stream() {
        return service.getJson();
    }
}


@Service
public class DatabaseService {

    public List<JSONObject> getJson() {
        List<Long> refs = jdbc.queryForList(...);
        MapSqlParameterSource params = new MapSqlParameterSource();
        params.addValue("refs", refs);

        // of course the real-world SQL is much more complex
        // (mapping the rows into Maps keyed by ref is omitted here)
        Map<Long, Product> products = jdbc.query("SELECT * FROM products WHERE ref IN (:refs)", params);
        Map<Long, Item> items = jdbc.query("SELECT * FROM items WHERE ref IN (:refs)", params);
        Map<Long, Warehouse> warehouses = jdbc.query("SELECT * FROM warehouses WHERE ref IN (:refs)", params);

        List<JSONObject> results = new ArrayList<>();
        for (Long ref : refs) {
            JSONObject json = new JSONObject();
            json.put("ref", ref);
            json.put("product", products.get(ref));
            json.put("item", items.get(ref));
            json.put("warehouse", warehouses.get(ref));
            results.add(json);
        }
        return results;
    }

Now I want to convert this into a Flux and expose it as an event stream. But how can I parallelize the database lookups and chain them together into a single Flux?


    public Flux<JSONObject> getJsonFlux() {
        // I need this as the source
        List<Long> refs = jdbc.queryForList(...);

        return Flux.fromIterable(refs).map(ref -> {
            // TODO how to aggregate the different database calls concurrently,
            // and then expose each JSONObject into the stream as soon as it is built?
        });
    }

Side note: I know this still blocks. But in my real application I apply pagination and chunking, so each chunk is exposed to the stream as soon as it is ready.
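For illustration only, here is a minimal sketch of what such a chunked source could look like (fetchRefPage, the paging query and Schedulers.boundedElastic() as the blocking-IO pool are all placeholders/assumptions, not part of the actual application): each page of refs enters the stream as soon as its query returns.

public Flux<Long> pagedRefs() {
    return Flux.range(0, Integer.MAX_VALUE)                    // page numbers 0, 1, 2, ...
            .concatMap(page -> Mono
                    .fromCallable(() -> fetchRefPage(page))    // one blocking JDBC call per page
                    .subscribeOn(Schedulers.boundedElastic())) // keep the blocking call off the event loop
            .takeWhile(page -> !page.isEmpty())                // stop after the first empty page
            .flatMapIterable(page -> page);                    // flatten each chunk into the stream
}

// Hypothetical paged query, e.g. "... ORDER BY ref LIMIT :size OFFSET :offset"
private List<Long> fetchRefPage(int page) {
    return Collections.emptyList(); // placeholder for the real JDBC call
}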


The main problem is that I don't know how to parallelize the lookups and then aggregate/merge the results, e.g. in a final Flux step.


qq_花开花谢_0
155 views

2 Answers

慕工程0101907

The idea is to fetch the complete list of refs first, and then concurrently fetch the Products, Items and Warehouses — I call that Tuple3 lookups. Each ref is then combined with lookups and converted into a JSONObject one by one.

return Mono.fromCallable(jdbc::queryForList) // fetches refs
        .subscribeOn(Schedulers.elastic())
        .flatMapMany(refList -> { // flatMapMany allows converting the Mono into a Flux inside a flatMap-style operation
            Flux<Tuple3<Map<Long, Product>, Map<Long, Item>, Map<Long, Warehouse>>> lookups =
                    Mono.zip(fetchProducts(refList), fetchItems(refList), fetchWarehouses(refList))
                            .cache()   // notice cache - it makes sure Mono.zip is executed only once, not once per zipWith call
                            .repeat();
            return Flux.fromIterable(refList)
                    .zipWith(lookups);
        })
        .map(t -> {
            Long ref = t.getT1();
            Tuple3<Map<Long, Product>, Map<Long, Item>, Map<Long, Warehouse>> lookups = t.getT2();
            JSONObject json = new JSONObject();
            json.put("ref", ref);
            json.put("product", lookups.getT1().get(ref));
            json.put("item", lookups.getT2().get(ref));
            json.put("warehouse", lookups.getT3().get(ref));
            return json;
        });

The methods for the individual database calls (row mapping into a Map keyed by ref is omitted):

Mono<Map<Long, Product>> fetchProducts(List<Long> refs) {
    return Mono.fromCallable(() -> jdbc.query("SELECT * FROM products WHERE ref IN (:refs)", params))
            .subscribeOn(Schedulers.elastic());
}

Mono<Map<Long, Item>> fetchItems(List<Long> refs) {
    return Mono.fromCallable(() -> jdbc.query("SELECT * FROM items WHERE ref IN (:refs)", params))
            .subscribeOn(Schedulers.elastic());
}

Mono<Map<Long, Warehouse>> fetchWarehouses(List<Long> refs) {
    return Mono.fromCallable(() -> jdbc.query("SELECT * FROM warehouses WHERE ref IN (:refs)", params))
            .subscribeOn(Schedulers.elastic());
}

Why do I need the subscribeOn? For two reasons:

1. it lets the database queries run on threads from a dedicated thread pool, which keeps them from blocking the main thread;
2. it lets Mono.zip truly parallelize. (See the related question about flatMap — the same reasoning applies to zip.)

For completeness, using .flatMap() on the zip result is also possible, although I'm not sure whether .cache() is still necessary in that case:

.flatMapMany(refList -> Mono
        .zip(fetchProducts(refList), fetchItems(refList), fetchWarehouses(refList))
        .cache()
        .flatMapMany(lookups -> Flux.fromIterable(refList)
                .map(ref -> Tuples.of(ref, lookups))))
.map(tuple -> {
    Long ref = tuple.getT1();
    Tuple3<Map<Long, Product>, Map<Long, Item>, Map<Long, Warehouse>> lookups = tuple.getT2();
    // build the JSONObject as in the first snippet
});
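As a small, self-contained illustration of the subscribeOn point (a sketch with simulated one-second lookups rather than real JDBC calls; the class name and the lookups are made up, and Schedulers.boundedElastic() is the newer replacement for the now-deprecated Schedulers.elastic()): when each Mono is subscribed on its own worker, Mono.zip runs the three lookups concurrently, so the total time is roughly that of the slowest call rather than the sum.

import java.time.Duration;
import java.time.Instant;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ZipParallelismDemo {

    // Simulates a blocking one-second database lookup.
    static Mono<String> blockingLookup(String name) {
        return Mono.fromCallable(() -> {
            Thread.sleep(1000);
            return name;
        }).subscribeOn(Schedulers.boundedElastic()); // each callable runs on its own worker thread
    }

    public static void main(String[] args) {
        Instant start = Instant.now();

        // zip subscribes to all three sources; because each one is subscribed
        // on the bounded-elastic pool, the three "queries" execute concurrently.
        String result = Mono.zip(
                        blockingLookup("products"),
                        blockingLookup("items"),
                        blockingLookup("warehouses"))
                .map(t -> t.getT1() + "," + t.getT2() + "," + t.getT3())
                .block();

        System.out.println(result + " took "
                + Duration.between(start, Instant.now()).toMillis() + " ms");
        // prints roughly 1000 ms; without the subscribeOn it takes about 3000 ms
    }
}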

白猪掌柜的

If I understand correctly, you want to execute the queries by passing all refs as a parameter. This won't really be an event stream, because it waits until all queries are finished and all JSON objects are in memory before it starts streaming them.

public Flux<JSONObject> getJsonFlux() {
    return Mono.fromCallable(jdbc::queryForList)
               .subscribeOn(Schedulers.elastic()) // elastic thread pool meant for blocking IO, you can use a custom one
               .flatMap(this::queryEntities)
               .map(this::createJsonObjects)
               .flatMapMany(Flux::fromIterable);
}

private Mono<Tuple4<List<Long>, List<Product>, List<Item>, List<Warehouse>>> queryEntities(List<Long> refs) {
    Mono<List<Product>> products = Mono.fromCallable(() -> jdbc.queryProducts(refs)).subscribeOn(Schedulers.elastic());
    Mono<List<Item>> items = Mono.fromCallable(() -> jdbc.queryItems(refs)).subscribeOn(Schedulers.elastic());
    Mono<List<Warehouse>> warehouses = Mono.fromCallable(() -> jdbc.queryWarehouses(refs)).subscribeOn(Schedulers.elastic());

    return Mono.zip(Mono.just(refs), products, items, warehouses); // query calls will be concurrent
}

private List<JSONObject> createJsonObjects(Tuple4<List<Long>, List<Product>, List<Item>, List<Warehouse>> tuple) {
    List<Long> refs = tuple.getT1();
    List<Product> products = tuple.getT2();
    List<Item> items = tuple.getT3();
    List<Warehouse> warehouses = tuple.getT4();

    List<JSONObject> jsonObjects = new ArrayList<>();
    for (Long ref : refs) {
        JSONObject json = new JSONObject();
        // build the json object here
        jsonObjects.add(json);
    }
    return jsonObjects;
}

The other option is to query the entities for each ref separately. That way each JSONObject is queried individually, and they can be interleaved in the stream. I'm not sure how the database handles that kind of load; it is something you should consider.

public Flux<JSONObject> getJsonFlux() {
    return Mono.fromCallable(jdbc::queryForList)
               .flatMapMany(Flux::fromIterable)
               .subscribeOn(Schedulers.elastic()) // elastic thread pool meant for blocking IO, you can use a custom one
               .flatMap(this::queryEntities)
               .map(this::createJsonObject);
}

private Mono<Tuple4<Long, Product, Item, Warehouse>> queryEntities(Long ref) {
    Mono<Product> product = Mono.fromCallable(() -> jdbc.queryProduct(ref)).subscribeOn(Schedulers.elastic());
    Mono<Item> item = Mono.fromCallable(() -> jdbc.queryItem(ref)).subscribeOn(Schedulers.elastic());
    Mono<Warehouse> warehouse = Mono.fromCallable(() -> jdbc.queryWarehouse(ref)).subscribeOn(Schedulers.elastic());

    return Mono.zip(Mono.just(ref), product, item, warehouse); // query calls will be concurrent
}

private JSONObject createJsonObject(Tuple4<Long, Product, Item, Warehouse> tuple) {
    Long ref = tuple.getT1();
    Product product = tuple.getT2();
    Item item = tuple.getT3();
    Warehouse warehouse = tuple.getT4();

    JSONObject json = new JSONObject();
    // build the json object here
    return json;
}
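Regarding the database-load concern in the second variant, one possible refinement (a sketch on top of the queryEntities/createJsonObject helpers above, not a drop-in solution) is the flatMap overload that takes a concurrency argument, so only a bounded number of refs are looked up in parallel. The value 4 is just an assumption to tune against your connection pool size, and Schedulers.boundedElastic() is the newer replacement for the deprecated Schedulers.elastic().

public Flux<JSONObject> getJsonFluxBounded() {
    return Mono.fromCallable(jdbc::queryForList)
               .subscribeOn(Schedulers.boundedElastic())
               .flatMapMany(Flux::fromIterable)
               .flatMap(this::queryEntities, 4) // at most 4 per-ref lookups in flight at a time
               .map(this::createJsonObject);
}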