12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
7277827792780278127822783 |
- /*******************************************************************************
- * Copyright (c) 2009, 2019 IBM Corp.
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * and Eclipse Distribution License v1.0 which accompany this distribution.
- *
- * The Eclipse Public License is available at
- * http://www.eclipse.org/legal/epl-v10.html
- * and the Eclipse Distribution License is available at
- * http://www.eclipse.org/org/documents/edl-v10.php.
- *
- * Contributors:
- * Ian Craggs - initial API and implementation and/or initial documentation
- * Ian Craggs - bug 384016 - segv setting will message
- * Ian Craggs - bug 384053 - v1.0.0.7 - stop MQTTClient_receive on socket error
- * Ian Craggs, Allan Stockdill-Mander - add ability to connect with SSL
- * Ian Craggs - multiple server connection support
- * Ian Craggs - fix for bug 413429 - connectionLost not called
- * Ian Craggs - fix for bug 421103 - trying to write to same socket, in publish/retries
- * Ian Craggs - fix for bug 419233 - mutexes not reporting errors
- * Ian Craggs - fix for bug 420851
- * Ian Craggs - fix for bug 432903 - queue persistence
- * Ian Craggs - MQTT 3.1.1 support
- * Ian Craggs - fix for bug 438176 - MQTT version selection
- * Rong Xiang, Ian Craggs - C++ compatibility
- * Ian Craggs - fix for bug 443724 - stack corruption
- * Ian Craggs - fix for bug 447672 - simultaneous access to socket structure
- * Ian Craggs - fix for bug 459791 - deadlock in WaitForCompletion for bad client
- * Ian Craggs - fix for bug 474905 - insufficient synchronization for subscribe, unsubscribe, connect
- * Ian Craggs - make it clear that yield and receive are not intended for multi-threaded mode (bug 474748)
- * Ian Craggs - SNI support, message queue unpersist bug
- * Ian Craggs - binary will message support
- * Ian Craggs - waitforCompletion fix #240
- * Ian Craggs - check for NULL SSL options #334
- * Ian Craggs - allocate username/password buffers #431
- * Ian Craggs - MQTT 5.0 support
- *******************************************************************************/
- /**
- * @file
- * \brief Synchronous API implementation
- *
- */
- #define _GNU_SOURCE /* for pthread_mutexattr_settype */
- #include <stdlib.h>
- #include <string.h>
- #if !defined(WIN32) && !defined(WIN64)
- #include <sys/time.h>
- #endif
- #include "MQTTClient.h"
- #if !defined(NO_PERSISTENCE)
- #include "MQTTPersistence.h"
- #endif
- #include "utf-8.h"
- #include "MQTTProtocol.h"
- #include "MQTTProtocolOut.h"
- #include "Thread.h"
- #include "SocketBuffer.h"
- #include "StackTrace.h"
- #include "Heap.h"
- #if defined(OPENSSL)
- #include <openssl/ssl.h>
- #else
- #define URI_SSL "ssl://"
- #endif
- #include "OsWrapper.h"
- #define URI_TCP "tcp://"
- #define URI_WS "ws://"
- #define URI_WSS "wss://"
- #include "VersionInfo.h"
- #include "WebSocket.h"
- const char *client_timestamp_eye = "MQTTClientV3_Timestamp " BUILD_TIMESTAMP;
- const char *client_version_eye = "MQTTClientV3_Version " CLIENT_VERSION;
- void MQTTClient_global_init(MQTTClient_init_options* inits)
- {
- #if defined(OPENSSL)
- SSLSocket_handleOpensslInit(inits->do_openssl_init);
- #endif
- }
- static ClientStates ClientState =
- {
- CLIENT_VERSION, /* version */
- NULL /* client list */
- };
- ClientStates* bstate = &ClientState;
- MQTTProtocol state;
- #if defined(WIN32) || defined(WIN64)
- static mutex_type mqttclient_mutex = NULL;
- static mutex_type socket_mutex = NULL;
- static mutex_type subscribe_mutex = NULL;
- static mutex_type unsubscribe_mutex = NULL;
- static mutex_type connect_mutex = NULL;
- extern mutex_type stack_mutex;
- extern mutex_type heap_mutex;
- extern mutex_type log_mutex;
- BOOL APIENTRY DllMain(HANDLE hModule,
- DWORD ul_reason_for_call,
- LPVOID lpReserved)
- {
- switch (ul_reason_for_call)
- {
- case DLL_PROCESS_ATTACH:
- Log(TRACE_MAX, -1, "DLL process attach");
- if (mqttclient_mutex == NULL)
- {
- mqttclient_mutex = CreateMutex(NULL, 0, NULL);
- subscribe_mutex = CreateMutex(NULL, 0, NULL);
- unsubscribe_mutex = CreateMutex(NULL, 0, NULL);
- connect_mutex = CreateMutex(NULL, 0, NULL);
- stack_mutex = CreateMutex(NULL, 0, NULL);
- heap_mutex = CreateMutex(NULL, 0, NULL);
- log_mutex = CreateMutex(NULL, 0, NULL);
- socket_mutex = CreateMutex(NULL, 0, NULL);
- }
- case DLL_THREAD_ATTACH:
- Log(TRACE_MAX, -1, "DLL thread attach");
- case DLL_THREAD_DETACH:
- Log(TRACE_MAX, -1, "DLL thread detach");
- case DLL_PROCESS_DETACH:
- Log(TRACE_MAX, -1, "DLL process detach");
- }
- return TRUE;
- }
- #else
- static pthread_mutex_t mqttclient_mutex_store = PTHREAD_MUTEX_INITIALIZER;
- static mutex_type mqttclient_mutex = &mqttclient_mutex_store;
- static pthread_mutex_t socket_mutex_store = PTHREAD_MUTEX_INITIALIZER;
- static mutex_type socket_mutex = &socket_mutex_store;
- static pthread_mutex_t subscribe_mutex_store = PTHREAD_MUTEX_INITIALIZER;
- static mutex_type subscribe_mutex = &subscribe_mutex_store;
- static pthread_mutex_t unsubscribe_mutex_store = PTHREAD_MUTEX_INITIALIZER;
- static mutex_type unsubscribe_mutex = &unsubscribe_mutex_store;
- static pthread_mutex_t connect_mutex_store = PTHREAD_MUTEX_INITIALIZER;
- static mutex_type connect_mutex = &connect_mutex_store;
- void MQTTClient_init(void)
- {
- pthread_mutexattr_t attr;
- int rc;
- pthread_mutexattr_init(&attr);
- #if !defined(_WRS_KERNEL)
- pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
- #else
- /* #warning "no pthread_mutexattr_settype" */
- #endif /* !defined(_WRS_KERNEL) */
- if ((rc = pthread_mutex_init(mqttclient_mutex, &attr)) != 0)
- printf("MQTTClient: error %d initializing client_mutex\n", rc);
- if ((rc = pthread_mutex_init(socket_mutex, &attr)) != 0)
- printf("MQTTClient: error %d initializing socket_mutex\n", rc);
- if ((rc = pthread_mutex_init(subscribe_mutex, &attr)) != 0)
- printf("MQTTClient: error %d initializing subscribe_mutex\n", rc);
- if ((rc = pthread_mutex_init(unsubscribe_mutex, &attr)) != 0)
- printf("MQTTClient: error %d initializing unsubscribe_mutex\n", rc);
- if ((rc = pthread_mutex_init(connect_mutex, &attr)) != 0)
- printf("MQTTClient: error %d initializing connect_mutex\n", rc);
- }
- #define WINAPI
- #endif
- static volatile int library_initialized = 0;
- static List* handles = NULL;
- static int running = 0;
- static int tostop = 0;
- static thread_id_type run_id = 0;
- typedef struct
- {
- MQTTClient_message* msg;
- char* topicName;
- int topicLen;
- unsigned int seqno; /* only used on restore */
- } qEntry;
- typedef struct
- {
- char* serverURI;
- #if defined(OPENSSL)
- int ssl;
- #endif
- int websocket;
- Clients* c;
- MQTTClient_connectionLost* cl;
- MQTTClient_messageArrived* ma;
- MQTTClient_deliveryComplete* dc;
- void* context;
- MQTTClient_disconnected* disconnected;
- void* disconnected_context; /* the context to be associated with the disconnected callback*/
- MQTTClient_published* published;
- void* published_context; /* the context to be associated with the disconnected callback*/
- #if 0
- MQTTClient_authHandle* auth_handle;
- void* auth_handle_context; /* the context to be associated with the authHandle callback*/
- #endif
- sem_type connect_sem;
- int rc; /* getsockopt return code in connect */
- sem_type connack_sem;
- sem_type suback_sem;
- sem_type unsuback_sem;
- MQTTPacket* pack;
- } MQTTClients;
- struct props_rc_parms
- {
- MQTTClients* m;
- MQTTProperties* properties;
- enum MQTTReasonCodes reasonCode;
- };
- void MQTTClient_sleep(long milliseconds)
- {
- FUNC_ENTRY;
- #if defined(WIN32) || defined(WIN64)
- Sleep(milliseconds);
- #else
- usleep(milliseconds*1000);
- #endif
- FUNC_EXIT;
- }
/*
 * MQTTClient_start_clock: take a timestamp to be handed later to
 * MQTTClient_elapsed. START_TIME_TYPE is the platform-specific timestamp type:
 * a tick count on Windows, a timespec on AIX and a timeval elsewhere. The
 * non-Windows variants read CLOCK_MONOTONIC.
 *
 * The timestamp is built in automatic storage: the function returns it by
 * value, so a static buffer is not needed and would be shared between threads
 * calling this function at the same time.
 */
#if defined(WIN32) || defined(WIN64)
#define START_TIME_TYPE DWORD
START_TIME_TYPE MQTTClient_start_clock(void)
{
	return GetTickCount();
}
#elif defined(AIX)
#define START_TIME_TYPE struct timespec
START_TIME_TYPE MQTTClient_start_clock(void)
{
	struct timespec start;

	clock_gettime(CLOCK_MONOTONIC, &start);
	return start;
}
#else
#define START_TIME_TYPE struct timeval
START_TIME_TYPE MQTTClient_start_clock(void)
{
	struct timeval start;
	struct timespec start_ts;

	clock_gettime(CLOCK_MONOTONIC, &start_ts);
	/* convert the timespec (nanoseconds) to a timeval (microseconds) */
	start.tv_sec = start_ts.tv_sec;
	start.tv_usec = start_ts.tv_nsec / 1000;
	return start;
}
#endif
/*
 * MQTTClient_elapsed: return the number of milliseconds that have passed since
 * a timestamp obtained from MQTTClient_start_clock. The parameter type is the
 * platform's timestamp type.
 *
 * The current time is read into automatic storage, so concurrent callers do
 * not share a buffer.
 */
