message_queue.cpp 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. #include "pch.h"
  2. #include "api/video/video_frame.h"
  3. #include <atomic>
  4. #include <string>
  5. #include <array>
  6. #include <memory>
  7. #include "peer_connection.h"
  8. #include "../common/comm.h"
  9. #include "api.h"
  10. #include "remote.pb.h"
  11. #include "../common/iobuffer.h"
  12. #include "remote_window.h"
  13. #include "message_queue.h"
  14. #include <json/json.h>
  15. #include "api/video/video_sink_interface.h"
  16. #include <fstream>
  17. #include <cstring>
  18. #include "remote_control.h"
  19. #include "socket_remote.h"
  20. CMessageQueue::CMessageQueue(CRemoteCtrl* c):_head(nullptr),_tail(nullptr),_ctrl(c)
  21. {
  22. }
  23. CMessageQueue::~CMessageQueue()
  24. {
  25. }
  26. void CMessageQueue::Start(EgoType type, std::array<IRender*, RenderPosition::ALL>& ar)
  27. {
  28. for (int i = 0; i < RenderPosition::ALL; i++)
  29. {
  30. auto p = std::make_unique<CRemoteWindow>(type,this, (RenderPosition)i,ar[i]);
  31. _WindowArray.push_back(std::move(p));
  32. }
  33. _thread = std::thread(&CMessageQueue::Run, this);
  34. }
  35. void CMessageQueue::Stop()
  36. {
  37. _run = false;
  38. _thread.join();
  39. }
  40. void CMessageQueue::OnConnect(int32_t peer)
  41. {
  42. if (_peer != -1) return;
  43. _peer = peer;
  44. mrsWebrtcCreateFactory(true, false);
  45. for (int i = 0; i < RenderPosition::ALL; i++)
  46. {
  47. _WindowArray[i]->SetPeer(peer);
  48. }
  49. _WindowArray[RenderPosition::FRONT]->OnAskVideoReq();
  50. }
  51. void CMessageQueue::EnQueue(CIOBuffer* pBuffer)
  52. {
  53. bool bNullBuffer=false;
  54. std::unique_lock <std::mutex> lck(_lock);
  55. if(_head ==nullptr)
  56. {
  57. _head =_tail=pBuffer;
  58. bNullBuffer=true;
  59. }
  60. else{
  61. _tail->NextBuf=pBuffer;
  62. _tail = _tail->NextBuf;
  63. }
  64. pBuffer->NextBuf=nullptr;
  65. if(bNullBuffer)
  66. {
  67. _cv.notify_one();
  68. }
  69. }
  70. void CMessageQueue::Run()
  71. {
  72. _run = true;
  73. CIOBuffer* ptr = nullptr;
  74. std::vector<CIOBuffer* > array;
  75. while(_run)
  76. {
  77. array.clear();
  78. {
  79. std::unique_lock <std::mutex> lck(_lock);
  80. while (_head == nullptr && _cv.wait_for(lck, std::chrono::milliseconds(500)) == std::cv_status::timeout)
  81. {
  82. if(_alive)
  83. OnIdle();
  84. // CheckSignal();
  85. // std::cout<<".";
  86. // std::cout.flush();
  87. }
  88. while (_head != nullptr)
  89. {
  90. ptr = _head;
  91. _head = _head->NextBuf;
  92. if (ptr != nullptr)
  93. {
  94. array.push_back(ptr);
  95. // Process(ptr);
  96. //
  97. }
  98. }
  99. }
  100. for (auto ptr:array )
  101. {
  102. Process(ptr);
  103. ptr->Release(__FILE__, __LINE__);
  104. }
  105. }
  106. }
  107. void CMessageQueue::KeepAlive(bool bAlive)
  108. {
  109. _alive = bAlive;
  110. }
  111. SocketRemote* CMessageQueue::GetRemoteClient()
  112. {
  113. return _ctrl->GetRemoteClient();
  114. }
  115. void CMessageQueue::Process(CIOBuffer* pBuffer)
  116. {
  117. Message* message = reinterpret_cast<Message*>(pBuffer->Buffer);
  118. switch (message->cmd)
  119. {
  120. case WM_NOTIFY_REQ:
  121. if (message->index == 0)
  122. {
  123. mrsWebrtcCreateFactory(false, true);
  124. }
  125. _WindowArray[message->index]->OnNotifyReq((int32_t)message->param_l);
  126. break;
  127. case WM_NOTIFY_REP:
  128. _WindowArray[message->index]->OnNotifyRep((int32_t)message->param_l);
  129. break;;
  130. case WM_NOTIFY_ANSWER:
  131. _WindowArray[message->index]->OnNotifyAnswer((CIOBuffer*)message->param_l);
  132. break;
  133. case WM_NOTIFY_CANDIDATE:
  134. _WindowArray[message->index]->OnNotifyCandidate((CIOBuffer*)message->param_l);
  135. break;
  136. case WM_NOTIFY_OFFER:
  137. _WindowArray[message->index]->OnNotifyOffer((CIOBuffer*)message->param_l);
  138. break;
  139. case WM_ASK_VIDEOREQ:
  140. _WindowArray[message->index]->OnAskVideoReq();
  141. break;
  142. default:
  143. break;
  144. }
  145. }
  146. void CMessageQueue::PostMessage(int32_t cmd, int32_t index, int64_t l , int64_t r )
  147. {
  148. CIOBuffer* pBuffer = CIOBuffer::Alloc(__FILE__, __LINE__);
  149. Message* message = reinterpret_cast<Message*>(pBuffer->Buffer);
  150. message->cmd = cmd;
  151. message->index = index;
  152. message->param_l = l;
  153. message->param_r = r;
  154. EnQueue(pBuffer);
  155. }
  156. void CMessageQueue::OverlayVideo(RenderPosition pos, const webrtc::VideoFrame& frame)
  157. {
  158. _WindowArray[pos]->OverlayVideo(frame);
  159. }
  160. void CMessageQueue::OnIdle()
  161. {
  162. _ctrl->GetRemoteClient()->WriteKeepAlive();
  163. }
  164. void CMessageQueue::NotifyLeave()
  165. {
  166. _peer = -1;
  167. for (int32_t i = RenderPosition::FRONT; i < RenderPosition::ALL; i++)
  168. {
  169. _WindowArray[i]->OnNotifyLeave();
  170. }
  171. }
  172. void CMessageQueue::NotifyReq(int32_t peer, int32_t index)
  173. {
  174. if (index == 0)
  175. {
  176. _peer = peer;
  177. for (int32_t i = RenderPosition::FRONT; i < RenderPosition::ALL; i++)
  178. {
  179. _WindowArray[i]->SetPeer(peer);
  180. }
  181. }
  182. PostMessage(WM_NOTIFY_REQ, index, index);
  183. }