新技术论坛
搜索
查看: 990|回复: 0
打印 上一主题 下一主题

[Java] RxJava中操作符到底做了什么?

[复制链接]
  • TA的每日心情
    开心
    2016-12-9 18:18
  • 签到天数: 85 天

    连续签到: 1 天

    [LV.6]常住居民II

    扫一扫,手机访问本帖
    楼主
    跳转到指定楼层
    发表于 2016-11-18 07:39:43 | 只看该作者 |只看大图 回帖奖励 |正序浏览 |阅读模式


    RxJava今年彻底火了一把,其中最牛逼之处就是操作符了,以前只知道怎么用,这几天看了看源码,大致的弄清楚了操作符的工作过程,今天分享给大家。如果有什么不对地方,请大家多多指教。

    今天我们已filter为例,看代码:

    • Integer[] datas={1,2,3,4,5,6,7,8,9,10};
    • Observable.from(datas)
    •         .filter(new Func1<Integer, Boolean>() {
    •             @Override
    •             public Boolean call(Integer integer) {
    •                 return integer>=5;
    •             }
    •         })
    •         .subscribe(new Action1<Integer>() {
    •             @Override
    •             public void call(Integer integer) {
    •                 mText.append(integer.toString()+",");
    •             }
    •         });

    一个很简单的小例子,用过滤操作符 filter 找出大于等于5的数字。我们点进去看看源码中filter做了什么

    • public final Observable<T> filter(Func1<? super T, Boolean> predicate) {
    • return create(new OnSubscribeFilter<T>(this, predicate));
    • }

    调用了create()方法,等等我们什么时候是不是也用过create() 方法,我们在创建Observable时候也用过create()方法,原来创建了一个新的Observable返回出去了,那岂不是说我们的订阅者其实订阅的是这个新的Observable,我们继续往下看create方法,create方法需要的参数是一个OnSubscribe对象,那我们可以确定OnSubscribeFilter是OnSubscribe的一个实现类,我们点进去看看。

    • public final class OnSubscribeFilter<T> implements OnSubscribe<T> {
    •    
    •        final Observable<T> source;
    •    
    •        final Func1<? super T, Boolean> predicate;
    •    
    •        public OnSubscribeFilter(Observable<T> source, Func1<? super T, Boolean> predicate) {
    •            this.source = source;
    •            this.predicate = predicate;
    •        }

    果然不出我们所料,OnSubscribeFilter是OnSubscribe的实现类,我们看他的构造方法,传递了两个参数,第一个参数Observable对象,一个Func1,其中第一个参数就是我们我们自己创建的那个Observable,第二个参数使我们在外面写的Func1,然后保存了起来。我们都知道在subscribe()订阅的时候,OnSubscribe的call()方法。我们看看OnSubscribeFilter的call()方法都干了些什么

    • @Override
    •         public void call(final Subscriber<? super T> child) {
    •             FilterSubscriber<T> parent = new FilterSubscriber<T>(child, predicate);
    •             child.add(parent);
    •             source.unsafeSubscribe(parent);
    •         }

    出现了一个FilterSubscriber,什么鬼玩意儿,我们看看他是什么鬼

    • }
    •       @Override
    •       public void onError(Throwable e) {
    •           if (done) {
    •               RxJavaHooks.onError(e);
    •               return;
    •           }
    •           done = true;
    •           actual.onError(e);
    •       }
    •       @Override
    •       public void onCompleted() {
    •           if (done) {
    •               return;
    •           }
    •           actual.onCompleted();
    •       }
    •       @Override
    •       public void setProducer(Producer p) {
    •           super.setProducer(p);
    •           actual.setProducer(p);
    •       }
    •   }

    一个Subscriber的子类,我们看他的构造方法,两个参数,一个Subscriber一个Func1,我们在创建对象时候Subscriber对象是我们真正的从外界传过来的观察者,Func1呢使我们创建OnSubscribeFilter时候传递进来的对象,也就是我们在外界定义的Func1。

    回过头来我们继续看OnSubscribeFilter的call方法。我们看到source.unsafeSubscribe(parent),source是我们原来外界的Observable,他订阅了FilterSubscriber对象。我们在他的onNext方法中看到他根据func1.call(t)的返回值来判断是否让我们外界的真正的观察者调用onNext方法。

    看到这里有没有恍然大悟,啥?我都不知道你在说啥,额,那我们整体的屡屡。

    我们外界的代码,在subscribe()时候,Subscriber并不是订阅了我们自己写的Observable,Subscriber订阅的是filter方法返回的那个新的Observable对象,所以订阅时候会调用OnSubscribeFilter的call方法,OnSubscribeFilter才是我们订阅的被观察者的onSubscribe对象,在OnSubscribeFilter的call()方法中,我们让我们包装的FilterSubscriber订阅我们原来的被观察者,也就是我们在外界生成的那个Observable。我们在外界的Observable的onSubscribe对象的call方法中得到的观察者是FilterSubscriber对象,我们调用的onNext会回调到FilterSubscriber的onNext方法中。在FilterSubscriber的onNext方法中我们根据我们传递的Func1来判断是否要回调真正的Subscriber的onNext方法,在为true的时候我们才回调我们外界的观察者的onNext方法,也就起到了过滤的作用。这就是Filter的整个的流程。

    我们来测试下我们的小结论:

    • Observable.create(new Observable.OnSubscribe<Integer>() {
    •                @Override
    •                public void call(Subscriber<? super Integer> subscriber) {
    •                    Log.e("call:subscriber", "" + subscriber.getClass().getCanonicalName());
    •                    subscriber.onNext(5);
    •                }
    •            }).filter(new Func1<Integer, Boolean>() {
    •                @Override
    •                public Boolean call(Integer integer) {
    •                    return integer > 0;
    •                }
    •            }).subscribe(new Action1<Integer>() {
    •                @Override
    •                public void call(Integer integer) {
    •                     
    •                }
    •            });

    高级模式
    B Color Image Link Quote Code Smilies

    本版积分规则

    手机版|Archiver|开发者俱乐部 ( ICP/ISP证:辽B-2-4-20110106号 IDC证:辽B-1-2-20070003号 )

    GMT+8, 2024-12-23 11:14 , Processed in 0.137257 second(s), 22 queries .

    X+ Open Developer Network (xodn.com)

    © 2009-2017 沈阳讯网网络科技有限公司

    快速回复 返回顶部 返回列表