Connection_d1.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602
  1. """
TODO
1. Known issue: a message can be handled more than once because incoming data is processed asynchronously.
  4. """
  5. # from hub import methods, Global
  6. import struct
  7. import asyncio
  8. import time
  9. import socket
  10. import sys
  11. import importlib
# --- for linux
# Absolute, machine-specific checkout paths appended so the two dynamic imports below resolve.
sys.path.append('/home/sri/repositories/repositories/sri-project.demo-py/sri-dino-pyserver01')
sys.path.append('/home/sri/repositories/repositories/sri-project.demo-py/sri-pysdk')
protobuf = importlib.import_module(f"xprotobuf.protocol_pb2")  # protobuf message classes and command-id constants
methods = importlib.import_module(f"xlib")  # helper module; this file uses debug_log, now_ts and trace_log from it
clients = {}  # {connection_id: SRIConnection}; connection_made stores the protocol instance itself
# Vehicle serial number -> rid (the vehicle's unique id on the wire).
serial_rid_dict = {
    '65F7171A-5585-46C7-A9D6-967ABA9EB223': 1000000,
    '7AF3F619-5067-4EE0-A710-89A6CB097EFE': 1000001,
    'ECB93A87-560B-4022-8C5F-CBF9FE1E596A': 1000002,
    # NOTE(review): same rid as the entry above while 1000003 is unused; confirm this is intentional.
    'C0D14B6F-0FF0-4B68-877D-D2CB886FCD0E': 1000002,
    'E537DDFB-6E3E-4E1A-AD18-AC21393BE300': 1000004,  # currently in use
}
# Account name -> uid. NOTE(review): not referenced by the visible code; message2000 hard-codes uid 3.
account_uid_dict = {
    'ego': 3
}
class SRIConnection(asyncio.Protocol):
    """
    asyncio protocol instance for one TCP connection.

    Incoming bytes are split into frames made of a fixed header (command id,
    body length) followed by a body, and each frame is dispatched to the
    method named ``message<command_id>``.
    """
    head_sequence = '<hh'  # header layout: little-endian, two signed 16-bit ints (command id, body length)
    head_size = struct.calcsize(head_sequence)  # header size in bytes
    message_data = b''  # initial receive buffer; process_data rebinds it on the instance via `+=`
  33. def connection_made(self, client):
  34. """
  35. 建立客户端连接
  36. """
  37. peername = client.get_extra_info('peername')
  38. self.connection_id = f"{peername[0].replace('.', '')}-{peername[1]}" # 12700130000 客户端id
  39. self.client = client
  40. # self.data = b''
  41. self.client_type = None
  42. self.client_info = None
  43. self.update_at = methods.now_ts()
  44. clients[self.connection_id] = self
  45. # 获取底层 socket
  46. # sock = self.client.get_extra_info('socket')
  47. # 查看缓冲区大小
  48. # recv_buffer_size = sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
  49. # send_buffer_size = sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
  50. # methods.debug_log(f'{self.connection_id}|SRIConnection70', f"Receive buffer size: {recv_buffer_size}")
  51. # methods.debug_log(f'{self.connection_id}|SRIConnection70', f"Send buffer size: {send_buffer_size}")
  52. def connection_lost(self, exc):
  53. """
  54. 关闭连接
  55. """
  56. methods.debug_log(f"{self.connection_id}|SRIConnection085", f"连接已关闭")
  57. # --- 处理车端掉线,通知所有舱端
  58. if self.client_type == 'vehicle':
  59. """
  60. SCDelRobot: 消息体
  61. SCDelRobot.peer: int32
  62. SCDelRobot.egotype: int32
  63. """
  64. o2 = protobuf.SCDelRobot()
  65. o2.peer = self.client_info.get('rid') # 车端唯一标识
  66. o2.egotype = self.client_info.get('egotype')
  67. re_command_id = protobuf.SC_NotifyDel # 4017
  68. re_body_length = o2.ByteSize()
  69. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  70. re_body_data = o2.SerializeToString()
  71. re_send_data = re_head_data + re_body_data
  72. methods.debug_log(f"{self.connection_id}|SRIConnection085", f"re_command_id: {re_command_id}")
  73. for item in clients.values():
  74. if item.client_type == 'cockpit':
  75. item.client.write(re_send_data)
  76. # --- clean
  77. if self.connection_id in clients:
  78. del clients[self.connection_id]
  79. def data_received(self, data):
  80. """
  81. 消息处理
  82. """
  83. # methods.debug_log(f'{self.connection_id}|SRIConnection96', f"data length: {len(data)}")
  84. asyncio.create_task(self.handle_data(data))
  85. async def handle_data(self, data):
  86. """
  87. 消息处理
  88. """
  89. # methods.debug_log(f'{self.connection_id}|SRIConnection102', f"data length: {len(data)}")
  90. loop = asyncio.get_running_loop()
  91. await loop.run_in_executor(None, self.process_data, data)
  92. def process_data(self, data):
  93. """
  94. 消息处理
  95. """
  96. try:
  97. methods.debug_log(f"{self.connection_id}|SRIConnection114", f"receive: {len(data)}, message: {repr(data)}")
  98. self.message_data += data # 执行了这个以后
  99. methods.debug_log(f"{self.connection_id}|SRIConnection114", f"--- 1")
  100. methods.debug_log(f"{self.connection_id}|SRIConnection114", f"--- 2")
  101. methods.debug_log(f"{self.connection_id}|SRIConnection114", f"--- 3")
  102. count = 0
  103. while True:
  104. # --- print
  105. count += 1
  106. methods.debug_log(f"{self.connection_id}|SRIConnection123",
  107. f"---------------------------------- while count: {count}")
  108. # 确保有足够的字节来处理消息头
  109. if len(self.message_data) < self.head_size:
  110. break
  111. # 获取消息头
  112. head_data = self.message_data[:self.head_size]
  113. command_id, body_length = struct.unpack(self.head_sequence, head_data)
  114. methods.debug_log(f'{self.connection_id}|SRIConnection130',
  115. f"command_id: {command_id}, body_length: {body_length}")
  116. # 检查命令ID是否有效
  117. if not (2000 <= command_id < 5000):
  118. self.message_data = b'' # 清空无效数据
  119. break
  120. # 检查完整消息是否接收完毕
  121. total_length = self.head_size + body_length
  122. if len(self.message_data) < total_length:
  123. break # 数据不完整,等待更多数据
  124. # 调用相应处理方法
  125. method = getattr(self, f'message{command_id}', None)
  126. if method:
  127. method(self.message_data[self.head_size:total_length]) # 处理完整消息
  128. else:
  129. methods.debug_log(f"{self.connection_id}|SRIConnectionError",
  130. f"No handler for command_id: {command_id}")
  131. # 移除已处理的消息
  132. self.message_data = self.message_data[total_length:]
  133. except Exception as exception:
  134. methods.debug_log(f"{self.connection_id}|SRIConnection132", f"#exception: {exception}")
  135. methods.debug_log(f"{self.connection_id}|SRIConnection123", f"#traceback: {methods.trace_log()}")
  136. def message2008(self, body_data):
  137. # methods.debug_log(f"{self.connection_id}|SRIConnection80", f"message: 2008")
  138. pass
  139. def message2009(self, body_data):
  140. # --- 监听 2009
  141. """
  142. CSAdd: 消息体
  143. CSAdd.serial: string
  144. CSAdd.type: int32 EgoType::Car | EgoType::None EgoType::User EgoType::Car
  145. CSAdd.name: string
  146. """
  147. object0 = protobuf.CSAdd()
  148. object0.ParseFromString(body_data)
  149. methods.debug_log(f"{self.connection_id}|SRIConnection90", f"#serial: {object0.serial}")
  150. methods.debug_log(f"{self.connection_id}|SRIConnection90", f"#name: {object0.name}")
  151. methods.debug_log(f"{self.connection_id}|SRIConnection90", f"#type: {object0.type}")
  152. # --- update ---
  153. self.client_type = 'vehicle'
  154. self.client_info = {
  155. 'rid': serial_rid_dict.get(object0.serial),
  156. 'name': object0.name,
  157. 'serial': object0.serial,
  158. 'egotype': 2, # 车端类型
  159. }
  160. # --- send 4016
  161. """
  162. SCAddRobot: 消息体
  163. SCAddRobot.robot: Robot
  164. Robot: 消息体
  165. Robot.rid: int32
  166. Robot.name: string
  167. Robot.type: int32
  168. Robot.state: RobotState Offline Online Busy
  169. """
  170. o1 = protobuf.Robot()
  171. o1.rid = self.client_info.get('rid')
  172. o1.name = object0.name
  173. o1.type = 2 # 0 EgoType::None 1 EgoType::User 2 EgoType::Car
  174. o1.state = protobuf.Robot.Online
  175. o2 = protobuf.SCAddRobot()
  176. o2.robot.CopyFrom(o1)
  177. re_command_id = protobuf.SC_NotifyAdd # 4016
  178. re_body_length = o2.ByteSize()
  179. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  180. re_body_data = o2.SerializeToString()
  181. re_send_data = re_head_data + re_body_data
  182. methods.debug_log(f"{self.connection_id}|SRIConnection136", f"sendmessage: {re_command_id}")
  183. # --- send 4016 发送全部舱端
  184. for item in clients.values():
  185. if item.client_type == 'cockpit':
  186. item.client.write(re_send_data)
  187. # --- send 4007 todo 凯强说并未用到
  188. # o1 = protobuf.SCAdd()
  189. # o1.ret = True
  190. # o1.uid = 112233 # todo 这个uid,应该是哪个舱端永辉在操作这个车 | 但是现在还没有人操控车
  191. # o1.name = object0.name
  192. # re_command_id = protobuf.SC_Add # 4007
  193. # re_body_length = o1.ByteSize()
  194. # re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  195. # re_body_data = o1.SerializeToString()
  196. # re_send_data = re_head_data + re_body_data
  197. # methods.debug_log(f"{self.connection_id} | SRIConnection150", f"re_command_id: {re_command_id}")
  198. # self.client.write(re_send_data)
  199. def message2000(self, body_data):
  200. # --- 解析消息体
  201. object = protobuf.CSSign()
  202. object.ParseFromString(body_data)
  203. methods.debug_log(f"{self.connection_id}|SRIConnection162", f"#account: {object.account}")
  204. methods.debug_log(f"{self.connection_id}|SRIConnection162", f"#password: {object.password}")
  205. # --- update ---
  206. self.client_type = 'cockpit'
  207. self.client_info = {
  208. 'uid': 3, # 对应数据库里的ego的id
  209. 'name': 'ego', # 对应数据库里
  210. 'egotype': 1, # 舱端类型
  211. }
  212. # --- send 4000
  213. object = protobuf.SCSign()
  214. object.ret = True
  215. object.uid = self.client_info.get('uid')
  216. object.name = self.client_info.get('name')
  217. re_command_id = protobuf.SC_Sign # 4000
  218. re_body_length = object.ByteSize()
  219. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  220. re_body_data = object.SerializeToString()
  221. re_send_data = re_head_data + re_body_data
  222. methods.debug_log(f"{self.connection_id}|SRIConnection175", f"re_command_id: {re_command_id}")
  223. self.client.write(re_send_data)
  224. def message2010(self, body_data):
  225. # --- send 4008 发送全部车端信息列表
  226. """
  227. Robot: 消息体
  228. Robot.rid: string
  229. Robot.name: string
  230. Robot.type: int32 EgoType::Car | EgoType::None EgoType::User EgoType::Car
  231. Robot.state: enum Offline Online Busy
  232. """
  233. o2 = protobuf.SCRobot()
  234. for item in clients.values():
  235. if item.client_type and item.client_type == 'vehicle':
  236. o1 = protobuf.Robot()
  237. o1.rid = item.client_info.get('rid')
  238. o1.name = item.client_info.get('name')
  239. o1.type = 2 # EgoType::Car
  240. o1.state = protobuf.Robot.RobotState.Value('Online')
  241. o2.robot.add().CopyFrom(o1)
  242. re_command_id = protobuf.SC_Robot # 4008
  243. re_body_length = o2.ByteSize()
  244. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  245. re_body_data = o2.SerializeToString()
  246. re_send_data = re_head_data + re_body_data
  247. methods.debug_log(f"{self.connection_id}|SRIConnection217", f"re_command_id: {re_command_id}")
  248. self.client.write(re_send_data)
  249. def message2001(self, body_data):
  250. # --- 解析消息体
  251. """
  252. CSReq: 消息体
  253. CSReq.peer: int32(rid,车端唯一标识)
  254. CSReq.index: int32(相机位置,RenderPosition)
  255. CSReq.egotype: int32(终端类型,舱端/车端)
  256. """
  257. o1 = protobuf.CSReq()
  258. o1.ParseFromString(body_data)
  259. methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#peer: {o1.peer}")
  260. methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#peer: {o1.index}")
  261. # methods.debug_log(f"{self.connection_id}|SRIConnection235.message2001",
  262. # f"#=============================index: {o1.index}")
  263. methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#egotype: {o1.egotype}")
  264. methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#uid: {self.client_info.get('uid')}")
  265. methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#rid: {self.client_info.get('rid')}")
  266. # --- send 4009 指定车端
  267. for item in clients.values():
  268. if item.client_info.get('rid') and item.client_info.get('rid') == o1.peer:
  269. """
  270. CSReq: 消息体
  271. CSReq.peer: int32(rid,车端唯一标识)
  272. CSReq.index: int32(相机位置,RenderPosition)
  273. CSReq.egotype: int32(终端类型,舱端/车端)
  274. """
  275. o2 = protobuf.CSReq()
  276. o2.peer = self.client_info.get('uid')
  277. o2.index = o1.index
  278. o2.egotype = o1.egotype
  279. re_command_id = protobuf.SC_NotifyReq # 4009
  280. re_body_length = o2.ByteSize()
  281. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  282. re_body_data = o2.SerializeToString()
  283. re_send_data = re_head_data + re_body_data
  284. methods.debug_log(f"{self.connection_id}|SRIConnection217", f"re_command_id: {re_command_id}")
  285. item.client.write(re_send_data)
  286. def message2002(self, body_data):
  287. # --- 解析消息体
  288. """
  289. CSRep: 消息体
  290. CSRep.desc: VideoDesc
  291. CSRep.peer: int32
  292. CSRep.index: int32
  293. CSRep.egotype: int32
  294. """
  295. o1 = protobuf.CSRep()
  296. o1.ParseFromString(body_data)
  297. methods.debug_log(f"{self.connection_id}|SRIConnection319", f"#desc: {o1.desc}")
  298. methods.debug_log(f"{self.connection_id}|SRIConnection319", f"#peer: {o1.peer}")
  299. methods.debug_log(f"{self.connection_id}|SRIConnection319", f"#index: {o1.index}")
  300. methods.debug_log(f"{self.connection_id}|SRIConnection319", f"#egotype: {o1.egotype}")
  301. methods.debug_log(f"{self.connection_id}|SRIConnection235", f"#uid: {self.client_info.get('uid')}")
  302. methods.debug_log(f"{self.connection_id}|SRIConnection235", f"#rid: {self.client_info.get('rid')}")
  303. # --- send 4010 通知指定舱端
  304. for item in clients.values():
  305. if item.client_info.get('uid') and item.client_info.get('uid') == o1.peer:
  306. o2 = protobuf.CSRep()
  307. o2.desc = o1.desc
  308. o2.peer = self.client_info.get('rid')
  309. o2.index = o1.index
  310. o2.egotype = o1.egotype
  311. re_command_id = protobuf.SC_NotifyRep # 4010
  312. re_body_length = o2.ByteSize()
  313. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  314. re_body_data = o2.SerializeToString()
  315. re_send_data = re_head_data + re_body_data
  316. methods.debug_log(f"{self.connection_id}|SRIConnection217", f"re_command_id: {re_command_id}")
  317. item.client.write(re_send_data)
  318. def message2004(self, body_data):
  319. # --- 解析消息体
  320. """
  321. Offer: 消息体
  322. Offer.index: int32
  323. Offer.peer: int32(车端rid)
  324. Offer.type: string
  325. Offer.sdp: string
  326. """
  327. object = protobuf.Offer()
  328. object.ParseFromString(body_data)
  329. methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#peer: {object.peer}")
  330. methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#uid: {self.client_info.get('uid')}")
  331. methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#rid: {self.client_info.get('rid')}")
  332. # --- send 4012 指定车端
  333. for item in clients.values():
  334. if item.client_info.get('rid') and item.client_info.get('rid') == object.peer:
  335. o1 = protobuf.Offer()
  336. o1.index = object.index
  337. o1.peer = self.client_info.get('uid') # 将舱端id赋值到指定车端对象
  338. o1.type = object.type
  339. o1.sdp = object.sdp
  340. re_command_id = protobuf.SC_NotifyOffer # 4012
  341. re_body_length = o1.ByteSize()
  342. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  343. re_body_data = o1.SerializeToString()
  344. re_send_data = re_head_data + re_body_data
  345. methods.debug_log(f"{self.connection_id}|SRIConnection341", f"re_command_id: {re_command_id}")
  346. item.client.write(re_send_data)
  347. def message2005(self, body_data):
  348. # --- 解析消息体
  349. """
  350. Answer: 消息体
  351. Answer.index: int32
  352. Answer.peer: int32(舱端uid)
  353. Answer.type: string
  354. Answer.sdp: string
  355. """
  356. object = protobuf.Answer()
  357. object.ParseFromString(body_data)
  358. methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#peer: {object.peer}")
  359. methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#uid: {self.client_info.get('uid')}")
  360. methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#rid: {self.client_info.get('rid')}")
  361. # --- send 4011 指定舱端
  362. count = 0
  363. for item in clients.values():
  364. count += 1
  365. methods.debug_log(f"{self.connection_id}|SRIConnection428",
  366. f"[{count}]item.client_info: {item.client_info}")
  367. if item.client_info.get('uid') and item.client_info.get('uid') == object.peer:
  368. o1 = protobuf.Answer()
  369. o1.index = object.index
  370. o1.peer = self.client_info.get('rid') # 将车端id赋值到指定舱端对象,
  371. o1.type = object.type
  372. o1.sdp = object.sdp
  373. re_command_id = protobuf.SC_NotifyAnswer # 4011
  374. re_body_length = o1.ByteSize()
  375. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  376. re_body_data = o1.SerializeToString()
  377. re_send_data = re_head_data + re_body_data
  378. methods.debug_log(f"{self.connection_id}|SRIConnection428", f"re_command_id: {re_command_id}")
  379. methods.debug_log(f"{self.connection_id}|SRIConnection428",
  380. f"=====================================================-----index: {o1.index}")
  381. item.client.write(re_send_data)
  382. def message2006(self, body_data):
  383. # --- 解析消息体
  384. """
  385. Candidate: 消息体
  386. Candidate.index: int32
  387. Candidate.peer: int32(车端rid)
  388. Candidate.type: string
  389. Candidate.candidate: string
  390. Candidate.sdpMLineIndex: int32
  391. Candidate.sdpMid: string
  392. Candidate.egotype: int32
  393. """
  394. object = protobuf.Candidate()
  395. object.ParseFromString(body_data)
  396. methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#peer: {object.peer}")
  397. methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#candidate.in: {object.candidate}")
  398. methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#uid: {self.client_info.get('uid')}")
  399. methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#rid: {self.client_info.get('rid')}")
  400. for item in clients.values():
  401. # --- send 4013 舱端到车端
  402. if item.client_info.get('rid') and item.client_info.get('rid') == object.peer:
  403. o1 = protobuf.Candidate()
  404. o1.index = object.index
  405. o1.peer = self.client_info.get('uid') # 将舱端id赋值到指定车端对象,
  406. o1.type = object.type
  407. o1.candidate = object.candidate
  408. o1.sdpMLineIndex = object.sdpMLineIndex
  409. o1.sdpMid = object.sdpMid
  410. o1.egotype = object.egotype
  411. re_command_id = protobuf.SC_NotifyCandidate # 4013
  412. re_body_length = o1.ByteSize()
  413. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  414. re_body_data = o1.SerializeToString()
  415. re_send_data = re_head_data + re_body_data
  416. methods.debug_log(f"{self.connection_id}|SRIConnection409", f"re_command_id: {re_command_id}")
  417. methods.debug_log(f"{self.connection_id}|SRIConnection409", f"#candidate.out: {o1.candidate}")
  418. item.client.write(re_send_data)
  419. # --- send 4013 车端到舱端
  420. if item.client_info.get('uid') and item.client_info.get('uid') == object.peer:
  421. o1 = protobuf.Candidate()
  422. o1.index = object.index
  423. o1.peer = self.client_info.get('rid') # 将车端id赋值到指定舱端对象,
  424. o1.type = object.type
  425. o1.candidate = object.candidate
  426. o1.sdpMLineIndex = object.sdpMLineIndex
  427. o1.sdpMid = object.sdpMid
  428. o1.egotype = object.egotype
  429. re_command_id = protobuf.SC_NotifyCandidate # 4013
  430. re_body_length = o1.ByteSize()
  431. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  432. re_body_data = o1.SerializeToString()
  433. re_send_data = re_head_data + re_body_data
  434. methods.debug_log(f"{self.connection_id}|SRIConnection426", f"re_command_id: {re_command_id}")
  435. methods.debug_log(f"{self.connection_id}|SRIConnection426", f"#candidate.out: {o1.candidate}")
  436. item.client.write(re_send_data)
  437. def message2007(self, body_data):
  438. # --- 解析消息体
  439. """
  440. Leave: 消息体
  441. Leave.peer: int32(车端rid)
  442. Leave.egotype: int32
  443. """
  444. o1 = protobuf.Leave()
  445. o1.ParseFromString(body_data)
  446. methods.debug_log(f"{self.connection_id}|SRIConnection497", f"#peer: {o1.peer}")
  447. # --- send 4009 指定车端
  448. for item in clients.values():
  449. if item.client_info.get('rid') and item.client_info.get('rid') == o1.peer:
  450. """
  451. Leave: 消息体
  452. Leave.peer: int32(车端rid)
  453. Leave.egotype: int32
  454. """
  455. o2 = protobuf.Leave()
  456. o2.peer = self.client_info.get('uid')
  457. o2.egotype = o1.egotype
  458. re_command_id = protobuf.SC_NotifyLeave # 4014
  459. re_body_length = o2.ByteSize()
  460. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  461. re_body_data = o2.SerializeToString()
  462. re_send_data = re_head_data + re_body_data
  463. methods.debug_log(f"{self.connection_id}|SRIConnection515", f"re_command_id: {re_command_id}")
  464. item.client.write(re_send_data)
  465. def message2014(self, body_data):
  466. # --- 解析消息体
  467. """
  468. message CSState
  469. {
  470. UserState state=1;
  471. int32 uid=2;
  472. };
  473. """
  474. o1 = protobuf.CSState()
  475. o1.ParseFromString(body_data)
  476. methods.debug_log(f"{self.connection_id}|SRIConnection530", f"#state: {o1.state}")
  477. methods.debug_log(f"{self.connection_id}|SRIConnection530", f"#uid: {o1.uid}")
  478. # --- send 4016 发送全部舱端
  479. for item in clients.values():
  480. if item.client_type == 'cockpit':
  481. """
  482. message SCState
  483. {
  484. UserState state=1;
  485. int32 uid=2;
  486. };
  487. """
  488. o2 = protobuf.SCState()
  489. o2.state = o1.state
  490. o2.uid = self.client_info.get('rid')
  491. re_command_id = protobuf.SC_State # 4022
  492. re_body_length = o2.ByteSize()
  493. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  494. re_body_data = o2.SerializeToString()
  495. re_send_data = re_head_data + re_body_data
  496. methods.debug_log(f"{self.connection_id}|SRIConnection551", f"re_command_id: {re_command_id}")
  497. item.client.write(re_send_data)
  498. @staticmethod
  499. async def check_clients():
  500. """
  501. 剔除掉线连接
  502. """
  503. count = 0
  504. while True:
  505. count += 1
  506. print(f"#count: {count}", flush=True)
  507. now_at = methods.now_ts()
  508. for connection_id in list(clients.keys()):
  509. object = clients.get(connection_id)
  510. print(f"#client: {object.client}, #update_at: {object.update_at}, #client_type: {object.client_type}",
  511. flush=True)
  512. await asyncio.sleep(3) # 发送消息的间隔时间
  513. @staticmethod
  514. async def run():
  515. """
  516. """
  517. # --- define ---
  518. loop = asyncio.get_running_loop()
  519. server = await loop.create_server(
  520. lambda: SRIConnection(),
  521. '0.0.0.0', 20917
  522. )
  523. # --- start ---
  524. async with server:
  525. print("Server listening on 0.0.0.0:20917", flush=True)
  526. # await loop.create_task(SRIConnection.check_clients())
  527. await server.serve_forever()
# Script entry point: run the server coroutine on a fresh event loop.
if __name__ == '__main__':
    asyncio.run(SRIConnection.run())