MQTTPacketOut.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. /*******************************************************************************
  2. * Copyright (c) 2009, 2019 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * http://www.eclipse.org/legal/epl-v10.html
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial API and implementation and/or initial documentation
  15. * Ian Craggs, Allan Stockdill-Mander - SSL updates
  16. * Ian Craggs - MQTT 3.1.1 support
  17. * Rong Xiang, Ian Craggs - C++ compatibility
  18. * Ian Craggs - binary password and will payload
  19. * Ian Craggs - MQTT 5.0 support
  20. *******************************************************************************/
  21. /**
  22. * @file
  23. * \brief functions to deal with reading and writing of MQTT packets from and to sockets
  24. *
  25. * Some other related functions are in the MQTTPacket module
  26. */
  27. #include "MQTTPacketOut.h"
  28. #include "Log.h"
  29. #include "StackTrace.h"
  30. #include <string.h>
  31. #include <stdlib.h>
  32. #include "Heap.h"
  33. /**
  34. * Send an MQTT CONNECT packet down a socket for V5 or later
  35. * @param client a structure from which to get all the required values
  36. * @param MQTTVersion the MQTT version to connect with
  37. * @param connectProperties MQTT V5 properties for the connect packet
  38. * @param willProperties MQTT V5 properties for the will message, if any
  39. * @return the completion code (e.g. TCPSOCKET_COMPLETE)
  40. */
  41. int MQTTPacket_send_connect(Clients* client, int MQTTVersion,
  42. MQTTProperties* connectProperties, MQTTProperties* willProperties)
  43. {
  44. char *buf, *ptr;
  45. Connect packet;
  46. int rc = -1, len;
  47. FUNC_ENTRY;
  48. packet.header.byte = 0;
  49. packet.header.bits.type = CONNECT;
  50. len = ((MQTTVersion == MQTTVERSION_3_1) ? 12 : 10) + (int)strlen(client->clientID)+2;
  51. if (client->will)
  52. len += (int)strlen(client->will->topic)+2 + client->will->payloadlen+2;
  53. if (client->username)
  54. len += (int)strlen(client->username)+2;
  55. if (client->password)
  56. len += client->passwordlen+2;
  57. if (MQTTVersion >= MQTTVERSION_5)
  58. {
  59. len += MQTTProperties_len(connectProperties);
  60. if (client->will)
  61. len += MQTTProperties_len(willProperties);
  62. }
  63. ptr = buf = malloc(len);
  64. if (MQTTVersion == MQTTVERSION_3_1)
  65. {
  66. writeUTF(&ptr, "MQIsdp");
  67. writeChar(&ptr, (char)MQTTVERSION_3_1);
  68. }
  69. else if (MQTTVersion == MQTTVERSION_3_1_1 || MQTTVersion == MQTTVERSION_5)
  70. {
  71. writeUTF(&ptr, "MQTT");
  72. writeChar(&ptr, (char)MQTTVersion);
  73. }
  74. else
  75. goto exit;
  76. packet.flags.all = 0;
  77. if (MQTTVersion >= MQTTVERSION_5)
  78. packet.flags.bits.cleanstart = client->cleanstart;
  79. else
  80. packet.flags.bits.cleanstart = client->cleansession;
  81. packet.flags.bits.will = (client->will) ? 1 : 0;
  82. if (packet.flags.bits.will)
  83. {
  84. packet.flags.bits.willQoS = client->will->qos;
  85. packet.flags.bits.willRetain = client->will->retained;
  86. }
  87. if (client->username)
  88. packet.flags.bits.username = 1;
  89. if (client->password)
  90. packet.flags.bits.password = 1;
  91. writeChar(&ptr, packet.flags.all);
  92. writeInt(&ptr, client->keepAliveInterval);
  93. if (MQTTVersion >= MQTTVERSION_5)
  94. MQTTProperties_write(&ptr, connectProperties);
  95. writeUTF(&ptr, client->clientID);
  96. if (client->will)
  97. {
  98. if (MQTTVersion >= MQTTVERSION_5)
  99. MQTTProperties_write(&ptr, willProperties);
  100. writeUTF(&ptr, client->will->topic);
  101. writeData(&ptr, client->will->payload, client->will->payloadlen);
  102. }
  103. if (client->username)
  104. writeUTF(&ptr, client->username);
  105. if (client->password)
  106. writeData(&ptr, client->password, client->passwordlen);
  107. rc = MQTTPacket_send(&client->net, packet.header, buf, len, 1, MQTTVersion);
  108. Log(LOG_PROTOCOL, 0, NULL, client->net.socket, client->clientID,
  109. MQTTVersion, client->cleansession, rc);
  110. exit:
  111. if (rc != TCPSOCKET_INTERRUPTED)
  112. free(buf);
  113. FUNC_EXIT_RC(rc);
  114. return rc;
  115. }
  116. /**
  117. * Function used in the new packets table to create connack packets.
  118. * @param MQTTVersion MQTT 5 or less?
  119. * @param aHeader the MQTT header byte
  120. * @param data the rest of the packet
  121. * @param datalen the length of the rest of the packet
  122. * @return pointer to the packet structure
  123. */
  124. void* MQTTPacket_connack(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
  125. {
  126. Connack* pack = NULL;
  127. char* curdata = data;
  128. char* enddata = &data[datalen];
  129. FUNC_ENTRY;
  130. if ((pack = malloc(sizeof(Connack))) == NULL)
  131. goto exit;
  132. pack->MQTTVersion = MQTTVersion;
  133. pack->header.byte = aHeader;
  134. pack->flags.all = readChar(&curdata); /* connect flags */
  135. pack->rc = readChar(&curdata); /* reason code */
  136. if (MQTTVersion < MQTTVERSION_5)
  137. {
  138. if (datalen != 2)
  139. {
  140. free(pack);
  141. pack = NULL;
  142. }
  143. }
  144. else if (datalen > 2)
  145. {
  146. MQTTProperties props = MQTTProperties_initializer;
  147. pack->properties = props;
  148. if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
  149. {
  150. if (pack->properties.array)
  151. free(pack->properties.array);
  152. if (pack)
  153. free(pack);
  154. pack = NULL; /* signal protocol error */
  155. goto exit;
  156. }
  157. }
  158. exit:
  159. FUNC_EXIT;
  160. return pack;
  161. }
  162. /**
  163. * Free allocated storage for a connack packet.
  164. * @param pack pointer to the connack packet structure
  165. */
  166. void MQTTPacket_freeConnack(Connack* pack)
  167. {
  168. FUNC_ENTRY;
  169. if (pack->MQTTVersion >= MQTTVERSION_5)
  170. MQTTProperties_free(&pack->properties);
  171. free(pack);
  172. FUNC_EXIT;
  173. }
  174. /**
  175. * Send an MQTT PINGREQ packet down a socket.
  176. * @param socket the open socket to send the data to
  177. * @param clientID the string client identifier, only used for tracing
  178. * @return the completion code (e.g. TCPSOCKET_COMPLETE)
  179. */
  180. int MQTTPacket_send_pingreq(networkHandles* net, const char* clientID)
  181. {
  182. Header header;
  183. int rc = 0;
  184. FUNC_ENTRY;
  185. header.byte = 0;
  186. header.bits.type = PINGREQ;
  187. rc = MQTTPacket_send(net, header, NULL, 0, 0, MQTTVERSION_3_1_1);
  188. Log(LOG_PROTOCOL, 20, NULL, net->socket, clientID, rc);
  189. FUNC_EXIT_RC(rc);
  190. return rc;
  191. }
  192. /**
  193. * Send an MQTT subscribe packet down a socket.
  194. * @param topics list of topics
  195. * @param qoss list of corresponding QoSs
  196. * @param msgid the MQTT message id to use
  197. * @param dup boolean - whether to set the MQTT DUP flag
  198. * @param socket the open socket to send the data to
  199. * @param clientID the string client identifier, only used for tracing
  200. * @return the completion code (e.g. TCPSOCKET_COMPLETE)
  201. */
  202. int MQTTPacket_send_subscribe(List* topics, List* qoss, MQTTSubscribe_options* opts, MQTTProperties* props,
  203. int msgid, int dup, Clients* client)
  204. {
  205. Header header;
  206. char *data, *ptr;
  207. int rc = -1;
  208. ListElement *elem = NULL, *qosElem = NULL;
  209. int datalen, i = 0;
  210. FUNC_ENTRY;
  211. header.bits.type = SUBSCRIBE;
  212. header.bits.dup = dup;
  213. header.bits.qos = 1;
  214. header.bits.retain = 0;
  215. datalen = 2 + topics->count * 3; /* utf length + char qos == 3 */
  216. while (ListNextElement(topics, &elem))
  217. datalen += (int)strlen((char*)(elem->content));
  218. if (client->MQTTVersion >= MQTTVERSION_5)
  219. datalen += MQTTProperties_len(props);
  220. ptr = data = malloc(datalen);
  221. writeInt(&ptr, msgid);
  222. if (client->MQTTVersion >= MQTTVERSION_5)
  223. MQTTProperties_write(&ptr, props);
  224. elem = NULL;
  225. while (ListNextElement(topics, &elem))
  226. {
  227. char subopts = 0;
  228. ListNextElement(qoss, &qosElem);
  229. writeUTF(&ptr, (char*)(elem->content));
  230. subopts = *(int*)(qosElem->content);
  231. if (client->MQTTVersion >= MQTTVERSION_5 && opts != NULL)
  232. {
  233. subopts |= (opts[i].noLocal << 2); /* 1 bit */
  234. subopts |= (opts[i].retainAsPublished << 3); /* 1 bit */
  235. subopts |= (opts[i].retainHandling << 4); /* 2 bits */
  236. }
  237. writeChar(&ptr, subopts);
  238. ++i;
  239. }
  240. rc = MQTTPacket_send(&client->net, header, data, datalen, 1, client->MQTTVersion);
  241. Log(LOG_PROTOCOL, 22, NULL, client->net.socket, client->clientID, msgid, rc);
  242. if (rc != TCPSOCKET_INTERRUPTED)
  243. free(data);
  244. FUNC_EXIT_RC(rc);
  245. return rc;
  246. }
  247. /**
  248. * Function used in the new packets table to create suback packets.
  249. * @param MQTTVersion the version of MQTT
  250. * @param aHeader the MQTT header byte
  251. * @param data the rest of the packet
  252. * @param datalen the length of the rest of the packet
  253. * @return pointer to the packet structure
  254. */
  255. void* MQTTPacket_suback(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
  256. {
  257. Suback* pack = NULL;
  258. char* curdata = data;
  259. char* enddata = &data[datalen];
  260. FUNC_ENTRY;
  261. if ((pack = malloc(sizeof(Suback))) == NULL)
  262. goto exit;
  263. pack->MQTTVersion = MQTTVersion;
  264. pack->header.byte = aHeader;
  265. pack->msgId = readInt(&curdata);
  266. if (MQTTVersion >= MQTTVERSION_5)
  267. {
  268. MQTTProperties props = MQTTProperties_initializer;
  269. pack->properties = props;
  270. if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
  271. {
  272. if (pack->properties.array)
  273. free(pack->properties.array);
  274. if (pack)
  275. free(pack);
  276. pack = NULL; /* signal protocol error */
  277. goto exit;
  278. }
  279. }
  280. pack->qoss = ListInitialize();
  281. while ((size_t)(curdata - data) < datalen)
  282. {
  283. unsigned int* newint;
  284. newint = malloc(sizeof(unsigned int));
  285. *newint = (unsigned int)readChar(&curdata);
  286. ListAppend(pack->qoss, newint, sizeof(unsigned int));
  287. }
  288. if (pack->qoss->count == 0)
  289. {
  290. if (pack->properties.array)
  291. free(pack->properties.array);
  292. if (pack)
  293. free(pack);
  294. ListFree(pack->qoss);
  295. pack = NULL;
  296. }
  297. exit:
  298. FUNC_EXIT;
  299. return pack;
  300. }
  301. /**
  302. * Send an MQTT unsubscribe packet down a socket.
  303. * @param topics list of topics
  304. * @param msgid the MQTT message id to use
  305. * @param dup boolean - whether to set the MQTT DUP flag
  306. * @param socket the open socket to send the data to
  307. * @param clientID the string client identifier, only used for tracing
  308. * @return the completion code (e.g. TCPSOCKET_COMPLETE)
  309. */
  310. int MQTTPacket_send_unsubscribe(List* topics, MQTTProperties* props, int msgid, int dup, Clients* client)
  311. {
  312. Header header;
  313. char *data, *ptr;
  314. int rc = -1;
  315. ListElement *elem = NULL;
  316. int datalen;
  317. FUNC_ENTRY;
  318. header.bits.type = UNSUBSCRIBE;
  319. header.bits.dup = dup;
  320. header.bits.qos = 1;
  321. header.bits.retain = 0;
  322. datalen = 2 + topics->count * 2; /* utf length == 2 */
  323. while (ListNextElement(topics, &elem))
  324. datalen += (int)strlen((char*)(elem->content));
  325. if (client->MQTTVersion >= MQTTVERSION_5)
  326. datalen += MQTTProperties_len(props);
  327. ptr = data = malloc(datalen);
  328. writeInt(&ptr, msgid);
  329. if (client->MQTTVersion >= MQTTVERSION_5)
  330. MQTTProperties_write(&ptr, props);
  331. elem = NULL;
  332. while (ListNextElement(topics, &elem))
  333. writeUTF(&ptr, (char*)(elem->content));
  334. rc = MQTTPacket_send(&client->net, header, data, datalen, 1, client->MQTTVersion);
  335. Log(LOG_PROTOCOL, 25, NULL, client->net.socket, client->clientID, msgid, rc);
  336. if (rc != TCPSOCKET_INTERRUPTED)
  337. free(data);
  338. FUNC_EXIT_RC(rc);
  339. return rc;
  340. }
  341. /**
  342. * Function used in the new packets table to create unsuback packets.
  343. * @param MQTTVersion the version of MQTT
  344. * @param aHeader the MQTT header byte
  345. * @param data the rest of the packet
  346. * @param datalen the length of the rest of the packet
  347. * @return pointer to the packet structure
  348. */
  349. void* MQTTPacket_unsuback(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
  350. {
  351. Unsuback* pack = NULL;
  352. char* curdata = data;
  353. char* enddata = &data[datalen];
  354. FUNC_ENTRY;
  355. if ((pack = malloc(sizeof(Unsuback))) == NULL)
  356. goto exit;
  357. pack->MQTTVersion = MQTTVersion;
  358. pack->header.byte = aHeader;
  359. pack->msgId = readInt(&curdata);
  360. pack->reasonCodes = NULL;
  361. if (MQTTVersion >= MQTTVERSION_5)
  362. {
  363. MQTTProperties props = MQTTProperties_initializer;
  364. pack->properties = props;
  365. if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
  366. {
  367. if (pack->properties.array)
  368. free(pack->properties.array);
  369. if (pack)
  370. free(pack);
  371. pack = NULL; /* signal protocol error */
  372. goto exit;
  373. }
  374. pack->reasonCodes = ListInitialize();
  375. while ((size_t)(curdata - data) < datalen)
  376. {
  377. enum MQTTReasonCodes* newrc;
  378. newrc = malloc(sizeof(enum MQTTReasonCodes));
  379. *newrc = (enum MQTTReasonCodes)readChar(&curdata);
  380. ListAppend(pack->reasonCodes, newrc, sizeof(enum MQTTReasonCodes));
  381. }
  382. if (pack->reasonCodes->count == 0)
  383. {
  384. ListFree(pack->reasonCodes);
  385. if (pack->properties.array)
  386. free(pack->properties.array);
  387. if (pack)
  388. free(pack);
  389. pack = NULL;
  390. }
  391. }
  392. exit:
  393. FUNC_EXIT;
  394. return pack;
  395. }