#if defined(WIN32) || defined(WIN64)
long MQTTClient_elapsed(DWORD milliseconds)
{
	return GetTickCount() - milliseconds;
}
#elif defined(AIX)
#define assert(a)
long MQTTClient_elapsed(struct timespec start)
{
	struct timespec now, res;

	clock_gettime(CLOCK_MONOTONIC, &now);
	ntimersub(now, start, res);
	return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
}
#else
long MQTTClient_elapsed(struct timeval start)
{
	struct timeval now, res;
	struct timespec now_ts;

	clock_gettime(CLOCK_MONOTONIC, &now_ts);
	/* convert the timespec (nanoseconds) to a timeval (microseconds) */
	now.tv_sec = now_ts.tv_sec;
	now.tv_usec = now_ts.tv_nsec / 1000;
	timersub(&now, &start, &res);
	return (res.tv_sec)*1000 + (res.tv_usec)/1000;
}
#endif
- static void MQTTClient_terminate(void);
- static void MQTTClient_emptyMessageQueue(Clients* client);
- static int MQTTClient_deliverMessage(
- int rc, MQTTClients* m,
- char** topicName, int* topicLen,
- MQTTClient_message** message);
- static int clientSockCompare(void* a, void* b);
- static thread_return_type WINAPI connectionLost_call(void* context);
- static thread_return_type WINAPI MQTTClient_run(void* n);
- static void MQTTClient_stop(void);
- static void MQTTClient_closeSession(Clients* client, enum MQTTReasonCodes reason, MQTTProperties* props);
- static int MQTTClient_cleanSession(Clients* client);
- static MQTTResponse MQTTClient_connectURIVersion(
- MQTTClient handle, MQTTClient_connectOptions* options,
- const char* serverURI, int MQTTVersion,
- START_TIME_TYPE start, long millisecsTimeout,
- MQTTProperties* connectProperties, MQTTProperties* willProperties);
- static MQTTResponse MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI,
- MQTTProperties* connectProperties, MQTTProperties* willProperties);
- static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int internal, int stop, enum MQTTReasonCodes, MQTTProperties*);
- static int MQTTClient_disconnect_internal(MQTTClient handle, int timeout);
- static void MQTTClient_retry(void);
- static MQTTPacket* MQTTClient_cycle(int* sock, unsigned long timeout, int* rc);
- static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, long timeout);
- /*static int pubCompare(void* a, void* b); */
- static void MQTTProtocol_checkPendingWrites(void);
- static void MQTTClient_writeComplete(int socket, int rc);
- int MQTTClient_createWithOptions(MQTTClient* handle, const char* serverURI, const char* clientId,
- int persistence_type, void* persistence_context, MQTTClient_createOptions* options)
- {
- int rc = 0;
- MQTTClients *m = NULL;
- FUNC_ENTRY;
- rc = Thread_lock_mutex(mqttclient_mutex);
- if (serverURI == NULL || clientId == NULL)
- {
- rc = MQTTCLIENT_NULL_PARAMETER;
- goto exit;
- }
- if (!UTF8_validateString(clientId))
- {
- rc = MQTTCLIENT_BAD_UTF8_STRING;
- goto exit;
- }
- if (strstr(serverURI, "://") != NULL)
- {
- if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) != 0
- && strncmp(URI_WS, serverURI, strlen(URI_WS)) != 0
- #if defined(OPENSSL)
- && strncmp(URI_SSL, serverURI, strlen(URI_SSL)) != 0
- && strncmp(URI_WSS, serverURI, strlen(URI_WSS)) != 0
- #endif
- )
- {
- rc = MQTTCLIENT_BAD_PROTOCOL;
- goto exit;
- }
- }
- if (options && (strncmp(options->struct_id, "MQCO", 4) != 0 || options->struct_version != 0))
- {
- rc = MQTTCLIENT_BAD_STRUCTURE;
- goto exit;
- }
- if (!library_initialized)
- {
- #if defined(HEAP_H)
- Heap_initialize();
- #endif
- Log_initialize((Log_nameValue*)MQTTClient_getVersionInfo());
- bstate->clients = ListInitialize();
- Socket_outInitialize();
- Socket_setWriteCompleteCallback(MQTTClient_writeComplete);
- handles = ListInitialize();
- #if defined(OPENSSL)
- SSLSocket_initialize();
- #endif
- library_initialized = 1;
- }
- m = malloc(sizeof(MQTTClients));
- *handle = m;
- memset(m, '\0', sizeof(MQTTClients));
- if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
- serverURI += strlen(URI_TCP);
- else if (strncmp(URI_WS, serverURI, strlen(URI_WS)) == 0)
- {
- serverURI += strlen(URI_WS);
- m->websocket = 1;
- }
- else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0)
- {
- #if defined(OPENSSL)
- serverURI += strlen(URI_SSL);
- m->ssl = 1;
- #else
- rc = MQTTCLIENT_SSL_NOT_SUPPORTED;
- goto exit;
- #endif
- }
- else if (strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
- {
- #if defined(OPENSSL)
- serverURI += strlen(URI_WSS);
- m->ssl = 1;
- m->websocket = 1;
- #else
- rc = MQTTCLIENT_SSL_NOT_SUPPORTED;
- goto exit;
- #endif
- }
- m->serverURI = MQTTStrdup(serverURI);
- ListAppend(handles, m, sizeof(MQTTClients));
- m->c = malloc(sizeof(Clients));
- memset(m->c, '\0', sizeof(Clients));
- m->c->context = m;
- m->c->MQTTVersion = (options) ? options->MQTTVersion : MQTTVERSION_DEFAULT;
- m->c->outboundMsgs = ListInitialize();
- m->c->inboundMsgs = ListInitialize();
- m->c->messageQueue = ListInitialize();
- m->c->clientID = MQTTStrdup(clientId);
- m->connect_sem = Thread_create_sem();
- m->connack_sem = Thread_create_sem();
- m->suback_sem = Thread_create_sem();
- m->unsuback_sem = Thread_create_sem();
- #if !defined(NO_PERSISTENCE)
- rc = MQTTPersistence_create(&(m->c->persistence), persistence_type, persistence_context);
- if (rc == 0)
- {
- rc = MQTTPersistence_initialize(m->c, m->serverURI);
- if (rc == 0)
- MQTTPersistence_restoreMessageQueue(m->c);
- }
- #endif
- ListAppend(bstate->clients, m->c, sizeof(Clients) + 3*sizeof(List));
- exit:
- Thread_unlock_mutex(mqttclient_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- int MQTTClient_create(MQTTClient* handle, const char* serverURI, const char* clientId,
- int persistence_type, void* persistence_context)
- {
- return MQTTClient_createWithOptions(handle, serverURI, clientId, persistence_type,
- persistence_context, NULL);
- }
- static void MQTTClient_terminate(void)
- {
- FUNC_ENTRY;
- MQTTClient_stop();
- if (library_initialized)
- {
- ListFree(bstate->clients);
- ListFree(handles);
- handles = NULL;
- WebSocket_terminate();
- #if defined(HEAP_H)
- Heap_terminate();
- #endif
- Log_terminate();
- library_initialized = 0;
- }
- FUNC_EXIT;
- }
- static void MQTTClient_emptyMessageQueue(Clients* client)
- {
- FUNC_ENTRY;
- /* empty message queue */
- if (client->messageQueue->count > 0)
- {
- ListElement* current = NULL;
- while (ListNextElement(client->messageQueue, ¤t))
- {
- qEntry* qe = (qEntry*)(current->content);
- free(qe->topicName);
- MQTTProperties_free(&qe->msg->properties);
- free(qe->msg->payload);
- free(qe->msg);
- }
- ListEmpty(client->messageQueue);
- }
- FUNC_EXIT;
- }
- void MQTTClient_destroy(MQTTClient* handle)
- {
- MQTTClients* m = *handle;
- FUNC_ENTRY;
- Thread_lock_mutex(mqttclient_mutex);
- if (m == NULL)
- goto exit;
- if (m->c)
- {
- int saved_socket = m->c->net.socket;
- char* saved_clientid = MQTTStrdup(m->c->clientID);
- #if !defined(NO_PERSISTENCE)
- MQTTPersistence_close(m->c);
- #endif
- MQTTClient_emptyMessageQueue(m->c);
- MQTTProtocol_freeClient(m->c);
- if (!ListRemove(bstate->clients, m->c))
- Log(LOG_ERROR, 0, NULL);
- else
- Log(TRACE_MIN, 1, NULL, saved_clientid, saved_socket);
- free(saved_clientid);
- }
- if (m->serverURI)
- free(m->serverURI);
- Thread_destroy_sem(m->connect_sem);
- Thread_destroy_sem(m->connack_sem);
- Thread_destroy_sem(m->suback_sem);
- Thread_destroy_sem(m->unsuback_sem);
- if (!ListRemove(handles, m))
- Log(LOG_ERROR, -1, "free error");
- *handle = NULL;
- if (bstate->clients->count == 0)
- MQTTClient_terminate();
- exit:
- Thread_unlock_mutex(mqttclient_mutex);
- FUNC_EXIT;
- }
- void MQTTClient_freeMessage(MQTTClient_message** message)
- {
- FUNC_ENTRY;
- MQTTProperties_free(&(*message)->properties);
- free((*message)->payload);
- free(*message);
- *message = NULL;
- FUNC_EXIT;
- }
- void MQTTClient_free(void* memory)
- {
- FUNC_ENTRY;
- free(memory);
- FUNC_EXIT;
- }
- DLLExport void MQTTResponse_free(MQTTResponse response)
- {
- FUNC_ENTRY;
- if (response.properties)
- {
- if (response.reasonCodeCount > 0 && response.reasonCodes)
- free(response.reasonCodes);
- MQTTProperties_free(response.properties);
- free(response.properties);
- }
- FUNC_EXIT;
- }
- static int MQTTClient_deliverMessage(int rc, MQTTClients* m, char** topicName, int* topicLen, MQTTClient_message** message)
- {
- qEntry* qe = (qEntry*)(m->c->messageQueue->first->content);
- FUNC_ENTRY;
- *message = qe->msg;
- *topicName = qe->topicName;
- *topicLen = qe->topicLen;
- if (strlen(*topicName) != *topicLen)
- rc = MQTTCLIENT_TOPICNAME_TRUNCATED;
- #if !defined(NO_PERSISTENCE)
- if (m->c->persistence)
- MQTTPersistence_unpersistQueueEntry(m->c, (MQTTPersistence_qEntry*)qe);
- #endif
- ListRemove(m->c->messageQueue, m->c->messageQueue->first->content);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- /**
- * List callback function for comparing clients by socket
- * @param a first integer value
- * @param b second integer value
- * @return boolean indicating whether a and b are equal
- */
- static int clientSockCompare(void* a, void* b)
- {
- MQTTClients* m = (MQTTClients*)a;
- return m->c->net.socket == *(int*)b;
- }
- /**
- * Wrapper function to call connection lost on a separate thread. A separate thread is needed to allow the
- * connectionLost function to make API calls (e.g. connect)
- * @param context a pointer to the relevant client
- * @return thread_return_type standard thread return value - not used here
- */
- static thread_return_type WINAPI connectionLost_call(void* context)
- {
- MQTTClients* m = (MQTTClients*)context;
- (*(m->cl))(m->context, NULL);
- return 0;
- }
- int MQTTClient_setDisconnected(MQTTClient handle, void* context, MQTTClient_disconnected* disconnected)
- {
- int rc = MQTTCLIENT_SUCCESS;
- MQTTClients* m = handle;
- FUNC_ENTRY;
- Thread_lock_mutex(mqttclient_mutex);
- if (m == NULL || m->c->connect_state != NOT_IN_PROGRESS)
- rc = MQTTCLIENT_FAILURE;
- else
- {
- m->disconnected_context = context;
- m->disconnected = disconnected;
- }
- Thread_unlock_mutex(mqttclient_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- /**
- * Wrapper function to call disconnected on a separate thread. A separate thread is needed to allow the
- * disconnected function to make API calls (e.g. connect)
- * @param context a pointer to the relevant client
- * @return thread_return_type standard thread return value - not used here
- */
- static thread_return_type WINAPI call_disconnected(void* context)
- {
- struct props_rc_parms* pr = (struct props_rc_parms*)context;
- (*(pr->m->disconnected))(pr->m->disconnected_context, pr->properties, pr->reasonCode);
- MQTTProperties_free(pr->properties);
- free(pr->properties);
- free(pr);
- return 0;
- }
- int MQTTClient_setPublished(MQTTClient handle, void* context, MQTTClient_published* published)
- {
- int rc = MQTTCLIENT_SUCCESS;
- MQTTClients* m = handle;
- FUNC_ENTRY;
- Thread_lock_mutex(mqttclient_mutex);
- if (m == NULL || m->c->connect_state != NOT_IN_PROGRESS)
- rc = MQTTCLIENT_FAILURE;
- else
- {
- m->published_context = context;
- m->published = published;
- }
- Thread_unlock_mutex(mqttclient_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- #if 0
- int MQTTClient_setHandleAuth(MQTTClient handle, void* context, MQTTClient_handleAuth* auth_handle)
- {
- int rc = MQTTCLIENT_SUCCESS;
- MQTTClients* m = handle;
- FUNC_ENTRY;
- Thread_lock_mutex(mqttclient_mutex);
- if (m == NULL || m->c->connect_state != NOT_IN_PROGRESS)
- rc = MQTTCLIENT_FAILURE;
- else
- {
- m->auth_handle_context = context;
- m->auth_handle = auth_handle;
- }
- Thread_unlock_mutex(mqttclient_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- /**
- * Wrapper function to call authHandle on a separate thread. A separate thread is needed to allow the
- * disconnected function to make API calls (e.g. MQTTClient_auth)
- * @param context a pointer to the relevant client
- * @return thread_return_type standard thread return value - not used here
- */
- static thread_return_type WINAPI call_auth_handle(void* context)
- {
- struct props_rc_parms* pr = (struct props_rc_parms*)context;
- (*(pr->m->auth_handle))(pr->m->auth_handle_context, pr->properties, pr->reasonCode);
- MQTTProperties_free(pr->properties);
- free(pr->properties);
- free(pr);
- return 0;
- }
- #endif
- /* This is the thread function that handles the calling of callback functions if set */
- static thread_return_type WINAPI MQTTClient_run(void* n)
- {
- long timeout = 10L; /* first time in we have a small timeout. Gets things started more quickly */
- FUNC_ENTRY;
- running = 1;
- run_id = Thread_getid();
- Thread_lock_mutex(mqttclient_mutex);
- while (!tostop)
- {
- int rc = SOCKET_ERROR;
- int sock = -1;
- MQTTClients* m = NULL;
- MQTTPacket* pack = NULL;
- Thread_unlock_mutex(mqttclient_mutex);
- pack = MQTTClient_cycle(&sock, timeout, &rc);
- Thread_lock_mutex(mqttclient_mutex);
- if (tostop)
- break;
- timeout = 1000L;
- /* find client corresponding to socket */
- if (ListFindItem(handles, &sock, clientSockCompare) == NULL)
- {
- /* assert: should not happen */
- continue;
- }
- m = (MQTTClient)(handles->current->content);
- if (m == NULL)
- {
- /* assert: should not happen */
- continue;
- }
- if (rc == SOCKET_ERROR)
- {
- if (m->c->connected)
- MQTTClient_disconnect_internal(m, 0);
- else
- {
- if (m->c->connect_state == SSL_IN_PROGRESS)
- {
- Log(TRACE_MIN, -1, "Posting connect semaphore for client %s", m->c->clientID);
- m->c->connect_state = NOT_IN_PROGRESS;
- Thread_post_sem(m->connect_sem);
- }
- if (m->c->connect_state == WAIT_FOR_CONNACK)
- {
- Log(TRACE_MIN, -1, "Posting connack semaphore for client %s", m->c->clientID);
- m->c->connect_state = NOT_IN_PROGRESS;
- Thread_post_sem(m->connack_sem);
- }
- }
- }
- else
- {
- if (m->c->messageQueue->count > 0)
- {
- qEntry* qe = (qEntry*)(m->c->messageQueue->first->content);
- int topicLen = qe->topicLen;
- if (strlen(qe->topicName) == topicLen)
- topicLen = 0;
- Log(TRACE_MIN, -1, "Calling messageArrived for client %s, queue depth %d",
- m->c->clientID, m->c->messageQueue->count);
- Thread_unlock_mutex(mqttclient_mutex);
- rc = (*(m->ma))(m->context, qe->topicName, topicLen, qe->msg);
- Thread_lock_mutex(mqttclient_mutex);
- /* if 0 (false) is returned by the callback then it failed, so we don't remove the message from
- * the queue, and it will be retried later. If 1 is returned then the message data may have been freed,
- * so we must be careful how we use it.
- */
- if (rc)
- {
- #if !defined(NO_PERSISTENCE)
- if (m->c->persistence)
- MQTTPersistence_unpersistQueueEntry(m->c, (MQTTPersistence_qEntry*)qe);
- #endif
- ListRemove(m->c->messageQueue, qe);
- }
- else
- Log(TRACE_MIN, -1, "False returned from messageArrived for client %s, message remains on queue",
- m->c->clientID);
- }
- if (pack)
- {
- if (pack->header.bits.type == CONNACK)
- {
- Log(TRACE_MIN, -1, "Posting connack semaphore for client %s", m->c->clientID);
- m->pack = pack;
- Thread_post_sem(m->connack_sem);
- }
- else if (pack->header.bits.type == SUBACK)
- {
- Log(TRACE_MIN, -1, "Posting suback semaphore for client %s", m->c->clientID);
- m->pack = pack;
- Thread_post_sem(m->suback_sem);
- }
- else if (pack->header.bits.type == UNSUBACK)
- {
- Log(TRACE_MIN, -1, "Posting unsuback semaphore for client %s", m->c->clientID);
- m->pack = pack;
- Thread_post_sem(m->unsuback_sem);
- }
- else if (m->c->MQTTVersion >= MQTTVERSION_5)
- {
- if (pack->header.bits.type == DISCONNECT && m->disconnected)
- {
- struct props_rc_parms* dp;
- Ack* disc = (Ack*)pack;
- dp = malloc(sizeof(struct props_rc_parms));
- dp->m = m;
- dp->reasonCode = disc->rc;
- dp->properties = malloc(sizeof(MQTTProperties));
- *(dp->properties) = disc->properties;
- free(disc);
- MQTTClient_disconnect1(m, 10, 0, 1, MQTTREASONCODE_SUCCESS, NULL);
- Log(TRACE_MIN, -1, "Calling disconnected for client %s", m->c->clientID);
- Thread_start(call_disconnected, dp);
- }
- #if 0
- if (pack->header.bits.type == AUTH && m->auth_handle)
- {
- struct props_rc_parms dp;
- Ack* disc = (Ack*)pack;
- dp.m = m;
- dp.properties = &disc->properties;
- dp.reasonCode = disc->rc;
- free(pack);
- Log(TRACE_MIN, -1, "Calling auth_handle for client %s", m->c->clientID);
- Thread_start(call_auth_handle, &dp);
- }
- #endif
- }
- }
- else if (m->c->connect_state == TCP_IN_PROGRESS)
- {
- int error;
- socklen_t len = sizeof(error);
- if ((m->rc = getsockopt(m->c->net.socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len)) == 0)
- m->rc = error;
- Log(TRACE_MIN, -1, "Posting connect semaphore for client %s rc %d", m->c->clientID, m->rc);
- m->c->connect_state = NOT_IN_PROGRESS;
- Thread_post_sem(m->connect_sem);
- }
- #if defined(OPENSSL)
- else if (m->c->connect_state == SSL_IN_PROGRESS)
- {
- rc = m->c->sslopts->struct_version >= 3 ?
- SSLSocket_connect(m->c->net.ssl, m->c->net.socket, m->serverURI,
- m->c->sslopts->verify, m->c->sslopts->ssl_error_cb, m->c->sslopts->ssl_error_context) :
- SSLSocket_connect(m->c->net.ssl, m->c->net.socket, m->serverURI,
- m->c->sslopts->verify, NULL, NULL);
- if (rc == 1 || rc == SSL_FATAL)
- {
- if (rc == 1 && (m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
- m->c->session = SSL_get1_session(m->c->net.ssl);
- m->rc = rc;
- Log(TRACE_MIN, -1, "Posting connect semaphore for SSL client %s rc %d", m->c->clientID, m->rc);
- m->c->connect_state = NOT_IN_PROGRESS;
- Thread_post_sem(m->connect_sem);
- }
- }
- #endif
- else if (m->c->connect_state == WEBSOCKET_IN_PROGRESS)
- {
- Log(TRACE_MIN, -1, "Posting websocket handshake for client %s rc %d", m->c->clientID, m->rc);
- m->c->connect_state = WAIT_FOR_CONNACK;
- Thread_post_sem(m->connect_sem);
- }
- }
- }
- run_id = 0;
- running = tostop = 0;
- Thread_unlock_mutex(mqttclient_mutex);
- FUNC_EXIT;
- return 0;
- }
- static void MQTTClient_stop(void)
- {
- int rc = 0;
- FUNC_ENTRY;
- if (running == 1 && tostop == 0)
- {
- int conn_count = 0;
- ListElement* current = NULL;
- if (handles != NULL)
- {
- /* find out how many handles are still connected */
- while (ListNextElement(handles, ¤t))
- {
- if (((MQTTClients*)(current->content))->c->connect_state > NOT_IN_PROGRESS ||
- ((MQTTClients*)(current->content))->c->connected)
- ++conn_count;
- }
- }
- Log(TRACE_MIN, -1, "Conn_count is %d", conn_count);
- /* stop the background thread, if we are the last one to be using it */
- if (conn_count == 0)
- {
- int count = 0;
- tostop = 1;
- if (Thread_getid() != run_id)
- {
- while (running && ++count < 100)
- {
- Thread_unlock_mutex(mqttclient_mutex);
- Log(TRACE_MIN, -1, "sleeping");
- MQTTClient_sleep(100L);
- Thread_lock_mutex(mqttclient_mutex);
- }
- }
- rc = 1;
- }
- }
- FUNC_EXIT_RC(rc);
- }
- int MQTTClient_setCallbacks(MQTTClient handle, void* context, MQTTClient_connectionLost* cl,
- MQTTClient_messageArrived* ma, MQTTClient_deliveryComplete* dc)
- {
- int rc = MQTTCLIENT_SUCCESS;
- MQTTClients* m = handle;
- FUNC_ENTRY;
- Thread_lock_mutex(mqttclient_mutex);
- if (m == NULL || ma == NULL || m->c->connect_state != NOT_IN_PROGRESS)
- rc = MQTTCLIENT_FAILURE;
- else
- {
- m->context = context;
- m->cl = cl;
- m->ma = ma;
- m->dc = dc;
- }
- Thread_unlock_mutex(mqttclient_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- static void MQTTClient_closeSession(Clients* client, enum MQTTReasonCodes reason, MQTTProperties* props)
- {
- FUNC_ENTRY;
- client->good = 0;
- client->ping_outstanding = 0;
- if (client->net.socket > 0)
- {
- if (client->connected)
- MQTTPacket_send_disconnect(client, reason, props);
- Thread_lock_mutex(socket_mutex);
- WebSocket_close(&client->net, WebSocket_CLOSE_NORMAL, NULL);
- #if defined(OPENSSL)
- SSLSocket_close(&client->net);
- #endif
- Socket_close(client->net.socket);
- Thread_unlock_mutex(socket_mutex);
- client->net.socket = 0;
- #if defined(OPENSSL)
- client->net.ssl = NULL;
- #endif
- }
- client->connected = 0;
- client->connect_state = NOT_IN_PROGRESS;
- if (client->MQTTVersion < MQTTVERSION_5 && client->cleansession)
- MQTTClient_cleanSession(client);
- FUNC_EXIT;
- }
- static int MQTTClient_cleanSession(Clients* client)
- {
- int rc = 0;
- FUNC_ENTRY;
- #if !defined(NO_PERSISTENCE)
- rc = MQTTPersistence_clear(client);
- #endif
- MQTTProtocol_emptyMessageList(client->inboundMsgs);
- MQTTProtocol_emptyMessageList(client->outboundMsgs);
- MQTTClient_emptyMessageQueue(client);
- client->msgID = 0;
- FUNC_EXIT_RC(rc);
- return rc;
- }
- void Protocol_processPublication(Publish* publish, Clients* client)
- {
- qEntry* qe = NULL;
- MQTTClient_message* mm = NULL;
- MQTTClient_message initialized = MQTTClient_message_initializer;
- FUNC_ENTRY;
- qe = malloc(sizeof(qEntry));
- mm = malloc(sizeof(MQTTClient_message));
- memcpy(mm, &initialized, sizeof(MQTTClient_message));
- qe->msg = mm;
- qe->topicName = publish->topic;
- qe->topicLen = publish->topiclen;
- publish->topic = NULL;
- /* If the message is QoS 2, then we have already stored the incoming payload
- * in an allocated buffer, so we don't need to copy again.
- */
- if (publish->header.bits.qos == 2)
- mm->payload = publish->payload;
- else
- {
- mm->payload = malloc(publish->payloadlen);
- memcpy(mm->payload, publish->payload, publish->payloadlen);
- }
- mm->payloadlen = publish->payloadlen;
- mm->qos = publish->header.bits.qos;
- mm->retained = publish->header.bits.retain;
- if (publish->header.bits.qos == 2)
- mm->dup = 0; /* ensure that a QoS2 message is not passed to the application with dup = 1 */
- else
- mm->dup = publish->header.bits.dup;
- mm->msgid = publish->msgId;
- if (publish->MQTTVersion >= 5)
- mm->properties = MQTTProperties_copy(&publish->properties);
- ListAppend(client->messageQueue, qe, sizeof(qe) + sizeof(mm) + mm->payloadlen + strlen(qe->topicName)+1);
- #if !defined(NO_PERSISTENCE)
- if (client->persistence)
- MQTTPersistence_persistQueueEntry(client, (MQTTPersistence_qEntry*)qe);
- #endif
- FUNC_EXIT;
- }
- static MQTTResponse MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI, int MQTTVersion,
- START_TIME_TYPE start, long millisecsTimeout, MQTTProperties* connectProperties, MQTTProperties* willProperties)
- {
- MQTTClients* m = handle;
- int rc = SOCKET_ERROR;
- int sessionPresent = 0;
- MQTTResponse resp = MQTTResponse_initializer;
- FUNC_ENTRY;
- resp.reasonCode = SOCKET_ERROR;
- if (m->ma && !running)
- {
- Thread_start(MQTTClient_run, handle);
- if (MQTTClient_elapsed(start) >= millisecsTimeout)
- {
- rc = SOCKET_ERROR;
- goto exit;
- }
- MQTTClient_sleep(100L);
- }
- Log(TRACE_MIN, -1, "Connecting to serverURI %s with MQTT version %d", serverURI, MQTTVersion);
- #if defined(OPENSSL)
- rc = MQTTProtocol_connect(serverURI, m->c, m->ssl, m->websocket, MQTTVersion, connectProperties, willProperties);
- #else
- rc = MQTTProtocol_connect(serverURI, m->c, m->websocket, MQTTVersion, connectProperties, willProperties);
- #endif
- if (rc == SOCKET_ERROR)
- goto exit;
- if (m->c->connect_state == NOT_IN_PROGRESS)
- {
- rc = SOCKET_ERROR;
- goto exit;
- }
- if (m->c->connect_state == TCP_IN_PROGRESS) /* TCP connect started - wait for completion */
- {
- Thread_unlock_mutex(mqttclient_mutex);
- MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTClient_elapsed(start));
- Thread_lock_mutex(mqttclient_mutex);
- if (rc != 0)
- {
- rc = SOCKET_ERROR;
- goto exit;
- }
- #if defined(OPENSSL)
- if (m->ssl)
- {
- int port;
- size_t hostname_len;
- const char *topic;
- int setSocketForSSLrc = 0;
- hostname_len = MQTTProtocol_addressPort(m->serverURI, &port, &topic);
- setSocketForSSLrc = SSLSocket_setSocketForSSL(&m->c->net, m->c->sslopts,
- m->serverURI, hostname_len);
- if (setSocketForSSLrc != MQTTCLIENT_SUCCESS)
- {
- if (m->c->session != NULL)
- if ((rc = SSL_set_session(m->c->net.ssl, m->c->session)) != 1)
- Log(TRACE_MIN, -1, "Failed to set SSL session with stored data, non critical");
- rc = m->c->sslopts->struct_version >= 3 ?
- SSLSocket_connect(m->c->net.ssl, m->c->net.socket, m->serverURI,
- m->c->sslopts->verify, m->c->sslopts->ssl_error_cb, m->c->sslopts->ssl_error_context) :
- SSLSocket_connect(m->c->net.ssl, m->c->net.socket, m->serverURI,
- m->c->sslopts->verify, NULL, NULL);
- if (rc == TCPSOCKET_INTERRUPTED)
- m->c->connect_state = SSL_IN_PROGRESS; /* the connect is still in progress */
- else if (rc == SSL_FATAL)
- {
- rc = SOCKET_ERROR;
- goto exit;
- }
- else if (rc == 1)
- {
- if (m->websocket)
- {
- m->c->connect_state = WEBSOCKET_IN_PROGRESS;
- rc = WebSocket_connect(&m->c->net,m->serverURI);
- if ( rc == SOCKET_ERROR )
- goto exit;
- }
- else
- {
- rc = MQTTCLIENT_SUCCESS;
- m->c->connect_state = WAIT_FOR_CONNACK;
- if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
- {
- rc = SOCKET_ERROR;
- goto exit;
- }
- if ((m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
- m->c->session = SSL_get1_session(m->c->net.ssl);
- }
- }
- }
- else
- {
- rc = SOCKET_ERROR;
- goto exit;
- }
- }
- #endif
- else if (m->websocket)
- {
- m->c->connect_state = WEBSOCKET_IN_PROGRESS;
- if ( WebSocket_connect(&m->c->net, m->serverURI) == SOCKET_ERROR )
- {
- rc = SOCKET_ERROR;
- goto exit;
- }
- }
- else
- {
- m->c->connect_state = WAIT_FOR_CONNACK; /* TCP connect completed, in which case send the MQTT connect packet */
- if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
- {
- rc = SOCKET_ERROR;
- goto exit;
- }
- }
- }
- #if defined(OPENSSL)
- if (m->c->connect_state == SSL_IN_PROGRESS) /* SSL connect sent - wait for completion */
- {
- Thread_unlock_mutex(mqttclient_mutex);
- MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTClient_elapsed(start));
- Thread_lock_mutex(mqttclient_mutex);
- if (rc != 1)
- {
- rc = SOCKET_ERROR;
- goto exit;
- }
- if((m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
- m->c->session = SSL_get1_session(m->c->net.ssl);
- if ( m->websocket )
- {
- /* wait for websocket connect */
- m->c->connect_state = WEBSOCKET_IN_PROGRESS;
- rc = WebSocket_connect( &m->c->net, m->serverURI );
- if ( rc != 1 )
- {
- rc = SOCKET_ERROR;
- goto exit;
- }
- }
- else
- {
- m->c->connect_state = WAIT_FOR_CONNACK; /* TCP connect completed, in which case send the MQTT connect packet */
- if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
- {
- rc = SOCKET_ERROR;
- goto exit;
- }
- }
- }
- #endif
- if (m->c->connect_state == WEBSOCKET_IN_PROGRESS) /* websocket request sent - wait for upgrade */
- {
- Thread_unlock_mutex(mqttclient_mutex);
- MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTClient_elapsed(start));
- Thread_lock_mutex(mqttclient_mutex);
- m->c->connect_state = WAIT_FOR_CONNACK; /* websocket upgrade complete */
- if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
- {
- rc = SOCKET_ERROR;
- goto exit;
- }
- }
- if (m->c->connect_state == WAIT_FOR_CONNACK) /* MQTT connect sent - wait for CONNACK */
- {
- MQTTPacket* pack = NULL;
- Thread_unlock_mutex(mqttclient_mutex);
- pack = MQTTClient_waitfor(handle, CONNACK, &rc, millisecsTimeout - MQTTClient_elapsed(start));
- Thread_lock_mutex(mqttclient_mutex);
- if (pack == NULL)
- rc = SOCKET_ERROR;
- else
- {
- Connack* connack = (Connack*)pack;
- Log(TRACE_PROTOCOL, 1, NULL, m->c->net.socket, m->c->clientID, connack->rc);
- if ((rc = connack->rc) == MQTTCLIENT_SUCCESS)
- {
- m->c->connected = 1;
- m->c->good = 1;
- m->c->connect_state = NOT_IN_PROGRESS;
- if (MQTTVersion == 4)
- sessionPresent = connack->flags.bits.sessionPresent;
- if (m->c->cleansession || m->c->cleanstart)
- rc = MQTTClient_cleanSession(m->c);
- if (m->c->outboundMsgs->count > 0)
- {
- ListElement* outcurrent = NULL;
- while (ListNextElement(m->c->outboundMsgs, &outcurrent))
- {
- Messages* m = (Messages*)(outcurrent->content);
- m->lastTouch = 0;
- }
- MQTTProtocol_retry((time_t)0, 1, 1);
- if (m->c->connected != 1)
- rc = MQTTCLIENT_DISCONNECTED;
- }
- if (m->c->MQTTVersion == MQTTVERSION_5)
- {
- resp.properties = malloc(sizeof(MQTTProperties));
- *resp.properties = MQTTProperties_copy(&connack->properties);
- }
- }
- MQTTPacket_freeConnack(connack);
- m->pack = NULL;
- }
- }
- exit:
- if (rc == MQTTCLIENT_SUCCESS)
- {
- if (options->struct_version >= 4) /* means we have to fill out return values */
- {
- options->returned.serverURI = serverURI;
- options->returned.MQTTVersion = MQTTVersion;
- options->returned.sessionPresent = sessionPresent;
- }
- }
- else
- MQTTClient_disconnect1(handle, 0, 0, (MQTTVersion == 3), MQTTREASONCODE_SUCCESS, NULL); /* don't want to call connection lost */
- resp.reasonCode = rc;
- FUNC_EXIT_RC(resp.reasonCode);
- return resp;
- }
/* Current retry loop interval; starts at 5 and is only ever lowered by setRetryLoopInterval() */
static int retryLoopInterval = 5;

/**
 * Lower the retry loop interval, if needed, to suit a keepalive value.
 * The candidate is a tenth of the keepalive, held within the range 1 to 5;
 * it replaces the current interval only when it is smaller.
 * @param keepalive the keepalive interval of a connecting client
 */
static void setRetryLoopInterval(int keepalive)
{
    int candidate = keepalive / 10;

    /* hold the candidate within 1..5 */
    candidate = (candidate < 1) ? 1 : ((candidate > 5) ? 5 : candidate);

    if (retryLoopInterval > candidate)
        retryLoopInterval = candidate;
}
- static MQTTResponse MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI,
- MQTTProperties* connectProperties, MQTTProperties* willProperties)
- {
- MQTTClients* m = handle;
- START_TIME_TYPE start;
- long millisecsTimeout = 30000L;
- MQTTResponse rc = MQTTResponse_initializer;
- int MQTTVersion = 0;
- FUNC_ENTRY;
- rc.reasonCode = SOCKET_ERROR;
- millisecsTimeout = options->connectTimeout * 1000;
- start = MQTTClient_start_clock();
- m->c->keepAliveInterval = options->keepAliveInterval;
- m->c->retryInterval = options->retryInterval;
- setRetryLoopInterval(options->keepAliveInterval);
- m->c->MQTTVersion = options->MQTTVersion;
- m->c->cleanstart = m->c->cleansession = 0;
- if (m->c->MQTTVersion >= MQTTVERSION_5)
- m->c->cleanstart = options->cleanstart;
- else
- m->c->cleansession = options->cleansession;
- m->c->maxInflightMessages = (options->reliable) ? 1 : 10;
- if (options->struct_version >= 6)
- {
- if (options->maxInflightMessages > 0)
- m->c->maxInflightMessages = options->maxInflightMessages;
- }
- if (m->c->will)
- {
- free(m->c->will->payload);
- free(m->c->will->topic);
- free(m->c->will);
- m->c->will = NULL;
- }
- if (options->will && (options->will->struct_version == 0 || options->will->struct_version == 1))
- {
- const void* source = NULL;
- m->c->will = malloc(sizeof(willMessages));
- if (options->will->message || (options->will->struct_version == 1 && options->will->payload.data))
- {
- if (options->will->struct_version == 1 && options->will->payload.data)
- {
- m->c->will->payloadlen = options->will->payload.len;
- source = options->will->payload.data;
- }
- else
- {
- m->c->will->payloadlen = (int)strlen(options->will->message);
- source = (void*)options->will->message;
- }
- m->c->will->payload = malloc(m->c->will->payloadlen);
- memcpy(m->c->will->payload, source, m->c->will->payloadlen);
- }
- else
- {
- m->c->will->payload = NULL;
- m->c->will->payloadlen = 0;
- }
- m->c->will->qos = options->will->qos;
- m->c->will->retained = options->will->retained;
- m->c->will->topic = MQTTStrdup(options->will->topicName);
- }
- #if defined(OPENSSL)
- if (m->c->sslopts)
- {
- if (m->c->sslopts->trustStore)
- free((void*)m->c->sslopts->trustStore);
- if (m->c->sslopts->keyStore)
- free((void*)m->c->sslopts->keyStore);
- if (m->c->sslopts->privateKey)
- free((void*)m->c->sslopts->privateKey);
- if (m->c->sslopts->privateKeyPassword)
- free((void*)m->c->sslopts->privateKeyPassword);
- if (m->c->sslopts->enabledCipherSuites)
- free((void*)m->c->sslopts->enabledCipherSuites);
- if (m->c->sslopts->struct_version >= 2)
- {
- if (m->c->sslopts->CApath)
- free((void*)m->c->sslopts->CApath);
- }
- free(m->c->sslopts);
- m->c->sslopts = NULL;
- }
- if (options->struct_version != 0 && options->ssl)
- {
- m->c->sslopts = malloc(sizeof(MQTTClient_SSLOptions));
- memset(m->c->sslopts, '\0', sizeof(MQTTClient_SSLOptions));
- m->c->sslopts->struct_version = options->ssl->struct_version;
- if (options->ssl->trustStore)
- m->c->sslopts->trustStore = MQTTStrdup(options->ssl->trustStore);
- if (options->ssl->keyStore)
- m->c->sslopts->keyStore = MQTTStrdup(options->ssl->keyStore);
- if (options->ssl->privateKey)
- m->c->sslopts->privateKey = MQTTStrdup(options->ssl->privateKey);
- if (options->ssl->privateKeyPassword)
- m->c->sslopts->privateKeyPassword = MQTTStrdup(options->ssl->privateKeyPassword);
- if (options->ssl->enabledCipherSuites)
- m->c->sslopts->enabledCipherSuites = MQTTStrdup(options->ssl->enabledCipherSuites);
- m->c->sslopts->enableServerCertAuth = options->ssl->enableServerCertAuth;
- if (m->c->sslopts->struct_version >= 1)
- m->c->sslopts->sslVersion = options->ssl->sslVersion;
- if (m->c->sslopts->struct_version >= 2)
- {
- m->c->sslopts->verify = options->ssl->verify;
- if (options->ssl->CApath)
- m->c->sslopts->CApath = MQTTStrdup(options->ssl->CApath);
- }
- if (m->c->sslopts->struct_version >= 3)
- {
- m->c->sslopts->ssl_error_cb = options->ssl->ssl_error_cb;
- m->c->sslopts->ssl_error_context = options->ssl->ssl_error_context;
- }
- if (m->c->sslopts->struct_version >= 4)
- {
- m->c->sslopts->ssl_psk_cb = options->ssl->ssl_psk_cb;
- m->c->sslopts->ssl_psk_context = options->ssl->ssl_psk_context;
- m->c->sslopts->disableDefaultTrustStore = options->ssl->disableDefaultTrustStore;
- }
- }
- #endif
- if (m->c->username)
- free((void*)m->c->username);
- if (options->username)
- m->c->username = MQTTStrdup(options->username);
- if (m->c->password)
- free((void*)m->c->password);
- if (options->password)
- {
- m->c->password = MQTTStrdup(options->password);
- m->c->passwordlen = (int)strlen(options->password);
- }
- else if (options->struct_version >= 5 && options->binarypwd.data)
- {
- m->c->passwordlen = options->binarypwd.len;
- m->c->password = malloc(m->c->passwordlen);
- memcpy((void*)m->c->password, options->binarypwd.data, m->c->passwordlen);
- }
- if (options->struct_version >= 3)
- MQTTVersion = options->MQTTVersion;
- else
- MQTTVersion = MQTTVERSION_DEFAULT;
- if (MQTTVersion == MQTTVERSION_DEFAULT)
- {
- rc = MQTTClient_connectURIVersion(handle, options, serverURI, 4, start, millisecsTimeout,
- connectProperties, willProperties);
- if (rc.reasonCode != MQTTCLIENT_SUCCESS)
- {
- rc = MQTTClient_connectURIVersion(handle, options, serverURI, 3, start, millisecsTimeout,
- connectProperties, willProperties);
- }
- }
- else
- rc = MQTTClient_connectURIVersion(handle, options, serverURI, MQTTVersion, start, millisecsTimeout,
- connectProperties, willProperties);
- FUNC_EXIT_RC(rc.reasonCode);
- return rc;
- }
- MQTTResponse MQTTClient_connectAll(MQTTClient handle, MQTTClient_connectOptions* options,
- MQTTProperties* connectProperties, MQTTProperties* willProperties);
- int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
- {
- MQTTClients* m = handle;
- MQTTResponse response;
- if (m->c->MQTTVersion >= MQTTVERSION_5)
- return MQTTCLIENT_WRONG_MQTT_VERSION;
- response = MQTTClient_connectAll(handle, options, NULL, NULL);
- return response.reasonCode;
- }
- MQTTResponse MQTTClient_connect5(MQTTClient handle, MQTTClient_connectOptions* options,
- MQTTProperties* connectProperties, MQTTProperties* willProperties)
- {
- MQTTClients* m = handle;
- MQTTResponse response = MQTTResponse_initializer;
- if (m->c->MQTTVersion < MQTTVERSION_5)
- {
- response.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
- return response;
- }
- return MQTTClient_connectAll(handle, options, connectProperties, willProperties);
- }
- MQTTResponse MQTTClient_connectAll(MQTTClient handle, MQTTClient_connectOptions* options,
- MQTTProperties* connectProperties, MQTTProperties* willProperties)
- {
- MQTTClients* m = handle;
- MQTTResponse rc = MQTTResponse_initializer;
- FUNC_ENTRY;
- Thread_lock_mutex(connect_mutex);
- Thread_lock_mutex(mqttclient_mutex);
- rc.reasonCode = SOCKET_ERROR;
- if (options == NULL)
- {
- rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
- goto exit;
- }
- if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 6)
- {
- rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
- goto exit;
- }
- #if defined(OPENSSL)
- if (m->ssl && options->ssl == NULL)
- {
- rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
- goto exit;
- }
- #endif
- if (options->will) /* check validity of will options structure */
- {
- if (strncmp(options->will->struct_id, "MQTW", 4) != 0 || (options->will->struct_version != 0 && options->will->struct_version != 1))
- {
- rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
- goto exit;
- }
- }
- #if defined(OPENSSL)
- if (options->struct_version != 0 && options->ssl) /* check validity of SSL options structure */
- {
- if (strncmp(options->ssl->struct_id, "MQTS", 4) != 0 || options->ssl->struct_version < 0 || options->ssl->struct_version > 4)
- {
- rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
- goto exit;
- }
- }
- #endif
- if ((options->username && !UTF8_validateString(options->username)) ||
- (options->password && !UTF8_validateString(options->password)))
- {
- rc.reasonCode = MQTTCLIENT_BAD_UTF8_STRING;
- goto exit;
- }
- if (options->MQTTVersion != MQTTVERSION_DEFAULT &&
- (options->MQTTVersion < MQTTVERSION_3_1 || options->MQTTVersion > MQTTVERSION_5))
- {
- rc.reasonCode = MQTTCLIENT_BAD_MQTT_VERSION;
- goto exit;
- }
- if (options->MQTTVersion >= MQTTVERSION_5)
- {
- if (options->cleansession != 0)
- {
- rc.reasonCode = MQTTCLIENT_BAD_MQTT_OPTION;
- goto exit;
- }
- }
- else if (options->cleanstart != 0)
- {
- rc.reasonCode = MQTTCLIENT_BAD_MQTT_OPTION;
- goto exit;
- }
- if (options->struct_version < 2 || options->serverURIcount == 0)
- {
- if ( !m )
- {
- rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
- goto exit;
- }
- rc = MQTTClient_connectURI(handle, options, m->serverURI, connectProperties, willProperties);
- }
- else
- {
- int i;
- for (i = 0; i < options->serverURIcount; ++i)
- {
- char* serverURI = options->serverURIs[i];
- if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
- serverURI += strlen(URI_TCP);
- else if (strncmp(URI_WS, serverURI, strlen(URI_WS)) == 0)
- {
- serverURI += strlen(URI_WS);
- m->websocket = 1;
- }
- #if defined(OPENSSL)
- else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0)
- {
- serverURI += strlen(URI_SSL);
- m->ssl = 1;
- }
- else if (strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
- {
- serverURI += strlen(URI_WSS);
- m->ssl = 1;
- m->websocket = 1;
- }
- #endif
- rc = MQTTClient_connectURI(handle, options, serverURI, connectProperties, willProperties);
- if (rc.reasonCode == MQTTREASONCODE_SUCCESS)
- break;
- }
- }
- if (rc.reasonCode == MQTTREASONCODE_SUCCESS)
- {
- if (rc.properties && MQTTProperties_hasProperty(rc.properties, MQTTPROPERTY_CODE_RECEIVE_MAXIMUM))
- {
- int recv_max = MQTTProperties_getNumericValue(rc.properties, MQTTPROPERTY_CODE_RECEIVE_MAXIMUM);
- if (m->c->maxInflightMessages > recv_max)
- m->c->maxInflightMessages = recv_max;
- }
- }
- exit:
- if (m && m->c && m->c->will)
- {
- if (m->c->will->payload)
- free(m->c->will->payload);
- if (m->c->will->topic)
- free(m->c->will->topic);
- free(m->c->will);
- m->c->will = NULL;
- }
- Thread_unlock_mutex(mqttclient_mutex);
- Thread_unlock_mutex(connect_mutex);
- FUNC_EXIT_RC(rc.reasonCode);
- return rc;
- }
- /**
- * mqttclient_mutex must be locked when you call this function, if multi threaded
- */
- static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int call_connection_lost, int stop,
- enum MQTTReasonCodes reason, MQTTProperties* props)
- {
- MQTTClients* m = handle;
- START_TIME_TYPE start;
- int rc = MQTTCLIENT_SUCCESS;
- int was_connected = 0;
- FUNC_ENTRY;
- if (m == NULL || m->c == NULL)
- {
- rc = MQTTCLIENT_FAILURE;
- goto exit;
- }
- if (m->c->connected == 0 && m->c->connect_state == NOT_IN_PROGRESS)
- {
- rc = MQTTCLIENT_DISCONNECTED;
- goto exit;
- }
- was_connected = m->c->connected; /* should be 1 */
- if (m->c->connected != 0)
- {
- start = MQTTClient_start_clock();
- m->c->connect_state = DISCONNECTING; /* indicate disconnecting */
- while (m->c->inboundMsgs->count > 0 || m->c->outboundMsgs->count > 0)
- { /* wait for all inflight message flows to finish, up to timeout */
- if (MQTTClient_elapsed(start) >= timeout)
- break;
- Thread_unlock_mutex(mqttclient_mutex);
- MQTTClient_yield();
- Thread_lock_mutex(mqttclient_mutex);
- }
- }
- MQTTClient_closeSession(m->c, reason, props);
- exit:
- if (stop)
- MQTTClient_stop();
- if (call_connection_lost && m->cl && was_connected)
- {
- Log(TRACE_MIN, -1, "Calling connectionLost for client %s", m->c->clientID);
- Thread_start(connectionLost_call, m);
- }
- FUNC_EXIT_RC(rc);
- return rc;
- }
- /**
- * mqttclient_mutex must be locked when you call this function, if multi threaded
- */
- static int MQTTClient_disconnect_internal(MQTTClient handle, int timeout)
- {
- return MQTTClient_disconnect1(handle, timeout, 1, 1, MQTTREASONCODE_SUCCESS, NULL);
- }
- /**
- * mqttclient_mutex must be locked when you call this function, if multi threaded
- */
- void MQTTProtocol_closeSession(Clients* c, int sendwill)
- {
- MQTTClient_disconnect_internal((MQTTClient)c->context, 0);
- }
- int MQTTClient_disconnect(MQTTClient handle, int timeout)
- {
- int rc = 0;
- Thread_lock_mutex(mqttclient_mutex);
- rc = MQTTClient_disconnect1(handle, timeout, 0, 1, MQTTREASONCODE_SUCCESS, NULL);
- Thread_unlock_mutex(mqttclient_mutex);
- return rc;
- }
- int MQTTClient_disconnect5(MQTTClient handle, int timeout, enum MQTTReasonCodes reason, MQTTProperties* props)
- {
- int rc = 0;
- Thread_lock_mutex(mqttclient_mutex);
- rc = MQTTClient_disconnect1(handle, timeout, 0, 1, reason, props);
- Thread_unlock_mutex(mqttclient_mutex);
- return rc;
- }
- int MQTTClient_isConnected(MQTTClient handle)
- {
- MQTTClients* m = handle;
- int rc = 0;
- FUNC_ENTRY;
- Thread_lock_mutex(mqttclient_mutex);
- if (m && m->c)
- rc = m->c->connected;
- Thread_unlock_mutex(mqttclient_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- MQTTResponse MQTTClient_subscribeMany5(MQTTClient handle, int count, char* const* topic,
- int* qos, MQTTSubscribe_options* opts, MQTTProperties* props)
- {
- MQTTClients* m = handle;
- List* topics = NULL;
- List* qoss = NULL;
- int i = 0;
- int rc = MQTTCLIENT_FAILURE;
- MQTTResponse resp = MQTTResponse_initializer;
- int msgid = 0;
- FUNC_ENTRY;
- Thread_lock_mutex(subscribe_mutex);
- Thread_lock_mutex(mqttclient_mutex);
- resp.reasonCode = MQTTCLIENT_FAILURE;
- if (m == NULL || m->c == NULL)
- {
- rc = MQTTCLIENT_FAILURE;
- goto exit;
- }
- if (m->c->connected == 0)
- {
- rc = MQTTCLIENT_DISCONNECTED;
- goto exit;
- }
- for (i = 0; i < count; i++)
- {
- if (!UTF8_validateString(topic[i]))
- {
- rc = MQTTCLIENT_BAD_UTF8_STRING;
- goto exit;
- }
- if (qos[i] < 0 || qos[i] > 2)
- {
- rc = MQTTCLIENT_BAD_QOS;
- goto exit;
- }
- }
- if ((msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
- {
- rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT;
- goto exit;
- }
- topics = ListInitialize();
- qoss = ListInitialize();
- for (i = 0; i < count; i++)
- {
- ListAppend(topics, topic[i], strlen(topic[i]));
- ListAppend(qoss, &qos[i], sizeof(int));
- }
- rc = MQTTProtocol_subscribe(m->c, topics, qoss, msgid, opts, props);
- ListFreeNoContent(topics);
- ListFreeNoContent(qoss);
- if (rc == TCPSOCKET_COMPLETE)
- {
- MQTTPacket* pack = NULL;
- Thread_unlock_mutex(mqttclient_mutex);
- pack = MQTTClient_waitfor(handle, SUBACK, &rc, 10000L);
- Thread_lock_mutex(mqttclient_mutex);
- if (pack != NULL)
- {
- Suback* sub = (Suback*)pack;
- if (m->c->MQTTVersion == MQTTVERSION_5)
- {
- if (sub->properties.count > 0)
- {
- resp.properties = malloc(sizeof(MQTTProperties));
- *resp.properties = MQTTProperties_copy(&sub->properties);
- }
- resp.reasonCodeCount = sub->qoss->count;
- resp.reasonCode = *(int*)sub->qoss->first->content;
- if (sub->qoss->count > 1)
- {
- ListElement* current = NULL;
- int rc_count = 0;
- resp.reasonCodes = malloc(sizeof(enum MQTTReasonCodes) * (sub->qoss->count));
- while (ListNextElement(sub->qoss, ¤t))
- (resp.reasonCodes)[rc_count++] = *(enum MQTTReasonCodes*)(current->content);
- }
- }
- else
- {
- ListElement* current = NULL;
- i = 0;
- while (ListNextElement(sub->qoss, ¤t))
- {
- int* reqqos = (int*)(current->content);
- qos[i++] = *reqqos;
- }
- resp.reasonCode = rc;
- }
- rc = MQTTProtocol_handleSubacks(pack, m->c->net.socket);
- m->pack = NULL;
- }
- else
- rc = SOCKET_ERROR;
- }
- if (rc == SOCKET_ERROR)
- MQTTClient_disconnect_internal(handle, 0);
- else if (rc == TCPSOCKET_COMPLETE)
- rc = MQTTCLIENT_SUCCESS;
- exit:
- if (rc < 0)
- resp.reasonCode = rc;
- Thread_unlock_mutex(mqttclient_mutex);
- Thread_unlock_mutex(subscribe_mutex);
- FUNC_EXIT_RC(resp.reasonCode);
- return resp;
- }
- int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, int* qos)
- {
- MQTTClients* m = handle;
- MQTTResponse response = MQTTResponse_initializer;
- if (m->c->MQTTVersion >= MQTTVERSION_5)
- response.reasonCode = MQTTCLIENT_BAD_MQTT_VERSION;
- else
- response = MQTTClient_subscribeMany5(handle, count, topic, qos, NULL, NULL);
- return response.reasonCode;
- }
- MQTTResponse MQTTClient_subscribe5(MQTTClient handle, const char* topic, int qos,
- MQTTSubscribe_options* opts, MQTTProperties* props)
- {
- MQTTResponse rc;
- FUNC_ENTRY;
- rc = MQTTClient_subscribeMany5(handle, 1, (char * const *)(&topic), &qos, opts, props);
- if (qos == MQTT_BAD_SUBSCRIBE) /* addition for MQTT 3.1.1 - error code from subscribe */
- rc.reasonCode = MQTT_BAD_SUBSCRIBE;
- FUNC_EXIT_RC(rc.reasonCode);
- return rc;
- }
- int MQTTClient_subscribe(MQTTClient handle, const char* topic, int qos)
- {
- MQTTClients* m = handle;
- MQTTResponse response = MQTTResponse_initializer;
- if (m->c->MQTTVersion >= MQTTVERSION_5)
- response.reasonCode = MQTTCLIENT_BAD_MQTT_VERSION;
- else
- response = MQTTClient_subscribe5(handle, topic, qos, NULL, NULL);
- return response.reasonCode;
- }
- MQTTResponse MQTTClient_unsubscribeMany5(MQTTClient handle, int count, char* const* topic, MQTTProperties* props)
- {
- MQTTClients* m = handle;
- List* topics = NULL;
- int i = 0;
- int rc = SOCKET_ERROR;
- MQTTResponse resp = MQTTResponse_initializer;
- int msgid = 0;
- FUNC_ENTRY;
- Thread_lock_mutex(unsubscribe_mutex);
- Thread_lock_mutex(mqttclient_mutex);
- resp.reasonCode = MQTTCLIENT_FAILURE;
- if (m == NULL || m->c == NULL)
- {
- rc = MQTTCLIENT_FAILURE;
- goto exit;
- }
- if (m->c->connected == 0)
- {
- rc = MQTTCLIENT_DISCONNECTED;
- goto exit;
- }
- for (i = 0; i < count; i++)
- {
- if (!UTF8_validateString(topic[i]))
- {
- rc = MQTTCLIENT_BAD_UTF8_STRING;
- goto exit;
- }
- }
- if ((msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
- {
- rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT;
- goto exit;
- }
- topics = ListInitialize();
- for (i = 0; i < count; i++)
- ListAppend(topics, topic[i], strlen(topic[i]));
- rc = MQTTProtocol_unsubscribe(m->c, topics, msgid, props);
- ListFreeNoContent(topics);
- if (rc == TCPSOCKET_COMPLETE)
- {
- MQTTPacket* pack = NULL;
- Thread_unlock_mutex(mqttclient_mutex);
- pack = MQTTClient_waitfor(handle, UNSUBACK, &rc, 10000L);
- Thread_lock_mutex(mqttclient_mutex);
- if (pack != NULL)
- {
- Unsuback* unsub = (Unsuback*)pack;
- if (m->c->MQTTVersion == MQTTVERSION_5)
- {
- if (unsub->properties.count > 0)
- {
- resp.properties = malloc(sizeof(MQTTProperties));
- *resp.properties = MQTTProperties_copy(&unsub->properties);
- }
- resp.reasonCodeCount = unsub->reasonCodes->count;
- resp.reasonCode = *(int*)unsub->reasonCodes->first->content;
- if (unsub->reasonCodes->count > 1)
- {
- ListElement* current = NULL;
- int rc_count = 0;
- resp.reasonCodes = malloc(sizeof(enum MQTTReasonCodes) * (unsub->reasonCodes->count));
- while (ListNextElement(unsub->reasonCodes, ¤t))
- (resp.reasonCodes)[rc_count++] = *(enum MQTTReasonCodes*)(current->content);
- }
- }
- else
- resp.reasonCode = rc;
- rc = MQTTProtocol_handleUnsubacks(pack, m->c->net.socket);
- m->pack = NULL;
- }
- else
- rc = SOCKET_ERROR;
- }
- if (rc == SOCKET_ERROR)
- MQTTClient_disconnect_internal(handle, 0);
- exit:
- if (rc < 0)
- resp.reasonCode = rc;
- Thread_unlock_mutex(mqttclient_mutex);
- Thread_unlock_mutex(unsubscribe_mutex);
- FUNC_EXIT_RC(resp.reasonCode);
- return resp;
- }
- int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic)
- {
- MQTTResponse response = MQTTClient_unsubscribeMany5(handle, count, topic, NULL);
- return response.reasonCode;
- }
- MQTTResponse MQTTClient_unsubscribe5(MQTTClient handle, const char* topic, MQTTProperties* props)
- {
- MQTTResponse rc;
- rc = MQTTClient_unsubscribeMany5(handle, 1, (char * const *)(&topic), props);
- return rc;
- }
- int MQTTClient_unsubscribe(MQTTClient handle, const char* topic)
- {
- MQTTResponse response = MQTTClient_unsubscribe5(handle, topic, NULL);
- return response.reasonCode;
- }
- MQTTResponse MQTTClient_publish5(MQTTClient handle, const char* topicName, int payloadlen, const void* payload,
- int qos, int retained, MQTTProperties* properties, MQTTClient_deliveryToken* deliveryToken)
- {
- int rc = MQTTCLIENT_SUCCESS;
- MQTTClients* m = handle;
- Messages* msg = NULL;
- Publish* p = NULL;
- int blocked = 0;
- int msgid = 0;
- MQTTResponse resp = MQTTResponse_initializer;
- FUNC_ENTRY;
- Thread_lock_mutex(mqttclient_mutex);
- if (m == NULL || m->c == NULL)
- rc = MQTTCLIENT_FAILURE;
- else if (m->c->connected == 0)
- rc = MQTTCLIENT_DISCONNECTED;
- else if (!UTF8_validateString(topicName))
- rc = MQTTCLIENT_BAD_UTF8_STRING;
- if (rc != MQTTCLIENT_SUCCESS)
- goto exit;
- /* If outbound queue is full, block until it is not */
- while (m->c->outboundMsgs->count >= m->c->maxInflightMessages ||
- Socket_noPendingWrites(m->c->net.socket) == 0) /* wait until the socket is free of large packets being written */
- {
- if (blocked == 0)
- {
- blocked = 1;
- Log(TRACE_MIN, -1, "Blocking publish on queue full for client %s", m->c->clientID);
- }
- Thread_unlock_mutex(mqttclient_mutex);
- MQTTClient_yield();
- Thread_lock_mutex(mqttclient_mutex);
- if (m->c->connected == 0)
- {
- rc = MQTTCLIENT_FAILURE;
- goto exit;
- }
- }
- if (blocked == 1)
- Log(TRACE_MIN, -1, "Resuming publish now queue not full for client %s", m->c->clientID);
- if (qos > 0 && (msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
- { /* this should never happen as we've waited for spaces in the queue */
- rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT;
- goto exit;
- }
- p = malloc(sizeof(Publish) + payloadlen);
- p->payload = (void*)payload;
- p->payloadlen = payloadlen;
- if (payloadlen > 0)
- {
- p->payload = (char*)p + sizeof(Publish);
- memcpy(p->payload, payload, payloadlen);
- p->payloadlen = payloadlen;
- }
- p->topic = MQTTStrdup(topicName);
- p->msgId = msgid;
- p->MQTTVersion = m->c->MQTTVersion;
- if (m->c->MQTTVersion >= MQTTVERSION_5)
- {
- if (properties)
- p->properties = *properties;
- else
- {
- MQTTProperties props = MQTTProperties_initializer;
- p->properties = props;
- }
- }
- rc = MQTTProtocol_startPublish(m->c, p, qos, retained, &msg);
- /* If the packet was partially written to the socket, wait for it to complete.
- * However, if the client is disconnected during this time and qos is not 0, still return success, as
- * the packet has already been written to persistence and assigned a message id so will
- * be sent when the client next connects.
- */
- if (rc == TCPSOCKET_INTERRUPTED)
- {
- while (m->c->connected == 1 && SocketBuffer_getWrite(m->c->net.socket))
- {
- Thread_unlock_mutex(mqttclient_mutex);
- MQTTClient_yield();
- Thread_lock_mutex(mqttclient_mutex);
- }
- rc = (qos > 0 || m->c->connected == 1) ? MQTTCLIENT_SUCCESS : MQTTCLIENT_FAILURE;
- }
- if (deliveryToken && qos > 0)
- *deliveryToken = msg->msgid;
- if (p->topic) free(p->topic);
- free(p);
- if (rc == SOCKET_ERROR)
- {
- MQTTClient_disconnect_internal(handle, 0);
- /* Return success for qos > 0 as the send will be retried automatically */
- rc = (qos > 0) ? MQTTCLIENT_SUCCESS : MQTTCLIENT_FAILURE;
- }
- exit:
- Thread_unlock_mutex(mqttclient_mutex);
- resp.reasonCode = rc;
- FUNC_EXIT_RC(resp.reasonCode);
- return resp;
- }
- int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen, const void* payload,
- int qos, int retained, MQTTClient_deliveryToken* deliveryToken)
- {
- MQTTClients* m = handle;
- MQTTResponse rc = MQTTResponse_initializer;
- if (m->c->MQTTVersion >= MQTTVERSION_5)
- rc.reasonCode = MQTTCLIENT_BAD_MQTT_VERSION;
- else
- rc = MQTTClient_publish5(handle, topicName, payloadlen, payload, qos, retained, NULL, deliveryToken);
- return rc.reasonCode;
- }
- MQTTResponse MQTTClient_publishMessage5(MQTTClient handle, const char* topicName, MQTTClient_message* message,
- MQTTClient_deliveryToken* deliveryToken)
- {
- MQTTResponse rc = MQTTResponse_initializer;
- MQTTProperties* props = NULL;
- FUNC_ENTRY;
- if (message == NULL)
- {
- rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
- goto exit;
- }
- if (strncmp(message->struct_id, "MQTM", 4) != 0 ||
- (message->struct_version != 0 && message->struct_version != 1))
- {
- rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
- goto exit;
- }
- if (message->struct_version >= 1)
- props = &message->properties;
- rc = MQTTClient_publish5(handle, topicName, message->payloadlen, message->payload,
- message->qos, message->retained, props, deliveryToken);
- exit:
- FUNC_EXIT_RC(rc.reasonCode);
- return rc;
- }
- int MQTTClient_publishMessage(MQTTClient handle, const char* topicName, MQTTClient_message* message,
- MQTTClient_deliveryToken* deliveryToken)
- {
- MQTTClients* m = handle;
- MQTTResponse rc = MQTTResponse_initializer;
- if (strncmp(message->struct_id, "MQTM", 4) != 0 ||
- (message->struct_version != 0 && message->struct_version != 1))
- rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
- else if (m->c->MQTTVersion >= MQTTVERSION_5)
- rc.reasonCode = MQTTCLIENT_BAD_MQTT_VERSION;
- else
- rc = MQTTClient_publishMessage5(handle, topicName, message, deliveryToken);
- return rc.reasonCode;
- }
- static void MQTTClient_retry(void)
- {
- static time_t last = 0L;
- time_t now;
- FUNC_ENTRY;
- time(&(now));
- if (difftime(now, last) > retryLoopInterval)
- {
- time(&(last));
- MQTTProtocol_keepalive(now);
- MQTTProtocol_retry(now, 1, 0);
- }
- else
- MQTTProtocol_retry(now, 0, 0);
- FUNC_EXIT;
- }
- static MQTTPacket* MQTTClient_cycle(int* sock, unsigned long timeout, int* rc)
- {
- struct timeval tp = {0L, 0L};
- static Ack ack;
- MQTTPacket* pack = NULL;
- FUNC_ENTRY;
- if (timeout > 0L)
- {
- tp.tv_sec = timeout / 1000;
- tp.tv_usec = (timeout % 1000) * 1000; /* this field is microseconds! */
- }
- #if defined(OPENSSL)
- if ((*sock = SSLSocket_getPendingRead()) == -1)
- {
- /* 0 from getReadySocket indicates no work to do, -1 == error, but can happen normally */
- #endif
- *sock = Socket_getReadySocket(0, &tp, socket_mutex);
- #if defined(OPENSSL)
- }
- #endif
- Thread_lock_mutex(mqttclient_mutex);
- if (*sock > 0)
- {
- MQTTClients* m = NULL;
- if (ListFindItem(handles, sock, clientSockCompare) != NULL)
- m = (MQTTClient)(handles->current->content);
- if (m != NULL)
- {
- if (m->c->connect_state == TCP_IN_PROGRESS || m->c->connect_state == SSL_IN_PROGRESS)
- *rc = 0; /* waiting for connect state to clear */
- else if (m->c->connect_state == WEBSOCKET_IN_PROGRESS)
- *rc = WebSocket_upgrade(&m->c->net);
- else
- {
- pack = MQTTPacket_Factory(m->c->MQTTVersion, &m->c->net, rc);
- if (*rc == TCPSOCKET_INTERRUPTED)
- *rc = 0;
- }
- }
- if (pack)
- {
- int freed = 1;
- /* Note that these handle... functions free the packet structure that they are dealing with */
- if (pack->header.bits.type == PUBLISH)
- *rc = MQTTProtocol_handlePublishes(pack, *sock);
- else if (pack->header.bits.type == PUBACK || pack->header.bits.type == PUBCOMP)
- {
- int msgid;
- ack = (pack->header.bits.type == PUBCOMP) ? *(Pubcomp*)pack : *(Puback*)pack;
- msgid = ack.msgId;
- if (m && m->c->MQTTVersion >= MQTTVERSION_5 && m->published)
- {
- Log(TRACE_MIN, -1, "Calling published for client %s, msgid %d", m->c->clientID, msgid);
- (*(m->published))(m->published_context, msgid, pack->header.bits.type, &ack.properties, ack.rc);
- }
- *rc = (pack->header.bits.type == PUBCOMP) ?
- MQTTProtocol_handlePubcomps(pack, *sock) : MQTTProtocol_handlePubacks(pack, *sock);
- if (m && m->dc)
- {
- Log(TRACE_MIN, -1, "Calling deliveryComplete for client %s, msgid %d", m->c->clientID, msgid);
- (*(m->dc))(m->context, msgid);
- }
- }
- else if (pack->header.bits.type == PUBREC)
- {
- Pubrec* pubrec = (Pubrec*)pack;
- if (m && m->c->MQTTVersion >= MQTTVERSION_5 && m->published && pubrec->rc >= MQTTREASONCODE_UNSPECIFIED_ERROR)
- {
- Log(TRACE_MIN, -1, "Calling published for client %s, msgid %d", m->c->clientID, ack.msgId);
- (*(m->published))(m->published_context, pubrec->msgId, pack->header.bits.type,
- &pubrec->properties, pubrec->rc);
- }
- *rc = MQTTProtocol_handlePubrecs(pack, *sock);
- }
- else if (pack->header.bits.type == PUBREL)
- *rc = MQTTProtocol_handlePubrels(pack, *sock);
- else if (pack->header.bits.type == PINGRESP)
- *rc = MQTTProtocol_handlePingresps(pack, *sock);
- else
- freed = 0;
- if (freed)
- pack = NULL;
- }
- }
- MQTTClient_retry();
- Thread_unlock_mutex(mqttclient_mutex);
- FUNC_EXIT_RC(*rc);
- return pack;
- }
- static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, long timeout)
- {
- MQTTPacket* pack = NULL;
- MQTTClients* m = handle;
- START_TIME_TYPE start = MQTTClient_start_clock();
- FUNC_ENTRY;
- if (((MQTTClients*)handle) == NULL || timeout <= 0L)
- {
- *rc = MQTTCLIENT_FAILURE;
- goto exit;
- }
- if (running)
- {
- if (packet_type == CONNECT)
- {
- if ((*rc = Thread_wait_sem(m->connect_sem, timeout)) == 0)
- *rc = m->rc;
- }
- else if (packet_type == CONNACK)
- *rc = Thread_wait_sem(m->connack_sem, timeout);
- else if (packet_type == SUBACK)
- *rc = Thread_wait_sem(m->suback_sem, timeout);
- else if (packet_type == UNSUBACK)
- *rc = Thread_wait_sem(m->unsuback_sem, timeout);
- if (*rc == 0 && packet_type != CONNECT && m->pack == NULL)
- Log(LOG_ERROR, -1, "waitfor unexpectedly is NULL for client %s, packet_type %d, timeout %ld", m->c->clientID, packet_type, timeout);
- pack = m->pack;
- }
- else
- {
- *rc = TCPSOCKET_COMPLETE;
- while (1)
- {
- int sock = -1;
- pack = MQTTClient_cycle(&sock, 100L, rc);
- if (sock == m->c->net.socket)
- {
- if (*rc == SOCKET_ERROR)
- break;
- if (pack && (pack->header.bits.type == packet_type))
- break;
- if (m->c->connect_state == TCP_IN_PROGRESS)
- {
- int error;
- socklen_t len = sizeof(error);
- if ((*rc = getsockopt(m->c->net.socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len)) == 0)
- *rc = error;
- break;
- }
- #if defined(OPENSSL)
- else if (m->c->connect_state == SSL_IN_PROGRESS)
- {
- *rc = m->c->sslopts->struct_version >= 3 ?
- SSLSocket_connect(m->c->net.ssl, sock, m->serverURI,
- m->c->sslopts->verify, m->c->sslopts->ssl_error_cb, m->c->sslopts->ssl_error_context) :
- SSLSocket_connect(m->c->net.ssl, sock, m->serverURI,
- m->c->sslopts->verify, NULL, NULL);
- if (*rc == SSL_FATAL)
- break;
- else if (*rc == 1) /* rc == 1 means SSL connect has finished and succeeded */
- {
- if ((m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
- m->c->session = SSL_get1_session(m->c->net.ssl);
- break;
- }
- }
- #endif
- else if (m->c->connect_state == WEBSOCKET_IN_PROGRESS )
- {
- *rc = 1;
- break;
- }
- else if (m->c->connect_state == WAIT_FOR_CONNACK)
- {
- int error;
- socklen_t len = sizeof(error);
- if (getsockopt(m->c->net.socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len) == 0)
- {
- if (error)
- {
- *rc = error;
- break;
- }
- }
- }
- }
- if (MQTTClient_elapsed(start) > timeout)
- {
- pack = NULL;
- break;
- }
- }
- }
- exit:
- FUNC_EXIT_RC(*rc);
- return pack;
- }
- int MQTTClient_receive(MQTTClient handle, char** topicName, int* topicLen, MQTTClient_message** message,
- unsigned long timeout)
- {
- int rc = TCPSOCKET_COMPLETE;
- START_TIME_TYPE start = MQTTClient_start_clock();
- unsigned long elapsed = 0L;
- MQTTClients* m = handle;
- FUNC_ENTRY;
- if (m == NULL || m->c == NULL
- || running) /* receive is not meant to be called in a multi-thread environment */
- {
- rc = MQTTCLIENT_FAILURE;
- goto exit;
- }
- if (m->c->connected == 0)
- {
- rc = MQTTCLIENT_DISCONNECTED;
- goto exit;
- }
- *topicName = NULL;
- *message = NULL;
- /* if there is already a message waiting, don't hang around but still do some packet handling */
- if (m->c->messageQueue->count > 0)
- timeout = 0L;
- elapsed = MQTTClient_elapsed(start);
- do
- {
- int sock = 0;
- MQTTClient_cycle(&sock, (timeout > elapsed) ? timeout - elapsed : 0L, &rc);
- if (rc == SOCKET_ERROR)
- {
- if (ListFindItem(handles, &sock, clientSockCompare) && /* find client corresponding to socket */
- (MQTTClient)(handles->current->content) == handle)
- break; /* there was an error on the socket we are interested in */
- }
- elapsed = MQTTClient_elapsed(start);
- }
- while (elapsed < timeout && m->c->messageQueue->count == 0);
- if (m->c->messageQueue->count > 0)
- rc = MQTTClient_deliverMessage(rc, m, topicName, topicLen, message);
- if (rc == SOCKET_ERROR)
- MQTTClient_disconnect_internal(handle, 0);
- exit:
- FUNC_EXIT_RC(rc);
- return rc;
- }
- void MQTTClient_yield(void)
- {
- START_TIME_TYPE start = MQTTClient_start_clock();
- unsigned long elapsed = 0L;
- unsigned long timeout = 100L;
- int rc = 0;
- FUNC_ENTRY;
- if (running) /* yield is not meant to be called in a multi-thread environment */
- {
- MQTTClient_sleep(timeout);
- goto exit;
- }
- elapsed = MQTTClient_elapsed(start);
- do
- {
- int sock = -1;
- MQTTClient_cycle(&sock, (timeout > elapsed) ? timeout - elapsed : 0L, &rc);
- Thread_lock_mutex(mqttclient_mutex);
- if (rc == SOCKET_ERROR && ListFindItem(handles, &sock, clientSockCompare))
- {
- MQTTClients* m = (MQTTClient)(handles->current->content);
- if (m->c->connect_state != DISCONNECTING)
- MQTTClient_disconnect_internal(m, 0);
- }
- Thread_unlock_mutex(mqttclient_mutex);
- elapsed = MQTTClient_elapsed(start);
- }
- while (elapsed < timeout);
- exit:
- FUNC_EXIT;
- }
- /*
- static int pubCompare(void* a, void* b)
- {
- Messages* msg = (Messages*)a;
- return msg->publish == (Publications*)b;
- }*/
- int MQTTClient_waitForCompletion(MQTTClient handle, MQTTClient_deliveryToken mdt, unsigned long timeout)
- {
- int rc = MQTTCLIENT_FAILURE;
- START_TIME_TYPE start = MQTTClient_start_clock();
- unsigned long elapsed = 0L;
- MQTTClients* m = handle;
- FUNC_ENTRY;
- Thread_lock_mutex(mqttclient_mutex);
- if (m == NULL || m->c == NULL)
- {
- rc = MQTTCLIENT_FAILURE;
- goto exit;
- }
- elapsed = MQTTClient_elapsed(start);
- while (elapsed < timeout)
- {
- if (m->c->connected == 0)
- {
- rc = MQTTCLIENT_DISCONNECTED;
- goto exit;
- }
- if (ListFindItem(m->c->outboundMsgs, &mdt, messageIDCompare) == NULL)
- {
- rc = MQTTCLIENT_SUCCESS; /* well we couldn't find it */
- goto exit;
- }
- Thread_unlock_mutex(mqttclient_mutex);
- MQTTClient_yield();
- Thread_lock_mutex(mqttclient_mutex);
- elapsed = MQTTClient_elapsed(start);
- }
- exit:
- Thread_unlock_mutex(mqttclient_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- int MQTTClient_getPendingDeliveryTokens(MQTTClient handle, MQTTClient_deliveryToken **tokens)
- {
- int rc = MQTTCLIENT_SUCCESS;
- MQTTClients* m = handle;
- *tokens = NULL;
- FUNC_ENTRY;
- Thread_lock_mutex(mqttclient_mutex);
- if (m == NULL)
- {
- rc = MQTTCLIENT_FAILURE;
- goto exit;
- }
- if (m->c && m->c->outboundMsgs->count > 0)
- {
- ListElement* current = NULL;
- int count = 0;
- *tokens = malloc(sizeof(MQTTClient_deliveryToken) * (m->c->outboundMsgs->count + 1));
- /*Heap_unlink(__FILE__, __LINE__, *tokens);*/
- while (ListNextElement(m->c->outboundMsgs, ¤t))
- {
- Messages* m = (Messages*)(current->content);
- (*tokens)[count++] = m->msgid;
- }
- (*tokens)[count] = -1;
- }
- exit:
- Thread_unlock_mutex(mqttclient_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- void MQTTClient_setTraceLevel(enum MQTTCLIENT_TRACE_LEVELS level)
- {
- Log_setTraceLevel((enum LOG_LEVELS)level);
- }
- void MQTTClient_setTraceCallback(MQTTClient_traceCallback* callback)
- {
- Log_setTraceCallback((Log_traceCallback*)callback);
- }
- MQTTClient_nameValue* MQTTClient_getVersionInfo(void)
- {
- #define MAX_INFO_STRINGS 8
- static MQTTClient_nameValue libinfo[MAX_INFO_STRINGS + 1];
- int i = 0;
- libinfo[i].name = "Product name";
- libinfo[i++].value = "Eclipse Paho Synchronous MQTT C Client Library";
- libinfo[i].name = "Version";
- libinfo[i++].value = CLIENT_VERSION;
- libinfo[i].name = "Build level";
- libinfo[i++].value = BUILD_TIMESTAMP;
- #if defined(OPENSSL)
- libinfo[i].name = "OpenSSL version";
- libinfo[i++].value = SSLeay_version(SSLEAY_VERSION);
- libinfo[i].name = "OpenSSL flags";
- libinfo[i++].value = SSLeay_version(SSLEAY_CFLAGS);
- libinfo[i].name = "OpenSSL build timestamp";
- libinfo[i++].value = SSLeay_version(SSLEAY_BUILT_ON);
- libinfo[i].name = "OpenSSL platform";
- libinfo[i++].value = SSLeay_version(SSLEAY_PLATFORM);
- libinfo[i].name = "OpenSSL directory";
- libinfo[i++].value = SSLeay_version(SSLEAY_DIR);
- #endif
- libinfo[i].name = NULL;
- libinfo[i].value = NULL;
- return libinfo;
- }
- const char* MQTTClient_strerror(int code)
- {
- static char buf[30];
- switch (code) {
- case MQTTCLIENT_SUCCESS:
- return "Success";
- case MQTTCLIENT_FAILURE:
- return "Failure";
- case MQTTCLIENT_DISCONNECTED:
- return "Disconnected";
- case MQTTCLIENT_MAX_MESSAGES_INFLIGHT:
- return "Maximum in-flight messages amount reached";
- case MQTTCLIENT_BAD_UTF8_STRING:
- return "Invalid UTF8 string";
- case MQTTCLIENT_NULL_PARAMETER:
- return "Invalid (NULL) parameter";
- case MQTTCLIENT_TOPICNAME_TRUNCATED:
- return "Topic containing NULL characters has been truncated";
- case MQTTCLIENT_BAD_STRUCTURE:
- return "Bad structure";
- case MQTTCLIENT_BAD_QOS:
- return "Invalid QoS value";
- case MQTTCLIENT_SSL_NOT_SUPPORTED:
- return "SSL is not supported";
- case MQTTCLIENT_BAD_PROTOCOL:
- return "Invalid protocol scheme";
- case MQTTCLIENT_BAD_MQTT_OPTION:
- return "Options for wrong MQTT version";
- case MQTTCLIENT_WRONG_MQTT_VERSION:
- return "Client created for another version of MQTT";
- }
- sprintf(buf, "Unknown error code %d", code);
- return buf;
- }
- /**
- * See if any pending writes have been completed, and cleanup if so.
- * Cleaning up means removing any publication data that was stored because the write did
- * not originally complete.
- */
- static void MQTTProtocol_checkPendingWrites(void)
- {
- FUNC_ENTRY;
- if (state.pending_writes.count > 0)
- {
- ListElement* le = state.pending_writes.first;
- while (le)
- {
- if (Socket_noPendingWrites(((pending_write*)(le->content))->socket))
- {
- MQTTProtocol_removePublication(((pending_write*)(le->content))->p);
- state.pending_writes.current = le;
- ListRemove(&(state.pending_writes), le->content); /* does NextElement itself */
- le = state.pending_writes.current;
- }
- else
- ListNextElement(&(state.pending_writes), &le);
- }
- }
- FUNC_EXIT;
- }
- static void MQTTClient_writeComplete(int socket, int rc)
- {
- ListElement* found = NULL;
- FUNC_ENTRY;
- /* a partial write is now complete for a socket - this will be on a publish*/
- MQTTProtocol_checkPendingWrites();
- /* find the client using this socket */
- if ((found = ListFindItem(handles, &socket, clientSockCompare)) != NULL)
- {
- MQTTClients* m = (MQTTClients*)(found->content);
- time(&(m->c->net.lastSent));
- }
- FUNC_EXIT;
- }
|