Android教程網
  1. 首頁
  2. Android 技術
  3. Android 手機
  4. Android 系統教程
  5. Android 游戲
 Android教程網 >> Android技術 >> 關於Android編程 >> RxJava RxAndroid 理論結合實踐

RxJava RxAndroid 理論結合實踐

編輯:關於Android編程

1 操作符的使用

1、merge操作符,合並觀察對象

    Observable observable1 = Observable.just(6,7,8) ;
    Observable observable2 = Observable.just("a","b","c") ;
    //合並數據  先發送observable2的全部數據,然後發送 observable1的全部數據
    Observable observable = Observable.merge( observable2 , observable1 ) ;
    observable.subscribe(new Action1() {
        @Override
        public void call(Object o) {
            Log.e("call==",""+o);
        }
    });

2、zip 操作符,合並多個觀察對象的數據。並且允許 Func2()函數重新發送合並後的數據

Observable observable1 = Observable.just("6","7","8") ;
        Observable observable2 = Observable.just("a","b","c") ;

        Observable observable3 =  Observable.zip(observable1, observable2, new Func2() {
            @Override
            public String call(String s1 , String s2 ) {
                return s1 + s2  ;
            }
        }) ;

        observable3.subscribe(new Action1() {
            @Override
            public void call(Object o) {
                Log.e( "zip-- ",""+ o );
            }
        }) ;

3 scan累加器操作符的使用

Observable observable = Observable.just( 1 , 2 , 3 , 4 , 5  ) ;
        observable.scan(new Func2() {
            @Override
            public Integer call(Integer o, Integer o2) {
                return o + o2 ;
            }
        }).subscribe(new Action1() {
                    @Override
                    public void call(Object o) {
                        Log.e( "scan-- ",""+o );
                    }
       });

4 filter 過濾操作符的使用

Observable observable = Observable.just( 1 , 2 , 3 , 4 , 5 , 6 , 7 ) ;
        observable.filter(new Func1() {
            @Override
            public Boolean call(Integer o) {
                //數據大於4的時候才會被發送
                return o > 4 ;
            }
        }).subscribe(new Action1() {
            @Override
            public void call(Object o) {
               Log.e( "filter-- ",""+ o );
            }
        });

5 消息數量過濾操作符的使用
* take :取前n個數據
* takeLast:取後n個數據
* first 只發送第一個數據
* last 只發送最後一個數據
* skip() 跳過前n個數據發送後面的數據
* skipLast() 跳過最後n個數據,發送前面的數據

 //take 發送前3個數據
        Observable observable = Observable.just( 1 , 2 , 3 , 4 , 5 , 6 , 7 ) ;
        observable.take( 3 )
                .subscribe(new Action1() {
                    @Override
                    public void call(Object o) {
                        Log.e( "take-- ","" +o );
                    }
                })   ;

        //takeLast 發送最後三個數據
        Observable observable2 = Observable.just( 1 , 2 , 3 , 4 , 5 , 6 , 7 ) ;
        observable2.takeLast( 3 )
                .subscribe(new Action1() {
                    @Override
                    public void call(Object o) {
                        Log.e( "takeLast-- ","" +o );
                    }
                })   ;

        //first 只發送第一個數據
        Observable observable3 = Observable.just( 1 , 2 , 3 , 4 , 5 , 6 , 7 ) ;
        observable3.first()
                .subscribe(new Action1() {
                    @Override
                    public void call(Object o) {
                        Log.e( "first-- ","" +o );
                    }
                })   ;

        //last 只發送最後一個數據
        Observable observable4 = Observable.just( 1 , 2 , 3 , 4 , 5 , 6 , 7 ) ;
        observable4.last()
                .subscribe(new Action1() {
                    @Override
                    public void call(Object o) {
                        Log.e( "last-- ","" +o );
                    }
                })   ;

        //skip() 跳過前2個數據發送後面的數據
        Observable observable5 = Observable.just( 1 , 2 , 3 , 4 , 5 , 6 , 7 ) ;
        observable5.skip( 2 )
                .subscribe(new Action1() {
                    @Override
                    public void call(Object o) {
                        Log.e( "skip-- ","" +o );
                    }
                })   ;

        //skipLast() 跳過最後兩個數據,發送前面的數據
        Observable observable6 = Observable.just( 1 , 2 , 3 , 4 , 5 , 6 , 7 ) ;
        observable6.skipLast( 2 )
                .subscribe(new Action1() {
                    @Override
                    public void call(Object o) {
                        Log.e( "skipLast-- ","" +o );
                    }
                })   ;

