Android教程網
  1. 首頁
  2. Android 技術
  3. Android 手機
  4. Android 系統教程
  5. Android 游戲
 Android教程網 >> Android技術 >> Android資訊 >> Android FutureTask 分析

Android FutureTask 分析

日期:2017/8/18 9:19:22      編輯:Android資訊

之前在研究AsyncTask源代碼的時候發現了它的內部使用了FutureTaskFutureCallable類來實現,因為之前在學習Java的時候並沒有接觸到這些東西,於是乎就打開了百度看了半天別人的博客也沒有理解其用法以及原理,後來果斷的查看了一下其源代碼之後才知道其來龍去脈。官方文檔這麼介紹FutureTask類的。

A cancellable asynchronous computation. This class provides a base implementation of Future, with methods to start and cancel a computation, query to see if the computation is complete, and retrieve the result of the computation. The result can only be retrieved when the computation has completed; the get methods will block if the computation has not yet completed. Once the computation has completed, the computation cannot be restarted or cancelled (unless the computation is invoked using runAndReset()).

翻譯:

這是一個可以取消的異步計算,該類提供了Future的基本實現,具有啟動和取消運算,查詢運算是否結束,並且檢查返回計算的結果,該結果只能在運行完成之後才能獲取到,如果程序沒有運行結束,則`get()`將會阻塞。程序運行結束之後,無法重新啟動或者是取消程序(除非調用`runAndReset`方法)

也就是說FutureTask也是一個用來執行異步任務的類,同時當程序執行完成之後還會返回運算的結果。我們之前也學過了使用Thread+Runnable來執行異步任務的,但是使用這種方式不能獲取到執行的結果而已。下面我們就來看看裡面具體的原理。

用法

我們就使用該類來實現一個模擬一個非常簡單的事情,使用Thread.sleep()來模擬執行耗時的操作工作,然後將執行完成的結果返回出來。然後打印出來:

public class FutureTaskActivity extends Activity implements View.OnClickListener {

    .........

