// IOSocket.cpp
  1. #define WIN32_LEAN_AND_MEAN
  2. #include <windows.h>
  3. #include <WinSock2.h>
  4. #include <assert.h>
  5. #include <iostream>
  6. #include <mutex>
  7. #include "IOSocket.h"
  8. #include "../common/Pool.h"
  9. #include "../common/IOBuffer.h"
  10. bool CIOSocket::Init(HANDLE hCom)
  11. {
  12. ZeroMemory(&this->m_OverlappedRead, sizeof(m_OverlappedRead));
  13. ZeroMemory(&m_OverlappedWrite, sizeof(m_OverlappedWrite));
  14. m_pFirstHead = m_pLastHead = NULL;
  15. m_dwBufferCount = 0;
  16. m_IOCount = 0;
  17. ZeroMemory(m_WriteBuff, sizeof(m_WriteBuff));
  18. if (!CreateIoCompletionPort((HANDLE)m_hSocket, hCom, (ULONG_PTR)this, 0))
  19. {
  20. assert(false);
  21. std::cout << "failed" << std::endl;
  22. return false;
  23. }
  24. OnCreate();
  25. return true;
  26. }
  27. CIOSocket::CIOSocket(HANDLE hCom, SOCKET s) :m_hSocket(s), m_lRef(1), m_hComplete(hCom), m_pFirstHead(nullptr), m_pLastHead(nullptr)
  28. {
  29. m_pBuffer = CIOBuffer::Alloc(__FILE__, __LINE__);
  30. }
  31. void CIOSocket::Read(DWORD dwLeft)
  32. {
  33. if (m_hSocket == INVALID_SOCKET) return;
  34. if (m_pBuffer->Length > dwLeft)
  35. {
  36. m_pBuffer->Length -= dwLeft;
  37. CIOBuffer* pNextBuffer = CIOBuffer::Alloc(__FILE__, __LINE__);
  38. int8_t* ptr = m_pBuffer->Buffer;
  39. ptr += m_pBuffer->Length;
  40. memcpy(pNextBuffer->Buffer, ptr, dwLeft);
  41. m_pBuffer->Release(__FILE__, __LINE__);
  42. m_pBuffer = pNextBuffer;
  43. pNextBuffer->Length = dwLeft;
  44. }
  45. AddRef();
  46. WSABUF buf;
  47. buf.len = CIOBuffer::IO_BUFFER_SIZE - m_pBuffer->Length;
  48. int8_t* ptr = m_pBuffer->Buffer;
  49. ptr += m_pBuffer->Length;
  50. buf.buf = (char*)ptr;
  51. DWORD dwFlag = 0;
  52. DWORD dwRecv;
  53. if (WSARecv(m_hSocket, &buf, 1, &dwRecv, &dwFlag, &this->m_OverlappedRead, NULL) != 0)
  54. {
  55. if (WSAGetLastError() != WSA_IO_PENDING)
  56. {
  57. std::cout << "WSARecg Error " << WSAGetLastError() << std::endl;
  58. Close();
  59. return;
  60. }
  61. }
  62. }
  63. void CIOSocket::Write(CIOBuffer* pBuffer)
  64. {
  65. if (m_hSocket == INVALID_SOCKET) return;
  66. if (pBuffer->Length == 0) return;
  67. if (pBuffer->Length > CIOBuffer::IO_BUFFER_SIZE) return;
  68. pBuffer->AddRef();
  69. std::lock_guard<std::mutex> l(m_lock);
  70. if (m_dwBufferCount == 0)
  71. {
  72. AddRef();
  73. m_WriteBuff[m_dwBufferCount].buf = (char*)pBuffer->Buffer;
  74. m_WriteBuff[m_dwBufferCount].len = pBuffer->Length;
  75. m_dwWriteCount = pBuffer->Length;
  76. m_dwBufferCount = 1;
  77. DWORD dwSent = 0;
  78. if (WSASend(m_hSocket, m_WriteBuff, m_dwBufferCount, &dwSent, NULL, &m_OverlappedWrite, NULL) != 0)
  79. {
  80. DWORD error = WSAGetLastError();
  81. if (error != WSA_IO_PENDING)
  82. {
  83. m_dwWriteCount = 0;
  84. pBuffer->Release(__FILE__, __LINE__);
  85. m_dwBufferCount = 0;
  86. Release();
  87. PostQueuedCompletionStatus(m_hComplete, -1, (DWORD_PTR)this, NULL);
  88. }
  89. }
  90. }
  91. else
  92. {
  93. IO_STRUCT* ptr = Alloc_Pool<IO_STRUCT>::GetInstance().Alloc();
  94. ptr->pBuffer = pBuffer;
  95. if (m_pFirstHead == NULL)
  96. {
  97. m_pFirstHead = m_pLastHead = ptr;
  98. }
  99. else
  100. {
  101. m_pLastHead->pNext = ptr;
  102. m_pLastHead = ptr;
  103. }
  104. m_IOCount++;
  105. }
  106. }
  107. void CIOSocket::WriteCallback(DWORD dwTransferred)
  108. {
  109. if (m_hSocket == INVALID_SOCKET) return;
  110. if (dwTransferred == 0 || dwTransferred == -1)
  111. {
  112. Close();
  113. return;
  114. }
  115. if (m_dwWriteCount != dwTransferred)
  116. {
  117. Close();
  118. return;
  119. }
  120. std::lock_guard<std::mutex> l(m_lock);
  121. for (DWORD i = 0; i < m_dwBufferCount; i++)
  122. {
  123. CIOBuffer* ptr = (CIOBuffer*)(m_WriteBuff[i].buf - offsetof(CIOBuffer, Buffer));
  124. ptr->Release(__FILE__, __LINE__);
  125. }
  126. ZeroMemory(&m_WriteBuff, sizeof(m_WriteBuff));
  127. m_dwBufferCount = m_dwWriteCount = 0;
  128. if (m_pFirstHead == NULL)
  129. {
  130. m_pLastHead = NULL;
  131. m_bSending = false;
  132. }
  133. else
  134. {
  135. while (m_pFirstHead != NULL && m_dwWriteCount < CIOBuffer::IO_BUFFER_SIZE)
  136. {
  137. m_WriteBuff[m_dwBufferCount].buf = (char*)m_pFirstHead->pBuffer->Buffer;
  138. m_WriteBuff[m_dwBufferCount].len = m_pFirstHead->pBuffer->Length;
  139. m_dwWriteCount += m_pFirstHead->pBuffer->Length;
  140. IO_STRUCT* temp = m_pFirstHead;
  141. m_pFirstHead = m_pFirstHead->pNext;
  142. Alloc_Pool<IO_STRUCT>::GetInstance().Free(temp);
  143. m_IOCount--;
  144. m_dwBufferCount++;
  145. if (m_dwBufferCount == _countof(m_WriteBuff))break;
  146. }
  147. DWORD dwSent;
  148. AddRef();
  149. if (WSASend(m_hSocket, m_WriteBuff, m_dwBufferCount, &dwSent, NULL, &m_OverlappedWrite, NULL) != 0)
  150. {
  151. DWORD error = WSAGetLastError();
  152. if (error != WSA_IO_PENDING)
  153. {
  154. for (DWORD i = 0; i < m_dwBufferCount; i++)
  155. {
  156. CIOBuffer* pBuffer = (CIOBuffer*)(m_WriteBuff[i].buf - offsetof(CIOBuffer, Buffer));
  157. pBuffer->Release(__FILE__, __LINE__);
  158. }
  159. PostQueuedCompletionStatus(m_hComplete, -1, (ULONG_PTR)this, NULL);
  160. Release();
  161. }
  162. }
  163. }
  164. }
  165. CIOSocket::~CIOSocket()
  166. {
  167. for (DWORD i = 0; i < m_dwBufferCount; i++)
  168. {
  169. CIOBuffer* ptr = (CIOBuffer*)(m_WriteBuff[i].buf - offsetof(CIOBuffer, Buffer));
  170. ptr->Release(__FILE__,__LINE__);
  171. }
  172. m_dwBufferCount = 0;
  173. IO_STRUCT* temp = m_pFirstHead;
  174. while (temp)
  175. {
  176. IO_STRUCT* t = temp;
  177. temp = temp->pNext;
  178. t->pBuffer->Release(__FILE__, __LINE__);
  179. Alloc_Pool<IO_STRUCT>::GetInstance().Free(t);
  180. m_IOCount--;
  181. }
  182. if (m_IOCount != 0)
  183. {
  184. std::cout << "memory leack" << std::endl;
  185. }
  186. m_pBuffer->Release(__FILE__, __LINE__);
  187. }
  188. void CIOSocket::AddRef()
  189. {
  190. InterlockedIncrement(&m_lRef);
  191. }
  192. void CIOSocket::Release()
  193. {
  194. if (InterlockedDecrement(&m_lRef) == 0)
  195. {
  196. //Free();
  197. }
  198. }
  199. void CIOSocket::Close()
  200. {
  201. SOCKET s = (SOCKET)InterlockedExchange((UINT64*)&m_hSocket, INVALID_SOCKET);
  202. if (s != INVALID_SOCKET)
  203. {
  204. linger lin = { TRUE, 0 };
  205. setsockopt(s, SOL_SOCKET, SO_LINGER, (char*)&lin, sizeof(lin));
  206. closesocket(s);
  207. for (DWORD i = 0; i < m_dwBufferCount; i++)
  208. {
  209. CIOBuffer* p = (CIOBuffer*)(m_WriteBuff[i].buf - offsetof(CIOBuffer, Buffer));
  210. p->Release(__FILE__, __LINE__);
  211. }
  212. m_dwBufferCount = 0;
  213. IO_STRUCT* temp = m_pFirstHead;
  214. while (temp != NULL)
  215. {
  216. //long c = temp->pBuffer->m_wSize;
  217. IO_STRUCT* t = temp;
  218. temp = temp->pNext;
  219. t->pBuffer->Release(__FILE__, __LINE__);
  220. Alloc_Pool<IO_STRUCT>::GetInstance().Free(t);
  221. m_IOCount--;
  222. }
  223. m_dwWriteCount = 0;
  224. m_pFirstHead = m_pLastHead = NULL;
  225. m_IOCount = 0;
  226. OnClose();
  227. }
  228. }
  229. unsigned int CIOSocket::IOThread(void* param)
  230. {
  231. //srand((unsigned int)time(NULL));
  232. LPOVERLAPPED lpOverlapped;
  233. CIOSocket* pObject;
  234. DWORD dwTransferred;
  235. HANDLE hCom = (HANDLE)param;
  236. while (true)
  237. {
  238. BOOL bSucc = GetQueuedCompletionStatus(hCom, &dwTransferred, (PULONG_PTR)&pObject, &lpOverlapped, WSA_INFINITE);
  239. if (dwTransferred == -1 || dwTransferred == 0)
  240. {
  241. pObject->Close();
  242. }
  243. else
  244. {
  245. pObject->OnIOCallback(bSucc, dwTransferred, lpOverlapped);
  246. }
  247. }
  248. }
  249. void CIOSocket::OnIOCallback(BOOL bSucc, DWORD dwTransferred, LPOVERLAPPED lpOverlapped)
  250. {
  251. if (!bSucc)
  252. {
  253. if (lpOverlapped == &this->m_OverlappedRead)
  254. {
  255. Close();
  256. }
  257. }
  258. else
  259. {
  260. if (lpOverlapped == &this->m_OverlappedWrite)
  261. {
  262. WriteCallback(dwTransferred);
  263. }
  264. else if (lpOverlapped == &m_OverlappedRead)
  265. {
  266. ReadCallback(dwTransferred);
  267. }
  268. }
  269. }
  270. void CIOSocket::ReadCallback(DWORD dwTransferred)
  271. {
  272. if (m_hSocket == INVALID_SOCKET || dwTransferred <= 0) return;
  273. m_pBuffer->Length += dwTransferred;
  274. OnRead();
  275. }