Android教程網
  1. 首頁
  2. Android 技術
  3. Android 手機
  4. Android 系統教程
  5. Android 游戲
 Android教程網 >> Android技術 >> 關於Android編程 >> RxJava(RxAndroid)線程切換機制

RxJava(RxAndroid)線程切換機制

編輯:關於Android編程

自從項目中使用RxJava以來,可以很方便的切換線程。至於是怎麼實現的,一直沒有深入的研究過!本篇文章就是分析RxJava的線程模型。
  RxJava基本使用
  先上一個平時使用RxJava切換線程的例子:
  

Observable observable = Observable.create(new Observable.OnSubscribe() {
            @Override
            public void call(Subscriber subscriber) {
                subscriber.onNext("hello");
                subscriber.onCompleted();
            }
        });
        Subscriber subscriber = new Subscriber() {
            @Override
            public void onCompleted() {
                //TODO
            }

            @Override
            public void onError(Throwable e) {
                //TODO
            }

            @Override
            public void onNext(String o) {
                Log.e("RxJava", o);
            }
        };
        observable
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(subscriber);

如果結合Lamda表達式,代碼會非常的簡潔,為了更直觀的分析代碼,就沒有使用Lamda。通過subscribeOn(Schedulers.io()),
observeOn(AndroidSchedulers.mainThread())輕松的實現線程的切換。非常的簡單。下面一步一步分析:
  RxJava主要類
  如果對RxJava有了解的話,都知道它實際是用觀察者模式實現的,我們這裡只是簡單的介紹一下主要的類。
  

Observable和Subscriber大家已經很熟悉了,分別是被觀察者和觀察者。在使用RxJava過程中我們一般都是三步,第一步創建Observable,第二部創建Subscriber,第三步通過subscribe()方法達到訂閱的目的。為了搞清楚線程切換的實現,必須先搞清楚RxJava內部調用的流程!
  在上邊代碼中,Observable.create()對應於第一步,會產生一個Observable。那麼是怎麼創建的呢?而create()方法的源碼:
  

public static  Observable create(OnSubscribe f) {
        return new Observable(hook.onCreate(f));
    }

create()會接收一個OnSubscribe實例,OnSubscribe繼承自Action1,有一個call()回調方法。在subscribe()方法執行時會被調用。這裡的hook.onCreate()其實其他的什麼也沒有做就是你傳進去什麼就返回什麼。最後可以看到產生了一個Observable的實例。
  接著看一下subscribe(subscriber) , subscribe()方法會產生訂閱行為,接收一個Subscriber實例,現在有了被觀察者和觀察者,那麼我們就可以分析訂閱行為了。看subscribe()方法的最終實現:
  

 static  Subscription subscribe(Subscriber subscriber, Observable observable) {
     // validate and proceed
        if (subscriber == null) {
            throw new IllegalArgumentException("subscriber can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
            /*
             * the subscribe function can also be overridden but generally that's not the appropriate approach
             * so I won't mention that in the exception
             */
        }

        // new Subscriber so onStart it
        subscriber.onStart();

        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber(subscriber);
        }

        // The code below is exactly the same an unsafeSubscribe but not used because it would 
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // in case the subscriber can't listen to exceptions anymore
            if (subscriber.isUnsubscribed()) {
                RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
            } else {
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try {
                    subscriber.onError(hook.onSubscribeError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    hook.onSubscribeError(r);
                    // TODO why aren't we throwing the hook's return value.
                    throw r;
                }
            }
            return Subscriptions.unsubscribed();
        }
    }

一般情況下會執行第31行,

hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);

hook.onSubscribeStart()什麼也沒有做,將observable.onSubscribe直接返回,那麼這句話就可以簡化成
  observable.onSubscribe.call(subscriber);
  如果我們沒有使用任何的操作符,那麼這裡的observable.onSubscribe就是我們在create()方法中傳入的onSubscribe實例。就會回調其call( subscriber)方法。在call()方法中,我們一般又會根據獲得的subscriber引用,去調用相應的onNext()和onComplete()方法。這就是調用的基本流程!
  如果我們使用了例如map這樣的操作符,那麼基本的流程大致是一樣的,只不過是將Observable實例進行相應的變化後,向下傳遞。最終執行subscribe操作的是最後一個Observable,所以,每個變換後的Observable都會持有上一個Observable 中OnSubscribe對象的引用。
  目前還是沒有分析subscribeOn()和observeOn(),接下來就看看他們內部怎麼實現的吧!
  subscribeOn()源碼:
  

 public final Observable subscribeOn(Scheduler scheduler) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable)this).scalarScheduleOn(scheduler);
        }
        return create(new OperatorSubscribeOn(this, scheduler));
    }

正如前邊所說,這裡會產生新的Observable,並且持有上一個Observable的OnSubscribe(我們的例子中就是我們在create()方法中傳入的)的引用。我們繼續看OperatorSubscribeOn這個類:
  

public final class OperatorSubscribeOn implements OnSubscribe {

    final Scheduler scheduler;
    final Observable source;

    public OperatorSubscribeOn(Observable source, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.source = source;
    }

    @Override
    public void call(final Subscriber subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);

        inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();

                Subscriber s = new Subscriber(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }

