手记

RxAndroid响应式开发(三)

RxAndroid响应式开发(一)

RxAndroid响应式开发(二)

RxAndroid响应式开发(三)


  这篇从源码和原理上来分析一下RxJava的使用和实现,主要包括Observable(被观察者),Observer/Subscriber(观察者),还有订阅关系subscribe


Observer/Subscriber(观察者)的联系和区别

Observer 是一个接口,它的作用就是对事件作出瞬间响应(担当的是警察的角色)。


public interface Observer<T> {    /**     * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.     * <p>     * The {@link Observable} will not call this method if it calls {@link #onError}.     */    void onCompleted();    /**     * @param e     *          the exception encountered by the Observable     */    void onError(Throwable e);    /**     * @param t     *          the item emitted by the Observable     */    void onNext(T t);}



Observer 有三个方法:

onNext(T t):一个参数方法,所有的事件都在此方法中执行,Rxjava中需要对每个事件单独执行,并且以队列的形式依次执行。

onCompleted():无参方法,表示事件正常结束,当没有onNext()方法发出的时候,需要触发onCompleted()方法标志事件正常完成。

onError(Throwable e):一个参数方法,事件执行遇到异常,同时剩余的onNext不再执行。

注意:onCompleted 和 onError 两个方法只能有一个执行,并且只能在事件序列的最后一个,要么是事件顺利完成触发onCompleted要么是出现异常触发onError 。


Subscriber 是实现了Observer接口的抽象类,做了一些对事件处理的扩展,但是和Observer的基本使用方法还是一样的。


public abstract class Subscriber<T> implements Observer<T>, Subscription {    // represents requested not set yet    private static final long NOT_SET = Long.MIN_VALUE;    private final SubscriptionList subscriptions;    private final Subscriber<?> subscriber;    /* protected by `this` */    private Producer producer;    /* protected by `this` */    private long requested = NOT_SET; // default to not set    protected Subscriber() {        this(null, false);    }    /**     * @param subscriber     *            the other Subscriber     */    protected Subscriber(Subscriber<?> subscriber) {        this(subscriber, true);    }    /**     * @param subscriber     *            the other Subscriber     * @param shareSubscriptions     *            {@code true} to share the subscription list in {@code subscriber} with     *            this instance     * @since 1.0.6     */    protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {        this.subscriber = subscriber;        this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();    }    /**     * Adds a {@link Subscription} to this Subscriber's list of subscriptions if this list is not marked as     * unsubscribed. If the list <em>is</em> marked as unsubscribed, {@code add} will indicate this by     * explicitly unsubscribing the new {@code Subscription} as well.     *     * @param s     *            the {@code Subscription} to add     */    public final void add(Subscription s) {        subscriptions.add(s);    }    @Override
    public final void unsubscribe() {
        subscriptions.unsubscribe();
    }

    /**     * Indicates whether this Subscriber has unsubscribed from its list of subscriptions.     *     * @return {@code true} if this Subscriber has unsubscribed from its subscriptions, {@code false} otherwise     */    @Override
    public final boolean isUnsubscribed() {
        return subscriptions.isUnsubscribed();
    }

    /**     * This method is invoked when the Subscriber and Observable have been connected but the Observable has     * not yet begun to emit items or send notifications to the Subscriber. Override this method to add any     * useful initialization to your subscription, for instance to initiate backpressure.     */    public void onStart() {
        // do nothing by default
    } }



  在Rxjava中,如果使用Observer作为观察者,最后也会转化成Subscriber进行使用,本质上他们一样,但是在使用上他们还是有些区别的。

onStart():这是Subscriber增加的一个空方法,在订阅方法subscribe()中执行,此时事件还未发送,开发者可以做一些准备工作像数据初始化啊,数据、数组清零等,onStart所在的线程与subscribe()订阅方法在同一个线程,在这处理一些UI操作还是要谨慎,根据指定subscribe()方法所在线程而定,指定线程后面说。

isUnsubscribed():判断当前订阅状态。

unsubscribe():用于取消订阅,这是实现的另一个接口Subscription的方法,这个方法执行后,Subscriber就不会再接收事件了。

