Android教程網
  1. 首頁
  2. Android 技術
  3. Android 手機
  4. Android 系統教程
  5. Android 游戲
 Android教程網 >> Android技術 >> 關於Android編程 >> Redis AE 異步事件模塊

Redis AE 異步事件模塊

編輯:關於Android編程

首先想一個問題,為何Redis比Memcached快呢?
一般想法:Memcached完全基於內存,而Redis具有持久化保存特性,即使是異步的,Redis也不可能比Memcached快。
可實際測試情況基本上是:Redis占絕對優勢。


可能原因有二:
1、Libevent: Memcached使用、而Redis沒有選用。Libevent為了迎合通用性造成代碼龐大及犧牲了在特定平台的不少性能。Redis一直堅持設計小巧並去依賴庫的思路。
2、CAS問題:CAS是Memcached中比較方便的一種防止競爭修改資源的方法。
CAS實現需要為每個cache key設置一個隱藏的cas token,cas相當value版本號,每次set會將token需要遞增,

因此帶來CPU和內存的雙重開銷、但達到單機10G+ cache以及QPS上萬之後這些開銷就會給雙方相對帶來一些

細微性能差別。


Redis在封裝事件的處理采用了Reactor模式,添加了定時事件的處理。
Redis處理事件是單進程單線程的,而經典Reator模式對事件是串行處理的。
即如果有一個事件阻塞過久的話會導致整個Redis被阻塞。


下面來簡要分析一下Redis AE事件處理模型。


從代碼中可以看到它主要支持了epoll、select、kqueue、以及基於Solaris的event ports。
主要提供了對兩種類型的事件驅動:
1、IO事件(文件事件),包括有IO的讀事件和寫事件。
2、定時器事件,包括有一次性定時器和循環定時器。

基本的數據結構:@ae.h

 

//定義文件事件處理接口(函數指針)

 

typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);

//時間事件處理接口(函數指針),該函數返回定時的時長
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);

//aeMain中使用,在調用處理事件前調用
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);

//文件事件結構體
typedef struct aeFileEvent {
    //讀或者寫,也用於標識該事件結構體是否正在使用
    int mask; /* one of AE_(READABLE|WRITABLE) */
    //讀事件的處理函數
    aeFileProc *rfileProc;
    //寫事件的處理函數
    aeFileProc *wfileProc;
    //傳遞給上述兩個函數的數據
    void *clientData;
} aeFileEvent;

//時間事件
typedef struct aeTimeEvent {
    //時間事件標識符,用於唯一標識該時間事件,並且用於刪除時間事件
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    //該事件對應的處理程序
    aeTimeProc *timeProc;
    //時間事件的最後一次處理程序,若已設置,則刪除時間事件時會被調用
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *next;
} aeTimeEvent;

//這裡用於保存已觸發的事件
typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;
/* State of an event based program */
typedef struct aeEventLoop {
    //最大文件描述符的值
    int maxfd;   /* highest file descriptor currently registered */
    //文件描述符的最大監聽數
    int setsize; /* max number of file descriptors tracked */
    //用於生成時間事件的唯一標識id
    long long timeEventNextId;
    //用於檢測系統時間是否變更(判斷標准 now


1、 aeCreateEventLoop

底層epoll多路復用初始化,然後存放在aeEventLoop中 void * 類型的apidata,隱藏了底層的實現。

 

 

typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;

//ae底層的數據創建以及初始化
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    //創建setsize個epoll_event
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state;
    return 0;
}

//創建事件循環,setsize為最大事件的的個數,對於epoll來說也是epoll_event的個數
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

	//分配該結構體的內存空間
    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    
    //初始化最多setsize個事件
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    
    //這一步為創建底層IO處理的數據,如epoll,創建epoll_event,和epfd
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;

err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}

這裡將最大文件描述符作為參數setSize、後面創建的eventLoop->events、eventLoop->fired都是以此
來創建、即用文件描述符作為其索引、以最大內存 sizeof(aeFiredEvent) + sizeof(aeFileEvent) 40字節
乘以總的fd數之內存換取o(1)查找效率是值得的。

 

 

2、aeCreateFileEvent

對於創建文件事件,需要傳入一個該事件對應的處理程序,當事件發生時,會調用對應的回調函數。
這裡設計的aeFileEvent結構體就是將事件源(FD),事件,事件處理程序關聯起來。

 

//添加監聽的事件,其中如果該fd對應的事件已經存在,則為修改合並舊的事件
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    //判斷fd是否已經添加了事件的監聽
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

//刪除指定事件的監聽
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    int mask = eventLoop->events[fd].mask & (~delmask);

    ee.events = 0;
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (mask != AE_NONE) {
        epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
    } else {
        /* Note, Kernel < 2.6.9 requires a non null event pointer even for
         * EPOLL_CTL_DEL. */
        epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
    }
}

//創建文件事件,並將該事件注冊到eventLoop中
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    
    //直接使用fd來獲取FileEvent,來後面分離事件時也采用這種方法(直接索引)
    aeFileEvent *fe = &eventLoop->events[fd];

	//該該事件添加eventLoop中或者修改原來的已有的(保留舊的)
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    
    //將該事件的處理程序放到對應的位置
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    
    //設置將要傳遞給該事件處理程序的數據
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

3、aeProcessEvents

這個是核心部分,通過epoll_wait將事件分離出來,從而保存到fired中,對於語句
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
通過觸發事件的 fd 在events中直接映射找到與事件關聯的結構體,從而實現事件分派。
Reactor的核心是實現了事件的分離分派。

 

 

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

	//等待事件產生
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            // 利用fired數組記錄觸發的事件
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

//事件處理程序
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

	//若什麼都沒有設置,則直接返回
    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

	//如果有文件事件或者設置了時間事件並且沒有設置DONT_WAIT標志
    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
        	//查找時間最早的時間事件
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;
			// 找到最早的時間事件與當前時間差值就是epoll wait時間
            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
            	//如果沒有時間事件則可以阻塞、如果此時加入一個Timer event,啥時候喚醒呢?!
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

	    /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
            	//這裡的判斷是為了防止重復調用
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

總結如下:
1. Reactor模式,串行處理事件
2. 具有定時事件功能(但是不能過多,因為是使用鏈表實現的)、O(N)復雜度
3. 優先處理讀事件
  1. 上一頁:
  2. 下一頁:
熱門文章
閱讀排行版
Copyright © Android教程網 All Rights Reserved