大话RxJava:二、轻松学源码之基础篇

/ 0评 / 0

写在前面

前篇文章简单介绍了一下RxJava的概念与基本使用,本来准备继续说一下RxJava的进阶使用,包括一些复杂操作符的使用与线程控制的相关内容。然后想了想,不能这样急躁地去学如何使用,应该先稍微明白它的一些过程,后面学习起来思路更加清晰,这也是我之前学习RxJava的一个误区所在。

所以今天这篇文章,打算先认识一下基本的常见的两种操作符及其使用,然后采用图文的模式详细介绍一下RxJava的基础源码,主要包括create()与subscribe()这两个方法到底做了什么事情。

有了这两个基础后,后期学大量复杂且重要的操作符及其原理就有了基本的认知,至少学到复杂的地方不慌。而采用图文模式讲解源码看起来也不烦躁,轻轻松松得去学习。

下面是本文的目录:

初识操作符

所谓操作符(Operators),简单来说就是一种指令,表示需要执行什么样的操作。Rx中的每种编程语言实现都实现了一组操作符的集合。RxJava也不例外。

RxJava中有大量的操作符,比如创建操作符、变换操作符、过滤操作符等等,这些操作符要全部讲解完几乎是不可能也没必要的事情。所以我们只介绍常见的、有用的、重要的操作符。其他的如果用到直接到文档查找就行了。

下面就针对前篇文章的创建(create)来说明一下另外两种常见的创建操作符。先来回顾一下create()操作符,比较简单,这里就不解释了:

//创建Observable
Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("World");
        subscriber.onCompleted();
    }
    }).subscribe(new Observer<String>() {

        @Override
        public void onNext(String s) {
            Log.i("onNext ---> ", s);
        }

         @Override
        public void onCompleted() {
            Log.i("onCompleted ---> ", "完成");
        }

        @Override
        public void onError(Throwable e) {

        }
});

Observable.just()

首先给出定义:

Just操作符是创建一个将参数依次发送出来的Observable

具体一点来说就是, just() 中会接收1~9个参数,它会返回一个按照传入参数的顺序依次发送这些参数的Observable。

这样说可能还是不够清晰,所以画个图来看:

just

从图中可以看出,其实就是依次发送单个数据,它的具体写法是这样的,非常简单:

Observable.just("Hello","world");
//其实就相当于依次调用:
//subscriber.onNext("Hello");
//subscriber.onNext("World");

但是这里要注意一点,如果你传递null给just,它会返回一个发送null值的Observable,而不是返回一个空Observable(完全不发送任何数据的Observable)。后面会讲到,如果需要空Observable应该使用 Empty 操作符。

现在来看完整的代码,代码本身很简单,注意看Log日志:

//创建Observable
Observable.just("Hello", "World", null)
    .subscribe(new Observer<String>() {

        @Override
        public void onNext(String s) {
            if (s == null) {
                Log.i("onNext ---> ", "null");
            }else {
                Log.i("onNext ---> ", s);
            }
        }

        @Override
        public void onCompleted() {
            Log.i("onCompleted ---> ", "完成");
        }

        @Override
        public void onError(Throwable e) {
            Log.i("onError ---> ", "出错 --->" + e.toString());
        }
});

just Log

这里因为我们要打印字符串,所以不能为null,我就处理了一下,可以看到当发送 null 的时候,s确实等于null。

Observable.from()

尽管与just一样是创建操作符,但是from操作符稍微强大点。因为from操作符的作用是:

将传入的数组或 Iterable 拆分成具体对象后,依次发送出来。

注意,这里不再是发送单个对象,而是直接发送一组对象。为了与just对比,也来画个图描述一下:

from

它的具体写法是这样的,也非常简单:

String[] str = new String[]{"Hello", "World"};
//创建Observable
Observable.from(str);

这里由于篇幅关系,我就不贴完整代码了,跟just是类似的,十分简单。

基础源码

讲了两个简单但是常用的操作符后,我们回过头来看一下之前的实现RxJava的代码,这里我打上了Log日志,来看一下每个方法执行的顺序。

//创建Observable
Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("World");
        subscriber.onCompleted();
        Log.i("执行顺序 ---> ", " call ");
    }
    }).subscribe(new Observer<String>() {

    @Override
    public void onNext(String s) {
        Log.i("onNext ---> ", s);
        Log.i("执行顺序 ---> ", " subscribe onNext");
    }

    @Override
    public void onCompleted() {
        Log.i("onCompleted ---> ", "完成");
        Log.i("执行顺序 ---> ", " subscribe onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        Log.i("onError ---> ", "出错 --->" + e.toString());
    }
});

好了,来看一下Log日志:

chain Log

从图中可以看到,subscribe方法先执行,等执行完成后再执行call方法。

好了,这就是结论。先在脑子里产生个印象,方便后面追溯。

create()

先来看看Observable的create()方法做了些什么?Ctrl点进去看看:

public static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(hook.onCreate(f));
}