在观察者和被观察者订阅之后,被观察者Observable会持有Subscriber的引用,要在适当的地方取消订阅关系,避免内存泄露。


Observable(被观察者)

  Observable决定了事件的触发时间,一般是在与观察者实现订阅的时候进行事件的触发。RxJava中最基本的创建一个Observable对象是通过create()方法,还有一堆其他的操作符可以实现创建Observable,后面介绍。


//创建被观察者Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {    @Override    public void call(Subscriber<? super String> subscriber) {        //订阅关系        if (!subscriber.isUnsubscribed()) {
            subscriber.onNext("hello");            subscriber.onNext("World");            subscriber.onNext("Hello World");            subscriber.onCompleted();        }
    }
});



/** * Invoked when Observable.subscribe is called. * @param <T> the output value type */public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {    // cover for generics insanity}



  call的回调顺序是三个onNext(),最后是onCompleted()。OnSubscribe作为参数传递给create(),OnSubscribe是一个继承了Action1的接口,而回调的call方法就是Action1的,当Observable被订阅的时候,激活回调call(),然后事件就会按顺序依次执行。这样,被观察者调用观察者的回调方法,然后把用户事件传递给观察者,就实现了观察者模式。

  这里提到的Action1是什么东东,其实还有Action0,Action2,Action3这些,后面详细介绍。


Subscribe(订阅)

  实现订阅的方法就是subscribe(),这样就实现了Observable和Subscriber的订阅关系,整个事件流就可以执行下去。


observable.subscribe(observer);
//关联被观察者observable.subscribe(subscriber);




  订阅方法的API源码分析如下,重点看红色标注字体。此方法在后面有提到,暂且称它为总方法。


static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { // validate and proceed    if (subscriber == null) {        throw new IllegalArgumentException("subscriber can not be null");    }    if (observable.onSubscribe == null) {        throw new IllegalStateException("onSubscribe function can not be null.");        /*         * the subscribe function can also be overridden but generally that's not the appropriate approach         * so I won't mention that in the exception         */    }    // new Subscriber so onStart it    subscriber.onStart();    /*     * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls     * to user code from within an Observer"     */    // if not already wrapped    if (!(subscriber instanceof SafeSubscriber)) {        // assign to `observer` so we return the protected version        subscriber = new SafeSubscriber<T>(subscriber);    }    // The code below is exactly the same an unsafeSubscribe but not used because it would    // add a significant depth to already huge call stacks.    try {        // allow the hook to intercept and/or decorate        RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
        return RxJavaHooks.onObservableReturn(subscriber);    } catch (Throwable e) {        // special handling for certain Throwable/Error/Exception types        Exceptions.throwIfFatal(e);        // in case the subscriber can't listen to exceptions anymore        if (subscriber.isUnsubscribed()) {
            RxJavaHooks.onError(RxJavaHooks.onObservableError(e));        } else {            // if an unhandled error occurs executing the onSubscribe we will propagate it            try {
                subscriber.onError(RxJavaHooks.onObservableError(e));            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);                // if this happens it means the onError itself failed (perhaps an invalid function implementation)                // so we are unable to propagate the error correctly and will just throw                RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);                // TODO could the hook be the cause of the error in the on error handling.                RxJavaHooks.onObservableError(r);                // TODO why aren't we throwing the hook's return value.                throw r; // NOPMD            }
        }        return Subscriptions.unsubscribed();    }
}



大概流程就是先执行subscriber.onStart(),然后执行回调onSubscribe.call(subscriber),最后把Subscriber返回;

主要就是这三件事:

1、onStart()方法是一个空方法,在回调call之前调用;

2、回调方法call方法在订阅方法中被回调执行,所以我们看到,只有当订阅事件subscribe()方法被执行了,才会有事件的执行。

3、把观察者Subscriber返回,方便后续判断unsubscribe();


整个过程可以使用下图来表示:onNext依次执行,最后是onCompleted.

关于订阅方法subscribe()有多个重载方法,源码分别如下:


//没有参数public final Subscription subscribe() {
    Action1<T> onNext = Actions.empty();    Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;    Action0 onCompleted = Actions.empty();    return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));}

