123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- #include "pch.h"
- #include "api/video/video_frame.h"
- #include <atomic>
- #include <string>
- #include <array>
- #include <memory>
- #include "peer_connection.h"
- #include "../common/comm.h"
- #include "api.h"
- #include "remote.pb.h"
- #include "../common/iobuffer.h"
- #include "remote_window.h"
- #include "message_queue.h"
- #include <json/json.h>
- #include "api/video/video_sink_interface.h"
- #include <fstream>
- #include <cstring>
- #include "remote_control.h"
- #include "socket_remote.h"
- CMessageQueue::CMessageQueue(CRemoteCtrl* c):_head(nullptr),_tail(nullptr),_ctrl(c)
- {
-
- }
- CMessageQueue::~CMessageQueue()
- {
- }
- void CMessageQueue::Start(EgoType type, std::array<IRender*, RenderPosition::ALL>& ar)
- {
- for (int i = 0; i < RenderPosition::ALL; i++)
- {
- auto p = std::make_unique<CRemoteWindow>(type,this, (RenderPosition)i,ar[i]);
-
- _WindowArray.push_back(std::move(p));
- }
- _thread = std::thread(&CMessageQueue::Run, this);
- }
-
- void CMessageQueue::Stop()
- {
- _run = false;
- _thread.join();
- }
- void CMessageQueue::OnConnect(int32_t peer)
- {
- if (_peer != -1) return;
- _peer = peer;
- mrsWebrtcCreateFactory(true, false);
- for (int i = 0; i < RenderPosition::ALL; i++)
- {
- _WindowArray[i]->SetPeer(peer);
- }
- _WindowArray[RenderPosition::FRONT]->OnAskVideoReq();
-
- }
- void CMessageQueue::EnQueue(CIOBuffer* pBuffer)
- {
- bool bNullBuffer=false;
- std::unique_lock <std::mutex> lck(_lock);
- if(_head ==nullptr)
- {
- _head =_tail=pBuffer;
- bNullBuffer=true;
- }
- else{
- _tail->NextBuf=pBuffer;
- _tail = _tail->NextBuf;
- }
- pBuffer->NextBuf=nullptr;
- if(bNullBuffer)
- {
- _cv.notify_one();
- }
- }
- void CMessageQueue::Run()
- {
- _run = true;
- CIOBuffer* ptr = nullptr;
- std::vector<CIOBuffer* > array;
- while(_run)
- {
- array.clear();
- {
- std::unique_lock <std::mutex> lck(_lock);
- while (_head == nullptr && _cv.wait_for(lck, std::chrono::milliseconds(500)) == std::cv_status::timeout)
- {
- if(_alive)
- OnIdle();
- // CheckSignal();
- // std::cout<<".";
- // std::cout.flush();
- }
- while (_head != nullptr)
- {
- ptr = _head;
- _head = _head->NextBuf;
- if (ptr != nullptr)
- {
- array.push_back(ptr);
- // Process(ptr);
- //
- }
- }
- }
- for (auto ptr:array )
- {
- Process(ptr);
- ptr->Release(__FILE__, __LINE__);
- }
-
- }
-
- }
-
- void CMessageQueue::KeepAlive(bool bAlive)
- {
- _alive = bAlive;
- }
- SocketRemote* CMessageQueue::GetRemoteClient()
- {
- return _ctrl->GetRemoteClient();
- }
- void CMessageQueue::Process(CIOBuffer* pBuffer)
- {
- Message* message = reinterpret_cast<Message*>(pBuffer->Buffer);
- switch (message->cmd)
- {
- case WM_NOTIFY_REQ:
-
- if (message->index == 0)
- {
- mrsWebrtcCreateFactory(false, true);
- }
- _WindowArray[message->index]->OnNotifyReq((int32_t)message->param_l);
- break;
- case WM_NOTIFY_REP:
- _WindowArray[message->index]->OnNotifyRep((int32_t)message->param_l);
- break;;
- case WM_NOTIFY_ANSWER:
- _WindowArray[message->index]->OnNotifyAnswer((CIOBuffer*)message->param_l);
- break;
- case WM_NOTIFY_CANDIDATE:
- _WindowArray[message->index]->OnNotifyCandidate((CIOBuffer*)message->param_l);
- break;
- case WM_NOTIFY_OFFER:
- _WindowArray[message->index]->OnNotifyOffer((CIOBuffer*)message->param_l);
- break;
- case WM_ASK_VIDEOREQ:
- _WindowArray[message->index]->OnAskVideoReq();
- break;
- default:
- break;
- }
-
- }
- void CMessageQueue::PostMessage(int32_t cmd, int32_t index, int64_t l , int64_t r )
- {
- CIOBuffer* pBuffer = CIOBuffer::Alloc(__FILE__, __LINE__);
- Message* message = reinterpret_cast<Message*>(pBuffer->Buffer);
- message->cmd = cmd;
- message->index = index;
- message->param_l = l;
- message->param_r = r;
- EnQueue(pBuffer);
- }
-
- void CMessageQueue::OverlayVideo(RenderPosition pos, const webrtc::VideoFrame& frame)
- {
- _WindowArray[pos]->OverlayVideo(frame);
- }
- void CMessageQueue::OnIdle()
- {
- _ctrl->GetRemoteClient()->WriteKeepAlive();
- }
- void CMessageQueue::NotifyLeave()
- {
- _peer = -1;
- for (int32_t i = RenderPosition::FRONT; i < RenderPosition::ALL; i++)
- {
- _WindowArray[i]->OnNotifyLeave();
- }
- }
- void CMessageQueue::NotifyReq(int32_t peer, int32_t index)
- {
- if (index == 0)
- {
- _peer = peer;
- for (int32_t i = RenderPosition::FRONT; i < RenderPosition::ALL; i++)
- {
- _WindowArray[i]->SetPeer(peer);
- }
- }
- PostMessage(WM_NOTIFY_REQ, index, index);
- }
|