MQTTClient.c 74 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
7277827792780278127822783
  1. /*******************************************************************************
  2. * Copyright (c) 2009, 2019 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * http://www.eclipse.org/legal/epl-v10.html
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial API and implementation and/or initial documentation
  15. * Ian Craggs - bug 384016 - segv setting will message
  16. * Ian Craggs - bug 384053 - v1.0.0.7 - stop MQTTClient_receive on socket error
  17. * Ian Craggs, Allan Stockdill-Mander - add ability to connect with SSL
  18. * Ian Craggs - multiple server connection support
  19. * Ian Craggs - fix for bug 413429 - connectionLost not called
  20. * Ian Craggs - fix for bug 421103 - trying to write to same socket, in publish/retries
  21. * Ian Craggs - fix for bug 419233 - mutexes not reporting errors
  22. * Ian Craggs - fix for bug 420851
  23. * Ian Craggs - fix for bug 432903 - queue persistence
  24. * Ian Craggs - MQTT 3.1.1 support
  25. * Ian Craggs - fix for bug 438176 - MQTT version selection
  26. * Rong Xiang, Ian Craggs - C++ compatibility
  27. * Ian Craggs - fix for bug 443724 - stack corruption
  28. * Ian Craggs - fix for bug 447672 - simultaneous access to socket structure
  29. * Ian Craggs - fix for bug 459791 - deadlock in WaitForCompletion for bad client
  30. * Ian Craggs - fix for bug 474905 - insufficient synchronization for subscribe, unsubscribe, connect
  31. * Ian Craggs - make it clear that yield and receive are not intended for multi-threaded mode (bug 474748)
  32. * Ian Craggs - SNI support, message queue unpersist bug
  33. * Ian Craggs - binary will message support
  34. * Ian Craggs - waitforCompletion fix #240
  35. * Ian Craggs - check for NULL SSL options #334
  36. * Ian Craggs - allocate username/password buffers #431
  37. * Ian Craggs - MQTT 5.0 support
  38. *******************************************************************************/
  39. /**
  40. * @file
  41. * \brief Synchronous API implementation
  42. *
  43. */
  44. #define _GNU_SOURCE /* for pthread_mutexattr_settype */
  #include <stdlib.h>
  #include <string.h>
  #if !defined(WIN32) && !defined(WIN64)
  #include <sys/time.h>
  #include <time.h>
  #endif
  #include "MQTTClient.h"
  #if !defined(NO_PERSISTENCE)
  #include "MQTTPersistence.h"
  #endif
  #include "utf-8.h"
  #include "MQTTProtocol.h"
  #include "MQTTProtocolOut.h"
  #include "Thread.h"
  #include "SocketBuffer.h"
  #include "StackTrace.h"
  #include "Heap.h"
  #if defined(OPENSSL)
  #include <openssl/ssl.h>
  #else
  #define URI_SSL "ssl://"
  #endif
  #include "OsWrapper.h"
  #define URI_TCP "tcp://"
  #define URI_WS "ws://"
  #define URI_WSS "wss://"
  #include "VersionInfo.h"
  #include "WebSocket.h"
  /* Eye-catcher strings that embed the build timestamp and client version in
   * the binary. BUILD_TIMESTAMP and CLIENT_VERSION are not defined in this
   * file - presumably they come from VersionInfo.h (TODO confirm). */
  const char *client_timestamp_eye = "MQTTClientV3_Timestamp " BUILD_TIMESTAMP;
  const char *client_version_eye = "MQTTClientV3_Version " CLIENT_VERSION;
  74. void MQTTClient_global_init(MQTTClient_init_options* inits)
  75. {
  76. #if defined(OPENSSL)
  77. SSLSocket_handleOpensslInit(inits->do_openssl_init);
  78. #endif
  79. }
  /* Library-wide client state: the version string plus the client list, which
   * stays NULL until MQTTClient_createWithOptions() creates it. */
  static ClientStates ClientState =
  {
      CLIENT_VERSION, /* version */
      NULL /* client list */
  };

  /* Externally visible pointer to the state above. */
  ClientStates* bstate = &ClientState;

  /* Not referenced in the visible part of this file; presumably used by the
   * protocol modules (TODO confirm). */
  MQTTProtocol state;
  87. #if defined(WIN32) || defined(WIN64)
  /* Windows build: the library mutexes start out NULL and are created in
   * DllMain() on process attach. */
  static mutex_type mqttclient_mutex = NULL;
  static mutex_type socket_mutex = NULL;
  static mutex_type subscribe_mutex = NULL;
  static mutex_type unsubscribe_mutex = NULL;
  static mutex_type connect_mutex = NULL;
  /* Defined in other translation units; also created in DllMain(). */
  extern mutex_type stack_mutex;
  extern mutex_type heap_mutex;
  extern mutex_type log_mutex;
  96. BOOL APIENTRY DllMain(HANDLE hModule,
  97. DWORD ul_reason_for_call,
  98. LPVOID lpReserved)
  99. {
  100. switch (ul_reason_for_call)
  101. {
  102. case DLL_PROCESS_ATTACH:
  103. Log(TRACE_MAX, -1, "DLL process attach");
  104. if (mqttclient_mutex == NULL)
  105. {
  106. mqttclient_mutex = CreateMutex(NULL, 0, NULL);
  107. subscribe_mutex = CreateMutex(NULL, 0, NULL);
  108. unsubscribe_mutex = CreateMutex(NULL, 0, NULL);
  109. connect_mutex = CreateMutex(NULL, 0, NULL);
  110. stack_mutex = CreateMutex(NULL, 0, NULL);
  111. heap_mutex = CreateMutex(NULL, 0, NULL);
  112. log_mutex = CreateMutex(NULL, 0, NULL);
  113. socket_mutex = CreateMutex(NULL, 0, NULL);
  114. }
  115. case DLL_THREAD_ATTACH:
  116. Log(TRACE_MAX, -1, "DLL thread attach");
  117. case DLL_THREAD_DETACH:
  118. Log(TRACE_MAX, -1, "DLL thread detach");
  119. case DLL_PROCESS_DETACH:
  120. Log(TRACE_MAX, -1, "DLL process detach");
  121. }
  122. return TRUE;
  123. }
  124. #else
  /* Non-Windows build: each library mutex is a statically initialized pthread
   * mutex, accessed through a mutex_type pointer to its storage.
   * MQTTClient_init() re-initializes all five with an attribute object. */
  static pthread_mutex_t mqttclient_mutex_store = PTHREAD_MUTEX_INITIALIZER;
  static mutex_type mqttclient_mutex = &mqttclient_mutex_store;
  static pthread_mutex_t socket_mutex_store = PTHREAD_MUTEX_INITIALIZER;
  static mutex_type socket_mutex = &socket_mutex_store;
  static pthread_mutex_t subscribe_mutex_store = PTHREAD_MUTEX_INITIALIZER;
  static mutex_type subscribe_mutex = &subscribe_mutex_store;
  static pthread_mutex_t unsubscribe_mutex_store = PTHREAD_MUTEX_INITIALIZER;
  static mutex_type unsubscribe_mutex = &unsubscribe_mutex_store;
  static pthread_mutex_t connect_mutex_store = PTHREAD_MUTEX_INITIALIZER;
  static mutex_type connect_mutex = &connect_mutex_store;
  135. void MQTTClient_init(void)
  136. {
  137. pthread_mutexattr_t attr;
  138. int rc;
  139. pthread_mutexattr_init(&attr);
  140. #if !defined(_WRS_KERNEL)
  141. pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
  142. #else
  143. /* #warning "no pthread_mutexattr_settype" */
  144. #endif /* !defined(_WRS_KERNEL) */
  145. if ((rc = pthread_mutex_init(mqttclient_mutex, &attr)) != 0)
  146. printf("MQTTClient: error %d initializing client_mutex\n", rc);
  147. if ((rc = pthread_mutex_init(socket_mutex, &attr)) != 0)
  148. printf("MQTTClient: error %d initializing socket_mutex\n", rc);
  149. if ((rc = pthread_mutex_init(subscribe_mutex, &attr)) != 0)
  150. printf("MQTTClient: error %d initializing subscribe_mutex\n", rc);
  151. if ((rc = pthread_mutex_init(unsubscribe_mutex, &attr)) != 0)
  152. printf("MQTTClient: error %d initializing unsubscribe_mutex\n", rc);
  153. if ((rc = pthread_mutex_init(connect_mutex, &attr)) != 0)
  154. printf("MQTTClient: error %d initializing connect_mutex\n", rc);
  155. }
  156. #define WINAPI
  157. #endif
  /* Set to 1 once MQTTClient_createWithOptions() has done the one-time library
   * setup; reset to 0 by MQTTClient_terminate(). */
  static volatile int library_initialized = 0;
  /* List of MQTTClients handles: created during the one-time setup, freed and
   * reset to NULL in MQTTClient_terminate(). */
  static List* handles = NULL;
  /* Set to 1 on entry to MQTTClient_run(). */
  static int running = 0;
  /* MQTTClient_run() keeps looping while this is 0. */
  static int tostop = 0;
  /* Thread id recorded by MQTTClient_run() when it starts. */
  static thread_id_type run_id = 0;
  /* One entry of a client's messageQueue: a received message together with the
   * topic it arrived on. MQTTClient_deliverMessage() casts this to
   * MQTTPersistence_qEntry*, so the layout presumably has to stay in step with
   * that type (TODO confirm). */
  typedef struct
  {
      MQTTClient_message* msg; /* the queued message; freed together with its payload */
      char* topicName;         /* heap-allocated topic name */
      int topicLen;            /* topic length; compared with strlen(topicName) on delivery */
      unsigned int seqno; /* only used on restore */
  } qEntry;
  /* Per-handle client state. MQTTClient_createWithOptions() allocates one of
   * these and stores its address in the caller's MQTTClient handle. */
  typedef struct
  {
      char* serverURI; /* copy of the server address with any recognised scheme prefix removed */
  #if defined(OPENSSL)
      int ssl; /* set to 1 for ssl:// and wss:// URIs */
  #endif
      int websocket; /* set to 1 for ws:// and wss:// URIs */
      Clients* c; /* protocol-level client structure, allocated at create time */
      MQTTClient_connectionLost* cl; /* connection lost callback, invoked by connectionLost_call() */
      MQTTClient_messageArrived* ma; /* presumably the message arrived callback - not used in the visible code */
      MQTTClient_deliveryComplete* dc; /* presumably the delivery complete callback - not used in the visible code */
      void* context; /* passed as the first argument to the cl callback */
      MQTTClient_disconnected* disconnected; /* set by MQTTClient_setDisconnected() */
      void* disconnected_context; /* the context to be associated with the disconnected callback*/
      MQTTClient_published* published; /* set by MQTTClient_setPublished() */
      void* published_context; /* the context to be associated with the published callback */
  #if 0
      MQTTClient_authHandle* auth_handle;
      void* auth_handle_context; /* the context to be associated with the authHandle callback*/
  #endif
      sem_type connect_sem; /* the four semaphores are created in MQTTClient_createWithOptions() */
      int rc; /* getsockopt return code in connect */
      sem_type connack_sem;
      sem_type suback_sem;
      sem_type unsuback_sem;
      MQTTPacket* pack;
  } MQTTClients;
  /* Argument bundle handed to call_disconnected(), which invokes the client's
   * disconnected callback and then frees both the properties and this struct. */
  struct props_rc_parms
  {
      MQTTClients* m;                  /* client whose callback is to be invoked */
      MQTTProperties* properties;      /* heap-allocated properties passed to the callback */
      enum MQTTReasonCodes reasonCode; /* reason code passed to the callback */
  };
  203. void MQTTClient_sleep(long milliseconds)
  204. {
  205. FUNC_ENTRY;
  206. #if defined(WIN32) || defined(WIN64)
  207. Sleep(milliseconds);
  208. #else
  209. usleep(milliseconds*1000);
  210. #endif
  211. FUNC_EXIT;
  212. }
  213. #if defined(WIN32) || defined(WIN64)
  214. #define START_TIME_TYPE DWORD
  215. START_TIME_TYPE MQTTClient_start_clock(void)
  216. {
  217. return GetTickCount();
  218. }
  219. #elif defined(AIX)
  220. #define START_TIME_TYPE struct timespec
  221. START_TIME_TYPE MQTTClient_start_clock(void)
  222. {
  223. static struct timespec start;
  224. clock_gettime(CLOCK_MONOTONIC, &start);
  225. return start;
  226. }
  227. #else
  228. #define START_TIME_TYPE struct timeval
  229. START_TIME_TYPE MQTTClient_start_clock(void)
  230. {
  231. static struct timeval start;
  232. static struct timespec start_ts;
  233. clock_gettime(CLOCK_MONOTONIC, &start_ts);
  234. start.tv_sec = start_ts.tv_sec;
  235. start.tv_usec = start_ts.tv_nsec / 1000;
  236. return start;
  237. }
  238. #endif
  239. #if defined(WIN32) || defined(WIN64)
  240. long MQTTClient_elapsed(DWORD milliseconds)
  241. {
  242. return GetTickCount() - milliseconds;
  243. }
  244. #elif defined(AIX)
  245. #define assert(a)
  246. long MQTTClient_elapsed(struct timespec start)
  247. {
  248. struct timespec now, res;
  249. clock_gettime(CLOCK_MONOTONIC, &now);
  250. ntimersub(now, start, res);
  251. return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
  252. }
  253. #else
  254. long MQTTClient_elapsed(struct timeval start)
  255. {
  256. struct timeval now, res;
  257. static struct timespec now_ts;
  258. clock_gettime(CLOCK_MONOTONIC, &now_ts);
  259. now.tv_sec = now_ts.tv_sec;
  260. now.tv_usec = now_ts.tv_nsec / 1000;
  261. timersub(&now, &start, &res);
  262. return (res.tv_sec)*1000 + (res.tv_usec)/1000;
  263. }
  264. #endif
  /* Forward declarations of the file-local helpers, so that the definitions
   * below can appear in any order. */
  static void MQTTClient_terminate(void);
  static void MQTTClient_emptyMessageQueue(Clients* client);
  static int MQTTClient_deliverMessage(
          int rc, MQTTClients* m,
          char** topicName, int* topicLen,
          MQTTClient_message** message);
  static int clientSockCompare(void* a, void* b);
  static thread_return_type WINAPI connectionLost_call(void* context);
  static thread_return_type WINAPI MQTTClient_run(void* n);
  static void MQTTClient_stop(void);
  static void MQTTClient_closeSession(Clients* client, enum MQTTReasonCodes reason, MQTTProperties* props);
  static int MQTTClient_cleanSession(Clients* client);
  static MQTTResponse MQTTClient_connectURIVersion(
          MQTTClient handle, MQTTClient_connectOptions* options,
          const char* serverURI, int MQTTVersion,
          START_TIME_TYPE start, long millisecsTimeout,
          MQTTProperties* connectProperties, MQTTProperties* willProperties);
  static MQTTResponse MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI,
          MQTTProperties* connectProperties, MQTTProperties* willProperties);
  static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int internal, int stop, enum MQTTReasonCodes, MQTTProperties*);
  static int MQTTClient_disconnect_internal(MQTTClient handle, int timeout);
  static void MQTTClient_retry(void);
  static MQTTPacket* MQTTClient_cycle(int* sock, unsigned long timeout, int* rc);
  static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, long timeout);
  /*static int pubCompare(void* a, void* b); */
  static void MQTTProtocol_checkPendingWrites(void);
  static void MQTTClient_writeComplete(int socket, int rc);
  292. int MQTTClient_createWithOptions(MQTTClient* handle, const char* serverURI, const char* clientId,
  293. int persistence_type, void* persistence_context, MQTTClient_createOptions* options)
  294. {
  295. int rc = 0;
  296. MQTTClients *m = NULL;
  297. FUNC_ENTRY;
  298. rc = Thread_lock_mutex(mqttclient_mutex);
  299. if (serverURI == NULL || clientId == NULL)
  300. {
  301. rc = MQTTCLIENT_NULL_PARAMETER;
  302. goto exit;
  303. }
  304. if (!UTF8_validateString(clientId))
  305. {
  306. rc = MQTTCLIENT_BAD_UTF8_STRING;
  307. goto exit;
  308. }
  309. if (strstr(serverURI, "://") != NULL)
  310. {
  311. if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) != 0
  312. && strncmp(URI_WS, serverURI, strlen(URI_WS)) != 0
  313. #if defined(OPENSSL)
  314. && strncmp(URI_SSL, serverURI, strlen(URI_SSL)) != 0
  315. && strncmp(URI_WSS, serverURI, strlen(URI_WSS)) != 0
  316. #endif
  317. )
  318. {
  319. rc = MQTTCLIENT_BAD_PROTOCOL;
  320. goto exit;
  321. }
  322. }
  323. if (options && (strncmp(options->struct_id, "MQCO", 4) != 0 || options->struct_version != 0))
  324. {
  325. rc = MQTTCLIENT_BAD_STRUCTURE;
  326. goto exit;
  327. }
  328. if (!library_initialized)
  329. {
  330. #if defined(HEAP_H)
  331. Heap_initialize();
  332. #endif
  333. Log_initialize((Log_nameValue*)MQTTClient_getVersionInfo());
  334. bstate->clients = ListInitialize();
  335. Socket_outInitialize();
  336. Socket_setWriteCompleteCallback(MQTTClient_writeComplete);
  337. handles = ListInitialize();
  338. #if defined(OPENSSL)
  339. SSLSocket_initialize();
  340. #endif
  341. library_initialized = 1;
  342. }
  343. m = malloc(sizeof(MQTTClients));
  344. *handle = m;
  345. memset(m, '\0', sizeof(MQTTClients));
  346. if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
  347. serverURI += strlen(URI_TCP);
  348. else if (strncmp(URI_WS, serverURI, strlen(URI_WS)) == 0)
  349. {
  350. serverURI += strlen(URI_WS);
  351. m->websocket = 1;
  352. }
  353. else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0)
  354. {
  355. #if defined(OPENSSL)
  356. serverURI += strlen(URI_SSL);
  357. m->ssl = 1;
  358. #else
  359. rc = MQTTCLIENT_SSL_NOT_SUPPORTED;
  360. goto exit;
  361. #endif
  362. }
  363. else if (strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
  364. {
  365. #if defined(OPENSSL)
  366. serverURI += strlen(URI_WSS);
  367. m->ssl = 1;
  368. m->websocket = 1;
  369. #else
  370. rc = MQTTCLIENT_SSL_NOT_SUPPORTED;
  371. goto exit;
  372. #endif
  373. }
  374. m->serverURI = MQTTStrdup(serverURI);
  375. ListAppend(handles, m, sizeof(MQTTClients));
  376. m->c = malloc(sizeof(Clients));
  377. memset(m->c, '\0', sizeof(Clients));
  378. m->c->context = m;
  379. m->c->MQTTVersion = (options) ? options->MQTTVersion : MQTTVERSION_DEFAULT;
  380. m->c->outboundMsgs = ListInitialize();
  381. m->c->inboundMsgs = ListInitialize();
  382. m->c->messageQueue = ListInitialize();
  383. m->c->clientID = MQTTStrdup(clientId);
  384. m->connect_sem = Thread_create_sem();
  385. m->connack_sem = Thread_create_sem();
  386. m->suback_sem = Thread_create_sem();
  387. m->unsuback_sem = Thread_create_sem();
  388. #if !defined(NO_PERSISTENCE)
  389. rc = MQTTPersistence_create(&(m->c->persistence), persistence_type, persistence_context);
  390. if (rc == 0)
  391. {
  392. rc = MQTTPersistence_initialize(m->c, m->serverURI);
  393. if (rc == 0)
  394. MQTTPersistence_restoreMessageQueue(m->c);
  395. }
  396. #endif
  397. ListAppend(bstate->clients, m->c, sizeof(Clients) + 3*sizeof(List));
  398. exit:
  399. Thread_unlock_mutex(mqttclient_mutex);
  400. FUNC_EXIT_RC(rc);
  401. return rc;
  402. }
  403. int MQTTClient_create(MQTTClient* handle, const char* serverURI, const char* clientId,
  404. int persistence_type, void* persistence_context)
  405. {
  406. return MQTTClient_createWithOptions(handle, serverURI, clientId, persistence_type,
  407. persistence_context, NULL);
  408. }
  409. static void MQTTClient_terminate(void)
  410. {
  411. FUNC_ENTRY;
  412. MQTTClient_stop();
  413. if (library_initialized)
  414. {
  415. ListFree(bstate->clients);
  416. ListFree(handles);
  417. handles = NULL;
  418. WebSocket_terminate();
  419. #if defined(HEAP_H)
  420. Heap_terminate();
  421. #endif
  422. Log_terminate();
  423. library_initialized = 0;
  424. }
  425. FUNC_EXIT;
  426. }
  427. static void MQTTClient_emptyMessageQueue(Clients* client)
  428. {
  429. FUNC_ENTRY;
  430. /* empty message queue */
  431. if (client->messageQueue->count > 0)
  432. {
  433. ListElement* current = NULL;
  434. while (ListNextElement(client->messageQueue, &current))
  435. {
  436. qEntry* qe = (qEntry*)(current->content);
  437. free(qe->topicName);
  438. MQTTProperties_free(&qe->msg->properties);
  439. free(qe->msg->payload);
  440. free(qe->msg);
  441. }
  442. ListEmpty(client->messageQueue);
  443. }
  444. FUNC_EXIT;
  445. }
  446. void MQTTClient_destroy(MQTTClient* handle)
  447. {
  448. MQTTClients* m = *handle;
  449. FUNC_ENTRY;
  450. Thread_lock_mutex(mqttclient_mutex);
  451. if (m == NULL)
  452. goto exit;
  453. if (m->c)
  454. {
  455. int saved_socket = m->c->net.socket;
  456. char* saved_clientid = MQTTStrdup(m->c->clientID);
  457. #if !defined(NO_PERSISTENCE)
  458. MQTTPersistence_close(m->c);
  459. #endif
  460. MQTTClient_emptyMessageQueue(m->c);
  461. MQTTProtocol_freeClient(m->c);
  462. if (!ListRemove(bstate->clients, m->c))
  463. Log(LOG_ERROR, 0, NULL);
  464. else
  465. Log(TRACE_MIN, 1, NULL, saved_clientid, saved_socket);
  466. free(saved_clientid);
  467. }
  468. if (m->serverURI)
  469. free(m->serverURI);
  470. Thread_destroy_sem(m->connect_sem);
  471. Thread_destroy_sem(m->connack_sem);
  472. Thread_destroy_sem(m->suback_sem);
  473. Thread_destroy_sem(m->unsuback_sem);
  474. if (!ListRemove(handles, m))
  475. Log(LOG_ERROR, -1, "free error");
  476. *handle = NULL;
  477. if (bstate->clients->count == 0)
  478. MQTTClient_terminate();
  479. exit:
  480. Thread_unlock_mutex(mqttclient_mutex);
  481. FUNC_EXIT;
  482. }
  483. void MQTTClient_freeMessage(MQTTClient_message** message)
  484. {
  485. FUNC_ENTRY;
  486. MQTTProperties_free(&(*message)->properties);
  487. free((*message)->payload);
  488. free(*message);
  489. *message = NULL;
  490. FUNC_EXIT;
  491. }
  492. void MQTTClient_free(void* memory)
  493. {
  494. FUNC_ENTRY;
  495. free(memory);
  496. FUNC_EXIT;
  497. }
  498. DLLExport void MQTTResponse_free(MQTTResponse response)
  499. {
  500. FUNC_ENTRY;
  501. if (response.properties)
  502. {
  503. if (response.reasonCodeCount > 0 && response.reasonCodes)
  504. free(response.reasonCodes);
  505. MQTTProperties_free(response.properties);
  506. free(response.properties);
  507. }
  508. FUNC_EXIT;
  509. }
  510. static int MQTTClient_deliverMessage(int rc, MQTTClients* m, char** topicName, int* topicLen, MQTTClient_message** message)
  511. {
  512. qEntry* qe = (qEntry*)(m->c->messageQueue->first->content);
  513. FUNC_ENTRY;
  514. *message = qe->msg;
  515. *topicName = qe->topicName;
  516. *topicLen = qe->topicLen;
  517. if (strlen(*topicName) != *topicLen)
  518. rc = MQTTCLIENT_TOPICNAME_TRUNCATED;
  519. #if !defined(NO_PERSISTENCE)
  520. if (m->c->persistence)
  521. MQTTPersistence_unpersistQueueEntry(m->c, (MQTTPersistence_qEntry*)qe);
  522. #endif
  523. ListRemove(m->c->messageQueue, m->c->messageQueue->first->content);
  524. FUNC_EXIT_RC(rc);
  525. return rc;
  526. }
  527. /**
  528. * List callback function for comparing clients by socket
  529. * @param a first integer value
  530. * @param b second integer value
  531. * @return boolean indicating whether a and b are equal
  532. */
  533. static int clientSockCompare(void* a, void* b)
  534. {
  535. MQTTClients* m = (MQTTClients*)a;
  536. return m->c->net.socket == *(int*)b;
  537. }
  538. /**
  539. * Wrapper function to call connection lost on a separate thread. A separate thread is needed to allow the
  540. * connectionLost function to make API calls (e.g. connect)
  541. * @param context a pointer to the relevant client
  542. * @return thread_return_type standard thread return value - not used here
  543. */
  544. static thread_return_type WINAPI connectionLost_call(void* context)
  545. {
  546. MQTTClients* m = (MQTTClients*)context;
  547. (*(m->cl))(m->context, NULL);
  548. return 0;
  549. }
  550. int MQTTClient_setDisconnected(MQTTClient handle, void* context, MQTTClient_disconnected* disconnected)
  551. {
  552. int rc = MQTTCLIENT_SUCCESS;
  553. MQTTClients* m = handle;
  554. FUNC_ENTRY;
  555. Thread_lock_mutex(mqttclient_mutex);
  556. if (m == NULL || m->c->connect_state != NOT_IN_PROGRESS)
  557. rc = MQTTCLIENT_FAILURE;
  558. else
  559. {
  560. m->disconnected_context = context;
  561. m->disconnected = disconnected;
  562. }
  563. Thread_unlock_mutex(mqttclient_mutex);
  564. FUNC_EXIT_RC(rc);
  565. return rc;
  566. }
  567. /**
  568. * Wrapper function to call disconnected on a separate thread. A separate thread is needed to allow the
  569. * disconnected function to make API calls (e.g. connect)
  570. * @param context a pointer to the relevant client
  571. * @return thread_return_type standard thread return value - not used here
  572. */
  573. static thread_return_type WINAPI call_disconnected(void* context)
  574. {
  575. struct props_rc_parms* pr = (struct props_rc_parms*)context;
  576. (*(pr->m->disconnected))(pr->m->disconnected_context, pr->properties, pr->reasonCode);
  577. MQTTProperties_free(pr->properties);
  578. free(pr->properties);
  579. free(pr);
  580. return 0;
  581. }
  582. int MQTTClient_setPublished(MQTTClient handle, void* context, MQTTClient_published* published)
  583. {
  584. int rc = MQTTCLIENT_SUCCESS;
  585. MQTTClients* m = handle;
  586. FUNC_ENTRY;
  587. Thread_lock_mutex(mqttclient_mutex);
  588. if (m == NULL || m->c->connect_state != NOT_IN_PROGRESS)
  589. rc = MQTTCLIENT_FAILURE;
  590. else
  591. {
  592. m->published_context = context;
  593. m->published = published;
  594. }
  595. Thread_unlock_mutex(mqttclient_mutex);
  596. FUNC_EXIT_RC(rc);
  597. return rc;
  598. }
  599. #if 0
  600. int MQTTClient_setHandleAuth(MQTTClient handle, void* context, MQTTClient_handleAuth* auth_handle)
  601. {
  602. int rc = MQTTCLIENT_SUCCESS;
  603. MQTTClients* m = handle;
  604. FUNC_ENTRY;
  605. Thread_lock_mutex(mqttclient_mutex);
  606. if (m == NULL || m->c->connect_state != NOT_IN_PROGRESS)
  607. rc = MQTTCLIENT_FAILURE;
  608. else
  609. {
  610. m->auth_handle_context = context;
  611. m->auth_handle = auth_handle;
  612. }
  613. Thread_unlock_mutex(mqttclient_mutex);
  614. FUNC_EXIT_RC(rc);
  615. return rc;
  616. }
  617. /**
  618. * Wrapper function to call authHandle on a separate thread. A separate thread is needed to allow the
  619. * disconnected function to make API calls (e.g. MQTTClient_auth)
  620. * @param context a pointer to the relevant client
  621. * @return thread_return_type standard thread return value - not used here
  622. */
  623. static thread_return_type WINAPI call_auth_handle(void* context)
  624. {
  625. struct props_rc_parms* pr = (struct props_rc_parms*)context;
  626. (*(pr->m->auth_handle))(pr->m->auth_handle_context, pr->properties, pr->reasonCode);
  627. MQTTProperties_free(pr->properties);
  628. free(pr->properties);
  629. free(pr);
  630. return 0;
  631. }
  632. #endif
  633. /* This is the thread function that handles the calling of callback functions if set */
  634. static thread_return_type WINAPI MQTTClient_run(void* n)
  635. {
  636. long timeout = 10L; /* first time in we have a small timeout. Gets things started more quickly */
  637. FUNC_ENTRY;
  638. running = 1;
  639. run_id = Thread_getid();
  640. Thread_lock_mutex(mqttclient_mutex);
  641. while (!tostop)
  642. {
  643. int rc = SOCKET_ERROR;
  644. int sock = -1;
  645. MQTTClients* m = NULL;
  646. MQTTPacket* pack = NULL;
  647. Thread_unlock_mutex(mqttclient_mutex);
  648. pack = MQTTClient_cycle(&sock, timeout, &rc);
  649. Thread_lock_mutex(mqttclient_mutex);
  650. if (tostop)
  651. break;
  652. timeout = 1000L;
  653. /* find client corresponding to socket */
  654. if (ListFindItem(handles, &sock, clientSockCompare) == NULL)
  655. {
  656. /* assert: should not happen */
  657. continue;
  658. }
  659. m = (MQTTClient)(handles->current->content);
  660. if (m == NULL)
  661. {
  662. /* assert: should not happen */
  663. continue;
  664. }
  665. if (rc == SOCKET_ERROR)
  666. {
  667. if (m->c->connected)
  668. MQTTClient_disconnect_internal(m, 0);
  669. else
  670. {
  671. if (m->c->connect_state == SSL_IN_PROGRESS)
  672. {
  673. Log(TRACE_MIN, -1, "Posting connect semaphore for client %s", m->c->clientID);
  674. m->c->connect_state = NOT_IN_PROGRESS;
  675. Thread_post_sem(m->connect_sem);
  676. }
  677. if (m->c->connect_state == WAIT_FOR_CONNACK)
  678. {
  679. Log(TRACE_MIN, -1, "Posting connack semaphore for client %s", m->c->clientID);
  680. m->c->connect_state = NOT_IN_PROGRESS;
  681. Thread_post_sem(m->connack_sem);
  682. }
  683. }
  684. }
  685. else
  686. {
  687. if (m->c->messageQueue->count > 0)
  688. {
  689. qEntry* qe = (qEntry*)(m->c->messageQueue->first->content);
  690. int topicLen = qe->topicLen;
  691. if (strlen(qe->topicName) == topicLen)
  692. topicLen = 0;
  693. Log(TRACE_MIN, -1, "Calling messageArrived for client %s, queue depth %d",
  694. m->c->clientID, m->c->messageQueue->count);
  695. Thread_unlock_mutex(mqttclient_mutex);
  696. rc = (*(m->ma))(m->context, qe->topicName, topicLen, qe->msg);
  697. Thread_lock_mutex(mqttclient_mutex);
  698. /* if 0 (false) is returned by the callback then it failed, so we don't remove the message from
  699. * the queue, and it will be retried later. If 1 is returned then the message data may have been freed,
  700. * so we must be careful how we use it.
  701. */
  702. if (rc)
  703. {
  704. #if !defined(NO_PERSISTENCE)
  705. if (m->c->persistence)
  706. MQTTPersistence_unpersistQueueEntry(m->c, (MQTTPersistence_qEntry*)qe);
  707. #endif
  708. ListRemove(m->c->messageQueue, qe);
  709. }
  710. else
  711. Log(TRACE_MIN, -1, "False returned from messageArrived for client %s, message remains on queue",
  712. m->c->clientID);
  713. }
  714. if (pack)
  715. {
  716. if (pack->header.bits.type == CONNACK)
  717. {
  718. Log(TRACE_MIN, -1, "Posting connack semaphore for client %s", m->c->clientID);
  719. m->pack = pack;
  720. Thread_post_sem(m->connack_sem);
  721. }
  722. else if (pack->header.bits.type == SUBACK)
  723. {
  724. Log(TRACE_MIN, -1, "Posting suback semaphore for client %s", m->c->clientID);
  725. m->pack = pack;
  726. Thread_post_sem(m->suback_sem);
  727. }
  728. else if (pack->header.bits.type == UNSUBACK)
  729. {
  730. Log(TRACE_MIN, -1, "Posting unsuback semaphore for client %s", m->c->clientID);
  731. m->pack = pack;
  732. Thread_post_sem(m->unsuback_sem);
  733. }
  734. else if (m->c->MQTTVersion >= MQTTVERSION_5)
  735. {
  736. if (pack->header.bits.type == DISCONNECT && m->disconnected)
  737. {
  738. struct props_rc_parms* dp;
  739. Ack* disc = (Ack*)pack;
  740. dp = malloc(sizeof(struct props_rc_parms));
  741. dp->m = m;
  742. dp->reasonCode = disc->rc;
  743. dp->properties = malloc(sizeof(MQTTProperties));
  744. *(dp->properties) = disc->properties;
  745. free(disc);
  746. MQTTClient_disconnect1(m, 10, 0, 1, MQTTREASONCODE_SUCCESS, NULL);
  747. Log(TRACE_MIN, -1, "Calling disconnected for client %s", m->c->clientID);
  748. Thread_start(call_disconnected, dp);
  749. }
  750. #if 0
  751. if (pack->header.bits.type == AUTH && m->auth_handle)
  752. {
  753. struct props_rc_parms dp;
  754. Ack* disc = (Ack*)pack;
  755. dp.m = m;
  756. dp.properties = &disc->properties;
  757. dp.reasonCode = disc->rc;
  758. free(pack);
  759. Log(TRACE_MIN, -1, "Calling auth_handle for client %s", m->c->clientID);
  760. Thread_start(call_auth_handle, &dp);
  761. }
  762. #endif
  763. }
  764. }
  765. else if (m->c->connect_state == TCP_IN_PROGRESS)
  766. {
  767. int error;
  768. socklen_t len = sizeof(error);
  769. if ((m->rc = getsockopt(m->c->net.socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len)) == 0)
  770. m->rc = error;
  771. Log(TRACE_MIN, -1, "Posting connect semaphore for client %s rc %d", m->c->clientID, m->rc);
  772. m->c->connect_state = NOT_IN_PROGRESS;
  773. Thread_post_sem(m->connect_sem);
  774. }
  775. #if defined(OPENSSL)
  776. else if (m->c->connect_state == SSL_IN_PROGRESS)
  777. {
  778. rc = m->c->sslopts->struct_version >= 3 ?
  779. SSLSocket_connect(m->c->net.ssl, m->c->net.socket, m->serverURI,
  780. m->c->sslopts->verify, m->c->sslopts->ssl_error_cb, m->c->sslopts->ssl_error_context) :
  781. SSLSocket_connect(m->c->net.ssl, m->c->net.socket, m->serverURI,
  782. m->c->sslopts->verify, NULL, NULL);
  783. if (rc == 1 || rc == SSL_FATAL)
  784. {
  785. if (rc == 1 && (m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
  786. m->c->session = SSL_get1_session(m->c->net.ssl);
  787. m->rc = rc;
  788. Log(TRACE_MIN, -1, "Posting connect semaphore for SSL client %s rc %d", m->c->clientID, m->rc);
  789. m->c->connect_state = NOT_IN_PROGRESS;
  790. Thread_post_sem(m->connect_sem);
  791. }
  792. }
  793. #endif
  794. else if (m->c->connect_state == WEBSOCKET_IN_PROGRESS)
  795. {
  796. Log(TRACE_MIN, -1, "Posting websocket handshake for client %s rc %d", m->c->clientID, m->rc);
  797. m->c->connect_state = WAIT_FOR_CONNACK;
  798. Thread_post_sem(m->connect_sem);
  799. }
  800. }
  801. }
  802. run_id = 0;
  803. running = tostop = 0;
  804. Thread_unlock_mutex(mqttclient_mutex);
  805. FUNC_EXIT;
  806. return 0;
  807. }
  808. static void MQTTClient_stop(void)
  809. {
  810. int rc = 0;
  811. FUNC_ENTRY;
  812. if (running == 1 && tostop == 0)
  813. {
  814. int conn_count = 0;
  815. ListElement* current = NULL;
  816. if (handles != NULL)
  817. {
  818. /* find out how many handles are still connected */
  819. while (ListNextElement(handles, &current))
  820. {
  821. if (((MQTTClients*)(current->content))->c->connect_state > NOT_IN_PROGRESS ||
  822. ((MQTTClients*)(current->content))->c->connected)
  823. ++conn_count;
  824. }
  825. }
  826. Log(TRACE_MIN, -1, "Conn_count is %d", conn_count);
  827. /* stop the background thread, if we are the last one to be using it */
  828. if (conn_count == 0)
  829. {
  830. int count = 0;
  831. tostop = 1;
  832. if (Thread_getid() != run_id)
  833. {
  834. while (running && ++count < 100)
  835. {
  836. Thread_unlock_mutex(mqttclient_mutex);
  837. Log(TRACE_MIN, -1, "sleeping");
  838. MQTTClient_sleep(100L);
  839. Thread_lock_mutex(mqttclient_mutex);
  840. }
  841. }
  842. rc = 1;
  843. }
  844. }
  845. FUNC_EXIT_RC(rc);
  846. }
  847. int MQTTClient_setCallbacks(MQTTClient handle, void* context, MQTTClient_connectionLost* cl,
  848. MQTTClient_messageArrived* ma, MQTTClient_deliveryComplete* dc)
  849. {
  850. int rc = MQTTCLIENT_SUCCESS;
  851. MQTTClients* m = handle;
  852. FUNC_ENTRY;
  853. Thread_lock_mutex(mqttclient_mutex);
  854. if (m == NULL || ma == NULL || m->c->connect_state != NOT_IN_PROGRESS)
  855. rc = MQTTCLIENT_FAILURE;
  856. else
  857. {
  858. m->context = context;
  859. m->cl = cl;
  860. m->ma = ma;
  861. m->dc = dc;
  862. }
  863. Thread_unlock_mutex(mqttclient_mutex);
  864. FUNC_EXIT_RC(rc);
  865. return rc;
  866. }
  867. static void MQTTClient_closeSession(Clients* client, enum MQTTReasonCodes reason, MQTTProperties* props)
  868. {
  869. FUNC_ENTRY;
  870. client->good = 0;
  871. client->ping_outstanding = 0;
  872. if (client->net.socket > 0)
  873. {
  874. if (client->connected)
  875. MQTTPacket_send_disconnect(client, reason, props);
  876. Thread_lock_mutex(socket_mutex);
  877. WebSocket_close(&client->net, WebSocket_CLOSE_NORMAL, NULL);
  878. #if defined(OPENSSL)
  879. SSLSocket_close(&client->net);
  880. #endif
  881. Socket_close(client->net.socket);
  882. Thread_unlock_mutex(socket_mutex);
  883. client->net.socket = 0;
  884. #if defined(OPENSSL)
  885. client->net.ssl = NULL;
  886. #endif
  887. }
  888. client->connected = 0;
  889. client->connect_state = NOT_IN_PROGRESS;
  890. if (client->MQTTVersion < MQTTVERSION_5 && client->cleansession)
  891. MQTTClient_cleanSession(client);
  892. FUNC_EXIT;
  893. }
  894. static int MQTTClient_cleanSession(Clients* client)
  895. {
  896. int rc = 0;
  897. FUNC_ENTRY;
  898. #if !defined(NO_PERSISTENCE)
  899. rc = MQTTPersistence_clear(client);
  900. #endif
  901. MQTTProtocol_emptyMessageList(client->inboundMsgs);
  902. MQTTProtocol_emptyMessageList(client->outboundMsgs);
  903. MQTTClient_emptyMessageQueue(client);
  904. client->msgID = 0;
  905. FUNC_EXIT_RC(rc);
  906. return rc;
  907. }
  908. void Protocol_processPublication(Publish* publish, Clients* client)
  909. {
  910. qEntry* qe = NULL;
  911. MQTTClient_message* mm = NULL;
  912. MQTTClient_message initialized = MQTTClient_message_initializer;
  913. FUNC_ENTRY;
  914. qe = malloc(sizeof(qEntry));
  915. mm = malloc(sizeof(MQTTClient_message));
  916. memcpy(mm, &initialized, sizeof(MQTTClient_message));
  917. qe->msg = mm;
  918. qe->topicName = publish->topic;
  919. qe->topicLen = publish->topiclen;
  920. publish->topic = NULL;
  921. /* If the message is QoS 2, then we have already stored the incoming payload
  922. * in an allocated buffer, so we don't need to copy again.
  923. */
  924. if (publish->header.bits.qos == 2)
  925. mm->payload = publish->payload;
  926. else
  927. {
  928. mm->payload = malloc(publish->payloadlen);
  929. memcpy(mm->payload, publish->payload, publish->payloadlen);
  930. }
  931. mm->payloadlen = publish->payloadlen;
  932. mm->qos = publish->header.bits.qos;
  933. mm->retained = publish->header.bits.retain;
  934. if (publish->header.bits.qos == 2)
  935. mm->dup = 0; /* ensure that a QoS2 message is not passed to the application with dup = 1 */
  936. else
  937. mm->dup = publish->header.bits.dup;
  938. mm->msgid = publish->msgId;
  939. if (publish->MQTTVersion >= 5)
  940. mm->properties = MQTTProperties_copy(&publish->properties);
  941. ListAppend(client->messageQueue, qe, sizeof(qe) + sizeof(mm) + mm->payloadlen + strlen(qe->topicName)+1);
  942. #if !defined(NO_PERSISTENCE)
  943. if (client->persistence)
  944. MQTTPersistence_persistQueueEntry(client, (MQTTPersistence_qEntry*)qe);
  945. #endif
  946. FUNC_EXIT;
  947. }
  948. static MQTTResponse MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI, int MQTTVersion,
  949. START_TIME_TYPE start, long millisecsTimeout, MQTTProperties* connectProperties, MQTTProperties* willProperties)
  950. {
  951. MQTTClients* m = handle;
  952. int rc = SOCKET_ERROR;
  953. int sessionPresent = 0;
  954. MQTTResponse resp = MQTTResponse_initializer;
  955. FUNC_ENTRY;
  956. resp.reasonCode = SOCKET_ERROR;
  957. if (m->ma && !running)
  958. {
  959. Thread_start(MQTTClient_run, handle);
  960. if (MQTTClient_elapsed(start) >= millisecsTimeout)
  961. {
  962. rc = SOCKET_ERROR;
  963. goto exit;
  964. }
  965. MQTTClient_sleep(100L);
  966. }
  967. Log(TRACE_MIN, -1, "Connecting to serverURI %s with MQTT version %d", serverURI, MQTTVersion);
  968. #if defined(OPENSSL)
  969. rc = MQTTProtocol_connect(serverURI, m->c, m->ssl, m->websocket, MQTTVersion, connectProperties, willProperties);
  970. #else
  971. rc = MQTTProtocol_connect(serverURI, m->c, m->websocket, MQTTVersion, connectProperties, willProperties);
  972. #endif
  973. if (rc == SOCKET_ERROR)
  974. goto exit;
  975. if (m->c->connect_state == NOT_IN_PROGRESS)
  976. {
  977. rc = SOCKET_ERROR;
  978. goto exit;
  979. }
  980. if (m->c->connect_state == TCP_IN_PROGRESS) /* TCP connect started - wait for completion */
  981. {
  982. Thread_unlock_mutex(mqttclient_mutex);
  983. MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTClient_elapsed(start));
  984. Thread_lock_mutex(mqttclient_mutex);
  985. if (rc != 0)
  986. {
  987. rc = SOCKET_ERROR;
  988. goto exit;
  989. }
  990. #if defined(OPENSSL)
  991. if (m->ssl)
  992. {
  993. int port;
  994. size_t hostname_len;
  995. const char *topic;
  996. int setSocketForSSLrc = 0;
  997. hostname_len = MQTTProtocol_addressPort(m->serverURI, &port, &topic);
  998. setSocketForSSLrc = SSLSocket_setSocketForSSL(&m->c->net, m->c->sslopts,
  999. m->serverURI, hostname_len);
  1000. if (setSocketForSSLrc != MQTTCLIENT_SUCCESS)
  1001. {
  1002. if (m->c->session != NULL)
  1003. if ((rc = SSL_set_session(m->c->net.ssl, m->c->session)) != 1)
  1004. Log(TRACE_MIN, -1, "Failed to set SSL session with stored data, non critical");
  1005. rc = m->c->sslopts->struct_version >= 3 ?
  1006. SSLSocket_connect(m->c->net.ssl, m->c->net.socket, m->serverURI,
  1007. m->c->sslopts->verify, m->c->sslopts->ssl_error_cb, m->c->sslopts->ssl_error_context) :
  1008. SSLSocket_connect(m->c->net.ssl, m->c->net.socket, m->serverURI,
  1009. m->c->sslopts->verify, NULL, NULL);
  1010. if (rc == TCPSOCKET_INTERRUPTED)
  1011. m->c->connect_state = SSL_IN_PROGRESS; /* the connect is still in progress */
  1012. else if (rc == SSL_FATAL)
  1013. {
  1014. rc = SOCKET_ERROR;
  1015. goto exit;
  1016. }
  1017. else if (rc == 1)
  1018. {
  1019. if (m->websocket)
  1020. {
  1021. m->c->connect_state = WEBSOCKET_IN_PROGRESS;
  1022. rc = WebSocket_connect(&m->c->net,m->serverURI);
  1023. if ( rc == SOCKET_ERROR )
  1024. goto exit;
  1025. }
  1026. else
  1027. {
  1028. rc = MQTTCLIENT_SUCCESS;
  1029. m->c->connect_state = WAIT_FOR_CONNACK;
  1030. if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
  1031. {
  1032. rc = SOCKET_ERROR;
  1033. goto exit;
  1034. }
  1035. if ((m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
  1036. m->c->session = SSL_get1_session(m->c->net.ssl);
  1037. }
  1038. }
  1039. }
  1040. else
  1041. {
  1042. rc = SOCKET_ERROR;
  1043. goto exit;
  1044. }
  1045. }
  1046. #endif
  1047. else if (m->websocket)
  1048. {
  1049. m->c->connect_state = WEBSOCKET_IN_PROGRESS;
  1050. if ( WebSocket_connect(&m->c->net, m->serverURI) == SOCKET_ERROR )
  1051. {
  1052. rc = SOCKET_ERROR;
  1053. goto exit;
  1054. }
  1055. }
  1056. else
  1057. {
  1058. m->c->connect_state = WAIT_FOR_CONNACK; /* TCP connect completed, in which case send the MQTT connect packet */
  1059. if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
  1060. {
  1061. rc = SOCKET_ERROR;
  1062. goto exit;
  1063. }
  1064. }
  1065. }
  1066. #if defined(OPENSSL)
  1067. if (m->c->connect_state == SSL_IN_PROGRESS) /* SSL connect sent - wait for completion */
  1068. {
  1069. Thread_unlock_mutex(mqttclient_mutex);
  1070. MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTClient_elapsed(start));
  1071. Thread_lock_mutex(mqttclient_mutex);
  1072. if (rc != 1)
  1073. {
  1074. rc = SOCKET_ERROR;
  1075. goto exit;
  1076. }
  1077. if((m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
  1078. m->c->session = SSL_get1_session(m->c->net.ssl);
  1079. if ( m->websocket )
  1080. {
  1081. /* wait for websocket connect */
  1082. m->c->connect_state = WEBSOCKET_IN_PROGRESS;
  1083. rc = WebSocket_connect( &m->c->net, m->serverURI );
  1084. if ( rc != 1 )
  1085. {
  1086. rc = SOCKET_ERROR;
  1087. goto exit;
  1088. }
  1089. }
  1090. else
  1091. {
  1092. m->c->connect_state = WAIT_FOR_CONNACK; /* TCP connect completed, in which case send the MQTT connect packet */
  1093. if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
  1094. {
  1095. rc = SOCKET_ERROR;
  1096. goto exit;
  1097. }
  1098. }
  1099. }
  1100. #endif
  1101. if (m->c->connect_state == WEBSOCKET_IN_PROGRESS) /* websocket request sent - wait for upgrade */
  1102. {
  1103. Thread_unlock_mutex(mqttclient_mutex);
  1104. MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTClient_elapsed(start));
  1105. Thread_lock_mutex(mqttclient_mutex);
  1106. m->c->connect_state = WAIT_FOR_CONNACK; /* websocket upgrade complete */
  1107. if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
  1108. {
  1109. rc = SOCKET_ERROR;
  1110. goto exit;
  1111. }
  1112. }
  1113. if (m->c->connect_state == WAIT_FOR_CONNACK) /* MQTT connect sent - wait for CONNACK */
  1114. {
  1115. MQTTPacket* pack = NULL;
  1116. Thread_unlock_mutex(mqttclient_mutex);
  1117. pack = MQTTClient_waitfor(handle, CONNACK, &rc, millisecsTimeout - MQTTClient_elapsed(start));
  1118. Thread_lock_mutex(mqttclient_mutex);
  1119. if (pack == NULL)
  1120. rc = SOCKET_ERROR;
  1121. else
  1122. {
  1123. Connack* connack = (Connack*)pack;
  1124. Log(TRACE_PROTOCOL, 1, NULL, m->c->net.socket, m->c->clientID, connack->rc);
  1125. if ((rc = connack->rc) == MQTTCLIENT_SUCCESS)
  1126. {
  1127. m->c->connected = 1;
  1128. m->c->good = 1;
  1129. m->c->connect_state = NOT_IN_PROGRESS;
  1130. if (MQTTVersion == 4)
  1131. sessionPresent = connack->flags.bits.sessionPresent;
  1132. if (m->c->cleansession || m->c->cleanstart)
  1133. rc = MQTTClient_cleanSession(m->c);
  1134. if (m->c->outboundMsgs->count > 0)
  1135. {
  1136. ListElement* outcurrent = NULL;
  1137. while (ListNextElement(m->c->outboundMsgs, &outcurrent))
  1138. {
  1139. Messages* m = (Messages*)(outcurrent->content);
  1140. m->lastTouch = 0;
  1141. }
  1142. MQTTProtocol_retry((time_t)0, 1, 1);
  1143. if (m->c->connected != 1)
  1144. rc = MQTTCLIENT_DISCONNECTED;
  1145. }
  1146. if (m->c->MQTTVersion == MQTTVERSION_5)
  1147. {
  1148. resp.properties = malloc(sizeof(MQTTProperties));
  1149. *resp.properties = MQTTProperties_copy(&connack->properties);
  1150. }
  1151. }
  1152. MQTTPacket_freeConnack(connack);
  1153. m->pack = NULL;
  1154. }
  1155. }
  1156. exit:
  1157. if (rc == MQTTCLIENT_SUCCESS)
  1158. {
  1159. if (options->struct_version >= 4) /* means we have to fill out return values */
  1160. {
  1161. options->returned.serverURI = serverURI;
  1162. options->returned.MQTTVersion = MQTTVersion;
  1163. options->returned.sessionPresent = sessionPresent;
  1164. }
  1165. }
  1166. else
  1167. MQTTClient_disconnect1(handle, 0, 0, (MQTTVersion == 3), MQTTREASONCODE_SUCCESS, NULL); /* don't want to call connection lost */
  1168. resp.reasonCode = rc;
  1169. FUNC_EXIT_RC(resp.reasonCode);
  1170. return resp;
  1171. }
  1172. static int retryLoopInterval = 5;
  1173. static void setRetryLoopInterval(int keepalive)
  1174. {
  1175. int proposed = keepalive / 10;
  1176. if (proposed < 1)
  1177. proposed = 1;
  1178. else if (proposed > 5)
  1179. proposed = 5;
  1180. if (proposed < retryLoopInterval)
  1181. retryLoopInterval = proposed;
  1182. }
  1183. static MQTTResponse MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI,
  1184. MQTTProperties* connectProperties, MQTTProperties* willProperties)
  1185. {
  1186. MQTTClients* m = handle;
  1187. START_TIME_TYPE start;
  1188. long millisecsTimeout = 30000L;
  1189. MQTTResponse rc = MQTTResponse_initializer;
  1190. int MQTTVersion = 0;
  1191. FUNC_ENTRY;
  1192. rc.reasonCode = SOCKET_ERROR;
  1193. millisecsTimeout = options->connectTimeout * 1000;
  1194. start = MQTTClient_start_clock();
  1195. m->c->keepAliveInterval = options->keepAliveInterval;
  1196. m->c->retryInterval = options->retryInterval;
  1197. setRetryLoopInterval(options->keepAliveInterval);
  1198. m->c->MQTTVersion = options->MQTTVersion;
  1199. m->c->cleanstart = m->c->cleansession = 0;
  1200. if (m->c->MQTTVersion >= MQTTVERSION_5)
  1201. m->c->cleanstart = options->cleanstart;
  1202. else
  1203. m->c->cleansession = options->cleansession;
  1204. m->c->maxInflightMessages = (options->reliable) ? 1 : 10;
  1205. if (options->struct_version >= 6)
  1206. {
  1207. if (options->maxInflightMessages > 0)
  1208. m->c->maxInflightMessages = options->maxInflightMessages;
  1209. }
  1210. if (m->c->will)
  1211. {
  1212. free(m->c->will->payload);
  1213. free(m->c->will->topic);
  1214. free(m->c->will);
  1215. m->c->will = NULL;
  1216. }
  1217. if (options->will && (options->will->struct_version == 0 || options->will->struct_version == 1))
  1218. {
  1219. const void* source = NULL;
  1220. m->c->will = malloc(sizeof(willMessages));
  1221. if (options->will->message || (options->will->struct_version == 1 && options->will->payload.data))
  1222. {
  1223. if (options->will->struct_version == 1 && options->will->payload.data)
  1224. {
  1225. m->c->will->payloadlen = options->will->payload.len;
  1226. source = options->will->payload.data;
  1227. }
  1228. else
  1229. {
  1230. m->c->will->payloadlen = (int)strlen(options->will->message);
  1231. source = (void*)options->will->message;
  1232. }
  1233. m->c->will->payload = malloc(m->c->will->payloadlen);
  1234. memcpy(m->c->will->payload, source, m->c->will->payloadlen);
  1235. }
  1236. else
  1237. {
  1238. m->c->will->payload = NULL;
  1239. m->c->will->payloadlen = 0;
  1240. }
  1241. m->c->will->qos = options->will->qos;
  1242. m->c->will->retained = options->will->retained;
  1243. m->c->will->topic = MQTTStrdup(options->will->topicName);
  1244. }
  1245. #if defined(OPENSSL)
  1246. if (m->c->sslopts)
  1247. {
  1248. if (m->c->sslopts->trustStore)
  1249. free((void*)m->c->sslopts->trustStore);
  1250. if (m->c->sslopts->keyStore)
  1251. free((void*)m->c->sslopts->keyStore);
  1252. if (m->c->sslopts->privateKey)
  1253. free((void*)m->c->sslopts->privateKey);
  1254. if (m->c->sslopts->privateKeyPassword)
  1255. free((void*)m->c->sslopts->privateKeyPassword);
  1256. if (m->c->sslopts->enabledCipherSuites)
  1257. free((void*)m->c->sslopts->enabledCipherSuites);
  1258. if (m->c->sslopts->struct_version >= 2)
  1259. {
  1260. if (m->c->sslopts->CApath)
  1261. free((void*)m->c->sslopts->CApath);
  1262. }
  1263. free(m->c->sslopts);
  1264. m->c->sslopts = NULL;
  1265. }
  1266. if (options->struct_version != 0 && options->ssl)
  1267. {
  1268. m->c->sslopts = malloc(sizeof(MQTTClient_SSLOptions));
  1269. memset(m->c->sslopts, '\0', sizeof(MQTTClient_SSLOptions));
  1270. m->c->sslopts->struct_version = options->ssl->struct_version;
  1271. if (options->ssl->trustStore)
  1272. m->c->sslopts->trustStore = MQTTStrdup(options->ssl->trustStore);
  1273. if (options->ssl->keyStore)
  1274. m->c->sslopts->keyStore = MQTTStrdup(options->ssl->keyStore);
  1275. if (options->ssl->privateKey)
  1276. m->c->sslopts->privateKey = MQTTStrdup(options->ssl->privateKey);
  1277. if (options->ssl->privateKeyPassword)
  1278. m->c->sslopts->privateKeyPassword = MQTTStrdup(options->ssl->privateKeyPassword);
  1279. if (options->ssl->enabledCipherSuites)
  1280. m->c->sslopts->enabledCipherSuites = MQTTStrdup(options->ssl->enabledCipherSuites);
  1281. m->c->sslopts->enableServerCertAuth = options->ssl->enableServerCertAuth;
  1282. if (m->c->sslopts->struct_version >= 1)
  1283. m->c->sslopts->sslVersion = options->ssl->sslVersion;
  1284. if (m->c->sslopts->struct_version >= 2)
  1285. {
  1286. m->c->sslopts->verify = options->ssl->verify;
  1287. if (options->ssl->CApath)
  1288. m->c->sslopts->CApath = MQTTStrdup(options->ssl->CApath);
  1289. }
  1290. if (m->c->sslopts->struct_version >= 3)
  1291. {
  1292. m->c->sslopts->ssl_error_cb = options->ssl->ssl_error_cb;
  1293. m->c->sslopts->ssl_error_context = options->ssl->ssl_error_context;
  1294. }
  1295. if (m->c->sslopts->struct_version >= 4)
  1296. {
  1297. m->c->sslopts->ssl_psk_cb = options->ssl->ssl_psk_cb;
  1298. m->c->sslopts->ssl_psk_context = options->ssl->ssl_psk_context;
  1299. m->c->sslopts->disableDefaultTrustStore = options->ssl->disableDefaultTrustStore;
  1300. }
  1301. }
  1302. #endif
  1303. if (m->c->username)
  1304. free((void*)m->c->username);
  1305. if (options->username)
  1306. m->c->username = MQTTStrdup(options->username);
  1307. if (m->c->password)
  1308. free((void*)m->c->password);
  1309. if (options->password)
  1310. {
  1311. m->c->password = MQTTStrdup(options->password);
  1312. m->c->passwordlen = (int)strlen(options->password);
  1313. }
  1314. else if (options->struct_version >= 5 && options->binarypwd.data)
  1315. {
  1316. m->c->passwordlen = options->binarypwd.len;
  1317. m->c->password = malloc(m->c->passwordlen);
  1318. memcpy((void*)m->c->password, options->binarypwd.data, m->c->passwordlen);
  1319. }
  1320. if (options->struct_version >= 3)
  1321. MQTTVersion = options->MQTTVersion;
  1322. else
  1323. MQTTVersion = MQTTVERSION_DEFAULT;
  1324. if (MQTTVersion == MQTTVERSION_DEFAULT)
  1325. {
  1326. rc = MQTTClient_connectURIVersion(handle, options, serverURI, 4, start, millisecsTimeout,
  1327. connectProperties, willProperties);
  1328. if (rc.reasonCode != MQTTCLIENT_SUCCESS)
  1329. {
  1330. rc = MQTTClient_connectURIVersion(handle, options, serverURI, 3, start, millisecsTimeout,
  1331. connectProperties, willProperties);
  1332. }
  1333. }
  1334. else
  1335. rc = MQTTClient_connectURIVersion(handle, options, serverURI, MQTTVersion, start, millisecsTimeout,
  1336. connectProperties, willProperties);
  1337. FUNC_EXIT_RC(rc.reasonCode);
  1338. return rc;
  1339. }
  1340. MQTTResponse MQTTClient_connectAll(MQTTClient handle, MQTTClient_connectOptions* options,
  1341. MQTTProperties* connectProperties, MQTTProperties* willProperties);
  1342. int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
  1343. {
  1344. MQTTClients* m = handle;
  1345. MQTTResponse response;
  1346. if (m->c->MQTTVersion >= MQTTVERSION_5)
  1347. return MQTTCLIENT_WRONG_MQTT_VERSION;
  1348. response = MQTTClient_connectAll(handle, options, NULL, NULL);
  1349. return response.reasonCode;
  1350. }
  1351. MQTTResponse MQTTClient_connect5(MQTTClient handle, MQTTClient_connectOptions* options,
  1352. MQTTProperties* connectProperties, MQTTProperties* willProperties)
  1353. {
  1354. MQTTClients* m = handle;
  1355. MQTTResponse response = MQTTResponse_initializer;
  1356. if (m->c->MQTTVersion < MQTTVERSION_5)
  1357. {
  1358. response.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
  1359. return response;
  1360. }
  1361. return MQTTClient_connectAll(handle, options, connectProperties, willProperties);
  1362. }
  1363. MQTTResponse MQTTClient_connectAll(MQTTClient handle, MQTTClient_connectOptions* options,
  1364. MQTTProperties* connectProperties, MQTTProperties* willProperties)
  1365. {
  1366. MQTTClients* m = handle;
  1367. MQTTResponse rc = MQTTResponse_initializer;
  1368. FUNC_ENTRY;
  1369. Thread_lock_mutex(connect_mutex);
  1370. Thread_lock_mutex(mqttclient_mutex);
  1371. rc.reasonCode = SOCKET_ERROR;
  1372. if (options == NULL)
  1373. {
  1374. rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
  1375. goto exit;
  1376. }
  1377. if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 6)
  1378. {
  1379. rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
  1380. goto exit;
  1381. }
  1382. #if defined(OPENSSL)
  1383. if (m->ssl && options->ssl == NULL)
  1384. {
  1385. rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
  1386. goto exit;
  1387. }
  1388. #endif
  1389. if (options->will) /* check validity of will options structure */
  1390. {
  1391. if (strncmp(options->will->struct_id, "MQTW", 4) != 0 || (options->will->struct_version != 0 && options->will->struct_version != 1))
  1392. {
  1393. rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
  1394. goto exit;
  1395. }
  1396. }
  1397. #if defined(OPENSSL)
  1398. if (options->struct_version != 0 && options->ssl) /* check validity of SSL options structure */
  1399. {
  1400. if (strncmp(options->ssl->struct_id, "MQTS", 4) != 0 || options->ssl->struct_version < 0 || options->ssl->struct_version > 4)
  1401. {
  1402. rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
  1403. goto exit;
  1404. }
  1405. }
  1406. #endif
  1407. if ((options->username && !UTF8_validateString(options->username)) ||
  1408. (options->password && !UTF8_validateString(options->password)))
  1409. {
  1410. rc.reasonCode = MQTTCLIENT_BAD_UTF8_STRING;
  1411. goto exit;
  1412. }
  1413. if (options->MQTTVersion != MQTTVERSION_DEFAULT &&
  1414. (options->MQTTVersion < MQTTVERSION_3_1 || options->MQTTVersion > MQTTVERSION_5))
  1415. {
  1416. rc.reasonCode = MQTTCLIENT_BAD_MQTT_VERSION;
  1417. goto exit;
  1418. }
  1419. if (options->MQTTVersion >= MQTTVERSION_5)
  1420. {
  1421. if (options->cleansession != 0)
  1422. {
  1423. rc.reasonCode = MQTTCLIENT_BAD_MQTT_OPTION;
  1424. goto exit;
  1425. }
  1426. }
  1427. else if (options->cleanstart != 0)
  1428. {
  1429. rc.reasonCode = MQTTCLIENT_BAD_MQTT_OPTION;
  1430. goto exit;
  1431. }
  1432. if (options->struct_version < 2 || options->serverURIcount == 0)
  1433. {
  1434. if ( !m )
  1435. {
  1436. rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
  1437. goto exit;
  1438. }
  1439. rc = MQTTClient_connectURI(handle, options, m->serverURI, connectProperties, willProperties);
  1440. }
  1441. else
  1442. {
  1443. int i;
  1444. for (i = 0; i < options->serverURIcount; ++i)
  1445. {
  1446. char* serverURI = options->serverURIs[i];
  1447. if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
  1448. serverURI += strlen(URI_TCP);
  1449. else if (strncmp(URI_WS, serverURI, strlen(URI_WS)) == 0)
  1450. {
  1451. serverURI += strlen(URI_WS);
  1452. m->websocket = 1;
  1453. }
  1454. #if defined(OPENSSL)
  1455. else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0)
  1456. {
  1457. serverURI += strlen(URI_SSL);
  1458. m->ssl = 1;
  1459. }
  1460. else if (strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
  1461. {
  1462. serverURI += strlen(URI_WSS);
  1463. m->ssl = 1;
  1464. m->websocket = 1;
  1465. }
  1466. #endif
  1467. rc = MQTTClient_connectURI(handle, options, serverURI, connectProperties, willProperties);
  1468. if (rc.reasonCode == MQTTREASONCODE_SUCCESS)
  1469. break;
  1470. }
  1471. }
  1472. if (rc.reasonCode == MQTTREASONCODE_SUCCESS)
  1473. {
  1474. if (rc.properties && MQTTProperties_hasProperty(rc.properties, MQTTPROPERTY_CODE_RECEIVE_MAXIMUM))
  1475. {
  1476. int recv_max = MQTTProperties_getNumericValue(rc.properties, MQTTPROPERTY_CODE_RECEIVE_MAXIMUM);
  1477. if (m->c->maxInflightMessages > recv_max)
  1478. m->c->maxInflightMessages = recv_max;
  1479. }
  1480. }
  1481. exit:
  1482. if (m && m->c && m->c->will)
  1483. {
  1484. if (m->c->will->payload)
  1485. free(m->c->will->payload);
  1486. if (m->c->will->topic)
  1487. free(m->c->will->topic);
  1488. free(m->c->will);
  1489. m->c->will = NULL;
  1490. }
  1491. Thread_unlock_mutex(mqttclient_mutex);
  1492. Thread_unlock_mutex(connect_mutex);
  1493. FUNC_EXIT_RC(rc.reasonCode);
  1494. return rc;
  1495. }
  1496. /**
  1497. * mqttclient_mutex must be locked when you call this function, if multi threaded
  1498. */
  1499. static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int call_connection_lost, int stop,
  1500. enum MQTTReasonCodes reason, MQTTProperties* props)
  1501. {
  1502. MQTTClients* m = handle;
  1503. START_TIME_TYPE start;
  1504. int rc = MQTTCLIENT_SUCCESS;
  1505. int was_connected = 0;
  1506. FUNC_ENTRY;
  1507. if (m == NULL || m->c == NULL)
  1508. {
  1509. rc = MQTTCLIENT_FAILURE;
  1510. goto exit;
  1511. }
  1512. if (m->c->connected == 0 && m->c->connect_state == NOT_IN_PROGRESS)
  1513. {
  1514. rc = MQTTCLIENT_DISCONNECTED;
  1515. goto exit;
  1516. }
  1517. was_connected = m->c->connected; /* should be 1 */
  1518. if (m->c->connected != 0)
  1519. {
  1520. start = MQTTClient_start_clock();
  1521. m->c->connect_state = DISCONNECTING; /* indicate disconnecting */
  1522. while (m->c->inboundMsgs->count > 0 || m->c->outboundMsgs->count > 0)
  1523. { /* wait for all inflight message flows to finish, up to timeout */
  1524. if (MQTTClient_elapsed(start) >= timeout)
  1525. break;
  1526. Thread_unlock_mutex(mqttclient_mutex);
  1527. MQTTClient_yield();
  1528. Thread_lock_mutex(mqttclient_mutex);
  1529. }
  1530. }
  1531. MQTTClient_closeSession(m->c, reason, props);
  1532. exit:
  1533. if (stop)
  1534. MQTTClient_stop();
  1535. if (call_connection_lost && m->cl && was_connected)
  1536. {
  1537. Log(TRACE_MIN, -1, "Calling connectionLost for client %s", m->c->clientID);
  1538. Thread_start(connectionLost_call, m);
  1539. }
  1540. FUNC_EXIT_RC(rc);
  1541. return rc;
  1542. }
  1543. /**
  1544. * mqttclient_mutex must be locked when you call this function, if multi threaded
  1545. */
  1546. static int MQTTClient_disconnect_internal(MQTTClient handle, int timeout)
  1547. {
  1548. return MQTTClient_disconnect1(handle, timeout, 1, 1, MQTTREASONCODE_SUCCESS, NULL);
  1549. }
  1550. /**
  1551. * mqttclient_mutex must be locked when you call this function, if multi threaded
  1552. */
  1553. void MQTTProtocol_closeSession(Clients* c, int sendwill)
  1554. {
  1555. MQTTClient_disconnect_internal((MQTTClient)c->context, 0);
  1556. }
  1557. int MQTTClient_disconnect(MQTTClient handle, int timeout)
  1558. {
  1559. int rc = 0;
  1560. Thread_lock_mutex(mqttclient_mutex);
  1561. rc = MQTTClient_disconnect1(handle, timeout, 0, 1, MQTTREASONCODE_SUCCESS, NULL);
  1562. Thread_unlock_mutex(mqttclient_mutex);
  1563. return rc;
  1564. }
  1565. int MQTTClient_disconnect5(MQTTClient handle, int timeout, enum MQTTReasonCodes reason, MQTTProperties* props)
  1566. {
  1567. int rc = 0;
  1568. Thread_lock_mutex(mqttclient_mutex);
  1569. rc = MQTTClient_disconnect1(handle, timeout, 0, 1, reason, props);
  1570. Thread_unlock_mutex(mqttclient_mutex);
  1571. return rc;
  1572. }
  1573. int MQTTClient_isConnected(MQTTClient handle)
  1574. {
  1575. MQTTClients* m = handle;
  1576. int rc = 0;
  1577. FUNC_ENTRY;
  1578. Thread_lock_mutex(mqttclient_mutex);
  1579. if (m && m->c)
  1580. rc = m->c->connected;
  1581. Thread_unlock_mutex(mqttclient_mutex);
  1582. FUNC_EXIT_RC(rc);
  1583. return rc;
  1584. }
  1585. MQTTResponse MQTTClient_subscribeMany5(MQTTClient handle, int count, char* const* topic,
  1586. int* qos, MQTTSubscribe_options* opts, MQTTProperties* props)
  1587. {
  1588. MQTTClients* m = handle;
  1589. List* topics = NULL;
  1590. List* qoss = NULL;
  1591. int i = 0;
  1592. int rc = MQTTCLIENT_FAILURE;
  1593. MQTTResponse resp = MQTTResponse_initializer;
  1594. int msgid = 0;
  1595. FUNC_ENTRY;
  1596. Thread_lock_mutex(subscribe_mutex);
  1597. Thread_lock_mutex(mqttclient_mutex);
  1598. resp.reasonCode = MQTTCLIENT_FAILURE;
  1599. if (m == NULL || m->c == NULL)
  1600. {
  1601. rc = MQTTCLIENT_FAILURE;
  1602. goto exit;
  1603. }
  1604. if (m->c->connected == 0)
  1605. {
  1606. rc = MQTTCLIENT_DISCONNECTED;
  1607. goto exit;
  1608. }
  1609. for (i = 0; i < count; i++)
  1610. {
  1611. if (!UTF8_validateString(topic[i]))
  1612. {
  1613. rc = MQTTCLIENT_BAD_UTF8_STRING;
  1614. goto exit;
  1615. }
  1616. if (qos[i] < 0 || qos[i] > 2)
  1617. {
  1618. rc = MQTTCLIENT_BAD_QOS;
  1619. goto exit;
  1620. }
  1621. }
  1622. if ((msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
  1623. {
  1624. rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT;
  1625. goto exit;
  1626. }
  1627. topics = ListInitialize();
  1628. qoss = ListInitialize();
  1629. for (i = 0; i < count; i++)
  1630. {
  1631. ListAppend(topics, topic[i], strlen(topic[i]));
  1632. ListAppend(qoss, &qos[i], sizeof(int));
  1633. }
  1634. rc = MQTTProtocol_subscribe(m->c, topics, qoss, msgid, opts, props);
  1635. ListFreeNoContent(topics);
  1636. ListFreeNoContent(qoss);
  1637. if (rc == TCPSOCKET_COMPLETE)
  1638. {
  1639. MQTTPacket* pack = NULL;
  1640. Thread_unlock_mutex(mqttclient_mutex);
  1641. pack = MQTTClient_waitfor(handle, SUBACK, &rc, 10000L);
  1642. Thread_lock_mutex(mqttclient_mutex);
  1643. if (pack != NULL)
  1644. {
  1645. Suback* sub = (Suback*)pack;
  1646. if (m->c->MQTTVersion == MQTTVERSION_5)
  1647. {
  1648. if (sub->properties.count > 0)
  1649. {
  1650. resp.properties = malloc(sizeof(MQTTProperties));
  1651. *resp.properties = MQTTProperties_copy(&sub->properties);
  1652. }
  1653. resp.reasonCodeCount = sub->qoss->count;
  1654. resp.reasonCode = *(int*)sub->qoss->first->content;
  1655. if (sub->qoss->count > 1)
  1656. {
  1657. ListElement* current = NULL;
  1658. int rc_count = 0;
  1659. resp.reasonCodes = malloc(sizeof(enum MQTTReasonCodes) * (sub->qoss->count));
  1660. while (ListNextElement(sub->qoss, &current))
  1661. (resp.reasonCodes)[rc_count++] = *(enum MQTTReasonCodes*)(current->content);
  1662. }
  1663. }
  1664. else
  1665. {
  1666. ListElement* current = NULL;
  1667. i = 0;
  1668. while (ListNextElement(sub->qoss, &current))
  1669. {
  1670. int* reqqos = (int*)(current->content);
  1671. qos[i++] = *reqqos;
  1672. }
  1673. resp.reasonCode = rc;
  1674. }
  1675. rc = MQTTProtocol_handleSubacks(pack, m->c->net.socket);
  1676. m->pack = NULL;
  1677. }
  1678. else
  1679. rc = SOCKET_ERROR;
  1680. }
  1681. if (rc == SOCKET_ERROR)
  1682. MQTTClient_disconnect_internal(handle, 0);
  1683. else if (rc == TCPSOCKET_COMPLETE)
  1684. rc = MQTTCLIENT_SUCCESS;
  1685. exit:
  1686. if (rc < 0)
  1687. resp.reasonCode = rc;
  1688. Thread_unlock_mutex(mqttclient_mutex);
  1689. Thread_unlock_mutex(subscribe_mutex);
  1690. FUNC_EXIT_RC(resp.reasonCode);
  1691. return resp;
  1692. }
  1693. int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, int* qos)
  1694. {
  1695. MQTTClients* m = handle;
  1696. MQTTResponse response = MQTTResponse_initializer;
  1697. if (m->c->MQTTVersion >= MQTTVERSION_5)
  1698. response.reasonCode = MQTTCLIENT_BAD_MQTT_VERSION;
  1699. else
  1700. response = MQTTClient_subscribeMany5(handle, count, topic, qos, NULL, NULL);
  1701. return response.reasonCode;
  1702. }
  1703. MQTTResponse MQTTClient_subscribe5(MQTTClient handle, const char* topic, int qos,
  1704. MQTTSubscribe_options* opts, MQTTProperties* props)
  1705. {
  1706. MQTTResponse rc;
  1707. FUNC_ENTRY;
  1708. rc = MQTTClient_subscribeMany5(handle, 1, (char * const *)(&topic), &qos, opts, props);
  1709. if (qos == MQTT_BAD_SUBSCRIBE) /* addition for MQTT 3.1.1 - error code from subscribe */
  1710. rc.reasonCode = MQTT_BAD_SUBSCRIBE;
  1711. FUNC_EXIT_RC(rc.reasonCode);
  1712. return rc;
  1713. }
  1714. int MQTTClient_subscribe(MQTTClient handle, const char* topic, int qos)
  1715. {
  1716. MQTTClients* m = handle;
  1717. MQTTResponse response = MQTTResponse_initializer;
  1718. if (m->c->MQTTVersion >= MQTTVERSION_5)
  1719. response.reasonCode = MQTTCLIENT_BAD_MQTT_VERSION;
  1720. else
  1721. response = MQTTClient_subscribe5(handle, topic, qos, NULL, NULL);
  1722. return response.reasonCode;
  1723. }
  1724. MQTTResponse MQTTClient_unsubscribeMany5(MQTTClient handle, int count, char* const* topic, MQTTProperties* props)
  1725. {
  1726. MQTTClients* m = handle;
  1727. List* topics = NULL;
  1728. int i = 0;
  1729. int rc = SOCKET_ERROR;
  1730. MQTTResponse resp = MQTTResponse_initializer;
  1731. int msgid = 0;
  1732. FUNC_ENTRY;
  1733. Thread_lock_mutex(unsubscribe_mutex);
  1734. Thread_lock_mutex(mqttclient_mutex);
  1735. resp.reasonCode = MQTTCLIENT_FAILURE;
  1736. if (m == NULL || m->c == NULL)
  1737. {
  1738. rc = MQTTCLIENT_FAILURE;
  1739. goto exit;
  1740. }
  1741. if (m->c->connected == 0)
  1742. {
  1743. rc = MQTTCLIENT_DISCONNECTED;
  1744. goto exit;
  1745. }
  1746. for (i = 0; i < count; i++)
  1747. {
  1748. if (!UTF8_validateString(topic[i]))
  1749. {
  1750. rc = MQTTCLIENT_BAD_UTF8_STRING;
  1751. goto exit;
  1752. }
  1753. }
  1754. if ((msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
  1755. {
  1756. rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT;
  1757. goto exit;
  1758. }
  1759. topics = ListInitialize();
  1760. for (i = 0; i < count; i++)
  1761. ListAppend(topics, topic[i], strlen(topic[i]));
  1762. rc = MQTTProtocol_unsubscribe(m->c, topics, msgid, props);
  1763. ListFreeNoContent(topics);
  1764. if (rc == TCPSOCKET_COMPLETE)
  1765. {
  1766. MQTTPacket* pack = NULL;
  1767. Thread_unlock_mutex(mqttclient_mutex);
  1768. pack = MQTTClient_waitfor(handle, UNSUBACK, &rc, 10000L);
  1769. Thread_lock_mutex(mqttclient_mutex);
  1770. if (pack != NULL)
  1771. {
  1772. Unsuback* unsub = (Unsuback*)pack;
  1773. if (m->c->MQTTVersion == MQTTVERSION_5)
  1774. {
  1775. if (unsub->properties.count > 0)
  1776. {
  1777. resp.properties = malloc(sizeof(MQTTProperties));
  1778. *resp.properties = MQTTProperties_copy(&unsub->properties);
  1779. }
  1780. resp.reasonCodeCount = unsub->reasonCodes->count;
  1781. resp.reasonCode = *(int*)unsub->reasonCodes->first->content;
  1782. if (unsub->reasonCodes->count > 1)
  1783. {
  1784. ListElement* current = NULL;
  1785. int rc_count = 0;
  1786. resp.reasonCodes = malloc(sizeof(enum MQTTReasonCodes) * (unsub->reasonCodes->count));
  1787. while (ListNextElement(unsub->reasonCodes, &current))
  1788. (resp.reasonCodes)[rc_count++] = *(enum MQTTReasonCodes*)(current->content);
  1789. }
  1790. }
  1791. else
  1792. resp.reasonCode = rc;
  1793. rc = MQTTProtocol_handleUnsubacks(pack, m->c->net.socket);
  1794. m->pack = NULL;
  1795. }
  1796. else
  1797. rc = SOCKET_ERROR;
  1798. }
  1799. if (rc == SOCKET_ERROR)
  1800. MQTTClient_disconnect_internal(handle, 0);
  1801. exit:
  1802. if (rc < 0)
  1803. resp.reasonCode = rc;
  1804. Thread_unlock_mutex(mqttclient_mutex);
  1805. Thread_unlock_mutex(unsubscribe_mutex);
  1806. FUNC_EXIT_RC(resp.reasonCode);
  1807. return resp;
  1808. }
  1809. int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic)
  1810. {
  1811. MQTTResponse response = MQTTClient_unsubscribeMany5(handle, count, topic, NULL);
  1812. return response.reasonCode;
  1813. }
  1814. MQTTResponse MQTTClient_unsubscribe5(MQTTClient handle, const char* topic, MQTTProperties* props)
  1815. {
  1816. MQTTResponse rc;
  1817. rc = MQTTClient_unsubscribeMany5(handle, 1, (char * const *)(&topic), props);
  1818. return rc;
  1819. }
  1820. int MQTTClient_unsubscribe(MQTTClient handle, const char* topic)
  1821. {
  1822. MQTTResponse response = MQTTClient_unsubscribe5(handle, topic, NULL);
  1823. return response.reasonCode;
  1824. }
  1825. MQTTResponse MQTTClient_publish5(MQTTClient handle, const char* topicName, int payloadlen, const void* payload,
  1826. int qos, int retained, MQTTProperties* properties, MQTTClient_deliveryToken* deliveryToken)
  1827. {
  1828. int rc = MQTTCLIENT_SUCCESS;
  1829. MQTTClients* m = handle;
  1830. Messages* msg = NULL;
  1831. Publish* p = NULL;
  1832. int blocked = 0;
  1833. int msgid = 0;
  1834. MQTTResponse resp = MQTTResponse_initializer;
  1835. FUNC_ENTRY;
  1836. Thread_lock_mutex(mqttclient_mutex);
  1837. if (m == NULL || m->c == NULL)
  1838. rc = MQTTCLIENT_FAILURE;
  1839. else if (m->c->connected == 0)
  1840. rc = MQTTCLIENT_DISCONNECTED;
  1841. else if (!UTF8_validateString(topicName))
  1842. rc = MQTTCLIENT_BAD_UTF8_STRING;
  1843. if (rc != MQTTCLIENT_SUCCESS)
  1844. goto exit;
  1845. /* If outbound queue is full, block until it is not */
  1846. while (m->c->outboundMsgs->count >= m->c->maxInflightMessages ||
  1847. Socket_noPendingWrites(m->c->net.socket) == 0) /* wait until the socket is free of large packets being written */
  1848. {
  1849. if (blocked == 0)
  1850. {
  1851. blocked = 1;
  1852. Log(TRACE_MIN, -1, "Blocking publish on queue full for client %s", m->c->clientID);
  1853. }
  1854. Thread_unlock_mutex(mqttclient_mutex);
  1855. MQTTClient_yield();
  1856. Thread_lock_mutex(mqttclient_mutex);
  1857. if (m->c->connected == 0)
  1858. {
  1859. rc = MQTTCLIENT_FAILURE;
  1860. goto exit;
  1861. }
  1862. }
  1863. if (blocked == 1)
  1864. Log(TRACE_MIN, -1, "Resuming publish now queue not full for client %s", m->c->clientID);
  1865. if (qos > 0 && (msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
  1866. { /* this should never happen as we've waited for spaces in the queue */
  1867. rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT;
  1868. goto exit;
  1869. }
  1870. p = malloc(sizeof(Publish) + payloadlen);
  1871. p->payload = (void*)payload;
  1872. p->payloadlen = payloadlen;
  1873. if (payloadlen > 0)
  1874. {
  1875. p->payload = (char*)p + sizeof(Publish);
  1876. memcpy(p->payload, payload, payloadlen);
  1877. p->payloadlen = payloadlen;
  1878. }
  1879. p->topic = MQTTStrdup(topicName);
  1880. p->msgId = msgid;
  1881. p->MQTTVersion = m->c->MQTTVersion;
  1882. if (m->c->MQTTVersion >= MQTTVERSION_5)
  1883. {
  1884. if (properties)
  1885. p->properties = *properties;
  1886. else
  1887. {
  1888. MQTTProperties props = MQTTProperties_initializer;
  1889. p->properties = props;
  1890. }
  1891. }
  1892. rc = MQTTProtocol_startPublish(m->c, p, qos, retained, &msg);
  1893. /* If the packet was partially written to the socket, wait for it to complete.
  1894. * However, if the client is disconnected during this time and qos is not 0, still return success, as
  1895. * the packet has already been written to persistence and assigned a message id so will
  1896. * be sent when the client next connects.
  1897. */
  1898. if (rc == TCPSOCKET_INTERRUPTED)
  1899. {
  1900. while (m->c->connected == 1 && SocketBuffer_getWrite(m->c->net.socket))
  1901. {
  1902. Thread_unlock_mutex(mqttclient_mutex);
  1903. MQTTClient_yield();
  1904. Thread_lock_mutex(mqttclient_mutex);
  1905. }
  1906. rc = (qos > 0 || m->c->connected == 1) ? MQTTCLIENT_SUCCESS : MQTTCLIENT_FAILURE;
  1907. }
  1908. if (deliveryToken && qos > 0)
  1909. *deliveryToken = msg->msgid;
  1910. if (p->topic) free(p->topic);
  1911. free(p);
  1912. if (rc == SOCKET_ERROR)
  1913. {
  1914. MQTTClient_disconnect_internal(handle, 0);
  1915. /* Return success for qos > 0 as the send will be retried automatically */
  1916. rc = (qos > 0) ? MQTTCLIENT_SUCCESS : MQTTCLIENT_FAILURE;
  1917. }
  1918. exit:
  1919. Thread_unlock_mutex(mqttclient_mutex);
  1920. resp.reasonCode = rc;
  1921. FUNC_EXIT_RC(resp.reasonCode);
  1922. return resp;
  1923. }
  1924. int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen, const void* payload,
  1925. int qos, int retained, MQTTClient_deliveryToken* deliveryToken)
  1926. {
  1927. MQTTClients* m = handle;
  1928. MQTTResponse rc = MQTTResponse_initializer;
  1929. if (m->c->MQTTVersion >= MQTTVERSION_5)
  1930. rc.reasonCode = MQTTCLIENT_BAD_MQTT_VERSION;
  1931. else
  1932. rc = MQTTClient_publish5(handle, topicName, payloadlen, payload, qos, retained, NULL, deliveryToken);
  1933. return rc.reasonCode;
  1934. }
  1935. MQTTResponse MQTTClient_publishMessage5(MQTTClient handle, const char* topicName, MQTTClient_message* message,
  1936. MQTTClient_deliveryToken* deliveryToken)
  1937. {
  1938. MQTTResponse rc = MQTTResponse_initializer;
  1939. MQTTProperties* props = NULL;
  1940. FUNC_ENTRY;
  1941. if (message == NULL)
  1942. {
  1943. rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
  1944. goto exit;
  1945. }
  1946. if (strncmp(message->struct_id, "MQTM", 4) != 0 ||
  1947. (message->struct_version != 0 && message->struct_version != 1))
  1948. {
  1949. rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
  1950. goto exit;
  1951. }
  1952. if (message->struct_version >= 1)
  1953. props = &message->properties;
  1954. rc = MQTTClient_publish5(handle, topicName, message->payloadlen, message->payload,
  1955. message->qos, message->retained, props, deliveryToken);
  1956. exit:
  1957. FUNC_EXIT_RC(rc.reasonCode);
  1958. return rc;
  1959. }
  1960. int MQTTClient_publishMessage(MQTTClient handle, const char* topicName, MQTTClient_message* message,
  1961. MQTTClient_deliveryToken* deliveryToken)
  1962. {
  1963. MQTTClients* m = handle;
  1964. MQTTResponse rc = MQTTResponse_initializer;
  1965. if (strncmp(message->struct_id, "MQTM", 4) != 0 ||
  1966. (message->struct_version != 0 && message->struct_version != 1))
  1967. rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
  1968. else if (m->c->MQTTVersion >= MQTTVERSION_5)
  1969. rc.reasonCode = MQTTCLIENT_BAD_MQTT_VERSION;
  1970. else
  1971. rc = MQTTClient_publishMessage5(handle, topicName, message, deliveryToken);
  1972. return rc.reasonCode;
  1973. }
  1974. static void MQTTClient_retry(void)
  1975. {
  1976. static time_t last = 0L;
  1977. time_t now;
  1978. FUNC_ENTRY;
  1979. time(&(now));
  1980. if (difftime(now, last) > retryLoopInterval)
  1981. {
  1982. time(&(last));
  1983. MQTTProtocol_keepalive(now);
  1984. MQTTProtocol_retry(now, 1, 0);
  1985. }
  1986. else
  1987. MQTTProtocol_retry(now, 0, 0);
  1988. FUNC_EXIT;
  1989. }
  1990. static MQTTPacket* MQTTClient_cycle(int* sock, unsigned long timeout, int* rc)
  1991. {
  1992. struct timeval tp = {0L, 0L};
  1993. static Ack ack;
  1994. MQTTPacket* pack = NULL;
  1995. FUNC_ENTRY;
  1996. if (timeout > 0L)
  1997. {
  1998. tp.tv_sec = timeout / 1000;
  1999. tp.tv_usec = (timeout % 1000) * 1000; /* this field is microseconds! */
  2000. }
  2001. #if defined(OPENSSL)
  2002. if ((*sock = SSLSocket_getPendingRead()) == -1)
  2003. {
  2004. /* 0 from getReadySocket indicates no work to do, -1 == error, but can happen normally */
  2005. #endif
  2006. *sock = Socket_getReadySocket(0, &tp, socket_mutex);
  2007. #if defined(OPENSSL)
  2008. }
  2009. #endif
  2010. Thread_lock_mutex(mqttclient_mutex);
  2011. if (*sock > 0)
  2012. {
  2013. MQTTClients* m = NULL;
  2014. if (ListFindItem(handles, sock, clientSockCompare) != NULL)
  2015. m = (MQTTClient)(handles->current->content);
  2016. if (m != NULL)
  2017. {
  2018. if (m->c->connect_state == TCP_IN_PROGRESS || m->c->connect_state == SSL_IN_PROGRESS)
  2019. *rc = 0; /* waiting for connect state to clear */
  2020. else if (m->c->connect_state == WEBSOCKET_IN_PROGRESS)
  2021. *rc = WebSocket_upgrade(&m->c->net);
  2022. else
  2023. {
  2024. pack = MQTTPacket_Factory(m->c->MQTTVersion, &m->c->net, rc);
  2025. if (*rc == TCPSOCKET_INTERRUPTED)
  2026. *rc = 0;
  2027. }
  2028. }
  2029. if (pack)
  2030. {
  2031. int freed = 1;
  2032. /* Note that these handle... functions free the packet structure that they are dealing with */
  2033. if (pack->header.bits.type == PUBLISH)
  2034. *rc = MQTTProtocol_handlePublishes(pack, *sock);
  2035. else if (pack->header.bits.type == PUBACK || pack->header.bits.type == PUBCOMP)
  2036. {
  2037. int msgid;
  2038. ack = (pack->header.bits.type == PUBCOMP) ? *(Pubcomp*)pack : *(Puback*)pack;
  2039. msgid = ack.msgId;
  2040. if (m && m->c->MQTTVersion >= MQTTVERSION_5 && m->published)
  2041. {
  2042. Log(TRACE_MIN, -1, "Calling published for client %s, msgid %d", m->c->clientID, msgid);
  2043. (*(m->published))(m->published_context, msgid, pack->header.bits.type, &ack.properties, ack.rc);
  2044. }
  2045. *rc = (pack->header.bits.type == PUBCOMP) ?
  2046. MQTTProtocol_handlePubcomps(pack, *sock) : MQTTProtocol_handlePubacks(pack, *sock);
  2047. if (m && m->dc)
  2048. {
  2049. Log(TRACE_MIN, -1, "Calling deliveryComplete for client %s, msgid %d", m->c->clientID, msgid);
  2050. (*(m->dc))(m->context, msgid);
  2051. }
  2052. }
  2053. else if (pack->header.bits.type == PUBREC)
  2054. {
  2055. Pubrec* pubrec = (Pubrec*)pack;
  2056. if (m && m->c->MQTTVersion >= MQTTVERSION_5 && m->published && pubrec->rc >= MQTTREASONCODE_UNSPECIFIED_ERROR)
  2057. {
  2058. Log(TRACE_MIN, -1, "Calling published for client %s, msgid %d", m->c->clientID, ack.msgId);
  2059. (*(m->published))(m->published_context, pubrec->msgId, pack->header.bits.type,
  2060. &pubrec->properties, pubrec->rc);
  2061. }
  2062. *rc = MQTTProtocol_handlePubrecs(pack, *sock);
  2063. }
  2064. else if (pack->header.bits.type == PUBREL)
  2065. *rc = MQTTProtocol_handlePubrels(pack, *sock);
  2066. else if (pack->header.bits.type == PINGRESP)
  2067. *rc = MQTTProtocol_handlePingresps(pack, *sock);
  2068. else
  2069. freed = 0;
  2070. if (freed)
  2071. pack = NULL;
  2072. }
  2073. }
  2074. MQTTClient_retry();
  2075. Thread_unlock_mutex(mqttclient_mutex);
  2076. FUNC_EXIT_RC(*rc);
  2077. return pack;
  2078. }
  2079. static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, long timeout)
  2080. {
  2081. MQTTPacket* pack = NULL;
  2082. MQTTClients* m = handle;
  2083. START_TIME_TYPE start = MQTTClient_start_clock();
  2084. FUNC_ENTRY;
  2085. if (((MQTTClients*)handle) == NULL || timeout <= 0L)
  2086. {
  2087. *rc = MQTTCLIENT_FAILURE;
  2088. goto exit;
  2089. }
  2090. if (running)
  2091. {
  2092. if (packet_type == CONNECT)
  2093. {
  2094. if ((*rc = Thread_wait_sem(m->connect_sem, timeout)) == 0)
  2095. *rc = m->rc;
  2096. }
  2097. else if (packet_type == CONNACK)
  2098. *rc = Thread_wait_sem(m->connack_sem, timeout);
  2099. else if (packet_type == SUBACK)
  2100. *rc = Thread_wait_sem(m->suback_sem, timeout);
  2101. else if (packet_type == UNSUBACK)
  2102. *rc = Thread_wait_sem(m->unsuback_sem, timeout);
  2103. if (*rc == 0 && packet_type != CONNECT && m->pack == NULL)
  2104. Log(LOG_ERROR, -1, "waitfor unexpectedly is NULL for client %s, packet_type %d, timeout %ld", m->c->clientID, packet_type, timeout);
  2105. pack = m->pack;
  2106. }
  2107. else
  2108. {
  2109. *rc = TCPSOCKET_COMPLETE;
  2110. while (1)
  2111. {
  2112. int sock = -1;
  2113. pack = MQTTClient_cycle(&sock, 100L, rc);
  2114. if (sock == m->c->net.socket)
  2115. {
  2116. if (*rc == SOCKET_ERROR)
  2117. break;
  2118. if (pack && (pack->header.bits.type == packet_type))
  2119. break;
  2120. if (m->c->connect_state == TCP_IN_PROGRESS)
  2121. {
  2122. int error;
  2123. socklen_t len = sizeof(error);
  2124. if ((*rc = getsockopt(m->c->net.socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len)) == 0)
  2125. *rc = error;
  2126. break;
  2127. }
  2128. #if defined(OPENSSL)
  2129. else if (m->c->connect_state == SSL_IN_PROGRESS)
  2130. {
  2131. *rc = m->c->sslopts->struct_version >= 3 ?
  2132. SSLSocket_connect(m->c->net.ssl, sock, m->serverURI,
  2133. m->c->sslopts->verify, m->c->sslopts->ssl_error_cb, m->c->sslopts->ssl_error_context) :
  2134. SSLSocket_connect(m->c->net.ssl, sock, m->serverURI,
  2135. m->c->sslopts->verify, NULL, NULL);
  2136. if (*rc == SSL_FATAL)
  2137. break;
  2138. else if (*rc == 1) /* rc == 1 means SSL connect has finished and succeeded */
  2139. {
  2140. if ((m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
  2141. m->c->session = SSL_get1_session(m->c->net.ssl);
  2142. break;
  2143. }
  2144. }
  2145. #endif
  2146. else if (m->c->connect_state == WEBSOCKET_IN_PROGRESS )
  2147. {
  2148. *rc = 1;
  2149. break;
  2150. }
  2151. else if (m->c->connect_state == WAIT_FOR_CONNACK)
  2152. {
  2153. int error;
  2154. socklen_t len = sizeof(error);
  2155. if (getsockopt(m->c->net.socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len) == 0)
  2156. {
  2157. if (error)
  2158. {
  2159. *rc = error;
  2160. break;
  2161. }
  2162. }
  2163. }
  2164. }
  2165. if (MQTTClient_elapsed(start) > timeout)
  2166. {
  2167. pack = NULL;
  2168. break;
  2169. }
  2170. }
  2171. }
  2172. exit:
  2173. FUNC_EXIT_RC(*rc);
  2174. return pack;
  2175. }
  2176. int MQTTClient_receive(MQTTClient handle, char** topicName, int* topicLen, MQTTClient_message** message,
  2177. unsigned long timeout)
  2178. {
  2179. int rc = TCPSOCKET_COMPLETE;
  2180. START_TIME_TYPE start = MQTTClient_start_clock();
  2181. unsigned long elapsed = 0L;
  2182. MQTTClients* m = handle;
  2183. FUNC_ENTRY;
  2184. if (m == NULL || m->c == NULL
  2185. || running) /* receive is not meant to be called in a multi-thread environment */
  2186. {
  2187. rc = MQTTCLIENT_FAILURE;
  2188. goto exit;
  2189. }
  2190. if (m->c->connected == 0)
  2191. {
  2192. rc = MQTTCLIENT_DISCONNECTED;
  2193. goto exit;
  2194. }
  2195. *topicName = NULL;
  2196. *message = NULL;
  2197. /* if there is already a message waiting, don't hang around but still do some packet handling */
  2198. if (m->c->messageQueue->count > 0)
  2199. timeout = 0L;
  2200. elapsed = MQTTClient_elapsed(start);
  2201. do
  2202. {
  2203. int sock = 0;
  2204. MQTTClient_cycle(&sock, (timeout > elapsed) ? timeout - elapsed : 0L, &rc);
  2205. if (rc == SOCKET_ERROR)
  2206. {
  2207. if (ListFindItem(handles, &sock, clientSockCompare) && /* find client corresponding to socket */
  2208. (MQTTClient)(handles->current->content) == handle)
  2209. break; /* there was an error on the socket we are interested in */
  2210. }
  2211. elapsed = MQTTClient_elapsed(start);
  2212. }
  2213. while (elapsed < timeout && m->c->messageQueue->count == 0);
  2214. if (m->c->messageQueue->count > 0)
  2215. rc = MQTTClient_deliverMessage(rc, m, topicName, topicLen, message);
  2216. if (rc == SOCKET_ERROR)
  2217. MQTTClient_disconnect_internal(handle, 0);
  2218. exit:
  2219. FUNC_EXIT_RC(rc);
  2220. return rc;
  2221. }
  2222. void MQTTClient_yield(void)
  2223. {
  2224. START_TIME_TYPE start = MQTTClient_start_clock();
  2225. unsigned long elapsed = 0L;
  2226. unsigned long timeout = 100L;
  2227. int rc = 0;
  2228. FUNC_ENTRY;
  2229. if (running) /* yield is not meant to be called in a multi-thread environment */
  2230. {
  2231. MQTTClient_sleep(timeout);
  2232. goto exit;
  2233. }
  2234. elapsed = MQTTClient_elapsed(start);
  2235. do
  2236. {
  2237. int sock = -1;
  2238. MQTTClient_cycle(&sock, (timeout > elapsed) ? timeout - elapsed : 0L, &rc);
  2239. Thread_lock_mutex(mqttclient_mutex);
  2240. if (rc == SOCKET_ERROR && ListFindItem(handles, &sock, clientSockCompare))
  2241. {
  2242. MQTTClients* m = (MQTTClient)(handles->current->content);
  2243. if (m->c->connect_state != DISCONNECTING)
  2244. MQTTClient_disconnect_internal(m, 0);
  2245. }
  2246. Thread_unlock_mutex(mqttclient_mutex);
  2247. elapsed = MQTTClient_elapsed(start);
  2248. }
  2249. while (elapsed < timeout);
  2250. exit:
  2251. FUNC_EXIT;
  2252. }
  2253. /*
  2254. static int pubCompare(void* a, void* b)
  2255. {
  2256. Messages* msg = (Messages*)a;
  2257. return msg->publish == (Publications*)b;
  2258. }*/
  2259. int MQTTClient_waitForCompletion(MQTTClient handle, MQTTClient_deliveryToken mdt, unsigned long timeout)
  2260. {
  2261. int rc = MQTTCLIENT_FAILURE;
  2262. START_TIME_TYPE start = MQTTClient_start_clock();
  2263. unsigned long elapsed = 0L;
  2264. MQTTClients* m = handle;
  2265. FUNC_ENTRY;
  2266. Thread_lock_mutex(mqttclient_mutex);
  2267. if (m == NULL || m->c == NULL)
  2268. {
  2269. rc = MQTTCLIENT_FAILURE;
  2270. goto exit;
  2271. }
  2272. elapsed = MQTTClient_elapsed(start);
  2273. while (elapsed < timeout)
  2274. {
  2275. if (m->c->connected == 0)
  2276. {
  2277. rc = MQTTCLIENT_DISCONNECTED;
  2278. goto exit;
  2279. }
  2280. if (ListFindItem(m->c->outboundMsgs, &mdt, messageIDCompare) == NULL)
  2281. {
  2282. rc = MQTTCLIENT_SUCCESS; /* well we couldn't find it */
  2283. goto exit;
  2284. }
  2285. Thread_unlock_mutex(mqttclient_mutex);
  2286. MQTTClient_yield();
  2287. Thread_lock_mutex(mqttclient_mutex);
  2288. elapsed = MQTTClient_elapsed(start);
  2289. }
  2290. exit:
  2291. Thread_unlock_mutex(mqttclient_mutex);
  2292. FUNC_EXIT_RC(rc);
  2293. return rc;
  2294. }
  2295. int MQTTClient_getPendingDeliveryTokens(MQTTClient handle, MQTTClient_deliveryToken **tokens)
  2296. {
  2297. int rc = MQTTCLIENT_SUCCESS;
  2298. MQTTClients* m = handle;
  2299. *tokens = NULL;
  2300. FUNC_ENTRY;
  2301. Thread_lock_mutex(mqttclient_mutex);
  2302. if (m == NULL)
  2303. {
  2304. rc = MQTTCLIENT_FAILURE;
  2305. goto exit;
  2306. }
  2307. if (m->c && m->c->outboundMsgs->count > 0)
  2308. {
  2309. ListElement* current = NULL;
  2310. int count = 0;
  2311. *tokens = malloc(sizeof(MQTTClient_deliveryToken) * (m->c->outboundMsgs->count + 1));
  2312. /*Heap_unlink(__FILE__, __LINE__, *tokens);*/
  2313. while (ListNextElement(m->c->outboundMsgs, &current))
  2314. {
  2315. Messages* m = (Messages*)(current->content);
  2316. (*tokens)[count++] = m->msgid;
  2317. }
  2318. (*tokens)[count] = -1;
  2319. }
  2320. exit:
  2321. Thread_unlock_mutex(mqttclient_mutex);
  2322. FUNC_EXIT_RC(rc);
  2323. return rc;
  2324. }
  2325. void MQTTClient_setTraceLevel(enum MQTTCLIENT_TRACE_LEVELS level)
  2326. {
  2327. Log_setTraceLevel((enum LOG_LEVELS)level);
  2328. }
  2329. void MQTTClient_setTraceCallback(MQTTClient_traceCallback* callback)
  2330. {
  2331. Log_setTraceCallback((Log_traceCallback*)callback);
  2332. }
  2333. MQTTClient_nameValue* MQTTClient_getVersionInfo(void)
  2334. {
  2335. #define MAX_INFO_STRINGS 8
  2336. static MQTTClient_nameValue libinfo[MAX_INFO_STRINGS + 1];
  2337. int i = 0;
  2338. libinfo[i].name = "Product name";
  2339. libinfo[i++].value = "Eclipse Paho Synchronous MQTT C Client Library";
  2340. libinfo[i].name = "Version";
  2341. libinfo[i++].value = CLIENT_VERSION;
  2342. libinfo[i].name = "Build level";
  2343. libinfo[i++].value = BUILD_TIMESTAMP;
  2344. #if defined(OPENSSL)
  2345. libinfo[i].name = "OpenSSL version";
  2346. libinfo[i++].value = SSLeay_version(SSLEAY_VERSION);
  2347. libinfo[i].name = "OpenSSL flags";
  2348. libinfo[i++].value = SSLeay_version(SSLEAY_CFLAGS);
  2349. libinfo[i].name = "OpenSSL build timestamp";
  2350. libinfo[i++].value = SSLeay_version(SSLEAY_BUILT_ON);
  2351. libinfo[i].name = "OpenSSL platform";
  2352. libinfo[i++].value = SSLeay_version(SSLEAY_PLATFORM);
  2353. libinfo[i].name = "OpenSSL directory";
  2354. libinfo[i++].value = SSLeay_version(SSLEAY_DIR);
  2355. #endif
  2356. libinfo[i].name = NULL;
  2357. libinfo[i].value = NULL;
  2358. return libinfo;
  2359. }
  2360. const char* MQTTClient_strerror(int code)
  2361. {
  2362. static char buf[30];
  2363. switch (code) {
  2364. case MQTTCLIENT_SUCCESS:
  2365. return "Success";
  2366. case MQTTCLIENT_FAILURE:
  2367. return "Failure";
  2368. case MQTTCLIENT_DISCONNECTED:
  2369. return "Disconnected";
  2370. case MQTTCLIENT_MAX_MESSAGES_INFLIGHT:
  2371. return "Maximum in-flight messages amount reached";
  2372. case MQTTCLIENT_BAD_UTF8_STRING:
  2373. return "Invalid UTF8 string";
  2374. case MQTTCLIENT_NULL_PARAMETER:
  2375. return "Invalid (NULL) parameter";
  2376. case MQTTCLIENT_TOPICNAME_TRUNCATED:
  2377. return "Topic containing NULL characters has been truncated";
  2378. case MQTTCLIENT_BAD_STRUCTURE:
  2379. return "Bad structure";
  2380. case MQTTCLIENT_BAD_QOS:
  2381. return "Invalid QoS value";
  2382. case MQTTCLIENT_SSL_NOT_SUPPORTED:
  2383. return "SSL is not supported";
  2384. case MQTTCLIENT_BAD_PROTOCOL:
  2385. return "Invalid protocol scheme";
  2386. case MQTTCLIENT_BAD_MQTT_OPTION:
  2387. return "Options for wrong MQTT version";
  2388. case MQTTCLIENT_WRONG_MQTT_VERSION:
  2389. return "Client created for another version of MQTT";
  2390. }
  2391. sprintf(buf, "Unknown error code %d", code);
  2392. return buf;
  2393. }
  2394. /**
  2395. * See if any pending writes have been completed, and cleanup if so.
  2396. * Cleaning up means removing any publication data that was stored because the write did
  2397. * not originally complete.
  2398. */
  2399. static void MQTTProtocol_checkPendingWrites(void)
  2400. {
  2401. FUNC_ENTRY;
  2402. if (state.pending_writes.count > 0)
  2403. {
  2404. ListElement* le = state.pending_writes.first;
  2405. while (le)
  2406. {
  2407. if (Socket_noPendingWrites(((pending_write*)(le->content))->socket))
  2408. {
  2409. MQTTProtocol_removePublication(((pending_write*)(le->content))->p);
  2410. state.pending_writes.current = le;
  2411. ListRemove(&(state.pending_writes), le->content); /* does NextElement itself */
  2412. le = state.pending_writes.current;
  2413. }
  2414. else
  2415. ListNextElement(&(state.pending_writes), &le);
  2416. }
  2417. }
  2418. FUNC_EXIT;
  2419. }
  2420. static void MQTTClient_writeComplete(int socket, int rc)
  2421. {
  2422. ListElement* found = NULL;
  2423. FUNC_ENTRY;
  2424. /* a partial write is now complete for a socket - this will be on a publish*/
  2425. MQTTProtocol_checkPendingWrites();
  2426. /* find the client using this socket */
  2427. if ((found = ListFindItem(handles, &socket, clientSockCompare)) != NULL)
  2428. {
  2429. MQTTClients* m = (MQTTClients*)(found->content);
  2430. time(&(m->c->net.lastSent));
  2431. }
  2432. FUNC_EXIT;
  2433. }