Android教程網
  1. 首頁
  2. Android 技術
  3. Android 手機
  4. Android 系統教程
  5. Android 游戲
 Android教程網 >> Android技術 >> 關於Android編程 >> AsyncTask - 基本原理 圖文剖析

AsyncTask - 基本原理 圖文剖析

編輯:關於Android編程

最近用到了AsyncTask,這玩意每個寫android程序的都會用,可是不見得每個人都能用的好。如果想要用好,那麼首先勢必對基本原理有個大概了解。其實網上對這類問題的說明已經很多很多了,這裡我就用自己的思維整理一下。

AsyncTask概述

AsyncTask是google公司封裝的一個輕量級的異步任務類。實際上它內部也是通過Thread + handler實現的。如果沒有AsyncTask類,我們完全可以用thread+handler來處理。這個時候就很可能自己回去封裝一下thread+handler了。正是因為這類需求很多,google就幫我們封裝了一下。其實我們也可以自己封裝,但是我相信99%程序員自己封裝的東西比不上google的。所以還是有必要學習一下AsyncTask。

AsyncTask相關的其他類

首先我們看一下AsyncTask用到了哪些主要的框架?注意,每個版本的sdk可能對AsyncTask的實現有所不同,我這裡用的是Android-23.

下面是從sdk裡面copy出來的AsyncTask一部分內容。

 

public abstract class AsyncTask {
    private static final String LOG_TAG = "AsyncTask";

    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
    private static final int CORE_POOL_SIZE = CPU_COUNT + 1;
    private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
    private static final int KEEP_ALIVE = 1;

    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);

        public Thread newThread(Runnable r) {
            return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
        }
    };

    private static final BlockingQueue sPoolWorkQueue =
            new LinkedBlockingQueue(128);

    /**
     * An {@link Executor} that can be used to execute tasks in parallel.
     */
    public static final Executor THREAD_POOL_EXECUTOR
            = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
                    TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);

    /**
     * An {@link Executor} that executes tasks one at a time in serial
     * order.  This serialization is global to a particular process.
     */
    public static final Executor SERIAL_EXECUTOR = new SerialExecutor();

    private static final int MESSAGE_POST_RESULT = 0x1;
    private static final int MESSAGE_POST_PROGRESS = 0x2;

    private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
    private static InternalHandler sHandler;

    private final WorkerRunnable mWorker;
    private final FutureTask mFuture;

    private volatile Status mStatus = Status.PENDING;
    ...
};

 

 

 

AsyncTask的數據成員主要有:

1. static Executor THREAD_POOL_EXECUTOR

2. static Executor SERIAL_EXECUTOR;

3. static Executor sDefaultExecutor; 這玩意默認指向了SERIAL_EXECUTOR。看源代碼就知道了:

      private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;

 

 

 

4. static InternalHandler sHandler;

5. WorkerRunnable mWorker;

6. FutureTask mFuture;

7. Status mStatus;

這7個是AsyncTask主要的數據成員,大部分功能都跟這些成員有關。主要上述7個數據成員中,前面4個都是靜態的。

 

THREAD_POOL_EXECUTOR & SERIAL_EXECUTOR

這兩個靜態實例的類分別是:ThreadPoolExecutor和SerialExecutor.

類結構看起來像:

\

其中SerialExecutor很簡單,全部代碼也就:

    private static class SerialExecutor implements Executor {
        final ArrayDeque mTasks = new ArrayDeque();
        Runnable mActive;

        public synchronized void execute(final Runnable r) {
            mTasks.offer(new Runnable() {
                public void run() {
                    try {
                        r.run();
                    } finally {
                        scheduleNext();
                    }
                }
            });
            if (mActive == null) {
                scheduleNext();
            }
        }

        protected synchronized void scheduleNext() {
            if ((mActive = mTasks.poll()) != null) {
                THREAD_POOL_EXECUTOR.execute(mActive);
            }
        }
    }

SerialExecutor是AsyncTask的一個內嵌類。超簡單,裡面就2個數據成員:mTasks和mActive。每次caller調用那個execute,就創建一個Runnable匿名內嵌類對象,這個對象存入mTasks,在匿名內嵌類的run函數裡面調用傳入參數r.run()。然後通過一個scheduleNext函數把mTasks裡面的所有對象通過THREAD_POOL_EXECUTOR.execute(mActive)執行一遍。說穿了,也就是說SerialExecutor類會把所有的任務丟入一個容器,之後把容器裡面的所有對象一個一個的排隊執行THREAD_POOL_EXECUTOR.execute(mActive);

