socket_client.cpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550
  1. #include "../common/iobuffer.h"
  2. #include "protocol.pb.h"
  3. #include "socket_client.h"
  4. #include <iostream>
  5. #include <memory.h>
  6. SocketClient::SocketClient(INativeNotify* n):_notify(n)
  7. {
  8. _connected = false;
  9. }
  10. //bool SocketClient::Start(const char * ip)
  11. bool SocketClient::Start(const char* ip, int32_t RemotePort, int32_t HostPort)
  12. {
  13. #ifdef WIN32
  14. FnMap.insert(std::make_pair(RemoNet::SC_Sign,&SocketClient::OnSigin));
  15. FnMap.insert(std::make_pair(RemoNet::SC_Robot, &SocketClient::OnRobotRep));
  16. FnMap.insert(std::make_pair(RemoNet::SC_NotifyRep, &SocketClient::OnNotifyRep));
  17. FnMap.insert(std::make_pair(RemoNet::SC_NotifyAdd, &SocketClient::OnNotifyAdd));
  18. FnMap.insert(std::make_pair(RemoNet::SC_NotifyDel, &SocketClient::OnNotifyDel));
  19. FnMap.insert(std::make_pair(RemoNet::SC_KickOff, &SocketClient::OnNotifyKick));
  20. FnMap.insert(std::make_pair(RemoNet::SC_MoveEnd, &SocketClient::OnNotifyMoveEnd));
  21. FnMap.insert(std::make_pair(RemoNet::SC_MoveRet, &SocketClient::OnNotifyMoveRet));
  22. FnMap.insert(std::make_pair(RemoNet::SC_State, &SocketClient::OnNotifyState));
  23. #else
  24. FnMap.insert(std::make_pair(RemoNet::SC_Add, &SocketClient::OnAdd));
  25. FnMap.insert(std::make_pair(RemoNet::SC_NotifyReq, &SocketClient::OnNotifyReq));
  26. FnMap.insert(std::make_pair(RemoNet::SC_MoveBegin, &SocketClient::OnNotifyMoveBegin));
  27. FnMap.insert(std::make_pair(RemoNet::SC_SwitchDriver, &SocketClient::OnNotifySwitchDriver));
  28. #endif
  29. FnMap.insert(std::make_pair(RemoNet::SC_NotifyLeave, &SocketClient::OnNotifyLeave));
  30. FnMap.insert(std::make_pair(RemoNet::SC_NotifyOffer, &SocketClient::OnNotifyOffer));
  31. FnMap.insert(std::make_pair(RemoNet::SC_NotifyAnswer, &SocketClient::OnNotifyAnswer));
  32. FnMap.insert(std::make_pair(RemoNet::SC_NotifyCandidate, &SocketClient::OnNotifyCandidate));
  33. _ip=ip;
  34. #ifdef WIN32
  35. WSAData data;
  36. WSAStartup(MAKEWORD(2, 2), &data);
  37. #endif
  38. sockfd = socket(AF_INET, SOCK_STREAM, 0);
  39. sockaddr_in sin;
  40. sin.sin_family = AF_INET;
  41. //RemotePort 27500
  42. //HostPort 27501
  43. sin.sin_port = htons(RemotePort);
  44. sin.sin_addr.s_addr = inet_addr(ip);
  45. sockaddr_in Hostsin;
  46. Hostsin.sin_family = AF_INET;
  47. Hostsin.sin_port = htons(HostPort);
  48. Hostsin.sin_addr.s_addr = INADDR_ANY;
  49. if (bind(sockfd, (const sockaddr*)&Hostsin, sizeof(Hostsin)) == SOCKET_ERROR)
  50. printf("Tcp bind port error!\r\n");
  51. if (connect(sockfd, (const sockaddr*)&sin, sizeof(sin)) == SOCKET_ERROR)
  52. {
  53. #ifdef WIN32
  54. DWORD error = WSAGetLastError();
  55. closesocket(sockfd);
  56. #else
  57. close(sockfd);
  58. #endif
  59. sockfd = INVALID_SOCKET;
  60. _connected=false;
  61. // return false;
  62. }
  63. else
  64. {
  65. _connected = true;
  66. }
  67. _notify->OnConnected(_connected);
  68. // int flag = 1;
  69. // setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag,
  70. // sizeof(flag)); // Disable Nagle's algorithm
  71. #ifdef WIN32
  72. // u_long on = 1;
  73. // ioctlsocket(sockfd, FIONBIO, &on);
  74. #else
  75. // fcntl(sockfd, F_SETFL, O_NONBLOCK);
  76. #endif
  77. _thread = std::thread(&SocketClient::Run, this);
  78. return true;
  79. }
  80. void SocketClient::Run()
  81. {
  82. _run = true;
  83. int32_t Offset = 0;
  84. CIOBuffer Buffer;
  85. while (_run)
  86. {
  87. if (_connected)
  88. {
  89. auto ret = recv(sockfd, (char*)&Buffer.Buffer[Offset], CIOBuffer::IO_BUFFER_SIZE - Offset, 0);
  90. if (ret <= 0)
  91. {
  92. #ifdef WIN32
  93. DWORD error = WSAGetLastError();
  94. closesocket(sockfd);
  95. #else
  96. close(sockfd);
  97. #endif
  98. _connected = false;
  99. _notify->OnConnected(_connected);
  100. }
  101. else
  102. {
  103. Offset += ret;
  104. if (Offset >= MessageHead::Size())
  105. {
  106. bool bNeedMove = false;
  107. MessageHead head;
  108. int8_t* ptr = Buffer.Buffer;
  109. while (true)
  110. {
  111. if (MessageHead::Size() <= Offset)
  112. {
  113. head.Deserialize(ptr);
  114. int32_t length = MessageHead::Size() + head.Length;
  115. if (Offset >= length)
  116. {
  117. int8_t* Data = ptr + MessageHead::Size();
  118. NetProcess(head.Command, Data, head.Length);
  119. ptr += length;
  120. Offset -= length;
  121. }
  122. else
  123. {
  124. bNeedMove = Offset > 0;
  125. if(bNeedMove)
  126. {
  127. std::cout<<"need Move "<<Offset<<std::endl;
  128. }
  129. break;
  130. }
  131. }
  132. else
  133. {
  134. break;
  135. }
  136. }
  137. if (bNeedMove)
  138. {
  139. memmove(Buffer.Buffer, ptr, Offset);
  140. }
  141. }
  142. }
  143. }
  144. else
  145. {
  146. std::this_thread::sleep_for(std::chrono::seconds(1));
  147. sockfd = socket(AF_INET, SOCK_STREAM, 0);
  148. sockaddr_in sin;
  149. sin.sin_family = AF_INET;
  150. sin.sin_port = htons(27500);
  151. sin.sin_addr.s_addr = inet_addr(_ip.c_str());
  152. if (connect(sockfd, (const sockaddr*)&sin, sizeof(sin)) == SOCKET_ERROR)
  153. {
  154. #ifdef WIN32
  155. closesocket(sockfd);
  156. #else
  157. close(sockfd);
  158. #endif
  159. sockfd = INVALID_SOCKET;
  160. continue;
  161. }
  162. _connected = true;
  163. _notify->OnConnected(_connected);
  164. }
  165. }
  166. }
  167. void SocketClient::Stop()
  168. {
  169. _run = false;
  170. #ifdef WIN32
  171. closesocket(sockfd);
  172. #else
  173. close(sockfd);
  174. #endif
  175. sockfd = INVALID_SOCKET;
  176. _thread.join();
  177. }
  178. void SocketClient::NetProcess(int16_t cmd, int8_t* Data, int16_t Size)
  179. {
  180. auto it = FnMap.find(cmd);
  181. if (it != FnMap.end())
  182. {
  183. (this->*it->second)(Data, Size);
  184. }
  185. }
  186. #ifdef WIN32
  187. void SocketClient::OnSigin(int8_t* Data, int16_t Size)
  188. {
  189. RemoNet::SCSign Rep;
  190. Rep.ParseFromArray(Data, Size);
  191. if (Rep.ret() == true)
  192. {
  193. _uid = Rep.uid();
  194. }
  195. _notify->OnSigin(_uid,Rep.ret());
  196. }
  197. void SocketClient::OnRobotRep(int8_t* Data, int16_t Size)
  198. {
  199. RemoNet::SCRobot Rep;
  200. Rep.ParseFromArray(Data, Size);
  201. for (int32_t i = 0; i < Rep.robot_size(); i++)
  202. {
  203. auto& node = Rep.robot(i);
  204. _notify->OnRobot(node);
  205. }
  206. }
  207. void SocketClient::WriteRobotReq()
  208. {
  209. RemoNet::CSRobot Req;
  210. MessageHead Head;
  211. CIOBuffer pBuffer;
  212. Head.Command = RemoNet::CS_Robot;
  213. Head.Length = Req.ByteSizeLong();
  214. Head.Serialize(pBuffer.Buffer);
  215. auto ptr = pBuffer.Buffer + MessageHead::Size();
  216. Req.SerializeToArray(ptr, Head.Length);
  217. pBuffer.Length = MessageHead::Size() + Head.Length;
  218. Write(&pBuffer);
  219. }
  220. void SocketClient::WriteSign(const char* account, const char* password)
  221. {
  222. RemoNet::CSSign Req;
  223. Req.set_account(account);
  224. Req.set_password(password);
  225. MessageHead Head;
  226. CIOBuffer pBuffer;
  227. Head.Command = RemoNet::CS_Sign;
  228. Head.Length = Req.ByteSizeLong();
  229. Head.Serialize(pBuffer.Buffer);
  230. auto ptr = pBuffer.Buffer + MessageHead::Size();
  231. Req.SerializeToArray(ptr, Head.Length);
  232. pBuffer.Length = MessageHead::Size() + Head.Length;
  233. Write(&pBuffer);
  234. }
  235. void SocketClient::WriteVideoLeave(EgoType type, int32_t peer)
  236. {
  237. RemoNet::Leave Req;
  238. Req.set_peer(peer);
  239. Req.set_egotype(type);
  240. MessageHead Head;
  241. CIOBuffer pBuffer;
  242. Head.Command = RemoNet::CS_Leave;
  243. Head.Length = Req.ByteSizeLong();
  244. Head.Serialize(pBuffer.Buffer);
  245. auto ptr = pBuffer.Buffer + MessageHead::Size();
  246. Req.SerializeToArray(ptr, Head.Length);
  247. pBuffer.Length = MessageHead::Size() + Head.Length;
  248. Write(&pBuffer);
  249. }
  250. #else
  251. void SocketClient::OnAdd(int8_t* Data, int16_t Size)
  252. {
  253. RemoNet::SCAdd Rep;
  254. Rep.ParseFromArray(Data, Size);
  255. if (Rep.ret() == true)
  256. {
  257. _uid = Rep.uid();
  258. }
  259. _notify->OnAdd(_uid,Rep.ret());
  260. }
  261. void SocketClient::WriteAddRobot(std::string& serial,std::string& name,std::string url,int32_t type,int32_t car)
  262. {
  263. RemoNet::CSAdd Req;
  264. Req.set_serial(serial.c_str());
  265. Req.set_name(name.c_str());
  266. Req.set_type(type);
  267. Req.set_car(car);
  268. MessageHead Head;
  269. CIOBuffer pBuffer;
  270. Head.Command = RemoNet::CS_Add;
  271. Head.Length = Req.ByteSizeLong();
  272. Head.Serialize(pBuffer.Buffer);
  273. auto ptr = pBuffer.Buffer + MessageHead::Size();
  274. Req.SerializeToArray(ptr, Head.Length);
  275. pBuffer.Length = MessageHead::Size() + Head.Length;
  276. Write(&pBuffer);
  277. }
  278. void SocketClient::OnNotifySwitchDriver(int8_t* Data, int16_t Size)
  279. {
  280. _notify->OnSwitchDriver();
  281. }
  282. #endif
  283. void SocketClient::OnNotifyAnswer(int8_t* Data, int16_t Size)
  284. {
  285. RemoNet::Answer Rep;
  286. Rep.ParseFromArray(Data, Size);
  287. _notify->OnVideoAnswer(Rep.index(), Rep.type().c_str(), Rep.sdp().c_str());
  288. }
  289. void SocketClient::OnNotifyCandidate(int8_t* Data, int16_t Size)
  290. {
  291. RemoNet::Candidate Rep;
  292. Rep.ParseFromArray(Data, Size);
  293. _notify->OnVideoCandidate(Rep.index(), Rep.candidate().c_str(), Rep.sdpmlineindex(), Rep.sdpmid().c_str());
  294. }
  295. #ifdef WIN32
  296. void SocketClient::OnNotifyState(int8_t* Data, int16_t Size)
  297. {
  298. RemoNet::SCState Rep;
  299. Rep.ParseFromArray(Data, Size);
  300. _notify->OnNotifyState(Rep.uid(), (UserState)Rep.state());
  301. }
  302. #endif
  303. void SocketClient::OnNotifyOffer(int8_t* Data, int16_t Size)
  304. {
  305. RemoNet::Offer Rep;
  306. Rep.ParseFromArray(Data, Size);
  307. _notify->OnVideoOffer(Rep.index(), Rep.type().c_str(), Rep.sdp().c_str());
  308. }
  309. #ifdef WIN32
  310. void SocketClient::OnNotifyRep(int8_t* Data, int16_t Size)
  311. {
  312. RemoNet::CSRep Rep;
  313. Rep.ParseFromArray(Data, Size);
  314. auto ok = Rep.desc() == RemoNet::VideoDesc::OK;
  315. _notify->OnVideoRep(ok,Rep.index(),Rep.peer());
  316. }
  317. void SocketClient::OnNotifyAdd(int8_t* Data, int16_t Size)
  318. {
  319. RemoNet::SCAddRobot Rep;
  320. Rep.ParseFromArray(Data, Size);
  321. _notify->OnRobot(Rep.robot());
  322. }
  323. void SocketClient::OnNotifyDel(int8_t* Data, int16_t Size)
  324. {
  325. RemoNet::SCDelRobot Rep;
  326. Rep.ParseFromArray(Data, Size);
  327. _notify->OnNotifyDel(Rep.peer(),(EgoType)(Rep.egotype()));
  328. }
  329. void SocketClient::OnNotifyKick(int8_t* Data, int16_t Size)
  330. {
  331. _notify->OnNotifyKick();
  332. }
  333. void SocketClient::OnNotifyMoveEnd(int8_t* Data, int16_t Size)
  334. {
  335. RemoNet::SCMoveEnd Rep;
  336. Rep.ParseFromArray(Data, Size);
  337. int32_t uid = Rep.uid();
  338. WorkArea area = static_cast<WorkArea>(Rep.area());
  339. int32_t no = Rep.no();
  340. _notify->OnMoveEnd(uid, area, no);
  341. }
  342. void SocketClient::OnNotifyMoveRet(int8_t* Data, int16_t Size)
  343. {
  344. RemoNet::MoveRet Rep;
  345. Rep.ParseFromArray(Data, Size);
  346. MoveDesc desc = (MoveDesc)Rep.desc();
  347. _notify->OnNotifyMoveRet(desc);
  348. }
  349. #else
  350. void SocketClient::OnNotifyReq(int8_t* Data, int16_t Size)
  351. {
  352. RemoNet::CSReq Rep;
  353. Rep.ParseFromArray(Data, Size);
  354. _notify->OnVideoReq(Rep.index(),Rep.peer());
  355. }
  356. void SocketClient::OnNotifyMoveBegin(int8_t* Data, int16_t Size)
  357. {
  358. RemoNet::SCMoveBegin Rep;
  359. Rep.ParseFromArray(Data, Size);
  360. int32_t are=Rep.area();
  361. int32_t no=Rep.no();
  362. _notify->OnMoveBegin(static_cast<WorkArea>(Rep.area()), Rep.no());
  363. }
  364. #endif
  365. void SocketClient::OnNotifyLeave(int8_t* Data, int16_t Size)
  366. {
  367. RemoNet::Leave Req;
  368. Req.ParseFromArray(Data, Size);
  369. int32_t peer = Req.peer();
  370. EgoType type = static_cast<EgoType>(Req.egotype());
  371. _notify->OnVideoLeave(peer,type);
  372. }
  373. void SocketClient::WriteVideoReq(int32_t peer,int32_t index)
  374. {
  375. //_peer = peer;
  376. RemoNet::CSReq Req;
  377. Req.set_peer(peer);
  378. Req.set_index(index);
  379. Req.set_egotype(EgoType::Car);
  380. MessageHead Head;
  381. CIOBuffer pBuffer;
  382. Head.Command = RemoNet::CS_Req;
  383. Head.Length = Req.ByteSizeLong();
  384. Head.Serialize(pBuffer.Buffer);
  385. auto ptr = pBuffer.Buffer + MessageHead::Size();
  386. Req.SerializeToArray(ptr, Head.Length);
  387. pBuffer.Length = MessageHead::Size() + Head.Length;
  388. Write(&pBuffer);
  389. }
  390. void SocketClient::WriteVideoRep(int32_t peer,RemoNet::VideoDesc desc,int32_t index)
  391. {
  392. // _peer = peer;
  393. RemoNet::CSRep Req;
  394. Req.set_peer(peer);
  395. Req.set_desc(desc);
  396. Req.set_index(index);
  397. MessageHead Head;
  398. CIOBuffer pBuffer;
  399. Head.Command = RemoNet::CS_Rep;
  400. Head.Length = Req.ByteSizeLong();
  401. Head.Serialize(pBuffer.Buffer);
  402. auto ptr = pBuffer.Buffer + MessageHead::Size();
  403. Req.SerializeToArray(ptr, Head.Length);
  404. pBuffer.Length = MessageHead::Size() + Head.Length;
  405. Write(&pBuffer);
  406. }
  407. void SocketClient::Write(CIOBuffer* pBuffer)
  408. {
  409. if (_connected)
  410. {
  411. int32_t ret=::send(sockfd, (const char *)pBuffer->Buffer, pBuffer->Length, 0);
  412. if (ret <= 0)
  413. {
  414. #ifdef WIN32
  415. closesocket(sockfd);
  416. #else
  417. close(sockfd);
  418. #endif
  419. _connected = false;
  420. }
  421. }
  422. }
  423. void SocketClient::WriteOffer(int32_t peer,int32_t index,const char* type, const char* sdp)
  424. {
  425. RemoNet::Offer Req;
  426. Req.set_peer(peer);
  427. Req.set_sdp(sdp);
  428. Req.set_type(type);
  429. Req.set_index(index);
  430. MessageHead Head;
  431. CIOBuffer pBuffer;
  432. Head.Command = RemoNet::CS_Offer;
  433. Head.Length = Req.ByteSizeLong();
  434. Head.Serialize(pBuffer.Buffer);
  435. auto ptr = pBuffer.Buffer + MessageHead::Size();
  436. Req.SerializeToArray(ptr, Head.Length);
  437. pBuffer.Length = MessageHead::Size() + Head.Length;
  438. Write(&pBuffer);
  439. }
  440. void SocketClient::WriteAnswer(int32_t peer,int32_t index, const char* type, const char* sdp)
  441. {
  442. RemoNet::Answer Req;
  443. Req.set_peer(peer);
  444. Req.set_sdp(sdp);
  445. Req.set_type(type);
  446. Req.set_index(index);
  447. MessageHead Head;
  448. CIOBuffer pBuffer;
  449. Head.Command = RemoNet::CS_Answer;
  450. Head.Length = Req.ByteSizeLong();
  451. Head.Serialize(pBuffer.Buffer);
  452. auto ptr = pBuffer.Buffer + MessageHead::Size();
  453. Req.SerializeToArray(ptr, Head.Length);
  454. pBuffer.Length = MessageHead::Size() + Head.Length;
  455. Write(&pBuffer);
  456. }
  457. void SocketClient::WriteCandidate(int32_t peer,int32_t index, const char* candidate, int32_t sdp_mline_index, const char* sdp_mid)
  458. {
  459. RemoNet::Candidate Req;
  460. Req.set_peer(peer);
  461. Req.set_candidate(candidate);
  462. Req.set_index(index);
  463. Req.set_sdpmid(sdp_mid);
  464. Req.set_sdpmlineindex(sdp_mline_index);
  465. MessageHead Head;
  466. CIOBuffer pBuffer;
  467. Head.Command = RemoNet::CS_Candidate;
  468. Head.Length = Req.ByteSizeLong();
  469. Head.Serialize(pBuffer.Buffer);
  470. auto ptr = pBuffer.Buffer + MessageHead::Size();
  471. Req.SerializeToArray(ptr, Head.Length);
  472. pBuffer.Length = MessageHead::Size() + Head.Length;
  473. Write(&pBuffer);
  474. }
  475. /*
  476. void SocketClient::MessageCallback(void * user_data,const void * data,const int32_t size)
  477. {
  478. SocketClient* lhs=static_cast<SocketClient*>(user_data);
  479. lhs->OnPeerMessage(data,size);
  480. }*/
  481. void SocketClient::OnPeerMessage(ChannelType type,int16_t cmd,int16_t length,const void * data)
  482. {
  483. _notify->OnMessageFrameNotify(type,cmd,length,data);
  484. }
  485. void SocketClient::WriteKeepAlive()
  486. {
  487. MessageHead Head;
  488. CIOBuffer pBuffer;
  489. Head.Command = RemoNet::CS_KeepAlive;
  490. Head.Length = 0;
  491. Head.Serialize(pBuffer.Buffer);
  492. pBuffer.Length = MessageHead::Size() + Head.Length;
  493. Write(&pBuffer);
  494. }