RxJava框架学习(2)之Observable说明及创建

Posted by Mingo's Blog on 2017-03-02 00:00

Observable是被观察者对象, 本文主要学习Observable的基本用法, Observable对象创建

观察者模式

观察者模式简而言之, 把要处理的数据做为被观察一方, 处理的结果给观察者

该模式的核心思想是把要处理的数据定为可观察的(Observable), 其它关心此数据的都是观察者(Observer), 两者通过订阅(subscribe)建立关联

在数据到达或说被观察者处理之前, 可以被一系列高阶函数进行处理(转换), 这也是著名的链式编程, 如:

1
2
3
   Flowable.fromArray("Hello", "world")
	   .contains("world")
	   .subscribe(System.out::println);

实现业务功能变为数据流的一系列处理转换, 通过Observable及后续的高阶函数可以使我们更清晰的看到数据的转换过程(亦业务实现过程);这也是函数式编程的精髓

上图是Observable基本设计思想

此图有3个要素:

  • 被观察者: 用最上面的一根箭头表示; 我们需要自行把业务抽象成数据, Observable封装了一系列工具方法来简化我们创建被观察者过程
  • 处理: 数据流; 业务处理的关键, 我们需要对业务原始数据进行加工, 输出结果即为最下面的一根箭头; 我们可以对同一数据反复处理直到观察者满意为止
  • 观察者: 数据处理完的最终结果就让观察者使用

创建Observable对象的方法

amb, ambArray

  • 作用: 从一堆Observable里返回先发送数据的Observable

2个箭头表明有2个Observable, 上面的箭头先发送了红球, 最终返回的都是上面的数据;

  • 示例:
1
2
3
4
5
6
7
8
   Observable<Integer> source1 = Observable.just(1, 2, 3).delay(3, TimeUnit.SECONDS);
   Observable<Integer> source2 = Observable.just(4, 5, 6).delay(2, TimeUnit.SECONDS);
   Observable<Integer> source3 = Observable.just(7, 8, 9).delay(1, TimeUnit.SECONDS);

   Observable.amb(Arrays.asList(source1, source2, source3))
           .subscribe((integer) -> { System.out.println("--> " + integer); });

   Observable.ambArray(source1, source2, source3).subscribe(integer -> {System.out.println("--> " + integer);});

combineLatest, combineLatestDelayError

  • 作用: 把除最后一个Observable外的所有Observable的发送的数据与最后一个Observable发送的数据合并, 合并规则由combiner对象指定

  • 示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
BiFunction<Media, Rating, ExtendedResult> combine = new BiFunction<Media, Rating, ExtendedResult>() {
    @Override
    public ExtendedResult apply(Media m, Rating r) {
        return new ExtendedResult();
    }
};

Observable<HorrorMovie> horrors = Observable.just(new HorrorMovie());
Observable<CoolRating> ratings = Observable.just(new CoolRating());

Observable.<Movie, CoolRating, Result> combineLatest(horrors, ratings, combine).blockingForEach(action);
Observable.<Movie, CoolRating, Result> combineLatest(horrors, ratings, combine).blockingForEach(action);
Observable.<Media, Rating, ExtendedResult> combineLatest(horrors, ratings, combine).blockingForEach(extendedAction);
Observable.<Media, Rating, Result> combineLatest(horrors, ratings, combine).blockingForEach(action);
Observable.<Media, Rating, ExtendedResult> combineLatest(horrors, ratings, combine).blockingForEach(action);

Observable.<Movie, CoolRating, Result> combineLatest(horrors, ratings, combine);

concat, concatArray, concatArrayDelayError, concatArrayEager, concatDelayError, concatEager

  • 作用: 把所有Observable发送的数据合并到一起

  • 示例:
1
2
3
4
5
bservable<Integer> source1 = Observable.just(1, 2, 3);
Observable<Integer> source2 = Observable.just(4, 5, 6, 7);
Observable<Integer> source3 = Observable.just(8, 9);

Observable.concat(source1, source2, source3).subscribe(integer -> System.out.println(integer));

create, unsafeCreate

  • 作用: 通过ObservableOnSubscribe, ObservableSource来构建Observable

  • 示例: ```java

```

defer

empty

error

from*

  • fromArray
  • fromCallable
  • fromFuture
  • fromIterable
  • fromPublisher

generate

interval, intervalRange

just

merge, mergeArray, mergeArrayDelayError, mergeDelayError

never

range, rangeLong

sequenceEqual

switchOnNext, switchOnNextDelayError

timer

using

wrap

zip, zipArray, zipIterable

参考