ThreadPoolExecutor就相對復雜一點了。它有幾個比較重要的數據成員:核心線程數,最大線程數,緩存隊列。以下上AsyncTask的一部分代碼。

 

  
    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
    private static final int CORE_POOL_SIZE = CPU_COUNT + 1;
    private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
    private static final int KEEP_ALIVE = 1;

    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);

        public Thread newThread(Runnable r) {
            return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
        }
    };
    private static final BlockingQueue sPoolWorkQueue =
            new LinkedBlockingQueue(128);  
    public static final Executor THREAD_POOL_EXECUTOR
            = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
                    TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);

 

 

當創建ThreadPoolExecutor對象的時候,構造函數裡面第一個參數就是核心線程數量,這裡取得是CPU個數 + 1, 第二個參數是最大線程數量,這裡是CPU個數 * 2 + 1,第五個參數是緩沖區的隊列,這裡是個LinkedBlockingQueue,這個隊列的最大容量是128.

實際上ThreadPoolExecutor內部有個線程池概念。它的大概工作原理如下:

1. 如果正在運行的線程數量小於核心線程數量(由調用者設置,像AsyncTask就設置了cpu個數+1),那麼就新創建要給線程,來執行任務(execute的傳入參數)

2. 如果正在運行的線程數量大於等於核心線程數量,這個時候就分兩種情況:

a. 可以把任務丟進緩沖區,那就丟進去,等待空閒線程來執行。

b. 如果緩沖區滿了,那就看最大線程數 - 運行線程數是不是>0。如果 > 0, 就創建線程來運行新的任務。如果=0,那就丟出異常,也就是ThreadPoolExecutor不接受這個任務了。(所以使用ThreadPoolExecutor的時候需要注意異常,因為它有可能不接受任務)以下是ThreadPoolExecutor的execute代碼。

    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }


addWork也是蠻關鍵的一個函數,實現如下:大概就是可以的情況下,創建線程來執行任務。

   private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }


那麼針對AsyncTask的情況,AsyncTask的任務是通過SerialExecutor來調用ThreadPoolExecutor的execute函數的,而SerialExecute已經通過要給容器控制任務一個一個執行了,所以這種情況下ThreadPoolExecutor只會有兩種情況;

1. 沒有任何線程在運行

2. 只有一個線程在運行,執行完一個任務就繼續執行SerialExecutor的容器裡面的下一個任務,如果有,就在當前線程裡面繼續執行,如果沒有線程結束或者空閒。當serialExecutor有新任務來的時候,就再啟動一個線程(或者用某個空閒線程)來執行任務。

總體來講,針對AsyncTask,它有個靜態數據成員SerialExecutor,

public static final Executor SERIAL_EXECUTOR = new SerialExecutor();

也就是說所有的AsyncTask對象,不管有多少個,都共享同一個SerialExecutor對象(因為它是個靜態成員)。

mWorker & mFuture

其實,這兩個家伙只是對Runnable和callback的一個封裝。結構圖:

\

具體細節我們不見得要去關心。當AsyncTask構造函數調用的時候,mWorker和mFuture會被創建,同時mWoker會被傳入到mFuture對象裡面去

    public AsyncTask() {
        mWorker = new WorkerRunnable() {
            public Result call() throws Exception {
                mTaskInvoked.set(true);

                Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                //noinspection unchecked
                Result result = doInBackground(mParams);
                Binder.flushPendingCommands();
                return postResult(result);
            }
        };

        mFuture = new FutureTask(mWorker) {
            @Override
            protected void done() {
                try {
                    postResultIfNotInvoked(get());
                } catch (InterruptedException e) {
                    android.util.Log.w(LOG_TAG, e);
                } catch (ExecutionException e) {
                    throw new RuntimeException("An error occurred while executing doInBackground()",
                            e.getCause());
                } catch (CancellationException e) {
                    postResultIfNotInvoked(null);
                }
            }
        };
    }

這兩個家伙的定義如下;

 

WorkerRunnable mWorker;

FutureTask mFuture;

WorkerRunnable超簡單:

    private static abstract class WorkerRunnable implements Callable {
        Params[] mParams;
    }

就放了個數據成員:Params[] mParams。

那麼整個流程大概是什麼樣子的呢?

1. 首先AsyncTask的構造函數會創建mWorker和mFuture。

