猿问

当我从 ReplaySubject 使用 Observable 时阻止

我正在编写客户端-服务器应用程序。我从数据库中获取数据并将其放入 rxjava2 的 ReplaySubject(ReplaySubject 是必需的,因为我需要保证每个客户端上的数据相同)当客户端连接订阅它时,我想将此数据发送给他但是当我尝试它时我的头“可能的方式^_^”它阻止了。通过块我的意思是它不发送数据但是当我关闭服务器数据时立即显示在客户端。


我尝试在客户端和服务器端事件循环中添加一些线程(我在想可能是线程块,因为我使用“无限”源所以接收这个我需要另一个线程或类似的东西)。


服务器端通道代码:


public

    class ClientHandler

        extends SimpleChannelInboundHandler<DataWrapper> {



    private final Observable<DataWrapper> data;


    public ClientHandler(Observable<DataWrapper> data) {

        this.data = data;

    }



    @Override

    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {

        // super.channelRegistered(ctx);

        final Channel channel = ctx.channel();

        Server

            .INSTANCE

            .appendToChannelGroup(channel);


    }


    @Override

    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        // super.channelActive(ctx);

        // i believe there is something wrong

        data.subscribe(ctx::writeAndFlush);

    }


    @Override

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

        ctx.flush();

    }

    // rest skip

}

客户端:


public

    class DirectNetworkCommunicator

        extends SimpleChannelInboundHandler<DataWrapper> {



    private Observable<DataWrapper> generatedData;

    private ExecutorService fallbackThread;



    DirectNetworkCommunicator(Observable<DataWrapper> generatedData) {

        this.fallbackThread = Executors.newSingleThreadExecutor();

        this.generatedData = generatedData;

    }


    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        // super.channelRead(ctx, msg);

        DataWrapper inComingData = (DataWrapper) msg;

        Adapter

            .INSTANCE

            .appendFromNettworkData(inComingData);

    }


    @Override

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

        // super.channelReadComplete(ctx);

        ctx.flush();

    }

    // rest skip

}

所以我之前提到过我希望它在服务器关闭时接收数据,而不是在服务器关闭时接收数据 ^_^。如果那会帮助 netty 版本 4.1.37 final。


慕仙森
浏览 116回答 1
1回答

汪汪一只猫

好的,所以未来的人们会面临同样的问题,我自己找到了答案。来自客户端的 Netty 使用后台线程作为通信的主要线程,这意味着我要等待主线程释放,然后它才能对 observable 进行操作。希望它能帮助别人。
随时随地看视频慕课网APP

相关分类

Java
我要回答