#include "pch.h"
#include "api/video/video_frame.h"
// NOTE(review): seven angle-bracket includes lost their header names in the
// copy of this file that was reviewed. The standard headers listed here are
// the ones the visible code uses; restore any other system/library headers
// from version control.
#include <array>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>
#include "peer_connection.h"
#include "../common/comm.h"
#include "api.h"
#include "remote.pb.h"
#include "../common/iobuffer.h"
#include "remote_window.h"
#include "message_queue.h"
#include "api/video/video_sink_interface.h"
#include "remote_control.h"
#include "socket_remote.h"

// Builds an empty queue bound to the owning controller `c`.
CMessageQueue::CMessageQueue(CRemoteCtrl* c)
    : _head(nullptr), _tail(nullptr), _ctrl(c)
{
}

CMessageQueue::~CMessageQueue()
{
}

// Creates one window object per render position and starts the worker thread
// that executes Run().
//
// NOTE(review): the template arguments of `std::array` (parameter type) and of
// `std::make_unique` (window type) were lost in the reviewed copy and cannot
// be derived from this file; both are reproduced as found and must be restored
// from message_queue.h / remote_window.h.
void CMessageQueue::Start(EgoType type, std::array& ar)
{
    for (int i = 0; i < RenderPosition::ALL; i++)
    {
        auto window = std::make_unique(type, this, static_cast<RenderPosition>(i), ar[i]);
        _WindowArray.push_back(std::move(window));
    }

    // Raise the run flag before the thread exists, so that a Stop() issued
    // before the worker is scheduled cannot be overwritten by the worker.
    _run = true;
    _thread = std::thread(&CMessageQueue::Run, this);
}

// Asks the worker to finish and waits for it. Safe to call when the worker was
// never started or has already been joined.
void CMessageQueue::Stop()
{
    {
        // Clearing the flag under the queue lock means the worker cannot miss
        // the notification between testing the flag and going to sleep.
        std::unique_lock lck(_lock);
        _run = false;
    }
    _cv.notify_all();

    if (_thread.joinable())
    {
        _thread.join();
    }
}

// Records the first connected peer, hands it to every window and asks the
// FRONT window to request video. Ignored while a peer is already set.
void CMessageQueue::OnConnect(int32_t peer)
{
    if (_peer != -1)
    {
        return;
    }

    _peer = peer;
    mrsWebrtcCreateFactory(true, false);
    for (int i = 0; i < RenderPosition::ALL; i++)
    {
        _WindowArray[i]->SetPeer(peer);
    }
    _WindowArray[RenderPosition::FRONT]->OnAskVideoReq();
}

// Appends `pBuffer` to the intrusive singly linked queue. The worker is woken
// only when the queue goes from empty to non-empty.
void CMessageQueue::EnQueue(CIOBuffer* pBuffer)
{
    std::unique_lock lck(_lock);

    pBuffer->NextBuf = nullptr;
    if (_head == nullptr)
    {
        _head = _tail = pBuffer;
        _cv.notify_one();
    }
    else
    {
        _tail->NextBuf = pBuffer;
        _tail = pBuffer;
    }
}

// Worker loop: waits for queued buffers, drains them under the lock, then
// processes and releases each one outside the lock. While the queue is empty,
// every 500 ms timeout calls OnIdle() if keep-alive is enabled.
//
// NOTE(review): `_run` and `_alive` are written from other threads; confirm
// they are declared as std::atomic<bool> in the header.
void CMessageQueue::Run()
{
    std::vector<CIOBuffer*> pending;

    while (_run)
    {
        pending.clear();
        {
            std::unique_lock lck(_lock);

            // `_run` is part of the condition so that Stop() ends an idle
            // wait; without it the loop would keep sleeping on timeouts.
            while (_run && _head == nullptr &&
                   _cv.wait_for(lck, std::chrono::milliseconds(500)) == std::cv_status::timeout)
            {
                if (_alive)
                {
                    OnIdle();
                }
            }

            // Detach everything that is queued so handlers run unlocked.
            while (_head != nullptr)
            {
                CIOBuffer* node = _head;
                _head = _head->NextBuf;
                pending.push_back(node);
            }
        }

        for (CIOBuffer* node : pending)
        {
            Process(node);
            node->Release(__FILE__, __LINE__);
        }
    }
}

