123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- #include <atomic>
- #include <string>
- #include "../common/comm.h"
- #include "api.h"
- #include "message_queue.h"
- #include "../common/iobuffer.h"
- #include "../common/peer_connection.h"
-
- #include <json/json.h>
- #include "protocol.pb.h"
- #include <fstream>
- #include <cstring>
- CMessageQueue::CMessageQueue():_head(nullptr),_tail(nullptr)
- {
- _ping = false;
- }
- CMessageQueue::~CMessageQueue()
- {
- }
- void CMessageQueue::Start()
- {
-
- _thread = std::thread(&CMessageQueue::Run, this);
- }
- void CMessageQueue::Stop()
- {
- _run = false;
- _thread.join();
- }
-
- void CMessageQueue::EnQueue(CIOBuffer* pBuffer)
- {
- bool bNullBuffer=false;
- std::unique_lock <std::mutex> lck(_lock);
- if(_head ==nullptr)
- {
- _head =_tail=pBuffer;
- bNullBuffer=true;
- }
- else{
- _tail->NextBuf=pBuffer;
- _tail = _tail->NextBuf;
- }
- pBuffer->NextBuf=nullptr;
- if(bNullBuffer)
- {
- _cv.notify_one();
- }
- }
- void CMessageQueue::Run()
- {
- _run = true;
- CIOBuffer* ptr = nullptr;
- std::vector<CIOBuffer* > array;
- while(_run)
- {
- array.clear();
- {
- std::unique_lock <std::mutex> lck(_lock);
- while (_head == nullptr && _cv.wait_for(lck, std::chrono::milliseconds(500)) == std::cv_status::timeout)
- //while ((_head == nullptr) && (_cv.wait_for(lck, std::chrono::milliseconds(500)) == std::cv_status::timeout))
- {
- //����ʱ��
- auto startTime = std::chrono::high_resolution_clock::now();
- auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(startTime-oldTime);
- //OutputDebugStringA(("===================OnIdle() interval: " + std::to_string(duration.count()) + " milliseconds\n").c_str());
- oldTime = startTime;
- //if (duration.count() < 600)
- {
- OnIdle();
- //OutputDebugStringA(("=2222222222222=====OnIdle() interval: " + std::to_string(duration.count()) + " milliseconds\n").c_str());
- }
-
- //CheckSignal();
- //std::cout<<".";
- //std::cout.flush();
- }
- while (_head != nullptr)
- {
- ptr = _head;
- _head = _head->NextBuf;
- if (ptr != nullptr)
- {
- array.push_back(ptr);
- // Process(ptr);
- }
- }
- }
- for (auto ptr:array )
- {
- Process(ptr);
- ptr->Release(__FILE__, __LINE__);
- }
-
- }
-
- }
-
|