Android教程網
  1. 首頁
  2. Android 技術
  3. Android 手機
  4. Android 系統教程
  5. Android 游戲
 Android教程網 >> Android技術 >> 關於Android編程 >> MINA框架源碼分析(三)

MINA框架源碼分析(三)

編輯:關於Android編程

在addNow方法執行結束之後,我們已經為當前NioProcessor裡面所有的NioSocketSession對應的SocketChannel注冊了OP_READ事件,接下來繼續查看Processor的run方法,源碼在上一篇中有,執行到第49行,判斷如果selected大於0執行第52行的process方法,selected的值其實就是Selector的select方法返回值,表示客戶端存在和服務端交互的請求,那麼我們看看process做了些什麼事:

AbstractPollingIoProcessor$process()

 

 private void process() throws Exception {
        for (Iterator i = selectedSessions(); i.hasNext();) {
            S session = i.next();
            process(session);
            i.remove();
        }
    }
可以發現他就是遍歷那些已經發生注冊事件的NioSocketSession集合,並且調用process(S session)方法:

 

AbstractPollingIoProcessor$process()

 

private void process(S session) {
        // Process Reads
        if (isReadable(session) && !session.isReadSuspended()) {
            read(session);
        }

        // Process writes
        if (isWritable(session) && !session.isWriteSuspended()) {
            // add the session to the queue, if it's not already there
            if (session.setScheduledForFlush(true)) {
                flushingSessions.add(session);
            }
        }
    }
首先通過isReadable方法判斷當前NioSocketSession對應的SocketChannel中是否注冊過OP_READ事件,如果注冊過的話,執行read(session)方法;

 

AbstractPollingIoProcessor$read()

 

private void read(S session) {
        IoSessionConfig config = session.getConfig();
        int bufferSize = config.getReadBufferSize();
        IoBuffer buf = IoBuffer.allocate(bufferSize);

        final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();

        try {
            int readBytes = 0;
            int ret;

            try {
                if (hasFragmentation) {

                    while ((ret = read(session, buf)) > 0) {
                        readBytes += ret;

                        if (!buf.hasRemaining()) {
                            break;
                        }
                    }
                } else {
                    ret = read(session, buf);

                    if (ret > 0) {
                        readBytes = ret;
                    }
                }
            } finally {
                buf.flip();
            }

            if (readBytes > 0) {
                IoFilterChain filterChain = session.getFilterChain();
                filterChain.fireMessageReceived(buf);
                buf = null;

                if (hasFragmentation) {
                    if (readBytes << 1 < config.getReadBufferSize()) {
                        session.decreaseReadBufferSize();
                    } else if (readBytes == config.getReadBufferSize()) {
                        session.increaseReadBufferSize();
                    }
                }
            }

            if (ret < 0) {
                // scheduleRemove(session);
                IoFilterChain filterChain = session.getFilterChain();
                filterChain.fireInputClosed();
            }
        } catch (Exception e) {
            if (e instanceof IOException) {
                if (!(e instanceof PortUnreachableException)
                        || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
                        || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
                    scheduleRemove(session);
                }
            }

            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireExceptionCaught(e);
        }
    }

這部分代碼比較長,我們了解主干就可以了;

首先第4行創建了一個IoBuffer對象,其實這就是我們java NIO中的Buffer角色,接著看到調用了read(session,buf)方法,這個方法返回值大於0表示讀取數據成功,具體這個方法裡面執行了些什麼我們可以到NioProcessor裡面的read方法看看:

 

  protected int read(NioSession session, IoBuffer buf) throws Exception {
        ByteChannel channel = session.getChannel();

        return channel.read(buf.buf());
    }
其實很簡單了,就是將通道中的數據寫到我們的緩存中罷了,這就是NIO本身的用法;

 