    //創建一個實現Callable接口的並且在 call()方法中做耗時的操作
    class WorkTask implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            //我們這裡通過使用線程 sleep來模擬耗時的操作,以後我們所有的耗時操作都在該方法裡面執行了
            Thread.sleep(5000);
            //將執行的結果返回出去
            return 1000;
        }
    }

    .........

    private void executeTask() {
        //創建一個worktask,並且當作參數傳入到FutureTask中。
        WorkTask workTask = new WorkTask();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(workTask) {
            @Override
            protected void done() {
                try {
                    //該方法回調意思線程運行結束回調的,然後獲取call方法中返回的結果
                    int result = get();
                    Log.i("LOH", "result..." + result);
                    Thread.currentThread().getName();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        };
        //將FutureTask作為參數傳入到Thread函數中執行。
        Thread thread = new Thread(futureTask);
        //啟動線程執行任務
        thread.start();
    }
}

從上面的代碼中我們可以看到其實它的使用跟Runnable的使用差不多的,只是多出了一個用來實現Callable接口的類作為參數傳入到 FutureTask中,並且以前的那種方法不能監聽線程執行完成的。以前我們在使用線程的時候都知道只有三種方式來執行異步任務:

  • 使用new Thread(new Runnable()).start()的方法來執行異步任務,也就是說將實現Runnable接口的類當作參數傳入到Thread 類中,然後使用thread.start來啟動線程執行
  • 通過繼承Thread類並且重寫該類的 run 方法,在該方法中執行耗時的操作,同樣也是使用 Thread.start()來啟動線程執行
  • 使用線程池來執行實現了 Runnable 接口的類,這樣子也可以達到執行異步任務的目的。

通過對上面的這些總結我們可以知道了要想實現異步任務的話就必須實現 Runnable() 接口才行的,所以我們也可以非常肯定的斷定FutureTask也是實現了該接口的。不然就無法放到執行異步任務的,通過對上面的一個簡單的介紹我們知道了如何使用它,下面就來看看裡面到底是一個什麼樣的機制。

源碼分析

首先我們通過一個類的關系圖來看看這幾個類之間的關系圖

image_1bhu4a4bccid1s0iq001sgbpe49.png-36.4kB

從圖中我們可以一目了然的看到FutureTask實現了RunnableFuture接口,但是RunnableFuture也實現了Runnable接口和Future接口,所以FutureTask可以當作任務在線程池中執行,也可以當作參數傳入Thread中進行啟動任務,在創建FutureTask對象的時候需要傳入一個Callable接口的實現類,從上面的中我們可以看到執行FutureTask任務同樣也是在 run方法中的。

private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    private static final long STATE;
    private static final long RUNNER;
    private static final long WAITERS;
    static {
        try {
           //通過Unsafe來獲取字段state相對於本對象內存地址的偏移地址
            STATE = U.objectFieldOffset
                (FutureTask.class.getDeclaredField("state"));
            //獲取字段runner在內存中的偏移地址
            RUNNER = U.objectFieldOffset
                (FutureTask.class.getDeclaredField("runner"));
            //獲取字段waiters在內存中的偏移地址
            WAITERS = U.objectFieldOffset
                (FutureTask.class.getDeclaredField("waiters"));
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }

        Class<?> ensureLoaded = LockSupport.class;
}

在創建FutureTask對象之前,有一個非常重類 sun.misc.Unsafe 該類的介紹我們這裡也不做介紹,在我的另外一篇博客中會有介紹的。靜態代碼塊中的功能主要是獲取該對象的字段在內存中的偏移地址,獲取這些偏移地址的作用是為了直接操作內存中某個變量的變量值做准備的。我們在學習C或者是C++的時候知道,如果我們知道了某個對象的某個字段的內存地址話,那我們就可以直接通過地址的方式來更新該字段的內存值了。

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
public class FutureTask<V> implements RunnableFuture<V> {

   .......

    public void run() {
        /**
         * 如果當前的線程的狀態不是新創建的話就返回
         * 第三個條件判斷是 安全檢查具體沒有找到源代碼,我們先不用管
         */
        if (state != NEW ||
            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
            return;
        try {
            //將我們傳進來的callable對象賦值給一個臨時變量
            Callable<V> c = callable;
            //判斷傳入進來的callable對象不為空並且線程狀態也是新建的
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    /**
                     * 原來我們總是說的call()方法原來是在run方法中執行的
                     * 然後call()返回一個泛型類型的返回值 ,這種通過實現接口的方法在我們平時中是很常見的吧.
                     */
                    result = c.call();
                    ran = true;
                   /**
                    * 該地方作者考慮的很清楚,在定義call方法的時候拋出異常,然後這裡捕捉異常進行
                    * 處理,因為我們在call方法中寫代碼難免會有異常問題的。
                    */
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                //如果call方法中不跑出異常的話,則通過set()方法將結果保存起來
                //該set()方法其實也是Future接口定義的方法
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            //如果當前的狀態不是
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    .........
}

當線程執行完成之後(沒有異常)就會調用set方法,根據上面的代碼我們其實也沒有什麼好看的,call()方法就是在run()方法中執行的。

protected void set(V v) {
    if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
        //最後將結果賦值給成員變量 outcome
        outcome = v;
        //同時更新state狀態為NORMAL,這是這種更新方式是直接根據內存地址來修改內存值的
        U.putOrderedInt(this, STATE, NORMAL); // final state
        finishCompletion();
    }
}

boolean compareAndSwapInt() 方法是Unsafe中的本地方法,主要作用是用於在多線程中並發的修改和讀取某個值,其主要原理:根據傳入的期望的數據跟內存中的數據進行對比,如果期望的數據跟內存中的數據相同的話,說明該變量的值沒有被其他的線程修改過,同時將我們需要更改的新數據替換內存中的數據,體會成功之後並且返回true,表示的是修改新數據成功了。相反如果有其他線程修改了內存中的則放棄更新新數據,並且返回true。它有三個參數:object 表示更改數據的對象;offset 表示對象上字段的偏移地址;expectedValue 表示期望的數據,該數據的作用是用於跟內存中的數據進行比較,如果兩者不想等的話說明內存中的數據被其他線程修改過;newValue表示更新的值。最後如果更新某個內存地址的值成功的話,則返回true,否則返回false。這個也是我們平時用到的多線程並發的原理基石理論:CAS(Compare and Swap, 翻譯成比較並交換)。

private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            //根據尋找內存地址的方式來修改屬性在內存中的值,將該waiters對象置為null
            if (U.compareAndSwapObject(this, WAITERS, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        //回調Future接口的方法,標識中程序正常的結束了,我們需要重寫該方法.
        done();
        //特別需要注意的是需要將該接口變量置為空,防止出現因為引用問題導致內粗洩漏
        callable = null;        // to reduce footprint,
    }

當執行到該方法的時候也標識的程序正常的運行結束了,首先會將所有等待的線程全部喚醒,因為在執行FutureTask任務的時候調用get()方法是阻塞的,因為call()方法都還沒有執行完成,這個時候你是獲取不到任何結果的,所以會將當前調用get()方法的線程阻塞等待,直到調用finishCompletion()方法來解除線程阻塞,最後調用done()方法,這個時候我們就可以在該結束方法中執行我們想要的邏輯了;從代碼中我們可以看出done()方法其實也還是運行在子線程的,所以我們並不可以在done()方法中更新UI的,還是需要Handler來發送消息的。

在線程都全部執行結束之後,我們就可以在done()方法通過調用get()方法來獲取最後執行的結果了,也就是剛剛在set()方法中看到的outcome的值。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

我們在之前在調用set()方法時通過尋址(根據內存地址的偏移量)的方式修改過了state的值為NORMAL了,所以NORMAL大於COMPLETING,最後直接調用report()方法,最後直接通過return x 來返回結果。這個也就是我們使用FutureTask的一個大概的流程。其實通過代碼我們就能很容易的看出該類的大概的設計原理,同時還可以學到更多的其他技術。

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

我們通過調用Future接口的isDone()來判斷程序是否結束,可以直接根據state的狀態判斷是否是新創建的,該類的線程有7中不同的狀態,主要狀態切換成其中的一種我們就可以說程序結束了。

private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

public boolean isDone() {
    return state != NEW;
}

只要狀態值大於CANCELLED(4),也就是用戶主動調用cancel()方法,不管是主動中斷線程還是其他的方式都屬於取消的操作的。

public boolean isCancelled() {
    return state >= CANCELLED;
}

當程序裡面的FutureTask未執行完成的時候get()方法會一直阻塞調用該方法的線程,直到FutureTask裡面的任務執行才會解除阻塞。所以get()方法是一個阻塞式的去獲取結果的,從上面的get()方法的代碼中我們可以得出當狀態還是NEW的時候,會調用awaitDone(false ,0)方法。

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    // The code below is very delicate, to achieve these goals:
    // - call nanoTime exactly once for each call to park
    // - if nanos <= 0L, return promptly without allocation or nanoTime
    // - if nanos == Long.MIN_VALUE, don't underflow
    // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
    //   and we suffer a spurious wakeup, we will do no worse than
    //   to park-spin for a while
    long startTime = 0L;    // Special value 0L means not yet parked
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING)
            // We may have already promised (via isDone) that we are done
            // so never return empty-handed or throw InterruptedException
            Thread.yield();
        else if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        else if (q == null) {
            if (timed && nanos <= 0L)
                return s;
            q = new WaitNode();
        }
        else if (!queued)
            queued = U.compareAndSwapObject(this, WAITERS,
                                            q.next = waiters, q);
        else if (timed) {
            ........
        }
        else
            LockSupport.park(this);
    }
}

該方法有個無限循環知道狀態值大於COMPLETING才返回一個狀態值,我們在線程未執行完成的時候調用了get()方法,可以看到首先會創建一個WaitNode對象,然後通過Unsafe類來更新成員變量waiter的值為 q,然後再次循環最後會進入 LockSupport.park(this) 分支,該函數主要是獲取許可阻塞當前的線程,直到程序執行結束之後,調用LockSupport.unpark(this)來釋放阻塞。所以如果我們在主線程中直接調用get()方法來獲取結果的話則很有可能導致ANR,直到程序結束之後才會釋放阻塞的,正確的用法就是在done()方法裡面調用get()來獲取執行的結果的。關於LockSupport是一個非常重要的多線程並發的類,不懂的直接在百度上看看其解釋。

我們平時在使用AsyncTask的時候有一個cancel()方法來取消當前執行的任務,我們之前也說了AsyncTask的本質其實也是使用了FutureTask來實現的。其實它的cancel()方法也是調用FutureTask的取消方法的,下面看看取消的原理:

//如果返回值為true的話表示取消成功,否則為取消失敗了
public boolean cancel(boolean mayInterruptIfRunning) {
    //首先判斷當前的狀態是否是NEW,然後在通過Unsafe類去更新內存中state字段的值為cancel。 
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {
        //如果以上的狀態值設置成功的話,則判斷是否設置中斷運行
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                //直接通過調用Thread的中斷方法來強制中斷當前運行的線程
                if (t != null)
                    t.interrupt();
            } finally { // final state
                //最後修改當前狀態state的值為 INTERRUPTED 為中斷
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        //最後解鎖所有被阻塞的線程
        finishCompletion();
    }
    return true;
}

我們在取消任務的時候可以設置強制中斷線程運行,只要調用cancel(true) 就行了,有時候我們調用cancel(false)並不能立刻的停止線程執行完成的,因為這個時候程序在run()方法中已經執行過了狀態(state)值判斷的話,這個時候就直接執行call()方法了,但是call() 方法也沒有執行完成,如果這個時候我們去取消的話, 因為我們知道取消的原理就是使用Unsafe類去修改內存中的state的值,但是這個時候設置已經來不急了。

public void run() {
        //第一次線程狀態判斷
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            //第二次線程判斷,如果我們的設置了一般的取消操作比該判斷滯後的話則是沒有什麼用的。
            if (c != null && state == NEW) {
                .....

                result = c.call();

                .....
            }
        } finally {
            ........
        }
    }

