猿问

在 flatmap 操作之后访问 map 操作中的 Mono 对象

我正在尝试使用自定义过滤器使用 Spring Cloud Gateway 创建网关代理路由器。当以阻塞和命令的方式覆盖属性时,一切都按预期工作。


exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, URI.create("https://" + newTargetURLHost + )

字符串变量 newTargetURLHost 是通过使用获得的:


newTarget = serviceReturnsMono.getServerMapping(id).block().getHost();

我是 Webflux 的新手,但上面这行代码对我来说已经是一种代码味道了。进一步阅读后,这不是使用反应式的最佳方法。我试图以更具功能性/反应性的方式重写,但未能让反应流发出所需的值。


Mono.just(serviceReturnsMono.getServerMapping(id))

                    .flatMap(flat -> flat)

                    .subscribeOn(Schedulers.immediate())

                    .map(server -> exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, URI.create("https://" + server.getHost() ))

                    .subscribe();

当上面的代码执行时,exchange 属性不会发生变化。


我也尝试了以下但没有成功:


            serverMappingMono

                   .map(serverMapping -> exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, URI.create("https://" + serverMapping.getHost() ))

                    .subscribe();

作为测试,当我修改下面的代码进行故障排除时,下面确实发出了一个硬编码字符串并且交换属性发生了变化。


                    .map(server -> exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, URI.create("https://" + "testHostName" ))

                    .subscribe();

任何想法或指示将不胜感激。


更新:过滤代码如下:



    private ReturnsMonoServerMappingService returnsMonoServerMappingService;


    @Override

    public int getOrder() {

        return 10001;

    }


    @Override

    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {


        final String id = exchange.getRequest().getHeaders().getFirst("reference");


        return chain.filter(exchange).then(Mono.fromRunnable(() -> {


            Mono<ServerMapping> serverMapping = returnsMonoServerMappingService.getServerMapping(id);

            serverMapping

                    .map(server -> exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, URI.create("https://" + server.getHost()  )))

                    .subscribe();


        }));

    }

}



我忽略的缺失部分是“doOnSuccess”,因为 returnsMonoServerMappingService 已经返回一个单声道。然后通过“then”链接交换以委托给链中的下一个过滤器。


慕丝7291255
浏览 259回答 1
1回答

MMTTMM

我在手机上写这个,所以无法测试它,我是凭记忆写的,但我认为它应该是这样的。或者至少你明白了要点。我们首先提取id。然后我们查找服务器映射,如果一切顺利,我们将其作为一个属性,然后我们继续过滤器链。您几乎不应该在应用程序中订阅,调用客户端通常是订阅者。永远不要阻塞反应性应用程序。@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {&nbsp; &nbsp; final String id = exchange.getRequest()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;.getHeaders()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;.getFirst("reference");&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; return returnsMonoServerMappingService.getServerMapping(id)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;.doOnSuccess(serverMapping -> {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;exchange.getAttributes()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;.put(GATEWAY_REQUEST_URL_ATTR, URI.create("https://" + server.getHost()&nbsp; &nbsp; &nbsp; &nbsp; }).then(chain.filter(exchange));}
随时随地看视频慕课网APP

相关分类

Java
我要回答