Android教程網
  1. 首頁
  2. Android 技術
  3. Android 手機
  4. Android 系統教程
  5. Android 游戲
 Android教程網 >> Android技術 >> 關於Android編程 >> Android響應式編程(一)RxJava前篇[入門基礎]

Android響應式編程(一)RxJava前篇[入門基礎]

編輯:關於Android編程

1.RxJava概述

ReactiveX與RxJava

在講到RxJava之前我們首先要了解什麼是ReactiveX,因為RxJava是ReactiveX的一種java實現。
ReactiveX是Reactive Extensions的縮寫,一般簡寫為Rx,微軟給的定義是,Rx是一個函數庫,讓開發者可以利用可觀察序列和LINQ風格查詢操作符來編寫異步和基於事件的程序,開發者可以用Observables表示異步數據流,用LINQ操作符查詢異步數據流, 用Schedulers參數化異步數據流的並發處理,Rx可以這樣定義:Rx = Observables + LINQ + Schedulers。

為何要用RxJava

想到異步的操作我們會想到android的AsyncTask 和Handler,但是隨著請求的數量越來越多,代碼邏輯將會變得越來越復雜而RxJava卻仍舊能保持清晰的邏輯。RxJava的原理就是創建一個Observable對象來干活,然後使用各種操作符建立起來的鏈式操作,就如同流水線一樣把你想要處理的數據一步一步地加工成你想要的成品然後發射給Subscriber。

RxJava與觀察者模式

RxJava的異步操作是通過擴展的觀察者模式來實現的,不了解觀察者模式的可以先看下 Rxjava有四個基本的要素:Observable (被觀察者)、 Observer (觀察者)、 subscribe (訂閱)、event(事件)。Observable (被觀察者) 和 Observer (觀察者)通過 subscribe() 方法實現訂閱關系,Observable就可以在需要的時候來通知Observer。

2.RxJava基本用法

在使用RxJava前請現在Android Studio 配置gradle:

dependencies {
    ...
    compile 'io.reactivex:rxjava:1.1.6'
    compile 'io.reactivex:rxandroid:1.2.1'
}

其中RxAndroid是RxJava的一部分,在普通的RxJava基礎上添加了幾個有用的類,比如特殊的調度器,後文會提到。

RxJava的基本用法分為三個步驟,他們分別是:

創建Observer(觀察者)

