MQTTAsync.c 119 KB

  1. /*******************************************************************************
  2. * Copyright (c) 2009, 2019 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. *
  10. * and the Eclipse Distribution License is available at
  11. *
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial implementation and documentation
  15. * Ian Craggs, Allan Stockdill-Mander - SSL support
  16. * Ian Craggs - multiple server connection support
  17. * Ian Craggs - fix for bug 413429 - connectionLost not called
  18. * Ian Craggs - fix for bug 415042 - using already freed structure
  19. * Ian Craggs - fix for bug 419233 - mutexes not reporting errors
  20. * Ian Craggs - fix for bug 420851
  21. * Ian Craggs - fix for bug 432903 - queue persistence
  22. * Ian Craggs - MQTT 3.1.1 support
  23. * Rong Xiang, Ian Craggs - C++ compatibility
  24. * Ian Craggs - fix for bug 442400: reconnecting after network cable unplugged
  25. * Ian Craggs - fix for bug 444934 - incorrect free in freeCommand1
  26. * Ian Craggs - fix for bug 445891 - assigning msgid is not thread safe
  27. * Ian Craggs - fix for bug 465369 - longer latency than expected
  28. * Ian Craggs - fix for bug 444103 - success/failure callbacks not invoked
  29. * Ian Craggs - fix for bug 484363 - segfault in getReadySocket
  30. * Ian Craggs - automatic reconnect and offline buffering (send while disconnected)
  31. * Ian Craggs - fix for bug 472250
  32. * Ian Craggs - fix for bug 486548
  33. * Ian Craggs - SNI support
  34. * Ian Craggs - auto reconnect timing fix #218
  35. * Ian Craggs - fix for issue #190
  36. * Ian Craggs - check for NULL SSL options #334
  37. * Ian Craggs - allocate username/password buffers #431
  38. * Ian Craggs - MQTT 5.0 support
  39. *******************************************************************************/
  40. /**
  41. * @file
  42. * \brief Asynchronous API implementation
  43. *
  44. */
  45. #define _GNU_SOURCE /* for pthread_mutexattr_settype */
  46. #include <stdlib.h>
  47. #include <string.h>
  48. #if !defined(WIN32) && !defined(WIN64)
  49. #include <sys/time.h>
  50. #endif
  51. #if !defined(NO_PERSISTENCE)
  52. #include "MQTTPersistence.h"
  53. #endif
  54. #include "MQTTAsync.h"
  55. #include "utf-8.h"
  56. #include "MQTTProtocol.h"
  57. #include "MQTTProtocolOut.h"
  58. #include "Thread.h"
  59. #include "SocketBuffer.h"
  60. #include "StackTrace.h"
  61. #include "Heap.h"
  62. #include "OsWrapper.h"
  63. #include "WebSocket.h"
  64. #define URI_TCP "tcp://"
  65. #define URI_WS "ws://"
  66. #define URI_WSS "wss://"
  67. #include "VersionInfo.h"
  68. const char *client_timestamp_eye = "MQTTAsyncV3_Timestamp " BUILD_TIMESTAMP;
  69. const char *client_version_eye = "MQTTAsyncV3_Version " CLIENT_VERSION;
  70. void MQTTAsync_global_init(MQTTAsync_init_options* inits)
  71. {
  72. #if defined(OPENSSL)
  73. SSLSocket_handleOpensslInit(inits->do_openssl_init);
  74. #endif
  75. }
  76. #if !defined(min)
  77. #define min(a, b) (((a) < (b)) ? (a) : (b))
  78. #endif
  79. static ClientStates ClientState =
  80. {
  81. CLIENT_VERSION, /* version */
  82. NULL /* client list */
  83. };
  84. ClientStates* bstate = &ClientState;
  85. MQTTProtocol state;
  86. enum MQTTAsync_threadStates
  87. {
  89. };
  90. enum MQTTAsync_threadStates sendThread_state = STOPPED;
  91. enum MQTTAsync_threadStates receiveThread_state = STOPPED;
  92. static thread_id_type sendThread_id = 0,
  93. receiveThread_id = 0;
  94. #if defined(WIN32) || defined(WIN64)
  95. static mutex_type mqttasync_mutex = NULL;
  96. static mutex_type socket_mutex = NULL;
  97. static mutex_type mqttcommand_mutex = NULL;
  98. static sem_type send_sem = NULL;
  99. extern mutex_type stack_mutex;
  100. extern mutex_type heap_mutex;
  101. extern mutex_type log_mutex;
  102. BOOL APIENTRY DllMain(HANDLE hModule,
  103. DWORD ul_reason_for_call,
  104. LPVOID lpReserved)
  105. {
  106. switch (ul_reason_for_call)
  107. {
  109. Log(TRACE_MAX, -1, "DLL process attach");
  110. if (mqttasync_mutex == NULL)
  111. {
  112. mqttasync_mutex = CreateMutex(NULL, 0, NULL);
  113. mqttcommand_mutex = CreateMutex(NULL, 0, NULL);
  114. send_sem = CreateEvent(
  115. NULL, /* default security attributes */
  116. FALSE, /* manual-reset event? */
  117. FALSE, /* initial state is nonsignaled */
  118. NULL /* object name */
  119. );
  120. stack_mutex = CreateMutex(NULL, 0, NULL);
  121. heap_mutex = CreateMutex(NULL, 0, NULL);
  122. log_mutex = CreateMutex(NULL, 0, NULL);
  123. socket_mutex = CreateMutex(NULL, 0, NULL);
  124. }
  125. case DLL_THREAD_ATTACH:
  126. Log(TRACE_MAX, -1, "DLL thread attach");
  127. case DLL_THREAD_DETACH:
  128. Log(TRACE_MAX, -1, "DLL thread detach");
  130. Log(TRACE_MAX, -1, "DLL process detach");
  131. }
  132. return TRUE;
  133. }
  134. #else
  135. static pthread_mutex_t mqttasync_mutex_store = PTHREAD_MUTEX_INITIALIZER;
  136. static mutex_type mqttasync_mutex = &mqttasync_mutex_store;
  137. static pthread_mutex_t socket_mutex_store = PTHREAD_MUTEX_INITIALIZER;
  138. static mutex_type socket_mutex = &socket_mutex_store;
  139. static pthread_mutex_t mqttcommand_mutex_store = PTHREAD_MUTEX_INITIALIZER;
  140. static mutex_type mqttcommand_mutex = &mqttcommand_mutex_store;
  141. static cond_type_struct send_cond_store = { PTHREAD_COND_INITIALIZER, PTHREAD_MUTEX_INITIALIZER };
  142. static cond_type send_cond = &send_cond_store;
  143. void MQTTAsync_init(void)
  144. {
  145. pthread_mutexattr_t attr;
  146. int rc;
  147. pthread_mutexattr_init(&attr);
  148. #if !defined(_WRS_KERNEL)
  149. pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
  150. #else
  151. /* #warning "no pthread_mutexattr_settype" */
  152. #endif
  153. if ((rc = pthread_mutex_init(mqttasync_mutex, &attr)) != 0)
  154. printf("MQTTAsync: error %d initializing async_mutex\n", rc);
  155. if ((rc = pthread_mutex_init(mqttcommand_mutex, &attr)) != 0)
  156. printf("MQTTAsync: error %d initializing command_mutex\n", rc);
  157. if ((rc = pthread_mutex_init(socket_mutex, &attr)) != 0)
  158. printf("MQTTClient: error %d initializing socket_mutex\n", rc);
  159. if ((rc = pthread_cond_init(&send_cond->cond, NULL)) != 0)
  160. printf("MQTTAsync: error %d initializing send_cond cond\n", rc);
  161. if ((rc = pthread_mutex_init(&send_cond->mutex, &attr)) != 0)
  162. printf("MQTTAsync: error %d initializing send_cond mutex\n", rc);
  163. }
  164. #define WINAPI
  165. #endif
  166. static volatile int global_initialized = 0;
  167. static List* handles = NULL;
  168. static int tostop = 0;
  169. static List* commands = NULL;
  170. #if defined(WIN32) || defined(WIN64)
  171. #define START_TIME_TYPE DWORD
  172. START_TIME_TYPE MQTTAsync_start_clock(void)
  173. {
  174. return GetTickCount();
  175. }
  176. #elif defined(AIX)
  177. #define START_TIME_TYPE struct timespec
  178. START_TIME_TYPE MQTTAsync_start_clock(void)
  179. {
  180. static struct timespec start;
  181. clock_gettime(CLOCK_MONOTONIC, &start);
  182. return start;
  183. }
  184. #else
  185. #define START_TIME_TYPE struct timeval
  186. START_TIME_TYPE MQTTAsync_start_clock(void)
  187. {
  188. static struct timeval start;
  189. static struct timespec start_ts;
  190. clock_gettime(CLOCK_MONOTONIC, &start_ts);
  191. start.tv_sec = start_ts.tv_sec;
  192. start.tv_usec = start_ts.tv_nsec / 1000;
  193. return start;
  194. }
  195. #endif
  196. #if defined(WIN32) || defined(WIN64)
  197. void MQTTAsync_init_rand(void)
  198. {
  199. START_TIME_TYPE now = MQTTAsync_start_clock();
  200. srand(now);
  201. }
  202. #elif defined(AIX)
  203. void MQTTAsync_init_rand(void)
  204. {
  205. START_TIME_TYPE now = MQTTAsync_start_clock();
  206. srand(now.tv_nsec);
  207. }
  208. #else
  209. void MQTTAsync_init_rand(void)
  210. {
  211. START_TIME_TYPE now = MQTTAsync_start_clock();
  212. srand(now.tv_usec);
  213. }
  214. #endif
  215. #if defined(WIN32) || defined(WIN64)
  216. long MQTTAsync_elapsed(DWORD milliseconds)
  217. {
  218. return GetTickCount() - milliseconds;
  219. }
  220. #elif defined(AIX)
  221. #define assert(a)
  222. long MQTTAsync_elapsed(struct timespec start)
  223. {
  224. struct timespec now, res;
  225. clock_gettime(CLOCK_MONOTONIC, &now);
  226. ntimersub(now, start, res);
  227. return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
  228. }
  229. #else
  230. long MQTTAsync_elapsed(struct timeval start)
  231. {
  232. struct timeval now, res;
  233. static struct timespec now_ts;
  234. clock_gettime(CLOCK_MONOTONIC, &now_ts);
  235. now.tv_sec = now_ts.tv_sec;
  236. now.tv_usec = now_ts.tv_nsec / 1000;
  237. timersub(&now, &start, &res);
  238. return (res.tv_sec)*1000 + (res.tv_usec)/1000;
  239. }
  240. #endif
  241. typedef struct
  242. {
  243. MQTTAsync_message* msg;
  244. char* topicName;
  245. int topicLen;
  246. unsigned int seqno; /* only used on restore */
  247. } qEntry;
  248. typedef struct
  249. {
  250. int type;
  251. MQTTAsync_onSuccess* onSuccess;
  252. MQTTAsync_onFailure* onFailure;
  253. MQTTAsync_onSuccess5* onSuccess5;
  254. MQTTAsync_onFailure5* onFailure5;
  255. MQTTAsync_token token;
  256. void* context;
  257. START_TIME_TYPE start_time;
  258. MQTTProperties properties;
  259. union
  260. {
  261. struct
  262. {
  263. int count;
  264. char** topics;
  265. int* qoss;
  266. MQTTSubscribe_options opts;
  267. MQTTSubscribe_options* optlist;
  268. } sub;
  269. struct
  270. {
  271. int count;
  272. char** topics;
  273. } unsub;
  274. struct
  275. {
  276. char* destinationName;
  277. int payloadlen;
  278. void* payload;
  279. int qos;
  280. int retained;
  281. } pub;
  282. struct
  283. {
  284. int internal;
  285. int timeout;
  286. enum MQTTReasonCodes reasonCode;
  287. } dis;
  288. struct
  289. {
  290. int currentURI;
  291. int MQTTVersion; /**< current MQTT version being used to connect */
  292. } conn;
  293. } details;
  294. } MQTTAsync_command;
  295. typedef struct MQTTAsync_struct
  296. {
  297. char* serverURI;
  298. int ssl;
  299. int websocket;
  300. Clients* c;
  301. /* "Global", to the client, callback definitions */
  302. MQTTAsync_connectionLost* cl;
  303. MQTTAsync_messageArrived* ma;
  304. MQTTAsync_deliveryComplete* dc;
  305. void* clContext; /* the context to be associated with the conn lost callback*/
  306. void* maContext; /* the context to be associated with the msg arrived callback*/
  307. void* dcContext; /* the context to be associated with the deliv complete callback*/
  308. MQTTAsync_connected* connected;
  309. void* connected_context; /* the context to be associated with the connected callback*/
  310. MQTTAsync_disconnected* disconnected;
  311. void* disconnected_context; /* the context to be associated with the disconnected callback*/
  312. /* Each time connect is called, we store the options that were used. These are reused in
  313. any call to reconnect, or an automatic reconnect attempt */
  314. MQTTAsync_command connect; /* Connect operation properties */
  315. MQTTAsync_command disconnect; /* Disconnect operation properties */
  316. MQTTAsync_command* pending_write; /* Is there a socket write pending? */
  317. List* responses;
  318. unsigned int command_seqno;
  319. MQTTPacket* pack;
  320. /* added for offline buffering */
  321. MQTTAsync_createOptions* createOptions;
  322. int shouldBeConnected;
  323. /* added for automatic reconnect */
  324. int automaticReconnect;
  325. int minRetryInterval;
  326. int maxRetryInterval;
  327. int serverURIcount;
  328. char** serverURIs;
  329. int connectTimeout;
  330. int currentInterval;
  331. int currentIntervalBase;
  332. START_TIME_TYPE lastConnectionFailedTime;
  333. int retrying;
  334. int reconnectNow;
  335. /* MQTT V5 properties */
  336. MQTTProperties* connectProps;
  337. MQTTProperties* willProps;
  338. } MQTTAsyncs;
  339. typedef struct
  340. {
  341. MQTTAsync_command command;
  342. MQTTAsyncs* client;
  343. unsigned int seqno; /* only used on restore */
  344. } MQTTAsync_queuedCommand;
  345. static int clientSockCompare(void* a, void* b);
  346. static void MQTTAsync_lock_mutex(mutex_type amutex);
  347. static void MQTTAsync_unlock_mutex(mutex_type amutex);
  348. static int MQTTAsync_checkConn(MQTTAsync_command* command, MQTTAsyncs* client);
  349. static void MQTTAsync_terminate(void);
  350. #if !defined(NO_PERSISTENCE)
  351. static int MQTTAsync_unpersistCommand(MQTTAsync_queuedCommand* qcmd);
  352. static int MQTTAsync_persistCommand(MQTTAsync_queuedCommand* qcmd);
  353. static MQTTAsync_queuedCommand* MQTTAsync_restoreCommand(char* buffer, int buflen, int MQTTVersion);
  354. /*static void MQTTAsync_insertInOrder(List* list, void* content, int size);*/
  355. static int MQTTAsync_restoreCommands(MQTTAsyncs* client);
  356. #endif
  357. static int MQTTAsync_addCommand(MQTTAsync_queuedCommand* command, int command_size);
  358. static void MQTTAsync_startConnectRetry(MQTTAsyncs* m);
  359. static void MQTTAsync_checkDisconnect(MQTTAsync handle, MQTTAsync_command* command);
  360. static void MQTTProtocol_checkPendingWrites(void);
  361. static void MQTTAsync_freeServerURIs(MQTTAsyncs* m);
  362. static void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command);
  363. static void MQTTAsync_freeCommand(MQTTAsync_queuedCommand *command);
  364. static void MQTTAsync_writeComplete(int socket, int rc);
  365. static int MQTTAsync_processCommand(void);
  366. static void MQTTAsync_checkTimeouts(void);
  367. static thread_return_type WINAPI MQTTAsync_sendThread(void* n);
  368. static void MQTTAsync_emptyMessageQueue(Clients* client);
  369. static void MQTTAsync_removeResponsesAndCommands(MQTTAsyncs* m);
  370. static int MQTTAsync_completeConnection(MQTTAsyncs* m, Connack* connack);
  371. static thread_return_type WINAPI MQTTAsync_receiveThread(void* n);
  372. static void MQTTAsync_stop(void);
  373. static void MQTTAsync_closeOnly(Clients* client, enum MQTTReasonCodes reasonCode, MQTTProperties* props);
  374. static void MQTTAsync_closeSession(Clients* client, enum MQTTReasonCodes reasonCode, MQTTProperties* props);
  375. static int clientStructCompare(void* a, void* b);
  376. static int MQTTAsync_cleanSession(Clients* client);
  377. static int MQTTAsync_deliverMessage(MQTTAsyncs* m, char* topicName, size_t topicLen, MQTTAsync_message* mm);
  378. static int MQTTAsync_disconnect1(MQTTAsync handle, const MQTTAsync_disconnectOptions* options, int internal);
  379. static int MQTTAsync_disconnect_internal(MQTTAsync handle, int timeout);
  380. static int cmdMessageIDCompare(void* a, void* b);
  381. static int MQTTAsync_assignMsgId(MQTTAsyncs* m);
  382. static int MQTTAsync_countBufferedMessages(MQTTAsyncs* m);
  383. static void MQTTAsync_retry(void);
  384. static int MQTTAsync_connecting(MQTTAsyncs* m);
  385. static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc);
  386. /*static int pubCompare(void* a, void* b);*/
  387. void MQTTAsync_sleep(long milliseconds)
  388. {
  389. FUNC_ENTRY;
  390. #if defined(WIN32) || defined(WIN64)
  391. Sleep(milliseconds);
  392. #else
  393. usleep(milliseconds*1000);
  394. #endif
  395. FUNC_EXIT;
  396. }
  397. // Add random amount of jitter for exponential backoff on retry
  398. // Jitter value will be +/- 20% of "base" interval, including max interval
  399. //
  400. //
  401. int MQTTAsync_randomJitter(int currentIntervalBase, int minInterval, int maxInterval)
  402. {
  403. int max_sleep = min(maxInterval, currentIntervalBase) * 1.2; // (e.g. 72 if base > 60)
  404. int min_sleep = max(minInterval, currentIntervalBase) / 1.2; // (e.g. 48 if base > 60)
  405. if (min_sleep >= max_sleep) // shouldn't happen, but just incase
  406. {
  407. return min_sleep;
  408. }
  409. // random_between(min_sleep, max_sleep)
  410. //
  411. int r;
  412. int range = max_sleep - min_sleep + 1;
  413. if (range > RAND_MAX)
  414. {
  415. range = RAND_MAX;
  416. }
  417. int buckets = RAND_MAX / range;
  418. int limit = buckets * range;
  419. /* Create equal size buckets all in a row, then fire randomly towards
  420. * the buckets until you land in one of them. All buckets are equally
  421. * likely. If you land off the end of the line of buckets, try again. */
  422. do
  423. {
  424. r = rand();
  425. } while (r >= limit);
  426. int randResult = r / buckets;
  427. return min_sleep + randResult;
  428. }
  429. /**
  430. * List callback function for comparing clients by socket
  431. * @param a first integer value
  432. * @param b second integer value
  433. * @return boolean indicating whether a and b are equal
  434. */
  435. static int clientSockCompare(void* a, void* b)
  436. {
  437. MQTTAsyncs* m = (MQTTAsyncs*)a;
  438. return m->c->net.socket == *(int*)b;
  439. }
  440. static void MQTTAsync_lock_mutex(mutex_type amutex)
  441. {
  442. int rc = Thread_lock_mutex(amutex);
  443. if (rc != 0)
  444. Log(LOG_ERROR, 0, "Error %s locking mutex", strerror(rc));
  445. }
  446. static void MQTTAsync_unlock_mutex(mutex_type amutex)
  447. {
  448. int rc = Thread_unlock_mutex(amutex);
  449. if (rc != 0)
  450. Log(LOG_ERROR, 0, "Error %s unlocking mutex", strerror(rc));
  451. }
  452. /*
  453. Check whether there are any more connect options. If not then we are finished
  454. with connect attempts.
  455. */
  456. static int MQTTAsync_checkConn(MQTTAsync_command* command, MQTTAsyncs* client)
  457. {
  458. int rc;
  459. FUNC_ENTRY;
  460. rc = command->details.conn.currentURI + 1 < client->serverURIcount ||
  461. (command->details.conn.MQTTVersion == 4 && client->c->MQTTVersion == MQTTVERSION_DEFAULT);
  462. FUNC_EXIT_RC(rc);
  463. return rc;
  464. }
  465. int MQTTAsync_createWithOptions(MQTTAsync* handle, const char* serverURI, const char* clientId,
  466. int persistence_type, void* persistence_context, MQTTAsync_createOptions* options)
  467. {
  468. int rc = 0;
  469. MQTTAsyncs *m = NULL;
  470. FUNC_ENTRY;
  471. MQTTAsync_lock_mutex(mqttasync_mutex);
  472. if (serverURI == NULL || clientId == NULL)
  473. {
  475. goto exit;
  476. }
  477. if (!UTF8_validateString(clientId))
  478. {
  480. goto exit;
  481. }
  482. if (strstr(serverURI, "://") != NULL)
  483. {
  484. if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) != 0
  485. && strncmp(URI_WS, serverURI, strlen(URI_WS)) != 0
  486. #if defined(OPENSSL)
  487. && strncmp(URI_SSL, serverURI, strlen(URI_SSL)) != 0
  488. && strncmp(URI_WSS, serverURI, strlen(URI_WSS)) != 0
  489. #endif
  490. )
  491. {
  493. goto exit;
  494. }
  495. }
  496. if (options && (strncmp(options->struct_id, "MQCO", 4) != 0 ||
  497. options->struct_version < 0 || options->struct_version > 1))
  498. {
  500. goto exit;
  501. }
  502. if (!global_initialized)
  503. {
  504. #if defined(HEAP_H)
  505. Heap_initialize();
  506. #endif
  507. Log_initialize((Log_nameValue*)MQTTAsync_getVersionInfo());
  508. bstate->clients = ListInitialize();
  509. Socket_outInitialize();
  510. Socket_setWriteCompleteCallback(MQTTAsync_writeComplete);
  511. handles = ListInitialize();
  512. commands = ListInitialize();
  513. #if defined(OPENSSL)
  514. SSLSocket_initialize();
  515. #endif
  516. global_initialized = 1;
  517. }
  518. m = malloc(sizeof(MQTTAsyncs));
  519. *handle = m;
  520. memset(m, '\0', sizeof(MQTTAsyncs));
  521. if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
  522. serverURI += strlen(URI_TCP);
  523. else if (strncmp(URI_WS, serverURI, strlen(URI_WS)) == 0)
  524. {
  525. serverURI += strlen(URI_WS);
  526. m->websocket = 1;
  527. }
  528. #if defined(OPENSSL)
  529. else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0)
  530. {
  531. serverURI += strlen(URI_SSL);
  532. m->ssl = 1;
  533. }
  534. else if (strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
  535. {
  536. serverURI += strlen(URI_WSS);
  537. m->ssl = 1;
  538. m->websocket = 1;
  539. }
  540. #endif
  541. m->serverURI = MQTTStrdup(serverURI);
  542. m->responses = ListInitialize();
  543. ListAppend(handles, m, sizeof(MQTTAsyncs));
  544. m->c = malloc(sizeof(Clients));
  545. memset(m->c, '\0', sizeof(Clients));
  546. m->c->context = m;
  547. m->c->outboundMsgs = ListInitialize();
  548. m->c->inboundMsgs = ListInitialize();
  549. m->c->messageQueue = ListInitialize();
  550. m->c->clientID = MQTTStrdup(clientId);
  551. m->c->MQTTVersion = MQTTVERSION_DEFAULT;
  552. m->shouldBeConnected = 0;
  553. if (options)
  554. {
  555. m->createOptions = malloc(sizeof(MQTTAsync_createOptions));
  556. memcpy(m->createOptions, options, sizeof(MQTTAsync_createOptions));
  557. if (options->struct_version > 0)
  558. m->c->MQTTVersion = options->MQTTVersion;
  559. }
  560. #if !defined(NO_PERSISTENCE)
  561. rc = MQTTPersistence_create(&(m->c->persistence), persistence_type, persistence_context);
  562. if (rc == 0)
  563. {
  564. rc = MQTTPersistence_initialize(m->c, m->serverURI);
  565. if (rc == 0)
  566. {
  567. MQTTAsync_restoreCommands(m);
  568. MQTTPersistence_restoreMessageQueue(m->c);
  569. }
  570. }
  571. #endif
  572. ListAppend(bstate->clients, m->c, sizeof(Clients) + 3*sizeof(List));
  573. exit:
  574. MQTTAsync_unlock_mutex(mqttasync_mutex);
  575. FUNC_EXIT_RC(rc);
  576. return rc;
  577. }
  578. int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clientId,
  579. int persistence_type, void* persistence_context)
  580. {
  581. MQTTAsync_init_rand();
  582. return MQTTAsync_createWithOptions(handle, serverURI, clientId, persistence_type,
  583. persistence_context, NULL);
  584. }
  585. static void MQTTAsync_terminate(void)
  586. {
  587. FUNC_ENTRY;
  588. MQTTAsync_stop();
  589. if (global_initialized)
  590. {
  591. ListElement* elem = NULL;
  592. ListFree(bstate->clients);
  593. ListFree(handles);
  594. while (ListNextElement(commands, &elem))
  595. MQTTAsync_freeCommand1((MQTTAsync_queuedCommand*)(elem->content));
  596. ListFree(commands);
  597. handles = NULL;
  598. WebSocket_terminate();
  599. #if defined(HEAP_H)
  600. Heap_terminate();
  601. #endif
  602. Log_terminate();
  603. global_initialized = 0;
  604. }
  605. FUNC_EXIT;
  606. }
  607. #if !defined(NO_PERSISTENCE)
  608. static int MQTTAsync_unpersistCommand(MQTTAsync_queuedCommand* qcmd)
  609. {
  610. int rc = 0;
  611. char key[PERSISTENCE_MAX_KEY_LENGTH + 1];
  612. FUNC_ENTRY;
  613. if (qcmd->client->c->MQTTVersion >= MQTTVERSION_5)
  614. sprintf(key, "%s%u", PERSISTENCE_V5_COMMAND_KEY, qcmd->seqno);
  615. else
  616. sprintf(key, "%s%u", PERSISTENCE_COMMAND_KEY, qcmd->seqno);
  617. if ((rc = qcmd->client->c->persistence->premove(qcmd->client->c->phandle, key)) != 0)
  618. Log(LOG_ERROR, 0, "Error %d removing command from persistence", rc);
  619. FUNC_EXIT_RC(rc);
  620. return rc;
  621. }
  622. static int MQTTAsync_persistCommand(MQTTAsync_queuedCommand* qcmd)
  623. {
  624. int rc = 0;
  625. MQTTAsyncs* aclient = qcmd->client;
  626. MQTTAsync_command* command = &qcmd->command;
  627. int* lens = NULL;
  628. void** bufs = NULL;
  629. int bufindex = 0, i, nbufs = 0;
  630. char key[PERSISTENCE_MAX_KEY_LENGTH + 1];
  631. int props_allocated = 0;
  632. int process = 1;
  633. FUNC_ENTRY;
  634. switch (command->type)
  635. {
  636. case SUBSCRIBE:
  637. nbufs = ((aclient->c->MQTTVersion >= MQTTVERSION_5) ? 4 : 3) +
  638. (command->details.sub.count * 2);
  639. lens = (int*)malloc(nbufs * sizeof(int));
  640. bufs = malloc(nbufs * sizeof(char *));
  641. bufs[bufindex] = &command->type;
  642. lens[bufindex++] = sizeof(command->type);
  643. bufs[bufindex] = &command->token;
  644. lens[bufindex++] = sizeof(command->token);
  645. bufs[bufindex] = &command->details.sub.count;
  646. lens[bufindex++] = sizeof(command->details.sub.count);
  647. for (i = 0; i < command->details.sub.count; ++i)
  648. {
  649. bufs[bufindex] = command->details.sub.topics[i];
  650. lens[bufindex++] = (int)strlen(command->details.sub.topics[i]) + 1;
  651. if (aclient->c->MQTTVersion < MQTTVERSION_5)
  652. {
  653. bufs[bufindex] = &command->details.sub.qoss[i];
  654. lens[bufindex++] = sizeof(command->details.sub.qoss[i]);
  655. }
  656. else
  657. {
  658. if (command->details.sub.count == 1)
  659. {
  660. bufs[bufindex] = &command->details.sub.opts;
  661. lens[bufindex++] = sizeof(command->details.sub.opts);
  662. }
  663. else
  664. {
  665. bufs[bufindex] = &command->details.sub.optlist[i];
  666. lens[bufindex++] = sizeof(command->details.sub.optlist[i]);
  667. }
  668. }
  669. }
  670. break;
  671. case UNSUBSCRIBE:
  672. nbufs = ((aclient->c->MQTTVersion >= MQTTVERSION_5) ? 4 : 3) +
  673. command->details.unsub.count;
  674. lens = (int*)malloc(nbufs * sizeof(int));
  675. bufs = malloc(nbufs * sizeof(char *));
  676. bufs[bufindex] = &command->type;
  677. lens[bufindex++] = sizeof(command->type);
  678. bufs[bufindex] = &command->token;
  679. lens[bufindex++] = sizeof(command->token);
  680. bufs[bufindex] = &command->details.unsub.count;
  681. lens[bufindex++] = sizeof(command->details.unsub.count);
  682. for (i = 0; i < command->details.unsub.count; ++i)
  683. {
  684. bufs[bufindex] = command->details.unsub.topics[i];
  685. lens[bufindex++] = (int)strlen(command->details.unsub.topics[i]) + 1;
  686. }
  687. break;
  688. case PUBLISH:
  689. nbufs = (aclient->c->MQTTVersion >= MQTTVERSION_5) ? 8 : 7;
  690. lens = (int*)malloc(nbufs * sizeof(int));
  691. bufs = malloc(nbufs * sizeof(char *));
  692. bufs[bufindex] = &command->type;
  693. lens[bufindex++] = sizeof(command->type);
  694. bufs[bufindex] = &command->token;
  695. lens[bufindex++] = sizeof(command->token);
  696. bufs[bufindex] = command->;
  697. lens[bufindex++] = (int)strlen(command-> + 1;
  698. bufs[bufindex] = &command->;
  699. lens[bufindex++] = sizeof(command->;
  700. bufs[bufindex] = command->;
  701. lens[bufindex++] = command->;
  702. bufs[bufindex] = &command->;
  703. lens[bufindex++] = sizeof(command->;
  704. bufs[bufindex] = &command->;
  705. lens[bufindex++] = sizeof(command->;
  706. break;
  707. default:
  708. process = 0;
  709. break;
  710. }
  711. if (aclient->c->MQTTVersion >= MQTTVERSION_5 && process) /* persist properties */
  712. {
  713. int temp_len = 0;
  714. char* ptr = NULL;
  715. temp_len = MQTTProperties_len(&command->properties);
  716. ptr = bufs[bufindex] = malloc(temp_len);
  717. props_allocated = bufindex;
  718. rc = MQTTProperties_write(&ptr, &command->properties);
  719. lens[bufindex++] = temp_len;
  720. sprintf(key, "%s%u", PERSISTENCE_V5_COMMAND_KEY, ++aclient->command_seqno);
  721. }
  722. else
  723. sprintf(key, "%s%u", PERSISTENCE_COMMAND_KEY, ++aclient->command_seqno);
  724. if (nbufs > 0)
  725. {
  726. if ((rc = aclient->c->persistence->pput(aclient->c->phandle, key, nbufs, (char**)bufs, lens)) != 0)
  727. Log(LOG_ERROR, 0, "Error persisting command, rc %d", rc);
  728. qcmd->seqno = aclient->command_seqno;
  729. }
  730. if (props_allocated > 0)
  731. free(bufs[props_allocated]);
  732. if (lens)
  733. free(lens);
  734. if (bufs)
  735. free(bufs);
  736. FUNC_EXIT_RC(rc);
  737. return rc;
  738. }
  739. static MQTTAsync_queuedCommand* MQTTAsync_restoreCommand(char* buffer, int buflen, int MQTTVersion)
  740. {
  741. MQTTAsync_command* command = NULL;
  742. MQTTAsync_queuedCommand* qcommand = NULL;
  743. char* ptr = buffer;
  744. int i;
  745. size_t data_size;
  746. FUNC_ENTRY;
  747. qcommand = malloc(sizeof(MQTTAsync_queuedCommand));
  748. memset(qcommand, '\0', sizeof(MQTTAsync_queuedCommand));
  749. command = &qcommand->command;
  750. command->type = *(int*)ptr;
  751. ptr += sizeof(int);
  752. command->token = *(MQTTAsync_token*)ptr;
  753. ptr += sizeof(MQTTAsync_token);
  754. switch (command->type)
  755. {
  756. case SUBSCRIBE:
  757. command->details.sub.count = *(int*)ptr;
  758. ptr += sizeof(int);
  759. if (command->details.sub.count > 0)
  760. {
  761. command->details.sub.topics = (char **)malloc(sizeof(char *) * command->details.sub.count);
  762. if (MQTTVersion < MQTTVERSION_5)
  763. command->details.sub.qoss = (int *)malloc(sizeof(int) * command->details.sub.count);
  764. else if (command->details.sub.count > 1)
  765. command->details.sub.optlist = (MQTTSubscribe_options*)malloc(sizeof(MQTTSubscribe_options) * command->details.sub.count);
  766. }
  767. for (i = 0; i < command->details.sub.count; ++i)
  768. {
  769. data_size = strlen(ptr) + 1;
  770. command->details.sub.topics[i] = malloc(data_size);
  771. strcpy(command->details.sub.topics[i], ptr);
  772. ptr += data_size;
  773. if (MQTTVersion < MQTTVERSION_5)
  774. {
  775. command->details.sub.qoss[i] = *(int*)ptr;
  776. ptr += sizeof(int);
  777. }
  778. else
  779. {
  780. if (command->details.sub.count == 1)
  781. {
  782. command->details.sub.opts = *(MQTTSubscribe_options*)ptr;
  783. ptr += sizeof(MQTTSubscribe_options);
  784. }
  785. else
  786. {
  787. command->details.sub.optlist[i] = *(MQTTSubscribe_options*)ptr;
  788. ptr += sizeof(MQTTSubscribe_options);
  789. }
  790. }
  791. }
  792. break;
  793. case UNSUBSCRIBE:
  794. command->details.unsub.count = *(int*)ptr;
  795. ptr += sizeof(int);
  796. if (command->details.unsub.count > 0)
  797. {
  798. command->details.unsub.topics = (char **)malloc(sizeof(char *) * command->details.unsub.count);
  799. }
  800. for (i = 0; i < command->details.unsub.count; ++i)
  801. {
  802. data_size = strlen(ptr) + 1;
  803. command->details.unsub.topics[i] = malloc(data_size);
  804. strcpy(command->details.unsub.topics[i], ptr);
  805. ptr += data_size;
  806. }
  807. break;
  808. case PUBLISH:
  809. data_size = strlen(ptr) + 1;
  810. command-> = malloc(data_size);
  811. strcpy(command->, ptr);
  812. ptr += data_size;
  813. command-> = *(int*)ptr;
  814. ptr += sizeof(int);
  815. data_size = command->;
  816. command-> = malloc(data_size);
  817. memcpy(command->, ptr, data_size);
  818. ptr += data_size;
  819. command-> = *(int*)ptr;
  820. ptr += sizeof(int);
  821. command-> = *(int*)ptr;
  822. ptr += sizeof(int);
  823. break;
  824. default:
  825. free(qcommand);
  826. qcommand = NULL;
  827. }
  828. if (qcommand != NULL && MQTTVersion >= MQTTVERSION_5 &&
  829. MQTTProperties_read(&command->properties, &ptr, buffer + buflen) != 1)
  830. {
  831. Log(LOG_ERROR, -1, "Error restoring properties from persistence");
  832. free(qcommand);
  833. qcommand = NULL;
  834. }
  835. FUNC_EXIT;
  836. return qcommand;
  837. }
  838. /*
  839. static void MQTTAsync_insertInOrder(List* list, void* content, int size)
  840. {
  841. ListElement* index = NULL;
  842. ListElement* current = NULL;
  843. FUNC_ENTRY;
  844. while (ListNextElement(list, &current) != NULL && index == NULL)
  845. {
  846. if (((MQTTAsync_queuedCommand*)content)->seqno < ((MQTTAsync_queuedCommand*)current->content)->seqno)
  847. index = current;
  848. }
  849. ListInsert(list, content, size, index);
  850. FUNC_EXIT;
  851. }*/
  852. static int MQTTAsync_restoreCommands(MQTTAsyncs* client)
  853. {
  854. int rc = 0;
  855. char **msgkeys;
  856. int nkeys;
  857. int i = 0;
  858. Clients* c = client->c;
  859. int commands_restored = 0;
  860. FUNC_ENTRY;
  861. if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0)
  862. {
  863. while (rc == 0 && i < nkeys)
  864. {
  865. char *buffer = NULL;
  866. int buflen;
  867. if (strncmp(msgkeys[i], PERSISTENCE_COMMAND_KEY, strlen(PERSISTENCE_COMMAND_KEY)) != 0 &&
  868. strncmp(msgkeys[i], PERSISTENCE_V5_COMMAND_KEY, strlen(PERSISTENCE_V5_COMMAND_KEY)) != 0)
  869. {
  870. ;
  871. }
  872. else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0)
  873. {
  874. int MQTTVersion =
  875. (strncmp(msgkeys[i], PERSISTENCE_V5_COMMAND_KEY, strlen(PERSISTENCE_V5_COMMAND_KEY)) == 0)
  877. MQTTAsync_queuedCommand* cmd = MQTTAsync_restoreCommand(buffer, buflen, MQTTVersion);
  878. if (cmd)
  879. {
  880. cmd->client = client;
  881. cmd->seqno = atoi(strchr(msgkeys[i], '-')+1); /* key format is tag'-'seqno */
  882. MQTTPersistence_insertInOrder(commands, cmd, sizeof(MQTTAsync_queuedCommand));
  883. free(buffer);
  884. client->command_seqno = max(client->command_seqno, cmd->seqno);
  885. commands_restored++;
  886. }
  887. }
  888. if (msgkeys[i])
  889. free(msgkeys[i]);
  890. i++;
  891. }
  892. if (msgkeys != NULL)
  893. free(msgkeys);
  894. }
  895. Log(TRACE_MINIMUM, -1, "%d commands restored for client %s", commands_restored, c->clientID);
  896. FUNC_EXIT_RC(rc);
  897. return rc;
  898. }
  899. #endif
  900. static int MQTTAsync_addCommand(MQTTAsync_queuedCommand* command, int command_size)
  901. {
  902. int rc = 0;
  903. FUNC_ENTRY;
  904. MQTTAsync_lock_mutex(mqttcommand_mutex);
  905. /* Don't set start time if the connect command is already in process #218 */
  906. if ((command->command.type != CONNECT) || (command->client->c->connect_state == NOT_IN_PROGRESS))
  907. command->command.start_time = MQTTAsync_start_clock();
  908. if (command->command.type == CONNECT ||
  909. (command->command.type == DISCONNECT && command->command.details.dis.internal))
  910. {
  911. MQTTAsync_queuedCommand* head = NULL;
  912. if (commands->first)
  913. head = (MQTTAsync_queuedCommand*)(commands->first->content);
  914. if (head != NULL && head->client == command->client && head->command.type == command->command.type)
  915. MQTTAsync_freeCommand(command); /* ignore duplicate connect or disconnect command */
  916. else
  917. ListInsert(commands, command, command_size, commands->first); /* add to the head of the list */
  918. }
  919. else
  920. {
  921. ListAppend(commands, command, command_size);
  922. #if !defined(NO_PERSISTENCE)
  923. if (command->client->c->persistence)
  924. MQTTAsync_persistCommand(command);
  925. #endif
  926. }
  927. MQTTAsync_unlock_mutex(mqttcommand_mutex);
  928. #if !defined(WIN32) && !defined(WIN64)
  929. rc = Thread_signal_cond(send_cond);
  930. if (rc != 0)
  931. Log(LOG_ERROR, 0, "Error %d from signal cond", rc);
  932. #else
  933. rc = Thread_post_sem(send_sem);
  934. #endif
  935. FUNC_EXIT_RC(rc);
  936. return rc;
  937. }
  938. static void MQTTAsync_startConnectRetry(MQTTAsyncs* m)
  939. {
  940. if (m->automaticReconnect && m->shouldBeConnected)
  941. {
  942. m->lastConnectionFailedTime = MQTTAsync_start_clock();
  943. if (m->retrying)
  944. {
  945. m->currentIntervalBase = min(m->currentIntervalBase * 2, m->maxRetryInterval);
  946. }
  947. else
  948. {
  949. m->currentIntervalBase = m->minRetryInterval;
  950. m->retrying = 1;
  951. }
  952. m->currentInterval = MQTTAsync_randomJitter(m->currentIntervalBase, m->minRetryInterval, m->maxRetryInterval);
  953. }
  954. }
  955. int MQTTAsync_reconnect(MQTTAsync handle)
  956. {
  957. int rc = MQTTASYNC_FAILURE;
  958. MQTTAsyncs* m = handle;
  959. FUNC_ENTRY;
  960. MQTTAsync_lock_mutex(mqttasync_mutex);
  961. if (m->automaticReconnect)
  962. {
  963. if (m->shouldBeConnected)
  964. {
  965. m->reconnectNow = 1;
  966. if (m->retrying == 0)
  967. {
  968. m->currentIntervalBase = m->minRetryInterval;
  969. m->currentInterval = m->minRetryInterval;
  970. m->retrying = 1;
  971. }
  973. }
  974. }
  975. else
  976. {
  977. /* to reconnect, put the connect command to the head of the command queue */
  978. MQTTAsync_queuedCommand* conn = malloc(sizeof(MQTTAsync_queuedCommand));
  979. memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
  980. conn->client = m;
  981. conn->command = m->connect;
  982. /* make sure that the version attempts are restarted */
  983. if (m->c->MQTTVersion == MQTTVERSION_DEFAULT)
  984. conn->command.details.conn.MQTTVersion = 0;
  985. MQTTAsync_addCommand(conn, sizeof(m->connect));
  987. }
  988. MQTTAsync_unlock_mutex(mqttasync_mutex);
  989. FUNC_EXIT_RC(rc);
  990. return rc;
  991. }
  992. static void MQTTAsync_checkDisconnect(MQTTAsync handle, MQTTAsync_command* command)
  993. {
  994. MQTTAsyncs* m = handle;
  995. FUNC_ENTRY;
  996. /* wait for all inflight message flows to finish, up to timeout */;
  997. if (m->c->outboundMsgs->count == 0 || MQTTAsync_elapsed(command->start_time) >= command->details.dis.timeout)
  998. {
  999. int was_connected = m->c->connected;
  1000. MQTTAsync_closeSession(m->c, command->details.dis.reasonCode, &command->properties);
  1001. if (command->details.dis.internal)
  1002. {
  1003. if (m->cl && was_connected)
  1004. {
  1005. Log(TRACE_MIN, -1, "Calling connectionLost for client %s", m->c->clientID);
  1006. (*(m->cl))(m->clContext, NULL);
  1007. }
  1008. MQTTAsync_startConnectRetry(m);
  1009. }
  1010. else if (command->onSuccess)
  1011. {
  1012. MQTTAsync_successData data;
  1013. memset(&data, '\0', sizeof(data));
  1014. Log(TRACE_MIN, -1, "Calling disconnect complete for client %s", m->c->clientID);
  1015. (*(command->onSuccess))(command->context, &data);
  1016. }
  1017. else if (command->onSuccess5)
  1018. {
  1019. MQTTAsync_successData5 data = MQTTAsync_successData5_initializer;
  1020. data.reasonCode = MQTTASYNC_SUCCESS;
  1021. Log(TRACE_MIN, -1, "Calling disconnect complete for client %s", m->c->clientID);
  1022. (*(command->onSuccess5))(command->context, &data);
  1023. }
  1024. }
  1025. FUNC_EXIT;
  1026. }
  1027. /**
  1028. * Call Socket_noPendingWrites(int socket) with protection by socket_mutex, see
  1029. */
  1030. static int MQTTAsync_Socket_noPendingWrites(int socket)
  1031. {
  1032. int rc;
  1033. Thread_lock_mutex(socket_mutex);
  1034. rc = Socket_noPendingWrites(socket);
  1035. Thread_unlock_mutex(socket_mutex);
  1036. return rc;
  1037. }
  1038. /**
  1039. * See if any pending writes have been completed, and cleanup if so.
  1040. * Cleaning up means removing any publication data that was stored because the write did
  1041. * not originally complete.
  1042. */
  1043. static void MQTTProtocol_checkPendingWrites(void)
  1044. {
  1045. FUNC_ENTRY;
  1046. if (state.pending_writes.count > 0)
  1047. {
  1048. ListElement* le = state.pending_writes.first;
  1049. while (le)
  1050. {
  1051. if (Socket_noPendingWrites(((pending_write*)(le->content))->socket))
  1052. {
  1053. MQTTProtocol_removePublication(((pending_write*)(le->content))->p);
  1054. state.pending_writes.current = le;
  1055. ListRemove(&(state.pending_writes), le->content); /* does NextElement itself */
  1056. le = state.pending_writes.current;
  1057. }
  1058. else
  1059. ListNextElement(&(state.pending_writes), &le);
  1060. }
  1061. }
  1062. FUNC_EXIT;
  1063. }
  1064. static void MQTTAsync_freeServerURIs(MQTTAsyncs* m)
  1065. {
  1066. int i;
  1067. for (i = 0; i < m->serverURIcount; ++i)
  1068. free(m->serverURIs[i]);
  1069. m->serverURIcount = 0;
  1070. if (m->serverURIs)
  1071. free(m->serverURIs);
  1072. m->serverURIs = NULL;
  1073. }
  1074. static void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command)
  1075. {
  1076. if (command->command.type == SUBSCRIBE)
  1077. {
  1078. int i;
  1079. for (i = 0; i < command->command.details.sub.count; i++)
  1080. free(command->command.details.sub.topics[i]);
  1081. free(command->command.details.sub.topics);
  1082. command->command.details.sub.topics = NULL;
  1083. free(command->command.details.sub.qoss);
  1084. command->command.details.sub.qoss = NULL;
  1085. }
  1086. else if (command->command.type == UNSUBSCRIBE)
  1087. {
  1088. int i;
  1089. for (i = 0; i < command->command.details.unsub.count; i++)
  1090. free(command->command.details.unsub.topics[i]);
  1091. free(command->command.details.unsub.topics);
  1092. command->command.details.unsub.topics = NULL;
  1093. }
  1094. else if (command->command.type == PUBLISH)
  1095. {
  1096. /* qos 1 and 2 topics are freed in the protocol code when the flows are completed */
  1097. if (command->
  1098. free(command->;
  1099. command-> = NULL;
  1100. free(command->;
  1101. command-> = NULL;
  1102. }
  1103. MQTTProperties_free(&command->;
  1104. }
  1105. static void MQTTAsync_freeCommand(MQTTAsync_queuedCommand *command)
  1106. {
  1107. MQTTAsync_freeCommand1(command);
  1108. free(command);
  1109. }
  1110. static void MQTTAsync_writeComplete(int socket, int rc)
  1111. {
  1112. ListElement* found = NULL;
  1113. FUNC_ENTRY;
  1114. /* a partial write is now complete for a socket - this will be on a publish*/
  1115. MQTTProtocol_checkPendingWrites();
  1116. /* find the client using this socket */
  1117. if ((found = ListFindItem(handles, &socket, clientSockCompare)) != NULL)
  1118. {
  1119. MQTTAsyncs* m = (MQTTAsyncs*)(found->content);
  1120. time(&(m->c->net.lastSent));
  1121. /* see if there is a pending write flagged */
  1122. if (m->pending_write)
  1123. {
  1124. ListElement* cur_response = NULL;
  1125. MQTTAsync_command* command = m->pending_write;
  1126. MQTTAsync_queuedCommand* com = NULL;
  1127. cur_response = NULL;
  1128. while (ListNextElement(m->responses, &cur_response))
  1129. {
  1130. com = (MQTTAsync_queuedCommand*)(cur_response->content);
  1131. if (&com->command == m->pending_write)
  1132. break;
  1133. }
  1134. if (cur_response) /* we found a response */
  1135. {
  1136. if (command->type == PUBLISH)
  1137. {
  1138. if (rc == 1 && command-> == 0)
  1139. {
  1140. if (command->onSuccess)
  1141. {
  1142. MQTTAsync_successData data;
  1143. data.token = command->token;
  1144. = command->;
  1145. = command->;
  1146. = command->;
  1147. = command->;
  1148. = command->;
  1149. Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
  1150. (*(command->onSuccess))(command->context, &data);
  1151. }
  1152. else if (command->onSuccess5)
  1153. {
  1154. MQTTAsync_successData5 data = MQTTAsync_successData5_initializer;
  1155. data.token = command->token;
  1156. = command->;
  1157. = command->;
  1158. = command->;
  1159. = command->;
  1160. = command->;
  1161. = command->properties;
  1162. Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
  1163. (*(command->onSuccess5))(command->context, &data);
  1164. }
  1165. }
  1166. else if (rc == -1)
  1167. {
  1168. if (command->onFailure)
  1169. {
  1170. MQTTAsync_failureData data;
  1171. data.token = command->token;
  1172. data.code = rc;
  1173. data.message = NULL;
  1174. Log(TRACE_MIN, -1, "Calling publish failure for client %s", m->c->clientID);
  1175. (*(command->onFailure))(command->context, &data);
  1176. }
  1177. else if (command->onFailure5)
  1178. {
  1179. MQTTAsync_failureData5 data;
  1180. data.token = command->token;
  1181. data.code = rc;
  1182. data.message = NULL;
  1183. data.packet_type = PUBLISH;
  1184. Log(TRACE_MIN, -1, "Calling publish failure for client %s", m->c->clientID);
  1185. (*(command->onFailure5))(command->context, &data);
  1186. }
  1187. }
  1188. else
  1189. com = NULL; /* Don't delete response we haven't acknowledged */
  1190. }
  1191. if (com)
  1192. {
  1193. Log(TRACE_PROTOCOL, -1, "writeComplete: Removing response for msgid %d", com->command.token);
  1194. ListDetach(m->responses, com);
  1195. MQTTAsync_freeCommand(com);
  1196. }
  1197. } /* if cur_response */
  1198. m->pending_write = NULL;
  1199. } /* if pending_write */
  1200. }
  1201. FUNC_EXIT;
  1202. }
  1203. static int MQTTAsync_processCommand(void)
  1204. {
  1205. int rc = 0;
  1206. MQTTAsync_queuedCommand* command = NULL;
  1207. ListElement* cur_command = NULL;
  1208. List* ignored_clients = NULL;
  1209. FUNC_ENTRY;
  1210. MQTTAsync_lock_mutex(mqttasync_mutex);
  1211. MQTTAsync_lock_mutex(mqttcommand_mutex);
  1212. /* only the first command in the list must be processed for any particular client, so if we skip
  1213. a command for a client, we must skip all following commands for that client. Use a list of
  1214. ignored clients to keep track
  1215. */
  1216. ignored_clients = ListInitialize();
  1217. /* don't try a command until there isn't a pending write for that client, and we are not connecting */
  1218. while (ListNextElement(commands, &cur_command))
  1219. {
  1220. MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)(cur_command->content);
  1221. if (ListFind(ignored_clients, cmd->client))
  1222. continue;
  1223. if (cmd->command.type == CONNECT || cmd->command.type == DISCONNECT || (cmd->client->c->connected &&
  1224. cmd->client->c->connect_state == NOT_IN_PROGRESS && MQTTAsync_Socket_noPendingWrites(cmd->client->c->net.socket)))
  1225. {
  1226. if ((cmd->command.type == PUBLISH || cmd->command.type == SUBSCRIBE || cmd->command.type == UNSUBSCRIBE) &&
  1227. cmd->client->c->outboundMsgs->count >= MAX_MSG_ID - 1)
  1228. {
  1229. ; /* no more message ids available */
  1230. }
  1231. else if (cmd->client->c->MQTTVersion >= MQTTVERSION_5 &&
  1232. ((cmd->command.type == PUBLISH && cmd-> > 0) ||
  1233. cmd->command.type == SUBSCRIBE || cmd->command.type == UNSUBSCRIBE) &&
  1234. (cmd->client->c->outboundMsgs->count >= cmd->client->c->maxInflightMessages))
  1235. {
  1236. Log(TRACE_MIN, -1, "Blocking on server receive maximum for client %s",
  1237. cmd->client->c->clientID); /* flow control */
  1238. }
  1239. else
  1240. {
  1241. command = cmd;
  1242. break;
  1243. }
  1244. }
  1245. ListAppend(ignored_clients, cmd->client, sizeof(cmd->client));
  1246. }
  1247. ListFreeNoContent(ignored_clients);
  1248. if (command)
  1249. {
  1250. ListDetach(commands, command);
  1251. #if !defined(NO_PERSISTENCE)
  1252. if (command->client->c->persistence)
  1253. MQTTAsync_unpersistCommand(command);
  1254. #endif
  1255. }
  1256. MQTTAsync_unlock_mutex(mqttcommand_mutex);
  1257. if (!command)
  1258. goto exit; /* nothing to do */
  1259. if (command->command.type == CONNECT)
  1260. {
  1261. if (command->client->c->connect_state != NOT_IN_PROGRESS || command->client->c->connected)
  1262. rc = 0;
  1263. else
  1264. {
  1265. char* serverURI = command->client->serverURI;
  1266. if (command->client->serverURIcount > 0)
  1267. {
  1268. serverURI = command->client->serverURIs[command->command.details.conn.currentURI];
  1269. if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
  1270. serverURI += strlen(URI_TCP);
  1271. else if (strncmp(URI_WS, serverURI, strlen(URI_WS)) == 0)
  1272. {
  1273. serverURI += strlen(URI_WS);
  1274. command->client->websocket = 1;
  1275. }
  1276. #if defined(OPENSSL)
  1277. else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0)
  1278. {
  1279. serverURI += strlen(URI_SSL);
  1280. command->client->ssl = 1;
  1281. }
  1282. else if (strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
  1283. {
  1284. serverURI += strlen(URI_WSS);
  1285. command->client->ssl = 1;
  1286. command->client->websocket = 1;
  1287. }
  1288. #endif
  1289. }
  1290. if (command->client->c->MQTTVersion == MQTTVERSION_DEFAULT)
  1291. {
  1292. if (command->command.details.conn.MQTTVersion == MQTTVERSION_DEFAULT)
  1293. command->command.details.conn.MQTTVersion = MQTTVERSION_3_1_1;
  1294. else if (command->command.details.conn.MQTTVersion == MQTTVERSION_3_1_1)
  1295. command->command.details.conn.MQTTVersion = MQTTVERSION_3_1;
  1296. }
  1297. else
  1298. command->command.details.conn.MQTTVersion = command->client->c->MQTTVersion;
  1299. Log(TRACE_PROTOCOL, -1, "Connecting to serverURI %s with MQTT version %d", serverURI, command->command.details.conn.MQTTVersion);
  1300. #if defined(OPENSSL)
  1301. rc = MQTTProtocol_connect(serverURI, command->client->c, command->client->ssl, command->client->websocket,
  1302. command->command.details.conn.MQTTVersion, command->client->connectProps, command->client->willProps);
  1303. #else
  1304. rc = MQTTProtocol_connect(serverURI, command->client->c, command->client->websocket,
  1305. command->command.details.conn.MQTTVersion, command->client->connectProps, command->client->willProps);
  1306. #endif
  1307. if (command->client->c->connect_state == NOT_IN_PROGRESS)
  1308. rc = SOCKET_ERROR;
  1309. /* if the TCP connect is pending, then we must call select to determine when the connect has completed,
  1310. which is indicated by the socket being ready *either* for reading *or* writing. The next couple of lines
  1311. make sure we check for writeability as well as readability, otherwise we wait around longer than we need to
  1312. in Socket_getReadySocket() */
  1313. if (rc == EINPROGRESS)
  1314. Socket_addPendingWrite(command->client->c->net.socket);
  1315. }
  1316. }
  1317. else if (command->command.type == SUBSCRIBE)
  1318. {
  1319. List* topics = ListInitialize();
  1320. List* qoss = ListInitialize();
  1321. MQTTProperties* props = NULL;
  1322. MQTTSubscribe_options* subopts = NULL;
  1323. int i;
  1324. for (i = 0; i < command->command.details.sub.count; i++)
  1325. {
  1326. ListAppend(topics, command->command.details.sub.topics[i], strlen(command->command.details.sub.topics[i]));
  1327. ListAppend(qoss, &command->command.details.sub.qoss[i], sizeof(int));
  1328. }
  1329. if (command->client->c->MQTTVersion >= MQTTVERSION_5)
  1330. {
  1331. props = &command->;
  1332. if (command->command.details.sub.count > 1)
  1333. subopts = command->command.details.sub.optlist;
  1334. else
  1335. subopts = &command->command.details.sub.opts;
  1336. }
  1337. rc = MQTTProtocol_subscribe(command->client->c, topics, qoss, command->command.token, subopts, props);
  1338. ListFreeNoContent(topics);
  1339. ListFreeNoContent(qoss);
  1340. if (command->client->c->MQTTVersion >= MQTTVERSION_5 && command->command.details.sub.count > 1)
  1341. free(command->command.details.sub.optlist);
  1342. }
  1343. else if (command->command.type == UNSUBSCRIBE)
  1344. {
  1345. List* topics = ListInitialize();
  1346. MQTTProperties* props = NULL;
  1347. int i;
  1348. for (i = 0; i < command->command.details.unsub.count; i++)
  1349. ListAppend(topics, command->command.details.unsub.topics[i], strlen(command->command.details.unsub.topics[i]));
  1350. if (command->client->c->MQTTVersion >= MQTTVERSION_5)
  1351. props = &command->;
  1352. rc = MQTTProtocol_unsubscribe(command->client->c, topics, command->command.token, props);
  1353. ListFreeNoContent(topics);
  1354. }
  1355. else if (command->command.type == PUBLISH)
  1356. {
  1357. Messages* msg = NULL;
  1358. Publish* p = NULL;
  1359. MQTTProperties initialized = MQTTProperties_initializer;
  1360. p = malloc(sizeof(Publish));
  1361. p->payload = command->;
  1362. p->payloadlen = command->;
  1363. p->topic = command->;
  1364. p->msgId = command->command.token;
  1365. p->MQTTVersion = command->client->c->MQTTVersion;
  1366. p->properties = initialized;
  1367. if (p->MQTTVersion >= MQTTVERSION_5)
  1368. p->properties = command->;
  1369. rc = MQTTProtocol_startPublish(command->client->c, p, command->, command->, &msg);
  1370. if (command-> == 0)
  1371. {
  1372. if (rc == TCPSOCKET_COMPLETE)
  1373. {
  1374. if (command->command.onSuccess)
  1375. {
  1376. MQTTAsync_successData data;
  1377. data.token = command->command.token;
  1378. = command->;
  1379. = command->;
  1380. = command->;
  1381. = command->;
  1382. = command->;
  1383. Log(TRACE_MIN, -1, "Calling publish success for client %s", command->client->c->clientID);
  1384. (*(command->command.onSuccess))(command->command.context, &data);
  1385. }
  1386. else if (command->command.onSuccess5)
  1387. {
  1388. MQTTAsync_successData5 data = MQTTAsync_successData5_initializer;
  1389. data.token = command->command.token;
  1390. = command->;
  1391. = command->;
  1392. = command->;
  1393. = command->;
  1394. = command->;
  1395. = command->;
  1396. Log(TRACE_MIN, -1, "Calling publish success for client %s", command->client->c->clientID);
  1397. (*(command->command.onSuccess5))(command->command.context, &data);
  1398. }
  1399. }
  1400. else
  1401. {
  1402. if (rc != SOCKET_ERROR)
  1403. command-> = NULL; /* this will be freed by the protocol code */
  1404. command->client->pending_write = &command->command;
  1405. }
  1406. }
  1407. else
  1408. command-> = NULL; /* this will be freed by the protocol code */
  1409. free(p); /* should this be done if the write isn't complete? */
  1410. }
  1411. else if (command->command.type == DISCONNECT)
  1412. {
  1413. if (command->client->c->connect_state != NOT_IN_PROGRESS || command->client->c->connected != 0)
  1414. {
  1415. if (command->client->c->connect_state != NOT_IN_PROGRESS)
  1416. {
  1417. command->client->c->connect_state = DISCONNECTING;
  1418. if (command->client->connect.onFailure)
  1419. {
  1420. MQTTAsync_failureData data;
  1421. data.token = 0;
  1423. data.message = NULL;
  1424. Log(TRACE_MIN, -1, "Calling connect failure for client %s", command->client->c->clientID);
  1425. (*(command->client->connect.onFailure))(command->client->connect.context, &data);
  1426. }
  1427. else if (command->client->connect.onFailure5)
  1428. {
  1429. MQTTAsync_failureData5 data;
  1430. data.token = 0;
  1432. data.message = NULL;
  1433. Log(TRACE_MIN, -1, "Calling connect failure for client %s", command->client->c->clientID);
  1434. (*(command->client->connect.onFailure5))(command->client->connect.context, &data);
  1435. }
  1436. }
  1437. MQTTAsync_checkDisconnect(command->client, &command->command);
  1438. }
  1439. }
  1440. if (command->command.type == CONNECT && rc != SOCKET_ERROR && rc != MQTTASYNC_PERSISTENCE_ERROR)
  1441. {
  1442. command->client->connect = command->command;
  1443. MQTTAsync_freeCommand(command);
  1444. }
  1445. else if (command->command.type == DISCONNECT)
  1446. {
  1447. command->client->disconnect = command->command;
  1448. MQTTAsync_freeCommand(command);
  1449. }
  1450. else if (command->command.type == PUBLISH && command-> == 0 &&
  1452. {
  1453. if (rc == TCPSOCKET_INTERRUPTED)
  1454. ListAppend(command->client->responses, command, sizeof(command));
  1455. else
  1456. MQTTAsync_freeCommand(command);
  1457. }
  1458. else if (rc == SOCKET_ERROR || rc == MQTTASYNC_PERSISTENCE_ERROR)
  1459. {
  1460. if (command->command.type == CONNECT)
  1461. {
  1462. MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
  1463. MQTTAsync_disconnect(command->client, &opts); /* not "internal" because we don't want to call connection lost */
  1464. command->client->shouldBeConnected = 1; /* as above call is not "internal" we need to reset this */
  1465. }
  1466. else
  1467. MQTTAsync_disconnect_internal(command->client, 0);
  1468. if (command->command.type == CONNECT
  1469. && MQTTAsync_checkConn(&command->command, command->client))
  1470. {
  1471. Log(TRACE_MIN, -1, "Connect failed, more to try");
  1472. if (command->client->c->MQTTVersion == MQTTVERSION_DEFAULT)
  1473. {
  1474. if (command->command.details.conn.MQTTVersion == MQTTVERSION_3_1)
  1475. {
  1476. command->command.details.conn.currentURI++;
  1477. command->command.details.conn.MQTTVersion = MQTTVERSION_DEFAULT;
  1478. }
  1479. } else
  1480. command->command.details.conn.currentURI++;
  1481. /* put the connect command back to the head of the command queue, using the next serverURI */
  1482. rc = MQTTAsync_addCommand(command,
  1483. sizeof(command->command.details.conn));
  1484. } else
  1485. {
  1486. if (command->command.onFailure)
  1487. {
  1488. MQTTAsync_failureData data;
  1489. data.token = 0;
  1490. data.code = rc;
  1491. data.message = NULL;
  1492. Log(TRACE_MIN, -1, "Calling command failure for client %s", command->client->c->clientID);
  1493. (*(command->command.onFailure))(command->command.context, &data);
  1494. }
  1495. else if (command->command.onFailure5)
  1496. {
  1497. MQTTAsync_failureData5 data = MQTTAsync_failureData5_initializer;
  1498. data.code = rc;
  1499. Log(TRACE_MIN, -1, "Calling command failure for client %s", command->client->c->clientID);
  1500. (*(command->command.onFailure5))(command->command.context, &data);
  1501. }
  1502. if (command->command.type == CONNECT)
  1503. {
  1504. command->client->connect = command->command;
  1505. MQTTAsync_startConnectRetry(command->client);
  1506. }
  1507. MQTTAsync_freeCommand(command); /* free up the command if necessary */
  1508. }
  1509. }
  1510. else /* put the command into a waiting for response queue for each client, indexed by msgid */
  1511. ListAppend(command->client->responses, command, sizeof(command));
  1512. exit:
  1513. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1514. rc = (command != NULL);
  1515. FUNC_EXIT_RC(rc);
  1516. return rc;
  1517. }
  1518. static void nextOrClose(MQTTAsyncs* m, int rc, char* message)
  1519. {
  1520. FUNC_ENTRY;
  1521. if (MQTTAsync_checkConn(&m->connect, m))
  1522. {
  1523. MQTTAsync_queuedCommand* conn;
  1524. MQTTAsync_closeOnly(m->c, MQTTREASONCODE_SUCCESS, NULL);
  1525. /* put the connect command back to the head of the command queue, using the next serverURI */
  1526. conn = malloc(sizeof(MQTTAsync_queuedCommand));
  1527. memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
  1528. conn->client = m;
  1529. conn->command = m->connect;
  1530. Log(TRACE_MIN, -1, "Connect failed, more to try");
  1531. if (conn->client->c->MQTTVersion == MQTTVERSION_DEFAULT)
  1532. {
  1533. if (conn->command.details.conn.MQTTVersion == MQTTVERSION_3_1)
  1534. {
  1535. conn->command.details.conn.currentURI++;
  1536. conn->command.details.conn.MQTTVersion = MQTTVERSION_DEFAULT;
  1537. }
  1538. }
  1539. else
  1540. conn->command.details.conn.currentURI++;
  1541. MQTTAsync_addCommand(conn, sizeof(m->connect));
  1542. }
  1543. else
  1544. {
  1545. MQTTAsync_closeSession(m->c, MQTTREASONCODE_SUCCESS, NULL);
  1546. if (m->connect.onFailure)
  1547. {
  1548. MQTTAsync_failureData data;
  1549. data.token = 0;
  1550. data.code = rc;
  1551. data.message = message;
  1552. Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
  1553. (*(m->connect.onFailure))(m->connect.context, &data);
  1554. }
  1555. else if (m->connect.onFailure5)
  1556. {
  1557. MQTTAsync_failureData5 data = MQTTAsync_failureData5_initializer;
  1558. data.token = 0;
  1559. data.code = rc;
  1560. data.message = message;
  1561. Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
  1562. (*(m->connect.onFailure5))(m->connect.context, &data);
  1563. }
  1564. MQTTAsync_startConnectRetry(m);
  1565. }
  1566. FUNC_EXIT;
  1567. }
  1568. static void MQTTAsync_checkTimeouts(void)
  1569. {
  1570. ListElement* current = NULL;
  1571. static time_t last = 0L;
  1572. time_t now;
  1573. FUNC_ENTRY;
  1574. time(&(now));
  1575. if (difftime(now, last) < 3)
  1576. goto exit;
  1577. MQTTAsync_lock_mutex(mqttasync_mutex);
  1578. last = now;
  1579. while (ListNextElement(handles, &current)) /* for each client */
  1580. {
  1581. MQTTAsyncs* m = (MQTTAsyncs*)(current->content);
  1582. /* check disconnect timeout */
  1583. if (m->c->connect_state == DISCONNECTING)
  1584. MQTTAsync_checkDisconnect(m, &m->disconnect);
  1585. /* check connect timeout */
  1586. if (m->c->connect_state != NOT_IN_PROGRESS && MQTTAsync_elapsed(m->connect.start_time) > (m->connectTimeout * 1000))
  1587. {
  1588. nextOrClose(m, MQTTASYNC_FAILURE, "TCP connect timeout");
  1589. continue;
  1590. }
  1591. /* There was a section here that removed timed-out responses. But if the command had completed and
  1592. * there was a response, then we may as well report it, no?
  1593. *
  1594. * In any case, that section was disabled when automatic reconnect was implemented.
  1595. */
  1596. if (m->automaticReconnect && m->retrying)
  1597. {
  1598. if (m->reconnectNow || MQTTAsync_elapsed(m->lastConnectionFailedTime) > (m->currentInterval * 1000))
  1599. {
  1600. /* to reconnect put the connect command to the head of the command queue */
  1601. MQTTAsync_queuedCommand* conn = malloc(sizeof(MQTTAsync_queuedCommand));
  1602. memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
  1603. conn->client = m;
  1604. conn->command = m->connect;
  1605. /* make sure that the version attempts are restarted */
  1606. if (m->c->MQTTVersion == MQTTVERSION_DEFAULT)
  1607. conn->command.details.conn.MQTTVersion = 0;
  1608. Log(TRACE_MIN, -1, "Automatically attempting to reconnect");
  1609. MQTTAsync_addCommand(conn, sizeof(m->connect));
  1610. m->reconnectNow = 0;
  1611. }
  1612. }
  1613. }
  1614. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1615. exit:
  1616. FUNC_EXIT;
  1617. }
  1618. static thread_return_type WINAPI MQTTAsync_sendThread(void* n)
  1619. {
  1620. FUNC_ENTRY;
  1621. MQTTAsync_lock_mutex(mqttasync_mutex);
  1622. sendThread_state = RUNNING;
  1623. sendThread_id = Thread_getid();
  1624. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1625. while (!tostop)
  1626. {
  1627. int rc;
  1628. while (commands->count > 0)
  1629. {
  1630. if (MQTTAsync_processCommand() == 0)
  1631. break; /* no commands were processed, so go into a wait */
  1632. }
  1633. #if !defined(WIN32) && !defined(WIN64)
  1634. if ((rc = Thread_wait_cond(send_cond, 1)) != 0 && rc != ETIMEDOUT)
  1635. Log(LOG_ERROR, -1, "Error %d waiting for condition variable", rc);
  1636. #else
  1637. if ((rc = Thread_wait_sem(send_sem, 1000)) != 0 && rc != ETIMEDOUT)
  1638. Log(LOG_ERROR, -1, "Error %d waiting for semaphore", rc);
  1639. #endif
  1640. MQTTAsync_checkTimeouts();
  1641. }
  1642. sendThread_state = STOPPING;
  1643. MQTTAsync_lock_mutex(mqttasync_mutex);
  1644. sendThread_state = STOPPED;
  1645. sendThread_id = 0;
  1646. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1647. FUNC_EXIT;
  1648. return 0;
  1649. }
  1650. static void MQTTAsync_emptyMessageQueue(Clients* client)
  1651. {
  1652. FUNC_ENTRY;
  1653. /* empty message queue */
  1654. if (client->messageQueue->count > 0)
  1655. {
  1656. ListElement* current = NULL;
  1657. while (ListNextElement(client->messageQueue, &current))
  1658. {
  1659. qEntry* qe = (qEntry*)(current->content);
  1660. free(qe->topicName);
  1661. free(qe->msg->payload);
  1662. free(qe->msg);
  1663. }
  1664. ListEmpty(client->messageQueue);
  1665. }
  1666. FUNC_EXIT;
  1667. }
  1668. static void MQTTAsync_removeResponsesAndCommands(MQTTAsyncs* m)
  1669. {
  1670. int count = 0;
  1671. ListElement* current = NULL;
  1672. ListElement *next = NULL;
  1673. FUNC_ENTRY;
  1674. if (m->responses)
  1675. {
  1676. ListElement* cur_response = NULL;
  1677. while (ListNextElement(m->responses, &cur_response))
  1678. {
  1679. MQTTAsync_queuedCommand* command = (MQTTAsync_queuedCommand*)(cur_response->content);
  1680. if (command->command.onFailure)
  1681. {
  1682. MQTTAsync_failureData data;
  1683. data.token = command->command.token;
  1684. data.code = MQTTASYNC_OPERATION_INCOMPLETE; /* interrupted return code */
  1685. data.message = NULL;
  1686. Log(TRACE_MIN, -1, "Calling %s failure for client %s",
  1687. MQTTPacket_name(command->command.type), m->c->clientID);
  1688. (*(command->command.onFailure))(command->command.context, &data);
  1689. }
  1690. else if (command->command.onFailure5)
  1691. {
  1692. MQTTAsync_failureData5 data = MQTTAsync_failureData5_initializer;
  1693. data.token = command->command.token;
  1694. data.code = MQTTASYNC_OPERATION_INCOMPLETE; /* interrupted return code */
  1695. data.message = NULL;
  1696. Log(TRACE_MIN, -1, "Calling %s failure for client %s",
  1697. MQTTPacket_name(command->command.type), m->c->clientID);
  1698. (*(command->command.onFailure5))(command->command.context, &data);
  1699. }
  1700. MQTTAsync_freeCommand1(command);
  1701. count++;
  1702. }
  1703. }
  1704. ListEmpty(m->responses);
  1705. Log(TRACE_MINIMUM, -1, "%d responses removed for client %s", count, m->c->clientID);
  1706. /* remove commands in the command queue relating to this client */
  1707. count = 0;
  1708. current = ListNextElement(commands, &next);
  1709. ListNextElement(commands, &next);
  1710. while (current)
  1711. {
  1712. MQTTAsync_queuedCommand* command = (MQTTAsync_queuedCommand*)(current->content);
  1713. if (command->client == m)
  1714. {
  1715. ListDetach(commands, command);
  1716. if (command->command.onFailure)
  1717. {
  1718. MQTTAsync_failureData data;
  1719. data.token = command->command.token;
  1720. data.code = MQTTASYNC_OPERATION_INCOMPLETE; /* interrupted return code */
  1721. data.message = NULL;
  1722. Log(TRACE_MIN, -1, "Calling %s failure for client %s",
  1723. MQTTPacket_name(command->command.type), m->c->clientID);
  1724. (*(command->command.onFailure))(command->command.context, &data);
  1725. }
  1726. else if (command->command.onFailure5)
  1727. {
  1728. MQTTAsync_failureData5 data = MQTTAsync_failureData5_initializer;
  1729. data.token = command->command.token;
  1730. data.code = MQTTASYNC_OPERATION_INCOMPLETE; /* interrupted return code */
  1731. data.message = NULL;
  1732. Log(TRACE_MIN, -1, "Calling %s failure for client %s",
  1733. MQTTPacket_name(command->command.type), m->c->clientID);
  1734. (*(command->command.onFailure5))(command->command.context, &data);
  1735. }
  1736. MQTTAsync_freeCommand(command);
  1737. count++;
  1738. }
  1739. current = next;
  1740. ListNextElement(commands, &next);
  1741. }
  1742. Log(TRACE_MINIMUM, -1, "%d commands removed for client %s", count, m->c->clientID);
  1743. FUNC_EXIT;
  1744. }
  1745. void MQTTAsync_destroy(MQTTAsync* handle)
  1746. {
  1747. MQTTAsyncs* m = *handle;
  1748. FUNC_ENTRY;
  1749. MQTTAsync_lock_mutex(mqttasync_mutex);
  1750. if (m == NULL)
  1751. goto exit;
  1752. MQTTAsync_closeSession(m->c, MQTTREASONCODE_SUCCESS, NULL);
  1753. MQTTAsync_removeResponsesAndCommands(m);
  1754. ListFree(m->responses);
  1755. if (m->c)
  1756. {
  1757. int saved_socket = m->c->net.socket;
  1758. char* saved_clientid = MQTTStrdup(m->c->clientID);
  1759. #if !defined(NO_PERSISTENCE)
  1760. MQTTPersistence_close(m->c);
  1761. #endif
  1762. MQTTAsync_emptyMessageQueue(m->c);
  1763. MQTTProtocol_freeClient(m->c);
  1764. if (!ListRemove(bstate->clients, m->c))
  1765. Log(LOG_ERROR, 0, NULL);
  1766. else
  1767. Log(TRACE_MIN, 1, NULL, saved_clientid, saved_socket);
  1768. free(saved_clientid);
  1769. }
  1770. if (m->serverURI)
  1771. free(m->serverURI);
  1772. if (m->createOptions)
  1773. free(m->createOptions);
  1774. MQTTAsync_freeServerURIs(m);
  1775. if (m->connectProps)
  1776. {
  1777. MQTTProperties_free(m->connectProps);
  1778. free(m->connectProps);
  1779. m->connectProps = NULL;
  1780. }
  1781. if (m->willProps)
  1782. {
  1783. MQTTProperties_free(m->willProps);
  1784. free(m->willProps);
  1785. m->willProps = NULL;
  1786. }
  1787. if (!ListRemove(handles, m))
  1788. Log(LOG_ERROR, -1, "free error");
  1789. *handle = NULL;
  1790. if (bstate->clients->count == 0)
  1791. MQTTAsync_terminate();
  1792. exit:
  1793. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1794. FUNC_EXIT;
  1795. }
  1796. void MQTTAsync_freeMessage(MQTTAsync_message** message)
  1797. {
  1798. FUNC_ENTRY;
  1799. MQTTProperties_free(&(*message)->properties);
  1800. free((*message)->payload);
  1801. free(*message);
  1802. *message = NULL;
  1803. FUNC_EXIT;
  1804. }
  1805. void MQTTAsync_free(void* memory)
  1806. {
  1807. FUNC_ENTRY;
  1808. free(memory);
  1809. FUNC_EXIT;
  1810. }
  1811. static int MQTTAsync_completeConnection(MQTTAsyncs* m, Connack* connack)
  1812. {
  1813. int rc = MQTTASYNC_FAILURE;
  1814. FUNC_ENTRY;
  1815. if (m->c->connect_state == WAIT_FOR_CONNACK) /* MQTT connect sent - wait for CONNACK */
  1816. {
  1817. Log(LOG_PROTOCOL, 1, NULL, m->c->net.socket, m->c->clientID, connack->rc);
  1818. if ((rc = connack->rc) == MQTTASYNC_SUCCESS)
  1819. {
  1820. m->retrying = 0;
  1821. m->c->connected = 1;
  1822. m->c->good = 1;
  1823. m->c->connect_state = NOT_IN_PROGRESS;
  1824. if (m->c->cleansession || m->c->cleanstart)
  1825. rc = MQTTAsync_cleanSession(m->c);
  1826. else if (m->c->MQTTVersion >= MQTTVERSION_3_1_1 && connack->flags.bits.sessionPresent == 0)
  1827. {
  1828. Log(LOG_PROTOCOL, -1, "Cleaning session state on connect because sessionPresent is 0");
  1829. rc = MQTTAsync_cleanSession(m->c);
  1830. }
  1831. if (m->c->outboundMsgs->count > 0)
  1832. {
  1833. ListElement* outcurrent = NULL;
  1834. while (ListNextElement(m->c->outboundMsgs, &outcurrent))
  1835. {
  1836. Messages* messages = (Messages*)(outcurrent->content);
  1837. messages->lastTouch = 0;
  1838. }
  1839. MQTTProtocol_retry((time_t)0, 1, 1);
  1840. if (m->c->connected != 1)
  1842. }
  1843. }
  1844. m->pack = NULL;
  1845. #if !defined(WIN32) && !defined(WIN64)
  1846. Thread_signal_cond(send_cond);
  1847. #else
  1848. Thread_post_sem(send_sem);
  1849. #endif
  1850. }
  1851. FUNC_EXIT_RC(rc);
  1852. return rc;
  1853. }
  1854. /* This is the thread function that handles the calling of callback functions if set */
  1855. static thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
  1856. {
  1857. long timeout = 10L; /* first time in we have a small timeout. Gets things started more quickly */
  1858. FUNC_ENTRY;
  1859. MQTTAsync_lock_mutex(mqttasync_mutex);
  1860. receiveThread_state = RUNNING;
  1861. receiveThread_id = Thread_getid();
  1862. while (!tostop)
  1863. {
  1864. int rc = SOCKET_ERROR;
  1865. int sock = -1;
  1866. MQTTAsyncs* m = NULL;
  1867. MQTTPacket* pack = NULL;
  1868. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1869. pack = MQTTAsync_cycle(&sock, timeout, &rc);
  1870. MQTTAsync_lock_mutex(mqttasync_mutex);
  1871. if (tostop)
  1872. break;
  1873. timeout = 1000L;
  1874. if (sock == 0)
  1875. continue;
  1876. /* find client corresponding to socket */
  1877. if (ListFindItem(handles, &sock, clientSockCompare) == NULL)
  1878. {
  1879. Log(TRACE_MINIMUM, -1, "Could not find client corresponding to socket %d", sock);
  1880. /* Socket_close(sock); - removing socket in this case is not necessary (Bug 442400) */
  1881. continue;
  1882. }
  1883. m = (MQTTAsyncs*)(handles->current->content);
  1884. if (m == NULL)
  1885. {
  1886. Log(LOG_ERROR, -1, "Client structure was NULL for socket %d - removing socket", sock);
  1887. Socket_close(sock);
  1888. continue;
  1889. }
  1890. if (rc == SOCKET_ERROR)
  1891. {
  1892. Log(TRACE_MINIMUM, -1, "Error from MQTTAsync_cycle() - removing socket %d", sock);
  1893. if (m->c->connected == 1)
  1894. MQTTAsync_disconnect_internal(m, 0);
  1895. else if (m->c->connect_state != NOT_IN_PROGRESS)
  1896. nextOrClose(m, rc, "socket error");
  1897. else /* calling disconnect_internal won't have any effect if we're already disconnected */
  1898. MQTTAsync_closeOnly(m->c, MQTTREASONCODE_SUCCESS, NULL);
  1899. }
  1900. else
  1901. {
  1902. if (m->c->messageQueue->count > 0)
  1903. {
  1904. qEntry* qe = (qEntry*)(m->c->messageQueue->first->content);
  1905. int topicLen = qe->topicLen;
  1906. if (strlen(qe->topicName) == topicLen)
  1907. topicLen = 0;
  1908. if (m->ma)
  1909. rc = MQTTAsync_deliverMessage(m, qe->topicName, topicLen, qe->msg);
  1910. else
  1911. rc = 1;
  1912. if (rc)
  1913. {
  1914. #if !defined(NO_PERSISTENCE)
  1915. if (m->c->persistence)
  1916. MQTTPersistence_unpersistQueueEntry(m->c, (MQTTPersistence_qEntry*)qe);
  1917. #endif
  1918. ListRemove(m->c->messageQueue, qe); /* qe is freed here */
  1919. }
  1920. else
  1921. Log(TRACE_MIN, -1, "False returned from messageArrived for client %s, message remains on queue",
  1922. m->c->clientID);
  1923. }
  1924. if (pack)
  1925. {
  1926. if (pack->header.bits.type == CONNACK)
  1927. {
  1928. Connack* connack = (Connack*)pack;
  1929. int sessionPresent = connack->flags.bits.sessionPresent;
  1930. rc = MQTTAsync_completeConnection(m, connack);
  1931. if (rc == MQTTASYNC_SUCCESS)
  1932. {
  1933. int onSuccess = 0;
  1934. if (m->serverURIcount > 0)
  1935. Log(TRACE_MIN, -1, "Connect succeeded to %s",
  1936. m->serverURIs[m->connect.details.conn.currentURI]);
  1937. onSuccess = (m->connect.onSuccess != NULL ||
  1938. m->connect.onSuccess5 != NULL); /* save setting of onSuccess callback */
  1939. if (m->connect.onSuccess)
  1940. {
  1941. MQTTAsync_successData data;
  1942. memset(&data, '\0', sizeof(data));
  1943. Log(TRACE_MIN, -1, "Calling connect success for client %s", m->c->clientID);
  1944. if (m->serverURIcount > 0)
  1945. data.alt.connect.serverURI = m->serverURIs[m->connect.details.conn.currentURI];
  1946. else
  1947. data.alt.connect.serverURI = m->serverURI;
  1948. data.alt.connect.MQTTVersion = m->connect.details.conn.MQTTVersion;
  1949. data.alt.connect.sessionPresent = sessionPresent;
  1950. (*(m->connect.onSuccess))(m->connect.context, &data);
  1951. m->connect.onSuccess = NULL; /* don't accidentally call it again */
  1952. }
  1953. else if (m->connect.onSuccess5)
  1954. {
  1955. MQTTAsync_successData5 data = MQTTAsync_successData5_initializer;
  1956. Log(TRACE_MIN, -1, "Calling connect success for client %s", m->c->clientID);
  1957. if (m->serverURIcount > 0)
  1958. data.alt.connect.serverURI = m->serverURIs[m->connect.details.conn.currentURI];
  1959. else
  1960. data.alt.connect.serverURI = m->serverURI;
  1961. data.alt.connect.MQTTVersion = m->connect.details.conn.MQTTVersion;
  1962. data.alt.connect.sessionPresent = sessionPresent;
  1963. = connack->properties;
  1964. data.reasonCode = connack->rc;
  1965. (*(m->connect.onSuccess5))(m->connect.context, &data);
  1966. m->connect.onSuccess5 = NULL; /* don't accidentally call it again */
  1967. }
  1968. if (m->connected)
  1969. {
  1970. char* reason = (onSuccess) ? "connect onSuccess called" : "automatic reconnect";
  1971. Log(TRACE_MIN, -1, "Calling connected for client %s", m->c->clientID);
  1972. (*(m->connected))(m->connected_context, reason);
  1973. }
  1974. if (m->c->MQTTVersion >= MQTTVERSION_5)
  1975. {
  1976. if (MQTTProperties_hasProperty(&connack->properties, MQTTPROPERTY_CODE_RECEIVE_MAXIMUM))
  1977. {
  1978. int recv_max = MQTTProperties_getNumericValue(&connack->properties, MQTTPROPERTY_CODE_RECEIVE_MAXIMUM);
  1979. if (m->c->maxInflightMessages > recv_max)
  1980. m->c->maxInflightMessages = recv_max;
  1981. }
  1982. }
  1983. MQTTPacket_freeConnack(connack);
  1984. }
  1985. else
  1986. nextOrClose(m, rc, "CONNACK return code");
  1987. }
  1988. else if (pack->header.bits.type == SUBACK)
  1989. {
  1990. ListElement* current = NULL;
  1991. /* use the msgid to find the callback to be called */
  1992. while (ListNextElement(m->responses, &current))
  1993. {
  1994. MQTTAsync_queuedCommand* command = (MQTTAsync_queuedCommand*)(current->content);
  1995. if (command->command.token == ((Suback*)pack)->msgId)
  1996. {
  1997. Suback* sub = (Suback*)pack;
  1998. if (!ListDetach(m->responses, command)) /* remove the response from the list */
  1999. Log(LOG_ERROR, -1, "Subscribe command not removed from command list");
  2000. /* Call the failure callback if there is one subscribe in the MQTT packet and
  2001. * the return code is 0x80 (failure). If the MQTT packet contains >1 subscription
  2002. * request, then we call onSuccess with the list of returned QoSs, which inelegantly,
  2003. * could include some failures, or worse, the whole list could have failed.
  2004. */
  2005. if (m->c->MQTTVersion >= MQTTVERSION_5)
  2006. {
  2007. if (sub->qoss->count == 1 && *(int*)(sub->qoss->first->content) >= MQTTREASONCODE_UNSPECIFIED_ERROR)
  2008. {
  2009. if (command->command.onFailure5)
  2010. {
  2011. MQTTAsync_failureData5 data = MQTTAsync_failureData5_initializer;
  2012. data.token = command->command.token;
  2013. data.reasonCode = *(int*)(sub->qoss->first->content);
  2014. data.message = NULL;
  2015. = sub->properties;
  2016. Log(TRACE_MIN, -1, "Calling subscribe failure for client %s", m->c->clientID);
  2017. (*(command->command.onFailure5))(command->command.context, &data);
  2018. }
  2019. }
  2020. else if (command->command.onSuccess5)
  2021. {
  2022. MQTTAsync_successData5 data;
  2023. enum MQTTReasonCodes* array = NULL;
  2024. data.reasonCode = *(int*)(sub->qoss->first->content);
  2025. data.alt.sub.reasonCodeCount = sub->qoss->count;
  2026. if (sub->qoss->count > 1)
  2027. {
  2028. ListElement* cur_qos = NULL;
  2029. enum MQTTReasonCodes* element = array = data.alt.sub.reasonCodes = malloc(sub->qoss->count * sizeof(enum MQTTReasonCodes));
  2030. while (ListNextElement(sub->qoss, &cur_qos))
  2031. *element++ = *(int*)(cur_qos->content);
  2032. }
  2033. data.token = command->command.token;
  2034. = sub->properties;
  2035. Log(TRACE_MIN, -1, "Calling subscribe success for client %s", m->c->clientID);
  2036. (*(command->command.onSuccess5))(command->command.context, &data);
  2037. if (array)
  2038. free(array);
  2039. }
  2040. }
  2041. else if (sub->qoss->count == 1 && *(int*)(sub->qoss->first->content) == MQTT_BAD_SUBSCRIBE)
  2042. {
  2043. if (command->command.onFailure)
  2044. {
  2045. MQTTAsync_failureData data;
  2046. data.token = command->command.token;
  2047. data.code = *(int*)(sub->qoss->first->content);
  2048. data.message = NULL;
  2049. Log(TRACE_MIN, -1, "Calling subscribe failure for client %s", m->c->clientID);
  2050. (*(command->command.onFailure))(command->command.context, &data);
  2051. }
  2052. }
  2053. else if (command->command.onSuccess)
  2054. {
  2055. MQTTAsync_successData data;
  2056. int* array = NULL;
  2057. if (sub->qoss->count == 1)
  2058. data.alt.qos = *(int*)(sub->qoss->first->content);
  2059. else if (sub->qoss->count > 1)
  2060. {
  2061. ListElement* cur_qos = NULL;
  2062. int* element = array = data.alt.qosList = malloc(sub->qoss->count * sizeof(int));
  2063. while (ListNextElement(sub->qoss, &cur_qos))
  2064. *element++ = *(int*)(cur_qos->content);
  2065. }
  2066. data.token = command->command.token;
  2067. Log(TRACE_MIN, -1, "Calling subscribe success for client %s", m->c->clientID);
  2068. (*(command->command.onSuccess))(command->command.context, &data);
  2069. if (array)
  2070. free(array);
  2071. }
  2072. MQTTAsync_freeCommand(command);
  2073. break;
  2074. }
  2075. }
  2076. rc = MQTTProtocol_handleSubacks(pack, m->c->net.socket);
  2077. }
  2078. else if (pack->header.bits.type == UNSUBACK)
  2079. {
  2080. ListElement* current = NULL;
  2081. Unsuback* unsub = (Unsuback*)pack;
  2082. /* use the msgid to find the callback to be called */
  2083. while (ListNextElement(m->responses, &current))
  2084. {
  2085. MQTTAsync_queuedCommand* command = (MQTTAsync_queuedCommand*)(current->content);
  2086. if (command->command.token == ((Unsuback*)pack)->msgId)
  2087. {
  2088. if (!ListDetach(m->responses, command)) /* remove the response from the list */
  2089. Log(LOG_ERROR, -1, "Unsubscribe command not removed from command list");
  2090. if (command->command.onSuccess || command->command.onSuccess5)
  2091. {
  2092. Log(TRACE_MIN, -1, "Calling unsubscribe success for client %s", m->c->clientID);
  2093. if (command->command.onSuccess)
  2094. {
  2095. MQTTAsync_successData data;
  2096. memset(&data, '\0', sizeof(data));
  2097. data.token = command->command.token;
  2098. (*(command->command.onSuccess))(command->command.context, &data);
  2099. }
  2100. else
  2101. {
  2102. MQTTAsync_successData5 data = MQTTAsync_successData5_initializer;
  2103. enum MQTTReasonCodes* array = NULL;
  2104. data.reasonCode = *(enum MQTTReasonCodes*)(unsub->reasonCodes->first->content);
  2105. data.alt.unsub.reasonCodeCount = unsub->reasonCodes->count;
  2106. if (unsub->reasonCodes->count > 1)
  2107. {
  2108. ListElement* cur_rc = NULL;
  2109. enum MQTTReasonCodes* element = array = data.alt.unsub.reasonCodes = malloc(unsub->reasonCodes->count * sizeof(enum MQTTReasonCodes));
  2110. while (ListNextElement(unsub->reasonCodes, &cur_rc))
  2111. *element++ = *(enum MQTTReasonCodes*)(cur_rc->content);
  2112. }
  2113. data.token = command->command.token;
  2114. = unsub->properties;
  2115. Log(TRACE_MIN, -1, "Calling unsubscribe success for client %s", m->c->clientID);
  2116. (*(command->command.onSuccess5))(command->command.context, &data);
  2117. if (array)
  2118. free(array);
  2119. }
  2120. }
  2121. MQTTAsync_freeCommand(command);
  2122. break;
  2123. }
  2124. }
  2125. rc = MQTTProtocol_handleUnsubacks(pack, m->c->net.socket);
  2126. }
  2127. else if (pack->header.bits.type == DISCONNECT)
  2128. {
  2129. Ack* disc = (Ack*)pack;
  2130. if (m->disconnected)
  2131. {
  2132. Log(TRACE_MIN, -1, "Calling disconnected for client %s", m->c->clientID);
  2133. (*(m->disconnected))(m->disconnected_context, &disc->properties, disc->rc);
  2134. }
  2135. MQTTPacket_freeAck(disc);
  2136. }
  2137. }
  2138. }
  2139. }
  2140. receiveThread_state = STOPPED;
  2141. receiveThread_id = 0;
  2142. MQTTAsync_unlock_mutex(mqttasync_mutex);
  2143. #if !defined(WIN32) && !defined(WIN64)
  2144. if (sendThread_state != STOPPED)
  2145. Thread_signal_cond(send_cond);
  2146. #else
  2147. if (sendThread_state != STOPPED)
  2148. Thread_post_sem(send_sem);
  2149. #endif
  2150. FUNC_EXIT;
  2151. return 0;
  2152. }
  2153. static void MQTTAsync_stop(void)
  2154. {
  2155. int rc = 0;
  2156. FUNC_ENTRY;
  2157. if (sendThread_state != STOPPED || receiveThread_state != STOPPED)
  2158. {
  2159. int conn_count = 0;
  2160. ListElement* current = NULL;
  2161. if (handles != NULL)
  2162. {
  2163. /* find out how many handles are still connected */
  2164. while (ListNextElement(handles, &current))
  2165. {
  2166. if (((MQTTAsyncs*)(current->content))->c->connect_state > NOT_IN_PROGRESS ||
  2167. ((MQTTAsyncs*)(current->content))->c->connected)
  2168. ++conn_count;
  2169. }
  2170. }
  2171. Log(TRACE_MIN, -1, "Conn_count is %d", conn_count);
  2172. /* stop the background thread, if we are the last one to be using it */
  2173. if (conn_count == 0)
  2174. {
  2175. int count = 0;
  2176. tostop = 1;
  2177. while ((sendThread_state != STOPPED || receiveThread_state != STOPPED) && ++count < 100)
  2178. {
  2179. MQTTAsync_unlock_mutex(mqttasync_mutex);
  2180. Log(TRACE_MIN, -1, "sleeping");
  2181. MQTTAsync_sleep(100L);
  2182. MQTTAsync_lock_mutex(mqttasync_mutex);
  2183. }
  2184. rc = 1;
  2185. tostop = 0;
  2186. }
  2187. }
  2188. FUNC_EXIT_RC(rc);
  2189. }
  2190. int MQTTAsync_setCallbacks(MQTTAsync handle, void* context,
  2191. MQTTAsync_connectionLost* cl,
  2192. MQTTAsync_messageArrived* ma,
  2193. MQTTAsync_deliveryComplete* dc)
  2194. {
  2195. int rc = MQTTASYNC_SUCCESS;
  2196. MQTTAsyncs* m = handle;
  2197. FUNC_ENTRY;
  2198. MQTTAsync_lock_mutex(mqttasync_mutex);
  2199. if (m == NULL || ma == NULL || m->c == NULL || m->c->connect_state != NOT_IN_PROGRESS)
  2201. else
  2202. {
  2203. m->clContext = m->maContext = m->dcContext = context;
  2204. m->cl = cl;
  2205. m->ma = ma;
  2206. m->dc = dc;
  2207. }
  2208. MQTTAsync_unlock_mutex(mqttasync_mutex);
  2209. FUNC_EXIT_RC(rc);
  2210. return rc;
  2211. }
  2212. int MQTTAsync_setConnectionLostCallback(MQTTAsync handle, void* context,
  2213. MQTTAsync_connectionLost* cl)
  2214. {
  2215. int rc = MQTTASYNC_SUCCESS;
  2216. MQTTAsyncs* m = handle;
  2217. FUNC_ENTRY;
  2218. MQTTAsync_lock_mutex(mqttasync_mutex);
  2219. if (m == NULL || m->c->connect_state != 0)
  2221. else
  2222. {
  2223. m->clContext = context;
  2224. m->cl = cl;
  2225. }
  2226. MQTTAsync_unlock_mutex(mqttasync_mutex);
  2227. FUNC_EXIT_RC(rc);
  2228. return rc;
  2229. }
  2230. int MQTTAsync_setMessageArrivedCallback(MQTTAsync handle, void* context,
  2231. MQTTAsync_messageArrived* ma)
  2232. {
  2233. int rc = MQTTASYNC_SUCCESS;
  2234. MQTTAsyncs* m = handle;
  2235. FUNC_ENTRY;
  2236. MQTTAsync_lock_mutex(mqttasync_mutex);
  2237. if (m == NULL || ma == NULL || m->c->connect_state != 0)
  2239. else
  2240. {
  2241. m->maContext = context;
  2242. m->ma = ma;
  2243. }
  2244. MQTTAsync_unlock_mutex(mqttasync_mutex);
  2245. FUNC_EXIT_RC(rc);
  2246. return rc;
  2247. }
  2248. int MQTTAsync_setDeliveryCompleteCallback(MQTTAsync handle, void* context,
  2249. MQTTAsync_deliveryComplete* dc)
  2250. {
  2251. int rc = MQTTASYNC_SUCCESS;
  2252. MQTTAsyncs* m = handle;
  2253. FUNC_ENTRY;
  2254. MQTTAsync_lock_mutex(mqttasync_mutex);
  2255. if (m == NULL || m->c->connect_state != 0)
  2257. else
  2258. {
  2259. m->dcContext = context;
  2260. m->dc = dc;
  2261. }
  2262. MQTTAsync_unlock_mutex(mqttasync_mutex);
  2263. FUNC_EXIT_RC(rc);
  2264. return rc;
  2265. }
  2266. int MQTTAsync_setDisconnected(MQTTAsync handle, void* context, MQTTAsync_disconnected* disconnected)
  2267. {
  2268. int rc = MQTTASYNC_SUCCESS;
  2269. MQTTAsyncs* m = handle;
  2270. FUNC_ENTRY;
  2271. MQTTAsync_lock_mutex(mqttasync_mutex);
  2272. if (m == NULL || m->c->connect_state != NOT_IN_PROGRESS)
  2274. else
  2275. {
  2276. m->disconnected_context = context;
  2277. m->disconnected = disconnected;
  2278. }
  2279. MQTTAsync_unlock_mutex(mqttasync_mutex);
  2280. FUNC_EXIT_RC(rc);
  2281. return rc;
  2282. }
  2283. int MQTTAsync_setConnected(MQTTAsync handle, void* context, MQTTAsync_connected* connected)
  2284. {
  2285. int rc = MQTTASYNC_SUCCESS;
  2286. MQTTAsyncs* m = handle;
  2287. FUNC_ENTRY;
  2288. MQTTAsync_lock_mutex(mqttasync_mutex);
  2289. if (m == NULL || m->c->connect_state != NOT_IN_PROGRESS)
  2291. else
  2292. {
  2293. m->connected_context = context;
  2294. m->connected = connected;
  2295. }
  2296. MQTTAsync_unlock_mutex(mqttasync_mutex);
  2297. FUNC_EXIT_RC(rc);
  2298. return rc;
  2299. }
  2300. static void MQTTAsync_closeOnly(Clients* client, enum MQTTReasonCodes reasonCode, MQTTProperties* props)
  2301. {
  2302. FUNC_ENTRY;
  2303. client->good = 0;
  2304. client->ping_outstanding = 0;
  2305. if (client->net.socket > 0)
  2306. {
  2307. MQTTProtocol_checkPendingWrites();
  2308. if (client->connected && Socket_noPendingWrites(client->net.socket))
  2309. MQTTPacket_send_disconnect(client, reasonCode, props);
  2310. Thread_lock_mutex(socket_mutex);
  2311. WebSocket_close(&client->net, WebSocket_CLOSE_NORMAL, NULL);
  2312. #if defined(OPENSSL)
  2313. SSLSocket_close(&client->net);
  2314. #endif
  2315. Socket_close(client->net.socket);
  2316. client->net.socket = 0;
  2317. #if defined(OPENSSL)
  2318. client->net.ssl = NULL;
  2319. #endif
  2320. Thread_unlock_mutex(socket_mutex);
  2321. }
  2322. client->connected = 0;
  2323. client->connect_state = NOT_IN_PROGRESS;
  2324. FUNC_EXIT;
  2325. }
  2326. static void MQTTAsync_closeSession(Clients* client, enum MQTTReasonCodes reasonCode, MQTTProperties* props)
  2327. {
  2328. FUNC_ENTRY;
  2329. MQTTAsync_closeOnly(client, reasonCode, props);
  2330. if (client->cleansession ||
  2331. (client->MQTTVersion >= MQTTVERSION_5 && client->sessionExpiry == 0))
  2332. MQTTAsync_cleanSession(client);
  2333. FUNC_EXIT;
  2334. }
  2335. /**
  2336. * List callback function for comparing clients by client structure
  2337. * @param a Async structure
  2338. * @param b Client structure
  2339. * @return boolean indicating whether a and b are equal
  2340. */
  2341. static int clientStructCompare(void* a, void* b)
  2342. {
  2343. MQTTAsyncs* m = (MQTTAsyncs*)a;
  2344. return m->c == (Clients*)b;
  2345. }
  2346. static int MQTTAsync_cleanSession(Clients* client)
  2347. {
  2348. int rc = 0;
  2349. ListElement* found = NULL;
  2350. FUNC_ENTRY;
  2351. #if !defined(NO_PERSISTENCE)
  2352. rc = MQTTPersistence_clear(client);
  2353. #endif
  2354. MQTTProtocol_emptyMessageList(client->inboundMsgs);
  2355. MQTTProtocol_emptyMessageList(client->outboundMsgs);
  2356. MQTTAsync_emptyMessageQueue(client);
  2357. client->msgID = 0;
  2358. if ((found = ListFindItem(handles, client, clientStructCompare)) != NULL)
  2359. {
  2360. MQTTAsyncs* m = (MQTTAsyncs*)(found->content);
  2361. MQTTAsync_removeResponsesAndCommands(m);
  2362. }
  2363. else
  2364. Log(LOG_ERROR, -1, "cleanSession: did not find client structure in handles list");
  2365. FUNC_EXIT_RC(rc);
  2366. return rc;
  2367. }
  2368. static int MQTTAsync_deliverMessage(MQTTAsyncs* m, char* topicName, size_t topicLen, MQTTAsync_message* mm)
  2369. {
  2370. int rc;
  2371. Log(TRACE_MIN, -1, "Calling messageArrived for client %s, queue depth %d",
  2372. m->c->clientID, m->c->messageQueue->count);
  2373. rc = (*(m->ma))(m->maContext, topicName, (int)topicLen, mm);
  2374. /* if 0 (false) is returned by the callback then it failed, so we don't remove the message from
  2375. * the queue, and it will be retried later. If 1 is returned then the message data may have been freed,
  2376. * so we must be careful how we use it.
  2377. */
  2378. return rc;
  2379. }
  2380. void Protocol_processPublication(Publish* publish, Clients* client)
  2381. {
  2382. MQTTAsync_message* mm = NULL;
  2383. MQTTAsync_message initialized = MQTTAsync_message_initializer;
  2384. int rc = 0;
  2385. FUNC_ENTRY;
  2386. mm = malloc(sizeof(MQTTAsync_message));
  2387. memcpy(mm, &initialized, sizeof(MQTTAsync_message));
  2388. /* If the message is QoS 2, then we have already stored the incoming payload
  2389. * in an allocated buffer, so we don't need to copy again.
  2390. */
  2391. if (publish->header.bits.qos == 2)
  2392. mm->payload = publish->payload;
  2393. else
  2394. {
  2395. mm->payload = malloc(publish->payloadlen);
  2396. memcpy(mm->payload, publish->payload, publish->payloadlen);
  2397. }
  2398. mm->payloadlen = publish->payloadlen;
  2399. mm->qos = publish->header.bits.qos;
  2400. mm->retained = publish->header.bits.retain;
  2401. if (publish->header.bits.qos == 2)
  2402. mm->dup = 0; /* ensure that a QoS2 message is not passed to the application with dup = 1 */
  2403. else
  2404. mm->dup = publish->header.bits.dup;
  2405. mm->msgid = publish->msgId;
  2406. if (publish->MQTTVersion >= MQTTVERSION_5)
  2407. mm->properties = MQTTProperties_copy(&publish->properties);
  2408. if (client->messageQueue->count == 0 && client->connected)
  2409. {
  2410. ListElement* found = NULL;
  2411. if ((found = ListFindItem(handles, client, clientStructCompare)) == NULL)
  2412. Log(LOG_ERROR, -1, "processPublication: did not find client structure in handles list");
  2413. else
  2414. {
  2415. MQTTAsyncs* m = (MQTTAsyncs*)(found->content);
  2416. if (m->ma)
  2417. rc = MQTTAsync_deliverMessage(m, publish->topic, publish->topiclen, mm);
  2418. }
  2419. }
  2420. if (rc == 0) /* if message was not delivered, queue it up */
  2421. {
  2422. qEntry* qe = malloc(sizeof(qEntry));
  2423. qe->msg = mm;
  2424. qe->topicName = publish->topic;
  2425. qe->topicLen = publish->topiclen;
  2426. ListAppend(client->messageQueue, qe, sizeof(qe) + sizeof(mm) + mm->payloadlen + strlen(qe->topicName)+1);
  2427. #if !defined(NO_PERSISTENCE)
  2428. if (client->persistence)
  2429. MQTTPersistence_persistQueueEntry(client, (MQTTPersistence_qEntry*)qe);
  2430. #endif
  2431. }
  2432. publish->topic = NULL;
  2433. FUNC_EXIT;
  2434. }
  2435. static int retryLoopInterval = 5;
  2436. static void setRetryLoopInterval(int keepalive)
  2437. {
  2438. int proposed = keepalive / 10;
  2439. if (proposed < 1)
  2440. proposed = 1;
  2441. else if (proposed > 5)
  2442. proposed = 5;
  2443. if (proposed < retryLoopInterval)
  2444. retryLoopInterval = proposed;
  2445. }
  2446. int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
  2447. {
  2448. MQTTAsyncs* m = handle;
  2449. int rc = MQTTASYNC_SUCCESS;
  2450. MQTTAsync_queuedCommand* conn;
  2451. FUNC_ENTRY;
  2452. if (options == NULL)
  2453. {
  2455. goto exit;
  2456. }
  2457. if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 6)
  2458. {
  2460. goto exit;
  2461. }
  2462. #if defined(OPENSSL)
  2463. if (m->ssl && options->ssl == NULL)
  2464. {
  2466. goto exit;
  2467. }
  2468. #endif
  2469. if (options->will) /* check validity of will options structure */
  2470. {
  2471. if (strncmp(options->will->struct_id, "MQTW", 4) != 0 || (options->will->struct_version != 0 && options->will->struct_version != 1))
  2472. {
  2474. goto exit;
  2475. }
  2476. if (options->will->qos < 0 || options->will->qos > 2)
  2477. {
  2478. rc = MQTTASYNC_BAD_QOS;
  2479. goto exit;
  2480. }
  2481. }
  2482. if (options->struct_version != 0 && options->ssl) /* check validity of SSL options structure */
  2483. {
  2484. if (strncmp(options->ssl->struct_id, "MQTS", 4) != 0 || options->ssl->struct_version < 0 || options->ssl->struct_version > 4)
  2485. {
  2487. goto exit;
  2488. }
  2489. }
  2490. if (options->MQTTVersion >= MQTTVERSION_5 && m->c->MQTTVersion < MQTTVERSION_5)
  2491. {
  2493. goto exit;
  2494. }
  2495. if ((options->username && !UTF8_validateString(options->username)) ||
  2496. (options->password && !UTF8_validateString(options->password)))
  2497. {
  2499. goto exit;
  2500. }
  2501. if (options->MQTTVersion >= MQTTVERSION_5 && options->struct_version < 6)
  2502. {
  2504. goto exit;
  2505. }
  2506. if (options->MQTTVersion >= MQTTVERSION_5 && options->cleansession != 0)
  2507. {
  2509. goto exit;
  2510. }
  2511. if (options->MQTTVersion < MQTTVERSION_5 && options->struct_version >= 6)
  2512. {
  2513. if (options->cleanstart != 0 || options->onFailure5 || options->onSuccess5 ||
  2514. options->connectProperties || options->willProperties)
  2515. {
  2517. goto exit;
  2518. }
  2519. }
  2520. m->connect.onSuccess = options->onSuccess;
  2521. m->connect.onFailure = options->onFailure;
  2522. if (options->struct_version >= 6)
  2523. {
  2524. m->connect.onSuccess5 = options->onSuccess5;
  2525. m->connect.onFailure5 = options->onFailure5;
  2526. }
  2527. m->connect.context = options->context;
  2528. m->connectTimeout = options->connectTimeout;
  2529. tostop = 0;
  2530. if (sendThread_state != STARTING && sendThread_state != RUNNING)
  2531. {
  2532. MQTTAsync_lock_mutex(mqttasync_mutex);
  2533. sendThread_state = STARTING;
  2534. Thread_start(MQTTAsync_sendThread, NULL);
  2535. MQTTAsync_unlock_mutex(mqttasync_mutex);
  2536. }
  2537. if (receiveThread_state != STARTING && receiveThread_state != RUNNING)
  2538. {
  2539. MQTTAsync_lock_mutex(mqttasync_mutex);
  2540. receiveThread_state = STARTING;
  2541. Thread_start(MQTTAsync_receiveThread, handle);
  2542. MQTTAsync_unlock_mutex(mqttasync_mutex);
  2543. }
  2544. m->c->keepAliveInterval = options->keepAliveInterval;
  2545. setRetryLoopInterval(options->keepAliveInterval);
  2546. m->c->cleansession = options->cleansession;
  2547. m->c->maxInflightMessages = options->maxInflight;
  2548. if (options->struct_version >= 3)
  2549. m->c->MQTTVersion = options->MQTTVersion;
  2550. else
  2551. m->c->MQTTVersion = MQTTVERSION_DEFAULT;
  2552. if (options->struct_version >= 4)
  2553. {
  2554. m->automaticReconnect = options->automaticReconnect;
  2555. m->minRetryInterval = options->minRetryInterval;
  2556. m->maxRetryInterval = options->maxRetryInterval;
  2557. }
  2558. if (m->c->will)
  2559. {
  2560. free(m->c->will->payload);
  2561. free(m->c->will->topic);
  2562. free(m->c->will);
  2563. m->c->will = NULL;
  2564. }
  2565. if (options->will && (options->will->struct_version == 0 || options->will->struct_version == 1))
  2566. {
  2567. const void* source = NULL;
  2568. m->c->will = malloc(sizeof(willMessages));
  2569. if (options->will->message || (options->will->struct_version == 1 && options->will->
  2570. {
  2571. if (options->will->struct_version == 1 && options->will->
  2572. {
  2573. m->c->will->payloadlen = options->will->payload.len;
  2574. source = options->will->;
  2575. }
  2576. else
  2577. {
  2578. m->c->will->payloadlen = (int)strlen(options->will->message);
  2579. source = (void*)options->will->message;
  2580. }
  2581. m->c->will->payload = malloc(m->c->will->payloadlen);
  2582. memcpy(m->c->will->payload, source, m->c->will->payloadlen);
  2583. }
  2584. else
  2585. {
  2586. m->c->will->payload = NULL;
  2587. m->c->will->payloadlen = 0;
  2588. }
  2589. m->c->will->qos = options->will->qos;
  2590. m->c->will->retained = options->will->retained;
  2591. m->c->will->topic = MQTTStrdup(options->will->topicName);
  2592. }
  2593. #if defined(OPENSSL)
  2594. if (m->c->sslopts)
  2595. {
  2596. if (m->c->sslopts->trustStore)
  2597. free((void*)m->c->sslopts->trustStore);
  2598. if (m->c->sslopts->keyStore)
  2599. free((void*)m->c->sslopts->keyStore);
  2600. if (m->c->sslopts->privateKey)
  2601. free((void*)m->c->sslopts->privateKey);
  2602. if (m->c->sslopts->privateKeyPassword)
  2603. free((void*)m->c->sslopts->privateKeyPassword);
  2604. if (m->c->sslopts->enabledCipherSuites)
  2605. free((void*)m->c->sslopts->enabledCipherSuites);
  2606. if (m->c->sslopts->struct_version >= 2)
  2607. {
  2608. if (m->c->sslopts->CApath)
  2609. free((void*)m->c->sslopts->CApath);
  2610. }
  2611. free((void*)m->c->sslopts);
  2612. m->c->sslopts = NULL;
  2613. }
  2614. if (options->struct_version != 0 && options->ssl)
  2615. {
  2616. m->c->sslopts = malloc(sizeof(MQTTClient_SSLOptions));
  2617. memset(m->c->sslopts, '\0', sizeof(MQTTClient_SSLOptions));
  2618. m->c->sslopts->struct_version = options->ssl->struct_version;
  2619. if (options->ssl->trustStore)
  2620. m->c->sslopts->trustStore = MQTTStrdup(options->ssl->trustStore);
  2621. if (options->ssl->keyStore)
  2622. m->c->sslopts->keyStore = MQTTStrdup(options->ssl->keyStore);
  2623. if (options->ssl->privateKey)
  2624. m->c->sslopts->privateKey = MQTTStrdup(options->ssl->privateKey);
  2625. if (options->ssl->privateKeyPassword)
  2626. m->c->sslopts->privateKeyPassword = MQTTStrdup(options->ssl->privateKeyPassword);
  2627. if (options->ssl->enabledCipherSuites)
  2628. m->c->sslopts->enabledCipherSuites = MQTTStrdup(options->ssl->enabledCipherSuites);
  2629. m->c->sslopts->enableServerCertAuth = options->ssl->enableServerCertAuth;
  2630. if (m->c->sslopts->struct_version >= 1)
  2631. m->c->sslopts->sslVersion = options->ssl->sslVersion;
  2632. if (m->c->sslopts->struct_version >= 2)
  2633. {
  2634. m->c->sslopts->verify = options->ssl->verify;
  2635. if (options->ssl->CApath)
  2636. m->c->sslopts->CApath = MQTTStrdup(options->ssl->CApath);
  2637. }
  2638. if (m->c->sslopts->struct_version >= 3)
  2639. {
  2640. m->c->sslopts->ssl_error_cb = options->ssl->ssl_error_cb;
  2641. m->c->sslopts->ssl_error_context = options->ssl->ssl_error_context;
  2642. }
  2643. if (m->c->sslopts->struct_version >= 4)
  2644. {
  2645. m->c->sslopts->ssl_psk_cb = options->ssl->ssl_psk_cb;
  2646. m->c->sslopts->ssl_psk_context = options->ssl->ssl_psk_context;
  2647. m->c->sslopts->disableDefaultTrustStore = options->ssl->disableDefaultTrustStore;
  2648. }
  2649. }
  2650. #else
  2651. if (options->struct_version != 0 && options->ssl)
  2652. {
  2654. goto exit;
  2655. }
  2656. #endif
  2657. if (m->c->username)
  2658. free((void*)m->c->username);
  2659. if (options->username)
  2660. m->c->username = MQTTStrdup(options->username);
  2661. if (m->c->password)
  2662. free((void*)m->c->password);
  2663. if (options->password)
  2664. {
  2665. m->c->password = MQTTStrdup(options->password);
  2666. m->c->passwordlen = (int)strlen(options->password);
  2667. }
  2668. else if (options->struct_version >= 5 && options->
  2669. {
  2670. m->c->passwordlen = options->binarypwd.len;
  2671. m->c->password = malloc(m->c->passwordlen);
  2672. memcpy((void*)m->c->password, options->, m->c->passwordlen);
  2673. }
  2674. m->c->retryInterval = options->retryInterval;
  2675. m->shouldBeConnected = 1;
  2676. m->connectTimeout = options->connectTimeout;
  2677. MQTTAsync_freeServerURIs(m);
  2678. if (options->struct_version >= 2 && options->serverURIcount > 0)
  2679. {
  2680. int i;
  2681. m->serverURIcount = options->serverURIcount;
  2682. m->serverURIs = malloc(options->serverURIcount * sizeof(char*));
  2683. for (i = 0; i < options->serverURIcount; ++i)
  2684. m->serverURIs[i] = MQTTStrdup(options->serverURIs[i]);
  2685. }
  2686. if (m->connectProps)
  2687. {
  2688. MQTTProperties_free(m->connectProps);
  2689. free(m->connectProps);
  2690. m->connectProps = NULL;
  2691. }
  2692. if (m->willProps)
  2693. {
  2694. MQTTProperties_free(m->willProps);
  2695. free(m->willProps);
  2696. m->willProps = NULL;
  2697. }
  2698. if (options->struct_version >=6)
  2699. {
  2700. if (options->connectProperties)
  2701. {
  2702. MQTTProperties initialized = MQTTProperties_initializer;
  2703. m->connectProps = malloc(sizeof(MQTTProperties));
  2704. *m->connectProps = initialized;
  2705. *m->connectProps = MQTTProperties_copy(options->connectProperties);
  2706. if (MQTTProperties_hasProperty(options->connectProperties, MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL))
  2707. m->c->sessionExpiry = MQTTProperties_getNumericValue(options->connectProperties,
  2709. }
  2710. if (options->willProperties)
  2711. {
  2712. MQTTProperties initialized = MQTTProperties_initializer;
  2713. m->willProps = malloc(sizeof(MQTTProperties));
  2714. *m->willProps = initialized;
  2715. *m->willProps = MQTTProperties_copy(options->willProperties);
  2716. }
  2717. m->c->cleanstart = options->cleanstart;
  2718. }
  2719. /* Add connect request to operation queue */
  2720. conn = malloc(sizeof(MQTTAsync_queuedCommand));
  2721. memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
  2722. conn->client = m;
  2723. if (options)
  2724. {
  2725. conn->command.onSuccess = options->onSuccess;
  2726. conn->command.onFailure = options->onFailure;
  2727. conn->command.onSuccess5 = options->onSuccess5;
  2728. conn->command.onFailure5 = options->onFailure5;
  2729. conn->command.context = options->context;
  2730. }
  2731. conn->command.type = CONNECT;
  2732. conn->command.details.conn.currentURI = 0;
  2733. rc = MQTTAsync_addCommand(conn, sizeof(conn));
  2734. exit:
  2735. FUNC_EXIT_RC(rc);
  2736. return rc;
  2737. }
  2738. static int MQTTAsync_disconnect1(MQTTAsync handle, const MQTTAsync_disconnectOptions* options, int internal)
  2739. {
  2740. MQTTAsyncs* m = handle;
  2741. int rc = MQTTASYNC_SUCCESS;
  2742. MQTTAsync_queuedCommand* dis;
  2743. FUNC_ENTRY;
  2744. if (m == NULL || m->c == NULL)
  2745. {
  2747. goto exit;
  2748. }
  2749. if (!internal)
  2750. m->shouldBeConnected = 0;
  2751. if (m->c->connected == 0)
  2752. {
  2754. goto exit;
  2755. }
  2756. /* Add disconnect request to operation queue */
  2757. dis = malloc(sizeof(MQTTAsync_queuedCommand));
  2758. memset(dis, '\0', sizeof(MQTTAsync_queuedCommand));
  2759. dis->client = m;
  2760. if (options)
  2761. {
  2762. dis->command.onSuccess = options->onSuccess;
  2763. dis->command.onFailure = options->onFailure;
  2764. dis->command.onSuccess5 = options->onSuccess5;
  2765. dis->command.onFailure5 = options->onFailure5;
  2766. dis->command.context = options->context;
  2767. dis->command.details.dis.timeout = options->timeout;
  2768. if (m->c->MQTTVersion >= MQTTVERSION_5 && options->struct_version >= 1)
  2769. {
  2770. dis-> = MQTTProperties_copy(&options->properties);
  2771. dis->command.details.dis.reasonCode = options->reasonCode;
  2772. }
  2773. }
  2774. dis->command.type = DISCONNECT;
  2775. dis->command.details.dis.internal = internal;
  2776. rc = MQTTAsync_addCommand(dis, sizeof(dis));
  2777. exit:
  2778. FUNC_EXIT_RC(rc);
  2779. return rc;
  2780. }
  2781. static int MQTTAsync_disconnect_internal(MQTTAsync handle, int timeout)
  2782. {
  2783. MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer;
  2784. options.timeout = timeout;
  2785. return MQTTAsync_disconnect1(handle, &options, 1);
  2786. }
  2787. void MQTTProtocol_closeSession(Clients* c, int sendwill)
  2788. {
  2789. MQTTAsync_disconnect_internal((MQTTAsync)c->context, 0);
  2790. }
  2791. int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions* options)
  2792. {
  2793. if (options != NULL && (strncmp(options->struct_id, "MQTD", 4) != 0 || options->struct_version < 0 || options->struct_version > 1))
  2795. else
  2796. return MQTTAsync_disconnect1(handle, options, 0);
  2797. }
  2798. int MQTTAsync_isConnected(MQTTAsync handle)
  2799. {
  2800. MQTTAsyncs* m = handle;
  2801. int rc = 0;
  2802. FUNC_ENTRY;
  2803. MQTTAsync_lock_mutex(mqttasync_mutex);
  2804. if (m && m->c)
  2805. rc = m->c->connected;
  2806. MQTTAsync_unlock_mutex(mqttasync_mutex);
  2807. FUNC_EXIT_RC(rc);
  2808. return rc;
  2809. }
  2810. static int cmdMessageIDCompare(void* a, void* b)
  2811. {
  2812. MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)a;
  2813. return cmd->command.token == *(int*)b;
  2814. }
  2815. /**
  2816. * Assign a new message id for a client. Make sure it isn't already being used and does
  2817. * not exceed the maximum.
  2818. * @param m a client structure
  2819. * @return the next message id to use, or 0 if none available
  2820. */
  2821. static int MQTTAsync_assignMsgId(MQTTAsyncs* m)
  2822. {
  2823. int start_msgid = m->c->msgID;
  2824. int msgid = start_msgid;
  2825. thread_id_type thread_id = 0;
  2826. int locked = 0;
  2827. /* need to check: commands list and response list for a client */
  2828. FUNC_ENTRY;
  2829. /* We might be called in a callback. In which case, this mutex will be already locked. */
  2830. thread_id = Thread_getid();
  2831. if (thread_id != sendThread_id && thread_id != receiveThread_id)
  2832. {
  2833. MQTTAsync_lock_mutex(mqttasync_mutex);
  2834. locked = 1;
  2835. }
  2836. msgid = (msgid == MAX_MSG_ID) ? 1 : msgid + 1;
  2837. while (ListFindItem(commands, &msgid, cmdMessageIDCompare) ||
  2838. ListFindItem(m->responses, &msgid, cmdMessageIDCompare))
  2839. {
  2840. msgid = (msgid == MAX_MSG_ID) ? 1 : msgid + 1;
  2841. if (msgid == start_msgid)
  2842. { /* we've tried them all - none free */
  2843. msgid = 0;
  2844. break;
  2845. }
  2846. }
  2847. if (msgid != 0)
  2848. m->c->msgID = msgid;
  2849. if (locked)
  2850. MQTTAsync_unlock_mutex(mqttasync_mutex);
  2851. FUNC_EXIT_RC(msgid);
  2852. return msgid;
  2853. }
  2854. int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char* const* topic, int* qos, MQTTAsync_responseOptions* response)
  2855. {
  2856. MQTTAsyncs* m = handle;
  2857. int i = 0;
  2858. int rc = MQTTASYNC_SUCCESS;
  2859. MQTTAsync_queuedCommand* sub;
  2860. int msgid = 0;
  2861. FUNC_ENTRY;
  2862. if (m == NULL || m->c == NULL)
  2864. else if (m->c->connected == 0)
  2866. else for (i = 0; i < count; i++)
  2867. {
  2868. if (!UTF8_validateString(topic[i]))
  2869. {
  2871. break;
  2872. }
  2873. if (qos[i] < 0 || qos[i] > 2)
  2874. {
  2875. rc = MQTTASYNC_BAD_QOS;
  2876. break;
  2877. }
  2878. }
  2879. if (rc != MQTTASYNC_SUCCESS)
  2880. ; /* don't overwrite a previous error code */
  2881. else if ((msgid = MQTTAsync_assignMsgId(m)) == 0)
  2883. else if (m->c->MQTTVersion >= MQTTVERSION_5 && count > 1 && (count != response->subscribeOptionsCount
  2884. && response->subscribeOptionsCount != 0))
  2886. else if (response)
  2887. {
  2888. if (m->c->MQTTVersion >= MQTTVERSION_5)
  2889. {
  2890. if (response->struct_version == 0 || response->onFailure || response->onSuccess)
  2892. }
  2893. else if (m->c->MQTTVersion < MQTTVERSION_5)
  2894. {
  2895. if (response->struct_version >= 1 && (response->onFailure5 || response->onSuccess5))
  2897. }
  2898. }
  2899. if (rc != MQTTASYNC_SUCCESS)
  2900. goto exit;
  2901. /* Add subscribe request to operation queue */
  2902. sub = malloc(sizeof(MQTTAsync_queuedCommand));
  2903. memset(sub, '\0', sizeof(MQTTAsync_queuedCommand));
  2904. sub->client = m;
  2905. sub->command.token = msgid;
  2906. if (response)
  2907. {
  2908. sub->command.onSuccess = response->onSuccess;
  2909. sub->command.onFailure = response->onFailure;
  2910. sub->command.onSuccess5 = response->onSuccess5;
  2911. sub->command.onFailure5 = response->onFailure5;
  2912. sub->command.context = response->context;
  2913. response->token = sub->command.token;
  2914. if (m->c->MQTTVersion >= MQTTVERSION_5)
  2915. {
  2916. sub-> = MQTTProperties_copy(&response->properties);
  2917. sub->command.details.sub.opts = response->subscribeOptions;
  2918. if (count > 1)
  2919. {
  2920. sub->command.details.sub.optlist = malloc(sizeof(MQTTSubscribe_options) * count);
  2921. if (response->subscribeOptionsCount == 0)
  2922. {
  2923. MQTTSubscribe_options initialized = MQTTSubscribe_options_initializer;
  2924. for (i = 0; i < count; ++i)
  2925. sub->command.details.sub.optlist[i] = initialized;
  2926. }
  2927. else
  2928. {
  2929. for (i = 0; i < count; ++i)
  2930. sub->command.details.sub.optlist[i] = response->subscribeOptionsList[i];
  2931. }
  2932. }
  2933. }
  2934. }
  2935. sub->command.type = SUBSCRIBE;
  2936. sub->command.details.sub.count = count;
  2937. sub->command.details.sub.topics = malloc(sizeof(char*) * count);
  2938. sub->command.details.sub.qoss = malloc(sizeof(int) * count);
  2939. for (i = 0; i < count; ++i)
  2940. {
  2941. sub->command.details.sub.topics[i] = MQTTStrdup(topic[i]);
  2942. sub->command.details.sub.qoss[i] = qos[i];
  2943. }
  2944. rc = MQTTAsync_addCommand(sub, sizeof(sub));
  2945. exit:
  2946. FUNC_EXIT_RC(rc);
  2947. return rc;
  2948. }
  2949. int MQTTAsync_subscribe(MQTTAsync handle, const char* topic, int qos, MQTTAsync_responseOptions* response)
  2950. {
  2951. int rc = 0;
  2952. FUNC_ENTRY;
  2953. rc = MQTTAsync_subscribeMany(handle, 1, (char * const *)(&topic), &qos, response);
  2954. FUNC_EXIT_RC(rc);
  2955. return rc;
  2956. }
  2957. int MQTTAsync_unsubscribeMany(MQTTAsync handle, int count, char* const* topic, MQTTAsync_responseOptions* response)
  2958. {
  2959. MQTTAsyncs* m = handle;
  2960. int i = 0;
  2961. int rc = MQTTASYNC_SUCCESS;
  2962. MQTTAsync_queuedCommand* unsub;
  2963. int msgid = 0;
  2964. FUNC_ENTRY;
  2965. if (m == NULL || m->c == NULL)
  2967. else if (m->c->connected == 0)
  2969. else for (i = 0; i < count; i++)
  2970. {
  2971. if (!UTF8_validateString(topic[i]))
  2972. {
  2974. break;
  2975. }
  2976. }
  2977. if (rc != MQTTASYNC_SUCCESS)
  2978. ; /* don't overwrite a previous error code */
  2979. else if ((msgid = MQTTAsync_assignMsgId(m)) == 0)
  2981. else if (response)
  2982. {
  2983. if (m->c->MQTTVersion >= MQTTVERSION_5)
  2984. {
  2985. if (response->struct_version == 0 || response->onFailure || response->onSuccess)
  2987. }
  2988. else if (m->c->MQTTVersion < MQTTVERSION_5)
  2989. {
  2990. if (response->struct_version >= 1 && (response->onFailure5 || response->onSuccess5))
  2992. }
  2993. }
  2994. if (rc != MQTTASYNC_SUCCESS)
  2995. goto exit;
  2996. /* Add unsubscribe request to operation queue */
  2997. unsub = malloc(sizeof(MQTTAsync_queuedCommand));
  2998. memset(unsub, '\0', sizeof(MQTTAsync_queuedCommand));
  2999. unsub->client = m;
  3000. unsub->command.type = UNSUBSCRIBE;
  3001. unsub->command.token = msgid;
  3002. if (response)
  3003. {
  3004. unsub->command.onSuccess = response->onSuccess;
  3005. unsub->command.onFailure = response->onFailure;
  3006. unsub->command.onSuccess5 = response->onSuccess5;
  3007. unsub->command.onFailure5 = response->onFailure5;
  3008. unsub->command.context = response->context;
  3009. response->token = unsub->command.token;
  3010. if (m->c->MQTTVersion >= MQTTVERSION_5)
  3011. unsub-> = MQTTProperties_copy(&response->properties);
  3012. }
  3013. unsub->command.details.unsub.count = count;
  3014. unsub->command.details.unsub.topics = malloc(sizeof(char*) * count);
  3015. for (i = 0; i < count; ++i)
  3016. unsub->command.details.unsub.topics[i] = MQTTStrdup(topic[i]);
  3017. rc = MQTTAsync_addCommand(unsub, sizeof(unsub));
  3018. exit:
  3019. FUNC_EXIT_RC(rc);
  3020. return rc;
  3021. }
  3022. int MQTTAsync_unsubscribe(MQTTAsync handle, const char* topic, MQTTAsync_responseOptions* response)
  3023. {
  3024. int rc = 0;
  3025. FUNC_ENTRY;
  3026. rc = MQTTAsync_unsubscribeMany(handle, 1, (char * const *)(&topic), response);
  3027. FUNC_EXIT_RC(rc);
  3028. return rc;
  3029. }
  3030. static int MQTTAsync_countBufferedMessages(MQTTAsyncs* m)
  3031. {
  3032. ListElement* current = NULL;
  3033. int count = 0;
  3034. while (ListNextElement(commands, &current))
  3035. {
  3036. MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)(current->content);
  3037. if (cmd->client == m && cmd->command.type == PUBLISH)
  3038. count++;
  3039. }
  3040. return count;
  3041. }
  3042. int MQTTAsync_send(MQTTAsync handle, const char* destinationName, int payloadlen, const void* payload,
  3043. int qos, int retained, MQTTAsync_responseOptions* response)
  3044. {
  3045. int rc = MQTTASYNC_SUCCESS;
  3046. MQTTAsyncs* m = handle;
  3047. MQTTAsync_queuedCommand* pub;
  3048. int msgid = 0;
  3049. FUNC_ENTRY;
  3050. if (m == NULL || m->c == NULL)
  3052. else if (m->c->connected == 0 && (m->createOptions == NULL ||
  3053. m->createOptions->sendWhileDisconnected == 0 || m->shouldBeConnected == 0))
  3055. else if (!UTF8_validateString(destinationName))
  3057. else if (qos < 0 || qos > 2)
  3058. rc = MQTTASYNC_BAD_QOS;
  3059. else if (qos > 0 && (msgid = MQTTAsync_assignMsgId(m)) == 0)
  3061. else if (m->createOptions && (MQTTAsync_countBufferedMessages(m) >= m->createOptions->maxBufferedMessages))
  3063. else if (response)
  3064. {
  3065. if (m->c->MQTTVersion >= MQTTVERSION_5)
  3066. {
  3067. if (response->struct_version == 0 || response->onFailure || response->onSuccess)
  3069. }
  3070. else if (m->c->MQTTVersion < MQTTVERSION_5)
  3071. {
  3072. if (response->struct_version >= 1 && (response->onFailure5 || response->onSuccess5))
  3074. }
  3075. }
  3076. if (rc != MQTTASYNC_SUCCESS)
  3077. goto exit;
  3078. /* Add publish request to operation queue */
  3079. pub = malloc(sizeof(MQTTAsync_queuedCommand));
  3080. memset(pub, '\0', sizeof(MQTTAsync_queuedCommand));
  3081. pub->client = m;
  3082. pub->command.type = PUBLISH;
  3083. pub->command.token = msgid;
  3084. if (response)
  3085. {
  3086. pub->command.onSuccess = response->onSuccess;
  3087. pub->command.onFailure = response->onFailure;
  3088. pub->command.onSuccess5 = response->onSuccess5;
  3089. pub->command.onFailure5 = response->onFailure5;
  3090. pub->command.context = response->context;
  3091. response->token = pub->command.token;
  3092. if (m->c->MQTTVersion >= MQTTVERSION_5)
  3093. pub-> = MQTTProperties_copy(&response->properties);
  3094. }
  3095. pub-> = MQTTStrdup(destinationName);
  3096. pub-> = payloadlen;
  3097. pub-> = malloc(payloadlen);
  3098. memcpy(pub->, payload, payloadlen);
  3099. pub-> = qos;
  3100. pub-> = retained;
  3101. rc = MQTTAsync_addCommand(pub, sizeof(pub));
  3102. exit:
  3103. FUNC_EXIT_RC(rc);
  3104. return rc;
  3105. }
  3106. int MQTTAsync_sendMessage(MQTTAsync handle, const char* destinationName, const MQTTAsync_message* message,
  3107. MQTTAsync_responseOptions* response)
  3108. {
  3109. int rc = MQTTASYNC_SUCCESS;
  3110. MQTTAsyncs* m = handle;
  3111. FUNC_ENTRY;
  3112. if (message == NULL)
  3113. {
  3115. goto exit;
  3116. }
  3117. if (strncmp(message->struct_id, "MQTM", 4) != 0 ||
  3118. (message->struct_version != 0 && message->struct_version != 1))
  3119. {
  3121. goto exit;
  3122. }
  3123. if (m->c->MQTTVersion >= MQTTVERSION_5 && response)
  3124. response->properties = message->properties;
  3125. rc = MQTTAsync_send(handle, destinationName, message->payloadlen, message->payload,
  3126. message->qos, message->retained, response);
  3127. exit:
  3128. FUNC_EXIT_RC(rc);
  3129. return rc;
  3130. }
  3131. static void MQTTAsync_retry(void)
  3132. {
  3133. static time_t last = 0L;
  3134. time_t now;
  3135. FUNC_ENTRY;
  3136. time(&(now));
  3137. if (difftime(now, last) > retryLoopInterval)
  3138. {
  3139. time(&(last));
  3140. MQTTProtocol_keepalive(now);
  3141. MQTTProtocol_retry(now, 1, 0);
  3142. }
  3143. else
  3144. MQTTProtocol_retry(now, 0, 0);
  3145. FUNC_EXIT;
  3146. }
  3147. static int MQTTAsync_connecting(MQTTAsyncs* m)
  3148. {
  3149. int rc = -1;
  3150. FUNC_ENTRY;
  3151. if (m->c->connect_state == TCP_IN_PROGRESS) /* TCP connect started - check for completion */
  3152. {
  3153. int error;
  3154. socklen_t len = sizeof(error);
  3155. if ((rc = getsockopt(m->c->net.socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len)) == 0)
  3156. rc = error;
  3157. if (rc != 0)
  3158. goto exit;
  3159. Socket_clearPendingWrite(m->c->net.socket);
  3160. #if defined(OPENSSL)
  3161. if (m->ssl)
  3162. {
  3163. int port;
  3164. size_t hostname_len;
  3165. int setSocketForSSLrc = 0;
  3166. hostname_len = MQTTProtocol_addressPort(m->serverURI, &port, NULL);
  3167. setSocketForSSLrc = SSLSocket_setSocketForSSL(&m->c->net, m->c->sslopts,
  3168. m->serverURI, hostname_len);
  3169. if (setSocketForSSLrc != MQTTASYNC_SUCCESS)
  3170. {
  3171. if (m->c->session != NULL)
  3172. if ((rc = SSL_set_session(m->c->net.ssl, m->c->session)) != 1)
  3173. Log(TRACE_MIN, -1, "Failed to set SSL session with stored data, non critical");
  3174. rc = m->c->sslopts->struct_version >= 3 ?
  3175. SSLSocket_connect(m->c->net.ssl, m->c->net.socket, m->serverURI,
  3176. m->c->sslopts->verify, m->c->sslopts->ssl_error_cb, m->c->sslopts->ssl_error_context) :
  3177. SSLSocket_connect(m->c->net.ssl, m->c->net.socket, m->serverURI,
  3178. m->c->sslopts->verify, NULL, NULL);
  3179. if (rc == TCPSOCKET_INTERRUPTED)
  3180. {
  3181. rc = MQTTCLIENT_SUCCESS; /* the connect is still in progress */
  3182. m->c->connect_state = SSL_IN_PROGRESS;
  3183. }
  3184. else if (rc == SSL_FATAL)
  3185. {
  3186. rc = SOCKET_ERROR;
  3187. goto exit;
  3188. }
  3189. else if (rc == 1)
  3190. {
  3191. if ( m->websocket )
  3192. {
  3193. m->c->connect_state = WEBSOCKET_IN_PROGRESS;
  3194. if ((rc = WebSocket_connect(&m->c->net, m->serverURI)) == SOCKET_ERROR )
  3195. goto exit;
  3196. }
  3197. else
  3198. {
  3200. m->c->connect_state = WAIT_FOR_CONNACK;
  3201. if (MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion,
  3202. m->connectProps, m->willProps) == SOCKET_ERROR)
  3203. {
  3204. rc = SOCKET_ERROR;
  3205. goto exit;
  3206. }
  3207. }
  3208. if (!m->c->cleansession && m->c->session == NULL)
  3209. m->c->session = SSL_get1_session(m->c->net.ssl);
  3210. }
  3211. }
  3212. else
  3213. {
  3214. rc = SOCKET_ERROR;
  3215. goto exit;
  3216. }
  3217. }
  3218. else
  3219. {
  3220. #endif
  3221. if ( m->websocket )
  3222. {
  3223. m->c->connect_state = WEBSOCKET_IN_PROGRESS;
  3224. if ((rc = WebSocket_connect(&m->c->net, m->serverURI)) == SOCKET_ERROR )
  3225. goto exit;
  3226. }
  3227. else
  3228. {
  3229. m->c->connect_state = WAIT_FOR_CONNACK; /* TCP/SSL connect completed, in which case send the MQTT connect packet */
  3230. if ((rc = MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion,
  3231. m->connectProps, m->willProps)) == SOCKET_ERROR)
  3232. goto exit;
  3233. }
  3234. #if defined(OPENSSL)
  3235. }
  3236. #endif
  3237. }
  3238. #if defined(OPENSSL)
  3239. else if (m->c->connect_state == SSL_IN_PROGRESS) /* SSL connect sent - wait for completion */
  3240. {
  3241. rc = m->c->sslopts->struct_version >= 3 ?
  3242. SSLSocket_connect(m->c->net.ssl, m->c->net.socket, m->serverURI,
  3243. m->c->sslopts->verify, m->c->sslopts->ssl_error_cb, m->c->sslopts->ssl_error_context) :
  3244. SSLSocket_connect(m->c->net.ssl, m->c->net.socket, m->serverURI,
  3245. m->c->sslopts->verify, NULL, NULL);
  3246. if (rc != 1)
  3247. goto exit;
  3248. if(!m->c->cleansession && m->c->session == NULL)
  3249. m->c->session = SSL_get1_session(m->c->net.ssl);
  3250. if ( m->websocket )
  3251. {
  3252. m->c->connect_state = WEBSOCKET_IN_PROGRESS;
  3253. if ((rc = WebSocket_connect(&m->c->net, m->serverURI)) == SOCKET_ERROR )
  3254. goto exit;
  3255. }
  3256. else
  3257. {
  3258. m->c->connect_state = WAIT_FOR_CONNACK; /* SSL connect completed, in which case send the MQTT connect packet */
  3259. if ((rc = MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion,
  3260. m->connectProps, m->willProps)) == SOCKET_ERROR)
  3261. goto exit;
  3262. }
  3263. }
  3264. #endif
  3265. else if (m->c->connect_state == WEBSOCKET_IN_PROGRESS) /* Websocket connect sent - wait for completion */
  3266. {
  3267. if ((rc = WebSocket_upgrade( &m->c->net ) ) == SOCKET_ERROR )
  3268. goto exit;
  3269. else
  3270. {
  3271. m->c->connect_state = WAIT_FOR_CONNACK; /* Websocket upgrade completed, in which case send the MQTT connect packet */
  3272. if ((rc = MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion, m->connectProps, m->willProps)) == SOCKET_ERROR)
  3273. goto exit;
  3274. }
  3275. }
  3276. exit:
  3277. if ((rc != 0 && rc != TCPSOCKET_INTERRUPTED && (m->c->connect_state != SSL_IN_PROGRESS && m->c->connect_state != WEBSOCKET_IN_PROGRESS)) || (rc == SSL_FATAL))
  3278. nextOrClose(m, MQTTASYNC_FAILURE, "TCP/TLS connect failure");
  3279. FUNC_EXIT_RC(rc);
  3280. return rc;
  3281. }
  3282. static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
  3283. {
  3284. struct timeval tp = {0L, 0L};
  3285. MQTTPacket* pack = NULL;
  3286. FUNC_ENTRY;
  3287. if (timeout > 0L)
  3288. {
  3289. tp.tv_sec = timeout / 1000;
  3290. tp.tv_usec = (timeout % 1000) * 1000; /* this field is microseconds! */
  3291. }
  3292. #if defined(OPENSSL)
  3293. if ((*sock = SSLSocket_getPendingRead()) == -1)
  3294. {
  3295. #endif
  3296. /* 0 from getReadySocket indicates no work to do, -1 == error, but can happen normally */
  3297. *sock = Socket_getReadySocket(0, &tp,socket_mutex);
  3298. if (!tostop && *sock == 0 && (tp.tv_sec > 0L || tp.tv_usec > 0L))
  3299. MQTTAsync_sleep(100L);
  3300. #if defined(OPENSSL)
  3301. }
  3302. #endif
  3303. MQTTAsync_lock_mutex(mqttasync_mutex);
  3304. if (*sock > 0)
  3305. {
  3306. MQTTAsyncs* m = NULL;
  3307. if (ListFindItem(handles, sock, clientSockCompare) != NULL)
  3308. m = (MQTTAsync)(handles->current->content);
  3309. if (m != NULL)
  3310. {
  3311. Log(TRACE_MINIMUM, -1, "m->c->connect_state = %d", m->c->connect_state);
  3312. if (m->c->connect_state == TCP_IN_PROGRESS || m->c->connect_state == SSL_IN_PROGRESS || m->c->connect_state == WEBSOCKET_IN_PROGRESS)
  3313. *rc = MQTTAsync_connecting(m);
  3314. else
  3315. pack = MQTTPacket_Factory(m->c->MQTTVersion, &m->c->net, rc);
  3316. if (m->c->connect_state == WAIT_FOR_CONNACK && *rc == SOCKET_ERROR)
  3317. {
  3318. Log(TRACE_MINIMUM, -1, "CONNECT sent but MQTTPacket_Factory has returned SOCKET_ERROR");
  3319. nextOrClose(m, MQTTASYNC_FAILURE, "TCP connect completion failure");
  3320. }
  3321. }
  3322. if (pack)
  3323. {
  3324. int freed = 1;
  3325. /* Note that these handle... functions free the packet structure that they are dealing with */
  3326. if (pack->header.bits.type == PUBLISH)
  3327. *rc = MQTTProtocol_handlePublishes(pack, *sock);
  3328. else if (pack->header.bits.type == PUBACK || pack->header.bits.type == PUBCOMP ||
  3329. pack->header.bits.type == PUBREC)
  3330. {
  3331. int msgid = 0,
  3332. msgtype = 0,
  3333. ackrc = 0,
  3334. mqttversion = 0;
  3335. MQTTProperties msgprops = MQTTProperties_initializer;
  3336. /* This block is so that the ack variable is local and isn't accidentally reused */
  3337. {
  3338. static Ack ack;
  3339. ack = *(Ack*)pack;
  3340. /* these values are stored because the packet structure is freed in the handle functions */
  3341. msgid = ack.msgId;
  3342. msgtype = pack->header.bits.type;
  3343. if (ack.MQTTVersion >= MQTTVERSION_5)
  3344. {
  3345. ackrc = ack.rc;
  3346. msgprops = MQTTProperties_copy(&;
  3347. mqttversion = ack.MQTTVersion;
  3348. }
  3349. }
  3350. if (pack->header.bits.type == PUBCOMP)
  3351. *rc = MQTTProtocol_handlePubcomps(pack, *sock);
  3352. else if (pack->header.bits.type == PUBREC)
  3353. *rc = MQTTProtocol_handlePubrecs(pack, *sock);
  3354. else if (pack->header.bits.type == PUBACK)
  3355. *rc = MQTTProtocol_handlePubacks(pack, *sock);
  3356. if (!m)
  3357. Log(LOG_ERROR, -1, "PUBCOMP, PUBACK or PUBREC received for no client, msgid %d", msgid);
  3358. if (m && (msgtype != PUBREC || ackrc >= MQTTREASONCODE_UNSPECIFIED_ERROR))
  3359. {
  3360. ListElement* current = NULL;
  3361. if (m->dc)
  3362. {
  3363. Log(TRACE_MIN, -1, "Calling deliveryComplete for client %s, msgid %d", m->c->clientID, msgid);
  3364. (*(m->dc))(m->dcContext, msgid);
  3365. }
  3366. /* use the msgid to find the callback to be called */
  3367. while (ListNextElement(m->responses, &current))
  3368. {
  3369. MQTTAsync_queuedCommand* command = (MQTTAsync_queuedCommand*)(current->content);
  3370. if (command->command.token == msgid)
  3371. {
  3372. if (!ListDetach(m->responses, command)) /* then remove the response from the list */
  3373. Log(LOG_ERROR, -1, "Publish command not removed from command list");
  3374. if (command->command.onSuccess)
  3375. {
  3376. MQTTAsync_successData data;
  3377. data.token = command->command.token;
  3378. = command->;
  3379. = command->;
  3380. = command->;
  3381. = command->;
  3382. = command->;
  3383. Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
  3384. (*(command->command.onSuccess))(command->command.context, &data);
  3385. }
  3386. else if (command->command.onSuccess5 && ackrc < MQTTREASONCODE_UNSPECIFIED_ERROR)
  3387. {
  3388. MQTTAsync_successData5 data = MQTTAsync_successData5_initializer;
  3389. data.token = command->command.token;
  3390. = command->;
  3391. = command->;
  3392. = command->;
  3393. = command->;
  3394. = command->;
  3395. = command->;
  3396. Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
  3397. (*(command->command.onSuccess5))(command->command.context, &data);
  3398. }
  3399. else if (command->command.onFailure5 && ackrc >= MQTTREASONCODE_UNSPECIFIED_ERROR)
  3400. {
  3401. MQTTAsync_failureData5 data = MQTTAsync_failureData5_initializer;
  3402. data.token = command->command.token;
  3403. data.reasonCode = ackrc;
  3404. = msgprops;
  3405. data.packet_type = pack->header.bits.type;
  3406. Log(TRACE_MIN, -1, "Calling publish failure for client %s", m->c->clientID);
  3407. (*(command->command.onFailure5))(command->command.context, &data);
  3408. }
  3409. MQTTAsync_freeCommand(command);
  3410. break;
  3411. }
  3412. }
  3413. if (mqttversion >= MQTTVERSION_5)
  3414. MQTTProperties_free(&msgprops);
  3415. }
  3416. }
  3417. else if (pack->header.bits.type == PUBREL)
  3418. *rc = MQTTProtocol_handlePubrels(pack, *sock);
  3419. else if (pack->header.bits.type == PINGRESP)
  3420. *rc = MQTTProtocol_handlePingresps(pack, *sock);
  3421. else
  3422. freed = 0;
  3423. if (freed)
  3424. pack = NULL;
  3425. }
  3426. }
  3427. MQTTAsync_retry();
  3428. MQTTAsync_unlock_mutex(mqttasync_mutex);
  3429. FUNC_EXIT_RC(*rc);
  3430. return pack;
  3431. }
  3432. /*
  3433. static int pubCompare(void* a, void* b)
  3434. {
  3435. Messages* msg = (Messages*)a;
  3436. return msg->publish == (Publications*)b;
  3437. }*/
  3438. int MQTTAsync_getPendingTokens(MQTTAsync handle, MQTTAsync_token **tokens)
  3439. {
  3440. int rc = MQTTASYNC_SUCCESS;
  3441. MQTTAsyncs* m = handle;
  3442. ListElement* current = NULL;
  3443. int count = 0;
  3444. FUNC_ENTRY;
  3445. MQTTAsync_lock_mutex(mqttasync_mutex);
  3446. *tokens = NULL;
  3447. if (m == NULL)
  3448. {
  3450. goto exit;
  3451. }
  3452. /* calculate the number of pending tokens - commands plus inflight */
  3453. while (ListNextElement(commands, &current))
  3454. {
  3455. MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)(current->content);
  3456. if (cmd->client == m)
  3457. count++;
  3458. }
  3459. if (m->c)
  3460. count += m->c->outboundMsgs->count;
  3461. if (count == 0)
  3462. goto exit; /* no tokens to return */
  3463. *tokens = malloc(sizeof(MQTTAsync_token) * (count + 1)); /* add space for sentinel at end of list */
  3464. /* First add the unprocessed commands to the pending tokens */
  3465. current = NULL;
  3466. count = 0;
  3467. while (ListNextElement(commands, &current))
  3468. {
  3469. MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)(current->content);
  3470. if (cmd->client == m)
  3471. (*tokens)[count++] = cmd->command.token;
  3472. }
  3473. /* Now add the inflight messages */
  3474. if (m->c && m->c->outboundMsgs->count > 0)
  3475. {
  3476. current = NULL;
  3477. while (ListNextElement(m->c->outboundMsgs, &current))
  3478. {
  3479. Messages* m = (Messages*)(current->content);
  3480. (*tokens)[count++] = m->msgid;
  3481. }
  3482. }
  3483. (*tokens)[count] = -1; /* indicate end of list */
  3484. exit:
  3485. MQTTAsync_unlock_mutex(mqttasync_mutex);
  3486. FUNC_EXIT_RC(rc);
  3487. return rc;
  3488. }
  3489. int MQTTAsync_isComplete(MQTTAsync handle, MQTTAsync_token dt)
  3490. {
  3491. int rc = MQTTASYNC_SUCCESS;
  3492. MQTTAsyncs* m = handle;
  3493. ListElement* current = NULL;
  3494. FUNC_ENTRY;
  3495. MQTTAsync_lock_mutex(mqttasync_mutex);
  3496. if (m == NULL)
  3497. {
  3499. goto exit;
  3500. }
  3501. /* First check unprocessed commands */
  3502. current = NULL;
  3503. while (ListNextElement(commands, &current))
  3504. {
  3505. MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)(current->content);
  3506. if (cmd->client == m && cmd->command.token == dt)
  3507. goto exit;
  3508. }
  3509. /* Now check the inflight messages */
  3510. if (m->c && m->c->outboundMsgs->count > 0)
  3511. {
  3512. current = NULL;
  3513. while (ListNextElement(m->c->outboundMsgs, &current))
  3514. {
  3515. Messages* m = (Messages*)(current->content);
  3516. if (m->msgid == dt)
  3517. goto exit;
  3518. }
  3519. }
  3520. rc = MQTTASYNC_TRUE; /* Can't find it, so it must be complete */
  3521. exit:
  3522. MQTTAsync_unlock_mutex(mqttasync_mutex);
  3523. FUNC_EXIT_RC(rc);
  3524. return rc;
  3525. }
  3526. int MQTTAsync_waitForCompletion(MQTTAsync handle, MQTTAsync_token dt, unsigned long timeout)
  3527. {
  3528. int rc = MQTTASYNC_FAILURE;
  3529. START_TIME_TYPE start = MQTTAsync_start_clock();
  3530. unsigned long elapsed = 0L;
  3531. MQTTAsyncs* m = handle;
  3532. FUNC_ENTRY;
  3533. MQTTAsync_lock_mutex(mqttasync_mutex);
  3534. if (m == NULL || m->c == NULL)
  3535. {
  3536. MQTTAsync_unlock_mutex(mqttasync_mutex);
  3538. goto exit;
  3539. }
  3540. if (m->c->connected == 0)
  3541. {
  3542. MQTTAsync_unlock_mutex(mqttasync_mutex);
  3544. goto exit;
  3545. }
  3546. MQTTAsync_unlock_mutex(mqttasync_mutex);
  3547. if (MQTTAsync_isComplete(handle, dt) == 1)
  3548. {
  3549. rc = MQTTASYNC_SUCCESS; /* well we couldn't find it */
  3550. goto exit;
  3551. }
  3552. elapsed = MQTTAsync_elapsed(start);
  3553. while (elapsed < timeout)
  3554. {
  3555. MQTTAsync_sleep(100);
  3556. if (MQTTAsync_isComplete(handle, dt) == 1)
  3557. {
  3558. rc = MQTTASYNC_SUCCESS; /* well we couldn't find it */
  3559. goto exit;
  3560. }
  3561. elapsed = MQTTAsync_elapsed(start);
  3562. }
  3563. exit:
  3564. FUNC_EXIT_RC(rc);
  3565. return rc;
  3566. }
  3567. void MQTTAsync_setTraceLevel(enum MQTTASYNC_TRACE_LEVELS level)
  3568. {
  3569. Log_setTraceLevel((enum LOG_LEVELS)level);
  3570. }
  3571. void MQTTAsync_setTraceCallback(MQTTAsync_traceCallback* callback)
  3572. {
  3573. Log_setTraceCallback((Log_traceCallback*)callback);
  3574. }
  3575. MQTTAsync_nameValue* MQTTAsync_getVersionInfo(void)
  3576. {
  3577. #define MAX_INFO_STRINGS 8
  3578. static MQTTAsync_nameValue libinfo[MAX_INFO_STRINGS + 1];
  3579. int i = 0;
  3580. libinfo[i].name = "Product name";
  3581. libinfo[i++].value = "Eclipse Paho Asynchronous MQTT C Client Library";
  3582. libinfo[i].name = "Version";
  3583. libinfo[i++].value = CLIENT_VERSION;
  3584. libinfo[i].name = "Build level";
  3585. libinfo[i++].value = BUILD_TIMESTAMP;
  3586. #if defined(OPENSSL)
  3587. libinfo[i].name = "OpenSSL version";
  3588. libinfo[i++].value = SSLeay_version(SSLEAY_VERSION);
  3589. libinfo[i].name = "OpenSSL flags";
  3590. libinfo[i++].value = SSLeay_version(SSLEAY_CFLAGS);
  3591. libinfo[i].name = "OpenSSL build timestamp";
  3592. libinfo[i++].value = SSLeay_version(SSLEAY_BUILT_ON);
  3593. libinfo[i].name = "OpenSSL platform";
  3594. libinfo[i++].value = SSLeay_version(SSLEAY_PLATFORM);
  3595. libinfo[i].name = "OpenSSL directory";
  3596. libinfo[i++].value = SSLeay_version(SSLEAY_DIR);
  3597. #endif
  3598. libinfo[i].name = NULL;
  3599. libinfo[i].value = NULL;
  3600. return libinfo;
  3601. }
  3602. const char* MQTTAsync_strerror(int code)
  3603. {
  3604. static char buf[30];
  3605. switch (code) {
  3607. return "Success";
  3609. return "Failure";
  3611. return "Persistence error";
  3613. return "Disconnected";
  3615. return "Maximum in-flight messages amount reached";
  3617. return "Invalid UTF8 string";
  3619. return "Invalid (NULL) parameter";
  3621. return "Topic containing NULL characters has been truncated";
  3623. return "Bad structure";
  3624. case MQTTASYNC_BAD_QOS:
  3625. return "Invalid QoS value";
  3627. return "Too many pending commands";
  3629. return "Operation discarded before completion";
  3631. return "No more messages can be buffered";
  3633. return "SSL is not supported";
  3635. return "Invalid protocol scheme";
  3637. return "Options for wrong MQTT version";
  3639. return "Client created for another version of MQTT";
  3640. }
  3641. sprintf(buf, "Unknown error code %d", code);
  3642. return buf;
  3643. }