MQTTProtocolOut.c 7.3 KB

  1. /*******************************************************************************
  2. * Copyright (c) 2009, 2018 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. *
  10. * and the Eclipse Distribution License is available at
  11. *
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial API and implementation and/or initial documentation
  15. * Ian Craggs, Allan Stockdill-Mander - SSL updates
  16. * Ian Craggs - fix for buffer overflow in addressPort bug #433290
  17. * Ian Craggs - MQTT 3.1.1 support
  18. * Rong Xiang, Ian Craggs - C++ compatibility
  19. * Ian Craggs - fix for bug 479376
  20. * Ian Craggs - SNI support
  21. * Ian Craggs - fix for issue #164
  22. * Ian Craggs - fix for issue #179
  23. * Ian Craggs - MQTT 5.0 support
  24. *******************************************************************************/
  25. /**
  26. * @file
  27. * \brief Functions dealing with the MQTT protocol exchanges
  28. *
  29. * Some other related functions are in the MQTTProtocolClient module
  30. */
  31. #include <stdlib.h>
  32. #include <string.h>
  33. #include "MQTTProtocolOut.h"
  34. #include "StackTrace.h"
  35. #include "Heap.h"
  36. #include "WebSocket.h"
  37. extern ClientStates* bstate;
  38. /**
  39. * Separates an address:port into two separate values
  40. * @param[in] uri the input string - hostname:port
  41. * @param[out] port the returned port integer
  42. * @param[out] topic optional topic portion of the address starting with '/'
  43. * @return the address string
  44. */
  45. size_t MQTTProtocol_addressPort(const char* uri, int* port, const char **topic)
  46. {
  47. char* colon_pos = strrchr(uri, ':'); /* reverse find to allow for ':' in IPv6 addresses */
  48. char* buf = (char*)uri;
  49. size_t len;
  51. if (uri[0] == '[')
  52. { /* ip v6 */
  53. if (colon_pos < strrchr(uri, ']'))
  54. colon_pos = NULL; /* means it was an IPv6 separator, not for host:port */
  55. }
  56. if (colon_pos) /* have to strip off the port */
  57. {
  58. len = colon_pos - uri;
  59. *port = atoi(colon_pos + 1);
  60. }
  61. else
  62. {
  63. len = strlen(buf);
  64. *port = DEFAULT_PORT;
  65. }
  66. /* try and find topic portion */
  67. if ( topic )
  68. {
  69. const char* addr_start = uri;
  70. if ( colon_pos )
  71. addr_start = colon_pos;
  72. *topic = strchr( addr_start, '/' );
  73. }
  74. if (buf[len - 1] == ']')
  75. {
  76. /* we are stripping off the final ], so length is 1 shorter */
  77. --len;
  78. }
  79. FUNC_EXIT;
  80. return len;
  81. }
  82. /**
  83. * MQTT outgoing connect processing for a client
  84. * @param ip_address the TCP address:port to connect to
  85. * @param aClient a structure with all MQTT data needed
  86. * @param int ssl
  87. * @param int MQTTVersion the MQTT version to connect with (3 or 4)
  88. * @return return code
  89. */
  90. #if defined(OPENSSL)
  91. int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int ssl, int websocket, int MQTTVersion,
  92. MQTTProperties* connectProperties, MQTTProperties* willProperties)
  93. #else
  94. int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int websocket, int MQTTVersion,
  95. MQTTProperties* connectProperties, MQTTProperties* willProperties)
  96. #endif
  97. {
  98. int rc, port;
  99. size_t addr_len;
  100. FUNC_ENTRY;
  101. aClient->good = 1;
  102. addr_len = MQTTProtocol_addressPort(ip_address, &port, NULL);
  103. rc = Socket_new(ip_address, addr_len, port, &(aClient->net.socket));
  104. if (rc == EINPROGRESS || rc == EWOULDBLOCK)
  105. aClient->connect_state = TCP_IN_PROGRESS; /* TCP connect called - wait for connect completion */
  106. else if (rc == 0)
  107. { /* TCP connect completed. If SSL, send SSL connect */
  108. #if defined(OPENSSL)
  109. if (ssl)
  110. {
  111. if (SSLSocket_setSocketForSSL(&aClient->net, aClient->sslopts, ip_address, addr_len) == 1)
  112. {
  113. rc = aClient->sslopts->struct_version >= 3 ?
  114. SSLSocket_connect(aClient->net.ssl, aClient->net.socket, ip_address,
  115. aClient->sslopts->verify, aClient->sslopts->ssl_error_cb, aClient->sslopts->ssl_error_context) :
  116. SSLSocket_connect(aClient->net.ssl, aClient->net.socket, ip_address,
  117. aClient->sslopts->verify, NULL, NULL);
  119. aClient->connect_state = SSL_IN_PROGRESS; /* SSL connect called - wait for completion */
  120. }
  121. else
  122. rc = SOCKET_ERROR;
  123. }
  124. #endif
  125. if ( websocket )
  126. {
  127. rc = WebSocket_connect( &aClient->net, ip_address );
  128. if ( rc == TCPSOCKET_INTERRUPTED )
  129. aClient->connect_state = WEBSOCKET_IN_PROGRESS; /* Websocket connect called - wait for completion */
  130. }
  131. if (rc == 0)
  132. {
  133. /* Now send the MQTT connect packet */
  134. if ((rc = MQTTPacket_send_connect(aClient, MQTTVersion, connectProperties, willProperties)) == 0)
  135. aClient->connect_state = WAIT_FOR_CONNACK; /* MQTT Connect sent - wait for CONNACK */
  136. else
  137. aClient->connect_state = NOT_IN_PROGRESS;
  138. }
  139. }
  140. FUNC_EXIT_RC(rc);
  141. return rc;
  142. }
  143. /**
  144. * Process an incoming pingresp packet for a socket
  145. * @param pack pointer to the publish packet
  146. * @param sock the socket on which the packet was received
  147. * @return completion code
  148. */
  149. int MQTTProtocol_handlePingresps(void* pack, int sock)
  150. {
  151. Clients* client = NULL;
  152. int rc = TCPSOCKET_COMPLETE;
  153. FUNC_ENTRY;
  154. client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
  155. Log(LOG_PROTOCOL, 21, NULL, sock, client->clientID);
  156. client->ping_outstanding = 0;
  157. FUNC_EXIT_RC(rc);
  158. return rc;
  159. }
  160. /**
  161. * MQTT outgoing subscribe processing for a client
  162. * @param client the client structure
  163. * @param topics list of topics
  164. * @param qoss corresponding list of QoSs
  165. * @param opts MQTT 5.0 subscribe options
  166. * @param props MQTT 5.0 subscribe properties
  167. * @return completion code
  168. */
  169. int MQTTProtocol_subscribe(Clients* client, List* topics, List* qoss, int msgID,
  170. MQTTSubscribe_options* opts, MQTTProperties* props)
  171. {
  172. int rc = 0;
  173. FUNC_ENTRY;
  174. rc = MQTTPacket_send_subscribe(topics, qoss, opts, props, msgID, 0, client);
  175. FUNC_EXIT_RC(rc);
  176. return rc;
  177. }
  178. /**
  179. * Process an incoming suback packet for a socket
  180. * @param pack pointer to the publish packet
  181. * @param sock the socket on which the packet was received
  182. * @return completion code
  183. */
  184. int MQTTProtocol_handleSubacks(void* pack, int sock)
  185. {
  186. Suback* suback = (Suback*)pack;
  187. Clients* client = NULL;
  188. int rc = TCPSOCKET_COMPLETE;
  189. FUNC_ENTRY;
  190. client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
  191. Log(LOG_PROTOCOL, 23, NULL, sock, client->clientID, suback->msgId);
  192. MQTTPacket_freeSuback(suback);
  193. FUNC_EXIT_RC(rc);
  194. return rc;
  195. }
  196. /**
  197. * MQTT outgoing unsubscribe processing for a client
  198. * @param client the client structure
  199. * @param topics list of topics
  200. * @return completion code
  201. */
  202. int MQTTProtocol_unsubscribe(Clients* client, List* topics, int msgID, MQTTProperties* props)
  203. {
  204. int rc = 0;
  205. FUNC_ENTRY;
  206. rc = MQTTPacket_send_unsubscribe(topics, props, msgID, 0, client);
  207. FUNC_EXIT_RC(rc);
  208. return rc;
  209. }
  210. /**
  211. * Process an incoming unsuback packet for a socket
  212. * @param pack pointer to the publish packet
  213. * @param sock the socket on which the packet was received
  214. * @return completion code
  215. */
  216. int MQTTProtocol_handleUnsubacks(void* pack, int sock)
  217. {
  218. Unsuback* unsuback = (Unsuback*)pack;
  219. Clients* client = NULL;
  220. int rc = TCPSOCKET_COMPLETE;
  221. FUNC_ENTRY;
  222. client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
  223. Log(LOG_PROTOCOL, 24, NULL, sock, client->clientID, unsuback->msgId);
  224. MQTTPacket_freeUnsuback(unsuback);
  225. FUNC_EXIT_RC(rc);
  226. return rc;
  227. }