message_queue.cpp 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127
  #include <dirent.h>
  #include <fcntl.h>
  #include <linux/usbdevice_fs.h>
  #include <stdlib.h>
  #include <sys/ioctl.h>
  #include <sys/stat.h>
  #include <sys/statfs.h>
  #include <sys/vfs.h>
  #include <unistd.h>

  #include <atomic>
  #include <cstring>
  #include <ctime>
  #include <fstream>
  #include <iostream>
  #include <string>
  #include <vector>

  #include "../common/comm.h"
  #include "api.h"
  #include "../common/iobuffer.h"
  #include "../common/sensor_socket.h"
  #include "../common/peer_connection.h"
  #include "VideoRenderer.h"
  #include "../thirdparty/jsoncpp/include/json/json.h"
  #include "protocol.pb.h"
  #include "imu_sensor.h"
  #include "Rtk.h"
  #include "message_queue.h"
  #include "hmacsha256.h"
  26. CMessageQueue::CMessageQueue():Head(nullptr),Tail(nullptr)
  27. {
  28. _peerId=-1;
  29. btimeStopedCar = false;
  30. File_fd = NULL;
  31. _Version = 1;
  32. }
  33. CMessageQueue::~CMessageQueue()
  34. {
  35. }
  // Issue a USBDEVFS_RESET ioctl on the USB device node at `file`.
  //
  // file: path of the device node to open write-only; a null pointer is ignored.
  // Progress and errors are reported on stdout/stderr; nothing is returned.
  void renableUSB(const char* file)
  {
      if (file == nullptr) {
          return;
      }
      printf("Resetting USB device %s\n", file);
      const int fd = open(file, O_WRONLY);
      if (fd < 0) {
          // perror() takes the prefix to print and appends the errno text itself;
          // it does not fill a buffer for the caller.
          perror("Error opening output file");
          return;
      }
      if (ioctl(fd, USBDEVFS_RESET, 0) < 0) {
          perror("Error in ioctl");
      } else {
          printf("Reset successful\n");
      }
      // Close on both outcomes so a failed reset does not leak the descriptor.
      close(fd);
  }
  54. void CMessageQueue::Create()
  55. {
  56. Json::Value root;
  57. Json::Reader jsonReader;
  58. std::ifstream ifile("./config.json");
  59. std::string serverip;
  60. int32_t _hostPort;
  61. int32_t _remotePort;
  62. _curTick=0;
  63. bStopedCar=false;
  64. if(jsonReader.parse(ifile,root))
  65. {
  66. std::cout<<"enter config json"<<std::endl;
  67. _serial=root["serial"].asString();
  68. int32_t _Rtkport=root["rtk_port"].asInt();
  69. int32_t _Rtkhost=root["rtk_host"].asInt();
  70. std::string _Rtkip=root["rtk_ip"].asString();
  71. std::cout<<"Rtk ip:"<< _Rtkip <<std::endl;
  72. _Rtk = std::make_unique<SensorSocket<CRtkSensor>>(this, _Rtkip, _Rtkport, _Rtkhost);
  73. //_Rtk->Start();
  74. _Version = root["Version"].asInt();
  75. if(_Version)//AD10
  76. {
  77. std::string can_port_radar = root["can_bus_radar"].asString();
  78. _CanBusRadar = std::make_unique<SensorCanBus<CCanRadarSensor>>(this, can_port_radar);
  79. //_CanBusRadar->Start();
  80. }
  81. else
  82. {
  83. //_RadarIp
  84. int32_t _radarport=root["radar_port"].asInt();
  85. int32_t _radarhost=root["radar_host"].asInt();
  86. std::string _radarip=root["radar_ip"].asString();
  87. std::cout<<"radar ip:"<<_radarip<<std::endl;
  88. _RadarIp = std::make_unique<SensorSocket<CRadarSensor>>(this,_radarip,_radarport,_radarhost);
  89. //_RadarIp->Start();
  90. }
  91. if(_Version)//AD10
  92. {
  93. std::string can_port = root["can_bus_vehicle"].asString();
  94. _CanBusVehicle = std::make_unique<SensorCanBus<CCanBusSensor>>(this, can_port);
  95. //_CanBusVehicle->Start();
  96. }
  97. else
  98. {
  99. _PcanBusVehicle = std::make_unique<SensorPeakCan<CPcanSensor>>(this);
  100. //_PcanBusVehicle->Start();
  101. }
  102. _UdpMinPort=root["udp_min_port"].asInt();
  103. _UdpMaxPort=root["udp_max_port"].asInt();
  104. //20230417
  105. //const char* serverUrl = "tcp://localhost:1883"; //服务器地址
  106. //const char* userName = ""; //用户名
  107. //const char* password = ""; //密码
  108. std::string serverUrl = root["MqttserverUrl"].asString();
  109. std::string userName = root["Esn"].asString();
  110. std::string password = root["EsnPass"].asString();
  111. std::string clientId = ""; //客户端标识符
  112. bool _Hm256 = false;
  113. unsigned char Source[128];
  114. time_t rawtime;
  115. struct tm* info;
  116. char buffer[80];
  117. time(&rawtime);
  118. info = localtime(&rawtime);
  119. uint8_t secret[32] = { 0 };
  120. //如UTC 时间2018/7/24 17:56:20 则应表示为2018072417。
  121. /*sprintf((char*)secret, "%d%.2d%.2d%.2d", info->tm_year + 1900, info->tm_mon + 1, info->tm_mday,
  122. info->tm_hour);*/
  123. sprintf((char*)secret, "%d%.2d%.2d", info->tm_year + 1900, info->tm_mon + 1, info->tm_mday);
  124. memset(Source, 0, 128);
  125. sprintf((char*)Source, "%s_0_0_%s", userName.c_str(), secret);
  126. clientId.clear();
  127. clientId.append((const char*)Source);
  128. if (password.length() && _Hm256)
  129. {
  130. uint8_t outdata[128] = { 0 };
  131. uint8_t md[SHA256_DIGESTLEN] = { 0 };
  132. int len1 = strlen((char*)secret);
  133. int len2 = strlen((char*)password.c_str());
  134. HMAC_SHA256_CTX hmac;
  135. hmac_sha256_init(&hmac, secret, len1);
  136. hmac_sha256_update(&hmac, (const uint8_t*)password.c_str(), len2);
  137. hmac_sha256_final(&hmac, md);
  138. memcpy(outdata, md, SHA256_DIGESTLEN);
  139. password.clear();
  140. password.append((const char*)outdata);
  141. }
  142. _Mqtt_ZR = std::make_unique<SensorMQTT<CMqttSensor>>(this, serverUrl, userName, password, clientId);
  143. std::string DataServerUrl = root["DataServerUrl"].asString();
  144. std::string DatauserName = root["DataEsn"].asString();
  145. std::string Datapassword = root["DataEsnPass"].asString();
  146. std::string DataclientId = root["DataClientId"].asString();
  147. _Mqtt_SE = std::make_unique<SensorMQTT<CDataMqttSensor>>(this, DataServerUrl, DatauserName, Datapassword, DataclientId);
  148. //_Mqtt_SE->Start();
  149. //std::string _usb_1 = root["usb_3"].asString();
  150. //renableUSB("/dev/bus/usb/002/002");
  151. //renableUSB("/dev/bus/usb/001/003");
  152. //renableUSB(_usb_2.c_str());
  153. //renableUSB(_usb_1.c_str());
  154. serverip=root["ip"].asString();
  155. _hostPort=root["TcpHostPort"].asInt();
  156. _remotePort=root["TcpRemotePort"].asInt();
  157. _name=root["name"].asString();
  158. const Json::Value arrayObj=root["camerainfo"];
  159. int32_t count=arrayObj.size();
  160. for(int32_t i=0;i<count;i++)
  161. {
  162. LocalCameraInfo info;
  163. const Json::Value& value=arrayObj[i];
  164. info.index=value["index"].asInt();
  165. info.label=value["label"].asString();
  166. _cameraArray.push_back(info);
  167. }
  168. for(int i=0;i<_cameraArray.size();i++)
  169. {
  170. _peerArray.push_back({nullptr});
  171. }
  172. }
  173. else{
  174. std::string error=jsonReader.getFormattedErrorMessages();
  175. std::cout<<error<<std::endl;
  176. }
  177. _client=std::make_unique<SocketClient>(this);
  178. _client->Start(serverip.c_str(),_remotePort,_hostPort);
  179. std::this_thread::yield();
  180. }
  181. void CMessageQueue::EnQueue(CIOBuffer* pBuffer)
  182. {
  183. bool bNullBuffer=false;
  184. std::unique_lock <std::mutex> lck(_lock);
  185. if(Head==nullptr)
  186. {
  187. Head=Tail=pBuffer;
  188. bNullBuffer=true;
  189. }
  190. else{
  191. Tail->NextBuf=pBuffer;
  192. Tail=Tail->NextBuf;
  193. }
  194. pBuffer->NextBuf=nullptr;
  195. if(bNullBuffer)
  196. {
  197. //_cv.notify_one();
  198. //20231023
  199. _cv.notify_all();
  200. }
  201. }
  202. void CMessageQueue::Process()
  203. {
  204. CIOBuffer * ptr=nullptr;
  205. {
  206. std::unique_lock <std::mutex> lck(_lock);
  207. /*
  208. while(Head==nullptr)
  209. {
  210. _cv.wait(lck);
  211. }
  212. */
  213. while(Head==nullptr&&_cv.wait_for(lck,std::chrono::seconds(1))==std::cv_status::timeout)
  214. {
  215. CheckSignal();
  216. //std::cout<<".";
  217. //std::cout.flush();
  218. }
  219. }
  220. while(Head!=nullptr)
  221. {
  222. ptr=Head;
  223. Head=Head->NextBuf;
  224. if(ptr!=nullptr)
  225. {
  226. Message* message=reinterpret_cast<Message *>(ptr->Buffer);
  227. switch (message->cmd)
  228. {
  229. case MessageType::ReqVideo:
  230. OnNotifyReq((int32_t)message->param_l);
  231. break;
  232. case MessageType::RepVideo:
  233. OnNotifyRep((int32_t)message->param_l);
  234. break;
  235. case MessageType::Connected:
  236. OnNotifyConnected((bool)message->param_l);
  237. break;
  238. case MessageType::Leave:
  239. OnNotifyLeave();
  240. break;
  241. case MessageType::AsyncMessage:
  242. OnNotifyMessage();
  243. break;
  244. case MessageType::StopSensor:
  245. OnNotifyStopSensor();
  246. break;
  247. case MessageType::Ping:
  248. OnNotifyPing(message->param_l);
  249. break;
  250. }
  251. ptr->Release();
  252. }
  253. }
  254. }
  255. void CMessageQueue::SetTick(long long tick)
  256. {
  257. _curTick=tick;
  258. }
  259. void CMessageQueue::OnNotifyConnected(bool bRet)
  260. {
  261. if(bRet)
  262. {
  263. _client->WriteAddRobot(_serial,_name,static_cast<int32_t>(EgoType::Car));
  264. _updatethread.start(_client.get());
  265. //cs->Analog(0,0,0,0,0);
  266. }
  267. else
  268. {
  269. if(_peerId!=-1)
  270. {
  271. OnVideoLeave(_peerId,EgoType::User);
  272. _peerId=-1;
  273. }
  274. _updatethread.stop();
  275. }
  276. }
  277. // int day_diffient(int year_start, int month_start, int day_start,char *Dst)
  278. // {
  279. // //2024-03-11
  280. // int year_end = strtol(Dst,NULL,10);
  281. // char Temp = '-',*p = NULL,*p1 = NULL;
  282. // p = strchr(Dst,Temp);
  283. // int month_end = strtol(p + 1,NULL,10);
  284. // p1 = strchr(p + 1,Temp);
  285. // int day_end = strtol(p1 + 1,NULL,10);
  286. // int y2, m2, d2;
  287. // int y1, m1, d1;
  288. // m1 = (month_start + 9) % 12;
  289. // y1 = year_start - m1/10;
  290. // d1 = 365*y1 + y1/4 - y1/100 + y1/400 + (m1*306 + 5)/10 + (day_start - 1);
  291. // m2 = (month_end + 9) % 12;
  292. // y2 = year_end - m2/10;
  293. // d2 = 365*y2 + y2/4 - y2/100 + y2/400 + (m2*306 + 5)/10 + (day_end - 1);
  294. // return (d1 - d2);
  295. // }
  296. // void delete_days(const char *path)
  297. // {
  298. // DIR *dir;
  299. // struct dirent *entry;
  300. // struct stat statbuf;
  301. // time_t rawtime;
  302. // struct tm* info;
  303. // time(&rawtime);
  304. // info = localtime(&rawtime);
  305. // uint8_t secret[64] = { 0 };
  306. // sprintf((char*)secret, "%d-%.2d-%.2d.log", info->tm_year + 1900, info->tm_mon + 1, info->tm_mday);
  307. // printf("%s\r\n",secret);
  308. // dir = opendir(path);
  309. // if (dir == NULL)
  310. // {
  311. // printf("无法打开目录\n");
  312. // return;
  313. // }
  314. // while ((entry = readdir(dir)) != NULL)
  315. // {
  316. // if(entry->d_name[0]=='.')
  317. // continue;
  318. // //printf("%s\n",entry->d_name);
  319. // if(day_diffient(info->tm_year + 1900, info->tm_mon + 1, info->tm_mday,entry->d_name) > 15)
  320. // {
  321. // char Dst[128];
  322. // memset(Dst,0,128);
  323. // sprintf(Dst,"rm -rf /home/nvidia/ZJ_PRO/EgoSystem/build/log/%s",entry->d_name);
  324. // system(Dst);
  325. // }
  326. // }
  327. // closedir(dir);
  328. // }
  329. // void CMessageQueue::SerichFile(char *filename)
  330. // {
  331. // DIR *directory_pointer;
  332. // struct dirent *entry;
  333. // int exist = 0;
  334. // char Dst[128];
  335. // memset(Dst,0,128);
  336. // sprintf(Dst,"/home/nvidia/devdata/zhanjiang_0534/EgoSystem/build/log/%s",filename);
  337. // if((directory_pointer=opendir("/home/nvidia/devdata/zhanjiang_0534/EgoSystem/build/log/")) == NULL)
  338. // printf("Error open\n");
  339. // else
  340. // {
  341. // while((entry=readdir(directory_pointer)) != NULL)
  342. // {
  343. // if(entry->d_name[0]=='.') continue;
  344. // printf("%s\n",entry->d_name);
  345. // if(!strcmp(entry->d_name,filename))
  346. // {
  347. // File_fd = fopen(Dst, "a");
  348. // exist = 1;
  349. // break;
  350. // }
  351. // }
  352. // }
  353. // if(!exist)
  354. // File_fd = fopen(Dst, "w+");
  355. // closedir(directory_pointer);
  356. // }
  357. void CMessageQueue::OnNotifyReq(int32_t index)
  358. {
  359. // std::cout<<__FUNCTION__<<","<<__LINE__<<std::endl;
  360. if(_peerId==-1) return;
  361. if(index==0)
  362. {
  363. mrsWebrtcCreateFactory(true);
  364. _curTick=0;
  365. bStopedCar=false;
  366. _Rtk->Start();
  367. if(_Version)//AD10
  368. {
  369. _CanBusVehicle->Start();
  370. _CanBusRadar->Start();
  371. }
  372. else
  373. {
  374. _PcanBusVehicle->Start();
  375. _RadarIp->Start();
  376. }
  377. _Mqtt_ZR->Start();
  378. _Mqtt_SE->Start();
  379. // File_fd = NULL;
  380. // struct statfs diskInfo;
  381. // statfs("/dev/mmcblk0p1", &diskInfo);
  382. // unsigned long long freeDisk = diskInfo.f_bfree * diskInfo.f_bsize;
  383. // time_t rawtime;
  384. // struct tm* info;
  385. // time(&rawtime);
  386. // info = localtime(&rawtime);
  387. // uint8_t secret[64] = { 0 };
  388. // sprintf((char*)secret, "%d-%.2d-%.2d.log", info->tm_year + 1900, info->tm_mon + 1, info->tm_mday);
  389. // if((freeDisk >> 30) < 15)
  390. // {
  391. // system("rm -rf /home/nvidia/devdata/zhanjiang_0534/EgoSystem/build/log");
  392. // system("mkdir /home/nvidia/devdata/zhanjiang_0534/EgoSystem/build/log/");
  393. // }
  394. // else
  395. // delete_days("/home/nvidia/devdata/zhanjiang_0534/EgoSystem/build/log");
  396. // SerichFile((char *)secret);
  397. }
  398. _peerArray[index]=std::make_unique<CPeerConnection>(static_cast<ChannelType>(index),_client.get());
  399. _client->WriteVideoRep(_peerId, RemoNet::VideoDesc::OK, index);
  400. }
  401. void CMessageQueue::OnNotifyRep(int32_t index)
  402. {
  403. _peerArray[index]=std::make_unique<CPeerConnection>(static_cast<ChannelType>(index), _client.get());
  404. InitPeerConnection(_peerId,index);
  405. _peerArray[index]->CreateOffer();
  406. }
  407. void CMessageQueue::InitPeerConnection(int32_t peer,int32_t index)
  408. {
  409. _peerArray[index]->Initialize(peer,index,_UdpMinPort,_UdpMaxPort);
  410. _peerArray[index]->AddDataChannel(true, false);
  411. _peerArray[index]->AddLocalVideoTrack(static_cast<RenderPosition>(index),_cameraArray[index].index);
  412. if(index==RenderPosition::BACK)
  413. {
  414. void * front=_peerArray[RenderPosition::FRONT]->GetCurrentCtx();
  415. while(front==nullptr)
  416. {
  417. std::cout<<"front==nullptr"<<std::endl;
  418. std::this_thread::sleep_for(std::chrono::microseconds(50));
  419. front=_peerArray[RenderPosition::FRONT]->GetCurrentCtx();
  420. }
  421. void * back=_peerArray[RenderPosition::BACK]->GetCurrentCtx();
  422. while (back==nullptr)
  423. {
  424. std::cout<<"back==nullptr"<<std::endl;
  425. std::this_thread::sleep_for(std::chrono::microseconds(50));
  426. back=_peerArray[RenderPosition::BACK]->GetCurrentCtx();
  427. }
  428. _peerArray[RenderPosition::FRONT]->SetOtherCtx(back);
  429. _peerArray[RenderPosition::BACK]->SetOtherCtx(front);
  430. }
  431. /*
  432. if((index+1)==RenderPosition::ALL)
  433. {
  434. void * front=_peerArray[RenderPosition::FRONT]->GetCurrentCtx();
  435. while(front==nullptr)
  436. {
  437. std::cout<<"front==nullptr"<<std::endl;
  438. std::this_thread::sleep_for(std::chrono::microseconds(50));
  439. front=_peerArray[RenderPosition::FRONT]->GetCurrentCtx();
  440. }
  441. void * back=_peerArray[RenderPosition::BACK]->GetCurrentCtx();
  442. while (back==nullptr)
  443. {
  444. std::cout<<"back==nullptr"<<std::endl;
  445. std::this_thread::sleep_for(std::chrono::microseconds(50));
  446. back=_peerArray[RenderPosition::BACK]->GetCurrentCtx();
  447. }
  448. _peerArray[RenderPosition::FRONT]->SetOtherCtx(back);
  449. _peerArray[RenderPosition::BACK]->SetOtherCtx(front);
  450. void * left=_peerArray[RenderPosition::LEFT]->GetCurrentCtx();
  451. while(left==nullptr)
  452. {
  453. std::cout<<"left==nullptr"<<std::endl;
  454. std::this_thread::sleep_for(std::chrono::microseconds(50));
  455. front=_peerArray[RenderPosition::LEFT]->GetCurrentCtx();
  456. }
  457. void * right=_peerArray[RenderPosition::RIGHT]->GetCurrentCtx();
  458. while (right==nullptr)
  459. {
  460. std::cout<<"right==nullptr"<<std::endl;
  461. std::this_thread::sleep_for(std::chrono::microseconds(50));
  462. back=_peerArray[RenderPosition::RIGHT]->GetCurrentCtx();
  463. }
  464. _peerArray[RenderPosition::LEFT]->SetOtherCtx(right);
  465. _peerArray[RenderPosition::RIGHT]->SetOtherCtx(left);
  466. }
  467. */
  468. /*
  469. if((index+1)==RenderPosition::ALL)
  470. {
  471. void * front=_peerArray[RenderPosition::FRONT]->GetCurrentCtx();
  472. void * back=_peerArray[RenderPosition::BACK]->GetCurrentCtx();
  473. void * left=_peerArray[RenderPosition::LEFT]->GetCurrentCtx();
  474. void * right=_peerArray[RenderPosition::RIGHT]->GetCurrentCtx();
  475. void * dash=_peerArray[RenderPosition::DASHBOARD]->GetCurrentCtx();
  476. _peerArray[RenderPosition::FRONT]->SetOtherCtx(back);
  477. _peerArray[RenderPosition::BACK]->SetOtherCtx(front);
  478. _peerArray[RenderPosition::LEFT]->SetOtherCtx(left);
  479. _peerArray[RenderPosition::RIGHT]->SetOtherCtx(right);
  480. _peerArray[RenderPosition::DASHBOARD]->SetOtherCtx(dash);
  481. }
  482. */
  483. if(index==RenderPosition::FRONT)
  484. _peerArray[index]->AddLocalAudioTrack();
  485. }
  486. void CMessageQueue::OnAdd(bool bRet)
  487. {
  488. }
  489. void CMessageQueue::OnConnected(bool bRet)
  490. {
  491. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  492. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  493. message->cmd=MessageType::Connected;
  494. message->param_l=bRet;
  495. EnQueue(pBuffer);
  496. }
  497. void CMessageQueue::OnVideoLeave(int32_t peer,EgoType type)
  498. {
  499. {
  500. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  501. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  502. message->cmd=MessageType::StopSensor;
  503. EnQueue(pBuffer);
  504. }
  505. {
  506. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  507. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  508. message->cmd=MessageType::Leave;
  509. EnQueue(pBuffer);
  510. }
  511. }
  512. #ifdef WIN32
  513. void CMessageQueue::OnVideoRep(int32_t index,RemoNet::VideoDesc desc)
  514. {
  515. if (desc == RemoNet::VideoDesc::OK)
  516. {
  517. assert(_peerId!=-1);
  518. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  519. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  520. message->cmd=MessageType::ReqVideo;
  521. message->param_l=index;
  522. EnQueue(pBuffer);
  523. }
  524. }
  525. #else
  526. void CMessageQueue::OnVideoReq(int32_t video,int32_t peer)
  527. {
  528. // std::cout<<__FUNCTION__<<","<<__LINE__<<std::endl;
  529. _peerId=peer;
  530. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  531. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  532. message->cmd=MessageType::ReqVideo;
  533. message->param_l=video;
  534. EnQueue(pBuffer);
  535. }
  536. #endif
  537. void CMessageQueue::OnNotifyLeave()
  538. {
  539. std::this_thread::sleep_for(std::chrono::milliseconds(100));
  540. for (size_t i = 0; i < _peerArray.size(); i++)
  541. {
  542. if(_peerArray[i]!=nullptr)
  543. {
  544. _peerArray[i]->Close();
  545. _peerArray[i].reset();
  546. }
  547. /* code */
  548. }
  549. _peerId=-1;
  550. }
  551. void CMessageQueue::OnNotifyStopSensor()
  552. {
  553. _curTick=0;
  554. if(_Version)//AD10
  555. {
  556. std::cout<<"_CanBusVehicle Stop"<<std::endl;
  557. _CanBusVehicle->Stop();
  558. std::cout<<"_CanBusRadar Stop"<<std::endl;
  559. _CanBusRadar->Stop();
  560. }
  561. else
  562. {
  563. std::cout<<"_PcanBusVehicle Stop"<<std::endl;
  564. _PcanBusVehicle->Stop();
  565. std::cout<<"_RadarIp Stop"<<std::endl;
  566. _RadarIp->Stop();
  567. }
  568. std::cout << "RTK Stop" << std::endl;
  569. _Rtk->Stop();
  570. std::cout<<"mqtt Stop"<<std::endl;
  571. _Mqtt_ZR->Stop();
  572. std::cout<<"data mqtt Stop"<<std::endl;
  573. _Mqtt_SE->Stop();
  574. // if(!File_fd)
  575. // {
  576. // fclose(File_fd);
  577. // File_fd = NULL;
  578. // }
  579. RemoNet::StopAck Rep;
  580. CIOBuffer Buffer;
  581. MessageHead Head;
  582. Head.Command = RemoNet::CC_StopACK;
  583. Head.Length = Rep.ByteSizeLong();
  584. Head.Serialize(Buffer.Buffer);
  585. auto ptr = Buffer.Buffer + MessageHead::Size();
  586. Rep.SerializeToArray(ptr, Head.Length);
  587. Buffer.Length = Head.Length + MessageHead::Size();
  588. _peerArray[RenderPosition::FRONT]->SendData(Buffer);
  589. }
  590. void CMessageQueue::OnVideoOffer(int32_t index,const char* type, const char* sdp)
  591. {
  592. // std::cout<<__FUNCTION__<<","<<__LINE__<<std::endl;
  593. InitPeerConnection(_peerId,index);
  594. _peerArray[index]->SetRemoteDescription(type,sdp);
  595. _peerArray[index]->CreateAnswer();
  596. }
  597. void CMessageQueue::OnVideoAnswer(int32_t index, const char* type, const char* sdp)
  598. {
  599. // std::cout<<__FUNCTION__<<","<<__LINE__<<std::endl;
  600. _peerArray[index]->SetRemoteDescription(type,sdp);
  601. }
  602. void CMessageQueue::OnVideoCandidate(int32_t index,const char* candidate,
  603. int32_t sdp_mline_index,
  604. const char* sdp_mid)
  605. {
  606. _peerArray[index]->AddIceCandidate(candidate,sdp_mline_index,sdp_mid);
  607. }
  608. /*
  609. void CMessageQueue::SwitchCamera(bool front)
  610. {
  611. _peerArray[RenderPosition::FRONT_BACK]->SwitchCapture(front);
  612. }
  613. */
  614. void CMessageQueue::OnMqttUser(std::string user_uid,int32_t cockpitId,int32_t vehicleId){
  615. user_uuid = user_uid;
  616. cockpit_id = cockpitId;
  617. vehicle_id = vehicleId ;
  618. }
  619. void CMessageQueue::OnMessageFrameNotify(ChannelType type,int16_t cmd,int16_t length,const void * data)
  620. {
  621. // std::cout<<"cmd:" <<std::hex<<cmd<<std::endl;
  622. if(cmd==RemoNet::CC_Text)
  623. {
  624. RemoNet::TestTextReq Req;
  625. Req.ParseFromArray(data,length);
  626. std::cout<<Req.text()<<std::endl;
  627. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  628. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  629. message->cmd=MessageType::AsyncMessage;
  630. EnQueue(pBuffer);
  631. }
  632. else if(cmd==RemoNet::CC_Switch)
  633. {
  634. RemoNet::CCSwitch Req;
  635. Req.ParseFromArray(data,length);
  636. bool front=Req.front();
  637. _peerArray[RenderPosition::FRONT]->SwitchCapture(front);
  638. }
  639. else if(cmd==RemoNet::CC_Ping)
  640. {
  641. RemoNet::CCPing Req;
  642. Req.ParseFromArray(data,length);
  643. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  644. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  645. message->cmd=MessageType::Ping;
  646. message->param_l=Req.tick();
  647. EnQueue(pBuffer);
  648. }
  649. else if(cmd==RemoNet::CC_SensorStop)
  650. {
  651. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  652. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  653. message->cmd=MessageType::StopSensor;
  654. EnQueue(pBuffer);
  655. }
  656. else if (cmd == RemoNet::CC_CANMSG)
  657. {
  658. _source = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  659. _curTick = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  660. RemoNet::CCCanMsg Req;
  661. Req.ParseFromArray(data, length);
  662. cannet_frame* frames = (cannet_frame*)alloca(sizeof(cannet_frame) * Req.frams_size());
  663. for (int32_t i = 0; i < Req.frams_size(); i++)
  664. {
  665. auto& frame = Req.frams(i);
  666. frames[i].canid = frame.canid();
  667. frames[i].dlc = frame.dlc();
  668. memcpy(frames[i].data, frame.data().data(), frame.dlc());
  669. }
  670. //_robot->Get()->OnMessage(frames, Req.frams_size());
  671. if(_Version)//AD10
  672. _CanBusVehicle->Get()->OnMessage(frames, Req.frams_size());
  673. else
  674. _PcanBusVehicle->Get()->OnMessage(frames, Req.frams_size());
  675. }
  676. }
  677. void CMessageQueue::StopCar()
  678. {
  679. std::cout<<"Stop Car"<<std::endl;
  680. if(_Version)//AD10
  681. _CanBusVehicle->Get()->Emergency();
  682. else
  683. _PcanBusVehicle->Get()->Emergency();
  684. }
  685. void CMessageQueue::OnNotifyMessage()
  686. {
  687. RemoNet::TestTextReq Req;
  688. Req.set_text("ewqewqewqe");
  689. CIOBuffer Buffer;
  690. MessageHead Head;
  691. Head.Command = RemoNet::CC_Text;
  692. Head.Length = Req.ByteSizeLong();
  693. Head.Serialize(Buffer.Buffer);
  694. auto ptr = Buffer.Buffer + MessageHead::Size();
  695. Req.SerializeToArray(ptr, Head.Length);
  696. Buffer.Length = Head.Length + MessageHead::Size();
  697. _peerArray[RenderPosition::FRONT]->SendData(Buffer);
  698. }
  699. void CMessageQueue::OnNotifyPing(int64_t value)
  700. {
  701. RemoNet::CCPing Rep;
  702. Rep.set_tick(value);
  703. CIOBuffer Buffer;
  704. MessageHead Head;
  705. Head.Command = RemoNet::CC_Ping;
  706. Head.Length = Rep.ByteSizeLong();
  707. Head.Serialize(Buffer.Buffer);
  708. auto ptr = Buffer.Buffer + MessageHead::Size();
  709. Rep.SerializeToArray(ptr, Head.Length);
  710. Buffer.Length = Head.Length + MessageHead::Size();
  711. //std::cout << "ping" << std::endl;
  712. if( _peerArray[RenderPosition::FRONT]!=nullptr && _peerArray[RenderPosition::FRONT]->bReadyChannel)
  713. _peerArray[RenderPosition::FRONT]->SendData(Buffer);
  714. }
  715. /*
  716. void CMessageQueue::StartCar()
  717. {
  718. _can->SetStartWrite(true);
  719. }
  720. */
  721. void CMessageQueue::CheckSignal()
  722. {
  723. if(!bStopedCar)
  724. {
  725. long long tick=std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  726. if(_curTick!=0&&tick-_curTick > 3)
  727. {
  728. StopCar();
  729. bStopedCar=true;
  730. std::cout<<"_curTick!=0&&tick-_curTick > 3" << std::endl;
  731. _curStopTick = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  732. btimeStopedCar = true;
  733. _curTick = 0;
  734. }
  735. }
  736. }
  737. void CMessageQueue::WriteIMUData(ImuData* data)
  738. {
  739. MessageHead Head;
  740. CIOBuffer Buffer;
  741. RemoNet::IMuMessage Req;
  742. Req.set_rx(data->ry);
  743. Req.set_ry(data->rx);
  744. // Req.set_rz(data->rz);
  745. Head.Command=RemoNet::CC_IMU;
  746. Head.Length=Req.ByteSizeLong();
  747. Head.Serialize(Buffer.Buffer);
  748. auto ptr = Buffer.Buffer + MessageHead::Size();
  749. Req.SerializeToArray(ptr, Head.Length);
  750. Buffer.Length = Head.Length + MessageHead::Size();
  751. if( _peerArray[ChannelType::CHANNEL_IMU]!=nullptr)
  752. _peerArray[ChannelType::CHANNEL_IMU]->SendData(Buffer);
  753. }
  754. void CMessageQueue::WritePacket(ChannelType type, CIOBuffer & pBuffer)
  755. {
  756. if( _peerArray[type]!=nullptr)
  757. _peerArray[type]->SendData(pBuffer);
  758. }
  759. void CMessageQueue::WriteRadarData(RadarData& data)
  760. {
  761. MessageHead Head;
  762. CIOBuffer Buffer;
  763. RemoNet::CCRadarMessage Req;
  764. Req.set_radar0(data.r0);
  765. Req.set_radar1(data.r1);
  766. Req.set_radar2(data.r2);
  767. Req.set_radar3(data.r3);
  768. Req.set_radar4(data.r4);
  769. Req.set_radar5(data.r5);
  770. Req.set_radar6(data.r6);
  771. Req.set_radar7(data.r7);
  772. //Head.Command=RemoNet::CC_IMU;
  773. Head.Command = RemoNet::CC_Radar;
  774. Head.Length=Req.ByteSizeLong();
  775. Head.Serialize(Buffer.Buffer);
  776. auto ptr = Buffer.Buffer + MessageHead::Size();
  777. Req.SerializeToArray(ptr, Head.Length);
  778. Buffer.Length = Head.Length + MessageHead::Size();
  779. if( _peerArray[ChannelType::CHANNEL_RADAR]!=nullptr)
  780. _peerArray[ChannelType::CHANNEL_RADAR]->SendData(Buffer);
  781. }
  782. void CMessageQueue::WriteRobotStatus(int32_t ,int32_t )
  783. {
  784. }
  785. #ifdef LIDAR_SENSOR
  786. void CMessageQueue::WriteLidarPoint(const PointCloudMsg<PointXYZI>& msg,ChannelType side)
  787. {
  788. RemoNet::LidarPoint pt;
  789. pt.set_is_left(side==ChannelType::CHANNEL_LEFT_LIDAR);
  790. pt.set_frame_id(msg.frame_id);
  791. pt.set_height(msg.height);
  792. pt.set_width(msg.width);
  793. pt.set_is_dense(msg.is_dense);
  794. pt.set_seq(msg.seq);
  795. pt.set_timestamp(msg.timestamp);
  796. for(int i=0;i<msg.point_cloud_ptr->size();i++)
  797. {
  798. pt.add_data((*msg.point_cloud_ptr)[i].x);
  799. pt.add_data((*msg.point_cloud_ptr)[i].y);
  800. pt.add_data((*msg.point_cloud_ptr)[i].z);
  801. pt.add_data((*msg.point_cloud_ptr)[i].intensity);
  802. }
  803. MessageHead Head;
  804. CIOBuffer Buffer;
  805. Head.Command=RemoNet::CC_LIDARDATA;
  806. Head.Length=pt.ByteSizeLong();
  807. Head.Serialize(Buffer.Buffer);
  808. auto ptr = Buffer.Buffer + MessageHead::Size();
  809. pt.SerializeToArray(ptr, Head.Length);
  810. Buffer.Length = Head.Length + MessageHead::Size();
  811. if( _peerArray[side]!=nullptr)
  812. _peerArray[side]->SendData(&Buffer);
  813. }
  814. #endif
  815. void CMessageQueue::SwitchCamera(bool front)
  816. {
  817. _peerArray[RenderPosition::FRONT]->SwitchCapture(front);
  818. _peerArray[RenderPosition::BACK]->SwitchCapture(front);
  819. }
  820. void CMessageQueue::SendZGJStatus(int status)
  821. {
  822. _Rtk->Get()->Send_ZGJ_status(status);
  823. }
  824. void CMessageQueue::SendVehicleStatus(vehicle_control_cv control_can)
  825. {
  826. //CDataMqttSensor::sendMessage()
  827. if (CDataMqttSensor::_run)
  828. {
  829. struct timeval tv;
  830. gettimeofday(&tv, NULL);
  831. char m_Send[16],Dst[16];
  832. memset(Dst, 0, 16);
  833. sprintf((char*)Dst, "%ld", tv.tv_sec * 1000 + tv.tv_usec / 1000);
  834. Json::Value root;
  835. Json::Value Source;
  836. Json::FastWriter writer;
  837. std::string SendTime;
  838. Source["userID"] = user_uuid;
  839. Source["cockpitID"] = cockpit_id;
  840. Source["VehicleID"] = vehicle_id;
  841. Source["timestamp"] = SendTime.assign(Dst,strlen(Dst));
  842. //车辆基本状态控制
  843. Json::Value baseControl;
  844. baseControl["keyStatus"] = control_can.keyStatus;
  845. baseControl["parkControl"] = control_can.parkControl;
  846. baseControl["travelMode"] = control_can.travelMode;
  847. baseControl["eStop"] = control_can.eStop;
  848. baseControl["directSwitch"] = control_can.directSwitch;
  849. baseControl["gearCaontrol"] = control_can.gearControl;
  850. baseControl["hazardLight"] = control_can.hazardLight;
  851. baseControl["travelLight"] = control_can.travelLight;
  852. baseControl["vehicleHorn"] = control_can.vehicleHorn;
  853. baseControl["silencedAlarm"] = control_can.silencedAlarm;
  854. Source["baseControl"].append(baseControl);
  855. //车辆行驶控制
  856. Json::Value driveControl;
  857. Json::Value accPedal;
  858. accPedal["accPedalH"] = control_can.accPedalH;
  859. accPedal["accPedalF"] = control_can.accPedalF;
  860. driveControl["accPedal"].append(accPedal);
  861. driveControl["brakePedal"] = control_can.brakePedal;
  862. driveControl["steeringWheel"] = control_can.steeringWheel;
  863. driveControl["turnSignal"] = control_can.turnSignal;
  864. driveControl["turnMode"] = control_can.turnMode;
  865. Source["driveControl"].append(driveControl);
  866. //作业机构控制
  867. Json::Value taskControl;
  868. taskControl["enableHydraulic"] = control_can.enableHydraulic;
  869. taskControl["toolControl"] = control_can.toolControl;
  870. taskControl["workLight"] = control_can.workLight;
  871. taskControl["bypassSwitch"] = control_can.bypassSwitch;
  872. taskControl["taskJoint_1"] = control_can.taskJoint_1;
  873. taskControl["taskJoint_2"] = control_can.taskJoint_2;
  874. taskControl["taskJoint_3"] = control_can.taskJoint_3;
  875. taskControl["endJoint"] = control_can.endJoint;
  876. taskControl["baseLegSwitch"] = control_can.baseLegSwitch;
  877. taskControl["baseLegControl"] = control_can.baseLegControl;
  878. taskControl["cabLift"] = control_can.cabLift;
  879. taskControl["esCabLift"] = control_can.esCabLift;
  880. taskControl["suckerSelect"] = control_can.suckerSelect;
  881. taskControl["cooperationSignal"] = control_can.coopSignal;
  882. Source["taskControl"].append(taskControl);
  883. //舱端故障报警
  884. Json::Value errCode;
  885. errCode["errBasOperation"] = control_can.errBasOperation;
  886. errCode["errAccPedal"] = control_can.errAccPedal;
  887. errCode["errBrakePedal"] = control_can.errBrakePedal;
  888. errCode["errSteeringWheel"] = control_can.errSteeringWheel;
  889. errCode["errHandle"] = control_can.errHandle;
  890. errCode["errEndTool"] = control_can.errEndTool;
  891. errCode["errOther"] = control_can.errOther;
  892. Source["errCode"].append(errCode);
  893. Json::StyledWriter sw;
  894. std::cout << sw.write(Source) << std::endl << std::endl;
  895. //控制发送频率
  896. // if (!DataMqtt_SendDate)
  897. // {
  898. // DataMqtt_curTick = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  899. // DataMqtt_SendDate = true;
  900. // }
  901. // else
  902. // {
  903. // long long tick = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  904. // if (DataMqtt_curTick != 0 && tick - DataMqtt_curTick > 10000)
  905. // {
  906. // if (CDataMqttSensor::_run && (CDataMqttSensor::m_mqttClient != NULL))
  907. // CDataMqttSensor::sendMessage((char*)sw.write(Source).c_str(), 0, (char*)"Vehicle/ControlVehicle/Veh001");
  908. // DataMqtt_SendDate = false;
  909. // }
  910. // }
  911. CDataMqttSensor::sendMessage((char*)sw.write(Source).c_str(), 0, (char*)"Vehicle/ControlVehicle/Veh001");
  912. }
  913. }
  914. void CMessageQueue::SendStatusToServer(int32_t travelSpeed, int fGear, int bGear, int gearStatus) {
  915. struct timeval tv;
  916. gettimeofday(&tv, NULL);
  917. char m_Send[16],Dst[16];
  918. memset(Dst, 0, 16);
  919. sprintf((char*)Dst, "%ld", tv.tv_sec * 1000 + tv.tv_usec / 1000);
  920. Json::Value root;
  921. Json::Value SoucreBus;
  922. Json::FastWriter writer;
  923. std::string SendTime;
  924. SoucreBus["userID"] = user_uuid;
  925. SoucreBus["cockpitID"] = cockpit_id;
  926. SoucreBus["VehicleID"] = vehicle_id;
  927. SoucreBus["travelSpeed"] = travelSpeed;
  928. SoucreBus["fGear"] = fGear;
  929. SoucreBus["bGear"] = bGear;
  930. SoucreBus["timestamp"] = SendTime.assign(Dst,strlen(Dst));
  931. SoucreBus["gearStatus"] =gearStatus;
  932. Json::StyledWriter sw;
  933. std::cout << sw.write(SoucreBus) << std::endl << std::endl;
  934. CDataMqttSensor::sendMessage((char*)sw.write(SoucreBus).c_str(), 0, (char*)"Vehicle/CanBus/State/Veh001");
  935. }