写这么一篇文章主要是为了解惑,我们都知道Retrofit
可以配合RxJava
一起使用,而且那种链式的调用简直了,但是一直有个疑惑:
getObservable().subscribe(new Observer<String>() { @Override public void onNext(String value) { // ... 得到数据 } })
看上面那段伪代码之后我们都知道Observable
是需要subscribe
才会真正执行的,那么Retrofit
是怎么实现这个流程的呢?不然老是能得到数据却不懂的怎么来的,所以为了解读这一脸的懵逼只能从源码中去寻找答案。
简单使用
val mRetrofit: Retrofit = Retrofit.Builder() .baseUrl(HttpUrl.parse(Constant.URL)) .client(okHttpBuilder.build()) .addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io())) .addConverterFactory(GsonConverterFactory.create(GsonBuilder().create())) .build()
嗯嗯,只要加了RxJava2CallAdapterFactory.createWithScheduler
之后就能愉快的结合RxJava
一起使用了:
GankHttpRequest.instance.mApiService.getAndroidData() .compose(Constant.transformer()) .subscribe(object : Consumer<MutableList<AndroidResult>> { override fun accept(t: MutableList<AndroidResult>?) { callback?.onHttpSuccess(t) } }, object : Consumer<Throwable> { override fun accept(t: Throwable?) { } })
更多例子源码查看:https://github.com/Neacy/GankKotlin
总是很佩服Square
开源的项目,因为解决了很多难题,以上就是Retrofit
和RxJava
的简单效果。
开始分析
我们直接奔主题进入口开始分析mRetrofit.create(ApiService::class.java)
也就是Retrofit
中的create
方法:
public <T> T create(final Class<T> service) { return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service }, new InvocationHandler() { private final Platform platform = Platform.get(); @Override public Object invoke(Object proxy, Method method, @Nullable Object[] args) throws Throwable { ServiceMethod<Object, Object> serviceMethod = (ServiceMethod<Object, Object>) loadServiceMethod(method); OkHttpCall<Object> okHttpCall = new OkHttpCall<>(serviceMethod, args); return serviceMethod.callAdapter.adapt(okHttpCall); } }); }
这里重点分析loadServiceMethod
方法,点进源码可以看到主要执行new ServiceMethod.Builder<>(this, method).build()
经过一系列折腾最后回到Retrofit
中的nextCallAdapter
方法:
public CallAdapter<?, ?> nextCallAdapter(@Nullable CallAdapter.Factory skipPast, Type returnType, Annotation[] annotations) { checkNotNull(returnType, "returnType == null"); checkNotNull(annotations, "annotations == null"); int start = adapterFactories.indexOf(skipPast) + 1; for (int i = start, count = adapterFactories.size(); i < count; i++) { CallAdapter<?, ?> adapter = adapterFactories.get(i).get(returnType, annotations, this); if (adapter != null) { return adapter; } } }
这里主要是调用adapterFactories.get(returnType, annotations, this)
这里的adapterFactories
就是我们初始化传进来的RxJava2CallAdapterFactory
类所以很自然get
方法执行之后返回的是RxJava2CallAdapter
,很好;终于看到跟主题相关的Rx开头的类了。
执行完ServiceMethod
的初始化后代码继续走:
OkHttpCall<Object> okHttpCall = new OkHttpCall<>(serviceMethod, args);return serviceMethod.callAdapter.adapt(okHttpCall);
首先我们明白一点serviceMethod.callAdapter
也就是我们前面返回的RxJava2CallAdapter
对象,所以自然进入该类中的adapt
方法:
@Override public Object adapt(Call<R> call) { Observable<Response<R>> responseObservable = isAsync ? new CallEnqueueObservable<>(call) : new CallExecuteObservable<>(call); Observable<?> observable; if (isResult) { observable = new ResultObservable<>(responseObservable); } else if (isBody) { observable = new BodyObservable<>(responseObservable); } else { observable = responseObservable; } if (scheduler != null) { observable = observable.subscribeOn(scheduler); } if (isFlowable) { return observable.toFlowable(BackpressureStrategy.LATEST); } if (isSingle) { return observable.singleOrError(); } if (isMaybe) { return observable.singleElement(); } if (isCompletable) { return observable.ignoreElements(); } return observable; }
Observable
好刺眼的词,这不就是RxJava
的类嘛,有点慌仿佛就要结束了。
这里简单点我们只挑CallExecuteObservable
来分析,这个类的代码不长直接贴上来看看:
final class CallExecuteObservable<T> extends Observable<Response<T>> { private final Call<T> originalCall; CallExecuteObservable(Call<T> originalCall) { this.originalCall = originalCall; } @Override protected void subscribeActual(Observer<? super Response<T>> observer) { // Since Call is a one-shot type, clone it for each new observer. Call<T> call = originalCall.clone(); observer.onSubscribe(new CallDisposable(call)); boolean terminated = false; try { Response<T> response = call.execute(); if (!call.isCanceled()) { observer.onNext(response); } if (!call.isCanceled()) { terminated = true; observer.onComplete(); } } catch (Throwable t) { Exceptions.throwIfFatal(t); if (terminated) { RxJavaPlugins.onError(t); } else if (!call.isCanceled()) { try { observer.onError(t); } catch (Throwable inner) { Exceptions.throwIfFatal(inner); RxJavaPlugins.onError(new CompositeException(t, inner)); } } } } private static final class CallDisposable implements Disposable { private final Call<?> call; CallDisposable(Call<?> call) { this.call = call; } @Override public void dispose() { call.cancel(); } @Override public boolean isDisposed() { return call.isCanceled(); } } }
那什么时候开始执行呢?
这时候我们需要回头看一下Retrofit.create
这里用到了动态代理
所以再invoke
中serviceMethod.callAdapter.adapt(okHttpCall)
就是把RxJava2CallAdapter
中的Observable
返回回去,所以:
当我们代码中调用subscribe
的时候会执行Observable.subscribeActual
,回头看看这方法中做了什么:
Response<T> response = call.execute();// 使用OkHttp执行接口请求if (!call.isCanceled()) { observer.onNext(response); }if (!call.isCanceled()) { terminated = true; observer.onComplete(); }
很轻松地我们界面就得到了Retrofit
中的Observable
发射出来的数据了然后我们就可以做任何处理了。
我们再回头看看一下RxJava2CallAdapter.adapt
方法:
Observable<?> observable;if (isResult) { observable = new ResultObservable<>(responseObservable); } else if (isBody) { observable = new BodyObservable<>(responseObservable); } else { observable = responseObservable; }if (scheduler != null) { observable = observable.subscribeOn(scheduler); }
看到observable被各种条件进行赋值,不过我们知道了CallExecuteObservable
这个是怎么发射数据了现在再回头看已经很清晰了。
综上:
整个流程其实就是RxJava2CallAdapterFactory
-> RxJava2CallAdapter
-> xxxxxxObservable
-> onNext
作者:左手木亽
链接:https://www.jianshu.com/p/4db6869a4bba