Connection_b1.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. # from hub import methods, Global
  2. import struct
  3. import asyncio
  4. import time
  5. # from concurrent.futures import ThreadPoolExecutor
  6. import sys
  7. import importlib
  8. # --- for linux
  9. sys.path.append('/home/server/repositories/repositories/sri-project.demo-py/sri-server-bg03')
  10. sys.path.append('/home/server/repositories/repositories/sri-project.demo-py/3rdparty')
  11. protobuf = importlib.import_module(f"xprotobuf.protocol_pb2")
  12. methods = importlib.import_module(f"xlib")
  13. class SRIConnection(asyncio.Protocol):
  14. """"""
  15. # head_sequence = '<hh' # 字节序规则
  16. # all_connection_dict = {} # {<ipv4>: (socket, update_at, type)} | {<连接id>: (socket对象, 最后一次请求时间, 客户端类型)}
  17. # executor = ThreadPoolExecutor()
  18. clients = {} # {<ipv4>: (socket, update_at, type)} | {<连接id>: (socket对象, 最后一次请求时间, 客户端类型)}
  19. def connection_made(self, client):
  20. self.client = client
  21. self.peername = client.get_extra_info('peername')
  22. print(f"Connection from {self.peername}")
  23. # --- 自定义参数
  24. self.head_sequence = '<hh' # 字节序规则
  25. self.head_size = struct.calcsize(self.head_sequence)
  26. self.connection_id = f"{self.peername[0].replace('.', '')}{self.peername[1]}" # 12700130000
  27. # --- fill ---
  28. self.clients[self.connection_id] = client, methods.now_ts(), 'type=vehicle'
  29. def data_received(self, data):
  30. asyncio.create_task(self.handle_data(data))
  31. async def handle_data(self, data):
  32. loop = asyncio.get_running_loop()
  33. # 全局的 ThreadPoolExecutor,由 asyncio 库自动创建和管理。它的线程池大小默认是机器的CPU核心数乘以5
  34. await loop.run_in_executor(None, self.process_data, data)
  35. # 自定义的 ThreadPoolExecutor
  36. # await loop.run_in_executor(SRIConnection.executor, self.process_data, data)
  37. # result = await asyncio.get_running_loop().run_in_executor(None, self.process_data, data)
  38. # if result:
  39. # self.client.write(result)
  40. def process_data(self, data):
  41. head_data = data[:self.head_size]
  42. body_data = data[self.head_size:]
  43. command_id, body_length = struct.unpack(self.head_sequence, head_data)
  44. methods.debug_log('SRIConnection.37', f"Received values: {command_id}, {body_length}")
  45. if command_id == protobuf.CS_KeepAlive: # 2008
  46. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_command_id: None")
  47. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_send_data: None")
  48. elif command_id == protobuf.CS_Sign: # 2000
  49. # --- 接收请求数据
  50. object = protobuf.CSSign()
  51. object.ParseFromString(body_data)
  52. methods.debug_log(f"{self.connection_id} | SRIConnection.100",
  53. f"Received Response: account={object.account}, password={object.password}")
  54. # --- 发送返回数据
  55. object = protobuf.SCSign()
  56. object.ret = True
  57. object.uid = 112233
  58. object.cid = 223344
  59. object.name = 'aabbcc'
  60. re_command_id = protobuf.SC_Sign # 4000
  61. re_body_length = object.ByteSize()
  62. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  63. re_body_data = object.SerializeToString()
  64. re_send_data = re_head_data + re_body_data
  65. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_command_id: {re_command_id}")
  66. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_send_data: {re_send_data}")
  67. self.client.write(re_send_data)
  68. # --- test ---
  69. # for _ in range(10):
  70. # for i in range(3)[::-1]:
  71. # methods.debug_log(f"{self.connection_id} | SRIConnection.92", f"sleep {i}s!")
  72. # time.sleep(1)
  73. # methods.debug_log(f"{self.connection_id} | SRIConnection.94", f"send re_send_data")
  74. # self.client.write(re_send_data)
  75. elif command_id == protobuf.CS_Add: # 2009
  76. # --- 接收请求数据 2009
  77. """
  78. CSAdd: 消息体
  79. CSAdd.serial: string
  80. CSAdd.type: int32 EgoType::Car | EgoType::None EgoType::User EgoType::Car
  81. CSAdd.name: string
  82. """
  83. object0 = protobuf.CSAdd()
  84. object0.ParseFromString(body_data)
  85. methods.debug_log(f"{self.connection_id} | SRIConnection.105", f"#serial: {object0.serial}")
  86. methods.debug_log(f"{self.connection_id} | SRIConnection.105", f"#name: {object0.name}")
  87. # --- 发送返回数据 4016 todo 向所有 EgoType::User 也就是舱端发送消息
  88. """
  89. Robot: 消息体
  90. Robot.rid: int32
  91. Robot.name: string
  92. Robot.type: int32
  93. Robot.state: RobotState Offline Online Busy
  94. """
  95. o1 = protobuf.Robot()
  96. o1.rid = 112233
  97. o1.name = "aabbcc"
  98. o1.type = 123
  99. o1.state = protobuf.Robot.Online
  100. """
  101. SCAddRobot: 消息体
  102. SCAddRobot.robot: Robot
  103. """
  104. o2 = protobuf.SCAddRobot()
  105. o2.robot.CopyFrom(o1)
  106. re_command_id = protobuf.SC_NotifyAdd # 4016
  107. re_body_length = o2.ByteSize()
  108. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  109. re_body_data = o2.SerializeToString()
  110. re_send_data = re_head_data + re_body_data
  111. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_command_id: {re_command_id}")
  112. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_send_data: {re_send_data}")
  113. self.client.write(re_send_data)
  114. # --- 发送返回数据 4007 todo 向请求来的车端,返回设置是否成功的消息
  115. o1 = protobuf.SCAdd()
  116. o1.ret = True
  117. o1.uid = 112233
  118. o1.name = 'aabbcc'
  119. re_command_id = protobuf.SC_Add # 4007
  120. re_body_length = o1.ByteSize()
  121. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  122. re_body_data = o1.SerializeToString()
  123. re_send_data = re_head_data + re_body_data
  124. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_command_id: {re_command_id}")
  125. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_send_data: {re_send_data}")
  126. self.client.write(re_send_data)
  127. elif command_id == protobuf.CS_Offer: # 2004
  128. # --- 接收请求数据
  129. object = protobuf.Offer()
  130. object.ParseFromString(body_data)
  131. print(f"Received Response: peer={object.peer}")
  132. # --- 发送返回数据
  133. o1 = protobuf.Offer()
  134. o1.index = object.index
  135. o1.peer = object.peer
  136. o1.type = object.type
  137. o1.sdp = object.sdp
  138. re_command_id = protobuf.SC_NotifyOffer # 4012
  139. re_body_length = o1.ByteSize()
  140. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  141. re_body_data = o1.SerializeToString()
  142. re_send_data = re_head_data + re_body_data
  143. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_command_id: {re_command_id}")
  144. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_send_data: {re_send_data}")
  145. self.client.write(re_send_data)
  146. elif command_id == protobuf.CS_Answer: # 2005
  147. # --- 接收请求数据
  148. object = protobuf.Answer()
  149. object.ParseFromString(body_data)
  150. print(f"Received Response: peer={object.peer}")
  151. # --- 发送返回数据
  152. o1 = protobuf.Offer()
  153. o1.index = object.index
  154. o1.peer = object.peer
  155. o1.type = object.type
  156. o1.sdp = object.sdp
  157. re_command_id = protobuf.SC_NotifyAnswer # 4011
  158. re_body_length = o1.ByteSize()
  159. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  160. re_body_data = o1.SerializeToString()
  161. re_send_data = re_head_data + re_body_data
  162. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_command_id: {re_command_id}")
  163. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_send_data: {re_send_data}")
  164. self.client.write(re_send_data)
  165. elif command_id == protobuf.CS_Candidate: # 2006
  166. # --- 接收请求数据
  167. object = protobuf.Candidate()
  168. object.ParseFromString(body_data)
  169. print(f"Received Response: peer={object.peer}")
  170. # --- 发送返回数据
  171. o1 = protobuf.Offer()
  172. o1.index = object.index
  173. o1.peer = object.peer
  174. o1.type = object.type
  175. o1.candidate = object.candidate
  176. o1.sdpMLineIndex = object.sdpMLineIndex
  177. o1.sdpMid = object.sdpMid
  178. o1.egotype = object.egotype
  179. re_command_id = protobuf.SC_NotifyCandidate # 4013
  180. re_body_length = o1.ByteSize()
  181. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  182. re_body_data = o1.SerializeToString()
  183. re_send_data = re_head_data + re_body_data
  184. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_command_id: {re_command_id}")
  185. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_send_data: {re_send_data}")
  186. self.client.write(re_send_data)
  187. elif command_id == protobuf.CS_Req: # 2001 todo 实现收到qt端2001消息,向车端发送4009消息
  188. # --- 接收请求数据
  189. """
  190. CSReq: 消息体
  191. CSReq.peer: int32(uid,用户标识)
  192. CSReq.index: int32(相机位置,RenderPosition)
  193. CSReq.egotype: int32(终端类型,舱端/车端)
  194. """
  195. o1 = protobuf.CSReq()
  196. o1.ParseFromString(body_data)
  197. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"#peer: {o1.peer}")
  198. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"#index: {o1.index}")
  199. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"#egotype: {o1.egotype}")
  200. # --- 发送返回数据
  201. """
  202. CSReq: 消息体
  203. CSReq.peer: int32(uid,用户标识)
  204. CSReq.index: int32(相机位置,RenderPosition)
  205. CSReq.egotype: int32(终端类型,舱端/车端)
  206. """
  207. o2 = protobuf.CSReq()
  208. o2.peer = o1.peer
  209. o2.index = o1.index
  210. o2.type = o1.egotype
  211. re_command_id = protobuf.SC_NotifyReq # 4009
  212. re_body_length = o2.ByteSize()
  213. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  214. re_body_data = o2.SerializeToString()
  215. re_send_data = re_head_data + re_body_data
  216. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_command_id: {re_command_id}")
  217. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_send_data: {re_send_data}")
  218. self.client.write(re_send_data)
  219. def connection_lost(self, exc):
  220. print(f"SRIConnection closed by {self.peername}")
  221. # def connection_lost(self, exc):
  222. # print(f"Connection closed by {self.connection_id}")
  223. # if self.connection_id in SRIConnection.clients:
  224. # del SRIConnection.clients[self.connection_id]
  225. @classmethod
  226. async def run(cls):
  227. loop = asyncio.get_running_loop()
  228. server = await loop.create_server(
  229. lambda: SRIConnection(),
  230. '0.0.0.0', 20917
  231. )
  232. async with server:
  233. print(f"Server listening on 0.0.0.0:20917")
  234. methods.debug_log(f"SRIConnection.221", f"Server listening on 0.0.0.0:20917")
  235. await server.serve_forever()
  236. if __name__ == '__main__':
  237. asyncio.run(SRIConnection.run())
  238. # asyncio.run(main())