RxJava(RxKotlin)、RxAndroid
ps:文章中涉及到的代码均使用 Kotlin 实现,即需要导入 RxKotlin,同时也涉及到了 RxAndroid 相关内容
导入方法:
在项目的 build.gradle 文件中添加 RxKotlin 和 RxAndroid 的版本信息
buildscript {
    // Version numbers for the RxKotlin and RxAndroid artifacts, declared once as ext properties.
    ext.rx_kotlin_version = '1.0.0'
    ext.rx_android_version = '1.2.1'
}
在 module 的 build.gradle 文件中添加 RxKotlin 以及 RxAndroid 的依赖
dependencies {
    // RxKotlin and RxAndroid; versions are read from the rx_kotlin_version / rx_android_version properties.
    implementation "io.reactivex:rxkotlin:$rx_kotlin_version"
    implementation "io.reactivex:rxandroid:$rx_android_version"
}
1. 一些常用的网站
2. 观察者模式的四大要素
Observable:被观察者
Observer:观察者
subscribe:订阅
事件
3. 操作符
3.1 Creating 操作符
create just from range repeat interval defer empty / never timer start
create
操作符,直接创建一个 Observable 对象;在传入的回调中可以拿到 Subscriber,并手动调用它的 onNext() / onCompleted()
// Build the source by hand: push one greeting to the subscriber, then signal completion.
Observable.create<String> { subscriber ->
    subscriber.onNext("Hello Rx!")
    subscriber.onCompleted()
}.subscribe(object : Subscriber<String>() {
    // Each callback prints its name so the order of events is visible in the output.
    override fun onCompleted() {
        println("onCompleted()")
    }

    override fun onError(e: Throwable?) {
        println("onError()")
    }

    override fun onNext(value: String) {
        println("onNext() --> $value")
    }
})
onNext() --> Hello Rx! onCompleted()
just
操作符将一系列对象逐个发射出去,注意集合对象将作为一个整体进行发射
// Case 1: several arguments of different types are emitted one by one.
Observable.just(1, 1.0, "String", true)
    .subscribe(object : Subscriber<Any>() {
        override fun onCompleted() {
            println("onCompleted()")
        }

        override fun onError(e: Throwable?) {
            println("onError()")
        }

        override fun onNext(item: Any) {
            println("onNext() --> $item")
        }
    })

// Case 2: a list passed to just() arrives as ONE item; the subscriber walks it itself.
Observable.just(listOf(1, 2, 3, 4, 5))
    .subscribe(object : Subscriber<List<Int>>() {
        override fun onCompleted() {
            println("onCompleted()")
        }

        override fun onError(e: Throwable?) {
            println("onError()")
        }

        override fun onNext(items: List<Int>) {
            for (element in items) {
                println("onNext() --> $element")
            }
        }
    })
onNext() --> 1
onNext() --> 1.0
onNext() --> String
onNext() --> true
onCompleted()

onNext() --> 1
onNext() --> 2
onNext() --> 3
onNext() --> 4
onNext() --> 5
onCompleted()
from
操作符可以将集合中的元素逐个发射出去
// from() unpacks the collection and emits its elements one at a time.
Observable.from(listOf(5, 4, 3, 2, 1, 0))
    .subscribe(object : Subscriber<Int>() {
        override fun onCompleted() {
            println("onCompleted()")
        }

        override fun onError(e: Throwable?) {
            println("onError()")
        }

        override fun onNext(number: Int) {
            println("onNext() --> $number")
        }
    })
onNext() --> 5 onNext() --> 4 onNext() --> 3 onNext() --> 2 onNext() --> 1 onNext() --> 0 onCompleted()
range
在一定范围内向观察者发射整型数据
repeat
重复发射,不指定次数时默认重复无数次
// range(1, 3) emits 1, 2, 3; repeat(2) makes the whole sequence run twice in total.
Observable.range(1, 3)
    .repeat(2)
    .subscribe(object : Subscriber<Int>() {
        override fun onCompleted() {
            println("onCompleted()")
        }

        override fun onError(e: Throwable?) {
            println("onError()")
        }

        override fun onNext(number: Int) {
            println("onNext() --> $number")
        }
    })
