我正在编写客户端-服务器应用程序。我从数据库中获取数据并将其放入 rxjava2 的 ReplaySubject(ReplaySubject 是必需的,因为我需要保证每个客户端上的数据相同)当客户端连接订阅它时,我想将此数据发送给他但是当我尝试它时我的头“可能的方式^_^”它阻止了。通过块我的意思是它不发送数据但是当我关闭服务器数据时立即显示在客户端。
我尝试在客户端和服务器端事件循环中添加一些线程(我在想可能是线程块,因为我使用“无限”源所以接收这个我需要另一个线程或类似的东西)。
服务器端通道代码:
public
class ClientHandler
extends SimpleChannelInboundHandler<DataWrapper> {
private final Observable<DataWrapper> data;
public ClientHandler(Observable<DataWrapper> data) {
this.data = data;
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
// super.channelRegistered(ctx);
final Channel channel = ctx.channel();
Server
.INSTANCE
.appendToChannelGroup(channel);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// super.channelActive(ctx);
// i believe there is something wrong
data.subscribe(ctx::writeAndFlush);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
// rest skip
}
客户端:
public
class DirectNetworkCommunicator
extends SimpleChannelInboundHandler<DataWrapper> {
private Observable<DataWrapper> generatedData;
private ExecutorService fallbackThread;
DirectNetworkCommunicator(Observable<DataWrapper> generatedData) {
this.fallbackThread = Executors.newSingleThreadExecutor();
this.generatedData = generatedData;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// super.channelRead(ctx, msg);
DataWrapper inComingData = (DataWrapper) msg;
Adapter
.INSTANCE
.appendFromNettworkData(inComingData);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// super.channelReadComplete(ctx);
ctx.flush();
}
// rest skip
}
所以我之前提到过我希望它在服务器关闭时接收数据,而不是在服务器关闭时接收数据 ^_^。如果那会帮助 netty 版本 4.1.37 final。
汪汪一只猫
相关分类