| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 | 
							- #include "pch.h"
 
- #include "api/video/video_frame.h"
 
- #include <atomic>
 
- #include <string>
 
- #include <array>
 
- #include <memory>
 
- #include "peer_connection.h"
 
- #include "../common/comm.h"
 
- #include "api.h"
 
- #include "remote.pb.h"
 
- #include "../common/iobuffer.h"
 
- #include "remote_window.h"
 
- #include "message_queue.h"
 
- #include <json/json.h>
 
- #include "api/video/video_sink_interface.h"
 
- #include <fstream>
 
- #include <cstring>
 
- #include "remote_control.h"
 
- #include "socket_remote.h"
 
- CMessageQueue::CMessageQueue(CRemoteCtrl* c):_head(nullptr),_tail(nullptr),_ctrl(c)
 
- {
 
-     
 
- }
 
- CMessageQueue::~CMessageQueue()
 
- {
 
- }
 
- void CMessageQueue::Start(EgoType type, std::array<IRender*, RenderPosition::ALL>& ar)
 
- {
 
-     for (int i = 0; i < RenderPosition::ALL; i++)
 
-     {
 
-         auto p = std::make_unique<CRemoteWindow>(type,this, (RenderPosition)i,ar[i]);
 
-       
 
-         _WindowArray.push_back(std::move(p));
 
-     }
 
-     _thread = std::thread(&CMessageQueue::Run, this);
 
- }
 
-  
 
- void CMessageQueue::Stop()
 
- {
 
-     _run = false;
 
-     _thread.join();
 
- }
 
- void CMessageQueue::OnConnect(int32_t peer)
 
- {
 
-     if (_peer != -1) return;
 
-     _peer = peer;
 
-     mrsWebrtcCreateFactory(true, false);
 
-     for (int i = 0; i < RenderPosition::ALL; i++)
 
-     {
 
-         _WindowArray[i]->SetPeer(peer);
 
-     }
 
-     _WindowArray[RenderPosition::FRONT]->OnAskVideoReq();
 
-    
 
- }
 
- void CMessageQueue::EnQueue(CIOBuffer* pBuffer)
 
- {
 
-     bool bNullBuffer=false;
 
-     std::unique_lock <std::mutex>  lck(_lock);
 
-     if(_head ==nullptr)
 
-     {
 
-         _head =_tail=pBuffer;
 
-         bNullBuffer=true;
 
-     }
 
-     else{
 
-         _tail->NextBuf=pBuffer;
 
-         _tail = _tail->NextBuf;
 
-     } 
 
-     pBuffer->NextBuf=nullptr;
 
-     if(bNullBuffer)
 
-     {
 
-         _cv.notify_one();
 
-     }
 
- }
 
- void CMessageQueue::Run()
 
- {
 
-     _run = true;
 
- 	CIOBuffer* ptr = nullptr;
 
-     std::vector<CIOBuffer* >  array;
 
-     while(_run)
 
-     {
 
-         array.clear();
 
-         {
 
-             std::unique_lock <std::mutex>  lck(_lock);
 
-             while (_head == nullptr && _cv.wait_for(lck, std::chrono::milliseconds(500)) == std::cv_status::timeout)
 
-             {
 
-                 if(_alive)
 
-                      OnIdle();
 
-                 //  CheckSignal();
 
-                  // std::cout<<".";
 
-                  // std::cout.flush();
 
-             }
 
-             while (_head != nullptr)
 
-             {
 
-                 ptr = _head;
 
-                 _head = _head->NextBuf;
 
-                 if (ptr != nullptr)
 
-                 {
 
-                     array.push_back(ptr);
 
-                     //  Process(ptr);
 
-                   //	
 
-                 }
 
-             }
 
-         }
 
-         for (auto ptr:array )
 
-         {
 
-             Process(ptr);
 
-             ptr->Release(__FILE__, __LINE__);
 
-         }
 
-         
 
-     }
 
-    
 
- }
 
-  
 
- void CMessageQueue::KeepAlive(bool bAlive)
 
- {
 
-     _alive = bAlive;
 
- }
 
- SocketRemote* CMessageQueue::GetRemoteClient()
 
- {
 
-     return _ctrl->GetRemoteClient();
 
- }
 
- void CMessageQueue::Process(CIOBuffer* pBuffer)
 
- {
 
-     Message* message = reinterpret_cast<Message*>(pBuffer->Buffer);
 
-     switch (message->cmd)
 
-     {
 
-     case WM_NOTIFY_REQ:
 
-       
 
-         if (message->index == 0)
 
-         {
 
-             mrsWebrtcCreateFactory(false, true);
 
-         }
 
-         _WindowArray[message->index]->OnNotifyReq((int32_t)message->param_l);
 
-         break;
 
-     case WM_NOTIFY_REP:
 
-        _WindowArray[message->index]->OnNotifyRep((int32_t)message->param_l);
 
-         break;;
 
-     case WM_NOTIFY_ANSWER:
 
-         _WindowArray[message->index]->OnNotifyAnswer((CIOBuffer*)message->param_l);
 
-         break;
 
-     case WM_NOTIFY_CANDIDATE:
 
-         _WindowArray[message->index]->OnNotifyCandidate((CIOBuffer*)message->param_l);
 
-         break;
 
-     case WM_NOTIFY_OFFER:
 
-         _WindowArray[message->index]->OnNotifyOffer((CIOBuffer*)message->param_l);
 
-         break;
 
-     case WM_ASK_VIDEOREQ:
 
-         _WindowArray[message->index]->OnAskVideoReq();
 
-         break;
 
-     default:
 
-         break;
 
-     }
 
-     
 
- }
 
- void CMessageQueue::PostMessage(int32_t cmd, int32_t index, int64_t l , int64_t r )
 
- {
 
-     CIOBuffer* pBuffer = CIOBuffer::Alloc(__FILE__, __LINE__);
 
-     Message* message = reinterpret_cast<Message*>(pBuffer->Buffer);
 
-     message->cmd = cmd;
 
-     message->index = index;
 
-     message->param_l = l;
 
-     message->param_r = r;
 
-     EnQueue(pBuffer);
 
- }
 
-  
 
- void CMessageQueue::OverlayVideo(RenderPosition pos, const webrtc::VideoFrame& frame)
 
- {
 
-     _WindowArray[pos]->OverlayVideo(frame);
 
- }
 
- void CMessageQueue::OnIdle()
 
- {
 
-     _ctrl->GetRemoteClient()->WriteKeepAlive();
 
- }
 
- void CMessageQueue::NotifyLeave()
 
- {
 
-     _peer = -1;
 
-     for (int32_t i = RenderPosition::FRONT; i < RenderPosition::ALL; i++)
 
-     {
 
-         _WindowArray[i]->OnNotifyLeave();
 
-     }
 
- }
 
- void CMessageQueue::NotifyReq(int32_t peer, int32_t index)
 
- {
 
-     if (index == 0)
 
-     {
 
-         _peer = peer;
 
-         for (int32_t i = RenderPosition::FRONT; i < RenderPosition::ALL; i++)
 
-         {
 
-             _WindowArray[i]->SetPeer(peer);
 
-         }
 
-     }
 
-     PostMessage(WM_NOTIFY_REQ, index, index);
 
- }
 
 
  |