大话RxJava:三、RxJava的中级使用方法

/ 0评 / 0

写在前面

前面两篇文章中介绍几乎全部都是基础,而且如果前面两篇吃透了的话,RxJava就算完全入门了。那入门之后就得学一些比较高级一点的用法及其原理了。

所以这篇文章来介绍一下RxJava中另一个核心内容—— 变换 。本来准备连它的原理一起说明,但是变换的原理稍微复杂了点,如果连在一起写可能篇幅过长,看起来也就比较枯燥。所以暂时不讲源码,先来看看它的基本使用。当然,依然是通过小案例的形式来说明,这样就不会枯燥。

说完几个基本变换操作符之后,再了解一下RxJava中的FuncX与ActionX的作用与区别,最后补充一下上一次说线程控制Scheduler的时候故意省略的一部分内容,至于省略的原因后面再说。

所以全文的目录如下:

神奇的变换

回顾

在说变换操作之前,先来回顾一下之前异步获取网络图片的小案例。

//创建被观察者
Observable.create(new Observable.OnSubscribe<Bitmap>() {
    /**
    * 复写call方法
    *
    * @param subscriber 观察者对象
    */
    @Override
    public void call(Subscriber<? super Bitmap> subscriber) {
        //通过URL得到图片的Bitmap对象
        Bitmap bitmap = GetBitmapForURL.getBitmap(url);
        //回调观察者方法
        subscriber.onNext(bitmap);
        subscriber.onCompleted();
    }
})
.subscribeOn(Schedulers.io()) // 指定subscribe()发生在IO线程
.observeOn(AndroidSchedulers.mainThread()) // 指定Subscriber的回调发生在UI线程
.subscribe(new Observer<Bitmap>() {   //订阅观察者(其实是观察者订阅被观察者)

    @Override
    public void onNext(Bitmap bitmap) {
        mainImageView.setImageBitmap(bitmap);
    }

    @Override
    public void onCompleted() {
        mainProgressBar.setVisibility(View.GONE);
        Log.i(" onCompleted ---> ", "完成");
    }

    @Override
    public void onError(Throwable e) {
        Log.e(" onError --->", e.toString());
    }
 });

案例本身是没有问题的,但是在这里需要注意一下,在Observable的create方法中就已经规定了发送的对象的类型是 Bitmap ,而这个Bitmap是通过图片的Url来获取得到的,得到后再发送给Subscriber(就是观察者Observer,再强调一下后面的文章一律用Subscriber代替Observer,原因上一篇文章已经强调过了,不再赘述)。

简单来说,就是一开始我们就规定好了要发送一个对象的类型。

那是否可以这样设想,我们一开始发送 String 类型的Url,然后通过某种方式再将得到的 Bitmap 发送出去呢?

用图来说就是这样:

什么操作

Map

答案当然是有的,RxJava中提供了一种操作符: Map ,它的官方定义是这样的:

Map操作符对原始Observable发射的每一项数据应用一个你选择的函数,然后返回一个发射这些结果的Observable。

这句话简单来说就是,它对Observable发送的每一项数据都应用一个函数,并在函数中执行变换操作。

如果还是不明白的话,那就画图去理解,这个图也是官方的图,只不过我重新画了一下:

map

从图中可以看到,这是一对一的转换,就是一个单独的数据转成另一个单独的数据,这一点需要跟后面的 flatmap 对比,所以需要留意一下这句话。

好了,了解了基本原理,现在就来给之前的代码进行一个改造:

//先传递String类型的Url
Observable.just(url)
    .map(new Func1<String, Bitmap>() {
        @Override
        public Bitmap call(String s) {
            //通过Map转换成Bitmap类型发送出去
            return GetBitmapForURL.getBitmap(s);
        }
    })
    .subscribeOn(Schedulers.io()) // 指定subscribe()发生在IO线程
    .observeOn(AndroidSchedulers.mainThread()) // 指定Subscriber的回调发生在UI线程
    //可以看到,这里接受的类型是Bitmap,而不是String
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) {
            mainImageView.setImageBitmap(bitmap);
            mainProgressBar.setVisibility(View.GONE);
        }
});

这里我们先用just操作符传递一个String类型的Url进去,然后在map操作符中,利用 Func1 类的call方法返回一个 Bitmap 出去,最后在 subscribe操作中的 Action1 类中接收一个 Bitmap 对象。

