MQTTAsync.c 47 KB


  1. /*******************************************************************************
  2. * Copyright (c) 2009, 2022 IBM Corp., Ian Craggs and others
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial implementation and documentation
  15. * Ian Craggs, Allan Stockdill-Mander - SSL support
  16. * Ian Craggs - multiple server connection support
  17. * Ian Craggs - fix for bug 413429 - connectionLost not called
  18. * Ian Craggs - fix for bug 415042 - using already freed structure
  19. * Ian Craggs - fix for bug 419233 - mutexes not reporting errors
  20. * Ian Craggs - fix for bug 420851
  21. * Ian Craggs - fix for bug 432903 - queue persistence
  22. * Ian Craggs - MQTT 3.1.1 support
  23. * Rong Xiang, Ian Craggs - C++ compatibility
  24. * Ian Craggs - fix for bug 442400: reconnecting after network cable unplugged
  25. * Ian Craggs - fix for bug 444934 - incorrect free in freeCommand1
  26. * Ian Craggs - fix for bug 445891 - assigning msgid is not thread safe
  27. * Ian Craggs - fix for bug 465369 - longer latency than expected
  28. * Ian Craggs - fix for bug 444103 - success/failure callbacks not invoked
  29. * Ian Craggs - fix for bug 484363 - segfault in getReadySocket
  30. * Ian Craggs - automatic reconnect and offline buffering (send while disconnected)
  31. * Ian Craggs - fix for bug 472250
  32. * Ian Craggs - fix for bug 486548
  33. * Ian Craggs - SNI support
  34. * Ian Craggs - auto reconnect timing fix #218
  35. * Ian Craggs - fix for issue #190
  36. * Ian Craggs - check for NULL SSL options #334
  37. * Ian Craggs - allocate username/password buffers #431
  38. * Ian Craggs - MQTT 5.0 support
  39. * Ian Craggs - refactor to reduce module size
  40. *******************************************************************************/
  41. #include <stdlib.h>
  42. #include <string.h>
  43. #if !defined(_WIN32) && !defined(_WIN64)
  44. #include <sys/time.h>
  45. #else
  46. #if defined(_MSC_VER) && _MSC_VER < 1900
  47. #define snprintf _snprintf
  48. #endif
  49. #endif
  50. #if !defined(NO_PERSISTENCE)
  51. #include "MQTTPersistence.h"
  52. #endif
  53. #include "MQTTAsync.h"
  54. #include "MQTTAsyncUtils.h"
  55. #include "utf-8.h"
  56. #include "MQTTProtocol.h"
  57. #include "MQTTProtocolOut.h"
  58. #include "Thread.h"
  59. #include "SocketBuffer.h"
  60. #include "StackTrace.h"
  61. #include "Heap.h"
  62. #include "OsWrapper.h"
  63. #include "WebSocket.h"
  64. static void MQTTAsync_freeServerURIs(MQTTAsyncs* m);
  65. #include "VersionInfo.h"
  66. const char *client_timestamp_eye = "MQTTAsyncV3_Timestamp " BUILD_TIMESTAMP;
  67. const char *client_version_eye = "MQTTAsyncV3_Version " CLIENT_VERSION;
  68. volatile int global_initialized = 0;
  69. List* MQTTAsync_handles = NULL;
  70. List* MQTTAsync_commands = NULL;
  71. int MQTTAsync_tostop = 0;
  72. static ClientStates ClientState =
  73. {
  74. CLIENT_VERSION, /* version */
  75. NULL /* client list */
  76. };
  77. MQTTProtocol state;
  78. ClientStates* bstate = &ClientState;
  79. enum MQTTAsync_threadStates sendThread_state = STOPPED;
  80. enum MQTTAsync_threadStates receiveThread_state = STOPPED;
  81. thread_id_type sendThread_id = 0,
  82. receiveThread_id = 0;
  83. // global objects init declaration
  84. int MQTTAsync_init(void);
  85. void MQTTAsync_global_init(MQTTAsync_init_options* inits)
  86. {
  87. MQTTAsync_init();
  88. #if defined(OPENSSL)
  89. SSLSocket_handleOpensslInit(inits->do_openssl_init);
  90. #endif
  91. }
  92. #if !defined(min)
  93. #define min(a, b) (((a) < (b)) ? (a) : (b))
  94. #endif
  95. #if defined(WIN32) || defined(WIN64)
  96. void MQTTAsync_init_rand(void)
  97. {
  98. START_TIME_TYPE now = MQTTTime_start_clock();
  99. srand((unsigned int)now);
  100. }
  101. #elif defined(AIX)
  102. void MQTTAsync_init_rand(void)
  103. {
  104. START_TIME_TYPE now = MQTTTime_start_clock();
  105. srand(now.tv_nsec);
  106. }
  107. #else
  108. void MQTTAsync_init_rand(void)
  109. {
  110. START_TIME_TYPE now = MQTTTime_start_clock();
  111. srand(now.tv_usec);
  112. }
  113. #endif
  114. #if defined(_WIN32) || defined(_WIN64)
  115. mutex_type mqttasync_mutex = NULL;
  116. mutex_type socket_mutex = NULL;
  117. mutex_type mqttcommand_mutex = NULL;
  118. sem_type send_sem = NULL;
  119. #if !defined(NO_HEAP_TRACKING)
  120. extern mutex_type stack_mutex;
  121. extern mutex_type heap_mutex;
  122. #endif
  123. extern mutex_type log_mutex;
  124. int MQTTAsync_init(void)
  125. {
  126. DWORD rc = 0;
  127. if (mqttasync_mutex == NULL)
  128. {
  129. if ((mqttasync_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
  130. {
  131. rc = GetLastError();
  132. printf("mqttasync_mutex error %d\n", rc);
  133. goto exit;
  134. }
  135. if ((mqttcommand_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
  136. {
  137. rc = GetLastError();
  138. printf("mqttcommand_mutex error %d\n", rc);
  139. goto exit;
  140. }
  141. if ((send_sem = CreateEvent(
  142. NULL, /* default security attributes */
  143. FALSE, /* manual-reset event? */
  144. FALSE, /* initial state is nonsignaled */
  145. NULL /* object name */
  146. )) == NULL)
  147. {
  148. rc = GetLastError();
  149. printf("send_sem error %d\n", rc);
  150. goto exit;
  151. }
  152. #if !defined(NO_HEAP_TRACKING)
  153. if ((stack_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
  154. {
  155. rc = GetLastError();
  156. printf("stack_mutex error %d\n", rc);
  157. goto exit;
  158. }
  159. if ((heap_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
  160. {
  161. rc = GetLastError();
  162. printf("heap_mutex error %d\n", rc);
  163. goto exit;
  164. }
  165. #endif
  166. if ((log_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
  167. {
  168. rc = GetLastError();
  169. printf("log_mutex error %d\n", rc);
  170. goto exit;
  171. }
  172. if ((socket_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
  173. {
  174. rc = GetLastError();
  175. printf("socket_mutex error %d\n", rc);
  176. goto exit;
  177. }
  178. }
  179. else
  180. {
  181. Log(TRACE_MAX, -1, "Library already initialized");
  182. }
  183. exit:
  184. return rc;
  185. }
  186. void MQTTAsync_cleanup(void)
  187. {
  188. if (send_sem)
  189. CloseHandle(send_sem);
  190. #if !defined(NO_HEAP_TRACKING)
  191. if (stack_mutex)
  192. CloseHandle(stack_mutex);
  193. if (heap_mutex)
  194. CloseHandle(heap_mutex);
  195. #endif
  196. if (log_mutex)
  197. CloseHandle(log_mutex);
  198. if (socket_mutex)
  199. CloseHandle(socket_mutex);
  200. if (mqttasync_mutex)
  201. CloseHandle(mqttasync_mutex);
  202. }
  203. #if defined(PAHO_MQTT_STATIC)
  204. static INIT_ONCE g_InitOnce = INIT_ONCE_STATIC_INIT; /* Global for one time initialization */
  205. /* This runs at most once */
  206. BOOL CALLBACK InitMutexesOnce (
  207. PINIT_ONCE InitOnce, /* Pointer to one-time initialization structure */
  208. PVOID Parameter, /* Optional parameter */
  209. PVOID *lpContext) /* Return data, if any */
  210. {
  211. int rc = MQTTAsync_init();
  212. return rc == 0;
  213. }
  214. #else
  215. BOOL APIENTRY DllMain(HANDLE hModule,
  216. DWORD ul_reason_for_call,
  217. LPVOID lpReserved)
  218. {
  219. switch (ul_reason_for_call)
  220. {
  221. case DLL_PROCESS_ATTACH:
  222. MQTTAsync_init();
  223. break;
  224. case DLL_THREAD_ATTACH:
  225. break;
  226. case DLL_THREAD_DETACH:
  227. break;
  228. case DLL_PROCESS_DETACH:
  229. if (lpReserved)
  230. MQTTAsync_cleanup();
  231. break;
  232. }
  233. return TRUE;
  234. }
  235. #endif
  236. #else
  237. static pthread_mutex_t mqttasync_mutex_store = PTHREAD_MUTEX_INITIALIZER;
  238. mutex_type mqttasync_mutex = &mqttasync_mutex_store;
  239. static pthread_mutex_t socket_mutex_store = PTHREAD_MUTEX_INITIALIZER;
  240. mutex_type socket_mutex = &socket_mutex_store;
  241. static pthread_mutex_t mqttcommand_mutex_store = PTHREAD_MUTEX_INITIALIZER;
  242. mutex_type mqttcommand_mutex = &mqttcommand_mutex_store;
  243. static cond_type_struct send_cond_store = { PTHREAD_COND_INITIALIZER, PTHREAD_MUTEX_INITIALIZER };
  244. cond_type send_cond = &send_cond_store;
  245. int MQTTAsync_init(void)
  246. {
  247. pthread_mutexattr_t attr;
  248. int rc;
  249. pthread_mutexattr_init(&attr);
  250. #if !defined(_WRS_KERNEL)
  251. pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
  252. #else
  253. /* #warning "no pthread_mutexattr_settype" */
  254. #endif
  255. if ((rc = pthread_mutex_init(mqttasync_mutex, &attr)) != 0)
  256. printf("MQTTAsync: error %d initializing async_mutex\n", rc);
  257. else if ((rc = pthread_mutex_init(mqttcommand_mutex, &attr)) != 0)
  258. printf("MQTTAsync: error %d initializing command_mutex\n", rc);
  259. else if ((rc = pthread_mutex_init(socket_mutex, &attr)) != 0)
  260. printf("MQTTClient: error %d initializing socket_mutex\n", rc);
  261. else if ((rc = pthread_cond_init(&send_cond->cond, NULL)) != 0)
  262. printf("MQTTAsync: error %d initializing send_cond cond\n", rc);
  263. else if ((rc = pthread_mutex_init(&send_cond->mutex, &attr)) != 0)
  264. printf("MQTTAsync: error %d initializing send_cond mutex\n", rc);
  265. return rc;
  266. }
  267. #endif
  268. int MQTTAsync_createWithOptions(MQTTAsync* handle, const char* serverURI, const char* clientId,
  269. int persistence_type, void* persistence_context, MQTTAsync_createOptions* options)
  270. {
  271. int rc = 0;
  272. MQTTAsyncs *m = NULL;
  273. #if (defined(_WIN32) || defined(_WIN64)) && defined(PAHO_MQTT_STATIC)
  274. /* intializes mutexes once. Must come before FUNC_ENTRY */
  275. BOOL bStatus = InitOnceExecuteOnce(&g_InitOnce, InitMutexesOnce, NULL, NULL);
  276. #endif
  277. FUNC_ENTRY;
  278. MQTTAsync_lock_mutex(mqttasync_mutex);
  279. if (serverURI == NULL || clientId == NULL)
  280. {
  281. rc = MQTTASYNC_NULL_PARAMETER;
  282. goto exit;
  283. }
  284. if (!UTF8_validateString(clientId))
  285. {
  286. rc = MQTTASYNC_BAD_UTF8_STRING;
  287. goto exit;
  288. }
  289. if (strlen(clientId) == 0 && persistence_type == MQTTCLIENT_PERSISTENCE_DEFAULT)
  290. {
  291. rc = MQTTASYNC_PERSISTENCE_ERROR;
  292. goto exit;
  293. }
  294. if (strstr(serverURI, "://") != NULL)
  295. {
  296. if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) != 0
  297. && strncmp(URI_MQTT, serverURI, strlen(URI_MQTT)) != 0
  298. && strncmp(URI_WS, serverURI, strlen(URI_WS)) != 0
  299. #if defined(OPENSSL)
  300. && strncmp(URI_SSL, serverURI, strlen(URI_SSL)) != 0
  301. && strncmp(URI_MQTTS, serverURI, strlen(URI_MQTTS)) != 0
  302. && strncmp(URI_WSS, serverURI, strlen(URI_WSS)) != 0
  303. #endif
  304. )
  305. {
  306. rc = MQTTASYNC_BAD_PROTOCOL;
  307. goto exit;
  308. }
  309. }
  310. if (options && options->maxBufferedMessages <= 0)
  311. {
  312. rc = MQTTASYNC_MAX_BUFFERED;
  313. goto exit;
  314. }
  315. if (options && (strncmp(options->struct_id, "MQCO", 4) != 0 ||
  316. options->struct_version < 0 || options->struct_version > 2))
  317. {
  318. rc = MQTTASYNC_BAD_STRUCTURE;
  319. goto exit;
  320. }
  321. if (!global_initialized)
  322. {
  323. #if !defined(NO_HEAP_TRACKING)
  324. Heap_initialize();
  325. #endif
  326. Log_initialize((Log_nameValue*)MQTTAsync_getVersionInfo());
  327. bstate->clients = ListInitialize();
  328. Socket_outInitialize();
  329. Socket_setWriteContinueCallback(MQTTAsync_writeContinue);
  330. Socket_setWriteCompleteCallback(MQTTAsync_writeComplete);
  331. Socket_setWriteAvailableCallback(MQTTProtocol_writeAvailable);
  332. MQTTAsync_handles = ListInitialize();
  333. MQTTAsync_commands = ListInitialize();
  334. #if defined(OPENSSL)
  335. SSLSocket_initialize();
  336. #endif
  337. global_initialized = 1;
  338. }
  339. if ((m = malloc(sizeof(MQTTAsyncs))) == NULL)
  340. {
  341. rc = PAHO_MEMORY_ERROR;
  342. goto exit;
  343. }
  344. *handle = m;
  345. memset(m, '\0', sizeof(MQTTAsyncs));
  346. if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
  347. serverURI += strlen(URI_TCP);
  348. else if (strncmp(URI_MQTT, serverURI, strlen(URI_MQTT)) == 0)
  349. serverURI += strlen(URI_MQTT);
  350. else if (strncmp(URI_WS, serverURI, strlen(URI_WS)) == 0)
  351. {
  352. serverURI += strlen(URI_WS);
  353. m->websocket = 1;
  354. }
  355. #if defined(OPENSSL)
  356. else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0)
  357. {
  358. serverURI += strlen(URI_SSL);
  359. m->ssl = 1;
  360. }
  361. else if (strncmp(URI_MQTTS, serverURI, strlen(URI_MQTTS)) == 0)
  362. {
  363. serverURI += strlen(URI_MQTTS);
  364. m->ssl = 1;
  365. }
  366. else if (strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
  367. {
  368. serverURI += strlen(URI_WSS);
  369. m->ssl = 1;
  370. m->websocket = 1;
  371. }
  372. #endif
  373. if ((m->serverURI = MQTTStrdup(serverURI)) == NULL)
  374. {
  375. rc = PAHO_MEMORY_ERROR;
  376. goto exit;
  377. }
  378. m->responses = ListInitialize();
  379. ListAppend(MQTTAsync_handles, m, sizeof(MQTTAsyncs));
  380. if ((m->c = malloc(sizeof(Clients))) == NULL)
  381. {
  382. rc = PAHO_MEMORY_ERROR;
  383. goto exit;
  384. }
  385. memset(m->c, '\0', sizeof(Clients));
  386. m->c->context = m;
  387. m->c->outboundMsgs = ListInitialize();
  388. m->c->inboundMsgs = ListInitialize();
  389. m->c->messageQueue = ListInitialize();
  390. m->c->outboundQueue = ListInitialize();
  391. m->c->clientID = MQTTStrdup(clientId);
  392. if (m->c->context == NULL || m->c->outboundMsgs == NULL || m->c->inboundMsgs == NULL ||
  393. m->c->messageQueue == NULL || m->c->outboundQueue == NULL || m->c->clientID == NULL)
  394. {
  395. rc = PAHO_MEMORY_ERROR;
  396. goto exit;
  397. }
  398. m->c->MQTTVersion = MQTTVERSION_DEFAULT;
  399. m->shouldBeConnected = 0;
  400. if (options)
  401. {
  402. if ((m->createOptions = malloc(sizeof(MQTTAsync_createOptions))) == NULL)
  403. {
  404. rc = PAHO_MEMORY_ERROR;
  405. goto exit;
  406. }
  407. memcpy(m->createOptions, options, sizeof(MQTTAsync_createOptions));
  408. if (options->struct_version > 0)
  409. m->c->MQTTVersion = options->MQTTVersion;
  410. }
  411. #if !defined(NO_PERSISTENCE)
  412. rc = MQTTPersistence_create(&(m->c->persistence), persistence_type, persistence_context);
  413. if (rc == 0)
  414. {
  415. rc = MQTTPersistence_initialize(m->c, m->serverURI); /* inflight messages restored here */
  416. if (rc == 0)
  417. {
  418. if (m->createOptions && m->createOptions->struct_version >= 2 && m->createOptions->restoreMessages == 0)
  419. MQTTAsync_unpersistCommandsAndMessages(m->c);
  420. else
  421. {
  422. MQTTAsync_restoreCommands(m);
  423. MQTTPersistence_restoreMessageQueue(m->c);
  424. }
  425. }
  426. }
  427. #endif
  428. ListAppend(bstate->clients, m->c, sizeof(Clients) + 3*sizeof(List));
  429. exit:
  430. MQTTAsync_unlock_mutex(mqttasync_mutex);
  431. FUNC_EXIT_RC(rc);
  432. return rc;
  433. }
  434. int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clientId,
  435. int persistence_type, void* persistence_context)
  436. {
  437. MQTTAsync_init_rand();
  438. return MQTTAsync_createWithOptions(handle, serverURI, clientId, persistence_type,
  439. persistence_context, NULL);
  440. }
  441. void MQTTAsync_destroy(MQTTAsync* handle)
  442. {
  443. MQTTAsyncs* m = *handle;
  444. FUNC_ENTRY;
  445. MQTTAsync_lock_mutex(mqttasync_mutex);
  446. if (m == NULL)
  447. goto exit;
  448. MQTTAsync_closeSession(m->c, MQTTREASONCODE_SUCCESS, NULL);
  449. MQTTAsync_NULLPublishResponses(m);
  450. MQTTAsync_freeResponses(m);
  451. MQTTAsync_NULLPublishCommands(m);
  452. MQTTAsync_freeCommands(m);
  453. ListFree(m->responses);
  454. if (m->c)
  455. {
  456. SOCKET saved_socket = m->c->net.socket;
  457. char* saved_clientid = MQTTStrdup(m->c->clientID);
  458. #if !defined(NO_PERSISTENCE)
  459. MQTTPersistence_close(m->c);
  460. #endif
  461. MQTTAsync_emptyMessageQueue(m->c);
  462. MQTTProtocol_freeClient(m->c);
  463. if (!ListRemove(bstate->clients, m->c))
  464. Log(LOG_ERROR, 0, NULL);
  465. else
  466. Log(TRACE_MIN, 1, NULL, saved_clientid, saved_socket);
  467. free(saved_clientid);
  468. }
  469. if (m->serverURI)
  470. free(m->serverURI);
  471. if (m->createOptions)
  472. free(m->createOptions);
  473. MQTTAsync_freeServerURIs(m);
  474. if (m->connectProps)
  475. {
  476. MQTTProperties_free(m->connectProps);
  477. free(m->connectProps);
  478. m->connectProps = NULL;
  479. }
  480. if (m->willProps)
  481. {
  482. MQTTProperties_free(m->willProps);
  483. free(m->willProps);
  484. m->willProps = NULL;
  485. }
  486. if (!ListRemove(MQTTAsync_handles, m))
  487. Log(LOG_ERROR, -1, "free error");
  488. *handle = NULL;
  489. if (bstate->clients->count == 0)
  490. MQTTAsync_terminate();
  491. exit:
  492. MQTTAsync_unlock_mutex(mqttasync_mutex);
  493. FUNC_EXIT;
  494. }
  495. int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
  496. {
  497. MQTTAsyncs* m = handle;
  498. int rc = MQTTASYNC_SUCCESS;
  499. MQTTAsync_queuedCommand* conn;
  500. thread_id_type thread_id = 0;
  501. int locked = 0;
  502. FUNC_ENTRY;
  503. if (options == NULL)
  504. {
  505. rc = MQTTASYNC_NULL_PARAMETER;
  506. goto exit;
  507. }
  508. if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 8)
  509. {
  510. rc = MQTTASYNC_BAD_STRUCTURE;
  511. goto exit;
  512. }
  513. #if defined(OPENSSL)
  514. if (m->ssl && options->ssl == NULL)
  515. {
  516. rc = MQTTASYNC_NULL_PARAMETER;
  517. goto exit;
  518. }
  519. #endif
  520. if (options->will) /* check validity of will options structure */
  521. {
  522. if (strncmp(options->will->struct_id, "MQTW", 4) != 0 || (options->will->struct_version != 0 && options->will->struct_version != 1))
  523. {
  524. rc = MQTTASYNC_BAD_STRUCTURE;
  525. goto exit;
  526. }
  527. if (options->will->qos < 0 || options->will->qos > 2)
  528. {
  529. rc = MQTTASYNC_BAD_QOS;
  530. goto exit;
  531. }
  532. if (options->will->topicName == NULL)
  533. {
  534. rc = MQTTASYNC_NULL_PARAMETER;
  535. goto exit;
  536. } else if (strlen(options->will->topicName) == 0)
  537. {
  538. rc = MQTTASYNC_0_LEN_WILL_TOPIC;
  539. goto exit;
  540. }
  541. }
  542. if (options->struct_version != 0 && options->ssl) /* check validity of SSL options structure */
  543. {
  544. if (strncmp(options->ssl->struct_id, "MQTS", 4) != 0 || options->ssl->struct_version < 0 || options->ssl->struct_version > 5)
  545. {
  546. rc = MQTTASYNC_BAD_STRUCTURE;
  547. goto exit;
  548. }
  549. }
  550. if (options->MQTTVersion >= MQTTVERSION_5 && m->c->MQTTVersion < MQTTVERSION_5)
  551. {
  552. rc = MQTTASYNC_WRONG_MQTT_VERSION;
  553. goto exit;
  554. }
  555. if ((options->username && !UTF8_validateString(options->username)) ||
  556. (options->password && !UTF8_validateString(options->password)))
  557. {
  558. rc = MQTTASYNC_BAD_UTF8_STRING;
  559. goto exit;
  560. }
  561. if (options->MQTTVersion >= MQTTVERSION_5 && options->struct_version < 6)
  562. {
  563. rc = MQTTASYNC_BAD_STRUCTURE;
  564. goto exit;
  565. }
  566. if (options->MQTTVersion >= MQTTVERSION_5 && options->cleansession != 0)
  567. {
  568. rc = MQTTASYNC_BAD_MQTT_OPTION;
  569. goto exit;
  570. }
  571. if (options->MQTTVersion < MQTTVERSION_5 && options->struct_version >= 6)
  572. {
  573. if (options->cleanstart != 0 || options->onFailure5 || options->onSuccess5 ||
  574. options->connectProperties || options->willProperties)
  575. {
  576. rc = MQTTASYNC_BAD_MQTT_OPTION;
  577. goto exit;
  578. }
  579. }
  580. m->connect.onSuccess = options->onSuccess;
  581. m->connect.onFailure = options->onFailure;
  582. if (options->struct_version >= 6)
  583. {
  584. m->connect.onSuccess5 = options->onSuccess5;
  585. m->connect.onFailure5 = options->onFailure5;
  586. }
  587. m->connect.context = options->context;
  588. m->connectTimeout = options->connectTimeout;
  589. /* don't lock async mutex if we are being called from a callback */
  590. thread_id = Thread_getid();
  591. if (thread_id != sendThread_id && thread_id != receiveThread_id)
  592. {
  593. MQTTAsync_lock_mutex(mqttasync_mutex);
  594. locked = 1;
  595. }
  596. MQTTAsync_tostop = 0;
  597. if (sendThread_state != STARTING && sendThread_state != RUNNING)
  598. {
  599. sendThread_state = STARTING;
  600. Thread_start(MQTTAsync_sendThread, NULL);
  601. }
  602. if (receiveThread_state != STARTING && receiveThread_state != RUNNING)
  603. {
  604. receiveThread_state = STARTING;
  605. Thread_start(MQTTAsync_receiveThread, handle);
  606. }
  607. if (locked)
  608. MQTTAsync_unlock_mutex(mqttasync_mutex);
  609. m->c->keepAliveInterval = options->keepAliveInterval;
  610. setRetryLoopInterval(options->keepAliveInterval);
  611. m->c->cleansession = options->cleansession;
  612. m->c->maxInflightMessages = options->maxInflight;
  613. if (options->struct_version >= 3)
  614. m->c->MQTTVersion = options->MQTTVersion;
  615. else
  616. m->c->MQTTVersion = MQTTVERSION_DEFAULT;
  617. if (options->struct_version >= 4)
  618. {
  619. m->automaticReconnect = options->automaticReconnect;
  620. m->minRetryInterval = options->minRetryInterval;
  621. m->maxRetryInterval = options->maxRetryInterval;
  622. }
  623. if (options->struct_version >= 7)
  624. {
  625. m->c->net.httpHeaders = (const MQTTClient_nameValue *) options->httpHeaders;
  626. }
  627. if (options->struct_version >= 8)
  628. {
  629. if (options->httpProxy)
  630. m->c->httpProxy = MQTTStrdup(options->httpProxy);
  631. if (options->httpsProxy)
  632. m->c->httpsProxy = MQTTStrdup(options->httpsProxy);
  633. }
  634. if (m->c->will)
  635. {
  636. free(m->c->will->payload);
  637. free(m->c->will->topic);
  638. free(m->c->will);
  639. m->c->will = NULL;
  640. }
  641. if (options->will && (options->will->struct_version == 0 || options->will->struct_version == 1))
  642. {
  643. const void* source = NULL;
  644. if ((m->c->will = malloc(sizeof(willMessages))) == NULL)
  645. {
  646. rc = PAHO_MEMORY_ERROR;
  647. goto exit;
  648. }
  649. if (options->will->message || (options->will->struct_version == 1 && options->will->payload.data))
  650. {
  651. if (options->will->struct_version == 1 && options->will->payload.data)
  652. {
  653. m->c->will->payloadlen = options->will->payload.len;
  654. source = options->will->payload.data;
  655. }
  656. else
  657. {
  658. m->c->will->payloadlen = (int)strlen(options->will->message);
  659. source = (void*)options->will->message;
  660. }
  661. if ((m->c->will->payload = malloc(m->c->will->payloadlen)) == NULL)
  662. {
  663. rc = PAHO_MEMORY_ERROR;
  664. goto exit;
  665. }
  666. memcpy(m->c->will->payload, source, m->c->will->payloadlen);
  667. }
  668. else
  669. {
  670. m->c->will->payload = NULL;
  671. m->c->will->payloadlen = 0;
  672. }
  673. m->c->will->qos = options->will->qos;
  674. m->c->will->retained = options->will->retained;
  675. m->c->will->topic = MQTTStrdup(options->will->topicName);
  676. }
  677. #if defined(OPENSSL)
  678. if (m->c->sslopts)
  679. {
  680. if (m->c->sslopts->trustStore)
  681. free((void*)m->c->sslopts->trustStore);
  682. if (m->c->sslopts->keyStore)
  683. free((void*)m->c->sslopts->keyStore);
  684. if (m->c->sslopts->privateKey)
  685. free((void*)m->c->sslopts->privateKey);
  686. if (m->c->sslopts->privateKeyPassword)
  687. free((void*)m->c->sslopts->privateKeyPassword);
  688. if (m->c->sslopts->enabledCipherSuites)
  689. free((void*)m->c->sslopts->enabledCipherSuites);
  690. if (m->c->sslopts->struct_version >= 2)
  691. {
  692. if (m->c->sslopts->CApath)
  693. free((void*)m->c->sslopts->CApath);
  694. }
  695. free((void*)m->c->sslopts);
  696. m->c->sslopts = NULL;
  697. }
  698. if (options->struct_version != 0 && options->ssl)
  699. {
  700. if ((m->c->sslopts = malloc(sizeof(MQTTClient_SSLOptions))) == NULL)
  701. {
  702. rc = PAHO_MEMORY_ERROR;
  703. goto exit;
  704. }
  705. memset(m->c->sslopts, '\0', sizeof(MQTTClient_SSLOptions));
  706. m->c->sslopts->struct_version = options->ssl->struct_version;
  707. if (options->ssl->trustStore)
  708. m->c->sslopts->trustStore = MQTTStrdup(options->ssl->trustStore);
  709. if (options->ssl->keyStore)
  710. m->c->sslopts->keyStore = MQTTStrdup(options->ssl->keyStore);
  711. if (options->ssl->privateKey)
  712. m->c->sslopts->privateKey = MQTTStrdup(options->ssl->privateKey);
  713. if (options->ssl->privateKeyPassword)
  714. m->c->sslopts->privateKeyPassword = MQTTStrdup(options->ssl->privateKeyPassword);
  715. if (options->ssl->enabledCipherSuites)
  716. m->c->sslopts->enabledCipherSuites = MQTTStrdup(options->ssl->enabledCipherSuites);
  717. m->c->sslopts->enableServerCertAuth = options->ssl->enableServerCertAuth;
  718. if (m->c->sslopts->struct_version >= 1)
  719. m->c->sslopts->sslVersion = options->ssl->sslVersion;
  720. if (m->c->sslopts->struct_version >= 2)
  721. {
  722. m->c->sslopts->verify = options->ssl->verify;
  723. if (options->ssl->CApath)
  724. m->c->sslopts->CApath = MQTTStrdup(options->ssl->CApath);
  725. }
  726. if (m->c->sslopts->struct_version >= 3)
  727. {
  728. m->c->sslopts->ssl_error_cb = options->ssl->ssl_error_cb;
  729. m->c->sslopts->ssl_error_context = options->ssl->ssl_error_context;
  730. }
  731. if (m->c->sslopts->struct_version >= 4)
  732. {
  733. m->c->sslopts->ssl_psk_cb = options->ssl->ssl_psk_cb;
  734. m->c->sslopts->ssl_psk_context = options->ssl->ssl_psk_context;
  735. m->c->sslopts->disableDefaultTrustStore = options->ssl->disableDefaultTrustStore;
  736. }
  737. if (m->c->sslopts->struct_version >= 5)
  738. {
  739. if (options->ssl->protos)
  740. m->c->sslopts->protos = (const unsigned char*)MQTTStrdup((const char*)options->ssl->protos);
  741. m->c->sslopts->protos_len = options->ssl->protos_len;
  742. }
  743. }
  744. #else
  745. if (options->struct_version != 0 && options->ssl)
  746. {
  747. rc = MQTTASYNC_SSL_NOT_SUPPORTED;
  748. goto exit;
  749. }
  750. #endif
  751. if (m->c->username)
  752. {
  753. free((void*)m->c->username);
  754. m->c->username = NULL;
  755. }
  756. if (options->username)
  757. m->c->username = MQTTStrdup(options->username);
  758. if (m->c->password)
  759. {
  760. free((void*)m->c->password);
  761. m->c->password = NULL;
  762. }
  763. if (options->password)
  764. {
  765. m->c->password = MQTTStrdup(options->password);
  766. m->c->passwordlen = (int)strlen(options->password);
  767. }
  768. else if (options->struct_version >= 5 && options->binarypwd.data)
  769. {
  770. m->c->passwordlen = options->binarypwd.len;
  771. if ((m->c->password = malloc(m->c->passwordlen)) == NULL)
  772. {
  773. rc = PAHO_MEMORY_ERROR;
  774. goto exit;
  775. }
  776. memcpy((void*)m->c->password, options->binarypwd.data, m->c->passwordlen);
  777. }
  778. m->c->retryInterval = options->retryInterval;
  779. m->shouldBeConnected = 1;
  780. m->connectTimeout = options->connectTimeout;
  781. MQTTAsync_freeServerURIs(m);
  782. if (options->struct_version >= 2 && options->serverURIcount > 0)
  783. {
  784. int i;
  785. m->serverURIcount = options->serverURIcount;
  786. if ((m->serverURIs = malloc(options->serverURIcount * sizeof(char*))) == NULL)
  787. {
  788. rc = PAHO_MEMORY_ERROR;
  789. goto exit;
  790. }
  791. for (i = 0; i < options->serverURIcount; ++i)
  792. m->serverURIs[i] = MQTTStrdup(options->serverURIs[i]);
  793. }
  794. if (m->connectProps)
  795. {
  796. MQTTProperties_free(m->connectProps);
  797. free(m->connectProps);
  798. m->connectProps = NULL;
  799. }
  800. if (m->willProps)
  801. {
  802. MQTTProperties_free(m->willProps);
  803. free(m->willProps);
  804. m->willProps = NULL;
  805. }
  806. if (options->struct_version >=6)
  807. {
  808. if (options->connectProperties)
  809. {
  810. MQTTProperties initialized = MQTTProperties_initializer;
  811. if ((m->connectProps = malloc(sizeof(MQTTProperties))) == NULL)
  812. {
  813. rc = PAHO_MEMORY_ERROR;
  814. goto exit;
  815. }
  816. *m->connectProps = initialized;
  817. *m->connectProps = MQTTProperties_copy(options->connectProperties);
  818. if (MQTTProperties_hasProperty(options->connectProperties, MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL))
  819. m->c->sessionExpiry = MQTTProperties_getNumericValue(options->connectProperties,
  820. MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL);
  821. }
  822. if (options->willProperties)
  823. {
  824. MQTTProperties initialized = MQTTProperties_initializer;
  825. if ((m->willProps = malloc(sizeof(MQTTProperties))) == NULL)
  826. {
  827. rc = PAHO_MEMORY_ERROR;
  828. goto exit;
  829. }
  830. *m->willProps = initialized;
  831. *m->willProps = MQTTProperties_copy(options->willProperties);
  832. }
  833. m->c->cleanstart = options->cleanstart;
  834. }
  835. /* Add connect request to operation queue */
  836. if ((conn = malloc(sizeof(MQTTAsync_queuedCommand))) == NULL)
  837. {
  838. rc = PAHO_MEMORY_ERROR;
  839. goto exit;
  840. }
  841. memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
  842. conn->client = m;
  843. if (options)
  844. {
  845. conn->command.onSuccess = options->onSuccess;
  846. conn->command.onFailure = options->onFailure;
  847. conn->command.onSuccess5 = options->onSuccess5;
  848. conn->command.onFailure5 = options->onFailure5;
  849. conn->command.context = options->context;
  850. }
  851. conn->command.type = CONNECT;
  852. conn->command.details.conn.currentURI = 0;
  853. rc = MQTTAsync_addCommand(conn, sizeof(conn));
  854. exit:
  855. FUNC_EXIT_RC(rc);
  856. return rc;
  857. }
  858. int MQTTAsync_reconnect(MQTTAsync handle)
  859. {
  860. int rc = MQTTASYNC_FAILURE;
  861. MQTTAsyncs* m = handle;
  862. FUNC_ENTRY;
  863. MQTTAsync_lock_mutex(mqttasync_mutex);
  864. if (m->automaticReconnect)
  865. {
  866. if (m->shouldBeConnected)
  867. {
  868. m->reconnectNow = 1;
  869. m->currentIntervalBase = m->minRetryInterval;
  870. m->currentInterval = m->minRetryInterval;
  871. m->retrying = 1;
  872. rc = MQTTASYNC_SUCCESS;
  873. }
  874. }
  875. else
  876. {
  877. /* to reconnect, put the connect command to the head of the command queue */
  878. MQTTAsync_queuedCommand* conn = malloc(sizeof(MQTTAsync_queuedCommand));
  879. if (!conn)
  880. {
  881. rc = PAHO_MEMORY_ERROR;
  882. goto exit;
  883. }
  884. memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
  885. conn->client = m;
  886. conn->command = m->connect;
  887. /* make sure that the version attempts are restarted */
  888. if (m->c->MQTTVersion == MQTTVERSION_DEFAULT)
  889. conn->command.details.conn.MQTTVersion = 0;
  890. rc = MQTTAsync_addCommand(conn, sizeof(m->connect));
  891. }
  892. exit:
  893. MQTTAsync_unlock_mutex(mqttasync_mutex);
  894. FUNC_EXIT_RC(rc);
  895. return rc;
  896. }
  897. int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char* const* topic, const int* qos, MQTTAsync_responseOptions* response)
  898. {
  899. MQTTAsyncs* m = handle;
  900. int i = 0;
  901. int rc = MQTTASYNC_SUCCESS;
  902. MQTTAsync_queuedCommand* sub;
  903. int msgid = 0;
  904. FUNC_ENTRY;
  905. if (m == NULL || m->c == NULL)
  906. rc = MQTTASYNC_FAILURE;
  907. else if (m->c->connected == 0)
  908. rc = MQTTASYNC_DISCONNECTED;
  909. else for (i = 0; i < count; i++)
  910. {
  911. if (!UTF8_validateString(topic[i]))
  912. {
  913. rc = MQTTASYNC_BAD_UTF8_STRING;
  914. break;
  915. }
  916. if (qos[i] < 0 || qos[i] > 2)
  917. {
  918. rc = MQTTASYNC_BAD_QOS;
  919. break;
  920. }
  921. }
  922. if (rc != MQTTASYNC_SUCCESS)
  923. ; /* don't overwrite a previous error code */
  924. else if ((msgid = MQTTAsync_assignMsgId(m)) == 0)
  925. rc = MQTTASYNC_NO_MORE_MSGIDS;
  926. else if (m->c->MQTTVersion >= MQTTVERSION_5 && count > 1 && (count != response->subscribeOptionsCount
  927. && response->subscribeOptionsCount != 0))
  928. rc = MQTTASYNC_BAD_MQTT_OPTION;
  929. else if (response)
  930. {
  931. if (m->c->MQTTVersion >= MQTTVERSION_5)
  932. {
  933. if (response->struct_version == 0 || response->onFailure || response->onSuccess)
  934. rc = MQTTASYNC_BAD_MQTT_OPTION;
  935. }
  936. else if (m->c->MQTTVersion < MQTTVERSION_5)
  937. {
  938. if (response->struct_version >= 1 && (response->onFailure5 || response->onSuccess5))
  939. rc = MQTTASYNC_BAD_MQTT_OPTION;
  940. }
  941. }
  942. if (rc != MQTTASYNC_SUCCESS)
  943. goto exit;
  944. /* Add subscribe request to operation queue */
  945. if ((sub = malloc(sizeof(MQTTAsync_queuedCommand))) == NULL)
  946. {
  947. rc = PAHO_MEMORY_ERROR;
  948. goto exit;
  949. }
  950. memset(sub, '\0', sizeof(MQTTAsync_queuedCommand));
  951. sub->client = m;
  952. sub->command.token = msgid;
  953. if (response)
  954. {
  955. sub->command.onSuccess = response->onSuccess;
  956. sub->command.onFailure = response->onFailure;
  957. sub->command.onSuccess5 = response->onSuccess5;
  958. sub->command.onFailure5 = response->onFailure5;
  959. sub->command.context = response->context;
  960. response->token = sub->command.token;
  961. if (m->c->MQTTVersion >= MQTTVERSION_5)
  962. {
  963. sub->command.properties = MQTTProperties_copy(&response->properties);
  964. sub->command.details.sub.opts = response->subscribeOptions;
  965. if (count > 1)
  966. {
  967. if ((sub->command.details.sub.optlist = malloc(sizeof(MQTTSubscribe_options) * count)) == NULL)
  968. {
  969. rc = PAHO_MEMORY_ERROR;
  970. goto exit;
  971. }
  972. if (response->subscribeOptionsCount == 0)
  973. {
  974. MQTTSubscribe_options initialized = MQTTSubscribe_options_initializer;
  975. for (i = 0; i < count; ++i)
  976. sub->command.details.sub.optlist[i] = initialized;
  977. }
  978. else
  979. {
  980. for (i = 0; i < count; ++i)
  981. sub->command.details.sub.optlist[i] = response->subscribeOptionsList[i];
  982. }
  983. }
  984. }
  985. }
  986. sub->command.type = SUBSCRIBE;
  987. sub->command.details.sub.count = count;
  988. sub->command.details.sub.topics = malloc(sizeof(char*) * count);
  989. sub->command.details.sub.qoss = malloc(sizeof(int) * count);
  990. if (sub->command.details.sub.topics && sub->command.details.sub.qoss)
  991. {
  992. for (i = 0; i < count; ++i)
  993. {
  994. if ((sub->command.details.sub.topics[i] = MQTTStrdup(topic[i])) == NULL)
  995. {
  996. rc = PAHO_MEMORY_ERROR;
  997. goto exit;
  998. }
  999. sub->command.details.sub.qoss[i] = qos[i];
  1000. }
  1001. rc = MQTTAsync_addCommand(sub, sizeof(sub));
  1002. }
  1003. else
  1004. rc = PAHO_MEMORY_ERROR;
  1005. exit:
  1006. FUNC_EXIT_RC(rc);
  1007. return rc;
  1008. }
  1009. int MQTTAsync_subscribe(MQTTAsync handle, const char* topic, int qos, MQTTAsync_responseOptions* response)
  1010. {
  1011. int rc = 0;
  1012. FUNC_ENTRY;
  1013. rc = MQTTAsync_subscribeMany(handle, 1, (char * const *)(&topic), &qos, response);
  1014. FUNC_EXIT_RC(rc);
  1015. return rc;
  1016. }
  1017. int MQTTAsync_unsubscribeMany(MQTTAsync handle, int count, char* const* topic, MQTTAsync_responseOptions* response)
  1018. {
  1019. MQTTAsyncs* m = handle;
  1020. int i = 0;
  1021. int rc = MQTTASYNC_SUCCESS;
  1022. MQTTAsync_queuedCommand* unsub;
  1023. int msgid = 0;
  1024. FUNC_ENTRY;
  1025. if (m == NULL || m->c == NULL)
  1026. rc = MQTTASYNC_FAILURE;
  1027. else if (m->c->connected == 0)
  1028. rc = MQTTASYNC_DISCONNECTED;
  1029. else for (i = 0; i < count; i++)
  1030. {
  1031. if (!UTF8_validateString(topic[i]))
  1032. {
  1033. rc = MQTTASYNC_BAD_UTF8_STRING;
  1034. break;
  1035. }
  1036. }
  1037. if (rc != MQTTASYNC_SUCCESS)
  1038. ; /* don't overwrite a previous error code */
  1039. else if ((msgid = MQTTAsync_assignMsgId(m)) == 0)
  1040. rc = MQTTASYNC_NO_MORE_MSGIDS;
  1041. else if (response)
  1042. {
  1043. if (m->c->MQTTVersion >= MQTTVERSION_5)
  1044. {
  1045. if (response->struct_version == 0 || response->onFailure || response->onSuccess)
  1046. rc = MQTTASYNC_BAD_MQTT_OPTION;
  1047. }
  1048. else if (m->c->MQTTVersion < MQTTVERSION_5)
  1049. {
  1050. if (response->struct_version >= 1 && (response->onFailure5 || response->onSuccess5))
  1051. rc = MQTTASYNC_BAD_MQTT_OPTION;
  1052. }
  1053. }
  1054. if (rc != MQTTASYNC_SUCCESS)
  1055. goto exit;
  1056. /* Add unsubscribe request to operation queue */
  1057. if ((unsub = malloc(sizeof(MQTTAsync_queuedCommand))) == NULL)
  1058. {
  1059. rc = PAHO_MEMORY_ERROR;
  1060. goto exit;
  1061. }
  1062. memset(unsub, '\0', sizeof(MQTTAsync_queuedCommand));
  1063. unsub->client = m;
  1064. unsub->command.type = UNSUBSCRIBE;
  1065. unsub->command.token = msgid;
  1066. if (response)
  1067. {
  1068. unsub->command.onSuccess = response->onSuccess;
  1069. unsub->command.onFailure = response->onFailure;
  1070. unsub->command.onSuccess5 = response->onSuccess5;
  1071. unsub->command.onFailure5 = response->onFailure5;
  1072. unsub->command.context = response->context;
  1073. response->token = unsub->command.token;
  1074. if (m->c->MQTTVersion >= MQTTVERSION_5)
  1075. unsub->command.properties = MQTTProperties_copy(&response->properties);
  1076. }
  1077. unsub->command.details.unsub.count = count;
  1078. if ((unsub->command.details.unsub.topics = malloc(sizeof(char*) * count)) == NULL)
  1079. {
  1080. rc = PAHO_MEMORY_ERROR;
  1081. goto exit;
  1082. }
  1083. for (i = 0; i < count; ++i)
  1084. unsub->command.details.unsub.topics[i] = MQTTStrdup(topic[i]);
  1085. rc = MQTTAsync_addCommand(unsub, sizeof(unsub));
  1086. exit:
  1087. FUNC_EXIT_RC(rc);
  1088. return rc;
  1089. }
  1090. int MQTTAsync_unsubscribe(MQTTAsync handle, const char* topic, MQTTAsync_responseOptions* response)
  1091. {
  1092. int rc = 0;
  1093. FUNC_ENTRY;
  1094. rc = MQTTAsync_unsubscribeMany(handle, 1, (char * const *)(&topic), response);
  1095. FUNC_EXIT_RC(rc);
  1096. return rc;
  1097. }
  1098. int MQTTAsync_send(MQTTAsync handle, const char* destinationName, int payloadlen, const void* payload,
  1099. int qos, int retained, MQTTAsync_responseOptions* response)
  1100. {
  1101. int rc = MQTTASYNC_SUCCESS;
  1102. MQTTAsyncs* m = handle;
  1103. MQTTAsync_queuedCommand* pub;
  1104. int msgid = 0;
  1105. FUNC_ENTRY;
  1106. if (m == NULL || m->c == NULL)
  1107. rc = MQTTASYNC_FAILURE;
  1108. else if (m->c->connected == 0)
  1109. {
  1110. if (m->createOptions == NULL)
  1111. rc = MQTTASYNC_DISCONNECTED;
  1112. else if (m->createOptions->sendWhileDisconnected == 0)
  1113. rc = MQTTASYNC_DISCONNECTED;
  1114. else if (m->shouldBeConnected == 0 && (m->createOptions->struct_version < 2 || m->createOptions->allowDisconnectedSendAtAnyTime == 0))
  1115. rc = MQTTASYNC_DISCONNECTED;
  1116. }
  1117. if (rc != MQTTASYNC_SUCCESS)
  1118. goto exit;
  1119. if (!UTF8_validateString(destinationName))
  1120. rc = MQTTASYNC_BAD_UTF8_STRING;
  1121. else if (qos < 0 || qos > 2)
  1122. rc = MQTTASYNC_BAD_QOS;
  1123. else if (qos > 0 && (msgid = MQTTAsync_assignMsgId(m)) == 0)
  1124. rc = MQTTASYNC_NO_MORE_MSGIDS;
  1125. else if (m->createOptions &&
  1126. (m->createOptions->struct_version < 2 || m->createOptions->deleteOldestMessages == 0) &&
  1127. (MQTTAsync_getNoBufferedMessages(m) >= m->createOptions->maxBufferedMessages))
  1128. rc = MQTTASYNC_MAX_BUFFERED_MESSAGES;
  1129. else if (response)
  1130. {
  1131. if (m->c->MQTTVersion >= MQTTVERSION_5)
  1132. {
  1133. if (response->struct_version == 0 || response->onFailure || response->onSuccess)
  1134. rc = MQTTASYNC_BAD_MQTT_OPTION;
  1135. }
  1136. else if (m->c->MQTTVersion < MQTTVERSION_5)
  1137. {
  1138. if (response->struct_version >= 1 && (response->onFailure5 || response->onSuccess5))
  1139. rc = MQTTASYNC_BAD_MQTT_OPTION;
  1140. }
  1141. }
  1142. if (rc != MQTTASYNC_SUCCESS)
  1143. goto exit;
  1144. /* Add publish request to operation queue */
  1145. if ((pub = malloc(sizeof(MQTTAsync_queuedCommand))) == NULL)
  1146. {
  1147. rc = PAHO_MEMORY_ERROR;
  1148. goto exit;
  1149. }
  1150. memset(pub, '\0', sizeof(MQTTAsync_queuedCommand));
  1151. pub->client = m;
  1152. pub->command.type = PUBLISH;
  1153. pub->command.token = msgid;
  1154. if (response)
  1155. {
  1156. pub->command.onSuccess = response->onSuccess;
  1157. pub->command.onFailure = response->onFailure;
  1158. pub->command.onSuccess5 = response->onSuccess5;
  1159. pub->command.onFailure5 = response->onFailure5;
  1160. pub->command.context = response->context;
  1161. response->token = pub->command.token;
  1162. if (m->c->MQTTVersion >= MQTTVERSION_5)
  1163. pub->command.properties = MQTTProperties_copy(&response->properties);
  1164. }
  1165. if ((pub->command.details.pub.destinationName = MQTTStrdup(destinationName)) == NULL)
  1166. {
  1167. free(pub);
  1168. rc = PAHO_MEMORY_ERROR;
  1169. goto exit;
  1170. }
  1171. pub->command.details.pub.payloadlen = payloadlen;
  1172. if ((pub->command.details.pub.payload = malloc(payloadlen)) == NULL)
  1173. {
  1174. free(pub->command.details.pub.destinationName);
  1175. free(pub);
  1176. rc = PAHO_MEMORY_ERROR;
  1177. goto exit;
  1178. }
  1179. memcpy(pub->command.details.pub.payload, payload, payloadlen);
  1180. pub->command.details.pub.qos = qos;
  1181. pub->command.details.pub.retained = retained;
  1182. rc = MQTTAsync_addCommand(pub, sizeof(pub));
  1183. exit:
  1184. FUNC_EXIT_RC(rc);
  1185. return rc;
  1186. }
  1187. int MQTTAsync_sendMessage(MQTTAsync handle, const char* destinationName, const MQTTAsync_message* message,
  1188. MQTTAsync_responseOptions* response)
  1189. {
  1190. int rc = MQTTASYNC_SUCCESS;
  1191. MQTTAsyncs* m = handle;
  1192. FUNC_ENTRY;
  1193. if (message == NULL)
  1194. {
  1195. rc = MQTTASYNC_NULL_PARAMETER;
  1196. goto exit;
  1197. }
  1198. if (strncmp(message->struct_id, "MQTM", 4) != 0 ||
  1199. (message->struct_version != 0 && message->struct_version != 1))
  1200. {
  1201. rc = MQTTASYNC_BAD_STRUCTURE;
  1202. goto exit;
  1203. }
  1204. if (m->c->MQTTVersion >= MQTTVERSION_5 && response)
  1205. response->properties = message->properties;
  1206. rc = MQTTAsync_send(handle, destinationName, message->payloadlen, message->payload,
  1207. message->qos, message->retained, response);
  1208. exit:
  1209. FUNC_EXIT_RC(rc);
  1210. return rc;
  1211. }
  1212. int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions* options)
  1213. {
  1214. if (options != NULL && (strncmp(options->struct_id, "MQTD", 4) != 0 || options->struct_version < 0 || options->struct_version > 1))
  1215. return MQTTASYNC_BAD_STRUCTURE;
  1216. else
  1217. return MQTTAsync_disconnect1(handle, options, 0);
  1218. }
  1219. int MQTTAsync_isConnected(MQTTAsync handle)
  1220. {
  1221. MQTTAsyncs* m = handle;
  1222. int rc = 0;
  1223. FUNC_ENTRY;
  1224. MQTTAsync_lock_mutex(mqttasync_mutex);
  1225. if (m && m->c)
  1226. rc = m->c->connected;
  1227. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1228. FUNC_EXIT_RC(rc);
  1229. return rc;
  1230. }
  1231. int MQTTAsync_isComplete(MQTTAsync handle, MQTTAsync_token dt)
  1232. {
  1233. int rc = MQTTASYNC_SUCCESS;
  1234. MQTTAsyncs* m = handle;
  1235. ListElement* current = NULL;
  1236. FUNC_ENTRY;
  1237. MQTTAsync_lock_mutex(mqttasync_mutex);
  1238. if (m == NULL)
  1239. {
  1240. rc = MQTTASYNC_FAILURE;
  1241. goto exit;
  1242. }
  1243. /* First check unprocessed commands */
  1244. current = NULL;
  1245. while (ListNextElement(MQTTAsync_commands, &current))
  1246. {
  1247. MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)(current->content);
  1248. if (cmd->client == m && cmd->command.token == dt)
  1249. goto exit;
  1250. }
  1251. /* Now check the inflight messages */
  1252. if (m->c && m->c->outboundMsgs->count > 0)
  1253. {
  1254. current = NULL;
  1255. while (ListNextElement(m->c->outboundMsgs, &current))
  1256. {
  1257. Messages* m = (Messages*)(current->content);
  1258. if (m->msgid == dt)
  1259. goto exit;
  1260. }
  1261. }
  1262. rc = MQTTASYNC_TRUE; /* Can't find it, so it must be complete */
  1263. exit:
  1264. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1265. FUNC_EXIT_RC(rc);
  1266. return rc;
  1267. }
  1268. int MQTTAsync_waitForCompletion(MQTTAsync handle, MQTTAsync_token dt, unsigned long timeout)
  1269. {
  1270. int rc = MQTTASYNC_FAILURE;
  1271. START_TIME_TYPE start = MQTTTime_start_clock();
  1272. ELAPSED_TIME_TYPE elapsed = 0L;
  1273. MQTTAsyncs* m = handle;
  1274. FUNC_ENTRY;
  1275. MQTTAsync_lock_mutex(mqttasync_mutex);
  1276. if (m == NULL || m->c == NULL)
  1277. {
  1278. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1279. rc = MQTTASYNC_FAILURE;
  1280. goto exit;
  1281. }
  1282. if (m->c->connected == 0)
  1283. {
  1284. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1285. rc = MQTTASYNC_DISCONNECTED;
  1286. goto exit;
  1287. }
  1288. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1289. if (MQTTAsync_isComplete(handle, dt) == 1)
  1290. {
  1291. rc = MQTTASYNC_SUCCESS; /* well we couldn't find it */
  1292. goto exit;
  1293. }
  1294. elapsed = MQTTTime_elapsed(start);
  1295. while (elapsed < timeout && rc == MQTTASYNC_FAILURE)
  1296. {
  1297. MQTTTime_sleep(100);
  1298. if (MQTTAsync_isComplete(handle, dt) == 1)
  1299. rc = MQTTASYNC_SUCCESS; /* well we couldn't find it */
  1300. MQTTAsync_lock_mutex(mqttasync_mutex);
  1301. if (m->c->connected == 0)
  1302. rc = MQTTASYNC_DISCONNECTED;
  1303. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1304. elapsed = MQTTTime_elapsed(start);
  1305. }
  1306. exit:
  1307. FUNC_EXIT_RC(rc);
  1308. return rc;
  1309. }
  1310. int MQTTAsync_getPendingTokens(MQTTAsync handle, MQTTAsync_token **tokens)
  1311. {
  1312. int rc = MQTTASYNC_SUCCESS;
  1313. MQTTAsyncs* m = handle;
  1314. ListElement* current = NULL;
  1315. int count = 0;
  1316. FUNC_ENTRY;
  1317. MQTTAsync_lock_mutex(mqttasync_mutex);
  1318. MQTTAsync_lock_mutex(mqttcommand_mutex);
  1319. *tokens = NULL;
  1320. if (m == NULL)
  1321. {
  1322. rc = MQTTASYNC_FAILURE;
  1323. goto exit;
  1324. }
  1325. /* calculate the number of pending tokens - commands plus inflight */
  1326. while (ListNextElement(MQTTAsync_commands, &current))
  1327. {
  1328. MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)(current->content);
  1329. if (cmd->client == m && cmd->command.type == PUBLISH)
  1330. count++;
  1331. }
  1332. if (m->c)
  1333. count += m->c->outboundMsgs->count;
  1334. if (count == 0)
  1335. goto exit; /* no tokens to return */
  1336. *tokens = malloc(sizeof(MQTTAsync_token) * (count + 1)); /* add space for sentinel at end of list */
  1337. if (!*tokens)
  1338. {
  1339. rc = PAHO_MEMORY_ERROR;
  1340. goto exit;
  1341. }
  1342. /* First add the unprocessed commands to the pending tokens */
  1343. current = NULL;
  1344. count = 0;
  1345. while (ListNextElement(MQTTAsync_commands, &current))
  1346. {
  1347. MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)(current->content);
  1348. if (cmd->client == m && cmd->command.type == PUBLISH)
  1349. (*tokens)[count++] = cmd->command.token;
  1350. }
  1351. /* Now add the inflight messages */
  1352. if (m->c && m->c->outboundMsgs->count > 0)
  1353. {
  1354. current = NULL;
  1355. while (ListNextElement(m->c->outboundMsgs, &current))
  1356. {
  1357. Messages* m = (Messages*)(current->content);
  1358. (*tokens)[count++] = m->msgid;
  1359. }
  1360. }
  1361. (*tokens)[count] = -1; /* indicate end of list */
  1362. exit:
  1363. MQTTAsync_unlock_mutex(mqttcommand_mutex);
  1364. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1365. FUNC_EXIT_RC(rc);
  1366. return rc;
  1367. }
  1368. int MQTTAsync_setCallbacks(MQTTAsync handle, void* context,
  1369. MQTTAsync_connectionLost* cl,
  1370. MQTTAsync_messageArrived* ma,
  1371. MQTTAsync_deliveryComplete* dc)
  1372. {
  1373. int rc = MQTTASYNC_SUCCESS;
  1374. MQTTAsyncs* m = handle;
  1375. FUNC_ENTRY;
  1376. MQTTAsync_lock_mutex(mqttasync_mutex);
  1377. if (m == NULL || ma == NULL || m->c == NULL || m->c->connect_state != NOT_IN_PROGRESS)
  1378. rc = MQTTASYNC_FAILURE;
  1379. else
  1380. {
  1381. m->clContext = m->maContext = m->dcContext = context;
  1382. m->cl = cl;
  1383. m->ma = ma;
  1384. m->dc = dc;
  1385. }
  1386. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1387. FUNC_EXIT_RC(rc);
  1388. return rc;
  1389. }
  1390. int MQTTAsync_setConnectionLostCallback(MQTTAsync handle, void* context,
  1391. MQTTAsync_connectionLost* cl)
  1392. {
  1393. int rc = MQTTASYNC_SUCCESS;
  1394. MQTTAsyncs* m = handle;
  1395. FUNC_ENTRY;
  1396. MQTTAsync_lock_mutex(mqttasync_mutex);
  1397. if (m == NULL || m->c->connect_state != 0)
  1398. rc = MQTTASYNC_FAILURE;
  1399. else
  1400. {
  1401. m->clContext = context;
  1402. m->cl = cl;
  1403. }
  1404. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1405. FUNC_EXIT_RC(rc);
  1406. return rc;
  1407. }
  1408. int MQTTAsync_setMessageArrivedCallback(MQTTAsync handle, void* context,
  1409. MQTTAsync_messageArrived* ma)
  1410. {
  1411. int rc = MQTTASYNC_SUCCESS;
  1412. MQTTAsyncs* m = handle;
  1413. FUNC_ENTRY;
  1414. MQTTAsync_lock_mutex(mqttasync_mutex);
  1415. if (m == NULL || ma == NULL || m->c->connect_state != 0)
  1416. rc = MQTTASYNC_FAILURE;
  1417. else
  1418. {
  1419. m->maContext = context;
  1420. m->ma = ma;
  1421. }
  1422. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1423. FUNC_EXIT_RC(rc);
  1424. return rc;
  1425. }
  1426. int MQTTAsync_setDeliveryCompleteCallback(MQTTAsync handle, void* context,
  1427. MQTTAsync_deliveryComplete* dc)
  1428. {
  1429. int rc = MQTTASYNC_SUCCESS;
  1430. MQTTAsyncs* m = handle;
  1431. FUNC_ENTRY;
  1432. MQTTAsync_lock_mutex(mqttasync_mutex);
  1433. if (m == NULL || m->c->connect_state != 0)
  1434. rc = MQTTASYNC_FAILURE;
  1435. else
  1436. {
  1437. m->dcContext = context;
  1438. m->dc = dc;
  1439. }
  1440. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1441. FUNC_EXIT_RC(rc);
  1442. return rc;
  1443. }
  1444. int MQTTAsync_setDisconnected(MQTTAsync handle, void* context, MQTTAsync_disconnected* disconnected)
  1445. {
  1446. int rc = MQTTASYNC_SUCCESS;
  1447. MQTTAsyncs* m = handle;
  1448. FUNC_ENTRY;
  1449. MQTTAsync_lock_mutex(mqttasync_mutex);
  1450. if (m == NULL || m->c->connect_state != NOT_IN_PROGRESS)
  1451. rc = MQTTASYNC_FAILURE;
  1452. else
  1453. {
  1454. m->disconnected_context = context;
  1455. m->disconnected = disconnected;
  1456. }
  1457. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1458. FUNC_EXIT_RC(rc);
  1459. return rc;
  1460. }
  1461. int MQTTAsync_setConnected(MQTTAsync handle, void* context, MQTTAsync_connected* connected)
  1462. {
  1463. int rc = MQTTASYNC_SUCCESS;
  1464. MQTTAsyncs* m = handle;
  1465. FUNC_ENTRY;
  1466. MQTTAsync_lock_mutex(mqttasync_mutex);
  1467. if (m == NULL || m->c->connect_state != NOT_IN_PROGRESS)
  1468. rc = MQTTASYNC_FAILURE;
  1469. else
  1470. {
  1471. m->connected_context = context;
  1472. m->connected = connected;
  1473. }
  1474. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1475. FUNC_EXIT_RC(rc);
  1476. return rc;
  1477. }
  1478. int MQTTAsync_setUpdateConnectOptions(MQTTAsync handle, void* context, MQTTAsync_updateConnectOptions* updateOptions)
  1479. {
  1480. int rc = MQTTASYNC_SUCCESS;
  1481. MQTTAsyncs* m = handle;
  1482. FUNC_ENTRY;
  1483. MQTTAsync_lock_mutex(mqttasync_mutex);
  1484. if (m == NULL)
  1485. rc = MQTTASYNC_FAILURE;
  1486. else
  1487. {
  1488. m->updateConnectOptions_context = context;
  1489. m->updateConnectOptions = updateOptions;
  1490. }
  1491. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1492. FUNC_EXIT_RC(rc);
  1493. return rc;
  1494. }
  1495. int MQTTAsync_setBeforePersistenceWrite(MQTTAsync handle, void* context, MQTTPersistence_beforeWrite* co)
  1496. {
  1497. int rc = MQTTASYNC_SUCCESS;
  1498. MQTTAsyncs* m = handle;
  1499. FUNC_ENTRY;
  1500. MQTTAsync_lock_mutex(mqttasync_mutex);
  1501. if (m == NULL)
  1502. rc = MQTTASYNC_FAILURE;
  1503. else
  1504. {
  1505. m->c->beforeWrite = co;
  1506. m->c->beforeWrite_context = context;
  1507. }
  1508. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1509. FUNC_EXIT_RC(rc);
  1510. return rc;
  1511. }
  1512. int MQTTAsync_setAfterPersistenceRead(MQTTAsync handle, void* context, MQTTPersistence_afterRead* co)
  1513. {
  1514. int rc = MQTTASYNC_SUCCESS;
  1515. MQTTAsyncs* m = handle;
  1516. FUNC_ENTRY;
  1517. MQTTAsync_lock_mutex(mqttasync_mutex);
  1518. if (m == NULL)
  1519. rc = MQTTASYNC_FAILURE;
  1520. else
  1521. {
  1522. m->c->afterRead = co;
  1523. m->c->afterRead_context = context;
  1524. }
  1525. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1526. FUNC_EXIT_RC(rc);
  1527. return rc;
  1528. }
  1529. void MQTTAsync_setTraceLevel(enum MQTTASYNC_TRACE_LEVELS level)
  1530. {
  1531. Log_setTraceLevel((enum LOG_LEVELS)level);
  1532. }
  1533. void MQTTAsync_setTraceCallback(MQTTAsync_traceCallback* callback)
  1534. {
  1535. Log_setTraceCallback((Log_traceCallback*)callback);
  1536. }
  1537. MQTTAsync_nameValue* MQTTAsync_getVersionInfo(void)
  1538. {
  1539. #define MAX_INFO_STRINGS 8
  1540. static MQTTAsync_nameValue libinfo[MAX_INFO_STRINGS + 1];
  1541. int i = 0;
  1542. libinfo[i].name = "Product name";
  1543. libinfo[i++].value = "Eclipse Paho Asynchronous MQTT C Client Library";
  1544. libinfo[i].name = "Version";
  1545. libinfo[i++].value = CLIENT_VERSION;
  1546. libinfo[i].name = "Build level";
  1547. libinfo[i++].value = BUILD_TIMESTAMP;
  1548. #if defined(OPENSSL)
  1549. libinfo[i].name = "OpenSSL version";
  1550. libinfo[i++].value = SSLeay_version(SSLEAY_VERSION);
  1551. libinfo[i].name = "OpenSSL flags";
  1552. libinfo[i++].value = SSLeay_version(SSLEAY_CFLAGS);
  1553. libinfo[i].name = "OpenSSL build timestamp";
  1554. libinfo[i++].value = SSLeay_version(SSLEAY_BUILT_ON);
  1555. libinfo[i].name = "OpenSSL platform";
  1556. libinfo[i++].value = SSLeay_version(SSLEAY_PLATFORM);
  1557. libinfo[i].name = "OpenSSL directory";
  1558. libinfo[i++].value = SSLeay_version(SSLEAY_DIR);
  1559. #endif
  1560. libinfo[i].name = NULL;
  1561. libinfo[i].value = NULL;
  1562. return libinfo;
  1563. }
  1564. const char* MQTTAsync_strerror(int code)
  1565. {
  1566. static char buf[30];
  1567. int chars = 0;
  1568. switch (code) {
  1569. case MQTTASYNC_SUCCESS:
  1570. return "Success";
  1571. case MQTTASYNC_FAILURE:
  1572. return "Failure";
  1573. case MQTTASYNC_PERSISTENCE_ERROR:
  1574. return "Persistence error";
  1575. case MQTTASYNC_DISCONNECTED:
  1576. return "Disconnected";
  1577. case MQTTASYNC_MAX_MESSAGES_INFLIGHT:
  1578. return "Maximum in-flight messages amount reached";
  1579. case MQTTASYNC_BAD_UTF8_STRING:
  1580. return "Invalid UTF8 string";
  1581. case MQTTASYNC_NULL_PARAMETER:
  1582. return "Invalid (NULL) parameter";
  1583. case MQTTASYNC_TOPICNAME_TRUNCATED:
  1584. return "Topic containing NULL characters has been truncated";
  1585. case MQTTASYNC_BAD_STRUCTURE:
  1586. return "Bad structure";
  1587. case MQTTASYNC_BAD_QOS:
  1588. return "Invalid QoS value";
  1589. case MQTTASYNC_NO_MORE_MSGIDS:
  1590. return "Too many pending commands";
  1591. case MQTTASYNC_OPERATION_INCOMPLETE:
  1592. return "Operation discarded before completion";
  1593. case MQTTASYNC_MAX_BUFFERED_MESSAGES:
  1594. return "No more messages can be buffered";
  1595. case MQTTASYNC_SSL_NOT_SUPPORTED:
  1596. return "SSL is not supported";
  1597. case MQTTASYNC_BAD_PROTOCOL:
  1598. return "Invalid protocol scheme";
  1599. case MQTTASYNC_BAD_MQTT_OPTION:
  1600. return "Options for wrong MQTT version";
  1601. case MQTTASYNC_WRONG_MQTT_VERSION:
  1602. return "Client created for another version of MQTT";
  1603. case MQTTASYNC_0_LEN_WILL_TOPIC:
  1604. return "Zero length will topic on connect";
  1605. case MQTTASYNC_COMMAND_IGNORED:
  1606. return "Connect or disconnect command ignored";
  1607. case MQTTASYNC_MAX_BUFFERED:
  1608. return "maxBufferedMessages in the connect options must be >= 0";
  1609. }
  1610. chars = snprintf(buf, sizeof(buf), "Unknown error code %d", code);
  1611. if (chars >= sizeof(buf))
  1612. {
  1613. buf[sizeof(buf)-1] = '\0';
  1614. Log(LOG_ERROR, 0, "Error writing %d chars with snprintf", chars);
  1615. }
  1616. return buf;
  1617. }
  1618. void MQTTAsync_freeMessage(MQTTAsync_message** message)
  1619. {
  1620. FUNC_ENTRY;
  1621. MQTTProperties_free(&(*message)->properties);
  1622. free((*message)->payload);
  1623. free(*message);
  1624. *message = NULL;
  1625. FUNC_EXIT;
  1626. }
  1627. void MQTTAsync_free(void* memory)
  1628. {
  1629. FUNC_ENTRY;
  1630. free(memory);
  1631. FUNC_EXIT;
  1632. }
  1633. void* MQTTAsync_malloc(size_t size)
  1634. {
  1635. void* val;
  1636. int rc = 0;
  1637. FUNC_ENTRY;
  1638. val = malloc(size);
  1639. rc = (val != NULL);
  1640. FUNC_EXIT_RC(rc);
  1641. return val;
  1642. }
  1643. static void MQTTAsync_freeServerURIs(MQTTAsyncs* m)
  1644. {
  1645. int i;
  1646. for (i = 0; i < m->serverURIcount; ++i)
  1647. free(m->serverURIs[i]);
  1648. m->serverURIcount = 0;
  1649. if (m->serverURIs)
  1650. free(m->serverURIs);
  1651. m->serverURIs = NULL;
  1652. }