Connection_e1.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628
  1. """
  2. """
  3. # from hub import methods, Global
  4. import struct
  5. import asyncio
  6. import time
  7. import socket
  8. import sys
  9. import importlib
  10. # --- for linux
  11. sys.path.append('/home/sri/repositories/repositories/sri-project.demo-py/sri-dino-pyserver01')
  12. sys.path.append('/home/sri/repositories/repositories/sri-project.demo-py/sri-pysdk')
  13. protobuf = importlib.import_module(f"xprotobuf.protocol_pb2")
  14. methods = importlib.import_module(f"xlib")
  15. clients = {} # {<ipv4>: (socket, update_at, type)} | {<连接id>: (socket对象, 最后一次请求时间, 客户端类型)}
  16. serial_rid_dict = {
  17. '65F7171A-5585-46C7-A9D6-967ABA9EB223': 1000000,
  18. '7AF3F619-5067-4EE0-A710-89A6CB097EFE': 1000001,
  19. 'ECB93A87-560B-4022-8C5F-CBF9FE1E596A': 1000002,
  20. 'C0D14B6F-0FF0-4B68-877D-D2CB886FCD0E': 1000002,
  21. 'E537DDFB-6E3E-4E1A-AD18-AC21393BE300': 1000004, # 正在使用
  22. }
  23. account_uid_dict = {
  24. 'ego': 3
  25. }
  26. # live_relationship = {} # {<id-1>id-2>: True}
  27. class SRIConnection(asyncio.Protocol):
  28. """"""
  29. head_sequence = '<hh' # 字节序规则
  30. head_size = struct.calcsize(head_sequence)
  31. message_data = b''
  32. def __init__(self):
  33. self.lock = asyncio.Lock() # 初始化锁
  34. def connection_made(self, client):
  35. """
  36. 建立客户端连接
  37. """
  38. peername = client.get_extra_info('peername')
  39. # self.connection_id = f"{peername[0].replace('.', '')}-{peername[1]}" # 12700130000 客户端id
  40. self.connection_id = int(f"{peername[0].replace('.', '')}")
  41. self.client = client
  42. # self.data = b''
  43. self.client_type = None
  44. self.client_info = None
  45. self.update_at = methods.now_ts()
  46. clients[self.connection_id] = self
  47. # 获取底层 socket
  48. # sock = self.client.get_extra_info('socket')
  49. # 查看缓冲区大小
  50. # recv_buffer_size = sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
  51. # send_buffer_size = sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
  52. # methods.debug_log(f'{self.connection_id}|SRIConnection70', f"Receive buffer size: {recv_buffer_size}")
  53. # methods.debug_log(f'{self.connection_id}|SRIConnection70', f"Send buffer size: {send_buffer_size}")
  54. def connection_lost(self, exc):
  55. """
  56. 关闭连接
  57. """
  58. methods.debug_log(f"{self.connection_id}|SRIConnection085", f"连接已关闭")
  59. # --- 处理车端掉线,通知所有舱端
  60. if self.client_type == 'vehicle':
  61. """
  62. SCDelRobot: 消息体
  63. SCDelRobot.peer: int32
  64. SCDelRobot.egotype: int32
  65. """
  66. o2 = protobuf.SCDelRobot()
  67. o2.peer = self.client_info.get('connection_id') # 车端id
  68. o2.egotype = self.client_info.get('egotype')
  69. re_command_id = protobuf.SC_NotifyDel # 4017
  70. re_body_length = o2.ByteSize()
  71. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  72. re_body_data = o2.SerializeToString()
  73. re_send_data = re_head_data + re_body_data
  74. methods.debug_log(f"{self.connection_id}|SRIConnection085", f"re_command_id: {re_command_id}")
  75. for item in clients.values():
  76. if item.client_type == 'cockpit':
  77. item.client.write(re_send_data)
  78. # --- 处理舱端掉线,通知所有车端
  79. if self.client_type == 'cockpit':
  80. """
  81. Leave: 消息体
  82. Leave.peer: int32(车端rid)
  83. Leave.egotype: int32
  84. """
  85. o2 = protobuf.Leave()
  86. o2.peer = self.client_info.get('connection_id') # 舱端id
  87. o2.egotype = 1 # 客户端类型
  88. re_command_id = protobuf.SC_NotifyLeave # 4014
  89. re_body_length = o2.ByteSize()
  90. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  91. re_body_data = o2.SerializeToString()
  92. re_send_data = re_head_data + re_body_data
  93. methods.debug_log(f"{self.connection_id}|SRIConnection515", f"re_command_id: {re_command_id}")
  94. for item in clients.values():
  95. if item.client_type == 'vehicle':
  96. item.client.write(re_send_data)
  97. # --- clean
  98. if self.connection_id in clients:
  99. del clients[self.connection_id]
  100. # --- clean live_relationship
  101. # keys = [key for key in live_relationship.keys() if self.connection_id in key]
  102. # for key in keys:
  103. # del live_relationship[key]
  104. def data_received(self, data):
  105. """
  106. 消息处理
  107. """
  108. asyncio.create_task(self.handle_data(data))
  109. async def handle_data(self, data):
  110. """
  111. 消息处理
  112. """
  113. # --- before
  114. # loop = asyncio.get_running_loop()
  115. # await loop.run_in_executor(None, self.process_data, data)
  116. # async with self.lock: # 加入锁,确保只有一个任务在处理数据
  117. # await self.process_data(data)
  118. async with self.lock: # 加入锁,确保只有一个任务在处理数据
  119. loop = asyncio.get_running_loop()
  120. await loop.run_in_executor(None, self.process_data, data) # 调用非异步函数
  121. def process_data(self, data):
  122. """
  123. 消息处理
  124. """
  125. try:
  126. # methods.debug_log(f"{self.connection_id}|SRIConnection114", f"receive: {len(data)}, message: {repr(data)}")
  127. self.message_data += data # 执行了这个以后
  128. # methods.debug_log(f"{self.connection_id}|SRIConnection114", f"--- 1")
  129. # methods.debug_log(f"{self.connection_id}|SRIConnection114", f"--- 2")
  130. # methods.debug_log(f"{self.connection_id}|SRIConnection114", f"--- 3")
  131. # count = 0
  132. while True:
  133. # --- print
  134. # count += 1
  135. # methods.debug_log(f"{self.connection_id}|SRIConnection123",
  136. # f"---------------------------------- while count: {count}")
  137. # 确保有足够的字节来处理消息头
  138. if len(self.message_data) < self.head_size:
  139. break
  140. # 获取消息头
  141. head_data = self.message_data[:self.head_size]
  142. command_id, body_length = struct.unpack(self.head_sequence, head_data)
  143. methods.debug_log(f'{self.connection_id}|SRIConnection130',
  144. f"command_id: {command_id}, body_length: {body_length}")
  145. # 检查命令ID是否有效
  146. if not (2000 <= command_id < 5000):
  147. self.message_data = b'' # 清空无效数据
  148. break
  149. # 检查完整消息是否接收完毕
  150. total_length = self.head_size + body_length
  151. if len(self.message_data) < total_length:
  152. break # 数据不完整,等待更多数据
  153. # 调用相应处理方法
  154. method = getattr(self, f'message{command_id}', None)
  155. if method:
  156. method(self.message_data[self.head_size:total_length]) # 处理完整消息
  157. else:
  158. methods.debug_log(f"{self.connection_id}|SRIConnectionError",
  159. f"No handler for command_id: {command_id}")
  160. # 移除已处理的消息
  161. self.message_data = self.message_data[total_length:]
  162. except Exception as exception:
  163. methods.debug_log(f"{self.connection_id}|SRIConnection132", f"#exception: {exception}")
  164. methods.debug_log(f"{self.connection_id}|SRIConnection123", f"#traceback: {methods.trace_log()}")
  165. def message2008(self, body_data):
  166. # methods.debug_log(f"{self.connection_id}|SRIConnection80", f"message: 2008")
  167. pass
  168. def message2009(self, body_data):
  169. # --- 监听 2009
  170. """
  171. CSAdd: 消息体
  172. CSAdd.serial: string
  173. CSAdd.type: int32 EgoType::Car | EgoType::None EgoType::User EgoType::Car
  174. CSAdd.name: string
  175. """
  176. object0 = protobuf.CSAdd()
  177. object0.ParseFromString(body_data)
  178. methods.debug_log(f"{self.connection_id}|SRIConnection90", f"#serial: {object0.serial}")
  179. methods.debug_log(f"{self.connection_id}|SRIConnection90", f"#name: {object0.name}")
  180. methods.debug_log(f"{self.connection_id}|SRIConnection90", f"#type: {object0.type}")
  181. # --- update ---
  182. self.client_type = 'vehicle'
  183. self.client_info = {
  184. 'connection_id': self.connection_id,
  185. 'rid': serial_rid_dict.get(object0.serial),
  186. 'name': object0.name,
  187. 'serial': object0.serial,
  188. 'egotype': 2, # 客户端类型
  189. }
  190. """
  191. SCAddRobot: 消息体
  192. SCAddRobot.robot: Robot
  193. Robot: 消息体
  194. Robot.rid: int32
  195. Robot.name: string
  196. Robot.type: int32
  197. Robot.state: RobotState Offline Online Busy
  198. """
  199. o1 = protobuf.Robot()
  200. # o1.rid = self.client_info.get('rid')
  201. o1.rid = self.client_info.get('connection_id')
  202. o1.name = object0.name
  203. o1.type = 2 # 0 EgoType::None 1 EgoType::User 2 EgoType::Car
  204. o1.state = protobuf.Robot.Online
  205. o2 = protobuf.SCAddRobot()
  206. o2.robot.CopyFrom(o1)
  207. re_command_id = protobuf.SC_NotifyAdd # 4016
  208. re_body_length = o2.ByteSize()
  209. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  210. re_body_data = o2.SerializeToString()
  211. re_send_data = re_head_data + re_body_data
  212. # --- send 4016 发送全部舱端
  213. for item in clients.values():
  214. if item.client_type == 'cockpit':
  215. methods.debug_log(f"{self.connection_id}|SRIConnection136", f"re_command_id: {re_command_id}")
  216. item.client.write(re_send_data)
  217. # --- send 4007 todo 凯强说并未用到
  218. # o1 = protobuf.SCAdd()
  219. # o1.ret = True
  220. # o1.uid = 112233 # todo 这个uid,应该是哪个舱端永辉在操作这个车 | 但是现在还没有人操控车
  221. # o1.name = object0.name
  222. # re_command_id = protobuf.SC_Add # 4007
  223. # re_body_length = o1.ByteSize()
  224. # re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  225. # re_body_data = o1.SerializeToString()
  226. # re_send_data = re_head_data + re_body_data
  227. # methods.debug_log(f"{self.connection_id} | SRIConnection150", f"re_command_id: {re_command_id}")
  228. # self.client.write(re_send_data)
  229. def message2000(self, body_data):
  230. # --- 解析消息体
  231. object = protobuf.CSSign()
  232. object.ParseFromString(body_data)
  233. methods.debug_log(f"{self.connection_id}|SRIConnection162", f"#account: {object.account}")
  234. methods.debug_log(f"{self.connection_id}|SRIConnection162", f"#password: {object.password}")
  235. # --- update ---
  236. self.client_type = 'cockpit'
  237. self.client_info = {
  238. 'connection_id': self.connection_id,
  239. 'uid': 3, # 对应数据库里的ego的id
  240. 'name': 'ego', # 对应数据库里
  241. 'egotype': 1, # 舱端类型
  242. }
  243. # --- send 4000
  244. object = protobuf.SCSign()
  245. object.ret = True
  246. # object.uid = self.client_info.get('uid')
  247. object.uid = self.client_info.get('connection_id')
  248. object.name = self.client_info.get('name')
  249. re_command_id = protobuf.SC_Sign # 4000
  250. re_body_length = object.ByteSize()
  251. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  252. re_body_data = object.SerializeToString()
  253. re_send_data = re_head_data + re_body_data
  254. methods.debug_log(f"{self.connection_id}|SRIConnection175", f"re_command_id: {re_command_id}")
  255. self.client.write(re_send_data)
  256. def message2010(self, body_data):
  257. # --- send 4008 发送全部车端信息列表
  258. """
  259. Robot: 消息体
  260. Robot.rid: string
  261. Robot.name: string
  262. Robot.type: int32 EgoType::Car | EgoType::None EgoType::User EgoType::Car
  263. Robot.state: enum Offline Online Busy
  264. """
  265. # --- 获取全部在线车辆列表
  266. o2 = protobuf.SCRobot()
  267. for item in clients.values():
  268. if item.client_type and item.client_type == 'vehicle':
  269. o1 = protobuf.Robot()
  270. # o1.rid = item.client_info.get('rid')
  271. o1.rid = item.client_info.get('connection_id')
  272. o1.name = item.client_info.get('name')
  273. o1.type = 2 # EgoType::Car
  274. o1.state = protobuf.Robot.RobotState.Value('Online')
  275. o2.robot.add().CopyFrom(o1)
  276. re_command_id = protobuf.SC_Robot # 4008
  277. re_body_length = o2.ByteSize()
  278. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  279. re_body_data = o2.SerializeToString()
  280. re_send_data = re_head_data + re_body_data
  281. methods.debug_log(f"{self.connection_id}|SRIConnection306", f"re_command_id: {re_command_id}")
  282. self.client.write(re_send_data)
  283. def message2001(self, body_data):
  284. # --- 解析消息体
  285. """
  286. CSReq: 消息体
  287. CSReq.peer: int32(rid,车端唯一标识)
  288. CSReq.index: int32(相机位置,RenderPosition)
  289. CSReq.egotype: int32(终端类型,舱端/车端)
  290. """
  291. o1 = protobuf.CSReq()
  292. o1.ParseFromString(body_data)
  293. methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#peer: {o1.peer}")
  294. methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#index: {o1.index}")
  295. # methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#uid: {self.client_info.get('uid')}")
  296. # methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#rid: {self.client_info.get('rid')}")
  297. # --- send 4009 指定车端
  298. for item in clients.values():
  299. if item.client_info.get('connection_id') == o1.peer:
  300. """
  301. CSReq: 消息体
  302. CSReq.peer: int32(rid,车端唯一标识)
  303. CSReq.index: int32(相机位置,RenderPosition)
  304. CSReq.egotype: int32(终端类型,舱端/车端)
  305. """
  306. o2 = protobuf.CSReq()
  307. o2.peer = self.client_info.get('connection_id') # 舱端id
  308. o2.index = o1.index
  309. o2.egotype = o1.egotype
  310. re_command_id = protobuf.SC_NotifyReq # 4009
  311. re_body_length = o2.ByteSize()
  312. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  313. re_body_data = o2.SerializeToString()
  314. re_send_data = re_head_data + re_body_data
  315. methods.debug_log(f"{self.connection_id}|SRIConnection217", f"re_command_id: {re_command_id}")
  316. item.client.write(re_send_data)
  317. def message2002(self, body_data):
  318. # --- 解析消息体
  319. """
  320. CSRep: 消息体
  321. CSRep.desc: VideoDesc
  322. CSRep.peer: int32
  323. CSRep.index: int32
  324. CSRep.egotype: int32
  325. """
  326. o1 = protobuf.CSRep()
  327. o1.ParseFromString(body_data)
  328. methods.debug_log(f"{self.connection_id}|SRIConnection319", f"#peer: {o1.peer}")
  329. # methods.debug_log(f"{self.connection_id}|SRIConnection235", f"#uid: {self.client_info.get('uid')}")
  330. # methods.debug_log(f"{self.connection_id}|SRIConnection235", f"#rid: {self.client_info.get('rid')}")
  331. # --- send 4010 指定舱端
  332. for item in clients.values():
  333. if item.client_info.get('connection_id') == o1.peer:
  334. o2 = protobuf.CSRep()
  335. o2.desc = o1.desc
  336. o2.peer = self.client_info.get('connection_id') # 车端id
  337. o2.index = o1.index
  338. o2.egotype = o1.egotype
  339. re_command_id = protobuf.SC_NotifyRep # 4010
  340. re_body_length = o2.ByteSize()
  341. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  342. re_body_data = o2.SerializeToString()
  343. re_send_data = re_head_data + re_body_data
  344. methods.debug_log(f"{self.connection_id}|SRIConnection217", f"re_command_id: {re_command_id}")
  345. item.client.write(re_send_data)
  346. def message2004(self, body_data):
  347. # --- 解析消息体
  348. """
  349. Offer: 消息体
  350. Offer.index: int32
  351. Offer.peer: int32(车端rid)
  352. Offer.type: string
  353. Offer.sdp: string
  354. """
  355. object = protobuf.Offer()
  356. object.ParseFromString(body_data)
  357. methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#peer: {object.peer}")
  358. # methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#uid: {self.client_info.get('uid')}")
  359. # methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#rid: {self.client_info.get('rid')}")
  360. # --- send 4012 指定车端
  361. for item in clients.values():
  362. if item.client_info.get('connection_id') == object.peer:
  363. o1 = protobuf.Offer()
  364. o1.index = object.index
  365. o1.peer = self.client_info.get('connection_id') # 舱端id
  366. o1.type = object.type
  367. o1.sdp = object.sdp
  368. re_command_id = protobuf.SC_NotifyOffer # 4012
  369. re_body_length = o1.ByteSize()
  370. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  371. re_body_data = o1.SerializeToString()
  372. re_send_data = re_head_data + re_body_data
  373. methods.debug_log(f"{self.connection_id}|SRIConnection341", f"re_command_id: {re_command_id}")
  374. item.client.write(re_send_data)
  375. def message2005(self, body_data):
  376. # --- 解析消息体
  377. """
  378. Answer: 消息体
  379. Answer.index: int32
  380. Answer.peer: int32(舱端uid)
  381. Answer.type: string
  382. Answer.sdp: string
  383. """
  384. object = protobuf.Answer()
  385. object.ParseFromString(body_data)
  386. methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#peer: {object.peer}")
  387. # methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#uid: {self.client_info.get('uid')}")
  388. # methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#rid: {self.client_info.get('rid')}")
  389. # --- send 4011 指定舱端
  390. for item in clients.values():
  391. if item.client_info.get('connection_id') == object.peer:
  392. o1 = protobuf.Answer()
  393. o1.index = object.index
  394. o1.peer = self.client_info.get('connection_id') # 车端id
  395. o1.type = object.type
  396. o1.sdp = object.sdp
  397. re_command_id = protobuf.SC_NotifyAnswer # 4011
  398. re_body_length = o1.ByteSize()
  399. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  400. re_body_data = o1.SerializeToString()
  401. re_send_data = re_head_data + re_body_data
  402. methods.debug_log(f"{self.connection_id}|SRIConnection428", f"re_command_id: {re_command_id}")
  403. methods.debug_log(f"{self.connection_id}|SRIConnection428", f"#index: {o1.index}")
  404. item.client.write(re_send_data)
  405. def message2006(self, body_data):
  406. # --- 解析消息体
  407. """
  408. Candidate: 消息体
  409. Candidate.index: int32
  410. Candidate.peer: int32(车端rid)
  411. Candidate.type: string
  412. Candidate.candidate: string
  413. Candidate.sdpMLineIndex: int32
  414. Candidate.sdpMid: string
  415. Candidate.egotype: int32
  416. """
  417. object = protobuf.Candidate()
  418. object.ParseFromString(body_data)
  419. methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#peer: {object.peer}")
  420. # methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#uid: {self.client_info.get('uid')}")
  421. # methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#rid: {self.client_info.get('rid')}")
  422. for item in clients.values():
  423. # --- send 4013 舱端到车端
  424. if item.client_info.get('connection_id') == object.peer:
  425. o1 = protobuf.Candidate()
  426. o1.index = object.index
  427. o1.peer = self.client_info.get('connection_id') # 舱端id,
  428. o1.type = object.type
  429. o1.candidate = object.candidate
  430. o1.sdpMLineIndex = object.sdpMLineIndex
  431. o1.sdpMid = object.sdpMid
  432. o1.egotype = object.egotype
  433. re_command_id = protobuf.SC_NotifyCandidate # 4013
  434. re_body_length = o1.ByteSize()
  435. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  436. re_body_data = o1.SerializeToString()
  437. re_send_data = re_head_data + re_body_data
  438. methods.debug_log(f"{self.connection_id}|SRIConnection409", f"re_command_id: {re_command_id}")
  439. item.client.write(re_send_data)
  440. # --- send 4013 车端到舱端
  441. if item.client_info.get('connection_id') == object.peer:
  442. o1 = protobuf.Candidate()
  443. o1.index = object.index
  444. o1.peer = self.client_info.get('connection_id') # 车端id,
  445. o1.type = object.type
  446. o1.candidate = object.candidate
  447. o1.sdpMLineIndex = object.sdpMLineIndex
  448. o1.sdpMid = object.sdpMid
  449. o1.egotype = object.egotype
  450. re_command_id = protobuf.SC_NotifyCandidate # 4013
  451. re_body_length = o1.ByteSize()
  452. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  453. re_body_data = o1.SerializeToString()
  454. re_send_data = re_head_data + re_body_data
  455. methods.debug_log(f"{self.connection_id}|SRIConnection426", f"re_command_id: {re_command_id}")
  456. # methods.debug_log(f"{self.connection_id}|SRIConnection426", f"#candidate.out: {o1.candidate}")
  457. item.client.write(re_send_data)
  458. def message2007(self, body_data):
  459. # --- 解析消息体
  460. """
  461. Leave: 消息体
  462. Leave.peer: int32(车端rid)
  463. Leave.egotype: int32
  464. """
  465. o1 = protobuf.Leave()
  466. o1.ParseFromString(body_data)
  467. methods.debug_log(f"{self.connection_id}|SRIConnection497", f"#peer: {o1.peer}")
  468. # --- send 4009 指定车端
  469. for item in clients.values():
  470. if item.client_info.get('connection_id') == o1.peer:
  471. """
  472. Leave: 消息体
  473. Leave.peer: int32(车端rid)
  474. Leave.egotype: int32
  475. """
  476. o2 = protobuf.Leave()
  477. o2.peer = self.client_info.get('connection_id') # 舱端id
  478. o2.egotype = o1.egotype
  479. re_command_id = protobuf.SC_NotifyLeave # 4014
  480. re_body_length = o2.ByteSize()
  481. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  482. re_body_data = o2.SerializeToString()
  483. re_send_data = re_head_data + re_body_data
  484. methods.debug_log(f"{self.connection_id}|SRIConnection515", f"re_command_id: {re_command_id}")
  485. item.client.write(re_send_data)
  486. def message2014(self, body_data):
  487. # --- 解析消息体
  488. """
  489. message CSState
  490. {
  491. UserState state=1;
  492. int32 uid=2;
  493. };
  494. """
  495. o1 = protobuf.CSState()
  496. o1.ParseFromString(body_data)
  497. methods.debug_log(f"{self.connection_id}|SRIConnection530", f"----------------- #state: {o1.state}")
  498. methods.debug_log(f"{self.connection_id}|SRIConnection530", f"#uid: {o1.uid}")
  499. # --- send 4016 发送全部舱端
  500. for item in clients.values():
  501. if item.client_type == 'cockpit':
  502. """
  503. message SCState
  504. {
  505. UserState state=1;
  506. int32 uid=2;
  507. };
  508. """
  509. o2 = protobuf.SCState()
  510. o2.state = o1.state
  511. # o2.uid = self.client_info.get('uid')
  512. o2.uid = o1.uid # todo 文磊这边发送过来的车端id
  513. re_command_id = protobuf.SC_State # 4022
  514. re_body_length = o2.ByteSize()
  515. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  516. re_body_data = o2.SerializeToString()
  517. re_send_data = re_head_data + re_body_data
  518. methods.debug_log(f"{self.connection_id}|SRIConnection551", f"re_command_id: {re_command_id}")
  519. item.client.write(re_send_data)
  520. @staticmethod
  521. async def check_clients():
  522. """
  523. 剔除掉线连接
  524. """
  525. count = 0
  526. while True:
  527. count += 1
  528. print(f"#count: {count}", flush=True)
  529. now_at = methods.now_ts()
  530. for connection_id in list(clients.keys()):
  531. object = clients.get(connection_id)
  532. print(f"#client: {object.client}, #update_at: {object.update_at}, #client_type: {object.client_type}",
  533. flush=True)
  534. await asyncio.sleep(3) # 发送消息的间隔时间
  535. @staticmethod
  536. async def run():
  537. """
  538. """
  539. # --- define ---
  540. loop = asyncio.get_running_loop()
  541. server = await loop.create_server(
  542. lambda: SRIConnection(),
  543. '0.0.0.0', 20917
  544. )
  545. # --- start ---
  546. async with server:
  547. print("Server listening on 0.0.0.0:20917", flush=True)
  548. # await loop.create_task(SRIConnection.check_clients())
  549. await server.serve_forever()
  550. if __name__ == '__main__':
  551. asyncio.run(SRIConnection.run())