2. 調用AsyncTask的execute過程如:

 

    public final AsyncTask execute(Params... params) {
        return executeOnExecutor(sDefaultExecutor, params);
    }
   public final AsyncTask executeOnExecutor(Executor exec,
            Params... params) {
        if (mStatus != Status.PENDING) {
            switch (mStatus) {
                case RUNNING:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task is already running.");
                case FINISHED:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task has already been executed "
                            + "(a task can be executed only once)");
            }
        }

        mStatus = Status.RUNNING;

        onPreExecute();

        mWorker.mParams = params;
        exec.execute(mFuture);

        return this;
    }
execute的函數params被丟到了mWorker裡面去,然後exec.execute(mFuture)執行任務,這裡exec是sDefaultExecutor,而sDefaultExecutor就是SerialExecutor,

 

(private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;)

 

有關WorkerRunnable和FutureTask的具體封裝技巧,這裡不做過多的描述,有興趣可以自己去看,沒興趣就跳過。

 

調用例子

一個典型的調用如下:

        AsyncTask asyncTask = new AsyncTask(){
            @Override
            protected String doInBackground(Void... param)
            {
                Log.v("AsyncTask", "doInBackground");
                return "hello asyncTask";
            }

            @Override
            public void onPostExecute(String response) {
                //    callback.onSendRequestFinished(JsonUtil.jsonToBean(response, mBeanType));
                Toast.makeText(MainActivity.this, "result: " + response, Toast.LENGTH_LONG).show();
            }
        };
        asyncTask.execute();


比如我們可以在onCreate裡面調用這段代碼。如果我們在doInBackground裡面下斷點,就會看到如下調用堆棧:

\
最終的doInBackground是在AsyncTask的構造函數裡面創建的匿名內嵌類裡面被調用的。

    public AsyncTask() {
        mWorker = new WorkerRunnable() {
            public Result call() throws Exception {
                mTaskInvoked.set(true);

                Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                //noinspection unchecked
                Result result = doInBackground(mParams);
                Binder.flushPendingCommands();
                return postResult(result);
            }
        };

        mFuture = new FutureTask(mWorker) {
            @Override
            protected void done() {
                try {
                    postResultIfNotInvoked(get());
                } catch (InterruptedException e) {
                    android.util.Log.w(LOG_TAG, e);
                } catch (ExecutionException e) {
                    throw new RuntimeException("An error occurred while executing doInBackground()",
                            e.getCause());
                } catch (CancellationException e) {
                    postResultIfNotInvoked(null);
                }
            }
        };
    }


mWorker是WorkerRunnable的子類(匿名內嵌子類)對象,mWorker被傳給了mFuture,FutureTask的 callable就是mWorker。

 

   public FutureTask(Callable callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

 

mFuture被傳到了AsyncTask的execute裡面,mFuture實際上就是Runnable的一個子類,mFuture被被SerialExecute傳給ThreadPoolExecute來執行。大概流程:

1. ThreadPoolExecutor裡面的一個線程執行任務(mFuture)

2. FutureTask的run()會在線程裡面被執行

3. Future的run()裡面會嘗試獲得callable,然後調用callable的call()函數。callable就是mWorker,也就是WorkRunnable,而WorkRunnable實現了接口Callable,Callable裡面有個方法就是call()。

    public void run() {
        if (state != NEW ||
            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
            return;
        try {
            Callable c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

4. 這樣,mWorker是WorkerRunnable的子類對象,而且剛好實現了call函數,而call函數通過接口Callable在FutureTask的run()裡面被調用了。所以AsyncTask的構造函數裡面的匿名內嵌類裡面的call實現被ThreadPoolExecutor的線程調用了。也就是這段代碼:

 

        mWorker = new WorkerRunnable() {
            public Result call() throws Exception {
                mTaskInvoked.set(true);

                Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                //noinspection unchecked
                Result result = doInBackground(mParams);
                Binder.flushPendingCommands();
                return postResult(result);
            }
        };

 

就這樣,AsyncTask的doInBackgroun被一個線程給調用了。

整個AsyncTask的大致流程就是這樣,當然還有其他一些內容,如cancel, onPreExecute, onPostExecute, onCancelled等等,還有ThreadPoolExecutor執行完任務後,怎麼通知主線程的等等問題。

畫了個總體類圖:可能不是很准確,但是可以看出各個類之間的關系,作為參考。

\

 

  1. 上一頁:
  2. 下一頁:
熱門文章
閱讀排行版
Copyright © Android教程網 All Rights Reserved