6 elementAt 、elementAtOrDefault

//elementAt() 發送數據序列中第n個數據 ,序列號從0開始
        //如果該序號大於數據序列中的最大序列號,則會拋出異常,程序崩潰
        //所以在用elementAt操作符的時候,要注意判斷發送的數據序列號是否越界
        int s = 6;
        List list = new ArrayList<>();
        for (int i = 0; i < 7; i++) {
            list.add(i);
        }
        if(s >= list.size()){
            Log.e("elementAt","角標越界了");
        }else {
            Observable observable7 = Observable.from(list);
            observable7.elementAt(s)
                    .subscribe(new Action1() {
                        @Override
                        public void call(Object o) {
                            Log.e("elementAt-- ", "" + o);
                        }
                    });
        }

        //elementAtOrDefault( int n , Object default ) 發送數據序列中第n個數據 ,序列號從0開始。
        //如果序列中沒有該序列號,則發送默認值
        Observable observable9 = Observable.just( 1 , 2 , 3 , 4 , 5 ) ;
        observable9.elementAtOrDefault(  8 , 666  )
                .subscribe(new Action1() {
                    @Override
                    public void call(Object o) {
                        Log.e( "elementAtOrDefault-- ",""+  o );
                    }
                })   ;

7 startWith() 插入數據

        //插入普通數據
        //startWith 數據序列的開頭插入一條指定的項 , 最多插入9條數據
        Observable observable = Observable.just( "aa" , "bb" , "cc" ) ;
        observable
                .startWith( "dd" , "ee" )
                .subscribe(new Action1() {
                    @Override
                    public void call(Object o) {
                        Log.e( "startWith-- ",""+ o );
                    }
        }) ;

        //插入Observable對象
        List list = new ArrayList<>() ;
        list.add( "ff" ) ;
        list.add( "gg" ) ;
        observable.startWith( Observable.from( list ))
                .subscribe(new Action1() {
                    @Override
                    public void call(Object o) {
                        Log.e( "startWith2 -- ",""+ o );
                    }
        }) ;

8 delay操作符,延遲數據發送

Observable observable = Observable.just( "1" , "2" , "3" , "4" , "5" , "6" , "7" , "8" ) ;

        //延遲數據發射的時間,僅僅延時一次,也就是發射第一個數據前延時。發射後面的數據不延時
        observable.delay( 3 , TimeUnit.SECONDS )  //延遲3秒鐘
                .subscribe(new Action1() {
                    @Override
                    public void call(Object o) {
                       Log.e("delay-- ",""+ o);
                    }
        }) ;

9 Timer 延時操作符的使用

 //5秒後輸出 hello world , 然後顯示一張圖片
        Observable.timer( 5 , TimeUnit.SECONDS )
                .observeOn(AndroidSchedulers.mainThread() )
                .subscribe(new Action1() {
                    @Override
                    public void call(Long aLong) {
                       Log.e( "timer--hello world ",""+aLong );
                }
       }) ;

delay 、timer 總結: 
相同點:delay 、 timer 都是延時操作符。
不同點:delay 延時一次,延時完成後,可以連續發射多個數據。timer延時一次,延時完成後,只發射一次數據。
10 interval 輪詢操作符,循環發送數據,數據從0開始遞增
//參數一:延遲時間 參數二:間隔時間 參數三:時間顆粒度

    Observable observable =  Observable.interval(3000, 1000, TimeUnit.MILLISECONDS) ;
        Subscription subscription = observable.subscribe(new Action1() {
            @Override
            public void call(Object o) {
                Log.e( "interval-  ","" + o );
            }
        })  ;

11 doOnNext() 操作符,在每次 OnNext() 方法被調用前執行
使用場景:從網絡請求數據,在數據被展示前,緩存到本地