这样就成功的将初始Observable所发送参数的类型通过 map 转换成了其他的类型。这就是 map 操作符的妙用。

FuncX与ActionX

在上面的代码中,出现了这两个类: Func1Action1 ,这是什么意思呢?

ActionX

先来解释Action1。点开它的源码:

/**
 * A one-argument action.
 * @param <T> the first argument type
 */
public interface Action1<T> extends Action {
    void call(T t);
}

原来是有一个参数的接口,接口中有一个 单参数无返回值的call方法。由于 onNext(T obj)onError(Throwable error) 也是单参数无返回值的,因此 Action1 可以将 onNext(obj) 和 onError(error) 打包起来传入 subscribe() 以实现 不完整定义的回调

也就是说,这种回调只调用 onNextonError 两个方法,并不是完整的回调(完整的是回调三个方法)。

而对于这种不完整的回调,RxJava 会自动根据定义创建出 Subscriber

另外,与Action1类似的是 Action0 ,这个也比较常用,依然点进去看一下源码:

/**
 * A zero-argument action.
 */
public interface Action0 extends Action {
    void call();
}

跟上面的一对比,一下就恍然大悟了,其实就是 无参数无返回值的call方法 ,由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来,作为一个参数传入 subscribe() 以实现 不完整定义的回调

除此之外:

RxJava 是提供了多个 ActionX 形式的接口 (例如 Action2, Action3) 的,它们可以被用以包装不同的无返回值的方法。

好吧,说得通俗一点:

FuncX

了解了ActionX之后,来看这个Func1。点进去看源码:

/**
 * Represents a function with one argument.
 * @param <T> the first argument type
 * @param <R> the result type
 */
public interface Func1<T, R> extends Function {
    R call(T t);
}

一对比,很容易发现,它跟Action1很相似,也是RxJava的一个接口。但是有一个明显的区别在于,Func1包装的是 有返回值 的方法。

而且与ActionX一样,FuncX也有很多个,主要用于不同个数的参数的方法。我们只要记着一点:

FuncX 和 ActionX 的区别在 FuncX 包装的是有返回值的方法。

FlatMap

好了,至此就把Map说完了。还记得之前强调的那句话么,Map是一对一的转换,那么有没有一对多的转换呢?当然有,就是现在要说的FlatMap

依然先看官方定义:

FlatMap操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后FlatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。

好吧,定义总是很迷糊。没关系,现在尝试用图的形式来说明:

flatmap_1

简单来说就是分别将一组数据中的每个数据进行转换,转换后再把转换后的数据合并到一条序列上进行发送。

不过需要注意的是,转换后的每个数据本身其实也是一个可以发送数据的 Observable ,所以将上面图简化一下就是如下图所示:

flatmap_2

从简图可以看出,与Map相比较,FlatMap是能进行一对多的转换。

好了,闲话不多说,我们来看具体的案例。案例就是一个GridView异步加载多张网络图片。

对于这个项目我事先说明两点:

好了,为了回顾前文from操作符的使用,我们将四个图片的Url加到一个数据中去,也就是一组Url数据:

private final String url1 = "http://www.iamxiarui.com/wp-content/uploads/2016/06/套路.png";
private final String url2 = "http://www.iamxiarui.com/wp-content/uploads/2016/06/为什么我的流量又没了.png";
private final String url3 = "http://www.iamxiarui.com/wp-content/uploads/2016/05/cropped-iamxiarui.com_2016-05-05_14-42-31.jpg";
private final String url4 = "http://www.iamxiarui.com/wp-content/uploads/2016/05/微信.png";

//一组Url数据
private final String[] urls = new String[]{url1, url2, url3, url4};

然后来看flatmap如何处理:

//先传递String类型的Url
Observable.from(urls)
    .flatMap(new Func1<String, Observable<String>>() {
        @Override
        public Observable<String> call(String s) {
            return Observable.just(s);
        }
    })

可以看到,它是将 一组String类型的Urls 转换成一个 发送单独的String类型Url的Observable

