Android教程網
  1. 首頁
  2. Android 技術
  3. Android 手機
  4. Android 系統教程
  5. Android 游戲
 Android教程網 >> Android技術 >> 關於Android編程 >> Android7.0 MessageQueue

Android7.0 MessageQueue

編輯:關於Android編程

Android中的消息處理機制大量依賴於Handler。每個Handler都有對應的Looper,用於不斷地從對應的MessageQueue中取出消息處理。

一直以來,覺得MessageQueue應該是Java層的抽象,然而事實上MessageQueue的主要部分在Native層中。

自己對MessageQueue在Native層的工作不太熟悉,借此機會分析一下。

一、MessageQueue的創建

當需要使用Looper時,我們會調用Looper的prepare函數:

public static void prepare() {
    prepare(true);
}

private static void prepare(boolean quitAllowed) {
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    //sThreadLocal為線程本地存儲區;每個線程僅有一個Looper
    sThreadLocal.set(new Looper(quitAllowed));
}

private Looper(boolean quitAllowed) {
    //創建出MessageQueue
    mQueue = new MessageQueue(quitAllowed);
    mThread = Thread.currentThread();
}

1 NativeMessageQueue

我們看看MessageQueue的構造函數:

MessageQueue(boolean quitAllowed) {
    mQuitAllowed = quitAllowed;
    //mPtr的類型為long?
    mPtr = nativeInit();
}

MessageQueue的構造函數中就調用了native函數,我們看看android_os_MessageQueue.cpp中的實現:

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    //MessageQueue的Native層實體
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    ............
    //這裡應該類似與將指針轉化成long類型,放在Java層保存;估計Java層使用時,會在native層將long變成指針,就可以操作隊列了
    return reinterpret_cast(nativeMessageQueue);
}

我們跟進NativeMessageQueue的構造函數:

NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    //創建一個Native層的Looper,也是線程唯一的
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        mLooper = new Looper(false);
        Looper::setForThread(mLooper);
    }
}

從代碼來看,Native層和Java層均有Looper對象,應該都是操作MessageQueue的。MessageQueue在Java層和Native層有各自的存儲結構,分別存儲Java層和Native層的消息。

2 Native層的looper

我們看看Native層looper的構造函數:

Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
        mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    //此處創建了個fd
    mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    .......
    rebuildEpollLocked();
}

在native層中,MessageQueue中的Looper初始化時,還調用了rebuildEpollLocked函數,我們跟進一下:

void Looper::rebuildEpollLocked() {
    // Close old epoll instance if we have one.
    if (mEpollFd >= 0) {
        close(mEpollFd);
    }

    // Allocate the new epoll instance and register the wake pipe.
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    ............
    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeEventFd;
    //在mEpollFd上監聽mWakeEventFd上是否有數據到來
    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
    ...........
    for (size_t i = 0; i < mRequests.size(); i++) {
        const Request& request = mRequests.valueAt(i);
        struct epoll_event eventItem;
        request.initEventItem(&eventItem);
        //監聽request對應fd上數據的到來
        int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
        ............
    }
}

從native層的looper來看,我們知道Native層依賴於epoll來驅動事件處理。此處我們先保留一下大致的映像,後文詳細分析。

二、使用MessageQueue

1 寫入消息

Android中既可以在Java層向MessageQueue寫入消息,也可以在Native層向MessageQueue寫入消息。我們分別看一下對應的操作流程。

1.1 Java層寫入消息

Java層向MessageQueue寫入消息,依賴於enqueueMessage函數:

