Android教程網
  1. 首頁
  2. Android 技術
  3. Android 手機
  4. Android 系統教程
  5. Android 游戲
 Android教程網 >> Android技術 >> Android開發 >> 關於android開發 >> RxJava 和 RxAndroid 五(線程調度),rxjavarxandroid

RxJava 和 RxAndroid 五(線程調度),rxjavarxandroid

編輯:關於android開發

RxJava 和 RxAndroid 五(線程調度),rxjavarxandroid


對rxJava不了解的同學可以先看

RxJava 和 RxAndroid 一 (基礎)
RxJava 和 RxAndroid 二(操作符的使用)
RxJava 和 RxAndroid 三(生命周期控制和內存優化)

RxJava 和 RxAndroid 四(RxBinding的使用)

 

本文將有幾個例子說明,rxjava線程調度的正確使用姿勢。

例1

 Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })
                .map(new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;

  結果

/rx_call: main           -- 主線程
/rx_map: main        --  主線程
/rx_subscribe: main   -- 主線程

例2

   new Thread(new Runnable() {
            @Override
            public void run() {
                Logger.v( "rx_newThread" , Thread.currentThread().getName()  );
                rx();
            }
        }).start();

 void rx(){
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })
                .map(new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;

    }

 

      結果

/rx_newThread: Thread-564   -- 子線程
/rx_call: Thread-564              -- 子線程
/rx_map: Thread-564            -- 子線程 
/rx_subscribe: Thread-564    -- 子線程

 

  • 通過例1和例2,說明,Rxjava默認運行在當前線程中。如果當前線程是子線程,則rxjava運行在子線程;同樣,當前線程是主線程,則rxjava運行在主線程

 

例3

 Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })

                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())

                .map(new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;

  結果

/rx_call: RxCachedThreadScheduler-1    --io線程
/rx_map: main                                     --主線程
/rx_subscribe: main                              --主線程

 

例4

 Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })
                .map(new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })

                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())

                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ; 

      結果

/rx_call: RxCachedThreadScheduler-1     --io線程
/rx_map: RxCachedThreadScheduler-1   --io線程
/rx_subscribe: main                              --主線程

   

  • 通過例3、例4 可以看出  .subscribeOn(Schedulers.io())  和 .observeOn(AndroidSchedulers.mainThread()) 寫的位置不一樣,造成的結果也不一樣。從例4中可以看出 map() 操作符默認運行在事件產生的線程之中。事件消費只是在 subscribe() 裡面。
  • 對於 create() , just() , from()   等                 --- 事件產生   

               map() , flapMap() , scan() , filter()  等    --  事件加工

              subscribe()                                          --  事件消費

  •   事件產生:默認運行在當前線程,可以由 subscribeOn()  自定義線程

         事件加工:默認跟事件產生的線程保持一致, 可以由 observeOn() 自定義線程

       事件消費:默認運行在當前線程,可以有observeOn() 自定義

 

例5  多次切換線程

 

Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })

                .observeOn( Schedulers.newThread() )    //新線程

                .map(new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })

                .observeOn( Schedulers.io() )      //io線程

                .filter(new Func1<String, Boolean>() {
                    @Override
                    public Boolean call(String s) {
                        Logger.v( "rx_filter" , Thread.currentThread().getName()  );
                        return s != null ;
                    }
                })

                .subscribeOn(Schedulers.io())     //定義事件產生線程:io線程
                .observeOn(AndroidSchedulers.mainThread())     //事件消費線程:主線程

                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;

  結果

/rx_call: RxCachedThreadScheduler-1           -- io 線程
/rx_map: RxNewThreadScheduler-1             -- new出來的線程
/rx_filter: RxCachedThreadScheduler-2        -- io線程
/rx_subscribe: main                                   -- 主線程

 

  1. 上一頁:
  2. 下一頁:
熱門文章
閱讀排行版
Copyright © Android教程網 All Rights Reserved