#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "pxEpoll.h" #define MAX_FD_NUM 5 LIST_NODE(timerHead); static pthread_mutex_t mutexEpoll = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t mutexTimer = PTHREAD_MUTEX_INITIALIZER; int epollInit(char *threadName) { int epollfd=0; pxLog(INIT,"[%s] Epoll init.",threadName); epollfd= epoll_create(MAX_FD_NUM); if(epollfd==-1) { pxLog(ERROR,"epoll create %s\n", strerror(errno)); return -1; } return epollfd; } void epollCancelListenConn(PXEPOLL_S* epoll) { struct epoll_event event; memset(&event, 0, sizeof(event)); event.events = EPOLLIN | EPOLLERR | EPOLLHUP; event.data.ptr=(void*)epoll; pthread_mutex_lock(&mutexEpoll); epoll_ctl(epoll->serverfd, EPOLL_CTL_MOD, epoll->clientfd, &event); pthread_mutex_unlock(&mutexEpoll); } PXEPOLL_S* epollAdd_conn(int epollfd,int fd,EPOLLCB *recvCB,EPOLLCB *failCB,EPOLLCB *success,void *param) { struct epoll_event event; PXEPOLL_S *epoll=new(PXEPOLL_S); if(epoll==NULL) { pxLog(ERROR,"new(PXEPOLL_S) failed"); return NULL; } memset(epoll,0,sizeof(PXEPOLL_S)); epoll->clientfd=fd; epoll->serverfd=epollfd; epoll->recvCallBack=recvCB; epoll->failCallBack=failCB; epoll->connSuccess=success; epoll->param=param; memset(&event, 0, sizeof(event)); event.events = EPOLLIN | EPOLLERR | EPOLLHUP; if(success) event.events|=EPOLLOUT; event.data.ptr=(void*)epoll; pthread_mutex_lock(&mutexEpoll); if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) == -1) { pxLog(ERROR,"Epoll add ctl %s", strerror(errno)); free(epoll); pthread_mutex_unlock(&mutexEpoll); return NULL; } pthread_mutex_unlock(&mutexEpoll); return epoll; } PXEPOLL_S* epollAdd(int epollfd,int fd,EPOLLCB *recvCB,EPOLLCB *failCB,void *param) { struct epoll_event event; PXEPOLL_S *epoll=new(PXEPOLL_S); if(epoll==NULL) { pxLog(ERROR,"new(PXEPOLL_S) failed"); return NULL; } memset(epoll,0,sizeof(PXEPOLL_S)); epoll->clientfd=fd; epoll->serverfd=epollfd; epoll->recvCallBack=recvCB; epoll->failCallBack=failCB; epoll->param=param; memset(&event, 0, sizeof(event)); event.events = EPOLLIN | EPOLLERR | EPOLLHUP; event.data.ptr=(void*)epoll; pthread_mutex_lock(&mutexEpoll); if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) == -1) { pxLog(ERROR,"Epoll add ctl %s", strerror(errno)); free(epoll); pthread_mutex_unlock(&mutexEpoll); return NULL; } pthread_mutex_unlock(&mutexEpoll); return epoll; } int epollDel(PXEPOLL_S* epoll) { pthread_mutex_lock(&mutexEpoll); if (epoll_ctl(epoll->serverfd, EPOLL_CTL_DEL, epoll->clientfd, NULL) == -1) { pxLog(ERROR,"Epoll ctl del %s\n", strerror(errno)); pthread_mutex_unlock(&mutexEpoll); return -1; } close(epoll->clientfd); free(epoll); pthread_mutex_unlock(&mutexEpoll); return 0; } void TimeCB(PXEPOLL_S *epoll) { PXTIMER_S *timer=epoll->param; uint64_t tmpExp = 0; //bufLen=recv(epoll->fd, buf, sizeof(buf), 0); read(epoll->clientfd, &tmpExp, sizeof(uint64_t)); timer->timerCallBack(timer); if(!timer->multiTime) timerDel(timer); } PXTIMER_S* timerAdd(int epollfd,int num,int interval,int multiTime,TIMERCB *timeoutCB,void *param) { PXTIMER_S *timerNode,*newNode; struct itimerspec timer; int timerfd = -1; //如果序号不为零,则一个序号只能存在一个定时器 if(num!=0) { pthread_mutex_lock(&mutexTimer); list_for_each_entry(timerNode,&timerHead,node) { if(timerNode->num == num) { pxLog(WARNING,"There is already a timer[%d]",num); pthread_mutex_unlock(&mutexTimer); return NULL; } } pthread_mutex_unlock(&mutexTimer); } newNode=new(PXTIMER_S); if(newNode==NULL) { return NULL; } memset(newNode,0,sizeof(PXTIMER_S)); timerfd = timerfd_create(CLOCK_MONOTONIC,0); memset(&timer,0,sizeof(timer)); if(interval==0) { multiTime=0; } if(multiTime) { // timer.it_value.tv_sec = 0; // timer.it_value.tv_nsec = 1; timer.it_value.tv_sec = interval/1000; timer.it_value.tv_nsec = (interval%1000)*1000000; timer.it_interval.tv_sec = interval/1000; timer.it_interval.tv_nsec = (interval%1000)*1000000; } else { timer.it_value.tv_sec = interval/1000; timer.it_value.tv_nsec = (interval%1000)*1000000; timer.it_interval.tv_sec = interval/1000; timer.it_interval.tv_nsec = (interval%1000)*1000000; } newNode->num=num; newNode->multiTime=multiTime; newNode->interval=interval; newNode->timerCallBack=timeoutCB; INIT_LIST_NODE(&newNode->node); newNode->param=param; newNode->epoll=epollAdd(epollfd,timerfd,TimeCB,NULL,(void*)newNode); if(!newNode->epoll) { free(newNode); pxLog(PACKGE,"Insert a new timer[%d] failed",num); return NULL; } if(num) { pthread_mutex_lock(&mutexTimer); list_add(&newNode->node,&timerHead); pthread_mutex_unlock(&mutexTimer); } timerfd_settime(timerfd,0,&timer,NULL); pxLog(PACKGE,"Insert a new timer[%d] interval:%d",num,timer.it_interval.tv_sec); return newNode; } void timerDel(PXTIMER_S *entry) { if(!entry) { pxLog(ERROR,"The timerNode to be deleted is NULL"); return; } epollDel(entry->epoll); pthread_mutex_lock(&mutexTimer); list_del(&entry->node); pthread_mutex_unlock(&mutexTimer); free(entry); } // void timerDelByNum(int timerID); int epollMain(int epollfd,char *threadName) { struct epoll_event events[MAX_FD_NUM]; pxLog(INIT,"[%s] epoll wait",threadName); while (1) { int num = epoll_wait(epollfd, events, MAX_FD_NUM, -1); if (num == -1) { if(errno==EINTR) { pxLog(PACKGE,"epoll wait get Interrupted system call"); continue; // 重启 epoll_wait 函数 } else { pxLog(ERROR,"epoll wait %s", strerror(errno)); exit(EXIT_FAILURE); } } else { int i = 0; for (i=0; ifailCallBack) { epoll->failCallBack(epoll); } else { if(epoll->param) { free(epoll->param); epoll->param=NULL; } epollDel(epoll); } } else if(eventsflag & EPOLLIN) { epoll=(PXEPOLL_S*)events[i].data.ptr; if(epoll->recvCallBack!=NULL) epoll->recvCallBack(epoll); } } } } }