#include #include #include "../common/comm.h" #include "api.h" #include "message_queue.h" #include "../common/iobuffer.h" #include "PeerConnection.h" #include "VideoRenderer.h" #include #include "Protocol.pb.h" #include #include CMessageQueue::CMessageQueue():Head(nullptr),Tail(nullptr) { _peerId=-1; bDataChannelCreated=false; } CMessageQueue::~CMessageQueue() { } void CMessageQueue::Create() { Json::Value root; Json::Reader jsonReader; std::ifstream ifile("./config.json"); std::string serverip; if(jsonReader.parse(ifile,root)) { std::cout<<"enter config json"<(this); _client->Start(serverip.c_str()); if(_egoType==EgoType::Car) { _can=std::make_unique(this); _can->Start(_canip,_canport,_hostport); _lidar=std::make_unique(this); _lidar->Start(_lidarport); } std::this_thread::yield(); // OnNotifyReq(0); } void CMessageQueue::WriteCanMessage(std::unordered_map& node,bool islidar) { if(!bDataChannelCreated) return; // std::lock_guard l(_canLock); RemoNet::CCCanMesage Req; Req.set_islidar(islidar); for(auto& p:node) { int32_t lidar=p.second.canid; auto m=Req.add_message(); m->set_head(p.second.dlc); m->set_canid(lidar); m->set_data(p.second.data,8); } MessageHead Head; CIOBuffer pBuffer; Head.Command = RemoNet::CC_CAN; Head.Length = Req.ByteSizeLong(); Head.Serialize(pBuffer.Buffer); auto ptr = pBuffer.Buffer + MessageHead::Size(); Req.SerializeToArray(ptr, Head.Length); pBuffer.Length = MessageHead::Size() + Head.Length; _peerArray[0]->SendData(&pBuffer); } bool CMessageQueue::IsCarId(int32_t value) { return std::find(_carArray.begin(),_carArray.end(),value)!=_carArray.end(); } void CMessageQueue::EnQueue(CIOBuffer* pBuffer) { bool bNullBuffer=false; std::unique_lock lck(_lock); if(Head==nullptr) { Head=Tail=pBuffer; bNullBuffer=true; } else{ Tail->NextBuf=pBuffer; Tail=Tail->NextBuf; } pBuffer->NextBuf=nullptr; if(bNullBuffer) { _cv.notify_one(); } } void CMessageQueue::Process() { CIOBuffer * ptr=nullptr; { std::unique_lock lck(_lock); /* while(Head==nullptr) { _cv.wait(lck); } */ while(Head==nullptr&&_cv.wait_for(lck,std::chrono::seconds(1))==std::cv_status::timeout) { // CheckSignal(); // std::cout<<"."; // std::cout.flush(); } } while(Head!=nullptr) { ptr=Head; Head=Head->NextBuf; if(ptr!=nullptr) { Message* message=reinterpret_cast(ptr->Buffer); switch (message->cmd) { case MessageType::ReqVideo: OnNotifyReq((int32_t)message->param_l); break; case MessageType::RepVideo: OnNotifyRep((int32_t)message->param_l); break; case MessageType::Connected: OnNotifyConnected((bool)message->param_l); break; case MessageType::Leave: OnNotifyLeave(); break; case MessageType::AsyncMessage: OnNotifyMessage(); break; } ptr->Release(); } } } void CMessageQueue::OnNotifyConnected(bool bRet) { if(bRet) { CameraInfo info[4];//=(CameraInfo *)alloca(sizeof(CameraInfo)*_cameraArray.size()); for(auto i=0;i<_cameraArray.size();i++) { info[i].index= _cameraArray[i].index; info[i].label=_cameraArray[i].label; info[i].solution=_cameraArray[i].solution; } _client->WriteAddRobot(_serial,_name,static_cast(_egoType),_cameraArray.size(), info); _updatethread.start(_client.get()); } else { if(_peerId!=-1) { for (size_t i = 0; i < _peerArray.size(); i++) { if(_peerArray[i]!=nullptr) { _peerArray[i]->Close(); _peerArray[i].reset(); } /* code */ } for (size_t i = 0; i < _windowArray.size(); i++) { /* code */ if(_windowArray[i]!=nullptr) _windowArray[i].reset(); } _peerId=-1; StopCar(); } _updatethread.stop(); } } void CMessageQueue::OnNotifyReq(int32_t index) { // std::cout<<__FUNCTION__<<","<<__LINE__<();// .reset(new VideoRenderer()); _peerArray[temp]=std::make_unique(_client.get()); //InitPeerConnection(index); _client->WriteVideoRep(EgoType::User,_peerId, RemoNet::VideoDesc::OK, index); } void CMessageQueue::OnNotifyRep(int32_t index) { // std::cout<<__FUNCTION__<<","<<__LINE__<(); _peerArray[temp]=std::make_unique(_client.get()); InitPeerConnection(_peerId,index); _peerArray[temp]->CreateOffer(); } void CMessageQueue::InitPeerConnection(int32_t peer,int32_t index) { bool NeedData=index==CameraPosition::CAR_FRONT; auto temp=index-_indexOffset; _peerArray[temp]->Initialize(peer,index,NeedData); if(NeedData) { _peerArray[temp]->AddDataChannel(true, false); } // window.reset(new VideoRenderer()); // window->SetRenderWindow(GetDlgItem(IDC_REMOTE), 1, 1); //_peerArray[index]->AddLocalArgb32VideoFrameReady(&VideoRenderer::FrameCallback, _windowArray[index].get()); // if(_cameraArray[index].type==CaptureType::RTSP) // { int32_t width=_cameraArray[temp].solution==DisplayResolution::HD1080?1920:1280; int32_t height=_cameraArray[temp].solution==DisplayResolution::HD1080?1080:720; int32_t rotation=0; if(index==CameraPosition::CAR_LEFT) { rotation=270; int32_t temp=width;width=height;height=temp;} else if(index==CameraPosition::CAR_RIGHT) { rotation=90;int32_t temp=width;width=height;height=temp;} _peerArray[temp]->AddLocalVideoTrack(rotation,_cameraArray[temp].label,_cameraArray[temp].uri,width,height); // } // else // { // _peerArray[index]->AddLocalVideoTrack(_cameraArray[index].type, _cameraArray[index].solution, 30); // } _peerArray[temp]->AddLocalAudioTrack(); _windowArray[temp]->StartRender(true); } void CMessageQueue::OnAdd(bool bRet) { } void CMessageQueue::OnConnected(bool bRet) { CIOBuffer * pBuffer=CIOBuffer::Alloc(); Message* message=reinterpret_cast(pBuffer->Buffer); message->cmd=MessageType::Connected; message->param_l=bRet; EnQueue(pBuffer); } void CMessageQueue::OnVideoLeave(int32_t peer,EgoType type) { CIOBuffer * pBuffer=CIOBuffer::Alloc(); Message* message=reinterpret_cast(pBuffer->Buffer); message->cmd=MessageType::Leave; EnQueue(pBuffer); } #ifdef WIN32 void CMessageQueue::OnVideoRep(int32_t index,RemoNet::VideoDesc desc) { if (desc == RemoNet::VideoDesc::OK) { assert(_peerId!=-1); CIOBuffer * pBuffer=CIOBuffer::Alloc(); Message* message=reinterpret_cast(pBuffer->Buffer); message->cmd=MessageType::ReqVideo; message->param_l=index; EnQueue(pBuffer); } } #else void CMessageQueue::OnVideoReq(int32_t video,int32_t peer) { // std::cout<<__FUNCTION__<<","<<__LINE__<(pBuffer->Buffer); message->cmd=MessageType::ReqVideo; message->param_l=video; EnQueue(pBuffer); } #endif void CMessageQueue::OnNotifyLeave() { bDataChannelCreated=false; std::this_thread::sleep_for(std::chrono::milliseconds(100)); for (size_t i = 0; i < _peerArray.size(); i++) { if(_peerArray[i]!=nullptr) { _peerArray[i]->Close(); _peerArray[i].reset(); } /* code */ } for (size_t i = 0; i < _windowArray.size(); i++) { /* code */ _windowArray[i].reset(); } _peerId=-1; StopCar(); } void CMessageQueue::OnVideoOffer(int32_t video,const char* type, const char* sdp) { // std::cout<<__FUNCTION__<<","<<__LINE__<SetRemoteDescription(type,sdp); _peerArray[temp]->CreateAnswer(); } void CMessageQueue::OnVideoAnswer(int32_t video, const char* type, const char* sdp) { // std::cout<<__FUNCTION__<<","<<__LINE__<SetRemoteDescription(type,sdp); } void CMessageQueue::OnVideoCandidate(int32_t video,const char* candidate, int32_t sdp_mline_index, const char* sdp_mid) { auto temp=video-_indexOffset; _peerArray[temp]->AddIceCandidate(candidate,sdp_mline_index,sdp_mid); } void CMessageQueue::OnMessageFrameNotify(const void* data, const int32_t size) { if(size(pBuffer->Buffer); message->cmd=MessageType::AsyncMessage; EnQueue(pBuffer); } else if(Head.Command==RemoNet::CC_CAN) { _curTick=std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); bStopedCar=false; RemoNet::CCCanMesage Req; Req.ParseFromArray(ptr,size); int32_t count=Req.message_size(); for(int32_t i=0;icanid=p.canid(); memcpy(msg->data,p.data().data(),8); // printf("%x ",ntohl(msg->canid)); msg->dlc=p.head(); Buffer.Length+=sizeof(cannet_frame); // msg++; _can->Write(&Buffer); } //printf("\n"); } else if(Head.Command==RemoNet::CC_ASKDATACHANNEL) { bDataChannelCreated=true; bStopedCar=false; _curTick=std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); StartCar(); } } void CMessageQueue::StopCar() { for(auto id:_emergencyArray) { CIOBuffer Buffer; cannet_frame* msg=(cannet_frame *)Buffer.Buffer; msg->canid=id; msg->dlc=8; memset(msg->data,0,sizeof(msg->data)); Buffer.Length=sizeof(cannet_frame); _can->Write(&Buffer); } _can->SetStartWrite(false); } void CMessageQueue::OnNotifyMessage() { RemoNet::TestTextReq Req; Req.set_text("ewqewqewqe"); CIOBuffer Buffer; MessageHead Head; Head.Command = RemoNet::CC_Text; Head.Length = Req.ByteSize(); Head.Serialize(Buffer.Buffer); auto ptr = Buffer.Buffer + MessageHead::Size(); Req.SerializeToArray(ptr, Head.Length); Buffer.Length = Head.Length + MessageHead::Size(); _peerArray[0]->SendData(&Buffer); } void CMessageQueue::StartCar() { _can->SetStartWrite(true); } void CMessageQueue::CheckSignal() { if(!bDataChannelCreated) return; if(!bStopedCar) { long long tick=std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); if(tick-_curTick>1000) { StopCar(); bStopedCar=true; } } }