DataMqttClient.cpp 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. #include "DataMqttClient.h"
  2. #include <iostream>
  3. #include <stdio.h>
  4. MQTTAsync_responseOptions CDataMqttSensor::opts = MQTTAsync_responseOptions_initializer;
  5. MQTTAsync CDataMqttSensor::m_mqttClient;
  6. bool CDataMqttSensor::_run = false;
  7. char** CDataMqttSensor::subTopic = NULL;
  8. std::string CDataMqttSensor::_Esn;
  9. CDataMqttSensor::CDataMqttSensor(CMessageQueue* q) :_message(q)
  10. {
  11. }
  12. void CDataMqttSensor::SetSensorMQTT(MQTTAsync mqClient, std::string Esn)
  13. {
  14. m_mqttClient = mqClient;
  15. _Esn = Esn;
  16. }
  17. void CDataMqttSensor::onConnectCallCBack(void* context, char* cause) //���ӳɹ�callback
  18. {
  19. if(m_mqttClient != NULL)
  20. {
  21. std::cout << "Connection successful" << std::endl;
  22. int rc;
  23. const int qos[1] = {0};
  24. const int ROWS = 1;
  25. const int COLUMNS = 64;
  26. subTopic = new char* [ROWS];
  27. for (int row = 0; row < ROWS; row++)
  28. {
  29. subTopic[row] = new char[COLUMNS];
  30. }
  31. memcpy((char*)*(subTopic + 0),"Vehicle/ControlVehicle/Veh001",strlen("Vehicle/ControlVehicle/Veh001"));
  32. opts = MQTTAsync_responseOptions_initializer;
  33. //char* const subTopic[2] = { "hello" ,"hello1" }; //��������
  34. opts.context = m_mqttClient;
  35. if ((rc = MQTTAsync_subscribeMany(m_mqttClient, 1 , subTopic, qos, &opts)) != MQTTASYNC_SUCCESS) //subTopic
  36. {
  37. std::cout << "MQTTAsync_subscribe() fail, error code: " << rc << std::endl;
  38. }
  39. _run = true;
  40. }
  41. //sendMessage("11111",1, "hello");
  42. }
  43. void CDataMqttSensor::onConnectFailure(void* context, MQTTAsync_failureData* response) //����ʧ��callback
  44. {
  45. _run = false;
  46. printf("connection fail, error code: %d, error message:%s\n", response ? response->code : 0, response ? response->message : 0);
  47. }
  48. void CDataMqttSensor::Disconnect(void* context, char* cause) //���ӶϿ�callback
  49. {
  50. _run = false;
  51. std::cout << "disconnect, cause: " << cause << std::endl;
  52. }
  53. void CDataMqttSensor::sendMessage(char* data, int qos , const char * pubTopic)
  54. {
  55. if (_run && (m_mqttClient != NULL))
  56. {
  57. int rc;
  58. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  59. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  60. opts.context = m_mqttClient;
  61. pubmsg.payload = data;
  62. pubmsg.payloadlen = strlen(data);
  63. pubmsg.qos = qos;
  64. //std::cout << "mqtt send: " << pubmsg.payload << std::endl;
  65. if ((rc = MQTTAsync_sendMessage(m_mqttClient, pubTopic, &pubmsg, &opts)) != MQTTASYNC_SUCCESS)
  66. {
  67. std::cout << "MQTTAsync_sendMessage() fail, error code: " << rc << std::endl;
  68. }
  69. }
  70. }
  71. int CDataMqttSensor::RecevieMessage(void* context, char* pubTopic, int topicLen, MQTTAsync_message* message)
  72. {
  73. if (_run)
  74. {
  75. char buf[message->payloadlen + 1];
  76. memcpy(buf, message->payload, message->payloadlen);
  77. //printf("mqtt recv: %s\n", buf);
  78. //std::cout << "mqtt recv: " << buf << std::endl;
  79. MQTTAsync_freeMessage(&message);
  80. MQTTAsync_free(pubTopic);
  81. }
  82. return 1;
  83. }
  84. void CDataMqttSensor::Stop(const char* _Esn)
  85. {
  86. MQTTAsync_disconnectOptions optDis = MQTTAsync_disconnectOptions_initializer;
  87. if(_run)
  88. {
  89. MQTTAsync_unsubscribeMany(m_mqttClient, 1, subTopic, &opts);
  90. MQTTAsync_disconnect(&m_mqttClient, &optDis);
  91. const int ROWS = 1;
  92. for (int row = 0; row < ROWS; row++)
  93. {
  94. delete [] subTopic[row];
  95. }
  96. delete [] subTopic;
  97. }
  98. _run = false;
  99. m_mqttClient = NULL;
  100. MQTTAsync_destroy(&m_mqttClient);
  101. }