既然转换成了能够发送单独数据的Observable,那么就简单多了,就用刚刚学的map操作符吧:

    .map(new Func1<String, Bitmap>() {
        @Override
        public Bitmap call(String s) {
            //通过Map转换成Bitmap类型发送出去
            return GetBitmapForURL.getBitmap(s);
        }
    })
    .subscribeOn(Schedulers.io()) // 指定subscribe()发生在IO线程
    .observeOn(AndroidSchedulers.mainThread()) // 指定Subscriber的回调发生在UI线程
    //可以看到,这里接受的类型是Bitmap,而不是String
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) {
            mainImageView.setImageBitmap(bitmap);
            mainProgressBar.setVisibility(View.GONE);
        }
});

现在我们来看看运行后的动态图:

加载多张图片

可以看到,它是依次加载各张图片。

还记得我之前说这个并不是最好的案例么,为什么呢?因为Flatmap有一个特性:

FlatMap对这些Observables发射的数据做的合并操作可能是交错的。

什么意思呢?也就是这一组数据转换成单独数据后可能顺序会发生改变,从我这个案例来看,并没有出现这种情况,所以我说这并不是一个最完美的案例。

那么有人就问了,如何让它不产生交错呢?

RxJava还给我们提供了一个 concatMap 操作符,它类似于最简单版本的flatMap,但是它 按次序连接 而不是合并那些生成的Observables,然后产生自己的数据序列。

这个比较简单,我就不写案例演示了。

好了至此我们就将 常用且非常重要的 变换操作符讲完了。后面的文章会具体分析它的原理。

再话Scheduler

最后呢,想对Scheduler做一些补充。

还记得之前说Scheduler的时候介绍的两个操作符么:

现在我们多介绍两个操作符。

doOnSubscribe

之前在说Subscriber与Observer的不同的时候,提到过Subscriber多了两个方法。其中 onStart() 方法发生在 subscribe() 方法调用后且事件发送之前 是一个进行初始化操作的方法。但是这个初始化操作并不能指定线程。

就那我这个案例来说,里面有一个进度条,如果要显示进度条的话必须在主线程中执行。但是我们事先并不知道subscribeOn()方法会指定什么样的线程。所以在onStart方法中执行一些初始化操作是比较有风险的。

那该怎么办呢?

RxJava中给我们提供了另外一种操作符: doOnSubscribe ,这个操作符跟onStart方法一样,都是在 subscribe() 方法调用后且事件发送之前 执行,所以我们一样可以在这里面进行初始化的操作。而区别在于它可以指定线程。

默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。

关于这句话我有两点疑问:

先撇开疑问,来看一下用法:

Observable.just(url)    //IO线程
    .map(new Func1<String, Bitmap>() {
        @Override
        public Bitmap call(String s) {
            Log.i(" map ---> ", "执行");
            Log.i(" map ---> ", Thread.currentThread().getName());
            return GetBitmapForURL.getBitmap(s);
        }
    })
    .subscribeOn(Schedulers.io()) // 指定subscribe()发生在IO线程
    .doOnSubscribe(new Action0() { //需要在主线程中执行
        @Override
        public void call() {
            mainProgressBar.setVisibility(View.VISIBLE);
            Log.i(" doOnSubscribe ---> ", "执行");
            Log.i(" doOnSubscribe ---> ", Thread.currentThread().getName());
        }
    })
    .subscribeOn(AndroidSchedulers.mainThread()) // 指定subscribe()发生在主线程
    .observeOn(AndroidSchedulers.mainThread()) // 指定Subscriber的回调发生在主线程
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) {
            mainImageView.setImageBitmap(bitmap);
            mainProgressBar.setVisibility(View.GONE);
            Log.i(" subscribe ---> ", "执行");
            Log.i(" subscribe ---> ", Thread.currentThread().getName());
        }
});

下面这是执行的Log日志:

log1

可以看到,从 onClick() 触发后,先执行了 doOnSubscribe() 然后执行 map() ,最后执行绑定操作 subscribe() 。也就是说,它确实是在数据发送之前调用的,完全可以做初始化操作。

好了,现在我们来解决疑问,先解决第二点:什么是最近的? 将代码改成这样:

... 
.subscribeOn(Schedulers.newThread()) // 指定subscribe()发生在新线程
.doOnSubscribe(new Action0() { //需要在主线程中执行
    @Override
    public void call() {
        Log.i(" doOnSubscribe ---> ", "执行");
        Log.i(" doOnSubscribe ---> ", Thread.currentThread().getName());
    }
})
.subscribeOn(Schedulers.io()) // 指定subscribe()发生在IO线程
.subscribeOn(AndroidSchedulers.mainThread()) // 指定subscribe()发生在主线程
...

