MQTTClient.c 82 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120
  1. /*******************************************************************************
  2. * Copyright (c) 2009, 2022 IBM Corp., Ian Craggs and others
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial API and implementation and/or initial documentation
  15. * Ian Craggs - bug 384016 - segv setting will message
  16. * Ian Craggs - bug 384053 - v1.0.0.7 - stop MQTTClient_receive on socket error
  17. * Ian Craggs, Allan Stockdill-Mander - add ability to connect with SSL
  18. * Ian Craggs - multiple server connection support
  19. * Ian Craggs - fix for bug 413429 - connectionLost not called
  20. * Ian Craggs - fix for bug 421103 - trying to write to same socket, in publish/retries
  21. * Ian Craggs - fix for bug 419233 - mutexes not reporting errors
  22. * Ian Craggs - fix for bug 420851
  23. * Ian Craggs - fix for bug 432903 - queue persistence
  24. * Ian Craggs - MQTT 3.1.1 support
  25. * Ian Craggs - fix for bug 438176 - MQTT version selection
  26. * Rong Xiang, Ian Craggs - C++ compatibility
  27. * Ian Craggs - fix for bug 443724 - stack corruption
  28. * Ian Craggs - fix for bug 447672 - simultaneous access to socket structure
  29. * Ian Craggs - fix for bug 459791 - deadlock in WaitForCompletion for bad client
  30. * Ian Craggs - fix for bug 474905 - insufficient synchronization for subscribe, unsubscribe, connect
  31. * Ian Craggs - make it clear that yield and receive are not intended for multi-threaded mode (bug 474748)
  32. * Ian Craggs - SNI support, message queue unpersist bug
  33. * Ian Craggs - binary will message support
  34. * Ian Craggs - waitforCompletion fix #240
  35. * Ian Craggs - check for NULL SSL options #334
  36. * Ian Craggs - allocate username/password buffers #431
  37. * Ian Craggs - MQTT 5.0 support
  38. * Sven Gambel - add generic proxy support
  39. *******************************************************************************/
  40. /**
  41. * @file
  42. * \brief Synchronous API implementation
  43. *
  44. */
  45. #include <stdlib.h>
  46. #include <string.h>
  47. #if !defined(_WIN32) && !defined(_WIN64)
  48. #include <sys/time.h>
  49. #else
  50. #if defined(_MSC_VER) && _MSC_VER < 1900
  51. #define snprintf _snprintf
  52. #endif
  53. #endif
  54. #include "MQTTClient.h"
  55. #if !defined(NO_PERSISTENCE)
  56. #include "MQTTPersistence.h"
  57. #endif
  58. #include "utf-8.h"
  59. #include "MQTTProtocol.h"
  60. #include "MQTTProtocolOut.h"
  61. #include "Thread.h"
  62. #include "SocketBuffer.h"
  63. #include "StackTrace.h"
  64. #include "Heap.h"
  65. #if defined(OPENSSL)
  66. #include <openssl/ssl.h>
  67. #else
  68. #define URI_SSL "ssl://"
  69. #define URI_MQTTS "mqtts://"
  70. #endif
  71. #include "OsWrapper.h"
  72. #define URI_TCP "tcp://"
  73. #define URI_MQTT "mqtt://"
  74. #define URI_WS "ws://"
  75. #define URI_WSS "wss://"
  76. #include "VersionInfo.h"
  77. #include "WebSocket.h"
  78. #include "Proxy.h"
  79. const char *client_timestamp_eye = "MQTTClientV3_Timestamp " BUILD_TIMESTAMP;
  80. const char *client_version_eye = "MQTTClientV3_Version " CLIENT_VERSION;
  81. struct conlost_sync_data {
  82. sem_type sem;
  83. void *m;
  84. };
  85. int MQTTClient_init(void);
  86. void MQTTClient_global_init(MQTTClient_init_options* inits)
  87. {
  88. MQTTClient_init();
  89. #if defined(OPENSSL)
  90. SSLSocket_handleOpensslInit(inits->do_openssl_init);
  91. #endif
  92. }
  93. static ClientStates ClientState =
  94. {
  95. CLIENT_VERSION, /* version */
  96. NULL /* client list */
  97. };
  98. ClientStates* bstate = &ClientState;
  99. MQTTProtocol state;
  100. #if defined(_WIN32) || defined(_WIN64)
  101. static mutex_type mqttclient_mutex = NULL;
  102. mutex_type socket_mutex = NULL;
  103. static mutex_type subscribe_mutex = NULL;
  104. static mutex_type unsubscribe_mutex = NULL;
  105. static mutex_type connect_mutex = NULL;
  106. #if !defined(NO_HEAP_TRACKING)
  107. extern mutex_type stack_mutex;
  108. extern mutex_type heap_mutex;
  109. #endif
  110. extern mutex_type log_mutex;
  111. int MQTTClient_init(void)
  112. {
  113. DWORD rc = 0;
  114. if (mqttclient_mutex == NULL)
  115. {
  116. if ((mqttclient_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
  117. {
  118. rc = GetLastError();
  119. printf("mqttclient_mutex error %d\n", rc);
  120. goto exit;
  121. }
  122. if ((subscribe_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
  123. {
  124. rc = GetLastError();
  125. printf("subscribe_mutex error %d\n", rc);
  126. goto exit;
  127. }
  128. if ((unsubscribe_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
  129. {
  130. rc = GetLastError();
  131. printf("unsubscribe_mutex error %d\n", rc);
  132. goto exit;
  133. }
  134. if ((connect_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
  135. {
  136. rc = GetLastError();
  137. printf("connect_mutex error %d\n", rc);
  138. goto exit;
  139. }
  140. #if !defined(NO_HEAP_TRACKING)
  141. if ((stack_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
  142. {
  143. rc = GetLastError();
  144. printf("stack_mutex error %d\n", rc);
  145. goto exit;
  146. }
  147. if ((heap_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
  148. {
  149. rc = GetLastError();
  150. printf("heap_mutex error %d\n", rc);
  151. goto exit;
  152. }
  153. #endif
  154. if ((log_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
  155. {
  156. rc = GetLastError();
  157. printf("log_mutex error %d\n", rc);
  158. goto exit;
  159. }
  160. if ((socket_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
  161. {
  162. rc = GetLastError();
  163. printf("socket_mutex error %d\n", rc);
  164. goto exit;
  165. }
  166. }
  167. exit:
  168. return rc;
  169. }
  170. void MQTTClient_cleanup(void)
  171. {
  172. if (connect_mutex)
  173. CloseHandle(connect_mutex);
  174. if (subscribe_mutex)
  175. CloseHandle(subscribe_mutex);
  176. if (unsubscribe_mutex)
  177. CloseHandle(unsubscribe_mutex);
  178. #if !defined(NO_HEAP_TRACKING)
  179. if (stack_mutex)
  180. CloseHandle(stack_mutex);
  181. if (heap_mutex)
  182. CloseHandle(heap_mutex);
  183. #endif
  184. if (log_mutex)
  185. CloseHandle(log_mutex);
  186. if (socket_mutex)
  187. CloseHandle(socket_mutex);
  188. if (mqttclient_mutex)
  189. CloseHandle(mqttclient_mutex);
  190. }
  191. #if defined(PAHO_MQTT_STATIC)
  192. /* Global variable for one-time initialization structure */
  193. static INIT_ONCE g_InitOnce = INIT_ONCE_STATIC_INIT; /* Static initialization */
  194. /* One time initialization function */
  195. BOOL CALLBACK InitOnceFunction (
  196. PINIT_ONCE InitOnce, /* Pointer to one-time initialization structure */
  197. PVOID Parameter, /* Optional parameter passed by InitOnceExecuteOnce */
  198. PVOID *lpContext) /* Receives pointer to event object */
  199. {
  200. int rc = MQTTClient_init();
  201. return rc == 0;
  202. }
  203. #else
  204. BOOL APIENTRY DllMain(HANDLE hModule,
  205. DWORD ul_reason_for_call,
  206. LPVOID lpReserved)
  207. {
  208. switch (ul_reason_for_call)
  209. {
  210. case DLL_PROCESS_ATTACH:
  211. MQTTClient_init();
  212. break;
  213. case DLL_THREAD_ATTACH:
  214. break;
  215. case DLL_THREAD_DETACH:
  216. break;
  217. case DLL_PROCESS_DETACH:
  218. if (lpReserved)
  219. MQTTClient_cleanup();
  220. break;
  221. }
  222. return TRUE;
  223. }
  224. #endif
  225. #else
  226. static pthread_mutex_t mqttclient_mutex_store = PTHREAD_MUTEX_INITIALIZER;
  227. static mutex_type mqttclient_mutex = &mqttclient_mutex_store;
  228. static pthread_mutex_t socket_mutex_store = PTHREAD_MUTEX_INITIALIZER;
  229. mutex_type socket_mutex = &socket_mutex_store;
  230. static pthread_mutex_t subscribe_mutex_store = PTHREAD_MUTEX_INITIALIZER;
  231. static mutex_type subscribe_mutex = &subscribe_mutex_store;
  232. static pthread_mutex_t unsubscribe_mutex_store = PTHREAD_MUTEX_INITIALIZER;
  233. static mutex_type unsubscribe_mutex = &unsubscribe_mutex_store;
  234. static pthread_mutex_t connect_mutex_store = PTHREAD_MUTEX_INITIALIZER;
  235. static mutex_type connect_mutex = &connect_mutex_store;
  236. int MQTTClient_init(void)
  237. {
  238. pthread_mutexattr_t attr;
  239. int rc;
  240. pthread_mutexattr_init(&attr);
  241. #if !defined(_WRS_KERNEL)
  242. pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
  243. #else
  244. /* #warning "no pthread_mutexattr_settype" */
  245. #endif /* !defined(_WRS_KERNEL) */
  246. if ((rc = pthread_mutex_init(mqttclient_mutex, &attr)) != 0)
  247. printf("MQTTClient: error %d initializing client_mutex\n", rc);
  248. else if ((rc = pthread_mutex_init(socket_mutex, &attr)) != 0)
  249. printf("MQTTClient: error %d initializing socket_mutex\n", rc);
  250. else if ((rc = pthread_mutex_init(subscribe_mutex, &attr)) != 0)
  251. printf("MQTTClient: error %d initializing subscribe_mutex\n", rc);
  252. else if ((rc = pthread_mutex_init(unsubscribe_mutex, &attr)) != 0)
  253. printf("MQTTClient: error %d initializing unsubscribe_mutex\n", rc);
  254. else if ((rc = pthread_mutex_init(connect_mutex, &attr)) != 0)
  255. printf("MQTTClient: error %d initializing connect_mutex\n", rc);
  256. return rc;
  257. }
  258. #define WINAPI
  259. #endif
  260. static volatile int library_initialized = 0;
  261. static List* handles = NULL;
  262. static int running = 0;
  263. static int tostop = 0;
  264. static thread_id_type run_id = 0;
  265. typedef struct
  266. {
  267. MQTTClient_message* msg;
  268. char* topicName;
  269. int topicLen;
  270. unsigned int seqno; /* only used on restore */
  271. } qEntry;
  272. typedef struct
  273. {
  274. char* serverURI;
  275. const char* currentServerURI; /* when using HA options, set the currently used serverURI */
  276. #if defined(OPENSSL)
  277. int ssl;
  278. #endif
  279. int websocket;
  280. Clients* c;
  281. MQTTClient_connectionLost* cl;
  282. MQTTClient_messageArrived* ma;
  283. MQTTClient_deliveryComplete* dc;
  284. void* context;
  285. MQTTClient_disconnected* disconnected;
  286. void* disconnected_context; /* the context to be associated with the disconnected callback*/
  287. MQTTClient_published* published;
  288. void* published_context; /* the context to be associated with the disconnected callback*/
  289. #if 0
  290. MQTTClient_authHandle* auth_handle;
  291. void* auth_handle_context; /* the context to be associated with the authHandle callback*/
  292. #endif
  293. sem_type connect_sem;
  294. int rc; /* getsockopt return code in connect */
  295. sem_type connack_sem;
  296. sem_type suback_sem;
  297. sem_type unsuback_sem;
  298. MQTTPacket* pack;
  299. unsigned long commandTimeout;
  300. } MQTTClients;
  301. struct props_rc_parms
  302. {
  303. MQTTClients* m;
  304. MQTTProperties* properties;
  305. enum MQTTReasonCodes reasonCode;
  306. };
  307. static void MQTTClient_terminate(void);
  308. static void MQTTClient_emptyMessageQueue(Clients* client);
  309. static int MQTTClient_deliverMessage(
  310. int rc, MQTTClients* m,
  311. char** topicName, int* topicLen,
  312. MQTTClient_message** message);
  313. static int clientSockCompare(void* a, void* b);
  314. static thread_return_type WINAPI connectionLost_call(void* context);
  315. static thread_return_type WINAPI MQTTClient_run(void* n);
  316. static int MQTTClient_stop(void);
  317. static void MQTTClient_closeSession(Clients* client, enum MQTTReasonCodes reason, MQTTProperties* props);
  318. static int MQTTClient_cleanSession(Clients* client);
  319. static MQTTResponse MQTTClient_connectURIVersion(
  320. MQTTClient handle, MQTTClient_connectOptions* options,
  321. const char* serverURI, int MQTTVersion,
  322. START_TIME_TYPE start, ELAPSED_TIME_TYPE millisecsTimeout,
  323. MQTTProperties* connectProperties, MQTTProperties* willProperties);
  324. static MQTTResponse MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI,
  325. MQTTProperties* connectProperties, MQTTProperties* willProperties);
  326. static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int internal, int stop, enum MQTTReasonCodes, MQTTProperties*);
  327. static int MQTTClient_disconnect_internal(MQTTClient handle, int timeout);
  328. static void MQTTClient_retry(void);
  329. static MQTTPacket* MQTTClient_cycle(SOCKET* sock, ELAPSED_TIME_TYPE timeout, int* rc);
  330. static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, int64_t timeout);
  331. /*static int pubCompare(void* a, void* b); */
  332. static void MQTTProtocol_checkPendingWrites(void);
  333. static void MQTTClient_writeComplete(SOCKET socket, int rc);
  334. static void MQTTClient_writeContinue(SOCKET socket);
  335. int MQTTClient_createWithOptions(MQTTClient* handle, const char* serverURI, const char* clientId,
  336. int persistence_type, void* persistence_context, MQTTClient_createOptions* options)
  337. {
  338. int rc = 0;
  339. MQTTClients *m = NULL;
  340. #if (defined(_WIN32) || defined(_WIN64)) && defined(PAHO_MQTT_STATIC)
  341. /* intializes mutexes once. Must come before FUNC_ENTRY */
  342. BOOL bStatus = InitOnceExecuteOnce(&g_InitOnce, InitOnceFunction, NULL, NULL);
  343. #endif
  344. FUNC_ENTRY;
  345. if ((rc = Thread_lock_mutex(mqttclient_mutex)) != 0)
  346. goto nounlock_exit;
  347. if (serverURI == NULL || clientId == NULL)
  348. {
  349. rc = MQTTCLIENT_NULL_PARAMETER;
  350. goto exit;
  351. }
  352. if (!UTF8_validateString(clientId))
  353. {
  354. rc = MQTTCLIENT_BAD_UTF8_STRING;
  355. goto exit;
  356. }
  357. if (strlen(clientId) == 0 && persistence_type == MQTTCLIENT_PERSISTENCE_DEFAULT)
  358. {
  359. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  360. goto exit;
  361. }
  362. if (strstr(serverURI, "://") != NULL)
  363. {
  364. if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) != 0
  365. && strncmp(URI_MQTT, serverURI, strlen(URI_MQTT)) != 0
  366. && strncmp(URI_WS, serverURI, strlen(URI_WS)) != 0
  367. #if defined(OPENSSL)
  368. && strncmp(URI_SSL, serverURI, strlen(URI_SSL)) != 0
  369. && strncmp(URI_MQTTS, serverURI, strlen(URI_MQTTS)) != 0
  370. && strncmp(URI_WSS, serverURI, strlen(URI_WSS)) != 0
  371. #endif
  372. )
  373. {
  374. rc = MQTTCLIENT_BAD_PROTOCOL;
  375. goto exit;
  376. }
  377. }
  378. if (options && (strncmp(options->struct_id, "MQCO", 4) != 0 || options->struct_version != 0))
  379. {
  380. rc = MQTTCLIENT_BAD_STRUCTURE;
  381. goto exit;
  382. }
  383. if (!library_initialized)
  384. {
  385. #if !defined(NO_HEAP_TRACKING)
  386. Heap_initialize();
  387. #endif
  388. Log_initialize((Log_nameValue*)MQTTClient_getVersionInfo());
  389. bstate->clients = ListInitialize();
  390. Socket_outInitialize();
  391. Socket_setWriteCompleteCallback(MQTTClient_writeComplete);
  392. Socket_setWriteContinueCallback(MQTTClient_writeContinue);
  393. Socket_setWriteAvailableCallback(MQTTProtocol_writeAvailable);
  394. handles = ListInitialize();
  395. #if defined(OPENSSL)
  396. SSLSocket_initialize();
  397. #endif
  398. library_initialized = 1;
  399. }
  400. if ((m = malloc(sizeof(MQTTClients))) == NULL)
  401. {
  402. rc = PAHO_MEMORY_ERROR;
  403. goto exit;
  404. }
  405. *handle = m;
  406. memset(m, '\0', sizeof(MQTTClients));
  407. m->commandTimeout = 10000L;
  408. if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
  409. serverURI += strlen(URI_TCP);
  410. else if (strncmp(URI_MQTT, serverURI, strlen(URI_MQTT)) == 0)
  411. serverURI += strlen(URI_MQTT);
  412. else if (strncmp(URI_WS, serverURI, strlen(URI_WS)) == 0)
  413. {
  414. serverURI += strlen(URI_WS);
  415. m->websocket = 1;
  416. }
  417. else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0)
  418. {
  419. #if defined(OPENSSL)
  420. serverURI += strlen(URI_SSL);
  421. m->ssl = 1;
  422. #else
  423. rc = MQTTCLIENT_SSL_NOT_SUPPORTED;
  424. goto exit;
  425. #endif
  426. }
  427. else if (strncmp(URI_MQTTS, serverURI, strlen(URI_MQTTS)) == 0)
  428. {
  429. #if defined(OPENSSL)
  430. serverURI += strlen(URI_MQTTS);
  431. m->ssl = 1;
  432. #else
  433. rc = MQTTCLIENT_SSL_NOT_SUPPORTED;
  434. goto exit;
  435. #endif
  436. }
  437. else if (strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
  438. {
  439. #if defined(OPENSSL)
  440. serverURI += strlen(URI_WSS);
  441. m->ssl = 1;
  442. m->websocket = 1;
  443. #else
  444. rc = MQTTCLIENT_SSL_NOT_SUPPORTED;
  445. goto exit;
  446. #endif
  447. }
  448. m->serverURI = MQTTStrdup(serverURI);
  449. ListAppend(handles, m, sizeof(MQTTClients));
  450. if ((m->c = malloc(sizeof(Clients))) == NULL)
  451. {
  452. ListRemove(handles, m);
  453. rc = PAHO_MEMORY_ERROR;
  454. goto exit;
  455. }
  456. memset(m->c, '\0', sizeof(Clients));
  457. m->c->context = m;
  458. m->c->MQTTVersion = (options) ? options->MQTTVersion : MQTTVERSION_DEFAULT;
  459. m->c->outboundMsgs = ListInitialize();
  460. m->c->inboundMsgs = ListInitialize();
  461. m->c->messageQueue = ListInitialize();
  462. m->c->outboundQueue = ListInitialize();
  463. m->c->clientID = MQTTStrdup(clientId);
  464. m->connect_sem = Thread_create_sem(&rc);
  465. m->connack_sem = Thread_create_sem(&rc);
  466. m->suback_sem = Thread_create_sem(&rc);
  467. m->unsuback_sem = Thread_create_sem(&rc);
  468. #if !defined(NO_PERSISTENCE)
  469. rc = MQTTPersistence_create(&(m->c->persistence), persistence_type, persistence_context);
  470. if (rc == 0)
  471. {
  472. rc = MQTTPersistence_initialize(m->c, m->serverURI);
  473. if (rc == 0)
  474. MQTTPersistence_restoreMessageQueue(m->c);
  475. }
  476. #endif
  477. ListAppend(bstate->clients, m->c, sizeof(Clients) + 3*sizeof(List));
  478. exit:
  479. Thread_unlock_mutex(mqttclient_mutex);
  480. nounlock_exit:
  481. FUNC_EXIT_RC(rc);
  482. return rc;
  483. }
  484. int MQTTClient_create(MQTTClient* handle, const char* serverURI, const char* clientId,
  485. int persistence_type, void* persistence_context)
  486. {
  487. return MQTTClient_createWithOptions(handle, serverURI, clientId, persistence_type,
  488. persistence_context, NULL);
  489. }
  490. static void MQTTClient_terminate(void)
  491. {
  492. FUNC_ENTRY;
  493. MQTTClient_stop();
  494. if (library_initialized)
  495. {
  496. ListFree(bstate->clients);
  497. ListFree(handles);
  498. handles = NULL;
  499. WebSocket_terminate();
  500. #if !defined(NO_HEAP_TRACKING)
  501. Heap_terminate();
  502. #endif
  503. Log_terminate();
  504. library_initialized = 0;
  505. }
  506. FUNC_EXIT;
  507. }
  508. static void MQTTClient_emptyMessageQueue(Clients* client)
  509. {
  510. FUNC_ENTRY;
  511. /* empty message queue */
  512. if (client->messageQueue->count > 0)
  513. {
  514. ListElement* current = NULL;
  515. while (ListNextElement(client->messageQueue, &current))
  516. {
  517. qEntry* qe = (qEntry*)(current->content);
  518. free(qe->topicName);
  519. MQTTProperties_free(&qe->msg->properties);
  520. free(qe->msg->payload);
  521. free(qe->msg);
  522. }
  523. ListEmpty(client->messageQueue);
  524. }
  525. FUNC_EXIT;
  526. }
  527. void MQTTClient_destroy(MQTTClient* handle)
  528. {
  529. MQTTClients* m = *handle;
  530. FUNC_ENTRY;
  531. Thread_lock_mutex(connect_mutex);
  532. Thread_lock_mutex(mqttclient_mutex);
  533. if (m == NULL)
  534. goto exit;
  535. if (m->c)
  536. {
  537. SOCKET saved_socket = m->c->net.socket;
  538. char* saved_clientid = MQTTStrdup(m->c->clientID);
  539. #if !defined(NO_PERSISTENCE)
  540. MQTTPersistence_close(m->c);
  541. #endif
  542. MQTTClient_emptyMessageQueue(m->c);
  543. MQTTProtocol_freeClient(m->c);
  544. if (!ListRemove(bstate->clients, m->c))
  545. Log(LOG_ERROR, 0, NULL);
  546. else
  547. Log(TRACE_MIN, 1, NULL, saved_clientid, saved_socket);
  548. free(saved_clientid);
  549. }
  550. if (m->serverURI)
  551. free(m->serverURI);
  552. Thread_destroy_sem(m->connect_sem);
  553. Thread_destroy_sem(m->connack_sem);
  554. Thread_destroy_sem(m->suback_sem);
  555. Thread_destroy_sem(m->unsuback_sem);
  556. if (!ListRemove(handles, m))
  557. Log(LOG_ERROR, -1, "free error");
  558. *handle = NULL;
  559. if (bstate->clients->count == 0)
  560. MQTTClient_terminate();
  561. exit:
  562. Thread_unlock_mutex(mqttclient_mutex);
  563. Thread_unlock_mutex(connect_mutex);
  564. FUNC_EXIT;
  565. }
  566. void MQTTClient_freeMessage(MQTTClient_message** message)
  567. {
  568. FUNC_ENTRY;
  569. MQTTProperties_free(&(*message)->properties);
  570. free((*message)->payload);
  571. free(*message);
  572. *message = NULL;
  573. FUNC_EXIT;
  574. }
  575. void MQTTClient_free(void* memory)
  576. {
  577. FUNC_ENTRY;
  578. free(memory);
  579. FUNC_EXIT;
  580. }
  581. void MQTTResponse_free(MQTTResponse response)
  582. {
  583. FUNC_ENTRY;
  584. if (response.reasonCodeCount > 0 && response.reasonCodes)
  585. free(response.reasonCodes);
  586. if (response.properties)
  587. {
  588. MQTTProperties_free(response.properties);
  589. free(response.properties);
  590. }
  591. FUNC_EXIT;
  592. }
  593. static int MQTTClient_deliverMessage(int rc, MQTTClients* m, char** topicName, int* topicLen, MQTTClient_message** message)
  594. {
  595. qEntry* qe = (qEntry*)(m->c->messageQueue->first->content);
  596. FUNC_ENTRY;
  597. *message = qe->msg;
  598. *topicName = qe->topicName;
  599. *topicLen = qe->topicLen;
  600. if (strlen(*topicName) != *topicLen)
  601. rc = MQTTCLIENT_TOPICNAME_TRUNCATED;
  602. #if !defined(NO_PERSISTENCE)
  603. if (m->c->persistence)
  604. MQTTPersistence_unpersistQueueEntry(m->c, (MQTTPersistence_qEntry*)qe);
  605. #endif
  606. ListRemove(m->c->messageQueue, m->c->messageQueue->first->content);
  607. FUNC_EXIT_RC(rc);
  608. return rc;
  609. }
  610. /**
  611. * List callback function for comparing clients by socket
  612. * @param a first integer value
  613. * @param b second integer value
  614. * @return boolean indicating whether a and b are equal
  615. */
  616. static int clientSockCompare(void* a, void* b)
  617. {
  618. MQTTClients* m = (MQTTClients*)a;
  619. return m->c->net.socket == *(int*)b;
  620. }
  621. /**
  622. * Wrapper function to call connection lost on a separate thread. A separate thread is needed to allow the
  623. * connectionLost function to make API calls (e.g. connect)
  624. * @param context a pointer to the relevant client
  625. * @return thread_return_type standard thread return value - not used here
  626. */
  627. static thread_return_type WINAPI connectionLost_call(void* context)
  628. {
  629. struct conlost_sync_data *data = (struct conlost_sync_data *)context;
  630. MQTTClients* m = (MQTTClients *)data->m;
  631. (*(m->cl))(m->context, NULL);
  632. Thread_post_sem(data->sem);
  633. return 0;
  634. }
  635. int MQTTClient_setDisconnected(MQTTClient handle, void* context, MQTTClient_disconnected* disconnected)
  636. {
  637. int rc = MQTTCLIENT_SUCCESS;
  638. MQTTClients* m = handle;
  639. FUNC_ENTRY;
  640. Thread_lock_mutex(mqttclient_mutex);
  641. if (m == NULL || m->c->connect_state != NOT_IN_PROGRESS)
  642. rc = MQTTCLIENT_FAILURE;
  643. else
  644. {
  645. m->disconnected_context = context;
  646. m->disconnected = disconnected;
  647. }
  648. Thread_unlock_mutex(mqttclient_mutex);
  649. FUNC_EXIT_RC(rc);
  650. return rc;
  651. }
  652. /**
  653. * Wrapper function to call disconnected on a separate thread. A separate thread is needed to allow the
  654. * disconnected function to make API calls (e.g. connect)
  655. * @param context a pointer to the relevant client
  656. * @return thread_return_type standard thread return value - not used here
  657. */
  658. static thread_return_type WINAPI call_disconnected(void* context)
  659. {
  660. struct props_rc_parms* pr = (struct props_rc_parms*)context;
  661. (*(pr->m->disconnected))(pr->m->disconnected_context, pr->properties, pr->reasonCode);
  662. MQTTProperties_free(pr->properties);
  663. free(pr->properties);
  664. free(pr);
  665. return 0;
  666. }
  667. int MQTTClient_setPublished(MQTTClient handle, void* context, MQTTClient_published* published)
  668. {
  669. int rc = MQTTCLIENT_SUCCESS;
  670. MQTTClients* m = handle;
  671. FUNC_ENTRY;
  672. Thread_lock_mutex(mqttclient_mutex);
  673. if (m == NULL || m->c->connect_state != NOT_IN_PROGRESS)
  674. rc = MQTTCLIENT_FAILURE;
  675. else
  676. {
  677. m->published_context = context;
  678. m->published = published;
  679. }
  680. Thread_unlock_mutex(mqttclient_mutex);
  681. FUNC_EXIT_RC(rc);
  682. return rc;
  683. }
  684. #if 0
  685. int MQTTClient_setHandleAuth(MQTTClient handle, void* context, MQTTClient_handleAuth* auth_handle)
  686. {
  687. int rc = MQTTCLIENT_SUCCESS;
  688. MQTTClients* m = handle;
  689. FUNC_ENTRY;
  690. Thread_lock_mutex(mqttclient_mutex);
  691. if (m == NULL || m->c->connect_state != NOT_IN_PROGRESS)
  692. rc = MQTTCLIENT_FAILURE;
  693. else
  694. {
  695. m->auth_handle_context = context;
  696. m->auth_handle = auth_handle;
  697. }
  698. Thread_unlock_mutex(mqttclient_mutex);
  699. FUNC_EXIT_RC(rc);
  700. return rc;
  701. }
  702. /**
  703. * Wrapper function to call authHandle on a separate thread. A separate thread is needed to allow the
  704. * disconnected function to make API calls (e.g. MQTTClient_auth)
  705. * @param context a pointer to the relevant client
  706. * @return thread_return_type standard thread return value - not used here
  707. */
  708. static thread_return_type WINAPI call_auth_handle(void* context)
  709. {
  710. struct props_rc_parms* pr = (struct props_rc_parms*)context;
  711. (*(pr->m->auth_handle))(pr->m->auth_handle_context, pr->properties, pr->reasonCode);
  712. MQTTProperties_free(pr->properties);
  713. free(pr->properties);
  714. free(pr);
  715. return 0;
  716. }
  717. #endif
  718. /* This is the thread function that handles the calling of callback functions if set */
  719. static thread_return_type WINAPI MQTTClient_run(void* n)
  720. {
  721. long timeout = 10L; /* first time in we have a small timeout. Gets things started more quickly */
  722. FUNC_ENTRY;
  723. Thread_set_name("MQTTClient_run");
  724. Thread_lock_mutex(mqttclient_mutex);
  725. run_id = Thread_getid();
  726. running = 1;
  727. while (!tostop)
  728. {
  729. int rc = SOCKET_ERROR;
  730. SOCKET sock = -1;
  731. MQTTClients* m = NULL;
  732. MQTTPacket* pack = NULL;
  733. Thread_unlock_mutex(mqttclient_mutex);
  734. pack = MQTTClient_cycle(&sock, timeout, &rc);
  735. Thread_lock_mutex(mqttclient_mutex);
  736. if (tostop)
  737. break;
  738. timeout = 100L;
  739. /* find client corresponding to socket */
  740. if (ListFindItem(handles, &sock, clientSockCompare) == NULL)
  741. {
  742. /* assert: should not happen */
  743. continue;
  744. }
  745. m = (MQTTClient)(handles->current->content);
  746. if (m == NULL)
  747. {
  748. /* assert: should not happen */
  749. continue;
  750. }
  751. if (rc == SOCKET_ERROR)
  752. {
  753. if (m->c->connected)
  754. MQTTClient_disconnect_internal(m, 0);
  755. else
  756. {
  757. if (m->c->connect_state == SSL_IN_PROGRESS)
  758. {
  759. Log(TRACE_MIN, -1, "Posting connect semaphore for client %s", m->c->clientID);
  760. m->c->connect_state = NOT_IN_PROGRESS;
  761. Thread_post_sem(m->connect_sem);
  762. }
  763. if (m->c->connect_state == WAIT_FOR_CONNACK)
  764. {
  765. Log(TRACE_MIN, -1, "Posting connack semaphore for client %s", m->c->clientID);
  766. m->c->connect_state = NOT_IN_PROGRESS;
  767. Thread_post_sem(m->connack_sem);
  768. }
  769. }
  770. }
  771. else
  772. {
  773. if (m->c->messageQueue->count > 0 && m->ma)
  774. {
  775. qEntry* qe = (qEntry*)(m->c->messageQueue->first->content);
  776. int topicLen = qe->topicLen;
  777. if (strlen(qe->topicName) == topicLen)
  778. topicLen = 0;
  779. Log(TRACE_MIN, -1, "Calling messageArrived for client %s, queue depth %d",
  780. m->c->clientID, m->c->messageQueue->count);
  781. Thread_unlock_mutex(mqttclient_mutex);
  782. rc = (*(m->ma))(m->context, qe->topicName, topicLen, qe->msg);
  783. Thread_lock_mutex(mqttclient_mutex);
  784. /* if 0 (false) is returned by the callback then it failed, so we don't remove the message from
  785. * the queue, and it will be retried later. If 1 is returned then the message data may have been freed,
  786. * so we must be careful how we use it.
  787. */
  788. if (rc)
  789. {
  790. #if !defined(NO_PERSISTENCE)
  791. if (m->c->persistence)
  792. MQTTPersistence_unpersistQueueEntry(m->c, (MQTTPersistence_qEntry*)qe);
  793. #endif
  794. ListRemove(m->c->messageQueue, qe);
  795. }
  796. else
  797. Log(TRACE_MIN, -1, "False returned from messageArrived for client %s, message remains on queue",
  798. m->c->clientID);
  799. }
  800. if (pack)
  801. {
  802. if (pack->header.bits.type == CONNACK)
  803. {
  804. Log(TRACE_MIN, -1, "Posting connack semaphore for client %s", m->c->clientID);
  805. m->pack = pack;
  806. Thread_post_sem(m->connack_sem);
  807. }
  808. else if (pack->header.bits.type == SUBACK)
  809. {
  810. Log(TRACE_MIN, -1, "Posting suback semaphore for client %s", m->c->clientID);
  811. m->pack = pack;
  812. Thread_post_sem(m->suback_sem);
  813. }
  814. else if (pack->header.bits.type == UNSUBACK)
  815. {
  816. Log(TRACE_MIN, -1, "Posting unsuback semaphore for client %s", m->c->clientID);
  817. m->pack = pack;
  818. Thread_post_sem(m->unsuback_sem);
  819. }
  820. else if (m->c->MQTTVersion >= MQTTVERSION_5)
  821. {
  822. if (pack->header.bits.type == DISCONNECT && m->disconnected)
  823. {
  824. struct props_rc_parms* dp;
  825. Ack* disc = (Ack*)pack;
  826. dp = malloc(sizeof(struct props_rc_parms));
  827. if (dp)
  828. {
  829. dp->m = m;
  830. dp->reasonCode = disc->rc;
  831. dp->properties = malloc(sizeof(MQTTProperties));
  832. if (dp->properties)
  833. {
  834. *(dp->properties) = disc->properties;
  835. MQTTClient_disconnect1(m, 10, 0, 1, MQTTREASONCODE_SUCCESS, NULL);
  836. Log(TRACE_MIN, -1, "Calling disconnected for client %s", m->c->clientID);
  837. Thread_start(call_disconnected, dp);
  838. }
  839. else
  840. free(dp);
  841. }
  842. free(disc);
  843. }
  844. #if 0
  845. if (pack->header.bits.type == AUTH && m->auth_handle)
  846. {
  847. struct props_rc_parms dp;
  848. Ack* disc = (Ack*)pack;
  849. dp.m = m;
  850. dp.properties = &disc->properties;
  851. dp.reasonCode = disc->rc;
  852. free(pack);
  853. Log(TRACE_MIN, -1, "Calling auth_handle for client %s", m->c->clientID);
  854. Thread_start(call_auth_handle, &dp);
  855. }
  856. #endif
  857. }
  858. }
  859. else if (m->c->connect_state == TCP_IN_PROGRESS)
  860. {
  861. int error;
  862. socklen_t len = sizeof(error);
  863. if ((m->rc = getsockopt(m->c->net.socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len)) == 0)
  864. m->rc = error;
  865. Log(TRACE_MIN, -1, "Posting connect semaphore for client %s rc %d", m->c->clientID, m->rc);
  866. m->c->connect_state = NOT_IN_PROGRESS;
  867. Thread_post_sem(m->connect_sem);
  868. }
  869. #if defined(OPENSSL)
  870. else if (m->c->connect_state == SSL_IN_PROGRESS)
  871. {
  872. rc = m->c->sslopts->struct_version >= 3 ?
  873. SSLSocket_connect(m->c->net.ssl, m->c->net.socket, m->serverURI,
  874. m->c->sslopts->verify, m->c->sslopts->ssl_error_cb, m->c->sslopts->ssl_error_context) :
  875. SSLSocket_connect(m->c->net.ssl, m->c->net.socket, m->serverURI,
  876. m->c->sslopts->verify, NULL, NULL);
  877. if (rc == 1 || rc == SSL_FATAL)
  878. {
  879. if (rc == 1 && (m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
  880. m->c->session = SSL_get1_session(m->c->net.ssl);
  881. m->rc = rc;
  882. Log(TRACE_MIN, -1, "Posting connect semaphore for SSL client %s rc %d", m->c->clientID, m->rc);
  883. m->c->connect_state = NOT_IN_PROGRESS;
  884. Thread_post_sem(m->connect_sem);
  885. }
  886. }
  887. #endif
  888. else if (m->c->connect_state == WEBSOCKET_IN_PROGRESS)
  889. {
  890. if (rc != TCPSOCKET_INTERRUPTED)
  891. {
  892. Log(TRACE_MIN, -1, "Posting websocket handshake for client %s rc %d", m->c->clientID, m->rc);
  893. m->c->connect_state = WAIT_FOR_CONNACK;
  894. Thread_post_sem(m->connect_sem);
  895. }
  896. }
  897. }
  898. }
  899. run_id = 0;
  900. running = tostop = 0;
  901. Thread_unlock_mutex(mqttclient_mutex);
  902. FUNC_EXIT;
  903. #if defined(_WIN32) || defined(_WIN64)
  904. ExitThread(0);
  905. #endif
  906. return 0;
  907. }
  908. static int MQTTClient_stop(void)
  909. {
  910. int rc = 0;
  911. FUNC_ENTRY;
  912. if (running == 1 && tostop == 0)
  913. {
  914. int conn_count = 0;
  915. ListElement* current = NULL;
  916. if (handles != NULL)
  917. {
  918. /* find out how many handles are still connected */
  919. while (ListNextElement(handles, &current))
  920. {
  921. if (((MQTTClients*)(current->content))->c->connect_state > NOT_IN_PROGRESS ||
  922. ((MQTTClients*)(current->content))->c->connected)
  923. ++conn_count;
  924. }
  925. }
  926. Log(TRACE_MIN, -1, "Conn_count is %d", conn_count);
  927. /* stop the background thread, if we are the last one to be using it */
  928. if (conn_count == 0)
  929. {
  930. int count = 0;
  931. tostop = 1;
  932. if (Thread_getid() != run_id)
  933. {
  934. while (running && ++count < 100)
  935. {
  936. Thread_unlock_mutex(mqttclient_mutex);
  937. Log(TRACE_MIN, -1, "sleeping");
  938. MQTTTime_sleep(100L);
  939. Thread_lock_mutex(mqttclient_mutex);
  940. }
  941. }
  942. rc = 1;
  943. }
  944. }
  945. FUNC_EXIT_RC(rc);
  946. return rc;
  947. }
  948. int MQTTClient_setCallbacks(MQTTClient handle, void* context, MQTTClient_connectionLost* cl,
  949. MQTTClient_messageArrived* ma, MQTTClient_deliveryComplete* dc)
  950. {
  951. int rc = MQTTCLIENT_SUCCESS;
  952. MQTTClients* m = handle;
  953. FUNC_ENTRY;
  954. Thread_lock_mutex(mqttclient_mutex);
  955. if (m == NULL || ma == NULL || m->c->connect_state != NOT_IN_PROGRESS)
  956. rc = MQTTCLIENT_FAILURE;
  957. else
  958. {
  959. m->context = context;
  960. m->cl = cl;
  961. m->ma = ma;
  962. m->dc = dc;
  963. }
  964. Thread_unlock_mutex(mqttclient_mutex);
  965. FUNC_EXIT_RC(rc);
  966. return rc;
  967. }
  968. static void MQTTClient_closeSession(Clients* client, enum MQTTReasonCodes reason, MQTTProperties* props)
  969. {
  970. FUNC_ENTRY;
  971. client->good = 0;
  972. client->ping_outstanding = 0;
  973. client->ping_due = 0;
  974. if (client->net.socket > 0)
  975. {
  976. if (client->connected)
  977. MQTTPacket_send_disconnect(client, reason, props);
  978. Thread_lock_mutex(socket_mutex);
  979. WebSocket_close(&client->net, WebSocket_CLOSE_NORMAL, NULL);
  980. #if defined(OPENSSL)
  981. SSL_SESSION_free(client->session); /* is a no-op if session is NULL */
  982. client->session = NULL; /* show the session has been freed */
  983. SSLSocket_close(&client->net);
  984. #endif
  985. Socket_close(client->net.socket);
  986. Thread_unlock_mutex(socket_mutex);
  987. client->net.socket = 0;
  988. #if defined(OPENSSL)
  989. client->net.ssl = NULL;
  990. #endif
  991. }
  992. client->connected = 0;
  993. client->connect_state = NOT_IN_PROGRESS;
  994. if (client->MQTTVersion < MQTTVERSION_5 && client->cleansession)
  995. MQTTClient_cleanSession(client);
  996. FUNC_EXIT;
  997. }
  998. static int MQTTClient_cleanSession(Clients* client)
  999. {
  1000. int rc = 0;
  1001. FUNC_ENTRY;
  1002. #if !defined(NO_PERSISTENCE)
  1003. rc = MQTTPersistence_clear(client);
  1004. #endif
  1005. MQTTProtocol_emptyMessageList(client->inboundMsgs);
  1006. MQTTProtocol_emptyMessageList(client->outboundMsgs);
  1007. MQTTClient_emptyMessageQueue(client);
  1008. client->msgID = 0;
  1009. FUNC_EXIT_RC(rc);
  1010. return rc;
  1011. }
  1012. void Protocol_processPublication(Publish* publish, Clients* client, int allocatePayload)
  1013. {
  1014. qEntry* qe = NULL;
  1015. MQTTClient_message* mm = NULL;
  1016. MQTTClient_message initialized = MQTTClient_message_initializer;
  1017. FUNC_ENTRY;
  1018. qe = malloc(sizeof(qEntry));
  1019. if (!qe)
  1020. goto exit;
  1021. mm = malloc(sizeof(MQTTClient_message));
  1022. if (!mm)
  1023. {
  1024. free(qe);
  1025. goto exit;
  1026. }
  1027. memcpy(mm, &initialized, sizeof(MQTTClient_message));
  1028. qe->msg = mm;
  1029. qe->topicName = publish->topic;
  1030. qe->topicLen = publish->topiclen;
  1031. publish->topic = NULL;
  1032. if (allocatePayload)
  1033. {
  1034. mm->payload = malloc(publish->payloadlen);
  1035. if (mm->payload == NULL)
  1036. {
  1037. free(mm);
  1038. free(qe);
  1039. goto exit;
  1040. }
  1041. memcpy(mm->payload, publish->payload, publish->payloadlen);
  1042. }
  1043. else
  1044. mm->payload = publish->payload;
  1045. mm->payloadlen = publish->payloadlen;
  1046. mm->qos = publish->header.bits.qos;
  1047. mm->retained = publish->header.bits.retain;
  1048. if (publish->header.bits.qos == 2)
  1049. mm->dup = 0; /* ensure that a QoS2 message is not passed to the application with dup = 1 */
  1050. else
  1051. mm->dup = publish->header.bits.dup;
  1052. mm->msgid = publish->msgId;
  1053. if (publish->MQTTVersion >= 5)
  1054. mm->properties = MQTTProperties_copy(&publish->properties);
  1055. ListAppend(client->messageQueue, qe, sizeof(qe) + sizeof(mm) + mm->payloadlen + strlen(qe->topicName)+1);
  1056. #if !defined(NO_PERSISTENCE)
  1057. if (client->persistence)
  1058. MQTTPersistence_persistQueueEntry(client, (MQTTPersistence_qEntry*)qe);
  1059. #endif
  1060. exit:
  1061. FUNC_EXIT;
  1062. }
  1063. static MQTTResponse MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI, int MQTTVersion,
  1064. START_TIME_TYPE start, ELAPSED_TIME_TYPE millisecsTimeout, MQTTProperties* connectProperties, MQTTProperties* willProperties)
  1065. {
  1066. MQTTClients* m = handle;
  1067. int rc = SOCKET_ERROR;
  1068. int sessionPresent = 0;
  1069. MQTTResponse resp = MQTTResponse_initializer;
  1070. FUNC_ENTRY;
  1071. resp.reasonCode = SOCKET_ERROR;
  1072. if (m->ma && !running)
  1073. {
  1074. int count = 0;
  1075. Thread_start(MQTTClient_run, handle);
  1076. if (MQTTTime_elapsed(start) >= millisecsTimeout)
  1077. {
  1078. rc = SOCKET_ERROR;
  1079. goto exit;
  1080. }
  1081. while (!running && ++count < 5)
  1082. {
  1083. Thread_unlock_mutex(mqttclient_mutex);
  1084. MQTTTime_sleep(100L);
  1085. Thread_lock_mutex(mqttclient_mutex);
  1086. }
  1087. if (!running)
  1088. {
  1089. rc = SOCKET_ERROR;
  1090. goto exit;
  1091. }
  1092. }
  1093. Log(TRACE_MIN, -1, "Connecting to serverURI %s with MQTT version %d", serverURI, MQTTVersion);
  1094. #if defined(OPENSSL)
  1095. #if defined(__GNUC__) && defined(__linux__)
  1096. rc = MQTTProtocol_connect(serverURI, m->c, m->ssl, m->websocket, MQTTVersion, connectProperties, willProperties,
  1097. millisecsTimeout - MQTTTime_elapsed(start));
  1098. #else
  1099. rc = MQTTProtocol_connect(serverURI, m->c, m->ssl, m->websocket, MQTTVersion, connectProperties, willProperties);
  1100. #endif
  1101. #else
  1102. #if defined(__GNUC__) && defined(__linux__)
  1103. rc = MQTTProtocol_connect(serverURI, m->c, m->websocket, MQTTVersion, connectProperties, willProperties,
  1104. millisecsTimeout - MQTTTime_elapsed(start));
  1105. #else
  1106. rc = MQTTProtocol_connect(serverURI, m->c, m->websocket, MQTTVersion, connectProperties, willProperties);
  1107. #endif
  1108. #endif
  1109. if (rc == SOCKET_ERROR)
  1110. goto exit;
  1111. if (m->c->connect_state == NOT_IN_PROGRESS)
  1112. {
  1113. rc = SOCKET_ERROR;
  1114. goto exit;
  1115. }
  1116. if (m->c->connect_state == TCP_IN_PROGRESS) /* TCP connect started - wait for completion */
  1117. {
  1118. Thread_unlock_mutex(mqttclient_mutex);
  1119. MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTTime_elapsed(start));
  1120. Thread_lock_mutex(mqttclient_mutex);
  1121. if (rc != 0)
  1122. {
  1123. rc = SOCKET_ERROR;
  1124. goto exit;
  1125. }
  1126. #if defined(OPENSSL)
  1127. if (m->ssl)
  1128. {
  1129. int port1;
  1130. size_t hostname_len;
  1131. const char *topic;
  1132. int setSocketForSSLrc = 0;
  1133. if (m->c->net.https_proxy) {
  1134. m->c->connect_state = PROXY_CONNECT_IN_PROGRESS;
  1135. if ((rc = Proxy_connect( &m->c->net, 1, serverURI)) == SOCKET_ERROR )
  1136. goto exit;
  1137. }
  1138. hostname_len = MQTTProtocol_addressPort(serverURI, &port1, &topic, MQTT_DEFAULT_PORT);
  1139. setSocketForSSLrc = SSLSocket_setSocketForSSL(&m->c->net, m->c->sslopts,
  1140. serverURI, hostname_len);
  1141. if (setSocketForSSLrc != MQTTCLIENT_SUCCESS)
  1142. {
  1143. if (m->c->session != NULL)
  1144. if ((rc = SSL_set_session(m->c->net.ssl, m->c->session)) != 1)
  1145. Log(TRACE_MIN, -1, "Failed to set SSL session with stored data, non critical");
  1146. rc = m->c->sslopts->struct_version >= 3 ?
  1147. SSLSocket_connect(m->c->net.ssl, m->c->net.socket, serverURI,
  1148. m->c->sslopts->verify, m->c->sslopts->ssl_error_cb, m->c->sslopts->ssl_error_context) :
  1149. SSLSocket_connect(m->c->net.ssl, m->c->net.socket, serverURI,
  1150. m->c->sslopts->verify, NULL, NULL);
  1151. if (rc == TCPSOCKET_INTERRUPTED)
  1152. m->c->connect_state = SSL_IN_PROGRESS; /* the connect is still in progress */
  1153. else if (rc == SSL_FATAL)
  1154. {
  1155. rc = SOCKET_ERROR;
  1156. goto exit;
  1157. }
  1158. else if (rc == 1)
  1159. {
  1160. if (m->websocket)
  1161. {
  1162. m->c->connect_state = WEBSOCKET_IN_PROGRESS;
  1163. rc = WebSocket_connect(&m->c->net, 1, serverURI);
  1164. if ( rc == SOCKET_ERROR )
  1165. goto exit;
  1166. }
  1167. else
  1168. {
  1169. rc = MQTTCLIENT_SUCCESS;
  1170. m->c->connect_state = WAIT_FOR_CONNACK;
  1171. if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
  1172. {
  1173. rc = SOCKET_ERROR;
  1174. goto exit;
  1175. }
  1176. if ((m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
  1177. m->c->session = SSL_get1_session(m->c->net.ssl);
  1178. }
  1179. }
  1180. }
  1181. else
  1182. {
  1183. rc = SOCKET_ERROR;
  1184. goto exit;
  1185. }
  1186. }
  1187. #endif
  1188. else
  1189. {
  1190. if (m->c->net.http_proxy) {
  1191. m->c->connect_state = PROXY_CONNECT_IN_PROGRESS;
  1192. if ((rc = Proxy_connect( &m->c->net, 0, serverURI)) == SOCKET_ERROR )
  1193. goto exit;
  1194. }
  1195. if (m->websocket)
  1196. {
  1197. m->c->connect_state = WEBSOCKET_IN_PROGRESS;
  1198. if ( WebSocket_connect(&m->c->net, 0, serverURI) == SOCKET_ERROR )
  1199. {
  1200. rc = SOCKET_ERROR;
  1201. goto exit;
  1202. }
  1203. }
  1204. else
  1205. {
  1206. m->c->connect_state = WAIT_FOR_CONNACK; /* TCP connect completed, in which case send the MQTT connect packet */
  1207. if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
  1208. {
  1209. rc = SOCKET_ERROR;
  1210. goto exit;
  1211. }
  1212. }
  1213. }
  1214. }
  1215. #if defined(OPENSSL)
  1216. if (m->c->connect_state == SSL_IN_PROGRESS) /* SSL connect sent - wait for completion */
  1217. {
  1218. Thread_unlock_mutex(mqttclient_mutex);
  1219. MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTTime_elapsed(start));
  1220. Thread_lock_mutex(mqttclient_mutex);
  1221. if (rc != 1)
  1222. {
  1223. rc = SOCKET_ERROR;
  1224. goto exit;
  1225. }
  1226. if((m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
  1227. m->c->session = SSL_get1_session(m->c->net.ssl);
  1228. if ( m->websocket )
  1229. {
  1230. /* wait for websocket connect */
  1231. m->c->connect_state = WEBSOCKET_IN_PROGRESS;
  1232. rc = WebSocket_connect( &m->c->net, 1, serverURI);
  1233. if ( rc != 1 )
  1234. {
  1235. rc = SOCKET_ERROR;
  1236. goto exit;
  1237. }
  1238. }
  1239. else
  1240. {
  1241. m->c->connect_state = WAIT_FOR_CONNACK; /* TCP connect completed, in which case send the MQTT connect packet */
  1242. if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
  1243. {
  1244. rc = SOCKET_ERROR;
  1245. goto exit;
  1246. }
  1247. }
  1248. }
  1249. #endif
  1250. if (m->c->connect_state == WEBSOCKET_IN_PROGRESS) /* websocket request sent - wait for upgrade */
  1251. {
  1252. Thread_unlock_mutex(mqttclient_mutex);
  1253. MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTTime_elapsed(start));
  1254. Thread_lock_mutex(mqttclient_mutex);
  1255. m->c->connect_state = WAIT_FOR_CONNACK; /* websocket upgrade complete */
  1256. if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
  1257. {
  1258. rc = SOCKET_ERROR;
  1259. goto exit;
  1260. }
  1261. }
  1262. if (m->c->connect_state == WAIT_FOR_CONNACK) /* MQTT connect sent - wait for CONNACK */
  1263. {
  1264. MQTTPacket* pack = NULL;
  1265. Thread_unlock_mutex(mqttclient_mutex);
  1266. pack = MQTTClient_waitfor(handle, CONNACK, &rc, millisecsTimeout - MQTTTime_elapsed(start));
  1267. Thread_lock_mutex(mqttclient_mutex);
  1268. if (pack == NULL)
  1269. rc = SOCKET_ERROR;
  1270. else
  1271. {
  1272. Connack* connack = (Connack*)pack;
  1273. Log(TRACE_PROTOCOL, 1, NULL, m->c->net.socket, m->c->clientID, connack->rc);
  1274. if ((rc = connack->rc) == MQTTCLIENT_SUCCESS)
  1275. {
  1276. m->c->connected = 1;
  1277. m->c->good = 1;
  1278. m->c->connect_state = NOT_IN_PROGRESS;
  1279. if (MQTTVersion == 4)
  1280. sessionPresent = connack->flags.bits.sessionPresent;
  1281. if (m->c->cleansession || m->c->cleanstart)
  1282. rc = MQTTClient_cleanSession(m->c);
  1283. if (m->c->outboundMsgs->count > 0)
  1284. {
  1285. ListElement* outcurrent = NULL;
  1286. START_TIME_TYPE zero = START_TIME_ZERO;
  1287. while (ListNextElement(m->c->outboundMsgs, &outcurrent))
  1288. {
  1289. Messages* m = (Messages*)(outcurrent->content);
  1290. memset(&m->lastTouch, '\0', sizeof(m->lastTouch));
  1291. }
  1292. MQTTProtocol_retry(zero, 1, 1);
  1293. if (m->c->connected != 1)
  1294. rc = MQTTCLIENT_DISCONNECTED;
  1295. }
  1296. if (m->c->MQTTVersion == MQTTVERSION_5)
  1297. {
  1298. if ((resp.properties = malloc(sizeof(MQTTProperties))) == NULL)
  1299. {
  1300. rc = PAHO_MEMORY_ERROR;
  1301. goto exit;
  1302. }
  1303. *resp.properties = MQTTProperties_copy(&connack->properties);
  1304. }
  1305. }
  1306. MQTTPacket_freeConnack(connack);
  1307. m->pack = NULL;
  1308. }
  1309. }
  1310. exit:
  1311. if (rc == MQTTCLIENT_SUCCESS)
  1312. {
  1313. if (options->struct_version >= 4) /* means we have to fill out return values */
  1314. {
  1315. options->returned.serverURI = serverURI;
  1316. options->returned.MQTTVersion = MQTTVersion;
  1317. options->returned.sessionPresent = sessionPresent;
  1318. }
  1319. }
  1320. else
  1321. MQTTClient_disconnect1(handle, 0, 0, (MQTTVersion == 3), MQTTREASONCODE_SUCCESS, NULL); /* don't want to call connection lost */
  1322. resp.reasonCode = rc;
  1323. FUNC_EXIT_RC(resp.reasonCode);
  1324. return resp;
  1325. }
  1326. static int retryLoopIntervalms = 5000;
  1327. void setRetryLoopInterval(int keepalive)
  1328. {
  1329. retryLoopIntervalms = (keepalive*1000) / 10;
  1330. if (retryLoopIntervalms < 100)
  1331. retryLoopIntervalms = 100;
  1332. else if (retryLoopIntervalms > 5000)
  1333. retryLoopIntervalms = 5000;
  1334. }
  1335. static MQTTResponse MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI,
  1336. MQTTProperties* connectProperties, MQTTProperties* willProperties)
  1337. {
  1338. MQTTClients* m = handle;
  1339. START_TIME_TYPE start;
  1340. ELAPSED_TIME_TYPE millisecsTimeout = 30000L;
  1341. MQTTResponse rc = MQTTResponse_initializer;
  1342. int MQTTVersion = 0;
  1343. FUNC_ENTRY;
  1344. rc.reasonCode = SOCKET_ERROR;
  1345. millisecsTimeout = options->connectTimeout * 1000;
  1346. start = MQTTTime_start_clock();
  1347. m->currentServerURI = serverURI;
  1348. m->c->keepAliveInterval = options->keepAliveInterval;
  1349. m->c->retryInterval = options->retryInterval;
  1350. setRetryLoopInterval(options->keepAliveInterval);
  1351. m->c->MQTTVersion = options->MQTTVersion;
  1352. m->c->cleanstart = m->c->cleansession = 0;
  1353. if (m->c->MQTTVersion >= MQTTVERSION_5)
  1354. m->c->cleanstart = options->cleanstart;
  1355. else
  1356. m->c->cleansession = options->cleansession;
  1357. m->c->maxInflightMessages = (options->reliable) ? 1 : 10;
  1358. if (options->struct_version >= 6)
  1359. {
  1360. if (options->maxInflightMessages > 0)
  1361. m->c->maxInflightMessages = options->maxInflightMessages;
  1362. }
  1363. if (options->struct_version >= 7)
  1364. {
  1365. m->c->net.httpHeaders = options->httpHeaders;
  1366. }
  1367. if (options->struct_version >= 8)
  1368. {
  1369. if (options->httpProxy)
  1370. m->c->httpProxy = MQTTStrdup(options->httpProxy);
  1371. if (options->httpsProxy)
  1372. m->c->httpsProxy = MQTTStrdup(options->httpsProxy);
  1373. }
  1374. if (m->c->will)
  1375. {
  1376. free(m->c->will->payload);
  1377. free(m->c->will->topic);
  1378. free(m->c->will);
  1379. m->c->will = NULL;
  1380. }
  1381. if (options->will && (options->will->struct_version == 0 || options->will->struct_version == 1))
  1382. {
  1383. const void* source = NULL;
  1384. if ((m->c->will = malloc(sizeof(willMessages))) == NULL)
  1385. {
  1386. rc.reasonCode = PAHO_MEMORY_ERROR;
  1387. goto exit;
  1388. }
  1389. if (options->will->message || (options->will->struct_version == 1 && options->will->payload.data))
  1390. {
  1391. if (options->will->struct_version == 1 && options->will->payload.data)
  1392. {
  1393. m->c->will->payloadlen = options->will->payload.len;
  1394. source = options->will->payload.data;
  1395. }
  1396. else
  1397. {
  1398. m->c->will->payloadlen = (int)strlen(options->will->message);
  1399. source = (void*)options->will->message;
  1400. }
  1401. if ((m->c->will->payload = malloc(m->c->will->payloadlen)) == NULL)
  1402. {
  1403. free(m->c->will);
  1404. rc.reasonCode = PAHO_MEMORY_ERROR;
  1405. goto exit;
  1406. }
  1407. memcpy(m->c->will->payload, source, m->c->will->payloadlen);
  1408. }
  1409. else
  1410. {
  1411. m->c->will->payload = NULL;
  1412. m->c->will->payloadlen = 0;
  1413. }
  1414. m->c->will->qos = options->will->qos;
  1415. m->c->will->retained = options->will->retained;
  1416. m->c->will->topic = MQTTStrdup(options->will->topicName);
  1417. }
  1418. #if defined(OPENSSL)
  1419. if (m->c->sslopts)
  1420. {
  1421. if (m->c->sslopts->trustStore)
  1422. free((void*)m->c->sslopts->trustStore);
  1423. if (m->c->sslopts->keyStore)
  1424. free((void*)m->c->sslopts->keyStore);
  1425. if (m->c->sslopts->privateKey)
  1426. free((void*)m->c->sslopts->privateKey);
  1427. if (m->c->sslopts->privateKeyPassword)
  1428. free((void*)m->c->sslopts->privateKeyPassword);
  1429. if (m->c->sslopts->enabledCipherSuites)
  1430. free((void*)m->c->sslopts->enabledCipherSuites);
  1431. if (m->c->sslopts->struct_version >= 2)
  1432. {
  1433. if (m->c->sslopts->CApath)
  1434. free((void*)m->c->sslopts->CApath);
  1435. }
  1436. free(m->c->sslopts);
  1437. m->c->sslopts = NULL;
  1438. }
  1439. if (options->struct_version != 0 && options->ssl)
  1440. {
  1441. if ((m->c->sslopts = malloc(sizeof(MQTTClient_SSLOptions))) == NULL)
  1442. {
  1443. rc.reasonCode = PAHO_MEMORY_ERROR;
  1444. goto exit;
  1445. }
  1446. memset(m->c->sslopts, '\0', sizeof(MQTTClient_SSLOptions));
  1447. m->c->sslopts->struct_version = options->ssl->struct_version;
  1448. if (options->ssl->trustStore)
  1449. m->c->sslopts->trustStore = MQTTStrdup(options->ssl->trustStore);
  1450. if (options->ssl->keyStore)
  1451. m->c->sslopts->keyStore = MQTTStrdup(options->ssl->keyStore);
  1452. if (options->ssl->privateKey)
  1453. m->c->sslopts->privateKey = MQTTStrdup(options->ssl->privateKey);
  1454. if (options->ssl->privateKeyPassword)
  1455. m->c->sslopts->privateKeyPassword = MQTTStrdup(options->ssl->privateKeyPassword);
  1456. if (options->ssl->enabledCipherSuites)
  1457. m->c->sslopts->enabledCipherSuites = MQTTStrdup(options->ssl->enabledCipherSuites);
  1458. m->c->sslopts->enableServerCertAuth = options->ssl->enableServerCertAuth;
  1459. if (m->c->sslopts->struct_version >= 1)
  1460. m->c->sslopts->sslVersion = options->ssl->sslVersion;
  1461. if (m->c->sslopts->struct_version >= 2)
  1462. {
  1463. m->c->sslopts->verify = options->ssl->verify;
  1464. if (options->ssl->CApath)
  1465. m->c->sslopts->CApath = MQTTStrdup(options->ssl->CApath);
  1466. }
  1467. if (m->c->sslopts->struct_version >= 3)
  1468. {
  1469. m->c->sslopts->ssl_error_cb = options->ssl->ssl_error_cb;
  1470. m->c->sslopts->ssl_error_context = options->ssl->ssl_error_context;
  1471. }
  1472. if (m->c->sslopts->struct_version >= 4)
  1473. {
  1474. m->c->sslopts->ssl_psk_cb = options->ssl->ssl_psk_cb;
  1475. m->c->sslopts->ssl_psk_context = options->ssl->ssl_psk_context;
  1476. m->c->sslopts->disableDefaultTrustStore = options->ssl->disableDefaultTrustStore;
  1477. }
  1478. if (m->c->sslopts->struct_version >= 5)
  1479. {
  1480. m->c->sslopts->protos = options->ssl->protos;
  1481. m->c->sslopts->protos_len = options->ssl->protos_len;
  1482. }
  1483. }
  1484. #endif
  1485. if (m->c->username)
  1486. {
  1487. free((void*)m->c->username);
  1488. m->c->username = NULL;
  1489. }
  1490. if (options->username)
  1491. m->c->username = MQTTStrdup(options->username);
  1492. if (m->c->password)
  1493. {
  1494. free((void*)m->c->password);
  1495. m->c->password = NULL;
  1496. }
  1497. if (options->password)
  1498. {
  1499. m->c->password = MQTTStrdup(options->password);
  1500. m->c->passwordlen = (int)strlen(options->password);
  1501. }
  1502. else if (options->struct_version >= 5 && options->binarypwd.data)
  1503. {
  1504. m->c->passwordlen = options->binarypwd.len;
  1505. if ((m->c->password = malloc(m->c->passwordlen)) == NULL)
  1506. {
  1507. rc.reasonCode = PAHO_MEMORY_ERROR;
  1508. goto exit;
  1509. }
  1510. memcpy((void*)m->c->password, options->binarypwd.data, m->c->passwordlen);
  1511. }
  1512. if (options->struct_version >= 3)
  1513. MQTTVersion = options->MQTTVersion;
  1514. else
  1515. MQTTVersion = MQTTVERSION_DEFAULT;
  1516. if (MQTTVersion == MQTTVERSION_DEFAULT)
  1517. {
  1518. rc = MQTTClient_connectURIVersion(handle, options, serverURI, 4, start, millisecsTimeout,
  1519. connectProperties, willProperties);
  1520. if (rc.reasonCode != MQTTCLIENT_SUCCESS)
  1521. {
  1522. rc = MQTTClient_connectURIVersion(handle, options, serverURI, 3, start, millisecsTimeout,
  1523. connectProperties, willProperties);
  1524. }
  1525. }
  1526. else
  1527. rc = MQTTClient_connectURIVersion(handle, options, serverURI, MQTTVersion, start, millisecsTimeout,
  1528. connectProperties, willProperties);
  1529. exit:
  1530. FUNC_EXIT_RC(rc.reasonCode);
  1531. return rc;
  1532. }
  1533. MQTTResponse MQTTClient_connectAll(MQTTClient handle, MQTTClient_connectOptions* options,
  1534. MQTTProperties* connectProperties, MQTTProperties* willProperties);
  1535. int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
  1536. {
  1537. MQTTClients* m = handle;
  1538. MQTTResponse response;
  1539. if (m != NULL && m->c != NULL && m->c->MQTTVersion >= MQTTVERSION_5)
  1540. return MQTTCLIENT_WRONG_MQTT_VERSION;
  1541. response = MQTTClient_connectAll(handle, options, NULL, NULL);
  1542. return response.reasonCode;
  1543. }
  1544. MQTTResponse MQTTClient_connect5(MQTTClient handle, MQTTClient_connectOptions* options,
  1545. MQTTProperties* connectProperties, MQTTProperties* willProperties)
  1546. {
  1547. MQTTClients* m = handle;
  1548. MQTTResponse response = MQTTResponse_initializer;
  1549. if (m != NULL && m->c != NULL && m->c->MQTTVersion < MQTTVERSION_5)
  1550. {
  1551. response.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
  1552. return response;
  1553. }
  1554. return MQTTClient_connectAll(handle, options, connectProperties, willProperties);
  1555. }
  1556. MQTTResponse MQTTClient_connectAll(MQTTClient handle, MQTTClient_connectOptions* options,
  1557. MQTTProperties* connectProperties, MQTTProperties* willProperties)
  1558. {
  1559. MQTTClients* m = handle;
  1560. MQTTResponse rc = MQTTResponse_initializer;
  1561. FUNC_ENTRY;
  1562. Thread_lock_mutex(connect_mutex);
  1563. Thread_lock_mutex(mqttclient_mutex);
  1564. rc.reasonCode = SOCKET_ERROR;
  1565. if (!library_initialized)
  1566. {
  1567. rc.reasonCode = MQTTCLIENT_FAILURE;
  1568. goto exit;
  1569. }
  1570. if (options == NULL || m == NULL || m->c == NULL)
  1571. {
  1572. rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
  1573. goto exit;
  1574. }
  1575. if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 8)
  1576. {
  1577. rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
  1578. goto exit;
  1579. }
  1580. #if defined(OPENSSL)
  1581. if (m->ssl && options->ssl == NULL)
  1582. {
  1583. rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
  1584. goto exit;
  1585. }
  1586. #endif
  1587. if (options->will) /* check validity of will options structure */
  1588. {
  1589. if (strncmp(options->will->struct_id, "MQTW", 4) != 0 || (options->will->struct_version != 0 && options->will->struct_version != 1))
  1590. {
  1591. rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
  1592. goto exit;
  1593. }
  1594. if (options->will->qos < 0 || options->will->qos > 2)
  1595. {
  1596. rc.reasonCode = MQTTCLIENT_BAD_QOS;
  1597. goto exit;
  1598. }
  1599. if (options->will->topicName == NULL)
  1600. {
  1601. rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
  1602. goto exit;
  1603. } else if (strlen(options->will->topicName) == 0)
  1604. {
  1605. rc.reasonCode = MQTTCLIENT_0_LEN_WILL_TOPIC;
  1606. goto exit;
  1607. }
  1608. }
  1609. #if defined(OPENSSL)
  1610. if (options->struct_version != 0 && options->ssl) /* check validity of SSL options structure */
  1611. {
  1612. if (strncmp(options->ssl->struct_id, "MQTS", 4) != 0 || options->ssl->struct_version < 0 || options->ssl->struct_version > 5)
  1613. {
  1614. rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
  1615. goto exit;
  1616. }
  1617. }
  1618. #endif
  1619. if ((options->username && !UTF8_validateString(options->username)) ||
  1620. (options->password && !UTF8_validateString(options->password)))
  1621. {
  1622. rc.reasonCode = MQTTCLIENT_BAD_UTF8_STRING;
  1623. goto exit;
  1624. }
  1625. if (options->MQTTVersion != MQTTVERSION_DEFAULT &&
  1626. (options->MQTTVersion < MQTTVERSION_3_1 || options->MQTTVersion > MQTTVERSION_5))
  1627. {
  1628. rc.reasonCode = MQTTCLIENT_BAD_MQTT_VERSION;
  1629. goto exit;
  1630. }
  1631. if (options->MQTTVersion >= MQTTVERSION_5)
  1632. {
  1633. if (options->cleansession != 0)
  1634. {
  1635. rc.reasonCode = MQTTCLIENT_BAD_MQTT_OPTION;
  1636. goto exit;
  1637. }
  1638. }
  1639. else if (options->cleanstart != 0)
  1640. {
  1641. rc.reasonCode = MQTTCLIENT_BAD_MQTT_OPTION;
  1642. goto exit;
  1643. }
  1644. if (options->struct_version < 2 || options->serverURIcount == 0)
  1645. {
  1646. if ( !m )
  1647. {
  1648. rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
  1649. goto exit;
  1650. }
  1651. rc = MQTTClient_connectURI(handle, options, m->serverURI, connectProperties, willProperties);
  1652. }
  1653. else
  1654. {
  1655. int i;
  1656. for (i = 0; i < options->serverURIcount; ++i)
  1657. {
  1658. char* serverURI = options->serverURIs[i];
  1659. if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
  1660. serverURI += strlen(URI_TCP);
  1661. else if (strncmp(URI_MQTT, serverURI, strlen(URI_MQTT)) == 0)
  1662. serverURI += strlen(URI_TCP);
  1663. else if (strncmp(URI_WS, serverURI, strlen(URI_WS)) == 0)
  1664. {
  1665. serverURI += strlen(URI_WS);
  1666. m->websocket = 1;
  1667. }
  1668. #if defined(OPENSSL)
  1669. else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0)
  1670. {
  1671. serverURI += strlen(URI_SSL);
  1672. m->ssl = 1;
  1673. }
  1674. else if (strncmp(URI_MQTTS, serverURI, strlen(URI_MQTTS)) == 0)
  1675. {
  1676. serverURI += strlen(URI_MQTTS);
  1677. m->ssl = 1;
  1678. }
  1679. else if (strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
  1680. {
  1681. serverURI += strlen(URI_WSS);
  1682. m->ssl = 1;
  1683. m->websocket = 1;
  1684. }
  1685. #endif
  1686. rc = MQTTClient_connectURI(handle, options, serverURI, connectProperties, willProperties);
  1687. if (rc.reasonCode == MQTTREASONCODE_SUCCESS)
  1688. break;
  1689. }
  1690. }
  1691. if (rc.reasonCode == MQTTREASONCODE_SUCCESS)
  1692. {
  1693. if (rc.properties && MQTTProperties_hasProperty(rc.properties, MQTTPROPERTY_CODE_RECEIVE_MAXIMUM))
  1694. {
  1695. int recv_max = MQTTProperties_getNumericValue(rc.properties, MQTTPROPERTY_CODE_RECEIVE_MAXIMUM);
  1696. if (m->c->maxInflightMessages > recv_max)
  1697. m->c->maxInflightMessages = recv_max;
  1698. }
  1699. }
  1700. exit:
  1701. if (m && m->c && m->c->will)
  1702. {
  1703. if (m->c->will->payload)
  1704. free(m->c->will->payload);
  1705. if (m->c->will->topic)
  1706. free(m->c->will->topic);
  1707. free(m->c->will);
  1708. m->c->will = NULL;
  1709. }
  1710. Thread_unlock_mutex(mqttclient_mutex);
  1711. Thread_unlock_mutex(connect_mutex);
  1712. FUNC_EXIT_RC(rc.reasonCode);
  1713. return rc;
  1714. }
  1715. /**
  1716. * mqttclient_mutex must be locked when you call this function, if multi threaded
  1717. */
  1718. static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int call_connection_lost, int stop,
  1719. enum MQTTReasonCodes reason, MQTTProperties* props)
  1720. {
  1721. MQTTClients* m = handle;
  1722. START_TIME_TYPE start;
  1723. int rc = MQTTCLIENT_SUCCESS;
  1724. int was_connected = 0;
  1725. struct conlost_sync_data sync = {
  1726. NULL, m
  1727. };
  1728. FUNC_ENTRY;
  1729. if (m == NULL || m->c == NULL)
  1730. {
  1731. rc = MQTTCLIENT_FAILURE;
  1732. goto exit;
  1733. }
  1734. was_connected = m->c->connected; /* should be 1 */
  1735. if (m->c->connected != 0)
  1736. {
  1737. start = MQTTTime_start_clock();
  1738. m->c->connect_state = DISCONNECTING; /* indicate disconnecting */
  1739. while (m->c->inboundMsgs->count > 0 || m->c->outboundMsgs->count > 0)
  1740. { /* wait for all inflight message flows to finish, up to timeout */
  1741. if (MQTTTime_elapsed(start) >= (ELAPSED_TIME_TYPE)timeout)
  1742. break;
  1743. Thread_unlock_mutex(mqttclient_mutex);
  1744. MQTTClient_yield();
  1745. Thread_lock_mutex(mqttclient_mutex);
  1746. }
  1747. }
  1748. MQTTClient_closeSession(m->c, reason, props);
  1749. exit:
  1750. if (stop)
  1751. MQTTClient_stop();
  1752. if (call_connection_lost && m->cl && was_connected)
  1753. {
  1754. sync.sem = Thread_create_sem(&rc);
  1755. Log(TRACE_MIN, -1, "Calling connectionLost for client %s", m->c->clientID);
  1756. Thread_start(connectionLost_call, &sync);
  1757. Thread_wait_sem(sync.sem, 5000);
  1758. Thread_destroy_sem(sync.sem);
  1759. }
  1760. FUNC_EXIT_RC(rc);
  1761. return rc;
  1762. }
  1763. /**
  1764. * mqttclient_mutex must be locked when you call this function, if multi threaded
  1765. */
  1766. static int MQTTClient_disconnect_internal(MQTTClient handle, int timeout)
  1767. {
  1768. return MQTTClient_disconnect1(handle, timeout, 1, 1, MQTTREASONCODE_SUCCESS, NULL);
  1769. }
  1770. /**
  1771. * mqttclient_mutex must be locked when you call this function, if multi threaded
  1772. */
  1773. void MQTTProtocol_closeSession(Clients* c, int sendwill)
  1774. {
  1775. MQTTClient_disconnect_internal((MQTTClient)c->context, 0);
  1776. }
  1777. int MQTTClient_disconnect(MQTTClient handle, int timeout)
  1778. {
  1779. int rc = 0;
  1780. Thread_lock_mutex(mqttclient_mutex);
  1781. rc = MQTTClient_disconnect1(handle, timeout, 0, 1, MQTTREASONCODE_SUCCESS, NULL);
  1782. Thread_unlock_mutex(mqttclient_mutex);
  1783. return rc;
  1784. }
  1785. int MQTTClient_disconnect5(MQTTClient handle, int timeout, enum MQTTReasonCodes reason, MQTTProperties* props)
  1786. {
  1787. int rc = 0;
  1788. Thread_lock_mutex(mqttclient_mutex);
  1789. rc = MQTTClient_disconnect1(handle, timeout, 0, 1, reason, props);
  1790. Thread_unlock_mutex(mqttclient_mutex);
  1791. return rc;
  1792. }
  1793. int MQTTClient_isConnected(MQTTClient handle)
  1794. {
  1795. MQTTClients* m = handle;
  1796. int rc = 0;
  1797. FUNC_ENTRY;
  1798. Thread_lock_mutex(mqttclient_mutex);
  1799. if (m && m->c)
  1800. rc = m->c->connected;
  1801. Thread_unlock_mutex(mqttclient_mutex);
  1802. FUNC_EXIT_RC(rc);
  1803. return rc;
  1804. }
  1805. MQTTResponse MQTTClient_subscribeMany5(MQTTClient handle, int count, char* const* topic,
  1806. int* qos, MQTTSubscribe_options* opts, MQTTProperties* props)
  1807. {
  1808. MQTTClients* m = handle;
  1809. List* topics = NULL;
  1810. List* qoss = NULL;
  1811. int i = 0;
  1812. int rc = MQTTCLIENT_FAILURE;
  1813. MQTTResponse resp = MQTTResponse_initializer;
  1814. int msgid = 0;
  1815. FUNC_ENTRY;
  1816. Thread_lock_mutex(subscribe_mutex);
  1817. Thread_lock_mutex(mqttclient_mutex);
  1818. resp.reasonCode = MQTTCLIENT_FAILURE;
  1819. if (m == NULL || m->c == NULL)
  1820. {
  1821. rc = MQTTCLIENT_FAILURE;
  1822. goto exit;
  1823. }
  1824. if (m->c->connected == 0)
  1825. {
  1826. rc = MQTTCLIENT_DISCONNECTED;
  1827. goto exit;
  1828. }
  1829. for (i = 0; i < count; i++)
  1830. {
  1831. if (!UTF8_validateString(topic[i]))
  1832. {
  1833. rc = MQTTCLIENT_BAD_UTF8_STRING;
  1834. goto exit;
  1835. }
  1836. if (qos[i] < 0 || qos[i] > 2)
  1837. {
  1838. rc = MQTTCLIENT_BAD_QOS;
  1839. goto exit;
  1840. }
  1841. }
  1842. if ((msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
  1843. {
  1844. rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT;
  1845. goto exit;
  1846. }
  1847. topics = ListInitialize();
  1848. qoss = ListInitialize();
  1849. for (i = 0; i < count; i++)
  1850. {
  1851. ListAppend(topics, topic[i], strlen(topic[i]));
  1852. ListAppend(qoss, &qos[i], sizeof(int));
  1853. }
  1854. rc = MQTTProtocol_subscribe(m->c, topics, qoss, msgid, opts, props);
  1855. ListFreeNoContent(topics);
  1856. ListFreeNoContent(qoss);
  1857. if (rc == TCPSOCKET_COMPLETE)
  1858. {
  1859. MQTTPacket* pack = NULL;
  1860. Thread_unlock_mutex(mqttclient_mutex);
  1861. pack = MQTTClient_waitfor(handle, SUBACK, &rc, m->commandTimeout);
  1862. Thread_lock_mutex(mqttclient_mutex);
  1863. if (pack != NULL)
  1864. {
  1865. Suback* sub = (Suback*)pack;
  1866. if (m->c->MQTTVersion == MQTTVERSION_5)
  1867. {
  1868. if (sub->properties.count > 0)
  1869. {
  1870. if ((resp.properties = malloc(sizeof(MQTTProperties))) == NULL)
  1871. {
  1872. rc = PAHO_MEMORY_ERROR;
  1873. goto exit;
  1874. }
  1875. *resp.properties = MQTTProperties_copy(&sub->properties);
  1876. }
  1877. resp.reasonCodeCount = sub->qoss->count;
  1878. resp.reasonCode = *(int*)sub->qoss->first->content;
  1879. if (sub->qoss->count > 1)
  1880. {
  1881. ListElement* current = NULL;
  1882. int rc_count = 0;
  1883. if ((resp.reasonCodes = malloc(sizeof(enum MQTTReasonCodes) * (sub->qoss->count))) == NULL)
  1884. {
  1885. rc = PAHO_MEMORY_ERROR;
  1886. goto exit;
  1887. }
  1888. while (ListNextElement(sub->qoss, &current))
  1889. (resp.reasonCodes)[rc_count++] = *(enum MQTTReasonCodes*)(current->content);
  1890. }
  1891. }
  1892. else
  1893. {
  1894. ListElement *current = NULL;
  1895. /* if the returned count is greater than requested, it's an error*/
  1896. if (sub->qoss->count > count)
  1897. rc = MQTTCLIENT_FAILURE;
  1898. else
  1899. {
  1900. i = 0;
  1901. while (ListNextElement(sub->qoss, &current))
  1902. {
  1903. int *reqqos = (int*) (current->content);
  1904. qos[i++] = *reqqos;
  1905. }
  1906. }
  1907. resp.reasonCode = rc;
  1908. }
  1909. rc = MQTTProtocol_handleSubacks(pack, m->c->net.socket);
  1910. m->pack = NULL;
  1911. }
  1912. else
  1913. rc = SOCKET_ERROR;
  1914. }
  1915. if (rc == SOCKET_ERROR)
  1916. MQTTClient_disconnect_internal(handle, 0);
  1917. else if (rc == TCPSOCKET_COMPLETE)
  1918. rc = MQTTCLIENT_SUCCESS;
  1919. exit:
  1920. if (rc < 0)
  1921. resp.reasonCode = rc;
  1922. Thread_unlock_mutex(mqttclient_mutex);
  1923. Thread_unlock_mutex(subscribe_mutex);
  1924. FUNC_EXIT_RC(resp.reasonCode);
  1925. return resp;
  1926. }
  1927. int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, int* qos)
  1928. {
  1929. MQTTClients* m = handle;
  1930. MQTTResponse response = MQTTResponse_initializer;
  1931. if (m != NULL && m->c != NULL && m->c->MQTTVersion >= MQTTVERSION_5)
  1932. response.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
  1933. else
  1934. response = MQTTClient_subscribeMany5(handle, count, topic, qos, NULL, NULL);
  1935. return response.reasonCode;
  1936. }
  1937. MQTTResponse MQTTClient_subscribe5(MQTTClient handle, const char* topic, int qos,
  1938. MQTTSubscribe_options* opts, MQTTProperties* props)
  1939. {
  1940. MQTTResponse rc;
  1941. FUNC_ENTRY;
  1942. rc = MQTTClient_subscribeMany5(handle, 1, (char * const *)(&topic), &qos, opts, props);
  1943. if (qos == MQTT_BAD_SUBSCRIBE) /* addition for MQTT 3.1.1 - error code from subscribe */
  1944. rc.reasonCode = MQTT_BAD_SUBSCRIBE;
  1945. FUNC_EXIT_RC(rc.reasonCode);
  1946. return rc;
  1947. }
  1948. int MQTTClient_subscribe(MQTTClient handle, const char* topic, int qos)
  1949. {
  1950. MQTTClients* m = handle;
  1951. MQTTResponse response = MQTTResponse_initializer;
  1952. if (m->c->MQTTVersion >= MQTTVERSION_5)
  1953. response.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
  1954. else
  1955. response = MQTTClient_subscribe5(handle, topic, qos, NULL, NULL);
  1956. return response.reasonCode;
  1957. }
  1958. MQTTResponse MQTTClient_unsubscribeMany5(MQTTClient handle, int count, char* const* topic, MQTTProperties* props)
  1959. {
  1960. MQTTClients* m = handle;
  1961. List* topics = NULL;
  1962. int i = 0;
  1963. int rc = SOCKET_ERROR;
  1964. MQTTResponse resp = MQTTResponse_initializer;
  1965. int msgid = 0;
  1966. FUNC_ENTRY;
  1967. Thread_lock_mutex(unsubscribe_mutex);
  1968. Thread_lock_mutex(mqttclient_mutex);
  1969. resp.reasonCode = MQTTCLIENT_FAILURE;
  1970. if (m == NULL || m->c == NULL)
  1971. {
  1972. rc = MQTTCLIENT_FAILURE;
  1973. goto exit;
  1974. }
  1975. if (m->c->connected == 0)
  1976. {
  1977. rc = MQTTCLIENT_DISCONNECTED;
  1978. goto exit;
  1979. }
  1980. for (i = 0; i < count; i++)
  1981. {
  1982. if (!UTF8_validateString(topic[i]))
  1983. {
  1984. rc = MQTTCLIENT_BAD_UTF8_STRING;
  1985. goto exit;
  1986. }
  1987. }
  1988. if ((msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
  1989. {
  1990. rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT;
  1991. goto exit;
  1992. }
  1993. topics = ListInitialize();
  1994. for (i = 0; i < count; i++)
  1995. ListAppend(topics, topic[i], strlen(topic[i]));
  1996. rc = MQTTProtocol_unsubscribe(m->c, topics, msgid, props);
  1997. ListFreeNoContent(topics);
  1998. if (rc == TCPSOCKET_COMPLETE)
  1999. {
  2000. MQTTPacket* pack = NULL;
  2001. Thread_unlock_mutex(mqttclient_mutex);
  2002. pack = MQTTClient_waitfor(handle, UNSUBACK, &rc, m->commandTimeout);
  2003. Thread_lock_mutex(mqttclient_mutex);
  2004. if (pack != NULL)
  2005. {
  2006. Unsuback* unsub = (Unsuback*)pack;
  2007. if (m->c->MQTTVersion == MQTTVERSION_5)
  2008. {
  2009. if (unsub->properties.count > 0)
  2010. {
  2011. if ((resp.properties = malloc(sizeof(MQTTProperties))) == NULL)
  2012. {
  2013. rc = PAHO_MEMORY_ERROR;
  2014. goto exit;
  2015. }
  2016. *resp.properties = MQTTProperties_copy(&unsub->properties);
  2017. }
  2018. resp.reasonCodeCount = unsub->reasonCodes->count;
  2019. resp.reasonCode = *(int*)unsub->reasonCodes->first->content;
  2020. if (unsub->reasonCodes->count > 1)
  2021. {
  2022. ListElement* current = NULL;
  2023. int rc_count = 0;
  2024. if ((resp.reasonCodes = malloc(sizeof(enum MQTTReasonCodes) * (unsub->reasonCodes->count))) == NULL)
  2025. {
  2026. rc = PAHO_MEMORY_ERROR;
  2027. goto exit;
  2028. }
  2029. while (ListNextElement(unsub->reasonCodes, &current))
  2030. (resp.reasonCodes)[rc_count++] = *(enum MQTTReasonCodes*)(current->content);
  2031. }
  2032. }
  2033. else
  2034. resp.reasonCode = rc;
  2035. rc = MQTTProtocol_handleUnsubacks(pack, m->c->net.socket);
  2036. m->pack = NULL;
  2037. }
  2038. else
  2039. rc = SOCKET_ERROR;
  2040. }
  2041. if (rc == SOCKET_ERROR)
  2042. MQTTClient_disconnect_internal(handle, 0);
  2043. exit:
  2044. if (rc < 0)
  2045. resp.reasonCode = rc;
  2046. Thread_unlock_mutex(mqttclient_mutex);
  2047. Thread_unlock_mutex(unsubscribe_mutex);
  2048. FUNC_EXIT_RC(resp.reasonCode);
  2049. return resp;
  2050. }
  2051. int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic)
  2052. {
  2053. MQTTClients* m = handle;
  2054. MQTTResponse response = MQTTResponse_initializer;
  2055. if (m != NULL && m->c != NULL && m->c->MQTTVersion >= MQTTVERSION_5)
  2056. response.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
  2057. else
  2058. response = MQTTClient_unsubscribeMany5(handle, count, topic, NULL);
  2059. return response.reasonCode;
  2060. }
  2061. MQTTResponse MQTTClient_unsubscribe5(MQTTClient handle, const char* topic, MQTTProperties* props)
  2062. {
  2063. MQTTResponse rc;
  2064. rc = MQTTClient_unsubscribeMany5(handle, 1, (char * const *)(&topic), props);
  2065. return rc;
  2066. }
  2067. int MQTTClient_unsubscribe(MQTTClient handle, const char* topic)
  2068. {
  2069. MQTTResponse response = MQTTClient_unsubscribe5(handle, topic, NULL);
  2070. return response.reasonCode;
  2071. }
  2072. MQTTResponse MQTTClient_publish5(MQTTClient handle, const char* topicName, int payloadlen, const void* payload,
  2073. int qos, int retained, MQTTProperties* properties, MQTTClient_deliveryToken* deliveryToken)
  2074. {
  2075. int rc = MQTTCLIENT_SUCCESS;
  2076. MQTTClients* m = handle;
  2077. Messages* msg = NULL;
  2078. Publish* p = NULL;
  2079. int blocked = 0;
  2080. int msgid = 0;
  2081. MQTTResponse resp = MQTTResponse_initializer;
  2082. FUNC_ENTRY;
  2083. Thread_lock_mutex(mqttclient_mutex);
  2084. if (m == NULL || m->c == NULL)
  2085. rc = MQTTCLIENT_FAILURE;
  2086. else if (m->c->connected == 0)
  2087. rc = MQTTCLIENT_DISCONNECTED;
  2088. else if (!UTF8_validateString(topicName))
  2089. rc = MQTTCLIENT_BAD_UTF8_STRING;
  2090. if (rc != MQTTCLIENT_SUCCESS)
  2091. goto exit;
  2092. /* If outbound queue is full, block until it is not */
  2093. while (m->c->outboundMsgs->count >= m->c->maxInflightMessages ||
  2094. Socket_noPendingWrites(m->c->net.socket) == 0) /* wait until the socket is free of large packets being written */
  2095. {
  2096. if (blocked == 0)
  2097. {
  2098. blocked = 1;
  2099. Log(TRACE_MIN, -1, "Blocking publish on queue full for client %s", m->c->clientID);
  2100. }
  2101. Thread_unlock_mutex(mqttclient_mutex);
  2102. MQTTClient_yield();
  2103. Thread_lock_mutex(mqttclient_mutex);
  2104. if (m->c->connected == 0)
  2105. {
  2106. rc = MQTTCLIENT_FAILURE;
  2107. goto exit;
  2108. }
  2109. }
  2110. if (blocked == 1)
  2111. Log(TRACE_MIN, -1, "Resuming publish now queue not full for client %s", m->c->clientID);
  2112. if (qos > 0 && (msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
  2113. { /* this should never happen as we've waited for spaces in the queue */
  2114. rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT;
  2115. goto exit;
  2116. }
  2117. if ((p = malloc(sizeof(Publish))) == NULL)
  2118. {
  2119. rc = PAHO_MEMORY_ERROR;
  2120. goto exit_and_free;
  2121. }
  2122. memset(p->mask, '\0', sizeof(p->mask));
  2123. p->payload = NULL;
  2124. p->payloadlen = payloadlen;
  2125. if (payloadlen > 0)
  2126. {
  2127. if ((p->payload = malloc(payloadlen)) == NULL)
  2128. {
  2129. rc = PAHO_MEMORY_ERROR;
  2130. goto exit_and_free;
  2131. }
  2132. memcpy(p->payload, payload, payloadlen);
  2133. }
  2134. if ((p->topic = MQTTStrdup(topicName)) == NULL)
  2135. {
  2136. rc = PAHO_MEMORY_ERROR;
  2137. goto exit_and_free;
  2138. }
  2139. p->msgId = msgid;
  2140. p->MQTTVersion = m->c->MQTTVersion;
  2141. if (m->c->MQTTVersion >= MQTTVERSION_5)
  2142. {
  2143. if (properties)
  2144. p->properties = *properties;
  2145. else
  2146. {
  2147. MQTTProperties props = MQTTProperties_initializer;
  2148. p->properties = props;
  2149. }
  2150. }
  2151. rc = MQTTProtocol_startPublish(m->c, p, qos, retained, &msg);
  2152. /* If the packet was partially written to the socket, wait for it to complete.
  2153. * However, if the client is disconnected during this time and qos is not 0, still return success, as
  2154. * the packet has already been written to persistence and assigned a message id so will
  2155. * be sent when the client next connects.
  2156. */
  2157. if (rc == TCPSOCKET_INTERRUPTED)
  2158. {
  2159. while (m->c->connected == 1)
  2160. {
  2161. pending_writes* writing = NULL;
  2162. Thread_lock_mutex(socket_mutex);
  2163. writing = SocketBuffer_getWrite(m->c->net.socket);
  2164. Thread_unlock_mutex(socket_mutex);
  2165. if (writing == NULL)
  2166. break;
  2167. Thread_unlock_mutex(mqttclient_mutex);
  2168. MQTTClient_yield();
  2169. Thread_lock_mutex(mqttclient_mutex);
  2170. }
  2171. rc = (qos > 0 || m->c->connected == 1) ? MQTTCLIENT_SUCCESS : MQTTCLIENT_FAILURE;
  2172. }
  2173. if (deliveryToken && qos > 0)
  2174. *deliveryToken = msg->msgid;
  2175. exit_and_free:
  2176. if (p)
  2177. {
  2178. if (p->topic)
  2179. free(p->topic);
  2180. if (p->payload)
  2181. free(p->payload);
  2182. free(p);
  2183. }
  2184. if (rc == SOCKET_ERROR)
  2185. {
  2186. MQTTClient_disconnect_internal(handle, 0);
  2187. /* Return success for qos > 0 as the send will be retried automatically */
  2188. rc = (qos > 0) ? MQTTCLIENT_SUCCESS : MQTTCLIENT_FAILURE;
  2189. }
  2190. exit:
  2191. Thread_unlock_mutex(mqttclient_mutex);
  2192. resp.reasonCode = rc;
  2193. FUNC_EXIT_RC(resp.reasonCode);
  2194. return resp;
  2195. }
  2196. int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen, const void* payload,
  2197. int qos, int retained, MQTTClient_deliveryToken* deliveryToken)
  2198. {
  2199. MQTTClients* m = handle;
  2200. MQTTResponse rc = MQTTResponse_initializer;
  2201. if (m->c->MQTTVersion >= MQTTVERSION_5)
  2202. rc.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
  2203. else
  2204. rc = MQTTClient_publish5(handle, topicName, payloadlen, payload, qos, retained, NULL, deliveryToken);
  2205. return rc.reasonCode;
  2206. }
  2207. MQTTResponse MQTTClient_publishMessage5(MQTTClient handle, const char* topicName, MQTTClient_message* message,
  2208. MQTTClient_deliveryToken* deliveryToken)
  2209. {
  2210. MQTTResponse rc = MQTTResponse_initializer;
  2211. MQTTProperties* props = NULL;
  2212. FUNC_ENTRY;
  2213. if (message == NULL)
  2214. {
  2215. rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
  2216. goto exit;
  2217. }
  2218. if (strncmp(message->struct_id, "MQTM", 4) != 0 ||
  2219. (message->struct_version != 0 && message->struct_version != 1))
  2220. {
  2221. rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
  2222. goto exit;
  2223. }
  2224. if (message->struct_version >= 1)
  2225. props = &message->properties;
  2226. rc = MQTTClient_publish5(handle, topicName, message->payloadlen, message->payload,
  2227. message->qos, message->retained, props, deliveryToken);
  2228. exit:
  2229. FUNC_EXIT_RC(rc.reasonCode);
  2230. return rc;
  2231. }
  2232. int MQTTClient_publishMessage(MQTTClient handle, const char* topicName, MQTTClient_message* message,
  2233. MQTTClient_deliveryToken* deliveryToken)
  2234. {
  2235. MQTTClients* m = handle;
  2236. MQTTResponse rc = MQTTResponse_initializer;
  2237. if (strncmp(message->struct_id, "MQTM", 4) != 0 ||
  2238. (message->struct_version != 0 && message->struct_version != 1))
  2239. rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
  2240. else if (m != NULL && m->c != NULL && m->c->MQTTVersion >= MQTTVERSION_5)
  2241. rc.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
  2242. else
  2243. rc = MQTTClient_publishMessage5(handle, topicName, message, deliveryToken);
  2244. return rc.reasonCode;
  2245. }
  2246. static void MQTTClient_retry(void)
  2247. {
  2248. static START_TIME_TYPE last = START_TIME_ZERO;
  2249. START_TIME_TYPE now;
  2250. FUNC_ENTRY;
  2251. now = MQTTTime_now();
  2252. if (MQTTTime_difftime(now, last) >= (DIFF_TIME_TYPE)(retryLoopIntervalms))
  2253. {
  2254. last = MQTTTime_now();
  2255. MQTTProtocol_keepalive(now);
  2256. MQTTProtocol_retry(now, 1, 0);
  2257. }
  2258. else
  2259. MQTTProtocol_retry(now, 0, 0);
  2260. FUNC_EXIT;
  2261. }
  2262. static MQTTPacket* MQTTClient_cycle(SOCKET* sock, ELAPSED_TIME_TYPE timeout, int* rc)
  2263. {
  2264. static Ack ack;
  2265. MQTTPacket* pack = NULL;
  2266. int rc1 = 0;
  2267. START_TIME_TYPE start;
  2268. FUNC_ENTRY;
  2269. #if defined(OPENSSL)
  2270. if ((*sock = SSLSocket_getPendingRead()) == -1)
  2271. {
  2272. /* 0 from getReadySocket indicates no work to do, rc -1 == error */
  2273. #endif
  2274. start = MQTTTime_start_clock();
  2275. *sock = Socket_getReadySocket(0, (int)timeout, socket_mutex, rc);
  2276. *rc = rc1;
  2277. if (*sock == 0 && timeout >= 100L && MQTTTime_elapsed(start) < (int64_t)10)
  2278. MQTTTime_sleep(100L);
  2279. #if defined(OPENSSL)
  2280. }
  2281. #endif
  2282. Thread_lock_mutex(mqttclient_mutex);
  2283. if (*sock > 0 && rc1 == 0)
  2284. {
  2285. MQTTClients* m = NULL;
  2286. if (ListFindItem(handles, sock, clientSockCompare) != NULL)
  2287. m = (MQTTClient)(handles->current->content);
  2288. if (m != NULL)
  2289. {
  2290. if (m->c->connect_state == TCP_IN_PROGRESS || m->c->connect_state == SSL_IN_PROGRESS)
  2291. *rc = 0; /* waiting for connect state to clear */
  2292. else if (m->c->connect_state == WEBSOCKET_IN_PROGRESS)
  2293. *rc = WebSocket_upgrade(&m->c->net);
  2294. else
  2295. {
  2296. pack = MQTTPacket_Factory(m->c->MQTTVersion, &m->c->net, rc);
  2297. if (*rc == TCPSOCKET_INTERRUPTED)
  2298. *rc = 0;
  2299. }
  2300. }
  2301. if (pack)
  2302. {
  2303. int freed = 1;
  2304. /* Note that these handle... functions free the packet structure that they are dealing with */
  2305. if (pack->header.bits.type == PUBLISH)
  2306. *rc = MQTTProtocol_handlePublishes(pack, *sock);
  2307. else if (pack->header.bits.type == PUBACK || pack->header.bits.type == PUBCOMP)
  2308. {
  2309. int msgid;
  2310. ack = (pack->header.bits.type == PUBCOMP) ? *(Pubcomp*)pack : *(Puback*)pack;
  2311. msgid = ack.msgId;
  2312. if (m && m->c->MQTTVersion >= MQTTVERSION_5 && m->published)
  2313. {
  2314. Log(TRACE_MIN, -1, "Calling published for client %s, msgid %d", m->c->clientID, msgid);
  2315. (*(m->published))(m->published_context, msgid, pack->header.bits.type, &ack.properties, ack.rc);
  2316. }
  2317. *rc = (pack->header.bits.type == PUBCOMP) ?
  2318. MQTTProtocol_handlePubcomps(pack, *sock, NULL) : MQTTProtocol_handlePubacks(pack, *sock, NULL);
  2319. if (m && m->dc)
  2320. {
  2321. Log(TRACE_MIN, -1, "Calling deliveryComplete for client %s, msgid %d", m->c->clientID, msgid);
  2322. (*(m->dc))(m->context, msgid);
  2323. }
  2324. }
  2325. else if (pack->header.bits.type == PUBREC)
  2326. {
  2327. Pubrec* pubrec = (Pubrec*)pack;
  2328. if (m && m->c->MQTTVersion >= MQTTVERSION_5 && m->published && pubrec->rc >= MQTTREASONCODE_UNSPECIFIED_ERROR)
  2329. {
  2330. Log(TRACE_MIN, -1, "Calling published for client %s, msgid %d", m->c->clientID, ack.msgId);
  2331. (*(m->published))(m->published_context, pubrec->msgId, pack->header.bits.type,
  2332. &pubrec->properties, pubrec->rc);
  2333. }
  2334. *rc = MQTTProtocol_handlePubrecs(pack, *sock, NULL);
  2335. }
  2336. else if (pack->header.bits.type == PUBREL)
  2337. *rc = MQTTProtocol_handlePubrels(pack, *sock);
  2338. else if (pack->header.bits.type == PINGRESP)
  2339. *rc = MQTTProtocol_handlePingresps(pack, *sock);
  2340. else
  2341. freed = 0;
  2342. if (freed)
  2343. pack = NULL;
  2344. }
  2345. }
  2346. MQTTClient_retry();
  2347. Thread_unlock_mutex(mqttclient_mutex);
  2348. FUNC_EXIT_RC(*rc);
  2349. return pack;
  2350. }
  2351. static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, int64_t timeout)
  2352. {
  2353. MQTTPacket* pack = NULL;
  2354. MQTTClients* m = handle;
  2355. START_TIME_TYPE start = MQTTTime_start_clock();
  2356. int is_running = 0; /* local copy of running */
  2357. FUNC_ENTRY;
  2358. if (((MQTTClients*)handle) == NULL || timeout <= 0L)
  2359. {
  2360. *rc = MQTTCLIENT_FAILURE;
  2361. goto exit;
  2362. }
  2363. Thread_lock_mutex(mqttclient_mutex);
  2364. is_running = running;
  2365. Thread_unlock_mutex(mqttclient_mutex);
  2366. if (is_running)
  2367. {
  2368. if (packet_type == CONNECT)
  2369. {
  2370. if ((*rc = Thread_wait_sem(m->connect_sem, (int)timeout)) == 0)
  2371. *rc = m->rc;
  2372. }
  2373. else if (packet_type == CONNACK)
  2374. *rc = Thread_wait_sem(m->connack_sem, (int)timeout);
  2375. else if (packet_type == SUBACK)
  2376. *rc = Thread_wait_sem(m->suback_sem, (int)timeout);
  2377. else if (packet_type == UNSUBACK)
  2378. *rc = Thread_wait_sem(m->unsuback_sem, (int)timeout);
  2379. if (*rc == 0 && packet_type != CONNECT && m->pack == NULL)
  2380. Log(LOG_ERROR, -1, "waitfor unexpectedly is NULL for client %s, packet_type %d, timeout %ld", m->c->clientID, packet_type, timeout);
  2381. pack = m->pack;
  2382. }
  2383. else
  2384. {
  2385. *rc = TCPSOCKET_COMPLETE;
  2386. while (1)
  2387. {
  2388. SOCKET sock = -1;
  2389. pack = MQTTClient_cycle(&sock, 100L, rc);
  2390. if (sock == m->c->net.socket)
  2391. {
  2392. if (*rc == SOCKET_ERROR)
  2393. break;
  2394. if (pack && (pack->header.bits.type == packet_type))
  2395. break;
  2396. if (m->c->connect_state == TCP_IN_PROGRESS)
  2397. {
  2398. int error;
  2399. socklen_t len = sizeof(error);
  2400. if ((*rc = getsockopt(m->c->net.socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len)) == 0)
  2401. *rc = error;
  2402. break;
  2403. }
  2404. #if defined(OPENSSL)
  2405. else if (m->c->connect_state == SSL_IN_PROGRESS)
  2406. {
  2407. *rc = m->c->sslopts->struct_version >= 3 ?
  2408. SSLSocket_connect(m->c->net.ssl, sock, m->currentServerURI,
  2409. m->c->sslopts->verify, m->c->sslopts->ssl_error_cb, m->c->sslopts->ssl_error_context) :
  2410. SSLSocket_connect(m->c->net.ssl, sock, m->currentServerURI,
  2411. m->c->sslopts->verify, NULL, NULL);
  2412. if (*rc == SSL_FATAL)
  2413. break;
  2414. else if (*rc == 1) /* rc == 1 means SSL connect has finished and succeeded */
  2415. {
  2416. if ((m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
  2417. m->c->session = SSL_get1_session(m->c->net.ssl);
  2418. break;
  2419. }
  2420. }
  2421. #endif
  2422. else if (m->c->connect_state == WEBSOCKET_IN_PROGRESS && *rc != TCPSOCKET_INTERRUPTED)
  2423. {
  2424. *rc = 1;
  2425. break;
  2426. }
  2427. else if (m->c->connect_state == PROXY_CONNECT_IN_PROGRESS )
  2428. {
  2429. *rc = 1;
  2430. break;
  2431. }
  2432. else if (m->c->connect_state == WAIT_FOR_CONNACK)
  2433. {
  2434. int error;
  2435. socklen_t len = sizeof(error);
  2436. if (getsockopt(m->c->net.socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len) == 0)
  2437. {
  2438. if (error)
  2439. {
  2440. *rc = error;
  2441. break;
  2442. }
  2443. }
  2444. }
  2445. }
  2446. if (MQTTTime_elapsed(start) > (uint64_t)timeout)
  2447. {
  2448. pack = NULL;
  2449. break;
  2450. }
  2451. }
  2452. }
  2453. exit:
  2454. FUNC_EXIT_RC(*rc);
  2455. return pack;
  2456. }
  2457. int MQTTClient_receive(MQTTClient handle, char** topicName, int* topicLen, MQTTClient_message** message,
  2458. unsigned long timeout)
  2459. {
  2460. int rc = TCPSOCKET_COMPLETE;
  2461. START_TIME_TYPE start = MQTTTime_start_clock();
  2462. ELAPSED_TIME_TYPE elapsed = 0L;
  2463. MQTTClients* m = handle;
  2464. FUNC_ENTRY;
  2465. if (m == NULL || m->c == NULL
  2466. || running) /* receive is not meant to be called in a multi-thread environment */
  2467. {
  2468. rc = MQTTCLIENT_FAILURE;
  2469. goto exit;
  2470. }
  2471. if (m->c->connected == 0)
  2472. {
  2473. rc = MQTTCLIENT_DISCONNECTED;
  2474. goto exit;
  2475. }
  2476. *topicName = NULL;
  2477. *message = NULL;
  2478. /* if there is already a message waiting, don't hang around but still do some packet handling */
  2479. if (m->c->messageQueue->count > 0)
  2480. timeout = 0L;
  2481. elapsed = MQTTTime_elapsed(start);
  2482. do
  2483. {
  2484. SOCKET sock = 0;
  2485. MQTTClient_cycle(&sock, (timeout > elapsed) ? timeout - elapsed : 0L, &rc);
  2486. if (rc == SOCKET_ERROR)
  2487. {
  2488. if (ListFindItem(handles, &sock, clientSockCompare) && /* find client corresponding to socket */
  2489. (MQTTClient)(handles->current->content) == handle)
  2490. break; /* there was an error on the socket we are interested in */
  2491. }
  2492. elapsed = MQTTTime_elapsed(start);
  2493. }
  2494. while (elapsed < timeout && m->c->messageQueue->count == 0);
  2495. if (m->c->messageQueue->count > 0)
  2496. rc = MQTTClient_deliverMessage(rc, m, topicName, topicLen, message);
  2497. if (rc == SOCKET_ERROR)
  2498. MQTTClient_disconnect_internal(handle, 0);
  2499. exit:
  2500. FUNC_EXIT_RC(rc);
  2501. return rc;
  2502. }
  2503. void MQTTClient_yield(void)
  2504. {
  2505. START_TIME_TYPE start = MQTTTime_start_clock();
  2506. ELAPSED_TIME_TYPE elapsed = 0L;
  2507. ELAPSED_TIME_TYPE timeout = 100L;
  2508. int rc = 0;
  2509. FUNC_ENTRY;
  2510. if (running) /* yield is not meant to be called in a multi-thread environment */
  2511. {
  2512. MQTTTime_sleep(timeout);
  2513. goto exit;
  2514. }
  2515. elapsed = MQTTTime_elapsed(start);
  2516. do
  2517. {
  2518. SOCKET sock = -1;
  2519. MQTTClient_cycle(&sock, (timeout > elapsed) ? timeout - elapsed : 0L, &rc);
  2520. Thread_lock_mutex(mqttclient_mutex);
  2521. if (rc == SOCKET_ERROR && ListFindItem(handles, &sock, clientSockCompare))
  2522. {
  2523. MQTTClients* m = (MQTTClient)(handles->current->content);
  2524. if (m->c->connect_state != DISCONNECTING)
  2525. MQTTClient_disconnect_internal(m, 0);
  2526. }
  2527. Thread_unlock_mutex(mqttclient_mutex);
  2528. elapsed = MQTTTime_elapsed(start);
  2529. }
  2530. while (elapsed < timeout);
  2531. exit:
  2532. FUNC_EXIT;
  2533. }
  2534. /*
  2535. static int pubCompare(void* a, void* b)
  2536. {
  2537. Messages* msg = (Messages*)a;
  2538. return msg->publish == (Publications*)b;
  2539. }*/
  2540. int MQTTClient_waitForCompletion(MQTTClient handle, MQTTClient_deliveryToken mdt, unsigned long timeout)
  2541. {
  2542. int rc = MQTTCLIENT_FAILURE;
  2543. START_TIME_TYPE start = MQTTTime_start_clock();
  2544. ELAPSED_TIME_TYPE elapsed = 0L;
  2545. MQTTClients* m = handle;
  2546. FUNC_ENTRY;
  2547. Thread_lock_mutex(mqttclient_mutex);
  2548. if (m == NULL || m->c == NULL)
  2549. {
  2550. rc = MQTTCLIENT_FAILURE;
  2551. goto exit;
  2552. }
  2553. elapsed = MQTTTime_elapsed(start);
  2554. while (elapsed < timeout)
  2555. {
  2556. if (m->c->connected == 0)
  2557. {
  2558. rc = MQTTCLIENT_DISCONNECTED;
  2559. goto exit;
  2560. }
  2561. if (ListFindItem(m->c->outboundMsgs, &mdt, messageIDCompare) == NULL)
  2562. {
  2563. rc = MQTTCLIENT_SUCCESS; /* well we couldn't find it */
  2564. goto exit;
  2565. }
  2566. Thread_unlock_mutex(mqttclient_mutex);
  2567. MQTTClient_yield();
  2568. Thread_lock_mutex(mqttclient_mutex);
  2569. elapsed = MQTTTime_elapsed(start);
  2570. }
  2571. exit:
  2572. Thread_unlock_mutex(mqttclient_mutex);
  2573. FUNC_EXIT_RC(rc);
  2574. return rc;
  2575. }
  2576. int MQTTClient_getPendingDeliveryTokens(MQTTClient handle, MQTTClient_deliveryToken **tokens)
  2577. {
  2578. int rc = MQTTCLIENT_SUCCESS;
  2579. MQTTClients* m = handle;
  2580. *tokens = NULL;
  2581. FUNC_ENTRY;
  2582. Thread_lock_mutex(mqttclient_mutex);
  2583. if (m == NULL)
  2584. {
  2585. rc = MQTTCLIENT_FAILURE;
  2586. goto exit;
  2587. }
  2588. if (m->c && m->c->outboundMsgs->count > 0)
  2589. {
  2590. ListElement* current = NULL;
  2591. int count = 0;
  2592. *tokens = malloc(sizeof(MQTTClient_deliveryToken) * (m->c->outboundMsgs->count + 1));
  2593. if (!*tokens)
  2594. {
  2595. rc = PAHO_MEMORY_ERROR;
  2596. goto exit;
  2597. }
  2598. while (ListNextElement(m->c->outboundMsgs, &current))
  2599. {
  2600. Messages* m = (Messages*)(current->content);
  2601. (*tokens)[count++] = m->msgid;
  2602. }
  2603. (*tokens)[count] = -1;
  2604. }
  2605. exit:
  2606. Thread_unlock_mutex(mqttclient_mutex);
  2607. FUNC_EXIT_RC(rc);
  2608. return rc;
  2609. }
  2610. void MQTTClient_setTraceLevel(enum MQTTCLIENT_TRACE_LEVELS level)
  2611. {
  2612. Log_setTraceLevel((enum LOG_LEVELS)level);
  2613. }
  2614. void MQTTClient_setTraceCallback(MQTTClient_traceCallback* callback)
  2615. {
  2616. Log_setTraceCallback((Log_traceCallback*)callback);
  2617. }
  2618. int MQTTClient_setCommandTimeout(MQTTClient handle, unsigned long milliSeconds)
  2619. {
  2620. int rc = MQTTCLIENT_SUCCESS;
  2621. MQTTClients* m = handle;
  2622. FUNC_ENTRY;
  2623. if (milliSeconds < 5000L)
  2624. rc = MQTTCLIENT_FAILURE;
  2625. else
  2626. m->commandTimeout = milliSeconds;
  2627. FUNC_EXIT_RC(rc);
  2628. return rc;
  2629. }
  2630. MQTTClient_nameValue* MQTTClient_getVersionInfo(void)
  2631. {
  2632. #define MAX_INFO_STRINGS 8
  2633. static MQTTClient_nameValue libinfo[MAX_INFO_STRINGS + 1];
  2634. int i = 0;
  2635. libinfo[i].name = "Product name";
  2636. libinfo[i++].value = "Eclipse Paho Synchronous MQTT C Client Library";
  2637. libinfo[i].name = "Version";
  2638. libinfo[i++].value = CLIENT_VERSION;
  2639. libinfo[i].name = "Build level";
  2640. libinfo[i++].value = BUILD_TIMESTAMP;
  2641. #if defined(OPENSSL)
  2642. libinfo[i].name = "OpenSSL version";
  2643. libinfo[i++].value = SSLeay_version(SSLEAY_VERSION);
  2644. libinfo[i].name = "OpenSSL flags";
  2645. libinfo[i++].value = SSLeay_version(SSLEAY_CFLAGS);
  2646. libinfo[i].name = "OpenSSL build timestamp";
  2647. libinfo[i++].value = SSLeay_version(SSLEAY_BUILT_ON);
  2648. libinfo[i].name = "OpenSSL platform";
  2649. libinfo[i++].value = SSLeay_version(SSLEAY_PLATFORM);
  2650. libinfo[i].name = "OpenSSL directory";
  2651. libinfo[i++].value = SSLeay_version(SSLEAY_DIR);
  2652. #endif
  2653. libinfo[i].name = NULL;
  2654. libinfo[i].value = NULL;
  2655. return libinfo;
  2656. }
  2657. const char* MQTTClient_strerror(int code)
  2658. {
  2659. static char buf[30];
  2660. int chars = 0;
  2661. switch (code) {
  2662. case MQTTCLIENT_SUCCESS:
  2663. return "Success";
  2664. case MQTTCLIENT_FAILURE:
  2665. return "Failure";
  2666. case MQTTCLIENT_DISCONNECTED:
  2667. return "Disconnected";
  2668. case MQTTCLIENT_MAX_MESSAGES_INFLIGHT:
  2669. return "Maximum in-flight messages amount reached";
  2670. case MQTTCLIENT_BAD_UTF8_STRING:
  2671. return "Invalid UTF8 string";
  2672. case MQTTCLIENT_NULL_PARAMETER:
  2673. return "Invalid (NULL) parameter";
  2674. case MQTTCLIENT_TOPICNAME_TRUNCATED:
  2675. return "Topic containing NULL characters has been truncated";
  2676. case MQTTCLIENT_BAD_STRUCTURE:
  2677. return "Bad structure";
  2678. case MQTTCLIENT_BAD_QOS:
  2679. return "Invalid QoS value";
  2680. case MQTTCLIENT_SSL_NOT_SUPPORTED:
  2681. return "SSL is not supported";
  2682. case MQTTCLIENT_BAD_MQTT_VERSION:
  2683. return "Unrecognized MQTT version";
  2684. case MQTTCLIENT_BAD_PROTOCOL:
  2685. return "Invalid protocol scheme";
  2686. case MQTTCLIENT_BAD_MQTT_OPTION:
  2687. return "Options for wrong MQTT version";
  2688. case MQTTCLIENT_WRONG_MQTT_VERSION:
  2689. return "Client created for another version of MQTT";
  2690. case MQTTCLIENT_0_LEN_WILL_TOPIC:
  2691. return "Zero length will topic on connect";
  2692. }
  2693. chars = snprintf(buf, sizeof(buf), "Unknown error code %d", code);
  2694. if (chars >= sizeof(buf))
  2695. {
  2696. buf[sizeof(buf)-1] = '\0';
  2697. Log(LOG_ERROR, 0, "Error writing %d chars with snprintf", chars);
  2698. }
  2699. return buf;
  2700. }
  2701. /**
  2702. * See if any pending writes have been completed, and cleanup if so.
  2703. * Cleaning up means removing any publication data that was stored because the write did
  2704. * not originally complete.
  2705. */
  2706. static void MQTTProtocol_checkPendingWrites(void)
  2707. {
  2708. FUNC_ENTRY;
  2709. if (state.pending_writes.count > 0)
  2710. {
  2711. ListElement* le = state.pending_writes.first;
  2712. while (le)
  2713. {
  2714. if (Socket_noPendingWrites(((pending_write*)(le->content))->socket))
  2715. {
  2716. MQTTProtocol_removePublication(((pending_write*)(le->content))->p);
  2717. state.pending_writes.current = le;
  2718. ListRemove(&(state.pending_writes), le->content); /* does NextElement itself */
  2719. le = state.pending_writes.current;
  2720. }
  2721. else
  2722. ListNextElement(&(state.pending_writes), &le);
  2723. }
  2724. }
  2725. FUNC_EXIT;
  2726. }
  2727. static void MQTTClient_writeComplete(SOCKET socket, int rc)
  2728. {
  2729. ListElement* found = NULL;
  2730. FUNC_ENTRY;
  2731. /* a partial write is now complete for a socket - this will be on a publish*/
  2732. MQTTProtocol_checkPendingWrites();
  2733. /* find the client using this socket */
  2734. if ((found = ListFindItem(handles, &socket, clientSockCompare)) != NULL)
  2735. {
  2736. MQTTClients* m = (MQTTClients*)(found->content);
  2737. m->c->net.lastSent = MQTTTime_now();
  2738. }
  2739. FUNC_EXIT;
  2740. }
  2741. static void MQTTClient_writeContinue(SOCKET socket)
  2742. {
  2743. ListElement* found = NULL;
  2744. if ((found = ListFindItem(handles, &socket, clientSockCompare)) != NULL)
  2745. {
  2746. MQTTClients* m = (MQTTClients*)(found->content);
  2747. m->c->net.lastSent = MQTTTime_now();
  2748. }
  2749. }