Android教程網
  1. 首頁
  2. Android 技術
  3. Android 手機
  4. Android 系統教程
  5. Android 游戲
 Android教程網 >> Android技術 >> 關於Android編程 >> RX操作符之輔助操作

RX操作符之輔助操作

編輯:關於Android編程

一、materialize

Materialize將數據項和事件通知都當做數據項發射,Dematerialize剛好相反。一個合法的有限的Obversable將調用它的觀察者的onNext方法零次或多次,然後調用觀察者的onCompleted或onError正好一次。Materialize操作符將這一系列調用,包括原來的onNext通知和終止通知onCompleted或onError都轉換為一個Observable發射的數據序列。materialize將來自原始Observable的通知轉換為Notification對象,然後它返回的Observable會發射這些數據。

 Observable observable =  Observable.create(new Observable.OnSubscribe() {

            @Override
            public void call(Subscriber subscriber) {
                for (int i = 0; i < 5; i++) {
                        subscriber.onNext(i + "");
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {

                    }
                }
                subscriber.onCompleted();
            }
        });


        Subscriber

運行結果:

 

\

 

二、dematerialize

Dematerialize操作符是Materialize的逆向過程,它將Materialize轉換的結果還原成它原本的形式。

ematerialize反轉這個過程,將原始Observable發射的Notification對象還原成Observable的通知。

 

 Observable observable =  Observable.create(new Observable.OnSubscribe() {

            @Override
            public void call(Subscriber subscriber) {
                for (int i = 0; i < 5; i++) {
                    subscriber.onNext(i + "");
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {

                    }
                }
                subscriber.onCompleted();
            }
        });


        Subscriber

 

 

運行結果:

\
 

 

三、timeStamp

timestamp發射T類型數據的Observable轉換為一個發射類型為Timestamped的數據的Observable,每一項都包含數據的原始發射時間。

 

String[] items = {"timeStamp1","timeStamp2","timeStamp3","timeStamp4","timeStamp5"};
        Observable observable =  Observable.from(items);
        Subscriber
運行結果:

 

\
 

四、serialize

一個Observable可以異步調用它的觀察者的方法,可能是從不同的線程調用。這可能會讓Observable行為不正確,它可能會在某一個onNext調用之前嘗試調用onCompleted或onError方法,或者從兩個不同的線程同時調用onNext方法。使用Serialize操作符,你可以糾正這個Observable的行為,保證它的行為是正確的且是同步的。

 

 Observable observable =  Observable.create(new Observable.OnSubscribe() {

            @Override
            public void call(Subscriber subscriber) {
                    subscriber.onNext(1);
                    subscriber.onNext(2);
                    subscriber.onNext(3);
                    subscriber.onCompleted();
                    subscriber.onNext(4);
                    subscriber.onNext(5);
                    subscriber.onCompleted();
            }
        });

        Subscriber subscriber = new Subscriber() {

            @Override
            public void onNext(Integer v) {
                Log.e(TAG,"onNext................."+v);
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };

        observable
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        Log.e(TAG,"call.................unsubscribed");
                    }
                })
                .serialize().subscribe(subscriber);

運行結果:

 

\

 

五、replay

保證所有的觀察者收到相同的數據序列,即使它們在Observable開始發射數據之後才訂閱。可連接的Observable (connectable Observable)與普通的Observable差不多,不過它並不會在被訂閱時開始發射數據,而是直到使用了Connect操作符時才會開始。用這種方法,你可以在任何時候讓一個Observable開始發射數據。如果在將一個Observable轉換為可連接的Observable之前對它使用Replay操作符,產生的這個可連接Observable將總是發射完整的數據序列給任何未來的觀察者,即使那些觀察者在這個Observable開始給其它觀察者發射數據之後才訂閱。
 

 

 final ConnectableObservable observable =  Observable.interval(1,TimeUnit.SECONDS).observeOn(Schedulers.newThread())
                .take(5).replay(3);

        final Subscriber subscriber1 = new Subscriber() {

            @Override
            public void onNext(Long v) {
                Log.e(TAG,"onNext1................."+v);
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted1.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError1.....................");
            }
        };


        Subscriber subscriber = new Subscriber() {

            @Override
            public void onNext(Long l) {
                Log.e(TAG,"onNext................."+l);
                if(l == 3){
                    observable.subscribe(subscriber1);
                }
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };




        observable.subscribe(subscriber);
        observable.connect();

運行結果:

 

\

 

六、observeOn

指定一個觀察者在哪個調度器上觀察這個Observable

 

 Observable observable =  Observable.create(new Observable.OnSubscribe() {

            @Override
            public void call(Subscriber subscriber) {
                for (int i = 0; i < 5; i++) {
                    subscriber.onNext(i + "");
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {

                    }
                }
                subscriber.onCompleted();
            }
        }).observeOn(Schedulers.io());


        Subscriber

運行結果:

 

\

 

七、subscribeon

指定Observable自身在哪個調度器上執行

 

 Observable observable =  Observable.create(new Observable.OnSubscribe() {

            @Override
            public void call(Subscriber subscriber) {
                for (int i = 0; i < 5; i++) {
                    subscriber.onNext(i + "");
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {

                    }
                    Log.e(TAG, "call................."+ Thread.currentThread().getName());
                }
                subscriber.onCompleted();
            }
        })
                .subscribeOn(Schedulers.newThread());


        Subscriber
運行結果:

 

\
 

八、doOnEach

發射完一個Observable時的回調

 

 Observable observable = Observable.just(1,2,3,4,5,6);
        Subscriber subscriber = new Subscriber() {

            @Override
            public void onNext(Integer v) {
                Log.e(TAG,"onNext................."+v);
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };

        observable
                .doOnEach(new Action1>() {
                    @Override
                    public void call(rx.Notification notification) {
                        int i = (int) notification.getValue();
                        Log.e(TAG, "發射了一個數據....................."+i);
                    }
                })
                .subscribe(subscriber);

運行結果:

 

\

 

九、doOnNext

doOnNext操作符類似於doOnEach(Action1),但是它的Action不是接受一個Notification參數,而是接受發射的數據項。

 

Observable observable = Observable.just(1,2,3,4,5,6);
        Subscriber subscriber = new Subscriber() {

            @Override
            public void onNext(Integer v) {
                Log.e(TAG,"onNext................."+v);
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };

        observable
                .doOnNext(new Action1() {
                    @Override
                    public void call(Integer integer) {
                        if(integer == 3){
                            Log.e(TAG, "doOnNext.....................");
                        }
                    }
                })
                .subscribe(subscriber);
運行結果:

 

\

十、doOnSubscribe

 

 

訂閱生成時的回調 
Observable observable = Observable.just(1,2,3,4,5,6);
        Subscriber subscriber = new Subscriber() {

            @Override
            public void onNext(Integer v) {
                Log.e(TAG,"onNext................."+v);
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };

        observable
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        Log.e(TAG, "觀察者訂閱了它生成的Observable.....................");
                    }
                })
                .subscribe(subscriber);

運行結果:

 

\

  1. 上一頁:
  2. 下一頁:
熱門文章
閱讀排行版
Copyright © Android教程網 All Rights Reserved