Android教程網
  1. 首頁
  2. Android 技術
  3. Android 手機
  4. Android 系統教程
  5. Android 游戲
 Android教程網 >> Android技術 >> Android開發 >> 關於android開發 >> Android開發學習之路-Android中使用RxJava,-androidrxjava

Android開發學習之路-Android中使用RxJava,-androidrxjava

編輯:關於android開發

Android開發學習之路-Android中使用RxJava,-androidrxjava


RxJava的核心內容很簡單,就是進行異步操作。類似於Handler和AsyncTask的功能,但是在代碼結構上不同。

RxJava使用了觀察者模式和建造者模式中的鏈式調用(類似於C#的LINQ)。

觀察者模式:Observable(被觀察者)被Observer(觀察者)訂閱(Subscribe)之後,Observable在發出消息的時候會通知對應的Observer,並且,一個Observable可以有被多個Observer訂閱。

鏈式調用:和Builder模式類似,調用對應的方法對原對象進行處理後返回原對象,從而做到鏈式調用。


 這裡介紹一些下面會用到的名詞:


先從簡單入手模擬一下這個過程:

先配置依賴(版本為當前最新):

dependencies {
    ...
    compile 'io.reactivex:rxjava:1.1.8'
    compile 'io.reactivex:rxandroid:1.2.1'
}

接著在工程的onCreate方法中直接加入如下代碼:

 1         Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
 2             @Override
 3             public void call(Subscriber<? super String> subscriber) {
 4                 subscriber.onNext("hello");
 5                 subscriber.onNext("android");
 6                 subscriber.onCompleted();
 7             }
 8         });
 9 
10         Subscriber<String> subscriber = new Subscriber<String>() {
11             @Override
12             public void onCompleted() {
13                 Log.d(TAG, "onCompleted() called");
14             }
15 
16             @Override
17             public void onError(Throwable e) {
18                 Log.d(TAG, "onError() called with: e = [" + e + "]");
19             }
20 
21             @Override
22             public void onNext(String s) {
23                 Log.d(TAG, "onNext() called with: s = [" + s + "]");
24             }
25         };
26 
27         observable.subscribe(subscriber);

1-8行創建了一個Observable對象,這個對象可以通過create方法來創建,傳入一個OnSubscribe的實現類,並在其onCall方法中調用subscriber的onNext方法進行消息的發送,分別發送兩個字符串然後調用onComplete方法表示發送完成。

10-25行創建了一個Subscriber(訂閱者)對象,這個類是Observer的子類,Observer是傳統的觀察者,而Subscriber則豐富了Observer,Subsciber添加了兩個方法,分別是:

  • onStart:在Subscriber和Observable連接之後,消息發送之前調用,也就是先於onNext方法調用
  • unsubscirbe:解除訂閱關系,Subscriber不再收到從Observable發送的消息

如果不需要用到這兩個方法,直接創建Observer也是可以的,創建的方法類似Subscriber。

27行調用了Observable的subscribe方法對Subscriber進行綁定。

運行程序會打印如下的Log:

 最簡單的流程就是這樣,如果上面代碼中的局部變量不是必須的話,代碼可以變成如下所示:

        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("hello");
                subscriber.onNext("android");
                subscriber.onCompleted();
            }
        }).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted() called");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError() called with: e = [" + e + "]");
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext() called with: s = [" + s + "]");
            }
        });

效果是完全一樣的,相信可以看得明白,後面這種形式會比較常見。

 

接著要介紹兩個類——Actionx和Funcx(這兩個類最後的x為數字)。

Actionx:指的是一組操作(用於簡化Subscriber),而這組操作有x個參數,並且返回Void

Funcx:指的是一組方法(用於對消息進行處理,下面的變換會使用到),這個方法有x個參數和1個返回值

這裡還是和上面例子一樣,假設我們只關心Subscriber的onNext方法,而不關心onCompleted和onError方法,那麼這一個Subscriber就可以被替換為一個Action1,代碼如下:

        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("hello");
                subscriber.onNext("android");
                subscriber.onCompleted();
            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.d(TAG, "call() called with: s = [" + s + "]");
            }
        });

 如果我們只關心onNext和onError,則可以用兩個Action1來表示,因為subscribe的重載允許我們這麼做,可以自行嘗試。


 

因為RxJava的內容其實比較多,這裡就對其中一些進行介紹,有興趣的可以轉到文章最後的地方會給出一些參考文章。

① 創建操作

常見的創建操作有幾種:Create、From、Just和Interval(還有其他可以參考文末文章)

上面的例子中直接使用了create方法進行Observable的創建,下面可以通過just來進行簡單的創建,代碼如下:

1         Observable.just("hello", "world").subscribe(new Action1<String>() {
2             @Override
3             public void call(String s) {
4                 Log.d(TAG, "call() called with: s = [" + s + "]");
5             }
6         });

