123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410 |
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <unistd.h>
- #include <stdint.h>
- #include "./mqtt/MQTTAsync.h"
- #include "pxTools.h"
- #include "pxJson.h"
- #include "canTransmit.h"
- #include "JMremote.h"
- #include "JMcan.h"
- #include "JMPlc.h"
- #include "JMrtk.h"
- #include "JMmqtt.h"
- extern CONFIG_S config;
- extern POSTDATABUF_S postData;
- static MQTTAsync client;
- // static MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
- static char *cockpitID=NULL;
- static int rtkmqtttimeout=0;
- void onConnect(void* context, MQTTAsync_successData* response);
- void MQTT_connlost(void *context, char *cause)
- {
- pxLog(WARNING,"[MQTT] connect lost!");
- //进入人工驾驶模式
- postData.status.system=MANUALCONTROL;
- PlcSetMode(MANUALCONTROL);
- }
- int startRemoteControl(cJSON *rootJson)
- {
- if(!CanSettoCanCantrol())
- {
- return;
- }
- CAN_setSendBuf(CLEARCANSENDBUF,NULL);
- if(postData.status.system==REMOTECONTROL)
- {
- if(cockpitID)
- {
- char msg[128]={0};
- sprintf(msg,"\"cockpitID\":\"%s\"",cockpitID);
- free(cockpitID);
- cockpitID=NULL;
- char topic[100]={0};
- sprintf(topic,"%s/data/stop",config.MqttClientID);
- MQTT_publish(topic,msg);
- }
- Remote_stop();
- }
- char *cockpitIP=JsonGetStr(rootJson,"cockpitIP");
- char *ID=JsonGetStr(rootJson,"cockpitID");
- if(cockpitIP==NULL || ID == NULL)
- {
- pxLog(WARNING,"[MQTT] Json msg has no \"cockpit\"");
- return -1;
- }
- cockpitID=strdup(ID);
- if(cockpitID==NULL)
- {
- pxLog(WARNING,"[MQTT] strdup failed");
- return -3;
- }
- pxLog(EVENT,"[MQTT] start remote control");
- if(Remote_listen(cockpitIP)==0)
- {
- postData.status.system=REMOTECONTROL;
- PlcSetMode(REMOTECONTROL);
- return 0;
- }
- else
- return -2;
- }
- void startAutoControl()
- {
- if(!CanSettoCanCantrol())
- {
- return;
- }
- if(postData.status.system==REMOTECONTROL)
- {
- if(cockpitID)
- {
- char msg[128]={0};
- sprintf(msg,"\"cockpitID\":\"%s\"",cockpitID);
- free(cockpitID);
- cockpitID=NULL;
- char topic[100]={0};
- sprintf(topic,"%s/data/stop",config.MqttClientID);
- MQTT_publish(topic,msg);
- }
- Remote_stop();
- }
- pxLog(EVENT,"[MQTT] start auto control");
- CAN_setSendBuf(CLEARCANSENDBUF,NULL);
- postData.status.system=AUTOCONTROL;
-
- PlcSetMode(AUTOCONTROL);
- }
- void startManualControl()
- {
- if(postData.status.system==REMOTECONTROL)
- {
- if(cockpitID)
- {
- char msg[128]={0};
- sprintf(msg,"\"cockpitID\":\"%s\"",cockpitID);
- free(cockpitID);
- cockpitID=NULL;
- char topic[100]={0};
- sprintf(topic,"%s/data/stop",config.MqttClientID);
- MQTT_publish(topic,msg);
- }
- Remote_stop();
- }
- pxLog(EVENT,"[MQTT] start manual control");
- CAN_setSendBuf(CLEARCANSENDBUF,NULL);
- postData.status.system=MANUALCONTROL;
- PlcSetMode(MANUALCONTROL);
- }
- void MQTT_publish(char *topic,char *message)
- {
- MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
- MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
- char buf[2048]={0};
- // struct timeval tv;
- // gettimeofday(&tv,NULL);
- // uint64_t milliseconds = (uint64_t)(tv.tv_sec) * 1000 + (uint64_t)(tv.tv_usec) / 1000 ;
- u64 milliseconds=get_timeStamp();
- sprintf(buf,"{%s,\"time\":%llu}",message,milliseconds);
- pubmsg.payload = buf;
- pubmsg.payloadlen = (int)strlen(buf);
- pubmsg.qos = 1;
- MQTTAsync_sendMessage(client, topic, &pubmsg, &opts);
- }
- void MQTT_publishBytes(char *topic,char *message,int len)
- {
- MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
- MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
- pubmsg.payload = message;
- pubmsg.payloadlen = len;
- pubmsg.qos = 1;
- MQTTAsync_sendMessage(client, topic, &pubmsg, &opts);
- }
- void MQTT_disConnectRemoteControl()
- {
- //carID,
- char data[512]={0};
- sprintf(data,"\"carID\":\"%s\"",config.MqttClientID);
- MQTT_publish(config.ManualControlTopic,data);
- pxLog(WARNING,"[MQTT] quit remote control by car %s",data);
- }
- int MQTT_msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
- {
- pxLog(DEBUG,"[MQTT] Message arrived");
- pxLog(DEBUG,"[MQTT] \ttopic: %s", topicName);
- pxLog(PACKGE,"[MQTT] \tmessage: %.*s", message->payloadlen, (char*)message->payload);
- //处理mqtt消息
- //判断是否是rtk消息
- if(!strncmp(topicName,RTKTOPIC,strlen(RTKTOPIC)))
- {
- rtksend((char*)message->payload,message->payloadlen);
- pxLog(DEBUG,"[MQTT] RTK stat 1");
- postData.status.rtkmqtt=1;
- rtkmqtttimeout=0;
- goto freeMessage;
- }
- //判断是否是该车
- cJSON *rootJson=cJSON_Parse((char*)message->payload);
- if(!rootJson)
- {
- pxLog(WARNING,"[MQTT] can`t parse msg");
- goto freeMessage;
- }
- char *carID=JsonGetStr(rootJson,"carID");
- if(carID==NULL)
- {
- pxLog(WARNING,"[MQTT] Json msg has no \"carID\"");
- goto freeJson;
- }
- if(strcmp(carID,config.MqttClientID))
- {
- goto freeJson;
- }
-
- if(!strcmp(config.RemoteControlTopic,topicName))
- {
- int ret=0;
- char *carIP=JsonGetStr(rootJson,"carIP");
- if(carIP)
- ret=startRemoteControl(rootJson);
- char topic[100]={0};
- sprintf(topic,"%s/data/listen",config.MqttClientID);
- if(!carIP)
- {
- pxLog(WARNING,"[MQTT] Json msg has no \"carIP\"");
- cJSON_AddItemToObject(rootJson,"success",cJSON_CreateFalse());
- cJSON_AddItemToObject(rootJson,"msg",cJSON_CreateString("need carIP"));
- }
- else if(ret==0)
- {
- cJSON_AddItemToObject(rootJson,"success",cJSON_CreateTrue());
- }
- else if(ret==-1)
- {
- cJSON_AddItemToObject(rootJson,"success",cJSON_CreateFalse());
- cJSON_AddItemToObject(rootJson,"msg",cJSON_CreateString("need cockpit"));
- }
- else if(ret==-2)
- {
- cJSON_AddItemToObject(rootJson,"success",cJSON_CreateFalse());
- cJSON_AddItemToObject(rootJson,"msg",cJSON_CreateString("listen failed"));
- }
- else
- {
- cJSON_AddItemToObject(rootJson,"success",cJSON_CreateFalse());
- cJSON_AddItemToObject(rootJson,"msg",cJSON_CreateString("mark cockpitID failed"));
- }
- char *msg=cJSON_PrintUnformatted(rootJson);
- if(msg)
- {
- msg[strlen(msg)-1]=0;
- MQTT_publish(topic,msg+1);
- free(msg);
- }
- else
- {
- pxLog(WARNING,"[MQTT] cJSON_PrintUnformatted failed");
- }
- goto freeJson;
- }
-
- if(!strcmp(config.AutoControlTopic,topicName))
- {
- startAutoControl();
- goto freeJson;
- }
-
- if(!strcmp(config.ManualControlTopic,topicName))
- {
- startManualControl();
- goto freeJson;
- }
- freeJson:
- cJSON_Delete(rootJson);
- freeMessage:
- MQTTAsync_freeMessage(&message);
- MQTTAsync_free(topicName);
- return 1;
- }
- void onSubscribe(void* context, MQTTAsync_successData* response)
- {
- pxLog(INIT,"[MQTT] Subscribe sucess");
- postData.status.mqtt=1;
- }
- void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
- {
- pxLog(ERROR,"[MQTT] Subscribe failed[%d]%s,retry", response->code,response->message);
- sleep(10);
- onConnect(NULL,NULL);
- }
- void onConnectCallCBack(void *context, char *cause)
- {
- int topicCount=0;
- char *topicList[128]={0};
- int topicQos[128]={1};
- pxLog(INIT,"[MQTT] reconnect sucess");
-
- topicList[topicCount++]=config.RemoteControlTopic;
- topicList[topicCount++]=config.AutoControlTopic;
- topicList[topicCount++]=config.ManualControlTopic;
- topicList[topicCount++]=RTKTOPIC;
-
- pxLog(DEBUG,"[MQTT] subscribe %d topics",topicCount);
- int rc=0;
- MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
- opts.onSuccess = onSubscribe;
- opts.onFailure = onSubscribeFailure;
- opts.context = client;
- while((rc=MQTTAsync_subscribeMany(client,topicCount,topicList,topicQos,&opts))!= MQTTASYNC_SUCCESS)
- {
- pxLog(ERROR,"[MQTT] subscribe failed%d,retry in 30 sec.",rc);
- // MQTTAsync_disconnect(client, &disc_opts);
- // MQTTAsync_destroy(&client);
- sleep(30);
- }
- }
- void onConnect(void* context, MQTTAsync_successData* response)
- {
- int topicCount=0;
- char *topicList[128]={0};
- int topicQos[128]={1};
- pxLog(INIT,"[MQTT] connect sucess");
-
- topicList[topicCount++]=config.RemoteControlTopic;
- topicList[topicCount++]=config.AutoControlTopic;
- topicList[topicCount++]=config.ManualControlTopic;
- topicList[topicCount++]=RTKTOPIC;
-
- pxLog(DEBUG,"[MQTT] subscribe %d topics",topicCount);
- int rc=0;
- MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
- opts.onSuccess = onSubscribe;
- opts.onFailure = onSubscribeFailure;
- opts.context = client;
- while((rc=MQTTAsync_subscribeMany(client,topicCount,topicList,topicQos,&opts))!= MQTTASYNC_SUCCESS)
- {
- pxLog(ERROR,"[MQTT] subscribe failed%d,retry in 30 sec.",rc);
- // MQTTAsync_disconnect(client, &disc_opts);
- // MQTTAsync_destroy(&client);
- sleep(30);
- }
- }
- void onConnectFailure(void* context, MQTTAsync_failureData* response)
- {
- pxLog(ERROR,"[MQTT] connect failed:[%d]%s,retry in few sec.",response->code,response->message);
- }
- int MQTT_connect()
- {
- MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
- int rc;
- pxLog(INIT,"[MQTT] ready to connect.");
- postData.status.mqtt=0;
- char *URI=NULL;
- asprintf(&URI,"mqtt://%s:%d",config.MqttBrockerAddress,config.MqttBrockerPort);
- if(!URI)
- {
- pxLog(ERROR,"[MQTT] MQTT malloc URI failed,exit.");
- return -1;
- }
- if((rc=MQTTAsync_create(&client, URI, config.MqttClientID, MQTTCLIENT_PERSISTENCE_NONE, NULL))
- != MQTTASYNC_SUCCESS)
- {
- pxLog(ERROR,"[MQTT] MQTT create failed:%d,exit.",rc);
- return -1;
- }
- pxLog(INFO,"[MQTT] MQTT [%s]:%d connect %s.",config.MqttClientID,client,URI);
- free(URI);
- if((rc=MQTTAsync_setCallbacks(client, client, MQTT_connlost, MQTT_msgarrvd, NULL))
- != MQTTASYNC_SUCCESS)
- {
- pxLog(ERROR,"[MQTT] MQTT set callbacks failed%d,exit.",rc);
- MQTTAsync_destroy(&client);
- return -1;
- }
- //设置连接成功回调函数:连接重新建立时会被调用,可以设置状态,继续通信等处理操作
- if ((rc = MQTTAsync_setConnected(client, NULL, onConnectCallCBack)) != MQTTASYNC_SUCCESS)
- {
- pxLog(ERROR,"[MQTT]Failed to set callback, return code %d", rc);
- MQTTAsync_destroy(&client);
- return -1;
- }
- conn_opts.automaticReconnect = 1;
- conn_opts.minRetryInterval = 3;
- conn_opts.maxRetryInterval = 60;
- conn_opts.keepAliveInterval = 20;
- conn_opts.cleansession = 1;
- // conn_opts.onSuccess = onConnect;
- conn_opts.onFailure = onConnectFailure;
- conn_opts.context = client;
- conn_opts.connectTimeout=5;
- conn_opts.username=config.MqttUserName;
- conn_opts.password=config.MqttPassWord;
- if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
- {
- pxLog(ERROR,"[MQTT] MQTT try to connect failed%d.",rc);
- MQTTAsync_destroy(&client);
- //MQTT_clientInit=0;
- return -1;
- }
- return 0;
- }
- void mqttTimer(PXTIMER_S *timer)
- {
- rtkmqtttimeout++;
- if(rtkmqtttimeout>=3)
- {
- rtkmqtttimeout=3;
- pxLog(DEBUG,"[MQTT] RTK stat 0");
- postData.status.rtkmqtt=0;
- }
- }
|