決定事件觸發的時候將有怎樣的行為

           Subscriber subscriber=new Subscriber() {
            @Override
            public void onCompleted() {
                Log.i("wangshu","onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i("wangshu","onError");
            }

            @Override
            public void onNext(String s) {
                Log.i("wangshu","onNext"+s);
            }

            @Override
            public void onStart() {
                Log.i("wangshu","onStart");
            }
        };

其中onCompleted、onError和onNext是必須要實現的方法,他們的含義分別是:

onCompleted:事件隊列完結,RxJava 不僅把每個事件單獨處理,還會把它們看做一個隊列。當不會再有新的 onNext發出時,需要觸發 onCompleted() 方法作為完成標志。 onError:事件隊列異常,在事件處理過程中出異常時,onError() 會被觸發,同時隊列自動終止,不允許再有事件發出。 onNext:普通的事件,將要處理的事件添加到事件隊列中。 onStart:它會在事件還未發送之前被調用,可以用於做一些准備工作。例如數據的清零或重置,這是一個可選方法,默認情況下它的實現為空。

當然如果要實現簡單的功能也可以用到Observer來創建觀察者,Observer是一個接口,而上面用到Subscriber是在Observer基礎上進行了擴展,在後文的Subscribe訂閱過程中Observer也會先被轉換為Subscriber來使用。

        Observer observer = new Observer() {
            @Override
            public void onCompleted() {
                Log.i("wangshu", "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i("wangshu", "onError");
            }

            @Override
            public void onNext(String s) {
                Log.i("wangshu", "onNext" + s);
            }
        };

創建 Observable(被觀察者)

它決定什麼時候觸發事件以及觸發怎樣的事件。 RxJava 使用 create() 方法來創建一個 Observable ,並為它定義事件觸發規則:

    Observable observable = Observable.create(new Observable.OnSubscribe() {
            @Override
            public void call(Subscriber subscriber) {
                subscriber.onNext("楊影楓");
                subscriber.onNext("月眉兒");
                subscriber.onCompleted();
            }
        });

通過調用subscriber的方法,不斷的將事件添加到任務隊列中,也可用just來實現:

  Observable observable = Observable.just("楊影楓", "月眉兒");

上述的代碼會依次調用onNext(“楊影楓”)、onNext(“月眉兒”)、onCompleted()。

Subscribe (訂閱)

訂閱比較簡單:

 observable.subscribe(subscriber);

或者也可以調用

 observable.subscribe(observer);

運行代碼查看log:

com.example.liuwangshu.moonrxjava I/wangshu: onStart
com.example.liuwangshu.moonrxjava I/wangshu: onNext楊影楓
com.example.liuwangshu.moonrxjava I/wangshu: onNext月眉兒
com.example.liuwangshu.moonrxjava I/wangshu: onCompleted

3.不完整定義回調

上文介紹了回調的接收主要是依賴subscribe(Observer) 和 subscribe(Subscriber),除此之外RxJava還提供了另一種回調方式,也就是不完整回調。再講到不完整回調之前我們首先要了解Action,查看RxJava源碼我們發現提供了一堆Action:
這裡寫圖片描述

我們打開Action0來看看:<喎?/kf/ware/vc/" target="_blank" class="keylink">vcD4NCjxwcmUgY2xhc3M9"brush:java;"> public interface Action0 extends Action { void call(); }

再打開Action1:

public interface Action1 extends Action {
    void call(T t);
}

最後看看Action9:

public interface Action9 extends Action {
    void call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9);
}

很明顯Action後的數字代表回調的參數類型數量,上文訂閱也就可以改寫為下面的代碼:

        Action1 onNextAction = new Action1() {
            @Override
            public void call(String s) {
                Log.i("wangshu", "onNext" + s);
            }
        };
        Action1 onErrorAction = new Action1() {
            @Override
            public void call(Throwable throwable) {

            }
        };
        Action0 onCompletedAction = new Action0() {
            @Override
            public void call() {
                Log.d("wangshu", "onCompleted");
            }
        };
        observable.subscribe(onNextAction,onErrorAction,onCompletedAction);

我們定義了onNextAction來處理onNext的回調,同理我們還定義了onErrorAction和onCompletedAction,最後我們把他傳給subscribe方法。很顯然這樣寫的靈活度很大一些,同時我們也可以只傳一個或者兩個Action:

  observable.subscribe(onNextAction);
  observable.subscribe(onNextAction,onErrorAction);

第一行只定義了onNextAction來處理onNext的回調,而第二行則定義了onNextAction處理onNext的回調,onErrorAction來處理onError的回調。

4.Scheduler

內置的Scheduler

方才我們所做的都是運行在主線程的,如果我們不指定線程,默認是在調用subscribe方法的線程上進行回調的,如果我們想切換線程就需要使用Scheduler。RxJava 已經內置了5個 Scheduler:

Schedulers.immediate():默認的,直接在當前線程運行,相當於不指定線程。 Schedulers.newThread():總是啟用新線程,並在新線程執行操作。 Schedulers.io():I/O 操作(讀寫文件、讀寫數據庫、網絡信息交互等)所使用的 Scheduler。行為模式和 newThread() 差不多,區別在於 io() 的內部實現是是用一個無數量上限的線程池,可以重用空閒的線程,因此多數情況下 io() 比 newThread() 更有效率。 Schedulers.computation():計算所使用的 Scheduler,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。 Schedulers.trampoline():當我們想在當前線程執行一個任務時,並不是立即時,可以用.trampoline()將它入隊。這個調度器將會處理它的隊列並且按序運行隊列中每一個任務。

另外RxAndroid也提供了一個常用的Scheduler:

AndroidSchedulers.mainThread():RxAndroid庫提供的Scheduler,它指定的操作在主線程中運行。

控制線程

subscribeOn() 和 observeOn() 兩個方法來對線程進行控制。
subscribeOn()方法指定 subscribe() 這個方法所在的線程,即事件產生的線程。observeOn()方法指定 Subscriber 回調所運行在的線程,即事件消費的線程。

Action1 onNextAction = new Action1() {
            @Override
            public void call(String s) {
                Log.i("wangshu", "onNext" + s);

            }
        };
Observable observable = Observable.just("楊影楓", "月眉兒"); 
            observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()).subscribe(onNextAction);

