123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131 |
- #include "DataMqttClient.h"
- #include <iostream>
- #include <stdio.h>
- MQTTAsync_responseOptions CDataMqttSensor::opts = MQTTAsync_responseOptions_initializer;
- MQTTAsync CDataMqttSensor::m_mqttClient;
- bool CDataMqttSensor::_run = false;
- char** CDataMqttSensor::subTopic = NULL;
- std::string CDataMqttSensor::_Esn;
- CDataMqttSensor::CDataMqttSensor(CMessageQueue* q) :_message(q)
- {
- }
- void CDataMqttSensor::SetSensorMQTT(MQTTAsync mqClient, std::string Esn)
- {
- m_mqttClient = mqClient;
- _Esn = Esn;
- }
- void CDataMqttSensor::onConnectCallCBack(void* context, char* cause) //���ӳɹ�callback
- {
- if(m_mqttClient != NULL)
- {
-
- std::cout << "Connection successful" << std::endl;
- int rc;
- const int qos[1] = {0};
- const int ROWS = 1;
- const int COLUMNS = 64;
- subTopic = new char* [ROWS];
- for (int row = 0; row < ROWS; row++)
- {
- subTopic[row] = new char[COLUMNS];
- }
-
- //sprintf((char*)*(subTopic + 0), "v2x/v1/grasp/%s/info/up/ack", (char*)context);
- memcpy((char*)*(subTopic + 0),"bg/log",strlen("bg/log"));
-
- opts = MQTTAsync_responseOptions_initializer;
- //char* const subTopic[2] = { "hello" ,"hello1" }; //��������
- opts.context = m_mqttClient;
- if ((rc = MQTTAsync_subscribeMany(m_mqttClient, 1 , subTopic, qos, &opts)) != MQTTASYNC_SUCCESS) //subTopic
- {
- std::cout << "MQTTAsync_subscribe() fail, error code: " << rc << std::endl;
- }
- _run = true;
- }
-
- //sendMessage("11111",1, "hello");
- }
- void CDataMqttSensor::onConnectFailure(void* context, MQTTAsync_failureData* response) //����ʧ��callback
- {
- _run = false;
- printf("connection fail, error code: %d, error message:%s\n", response ? response->code : 0, response ? response->message : 0);
- }
- void CDataMqttSensor::Disconnect(void* context, char* cause) //���ӶϿ�callback
- {
- _run = false;
- std::cout << "disconnect, cause: " << cause << std::endl;
- }
- void CDataMqttSensor::sendMessage(char* data, int qos , const char * pubTopic)
- {
- if (_run && (m_mqttClient != NULL))
- {
- int rc;
- MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
- MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
- opts.context = m_mqttClient;
- pubmsg.payload = data;
- pubmsg.payloadlen = strlen(data);
- pubmsg.qos = qos;
- //std::cout << "mqtt send: " << pubmsg.payload << std::endl;
-
- if ((rc = MQTTAsync_sendMessage(m_mqttClient, pubTopic, &pubmsg, &opts)) != MQTTASYNC_SUCCESS)
- {
- std::cout << "MQTTAsync_sendMessage() fail, error code: " << rc << std::endl;
- }
- }
- }
- int CDataMqttSensor::RecevieMessage(void* context, char* pubTopic, int topicLen, MQTTAsync_message* message)
- {
- if (_run)
- {
- char buf[message->payloadlen + 1];
- memcpy(buf, message->payload, message->payloadlen);
- //printf("mqtt recv: %s\n", buf);
- //std::cout << "mqtt recv: " << buf << std::endl;
- MQTTAsync_freeMessage(&message);
- MQTTAsync_free(pubTopic);
- }
- return 1;
- }
- void CDataMqttSensor::Stop(const char* _Esn)
- {
-
- MQTTAsync_disconnectOptions optDis = MQTTAsync_disconnectOptions_initializer;
- if(_run)
- {
- MQTTAsync_unsubscribeMany(m_mqttClient, 1, subTopic, &opts);
- MQTTAsync_disconnect(&m_mqttClient, &optDis);
- const int ROWS = 1;
- for (int row = 0; row < ROWS; row++)
- {
- delete [] subTopic[row];
- }
- delete [] subTopic;
- }
-
- _run = false;
- m_mqttClient = NULL;
- MQTTAsync_destroy(&m_mqttClient);
- }
|