和例子中的效果是一樣的,但是這裡要注意了,just方法可以傳入任何類型的對象,但是必須和Action1或者Subscriber所指定的泛型一致。

也就是假設我們要打印的不是字符串而是數字,代碼變成如下所示:

1         Observable.just(1, 2).subscribe(new Action1<Integer>() {
2             @Override
3             public void call(Integer s) {
4                 Log.d(TAG, "call() called with: s = [" + s + "]");
5             }
6         });

Action1所指定的泛型由String改為Integer,call方法的參數也相應的更改。

from方法也可以創建Observable,用法可以自行嘗試。

這裡說一下Interval方法,Android開發中可能會用到。Interval是定時器,每隔一段時間發送一個消息,這個消息由一個Long類型的值表示,從0開始。代碼:

1         Observable.interval(1, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
2             @Override
3             public void call(Long aLong) {
4                 Log.d(TAG, "call() called with: s = [" + aLong + "]");
5             }
6         });

運行代碼之後,Logcat會每隔一秒打印一句Log,而aLong的值會從0開始每次遞增1。interval方法的參數分別是:間隔值、間隔單位,上面代碼指的是每隔1s觸發一次。

光有計時器還不行,我們怎麼讓它自動停止呢,比如說打印5次。這裡涉及到RxJava的過濾問題,就是使用take方法並傳入發送次數,當發送次數大於這個值的時候,其他的消息都被過濾了,我們的Subscriber不再接收。代碼如下:

1         Observable.interval(1, TimeUnit.SECONDS).take(5).subscribe(new Action1<Long>() {
2             @Override
3             public void call(Long aLong) {
4                 Log.d(TAG, "call() called with: s = [" + aLong + "]");
5             }
6         });

這裡的take方法不能在subscribe方法後調用,因為subscribe方法返回值不再是Observable

當然,take方法還有其他重載,可以自行嘗試。

 

② 變換操作

變換操作主要是對Observable發送的消息進行變換,使得在Subscriber中處理起來更加簡單。就通過上面的例子來說明吧,我們設定了的計時器中,每次返回給Subscriber(Action1)的值是Long類型的,假設我們要在接收端得到一個Integer類型的值,怎麼辦呢?就要使用變換,最常用的變換就是map。

 1         Observable.interval(1, TimeUnit.SECONDS).take(5).map(new Func1<Long, Integer>() {
 2             @Override
 3             public Integer call(Long aLong) {
 4                 return aLong.intValue();
 5             }
 6         }).subscribe(new Action1<Integer>() {
 7             @Override
 8             public void call(Integer integer) {
 9                 Log.d(TAG, "call() called with: s = [" + integer + "]");
10             }
11         });

可以看到這個時候Action1使用的泛型變為Integer而不是Long,因為我們在subscribe方法之前調用了map方法對消息進行變換,可以看到,變換需要傳入一個Funcx的對象,並實現一個call方法將Long類型的數據轉換為Integer類型。希望你還記得Func1表示含有一個參數和一個返回值。

接著再說一個比較常用的,名字是flatmap。它的作用,是把一個數據類型變成一個Observable對象。也就是說Observable裡面包含Observable,而新建出來的Observable中的消息會交付給原來的Observable進行發送。如果不明白,可以看例子:

假設有兩個類,一個是學生(姓名、選課),一個是課程(分數),學生類中的選課是數組類型,要求是打印所有學生的每個選課成績:

    private class Student{
        String name;
        Course[] courses;
        Student(String name, Course[] courses) {
            this.name = name;
            this.courses = courses;
        }
    }

    private class Course{
        int score;
        private Course(int score) {this.score = score;}
    }

如果我們上面的做法,代碼如下:

 1         Student s1 = new Student("Jack", new Course[]{new Course(80), new Course(79)});
 2         Student s2 = new Student("Rose", new Course[]{new Course(87), new Course(69)});
 3 
 4         Observable.just(s1, s2).subscribe(new Action1<Student>() {
 5             @Override
 6             public void call(Student student) {
 7                 for (int i = 0; i < student.courses.length; i++) {
 8                     Log.d(TAG, "score = " + student.courses[i].score);
 9                 }
10             }
11         });

就是在接收端接收到一個Student類型的值,再使用for循環打印對應屬性的值。

但是如果我們不想使用for循環,而是希望直接再接收端就可以處理對應的成績,就要用到flatmap。代碼如下:

 1         Observable.just(s1, s2).flatMap(new Func1<Student, Observable<Course>>() {
 2             @Override
 3             public Observable<Course> call(Student student) {
 4                 return Observable.from(student.courses);
 5             }
 6         }).subscribe(new Action1<Course>() {
 7             @Override
 8             public void call(Course course) {
 9                 Log.d(TAG, "score = " + course.score);
10             }
11         });

※ Observable的泛型改為我們要轉換的類型Course,表示我們把兩個Student對象的每個Course發送出去。在Func1中的call方法中根據傳入的Student對象的courses屬性構造一個Observable並返回,RxJava會幫我們把每個Course逐個發送出去,log如下所示:

