Android教程網
  1. 首頁
  2. Android 技術
  3. Android 手機
  4. Android 系統教程
  5. Android 游戲
 Android教程網 >> Android技術 >> 關於Android編程 >> Android中的線程池(一)

Android中的線程池(一)

編輯:關於Android編程

一直想寫關於AsyncTask的實現原理,AsyncTask的實現是用到了線程池和消息機制的,關於Android中的消息機制我已經在博客裡寫過了,有興趣的同學可以去閱讀。那這篇博客就一起來學習Android中的線程池。關於Android的線程池有2篇。這一篇是基礎篇,下一篇是提高篇。

在講解Android中的線程池前,先介紹兩個和線程池相關的類,在AsyncTask的實現中也會接觸到。

Callable與FutureTask

Callable是一個泛型接口,接收一個泛型參數V,內部只有一個返回值為V的call方法

public interface Callable {
    V call() throws Exception;
}

我們對Runnable比較熟悉,Callable和Runnable很相似,區別是Callable的call方法有返回值,而Runnable的run方法沒有返回值。
Runable是包裝在Thread 內部,而Callable包裝在FutureTask內部。

那就來看看FutureTask吧。從FutureTask的繼承結構說起:

public class FutureTask implements RunnableFuture {
    //代碼省略
}

FutureTask實現了RunnableFuture接口的,可RunnableFuture怎麼那麼像Runnable和Future的合體?Future又是什麼?

那麼看看RunnableFuture接口

