MQTTProtocolOut.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478
  1. /*******************************************************************************
  2. * Copyright (c) 2009, 2022 IBM Corp., Ian Craggs and others
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial API and implementation and/or initial documentation
  15. * Ian Craggs, Allan Stockdill-Mander - SSL updates
  16. * Ian Craggs - fix for buffer overflow in addressPort bug #433290
  17. * Ian Craggs - MQTT 3.1.1 support
  18. * Rong Xiang, Ian Craggs - C++ compatibility
  19. * Ian Craggs - fix for bug 479376
  20. * Ian Craggs - SNI support
  21. * Ian Craggs - fix for issue #164
  22. * Ian Craggs - fix for issue #179
  23. * Ian Craggs - MQTT 5.0 support
  24. * Sven Gambel - add generic proxy support
  25. *******************************************************************************/
  26. /**
  27. * @file
  28. * \brief Functions dealing with the MQTT protocol exchanges
  29. *
  30. * Some other related functions are in the MQTTProtocolClient module
  31. */
  32. #include <stdlib.h>
  33. #include <string.h>
  34. #include <ctype.h>
  35. #include "MQTTProtocolOut.h"
  36. #include "StackTrace.h"
  37. #include "Heap.h"
  38. #include "WebSocket.h"
  39. #include "Proxy.h"
  40. #include "Base64.h"
  41. extern ClientStates* bstate;
  42. /**
  43. * Separates an address:port into two separate values
  44. * @param[in] uri the input string - hostname:port
  45. * @param[out] port the returned port integer
  46. * @param[out] topic optional topic portion of the address starting with '/'
  47. * @return the address string
  48. */
  49. size_t MQTTProtocol_addressPort(const char* uri, int* port, const char **topic, int default_port)
  50. {
  51. char* buf = (char*)uri;
  52. char* colon_pos;
  53. size_t len;
  54. char* topic_pos;
  55. FUNC_ENTRY;
  56. colon_pos = strrchr(uri, ':'); /* reverse find to allow for ':' in IPv6 addresses */
  57. if (uri[0] == '[')
  58. { /* ip v6 */
  59. if (colon_pos < strrchr(uri, ']'))
  60. colon_pos = NULL; /* means it was an IPv6 separator, not for host:port */
  61. }
  62. if (colon_pos) /* have to strip off the port */
  63. {
  64. len = colon_pos - uri;
  65. *port = atoi(colon_pos + 1);
  66. }
  67. else
  68. {
  69. len = strlen(buf);
  70. *port = default_port;
  71. }
  72. /* find any topic portion */
  73. topic_pos = (char*)uri;
  74. if (colon_pos)
  75. topic_pos = colon_pos;
  76. topic_pos = strchr(topic_pos, '/');
  77. if (topic_pos)
  78. {
  79. if (topic)
  80. *topic = topic_pos;
  81. if (!colon_pos)
  82. len = topic_pos - uri;
  83. }
  84. if (buf[len - 1] == ']')
  85. {
  86. /* we are stripping off the final ], so length is 1 shorter */
  87. --len;
  88. }
  89. FUNC_EXIT;
  90. return len;
  91. }
  92. /**
  93. * Allow user or password characters to be expressed in the form of %XX, XX being the
  94. * hexadecimal value of the character. This will avoid problems when a user code or a password
  95. * contains a '@' or another special character ('%' included)
  96. * @param p0 output string
  97. * @param p1 input string
  98. * @param basic_auth_in_len
  99. */
  100. void MQTTProtocol_specialChars(char* p0, char* p1, b64_size_t *basic_auth_in_len)
  101. {
  102. while (*p1 != '@')
  103. {
  104. if (*p1 != '%')
  105. {
  106. *p0++ = *p1++;
  107. }
  108. else if (isxdigit(*(p1 + 1)) && isxdigit(*(p1 + 2)))
  109. {
  110. /* next 2 characters are hexa digits */
  111. char hex[3];
  112. p1++;
  113. hex[0] = *p1++;
  114. hex[1] = *p1++;
  115. hex[2] = '\0';
  116. *p0++ = (char)strtol(hex, 0, 16);
  117. /* 3 input char => 1 output char */
  118. *basic_auth_in_len -= 2;
  119. }
  120. }
  121. *p0 = 0x0;
  122. }
  123. /*
  124. * Examples of proxy settings:
  125. * http://your.proxy.server:8080/
  126. * http://user:pass@my.proxy.server:8080/
  127. */
  128. int MQTTProtocol_setHTTPProxy(Clients* aClient, char* source, char** dest, char** auth_dest, char* prefix)
  129. {
  130. b64_size_t basic_auth_in_len, basic_auth_out_len;
  131. b64_data_t *basic_auth;
  132. char *p1;
  133. int rc = 0;
  134. if (*auth_dest)
  135. {
  136. free(*auth_dest);
  137. *auth_dest = NULL;
  138. }
  139. if (source)
  140. {
  141. if ((p1 = strstr(source, prefix)) != NULL) /* skip http:// prefix, if any */
  142. source += strlen(prefix);
  143. *dest = source;
  144. if ((p1 = strchr(source, '@')) != NULL) /* find user.pass separator */
  145. *dest = p1 + 1;
  146. if (p1)
  147. {
  148. /* basic auth len is string between http:// and @ */
  149. basic_auth_in_len = (b64_size_t)(p1 - source);
  150. if (basic_auth_in_len > 0)
  151. {
  152. basic_auth = (b64_data_t *)malloc(sizeof(char)*(basic_auth_in_len+1));
  153. if (!basic_auth)
  154. {
  155. rc = PAHO_MEMORY_ERROR;
  156. goto exit;
  157. }
  158. MQTTProtocol_specialChars((char*)basic_auth, source, &basic_auth_in_len);
  159. basic_auth_out_len = Base64_encodeLength(basic_auth, basic_auth_in_len) + 1; /* add 1 for trailing NULL */
  160. if ((*auth_dest = (char *)malloc(sizeof(char)*basic_auth_out_len)) == NULL)
  161. {
  162. free(basic_auth);
  163. rc = PAHO_MEMORY_ERROR;
  164. goto exit;
  165. }
  166. Base64_encode(*auth_dest, basic_auth_out_len, basic_auth, basic_auth_in_len);
  167. free(basic_auth);
  168. }
  169. }
  170. }
  171. exit:
  172. return rc;
  173. }
  174. /**
  175. * MQTT outgoing connect processing for a client
  176. * @param ip_address the TCP address:port to connect to
  177. * @param aClient a structure with all MQTT data needed
  178. * @param int ssl
  179. * @param int MQTTVersion the MQTT version to connect with (3 or 4)
  180. * @param long timeout how long to wait for a new socket to be created
  181. * @return return code
  182. */
  183. #if defined(OPENSSL)
  184. #if defined(__GNUC__) && defined(__linux__)
  185. int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int ssl, int websocket, int MQTTVersion,
  186. MQTTProperties* connectProperties, MQTTProperties* willProperties, long timeout)
  187. #else
  188. int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int ssl, int websocket, int MQTTVersion,
  189. MQTTProperties* connectProperties, MQTTProperties* willProperties)
  190. #endif
  191. #else
  192. #if defined(__GNUC__) && defined(__linux__)
  193. int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int websocket, int MQTTVersion,
  194. MQTTProperties* connectProperties, MQTTProperties* willProperties, long timeout)
  195. #else
  196. int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int websocket, int MQTTVersion,
  197. MQTTProperties* connectProperties, MQTTProperties* willProperties)
  198. #endif
  199. #endif
  200. {
  201. int rc = 0,
  202. port;
  203. size_t addr_len;
  204. char* p0;
  205. FUNC_ENTRY;
  206. aClient->good = 1;
  207. if (aClient->httpProxy)
  208. p0 = aClient->httpProxy;
  209. else
  210. p0 = getenv("http_proxy");
  211. if (p0)
  212. {
  213. if ((rc = MQTTProtocol_setHTTPProxy(aClient, p0, &aClient->net.http_proxy, &aClient->net.http_proxy_auth, "http://")) != 0)
  214. goto exit;
  215. Log(TRACE_PROTOCOL, -1, "Setting http proxy to %s", aClient->net.http_proxy);
  216. if (aClient->net.http_proxy_auth)
  217. Log(TRACE_PROTOCOL, -1, "Setting http proxy auth to %s", aClient->net.http_proxy_auth);
  218. }
  219. #if defined(OPENSSL)
  220. if (aClient->httpsProxy)
  221. p0 = aClient->httpsProxy;
  222. else
  223. p0 = getenv("https_proxy");
  224. if (p0)
  225. {
  226. if ((rc = MQTTProtocol_setHTTPProxy(aClient, p0, &aClient->net.https_proxy, &aClient->net.https_proxy_auth, "https://")) != 0)
  227. goto exit;
  228. Log(TRACE_PROTOCOL, -1, "Setting https proxy to %s", aClient->net.https_proxy);
  229. if (aClient->net.https_proxy_auth)
  230. Log(TRACE_PROTOCOL, -1, "Setting https proxy auth to %s", aClient->net.https_proxy_auth);
  231. }
  232. if (!ssl && aClient->net.http_proxy) {
  233. #else
  234. if (aClient->net.http_proxy) {
  235. #endif
  236. addr_len = MQTTProtocol_addressPort(aClient->net.http_proxy, &port, NULL, PROXY_DEFAULT_PORT);
  237. #if defined(__GNUC__) && defined(__linux__)
  238. if (timeout < 0)
  239. rc = -1;
  240. else
  241. rc = Socket_new(aClient->net.http_proxy, addr_len, port, &(aClient->net.socket), timeout);
  242. #else
  243. rc = Socket_new(aClient->net.http_proxy, addr_len, port, &(aClient->net.socket));
  244. #endif
  245. }
  246. #if defined(OPENSSL)
  247. else if (ssl && aClient->net.https_proxy) {
  248. addr_len = MQTTProtocol_addressPort(aClient->net.https_proxy, &port, NULL, PROXY_DEFAULT_PORT);
  249. #if defined(__GNUC__) && defined(__linux__)
  250. if (timeout < 0)
  251. rc = -1;
  252. else
  253. rc = Socket_new(aClient->net.https_proxy, addr_len, port, &(aClient->net.socket), timeout);
  254. #else
  255. rc = Socket_new(aClient->net.https_proxy, addr_len, port, &(aClient->net.socket));
  256. #endif
  257. }
  258. #endif
  259. else {
  260. #if defined(OPENSSL)
  261. addr_len = MQTTProtocol_addressPort(ip_address, &port, NULL, ssl ?
  262. (websocket ? WSS_DEFAULT_PORT : SECURE_MQTT_DEFAULT_PORT) :
  263. (websocket ? WS_DEFAULT_PORT : MQTT_DEFAULT_PORT) );
  264. #else
  265. addr_len = MQTTProtocol_addressPort(ip_address, &port, NULL, websocket ? WS_DEFAULT_PORT : MQTT_DEFAULT_PORT);
  266. #endif
  267. #if defined(__GNUC__) && defined(__linux__)
  268. if (timeout < 0)
  269. rc = -1;
  270. else
  271. rc = Socket_new(ip_address, addr_len, port, &(aClient->net.socket), timeout);
  272. #else
  273. rc = Socket_new(ip_address, addr_len, port, &(aClient->net.socket));
  274. #endif
  275. }
  276. if (rc == EINPROGRESS || rc == EWOULDBLOCK)
  277. aClient->connect_state = TCP_IN_PROGRESS; /* TCP connect called - wait for connect completion */
  278. else if (rc == 0)
  279. { /* TCP connect completed. If SSL, send SSL connect */
  280. #if defined(OPENSSL)
  281. if (ssl)
  282. {
  283. if (aClient->net.https_proxy) {
  284. aClient->connect_state = PROXY_CONNECT_IN_PROGRESS;
  285. rc = Proxy_connect( &aClient->net, 1, ip_address);
  286. }
  287. if (rc == 0 && SSLSocket_setSocketForSSL(&aClient->net, aClient->sslopts, ip_address, addr_len) == 1)
  288. {
  289. rc = aClient->sslopts->struct_version >= 3 ?
  290. SSLSocket_connect(aClient->net.ssl, aClient->net.socket, ip_address,
  291. aClient->sslopts->verify, aClient->sslopts->ssl_error_cb, aClient->sslopts->ssl_error_context) :
  292. SSLSocket_connect(aClient->net.ssl, aClient->net.socket, ip_address,
  293. aClient->sslopts->verify, NULL, NULL);
  294. if (rc == TCPSOCKET_INTERRUPTED)
  295. aClient->connect_state = SSL_IN_PROGRESS; /* SSL connect called - wait for completion */
  296. }
  297. else
  298. rc = SOCKET_ERROR;
  299. }
  300. else if (aClient->net.http_proxy) {
  301. #else
  302. if (aClient->net.http_proxy) {
  303. #endif
  304. aClient->connect_state = PROXY_CONNECT_IN_PROGRESS;
  305. rc = Proxy_connect( &aClient->net, 0, ip_address);
  306. }
  307. if ( websocket )
  308. {
  309. #if defined(OPENSSL)
  310. rc = WebSocket_connect(&aClient->net, ssl, ip_address);
  311. #endif
  312. rc = WebSocket_connect(&aClient->net, 0, ip_address);
  313. if ( rc == TCPSOCKET_INTERRUPTED )
  314. aClient->connect_state = WEBSOCKET_IN_PROGRESS; /* Websocket connect called - wait for completion */
  315. }
  316. if (rc == 0)
  317. {
  318. /* Now send the MQTT connect packet */
  319. if ((rc = MQTTPacket_send_connect(aClient, MQTTVersion, connectProperties, willProperties)) == 0)
  320. aClient->connect_state = WAIT_FOR_CONNACK; /* MQTT Connect sent - wait for CONNACK */
  321. else
  322. aClient->connect_state = NOT_IN_PROGRESS;
  323. }
  324. }
  325. exit:
  326. FUNC_EXIT_RC(rc);
  327. return rc;
  328. }
  329. /**
  330. * Process an incoming pingresp packet for a socket
  331. * @param pack pointer to the publish packet
  332. * @param sock the socket on which the packet was received
  333. * @return completion code
  334. */
  335. int MQTTProtocol_handlePingresps(void* pack, SOCKET sock)
  336. {
  337. Clients* client = NULL;
  338. int rc = TCPSOCKET_COMPLETE;
  339. FUNC_ENTRY;
  340. client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
  341. Log(LOG_PROTOCOL, 21, NULL, sock, client->clientID);
  342. client->ping_outstanding = 0;
  343. FUNC_EXIT_RC(rc);
  344. return rc;
  345. }
  346. /**
  347. * MQTT outgoing subscribe processing for a client
  348. * @param client the client structure
  349. * @param topics list of topics
  350. * @param qoss corresponding list of QoSs
  351. * @param opts MQTT 5.0 subscribe options
  352. * @param props MQTT 5.0 subscribe properties
  353. * @return completion code
  354. */
  355. int MQTTProtocol_subscribe(Clients* client, List* topics, List* qoss, int msgID,
  356. MQTTSubscribe_options* opts, MQTTProperties* props)
  357. {
  358. int rc = 0;
  359. FUNC_ENTRY;
  360. rc = MQTTPacket_send_subscribe(topics, qoss, opts, props, msgID, 0, client);
  361. FUNC_EXIT_RC(rc);
  362. return rc;
  363. }
  364. /**
  365. * Process an incoming suback packet for a socket
  366. * @param pack pointer to the publish packet
  367. * @param sock the socket on which the packet was received
  368. * @return completion code
  369. */
  370. int MQTTProtocol_handleSubacks(void* pack, SOCKET sock)
  371. {
  372. Suback* suback = (Suback*)pack;
  373. Clients* client = NULL;
  374. int rc = TCPSOCKET_COMPLETE;
  375. FUNC_ENTRY;
  376. client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
  377. Log(LOG_PROTOCOL, 23, NULL, sock, client->clientID, suback->msgId);
  378. MQTTPacket_freeSuback(suback);
  379. FUNC_EXIT_RC(rc);
  380. return rc;
  381. }
  382. /**
  383. * MQTT outgoing unsubscribe processing for a client
  384. * @param client the client structure
  385. * @param topics list of topics
  386. * @return completion code
  387. */
  388. int MQTTProtocol_unsubscribe(Clients* client, List* topics, int msgID, MQTTProperties* props)
  389. {
  390. int rc = 0;
  391. FUNC_ENTRY;
  392. rc = MQTTPacket_send_unsubscribe(topics, props, msgID, 0, client);
  393. FUNC_EXIT_RC(rc);
  394. return rc;
  395. }
  396. /**
  397. * Process an incoming unsuback packet for a socket
  398. * @param pack pointer to the publish packet
  399. * @param sock the socket on which the packet was received
  400. * @return completion code
  401. */
  402. int MQTTProtocol_handleUnsubacks(void* pack, SOCKET sock)
  403. {
  404. Unsuback* unsuback = (Unsuback*)pack;
  405. Clients* client = NULL;
  406. int rc = TCPSOCKET_COMPLETE;
  407. FUNC_ENTRY;
  408. client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
  409. Log(LOG_PROTOCOL, 24, NULL, sock, client->clientID, unsuback->msgId);
  410. MQTTPacket_freeUnsuback(unsuback);
  411. FUNC_EXIT_RC(rc);
  412. return rc;
  413. }
  414. /**
  415. * Process an incoming disconnect packet for a socket
  416. * @param pack pointer to the disconnect packet
  417. * @param sock the socket on which the packet was received
  418. * @return completion code
  419. */
  420. int MQTTProtocol_handleDisconnects(void* pack, SOCKET sock)
  421. {
  422. Ack* disconnect = (Ack*)pack;
  423. Clients* client = NULL;
  424. int rc = TCPSOCKET_COMPLETE;
  425. FUNC_ENTRY;
  426. client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
  427. Log(LOG_PROTOCOL, 30, NULL, sock, client->clientID, disconnect->rc);
  428. MQTTPacket_freeAck(disconnect);
  429. FUNC_EXIT_RC(rc);
  430. return rc;
  431. }