Skip to content
On this page

RxJava 的基本用法,包括发送最简单的使用、线程调度、变换。

RxJava

一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。

说白了,就是让异步代码逻辑更简洁!!!

观察者模式:观察者需要在被观察者的某个状态更新时立即获得通知从而作出响应。要么观察者时刻检查被观察者,要么使用订阅方式让被观察者主动通知。

安装
groovy
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.9'
使用
  1. 创建观察者
  2. 创建被观察者
  3. 观察者订阅被观察者
java
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        System.out.println("observer: " + d);
    }

    @Override
    public void onNext(@NonNull String s) {
        System.out.println("observer: " + s);
    }

    @Override
    public void onError(@NonNull Throwable e) {
        e.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("observer: onComplete");
    }
};

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
      	// 在 subscribe() 成功后调用
        e.onNext("aa");
        e.onNext("bb");
        e.onNext("cc");
        e.onComplete();
    }
});

observable.subscribe(observer);
kotlin
val observer = object : Observer<String> {
    override fun onSubscribe(@NonNull d: Disposable) {
      	// 用于解除订阅关系 d.dispose()
        println("observer: " + d)
    }

    override fun onNext(@NonNull s: String) {
      	// 发射器,用于发射数据和通知
        println("observer: " + s)
    }

    override fun onError(@NonNull e: Throwable) {
        e.printStackTrace()
    }

    override fun onComplete() {
        println("observer: onComplete")
    }
}

val observable = Observable.create(ObservableOnSubscribe<String> { e ->
    e.onNext("aa")
    e.onNext("bb")
    e.onNext("cc")
    e.onComplete()
})

observable.subscribe(observer)
不完整定义的回调

Action Consumer BiConsumer Consumer<Object[]>

java
Observable.just("aa", "bb", "cc")
  .subscribe(new Consumer<String>() {
      @Override
      public void accept(@NonNull String s) throws Exception {
       	 System.out.println(s);
      }
  });
kotlin
Observable.just("aa", "bb", "cc")
	.subscribe(::println)
线程控制

subscribeOn 指定生产事件的线程,只有第一次指定会生效。

observerOn 指定消费事件的线程,每次都可以指定不同线程。

kotlin
Observable.just("aa", "bb", "cc")
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
    .subscribe(::println)
变换

一对一变换 map

变换后的对象直接传送给 subscriber 的回调方法中。

kotlin
Observable.just(1, 2, 3)
    .map { integer -> "" + integer!! }
    .subscribe(::println)
java
Observable.just(1, 2, 3)
    .map(new Function<Integer, String>() {
        @Override
        public String apply(@NonNull Integer integer) throws Exception {
       	   return "" + integer;
        }
    })
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(@NonNull String s) throws Exception {
       	   System.out.println(s);
        }
    });

一对多变换 flapMap

变换后返回一个新的 Observable 对象,包含「铺平后的众多事件」。

java
getToken()
    .flatMap(new Func1<String, Observable<User>>() {
        @Override
        public Observable<User> onNext(String token) {
            return getUser(token, userId);
        })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>() {
        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }

        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable error) {
            // Error handling
            ...
        }
    });
参考

给 Android 开发者的 RxJava 详解

RxJava2 浅析