public interface RunnableFuture extends Runnable, Future {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

果然RunnableFuture是繼承自Runnable和Future的。

那麼Future是什麼呢?
官方文檔對其的描述是這樣的:

A Future represents the result of an asynchronous computation. 

而我查閱相關書籍得到了更好的描述是: Future為線程池制定了一個可管理的任務標准,它提供了對Runnable,Callable任務的執行結果進行取消,查詢是否完成,獲取結果,設置結果的操作。

public interface Future {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

}

而FaskTask作為Runnable和Future的實現類,那也就有了他們兩者的能力。簡單理解就是有了可以獲取任務執行之後的結果等等很強大的能力。 那既然FutureTask那麼強大,就繼續挖掘他內部的信息吧。

從FutureTask的構造方法開始:

public FutureTask(Callable callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

這個構造方法接收一個Callable對象,這樣就在內部完成了對Callable的包裝。

接著看第二個構造方法:

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

Executors調用callable方法將傳入的Runnable轉換為Callable並賦值給FutureTask的callable字段。
在Android中創建線程池是使用工廠方法的模式的,而這個工廠就是Executors。這個先放放,會講到的。
先來看下Executors的callable方法的內部邏輯:

public static  Callable callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter(task, result);
}

如果傳入的Runnable為null,則會拋出空指針異常。否則將Runnable傳入RunnableAdapter的構造方法。
這個RunnableAdapter是Executors的內部類。是實現了Callable接口的。那麼繼續跟進RunnableAdapter看看

static final class RunnableAdapter implements Callable {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}

內部的邏輯實現是用到了適配器模式,從構造方法中傳入Runnable對象,然後Callable調用call方法時,實際上內部執行的是Runnable的run方法。但是在外部看來,我就是在調用Callable的call方法,call方法內部邏輯我作為調用者是不關心的。
關於適配器模式我也寫過一篇博客,有興趣的同學也可以去閱讀一下。

好了,我們可以總結一下,在創建FutureTask的時候,如果注入的是Runnable,則會被Executors的callable方法轉換成Callable。
如果注入的是Callable,FutureTask還是會執行Callable的任務。
也就是說無論傳入Callable或Runnable,FutureTask最終執行的都是Callable的任務。

FutureTask的創建好像挺復雜的哦。其實不復雜,我覺的只是設計的巧妙而已,就是做初始化的工作嘛。

那麼剛才說到FutureTask是Runnable和Future的合體,那麼FutureTask一定實現了Runnable的run方法的吧,我們可以看看FutureTask的run方法內部實現:

public void run() {
    if (state != NEW ||
        !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
        return;
    try {
        Callable c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

如果FutureTask的創建是正常的話,那麼
if (c != null && state == NEW)這個判斷是能通過的。從這個if語句的邏輯可以知道。FutureTask調用run方法就一定會調用Callable的call方法,並將執行結果賦值給result,result也是一個泛型,從創建FutureTask那一刻已經定了。
最後會調用set方法講result設置進去。因此我們可以通過get方法來獲取run方法執行的結果, 因為run方法內部包裝了call方法,更准確說是獲取call方法的執行結果。這就是FutureTask設計的強大之處。

好了。講了很多,還不知道FutureTask怎麼使用呢。其實Callable和FutureTask是用於線程池的。

線程池

這篇文章作為基礎篇,當然要先簡單的講下線程池,下一篇繼續深入
之前也提過是使用工廠方法的模式來創建的。我們可以這樣創建一個簡單的線程池。

ExecutorService executorService =Executors.newSingleThreadExecutor();

將FutureTask提交就行了。

executorService.submit(new FutureTask(new Callable() {
    @Override
    public Integer call() throws Exception {
        //具體邏輯
        return null;
    }
}));

使用傳進Callable的構造方法創建了一個FutureTask對象,並傳進submit方法,提交。submit方法返回一個Future對象,也就是說我們可以使用這個Future對象來做一些相關的操作,比如說獲取call方法返回的結果。

而至於調用submit方法提交FutureTask之後又是怎麼樣的呢?我們之前分析過,Callable封裝在FutureTask中,call方法是在FutureTask裡的run方法中調用的。那麼run方法在哪裡得到了執行呢?

這個要從newSingleThreadExecutor方法裡開始尋找信息。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue()));
}

ExecutorService只是一個接口。核心在於ThreadPoolExecutor,他是ExecutorService的實現類,他非常非常的重要。那麼ThreadPoolExecutor中的submit方法就是我們的答案。

糟糕的是ThreadPoolExecutor並沒有submit方法,那麼只能向上尋找, ThreadPoolExecutor繼承自AbstractExecutorService。AbstractExecutorService是一個抽象類。那麼跟進AbstractExecutorService瞧瞧。然後就找到了submit方法。
submit有三個重載的方法,而在我們剛才的舉例中傳進的是FutureTask,那麼我們需要了解的是這個

public Future submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

好,繼續分析

RunnableFuture ftask = newTaskFor(task, null);

這行代碼的意義就是保證最終傳入execute方法的是RunnableFuture類型的對象。看看newTaskFor方法

protected  RunnableFuture newTaskFor(Runnable runnable, T value) {
    return new FutureTask(runnable, value);
}

內部調用的是FutureTask的構造方法。

好,那就看到execute方法了。傳進execute方法的是RunnableFuture。

但是AbstractExecutorService並沒有實現execute方法,想想也對,Execute應該讓具體的子類來實現的。那麼回到ThreadPoolExecutor,找到execute方法。謎底快揭曉了

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

首先我們要先回憶,剛才我們在submit方法中調用execute方法,傳入的是RunnableFuture對象。
這裡做了一系列的判斷,主要是核心線程數,任務隊列等等,這些都會在下一篇終極版講到。RunnableFuture對象會傳進addWorker方法,我們現在看到addWorker方法即可

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {

    //代碼省略

    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

抓重點,看看傳進的Runnable在哪裡使用到了,在這裡

w = new Worker(firstTask);
final Thread t = w.thread;

這裡Worker又對Runnable進行了包裝。Worker的構造方法如下:

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

繼續前進,然後w.thread取出Worker裡的線程對象並賦值給t變量。

往下看addWorker方法,一系列的判斷之後

if (workerAdded) {
   t.start();
   workerStarted = true;
}

在這裡調用了Worker裡的線程對象的start方法。那麼就會執行Worker裡面的run方法。如下

/** Delegates main run loop to outer runWorker. */
public void run() {
    runWorker(this);
}

run方法內部調用的runWorker方法的源碼如下:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

看到這一行代碼:

Runnable task = w.firstTask

這行代碼取出了Worker之前封裝的Runnable對象(一開始的FutureTask),並賦值給task,繼續看runWorker方法,最終……

try {
    beforeExecute(wt, task);
    Throwable thrown = null;
    try {
        task.run();
    } catch (RuntimeException x) {
        thrown = x; throw x;
    } catch (Error x) {
        thrown = x; throw x;
    } catch (Throwable x) {
        thrown = x; throw new Error(x);
    } finally {
        afterExecute(task, thrown);
    }
} 

可以看到,task(FutureTask)終於調用run方法。這就是我們一直想尋找的。我們剛開始就想知道,submit提交了FutureTask之後,FutureTask的run方法在哪裡被調用了。現在終於明白了。終於海闊天空了。

呼,結束了!我們大致分析了線程池的submit,execute的過程,不敢說思路很清晰,但是至少心中有數。

相信下一篇終極版會更加的暢快。

  1. 上一頁:
  2. 下一頁:
熱門文章
閱讀排行版
Copyright © Android教程網 All Rights Reserved