啥事没干,就是返回一个 Observable

再看看它的构造函数,构造一下对象:

protected Observable(OnSubscribe<T> f) {
    this.onSubscribe = f;
}

再来看这个 hook.onCreate(f)hook 是啥呢?

hook是一个代理对象, 仅仅用作调试的时候可以插入一些测试代码。

static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

注意是仅仅,所以它几乎没啥用处,完全可以忽略。来看 hook.onCreate(f)

public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
    return f;
}

依然啥事没干,只是把 OnSubscribe 这个对象返回了一下。

OK,说到这里,虽然我一直在强调它“啥事没干”,其实仔细推敲,它还真做了三件事情:

说到这里,不知道大家看懂了没有。我第一次看这到这里的时候,还有略微有点模糊的,没关系,只要模糊那就画图理解:

create

这样看起来是不是轻松很多,create()就做了这样简单的事情,所以概括(但可能并不准确)地来说就是:

create()方法创建了一个Observable,且在这个Observable中有个OnSubscribe。

所以就画个简图就如下图所示这样,这个图要注意,之后还会扩展:

create简图

好了,create方法就分析到这里,虽然有点啰嗦,但是已经十分详细了。

subscribe()

现在来看另外一个比较重要的操作 subscribe() ,在前篇文章中说过,这个是将观察者(Observer)与被观察者(Observable)联系到一起的操作,也就是产生一种订阅(Subcribe)关系。

跟前面的一样,先来看一下源码,点进去是这样的:

public final Subscription subscribe(final Observer<? super T> observer) {
    if (observer instanceof Subscriber) {
        return subscribe((Subscriber<? super T>)observer);
    }
    return subscribe(new ObserverSubscriber<T>(observer));
}

之前我们说过一句话:

实质上,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。

在这里就能够看出,首先 if 中的语句意思是如果这个Observer已经是Subscriber类型,那就直接返回。如果不是的话 new了一个ObserverSubscriber ,再点进去看看:

public final class ObserverSubscriber<T> extends Subscriber<T> {
    final Observer<? super T> observer;

    public ObserverSubscriber(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        observer.onNext(t);
    }

    @Override
    public void onError(Throwable e) {
        observer.onError(e);
    }

    @Override
    public void onCompleted() {
        observer.onCompleted();
    }
}

果然,它还是转成了Subscriber类型,刚好印证了之前的话。所以为了方便起见,之后文章中,所有的观察者(Observer)我都用Subscriber来代替。 这是一个小插曲,注意一下就好。

好了,继续看 subscribe 源码:

public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    ... 
    hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
    ...
}

把一些暂时无关的代码省略掉来看,其实就是执行了一句 hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);

而这个 hook.onSubscribeStart 方法再点进去看看:

public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
        // pass through by default
        return onSubscribe;
    }

可以看到,竟然直接返回了一个 onSubscribe ,由于之前说过这个hook没什么作用,直接删掉,那就等于整个 subscribe 做了一件事就是 onSubscribe.call(subscriber) ,当然这个call里面的参数subscriber是我们代码中传递进去的。

而onSubscribe在create源码解析中我们已经知道是新建 ObservableA 的一个属性,所以总结来说,subscribe()方法做的事情就是这样:

ObservableA.onSubscribe.call(subscriber);

而调用 call 方法,就是调用传入的参数subscriber的onNext/onCompleted/onError方法。

这就是全部的过程。

看到这里,估计大家又迷糊了。没关系,依然画个图来说,图中省略了create中的创建步骤:

subscribe

 

结合图我们最后再顺一下思路:

不知道大家还记得那个Log日志么,从日志可以看到,将onNext与onCompleted方法执行完后,call方法才结束。这也印证了call方法回调Subscriber的方法这一说。

结语

好了,终于把源码中比较简单的部分讲解完了,等于是再复习了一边之前学的。

而且终于也把RxJava的入门知识讲解完了。后面的文章中,就开始学如何稍微高级一点的运用RxJava,比如map/flatmap操作符、lift的原理、线程控制的原理、各种运用场景等等,还有很长的路要走啊。

最后这是我第一次试着讲解源码,而且也在边学边分享,所以肯定有错误或不清楚的地方,欢迎大家指正与交流。

参考资料

RxJava基本流程和lift源码分析

彻底搞懂 RxJava — 基础篇

项目源码

IamXiaRui-Github-FirstRxJavaDemo

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注