RxJava框架学习(1)之框架概述

Posted by Mingo's Blog on 2017-03-01 00:00

Rx是Reactive Extensions的缩写

RxJava是响应式编程框架的Java实现, 据说最早实现是C#并流行起来

RxJava框架亮点

  • 把解决问题的思路的改变: 由面向对象编程的领域建模问题改为领域数据建模及数据转换, 使解决问题的核心更聚焦于数据本身

  • 大量的API参数(ObservableCreate, Operator, Observer)使用函数式接口(一个接口只有一个方法声明)响应式编程, 有函数式编程风格; 但Java目前还未有函数类型, 只能通过匿名内部类方式类似实现, 导致代码量激增; 如果使用Java8中的Lambda表达式, 可部分解决问题

  • 著名的链式编程, 可以看到整个数据被处理的完整过程, 解决思路更容易理解, 有问题也更容易定位解决

RxJava框架设计思想

RxJava设计思想

  • Message Driven: 数据,消息驱动
  • Responsive: 响应式
  • Rasilient: 弹性
  • Elastic: 灵活

框架组成

  • Observable各种创建器
  • Operator, 高阶函数, 操作符
  • Observer, 观察者

框架简单源码分析

示例代码:

1
2
3
4
5
6
Observable.create((ObservableEmitter<Integer>  emitter) -> {
        emitter.onNext(1);
    }).map((Integer integer) ->  integer * 2)
      .subscribe(integer -> {
        System.out.println(integer);
    });

1 代码中匿名内部类都采用了Java8的Lambda表达式写法

2 创建Observale对象

基本上每种Observable都有一个专门的Observable* 类, RxJava自带了一些常用的Observable创造器, 代码在io.reactivex.internal.operators.observable下

每个创造器的核心工作就是实现protected void subscribeActual(Observer<? super T> observer); 方法

Observable.create(source)需要ObservableCreate创造器, source对象实现如下:

(ObservaleEmitter emitter) -> {emitter.onNext()}; // 2.0

emitter对象会在第5步创建时说明

3 在RxJava中对数据处理由Operator对象来完成; 可以使用RxJava自带的, 如果自带的不能满足需求可以自定义扩展

Operator对象都是实现Function接口;

通常实现方式都是对当前Observable关注的Observer对象通过包装设计模式, 重写其onNext, onError, onComplite, onSubscribe方法, 达到功能增强的目的, 而最终的onNext方法的功能已经按Operator调用的顺序被层层增强

以map operator具体代码实现为例:

1
2
3
4
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

然后在ObservableMap实现类里重写subscribeActual方法, 核心一步

source.subscribe(new MapObserver<T, U>(t, function));

其中MapObserver就是对Observer(t)进行装饰, source是直到此Operator之前的Observale

每调用一个operator, 就对当前的observer包装一层, 达到功能叠加的目的; 比如调用3次operator操作

source.subscribe(new Operator1Observer(obsever, function1)); // 执行operator1, 3.a

source.subscribe(new Operator2Observer(obsever, function2)); // 执行operator2, 3.b

source.subscribe(new Operator3Observer(obsever, function3)); // 执行operator3, 3.c

实际上相当于

source.subscribe(new Operator3Observer(new Operator2Observer(new Operator1Observer(observer, function1), function2), function3));

4 创建Obsever, 即我们对经过一系列Operator处理后的被观察数据的最终处理方式: 此处仅仅是简单的打印输出

integer -> { System.out.println(integer); }

5 Observale与Obsever建立联系

Observable.subscribe(obsever);

当执行到此步的时候, 在subscribe方法里会调用Observable.subscribeActual(observer)

这个方法在Observale中是采用模板设计模式, 由Observale的具体子类负责实现真正的subscribeActual(observer), 亦此例中的ObservaleCreate类

无模板不框架!!

ObservableCreate的subscribeActual(observer)实现如下

1
2
3
4
5
6
7
8
9
CreateEmitter<T> parent = new CreateEmitter<T>(observer); // 5.a
observer.onSubscribe(parent); // 5.b

try {   
    source.subscribe(parent); // 5.c
} catch (Throwable ex) {
    Exceptions.throwIfFatal(ex);
    parent.onError(ex); // 5.d
}       

代码不多, 但设计复杂精巧, 现逐行分析:

  • 在5.a步中把由第4步创建的observer封装成emitter对象(此对象最终传给第2步创建的source对象, 用来操作observer对象)
  • 在5.c步中由第2步创建ObservableOnSubscribe source对象调用subscribe方法(由调用方实现的)
  • 现在回过头来看ObservableOnSubscribe.subscribe方法中的实现:

emitter.onNext(T);

也就是在source对象中直接调用了observer对象的onNext方法(emitter是对observer的包装, 那就意味着在source对象的subscribe方法里无任何限制的调用observer对象的方法); 而emitter对象又是对第4步创建observer经过第3步一系列operator操作的层层包装的最终对象(详见第3步)

简单总结

  • Rx中核心对象, ObservableCreate, Operator, Observer
  • 按实际需求调用相应的ObservableCreate并实现subscribe方法(模板设计模式使用1)
  • 根据业务, 调用一系列的Operator, 对observer进行增加(模板设计模式使用2+装饰设计模式)
  • 创建Observer用来接受最终的数据

why? 个人思考

  • 通过模板设计模式尽可能屏蔽掉非业务代码(创建source, 创建Operator的Function, 创建Observer)
  • 通过装饰模式达到Operator可叠加的功能, 还可便于扩展(自带的Operator再多也不可能满足所有情况), 并附带链式编程副作用
  • 观察者设计模式, 从代码量上看Java的函数式编程代码量还是大, 体现不出跟传统方式(过程式)的优点; 但是他的最大作用在于解决问题过程中降维, 把实现功能的过程改为数据流的处理, 链式编程又能直观的看到数据处理的过程

参考资料