MQTTPersistence.c 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762
  1. /*******************************************************************************
  2. * Copyright (c) 2009, 2019 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * http://www.eclipse.org/legal/epl-v10.html
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial API and implementation and/or initial documentation
  15. * Ian Craggs - async client updates
  16. * Ian Craggs - fix for bug 432903 - queue persistence
  17. * Ian Craggs - MQTT V5 updates
  18. *******************************************************************************/
  19. /**
  20. * @file
  21. * \brief Functions that apply to persistence operations.
  22. *
  23. */
  24. #include <stdio.h>
  25. #include <string.h>
  26. #include "MQTTPersistence.h"
  27. #include "MQTTPersistenceDefault.h"
  28. #include "MQTTProtocolClient.h"
  29. #include "Heap.h"
  30. static MQTTPersistence_qEntry* MQTTPersistence_restoreQueueEntry(char* buffer, size_t buflen, int MQTTVersion);
  31. static void MQTTPersistence_insertInSeqOrder(List* list, MQTTPersistence_qEntry* qEntry, size_t size);
  32. /**
  33. * Creates a ::MQTTClient_persistence structure representing a persistence implementation.
  34. * @param persistence the ::MQTTClient_persistence structure.
  35. * @param type the type of the persistence implementation. See ::MQTTClient_create.
  36. * @param pcontext the context for this persistence implementation. See ::MQTTClient_create.
  37. * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
  38. */
  39. #include "StackTrace.h"
  40. int MQTTPersistence_create(MQTTClient_persistence** persistence, int type, void* pcontext)
  41. {
  42. int rc = 0;
  43. MQTTClient_persistence* per = NULL;
  44. FUNC_ENTRY;
  45. #if !defined(NO_PERSISTENCE)
  46. switch (type)
  47. {
  48. case MQTTCLIENT_PERSISTENCE_NONE :
  49. per = NULL;
  50. break;
  51. case MQTTCLIENT_PERSISTENCE_DEFAULT :
  52. per = malloc(sizeof(MQTTClient_persistence));
  53. if ( per != NULL )
  54. {
  55. if ( pcontext != NULL )
  56. {
  57. per->context = malloc(strlen(pcontext) + 1);
  58. strcpy(per->context, pcontext);
  59. }
  60. else
  61. per->context = "."; /* working directory */
  62. /* file system functions */
  63. per->popen = pstopen;
  64. per->pclose = pstclose;
  65. per->pput = pstput;
  66. per->pget = pstget;
  67. per->premove = pstremove;
  68. per->pkeys = pstkeys;
  69. per->pclear = pstclear;
  70. per->pcontainskey = pstcontainskey;
  71. }
  72. else
  73. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  74. break;
  75. case MQTTCLIENT_PERSISTENCE_USER :
  76. per = (MQTTClient_persistence *)pcontext;
  77. if ( per == NULL || (per != NULL && (per->context == NULL || per->pclear == NULL ||
  78. per->pclose == NULL || per->pcontainskey == NULL || per->pget == NULL || per->pkeys == NULL ||
  79. per->popen == NULL || per->pput == NULL || per->premove == NULL)) )
  80. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  81. break;
  82. default:
  83. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  84. break;
  85. }
  86. #endif
  87. *persistence = per;
  88. FUNC_EXIT_RC(rc);
  89. return rc;
  90. }
  91. /**
  92. * Open persistent store and restore any persisted messages.
  93. * @param client the client as ::Clients.
  94. * @param serverURI the URI of the remote end.
  95. * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
  96. */
  97. int MQTTPersistence_initialize(Clients *c, const char *serverURI)
  98. {
  99. int rc = 0;
  100. FUNC_ENTRY;
  101. if ( c->persistence != NULL )
  102. {
  103. rc = c->persistence->popen(&(c->phandle), c->clientID, serverURI, c->persistence->context);
  104. if ( rc == 0 )
  105. rc = MQTTPersistence_restore(c);
  106. }
  107. FUNC_EXIT_RC(rc);
  108. return rc;
  109. }
  110. /**
  111. * Close persistent store.
  112. * @param client the client as ::Clients.
  113. * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
  114. */
  115. int MQTTPersistence_close(Clients *c)
  116. {
  117. int rc =0;
  118. FUNC_ENTRY;
  119. if (c->persistence != NULL)
  120. {
  121. rc = c->persistence->pclose(c->phandle);
  122. c->phandle = NULL;
  123. #if !defined(NO_PERSISTENCE)
  124. if ( c->persistence->popen == pstopen )
  125. free(c->persistence);
  126. #endif
  127. c->persistence = NULL;
  128. }
  129. FUNC_EXIT_RC(rc);
  130. return rc;
  131. }
  132. /**
  133. * Clears the persistent store.
  134. * @param client the client as ::Clients.
  135. * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
  136. */
  137. int MQTTPersistence_clear(Clients *c)
  138. {
  139. int rc = 0;
  140. FUNC_ENTRY;
  141. if (c->persistence != NULL)
  142. rc = c->persistence->pclear(c->phandle);
  143. FUNC_EXIT_RC(rc);
  144. return rc;
  145. }
  146. /**
  147. * Restores the persisted records to the outbound and inbound message queues of the
  148. * client.
  149. * @param client the client as ::Clients.
  150. * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
  151. */
  152. int MQTTPersistence_restore(Clients *c)
  153. {
  154. int rc = 0;
  155. char **msgkeys = NULL,
  156. *buffer = NULL;
  157. int nkeys, buflen;
  158. int i = 0;
  159. int msgs_sent = 0;
  160. int msgs_rcvd = 0;
  161. FUNC_ENTRY;
  162. if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0)
  163. {
  164. while (rc == 0 && i < nkeys)
  165. {
  166. if (strncmp(msgkeys[i], PERSISTENCE_COMMAND_KEY, strlen(PERSISTENCE_COMMAND_KEY)) == 0 ||
  167. strncmp(msgkeys[i], PERSISTENCE_V5_COMMAND_KEY, strlen(PERSISTENCE_V5_COMMAND_KEY)) == 0)
  168. {
  169. ;
  170. }
  171. else if (strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, strlen(PERSISTENCE_QUEUE_KEY)) == 0 ||
  172. strncmp(msgkeys[i], PERSISTENCE_V5_QUEUE_KEY, strlen(PERSISTENCE_V5_QUEUE_KEY)) == 0)
  173. {
  174. ;
  175. }
  176. else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0)
  177. {
  178. int data_MQTTVersion = MQTTVERSION_3_1_1;
  179. char* cur_key = msgkeys[i];
  180. MQTTPacket* pack = NULL;
  181. if ( strncmp(cur_key, PERSISTENCE_V5_PUBLISH_RECEIVED,
  182. strlen(PERSISTENCE_V5_PUBLISH_RECEIVED)) == 0)
  183. {
  184. data_MQTTVersion = MQTTVERSION_5;
  185. cur_key = PERSISTENCE_PUBLISH_RECEIVED;
  186. }
  187. else if (strncmp(cur_key, PERSISTENCE_V5_PUBLISH_SENT,
  188. strlen(PERSISTENCE_V5_PUBLISH_SENT)) == 0)
  189. {
  190. data_MQTTVersion = MQTTVERSION_5;
  191. cur_key = PERSISTENCE_PUBLISH_SENT;
  192. }
  193. else if (strncmp(cur_key, PERSISTENCE_V5_PUBREL,
  194. strlen(PERSISTENCE_V5_PUBREL)) == 0)
  195. {
  196. data_MQTTVersion = MQTTVERSION_5;
  197. cur_key = PERSISTENCE_PUBREL;
  198. }
  199. if (data_MQTTVersion == MQTTVERSION_5 && c->MQTTVersion < MQTTVERSION_5)
  200. {
  201. rc = MQTTCLIENT_PERSISTENCE_ERROR; /* can't restore version 5 data with a version 3 client */
  202. goto exit;
  203. }
  204. pack = MQTTPersistence_restorePacket(data_MQTTVersion, buffer, buflen);
  205. if ( pack != NULL )
  206. {
  207. if (strncmp(cur_key, PERSISTENCE_PUBLISH_RECEIVED,
  208. strlen(PERSISTENCE_PUBLISH_RECEIVED)) == 0)
  209. {
  210. Publish* publish = (Publish*)pack;
  211. Messages* msg = NULL;
  212. publish->MQTTVersion = c->MQTTVersion;
  213. msg = MQTTProtocol_createMessage(publish, &msg, publish->header.bits.qos, publish->header.bits.retain);
  214. msg->nextMessageType = PUBREL;
  215. /* order does not matter for persisted received messages */
  216. ListAppend(c->inboundMsgs, msg, msg->len);
  217. if (c->MQTTVersion >= MQTTVERSION_5)
  218. {
  219. free(msg->publish->payload);
  220. free(msg->publish->topic);
  221. }
  222. publish->topic = NULL;
  223. MQTTPacket_freePublish(publish);
  224. msgs_rcvd++;
  225. }
  226. else if (strncmp(cur_key, PERSISTENCE_PUBLISH_SENT,
  227. strlen(PERSISTENCE_PUBLISH_SENT)) == 0)
  228. {
  229. Publish* publish = (Publish*)pack;
  230. Messages* msg = NULL;
  231. char *key = malloc(MESSAGE_FILENAME_LENGTH + 1);
  232. publish->MQTTVersion = c->MQTTVersion;
  233. if (publish->MQTTVersion >= MQTTVERSION_5)
  234. sprintf(key, "%s%d", PERSISTENCE_V5_PUBREL, publish->msgId);
  235. else
  236. sprintf(key, "%s%d", PERSISTENCE_PUBREL, publish->msgId);
  237. msg = MQTTProtocol_createMessage(publish, &msg, publish->header.bits.qos, publish->header.bits.retain);
  238. if (c->persistence->pcontainskey(c->phandle, key) == 0)
  239. /* PUBLISH Qo2 and PUBREL sent */
  240. msg->nextMessageType = PUBCOMP;
  241. /* else: PUBLISH QoS1, or PUBLISH QoS2 and PUBREL not sent */
  242. /* retry at the first opportunity */
  243. msg->lastTouch = 0;
  244. MQTTPersistence_insertInOrder(c->outboundMsgs, msg, msg->len);
  245. publish->topic = NULL;
  246. MQTTPacket_freePublish(publish);
  247. free(key);
  248. msgs_sent++;
  249. }
  250. else if (strncmp(cur_key, PERSISTENCE_PUBREL, strlen(PERSISTENCE_PUBREL)) == 0)
  251. {
  252. /* orphaned PUBRELs ? */
  253. Pubrel* pubrel = (Pubrel*)pack;
  254. char *key = malloc(MESSAGE_FILENAME_LENGTH + 1);
  255. pubrel->MQTTVersion = c->MQTTVersion;
  256. if (pubrel->MQTTVersion >= MQTTVERSION_5)
  257. sprintf(key, "%s%d", PERSISTENCE_V5_PUBLISH_SENT, pubrel->msgId);
  258. else
  259. sprintf(key, "%s%d", PERSISTENCE_PUBLISH_SENT, pubrel->msgId);
  260. if (c->persistence->pcontainskey(c->phandle, key) != 0)
  261. rc = c->persistence->premove(c->phandle, msgkeys[i]);
  262. free(pubrel);
  263. free(key);
  264. }
  265. }
  266. else /* pack == NULL -> bad persisted record */
  267. rc = c->persistence->premove(c->phandle, msgkeys[i]);
  268. }
  269. if (buffer)
  270. {
  271. free(buffer);
  272. buffer = NULL;
  273. }
  274. if (msgkeys[i])
  275. free(msgkeys[i]);
  276. i++;
  277. }
  278. if (msgkeys)
  279. free(msgkeys);
  280. }
  281. Log(TRACE_MINIMUM, -1, "%d sent messages and %d received messages restored for client %s\n",
  282. msgs_sent, msgs_rcvd, c->clientID);
  283. MQTTPersistence_wrapMsgID(c);
  284. exit:
  285. FUNC_EXIT_RC(rc);
  286. return rc;
  287. }
  288. /**
  289. * Returns a MQTT packet restored from persisted data.
  290. * @param buffer the persisted data.
  291. * @param buflen the number of bytes of the data buffer.
  292. */
  293. void* MQTTPersistence_restorePacket(int MQTTVersion, char* buffer, size_t buflen)
  294. {
  295. void* pack = NULL;
  296. Header header;
  297. int fixed_header_length = 1, ptype, remaining_length = 0;
  298. char c;
  299. int multiplier = 1;
  300. extern pf new_packets[];
  301. FUNC_ENTRY;
  302. header.byte = buffer[0];
  303. /* decode the message length according to the MQTT algorithm */
  304. do
  305. {
  306. c = *(++buffer);
  307. remaining_length += (c & 127) * multiplier;
  308. multiplier *= 128;
  309. fixed_header_length++;
  310. } while ((c & 128) != 0);
  311. if ( (fixed_header_length + remaining_length) == buflen )
  312. {
  313. ptype = header.bits.type;
  314. if (ptype >= CONNECT && ptype <= DISCONNECT && new_packets[ptype] != NULL)
  315. pack = (*new_packets[ptype])(MQTTVersion, header.byte, ++buffer, remaining_length);
  316. }
  317. FUNC_EXIT;
  318. return pack;
  319. }
  320. /**
  321. * Inserts the specified message into the list, maintaining message ID order.
  322. * @param list the list to insert the message into.
  323. * @param content the message to add.
  324. * @param size size of the message.
  325. */
  326. void MQTTPersistence_insertInOrder(List* list, void* content, size_t size)
  327. {
  328. ListElement* index = NULL;
  329. ListElement* current = NULL;
  330. FUNC_ENTRY;
  331. while(ListNextElement(list, &current) != NULL && index == NULL)
  332. {
  333. if ( ((Messages*)content)->msgid < ((Messages*)current->content)->msgid )
  334. index = current;
  335. }
  336. ListInsert(list, content, size, index);
  337. FUNC_EXIT;
  338. }
  339. /**
  340. * Adds a record to the persistent store. This function must not be called for QoS0
  341. * messages.
  342. * @param socket the socket of the client.
  343. * @param buf0 fixed header.
  344. * @param buf0len length of the fixed header.
  345. * @param count number of buffers representing the variable header and/or the payload.
  346. * @param buffers the buffers representing the variable header and/or the payload.
  347. * @param buflens length of the buffers representing the variable header and/or the payload.
  348. * @param htype MQTT packet type - PUBLISH or PUBREL
  349. * @param msgId the message ID.
  350. * @param scr 0 indicates message in the sending direction; 1 indicates message in the
  351. * receiving direction.
  352. * @param the MQTT version being used (>= MQTTVERSION_5 means properties included)
  353. * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
  354. */
  355. int MQTTPersistence_put(int socket, char* buf0, size_t buf0len, int count,
  356. char** buffers, size_t* buflens, int htype, int msgId, int scr, int MQTTVersion)
  357. {
  358. int rc = 0;
  359. extern ClientStates* bstate;
  360. int nbufs, i;
  361. int* lens = NULL;
  362. char** bufs = NULL;
  363. char *key;
  364. Clients* client = NULL;
  365. FUNC_ENTRY;
  366. client = (Clients*)(ListFindItem(bstate->clients, &socket, clientSocketCompare)->content);
  367. if (client->persistence != NULL)
  368. {
  369. key = malloc(MESSAGE_FILENAME_LENGTH + 1);
  370. nbufs = 1 + count;
  371. lens = (int *)malloc(nbufs * sizeof(int));
  372. bufs = (char **)malloc(nbufs * sizeof(char *));
  373. lens[0] = (int)buf0len;
  374. bufs[0] = buf0;
  375. for (i = 0; i < count; i++)
  376. {
  377. lens[i+1] = (int)buflens[i];
  378. bufs[i+1] = buffers[i];
  379. }
  380. /* key */
  381. if (scr == 0)
  382. { /* sending */
  383. char* key_id = PERSISTENCE_PUBLISH_SENT;
  384. if (htype == PUBLISH) /* PUBLISH QoS1 and QoS2*/
  385. {
  386. if (MQTTVersion >= MQTTVERSION_5)
  387. key_id = PERSISTENCE_V5_PUBLISH_SENT;
  388. }
  389. else if (htype == PUBREL) /* PUBREL */
  390. {
  391. if (MQTTVersion >= MQTTVERSION_5)
  392. key_id = PERSISTENCE_V5_PUBREL;
  393. else
  394. key_id = PERSISTENCE_PUBREL;
  395. }
  396. sprintf(key, "%s%d", key_id, msgId);
  397. }
  398. else if (scr == 1) /* receiving PUBLISH QoS2 */
  399. {
  400. char* key_id = PERSISTENCE_PUBLISH_RECEIVED;
  401. if (MQTTVersion >= MQTTVERSION_5)
  402. key_id = PERSISTENCE_V5_PUBLISH_RECEIVED;
  403. sprintf(key, "%s%d", key_id, msgId);
  404. }
  405. rc = client->persistence->pput(client->phandle, key, nbufs, bufs, lens);
  406. free(key);
  407. free(lens);
  408. free(bufs);
  409. }
  410. FUNC_EXIT_RC(rc);
  411. return rc;
  412. }
  413. /**
  414. * Deletes a record from the persistent store.
  415. * @param client the client as ::Clients.
  416. * @param type the type of the persisted record: #PERSISTENCE_PUBLISH_SENT, #PERSISTENCE_PUBREL
  417. * or #PERSISTENCE_PUBLISH_RECEIVED.
  418. * @param qos the qos field of the message.
  419. * @param msgId the message ID.
  420. * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
  421. */
  422. int MQTTPersistence_remove(Clients* c, char *type, int qos, int msgId)
  423. {
  424. int rc = 0;
  425. FUNC_ENTRY;
  426. if (c->persistence != NULL)
  427. {
  428. char *key = malloc(MESSAGE_FILENAME_LENGTH + 1);
  429. if (strcmp(type, PERSISTENCE_PUBLISH_SENT) == 0 ||
  430. strcmp(type, PERSISTENCE_V5_PUBLISH_SENT) == 0)
  431. {
  432. sprintf(key, "%s%d", PERSISTENCE_V5_PUBLISH_SENT, msgId) ;
  433. rc = c->persistence->premove(c->phandle, key);
  434. sprintf(key, "%s%d", PERSISTENCE_V5_PUBREL, msgId) ;
  435. rc += c->persistence->premove(c->phandle, key);
  436. sprintf(key, "%s%d", PERSISTENCE_PUBLISH_SENT, msgId) ;
  437. rc += c->persistence->premove(c->phandle, key);
  438. sprintf(key, "%s%d", PERSISTENCE_PUBREL, msgId) ;
  439. rc += c->persistence->premove(c->phandle, key);
  440. }
  441. else /* PERSISTENCE_PUBLISH_SENT && qos == 1 */
  442. { /* or PERSISTENCE_PUBLISH_RECEIVED */
  443. sprintf(key, "%s%d", PERSISTENCE_V5_PUBLISH_RECEIVED, msgId);
  444. rc = c->persistence->premove(c->phandle, key);
  445. sprintf(key, "%s%d", PERSISTENCE_PUBLISH_RECEIVED, msgId);
  446. rc += c->persistence->premove(c->phandle, key);
  447. }
  448. free(key);
  449. }
  450. FUNC_EXIT_RC(rc);
  451. return rc;
  452. }
  453. /**
  454. * Checks whether the message IDs wrapped by looking for the largest gap between two consecutive
  455. * message IDs in the outboundMsgs queue.
  456. * @param client the client as ::Clients.
  457. */
  458. void MQTTPersistence_wrapMsgID(Clients *client)
  459. {
  460. ListElement* wrapel = NULL;
  461. ListElement* current = NULL;
  462. FUNC_ENTRY;
  463. if ( client->outboundMsgs->count > 0 )
  464. {
  465. int firstMsgID = ((Messages*)client->outboundMsgs->first->content)->msgid;
  466. int lastMsgID = ((Messages*)client->outboundMsgs->last->content)->msgid;
  467. int gap = MAX_MSG_ID - lastMsgID + firstMsgID;
  468. current = ListNextElement(client->outboundMsgs, &current);
  469. while(ListNextElement(client->outboundMsgs, &current) != NULL)
  470. {
  471. int curMsgID = ((Messages*)current->content)->msgid;
  472. int curPrevMsgID = ((Messages*)current->prev->content)->msgid;
  473. int curgap = curMsgID - curPrevMsgID;
  474. if ( curgap > gap )
  475. {
  476. gap = curgap;
  477. wrapel = current;
  478. }
  479. }
  480. }
  481. if ( wrapel != NULL )
  482. {
  483. /* put wrapel at the beginning of the queue */
  484. client->outboundMsgs->first->prev = client->outboundMsgs->last;
  485. client->outboundMsgs->last->next = client->outboundMsgs->first;
  486. client->outboundMsgs->first = wrapel;
  487. client->outboundMsgs->last = wrapel->prev;
  488. client->outboundMsgs->first->prev = NULL;
  489. client->outboundMsgs->last->next = NULL;
  490. }
  491. FUNC_EXIT;
  492. }
  493. #if !defined(NO_PERSISTENCE)
  494. int MQTTPersistence_unpersistQueueEntry(Clients* client, MQTTPersistence_qEntry* qe)
  495. {
  496. int rc = 0;
  497. char key[PERSISTENCE_MAX_KEY_LENGTH + 1];
  498. FUNC_ENTRY;
  499. if (client->MQTTVersion >= MQTTVERSION_5)
  500. sprintf(key, "%s%u", PERSISTENCE_V5_QUEUE_KEY, qe->seqno);
  501. else
  502. sprintf(key, "%s%u", PERSISTENCE_QUEUE_KEY, qe->seqno);
  503. if ((rc = client->persistence->premove(client->phandle, key)) != 0)
  504. Log(LOG_ERROR, 0, "Error %d removing qEntry from persistence", rc);
  505. FUNC_EXIT_RC(rc);
  506. return rc;
  507. }
  508. #define MAX_NO_OF_BUFFERS 9
  509. int MQTTPersistence_persistQueueEntry(Clients* aclient, MQTTPersistence_qEntry* qe)
  510. {
  511. int rc = 0;
  512. int bufindex = 0;
  513. char key[PERSISTENCE_MAX_KEY_LENGTH + 1];
  514. int lens[MAX_NO_OF_BUFFERS];
  515. void* bufs[MAX_NO_OF_BUFFERS];
  516. int props_allocated = 0;
  517. FUNC_ENTRY;
  518. bufs[bufindex] = &qe->msg->payloadlen;
  519. lens[bufindex++] = sizeof(qe->msg->payloadlen);
  520. bufs[bufindex] = qe->msg->payload;
  521. lens[bufindex++] = qe->msg->payloadlen;
  522. bufs[bufindex] = &qe->msg->qos;
  523. lens[bufindex++] = sizeof(qe->msg->qos);
  524. bufs[bufindex] = &qe->msg->retained;
  525. lens[bufindex++] = sizeof(qe->msg->retained);
  526. bufs[bufindex] = &qe->msg->dup;
  527. lens[bufindex++] = sizeof(qe->msg->dup);
  528. bufs[bufindex] = &qe->msg->msgid;
  529. lens[bufindex++] = sizeof(qe->msg->msgid);
  530. bufs[bufindex] = qe->topicName;
  531. lens[bufindex++] = (int)strlen(qe->topicName) + 1;
  532. bufs[bufindex] = &qe->topicLen;
  533. lens[bufindex++] = sizeof(qe->topicLen);
  534. if (aclient->MQTTVersion >= MQTTVERSION_5) /* persist properties */
  535. {
  536. MQTTProperties no_props = MQTTProperties_initializer;
  537. MQTTProperties* props = &no_props;
  538. int temp_len = 0;
  539. char* ptr = NULL;
  540. if (qe->msg->struct_version >= 1)
  541. props = &qe->msg->properties;
  542. temp_len = MQTTProperties_len(props);
  543. ptr = bufs[bufindex] = malloc(temp_len);
  544. props_allocated = bufindex;
  545. rc = MQTTProperties_write(&ptr, props);
  546. lens[bufindex++] = temp_len;
  547. sprintf(key, "%s%u", PERSISTENCE_V5_QUEUE_KEY, ++aclient->qentry_seqno);
  548. }
  549. else
  550. sprintf(key, "%s%u", PERSISTENCE_QUEUE_KEY, ++aclient->qentry_seqno);
  551. qe->seqno = aclient->qentry_seqno;
  552. if ((rc = aclient->persistence->pput(aclient->phandle, key, bufindex, (char**)bufs, lens)) != 0)
  553. Log(LOG_ERROR, 0, "Error persisting queue entry, rc %d", rc);
  554. if (props_allocated != 0)
  555. free(bufs[props_allocated]);
  556. FUNC_EXIT_RC(rc);
  557. return rc;
  558. }
  559. static MQTTPersistence_qEntry* MQTTPersistence_restoreQueueEntry(char* buffer, size_t buflen, int MQTTVersion)
  560. {
  561. MQTTPersistence_qEntry* qe = NULL;
  562. char* ptr = buffer;
  563. int data_size;
  564. FUNC_ENTRY;
  565. qe = malloc(sizeof(MQTTPersistence_qEntry));
  566. memset(qe, '\0', sizeof(MQTTPersistence_qEntry));
  567. qe->msg = malloc(sizeof(MQTTPersistence_message));
  568. memset(qe->msg, '\0', sizeof(MQTTPersistence_message));
  569. qe->msg->struct_version = 1;
  570. qe->msg->payloadlen = *(int*)ptr;
  571. ptr += sizeof(int);
  572. data_size = qe->msg->payloadlen;
  573. qe->msg->payload = malloc(data_size);
  574. memcpy(qe->msg->payload, ptr, data_size);
  575. ptr += data_size;
  576. qe->msg->qos = *(int*)ptr;
  577. ptr += sizeof(int);
  578. qe->msg->retained = *(int*)ptr;
  579. ptr += sizeof(int);
  580. qe->msg->dup = *(int*)ptr;
  581. ptr += sizeof(int);
  582. qe->msg->msgid = *(int*)ptr;
  583. ptr += sizeof(int);
  584. data_size = (int)strlen(ptr) + 1;
  585. qe->topicName = malloc(data_size);
  586. strcpy(qe->topicName, ptr);
  587. ptr += data_size;
  588. qe->topicLen = *(int*)ptr;
  589. ptr += sizeof(int);
  590. if (MQTTVersion >= MQTTVERSION_5 &&
  591. MQTTProperties_read(&qe->msg->properties, &ptr, buffer + buflen) != 1)
  592. Log(LOG_ERROR, -1, "Error restoring properties from persistence");
  593. FUNC_EXIT;
  594. return qe;
  595. }
  596. static void MQTTPersistence_insertInSeqOrder(List* list, MQTTPersistence_qEntry* qEntry, size_t size)
  597. {
  598. ListElement* index = NULL;
  599. ListElement* current = NULL;
  600. FUNC_ENTRY;
  601. while (ListNextElement(list, &current) != NULL && index == NULL)
  602. {
  603. if (qEntry->seqno < ((MQTTPersistence_qEntry*)current->content)->seqno)
  604. index = current;
  605. }
  606. ListInsert(list, qEntry, size, index);
  607. FUNC_EXIT;
  608. }
  609. /**
  610. * Restores a queue of messages from persistence to memory
  611. * @param c the client as ::Clients - the client object to restore the messages to
  612. * @return return code, 0 if successful
  613. */
  614. int MQTTPersistence_restoreMessageQueue(Clients* c)
  615. {
  616. int rc = 0;
  617. char **msgkeys;
  618. int nkeys;
  619. int i = 0;
  620. int entries_restored = 0;
  621. FUNC_ENTRY;
  622. if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0)
  623. {
  624. while (rc == 0 && i < nkeys)
  625. {
  626. char *buffer = NULL;
  627. int buflen;
  628. if (strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, strlen(PERSISTENCE_QUEUE_KEY)) != 0 &&
  629. strncmp(msgkeys[i], PERSISTENCE_V5_QUEUE_KEY, strlen(PERSISTENCE_V5_QUEUE_KEY)) != 0)
  630. {
  631. ; /* ignore if not a queue entry key */
  632. }
  633. else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0)
  634. {
  635. int MQTTVersion =
  636. (strncmp(msgkeys[i], PERSISTENCE_V5_QUEUE_KEY, strlen(PERSISTENCE_V5_QUEUE_KEY)) == 0)
  637. ? MQTTVERSION_5 : MQTTVERSION_3_1_1;
  638. MQTTPersistence_qEntry* qe = MQTTPersistence_restoreQueueEntry(buffer, buflen, MQTTVersion);
  639. if (qe)
  640. {
  641. qe->seqno = atoi(strchr(msgkeys[i], '-')+1); /* key format is tag'-'seqno */
  642. MQTTPersistence_insertInSeqOrder(c->messageQueue, qe, sizeof(MQTTPersistence_qEntry));
  643. free(buffer);
  644. c->qentry_seqno = max(c->qentry_seqno, qe->seqno);
  645. entries_restored++;
  646. }
  647. }
  648. if (msgkeys[i])
  649. {
  650. free(msgkeys[i]);
  651. }
  652. i++;
  653. }
  654. if (msgkeys != NULL)
  655. free(msgkeys);
  656. }
  657. Log(TRACE_MINIMUM, -1, "%d queued messages restored for client %s", entries_restored, c->clientID);
  658. FUNC_EXIT_RC(rc);
  659. return rc;
  660. }
  661. #endif