onNext() --> 1 onNext() --> 2 onNext() --> 3 onNext() --> 1 onNext() --> 2 onNext() --> 3 onCompleted()
interval
定时向观察者发送一个 Long 类型的数字(逐个叠加)
// Emits an increasing Long (0, 1, 2, ...): first after a 2-second delay, then every 2 seconds.
Observable.interval(2, 2, TimeUnit.SECONDS)
    .subscribe(object : Subscriber<Long>() {
        override fun onCompleted() {
            println("onCompleted()")
        }

        override fun onError(e: Throwable?) {
            println("onError()")
        }

        override fun onNext(tick: Long) {
            println("onNext() --> $tick")
        }
    })
onNext() --> 0 onNext() --> 1 onNext() --> 2 onNext() --> 3 ...
defer
延迟创建 Observable 对象,只有在调用 subscribe() 方法时,才会创建 Observable 对象
var arg = "初始值"

// defer() only stores the factory; Observable.just(arg) is not evaluated yet.
val observable = Observable.defer { Observable.just(arg) }

// Reassigned BEFORE subscribing, so the factory will see this value.
arg = "再次赋值"

// Subscribing runs the factory now, which reads the current value of arg.
observable.subscribe(object : Subscriber<String>() {
    override fun onCompleted() {
        println("onCompleted()")
    }

    override fun onError(e: Throwable?) {
        println("onError()")
    }

    override fun onNext(value: String) {
        println("onNext() --> $value")
    }
})
onNext() --> 再次赋值 onCompleted()
3.2 Transforming 操作符
map flatMap groupBy buffer scan window
map
// map() converts each emitted Int into a String prefixed with the yen sign.
Observable.just(123, 234)
    .map { amount -> "¥ $amount" }
    .subscribe(object : Subscriber<String>() {
        override fun onCompleted() {
            println("onCompleted()")
        }

        override fun onError(e: Throwable?) {
            println("onError()")
        }

        override fun onNext(label: String) {
            println("onNext() --> $label")
        }
    })
onNext() --> ¥ 123 onNext() --> ¥ 234 onCompleted()
flatMap
// flatMap() maps each Int to its own Observable and merges their emissions into one stream.
Observable.just(123, 234, 345)
    .flatMap { amount -> Observable.just("$ $amount") }
    .subscribe(object : Subscriber<String>() {
        override fun onCompleted() {
            println("onCompleted()")
        }

        override fun onError(e: Throwable?) {
            println("onError()")
        }

        override fun onNext(label: String) {
            println("onNext() --> $label")
        }
    })
onNext() --> $ 123
onNext() --> $ 234
onNext() --> $ 345
onCompleted()
groupBy
// groupBy() splits the stream by key (here the remainder mod 2) and emits one
// GroupedObservable per distinct key.
Observable.just(1, 2, 3, 4, 5, 6)
    .groupBy { number -> number % 2 }
    .subscribe(object : Observer<GroupedObservable<Int, Int>> {
        override fun onNext(group: GroupedObservable<Int, Int>) {
            // Each group is itself an Observable and needs its own subscriber.
            group.subscribe(object : Subscriber<Int>() {
                override fun onNext(value: Int) {
                    println("group -> ${group.key}, value -> $value")
                }

                // Completion and errors of a single group are ignored in this demo.
                override fun onCompleted() {
                }

                override fun onError(e: Throwable?) {
                }
            })
        }

        // Completion and errors of the outer stream are ignored in this demo.
        override fun onCompleted() {
        }

        override fun onError(e: Throwable?) {
        }
    })
group -> 1, value -> 1 group -> 0, value -> 2 group -> 1, value -> 3 group -> 0, value -> 4 group -> 1, value -> 5 group -> 0, value -> 6
buffer
// buffer(3) collects the seven integers from range(0, 7) into lists of up to three items;
// the final list holds whatever is left over.
Observable.range(0, 7)
    .buffer(3)
    .subscribe(object : Subscriber<List<Int>>() {
        override fun onNext(chunk: List<Int>) {
            println(chunk)
        }

        // Completion and errors are ignored in this demo.
        override fun onCompleted() {
        }

        override fun onError(e: Throwable?) {
        }
    })
[0, 1, 2] [3, 4, 5] [6]
作者:ChenME
链接:https://www.jianshu.com/p/15f80333c19f