user_handler.cpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476
  1. #include <string.h>
  2. #include <stdio.h>
  3. #include <stdlib.h>
  4. #include <unistd.h>
  5. #include <sys/select.h>
  6. #include <sys/time.h>
  7. #include <sys/socket.h>
  8. #include <netinet/in.h>
  9. #include <arpa/inet.h>
  10. #include <sys/epoll.h>
  11. #include <chrono>
  12. #include "user_handler.h"
  13. #include "remote.pb.h"
  14. #include "../common/iobuffer.h"
  15. #include "user_manager.h"
  16. #include <mysql_connection.h>
  17. #include <mysql_driver.h>
  18. #include <cppconn/exception.h>
  19. #include <cppconn/driver.h>
  20. #include <cppconn/connection.h>
  21. #include <cppconn/resultset.h>
  22. #include <cppconn/prepared_statement.h>
  23. #include <cppconn/statement.h>
  24. #include "DBConnect.h"
  25. #include "scoped_ptr.h"
  // Class-wide dispatch table: InitFnDirector() fills it with (command id, member handler)
  // pairs and Process() forwards each decoded message through it.
  26. FNRedirector<CUserHandler> CUserHandler::Redirector;
  27. CUserHandler::CUserHandler(int32_t fd):_fd(fd)
  28. {
  29. }
  30. void CUserHandler::InitFnDirector()
  31. {
  32. Redirector.Insert(remote::cs_answer, &CUserHandler::OnAnswer);
  33. Redirector.Insert(remote::cs_candidate, &CUserHandler::OnCadidate);
  34. Redirector.Insert(remote::cs_keepAlive, &CUserHandler::OnKeepAlive);
  35. Redirector.Insert(remote::cs_add, &CUserHandler::OnAdd);
  36. Redirector.Insert(remote::cs_robot, &CUserHandler::OnRobot);
  37. Redirector.Insert(remote::cs_offer, &CUserHandler::OnOffer);
  38. Redirector.Insert(remote::cs_rep, &CUserHandler::OnRepVideo);
  39. Redirector.Insert(remote::cs_req, &CUserHandler::OnReqVideo);
  40. Redirector.Insert(remote::cs_leave, &CUserHandler::OnLeave);
  41. //Redirector.Insert(RemoNet::CS_CloseVideo, &CUserSocket::OnCloseVideo);
  42. }
  43. long long CUserHandler::GetTimeTick()
  44. {
  45. return _tick;
  46. }
  47. void CUserHandler::OnTimeout()
  48. {
  49. }
  50. int32_t CUserHandler::Read()
  51. {
  52. int32_t length=_readBuffer->Length;
  53. int32_t ret=recv(_fd,&_readBuffer->Buffer[length],CIOBuffer::IO_BUFFER_SIZE-length,0);
  54. if(ret<0)
  55. {
  56. if((errno == EAGAIN) || (errno == EWOULDBLOCK))
  57. {
  58. printf("read later\n");
  59. return 1;
  60. }
  61. else {
  62. return -1;
  63. }
  64. }
  65. _readBuffer->Length+=ret;
  66. while(true)
  67. {
  68. int8_t * ptr=_readBuffer->Buffer;
  69. length+=ret;
  70. if(length<MessageHead::Size())
  71. {
  72. return 0;
  73. }
  74. MessageHead Head;
  75. Head.Deserialize(ptr);
  76. int32_t size=length<MessageHead::Size()+Head.Length;
  77. if(length<size)
  78. {
  79. return 0;
  80. }
  81. int8_t* Data = ptr + MessageHead::Size();
  82. Process(Head.Command, Data, Head.Length);
  83. length -= size;
  84. ptr += size;
  85. }
  86. return 0;
  87. }
  88. void CUserHandler::Process(int32_t cmd, int8_t* Data, int32_t Size)
  89. {
  90. if (!_loginsucc)
  91. {
  92. if ((cmd != remote::cs_add)) return;
  93. }
  94. Redirector.Process(this, cmd, Data, Size);
  95. }
  96. void CUserHandler::OnKeepAlive(int8_t* Data, int16_t Size)
  97. {
  98. _tick=std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();;
  99. }
  100. void CUserHandler::OnOffer(int8_t* Data, int16_t Size)
  101. {
  102. std::cout << __FUNCTION__ << std::endl;
  103. remote::Offer Req;
  104. Req.ParseFromArray(Data, Size);
  105. int32_t peer = Req.peer();
  106. CUserManager::GetInstance().NotifyOffer(peer,_uid,Req.index(),Req.type().c_str(),Req.sdp().c_str());
  107. }
  108. void CUserHandler::OnAnswer(int8_t* Data, int16_t Size)
  109. {
  110. std::cout << __FUNCTION__ << std::endl;
  111. remote::Answer Req;
  112. Req.ParseFromArray(Data, Size);
  113. int32_t peer = Req.peer();
  114. std::cout << Req.sdp().c_str() << std::endl;
  115. CUserManager::GetInstance().NotifyAnswer(peer, _uid, Req.index(), Req.type().c_str(), Req.sdp().c_str());
  116. }
  117. void CUserHandler::OnCadidate(int8_t* Data, int16_t Size)
  118. {
  119. remote::Candidate Req;
  120. Req.ParseFromArray(Data, Size);
  121. int32_t peer = Req.peer();
  122. CUserManager::GetInstance().NotifyCandidate(peer, _uid, Req.index(), Req.type().c_str(), Req.candidate().c_str(), Req.sdpmlineindex(), Req.sdpmid().c_str());
  123. }
  124. void CUserHandler::OnAdd(int8_t* Data, int16_t Size)
  125. {
  126. remote::CSAdd Req;
  127. Req.ParseFromArray(Data, Size);
  128. std::string serial = Req.serial();
  129. _name = Req.name();
  130. CQPtr<CConnectionPtr<sql::Connection>> Conn = CDBConnectPool::GetInstance().QueryConnect();
  131. scoped_ptr<sql::Statement> stmt = (*Conn.get())->createStatement();
  132. char sql[1024];
  133. sprintf(sql, "select id from robot where serial=\'%s\'", serial.c_str());
  134. scoped_ptr<sql::ResultSet> resultSet = stmt->executeQuery(sql);
  135. bool bRet = false;
  136. if (resultSet->next())
  137. {
  138. _uid = resultSet->getInt(1);
  139. _loginsucc = true;
  140. _egoType =static_cast<EgoType>(Req.type());
  141. _state = UserState::Idle;
  142. bRet = true;
  143. }
  144. if (bRet)
  145. {
  146. remote::SCAddRobot robot;
  147. auto r=robot.mutable_robot();
  148. r->set_name(_name);
  149. r->set_rid(_uid);
  150. r->set_type(_egoType);
  151. r->set_state(UserState::Idle);
  152. CIOBuffer* pBuffer = CIOBuffer::Alloc(__FILE__, __LINE__);
  153. MessageHead Head;
  154. Head.Command = remote::sc_NotifyAdd;
  155. Head.Length = robot.ByteSizeLong();
  156. Head.Serialize(pBuffer->Buffer);
  157. auto ptr = pBuffer->Buffer + MessageHead::Size();
  158. robot.SerializeToArray(ptr, Head.Length);
  159. pBuffer->Length = MessageHead::Size() + Head.Length;
  160. CUserManager::GetInstance().BroadCast(pBuffer);
  161. pBuffer->Release(__FILE__, __LINE__);
  162. }
  163. remote::SCAdd Rep;
  164. Rep.set_ret(bRet);
  165. Rep.set_uid(_uid);
  166. Rep.set_name(_name);
  167. CIOBuffer* pBuffer = CIOBuffer::Alloc(__FILE__, __LINE__);
  168. MessageHead Head;
  169. Head.Command = remote::sc_add;
  170. Head.Length = Rep.ByteSizeLong();
  171. Head.Serialize(pBuffer->Buffer);
  172. auto ptr = pBuffer->Buffer + MessageHead::Size();
  173. Rep.SerializeToArray(ptr, Head.Length);
  174. pBuffer->Length = MessageHead::Size() + Head.Length;
  175. Write(pBuffer);
  176. pBuffer->Release(__FILE__, __LINE__);
  177. }
  178. void CUserHandler::OnRobot(int8_t* Data, int16_t Size)
  179. {
  180. remote::CSRobot Req;
  181. Req.ParseFromArray(Data, Size);
  182. std::vector<Benchboard> ret;
  183. CUserManager::GetInstance().GetRobot(ret);
  184. remote::SCRobot Rep;
  185. for (auto& node : ret)
  186. {
  187. auto robot = Rep.add_robot();
  188. robot->set_name(node.name.c_str());
  189. robot->set_rid(node.uid);
  190. robot->set_type(node.type);
  191. robot->set_state(node.state);
  192. }
  193. CIOBuffer* pBuffer = CIOBuffer::Alloc(__FILE__, __LINE__);
  194. MessageHead Head;
  195. Head.Command = remote::sc_robot;
  196. Head.Length = Rep.ByteSizeLong();
  197. Head.Serialize(pBuffer->Buffer);
  198. auto ptr = pBuffer->Buffer + MessageHead::Size();
  199. Rep.SerializeToArray(ptr, Head.Length);
  200. pBuffer->Length = MessageHead::Size() + Head.Length;
  201. Write(pBuffer);
  202. pBuffer->Release(__FILE__, __LINE__);
  203. }
  204. void CUserHandler::OnRepVideo(int8_t* Data, int16_t Size)
  205. {
  206. std::cout << __FUNCTION__ << std::endl;
  207. remote::CSRep Req;
  208. Req.ParseFromArray(Data, Size);
  209. int32_t peer = Req.peer();
  210. if (Req.desc() == remote::VideoDesc::OK)
  211. _peer = peer;
  212. CUserManager::GetInstance().ReplyPeerVideo(peer, _uid,static_cast<remote::VideoDesc>(Req.desc()), Req.index());
  213. }
  214. void CUserHandler::OnReqVideo(int8_t* Data, int16_t Size)
  215. {
  216. std::cout << __FUNCTION__ << std::endl;
  217. remote::CSReq Req;
  218. Req.ParseFromArray(Data, Size);
  219. int32_t peer = Req.peer();
  220. int32_t index = Req.index();
  221. int32_t type = Req.egotype();
  222. //bool master=Req.master();
  223. if (type==EgoType::Car&&index==RenderPosition::FRONT)
  224. {
  225. if (_peer == _uid || peer == _peer) return;
  226. _peer = peer;
  227. }
  228. auto state = CUserManager::GetInstance().ConnectPeerVideo(peer, _uid,index);
  229. remote::SCReq Rep;
  230. Rep.set_peer(peer);
  231. Rep.set_desc(state);
  232. CIOBuffer* pBuffer = CIOBuffer::Alloc(__FILE__, __LINE__);
  233. MessageHead Head;
  234. Head.Command = remote::sc_req;
  235. Head.Length = Req.ByteSizeLong();
  236. Head.Serialize(pBuffer->Buffer);
  237. auto ptr = pBuffer->Buffer + MessageHead::Size();
  238. Req.SerializeToArray(ptr, Head.Length);
  239. pBuffer->Length = MessageHead::Size() + Head.Length;
  240. Write(pBuffer);
  241. pBuffer->Release(__FILE__, __LINE__);
  242. }
  243. void CUserHandler::OnLeave(int8_t* Data, int16_t Size)
  244. {
  245. remote::Leave Req;
  246. Req.ParseFromArray(Data, Size);
  247. auto type = Req.egotype();
  248. if (_egoType != EgoType::User||type==EgoType::User) return;
  249. int32_t peer = Req.peer();
  250. if (type == EgoType::Car)
  251. {
  252. if (peer != _peer) return;
  253. _peer = -1;
  254. }
  255. CUserManager::GetInstance().LeavePeerVideo(peer, _uid, _egoType);
  256. }
  257. void CUserHandler::Write(CIOBuffer* pBuffer)
  258. {
  259. int32_t offset=0;
  260. while(pBuffer->Length>0)
  261. {
  262. int32_t ret= send(_fd,&pBuffer->Buffer[offset],pBuffer->Length,0);
  263. if(ret<=0) return;
  264. offset+=ret;
  265. pBuffer->Length-=ret;
  266. }
  267. }
  268. EgoType CUserHandler::type()
  269. {
  270. return _egoType;
  271. }
  272. int32_t CUserHandler::uid()
  273. {
  274. return _uid;
  275. }
  276. std::string& CUserHandler::name()
  277. {
  278. return _name;
  279. }
  280. UserState CUserHandler::state()
  281. {
  282. return _state;
  283. }
  284. void CUserHandler::LeaveVideo(int32_t peer,EgoType type)
  285. {
  286. if (_peer != peer) return;
  287. _peer = -1;
  288. remote::Leave Rep;
  289. Rep.set_egotype(type);
  290. Rep.set_peer(peer);
  291. CIOBuffer* pBuffer = CIOBuffer::Alloc(__FILE__, __LINE__);
  292. MessageHead Head;
  293. Head.Command = remote::sc_NotifyLeave;
  294. Head.Length = Rep.ByteSizeLong();
  295. Head.Serialize(pBuffer->Buffer);
  296. auto ptr = pBuffer->Buffer + MessageHead::Size();
  297. Rep.SerializeToArray(ptr, Head.Length);
  298. pBuffer->Length = MessageHead::Size() + Head.Length;
  299. Write(pBuffer);
  300. pBuffer->Release(__FILE__, __LINE__);
  301. }
  302. remote::VideoDesc CUserHandler::ReqVideo(int32_t peer, int32_t index)
  303. {
  304. //if(index!=RenderPosition::FRONT_BACK)
  305. if (_peer!=-1&&_peer != peer) return remote::VideoDesc::Busy;
  306. remote::CSReq Rep;
  307. Rep.set_peer(peer);
  308. Rep.set_index(index);
  309. CIOBuffer* pBuffer = CIOBuffer::Alloc(__FILE__, __LINE__);
  310. MessageHead Head;
  311. Head.Command = remote::sc_NotifyReq;
  312. Head.Length = Rep.ByteSizeLong();
  313. Head.Serialize(pBuffer->Buffer);
  314. auto ptr = pBuffer->Buffer + MessageHead::Size();
  315. Rep.SerializeToArray(ptr, Head.Length);
  316. pBuffer->Length = MessageHead::Size() + Head.Length;
  317. Write(pBuffer);
  318. pBuffer->Release(__FILE__, __LINE__);
  319. return remote::VideoDesc::OK;
  320. }
  321. void CUserHandler::RepVideo(int32_t peer, int32_t index,remote::VideoDesc desc)
  322. {
  323. if (desc != remote::VideoDesc::OK)
  324. {
  325. _peer = -1;
  326. }
  327. remote::CSRep Rep;
  328. Rep.set_desc(desc);
  329. Rep.set_peer(peer);
  330. Rep.set_index(index);
  331. CIOBuffer* pBuffer = CIOBuffer::Alloc(__FILE__, __LINE__);
  332. MessageHead Head;
  333. Head.Command = remote::sc_NotifyRep;
  334. Head.Length = Rep.ByteSizeLong();
  335. Head.Serialize(pBuffer->Buffer);
  336. auto ptr = pBuffer->Buffer + MessageHead::Size();
  337. Rep.SerializeToArray(ptr, Head.Length);
  338. pBuffer->Length = MessageHead::Size() + Head.Length;
  339. Write(pBuffer);
  340. pBuffer->Release(__FILE__, __LINE__);
  341. }
  342. void CUserHandler::NotifyOffer(int32_t peer, int32_t index,const char * type,const char * sdp)
  343. {
  344. // std::cout << sdp << std::endl;
  345. remote::Offer Rep;
  346. Rep.set_peer(peer);
  347. Rep.set_index(index);
  348. Rep.set_type(type);
  349. Rep.set_sdp(sdp);
  350. CIOBuffer* pBuffer = CIOBuffer::Alloc(__FILE__, __LINE__);
  351. MessageHead Head;
  352. Head.Command = remote::sc_NotifyOffer;
  353. Head.Length = Rep.ByteSizeLong();
  354. Head.Serialize(pBuffer->Buffer);
  355. auto ptr = pBuffer->Buffer + MessageHead::Size();
  356. Rep.SerializeToArray(ptr, Head.Length);
  357. pBuffer->Length = MessageHead::Size() + Head.Length;
  358. Write(pBuffer);
  359. pBuffer->Release(__FILE__, __LINE__);
  360. }
  361. void CUserHandler::NotifyCandidate(int32_t peer, int32_t index, const char* type, const char* candidate, int32_t sdp_mline_index, const char* sdp_mid)
  362. {
  363. remote::Candidate Rep;
  364. Rep.set_peer(peer);
  365. Rep.set_index(index);
  366. Rep.set_type(type);
  367. Rep.set_sdpmid(sdp_mid);
  368. Rep.set_candidate(candidate);
  369. Rep.set_sdpmlineindex(sdp_mline_index);
  370. CIOBuffer* pBuffer = CIOBuffer::Alloc(__FILE__, __LINE__);
  371. MessageHead Head;
  372. Head.Command = remote::sc_NotifyCandidate;
  373. Head.Length = Rep.ByteSizeLong();
  374. Head.Serialize(pBuffer->Buffer);
  375. auto ptr = pBuffer->Buffer + MessageHead::Size();
  376. Rep.SerializeToArray(ptr, Head.Length);
  377. pBuffer->Length = MessageHead::Size() + Head.Length;
  378. Write(pBuffer);
  379. pBuffer->Release(__FILE__, __LINE__);
  380. }
  381. int32_t CUserHandler::Fd()
  382. {
  383. return _fd;
  384. }
  385. void CUserHandler::NotifyAnswer(int32_t peer, int32_t index, const char* type, const char* sdp)
  386. {
  387. //std::cout << sdp << std::endl;
  388. remote::Answer Rep;
  389. Rep.set_peer(peer);
  390. Rep.set_index(index);
  391. Rep.set_type(type);
  392. Rep.set_sdp(sdp);
  393. CIOBuffer* pBuffer = CIOBuffer::Alloc(__FILE__, __LINE__);
  394. MessageHead Head;
  395. Head.Command = remote::sc_NotifyAnswer;
  396. Head.Length = Rep.ByteSizeLong();
  397. Head.Serialize(pBuffer->Buffer);
  398. auto ptr = pBuffer->Buffer + MessageHead::Size();
  399. Rep.SerializeToArray(ptr, Head.Length);
  400. pBuffer->Length = MessageHead::Size() + Head.Length;
  401. Write(pBuffer);
  402. pBuffer->Release(__FILE__, __LINE__);
  403. }
  404. void CUserHandler::OnClose()
  405. {
  406. if (_egoType == EgoType::User)
  407. {
  408. remote::SCDelRobot rot;
  409. rot.set_peer(_uid);
  410. CIOBuffer* pBuffer = CIOBuffer::Alloc(__FILE__, __LINE__);
  411. MessageHead Head;
  412. Head.Command = remote::sc_NotifyDel;
  413. Head.Length = rot.ByteSizeLong();
  414. Head.Serialize(pBuffer->Buffer);
  415. auto ptr = pBuffer->Buffer + MessageHead::Size();
  416. rot.SerializeToArray(ptr, Head.Length);
  417. pBuffer->Length = MessageHead::Size() + Head.Length;
  418. CUserManager::GetInstance().BroadCast(pBuffer);
  419. pBuffer->Release(__FILE__, __LINE__);
  420. }
  421. if (_peer != -1)
  422. CUserManager::GetInstance().LeavePeerVideo(_peer, _uid, _egoType);
  423. printf("a socket closed %d", _uid);
  424. CUserManager::GetInstance().Remove(this);
  425. close(_fd);
  426. }