如果我們讀取到了數據,就會執行第33行的if語句,在if語句塊中會執行IoFilterChain的fireMessageReceived方法,其實呢,IoFilterChain就是我們的責任鏈,前面分析源碼的過程中我們知道在創建NioSocketSession的時候會創建一個DefaultIoFilterChain出來,並且會在它裡面創建一個EntryImpl鏈,默認情況下會創建一個HeadFilter鏈頭和TailFilter鏈尾,那麼這裡的IoFilterChain其實就是對DefaultIoFilterChain進行轉換過來的,默認情況下也就值存在鏈頭和鏈尾了,我們在使用MINA的時候可以通過NioSocketAcceptor的getFilterChain獲得其對應的IoFilterChain,其實getFilterChain的真正實現是在AbstarctIoService裡面的,有了這個IoFilterChain之後,我們可以調用他的addLast方法為其添加我們自定義或者MINA自帶的Filter對象,addLast的真正實現是在DefaultIoFilterChain裡面的,我們可以看看:

 

 public synchronized void addLast(String name, IoFilter filter) {
        checkAddable(name);
        register(tail.prevEntry, name, filter);
    }
間接調用了register方法,來看看register

 

 

private void register(EntryImpl prevEntry, String name, IoFilter filter) {
        EntryImpl newEntry = new EntryImpl(prevEntry, prevEntry.nextEntry, name, filter);

        try {
            filter.onPreAdd(this, name, newEntry.getNextFilter());
        } catch (Exception e) {
            throw new IoFilterLifeCycleException("onPreAdd(): " + name + ':' + filter + " in " + getSession(), e);
        }

        prevEntry.nextEntry.prevEntry = newEntry;
        prevEntry.nextEntry = newEntry;
        name2entry.put(name, newEntry);

        try {
            filter.onPostAdd(this, name, newEntry.getNextFilter());
        } catch (Exception e) {
            deregister0(newEntry);
            throw new IoFilterLifeCycleException("onPostAdd(): " + name + ':' + filter + " in " + getSession(), e);
        }
    }
如果你鏈表操作很熟的話,會發現其實這裡進行的就是鏈表插入操作了,在第10行和11行可以體現出來,那麼我們這裡有個疑問了,鏈表操作的時候,我們只需要一個鏈頭就可以了,沒必要給鏈尾啊,這裡的鏈尾是干嘛的呀,我來告訴你答案吧,鏈尾其實就是用來鏈接我們的IoHandler對象的,IoHandler是我們整個責任鏈的結束部分,我們真正的業務邏輯的處理都是在它裡面完成的,所以你會發現在你使用MINA框架的時候,如果不給NioSocketAcceptor設置IoHandler的話是會報異常的,因為他是要進行業務邏輯處理的,沒有他你整個程序是沒法處理的,既然他是鏈接在鏈尾後面的,那麼我們就該看看TailFilter的實現了:

 

他是DefaultIoFilterChain的靜態內部類,代碼比較長,我就截取兩個方法,其他的方法類似啦:

 

 private static class TailFilter extends IoFilterAdapter {
        @Override
        public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception {
            try {
                session.getHandler().sessionCreated(session);
            } finally {
                // Notify the related future.
                ConnectFuture future = (ConnectFuture) session.removeAttribute(SESSION_CREATED_FUTURE);

                if (future != null) {
                    future.setSession(session);
                }
            }
        }

        @Override
        public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception {
            session.getHandler().sessionOpened(session);
        }
}
可以看到在TailFiler裡面執行的方法實際上都是執行的IoHandler中對應的方法啦,也就是這樣我們把Filter責任鏈和IoHandler聯系到了一起;

 

好了,扯得有點遠了,繼續回到我們的AbstractPollingIoProcessor裡面的read方法,第33行在我們獲取到數據之後首先會獲得我們的DefaultIoFilterChain責任鏈,並且調用fireMessageReceived方法,我們來看看fireMessageReceived方法:

這個方法位於DefaultIoFilterChain中

 

public void fireMessageReceived(Object message) {
        if (message instanceof IoBuffer) {
            session.increaseReadBytes(((IoBuffer) message).remaining(), System.currentTimeMillis());
        }

        callNextMessageReceived(head, session, message);
    }
可以看到他調用的是callNextMessageReceived方法
 private void callNextMessageReceived(Entry entry, IoSession session, Object message) {
        try {
            IoFilter filter = entry.getFilter();
            NextFilter nextFilter = entry.getNextFilter();
            filter.messageReceived(nextFilter, session, message);
        } catch (Exception e) {
            fireExceptionCaught(e);
        } catch (Error e) {
            fireExceptionCaught(e);
            throw e;
        }
    }
