MQTTPersistence.c 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908
  1. /*******************************************************************************
  2. * Copyright (c) 2009, 2022 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial API and implementation and/or initial documentation
  15. * Ian Craggs - async client updates
  16. * Ian Craggs - fix for bug 432903 - queue persistence
  17. * Ian Craggs - MQTT V5 updates
  18. *******************************************************************************/
  19. /**
  20. * @file
  21. * \brief Functions that apply to persistence operations.
  22. *
  23. */
  24. #include <stdio.h>
  25. #include <string.h>
  26. #include "MQTTPersistence.h"
  27. #include "MQTTPersistenceDefault.h"
  28. #include "MQTTProtocolClient.h"
  29. #include "Heap.h"
  30. #if defined(_WIN32) || defined(_WIN64)
  31. #define snprintf _snprintf
  32. #endif
  33. static MQTTPersistence_qEntry* MQTTPersistence_restoreQueueEntry(char* buffer, size_t buflen, int MQTTVersion);
  34. static void MQTTPersistence_insertInSeqOrder(List* list, MQTTPersistence_qEntry* qEntry, size_t size);
  35. /**
  36. * Creates a ::MQTTClient_persistence structure representing a persistence implementation.
  37. * @param persistence the ::MQTTClient_persistence structure.
  38. * @param type the type of the persistence implementation. See ::MQTTClient_create.
  39. * @param pcontext the context for this persistence implementation. See ::MQTTClient_create.
  40. * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
  41. */
  42. #include "StackTrace.h"
  43. int MQTTPersistence_create(MQTTClient_persistence** persistence, int type, void* pcontext)
  44. {
  45. int rc = 0;
  46. MQTTClient_persistence* per = NULL;
  47. FUNC_ENTRY;
  48. #if !defined(NO_PERSISTENCE)
  49. switch (type)
  50. {
  51. case MQTTCLIENT_PERSISTENCE_NONE :
  52. per = NULL;
  53. break;
  54. case MQTTCLIENT_PERSISTENCE_DEFAULT :
  55. per = malloc(sizeof(MQTTClient_persistence));
  56. if ( per != NULL )
  57. {
  58. if ( pcontext == NULL )
  59. pcontext = "."; /* working directory */
  60. if ((per->context = malloc(strlen(pcontext) + 1)) == NULL)
  61. {
  62. free(per);
  63. rc = PAHO_MEMORY_ERROR;
  64. goto exit;
  65. }
  66. strcpy(per->context, pcontext);
  67. /* file system functions */
  68. per->popen = pstopen;
  69. per->pclose = pstclose;
  70. per->pput = pstput;
  71. per->pget = pstget;
  72. per->premove = pstremove;
  73. per->pkeys = pstkeys;
  74. per->pclear = pstclear;
  75. per->pcontainskey = pstcontainskey;
  76. }
  77. else
  78. rc = PAHO_MEMORY_ERROR;
  79. break;
  80. case MQTTCLIENT_PERSISTENCE_USER :
  81. per = (MQTTClient_persistence *)pcontext;
  82. if ( per == NULL || (per != NULL && (per->context == NULL || per->pclear == NULL ||
  83. per->pclose == NULL || per->pcontainskey == NULL || per->pget == NULL || per->pkeys == NULL ||
  84. per->popen == NULL || per->pput == NULL || per->premove == NULL)) )
  85. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  86. break;
  87. default:
  88. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  89. break;
  90. }
  91. #endif
  92. *persistence = per;
  93. exit:
  94. FUNC_EXIT_RC(rc);
  95. return rc;
  96. }
  97. /**
  98. * Open persistent store and restore any persisted messages.
  99. * @param client the client as ::Clients.
  100. * @param serverURI the URI of the remote end.
  101. * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
  102. */
  103. int MQTTPersistence_initialize(Clients *c, const char *serverURI)
  104. {
  105. int rc = 0;
  106. FUNC_ENTRY;
  107. if ( c->persistence != NULL )
  108. {
  109. rc = c->persistence->popen(&(c->phandle), c->clientID, serverURI, c->persistence->context);
  110. if ( rc == 0 )
  111. rc = MQTTPersistence_restorePackets(c);
  112. }
  113. FUNC_EXIT_RC(rc);
  114. return rc;
  115. }
  116. /**
  117. * Close persistent store.
  118. * @param client the client as ::Clients.
  119. * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
  120. */
  121. int MQTTPersistence_close(Clients *c)
  122. {
  123. int rc = 0;
  124. FUNC_ENTRY;
  125. #if !defined(NO_PERSISTENCE)
  126. if (c->persistence != NULL)
  127. {
  128. rc = c->persistence->pclose(c->phandle);
  129. if (c->persistence->popen == pstopen) {
  130. if (c->persistence->context)
  131. free(c->persistence->context);
  132. free(c->persistence);
  133. }
  134. c->phandle = NULL;
  135. c->persistence = NULL;
  136. }
  137. #endif
  138. FUNC_EXIT_RC(rc);
  139. return rc;
  140. }
  141. /**
  142. * Clears the persistent store.
  143. * @param client the client as ::Clients.
  144. * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
  145. */
  146. int MQTTPersistence_clear(Clients *c)
  147. {
  148. int rc = 0;
  149. FUNC_ENTRY;
  150. if (c->persistence != NULL)
  151. rc = c->persistence->pclear(c->phandle);
  152. FUNC_EXIT_RC(rc);
  153. return rc;
  154. }
  155. /**
  156. * Restores the persisted records to the outbound and inbound message queues of the
  157. * client.
  158. * @param client the client as ::Clients.
  159. * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
  160. */
  161. int MQTTPersistence_restorePackets(Clients *c)
  162. {
  163. int rc = 0;
  164. char **msgkeys = NULL,
  165. *buffer = NULL;
  166. int nkeys, buflen;
  167. int i = 0;
  168. int msgs_sent = 0;
  169. int msgs_rcvd = 0;
  170. FUNC_ENTRY;
  171. if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0)
  172. {
  173. while (rc == 0 && i < nkeys)
  174. {
  175. if (strncmp(msgkeys[i], PERSISTENCE_COMMAND_KEY, strlen(PERSISTENCE_COMMAND_KEY)) == 0 ||
  176. strncmp(msgkeys[i], PERSISTENCE_V5_COMMAND_KEY, strlen(PERSISTENCE_V5_COMMAND_KEY)) == 0)
  177. {
  178. ;
  179. }
  180. else if (strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, strlen(PERSISTENCE_QUEUE_KEY)) == 0 ||
  181. strncmp(msgkeys[i], PERSISTENCE_V5_QUEUE_KEY, strlen(PERSISTENCE_V5_QUEUE_KEY)) == 0)
  182. {
  183. ;
  184. }
  185. else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0 &&
  186. (c->afterRead == NULL || (rc = c->afterRead(c->afterRead_context, &buffer, &buflen)) == 0))
  187. {
  188. int data_MQTTVersion = MQTTVERSION_3_1_1;
  189. char* cur_key = msgkeys[i];
  190. MQTTPacket* pack = NULL;
  191. if (strncmp(cur_key, PERSISTENCE_V5_PUBLISH_RECEIVED,
  192. strlen(PERSISTENCE_V5_PUBLISH_RECEIVED)) == 0)
  193. {
  194. data_MQTTVersion = MQTTVERSION_5;
  195. cur_key = PERSISTENCE_PUBLISH_RECEIVED;
  196. }
  197. else if (strncmp(cur_key, PERSISTENCE_V5_PUBLISH_SENT,
  198. strlen(PERSISTENCE_V5_PUBLISH_SENT)) == 0)
  199. {
  200. data_MQTTVersion = MQTTVERSION_5;
  201. cur_key = PERSISTENCE_PUBLISH_SENT;
  202. }
  203. else if (strncmp(cur_key, PERSISTENCE_V5_PUBREL,
  204. strlen(PERSISTENCE_V5_PUBREL)) == 0)
  205. {
  206. data_MQTTVersion = MQTTVERSION_5;
  207. cur_key = PERSISTENCE_PUBREL;
  208. }
  209. if (data_MQTTVersion == MQTTVERSION_5 && c->MQTTVersion < MQTTVERSION_5)
  210. {
  211. rc = MQTTCLIENT_PERSISTENCE_ERROR; /* can't restore version 5 data with a version 3 client */
  212. goto exit;
  213. }
  214. pack = MQTTPersistence_restorePacket(data_MQTTVersion, buffer, buflen);
  215. if ( pack != NULL )
  216. {
  217. if (strncmp(cur_key, PERSISTENCE_PUBLISH_RECEIVED,
  218. strlen(PERSISTENCE_PUBLISH_RECEIVED)) == 0)
  219. {
  220. Publish* publish = (Publish*)pack;
  221. Messages* msg = NULL;
  222. publish->MQTTVersion = c->MQTTVersion;
  223. msg = MQTTProtocol_createMessage(publish, &msg, publish->header.bits.qos, publish->header.bits.retain, 1);
  224. msg->nextMessageType = PUBREL;
  225. /* order does not matter for persisted received messages */
  226. ListAppend(c->inboundMsgs, msg, msg->len);
  227. if (c->MQTTVersion >= MQTTVERSION_5)
  228. {
  229. free(msg->publish->payload);
  230. free(msg->publish->topic);
  231. msg->publish->payload = msg->publish->topic = NULL;
  232. }
  233. publish->topic = NULL;
  234. MQTTPacket_freePublish(publish);
  235. msgs_rcvd++;
  236. }
  237. else if (strncmp(cur_key, PERSISTENCE_PUBLISH_SENT,
  238. strlen(PERSISTENCE_PUBLISH_SENT)) == 0)
  239. {
  240. Publish* publish = (Publish*)pack;
  241. Messages* msg = NULL;
  242. const size_t keysize = PERSISTENCE_MAX_KEY_LENGTH + 1;
  243. char *key = malloc(keysize);
  244. int chars = 0;
  245. if (!key)
  246. {
  247. rc = PAHO_MEMORY_ERROR;
  248. goto exit;
  249. }
  250. publish->MQTTVersion = c->MQTTVersion;
  251. if (publish->MQTTVersion >= MQTTVERSION_5)
  252. chars = snprintf(key, keysize, "%s%d", PERSISTENCE_V5_PUBREL, publish->msgId);
  253. else
  254. chars = snprintf(key, keysize, "%s%d", PERSISTENCE_PUBREL, publish->msgId);
  255. if (chars >= keysize)
  256. {
  257. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  258. Log(LOG_ERROR, 0, "Error writing %d chars with snprintf", chars);
  259. }
  260. else
  261. {
  262. msg = MQTTProtocol_createMessage(publish, &msg, publish->header.bits.qos, publish->header.bits.retain, 1);
  263. if (c->persistence->pcontainskey(c->phandle, key) == 0)
  264. /* PUBLISH Qo2 and PUBREL sent */
  265. msg->nextMessageType = PUBCOMP;
  266. /* else: PUBLISH QoS1, or PUBLISH QoS2 and PUBREL not sent */
  267. /* retry at the first opportunity */
  268. memset(&msg->lastTouch, '\0', sizeof(msg->lastTouch));
  269. MQTTPersistence_insertInOrder(c->outboundMsgs, msg, msg->len);
  270. publish->topic = NULL;
  271. MQTTPacket_freePublish(publish);
  272. msgs_sent++;
  273. }
  274. free(key);
  275. }
  276. else if (strncmp(cur_key, PERSISTENCE_PUBREL, strlen(PERSISTENCE_PUBREL)) == 0)
  277. {
  278. /* orphaned PUBRELs ? */
  279. Pubrel* pubrel = (Pubrel*)pack;
  280. const size_t keysize = PERSISTENCE_MAX_KEY_LENGTH + 1;
  281. char *key = malloc(keysize);
  282. int chars = 0;
  283. if (!key)
  284. {
  285. rc = PAHO_MEMORY_ERROR;
  286. goto exit;
  287. }
  288. pubrel->MQTTVersion = c->MQTTVersion;
  289. if (pubrel->MQTTVersion >= MQTTVERSION_5)
  290. chars = snprintf(key, keysize, "%s%d", PERSISTENCE_V5_PUBLISH_SENT, pubrel->msgId);
  291. else
  292. chars = snprintf(key, keysize, "%s%d", PERSISTENCE_PUBLISH_SENT, pubrel->msgId);
  293. if (chars >= keysize)
  294. {
  295. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  296. Log(LOG_ERROR, 0, "Error writing %d chars with snprintf", chars);
  297. }
  298. else if (c->persistence->pcontainskey(c->phandle, key) != 0)
  299. rc = c->persistence->premove(c->phandle, msgkeys[i]);
  300. free(pubrel);
  301. free(key);
  302. }
  303. }
  304. else /* pack == NULL -> bad persisted record */
  305. rc = c->persistence->premove(c->phandle, msgkeys[i]);
  306. }
  307. if (buffer)
  308. {
  309. free(buffer);
  310. buffer = NULL;
  311. }
  312. if (msgkeys[i])
  313. free(msgkeys[i]);
  314. i++;
  315. }
  316. if (msgkeys)
  317. free(msgkeys);
  318. }
  319. Log(TRACE_MINIMUM, -1, "%d sent messages and %d received messages restored for client %s\n",
  320. msgs_sent, msgs_rcvd, c->clientID);
  321. MQTTPersistence_wrapMsgID(c);
  322. exit:
  323. FUNC_EXIT_RC(rc);
  324. return rc;
  325. }
  326. /**
  327. * Returns a MQTT packet restored from persisted data.
  328. * @param buffer the persisted data.
  329. * @param buflen the number of bytes of the data buffer.
  330. */
  331. void* MQTTPersistence_restorePacket(int MQTTVersion, char* buffer, size_t buflen)
  332. {
  333. void* pack = NULL;
  334. Header header;
  335. int fixed_header_length = 1, ptype, remaining_length = 0;
  336. char c;
  337. int multiplier = 1;
  338. extern pf new_packets[];
  339. FUNC_ENTRY;
  340. header.byte = buffer[0];
  341. /* decode the message length according to the MQTT algorithm */
  342. do
  343. {
  344. c = *(++buffer);
  345. remaining_length += (c & 127) * multiplier;
  346. multiplier *= 128;
  347. fixed_header_length++;
  348. } while ((c & 128) != 0);
  349. if ( (fixed_header_length + remaining_length) == buflen )
  350. {
  351. ptype = header.bits.type;
  352. if (ptype >= CONNECT && ptype <= DISCONNECT && new_packets[ptype] != NULL)
  353. pack = (*new_packets[ptype])(MQTTVersion, header.byte, ++buffer, remaining_length);
  354. }
  355. FUNC_EXIT;
  356. return pack;
  357. }
  358. /**
  359. * Inserts the specified message into the list, maintaining message ID order.
  360. * @param list the list to insert the message into.
  361. * @param content the message to add.
  362. * @param size size of the message.
  363. */
  364. void MQTTPersistence_insertInOrder(List* list, void* content, size_t size)
  365. {
  366. ListElement* index = NULL;
  367. ListElement* current = NULL;
  368. FUNC_ENTRY;
  369. while(ListNextElement(list, &current) != NULL && index == NULL)
  370. {
  371. if ( ((Messages*)content)->msgid < ((Messages*)current->content)->msgid )
  372. index = current;
  373. }
  374. ListInsert(list, content, size, index);
  375. FUNC_EXIT;
  376. }
  377. /**
  378. * Adds a record to the persistent store. This function must not be called for QoS0
  379. * messages.
  380. * @param socket the socket of the client.
  381. * @param buf0 fixed header.
  382. * @param buf0len length of the fixed header.
  383. * @param count number of buffers representing the variable header and/or the payload.
  384. * @param buffers the buffers representing the variable header and/or the payload.
  385. * @param buflens length of the buffers representing the variable header and/or the payload.
  386. * @param htype MQTT packet type - PUBLISH or PUBREL
  387. * @param msgId the message ID.
  388. * @param scr 0 indicates message in the sending direction; 1 indicates message in the
  389. * receiving direction.
  390. * @param the MQTT version being used (>= MQTTVERSION_5 means properties included)
  391. * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
  392. */
  393. int MQTTPersistence_putPacket(SOCKET socket, char* buf0, size_t buf0len, int count,
  394. char** buffers, size_t* buflens, int htype, int msgId, int scr, int MQTTVersion)
  395. {
  396. int rc = 0;
  397. extern ClientStates* bstate;
  398. int nbufs, i;
  399. int* lens = NULL;
  400. char** bufs = NULL;
  401. char *key;
  402. Clients* client = NULL;
  403. FUNC_ENTRY;
  404. client = (Clients*)(ListFindItem(bstate->clients, &socket, clientSocketCompare)->content);
  405. if (client->persistence != NULL)
  406. {
  407. const size_t keysize = PERSISTENCE_MAX_KEY_LENGTH + 1;
  408. if ((key = malloc(keysize)) == NULL)
  409. {
  410. rc = PAHO_MEMORY_ERROR;
  411. goto exit;
  412. }
  413. nbufs = 1 + count;
  414. if ((lens = (int *)malloc(nbufs * sizeof(int))) == NULL)
  415. {
  416. free(key);
  417. rc = PAHO_MEMORY_ERROR;
  418. goto exit;
  419. }
  420. if ((bufs = (char **)malloc(nbufs * sizeof(char *))) == NULL)
  421. {
  422. free(key);
  423. free(lens);
  424. rc = PAHO_MEMORY_ERROR;
  425. goto exit;
  426. }
  427. lens[0] = (int)buf0len;
  428. bufs[0] = buf0;
  429. for (i = 0; i < count; i++)
  430. {
  431. lens[i+1] = (int)buflens[i];
  432. bufs[i+1] = buffers[i];
  433. }
  434. /* key */
  435. if (scr == 0)
  436. { /* sending */
  437. char* key_id = PERSISTENCE_PUBLISH_SENT;
  438. if (htype == PUBLISH) /* PUBLISH QoS1 and QoS2*/
  439. {
  440. if (MQTTVersion >= MQTTVERSION_5)
  441. key_id = PERSISTENCE_V5_PUBLISH_SENT;
  442. }
  443. else if (htype == PUBREL) /* PUBREL */
  444. {
  445. if (MQTTVersion >= MQTTVERSION_5)
  446. key_id = PERSISTENCE_V5_PUBREL;
  447. else
  448. key_id = PERSISTENCE_PUBREL;
  449. }
  450. if (snprintf(key, keysize, "%s%d", key_id, msgId) >= keysize)
  451. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  452. }
  453. else if (scr == 1) /* receiving PUBLISH QoS2 */
  454. {
  455. char* key_id = PERSISTENCE_PUBLISH_RECEIVED;
  456. if (MQTTVersion >= MQTTVERSION_5)
  457. key_id = PERSISTENCE_V5_PUBLISH_RECEIVED;
  458. if (snprintf(key, keysize, "%s%d", key_id, msgId) >= keysize)
  459. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  460. }
  461. if (rc == 0 && client->beforeWrite)
  462. rc = client->beforeWrite(client->beforeWrite_context, nbufs, bufs, lens);
  463. if (rc == 0)
  464. rc = client->persistence->pput(client->phandle, key, nbufs, bufs, lens);
  465. free(key);
  466. free(lens);
  467. free(bufs);
  468. }
  469. exit:
  470. FUNC_EXIT_RC(rc);
  471. return rc;
  472. }
  473. /**
  474. * Deletes a record from the persistent store.
  475. * @param client the client as ::Clients.
  476. * @param type the type of the persisted record: #PERSISTENCE_PUBLISH_SENT, #PERSISTENCE_PUBREL
  477. * or #PERSISTENCE_PUBLISH_RECEIVED.
  478. * @param qos the qos field of the message.
  479. * @param msgId the message ID.
  480. * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
  481. */
  482. int MQTTPersistence_remove(Clients* c, char *type, int qos, int msgId)
  483. {
  484. int rc = 0;
  485. FUNC_ENTRY;
  486. if (c->persistence != NULL)
  487. {
  488. const size_t keysize = PERSISTENCE_MAX_KEY_LENGTH + 1;
  489. char *key = malloc(keysize);
  490. int chars = 0;
  491. if (!key)
  492. {
  493. rc = PAHO_MEMORY_ERROR;
  494. goto exit;
  495. }
  496. if (strcmp(type, PERSISTENCE_PUBLISH_SENT) == 0 ||
  497. strcmp(type, PERSISTENCE_V5_PUBLISH_SENT) == 0)
  498. {
  499. if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_V5_PUBLISH_SENT, msgId)) >= keysize)
  500. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  501. else
  502. {
  503. rc = c->persistence->premove(c->phandle, key);
  504. if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_V5_PUBREL, msgId)) >= keysize)
  505. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  506. else
  507. {
  508. rc += c->persistence->premove(c->phandle, key);
  509. if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_PUBLISH_SENT, msgId)) >= keysize)
  510. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  511. else
  512. {
  513. rc += c->persistence->premove(c->phandle, key);
  514. if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_PUBREL, msgId)) >= keysize)
  515. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  516. else
  517. rc += c->persistence->premove(c->phandle, key);
  518. }
  519. }
  520. }
  521. }
  522. else /* PERSISTENCE_PUBLISH_SENT && qos == 1 */
  523. { /* or PERSISTENCE_PUBLISH_RECEIVED */
  524. if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_V5_PUBLISH_RECEIVED, msgId)) >= keysize)
  525. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  526. else
  527. {
  528. rc = c->persistence->premove(c->phandle, key);
  529. if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_PUBLISH_RECEIVED, msgId)) >= keysize)
  530. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  531. else
  532. rc += c->persistence->premove(c->phandle, key);
  533. }
  534. }
  535. if (rc == MQTTCLIENT_PERSISTENCE_ERROR)
  536. Log(LOG_ERROR, 0, "Error writing %d chars with snprintf", chars);
  537. free(key);
  538. }
  539. exit:
  540. FUNC_EXIT_RC(rc);
  541. return rc;
  542. }
  543. /**
  544. * Checks whether the message IDs wrapped by looking for the largest gap between two consecutive
  545. * message IDs in the outboundMsgs queue.
  546. * @param client the client as ::Clients.
  547. */
  548. void MQTTPersistence_wrapMsgID(Clients *client)
  549. {
  550. ListElement* wrapel = NULL;
  551. ListElement* current = NULL;
  552. FUNC_ENTRY;
  553. if ( client->outboundMsgs->count > 0 )
  554. {
  555. int firstMsgID = ((Messages*)client->outboundMsgs->first->content)->msgid;
  556. int lastMsgID = ((Messages*)client->outboundMsgs->last->content)->msgid;
  557. int gap = MAX_MSG_ID - lastMsgID + firstMsgID;
  558. current = ListNextElement(client->outboundMsgs, &current);
  559. while(ListNextElement(client->outboundMsgs, &current) != NULL)
  560. {
  561. int curMsgID = ((Messages*)current->content)->msgid;
  562. int curPrevMsgID = ((Messages*)current->prev->content)->msgid;
  563. int curgap = curMsgID - curPrevMsgID;
  564. if ( curgap > gap )
  565. {
  566. gap = curgap;
  567. wrapel = current;
  568. }
  569. }
  570. }
  571. if ( wrapel != NULL )
  572. {
  573. /* put wrapel at the beginning of the queue */
  574. client->outboundMsgs->first->prev = client->outboundMsgs->last;
  575. client->outboundMsgs->last->next = client->outboundMsgs->first;
  576. client->outboundMsgs->first = wrapel;
  577. client->outboundMsgs->last = wrapel->prev;
  578. client->outboundMsgs->first->prev = NULL;
  579. client->outboundMsgs->last->next = NULL;
  580. }
  581. FUNC_EXIT;
  582. }
  583. #if !defined(NO_PERSISTENCE)
  584. int MQTTPersistence_unpersistQueueEntry(Clients* client, MQTTPersistence_qEntry* qe)
  585. {
  586. int rc = 0;
  587. #if defined(_WIN32) || defined(_WIN64)
  588. #define KEYSIZE PERSISTENCE_MAX_KEY_LENGTH + 1
  589. #else
  590. const size_t KEYSIZE = PERSISTENCE_MAX_KEY_LENGTH + 1;
  591. #endif
  592. char key[KEYSIZE];
  593. int chars = 0;
  594. FUNC_ENTRY;
  595. if (client->MQTTVersion >= MQTTVERSION_5)
  596. chars = snprintf(key, KEYSIZE, "%s%u", PERSISTENCE_V5_QUEUE_KEY, qe->seqno);
  597. else
  598. chars = snprintf(key, KEYSIZE, "%s%u", PERSISTENCE_QUEUE_KEY, qe->seqno);
  599. if (chars >= KEYSIZE)
  600. {
  601. Log(LOG_ERROR, 0, "Error writing %d chars with snprintf", chars);
  602. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  603. }
  604. else if ((rc = client->persistence->premove(client->phandle, key)) != 0)
  605. Log(LOG_ERROR, 0, "Error %d removing qEntry from persistence", rc);
  606. FUNC_EXIT_RC(rc);
  607. return rc;
  608. }
  609. #define MAX_NO_OF_BUFFERS 9
  610. int MQTTPersistence_persistQueueEntry(Clients* aclient, MQTTPersistence_qEntry* qe)
  611. {
  612. int rc = 0;
  613. int bufindex = 0;
  614. #if !defined(_WIN32) && !defined(_WIN64)
  615. const size_t KEYSIZE = PERSISTENCE_MAX_KEY_LENGTH + 1;
  616. #endif
  617. char key[KEYSIZE];
  618. int chars = 0;
  619. int lens[MAX_NO_OF_BUFFERS];
  620. void* bufs[MAX_NO_OF_BUFFERS];
  621. int props_allocated = 0;
  622. FUNC_ENTRY;
  623. bufs[bufindex] = &qe->msg->payloadlen;
  624. lens[bufindex++] = sizeof(qe->msg->payloadlen);
  625. bufs[bufindex] = qe->msg->payload;
  626. lens[bufindex++] = qe->msg->payloadlen;
  627. bufs[bufindex] = &qe->msg->qos;
  628. lens[bufindex++] = sizeof(qe->msg->qos);
  629. bufs[bufindex] = &qe->msg->retained;
  630. lens[bufindex++] = sizeof(qe->msg->retained);
  631. bufs[bufindex] = &qe->msg->dup;
  632. lens[bufindex++] = sizeof(qe->msg->dup);
  633. bufs[bufindex] = &qe->msg->msgid;
  634. lens[bufindex++] = sizeof(qe->msg->msgid);
  635. bufs[bufindex] = qe->topicName;
  636. lens[bufindex++] = (int)strlen(qe->topicName) + 1;
  637. bufs[bufindex] = &qe->topicLen;
  638. lens[bufindex++] = sizeof(qe->topicLen);
  639. if (++aclient->qentry_seqno == PERSISTENCE_SEQNO_LIMIT)
  640. aclient->qentry_seqno = 0;
  641. if (aclient->MQTTVersion >= MQTTVERSION_5) /* persist properties */
  642. {
  643. MQTTProperties no_props = MQTTProperties_initializer;
  644. MQTTProperties* props = &no_props;
  645. int temp_len = 0;
  646. char* ptr = NULL;
  647. if (qe->msg->struct_version >= 1)
  648. props = &qe->msg->properties;
  649. temp_len = MQTTProperties_len(props);
  650. ptr = bufs[bufindex] = malloc(temp_len);
  651. if (!ptr)
  652. {
  653. rc = PAHO_MEMORY_ERROR;
  654. goto exit;
  655. }
  656. props_allocated = bufindex;
  657. rc = MQTTProperties_write(&ptr, props);
  658. lens[bufindex++] = temp_len;
  659. chars = snprintf(key, KEYSIZE, "%s%u", PERSISTENCE_V5_QUEUE_KEY, aclient->qentry_seqno);
  660. }
  661. else
  662. chars = snprintf(key, KEYSIZE, "%s%u", PERSISTENCE_QUEUE_KEY, aclient->qentry_seqno);
  663. if (chars >= KEYSIZE)
  664. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  665. else
  666. {
  667. qe->seqno = aclient->qentry_seqno;
  668. if (aclient->beforeWrite)
  669. rc = aclient->beforeWrite(aclient->beforeWrite_context, bufindex, (char**)bufs, lens);
  670. if (rc == 0 && (rc = aclient->persistence->pput(aclient->phandle, key, bufindex, (char**)bufs, lens)) != 0)
  671. Log(LOG_ERROR, 0, "Error persisting queue entry, rc %d", rc);
  672. }
  673. if (props_allocated != 0)
  674. free(bufs[props_allocated]);
  675. exit:
  676. FUNC_EXIT_RC(rc);
  677. return rc;
  678. }
  679. static MQTTPersistence_qEntry* MQTTPersistence_restoreQueueEntry(char* buffer, size_t buflen, int MQTTVersion)
  680. {
  681. MQTTPersistence_qEntry* qe = NULL;
  682. char* ptr = buffer;
  683. int data_size;
  684. FUNC_ENTRY;
  685. if ((qe = malloc(sizeof(MQTTPersistence_qEntry))) == NULL)
  686. goto exit;
  687. memset(qe, '\0', sizeof(MQTTPersistence_qEntry));
  688. if ((qe->msg = malloc(sizeof(MQTTPersistence_message))) == NULL)
  689. {
  690. free(qe);
  691. qe = NULL;
  692. goto exit;
  693. }
  694. memset(qe->msg, '\0', sizeof(MQTTPersistence_message));
  695. qe->msg->struct_version = 1;
  696. qe->msg->payloadlen = *(int*)ptr;
  697. ptr += sizeof(int);
  698. data_size = qe->msg->payloadlen;
  699. if ((qe->msg->payload = malloc(data_size)) == NULL)
  700. {
  701. free(qe->msg);
  702. free(qe);
  703. qe = NULL;
  704. goto exit;
  705. }
  706. memcpy(qe->msg->payload, ptr, data_size);
  707. ptr += data_size;
  708. qe->msg->qos = *(int*)ptr;
  709. ptr += sizeof(int);
  710. qe->msg->retained = *(int*)ptr;
  711. ptr += sizeof(int);
  712. qe->msg->dup = *(int*)ptr;
  713. ptr += sizeof(int);
  714. qe->msg->msgid = *(int*)ptr;
  715. ptr += sizeof(int);
  716. data_size = (int)strlen(ptr) + 1;
  717. if ((qe->topicName = malloc(data_size)) == NULL)
  718. {
  719. free(qe->msg->payload);
  720. free(qe->msg);
  721. free(qe);
  722. qe = NULL;
  723. goto exit;
  724. }
  725. strcpy(qe->topicName, ptr);
  726. ptr += data_size;
  727. qe->topicLen = *(int*)ptr;
  728. ptr += sizeof(int);
  729. if (MQTTVersion >= MQTTVERSION_5 &&
  730. MQTTProperties_read(&qe->msg->properties, &ptr, buffer + buflen) != 1)
  731. Log(LOG_ERROR, -1, "Error restoring properties from persistence");
  732. exit:
  733. FUNC_EXIT;
  734. return qe;
  735. }
  736. static void MQTTPersistence_insertInSeqOrder(List* list, MQTTPersistence_qEntry* qEntry, size_t size)
  737. {
  738. ListElement* index = NULL;
  739. ListElement* current = NULL;
  740. FUNC_ENTRY;
  741. while (ListNextElement(list, &current) != NULL && index == NULL)
  742. {
  743. if (qEntry->seqno < ((MQTTPersistence_qEntry*)current->content)->seqno)
  744. index = current;
  745. }
  746. ListInsert(list, qEntry, size, index);
  747. FUNC_EXIT;
  748. }
  749. /**
  750. * Restores a queue of messages from persistence to memory
  751. * @param c the client as ::Clients - the client object to restore the messages to
  752. * @return return code, 0 if successful
  753. */
  754. int MQTTPersistence_restoreMessageQueue(Clients* c)
  755. {
  756. int rc = 0;
  757. char **msgkeys;
  758. int nkeys;
  759. int i = 0;
  760. int entries_restored = 0;
  761. FUNC_ENTRY;
  762. if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0)
  763. {
  764. while (rc == 0 && i < nkeys)
  765. {
  766. char *buffer = NULL;
  767. int buflen;
  768. if (strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, strlen(PERSISTENCE_QUEUE_KEY)) != 0 &&
  769. strncmp(msgkeys[i], PERSISTENCE_V5_QUEUE_KEY, strlen(PERSISTENCE_V5_QUEUE_KEY)) != 0)
  770. {
  771. ; /* ignore if not a queue entry key */
  772. }
  773. else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0 &&
  774. (c->afterRead == NULL || (rc = c->afterRead(c->afterRead_context, &buffer, &buflen)) == 0))
  775. {
  776. int MQTTVersion =
  777. (strncmp(msgkeys[i], PERSISTENCE_V5_QUEUE_KEY, strlen(PERSISTENCE_V5_QUEUE_KEY)) == 0)
  778. ? MQTTVERSION_5 : MQTTVERSION_3_1_1;
  779. MQTTPersistence_qEntry* qe = MQTTPersistence_restoreQueueEntry(buffer, buflen, MQTTVersion);
  780. if (qe)
  781. {
  782. qe->seqno = atoi(strchr(msgkeys[i], '-')+1); /* key format is tag'-'seqno */
  783. MQTTPersistence_insertInSeqOrder(c->messageQueue, qe, sizeof(MQTTPersistence_qEntry));
  784. c->qentry_seqno = max(c->qentry_seqno, qe->seqno);
  785. entries_restored++;
  786. }
  787. if (buffer)
  788. free(buffer);
  789. }
  790. if (msgkeys[i])
  791. {
  792. free(msgkeys[i]);
  793. }
  794. i++;
  795. }
  796. if (msgkeys != NULL)
  797. free(msgkeys);
  798. }
  799. Log(TRACE_MINIMUM, -1, "%d queued messages restored for client %s", entries_restored, c->clientID);
  800. FUNC_EXIT_RC(rc);
  801. return rc;
  802. }
  803. #endif