SocketBuffer.c 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415
  1. /*******************************************************************************
  2. * Copyright (c) 2009, 2018 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * http://www.eclipse.org/legal/epl-v10.html
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial API and implementation and/or initial documentation
  15. * Ian Craggs, Allan Stockdill-Mander - SSL updates
  16. * Ian Craggs - fix for issue #244, issue #20
  17. *******************************************************************************/
  18. /**
  19. * @file
  20. * \brief Socket buffering related functions
  21. *
  22. * Some other related functions are in the Socket module
  23. */
  24. #include "SocketBuffer.h"
  25. #include "LinkedList.h"
  26. #include "Log.h"
  27. #include "Messages.h"
  28. #include "StackTrace.h"
  29. #include <stdlib.h>
  30. #include <stdio.h>
  31. #include <string.h>
  32. #include "Heap.h"
  33. #if defined(WIN32) || defined(WIN64)
  34. #define iov_len len
  35. #define iov_base buf
  36. #endif
  37. /**
  38. * Default input queue buffer
  39. */
  40. static socket_queue* def_queue;
  41. /**
  42. * List of queued input buffers
  43. */
  44. static List* queues;
  45. /**
  46. * List of queued write buffers
  47. */
  48. static List writes;
  49. int socketcompare(void* a, void* b);
  50. void SocketBuffer_newDefQ(void);
  51. void SocketBuffer_freeDefQ(void);
  52. int pending_socketcompare(void* a, void* b);
  53. /**
  54. * List callback function for comparing socket_queues by socket
  55. * @param a first integer value
  56. * @param b second integer value
  57. * @return boolean indicating whether a and b are equal
  58. */
  59. int socketcompare(void* a, void* b)
  60. {
  61. return ((socket_queue*)a)->socket == *(int*)b;
  62. }
  63. /**
  64. * Create a new default queue when one has just been used.
  65. */
  66. void SocketBuffer_newDefQ(void)
  67. {
  68. def_queue = malloc(sizeof(socket_queue));
  69. def_queue->buflen = 1000;
  70. def_queue->buf = malloc(def_queue->buflen);
  71. def_queue->socket = def_queue->index = 0;
  72. def_queue->buflen = def_queue->datalen = def_queue->headerlen = 0;
  73. }
  74. /**
  75. * Initialize the socketBuffer module
  76. */
  77. void SocketBuffer_initialize(void)
  78. {
  79. FUNC_ENTRY;
  80. SocketBuffer_newDefQ();
  81. queues = ListInitialize();
  82. ListZero(&writes);
  83. FUNC_EXIT;
  84. }
  85. /**
  86. * Free the default queue memory
  87. */
  88. void SocketBuffer_freeDefQ(void)
  89. {
  90. free(def_queue->buf);
  91. free(def_queue);
  92. def_queue = NULL;
  93. }
  94. /**
  95. * Terminate the socketBuffer module
  96. */
  97. void SocketBuffer_terminate(void)
  98. {
  99. ListElement* cur = NULL;
  100. ListEmpty(&writes);
  101. FUNC_ENTRY;
  102. while (ListNextElement(queues, &cur))
  103. free(((socket_queue*)(cur->content))->buf);
  104. ListFree(queues);
  105. SocketBuffer_freeDefQ();
  106. FUNC_EXIT;
  107. }
  108. /**
  109. * Cleanup any buffers for a specific socket
  110. * @param socket the socket to clean up
  111. */
  112. void SocketBuffer_cleanup(int socket)
  113. {
  114. FUNC_ENTRY;
  115. SocketBuffer_writeComplete(socket); /* clean up write buffers */
  116. if (ListFindItem(queues, &socket, socketcompare))
  117. {
  118. free(((socket_queue*)(queues->current->content))->buf);
  119. ListRemove(queues, queues->current->content);
  120. }
  121. if (def_queue->socket == socket)
  122. {
  123. def_queue->socket = def_queue->index = 0;
  124. def_queue->headerlen = def_queue->datalen = 0;
  125. }
  126. FUNC_EXIT;
  127. }
  128. /**
  129. * Get any queued data for a specific socket
  130. * @param socket the socket to get queued data for
  131. * @param bytes the number of bytes of data to retrieve
  132. * @param actual_len the actual length returned
  133. * @return the actual data
  134. */
  135. char* SocketBuffer_getQueuedData(int socket, size_t bytes, size_t* actual_len)
  136. {
  137. socket_queue* queue = NULL;
  138. FUNC_ENTRY;
  139. if (ListFindItem(queues, &socket, socketcompare))
  140. { /* if there is queued data for this socket, add any data read to it */
  141. queue = (socket_queue*)(queues->current->content);
  142. *actual_len = queue->datalen;
  143. }
  144. else
  145. {
  146. *actual_len = 0;
  147. queue = def_queue;
  148. }
  149. if (bytes > queue->buflen)
  150. {
  151. if (queue->datalen > 0)
  152. {
  153. void* newmem = malloc(bytes);
  154. memcpy(newmem, queue->buf, queue->datalen);
  155. free(queue->buf);
  156. queue->buf = newmem;
  157. }
  158. else
  159. queue->buf = realloc(queue->buf, bytes);
  160. queue->buflen = bytes;
  161. }
  162. FUNC_EXIT;
  163. return queue->buf;
  164. }
  165. /**
  166. * Get any queued character for a specific socket
  167. * @param socket the socket to get queued data for
  168. * @param c the character returned if any
  169. * @return completion code
  170. */
  171. int SocketBuffer_getQueuedChar(int socket, char* c)
  172. {
  173. int rc = SOCKETBUFFER_INTERRUPTED;
  174. FUNC_ENTRY;
  175. if (ListFindItem(queues, &socket, socketcompare))
  176. { /* if there is queued data for this socket, read that first */
  177. socket_queue* queue = (socket_queue*)(queues->current->content);
  178. if (queue->index < queue->headerlen)
  179. {
  180. *c = queue->fixed_header[(queue->index)++];
  181. Log(TRACE_MAX, -1, "index is now %d, headerlen %d", queue->index, (int)queue->headerlen);
  182. rc = SOCKETBUFFER_COMPLETE;
  183. goto exit;
  184. }
  185. else if (queue->index > 4)
  186. {
  187. Log(LOG_FATAL, -1, "header is already at full length");
  188. rc = SOCKET_ERROR;
  189. goto exit;
  190. }
  191. }
  192. exit:
  193. FUNC_EXIT_RC(rc);
  194. return rc; /* there was no queued char if rc is SOCKETBUFFER_INTERRUPTED*/
  195. }
  196. /**
  197. * A socket read was interrupted so we need to queue data
  198. * @param socket the socket to get queued data for
  199. * @param actual_len the actual length of data that was read
  200. */
  201. void SocketBuffer_interrupted(int socket, size_t actual_len)
  202. {
  203. socket_queue* queue = NULL;
  204. FUNC_ENTRY;
  205. if (ListFindItem(queues, &socket, socketcompare))
  206. queue = (socket_queue*)(queues->current->content);
  207. else /* new saved queue */
  208. {
  209. queue = def_queue;
  210. /* if SocketBuffer_queueChar() has not yet been called, then the socket number
  211. in def_queue will not have been set. Issue #244.
  212. If actual_len == 0 then we may not need to do anything - I'll leave that
  213. optimization for another time. */
  214. queue->socket = socket;
  215. ListAppend(queues, def_queue, sizeof(socket_queue)+def_queue->buflen);
  216. SocketBuffer_newDefQ();
  217. }
  218. queue->index = 0;
  219. queue->datalen = actual_len;
  220. FUNC_EXIT;
  221. }
  222. /**
  223. * A socket read has now completed so we can get rid of the queue
  224. * @param socket the socket for which the operation is now complete
  225. * @return pointer to the default queue data
  226. */
  227. char* SocketBuffer_complete(int socket)
  228. {
  229. FUNC_ENTRY;
  230. if (ListFindItem(queues, &socket, socketcompare))
  231. {
  232. socket_queue* queue = (socket_queue*)(queues->current->content);
  233. SocketBuffer_freeDefQ();
  234. def_queue = queue;
  235. ListDetach(queues, queue);
  236. }
  237. def_queue->socket = def_queue->index = 0;
  238. def_queue->headerlen = def_queue->datalen = 0;
  239. FUNC_EXIT;
  240. return def_queue->buf;
  241. }
  242. /**
  243. * A socket operation had now completed so we can get rid of the queue
  244. * @param socket the socket for which the operation is now complete
  245. * @param c the character to queue
  246. */
  247. void SocketBuffer_queueChar(int socket, char c)
  248. {
  249. int error = 0;
  250. socket_queue* curq = def_queue;
  251. FUNC_ENTRY;
  252. if (ListFindItem(queues, &socket, socketcompare))
  253. curq = (socket_queue*)(queues->current->content);
  254. else if (def_queue->socket == 0)
  255. {
  256. def_queue->socket = socket;
  257. def_queue->index = 0;
  258. def_queue->datalen = 0;
  259. }
  260. else if (def_queue->socket != socket)
  261. {
  262. Log(LOG_FATAL, -1, "attempt to reuse socket queue");
  263. error = 1;
  264. }
  265. if (curq->index > 4)
  266. {
  267. Log(LOG_FATAL, -1, "socket queue fixed_header field full");
  268. error = 1;
  269. }
  270. if (!error)
  271. {
  272. curq->fixed_header[(curq->index)++] = c;
  273. curq->headerlen = curq->index;
  274. }
  275. Log(TRACE_MAX, -1, "queueChar: index is now %d, headerlen %d", curq->index, (int)curq->headerlen);
  276. FUNC_EXIT;
  277. }
  278. /**
  279. * A socket write was interrupted so store the remaining data
  280. * @param socket the socket for which the write was interrupted
  281. * @param count the number of iovec buffers
  282. * @param iovecs buffer array
  283. * @param frees a set of flags indicating which of the iovecs array should be freed
  284. * @param total total data length to be written
  285. * @param bytes actual data length that was written
  286. */
  287. #if defined(OPENSSL)
  288. void SocketBuffer_pendingWrite(int socket, SSL* ssl, int count, iobuf* iovecs, int* frees, size_t total, size_t bytes)
  289. #else
  290. void SocketBuffer_pendingWrite(int socket, int count, iobuf* iovecs, int* frees, size_t total, size_t bytes)
  291. #endif
  292. {
  293. int i = 0;
  294. pending_writes* pw = NULL;
  295. FUNC_ENTRY;
  296. /* store the buffers until the whole packet is written */
  297. pw = malloc(sizeof(pending_writes));
  298. pw->socket = socket;
  299. #if defined(OPENSSL)
  300. pw->ssl = ssl;
  301. #endif
  302. pw->bytes = bytes;
  303. pw->total = total;
  304. pw->count = count;
  305. for (i = 0; i < count; i++)
  306. {
  307. pw->iovecs[i] = iovecs[i];
  308. pw->frees[i] = frees[i];
  309. }
  310. ListAppend(&writes, pw, sizeof(pw) + total);
  311. FUNC_EXIT;
  312. }
  313. /**
  314. * List callback function for comparing pending_writes by socket
  315. * @param a first integer value
  316. * @param b second integer value
  317. * @return boolean indicating whether a and b are equal
  318. */
  319. int pending_socketcompare(void* a, void* b)
  320. {
  321. return ((pending_writes*)a)->socket == *(int*)b;
  322. }
  323. /**
  324. * Get any queued write data for a specific socket
  325. * @param socket the socket to get queued data for
  326. * @return pointer to the queued data or NULL
  327. */
  328. pending_writes* SocketBuffer_getWrite(int socket)
  329. {
  330. ListElement* le = ListFindItem(&writes, &socket, pending_socketcompare);
  331. return (le) ? (pending_writes*)(le->content) : NULL;
  332. }
  333. /**
  334. * A socket write has now completed so we can get rid of the queue
  335. * @param socket the socket for which the operation is now complete
  336. * @return completion code, boolean - was the queue removed?
  337. */
  338. int SocketBuffer_writeComplete(int socket)
  339. {
  340. return ListRemoveItem(&writes, &socket, pending_socketcompare);
  341. }
  342. /**
  343. * Update the queued write data for a socket in the case of QoS 0 messages.
  344. * @param socket the socket for which the operation is now complete
  345. * @param topic the topic of the QoS 0 write
  346. * @param payload the payload of the QoS 0 write
  347. * @return pointer to the updated queued data structure, or NULL
  348. */
  349. pending_writes* SocketBuffer_updateWrite(int socket, char* topic, char* payload)
  350. {
  351. pending_writes* pw = NULL;
  352. ListElement* le = NULL;
  353. FUNC_ENTRY;
  354. if ((le = ListFindItem(&writes, &socket, pending_socketcompare)) != NULL)
  355. {
  356. pw = (pending_writes*)(le->content);
  357. if (pw->count == 4)
  358. {
  359. pw->iovecs[2].iov_base = topic;
  360. pw->iovecs[3].iov_base = payload;
  361. }
  362. }
  363. FUNC_EXIT;
  364. return pw;
  365. }