在callNextMessageReceived方法中首先會獲得當前Filter對象,接著獲得當前Filter的nextFilter對象,接著調用filter的messageReceived方法,這個方法其實上執行的是DefaultIoFilterChain的messageReceived方法:

 

 

  public void messageReceived(IoSession session, Object message) {
                    Entry nextEntry = EntryImpl.this.nextEntry;
                    callNextMessageReceived(nextEntry, session, message);
                }
可以看到他還是執行的callNextMessageReceived方法,這樣層層遞歸的執行,直到Filter的鏈尾,那麼接下來就是執行IoHandler裡面對應的messageReceived方法進行具體的業務邏輯操作喽!這樣的話,整個read過程中涉及到的關鍵部分就結束啦!

 

接下來分析下write過程,如果我們想要給服務端發送消息內容的話,首先我們需要獲取到IoSession對象,這裡我們以NioSocketSession為例,發送消息調用的將是他的write方法,查看NioSocketSession發現他裡面沒有write方法,到他的父類NioSession查看也不存在,最後在AbstractIoSession找到啦;

AbstractIoSession$write()

 

public WriteFuture write(Object message) {
        return write(message, null);
    }
也就是說他執行的是兩個參數的write方法,
public WriteFuture write(Object message, SocketAddress remoteAddress) {
        if (message == null) {
            throw new IllegalArgumentException("Trying to write a null message : not allowed");
        }

        // We can't send a message to a connected session if we don't have
        // the remote address
        if (!getTransportMetadata().isConnectionless() && (remoteAddress != null)) {
            throw new UnsupportedOperationException();
        }

        // If the session has been closed or is closing, we can't either
        // send a message to the remote side. We generate a future
        // containing an exception.
        if (isClosing() || !isConnected()) {
            WriteFuture future = new DefaultWriteFuture(this);
            WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
            WriteException writeException = new WriteToClosedSessionException(request);
            future.setException(writeException);
            return future;
        }

        FileChannel openedFileChannel = null;

        // TODO: remove this code as soon as we use InputStream
        // instead of Object for the message.
        try {
            if ((message instanceof IoBuffer) && !((IoBuffer) message).hasRemaining()) {
                // Nothing to write : probably an error in the user code
                throw new IllegalArgumentException("message is empty. Forgot to call flip()?");
            } else if (message instanceof FileChannel) {
                FileChannel fileChannel = (FileChannel) message;
                message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
            } else if (message instanceof File) {
                File file = (File) message;
                openedFileChannel = new FileInputStream(file).getChannel();
                message = new FilenameFileRegion(file, openedFileChannel, 0, openedFileChannel.size());
            }
        } catch (IOException e) {
            ExceptionMonitor.getInstance().exceptionCaught(e);
            return DefaultWriteFuture.newNotWrittenFuture(this, e);
        }

        // Now, we can write the message. First, create a future
        WriteFuture writeFuture = new DefaultWriteFuture(this);
        WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);

        // Then, get the chain and inject the WriteRequest into it
        IoFilterChain filterChain = getFilterChain();
        filterChain.fireFilterWrite(writeRequest);

        // TODO : This is not our business ! The caller has created a
        // FileChannel,
        // he has to close it !
        if (openedFileChannel != null) {
            // If we opened a FileChannel, it needs to be closed when the write
            // has completed
            final FileChannel finalChannel = openedFileChannel;
            writeFuture.addListener(new IoFutureListener() {
                public void operationComplete(WriteFuture future) {
                    try {
                        finalChannel.close();
                    } catch (IOException e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
                    }
                }
            });
        }

        // Return the WriteFuture.
        return writeFuture;
    }
這部分源碼比較長,我們挑重點看,在第45行創建了一個WriteFuture對象,接著把write的消息內容以及WriteFuture對象對象作為參數封裝出來一個WriteRequest對象,第49行獲得了我們的責任鏈,和read過程一樣,我們也可以通過DefaultIoFilterChain的addLast方法添加自己創建的Filter對象,接著第50行調用DefaultIoFilterChain的fireFilterWrite方法

 

 

  public void fireFilterWrite(WriteRequest writeRequest) {
        callPreviousFilterWrite(tail, session, writeRequest);
    }
