Observable是被观察者对象, 本文主要学习Observable的基本用法, Observable对象创建
观察者模式
观察者模式简而言之, 把要处理的数据做为被观察一方, 处理的结果给观察者
该模式的核心思想是把要处理的数据定为可观察的(Observable), 其它关心此数据的都是观察者(Observer), 两者通过订阅(subscribe)建立关联
在数据到达或说被观察者处理之前, 可以被一系列高阶函数进行处理(转换), 这也是著名的链式编程, 如:
1
2
3
Flowable.fromArray("Hello", "world")
.contains("world")
.subscribe(System.out::println);
实现业务功能变为数据流的一系列处理转换, 通过Observable及后续的高阶函数可以使我们更清晰的看到数据的转换过程(亦业务实现过程);这也是函数式编程的精髓
上图是Observable基本设计思想
此图有3个要素:
- 被观察者: 用最上面的一根箭头表示; 我们需要自行把业务抽象成数据, Observable封装了一系列工具方法来简化我们创建被观察者过程
- 处理: 数据流; 业务处理的关键, 我们需要对业务原始数据进行加工, 输出结果即为最下面的一根箭头; 我们可以对同一数据反复处理直到观察者满意为止
- 观察者: 数据处理完的最终结果就让观察者使用
创建Observable对象的方法
amb, ambArray
- 作用: 从一堆Observable里返回先发送数据的Observable
2个箭头表明有2个Observable, 上面的箭头先发送了红球, 最终返回的都是上面的数据;
- 示例:
1
2
3
4
5
6
7
8
Observable<Integer> source1 = Observable.just(1, 2, 3).delay(3, TimeUnit.SECONDS);
Observable<Integer> source2 = Observable.just(4, 5, 6).delay(2, TimeUnit.SECONDS);
Observable<Integer> source3 = Observable.just(7, 8, 9).delay(1, TimeUnit.SECONDS);
Observable.amb(Arrays.asList(source1, source2, source3))
.subscribe((integer) -> { System.out.println("--> " + integer); });
Observable.ambArray(source1, source2, source3).subscribe(integer -> {System.out.println("--> " + integer);});
combineLatest, combineLatestDelayError
- 作用: 把除最后一个Observable外的所有Observable的发送的数据与最后一个Observable发送的数据合并, 合并规则由combiner对象指定
- 示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
BiFunction<Media, Rating, ExtendedResult> combine = new BiFunction<Media, Rating, ExtendedResult>() {
@Override
public ExtendedResult apply(Media m, Rating r) {
return new ExtendedResult();
}
};
Observable<HorrorMovie> horrors = Observable.just(new HorrorMovie());
Observable<CoolRating> ratings = Observable.just(new CoolRating());
Observable.<Movie, CoolRating, Result> combineLatest(horrors, ratings, combine).blockingForEach(action);
Observable.<Movie, CoolRating, Result> combineLatest(horrors, ratings, combine).blockingForEach(action);
Observable.<Media, Rating, ExtendedResult> combineLatest(horrors, ratings, combine).blockingForEach(extendedAction);
Observable.<Media, Rating, Result> combineLatest(horrors, ratings, combine).blockingForEach(action);
Observable.<Media, Rating, ExtendedResult> combineLatest(horrors, ratings, combine).blockingForEach(action);
Observable.<Movie, CoolRating, Result> combineLatest(horrors, ratings, combine);
concat, concatArray, concatArrayDelayError, concatArrayEager, concatDelayError, concatEager
- 作用: 把所有Observable发送的数据合并到一起
- 示例:
1
2
3
4
5
bservable<Integer> source1 = Observable.just(1, 2, 3);
Observable<Integer> source2 = Observable.just(4, 5, 6, 7);
Observable<Integer> source3 = Observable.just(8, 9);
Observable.concat(source1, source2, source3).subscribe(integer -> System.out.println(integer));
create, unsafeCreate
- 作用: 通过ObservableOnSubscribe, ObservableSource来构建Observable
- 示例: ```java
```
defer
empty
error
from*
- fromArray
- fromCallable
- fromFuture
- fromIterable
- fromPublisher