DataMqttStatus.cpp 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. #include "DataMqttStatus.h"
  2. #include <iostream>
  3. #include <stdio.h>
  4. MQTTAsync_responseOptions CDataMqttSensor::opts = MQTTAsync_responseOptions_initializer;
  5. MQTTAsync CDataMqttSensor::m_mqttClient;
  6. bool CDataMqttSensor::_run = false;
  7. char** CDataMqttSensor::subTopic = NULL;
  8. std::string CDataMqttSensor::_Esn;
  9. CDataMqttStatusSensor::CDataMqttSensor(CMessageQueue* q) :_message(q)
  10. {
  11. }
  12. void CDataMqttStatusSensor::SetSensorMQTT(MQTTAsync mqClient, std::string Esn)
  13. {
  14. m_mqttClient = mqClient;
  15. _Esn = Esn;
  16. }
  17. void CDataMqttStatusSensor::onConnectCallCBack(void* context, char* cause)
  18. {
  19. if(m_mqttClient != NULL)
  20. {
  21. std::cout << "Connection successful" << std::endl;
  22. int rc;
  23. const int qos[1] = {0};
  24. const int ROWS = 1;
  25. const int COLUMNS = 64;
  26. subTopic = new char* [ROWS];
  27. for (int row = 0; row < ROWS; row++)
  28. {
  29. subTopic[row] = new char[COLUMNS];
  30. }
  31. memcpy((char*)*(subTopic + 0),"Vehicle/CanBus/StateVeh001",strlen("Vehicle/CanBus/StateVeh001"));
  32. opts = MQTTAsync_responseOptions_initializer;
  33. opts.context = m_mqttClient;
  34. if ((rc = MQTTAsync_subscribeMany(m_mqttClient, 1 , subTopic, qos, &opts)) != MQTTASYNC_SUCCESS) //subTopic
  35. {
  36. std::cout << "MQTTAsync_subscribe() fail, error code: " << rc << std::endl;
  37. }
  38. _run = true;
  39. }
  40. }
  41. void CDataMqttStatusSensor::onConnectFailure(void* context, MQTTAsync_failureData* response)
  42. {
  43. _run = false;
  44. printf("connection fail, error code: %d, error message:%s\n", response ? response->code : 0, response ? response->message : 0);
  45. }
  46. void CDataMqttStatusSensor::Disconnect(void* context, char* cause)
  47. {
  48. _run = false;
  49. std::cout << "disconnect, cause: " << cause << std::endl;
  50. }
  51. void CDataMqttStatusSensor::sendMessage(char* data, int qos , const char * pubTopic)
  52. {
  53. if (_run && (m_mqttClient != NULL))
  54. {
  55. int rc;
  56. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  57. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  58. opts.context = m_mqttClient;
  59. pubmsg.payload = data;
  60. pubmsg.payloadlen = strlen(data);
  61. pubmsg.qos = qos;
  62. if ((rc = MQTTAsync_sendMessage(m_mqttClient, pubTopic, &pubmsg, &opts)) != MQTTASYNC_SUCCESS)
  63. {
  64. std::cout << "MQTTAsync_sendMessage() fail, error code: " << rc << std::endl;
  65. }
  66. }
  67. }
  68. int CDataMqttStatusSensor::RecevieMessage(void* context, char* pubTopic, int topicLen, MQTTAsync_message* message)
  69. {
  70. if (_run)
  71. {
  72. char buf[message->payloadlen + 1];
  73. memcpy(buf, message->payload, message->payloadlen);
  74. MQTTAsync_freeMessage(&message);
  75. MQTTAsync_free(pubTopic);
  76. }
  77. return 1;
  78. }
  79. void CDataMqttStatusSensor::Stop(const char* _Esn)
  80. {
  81. MQTTAsync_disconnectOptions optDis = MQTTAsync_disconnectOptions_initializer;
  82. if(_run)
  83. {
  84. MQTTAsync_unsubscribeMany(m_mqttClient, 1, subTopic, &opts);
  85. MQTTAsync_disconnect(&m_mqttClient, &optDis);
  86. const int ROWS = 1;
  87. for (int row = 0; row < ROWS; row++)
  88. {
  89. delete [] subTopic[row];
  90. }
  91. delete [] subTopic;
  92. }
  93. _run = false;
  94. m_mqttClient = NULL;
  95. MQTTAsync_destroy(&m_mqttClient);
  96. }