這裡如果你覺得還是不夠,你想把每個成績變成Integer再發送,該怎麼做?

我們可以在返回Observable之前再進行一次map轉換,這個時候得到的就是Integer了,記得更改Observable對應的泛型為Integer哦,代碼如下:

1 Observable.just(s1, s2).flatMap(new Func1<Student, Observable<Integer>>() { 2 @Override 3 public Observable<Integer> call(Student student) { 4 return Observable.from(student.courses).map(new Func1<Course, Integer>() { 5 @Override 6 public Integer call(Course course) { 7 return course.score; 8 } 9 }); 10 } 11 }).subscribe(new Action1<Integer>() { 12 @Override 13 public void call(Integer score) { 14 Log.d(TAG, "score = " + score); 15 } 16 }); View Code

變換操作還有其他一些,可以自行了解嘗試。

 

③ 過濾操作

在上面的計時器的例子中我們已經使用了過濾操作,take算是比較簡單的過濾了。這裡簡單介紹一下debounce,它可以讓我們過濾掉一段時間內的消息。

設定了debounce之後,在一個消息發送之後的一段時間之內發送的消息將會被過濾拋棄掉。這裡沒想到更好的例子,原諒我:

1         Observable.interval(1, TimeUnit.SECONDS).debounce(2, TimeUnit.SECONDS)
2                 .subscribe(new Action1<Long>() {
3             @Override
4             public void call(Long aLong) {
5                 Log.d(TAG, "call() called with: aLong = [" + aLong + "]");
6             }
7         });

可以看到我們設置的定時,每隔1s發送一個消息,而又給它設定了debounce為2s,什麼意思呢,就是間隔小於2s的消息都被拋棄,所以這段代碼沒有任何輸出。


 

實際上,RxJava還有其他一些操作,這裡先忽略。說一些Android開發有關的

我們的異步操作一般是用來進行網絡請求的,或者進行耗時操作,那麼這個時候肯定不能在主線程中進行,而RxJava的一大優點是,我們可以任意的切換線程,並且,切換起來非常方便。

為了驗證所在的線程,我們先編寫一個方法來獲取當前線程的id:

1     private long getCurrentThreadId() {
2         return Thread.currentThread().getId();
3     }

接著我們先驗證一般情況下消息的發送和接收:

 1         Observable.create(new Observable.OnSubscribe<String>() {
 2             @Override
 3             public void call(Subscriber<? super String> subscriber) {
 4                 Log.d(TAG, "Thread id of sending message is " + getCurrentThreadId());
 5                 subscriber.onNext("hello");
 6             }
 7         }).subscribe(new Action1<String>() {
 8             @Override
 9             public void call(String s) {
10                 Log.d(TAG, "Thread id of receiving message is " + getCurrentThreadId());
11             }
12         });

這段代碼很簡單,分別在發送和接收的地方打印當前的線程id,log如下:

那麼該如何切換線程呢?這裡要用到兩個方法:

  • subscribeOn:指定事件發生的線程
  • observeOn: 指定事件接收的線程

這兩個方法都需要傳入Scheduler對象,而這個對象可以通過Schedulers和AndroidSchedulers來獲取。

  • Scheduler.newThread():始終開啟新的線程
  • Scheduler.immediate():當前線程(默認)
  • Scheduler.io():IO線程,如網絡下載,文件讀寫
  • Scheduler.computation():計算線程,如圖形計算等
  • AndroidSchedulers.mainThread():Android中的UI線程
  • AndroidSchedulers.from(Looper looper):根據Looper來獲取對應的線程,這裡可以根據Handler來取得Handler對應的線程

如果我們需要在新的線程中進行消息發送(網絡下載),在UI線程中更新UI,那麼代碼應該如下所示:

 1         Observable.create(new Observable.OnSubscribe<String>() {
 2             @Override
 3             public void call(Subscriber<? super String> subscriber) {
 4                 Log.d(TAG, "Thread id of sending message is " + getCurrentThreadId());
 5                 subscriber.onNext("hello");
 6             }
 7         })
 8         .subscribeOn(Schedulers.io())
 9         .observeOn(AndroidSchedulers.mainThread())
10         .subscribe(new Action1<String>() {
11             @Override
12             public void call(String s) {
13                 Log.d(TAG, "Thread id of receiving message is " + getCurrentThreadId());
14             }
15         });

這個時候Log如下所示:

可以看到,現在兩個操作已經處於不同線程了。

如果有看我的上一篇文章,文章中需要進行高斯模糊的計算,這個計算過程可能會需要一點時間,如果不想讓界面卡頓是建議開啟新的線程進行的,而這裡正好可以用到調度器的Computation,是不是很方便呢。

 

參考文章:

  1. 上一頁:
  2. 下一頁:
熱門文章
閱讀排行版
Copyright © Android教程網 All Rights Reserved