一、前言
从 EventBus 的介绍中,EventBus 给的定位是:
Event bus for Android and Java that simplifies communication between Activities, Fragments, Threads, Services, etc. Less code, better quality
简单理解一下就是 Event bus 给 Android 及 Java (当然主要是 Android)的 Activity,Fragment,Threads,Services 之间提供一个简单的通信方式,从而能让我们用质量高且少的代码来完成我们的功能。
二、EventBus 详解之官文解读
2.1 架构概述
EventBus 是一个基于发布/订阅的设计模式的事件总线,其架构图如下。
从架构图上来看,还是很简单的,跟我们所最熟悉的观察者模式是类似的。大概工作过程就是:
(1) Subscriber 也就是订阅者,订阅一个 Event,其中 Event 是自己定义的,符合 POJO 的规范即可。
(2) 在需要时,Publisher 也就是发布者,就可将事件通过 EventBus 分发布相应的订阅者 Subscriber。
恩,就是如此简单。当然,其实现远不会这么简单了,还是干了很多脏活和累活的。
2.2 特性概述
- 简化了组件之间的通信
- 事件发送者和接收者分离
- 在Activity,Fragment和后台线程中都可以很好的进行事件的分发
- 避免复杂且容易出错的依赖关系和生命周期问题
- 简化代码
- 快,多快呢?
- 小(约50k)
- 已经通过100,000,000+安装的应用程序在实践中得到证实
- 具有指定分发线程,优先级等高级功能。
2.3 功能概述
- 事件订阅 / 分发
这是其最核心的功能,也是要在源码分析中需要深入的。 - 指定分发线程 Delivery Threads (ThreadMode)
即指定订阅者将在哪个线程中响应事件的处理。恩,这主要用于如网络或者耗时的线程处理,需要切换到主线程来更新 UI 的场景。当然,如果用 RxJava 的话,这个就再熟悉不过了。 - 配置 Configuration
通过 EventBusBuilder 类对 EventBus 进行各方面的配置,如配置可允许事件的分布没有订阅者处理,而不报异常。 - 粘性事件 Sticky Events
将用户发送过的事件暂时保存在内存中,当订阅者一旦订阅了该事件,就可以立即收到该事件。恩,这个就与粘性广播的概念是一致的了。 - 优先级以及取消事件
主要是指定 Subscriber 的优先级,其默认优先级为 0,指定了较高的优先级就可以优先收到事件进行处理。而收到事件的 Subcriber 中提前取消事件的继续分发,从而中断事件的继续分发。恩,这个也跟广播里的 abortBroadcast 概念一样。 - Subscriber Index
用中文不知道咋说。这个是 3.0 的版本加的,主要是将事件处理中的对事件的反射处理,提前到编译时来处理了。从而提高性能。当然,主要也是推荐在 Android 下使用。 - 异步执行器 AsyncExecutor
主要就是一个线程池,可以理解就是一个帮助类。
三、源码解析
3.1 开始向导
EventBus 的官方文档为我们提供了经典的使用 EventBus 的 3 个过程。
3.1.1定义事件
普通的 POJO 即可
public class MessageEvent {
public final String message;
public MessageEvent(String message) {
this.message = message;
}
}
3.1.2 准备订阅者
文章是基于 EventBus 3 进行分析,对于订阅者的方法命名可以自由命名了,而不需要采用约定命名。
@Subscribe(threadMode = ThreadMode.MAIN)
public void onMessageEvent(MessageEvent event) {
Toast.makeText(getActivity(), event.message, Toast.LENGTH_SHORT).show();
}
// This method will be called when a SomeOtherEvent is posted
@Subscribe
public void handleSomethingElse(SomeOtherEvent event) {
doSomethingWith(event);
}
准备好了订阅者之后,我们还需要向总线注册这个订阅者,这个我们的订阅者方法才能接收到相应的事件。
@Override
public void onStart() {
super.onStart();
EventBus.getDefault().register(this);
}
@Override
public void onStop() {
EventBus.getDefault().unregister(this);
super.onStop();
}
3.1.3 发送事件
我们可以在任何地方发送事件。
EventBus.getDefault().post(new MessageEvent("Hello everyone!"));
3.2 源码解析
根据前文的 3 步经典使用方法,源码的分析大概也分成如下 4 步来进行。
3.2.1 注册订阅者
如上面的时序图,事件的订阅大概可以分成 6 个步骤,下面来一一分析。
方法getDefault()
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
getDefault() 方法就是一个“懒汉”的单例模式,目的就是让我们能在全局对 EventBus 的引用进行使用。值得关注的是,这个懒汉的单例模式实现并不会有多线程的安全问题。因为对于 defaultInstance 的定义是 volatile 的。
static volatile EventBus defaultInstance;
接下来继续看看构造方法。
构造方法EventBus()
public EventBus() {
this(DEFAULT_BUILDER);
}
EventBus(EventBusBuilder builder) {
logger = builder.getLogger();
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
stickyEvents = new ConcurrentHashMap<>();
mainThreadSupport = builder.getMainThreadSupport();
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
backgroundPoster = new BackgroundPoster(this);
asyncPoster = new AsyncPoster(this);
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
logSubscriberExceptions = builder.logSubscriberExceptions;
logNoSubscriberMessages = builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
throwSubscriberException = builder.throwSubscriberException;
eventInheritance = builder.eventInheritance;
executorService = builder.executorService;
}
构造方法使用了常见的 Builder 模式对 EventBus 中的各个属性进行了初始化。这里使用的是默认配置。一般情况下,我们也都是使用 getDefault() 及其默认配置来获取实例的。我们也可以通过配置 EventBusBuilder 来 build 出一个自己的实例,但是要注意的是,后面的注册、注销以及发送事件都要基于此实例来进行,否则就会发生事件错乱发错的问题。
方法register()
如果不看方法的实现,根据经验判断,我想当 register 的发生,应该是将订阅者中用于接收事件的方法与该事件关联起来。那是不是这样呢?
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
// 1.通过订阅者类找到其订阅方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
// 2. 然后进行逐个订阅
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
首行来看看找订阅方法。这里封装了一个专门的类 SubscriberMethodFinder,并且通过其方法 findSubscriberMethods() 来进行查找,这里就不贴 findSubscriberMethods() 的代码,而是看看其内部实际用于寻找订阅方法的 findUsingReflection()。
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
......
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
......
}
for (Method method : methods) {
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
......
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
......
}
}
}
总结一下,该方法主要就是寻找订阅者类中的公有方法,且其参数唯一的以及含有注解声明@Subscribe 的方法。就是我们在写代码时所定义的订阅者方法了。找到订阅者方法后,会将其 method(方法的反射类 Method) 、event、thread mode 以及优先级等封装成一个 SubscribeMethod 然后添加到 FindState 中。
这个 FindState 是 SubscriberMethodFinder 的一个内部类。其用了大小只有 4 的 FIND_STATE_POOL 来进行管理,这样避免了 FindState 对象的重复创建。
最后在 find 中会将所找到订阅者方法添加到 “METHOD_CACHE” 中,这是一个 ConcurrentHashMap 的结构。
Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
方法subscribe()
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
......
}
}
// 按优先级进行插入
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
// 粘性事件,注册就会发送
if (subscriberMethod.sticky) {
.....
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
subscribe() 方法完成事件与 subscribe 、 subscribeMethod 的关联,具体怎么关联的呢,请看图。
这里分别用了 2 个 HashMap 来描述。subscriptionsByEventType 记录了一个事件应该调用哪个订阅者的订阅方法来处理。而 typesBySubscriber 记录了订阅者订阅了哪些事件。
至此,订阅者的注册完成之后,也就相当于是 EventBus 的初始化完成了,那么接下来就可以愉快发送事件了。
3.2.2 事件发送
事件的发送看起来好像也比较简单。
public void post(Object event) {
// 获取发送者线程的状态管理器
PostingThreadState postingState = currentPostingThreadState.get();
// 从状态管理器中取出事件队列,并将事件添加到队列中
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
// 如果当前不是发送状态,那就进入发送状态
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
......
}
try {
// 在发送状态下,通过遍历事件队列逐个发送事件
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
post() 方法的主要过程都写在代码注释里了。这里主要关注下 currentPostingThreadState,它的定义如下:
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
而 PostingThreadState 的定义如下。
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
}
这里用了 ThreadLocal 来保存 PostingThreadState。ThreadLocal 的主要作用是使得所定义的资源是线程私有的。那么,也就是说,对于每一个发送事件的线程其都有一个唯一的 PostingThreadState 来记录事件发送的队列以及状态。
下图是 ThreadLocal 在 Thread 中的数据结构描述,而这里的 PostingThreadState 就是下图中的 Value。
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
......
postSingleEvent 所做的事情主要就是根据要发送的事情收集事件,然后逐个发送。收集的什么事情呢,很简单,就是收集它的父类事件。也就是说,当我们发送一个事件时,它的所有父类事件也同时会被发送,这里的父类还包括了 Object 在内。所以,这里需要开发者们非常注意了:
事件是具有向上传递性质的
接下来的 postSingleEventForEventType 就是从 subscriptionsByEventType 找出事件所订阅的订阅者以及订阅者方法,即 CopyOnWriteArrayList。
然后再通过方法 postToSubscription() 将事件逐个逐个向订阅者进行发送。在 postToSubscription() 方法中会根据不同的 ThreadMode 来决定不同的线程调度策略,具体在下面讲。而不管采用何种策略,最终都将会通过invokeSubscriber() 进行方法的反射调用来进行订阅方法的调用。
void invokeSubscriber(Subscription subscription, Object event) {
......
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
......
}
至此,事件的发送就完成了。很简单,其真实的面目就是一个方法的反射调用。而对于事件的发送,这里还应该关注一下线程的调度策略。
3.2.3 线程调度策略。
上面说到线程的调度策略在 postToSubscription() 方法中,那么就来看一看这个方法。
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
针对每个不同的策略,下面来简单看一看。
POSTING
发送者线程直接发送,也是 ThreadMode 的默认值。
MAIN
如果发送者是主线程,则直接发送。如果不是则交给 mainThreadPoster 来发送。mainThreadPoster 是 HandlerPoster 类型,该类继承了 Handler 并实现了 Poster 接口,其实例是由 AndroidHandlerMainThreadSupport 所创建的。AndroidHandlerMainThreadSupport 包含了主线程的 Looper ,而 HandlerPoster 包含了一个链表结构的队列 PendingPostQueue,事件也将插入到 PendingPostQueue 中。每一个事件都会以一个空 Message 发送到主线程的消息队列,消息处理完再取出下一个事件,同样以 Message 的形式来执行,如此反复执行。直到队列为空。
MAIN_ORDERED
一般来说也是通过 mainThreadPoster 来发送,异常情况下才通过发送者线程来发送。从代码侧来看,它主要就是属于 MAIN 策略的第二种情况。
BACKGROUND
如果当前为主线程,则将其投递到 backgroundPoster,否则直接发送。这里的 backgroundPoster 就是一个子线程。
ASYNC
异步模式下,最终就是通过 ExecutorService ,也即线程池来发送的。asyncPoster 即 AsyncPoster,其关于线程调度的关键代码为:
eventBus.getExecutorService().execute(this);
3.2.4 注销订阅者
注销就比较简单了,如下代码,主要就是从 typesBySubscriber 这个 map 中将订阅者移除掉,并且解决订阅者和事件的关联。
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
四、总结
从整个分析过程来看,EventBus 是一个比较简单的框架,其涉及到的核心知识是 ThreadLocal 以及方法的反射调用,而基于此为我们封装了一套完整的事件 分发-传递-响应 机制,当然也是一个非常优秀的框架。总结一下,其主要的特点:
- 事件的定义是由用户自己来定义的,只要是 POJO 就可以。且需要注意的是,事件具有向上传递的特性,即发送一个事件会连同它的父类事件一起发送。
- 关于 ThreadMode,默认则发送者线程发送,并且是同步的。对于主线程则通过主线程的 MainLooper 来实现,异步事件则由线程池来实现。
- EventBus 的实例可以通过 getDefault() 的单例来获取,也可以通过 EventBusBuilder 的 builder() 方法来获取,但要注意它们不是同一个 EventBus 的实例,也不能互相发送事件。