系列文章
Spring 响应式编程 随记 – C1 为什么选择响应式 Spring
Spring 响应式编程 随记 – C2 Spring 响应式编程基本概念 (一)
Spring 响应式编程 随记 – C2 Spring 响应式编程基本概念 (二)
Spring 响应式编程 随记 – C2 Spring 响应式编程基本概念 (三)
Spring 响应式编程 随记 – C2 Spring 响应式编程基本概念 (四)
Spring 响应式编程 随记 – C2 Spring 响应式编程基本概念 (五)
Spring 响应式编程 随记 – C3 响应式流 新的标准流 (一)
Spring 响应式编程 随记 – C3 响应式流 新的标准流 (二)
2.2.6 用 RxJava 重建温度传感器示例程序
重写之前的"显示房间温度"的温度传感器应用代码,使用 RxJava 需要手动在 gradle 或 pom 文件中引入依赖项。
此处仍然使用同样的类来表示温度.
public class Temperature{
private final double value;
}
为了模拟传感器,和之前一样我们需要实现一个 TemperatureSensor类,加上 @Component 注解。
他需要返回的是一个响应数据流。
1 模拟传感器:
@Component
public class TemperatureSensor{
private final Random random = new Random();
@Getter
private final Observable<Temperature> dataStream = Observable.range(0, Integer.MAX_VALUR)
.concatMap(tick -> Observable.just(tick)
.delay(random.nextInt(5000), MILLISENCONDS)
.map(tickValue -> this.probe())
)
.publish()
.refCount();
private Temperature probe(){
return new Temperature(16 + random.nextGaussian() * 10);
}
}
random 是一个模拟的硬件传感器测量值,dataStream 是组件定义的唯一 Observable 流。
Observable.range(0, Integer.MAX_VALUR)
会产生一个从 0 到 Integer.MAX_VALUR 的数字流序列,concatMap 用来连接 N 个生成的流,concatMap 接受的是一个 lambda 函数,这个函数把每一项接受的参数 tick 转化成一个流。
这个函数创建了一个只有一个结果的新流,并且使用 .delay(random.nextInt(5000), MILLISENCONDS)
做了一个随机延迟模拟。
.map(tickValue -> this.probe())
是一个转换操作,生成一个随机数,模拟传感器的温度值。
于是我们有了一个 Observable 流 dataStream,它会返回模拟的传感器数值,并且两个元素发送的随机时间间隔最多是5秒。
.publish()
可以将源流中的事件广播到所有的目标流,返回的是 ConnectableObservable
。
.refCount()
则实现的是保证仅在存在至少一个传出订阅时,才创建对传入共享流的订阅。
通过 .publish()
和 .refCount()
可以防止每一个订阅者都去出发新的传感器读数序列,以及防止没有订阅的时候内部去订阅了传感器流。
2 自定义 SseEmitter:
TemperatureSensor 对外暴露了一个流,我们可以把每一个新的 SseEmitter 订阅到流上。SseEmitter 的唯一用途就是用于发送 SSE 事件,我们需要进行一些拓展使其支持自定义的 onNext 等响应式流信号实现。
class RxSseEmitter extends SseEmitter {
static final long SSE_SESSION_TIMEOUT = 30 * 60 * 1000L;
@Getter
private final Subscriber<Temperature> subscriber;
public RxSseEmitter(){
super(SSE_SESSION_TIMEOUT);
this.subscriber = new Subscriber<Temperature>(){
@Override
public void onNext(Temperature temperature){
try{
RxSseEmitter.this.send(temperature);
}catch{
unsubscribe();
}
}
@Override public void onError(Throwable e){}
@Override public void onCompleted(){}
};
onCompletion(subscriber::unsubscribe);
onTimeout(subscriber::unsubscribe);
}
}
因为在这个场景中,传感器产生的温度数据说一个几乎无限的正确数字序列,默认不会出错的,所以不需要对 onError 和 onCompleted 做任何处理。
对于流的完成 onCompletion 和 超时 onTimeout 会默认直接取消订阅。
Subscriber<Temperature>
通过 getter 函数获取。
3 暴露SSE端点:
用一个自动装配的 RestController 即可暴露公开这个API
@RestController
public class TemperatureController {
private final TemperatureSensor temperatureSensor;
public TemperatureController(TemperatureSensor temperatureSensor){
this.temperatureSensor = temperatureSensor;
}
@GetMapping("/temperature-stream")
public SseEmitter events(HttpServletRequest request){
RxSseEmitter emitter = new RxSseEmitter();
temperatureSensor.getDataStream()
.subscribe(emitter.getSubscriber());
return emitter;
}
}
当创建了新的 SSE 会话,controller 会实例化一个新的 RxSseEmitter 并且把订阅者订阅到数据流上,再将 RxSseEmitter 实例返回到 Servlet 容器来处理。
4 应用程序配置:
我们不需要加任何的额外配置,因为我们使用了RxJava的框架,所以不再需要去配置 Spring 的 Executor 以及 @EnableAsync
上述代码就已经用 RxJava 重建温度传感器示例程序了,相比之前用Spring框架去自己实现Event的处理,RxJava 更加方便也更加好用。