//一个参数
public final Subscription subscribe(final Action1<? super T> onNext) {    if (onNext == null) {        throw new IllegalArgumentException("onNext can not be null");    }

    Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;    Action0 onCompleted = Actions.empty();    return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));}

//两个参数
public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError) {    if (onNext == null) {        throw new IllegalArgumentException("onNext can not be null");    }    if (onError == null) {        throw new IllegalArgumentException("onError can not be null");    }

    Action0 onCompleted = Actions.empty();    return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));}

//三个参数
public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onCompleted) {    if (onNext == null) {        throw new IllegalArgumentException("onNext can not be null");    }    if (onError == null) {        throw new IllegalArgumentException("onError can not be null");    }    if (onCompleted == null) {        throw new IllegalArgumentException("onComplete can not be null");    }    return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));}


//传入Observer观察者

public final Subscription subscribe(final Observer<? super T> observer) {    if (observer instanceof Subscriber) {        return subscribe((Subscriber<? super T>)observer);    }    if (observer == null) {        throw new NullPointerException("observer is null");    }    return subscribe(new ObserverSubscriber<T>(observer));}


//传入Subscriber观察者

public final Subscription subscribe(Subscriber<? super T> subscriber) {    return Observable.subscribe(subscriber, this);}





订阅方法有多个,其实总归来说,最后调用的都是上面提到的总方法。这是Rxjava的订阅subscribe()实现的不完整回调,意思就是不一定非要与“观察者”实现订阅(加引号的原因就是本质上其实就是与Subscriber进行了订阅关系)。可以是一个onNext方法也可以是一个onError或者onCompleted方法,但是最后都会被包装成一个观察者对象。


使用实例如下:


Action1<String> onNextAction = new Action1<String>() {    @Override    public void call(String s) {
        Log.i(TAG, "call: ");    }
};Action1<Throwable> onErrorAction = new Action1<Throwable>() {    @Override    public void call(Throwable shrowable) {
        Log.i(TAG, "call: ");    }
};Action0 onCompletedAction = new Action0() {    @Override    public void call() {
        Log.i(TAG, "call: ");    }
};//自动创建Subscriber,使用onNextAction定义onNext();observable.subscribe(onNextAction);// 自动创建 Subscriber ,使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()observable.subscribe(onNextAction, onErrorAction);// 自动创建 Subscriber ,使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()observable.subscribe(onNextAction, onErrorAction, onCompletedAction);



  Action家族又出现了,下面来详细介绍一下他们这一家子。


一系列的Action

  这些Action都是一些接口,分别有Action0、Action1、Action2、Action3,没有发现Action4,他们都是Action的子类,,0,1,2,3是根据参数个数来定的,贴出源码如下。


/** * All Action interfaces extend from this. * <p> * Marker interface to allow instanceof checks. */public interface Action extends Function {

}



/** * A zero-argument action. */public interface Action0 extends Action {    void call();}
/** * A one-argument action. * @param <T> the first argument type */public interface Action1<T> extends Action {    void call(T t);}
/** * A two-argument action. * @param <T1> the first argument type * @param <T2> the second argument type */public interface Action2<T1, T2> extends Action {    void call(T1 t1, T2 t2);}
/** * A three-argument action. * @param <T1> the first argument type * @param <T2> the second argument type * @param <T3> the third argument type */public interface Action3<T1, T2, T3> extends Action {    void call(T1 t1, T2 t2, T3 t3);}



onCompleted()是无参方法,可以用Action0来包装对象使用;onError()和onNext()都有一个参数,可以使用Action1来包装使用,不管传递了几个参数,最后在subscribe()订阅方法中都被重新创建Subscriber观察者来进行订阅关系。


  在RxJava中还有一个大家族Func,有Func0~Func9,暂时还不知道他们作用,留作以后了解吧。

  同时,在RxJava中还有一大堆方便的操作符进行事件的更改,简直太强大了,之后研究介绍吧。

原文链接:http://www.apkbus.com/blog-865196-77131.html

0人推荐
随时随地看视频
慕课网APP