                    @Override
                    public void onError(Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void setProducer(final Producer p) {
                        subscriber.setProducer(new Producer() {
                            @Override
                            public void request(final long n) {
                                if (t == Thread.currentThread()) {
                                    p.request(n);
                                } else {
                                    inner.schedule(new Action0() {
                                        @Override
                                        public void call() {
                                            p.request(n);
                                        }
                                    });
                                }
                            }
                        });
                    }
                };

                source.unsafeSubscribe(s);
            }
        });
    }
}

該類實現了OnSubscribe接口,並且實現了call()方法,也就意味著,我們之前所分析的在subscribe()時,會一步一步去執行observable.onSubscribe.call(),對於SubscribeOn來說他的call()方法的真正實現就是在這裡。那麼還要注意一點的是這個call()的回調時機,這裡要和ObserveOn()做比較,稍後再分析。回到這裡的call方法,首先通過外部傳入的scheduler創建Worker - inner對象,接著在inner中執行了一段代碼,Action0中call()方法這段代碼就在worker線程中執行了,也就是此刻線程進行了切換。
  注意最後一句代碼source.unsafeSubscribe(s)就是將當前的Observable與上一個Observable通過onSubscribe關聯起來。那麼如果上一個Observable也是一個subscribeOn()產生的那麼會出現什麼情況?很顯然最終會切換到上一個subscribeOn指定的線程中。例如:
  

  btn_map.setOnClickListener(v -> getObservable()
                .map(integer -> null)
                .subscribeOn(Schedulers.computation())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(integer1 -> Log.e("map", integer1 + "")));

map的轉換實際上會發生在computation線程而不是io線程,換句話說就是設置多個subscribeOn時,實際上只會切換到第一個subscribeOn指定的線程。這一點很重要!!!
  到現在我們已經分析了subscribe()訂閱後,一直到回調到我們在create()方法中傳入的OnSubscribe的call()方法,subscribeOn()方法是在這個過程中產生作用的。那麼ObserveOn呢?看源碼:
  

 public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable)this).scalarScheduleOn(scheduler);
        }
        return lift(new OperatorObserveOn(scheduler, delayError, bufferSize));
    }

這裡經過了一次lift變化,這是個啥玩意呢?
  

public final  Observable lift(final Operator operator) {
        return new Observable(new OnSubscribeLift(onSubscribe, operator));
    }

實際上也是包裝了一層Observable。明白了這一點,在回到observeOn()源碼中的OperatorObserveOn。
  

public final class OperatorObserveOn implements Operator {

  ...省略...

    /**
     * @param scheduler the scheduler to use
     * @param delayError delay errors until all normal events are emitted in the other thread?
     * @param bufferSize for the buffer feeding the Scheduler workers, defaults to {@code RxRingBuffer.MAX} if <= 0
     */
    public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
    }

    @Override
    public Subscriber call(Subscriber child) {
        if (scheduler instanceof ImmediateScheduler) {
            // avoid overhead, execute directly
            return child;
        } else if (scheduler instanceof TrampolineScheduler) {
            // avoid overhead, execute directly
            return child;
        } else {
            ObserveOnSubscriber parent = new ObserveOnSubscriber(scheduler, child, delayError, bufferSize);
            parent.init();
            return parent;
        }
    }

    ...省略...

    /** Observe through individual queue per observer. */
    private static final class ObserveOnSubscriber extends Subscriber implements Action0 {

       ...省略...

        @Override
        public void onNext(final T t) {
            if (isUnsubscribed() || finished) {
                return;
            }
            if (!queue.offer(on.next(t))) {
                onError(new MissingBackpressureException());
                return;
            }
            schedule();
        }

        @Override
        public void onCompleted() {
            if (isUnsubscribed() || finished) {
                return;
            }
            finished = true;
            schedule();
        }

        @Override
        public void onError(final Throwable e) {
            if (isUnsubscribed() || finished) {
                RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
                return;
            }
            error = e;
            finished = true;
            schedule();
        } 

         protected void schedule() {
            if (counter.getAndIncrement() == 0) {
                recursiveScheduler.schedule(this);
            }
        }

        ...省略...
    }
}

安裝前文的分析,先看他的call方法,返回了個ObserveOnSubscriber實例,我們需要關注這個實例的onNext方法,很簡單,只是執行了schedule()方法,該方法中的recursiveScheduler是在構造方法中根據我們設置的schedule創建的Scheduler.Worker 。

this.recursiveScheduler = scheduler.createWorker();

線程再次切換了,並且這次是在OnNext()方法中切換的,注意是在OnNext()方法中切換,和subscribeOn()在call中切換是有區別的。這樣observeOn設置的線程會影響其後面的流程,直到出現下一次observeOn或者結束。
  這樣最基本的線程切換已經搞清楚了,現在我們來分析一下加入例如map這樣的操作符的過程:
  

 observable
        .map(str->str+"Rx")
        .map(str1->str1+"Java")
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(subscriber);

根據上邊的分析,上一張流程圖(不要在意,圖很挫):
  
  
  總的來說,RxJava的處理順序像一條流水線,這不僅僅是代碼寫起來像一條鏈上,邏輯上也是如此,也就是說,當你切換流水的流向(線程),整條鏈都改變了方向,並不會進行分流。理解了這一點,對RxJava的線程切換也就不會感到困難了。

  1. 上一頁:
  2. 下一頁:
熱門文章
閱讀排行版
Copyright © Android教程網 All Rights Reserved