DataMqttClient.cpp 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. #include "DataMqttClient.h"
  2. #include <iostream>
  3. #include <stdio.h>
  4. MQTTAsync_responseOptions CDataMqttSensor::opts = MQTTAsync_responseOptions_initializer;
  5. MQTTAsync CDataMqttSensor::m_mqttClient;
  6. bool CDataMqttSensor::_run = false;
  7. char** CDataMqttSensor::subTopic = NULL;
  8. std::string CDataMqttSensor::_Esn;
  9. CDataMqttSensor::CDataMqttSensor(CMessageQueue* q) :_message(q)
  10. {
  11. }
  12. void CDataMqttSensor::SetSensorMQTT(MQTTAsync mqClient, std::string Esn)
  13. {
  14. m_mqttClient = mqClient;
  15. _Esn = Esn;
  16. }
  17. void CDataMqttSensor::onConnectCallCBack(void* context, char* cause) //���ӳɹ�callback
  18. {
  19. if(m_mqttClient != NULL)
  20. {
  21. std::cout << "Connection successful" << std::endl;
  22. int rc;
  23. const int qos[1] = {0};
  24. const int ROWS = 1;
  25. const int COLUMNS = 64;
  26. subTopic = new char* [ROWS];
  27. for (int row = 0; row < ROWS; row++)
  28. {
  29. subTopic[row] = new char[COLUMNS];
  30. }
  31. //sprintf((char*)*(subTopic + 0), "v2x/v1/grasp/%s/info/up/ack", (char*)context);
  32. memcpy((char*)*(subTopic + 0),"bg/log",strlen("bg/log"));
  33. opts = MQTTAsync_responseOptions_initializer;
  34. //char* const subTopic[2] = { "hello" ,"hello1" }; //��������
  35. opts.context = m_mqttClient;
  36. if ((rc = MQTTAsync_subscribeMany(m_mqttClient, 1 , subTopic, qos, &opts)) != MQTTASYNC_SUCCESS) //subTopic
  37. {
  38. std::cout << "MQTTAsync_subscribe() fail, error code: " << rc << std::endl;
  39. }
  40. _run = true;
  41. }
  42. //sendMessage("11111",1, "hello");
  43. }
  44. void CDataMqttSensor::onConnectFailure(void* context, MQTTAsync_failureData* response) //����ʧ��callback
  45. {
  46. _run = false;
  47. printf("connection fail, error code: %d, error message:%s\n", response ? response->code : 0, response ? response->message : 0);
  48. }
  49. void CDataMqttSensor::Disconnect(void* context, char* cause) //���ӶϿ�callback
  50. {
  51. _run = false;
  52. std::cout << "disconnect, cause: " << cause << std::endl;
  53. }
  54. void CDataMqttSensor::sendMessage(char* data, int qos , const char * pubTopic)
  55. {
  56. if (_run && (m_mqttClient != NULL))
  57. {
  58. int rc;
  59. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  60. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  61. opts.context = m_mqttClient;
  62. pubmsg.payload = data;
  63. pubmsg.payloadlen = strlen(data);
  64. pubmsg.qos = qos;
  65. //std::cout << "mqtt send: " << pubmsg.payload << std::endl;
  66. if ((rc = MQTTAsync_sendMessage(m_mqttClient, pubTopic, &pubmsg, &opts)) != MQTTASYNC_SUCCESS)
  67. {
  68. std::cout << "MQTTAsync_sendMessage() fail, error code: " << rc << std::endl;
  69. }
  70. }
  71. }
  72. int CDataMqttSensor::RecevieMessage(void* context, char* pubTopic, int topicLen, MQTTAsync_message* message)
  73. {
  74. if (_run)
  75. {
  76. char buf[message->payloadlen + 1];
  77. memcpy(buf, message->payload, message->payloadlen);
  78. //printf("mqtt recv: %s\n", buf);
  79. //std::cout << "mqtt recv: " << buf << std::endl;
  80. MQTTAsync_freeMessage(&message);
  81. MQTTAsync_free(pubTopic);
  82. }
  83. return 1;
  84. }
  85. void CDataMqttSensor::Stop(const char* _Esn)
  86. {
  87. MQTTAsync_disconnectOptions optDis = MQTTAsync_disconnectOptions_initializer;
  88. if(_run)
  89. {
  90. MQTTAsync_unsubscribeMany(m_mqttClient, 1, subTopic, &opts);
  91. MQTTAsync_disconnect(&m_mqttClient, &optDis);
  92. const int ROWS = 1;
  93. for (int row = 0; row < ROWS; row++)
  94. {
  95. delete [] subTopic[row];
  96. }
  97. delete [] subTopic;
  98. }
  99. _run = false;
  100. m_mqttClient = NULL;
  101. MQTTAsync_destroy(&m_mqttClient);
  102. }