boolean enqueueMessage(Message msg, long when) {
    if (msg.target == null) {
        throw new IllegalArgumentException("Message must have a target.");
    }
    if (msg.isInUse()) {
        throw new IllegalStateException(msg + " This message is already in use.");
    }

    synchronized (this) {
        if (mQuitting) {
            .....
            return false;
        }

        msg.markInUse();
        msg.when = when;
        Message p = mMessages;
        boolean needWake;
        if (p == null || when == 0 || when < p.when) {
            // New head, wake up the event queue if blocked.
            msg.next = p;
            mMessages = msg;
            //在頭部插入數據,如果之前MessageQueue是阻塞的,那麼現在需要喚醒
            needWake = mBlocked;
        } else {
            // Inserted within the middle of the queue.  Usually we don't have to wake
            // up the event queue unless there is a barrier at the head of the queue
            // and the message is the earliest asynchronous message in the queue.
            needWake = mBlocked && p.target == null && msg.isAsynchronous();
            Message prev;
            for (;;) {
                prev = p;
                p = p.next;
                if (p == null || when < p.when) {
                    break;
                }
                //不是第一個異步消息時,needWake置為false
                if (needWake && p.isAsynchronous()) {
                    needWake = false;
                }
            }
            msg.next = p; // invariant: p == prev.next
            prev.next = msg;
        }
        // We can assume mPtr != 0 because mQuitting is false.
        if (needWake) {
            nativeWake(mPtr);
        }
    }
    return true;
}

上述代碼比較簡單,主要就是將新加入的Message按執行時間插入到原有的隊列中,然後根據情況調用nativeAwake函數。

我們跟進一下nativeAwake:

void NativeMessageQueue::wake() {
    mLooper->wake();
}

void Looper::wake() {
    uint64_t inc = 1;
    //就是向mWakeEventFd寫入數據
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
    .............
}

在native層的looper初始化時,我們提到過native層的looper將利用epoll來驅動事件,其中構造出的epoll句柄就監聽了mWakeEventFd。

實際上從MessageQueue中取出數據時,若沒有數據到來,就會利用epoll進行等待;因此當Java層寫入消息時,將會將喚醒處於等待狀態的MessageQueue。

在後文介紹從MessageQueue中提取消息時,將再次分析這個問題。

1.2 Native層寫入消息

Native層寫入消息,依賴於Native層looper的sendMessage函數:

void Looper::sendMessage(const sp& handler, const Message& message) {
    nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
    sendMessageAtTime(now, handler, message);
}

void Looper::sendMessageAtTime(nsecs_t uptime, const sp& handler,
        const Message& message) {
    size_t i = 0;
    {
        AutoMutex _l(mLock);

        //同樣需要按時間插入
        size_t messageCount = mMessageEnvelopes.size();
        while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
            i += 1;
        }

        //將message包裝成一個MessageEnvelope對象
        MessageEnvelope messageEnvelope(uptime, handler, message);
        mMessageEnvelopes.insertAt(messageEnvelope, i, 1);

        // Optimization: If the Looper is currently sending a message, then we can skip
        // the call to wake() because the next thing the Looper will do after processing
        // messages is to decide when the next wakeup time should be.  In fact, it does
        // not even matter whether this code is running on the Looper thread.
        if (mSendingMessage) {
            return;
        }
    }
    // Wake the poll loop only when we enqueue a new message at the head.
    if (i == 0) {
        //若插入在隊列頭部,同樣利用wake函數觸發epoll喚醒
        wake();
    }
}

以上就是向MessageQueue中加入消息的主要流程,接下來我們看看從MessageQueue中取出消息的流程。

2、提取消息

當Java層的Looper對象調用loop函數時,就開始使用MessageQueue提取消息了:

public static void loop() {
    final Looper me = myLooper();
    .......
    for (;;) {
        Message msg = queue.next(); // might block
        .......
        try {
            //調用Message的處理函數進行處理
            msg.target.dispatchMessage(msg);
        }........
    }
}

此處我們看看MessageQueue的next函數:

