##引言 接触过 EventBus 和 RxJava 的都知道,可以用 RxJava 来实现 EventBus,网上随便一搜,就可以拿得到代码。但是究竟为什么可以这么做?却没有类似的文章作进一步的深度解析。(本文假定读者都已经了解 EventBus 和 RxJava 是什么,可以做什么。)
public class RxBus { private static volatile RxBus instance; private final SerializedSubject<Object, Object> subject; private RxBus() {
subject = new SerializedSubject<>(PublishSubject.create());
} public static RxBus getInstance() { if (instance == null) { synchronized (RxBus.class) { if (instance == null) {
instance = new RxBus();
}
}
} return instance;
} public void post(Object object) {
subject.onNext(object);
} private <T> Observable<T> toObservable(final Class<T> type) { return subject.ofType(type);
} public boolean hasObservers() { return subject.hasObservers();
}
}可以看到,代码非常简练,当 RxJava 遇上 EventBus ,居然会如此神奇~那么问题来了,为什么这段代码就可以实现 EventBus 的功能?
要搞明白这几个问题,我们得弄清楚这些东西。
EventBus是如何实现的?RxJava满足了实现EventBus的哪些条件?如何用
RxJava去封装一个EventBus?
EventBus工作流程
简单讲,事件总线,顾名思义,分为两个概念,一个事件,即 Event ,一个总线,即 Bus ,这是在整个 APP 里一种规范地传递事件的方式。作为独立于项目里各个模块的 Application 级别的存在,可以很好地用来程序的解耦。使用大致有四个步骤:注册→发送→接收→取消注册。具体的源码分析,可以参看 codeKK 上 Trinea 的 EventBus源码解析 和 kymjs张涛 的 EventBus源码研读。
更重要的一点,RxBus 的重点应该在 Bus 上,而不是 RxJava 上。用 RxJava 去实现 EventBus 的思想。因此,应该把分析 EventBus 作为一个重点。
我们来看看要实现一个 EventBus 需要满足什么条件。
获取一个
EventBus实例,可以用单例,也可以用Builder;注册
EventBus和取消注册EventBus;发送和接收事件的方法。
##RxJava && EventBus 要实现 EventBus 需要满足的条件,在 RxJava 里是如何体现的呢?
首先我们需要明确的是,EventBus 里都有哪些角色:Event、Subscriber、Publisher,也就是说需要Event、Observer、Observable,Event 自不必说,在 RxJava 里既能充当Observer,又能充当Observable的对象就是 Subject,而 Subject 是线程非安全的,我们要构造一个线程安全的 Subject ,需要用到它的子类 SerializedSubject,而实际使用的时候,我们的观察者只需要订阅发生之后的,来自 Observable 的数据,因此还需要给 SerializedSubject 传入 PublishSubject 作为参数。
获取
Bus实例,一个单例即可,当然,EventBus还提供了使用Builder创建实例的方法,可根据具体情况自行实现;private static volatile RxBus instance;private RxBus() { subject = new SerializedSubject<>(PublishSubject.create()); }public static RxBus getInstance() { if (instance == null) { synchronized (RxBus.class) { if (instance == null) { instance = new RxBus(); } } } return instance; }注册和取消注册
Bus;EventBus的注册过程,就是对接收某个事件的所有方法进行subscribe(),在subscribe()方法里拿到这些的方法,把这些方法存进subscriberMethods(一个 List 集合)中,然后把事件的类名作为 key ,把subscriberMethods作为 value ,存进methodCache(一个 HashMap 缓存)里,这样就不用每次都去反射了。这里需要注意一点,EventBus里用methodCachecache 下来的不是Observer,也不是Observable,而是Observable.subscribe(Observer),即Subscription,那么如果用RxJava,该怎么去实现这么个功能呢?在 RxJava 里有这样一个类CompositeSubscription,对应的是一个存储着Subscription的HashSet,因此我们只需要将接收事件的方法 add 进一个CompositeSubscription,在生命周期结束的时候,再把CompositeSubscription取消订阅即可。明确了上面的流程,对 RxJava 的封装就好办了。我们只需要获取
Subscription即可。注意,这里跟EventBus是有区别的,EventBus的封装,是通过反射,获取所有接收事件的方法,然后注册,当然,现在的EventBus版本里这些反射几乎对性能没有任何影响了。现在我们用RxJava是不是也要用反射再去获取所有的Subscription呢?当然不是,EventBus 的机制其实类似于广播,在接收事件的地方是没有方法调用的,因此需要反射。但是RxBus则提供了调用接收事件的方法,因此只需要在Activity或Fragment里 new 出来CompositeSubscription对象,然后在需要接收事件的地方,用CompositeSubscription对象去 add 进对应的Subscription就可以了(这一点在下面发送接收事件一节还会提到)。对应的取消注册的过程就简单多了,在生命周期结束的地方,对
CompositeSubscription取消注册即可,以避免内存泄露,而CompositeSubscription的取消注册方法是可以自动取消HashSet里的所有Subscription的,因此无须对每个Subscription单独处理。发送和接收事件。
EventBus发送事件,就是 post 方法,在EventBus里有一个内部类PostingThreadState, 通过postingState.eventQueue可以获取一个 List 集合,只要eventQueue不为空,就不断地从eventQueue里取出事件(当然,伴随有是否为主线程,是否正在发送等状态的判断),然后调用postSingleEvent方法,最后调postToSubscription把事件发出去,post 一个,就从eventQueue里 remove 一个,最终又来到了我们从刚接触 Android 就让人很头痛的Handler,这是一个叫HandlerPoster的类。说一千,道一万,对应的RxJava处理事件就是调方法onNext。这样代码就有了。public void post(Object object) { subject.onNext(object); }EventBus接收事件需要通过onEvent开头的方法来遍历获取,第一次遍历会缓存,仅查找onEvent开头的方法,同时忽略一些特定 SDK 的方法,可以提高一些效率。在使用RxJava接收事件的时候,根据传递的事件类型(eventType)可以获取对应类型的Observable<EventType>,那么问题就来了,在这里我们是不是要提供一个返回对应的Subscription的方法呢?其实是可以的!但是需要指定Scheduler,因为我们知道,接收事件处理事件是有可能在不同的线程里的,如果在这里我们就提供一个返回Subscription的方法,那后续的事件处理是在哪个线程呢?因此在这里就指定了 UI 线程或者异步线程,后面的具体的事件处理就可能会有问题。当然,我们也只可以在需要接收事件的地方,调用toObservable方法,然后指定线程。这也是相对于Otto的一个优势。在
RxBus里:public <T> Observable<T> toObservable(final Class<T> type) { return subject.ofType(type); }public <T> Subscription toSubscription(Class<T> type, Action1<T> action1, Scheduler scheduler) { return RxBus.getInstance() .toObservable(type) .observeOn(scheduler) .subscribe(action1); }在
Activity或Fragment里再去获取Subscription。private <T> Subscription toSubscription(Class<T> type, Action1<T> action1) { return RxBus.getInstance() .toObservable(type) .observeOn(AndroidSchedulers.mainThread()) .subscribe(action1); }最后,将所有的
Subscriptionadd 进CompositeSubscription就好了。最后,一定不要忘记对CompositeSubscription取消注册。到这里,有关
EventBus的内容,基本是完了,不过还有一点,EventBus里是有一个StickyEvent的,什么意思呢,就是说一般流程是,我们先去订阅事件,然后被观察者再去发布事件,观察者去接收事件,但是如果是先发布了事件,再去订阅事件呢?这时候先于订阅事件之前发布的事件就会被丢弃,这时候StickyEvent就登场了。即便是先于订阅事件之前发布了事件,它已然可以接收一个最近被发布的事件,可以理解为它缓存了一个最近发布的事件,而与订阅状态无关。当然,只是一个!明确了这些,要用RxJava实现就非常简单了,在RxJava里有一个BehaviorSubject完美实现了这个功能,具体实现跟PublishSubject一模一样。这时候就会有人问了,如果我们想要接收到所有的(而不是一个)在订阅事件之前发布的事件,该怎么办呢?很遗憾,EventBus是无法办到的,但是RxJava可以!将BehaviorSubject换成RelaySubject即可。不过说句题外话,这样的功能好鸡肋啊,感觉适用场景少之又少。我想这也是EventBus没有去实现这样的功能的原因吧。
具体使用可以参照AndroidFluxPractice,Sample 里将 EventBus 替换为了 RxBus ,完美地实现了一模一样的效果。