message_queue.cpp 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122
  1. #include <atomic>
  2. #include <string>
  3. #include "../common/comm.h"
  4. #include "api.h"
  5. #include "../common/iobuffer.h"
  6. #include "../common/sensor_socket.h"
  7. #include "../common/peer_connection.h"
  8. #include "VideoRenderer.h"
  9. #include "../thirdparty/jsoncpp/include/json/json.h"
  10. #include "protocol.pb.h"
  11. #include "imu_sensor.h"
  12. #include "Rtk.h"
  13. #include "message_queue.h"
  14. #include <fstream>
  15. #include <iostream>
  16. #include <cstring>
  17. #include "hmacsha256.h"
  18. #include <linux/usbdevice_fs.h>
  19. #include <sys/ioctl.h>
  20. #include <sys/statfs.h>
  21. #include <stdlib.h>
  22. #include <unistd.h>
  23. #include <sys/vfs.h>
  24. #include <dirent.h>
  25. #include <sys/stat.h>
  26. CMessageQueue::CMessageQueue():Head(nullptr),Tail(nullptr)
  27. {
  28. _peerId=-1;
  29. btimeStopedCar = false;
  30. File_fd = NULL;
  31. _Version = 1;
  32. }
  33. CMessageQueue::~CMessageQueue()
  34. {
  35. }
  36. void renableUSB(const char* file)
  37. {
  38. printf("Resetting USB device %s\n", file);
  39. int fd = open(file, O_WRONLY);
  40. if (fd < 0) {
  41. char text[256];
  42. perror(text);
  43. printf("Error opening output file %s", text);
  44. return;
  45. }
  46. int rc = ioctl(fd, USBDEVFS_RESET, 0);
  47. if (rc < 0) {
  48. perror("Error in ioctl");
  49. return;
  50. }
  51. printf("Reset successful\n");
  52. close(fd);
  53. }
  54. void CMessageQueue::Create()
  55. {
  56. Json::Value root;
  57. Json::Reader jsonReader;
  58. std::ifstream ifile("./config.json");
  59. std::string serverip;
  60. int32_t _hostPort;
  61. int32_t _remotePort;
  62. _curTick=0;
  63. bStopedCar=false;
  64. if(jsonReader.parse(ifile,root))
  65. {
  66. std::cout<<"enter config json"<<std::endl;
  67. _serial=root["serial"].asString();
  68. int32_t _Rtkport=root["rtk_port"].asInt();
  69. int32_t _Rtkhost=root["rtk_host"].asInt();
  70. std::string _Rtkip=root["rtk_ip"].asString();
  71. std::cout<<"Rtk ip:"<< _Rtkip <<std::endl;
  72. _Rtk = std::make_unique<SensorSocket<CRtkSensor>>(this, _Rtkip, _Rtkport, _Rtkhost);
  73. //_Rtk->Start();
  74. _Version = root["Version"].asInt();
  75. if(_Version)//AD10
  76. {
  77. std::string can_port_radar = root["can_bus_radar"].asString();
  78. _CanBusRadar = std::make_unique<SensorCanBus<CCanRadarSensor>>(this, can_port_radar);
  79. //_CanBusRadar->Start();
  80. }
  81. else
  82. {
  83. //_RadarIp
  84. int32_t _radarport=root["radar_port"].asInt();
  85. int32_t _radarhost=root["radar_host"].asInt();
  86. std::string _radarip=root["radar_ip"].asString();
  87. std::cout<<"radar ip:"<<_radarip<<std::endl;
  88. _RadarIp = std::make_unique<SensorSocket<CRadarSensor>>(this,_radarip,_radarport,_radarhost);
  89. //_RadarIp->Start();
  90. }
  91. if(_Version)//AD10
  92. {
  93. std::string can_port = root["can_bus_vehicle"].asString();
  94. _CanBusVehicle = std::make_unique<SensorCanBus<CCanBusSensor>>(this, can_port);
  95. //_CanBusVehicle->Start();
  96. }
  97. else
  98. {
  99. _PcanBusVehicle = std::make_unique<SensorPeakCan<CPcanSensor>>(this);
  100. //_PcanBusVehicle->Start();
  101. }
  102. _UdpMinPort=root["udp_min_port"].asInt();
  103. _UdpMaxPort=root["udp_max_port"].asInt();
  104. //20230417
  105. //const char* serverUrl = "tcp://localhost:1883"; //服务器地址
  106. //const char* userName = ""; //用户名
  107. //const char* password = ""; //密码
  108. std::string serverUrl = root["MqttserverUrl"].asString();
  109. std::string userName = root["Esn"].asString();
  110. std::string password = root["EsnPass"].asString();
  111. std::string clientId = ""; //客户端标识符
  112. bool _Hm256 = false;
  113. unsigned char Source[128];
  114. time_t rawtime;
  115. struct tm* info;
  116. char buffer[80];
  117. time(&rawtime);
  118. info = localtime(&rawtime);
  119. uint8_t secret[32] = { 0 };
  120. //如UTC 时间2018/7/24 17:56:20 则应表示为2018072417。
  121. /*sprintf((char*)secret, "%d%.2d%.2d%.2d", info->tm_year + 1900, info->tm_mon + 1, info->tm_mday,
  122. info->tm_hour);*/
  123. sprintf((char*)secret, "%d%.2d%.2d", info->tm_year + 1900, info->tm_mon + 1, info->tm_mday);
  124. memset(Source, 0, 128);
  125. sprintf((char*)Source, "%s_0_0_%s", userName.c_str(), secret);
  126. clientId.clear();
  127. clientId.append((const char*)Source);
  128. if (password.length() && _Hm256)
  129. {
  130. uint8_t outdata[128] = { 0 };
  131. uint8_t md[SHA256_DIGESTLEN] = { 0 };
  132. int len1 = strlen((char*)secret);
  133. int len2 = strlen((char*)password.c_str());
  134. HMAC_SHA256_CTX hmac;
  135. hmac_sha256_init(&hmac, secret, len1);
  136. hmac_sha256_update(&hmac, (const uint8_t*)password.c_str(), len2);
  137. hmac_sha256_final(&hmac, md);
  138. memcpy(outdata, md, SHA256_DIGESTLEN);
  139. password.clear();
  140. password.append((const char*)outdata);
  141. }
  142. _Mqtt_ZR = std::make_unique<SensorMQTT<CMqttSensor>>(this, serverUrl, userName, password, clientId);
  143. std::string DataServerUrl = root["DataServerUrl"].asString();
  144. std::string DatauserName = root["DataEsn"].asString();
  145. std::string Datapassword = root["DataEsnPass"].asString();
  146. std::string DataclientId = root["DataClientId"].asString();
  147. _Mqtt_SE = std::make_unique<SensorMQTT<CDataMqttSensor>>(this, DataServerUrl, DatauserName, Datapassword, DataclientId);
  148. //_Mqtt_SE->Start();
  149. //std::string _usb_1 = root["usb_3"].asString();
  150. //renableUSB("/dev/bus/usb/002/002");
  151. //renableUSB("/dev/bus/usb/001/003");
  152. //renableUSB(_usb_2.c_str());
  153. //renableUSB(_usb_1.c_str());
  154. serverip=root["ip"].asString();
  155. _hostPort=root["TcpHostPort"].asInt();
  156. _remotePort=root["TcpRemotePort"].asInt();
  157. _name=root["name"].asString();
  158. const Json::Value arrayObj=root["camerainfo"];
  159. int32_t count=arrayObj.size();
  160. for(int32_t i=0;i<count;i++)
  161. {
  162. LocalCameraInfo info;
  163. const Json::Value& value=arrayObj[i];
  164. info.index=value["index"].asInt();
  165. info.label=value["label"].asString();
  166. _cameraArray.push_back(info);
  167. }
  168. for(int i=0;i<_cameraArray.size();i++)
  169. {
  170. _peerArray.push_back({nullptr});
  171. }
  172. }
  173. else{
  174. std::string error=jsonReader.getFormattedErrorMessages();
  175. std::cout<<error<<std::endl;
  176. }
  177. _client=std::make_unique<SocketClient>(this);
  178. _client->Start(serverip.c_str(),_remotePort,_hostPort);
  179. std::this_thread::yield();
  180. }
  181. void CMessageQueue::EnQueue(CIOBuffer* pBuffer)
  182. {
  183. bool bNullBuffer=false;
  184. std::unique_lock <std::mutex> lck(_lock);
  185. if(Head==nullptr)
  186. {
  187. Head=Tail=pBuffer;
  188. bNullBuffer=true;
  189. }
  190. else{
  191. Tail->NextBuf=pBuffer;
  192. Tail=Tail->NextBuf;
  193. }
  194. pBuffer->NextBuf=nullptr;
  195. if(bNullBuffer)
  196. {
  197. //_cv.notify_one();
  198. //20231023
  199. _cv.notify_all();
  200. }
  201. }
  202. void CMessageQueue::Process()
  203. {
  204. CIOBuffer * ptr=nullptr;
  205. {
  206. std::unique_lock <std::mutex> lck(_lock);
  207. /*
  208. while(Head==nullptr)
  209. {
  210. _cv.wait(lck);
  211. }
  212. */
  213. while(Head==nullptr&&_cv.wait_for(lck,std::chrono::seconds(1))==std::cv_status::timeout)
  214. {
  215. CheckSignal();
  216. //std::cout<<".";
  217. //std::cout.flush();
  218. }
  219. }
  220. while(Head!=nullptr)
  221. {
  222. ptr=Head;
  223. Head=Head->NextBuf;
  224. if(ptr!=nullptr)
  225. {
  226. Message* message=reinterpret_cast<Message *>(ptr->Buffer);
  227. switch (message->cmd)
  228. {
  229. case MessageType::ReqVideo:
  230. OnNotifyReq((int32_t)message->param_l);
  231. break;
  232. case MessageType::RepVideo:
  233. OnNotifyRep((int32_t)message->param_l);
  234. break;
  235. case MessageType::Connected:
  236. OnNotifyConnected((bool)message->param_l);
  237. break;
  238. case MessageType::Leave:
  239. OnNotifyLeave();
  240. break;
  241. case MessageType::AsyncMessage:
  242. OnNotifyMessage();
  243. break;
  244. case MessageType::StopSensor:
  245. OnNotifyStopSensor();
  246. break;
  247. case MessageType::Ping:
  248. OnNotifyPing(message->param_l);
  249. break;
  250. }
  251. ptr->Release();
  252. }
  253. }
  254. }
  255. void CMessageQueue::SetTick(long long tick)
  256. {
  257. _curTick=tick;
  258. }
  259. void CMessageQueue::OnNotifyConnected(bool bRet)
  260. {
  261. if(bRet)
  262. {
  263. _client->WriteAddRobot(_serial,_name,static_cast<int32_t>(EgoType::Car));
  264. _updatethread.start(_client.get());
  265. //cs->Analog(0,0,0,0,0);
  266. }
  267. else
  268. {
  269. if(_peerId!=-1)
  270. {
  271. OnVideoLeave(_peerId,EgoType::User);
  272. _peerId=-1;
  273. }
  274. _updatethread.stop();
  275. }
  276. }
  277. // int day_diffient(int year_start, int month_start, int day_start,char *Dst)
  278. // {
  279. // //2024-03-11
  280. // int year_end = strtol(Dst,NULL,10);
  281. // char Temp = '-',*p = NULL,*p1 = NULL;
  282. // p = strchr(Dst,Temp);
  283. // int month_end = strtol(p + 1,NULL,10);
  284. // p1 = strchr(p + 1,Temp);
  285. // int day_end = strtol(p1 + 1,NULL,10);
  286. // int y2, m2, d2;
  287. // int y1, m1, d1;
  288. // m1 = (month_start + 9) % 12;
  289. // y1 = year_start - m1/10;
  290. // d1 = 365*y1 + y1/4 - y1/100 + y1/400 + (m1*306 + 5)/10 + (day_start - 1);
  291. // m2 = (month_end + 9) % 12;
  292. // y2 = year_end - m2/10;
  293. // d2 = 365*y2 + y2/4 - y2/100 + y2/400 + (m2*306 + 5)/10 + (day_end - 1);
  294. // return (d1 - d2);
  295. // }
  296. // void delete_days(const char *path)
  297. // {
  298. // DIR *dir;
  299. // struct dirent *entry;
  300. // struct stat statbuf;
  301. // time_t rawtime;
  302. // struct tm* info;
  303. // time(&rawtime);
  304. // info = localtime(&rawtime);
  305. // uint8_t secret[64] = { 0 };
  306. // sprintf((char*)secret, "%d-%.2d-%.2d.log", info->tm_year + 1900, info->tm_mon + 1, info->tm_mday);
  307. // printf("%s\r\n",secret);
  308. // dir = opendir(path);
  309. // if (dir == NULL)
  310. // {
  311. // printf("无法打开目录\n");
  312. // return;
  313. // }
  314. // while ((entry = readdir(dir)) != NULL)
  315. // {
  316. // if(entry->d_name[0]=='.')
  317. // continue;
  318. // //printf("%s\n",entry->d_name);
  319. // if(day_diffient(info->tm_year + 1900, info->tm_mon + 1, info->tm_mday,entry->d_name) > 15)
  320. // {
  321. // char Dst[128];
  322. // memset(Dst,0,128);
  323. // sprintf(Dst,"rm -rf /home/nvidia/ZJ_PRO/EgoSystem/build/log/%s",entry->d_name);
  324. // system(Dst);
  325. // }
  326. // }
  327. // closedir(dir);
  328. // }
  329. // void CMessageQueue::SerichFile(char *filename)
  330. // {
  331. // DIR *directory_pointer;
  332. // struct dirent *entry;
  333. // int exist = 0;
  334. // char Dst[128];
  335. // memset(Dst,0,128);
  336. // sprintf(Dst,"/home/nvidia/devdata/zhanjiang_0534/EgoSystem/build/log/%s",filename);
  337. // if((directory_pointer=opendir("/home/nvidia/devdata/zhanjiang_0534/EgoSystem/build/log/")) == NULL)
  338. // printf("Error open\n");
  339. // else
  340. // {
  341. // while((entry=readdir(directory_pointer)) != NULL)
  342. // {
  343. // if(entry->d_name[0]=='.') continue;
  344. // printf("%s\n",entry->d_name);
  345. // if(!strcmp(entry->d_name,filename))
  346. // {
  347. // File_fd = fopen(Dst, "a");
  348. // exist = 1;
  349. // break;
  350. // }
  351. // }
  352. // }
  353. // if(!exist)
  354. // File_fd = fopen(Dst, "w+");
  355. // closedir(directory_pointer);
  356. // }
  357. void CMessageQueue::OnNotifyReq(int32_t index)
  358. {
  359. // std::cout<<__FUNCTION__<<","<<__LINE__<<std::endl;
  360. if(_peerId==-1) return;
  361. if(index==0)
  362. {
  363. mrsWebrtcCreateFactory(true);
  364. _curTick=0;
  365. bStopedCar=false;
  366. _Rtk->Start();
  367. if(_Version)//AD10
  368. {
  369. _CanBusVehicle->Start();
  370. _CanBusRadar->Start();
  371. }
  372. else
  373. {
  374. _PcanBusVehicle->Start();
  375. _RadarIp->Start();
  376. }
  377. _Mqtt_ZR->Start();
  378. _Mqtt_SE->Start();
  379. // File_fd = NULL;
  380. // struct statfs diskInfo;
  381. // statfs("/dev/mmcblk0p1", &diskInfo);
  382. // unsigned long long freeDisk = diskInfo.f_bfree * diskInfo.f_bsize;
  383. // time_t rawtime;
  384. // struct tm* info;
  385. // time(&rawtime);
  386. // info = localtime(&rawtime);
  387. // uint8_t secret[64] = { 0 };
  388. // sprintf((char*)secret, "%d-%.2d-%.2d.log", info->tm_year + 1900, info->tm_mon + 1, info->tm_mday);
  389. // if((freeDisk >> 30) < 15)
  390. // {
  391. // system("rm -rf /home/nvidia/devdata/zhanjiang_0534/EgoSystem/build/log");
  392. // system("mkdir /home/nvidia/devdata/zhanjiang_0534/EgoSystem/build/log/");
  393. // }
  394. // else
  395. // delete_days("/home/nvidia/devdata/zhanjiang_0534/EgoSystem/build/log");
  396. // SerichFile((char *)secret);
  397. }
  398. _peerArray[index]=std::make_unique<CPeerConnection>(static_cast<ChannelType>(index),_client.get());
  399. _client->WriteVideoRep(_peerId, RemoNet::VideoDesc::OK, index);
  400. }
  401. void CMessageQueue::OnNotifyRep(int32_t index)
  402. {
  403. _peerArray[index]=std::make_unique<CPeerConnection>(static_cast<ChannelType>(index), _client.get());
  404. InitPeerConnection(_peerId,index);
  405. _peerArray[index]->CreateOffer();
  406. }
  407. void CMessageQueue::InitPeerConnection(int32_t peer,int32_t index)
  408. {
  409. _peerArray[index]->Initialize(peer,index,_UdpMinPort,_UdpMaxPort);
  410. _peerArray[index]->AddDataChannel(true, false);
  411. _peerArray[index]->AddLocalVideoTrack(static_cast<RenderPosition>(index),_cameraArray[index].index);
  412. if(index==RenderPosition::BACK)
  413. {
  414. void * front=_peerArray[RenderPosition::FRONT]->GetCurrentCtx();
  415. while(front==nullptr)
  416. {
  417. std::cout<<"front==nullptr"<<std::endl;
  418. std::this_thread::sleep_for(std::chrono::microseconds(50));
  419. front=_peerArray[RenderPosition::FRONT]->GetCurrentCtx();
  420. }
  421. void * back=_peerArray[RenderPosition::BACK]->GetCurrentCtx();
  422. while (back==nullptr)
  423. {
  424. std::cout<<"back==nullptr"<<std::endl;
  425. std::this_thread::sleep_for(std::chrono::microseconds(50));
  426. back=_peerArray[RenderPosition::BACK]->GetCurrentCtx();
  427. }
  428. _peerArray[RenderPosition::FRONT]->SetOtherCtx(back);
  429. _peerArray[RenderPosition::BACK]->SetOtherCtx(front);
  430. }
  431. /*
  432. if((index+1)==RenderPosition::ALL)
  433. {
  434. void * front=_peerArray[RenderPosition::FRONT]->GetCurrentCtx();
  435. while(front==nullptr)
  436. {
  437. std::cout<<"front==nullptr"<<std::endl;
  438. std::this_thread::sleep_for(std::chrono::microseconds(50));
  439. front=_peerArray[RenderPosition::FRONT]->GetCurrentCtx();
  440. }
  441. void * back=_peerArray[RenderPosition::BACK]->GetCurrentCtx();
  442. while (back==nullptr)
  443. {
  444. std::cout<<"back==nullptr"<<std::endl;
  445. std::this_thread::sleep_for(std::chrono::microseconds(50));
  446. back=_peerArray[RenderPosition::BACK]->GetCurrentCtx();
  447. }
  448. _peerArray[RenderPosition::FRONT]->SetOtherCtx(back);
  449. _peerArray[RenderPosition::BACK]->SetOtherCtx(front);
  450. void * left=_peerArray[RenderPosition::LEFT]->GetCurrentCtx();
  451. while(left==nullptr)
  452. {
  453. std::cout<<"left==nullptr"<<std::endl;
  454. std::this_thread::sleep_for(std::chrono::microseconds(50));
  455. front=_peerArray[RenderPosition::LEFT]->GetCurrentCtx();
  456. }
  457. void * right=_peerArray[RenderPosition::RIGHT]->GetCurrentCtx();
  458. while (right==nullptr)
  459. {
  460. std::cout<<"right==nullptr"<<std::endl;
  461. std::this_thread::sleep_for(std::chrono::microseconds(50));
  462. back=_peerArray[RenderPosition::RIGHT]->GetCurrentCtx();
  463. }
  464. _peerArray[RenderPosition::LEFT]->SetOtherCtx(right);
  465. _peerArray[RenderPosition::RIGHT]->SetOtherCtx(left);
  466. }
  467. */
  468. /*
  469. if((index+1)==RenderPosition::ALL)
  470. {
  471. void * front=_peerArray[RenderPosition::FRONT]->GetCurrentCtx();
  472. void * back=_peerArray[RenderPosition::BACK]->GetCurrentCtx();
  473. void * left=_peerArray[RenderPosition::LEFT]->GetCurrentCtx();
  474. void * right=_peerArray[RenderPosition::RIGHT]->GetCurrentCtx();
  475. void * dash=_peerArray[RenderPosition::DASHBOARD]->GetCurrentCtx();
  476. _peerArray[RenderPosition::FRONT]->SetOtherCtx(back);
  477. _peerArray[RenderPosition::BACK]->SetOtherCtx(front);
  478. _peerArray[RenderPosition::LEFT]->SetOtherCtx(left);
  479. _peerArray[RenderPosition::RIGHT]->SetOtherCtx(right);
  480. _peerArray[RenderPosition::DASHBOARD]->SetOtherCtx(dash);
  481. }
  482. */
  483. if(index==RenderPosition::FRONT)
  484. _peerArray[index]->AddLocalAudioTrack();
  485. }
  486. void CMessageQueue::OnAdd(bool bRet)
  487. {
  488. }
  489. void CMessageQueue::OnConnected(bool bRet)
  490. {
  491. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  492. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  493. message->cmd=MessageType::Connected;
  494. message->param_l=bRet;
  495. EnQueue(pBuffer);
  496. }
  497. void CMessageQueue::OnVideoLeave(int32_t peer,EgoType type)
  498. {
  499. {
  500. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  501. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  502. message->cmd=MessageType::StopSensor;
  503. EnQueue(pBuffer);
  504. }
  505. {
  506. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  507. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  508. message->cmd=MessageType::Leave;
  509. EnQueue(pBuffer);
  510. }
  511. }
  512. #ifdef WIN32
  513. void CMessageQueue::OnVideoRep(int32_t index,RemoNet::VideoDesc desc)
  514. {
  515. if (desc == RemoNet::VideoDesc::OK)
  516. {
  517. assert(_peerId!=-1);
  518. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  519. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  520. message->cmd=MessageType::ReqVideo;
  521. message->param_l=index;
  522. EnQueue(pBuffer);
  523. }
  524. }
  525. #else
  526. void CMessageQueue::OnVideoReq(int32_t video,int32_t peer)
  527. {
  528. // std::cout<<__FUNCTION__<<","<<__LINE__<<std::endl;
  529. _peerId=peer;
  530. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  531. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  532. message->cmd=MessageType::ReqVideo;
  533. message->param_l=video;
  534. EnQueue(pBuffer);
  535. }
  536. #endif
  537. void CMessageQueue::OnNotifyLeave()
  538. {
  539. std::this_thread::sleep_for(std::chrono::milliseconds(100));
  540. for (size_t i = 0; i < _peerArray.size(); i++)
  541. {
  542. if(_peerArray[i]!=nullptr)
  543. {
  544. _peerArray[i]->Close();
  545. _peerArray[i].reset();
  546. }
  547. /* code */
  548. }
  549. _peerId=-1;
  550. }
  551. void CMessageQueue::OnNotifyStopSensor()
  552. {
  553. _curTick=0;
  554. if(_Version)//AD10
  555. {
  556. std::cout<<"_CanBusVehicle Stop"<<std::endl;
  557. _CanBusVehicle->Stop();
  558. std::cout<<"_CanBusRadar Stop"<<std::endl;
  559. _CanBusRadar->Stop();
  560. }
  561. else
  562. {
  563. std::cout<<"_PcanBusVehicle Stop"<<std::endl;
  564. _PcanBusVehicle->Stop();
  565. std::cout<<"_RadarIp Stop"<<std::endl;
  566. _RadarIp->Stop();
  567. }
  568. std::cout << "RTK Stop" << std::endl;
  569. _Rtk->Stop();
  570. std::cout<<"mqtt Stop"<<std::endl;
  571. _Mqtt_ZR->Stop();
  572. std::cout<<"data mqtt Stop"<<std::endl;
  573. _Mqtt_SE->Stop();
  574. // if(!File_fd)
  575. // {
  576. // fclose(File_fd);
  577. // File_fd = NULL;
  578. // }
  579. RemoNet::StopAck Rep;
  580. CIOBuffer Buffer;
  581. MessageHead Head;
  582. Head.Command = RemoNet::CC_StopACK;
  583. Head.Length = Rep.ByteSizeLong();
  584. Head.Serialize(Buffer.Buffer);
  585. auto ptr = Buffer.Buffer + MessageHead::Size();
  586. Rep.SerializeToArray(ptr, Head.Length);
  587. Buffer.Length = Head.Length + MessageHead::Size();
  588. _peerArray[RenderPosition::FRONT]->SendData(Buffer);
  589. }
  590. void CMessageQueue::OnVideoOffer(int32_t index,const char* type, const char* sdp)
  591. {
  592. // std::cout<<__FUNCTION__<<","<<__LINE__<<std::endl;
  593. InitPeerConnection(_peerId,index);
  594. _peerArray[index]->SetRemoteDescription(type,sdp);
  595. _peerArray[index]->CreateAnswer();
  596. }
  597. void CMessageQueue::OnVideoAnswer(int32_t index, const char* type, const char* sdp)
  598. {
  599. // std::cout<<__FUNCTION__<<","<<__LINE__<<std::endl;
  600. _peerArray[index]->SetRemoteDescription(type,sdp);
  601. }
  602. void CMessageQueue::OnVideoCandidate(int32_t index,const char* candidate,
  603. int32_t sdp_mline_index,
  604. const char* sdp_mid)
  605. {
  606. _peerArray[index]->AddIceCandidate(candidate,sdp_mline_index,sdp_mid);
  607. }
  608. /*
  609. void CMessageQueue::SwitchCamera(bool front)
  610. {
  611. _peerArray[RenderPosition::FRONT_BACK]->SwitchCapture(front);
  612. }
  613. */
  614. void CMessageQueue::OnMessageFrameNotify(ChannelType type,int16_t cmd,int16_t length,const void * data)
  615. {
  616. // std::cout<<"cmd:" <<std::hex<<cmd<<std::endl;
  617. if(cmd==RemoNet::CC_Text)
  618. {
  619. RemoNet::TestTextReq Req;
  620. Req.ParseFromArray(data,length);
  621. std::cout<<Req.text()<<std::endl;
  622. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  623. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  624. message->cmd=MessageType::AsyncMessage;
  625. EnQueue(pBuffer);
  626. }
  627. else if(cmd==RemoNet::CC_Switch)
  628. {
  629. RemoNet::CCSwitch Req;
  630. Req.ParseFromArray(data,length);
  631. bool front=Req.front();
  632. _peerArray[RenderPosition::FRONT]->SwitchCapture(front);
  633. }
  634. else if(cmd==RemoNet::CC_Ping)
  635. {
  636. RemoNet::CCPing Req;
  637. Req.ParseFromArray(data,length);
  638. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  639. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  640. message->cmd=MessageType::Ping;
  641. message->param_l=Req.tick();
  642. EnQueue(pBuffer);
  643. }
  644. else if(cmd==RemoNet::CC_SensorStop)
  645. {
  646. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  647. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  648. message->cmd=MessageType::StopSensor;
  649. EnQueue(pBuffer);
  650. }
  651. else if (cmd == RemoNet::CC_CANMSG)
  652. {
  653. _source = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  654. _curTick = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  655. RemoNet::CCCanMsg Req;
  656. Req.ParseFromArray(data, length);
  657. cannet_frame* frames = (cannet_frame*)alloca(sizeof(cannet_frame) * Req.frams_size());
  658. for (int32_t i = 0; i < Req.frams_size(); i++)
  659. {
  660. auto& frame = Req.frams(i);
  661. frames[i].canid = frame.canid();
  662. frames[i].dlc = frame.dlc();
  663. memcpy(frames[i].data, frame.data().data(), frame.dlc());
  664. }
  665. //_robot->Get()->OnMessage(frames, Req.frams_size());
  666. if(_Version)//AD10
  667. _CanBusVehicle->Get()->OnMessage(frames, Req.frams_size());
  668. else
  669. _PcanBusVehicle->Get()->OnMessage(frames, Req.frams_size());
  670. }
  671. }
  672. void CMessageQueue::StopCar()
  673. {
  674. std::cout<<"Stop Car"<<std::endl;
  675. if(_Version)//AD10
  676. _CanBusVehicle->Get()->Emergency();
  677. else
  678. _PcanBusVehicle->Get()->Emergency();
  679. }
  680. void CMessageQueue::OnNotifyMessage()
  681. {
  682. RemoNet::TestTextReq Req;
  683. Req.set_text("ewqewqewqe");
  684. CIOBuffer Buffer;
  685. MessageHead Head;
  686. Head.Command = RemoNet::CC_Text;
  687. Head.Length = Req.ByteSizeLong();
  688. Head.Serialize(Buffer.Buffer);
  689. auto ptr = Buffer.Buffer + MessageHead::Size();
  690. Req.SerializeToArray(ptr, Head.Length);
  691. Buffer.Length = Head.Length + MessageHead::Size();
  692. _peerArray[RenderPosition::FRONT]->SendData(Buffer);
  693. }
  694. void CMessageQueue::OnNotifyPing(int64_t value)
  695. {
  696. RemoNet::CCPing Rep;
  697. Rep.set_tick(value);
  698. CIOBuffer Buffer;
  699. MessageHead Head;
  700. Head.Command = RemoNet::CC_Ping;
  701. Head.Length = Rep.ByteSizeLong();
  702. Head.Serialize(Buffer.Buffer);
  703. auto ptr = Buffer.Buffer + MessageHead::Size();
  704. Rep.SerializeToArray(ptr, Head.Length);
  705. Buffer.Length = Head.Length + MessageHead::Size();
  706. //std::cout << "ping" << std::endl;
  707. if( _peerArray[RenderPosition::FRONT]!=nullptr && _peerArray[RenderPosition::FRONT]->bReadyChannel)
  708. _peerArray[RenderPosition::FRONT]->SendData(Buffer);
  709. }
  710. /*
  711. void CMessageQueue::StartCar()
  712. {
  713. _can->SetStartWrite(true);
  714. }
  715. */
  716. void CMessageQueue::CheckSignal()
  717. {
  718. if(!bStopedCar)
  719. {
  720. long long tick=std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  721. if(_curTick!=0&&tick-_curTick > 3)
  722. {
  723. StopCar();
  724. bStopedCar=true;
  725. std::cout<<"_curTick!=0&&tick-_curTick > 3" << std::endl;
  726. _curStopTick = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  727. btimeStopedCar = true;
  728. _curTick = 0;
  729. }
  730. }
  731. }
  732. void CMessageQueue::WriteIMUData(ImuData* data)
  733. {
  734. MessageHead Head;
  735. CIOBuffer Buffer;
  736. RemoNet::IMuMessage Req;
  737. Req.set_rx(data->ry);
  738. Req.set_ry(data->rx);
  739. // Req.set_rz(data->rz);
  740. Head.Command=RemoNet::CC_IMU;
  741. Head.Length=Req.ByteSizeLong();
  742. Head.Serialize(Buffer.Buffer);
  743. auto ptr = Buffer.Buffer + MessageHead::Size();
  744. Req.SerializeToArray(ptr, Head.Length);
  745. Buffer.Length = Head.Length + MessageHead::Size();
  746. if( _peerArray[ChannelType::CHANNEL_IMU]!=nullptr)
  747. _peerArray[ChannelType::CHANNEL_IMU]->SendData(Buffer);
  748. }
  749. void CMessageQueue::WritePacket(ChannelType type, CIOBuffer & pBuffer)
  750. {
  751. if( _peerArray[type]!=nullptr)
  752. _peerArray[type]->SendData(pBuffer);
  753. }
  754. void CMessageQueue::WriteRadarData(RadarData& data)
  755. {
  756. MessageHead Head;
  757. CIOBuffer Buffer;
  758. RemoNet::CCRadarMessage Req;
  759. Req.set_radar0(data.r0);
  760. Req.set_radar1(data.r1);
  761. Req.set_radar2(data.r2);
  762. Req.set_radar3(data.r3);
  763. Req.set_radar4(data.r4);
  764. Req.set_radar5(data.r5);
  765. Req.set_radar6(data.r6);
  766. Req.set_radar7(data.r7);
  767. //Head.Command=RemoNet::CC_IMU;
  768. Head.Command = RemoNet::CC_Radar;
  769. Head.Length=Req.ByteSizeLong();
  770. Head.Serialize(Buffer.Buffer);
  771. auto ptr = Buffer.Buffer + MessageHead::Size();
  772. Req.SerializeToArray(ptr, Head.Length);
  773. Buffer.Length = Head.Length + MessageHead::Size();
  774. if( _peerArray[ChannelType::CHANNEL_RADAR]!=nullptr)
  775. _peerArray[ChannelType::CHANNEL_RADAR]->SendData(Buffer);
  776. }
  777. void CMessageQueue::WriteRobotStatus(int32_t ,int32_t )
  778. {
  779. }
  780. #ifdef LIDAR_SENSOR
  781. void CMessageQueue::WriteLidarPoint(const PointCloudMsg<PointXYZI>& msg,ChannelType side)
  782. {
  783. RemoNet::LidarPoint pt;
  784. pt.set_is_left(side==ChannelType::CHANNEL_LEFT_LIDAR);
  785. pt.set_frame_id(msg.frame_id);
  786. pt.set_height(msg.height);
  787. pt.set_width(msg.width);
  788. pt.set_is_dense(msg.is_dense);
  789. pt.set_seq(msg.seq);
  790. pt.set_timestamp(msg.timestamp);
  791. for(int i=0;i<msg.point_cloud_ptr->size();i++)
  792. {
  793. pt.add_data((*msg.point_cloud_ptr)[i].x);
  794. pt.add_data((*msg.point_cloud_ptr)[i].y);
  795. pt.add_data((*msg.point_cloud_ptr)[i].z);
  796. pt.add_data((*msg.point_cloud_ptr)[i].intensity);
  797. }
  798. MessageHead Head;
  799. CIOBuffer Buffer;
  800. Head.Command=RemoNet::CC_LIDARDATA;
  801. Head.Length=pt.ByteSizeLong();
  802. Head.Serialize(Buffer.Buffer);
  803. auto ptr = Buffer.Buffer + MessageHead::Size();
  804. pt.SerializeToArray(ptr, Head.Length);
  805. Buffer.Length = Head.Length + MessageHead::Size();
  806. if( _peerArray[side]!=nullptr)
  807. _peerArray[side]->SendData(&Buffer);
  808. }
  809. #endif
  810. void CMessageQueue::SwitchCamera(bool front)
  811. {
  812. _peerArray[RenderPosition::FRONT]->SwitchCapture(front);
  813. _peerArray[RenderPosition::BACK]->SwitchCapture(front);
  814. }
  815. void CMessageQueue::SendZGJStatus(int status)
  816. {
  817. _Rtk->Get()->Send_ZGJ_status(status);
  818. }
  819. // void CMessageQueue::SendVehicleStatus(int16_t Direction,int16_t Hand_Throttle,int16_t Foot_Throttle,int16_t Brake)
  820. // {
  821. // //CDataMqttSensor::sendMessage()
  822. // if (CDataMqttSensor::_run)
  823. // {
  824. // time_t rawtime;
  825. // struct tm* info;
  826. // time(&rawtime);
  827. // info = localtime(&rawtime);
  828. // char secret[64] = { 0 };
  829. // sprintf((char*)secret, "%d-%.2d-%.2d %.2d:%.2d:%.2d", info->tm_year + 1900, info->tm_mon + 1, info->tm_mday,info->tm_hour,info->tm_min,info->tm_sec);
  830. // printf("%s\r\n",secret);
  831. // char WriteLocalDat[128];
  832. // memset(WriteLocalDat,0,128);
  833. // sprintf(WriteLocalDat, "%d-%.2d-%.2d %.2d:%.2d:%.2d 方向值:%d 手油门值:%d 脚油门值:%d 刹车值:%d\n", info->tm_year + 1900, info->tm_mon + 1, info->tm_mday,info->tm_hour,info->tm_min,info->tm_sec,
  834. // Direction,Hand_Throttle,Foot_Throttle,Brake);
  835. // fwrite(WriteLocalDat, strlen(WriteLocalDat) , 1, File_fd);
  836. // Json::Value root;
  837. // Json::Value Source;
  838. // Json::FastWriter writer;
  839. // std::string SendTime;
  840. // Source["1"] = SendTime.assign(secret,strlen(secret));//发送时间
  841. // Source["2"] = Direction;//方向值
  842. // Source["3"] = Hand_Throttle;//手油门值
  843. // Source["4"] = Foot_Throttle;//脚油门值
  844. // Source["5"] = Brake;//刹车值
  845. // Json::StyledWriter sw;
  846. // //std::cout << sw.write(Source) << std::endl << std::endl;
  847. // if (!DataMqtt_SendDate)
  848. // {
  849. // DataMqtt_curTick = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  850. // DataMqtt_SendDate = true;
  851. // }
  852. // else
  853. // {
  854. // long long tick = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  855. // if (DataMqtt_curTick != 0 && tick - DataMqtt_curTick > 10000)
  856. // {
  857. // if (CDataMqttSensor::_run && (CDataMqttSensor::m_mqttClient != NULL))
  858. // CDataMqttSensor::sendMessage((char*)sw.write(Source).c_str(), 0, (char*)"bg/log");
  859. // DataMqtt_SendDate = false;
  860. // }
  861. // }
  862. // }
  863. // }
  864. void CMessageQueue::SendVehicleStatus(vehicle_status can_status)
  865. {
  866. //CDataMqttSensor::sendMessage()
  867. if (CDataMqttSensor::_run)
  868. {
  869. time_t rawtime;
  870. struct tm* info;
  871. time(&rawtime);
  872. info = localtime(&rawtime);
  873. char secret[64] = { 0 };
  874. sprintf((char*)secret, "%d-%.2d-%.2d %.2d:%.2d:%.2d", info->tm_year + 1900, info->tm_mon + 1, info->tm_mday,info->tm_hour,info->tm_min,info->tm_sec);
  875. printf("%s\r\n",secret);
  876. // char WriteLocalDat[128];
  877. // memset(WriteLocalDat,0,128);
  878. // sprintf(WriteLocalDat, "%d-%.2d-%.2d %.2d:%.2d:%.2d 方向值:%d 手油门值:%d 脚油门值:%d 刹车值:%d\n", info->tm_year + 1900, info->tm_mon + 1, info->tm_mday,info->tm_hour,info->tm_min,info->tm_sec,
  879. // Direction,Hand_Throttle,Foot_Throttle,Brake);
  880. // fwrite(WriteLocalDat, strlen(WriteLocalDat) , 1, File_fd);
  881. Json::Value root;
  882. Json::Value Source;
  883. Json::FastWriter writer;
  884. std::string SendTime;
  885. // Source["1"] = SendTime.assign(secret,strlen(secret));//发送时间
  886. // Source["2"] = Direction;//方向值
  887. // Source["3"] = Hand_Throttle;//手油门值
  888. // Source["4"] = Foot_Throttle;//脚油门值
  889. // Source["5"] = Brake;//刹车值
  890. Source["1"] = can_status.fire_status;//启动/熄火
  891. Source["2"] = can_status.remote_forward;//远程控制前进
  892. Source["3"] = can_status.remote_backward;//远程控制后退
  893. Source["4"] = can_status.Brake;//刹车
  894. Source["5"] = can_status.Park;//驻车
  895. Source["6"] = can_status.Outrigger_status;//支腿
  896. Source["7"] = can_status.Turn_angle;//转弯角度
  897. Source["8"] = can_status.Left_turn;//左旋 左手柄 左信号
  898. Source["9"] = can_status.Right_turn;//右旋 左手柄 右信号
  899. Source["10"] = can_status.Raised;//小臂抬起 左手柄 向后
  900. Source["11"] = can_status.Decrease;//小臂下降 左手柄 向前
  901. Source["12"] = can_status.Stretch;//大臂伸起 右手柄 向后
  902. Source["13"] = can_status.Bulls_decline;//大臂下降 右手柄 向前
  903. Source["14"] = can_status.Material;//吸盘吸料
  904. Source["15"] = can_status.Discharge;//吸盘放料
  905. Source["16"] = can_status.zhua;//抓斗抓料
  906. Source["17"] = can_status.fang;//抓斗放料
  907. Source["18"] = can_status.Shell_rotation_left;//贝壳斗旋转 shun
  908. Source["19"] = can_status.Shell_rotation_right;//贝壳斗旋转 ni
  909. Source["19"] = can_status.Cabin;//驾驶室升
  910. Source["20"] = can_status.Cab;//驾驶室降
  911. Source["21"] = can_status.Working_signal;//作业灯开/关
  912. Source["22"] = can_status.Turn_left_signal;//左转向 开/关
  913. Source["23"] = can_status.Turn_right_signal;//右转向 开/关
  914. Source["24"] = can_status.Warning_Light;//警示灯 开/关 ?
  915. Source["25"] = can_status.Trumpet;//喇叭
  916. Source["26"] = can_status.Security_lock;//安全锁 开/关
  917. Source["27"] = can_status.Urget_stop;//急停
  918. Source["28"] = can_status.Vehicle_id;//车辆id
  919. Source["29"] = SendTime.assign(secret,strlen(secret));;//发送时间
  920. Json::StyledWriter sw;
  921. //std::cout << sw.write(Source) << std::endl << std::endl;
  922. if (!DataMqtt_SendDate)
  923. {
  924. DataMqtt_curTick = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  925. DataMqtt_SendDate = true;
  926. }
  927. else
  928. {
  929. long long tick = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  930. if (DataMqtt_curTick != 0 && tick - DataMqtt_curTick > 10000)
  931. {
  932. if (CDataMqttSensor::_run && (CDataMqttSensor::m_mqttClient != NULL))
  933. CDataMqttSensor::sendMessage((char*)sw.write(Source).c_str(), 0, (char*)"bg/log");
  934. DataMqtt_SendDate = false;
  935. }
  936. }
  937. }
  938. }