sensor_socket.h 4.5 KB


  1. #pragma once
  2. #ifdef WIN32
  3. #ifndef WIN32_LEAN_AND_MEAN
  4. #define WIN32_LEAN_AND_MEAN
  5. #endif
  6. #include <WS2tcpip.h>
  7. #include <WinSock2.h>
  8. #define socketerrno WSAGetLastError()
  9. #define SOCKET_EAGAIN_EINPROGRESS WSAEINPROGRESS
  10. #define SOCKET_EWOULDBLOCK WSAEWOULDBLOCK
  11. #ifndef _SSIZE_T_DEFINED
  12. typedef int ssize_t;
  13. #define _SSIZE_T_DEFINED
  14. #endif
  15. #ifndef _SOCKET_T_DEFINED
  16. typedef SOCKET socket_t;
  17. #define _SOCKET_T_DEFINED
  18. #endif
  19. #else
  20. #include <unistd.h>
  21. #include <arpa/inet.h>
  22. #include <sys/types.h>
  23. #include <sys/socket.h>
  24. #include <netinet/in.h>
  25. #include <netinet/tcp.h>
  26. #include <fcntl.h>
  27. #include <poll.h>
  28. #define socketerrno errno
  29. #define SOCKET_EAGAIN_EINPROGRESS EAGAIN
  30. #define SOCKET_EWOULDBLOCK EWOULDBLOCK
  31. #define INVALID_SOCKET -1
  32. #define SOCKET_ERROR -1
  33. #ifndef _SOCKET_T_DEFINED
  34. typedef int socket_t;
  35. #define _SOCKET_T_DEFINED
  36. #endif
  37. #endif
  38. #include <thread>
  39. #include <mutex>
  40. #include <iostream>
  41. #include <unordered_map>
  42. #include "../common/comm.h"
  43. #ifdef _MAINDLG
  44. class CThreadWindow;
  45. #else
  46. class CMessageQueue;
  47. #endif
  48. class CIOBuffer;
  49. template<typename T>
  50. class SensorSocket
  51. {
  52. public:
  53. #ifdef _MAINDLG
  54. SensorSocket(CThreadWindow* q, std::string can_ip, int32_t can_port, int32_t host_port);
  55. #else
  56. SensorSocket(CMessageQueue * q,std::string can_ip,int32_t can_port,int32_t host_port);
  57. #endif
  58. bool Start();
  59. void Run();
  60. void Stop();
  61. void Write(int8_t * pBuffer,int32_t size);
  62. T* Get();
  63. #ifndef WIN32
  64. void SetStartRead(bool b);
  65. void SetStartWrite(bool b);
  66. #endif
  67. private:
  68. socket_t _fd;
  69. std::thread _thread;
  70. bool _run;
  71. std::string _canip;
  72. int32_t _canport;
  73. int32_t _hostport;
  74. std::mutex _lock;
  75. std::unordered_map<int32_t, cannet_frame> _message;
  76. std::unique_ptr<T> _sensorNotify;
  77. sockaddr_in _canaddr;
  78. #ifndef WIN32
  79. bool _startRead;
  80. bool _startWrite;
  81. #endif
  82. };
  83. template<typename T>
  84. #ifdef _MAINDLG
  85. SensorSocket<T>::SensorSocket(CThreadWindow* q, std::string can_ip, int32_t can_port, int32_t host_port)
  86. #else
  87. SensorSocket<T>::SensorSocket(CMessageQueue* q, std::string can_ip, int32_t can_port, int32_t host_port)
  88. #endif
  89. {
  90. _sensorNotify=std::make_unique<T>(q);
  91. _canip=can_ip;
  92. _canport=can_port;
  93. _hostport=host_port;
  94. #ifndef WIN32
  95. _startWrite=_startRead=true;
  96. #endif
  97. }
  98. template<typename T>
  99. bool SensorSocket<T>::Start()
  100. {
  101. #ifdef WIN32
  102. WSAData data;
  103. WSAStartup(MAKEWORD(2, 2), &data);
  104. #endif
  105. std::cout<<"SensorSocket<T>::Start :"<<_canip<<","<<_canport<<std::endl;
  106. _sensorNotify->SetSensorSocket(this);
  107. _fd = socket(AF_INET, SOCK_DGRAM, 0);
  108. sockaddr_in sin;
  109. sin.sin_family = AF_INET;
  110. sin.sin_port = htons(_hostport);
  111. sin.sin_addr.s_addr = htonl(INADDR_ANY); //inet_addr("192.168.1.9");
  112. if (::bind(_fd, (sockaddr*)&sin, sizeof(sin)) == -1)
  113. return false;
  114. _canaddr.sin_family=AF_INET;
  115. _canaddr.sin_addr.s_addr=inet_addr(_canip.c_str());
  116. _canaddr.sin_port=htons(_canport);
  117. _thread = std::thread(&SensorSocket::Run, this);
  118. _sensorNotify->Start();
  119. return true;
  120. }
  121. template<typename T>
  122. void SensorSocket<T>::Run()
  123. {
  124. _run = true;
  125. struct pollfd fds[1];
  126. fds[0].fd = _fd;
  127. fds[0].events = POLLIN;
  128. // long long k = 0;
  129. //long long tick = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  130. while (_run)
  131. {
  132. #ifdef WEBRTC_LINUX
  133. if(poll(fds, 1, 1000) > 0)
  134. {
  135. if (fds[0].revents & POLLIN)
  136. {
  137. #endif
  138. CIOBuffer pBuffer;
  139. sockaddr_in from;
  140. socklen_t fromlen=sizeof(sockaddr_in);
  141. int32_t ret = recvfrom(_fd,(char *)pBuffer.Buffer, CIOBuffer::IO_BUFFER_SIZE,0,(sockaddr*)&from,&fromlen);
  142. if (ret <= 0||!_run)
  143. {
  144. continue;
  145. }
  146. _sensorNotify->Notify(pBuffer.Buffer,ret);
  147. #ifdef WEBRTC_LINUX
  148. }
  149. }
  150. #endif
  151. }
  152. std::cout<<"SensorSocket<T>::Run Finished"<<std::endl;
  153. }
  154. template<typename T>
  155. void SensorSocket<T>::Write(int8_t * pBuffer,int32_t size)
  156. {
  157. #ifndef WIN32
  158. if(_startWrite==false) return;
  159. #endif
  160. socklen_t len=sizeof(_canaddr);
  161. int ret=::sendto(_fd,(char *)pBuffer,size,0,(const sockaddr *)&_canaddr,len);
  162. {
  163. // std::cout<<"ret = "<<ret<<" size ="<<pBuffer->Length<<std::endl;
  164. }
  165. }
  166. template<typename T>
  167. void SensorSocket<T>::Stop()
  168. {
  169. if(!_run) return;
  170. _sensorNotify->Stop();
  171. _run = false;
  172. #ifdef WIN32
  173. closesocket(_fd);
  174. #else
  175. close(_fd);
  176. #endif
  177. std::cout<<"SensorSocket<T>::Stop"<<std::endl;
  178. _thread.join();
  179. std::cout<<"SensorSocket<T>::Stop finished"<<std::endl;
  180. }
  181. #ifndef WIN32
  182. template<typename T>
  183. void SensorSocket<T>::SetStartRead(bool b)
  184. {
  185. _startRead=b;
  186. }
  187. template<typename T>
  188. void SensorSocket<T>::SetStartWrite(bool b)
  189. {
  190. _startWrite=b;
  191. }
  192. #endif
  193. template<typename T>
  194. T* SensorSocket<T>::Get()
  195. {
  196. return _sensorNotify.get();
  197. }