Observable observable = Observable.just( "1" , "2" , "3" , "4" ) ;
                observable.doOnNext(new Action1() {
                    @Override
                    public void call(Object o) {
                        Log.e( "doOnNext--緩存數據",""+ o  );
                    }
                }).subscribe(new Observer() {
                            @Override
                            public void onCompleted() {

                            }

                            @Override
                            public void onError(Throwable e) {

                            }

                            @Override
                            public void onNext(Object o) {
                                Log.e( "onNext--",""+ o  );
                            }
          }) ;

12 Buffer 操作符
Buffer( int n ) 把n個數據打成一個list包,然後再次發送。
Buffer( int n , int skip) 把n個數據打成一個list包,然後跳過第skip個數據。

  List list = new ArrayList<>();
        for (int i = 1; i < 10; i++) {
            list.add("" + i);
        }

        Observable observable = Observable.from(list);
        observable
                .buffer(2)   //把每兩個數據為一組打成一個包,然後發送
                .subscribe(new Action1>() {
                    @Override
                    public void call(List strings) {
                       Log.e("call","buffer---------------" );
                        Observable.from( strings ).subscribe(new Action1() {
                            @Override
                            public void call(String s) {
                                Log.e( "buffer data --", s);
                            }
                        }) ;
                    }
        });

13、throttleFirst 操作符
使用場景:1、button按鈕防抖操作,防連續點擊 2、百度關鍵詞聯想,在一段時間內只聯想一次,防止頻繁請求服務器

 Observable.interval( 1 , TimeUnit.SECONDS)
                .throttleFirst( 3 , TimeUnit.SECONDS )
                .subscribe(new Action1() {
                    @Override
                    public void call(Long aLong) {
                        Log.e( "throttleFirst--",""+aLong );
                    }
       }) ;

14 distinct 過濾重復的數據

 List list = new ArrayList<>() ;
        list.add( "1" ) ;
        list.add( "2" ) ;
        list.add( "1" ) ;
        list.add( "3" ) ;
        list.add( "4" ) ;
        list.add( "2" ) ;
        list.add( "1" ) ;
        list.add( "1" ) ;

        Observable.from( list )
                .distinct()
                .subscribe(new Action1() {
                    @Override
                    public void call(String s) {
                        Log.e( "distinct--",s );
        }
       }) ;

15 debounce() 操作符
一段時間內沒有變化,就會發送一個數據。
16 doOnSubscribe()
使用場景: 可以在事件發出之前做一些初始化的工作,比如彈出進度條等等
注意:
1、doOnSubscribe() 默認運行在事件產生的線程裡面,然而事件產生的線程一般都會運行在 io 線程裡。那麼這個時候做一些,更新UI的操作,是線程不安全的。所以如果事件產生的線程是io線程,但是我們又要在doOnSubscribe() 更新UI , 這時候就需要線程切換。
2、如果在 doOnSubscribe() 之後有 subscribeOn() 的話,它將執行在離它最近的 subscribeOn() 所指定的線程。
3、 subscribeOn() 事件產生的線程 ; observeOn() : 事件消費的線程

Observable.create(onSubscribe)
    .subscribeOn(Schedulers.io())
    .doOnSubscribe(new Action0() {
        @Override
        public void call() {
            progressBar.setVisibility(View.VISIBLE); // 需要在主線程執行
        }
    })
    .subscribeOn(AndroidSchedulers.mainThread()) // 指定主線程
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);

17、range 操作符的使用
Range操作符發射一個范圍內的有序整數序列,你可以指定范圍的起始和長度。

RxJava將這個操作符實現為range函數,它接受兩個參數,一個是范圍的起始值,一個是范圍的數據的數目。如果你將第二個參數設為0,將導致Observable不發射任何數據(如果設置為負數,會拋異常)。

range默認不在任何特定的調度器上執行。有一個變體可以通過可選參數指定Scheduler。

Observable.range( 10 , 3 )
              .subscribe(new Action1() {
                  @Override
                  public void call(Integer integer) {
                      Log.v( "rx_range  " , "" + integer ) ;
                  }
              }) ;

