socket_remote.cpp 9.9 KB


  1. #include "pch.h"
  2. #include <thread>
  3. #include "api/video/video_frame.h"
  4. #include "remote.pb.h"
  5. #include "socket_remote.h"
  6. #include "../common/iobuffer.h"
  7. SocketRemote::SocketRemote(IRemoteNotify* n) :_notify(n)
  8. {
  9. _connected = false;
  10. }
  11. bool SocketRemote::Start(const char* ip)
  12. {
  13. _ip = ip;
  14. FnMap.insert(std::make_pair(remote::sc_robot, &SocketRemote::OnRobotRep));
  15. FnMap.insert(std::make_pair(remote::sc_NotifyRep, &SocketRemote::OnNotifyRep));
  16. FnMap.insert(std::make_pair(remote::sc_NotifyAdd, &SocketRemote::OnNotifyAdd));
  17. FnMap.insert(std::make_pair(remote::sc_NotifyDel, &SocketRemote::OnNotifyDel));
  18. //FnMap.insert(std::make_pair(remote::sc_NotifyState, &SocketRemote::OnNotifyState));
  19. FnMap.insert(std::make_pair(remote::sc_sigin, &SocketRemote::OnSigin));
  20. FnMap.insert(std::make_pair(remote::sc_NotifyReq, &SocketRemote::OnNotifyReq));
  21. FnMap.insert(std::make_pair(remote::sc_NotifyLeave, &SocketRemote::OnNotifyLeave));
  22. FnMap.insert(std::make_pair(remote::sc_NotifyOffer, &SocketRemote::OnNotifyOffer));
  23. FnMap.insert(std::make_pair(remote::sc_NotifyAnswer, &SocketRemote::OnNotifyAnswer));
  24. FnMap.insert(std::make_pair(remote::sc_NotifyCandidate, &SocketRemote::OnNotifyCandidate));
  25. WSAData data;
  26. WSAStartup(MAKEWORD(2, 2), &data);
  27. sockfd = socket(AF_INET, SOCK_STREAM, 0);
  28. sockaddr_in sin;
  29. sin.sin_family = AF_INET;
  30. sin.sin_port = htons(20410);
  31. sin.sin_addr.s_addr = inet_addr(ip);
  32. if (connect(sockfd, (const sockaddr*)&sin, sizeof(sin)) == SOCKET_ERROR)
  33. {
  34. DWORD error = WSAGetLastError();
  35. closesocket(sockfd);
  36. sockfd = INVALID_SOCKET;
  37. _connected = false;
  38. }
  39. else
  40. {
  41. _connected = true;
  42. }
  43. _notify->OnConnected(_connected);
  44. _thread = std::thread(&SocketRemote::Run, this);
  45. return true;
  46. }
  47. void SocketRemote::OnRobotRep(int8_t* Data, int16_t Size)
  48. {
  49. remote::SCRobot Rep;
  50. Rep.ParseFromArray(Data, Size);
  51. for (int32_t i = 0; i < Rep.robot_size(); i++)
  52. {
  53. auto& node = Rep.robot(i);
  54. _notify->OnRobot(node);
  55. }
  56. }
  57. void SocketRemote::OnSigin(int8_t* Data, int16_t Size)
  58. {
  59. remote::SCSigin Rep;
  60. Rep.ParseFromArray(Data, Size);
  61. if (Rep.ret() == true)
  62. {
  63. _uid = Rep.uid();
  64. }
  65. _notify->OnLogin(_uid, Rep.ret());
  66. }
  67. void SocketRemote::OnNotifyAnswer(int8_t* Data, int16_t Size)
  68. {
  69. remote::Answer Rep;
  70. Rep.ParseFromArray(Data, Size);
  71. _notify->OnVideoAnswer(Rep.index(), Rep.type().c_str(), Rep.sdp().c_str());
  72. }
  73. void SocketRemote::OnNotifyCandidate(int8_t* Data, int16_t Size)
  74. {
  75. remote::Candidate Rep;
  76. Rep.ParseFromArray(Data, Size);
  77. _notify->OnVideoCandidate(Rep.index(), Rep.candidate().c_str(), Rep.sdpmlineindex(), Rep.sdpmid().c_str());
  78. }
  79. void SocketRemote::OnNotifyOffer(int8_t* Data, int16_t Size)
  80. {
  81. remote::Offer Rep;
  82. Rep.ParseFromArray(Data, Size);
  83. _notify->OnVideoOffer(Rep.index(), Rep.type().c_str(), Rep.sdp().c_str());
  84. }
  85. void SocketRemote::OnNotifyRep(int8_t* Data, int16_t Size)
  86. {
  87. remote::CSRep Rep;
  88. Rep.ParseFromArray(Data, Size);
  89. auto ok = Rep.desc() == remote::VideoDesc::OK;
  90. _notify->OnVideoRep(ok, Rep.index(), Rep.peer());
  91. }
  92. void SocketRemote::OnNotifyAdd(int8_t* Data, int16_t Size)
  93. {
  94. remote::SCAddRobot Rep;
  95. Rep.ParseFromArray(Data, Size);
  96. _notify->OnRobot(Rep.robot());
  97. }
  98. void SocketRemote::OnNotifyDel(int8_t* Data, int16_t Size)
  99. {
  100. remote::SCDelRobot Rep;
  101. Rep.ParseFromArray(Data, Size);
  102. _notify->OnNotifyDel(Rep.peer(), (EgoType)(Rep.egotype()));
  103. }
  104. void SocketRemote::OnNotifyReq(int8_t* Data, int16_t Size)
  105. {
  106. remote::CSReq Rep;
  107. Rep.ParseFromArray(Data, Size);
  108. _notify->OnVideoReq(Rep.index(), Rep.peer());
  109. }
  110. void SocketRemote::OnNotifyLeave(int8_t* Data, int16_t Size)
  111. {
  112. remote::Leave Req;
  113. Req.ParseFromArray(Data, Size);
  114. int32_t peer = Req.peer();
  115. EgoType type = static_cast<EgoType>(Req.egotype());
  116. _notify->OnVideoLeave(peer, type);
  117. }
  118. void SocketRemote::Run()
  119. {
  120. _run = true;
  121. int32_t Offset = 0;
  122. CIOBuffer Buffer;
  123. while (_run)
  124. {
  125. if (_connected)
  126. {
  127. auto ret = recv(sockfd, (char*)&Buffer.Buffer[Offset], CIOBuffer::IO_BUFFER_SIZE - Offset, 0);
  128. if (ret <= 0)
  129. {
  130. DWORD error = WSAGetLastError();
  131. closesocket(sockfd);
  132. _connected = false;
  133. _notify->OnConnected(_connected);
  134. }
  135. else
  136. {
  137. Offset += ret;
  138. if (Offset >= MessageHead::Size())
  139. {
  140. bool bNeedMove = false;
  141. MessageHead head;
  142. int8_t* ptr = Buffer.Buffer;
  143. while (true)
  144. {
  145. if (MessageHead::Size() <= Offset)
  146. {
  147. head.Deserialize(ptr);
  148. int32_t length = MessageHead::Size() + head.Length;
  149. if (Offset >= length)
  150. {
  151. int8_t* Data = ptr + MessageHead::Size();
  152. NetProcess(head.Command, Data, head.Length);
  153. ptr += length;
  154. Offset -= length;
  155. }
  156. else
  157. {
  158. bNeedMove = Offset > 0;
  159. if (bNeedMove)
  160. {
  161. std::cout << "need Move " << Offset << std::endl;
  162. }
  163. break;
  164. }
  165. }
  166. else
  167. {
  168. break;
  169. }
  170. }
  171. if (bNeedMove)
  172. {
  173. memmove(Buffer.Buffer, ptr, Offset);
  174. }
  175. }
  176. }
  177. }
  178. else
  179. {
  180. std::this_thread::sleep_for(std::chrono::seconds(1));
  181. sockfd = socket(AF_INET, SOCK_STREAM, 0);
  182. sockaddr_in sin;
  183. sin.sin_family = AF_INET;
  184. sin.sin_port = htons(20410);
  185. sin.sin_addr.s_addr = inet_addr(_ip.c_str());
  186. if (connect(sockfd, (const sockaddr*)&sin, sizeof(sin)) == SOCKET_ERROR)
  187. {
  188. closesocket(sockfd);
  189. _connected = false;
  190. sockfd = INVALID_SOCKET;
  191. continue;
  192. }
  193. else
  194. {
  195. _connected = true;
  196. }
  197. _notify->OnConnected(_connected);
  198. }
  199. }
  200. }
  201. void SocketRemote::NetProcess(int16_t cmd, int8_t* Data, int16_t Size)
  202. {
  203. auto it = FnMap.find(cmd);
  204. if (it != FnMap.end())
  205. {
  206. (this->*it->second)(Data, Size);
  207. }
  208. }
  209. void SocketRemote::Stop()
  210. {
  211. _run = false;
  212. closesocket(sockfd);
  213. sockfd = INVALID_SOCKET;
  214. _thread.join();
  215. }
  216. void SocketRemote::WriteSigin(EgoType type, std::string& account, std::string& pass, std::string& name)
  217. {
  218. remote::CSSigin Req;
  219. Req.set_account(account);
  220. Req.set_pass(pass);
  221. Req.set_name(name);
  222. Req.set_type(type);
  223. MessageHead Head;
  224. CIOBuffer pBuffer;
  225. Head.Command = remote::cs_sigin;
  226. Head.Length = Req.ByteSizeLong();
  227. Head.Serialize(pBuffer.Buffer);
  228. auto ptr = pBuffer.Buffer + MessageHead::Size();
  229. Req.SerializeToArray(ptr, Head.Length);
  230. pBuffer.Length = MessageHead::Size() + Head.Length;
  231. Write(&pBuffer);
  232. }
  233. void SocketRemote::Write(CIOBuffer* pBuffer)
  234. {
  235. if (_connected)
  236. {
  237. int32_t ret = ::send(sockfd, (const char*)pBuffer->Buffer, pBuffer->Length, 0);
  238. if (ret <= 0)
  239. {
  240. closesocket(sockfd);
  241. _connected = false;
  242. }
  243. }
  244. }
  245. void SocketRemote::WriteVideoReq(int32_t peer, int32_t index)
  246. {
  247. //_peer = peer;
  248. remote::CSReq Req;
  249. Req.set_peer(peer);
  250. Req.set_index(index);
  251. Req.set_egotype(EgoType::User);
  252. MessageHead Head;
  253. CIOBuffer pBuffer;
  254. Head.Command = remote::cs_req;
  255. Head.Length = Req.ByteSizeLong();
  256. Head.Serialize(pBuffer.Buffer);
  257. auto ptr = pBuffer.Buffer + MessageHead::Size();
  258. Req.SerializeToArray(ptr, Head.Length);
  259. pBuffer.Length = MessageHead::Size() + Head.Length;
  260. Write(&pBuffer);
  261. }
  262. void SocketRemote::WriteUserList(void)
  263. {
  264. remote::CSRobot Req;
  265. MessageHead Head;
  266. CIOBuffer pBuffer;
  267. Head.Command = remote::cs_robot;
  268. Head.Length = Req.ByteSizeLong();
  269. Head.Serialize(pBuffer.Buffer);
  270. auto ptr = pBuffer.Buffer + MessageHead::Size();
  271. Req.SerializeToArray(ptr, Head.Length);
  272. pBuffer.Length = MessageHead::Size() + Head.Length;
  273. Write(&pBuffer);
  274. }
  275. void SocketRemote::WriteVideoRep(int32_t peer, remote::VideoDesc desc, int32_t index)
  276. {
  277. // _peer = peer;
  278. remote::CSRep Req;
  279. Req.set_peer(peer);
  280. Req.set_desc(desc);
  281. Req.set_index(index);
  282. MessageHead Head;
  283. CIOBuffer pBuffer;
  284. Head.Command = remote::cs_rep;
  285. Head.Length = Req.ByteSizeLong();
  286. Head.Serialize(pBuffer.Buffer);
  287. auto ptr = pBuffer.Buffer + MessageHead::Size();
  288. Req.SerializeToArray(ptr, Head.Length);
  289. pBuffer.Length = MessageHead::Size() + Head.Length;
  290. Write(&pBuffer);
  291. }
  292. void SocketRemote::WriteKeepAlive()
  293. {
  294. MessageHead Head;
  295. CIOBuffer pBuffer;
  296. Head.Command = remote::cs_keepAlive;
  297. Head.Length = 0;
  298. Head.Serialize(pBuffer.Buffer);
  299. pBuffer.Length = MessageHead::Size() + Head.Length;
  300. Write(&pBuffer);
  301. }
  302. void SocketRemote::WriteOffer(int32_t peer, int32_t index, const char* type, const char* sdp)
  303. {
  304. remote::Offer Req;
  305. Req.set_peer(peer);
  306. Req.set_sdp(sdp);
  307. Req.set_type(type);
  308. Req.set_index(index);
  309. MessageHead Head;
  310. CIOBuffer pBuffer;
  311. Head.Command = remote::cs_offer;
  312. Head.Length = Req.ByteSizeLong();
  313. Head.Serialize(pBuffer.Buffer);
  314. auto ptr = pBuffer.Buffer + MessageHead::Size();
  315. Req.SerializeToArray(ptr, Head.Length);
  316. pBuffer.Length = MessageHead::Size() + Head.Length;
  317. Write(&pBuffer);
  318. }
  319. void SocketRemote::WriteAnswer(int32_t peer, int32_t index, const char* type, const char* sdp)
  320. {
  321. remote::Answer Req;
  322. Req.set_peer(peer);
  323. Req.set_sdp(sdp);
  324. Req.set_type(type);
  325. Req.set_index(index);
  326. MessageHead Head;
  327. CIOBuffer pBuffer;
  328. Head.Command = remote::cs_answer;
  329. Head.Length = Req.ByteSizeLong();
  330. Head.Serialize(pBuffer.Buffer);
  331. auto ptr = pBuffer.Buffer + MessageHead::Size();
  332. Req.SerializeToArray(ptr, Head.Length);
  333. pBuffer.Length = MessageHead::Size() + Head.Length;
  334. Write(&pBuffer);
  335. }
  336. void SocketRemote::WriteCandidate(int32_t peer, int32_t index, const char* candidate, int32_t sdp_mline_index, const char* sdp_mid)
  337. {
  338. remote::Candidate Req;
  339. Req.set_peer(peer);
  340. Req.set_candidate(candidate);
  341. Req.set_index(index);
  342. Req.set_sdpmid(sdp_mid);
  343. Req.set_sdpmlineindex(sdp_mline_index);
  344. MessageHead Head;
  345. CIOBuffer pBuffer;
  346. Head.Command = remote::cs_candidate;
  347. Head.Length = Req.ByteSizeLong();
  348. Head.Serialize(pBuffer.Buffer);
  349. auto ptr = pBuffer.Buffer + MessageHead::Size();
  350. Req.SerializeToArray(ptr, Head.Length);
  351. pBuffer.Length = MessageHead::Size() + Head.Length;
  352. Write(&pBuffer);
  353. }
  354. void SocketRemote::WriteVideoLeave(int32_t peer)
  355. {
  356. remote::Leave Req;
  357. Req.set_peer(peer);
  358. MessageHead Head;
  359. CIOBuffer pBuffer;
  360. Head.Command = remote::cs_leave;
  361. Head.Length = Req.ByteSizeLong();
  362. Head.Serialize(pBuffer.Buffer);
  363. auto ptr = pBuffer.Buffer + MessageHead::Size();
  364. Req.SerializeToArray(ptr, Head.Length);
  365. pBuffer.Length = MessageHead::Size() + Head.Length;
  366. Write(&pBuffer);
  367. }