#include #include #include "../common/comm.h" #include "api.h" #include "message_queue.h" #include "../common/iobuffer.h" #include "../common/peer_connection.h" #include #include "protocol.pb.h" #include #include CMessageQueue::CMessageQueue():_head(nullptr),_tail(nullptr) { _ping = false; } CMessageQueue::~CMessageQueue() { } void CMessageQueue::Start() { _thread = std::thread(&CMessageQueue::Run, this); } void CMessageQueue::Stop() { _run = false; _thread.join(); } void CMessageQueue::EnQueue(CIOBuffer* pBuffer) { bool bNullBuffer=false; std::unique_lock lck(_lock); if(_head ==nullptr) { _head =_tail=pBuffer; bNullBuffer=true; } else{ _tail->NextBuf=pBuffer; _tail = _tail->NextBuf; } pBuffer->NextBuf=nullptr; if(bNullBuffer) { _cv.notify_one(); } } void CMessageQueue::Run() { _run = true; CIOBuffer* ptr = nullptr; std::vector array; while(_run) { array.clear(); { std::unique_lock lck(_lock); while (_head == nullptr && _cv.wait_for(lck, std::chrono::milliseconds(500)) == std::cv_status::timeout) //while ((_head == nullptr) && (_cv.wait_for(lck, std::chrono::milliseconds(500)) == std::cv_status::timeout)) { //����ʱ�� auto startTime = std::chrono::high_resolution_clock::now(); auto duration = std::chrono::duration_cast(startTime-oldTime); //OutputDebugStringA(("===================OnIdle() interval: " + std::to_string(duration.count()) + " milliseconds\n").c_str()); oldTime = startTime; //if (duration.count() < 600) { OnIdle(); //OutputDebugStringA(("=2222222222222=====OnIdle() interval: " + std::to_string(duration.count()) + " milliseconds\n").c_str()); } //CheckSignal(); //std::cout<<"."; //std::cout.flush(); } while (_head != nullptr) { ptr = _head; _head = _head->NextBuf; if (ptr != nullptr) { array.push_back(ptr); // Process(ptr); } } } for (auto ptr:array ) { Process(ptr); ptr->Release(__FILE__, __LINE__); } } }