18、defer 操作符

i = "11 " ;

        Observable defer = Observable.defer(new Func0>() {
            @Override
            public Observable call() {
                return Observable.just( i ) ;
            }
        }) ;

        Observable test = Observable.just(  i ) ;

        i = "12" ;

        defer.subscribe(new Action1() {
            @Override
            public void call(String s) {
                Log.e( "rx_defer  " , "" + s ) ;
            }
        }) ;

        test.subscribe(new Action1() {
            @Override
            public void call(Object o) {
                Log.e( "rx_just " , "" + o ) ;
            }
        }) ;

可以看到,just操作符是在創建Observable就進行了賦值操作,而defer是在訂閱者訂閱時才創建Observable,此時才進行真正的賦值操作。

2 生命周期控制和內存優化

RxJava使我們很方便的使用鏈式編程,代碼看起來既簡潔又優雅。但是RxJava使用起來也是有副作用的,使用越來越多的訂閱,內存開銷也會變得很大,稍不留神就會出現內存溢出的情況
1、取消訂閱 subscription.unsubscribe() ;

 private void subscribe() {
        subscription =  Observable.just( "123").subscribe(new Action1() {
            @Override
            public void call(String s) {
                Log.e( "tt--", s );
            }
        }) ;
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();

        if(subscription!=null){
            subscription.unsubscribe();
        }
    }

2、線程調度
Scheduler調度器,相當於線程控制器
Schedulers.immediate() : 直接在當前線程運行,相當於不指定線程。這是默認的 Scheduler。
Schedulers.newThread() :總是啟用新線程,並在新線程執行操作.
Schedulers.io():I/O 操作(讀寫文件、讀寫數據庫、網絡信息交互等)所使用的 Scheduler。行為模式和 newThread() 差不多,區別在於 io() 的內部實現是是用一個無數量上限的線程池,可以重用空閒的線程,因此多數情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免創建不必要的線程。
Schedulers.computation() : 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。
還有RxAndroid裡面專門提供了AndroidSchedulers.mainThread(),它指定的操作將在 Android 主線程運行。

常見的場景:為了不阻塞UI,在子線程加載數據,在主線線程顯示數據

Observable.just( "1" , "2" , "3" )
                .delay(2,TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())  //指定 subscribe() 發生在 IO 線程
                .observeOn( AndroidSchedulers.mainThread() )  //指定 Subscriber 的回調發生在主線程
                .subscribe(new Action1() {
                    @Override
                    public void call(String s) {
                        ((TextView)findViewById(R.id.text)).setText( s );
                    }
                }) ;

Scheduler 自由多次切換線程。恩,這個更為牛逼
observeOn() 可以調用多次來切換線程,observeOn 決定他下面的方法執行時所在的線程。
subscribeOn() 用來確定數據發射所在的線程,位置放在哪裡都可以,但它是只能調用一次的。

Observable.just(1, 2, 3, 4) // IO 線程,由 subscribeOn() 指定
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .map(new Func1() {
                    @Override
                    public String call(Integer integer) {
                        return integer +"66";
                    }
                }) // 新線程,由 observeOn() 指定
                .observeOn(Schedulers.io())
                // IO 線程,由 observeOn() 指定
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(String s) {
                        Log.e("onNext==",s);
                    }
                });  // Android 主線程,由 observeOn() 指定

線程調度

