#include "DataMqttClient.h"

#include <cstdio>
#include <cstring>
#include <iostream>
#include <string>

namespace {

// Number of topics this client subscribes to and the buffer size of each entry.
const int kTopicCount = 1;
const int kTopicCapacity = 64;

// The single control topic subscribed to once a connection is established.
const char kControlTopic[] = "Vehicle/ControlVehicle/Veh001";

// Frees a topic table allocated by onConnectCallCBack and resets the pointer,
// so that a second call (or a call on a never-allocated table) is a no-op.
void releaseTopics(char**& topics)
{
    if (topics == NULL)
    {
        return;
    }
    for (int row = 0; row < kTopicCount; ++row)
    {
        delete[] topics[row];
    }
    delete[] topics;
    topics = NULL;
}

} // namespace

// ---------------------------------------------------------------------------
// Static state shared by the C-style Paho callbacks below.
// ---------------------------------------------------------------------------
MQTTAsync_responseOptions CDataMqttSensor::opts = MQTTAsync_responseOptions_initializer;
MQTTAsync CDataMqttSensor::m_mqttClient;
bool CDataMqttSensor::_run = false;
char** CDataMqttSensor::subTopic = NULL;
std::string CDataMqttSensor::_Esn;

/// Stores the message queue pointer; no MQTT activity happens here.
CDataMqttSensor::CDataMqttSensor(CMessageQueue* q)
    : _message(q)
{
}

/// Records the Paho client handle and the ESN string used by this sensor.
/// @param mqClient  Paho async client handle.
/// @param Esn       Identifier string stored in the static _Esn member.
void CDataMqttSensor::SetSensorMQTT(MQTTAsync mqClient, std::string Esn)
{
    m_mqttClient = mqClient;
    _Esn = Esn;
}

/// Connection-success callback: subscribes to the control topic and marks the
/// client as running.
/// @param context  Unused.
/// @param cause    Unused.
void CDataMqttSensor::onConnectCallCBack(void* context, char* cause)
{
    if (m_mqttClient == NULL)
    {
        return;
    }

    std::cout << "Connection successful" << std::endl;

    // This callback may run more than once (e.g. after a reconnect); drop any
    // table left over from a previous run instead of leaking it.
    releaseTopics(subTopic);

    subTopic = new char*[kTopicCount];
    for (int row = 0; row < kTopicCount; ++row)
    {
        // Value-initialise so every entry is a valid, NUL-terminated string.
        subTopic[row] = new char[kTopicCapacity]();
    }
    // Copy at most capacity-1 bytes: the zero-filled tail guarantees the
    // terminating NUL that the subscribe call relies on.
    std::strncpy(subTopic[0], kControlTopic, kTopicCapacity - 1);

    const int qos[kTopicCount] = {0};

    opts = MQTTAsync_responseOptions_initializer;
    opts.context = m_mqttClient;

    const int rc = MQTTAsync_subscribeMany(m_mqttClient, kTopicCount, subTopic, qos, &opts);
    if (rc != MQTTASYNC_SUCCESS)
    {
        std::cout << "MQTTAsync_subscribe() fail, error code: " << rc << std::endl;
    }

    _run = true;
}

/// Connection-failure callback: clears the running flag and logs the error.
/// @param context   Unused.
/// @param response  Failure details; may be null.
void CDataMqttSensor::onConnectFailure(void* context, MQTTAsync_failureData* response)
{
    _run = false;

    const int code = response ? response->code : 0;
    // Never hand a null pointer to "%s": that is undefined behaviour.
    const char* text = (response && response->message) ? response->message : "(null)";

    std::printf("connection fail, error code: %d, error message:%s\n", code, text);
}

/// Connection-lost callback: clears the running flag and logs the cause.
/// @param context  Unused.
/// @param cause    Reason for the disconnect; may be null.
void CDataMqttSensor::Disconnect(void* context, char* cause)
{
    _run = false;
    // Streaming a null char* into an ostream is undefined behaviour, so
    // substitute a placeholder when no cause is supplied.
    std::cout << "disconnect, cause: " << (cause ? cause : "unknown") << std::endl;
}

/// Publishes a NUL-terminated payload on the given topic.
/// Does nothing unless the client is connected (_run) and the handle is set.
/// @param data      NUL-terminated payload; the call is skipped if null.
/// @param qos       MQTT quality-of-service level for the message.
/// @param pubTopic  Topic to publish on; the call is skipped if null.
void CDataMqttSensor::sendMessage(char* data, int qos, const char* pubTopic)
{
    if (!_run || m_mqttClient == NULL)
    {
        return;
    }
    if (data == NULL || pubTopic == NULL)
    {
        std::cout << "MQTTAsync_sendMessage() skipped: null payload or topic" << std::endl;
        return;
    }

    MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
    pubmsg.payload = data;
    pubmsg.payloadlen = static_cast<int>(std::strlen(data));
    pubmsg.qos = qos;

    // Local options object; named differently from the static member `opts`
    // so the two are not confused.
    MQTTAsync_responseOptions sendOpts = MQTTAsync_responseOptions_initializer;
    sendOpts.context = m_mqttClient;

    const int rc = MQTTAsync_sendMessage(m_mqttClient, pubTopic, &pubmsg, &sendOpts);
    if (rc != MQTTASYNC_SUCCESS)
    {
        std::cout << "MQTTAsync_sendMessage() fail, error code: " << rc << std::endl;
    }
}

/// Message-arrived callback. The payload is currently not consumed; the
/// message and topic buffers are released and the message is reported as
/// handled.
/// @param context   Unused.
/// @param pubTopic  Topic buffer handed over by the library.
/// @param topicLen  Unused.
/// @param message   Message handed over by the library.
/// @return Always 1.
int CDataMqttSensor::RecevieMessage(void* context, char* pubTopic, int topicLen, MQTTAsync_message* message)
{
    // Returning 1 means the buffers will not be offered again, so they must
    // be released here regardless of the _run flag; otherwise they leak.
    if (message != NULL)
    {
        MQTTAsync_freeMessage(&message);
    }
    if (pubTopic != NULL)
    {
        MQTTAsync_free(pubTopic);
    }
    return 1;
}

/// Unsubscribes, disconnects and destroys the client, then releases the
/// topic table and resets the static state.
/// @param _Esn  Unused (shadows the static member of the same name).
void CDataMqttSensor::Stop(const char* _Esn)
{
    if (m_mqttClient != NULL)
    {
        if (_run)
        {
            if (subTopic != NULL)
            {
                MQTTAsync_unsubscribeMany(m_mqttClient, kTopicCount, subTopic, &opts);
            }

            MQTTAsync_disconnectOptions optDis = MQTTAsync_disconnectOptions_initializer;
            // Pass the handle itself, not its address.
            const int rc = MQTTAsync_disconnect(m_mqttClient, &optDis);
            if (rc != MQTTASYNC_SUCCESS)
            {
                std::cout << "MQTTAsync_disconnect() fail, error code: " << rc << std::endl;
            }
        }

        // Destroy while the handle is still valid; clearing it first would
        // turn this call into a no-op.
        // NOTE(review): assumes this class owns the handle passed to
        // SetSensorMQTT - confirm no caller destroys it as well.
        MQTTAsync_destroy(&m_mqttClient);
    }

    // Release the table even when _run is already false (e.g. after a
    // connection-lost callback), and leave no dangling pointer behind.
    releaseTopics(subTopic);

    _run = false;
    m_mqttClient = NULL;
}