message_queue.cpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548
  1. #include <atomic>
  2. #include <string>
  3. #include "../common/comm.h"
  4. #include "api.h"
  5. #include "message_queue.h"
  6. #include "../common/iobuffer.h"
  7. #include "PeerConnection.h"
  8. #include "VideoRenderer.h"
  9. #include <jsoncpp/json/json.h>
  10. #include "Protocol.pb.h"
  11. #include <fstream>
  12. #include <cstring>
  13. CMessageQueue::CMessageQueue():Head(nullptr),Tail(nullptr)
  14. {
  15. _peerId=-1;
  16. bDataChannelCreated=false;
  17. }
  18. CMessageQueue::~CMessageQueue()
  19. {
  20. }
  21. void CMessageQueue::Create()
  22. {
  23. Json::Value root;
  24. Json::Reader jsonReader;
  25. std::ifstream ifile("./config.json");
  26. std::string serverip;
  27. if(jsonReader.parse(ifile,root))
  28. {
  29. std::cout<<"enter config json"<<std::endl;
  30. _serial=root["serial"].asString();
  31. _egoType=root["type"].asString()=="cave"?EgoType::Cave:EgoType::Car;
  32. if(_egoType==EgoType::Car)
  33. {
  34. _canport=root["can_port"].asInt();
  35. _hostport=root["host_port"].asInt();
  36. _canip=root["can_ip"].asString();
  37. _lidarport=root["lidar_port"].asInt();
  38. }
  39. serverip=root["ip"].asString();
  40. _name=root["name"].asString();
  41. _indexOffset=_egoType==EgoType::Cave?0:2;
  42. const Json::Value arrayObj=root["camerainfo"];
  43. int32_t count=arrayObj.size();
  44. for(int32_t i=0;i<count;i++)
  45. {
  46. LocalCameraInfo info;
  47. const Json::Value& value=arrayObj[i];
  48. info.index=value["index"].asInt();
  49. info.label=value["label"].asString();
  50. info.uri=value["uri"].asString();
  51. // if(_rtspCamera)
  52. // info.type=CaptureType::RTSP;
  53. /*
  54. else
  55. {
  56. if(info.uri.find("zed")!=info.uri.npos)
  57. {
  58. info.type=CaptureType::Zed2;
  59. }
  60. else if(info.uri.find("realsense")!=std::string::npos)
  61. {
  62. info.type=CaptureType::RealSense;
  63. }
  64. }
  65. */
  66. // info.master=value["master"].asBool();
  67. std::string hd=value["res"].asString();
  68. if(hd=="hd1080") info.solution=DisplayResolution::HD1080;
  69. else if(hd=="hd720") info.solution=DisplayResolution::HD720;
  70. _cameraArray.push_back(info);
  71. }
  72. for(int i=0;i<_cameraArray.size();i++)
  73. {
  74. _peerArray.push_back({nullptr});
  75. _windowArray.push_back({nullptr});
  76. }
  77. const Json::Value lidars=root["lidar"];
  78. count=lidars.size();
  79. for(int32_t i=0;i<count;i++)
  80. {
  81. int32_t value=lidars[i].asInt();
  82. _lidarArray.push_back(value);
  83. }
  84. const Json::Value emergency=root["emergency"];
  85. count=emergency.size();
  86. for(int32_t i=0;i<count;i++)
  87. {
  88. std::string str=emergency[i].asString();
  89. int32_t value=htonl(strtol(str.c_str(),nullptr,16));
  90. _emergencyArray.push_back(value);
  91. }
  92. const Json::Value carId=root["car_id"];
  93. count=carId.size();
  94. for(int32_t i=0;i<count;i++)
  95. {
  96. std::string str = carId[i].asString();
  97. int32_t value = htonl(strtol(str.c_str(), nullptr, 16));
  98. // int32_t value=carId[i].asInt();
  99. _carArray.push_back(value);
  100. }
  101. // _peerArray.reserve(_cameraArray.size());
  102. //_windowArray.reserve(_cameraArray.size());
  103. }
  104. _client=std::make_unique<SocketClient>(this);
  105. _client->Start(serverip.c_str());
  106. if(_egoType==EgoType::Car)
  107. {
  108. _can=std::make_unique<SocketCan>(this);
  109. _can->Start(_canip,_canport,_hostport);
  110. _lidar=std::make_unique<SocketLidar>(this);
  111. _lidar->Start(_lidarport);
  112. }
  113. std::this_thread::yield();
  114. // OnNotifyReq(0);
  115. }
  116. void CMessageQueue::WriteCanMessage(std::unordered_map<int32_t, cannet_frame>& node,bool islidar)
  117. {
  118. if(!bDataChannelCreated) return;
  119. // std::lock_guard<std::mutex> l(_canLock);
  120. RemoNet::CCCanMesage Req;
  121. Req.set_islidar(islidar);
  122. for(auto& p:node)
  123. {
  124. int32_t lidar=p.second.canid;
  125. auto m=Req.add_message();
  126. m->set_head(p.second.dlc);
  127. m->set_canid(lidar);
  128. m->set_data(p.second.data,8);
  129. }
  130. MessageHead Head;
  131. CIOBuffer pBuffer;
  132. Head.Command = RemoNet::CC_CAN;
  133. Head.Length = Req.ByteSizeLong();
  134. Head.Serialize(pBuffer.Buffer);
  135. auto ptr = pBuffer.Buffer + MessageHead::Size();
  136. Req.SerializeToArray(ptr, Head.Length);
  137. pBuffer.Length = MessageHead::Size() + Head.Length;
  138. _peerArray[0]->SendData(&pBuffer);
  139. }
  140. bool CMessageQueue::IsCarId(int32_t value)
  141. {
  142. return std::find(_carArray.begin(),_carArray.end(),value)!=_carArray.end();
  143. }
  144. void CMessageQueue::EnQueue(CIOBuffer* pBuffer)
  145. {
  146. bool bNullBuffer=false;
  147. std::unique_lock <std::mutex> lck(_lock);
  148. if(Head==nullptr)
  149. {
  150. Head=Tail=pBuffer;
  151. bNullBuffer=true;
  152. }
  153. else{
  154. Tail->NextBuf=pBuffer;
  155. Tail=Tail->NextBuf;
  156. }
  157. pBuffer->NextBuf=nullptr;
  158. if(bNullBuffer)
  159. {
  160. _cv.notify_one();
  161. }
  162. }
  163. void CMessageQueue::Process()
  164. {
  165. CIOBuffer * ptr=nullptr;
  166. {
  167. std::unique_lock <std::mutex> lck(_lock);
  168. /*
  169. while(Head==nullptr)
  170. {
  171. _cv.wait(lck);
  172. }
  173. */
  174. while(Head==nullptr&&_cv.wait_for(lck,std::chrono::seconds(1))==std::cv_status::timeout)
  175. {
  176. // CheckSignal();
  177. // std::cout<<".";
  178. // std::cout.flush();
  179. }
  180. }
  181. while(Head!=nullptr)
  182. {
  183. ptr=Head;
  184. Head=Head->NextBuf;
  185. if(ptr!=nullptr)
  186. {
  187. Message* message=reinterpret_cast<Message *>(ptr->Buffer);
  188. switch (message->cmd)
  189. {
  190. case MessageType::ReqVideo:
  191. OnNotifyReq((int32_t)message->param_l);
  192. break;
  193. case MessageType::RepVideo:
  194. OnNotifyRep((int32_t)message->param_l);
  195. break;
  196. case MessageType::Connected:
  197. OnNotifyConnected((bool)message->param_l);
  198. break;
  199. case MessageType::Leave:
  200. OnNotifyLeave();
  201. break;
  202. case MessageType::AsyncMessage:
  203. OnNotifyMessage();
  204. break;
  205. }
  206. ptr->Release();
  207. }
  208. }
  209. }
  210. void CMessageQueue::OnNotifyConnected(bool bRet)
  211. {
  212. if(bRet)
  213. {
  214. CameraInfo info[4];//=(CameraInfo *)alloca(sizeof(CameraInfo)*_cameraArray.size());
  215. for(auto i=0;i<_cameraArray.size();i++)
  216. {
  217. info[i].index= _cameraArray[i].index;
  218. info[i].label=_cameraArray[i].label;
  219. info[i].solution=_cameraArray[i].solution;
  220. }
  221. _client->WriteAddRobot(_serial,_name,static_cast<int32_t>(_egoType),_cameraArray.size(), info);
  222. _updatethread.start(_client.get());
  223. }
  224. else
  225. {
  226. if(_peerId!=-1)
  227. {
  228. for (size_t i = 0; i < _peerArray.size(); i++)
  229. {
  230. if(_peerArray[i]!=nullptr)
  231. {
  232. _peerArray[i]->Close();
  233. _peerArray[i].reset();
  234. }
  235. /* code */
  236. }
  237. for (size_t i = 0; i < _windowArray.size(); i++)
  238. {
  239. /* code */
  240. if(_windowArray[i]!=nullptr)
  241. _windowArray[i].reset();
  242. }
  243. _peerId=-1;
  244. StopCar();
  245. }
  246. _updatethread.stop();
  247. }
  248. }
  249. void CMessageQueue::OnNotifyReq(int32_t index)
  250. {
  251. // std::cout<<__FUNCTION__<<","<<__LINE__<<std::endl;
  252. if(index-_indexOffset==0)
  253. mrsWebrtcCreateFactory(true);
  254. auto temp=index-_indexOffset;
  255. int32_t width = _cameraArray[temp].solution == DisplayResolution::HD1080 ? 1920 : 1280;
  256. int32_t height = _cameraArray[temp].solution == DisplayResolution::HD1080 ? 1080 : 720;
  257. // PeerConnectionWrapper* peer = nullptr;
  258. // VideoRenderer* window = nullptr;
  259. _windowArray[temp]=std::make_unique<VideoRenderer>();// .reset(new VideoRenderer());
  260. _peerArray[temp]=std::make_unique<PeerConnectionWrapper>(_client.get());
  261. //InitPeerConnection(index);
  262. _client->WriteVideoRep(EgoType::User,_peerId, RemoNet::VideoDesc::OK, index);
  263. }
  264. void CMessageQueue::OnNotifyRep(int32_t index)
  265. {
  266. // std::cout<<__FUNCTION__<<","<<__LINE__<<std::endl;
  267. auto temp=index-_indexOffset;
  268. int32_t width = _cameraArray[temp].solution == DisplayResolution::HD1080 ? 1920 : 1280;
  269. int32_t height = _cameraArray[temp].solution == DisplayResolution::HD1080 ? 1080 : 720;
  270. // index-=_indexOffset;
  271. // PeerConnectionWrapper* peer = nullptr;
  272. // VideoRenderer* window = nullptr;
  273. _windowArray[temp]=std::make_unique<VideoRenderer>();
  274. _peerArray[temp]=std::make_unique<PeerConnectionWrapper>(_client.get());
  275. InitPeerConnection(_peerId,index);
  276. _peerArray[temp]->CreateOffer();
  277. }
  278. void CMessageQueue::InitPeerConnection(int32_t peer,int32_t index)
  279. {
  280. bool NeedData=index==CameraPosition::CAR_FRONT;
  281. auto temp=index-_indexOffset;
  282. _peerArray[temp]->Initialize(peer,index,NeedData);
  283. if(NeedData)
  284. {
  285. _peerArray[temp]->AddDataChannel(true, false);
  286. }
  287. // window.reset(new VideoRenderer());
  288. // window->SetRenderWindow(GetDlgItem(IDC_REMOTE), 1, 1);
  289. //_peerArray[index]->AddLocalArgb32VideoFrameReady(&VideoRenderer::FrameCallback, _windowArray[index].get());
  290. // if(_cameraArray[index].type==CaptureType::RTSP)
  291. // {
  292. int32_t width=_cameraArray[temp].solution==DisplayResolution::HD1080?1920:1280;
  293. int32_t height=_cameraArray[temp].solution==DisplayResolution::HD1080?1080:720;
  294. int32_t rotation=0;
  295. if(index==CameraPosition::CAR_LEFT)
  296. { rotation=270; int32_t temp=width;width=height;height=temp;}
  297. else if(index==CameraPosition::CAR_RIGHT)
  298. { rotation=90;int32_t temp=width;width=height;height=temp;}
  299. _peerArray[temp]->AddLocalVideoTrack(rotation,_cameraArray[temp].label,_cameraArray[temp].uri,width,height);
  300. // }
  301. // else
  302. // {
  303. // _peerArray[index]->AddLocalVideoTrack(_cameraArray[index].type, _cameraArray[index].solution, 30);
  304. // }
  305. _peerArray[temp]->AddLocalAudioTrack();
  306. _windowArray[temp]->StartRender(true);
  307. }
  308. void CMessageQueue::OnAdd(bool bRet)
  309. {
  310. }
  311. void CMessageQueue::OnConnected(bool bRet)
  312. {
  313. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  314. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  315. message->cmd=MessageType::Connected;
  316. message->param_l=bRet;
  317. EnQueue(pBuffer);
  318. }
  319. void CMessageQueue::OnVideoLeave(int32_t peer,EgoType type)
  320. {
  321. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  322. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  323. message->cmd=MessageType::Leave;
  324. EnQueue(pBuffer);
  325. }
  326. #ifdef WIN32
  327. void CMessageQueue::OnVideoRep(int32_t index,RemoNet::VideoDesc desc)
  328. {
  329. if (desc == RemoNet::VideoDesc::OK)
  330. {
  331. assert(_peerId!=-1);
  332. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  333. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  334. message->cmd=MessageType::ReqVideo;
  335. message->param_l=index;
  336. EnQueue(pBuffer);
  337. }
  338. }
  339. #else
  340. void CMessageQueue::OnVideoReq(int32_t video,int32_t peer)
  341. {
  342. // std::cout<<__FUNCTION__<<","<<__LINE__<<std::endl;
  343. _peerId=peer;
  344. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  345. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  346. message->cmd=MessageType::ReqVideo;
  347. message->param_l=video;
  348. EnQueue(pBuffer);
  349. }
  350. #endif
  351. void CMessageQueue::OnNotifyLeave()
  352. {
  353. bDataChannelCreated=false;
  354. std::this_thread::sleep_for(std::chrono::milliseconds(100));
  355. for (size_t i = 0; i < _peerArray.size(); i++)
  356. {
  357. if(_peerArray[i]!=nullptr)
  358. {
  359. _peerArray[i]->Close();
  360. _peerArray[i].reset();
  361. }
  362. /* code */
  363. }
  364. for (size_t i = 0; i < _windowArray.size(); i++)
  365. {
  366. /* code */
  367. _windowArray[i].reset();
  368. }
  369. _peerId=-1;
  370. StopCar();
  371. }
  372. void CMessageQueue::OnVideoOffer(int32_t video,const char* type, const char* sdp)
  373. {
  374. // std::cout<<__FUNCTION__<<","<<__LINE__<<std::endl;
  375. InitPeerConnection(_peerId,video);
  376. auto temp=video-_indexOffset;
  377. _peerArray[temp]->SetRemoteDescription(type,sdp);
  378. _peerArray[temp]->CreateAnswer();
  379. }
  380. void CMessageQueue::OnVideoAnswer(int32_t video, const char* type, const char* sdp)
  381. {
  382. // std::cout<<__FUNCTION__<<","<<__LINE__<<std::endl;
  383. auto temp=video-_indexOffset;
  384. _peerArray[temp]->SetRemoteDescription(type,sdp);
  385. }
  386. void CMessageQueue::OnVideoCandidate(int32_t video,const char* candidate,
  387. int32_t sdp_mline_index,
  388. const char* sdp_mid)
  389. {
  390. auto temp=video-_indexOffset;
  391. _peerArray[temp]->AddIceCandidate(candidate,sdp_mline_index,sdp_mid);
  392. }
  393. void CMessageQueue::OnMessageFrameNotify(const void* data, const int32_t size)
  394. {
  395. if(size<MessageHead::Size()) return;
  396. MessageHead Head;
  397. int8_t* Data=(int8_t *)data;
  398. Head.Deserialize(Data);
  399. if(size<MessageHead::Size()+Head.Length) return;
  400. auto ptr=Data+MessageHead::Size();
  401. if(Head.Command==RemoNet::CC_Text)
  402. {
  403. RemoNet::TestTextReq Req;
  404. Req.ParseFromArray(ptr,Head.Length);
  405. std::cout<<Req.text()<<std::endl;
  406. CIOBuffer * pBuffer=CIOBuffer::Alloc();
  407. Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
  408. message->cmd=MessageType::AsyncMessage;
  409. EnQueue(pBuffer);
  410. }
  411. else
  412. if(Head.Command==RemoNet::CC_CAN)
  413. {
  414. _curTick=std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  415. bStopedCar=false;
  416. RemoNet::CCCanMesage Req;
  417. Req.ParseFromArray(ptr,size);
  418. int32_t count=Req.message_size();
  419. for(int32_t i=0;i<count;i++)
  420. {
  421. CIOBuffer Buffer;
  422. cannet_frame* msg=(cannet_frame *)Buffer.Buffer;
  423. auto& p=Req.message(i);
  424. msg->canid=p.canid();
  425. memcpy(msg->data,p.data().data(),8);
  426. // printf("%x ",ntohl(msg->canid));
  427. msg->dlc=p.head();
  428. Buffer.Length+=sizeof(cannet_frame);
  429. // msg++;
  430. _can->Write(&Buffer);
  431. }
  432. //printf("\n");
  433. }
  434. else if(Head.Command==RemoNet::CC_ASKDATACHANNEL)
  435. {
  436. bDataChannelCreated=true;
  437. bStopedCar=false;
  438. _curTick=std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  439. StartCar();
  440. }
  441. }
  442. void CMessageQueue::StopCar()
  443. {
  444. for(auto id:_emergencyArray)
  445. {
  446. CIOBuffer Buffer;
  447. cannet_frame* msg=(cannet_frame *)Buffer.Buffer;
  448. msg->canid=id;
  449. msg->dlc=8;
  450. memset(msg->data,0,sizeof(msg->data));
  451. Buffer.Length=sizeof(cannet_frame);
  452. _can->Write(&Buffer);
  453. }
  454. _can->SetStartWrite(false);
  455. }
  456. void CMessageQueue::OnNotifyMessage()
  457. {
  458. RemoNet::TestTextReq Req;
  459. Req.set_text("ewqewqewqe");
  460. CIOBuffer Buffer;
  461. MessageHead Head;
  462. Head.Command = RemoNet::CC_Text;
  463. Head.Length = Req.ByteSize();
  464. Head.Serialize(Buffer.Buffer);
  465. auto ptr = Buffer.Buffer + MessageHead::Size();
  466. Req.SerializeToArray(ptr, Head.Length);
  467. Buffer.Length = Head.Length + MessageHead::Size();
  468. _peerArray[0]->SendData(&Buffer);
  469. }
  470. void CMessageQueue::StartCar()
  471. {
  472. _can->SetStartWrite(true);
  473. }
  474. void CMessageQueue::CheckSignal()
  475. {
  476. if(!bDataChannelCreated) return;
  477. if(!bStopedCar)
  478. {
  479. long long tick=std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  480. if(tick-_curTick>1000)
  481. {
  482. StopCar();
  483. bStopedCar=true;
  484. }
  485. }
  486. }