我們仍舊是用log打印出onNext事件所傳遞過來的字符串,只不過這一次事件的產生的線程是在io線程上,事件回調的線程則是在主線程。

5.RxJava基礎應用

好了,講的不是很多,我們來舉一個例子來消化上面的知識。RxJava+Retrofit訪問網絡是比較搭的,但是此前我的網絡系列並沒有介紹Retrofit,所以我們先准備用RxJava+OKHttp來訪問網絡,至於RxJava+OKHttp訪問網絡會在此系列的以後的章節做介紹。
此前我們用OkHttp3訪問網絡是這樣做的:

      private void postAsynHttp(int size) {
        mOkHttpClient=new OkHttpClient();
        RequestBody formBody = new FormBody.Builder()
                .add("size", size+"")
                .build();
        Request request = new Request.Builder()
                .url("http://api.1-blog.com/biz/bizserver/article/list.do")
                .post(formBody)
                .build();
        Call call = mOkHttpClient.newCall(request);
        call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {

            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                String str = response.body().string();
                Log.i("wangshu", str);
                runOnUiThread(new Runnable() {
                    @Override
                    public void run() {
                        Toast.makeText(getApplicationContext(), "請求成功", Toast.LENGTH_SHORT).show();
                    }
                });
            }

        });
    }

接下來我們進行改造,首先我們創建Observable(被觀察者):

     private Observable getObservable(final int size){
       Observable observable = Observable.create(new Observable.OnSubscribe() {
           @Override
           public void call(final Subscriber subscriber) {
               mOkHttpClient=new OkHttpClient();
               RequestBody formBody = new FormBody.Builder()
                       .add("size",size+"")
                       .build();
               Request request = new Request.Builder()
                       .url("http://api.1-blog.com/biz/bizserver/article/list.do")
                       .post(formBody)
                       .build();
               Call call = mOkHttpClient.newCall(request);
               call.enqueue(new Callback() {
                   @Override
                   public void onFailure(Call call, IOException e) {
                       subscriber.onError(new Exception("error"));
                   }

                   @Override
                   public void onResponse(Call call, Response response) throws IOException {
                       String str = response.body().string();
                       subscriber.onNext(str);
                       subscriber.onCompleted();
                   }
               });
           }
       });
    return observable;
   }

我們將根據Okhttp的回調(不在主線程)來定義事件的規則,調用subscriber.onNext來將請求返回的數據添加到事件隊列中。接下來我們來實現觀察者:

private void postAsynHttp(int size){   
getObservable(size).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber() {
           @Override
           public void onCompleted() {
               Log.i("wangshu", "onCompleted");
           }

           @Override
           public void onError(Throwable e) {
             Log.i("wangshu", e.getMessage());
           }

           @Override
           public void onNext(String s) {
               Log.i("wangshu", s);
               Toast.makeText(getApplicationContext(), "請求成功", Toast.LENGTH_SHORT).show();
           }
       });
   }

我們將事件產生也就是訪問網絡的操作設置為io線程,訪問網絡回調設置為主線程,所以Toast是能正常顯示的。好了這一篇就講到這裡,關於RxJava的文章後期還會寫,敬請期待。

  1. 上一頁:
  2. 下一頁:
熱門文章
閱讀排行版
Copyright © Android教程網 All Rights Reserved