我故意将 doOnSubscribe 写在两个 subscribeOn 之间,且后面有两个subscribeOn ,现在来看日志:

log3

从日志明显可以看出,doOnSubscribe() 执行在IO线程,所以结论是:

  • 如果在doOnSubscribe()之后指定了subscribeOn(),它决定了doOnSubscribe()在哪种线程中执行。
    • (1)doOnSubscribe()之前的subscribeOn()不会影响它。
    • (2)doOnSubscribe()之后的subscribeOn(),且是最近的才会影响它。

再来看第二个疑问:默认线程在哪里? 将代码改成这样:

...
.subscribeOn(Schedulers.io()) // 指定subscribe()发生在IO线程
.doOnSubscribe(new Action0() { //需要在主线程中执行
    @Override
    public void call() {
        Log.i(" doOnSubscribe ---> ", "执行");
        Log.i(" doOnSubscribe ---> ", Thread.currentThread().getName());
    }
})
.observeOn(Schedulers.io()) // 指定Subscriber的回调发生在io线程
...

来看Log:

log4

大家看到这个日志肯定会有疑问,我当时也非常有疑问,为什么subscribeOn() 与 observeOn() 都指定了IO线程,且 doOnSubscribe() 之后并没有 subscribeOn() ,这个时候它应该默认执行在 subscribe() 所在线程。

而 subscribe() 所在线程已经被 observeOn() 指定在了IO线程,所以此时它应该执行在IO线程才对啊,为什么还是 main 线程呢?

我找了翻看了WiKi,找了很多资料,甚至看了源码都没有找到是什么原因。

如果有人知道,请告诉我,谢谢!

doOnNext

由于from与flatmap操作符能发送多个数据,假设有这样的需求,需要在每个数据发送的时候提示一下,告诉我们又发了一个数据,那该如何做呢?

RxJava中给我们提供了一个操作符: doOnNext() ,这个操作符允许我们在每次输出一个元素之前做一些其他的事情,比如提示啊保存啊之类的操作。

具体用法很简单,如下图所示,这个代码也就是上面flatmap案例的完整代码:

Observable.from(urls)
    .flatMap(new Func1<String, Observable<String>>() {
        @Override
        public Observable<String> call(String s) {
            return Observable.just(s);
        }
    })
    .map(new Func1<String, Bitmap>() {
        @Override
        public Bitmap call(String s) {
            return GetBitmapForURL.getBitmap(s);
        }
    })
    .subscribeOn(Schedulers.io()) // 指定subscribe()发生在IO线程
    .observeOn(AndroidSchedulers.mainThread()) // 指定后面所发生的回调发生在主线程
    .doOnNext(new Action1<Bitmap>() {    //每运行一次所要执行的操作
        @Override
        public void call(Bitmap bitmap) {
            Toast.makeText(OtherActivity.this, "图片增加", Toast.LENGTH_SHORT).show();
        }
    })
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) {
            //将获取到的Bitmap对象添加到集合中
            list.add(bitmap);
            //设置图片
            gvOther.setAdapter(new GridViewAdapter(OtherActivity.this, list));
            pbOther.setVisibility(View.GONE);
        }
    });
}

来看运行的动态图:

加载多张图片2

可以看到,在每张图片的加载过程中都有弹窗提示图片增加,这就是doOnNext操作符的作用。

结语

好了,今天的全部内容都讲解完毕了。大部分都是用法,而这些用法与基础用法相比较起来都或多或少复杂了一点,所以我就将它称为中级运用。

跟前面的基础一样,用法讲完了就需要了解其原理了。所以后面的文章将会讲解一下 变换 的原理,仍然是通过图文的形式轻轻松松地去学。

而每次写文章过程中,都能发现自己学习过程中的理解不当或错误的地方,现在分享出来。但是肯定还会有不对的地方,所以希望大家如果有不同意见给予指正或与我交流,谢谢!

大话RxJava:一、初识RxJava与基本运用

大话RxJava:二、轻松学源码之基础篇

参考资料

给 Android 开发者的 RxJava 详解

RxJava文档和教程

深入浅出RxJava(二:操作符)

项目源码

IamXiaRui-Github-FirstRxJavaDemo

发表评论

电子邮件地址不会被公开。 必填项已用*标注