可以看到這個方法執行的是callPreviousFilterWrite方法

 

 

    private void callPreviousFilterWrite(Entry entry, IoSession session, WriteRequest writeRequest) {
        try {
            IoFilter filter = entry.getFilter();
            NextFilter nextFilter = entry.getNextFilter();
            filter.filterWrite(nextFilter, session, writeRequest);
        } catch (Exception e) {
            writeRequest.getFuture().setException(e);
            fireExceptionCaught(e);
        } catch (Error e) {
            writeRequest.getFuture().setException(e);
            fireExceptionCaught(e);
            throw e;
        }
    }
和之前的read方法中責任鏈的執行過程一樣,也是首先獲取filter對象,同時獲取該filter對象的下一個nextFilter對象,調用他的filterWrite方法

 

 

 @Override
        public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
            nextFilter.filterWrite(session, writeRequest);
        }
它裡面調用的是filterWrite方法,這個方法裡面會繼續調用filterWrite方法,這樣層層遞歸,直到到達責任鏈的鏈頭,也就是HeadFilter為止,調用HeadFilter的filterWrite方法,HeadFilter是DefaultIoFilterChain的靜態內部類:

 

 

 public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {

            AbstractIoSession s = (AbstractIoSession) session;

            // Maintain counters.
            if (writeRequest.getMessage() instanceof IoBuffer) {
                IoBuffer buffer = (IoBuffer) writeRequest.getMessage();
                // I/O processor implementation will call buffer.reset()
                // it after the write operation is finished, because
                // the buffer will be specified with messageSent event.
                buffer.mark();
                int remaining = buffer.remaining();

                if (remaining > 0) {
                    s.increaseScheduledWriteBytes(remaining);
                }
            } else {
                s.increaseScheduledWriteMessages();
            }

            WriteRequestQueue writeRequestQueue = s.getWriteRequestQueue();

            if (!s.isWriteSuspended()) {
                if (writeRequestQueue.isEmpty(session)) {
                    // We can write directly the message
                    s.getProcessor().write(s, writeRequest);
                } else {
                    s.getWriteRequestQueue().offer(s, writeRequest);
                    s.getProcessor().flush(s);
                }
            } else {
                s.getWriteRequestQueue().offer(s, writeRequest);
            }
        }
在HeadFilter的filterWrite方法裡面,你會看到有這麼一句代碼s.getProcessor(),他其實上就是獲得處理我們當前NioSocketSession的NioProcessor對象而已,那麼接下來的write操作是包含兩個參數的,我們的NioProcessor裡面並沒有實現這個方法,需要到他的父類AbstractPollingIoProcessor查看,代碼如下:

 

 

  public void write(S session, WriteRequest writeRequest) {
        WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();

        writeRequestQueue.offer(session, writeRequest);

        if (!session.isWriteSuspended()) {
            this.flush(session);
        }
    }
做的事還是比較少的,就是將當前的寫請求加入到我們當前NioSocketSession的寫請求隊列中,同時通過AbstractPollingIoProcessor的flush方法將NioSocketSession放入到flushingSessions隊列中,這個隊列主要存儲的是那些將要被flush的IoSession集合;我們來看看flush方法

 

AbstractPollingIoProcessor$flush

 

public final void flush(S session) {
        // add the session to the queue if it's not already
        // in the queue, then wake up the select()
        if (session.setScheduledForFlush(true)) {
            flushingSessions.add(session);
            wakeup();
        }
    }
第5行執行了將當前NioSocketSession加入到flushingSession的操作,隨後調用了wakeup方法,wakeup方法會喚醒我們阻塞的select方法,這樣的話,我們的服務端就可以收到客戶端發送的消息了,接著讀取過程就和上面的源碼講解一樣了;

 

至此,MINA中主要的服務端源碼分析結束了,注意我們只分析了NioSocketAcceptor部分的源碼,沒有涉及NioSocketConnector部分,其實NioSocketConnector部分的源碼和NioSocketAcceptor部分基本上是類似的分析過程,在這裡我就不細細分析了,下一篇我會對MINA框架做一個小結,包括它裡面涉及到的一些線程模型結構;
 

 

  1. 上一頁:
  2. 下一頁:
熱門文章
閱讀排行版
Copyright © Android教程網 All Rights Reserved