message_queue.cpp 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085
  1. #include <atomic>
  2. #include <string>
  3. #include "../common/comm.h"
  4. #include "api.h"
  5. #include "../common/iobuffer.h"
  6. #include "../common/sensor_socket.h"
  7. #include "../common/peer_connection.h"
  8. #include "VideoRenderer.h"
  9. #include "../thirdparty/jsoncpp/include/json/json.h"
  10. #include "protocol.pb.h"
  11. #include "imu_sensor.h"
  12. #include "Rtk.h"
  13. #include "message_queue.h"
  14. #include <fstream>
  15. #include <iostream>
  16. #include <cstring>
  17. #include "hmacsha256.h"
  18. #include <linux/usbdevice_fs.h>
  19. #include <sys/ioctl.h>
  20. #include <sys/statfs.h>
  21. #include <stdlib.h>
  22. #include <unistd.h>
  23. #include <sys/vfs.h>
  24. #include <dirent.h>
  25. #include <sys/stat.h>
  26. CMessageQueue::CMessageQueue():Head(nullptr),Tail(nullptr)
  27. {
  28. _peerId=-1;
  29. btimeStopedCar = false;
  30. File_fd = NULL;
  31. _Version = 1;
  32. }
  33. CMessageQueue::~CMessageQueue()
  34. {
  35. }
  36. void renableUSB(const char* file)
  37. {
  38. printf("Resetting USB device %s\n", file);
  39. int fd = open(file, O_WRONLY);
  40. if (fd < 0) {
  41. char text[256];
  42. perror(text);
  43. printf("Error opening output file %s", text);
  44. return;
  45. }
  46. int rc = ioctl(fd, USBDEVFS_RESET, 0);
  47. if (rc < 0) {
  48. perror("Error in ioctl");
  49. return;
  50. }
  51. printf("Reset successful\n");
  52. close(fd);
  53. }
  54. void CMessageQueue::Create()
  55. {
  56. Json::Value root;
  57. Json::Reader jsonReader;
  58. std::ifstream ifile("./config.json");
  59. std::string serverip;
  60. int32_t _hostPort;
  61. int32_t _remotePort;
  62. _curTick=0;
  63. bStopedCar=false;
  64. if(jsonReader.parse(ifile,root))
  65. {
  66. std::cout<<"enter config json"<<std::endl;
  67. _serial=root["serial"].asString();
  68. int32_t _Rtkport=root["rtk_port"].asInt();
  69. int32_t _Rtkhost=root["rtk_host"].asInt();
  70. std::string _Rtkip=root["rtk_ip"].asString();
  71. std::cout<<"Rtk ip:"<< _Rtkip <<std::endl;
  72. _Rtk = std::make_unique<SensorSocket<CRtkSensor>>(this, _Rtkip, _Rtkport, _Rtkhost);
  73. //_Rtk->Start();
  74. _Version = root["Version"].asInt();
  75. if(_Version)//AD10
  76. {
  77. std::string can_port_radar = root["can_bus_radar"].asString();
  78. _CanBusRadar = std::make_unique<SensorCanBus<CCanRadarSensor>>(this, can_port_radar);
  79. //_CanBusRadar->Start();
  80. }
  81. else
  82. {
  83. //_RadarIp
  84. int32_t _radarport=root["radar_port"].asInt();
  85. int32_t _radarhost=root["radar_host"].asInt();
  86. std::string _radarip=root["radar_ip"].asString();
  87. std::cout<<"radar ip:"<<_radarip<<std::endl;
  88. _RadarIp = std::make_unique<SensorSocket<CRadarSensor>>(this,_radarip,_radarport,_radarhost);
  89. //_RadarIp->Start();
  90. }
  91. if(_Version)//AD10
  92. {
  93. std::string can_port = root["can_bus_vehicle"].asString();
  94. _CanBusVehicle = std::make_unique<SensorCanBus<CCanBusSensor>>(this, can_port);
  95. //_CanBusVehicle->Start();
  96. }
  97. else
  98. {
  99. _PcanBusVehicle = std::make_unique<SensorPeakCan<CPcanSensor>>(this);
  100. //_PcanBusVehicle->Start();
  101. }
  102. _UdpMinPort=root["udp_min_port"].asInt();
  103. _UdpMaxPort=root["udp_max_port"].asInt();
  104. //20230417
  105. //const char* serverUrl = "tcp://localhost:1883"; //服务器地址
  106. //const char* userName = ""; //用户名
  107. //const char* password = ""; //密码
  108. std::string serverUrl = root["MqttserverUrl"].asString();
  109. std::string userName = root["Esn"].asString();
  110. std::string password = root["EsnPass"].asString();
  111. std::string clientId = ""; //客户端标识符
  112. bool _Hm256 = false;
  113. unsigned char Source[128];
  114. time_t rawtime;
  115. struct tm* info;
  116. char buffer[80];
  117. time(&rawtime);
  118. info = localtime(&rawtime);
  119. uint8_t secret[32] = { 0 };
  120. //如UTC 时间2018/7/24 17:56:20 则应表示为2018072417。
  121. /*sprintf((char*)secret, "%d%.2d%.2d%.2d", info->tm_year + 1900, info->tm_mon + 1, info->tm_mday,
  122. info->tm_hour);*/
  123. sprintf((char*)secret, "%d%.2d%.2d", info->tm_year + 1900, info->tm_mon + 1, info->tm_mday);
  124. memset(Source, 0, 128);
  125. sprintf((char*)Source, "%s_0_0_%s", userName.c_str(), secret);
  126. clientId.clear();
  127. clientId.append((const char*)Source);
  128. if (password.length() && _Hm256)
  129. {
  130. uint8_t outdata[128] = { 0 };
  131. uint8_t md[SHA256_DIGESTLEN] = { 0 };
  132. int len1 = strlen((char*)secret);
  133. int len2 = strlen((char*)password.c_str());
  134. HMAC_SHA256_CTX hmac;
  135. hmac_sha256_init(&hmac, secret, len1);
  136. hmac_sha256_update(&hmac, (const uint8_t*)password.c_str(), len2);
  137. hmac_sha256_final(&hmac, md);
  138. memcpy(outdata, md, SHA256_DIGESTLEN);
  139. password.clear();
  140. password.append((const char*)outdata);
  141. }
  142. _Mqtt_ZR = std::make_unique<SensorMQTT<CMqttSensor>>(this, serverUrl, userName, password, clientId);
  143. std::string DataServerUrl = root["DataServerUrl"].asString();
  144. std::string DatauserName = root["DataEsn"].asString();
  145. std::string Datapassword = root["DataEsnPass"].asString();
  146. std::string DataclientId = root["DataClientId"].asString();
  147. _Mqtt_SE = std::make_unique<SensorMQTT<CDataMqttSensor>>(this, DataServerUrl, DatauserName, Datapassword, DataclientId);
  148. //_Mqtt_SE->Start();
  149. //std::string _usb_1 = root["usb_3"].asString();
  150. //renableUSB("/dev/bus/usb/002/002");
  151. //renableUSB("/dev/bus/usb/001/003");
  152. //renableUSB(_usb_2.c_str());
  153. //renableUSB(_usb_1.c_str());
  154. serverip=root["ip"].asString();
  155. _hostPort=root["TcpHostPort"].asInt();
  156. _remotePort=root["TcpRemotePort"].asInt();
  157. _name=root["name"].asString();
  158. const Json::Value arrayObj=root["camerainfo"];
  159. int32_t count=arrayObj.size();
  160. for(int32_t i=0;i<count;i++)
  161. {
  162. LocalCameraInfo info;
  163. const Json::Value& value=arrayObj[i];
  164. info.index=value["index"].asInt();
  165. info.label=value["label"].asString();
  166. _cameraArray.push_back(info);
  167. }
  168. for(int i=0;i<_cameraArray.size();i++)
  169. {
  170. _peerArray.push_back({nullptr});
  171. }
  172. }
  173. else{
  174. std::string error=jsonReader.getFormattedErrorMessages();
  175. std::cout<<error<<std::endl;
  176. }
  177. _client=std::make_unique<SocketClient>(this);
  178. _client->Start(serverip.c_str(),_remotePort,_hostPort);
  179. std::this_thread::yield();
  180. }
  181. void CMessageQueue::EnQueue(CIOBuffer* pBuffer)
  182. {
  183. bool bNullBuffer=false;
  184. std::unique_lock <std::mutex> lck(_lock);
  185. if(Head==nullptr)
  186. {
  187. Head=Tail=pBuffer;
  188. bNullBuffer=true;
  189. }
  190. else{
  191. Tail->NextBuf=pBuffer;
  192. Tail=Tail->NextBuf;
  193. }
  194. pBuffer->NextBuf=nullptr;
  195. if(bNullBuffer)
  196. {
  197. //_cv.notify_one();
  198. //20231023
  199. _cv.notify_all();
  200. }
  201. }
  202. void CMessageQueue::Process()
  203. {
  204. CIOBuffer * ptr=nullptr;
  205. {
  206. std::unique_lock <std::mutex> lck(_lock);
  207. /*
  208. while(Head==nullptr)
  209. {
  210. _cv.wait(lck);
  211. }
  212. */
  213. while(Head==nullptr&&_cv.wait_for(lck,std::chrono::seconds(1))==std::cv_status::timeout)
  214. {
  215. CheckSignal();
  216. //std::cout<<".";
  217. //std::cout.flush();
  218. }
  219. }
  220. while(Head!=nullptr)
  221. {
  222. ptr=Head;
  223. Head=Head->NextBuf;
  224. if(ptr!=nullptr)
  225. {
  226. Message* message=reinterpret_cast<Message *>(ptr->Buffer);
  227. switch (message->cmd)
  228. {
  229. case MessageType::ReqVideo:
  230. OnNotifyReq((int32_t)message->param_l);
  231. break;
  232. case MessageType::RepVideo:
  233. OnNotifyRep((int32_t)message->param_l);
  234. break;
  235. case MessageType::Connected:
  236. OnNotifyConnected((bool)message->param_l);
  237. break;
  238. case MessageType::Leave:
  239. OnNotifyLeave();
  240. break;
  241. case MessageType::AsyncMessage:
  242. OnNotifyMessage();
  243. break;
  244. case MessageType::StopSensor:
  245. OnNotifyStopSensor();
  246. break;
  247. case MessageType::Ping:
  248. OnNotifyPing(message->param_l);
  249. break;
  250. }
  251. ptr->Release();
  252. }
  253. }
  254. }
  255. void CMessageQueue::SetTick(long long tick)
  256. {
  257. _curTick=tick;
  258. }
  259. void CMessageQueue::OnNotifyConnected(bool bRet)
  260. {
  261. if(bRet)
  262. {
  263. _client->WriteAddRobot(_serial,_name,static_cast<int32_t>(EgoType::Car));
  264. _updatethread.start(_client.get());
  265. //cs->Analog(0,0,0,0,0);
  266. }
  267. else
  268. {
  269. if(_peerId!=-1)
  270. {
  271. OnVideoLeave(_peerId,EgoType::User);
  272. _peerId=-1;
  273. }
  274. _updatethread.stop();
  275. }
  276. }
  277. // int day_diffient(int year_start, int month_start, int day_start,char *Dst)
  278. // {
  279. // //2024-03-11
  280. // int year_end = strtol(Dst,NULL,10);
  281. // char Temp = '-',*p = NULL,*p1 = NULL;
  282. // p = strchr(Dst,Temp);
  283. // int month_end = strtol(p + 1,NULL,10);
  284. // p1 = strchr(p + 1,Temp);
  285. // int day_end = strtol(p1 + 1,NULL,10);
  286. // int y2, m2, d2;
  287. // int y1, m1, d1;
  288. // m1 = (month_start + 9) % 12;
  289. // y1 = year_start - m1/10;
  290. // d1 = 365*y1 + y1/4 - y1/100 + y1/400 + (m1*306 + 5)/10 + (day_start - 1);
  291. // m2 = (month_end + 9) % 12;
  292. // y2 = year_end - m2/10;
  293. // d2 = 365*y2 + y2/4 - y2/100 + y2/400 + (m2*306 + 5)/10 + (day_end - 1);
  294. // return (d1 - d2);
  295. // }
  296. // void delete_days(const char *path)
  297. // {
  298. // DIR *dir;
  299. // struct dirent *entry;
  300. // struct stat statbuf;
  301. // time_t rawtime;
  302. // struct tm* info;
  303. // time(&rawtime);
  304. // info = localtime(&rawtime);
  305. // uint8_t secret[64] = { 0 };
  306. // sprintf((char*)secret, "%d-%.2d-%.2d.log", info->tm_year + 1900, info->tm_mon + 1, info->tm_mday);
  307. // printf("%s\r\n",secret);
  308. // dir = opendir(path);
  309. // if (dir == NULL)
  310. // {
  311. // printf("无法打开目录\n");
  312. // return;
  313. // }
  314. // while ((entry = readdir(dir)) != NULL)
  315. // {
  316. // if(entry->d_name[0]=='.')
  317. // continue;
  318. // //printf("%s\n",entry->d_name);
  319. // if(day_diffient(info->tm_year + 1900, info->tm_mon + 1, info->tm_mday,entry->d_name) > 15)
  320. // {
  321. // char Dst[128];
  322. // memset(Dst,0,128);
  323. // sprintf(Dst,"rm -rf /home/nvidia/ZJ_PRO/EgoSystem/build/log/%s",entry->d_name);
  324. // system(Dst);
  325. // }
  326. // }
  327. // closedir(dir);
  328. // }
  329. // void CMessageQueue::SerichFile(char *filename)
  330. // {
  331. // DIR *directory_pointer;
  332. // struct dirent *entry;
  333. // int exist = 0;
  334. // char Dst[128];
  335. // memset(Dst,0,128);
  336. // sprintf(Dst,"/home/nvidia/devdata/zhanjiang_0534/EgoSystem/build/log/%s",filename);
  337. // if((directory_pointer=opendir("/home/nvidia/devdata/zhanjiang_0534/EgoSystem/build/log/")) == NULL)
  338. // printf("Error open\n");
  339. // else
  340. // {
  341. // while((entry=readdir(directory_pointer)) != NULL)
  342. // {
  343. // if(entry->d_name[0]=='.') continue;
  344. // printf("%s\n",entry->d_name);
  345. // if(!strcmp(entry->d_name,filename))
  346. // {
  347. // File_fd = fopen(Dst, "a");
  348. // exist = 1;
  349. // break;
  350. // }
  351. // }
  352. // }
  353. // if(!exist)
  354. // File_fd = fopen(Dst, "w+");
  355. // closedir(directory_pointer);
  356. // }
  357. void CMessageQueue::OnNotifyReq(int32_t index)
  358. {
  359. // std::cout<<__FUNCTION__<<","<<__LINE__<<std::endl;
  360. if(_peerId==-1) return;
  361. if(index==0)
  362. {
  363. mrsWebrtcCreateFactory(true);
  364. _curTick=0;
  365. bStopedCar=false;
  366. _Rtk->Start();
  367. if(_Version)//AD10
  368. {
  369. _CanBusVehicle->Start();
  370. _CanBusRadar->Start();
  371. }
  372. else
  373. {
  374. _PcanBusVehicle->Start();
  375. _RadarIp->Start();
  376. }
  377. _Mqtt_ZR->Start();
  378. _Mqtt_SE->Start();
  379. // File_fd = NULL;
  380. // struct statfs diskInfo;
  381. // statfs("/dev/mmcblk0p1", &diskInfo);
  382. // unsigned long long freeDisk = diskInfo.f_bfree * diskInfo.f_bsize;
  383. // time_t rawtime;
  384. // struct tm* info;
  385. // time(&rawtime);
  386. // info = localtime(&rawtime);
  387. // uint8_t secret[64] = { 0 };
  388. // sprintf((char*)secret, "%d-%.2d-%.2d.log", info->tm_year + 1900, info->tm_mon + 1, info->tm_mday);
  389. // if((freeDisk >> 30) < 15)
  390. // {
  391. // system("rm -rf /home/nvidia/devdata/zhanjiang_0534/EgoSystem/build/log");
  392. // system("mkdir /home/nvidia/devdata/zhanjiang_0534/EgoSystem/build/log/");
  393. // }
  394. // else
  395. // delete_days("/home/nvidia/devdata/zhanjiang_0534/EgoSystem/build/log");
  396. // SerichFile((char *)secret);
  397. }
  398. _peerArray[index]=std::make_unique<CPeerConnection>(static_cast<ChannelType>(index),_client.get());
  399. _client->WriteVideoRep(_peerId, RemoNet::VideoDesc::OK, index);
  400. }
  401. void CMessageQueue::OnNotifyRep(int32_t index)
  402. {
  403. _peerArray[index]=std::make_unique<CPeerConnection>(static_cast<ChannelType>(index), _client.get());
  404. InitPeerConnection(_peerId,index);
  405. _peerArray[index]->CreateOffer();
  406. }
  407. void CMessageQueue::InitPeerConnection(int32_t peer,int32_t index)
  408. {
  409. _peerArray[index]->Initialize(peer,index,_UdpMinPort,_UdpMaxPort);
  410. _peerArray[index]->AddDataChannel(true, false);
  411. _peerArray[index]->AddLocalVideoTrack(static_cast<RenderPosition>(index),_cameraArray[index].index);
  412. if(index==RenderPosition::BACK)
  413. {
  414. void * front=_peerArray[RenderPosition::FRONT]->GetCurrentCtx();
  415. while(front==nullptr)
  416. {
  417. std::cout<<"front==nullptr"<<std::endl;
  418. std::this_thread::sleep_for(std::chrono::microseconds(50));
  419. front=_peerArray[RenderPosition::FRONT]->GetCurrentCtx();
  420. }
  421. void * back=_peerArray[RenderPosition::BACK]->GetCurrentCtx();
  422. while (back==nullptr)
  423. {
  424. std::cout<<"back==nullptr"<<std::endl;
  425. std::this_thread::sleep_for(std::chrono::microseconds(50));
  426. back=_peerArray[RenderPosition::BACK]->GetCurrentCtx();
  427. }
  428. _peerArray[RenderPosition::FRONT]->SetOtherCtx(back);
  429. _peerArray[RenderPosition::BACK]->SetOtherCtx(front);
  430. }
  431. /*
  432. if((index+1)==RenderPosition::ALL)
  433. {
  434. void * front=_peerArray[RenderPosition::FRONT]->GetCurrentCtx();
  435. while(front==nullptr)
  436. {
  437. std::cout<<"front==nullptr"<<std::endl;
  438. std::this_thread::sleep_for(std::chrono::microseconds(50));
  439. front=_peerArray[RenderPosition::FRONT]->GetCurrentCtx();
  440. }
  441. void * back=_peerArray[RenderPosition::BACK]->GetCurrentCtx();
  442. while (back==nullptr)
  443. {
  444. std::cout<<"back==nullptr"<<std::endl;
  445. std::this_thread::sleep_for(std::chrono::microseconds(50));
  446. back=_peerArray[RenderPosition::BACK]->GetCurrentCtx();
  447. }
  448. _peerArray[RenderPosition::FRONT]->SetOtherCtx(back);
  449. _peerArray[RenderPosition::BACK]->SetOtherCtx(front);
  450. void * left=_peerArray[RenderPosition::LEFT]->GetCurrentCtx();
  451. while(left==nullptr)
  452. {
  453. std::cout<<"left==nullptr"<<std::endl;
  454. std::this_thread::sleep_for(std::chrono::microseconds(50));
  455. front=_peerArray[RenderPosition::LEFT]->GetCurrentCtx();
  456. }
  457. void * right=_peerArray[RenderPosition::RIGHT]->GetCurrentCtx();
  458. while (right==nullptr)
  459. {
  460. std::cout<<"right==nullptr"<<std::endl;
  461. std::this_thread::sleep_for(std::chrono::microseconds(50));
  462. back=_peerArray[RenderPosition::RIGHT]->GetCurrentCtx();
  463. }
  464. _peerArray[RenderPosition::LEFT]->SetOtherCtx(right);
  465. _peerArray[RenderPosition::RIGHT]->SetOtherCtx(left);
  466. }
  467. */
  468. /*
  469. if((index+1)==RenderPosition::ALL)
  470. {
  471. void * front=_peerArray[RenderPosition::FRONT]->GetCurrentCtx();
  472. void * back=_peerArray[RenderPosition::BACK]->GetCurrentCtx();
  473. void * left=_peerArray[RenderPosition::LEFT]->GetCurrentCtx();
  474. void * right=_peerArray[RenderPosition::RIGHT]->GetCurrentCtx();
  475. void * dash=_peerArray[RenderPosition::DASHBOARD]->GetCurrentCtx();
  476. _peerArray[RenderPosition::FRONT]->SetOtherCtx(back);
  477. _peerArray[RenderPosition::BACK]->SetOtherCtx(front);
  478. _peerArray[RenderPosition::LEFT]->SetOtherCtx(left);
  479. _peerArray[RenderPosition::RIGHT]->SetOtherCtx(right);
  480. _peerArray[RenderPosition::DASHBOARD]->SetOtherCtx(dash);
  481. }
  482. */
  483. if(index==RenderPosition::FRONT)
  484. _peerArray[index]->AddLocalAudioTrack();
  485. }
  486. void CMessageQueue::OnAdd(bool bRet)
  487. {
  488. }
  489. void CMessageQueue::OnConnected(bool bRet)
  490. {
  491. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  492. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  493. message->cmd=MessageType::Connected;
  494. message->param_l=bRet;
  495. EnQueue(pBuffer);
  496. }
  497. void CMessageQueue::OnVideoLeave(int32_t peer,EgoType type)
  498. {
  499. {
  500. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  501. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  502. message->cmd=MessageType::StopSensor;
  503. EnQueue(pBuffer);
  504. }
  505. {
  506. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  507. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  508. message->cmd=MessageType::Leave;
  509. EnQueue(pBuffer);
  510. }
  511. }
  512. #ifdef WIN32
  513. void CMessageQueue::OnVideoRep(int32_t index,RemoNet::VideoDesc desc)
  514. {
  515. if (desc == RemoNet::VideoDesc::OK)
  516. {
  517. assert(_peerId!=-1);
  518. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  519. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  520. message->cmd=MessageType::ReqVideo;
  521. message->param_l=index;
  522. EnQueue(pBuffer);
  523. }
  524. }
  525. #else
  526. void CMessageQueue::OnVideoReq(int32_t video,int32_t peer)
  527. {
  528. // std::cout<<__FUNCTION__<<","<<__LINE__<<std::endl;
  529. _peerId=peer;
  530. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  531. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  532. message->cmd=MessageType::ReqVideo;
  533. message->param_l=video;
  534. EnQueue(pBuffer);
  535. }
  536. #endif
  537. void CMessageQueue::OnNotifyLeave()
  538. {
  539. std::this_thread::sleep_for(std::chrono::milliseconds(100));
  540. for (size_t i = 0; i < _peerArray.size(); i++)
  541. {
  542. if(_peerArray[i]!=nullptr)
  543. {
  544. _peerArray[i]->Close();
  545. _peerArray[i].reset();
  546. }
  547. /* code */
  548. }
  549. _peerId=-1;
  550. }
  551. void CMessageQueue::OnNotifyStopSensor()
  552. {
  553. _curTick=0;
  554. if(_Version)//AD10
  555. {
  556. std::cout<<"_CanBusVehicle Stop"<<std::endl;
  557. _CanBusVehicle->Stop();
  558. std::cout<<"_CanBusRadar Stop"<<std::endl;
  559. _CanBusRadar->Stop();
  560. }
  561. else
  562. {
  563. std::cout<<"_PcanBusVehicle Stop"<<std::endl;
  564. _PcanBusVehicle->Stop();
  565. std::cout<<"_RadarIp Stop"<<std::endl;
  566. _RadarIp->Stop();
  567. }
  568. std::cout << "RTK Stop" << std::endl;
  569. _Rtk->Stop();
  570. std::cout<<"mqtt Stop"<<std::endl;
  571. _Mqtt_ZR->Stop();
  572. std::cout<<"data mqtt Stop"<<std::endl;
  573. _Mqtt_SE->Stop();
  574. // if(!File_fd)
  575. // {
  576. // fclose(File_fd);
  577. // File_fd = NULL;
  578. // }
  579. RemoNet::StopAck Rep;
  580. CIOBuffer Buffer;
  581. MessageHead Head;
  582. Head.Command = RemoNet::CC_StopACK;
  583. Head.Length = Rep.ByteSizeLong();
  584. Head.Serialize(Buffer.Buffer);
  585. auto ptr = Buffer.Buffer + MessageHead::Size();
  586. Rep.SerializeToArray(ptr, Head.Length);
  587. Buffer.Length = Head.Length + MessageHead::Size();
  588. _peerArray[RenderPosition::FRONT]->SendData(Buffer);
  589. }
  590. void CMessageQueue::OnVideoOffer(int32_t index,const char* type, const char* sdp)
  591. {
  592. // std::cout<<__FUNCTION__<<","<<__LINE__<<std::endl;
  593. InitPeerConnection(_peerId,index);
  594. _peerArray[index]->SetRemoteDescription(type,sdp);
  595. _peerArray[index]->CreateAnswer();
  596. }
  597. void CMessageQueue::OnVideoAnswer(int32_t index, const char* type, const char* sdp)
  598. {
  599. // std::cout<<__FUNCTION__<<","<<__LINE__<<std::endl;
  600. _peerArray[index]->SetRemoteDescription(type,sdp);
  601. }
  602. void CMessageQueue::OnVideoCandidate(int32_t index,const char* candidate,
  603. int32_t sdp_mline_index,
  604. const char* sdp_mid)
  605. {
  606. _peerArray[index]->AddIceCandidate(candidate,sdp_mline_index,sdp_mid);
  607. }
  608. /*
  609. void CMessageQueue::SwitchCamera(bool front)
  610. {
  611. _peerArray[RenderPosition::FRONT_BACK]->SwitchCapture(front);
  612. }
  613. */
  614. void CMessageQueue::OnMessageFrameNotify(ChannelType type,int16_t cmd,int16_t length,const void * data)
  615. {
  616. // std::cout<<"cmd:" <<std::hex<<cmd<<std::endl;
  617. if(cmd==RemoNet::CC_Text)
  618. {
  619. RemoNet::TestTextReq Req;
  620. Req.ParseFromArray(data,length);
  621. std::cout<<Req.text()<<std::endl;
  622. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  623. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  624. message->cmd=MessageType::AsyncMessage;
  625. EnQueue(pBuffer);
  626. }
  627. else if(cmd==RemoNet::CC_Switch)
  628. {
  629. RemoNet::CCSwitch Req;
  630. Req.ParseFromArray(data,length);
  631. bool front=Req.front();
  632. _peerArray[RenderPosition::FRONT]->SwitchCapture(front);
  633. }
  634. else if(cmd==RemoNet::CC_Ping)
  635. {
  636. RemoNet::CCPing Req;
  637. Req.ParseFromArray(data,length);
  638. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  639. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  640. message->cmd=MessageType::Ping;
  641. message->param_l=Req.tick();
  642. EnQueue(pBuffer);
  643. }
  644. else if(cmd==RemoNet::CC_SensorStop)
  645. {
  646. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  647. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  648. message->cmd=MessageType::StopSensor;
  649. EnQueue(pBuffer);
  650. }
  651. else if (cmd == RemoNet::CC_CANMSG)
  652. {
  653. _source = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  654. _curTick = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  655. RemoNet::CCCanMsg Req;
  656. Req.ParseFromArray(data, length);
  657. cannet_frame* frames = (cannet_frame*)alloca(sizeof(cannet_frame) * Req.frams_size());
  658. for (int32_t i = 0; i < Req.frams_size(); i++)
  659. {
  660. auto& frame = Req.frams(i);
  661. frames[i].canid = frame.canid();
  662. frames[i].dlc = frame.dlc();
  663. memcpy(frames[i].data, frame.data().data(), frame.dlc());
  664. }
  665. //_robot->Get()->OnMessage(frames, Req.frams_size());
  666. if(_Version)//AD10
  667. _CanBusVehicle->Get()->OnMessage(frames, Req.frams_size());
  668. else
  669. _PcanBusVehicle->Get()->OnMessage(frames, Req.frams_size());
  670. }
  671. }
  672. void CMessageQueue::StopCar()
  673. {
  674. std::cout<<"Stop Car"<<std::endl;
  675. if(_Version)//AD10
  676. _CanBusVehicle->Get()->Emergency();
  677. else
  678. _PcanBusVehicle->Get()->Emergency();
  679. }
  680. void CMessageQueue::OnNotifyMessage()
  681. {
  682. RemoNet::TestTextReq Req;
  683. Req.set_text("ewqewqewqe");
  684. CIOBuffer Buffer;
  685. MessageHead Head;
  686. Head.Command = RemoNet::CC_Text;
  687. Head.Length = Req.ByteSizeLong();
  688. Head.Serialize(Buffer.Buffer);
  689. auto ptr = Buffer.Buffer + MessageHead::Size();
  690. Req.SerializeToArray(ptr, Head.Length);
  691. Buffer.Length = Head.Length + MessageHead::Size();
  692. _peerArray[RenderPosition::FRONT]->SendData(Buffer);
  693. }
  694. void CMessageQueue::OnNotifyPing(int64_t value)
  695. {
  696. RemoNet::CCPing Rep;
  697. Rep.set_tick(value);
  698. CIOBuffer Buffer;
  699. MessageHead Head;
  700. Head.Command = RemoNet::CC_Ping;
  701. Head.Length = Rep.ByteSizeLong();
  702. Head.Serialize(Buffer.Buffer);
  703. auto ptr = Buffer.Buffer + MessageHead::Size();
  704. Rep.SerializeToArray(ptr, Head.Length);
  705. Buffer.Length = Head.Length + MessageHead::Size();
  706. //std::cout << "ping" << std::endl;
  707. if( _peerArray[RenderPosition::FRONT]!=nullptr && _peerArray[RenderPosition::FRONT]->bReadyChannel)
  708. _peerArray[RenderPosition::FRONT]->SendData(Buffer);
  709. }
  710. /*
  711. void CMessageQueue::StartCar()
  712. {
  713. _can->SetStartWrite(true);
  714. }
  715. */
  716. void CMessageQueue::CheckSignal()
  717. {
  718. if(!bStopedCar)
  719. {
  720. long long tick=std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  721. if(_curTick!=0&&tick-_curTick > 3)
  722. {
  723. StopCar();
  724. bStopedCar=true;
  725. std::cout<<"_curTick!=0&&tick-_curTick > 3" << std::endl;
  726. _curStopTick = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  727. btimeStopedCar = true;
  728. _curTick = 0;
  729. }
  730. }
  731. }
  732. void CMessageQueue::WriteIMUData(ImuData* data)
  733. {
  734. MessageHead Head;
  735. CIOBuffer Buffer;
  736. RemoNet::IMuMessage Req;
  737. Req.set_rx(data->ry);
  738. Req.set_ry(data->rx);
  739. // Req.set_rz(data->rz);
  740. Head.Command=RemoNet::CC_IMU;
  741. Head.Length=Req.ByteSizeLong();
  742. Head.Serialize(Buffer.Buffer);
  743. auto ptr = Buffer.Buffer + MessageHead::Size();
  744. Req.SerializeToArray(ptr, Head.Length);
  745. Buffer.Length = Head.Length + MessageHead::Size();
  746. if( _peerArray[ChannelType::CHANNEL_IMU]!=nullptr)
  747. _peerArray[ChannelType::CHANNEL_IMU]->SendData(Buffer);
  748. }
  749. void CMessageQueue::WritePacket(ChannelType type, CIOBuffer & pBuffer)
  750. {
  751. if( _peerArray[type]!=nullptr)
  752. _peerArray[type]->SendData(pBuffer);
  753. }
  754. void CMessageQueue::WriteRadarData(RadarData& data)
  755. {
  756. MessageHead Head;
  757. CIOBuffer Buffer;
  758. RemoNet::CCRadarMessage Req;
  759. Req.set_radar0(data.r0);
  760. Req.set_radar1(data.r1);
  761. Req.set_radar2(data.r2);
  762. Req.set_radar3(data.r3);
  763. Req.set_radar4(data.r4);
  764. Req.set_radar5(data.r5);
  765. Req.set_radar6(data.r6);
  766. Req.set_radar7(data.r7);
  767. //Head.Command=RemoNet::CC_IMU;
  768. Head.Command = RemoNet::CC_Radar;
  769. Head.Length=Req.ByteSizeLong();
  770. Head.Serialize(Buffer.Buffer);
  771. auto ptr = Buffer.Buffer + MessageHead::Size();
  772. Req.SerializeToArray(ptr, Head.Length);
  773. Buffer.Length = Head.Length + MessageHead::Size();
  774. if( _peerArray[ChannelType::CHANNEL_RADAR]!=nullptr)
  775. _peerArray[ChannelType::CHANNEL_RADAR]->SendData(Buffer);
  776. }
  777. void CMessageQueue::WriteRobotStatus(int32_t ,int32_t )
  778. {
  779. }
  780. #ifdef LIDAR_SENSOR
  781. void CMessageQueue::WriteLidarPoint(const PointCloudMsg<PointXYZI>& msg,ChannelType side)
  782. {
  783. RemoNet::LidarPoint pt;
  784. pt.set_is_left(side==ChannelType::CHANNEL_LEFT_LIDAR);
  785. pt.set_frame_id(msg.frame_id);
  786. pt.set_height(msg.height);
  787. pt.set_width(msg.width);
  788. pt.set_is_dense(msg.is_dense);
  789. pt.set_seq(msg.seq);
  790. pt.set_timestamp(msg.timestamp);
  791. for(int i=0;i<msg.point_cloud_ptr->size();i++)
  792. {
  793. pt.add_data((*msg.point_cloud_ptr)[i].x);
  794. pt.add_data((*msg.point_cloud_ptr)[i].y);
  795. pt.add_data((*msg.point_cloud_ptr)[i].z);
  796. pt.add_data((*msg.point_cloud_ptr)[i].intensity);
  797. }
  798. MessageHead Head;
  799. CIOBuffer Buffer;
  800. Head.Command=RemoNet::CC_LIDARDATA;
  801. Head.Length=pt.ByteSizeLong();
  802. Head.Serialize(Buffer.Buffer);
  803. auto ptr = Buffer.Buffer + MessageHead::Size();
  804. pt.SerializeToArray(ptr, Head.Length);
  805. Buffer.Length = Head.Length + MessageHead::Size();
  806. if( _peerArray[side]!=nullptr)
  807. _peerArray[side]->SendData(&Buffer);
  808. }
  809. #endif
  810. void CMessageQueue::SwitchCamera(bool front)
  811. {
  812. _peerArray[RenderPosition::FRONT]->SwitchCapture(front);
  813. _peerArray[RenderPosition::BACK]->SwitchCapture(front);
  814. }
  815. void CMessageQueue::SendZGJStatus(int status)
  816. {
  817. _Rtk->Get()->Send_ZGJ_status(status);
  818. }
  819. void CMessageQueue::SendVehicleStatus(vehicle_control_cv control_can)
  820. {
  821. //CDataMqttSensor::sendMessage()
  822. if (CDataMqttSensor::_run)
  823. {
  824. struct timeval tv;
  825. gettimeofday(&tv, NULL);
  826. char m_Send[16],Dst[16];
  827. memset(Dst, 0, 16);
  828. sprintf((char*)Dst, "%ld", tv.tv_sec * 1000 + tv.tv_usec / 1000);
  829. Json::Value root;
  830. Json::Value Source;
  831. Json::FastWriter writer;
  832. std::string SendTime;
  833. Source["VehicleID"] = 538;
  834. Source["timestamp"] = SendTime.assign(Dst,strlen(Dst));
  835. //车辆基本状态控制
  836. Json::Value baseControl;
  837. baseControl["keyStatus"] = control_can.keyStatus;
  838. baseControl["parkControl"] = control_can.parkControl;
  839. baseControl["travelMode"] = control_can.travelMode;
  840. baseControl["eStop"] = control_can.eStop;
  841. baseControl["directSwitch"] = control_can.directSwitch;
  842. baseControl["gearCaontrol"] = control_can.gearControl;
  843. baseControl["hazardLight"] = control_can.hazardLight;
  844. baseControl["travelLight"] = control_can.travelLight;
  845. baseControl["vehicleHorn"] = control_can.vehicleHorn;
  846. baseControl["silencedAlarm"] = control_can.silencedAlarm;
  847. Source["baseControl"].append(baseControl);
  848. //车辆行驶控制
  849. Json::Value driveControl;
  850. Json::Value accPedal;
  851. accPedal["accPedalH"] = control_can.accPedalH;
  852. accPedal["accPedalF"] = control_can.accPedalF;
  853. driveControl["accPedal"].append(accPedal);
  854. driveControl["brakePedal"] = control_can.brakePedal;
  855. driveControl["steeringWheel"] = control_can.steeringWheel;
  856. driveControl["turnSignal"] = control_can.turnSignal;
  857. driveControl["turnMode"] = control_can.turnMode;
  858. Source["driveControl"].append(driveControl);
  859. //作业机构控制
  860. Json::Value taskControl;
  861. taskControl["enableHydraulic"] = control_can.enableHydraulic;
  862. taskControl["toolControl"] = control_can.toolControl;
  863. taskControl["workLight"] = control_can.workLight;
  864. taskControl["bypassSwitch"] = control_can.bypassSwitch;
  865. taskControl["taskJoint_1"] = control_can.taskJoint_1;
  866. taskControl["taskJoint_2"] = control_can.taskJoint_2;
  867. taskControl["taskJoint_3"] = control_can.taskJoint_3;
  868. taskControl["endJoint"] = control_can.endJoint;
  869. taskControl["baseLegSwitch"] = control_can.baseLegSwitch;
  870. taskControl["baseLegControl"] = control_can.baseLegControl;
  871. taskControl["cabLift"] = control_can.cabLift;
  872. taskControl["esCabLift"] = control_can.esCabLift;
  873. taskControl["suckerSelect"] = control_can.suckerSelect;
  874. taskControl["cooperationSignal"] = control_can.coopSignal;
  875. Source["taskControl"].append(taskControl);
  876. //舱端故障报警
  877. Json::Value errCode;
  878. errCode["errBasOperation"] = control_can.errBasOperation;
  879. errCode["errAccPedal"] = control_can.errAccPedal;
  880. errCode["errBrakePedal"] = control_can.errBrakePedal;
  881. errCode["errSteeringWheel"] = control_can.errSteeringWheel;
  882. errCode["errHandle"] = control_can.errHandle;
  883. errCode["errEndTool"] = control_can.errEndTool;
  884. errCode["errOther"] = control_can.errOther;
  885. Source["errCode"].append(errCode);
  886. Json::StyledWriter sw;
  887. std::cout << sw.write(Source) << std::endl << std::endl;
  888. //控制发送频率
  889. // if (!DataMqtt_SendDate)
  890. // {
  891. // DataMqtt_curTick = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  892. // DataMqtt_SendDate = true;
  893. // }
  894. // else
  895. // {
  896. // long long tick = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  897. // if (DataMqtt_curTick != 0 && tick - DataMqtt_curTick > 10000)
  898. // {
  899. // if (CDataMqttSensor::_run && (CDataMqttSensor::m_mqttClient != NULL))
  900. // CDataMqttSensor::sendMessage((char*)sw.write(Source).c_str(), 0, (char*)"Vehicle/ControlVehicle/Veh001");
  901. // DataMqtt_SendDate = false;
  902. // }
  903. // }
  904. CDataMqttSensor::sendMessage((char*)sw.write(Source).c_str(), 0, (char*)"Vehicle/ControlVehicle/Veh001");
  905. }
  906. }