例1

 Observable
                .create(new Observable.OnSubscribe() {
                    @Override
                    public void call(Subscriber subscriber) {
                        Log.e("rx_call", Thread.currentThread().getName());

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })
                .map(new Func1() {
                    @Override
                    public String call(String s) {
                        Log.e( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })
                .subscribe(new Action1() {
                    @Override
                    public void call(String s) {
                        Log.e( "rx_subscribe" , Thread.currentThread().getName()  );
                        Log.e( "rx_subscribe" , s  );
                    }
                }) ;

例2

 new Thread(new Runnable() {
            @Override
            public void run() {
                Log.e( "rx_newThread" , Thread.currentThread().getName()  );
                rx();
            }
        }).start();

 private void rx(){
        Observable
                .create(new Observable.OnSubscribe() {
                    @Override
                    public void call(Subscriber subscriber) {
                        Log.e( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })
                .map(new Func1() {
                    @Override
                    public String call(String s) {
                        Log.e( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })
                .subscribe(new Action1() {
                    @Override
                    public void call(String s) {
                        Log.e( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;

    }

例3

 Observable
                .create(new Observable.OnSubscribe() {
                    @Override
                    public void call(Subscriber subscriber) {
                        Log.e( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Func1() {
                    @Override
                    public String call(String s) {
                        Log.e( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })
                .subscribe(new Action1() {
                    @Override
                    public void call(String s) {
                        Log.e( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;

例4
map() 操作符默認運行在事件產生的線程之中。事件消費只是在 subscribe() 裡面

Observable
                .create(new Observable.OnSubscribe() {
                    @Override
                    public void call(Subscriber subscriber) {
                        Log.e( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })
                .map(new Func1() {
                    @Override
                    public String call(String s) {
                        Log.e( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())

                .subscribe(new Action1() {
                    @Override
                    public void call(String s) {
                        Log.e( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;

create() , just() , from() 等 — 事件產生
map() , flapMap() , scan() , filter() 等 – 事件加工
subscribe() – 事件消費
事件產生:默認運行在當前線程,可以由 subscribeOn() 自定義線程
事件加工:默認跟事件產生的線程保持一致, 可以由 observeOn() 自定義線程
事件消費:默認運行在當前線程,可以有observeOn() 自定義

例5 多次切換線程

Observable
                .create(new Observable.OnSubscribe() {
                    @Override
                    public void call(Subscriber subscriber) {
                        Log.e( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })

                .observeOn( Schedulers.newThread() )    //新線程

                .map(new Func1() {
                    @Override
                    public String call(String s) {
                        Log.e( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })

                .observeOn( Schedulers.io() )      //io線程

                .filter(new Func1() {
                    @Override
                    public Boolean call(String s) {
                        Log.e( "rx_filter" , Thread.currentThread().getName()  );
                        return s != null ;
                    }
                })

                .subscribeOn(Schedulers.io())     //定義事件產生線程:io線程
                .observeOn(AndroidSchedulers.mainThread())     //事件消費線程:主線程
                .subscribe(new Action1() {
                    @Override
                    public void call(String s) {
                        Log.e( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;

例6:只規定事件產生的線程

Observable
                .create(new Observable.OnSubscribe() {
                    @Override
                    public void call(Subscriber subscriber) {
                        Log.e( "rx--create " , Thread.currentThread().getName() ) ;
                        subscriber.onNext( "dd" ) ;
                    }
                })
                .subscribeOn(Schedulers.io())
                .subscribe(new Action1() {
                    @Override
                    public void call(String s) {
                        Log.e( "rx--subscribe " , Thread.currentThread().getName() ) ;
                    }
        }) ;

例:7:只規定事件消費線程

Observable
                .create(new Observable.OnSubscribe() {
                    @Override
                    public void call(Subscriber subscriber) {
                        Log.e( "rx--create " , Thread.currentThread().getName() ) ;
                        subscriber.onNext( "dd" ) ;
                    }
                })
                .observeOn( Schedulers.newThread() )
                .subscribe(new Action1() {
                    @Override
                    public void call(String s) {
                        Log.e( "rx--subscribe " , Thread.currentThread().getName() ) ;
                    }
        }) ;

例8:線程調度封裝

public class RxUtil {

    private final static Observable.Transformer schedulersTransformer = new  Observable.Transformer() {

        @Override 
        public Object call(Object observable) {

            return ((Observable)  observable).subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };

   public static   Observable.Transformer applySchedulers() {
        return (Observable.Transformer) schedulersTransformer;
    }

}

Observable.just( "123" )
        .compose( RxUtil.applySchedulers() )
        .subscribe(new Action1() {
            @Override
            public void call(Object o) {
                Log.e("call==",""+o);
            }
        }) ;
  1. 上一頁:
  2. 下一頁:
熱門文章
閱讀排行版
Copyright © Android教程網 All Rights Reserved