123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415 |
- /*******************************************************************************
- * Copyright (c) 2009, 2018 IBM Corp.
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * and Eclipse Distribution License v1.0 which accompany this distribution.
- *
- * The Eclipse Public License is available at
- * http://www.eclipse.org/legal/epl-v10.html
- * and the Eclipse Distribution License is available at
- * http://www.eclipse.org/org/documents/edl-v10.php.
- *
- * Contributors:
- * Ian Craggs - initial API and implementation and/or initial documentation
- * Ian Craggs, Allan Stockdill-Mander - SSL updates
- * Ian Craggs - fix for issue #244, issue #20
- *******************************************************************************/
- /**
- * @file
- * \brief Socket buffering related functions
- *
- * Some other related functions are in the Socket module
- */
- #include "SocketBuffer.h"
- #include "LinkedList.h"
- #include "Log.h"
- #include "Messages.h"
- #include "StackTrace.h"
- #include <stdlib.h>
- #include <stdio.h>
- #include <string.h>
- #include "Heap.h"
- #if defined(WIN32) || defined(WIN64)
- #define iov_len len
- #define iov_base buf
- #endif
- /**
- * Default input queue buffer
- */
- static socket_queue* def_queue;
- /**
- * List of queued input buffers
- */
- static List* queues;
- /**
- * List of queued write buffers
- */
- static List writes;
- int socketcompare(void* a, void* b);
- void SocketBuffer_newDefQ(void);
- void SocketBuffer_freeDefQ(void);
- int pending_socketcompare(void* a, void* b);
- /**
- * List callback function for comparing socket_queues by socket
- * @param a first integer value
- * @param b second integer value
- * @return boolean indicating whether a and b are equal
- */
- int socketcompare(void* a, void* b)
- {
- return ((socket_queue*)a)->socket == *(int*)b;
- }
- /**
- * Create a new default queue when one has just been used.
- */
- void SocketBuffer_newDefQ(void)
- {
- def_queue = malloc(sizeof(socket_queue));
- def_queue->buflen = 1000;
- def_queue->buf = malloc(def_queue->buflen);
- def_queue->socket = def_queue->index = 0;
- def_queue->buflen = def_queue->datalen = def_queue->headerlen = 0;
- }
- /**
- * Initialize the socketBuffer module
- */
- void SocketBuffer_initialize(void)
- {
- FUNC_ENTRY;
- SocketBuffer_newDefQ();
- queues = ListInitialize();
- ListZero(&writes);
- FUNC_EXIT;
- }
- /**
- * Free the default queue memory
- */
- void SocketBuffer_freeDefQ(void)
- {
- free(def_queue->buf);
- free(def_queue);
- def_queue = NULL;
- }
- /**
- * Terminate the socketBuffer module
- */
- void SocketBuffer_terminate(void)
- {
- ListElement* cur = NULL;
- ListEmpty(&writes);
- FUNC_ENTRY;
- while (ListNextElement(queues, &cur))
- free(((socket_queue*)(cur->content))->buf);
- ListFree(queues);
- SocketBuffer_freeDefQ();
- FUNC_EXIT;
- }
- /**
- * Cleanup any buffers for a specific socket
- * @param socket the socket to clean up
- */
- void SocketBuffer_cleanup(int socket)
- {
- FUNC_ENTRY;
- SocketBuffer_writeComplete(socket); /* clean up write buffers */
- if (ListFindItem(queues, &socket, socketcompare))
- {
- free(((socket_queue*)(queues->current->content))->buf);
- ListRemove(queues, queues->current->content);
- }
- if (def_queue->socket == socket)
- {
- def_queue->socket = def_queue->index = 0;
- def_queue->headerlen = def_queue->datalen = 0;
- }
- FUNC_EXIT;
- }
- /**
- * Get any queued data for a specific socket
- * @param socket the socket to get queued data for
- * @param bytes the number of bytes of data to retrieve
- * @param actual_len the actual length returned
- * @return the actual data
- */
- char* SocketBuffer_getQueuedData(int socket, size_t bytes, size_t* actual_len)
- {
- socket_queue* queue = NULL;
- FUNC_ENTRY;
- if (ListFindItem(queues, &socket, socketcompare))
- { /* if there is queued data for this socket, add any data read to it */
- queue = (socket_queue*)(queues->current->content);
- *actual_len = queue->datalen;
- }
- else
- {
- *actual_len = 0;
- queue = def_queue;
- }
- if (bytes > queue->buflen)
- {
- if (queue->datalen > 0)
- {
- void* newmem = malloc(bytes);
- memcpy(newmem, queue->buf, queue->datalen);
- free(queue->buf);
- queue->buf = newmem;
- }
- else
- queue->buf = realloc(queue->buf, bytes);
- queue->buflen = bytes;
- }
- FUNC_EXIT;
- return queue->buf;
- }
- /**
- * Get any queued character for a specific socket
- * @param socket the socket to get queued data for
- * @param c the character returned if any
- * @return completion code
- */
- int SocketBuffer_getQueuedChar(int socket, char* c)
- {
- int rc = SOCKETBUFFER_INTERRUPTED;
- FUNC_ENTRY;
- if (ListFindItem(queues, &socket, socketcompare))
- { /* if there is queued data for this socket, read that first */
- socket_queue* queue = (socket_queue*)(queues->current->content);
- if (queue->index < queue->headerlen)
- {
- *c = queue->fixed_header[(queue->index)++];
- Log(TRACE_MAX, -1, "index is now %d, headerlen %d", queue->index, (int)queue->headerlen);
- rc = SOCKETBUFFER_COMPLETE;
- goto exit;
- }
- else if (queue->index > 4)
- {
- Log(LOG_FATAL, -1, "header is already at full length");
- rc = SOCKET_ERROR;
- goto exit;
- }
- }
- exit:
- FUNC_EXIT_RC(rc);
- return rc; /* there was no queued char if rc is SOCKETBUFFER_INTERRUPTED*/
- }
- /**
- * A socket read was interrupted so we need to queue data
- * @param socket the socket to get queued data for
- * @param actual_len the actual length of data that was read
- */
- void SocketBuffer_interrupted(int socket, size_t actual_len)
- {
- socket_queue* queue = NULL;
- FUNC_ENTRY;
- if (ListFindItem(queues, &socket, socketcompare))
- queue = (socket_queue*)(queues->current->content);
- else /* new saved queue */
- {
- queue = def_queue;
- /* if SocketBuffer_queueChar() has not yet been called, then the socket number
- in def_queue will not have been set. Issue #244.
- If actual_len == 0 then we may not need to do anything - I'll leave that
- optimization for another time. */
- queue->socket = socket;
- ListAppend(queues, def_queue, sizeof(socket_queue)+def_queue->buflen);
- SocketBuffer_newDefQ();
- }
- queue->index = 0;
- queue->datalen = actual_len;
- FUNC_EXIT;
- }
- /**
- * A socket read has now completed so we can get rid of the queue
- * @param socket the socket for which the operation is now complete
- * @return pointer to the default queue data
- */
- char* SocketBuffer_complete(int socket)
- {
- FUNC_ENTRY;
- if (ListFindItem(queues, &socket, socketcompare))
- {
- socket_queue* queue = (socket_queue*)(queues->current->content);
- SocketBuffer_freeDefQ();
- def_queue = queue;
- ListDetach(queues, queue);
- }
- def_queue->socket = def_queue->index = 0;
- def_queue->headerlen = def_queue->datalen = 0;
- FUNC_EXIT;
- return def_queue->buf;
- }
- /**
- * A socket operation had now completed so we can get rid of the queue
- * @param socket the socket for which the operation is now complete
- * @param c the character to queue
- */
- void SocketBuffer_queueChar(int socket, char c)
- {
- int error = 0;
- socket_queue* curq = def_queue;
- FUNC_ENTRY;
- if (ListFindItem(queues, &socket, socketcompare))
- curq = (socket_queue*)(queues->current->content);
- else if (def_queue->socket == 0)
- {
- def_queue->socket = socket;
- def_queue->index = 0;
- def_queue->datalen = 0;
- }
- else if (def_queue->socket != socket)
- {
- Log(LOG_FATAL, -1, "attempt to reuse socket queue");
- error = 1;
- }
- if (curq->index > 4)
- {
- Log(LOG_FATAL, -1, "socket queue fixed_header field full");
- error = 1;
- }
- if (!error)
- {
- curq->fixed_header[(curq->index)++] = c;
- curq->headerlen = curq->index;
- }
- Log(TRACE_MAX, -1, "queueChar: index is now %d, headerlen %d", curq->index, (int)curq->headerlen);
- FUNC_EXIT;
- }
- /**
- * A socket write was interrupted so store the remaining data
- * @param socket the socket for which the write was interrupted
- * @param count the number of iovec buffers
- * @param iovecs buffer array
- * @param frees a set of flags indicating which of the iovecs array should be freed
- * @param total total data length to be written
- * @param bytes actual data length that was written
- */
- #if defined(OPENSSL)
- void SocketBuffer_pendingWrite(int socket, SSL* ssl, int count, iobuf* iovecs, int* frees, size_t total, size_t bytes)
- #else
- void SocketBuffer_pendingWrite(int socket, int count, iobuf* iovecs, int* frees, size_t total, size_t bytes)
- #endif
- {
- int i = 0;
- pending_writes* pw = NULL;
- FUNC_ENTRY;
- /* store the buffers until the whole packet is written */
- pw = malloc(sizeof(pending_writes));
- pw->socket = socket;
- #if defined(OPENSSL)
- pw->ssl = ssl;
- #endif
- pw->bytes = bytes;
- pw->total = total;
- pw->count = count;
- for (i = 0; i < count; i++)
- {
- pw->iovecs[i] = iovecs[i];
- pw->frees[i] = frees[i];
- }
- ListAppend(&writes, pw, sizeof(pw) + total);
- FUNC_EXIT;
- }
- /**
- * List callback function for comparing pending_writes by socket
- * @param a first integer value
- * @param b second integer value
- * @return boolean indicating whether a and b are equal
- */
- int pending_socketcompare(void* a, void* b)
- {
- return ((pending_writes*)a)->socket == *(int*)b;
- }
- /**
- * Get any queued write data for a specific socket
- * @param socket the socket to get queued data for
- * @return pointer to the queued data or NULL
- */
- pending_writes* SocketBuffer_getWrite(int socket)
- {
- ListElement* le = ListFindItem(&writes, &socket, pending_socketcompare);
- return (le) ? (pending_writes*)(le->content) : NULL;
- }
- /**
- * A socket write has now completed so we can get rid of the queue
- * @param socket the socket for which the operation is now complete
- * @return completion code, boolean - was the queue removed?
- */
- int SocketBuffer_writeComplete(int socket)
- {
- return ListRemoveItem(&writes, &socket, pending_socketcompare);
- }
- /**
- * Update the queued write data for a socket in the case of QoS 0 messages.
- * @param socket the socket for which the operation is now complete
- * @param topic the topic of the QoS 0 write
- * @param payload the payload of the QoS 0 write
- * @return pointer to the updated queued data structure, or NULL
- */
- pending_writes* SocketBuffer_updateWrite(int socket, char* topic, char* payload)
- {
- pending_writes* pw = NULL;
- ListElement* le = NULL;
- FUNC_ENTRY;
- if ((le = ListFindItem(&writes, &socket, pending_socketcompare)) != NULL)
- {
- pw = (pending_writes*)(le->content);
- if (pw->count == 4)
- {
- pw->iovecs[2].iov_base = topic;
- pw->iovecs[3].iov_base = payload;
- }
- }
- FUNC_EXIT;
- return pw;
- }
|