// MqttClient.cpp

#include "MqttClient.h"

#include <stdio.h>
#include <string.h>

#include <iostream>
#include <string>
  4. MQTTAsync_responseOptions CMqttSensor::opts = MQTTAsync_responseOptions_initializer;
  5. MQTTAsync CMqttSensor::m_mqttClient;
  6. bool CMqttSensor::_run = false;
  7. char** CMqttSensor::subTopic = NULL;
  8. std::string CMqttSensor::_Esn;
  9. CMqttSensor::CMqttSensor(CMessageQueue* q) :_message(q)
  10. {
  11. }
  12. void CMqttSensor::SetSensorMQTT(MQTTAsync mqClient, std::string Esn)
  13. {
  14. m_mqttClient = mqClient;
  15. _Esn = Esn;
  16. }
  17. void CMqttSensor::onConnectCallCBack(void* context, char* cause) //���ӳɹ�callback
  18. {
  19. if(m_mqttClient != NULL)
  20. {
  21. std::cout << "Connection successful" << std::endl;
  22. int rc;
  23. const int qos[3] = { 0,1,2};
  24. const int ROWS = 3;
  25. const int COLUMNS = 64;
  26. subTopic = new char* [ROWS];
  27. for (int row = 0; row < ROWS; row++)
  28. {
  29. subTopic[row] = new char[COLUMNS];
  30. }
  31. sprintf((char*)*(subTopic + 0), "v2x/v1/grasp/%s/info/up/ack", (char*)context);
  32. sprintf((char*)*(subTopic + 1), "v2x/v1/grasp/%s/status/up/ack", (char*)context);
  33. sprintf((char*)*(subTopic + 2), "v2x/v1/grasp/%s/stop/down/", (char*)context);
  34. opts = MQTTAsync_responseOptions_initializer;
  35. //char* const subTopic[2] = { "hello" ,"hello1" }; //��������
  36. opts.context = m_mqttClient;
  37. if ((rc = MQTTAsync_subscribeMany(m_mqttClient, 3 , subTopic, qos, &opts)) != MQTTASYNC_SUCCESS) //subTopic
  38. {
  39. std::cout << "MQTTAsync_subscribe() fail, error code: " << rc << std::endl;
  40. }
  41. _run = true;
  42. }
  43. //sendMessage("11111",1, "hello");
  44. }
  45. void CMqttSensor::onConnectFailure(void* context, MQTTAsync_failureData* response) //����ʧ��callback
  46. {
  47. _run = false;
  48. printf("connection fail, error code: %d, error message:%s\n", response ? response->code : 0, response ? response->message : 0);
  49. }
  50. void CMqttSensor::Disconnect(void* context, char* cause) //���ӶϿ�callback
  51. {
  52. _run = false;
  53. std::cout << "disconnect, cause: " << cause << std::endl;
  54. }
  55. void CMqttSensor::sendMessage(char* data, int qos , const char * pubTopic)
  56. {
  57. if (_run && (m_mqttClient != NULL))
  58. {
  59. int rc;
  60. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  61. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  62. opts.context = m_mqttClient;
  63. pubmsg.payload = data;
  64. pubmsg.payloadlen = strlen(data);
  65. pubmsg.qos = qos;
  66. //std::cout << "mqtt send: " << pubmsg.payload << std::endl;
  67. if ((rc = MQTTAsync_sendMessage(m_mqttClient, pubTopic, &pubmsg, &opts)) != MQTTASYNC_SUCCESS)
  68. {
  69. std::cout << "MQTTAsync_sendMessage() fail, error code: " << rc << std::endl;
  70. }
  71. }
  72. }
  73. int CMqttSensor::RecevieMessage(void* context, char* pubTopic, int topicLen, MQTTAsync_message* message)
  74. {
  75. if (_run)
  76. {
  77. char buf[message->payloadlen + 1];
  78. memcpy(buf, message->payload, message->payloadlen);
  79. //printf("mqtt recv: %s\n", buf);
  80. //std::cout << "mqtt recv: " << buf << std::endl;
  81. MQTTAsync_freeMessage(&message);
  82. MQTTAsync_free(pubTopic);
  83. }
  84. return 1;
  85. }
  86. void CMqttSensor::Stop(const char* _Esn)
  87. {
  88. MQTTAsync_disconnectOptions optDis = MQTTAsync_disconnectOptions_initializer;
  89. if(_run)
  90. {
  91. MQTTAsync_unsubscribeMany(m_mqttClient, 3, subTopic, &opts);
  92. MQTTAsync_disconnect(&m_mqttClient, &optDis);
  93. const int ROWS = 3;
  94. for (int row = 0; row < ROWS; row++)
  95. {
  96. delete [] subTopic[row];
  97. }
  98. delete [] subTopic;
  99. }
  100. _run = false;
  101. m_mqttClient = NULL;
  102. MQTTAsync_destroy(&m_mqttClient);
  103. }