Appearance
RxJava 的基本用法,包括发送最简单的使用、线程调度、变换。
RxJava
一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。
说白了,就是让异步代码逻辑更简洁!!!
观察者模式:观察者需要在被观察者的某个状态更新时立即获得通知从而作出响应。要么观察者时刻检查被观察者,要么使用订阅方式让被观察者主动通知。
安装
groovy
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.9'
使用
- 创建观察者
- 创建被观察者
- 观察者订阅被观察者
java
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("observer: " + d);
}
@Override
public void onNext(@NonNull String s) {
System.out.println("observer: " + s);
}
@Override
public void onError(@NonNull Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("observer: onComplete");
}
};
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
// 在 subscribe() 成功后调用
e.onNext("aa");
e.onNext("bb");
e.onNext("cc");
e.onComplete();
}
});
observable.subscribe(observer);
kotlin
val observer = object : Observer<String> {
override fun onSubscribe(@NonNull d: Disposable) {
// 用于解除订阅关系 d.dispose()
println("observer: " + d)
}
override fun onNext(@NonNull s: String) {
// 发射器,用于发射数据和通知
println("observer: " + s)
}
override fun onError(@NonNull e: Throwable) {
e.printStackTrace()
}
override fun onComplete() {
println("observer: onComplete")
}
}
val observable = Observable.create(ObservableOnSubscribe<String> { e ->
e.onNext("aa")
e.onNext("bb")
e.onNext("cc")
e.onComplete()
})
observable.subscribe(observer)
不完整定义的回调
Action
Consumer
BiConsumer
Consumer<Object[]>
java
Observable.just("aa", "bb", "cc")
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println(s);
}
});
kotlin
Observable.just("aa", "bb", "cc")
.subscribe(::println)
线程控制
subscribeOn
指定生产事件的线程,只有第一次指定会生效。
observerOn
指定消费事件的线程,每次都可以指定不同线程。
kotlin
Observable.just("aa", "bb", "cc")
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(::println)
变换
一对一变换 map
变换后的对象直接传送给 subscriber 的回调方法中。
kotlin
Observable.just(1, 2, 3)
.map { integer -> "" + integer!! }
.subscribe(::println)
java
Observable.just(1, 2, 3)
.map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return "" + integer;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println(s);
}
});
一对多变换 flapMap
变换后返回一个新的 Observable 对象,包含「铺平后的众多事件」。
java
getToken()
.flatMap(new Func1<String, Observable<User>>() {
@Override
public Observable<User> onNext(String token) {
return getUser(token, userId);
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<User>() {
@Override
public void onNext(User user) {
userView.setUser(user);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable error) {
// Error handling
...
}
});