雖然我們調用了cancel(false)方法去取消任務的,但是很多的時候還是不能馬上終止任務執行,最後線程還是會繼續執行的,但是到了set()方法的時候,這裡會有一個狀態值的判斷的。之前我們已經介紹了線程並發的基石CAS,首先我們使用Unsafe類去比較state狀態值是否發生了變化,如果state的值被其他的線程修改了,則不會調用done()方法了。

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        ........
    }
}

總結

從上面我們對源代碼的分析的分析很明顯的知道了 Callable、Future、FutureTask三者之間的關系了,也很明白的知道的如何更好的使用它們了,其實FutureTask根本就沒有想象的這麼難,我看網上把FutureTask說的神乎其神,不會拋出什麼異常,然後可以返回獲取結果等等各種各樣的解釋,由於沒有例子和代碼,最後還是不知道講的是什麼。最後發現不過就是一個接口函數在run()方法中執行,我們只需要實現Callable接口並且重寫call()方法就可以了。不過通過看源代碼可以使我們學到很多的東西。

本次有兩個非常重要的東西很值得我們繼續去研究,由於本人理解水平有限,博客裡面的內容寫的比較亂,有什麼疑問的大家一起討論和學習。

  • Unsafe 類的使用以及作用
  • LockSupport 作用以及使用
  • CAS (Compare and Swap, 比較並交換),該原理是所有線程並發的原理,有時間可以深入了解下
  1. 上一頁:
  2. No
熱門文章
閱讀排行版
Copyright © Android教程網 All Rights Reserved