// Enables or disables the OnIdle() call made from the worker's idle timeout.
void CMessageQueue::KeepAlive(bool bAlive)
{
    _alive = bAlive;
}

// Returns the remote client held by the owning controller.
SocketRemote* CMessageQueue::GetRemoteClient()
{
    return _ctrl->GetRemoteClient();
}

// Decodes the Message stored in `pBuffer` and forwards it to the window
// selected by `message->index`. Unknown commands and indices outside
// `_WindowArray` are ignored.
void CMessageQueue::Process(CIOBuffer* pBuffer)
{
    Message* message = reinterpret_cast<Message*>(pBuffer->Buffer);

    // Guard the lookup: an unchecked index would read past the window list.
    if (message->index < 0 ||
        static_cast<std::size_t>(message->index) >= _WindowArray.size())
    {
        return;
    }
    auto& window = _WindowArray[message->index];

    switch (message->cmd)
    {
    case WM_NOTIFY_REQ:
        if (message->index == 0)
        {
            mrsWebrtcCreateFactory(false, true);
        }
        window->OnNotifyReq(static_cast<int32_t>(message->param_l));
        break;
    case WM_NOTIFY_REP:
        window->OnNotifyRep(static_cast<int32_t>(message->param_l));
        break;
    case WM_NOTIFY_ANSWER:
        // For these three commands param_l carries a CIOBuffer pointer.
        window->OnNotifyAnswer(reinterpret_cast<CIOBuffer*>(message->param_l));
        break;
    case WM_NOTIFY_CANDIDATE:
        window->OnNotifyCandidate(reinterpret_cast<CIOBuffer*>(message->param_l));
        break;
    case WM_NOTIFY_OFFER:
        window->OnNotifyOffer(reinterpret_cast<CIOBuffer*>(message->param_l));
        break;
    case WM_ASK_VIDEOREQ:
        window->OnAskVideoReq();
        break;
    default:
        break;
    }
}

// Allocates a buffer, writes a Message {cmd, index, l, r} into it and queues
// it for the worker thread.
void CMessageQueue::PostMessage(int32_t cmd, int32_t index, int64_t l , int64_t r )
{
    CIOBuffer* pBuffer = CIOBuffer::Alloc(__FILE__, __LINE__);
    Message* message = reinterpret_cast<Message*>(pBuffer->Buffer);
    message->cmd = cmd;
    message->index = index;
    message->param_l = l;
    message->param_r = r;
    EnQueue(pBuffer);
}

// Forwards a video frame to the window at render position `pos`.
void CMessageQueue::OverlayVideo(RenderPosition pos, const webrtc::VideoFrame& frame)
{
    _WindowArray[pos]->OverlayVideo(frame);
}

// Idle hook of the worker loop: writes a keep-alive through the remote client.
void CMessageQueue::OnIdle()
{
    _ctrl->GetRemoteClient()->WriteKeepAlive();
}

// Clears the current peer and notifies every window that the peer has left.
void CMessageQueue::NotifyLeave()
{
    _peer = -1;
    for (int32_t i = RenderPosition::FRONT; i < RenderPosition::ALL; i++)
    {
        _WindowArray[i]->OnNotifyLeave();
    }
}

// For index 0, stores `peer` and hands it to every window; in all cases queues
// a WM_NOTIFY_REQ for window `index`.
//
// NOTE(review): the posted param_l is `index`, so OnNotifyReq() receives the
// window index rather than `peer` - confirm this is intended.
void CMessageQueue::NotifyReq(int32_t peer, int32_t index)
{
    if (index == 0)
    {
        _peer = peer;
        for (int32_t i = RenderPosition::FRONT; i < RenderPosition::ALL; i++)
        {
            _WindowArray[i]->SetPeer(peer);
        }
    }
    PostMessage(WM_NOTIFY_REQ, index, index);
}