123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860 |
- /*******************************************************************************
- * Copyright (c) 2009, 2022 IBM Corp., Ian Craggs and others
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v2.0
- * and Eclipse Distribution License v1.0 which accompany this distribution.
- *
- * The Eclipse Public License is available at
- * https://www.eclipse.org/legal/epl-2.0/
- * and the Eclipse Distribution License is available at
- * http://www.eclipse.org/org/documents/edl-v10.php.
- *
- * Contributors:
- * Ian Craggs - initial implementation and documentation
- * Ian Craggs, Allan Stockdill-Mander - SSL support
- * Ian Craggs - multiple server connection support
- * Ian Craggs - fix for bug 413429 - connectionLost not called
- * Ian Craggs - fix for bug 415042 - using already freed structure
- * Ian Craggs - fix for bug 419233 - mutexes not reporting errors
- * Ian Craggs - fix for bug 420851
- * Ian Craggs - fix for bug 432903 - queue persistence
- * Ian Craggs - MQTT 3.1.1 support
- * Rong Xiang, Ian Craggs - C++ compatibility
- * Ian Craggs - fix for bug 442400: reconnecting after network cable unplugged
- * Ian Craggs - fix for bug 444934 - incorrect free in freeCommand1
- * Ian Craggs - fix for bug 445891 - assigning msgid is not thread safe
- * Ian Craggs - fix for bug 465369 - longer latency than expected
- * Ian Craggs - fix for bug 444103 - success/failure callbacks not invoked
- * Ian Craggs - fix for bug 484363 - segfault in getReadySocket
- * Ian Craggs - automatic reconnect and offline buffering (send while disconnected)
- * Ian Craggs - fix for bug 472250
- * Ian Craggs - fix for bug 486548
- * Ian Craggs - SNI support
- * Ian Craggs - auto reconnect timing fix #218
- * Ian Craggs - fix for issue #190
- * Ian Craggs - check for NULL SSL options #334
- * Ian Craggs - allocate username/password buffers #431
- * Ian Craggs - MQTT 5.0 support
- * Ian Craggs - refactor to reduce module size
- *******************************************************************************/
- #include <stdlib.h>
- #include <string.h>
- #if !defined(_WIN32) && !defined(_WIN64)
- #include <sys/time.h>
- #else
- #if defined(_MSC_VER) && _MSC_VER < 1900
- #define snprintf _snprintf
- #endif
- #endif
- #if !defined(NO_PERSISTENCE)
- #include "MQTTPersistence.h"
- #endif
- #include "MQTTAsync.h"
- #include "MQTTAsyncUtils.h"
- #include "utf-8.h"
- #include "MQTTProtocol.h"
- #include "MQTTProtocolOut.h"
- #include "Thread.h"
- #include "SocketBuffer.h"
- #include "StackTrace.h"
- #include "Heap.h"
- #include "OsWrapper.h"
- #include "WebSocket.h"
- static void MQTTAsync_freeServerURIs(MQTTAsyncs* m);
- #include "VersionInfo.h"
- const char *client_timestamp_eye = "MQTTAsyncV3_Timestamp " BUILD_TIMESTAMP;
- const char *client_version_eye = "MQTTAsyncV3_Version " CLIENT_VERSION;
- volatile int global_initialized = 0;
- List* MQTTAsync_handles = NULL;
- List* MQTTAsync_commands = NULL;
- int MQTTAsync_tostop = 0;
- static ClientStates ClientState =
- {
- CLIENT_VERSION, /* version */
- NULL /* client list */
- };
- MQTTProtocol state;
- ClientStates* bstate = &ClientState;
- enum MQTTAsync_threadStates sendThread_state = STOPPED;
- enum MQTTAsync_threadStates receiveThread_state = STOPPED;
- thread_id_type sendThread_id = 0,
- receiveThread_id = 0;
- // global objects init declaration
- int MQTTAsync_init(void);
- void MQTTAsync_global_init(MQTTAsync_init_options* inits)
- {
- MQTTAsync_init();
- #if defined(OPENSSL)
- SSLSocket_handleOpensslInit(inits->do_openssl_init);
- #endif
- }
- #if !defined(min)
- #define min(a, b) (((a) < (b)) ? (a) : (b))
- #endif
- #if defined(WIN32) || defined(WIN64)
- void MQTTAsync_init_rand(void)
- {
- START_TIME_TYPE now = MQTTTime_start_clock();
- srand((unsigned int)now);
- }
- #elif defined(AIX)
- void MQTTAsync_init_rand(void)
- {
- START_TIME_TYPE now = MQTTTime_start_clock();
- srand(now.tv_nsec);
- }
- #else
- void MQTTAsync_init_rand(void)
- {
- START_TIME_TYPE now = MQTTTime_start_clock();
- srand(now.tv_usec);
- }
- #endif
- #if defined(_WIN32) || defined(_WIN64)
- mutex_type mqttasync_mutex = NULL;
- mutex_type socket_mutex = NULL;
- mutex_type mqttcommand_mutex = NULL;
- sem_type send_sem = NULL;
- #if !defined(NO_HEAP_TRACKING)
- extern mutex_type stack_mutex;
- extern mutex_type heap_mutex;
- #endif
- extern mutex_type log_mutex;
- int MQTTAsync_init(void)
- {
- DWORD rc = 0;
- if (mqttasync_mutex == NULL)
- {
- if ((mqttasync_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
- {
- rc = GetLastError();
- printf("mqttasync_mutex error %d\n", rc);
- goto exit;
- }
- if ((mqttcommand_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
- {
- rc = GetLastError();
- printf("mqttcommand_mutex error %d\n", rc);
- goto exit;
- }
- if ((send_sem = CreateEvent(
- NULL, /* default security attributes */
- FALSE, /* manual-reset event? */
- FALSE, /* initial state is nonsignaled */
- NULL /* object name */
- )) == NULL)
- {
- rc = GetLastError();
- printf("send_sem error %d\n", rc);
- goto exit;
- }
- #if !defined(NO_HEAP_TRACKING)
- if ((stack_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
- {
- rc = GetLastError();
- printf("stack_mutex error %d\n", rc);
- goto exit;
- }
- if ((heap_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
- {
- rc = GetLastError();
- printf("heap_mutex error %d\n", rc);
- goto exit;
- }
- #endif
- if ((log_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
- {
- rc = GetLastError();
- printf("log_mutex error %d\n", rc);
- goto exit;
- }
- if ((socket_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
- {
- rc = GetLastError();
- printf("socket_mutex error %d\n", rc);
- goto exit;
- }
- }
- else
- {
- Log(TRACE_MAX, -1, "Library already initialized");
- }
- exit:
- return rc;
- }
- void MQTTAsync_cleanup(void)
- {
- if (send_sem)
- CloseHandle(send_sem);
- #if !defined(NO_HEAP_TRACKING)
- if (stack_mutex)
- CloseHandle(stack_mutex);
- if (heap_mutex)
- CloseHandle(heap_mutex);
- #endif
- if (log_mutex)
- CloseHandle(log_mutex);
- if (socket_mutex)
- CloseHandle(socket_mutex);
- if (mqttasync_mutex)
- CloseHandle(mqttasync_mutex);
- }
- #if defined(PAHO_MQTT_STATIC)
- static INIT_ONCE g_InitOnce = INIT_ONCE_STATIC_INIT; /* Global for one time initialization */
- /* This runs at most once */
- BOOL CALLBACK InitMutexesOnce (
- PINIT_ONCE InitOnce, /* Pointer to one-time initialization structure */
- PVOID Parameter, /* Optional parameter */
- PVOID *lpContext) /* Return data, if any */
- {
- int rc = MQTTAsync_init();
- return rc == 0;
- }
- #else
- BOOL APIENTRY DllMain(HANDLE hModule,
- DWORD ul_reason_for_call,
- LPVOID lpReserved)
- {
- switch (ul_reason_for_call)
- {
- case DLL_PROCESS_ATTACH:
- MQTTAsync_init();
- break;
- case DLL_THREAD_ATTACH:
- break;
- case DLL_THREAD_DETACH:
- break;
- case DLL_PROCESS_DETACH:
- if (lpReserved)
- MQTTAsync_cleanup();
- break;
- }
- return TRUE;
- }
- #endif
- #else
- static pthread_mutex_t mqttasync_mutex_store = PTHREAD_MUTEX_INITIALIZER;
- mutex_type mqttasync_mutex = &mqttasync_mutex_store;
- static pthread_mutex_t socket_mutex_store = PTHREAD_MUTEX_INITIALIZER;
- mutex_type socket_mutex = &socket_mutex_store;
- static pthread_mutex_t mqttcommand_mutex_store = PTHREAD_MUTEX_INITIALIZER;
- mutex_type mqttcommand_mutex = &mqttcommand_mutex_store;
- static cond_type_struct send_cond_store = { PTHREAD_COND_INITIALIZER, PTHREAD_MUTEX_INITIALIZER };
- cond_type send_cond = &send_cond_store;
- int MQTTAsync_init(void)
- {
- pthread_mutexattr_t attr;
- int rc;
- pthread_mutexattr_init(&attr);
- #if !defined(_WRS_KERNEL)
- pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
- #else
- /* #warning "no pthread_mutexattr_settype" */
- #endif
- if ((rc = pthread_mutex_init(mqttasync_mutex, &attr)) != 0)
- printf("MQTTAsync: error %d initializing async_mutex\n", rc);
- else if ((rc = pthread_mutex_init(mqttcommand_mutex, &attr)) != 0)
- printf("MQTTAsync: error %d initializing command_mutex\n", rc);
- else if ((rc = pthread_mutex_init(socket_mutex, &attr)) != 0)
- printf("MQTTClient: error %d initializing socket_mutex\n", rc);
- else if ((rc = pthread_cond_init(&send_cond->cond, NULL)) != 0)
- printf("MQTTAsync: error %d initializing send_cond cond\n", rc);
- else if ((rc = pthread_mutex_init(&send_cond->mutex, &attr)) != 0)
- printf("MQTTAsync: error %d initializing send_cond mutex\n", rc);
- return rc;
- }
- #endif
- int MQTTAsync_createWithOptions(MQTTAsync* handle, const char* serverURI, const char* clientId,
- int persistence_type, void* persistence_context, MQTTAsync_createOptions* options)
- {
- int rc = 0;
- MQTTAsyncs *m = NULL;
- #if (defined(_WIN32) || defined(_WIN64)) && defined(PAHO_MQTT_STATIC)
- /* intializes mutexes once. Must come before FUNC_ENTRY */
- BOOL bStatus = InitOnceExecuteOnce(&g_InitOnce, InitMutexesOnce, NULL, NULL);
- #endif
- FUNC_ENTRY;
- MQTTAsync_lock_mutex(mqttasync_mutex);
- if (serverURI == NULL || clientId == NULL)
- {
- rc = MQTTASYNC_NULL_PARAMETER;
- goto exit;
- }
- if (!UTF8_validateString(clientId))
- {
- rc = MQTTASYNC_BAD_UTF8_STRING;
- goto exit;
- }
- if (strlen(clientId) == 0 && persistence_type == MQTTCLIENT_PERSISTENCE_DEFAULT)
- {
- rc = MQTTASYNC_PERSISTENCE_ERROR;
- goto exit;
- }
- if (strstr(serverURI, "://") != NULL)
- {
- if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) != 0
- && strncmp(URI_MQTT, serverURI, strlen(URI_MQTT)) != 0
- && strncmp(URI_WS, serverURI, strlen(URI_WS)) != 0
- #if defined(OPENSSL)
- && strncmp(URI_SSL, serverURI, strlen(URI_SSL)) != 0
- && strncmp(URI_MQTTS, serverURI, strlen(URI_MQTTS)) != 0
- && strncmp(URI_WSS, serverURI, strlen(URI_WSS)) != 0
- #endif
- )
- {
- rc = MQTTASYNC_BAD_PROTOCOL;
- goto exit;
- }
- }
- if (options && options->maxBufferedMessages <= 0)
- {
- rc = MQTTASYNC_MAX_BUFFERED;
- goto exit;
- }
- if (options && (strncmp(options->struct_id, "MQCO", 4) != 0 ||
- options->struct_version < 0 || options->struct_version > 2))
- {
- rc = MQTTASYNC_BAD_STRUCTURE;
- goto exit;
- }
- if (!global_initialized)
- {
- #if !defined(NO_HEAP_TRACKING)
- Heap_initialize();
- #endif
- Log_initialize((Log_nameValue*)MQTTAsync_getVersionInfo());
- bstate->clients = ListInitialize();
- Socket_outInitialize();
- Socket_setWriteContinueCallback(MQTTAsync_writeContinue);
- Socket_setWriteCompleteCallback(MQTTAsync_writeComplete);
- Socket_setWriteAvailableCallback(MQTTProtocol_writeAvailable);
- MQTTAsync_handles = ListInitialize();
- MQTTAsync_commands = ListInitialize();
- #if defined(OPENSSL)
- SSLSocket_initialize();
- #endif
- global_initialized = 1;
- }
- if ((m = malloc(sizeof(MQTTAsyncs))) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- *handle = m;
- memset(m, '\0', sizeof(MQTTAsyncs));
- if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
- serverURI += strlen(URI_TCP);
- else if (strncmp(URI_MQTT, serverURI, strlen(URI_MQTT)) == 0)
- serverURI += strlen(URI_MQTT);
- else if (strncmp(URI_WS, serverURI, strlen(URI_WS)) == 0)
- {
- serverURI += strlen(URI_WS);
- m->websocket = 1;
- }
- #if defined(OPENSSL)
- else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0)
- {
- serverURI += strlen(URI_SSL);
- m->ssl = 1;
- }
- else if (strncmp(URI_MQTTS, serverURI, strlen(URI_MQTTS)) == 0)
- {
- serverURI += strlen(URI_MQTTS);
- m->ssl = 1;
- }
- else if (strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
- {
- serverURI += strlen(URI_WSS);
- m->ssl = 1;
- m->websocket = 1;
- }
- #endif
- if ((m->serverURI = MQTTStrdup(serverURI)) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- m->responses = ListInitialize();
- ListAppend(MQTTAsync_handles, m, sizeof(MQTTAsyncs));
- if ((m->c = malloc(sizeof(Clients))) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- memset(m->c, '\0', sizeof(Clients));
- m->c->context = m;
- m->c->outboundMsgs = ListInitialize();
- m->c->inboundMsgs = ListInitialize();
- m->c->messageQueue = ListInitialize();
- m->c->outboundQueue = ListInitialize();
- m->c->clientID = MQTTStrdup(clientId);
- if (m->c->context == NULL || m->c->outboundMsgs == NULL || m->c->inboundMsgs == NULL ||
- m->c->messageQueue == NULL || m->c->outboundQueue == NULL || m->c->clientID == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- m->c->MQTTVersion = MQTTVERSION_DEFAULT;
- m->shouldBeConnected = 0;
- if (options)
- {
- if ((m->createOptions = malloc(sizeof(MQTTAsync_createOptions))) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- memcpy(m->createOptions, options, sizeof(MQTTAsync_createOptions));
- if (options->struct_version > 0)
- m->c->MQTTVersion = options->MQTTVersion;
- }
- #if !defined(NO_PERSISTENCE)
- rc = MQTTPersistence_create(&(m->c->persistence), persistence_type, persistence_context);
- if (rc == 0)
- {
- rc = MQTTPersistence_initialize(m->c, m->serverURI); /* inflight messages restored here */
- if (rc == 0)
- {
- if (m->createOptions && m->createOptions->struct_version >= 2 && m->createOptions->restoreMessages == 0)
- MQTTAsync_unpersistCommandsAndMessages(m->c);
- else
- {
- MQTTAsync_restoreCommands(m);
- MQTTPersistence_restoreMessageQueue(m->c);
- }
- }
- }
- #endif
- ListAppend(bstate->clients, m->c, sizeof(Clients) + 3*sizeof(List));
- exit:
- MQTTAsync_unlock_mutex(mqttasync_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clientId,
- int persistence_type, void* persistence_context)
- {
- MQTTAsync_init_rand();
- return MQTTAsync_createWithOptions(handle, serverURI, clientId, persistence_type,
- persistence_context, NULL);
- }
- void MQTTAsync_destroy(MQTTAsync* handle)
- {
- MQTTAsyncs* m = *handle;
- FUNC_ENTRY;
- MQTTAsync_lock_mutex(mqttasync_mutex);
- if (m == NULL)
- goto exit;
- MQTTAsync_closeSession(m->c, MQTTREASONCODE_SUCCESS, NULL);
- MQTTAsync_NULLPublishResponses(m);
- MQTTAsync_freeResponses(m);
- MQTTAsync_NULLPublishCommands(m);
- MQTTAsync_freeCommands(m);
- ListFree(m->responses);
- if (m->c)
- {
- SOCKET saved_socket = m->c->net.socket;
- char* saved_clientid = MQTTStrdup(m->c->clientID);
- #if !defined(NO_PERSISTENCE)
- MQTTPersistence_close(m->c);
- #endif
- MQTTAsync_emptyMessageQueue(m->c);
- MQTTProtocol_freeClient(m->c);
- if (!ListRemove(bstate->clients, m->c))
- Log(LOG_ERROR, 0, NULL);
- else
- Log(TRACE_MIN, 1, NULL, saved_clientid, saved_socket);
- free(saved_clientid);
- }
- if (m->serverURI)
- free(m->serverURI);
- if (m->createOptions)
- free(m->createOptions);
- MQTTAsync_freeServerURIs(m);
- if (m->connectProps)
- {
- MQTTProperties_free(m->connectProps);
- free(m->connectProps);
- m->connectProps = NULL;
- }
- if (m->willProps)
- {
- MQTTProperties_free(m->willProps);
- free(m->willProps);
- m->willProps = NULL;
- }
- if (!ListRemove(MQTTAsync_handles, m))
- Log(LOG_ERROR, -1, "free error");
- *handle = NULL;
- if (bstate->clients->count == 0)
- MQTTAsync_terminate();
- exit:
- MQTTAsync_unlock_mutex(mqttasync_mutex);
- FUNC_EXIT;
- }
- int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
- {
- MQTTAsyncs* m = handle;
- int rc = MQTTASYNC_SUCCESS;
- MQTTAsync_queuedCommand* conn;
- thread_id_type thread_id = 0;
- int locked = 0;
- FUNC_ENTRY;
- if (options == NULL)
- {
- rc = MQTTASYNC_NULL_PARAMETER;
- goto exit;
- }
- if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 8)
- {
- rc = MQTTASYNC_BAD_STRUCTURE;
- goto exit;
- }
- #if defined(OPENSSL)
- if (m->ssl && options->ssl == NULL)
- {
- rc = MQTTASYNC_NULL_PARAMETER;
- goto exit;
- }
- #endif
- if (options->will) /* check validity of will options structure */
- {
- if (strncmp(options->will->struct_id, "MQTW", 4) != 0 || (options->will->struct_version != 0 && options->will->struct_version != 1))
- {
- rc = MQTTASYNC_BAD_STRUCTURE;
- goto exit;
- }
- if (options->will->qos < 0 || options->will->qos > 2)
- {
- rc = MQTTASYNC_BAD_QOS;
- goto exit;
- }
- if (options->will->topicName == NULL)
- {
- rc = MQTTASYNC_NULL_PARAMETER;
- goto exit;
- } else if (strlen(options->will->topicName) == 0)
- {
- rc = MQTTASYNC_0_LEN_WILL_TOPIC;
- goto exit;
- }
- }
- if (options->struct_version != 0 && options->ssl) /* check validity of SSL options structure */
- {
- if (strncmp(options->ssl->struct_id, "MQTS", 4) != 0 || options->ssl->struct_version < 0 || options->ssl->struct_version > 5)
- {
- rc = MQTTASYNC_BAD_STRUCTURE;
- goto exit;
- }
- }
- if (options->MQTTVersion >= MQTTVERSION_5 && m->c->MQTTVersion < MQTTVERSION_5)
- {
- rc = MQTTASYNC_WRONG_MQTT_VERSION;
- goto exit;
- }
- if ((options->username && !UTF8_validateString(options->username)) ||
- (options->password && !UTF8_validateString(options->password)))
- {
- rc = MQTTASYNC_BAD_UTF8_STRING;
- goto exit;
- }
- if (options->MQTTVersion >= MQTTVERSION_5 && options->struct_version < 6)
- {
- rc = MQTTASYNC_BAD_STRUCTURE;
- goto exit;
- }
- if (options->MQTTVersion >= MQTTVERSION_5 && options->cleansession != 0)
- {
- rc = MQTTASYNC_BAD_MQTT_OPTION;
- goto exit;
- }
- if (options->MQTTVersion < MQTTVERSION_5 && options->struct_version >= 6)
- {
- if (options->cleanstart != 0 || options->onFailure5 || options->onSuccess5 ||
- options->connectProperties || options->willProperties)
- {
- rc = MQTTASYNC_BAD_MQTT_OPTION;
- goto exit;
- }
- }
- m->connect.onSuccess = options->onSuccess;
- m->connect.onFailure = options->onFailure;
- if (options->struct_version >= 6)
- {
- m->connect.onSuccess5 = options->onSuccess5;
- m->connect.onFailure5 = options->onFailure5;
- }
- m->connect.context = options->context;
- m->connectTimeout = options->connectTimeout;
- /* don't lock async mutex if we are being called from a callback */
- thread_id = Thread_getid();
- if (thread_id != sendThread_id && thread_id != receiveThread_id)
- {
- MQTTAsync_lock_mutex(mqttasync_mutex);
- locked = 1;
- }
- MQTTAsync_tostop = 0;
- if (sendThread_state != STARTING && sendThread_state != RUNNING)
- {
- sendThread_state = STARTING;
- Thread_start(MQTTAsync_sendThread, NULL);
- }
- if (receiveThread_state != STARTING && receiveThread_state != RUNNING)
- {
- receiveThread_state = STARTING;
- Thread_start(MQTTAsync_receiveThread, handle);
- }
- if (locked)
- MQTTAsync_unlock_mutex(mqttasync_mutex);
- m->c->keepAliveInterval = options->keepAliveInterval;
- setRetryLoopInterval(options->keepAliveInterval);
- m->c->cleansession = options->cleansession;
- m->c->maxInflightMessages = options->maxInflight;
- if (options->struct_version >= 3)
- m->c->MQTTVersion = options->MQTTVersion;
- else
- m->c->MQTTVersion = MQTTVERSION_DEFAULT;
- if (options->struct_version >= 4)
- {
- m->automaticReconnect = options->automaticReconnect;
- m->minRetryInterval = options->minRetryInterval;
- m->maxRetryInterval = options->maxRetryInterval;
- }
- if (options->struct_version >= 7)
- {
- m->c->net.httpHeaders = (const MQTTClient_nameValue *) options->httpHeaders;
- }
- if (options->struct_version >= 8)
- {
- if (options->httpProxy)
- m->c->httpProxy = MQTTStrdup(options->httpProxy);
- if (options->httpsProxy)
- m->c->httpsProxy = MQTTStrdup(options->httpsProxy);
- }
- if (m->c->will)
- {
- free(m->c->will->payload);
- free(m->c->will->topic);
- free(m->c->will);
- m->c->will = NULL;
- }
- if (options->will && (options->will->struct_version == 0 || options->will->struct_version == 1))
- {
- const void* source = NULL;
- if ((m->c->will = malloc(sizeof(willMessages))) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- if (options->will->message || (options->will->struct_version == 1 && options->will->payload.data))
- {
- if (options->will->struct_version == 1 && options->will->payload.data)
- {
- m->c->will->payloadlen = options->will->payload.len;
- source = options->will->payload.data;
- }
- else
- {
- m->c->will->payloadlen = (int)strlen(options->will->message);
- source = (void*)options->will->message;
- }
- if ((m->c->will->payload = malloc(m->c->will->payloadlen)) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- memcpy(m->c->will->payload, source, m->c->will->payloadlen);
- }
- else
- {
- m->c->will->payload = NULL;
- m->c->will->payloadlen = 0;
- }
- m->c->will->qos = options->will->qos;
- m->c->will->retained = options->will->retained;
- m->c->will->topic = MQTTStrdup(options->will->topicName);
- }
- #if defined(OPENSSL)
- if (m->c->sslopts)
- {
- if (m->c->sslopts->trustStore)
- free((void*)m->c->sslopts->trustStore);
- if (m->c->sslopts->keyStore)
- free((void*)m->c->sslopts->keyStore);
- if (m->c->sslopts->privateKey)
- free((void*)m->c->sslopts->privateKey);
- if (m->c->sslopts->privateKeyPassword)
- free((void*)m->c->sslopts->privateKeyPassword);
- if (m->c->sslopts->enabledCipherSuites)
- free((void*)m->c->sslopts->enabledCipherSuites);
- if (m->c->sslopts->struct_version >= 2)
- {
- if (m->c->sslopts->CApath)
- free((void*)m->c->sslopts->CApath);
- }
- free((void*)m->c->sslopts);
- m->c->sslopts = NULL;
- }
- if (options->struct_version != 0 && options->ssl)
- {
- if ((m->c->sslopts = malloc(sizeof(MQTTClient_SSLOptions))) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- memset(m->c->sslopts, '\0', sizeof(MQTTClient_SSLOptions));
- m->c->sslopts->struct_version = options->ssl->struct_version;
- if (options->ssl->trustStore)
- m->c->sslopts->trustStore = MQTTStrdup(options->ssl->trustStore);
- if (options->ssl->keyStore)
- m->c->sslopts->keyStore = MQTTStrdup(options->ssl->keyStore);
- if (options->ssl->privateKey)
- m->c->sslopts->privateKey = MQTTStrdup(options->ssl->privateKey);
- if (options->ssl->privateKeyPassword)
- m->c->sslopts->privateKeyPassword = MQTTStrdup(options->ssl->privateKeyPassword);
- if (options->ssl->enabledCipherSuites)
- m->c->sslopts->enabledCipherSuites = MQTTStrdup(options->ssl->enabledCipherSuites);
- m->c->sslopts->enableServerCertAuth = options->ssl->enableServerCertAuth;
- if (m->c->sslopts->struct_version >= 1)
- m->c->sslopts->sslVersion = options->ssl->sslVersion;
- if (m->c->sslopts->struct_version >= 2)
- {
- m->c->sslopts->verify = options->ssl->verify;
- if (options->ssl->CApath)
- m->c->sslopts->CApath = MQTTStrdup(options->ssl->CApath);
- }
- if (m->c->sslopts->struct_version >= 3)
- {
- m->c->sslopts->ssl_error_cb = options->ssl->ssl_error_cb;
- m->c->sslopts->ssl_error_context = options->ssl->ssl_error_context;
- }
- if (m->c->sslopts->struct_version >= 4)
- {
- m->c->sslopts->ssl_psk_cb = options->ssl->ssl_psk_cb;
- m->c->sslopts->ssl_psk_context = options->ssl->ssl_psk_context;
- m->c->sslopts->disableDefaultTrustStore = options->ssl->disableDefaultTrustStore;
- }
- if (m->c->sslopts->struct_version >= 5)
- {
- if (options->ssl->protos)
- m->c->sslopts->protos = (const unsigned char*)MQTTStrdup((const char*)options->ssl->protos);
- m->c->sslopts->protos_len = options->ssl->protos_len;
- }
- }
- #else
- if (options->struct_version != 0 && options->ssl)
- {
- rc = MQTTASYNC_SSL_NOT_SUPPORTED;
- goto exit;
- }
- #endif
- if (m->c->username)
- {
- free((void*)m->c->username);
- m->c->username = NULL;
- }
- if (options->username)
- m->c->username = MQTTStrdup(options->username);
- if (m->c->password)
- {
- free((void*)m->c->password);
- m->c->password = NULL;
- }
- if (options->password)
- {
- m->c->password = MQTTStrdup(options->password);
- m->c->passwordlen = (int)strlen(options->password);
- }
- else if (options->struct_version >= 5 && options->binarypwd.data)
- {
- m->c->passwordlen = options->binarypwd.len;
- if ((m->c->password = malloc(m->c->passwordlen)) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- memcpy((void*)m->c->password, options->binarypwd.data, m->c->passwordlen);
- }
- m->c->retryInterval = options->retryInterval;
- m->shouldBeConnected = 1;
- m->connectTimeout = options->connectTimeout;
- MQTTAsync_freeServerURIs(m);
- if (options->struct_version >= 2 && options->serverURIcount > 0)
- {
- int i;
- m->serverURIcount = options->serverURIcount;
- if ((m->serverURIs = malloc(options->serverURIcount * sizeof(char*))) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- for (i = 0; i < options->serverURIcount; ++i)
- m->serverURIs[i] = MQTTStrdup(options->serverURIs[i]);
- }
- if (m->connectProps)
- {
- MQTTProperties_free(m->connectProps);
- free(m->connectProps);
- m->connectProps = NULL;
- }
- if (m->willProps)
- {
- MQTTProperties_free(m->willProps);
- free(m->willProps);
- m->willProps = NULL;
- }
- if (options->struct_version >=6)
- {
- if (options->connectProperties)
- {
- MQTTProperties initialized = MQTTProperties_initializer;
- if ((m->connectProps = malloc(sizeof(MQTTProperties))) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- *m->connectProps = initialized;
- *m->connectProps = MQTTProperties_copy(options->connectProperties);
- if (MQTTProperties_hasProperty(options->connectProperties, MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL))
- m->c->sessionExpiry = MQTTProperties_getNumericValue(options->connectProperties,
- MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL);
- }
- if (options->willProperties)
- {
- MQTTProperties initialized = MQTTProperties_initializer;
- if ((m->willProps = malloc(sizeof(MQTTProperties))) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- *m->willProps = initialized;
- *m->willProps = MQTTProperties_copy(options->willProperties);
- }
- m->c->cleanstart = options->cleanstart;
- }
- /* Add connect request to operation queue */
- if ((conn = malloc(sizeof(MQTTAsync_queuedCommand))) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
- conn->client = m;
- if (options)
- {
- conn->command.onSuccess = options->onSuccess;
- conn->command.onFailure = options->onFailure;
- conn->command.onSuccess5 = options->onSuccess5;
- conn->command.onFailure5 = options->onFailure5;
- conn->command.context = options->context;
- }
- conn->command.type = CONNECT;
- conn->command.details.conn.currentURI = 0;
- rc = MQTTAsync_addCommand(conn, sizeof(conn));
- exit:
- FUNC_EXIT_RC(rc);
- return rc;
- }
- int MQTTAsync_reconnect(MQTTAsync handle)
- {
- int rc = MQTTASYNC_FAILURE;
- MQTTAsyncs* m = handle;
- FUNC_ENTRY;
- MQTTAsync_lock_mutex(mqttasync_mutex);
- if (m->automaticReconnect)
- {
- if (m->shouldBeConnected)
- {
- m->reconnectNow = 1;
- m->currentIntervalBase = m->minRetryInterval;
- m->currentInterval = m->minRetryInterval;
- m->retrying = 1;
- rc = MQTTASYNC_SUCCESS;
- }
- }
- else
- {
- /* to reconnect, put the connect command to the head of the command queue */
- MQTTAsync_queuedCommand* conn = malloc(sizeof(MQTTAsync_queuedCommand));
- if (!conn)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
- conn->client = m;
- conn->command = m->connect;
- /* make sure that the version attempts are restarted */
- if (m->c->MQTTVersion == MQTTVERSION_DEFAULT)
- conn->command.details.conn.MQTTVersion = 0;
- rc = MQTTAsync_addCommand(conn, sizeof(m->connect));
- }
- exit:
- MQTTAsync_unlock_mutex(mqttasync_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char* const* topic, const int* qos, MQTTAsync_responseOptions* response)
- {
- MQTTAsyncs* m = handle;
- int i = 0;
- int rc = MQTTASYNC_SUCCESS;
- MQTTAsync_queuedCommand* sub;
- int msgid = 0;
- FUNC_ENTRY;
- if (m == NULL || m->c == NULL)
- rc = MQTTASYNC_FAILURE;
- else if (m->c->connected == 0)
- rc = MQTTASYNC_DISCONNECTED;
- else for (i = 0; i < count; i++)
- {
- if (!UTF8_validateString(topic[i]))
- {
- rc = MQTTASYNC_BAD_UTF8_STRING;
- break;
- }
- if (qos[i] < 0 || qos[i] > 2)
- {
- rc = MQTTASYNC_BAD_QOS;
- break;
- }
- }
- if (rc != MQTTASYNC_SUCCESS)
- ; /* don't overwrite a previous error code */
- else if ((msgid = MQTTAsync_assignMsgId(m)) == 0)
- rc = MQTTASYNC_NO_MORE_MSGIDS;
- else if (m->c->MQTTVersion >= MQTTVERSION_5 && count > 1 && (count != response->subscribeOptionsCount
- && response->subscribeOptionsCount != 0))
- rc = MQTTASYNC_BAD_MQTT_OPTION;
- else if (response)
- {
- if (m->c->MQTTVersion >= MQTTVERSION_5)
- {
- if (response->struct_version == 0 || response->onFailure || response->onSuccess)
- rc = MQTTASYNC_BAD_MQTT_OPTION;
- }
- else if (m->c->MQTTVersion < MQTTVERSION_5)
- {
- if (response->struct_version >= 1 && (response->onFailure5 || response->onSuccess5))
- rc = MQTTASYNC_BAD_MQTT_OPTION;
- }
- }
- if (rc != MQTTASYNC_SUCCESS)
- goto exit;
- /* Add subscribe request to operation queue */
- if ((sub = malloc(sizeof(MQTTAsync_queuedCommand))) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- memset(sub, '\0', sizeof(MQTTAsync_queuedCommand));
- sub->client = m;
- sub->command.token = msgid;
- if (response)
- {
- sub->command.onSuccess = response->onSuccess;
- sub->command.onFailure = response->onFailure;
- sub->command.onSuccess5 = response->onSuccess5;
- sub->command.onFailure5 = response->onFailure5;
- sub->command.context = response->context;
- response->token = sub->command.token;
- if (m->c->MQTTVersion >= MQTTVERSION_5)
- {
- sub->command.properties = MQTTProperties_copy(&response->properties);
- sub->command.details.sub.opts = response->subscribeOptions;
- if (count > 1)
- {
- if ((sub->command.details.sub.optlist = malloc(sizeof(MQTTSubscribe_options) * count)) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- if (response->subscribeOptionsCount == 0)
- {
- MQTTSubscribe_options initialized = MQTTSubscribe_options_initializer;
- for (i = 0; i < count; ++i)
- sub->command.details.sub.optlist[i] = initialized;
- }
- else
- {
- for (i = 0; i < count; ++i)
- sub->command.details.sub.optlist[i] = response->subscribeOptionsList[i];
- }
- }
- }
- }
- sub->command.type = SUBSCRIBE;
- sub->command.details.sub.count = count;
- sub->command.details.sub.topics = malloc(sizeof(char*) * count);
- sub->command.details.sub.qoss = malloc(sizeof(int) * count);
- if (sub->command.details.sub.topics && sub->command.details.sub.qoss)
- {
- for (i = 0; i < count; ++i)
- {
- if ((sub->command.details.sub.topics[i] = MQTTStrdup(topic[i])) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- sub->command.details.sub.qoss[i] = qos[i];
- }
- rc = MQTTAsync_addCommand(sub, sizeof(sub));
- }
- else
- rc = PAHO_MEMORY_ERROR;
- exit:
- FUNC_EXIT_RC(rc);
- return rc;
- }
- int MQTTAsync_subscribe(MQTTAsync handle, const char* topic, int qos, MQTTAsync_responseOptions* response)
- {
- int rc = 0;
- FUNC_ENTRY;
- rc = MQTTAsync_subscribeMany(handle, 1, (char * const *)(&topic), &qos, response);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- int MQTTAsync_unsubscribeMany(MQTTAsync handle, int count, char* const* topic, MQTTAsync_responseOptions* response)
- {
- MQTTAsyncs* m = handle;
- int i = 0;
- int rc = MQTTASYNC_SUCCESS;
- MQTTAsync_queuedCommand* unsub;
- int msgid = 0;
- FUNC_ENTRY;
- if (m == NULL || m->c == NULL)
- rc = MQTTASYNC_FAILURE;
- else if (m->c->connected == 0)
- rc = MQTTASYNC_DISCONNECTED;
- else for (i = 0; i < count; i++)
- {
- if (!UTF8_validateString(topic[i]))
- {
- rc = MQTTASYNC_BAD_UTF8_STRING;
- break;
- }
- }
- if (rc != MQTTASYNC_SUCCESS)
- ; /* don't overwrite a previous error code */
- else if ((msgid = MQTTAsync_assignMsgId(m)) == 0)
- rc = MQTTASYNC_NO_MORE_MSGIDS;
- else if (response)
- {
- if (m->c->MQTTVersion >= MQTTVERSION_5)
- {
- if (response->struct_version == 0 || response->onFailure || response->onSuccess)
- rc = MQTTASYNC_BAD_MQTT_OPTION;
- }
- else if (m->c->MQTTVersion < MQTTVERSION_5)
- {
- if (response->struct_version >= 1 && (response->onFailure5 || response->onSuccess5))
- rc = MQTTASYNC_BAD_MQTT_OPTION;
- }
- }
- if (rc != MQTTASYNC_SUCCESS)
- goto exit;
- /* Add unsubscribe request to operation queue */
- if ((unsub = malloc(sizeof(MQTTAsync_queuedCommand))) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- memset(unsub, '\0', sizeof(MQTTAsync_queuedCommand));
- unsub->client = m;
- unsub->command.type = UNSUBSCRIBE;
- unsub->command.token = msgid;
- if (response)
- {
- unsub->command.onSuccess = response->onSuccess;
- unsub->command.onFailure = response->onFailure;
- unsub->command.onSuccess5 = response->onSuccess5;
- unsub->command.onFailure5 = response->onFailure5;
- unsub->command.context = response->context;
- response->token = unsub->command.token;
- if (m->c->MQTTVersion >= MQTTVERSION_5)
- unsub->command.properties = MQTTProperties_copy(&response->properties);
- }
- unsub->command.details.unsub.count = count;
- if ((unsub->command.details.unsub.topics = malloc(sizeof(char*) * count)) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- for (i = 0; i < count; ++i)
- unsub->command.details.unsub.topics[i] = MQTTStrdup(topic[i]);
- rc = MQTTAsync_addCommand(unsub, sizeof(unsub));
- exit:
- FUNC_EXIT_RC(rc);
- return rc;
- }
- int MQTTAsync_unsubscribe(MQTTAsync handle, const char* topic, MQTTAsync_responseOptions* response)
- {
- int rc = 0;
- FUNC_ENTRY;
- rc = MQTTAsync_unsubscribeMany(handle, 1, (char * const *)(&topic), response);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- int MQTTAsync_send(MQTTAsync handle, const char* destinationName, int payloadlen, const void* payload,
- int qos, int retained, MQTTAsync_responseOptions* response)
- {
- int rc = MQTTASYNC_SUCCESS;
- MQTTAsyncs* m = handle;
- MQTTAsync_queuedCommand* pub;
- int msgid = 0;
- FUNC_ENTRY;
- if (m == NULL || m->c == NULL)
- rc = MQTTASYNC_FAILURE;
- else if (m->c->connected == 0)
- {
- if (m->createOptions == NULL)
- rc = MQTTASYNC_DISCONNECTED;
- else if (m->createOptions->sendWhileDisconnected == 0)
- rc = MQTTASYNC_DISCONNECTED;
- else if (m->shouldBeConnected == 0 && (m->createOptions->struct_version < 2 || m->createOptions->allowDisconnectedSendAtAnyTime == 0))
- rc = MQTTASYNC_DISCONNECTED;
- }
- if (rc != MQTTASYNC_SUCCESS)
- goto exit;
- if (!UTF8_validateString(destinationName))
- rc = MQTTASYNC_BAD_UTF8_STRING;
- else if (qos < 0 || qos > 2)
- rc = MQTTASYNC_BAD_QOS;
- else if (qos > 0 && (msgid = MQTTAsync_assignMsgId(m)) == 0)
- rc = MQTTASYNC_NO_MORE_MSGIDS;
- else if (m->createOptions &&
- (m->createOptions->struct_version < 2 || m->createOptions->deleteOldestMessages == 0) &&
- (MQTTAsync_getNoBufferedMessages(m) >= m->createOptions->maxBufferedMessages))
- rc = MQTTASYNC_MAX_BUFFERED_MESSAGES;
- else if (response)
- {
- if (m->c->MQTTVersion >= MQTTVERSION_5)
- {
- if (response->struct_version == 0 || response->onFailure || response->onSuccess)
- rc = MQTTASYNC_BAD_MQTT_OPTION;
- }
- else if (m->c->MQTTVersion < MQTTVERSION_5)
- {
- if (response->struct_version >= 1 && (response->onFailure5 || response->onSuccess5))
- rc = MQTTASYNC_BAD_MQTT_OPTION;
- }
- }
- if (rc != MQTTASYNC_SUCCESS)
- goto exit;
- /* Add publish request to operation queue */
- if ((pub = malloc(sizeof(MQTTAsync_queuedCommand))) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- memset(pub, '\0', sizeof(MQTTAsync_queuedCommand));
- pub->client = m;
- pub->command.type = PUBLISH;
- pub->command.token = msgid;
- if (response)
- {
- pub->command.onSuccess = response->onSuccess;
- pub->command.onFailure = response->onFailure;
- pub->command.onSuccess5 = response->onSuccess5;
- pub->command.onFailure5 = response->onFailure5;
- pub->command.context = response->context;
- response->token = pub->command.token;
- if (m->c->MQTTVersion >= MQTTVERSION_5)
- pub->command.properties = MQTTProperties_copy(&response->properties);
- }
- if ((pub->command.details.pub.destinationName = MQTTStrdup(destinationName)) == NULL)
- {
- free(pub);
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- pub->command.details.pub.payloadlen = payloadlen;
- if ((pub->command.details.pub.payload = malloc(payloadlen)) == NULL)
- {
- free(pub->command.details.pub.destinationName);
- free(pub);
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- memcpy(pub->command.details.pub.payload, payload, payloadlen);
- pub->command.details.pub.qos = qos;
- pub->command.details.pub.retained = retained;
- rc = MQTTAsync_addCommand(pub, sizeof(pub));
- exit:
- FUNC_EXIT_RC(rc);
- return rc;
- }
- int MQTTAsync_sendMessage(MQTTAsync handle, const char* destinationName, const MQTTAsync_message* message,
- MQTTAsync_responseOptions* response)
- {
- int rc = MQTTASYNC_SUCCESS;
- MQTTAsyncs* m = handle;
- FUNC_ENTRY;
- if (message == NULL)
- {
- rc = MQTTASYNC_NULL_PARAMETER;
- goto exit;
- }
- if (strncmp(message->struct_id, "MQTM", 4) != 0 ||
- (message->struct_version != 0 && message->struct_version != 1))
- {
- rc = MQTTASYNC_BAD_STRUCTURE;
- goto exit;
- }
- if (m->c->MQTTVersion >= MQTTVERSION_5 && response)
- response->properties = message->properties;
- rc = MQTTAsync_send(handle, destinationName, message->payloadlen, message->payload,
- message->qos, message->retained, response);
- exit:
- FUNC_EXIT_RC(rc);
- return rc;
- }
- int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions* options)
- {
- if (options != NULL && (strncmp(options->struct_id, "MQTD", 4) != 0 || options->struct_version < 0 || options->struct_version > 1))
- return MQTTASYNC_BAD_STRUCTURE;
- else
- return MQTTAsync_disconnect1(handle, options, 0);
- }
- int MQTTAsync_isConnected(MQTTAsync handle)
- {
- MQTTAsyncs* m = handle;
- int rc = 0;
- FUNC_ENTRY;
- MQTTAsync_lock_mutex(mqttasync_mutex);
- if (m && m->c)
- rc = m->c->connected;
- MQTTAsync_unlock_mutex(mqttasync_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- int MQTTAsync_isComplete(MQTTAsync handle, MQTTAsync_token dt)
- {
- int rc = MQTTASYNC_SUCCESS;
- MQTTAsyncs* m = handle;
- ListElement* current = NULL;
- FUNC_ENTRY;
- MQTTAsync_lock_mutex(mqttasync_mutex);
- if (m == NULL)
- {
- rc = MQTTASYNC_FAILURE;
- goto exit;
- }
- /* First check unprocessed commands */
- current = NULL;
- while (ListNextElement(MQTTAsync_commands, ¤t))
- {
- MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)(current->content);
- if (cmd->client == m && cmd->command.token == dt)
- goto exit;
- }
- /* Now check the inflight messages */
- if (m->c && m->c->outboundMsgs->count > 0)
- {
- current = NULL;
- while (ListNextElement(m->c->outboundMsgs, ¤t))
- {
- Messages* m = (Messages*)(current->content);
- if (m->msgid == dt)
- goto exit;
- }
- }
- rc = MQTTASYNC_TRUE; /* Can't find it, so it must be complete */
- exit:
- MQTTAsync_unlock_mutex(mqttasync_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- int MQTTAsync_waitForCompletion(MQTTAsync handle, MQTTAsync_token dt, unsigned long timeout)
- {
- int rc = MQTTASYNC_FAILURE;
- START_TIME_TYPE start = MQTTTime_start_clock();
- ELAPSED_TIME_TYPE elapsed = 0L;
- MQTTAsyncs* m = handle;
- FUNC_ENTRY;
- MQTTAsync_lock_mutex(mqttasync_mutex);
- if (m == NULL || m->c == NULL)
- {
- MQTTAsync_unlock_mutex(mqttasync_mutex);
- rc = MQTTASYNC_FAILURE;
- goto exit;
- }
- if (m->c->connected == 0)
- {
- MQTTAsync_unlock_mutex(mqttasync_mutex);
- rc = MQTTASYNC_DISCONNECTED;
- goto exit;
- }
- MQTTAsync_unlock_mutex(mqttasync_mutex);
- if (MQTTAsync_isComplete(handle, dt) == 1)
- {
- rc = MQTTASYNC_SUCCESS; /* well we couldn't find it */
- goto exit;
- }
- elapsed = MQTTTime_elapsed(start);
- while (elapsed < timeout && rc == MQTTASYNC_FAILURE)
- {
- MQTTTime_sleep(100);
- if (MQTTAsync_isComplete(handle, dt) == 1)
- rc = MQTTASYNC_SUCCESS; /* well we couldn't find it */
- MQTTAsync_lock_mutex(mqttasync_mutex);
- if (m->c->connected == 0)
- rc = MQTTASYNC_DISCONNECTED;
- MQTTAsync_unlock_mutex(mqttasync_mutex);
- elapsed = MQTTTime_elapsed(start);
- }
- exit:
- FUNC_EXIT_RC(rc);
- return rc;
- }
- int MQTTAsync_getPendingTokens(MQTTAsync handle, MQTTAsync_token **tokens)
- {
- int rc = MQTTASYNC_SUCCESS;
- MQTTAsyncs* m = handle;
- ListElement* current = NULL;
- int count = 0;
- FUNC_ENTRY;
- MQTTAsync_lock_mutex(mqttasync_mutex);
- MQTTAsync_lock_mutex(mqttcommand_mutex);
- *tokens = NULL;
- if (m == NULL)
- {
- rc = MQTTASYNC_FAILURE;
- goto exit;
- }
- /* calculate the number of pending tokens - commands plus inflight */
- while (ListNextElement(MQTTAsync_commands, ¤t))
- {
- MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)(current->content);
- if (cmd->client == m && cmd->command.type == PUBLISH)
- count++;
- }
- if (m->c)
- count += m->c->outboundMsgs->count;
- if (count == 0)
- goto exit; /* no tokens to return */
- *tokens = malloc(sizeof(MQTTAsync_token) * (count + 1)); /* add space for sentinel at end of list */
- if (!*tokens)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- /* First add the unprocessed commands to the pending tokens */
- current = NULL;
- count = 0;
- while (ListNextElement(MQTTAsync_commands, ¤t))
- {
- MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)(current->content);
- if (cmd->client == m && cmd->command.type == PUBLISH)
- (*tokens)[count++] = cmd->command.token;
- }
- /* Now add the inflight messages */
- if (m->c && m->c->outboundMsgs->count > 0)
- {
- current = NULL;
- while (ListNextElement(m->c->outboundMsgs, ¤t))
- {
- Messages* m = (Messages*)(current->content);
- (*tokens)[count++] = m->msgid;
- }
- }
- (*tokens)[count] = -1; /* indicate end of list */
- exit:
- MQTTAsync_unlock_mutex(mqttcommand_mutex);
- MQTTAsync_unlock_mutex(mqttasync_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- int MQTTAsync_setCallbacks(MQTTAsync handle, void* context,
- MQTTAsync_connectionLost* cl,
- MQTTAsync_messageArrived* ma,
- MQTTAsync_deliveryComplete* dc)
- {
- int rc = MQTTASYNC_SUCCESS;
- MQTTAsyncs* m = handle;
- FUNC_ENTRY;
- MQTTAsync_lock_mutex(mqttasync_mutex);
- if (m == NULL || ma == NULL || m->c == NULL || m->c->connect_state != NOT_IN_PROGRESS)
- rc = MQTTASYNC_FAILURE;
- else
- {
- m->clContext = m->maContext = m->dcContext = context;
- m->cl = cl;
- m->ma = ma;
- m->dc = dc;
- }
- MQTTAsync_unlock_mutex(mqttasync_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- int MQTTAsync_setConnectionLostCallback(MQTTAsync handle, void* context,
- MQTTAsync_connectionLost* cl)
- {
- int rc = MQTTASYNC_SUCCESS;
- MQTTAsyncs* m = handle;
- FUNC_ENTRY;
- MQTTAsync_lock_mutex(mqttasync_mutex);
- if (m == NULL || m->c->connect_state != 0)
- rc = MQTTASYNC_FAILURE;
- else
- {
- m->clContext = context;
- m->cl = cl;
- }
- MQTTAsync_unlock_mutex(mqttasync_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- int MQTTAsync_setMessageArrivedCallback(MQTTAsync handle, void* context,
- MQTTAsync_messageArrived* ma)
- {
- int rc = MQTTASYNC_SUCCESS;
- MQTTAsyncs* m = handle;
- FUNC_ENTRY;
- MQTTAsync_lock_mutex(mqttasync_mutex);
- if (m == NULL || ma == NULL || m->c->connect_state != 0)
- rc = MQTTASYNC_FAILURE;
- else
- {
- m->maContext = context;
- m->ma = ma;
- }
- MQTTAsync_unlock_mutex(mqttasync_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- int MQTTAsync_setDeliveryCompleteCallback(MQTTAsync handle, void* context,
- MQTTAsync_deliveryComplete* dc)
- {
- int rc = MQTTASYNC_SUCCESS;
- MQTTAsyncs* m = handle;
- FUNC_ENTRY;
- MQTTAsync_lock_mutex(mqttasync_mutex);
- if (m == NULL || m->c->connect_state != 0)
- rc = MQTTASYNC_FAILURE;
- else
- {
- m->dcContext = context;
- m->dc = dc;
- }
- MQTTAsync_unlock_mutex(mqttasync_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- int MQTTAsync_setDisconnected(MQTTAsync handle, void* context, MQTTAsync_disconnected* disconnected)
- {
- int rc = MQTTASYNC_SUCCESS;
- MQTTAsyncs* m = handle;
- FUNC_ENTRY;
- MQTTAsync_lock_mutex(mqttasync_mutex);
- if (m == NULL || m->c->connect_state != NOT_IN_PROGRESS)
- rc = MQTTASYNC_FAILURE;
- else
- {
- m->disconnected_context = context;
- m->disconnected = disconnected;
- }
- MQTTAsync_unlock_mutex(mqttasync_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- int MQTTAsync_setConnected(MQTTAsync handle, void* context, MQTTAsync_connected* connected)
- {
- int rc = MQTTASYNC_SUCCESS;
- MQTTAsyncs* m = handle;
- FUNC_ENTRY;
- MQTTAsync_lock_mutex(mqttasync_mutex);
- if (m == NULL || m->c->connect_state != NOT_IN_PROGRESS)
- rc = MQTTASYNC_FAILURE;
- else
- {
- m->connected_context = context;
- m->connected = connected;
- }
- MQTTAsync_unlock_mutex(mqttasync_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- int MQTTAsync_setUpdateConnectOptions(MQTTAsync handle, void* context, MQTTAsync_updateConnectOptions* updateOptions)
- {
- int rc = MQTTASYNC_SUCCESS;
- MQTTAsyncs* m = handle;
- FUNC_ENTRY;
- MQTTAsync_lock_mutex(mqttasync_mutex);
- if (m == NULL)
- rc = MQTTASYNC_FAILURE;
- else
- {
- m->updateConnectOptions_context = context;
- m->updateConnectOptions = updateOptions;
- }
- MQTTAsync_unlock_mutex(mqttasync_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- int MQTTAsync_setBeforePersistenceWrite(MQTTAsync handle, void* context, MQTTPersistence_beforeWrite* co)
- {
- int rc = MQTTASYNC_SUCCESS;
- MQTTAsyncs* m = handle;
- FUNC_ENTRY;
- MQTTAsync_lock_mutex(mqttasync_mutex);
- if (m == NULL)
- rc = MQTTASYNC_FAILURE;
- else
- {
- m->c->beforeWrite = co;
- m->c->beforeWrite_context = context;
- }
- MQTTAsync_unlock_mutex(mqttasync_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- int MQTTAsync_setAfterPersistenceRead(MQTTAsync handle, void* context, MQTTPersistence_afterRead* co)
- {
- int rc = MQTTASYNC_SUCCESS;
- MQTTAsyncs* m = handle;
- FUNC_ENTRY;
- MQTTAsync_lock_mutex(mqttasync_mutex);
- if (m == NULL)
- rc = MQTTASYNC_FAILURE;
- else
- {
- m->c->afterRead = co;
- m->c->afterRead_context = context;
- }
- MQTTAsync_unlock_mutex(mqttasync_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- void MQTTAsync_setTraceLevel(enum MQTTASYNC_TRACE_LEVELS level)
- {
- Log_setTraceLevel((enum LOG_LEVELS)level);
- }
- void MQTTAsync_setTraceCallback(MQTTAsync_traceCallback* callback)
- {
- Log_setTraceCallback((Log_traceCallback*)callback);
- }
- MQTTAsync_nameValue* MQTTAsync_getVersionInfo(void)
- {
- #define MAX_INFO_STRINGS 8
- static MQTTAsync_nameValue libinfo[MAX_INFO_STRINGS + 1];
- int i = 0;
- libinfo[i].name = "Product name";
- libinfo[i++].value = "Eclipse Paho Asynchronous MQTT C Client Library";
- libinfo[i].name = "Version";
- libinfo[i++].value = CLIENT_VERSION;
- libinfo[i].name = "Build level";
- libinfo[i++].value = BUILD_TIMESTAMP;
- #if defined(OPENSSL)
- libinfo[i].name = "OpenSSL version";
- libinfo[i++].value = SSLeay_version(SSLEAY_VERSION);
- libinfo[i].name = "OpenSSL flags";
- libinfo[i++].value = SSLeay_version(SSLEAY_CFLAGS);
- libinfo[i].name = "OpenSSL build timestamp";
- libinfo[i++].value = SSLeay_version(SSLEAY_BUILT_ON);
- libinfo[i].name = "OpenSSL platform";
- libinfo[i++].value = SSLeay_version(SSLEAY_PLATFORM);
- libinfo[i].name = "OpenSSL directory";
- libinfo[i++].value = SSLeay_version(SSLEAY_DIR);
- #endif
- libinfo[i].name = NULL;
- libinfo[i].value = NULL;
- return libinfo;
- }
- const char* MQTTAsync_strerror(int code)
- {
- static char buf[30];
- int chars = 0;
- switch (code) {
- case MQTTASYNC_SUCCESS:
- return "Success";
- case MQTTASYNC_FAILURE:
- return "Failure";
- case MQTTASYNC_PERSISTENCE_ERROR:
- return "Persistence error";
- case MQTTASYNC_DISCONNECTED:
- return "Disconnected";
- case MQTTASYNC_MAX_MESSAGES_INFLIGHT:
- return "Maximum in-flight messages amount reached";
- case MQTTASYNC_BAD_UTF8_STRING:
- return "Invalid UTF8 string";
- case MQTTASYNC_NULL_PARAMETER:
- return "Invalid (NULL) parameter";
- case MQTTASYNC_TOPICNAME_TRUNCATED:
- return "Topic containing NULL characters has been truncated";
- case MQTTASYNC_BAD_STRUCTURE:
- return "Bad structure";
- case MQTTASYNC_BAD_QOS:
- return "Invalid QoS value";
- case MQTTASYNC_NO_MORE_MSGIDS:
- return "Too many pending commands";
- case MQTTASYNC_OPERATION_INCOMPLETE:
- return "Operation discarded before completion";
- case MQTTASYNC_MAX_BUFFERED_MESSAGES:
- return "No more messages can be buffered";
- case MQTTASYNC_SSL_NOT_SUPPORTED:
- return "SSL is not supported";
- case MQTTASYNC_BAD_PROTOCOL:
- return "Invalid protocol scheme";
- case MQTTASYNC_BAD_MQTT_OPTION:
- return "Options for wrong MQTT version";
- case MQTTASYNC_WRONG_MQTT_VERSION:
- return "Client created for another version of MQTT";
- case MQTTASYNC_0_LEN_WILL_TOPIC:
- return "Zero length will topic on connect";
- case MQTTASYNC_COMMAND_IGNORED:
- return "Connect or disconnect command ignored";
- case MQTTASYNC_MAX_BUFFERED:
- return "maxBufferedMessages in the connect options must be >= 0";
- }
- chars = snprintf(buf, sizeof(buf), "Unknown error code %d", code);
- if (chars >= sizeof(buf))
- {
- buf[sizeof(buf)-1] = '\0';
- Log(LOG_ERROR, 0, "Error writing %d chars with snprintf", chars);
- }
- return buf;
- }
- void MQTTAsync_freeMessage(MQTTAsync_message** message)
- {
- FUNC_ENTRY;
- MQTTProperties_free(&(*message)->properties);
- free((*message)->payload);
- free(*message);
- *message = NULL;
- FUNC_EXIT;
- }
- void MQTTAsync_free(void* memory)
- {
- FUNC_ENTRY;
- free(memory);
- FUNC_EXIT;
- }
- void* MQTTAsync_malloc(size_t size)
- {
- void* val;
- int rc = 0;
- FUNC_ENTRY;
- val = malloc(size);
- rc = (val != NULL);
- FUNC_EXIT_RC(rc);
- return val;
- }
- static void MQTTAsync_freeServerURIs(MQTTAsyncs* m)
- {
- int i;
- for (i = 0; i < m->serverURIcount; ++i)
- free(m->serverURIs[i]);
- m->serverURIcount = 0;
- if (m->serverURIs)
- free(m->serverURIs);
- m->serverURIs = NULL;
- }
|