UserManager.cpp 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. #define WIN32_LEAN_AND_MEAN
  2. #include <windows.h>
  3. #include <WinSock2.h>
  4. #include <mutex>
  5. #include <functional>
  6. #include "../Protocol/win/Protocol.pb.h"
  7. // #include "rapidjson/rapidjson.h"
  8. // #include "rapidjson/document.h"
  9. // #include "rapidjson/istreamwrapper.h"
  10. // #include "rapidjson/stringbuffer.h"
  11. // #include "rapidjson/writer.h"
  12. // #include "rapidjson/document.h"
  13. //#include "WebHandler.h"
  14. #include "UserSocket.h"
  15. #include "UserManager.h"
  16. #include "../common/IOBuffer.h"
  17. //#include "WebServer.h"
  18. const ULONGLONG MAX_DELAY_TIME = 30000;
  19. CUserManager& CUserManager::GetInstance()
  20. {
  21. static CUserManager m;
  22. return m;
  23. }
  24. void CUserManager::Check(int32_t uid)
  25. {
  26. std::lock_guard<std::mutex> l(_lock);
  27. for (auto it = SocketList.begin(); it != SocketList.end(); ++it)
  28. {
  29. CUserSocket* User = *it;
  30. if (User->uid() == uid)
  31. {
  32. User->KickOff();
  33. break;
  34. }
  35. }
  36. }
  37. void CUserManager::Add(CUserSocket* ptr)
  38. {
  39. std::lock_guard<std::mutex> l(_lock);
  40. SocketList.push_back(ptr);
  41. // rapidjson::StringBuffer strBuf;
  42. // rapidjson::Writer<rapidjson::StringBuffer> root(strBuf);
  43. // root.StartObject();
  44. // root.Key("type");
  45. // root.String(kNotify);
  46. // root.Key("uid");
  47. // root.Int(ptr->GetId());
  48. // root.Key("state");
  49. // root.Bool(true);
  50. // root.EndObject();
  51. //
  52. // BroadCastUserState(ptr->GetId(), strBuf.GetString());
  53. }
  54. void CUserManager::Remove(CUserSocket* lhs)
  55. {
  56. std::lock_guard<std::mutex> l(_lock);
  57. for (auto it = SocketList.begin(); it != SocketList.end(); ++it)
  58. {
  59. CUserSocket* User = *it;
  60. if (User == lhs)
  61. {
  62. SocketList.erase(it);
  63. break;
  64. }
  65. }
  66. }
  67. void CUserManager::Write(int32_t uid, CIOBuffer* pBuffer)
  68. {
  69. std::lock_guard<std::mutex> l(_lock);
  70. for (auto& a : SocketList)
  71. {
  72. if (a->uid() == uid)
  73. {
  74. a->Write(pBuffer);
  75. break;
  76. }
  77. }
  78. }
  79. RemoNet::VideoDesc CUserManager::ConnectPeerVideo(int32_t peer, int32_t uid, int32_t index)
  80. {
  81. std::lock_guard<std::mutex> l(_lock);
  82. for (auto& a : SocketList)
  83. {
  84. if (a->uid() == peer)
  85. {
  86. return a->ReqVideo(uid, index);
  87. }
  88. }
  89. return RemoNet::VideoDesc::NoFound;
  90. }
  91. void CUserManager::LeavePeerVideo(int32_t peer, int32_t uid,EgoType type)
  92. {
  93. std::lock_guard<std::mutex> l(_lock);
  94. for (auto& a : SocketList)
  95. {
  96. if (a->uid() == peer)
  97. {
  98. a->LeaveVideo(uid,type);
  99. break;
  100. }
  101. }
  102. }
  103. void CUserManager::ClosePeerVideo(int32_t peer, int32_t uid, EgoType type, int32_t index)
  104. {
  105. std::lock_guard<std::mutex> l(_lock);
  106. for (auto& a : SocketList)
  107. {
  108. if (a->uid() == peer)
  109. {
  110. a->CloseVideo(uid, index, type);
  111. break;
  112. }
  113. }
  114. }
  115. void CUserManager::Start()
  116. {
  117. _thread = std::thread(std::bind(&CUserManager::Run, this));
  118. }
  119. void CUserManager::Run()
  120. {
  121. _run = true;
  122. while (_run)
  123. {
  124. ULONGLONG tick = GetTickCount64();
  125. {
  126. std::unique_lock<std::mutex> l(_lock);
  127. for (auto it = SocketList.begin(); it != SocketList.end();)
  128. {
  129. CUserSocket* lhs = *it;
  130. if (tick - lhs->GetTimeTick() > MAX_DELAY_TIME)
  131. {
  132. lhs->OnTimeout();
  133. it = SocketList.erase(it);
  134. }
  135. else
  136. {
  137. ++it;
  138. }
  139. }
  140. }
  141. Sleep(5000);
  142. }
  143. }
  144. void CUserManager::ReplyPeerVideo(int32_t peer, int32_t uid, RemoNet::VideoDesc ret, int32_t index)
  145. {
  146. std::lock_guard<std::mutex> l(_lock);
  147. for (auto& a : SocketList)
  148. {
  149. if (a->uid() == peer)
  150. {
  151. a->RepVideo(uid, index, ret);
  152. break;
  153. }
  154. }
  155. }
  156. RemoNet::UserState CUserManager::GetState(int32_t uid)
  157. {
  158. std::lock_guard<std::mutex> l(_lock);
  159. for (auto& a : SocketList)
  160. {
  161. if (a->uid() == uid)
  162. {
  163. return a->state();
  164. }
  165. }
  166. return RemoNet::UserState::Offline;
  167. }
  168. void CUserManager::NotifyOffer(int32_t peer, int32_t uid, int32_t index, const char* type, const char* sdp)
  169. {
  170. std::lock_guard<std::mutex> l(_lock);
  171. for (auto& a : SocketList)
  172. {
  173. if (a->uid() == peer)
  174. {
  175. a->NotifyOffer(uid, index, type, sdp);
  176. break;
  177. }
  178. }
  179. }
  180. void CUserManager::NotifyAnswer(int32_t peer, int32_t uid, int32_t index, const char* type, const char* sdp)
  181. {
  182. std::lock_guard<std::mutex> l(_lock);
  183. for (auto& a : SocketList)
  184. {
  185. if (a->uid() == peer)
  186. {
  187. a->NotifyAnswer(uid, index, type, sdp);
  188. break;
  189. }
  190. }
  191. }
  192. void CUserManager::NotifyCandidate(int32_t peer, int32_t uid, int32_t index, const char* type, const char* candidate, int32_t sdp_mline_index, const char* sdp_mid)
  193. {
  194. std::lock_guard<std::mutex> l(_lock);
  195. for (auto& a : SocketList)
  196. {
  197. if (a->uid() == peer)
  198. {
  199. a->NotifyCandidate(uid, index, type, candidate, sdp_mline_index, sdp_mid);
  200. break;
  201. }
  202. }
  203. }
  204. void CUserManager::NotifyMoveBegin(int32_t peer, int32_t uid, WorkArea area, int32_t no)
  205. {
  206. std::lock_guard<std::mutex> l(_lock);
  207. for (auto& a : SocketList)
  208. {
  209. if (a->uid() == peer)
  210. {
  211. a->NotifyMoveBegin(uid, area, no);
  212. break;
  213. }
  214. }
  215. }
  216. void CUserManager::NotifyMoveEnd(int32_t peer,int32_t uid, WorkArea area, int32_t no)
  217. {
  218. std::lock_guard<std::mutex> l(_lock);
  219. for (auto& a : SocketList)
  220. {
  221. if (a->uid()==peer&& a->state() == UserState::Idle)///* && */)
  222. {
  223. a->NotifyMoveEnd(uid, area, no);
  224. return;
  225. }
  226. }
  227. for (auto& a : SocketList)
  228. {
  229. if (a->type() == EgoType::User && a->state() == UserState::Idle)//a->type() == EgoType::User/* && */)
  230. {
  231. a->NotifyMoveEnd(uid, area, no);
  232. return;
  233. }
  234. }
  235. }
  236. void CUserManager::GetRobot(std::vector<Robot>& ret)
  237. {
  238. std::lock_guard<std::mutex> l(_lock);
  239. for (auto& a : SocketList)
  240. {
  241. if (a->type() == EgoType::Car)
  242. {
  243. Robot m;
  244. m.name = a->name();
  245. m.uid = a->uid();
  246. m.type = a->type();
  247. m.state= a->state();
  248. m.car = a->car();
  249. ret.push_back(m);
  250. }
  251. }
  252. }
  253. void CUserManager::NotifyMoveRet(int32_t peer, int32_t uid, RemoNet::MoveDesc desc)
  254. {
  255. RemoNet::MoveRet Rep;
  256. Rep.set_desc(desc);
  257. Rep.set_peer(uid);
  258. CIOBuffer* pBuffer = CIOBuffer::Alloc(__FILE__, __LINE__);
  259. MessageHead Head;
  260. Head.Command = RemoNet::SC_MoveRet;
  261. Head.Length = Rep.ByteSizeLong();
  262. Head.Serialize(pBuffer->Buffer);
  263. auto ptr = pBuffer->Buffer + MessageHead::Size();
  264. Rep.SerializeToArray(ptr, Head.Length);
  265. pBuffer->Length = MessageHead::Size() + Head.Length;
  266. std::lock_guard<std::mutex> l(_lock);
  267. for (auto& a : SocketList)
  268. {
  269. if (a->uid() == peer)
  270. {
  271. a->Write(pBuffer);
  272. break;
  273. }
  274. }
  275. pBuffer->Release(__FILE__, __LINE__);
  276. }
  277. void CUserManager::NotifySwitchDriver(int32_t peer, int32_t uid)
  278. {
  279. RemoNet::SwitchDriver Rep;
  280. Rep.set_peer(uid);
  281. CIOBuffer* pBuffer = CIOBuffer::Alloc(__FILE__, __LINE__);
  282. MessageHead Head;
  283. Head.Command = RemoNet::SC_SwitchDriver;
  284. Head.Length = Rep.ByteSizeLong();
  285. Head.Serialize(pBuffer->Buffer);
  286. auto ptr = pBuffer->Buffer + MessageHead::Size();
  287. Rep.SerializeToArray(ptr, Head.Length);
  288. pBuffer->Length = MessageHead::Size() + Head.Length;
  289. std::lock_guard<std::mutex> l(_lock);
  290. for (auto& a : SocketList)
  291. {
  292. if (a->uid() == peer)
  293. {
  294. a->Write(pBuffer);
  295. break;
  296. }
  297. }
  298. pBuffer->Release(__FILE__, __LINE__);
  299. }
  300. void CUserManager::NotifyState(int32_t uid, EgoType type, RemoNet::UserState state)
  301. {
  302. if (type != EgoType::Car) return;
  303. std::lock_guard<std::mutex> l(_lock);
  304. RemoNet::SCState Rep;
  305. Rep.set_state(state);
  306. Rep.set_uid(uid);
  307. CIOBuffer* pBuffer = CIOBuffer::Alloc(__FILE__, __LINE__);
  308. MessageHead Head;
  309. Head.Command = RemoNet::SC_State;
  310. Head.Length = Rep.ByteSizeLong();
  311. Head.Serialize(pBuffer->Buffer);
  312. auto ptr = pBuffer->Buffer + MessageHead::Size();
  313. Rep.SerializeToArray(ptr, Head.Length);
  314. pBuffer->Length = MessageHead::Size() + Head.Length;
  315. for (auto& a : SocketList)
  316. {
  317. if (a->type() == EgoType::User)
  318. {
  319. a->Write(pBuffer);
  320. }
  321. }
  322. pBuffer->Release(__FILE__, __LINE__);
  323. }
  324. void CUserManager::BroadCast(CIOBuffer* pBuffer)
  325. {
  326. std::lock_guard<std::mutex> l(_lock);
  327. for (auto& a : SocketList)
  328. {
  329. if (a->type() == EgoType::User)
  330. {
  331. a->Write(pBuffer);
  332. }
  333. }
  334. }
  335. /*
  336. void CUserManager::BroadCastUserState(int32_t uid, const char* content)
  337. {
  338. //std::shared_lock<std::shared_mutex> l(lock);
  339. for (auto& node : UserMap)
  340. {
  341. if (auto p = node.second.lock())
  342. {
  343. if (p->GetUID() != uid)
  344. {
  345. p->Notify(content);
  346. }
  347. }
  348. }
  349. }
  350. */