// message_queue.cpp
  1. #include <atomic>
  2. #include <string>
  3. #include <memory>
  4. #include "api/video/video_frame.h"
  5. #include "../common/comm.h"
  6. #include "api.h"
  7. #include "message_queue.h"
  8. #include "../common/iobuffer.h"
  9. #include "../common/peer_connection.h"
  10. #include <json/json.h>
  11. #include "protocol.pb.h"
  12. #include <fstream>
  13. #include <cstring>
  14. CMessageQueue::CMessageQueue():_head(nullptr),_tail(nullptr)
  15. {
  16. _ping = false;
  17. }
  18. CMessageQueue::~CMessageQueue()
  19. {
  20. }
  21. void CMessageQueue::Start()
  22. {
  23. _thread = std::thread(&CMessageQueue::Run, this);
  24. }
  25. void CMessageQueue::Stop()
  26. {
  27. _run = false;
  28. _thread.join();
  29. }
  30. void CMessageQueue::EnQueue(CIOBuffer* pBuffer)
  31. {
  32. bool bNullBuffer=false;
  33. std::unique_lock <std::mutex> lck(_lock);
  34. if(_head ==nullptr)
  35. {
  36. _head =_tail=pBuffer;
  37. bNullBuffer=true;
  38. }
  39. else{
  40. _tail->NextBuf=pBuffer;
  41. _tail = _tail->NextBuf;
  42. }
  43. pBuffer->NextBuf=nullptr;
  44. if(bNullBuffer)
  45. {
  46. _cv.notify_all();
  47. }
  48. }
  49. void CMessageQueue::Run()
  50. {
  51. _run = true;
  52. CIOBuffer* ptr = nullptr;
  53. std::vector<CIOBuffer* > array;
  54. while(_run)
  55. {
  56. array.clear();
  57. {
  58. std::unique_lock <std::mutex> lck(_lock);
  59. while (_head == nullptr && _cv.wait_for(lck, std::chrono::milliseconds(500)) == std::cv_status::timeout)
  60. {
  61. OnIdle();
  62. // CheckSignal();
  63. // std::cout<<".";
  64. // std::cout.flush();
  65. }
  66. while (_head != nullptr)
  67. {
  68. ptr = _head;
  69. _head = _head->NextBuf;
  70. if (ptr != nullptr)
  71. {
  72. array.push_back(ptr);
  73. // Process(ptr);
  74. //
  75. }
  76. }
  77. }
  78. for (auto ptr:array )
  79. {
  80. Process(ptr);
  81. ptr->Release(__FILE__, __LINE__);
  82. }
  83. }
  84. }