Message next() {
    //mPtr保存了NativeMessageQueue的指針
    final long ptr = mPtr;
    .......
    int pendingIdleHandlerCount = -1; // -1 only during first iteration
    int nextPollTimeoutMillis = 0;

    for (;;) {
        if (nextPollTimeoutMillis != 0) {
            //會調用Native函數,最終調用IPCThread的talkWithDriver,將數據寫入Binder驅動或者讀取一次數據
            //不知道在此處進行這個操作的理由?
            Binder.flushPendingCommands();
        }

        //處理native層的數據,此處會利用epoll進行blocked
        nativePollOnce(ptr, nextPollTimeoutMillis);

        synchronized (this) {
            final long now = SystemClock.uptimeMillis();
            Message prevMsg = null;
            Message msg = mMessages;
            //下面其實就是找出下一個異步處理類型的消息;異步處理類型的消息,才含有對應的執行函數
            if (msg != null && msg.target == null) {
                // Stalled by a barrier.  Find the next asynchronous message in the queue.
                do {
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());
            }

            if (msg != null) {
                if (now < msg.when) {
                    // Next message is not ready.  Set a timeout to wake up when it is ready.
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                } else {
                    // Got a message.
                    mBlocked = false;
                    //完成next記錄的存儲
                    if (prevMsg != null) {
                        prevMsg.next = msg.next;
                    } else {
                        mMessages = msg.next;
                    }
                    msg.next = null;
                    if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                    msg.markInUse();
                    return msg;
                }
            } else {
                // No more messages.
                nextPollTimeoutMillis = -1;
            }

            // Process the quit message now that all pending messages have been handled.
            if (mQuitting) {
                dispose();
                return null;
            }

            //MessageQueue中引入了IdleHandler接口,即當MessageQueue沒有數據處理時,調用IdleHandler進行一些工作

            //pendingIdleHandlerCount表示待處理的IdleHandler,初始為-1
            if (pendingIdleHandlerCount < 0
                    && (mMessages == null || now < mMessages.when)) {
                //mIdleHandlers的size默認為0,調用接口addIdleHandler才能增加
                pendingIdleHandlerCount = mIdleHandlers.size();
            }

            if (pendingIdleHandlerCount <= 0) {
                // No idle handlers to run.  Loop and wait some more.
                mBlocked = true;
                continue;
            }

            //將待處理的IdleHandler加入到PendingIdleHandlers中
            if (mPendingIdleHandlers == null) {
                mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
            }
            //調用ArrayList.toArray(T[])節省每次分配的開銷;畢竟對於Message.Next這樣調用頻率較高的函數,能省一點就是一點
            mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
        }

        for (int i = 0; i < pendingIdleHandlerCount; i++) {
            final IdleHandler idler = mPendingIdleHandlers[i];
            mPendingIdleHandlers[i] = null; // release the reference to the handler

            boolean keep = false;
            try {
                //執行實現類的queueIdle函數,返回值決定是否繼續保留
                keep = idler.queueIdle();
            } catch (Throwable t) {
                Log.wtf(TAG, "IdleHandler threw exception", t);
            }

            if (!keep) {
                synchronized (this) {
                    mIdleHandlers.remove(idler);
                }
            }
        }
        pendingIdleHandlerCount = 0;
        nextPollTimeoutMillis = 0;
    }
}

整個提取消息的過程,大致上如上圖所示。

可以看到在Java層,Looper除了要取出MessageQueue的消息外,還會在隊列空閒期執行IdleHandler定義的函數。

2.1 nativePollOnce

現在唯一的疑點是nativePollOnce是如何處理Native層數據的,我們看看對應的native函數:

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    //果然Java層調用native層MessageQueue時,將long類型的ptr變為指針
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    //最後還是進入到Native層looper的pollOnce函數
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;

    if (mExceptionObj) {
        .........
    }
}

看看native層looper的pollOnce函數:

//timeoutMillis為超時等待時間。值為-1時,表示無限等待直到有事件到來;值為0時,表示無需等待
//outFd此時為null,含義是:存儲產生事件的文件句柄
//outEvents此時為null,含義是:存儲outFd上發生了哪些事件,包括可讀、可寫、錯誤和中斷
//outData此時為null,含義是:存儲上下文數據,其實調用時傳入的參數
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        //處理response,目前我們先不關注response的內含
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;

                if (outFd != NULL) *outFd = fd;
                if (outEvents != NULL) *outEvents = events;
                if (outData != NULL) *outData = data;
                return ident;
            }
        }

        //根據pollInner的結果,進行操作
        if (result != 0) {
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }

        //主力還是靠pollInner
        result = pollInner(timeoutMillis);
    }
}

