pxEpoll.c 7.2 KB


  1. #include <stdlib.h>
  2. #include <string.h>
  3. #include <stdio.h>
  4. #include <fcntl.h>
  5. #include <sys/epoll.h>
  6. #include <errno.h>
  7. #include <pthread.h>
  8. #include <sys/socket.h>
  9. #include <sys/types.h>
  10. #include <netinet/in.h>
  11. #include <unistd.h>
  12. #include <signal.h>
  13. #include <sys/time.h>
  14. #include <sys/timerfd.h>
  15. #include "pxEpoll.h"
  // Passed to epoll_create() as its size argument and used as the capacity of the event array filled by each epoll_wait() call in epollMain().
  16. #define MAX_FD_NUM 5
  // List head (declared through the project's LIST_NODE macro) that timerAdd() searches and inserts numbered timers into.
  17. LIST_NODE(timerHead);
  // Held around every epoll_ctl() call in this file, and around the close()/free() done by epollDel().
  18. static pthread_mutex_t mutexEpoll = PTHREAD_MUTEX_INITIALIZER;
  // Held while the timer list is traversed or modified (timerAdd() and timerDel()).
  19. static pthread_mutex_t mutexTimer = PTHREAD_MUTEX_INITIALIZER;
  20. int epollInit(char *threadName)
  21. {
  22. int epollfd=0;
  23. pxLog(INIT,"[%s] Epoll init.",threadName);
  24. epollfd= epoll_create(MAX_FD_NUM);
  25. if(epollfd==-1)
  26. {
  27. pxLog(ERROR,"epoll create %s\n", strerror(errno));
  28. return -1;
  29. }
  30. return epollfd;
  31. }
  32. void epollCancelListenConn(PXEPOLL_S* epoll)
  33. {
  34. struct epoll_event event;
  35. memset(&event, 0, sizeof(event));
  36. event.events = EPOLLIN | EPOLLERR | EPOLLHUP;
  37. event.data.ptr=(void*)epoll;
  38. pthread_mutex_lock(&mutexEpoll);
  39. epoll_ctl(epoll->serverfd, EPOLL_CTL_MOD, epoll->clientfd, &event);
  40. pthread_mutex_unlock(&mutexEpoll);
  41. }
  42. PXEPOLL_S* epollAdd_conn(int epollfd,int fd,EPOLLCB *recvCB,EPOLLCB *failCB,EPOLLCB *success,void *param)
  43. {
  44. struct epoll_event event;
  45. PXEPOLL_S *epoll=new(PXEPOLL_S);
  46. if(epoll==NULL)
  47. {
  48. pxLog(ERROR,"new(PXEPOLL_S) failed");
  49. return NULL;
  50. }
  51. memset(epoll,0,sizeof(PXEPOLL_S));
  52. epoll->clientfd=fd;
  53. epoll->serverfd=epollfd;
  54. epoll->recvCallBack=recvCB;
  55. epoll->failCallBack=failCB;
  56. epoll->connSuccess=success;
  57. epoll->param=param;
  58. memset(&event, 0, sizeof(event));
  59. event.events = EPOLLIN | EPOLLERR | EPOLLHUP;
  60. if(success)
  61. event.events|=EPOLLOUT;
  62. event.data.ptr=(void*)epoll;
  63. pthread_mutex_lock(&mutexEpoll);
  64. if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) == -1)
  65. {
  66. pxLog(ERROR,"Epoll add ctl %s", strerror(errno));
  67. free(epoll);
  68. pthread_mutex_unlock(&mutexEpoll);
  69. return NULL;
  70. }
  71. pthread_mutex_unlock(&mutexEpoll);
  72. return epoll;
  73. }
  74. PXEPOLL_S* epollAdd(int epollfd,int fd,EPOLLCB *recvCB,EPOLLCB *failCB,void *param)
  75. {
  76. struct epoll_event event;
  77. PXEPOLL_S *epoll=new(PXEPOLL_S);
  78. if(epoll==NULL)
  79. {
  80. pxLog(ERROR,"new(PXEPOLL_S) failed");
  81. return NULL;
  82. }
  83. memset(epoll,0,sizeof(PXEPOLL_S));
  84. epoll->clientfd=fd;
  85. epoll->serverfd=epollfd;
  86. epoll->recvCallBack=recvCB;
  87. epoll->failCallBack=failCB;
  88. epoll->param=param;
  89. memset(&event, 0, sizeof(event));
  90. event.events = EPOLLIN | EPOLLERR | EPOLLHUP;
  91. event.data.ptr=(void*)epoll;
  92. pthread_mutex_lock(&mutexEpoll);
  93. if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) == -1)
  94. {
  95. pxLog(ERROR,"Epoll add ctl %s", strerror(errno));
  96. free(epoll);
  97. pthread_mutex_unlock(&mutexEpoll);
  98. return NULL;
  99. }
  100. pthread_mutex_unlock(&mutexEpoll);
  101. return epoll;
  102. }
  103. int epollDel(PXEPOLL_S* epoll)
  104. {
  105. pthread_mutex_lock(&mutexEpoll);
  106. if (epoll_ctl(epoll->serverfd, EPOLL_CTL_DEL, epoll->clientfd, NULL) == -1)
  107. {
  108. pxLog(ERROR,"Epoll ctl del %s\n", strerror(errno));
  109. pthread_mutex_unlock(&mutexEpoll);
  110. return -1;
  111. }
  112. close(epoll->clientfd);
  113. free(epoll);
  114. pthread_mutex_unlock(&mutexEpoll);
  115. return 0;
  116. }
  117. void TimeCB(PXEPOLL_S *epoll)
  118. {
  119. PXTIMER_S *timer=epoll->param;
  120. uint64_t tmpExp = 0;
  121. //bufLen=recv(epoll->fd, buf, sizeof(buf), 0);
  122. read(epoll->clientfd, &tmpExp, sizeof(uint64_t));
  123. timer->timerCallBack(timer);
  124. if(!timer->multiTime)
  125. timerDel(timer);
  126. }
  127. PXTIMER_S* timerAdd(int epollfd,int num,int interval,int multiTime,TIMERCB *timeoutCB,void *param)
  128. {
  129. PXTIMER_S *timerNode,*newNode;
  130. struct itimerspec timer;
  131. int timerfd = -1;
  132. //如果序号不为零,则一个序号只能存在一个定时器
  133. if(num!=0)
  134. {
  135. pthread_mutex_lock(&mutexTimer);
  136. list_for_each_entry(timerNode,&timerHead,node)
  137. {
  138. if(timerNode->num == num)
  139. {
  140. pxLog(WARNING,"There is already a timer[%d]",num);
  141. pthread_mutex_unlock(&mutexTimer);
  142. return NULL;
  143. }
  144. }
  145. pthread_mutex_unlock(&mutexTimer);
  146. }
  147. newNode=new(PXTIMER_S);
  148. if(newNode==NULL)
  149. {
  150. return NULL;
  151. }
  152. memset(newNode,0,sizeof(PXTIMER_S));
  153. timerfd = timerfd_create(CLOCK_MONOTONIC,0);
  154. memset(&timer,0,sizeof(timer));
  155. if(interval==0)
  156. {
  157. multiTime=0;
  158. }
  159. if(multiTime)
  160. {
  161. // timer.it_value.tv_sec = 0;
  162. // timer.it_value.tv_nsec = 1;
  163. timer.it_value.tv_sec = interval/1000;
  164. timer.it_value.tv_nsec = (interval%1000)*1000000;
  165. timer.it_interval.tv_sec = interval/1000;
  166. timer.it_interval.tv_nsec = (interval%1000)*1000000;
  167. }
  168. else
  169. {
  170. timer.it_value.tv_sec = interval/1000;
  171. timer.it_value.tv_nsec = (interval%1000)*1000000;
  172. timer.it_interval.tv_sec = interval/1000;
  173. timer.it_interval.tv_nsec = (interval%1000)*1000000;
  174. }
  175. newNode->num=num;
  176. newNode->multiTime=multiTime;
  177. newNode->interval=interval;
  178. newNode->timerCallBack=timeoutCB;
  179. INIT_LIST_NODE(&newNode->node);
  180. newNode->param=param;
  181. newNode->epoll=epollAdd(epollfd,timerfd,TimeCB,NULL,(void*)newNode);
  182. if(!newNode->epoll)
  183. {
  184. free(newNode);
  185. pxLog(PACKGE,"Insert a new timer[%d] failed",num);
  186. return NULL;
  187. }
  188. if(num)
  189. {
  190. pthread_mutex_lock(&mutexTimer);
  191. list_add(&newNode->node,&timerHead);
  192. pthread_mutex_unlock(&mutexTimer);
  193. }
  194. timerfd_settime(timerfd,0,&timer,NULL);
  195. pxLog(PACKGE,"Insert a new timer[%d] interval:%d",num,timer.it_interval.tv_sec);
  196. return newNode;
  197. }
  198. void timerDel(PXTIMER_S *entry)
  199. {
  200. if(!entry)
  201. {
  202. pxLog(ERROR,"The timerNode to be deleted is NULL");
  203. return;
  204. }
  205. epollDel(entry->epoll);
  206. pthread_mutex_lock(&mutexTimer);
  207. list_del(&entry->node);
  208. pthread_mutex_unlock(&mutexTimer);
  209. free(entry);
  210. }
  211. // void timerDelByNum(int timerID);
  212. int epollMain(int epollfd,char *threadName)
  213. {
  214. struct epoll_event events[MAX_FD_NUM];
  215. pxLog(INIT,"[%s] epoll wait",threadName);
  216. while (1)
  217. {
  218. int num = epoll_wait(epollfd, events, MAX_FD_NUM, -1);
  219. if (num == -1)
  220. {
  221. if(errno==EINTR)
  222. {
  223. pxLog(PACKGE,"epoll wait get Interrupted system call");
  224. continue; // 重启 epoll_wait 函数
  225. }
  226. else
  227. {
  228. pxLog(ERROR,"epoll wait %s", strerror(errno));
  229. exit(EXIT_FAILURE);
  230. }
  231. }
  232. else
  233. {
  234. int i = 0;
  235. for (i=0; i<num; i++)
  236. {
  237. PXEPOLL_S *epoll=NULL;
  238. uint32_t eventsflag = events[i].events;
  239. if (eventsflag & EPOLLERR || eventsflag & EPOLLHUP || eventsflag & EPOLLRDHUP)
  240. {
  241. pxLog(PACKGE,"epoll wait %s");
  242. epoll=(PXEPOLL_S*)events[i].data.ptr;
  243. if(epoll->failCallBack)
  244. {
  245. epoll->failCallBack(epoll);
  246. }
  247. else
  248. {
  249. if(epoll->param)
  250. {
  251. free(epoll->param);
  252. epoll->param=NULL;
  253. }
  254. epollDel(epoll);
  255. }
  256. }
  257. else if(eventsflag & EPOLLIN)
  258. {
  259. epoll=(PXEPOLL_S*)events[i].data.ptr;
  260. if(epoll->recvCallBack!=NULL)
  261. epoll->recvCallBack(epoll);
  262. }
  263. }
  264. }
  265. }
  266. }