
How can I make Vertx MongoClient operations synchronous in Java without blocking the event loop?

I am trying to save a new document to MongoDB using the Vertx MongoClient, as follows:


MongoDBConnection.mongoClient.save("booking", query, res -> {
    if(res.succeeded()) {
        documentID = res.result();
        System.out.println("MongoDB inserted successfully. + document ID is : " + documentID);
    }
    else {
        System.out.println("MongoDB insertion failed.");
    }
});

if(documentID != null) {

    // MongoDB document insertion successful. Reply with a booking ID
    String resMsg = "A confirmed booking has been successfully created with booking id as " + documentID + 
        ". An email has also been triggered to the shared email id " + emailID;

    documentID = null;

    return new JsonObject().put("fulfillmentText", resMsg);
}
else {
    // return intent response
    documentID = null;
    return new JsonObject().put("fulfillmentText", 
        "There is some issues while booking the shipment. Please start afreash.");
}

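The above code successfully writes the query jsonObject to the MongoDB collection booking. However, the function that contains this code always returns with "There is some issues while booking the shipment. Please start afreash".

This is probably happening because the MongoClient save() handler "res" is asynchronous. But I want to return a conditional response depending on whether the save() operation succeeded or failed.

How can I achieve this in Vertx Java?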


桃花长相依
2 Answers

幕布斯6054654

Your assumption is correct: the code does not wait for the asynchronous response from the database. What you can do is wrap the call in a Future, like this:

  // Requires io.vertx.core.Future and io.vertx.core.json.JsonObject.
  // Future.future() and setHandler(...) are the Vert.x 3 API;
  // Vert.x 4 uses Promise.promise() and onComplete(...) instead.
  public Future<JsonObject> save() {
    Future<JsonObject> future = Future.future();
    MongoDBConnection.mongoClient.save("booking", query, res -> {
      if(res.succeeded()) {
        documentID = res.result();
        if(documentID != null) {
          System.out.println("MongoDB inserted successfully. + document ID is : " + documentID);
          String resMsg = "A confirmed booking has been successfully created with booking id as " + documentID +
            ". An email has also been triggered to the shared email id " + emailID;
          future.complete(new JsonObject().put("fulfillmentText", resMsg));
        }else{
          future.complete(new JsonObject().put("fulfillmentText",
            "There is some issues while booking the shipment. Please start afreash."));
        }
      } else {
        System.out.println("MongoDB insertion failed.");
        future.fail(res.cause());
      }
    });
    return future;
  }

Then I assume you have an endpoint that eventually calls this, e.g.:

router.route("/book").handler(this::addBooking);

... then you can call the save method and serve a different response based on the result:

public void addBooking(RoutingContext ctx){
    save().setHandler(h -> {
        if(h.succeeded()){
            // end() accepts a String or Buffer, so encode the JsonObject first
            ctx.response().end(h.result().encode());
        }else{
            // end() cannot take a Throwable; send its message instead
            ctx.response().setStatusCode(500).end(h.cause().getMessage());
        }
    });
}
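To see why the original code always takes the failure branch, and why returning a future fixes it, here is a minimal sketch that uses only the JDK. It is not Vert.x code: `CompletableFuture` stands in for the Vert.x `Future`, and `fakeSave`, `replyTooEarly`, and `replyWhenDone` are hypothetical names invented for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

class AsyncSaveSketch {

    // Hypothetical stand-in for mongoClient.save(...): the handler runs
    // about 50 ms later on another thread, never inside this call.
    static void fakeSave(String document, Consumer<String> handler) {
        CompletableFuture.runAsync(
            () -> handler.accept("id-123"),
            CompletableFuture.delayedExecutor(50, TimeUnit.MILLISECONDS));
    }

    // Shape of the question's code: the ID is read before the handler has run.
    static String replyTooEarly() {
        AtomicReference<String> documentID = new AtomicReference<>();
        fakeSave("{}", documentID::set);
        return documentID.get() != null ? "created " + documentID.get() : "failed";
    }

    // Shape of the answer's code: return a future and complete it in the handler.
    static CompletableFuture<String> replyWhenDone() {
        CompletableFuture<String> future = new CompletableFuture<>();
        fakeSave("{}", id -> future.complete("created " + id));
        return future;
    }

    public static void main(String[] args) {
        System.out.println(replyTooEarly());
        // join() is used here only so the demo waits for the result before exiting;
        // on an event loop you would register a callback and never block.
        replyWhenDone().thenAccept(System.out::println).join();
    }
}
```

The caller of `replyWhenDone()` never waits inside the method itself; it receives the result through a callback, which is the same idea as `save().setHandler(...)` in the answer.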

梵蒂冈之花

You can use RxJava 2 and the reactive Mongo client (io.vertx.reactivex.ext.mongo.MongoClient). Here is a code snippet:

Deployer

import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.RxHelper;
import io.vertx.reactivex.ext.mongo.MongoClient;

public class Deployer extends AbstractVerticle {

   private static final Logger logger = getLogger(Deployer.class);

   @Override
   public void start(Future<Void> startFuture) {
      DeploymentOptions options = new DeploymentOptions().setConfig(config());

      // Build the connection string from the verticle configuration
      JsonObject mongoConfig = new JsonObject()
            .put("connection_string",
                  String.format("mongodb://%s:%s@%s:%d/%s",
                        config().getString("mongodb.username"),
                        config().getString("mongodb.password"),
                        config().getString("mongodb.host"),
                        config().getInteger("mongodb.port"),
                        config().getString("mongodb.database.name")));

      MongoClient client = MongoClient.createShared(vertx, mongoConfig);

      RxHelper.deployVerticle(vertx, new BookingsStorage(client), options)
            .subscribe(e -> {
               logger.info("Successfully Deployed");
               startFuture.complete();
            }, error -> {
               logger.error("Failed to deploy", error);
               startFuture.fail(error);
            });
   }
}

BookingsStorage

import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.mongo.FindOptions;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.eventbus.Message;
import io.vertx.reactivex.ext.mongo.MongoClient;

public class BookingsStorage extends AbstractVerticle {

   private MongoClient mongoClient;

   public BookingsStorage(MongoClient mongoClient) {
      this.mongoClient = mongoClient;
   }

   @Override
   public void start() {
      var eventBus = vertx.eventBus();
      eventBus.consumer("GET_ALL_BOOKINGS_ADDRESS", this::getAllBookings);
   }

   private void getAllBookings(Message<JsonObject> msg) {
      mongoClient.rxFindWithOptions("GET_ALL_BOOKINGS_COLLECTION", new JsonObject(), sortByDate())
            .subscribe(bookings -> {
                     // do something with bookings
                     // a List cannot be sent over the event bus as-is, so wrap it in a JsonArray
                     msg.reply(new JsonArray(bookings));
                  },
                  error -> {
                     fail(msg, error);
                  }
            );
   }

   private void fail(Message<JsonObject> msg, Throwable error) {
      msg.fail(500, "An unexpected error occurred: " + error.getMessage());
   }

   private FindOptions sortByDate() {
      return new FindOptions().setSort(new JsonObject().put("date", 1));
   }
}

HttpRouterVerticle

// inside a router handler:
vertx.eventBus().rxSend("GET_ALL_BOOKINGS_ADDRESS", new JsonObject())
                .subscribe(bookings -> {
                     // do something with bookings
                  },
                  e -> {
                     // handle error
                  });
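The snippet above reads bookings; the question is about replying differently depending on whether a save succeeded or failed. The sketch below shows that mapping with JDK classes only, as a rough analogue of `subscribe(onSuccess, onError)`. It is not the Vert.x or RxJava API: `fakeSave` and `book` are hypothetical names, and the reply texts are paraphrased from the question.

```java
import java.util.concurrent.CompletableFuture;

class ConditionalReplySketch {

    // Hypothetical stand-in for an asynchronous save:
    // fails for an empty document, otherwise yields a made-up ID.
    static CompletableFuture<String> fakeSave(String document) {
        return CompletableFuture.supplyAsync(() -> {
            if (document.isEmpty()) {
                throw new IllegalArgumentException("empty document");
            }
            return "id-" + document.length();
        });
    }

    // Turn either outcome into a reply without blocking the calling thread:
    // handle() runs once the save has finished, with exactly one of (id, error) set.
    static CompletableFuture<String> book(String document) {
        return fakeSave(document).handle((id, error) -> error == null
            ? "A confirmed booking has been created with booking id " + id
            : "There was a problem while booking the shipment. Please start afresh.");
    }

    public static void main(String[] args) {
        // join() is only here so the demo prints before the JVM exits.
        System.out.println(book("abc").join());
        System.out.println(book("").join());
    }
}
```

In a Vert.x handler the final step would write the reply to the HTTP response or event bus message from inside the callback, rather than calling `join()`.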