跟進一下pollInner函數:

int Looper::pollInner(int timeoutMillis) {
    // Adjust the timeout based on when the next message is due.
    //timeoutMillis是Java層事件等待事件
    //native層維持了native message的等待時間
    //此處其實就是選擇最小的等待時間
    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
         nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
         int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
         if (messageTimeoutMillis >= 0
                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
            timeoutMillis = messageTimeoutMillis;
        }
    }

    int result = POLL_WAKE;
    //pollInner初始就清空response
    mResponses.clear();
    mResponseIndex = 0;

    // We are about to idle.
    mPolling = true;

    //利用epoll等待mEpollFd監控的句柄上事件到達
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    // No longer idling.
    mPolling = false;

    // Acquire lock.
    mLock.lock();

    //重新調用rebuildEpollLocked時,將使得epoll句柄能夠監聽新加入request對應的fd
    if (mEpollRebuildRequired) {
        mEpollRebuildRequired = false;
        rebuildEpollLocked();
        goto Done;
    }

    // Check for poll error.
    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        ......
        result = POLL_ERROR;
        goto Done;
    }

    // Check for poll timeout.
    if (eventCount == 0) {
        result = POLL_TIMEOUT;
        goto Done;
    }

    for (int i = 0; i < eventCount; i++) {
        if (fd == mWakeEventFd) {
            if (epollEvents & EPOLLIN) {
                //前面已經分析過,當java層或native層有數據寫入隊列時,將寫mWakeEventFd,以觸發epoll喚醒
                //awoken將讀取並清空mWakeEventFd上的數據
                awoken();
            } else {
                .........
            }
        } else {
            //epoll同樣監聽的request對應的fd
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                //存儲這個fd對應的response
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                ..........
            }
        }
    }

Done:
    // Invoke pending message callbacks.
    mNextMessageUptime = LLONG_MAX;
    //處理Native層的Message
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            // Remove the envelope from the list.
            // We keep a strong reference to the handler until the call to handleMessage
            // finishes.  Then we drop it so that the handler can be deleted *before*
            // we reacquire our lock.
            {
                sp handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();

                //處理Native Message
                handler->handleMessage(message);
            }
            mLock.lock();
            mSendingMessage = false;
            result = POLL_CALLBACK;
        } else {
            // The last message left at the head of the queue determines the next wakeup time.
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }

    // Release lock.
    mLock.unlock();

    //處理帶回調函數的response
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;

            //調用response的callback
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq);
            }

            response.request.callback.clear();
            result = POLL_CALLBACK;
        }
    }
    return result;
}

說實話native層的代碼寫的很亂,該函數的功能比較多。

如上圖所示,在nativePollOnce中利用epoll監聽是否有數據到來,然後處理native message、native response。

最後,我們看看如何在native層中加入request。

3 添加監控請求

native層增加request依賴於looper的接口addFd:

//fd表示需要監聽的句柄
//ident的含義還沒有搞明白
//events表示需要監聽的事件,例如EVENT_INPUT、EVENT_OUTPUT、EVENT_ERROR和EVENT_HANGUP中的一個或多個
//callback為事件發生後的回調函數
//data為回調函數對應的參數
int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {
    return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data);
}

結合上文native層輪詢隊列的操作,我們大致可以知道:addFd的目的,就是讓native層的looper監控新加入的fd上是否有指定事件發生。

如果發生了指定的事件,就利用回調函數及參數構造對應的response。

native層的looper處理response時,就可以執行對應的回調函數了。

看看實際的代碼:

