#include #include #include #include #include #include "./mqtt/MQTTAsync.h" #include "pxTools.h" #include "pxJson.h" #include "canTransmit.h" #include "JMremote.h" #include "JMcan.h" #include "JMPlc.h" #include "JMrtk.h" #include "JMmqtt.h" extern CONFIG_S config; extern POSTDATABUF_S postData; static MQTTAsync client; // static MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; static char *cockpitID=NULL; static int rtkmqtttimeout=0; void onConnect(void* context, MQTTAsync_successData* response); void MQTT_connlost(void *context, char *cause) { pxLog(WARNING,"[MQTT] connect lost!"); //进入人工驾驶模式 postData.status.system=MANUALCONTROL; PlcSetMode(MANUALCONTROL); } int startRemoteControl(cJSON *rootJson) { if(!CanSettoCanCantrol()) { return; } CAN_setSendBuf(CLEARCANSENDBUF,NULL); if(postData.status.system==REMOTECONTROL) { if(cockpitID) { char msg[128]={0}; sprintf(msg,"\"cockpitID\":\"%s\"",cockpitID); free(cockpitID); cockpitID=NULL; char topic[100]={0}; sprintf(topic,"%s/data/stop",config.MqttClientID); MQTT_publish(topic,msg); } Remote_stop(); } char *cockpitIP=JsonGetStr(rootJson,"cockpitIP"); char *ID=JsonGetStr(rootJson,"cockpitID"); if(cockpitIP==NULL || ID == NULL) { pxLog(WARNING,"[MQTT] Json msg has no \"cockpit\""); return -1; } cockpitID=strdup(ID); if(cockpitID==NULL) { pxLog(WARNING,"[MQTT] strdup failed"); return -3; } pxLog(EVENT,"[MQTT] start remote control"); if(Remote_listen(cockpitIP)==0) { postData.status.system=REMOTECONTROL; PlcSetMode(REMOTECONTROL); return 0; } else return -2; } void startAutoControl() { if(!CanSettoCanCantrol()) { return; } if(postData.status.system==REMOTECONTROL) { if(cockpitID) { char msg[128]={0}; sprintf(msg,"\"cockpitID\":\"%s\"",cockpitID); free(cockpitID); cockpitID=NULL; char topic[100]={0}; sprintf(topic,"%s/data/stop",config.MqttClientID); MQTT_publish(topic,msg); } Remote_stop(); } pxLog(EVENT,"[MQTT] start auto control"); CAN_setSendBuf(CLEARCANSENDBUF,NULL); postData.status.system=AUTOCONTROL; PlcSetMode(AUTOCONTROL); } void startManualControl() { if(postData.status.system==REMOTECONTROL) { if(cockpitID) { char msg[128]={0}; sprintf(msg,"\"cockpitID\":\"%s\"",cockpitID); free(cockpitID); cockpitID=NULL; char topic[100]={0}; sprintf(topic,"%s/data/stop",config.MqttClientID); MQTT_publish(topic,msg); } Remote_stop(); } pxLog(EVENT,"[MQTT] start manual control"); CAN_setSendBuf(CLEARCANSENDBUF,NULL); postData.status.system=MANUALCONTROL; PlcSetMode(MANUALCONTROL); } void MQTT_publish(char *topic,char *message) { MQTTAsync_message pubmsg = MQTTAsync_message_initializer; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; char buf[2048]={0}; // struct timeval tv; // gettimeofday(&tv,NULL); // uint64_t milliseconds = (uint64_t)(tv.tv_sec) * 1000 + (uint64_t)(tv.tv_usec) / 1000 ; u64 milliseconds=get_timeStamp(); sprintf(buf,"{%s,\"time\":%llu}",message,milliseconds); pubmsg.payload = buf; pubmsg.payloadlen = (int)strlen(buf); pubmsg.qos = 1; MQTTAsync_sendMessage(client, topic, &pubmsg, &opts); } void MQTT_publishBytes(char *topic,char *message,int len) { MQTTAsync_message pubmsg = MQTTAsync_message_initializer; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; pubmsg.payload = message; pubmsg.payloadlen = len; pubmsg.qos = 1; MQTTAsync_sendMessage(client, topic, &pubmsg, &opts); } void MQTT_disConnectRemoteControl() { //carID, char data[512]={0}; sprintf(data,"\"carID\":\"%s\"",config.MqttClientID); MQTT_publish(config.ManualControlTopic,data); pxLog(WARNING,"[MQTT] quit remote control by car %s",data); } int MQTT_msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message) { pxLog(DEBUG,"[MQTT] Message arrived"); pxLog(DEBUG,"[MQTT] \ttopic: %s", topicName); pxLog(PACKGE,"[MQTT] \tmessage: %.*s", message->payloadlen, (char*)message->payload); //处理mqtt消息 //判断是否是rtk消息 if(!strncmp(topicName,RTKTOPIC,strlen(RTKTOPIC))) { rtksend((char*)message->payload,message->payloadlen); pxLog(DEBUG,"[MQTT] RTK stat 1"); postData.status.rtkmqtt=1; rtkmqtttimeout=0; goto freeMessage; } //判断是否是该车 cJSON *rootJson=cJSON_Parse((char*)message->payload); if(!rootJson) { pxLog(WARNING,"[MQTT] can`t parse msg"); goto freeMessage; } char *carID=JsonGetStr(rootJson,"carID"); if(carID==NULL) { pxLog(WARNING,"[MQTT] Json msg has no \"carID\""); goto freeJson; } if(strcmp(carID,config.MqttClientID)) { goto freeJson; } if(!strcmp(config.RemoteControlTopic,topicName)) { int ret=0; char *carIP=JsonGetStr(rootJson,"carIP"); if(carIP) ret=startRemoteControl(rootJson); char topic[100]={0}; sprintf(topic,"%s/data/listen",config.MqttClientID); if(!carIP) { pxLog(WARNING,"[MQTT] Json msg has no \"carIP\""); cJSON_AddItemToObject(rootJson,"success",cJSON_CreateFalse()); cJSON_AddItemToObject(rootJson,"msg",cJSON_CreateString("need carIP")); } else if(ret==0) { cJSON_AddItemToObject(rootJson,"success",cJSON_CreateTrue()); } else if(ret==-1) { cJSON_AddItemToObject(rootJson,"success",cJSON_CreateFalse()); cJSON_AddItemToObject(rootJson,"msg",cJSON_CreateString("need cockpit")); } else if(ret==-2) { cJSON_AddItemToObject(rootJson,"success",cJSON_CreateFalse()); cJSON_AddItemToObject(rootJson,"msg",cJSON_CreateString("listen failed")); } else { cJSON_AddItemToObject(rootJson,"success",cJSON_CreateFalse()); cJSON_AddItemToObject(rootJson,"msg",cJSON_CreateString("mark cockpitID failed")); } char *msg=cJSON_PrintUnformatted(rootJson); if(msg) { msg[strlen(msg)-1]=0; MQTT_publish(topic,msg+1); free(msg); } else { pxLog(WARNING,"[MQTT] cJSON_PrintUnformatted failed"); } goto freeJson; } if(!strcmp(config.AutoControlTopic,topicName)) { startAutoControl(); goto freeJson; } if(!strcmp(config.ManualControlTopic,topicName)) { startManualControl(); goto freeJson; } freeJson: cJSON_Delete(rootJson); freeMessage: MQTTAsync_freeMessage(&message); MQTTAsync_free(topicName); return 1; } void onSubscribe(void* context, MQTTAsync_successData* response) { pxLog(INIT,"[MQTT] Subscribe sucess"); postData.status.mqtt=1; } void onSubscribeFailure(void* context, MQTTAsync_failureData* response) { pxLog(ERROR,"[MQTT] Subscribe failed[%d]%s,retry", response->code,response->message); sleep(10); onConnect(NULL,NULL); } void onConnectCallCBack(void *context, char *cause) { int topicCount=0; char *topicList[128]={0}; int topicQos[128]={1}; pxLog(INIT,"[MQTT] reconnect sucess"); topicList[topicCount++]=config.RemoteControlTopic; topicList[topicCount++]=config.AutoControlTopic; topicList[topicCount++]=config.ManualControlTopic; topicList[topicCount++]=RTKTOPIC; pxLog(DEBUG,"[MQTT] subscribe %d topics",topicCount); int rc=0; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; opts.onSuccess = onSubscribe; opts.onFailure = onSubscribeFailure; opts.context = client; while((rc=MQTTAsync_subscribeMany(client,topicCount,topicList,topicQos,&opts))!= MQTTASYNC_SUCCESS) { pxLog(ERROR,"[MQTT] subscribe failed%d,retry in 30 sec.",rc); // MQTTAsync_disconnect(client, &disc_opts); // MQTTAsync_destroy(&client); sleep(30); } } void onConnect(void* context, MQTTAsync_successData* response) { int topicCount=0; char *topicList[128]={0}; int topicQos[128]={1}; pxLog(INIT,"[MQTT] connect sucess"); topicList[topicCount++]=config.RemoteControlTopic; topicList[topicCount++]=config.AutoControlTopic; topicList[topicCount++]=config.ManualControlTopic; topicList[topicCount++]=RTKTOPIC; pxLog(DEBUG,"[MQTT] subscribe %d topics",topicCount); int rc=0; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; opts.onSuccess = onSubscribe; opts.onFailure = onSubscribeFailure; opts.context = client; while((rc=MQTTAsync_subscribeMany(client,topicCount,topicList,topicQos,&opts))!= MQTTASYNC_SUCCESS) { pxLog(ERROR,"[MQTT] subscribe failed%d,retry in 30 sec.",rc); // MQTTAsync_disconnect(client, &disc_opts); // MQTTAsync_destroy(&client); sleep(30); } } void onConnectFailure(void* context, MQTTAsync_failureData* response) { pxLog(ERROR,"[MQTT] connect failed:[%d]%s,retry in few sec.",response->code,response->message); } int MQTT_connect() { MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; int rc; pxLog(INIT,"[MQTT] ready to connect."); postData.status.mqtt=0; char *URI=NULL; asprintf(&URI,"mqtt://%s:%d",config.MqttBrockerAddress,config.MqttBrockerPort); if(!URI) { pxLog(ERROR,"[MQTT] MQTT malloc URI failed,exit."); return -1; } if((rc=MQTTAsync_create(&client, URI, config.MqttClientID, MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTASYNC_SUCCESS) { pxLog(ERROR,"[MQTT] MQTT create failed:%d,exit.",rc); return -1; } pxLog(INFO,"[MQTT] MQTT [%s]:%d connect %s.",config.MqttClientID,client,URI); free(URI); if((rc=MQTTAsync_setCallbacks(client, client, MQTT_connlost, MQTT_msgarrvd, NULL)) != MQTTASYNC_SUCCESS) { pxLog(ERROR,"[MQTT] MQTT set callbacks failed%d,exit.",rc); MQTTAsync_destroy(&client); return -1; } //设置连接成功回调函数:连接重新建立时会被调用,可以设置状态,继续通信等处理操作 if ((rc = MQTTAsync_setConnected(client, NULL, onConnectCallCBack)) != MQTTASYNC_SUCCESS) { pxLog(ERROR,"[MQTT]Failed to set callback, return code %d", rc); MQTTAsync_destroy(&client); return -1; } conn_opts.automaticReconnect = 1; conn_opts.minRetryInterval = 3; conn_opts.maxRetryInterval = 60; conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; // conn_opts.onSuccess = onConnect; conn_opts.onFailure = onConnectFailure; conn_opts.context = client; conn_opts.connectTimeout=5; conn_opts.username=config.MqttUserName; conn_opts.password=config.MqttPassWord; if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) { pxLog(ERROR,"[MQTT] MQTT try to connect failed%d.",rc); MQTTAsync_destroy(&client); //MQTT_clientInit=0; return -1; } return 0; } void mqttTimer(PXTIMER_S *timer) { rtkmqtttimeout++; if(rtkmqtttimeout>=3) { rtkmqtttimeout=3; pxLog(DEBUG,"[MQTT] RTK stat 0"); postData.status.rtkmqtt=0; } }