Rx是Reactive Extensions的缩写
RxJava是响应式编程框架的Java实现, 据说最早实现是C#并流行起来
RxJava框架亮点
-
把解决问题的思路的改变: 由面向对象编程的领域建模问题改为领域数据建模及数据转换, 使解决问题的核心更聚焦于数据本身
-
大量的API参数(ObservableCreate, Operator, Observer)使用函数式接口(一个接口只有一个方法声明)响应式编程, 有函数式编程风格; 但Java目前还未有函数类型, 只能通过匿名内部类方式类似实现, 导致代码量激增; 如果使用Java8中的Lambda表达式, 可部分解决问题
-
著名的链式编程, 可以看到整个数据被处理的完整过程, 解决思路更容易理解, 有问题也更容易定位解决
RxJava框架设计思想
- Message Driven: 数据,消息驱动
- Responsive: 响应式
- Rasilient: 弹性
- Elastic: 灵活
框架组成
- Observable各种创建器
- Operator, 高阶函数, 操作符
- Observer, 观察者
框架简单源码分析
示例代码:
1
2
3
4
5
6
Observable.create((ObservableEmitter<Integer> emitter) -> {
emitter.onNext(1);
}).map((Integer integer) -> integer * 2)
.subscribe(integer -> {
System.out.println(integer);
});
1 代码中匿名内部类都采用了Java8的Lambda表达式写法
2 创建Observale对象
基本上每种Observable都有一个专门的Observable*
类, RxJava自带了一些常用的Observable创造器, 代码在io.reactivex.internal.operators.observable下
每个创造器的核心工作就是实现protected void subscribeActual(Observer<? super T> observer); 方法
Observable.create(source)需要ObservableCreate创造器, source对象实现如下:
(ObservaleEmitter
emitter) -> {emitter.onNext()}; // 2.0
emitter对象会在第5步创建时说明
3 在RxJava中对数据处理由Operator对象来完成; 可以使用RxJava自带的, 如果自带的不能满足需求可以自定义扩展
Operator对象都是实现Function接口;
通常实现方式都是对当前Observable关注的Observer对象通过包装设计模式, 重写其onNext, onError, onComplite, onSubscribe方法, 达到功能增强的目的, 而最终的onNext方法的功能已经按Operator调用的顺序被层层增强
以map operator具体代码实现为例:
1
2
3
4
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
然后在ObservableMap实现类里重写subscribeActual方法, 核心一步
source.subscribe(new MapObserver<T, U>(t, function));
其中MapObserver就是对Observer(t)进行装饰, source是直到此Operator之前的Observale
每调用一个operator, 就对当前的observer包装一层, 达到功能叠加的目的; 比如调用3次operator操作
source.subscribe(new Operator1Observer(obsever, function1)); // 执行operator1, 3.a
source.subscribe(new Operator2Observer(obsever, function2)); // 执行operator2, 3.b
source.subscribe(new Operator3Observer(obsever, function3)); // 执行operator3, 3.c
实际上相当于
source.subscribe(new Operator3Observer(new Operator2Observer(new Operator1Observer(observer, function1), function2), function3));
4 创建Obsever, 即我们对经过一系列Operator处理后的被观察数据的最终处理方式: 此处仅仅是简单的打印输出
integer -> { System.out.println(integer); }
5 Observale与Obsever建立联系
Observable.subscribe(obsever);
当执行到此步的时候, 在subscribe方法里会调用Observable.subscribeActual(observer)
这个方法在Observale中是采用模板设计模式, 由Observale的具体子类负责实现真正的subscribeActual(observer), 亦此例中的ObservaleCreate类
无模板不框架!!
ObservableCreate的subscribeActual(observer)实现如下
1
2
3
4
5
6
7
8
9
CreateEmitter<T> parent = new CreateEmitter<T>(observer); // 5.a
observer.onSubscribe(parent); // 5.b
try {
source.subscribe(parent); // 5.c
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex); // 5.d
}
代码不多, 但设计复杂精巧, 现逐行分析:
- 在5.a步中把由第4步创建的observer封装成emitter对象(此对象最终传给第2步创建的source对象, 用来操作observer对象)
- 在5.c步中由第2步创建ObservableOnSubscribe source对象调用subscribe方法(由调用方实现的)
- 现在回过头来看ObservableOnSubscribe.subscribe方法中的实现:
emitter.onNext(T);
也就是在source对象中直接调用了observer对象的onNext方法(emitter是对observer的包装, 那就意味着在source对象的subscribe方法里无任何限制的调用observer对象的方法); 而emitter对象又是对第4步创建observer经过第3步一系列operator操作的层层包装的最终对象(详见第3步)
简单总结
- Rx中核心对象, ObservableCreate, Operator, Observer
- 按实际需求调用相应的ObservableCreate并实现subscribe方法(模板设计模式使用1)
- 根据业务, 调用一系列的Operator, 对observer进行增加(模板设计模式使用2+装饰设计模式)
- 创建Observer用来接受最终的数据
why? 个人思考
- 通过模板设计模式尽可能屏蔽掉非业务代码(创建source, 创建Operator的Function, 创建Observer)
- 通过装饰模式达到Operator可叠加的功能, 还可便于扩展(自带的Operator再多也不可能满足所有情况), 并附带链式编程副作用
- 观察者设计模式, 从代码量上看Java的函数式编程代码量还是大, 体现不出跟传统方式(过程式)的优点; 但是他的最大作用在于解决问题过程中降维, 把实现功能的过程改为数据流的处理, 链式编程又能直观的看到数据处理的过程