Spring Boot中如何映射RSocket的所有交互模型

RSocket 中提供了 4 种交互模型。

  • 开除即忘

  • 请求和响应

  • 请求流

  • 请求通道

  • (元数据推送)

Spring(和 Spring Boot)提供 RSocket 集成,可以很容易地使用现有的消息传递基础设施构建 RSocket 服务器以隐藏原始 RSocket API。

   @MessageMapping("hello")

    public Mono<Void> hello(Greeting p) {

        log.info("received: {} at {}", p, Instant.now());

        return Mono.empty();

    }


    @MessageMapping("greet.{name}")

    public Mono<String> greet(@DestinationVariable String name, @Payload Greeting p) {

        log.info("received: {}, {} at {}", name, p, Instant.now());

        return Mono.just("Hello " + name + ", " + p.getMessage() + " at " + Instant.now());

    }


    @MessageMapping("greet-stream")

    public Flux<String> greetStream(@Payload Greeting p) {

        log.info("received:  {} at {}", p, Instant.now());

        return Flux.interval(Duration.ofSeconds(1))

                .map(i -> "Hello #" + i + "," + p.getMessage() + " at " + Instant.now());

    }

在客户端,提供了RescoketRequester与服务器握手的功能。


    @GetMapping("hello")

    Mono<Void> hello() {

        return this.requester.route("hello").data(new Greeting("Welcome to Rsocket")).send();

    }


    @GetMapping("name/{name}")

    Mono<String> greet(@PathVariable String name) {

        return this.requester.route("greet." + name).data(new Greeting("Welcome to Rsocket")).retrieveMono(String.class);

    }


    @GetMapping(value = "stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)

    Flux<String> greetStream() {

        return this.requester.route("greet-stream").data(new Greeting("Welcome to Rsocket"))

                .retrieveFlux(String.class)

                .doOnNext(msg -> log.info("received messages::" + msg));

    }

但是如何使用请求通道和 Spring 方式的元数据推送模型(使用消息传递基础结构)?


示例代码在Github上。更新:添加了requestChannel示例。


更新:SETUP和METADATA_PUSH可以由@ConnectMapping. Spring Security RSocket 可以保护SETUP和REQUEST。


泛舟湖上清波郎朗
浏览 145回答 1
1回答

函数式编程

此提交已在发行说明中提到:<…>RSocket支持包括通过注释方法的响应处理@MessageMapping和通过.RSocketRequester<…>— Spring Framework 5.2.0.M1 现已可用。渠道互动模型参考示例对应的代码部分:@MessageMapping("echo-channel") Flux<String> echoChannel(Flux<String> payloads) {     return payloads.delayElements(Duration.ofMillis(10)).map(payload -> payload + " async"); }元数据推送目前看来,注释不支持它@MessageMapping。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java