123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290 |
- #include <stdlib.h>
- #include <string.h>
- #include <stdio.h>
- #include <fcntl.h>
- #include <sys/epoll.h>
- #include <errno.h>
- #include <pthread.h>
- #include <sys/socket.h>
- #include <sys/types.h>
- #include <netinet/in.h>
- #include <unistd.h>
- #include <signal.h>
- #include <sys/time.h>
- #include <sys/timerfd.h>
- #include "pxEpoll.h"
- #define MAX_FD_NUM 5
- LIST_NODE(timerHead);
- static pthread_mutex_t mutexEpoll = PTHREAD_MUTEX_INITIALIZER;
- static pthread_mutex_t mutexTimer = PTHREAD_MUTEX_INITIALIZER;
- int epollInit(char *threadName)
- {
- int epollfd=0;
- pxLog(INIT,"[%s] Epoll init.",threadName);
- epollfd= epoll_create(MAX_FD_NUM);
- if(epollfd==-1)
- {
- pxLog(ERROR,"epoll create %s\n", strerror(errno));
- return -1;
- }
- return epollfd;
- }
- void epollCancelListenConn(PXEPOLL_S* epoll)
- {
- struct epoll_event event;
- memset(&event, 0, sizeof(event));
- event.events = EPOLLIN | EPOLLERR | EPOLLHUP;
- event.data.ptr=(void*)epoll;
- pthread_mutex_lock(&mutexEpoll);
- epoll_ctl(epoll->serverfd, EPOLL_CTL_MOD, epoll->clientfd, &event);
- pthread_mutex_unlock(&mutexEpoll);
- }
- PXEPOLL_S* epollAdd_conn(int epollfd,int fd,EPOLLCB *recvCB,EPOLLCB *failCB,EPOLLCB *success,void *param)
- {
- struct epoll_event event;
- PXEPOLL_S *epoll=new(PXEPOLL_S);
- if(epoll==NULL)
- {
- pxLog(ERROR,"new(PXEPOLL_S) failed");
- return NULL;
- }
-
- memset(epoll,0,sizeof(PXEPOLL_S));
- epoll->clientfd=fd;
- epoll->serverfd=epollfd;
- epoll->recvCallBack=recvCB;
- epoll->failCallBack=failCB;
- epoll->connSuccess=success;
- epoll->param=param;
- memset(&event, 0, sizeof(event));
- event.events = EPOLLIN | EPOLLERR | EPOLLHUP;
- if(success)
- event.events|=EPOLLOUT;
- event.data.ptr=(void*)epoll;
- pthread_mutex_lock(&mutexEpoll);
- if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) == -1)
- {
- pxLog(ERROR,"Epoll add ctl %s", strerror(errno));
- free(epoll);
- pthread_mutex_unlock(&mutexEpoll);
- return NULL;
- }
- pthread_mutex_unlock(&mutexEpoll);
- return epoll;
- }
- PXEPOLL_S* epollAdd(int epollfd,int fd,EPOLLCB *recvCB,EPOLLCB *failCB,void *param)
- {
- struct epoll_event event;
- PXEPOLL_S *epoll=new(PXEPOLL_S);
- if(epoll==NULL)
- {
- pxLog(ERROR,"new(PXEPOLL_S) failed");
- return NULL;
- }
-
- memset(epoll,0,sizeof(PXEPOLL_S));
- epoll->clientfd=fd;
- epoll->serverfd=epollfd;
- epoll->recvCallBack=recvCB;
- epoll->failCallBack=failCB;
- epoll->param=param;
- memset(&event, 0, sizeof(event));
- event.events = EPOLLIN | EPOLLERR | EPOLLHUP;
- event.data.ptr=(void*)epoll;
- pthread_mutex_lock(&mutexEpoll);
- if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) == -1)
- {
- pxLog(ERROR,"Epoll add ctl %s", strerror(errno));
- free(epoll);
- pthread_mutex_unlock(&mutexEpoll);
- return NULL;
- }
- pthread_mutex_unlock(&mutexEpoll);
- return epoll;
- }
- int epollDel(PXEPOLL_S* epoll)
- {
- pthread_mutex_lock(&mutexEpoll);
- if (epoll_ctl(epoll->serverfd, EPOLL_CTL_DEL, epoll->clientfd, NULL) == -1)
- {
- pxLog(ERROR,"Epoll ctl del %s\n", strerror(errno));
- pthread_mutex_unlock(&mutexEpoll);
- return -1;
- }
- close(epoll->clientfd);
- free(epoll);
- pthread_mutex_unlock(&mutexEpoll);
- return 0;
- }
- void TimeCB(PXEPOLL_S *epoll)
- {
- PXTIMER_S *timer=epoll->param;
- uint64_t tmpExp = 0;
- //bufLen=recv(epoll->fd, buf, sizeof(buf), 0);
- read(epoll->clientfd, &tmpExp, sizeof(uint64_t));
- timer->timerCallBack(timer);
- if(!timer->multiTime)
- timerDel(timer);
- }
- PXTIMER_S* timerAdd(int epollfd,int num,int interval,int multiTime,TIMERCB *timeoutCB,void *param)
- {
- PXTIMER_S *timerNode,*newNode;
- struct itimerspec timer;
- int timerfd = -1;
-
- //如果序号不为零,则一个序号只能存在一个定时器
- if(num!=0)
- {
- pthread_mutex_lock(&mutexTimer);
- list_for_each_entry(timerNode,&timerHead,node)
- {
- if(timerNode->num == num)
- {
- pxLog(WARNING,"There is already a timer[%d]",num);
- pthread_mutex_unlock(&mutexTimer);
- return NULL;
- }
- }
- pthread_mutex_unlock(&mutexTimer);
- }
- newNode=new(PXTIMER_S);
- if(newNode==NULL)
- {
- return NULL;
- }
- memset(newNode,0,sizeof(PXTIMER_S));
- timerfd = timerfd_create(CLOCK_MONOTONIC,0);
- memset(&timer,0,sizeof(timer));
- if(interval==0)
- {
- multiTime=0;
- }
- if(multiTime)
- {
- // timer.it_value.tv_sec = 0;
- // timer.it_value.tv_nsec = 1;
- timer.it_value.tv_sec = interval/1000;
- timer.it_value.tv_nsec = (interval%1000)*1000000;
- timer.it_interval.tv_sec = interval/1000;
- timer.it_interval.tv_nsec = (interval%1000)*1000000;
- }
- else
- {
- timer.it_value.tv_sec = interval/1000;
- timer.it_value.tv_nsec = (interval%1000)*1000000;
- timer.it_interval.tv_sec = interval/1000;
- timer.it_interval.tv_nsec = (interval%1000)*1000000;
- }
- newNode->num=num;
- newNode->multiTime=multiTime;
- newNode->interval=interval;
- newNode->timerCallBack=timeoutCB;
- INIT_LIST_NODE(&newNode->node);
- newNode->param=param;
- newNode->epoll=epollAdd(epollfd,timerfd,TimeCB,NULL,(void*)newNode);
- if(!newNode->epoll)
- {
- free(newNode);
- pxLog(PACKGE,"Insert a new timer[%d] failed",num);
- return NULL;
- }
- if(num)
- {
- pthread_mutex_lock(&mutexTimer);
- list_add(&newNode->node,&timerHead);
- pthread_mutex_unlock(&mutexTimer);
- }
- timerfd_settime(timerfd,0,&timer,NULL);
- pxLog(PACKGE,"Insert a new timer[%d] interval:%d",num,timer.it_interval.tv_sec);
- return newNode;
- }
- void timerDel(PXTIMER_S *entry)
- {
- if(!entry)
- {
- pxLog(ERROR,"The timerNode to be deleted is NULL");
- return;
- }
- epollDel(entry->epoll);
- pthread_mutex_lock(&mutexTimer);
- list_del(&entry->node);
- pthread_mutex_unlock(&mutexTimer);
- free(entry);
- }
- // void timerDelByNum(int timerID);
- int epollMain(int epollfd,char *threadName)
- {
- struct epoll_event events[MAX_FD_NUM];
- pxLog(INIT,"[%s] epoll wait",threadName);
- while (1)
- {
- int num = epoll_wait(epollfd, events, MAX_FD_NUM, -1);
- if (num == -1)
- {
- if(errno==EINTR)
- {
- pxLog(PACKGE,"epoll wait get Interrupted system call");
- continue; // 重启 epoll_wait 函数
- }
- else
- {
- pxLog(ERROR,"epoll wait %s", strerror(errno));
- exit(EXIT_FAILURE);
- }
- }
- else
- {
- int i = 0;
- for (i=0; i<num; i++)
- {
- PXEPOLL_S *epoll=NULL;
- uint32_t eventsflag = events[i].events;
- if (eventsflag & EPOLLERR || eventsflag & EPOLLHUP || eventsflag & EPOLLRDHUP)
- {
- pxLog(PACKGE,"epoll wait %s");
- epoll=(PXEPOLL_S*)events[i].data.ptr;
- if(epoll->failCallBack)
- {
- epoll->failCallBack(epoll);
- }
- else
- {
- if(epoll->param)
- {
- free(epoll->param);
- epoll->param=NULL;
- }
- epollDel(epoll);
- }
- }
- else if(eventsflag & EPOLLIN)
- {
- epoll=(PXEPOLL_S*)events[i].data.ptr;
- if(epoll->recvCallBack!=NULL)
- epoll->recvCallBack(epoll);
- }
- }
- }
- }
- }
|