int Looper::addFd(int fd, int ident, int events, const sp& callback, void* data) {
    ........
    {
        AutoMutex _l(mLock);

        //利用參數構造一個request
        Request request;
        request.fd = fd;
        request.ident = ident;
        request.events = events;
        request.seq = mNextRequestSeq++;
        request.callback = callback;
        request.data = data;
        if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1

        struct epoll_event eventItem;
        request.initEventItem(&eventItem);

        //判斷之前是否已經利用該fd構造過Request
        ssize_t requestIndex = mRequests.indexOfKey(fd);
        if (requestIndex < 0) {
            //mEpollFd新增一個需監聽fd
            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
            .......
            mRequests.add(fd, request);
        } else {
            //mEpollFd修改舊的fd對應的監聽事件
            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
            if (epollResult < 0) {
                if (errno == ENOENT) {
                    // Tolerate ENOENT because it means that an older file descriptor was
                    // closed before its callback was unregistered and meanwhile a new
                    // file descriptor with the same number has been created and is now
                    // being registered for the first time. 
                    epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
                    .......
                }
                //發生錯誤重新加入時,安排EpollRebuildLocked,將讓epollFd重新添加一次待監聽的fd
                scheduleEpollRebuildLocked();
            }
            mRequests.replaceValueAt(requestIndex, request);
        }
    }
}

對加入監控請求的處理,在上文介紹pollInner函數時已做分析,此處不再贅述。

三、總結

1、流程總結

MessageQueue的整個流程包括了Java部分和Native部分,從圖中可以看出Native層的比重還是很大的。我們結合上圖回憶一下整個MessageQueue對應的處理流程:

1、Java層創建Looper對象時,將會創建Java層的MessageQueue;Java層的MessageQueue初始化時,將利用Native函數創建出Native層的MessageQueue。

2、Native層的MessageQueue初始化後,將創建對應的Native Looper對象。Native對象初始化時,將創建對應epollFd和WakeEventFd。其中,epollFd將作為epoll的監聽句柄,初始時epollFd僅監聽WakeEventFd。

3、圖中紅色線條為Looper從MessageQueue中取消息時,處理邏輯的流向。

3.1、當Java層的Looper開始循環時,首先需要通過JNI函數調用Native Looper進行pollOnce的操作。

3.2、Native Looper開始運行後,需要等待epollFd被喚醒。當epollFd等待超時或監聽的句柄有事件到來,Native Looper就可以開始處理事件了。

3.3、在Native層,Native Looper將先處理Native MessageQueue中的消息,再調用Response對應的回調函數。

3.4、本次循環中,Native層事件處理完畢後,才開始處理Java層中MessageQueue的消息。若MessageQueue中沒有消息需要處理,並且MessageQueue中存在IdleHandler時,將調用IdleHandler定義的處理函數。

圖中藍色部分為對應的函數調用:

在Java層:

利用MessageQueue的addIdleHandler,可以為MessageQueue增加IdleHandler;

利用MessageQueue的enqueueMessage,可以向MessageQueue增加消息;必要時將利用Native函數向Native層的WakeEventFd寫入消息,以喚醒epollFd。

在Native層:

利用looper:sendMessage,可以為Native MessageQueue增加消息;同樣,要時將向Native層的WakeEventFd寫入消息,以喚醒epollFd;

利用looper:addFd,可以向Native Looper注冊監聽請求,監聽請求包含需監聽的fd、監聽的事件及對應的回調函數等,監聽請求對應的fd將被成為epollFd監聽的對象。當被監聽的fd發生對應的事件後,將會喚醒epollFd,此時將生成對應response加入的response List中,等待處理。一旦response被處理,就會調用對應的回調函數。

2、注意事項

MessageQueue在Java層和Native層有各自的存儲結構,可以分別增加消息。從處理邏輯來看,會優先處理native層的Message,然後處理Native層生成的response,最後才是處理Java層的Message。

  1. 上一頁:
  2. 下一頁:
熱門文章
閱讀排行版
Copyright © Android教程網 All Rights Reserved