message_queue.cpp 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. #include <atomic>
  2. #include <string>
  3. #include "../common/comm.h"
  4. #include "api.h"
  5. #include "message_queue.h"
  6. #include "../common/iobuffer.h"
  7. #include "../common/peer_connection.h"
  8. #include <json/json.h>
  9. #include "protocol.pb.h"
  10. #include <fstream>
  11. #include <cstring>
  12. CMessageQueue::CMessageQueue():_head(nullptr),_tail(nullptr)
  13. {
  14. _ping = false;
  15. }
  16. CMessageQueue::~CMessageQueue()
  17. {
  18. }
  19. void CMessageQueue::Start()
  20. {
  21. _thread = std::thread(&CMessageQueue::Run, this);
  22. }
  23. void CMessageQueue::Stop()
  24. {
  25. _run = false;
  26. _thread.join();
  27. }
  28. void CMessageQueue::EnQueue(CIOBuffer* pBuffer)
  29. {
  30. bool bNullBuffer=false;
  31. std::unique_lock <std::mutex> lck(_lock);
  32. if(_head ==nullptr)
  33. {
  34. _head =_tail=pBuffer;
  35. bNullBuffer=true;
  36. }
  37. else{
  38. _tail->NextBuf=pBuffer;
  39. _tail = _tail->NextBuf;
  40. }
  41. pBuffer->NextBuf=nullptr;
  42. if(bNullBuffer)
  43. {
  44. _cv.notify_one();
  45. }
  46. }
  47. void CMessageQueue::Run()
  48. {
  49. _run = true;
  50. CIOBuffer* ptr = nullptr;
  51. std::vector<CIOBuffer* > array;
  52. while(_run)
  53. {
  54. array.clear();
  55. {
  56. std::unique_lock <std::mutex> lck(_lock);
  57. while (_head == nullptr && _cv.wait_for(lck, std::chrono::milliseconds(500)) == std::cv_status::timeout)
  58. //while ((_head == nullptr) && (_cv.wait_for(lck, std::chrono::milliseconds(500)) == std::cv_status::timeout))
  59. {
  60. //����ʱ��
  61. auto startTime = std::chrono::high_resolution_clock::now();
  62. auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(startTime-oldTime);
  63. //OutputDebugStringA(("===================OnIdle() interval: " + std::to_string(duration.count()) + " milliseconds\n").c_str());
  64. oldTime = startTime;
  65. //if (duration.count() < 600)
  66. {
  67. OnIdle();
  68. //OutputDebugStringA(("=2222222222222=====OnIdle() interval: " + std::to_string(duration.count()) + " milliseconds\n").c_str());
  69. }
  70. //CheckSignal();
  71. //std::cout<<".";
  72. //std::cout.flush();
  73. }
  74. while (_head != nullptr)
  75. {
  76. ptr = _head;
  77. _head = _head->NextBuf;
  78. if (ptr != nullptr)
  79. {
  80. array.push_back(ptr);
  81. // Process(ptr);
  82. }
  83. }
  84. }
  85. for (auto ptr:array )
  86. {
  87. Process(ptr);
  88. ptr->Release(__FILE__, __LINE__);
  89. }
  90. }
  91. }